diff --git a/CHANGES.rst b/CHANGES.rst index 14c1d9d1d53eb52fa7916618390859e19bc7e719..9eebc3cb71f1be453912c8dd99320f4f73134aed 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -52,6 +52,12 @@ Docker Images Smart Rollup node ----------------- +- Updated batcher with a new order structure. The RPC + ``/local/batcher/injection`` now has a new query argument + possibility ``"order": ``. The batcher will batch the + received chunk with the following priority order: First chunks with + ascending order then chunks by order of arrival. (MR :gl:`!15672`) + - Updated RPC ``/local/batcher/injection`` with a new query argument possibility. When the rpc contains ``"drop_duplicate": true`` then the batcher will drop the messages that were already injected with a diff --git a/manifest/product_octez.ml b/manifest/product_octez.ml index 662c86515127b26199acb947dd650c5a33394e7e..8333ff930391d4f2dc6b270e7aa85a7ce5eafa2f 100644 --- a/manifest/product_octez.ml +++ b/manifest/product_octez.ml @@ -245,6 +245,7 @@ let _octez_stdlib_tests = "test_bits"; "test_tzList"; "test_bounded_heap"; + "Test_bounded_min_heap"; "test_tzString"; "test_fallbackArray"; "test_functionalArray"; diff --git a/src/lib_smart_rollup/l2_message.ml b/src/lib_smart_rollup/l2_message.ml index 7dac51c005b41680b0644ed180641d44ce363069..dcf17e7ec224b7526ff503ef2f46e1fc8fc4a37f 100644 --- a/src/lib_smart_rollup/l2_message.ml +++ b/src/lib_smart_rollup/l2_message.ml @@ -23,36 +23,11 @@ (* *) (*****************************************************************************) -type t = { - counter : Z.t option; - (** Each message is given a unique counter to allow for the batcher to - receive multiple identical messages. *) - content : string; (** The actual content of the message. 
*) -} - let content_encoding = let open Data_encoding in def "sc_l2_message" ~description:"A hex encoded smart rollup message" @@ string' Hex -let encoding = - let open Data_encoding in - conv - (fun {counter; content} -> (counter, content)) - (fun (counter, content) -> {counter; content}) - @@ obj2 (opt "counter" z) (req "content" content_encoding) - -let make = - let counter = ref Z.zero in - fun ~unique content -> - if unique then ( - let m = {content; counter = Some !counter} in - counter := Z.succ !counter ; - m) - else {content; counter = None} - -let content m = m.content - module Id = Tezos_crypto.Blake2B.Make (Tezos_crypto.Base58) @@ -71,4 +46,63 @@ let () = type id = Id.t -let id t = Id.hash_bytes [Data_encoding.Binary.to_bytes_exn encoding t] +let id_encoding = + let open Data_encoding in + obj2 (opt "counter" z) (req "content" content_encoding) + +let id ?counter content = + Id.hash_bytes + [Data_encoding.Binary.to_bytes_exn id_encoding (counter, content)] + +type t = {order : Z.t option; counter : Z.t; id : Id.t; content : string} + +let make = + let counter = ref Z.zero in + fun ?order ~unique content -> + let counter_for_id = if unique then Some !counter else None in + let id = id ?counter:counter_for_id content in + let m = {order; counter = !counter; id; content} in + counter := Z.succ !counter ; + m + +let id t = t.id + +let content t = t.content + +let counter t = t.counter +(* + We need to transform a the encoding to be retro compatible with the previous encoding + + { + "id: , + "message": { + "content": , + "counter: + } + } + + *) + +let encoding = + let open Data_encoding in + conv + (fun {order; counter; id; content} -> (order, id, (content, counter))) + (fun (order, id, (content, counter)) -> {order; counter; id; content}) + @@ obj3 + (opt "order" n) + (req "id" Id.encoding) + (req "message" (obj2 (req "content" content_encoding) (req "counter" n))) + +(* compare order messages in the following order: First are messages + with an `order`, 
meaning the messages priority was set by the + caller, then the messages are ordered by the timestamp of + registration. *) +let compare msg1 msg2 = + let counter_cmp () = Z.compare msg1.counter msg2.counter in + match (msg1.order, msg2.order) with + | Some o1, Some o2 -> + let cmp = Z.compare o1 o2 in + if cmp = 0 then counter_cmp () else cmp + | None, None -> counter_cmp () + | Some _p, _ -> -1 + | _, Some _p -> 1 diff --git a/src/lib_smart_rollup/l2_message.mli b/src/lib_smart_rollup/l2_message.mli index f2552671e3f6ec5cf7eb37e63cfcae513d406984..e08e22926aca7e229266ad42f6856e2ccbccb255 100644 --- a/src/lib_smart_rollup/l2_message.mli +++ b/src/lib_smart_rollup/l2_message.mli @@ -31,7 +31,7 @@ type t (multiple identical calls to [make] will return different values) because it will be given a unique id. Using [unique = false], on the contrary, makes the call idempotent. *) -val make : unique:bool -> string -> t +val make : ?order:Z.t -> unique:bool -> string -> t (** [content message] returns the string content of [message], i.e. [content (make s) = s]. 
*) @@ -50,3 +50,7 @@ val content_encoding : string Data_encoding.t val encoding : t Data_encoding.t val id : t -> Id.t + +val counter : t -> Z.t + +val compare : t -> t -> int diff --git a/src/lib_smart_rollup/rollup_node_services.ml b/src/lib_smart_rollup/rollup_node_services.ml index 5f3d080344b542bb7ea2ca1db50eabb57e40b658..ca4e824ff0ff02ecd5139ebfb91ec38e45abb986 100644 --- a/src/lib_smart_rollup/rollup_node_services.ml +++ b/src/lib_smart_rollup/rollup_node_services.ml @@ -219,8 +219,7 @@ module Encodings = struct (req "message_index" int31) (opt "message" Outbox_message.summary_encoding)))) - let queued_message = - obj2 (req "id" L2_message.Id.encoding) (req "message" L2_message.encoding) + let queued_message = L2_message.encoding let batcher_queue = list queued_message @@ -804,15 +803,29 @@ module Local = struct (path / "gc_info") let injection = - let query_drop_duplicate : bool Tezos_rpc.Query.t = + let z_rpc_arg = + Resto.Arg.make + ~name:"z" + ~destruct:(fun s -> Ok (Z.of_string s)) + ~construct:Z.to_string + () + in + let query = let open Tezos_rpc.Query in - query Fun.id - |+ field "drop_duplicate" Tezos_rpc.Arg.bool false Fun.id + query (fun order drop_duplicate -> + object + method order = order + + method drop_duplicate = drop_duplicate + end) + |+ opt_field "order" z_rpc_arg (fun q -> q#order) + |+ field "drop_duplicate" Tezos_rpc.Arg.bool false (fun q -> + q#drop_duplicate) |> seal in Tezos_rpc.Service.post_service ~description:"Inject messages in the batcher's queue" - ~query:query_drop_duplicate + ~query ~input: Data_encoding.( def diff --git a/src/lib_smart_rollup_node/batcher.ml b/src/lib_smart_rollup_node/batcher.ml index 88fa5154cc7e9ebf6f1ff0561c10157e95eddde3..83b137fcf361f8d595f246010b4591d679319a18 100644 --- a/src/lib_smart_rollup_node/batcher.ml +++ b/src/lib_smart_rollup_node/batcher.ml @@ -24,7 +24,7 @@ (*****************************************************************************) open Batcher_worker_types -module Message_queue = 
Hash_queue.Make (L2_message.Id) (L2_message) +module Message_heap = Bounded_min_heap.Make (L2_message.Id) (L2_message) module Batcher_events = Batcher_events.Declare (struct let worker_name = "batcher" @@ -40,7 +40,7 @@ type status = Pending_batch | Batched of Injector.Inj_operation.id type state = { node_ctxt : Node_context.ro; - messages : Message_queue.t; + messages_heap : Message_heap.t; batched : Batched_messages.t; mutable plugin : (module Protocol_plugin_sig.S); } @@ -81,105 +81,87 @@ let max_batch_size {node_ctxt; plugin; _} = ~default:Plugin.Batcher_constants.protocol_max_batch_size let get_batches state ~only_full = - let ( current_rev_batch, - current_batch_size, - current_batch_elements, - full_batches ) = - Message_queue.fold - (fun msg_id - message - ( current_rev_batch, - current_batch_size, - current_batch_elements, - full_batches ) -> + let max_batch_size = max_batch_size state in + let max_batch_elements = state.node_ctxt.config.batcher.max_batch_elements in + let heap = state.messages_heap in + let add_batch rev_batch rev_batches = + if List.is_empty rev_batch then rev_batches + else + let batch = List.rev rev_batch in + batch :: rev_batches + in + let rec pop_until_enough (rev_batch, batch_size, batch_elements) rev_batches = + let message = Message_heap.peek_min heap in + match message with + | None -> add_batch rev_batch rev_batches + | Some message -> let size = message_size (L2_message.content message) in - let new_batch_size = current_batch_size + size in - let new_batch_elements = current_batch_elements + 1 in + let new_batch_size = batch_size + size in + let new_batch_elements = batch_elements + 1 in if - new_batch_size <= max_batch_size state - && new_batch_elements - <= state.node_ctxt.config.batcher.max_batch_elements + new_batch_size <= max_batch_size + && new_batch_elements <= max_batch_elements then + (* pop the message as we only peeked it. 
*) + let _message = Message_heap.pop heap in (* We can add the message to the current batch because we are still within the bounds. *) - ( (msg_id, message) :: current_rev_batch, - new_batch_size, - new_batch_elements, - full_batches ) + pop_until_enough + (message :: rev_batch, new_batch_size, new_batch_elements) + rev_batches else - (* The batch augmented with the message would be too big but it is - below the limit without it. We finalize the current batch and - create a new one for the message. NOTE: Messages in the queue are - always < [state.conf.max_batch_size] because {!on_register} only - accepts those. *) - let batch = List.rev current_rev_batch in - ([(msg_id, message)], size, 1, batch :: full_batches)) - state.messages - ([], 0, 0, []) - in - let batches = + (* we haven't pop the message, so we can safely call + continue_or_stop. *) + continue_or_stop (add_batch rev_batch rev_batches) + and continue_or_stop rev_batches = if - (not only_full) - || current_batch_size >= state.node_ctxt.config.batcher.min_batch_size - && current_batch_elements - >= state.node_ctxt.config.batcher.min_batch_elements - then - (* We have enough to make a batch with the last non-full batch. 
*) - List.rev current_rev_batch :: full_batches - else full_batches + only_full + && Message_heap.length heap + < state.node_ctxt.config.batcher.min_batch_elements + then rev_batches + else pop_until_enough ([], 0, 0) rev_batches in - List.fold_left - (fun (batches, to_remove) -> function - | [] -> (batches, to_remove) - | batch -> - let msg_hashes, batch = List.split batch in - let to_remove = List.rev_append msg_hashes to_remove in - (batch :: batches, to_remove)) - ([], []) - batches + continue_or_stop [] |> List.rev let produce_batches state ~only_full = let open Lwt_result_syntax in let start_timestamp = Time.System.now () in - let batches, to_remove = get_batches state ~only_full in + let batches = get_batches state ~only_full in let get_timestamp = Time.System.now () in + let nb_messages_batched = + List.fold_left (fun len l -> len + List.length l) 0 batches + in Metrics.wrap (fun () -> Metrics.Batcher.set_messages_queue_size - @@ Message_queue.length state.messages ; - Metrics.Batcher.set_messages_size @@ List.length to_remove ; + @@ Message_heap.length state.messages_heap ; + Metrics.Batcher.set_messages_size @@ nb_messages_batched ; Metrics.Batcher.set_batches_size @@ List.length batches ; Metrics.Batcher.set_get_time @@ Ptime.diff get_timestamp start_timestamp) ; - match batches with - | [] -> return_unit - | _ -> - let* () = - Metrics.wrap_lwt (fun () -> - Metrics.Batcher.set_last_batch_time start_timestamp ; - let* block = Node_context.last_processed_head_opt state.node_ctxt in - let () = - match block with - | Some block -> - Metrics.Batcher.set_last_batch_level block.header.level - | None -> () - in - return_unit) - in - let* () = inject_batches state batches in - let*! 
() = - Batcher_events.(emit batched) - (List.length batches, List.length to_remove) - in - let inject_timestamp = Time.System.now () in - Metrics.Batcher.set_inject_time - @@ Ptime.diff inject_timestamp get_timestamp ; - List.iter - (fun tr_hash -> Message_queue.remove state.messages tr_hash) - to_remove ; - return_unit + if List.is_empty batches then return_unit + else + let* () = + Metrics.wrap_lwt (fun () -> + Metrics.Batcher.set_last_batch_time start_timestamp ; + let* block = Node_context.last_processed_head_opt state.node_ctxt in + let () = + match block with + | Some block -> + Metrics.Batcher.set_last_batch_level block.header.level + | None -> () + in + return_unit) + in + let* () = inject_batches state batches in + let*! () = + Batcher_events.(emit batched) (List.length batches, nb_messages_batched) + in + let inject_timestamp = Time.System.now () in + Metrics.Batcher.set_inject_time @@ Ptime.diff inject_timestamp get_timestamp ; + return_unit -let message_already_added state msg_id = - match Batched_messages.find_opt state.batched msg_id with - | None -> false +let message_already_batched state msg = + match Batched_messages.find_opt state.batched (L2_message.id msg) with + | None -> false (* check is done in insertion in the heap *) | Some {l1_id; _} -> (* If injector know about the operation then it's either pending, injected or included and we don't re-inject the @@ -188,8 +170,7 @@ let message_already_added state msg_id = `retention_period`) and the user wants to re-inject it. *) Option.is_some @@ Injector.operation_status l1_id -let on_register ~drop_duplicate state (messages : string list) = - let open Lwt_result_syntax in +let make_l2_messages ?order ~unique state (messages : string list) = let module Plugin = (val state.plugin) in let max_size_msg = min @@ -197,29 +178,52 @@ let on_register ~drop_duplicate state (messages : string list) = + 4 (* We add 4 because [message_size] adds 4. 
*)) (max_batch_size state) in + List.mapi_e + (fun i message -> + if message_size message > max_size_msg then + error_with "Message %d is too large (max size is %d)" i max_size_msg + else Ok (L2_message.make ?order ~unique message)) + messages + +type error += Heap_insertion_failed of string + +let () = + register_error_kind + ~id:"batcher.heap_insertion_failed" + ~title:"Heap insertion failed" + ~description:"Heap insertion failed" + ~pp:(fun ppf err -> Format.fprintf ppf "Heap insertion failed with %s." err) + `Permanent + Data_encoding.(obj1 (req "error" string)) + (function Heap_insertion_failed error -> Some error | _ -> None) + (fun error -> Heap_insertion_failed error) + +let add_messages_into_heap ~drop_duplicate state = + let open Lwt_result_syntax in + List.map_es @@ fun message -> + let msg_id = L2_message.id message in + let* () = + (* check if already batched *) + if drop_duplicate && message_already_batched state message then + let*! () = Batcher_events.(emit dropped_msg) msg_id in + return_unit + else + let*? () = + Result.map_error (fun err_msg -> [Heap_insertion_failed err_msg]) + @@ Message_heap.insert message state.messages_heap + in + return_unit + in + return msg_id + +let on_register ?order ~drop_duplicate state (messages : string list) = + let open Lwt_result_syntax in + let module Plugin = (val state.plugin) in let*? messages = - List.mapi_e - (fun i message -> - if message_size message > max_size_msg then - error_with "Message %d is too large (max size is %d)" i max_size_msg - else Ok (L2_message.make ~unique:(not drop_duplicate) message)) - messages + make_l2_messages ?order ~unique:(not drop_duplicate) state messages in let*! () = Batcher_events.(emit queue) (List.length messages) in - let*! ids = - List.map_s - (fun message -> - let msg_id = L2_message.id message in - let*! 
() = - if drop_duplicate && message_already_added state msg_id then - Batcher_events.(emit dropped_msg) msg_id - else ( - Message_queue.replace state.messages msg_id message ; - Lwt.return_unit) - in - Lwt.return msg_id) - messages - in + let* ids = add_messages_into_heap ~drop_duplicate state messages in let+ () = produce_batches state ~only_full:true in ids @@ -228,7 +232,7 @@ let on_new_head state = produce_batches state ~only_full:false let init_batcher_state plugin node_ctxt = { node_ctxt; - messages = Message_queue.create 100_000 (* ~ 400MB *); + messages_heap = Message_heap.create 100_000 (* ~ 400MB *); batched = Batched_messages.create 100_000 (* ~ 400MB *); plugin; } @@ -269,8 +273,8 @@ module Handlers = struct fun w request -> let state = Worker.state w in match request with - | Request.Register {messages; drop_duplicate} -> - protect @@ fun () -> on_register ~drop_duplicate state messages + | Request.Register {order; messages; drop_duplicate} -> + protect @@ fun () -> on_register ?order ~drop_duplicate state messages | Request.Produce_batches -> protect @@ fun () -> on_new_head state type launch_error = error trace @@ -364,13 +368,13 @@ let find_message id = let open Result_syntax in let+ w = worker () in let state = Worker.state w in - Message_queue.find_opt state.messages id + Message_heap.find_opt id state.messages_heap let get_queue () = let open Result_syntax in let+ w = worker () in let state = Worker.state w in - Message_queue.bindings state.messages + Message_heap.elements state.messages_heap let handle_request_error rq = let open Lwt_syntax in @@ -382,12 +386,12 @@ let handle_request_error rq = | Error (Closed (Some errs)) -> Lwt.return_error errs | Error (Any exn) -> Lwt.return_error [Exn exn] -let register_messages ~drop_duplicate messages = +let register_messages ?order ~drop_duplicate messages = let open Lwt_result_syntax in let*? 
w = worker () in Worker.Queue.push_request_and_wait w - (Request.Register {messages; drop_duplicate}) + (Request.Register {order; messages; drop_duplicate}) |> handle_request_error let produce_batches () = @@ -409,7 +413,7 @@ let shutdown () = | Ok w -> Worker.shutdown w let message_status state msg_id = - match Message_queue.find_opt state.messages msg_id with + match Message_heap.find_opt msg_id state.messages_heap with | Some msg -> Some (Pending_batch, L2_message.content msg) | None -> ( match Batched_messages.find_opt state.batched msg_id with diff --git a/src/lib_smart_rollup_node/batcher.mli b/src/lib_smart_rollup_node/batcher.mli index 6607ec757ab396774a63b051dad8fd5ed820b32f..239cb431dc581859f2397c41d985c2c3f72dffd2 100644 --- a/src/lib_smart_rollup_node/batcher.mli +++ b/src/lib_smart_rollup_node/batcher.mli @@ -54,14 +54,19 @@ val find_message : L2_message.id -> L2_message.t option tzresult (** List all queued messages in the order they appear in the queue, i.e. the message that were added first to the queue are at the end of list. *) -val get_queue : unit -> (L2_message.id * L2_message.t) list tzresult +val get_queue : unit -> L2_message.t list tzresult -(** [register_messages ~drop_duplicate messages] registers new L2 - [messages] in the queue of the batcher for future injection on - L1. If [drop_duplicate = false] then it injects the message even - if it was already injected in a previous l1 operations. *) +(** [register_messages ?order ~drop_duplicate messages] registers + new L2 [messages] in the queue of the batcher for future injection + on L1. If [drop_duplicate = false] then it injects the message + even if it was already injected in a previous l1 operations. If + the priority is set then add the message in the heap at the + correct place. 
*) val register_messages : - drop_duplicate:bool -> string list -> L2_message.id list tzresult Lwt.t + ?order:Z.t -> + drop_duplicate:bool -> + string list -> + L2_message.id list tzresult Lwt.t (** The status of a message in the batcher. Returns [None] if the message is not known by the batcher (the batcher only keeps the batched status of the last diff --git a/src/lib_smart_rollup_node/batcher_worker_types.ml b/src/lib_smart_rollup_node/batcher_worker_types.ml index b49cf016a0b6468732bceca9331aa8c7a5d02b46..a55b1a9a1f59bb1e9ba869795a875645e08e0048 100644 --- a/src/lib_smart_rollup_node/batcher_worker_types.ml +++ b/src/lib_smart_rollup_node/batcher_worker_types.ml @@ -26,6 +26,7 @@ module Request = struct type ('a, 'b) t = | Register : { + order : Z.t option; messages : string list; drop_duplicate : bool; } @@ -43,16 +44,17 @@ module Request = struct case (Tag 0) ~title:"Register" - (obj3 + (obj4 (req "request" (constant "register")) + (opt "order" n) (req "messages" (list L2_message.content_encoding)) (req "drop_duplicate" bool)) (function - | View (Register {messages; drop_duplicate}) -> - Some ((), messages, drop_duplicate) + | View (Register {order; messages; drop_duplicate}) -> + Some ((), order, messages, drop_duplicate) | _ -> None) - (fun ((), messages, drop_duplicate) -> - View (Register {messages; drop_duplicate})); + (fun ((), order, messages, drop_duplicate) -> + View (Register {order; messages; drop_duplicate})); case (Tag 1) ~title:"Produce_batches" @@ -63,7 +65,7 @@ module Request = struct let pp ppf (View r) = match r with - | Register {messages; drop_duplicate} -> + | Register {order = _; messages; drop_duplicate} -> Format.fprintf ppf "register %d new L2 message%a" diff --git a/src/lib_smart_rollup_node/batcher_worker_types.mli b/src/lib_smart_rollup_node/batcher_worker_types.mli index f67af7bf73b88ce1cb11c02c51ae3f3fe909f7b2..7d70fa678991e20d5800c88ced86345c289a6436 100644 --- a/src/lib_smart_rollup_node/batcher_worker_types.mli +++ 
b/src/lib_smart_rollup_node/batcher_worker_types.mli @@ -30,6 +30,7 @@ module Request : sig (** Type of requests accepted by the batcher worker. *) type ('a, 'b) t = | Register : { + order : Z.t option; messages : string list; drop_duplicate : bool; } diff --git a/src/lib_smart_rollup_node/rpc_directory.ml b/src/lib_smart_rollup_node/rpc_directory.ml index 47ec5032a8535708ede95ad1f9bade9c7a14e6ad..4436ddf29e4cf013e669591f533e7433208b7d3c 100644 --- a/src/lib_smart_rollup_node/rpc_directory.ml +++ b/src/lib_smart_rollup_node/rpc_directory.ml @@ -451,8 +451,11 @@ let () = let () = Local_directory.register0 Rollup_node_services.Local.injection - @@ fun _node_ctxt drop_duplicate messages -> - Batcher.register_messages ~drop_duplicate messages + @@ fun _node_ctxt query messages -> + Batcher.register_messages + ?order:query#order + ~drop_duplicate:query#drop_duplicate + messages let () = Local_directory.register0 Rollup_node_services.Local.dal_batcher_injection @@ -466,10 +469,7 @@ let () = let () = Local_directory.register0 Rollup_node_services.Local.batcher_queue - @@ fun _node_ctxt () () -> - let open Lwt_result_syntax in - let*? queue = Batcher.get_queue () in - return queue + @@ fun _node_ctxt () () -> Batcher.get_queue () |> Lwt.return (** [commitment_level_of_inbox_level node_ctxt inbox_level] returns the level of the commitment which should include the inbox of level diff --git a/src/lib_stdlib/bounded_min_heap.ml b/src/lib_stdlib/bounded_min_heap.ml new file mode 100644 index 0000000000000000000000000000000000000000..689a8e651a797a1c0f6151b9f5b456787847fd88 --- /dev/null +++ b/src/lib_stdlib/bounded_min_heap.ml @@ -0,0 +1,244 @@ +(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* Copyright (c) 2024 Nomadic Labs *) +(* *) +(*****************************************************************************) + +(* This heap structure could maybe be improved by maintaining a start + index. 
This would reduce the complexity of `pop` to O(1) but would + requires more complex bubble_ functions. Also, it could be + interesting to add `pop_n` and `peek_n`, this might to changes the + type of `t`, which is why it was not introduced at first to reduce + the complexity of it. *) + +module Make + (Id : Stdlib.Hashtbl.HashedType) + (E : sig + include Stdlib.Set.OrderedType + + val id : t -> Id.t + end) = +struct + module Hash_map = Hashtbl.Make (Id) + + (* Invariants : + - all indices between 0 and size (excluded) have data: + size = + Array.fold_left + (fun count e -> if Option.is_some e then count + 1 else count) 0 data + - the heap property itself: + let P(i, j) = + 0 <= i < size /\ 0 <= j < size => + data.(j) = None \/ E.compare data.(i) data.(j) <= 0 + in \forall 0 <= i < size, P(i, 2 * i + 1) /\ P(i, 2 * (i + 1)) + - the size of [hash_map] is equal to [size] + - the hash_map is the map from id to indice: + \forall 0 <= i < size, hash_map( id data.(i) ) = i + *) + type t = { + mutable size : int; + data : E.t option array; + hash_map : int Hash_map.t; + } + + let create capacity = + if capacity < 0 || capacity > Sys.max_array_length then + invalid_arg + (Format.sprintf "Bounded_min_heap.Make(_).create. 
capacity %d" capacity) ; + { + size = 0; + data = Array.make capacity None; + hash_map = Hash_map.create capacity; + } + + let get_data data i = + match data.(i) with + | Some elt -> elt + | None -> + (* only used locally to get data we are sure exists*) + assert false + + let swap i j t = + let j_elt = t.data.(j) in + let i_elt = t.data.(i) in + let j_id = Option.map E.id j_elt in + let i_id = Option.map E.id i_elt in + + (* replace in heap structure *) + t.data.(j) <- i_elt ; + t.data.(i) <- j_elt ; + + (* replace in id -> index structure *) + Option.iter (fun j_id -> Hash_map.replace t.hash_map j_id i) j_id ; + Option.iter (fun i_id -> Hash_map.replace t.hash_map i_id j) i_id + + let parent i = (i - 1) / 2 + + let left_child i = (2 * i) + 1 + + let right_child i = 2 * (i + 1) + + let childrens i = (left_child i, right_child i) + + (* Bubble up the value located at index [i] such until the t + property is locally fulfilled *) + let bubble_up i t = + let rec loop i = + let p = parent i in + if E.compare (get_data t.data i) (get_data t.data p) < 0 then ( + swap i p t ; + loop p) + in + loop i + + (* Bubble down the value at index [i] until the t property is + locally fulfilled, aka the value at index [i] is smaller than the + one of its two children *) + let bubble_down i ({size; data; _} as t) = + let rec loop i = + let left_index, right_index = childrens i in + let value = get_data data i in + if left_index < size then ( + assert (data.(left_index) <> None) ; + let left_value = get_data data left_index in + if right_index < size then ( + (* swap the value with the smallest of its two children *) + assert (data.(right_index) <> None) ; + let right_value = get_data data right_index in + if E.compare right_value left_value < 0 then ( + if E.compare value right_value > 0 then ( + swap i right_index t ; + loop right_index)) + else if E.compare value left_value > 0 then ( + swap i left_index t ; + loop left_index)) + else if + E.compare value left_value > 0 + (* swap 
the value with its left child, since the right one does not exist *) + then ( + swap i left_index t ; + loop left_index)) + in + loop i + + let add t id x pos = + t.data.(pos) <- Some x ; + t.size <- pos + 1 ; + Hash_map.replace t.hash_map id pos ; + bubble_up pos t + + let insert x t = + let id = E.id x in + let exits_elt_i_opt = Hash_map.find_opt t.hash_map id in + match exits_elt_i_opt with + (* Element already exists, we replace it if the order is + inferior. *) + | Some elt_i -> + let elt = get_data t.data elt_i in + if E.compare x elt < 0 then ( + t.data.(elt_i) <- Some x ; + bubble_up elt_i t) ; + Ok () + | None -> + let pos = t.size in + let data = t.data in + if pos < Array.length data then ( + add t id x pos ; + Ok ()) + else Error "no space left in heap" + + let pop t = + if t.size = 0 then None + else + let min_elem = get_data t.data 0 in + t.data.(0) <- None ; + t.size <- t.size - 1 ; + if t.size > 0 then ( + swap 0 t.size t ; + bubble_down 0 t) ; + let id = E.id min_elem in + Hash_map.remove t.hash_map id ; + assert (Hash_map.length t.hash_map = t.size) ; + Some min_elem + + let peek_min t = if t.size = 0 then None else t.data.(0) + + let elements t = + let a = Array.init t.size (get_data t.data) in + Array.sort E.compare a ; + Array.to_list a + + let length {size; _} = size + + let mem elt t = + let id = E.id elt in + Hash_map.mem t.hash_map id + + let find_opt id t = + let elt_i = Hash_map.find_opt t.hash_map id in + Option.map (get_data t.data) elt_i + + module Internal_for_tests = struct + let check_heap_invariant ~pp_elt h = + let get_data_opt i = + if i < Array.length h.data then h.data.(i) else None + in + let is_greater_or_none x i = + match (x, get_data_opt i) with + | Some _x, None -> true + | Some x, Some l -> E.compare x l < 0 + | None, Some _ | None, None -> false (* impossible case *) + in + let rec check_invariant_for_all_index i = + if i = 0 then () + else + let index = i - 1 in + let data_opt = h.data.(index) in + let id_opt = 
Option.map E.id data_opt in + let index_opt = Option.map (Hash_map.find h.hash_map) id_opt in + let has_data = Option.is_some data_opt in + let has_map_index = Option.is_some index_opt in + let map_index_is_correct = + Option.value ~default:false + @@ Option.map (Int.equal index) index_opt + in + let left_child_index = left_child index in + let right_child_index = right_child index in + let left_child_is_none_or_inf = + is_greater_or_none data_opt left_child_index + in + let right_child_is_none_or_inf = + is_greater_or_none data_opt right_child_index + in + let correct_map_size = Hash_map.length h.hash_map = h.size in + let is_correct = + has_data && has_map_index && map_index_is_correct + && left_child_is_none_or_inf && right_child_is_none_or_inf + && correct_map_size + in + if is_correct then check_invariant_for_all_index (i - 1) + else + failwith + (Format.asprintf + "invariant for index %d are invalid:@.has_data: \ + %b@.has_map_index: %b@.map_index_is_correct: %b@.left_child \ + : (valid: %b, %d, %a)@.right_child: (valid: %b, %d, \ + %a)@.correct_map_size: %b@." 
+ index + has_data + has_map_index + map_index_is_correct + left_child_is_none_or_inf + left_child_index + Format.(pp_print_option pp_elt) + (get_data_opt left_child_index) + right_child_is_none_or_inf + right_child_index + Format.(pp_print_option pp_elt) + (get_data_opt right_child_index) + correct_map_size) + in + check_invariant_for_all_index h.size + end +end diff --git a/src/lib_stdlib/bounded_min_heap.mli b/src/lib_stdlib/bounded_min_heap.mli new file mode 100644 index 0000000000000000000000000000000000000000..a65da7ad3f39048aec314f876e34ef76d95c6ca8 --- /dev/null +++ b/src/lib_stdlib/bounded_min_heap.mli @@ -0,0 +1,73 @@ +(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* Copyright (c) 2024 Nomadic Labs *) +(* *) +(*****************************************************************************) + +(** Bounded min-heap: keep only the [n] smallest elements. *) + +(** + [Make (ID) (E)] creates a min heap for element of type E.t using + the [E.compare] function. The heap is bounded and adding more + elements than its capacity will fail. The module [ID] is used to + check if an element already exists at insertion. +*) +module Make + (Id : Stdlib.Hashtbl.HashedType) + (E : sig + include Stdlib.Set.OrderedType + + val id : t -> Id.t + end) : sig + module Hash_map : Hashtbl.S with type key := Id.t + + type t + + (** [create capacity] creates a bounded sequence of at most + [capacity] elements. + + Raise [Invalid_argument] if [size < 0] or [size > Sys.max_array_length]. *) + val create : int -> t + + (** [insert e h] adds element [e] to bounded sequence [h] if [h] is + not full. + + Worst-case complexity: O(log n) where n is the capacity of the + heap. + + If there is an element [e'] with [E.id e' = E.id e], it will be + replaced iff [E.compare e e' < 0] . *) + val insert : E.t -> t -> (unit, string) result + + (** [pop h] removes the smallest element from the heap [h], + [None] if empty. 
*) + val pop : t -> E.t option + + (** [peek_min h] returns, without removing, the smallest element from + the heap [h], [None] if empty. *) + val peek_min : t -> E.t option + + (** [elements h] returns the contents of [h] as a sorted + list in increasing order according to [E.compare]. + + Worst-case complexity: O(n log n) where n is the size of the heap. + *) + val elements : t -> E.t list + + (** [length h] is the number of elements held by [h]. *) + val length : t -> int + + (** [mem elt h] returns true is the [elt] already exists in + [h], i.e. E.compare returns 0. *) + val mem : E.t -> t -> bool + + (** [find id h] returns the first elements of [h], that as [E.id e = + id] is true. *) + val find_opt : Id.t -> t -> E.t option + + module Internal_for_tests : sig + val check_heap_invariant : + pp_elt:(Format.formatter -> E.t -> unit) -> t -> unit + end +end diff --git a/src/lib_stdlib/test/dune b/src/lib_stdlib/test/dune index a47fa826e1ba67d62a49f334950d0bd42a7dc48b..28f60c028b66379090d5f908753c1d1083dfce99 100644 --- a/src/lib_stdlib/test/dune +++ b/src/lib_stdlib/test/dune @@ -23,6 +23,7 @@ test_bits test_tzList test_bounded_heap + Test_bounded_min_heap test_tzString test_fallbackArray test_functionalArray diff --git a/src/lib_stdlib/test/test_bounded_min_heap.ml b/src/lib_stdlib/test/test_bounded_min_heap.ml new file mode 100644 index 0000000000000000000000000000000000000000..e6fb0d4b12500cb6c7d7b138080f2b0340841f1c --- /dev/null +++ b/src/lib_stdlib/test/test_bounded_min_heap.ml @@ -0,0 +1,124 @@ +(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* Copyright (c) 2024 Nomadic Labs *) +(* *) +(*****************************************************************************) + +(** Testing + ------- + Component: Standard library + Invocation: dune exec src/lib_stdlib/test/main.exe \ + -- -f src/lib_stdlib/test/test_bounded_min_heap.ml + Subject: Unit tests bounded min heap +*) + +module 
Assert = Assert + +module Id = struct + include String + + let hash = Stdlib.Hashtbl.hash +end + +let id i = string_of_int i + +module B = + Bounded_min_heap.Make + (Id) + (struct + include Int + + let id = id + end) + +let take_nth_smallest n l = + TzList.take_n n (List.sort (fun x y -> Compare.Int.compare x y) l) + +(* At least 2 elements, since we'll create a bounded set of size #elements / 2 + *) +let size = QCheck2.Gen.int_range 2 1000 + +let check_insert_heap ?(expect_heap_is_full = false) v b = + let res = B.insert v b in + let () = + if expect_heap_is_full then + let error = "no space left in heap" in + Assert.equal ~msg:__LOC__ res (Error error) + else Assert.equal ~msg:__LOC__ res (Ok ()) + in + B.Internal_for_tests.check_heap_invariant ~pp_elt:Format.pp_print_int b + +let test_empty_works () = + let b = B.create 0 in + Assert.equal ~msg:__LOC__ (B.elements b) [] ; + Assert.equal ~msg:__LOC__ (B.pop b) None ; + Assert.equal ~msg:__LOC__ (B.peek_min b) None ; + check_insert_heap ~expect_heap_is_full:true 1 b ; + Assert.equal ~msg:__LOC__ (B.elements b) [] + +let test_heap_works () = + let b = B.create 10 in + let check_insert_list l = List.iter (fun v -> check_insert_heap v b) l in + let check_pop ~__LOC__ = + List.iter (fun v -> + let extracted = B.pop b in + B.Internal_for_tests.check_heap_invariant ~pp_elt:Format.pp_print_int b ; + Assert.equal + ~pp:Format.(pp_print_option pp_print_int) + ~msg:__LOC__ + extracted + (Some v)) + in + Assert.equal ~msg:__LOC__ (B.elements b) [] ; + Assert.equal ~msg:__LOC__ (B.pop b) None ; + Assert.equal ~msg:__LOC__ (B.peek_min b) None ; + check_insert_list [3; 2; 1] ; + Assert.equal ~msg:__LOC__ (B.elements b) [1; 2; 3] ; + Assert.equal ~msg:__LOC__ (B.peek_min b) (Some 1) ; + check_pop ~__LOC__ [1; 2; 3] ; + Assert.equal ~msg:__LOC__ (B.pop b) None ; + Assert.equal ~msg:__LOC__ (B.peek_min b) None ; + check_insert_list [10; 4; 8; 2; 6] ; + Assert.equal ~msg:__LOC__ (B.elements b) [2; 4; 6; 8; 10] ; + Assert.equal 
~msg:__LOC__ (B.peek_min b) (Some 2) ; + check_pop ~__LOC__ [2; 4] ; + Assert.equal ~msg:__LOC__ (B.elements b) [6; 8; 10] ; + Assert.equal ~msg:__LOC__ (B.peek_min b) (Some 6) ; + check_insert_list [9; 1; 7; 3; 5; 2; 4] ; + Assert.equal + ~pp:Format.(pp_print_list pp_print_int) + ~msg:__LOC__ + (B.elements b) + [1; 2; 3; 4; 5; 6; 7; 8; 9; 10] ; + check_insert_heap ~expect_heap_is_full:true 0 b ; + check_insert_heap ~expect_heap_is_full:true 11 b ; + Assert.is_true + ~loc:__LOC__ + (List.for_all (fun v -> B.mem v b) [10; 9; 8; 7; 6; 5; 4; 3; 2; 1]) ; + Assert.is_false ~loc:__LOC__ (B.mem 0 b) ; + Assert.is_false ~loc:__LOC__ (B.mem 11 b) ; + Assert.equal + ~msg:__LOC__ + (List.for_all + (fun v -> + let id = id v in + let v' = B.find_opt id b in + v' = Some v) + [10; 9; 8; 7; 6; 5; 4; 3; 2; 1]) + true ; + Assert.equal ~msg:__LOC__ (B.find_opt (id 0) b) None ; + Assert.equal ~msg:__LOC__ (B.find_opt (id 11) b) None ; + () + +let () = + Alcotest.run + ~__FILE__ + "stdlib" + [ + ( "Bounded_min_heap", + [ + ("create 0 works", `Quick, test_empty_works); + ("basic operation works", `Quick, test_heap_works); + ] ); + ] diff --git a/src/lib_stdlib/tezos_stdlib.ml b/src/lib_stdlib/tezos_stdlib.ml index 11ffe56c1fb9983a8fab38ef720347d7bcb49920..fb644e1bcb5571dfd5a27fc30d69903b39a8b820 100644 --- a/src/lib_stdlib/tezos_stdlib.ml +++ b/src/lib_stdlib/tezos_stdlib.ml @@ -17,6 +17,7 @@ module Bits = Bits module Bloomer = Bloomer module Bounded_heap = Bounded_heap +module Bounded_min_heap = Bounded_min_heap module Circular_buffer = Circular_buffer module Compare = Compare module FallbackArray = FallbackArray diff --git a/tezt/lib_tezos/sc_rollup_rpc.ml b/tezt/lib_tezos/sc_rollup_rpc.ml index 1d14fa9bdd854493317b2563911afe20608616ce..3bacd84fed24f591754a7f3675133068a5ec8cb5 100644 --- a/tezt/lib_tezos/sc_rollup_rpc.ml +++ b/tezt/lib_tezos/sc_rollup_rpc.ml @@ -243,17 +243,21 @@ let get_global_block_durable_state_value ?(block = "head") ~pvm_kind ~operation ["global"; "block"; block; 
"durable"; pvm_kind; op] (f operation) -let post_local_batcher_injection ?drop_duplicate ~messages () = +let post_local_batcher_injection ?order ?drop_duplicate ~messages () = let data = Data (`A (List.map (fun s -> `String Hex.(of_string s |> show)) messages)) in let query_string = - Option.map (fun b -> [("drop_duplicate", string_of_bool b)]) drop_duplicate + let order = Option.map (fun v -> ("order", string_of_int v)) order in + let drop_duplicate = + Option.map (fun b -> ("drop_duplicate", string_of_bool b)) drop_duplicate + in + Option.to_list order @ Option.to_list drop_duplicate in make POST ["local"; "batcher"; "injection"] - ?query_string + ~query_string ~data JSON.(fun json -> as_list json |> List.map as_string) diff --git a/tezt/lib_tezos/sc_rollup_rpc.mli b/tezt/lib_tezos/sc_rollup_rpc.mli index 8b83bdc3b68e5deb4b3ee6cb54f436c6d199cf3d..34e385512105b7493ff03c6f844820ad80a5df90 100644 --- a/tezt/lib_tezos/sc_rollup_rpc.mli +++ b/tezt/lib_tezos/sc_rollup_rpc.mli @@ -172,7 +172,11 @@ val get_global_block_durable_state_value : queue the rollup node's batcher and returns the list of message hashes injected. *) val post_local_batcher_injection : - ?drop_duplicate:bool -> messages:string list -> unit -> string list RPC_core.t + ?order:int -> + ?drop_duplicate:bool -> + messages:string list -> + unit -> + string list RPC_core.t (** RPC: [POST local/dal/batcher/injection] injects the given [messages] in the rollup node's DAL queue. *) diff --git a/tezt/tests/sc_rollup.ml b/tezt/tests/sc_rollup.ml index 323630b1f5f5bfe70423cf1c43922af24181e3a2..16cb16b66c8e4b7f45e7bdf2809b708d44e8a972 100644 --- a/tezt/tests/sc_rollup.ml +++ b/tezt/tests/sc_rollup.ml @@ -5794,11 +5794,198 @@ let test_multiple_batcher_key ~kind = ~error_msg:"Reused %L keys but should have rotated not reused any." 
; unit +let test_batcher_order_msgs ~kind = + test_full_scenario + { + tags = ["batcher"; "messages"; "order"]; + variant = None; + description = "rollup node - Batcher order message correctly"; + } + ~mode:Batcher + ~kind + ~operators:[(Sc_rollup_node.Batching, Constant.bootstrap1.alias)] + @@ fun _protocol rollup_node rollup_addr node client -> + let min_batch_elements = 10 in + let max_batch_elements = 20 in + let* _config = Sc_rollup_node.config_init rollup_node rollup_addr in + let () = + Sc_rollup_node.Config_file.update rollup_node + @@ JSON.update "batcher" + @@ fun json -> + let json = + JSON.put + ( "min_batch_elements", + JSON.annotate + ~origin:"min batch elements size" + (`Float (Int.to_float min_batch_elements)) ) + json + in + let json = + JSON.put + ( "max_batch_elements", + JSON.annotate + ~origin:"max batch elements size" + (`Float (Int.to_float max_batch_elements)) ) + json + in + json + in + + let inject_int_of_string ?order ?drop_duplicate messages = + Sc_rollup_node.RPC.call rollup_node + @@ Sc_rollup_rpc.post_local_batcher_injection + ?order + ?drop_duplicate + ~messages:(List.map string_of_int messages) + () + in + let wait_for_included_and_returns_msgs () = + let find_map_op_content op_content_json = + let kind = JSON.(op_content_json |-> "kind" |> as_string) in + if kind = "smart_rollup_add_messages" then + let msgs = + JSON.(op_content_json |-> "message" |> as_list |> List.map as_string) + in + Some msgs + else None + in + wait_for_included_and_map_ops_content + rollup_node + node + ~timeout:30. 
+ ~find_map_op_content + in + + let check_queue ~__LOC__ ~expected_queued_hashes ~expected_messages = + let* queued_hashes, queued_msgs = + let* queued_msgs = + Sc_rollup_node.RPC.call rollup_node + @@ Sc_rollup_rpc.get_local_batcher_queue () + in + return @@ List.split queued_msgs + in + Check.( + (List.sort String.compare queued_hashes + = List.sort_uniq String.compare expected_queued_hashes) + (list string) + ~__LOC__) + ~error_msg:"Queued msgs hashes %L was found but %R was expected" ; + + Check.( + (List.map int_of_string queued_msgs = expected_messages) + (list int) + ~__LOC__) + ~error_msg:"Queued msgs %L was found but %R was expected" ; + unit + in + + let bake_then_check_included_msgs ~__LOC__ ~nb_batches ~expected_messages = + let* level = Node.get_level node in + let wait_for_injected = + wait_until_n_batches_are_injected rollup_node ~nb_batches + in + let* () = Client.bake_for_and_wait client + and* () = wait_for_injected + and* _lvl = Sc_rollup_node.wait_for_level rollup_node (level + 1) in + let wait_for_included = wait_for_included_and_returns_msgs () in + let* () = Client.bake_for_and_wait client + and* msgs = wait_for_included + and* _lvl = Sc_rollup_node.wait_for_level rollup_node (level + 2) in + + Check.( + (List.map + (List.map (fun s -> int_of_string @@ Hex.to_string (`Hex s))) + msgs + = expected_messages) + (list (list int)) + ~__LOC__) + ~error_msg:"Included msgs %L was found but %R was expected" ; + unit + in + + let* level = Node.get_level node in + let* () = Sc_rollup_node.run ~event_level:`Debug rollup_node rollup_addr [] in + let* _ = Sc_rollup_node.wait_for_level rollup_node level in + + (* To make sure we are all bootstrapped, might be unused *) + Log.info "injecting 3 messages with order specified (e.g. 
[10; 20; 30])" ;
+  let* hashes_1 =
+    fold 3 [] (fun i acc ->
+        let order = 10 * (i + 1) in
+        let* hashes = inject_int_of_string ~order [order] in
+        return @@ hashes @ acc)
+  in
+  Log.info
+    "injecting 3 messages with order specified in reverse (e.g. [ 30; 20; 10])" ;
+  let* hashes_2 =
+    fold 3 [] (fun i acc ->
+        let order = 30 - (10 * i) in
+        let* hashes = inject_int_of_string ~order [order] in
+        return @@ hashes @ acc)
+  in
+  Log.info "injecting 1 message with no order specified" ;
+  let* hashes_3 = inject_int_of_string [31] in
+  Log.info "injecting 2 times the same message with order 11 and ~drop_duplicate" ;
+  let* hashes_4 = inject_int_of_string ~order:11 ~drop_duplicate:true [1; 1] in
+  Log.info "Reinjecting the same message with order 0" ;
+  let* hashes_5 = inject_int_of_string ~order:0 ~drop_duplicate:true [1] in
+  let expected_queued_hashes =
+    hashes_1 @ hashes_2 @ hashes_3 @ hashes_4 @ hashes_5
+  in
+
+  let expected_messages = [1; 10; 10; 20; 20; 30; 30; 31] in
+  let* () = check_queue ~__LOC__ ~expected_queued_hashes ~expected_messages in
+  let* () =
+    bake_then_check_included_msgs
+      ~__LOC__
+      ~nb_batches:1
+      ~expected_messages:[expected_messages]
+  in
+
+  Log.info
+    "Injecting lots of messages to make sure order is preserved over batches, \
+     e.g. first batches contains elements smaller than second batch." ;
+  let half_min_batch = min_batch_elements / 2 in
+  let* hashes_1 =
+    (* [0; 10; ... 40] *)
+    inject_int_of_string @@ List.init half_min_batch (fun i -> i * 10)
+  in
+  let* hashes_2 =
+    (* [50; 60; ... 80] *)
+    inject_int_of_string
+    @@ List.init (half_min_batch - 1) (fun i -> (half_min_batch + i) * 10)
+  in
+  let expected_queued_hashes = hashes_1 @ hashes_2 in
+  let expected_messages =
+    List.init (min_batch_elements - 1) (fun i -> i * 10)
+  in
+  let* () = check_queue ~__LOC__ ~expected_queued_hashes ~expected_messages in
+
+  (* this will trigger batch production *)
+  let* _hashes_4 =
+    (* [90; 100; ... 
290] *)
+    inject_int_of_string
+      (((min_batch_elements - 1) * 10)
+      :: List.init max_batch_elements (fun i -> (min_batch_elements + i) * 10))
+  in
+  let expected_messages =
+    [
+      (* [0; 10; ... 190] *)
+      List.init max_batch_elements (fun i -> i * 10);
+      (* [200; 210; ... 290] *)
+      List.init min_batch_elements (fun i -> (max_batch_elements + i) * 10);
+    ]
+  in
+  let* () =
+    bake_then_check_included_msgs ~__LOC__ ~nb_batches:1 ~expected_messages
+  in
+  unit
+
 (** Injector only uses key that have no operation in the mempool currently.
     1. Batcher setup:
     - 5 signers in the batcher.
-    - 10 batches to inject.
+    - 10 batches to inject.
     2. First Block:
     - Mempool:
       - Contains enough batches to fill the block from users with high fees.
@@ -5923,8 +6110,8 @@ let test_injector_uses_available_keys ~kind =
   let* _lvl = Client.bake_for_and_wait client in
   let* () =
     inject_n_msgs_batches_in_rollup_node
-      (* we inject 2 times the number of operators so the rollup node
-         must inject in two salvos all the batches. *)
+      (* we inject 2 times the number of operators so the rollup node
+         must inject in two salvos all the batches. 
 *)
       ~nb_of_batches:(2 * nb_operators)
       ~msg_per_batch
       ~msg_size
@@ -5968,7 +6155,7 @@ let test_batcher_dont_reinject_already_injected_messages ~kind =
   }
   @@ fun _protocol rollup_node rollup_addr _node client ->
   Log.info "Batcher setup" ;
-  let inject ~drop_duplicate ~messages =
+  let inject ~drop_duplicate ~messages =
     Sc_rollup_node.RPC.call rollup_node
    @@ Sc_rollup_rpc.post_local_batcher_injection ~drop_duplicate ~messages ()
   in
@@ -6014,7 +6201,9 @@
     let* () =
       if inject_msgs then (
-        let* message_ids = inject ~drop_duplicate:true ~messages in
+        let* message_ids =
+          inject ~drop_duplicate:true ~messages
+        in
         let message_id = List.nth message_ids 0 in
         Check.(
           (message_id = check_message_id)
@@ -6045,9 +6234,13 @@
         [Injector_retention_period 1]
   and* _lvl = Sc_rollup_node.wait_sync rollup_node ~timeout:10. in
 
-  let* messages_id_first = inject ~drop_duplicate:false ~messages in
+  let* messages_id_first =
+    inject ~drop_duplicate:false ~messages
+  in
   let* () = check_batcher_queue_size ~__LOC__ ~expected_size:2 () in
-  let* messages_id_second = inject ~drop_duplicate:false ~messages in
+  let* messages_id_second =
+    inject ~drop_duplicate:false ~messages
+  in
   let* () = check_batcher_queue_size ~__LOC__ ~expected_size:4 () in
   Check.(
     (messages_id_first <> messages_id_second)
     (list string)
     ~error_msg:"first messages batch id %L must be different than second %R") ;
 
-  let* first_message_ids = inject ~drop_duplicate:true ~messages in
+  let* first_message_ids =
+    inject ~drop_duplicate:true ~messages
+  in
   let* () = check_batcher_queue_size ~__LOC__ ~expected_size:5 () in
 
   let check_message_id = List.nth first_message_ids 0 in
@@ -6068,7 +6263,9 @@ let test_batcher_dont_reinject_already_injected_messages ~kind =
       "messages id with check on must be equal (given %L, expected %R).") ;
 
   Log.info "Resubmiting the same message dont add it to the queue." ;
-  let* second_message_ids = inject ~drop_duplicate:true ~messages in
+  let* second_message_ids =
+    inject ~drop_duplicate:true ~messages
+  in
   let* () = check_batcher_queue_size ~__LOC__ ~expected_size:5 () in
   Check.(
     (first_message_ids = second_message_ids)
@@ -6439,6 +6636,7 @@ let register_protocol_independent () =
   test_accuser protocols ;
   test_bailout_refutation protocols ;
   test_multiple_batcher_key ~kind protocols ;
+  test_batcher_order_msgs ~kind protocols ;
  test_injector_uses_available_keys protocols ~kind ;
   test_batcher_dont_reinject_already_injected_messages protocols ~kind ;
   test_private_rollup_node_publish_in_whitelist protocols ;