From f911d836968765bc837355fee1c44e36b7549bd1 Mon Sep 17 00:00:00 2001 From: Sylvain Ribstein Date: Tue, 3 Dec 2024 10:53:04 +0100 Subject: [PATCH 1/4] injector: add order to injector operation type --- src/lib_injector/injector_operation.ml | 73 +++++++++++++++---- src/lib_injector/injector_server_operation.ml | 4 + .../injector_server_operation.mli | 2 + src/lib_injector/injector_sigs.ml | 16 +++- src/lib_smart_rollup/operation_kind.ml | 12 +++ src/lib_smart_rollup/operation_kind.mli | 8 ++ src/lib_smart_rollup_node/injector.ml | 26 ++++--- src/lib_smart_rollup_node/rpc_directory.ml | 9 ++- 8 files changed, 121 insertions(+), 29 deletions(-) diff --git a/src/lib_injector/injector_operation.ml b/src/lib_injector/injector_operation.ml index 27436fff64b3..a1744c9f3d56 100644 --- a/src/lib_injector/injector_operation.ml +++ b/src/lib_injector/injector_operation.ml @@ -33,7 +33,13 @@ module Make (O : PARAM_OPERATION) : type errors = {count : int; last_error : tztrace option} - type t = {id : id; operation : O.t; mutable errors : errors} + type t = { + id : id; + order : Z.t option; + counter : Z.t; + operation : O.t; + mutable errors : errors; + } let hash_inner_operation nonce op = Id.hash_string @@ -46,16 +52,14 @@ module Make (O : PARAM_OPERATION) : let no_errors = {count = 0; last_error = None} - let make operation = - let nonce = - if not @@ O.unique operation then ( - let c = !counter in - counter := Z.succ !counter ; - Some c) - else None + let make ?order operation = + let nonce_for_hash_id = + if not @@ O.unique operation then Some !counter else None in - let id = hash_inner_operation nonce operation in - {id; operation; errors = no_errors} + let id = hash_inner_operation nonce_for_hash_id operation in + let op = {id; order; counter = !counter; operation; errors = no_errors} in + counter := Z.succ !counter ; + op let errors_encoding = let open Data_encoding in @@ -67,20 +71,59 @@ module Make (O : PARAM_OPERATION) : let encoding = let open Data_encoding in 
conv - (fun {id; operation; errors} -> (id, operation, errors)) - (fun (id, operation, errors) -> {id; operation; errors}) - @@ obj3 + (fun {id; order; counter; operation; errors} -> + (id, order, counter, operation, errors)) + (fun (id, order, counter, operation, errors) -> + {id; order; counter; operation; errors}) + @@ obj5 (req "id" Id.encoding) + (opt "order" n) + (req "counter" n) (req "operation" O.encoding) (dft "errors" errors_encoding no_errors) - let pp ppf {id; operation; errors} = + let pp ppf {id; order; counter = _; operation; errors} = let pp_errors ppf errors = if errors.count = 0 then () else Format.fprintf ppf " [%d errors]" errors.count in - Format.fprintf ppf "%a (%a)%a" O.pp operation Id.pp id pp_errors errors + let pp_order = + Format.pp_print_option @@ fun ppf -> + Format.fprintf ppf " [priority order %a]" Z.pp_print + in + Format.fprintf + ppf + "%a%a (%a)%a" + O.pp + operation + pp_order + order + Id.pp + id + pp_errors + errors let register_error op error_trace = op.errors <- {count = op.errors.count + 1; last_error = Some error_trace} + + let id {id; _} = id + + (* Compare operations with [O.compare] first; ties are broken as follows: + + - Operations with an explicit `order` are lesser than operations without + one; + - The value of `order` is used to compare two operations with `order` set; + - Otherwise (no `order` on either side, or equal `order`s), use `counter` to compare *) + let compare op1 op2 = + let op_compare = O.compare op1.operation op2.operation in + if not (op_compare = 0) then op_compare + else + let counter_cmp () = Z.compare op1.counter op2.counter in + match (op1.order, op2.order) with + | Some o1, Some o2 -> + let cmp = Z.compare o1 o2 in + if cmp = 0 then counter_cmp () else cmp + | None, None -> counter_cmp () + | Some _p, _ -> -1 + | _, Some _p -> 1 end diff --git a/src/lib_injector/injector_server_operation.ml b/src/lib_injector/injector_server_operation.ml index 33160896fd67..839e0040bdde 100644 --- a/src/lib_injector/injector_server_operation.ml +++ 
b/src/lib_injector/injector_server_operation.ml @@ -62,3 +62,7 @@ let pp ppf = function Format.fprintf ppf "Transaction of %Ld tez" amount let unique = function Transaction _ -> true + +let compare _ _ = 0 +(* Compare function used to prioritize a type of operation over the + other. Here it's not necessary. *) diff --git a/src/lib_injector/injector_server_operation.mli b/src/lib_injector/injector_server_operation.mli index ccaa602d4b24..824abecaec49 100644 --- a/src/lib_injector/injector_server_operation.mli +++ b/src/lib_injector/injector_server_operation.mli @@ -24,3 +24,5 @@ val pp : Format.formatter -> t -> unit (** [false] if the injector will accept duplicate such operations. *) val unique : t -> bool + +val compare : t -> t -> int diff --git a/src/lib_injector/injector_sigs.ml b/src/lib_injector/injector_sigs.ml index 3485a737407b..49ca23982c5a 100644 --- a/src/lib_injector/injector_sigs.ml +++ b/src/lib_injector/injector_sigs.ml @@ -94,6 +94,8 @@ module type PARAM_OPERATION = sig one operation [op]. Otherwise, it will allow duplicate such operations to appear in the queue. *) val unique : t -> bool + + val compare : t -> t -> int end (** Internal representation of injector operations. *) @@ -108,10 +110,16 @@ module type INJECTOR_OPERATION = sig (** The type of L1 operations that are injected on Tezos. These have an id attached to them that allows tracking and retrieving their status. *) - type t = private {id : id; operation : operation; mutable errors : errors} + type t = private { + id : id; + order : Z.t option; + counter : Z.t; + operation : operation; + mutable errors : errors; + } (** [make op] returns an L1 operation with the corresponding hash. *) - val make : operation -> t + val make : ?order:Z.t -> operation -> t (** Encoding for L1 operations *) val encoding : t Data_encoding.t @@ -123,6 +131,10 @@ module type INJECTOR_OPERATION = sig (** Register an error as occurring during injection of an operation. Its internal error counter is incremented. 
*) val register_error : t -> tztrace -> unit + + val id : t -> id + + val compare : t -> t -> int end (** Module type for parameter of functor {!Injector_functor.Make}. *) diff --git a/src/lib_smart_rollup/operation_kind.ml b/src/lib_smart_rollup/operation_kind.ml index d2b3f94d11b0..80d6b4d64577 100644 --- a/src/lib_smart_rollup/operation_kind.ml +++ b/src/lib_smart_rollup/operation_kind.ml @@ -79,3 +79,15 @@ let map_encoding value_encoding = ~string_of_key:to_string ~key_of_string:of_string_exn ~value_encoding) + +let priority_order = function + | Timeout -> 0 + | Refute -> 1 + | Publish -> 2 + | Publish_dal_commitment -> 2 + | Cement -> 2 + | Recover -> 3 + | Add_messages -> 4 + | Execute_outbox_message -> 5 + +let compare_priority k1 k2 = compare (priority_order k1) (priority_order k2) diff --git a/src/lib_smart_rollup/operation_kind.mli b/src/lib_smart_rollup/operation_kind.mli index bc963e1833cd..7d79c3a0738d 100644 --- a/src/lib_smart_rollup/operation_kind.mli +++ b/src/lib_smart_rollup/operation_kind.mli @@ -39,3 +39,11 @@ val of_string_exn : string -> t val encoding : t Data_encoding.t val map_encoding : (t -> 'value Data_encoding.t) -> 'value Map.t Data_encoding.t + +(** [compare_priority] Comparison over tag for order of + importance. Order is given by the list {!all}. it's the following: + + Timeout < Refute < [Publish; Publish_dal_commitment; Cement] < + Recover < Add_messages < Execute_outbox_message. 
+*) +val compare_priority : t -> t -> int diff --git a/src/lib_smart_rollup_node/injector.ml b/src/lib_smart_rollup_node/injector.ml index 8c243d238efc..2d98f3d82387 100644 --- a/src/lib_smart_rollup_node/injector.ml +++ b/src/lib_smart_rollup_node/injector.ml @@ -59,7 +59,21 @@ module Parameters : let encoding : t Data_encoding.t = Operation_kind.encoding end - module Operation = L1_operation + module Operation = struct + include L1_operation + + let tag : t -> Tag.t = function + | Add_messages _ -> Add_messages + | Cement _ -> Cement + | Publish _ -> Publish + | Timeout _ -> Timeout + | Refute _ -> Refute + | Recover_bond _ -> Recover + | Execute_outbox_message _ -> Execute_outbox_message + | Publish_dal_commitment _ -> Publish_dal_commitment + + let compare op1 op2 = Operation_kind.compare_priority (tag op1) (tag op2) + end (* TODO: https://gitlab.com/tezos/tezos/-/issues/3459 Very coarse approximation for the number of operation we @@ -74,15 +88,7 @@ module Parameters : | Execute_outbox_message -> 1 | Publish_dal_commitment -> 1 - let operation_tag : Operation.t -> Tag.t = function - | Add_messages _ -> Add_messages - | Cement _ -> Cement - | Publish _ -> Publish - | Timeout _ -> Timeout - | Refute _ -> Refute - | Recover_bond _ -> Recover - | Execute_outbox_message _ -> Execute_outbox_message - | Publish_dal_commitment _ -> Publish_dal_commitment + let operation_tag = Operation.tag let fee_parameter {fee_parameters; _} operation = let operation_kind = operation_tag operation in diff --git a/src/lib_smart_rollup_node/rpc_directory.ml b/src/lib_smart_rollup_node/rpc_directory.ml index 4436ddf29e4c..0ec7c507da03 100644 --- a/src/lib_smart_rollup_node/rpc_directory.ml +++ b/src/lib_smart_rollup_node/rpc_directory.ml @@ -663,8 +663,13 @@ let () = let rops = List.rev_map (fun Injector.Inj_operation. 
- {operation = op; errors = {count; last_error}; id = _} -> - Rollup_node_services.{op; errors = count; last_error}) + { + operation = op; + errors = {count; last_error}; + id = _; + order = _; + counter = _; + } -> Rollup_node_services.{op; errors = count; last_error}) ops in (tags, List.rev rops)) -- GitLab From a63a6cf237fc6977fc94b8d2f7719e65654933c7 Mon Sep 17 00:00:00 2001 From: Sylvain Ribstein Date: Tue, 3 Dec 2024 16:59:48 +0100 Subject: [PATCH 2/4] injector: add heap disk persistant storage --- src/lib_injector/disk_persistence.ml | 196 +++++++++++++++++++ src/lib_injector/disk_persistence.mli | 73 +++++++ src/lib_stdlib/bounded_min_heap.ml | 61 +++++- src/lib_stdlib/bounded_min_heap.mli | 15 +- src/lib_stdlib/test/test_bounded_min_heap.ml | 35 +++- 5 files changed, 369 insertions(+), 11 deletions(-) diff --git a/src/lib_injector/disk_persistence.ml b/src/lib_injector/disk_persistence.ml index 37e5d05dc1d0..d0e2f5ccfb7c 100644 --- a/src/lib_injector/disk_persistence.ml +++ b/src/lib_injector/disk_persistence.ml @@ -31,6 +31,7 @@ type error += | Io_error of [`Close | `Open] Lwt_utils_unix.io_error | Unix_error of Unix.error | Decoding_error of Data_encoding.Binary.read_error + | Heap_insertion_failed of string let () = register_error_kind @@ -126,6 +127,15 @@ let () = Data_encoding.(obj1 (req "error" Data_encoding.Binary.read_error_encoding)) (function Decoding_error e -> Some e | _ -> None) (fun e -> Decoding_error e) ; + register_error_kind + ~id:"disk_persistence.heap_insertion_failed" + ~title:"Heap insertion failed" + ~description:"Heap insertion failed" + ~pp:(fun ppf err -> Format.fprintf ppf "Heap insertion failed with %s." 
err) + `Permanent + Data_encoding.(obj1 (req "error" string)) + (function Heap_insertion_failed error -> Some error | _ -> None) + (fun error -> Heap_insertion_failed error) ; () module type H = sig @@ -435,3 +445,189 @@ struct List.iter (fun (k, v, _) -> Q.replace q.queue k v) list ; return q end + +module Make_heap + (N : sig + val name : string + end) + (K : Tezos_crypto.Intfs.HASH) + (V : sig + type t + + val id : t -> K.t + + val compare : t -> t -> int + + val persist : t -> bool + + val encoding : t Data_encoding.t + end) = +struct + module H = Bounded_min_heap.Make (K) (V) + + type t = {path : string; metadata_path : string; heap : H.t} + + let counter = ref min_int + + let filedata h k = Filename.concat h.path (K.to_b58check k) + + let filemetadata h k = Filename.concat h.metadata_path (K.to_b58check k) + + let create ~data_dir n = + let open Lwt_result_syntax in + let heap = H.create n in + let path = Filename.concat data_dir N.name in + let metadata_path = Filename.concat path "metadata" in + let* () = create_dir path in + let+ () = create_dir metadata_path in + {path; metadata_path; heap} + + let remove_persitant_data h k = + let open Lwt_result_syntax in + let* () = delete_file (filedata h k) + and* () = delete_file (filemetadata h k) in + return_unit + + let remove h k = + let open Lwt_result_syntax in + H.remove h.heap k ; + let* () = remove_persitant_data h k in + return_unit + + let create_metadata () = + let time = Time.System.now () in + let d, ps = Ptime.to_span time |> Ptime.Span.to_d_ps in + let c = !counter in + incr counter ; + (d, ps, c) + + let metadata_encoding = + let open Data_encoding in + conv + (fun (d, ps, c) -> (Int64.of_int d, ps, Int64.of_int c)) + (fun (d, ps, c) -> (Int64.to_int d, ps, Int64.to_int c)) + @@ tup3 int64 int64 int64 + + let heap_insert h v = + Result.map_error (fun err_msg -> [Heap_insertion_failed err_msg]) + @@ H.insert v h.heap + + let insert h k v = + let open Lwt_result_syntax in + let*? 
() = heap_insert h v in + if V.persist v then + let* () = write_value (filedata h k) V.encoding v + and* () = + write_value (filemetadata h k) metadata_encoding (create_metadata ()) + in + return_unit + else return_unit + + let pop h = + let open Lwt_result_syntax in + let elt_opt = H.pop h.heap in + let* () = + Option.iter_es + (fun v -> + let k = V.id v in + remove_persitant_data h k) + elt_opt + in + return elt_opt + + let clear h = + let open Lwt_syntax in + H.clear h.heap ; + (* Remove the persistent elements from the disk. *) + let elts = Lwt_unix.files_of_directory h.path in + let metadata_elts = Lwt_unix.files_of_directory h.metadata_path in + let unlink file = + Lwt.catch + (fun () -> + if Sys.is_directory file then return_unit else Lwt_unix.unlink file) + (fun e -> + Format.ksprintf + Stdlib.failwith + "Error in unlink %s: %s" + file + (Printexc.to_string e)) + in + let* () = + Lwt_stream.iter_s (fun f -> unlink (Filename.concat h.path f)) elts + and* () = + Lwt_stream.iter_s + (fun m -> unlink (Filename.concat h.metadata_path m)) + metadata_elts + in + return_unit + + let peek_min h = H.peek_min h.heap + + let length h = H.length h.heap + + let find_opt h k = H.find_opt k h.heap + + let elements h = H.elements h.heap + + let remove_predicate f h = + let open Lwt_result_syntax in + let removed_id = H.remove_predicate f h.heap in + let* () = List.iter_es (remove_persitant_data h) removed_id in + return_unit + + let load_from_disk ~warn_unreadable ~capacity ~data_dir ~filter = + let open Lwt_result_syntax in + let* h = create ~data_dir capacity in + let*! d = Lwt_unix.opendir h.path in + let rec browse acc = + let*! 
filename = + let open Lwt_syntax in + Lwt.catch + (fun () -> + let+ f = Lwt_unix.readdir d in + Some f) + (function End_of_file -> return_none | e -> Lwt.reraise e) + in + match filename with + | None -> return acc + | Some filename -> + let* acc = + match K.of_b58check_opt filename with + | None -> return acc + | Some k -> ( + let+ v_meta = + match warn_unreadable with + | None -> + let* v = read_value (filedata h k) V.encoding + and* meta = + read_value (filemetadata h k) metadata_encoding + in + return_some (v, meta) + | Some warn -> + let open Lwt_syntax in + let* v = maybe_read_value ~warn (filedata h k) V.encoding + and* meta = + maybe_read_value + ~warn + (filemetadata h k) + metadata_encoding + in + return_ok @@ Option.bind v + @@ fun v -> Option.bind meta @@ fun meta -> Some (v, meta) + in + match v_meta with + | None -> acc + | Some (v, meta) -> + if filter v then (k, v, meta) :: acc else acc) + in + browse acc + in + let* list = browse [] in + let list = + List.fast_sort + (fun (_, _, meta1) (_, _, meta2) -> Stdlib.compare meta1 meta2) + list + in + let*? () = List.iter_e (fun (_k, v, _) -> heap_insert h v) list in + return h +end diff --git a/src/lib_injector/disk_persistence.mli b/src/lib_injector/disk_persistence.mli index 5f0bede5ca0a..a87fc1ab466f 100644 --- a/src/lib_injector/disk_persistence.mli +++ b/src/lib_injector/disk_persistence.mli @@ -177,3 +177,76 @@ module Make_queue filter:(V.t -> bool) -> t tzresult Lwt.t end + +(** Create an on-disk persistent version of the {!Bounded_min_heap} + data structure. *) +module Make_heap + (N : sig + (** Name used to derive a path (relative to [data_dir] in [load_from_disk]) of where + to store the persistent information for this queue. 
*) + val name : string + end) + (K : Tezos_crypto.Intfs.HASH) + (V : sig + type t + + val id : t -> K.t + + val compare : t -> t -> int + + val persist : t -> bool + + val encoding : t Data_encoding.t + end) : sig + type t + + (** [remove h k] removes the binding from [k] in [h]. If [k] is not bound in + [h], it does nothing. The removal is persisted on disk. *) + val remove : t -> K.t -> unit tzresult Lwt.t + + (** [remove_predicate f h] removes the bindings in [h] where [f elt = + true]. The removal is persisted on disk. *) + val remove_predicate : (V.t -> bool) -> t -> unit tzresult Lwt.t + + (** [insert h k v] binds the key [k] to the value [v] in the heap [h]. This + fails with an error when there is no space left in [h], in which case + nothing is written on disk. Otherwise, if [v] is persistent, the addition + is persisted on disk. *) + val insert : t -> K.t -> V.t -> unit tzresult Lwt.t + + (** [peek_min h] returns, without removing, the smallest element from + the heap [h], [None] if empty. *) + val peek_min : t -> V.t option + + (** [pop h] removes and returns the smallest element from the heap [h], + [None] if empty. *) + val pop : t -> V.t option tzresult Lwt.t + + (** [find_opt h k] is [Some v] if [k] is bound to [v] in [h]. It is [None] + otherwise. *) + val find_opt : t -> K.t -> V.t option + + (** [elements h] returns the elements of the heap [h] sorted from smallest to + largest. *) + val elements : t -> V.t list + + (** [length h] is the number of bindings held by [h]. *) + val length : t -> int + + (** [clear h] empties the heap [h] and removes its persistent content on + disk. *) + val clear : t -> unit Lwt.t + + (** [load_from_disk ~warn_unreadable ~capacity ~data_dir ~filter] creates a + bounded heap of capacity [capacity]. The heap is populated by + persistent elements present in [data_dir/N.name] which pass the [filter] + (the directory is created if it does not exist). 
If [warn_unreadable] is + [Some warn], unreadable files are ignored but a warning is printed with + [warn], otherwise the loading fails on the first unreadable file. *) + val load_from_disk : + warn_unreadable:(string -> error trace -> unit Lwt.t) option -> + capacity:int -> + data_dir:string -> + filter:(V.t -> bool) -> + t tzresult Lwt.t +end diff --git a/src/lib_stdlib/bounded_min_heap.ml b/src/lib_stdlib/bounded_min_heap.ml index 689a8e651a79..9f3ef189b059 100644 --- a/src/lib_stdlib/bounded_min_heap.ml +++ b/src/lib_stdlib/bounded_min_heap.ml @@ -148,17 +148,28 @@ struct Ok ()) else Error "no space left in heap" + let remove_index t index = + t.data.(index) <- None ; + t.size <- t.size - 1 ; + if t.size > 0 && index <> t.size then ( + swap index t.size t ; + bubble_down index t) + + let remove t id = + (* uses `find_opt` and not `find` because it may failed if called + with unknown id *) + match Hash_map.find_opt t.hash_map id with + | None -> () + | Some index -> + remove_index t index ; + Hash_map.remove t.hash_map id + let pop t = if t.size = 0 then None else let min_elem = get_data t.data 0 in - t.data.(0) <- None ; - t.size <- t.size - 1 ; - if t.size > 0 then ( - swap 0 t.size t ; - bubble_down 0 t) ; let id = E.id min_elem in - Hash_map.remove t.hash_map id ; + remove t id ; assert (Hash_map.length t.hash_map = t.size) ; Some min_elem @@ -179,8 +190,27 @@ struct let elt_i = Hash_map.find_opt t.hash_map id in Option.map (get_data t.data) elt_i + let clear t = + Hash_map.clear t.hash_map ; + Array.fill t.data 0 t.size None ; + t.size <- 0 + + let remove_predicate f (t : t) = + let rec aux heap_index acc = + if heap_index >= t.size then acc + else + let value = get_data t.data heap_index in + if f value then ( + let id = E.id value in + remove t id ; + assert (Hash_map.length t.hash_map = t.size) ; + aux heap_index (id :: acc)) + else aux (heap_index + 1) acc + in + aux 0 [] + module Internal_for_tests = struct - let check_heap_invariant ~pp_elt h = + let 
check_heap_invariant ~pp_id ~pp_elt h = let get_data_opt i = if i < Array.length h.data then h.data.(i) else None in @@ -239,6 +269,21 @@ struct (get_data_opt right_child_index) correct_map_size) in - check_invariant_for_all_index h.size + Format.printf + "@.@.HEAP:@.heap: %a@.size: %d@.hash_map: %a@.@." + Format.( + pp_print_list + ~pp_sep:(fun fmt () -> Format.pp_print_string fmt "; ") + (pp_print_option pp_elt)) + (h.data |> Array.to_list) + h.size + Format.( + pp_print_list + ~pp_sep:(fun fmt () -> Format.pp_print_string fmt "; ") + (fun fmt (id, index) -> + Format.fprintf fmt "(%a, %d)" pp_id id index)) + (Hash_map.to_seq h.hash_map |> List.of_seq) ; + check_invariant_for_all_index h.size ; + assert (Hash_map.length h.hash_map = h.size) end end diff --git a/src/lib_stdlib/bounded_min_heap.mli b/src/lib_stdlib/bounded_min_heap.mli index a65da7ad3f39..c438ea05fb2d 100644 --- a/src/lib_stdlib/bounded_min_heap.mli +++ b/src/lib_stdlib/bounded_min_heap.mli @@ -66,8 +66,21 @@ module Make id] is true. *) val find_opt : Id.t -> t -> E.t option + (** [remove h id] removes the element with [id] from the heap, or leaves [h] unchanged if [id] is not present in [h]. *) + val remove : t -> Id.t -> unit + + (** [remove_predicate f h] removes the elements [elt] for which [f elt = true] from + the heap and returns the list of ids removed. *) + val remove_predicate : (E.t -> bool) -> t -> Id.t list + + (** [clear h] deletes all data from the heap. 
*) + val clear : t -> unit + module Internal_for_tests : sig val check_heap_invariant : - pp_elt:(Format.formatter -> E.t -> unit) -> t -> unit + pp_id:(Format.formatter -> Id.t -> unit) -> + pp_elt:(Format.formatter -> E.t -> unit) -> + t -> + unit end end diff --git a/src/lib_stdlib/test/test_bounded_min_heap.ml b/src/lib_stdlib/test/test_bounded_min_heap.ml index e6fb0d4b1250..96c94a45f890 100644 --- a/src/lib_stdlib/test/test_bounded_min_heap.ml +++ b/src/lib_stdlib/test/test_bounded_min_heap.ml @@ -39,6 +39,12 @@ let take_nth_smallest n l = *) let size = QCheck2.Gen.int_range 2 1000 +let check_invariant b = + B.Internal_for_tests.check_heap_invariant + ~pp_id:Format.pp_print_string + ~pp_elt:Format.pp_print_int + b + let check_insert_heap ?(expect_heap_is_full = false) v b = let res = B.insert v b in let () = @@ -47,7 +53,7 @@ let check_insert_heap ?(expect_heap_is_full = false) v b = Assert.equal ~msg:__LOC__ res (Error error) else Assert.equal ~msg:__LOC__ res (Ok ()) in - B.Internal_for_tests.check_heap_invariant ~pp_elt:Format.pp_print_int b + check_invariant b let test_empty_works () = let b = B.create 0 in @@ -63,7 +69,7 @@ let test_heap_works () = let check_pop ~__LOC__ = List.iter (fun v -> let extracted = B.pop b in - B.Internal_for_tests.check_heap_invariant ~pp_elt:Format.pp_print_int b ; + check_invariant b ; Assert.equal ~pp:Format.(pp_print_option pp_print_int) ~msg:__LOC__ @@ -109,6 +115,31 @@ let test_heap_works () = true ; Assert.equal ~msg:__LOC__ (B.find_opt (id 0) b) None ; Assert.equal ~msg:__LOC__ (B.find_opt (id 11) b) None ; + B.clear b ; + check_invariant b ; + Assert.equal ~msg:__LOC__ (B.elements b) [] ; + check_insert_list [1; 2; 3; 4; 4; 3; 2; 1] ; + Assert.equal ~msg:__LOC__ (B.elements b) [1; 2; 3; 4] ; + B.remove b (string_of_int 4) ; + check_invariant b ; + B.remove b (string_of_int 2) ; + check_invariant b ; + B.remove b (string_of_int 0) ; + check_invariant b ; + B.remove b (string_of_int 1) ; + check_invariant b ; + B.remove 
b (string_of_int 3) ; + check_invariant b ; + B.remove b (string_of_int 0) ; + check_invariant b ; + Assert.equal ~msg:__LOC__ (B.elements b) [] ; + check_insert_list [4; 3; 2; 1] ; + Assert.equal ~msg:__LOC__ (B.elements b) [1; 2; 3; 4] ; + let removed_ids = B.remove_predicate (fun i -> i <= 2) b in + Assert.equal ~msg:__LOC__ (List.sort String.compare removed_ids) ["1"; "2"] ; + Assert.equal ~msg:__LOC__ (B.elements b) [3; 4] ; + check_invariant b ; + () let () = -- GitLab From 0486076e8ed970b7c678d5ae9f7887a717e2205b Mon Sep 17 00:00:00 2001 From: Sylvain Ribstein Date: Wed, 4 Dec 2024 09:43:31 +0100 Subject: [PATCH 3/4] injector: uses heap instead of queue --- src/lib_injector/disk_persistence.ml | 155 -------------------------- src/lib_injector/disk_persistence.mli | 60 ---------- src/lib_injector/injector_functor.ml | 138 +++++++++++------------ tezt/tests/sc_rollup.ml | 113 +++++++++++++++++++ 4 files changed, 178 insertions(+), 288 deletions(-) diff --git a/src/lib_injector/disk_persistence.ml b/src/lib_injector/disk_persistence.ml index d0e2f5ccfb7c..4682f257e930 100644 --- a/src/lib_injector/disk_persistence.ml +++ b/src/lib_injector/disk_persistence.ml @@ -291,161 +291,6 @@ module Make_table (H : H) = struct t end -module Make_queue - (N : sig - val name : string - end) - (K : Tezos_crypto.Intfs.HASH) - (V : sig - type t - - val encoding : t Data_encoding.t - - val persist : t -> bool - end) = -struct - module Q = Hash_queue.Make (K) (V) - - type t = {path : string; metadata_path : string; queue : Q.t} - - let counter = ref min_int - - let filedata q k = Filename.concat q.path (K.to_b58check k) - - let filemetadata q k = Filename.concat q.metadata_path (K.to_b58check k) - - let create ~data_dir n = - let open Lwt_result_syntax in - let queue = Q.create n in - let path = Filename.concat data_dir N.name in - let metadata_path = Filename.concat path "metadata" in - let* () = create_dir path in - let+ () = create_dir metadata_path in - {path; metadata_path; 
queue} - - let remove q k = - let open Lwt_result_syntax in - Q.remove q.queue k ; - let* () = delete_file (filedata q k) - and* () = delete_file (filemetadata q k) in - return_unit - - let create_metadata () = - let time = Time.System.now () in - let d, ps = Ptime.to_span time |> Ptime.Span.to_d_ps in - let c = !counter in - incr counter ; - (d, ps, c) - - let metadata_encoding = - let open Data_encoding in - conv - (fun (d, ps, c) -> (Int64.of_int d, ps, Int64.of_int c)) - (fun (d, ps, c) -> (Int64.to_int d, ps, Int64.to_int c)) - @@ tup3 int64 int64 int64 - - let replace q k v = - let open Lwt_result_syntax in - Q.replace q.queue k v ; - if V.persist v then - let* () = write_value (filedata q k) V.encoding v - and* () = - write_value (filemetadata q k) metadata_encoding (create_metadata ()) - in - return_unit - else return_unit - - let clear q = - let open Lwt_syntax in - Q.clear q.queue ; - (* Remove the persistent elements from the disk. *) - let elts = Lwt_unix.files_of_directory q.path in - let metadata_elts = Lwt_unix.files_of_directory q.metadata_path in - let unlink file = - Lwt.catch - (fun () -> - if Sys.is_directory file then return_unit else Lwt_unix.unlink file) - (fun e -> - Format.ksprintf - Stdlib.failwith - "Error in unlink %s: %s" - file - (Printexc.to_string e)) - in - let* () = - Lwt_stream.iter_s (fun f -> unlink (Filename.concat q.path f)) elts - and* () = - Lwt_stream.iter_s - (fun m -> unlink (Filename.concat q.metadata_path m)) - metadata_elts - in - return_unit - - let fold f q = Q.fold f q.queue - - let length q = Q.length q.queue - - let find_opt q k = Q.find_opt q.queue k - - let elements q = Q.elements q.queue - - let load_from_disk ~warn_unreadable ~capacity ~data_dir ~filter = - let open Lwt_result_syntax in - let* q = create ~data_dir capacity in - let*! d = Lwt_unix.opendir q.path in - let rec browse acc = - let*! 
filename = - let open Lwt_syntax in - Lwt.catch - (fun () -> - let+ f = Lwt_unix.readdir d in - Some f) - (function End_of_file -> return_none | e -> Lwt.reraise e) - in - match filename with - | None -> return acc - | Some filename -> - let* acc = - match K.of_b58check_opt filename with - | None -> return acc - | Some k -> ( - let+ v_meta = - match warn_unreadable with - | None -> - let* v = read_value (filedata q k) V.encoding - and* meta = - read_value (filemetadata q k) metadata_encoding - in - return_some (v, meta) - | Some warn -> - let open Lwt_syntax in - let* v = maybe_read_value ~warn (filedata q k) V.encoding - and* meta = - maybe_read_value - ~warn - (filemetadata q k) - metadata_encoding - in - return_ok @@ Option.bind v - @@ fun v -> Option.bind meta @@ fun meta -> Some (v, meta) - in - match v_meta with - | None -> acc - | Some (v, meta) -> - if filter v then (k, v, meta) :: acc else acc) - in - browse acc - in - let* list = browse [] in - let list = - List.fast_sort - (fun (_, _, meta1) (_, _, meta2) -> Stdlib.compare meta1 meta2) - list - in - List.iter (fun (k, v, _) -> Q.replace q.queue k v) list ; - return q -end - module Make_heap (N : sig val name : string diff --git a/src/lib_injector/disk_persistence.mli b/src/lib_injector/disk_persistence.mli index a87fc1ab466f..34d26769daba 100644 --- a/src/lib_injector/disk_persistence.mli +++ b/src/lib_injector/disk_persistence.mli @@ -118,66 +118,6 @@ module Make_table (H : H) : sig t tzresult Lwt.t end -(** Create an on-disk persistent version of the {!Hash_queue} data structure. *) -module Make_queue - (N : sig - (** Name used to derive a path (relative to [data_dir] in [load_from_disk]) of where - to store the persistent information for this queue. *) - val name : string - end) - (K : Tezos_crypto.Intfs.HASH) - (V : sig - type t - - val persist : t -> bool - - val encoding : t Data_encoding.t - end) : sig - type t - - (** [remove q k] removes the binding from [k] in [q]. 
If [k] is not bound in - [c], it does nothing. The removal is persisted on disk. *) - val remove : t -> K.t -> unit tzresult Lwt.t - - (** [replace q k v] binds the key [k] to the value [v] in the queue [q]. This - may or may not cause another binding to be removed, depending on the - number of bindings already present in [q]. The addition (or replacement) - is persisted on disk. *) - val replace : t -> K.t -> V.t -> unit tzresult Lwt.t - - (** [fold f q init] folds the function [f] over the bindings - of [q] (in memory). The elements are iterated from oldest to newest. *) - val fold : (K.t -> V.t -> 'a -> 'a) -> t -> 'a -> 'a - - (** [find_opt q k] is [Some v] if [k] is bound to [v] in [q]. It is [None] - otherwise. *) - val find_opt : t -> K.t -> V.t option - - (** [elemets q] returns the elements of the queue [q] from oldest to - newest. *) - val elements : t -> V.t list - - (** [length q] is the number of bindings held by [q]. *) - val length : t -> int - - (** [clear q] empties the queue [q] and removes its persistent content on - disk. *) - val clear : t -> unit Lwt.t - - (** [load_from_disk ~warn_unreadable ~capacity ~data_dir ~filter] creates a - bounded hash queue of capacity [capacity]. The queue is populated by - persistent elements present in [data_dir/N.name] which pass the [filter] - (the directory is created if it does not exist). If [warn_unreadable] is - [Some warn], unreadable files are ignored but a warning is printed with - [warn], otherwise the loading fails on the first unreadable file. *) - val load_from_disk : - warn_unreadable:(string -> error trace -> unit Lwt.t) option -> - capacity:int -> - data_dir:string -> - filter:(V.t -> bool) -> - t tzresult Lwt.t -end - (** Create an on-disk persistent version of the {!Bounded_min_heap} data structure. 
*) module Make_heap diff --git a/src/lib_injector/injector_functor.ml b/src/lib_injector/injector_functor.ml index 945efb668080..376c5fbd4f4c 100644 --- a/src/lib_injector/injector_functor.ml +++ b/src/lib_injector/injector_functor.ml @@ -110,10 +110,10 @@ module Make (Parameters : PARAMETERS) = struct | Injected of injected_info | Included of included_info - module Op_queue = - Disk_persistence.Make_queue + module Op_heap = + Disk_persistence.Make_heap (struct - let name = "operations_queue" + let name = "operations_heap" end) (Id) (struct @@ -165,8 +165,7 @@ module Make (Parameters : PARAMETERS) = struct strategy : injection_strategy; (** The strategy of this worker for injecting the pending operations. *) save_dir : string; (** Path to where save persistent state *) - queue : Op_queue.t; - (** The queue of pending operations for this injector. *) + heap : Op_heap.t; (** The heap of pending operations for this injector. *) injected : injected_state; (** The information about injected operations. *) included : included_state; @@ -265,14 +264,14 @@ module Make (Parameters : PARAMETERS) = struct Event.(emit loaded_from_disk) (List.map (fun s -> s.alias) signers, tags, nb, kind) in - let* queue = - Op_queue.load_from_disk + let* heap = + Op_heap.load_from_disk ~warn_unreadable ~capacity:50_000 ~data_dir ~filter:(filter (fun op -> op.Inj_operation.operation)) in - let*! () = emit_event_loaded "operations_queue" @@ Op_queue.length queue in + let*! 
() = emit_event_loaded "operations_queue" @@ Op_heap.length heap in (* Very coarse approximation for the number of operation we expect for each block *) let n = @@ -313,7 +312,7 @@ module Make (Parameters : PARAMETERS) = struct tags; strategy; save_dir = data_dir; - queue; + heap; injected = {injected_operations; injected_ophs}; included = {included_operations; included_in_blocks}; state; @@ -326,11 +325,11 @@ module Make (Parameters : PARAMETERS) = struct } (** We consider an operation to already exist in the injector if either: - - It is already in the queue + - It is already in the heap - It is injected but not included - It is included but not confirmed. *) let already_exists state op_hash = - Op_queue.find_opt state.queue op_hash <> None + Op_heap.find_opt state.heap op_hash <> None || Injected_operations.mem state.injected.injected_operations op_hash || match @@ -343,8 +342,8 @@ module Make (Parameters : PARAMETERS) = struct | Some {level = head_level; _} -> Int32.sub head_level l1_level < Int32.of_int confirmations) - (** Add an operation to the pending queue corresponding to the signer for this - operation. *) + (** Add an operation to the pending heap corresponding to the signer for this + operation. *) let add_pending_operation ?(retry = false) state (op : Inj_operation.t) = let open Lwt_result_syntax in if already_exists state op.id then @@ -356,7 +355,7 @@ module Make (Parameters : PARAMETERS) = struct state op.operation in - Op_queue.replace state.queue op.id op + Op_heap.insert state.heap op.id op (** Mark operations as injected (in [oph]). *) let add_injected_operations state {pkh = signer_pkh; _} oph ~injection_level @@ -557,12 +556,12 @@ module Make (Parameters : PARAMETERS) = struct let*! 
() = Event.(emit1 number_of_operations_in_queue) state - (Op_queue.length state.queue) + (Op_heap.length state.heap) in (* We perform a dichotomy by injecting the first half of the operations (we are not looking to maximize the number of operations injected because of the cost of simulation). Only the operations - which are actually injected will be removed from the queue so the + which are actually injected will be removed from the heap so the other half will be reconsidered later. *) match keep_half operations with | None -> @@ -602,7 +601,7 @@ module Make (Parameters : PARAMETERS) = struct op.errors.count op.errors.last_error in - Op_queue.remove state.queue op.id + Op_heap.remove state.heap op.id else let*! () = Event.(emit3 ?signers error_simulation_operation) @@ -696,46 +695,44 @@ module Make (Parameters : PARAMETERS) = struct in (oph, operations) - (** Retrieve as many batch of operations from the queue while batch - size remains below the size limit. *) - let get_n_ops_batch_from_queue ~size_limit state n = - let exception - Reached_limit of - (int * Inj_operation.t list * int * Inj_operation.t list list) - in + (** Retrieve as many batch of operations from the heap while batch + size remains below the size limit. 
*) + let get_n_ops_batch_from_queue ~size_limit state nb_signers = + let open Lwt_result_syntax in let module Proto_client = (val state.proto_client) in let min_size = Block_hash.size + Signature.size Signature.zero in let op_size op = Proto_client.operation_size op.Inj_operation.operation + Proto_client.operation_size_overhead in - let _current_size, rev_current_ops, nb_batch, rev_ops_batch = - try - Op_queue.fold - (fun _oph - op - ((current_size, rev_current_ops, nb_batch, rev_ops_batch) as acc) -> - if nb_batch = n then raise (Reached_limit acc) ; - let new_size = current_size + op_size op in - if new_size > size_limit then - let current_ops = List.rev rev_current_ops in - ( min_size + op_size op, - [op], - nb_batch + 1, - current_ops :: rev_ops_batch ) - else (new_size, op :: rev_current_ops, nb_batch, rev_ops_batch)) - state.queue - (min_size, [], 0, []) - with Reached_limit acc -> acc + let add_batch rev_batch rev_batches = + if List.is_empty rev_batch then rev_batches + else + let batch = List.rev rev_batch in + batch :: rev_batches in - let rev_ops_batch = - if nb_batch < n then - (* Add the last batch, even if it's not of full size, to ensure a larger number of batches. *) - let current_ops = List.rev rev_current_ops in - current_ops :: rev_ops_batch - else rev_ops_batch + let rec get_until_enough (batch_size, rev_ops) (nb_batches, rev_batches) = + let op_opt = Op_heap.peek_min state.heap in + match op_opt with + | None -> + (* Heap is empty, finalize current batch *) + return @@ add_batch rev_ops rev_batches + | Some op -> + let new_size = batch_size + op_size op in + if new_size <= size_limit then + (* pop the message as we only peeked it. *) + let* _op = Op_heap.pop state.heap in + get_until_enough (new_size, op :: rev_ops) (nb_batches, rev_batches) + else + (* we haven't pop the message, so we can safely call + continue_or_stop. 
*) + continue_or_stop (nb_batches + 1, add_batch rev_ops rev_batches) + and continue_or_stop (nb_batches, rev_batches) = + if nb_batches = nb_signers then return rev_batches + else get_until_enough (min_size, []) (nb_batches, rev_batches) in - List.rev rev_ops_batch + let* rev_batches = continue_or_stop (0, []) in + return @@ List.rev rev_batches (* Ignore operations that are allowed to fail. *) let ignore_ignorable_failing_operations state operations = @@ -762,9 +759,9 @@ module Make (Parameters : PARAMETERS) = struct `Ignored operations_to_drop (** [inject_pending_operations_round state ?size_limit signer] - injects operations from the pending queue [state.pending], whose + injects operations from the pending heap [state.pending], whose total size does not exceed [size_limit] using [signer]. Upon - successful injection, the operations are removed from the queue + successful injection, the operations are removed from the heap and marked as injected. *) let inject_pending_operations_round state signer operations_to_inject = let open Lwt_result_syntax in @@ -808,7 +805,7 @@ module Make (Parameters : PARAMETERS) = struct let* () = List.iter_es (fun (_index, op) -> - Op_queue.remove state.queue op.Inj_operation.id) + Op_heap.remove state.heap op.Inj_operation.id) injected_operations in let*! 
() = @@ -843,7 +840,7 @@ module Make (Parameters : PARAMETERS) = struct in let* () = List.iter_es - (fun op -> Op_queue.remove state.queue op.Inj_operation.id) + (fun op -> Op_heap.remove state.heap op.Inj_operation.id) operations_to_drop in return (`Continue 0)) @@ -854,7 +851,7 @@ module Make (Parameters : PARAMETERS) = struct Proto_client.max_operation_data_length) () = let open Lwt_result_syntax in let signers = available_signers state in - let ops_batch = + let* ops_batch = get_n_ops_batch_from_queue ~size_limit state (List.length signers) in let signers_and_ops = List.combine_drop signers ops_batch in @@ -1007,7 +1004,7 @@ module Make (Parameters : PARAMETERS) = struct (** [revert_included_operations state block] marks the known (by this injector) manager operations contained in [block] as not being included any more, typically in the case of a reorganization where [block] is on an alternative - chain. The operations are put back in the pending queue. *) + chain. The operations are put back in the pending heap. *) let revert_included_operations state block = let open Lwt_result_syntax in when_ (has_included_operations state) @@ fun () -> @@ -1018,7 +1015,7 @@ module Make (Parameters : PARAMETERS) = struct (List.map (fun o -> o.op.id) revert_infos) in (* TODO: https://gitlab.com/tezos/tezos/-/issues/2814 - maybe put at the front of the queue for re-injection. *) + maybe put at the front of the heap for re-injection. *) List.iter_es (fun {op; _} -> let*! 
requeue = @@ -1122,7 +1119,7 @@ module Make (Parameters : PARAMETERS) = struct let set_metrics state = Metrics.wrap @@ fun () -> let tags = Tags.to_seq state.tags |> List.of_seq in - Metrics.set_queue_size tags (Op_queue.length state.queue) ; + Metrics.set_queue_size tags (Op_heap.length state.heap) ; Metrics.set_injected_operations_size tags (Injected_operations.length state.injected.injected_operations) ; @@ -1196,7 +1193,7 @@ module Make (Parameters : PARAMETERS) = struct set_last_head state head (* The request {Request.Inject} triggers an injection of the operations - the pending queue. *) + the pending heap. *) let on_inject state = let open Lwt_result_syntax in let* total_nb_injected_op = @@ -1206,7 +1203,7 @@ module Make (Parameters : PARAMETERS) = struct let*! () = Event.(emit1 number_of_operations_in_queue) state - (Op_queue.length state.queue) + (Op_heap.length state.heap) in return () @@ -1215,18 +1212,13 @@ module Make (Parameters : PARAMETERS) = struct Injected_ophs.clear state.injected.injected_ophs ; Included_operations.clear state.included.included_operations ; Included_in_blocks.clear state.included.included_in_blocks ; - Op_queue.clear state.queue + Op_heap.clear state.heap let remove_operations_with_tag tag state = - let open Lwt_result_syntax in - Op_queue.fold - (fun id op acc -> - if Parameters.Tag.equal (Parameters.operation_tag op.operation) tag then - let* () = Op_queue.remove state.queue id in - acc - else acc) - state.queue - return_unit + Op_heap.remove_predicate + (fun op -> + Parameters.Tag.equal (Parameters.operation_tag op.operation) tag) + state.heap module Types = struct type nonrec state = state @@ -1602,7 +1594,7 @@ module Make (Parameters : PARAMETERS) = struct |> List.map (fun (tags, _w) -> Tags.to_seq tags |> List.of_seq) let op_status_in_worker state l1_hash = - match Op_queue.find_opt state.queue l1_hash with + match Op_heap.find_opt state.heap l1_hash with | Some op -> Some (Pending op.operation) | None -> ( match @@ 
-1629,7 +1621,7 @@ module Make (Parameters : PARAMETERS) = struct List.fold_left (fun (acc, total) (tags, w) -> let state = Worker.state w in - let len = Op_queue.length state.queue in + let len = Op_heap.length state.heap in let tag_list = Tags.to_seq tags |> List.of_seq in ((tag_list, len) :: acc, total + len)) ([], 0) @@ -1645,9 +1637,9 @@ module Make (Parameters : PARAMETERS) = struct if not to_count then acc else let state = Worker.state w in - let queue = Op_queue.elements state.queue in + let heap = Op_heap.elements state.heap in let tag_list = Tags.to_seq tags |> List.of_seq in - (tag_list, queue) :: acc) + (tag_list, heap) :: acc) [] workers diff --git a/tezt/tests/sc_rollup.ml b/tezt/tests/sc_rollup.ml index d3209ba869aa..5dc38b6e9f83 100644 --- a/tezt/tests/sc_rollup.ml +++ b/tezt/tests/sc_rollup.ml @@ -6024,6 +6024,118 @@ let test_batcher_order_msgs ~kind = in unit +let test_injector_order_operations_by_kind ~kind = + let commitment_period = 5 in + let challenge_window = 5 in + test_full_scenario + (* TODO: https://gitlab.com/tezos/tezos/-/issues/6650 + + cf multiple_batcher_test comment. 
*) + ~rpc_external:false + ~kind + ~commitment_period + ~challenge_window + ~mode:Operator + { + variant = None; + tags = ["injector"; "operations"; "order"]; + description = "Injector order operations by kind"; + } + @@ fun _protocol rollup_node rollup_addr _node client -> + let* () = Sc_rollup_node.run ~event_level:`Debug rollup_node rollup_addr [] in + let* _ = Client.bake_for_and_wait client in + (* to be 1 block after the origination, otherwise it fails for + something we don't care *) + let check_sr_ops_are_ordered () = + let* block = Client.RPC.call client @@ RPC.get_chain_block () in + let ops = JSON.(block |-> "operations" |=> 3 |> as_list) in + let ops_kind = + let open JSON in + let filter_sr_op json = + let kind = json |-> "kind" |> as_string in + if String.starts_with ~prefix:"smart_rollup" kind then Some kind + else None + in + List.map + (fun op -> op |-> "contents" |> as_list |> List.filter_map filter_sr_op) + ops + |> List.flatten + in + let all = + [ + ["smart_rollup_timeout"]; + ["smart_rollup_refute"]; + ["smart_rollup_publish"; "smart_rollup_cement"]; + ["smart_rollup_recover"]; + ["smart_rollup_add_messages"]; + ["smart_rollup_execute_outbox_message"]; + ] + in + let is_sorted = + let rec aux all l = + match (all, l) with + | _, hd :: rest when not (String.starts_with ~prefix:"smart_rollup" hd) + -> + (*skip op not smart rollup*) + aux all rest + | current :: all_rest, hd :: rest -> + if List.mem hd current then aux all rest else aux all_rest l + | _, [] -> (* all element of l appeared in all in the same order *) true + | [], _ -> + (* There is still at least an element of L but the + sorted list is empty. L was not correctly + ordered. 
*) + false + in + aux all + in + Check.is_true + ~__LOC__ + ~error_msg: + (Format.asprintf + "injected operations are not sorted, [%a]" + Format.( + pp_print_list + ~pp_sep:(fun fmt () -> Format.pp_print_string fmt "; ") + pp_print_string) + ops_kind) + @@ is_sorted ops_kind ; + unit + in + + let hook i = + let* () = + (* check done at each block so we don't miss any *) + check_sr_ops_are_ordered () + in + let* _ = + Sc_rollup_node.RPC.call rollup_node + @@ Sc_rollup_rpc.post_local_batcher_injection + ~messages:[string_of_int i] + () + in + unit + in + (* We only check add_messages, publish and cement operations + order. We could do more but test becomes more involved and I + think it's not necessary. *) + let* level = + bake_until_lpc_updated + ~at_least:(commitment_period + 2) + ~hook + client + rollup_node + in + let* _ = + bake_until_lcc_updated + ~at_least:(challenge_window + 2) + ~hook + client + rollup_node + ~level + in + unit + (** Injector only uses key that have no operation in the mempool currently. 1. 
Batcher setup: @@ -6681,6 +6793,7 @@ let register_protocol_independent () = test_multiple_batcher_key ~kind protocols ; test_batcher_order_msgs ~kind protocols ; test_injector_uses_available_keys protocols ~kind ; + test_injector_order_operations_by_kind protocols ~kind ; test_batcher_dont_reinject_already_injected_messages protocols ~kind ; test_private_rollup_node_publish_in_whitelist protocols ; test_private_rollup_node_publish_not_in_whitelist protocols ; -- GitLab From 907a4645fde4759c7d1e4ef5d8531fd1eda45ded Mon Sep 17 00:00:00 2001 From: Sylvain Ribstein Date: Wed, 4 Dec 2024 09:46:00 +0100 Subject: [PATCH 4/4] batcher: inject batch with lowest element's order from batch --- src/lib_injector/injector_functor.ml | 4 +-- src/lib_injector/injector_sigs.ml | 2 +- src/lib_smart_rollup/l2_message.ml | 2 ++ src/lib_smart_rollup/l2_message.mli | 2 ++ src/lib_smart_rollup_node/batcher.ml | 32 ++++++++++++++------- src/lib_smart_rollup_node/injector.ml | 4 +-- src/lib_smart_rollup_node/injector.mli | 5 +++- tezt/tests/sc_rollup.ml | 40 ++++++++++++++++++++------ 8 files changed, 67 insertions(+), 24 deletions(-) diff --git a/src/lib_injector/injector_functor.ml b/src/lib_injector/injector_functor.ml index 376c5fbd4f4c..46a1955fa28c 100644 --- a/src/lib_injector/injector_functor.ml +++ b/src/lib_injector/injector_functor.ml @@ -1577,9 +1577,9 @@ module Make (Parameters : PARAMETERS) = struct tag | Some worker -> return worker - let add_pending_operation op = + let add_pending_operation ?order op = let open Lwt_result_syntax in - let operation = Inj_operation.make op in + let operation = Inj_operation.make ?order op in let*? 
w = worker_of_tag (Parameters.operation_tag op) in let* () = add_pending_operation (Worker.state w) operation in return operation.id diff --git a/src/lib_injector/injector_sigs.ml b/src/lib_injector/injector_sigs.ml index 49ca23982c5a..417c7a3b48db 100644 --- a/src/lib_injector/injector_sigs.ml +++ b/src/lib_injector/injector_sigs.ml @@ -339,7 +339,7 @@ module type S = sig (** Add an operation as pending injection in the injector. It returns the id of the operation in the injector queue. *) - val add_pending_operation : operation -> Id.t tzresult Lwt.t + val add_pending_operation : ?order:Z.t -> operation -> Id.t tzresult Lwt.t (** Trigger an injection of the pending operations for all workers. If [tags] is given, only the workers which have a tag in [tags] inject their pending diff --git a/src/lib_smart_rollup/l2_message.ml b/src/lib_smart_rollup/l2_message.ml index dcf17e7ec224..377975d602fc 100644 --- a/src/lib_smart_rollup/l2_message.ml +++ b/src/lib_smart_rollup/l2_message.ml @@ -106,3 +106,5 @@ let compare msg1 msg2 = | None, None -> counter_cmp () | Some _p, _ -> -1 | _, Some _p -> 1 + +let order {order; _} = order diff --git a/src/lib_smart_rollup/l2_message.mli b/src/lib_smart_rollup/l2_message.mli index e08e22926aca..0153adf5170d 100644 --- a/src/lib_smart_rollup/l2_message.mli +++ b/src/lib_smart_rollup/l2_message.mli @@ -53,4 +53,6 @@ val id : t -> Id.t val counter : t -> Z.t +val order : t -> Z.t option + val compare : t -> t -> int diff --git a/src/lib_smart_rollup_node/batcher.ml b/src/lib_smart_rollup_node/batcher.ml index ce0f021b5d9f..ac2aacf71741 100644 --- a/src/lib_smart_rollup_node/batcher.ml +++ b/src/lib_smart_rollup_node/batcher.ml @@ -49,12 +49,13 @@ let message_size s = (* Encoded as length of s on 4 bytes + s *) 4 + String.length s -let inject_batch state (l2_messages : L2_message.t list) = +let inject_batch ?order state (l2_messages : L2_message.t list) = let open Lwt_result_syntax in let messages = List.map L2_message.content 
l2_messages in let operation = L1_operation.Add_messages {messages} in let* l1_id = Injector.check_and_add_pending_operation + ?order state.node_ctxt.config.mode operation in @@ -72,7 +73,11 @@ let inject_batch state (l2_messages : L2_message.t list) = Batched_messages.replace state.batched id {content; l1_id}) l2_messages -let inject_batches state = List.iter_es (inject_batch state) +let inject_batches state = + List.iter_es (fun (min_order, batch) -> + (* We inject that batch taking the smallest order of that batch, + i.e. the one of the first element. *) + inject_batch ?order:min_order state batch) let max_batch_size {node_ctxt; plugin; _} = let module Plugin = (val plugin) in @@ -84,17 +89,23 @@ let get_batches state ~only_full = let max_batch_size = max_batch_size state in let max_batch_elements = state.node_ctxt.config.batcher.max_batch_elements in let heap = state.messages_heap in - let add_batch rev_batch rev_batches = + let add_batch ~min_order rev_batch rev_batches = if List.is_empty rev_batch then rev_batches else let batch = List.rev rev_batch in - batch :: rev_batches + (min_order, batch) :: rev_batches in - let rec pop_until_enough (rev_batch, batch_size, batch_elements) rev_batches = + let rec pop_until_enough ~is_first + (min_order, rev_batch, batch_size, batch_elements) rev_batches = let message = Message_heap.peek_min heap in match message with - | None -> add_batch rev_batch rev_batches + | None -> add_batch ~min_order rev_batch rev_batches | Some message -> + let min_order = + (* first element of the batch has the lower order. We only + care about this one.*) + if is_first then L2_message.order message else min_order + in let size = message_size (L2_message.content message) in let new_batch_size = batch_size + size in let new_batch_elements = batch_elements + 1 in @@ -107,19 +118,20 @@ let get_batches state ~only_full = (* We can add the message to the current batch because we are still within the bounds. 
*) pop_until_enough - (message :: rev_batch, new_batch_size, new_batch_elements) + ~is_first:false + (min_order, message :: rev_batch, new_batch_size, new_batch_elements) rev_batches else (* we haven't pop the message, so we can safely call continue_or_stop. *) - continue_or_stop (add_batch rev_batch rev_batches) + continue_or_stop (add_batch ~min_order rev_batch rev_batches) and continue_or_stop rev_batches = if only_full && Message_heap.length heap < state.node_ctxt.config.batcher.min_batch_elements then rev_batches - else pop_until_enough ([], 0, 0) rev_batches + else pop_until_enough ~is_first:true (None, [], 0, 0) rev_batches in continue_or_stop [] |> List.rev @@ -129,7 +141,7 @@ let produce_batches state ~only_full = let batches = get_batches state ~only_full in let get_timestamp = Time.System.now () in let nb_messages_batched = - List.fold_left (fun len l -> len + List.length l) 0 batches + List.fold_left (fun len (_min_order, l) -> len + List.length l) 0 batches in Metrics.wrap (fun () -> Metrics.Batcher.set_messages_queue_size diff --git a/src/lib_smart_rollup_node/injector.ml b/src/lib_smart_rollup_node/injector.ml index 2d98f3d82387..3a75431a3417 100644 --- a/src/lib_smart_rollup_node/injector.ml +++ b/src/lib_smart_rollup_node/injector.ml @@ -171,10 +171,10 @@ end include Injector_functor.Make (Parameters) -let check_and_add_pending_operation (mode : Configuration.mode) +let check_and_add_pending_operation (mode : Configuration.mode) ?order (operation : L1_operation.t) = let open Lwt_result_syntax in if Configuration.(can_inject mode (Parameters.operation_tag operation)) then - let* hash = add_pending_operation operation in + let* hash = add_pending_operation ?order operation in return (Some hash) else return None diff --git a/src/lib_smart_rollup_node/injector.mli b/src/lib_smart_rollup_node/injector.mli index 0409250dd43d..ae60572f4302 100644 --- a/src/lib_smart_rollup_node/injector.mli +++ b/src/lib_smart_rollup_node/injector.mli @@ -43,4 +43,7 @@ 
include mode. If allowed, adds it to the pending operation. Returns [None] when the operation is not allowed in the mode. *) val check_and_add_pending_operation : - Configuration.mode -> L1_operation.t -> Inj_operation.id option tzresult Lwt.t + Configuration.mode -> + ?order:Z.t -> + L1_operation.t -> + Inj_operation.id option tzresult Lwt.t diff --git a/tezt/tests/sc_rollup.ml b/tezt/tests/sc_rollup.ml index 5dc38b6e9f83..a13e369e479e 100644 --- a/tezt/tests/sc_rollup.ml +++ b/tezt/tests/sc_rollup.ml @@ -423,9 +423,13 @@ let wait_for_included_successful_operation node ~operation_kind = let wait_until_n_batches_are_injected rollup_node ~nb_batches = let nb_injected = ref 0 in - Sc_rollup_node.wait_for rollup_node "injected_ops.v0" @@ fun _json -> - nb_injected := !nb_injected + 1 ; - if !nb_injected >= nb_batches then Some () else None + Sc_rollup_node.wait_for rollup_node "injected_ops.v0" @@ fun json -> + JSON.( + json |-> "operations" |> as_list + |> List.iter (fun json -> + let kind = json |-> "kind" |> as_string in + if kind = "add_messages" then nb_injected := !nb_injected + 1)) ; + if !nb_injected = nb_batches then Some () else None let send_message_batcher_aux ?rpc_hooks client sc_node msgs = let batched = @@ -5922,10 +5926,12 @@ let test_batcher_order_msgs ~kind = unit in - let bake_then_check_included_msgs ~__LOC__ ~nb_batches ~expected_messages = + let bake_then_check_included_msgs ~__LOC__ ~expected_messages = let* level = Node.get_level node in let wait_for_injected = - wait_until_n_batches_are_injected rollup_node ~nb_batches + wait_until_n_batches_are_injected + rollup_node + ~nb_batches:(List.length expected_messages) in let* () = Client.bake_for_and_wait client and* () = wait_for_injected @@ -5981,7 +5987,6 @@ let test_batcher_order_msgs ~kind = let* () = bake_then_check_included_msgs ~__LOC__ - ~nb_batches:1 ~expected_messages:[expected_messages] in @@ -6019,9 +6024,28 @@ let test_batcher_order_msgs ~kind = List.init min_batch_elements (fun i -> 
(max_batch_elements + i) * 10); ] in - let* () = - bake_then_check_included_msgs ~__LOC__ ~nb_batches:1 ~expected_messages + let* () = bake_then_check_included_msgs ~__LOC__ ~expected_messages in + + Log.info + "Injecting multiple time the mimimal batches element in reverse order of \ + priority to make sure the injector order them correctly" ; + (* [0; 1; ... 9] *) + let messages_no_order = List.init min_batch_elements Fun.id in + let* _hashes = inject_int_of_string messages_no_order in + (* [10; 11; ... 19] *) + let messages_order_2 = + List.init min_batch_elements (fun i -> min_batch_elements + i) + in + let* _hashes = inject_int_of_string ~order:2 messages_order_2 in + (* [20; 21; ... 29] *) + let messages_order_1 = + List.init min_batch_elements (fun i -> (2 * min_batch_elements) + i) + in + let* _hashes = inject_int_of_string ~order:1 messages_order_1 in + let expected_messages = + [messages_order_1; messages_order_2; messages_no_order] in + let* () = bake_then_check_included_msgs ~__LOC__ ~expected_messages in unit let test_injector_order_operations_by_kind ~kind = -- GitLab