From 5b293a415b1a9ac21cee31fd8bfd54363ad50b0e Mon Sep 17 00:00:00 2001 From: Sylvain Ribstein Date: Wed, 20 Nov 2024 19:56:05 +0100 Subject: [PATCH 1/6] stdlib: add bounded_min_heap structure --- manifest/product_octez.ml | 1 + src/lib_stdlib/bounded_min_heap.ml | 244 +++++++++++++++++++ src/lib_stdlib/bounded_min_heap.mli | 73 ++++++ src/lib_stdlib/test/dune | 1 + src/lib_stdlib/test/test_bounded_min_heap.ml | 124 ++++++++++ src/lib_stdlib/tezos_stdlib.ml | 1 + 6 files changed, 444 insertions(+) create mode 100644 src/lib_stdlib/bounded_min_heap.ml create mode 100644 src/lib_stdlib/bounded_min_heap.mli create mode 100644 src/lib_stdlib/test/test_bounded_min_heap.ml diff --git a/manifest/product_octez.ml b/manifest/product_octez.ml index 662c86515127..8333ff930391 100644 --- a/manifest/product_octez.ml +++ b/manifest/product_octez.ml @@ -245,6 +245,7 @@ let _octez_stdlib_tests = "test_bits"; "test_tzList"; "test_bounded_heap"; + "Test_bounded_min_heap"; "test_tzString"; "test_fallbackArray"; "test_functionalArray"; diff --git a/src/lib_stdlib/bounded_min_heap.ml b/src/lib_stdlib/bounded_min_heap.ml new file mode 100644 index 000000000000..689a8e651a79 --- /dev/null +++ b/src/lib_stdlib/bounded_min_heap.ml @@ -0,0 +1,244 @@ +(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* Copyright (c) 2024 Nomadic Labs *) +(* *) +(*****************************************************************************) + +(* This heap structure could maybe be improved by maintaining a start + index. This would reduce the complexity of `pop` to O(1) but would + require more complex bubble_ functions. Also, it could be + interesting to add `pop_n` and `peek_n`; this might require changing the + type of `t`, which is why they were not introduced at first, to keep + the implementation simple.
*) + +module Make + (Id : Stdlib.Hashtbl.HashedType) + (E : sig + include Stdlib.Set.OrderedType + + val id : t -> Id.t + end) = +struct + module Hash_map = Hashtbl.Make (Id) + + (* Invariants : + - all indices between 0 and size (excluded) have data: + size = + Array.fold_left + (fun count e -> if Option.is_some e then count + 1 else count) 0 data + - the heap property itself: + let P(i, j) = + 0 <= i < size /\ 0 <= j < size => + data.(j) = None \/ E.compare data.(i) data.(j) <= 0 + in \forall 0 <= i < size, P(i, 2 * i + 1) /\ P(i, 2 * (i + 1)) + - the size of [hash_map] is equal to [size] + - the hash_map is the map from id to indice: + \forall 0 <= i < size, hash_map( id data.(i) ) = i + *) + type t = { + mutable size : int; + data : E.t option array; + hash_map : int Hash_map.t; + } + + let create capacity = + if capacity < 0 || capacity > Sys.max_array_length then + invalid_arg + (Format.sprintf "Bounded_min_heap.Make(_).create. capacity %d" capacity) ; + { + size = 0; + data = Array.make capacity None; + hash_map = Hash_map.create capacity; + } + + let get_data data i = + match data.(i) with + | Some elt -> elt + | None -> + (* only used locally to get data we are sure exists*) + assert false + + let swap i j t = + let j_elt = t.data.(j) in + let i_elt = t.data.(i) in + let j_id = Option.map E.id j_elt in + let i_id = Option.map E.id i_elt in + + (* replace in heap structure *) + t.data.(j) <- i_elt ; + t.data.(i) <- j_elt ; + + (* replace in id -> index structure *) + Option.iter (fun j_id -> Hash_map.replace t.hash_map j_id i) j_id ; + Option.iter (fun i_id -> Hash_map.replace t.hash_map i_id j) i_id + + let parent i = (i - 1) / 2 + + let left_child i = (2 * i) + 1 + + let right_child i = 2 * (i + 1) + + let childrens i = (left_child i, right_child i) + + (* Bubble up the value located at index [i] such until the t + property is locally fulfilled *) + let bubble_up i t = + let rec loop i = + let p = parent i in + if E.compare (get_data t.data i) (get_data 
t.data p) < 0 then ( + swap i p t ; + loop p) + in + loop i + + (* Bubble down the value at index [i] until the t property is + locally fulfilled, aka the value at index [i] is smaller than the + one of its two children *) + let bubble_down i ({size; data; _} as t) = + let rec loop i = + let left_index, right_index = childrens i in + let value = get_data data i in + if left_index < size then ( + assert (data.(left_index) <> None) ; + let left_value = get_data data left_index in + if right_index < size then ( + (* swap the value with the smallest of its two children *) + assert (data.(right_index) <> None) ; + let right_value = get_data data right_index in + if E.compare right_value left_value < 0 then ( + if E.compare value right_value > 0 then ( + swap i right_index t ; + loop right_index)) + else if E.compare value left_value > 0 then ( + swap i left_index t ; + loop left_index)) + else if + E.compare value left_value > 0 + (* swap the value with its left child, since the right one does not exist *) + then ( + swap i left_index t ; + loop left_index)) + in + loop i + + let add t id x pos = + t.data.(pos) <- Some x ; + t.size <- pos + 1 ; + Hash_map.replace t.hash_map id pos ; + bubble_up pos t + + let insert x t = + let id = E.id x in + let exits_elt_i_opt = Hash_map.find_opt t.hash_map id in + match exits_elt_i_opt with + (* Element already exists, we replace it if the order is + inferior. 
*) + | Some elt_i -> + let elt = get_data t.data elt_i in + if E.compare x elt < 0 then ( + t.data.(elt_i) <- Some x ; + bubble_up elt_i t) ; + Ok () + | None -> + let pos = t.size in + let data = t.data in + if pos < Array.length data then ( + add t id x pos ; + Ok ()) + else Error "no space left in heap" + + let pop t = + if t.size = 0 then None + else + let min_elem = get_data t.data 0 in + t.data.(0) <- None ; + t.size <- t.size - 1 ; + if t.size > 0 then ( + swap 0 t.size t ; + bubble_down 0 t) ; + let id = E.id min_elem in + Hash_map.remove t.hash_map id ; + assert (Hash_map.length t.hash_map = t.size) ; + Some min_elem + + let peek_min t = if t.size = 0 then None else t.data.(0) + + let elements t = + let a = Array.init t.size (get_data t.data) in + Array.sort E.compare a ; + Array.to_list a + + let length {size; _} = size + + let mem elt t = + let id = E.id elt in + Hash_map.mem t.hash_map id + + let find_opt id t = + let elt_i = Hash_map.find_opt t.hash_map id in + Option.map (get_data t.data) elt_i + + module Internal_for_tests = struct + let check_heap_invariant ~pp_elt h = + let get_data_opt i = + if i < Array.length h.data then h.data.(i) else None + in + let is_greater_or_none x i = (* true iff [x] is present and slot [i] is empty or holds an element not smaller than [x] *) + match (x, get_data_opt i) with + | Some _x, None -> true + | Some x, Some l -> E.compare x l <= 0 (* equal priorities with distinct ids are valid *) + | None, Some _ | None, None -> false (* impossible case *) + in + let rec check_invariant_for_all_index i = + if i = 0 then () + else + let index = i - 1 in + let data_opt = h.data.(index) in + let id_opt = Option.map E.id data_opt in + let index_opt = Option.bind id_opt (Hash_map.find_opt h.hash_map) (* a missing binding is reported below instead of raising Not_found *) in + let has_data = Option.is_some data_opt in + let has_map_index = Option.is_some index_opt in + let map_index_is_correct = + Option.value ~default:false + @@ Option.map (Int.equal index) index_opt + in + let left_child_index = left_child index in + let right_child_index = right_child index in + let left_child_is_none_or_inf = + is_greater_or_none data_opt left_child_index + in
let right_child_is_none_or_inf = + is_greater_or_none data_opt right_child_index + in + let correct_map_size = Hash_map.length h.hash_map = h.size in + let is_correct = + has_data && has_map_index && map_index_is_correct + && left_child_is_none_or_inf && right_child_is_none_or_inf + && correct_map_size + in + if is_correct then check_invariant_for_all_index (i - 1) + else + failwith + (Format.asprintf + "invariant for index %d are invalid:@.has_data: \ + %b@.has_map_index: %b@.map_index_is_correct: %b@.left_child \ + : (valid: %b, %d, %a)@.right_child: (valid: %b, %d, \ + %a)@.correct_map_size: %b@." + index + has_data + has_map_index + map_index_is_correct + left_child_is_none_or_inf + left_child_index + Format.(pp_print_option pp_elt) + (get_data_opt left_child_index) + right_child_is_none_or_inf + right_child_index + Format.(pp_print_option pp_elt) + (get_data_opt right_child_index) + correct_map_size) + in + check_invariant_for_all_index h.size + end +end diff --git a/src/lib_stdlib/bounded_min_heap.mli b/src/lib_stdlib/bounded_min_heap.mli new file mode 100644 index 000000000000..a65da7ad3f39 --- /dev/null +++ b/src/lib_stdlib/bounded_min_heap.mli @@ -0,0 +1,73 @@ +(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* Copyright (c) 2024 Nomadic Labs *) +(* *) +(*****************************************************************************) + +(** Bounded min-heap: keep only the [n] smallest elements. *) + +(** + [Make (ID) (E)] creates a min heap for element of type E.t using + the [E.compare] function. The heap is bounded and adding more + elements than its capacity will fail. The module [ID] is used to + check if an element already exists at insertion. 
+*) +module Make + (Id : Stdlib.Hashtbl.HashedType) + (E : sig + include Stdlib.Set.OrderedType + + val id : t -> Id.t + end) : sig + module Hash_map : Hashtbl.S with type key := Id.t + + type t + + (** [create capacity] creates an empty heap holding at most + [capacity] elements. + + Raises [Invalid_argument] if [capacity < 0] or [capacity > Sys.max_array_length]. *) + val create : int -> t + + (** [insert e h] adds element [e] to the heap [h]. It returns [Error _] if + [h] is full and contains no element with the same id as [e]. + + Worst-case complexity: O(log n) where n is the capacity of the + heap. + + If there is an element [e'] with [E.id e' = E.id e], it will be + replaced iff [E.compare e e' < 0]; otherwise [h] is left unchanged. *) + val insert : E.t -> t -> (unit, string) result + + (** [pop h] removes and returns the smallest element of [h], or + [None] if [h] is empty. *) + val pop : t -> E.t option + + (** [peek_min h] returns, without removing it, the smallest element of + the heap [h], or [None] if [h] is empty. *) + val peek_min : t -> E.t option + + (** [elements h] returns the contents of [h] as a sorted + list in increasing order according to [E.compare]. + + Worst-case complexity: O(n log n) where n is the size of the heap. + *) + val elements : t -> E.t list + + (** [length h] is the number of elements held by [h]. *) + val length : t -> int + + (** [mem elt h] returns [true] iff [h] contains an element with the same + id as [elt] (according to [E.id]). *) + val mem : E.t -> t -> bool + + (** [find_opt id h] returns the element [e] of [h] such that [E.id e = + id], or [None] if there is none.
*) + val find_opt : Id.t -> t -> E.t option + + module Internal_for_tests : sig + val check_heap_invariant : + pp_elt:(Format.formatter -> E.t -> unit) -> t -> unit + end +end diff --git a/src/lib_stdlib/test/dune b/src/lib_stdlib/test/dune index a47fa826e1ba..28f60c028b66 100644 --- a/src/lib_stdlib/test/dune +++ b/src/lib_stdlib/test/dune @@ -23,6 +23,7 @@ test_bits test_tzList test_bounded_heap + Test_bounded_min_heap test_tzString test_fallbackArray test_functionalArray diff --git a/src/lib_stdlib/test/test_bounded_min_heap.ml b/src/lib_stdlib/test/test_bounded_min_heap.ml new file mode 100644 index 000000000000..e6fb0d4b1250 --- /dev/null +++ b/src/lib_stdlib/test/test_bounded_min_heap.ml @@ -0,0 +1,124 @@ +(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* Copyright (c) 2024 Nomadic Labs *) +(* *) +(*****************************************************************************) + +(** Testing + ------- + Component: Standard library + Invocation: dune exec src/lib_stdlib/test/main.exe \ + -- -f src/lib_stdlib/test/test_bounded_min_heap.ml + Subject: Unit tests bounded min heap +*) + +module Assert = Assert + +module Id = struct + include String + + let hash = Stdlib.Hashtbl.hash +end + +let id i = string_of_int i + +module B = + Bounded_min_heap.Make + (Id) + (struct + include Int + + let id = id + end) + +let take_nth_smallest n l = + TzList.take_n n (List.sort (fun x y -> Compare.Int.compare x y) l) + +(* At least 2 elements, since we'll create a bounded set of size #elements / 2 + *) +let size = QCheck2.Gen.int_range 2 1000 + +let check_insert_heap ?(expect_heap_is_full = false) v b = + let res = B.insert v b in + let () = + if expect_heap_is_full then + let error = "no space left in heap" in + Assert.equal ~msg:__LOC__ res (Error error) + else Assert.equal ~msg:__LOC__ res (Ok ()) + in + B.Internal_for_tests.check_heap_invariant ~pp_elt:Format.pp_print_int b + +let test_empty_works 
() = + let b = B.create 0 in + Assert.equal ~msg:__LOC__ (B.elements b) [] ; + Assert.equal ~msg:__LOC__ (B.pop b) None ; + Assert.equal ~msg:__LOC__ (B.peek_min b) None ; + check_insert_heap ~expect_heap_is_full:true 1 b ; + Assert.equal ~msg:__LOC__ (B.elements b) [] + +let test_heap_works () = + let b = B.create 10 in + let check_insert_list l = List.iter (fun v -> check_insert_heap v b) l in + let check_pop ~__LOC__ = + List.iter (fun v -> + let extracted = B.pop b in + B.Internal_for_tests.check_heap_invariant ~pp_elt:Format.pp_print_int b ; + Assert.equal + ~pp:Format.(pp_print_option pp_print_int) + ~msg:__LOC__ + extracted + (Some v)) + in + Assert.equal ~msg:__LOC__ (B.elements b) [] ; + Assert.equal ~msg:__LOC__ (B.pop b) None ; + Assert.equal ~msg:__LOC__ (B.peek_min b) None ; + check_insert_list [3; 2; 1] ; + Assert.equal ~msg:__LOC__ (B.elements b) [1; 2; 3] ; + Assert.equal ~msg:__LOC__ (B.peek_min b) (Some 1) ; + check_pop ~__LOC__ [1; 2; 3] ; + Assert.equal ~msg:__LOC__ (B.pop b) None ; + Assert.equal ~msg:__LOC__ (B.peek_min b) None ; + check_insert_list [10; 4; 8; 2; 6] ; + Assert.equal ~msg:__LOC__ (B.elements b) [2; 4; 6; 8; 10] ; + Assert.equal ~msg:__LOC__ (B.peek_min b) (Some 2) ; + check_pop ~__LOC__ [2; 4] ; + Assert.equal ~msg:__LOC__ (B.elements b) [6; 8; 10] ; + Assert.equal ~msg:__LOC__ (B.peek_min b) (Some 6) ; + check_insert_list [9; 1; 7; 3; 5; 2; 4] ; + Assert.equal + ~pp:Format.(pp_print_list pp_print_int) + ~msg:__LOC__ + (B.elements b) + [1; 2; 3; 4; 5; 6; 7; 8; 9; 10] ; + check_insert_heap ~expect_heap_is_full:true 0 b ; + check_insert_heap ~expect_heap_is_full:true 11 b ; + Assert.is_true + ~loc:__LOC__ + (List.for_all (fun v -> B.mem v b) [10; 9; 8; 7; 6; 5; 4; 3; 2; 1]) ; + Assert.is_false ~loc:__LOC__ (B.mem 0 b) ; + Assert.is_false ~loc:__LOC__ (B.mem 11 b) ; + Assert.equal + ~msg:__LOC__ + (List.for_all + (fun v -> + let id = id v in + let v' = B.find_opt id b in + v' = Some v) + [10; 9; 8; 7; 6; 5; 4; 3; 2; 1]) + true ; + 
Assert.equal ~msg:__LOC__ (B.find_opt (id 0) b) None ; + Assert.equal ~msg:__LOC__ (B.find_opt (id 11) b) None ; + () + +let () = + Alcotest.run + ~__FILE__ + "stdlib" + [ + ( "Bounded_min_heap", + [ + ("create 0 works", `Quick, test_empty_works); + ("basic operation works", `Quick, test_heap_works); + ] ); + ] diff --git a/src/lib_stdlib/tezos_stdlib.ml b/src/lib_stdlib/tezos_stdlib.ml index 11ffe56c1fb9..fb644e1bcb55 100644 --- a/src/lib_stdlib/tezos_stdlib.ml +++ b/src/lib_stdlib/tezos_stdlib.ml @@ -17,6 +17,7 @@ module Bits = Bits module Bloomer = Bloomer module Bounded_heap = Bounded_heap +module Bounded_min_heap = Bounded_min_heap module Circular_buffer = Circular_buffer module Compare = Compare module FallbackArray = FallbackArray -- GitLab From 1ca5291edb3d82f62bf69089a2562da717432f79 Mon Sep 17 00:00:00 2001 From: Sylvain Ribstein Date: Tue, 19 Nov 2024 16:36:27 +0100 Subject: [PATCH 2/6] soru/node: add heap for the batcher conflict when cherry-picking 54966598c5896a695e6dbb0e37c74e663f1bf65b: - At L1743 ``` Metrics.Batcher.set_inject_time @@ Ptime.diff inject_timestamp get_timestamp ; ``` is ``` Metrics.wrap (fun () -> let inject_timestamp = Time.System.now () in Metrics.Batcher.set_inject_time @@ Ptime.diff inject_timestamp get_timestamp) ; ``` I left current v21 version --- CHANGES.rst | 6 + src/lib_smart_rollup/l2_message.ml | 86 +++++-- src/lib_smart_rollup/l2_message.mli | 6 +- src/lib_smart_rollup/rollup_node_services.ml | 25 +- src/lib_smart_rollup_node/batcher.ml | 230 +++++++++--------- src/lib_smart_rollup_node/batcher.mli | 17 +- .../batcher_worker_types.ml | 14 +- .../batcher_worker_types.mli | 1 + src/lib_smart_rollup_node/rpc_directory.ml | 12 +- tezt/lib_tezos/sc_rollup_rpc.ml | 10 +- tezt/lib_tezos/sc_rollup_rpc.mli | 6 +- tezt/tests/sc_rollup.ml | 216 +++++++++++++++- 12 files changed, 452 insertions(+), 177 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index 14c1d9d1d53e..9eebc3cb71f1 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ 
-52,6 +52,12 @@ Docker Images Smart Rollup node ----------------- +- Updated batcher with a new order structure. The RPC + ``/local/batcher/injection`` now has a new query argument + possibility ``"order": <nat>``. The batcher will batch the + received messages with the following priority: first the messages with + an ``order``, by ascending ``order``, then the others by order of arrival. (MR :gl:`!15672`) + - Updated RPC ``/local/batcher/injection`` with a new query argument possibility. When the rpc contains ``"drop_duplicate": true`` then the batcher will drop the messages that were already injected with a diff --git a/src/lib_smart_rollup/l2_message.ml b/src/lib_smart_rollup/l2_message.ml index 7dac51c005b4..dcf17e7ec224 100644 --- a/src/lib_smart_rollup/l2_message.ml +++ b/src/lib_smart_rollup/l2_message.ml @@ -23,36 +23,11 @@ (* *) (*****************************************************************************) -type t = { - counter : Z.t option; - (** Each message is given a unique counter to allow for the batcher to - receive multiple identical messages. *) - content : string; (** The actual content of the message.
*) -} - let content_encoding = let open Data_encoding in def "sc_l2_message" ~description:"A hex encoded smart rollup message" @@ string' Hex -let encoding = - let open Data_encoding in - conv - (fun {counter; content} -> (counter, content)) - (fun (counter, content) -> {counter; content}) - @@ obj2 (opt "counter" z) (req "content" content_encoding) - -let make = - let counter = ref Z.zero in - fun ~unique content -> - if unique then ( - let m = {content; counter = Some !counter} in - counter := Z.succ !counter ; - m) - else {content; counter = None} - -let content m = m.content - module Id = Tezos_crypto.Blake2B.Make (Tezos_crypto.Base58) @@ -71,4 +46,63 @@ let () = type id = Id.t -let id t = Id.hash_bytes [Data_encoding.Binary.to_bytes_exn encoding t] +let id_encoding = + let open Data_encoding in + obj2 (opt "counter" z) (req "content" content_encoding) + +let id ?counter content = + Id.hash_bytes + [Data_encoding.Binary.to_bytes_exn id_encoding (counter, content)] + +type t = {order : Z.t option; counter : Z.t; id : Id.t; content : string} + +let make = + let counter = ref Z.zero in + fun ?order ~unique content -> + let counter_for_id = if unique then Some !counter else None in + let id = id ?counter:counter_for_id content in + let m = {order; counter = !counter; id; content} in + counter := Z.succ !counter ; + m + +let id t = t.id + +let content t = t.content + +let counter t = t.counter +(* + The encoding is shaped to stay backward compatible with the previous encoding: + + { + "id": <id>, + "message": { + "content": <content>, + "counter": <counter> + } + } + + *) + +let encoding = + let open Data_encoding in + conv + (fun {order; counter; id; content} -> (order, id, (content, counter))) + (fun (order, id, (content, counter)) -> {order; counter; id; content}) + @@ obj3 + (opt "order" n) + (req "id" Id.encoding) + (req "message" (obj2 (req "content" content_encoding) (req "counter" n))) + +(* Messages are compared as follows: messages with an `order` (a + priority set by the caller) come first, by ascending `order`;
+ then come messages without one, in registration (counter) order. + Messages with an equal `order` are also ordered by counter. *) +let compare msg1 msg2 = + let counter_cmp () = Z.compare msg1.counter msg2.counter in + match (msg1.order, msg2.order) with + | Some o1, Some o2 -> + let cmp = Z.compare o1 o2 in + if cmp = 0 then counter_cmp () else cmp + | None, None -> counter_cmp () + | Some _p, _ -> -1 + | _, Some _p -> 1 diff --git a/src/lib_smart_rollup/l2_message.mli b/src/lib_smart_rollup/l2_message.mli index f2552671e3f6..e08e22926aca 100644 --- a/src/lib_smart_rollup/l2_message.mli +++ b/src/lib_smart_rollup/l2_message.mli @@ -31,7 +31,7 @@ type t (multiple identical calls to [make] will return different values) because it will be given a unique id. Using [unique = false], on the contrary, makes the call idempotent.
*) @@ -50,3 +50,7 @@ val content_encoding : string Data_encoding.t val encoding : t Data_encoding.t val id : t -> Id.t + +val counter : t -> Z.t + +val compare : t -> t -> int diff --git a/src/lib_smart_rollup/rollup_node_services.ml b/src/lib_smart_rollup/rollup_node_services.ml index 5f3d080344b5..ca4e824ff0ff 100644 --- a/src/lib_smart_rollup/rollup_node_services.ml +++ b/src/lib_smart_rollup/rollup_node_services.ml @@ -219,8 +219,7 @@ module Encodings = struct (req "message_index" int31) (opt "message" Outbox_message.summary_encoding)))) - let queued_message = - obj2 (req "id" L2_message.Id.encoding) (req "message" L2_message.encoding) + let queued_message = L2_message.encoding let batcher_queue = list queued_message @@ -804,15 +803,38 @@ module Local = struct (path / "gc_info") let injection = - let query_drop_duplicate : bool Tezos_rpc.Query.t = + let z_rpc_arg = + Resto.Arg.make + ~name:"z" + ~destruct:(fun s -> + (* Orders are encoded with [Data_encoding.n]: reject negative or + malformed integers here rather than letting [Z.of_string] raise. *) + let invalid () = + Error (Format.sprintf "Invalid order %S: expected a natural number" s) + in + match Z.of_string s with + | z when Z.sign z >= 0 -> Ok z + | _ -> invalid () + | exception Invalid_argument _ -> invalid ()) + ~construct:Z.to_string + () + in + let query = let open Tezos_rpc.Query in - query Fun.id - |+ field "drop_duplicate" Tezos_rpc.Arg.bool false Fun.id + query (fun order drop_duplicate -> + object + method order = order + + method drop_duplicate = drop_duplicate + end) + |+ opt_field "order" z_rpc_arg (fun q -> q#order) + |+ field "drop_duplicate" Tezos_rpc.Arg.bool false (fun q -> + q#drop_duplicate) |> seal in Tezos_rpc.Service.post_service ~description:"Inject messages in the batcher's queue" - ~query:query_drop_duplicate + ~query ~input: Data_encoding.( def diff --git a/src/lib_smart_rollup_node/batcher.ml b/src/lib_smart_rollup_node/batcher.ml index 88fa5154cc7e..83b137fcf361 100644 --- a/src/lib_smart_rollup_node/batcher.ml +++ b/src/lib_smart_rollup_node/batcher.ml @@ -24,7 +24,7 @@ (*****************************************************************************) open Batcher_worker_types -module Message_queue = Hash_queue.Make (L2_message.Id) (L2_message) +module Message_heap = Bounded_min_heap.Make (L2_message.Id)
(L2_message) module Batcher_events = Batcher_events.Declare (struct let worker_name = "batcher" @@ -40,7 +40,7 @@ type status = Pending_batch | Batched of Injector.Inj_operation.id type state = { node_ctxt : Node_context.ro; - messages : Message_queue.t; + messages_heap : Message_heap.t; batched : Batched_messages.t; mutable plugin : (module Protocol_plugin_sig.S); } @@ -81,105 +81,87 @@ let max_batch_size {node_ctxt; plugin; _} = ~default:Plugin.Batcher_constants.protocol_max_batch_size let get_batches state ~only_full = - let ( current_rev_batch, - current_batch_size, - current_batch_elements, - full_batches ) = - Message_queue.fold - (fun msg_id - message - ( current_rev_batch, - current_batch_size, - current_batch_elements, - full_batches ) -> + let max_batch_size = max_batch_size state in + let max_batch_elements = state.node_ctxt.config.batcher.max_batch_elements in + let heap = state.messages_heap in + let add_batch rev_batch rev_batches = + if List.is_empty rev_batch then rev_batches + else + let batch = List.rev rev_batch in + batch :: rev_batches + in + let rec pop_until_enough (rev_batch, batch_size, batch_elements) rev_batches = + let message = Message_heap.peek_min heap in + match message with + | None -> add_batch rev_batch rev_batches + | Some message -> let size = message_size (L2_message.content message) in - let new_batch_size = current_batch_size + size in - let new_batch_elements = current_batch_elements + 1 in + let new_batch_size = batch_size + size in + let new_batch_elements = batch_elements + 1 in if - new_batch_size <= max_batch_size state - && new_batch_elements - <= state.node_ctxt.config.batcher.max_batch_elements + List.is_empty rev_batch || (new_batch_size <= max_batch_size + && new_batch_elements <= max_batch_elements) then + (* pop the message as we only peeked it. A message that does not fit even in an empty batch is still taken, alone, otherwise it would be peeked again forever. *) + let _message = Message_heap.pop heap in (* We can add the message to the current batch because we are still within the bounds.
*) - ( (msg_id, message) :: current_rev_batch, - new_batch_size, - new_batch_elements, - full_batches ) + pop_until_enough + (message :: rev_batch, new_batch_size, new_batch_elements) + rev_batches else - (* The batch augmented with the message would be too big but it is - below the limit without it. We finalize the current batch and - create a new one for the message. NOTE: Messages in the queue are - always < [state.conf.max_batch_size] because {!on_register} only - accepts those. *) - let batch = List.rev current_rev_batch in - ([(msg_id, message)], size, 1, batch :: full_batches)) - state.messages - ([], 0, 0, []) - in - let batches = + (* we haven't pop the message, so we can safely call + continue_or_stop. *) + continue_or_stop (add_batch rev_batch rev_batches) + and continue_or_stop rev_batches = if - (not only_full) - || current_batch_size >= state.node_ctxt.config.batcher.min_batch_size - && current_batch_elements - >= state.node_ctxt.config.batcher.min_batch_elements - then - (* We have enough to make a batch with the last non-full batch. 
*) - List.rev current_rev_batch :: full_batches - else full_batches + only_full + && Message_heap.length heap + < state.node_ctxt.config.batcher.min_batch_elements + then rev_batches + else pop_until_enough ([], 0, 0) rev_batches in - List.fold_left - (fun (batches, to_remove) -> function - | [] -> (batches, to_remove) - | batch -> - let msg_hashes, batch = List.split batch in - let to_remove = List.rev_append msg_hashes to_remove in - (batch :: batches, to_remove)) - ([], []) - batches + continue_or_stop [] |> List.rev let produce_batches state ~only_full = let open Lwt_result_syntax in let start_timestamp = Time.System.now () in - let batches, to_remove = get_batches state ~only_full in + let batches = get_batches state ~only_full in let get_timestamp = Time.System.now () in + let nb_messages_batched = + List.fold_left (fun len l -> len + List.length l) 0 batches + in Metrics.wrap (fun () -> Metrics.Batcher.set_messages_queue_size - @@ Message_queue.length state.messages ; - Metrics.Batcher.set_messages_size @@ List.length to_remove ; + @@ Message_heap.length state.messages_heap ; + Metrics.Batcher.set_messages_size @@ nb_messages_batched ; Metrics.Batcher.set_batches_size @@ List.length batches ; Metrics.Batcher.set_get_time @@ Ptime.diff get_timestamp start_timestamp) ; - match batches with - | [] -> return_unit - | _ -> - let* () = - Metrics.wrap_lwt (fun () -> - Metrics.Batcher.set_last_batch_time start_timestamp ; - let* block = Node_context.last_processed_head_opt state.node_ctxt in - let () = - match block with - | Some block -> - Metrics.Batcher.set_last_batch_level block.header.level - | None -> () - in - return_unit) - in - let* () = inject_batches state batches in - let*! 
() = - Batcher_events.(emit batched) - (List.length batches, List.length to_remove) - in - let inject_timestamp = Time.System.now () in - Metrics.Batcher.set_inject_time - @@ Ptime.diff inject_timestamp get_timestamp ; - List.iter - (fun tr_hash -> Message_queue.remove state.messages tr_hash) - to_remove ; - return_unit + if List.is_empty batches then return_unit + else + let* () = + Metrics.wrap_lwt (fun () -> + Metrics.Batcher.set_last_batch_time start_timestamp ; + let* block = Node_context.last_processed_head_opt state.node_ctxt in + let () = + match block with + | Some block -> + Metrics.Batcher.set_last_batch_level block.header.level + | None -> () + in + return_unit) + in + let* () = inject_batches state batches in + let*! () = + Batcher_events.(emit batched) (List.length batches, nb_messages_batched) + in + let inject_timestamp = Time.System.now () in + Metrics.Batcher.set_inject_time @@ Ptime.diff inject_timestamp get_timestamp ; + return_unit -let message_already_added state msg_id = - match Batched_messages.find_opt state.batched msg_id with - | None -> false +let message_already_batched state msg = + match Batched_messages.find_opt state.batched (L2_message.id msg) with + | None -> false (* check is done in insertion in the heap *) | Some {l1_id; _} -> (* If injector know about the operation then it's either pending, injected or included and we don't re-inject the @@ -188,8 +170,7 @@ let message_already_added state msg_id = `retention_period`) and the user wants to re-inject it. *) Option.is_some @@ Injector.operation_status l1_id -let on_register ~drop_duplicate state (messages : string list) = - let open Lwt_result_syntax in +let make_l2_messages ?order ~unique state (messages : string list) = let module Plugin = (val state.plugin) in let max_size_msg = min @@ -197,29 +178,52 @@ let on_register ~drop_duplicate state (messages : string list) = + 4 (* We add 4 because [message_size] adds 4. 
*)) (max_batch_size state) in + List.mapi_e + (fun i message -> + if message_size message > max_size_msg then + error_with "Message %d is too large (max size is %d)" i max_size_msg + else Ok (L2_message.make ?order ~unique message)) + messages + +type error += Heap_insertion_failed of string + +let () = + register_error_kind + ~id:"batcher.heap_insertion_failed" + ~title:"Heap insertion failed" + ~description:"Heap insertion failed" + ~pp:(fun ppf err -> Format.fprintf ppf "Heap insertion failed with %s." err) + `Permanent + Data_encoding.(obj1 (req "error" string)) + (function Heap_insertion_failed error -> Some error | _ -> None) + (fun error -> Heap_insertion_failed error) + +let add_messages_into_heap ~drop_duplicate state = + let open Lwt_result_syntax in + List.map_es @@ fun message -> + let msg_id = L2_message.id message in + let* () = + (* check if already batched *) + if drop_duplicate && message_already_batched state message then + let*! () = Batcher_events.(emit dropped_msg) msg_id in + return_unit + else + let*? () = + Result.map_error (fun err_msg -> [Heap_insertion_failed err_msg]) + @@ Message_heap.insert message state.messages_heap + in + return_unit + in + return msg_id + +let on_register ?order ~drop_duplicate state (messages : string list) = + let open Lwt_result_syntax in + let module Plugin = (val state.plugin) in let*? messages = - List.mapi_e - (fun i message -> - if message_size message > max_size_msg then - error_with "Message %d is too large (max size is %d)" i max_size_msg - else Ok (L2_message.make ~unique:(not drop_duplicate) message)) - messages + make_l2_messages ?order ~unique:(not drop_duplicate) state messages in let*! () = Batcher_events.(emit queue) (List.length messages) in - let*! ids = - List.map_s - (fun message -> - let msg_id = L2_message.id message in - let*! 
() = - if drop_duplicate && message_already_added state msg_id then - Batcher_events.(emit dropped_msg) msg_id - else ( - Message_queue.replace state.messages msg_id message ; - Lwt.return_unit) - in - Lwt.return msg_id) - messages - in + let* ids = add_messages_into_heap ~drop_duplicate state messages in let+ () = produce_batches state ~only_full:true in ids @@ -228,7 +232,7 @@ let on_new_head state = produce_batches state ~only_full:false let init_batcher_state plugin node_ctxt = { node_ctxt; - messages = Message_queue.create 100_000 (* ~ 400MB *); + messages_heap = Message_heap.create 100_000 (* ~ 400MB *); batched = Batched_messages.create 100_000 (* ~ 400MB *); plugin; } @@ -269,8 +273,8 @@ module Handlers = struct fun w request -> let state = Worker.state w in match request with - | Request.Register {messages; drop_duplicate} -> - protect @@ fun () -> on_register ~drop_duplicate state messages + | Request.Register {order; messages; drop_duplicate} -> + protect @@ fun () -> on_register ?order ~drop_duplicate state messages | Request.Produce_batches -> protect @@ fun () -> on_new_head state type launch_error = error trace @@ -364,13 +368,13 @@ let find_message id = let open Result_syntax in let+ w = worker () in let state = Worker.state w in - Message_queue.find_opt state.messages id + Message_heap.find_opt id state.messages_heap let get_queue () = let open Result_syntax in let+ w = worker () in let state = Worker.state w in - Message_queue.bindings state.messages + Message_heap.elements state.messages_heap let handle_request_error rq = let open Lwt_syntax in @@ -382,12 +386,12 @@ let handle_request_error rq = | Error (Closed (Some errs)) -> Lwt.return_error errs | Error (Any exn) -> Lwt.return_error [Exn exn] -let register_messages ~drop_duplicate messages = +let register_messages ?order ~drop_duplicate messages = let open Lwt_result_syntax in let*? 
w = worker () in Worker.Queue.push_request_and_wait w - (Request.Register {messages; drop_duplicate}) + (Request.Register {order; messages; drop_duplicate}) |> handle_request_error let produce_batches () = @@ -409,7 +413,7 @@ let shutdown () = | Ok w -> Worker.shutdown w let message_status state msg_id = - match Message_queue.find_opt state.messages msg_id with + match Message_heap.find_opt msg_id state.messages_heap with | Some msg -> Some (Pending_batch, L2_message.content msg) | None -> ( match Batched_messages.find_opt state.batched msg_id with diff --git a/src/lib_smart_rollup_node/batcher.mli b/src/lib_smart_rollup_node/batcher.mli index 6607ec757ab3..239cb431dc58 100644 --- a/src/lib_smart_rollup_node/batcher.mli +++ b/src/lib_smart_rollup_node/batcher.mli @@ -54,14 +54,19 @@ val find_message : L2_message.id -> L2_message.t option tzresult (** List all queued messages in the order they appear in the queue, i.e. the message that were added first to the queue are at the end of list. *) -val get_queue : unit -> (L2_message.id * L2_message.t) list tzresult +val get_queue : unit -> L2_message.t list tzresult -(** [register_messages ~drop_duplicate messages] registers new L2 - [messages] in the queue of the batcher for future injection on - L1. If [drop_duplicate = false] then it injects the message even - if it was already injected in a previous l1 operations. *) +(** [register_messages ?order ~drop_duplicate messages] registers + new L2 [messages] in the queue of the batcher for future injection + on L1. If [drop_duplicate = false] then it injects the message + even if it was already injected in a previous l1 operations. If + the priority is set then add the message in the heap at the + correct place. *) val register_messages : - drop_duplicate:bool -> string list -> L2_message.id list tzresult Lwt.t + ?order:Z.t -> + drop_duplicate:bool -> + string list -> + L2_message.id list tzresult Lwt.t (** The status of a message in the batcher. 
Returns [None] if the message is not known by the batcher (the batcher only keeps the batched status of the last diff --git a/src/lib_smart_rollup_node/batcher_worker_types.ml b/src/lib_smart_rollup_node/batcher_worker_types.ml index b49cf016a0b6..a55b1a9a1f59 100644 --- a/src/lib_smart_rollup_node/batcher_worker_types.ml +++ b/src/lib_smart_rollup_node/batcher_worker_types.ml @@ -26,6 +26,7 @@ module Request = struct type ('a, 'b) t = | Register : { + order : Z.t option; messages : string list; drop_duplicate : bool; } @@ -43,16 +44,17 @@ module Request = struct case (Tag 0) ~title:"Register" - (obj3 + (obj4 (req "request" (constant "register")) + (opt "order" n) (req "messages" (list L2_message.content_encoding)) (req "drop_duplicate" bool)) (function - | View (Register {messages; drop_duplicate}) -> - Some ((), messages, drop_duplicate) + | View (Register {order; messages; drop_duplicate}) -> + Some ((), order, messages, drop_duplicate) | _ -> None) - (fun ((), messages, drop_duplicate) -> - View (Register {messages; drop_duplicate})); + (fun ((), order, messages, drop_duplicate) -> + View (Register {order; messages; drop_duplicate})); case (Tag 1) ~title:"Produce_batches" @@ -63,7 +65,7 @@ module Request = struct let pp ppf (View r) = match r with - | Register {messages; drop_duplicate} -> + | Register {order = _; messages; drop_duplicate} -> Format.fprintf ppf "register %d new L2 message%a" diff --git a/src/lib_smart_rollup_node/batcher_worker_types.mli b/src/lib_smart_rollup_node/batcher_worker_types.mli index f67af7bf73b8..7d70fa678991 100644 --- a/src/lib_smart_rollup_node/batcher_worker_types.mli +++ b/src/lib_smart_rollup_node/batcher_worker_types.mli @@ -30,6 +30,7 @@ module Request : sig (** Type of requests accepted by the batcher worker. 
*) type ('a, 'b) t = | Register : { + order : Z.t option; messages : string list; drop_duplicate : bool; } diff --git a/src/lib_smart_rollup_node/rpc_directory.ml b/src/lib_smart_rollup_node/rpc_directory.ml index 47ec5032a853..4436ddf29e4c 100644 --- a/src/lib_smart_rollup_node/rpc_directory.ml +++ b/src/lib_smart_rollup_node/rpc_directory.ml @@ -451,8 +451,11 @@ let () = let () = Local_directory.register0 Rollup_node_services.Local.injection - @@ fun _node_ctxt drop_duplicate messages -> - Batcher.register_messages ~drop_duplicate messages + @@ fun _node_ctxt query messages -> + Batcher.register_messages + ?order:query#order + ~drop_duplicate:query#drop_duplicate + messages let () = Local_directory.register0 Rollup_node_services.Local.dal_batcher_injection @@ -466,10 +469,7 @@ let () = let () = Local_directory.register0 Rollup_node_services.Local.batcher_queue - @@ fun _node_ctxt () () -> - let open Lwt_result_syntax in - let*? queue = Batcher.get_queue () in - return queue + @@ fun _node_ctxt () () -> Batcher.get_queue () |> Lwt.return (** [commitment_level_of_inbox_level node_ctxt inbox_level] returns the level of the commitment which should include the inbox of level diff --git a/tezt/lib_tezos/sc_rollup_rpc.ml b/tezt/lib_tezos/sc_rollup_rpc.ml index 1d14fa9bdd85..3bacd84fed24 100644 --- a/tezt/lib_tezos/sc_rollup_rpc.ml +++ b/tezt/lib_tezos/sc_rollup_rpc.ml @@ -243,17 +243,21 @@ let get_global_block_durable_state_value ?(block = "head") ~pvm_kind ~operation ["global"; "block"; block; "durable"; pvm_kind; op] (f operation) -let post_local_batcher_injection ?drop_duplicate ~messages () = +let post_local_batcher_injection ?order ?drop_duplicate ~messages () = let data = Data (`A (List.map (fun s -> `String Hex.(of_string s |> show)) messages)) in let query_string = - Option.map (fun b -> [("drop_duplicate", string_of_bool b)]) drop_duplicate + let order = Option.map (fun v -> ("order", string_of_int v)) order in + let drop_duplicate = + Option.map (fun b -> 
("drop_duplicate", string_of_bool b)) drop_duplicate + in + Option.to_list order @ Option.to_list drop_duplicate in make POST ["local"; "batcher"; "injection"] - ?query_string + ~query_string ~data JSON.(fun json -> as_list json |> List.map as_string) diff --git a/tezt/lib_tezos/sc_rollup_rpc.mli b/tezt/lib_tezos/sc_rollup_rpc.mli index 8b83bdc3b68e..34e385512105 100644 --- a/tezt/lib_tezos/sc_rollup_rpc.mli +++ b/tezt/lib_tezos/sc_rollup_rpc.mli @@ -172,7 +172,11 @@ val get_global_block_durable_state_value : queue the rollup node's batcher and returns the list of message hashes injected. *) val post_local_batcher_injection : - ?drop_duplicate:bool -> messages:string list -> unit -> string list RPC_core.t + ?order:int -> + ?drop_duplicate:bool -> + messages:string list -> + unit -> + string list RPC_core.t (** RPC: [POST local/dal/batcher/injection] injects the given [messages] in the rollup node's DAL queue. *) diff --git a/tezt/tests/sc_rollup.ml b/tezt/tests/sc_rollup.ml index 323630b1f5f5..16cb16b66c8e 100644 --- a/tezt/tests/sc_rollup.ml +++ b/tezt/tests/sc_rollup.ml @@ -5794,11 +5794,198 @@ let test_multiple_batcher_key ~kind = ~error_msg:"Reused %L keys but should have rotated not reused any." 
; unit +let test_batcher_order_msgs ~kind = + test_full_scenario + { + tags = ["batcher"; "messages"; "order"]; + variant = None; + description = "rollup node - Batcher order message correctly"; + } + ~mode:Batcher + ~kind + ~operators:[(Sc_rollup_node.Batching, Constant.bootstrap1.alias)] + @@ fun _protocol rollup_node rollup_addr node client -> + let min_batch_elements = 10 in + let max_batch_elements = 20 in + let* _config = Sc_rollup_node.config_init rollup_node rollup_addr in + let () = + Sc_rollup_node.Config_file.update rollup_node + @@ JSON.update "batcher" + @@ fun json -> + let json = + JSON.put + ( "min_batch_elements", + JSON.annotate + ~origin:"min batch elements size" + (`Float (Int.to_float min_batch_elements)) ) + json + in + let json = + JSON.put + ( "max_batch_elements", + JSON.annotate + ~origin:"max batch elements size" + (`Float (Int.to_float max_batch_elements)) ) + json + in + json + in + + let inject_int_of_string ?order ?drop_duplicate messages = + Sc_rollup_node.RPC.call rollup_node + @@ Sc_rollup_rpc.post_local_batcher_injection + ?order + ?drop_duplicate + ~messages:(List.map string_of_int messages) + () + in + let wait_for_included_and_returns_msgs () = + let find_map_op_content op_content_json = + let kind = JSON.(op_content_json |-> "kind" |> as_string) in + if kind = "smart_rollup_add_messages" then + let msgs = + JSON.(op_content_json |-> "message" |> as_list |> List.map as_string) + in + Some msgs + else None + in + wait_for_included_and_map_ops_content + rollup_node + node + ~timeout:30. 
+ ~find_map_op_content + in + + let check_queue ~__LOC__ ~expected_queued_hashes ~expected_messages = + let* queued_hashes, queued_msgs = + let* queued_msgs = + Sc_rollup_node.RPC.call rollup_node + @@ Sc_rollup_rpc.get_local_batcher_queue () + in + return @@ List.split queued_msgs + in + Check.( + (List.sort String.compare queued_hashes + = List.sort_uniq String.compare expected_queued_hashes) + (list string) + ~__LOC__) + ~error_msg:"Queued msgs hashes %L was found but %R was expected" ; + + Check.( + (List.map int_of_string queued_msgs = expected_messages) + (list int) + ~__LOC__) + ~error_msg:"Queued msgs %L was found but %R was expected" ; + unit + in + + let bake_then_check_included_msgs ~__LOC__ ~nb_batches ~expected_messages = + let* level = Node.get_level node in + let wait_for_injected = + wait_until_n_batches_are_injected rollup_node ~nb_batches + in + let* () = Client.bake_for_and_wait client + and* () = wait_for_injected + and* _lvl = Sc_rollup_node.wait_for_level rollup_node (level + 1) in + let wait_for_included = wait_for_included_and_returns_msgs () in + let* () = Client.bake_for_and_wait client + and* msgs = wait_for_included + and* _lvl = Sc_rollup_node.wait_for_level rollup_node (level + 2) in + + Check.( + (List.map + (List.map (fun s -> int_of_string @@ Hex.to_string (`Hex s))) + msgs + = expected_messages) + (list (list int)) + ~__LOC__) + ~error_msg:"Included msgs %L was found but %R was expected" ; + unit + in + + let* level = Node.get_level node in + let* () = Sc_rollup_node.run ~event_level:`Debug rollup_node rollup_addr [] in + let* _ = Sc_rollup_node.wait_for_level rollup_node level in + + (* To make sure we are all bootstrapped, might be unused *) + Log.info "injecting 3 messages with order specified (e.g. 
[10; 20; 30])" ; + let* hashes_1 = + fold 3 [] (fun i acc -> + let order = 10 * (i + 1) in + let* hashes = inject_int_of_string ~order [order] in + return @@ hashes @ acc) + in + Log.info + "injecting 4 messages with order specified in reverses (e.g. [ 30; 20; 10])" ; + let* hashes_2 = + fold 3 [] (fun i acc -> + let order = 30 - (10 * i) in + let* hashes = inject_int_of_string ~order [order] in + return @@ hashes @ acc) + in + Log.info "injecting 1 message with no order specified" ; + let* hashes_3 = inject_int_of_string [31] in + Log.info "injecting 2 time the same message with order 11 and ~drop_duplicate" ; + let* hashes_4 = inject_int_of_string ~order:11 ~drop_duplicate:true [1; 1] in + Log.info "Reinjecting the same message with order 0" ; + let* hashes_5 = inject_int_of_string ~order:0 ~drop_duplicate:true [1] in + let expected_queued_hashes = + hashes_1 @ hashes_2 @ hashes_3 @ hashes_4 @ hashes_5 + in + + let expected_messages = [1; 10; 10; 20; 20; 30; 30; 31] in + let* () = check_queue ~__LOC__ ~expected_queued_hashes ~expected_messages in + let* () = + bake_then_check_included_msgs + ~__LOC__ + ~nb_batches:1 + ~expected_messages:[expected_messages] + in + + Log.info + "Injecting lots of messages to make sure order is preserved over batches, \ + e.g. first batches contains elements smaller than second batch." ; + let half_min_batch = min_batch_elements / 2 in + let* hashes_1 = + (* [0; 10; ... 40] *) + inject_int_of_string @@ List.init half_min_batch (fun i -> i * 10) + in + let* hashes_2 = + (* [50; 60; ... 80] *) + inject_int_of_string + @@ List.init (half_min_batch - 1) (fun i -> (half_min_batch + i) * 10) + in + let expected_queued_hashes = hashes_1 @ hashes_2 in + let expected_messages = + List.init (min_batch_elements - 1) (fun i -> i * 10) + in + let* () = check_queue ~__LOC__ ~expected_queued_hashes ~expected_messages in + + (* this willl trigger batch production *) + let* _hashes_4 = + (* [90; 100; ... 
290] *) + inject_int_of_string + (((min_batch_elements - 1) * 10) + :: List.init max_batch_elements (fun i -> (min_batch_elements + i) * 10)) + in + let expected_messages = + [ + (* [0; 10; ... 190] *) + List.init max_batch_elements (fun i -> i * 10); + (* [200; 210; ... 290] *) + List.init min_batch_elements (fun i -> (max_batch_elements + i) * 10); + ] + in + let* () = + bake_then_check_included_msgs ~__LOC__ ~nb_batches:1 ~expected_messages + in + unit + (** Injector only uses key that have no operation in the mempool currently. 1. Batcher setup: - 5 signers in the batcher. - - 10 batches to inject. + - 10 batches to inject_int_of_string. 2. First Block: - Mempool: - Contains enough batches to fill the block from users with high fees. @@ -5923,8 +6110,8 @@ let test_injector_uses_available_keys ~kind = let* _lvl = Client.bake_for_and_wait client in let* () = inject_n_msgs_batches_in_rollup_node - (* we inject 2 times the number of operators so the rollup node - must inject in two salvos all the batches. *) + (* we inject_int_of_string 2 times the number of operators so the rollup node + must inject_int_of_string in two salvos all the batches. 
*) ~nb_of_batches:(2 * nb_operators) ~msg_per_batch ~msg_size @@ -5968,7 +6155,7 @@ let test_batcher_dont_reinject_already_injected_messages ~kind = } @@ fun _protocol rollup_node rollup_addr _node client -> Log.info "Batcher setup" ; - let inject ~drop_duplicate ~messages = + let inject_int_of_string ~drop_duplicate ~messages = Sc_rollup_node.RPC.call rollup_node @@ Sc_rollup_rpc.post_local_batcher_injection ~drop_duplicate ~messages () in @@ -6014,7 +6201,9 @@ let test_batcher_dont_reinject_already_injected_messages ~kind = let* () = if inject_msgs then ( - let* message_ids = inject ~drop_duplicate:true ~messages in + let* message_ids = + inject_int_of_string ~drop_duplicate:true ~messages + in let message_id = List.nth message_ids 0 in Check.( (message_id = check_message_id) @@ -6045,9 +6234,13 @@ let test_batcher_dont_reinject_already_injected_messages ~kind = [Injector_retention_period 1] and* _lvl = Sc_rollup_node.wait_sync rollup_node ~timeout:10. in - let* messages_id_first = inject ~drop_duplicate:false ~messages in + let* messages_id_first = + inject_int_of_string ~drop_duplicate:false ~messages + in let* () = check_batcher_queue_size ~__LOC__ ~expected_size:2 () in - let* messages_id_second = inject ~drop_duplicate:false ~messages in + let* messages_id_second = + inject_int_of_string ~drop_duplicate:false ~messages + in let* () = check_batcher_queue_size ~__LOC__ ~expected_size:4 () in Check.( (messages_id_first <> messages_id_second) @@ -6055,7 +6248,9 @@ let test_batcher_dont_reinject_already_injected_messages ~kind = (list string) ~error_msg:"first messages batch id %L must be different than second %R") ; - let* first_message_ids = inject ~drop_duplicate:true ~messages in + let* first_message_ids = + inject_int_of_string ~drop_duplicate:true ~messages + in let* () = check_batcher_queue_size ~__LOC__ ~expected_size:5 () in let check_message_id = List.nth first_message_ids 0 in @@ -6068,7 +6263,9 @@ let 
test_batcher_dont_reinject_already_injected_messages ~kind = "messages id with check on must be equal (given %L, expected %R).") ; Log.info "Resubmiting the same message dont add it to the queue." ; - let* second_message_ids = inject ~drop_duplicate:true ~messages in + let* second_message_ids = + inject_int_of_string ~drop_duplicate:true ~messages + in let* () = check_batcher_queue_size ~__LOC__ ~expected_size:5 () in Check.( (first_message_ids = second_message_ids) @@ -6439,6 +6636,7 @@ let register_protocol_independent () = test_accuser protocols ; test_bailout_refutation protocols ; test_multiple_batcher_key ~kind protocols ; + test_batcher_order_msgs ~kind protocols ; test_injector_uses_available_keys protocols ~kind ; test_batcher_dont_reinject_already_injected_messages protocols ~kind ; test_private_rollup_node_publish_in_whitelist protocols ; -- GitLab From be741f5a4a64f9f346d98342f7d22ab68680b038 Mon Sep 17 00:00:00 2001 From: Sylvain Ribstein Date: Tue, 3 Dec 2024 10:53:04 +0100 Subject: [PATCH 3/6] injector: add order to injector operation type --- src/lib_injector/injector_operation.ml | 73 +++++++++++++++---- src/lib_injector/injector_server_operation.ml | 4 + .../injector_server_operation.mli | 2 + src/lib_injector/injector_sigs.ml | 16 +++- src/lib_smart_rollup/operation_kind.ml | 12 +++ src/lib_smart_rollup/operation_kind.mli | 8 ++ src/lib_smart_rollup_node/injector.ml | 26 ++++--- src/lib_smart_rollup_node/rpc_directory.ml | 9 ++- 8 files changed, 121 insertions(+), 29 deletions(-) diff --git a/src/lib_injector/injector_operation.ml b/src/lib_injector/injector_operation.ml index 27436fff64b3..a1744c9f3d56 100644 --- a/src/lib_injector/injector_operation.ml +++ b/src/lib_injector/injector_operation.ml @@ -33,7 +33,13 @@ module Make (O : PARAM_OPERATION) : type errors = {count : int; last_error : tztrace option} - type t = {id : id; operation : O.t; mutable errors : errors} + type t = { + id : id; + order : Z.t option; + counter : Z.t; + operation : 
O.t; + mutable errors : errors; + } let hash_inner_operation nonce op = Id.hash_string @@ -46,16 +52,14 @@ module Make (O : PARAM_OPERATION) : let no_errors = {count = 0; last_error = None} - let make operation = - let nonce = - if not @@ O.unique operation then ( - let c = !counter in - counter := Z.succ !counter ; - Some c) - else None + let make ?order operation = + let nonce_for_hash_id = + if not @@ O.unique operation then Some !counter else None in - let id = hash_inner_operation nonce operation in - {id; operation; errors = no_errors} + let id = hash_inner_operation nonce_for_hash_id operation in + let op = {id; order; counter = !counter; operation; errors = no_errors} in + counter := Z.succ !counter ; + op let errors_encoding = let open Data_encoding in @@ -67,20 +71,59 @@ module Make (O : PARAM_OPERATION) : let encoding = let open Data_encoding in conv - (fun {id; operation; errors} -> (id, operation, errors)) - (fun (id, operation, errors) -> {id; operation; errors}) - @@ obj3 + (fun {id; order; counter; operation; errors} -> + (id, order, counter, operation, errors)) + (fun (id, order, counter, operation, errors) -> + {id; order; counter; operation; errors}) + @@ obj5 (req "id" Id.encoding) + (opt "order" n) + (req "counter" n) (req "operation" O.encoding) (dft "errors" errors_encoding no_errors) - let pp ppf {id; operation; errors} = + let pp ppf {id; order; counter = _; operation; errors} = let pp_errors ppf errors = if errors.count = 0 then () else Format.fprintf ppf " [%d errors]" errors.count in - Format.fprintf ppf "%a (%a)%a" O.pp operation Id.pp id pp_errors errors + let pp_order = + Format.pp_print_option @@ fun ppf -> + Format.fprintf ppf " [priority order %a]" Z.pp_print + in + Format.fprintf + ppf + "%a%a (%a)%a" + O.pp + operation + pp_order + order + Id.pp + id + pp_errors + errors let register_error op error_trace = op.errors <- {count = op.errors.count + 1; last_error = Some error_trace} + + let id {id; _} = id + + (* Compare operations 
with the following logic: + + - Operations without an explicit `order` or lesser than operations with + one + - The value of `order` is used to compare two operations with `order` set; + - Otherwise, use the timestamp to compare *) + let compare op1 op2 = + let op_compare = O.compare op1.operation op2.operation in + if not (op_compare = 0) then op_compare + else + let counter_cmp () = Z.compare op1.counter op2.counter in + match (op1.order, op2.order) with + | Some o1, Some o2 -> + let cmp = Z.compare o1 o2 in + if cmp = 0 then counter_cmp () else cmp + | None, None -> counter_cmp () + | Some _p, _ -> -1 + | _, Some _p -> 1 end diff --git a/src/lib_injector/injector_server_operation.ml b/src/lib_injector/injector_server_operation.ml index 33160896fd67..839e0040bdde 100644 --- a/src/lib_injector/injector_server_operation.ml +++ b/src/lib_injector/injector_server_operation.ml @@ -62,3 +62,7 @@ let pp ppf = function Format.fprintf ppf "Transaction of %Ld tez" amount let unique = function Transaction _ -> true + +let compare _ _ = 0 +(* Compare function used to prioritize a type of operation over the + other. Here it's not necessary. *) diff --git a/src/lib_injector/injector_server_operation.mli b/src/lib_injector/injector_server_operation.mli index ccaa602d4b24..824abecaec49 100644 --- a/src/lib_injector/injector_server_operation.mli +++ b/src/lib_injector/injector_server_operation.mli @@ -24,3 +24,5 @@ val pp : Format.formatter -> t -> unit (** [false] if the injector will accept duplicate such operations. *) val unique : t -> bool + +val compare : t -> t -> int diff --git a/src/lib_injector/injector_sigs.ml b/src/lib_injector/injector_sigs.ml index 08cef9a21c24..891809821001 100644 --- a/src/lib_injector/injector_sigs.ml +++ b/src/lib_injector/injector_sigs.ml @@ -94,6 +94,8 @@ module type PARAM_OPERATION = sig one operation [op]. Otherwise, it will allow duplicate such operations to appear in the queue. 
*) val unique : t -> bool + + val compare : t -> t -> int end (** Internal representation of injector operations. *) @@ -108,10 +110,16 @@ module type INJECTOR_OPERATION = sig (** The type of L1 operations that are injected on Tezos. These have an id attached to them that allows tracking and retrieving their status. *) - type t = private {id : id; operation : operation; mutable errors : errors} + type t = private { + id : id; + order : Z.t option; + counter : Z.t; + operation : operation; + mutable errors : errors; + } (** [make op] returns an L1 operation with the corresponding hash. *) - val make : operation -> t + val make : ?order:Z.t -> operation -> t (** Encoding for L1 operations *) val encoding : t Data_encoding.t @@ -123,6 +131,10 @@ module type INJECTOR_OPERATION = sig (** Register an error as occurring during injection of an operation. Its internal error counter is incremented. *) val register_error : t -> tztrace -> unit + + val id : t -> id + + val compare : t -> t -> int end (** Module type for parameter of functor {!Injector_functor.Make}. 
*) diff --git a/src/lib_smart_rollup/operation_kind.ml b/src/lib_smart_rollup/operation_kind.ml index d2b3f94d11b0..80d6b4d64577 100644 --- a/src/lib_smart_rollup/operation_kind.ml +++ b/src/lib_smart_rollup/operation_kind.ml @@ -79,3 +79,15 @@ let map_encoding value_encoding = ~string_of_key:to_string ~key_of_string:of_string_exn ~value_encoding) + +let priority_order = function + | Timeout -> 0 + | Refute -> 1 + | Publish -> 2 + | Publish_dal_commitment -> 2 + | Cement -> 2 + | Recover -> 3 + | Add_messages -> 4 + | Execute_outbox_message -> 5 + +let compare_priority k1 k2 = compare (priority_order k1) (priority_order k2) diff --git a/src/lib_smart_rollup/operation_kind.mli b/src/lib_smart_rollup/operation_kind.mli index bc963e1833cd..7d79c3a0738d 100644 --- a/src/lib_smart_rollup/operation_kind.mli +++ b/src/lib_smart_rollup/operation_kind.mli @@ -39,3 +39,11 @@ val of_string_exn : string -> t val encoding : t Data_encoding.t val map_encoding : (t -> 'value Data_encoding.t) -> 'value Map.t Data_encoding.t + +(** [compare_priority] Comparison over tag for order of + importance. Order is given by the list {!all}. it's the following: + + Timeout < Refute < [Publish; Publish_dal_commitment; Cement] < + Recover < Add_messages < Execute_outbox_message. 
+*) +val compare_priority : t -> t -> int diff --git a/src/lib_smart_rollup_node/injector.ml b/src/lib_smart_rollup_node/injector.ml index 8c243d238efc..2d98f3d82387 100644 --- a/src/lib_smart_rollup_node/injector.ml +++ b/src/lib_smart_rollup_node/injector.ml @@ -59,7 +59,21 @@ module Parameters : let encoding : t Data_encoding.t = Operation_kind.encoding end - module Operation = L1_operation + module Operation = struct + include L1_operation + + let tag : t -> Tag.t = function + | Add_messages _ -> Add_messages + | Cement _ -> Cement + | Publish _ -> Publish + | Timeout _ -> Timeout + | Refute _ -> Refute + | Recover_bond _ -> Recover + | Execute_outbox_message _ -> Execute_outbox_message + | Publish_dal_commitment _ -> Publish_dal_commitment + + let compare op1 op2 = Operation_kind.compare_priority (tag op1) (tag op2) + end (* TODO: https://gitlab.com/tezos/tezos/-/issues/3459 Very coarse approximation for the number of operation we @@ -74,15 +88,7 @@ module Parameters : | Execute_outbox_message -> 1 | Publish_dal_commitment -> 1 - let operation_tag : Operation.t -> Tag.t = function - | Add_messages _ -> Add_messages - | Cement _ -> Cement - | Publish _ -> Publish - | Timeout _ -> Timeout - | Refute _ -> Refute - | Recover_bond _ -> Recover - | Execute_outbox_message _ -> Execute_outbox_message - | Publish_dal_commitment _ -> Publish_dal_commitment + let operation_tag = Operation.tag let fee_parameter {fee_parameters; _} operation = let operation_kind = operation_tag operation in diff --git a/src/lib_smart_rollup_node/rpc_directory.ml b/src/lib_smart_rollup_node/rpc_directory.ml index 4436ddf29e4c..0ec7c507da03 100644 --- a/src/lib_smart_rollup_node/rpc_directory.ml +++ b/src/lib_smart_rollup_node/rpc_directory.ml @@ -663,8 +663,13 @@ let () = let rops = List.rev_map (fun Injector.Inj_operation. 
- {operation = op; errors = {count; last_error}; id = _} -> - Rollup_node_services.{op; errors = count; last_error}) + { + operation = op; + errors = {count; last_error}; + id = _; + order = _; + counter = _; + } -> Rollup_node_services.{op; errors = count; last_error}) ops in (tags, List.rev rops)) -- GitLab From 2dd650ff00f21e20d37ac2745169d5aa754fe714 Mon Sep 17 00:00:00 2001 From: Sylvain Ribstein Date: Tue, 3 Dec 2024 16:59:48 +0100 Subject: [PATCH 4/6] injector: add heap disk persistant storage --- src/lib_injector/disk_persistence.ml | 196 +++++++++++++++++++ src/lib_injector/disk_persistence.mli | 73 +++++++ src/lib_stdlib/bounded_min_heap.ml | 61 +++++- src/lib_stdlib/bounded_min_heap.mli | 15 +- src/lib_stdlib/test/test_bounded_min_heap.ml | 35 +++- 5 files changed, 369 insertions(+), 11 deletions(-) diff --git a/src/lib_injector/disk_persistence.ml b/src/lib_injector/disk_persistence.ml index 37e5d05dc1d0..d0e2f5ccfb7c 100644 --- a/src/lib_injector/disk_persistence.ml +++ b/src/lib_injector/disk_persistence.ml @@ -31,6 +31,7 @@ type error += | Io_error of [`Close | `Open] Lwt_utils_unix.io_error | Unix_error of Unix.error | Decoding_error of Data_encoding.Binary.read_error + | Heap_insertion_failed of string let () = register_error_kind @@ -126,6 +127,15 @@ let () = Data_encoding.(obj1 (req "error" Data_encoding.Binary.read_error_encoding)) (function Decoding_error e -> Some e | _ -> None) (fun e -> Decoding_error e) ; + register_error_kind + ~id:"disk_persistence.heap_insertion_failed" + ~title:"Heap insertion failed" + ~description:"Heap insertion failed" + ~pp:(fun ppf err -> Format.fprintf ppf "Heap insertion failed with %s." 
err) + `Permanent + Data_encoding.(obj1 (req "error" string)) + (function Heap_insertion_failed error -> Some error | _ -> None) + (fun error -> Heap_insertion_failed error) ; () module type H = sig @@ -435,3 +445,189 @@ struct List.iter (fun (k, v, _) -> Q.replace q.queue k v) list ; return q end + +module Make_heap + (N : sig + val name : string + end) + (K : Tezos_crypto.Intfs.HASH) + (V : sig + type t + + val id : t -> K.t + + val compare : t -> t -> int + + val persist : t -> bool + + val encoding : t Data_encoding.t + end) = +struct + module H = Bounded_min_heap.Make (K) (V) + + type t = {path : string; metadata_path : string; heap : H.t} + + let counter = ref min_int + + let filedata h k = Filename.concat h.path (K.to_b58check k) + + let filemetadata h k = Filename.concat h.metadata_path (K.to_b58check k) + + let create ~data_dir n = + let open Lwt_result_syntax in + let heap = H.create n in + let path = Filename.concat data_dir N.name in + let metadata_path = Filename.concat path "metadata" in + let* () = create_dir path in + let+ () = create_dir metadata_path in + {path; metadata_path; heap} + + let remove_persitant_data h k = + let open Lwt_result_syntax in + let* () = delete_file (filedata h k) + and* () = delete_file (filemetadata h k) in + return_unit + + let remove h k = + let open Lwt_result_syntax in + H.remove h.heap k ; + let* () = remove_persitant_data h k in + return_unit + + let create_metadata () = + let time = Time.System.now () in + let d, ps = Ptime.to_span time |> Ptime.Span.to_d_ps in + let c = !counter in + incr counter ; + (d, ps, c) + + let metadata_encoding = + let open Data_encoding in + conv + (fun (d, ps, c) -> (Int64.of_int d, ps, Int64.of_int c)) + (fun (d, ps, c) -> (Int64.to_int d, ps, Int64.to_int c)) + @@ tup3 int64 int64 int64 + + let heap_insert h v = + Result.map_error (fun err_msg -> [Heap_insertion_failed err_msg]) + @@ H.insert v h.heap + + let insert h k v = + let open Lwt_result_syntax in + let*? 
() = heap_insert h v in + if V.persist v then + let* () = write_value (filedata h k) V.encoding v + and* () = + write_value (filemetadata h k) metadata_encoding (create_metadata ()) + in + return_unit + else return_unit + + let pop h = + let open Lwt_result_syntax in + let elt_opt = H.pop h.heap in + let* () = + Option.iter_es + (fun v -> + let k = V.id v in + remove_persitant_data h k) + elt_opt + in + return elt_opt + + let clear h = + let open Lwt_syntax in + H.clear h.heap ; + (* Remove the persistent elements from the disk. *) + let elts = Lwt_unix.files_of_directory h.path in + let metadata_elts = Lwt_unix.files_of_directory h.metadata_path in + let unlink file = + Lwt.catch + (fun () -> + if Sys.is_directory file then return_unit else Lwt_unix.unlink file) + (fun e -> + Format.ksprintf + Stdlib.failwith + "Error in unlink %s: %s" + file + (Printexc.to_string e)) + in + let* () = + Lwt_stream.iter_s (fun f -> unlink (Filename.concat h.path f)) elts + and* () = + Lwt_stream.iter_s + (fun m -> unlink (Filename.concat h.metadata_path m)) + metadata_elts + in + return_unit + + let peek_min h = H.peek_min h.heap + + let length h = H.length h.heap + + let find_opt h k = H.find_opt k h.heap + + let elements h = H.elements h.heap + + let remove_predicate f h = + let open Lwt_result_syntax in + let removed_id = H.remove_predicate f h.heap in + let* () = List.iter_es (remove_persitant_data h) removed_id in + return_unit + + let load_from_disk ~warn_unreadable ~capacity ~data_dir ~filter = + let open Lwt_result_syntax in + let* h = create ~data_dir capacity in + let*! d = Lwt_unix.opendir h.path in + let rec browse acc = + let*! 
filename = + let open Lwt_syntax in + Lwt.catch + (fun () -> + let+ f = Lwt_unix.readdir d in + Some f) + (function End_of_file -> return_none | e -> Lwt.reraise e) + in + match filename with + | None -> return acc + | Some filename -> + let* acc = + match K.of_b58check_opt filename with + | None -> return acc + | Some k -> ( + let+ v_meta = + match warn_unreadable with + | None -> + let* v = read_value (filedata h k) V.encoding + and* meta = + read_value (filemetadata h k) metadata_encoding + in + return_some (v, meta) + | Some warn -> + let open Lwt_syntax in + let* v = maybe_read_value ~warn (filedata h k) V.encoding + and* meta = + maybe_read_value + ~warn + (filemetadata h k) + metadata_encoding + in + return_ok @@ Option.bind v + @@ fun v -> Option.bind meta @@ fun meta -> Some (v, meta) + in + match v_meta with + | None -> acc + | Some (v, meta) -> + if filter v then (k, v, meta) :: acc else acc) + in + browse acc + in + let* list = browse [] in + let list = + List.fast_sort + (fun (_, _, meta1) (_, _, meta2) -> Stdlib.compare meta1 meta2) + list + in + let*? () = List.iter_e (fun (_k, v, _) -> heap_insert h v) list in + return h +end diff --git a/src/lib_injector/disk_persistence.mli b/src/lib_injector/disk_persistence.mli index 5f0bede5ca0a..a87fc1ab466f 100644 --- a/src/lib_injector/disk_persistence.mli +++ b/src/lib_injector/disk_persistence.mli @@ -177,3 +177,76 @@ module Make_queue filter:(V.t -> bool) -> t tzresult Lwt.t end + +(** Create an on-disk persistent version of the {!Bounded_min_heap} + data structure. *) +module Make_heap + (N : sig + (** Name used to derive a path (relative to [data_dir] in [load_from_disk]) of where + to store the persistent information for this queue. 
*) + val name : string + end) + (K : Tezos_crypto.Intfs.HASH) + (V : sig + type t + + val id : t -> K.t + + val compare : t -> t -> int + + val persist : t -> bool + + val encoding : t Data_encoding.t + end) : sig + type t + + (** [remove h k] removes the element bound to [k] from [h]. If [k] is not + bound in [h], it does nothing. The removal is persisted on disk. *) + val remove : t -> K.t -> unit tzresult Lwt.t + + (** [remove_predicate f h] removes from [h] every element [elt] such that + [f elt = true]. The removals are persisted on disk. *) + val remove_predicate : (V.t -> bool) -> t -> unit tzresult Lwt.t + + (** [insert h k v] adds [v] to the heap [h]; [k] names its on-disk files + and is presumably [V.id v]. NOTE(review): this presumably fails instead + of evicting an element when [h] is full; confirm. When [V.persist v] + holds, the addition is persisted on disk. *) + val insert : t -> K.t -> V.t -> unit tzresult Lwt.t + + (** [peek_min h] returns, without removing it, the smallest element of + the heap [h], or [None] if [h] is empty. *) + val peek_min : t -> V.t option + + (** [pop h] removes and returns the smallest element of the heap [h], or + [None] if [h] is empty. The removal is persisted on disk. *) + val pop : t -> V.t option tzresult Lwt.t + + (** [find_opt h k] is [Some v] if [k] is bound to [v] in [h]. It is [None] + otherwise. *) + val find_opt : t -> K.t -> V.t option + + (** [elements h] returns the elements held by the heap [h]. NOTE(review): + the order is that of [Bounded_min_heap.Make.elements]; confirm. *) + val elements : t -> V.t list + + (** [length h] is the number of elements held by [h]. *) + val length : t -> int + + (** [clear h] empties the heap [h] and removes its persistent content on + disk. *) + val clear : t -> unit Lwt.t + + (** [load_from_disk ~warn_unreadable ~capacity ~data_dir ~filter] creates a + bounded heap of capacity [capacity]. 
The heap is populated by + persistent elements present in [data_dir/N.name] which pass the [filter] + (the directory is created if it does not exist).
If [warn_unreadable] is + [Some warn], unreadable files are ignored but a warning is printed with + [warn], otherwise the loading fails on the first unreadable file. *) + val load_from_disk : + warn_unreadable:(string -> error trace -> unit Lwt.t) option -> + capacity:int -> + data_dir:string -> + filter:(V.t -> bool) -> + t tzresult Lwt.t +end diff --git a/src/lib_stdlib/bounded_min_heap.ml b/src/lib_stdlib/bounded_min_heap.ml index 689a8e651a79..9f3ef189b059 100644 --- a/src/lib_stdlib/bounded_min_heap.ml +++ b/src/lib_stdlib/bounded_min_heap.ml @@ -148,17 +148,28 @@ struct Ok ()) else Error "no space left in heap" + let remove_index t index = + t.data.(index) <- None ; + t.size <- t.size - 1 ; + if t.size > 0 && index <> t.size then ( + swap index t.size t ; + bubble_down index t) + + let remove t id = + (* uses `find_opt` and not `find` because it may failed if called + with unknown id *) + match Hash_map.find_opt t.hash_map id with + | None -> () + | Some index -> + remove_index t index ; + Hash_map.remove t.hash_map id + let pop t = if t.size = 0 then None else let min_elem = get_data t.data 0 in - t.data.(0) <- None ; - t.size <- t.size - 1 ; - if t.size > 0 then ( - swap 0 t.size t ; - bubble_down 0 t) ; let id = E.id min_elem in - Hash_map.remove t.hash_map id ; + remove t id ; assert (Hash_map.length t.hash_map = t.size) ; Some min_elem @@ -179,8 +190,27 @@ struct let elt_i = Hash_map.find_opt t.hash_map id in Option.map (get_data t.data) elt_i + let clear t = + Hash_map.clear t.hash_map ; + Array.fill t.data 0 t.size None ; + t.size <- 0 + + let remove_predicate f (t : t) = + let rec aux heap_index acc = + if heap_index >= t.size then acc + else + let value = get_data t.data heap_index in + if f value then ( + let id = E.id value in + remove t id ; + assert (Hash_map.length t.hash_map = t.size) ; + aux heap_index (id :: acc)) + else aux (heap_index + 1) acc + in + aux 0 [] + module Internal_for_tests = struct - let check_heap_invariant ~pp_elt h = + let 
check_heap_invariant ~pp_id ~pp_elt h = let get_data_opt i = if i < Array.length h.data then h.data.(i) else None in @@ -239,6 +269,21 @@ struct (get_data_opt right_child_index) correct_map_size) in - check_invariant_for_all_index h.size + Format.printf + "@.@.HEAP:@.heap: %a@.size: %d@.hash_map: %a@.@." + Format.( + pp_print_list + ~pp_sep:(fun fmt () -> Format.pp_print_string fmt "; ") + (pp_print_option pp_elt)) + (h.data |> Array.to_list) + h.size + Format.( + pp_print_list + ~pp_sep:(fun fmt () -> Format.pp_print_string fmt "; ") + (fun fmt (id, index) -> + Format.fprintf fmt "(%a, %d)" pp_id id index)) + (Hash_map.to_seq h.hash_map |> List.of_seq) ; + check_invariant_for_all_index h.size ; + assert (Hash_map.length h.hash_map = h.size) end end diff --git a/src/lib_stdlib/bounded_min_heap.mli b/src/lib_stdlib/bounded_min_heap.mli index a65da7ad3f39..c438ea05fb2d 100644 --- a/src/lib_stdlib/bounded_min_heap.mli +++ b/src/lib_stdlib/bounded_min_heap.mli @@ -66,8 +66,21 @@ module Make id] is true. *) val find_opt : Id.t -> t -> E.t option + (** [remove h id] removes the element with [id] from the heap, or leaves [h] unchanged if [id] is not present in [h]. *) + val remove : t -> Id.t -> unit + + (** [remove_predicate f h] removes the elements [elt] for which [f elt = true] from + the heap and returns the list of ids removed. *) + val remove_predicate : (E.t -> bool) -> t -> Id.t list + + (** [clear h] deletes all data from the heap. 
*) + val clear : t -> unit + module Internal_for_tests : sig val check_heap_invariant : - pp_elt:(Format.formatter -> E.t -> unit) -> t -> unit + pp_id:(Format.formatter -> Id.t -> unit) -> + pp_elt:(Format.formatter -> E.t -> unit) -> + t -> + unit end end diff --git a/src/lib_stdlib/test/test_bounded_min_heap.ml b/src/lib_stdlib/test/test_bounded_min_heap.ml index e6fb0d4b1250..96c94a45f890 100644 --- a/src/lib_stdlib/test/test_bounded_min_heap.ml +++ b/src/lib_stdlib/test/test_bounded_min_heap.ml @@ -39,6 +39,12 @@ let take_nth_smallest n l = *) let size = QCheck2.Gen.int_range 2 1000 +let check_invariant b = + B.Internal_for_tests.check_heap_invariant + ~pp_id:Format.pp_print_string + ~pp_elt:Format.pp_print_int + b + let check_insert_heap ?(expect_heap_is_full = false) v b = let res = B.insert v b in let () = @@ -47,7 +53,7 @@ let check_insert_heap ?(expect_heap_is_full = false) v b = Assert.equal ~msg:__LOC__ res (Error error) else Assert.equal ~msg:__LOC__ res (Ok ()) in - B.Internal_for_tests.check_heap_invariant ~pp_elt:Format.pp_print_int b + check_invariant b let test_empty_works () = let b = B.create 0 in @@ -63,7 +69,7 @@ let test_heap_works () = let check_pop ~__LOC__ = List.iter (fun v -> let extracted = B.pop b in - B.Internal_for_tests.check_heap_invariant ~pp_elt:Format.pp_print_int b ; + check_invariant b ; Assert.equal ~pp:Format.(pp_print_option pp_print_int) ~msg:__LOC__ @@ -109,6 +115,31 @@ let test_heap_works () = true ; Assert.equal ~msg:__LOC__ (B.find_opt (id 0) b) None ; Assert.equal ~msg:__LOC__ (B.find_opt (id 11) b) None ; + B.clear b ; + check_invariant b ; + Assert.equal ~msg:__LOC__ (B.elements b) [] ; + check_insert_list [1; 2; 3; 4; 4; 3; 2; 1] ; + Assert.equal ~msg:__LOC__ (B.elements b) [1; 2; 3; 4] ; + B.remove b (string_of_int 4) ; + check_invariant b ; + B.remove b (string_of_int 2) ; + check_invariant b ; + B.remove b (string_of_int 0) ; + check_invariant b ; + B.remove b (string_of_int 1) ; + check_invariant b ; + B.remove 
b (string_of_int 3) ; + check_invariant b ; + B.remove b (string_of_int 0) ; + check_invariant b ; + Assert.equal ~msg:__LOC__ (B.elements b) [] ; + check_insert_list [4; 3; 2; 1] ; + Assert.equal ~msg:__LOC__ (B.elements b) [1; 2; 3; 4] ; + let removed_ids = B.remove_predicate (fun i -> i <= 2) b in + Assert.equal ~msg:__LOC__ (List.sort String.compare removed_ids) ["1"; "2"] ; + Assert.equal ~msg:__LOC__ (B.elements b) [3; 4] ; + check_invariant b ; + () let () = -- GitLab From eb915ebee8398695106339f4be6ef89ad80e467e Mon Sep 17 00:00:00 2001 From: Sylvain Ribstein Date: Wed, 4 Dec 2024 09:43:31 +0100 Subject: [PATCH 5/6] injector: uses heap instead of queue --- src/lib_injector/disk_persistence.ml | 155 -------------------------- src/lib_injector/disk_persistence.mli | 60 ---------- src/lib_injector/injector_functor.ml | 138 +++++++++++------------ tezt/tests/sc_rollup.ml | 113 +++++++++++++++++++ 4 files changed, 178 insertions(+), 288 deletions(-) diff --git a/src/lib_injector/disk_persistence.ml b/src/lib_injector/disk_persistence.ml index d0e2f5ccfb7c..4682f257e930 100644 --- a/src/lib_injector/disk_persistence.ml +++ b/src/lib_injector/disk_persistence.ml @@ -291,161 +291,6 @@ module Make_table (H : H) = struct t end -module Make_queue - (N : sig - val name : string - end) - (K : Tezos_crypto.Intfs.HASH) - (V : sig - type t - - val encoding : t Data_encoding.t - - val persist : t -> bool - end) = -struct - module Q = Hash_queue.Make (K) (V) - - type t = {path : string; metadata_path : string; queue : Q.t} - - let counter = ref min_int - - let filedata q k = Filename.concat q.path (K.to_b58check k) - - let filemetadata q k = Filename.concat q.metadata_path (K.to_b58check k) - - let create ~data_dir n = - let open Lwt_result_syntax in - let queue = Q.create n in - let path = Filename.concat data_dir N.name in - let metadata_path = Filename.concat path "metadata" in - let* () = create_dir path in - let+ () = create_dir metadata_path in - {path; metadata_path; 
queue} - - let remove q k = - let open Lwt_result_syntax in - Q.remove q.queue k ; - let* () = delete_file (filedata q k) - and* () = delete_file (filemetadata q k) in - return_unit - - let create_metadata () = - let time = Time.System.now () in - let d, ps = Ptime.to_span time |> Ptime.Span.to_d_ps in - let c = !counter in - incr counter ; - (d, ps, c) - - let metadata_encoding = - let open Data_encoding in - conv - (fun (d, ps, c) -> (Int64.of_int d, ps, Int64.of_int c)) - (fun (d, ps, c) -> (Int64.to_int d, ps, Int64.to_int c)) - @@ tup3 int64 int64 int64 - - let replace q k v = - let open Lwt_result_syntax in - Q.replace q.queue k v ; - if V.persist v then - let* () = write_value (filedata q k) V.encoding v - and* () = - write_value (filemetadata q k) metadata_encoding (create_metadata ()) - in - return_unit - else return_unit - - let clear q = - let open Lwt_syntax in - Q.clear q.queue ; - (* Remove the persistent elements from the disk. *) - let elts = Lwt_unix.files_of_directory q.path in - let metadata_elts = Lwt_unix.files_of_directory q.metadata_path in - let unlink file = - Lwt.catch - (fun () -> - if Sys.is_directory file then return_unit else Lwt_unix.unlink file) - (fun e -> - Format.ksprintf - Stdlib.failwith - "Error in unlink %s: %s" - file - (Printexc.to_string e)) - in - let* () = - Lwt_stream.iter_s (fun f -> unlink (Filename.concat q.path f)) elts - and* () = - Lwt_stream.iter_s - (fun m -> unlink (Filename.concat q.metadata_path m)) - metadata_elts - in - return_unit - - let fold f q = Q.fold f q.queue - - let length q = Q.length q.queue - - let find_opt q k = Q.find_opt q.queue k - - let elements q = Q.elements q.queue - - let load_from_disk ~warn_unreadable ~capacity ~data_dir ~filter = - let open Lwt_result_syntax in - let* q = create ~data_dir capacity in - let*! d = Lwt_unix.opendir q.path in - let rec browse acc = - let*! 
filename = - let open Lwt_syntax in - Lwt.catch - (fun () -> - let+ f = Lwt_unix.readdir d in - Some f) - (function End_of_file -> return_none | e -> Lwt.reraise e) - in - match filename with - | None -> return acc - | Some filename -> - let* acc = - match K.of_b58check_opt filename with - | None -> return acc - | Some k -> ( - let+ v_meta = - match warn_unreadable with - | None -> - let* v = read_value (filedata q k) V.encoding - and* meta = - read_value (filemetadata q k) metadata_encoding - in - return_some (v, meta) - | Some warn -> - let open Lwt_syntax in - let* v = maybe_read_value ~warn (filedata q k) V.encoding - and* meta = - maybe_read_value - ~warn - (filemetadata q k) - metadata_encoding - in - return_ok @@ Option.bind v - @@ fun v -> Option.bind meta @@ fun meta -> Some (v, meta) - in - match v_meta with - | None -> acc - | Some (v, meta) -> - if filter v then (k, v, meta) :: acc else acc) - in - browse acc - in - let* list = browse [] in - let list = - List.fast_sort - (fun (_, _, meta1) (_, _, meta2) -> Stdlib.compare meta1 meta2) - list - in - List.iter (fun (k, v, _) -> Q.replace q.queue k v) list ; - return q -end - module Make_heap (N : sig val name : string diff --git a/src/lib_injector/disk_persistence.mli b/src/lib_injector/disk_persistence.mli index a87fc1ab466f..34d26769daba 100644 --- a/src/lib_injector/disk_persistence.mli +++ b/src/lib_injector/disk_persistence.mli @@ -118,66 +118,6 @@ module Make_table (H : H) : sig t tzresult Lwt.t end -(** Create an on-disk persistent version of the {!Hash_queue} data structure. *) -module Make_queue - (N : sig - (** Name used to derive a path (relative to [data_dir] in [load_from_disk]) of where - to store the persistent information for this queue. *) - val name : string - end) - (K : Tezos_crypto.Intfs.HASH) - (V : sig - type t - - val persist : t -> bool - - val encoding : t Data_encoding.t - end) : sig - type t - - (** [remove q k] removes the binding from [k] in [q]. 
If [k] is not bound in - [c], it does nothing. The removal is persisted on disk. *) - val remove : t -> K.t -> unit tzresult Lwt.t - - (** [replace q k v] binds the key [k] to the value [v] in the queue [q]. This - may or may not cause another binding to be removed, depending on the - number of bindings already present in [q]. The addition (or replacement) - is persisted on disk. *) - val replace : t -> K.t -> V.t -> unit tzresult Lwt.t - - (** [fold f q init] folds the function [f] over the bindings - of [q] (in memory). The elements are iterated from oldest to newest. *) - val fold : (K.t -> V.t -> 'a -> 'a) -> t -> 'a -> 'a - - (** [find_opt q k] is [Some v] if [k] is bound to [v] in [q]. It is [None] - otherwise. *) - val find_opt : t -> K.t -> V.t option - - (** [elemets q] returns the elements of the queue [q] from oldest to - newest. *) - val elements : t -> V.t list - - (** [length q] is the number of bindings held by [q]. *) - val length : t -> int - - (** [clear q] empties the queue [q] and removes its persistent content on - disk. *) - val clear : t -> unit Lwt.t - - (** [load_from_disk ~warn_unreadable ~capacity ~data_dir ~filter] creates a - bounded hash queue of capacity [capacity]. The queue is populated by - persistent elements present in [data_dir/N.name] which pass the [filter] - (the directory is created if it does not exist). If [warn_unreadable] is - [Some warn], unreadable files are ignored but a warning is printed with - [warn], otherwise the loading fails on the first unreadable file. *) - val load_from_disk : - warn_unreadable:(string -> error trace -> unit Lwt.t) option -> - capacity:int -> - data_dir:string -> - filter:(V.t -> bool) -> - t tzresult Lwt.t -end - (** Create an on-disk persistent version of the {!Bounded_min_heap} data structure. 
*) module Make_heap diff --git a/src/lib_injector/injector_functor.ml b/src/lib_injector/injector_functor.ml index 646ce3da2cb8..8b42cf2d9883 100644 --- a/src/lib_injector/injector_functor.ml +++ b/src/lib_injector/injector_functor.ml @@ -110,10 +110,10 @@ module Make (Parameters : PARAMETERS) = struct | Injected of injected_info | Included of included_info - module Op_queue = - Disk_persistence.Make_queue + module Op_heap = + Disk_persistence.Make_heap (struct - let name = "operations_queue" + let name = "operations_heap" end) (Id) (struct @@ -165,8 +165,7 @@ module Make (Parameters : PARAMETERS) = struct strategy : injection_strategy; (** The strategy of this worker for injecting the pending operations. *) save_dir : string; (** Path to where save persistent state *) - queue : Op_queue.t; - (** The queue of pending operations for this injector. *) + heap : Op_heap.t; (** The heap of pending operations for this injector. *) injected : injected_state; (** The information about injected operations. *) included : included_state; @@ -265,14 +264,14 @@ module Make (Parameters : PARAMETERS) = struct Event.(emit loaded_from_disk) (List.map (fun s -> s.alias) signers, tags, nb, kind) in - let* queue = - Op_queue.load_from_disk + let* heap = + Op_heap.load_from_disk ~warn_unreadable ~capacity:50_000 ~data_dir ~filter:(filter (fun op -> op.Inj_operation.operation)) in - let*! () = emit_event_loaded "operations_queue" @@ Op_queue.length queue in + let*! 
() = emit_event_loaded "operations_queue" @@ Op_heap.length heap in (* Very coarse approximation for the number of operation we expect for each block *) let n = @@ -313,7 +312,7 @@ module Make (Parameters : PARAMETERS) = struct tags; strategy; save_dir = data_dir; - queue; + heap; injected = {injected_operations; injected_ophs}; included = {included_operations; included_in_blocks}; state; @@ -326,11 +325,11 @@ module Make (Parameters : PARAMETERS) = struct } (** We consider an operation to already exist in the injector if either: - - It is already in the queue + - It is already in the heap - It is injected but not included - It is included but not confirmed. *) let already_exists state op_hash = - Op_queue.find_opt state.queue op_hash <> None + Op_heap.find_opt state.heap op_hash <> None || Injected_operations.mem state.injected.injected_operations op_hash || match @@ -343,8 +342,8 @@ module Make (Parameters : PARAMETERS) = struct | Some {level = head_level; _} -> Int32.sub head_level l1_level < Int32.of_int confirmations) - (** Add an operation to the pending queue corresponding to the signer for this - operation. *) + (** Add an operation to the pending heap corresponding to the signer for this + operation. *) let add_pending_operation ?(retry = false) state (op : Inj_operation.t) = let open Lwt_result_syntax in if already_exists state op.id then @@ -356,7 +355,7 @@ module Make (Parameters : PARAMETERS) = struct state op.operation in - Op_queue.replace state.queue op.id op + Op_heap.insert state.heap op.id op (** Mark operations as injected (in [oph]). *) let add_injected_operations state {pkh = signer_pkh; _} oph ~injection_level @@ -557,12 +556,12 @@ module Make (Parameters : PARAMETERS) = struct let*! 
() = Event.(emit1 number_of_operations_in_queue) state - (Op_queue.length state.queue) + (Op_heap.length state.heap) in (* We perform a dichotomy by injecting the first half of the operations (we are not looking to maximize the number of operations injected because of the cost of simulation). Only the operations - which are actually injected will be removed from the queue so the + which are actually injected will be removed from the heap so the other half will be reconsidered later. *) match keep_half operations with | None -> @@ -602,7 +601,7 @@ module Make (Parameters : PARAMETERS) = struct op.errors.count op.errors.last_error in - Op_queue.remove state.queue op.id + Op_heap.remove state.heap op.id else let*! () = Event.(emit3 ?signers error_simulation_operation) @@ -696,46 +695,44 @@ module Make (Parameters : PARAMETERS) = struct in (oph, operations) - (** Retrieve as many batch of operations from the queue while batch - size remains below the size limit. *) - let get_n_ops_batch_from_queue ~size_limit state n = - let exception - Reached_limit of - (int * Inj_operation.t list * int * Inj_operation.t list list) - in + (** Retrieve as many batch of operations from the heap while batch + size remains below the size limit. 
*) + let get_n_ops_batch_from_queue ~size_limit state nb_signers = + let open Lwt_result_syntax in let module Proto_client = (val state.proto_client) in let min_size = Block_hash.size + Signature.size Signature.zero in let op_size op = Proto_client.operation_size op.Inj_operation.operation + Proto_client.operation_size_overhead in - let _current_size, rev_current_ops, nb_batch, rev_ops_batch = - try - Op_queue.fold - (fun _oph - op - ((current_size, rev_current_ops, nb_batch, rev_ops_batch) as acc) -> - if nb_batch = n then raise (Reached_limit acc) ; - let new_size = current_size + op_size op in - if new_size > size_limit then - let current_ops = List.rev rev_current_ops in - ( min_size + op_size op, - [op], - nb_batch + 1, - current_ops :: rev_ops_batch ) - else (new_size, op :: rev_current_ops, nb_batch, rev_ops_batch)) - state.queue - (min_size, [], 0, []) - with Reached_limit acc -> acc + let add_batch rev_batch rev_batches = + if List.is_empty rev_batch then rev_batches + else + let batch = List.rev rev_batch in + batch :: rev_batches in - let rev_ops_batch = - if nb_batch < n then - (* Add the last batch, even if it's not of full size, to ensure a larger number of batches. *) - let current_ops = List.rev rev_current_ops in - current_ops :: rev_ops_batch - else rev_ops_batch + let rec get_until_enough ops (batch_size, rev_ops) (nb_batches, rev_batches) = + (* [ops] is a snapshot of the heap: operations are not popped here, they + are only removed once injected, so the others are reconsidered later. *) + match ops with + | [] -> return @@ add_batch rev_ops rev_batches + | op :: rest -> + let new_size = batch_size + op_size op in + (* An operation bigger than [size_limit] gets a batch of its own. *) + if new_size <= size_limit || List.is_empty rev_ops then + get_until_enough rest (new_size, op :: rev_ops) (nb_batches, rev_batches) + else + (* [op] is not consumed: it starts the next batch. 
*) + continue_or_stop ops (nb_batches + 1, add_batch rev_ops rev_batches) + and continue_or_stop ops (nb_batches, rev_batches) = + if nb_batches = nb_signers then return rev_batches + else get_until_enough ops (min_size, []) (nb_batches, rev_batches) in - List.rev rev_ops_batch + (* NOTE(review): batches follow the order of [Op_heap.elements], assumed + to list the smallest operations first (like [peek_min]); confirm. *) + let ops = Op_heap.elements state.heap in + let* rev_batches = continue_or_stop ops (0, []) in + return @@ List.rev rev_batches (* Ignore operations that are allowed to fail. *) let ignore_ignorable_failing_operations state operations = @@ -762,9 +759,9 @@ module Make (Parameters : PARAMETERS) = struct `Ignored operations_to_drop (** [inject_pending_operations_round state ?size_limit signer] - injects operations from the pending queue [state.pending], whose + injects operations from the pending heap [state.pending], whose total size does not exceed [size_limit] using [signer]. Upon - successful injection, the operations are removed from the queue + successful injection, the operations are removed from the heap and marked as injected. *) let inject_pending_operations_round state signer operations_to_inject = let open Lwt_result_syntax in @@ -808,7 +805,7 @@ module Make (Parameters : PARAMETERS) = struct let* () = List.iter_es (fun (_index, op) -> - Op_queue.remove state.queue op.Inj_operation.id) + Op_heap.remove state.heap op.Inj_operation.id) injected_operations in let*!
() = @@ -843,7 +840,7 @@ module Make (Parameters : PARAMETERS) = struct in let* () = List.iter_es - (fun op -> Op_queue.remove state.queue op.Inj_operation.id) + (fun op -> Op_heap.remove state.heap op.Inj_operation.id) operations_to_drop in return (`Continue 0)) @@ -854,7 +851,7 @@ module Make (Parameters : PARAMETERS) = struct Proto_client.max_operation_data_length) () = let open Lwt_result_syntax in let signers = available_signers state in - let ops_batch = + let* ops_batch = get_n_ops_batch_from_queue ~size_limit state (List.length signers) in let signers_and_ops = List.combine_drop signers ops_batch in @@ -1007,7 +1004,7 @@ module Make (Parameters : PARAMETERS) = struct (** [revert_included_operations state block] marks the known (by this injector) manager operations contained in [block] as not being included any more, typically in the case of a reorganization where [block] is on an alternative - chain. The operations are put back in the pending queue. *) + chain. The operations are put back in the pending heap. *) let revert_included_operations state block = let open Lwt_result_syntax in when_ (has_included_operations state) @@ fun () -> @@ -1018,7 +1015,7 @@ module Make (Parameters : PARAMETERS) = struct (List.map (fun o -> o.op.id) revert_infos) in (* TODO: https://gitlab.com/tezos/tezos/-/issues/2814 - maybe put at the front of the queue for re-injection. *) + maybe put at the front of the heap for re-injection. *) List.iter_es (fun {op; _} -> let*! 
requeue = @@ -1097,7 +1094,7 @@ module Make (Parameters : PARAMETERS) = struct let set_metrics state = Metrics.wrap @@ fun () -> let tags = Tags.to_seq state.tags |> List.of_seq in - Metrics.set_queue_size tags (Op_queue.length state.queue) ; + Metrics.set_queue_size tags (Op_heap.length state.heap) ; Metrics.set_injected_operations_size tags (Injected_operations.length state.injected.injected_operations) ; @@ -1170,7 +1167,7 @@ module Make (Parameters : PARAMETERS) = struct set_last_head state head (* The request {Request.Inject} triggers an injection of the operations - the pending queue. *) + the pending heap. *) let on_inject state = let open Lwt_result_syntax in let* total_nb_injected_op = @@ -1180,7 +1177,7 @@ module Make (Parameters : PARAMETERS) = struct let*! () = Event.(emit1 number_of_operations_in_queue) state - (Op_queue.length state.queue) + (Op_heap.length state.heap) in return () @@ -1189,18 +1186,13 @@ module Make (Parameters : PARAMETERS) = struct Injected_ophs.clear state.injected.injected_ophs ; Included_operations.clear state.included.included_operations ; Included_in_blocks.clear state.included.included_in_blocks ; - Op_queue.clear state.queue + Op_heap.clear state.heap let remove_operations_with_tag tag state = - let open Lwt_result_syntax in - Op_queue.fold - (fun id op acc -> - if Parameters.Tag.equal (Parameters.operation_tag op.operation) tag then - let* () = Op_queue.remove state.queue id in - acc - else acc) - state.queue - return_unit + Op_heap.remove_predicate + (fun op -> + Parameters.Tag.equal (Parameters.operation_tag op.operation) tag) + state.heap module Types = struct type nonrec state = state @@ -1576,7 +1568,7 @@ module Make (Parameters : PARAMETERS) = struct |> List.map (fun (tags, _w) -> Tags.to_seq tags |> List.of_seq) let op_status_in_worker state l1_hash = - match Op_queue.find_opt state.queue l1_hash with + match Op_heap.find_opt state.heap l1_hash with | Some op -> Some (Pending op.operation) | None -> ( match @@ 
-1603,7 +1595,7 @@ module Make (Parameters : PARAMETERS) = struct List.fold_left (fun (acc, total) (tags, w) -> let state = Worker.state w in - let len = Op_queue.length state.queue in + let len = Op_heap.length state.heap in let tag_list = Tags.to_seq tags |> List.of_seq in ((tag_list, len) :: acc, total + len)) ([], 0) @@ -1619,9 +1611,9 @@ module Make (Parameters : PARAMETERS) = struct if not to_count then acc else let state = Worker.state w in - let queue = Op_queue.elements state.queue in + let heap = Op_heap.elements state.heap in let tag_list = Tags.to_seq tags |> List.of_seq in - (tag_list, queue) :: acc) + (tag_list, heap) :: acc) [] workers diff --git a/tezt/tests/sc_rollup.ml b/tezt/tests/sc_rollup.ml index 16cb16b66c8e..044c49fc260f 100644 --- a/tezt/tests/sc_rollup.ml +++ b/tezt/tests/sc_rollup.ml @@ -5981,6 +5981,118 @@ let test_batcher_order_msgs ~kind = in unit +let test_injector_order_operations_by_kind ~kind = + let commitment_period = 5 in + let challenge_window = 5 in + test_full_scenario + (* TODO: https://gitlab.com/tezos/tezos/-/issues/6650 + + cf multiple_batcher_test comment. 
*) + ~rpc_external:false + ~kind + ~commitment_period + ~challenge_window + ~mode:Operator + { + variant = None; + tags = ["injector"; "operations"; "order"]; + description = "Injector order operations by kind"; + } + @@ fun _protocol rollup_node rollup_addr _node client -> + let* () = Sc_rollup_node.run ~event_level:`Debug rollup_node rollup_addr [] in + let* _ = Client.bake_for_and_wait client in + (* to be 1 block after the origination, otherwise it fails for + something we don't care *) + let check_sr_ops_are_ordered () = + let* block = Client.RPC.call client @@ RPC.get_chain_block () in + let ops = JSON.(block |-> "operations" |=> 3 |> as_list) in + let ops_kind = + let open JSON in + let filter_sr_op json = + let kind = json |-> "kind" |> as_string in + if String.starts_with ~prefix:"smart_rollup" kind then Some kind + else None + in + List.map + (fun op -> op |-> "contents" |> as_list |> List.filter_map filter_sr_op) + ops + |> List.flatten + in + let all = + [ + ["smart_rollup_timeout"]; + ["smart_rollup_refute"]; + ["smart_rollup_publish"; "smart_rollup_cement"]; + ["smart_rollup_recover"]; + ["smart_rollup_add_messages"]; + ["smart_rollup_execute_outbox_message"]; + ] + in + let is_sorted = + let rec aux all l = + match (all, l) with + | _, hd :: rest when not (String.starts_with ~prefix:"smart_rollup" hd) + -> + (*skip op not smart rollup*) + aux all rest + | current :: all_rest, hd :: rest -> + if List.mem hd current then aux all rest else aux all_rest l + | _, [] -> (* all element of l appeared in all in the same order *) true + | [], _ -> + (* There is still at least an element of L but the + sorted list is empty. L was not correctly + ordered. 
*) + false + in + aux all + in + Check.is_true + ~__LOC__ + ~error_msg: + (Format.asprintf + "injected operations are not sorted, [%a]" + Format.( + pp_print_list + ~pp_sep:(fun fmt () -> Format.pp_print_string fmt "; ") + pp_print_string) + ops_kind) + @@ is_sorted ops_kind ; + unit + in + + let hook i = + let* () = + (* check done at each block so we don't miss any *) + check_sr_ops_are_ordered () + in + let* _ = + Sc_rollup_node.RPC.call rollup_node + @@ Sc_rollup_rpc.post_local_batcher_injection + ~messages:[string_of_int i] + () + in + unit + in + (* We only check add_messages, publish and cement operations + order. We could do more but test becomes more involved and I + think it's not necessary. *) + let* level = + bake_until_lpc_updated + ~at_least:(commitment_period + 2) + ~hook + client + rollup_node + in + let* _ = + bake_until_lcc_updated + ~at_least:(challenge_window + 2) + ~hook + client + rollup_node + ~level + in + unit + (** Injector only uses key that have no operation in the mempool currently. 1. 
Batcher setup: @@ -6638,6 +6750,7 @@ let register_protocol_independent () = test_multiple_batcher_key ~kind protocols ; test_batcher_order_msgs ~kind protocols ; test_injector_uses_available_keys protocols ~kind ; + test_injector_order_operations_by_kind protocols ~kind ; test_batcher_dont_reinject_already_injected_messages protocols ~kind ; test_private_rollup_node_publish_in_whitelist protocols ; test_private_rollup_node_publish_not_in_whitelist protocols ; -- GitLab From 100cc0a56304114b34a5ffd85623a4966649e8e3 Mon Sep 17 00:00:00 2001 From: Sylvain Ribstein Date: Wed, 4 Dec 2024 09:46:00 +0100 Subject: [PATCH 6/6] batcher: inject batch with lowest element's order from batch --- src/lib_injector/injector_functor.ml | 4 +-- src/lib_injector/injector_sigs.ml | 2 +- src/lib_smart_rollup/l2_message.ml | 2 ++ src/lib_smart_rollup/l2_message.mli | 2 ++ src/lib_smart_rollup_node/batcher.ml | 32 ++++++++++++++------- src/lib_smart_rollup_node/injector.ml | 4 +-- src/lib_smart_rollup_node/injector.mli | 5 +++- tezt/tests/sc_rollup.ml | 40 ++++++++++++++++++++------ 8 files changed, 67 insertions(+), 24 deletions(-) diff --git a/src/lib_injector/injector_functor.ml b/src/lib_injector/injector_functor.ml index 8b42cf2d9883..9b64d5d567a8 100644 --- a/src/lib_injector/injector_functor.ml +++ b/src/lib_injector/injector_functor.ml @@ -1551,9 +1551,9 @@ module Make (Parameters : PARAMETERS) = struct tag | Some worker -> return worker - let add_pending_operation op = + let add_pending_operation ?order op = let open Lwt_result_syntax in - let operation = Inj_operation.make op in + let operation = Inj_operation.make ?order op in let*? 
w = worker_of_tag (Parameters.operation_tag op) in let* () = add_pending_operation (Worker.state w) operation in return operation.id diff --git a/src/lib_injector/injector_sigs.ml b/src/lib_injector/injector_sigs.ml index 891809821001..e6e090d11d62 100644 --- a/src/lib_injector/injector_sigs.ml +++ b/src/lib_injector/injector_sigs.ml @@ -332,7 +332,7 @@ module type S = sig (** Add an operation as pending injection in the injector. It returns the id of the operation in the injector queue. *) - val add_pending_operation : operation -> Id.t tzresult Lwt.t + val add_pending_operation : ?order:Z.t -> operation -> Id.t tzresult Lwt.t (** Trigger an injection of the pending operations for all workers. If [tags] is given, only the workers which have a tag in [tags] inject their pending diff --git a/src/lib_smart_rollup/l2_message.ml b/src/lib_smart_rollup/l2_message.ml index dcf17e7ec224..377975d602fc 100644 --- a/src/lib_smart_rollup/l2_message.ml +++ b/src/lib_smart_rollup/l2_message.ml @@ -106,3 +106,5 @@ let compare msg1 msg2 = | None, None -> counter_cmp () | Some _p, _ -> -1 | _, Some _p -> 1 + +let order {order; _} = order diff --git a/src/lib_smart_rollup/l2_message.mli b/src/lib_smart_rollup/l2_message.mli index e08e22926aca..0153adf5170d 100644 --- a/src/lib_smart_rollup/l2_message.mli +++ b/src/lib_smart_rollup/l2_message.mli @@ -53,4 +53,6 @@ val id : t -> Id.t val counter : t -> Z.t +val order : t -> Z.t option + val compare : t -> t -> int diff --git a/src/lib_smart_rollup_node/batcher.ml b/src/lib_smart_rollup_node/batcher.ml index 83b137fcf361..e0ae3810525d 100644 --- a/src/lib_smart_rollup_node/batcher.ml +++ b/src/lib_smart_rollup_node/batcher.ml @@ -49,12 +49,13 @@ let message_size s = (* Encoded as length of s on 4 bytes + s *) 4 + String.length s -let inject_batch state (l2_messages : L2_message.t list) = +let inject_batch ?order state (l2_messages : L2_message.t list) = let open Lwt_result_syntax in let messages = List.map L2_message.content 
l2_messages in let operation = L1_operation.Add_messages {messages} in let* l1_id = Injector.check_and_add_pending_operation + ?order state.node_ctxt.config.mode operation in @@ -72,7 +73,11 @@ let inject_batch state (l2_messages : L2_message.t list) = Batched_messages.replace state.batched id {content; l1_id}) l2_messages -let inject_batches state = List.iter_es (inject_batch state) +let inject_batches state = + List.iter_es (fun (min_order, batch) -> + (* We inject that batch taking the smallest order of that batch, + i.e. the one of the first element. *) + inject_batch ?order:min_order state batch) let max_batch_size {node_ctxt; plugin; _} = let module Plugin = (val plugin) in @@ -84,17 +89,23 @@ let get_batches state ~only_full = let max_batch_size = max_batch_size state in let max_batch_elements = state.node_ctxt.config.batcher.max_batch_elements in let heap = state.messages_heap in - let add_batch rev_batch rev_batches = + let add_batch ~min_order rev_batch rev_batches = if List.is_empty rev_batch then rev_batches else let batch = List.rev rev_batch in - batch :: rev_batches + (min_order, batch) :: rev_batches in - let rec pop_until_enough (rev_batch, batch_size, batch_elements) rev_batches = + let rec pop_until_enough ~is_first + (min_order, rev_batch, batch_size, batch_elements) rev_batches = let message = Message_heap.peek_min heap in match message with - | None -> add_batch rev_batch rev_batches + | None -> add_batch ~min_order rev_batch rev_batches | Some message -> + let min_order = + (* first element of the batch has the lower order. We only + care about this one.*) + if is_first then L2_message.order message else min_order + in let size = message_size (L2_message.content message) in let new_batch_size = batch_size + size in let new_batch_elements = batch_elements + 1 in @@ -107,19 +118,20 @@ let get_batches state ~only_full = (* We can add the message to the current batch because we are still within the bounds. 
*) pop_until_enough - (message :: rev_batch, new_batch_size, new_batch_elements) + ~is_first:false + (min_order, message :: rev_batch, new_batch_size, new_batch_elements) rev_batches else (* we haven't pop the message, so we can safely call continue_or_stop. *) - continue_or_stop (add_batch rev_batch rev_batches) + continue_or_stop (add_batch ~min_order rev_batch rev_batches) and continue_or_stop rev_batches = if only_full && Message_heap.length heap < state.node_ctxt.config.batcher.min_batch_elements then rev_batches - else pop_until_enough ([], 0, 0) rev_batches + else pop_until_enough ~is_first:true (None, [], 0, 0) rev_batches in continue_or_stop [] |> List.rev @@ -129,7 +141,7 @@ let produce_batches state ~only_full = let batches = get_batches state ~only_full in let get_timestamp = Time.System.now () in let nb_messages_batched = - List.fold_left (fun len l -> len + List.length l) 0 batches + List.fold_left (fun len (_min_order, l) -> len + List.length l) 0 batches in Metrics.wrap (fun () -> Metrics.Batcher.set_messages_queue_size diff --git a/src/lib_smart_rollup_node/injector.ml b/src/lib_smart_rollup_node/injector.ml index 2d98f3d82387..3a75431a3417 100644 --- a/src/lib_smart_rollup_node/injector.ml +++ b/src/lib_smart_rollup_node/injector.ml @@ -171,10 +171,10 @@ end include Injector_functor.Make (Parameters) -let check_and_add_pending_operation (mode : Configuration.mode) +let check_and_add_pending_operation (mode : Configuration.mode) ?order (operation : L1_operation.t) = let open Lwt_result_syntax in if Configuration.(can_inject mode (Parameters.operation_tag operation)) then - let* hash = add_pending_operation operation in + let* hash = add_pending_operation ?order operation in return (Some hash) else return None diff --git a/src/lib_smart_rollup_node/injector.mli b/src/lib_smart_rollup_node/injector.mli index 0409250dd43d..ae60572f4302 100644 --- a/src/lib_smart_rollup_node/injector.mli +++ b/src/lib_smart_rollup_node/injector.mli @@ -43,4 +43,7 @@ 
include mode. If allowed, adds it to the pending operation. Returns [None] when the operation is not allowed in the mode. *) val check_and_add_pending_operation : - Configuration.mode -> L1_operation.t -> Inj_operation.id option tzresult Lwt.t + Configuration.mode -> + ?order:Z.t -> + L1_operation.t -> + Inj_operation.id option tzresult Lwt.t diff --git a/tezt/tests/sc_rollup.ml b/tezt/tests/sc_rollup.ml index 044c49fc260f..d1133a49c4d4 100644 --- a/tezt/tests/sc_rollup.ml +++ b/tezt/tests/sc_rollup.ml @@ -423,9 +423,13 @@ let wait_for_included_successful_operation node ~operation_kind = let wait_until_n_batches_are_injected rollup_node ~nb_batches = let nb_injected = ref 0 in - Sc_rollup_node.wait_for rollup_node "injected_ops.v0" @@ fun _json -> - nb_injected := !nb_injected + 1 ; - if !nb_injected >= nb_batches then Some () else None + Sc_rollup_node.wait_for rollup_node "injected_ops.v0" @@ fun json -> + JSON.( + json |-> "operations" |> as_list + |> List.iter (fun json -> + let kind = json |-> "kind" |> as_string in + if kind = "add_messages" then nb_injected := !nb_injected + 1)) ; + if !nb_injected = nb_batches then Some () else None let send_message_batcher_aux ?rpc_hooks client sc_node msgs = let batched = @@ -5879,10 +5883,12 @@ let test_batcher_order_msgs ~kind = unit in - let bake_then_check_included_msgs ~__LOC__ ~nb_batches ~expected_messages = + let bake_then_check_included_msgs ~__LOC__ ~expected_messages = let* level = Node.get_level node in let wait_for_injected = - wait_until_n_batches_are_injected rollup_node ~nb_batches + wait_until_n_batches_are_injected + rollup_node + ~nb_batches:(List.length expected_messages) in let* () = Client.bake_for_and_wait client and* () = wait_for_injected @@ -5938,7 +5944,6 @@ let test_batcher_order_msgs ~kind = let* () = bake_then_check_included_msgs ~__LOC__ - ~nb_batches:1 ~expected_messages:[expected_messages] in @@ -5976,9 +5981,28 @@ let test_batcher_order_msgs ~kind = List.init min_batch_elements (fun i -> 
(max_batch_elements + i) * 10); ] in - let* () = - bake_then_check_included_msgs ~__LOC__ ~nb_batches:1 ~expected_messages + let* () = bake_then_check_included_msgs ~__LOC__ ~expected_messages in + + Log.info + "Injecting multiple time the mimimal batches element in reverse order of \ + priority to make sure the injector order them correctly" ; + (* [0; 1; ... 9] *) + let messages_no_order = List.init min_batch_elements Fun.id in + let* _hashes = inject_int_of_string messages_no_order in + (* [10; 11; ... 19] *) + let messages_order_2 = + List.init min_batch_elements (fun i -> min_batch_elements + i) + in + let* _hashes = inject_int_of_string ~order:2 messages_order_2 in + (* [20; 21; ... 29] *) + let messages_order_1 = + List.init min_batch_elements (fun i -> (2 * min_batch_elements) + i) + in + let* _hashes = inject_int_of_string ~order:1 messages_order_1 in + let expected_messages = + [messages_order_1; messages_order_2; messages_no_order] in + let* () = bake_then_check_included_msgs ~__LOC__ ~expected_messages in unit let test_injector_order_operations_by_kind ~kind = -- GitLab