diff --git a/src/lib_dal_node/block_handler.ml b/src/lib_dal_node/block_handler.ml index 23dfb3375dd6fe23af87a220b95a6ca948dbf9a5..5298b4471fad637d77dd185d9799529d4f68e661 100644 --- a/src/lib_dal_node/block_handler.ml +++ b/src/lib_dal_node/block_handler.ml @@ -583,11 +583,17 @@ let new_finalized_head ctxt cctxt l1_crawler cryptobox finalized_block_hash in return_unit in + Gossipsub.Worker.Validate_message_hook.set_batch + (Message_validation.gossipsub_batch_validation + ctxt + cryptobox + ~head_level:level + proto_parameters) ; Gossipsub.Worker.Validate_message_hook.set (Message_validation.gossipsub_app_messages_validation ctxt cryptobox - level + ~head_level:level proto_parameters) ; let*! () = remove_old_level_stored_data proto_parameters ctxt level in let* () = diff --git a/src/lib_dal_node/constants.ml b/src/lib_dal_node/constants.ml index 8f9f353c6e9821bbc8061bc9d8254eec512fccfa..a82b5250f10442a68f693432a1dd4ff7ba6c53b1 100644 --- a/src/lib_dal_node/constants.ml +++ b/src/lib_dal_node/constants.ml @@ -90,3 +90,15 @@ let bootstrap_dns_refresh_delay = 300. acceptable since the cache is sparsely populated due to [proto_parameters.traps_fraction]. *) let traps_cache_size = 50 + +(* When a shard is received, we wait for 1.2 seconds for other shards before + launching the cryptographic validation of the shards. + Since the shards are supposed to be received several levels in advance, + the risk that this 1.2-second delay makes the validation happen too late + is very low. + It also slows down gossiping a bit, since messages are advertised only after + validation, so if a message has to go through several nodes before reaching + its final destination, the waiting delay accumulates and may add up to a few + seconds. This is fine with 8 blocks of attestation lag and an 8-second block + time, but if those values are reduced a lot, this might become an issue. *) +let batch_time_interval = Types.Span.of_float_s 1.2 diff --git a/src/lib_dal_node/constants.mli b/src/lib_dal_node/constants.mli index 014c228644bb05534a08914135f7a86d09abe7aa..9838529a7355d0ff1c170e37860a37e6847681ba 100644 --- a/src/lib_dal_node/constants.mli +++ b/src/lib_dal_node/constants.mli @@ -78,3 +78,7 @@ val bootstrap_dns_refresh_delay : float is acceptable since the cache is sparsely populated due to [proto_parameters.traps_fraction]. *) val traps_cache_size : int + +(** The time (in seconds) for which shards are accumulated by the gossipsub + automaton before triggering the validation of the batch.
*) +val batch_time_interval : Types.Span.t diff --git a/src/lib_dal_node/daemon.ml b/src/lib_dal_node/daemon.ml index ea48b3cf9a846123d740b2e11506909de29d3190..8a19d665e4aa6cdd41f758ef0b6e8b4bdcb7aa9b 100644 --- a/src/lib_dal_node/daemon.ml +++ b/src/lib_dal_node/daemon.ml @@ -440,6 +440,7 @@ let run ?(disable_shard_validation = false) ~ignore_pkhs ~data_dir ~config_file make ~initial_points:get_initial_points ~events_logging:(Logging.event ~verbose:config.verbose) + ~batching_interval:Constants.batch_time_interval ~self rng limits @@ -530,11 +531,17 @@ let run ?(disable_shard_validation = false) ~ignore_pkhs ~data_dir ~config_file Node_context.warn_if_attesters_not_delegates ctxt profile | _ -> return_unit in + Gossipsub.Worker.Validate_message_hook.set_batch + (Message_validation.gossipsub_batch_validation + ctxt + cryptobox + ~head_level + proto_parameters) ; Gossipsub.Worker.Validate_message_hook.set (Message_validation.gossipsub_app_messages_validation ctxt cryptobox - head_level + ~head_level proto_parameters) ; let is_prover_profile = Profile_manager.is_prover_profile profile_ctxt in (* Initialize amplificator if in prover profile. diff --git a/src/lib_dal_node/event.ml b/src/lib_dal_node/event.ml index 8729259fb86f4d32c8e52799db97bcaa8225ccc4..d3ec8d508a80811401b08c47569285ae5c323760 100644 --- a/src/lib_dal_node/event.ml +++ b/src/lib_dal_node/event.ml @@ -519,6 +519,21 @@ open struct ("message_id", Types.Message_id.encoding) ("validation_error", Data_encoding.string) + let batch_validation_error = + declare_3 + ~section + ~prefix_name_with_section:true + ~name:"batch_validation_failed" + ~msg: + "validating batch of messages for level {level} and slot {slot_index} \ + failed with error {validation_error}" + ~level:Warning + ~pp1:(fun fmt -> Format.fprintf fmt "%ld") + ~pp2:(fun fmt -> Format.fprintf fmt "%d") + ("level", Data_encoding.int32) + ("slot_index", Data_encoding.int31) + ("validation_error", Data_encoding.string) + let p2p_server_is_ready = declare_1 ~section @@ -1314,6 +1329,12 @@ let emit_dont_wait__message_validation_error ~message_id ~validation_error = message_validation_error (message_id, validation_error) +let emit_dont_wait__batch_validation_error ~level ~slot_index ~validation_error + = + emit__dont_wait__use_with_care + batch_validation_error + (level, slot_index, validation_error) + let emit_p2p_server_is_ready ~point = emit p2p_server_is_ready point let emit_rpc_server_is_ready ~point = emit rpc_server_is_ready point diff --git a/src/lib_dal_node/gossipsub/gossipsub.mli b/src/lib_dal_node/gossipsub/gossipsub.mli index 8f2fd94aedf4dbc94aa995780754e5f1dd6e1247..5fd3f84b9671432ff270b4fa2a4f36031e3f8e50 100644 --- a/src/lib_dal_node/gossipsub/gossipsub.mli +++ b/src/lib_dal_node/gossipsub/gossipsub.mli @@ -69,6 +69,16 @@ module Worker : sig unit -> [`Valid | `Unknown | `Outdated | `Invalid]) -> unit + + val set_batch : + ((Types.Peer.t + * Types.Topic.t + * Types.Message_id.t + * Types.Message.t + * Types.Peer.Set.t) + list -> + [`Invalid | `Outdated | `Unknown | `Valid] list) -> + unit end end diff --git a/src/lib_dal_node/gossipsub/gs_interface.ml b/src/lib_dal_node/gossipsub/gs_interface.ml index c6ce6d3a18eeecd0f9874fd4e397def53a047340..2a5331b470dbb5795fe454fadfa3f4ba5a8302ee 100644 --- a/src/lib_dal_node/gossipsub/gs_interface.ml +++ b/src/lib_dal_node/gossipsub/gs_interface.ml @@ -35,6 +35,13 @@ module Validate_message_hook = struct ~msg:"The message validation function is not set" ~level:Warning + let warn_batch_validation_function_not_set = + 
Internal_event.Simple.declare_0 + ~section:["dal"; "gs_interface"] + ~name:"batch_validation_function_not_set" + ~msg:"The batch validation function is not set" + ~level:Warning + (* FIXME: https://gitlab.com/tezos/tezos/-/issues/5674 Refactor gossipsub integration to avoid this mutable hook in the lib. *) let check_message = @@ -45,12 +52,26 @@ module Validate_message_hook = struct ()) ; `Unknown) + (* FIXME: https://gitlab.com/tezos/tezos/-/issues/5674 + Refactor gossipsub integration to avoid this mutable hook in the lib. *) + let check_message_batch = + ref (fun batch -> + Internal_event.Simple.( + emit__dont_wait__use_with_care + (warn_batch_validation_function_not_set ()) + ()) ; + List.map (fun _ -> `Unknown) batch) + let set func = check_message := func + + let set_batch func = check_message_batch := func end let message_valid ?message ~message_id () = !Validate_message_hook.check_message ?message ~message_id () +let message_valid_batch batch = !Validate_message_hook.check_message_batch batch + module Automaton_config : AUTOMATON_CONFIG with type Time.t = Types.Time.t @@ -76,6 +97,8 @@ module Automaton_config : include Types.Message let valid = message_valid + + let valid_batch = message_valid_batch end end end diff --git a/src/lib_dal_node/gossipsub/gs_interface.mli b/src/lib_dal_node/gossipsub/gs_interface.mli index 7a66f419f2e67964ed8cb6938f8b090f33254494..82d45dd32209ccc562931409291a9ffe05eab9fd 100644 --- a/src/lib_dal_node/gossipsub/gs_interface.mli +++ b/src/lib_dal_node/gossipsub/gs_interface.mli @@ -56,4 +56,14 @@ module Validate_message_hook : sig unit -> [`Valid | `Unknown | `Outdated | `Invalid]) -> unit + + val set_batch : + ((Types.Peer.t + * Types.Topic.t + * Types.Message_id.t + * Types.Message.t + * Types.Peer.Set.t) + list -> + [`Invalid | `Outdated | `Unknown | `Valid] list) -> + unit end diff --git a/src/lib_dal_node/gossipsub/gs_logging.ml b/src/lib_dal_node/gossipsub/gs_logging.ml index 27fceb2d315943feed04464d958b53276ce239fe..0c533e0f2b0e6b531021156fd4158d74b9d5dada 100644 --- a/src/lib_dal_node/gossipsub/gs_logging.ml +++ b/src/lib_dal_node/gossipsub/gs_logging.ml @@ -43,6 +43,15 @@ module Events = struct ~level:Debug () + let batch_treatment = + declare_0 + ~section + ~prefix_name_with_section:true + ~name:"batch_treatment" + ~msg:"Validation of a message batch" + ~level:Debug + () + let check_unknown_messages = declare_0 ~section @@ -254,3 +263,4 @@ let event ~verbose = | IHave {topic; message_ids} -> emit ihave (from_peer.peer_id, topic, message_ids) | IWant {message_ids} -> emit iwant (from_peer.peer_id, message_ids))) + | Process_batch _ -> emit batch_treatment () diff --git a/src/lib_dal_node/message_validation.ml b/src/lib_dal_node/message_validation.ml index 5854cabbdc4345300cec248ae5b3b2a2cea4321f..facb572fb98236875ed9516f546d15d50a0bf1b0 100644 --- a/src/lib_dal_node/message_validation.ml +++ b/src/lib_dal_node/message_validation.ml @@ -23,6 +23,18 @@ (* *) (*****************************************************************************) +let string_of_validation_error = function + | `Invalid_degree_strictly_less_than_expected Cryptobox.{given; expected} -> + Format.sprintf + "Invalid_degree_strictly_less_than_expected. 
Given: %d, expected: %d" + given + expected + | `Invalid_shard -> "Invalid_shard" + | `Shard_index_out_of_range s -> + Format.sprintf "Shard_index_out_of_range(%s)" s + | `Shard_length_mismatch -> "Shard_length_mismatch" + | `Prover_SRS_not_loaded -> "Prover_SRS_not_loaded" + (** [gossipsub_app_message_payload_validation ~disable_shard_validation cryptobox message message_id] allows checking whether the given [message] identified by [message_id] is valid with the current [cryptobox] parameters. The validity check is @@ -56,20 +68,7 @@ let gossipsub_app_message_payload_validation ~disable_shard_validation cryptobox match res with | Ok () -> `Valid | Error err -> - let validation_error = - match err with - | `Invalid_degree_strictly_less_than_expected {given; expected} -> - Format.sprintf - "Invalid_degree_strictly_less_than_expected. Given: %d, \ - expected: %d" - given - expected - | `Invalid_shard -> "Invalid_shard" - | `Shard_index_out_of_range s -> - Format.sprintf "Shard_index_out_of_range(%s)" s - | `Shard_length_mismatch -> "Shard_length_mismatch" - | `Prover_SRS_not_loaded -> "Prover_SRS_not_loaded" - in + let validation_error = string_of_validation_error err in Event.emit_dont_wait__message_validation_error ~message_id ~validation_error ; @@ -143,7 +142,7 @@ let gossipsub_message_id_validation ctxt proto_parameters message_id = gossipsub_message_id_topic_validation ctxt proto_parameters message_id | other -> other -(** [gossipsub_app_messages_validation ctxt cryptobox head_level +(** [gossipsub_app_messages_validation ctxt cryptobox ~head_level attestation_lag ?message ~message_id ()] checks for the validity of the given message (if any) and message id. @@ -153,8 +152,8 @@ let gossipsub_message_id_validation ctxt proto_parameters message_id = {!gossipsub_message_id_validation}. Then, if a message is given, {!gossipsub_app_message_payload_validation} is used to check its validity. *) -let gossipsub_app_messages_validation ctxt cryptobox head_level proto_parameters - ?message ~message_id () = +let gossipsub_app_messages_validation ctxt cryptobox ~head_level + proto_parameters ?message ~message_id () = if Node_context.is_bootstrap_node ctxt then (* 1. As bootstrap nodes advertise their profiles to controller nodes, they shouldn't receive messages or messages ids. If this @@ -210,3 +209,194 @@ let gossipsub_app_messages_validation ctxt cryptobox head_level proto_parameters | other -> (* 4. In the case the message id is not Valid. *) other + +type batch_identifier = {level : int32; slot_index : int} + +module Batch_identifier_hashable = struct + type t = batch_identifier + + let equal a b = a.level = b.level && a.slot_index = b.slot_index + + let hash = Hashtbl.hash +end + +module Batch_tbl = Hashtbl.Make (Batch_identifier_hashable) + +type batch_elem = { + index : int; + id : Types.Message_id.t; + message : Types.Message.t; +} + +type 'a batch_result = { + to_check_in_batch : batch_elem list Batch_tbl.t; + not_valid : (int * 'a) list; + (* List of indices in the original batch with the error to output for + this element. *) +} + +let triage ctxt head_level proto_parameters batch = + let to_check_in_batch = + Batch_tbl.create proto_parameters.Types.number_of_slots + in + let not_valid = + List.fold_left_i + (fun index + not_valid + ( _peer, + _topic, + (Types.Message_id.{level; slot_index; _} as id), + message, + _peers ) + -> + (* Have some slack for outdated messages. 
*) + let slack = 4 in + if + Int32.( + sub head_level level + > of_int (proto_parameters.Types.attestation_lag + slack)) + then (index, `Outdated) :: not_valid + else + match gossipsub_message_id_validation ctxt proto_parameters id with + | `Valid -> + (* The shard is added to the right batch. + Since the message id check has already been performed, we have + the guarantee that all shards associated to a given + (level, slot_index) pair have the commitment published on the L1, + hence they all belong to the same slot, hence the shards can be + passed to the function verifying several shards simultaneously. *) + let already_sorted = + Batch_tbl.find_opt to_check_in_batch {level; slot_index} + in + let new_entry = + match already_sorted with + | None -> [{index; id; message}] + | Some list -> {index; id; message} :: list + in + Batch_tbl.replace to_check_in_batch {level; slot_index} new_entry ; + not_valid + | other -> + (* In the case the message id is not Valid. *) + (index, other) :: not_valid) + [] + batch + in + {to_check_in_batch; not_valid} + +let gossipsub_batch_validation ctxt cryptobox ~head_level proto_parameters batch + = + if Node_context.is_bootstrap_node ctxt then + (* As bootstrap nodes advertise their profiles to controller + nodes, they shouldn't receive messages or messages ids. If this + happens, received data are considered as spam (invalid), and the remote + peer might be punished, depending on the Gossipsub implementation. *) + List.map (fun _ -> `Invalid) batch + else + let batch_size = List.length batch in + let result = Array.init batch_size (fun _ -> `Unknown) in + let {to_check_in_batch; not_valid} = + triage ctxt head_level proto_parameters batch + in + List.iter (fun (index, error) -> result.(index) <- error) not_valid ; + + let store = Node_context.get_store ctxt in + let traps_store = Store.traps store in + + (* [treat_batch] does not invalidate a whole slot when a single shard is + invalid, otherwise it would be possible for a byzantine actor to craft + a wrong message with the id of the published slot to prevent the validation + of all the valid shards associated to this slot. *) + let rec treat_batch = function + | [] -> () + | ({level; slot_index}, batch_list) :: remaining_to_treat -> ( + let shards, proofs = + List.fold_left + (fun (shards, proofs) {message; id; _} -> + let Types.Message.{share; shard_proof} = message in + let Types.Message_id.{shard_index; _} = id in + let shard = Cryptobox.{share; index = shard_index} in + (shard :: shards, shard_proof :: proofs)) + ([], []) + batch_list + in + (* We never add empty list in the to_check_in_batch table. *) + let {id = {commitment; _}; _} = + Option.value_f + ~default:(fun () -> assert false) + (List.hd batch_list) + in + let res = + Dal_metrics.sample_time + ~sampling_frequency: + Constants.shards_verification_sampling_frequency + ~metric_updater:Dal_metrics.update_shards_verification_time + ~to_sample:(fun () -> + Cryptobox.verify_shard_multi cryptobox commitment shards proofs) + in + match res with + | Ok () -> + List.iter + (fun {index; id; message} -> + (* We register traps only if the message is valid. *) + (* TODO: https://gitlab.com/tezos/tezos/-/issues/7742 + The [proto_parameters] are those for the last known finalized + level, which may differ from those of the slot level. This + will be an issue when the value of the [traps_fraction] + changes. (We cannot use {!Node_context.get_proto_parameters}, + as it is not monad-free; we'll need to use mapping from levels + to parameters.) 
*) + Slot_manager.maybe_register_trap + traps_store + ~traps_fraction:proto_parameters.traps_fraction + id + message ; + result.(index) <- `Valid) + batch_list ; + treat_batch remaining_to_treat + | Error err -> + let validation_error = string_of_validation_error err in + Event.emit_dont_wait__batch_validation_error + ~level + ~slot_index + ~validation_error ; + let batch_size = List.length batch_list in + if batch_size = 1 then + List.iter + (fun {index; _} -> result.(index) <- `Invalid) + batch_list + else + (* Since [verify_shard_multi] does not output which shards are the + invalid ones, and since we do not want to invalidate legitimate + shards because a Byzantine actor added some false shards, we + recursively revalidate the shards by halves until we have + identified the invalid ones. *) + let first_half, second_half = + List.split_n (batch_size / 2) batch_list + in + treat_batch + (({level; slot_index}, first_half) + :: ({level; slot_index}, second_half) + :: remaining_to_treat) + | exception exn -> + (* Don't crash if crypto raised an exception. *) + let validation_error = Printexc.to_string exn in + Event.emit_dont_wait__batch_validation_error + ~level + ~slot_index + ~validation_error ; + let batch_size = List.length batch_list in + if batch_size = 1 then + List.iter + (fun {index; _} -> result.(index) <- `Invalid) + batch_list + else + let first_half, second_half = + List.split_n (batch_size / 2) batch_list + in + treat_batch + (({level; slot_index}, first_half) + :: ({level; slot_index}, second_half) + :: remaining_to_treat)) + in + treat_batch (Batch_tbl.to_seq to_check_in_batch |> List.of_seq) ; + Array.to_list result diff --git a/src/lib_dal_node/message_validation.mli b/src/lib_dal_node/message_validation.mli index 728bb63f66445b42a7a301d4622ff3573f9cae39..b87985ffc44778dba8b05493e0ef18bfda1f01b3 100644 --- a/src/lib_dal_node/message_validation.mli +++ b/src/lib_dal_node/message_validation.mli @@ -53,9 +53,35 @@ val gossipsub_app_messages_validation : Node_context.t -> Cryptobox.t -> - int32 -> + head_level:int32 -> Types.proto_parameters -> ?message:Types.Message.t -> message_id:Types.Message_id.t -> unit -> [> `Invalid | `Outdated | `Unknown | `Valid] + +(** [gossipsub_batch_validation ctxt cryptobox ~head_level + proto_parameters batch] validates a batch of Gossipsub messages and + their associated message ids in the context of the given DAL node. + + The validation follows the same layered approach as [gossipsub_app_messages_validation], + except that, after checking the age and the validity of a message's id, + each message is assigned to a sub-batch containing only messages for the same + level and slot. + + The cryptographic verification is then done per sub-batch. + + This function is intended to be registered as the Gossipsub batch validation hook.
+*) +val gossipsub_batch_validation : + Node_context.t -> + Cryptobox.t -> + head_level:int32 -> + Types.proto_parameters -> + (Types.Peer.t + * Types.Topic.t + * Types.Message_id.t + * Types.Message.t + * Types.Peer.Set.t) + list -> + [> `Invalid | `Outdated | `Unknown | `Valid] list diff --git a/src/lib_gossipsub/README.md b/src/lib_gossipsub/README.md index 7eba3edac12fedad8a785493ac650eb6a8624e93..9a16a6eab677f63795d65612d9eca42faa507e41 100644 --- a/src/lib_gossipsub/README.md +++ b/src/lib_gossipsub/README.md @@ -45,9 +45,12 @@ receives inputs from both the P2P and application layers, processes them using the automaton, and emits corresponding actions back to the P2P layer (messages, connections, disconnections) and the application layer (full messages). -The WORKER module is designed to be agnostic to the underlying concurrency model. +The WORKER module was designed to be agnostic to the underlying concurrency model. Rather than depending on a specific concurrency library (such as Lwt), it requires the host application to supply an I/O monad and stream implementation. +It is no longer the case since merge request [!18848](https://gitlab.com/tezos/tezos/-/merge_requests/18848) which introduces the use of `Lwt.async` in the +function `batch_accumulator` of the worker. + This design allows the application to retain control over the event loop and execution model, enabling the worker to integrate cleanly into diverse runtime environments. diff --git a/src/lib_gossipsub/gossipsub_automaton.ml b/src/lib_gossipsub/gossipsub_automaton.ml index 27d5127274355cabc77d65d225b29fb7b0b4d3cf..312a3373ec738137aa1ddb5f59b78468cf7dbbac 100644 --- a/src/lib_gossipsub/gossipsub_automaton.ml +++ b/src/lib_gossipsub/gossipsub_automaton.ml @@ -84,6 +84,8 @@ module Make (C : AUTOMATON_CONFIG) : type set_application_score = {peer : Peer.t; score : float} + type message_handling = Sequentially | In_batches of {time_interval : span} + (* FIXME not sure subtyping for output is useful. If it is, it is probably for few ouputs and could be removed. *) type _ output = @@ -130,6 +132,12 @@ module Make (C : AUTOMATON_CONFIG) : | Invalid_message : [`Receive_message] output | Unknown_validity : [`Receive_message] output | Outdated : [`Receive_message] output + | To_include_in_batch : + (receive_message * Peer.Set.t) + -> [`Receive_message] output + | Batch_result : + (receive_message * [`Receive_message] output) list + -> [`Treated_batch] output | Already_joined : [`Join] output | Joining_topic : {to_graft : Peer.Set.t} -> [`Join] output | Not_joined : [`Leave] output @@ -1150,15 +1158,19 @@ module Make (C : AUTOMATON_CONFIG) : in fail Invalid_message - let handle sender topic message_id message = + let initial_message_checks {topic; message_id; sender; _} : + ( state, + Peer.Set.t, + [`Receive_message] output ) + Tezos_gossipsub__State_monad.check = let open Monad.Syntax in let*! mesh_opt = find_mesh topic in - let*? peers_in_mesh = + let** peers_in_mesh = match mesh_opt with | Some peers -> pass peers | None -> fail Not_subscribed in - let*? () = + let** () = let*! message_cache in match Message_cache.get_first_seen_time_and_senders message_id message_cache @@ -1173,29 +1185,100 @@ module Make (C : AUTOMATON_CONFIG) : in fail Already_received in - let*? () = check_message_id_valid sender topic message_id in - let*? 
() = check_message_valid sender topic message message_id in + let** () = check_message_id_valid sender topic message_id in + pass peers_in_mesh + + let peers_to_route sender peers_in_mesh topic = + let open Monad.Syntax in let peers = Peer.Set.remove sender peers_in_mesh in - let* () = - put_message_in_cache ~peer:(Some sender) message_id message topic - in - let* () = - update_score sender (fun stats -> - Score.first_message_delivered stats topic) - in let* direct_peers = get_direct_peers topic in - let to_route = Peer.Set.union peers direct_peers in - (* TODO: https://gitlab.com/tezos/tezos/-/issues/5272 + return (Peer.Set.union peers direct_peers) + + let handle ~batching_configuration + ({sender; topic; message_id; message} as receive_message) : + [`Receive_message] output Monad.t = + let open Monad.Syntax in + let*? peers_in_mesh = initial_message_checks receive_message in + match batching_configuration with + | Sequentially -> + let*? () = check_message_valid sender topic message message_id in + let* () = + put_message_in_cache ~peer:(Some sender) message_id message topic + in + let* () = + update_score sender (fun stats -> + Score.first_message_delivered stats topic) + in + let* to_route = peers_to_route sender peers_in_mesh topic in + (* TODO: https://gitlab.com/tezos/tezos/-/issues/5272 Filter out peers from which we already received the message, or an IHave message? *) - Route_message {to_route} |> return + Route_message {to_route} |> return + | In_batches _ -> + let* to_route = peers_to_route sender peers_in_mesh topic in + return (To_include_in_batch (receive_message, to_route)) end - let handle_receive_message : - receive_message -> [`Receive_message] output Monad.t = - fun {sender; topic; message_id; message} -> - Receive_message.handle sender topic message_id message + let handle_receive_message ~batching_configuration receive_message : + [`Receive_message] output Monad.t = + Receive_message.handle ~batching_configuration receive_message + + let check_message_batch batch = + let unfolded_batch = + List.map + (fun ({sender; topic; message_id; message}, peers) -> + (sender, topic, message_id, message, peers)) + batch + in + List.combine + ~when_different_lengths:() + batch + (Message.valid_batch unfolded_batch) + + let apply_batch (batch : (receive_message * Peer.Set.t) list) : + [`Treated_batch] output Monad.t = + let open Monad.Syntax in + match check_message_batch batch with + | Error () -> + let l = + List.map + (fun (received_msg, _peers) -> (received_msg, Unknown_validity)) + batch + in + return (Batch_result l) + | Ok annotated_batch -> + let* l = + Monad.map_fold + (fun ( ( ({sender; topic; message_id; message} as received_msg), + peers ), + result ) + -> + match result with + | `Valid -> + let* () = + put_message_in_cache + ~peer:(Some sender) + message_id + message + topic + in + let* () = + update_score sender (fun stats -> + Score.first_message_delivered stats topic) + in + return (received_msg, Route_message {to_route = peers}) + | `Outdated -> return (received_msg, Outdated) + | `Unknown -> return (received_msg, Unknown_validity) + | `Invalid -> + let* () = + update_score sender (fun stats -> + Score.invalid_message_delivered stats topic) + in + return (received_msg, Invalid_message)) + annotated_batch + in + return (Batch_result l) module Publish_message = struct let check_not_seen message_id = @@ -2415,7 +2498,9 @@ module Make (C : AUTOMATON_CONFIG) : "Route_message %a" Fmt.Dump.(record [field "to_route" Fun.id pp_peer_set]) to_route + | To_include_in_batch _ 
-> fprintf fmtr "To include in batch" | Already_received -> fprintf fmtr "Already_received" + | Batch_result _res -> fprintf fmtr "Batch_result" | Not_subscribed -> fprintf fmtr "Not_subscribed" | Already_joined -> fprintf fmtr "Already_joined" | Joining_topic {to_graft} -> diff --git a/src/lib_gossipsub/gossipsub_intf.ml b/src/lib_gossipsub/gossipsub_intf.ml index 1c010c0ff7a2a2d67da6b3c83dc2a8f2045e1dcf..3eebe69ca4ceeab077d8ac0bd7e17a8345f30f02 100644 --- a/src/lib_gossipsub/gossipsub_intf.ml +++ b/src/lib_gossipsub/gossipsub_intf.ml @@ -69,6 +69,23 @@ module type AUTOMATON_SUBCONFIG = sig message_id:Message_id.t -> unit -> [`Valid | `Unknown | `Outdated | `Invalid] + + (** [valid_batch] performs an application layer-level validity check on a + message batch. + + If the [message_handling] field of the gossipsub worker is [Sequentially], + this function is never called. + This function is called when the field is [In_batches]. However, [valid] + is still called even in this case: since message ids are not batched + and [valid_batch] cannot be called without providing a message, [valid] + remains used when processing message ids without the message, especially + the [IHave] events. + + The output is a list of the same length as the input, containing the + result for each message of the batch. *) + val valid_batch : + (Peer.t * Topic.t * Message_id.t * t * Peer.Set.t) list -> + [`Valid | `Unknown | `Outdated | `Invalid] list end end @@ -619,6 +636,8 @@ module type AUTOMATON = sig type set_application_score = {peer : Peer.t; score : float} + type message_handling = Sequentially | In_batches of {time_interval : span} + (** Output produced by one of the actions below. *) type _ output = | Ihave_from_peer_with_low_score : { @@ -705,7 +724,7 @@ module type AUTOMATON = sig - Direct peers for the message's topic; - The peers in the topic's mesh minus the original sender of the message. *) | Already_received : [`Receive_message] output - (** Received a message that has already been recevied before. *) + (** Received a message that has already been received before. *) | Not_subscribed : [`Receive_message] output (** Received a message from a remote peer for a topic we are not subscribed to (called "unknown topic" in the Go implementation). *) @@ -714,6 +733,17 @@ module type AUTOMATON = sig | Unknown_validity : [`Receive_message] output (** Validity cannot be decided yet. *) | Outdated : [`Receive_message] output (** The message is outdated. *) + | To_include_in_batch : + (receive_message * Peer.Set.t) + -> [`Receive_message] output + (** The message passed all the Gossipsub checks. The worker should add this + message in the current batch. It must be noted that most checks + (especially cryptographic ones) are done when the batch is processed. *) + | Batch_result : + (receive_message * [`Receive_message] output) list + -> [`Treated_batch] output + (** Result of the processing of the batch. It contains the list of all + messages of the batch with their respective output. *) | Already_joined : [`Join] output (** Attempting to join a topic we already joined. *) | Joining_topic : {to_graft : Peer.Set.t} -> [`Join] output @@ -811,10 +841,17 @@ module type AUTOMATON = sig on this topic. *) val handle_prune : prune -> [`Prune] monad - (** [handle_receive_message { sender; topic; message_id; message }] handles - a message received from [sender] on the gossip network. The function returns - a set of peers to which the (full) message will be directly forwarded. 
*) - val handle_receive_message : receive_message -> [`Receive_message] monad + (** [handle_receive_message ~batching_configuration received_message] handles a + [received_message] on the gossip network. The function returns either a + rejection reason for the message or a set of peers to which: + - the (full) message will be directly forwarded if [batching_configuration] + is [Sequentially]. + - the message might be forwarded after validation of the batch it will be + included in. *) + val handle_receive_message : + batching_configuration:message_handling -> + receive_message -> + [`Receive_message] monad (** [publish { topic; message_id; message }] allows to publish a message on the gossip network from the local node. The function returns a set of peers @@ -834,6 +871,12 @@ topic. *) val leave : leave -> [`Leave] monad + (** [apply_batch batch] handles a complete batch and outputs an element of the + state monad which encapsulates the list of [[`Receive_message] output] + results for each entry of the batch. *) + val apply_batch : + (receive_message * Peer.Set.t) list -> [`Treated_batch] monad + (** [set_application_score {peer; score}] handles setting the application score of [peer]. If the peer is not known, this does nothing. *) val set_application_score : @@ -1183,6 +1226,7 @@ module type WORKER = sig | P2P_input of p2p_input | App_input of app_input | Check_unknown_messages + | Process_batch of (GS.receive_message * GS.Peer.Set.t) list (** [make ~events_logging ~initial_points rng limits parameters] initializes a new Gossipsub automaton with the given arguments. Then, it initializes @@ -1193,6 +1237,7 @@ val make : ?events_logging:(event -> unit Monad.t) -> ?initial_points:(unit -> Point.t list) -> + ?batching_interval:GS.span -> self:GS.Peer.t -> Random.State.t -> (GS.Topic.t, GS.Peer.t, GS.Message_id.t, GS.span) limits -> diff --git a/src/lib_gossipsub/gossipsub_worker.ml b/src/lib_gossipsub/gossipsub_worker.ml index dcb2b813c1c971e0b9a6fff3506f5fdb524dc7fc..363da1901f910580338de0012b94f5e384c8224b 100644 --- a/src/lib_gossipsub/gossipsub_worker.ml +++ b/src/lib_gossipsub/gossipsub_worker.ml @@ -224,6 +224,7 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : | P2P_input of p2p_input | App_input of app_input | Check_unknown_messages + | Process_batch of (GS.receive_message * Peer.Set.t) list module Bounded_message_map = struct (* We maintain the invariant that: @@ -269,6 +270,10 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : Some (value, t) end + type batch_state = + | Pending + | Accumulating of (GS.receive_message * Peer.Set.t) list + (** The worker's state is made of the gossipsub automaton's state, and a stream of events to process. It also has two output streams to communicate with the application and P2P layers. *) @@ -285,6 +290,7 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : unknown_validity_messages : Bounded_message_map.t; unreachable_points : int64 Point.Map.t; (* For each point, stores the next heartbeat tick when we can try to recontact this point again. *) + message_handling : GS.message_handling; } (** A worker instance is made of its status and state. *) @@ -364,7 +370,8 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : Note that it's the responsability of the automaton modules to filter out peers based on various criteria (bad score, connection expired, ...).
*) - let handle_receive_message received_message = function + let handle_receive_message (received_message : GS.receive_message) : + worker_state * [`Receive_message] GS.output -> worker_state = function | state, GS.Route_message {to_route} -> Introspection.update_count_recv_valid_app_messages state.stats `Incr ; let ({sender = _; topic; message_id; message} : GS.receive_message) = @@ -376,7 +383,10 @@ let has_joined = View.(has_joined topic @@ view state.gossip_state) in if has_joined then emit_app_output state message_with_header ; state - | state, GS.Already_received | state, GS.Not_subscribed -> state + | state, GS.Already_received + | state, GS.Not_subscribed + | state, GS.To_include_in_batch _ -> + state | state, GS.Unknown_validity -> Introspection.update_count_recv_unknown_validity_app_messages state.stats @@ -741,15 +751,72 @@ GS.leave {topic} gossip_state |> update_gossip_state state |> handle_leave topic + let handle_batch (state, GS.Batch_result batch) = + List.fold_left + (fun state (msg, out) -> handle_receive_message msg (state, out)) + state + batch + + let apply_batch_event ({gossip_state; _} as state) batch = + GS.apply_batch batch gossip_state + |> update_gossip_state state |> handle_batch + + (* This function uses Lwt.async, making the gossipsub library no longer + independent of the concurrency model. + If independence from [Lwt] is considered worth recovering, a possible + implementation is: + https://gitlab.com/tezos/tezos/-/commit/a361dd827b43b601eb9de89e7187f9bcf7fc64d1 + That implementation was not retained because it has the + disadvantage of having a "ticking clock" and accumulating whatever arrives + between two ticks. It was considered nicer to have the accumulation + start at the reception of the first shard because, due to network latencies, + receptions will most probably not align well with the clock. + + Even with the current design, the shards for a commitment may end up + split between two batches. Having this happen more often + would probably not be an issue. *) + let batch_accumulator = + let current_batch = ref Pending in + fun output time_interval events_stream -> + match output with + | GS.To_include_in_batch content -> ( + match !current_batch with + | Pending -> + let open Lwt_syntax in + Lwt.async (fun () -> + let* () = Lwt_unix.sleep (GS.Span.to_float_s time_interval) in + let batch = + match !current_batch with + | Accumulating batch -> batch + | Pending -> [] + in + current_batch := Pending ; + Stream.push (Process_batch batch) events_stream ; + return_unit) ; + current_batch := Accumulating [content] + | Accumulating prev_contents -> + current_batch := Accumulating (content :: prev_contents)) + | _ -> () + (** Handling messages received from the P2P network.
*) let apply_p2p_message ~self ({gossip_state; _} as state) from_peer = function | Message_with_header {message; topic; message_id} -> - let receive_message = - {GS.sender = from_peer; topic; message_id; message} - in - (GS.handle_receive_message receive_message gossip_state - |> update_gossip_state state - |> handle_receive_message receive_message) + (let receive_message = + {GS.sender = from_peer; topic; message_id; message} + in + let batching_configuration = state.message_handling in + let new_state, output = + GS.handle_receive_message + ~batching_configuration + receive_message + gossip_state + in + (match batching_configuration with + | Sequentially -> () + | In_batches {time_interval} -> + batch_accumulator output time_interval state.events_stream) ; + update_gossip_state state (new_state, output) + |> handle_receive_message receive_message) [@profiler.span_f {verbosity = Notice} ["apply_event"; "P2P_input"; "In_message"; "Message_with_header"]] @@ -857,7 +924,10 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : match GS.Message_id.valid message.message_id with | `Valid | `Invalid -> let state = {state with unknown_validity_messages} in - GS.handle_receive_message message state.gossip_state + GS.handle_receive_message + ~batching_configuration:state.message_handling + message + state.gossip_state |> update_gossip_state state |> handle_receive_message message |> @@ -899,6 +969,7 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : state [@profiler.span_f {verbosity = Notice} ["apply_event"; "Check_unknown_messages"]] + | Process_batch batch -> apply_batch_event state batch (** A helper function that pushes events in the state *) let push e {status = _; state; self = _} = Stream.push e state.events_stream @@ -997,7 +1068,8 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : event_loop_promise let make ?(events_logging = fun _event -> Monad.return ()) - ?(initial_points = fun () -> []) ~self rng limits parameters = + ?(initial_points = fun () -> []) ?batching_interval ~self rng limits + parameters = { self; status = Starting; @@ -1014,6 +1086,10 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : events_logging; unknown_validity_messages = Bounded_message_map.make ~capacity:10_000; unreachable_points = Point.Map.empty; + message_handling = + (match batching_interval with + | None -> Sequentially + | Some time_interval -> In_batches {time_interval}); }; } diff --git a/src/lib_gossipsub/state_monad.ml b/src/lib_gossipsub/state_monad.ml index b35254fa5d03235fe60144cb2434b111319e469e..9caaf284d62b9cf9dc3b3b137b356fe9ad5fb5b6 100644 --- a/src/lib_gossipsub/state_monad.ml +++ b/src/lib_gossipsub/state_monad.ml @@ -43,10 +43,17 @@ module type S = sig ('pass -> ('state, 'fail) t) -> ('state, 'fail) t + val bind_check : + ('state, 'a, 'fail) check -> + ('a -> ('state, 'pass, 'fail) check) -> + ('state, 'pass, 'fail) check + val return_pass : 'pass -> ('state, 'pass, 'fail) check val return_fail : 'fail -> ('state, 'pass, 'fail) check + val map_fold : ('a -> ('state, 'b) t) -> 'a list -> ('state, 'b list) t + module Syntax : sig val ( let* ) : ('state, 'a) t -> ('a -> ('state, 'b) t) -> ('state, 'b) t @@ -57,6 +64,11 @@ module type S = sig val ( let*! 
) : ('state -> 'a) -> ('a -> ('state, 'b) t) -> ('state, 'b) t + val ( let** ) : + ('state, 'a, 'fail) check -> + ('a -> ('state, 'pass, 'fail) check) -> + ('state, 'pass, 'fail) check + val return : 'a -> ('state, 'a) t val pass : 'pass -> ('state, 'pass, 'fail) check @@ -85,6 +97,18 @@ module M : S = struct let return_fail x = `Fail x |> return + let bind_check c f = + bind c (function `Pass res -> f res | `Fail e -> return_fail e) + + let map_fold f l = + let rec bis acc state = function + | [] -> return (List.rev acc) state + | hd :: tl -> + let state, value = f hd state in + bis (value :: acc) state tl + in + fun state -> bis [] state l + module Syntax = struct let ( let* ) = bind @@ -92,6 +116,8 @@ module M : S = struct let ( let*! ) = get + let ( let** ) = bind_check + let return = return let pass = return_pass diff --git a/src/lib_gossipsub/state_monad.mli b/src/lib_gossipsub/state_monad.mli index 9466a6fa013c035b459178dee36cddd784a4d0eb..fc72d997b9b5d8870f9e009c9faf2641ffbafdf3 100644 --- a/src/lib_gossipsub/state_monad.mli +++ b/src/lib_gossipsub/state_monad.mli @@ -57,7 +57,13 @@ module type S = sig ('pass -> ('state, 'fail) t) -> ('state, 'fail) t - (** [return_pass v] is the default constuctor of [`Pass] values in the + (** [bind_check m f] is the bind function for the [('state, 'a, 'fail) t] monad. *) + val bind_check : + ('state, 'a, 'fail) check -> + ('a -> ('state, 'pass, 'fail) check) -> + ('state, 'pass, 'fail) check + + (** [return_pass v] is the default constructor of [`Pass] values in the [('state, 'pass, 'fail) check] monad. *) val return_pass : 'pass -> ('state, 'pass, 'fail) check @@ -65,6 +71,10 @@ module type S = sig [('state, 'pass, 'fail) check] monad. *) val return_fail : 'fail -> ('state, 'pass, 'fail) check + (** [map_fold f l] iteratively applies [f] to all elements of [l] updating + the state at each step. *) + val map_fold : ('a -> ('state, 'b) t) -> 'a list -> ('state, 'b list) t + (** Infix notations and/or shorter aliases for the functions above. *) module Syntax : sig (** Infix notation for {!bind}. *) @@ -79,6 +89,12 @@ module type S = sig (** Infix notation for {!get}. *) val ( let*! ) : ('state -> 'a) -> ('a -> ('state, 'b) t) -> ('state, 'b) t + (** Infix notation for {!bind_check}. *) + val ( let** ) : + ('state, 'a, 'fail) check -> + ('a -> ('state, 'pass, 'fail) check) -> + ('state, 'pass, 'fail) check + (** Re-exporting {!return} function in this sub-module. *) val return : 'a -> ('state, 'a) t diff --git a/src/lib_gossipsub/test/gossipsub_pbt_generators.ml b/src/lib_gossipsub/test/gossipsub_pbt_generators.ml index 6a6deebe2f18bd4ebadfb98267b706bda980e161..c025ee0f5e59f3368217d71241cdf693547d8a58 100644 --- a/src/lib_gossipsub/test/gossipsub_pbt_generators.ml +++ b/src/lib_gossipsub/test/gossipsub_pbt_generators.ml @@ -251,8 +251,12 @@ struct let set_application_score x = Set_application_score x |> i end - let dispatch : type a. a input -> GS.state -> GS.state * a GS.output = - fun i state -> + let dispatch : type a. 
+ a input -> + ?batching_configuration:GS.message_handling -> + GS.state -> + GS.state * a GS.output = + fun i ?(batching_configuration = Sequentially) state -> match i with | Add_peer m -> GS.add_peer m state | Remove_peer m -> GS.remove_peer m state @@ -260,7 +264,8 @@ struct | Iwant m -> GS.handle_iwant m state | Graft m -> GS.handle_graft m state | Prune m -> GS.handle_prune m state - | Receive_message m -> GS.handle_receive_message m state + | Receive_message m -> + GS.handle_receive_message ~batching_configuration m state | Publish_message m -> GS.publish_message m state | Heartbeat -> GS.heartbeat state | Join m -> GS.join m state @@ -465,7 +470,7 @@ struct (* Construction of the trace generator. *) (* Evaluate a [raw] on an initial state yields a trace. *) - let raw_to_trace state raw = + let raw_to_trace ?batching_configuration state raw = let open M in let seq = SeqM.M.unfold next raw in let* _, _, rev_trace = @@ -474,7 +479,7 @@ struct match event with | Input i -> Time.set time ; - let state', output = dispatch i state in + let state', output = dispatch ?batching_configuration i state in let step = Transition {time; input = i; state; state'; output} in @@ -501,10 +506,10 @@ struct let seq = SeqM.M.unfold Fragment.next raw in SeqM.fold_left (fun acc event -> f event acc) init seq - let run state fragment : trace t = + let run ?batching_configuration state fragment : trace t = let open M in let* raw = Fragment.raw_generator fragment in - Fragment.raw_to_trace state raw + Fragment.raw_to_trace ?batching_configuration state raw let check_fold (type e inv) (f : transition -> inv -> (inv, e) result) init trace : (unit, e * trace) result = diff --git a/src/lib_gossipsub/test/gossipsub_pbt_generators.mli b/src/lib_gossipsub/test/gossipsub_pbt_generators.mli index c0ff8fd20f8b137dd1ebf5378c679c5ee73e78e2..649c2cacb945c971fd48b7e43b8565ef61f91b8e 100644 --- a/src/lib_gossipsub/test/gossipsub_pbt_generators.mli +++ b/src/lib_gossipsub/test/gossipsub_pbt_generators.mli @@ -226,7 +226,11 @@ val fold : Fragment.t -> (event -> 'a -> 'a) -> 'a -> 'a t (** [run s0 f] constructs a {!trace} generator by running the gossipsub automaton on inputs generated from [f]. *) -val run : GS.state -> Fragment.t -> trace t +val run : + ?batching_configuration:GS.message_handling -> + GS.state -> + Fragment.t -> + trace t (** [check_fold step inv trace] folds the predicate [step] with initial invariant [inv] on [trace]. 
If the [step] fails with [Error e], [check_predicate] returns [Error (e, prefix)] diff --git a/src/lib_gossipsub/test/test_gossipsub.ml b/src/lib_gossipsub/test/test_gossipsub.ml index d6038b1fe3f7f65f3d5c20238632353104522976..67a993886cbf212c6aabce582e91881892fdefc4 100644 --- a/src/lib_gossipsub/test/test_gossipsub.ml +++ b/src/lib_gossipsub/test/test_gossipsub.ml @@ -50,7 +50,12 @@ let rng = let () = let open Default_limits in let default_limits = default_limits () in - Test_unit.register rng default_limits parameters ; + Test_unit.register + ~message_handlings: + [Sequentially; In_batches {time_interval = Milliseconds.of_int_ms 100}] + rng + default_limits + parameters ; Test_integration_worker.register rng default_limits parameters ; Test_pbt.register rng default_limits parameters ; Test_message_cache.register () diff --git a/src/lib_gossipsub/test/test_gossipsub_shared.ml b/src/lib_gossipsub/test/test_gossipsub_shared.ml index 9ba07142f11e2feed35a38279365c77127fe967f..f5fcfac2adb7b37c0befd933abb0b5c09bf6d9bb 100644 --- a/src/lib_gossipsub/test/test_gossipsub_shared.ml +++ b/src/lib_gossipsub/test/test_gossipsub_shared.ml @@ -167,6 +167,8 @@ module Automaton_config : include String_iterable let valid ?message ~message_id () = Validity_hook.apply message message_id + + let valid_batch = List.map (fun _ -> `Valid) end end end diff --git a/src/lib_gossipsub/test/test_unit.ml b/src/lib_gossipsub/test/test_unit.ml index fa6a945efef7f128f5c780045f5104b266b13809..23059b3f80e9b05beec082f4919d9b375d317a51 100644 --- a/src/lib_gossipsub/test/test_unit.ml +++ b/src/lib_gossipsub/test/test_unit.ml @@ -75,7 +75,10 @@ let assert_in_message_cache ~__LOC__ message_id ~peer ~expected_message state = view.message_cache with | None -> - Test.fail "Expected entry in message cache for message id %d" message_id + Test.fail + ~__LOC__ + "Expected entry in message cache for message id %d" + message_id | Some (_message_cache_state, message, _access) -> Check.( (message = expected_message) @@ -578,10 +581,18 @@ let test_fanout rng limits parameters = (** Tests that receiving a message for a subscribed topic: - Returns peers to publish to. - Inserts message into message cache. *) -let test_receiving_message rng limits parameters = +let test_receiving_message ~batching_configuration rng limits parameters = Tezt_core.Test.register ~__FILE__ - ~title:"Gossipsub: Test receiving message" + ~title: + ("Gossipsub: Test receiving message" + ^ + match batching_configuration with + | GS.Sequentially -> "" + | GS.In_batches {time_interval} -> + Format.sprintf + " with batches of %fs" + (GS.Span.to_float_s time_interval)) ~tags:["gossipsub"; "receiving_message"] @@ fun () -> let topic = "test" in @@ -602,14 +613,17 @@ let test_receiving_message rng limits parameters = let message_id = 0 in (* Receive a message for a joined topic. *) let state, output = - GS.handle_receive_message {sender; topic; message_id; message} state + GS.handle_receive_message + ~batching_configuration + {sender; topic; message_id; message} + state in let peers_to_route = match output with | Already_received | Not_subscribed | Invalid_message | Unknown_validity | Outdated -> Test.fail ~__LOC__ "Handling of received message should succeed." - | Route_message {to_route} -> to_route + | Route_message {to_route} | To_include_in_batch (_, to_route) -> to_route in (* Should return [degree_optimal] peers to route the message to. 
*) Check.( (Peer.Set.cardinal peers_to_route = limits.degree_optimal) int ~error_msg:"Expected %R, got %L" ~__LOC__) ; + (* In batch mode, the message is added to the cache only once the batch has been + treated. *) + let state = + match output with + | GS.Route_message _ | Already_received -> state + | To_include_in_batch out -> fst (GS.apply_batch [out] state) + | _ -> + Test.fail + ~__LOC__ + "Output should have been Route_message, Already_received or \ + To_include_in_batch" + in (* [message_id] should be added to the message cache. *) assert_in_message_cache ~__LOC__ @@ -647,10 +673,19 @@ It might be possible to simplify this setup (for instance, by having just one peer in the mesh?). *) -let test_message_duplicate_score_bug rng _limits parameters = +let test_message_duplicate_score_bug ~batching_configuration rng _limits + parameters = Tezt_core.Test.register ~__FILE__ - ~title:"Gossipsub: Test message duplicate score bug" + ~title: + ("Gossipsub: Test message duplicate score bug" + ^ + match batching_configuration with + | GS.Sequentially -> "" + | GS.In_batches {time_interval} -> + Format.sprintf + " with batches of %fs" + (GS.Span.to_float_s time_interval)) ~tags:["gossipsub"; "duplicate_score_bug"] @@ fun () -> (* Ignore decays, for simplicity; they are used when refreshing score during @@ -689,20 +724,44 @@ let test_message_duplicate_score_bug rng _limits parameters = let message = "some_data" in let state, _ = GS.handle_graft {peer = sender; topic} state in (* Receive a message for a joined topic. *) - let state, output = - GS.handle_receive_message {sender; topic; message_id = 0; message} state + let state, output1 = - GS.handle_receive_message + ~batching_configuration + {sender; topic; message_id = 0; message} + state in let () = - match output with + match output1 with | Already_received | Not_subscribed | Invalid_message | Unknown_validity | Outdated -> Test.fail ~__LOC__ "Handling of received message should succeed." - | Route_message _ -> () + | Route_message _ | To_include_in_batch _ -> () in (* Send one more message so that [sender]'s P2 score is greater than its P3 score. In this way its score is positive, and [sender] is not pruned. *) - let state, _ = - GS.handle_receive_message {sender; topic; message_id = 1; message} state + let state, output2 = + GS.handle_receive_message + ~batching_configuration + {sender; topic; message_id = 1; message} + state + in + (* If we are in batch mode, the batch containing both messages has to be + treated. *) + let state = + match batching_configuration with + | Sequentially -> state + | In_batches _ -> + fst + (GS.apply_batch + (List.map + (function + | GS.To_include_in_batch x -> x + | _ -> + Test.fail + ~__LOC__ + "In batch mode, received message should be in batch.") + [output1; output2]) + state) in (* Send the first message again.
*) let per_topic_score_limits = @@ -716,13 +775,16 @@ let test_message_duplicate_score_bug rng _limits parameters = let state, _ = GS.heartbeat state in assert_mesh_inclusion ~__LOC__ ~topic ~peer:sender ~is_included:true state ; let state, output = - GS.handle_receive_message {sender; topic; message_id = 0; message} state + GS.handle_receive_message + ~batching_configuration + {sender; topic; message_id = 0; message} + state in let () = match output with | Already_received -> () | Not_subscribed | Invalid_message | Unknown_validity | Outdated - | Route_message _ -> + | Route_message _ | To_include_in_batch _ -> Test.fail ~__LOC__ "Handling should fail with Already_received." in let expected_score = @@ -749,10 +811,19 @@ let test_message_duplicate_score_bug rng _limits parameters = (** Tests that we do not route the message when receiving a message for an unsubscribed topic. *) -let test_receiving_message_for_unsusbcribed_topic rng limits parameters = +let test_receiving_message_for_unsusbcribed_topic ~batching_configuration rng + limits parameters = Tezt_core.Test.register ~__FILE__ - ~title:"Gossipsub: Test receiving message for unsubscribed topic" + ~title: + ("Gossipsub: Test receiving message for unsubscribed topic" + ^ + match batching_configuration with + | GS.Sequentially -> "" + | GS.In_batches {time_interval} -> + Format.sprintf + " with batches of %fs" + (GS.Span.to_float_s time_interval)) ~tags:["gossipsub"; "receive_message"; "fanout"] @@ fun () -> let topic = "topic" in @@ -775,11 +846,14 @@ let test_receiving_message_for_unsusbcribed_topic rng limits parameters = let message = "some data" in let message_id = 0 in let _state, output = - GS.handle_receive_message {sender; topic; message_id; message} state + GS.handle_receive_message + ~batching_configuration + {sender; topic; message_id; message} + state in match output with | Already_received | Route_message _ | Invalid_message | Unknown_validity - | Outdated -> + | Outdated | To_include_in_batch _ -> Test.fail ~__LOC__ "Handling of received message should fail with [Not_subscribed]." @@ -2119,10 +2193,18 @@ let test_scoring_p1 rng _limits parameters = (** Test for P2 (First Message Deliveries). 
Ported from: https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L3247 *) -let test_scoring_p2 rng _limits parameters = +let test_scoring_p2 ~batching_configuration rng _limits parameters = Tezt_core.Test.register ~__FILE__ - ~title:"Gossipsub: Scoring P2" + ~title: + ("Gossipsub: Scoring P2" + ^ + match batching_configuration with + | GS.Sequentially -> "" + | GS.In_batches {time_interval} -> + Format.sprintf + " with batches of %fs" + (GS.Span.to_float_s time_interval)) ~tags:["gossipsub"; "scoring"; "p2"] @@ fun () -> let first_message_deliveries_weight = 2.0 in @@ -2153,15 +2235,18 @@ let test_scoring_p2 rng _limits parameters = let receive_message ~__LOC__ peer message_id message state = let state, output = GS.handle_receive_message + ~batching_configuration {sender = peer; topic; message_id; message} state in match output with | GS.Route_message _ | Already_received -> state + | To_include_in_batch out -> fst (GS.apply_batch [out] state) | _ -> Test.fail ~__LOC__ - "Output should have been Route_message or Already_received" + "Output should have been Route_message, Already_received or \ + To_include_in_batch" in (* peer 0 delivers message first *) let message_id, message = gen_message () in @@ -2230,10 +2315,18 @@ let test_scoring_p2 rng _limits parameters = Ported from: https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L3895 and https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L3979 *) -let test_scoring_p4 rng _limits parameters = +let test_scoring_p4 ~batching_configuration rng _limits parameters = Tezt_core.Test.register ~__FILE__ - ~title:"Gossipsub: Scoring P4" + ~title: + ("Gossipsub: Scoring P4" + ^ + match batching_configuration with + | GS.Sequentially -> "" + | GS.In_batches {time_interval} -> + Format.sprintf + " with batches of %fs" + (GS.Span.to_float_s time_interval)) ~tags:["gossipsub"; "scoring"; "p4"] @@ fun () -> let invalid_message_deliveries_weight = -2.0 in @@ -2276,6 +2369,7 @@ let test_scoring_p4 rng _limits parameters = let message_id, message = gen_message () in let state, _ = GS.handle_receive_message + ~batching_configuration {sender = peer; topic; message_id; message} state in @@ -2430,15 +2524,12 @@ let test_scoring_p7_grafts_before_backoff rng _limits parameters = (* TODO: https://gitlab.com/tezos/tezos/-/issues/5293 Add test for the described test scenario *) -let register rng limits parameters = +let register ~message_handlings rng limits parameters = test_ignore_graft_from_unknown_topic rng limits parameters ; test_handle_received_subscriptions rng limits parameters ; test_join_adds_peers_to_mesh rng limits parameters ; test_join_adds_fanout_to_mesh rng limits parameters ; test_publish_without_flood_publishing rng limits parameters ; - test_receiving_message_for_unsusbcribed_topic rng limits parameters ; - test_receiving_message rng limits parameters ; - test_message_duplicate_score_bug rng limits parameters ; test_fanout rng limits parameters ; test_handle_graft_for_joined_topic rng limits parameters ; test_handle_graft_for_not_joined_topic rng limits parameters ; @@ -2465,7 +2556,21 @@ let register rng limits parameters = test_ignore_too_many_messages_in_ihave rng limits parameters ; test_heartbeat_scenario rng limits parameters ; test_scoring_p1 rng limits parameters ; - test_scoring_p2 rng limits parameters ; - 
test_scoring_p4 rng limits parameters ; test_scoring_p5 rng limits parameters ; - test_scoring_p7_grafts_before_backoff rng limits parameters + test_scoring_p7_grafts_before_backoff rng limits parameters ; + List.iter + (fun batching_configuration -> + test_receiving_message ~batching_configuration rng limits parameters ; + test_message_duplicate_score_bug + ~batching_configuration + rng + limits + parameters ; + test_receiving_message_for_unsusbcribed_topic + ~batching_configuration + rng + limits + parameters ; + test_scoring_p2 ~batching_configuration rng limits parameters ; + test_scoring_p4 ~batching_configuration rng limits parameters) + message_handlings
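For reference, the halving strategy that [treat_batch] applies when a batched verification fails (see message_validation.ml above) can be summarized independently of the surrounding state threading. The sketch below is illustrative only and is not part of the patch: [verify] is a hypothetical stand-in for the batched cryptographic check ([Cryptobox.verify_shard_multi] in the actual code), and [find_invalid] is a hypothetical helper name.

(* Illustrative sketch, not the patch's implementation: isolate the invalid
   elements of a batch given only a boolean batched-verification oracle, by
   recursively re-verifying the two halves of any failing batch. A few invalid
   shards therefore cannot invalidate the legitimate ones; the extra
   verification cost grows with the number of invalid shards and the logarithm
   of the batch size. *)
let rec find_invalid ~verify = function
  | [] -> []
  | batch when verify batch -> [] (* the whole (sub-)batch is valid *)
  | [x] -> [x] (* a failing singleton is necessarily invalid *)
  | batch ->
      let half = List.length batch / 2 in
      let first = List.filteri (fun i _ -> i < half) batch in
      let second = List.filteri (fun i _ -> i >= half) batch in
      find_invalid ~verify first @ find_invalid ~verify second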