diff --git a/src/lib_agnostic_baker/commands.ml b/src/lib_agnostic_baker/commands.ml index 08096dbf989b5fa81831dd5405e1e3fe06432e96..3e87a264b7fe734748a164a13fbde6b1f7658203 100644 --- a/src/lib_agnostic_baker/commands.ml +++ b/src/lib_agnostic_baker/commands.ml @@ -124,8 +124,10 @@ module Dal = struct let ignore_topics = arg_list_to_clic ignore_topics_arg + let batching_configuration = arg_to_clic batching_configuration_arg + let args = - Tezos_clic.args24 + Tezos_clic.args25 data_dir config_file rpc_addr @@ -150,6 +152,7 @@ module Dal = struct ignore_l1_config_peers disable_amplification ignore_topics + batching_configuration let commands = let open Tezos_clic in @@ -184,7 +187,8 @@ module Dal = struct verbose, ignore_l1_config_peers, disable_amplification, - ignore_topics ) + ignore_topics, + batching_configuration ) _cctxt -> let attester_profile = Option.value ~default:[] attester_profile in @@ -218,6 +222,7 @@ module Dal = struct ignore_l1_config_peers disable_amplification ignore_topics + batching_configuration in match options with | Ok options -> Cli.run cmd options diff --git a/src/lib_dal_node/cli.ml b/src/lib_dal_node/cli.ml index fbb4db92f842868ecd71ead87f3540ab89dfe9a0..a74ac644282930895cfbb2b6ec3a1ec0fe543edf 100644 --- a/src/lib_dal_node/cli.ml +++ b/src/lib_dal_node/cli.ml @@ -573,6 +573,30 @@ module Term = struct let ignore_topics = arg_list_to_cmdliner ignore_topics_arg + let batching_configuration_arg = + let open Configuration_file in + let pp fmt = function + | Disabled -> Format.fprintf fmt "disabled" + | Enabled {time_interval} -> Format.fprintf fmt "%d" time_interval + in + let parse = function + | "disabled" -> Ok Disabled + | s -> ( + try Ok (Enabled {time_interval = int_of_string s}) + with Failure _ -> Error "Unrecognized int") + in + make_arg + ~parse + ~doc: + "The time (in milliseconds) for which shards are accumulated by the \ + gossipsub automaton before triggering the validation of the batch. 
\ + \"disabled\" to deactivate the batching of shards." + ~placeholder:"INT|\"disabled\"" + ~pp + "batching-time-interval" + + let batching_configuration = arg_to_cmdliner batching_configuration_arg + let term process = Cmdliner.Term.( ret @@ -582,7 +606,7 @@ module Term = struct $ operator_profile $ observer_profile $ bootstrap_profile $ peers $ history_mode $ service_name $ service_namespace $ fetch_trusted_setup $ disable_shard_validation $ verbose $ ignore_l1_config_peers - $ disable_amplification $ ignore_topics)) + $ disable_amplification $ ignore_topics $ batching_configuration)) end type t = Run | Config_init | Config_update | Debug_print_store_schemas @@ -748,13 +772,15 @@ type options = { ignore_l1_config_peers : bool; disable_amplification : bool; ignore_topics : Signature.public_key_hash list; + batching_configuration : Configuration_file.batching_configuration option; } let cli_options_to_options data_dir config_file rpc_addr expected_pow listen_addr public_addr endpoint slots_backup_uris trust_slots_backup_uris metrics_addr attesters operators observers bootstrap_flag peers history_mode service_name service_namespace fetch_trusted_setup disable_shard_validation - verbose ignore_l1_config_peers disable_amplification ignore_topics = + verbose ignore_l1_config_peers disable_amplification ignore_topics + batching_configuration = let open Result_syntax in let profile = Controller_profiles.make ~attesters ~operators ?observers () in let* profile = @@ -800,6 +826,7 @@ let cli_options_to_options data_dir config_file rpc_addr expected_pow ignore_l1_config_peers; disable_amplification; ignore_topics; + batching_configuration; } let merge_experimental_features _ _configuration = () @@ -828,6 +855,7 @@ let merge ignore_l1_config_peers; disable_amplification; ignore_topics = _; + batching_configuration; } configuration = let profile = match profile with @@ -876,6 +904,10 @@ let merge configuration.ignore_l1_config_peers || ignore_l1_config_peers; 
disable_amplification = configuration.disable_amplification || disable_amplification; + batching_configuration = + Option.value + ~default:configuration.batching_configuration + batching_configuration; } let wrap_with_error main_promise = @@ -977,7 +1009,7 @@ let commands = metrics_addr attesters operators observers bootstrap_flag peers history_mode service_name service_namespace fetch_trusted_setup disable_shard_validation verbose ignore_l1_config_peers - disable_amplification ignore_pkhs = + disable_amplification ignore_pkhs batching_configuration = match cli_options_to_options data_dir @@ -1004,6 +1036,7 @@ let commands = ignore_l1_config_peers disable_amplification ignore_pkhs + batching_configuration with | Ok options -> main_run subcommand options | Error msg -> `Error msg diff --git a/src/lib_dal_node/cli.mli b/src/lib_dal_node/cli.mli index 809e481855417a218ff92325a52c2c2b530e4ffc..c1e746cfa71d7fcfe5fd30e71debce4ada63dd32 100644 --- a/src/lib_dal_node/cli.mli +++ b/src/lib_dal_node/cli.mli @@ -101,6 +101,8 @@ module Term : sig val verbose_switch : switch val ignore_topics_arg : Signature.public_key_hash arg_list + + val batching_configuration_arg : Configuration_file.batching_configuration arg end (** {2 Command-line options} *) @@ -154,6 +156,9 @@ type options = { (** Disable amplification. Default value is false. *) ignore_topics : Signature.public_key_hash list; (** Do not distribute shards of these pkhs. *) + batching_configuration : Configuration_file.batching_configuration option; + (** The configuration used for batching verification of received shards + via GossipSub to save cryptographic computation. *) } (** Subcommands that can be used by the DAL node. 
In the future this type @@ -186,6 +191,7 @@ val cli_options_to_options : bool -> bool -> Signature.public_key_hash list -> + Configuration_file.batching_configuration option -> (options, bool * string) result val run : t -> options -> unit tzresult Lwt.t diff --git a/src/lib_dal_node/configuration_file.ml b/src/lib_dal_node/configuration_file.ml index 61ed3a9667961eb5c727ff3de6ebe86b12df2b64..fb43e51c911477a2395024ab1ffe973b27ec9dcd 100644 --- a/src/lib_dal_node/configuration_file.ml +++ b/src/lib_dal_node/configuration_file.ml @@ -33,6 +33,8 @@ type neighbor = {addr : string; port : int} type history_mode = Rolling of {blocks : [`Auto | `Some of int]} | Full +type batching_configuration = Disabled | Enabled of {time_interval : int} + type experimental_features = unit let history_mode_encoding = @@ -64,6 +66,27 @@ let history_mode_encoding = (fun () -> Full); ] +let batching_configuration_encoding = + let open Data_encoding in + union + [ + case + ~title:"disabled" + ~description:"" + (Tag 0) + unit + (function Disabled -> Some () | Enabled _ -> None) + (fun () -> Disabled); + case + ~title:"enabled" + ~description:"" + (Tag 1) + int31 + (function + | Disabled -> None | Enabled {time_interval} -> Some time_interval) + (fun time_interval -> Enabled {time_interval}); + ] + type t = { data_dir : string; rpc_addr : P2p_point.Id.t; @@ -85,6 +108,7 @@ type t = { verbose : bool; ignore_l1_config_peers : bool; disable_amplification : bool; + batching_configuration : batching_configuration; } let default_data_dir = Filename.concat (Sys.getenv "HOME") ".tezos-dal-node" @@ -127,6 +151,19 @@ let default_experimental_features = () let default_fetch_trusted_setup = true +(* By default, when a shard is received, we wait for 0.1 seconds for other + shards of the same commitment before launching the cryptographic validation + of the shards. 
+ Since these shards are supposed to be received several levels in advance, + the risk that this 0.1 second delay makes the validation happen too late + is very low. + It also slows down gossiping a bit, since messages are advertised only after + validation, so if a message has to go through several nodes before reaching + its final destination, the waiting delay accumulates and may be of a few + seconds. It looks fine with 8 blocks of attestation lag and 6 seconds block + time but if those values are reduced a lot, this might become an issue.*) +let default_batching_configuration = Enabled {time_interval = 100} + let default = { data_dir = default_data_dir; @@ -149,6 +186,7 @@ let default = verbose = false; ignore_l1_config_peers = false; disable_amplification = false; + batching_configuration = default_batching_configuration; } let uri_encoding : Uri.t Data_encoding.t = @@ -192,48 +230,51 @@ let encoding : t Data_encoding.t = verbose; ignore_l1_config_peers; disable_amplification; + batching_configuration; } -> - ( ( data_dir, - rpc_addr, - listen_addr, - public_addr, - peers, - expected_pow, - endpoint, - slots_backup_uris, - trust_slots_backup_uris, - metrics_addr ), - ( history_mode, - profile, - version, - service_name, - service_namespace, - experimental_features, - fetch_trusted_setup, - verbose, - ignore_l1_config_peers, - disable_amplification ) )) - (fun ( ( data_dir, - rpc_addr, - listen_addr, - public_addr, - peers, - expected_pow, - endpoint, - slots_backup_uris, - trust_slots_backup_uris, - metrics_addr ), - ( history_mode, - profile, - version, - service_name, - service_namespace, - experimental_features, - fetch_trusted_setup, - verbose, - ignore_l1_config_peers, - disable_amplification ) ) + ( ( ( data_dir, + rpc_addr, + listen_addr, + public_addr, + peers, + expected_pow, + endpoint, + slots_backup_uris, + trust_slots_backup_uris, + metrics_addr ), + ( history_mode, + profile, + version, + service_name, + service_namespace, +
experimental_features, + fetch_trusted_setup, + verbose, + ignore_l1_config_peers, + disable_amplification ) ), + batching_configuration )) + (fun ( ( ( data_dir, + rpc_addr, + listen_addr, + public_addr, + peers, + expected_pow, + endpoint, + slots_backup_uris, + trust_slots_backup_uris, + metrics_addr ), + ( history_mode, + profile, + version, + service_name, + service_namespace, + experimental_features, + fetch_trusted_setup, + verbose, + ignore_l1_config_peers, + disable_amplification ) ), + batching_configuration ) -> { data_dir; @@ -256,109 +297,118 @@ let encoding : t Data_encoding.t = verbose; ignore_l1_config_peers; disable_amplification; + batching_configuration; }) (merge_objs - (obj10 - (dft - "data-dir" - ~description:"Location of the data dir" - string - default_data_dir) - (dft - "rpc-addr" - ~description:"RPC address" - P2p_point.Id.encoding - default_rpc_addr) - (dft - "net-addr" - ~description:"P2P listening address of this node" - P2p_point.Id.encoding - default_listen_addr) - (dft - "public-addr" - ~description:"P2P public address of this node" - P2p_point.Id.encoding - default_public_addr) - (dft - "peers" - ~description:"P2P addresses of remote peers" - (list string) - default_peers) - (dft - "expected-pow" - ~description:"Expected P2P identity's PoW" - float - default_expected_pow) - (dft - "endpoint" - ~description:"The Tezos node endpoint" - uri_encoding - default_endpoint) - (dft - "slots_backup_uris" - ~description:"Optional HTTP endpoints to fetch missing slots from." - (list uri_encoding) - []) - (dft - "trust_slots_backup_uris" - ~description: - "Whether to trust the data downlaoded from the provided HTTP \ - backup URIs." 
- bool - false) - (dft - "metrics-addr" - ~description:"The point for the DAL node metrics server" - (Encoding.option P2p_point.Id.encoding) - None)) - (obj10 - (dft - "history_mode" - ~description:"The history mode for the DAL node" - history_mode_encoding - default_history_mode) - (dft - "profiles" - ~description:"The Octez DAL node profiles" - Profile_manager.unresolved_encoding - Profile_manager.Empty) - (req "version" ~description:"The configuration file version" int31) - (dft - "service_name" - ~description:"Name of the service" - Data_encoding.string - default.service_name) - (dft - "service_namespace" - ~description:"Namespace for the service" - Data_encoding.string - default.service_namespace) - (dft - "experimental_features" - ~description:"Experimental features" - experimental_features_encoding - default_experimental_features) - (dft - "fetch_trusted_setup" - ~description:"Install trusted setup" - bool - true) - (dft - "verbose" - ~description: - "Whether to emit details about frequent logging events" - bool - default.verbose) - (dft - "ignore_l1_config_peers" - ~description:"Ignore the boot(strap) peers provided by L1" - bool - default.ignore_l1_config_peers) + (merge_objs + (obj10 + (dft + "data-dir" + ~description:"Location of the data dir" + string + default_data_dir) + (dft + "rpc-addr" + ~description:"RPC address" + P2p_point.Id.encoding + default_rpc_addr) + (dft + "net-addr" + ~description:"P2P listening address of this node" + P2p_point.Id.encoding + default_listen_addr) + (dft + "public-addr" + ~description:"P2P public address of this node" + P2p_point.Id.encoding + default_public_addr) + (dft + "peers" + ~description:"P2P addresses of remote peers" + (list string) + default_peers) + (dft + "expected-pow" + ~description:"Expected P2P identity's PoW" + float + default_expected_pow) + (dft + "endpoint" + ~description:"The Tezos node endpoint" + uri_encoding + default_endpoint) + (dft + "slots_backup_uris" + ~description: + "Optional HTTP 
endpoints to fetch missing slots from." + (list uri_encoding) + []) + (dft + "trust_slots_backup_uris" + ~description: + "Whether to trust the data downlaoded from the provided HTTP \ + backup URIs." + bool + false) + (dft + "metrics-addr" + ~description:"The point for the DAL node metrics server" + (Encoding.option P2p_point.Id.encoding) + None)) + (obj10 + (dft + "history_mode" + ~description:"The history mode for the DAL node" + history_mode_encoding + default_history_mode) + (dft + "profiles" + ~description:"The Octez DAL node profiles" + Profile_manager.unresolved_encoding + Profile_manager.Empty) + (req "version" ~description:"The configuration file version" int31) + (dft + "service_name" + ~description:"Name of the service" + Data_encoding.string + default.service_name) + (dft + "service_namespace" + ~description:"Namespace for the service" + Data_encoding.string + default.service_namespace) + (dft + "experimental_features" + ~description:"Experimental features" + experimental_features_encoding + default_experimental_features) + (dft + "fetch_trusted_setup" + ~description:"Install trusted setup" + bool + true) + (dft + "verbose" + ~description: + "Whether to emit details about frequent logging events" + bool + default.verbose) + (dft + "ignore_l1_config_peers" + ~description:"Ignore the boot(strap) peers provided by L1" + bool + default.ignore_l1_config_peers) + (dft + "disable_amplification" + ~description:"Disable amplification" + bool + default.disable_amplification))) + (obj1 (dft - "disable_amplification" - ~description:"Disable amplification" - bool - default.disable_amplification))) + "batching_configuration" + ~description:"Set the batching delay for shard verification" + batching_configuration_encoding + default.batching_configuration))) type error += DAL_node_unable_to_write_configuration_file of string diff --git a/src/lib_dal_node/configuration_file.mli b/src/lib_dal_node/configuration_file.mli index 
920aa5f6bca2d770e6184af00a9a8f5f3959e7fc..92aa24545ddaec4015a0cabd7a28d07d916141ec 100644 --- a/src/lib_dal_node/configuration_file.mli +++ b/src/lib_dal_node/configuration_file.mli @@ -34,6 +34,15 @@ type history_mode = profile. *) | Full (** [Full] keeps the shards forever *) +(** The configuration of the validation of shards in batch mode.*) +type batching_configuration = + | Disabled + (** [Disabled] enforces the validation of shards one by one at reception time. *) + | Enabled of {time_interval : int} + (** [Enabled {time_interval}] accumulates messages received during + [time_interval] milliseconds and verifies all shards related to the + same commitment in the same batch in one pass. *) + (** Configuration settings for experimental features, with no backward compatibility guarantees. *) type experimental_features = unit @@ -73,6 +82,9 @@ type t = { ignore_l1_config_peers : bool; (** Ignore the boot(strap) peers provided by L1. *) disable_amplification : bool; (** Disable amplification. *) + batching_configuration : batching_configuration; + (** The configuration of the batching of the shards. + The default is [Enabled{time_interval=100}]. *) } (** [default] is the default configuration. *) diff --git a/src/lib_dal_node/constants.ml b/src/lib_dal_node/constants.ml index a82b5250f10442a68f693432a1dd4ff7ba6c53b1..8f9f353c6e9821bbc8061bc9d8254eec512fccfa 100644 --- a/src/lib_dal_node/constants.ml +++ b/src/lib_dal_node/constants.ml @@ -90,15 +90,3 @@ let bootstrap_dns_refresh_delay = 300. acceptable since the cache is sparsely populated due to [proto_parameters.traps_fraction]. *) let traps_cache_size = 50 - -(* When a shard is received, we wait for 1 second for other shards before - launching the cryptographic validation of the shards. - Since there shards are supposed to be received several levels in advance, - the risk that this 1.2 second delay makes the validation happen too late - is very low. 
- It also slows down gossiping a bit, since messages are advertised only after - validation, so if a message has to go through several nodes before reaching - its final destination, the waiting delay accumulates and may be of a few - seconds. It looks fine with 8 blocks of attestation lag and 8 seconds block - time but if those values are reduced a lot, this might become an issue. *) -let batch_time_interval = Types.Span.of_float_s 1.2 diff --git a/src/lib_dal_node/constants.mli b/src/lib_dal_node/constants.mli index 9838529a7355d0ff1c170e37860a37e6847681ba..014c228644bb05534a08914135f7a86d09abe7aa 100644 --- a/src/lib_dal_node/constants.mli +++ b/src/lib_dal_node/constants.mli @@ -78,7 +78,3 @@ val bootstrap_dns_refresh_delay : float is acceptable since the cache is sparsely populated due to [proto_parameters.traps_fraction]. *) val traps_cache_size : int - -(** The time (in seconds) for which shards are accumulated by the gossipsub - automaton before triggering the validation of the batch. *) -val batch_time_interval : Types.Span.t diff --git a/src/lib_dal_node/daemon.ml b/src/lib_dal_node/daemon.ml index 91e7bad0fde9c266aef91e3faaaff859a962f60f..e082b90940141d49a2936be9569fd0423ecadc23 100644 --- a/src/lib_dal_node/daemon.ml +++ b/src/lib_dal_node/daemon.ml @@ -437,11 +437,18 @@ let run ?(disable_shard_validation = false) ~ignore_pkhs ~data_dir ~config_file {peer_id = identity.peer_id; maybe_reachable_point = public_addr} in let gs_worker = + let batching_interval = + match config.batching_configuration with + | Disabled -> None + | Enabled {time_interval} -> + let time_in_second = float_of_int time_interval /. 1000. 
in + Some (Types.Span.of_float_s time_in_second) + in Gossipsub.Worker.( make ~initial_points:get_initial_points ~events_logging:(Logging.event ~verbose:config.verbose) - ~batching_interval:Constants.batch_time_interval + ?batching_interval ~self rng limits @@ -547,12 +554,19 @@ let run ?(disable_shard_validation = false) ~ignore_pkhs ~data_dir ~config_file Node_context.warn_if_attesters_not_delegates ctxt profile | _ -> return_unit in - Gossipsub.Worker.Validate_message_hook.set_batch - (Message_validation.gossipsub_batch_validation - ctxt - cryptobox - ~head_level - proto_parameters) ; + let () = + match config.batching_configuration with + | Enabled _ -> + Gossipsub.Worker.Validate_message_hook.set_batch + (Message_validation.gossipsub_batch_validation + ctxt + cryptobox + ~head_level + proto_parameters) + | Disabled -> () + in + (* Even if the batch validation is activated, one has to register a per message + validation for the validation of message id. *) Gossipsub.Worker.Validate_message_hook.set (Message_validation.gossipsub_app_messages_validation ctxt