From 01dd62446decc4b71dab731bc479a084e14caa15 Mon Sep 17 00:00:00 2001 From: Guillaume Genestier Date: Wed, 3 Sep 2025 13:54:15 +0200 Subject: [PATCH 01/16] Gossipsub/state: Add monadic operation for 'a check --- src/lib_gossipsub/state_monad.ml | 26 ++++++++++++++++++++++++++ src/lib_gossipsub/state_monad.mli | 18 +++++++++++++++++- 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/src/lib_gossipsub/state_monad.ml b/src/lib_gossipsub/state_monad.ml index b35254fa5d03..9caaf284d62b 100644 --- a/src/lib_gossipsub/state_monad.ml +++ b/src/lib_gossipsub/state_monad.ml @@ -43,10 +43,17 @@ module type S = sig ('pass -> ('state, 'fail) t) -> ('state, 'fail) t + val bind_check : + ('state, 'a, 'fail) check -> + ('a -> ('state, 'pass, 'fail) check) -> + ('state, 'pass, 'fail) check + val return_pass : 'pass -> ('state, 'pass, 'fail) check val return_fail : 'fail -> ('state, 'pass, 'fail) check + val map_fold : ('a -> ('state, 'b) t) -> 'a list -> ('state, 'b list) t + module Syntax : sig val ( let* ) : ('state, 'a) t -> ('a -> ('state, 'b) t) -> ('state, 'b) t @@ -57,6 +64,11 @@ module type S = sig val ( let*! ) : ('state -> 'a) -> ('a -> ('state, 'b) t) -> ('state, 'b) t + val ( let** ) : + ('state, 'a, 'fail) check -> + ('a -> ('state, 'pass, 'fail) check) -> + ('state, 'pass, 'fail) check + val return : 'a -> ('state, 'a) t val pass : 'pass -> ('state, 'pass, 'fail) check @@ -85,6 +97,18 @@ module M : S = struct let return_fail x = `Fail x |> return + let bind_check c f = + bind c (function `Pass res -> f res | `Fail e -> return_fail e) + + let map_fold f l = + let rec bis acc state = function + | [] -> return (List.rev acc) state + | hd :: tl -> + let state, value = f hd state in + bis (value :: acc) state tl + in + fun state -> bis [] state l + module Syntax = struct let ( let* ) = bind @@ -92,6 +116,8 @@ module M : S = struct let ( let*! 
) = get + let ( let** ) = bind_check + let return = return let pass = return_pass diff --git a/src/lib_gossipsub/state_monad.mli b/src/lib_gossipsub/state_monad.mli index 9466a6fa013c..fc72d997b9b5 100644 --- a/src/lib_gossipsub/state_monad.mli +++ b/src/lib_gossipsub/state_monad.mli @@ -57,7 +57,13 @@ module type S = sig ('pass -> ('state, 'fail) t) -> ('state, 'fail) t - (** [return_pass v] is the default constuctor of [`Pass] values in the + (** [bind_check m f] is the bind function for the [('state, 'a, 'fail) check] monad. *) + val bind_check : + ('state, 'a, 'fail) check -> + ('a -> ('state, 'pass, 'fail) check) -> + ('state, 'pass, 'fail) check + + (** [return_pass v] is the default constructor of [`Pass] values in the [('state, 'pass, 'fail) check] monad. *) val return_pass : 'pass -> ('state, 'pass, 'fail) check @@ -65,6 +71,10 @@ module type S = sig [('state, 'pass, 'fail) check] monad. *) val return_fail : 'fail -> ('state, 'pass, 'fail) check + (** [map_fold f l] iteratively applies [f] to all elements of [l], updating + the state at each step. *) + val map_fold : ('a -> ('state, 'b) t) -> 'a list -> ('state, 'b list) t + (** Infix notations and/or shorter aliases for the functions above. *) module Syntax : sig (** Infix notation for {!bind}. *) @@ -79,6 +89,12 @@ module type S = sig (** Infix notation for {!get}. *) val ( let*! ) : ('state -> 'a) -> ('a -> ('state, 'b) t) -> ('state, 'b) t + (** Infix notation for {!bind_check}. *) + val ( let** ) : + ('state, 'a, 'fail) check -> + ('a -> ('state, 'pass, 'fail) check) -> + ('state, 'pass, 'fail) check + (** Re-exporting {!return} function in this sub-module.
*) val return : 'a -> ('state, 'a) t -- GitLab From 91e36c869859aadd230cce7b4c2f83caeb81f8f8 Mon Sep 17 00:00:00 2001 From: Guillaume Genestier Date: Wed, 3 Sep 2025 13:54:28 +0200 Subject: [PATCH 02/16] Gossipsub: Allow to treat messages in batches --- src/lib_dal_node/gossipsub/gs_interface.ml | 16 +++ src/lib_dal_node/gossipsub/gs_interface.mli | 10 ++ src/lib_dal_node/gossipsub/gs_logging.ml | 10 ++ src/lib_gossipsub/gossipsub_automaton.ml | 109 ++++++++++++++++-- src/lib_gossipsub/gossipsub_intf.ml | 65 ++++++++++- src/lib_gossipsub/gossipsub_worker.ml | 61 ++++++++-- .../test/gossipsub_pbt_generators.ml | 2 +- .../test/test_gossipsub_shared.ml | 2 + src/lib_gossipsub/test/test_unit.ml | 32 +++-- 9 files changed, 268 insertions(+), 39 deletions(-) diff --git a/src/lib_dal_node/gossipsub/gs_interface.ml b/src/lib_dal_node/gossipsub/gs_interface.ml index c6ce6d3a18ee..e8170ebbb5b7 100644 --- a/src/lib_dal_node/gossipsub/gs_interface.ml +++ b/src/lib_dal_node/gossipsub/gs_interface.ml @@ -45,12 +45,26 @@ module Validate_message_hook = struct ()) ; `Unknown) + (* FIXME: https://gitlab.com/tezos/tezos/-/issues/5674 + Refactor gossipsub integration to avoid this mutable hook in the lib. 
*) + let check_message_batch = + ref (fun _batch -> + Internal_event.Simple.( + emit__dont_wait__use_with_care + (warn_validation_function_not_set ()) + ()) ; + []) + let set func = check_message := func + + let set_batch func = check_message_batch := func end let message_valid ?message ~message_id () = !Validate_message_hook.check_message ?message ~message_id () +let message_valid_batch = !Validate_message_hook.check_message_batch + module Automaton_config : AUTOMATON_CONFIG with type Time.t = Types.Time.t @@ -76,6 +90,8 @@ module Automaton_config : include Types.Message let valid = message_valid + + let valid_batch = message_valid_batch end end end diff --git a/src/lib_dal_node/gossipsub/gs_interface.mli b/src/lib_dal_node/gossipsub/gs_interface.mli index 7a66f419f2e6..82d45dd32209 100644 --- a/src/lib_dal_node/gossipsub/gs_interface.mli +++ b/src/lib_dal_node/gossipsub/gs_interface.mli @@ -56,4 +56,14 @@ module Validate_message_hook : sig unit -> [`Valid | `Unknown | `Outdated | `Invalid]) -> unit + + val set_batch : + ((Types.Peer.t + * Types.Topic.t + * Types.Message_id.t + * Types.Message.t + * Types.Peer.Set.t) + list -> + [`Invalid | `Outdated | `Unknown | `Valid] list) -> + unit end diff --git a/src/lib_dal_node/gossipsub/gs_logging.ml b/src/lib_dal_node/gossipsub/gs_logging.ml index 27fceb2d3159..0c533e0f2b0e 100644 --- a/src/lib_dal_node/gossipsub/gs_logging.ml +++ b/src/lib_dal_node/gossipsub/gs_logging.ml @@ -43,6 +43,15 @@ module Events = struct ~level:Debug () + let batch_treatment = + declare_0 + ~section + ~prefix_name_with_section:true + ~name:"batch_treatment" + ~msg:"Validation of a message batch" + ~level:Debug + () + let check_unknown_messages = declare_0 ~section @@ -254,3 +263,4 @@ let event ~verbose = | IHave {topic; message_ids} -> emit ihave (from_peer.peer_id, topic, message_ids) | IWant {message_ids} -> emit iwant (from_peer.peer_id, message_ids))) + | Process_batch _ -> emit batch_treatment () diff --git 
a/src/lib_gossipsub/gossipsub_automaton.ml b/src/lib_gossipsub/gossipsub_automaton.ml index a0cb0d5a6b55..3cf24e5a1cc4 100644 --- a/src/lib_gossipsub/gossipsub_automaton.ml +++ b/src/lib_gossipsub/gossipsub_automaton.ml @@ -130,6 +130,10 @@ module Make (C : AUTOMATON_CONFIG) : | Invalid_message : [`Receive_message] output | Unknown_validity : [`Receive_message] output | Outdated : [`Receive_message] output + | Included_in_batch : [`Receive_message] output + | Batch_result : + (receive_message * [`Receive_message] output) list + -> [`Treated_batch] output | Already_joined : [`Join] output | Joining_topic : {to_graft : Peer.Set.t} -> [`Join] output | Not_joined : [`Leave] output @@ -1150,15 +1154,19 @@ module Make (C : AUTOMATON_CONFIG) : in fail Invalid_message - let handle sender topic message_id message = + let initial_message_checks {topic; message_id; sender; _} : + ( state, + Peer.Set.t, + [`Receive_message] output ) + Tezos_gossipsub__State_monad.check = let open Monad.Syntax in let*! mesh_opt = find_mesh topic in - let*? peers_in_mesh = + let** peers_in_mesh = match mesh_opt with | Some peers -> pass peers | None -> fail Not_subscribed in - let*? () = + let** () = let*! message_cache in match Message_cache.get_first_seen_time_and_senders message_id message_cache @@ -1173,9 +1181,21 @@ module Make (C : AUTOMATON_CONFIG) : in fail Already_received in - let*? () = check_message_id_valid sender topic message_id in - let*? () = check_message_valid sender topic message message_id in + let** () = check_message_id_valid sender topic message_id in + pass peers_in_mesh + + let peers_to_route sender peers_in_mesh topic = + let open Monad.Syntax in let peers = Peer.Set.remove sender peers_in_mesh in + let* direct_peers = get_direct_peers topic in + return (Peer.Set.union peers direct_peers) + + let handle_sequentially + ({sender; topic; message_id; message} as receive_message) : + [`Receive_message] output Monad.t = + let open Monad.Syntax in + let*? 
peers_in_mesh = initial_message_checks receive_message in + let*? () = check_message_valid sender topic message message_id in let* () = put_message_in_cache ~peer:(Some sender) message_id message topic in @@ -1183,19 +1203,86 @@ module Make (C : AUTOMATON_CONFIG) : update_score sender (fun stats -> Score.first_message_delivered stats topic) in - let* direct_peers = get_direct_peers topic in - let to_route = Peer.Set.union peers direct_peers in + let* to_route = peers_to_route sender peers_in_mesh topic in (* TODO: https://gitlab.com/tezos/tezos/-/issues/5272 Filter out peers from which we already received the message, or an IHave message? *) Route_message {to_route} |> return + + let handle_batch = + let msg_batch = Queue.create () in + let running = ref false in + let start_batch_timer time callback = + let open Lwt_syntax in + Lwt.async (fun () -> + let* () = Lwt_unix.sleep time in + let batch = List.of_seq (Queue.to_seq msg_batch) in + Queue.clear msg_batch ; + running := false ; + callback batch ; + return_unit) + in + fun ~callback ~time_interval ({sender; topic; _} as receive_message) -> + let open Monad.Syntax in + let*? 
peers_in_mesh = initial_message_checks receive_message in + let* to_route = peers_to_route sender peers_in_mesh topic in + Queue.push (receive_message, to_route) msg_batch ; + if not !running then ( + running := true ; + start_batch_timer time_interval callback) ; + return Included_in_batch end - let handle_receive_message : + let handle_receive_message_sequentially : receive_message -> [`Receive_message] output Monad.t = - fun {sender; topic; message_id; message} -> - Receive_message.handle sender topic message_id message + fun receive_message -> Receive_message.handle_sequentially receive_message + + let handle_receive_message_batch ~callback ~time_interval : + receive_message -> [`Receive_message] output Monad.t = + fun receive_message -> + Receive_message.handle_batch ~callback ~time_interval receive_message + + let check_message_batch batch = + let unfolded_batch = + List.map + (fun ({sender; topic; message_id; message}, peers) -> + (sender, topic, message_id, message, peers)) + batch + in + List.combine + ~when_different_lengths:() + batch + (Message.valid_batch unfolded_batch) + + let apply_batch (batch : (receive_message * Peer.Set.t) list) : + [`Treated_batch] output Monad.t = + let open Monad.Syntax in + match check_message_batch batch with + | Error () -> + let l = + List.map + (fun (received_msg, _peers) -> (received_msg, Unknown_validity)) + batch + in + return (Batch_result l) + | Ok annotated_batch -> + let* l = + Monad.map_fold + (fun ((({sender; topic; _} as received_msg), peers), result) -> + match result with + | `Valid -> return (received_msg, Route_message {to_route = peers}) + | `Outdated -> return (received_msg, Outdated) + | `Unknown -> return (received_msg, Unknown_validity) + | `Invalid -> + let* () = + update_score sender (fun stats -> + Score.invalid_message_delivered stats topic) + in + return (received_msg, Invalid_message)) + annotated_batch + in + return (Batch_result l) module Publish_message = struct let check_not_seen message_id = 
@@ -2416,7 +2503,9 @@ module Make (C : AUTOMATON_CONFIG) : "Route_message %a" Fmt.Dump.(record [field "to_route" Fun.id pp_peer_set]) to_route + | Included_in_batch -> fprintf fmtr "Included in batch" | Already_received -> fprintf fmtr "Already_received" + | Batch_result _res -> fprintf fmtr "Batch_result" | Not_subscribed -> fprintf fmtr "Not_subscribed" | Already_joined -> fprintf fmtr "Already_joined" | Joining_topic {to_graft} -> diff --git a/src/lib_gossipsub/gossipsub_intf.ml b/src/lib_gossipsub/gossipsub_intf.ml index 1c010c0ff7a2..6eebd35ef235 100644 --- a/src/lib_gossipsub/gossipsub_intf.ml +++ b/src/lib_gossipsub/gossipsub_intf.ml @@ -69,6 +69,23 @@ module type AUTOMATON_SUBCONFIG = sig message_id:Message_id.t -> unit -> [`Valid | `Unknown | `Outdated | `Invalid] + + (** [valid_batch] performs an application layer-level validity check on a + message batch. + + If the [message_handling] field of the gossipsub worker is [Sequentially], + this function is never called. + This function is called when the field is [In_batches]. However, [valid] + is still called even in this case: since message ids are not batched + and [valid_batch] cannot be called without providing a message, [valid] + remains used when processing message ids without the message, especially + the [IHave] events. + + The output is a list of the same length as the input, containing the + result for each message of the batch. *) + val valid_batch : + (Peer.t * Topic.t * Message_id.t * t * Peer.Set.t) list -> + [`Valid | `Unknown | `Outdated | `Invalid] list end end @@ -705,7 +722,7 @@ module type AUTOMATON = sig - Direct peers for the message's topic; - The peers in the topic's mesh minus the original sender of the message. *) | Already_received : [`Receive_message] output - (** Received a message that has already been recevied before. *) + (** Received a message that has already been received before. 
*) | Not_subscribed : [`Receive_message] output (** Received a message from a remote peer for a topic we are not subscribed to (called "unknown topic" in the Go implementation). *) @@ -714,6 +731,17 @@ module type AUTOMATON = sig | Unknown_validity : [`Receive_message] output (** Validity cannot be decided yet. *) | Outdated : [`Receive_message] output (** The message is outdated. *) + | To_include_in_batch : + (receive_message * Peer.Set.t) + -> [`Receive_message] output + (** The message passed all the Gossipsub checks. The worker should add this + message in the current batch. It must be noted that most checks + (especially cryptographic ones) are done when the batch is processed. *) + | Batch_result : + (receive_message * [`Receive_message] output) list + -> [`Treated_batch] output + (** Result of the processing of the batch. It contains the list of all + messages of the batch with their respective output. *) | Already_joined : [`Join] output (** Attempting to join a topic we already joined. *) | Joining_topic : {to_graft : Peer.Set.t} -> [`Join] output @@ -811,10 +839,29 @@ module type AUTOMATON = sig on this topic. *) val handle_prune : prune -> [`Prune] monad - (** [handle_receive_message { sender; topic; message_id; message }] handles - a message received from [sender] on the gossip network. The function returns - a set of peers to which the (full) message will be directly forwarded. *) - val handle_receive_message : receive_message -> [`Receive_message] monad + (** [handle_receive_message_sequentially received_message] handles a + [received_message] on the gossip network. The function returns either a + rejection reason for the message or a set of peers to which the (full) + message will be directly forwarded. *) + val handle_receive_message_sequentially : + receive_message -> [`Receive_message] monad + + (** [handle_receive_message_batch callback time_interval received_message] + handles a message received from [sender] on the gossip network. 
+ The function computes a set of peers to which the (full) message will be + directly forwarded. + If no batch is currently in production, it creates a new one. + It appends the received message paired with the computed set of peers to + the current batch. + When a batch exists for [time_interval] seconds, the list of received + messages (with their associated set of peers) is passed to the [callback] + function, which pushes an event [Batch_to_treat batch] on the stream of + events the automaton should handle. *) + val handle_receive_message_batch : + callback:((receive_message * Peer.Set.t) list -> unit) -> + time_interval:float -> + receive_message -> + [`Receive_message] monad (** [publish { topic; message_id; message }] allows to publish a message on the gossip network from the local node. The function returns a set of peers @@ -834,6 +881,12 @@ module type AUTOMATON = sig topic. *) val leave : leave -> [`Leave] monad + (** [apply_batch batch] handles a complete batch and outputs an element of the + state monad which encapsulates the list of [[`Receive_message] ouptut] + result for each entry of the batch. *) + val apply_batch : + (receive_message * Peer.Set.t) list -> [`Treated_batch] monad + (** [set_application_score {peer; score}] handles setting the application score of [peer]. If the peer is not known, this does nothing. *) val set_application_score : @@ -1183,6 +1236,7 @@ module type WORKER = sig | P2P_input of p2p_input | App_input of app_input | Check_unknown_messages + | Process_batch of (GS.receive_message * GS.Peer.Set.t) list (** [make ~events_logging ~initial_points rng limits parameters] initializes a new Gossipsub automaton with the given arguments. 
Then, it initializes @@ -1193,6 +1247,7 @@ module type WORKER = sig val make : ?events_logging:(event -> unit Monad.t) -> ?initial_points:(unit -> Point.t list) -> + ?batching_interval:float -> self:GS.Peer.t -> Random.State.t -> (GS.Topic.t, GS.Peer.t, GS.Message_id.t, GS.span) limits -> diff --git a/src/lib_gossipsub/gossipsub_worker.ml b/src/lib_gossipsub/gossipsub_worker.ml index 30e181cf5d03..089a859186c5 100644 --- a/src/lib_gossipsub/gossipsub_worker.ml +++ b/src/lib_gossipsub/gossipsub_worker.ml @@ -224,6 +224,7 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : | P2P_input of p2p_input | App_input of app_input | Check_unknown_messages + | Process_batch of (GS.receive_message * Peer.Set.t) list module Bounded_message_map = struct (* We maintain the invariant that: @@ -269,6 +270,10 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : Some (value, t) end + type message_treatment = + | Sequentially + | In_batches of {time_interval : float (* In seconds *)} + (** The worker's state is made of the gossipsub automaton's state, and a stream of events to process. It also has two output streams to communicate with the application and P2P layers. *) @@ -285,6 +290,7 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : unknown_validity_messages : Bounded_message_map.t; unreachable_points : int64 Point.Map.t; (* For each point, stores the next heartbeat tick when we can try to recontact this point again. *) + message_handling : message_treatment; } (** A worker instance is made of its status and state. *) @@ -364,7 +370,8 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : Note that it's the responsability of the automaton modules to filter out peers based on various criteria (bad score, connection expired, ...). 
*) - let handle_receive_message received_message = function + let handle_receive_message (received_message : GS.receive_message) : + worker_state * [`Receive_message] GS.output -> worker_state = function | state, GS.Route_message {to_route} -> Introspection.update_count_recv_valid_app_messages state.stats `Incr ; let ({sender = _; topic; message_id; message} : GS.receive_message) = @@ -376,7 +383,10 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : let has_joined = View.(has_joined topic @@ view state.gossip_state) in if has_joined then emit_app_output state message_with_header ; state - | state, GS.Already_received | state, GS.Not_subscribed -> state + | state, GS.Already_received + | state, GS.Not_subscribed + | state, GS.Included_in_batch -> + state | state, GS.Unknown_validity -> Introspection.update_count_recv_unknown_validity_app_messages state.stats @@ -741,18 +751,39 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : GS.leave {topic} gossip_state |> update_gossip_state state |> handle_leave topic + let handle_batch (state, GS.Batch_result batch) = + List.fold_left + (fun state (msg, out) -> handle_receive_message msg (state, out)) + state + batch + + let apply_batch_event ({gossip_state; _} as state) batch = + GS.apply_batch batch gossip_state + |> update_gossip_state state |> handle_batch + (** Handling messages received from the P2P network. 
*) let apply_p2p_message ~self ({gossip_state; _} as state) from_peer = function - | Message_with_header {message; topic; message_id} -> - let receive_message = - {GS.sender = from_peer; topic; message_id; message} - in - (GS.handle_receive_message receive_message gossip_state - |> update_gossip_state state - |> handle_receive_message receive_message) + | Message_with_header {message; topic; message_id} -> ( + (let receive_message = + {GS.sender = from_peer; topic; message_id; message} + in + match state.message_handling with + | Sequentially -> + GS.handle_receive_message_sequentially receive_message gossip_state + |> update_gossip_state state + |> handle_receive_message receive_message + | In_batches {time_interval} -> + GS.handle_receive_message_batch + ~callback:(fun batch -> + Stream.push (Batch_to_treat batch) state.events_stream) + ~time_interval + receive_message + gossip_state + |> update_gossip_state state + |> handle_receive_message receive_message) [@profiler.span_f {verbosity = Notice} - ["apply_event"; "P2P_input"; "In_message"; "Message_with_header"]] + ["apply_event"; "P2P_input"; "In_message"; "Message_with_header"]]) | Graft {topic} -> let graft : GS.graft = {peer = from_peer; topic} in (GS.handle_graft graft gossip_state @@ -858,7 +889,7 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : match GS.Message_id.valid message.message_id with | `Valid | `Invalid -> let state = {state with unknown_validity_messages} in - GS.handle_receive_message message state.gossip_state + GS.handle_receive_message_sequentially message state.gossip_state |> update_gossip_state state |> handle_receive_message message |> (* Other messages are processed recursively *) @@ -899,6 +930,7 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : state [@profiler.span_f {verbosity = Notice} ["apply_event"; "Check_unknown_messages"]] + | Process_batch batch -> apply_batch_event state batch (** A helper function that pushes events in the state *) let push e {status = 
_; state; self = _} = Stream.push e state.events_stream @@ -997,7 +1029,8 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : event_loop_promise let make ?(events_logging = fun _event -> Monad.return ()) - ?(initial_points = fun () -> []) ~self rng limits parameters = + ?(initial_points = fun () -> []) ?batching_interval ~self rng limits + parameters = { self; status = Starting; @@ -1014,6 +1047,10 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : events_logging; unknown_validity_messages = Bounded_message_map.make ~capacity:10_000; unreachable_points = Point.Map.empty; + message_handling = + (match batching_interval with + | None -> Sequentially + | Some time_interval -> In_batches {time_interval}); }; } diff --git a/src/lib_gossipsub/test/gossipsub_pbt_generators.ml b/src/lib_gossipsub/test/gossipsub_pbt_generators.ml index 317b68532660..875a76210f50 100644 --- a/src/lib_gossipsub/test/gossipsub_pbt_generators.ml +++ b/src/lib_gossipsub/test/gossipsub_pbt_generators.ml @@ -259,7 +259,7 @@ struct | Iwant m -> GS.handle_iwant m state | Graft m -> GS.handle_graft m state | Prune m -> GS.handle_prune m state - | Receive_message m -> GS.handle_receive_message m state + | Receive_message m -> GS.handle_receive_message_sequentially m state | Publish_message m -> GS.publish_message m state | Heartbeat -> GS.heartbeat state | Join m -> GS.join m state diff --git a/src/lib_gossipsub/test/test_gossipsub_shared.ml b/src/lib_gossipsub/test/test_gossipsub_shared.ml index 9ba07142f11e..f5fcfac2adb7 100644 --- a/src/lib_gossipsub/test/test_gossipsub_shared.ml +++ b/src/lib_gossipsub/test/test_gossipsub_shared.ml @@ -167,6 +167,8 @@ module Automaton_config : include String_iterable let valid ?message ~message_id () = Validity_hook.apply message message_id + + let valid_batch = List.map (fun _ -> `Valid) end end end diff --git a/src/lib_gossipsub/test/test_unit.ml b/src/lib_gossipsub/test/test_unit.ml index 2a2243564714..6c57c479d480 100644 --- 
a/src/lib_gossipsub/test/test_unit.ml +++ b/src/lib_gossipsub/test/test_unit.ml @@ -602,14 +602,16 @@ let test_receiving_message rng limits parameters = let message_id = 0 in (* Receive a message for a joined topic. *) let state, output = - GS.handle_receive_message {sender; topic; message_id; message} state + GS.handle_receive_message_sequentially + {sender; topic; message_id; message} + state in let peers_to_route = match output with | Already_received | Not_subscribed | Invalid_message | Unknown_validity | Outdated -> Test.fail ~__LOC__ "Handling of received message should succeed." - | Route_message {to_route} -> to_route + | Route_message {to_route} | To_include_in_batch (_, to_route) -> to_route in (* Should return [degree_optimal] peers to route the message to. *) Check.( @@ -690,19 +692,23 @@ let test_message_duplicate_score_bug rng _limits parameters = let state, _ = GS.handle_graft {peer = sender; topic} state in (* Receive a message for a joined topic. *) let state, output = - GS.handle_receive_message {sender; topic; message_id = 0; message} state + GS.handle_receive_message_sequentially + {sender; topic; message_id = 0; message} + state in let () = match output with | Already_received | Not_subscribed | Invalid_message | Unknown_validity | Outdated -> Test.fail ~__LOC__ "Handling of received message should succeed." - | Route_message _ -> () + | Route_message _ | To_include_in_batch _ -> () in (* Send one more message so that [sender]'s P2 score is greater than its P3 score. In this way its score is positive, and [sender] is not pruned. *) let state, _ = - GS.handle_receive_message {sender; topic; message_id = 1; message} state + GS.handle_receive_message_sequentially + {sender; topic; message_id = 1; message} + state in (* Send the first message again. 
*) let per_topic_score_limits = @@ -716,13 +722,15 @@ let test_message_duplicate_score_bug rng _limits parameters = let state, _ = GS.heartbeat state in assert_mesh_inclusion ~__LOC__ ~topic ~peer:sender ~is_included:true state ; let state, output = - GS.handle_receive_message {sender; topic; message_id = 0; message} state + GS.handle_receive_message_sequentially + {sender; topic; message_id = 0; message} + state in let () = match output with | Already_received -> () | Not_subscribed | Invalid_message | Unknown_validity | Outdated - | Route_message _ -> + | Route_message _ | To_include_in_batch _ -> Test.fail ~__LOC__ "Handling should fail with Already_received." in let expected_score = @@ -775,11 +783,13 @@ let test_receiving_message_for_unsusbcribed_topic rng limits parameters = let message = "some data" in let message_id = 0 in let _state, output = - GS.handle_receive_message {sender; topic; message_id; message} state + GS.handle_receive_message_sequentially + {sender; topic; message_id; message} + state in match output with | Already_received | Route_message _ | Invalid_message | Unknown_validity - | Outdated -> + | Outdated | To_include_in_batch _ -> Test.fail ~__LOC__ "Handling of received message should fail with [Not_subscribed]." 
@@ -2152,7 +2162,7 @@ let test_scoring_p2 rng _limits parameters = let peers = Array.of_list peers in let receive_message ~__LOC__ peer message_id message state = let state, output = - GS.handle_receive_message + GS.handle_receive_message_sequentially {sender = peer; topic; message_id; message} state in @@ -2275,7 +2285,7 @@ let test_scoring_p4 rng _limits parameters = (fun state () -> let message_id, message = gen_message () in let state, _ = - GS.handle_receive_message + GS.handle_receive_message_sequentially {sender = peer; topic; message_id; message} state in -- GitLab From 78ea239460a74a9cd781b5259c96f96266e9a97d Mon Sep 17 00:00:00 2001 From: Guillaume Genestier Date: Fri, 1 Aug 2025 10:50:15 +0200 Subject: [PATCH 03/16] Gossipsub: Move Lwt_async batching logic from the automaton to the worker --- src/lib_gossipsub/README.md | 5 ++- src/lib_gossipsub/gossipsub_automaton.ml | 39 ++++++----------- src/lib_gossipsub/gossipsub_intf.ml | 6 +-- src/lib_gossipsub/gossipsub_worker.ml | 55 ++++++++++++++++++++---- 4 files changed, 64 insertions(+), 41 deletions(-) diff --git a/src/lib_gossipsub/README.md b/src/lib_gossipsub/README.md index 7eba3edac12f..9a16a6eab677 100644 --- a/src/lib_gossipsub/README.md +++ b/src/lib_gossipsub/README.md @@ -45,9 +45,12 @@ receives inputs from both the P2P and application layers, processes them using the automaton, and emits corresponding actions back to the P2P layer (messages, connections, disconnections) and the application layer (full messages). -The WORKER module is designed to be agnostic to the underlying concurrency model. +The WORKER module was designed to be agnostic to the underlying concurrency model. Rather than depending on a specific concurrency library (such as Lwt), it requires the host application to supply an I/O monad and stream implementation. 
+This is no longer the case since merge request [!18848](https://gitlab.com/tezos/tezos/-/merge_requests/18848), which introduced the use of `Lwt.async` in the +function `batch_accumulator` of the worker. + + This design allows the application to retain control over the event loop and execution model, enabling the worker to integrate cleanly into diverse runtime environments. diff --git a/src/lib_gossipsub/gossipsub_automaton.ml b/src/lib_gossipsub/gossipsub_automaton.ml index 3cf24e5a1cc4..a1338d112de4 100644 --- a/src/lib_gossipsub/gossipsub_automaton.ml +++ b/src/lib_gossipsub/gossipsub_automaton.ml @@ -130,7 +130,9 @@ module Make (C : AUTOMATON_CONFIG) : | Invalid_message : [`Receive_message] output | Unknown_validity : [`Receive_message] output | Outdated : [`Receive_message] output - | Included_in_batch : [`Receive_message] output + | To_include_in_batch : + (receive_message * Peer.Set.t) + -> [`Receive_message] output | Batch_result : (receive_message * [`Receive_message] output) list -> [`Treated_batch] output @@ -1210,38 +1212,21 @@ module Make (C : AUTOMATON_CONFIG) : IHave message? *) Route_message {to_route} |> return - let handle_batch = - let msg_batch = Queue.create () in - let running = ref false in - let start_batch_timer time callback = - let open Lwt_syntax in - Lwt.async (fun () -> - let* () = Lwt_unix.sleep time in - let batch = List.of_seq (Queue.to_seq msg_batch) in - Queue.clear msg_batch ; - running := false ; - callback batch ; - return_unit) - in - fun ~callback ~time_interval ({sender; topic; _} as receive_message) -> - let open Monad.Syntax in - let*? 
peers_in_mesh = initial_message_checks receive_message in - let* to_route = peers_to_route sender peers_in_mesh topic in - Queue.push (receive_message, to_route) msg_batch ; - if not !running then ( - running := true ; - start_batch_timer time_interval callback) ; - return Included_in_batch + let handle_batch ({sender; topic; message_id; _} as receive_message) = + let open Monad.Syntax in + let*? peers_in_mesh = initial_message_checks receive_message in + let*? () = check_message_id_valid sender topic message_id in + let* to_route = peers_to_route sender peers_in_mesh topic in + return (To_include_in_batch (receive_message, to_route)) end let handle_receive_message_sequentially : receive_message -> [`Receive_message] output Monad.t = fun receive_message -> Receive_message.handle_sequentially receive_message - let handle_receive_message_batch ~callback ~time_interval : + let handle_receive_message_batch : receive_message -> [`Receive_message] output Monad.t = - fun receive_message -> - Receive_message.handle_batch ~callback ~time_interval receive_message + fun receive_message -> Receive_message.handle_batch receive_message let check_message_batch batch = let unfolded_batch = @@ -2503,7 +2488,7 @@ module Make (C : AUTOMATON_CONFIG) : "Route_message %a" Fmt.Dump.(record [field "to_route" Fun.id pp_peer_set]) to_route - | Included_in_batch -> fprintf fmtr "Included in batch" + | To_include_in_batch _ -> fprintf fmtr "To include in batch" | Already_received -> fprintf fmtr "Already_received" | Batch_result _res -> fprintf fmtr "Batch_result" | Not_subscribed -> fprintf fmtr "Not_subscribed" diff --git a/src/lib_gossipsub/gossipsub_intf.ml b/src/lib_gossipsub/gossipsub_intf.ml index 6eebd35ef235..0191e74ff282 100644 --- a/src/lib_gossipsub/gossipsub_intf.ml +++ b/src/lib_gossipsub/gossipsub_intf.ml @@ -857,11 +857,7 @@ module type AUTOMATON = sig messages (with their associated set of peers) is passed to the [callback] function, which pushes an event [Batch_to_treat 
batch] on the stream of events the automaton should handle. *) - val handle_receive_message_batch : - callback:((receive_message * Peer.Set.t) list -> unit) -> - time_interval:float -> - receive_message -> - [`Receive_message] monad + val handle_receive_message_batch : receive_message -> [`Receive_message] monad (** [publish { topic; message_id; message }] allows to publish a message on the gossip network from the local node. The function returns a set of peers diff --git a/src/lib_gossipsub/gossipsub_worker.ml b/src/lib_gossipsub/gossipsub_worker.ml index 089a859186c5..558d70f5dce7 100644 --- a/src/lib_gossipsub/gossipsub_worker.ml +++ b/src/lib_gossipsub/gossipsub_worker.ml @@ -274,6 +274,10 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : | Sequentially | In_batches of {time_interval : float (* In seconds *)} + type batch_state = + | Pending + | Accumulating of (GS.receive_message * Peer.Set.t) list + (** The worker's state is made of the gossipsub automaton's state, and a stream of events to process. It also has two output streams to communicate with the application and P2P layers. *) @@ -385,7 +389,7 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : state | state, GS.Already_received | state, GS.Not_subscribed - | state, GS.Included_in_batch -> + | state, GS.To_include_in_batch _ -> state | state, GS.Unknown_validity -> Introspection.update_count_recv_unknown_validity_app_messages @@ -761,6 +765,43 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : GS.apply_batch batch gossip_state |> update_gossip_state state |> handle_batch + (* This function uses Lwt.async, making the gossipsub library not independent + of the concurrency model anymore. 
+ If recovering independence from [Lwt] is considered useful, a proposed + implementation could be: + https://gitlab.com/tezos/tezos/-/commit/a361dd827b43b601eb9de89e7187f9bcf7fc64d1 + This implementation is not the current implementation, because it has the + disadvantage of having a "ticking clock" and accumulating whatever arrives + between two ticks. It was considered nicer to have the accumulation + start at the reception of the first shard, because due to network latencies, + the reception will most probably not align well with the clock. + + Even with the current design, the shards of a commitment may happen + to be split between two batches. Increasing the frequency of this event + would probably not be an issue. *) + let batch_accumulator = + let current_batch = ref Pending in + fun output time_interval events_stream -> + match output with + | GS.To_include_in_batch content -> ( + match !current_batch with + | Pending -> + let open Lwt_syntax in + Lwt.async (fun () -> + let* () = Lwt_unix.sleep time_interval in + let batch = + match !current_batch with + | Accumulating batch -> batch + | Pending -> [] + in + current_batch := Pending ; + Stream.push (Batch_to_treat batch) events_stream ; + return_unit) ; + current_batch := Accumulating [content] + | Accumulating prev_contents -> + current_batch := Accumulating (content :: prev_contents)) + | _ -> () + (** Handling messages received from the P2P network. 
*) let apply_p2p_message ~self ({gossip_state; _} as state) from_peer = function | Message_with_header {message; topic; message_id} -> ( @@ -773,13 +814,11 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : |> update_gossip_state state |> handle_receive_message receive_message | In_batches {time_interval} -> - GS.handle_receive_message_batch - ~callback:(fun batch -> - Stream.push (Batch_to_treat batch) state.events_stream) - ~time_interval - receive_message - gossip_state - |> update_gossip_state state + let new_state, output = + GS.handle_receive_message_batch receive_message gossip_state + in + batch_accumulator output time_interval state.events_stream ; + update_gossip_state state (new_state, output) |> handle_receive_message receive_message) [@profiler.span_f {verbosity = Notice} -- GitLab From 122ac70f5ccd3b71d5011103338f49042bef469f Mon Sep 17 00:00:00 2001 From: Guillaume Genestier Date: Fri, 1 Aug 2025 11:08:50 +0200 Subject: [PATCH 04/16] Gossipsub: Update cache and peer score according to batch analysis result --- src/lib_gossipsub/gossipsub_automaton.ml | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/src/lib_gossipsub/gossipsub_automaton.ml b/src/lib_gossipsub/gossipsub_automaton.ml index a1338d112de4..177cfd4c945d 100644 --- a/src/lib_gossipsub/gossipsub_automaton.ml +++ b/src/lib_gossipsub/gossipsub_automaton.ml @@ -1254,9 +1254,24 @@ module Make (C : AUTOMATON_CONFIG) : | Ok annotated_batch -> let* l = Monad.map_fold - (fun ((({sender; topic; _} as received_msg), peers), result) -> + (fun ( ( ({sender; topic; message_id; message} as received_msg), + peers ), + result ) + -> match result with - | `Valid -> return (received_msg, Route_message {to_route = peers}) + | `Valid -> + let* () = + put_message_in_cache + ~peer:(Some sender) + message_id + message + topic + in + let* () = + update_score sender (fun stats -> + Score.first_message_delivered stats topic) + in + return (received_msg, Route_message 
{to_route = peers}) | `Outdated -> return (received_msg, Outdated) | `Unknown -> return (received_msg, Unknown_validity) | `Invalid -> -- GitLab From 33695ed0ab5c6b03ec9b42f11926ccccd42c9572 Mon Sep 17 00:00:00 2001 From: Guillaume Genestier Date: Tue, 29 Jul 2025 19:01:43 +0200 Subject: [PATCH 05/16] DAL/node: Use verify multi to run cryptographic verification in batch --- src/lib_dal_node/block_handler.ml | 8 +- src/lib_dal_node/constants.ml | 12 ++ src/lib_dal_node/constants.mli | 4 + src/lib_dal_node/daemon.ml | 9 +- src/lib_dal_node/event.ml | 21 +++ src/lib_dal_node/gossipsub/gossipsub.mli | 10 ++ src/lib_dal_node/gossipsub/gs_interface.ml | 15 +- src/lib_dal_node/message_validation.ml | 181 +++++++++++++++++++-- src/lib_dal_node/message_validation.mli | 28 +++- 9 files changed, 264 insertions(+), 24 deletions(-) diff --git a/src/lib_dal_node/block_handler.ml b/src/lib_dal_node/block_handler.ml index e623a88f7d00..f5e7fe525d37 100644 --- a/src/lib_dal_node/block_handler.ml +++ b/src/lib_dal_node/block_handler.ml @@ -579,11 +579,17 @@ let new_finalized_head ctxt cctxt l1_crawler cryptobox finalized_block_hash in return_unit in + Gossipsub.Worker.Validate_message_hook.set_batch + (Message_validation.gossipsub_batch_validation + ctxt + cryptobox + ~head_level:level + proto_parameters) ; Gossipsub.Worker.Validate_message_hook.set (Message_validation.gossipsub_app_messages_validation ctxt cryptobox - level + ~head_level:level proto_parameters) ; let*! () = remove_old_level_stored_data proto_parameters ctxt level in let* () = diff --git a/src/lib_dal_node/constants.ml b/src/lib_dal_node/constants.ml index 8f9f353c6e98..97fa3d4a095d 100644 --- a/src/lib_dal_node/constants.ml +++ b/src/lib_dal_node/constants.ml @@ -90,3 +90,15 @@ let bootstrap_dns_refresh_delay = 300. acceptable since the cache is sparsely populated due to [proto_parameters.traps_fraction]. 
*) let traps_cache_size = 50 + +(* When a shard is received, we wait for 1.2 seconds for other shards before + launching the cryptographic validation of the shards. + Since these shards are supposed to be received several levels in advance, + the risk that this 1.2-second delay makes the validation happen too late + is very low. + It also slows down gossiping a bit, since messages are advertised only after + validation, so if a message has to go through several nodes before reaching + its final destination, the waiting delays accumulate and may amount to a few + seconds. It looks fine with 8 blocks of attestation lag and an 8-second block + time, but if those values are reduced a lot, this might become an issue. *) +let batch_time_interval = 1.2 diff --git a/src/lib_dal_node/constants.mli b/src/lib_dal_node/constants.mli index 014c228644bb..a9cad9728e8e 100644 --- a/src/lib_dal_node/constants.mli +++ b/src/lib_dal_node/constants.mli @@ -78,3 +78,7 @@ val bootstrap_dns_refresh_delay : float is acceptable since the cache is sparsely populated due to [proto_parameters.traps_fraction]. *) val traps_cache_size : int + +(** The time (in seconds) for which shards are accumulated by the gossipsub + worker before triggering the validation of the batch.
*) +val batch_time_interval : float diff --git a/src/lib_dal_node/daemon.ml b/src/lib_dal_node/daemon.ml index fcd19d0740e1..2f8083a16a6b 100644 --- a/src/lib_dal_node/daemon.ml +++ b/src/lib_dal_node/daemon.ml @@ -438,6 +438,7 @@ let run ?(disable_shard_validation = false) ~ignore_pkhs ~data_dir ~config_file make ~initial_points:get_initial_points ~events_logging:(Logging.event ~verbose:config.verbose) + ~batching_interval:Constants.batch_time_interval ~self rng limits @@ -528,11 +529,17 @@ let run ?(disable_shard_validation = false) ~ignore_pkhs ~data_dir ~config_file Node_context.warn_if_attesters_not_delegates ctxt profile | _ -> return_unit in + Gossipsub.Worker.Validate_message_hook.set_batch + (Message_validation.gossipsub_batch_validation + ctxt + cryptobox + ~head_level + proto_parameters) ; Gossipsub.Worker.Validate_message_hook.set (Message_validation.gossipsub_app_messages_validation ctxt cryptobox - head_level + ~head_level proto_parameters) ; let is_prover_profile = Profile_manager.is_prover_profile profile_ctxt in (* Initialize amplificator if in prover profile. 
diff --git a/src/lib_dal_node/event.ml b/src/lib_dal_node/event.ml index 8729259fb86f..d3ec8d508a80 100644 --- a/src/lib_dal_node/event.ml +++ b/src/lib_dal_node/event.ml @@ -519,6 +519,21 @@ open struct ("message_id", Types.Message_id.encoding) ("validation_error", Data_encoding.string) + let batch_validation_error = + declare_3 + ~section + ~prefix_name_with_section:true + ~name:"batch_validation_failed" + ~msg: + "validating batch of messages for level {level} and slot {slot_index} \ + failed with error {validation_error}" + ~level:Warning + ~pp1:(fun fmt -> Format.fprintf fmt "%ld") + ~pp2:(fun fmt -> Format.fprintf fmt "%d") + ("level", Data_encoding.int32) + ("slot_index", Data_encoding.int31) + ("validation_error", Data_encoding.string) + let p2p_server_is_ready = declare_1 ~section @@ -1314,6 +1329,12 @@ let emit_dont_wait__message_validation_error ~message_id ~validation_error = message_validation_error (message_id, validation_error) +let emit_dont_wait__batch_validation_error ~level ~slot_index ~validation_error + = + emit__dont_wait__use_with_care + batch_validation_error + (level, slot_index, validation_error) + let emit_p2p_server_is_ready ~point = emit p2p_server_is_ready point let emit_rpc_server_is_ready ~point = emit rpc_server_is_ready point diff --git a/src/lib_dal_node/gossipsub/gossipsub.mli b/src/lib_dal_node/gossipsub/gossipsub.mli index 8f2fd94aedf4..5fd3f84b9671 100644 --- a/src/lib_dal_node/gossipsub/gossipsub.mli +++ b/src/lib_dal_node/gossipsub/gossipsub.mli @@ -69,6 +69,16 @@ module Worker : sig unit -> [`Valid | `Unknown | `Outdated | `Invalid]) -> unit + + val set_batch : + ((Types.Peer.t + * Types.Topic.t + * Types.Message_id.t + * Types.Message.t + * Types.Peer.Set.t) + list -> + [`Invalid | `Outdated | `Unknown | `Valid] list) -> + unit end end diff --git a/src/lib_dal_node/gossipsub/gs_interface.ml b/src/lib_dal_node/gossipsub/gs_interface.ml index e8170ebbb5b7..2a5331b470db 100644 --- a/src/lib_dal_node/gossipsub/gs_interface.ml 
+++ b/src/lib_dal_node/gossipsub/gs_interface.ml @@ -35,6 +35,13 @@ module Validate_message_hook = struct ~msg:"The message validation function is not set" ~level:Warning + let warn_batch_validation_function_not_set = + Internal_event.Simple.declare_0 + ~section:["dal"; "gs_interface"] + ~name:"batch_validation_function_not_set" + ~msg:"The batch validation function is not set" + ~level:Warning + (* FIXME: https://gitlab.com/tezos/tezos/-/issues/5674 Refactor gossipsub integration to avoid this mutable hook in the lib. *) let check_message = @@ -48,12 +55,12 @@ module Validate_message_hook = struct (* FIXME: https://gitlab.com/tezos/tezos/-/issues/5674 Refactor gossipsub integration to avoid this mutable hook in the lib. *) let check_message_batch = - ref (fun _batch -> + ref (fun batch -> Internal_event.Simple.( emit__dont_wait__use_with_care - (warn_validation_function_not_set ()) + (warn_batch_validation_function_not_set ()) ()) ; - []) + List.map (fun _ -> `Unknown) batch) let set func = check_message := func @@ -63,7 +70,7 @@ end let message_valid ?message ~message_id () = !Validate_message_hook.check_message ?message ~message_id () -let message_valid_batch = !Validate_message_hook.check_message_batch +let message_valid_batch batch = !Validate_message_hook.check_message_batch batch module Automaton_config : AUTOMATON_CONFIG diff --git a/src/lib_dal_node/message_validation.ml b/src/lib_dal_node/message_validation.ml index 5854cabbdc43..53bd56a9ffbb 100644 --- a/src/lib_dal_node/message_validation.ml +++ b/src/lib_dal_node/message_validation.ml @@ -23,6 +23,18 @@ (* *) (*****************************************************************************) +let string_of_validation_error = function + | `Invalid_degree_strictly_less_than_expected Cryptobox.{given; expected} -> + Format.sprintf + "Invalid_degree_strictly_less_than_expected. 
Given: %d, expected: %d" + given + expected + | `Invalid_shard -> "Invalid_shard" + | `Shard_index_out_of_range s -> + Format.sprintf "Shard_index_out_of_range(%s)" s + | `Shard_length_mismatch -> "Shard_length_mismatch" + | `Prover_SRS_not_loaded -> "Prover_SRS_not_loaded" + (** [gossipsub_app_message_payload_validation ~disable_shard_validation cryptobox message message_id] allows checking whether the given [message] identified by [message_id] is valid with the current [cryptobox] parameters. The validity check is @@ -56,20 +68,7 @@ let gossipsub_app_message_payload_validation ~disable_shard_validation cryptobox match res with | Ok () -> `Valid | Error err -> - let validation_error = - match err with - | `Invalid_degree_strictly_less_than_expected {given; expected} -> - Format.sprintf - "Invalid_degree_strictly_less_than_expected. Given: %d, \ - expected: %d" - given - expected - | `Invalid_shard -> "Invalid_shard" - | `Shard_index_out_of_range s -> - Format.sprintf "Shard_index_out_of_range(%s)" s - | `Shard_length_mismatch -> "Shard_length_mismatch" - | `Prover_SRS_not_loaded -> "Prover_SRS_not_loaded" - in + let validation_error = string_of_validation_error err in Event.emit_dont_wait__message_validation_error ~message_id ~validation_error ; @@ -143,7 +142,7 @@ let gossipsub_message_id_validation ctxt proto_parameters message_id = gossipsub_message_id_topic_validation ctxt proto_parameters message_id | other -> other -(** [gossipsub_app_messages_validation ctxt cryptobox head_level +(** [gossipsub_app_messages_validation ctxt cryptobox ~head_level attestation_lag ?message ~message_id ()] checks for the validity of the given message (if any) and message id. @@ -153,8 +152,8 @@ let gossipsub_message_id_validation ctxt proto_parameters message_id = {!gossipsub_message_id_validation}. Then, if a message is given, {!gossipsub_app_message_payload_validation} is used to check its validity. 
*) -let gossipsub_app_messages_validation ctxt cryptobox head_level proto_parameters - ?message ~message_id () = +let gossipsub_app_messages_validation ctxt cryptobox ~head_level + proto_parameters ?message ~message_id () = if Node_context.is_bootstrap_node ctxt then (* 1. As bootstrap nodes advertise their profiles to controller nodes, they shouldn't receive messages or messages ids. If this @@ -210,3 +209,151 @@ let gossipsub_app_messages_validation ctxt cryptobox head_level proto_parameters | other -> (* 4. In the case the message id is not Valid. *) other + +type batch_identifier = {level : int32; slot_index : int} + +module Batch_identifier_hashable = struct + type t = batch_identifier + + let equal a b = a.level = b.level && a.slot_index = b.slot_index + + let hash = Hashtbl.hash +end + +module Batch_tbl = Hashtbl.Make (Batch_identifier_hashable) + +type batch_elem = { + index : int; + id : Types.Message_id.t; + message : Types.Message.t; +} + +type 'a batch_result = { + to_check_in_batch : batch_elem list Batch_tbl.t; + not_valid : (int * 'a) list; + (* List of indices in the original batch with the error to output for + this element. *) +} + +let triage ctxt head_level proto_parameters batch = + let to_check_in_batch = + Batch_tbl.create proto_parameters.Types.number_of_slots + in + let not_valid = + List.fold_left_i + (fun index + not_valid + ( _peer, + _topic, + (Types.Message_id.{level; slot_index; _} as id), + message, + _peers ) -> + (* Have some slack for outdated messages. *) + let slack = 4 in + if + Int32.( + sub head_level level + > of_int (proto_parameters.Types.attestation_lag + slack)) + then (index, `Outdated) :: not_valid + else + match gossipsub_message_id_validation ctxt proto_parameters id with + | `Valid -> + (* We register traps only if the message id is valid, but before + cryptographic check. 
*) + let store = Node_context.get_store ctxt in + let traps_store = Store.traps store in + (* TODO: https://gitlab.com/tezos/tezos/-/issues/7742 + The [proto_parameters] are those for the last known finalized + level, which may differ from those of the slot level. This + will be an issue when the value of the [traps_fraction] + changes. (We cannot use {!Node_context.get_proto_parameters}, + as it is not monad-free; we'll need to use mapping from levels + to parameters.) *) + Slot_manager.maybe_register_trap + traps_store + ~traps_fraction:proto_parameters.traps_fraction + id + message ; + (* The shard is added to the right batch. + Since the message id check has already been performed, we have + the guarantee that all shards associated to a given + (level, slot_index) pair have the commitment published on the L1, + hence they all belong to the same slot, hence the shards can be + passed to the function verifying several shards simultaneously. *) + let already_sorted = + Batch_tbl.find_opt to_check_in_batch {level; slot_index} + in + let new_entry = + match already_sorted with + | None -> [{index; id; message}] + | Some list -> {index; id; message} :: list + in + Batch_tbl.replace to_check_in_batch {level; slot_index} new_entry ; + not_valid + | other -> + (* In the case the message id is not Valid. *) + (index, other) :: not_valid) + [] + batch + in + {to_check_in_batch; not_valid} + +let gossipsub_batch_validation ctxt cryptobox ~head_level proto_parameters batch + = + if Node_context.is_bootstrap_node ctxt then + (* As bootstrap nodes advertise their profiles to controller + nodes, they shouldn't receive messages or messages ids. If this + happens, received data are considered as spam (invalid), and the remote + peer might be punished, depending on the Gossipsub implementation. 
*) + List.map (fun _ -> `Invalid) batch + else + let batch_size = List.length batch in + let result = Array.init batch_size (fun _ -> `Unknown) in + let {to_check_in_batch; not_valid} = + triage ctxt head_level proto_parameters batch + in + List.iter (fun (index, error) -> result.(index) <- error) not_valid ; + + Batch_tbl.iter + (fun {level; slot_index} batch_list -> + let shards, proofs = + List.fold_left + (fun (shards, proofs) {message; id; _} -> + let Types.Message.{share; shard_proof} = message in + let Types.Message_id.{shard_index; _} = id in + let shard = Cryptobox.{share; index = shard_index} in + (shard :: shards, shard_proof :: proofs)) + ([], []) + batch_list + in + (* We never add empty list in the to_check_in_batch table. *) + let {id = {commitment; _}; _} = + Option.value_f ~default:(fun () -> assert false) (List.hd batch_list) + in + let res = + Dal_metrics.sample_time + ~sampling_frequency:Constants.shards_verification_sampling_frequency + ~metric_updater:Dal_metrics.update_shards_verification_time + ~to_sample:(fun () -> + Cryptobox.verify_shard_multi cryptobox commitment shards proofs) + in + match res with + | Ok () -> + List.iter (fun {index; _} -> result.(index) <- `Valid) batch_list + | Error err -> + let validation_error = string_of_validation_error err in + Event.emit_dont_wait__batch_validation_error + ~level + ~slot_index + ~validation_error ; + List.iter (fun {index; _} -> result.(index) <- `Invalid) batch_list + | exception exn -> + (* Don't crash if crypto raised an exception. 
*) + let validation_error = Printexc.to_string exn in + Event.emit_dont_wait__batch_validation_error + ~level + ~slot_index + ~validation_error ; + List.iter (fun {index; _} -> result.(index) <- `Invalid) batch_list) + to_check_in_batch ; + Array.to_list result diff --git a/src/lib_dal_node/message_validation.mli b/src/lib_dal_node/message_validation.mli index 728bb63f6644..b87985ffc447 100644 --- a/src/lib_dal_node/message_validation.mli +++ b/src/lib_dal_node/message_validation.mli @@ -53,9 +53,35 @@ val gossipsub_app_messages_validation : Node_context.t -> Cryptobox.t -> - int32 -> + head_level:int32 -> Types.proto_parameters -> ?message:Types.Message.t -> message_id:Types.Message_id.t -> unit -> [> `Invalid | `Outdated | `Unknown | `Valid] + +(** [gossipsub_batch_validation ctxt cryptobox ~head_level + proto_parameters batch] validates a batch of Gossipsub messages and + their associated message ids in the context of the given DAL node. + + The validation follows the same layered approach as [gossipsub_app_messages_validation] + except that after checking the age and validity of the id of a message, + each message is assigned to a sub-batch containing only messages for the same + level and slot. + + The cryptographic verification is then done per sub-batch. + + This function is intended to be registered as the Gossipsub batch validation hook.
+*) +val gossipsub_batch_validation : + Node_context.t -> + Cryptobox.t -> + head_level:int32 -> + Types.proto_parameters -> + (Types.Peer.t + * Types.Topic.t + * Types.Message_id.t + * Types.Message.t + * Types.Peer.Set.t) + list -> + [> `Invalid | `Outdated | `Unknown | `Valid] list -- GitLab From 7038ae0ab4d9d3f4b13098e6ea1a1728ad8b6fcd Mon Sep 17 00:00:00 2001 From: Guillaume Genestier Date: Fri, 1 Aug 2025 11:40:32 +0200 Subject: [PATCH 06/16] DAL: Register trap only after cryptographic checks --- src/lib_dal_node/message_validation.ml | 37 ++++++++++++++------------ 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/src/lib_dal_node/message_validation.ml b/src/lib_dal_node/message_validation.ml index 53bd56a9ffbb..cbdeaaad1b27 100644 --- a/src/lib_dal_node/message_validation.ml +++ b/src/lib_dal_node/message_validation.ml @@ -258,22 +258,6 @@ let triage ctxt head_level proto_parameters batch = else match gossipsub_message_id_validation ctxt proto_parameters id with | `Valid -> - (* We register traps only if the message id is valid, but before - cryptographic check. *) - let store = Node_context.get_store ctxt in - let traps_store = Store.traps store in - (* TODO: https://gitlab.com/tezos/tezos/-/issues/7742 - The [proto_parameters] are those for the last known finalized - level, which may differ from those of the slot level. This - will be an issue when the value of the [traps_fraction] - changes. (We cannot use {!Node_context.get_proto_parameters}, - as it is not monad-free; we'll need to use mapping from levels - to parameters.) *) - Slot_manager.maybe_register_trap - traps_store - ~traps_fraction:proto_parameters.traps_fraction - id - message ; (* The shard is added to the right batch. 
Since the message id check has already been performed, we have the guarantee that all shards associated to a given @@ -314,6 +298,9 @@ let gossipsub_batch_validation ctxt cryptobox ~head_level proto_parameters batch in List.iter (fun (index, error) -> result.(index) <- error) not_valid ; + let store = Node_context.get_store ctxt in + let traps_store = Store.traps store in + Batch_tbl.iter (fun {level; slot_index} batch_list -> let shards, proofs = @@ -339,7 +326,23 @@ let gossipsub_batch_validation ctxt cryptobox ~head_level proto_parameters batch in match res with | Ok () -> - List.iter (fun {index; _} -> result.(index) <- `Valid) batch_list + List.iter + (fun {index; id; message} -> + (* We register traps only if the message is valid. *) + (* TODO: https://gitlab.com/tezos/tezos/-/issues/7742 + The [proto_parameters] are those for the last known finalized + level, which may differ from those of the slot level. This + will be an issue when the value of the [traps_fraction] + changes. (We cannot use {!Node_context.get_proto_parameters}, + as it is not monad-free; we'll need to use mapping from levels + to parameters.) 
*) + Slot_manager.maybe_register_trap + traps_store + ~traps_fraction:proto_parameters.traps_fraction + id + message ; + result.(index) <- `Valid) + batch_list | Error err -> let validation_error = string_of_validation_error err in Event.emit_dont_wait__batch_validation_error -- GitLab From 6a437f0c342cc401b203c99ec69764a007d0d26e Mon Sep 17 00:00:00 2001 From: Guillaume Genestier Date: Fri, 1 Aug 2025 11:54:42 +0200 Subject: [PATCH 07/16] DAL: Add a dichotomy treatment in batch --- src/lib_dal_node/message_validation.ml | 153 ++++++++++++++++--------- 1 file changed, 96 insertions(+), 57 deletions(-) diff --git a/src/lib_dal_node/message_validation.ml b/src/lib_dal_node/message_validation.ml index cbdeaaad1b27..9d660359956f 100644 --- a/src/lib_dal_node/message_validation.ml +++ b/src/lib_dal_node/message_validation.ml @@ -301,62 +301,101 @@ let gossipsub_batch_validation ctxt cryptobox ~head_level proto_parameters batch let store = Node_context.get_store ctxt in let traps_store = Store.traps store in - Batch_tbl.iter - (fun {level; slot_index} batch_list -> - let shards, proofs = - List.fold_left - (fun (shards, proofs) {message; id; _} -> - let Types.Message.{share; shard_proof} = message in - let Types.Message_id.{shard_index; _} = id in - let shard = Cryptobox.{share; index = shard_index} in - (shard :: shards, shard_proof :: proofs)) - ([], []) - batch_list - in - (* We never add empty list in the to_check_in_batch table. *) - let {id = {commitment; _}; _} = - Option.value_f ~default:(fun () -> assert false) (List.hd batch_list) - in - let res = - Dal_metrics.sample_time - ~sampling_frequency:Constants.shards_verification_sampling_frequency - ~metric_updater:Dal_metrics.update_shards_verification_time - ~to_sample:(fun () -> - Cryptobox.verify_shard_multi cryptobox commitment shards proofs) - in - match res with - | Ok () -> - List.iter - (fun {index; id; message} -> - (* We register traps only if the message is valid. 
*) - (* TODO: https://gitlab.com/tezos/tezos/-/issues/7742 - The [proto_parameters] are those for the last known finalized - level, which may differ from those of the slot level. This - will be an issue when the value of the [traps_fraction] - changes. (We cannot use {!Node_context.get_proto_parameters}, - as it is not monad-free; we'll need to use mapping from levels - to parameters.) *) - Slot_manager.maybe_register_trap - traps_store - ~traps_fraction:proto_parameters.traps_fraction - id - message ; - result.(index) <- `Valid) + (* [treat_batch] does not invalidate a whole slot when a single shard is + invalid, otherwise it would be possible for a byzantine actor to craft + a wrong message with the id of the published slot to prevent the validation + of all the valid shards associated to this slot. *) + let rec treat_batch = function + | [] -> () + | ({level; slot_index}, batch_list) :: remaining_to_treat -> ( + let shards, proofs = + List.fold_left + (fun (shards, proofs) {message; id; _} -> + let Types.Message.{share; shard_proof} = message in + let Types.Message_id.{shard_index; _} = id in + let shard = Cryptobox.{share; index = shard_index} in + (shard :: shards, shard_proof :: proofs)) + ([], []) batch_list - | Error err -> - let validation_error = string_of_validation_error err in - Event.emit_dont_wait__batch_validation_error - ~level - ~slot_index - ~validation_error ; - List.iter (fun {index; _} -> result.(index) <- `Invalid) batch_list - | exception exn -> - (* Don't crash if crypto raised an exception. *) - let validation_error = Printexc.to_string exn in - Event.emit_dont_wait__batch_validation_error - ~level - ~slot_index - ~validation_error ; - List.iter (fun {index; _} -> result.(index) <- `Invalid) batch_list) - to_check_in_batch ; + in + (* We never add empty list in the to_check_in_batch table. 
*) + let {id = {commitment; _}; _} = + Option.value_f + ~default:(fun () -> assert false) + (List.hd batch_list) + in + let res = + Dal_metrics.sample_time + ~sampling_frequency: + Constants.shards_verification_sampling_frequency + ~metric_updater:Dal_metrics.update_shards_verification_time + ~to_sample:(fun () -> + Cryptobox.verify_shard_multi cryptobox commitment shards proofs) + in + match res with + | Ok () -> + List.iter + (fun {index; id; message} -> + (* We register traps only if the message is valid. *) + (* TODO: https://gitlab.com/tezos/tezos/-/issues/7742 + The [proto_parameters] are those for the last known finalized + level, which may differ from those of the slot level. This + will be an issue when the value of the [traps_fraction] + changes. (We cannot use {!Node_context.get_proto_parameters}, + as it is not monad-free; we'll need to use mapping from levels + to parameters.) *) + Slot_manager.maybe_register_trap + traps_store + ~traps_fraction:proto_parameters.traps_fraction + id + message ; + result.(index) <- `Valid) + batch_list ; + treat_batch remaining_to_treat + | Error err -> + let validation_error = string_of_validation_error err in + Event.emit_dont_wait__batch_validation_error + ~level + ~slot_index + ~validation_error ; + let batch_size = List.length batch_list in + if batch_size = 1 then + List.iter + (fun {index; _} -> result.(index) <- `Invalid) + batch_list + else + (* Since [Cryptobox.verify_shard_multi] does not output which shards + are the invalid ones, and since we do not want to invalidate + legitimate shards because a byzantine actor added some false shards, + we recursively revalidate each half of the batch until we have + identified the invalid shards. *) + let first_half, second_half = + List.split_n (batch_size / 2) batch_list + in + treat_batch + (({level; slot_index}, first_half) + :: ({level; slot_index}, second_half) + :: remaining_to_treat) + | exception exn -> + (* Don't crash if crypto raised an exception.
*) + let validation_error = Printexc.to_string exn in + Event.emit_dont_wait__batch_validation_error + ~level + ~slot_index + ~validation_error ; + let batch_size = List.length batch_list in + if batch_size = 1 then + List.iter + (fun {index; _} -> result.(index) <- `Invalid) + batch_list + else + let first_half, second_half = + List.split_n (batch_size / 2) batch_list + in + treat_batch + (({level; slot_index}, first_half) + :: ({level; slot_index}, second_half) + :: remaining_to_treat)) + in + treat_batch (Batch_tbl.to_seq to_check_in_batch |> List.of_seq) ; Array.to_list result -- GitLab From 3e83245cac412c480627025aa2ab8639b53f5782 Mon Sep 17 00:00:00 2001 From: Guillaume Genestier Date: Wed, 17 Sep 2025 15:32:06 +0200 Subject: [PATCH 08/16] Gossipsub: Have only one handle_receive_message function which matches on the message_handling field of the state --- src/lib_gossipsub/gossipsub_automaton.ml | 58 +++++++++++------------- src/lib_gossipsub/gossipsub_intf.ml | 32 ++++++------- src/lib_gossipsub/gossipsub_worker.ml | 40 ++++++++-------- 3 files changed, 61 insertions(+), 69 deletions(-) diff --git a/src/lib_gossipsub/gossipsub_automaton.ml b/src/lib_gossipsub/gossipsub_automaton.ml index 177cfd4c945d..09c05a1e2f02 100644 --- a/src/lib_gossipsub/gossipsub_automaton.ml +++ b/src/lib_gossipsub/gossipsub_automaton.ml @@ -84,6 +84,10 @@ module Make (C : AUTOMATON_CONFIG) : type set_application_score = {peer : Peer.t; score : float} + type message_handling = + | Sequentially + | In_batches of {time_interval : float (* In seconds *)} + (* FIXME not sure subtyping for output is useful. If it is, it is probably for few ouputs and could be removed. 
*) type _ output = @@ -1192,41 +1196,34 @@ module Make (C : AUTOMATON_CONFIG) : let* direct_peers = get_direct_peers topic in return (Peer.Set.union peers direct_peers) - let handle_sequentially + let handle ~batching_configuration ({sender; topic; message_id; message} as receive_message) : [`Receive_message] output Monad.t = let open Monad.Syntax in let*? peers_in_mesh = initial_message_checks receive_message in - let*? () = check_message_valid sender topic message message_id in - let* () = - put_message_in_cache ~peer:(Some sender) message_id message topic - in - let* () = - update_score sender (fun stats -> - Score.first_message_delivered stats topic) - in - let* to_route = peers_to_route sender peers_in_mesh topic in - (* TODO: https://gitlab.com/tezos/tezos/-/issues/5272 - - Filter out peers from which we already received the message, or an - IHave message? *) - Route_message {to_route} |> return - - let handle_batch ({sender; topic; message_id; _} as receive_message) = - let open Monad.Syntax in - let*? peers_in_mesh = initial_message_checks receive_message in - let*? () = check_message_id_valid sender topic message_id in - let* to_route = peers_to_route sender peers_in_mesh topic in - return (To_include_in_batch (receive_message, to_route)) + match batching_configuration with + | Sequentially -> + let*? () = check_message_valid sender topic message message_id in + let* () = + put_message_in_cache ~peer:(Some sender) message_id message topic + in + let* () = + update_score sender (fun stats -> + Score.first_message_delivered stats topic) + in + let* to_route = peers_to_route sender peers_in_mesh topic in + (* TODO: https://gitlab.com/tezos/tezos/-/issues/5272 + Filter out peers from which we already received the message, or an + IHave message? 
*) + Route_message {to_route} |> return + | In_batches _ -> + let* to_route = peers_to_route sender peers_in_mesh topic in + return (To_include_in_batch (receive_message, to_route)) end - let handle_receive_message_sequentially : - receive_message -> [`Receive_message] output Monad.t = - fun receive_message -> Receive_message.handle_sequentially receive_message - - let handle_receive_message_batch : - receive_message -> [`Receive_message] output Monad.t = - fun receive_message -> Receive_message.handle_batch receive_message + let handle_receive_message ~batching_configuration receive_message : + [`Receive_message] output Monad.t = + Receive_message.handle ~batching_configuration receive_message let check_message_batch batch = let unfolded_batch = @@ -1256,8 +1253,7 @@ module Make (C : AUTOMATON_CONFIG) : Monad.map_fold (fun ( ( ({sender; topic; message_id; message} as received_msg), peers ), - result ) - -> + result ) -> match result with | `Valid -> let* () = diff --git a/src/lib_gossipsub/gossipsub_intf.ml b/src/lib_gossipsub/gossipsub_intf.ml index 0191e74ff282..43efecd6f39a 100644 --- a/src/lib_gossipsub/gossipsub_intf.ml +++ b/src/lib_gossipsub/gossipsub_intf.ml @@ -636,6 +636,10 @@ module type AUTOMATON = sig type set_application_score = {peer : Peer.t; score : float} + type message_handling = + | Sequentially + | In_batches of {time_interval : float (* In seconds *)} + (** Output produced by one of the actions below. *) type _ output = | Ihave_from_peer_with_low_score : { @@ -839,25 +843,17 @@ module type AUTOMATON = sig on this topic. *) val handle_prune : prune -> [`Prune] monad - (** [handle_receive_message_sequentially received_message] handles a + (** [handle_receive_message ~batching_configuration received_message] handles a [received_message] on the gossip network. The function returns either a - rejection reason for the message or a set of peers to which the (full) - message will be directly forwarded. 
*) - val handle_receive_message_sequentially : - receive_message -> [`Receive_message] monad - - (** [handle_receive_message_batch callback time_interval received_message] - handles a message received from [sender] on the gossip network. - The function computes a set of peers to which the (full) message will be - directly forwarded. - If no batch is currently in production, it creates a new one. - It appends the received message paired with the computed set of peers to - the current batch. - When a batch exists for [time_interval] seconds, the list of received - messages (with their associated set of peers) is passed to the [callback] - function, which pushes an event [Batch_to_treat batch] on the stream of - events the automaton should handle. *) - val handle_receive_message_batch : receive_message -> [`Receive_message] monad + rejection reason for the message or a set of peers to which: + - the (full) message will be directly forwarded if [batching_configuration] + is [Sequentially]. + - the message might be forwarded after validation of the batch it will be + included in, if [batching_configuration] is [In_batches _]. *) + val handle_receive_message : + batching_configuration:message_handling -> + receive_message -> + [`Receive_message] monad (** [publish { topic; message_id; message }] allows to publish a message on the gossip network from the local node.
The function returns a set of peers diff --git a/src/lib_gossipsub/gossipsub_worker.ml b/src/lib_gossipsub/gossipsub_worker.ml index 558d70f5dce7..d4faf390a785 100644 --- a/src/lib_gossipsub/gossipsub_worker.ml +++ b/src/lib_gossipsub/gossipsub_worker.ml @@ -270,10 +270,6 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : Some (value, t) end - type message_treatment = - | Sequentially - | In_batches of {time_interval : float (* In seconds *)} - type batch_state = | Pending | Accumulating of (GS.receive_message * Peer.Set.t) list @@ -294,7 +290,7 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : unknown_validity_messages : Bounded_message_map.t; unreachable_points : int64 Point.Map.t; (* For each point, stores the next heartbeat tick when we can try to recontact this point again. *) - message_handling : message_treatment; + message_handling : GS.message_handling; } (** A worker instance is made of its status and state. *) @@ -795,7 +791,7 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : | Pending -> [] in current_batch := Pending ; - Stream.push (Batch_to_treat batch) events_stream ; + Stream.push (Process_batch batch) events_stream ; return_unit) ; current_batch := Accumulating [content] | Accumulating prev_contents -> @@ -804,25 +800,26 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : (** Handling messages received from the P2P network. 
*) let apply_p2p_message ~self ({gossip_state; _} as state) from_peer = function - | Message_with_header {message; topic; message_id} -> ( + | Message_with_header {message; topic; message_id} -> (let receive_message = {GS.sender = from_peer; topic; message_id; message} in - match state.message_handling with - | Sequentially -> - GS.handle_receive_message_sequentially receive_message gossip_state - |> update_gossip_state state - |> handle_receive_message receive_message + let batching_configuration = state.message_handling in + let new_state, output = + GS.handle_receive_message + ~batching_configuration + receive_message + gossip_state + in + (match batching_configuration with + | Sequentially -> () | In_batches {time_interval} -> - let new_state, output = - GS.handle_receive_message_batch receive_message gossip_state - in - batch_accumulator output time_interval state.events_stream ; - update_gossip_state state (new_state, output) - |> handle_receive_message receive_message) + batch_accumulator output time_interval state.events_stream) ; + update_gossip_state state (new_state, output) + |> handle_receive_message receive_message) [@profiler.span_f {verbosity = Notice} - ["apply_event"; "P2P_input"; "In_message"; "Message_with_header"]]) + ["apply_event"; "P2P_input"; "In_message"; "Message_with_header"]] | Graft {topic} -> let graft : GS.graft = {peer = from_peer; topic} in (GS.handle_graft graft gossip_state @@ -928,7 +925,10 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : match GS.Message_id.valid message.message_id with | `Valid | `Invalid -> let state = {state with unknown_validity_messages} in - GS.handle_receive_message_sequentially message state.gossip_state + GS.handle_receive_message + ~batching_configuration:state.message_handling + message + state.gossip_state |> update_gossip_state state |> handle_receive_message message |> (* Other messages are processed recursively *) -- GitLab From a553a0b827fd14d6a10eba3e7737807d2392c48f Mon Sep 17 
00:00:00 2001 From: Guillaume Genestier Date: Wed, 17 Sep 2025 15:36:03 +0200 Subject: [PATCH 09/16] Gossipsub: Use span instead of float for time parameter of the batching --- src/lib_dal_node/constants.ml | 2 +- src/lib_dal_node/constants.mli | 2 +- src/lib_gossipsub/gossipsub_automaton.ml | 4 +--- src/lib_gossipsub/gossipsub_intf.ml | 6 ++---- src/lib_gossipsub/gossipsub_worker.ml | 2 +- 5 files changed, 6 insertions(+), 10 deletions(-) diff --git a/src/lib_dal_node/constants.ml b/src/lib_dal_node/constants.ml index 97fa3d4a095d..a82b5250f104 100644 --- a/src/lib_dal_node/constants.ml +++ b/src/lib_dal_node/constants.ml @@ -101,4 +101,4 @@ let traps_cache_size = 50 its final destination, the waiting delay accumulates and may be of a few seconds. It looks fine with 8 blocks of attestation lag and 8 seconds block time but if those values are reduced a lot, this might become an issue. *) -let batch_time_interval = 1.2 +let batch_time_interval = Types.Span.of_float_s 1.2 diff --git a/src/lib_dal_node/constants.mli b/src/lib_dal_node/constants.mli index a9cad9728e8e..9838529a7355 100644 --- a/src/lib_dal_node/constants.mli +++ b/src/lib_dal_node/constants.mli @@ -81,4 +81,4 @@ val traps_cache_size : int (** The time (in seconds) for which shards are accumulated by the gossipsub automaton before triggering the validation of the batch. 
*) -val batch_time_interval : float +val batch_time_interval : Types.Span.t diff --git a/src/lib_gossipsub/gossipsub_automaton.ml b/src/lib_gossipsub/gossipsub_automaton.ml index 09c05a1e2f02..f6428e40da87 100644 --- a/src/lib_gossipsub/gossipsub_automaton.ml +++ b/src/lib_gossipsub/gossipsub_automaton.ml @@ -84,9 +84,7 @@ module Make (C : AUTOMATON_CONFIG) : type set_application_score = {peer : Peer.t; score : float} - type message_handling = - | Sequentially - | In_batches of {time_interval : float (* In seconds *)} + type message_handling = Sequentially | In_batches of {time_interval : span} (* FIXME not sure subtyping for output is useful. If it is, it is probably for few ouputs and could be removed. *) diff --git a/src/lib_gossipsub/gossipsub_intf.ml b/src/lib_gossipsub/gossipsub_intf.ml index 43efecd6f39a..3eebe69ca4ce 100644 --- a/src/lib_gossipsub/gossipsub_intf.ml +++ b/src/lib_gossipsub/gossipsub_intf.ml @@ -636,9 +636,7 @@ module type AUTOMATON = sig type set_application_score = {peer : Peer.t; score : float} - type message_handling = - | Sequentially - | In_batches of {time_interval : float (* In seconds *)} + type message_handling = Sequentially | In_batches of {time_interval : span} (** Output produced by one of the actions below. 
*) type _ output = @@ -1239,7 +1237,7 @@ module type WORKER = sig val make : ?events_logging:(event -> unit Monad.t) -> ?initial_points:(unit -> Point.t list) -> - ?batching_interval:float -> + ?batching_interval:GS.span -> self:GS.Peer.t -> Random.State.t -> (GS.Topic.t, GS.Peer.t, GS.Message_id.t, GS.span) limits -> diff --git a/src/lib_gossipsub/gossipsub_worker.ml b/src/lib_gossipsub/gossipsub_worker.ml index d4faf390a785..eb8e2f023927 100644 --- a/src/lib_gossipsub/gossipsub_worker.ml +++ b/src/lib_gossipsub/gossipsub_worker.ml @@ -784,7 +784,7 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : | Pending -> let open Lwt_syntax in Lwt.async (fun () -> - let* () = Lwt_unix.sleep time_interval in + let* () = Lwt_unix.sleep (GS.Span.to_float_s time_interval) in let batch = match !current_batch with | Accumulating batch -> batch -- GitLab From eb5e04a662053896dc5ae67742f67fdd402ce79c Mon Sep 17 00:00:00 2001 From: Guillaume Genestier Date: Wed, 17 Sep 2025 16:42:54 +0200 Subject: [PATCH 10/16] Gossipsub: Update unit tests --- .../test/gossipsub_pbt_generators.ml | 20 ++- .../test/gossipsub_pbt_generators.mli | 6 +- src/lib_gossipsub/test/test_gossipsub.ml | 7 +- src/lib_gossipsub/test/test_unit.ml | 153 ++++++++++++++---- 4 files changed, 148 insertions(+), 38 deletions(-) diff --git a/src/lib_gossipsub/test/gossipsub_pbt_generators.ml b/src/lib_gossipsub/test/gossipsub_pbt_generators.ml index 875a76210f50..53e52ee10682 100644 --- a/src/lib_gossipsub/test/gossipsub_pbt_generators.ml +++ b/src/lib_gossipsub/test/gossipsub_pbt_generators.ml @@ -250,8 +250,13 @@ struct let set_application_score x = Set_application_score x |> i end - let dispatch : type a. a input -> GS.state -> GS.state * a GS.output = - fun i state -> + let dispatch : + type a. 
+ a input -> + ?batching_configuration:GS.message_handling -> + GS.state -> + GS.state * a GS.output = + fun i ?(batching_configuration = Sequentially) state -> match i with | Add_peer m -> GS.add_peer m state | Remove_peer m -> GS.remove_peer m state @@ -259,7 +264,8 @@ struct | Iwant m -> GS.handle_iwant m state | Graft m -> GS.handle_graft m state | Prune m -> GS.handle_prune m state - | Receive_message m -> GS.handle_receive_message_sequentially m state + | Receive_message m -> + GS.handle_receive_message ~batching_configuration m state | Publish_message m -> GS.publish_message m state | Heartbeat -> GS.heartbeat state | Join m -> GS.join m state @@ -465,7 +471,7 @@ struct (* Construction of the trace generator. *) (* Evaluate a [raw] on an initial state yields a trace. *) - let raw_to_trace state raw = + let raw_to_trace ?batching_configuration state raw = let open M in let seq = SeqM.M.unfold next raw in let* _, _, rev_trace = @@ -474,7 +480,7 @@ struct match event with | Input i -> Time.set time ; - let state', output = dispatch i state in + let state', output = dispatch ?batching_configuration i state in let step = Transition {time; input = i; state; state'; output} in @@ -501,10 +507,10 @@ struct let seq = SeqM.M.unfold Fragment.next raw in SeqM.fold_left (fun acc event -> f event acc) init seq - let run state fragment : trace t = + let run ?batching_configuration state fragment : trace t = let open M in let* raw = Fragment.raw_generator fragment in - Fragment.raw_to_trace state raw + Fragment.raw_to_trace ?batching_configuration state raw let check_fold (type e inv) (f : transition -> inv -> (inv, e) result) init trace : (unit, e * trace) result = diff --git a/src/lib_gossipsub/test/gossipsub_pbt_generators.mli b/src/lib_gossipsub/test/gossipsub_pbt_generators.mli index c0ff8fd20f8b..649c2cacb945 100644 --- a/src/lib_gossipsub/test/gossipsub_pbt_generators.mli +++ b/src/lib_gossipsub/test/gossipsub_pbt_generators.mli @@ -226,7 +226,11 @@ val fold : 
Fragment.t -> (event -> 'a -> 'a) -> 'a -> 'a t (** [run s0 f] constructs a {!trace} generator by running the gossipsub automaton on inputs generated from [f]. *) -val run : GS.state -> Fragment.t -> trace t +val run : + ?batching_configuration:GS.message_handling -> + GS.state -> + Fragment.t -> + trace t (** [check_fold step inv trace] folds the predicate [step] with initial invariant [inv] on [trace]. If the [step] fails with [Error e], [check_predicate] returns [Error (e, prefix)] diff --git a/src/lib_gossipsub/test/test_gossipsub.ml b/src/lib_gossipsub/test/test_gossipsub.ml index d6038b1fe3f7..67a993886cbf 100644 --- a/src/lib_gossipsub/test/test_gossipsub.ml +++ b/src/lib_gossipsub/test/test_gossipsub.ml @@ -50,7 +50,12 @@ let rng = let () = let open Default_limits in let default_limits = default_limits () in - Test_unit.register rng default_limits parameters ; + Test_unit.register + ~message_handlings: + [Sequentially; In_batches {time_interval = Milliseconds.of_int_ms 100}] + rng + default_limits + parameters ; Test_integration_worker.register rng default_limits parameters ; Test_pbt.register rng default_limits parameters ; Test_message_cache.register () diff --git a/src/lib_gossipsub/test/test_unit.ml b/src/lib_gossipsub/test/test_unit.ml index 6c57c479d480..a283a47de712 100644 --- a/src/lib_gossipsub/test/test_unit.ml +++ b/src/lib_gossipsub/test/test_unit.ml @@ -75,7 +75,10 @@ let assert_in_message_cache ~__LOC__ message_id ~peer ~expected_message state = view.message_cache with | None -> - Test.fail "Expected entry in message cache for message id %d" message_id + Test.fail + ~__LOC__ + "Expected entry in message cache for message id %d" + message_id | Some (_message_cache_state, message, _access) -> Check.( (message = expected_message) @@ -578,10 +581,18 @@ let test_fanout rng limits parameters = (** Tests that receiving a message for a subscribed topic: - Returns peers to publish to. - Inserts message into message cache. 
*) -let test_receiving_message rng limits parameters = +let test_receiving_message ~batching_configuration rng limits parameters = Tezt_core.Test.register ~__FILE__ - ~title:"Gossipsub: Test receiving message" + ~title: + ("Gossipsub: Test receiving message" + ^ + match batching_configuration with + | GS.Sequentially -> "" + | GS.In_batches {time_interval} -> + Format.sprintf + " with batches of %fs" + (GS.Span.to_float_s time_interval)) ~tags:["gossipsub"; "receiving_message"] @@ fun () -> let topic = "test" in @@ -602,7 +613,8 @@ let test_receiving_message rng limits parameters = let message_id = 0 in (* Receive a message for a joined topic. *) let state, output = - GS.handle_receive_message_sequentially + GS.handle_receive_message + ~batching_configuration {sender; topic; message_id; message} state in @@ -619,6 +631,18 @@ let test_receiving_message rng limits parameters = int ~error_msg:"Expected %R, got %L" ~__LOC__) ; + (* In batch mode, the message is added to cache only once the batch has been + treated. *) + let state = + match output with + | GS.Route_message _ | Already_received -> state + | To_include_in_batch out -> fst (GS.apply_batch [out] state) + | _ -> + Test.fail + ~__LOC__ + "Output should have been Route_message, Already_received or \ + To_include_in_batch" + in (* [message_id] should be added to the message cache. *) assert_in_message_cache ~__LOC__ @@ -649,10 +673,19 @@ let test_receiving_message rng limits parameters = It might be possible to simplify this setup (for instance, by having just one peer in the mesh?). 
*) -let test_message_duplicate_score_bug rng _limits parameters = +let test_message_duplicate_score_bug ~batching_configuration rng _limits + parameters = Tezt_core.Test.register ~__FILE__ - ~title:"Gossipsub: Test message duplicate score bug" + ~title: + ("Gossipsub: Test message duplicate score bug" + ^ + match batching_configuration with + | GS.Sequentially -> "" + | GS.In_batches {time_interval} -> + Format.sprintf + " with batches of %fs" + (GS.Span.to_float_s time_interval)) ~tags:["gossipsub"; "duplicate_score_bug"] @@ fun () -> (* Ignore decays, for simplicity; they are used when refreshing score during @@ -691,13 +724,14 @@ let test_message_duplicate_score_bug rng _limits parameters = let message = "some_data" in let state, _ = GS.handle_graft {peer = sender; topic} state in (* Receive a message for a joined topic. *) - let state, output = - GS.handle_receive_message_sequentially + let state, output1 = + GS.handle_receive_message + ~batching_configuration {sender; topic; message_id = 0; message} state in let () = - match output with + match output1 with | Already_received | Not_subscribed | Invalid_message | Unknown_validity | Outdated -> Test.fail ~__LOC__ "Handling of received message should succeed." @@ -705,11 +739,30 @@ let test_message_duplicate_score_bug rng _limits parameters = in (* Send one more message so that [sender]'s P2 score is greater than its P3 score. In this way its score is positive, and [sender] is not pruned. *) - let state, _ = - GS.handle_receive_message_sequentially + let state, output2 = + GS.handle_receive_message + ~batching_configuration {sender; topic; message_id = 1; message} state in + (* If we are in batch mode, the batch containing both messages as to be + treated. 
*) + let state = + match batching_configuration with + | Sequentially -> state + | In_batches _ -> + fst + (GS.apply_batch + (List.map + (function + | GS.To_include_in_batch x -> x + | _ -> + Test.fail + ~__LOC__ + "In batch mode, received message should be in batch.") + [output1; output2]) + state) + in (* Send the first message again. *) let per_topic_score_limits = (GS.Score.Internal_for_tests.get_topic_params limits.score_limits) topic @@ -722,7 +775,8 @@ let test_message_duplicate_score_bug rng _limits parameters = let state, _ = GS.heartbeat state in assert_mesh_inclusion ~__LOC__ ~topic ~peer:sender ~is_included:true state ; let state, output = - GS.handle_receive_message_sequentially + GS.handle_receive_message + ~batching_configuration {sender; topic; message_id = 0; message} state in @@ -757,10 +811,19 @@ let test_message_duplicate_score_bug rng _limits parameters = (** Tests that we do not route the message when receiving a message for an unsubscribed topic. *) -let test_receiving_message_for_unsusbcribed_topic rng limits parameters = +let test_receiving_message_for_unsusbcribed_topic ~batching_configuration rng + limits parameters = Tezt_core.Test.register ~__FILE__ - ~title:"Gossipsub: Test receiving message for unsubscribed topic" + ~title: + ("Gossipsub: Test receiving message for unsubscribed topic" + ^ + match batching_configuration with + | GS.Sequentially -> "" + | GS.In_batches {time_interval} -> + Format.sprintf + " with batches of %fs" + (GS.Span.to_float_s time_interval)) ~tags:["gossipsub"; "receive_message"; "fanout"] @@ fun () -> let topic = "topic" in @@ -783,7 +846,8 @@ let test_receiving_message_for_unsusbcribed_topic rng limits parameters = let message = "some data" in let message_id = 0 in let _state, output = - GS.handle_receive_message_sequentially + GS.handle_receive_message + ~batching_configuration {sender; topic; message_id; message} state in @@ -2129,10 +2193,18 @@ let test_scoring_p1 rng _limits parameters = (** Test for P2 
(First Message Deliveries). Ported from: https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L3247 *) -let test_scoring_p2 rng _limits parameters = +let test_scoring_p2 ~batching_configuration rng _limits parameters = Tezt_core.Test.register ~__FILE__ - ~title:"Gossipsub: Scoring P2" + ~title: + ("Gossipsub: Scoring P2" + ^ + match batching_configuration with + | GS.Sequentially -> "" + | GS.In_batches {time_interval} -> + Format.sprintf + " with batches of %fs" + (GS.Span.to_float_s time_interval)) ~tags:["gossipsub"; "scoring"; "p2"] @@ fun () -> let first_message_deliveries_weight = 2.0 in @@ -2162,16 +2234,19 @@ let test_scoring_p2 rng _limits parameters = let peers = Array.of_list peers in let receive_message ~__LOC__ peer message_id message state = let state, output = - GS.handle_receive_message_sequentially + GS.handle_receive_message + ~batching_configuration {sender = peer; topic; message_id; message} state in match output with | GS.Route_message _ | Already_received -> state + | To_include_in_batch out -> fst (GS.apply_batch [out] state) | _ -> Test.fail ~__LOC__ - "Output should have been Route_message or Already_received" + "Output should have been Route_message, Already_received or \ + To_include_in_batch" in (* peer 0 delivers message first *) let message_id, message = gen_message () in @@ -2240,10 +2315,18 @@ let test_scoring_p2 rng _limits parameters = Ported from: https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L3895 and https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L3979 *) -let test_scoring_p4 rng _limits parameters = +let test_scoring_p4 ~batching_configuration rng _limits parameters = Tezt_core.Test.register ~__FILE__ - ~title:"Gossipsub: Scoring P4" + ~title: + ("Gossipsub: Scoring P4" + ^ + match 
batching_configuration with + | GS.Sequentially -> "" + | GS.In_batches {time_interval} -> + Format.sprintf + " with batches of %fs" + (GS.Span.to_float_s time_interval)) ~tags:["gossipsub"; "scoring"; "p4"] @@ fun () -> let invalid_message_deliveries_weight = -2.0 in @@ -2285,7 +2368,8 @@ let test_scoring_p4 rng _limits parameters = (fun state () -> let message_id, message = gen_message () in let state, _ = - GS.handle_receive_message_sequentially + GS.handle_receive_message + ~batching_configuration {sender = peer; topic; message_id; message} state in @@ -2440,15 +2524,12 @@ let test_scoring_p7_grafts_before_backoff rng _limits parameters = (* TODO: https://gitlab.com/tezos/tezos/-/issues/5293 Add test for the described test scenario *) -let register rng limits parameters = +let register ~message_handlings rng limits parameters = test_ignore_graft_from_unknown_topic rng limits parameters ; test_handle_received_subscriptions rng limits parameters ; test_join_adds_peers_to_mesh rng limits parameters ; test_join_adds_fanout_to_mesh rng limits parameters ; test_publish_without_flood_publishing rng limits parameters ; - test_receiving_message_for_unsusbcribed_topic rng limits parameters ; - test_receiving_message rng limits parameters ; - test_message_duplicate_score_bug rng limits parameters ; test_fanout rng limits parameters ; test_handle_graft_for_joined_topic rng limits parameters ; test_handle_graft_for_not_joined_topic rng limits parameters ; @@ -2475,7 +2556,21 @@ let register rng limits parameters = test_ignore_too_many_messages_in_ihave rng limits parameters ; test_heartbeat_scenario rng limits parameters ; test_scoring_p1 rng limits parameters ; - test_scoring_p2 rng limits parameters ; - test_scoring_p4 rng limits parameters ; test_scoring_p5 rng limits parameters ; - test_scoring_p7_grafts_before_backoff rng limits parameters + test_scoring_p7_grafts_before_backoff rng limits parameters ; + List.iter + (fun batching_configuration -> + 
test_receiving_message ~batching_configuration rng limits parameters ; + test_message_duplicate_score_bug + ~batching_configuration + rng + limits + parameters ; + test_receiving_message_for_unsusbcribed_topic + ~batching_configuration + rng + limits + parameters ; + test_scoring_p2 ~batching_configuration rng limits parameters ; + test_scoring_p4 ~batching_configuration rng limits parameters) + message_handlings -- GitLab From 0a377e882fd4b0166920c4555c74b6dcced13630 Mon Sep 17 00:00:00 2001 From: Guillaume Genestier Date: Wed, 17 Sep 2025 18:39:29 +0200 Subject: [PATCH 11/16] DAL: Add option to set batching of shards for verification --- src/lib_agnostic_baker/commands.ml | 9 +- src/lib_dal_node/cli.ml | 39 ++- src/lib_dal_node/cli.mli | 6 + src/lib_dal_node/configuration_file.ml | 330 ++++++++++++++---------- src/lib_dal_node/configuration_file.mli | 12 + src/lib_dal_node/constants.ml | 12 - src/lib_dal_node/constants.mli | 4 - src/lib_dal_node/daemon.ml | 28 +- 8 files changed, 272 insertions(+), 168 deletions(-) diff --git a/src/lib_agnostic_baker/commands.ml b/src/lib_agnostic_baker/commands.ml index 08096dbf989b..3e87a264b7fe 100644 --- a/src/lib_agnostic_baker/commands.ml +++ b/src/lib_agnostic_baker/commands.ml @@ -124,8 +124,10 @@ module Dal = struct let ignore_topics = arg_list_to_clic ignore_topics_arg + let batching_configuration = arg_to_clic batching_configuration_arg + let args = - Tezos_clic.args24 + Tezos_clic.args25 data_dir config_file rpc_addr @@ -150,6 +152,7 @@ module Dal = struct ignore_l1_config_peers disable_amplification ignore_topics + batching_configuration let commands = let open Tezos_clic in @@ -184,7 +187,8 @@ module Dal = struct verbose, ignore_l1_config_peers, disable_amplification, - ignore_topics ) + ignore_topics, + batching_configuration ) _cctxt -> let attester_profile = Option.value ~default:[] attester_profile in @@ -218,6 +222,7 @@ module Dal = struct ignore_l1_config_peers disable_amplification ignore_topics + 
batching_configuration in match options with | Ok options -> Cli.run cmd options diff --git a/src/lib_dal_node/cli.ml b/src/lib_dal_node/cli.ml index fbb4db92f842..a74ac6442829 100644 --- a/src/lib_dal_node/cli.ml +++ b/src/lib_dal_node/cli.ml @@ -573,6 +573,30 @@ module Term = struct let ignore_topics = arg_list_to_cmdliner ignore_topics_arg + let batching_configuration_arg = + let open Configuration_file in + let pp fmt = function + | Disabled -> Format.fprintf fmt "disabled" + | Enabled {time_interval} -> Format.fprintf fmt "%d" time_interval + in + let parse = function + | "disabled" -> Ok Disabled + | s -> ( + try Ok (Enabled {time_interval = int_of_string s}) + with Failure _ -> Error "Unrecognized int") + in + make_arg + ~parse + ~doc: + "The time (in milliseconds) for which shards are accumulated by the \ + gossipsub automaton before triggering the validation of the batch. \ + \"disabled\" to deactivate the batching of shards." + ~placeholder:"INT|\"disabled\"" + ~pp + "batching-time-interval" + + let batching_configuration = arg_to_cmdliner batching_configuration_arg + let term process = Cmdliner.Term.( ret @@ -582,7 +606,7 @@ module Term = struct $ operator_profile $ observer_profile $ bootstrap_profile $ peers $ history_mode $ service_name $ service_namespace $ fetch_trusted_setup $ disable_shard_validation $ verbose $ ignore_l1_config_peers - $ disable_amplification $ ignore_topics)) + $ disable_amplification $ ignore_topics $ batching_configuration)) end type t = Run | Config_init | Config_update | Debug_print_store_schemas @@ -748,13 +772,15 @@ type options = { ignore_l1_config_peers : bool; disable_amplification : bool; ignore_topics : Signature.public_key_hash list; + batching_configuration : Configuration_file.batching_configuration option; } let cli_options_to_options data_dir config_file rpc_addr expected_pow listen_addr public_addr endpoint slots_backup_uris trust_slots_backup_uris metrics_addr attesters operators observers bootstrap_flag peers 
history_mode service_name service_namespace fetch_trusted_setup disable_shard_validation - verbose ignore_l1_config_peers disable_amplification ignore_topics = + verbose ignore_l1_config_peers disable_amplification ignore_topics + batching_configuration = let open Result_syntax in let profile = Controller_profiles.make ~attesters ~operators ?observers () in let* profile = @@ -800,6 +826,7 @@ let cli_options_to_options data_dir config_file rpc_addr expected_pow ignore_l1_config_peers; disable_amplification; ignore_topics; + batching_configuration; } let merge_experimental_features _ _configuration = () @@ -828,6 +855,7 @@ let merge ignore_l1_config_peers; disable_amplification; ignore_topics = _; + batching_configuration; } configuration = let profile = match profile with @@ -876,6 +904,10 @@ let merge configuration.ignore_l1_config_peers || ignore_l1_config_peers; disable_amplification = configuration.disable_amplification || disable_amplification; + batching_configuration = + Option.value + ~default:configuration.batching_configuration + batching_configuration; } let wrap_with_error main_promise = @@ -977,7 +1009,7 @@ let commands = metrics_addr attesters operators observers bootstrap_flag peers history_mode service_name service_namespace fetch_trusted_setup disable_shard_validation verbose ignore_l1_config_peers - disable_amplification ignore_pkhs = + disable_amplification ignore_pkhs batching_configuration = match cli_options_to_options data_dir @@ -1004,6 +1036,7 @@ let commands = ignore_l1_config_peers disable_amplification ignore_pkhs + batching_configuration with | Ok options -> main_run subcommand options | Error msg -> `Error msg diff --git a/src/lib_dal_node/cli.mli b/src/lib_dal_node/cli.mli index 809e48185541..c1e746cfa71d 100644 --- a/src/lib_dal_node/cli.mli +++ b/src/lib_dal_node/cli.mli @@ -101,6 +101,8 @@ module Term : sig val verbose_switch : switch val ignore_topics_arg : Signature.public_key_hash arg_list + + val batching_configuration_arg : 
Configuration_file.batching_configuration arg end (** {2 Command-line options} *) @@ -154,6 +156,9 @@ type options = { (** Disable amplification. Default value is false. *) ignore_topics : Signature.public_key_hash list; (** Do not distribute shards of these pkhs. *) + batching_configuration : Configuration_file.batching_configuration option; + (** The configuration used for batching verification of received shards + via GossipSub to save cryptographic computation. *) } (** Subcommands that can be used by the DAL node. In the future this type @@ -186,6 +191,7 @@ val cli_options_to_options : bool -> bool -> Signature.public_key_hash list -> + Configuration_file.batching_configuration option -> (options, bool * string) result val run : t -> options -> unit tzresult Lwt.t diff --git a/src/lib_dal_node/configuration_file.ml b/src/lib_dal_node/configuration_file.ml index 03ed6c5dd56a..23bade49cb5e 100644 --- a/src/lib_dal_node/configuration_file.ml +++ b/src/lib_dal_node/configuration_file.ml @@ -33,6 +33,8 @@ type neighbor = {addr : string; port : int} type history_mode = Rolling of {blocks : [`Auto | `Some of int]} | Full +type batching_configuration = Disabled | Enabled of {time_interval : int} + type experimental_features = unit let history_mode_encoding = @@ -64,6 +66,27 @@ let history_mode_encoding = (fun () -> Full); ] +let batching_configuration_encoding = + let open Data_encoding in + union + [ + case + ~title:"disabled" + ~description:"" + (Tag 0) + unit + (function Disabled -> Some () | Enabled _ -> None) + (fun () -> Disabled); + case + ~title:"enabled" + ~description:"" + (Tag 1) + int31 + (function + | Disabled -> None | Enabled {time_interval} -> Some time_interval) + (fun time_interval -> Enabled {time_interval}); + ] + type t = { data_dir : string; rpc_addr : P2p_point.Id.t; @@ -85,6 +108,7 @@ type t = { verbose : bool; ignore_l1_config_peers : bool; disable_amplification : bool; + batching_configuration : batching_configuration; } let default_data_dir 
= Filename.concat (Sys.getenv "HOME") ".tezos-dal-node" @@ -127,6 +151,19 @@ let default_experimental_features = () let default_fetch_trusted_setup = true +(* By default, when a shard is received, we wait for 0.1 seconds for other + shards of the same commitment before launching the cryptographic validation + of the shards. + Since these shards are supposed to be received several levels in advance, + the risk that this 0.1 second delay makes the validation happen too late + is very low. + It also slows down gossiping a bit, since messages are advertised only after + validation, so if a message has to go through several nodes before reaching + its final destination, the waiting delay accumulates and may be of a few + seconds. It looks fine with 8 blocks of attestation lag and 6 seconds block + time but if those values are reduced a lot, this might become an issue.*) +let default_batching_configuration = Enabled {time_interval = 100} + let default = { data_dir = default_data_dir; @@ -149,6 +186,7 @@ let default = verbose = false; ignore_l1_config_peers = false; disable_amplification = false; + batching_configuration = default_batching_configuration; } let uri_encoding : Uri.t Data_encoding.t = @@ -192,47 +230,50 @@ let encoding : t Data_encoding.t = verbose; ignore_l1_config_peers; disable_amplification; + batching_configuration; } -> - ( ( data_dir, - rpc_addr, - listen_addr, - public_addr, - peers, - expected_pow, - endpoint, - slots_backup_uris, - trust_slots_backup_uris, - metrics_addr ), - ( history_mode, - profile, - version, - service_name, - service_namespace, - experimental_features, - fetch_trusted_setup, -
verbose, - ignore_l1_config_peers, - disable_amplification ) ) -> + ( ( ( data_dir, + rpc_addr, + listen_addr, + public_addr, + peers, + expected_pow, + endpoint, + slots_backup_uris, + trust_slots_backup_uris, + metrics_addr ), + ( history_mode, + profile, + version, + service_name, + service_namespace, + experimental_features, + fetch_trusted_setup, + verbose, + ignore_l1_config_peers, + disable_amplification ) ), + batching_configuration )) + (fun ( ( ( data_dir, + rpc_addr, + listen_addr, + public_addr, + peers, + expected_pow, + endpoint, + slots_backup_uris, + trust_slots_backup_uris, + metrics_addr ), + ( history_mode, + profile, + version, + service_name, + service_namespace, + experimental_features, + fetch_trusted_setup, + verbose, + ignore_l1_config_peers, + disable_amplification ) ), + batching_configuration ) -> { data_dir; rpc_addr; @@ -254,109 +295,118 @@ let encoding : t Data_encoding.t = verbose; ignore_l1_config_peers; disable_amplification; + batching_configuration; }) (merge_objs - (obj10 - (dft - "data-dir" - ~description:"Location of the data dir" - string - default_data_dir) - (dft - "rpc-addr" - ~description:"RPC address" - P2p_point.Id.encoding - default_rpc_addr) - (dft - "net-addr" - ~description:"P2P address of this node" - P2p_point.Id.encoding - default_listen_addr) - (dft - "public-addr" - ~description:"P2P address of this node" - P2p_point.Id.encoding - default_listen_addr) - (dft - "peers" - ~description:"P2P addresses of remote peers" - (list string) - default_peers) - (dft - "expected-pow" - ~description:"Expected P2P identity's PoW" - float - default_expected_pow) - (dft - "endpoint" - ~description:"The Tezos node endpoint" - uri_encoding - default_endpoint) - (dft - "slots_backup_uris" - ~description:"Optional HTTP endpoints to fetch missing slots from." - (list uri_encoding) - []) - (dft - "trust_slots_backup_uris" - ~description: - "Whether to trust the data downlaoded from the provided HTTP \ - backup URIs." 
- bool - false) - (dft - "metrics-addr" - ~description:"The point for the DAL node metrics server" - (Encoding.option P2p_point.Id.encoding) - None)) - (obj10 - (dft - "history_mode" - ~description:"The history mode for the DAL node" - history_mode_encoding - default_history_mode) - (dft - "profiles" - ~description:"The Octez DAL node profiles" - Profile_manager.unresolved_encoding - Profile_manager.Empty) - (req "version" ~description:"The configuration file version" int31) - (dft - "service_name" - ~description:"Name of the service" - Data_encoding.string - default.service_name) - (dft - "service_namespace" - ~description:"Namespace for the service" - Data_encoding.string - default.service_namespace) - (dft - "experimental_features" - ~description:"Experimental features" - experimental_features_encoding - default_experimental_features) - (dft - "fetch_trusted_setup" - ~description:"Install trusted setup" - bool - true) - (dft - "verbose" - ~description: - "Whether to emit details about frequent logging events" - bool - default.verbose) - (dft - "ignore_l1_config_peers" - ~description:"Ignore the boot(strap) peers provided by L1" - bool - default.ignore_l1_config_peers) + (merge_objs + (obj10 + (dft + "data-dir" + ~description:"Location of the data dir" + string + default_data_dir) + (dft + "rpc-addr" + ~description:"RPC address" + P2p_point.Id.encoding + default_rpc_addr) + (dft + "net-addr" + ~description:"P2P listening address of this node" + P2p_point.Id.encoding + default_listen_addr) + (dft + "public-addr" + ~description:"P2P public address of this node" + P2p_point.Id.encoding + default_public_addr) + (dft + "peers" + ~description:"P2P addresses of remote peers" + (list string) + default_peers) + (dft + "expected-pow" + ~description:"Expected P2P identity's PoW" + float + default_expected_pow) + (dft + "endpoint" + ~description:"The Tezos node endpoint" + uri_encoding + default_endpoint) + (dft + "slots_backup_uris" + ~description: + "Optional HTTP 
endpoints to fetch missing slots from." + (list uri_encoding) + []) + (dft + "trust_slots_backup_uris" + ~description: + "Whether to trust the data downlaoded from the provided HTTP \ + backup URIs." + bool + false) + (dft + "metrics-addr" + ~description:"The point for the DAL node metrics server" + (Encoding.option P2p_point.Id.encoding) + None)) + (obj10 + (dft + "history_mode" + ~description:"The history mode for the DAL node" + history_mode_encoding + default_history_mode) + (dft + "profiles" + ~description:"The Octez DAL node profiles" + Profile_manager.unresolved_encoding + Profile_manager.Empty) + (req "version" ~description:"The configuration file version" int31) + (dft + "service_name" + ~description:"Name of the service" + Data_encoding.string + default.service_name) + (dft + "service_namespace" + ~description:"Namespace for the service" + Data_encoding.string + default.service_namespace) + (dft + "experimental_features" + ~description:"Experimental features" + experimental_features_encoding + default_experimental_features) + (dft + "fetch_trusted_setup" + ~description:"Install trusted setup" + bool + true) + (dft + "verbose" + ~description: + "Whether to emit details about frequent logging events" + bool + default.verbose) + (dft + "ignore_l1_config_peers" + ~description:"Ignore the boot(strap) peers provided by L1" + bool + default.ignore_l1_config_peers) + (dft + "disable_amplification" + ~description:"Disable amplification" + bool + default.disable_amplification))) + (obj1 (dft - "disable_amplification" - ~description:"Disable amplification" - bool - default.disable_amplification))) + "batching_configuration" + ~description:"Set the batching delay for shard verification" + batching_configuration_encoding + default.batching_configuration))) type error += DAL_node_unable_to_write_configuration_file of string diff --git a/src/lib_dal_node/configuration_file.mli b/src/lib_dal_node/configuration_file.mli index 920aa5f6bca2..92aa24545dda 100644 --- 
a/src/lib_dal_node/configuration_file.mli +++ b/src/lib_dal_node/configuration_file.mli @@ -34,6 +34,15 @@ type history_mode = profile. *) | Full (** [Full] keeps the shards forever *) +(** The configuration of the validation of shards in batch mode.*) +type batching_configuration = + | Disabled + (** [Disabled] enforces the validation of shards one by one at reception time. *) + | Enabled of {time_interval : int} + (** [Enabled {time_interval}] accumulates messages received during + [time_interval] milliseconds and verifies all shards related to the + same commitment in the same batch in one pass. *) + (** Configuration settings for experimental features, with no backward compatibility guarantees. *) type experimental_features = unit @@ -73,6 +82,9 @@ type t = { ignore_l1_config_peers : bool; (** Ignore the boot(strap) peers provided by L1. *) disable_amplification : bool; (** Disable amplification. *) + batching_configuration : batching_configuration; + (** The configuration of the batching of the shards. + The default is [Enabled{time_interval=100}]. *) } (** [default] is the default configuration. *) diff --git a/src/lib_dal_node/constants.ml b/src/lib_dal_node/constants.ml index a82b5250f104..8f9f353c6e98 100644 --- a/src/lib_dal_node/constants.ml +++ b/src/lib_dal_node/constants.ml @@ -90,15 +90,3 @@ let bootstrap_dns_refresh_delay = 300. acceptable since the cache is sparsely populated due to [proto_parameters.traps_fraction]. *) let traps_cache_size = 50 - -(* When a shard is received, we wait for 1 second for other shards before - launching the cryptographic validation of the shards. - Since there shards are supposed to be received several levels in advance, - the risk that this 1.2 second delay makes the validation happen too late - is very low. 
- It also slows down gossiping a bit, since messages are advertised only after - validation, so if a message has to go through several nodes before reaching - its final destination, the waiting delay accumulates and may be of a few - seconds. It looks fine with 8 blocks of attestation lag and 8 seconds block - time but if those values are reduced a lot, this might become an issue. *) -let batch_time_interval = Types.Span.of_float_s 1.2 diff --git a/src/lib_dal_node/constants.mli b/src/lib_dal_node/constants.mli index 9838529a7355..014c228644bb 100644 --- a/src/lib_dal_node/constants.mli +++ b/src/lib_dal_node/constants.mli @@ -78,7 +78,3 @@ val bootstrap_dns_refresh_delay : float is acceptable since the cache is sparsely populated due to [proto_parameters.traps_fraction]. *) val traps_cache_size : int - -(** The time (in seconds) for which shards are accumulated by the gossipsub - automaton before triggering the validation of the batch. *) -val batch_time_interval : Types.Span.t diff --git a/src/lib_dal_node/daemon.ml b/src/lib_dal_node/daemon.ml index 2f8083a16a6b..efc84d6da3af 100644 --- a/src/lib_dal_node/daemon.ml +++ b/src/lib_dal_node/daemon.ml @@ -434,11 +434,18 @@ let run ?(disable_shard_validation = false) ~ignore_pkhs ~data_dir ~config_file {peer_id = identity.peer_id; maybe_reachable_point = public_addr} in let gs_worker = + let batching_interval = + match config.batching_configuration with + | Disabled -> None + | Enabled {time_interval} -> + let time_in_second = float_of_int time_interval /. 1000. 
in + Some (Types.Span.of_float_s time_in_second) + in Gossipsub.Worker.( make ~initial_points:get_initial_points ~events_logging:(Logging.event ~verbose:config.verbose) - ~batching_interval:Constants.batch_time_interval + ?batching_interval ~self rng limits @@ -529,12 +536,19 @@ let run ?(disable_shard_validation = false) ~ignore_pkhs ~data_dir ~config_file Node_context.warn_if_attesters_not_delegates ctxt profile | _ -> return_unit in - Gossipsub.Worker.Validate_message_hook.set_batch - (Message_validation.gossipsub_batch_validation - ctxt - cryptobox - ~head_level - proto_parameters) ; + let () = + match config.batching_configuration with + | Enabled _ -> + Gossipsub.Worker.Validate_message_hook.set_batch + (Message_validation.gossipsub_batch_validation + ctxt + cryptobox + ~head_level + proto_parameters) + | Disabled -> () + in + (* Even if the batch validation is activated, one has to register a per message + validation for the validation of message id. *) Gossipsub.Worker.Validate_message_hook.set (Message_validation.gossipsub_app_messages_validation ctxt -- GitLab From 9237f53f7966923a11e08258d696c9633555aaec Mon Sep 17 00:00:00 2001 From: Guillaume Genestier Date: Wed, 17 Sep 2025 19:39:54 +0200 Subject: [PATCH 12/16] DAL: allow to disable shards validation in batch mode --- src/lib_dal_node/message_validation.ml | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/src/lib_dal_node/message_validation.ml b/src/lib_dal_node/message_validation.ml index 9d660359956f..9c383032beaf 100644 --- a/src/lib_dal_node/message_validation.ml +++ b/src/lib_dal_node/message_validation.ml @@ -325,12 +325,18 @@ let gossipsub_batch_validation ctxt cryptobox ~head_level proto_parameters batch (List.hd batch_list) in let res = - Dal_metrics.sample_time - ~sampling_frequency: - Constants.shards_verification_sampling_frequency - ~metric_updater:Dal_metrics.update_shards_verification_time - ~to_sample:(fun () -> - Cryptobox.verify_shard_multi 
cryptobox commitment shards proofs) + if Node_context.get_disable_shard_validation ctxt then Ok () + else + Dal_metrics.sample_time + ~sampling_frequency: + Constants.shards_verification_sampling_frequency + ~metric_updater:Dal_metrics.update_shards_verification_time + ~to_sample:(fun () -> + Cryptobox.verify_shard_multi + cryptobox + commitment + shards + proofs) in match res with | Ok () -> -- GitLab From 9855735d584b85ec9e7328f166250b16d68bc86b Mon Sep 17 00:00:00 2001 From: Guillaume Genestier Date: Wed, 17 Sep 2025 19:39:54 +0200 Subject: [PATCH 13/16] DAL: add open telemetry profiling for cryptography in batch mode --- src/lib_dal_node/message_validation.ml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/lib_dal_node/message_validation.ml b/src/lib_dal_node/message_validation.ml index 9c383032beaf..e76ca487a8b2 100644 --- a/src/lib_dal_node/message_validation.ml +++ b/src/lib_dal_node/message_validation.ml @@ -337,6 +337,11 @@ let gossipsub_batch_validation ctxt cryptobox ~head_level proto_parameters batch commitment shards proofs) + [@profiler.wrap_f + {driver_ids = [Opentelemetry]} + (Opentelemetry_helpers.trace_slot + ~name:"verify_shards" + Types.Slot_id.{slot_level = level; slot_index})] in match res with | Ok () -> -- GitLab From d43b264d14326e87851b1ff72f2ad425ed121231 Mon Sep 17 00:00:00 2001 From: Guillaume Genestier Date: Wed, 17 Sep 2025 19:39:54 +0200 Subject: [PATCH 14/16] Stdlib: Add lexicographic comparison of arrays --- src/lib_stdlib/compare.ml | 16 ++++++++++++++++ src/lib_stdlib/compare.mli | 2 ++ 2 files changed, 18 insertions(+) diff --git a/src/lib_stdlib/compare.ml b/src/lib_stdlib/compare.ml index e041882af1ff..741bd9a3bba1 100644 --- a/src/lib_stdlib/compare.ml +++ b/src/lib_stdlib/compare.ml @@ -91,6 +91,22 @@ module List (P : COMPARABLE) = Make (struct if hd <> 0 then hd else compare xs ys end) +module Array (P : COMPARABLE) = Make (struct + type t = P.t array + + let compare a b = + let len_a = Array.length a in + let 
len_b = Array.length b in + let rec loop i = + if i = len_a then if i = len_b then 0 else -1 + else if i = len_b then 1 + else + let c = P.compare a.(i) b.(i) in + if c <> 0 then c else loop (i + 1) + in + loop 0 +end) + module Option (P : COMPARABLE) = Make (struct type t = P.t option diff --git a/src/lib_stdlib/compare.mli b/src/lib_stdlib/compare.mli index 424b2709a472..4d851ffa7b04 100644 --- a/src/lib_stdlib/compare.mli +++ b/src/lib_stdlib/compare.mli @@ -151,6 +151,8 @@ module Q : S with type t = Q.t module List (P : COMPARABLE) : S with type t = P.t list +module Array (P : COMPARABLE) : S with type t = P.t array + module Option (P : COMPARABLE) : S with type t = P.t option module Result (Ok : COMPARABLE) (Error : COMPARABLE) : -- GitLab From a78a1cc9af14979502edb1408301fc8f08bac820 Mon Sep 17 00:00:00 2001 From: Guillaume Genestier Date: Wed, 17 Sep 2025 19:39:55 +0200 Subject: [PATCH 15/16] DAL/Gossipsub: Make Message module COMPARABLE --- src/lib_crypto_dal/cryptobox.ml | 4 +++- src/lib_crypto_dal/cryptobox.mli | 8 ++++++-- src/lib_dal_node_services/types.ml | 10 ++++++++++ src/lib_dal_node_services/types.mli | 2 ++ src/lib_gossipsub/gossipsub_intf.ml | 14 ++++++++++++-- 5 files changed, 33 insertions(+), 5 deletions(-) diff --git a/src/lib_crypto_dal/cryptobox.ml b/src/lib_crypto_dal/cryptobox.ml index e3dd8140adcb..ae8267f986f6 100644 --- a/src/lib_crypto_dal/cryptobox.ml +++ b/src/lib_crypto_dal/cryptobox.ml @@ -148,7 +148,9 @@ module Inner = struct type page = bytes - type share = Scalar.t array + module Share = Compare.Array (Scalar) + + type share = Share.t type shard = {index : int; share : share} diff --git a/src/lib_crypto_dal/cryptobox.mli b/src/lib_crypto_dal/cryptobox.mli index df89a5429615..471f975ae2ed 100644 --- a/src/lib_crypto_dal/cryptobox.mli +++ b/src/lib_crypto_dal/cryptobox.mli @@ -199,14 +199,18 @@ val string_of_commit_error : string (** A portion of the data represented by a polynomial. 
*) -type share +module Share : Compare.COMPARABLE + +type share = Share.t (** A shard is share with its index (see {!val:shards_from_polynomial}). *) type shard = {index : int; share : share} (** A proof that a shard belongs to some commitment. *) -type shard_proof +module Proof : Compare.COMPARABLE + +type shard_proof = Proof.t module Verifier : VERIFIER diff --git a/src/lib_dal_node_services/types.ml b/src/lib_dal_node_services/types.ml index 4a44bb40734d..4de2c3ad83cc 100644 --- a/src/lib_dal_node_services/types.ml +++ b/src/lib_dal_node_services/types.ml @@ -178,6 +178,16 @@ module Message = struct (obj2 (req "share" Cryptobox.share_encoding) (req "shard_proof" Cryptobox.shard_proof_encoding)) + + module Cmp = struct + type nonrec t = t + + let compare t1 t2 = + Compare.or_else (Cryptobox.Share.compare t1.share t2.share) (fun () -> + Cryptobox.Proof.compare t1.shard_proof t2.shard_proof) + end + + include Compare.Make (Cmp) end module Peer = struct diff --git a/src/lib_dal_node_services/types.mli b/src/lib_dal_node_services/types.mli index e2f3249afb29..4c668b155f7f 100644 --- a/src/lib_dal_node_services/types.mli +++ b/src/lib_dal_node_services/types.mli @@ -116,6 +116,8 @@ module Message : sig include PRINTABLE with type t := t include ENCODABLE with type t := t + + include COMPARABLE with type t := t end (** A peer from the point of view of gossipsub. *) diff --git a/src/lib_gossipsub/gossipsub_intf.ml b/src/lib_gossipsub/gossipsub_intf.ml index 3eebe69ca4ce..6045c6ccf61e 100644 --- a/src/lib_gossipsub/gossipsub_intf.ml +++ b/src/lib_gossipsub/gossipsub_intf.ml @@ -53,7 +53,11 @@ module type AUTOMATON_SUBCONFIG = sig end module Message : sig - include PRINTABLE + type t + + include PRINTABLE with type t := t + + include COMPARABLE with type t := t (** [valid] performs an application layer-level validity check on a message id and a message if given. 
@@ -564,7 +568,13 @@ module type AUTOMATON = sig end (** Module for message *) - module Message : PRINTABLE + module Message : sig + type t + + include PRINTABLE with type t := t + + include COMPARABLE with type t := t + end (** Module for time *) module Time : PRINTABLE -- GitLab From 7c6dee38e55f5c3980996f57ecc8c982994b2d7c Mon Sep 17 00:00:00 2001 From: Guillaume Genestier Date: Wed, 17 Sep 2025 19:41:13 +0200 Subject: [PATCH 16/16] Gossipsub: Use map instead of list for batch to avoid treating twice the same shard --- src/lib_gossipsub/gossipsub_worker.ml | 31 +++++++++++++++++++++------ 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/src/lib_gossipsub/gossipsub_worker.ml b/src/lib_gossipsub/gossipsub_worker.ml index eb8e2f023927..0ca396a777f7 100644 --- a/src/lib_gossipsub/gossipsub_worker.ml +++ b/src/lib_gossipsub/gossipsub_worker.ml @@ -270,9 +270,17 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : Some (value, t) end - type batch_state = - | Pending - | Accumulating of (GS.receive_message * Peer.Set.t) list + module ReceiveMsgMap = Map.Make (struct + type t = GS.receive_message + + let compare (m1 : t) (m2 : t) = + Compare.or_else + (Compare.or_else (Topic.compare m1.topic m2.topic) (fun () -> + Message_id.compare m1.message_id m2.message_id)) + (fun () -> Message.compare m1.message m2.message) + end) + + type batch_state = Pending | Accumulating of Peer.Set.t ReceiveMsgMap.t (** The worker's state is made of the gossipsub automaton's state, and a stream of events to process. 
It also has two output streams to @@ -779,7 +787,7 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : let current_batch = ref Pending in fun output time_interval events_stream -> match output with - | GS.To_include_in_batch content -> ( + | GS.To_include_in_batch (msg, peers) -> ( match !current_batch with | Pending -> let open Lwt_syntax in @@ -787,15 +795,24 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : let* () = Lwt_unix.sleep (GS.Span.to_float_s time_interval) in let batch = match !current_batch with - | Accumulating batch -> batch + | Accumulating batch -> ReceiveMsgMap.bindings batch | Pending -> [] in current_batch := Pending ; Stream.push (Process_batch batch) events_stream ; return_unit) ; - current_batch := Accumulating [content] + let content_map = ReceiveMsgMap.singleton msg peers in + current_batch := Accumulating content_map | Accumulating prev_contents -> - current_batch := Accumulating (content :: prev_contents)) + let new_contents = + ReceiveMsgMap.update + msg + (function + | None -> Some peers + | Some prev_peers -> Some (Peer.Set.inter peers prev_peers)) + prev_contents + in + current_batch := Accumulating new_contents) | _ -> () (** Handling messages received from the P2P network. *) -- GitLab