diff --git a/src/lib_agnostic_baker/commands.ml b/src/lib_agnostic_baker/commands.ml index 08096dbf989b5fa81831dd5405e1e3fe06432e96..3e87a264b7fe734748a164a13fbde6b1f7658203 100644 --- a/src/lib_agnostic_baker/commands.ml +++ b/src/lib_agnostic_baker/commands.ml @@ -124,8 +124,10 @@ module Dal = struct let ignore_topics = arg_list_to_clic ignore_topics_arg + let batching_configuration = arg_to_clic batching_configuration_arg + let args = - Tezos_clic.args24 + Tezos_clic.args25 data_dir config_file rpc_addr @@ -150,6 +152,7 @@ module Dal = struct ignore_l1_config_peers disable_amplification ignore_topics + batching_configuration let commands = let open Tezos_clic in @@ -184,7 +187,8 @@ module Dal = struct verbose, ignore_l1_config_peers, disable_amplification, - ignore_topics ) + ignore_topics, + batching_configuration ) _cctxt -> let attester_profile = Option.value ~default:[] attester_profile in @@ -218,6 +222,7 @@ module Dal = struct ignore_l1_config_peers disable_amplification ignore_topics + batching_configuration in match options with | Ok options -> Cli.run cmd options diff --git a/src/lib_crypto_dal/cryptobox.ml b/src/lib_crypto_dal/cryptobox.ml index e3dd8140adcb122138894e30380afe53502c96f6..ae8267f986f64103c1199b70b424141109fb7e09 100644 --- a/src/lib_crypto_dal/cryptobox.ml +++ b/src/lib_crypto_dal/cryptobox.ml @@ -148,7 +148,9 @@ module Inner = struct type page = bytes - type share = Scalar.t array + module Share = Compare.Array (Scalar) + + type share = Share.t type shard = {index : int; share : share} diff --git a/src/lib_crypto_dal/cryptobox.mli b/src/lib_crypto_dal/cryptobox.mli index df89a5429615a92a710372e689394de1f0a6d8f1..471f975ae2ed52fd79c1a3094f95787c30e62619 100644 --- a/src/lib_crypto_dal/cryptobox.mli +++ b/src/lib_crypto_dal/cryptobox.mli @@ -199,14 +199,18 @@ val string_of_commit_error : string (** A portion of the data represented by a polynomial. 
*) -type share +module Share : Compare.COMPARABLE + +type share = Share.t (** A shard is share with its index (see {!val:shards_from_polynomial}). *) type shard = {index : int; share : share} (** A proof that a shard belongs to some commitment. *) -type shard_proof +module Proof : Compare.COMPARABLE + +type shard_proof = Proof.t module Verifier : VERIFIER diff --git a/src/lib_dal_node/block_handler.ml b/src/lib_dal_node/block_handler.ml index e623a88f7d00e65ae5d069342caf3c0aca4806b6..f5e7fe525d37e4a2f44587e16f934299093de474 100644 --- a/src/lib_dal_node/block_handler.ml +++ b/src/lib_dal_node/block_handler.ml @@ -579,11 +579,17 @@ let new_finalized_head ctxt cctxt l1_crawler cryptobox finalized_block_hash in return_unit in + Gossipsub.Worker.Validate_message_hook.set_batch + (Message_validation.gossipsub_batch_validation + ctxt + cryptobox + ~head_level:level + proto_parameters) ; Gossipsub.Worker.Validate_message_hook.set (Message_validation.gossipsub_app_messages_validation ctxt cryptobox - level + ~head_level:level proto_parameters) ; let*! 
() = remove_old_level_stored_data proto_parameters ctxt level in let* () = diff --git a/src/lib_dal_node/cli.ml b/src/lib_dal_node/cli.ml index fbb4db92f842868ecd71ead87f3540ab89dfe9a0..a74ac644282930895cfbb2b6ec3a1ec0fe543edf 100644 --- a/src/lib_dal_node/cli.ml +++ b/src/lib_dal_node/cli.ml @@ -573,6 +573,30 @@ module Term = struct let ignore_topics = arg_list_to_cmdliner ignore_topics_arg + let batching_configuration_arg = + let open Configuration_file in + let pp fmt = function + | Disabled -> Format.fprintf fmt "disabled" + | Enabled {time_interval} -> Format.fprintf fmt "%d" time_interval + in + let parse = function + | "disabled" -> Ok Disabled + | s -> ( + try Ok (Enabled {time_interval = int_of_string s}) + with Failure _ -> Error "Unrecognized int") + in + make_arg + ~parse + ~doc: + "The time (in milliseconds) for which shards are accumulated by the \ + gossipsub automaton before triggering the validation of the batch. \ + \"disabled\" to deactivate the batching of shards." + ~placeholder:"INT|\"disabled\"" + ~pp + "batching-time-interval" + + let batching_configuration = arg_to_cmdliner batching_configuration_arg + let term process = Cmdliner.Term.( ret @@ -582,7 +606,7 @@ module Term = struct $ operator_profile $ observer_profile $ bootstrap_profile $ peers $ history_mode $ service_name $ service_namespace $ fetch_trusted_setup $ disable_shard_validation $ verbose $ ignore_l1_config_peers - $ disable_amplification $ ignore_topics)) + $ disable_amplification $ ignore_topics $ batching_configuration)) end type t = Run | Config_init | Config_update | Debug_print_store_schemas @@ -748,13 +772,15 @@ type options = { ignore_l1_config_peers : bool; disable_amplification : bool; ignore_topics : Signature.public_key_hash list; + batching_configuration : Configuration_file.batching_configuration option; } let cli_options_to_options data_dir config_file rpc_addr expected_pow listen_addr public_addr endpoint slots_backup_uris trust_slots_backup_uris metrics_addr 
attesters operators observers bootstrap_flag peers history_mode service_name service_namespace fetch_trusted_setup disable_shard_validation - verbose ignore_l1_config_peers disable_amplification ignore_topics = + verbose ignore_l1_config_peers disable_amplification ignore_topics + batching_configuration = let open Result_syntax in let profile = Controller_profiles.make ~attesters ~operators ?observers () in let* profile = @@ -800,6 +826,7 @@ let cli_options_to_options data_dir config_file rpc_addr expected_pow ignore_l1_config_peers; disable_amplification; ignore_topics; + batching_configuration; } let merge_experimental_features _ _configuration = () @@ -828,6 +855,7 @@ let merge ignore_l1_config_peers; disable_amplification; ignore_topics = _; + batching_configuration; } configuration = let profile = match profile with @@ -876,6 +904,10 @@ let merge configuration.ignore_l1_config_peers || ignore_l1_config_peers; disable_amplification = configuration.disable_amplification || disable_amplification; + batching_configuration = + Option.value + ~default:configuration.batching_configuration + batching_configuration; } let wrap_with_error main_promise = @@ -977,7 +1009,7 @@ let commands = metrics_addr attesters operators observers bootstrap_flag peers history_mode service_name service_namespace fetch_trusted_setup disable_shard_validation verbose ignore_l1_config_peers - disable_amplification ignore_pkhs = + disable_amplification ignore_pkhs batching_configuration = match cli_options_to_options data_dir @@ -1004,6 +1036,7 @@ let commands = ignore_l1_config_peers disable_amplification ignore_pkhs + batching_configuration with | Ok options -> main_run subcommand options | Error msg -> `Error msg diff --git a/src/lib_dal_node/cli.mli b/src/lib_dal_node/cli.mli index 809e481855417a218ff92325a52c2c2b530e4ffc..c1e746cfa71d7fcfe5fd30e71debce4ada63dd32 100644 --- a/src/lib_dal_node/cli.mli +++ b/src/lib_dal_node/cli.mli @@ -101,6 +101,8 @@ module Term : sig val verbose_switch : 
switch val ignore_topics_arg : Signature.public_key_hash arg_list + + val batching_configuration_arg : Configuration_file.batching_configuration arg end (** {2 Command-line options} *) @@ -154,6 +156,9 @@ type options = { (** Disable amplification. Default value is false. *) ignore_topics : Signature.public_key_hash list; (** Do not distribute shards of these pkhs. *) + batching_configuration : Configuration_file.batching_configuration option; + (** The configuration used for batching verification of received shards + via GossipSub to save cryptographic computation. *) } (** Subcommands that can be used by the DAL node. In the future this type @@ -186,6 +191,7 @@ val cli_options_to_options : bool -> bool -> Signature.public_key_hash list -> + Configuration_file.batching_configuration option -> (options, bool * string) result val run : t -> options -> unit tzresult Lwt.t diff --git a/src/lib_dal_node/configuration_file.ml b/src/lib_dal_node/configuration_file.ml index 03ed6c5dd56a6a9cbbb8d58322b318a1e416ba6a..23bade49cb5e099b2c664b2c079a5139d1612290 100644 --- a/src/lib_dal_node/configuration_file.ml +++ b/src/lib_dal_node/configuration_file.ml @@ -33,6 +33,8 @@ type neighbor = {addr : string; port : int} type history_mode = Rolling of {blocks : [`Auto | `Some of int]} | Full +type batching_configuration = Disabled | Enabled of {time_interval : int} + type experimental_features = unit let history_mode_encoding = @@ -64,6 +66,27 @@ let history_mode_encoding = (fun () -> Full); ] +let batching_configuration_encoding = + let open Data_encoding in + union + [ + case + ~title:"disabled" + ~description:"" + (Tag 0) + unit + (function Disabled -> Some () | Enabled _ -> None) + (fun () -> Disabled); + case + ~title:"enabled" + ~description:"" + (Tag 1) + int31 + (function + | Disabled -> None | Enabled {time_interval} -> Some time_interval) + (fun time_interval -> Enabled {time_interval}); + ] + type t = { data_dir : string; rpc_addr : P2p_point.Id.t; @@ -85,6 +108,7 @@ 
type t = { verbose : bool; ignore_l1_config_peers : bool; disable_amplification : bool; + batching_configuration : batching_configuration; } let default_data_dir = Filename.concat (Sys.getenv "HOME") ".tezos-dal-node" @@ -127,6 +151,19 @@ let default_experimental_features = () let default_fetch_trusted_setup = true +(* By default, when a shard is received, we wait for 0.1 seconds for other + shards of the same commitment before launching the cryptographic validation + of the shards. + Since these shards are supposed to be received several levels in advance, + the risk that this 0.1 second delay makes the validation happen too late + is very low. + It also slows down gossiping a bit, since messages are advertised only after + validation, so if a message has to go through several nodes before reaching + its final destination, the waiting delay accumulates and may be of a few + seconds. It looks fine with 8 blocks of attestation lag and 6 seconds block + time but if those values are reduced a lot, this might become an issue.*) +let default_batching_configuration = Enabled {time_interval = 100} + let default = { data_dir = default_data_dir; @@ -149,6 +186,7 @@ let default = verbose = false; ignore_l1_config_peers = false; disable_amplification = false; + batching_configuration = default_batching_configuration; } let uri_encoding : Uri.t Data_encoding.t = @@ -192,47 +230,50 @@ let encoding : t Data_encoding.t = verbose; ignore_l1_config_peers; disable_amplification; + batching_configuration; } -> - ( ( data_dir, - rpc_addr, - listen_addr, - public_addr, - peers, - expected_pow, - endpoint, - slots_backup_uris, - trust_slots_backup_uris, - metrics_addr ), - ( history_mode, - profile, - version, - service_name, - service_namespace, - experimental_features, - fetch_trusted_setup, - verbose, - ignore_l1_config_peers, - disable_amplification ) )) - (fun ( ( data_dir, - rpc_addr, - listen_addr, - public_addr, - peers, - expected_pow, - endpoint, - slots_backup_uris, - 
trust_slots_backup_uris, - metrics_addr ), - ( history_mode, - profile, - version, - service_name, - service_namespace, - experimental_features, - fetch_trusted_setup, - verbose, - ignore_l1_config_peers, - disable_amplification ) ) -> + ( ( ( data_dir, + rpc_addr, + listen_addr, + public_addr, + peers, + expected_pow, + endpoint, + slots_backup_uris, + trust_slots_backup_uris, + metrics_addr ), + ( history_mode, + profile, + version, + service_name, + service_namespace, + experimental_features, + fetch_trusted_setup, + verbose, + ignore_l1_config_peers, + disable_amplification ) ), + batching_configuration )) + (fun ( ( ( data_dir, + rpc_addr, + listen_addr, + public_addr, + peers, + expected_pow, + endpoint, + slots_backup_uris, + trust_slots_backup_uris, + metrics_addr ), + ( history_mode, + profile, + version, + service_name, + service_namespace, + experimental_features, + fetch_trusted_setup, + verbose, + ignore_l1_config_peers, + disable_amplification ) ), + batching_configuration ) -> { data_dir; rpc_addr; @@ -254,109 +295,118 @@ let encoding : t Data_encoding.t = verbose; ignore_l1_config_peers; disable_amplification; + batching_configuration; }) (merge_objs - (obj10 - (dft - "data-dir" - ~description:"Location of the data dir" - string - default_data_dir) - (dft - "rpc-addr" - ~description:"RPC address" - P2p_point.Id.encoding - default_rpc_addr) - (dft - "net-addr" - ~description:"P2P address of this node" - P2p_point.Id.encoding - default_listen_addr) - (dft - "public-addr" - ~description:"P2P address of this node" - P2p_point.Id.encoding - default_listen_addr) - (dft - "peers" - ~description:"P2P addresses of remote peers" - (list string) - default_peers) - (dft - "expected-pow" - ~description:"Expected P2P identity's PoW" - float - default_expected_pow) - (dft - "endpoint" - ~description:"The Tezos node endpoint" - uri_encoding - default_endpoint) - (dft - "slots_backup_uris" - ~description:"Optional HTTP endpoints to fetch missing slots from." 
- (list uri_encoding) - []) - (dft - "trust_slots_backup_uris" - ~description: - "Whether to trust the data downlaoded from the provided HTTP \ - backup URIs." - bool - false) - (dft - "metrics-addr" - ~description:"The point for the DAL node metrics server" - (Encoding.option P2p_point.Id.encoding) - None)) - (obj10 - (dft - "history_mode" - ~description:"The history mode for the DAL node" - history_mode_encoding - default_history_mode) - (dft - "profiles" - ~description:"The Octez DAL node profiles" - Profile_manager.unresolved_encoding - Profile_manager.Empty) - (req "version" ~description:"The configuration file version" int31) - (dft - "service_name" - ~description:"Name of the service" - Data_encoding.string - default.service_name) - (dft - "service_namespace" - ~description:"Namespace for the service" - Data_encoding.string - default.service_namespace) - (dft - "experimental_features" - ~description:"Experimental features" - experimental_features_encoding - default_experimental_features) - (dft - "fetch_trusted_setup" - ~description:"Install trusted setup" - bool - true) - (dft - "verbose" - ~description: - "Whether to emit details about frequent logging events" - bool - default.verbose) - (dft - "ignore_l1_config_peers" - ~description:"Ignore the boot(strap) peers provided by L1" - bool - default.ignore_l1_config_peers) + (merge_objs + (obj10 + (dft + "data-dir" + ~description:"Location of the data dir" + string + default_data_dir) + (dft + "rpc-addr" + ~description:"RPC address" + P2p_point.Id.encoding + default_rpc_addr) + (dft + "net-addr" + ~description:"P2P listening address of this node" + P2p_point.Id.encoding + default_listen_addr) + (dft + "public-addr" + ~description:"P2P public address of this node" + P2p_point.Id.encoding + default_public_addr) + (dft + "peers" + ~description:"P2P addresses of remote peers" + (list string) + default_peers) + (dft + "expected-pow" + ~description:"Expected P2P identity's PoW" + float + default_expected_pow) + (dft 
+ "endpoint" + ~description:"The Tezos node endpoint" + uri_encoding + default_endpoint) + (dft + "slots_backup_uris" + ~description: + "Optional HTTP endpoints to fetch missing slots from." + (list uri_encoding) + []) + (dft + "trust_slots_backup_uris" + ~description: + "Whether to trust the data downlaoded from the provided HTTP \ + backup URIs." + bool + false) + (dft + "metrics-addr" + ~description:"The point for the DAL node metrics server" + (Encoding.option P2p_point.Id.encoding) + None)) + (obj10 + (dft + "history_mode" + ~description:"The history mode for the DAL node" + history_mode_encoding + default_history_mode) + (dft + "profiles" + ~description:"The Octez DAL node profiles" + Profile_manager.unresolved_encoding + Profile_manager.Empty) + (req "version" ~description:"The configuration file version" int31) + (dft + "service_name" + ~description:"Name of the service" + Data_encoding.string + default.service_name) + (dft + "service_namespace" + ~description:"Namespace for the service" + Data_encoding.string + default.service_namespace) + (dft + "experimental_features" + ~description:"Experimental features" + experimental_features_encoding + default_experimental_features) + (dft + "fetch_trusted_setup" + ~description:"Install trusted setup" + bool + true) + (dft + "verbose" + ~description: + "Whether to emit details about frequent logging events" + bool + default.verbose) + (dft + "ignore_l1_config_peers" + ~description:"Ignore the boot(strap) peers provided by L1" + bool + default.ignore_l1_config_peers) + (dft + "disable_amplification" + ~description:"Disable amplification" + bool + default.disable_amplification))) + (obj1 (dft - "disable_amplification" - ~description:"Disable amplification" - bool - default.disable_amplification))) + "batching_configuration" + ~description:"Set the batching delay for shard verification" + batching_configuration_encoding + default.batching_configuration))) type error += DAL_node_unable_to_write_configuration_file of 
string diff --git a/src/lib_dal_node/configuration_file.mli b/src/lib_dal_node/configuration_file.mli index 920aa5f6bca2d770e6184af00a9a8f5f3959e7fc..92aa24545ddaec4015a0cabd7a28d07d916141ec 100644 --- a/src/lib_dal_node/configuration_file.mli +++ b/src/lib_dal_node/configuration_file.mli @@ -34,6 +34,15 @@ type history_mode = profile. *) | Full (** [Full] keeps the shards forever *) +(** The configuration of the validation of shards in batch mode.*) +type batching_configuration = + | Disabled + (** [Disabled] enforces the validation of shards one by one at reception time. *) + | Enabled of {time_interval : int} + (** [Enabled {time_interval}] accumulates messages received during + [time_interval] milliseconds and verifies all shards related to the + same commitment in the same batch in one pass. *) + (** Configuration settings for experimental features, with no backward compatibility guarantees. *) type experimental_features = unit @@ -73,6 +82,9 @@ type t = { ignore_l1_config_peers : bool; (** Ignore the boot(strap) peers provided by L1. *) disable_amplification : bool; (** Disable amplification. *) + batching_configuration : batching_configuration; + (** The configuration of the batching of the shards. + The default is [Enabled{time_interval=100}]. *) } (** [default] is the default configuration. *) diff --git a/src/lib_dal_node/daemon.ml b/src/lib_dal_node/daemon.ml index fcd19d0740e149c79f226c61d79c8fdcd953024f..efc84d6da3af928743937500cadadb2aa879c0c8 100644 --- a/src/lib_dal_node/daemon.ml +++ b/src/lib_dal_node/daemon.ml @@ -434,10 +434,18 @@ let run ?(disable_shard_validation = false) ~ignore_pkhs ~data_dir ~config_file {peer_id = identity.peer_id; maybe_reachable_point = public_addr} in let gs_worker = + let batching_interval = + match config.batching_configuration with + | Disabled -> None + | Enabled {time_interval} -> + let time_in_second = float_of_int time_interval /. 1000. 
in + Some (Types.Span.of_float_s time_in_second) + in Gossipsub.Worker.( make ~initial_points:get_initial_points ~events_logging:(Logging.event ~verbose:config.verbose) + ?batching_interval ~self rng limits @@ -528,11 +536,24 @@ let run ?(disable_shard_validation = false) ~ignore_pkhs ~data_dir ~config_file Node_context.warn_if_attesters_not_delegates ctxt profile | _ -> return_unit in + let () = + match config.batching_configuration with + | Enabled _ -> + Gossipsub.Worker.Validate_message_hook.set_batch + (Message_validation.gossipsub_batch_validation + ctxt + cryptobox + ~head_level + proto_parameters) + | Disabled -> () + in + (* Even if the batch validation is activated, one has to register a per message + validation for the validation of message id. *) Gossipsub.Worker.Validate_message_hook.set (Message_validation.gossipsub_app_messages_validation ctxt cryptobox - head_level + ~head_level proto_parameters) ; let is_prover_profile = Profile_manager.is_prover_profile profile_ctxt in (* Initialize amplificator if in prover profile. 
diff --git a/src/lib_dal_node/event.ml b/src/lib_dal_node/event.ml index 8729259fb86f4d32c8e52799db97bcaa8225ccc4..d3ec8d508a80811401b08c47569285ae5c323760 100644 --- a/src/lib_dal_node/event.ml +++ b/src/lib_dal_node/event.ml @@ -519,6 +519,21 @@ open struct ("message_id", Types.Message_id.encoding) ("validation_error", Data_encoding.string) + let batch_validation_error = + declare_3 + ~section + ~prefix_name_with_section:true + ~name:"batch_validation_failed" + ~msg: + "validating batch of messages for level {level} and slot {slot_index} \ + failed with error {validation_error}" + ~level:Warning + ~pp1:(fun fmt -> Format.fprintf fmt "%ld") + ~pp2:(fun fmt -> Format.fprintf fmt "%d") + ("level", Data_encoding.int32) + ("slot_index", Data_encoding.int31) + ("validation_error", Data_encoding.string) + let p2p_server_is_ready = declare_1 ~section @@ -1314,6 +1329,12 @@ let emit_dont_wait__message_validation_error ~message_id ~validation_error = message_validation_error (message_id, validation_error) +let emit_dont_wait__batch_validation_error ~level ~slot_index ~validation_error + = + emit__dont_wait__use_with_care + batch_validation_error + (level, slot_index, validation_error) + let emit_p2p_server_is_ready ~point = emit p2p_server_is_ready point let emit_rpc_server_is_ready ~point = emit rpc_server_is_ready point diff --git a/src/lib_dal_node/gossipsub/gossipsub.mli b/src/lib_dal_node/gossipsub/gossipsub.mli index 8f2fd94aedf4dbc94aa995780754e5f1dd6e1247..5fd3f84b9671432ff270b4fa2a4f36031e3f8e50 100644 --- a/src/lib_dal_node/gossipsub/gossipsub.mli +++ b/src/lib_dal_node/gossipsub/gossipsub.mli @@ -69,6 +69,16 @@ module Worker : sig unit -> [`Valid | `Unknown | `Outdated | `Invalid]) -> unit + + val set_batch : + ((Types.Peer.t + * Types.Topic.t + * Types.Message_id.t + * Types.Message.t + * Types.Peer.Set.t) + list -> + [`Invalid | `Outdated | `Unknown | `Valid] list) -> + unit end end diff --git a/src/lib_dal_node/gossipsub/gs_interface.ml 
b/src/lib_dal_node/gossipsub/gs_interface.ml index c6ce6d3a18eeecd0f9874fd4e397def53a047340..2a5331b470dbb5795fe454fadfa3f4ba5a8302ee 100644 --- a/src/lib_dal_node/gossipsub/gs_interface.ml +++ b/src/lib_dal_node/gossipsub/gs_interface.ml @@ -35,6 +35,13 @@ module Validate_message_hook = struct ~msg:"The message validation function is not set" ~level:Warning + let warn_batch_validation_function_not_set = + Internal_event.Simple.declare_0 + ~section:["dal"; "gs_interface"] + ~name:"batch_validation_function_not_set" + ~msg:"The batch validation function is not set" + ~level:Warning + (* FIXME: https://gitlab.com/tezos/tezos/-/issues/5674 Refactor gossipsub integration to avoid this mutable hook in the lib. *) let check_message = @@ -45,12 +52,26 @@ module Validate_message_hook = struct ()) ; `Unknown) + (* FIXME: https://gitlab.com/tezos/tezos/-/issues/5674 + Refactor gossipsub integration to avoid this mutable hook in the lib. *) + let check_message_batch = + ref (fun batch -> + Internal_event.Simple.( + emit__dont_wait__use_with_care + (warn_batch_validation_function_not_set ()) + ()) ; + List.map (fun _ -> `Unknown) batch) + let set func = check_message := func + + let set_batch func = check_message_batch := func end let message_valid ?message ~message_id () = !Validate_message_hook.check_message ?message ~message_id () +let message_valid_batch batch = !Validate_message_hook.check_message_batch batch + module Automaton_config : AUTOMATON_CONFIG with type Time.t = Types.Time.t @@ -76,6 +97,8 @@ module Automaton_config : include Types.Message let valid = message_valid + + let valid_batch = message_valid_batch end end end diff --git a/src/lib_dal_node/gossipsub/gs_interface.mli b/src/lib_dal_node/gossipsub/gs_interface.mli index 7a66f419f2e67964ed8cb6938f8b090f33254494..82d45dd32209ccc562931409291a9ffe05eab9fd 100644 --- a/src/lib_dal_node/gossipsub/gs_interface.mli +++ b/src/lib_dal_node/gossipsub/gs_interface.mli @@ -56,4 +56,14 @@ module Validate_message_hook : 
sig unit -> [`Valid | `Unknown | `Outdated | `Invalid]) -> unit + + val set_batch : + ((Types.Peer.t + * Types.Topic.t + * Types.Message_id.t + * Types.Message.t + * Types.Peer.Set.t) + list -> + [`Invalid | `Outdated | `Unknown | `Valid] list) -> + unit end diff --git a/src/lib_dal_node/gossipsub/gs_logging.ml b/src/lib_dal_node/gossipsub/gs_logging.ml index 27fceb2d315943feed04464d958b53276ce239fe..0c533e0f2b0e6b531021156fd4158d74b9d5dada 100644 --- a/src/lib_dal_node/gossipsub/gs_logging.ml +++ b/src/lib_dal_node/gossipsub/gs_logging.ml @@ -43,6 +43,15 @@ module Events = struct ~level:Debug () + let batch_treatment = + declare_0 + ~section + ~prefix_name_with_section:true + ~name:"batch_treatment" + ~msg:"Validation of a message batch" + ~level:Debug + () + let check_unknown_messages = declare_0 ~section @@ -254,3 +263,4 @@ let event ~verbose = | IHave {topic; message_ids} -> emit ihave (from_peer.peer_id, topic, message_ids) | IWant {message_ids} -> emit iwant (from_peer.peer_id, message_ids))) + | Process_batch _ -> emit batch_treatment () diff --git a/src/lib_dal_node/message_validation.ml b/src/lib_dal_node/message_validation.ml index 5854cabbdc4345300cec248ae5b3b2a2cea4321f..e76ca487a8b21c1e02193175d378edaeb50aaad3 100644 --- a/src/lib_dal_node/message_validation.ml +++ b/src/lib_dal_node/message_validation.ml @@ -23,6 +23,18 @@ (* *) (*****************************************************************************) +let string_of_validation_error = function + | `Invalid_degree_strictly_less_than_expected Cryptobox.{given; expected} -> + Format.sprintf + "Invalid_degree_strictly_less_than_expected. 
Given: %d, expected: %d" + given + expected + | `Invalid_shard -> "Invalid_shard" + | `Shard_index_out_of_range s -> + Format.sprintf "Shard_index_out_of_range(%s)" s + | `Shard_length_mismatch -> "Shard_length_mismatch" + | `Prover_SRS_not_loaded -> "Prover_SRS_not_loaded" + (** [gossipsub_app_message_payload_validation ~disable_shard_validation cryptobox message message_id] allows checking whether the given [message] identified by [message_id] is valid with the current [cryptobox] parameters. The validity check is @@ -56,20 +68,7 @@ let gossipsub_app_message_payload_validation ~disable_shard_validation cryptobox match res with | Ok () -> `Valid | Error err -> - let validation_error = - match err with - | `Invalid_degree_strictly_less_than_expected {given; expected} -> - Format.sprintf - "Invalid_degree_strictly_less_than_expected. Given: %d, \ - expected: %d" - given - expected - | `Invalid_shard -> "Invalid_shard" - | `Shard_index_out_of_range s -> - Format.sprintf "Shard_index_out_of_range(%s)" s - | `Shard_length_mismatch -> "Shard_length_mismatch" - | `Prover_SRS_not_loaded -> "Prover_SRS_not_loaded" - in + let validation_error = string_of_validation_error err in Event.emit_dont_wait__message_validation_error ~message_id ~validation_error ; @@ -143,7 +142,7 @@ let gossipsub_message_id_validation ctxt proto_parameters message_id = gossipsub_message_id_topic_validation ctxt proto_parameters message_id | other -> other -(** [gossipsub_app_messages_validation ctxt cryptobox head_level +(** [gossipsub_app_messages_validation ctxt cryptobox ~head_level attestation_lag ?message ~message_id ()] checks for the validity of the given message (if any) and message id. @@ -153,8 +152,8 @@ let gossipsub_message_id_validation ctxt proto_parameters message_id = {!gossipsub_message_id_validation}. Then, if a message is given, {!gossipsub_app_message_payload_validation} is used to check its validity. 
*) -let gossipsub_app_messages_validation ctxt cryptobox head_level proto_parameters - ?message ~message_id () = +let gossipsub_app_messages_validation ctxt cryptobox ~head_level + proto_parameters ?message ~message_id () = if Node_context.is_bootstrap_node ctxt then (* 1. As bootstrap nodes advertise their profiles to controller nodes, they shouldn't receive messages or messages ids. If this @@ -210,3 +209,204 @@ let gossipsub_app_messages_validation ctxt cryptobox head_level proto_parameters | other -> (* 4. In the case the message id is not Valid. *) other + +type batch_identifier = {level : int32; slot_index : int} + +module Batch_identifier_hashable = struct + type t = batch_identifier + + let equal a b = a.level = b.level && a.slot_index = b.slot_index + + let hash = Hashtbl.hash +end + +module Batch_tbl = Hashtbl.Make (Batch_identifier_hashable) + +type batch_elem = { + index : int; + id : Types.Message_id.t; + message : Types.Message.t; +} + +type 'a batch_result = { + to_check_in_batch : batch_elem list Batch_tbl.t; + not_valid : (int * 'a) list; + (* List of indices in the original batch with the error to output for + this element. *) +} + +let triage ctxt head_level proto_parameters batch = + let to_check_in_batch = + Batch_tbl.create proto_parameters.Types.number_of_slots + in + let not_valid = + List.fold_left_i + (fun index + not_valid + ( _peer, + _topic, + (Types.Message_id.{level; slot_index; _} as id), + message, + _peers ) -> + (* Have some slack for outdated messages. *) + let slack = 4 in + if + Int32.( + sub head_level level + > of_int (proto_parameters.Types.attestation_lag + slack)) + then (index, `Outdated) :: not_valid + else + match gossipsub_message_id_validation ctxt proto_parameters id with + | `Valid -> + (* The shard is added to the right batch. 
+ Since the message id check has already been performed, we have + the guarantee that all shards associated to a given + (level, slot_index) pair have the commitment published on the L1, + hence they all belong to the same slot, hence the shards can be + passed to the function verifying several shards simultaneously. *) + let already_sorted = + Batch_tbl.find_opt to_check_in_batch {level; slot_index} + in + let new_entry = + match already_sorted with + | None -> [{index; id; message}] + | Some list -> {index; id; message} :: list + in + Batch_tbl.replace to_check_in_batch {level; slot_index} new_entry ; + not_valid + | other -> + (* In the case the message id is not Valid. *) + (index, other) :: not_valid) + [] + batch + in + {to_check_in_batch; not_valid} + +let gossipsub_batch_validation ctxt cryptobox ~head_level proto_parameters batch + = + if Node_context.is_bootstrap_node ctxt then + (* As bootstrap nodes advertise their profiles to controller + nodes, they shouldn't receive messages or messages ids. If this + happens, received data are considered as spam (invalid), and the remote + peer might be punished, depending on the Gossipsub implementation. *) + List.map (fun _ -> `Invalid) batch + else + let batch_size = List.length batch in + let result = Array.init batch_size (fun _ -> `Unknown) in + let {to_check_in_batch; not_valid} = + triage ctxt head_level proto_parameters batch + in + List.iter (fun (index, error) -> result.(index) <- error) not_valid ; + + let store = Node_context.get_store ctxt in + let traps_store = Store.traps store in + + (* [treat_batch] does not invalidate a whole slot when a single shard is + invalid, otherwise it would be possible for a byzantine actor to craft + a wrong message with the id of the published slot to prevent the validation + of all the valid shards associated to this slot. 
*) + let rec treat_batch = function + | [] -> () + | ({level; slot_index}, batch_list) :: remaining_to_treat -> ( + let shards, proofs = + List.fold_left + (fun (shards, proofs) {message; id; _} -> + let Types.Message.{share; shard_proof} = message in + let Types.Message_id.{shard_index; _} = id in + let shard = Cryptobox.{share; index = shard_index} in + (shard :: shards, shard_proof :: proofs)) + ([], []) + batch_list + in + (* We never add empty list in the to_check_in_batch table. *) + let {id = {commitment; _}; _} = + Option.value_f + ~default:(fun () -> assert false) + (List.hd batch_list) + in + let res = + if Node_context.get_disable_shard_validation ctxt then Ok () + else + Dal_metrics.sample_time + ~sampling_frequency: + Constants.shards_verification_sampling_frequency + ~metric_updater:Dal_metrics.update_shards_verification_time + ~to_sample:(fun () -> + Cryptobox.verify_shard_multi + cryptobox + commitment + shards + proofs) + [@profiler.wrap_f + {driver_ids = [Opentelemetry]} + (Opentelemetry_helpers.trace_slot + ~name:"verify_shards" + Types.Slot_id.{slot_level = level; slot_index})] + in + match res with + | Ok () -> + List.iter + (fun {index; id; message} -> + (* We register traps only if the message is valid. *) + (* TODO: https://gitlab.com/tezos/tezos/-/issues/7742 + The [proto_parameters] are those for the last known finalized + level, which may differ from those of the slot level. This + will be an issue when the value of the [traps_fraction] + changes. (We cannot use {!Node_context.get_proto_parameters}, + as it is not monad-free; we'll need to use mapping from levels + to parameters.) 
*) + Slot_manager.maybe_register_trap + traps_store + ~traps_fraction:proto_parameters.traps_fraction + id + message ; + result.(index) <- `Valid) + batch_list ; + treat_batch remaining_to_treat + | Error err -> + let validation_error = string_of_validation_error err in + Event.emit_dont_wait__batch_validation_error + ~level + ~slot_index + ~validation_error ; + let batch_size = List.length batch_list in + if batch_size = 1 then + List.iter + (fun {index; _} -> result.(index) <- `Invalid) + batch_list + else + (* Since [verify_shard_multi] does not output which shards are the + invalid ones and since we do not want to invalidate legitimate + shards because a byzantine actor added some false shards, we + revalidate all the shards by halves, recursively, until we have + identified the invalid shards. *) + let first_half, second_half = + List.split_n (batch_size / 2) batch_list + in + treat_batch + (({level; slot_index}, first_half) + :: ({level; slot_index}, second_half) + :: remaining_to_treat) + | exception exn -> + (* Don't crash if crypto raised an exception. 
*) + let validation_error = Printexc.to_string exn in + Event.emit_dont_wait__batch_validation_error + ~level + ~slot_index + ~validation_error ; + let batch_size = List.length batch_list in + if batch_size = 1 then + List.iter + (fun {index; _} -> result.(index) <- `Invalid) + batch_list + else + let first_half, second_half = + List.split_n (batch_size / 2) batch_list + in + treat_batch + (({level; slot_index}, first_half) + :: ({level; slot_index}, second_half) + :: remaining_to_treat)) + in + treat_batch (Batch_tbl.to_seq to_check_in_batch |> List.of_seq) ; + Array.to_list result diff --git a/src/lib_dal_node/message_validation.mli b/src/lib_dal_node/message_validation.mli index 728bb63f66445b42a7a301d4622ff3573f9cae39..b87985ffc44778dba8b05493e0ef18bfda1f01b3 100644 --- a/src/lib_dal_node/message_validation.mli +++ b/src/lib_dal_node/message_validation.mli @@ -53,9 +53,35 @@ val gossipsub_app_messages_validation : Node_context.t -> Cryptobox.t -> - int32 -> + head_level:int32 -> Types.proto_parameters -> ?message:Types.Message.t -> message_id:Types.Message_id.t -> unit -> [> `Invalid | `Outdated | `Unknown | `Valid] + +(** [gossipsub_batch_validation ctxt cryptobox ~head_level + proto_parameters batch] validates a batch of Gossipsub messages and + their associated message ids in the context of the given DAL node. + + The validation follows the same layered approach as [gossipsub_app_messages_validation] + except that after checking the age and validity of the id of a message, + each message is assigned to a sub-batch containing only messages for the same + level and slot index. + + The cryptographic verification is then done per sub-batch. + + This function is intended to be registered as the Gossipsub batch validation hook. 
+*) +val gossipsub_batch_validation : + Node_context.t -> + Cryptobox.t -> + head_level:int32 -> + Types.proto_parameters -> + (Types.Peer.t + * Types.Topic.t + * Types.Message_id.t + * Types.Message.t + * Types.Peer.Set.t) + list -> + [> `Invalid | `Outdated | `Unknown | `Valid] list diff --git a/src/lib_dal_node_services/types.ml b/src/lib_dal_node_services/types.ml index 4a44bb40734d1543f760fa38913f5e4ff954c06a..4de2c3ad83ccc9c5ac89ec618116325c79741c74 100644 --- a/src/lib_dal_node_services/types.ml +++ b/src/lib_dal_node_services/types.ml @@ -178,6 +178,16 @@ module Message = struct (obj2 (req "share" Cryptobox.share_encoding) (req "shard_proof" Cryptobox.shard_proof_encoding)) + + module Cmp = struct + type nonrec t = t + + let compare t1 t2 = + Compare.or_else (Cryptobox.Share.compare t1.share t2.share) (fun () -> + Cryptobox.Proof.compare t1.shard_proof t2.shard_proof) + end + + include Compare.Make (Cmp) end module Peer = struct diff --git a/src/lib_dal_node_services/types.mli b/src/lib_dal_node_services/types.mli index e2f3249afb29a7fc125125140d838dff0aef1ac4..4c668b155f7f6e3425f875b70bcf8164cdfd9ca3 100644 --- a/src/lib_dal_node_services/types.mli +++ b/src/lib_dal_node_services/types.mli @@ -116,6 +116,8 @@ module Message : sig include PRINTABLE with type t := t include ENCODABLE with type t := t + + include COMPARABLE with type t := t end (** A peer from the point of view of gossipsub. *) diff --git a/src/lib_gossipsub/README.md b/src/lib_gossipsub/README.md index 7eba3edac12fedad8a785493ac650eb6a8624e93..9a16a6eab677f63795d65612d9eca42faa507e41 100644 --- a/src/lib_gossipsub/README.md +++ b/src/lib_gossipsub/README.md @@ -45,9 +45,12 @@ receives inputs from both the P2P and application layers, processes them using the automaton, and emits corresponding actions back to the P2P layer (messages, connections, disconnections) and the application layer (full messages). -The WORKER module is designed to be agnostic to the underlying concurrency model. 
+The WORKER module was designed to be agnostic to the underlying concurrency model. Rather than depending on a specific concurrency library (such as Lwt), it requires the host application to supply an I/O monad and stream implementation. +This is no longer the case since merge request [!18848](https://gitlab.com/tezos/tezos/-/merge_requests/18848), which introduced the use of `Lwt.async` in the +function `batch_accumulator` of the worker. + This design allows the application to retain control over the event loop and execution model, enabling the worker to integrate cleanly into diverse runtime environments. diff --git a/src/lib_gossipsub/gossipsub_automaton.ml b/src/lib_gossipsub/gossipsub_automaton.ml index a0cb0d5a6b55f6fc729d5d2454302cf65ea4d867..f6428e40da87a8f55eb46f1e8aa4962d80eca684 100644 --- a/src/lib_gossipsub/gossipsub_automaton.ml +++ b/src/lib_gossipsub/gossipsub_automaton.ml @@ -84,6 +84,8 @@ module Make (C : AUTOMATON_CONFIG) : type set_application_score = {peer : Peer.t; score : float} + type message_handling = Sequentially | In_batches of {time_interval : span} + (* FIXME not sure subtyping for output is useful. If it is, it is probably for few ouputs and could be removed. 
*) type _ output = @@ -130,6 +132,12 @@ module Make (C : AUTOMATON_CONFIG) : | Invalid_message : [`Receive_message] output | Unknown_validity : [`Receive_message] output | Outdated : [`Receive_message] output + | To_include_in_batch : + (receive_message * Peer.Set.t) + -> [`Receive_message] output + | Batch_result : + (receive_message * [`Receive_message] output) list + -> [`Treated_batch] output | Already_joined : [`Join] output | Joining_topic : {to_graft : Peer.Set.t} -> [`Join] output | Not_joined : [`Leave] output @@ -1150,15 +1158,19 @@ module Make (C : AUTOMATON_CONFIG) : in fail Invalid_message - let handle sender topic message_id message = + let initial_message_checks {topic; message_id; sender; _} : + ( state, + Peer.Set.t, + [`Receive_message] output ) + Tezos_gossipsub__State_monad.check = let open Monad.Syntax in let*! mesh_opt = find_mesh topic in - let*? peers_in_mesh = + let** peers_in_mesh = match mesh_opt with | Some peers -> pass peers | None -> fail Not_subscribed in - let*? () = + let** () = let*! message_cache in match Message_cache.get_first_seen_time_and_senders message_id message_cache @@ -1173,29 +1185,98 @@ module Make (C : AUTOMATON_CONFIG) : in fail Already_received in - let*? () = check_message_id_valid sender topic message_id in - let*? 
() = check_message_valid sender topic message message_id in + let** () = check_message_id_valid sender topic message_id in + pass peers_in_mesh + + let peers_to_route sender peers_in_mesh topic = + let open Monad.Syntax in let peers = Peer.Set.remove sender peers_in_mesh in - let* () = - put_message_in_cache ~peer:(Some sender) message_id message topic - in - let* () = - update_score sender (fun stats -> - Score.first_message_delivered stats topic) - in let* direct_peers = get_direct_peers topic in - let to_route = Peer.Set.union peers direct_peers in - (* TODO: https://gitlab.com/tezos/tezos/-/issues/5272 + return (Peer.Set.union peers direct_peers) - Filter out peers from which we already received the message, or an - IHave message? *) - Route_message {to_route} |> return + let handle ~batching_configuration + ({sender; topic; message_id; message} as receive_message) : + [`Receive_message] output Monad.t = + let open Monad.Syntax in + let*? peers_in_mesh = initial_message_checks receive_message in + match batching_configuration with + | Sequentially -> + let*? () = check_message_valid sender topic message message_id in + let* () = + put_message_in_cache ~peer:(Some sender) message_id message topic + in + let* () = + update_score sender (fun stats -> + Score.first_message_delivered stats topic) + in + let* to_route = peers_to_route sender peers_in_mesh topic in + (* TODO: https://gitlab.com/tezos/tezos/-/issues/5272 + Filter out peers from which we already received the message, or an + IHave message? 
*) + Route_message {to_route} |> return + | In_batches _ -> + let* to_route = peers_to_route sender peers_in_mesh topic in + return (To_include_in_batch (receive_message, to_route)) end - let handle_receive_message : - receive_message -> [`Receive_message] output Monad.t = - fun {sender; topic; message_id; message} -> - Receive_message.handle sender topic message_id message + let handle_receive_message ~batching_configuration receive_message : + [`Receive_message] output Monad.t = + Receive_message.handle ~batching_configuration receive_message + + let check_message_batch batch = + let unfolded_batch = + List.map + (fun ({sender; topic; message_id; message}, peers) -> + (sender, topic, message_id, message, peers)) + batch + in + List.combine + ~when_different_lengths:() + batch + (Message.valid_batch unfolded_batch) + + let apply_batch (batch : (receive_message * Peer.Set.t) list) : + [`Treated_batch] output Monad.t = + let open Monad.Syntax in + match check_message_batch batch with + | Error () -> + let l = + List.map + (fun (received_msg, _peers) -> (received_msg, Unknown_validity)) + batch + in + return (Batch_result l) + | Ok annotated_batch -> + let* l = + Monad.map_fold + (fun ( ( ({sender; topic; message_id; message} as received_msg), + peers ), + result ) -> + match result with + | `Valid -> + let* () = + put_message_in_cache + ~peer:(Some sender) + message_id + message + topic + in + let* () = + update_score sender (fun stats -> + Score.first_message_delivered stats topic) + in + return (received_msg, Route_message {to_route = peers}) + | `Outdated -> return (received_msg, Outdated) + | `Unknown -> return (received_msg, Unknown_validity) + | `Invalid -> + let* () = + update_score sender (fun stats -> + Score.invalid_message_delivered stats topic) + in + return (received_msg, Invalid_message)) + annotated_batch + in + return (Batch_result l) module Publish_message = struct let check_not_seen message_id = @@ -2416,7 +2497,9 @@ module Make (C : 
AUTOMATON_CONFIG) : "Route_message %a" Fmt.Dump.(record [field "to_route" Fun.id pp_peer_set]) to_route + | To_include_in_batch _ -> fprintf fmtr "To include in batch" | Already_received -> fprintf fmtr "Already_received" + | Batch_result _res -> fprintf fmtr "Batch_result" | Not_subscribed -> fprintf fmtr "Not_subscribed" | Already_joined -> fprintf fmtr "Already_joined" | Joining_topic {to_graft} -> diff --git a/src/lib_gossipsub/gossipsub_intf.ml b/src/lib_gossipsub/gossipsub_intf.ml index 1c010c0ff7a2a2d67da6b3c83dc2a8f2045e1dcf..6045c6ccf61e58355f7389a255708813c02f7810 100644 --- a/src/lib_gossipsub/gossipsub_intf.ml +++ b/src/lib_gossipsub/gossipsub_intf.ml @@ -53,7 +53,11 @@ module type AUTOMATON_SUBCONFIG = sig end module Message : sig - include PRINTABLE + type t + + include PRINTABLE with type t := t + + include COMPARABLE with type t := t (** [valid] performs an application layer-level validity check on a message id and a message if given. @@ -69,6 +73,23 @@ module type AUTOMATON_SUBCONFIG = sig message_id:Message_id.t -> unit -> [`Valid | `Unknown | `Outdated | `Invalid] + + (** [valid_batch] performs an application layer-level validity check on a + message batch. + + If the [message_handling] field of the gossipsub worker is [Sequentially], + this function is never called. + This function is called when the field is [In_batches]. However, [valid] + is still called even in this case: since message ids are not batched + and [valid_batch] cannot be called without providing a message, [valid] + remains used when processing message ids without the message, especially + the [IHave] events. + + The output is a list of the same length as the input, containing the + result for each message of the batch. 
*) + val valid_batch : + (Peer.t * Topic.t * Message_id.t * t * Peer.Set.t) list -> + [`Valid | `Unknown | `Outdated | `Invalid] list end end @@ -547,7 +568,13 @@ module type AUTOMATON = sig end (** Module for message *) - module Message : PRINTABLE + module Message : sig + type t + + include PRINTABLE with type t := t + + include COMPARABLE with type t := t + end (** Module for time *) module Time : PRINTABLE @@ -619,6 +646,8 @@ module type AUTOMATON = sig type set_application_score = {peer : Peer.t; score : float} + type message_handling = Sequentially | In_batches of {time_interval : span} + (** Output produced by one of the actions below. *) type _ output = | Ihave_from_peer_with_low_score : { @@ -705,7 +734,7 @@ module type AUTOMATON = sig - Direct peers for the message's topic; - The peers in the topic's mesh minus the original sender of the message. *) | Already_received : [`Receive_message] output - (** Received a message that has already been recevied before. *) + (** Received a message that has already been received before. *) | Not_subscribed : [`Receive_message] output (** Received a message from a remote peer for a topic we are not subscribed to (called "unknown topic" in the Go implementation). *) @@ -714,6 +743,17 @@ module type AUTOMATON = sig | Unknown_validity : [`Receive_message] output (** Validity cannot be decided yet. *) | Outdated : [`Receive_message] output (** The message is outdated. *) + | To_include_in_batch : + (receive_message * Peer.Set.t) + -> [`Receive_message] output + (** The message passed all the Gossipsub checks. The worker should add this + message in the current batch. It must be noted that most checks + (especially cryptographic ones) are done when the batch is processed. *) + | Batch_result : + (receive_message * [`Receive_message] output) list + -> [`Treated_batch] output + (** Result of the processing of the batch. It contains the list of all + messages of the batch with their respective output. 
*) | Already_joined : [`Join] output (** Attempting to join a topic we already joined. *) | Joining_topic : {to_graft : Peer.Set.t} -> [`Join] output @@ -811,10 +851,17 @@ module type AUTOMATON = sig on this topic. *) val handle_prune : prune -> [`Prune] monad - (** [handle_receive_message { sender; topic; message_id; message }] handles - a message received from [sender] on the gossip network. The function returns - a set of peers to which the (full) message will be directly forwarded. *) - val handle_receive_message : receive_message -> [`Receive_message] monad + (** [handle_receive_message ~batching_configuration received_message] handles a + [received_message] on the gossip network. The function returns either a + rejection reason for the message or a set of peers to which: + - the (full) message will be directly forwarded if [batching_configuration] + is [Sequentially]. + - the message might be forwarded after validation of the batch it will be + included in, if [batching_configuration] is [In_batches]. *) + val handle_receive_message : + batching_configuration:message_handling -> + receive_message -> + [`Receive_message] monad (** [publish { topic; message_id; message }] allows to publish a message on the gossip network from the local node. The function returns a set of peers @@ -834,6 +881,12 @@ module type AUTOMATON = sig topic. *) val leave : leave -> [`Leave] monad + (** [apply_batch batch] handles a complete batch and outputs an element of the + state monad which encapsulates the list of [[`Receive_message] output] + results, one for each entry of the batch. *) + val apply_batch : + (receive_message * Peer.Set.t) list -> [`Treated_batch] monad + (** [set_application_score {peer; score}] handles setting the application score of [peer]. If the peer is not known, this does nothing. 
*) val set_application_score : @@ -1183,6 +1236,7 @@ module type WORKER = sig | P2P_input of p2p_input | App_input of app_input | Check_unknown_messages + | Process_batch of (GS.receive_message * GS.Peer.Set.t) list (** [make ~events_logging ~initial_points rng limits parameters] initializes a new Gossipsub automaton with the given arguments. Then, it initializes @@ -1193,6 +1247,7 @@ module type WORKER = sig val make : ?events_logging:(event -> unit Monad.t) -> ?initial_points:(unit -> Point.t list) -> + ?batching_interval:GS.span -> self:GS.Peer.t -> Random.State.t -> (GS.Topic.t, GS.Peer.t, GS.Message_id.t, GS.span) limits -> diff --git a/src/lib_gossipsub/gossipsub_worker.ml b/src/lib_gossipsub/gossipsub_worker.ml index 30e181cf5d0364043b81b1f42b6f273c7ab2d766..0ca396a777f70547114484d508a26f5d206cfce9 100644 --- a/src/lib_gossipsub/gossipsub_worker.ml +++ b/src/lib_gossipsub/gossipsub_worker.ml @@ -224,6 +224,7 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : | P2P_input of p2p_input | App_input of app_input | Check_unknown_messages + | Process_batch of (GS.receive_message * Peer.Set.t) list module Bounded_message_map = struct (* We maintain the invariant that: @@ -269,6 +270,18 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : Some (value, t) end + module ReceiveMsgMap = Map.Make (struct + type t = GS.receive_message + + let compare (m1 : t) (m2 : t) = + Compare.or_else + (Compare.or_else (Topic.compare m1.topic m2.topic) (fun () -> + Message_id.compare m1.message_id m2.message_id)) + (fun () -> Message.compare m1.message m2.message) + end) + + type batch_state = Pending | Accumulating of Peer.Set.t ReceiveMsgMap.t + (** The worker's state is made of the gossipsub automaton's state, and a stream of events to process. It also has two output streams to communicate with the application and P2P layers. 
*) @@ -285,6 +298,7 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : unknown_validity_messages : Bounded_message_map.t; unreachable_points : int64 Point.Map.t; (* For each point, stores the next heartbeat tick when we can try to recontact this point again. *) + message_handling : GS.message_handling; } (** A worker instance is made of its status and state. *) @@ -364,7 +378,8 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : Note that it's the responsability of the automaton modules to filter out peers based on various criteria (bad score, connection expired, ...). *) - let handle_receive_message received_message = function + let handle_receive_message (received_message : GS.receive_message) : + worker_state * [`Receive_message] GS.output -> worker_state = function | state, GS.Route_message {to_route} -> Introspection.update_count_recv_valid_app_messages state.stats `Incr ; let ({sender = _; topic; message_id; message} : GS.receive_message) = @@ -376,7 +391,10 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : let has_joined = View.(has_joined topic @@ view state.gossip_state) in if has_joined then emit_app_output state message_with_header ; state - | state, GS.Already_received | state, GS.Not_subscribed -> state + | state, GS.Already_received + | state, GS.Not_subscribed + | state, GS.To_include_in_batch _ -> + state | state, GS.Unknown_validity -> Introspection.update_count_recv_unknown_validity_app_messages state.stats @@ -741,15 +759,81 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : GS.leave {topic} gossip_state |> update_gossip_state state |> handle_leave topic + let handle_batch (state, GS.Batch_result batch) = + List.fold_left + (fun state (msg, out) -> handle_receive_message msg (state, out)) + state + batch + + let apply_batch_event ({gossip_state; _} as state) batch = + GS.apply_batch batch gossip_state + |> update_gossip_state state |> handle_batch + + (* This function uses Lwt.async, making the gossipsub library 
not independent + of the concurrency model anymore. + If independence from [Lwt] is considered useful to recover, a proposed + implementation could be: + https://gitlab.com/tezos/tezos/-/commit/a361dd827b43b601eb9de89e7187f9bcf7fc64d1 + This alternative is not the current implementation, because it has the + disadvantage of having a "ticking clock" and accumulating whatever arrives + between two ticks. It was considered nicer to have the accumulation + start at the reception of the first shard, because due to network latencies, + the reception will most probably not align well with the clock. + + Even with the current design, the shards for a commitment may end up + being split between two batches. Increasing the frequency of this event + would probably not be an issue. *) + let batch_accumulator = + let current_batch = ref Pending in + fun output time_interval events_stream -> + match output with + | GS.To_include_in_batch (msg, peers) -> ( + match !current_batch with + | Pending -> + let open Lwt_syntax in + Lwt.async (fun () -> + let* () = Lwt_unix.sleep (GS.Span.to_float_s time_interval) in + let batch = + match !current_batch with + | Accumulating batch -> ReceiveMsgMap.bindings batch + | Pending -> [] + in + current_batch := Pending ; + Stream.push (Process_batch batch) events_stream ; + return_unit) ; + let content_map = ReceiveMsgMap.singleton msg peers in + current_batch := Accumulating content_map + | Accumulating prev_contents -> + let new_contents = + ReceiveMsgMap.update + msg + (function + | None -> Some peers + | Some prev_peers -> Some (Peer.Set.inter peers prev_peers)) + prev_contents + in + current_batch := Accumulating new_contents) + | _ -> () + (** Handling messages received from the P2P network. 
*) let apply_p2p_message ~self ({gossip_state; _} as state) from_peer = function | Message_with_header {message; topic; message_id} -> - let receive_message = - {GS.sender = from_peer; topic; message_id; message} - in - (GS.handle_receive_message receive_message gossip_state - |> update_gossip_state state - |> handle_receive_message receive_message) + (let receive_message = + {GS.sender = from_peer; topic; message_id; message} + in + let batching_configuration = state.message_handling in + let new_state, output = + GS.handle_receive_message + ~batching_configuration + receive_message + gossip_state + in + (match batching_configuration with + | Sequentially -> () + | In_batches {time_interval} -> + batch_accumulator output time_interval state.events_stream) ; + update_gossip_state state (new_state, output) + |> handle_receive_message receive_message) [@profiler.span_f {verbosity = Notice} ["apply_event"; "P2P_input"; "In_message"; "Message_with_header"]] @@ -858,7 +942,10 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : match GS.Message_id.valid message.message_id with | `Valid | `Invalid -> let state = {state with unknown_validity_messages} in - GS.handle_receive_message message state.gossip_state + GS.handle_receive_message + ~batching_configuration:state.message_handling + message + state.gossip_state |> update_gossip_state state |> handle_receive_message message |> (* Other messages are processed recursively *) @@ -899,6 +986,7 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : state [@profiler.span_f {verbosity = Notice} ["apply_event"; "Check_unknown_messages"]] + | Process_batch batch -> apply_batch_event state batch (** A helper function that pushes events in the state *) let push e {status = _; state; self = _} = Stream.push e state.events_stream @@ -997,7 +1085,8 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : event_loop_promise let make ?(events_logging = fun _event -> Monad.return ()) - ?(initial_points = fun () -> []) ~self 
rng limits parameters = + ?(initial_points = fun () -> []) ?batching_interval ~self rng limits + parameters = { self; status = Starting; @@ -1014,6 +1103,10 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : events_logging; unknown_validity_messages = Bounded_message_map.make ~capacity:10_000; unreachable_points = Point.Map.empty; + message_handling = + (match batching_interval with + | None -> Sequentially + | Some time_interval -> In_batches {time_interval}); }; } diff --git a/src/lib_gossipsub/state_monad.ml b/src/lib_gossipsub/state_monad.ml index b35254fa5d03235fe60144cb2434b111319e469e..9caaf284d62b9cf9dc3b3b137b356fe9ad5fb5b6 100644 --- a/src/lib_gossipsub/state_monad.ml +++ b/src/lib_gossipsub/state_monad.ml @@ -43,10 +43,17 @@ module type S = sig ('pass -> ('state, 'fail) t) -> ('state, 'fail) t + val bind_check : + ('state, 'a, 'fail) check -> + ('a -> ('state, 'pass, 'fail) check) -> + ('state, 'pass, 'fail) check + val return_pass : 'pass -> ('state, 'pass, 'fail) check val return_fail : 'fail -> ('state, 'pass, 'fail) check + val map_fold : ('a -> ('state, 'b) t) -> 'a list -> ('state, 'b list) t + module Syntax : sig val ( let* ) : ('state, 'a) t -> ('a -> ('state, 'b) t) -> ('state, 'b) t @@ -57,6 +64,11 @@ module type S = sig val ( let*! 
) : ('state -> 'a) -> ('a -> ('state, 'b) t) -> ('state, 'b) t + val ( let** ) : + ('state, 'a, 'fail) check -> + ('a -> ('state, 'pass, 'fail) check) -> + ('state, 'pass, 'fail) check + val return : 'a -> ('state, 'a) t val pass : 'pass -> ('state, 'pass, 'fail) check @@ -85,6 +97,18 @@ module M : S = struct let return_fail x = `Fail x |> return + let bind_check c f = + bind c (function `Pass res -> f res | `Fail e -> return_fail e) + + let map_fold f l = + let rec bis acc state = function + | [] -> return (List.rev acc) state + | hd :: tl -> + let state, value = f hd state in + bis (value :: acc) state tl + in + fun state -> bis [] state l + module Syntax = struct let ( let* ) = bind @@ -92,6 +116,8 @@ module M : S = struct let ( let*! ) = get + let ( let** ) = bind_check + let return = return let pass = return_pass diff --git a/src/lib_gossipsub/state_monad.mli b/src/lib_gossipsub/state_monad.mli index 9466a6fa013c035b459178dee36cddd784a4d0eb..fc72d997b9b5d8870f9e009c9faf2641ffbafdf3 100644 --- a/src/lib_gossipsub/state_monad.mli +++ b/src/lib_gossipsub/state_monad.mli @@ -57,7 +57,13 @@ module type S = sig ('pass -> ('state, 'fail) t) -> ('state, 'fail) t - (** [return_pass v] is the default constuctor of [`Pass] values in the + (** [bind_check m f] is the bind function of the [('state, 'pass, 'fail) check] monad: [f] is applied on [`Pass], while [`Fail] short-circuits. *) + val bind_check : + ('state, 'a, 'fail) check -> + ('a -> ('state, 'pass, 'fail) check) -> + ('state, 'pass, 'fail) check + + (** [return_pass v] is the default constructor of [`Pass] values in the [('state, 'pass, 'fail) check] monad. *) val return_pass : 'pass -> ('state, 'pass, 'fail) check @@ -65,6 +71,10 @@ module type S = sig [('state, 'pass, 'fail) check] monad. *) val return_fail : 'fail -> ('state, 'pass, 'fail) check + (** [map_fold f l] applies [f] to the elements of [l] from left to right, + threading the state through each step and collecting the results in order. 
*) + val map_fold : ('a -> ('state, 'b) t) -> 'a list -> ('state, 'b list) t + (** Infix notations and/or shorter aliases for the functions above. *) module Syntax : sig (** Infix notation for {!bind}. *) @@ -79,6 +89,12 @@ module type S = sig (** Infix notation for {!get}. *) val ( let*! ) : ('state -> 'a) -> ('a -> ('state, 'b) t) -> ('state, 'b) t + (** Infix notation for {!bind_check}. *) + val ( let** ) : + ('state, 'a, 'fail) check -> + ('a -> ('state, 'pass, 'fail) check) -> + ('state, 'pass, 'fail) check + (** Re-exporting {!return} function in this sub-module. *) val return : 'a -> ('state, 'a) t diff --git a/src/lib_gossipsub/test/gossipsub_pbt_generators.ml b/src/lib_gossipsub/test/gossipsub_pbt_generators.ml index 317b6853266053926c17358169e40a9faf512f1f..53e52ee10682c8afbdc5eb64e7690d0b2bf459d4 100644 --- a/src/lib_gossipsub/test/gossipsub_pbt_generators.ml +++ b/src/lib_gossipsub/test/gossipsub_pbt_generators.ml @@ -250,8 +250,13 @@ struct let set_application_score x = Set_application_score x |> i end - let dispatch : type a. a input -> GS.state -> GS.state * a GS.output = - fun i state -> + let dispatch : + type a. + a input -> + ?batching_configuration:GS.message_handling -> + GS.state -> + GS.state * a GS.output = + fun i ?(batching_configuration = Sequentially) state -> match i with | Add_peer m -> GS.add_peer m state | Remove_peer m -> GS.remove_peer m state @@ -259,7 +264,8 @@ struct | Iwant m -> GS.handle_iwant m state | Graft m -> GS.handle_graft m state | Prune m -> GS.handle_prune m state - | Receive_message m -> GS.handle_receive_message m state + | Receive_message m -> + GS.handle_receive_message ~batching_configuration m state | Publish_message m -> GS.publish_message m state | Heartbeat -> GS.heartbeat state | Join m -> GS.join m state @@ -465,7 +471,7 @@ struct (* Construction of the trace generator. *) (* Evaluate a [raw] on an initial state yields a trace. 
*) - let raw_to_trace state raw = + let raw_to_trace ?batching_configuration state raw = let open M in let seq = SeqM.M.unfold next raw in let* _, _, rev_trace = @@ -474,7 +480,7 @@ struct match event with | Input i -> Time.set time ; - let state', output = dispatch i state in + let state', output = dispatch ?batching_configuration i state in let step = Transition {time; input = i; state; state'; output} in @@ -501,10 +507,10 @@ struct let seq = SeqM.M.unfold Fragment.next raw in SeqM.fold_left (fun acc event -> f event acc) init seq - let run state fragment : trace t = + let run ?batching_configuration state fragment : trace t = let open M in let* raw = Fragment.raw_generator fragment in - Fragment.raw_to_trace state raw + Fragment.raw_to_trace ?batching_configuration state raw let check_fold (type e inv) (f : transition -> inv -> (inv, e) result) init trace : (unit, e * trace) result = diff --git a/src/lib_gossipsub/test/gossipsub_pbt_generators.mli b/src/lib_gossipsub/test/gossipsub_pbt_generators.mli index c0ff8fd20f8b137dd1ebf5378c679c5ee73e78e2..649c2cacb945c971fd48b7e43b8565ef61f91b8e 100644 --- a/src/lib_gossipsub/test/gossipsub_pbt_generators.mli +++ b/src/lib_gossipsub/test/gossipsub_pbt_generators.mli @@ -226,7 +226,11 @@ val fold : Fragment.t -> (event -> 'a -> 'a) -> 'a -> 'a t (** [run s0 f] constructs a {!trace} generator by running the gossipsub automaton on inputs generated from [f]. *) -val run : GS.state -> Fragment.t -> trace t +val run : + ?batching_configuration:GS.message_handling -> + GS.state -> + Fragment.t -> + trace t (** [check_fold step inv trace] folds the predicate [step] with initial invariant [inv] on [trace]. 
If the [step] fails with [Error e], [check_predicate] returns [Error (e, prefix)] diff --git a/src/lib_gossipsub/test/test_gossipsub.ml b/src/lib_gossipsub/test/test_gossipsub.ml index d6038b1fe3f7f65f3d5c20238632353104522976..67a993886cbf212c6aabce582e91881892fdefc4 100644 --- a/src/lib_gossipsub/test/test_gossipsub.ml +++ b/src/lib_gossipsub/test/test_gossipsub.ml @@ -50,7 +50,12 @@ let rng = let () = let open Default_limits in let default_limits = default_limits () in - Test_unit.register rng default_limits parameters ; + Test_unit.register + ~message_handlings: + [Sequentially; In_batches {time_interval = Milliseconds.of_int_ms 100}] + rng + default_limits + parameters ; Test_integration_worker.register rng default_limits parameters ; Test_pbt.register rng default_limits parameters ; Test_message_cache.register () diff --git a/src/lib_gossipsub/test/test_gossipsub_shared.ml b/src/lib_gossipsub/test/test_gossipsub_shared.ml index 9ba07142f11e2feed35a38279365c77127fe967f..f5fcfac2adb7b37c0befd933abb0b5c09bf6d9bb 100644 --- a/src/lib_gossipsub/test/test_gossipsub_shared.ml +++ b/src/lib_gossipsub/test/test_gossipsub_shared.ml @@ -167,6 +167,8 @@ module Automaton_config : include String_iterable let valid ?message ~message_id () = Validity_hook.apply message message_id + + let valid_batch = List.map (fun _ -> `Valid) end end end diff --git a/src/lib_gossipsub/test/test_unit.ml b/src/lib_gossipsub/test/test_unit.ml index 2a2243564714843120b3a98ad9caff6270ab5679..a283a47de7121d91891c29f59877f00ca527f3b4 100644 --- a/src/lib_gossipsub/test/test_unit.ml +++ b/src/lib_gossipsub/test/test_unit.ml @@ -75,7 +75,10 @@ let assert_in_message_cache ~__LOC__ message_id ~peer ~expected_message state = view.message_cache with | None -> - Test.fail "Expected entry in message cache for message id %d" message_id + Test.fail + ~__LOC__ + "Expected entry in message cache for message id %d" + message_id | Some (_message_cache_state, message, _access) -> Check.( (message = 
expected_message) @@ -578,10 +581,18 @@ let test_fanout rng limits parameters = (** Tests that receiving a message for a subscribed topic: - Returns peers to publish to. - Inserts message into message cache. *) -let test_receiving_message rng limits parameters = +let test_receiving_message ~batching_configuration rng limits parameters = Tezt_core.Test.register ~__FILE__ - ~title:"Gossipsub: Test receiving message" + ~title: + ("Gossipsub: Test receiving message" + ^ + match batching_configuration with + | GS.Sequentially -> "" + | GS.In_batches {time_interval} -> + Format.sprintf + " with batches of %fs" + (GS.Span.to_float_s time_interval)) ~tags:["gossipsub"; "receiving_message"] @@ fun () -> let topic = "test" in @@ -602,14 +613,17 @@ let test_receiving_message rng limits parameters = let message_id = 0 in (* Receive a message for a joined topic. *) let state, output = - GS.handle_receive_message {sender; topic; message_id; message} state + GS.handle_receive_message + ~batching_configuration + {sender; topic; message_id; message} + state in let peers_to_route = match output with | Already_received | Not_subscribed | Invalid_message | Unknown_validity | Outdated -> Test.fail ~__LOC__ "Handling of received message should succeed." - | Route_message {to_route} -> to_route + | Route_message {to_route} | To_include_in_batch (_, to_route) -> to_route in (* Should return [degree_optimal] peers to route the message to. *) Check.( @@ -617,6 +631,18 @@ let test_receiving_message rng limits parameters = int ~error_msg:"Expected %R, got %L" ~__LOC__) ; + (* In batch mode, the message is added to cache only once the batch has been + treated. *) + let state = + match output with + | GS.Route_message _ | Already_received -> state + | To_include_in_batch out -> fst (GS.apply_batch [out] state) + | _ -> + Test.fail + ~__LOC__ + "Output should have been Route_message, Already_received or \ + To_include_in_batch" + in (* [message_id] should be added to the message cache. 
*) assert_in_message_cache ~__LOC__ @@ -647,10 +673,19 @@ let test_receiving_message rng limits parameters = It might be possible to simplify this setup (for instance, by having just one peer in the mesh?). *) -let test_message_duplicate_score_bug rng _limits parameters = +let test_message_duplicate_score_bug ~batching_configuration rng _limits + parameters = Tezt_core.Test.register ~__FILE__ - ~title:"Gossipsub: Test message duplicate score bug" + ~title: + ("Gossipsub: Test message duplicate score bug" + ^ + match batching_configuration with + | GS.Sequentially -> "" + | GS.In_batches {time_interval} -> + Format.sprintf + " with batches of %fs" + (GS.Span.to_float_s time_interval)) ~tags:["gossipsub"; "duplicate_score_bug"] @@ fun () -> (* Ignore decays, for simplicity; they are used when refreshing score during @@ -689,20 +724,44 @@ let test_message_duplicate_score_bug rng _limits parameters = let message = "some_data" in let state, _ = GS.handle_graft {peer = sender; topic} state in (* Receive a message for a joined topic. *) - let state, output = - GS.handle_receive_message {sender; topic; message_id = 0; message} state + let state, output1 = + GS.handle_receive_message + ~batching_configuration + {sender; topic; message_id = 0; message} + state in let () = - match output with + match output1 with | Already_received | Not_subscribed | Invalid_message | Unknown_validity | Outdated -> Test.fail ~__LOC__ "Handling of received message should succeed." - | Route_message _ -> () + | Route_message _ | To_include_in_batch _ -> () in (* Send one more message so that [sender]'s P2 score is greater than its P3 score. In this way its score is positive, and [sender] is not pruned. 
*) - let state, _ = - GS.handle_receive_message {sender; topic; message_id = 1; message} state + let state, output2 = + GS.handle_receive_message + ~batching_configuration + {sender; topic; message_id = 1; message} + state + in + (* If we are in batch mode, the batch containing both messages has to be + treated. *) + let state = + match batching_configuration with + | Sequentially -> state + | In_batches _ -> + fst + (GS.apply_batch + (List.map + (function + | GS.To_include_in_batch x -> x + | _ -> + Test.fail + ~__LOC__ + "In batch mode, received message should be in batch.") + [output1; output2]) + state) in (* Send the first message again. *) let per_topic_score_limits = @@ -716,13 +775,16 @@ let test_message_duplicate_score_bug rng _limits parameters = let state, _ = GS.heartbeat state in assert_mesh_inclusion ~__LOC__ ~topic ~peer:sender ~is_included:true state ; let state, output = - GS.handle_receive_message {sender; topic; message_id = 0; message} state + GS.handle_receive_message + ~batching_configuration + {sender; topic; message_id = 0; message} + state in let () = match output with | Already_received -> () | Not_subscribed | Invalid_message | Unknown_validity | Outdated - | Route_message _ -> + | Route_message _ | To_include_in_batch _ -> Test.fail ~__LOC__ "Handling should fail with Already_received." in let expected_score = @@ -749,10 +811,19 @@ let test_message_duplicate_score_bug rng _limits parameters = (** Tests that we do not route the message when receiving a message for an unsubscribed topic. 
*) -let test_receiving_message_for_unsusbcribed_topic rng limits parameters = +let test_receiving_message_for_unsusbcribed_topic ~batching_configuration rng + limits parameters = Tezt_core.Test.register ~__FILE__ - ~title:"Gossipsub: Test receiving message for unsubscribed topic" + ~title: + ("Gossipsub: Test receiving message for unsubscribed topic" + ^ + match batching_configuration with + | GS.Sequentially -> "" + | GS.In_batches {time_interval} -> + Format.sprintf + " with batches of %fs" + (GS.Span.to_float_s time_interval)) ~tags:["gossipsub"; "receive_message"; "fanout"] @@ fun () -> let topic = "topic" in @@ -775,11 +846,14 @@ let test_receiving_message_for_unsusbcribed_topic rng limits parameters = let message = "some data" in let message_id = 0 in let _state, output = - GS.handle_receive_message {sender; topic; message_id; message} state + GS.handle_receive_message + ~batching_configuration + {sender; topic; message_id; message} + state in match output with | Already_received | Route_message _ | Invalid_message | Unknown_validity - | Outdated -> + | Outdated | To_include_in_batch _ -> Test.fail ~__LOC__ "Handling of received message should fail with [Not_subscribed]." @@ -2119,10 +2193,18 @@ let test_scoring_p1 rng _limits parameters = (** Test for P2 (First Message Deliveries). 
Ported from: https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L3247 *) -let test_scoring_p2 rng _limits parameters = +let test_scoring_p2 ~batching_configuration rng _limits parameters = Tezt_core.Test.register ~__FILE__ - ~title:"Gossipsub: Scoring P2" + ~title: + ("Gossipsub: Scoring P2" + ^ + match batching_configuration with + | GS.Sequentially -> "" + | GS.In_batches {time_interval} -> + Format.sprintf + " with batches of %fs" + (GS.Span.to_float_s time_interval)) ~tags:["gossipsub"; "scoring"; "p2"] @@ fun () -> let first_message_deliveries_weight = 2.0 in @@ -2153,15 +2235,18 @@ let test_scoring_p2 rng _limits parameters = let receive_message ~__LOC__ peer message_id message state = let state, output = GS.handle_receive_message + ~batching_configuration {sender = peer; topic; message_id; message} state in match output with | GS.Route_message _ | Already_received -> state + | To_include_in_batch out -> fst (GS.apply_batch [out] state) | _ -> Test.fail ~__LOC__ - "Output should have been Route_message or Already_received" + "Output should have been Route_message, Already_received or \ + To_include_in_batch" in (* peer 0 delivers message first *) let message_id, message = gen_message () in @@ -2230,10 +2315,18 @@ let test_scoring_p2 rng _limits parameters = Ported from: https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L3895 and https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L3979 *) -let test_scoring_p4 rng _limits parameters = +let test_scoring_p4 ~batching_configuration rng _limits parameters = Tezt_core.Test.register ~__FILE__ - ~title:"Gossipsub: Scoring P4" + ~title: + ("Gossipsub: Scoring P4" + ^ + match batching_configuration with + | GS.Sequentially -> "" + | GS.In_batches {time_interval} -> + Format.sprintf + " with 
batches of %fs" + (GS.Span.to_float_s time_interval)) ~tags:["gossipsub"; "scoring"; "p4"] @@ fun () -> let invalid_message_deliveries_weight = -2.0 in @@ -2276,6 +2369,7 @@ let test_scoring_p4 rng _limits parameters = let message_id, message = gen_message () in let state, _ = GS.handle_receive_message + ~batching_configuration {sender = peer; topic; message_id; message} state in @@ -2430,15 +2524,12 @@ let test_scoring_p7_grafts_before_backoff rng _limits parameters = (* TODO: https://gitlab.com/tezos/tezos/-/issues/5293 Add test for the described test scenario *) -let register rng limits parameters = +let register ~message_handlings rng limits parameters = test_ignore_graft_from_unknown_topic rng limits parameters ; test_handle_received_subscriptions rng limits parameters ; test_join_adds_peers_to_mesh rng limits parameters ; test_join_adds_fanout_to_mesh rng limits parameters ; test_publish_without_flood_publishing rng limits parameters ; - test_receiving_message_for_unsusbcribed_topic rng limits parameters ; - test_receiving_message rng limits parameters ; - test_message_duplicate_score_bug rng limits parameters ; test_fanout rng limits parameters ; test_handle_graft_for_joined_topic rng limits parameters ; test_handle_graft_for_not_joined_topic rng limits parameters ; @@ -2465,7 +2556,21 @@ let register rng limits parameters = test_ignore_too_many_messages_in_ihave rng limits parameters ; test_heartbeat_scenario rng limits parameters ; test_scoring_p1 rng limits parameters ; - test_scoring_p2 rng limits parameters ; - test_scoring_p4 rng limits parameters ; test_scoring_p5 rng limits parameters ; - test_scoring_p7_grafts_before_backoff rng limits parameters + test_scoring_p7_grafts_before_backoff rng limits parameters ; + List.iter + (fun batching_configuration -> + test_receiving_message ~batching_configuration rng limits parameters ; + test_message_duplicate_score_bug + ~batching_configuration + rng + limits + parameters ; + 
test_receiving_message_for_unsusbcribed_topic + ~batching_configuration + rng + limits + parameters ; + test_scoring_p2 ~batching_configuration rng limits parameters ; + test_scoring_p4 ~batching_configuration rng limits parameters) + message_handlings diff --git a/src/lib_stdlib/compare.ml b/src/lib_stdlib/compare.ml index e041882af1ffb4d211c2f5c86547b45685139a3c..741bd9a3bba1265612aaad900ae52ddae7c0987f 100644 --- a/src/lib_stdlib/compare.ml +++ b/src/lib_stdlib/compare.ml @@ -91,6 +91,22 @@ module List (P : COMPARABLE) = Make (struct if hd <> 0 then hd else compare xs ys end) +module Array (P : COMPARABLE) = Make (struct + type t = P.t array + + let compare a b = + let len_a = Array.length a in + let len_b = Array.length b in + let rec loop i = + if i = len_a then if i = len_b then 0 else -1 + else if i = len_b then 1 + else + let c = P.compare a.(i) b.(i) in + if c <> 0 then c else loop (i + 1) + in + loop 0 +end) + module Option (P : COMPARABLE) = Make (struct type t = P.t option diff --git a/src/lib_stdlib/compare.mli b/src/lib_stdlib/compare.mli index 424b2709a4722164986dbafe3648538ab044774c..4d851ffa7b044a1cd3e9dc77b9e6207aec21a571 100644 --- a/src/lib_stdlib/compare.mli +++ b/src/lib_stdlib/compare.mli @@ -151,6 +151,8 @@ module Q : S with type t = Q.t module List (P : COMPARABLE) : S with type t = P.t list +module Array (P : COMPARABLE) : S with type t = P.t array + module Option (P : COMPARABLE) : S with type t = P.t option module Result (Ok : COMPARABLE) (Error : COMPARABLE) :