diff --git a/src/bin_dal_node/cli.ml b/src/bin_dal_node/cli.ml index 34e0a917f9a28963f24ea95be60545437bdb3297..d6cc8d17391c371c8a4764c712dd63758e84ae67 100644 --- a/src/bin_dal_node/cli.ml +++ b/src/bin_dal_node/cli.ml @@ -343,6 +343,13 @@ module Term = struct & opt (some bool) None & info ~docs ~doc ~docv:"true | false" ["fetch-trusted-setup"]) + let verbose = + let open Cmdliner in + let doc = + "Controls the verbosity of some emitted events. Default value is false." + in + Arg.(value & flag & info ~docs ~doc ["verbose"]) + (* Experimental features. *) let sqlite3_backend = @@ -364,7 +371,7 @@ module Term = struct $ public_addr $ endpoint $ metrics_addr $ attester_profile $ producer_profile $ observer_profile $ bootstrap_profile $ peers $ history_mode $ service_name $ service_namespace $ sqlite3_backend - $ fetch_trusted_setup)) + $ fetch_trusted_setup $ verbose)) end type t = Run | Config_init | Config_update | Debug_print_store_schemas @@ -522,13 +529,14 @@ type options = { service_namespace : string option; experimental_features : experimental_features; fetch_trusted_setup : bool option; + verbose : bool; } let make ~run = let run subcommand data_dir rpc_addr expected_pow listen_addr public_addr endpoint metrics_addr attesters producers observers bootstrap_flag peers history_mode service_name service_namespace sqlite3_backend - fetch_trusted_setup = + fetch_trusted_setup verbose = let run profile = run subcommand @@ -547,6 +555,7 @@ let make ~run = service_namespace; experimental_features = {sqlite3_backend}; fetch_trusted_setup; + verbose; } in let profile = Operator_profile.make ~attesters ~producers ?observers () in diff --git a/src/bin_dal_node/cli.mli b/src/bin_dal_node/cli.mli index 1d9df010f2a0ad50f0476b2ba25dce9d9a0cd41b..ce84edb74b54d10447acfed05d91450bb31160f1 100644 --- a/src/bin_dal_node/cli.mli +++ b/src/bin_dal_node/cli.mli @@ -57,7 +57,9 @@ type options = { experimental_features : experimental_features; (** Experimental features.
*) fetch_trusted_setup : bool option; (** Should the trusted setup be installed if required and invalid? - In case of [None] at init it is considered as yes.*) + In case of [None] at init it is considered as yes.*) + verbose : bool; + (** Do not aggregate frequent Gossipsub events. Default value is false. *) } (** Subcommands that can be used by the DAL node. In the future this type diff --git a/src/bin_dal_node/configuration_file.ml b/src/bin_dal_node/configuration_file.ml index deb937903363ececa72b04e40dcfc8dc2a5168c6..732a6bd3d3429f8800c28077895c9a686f6b8c96 100644 --- a/src/bin_dal_node/configuration_file.ml +++ b/src/bin_dal_node/configuration_file.ml @@ -81,6 +81,7 @@ type t = { service_namespace : string option; experimental_features : experimental_features; fetch_trusted_setup : bool; + verbose : bool; } let default_data_dir = Filename.concat (Sys.getenv "HOME") ".tezos-dal-node" @@ -143,6 +144,7 @@ let default = service_namespace = None; experimental_features = default_experimental_features; fetch_trusted_setup = default_fetch_trusted_setup; + verbose = false; } let neighbor_encoding : neighbor Data_encoding.t = @@ -198,6 +200,7 @@ let encoding : t Data_encoding.t = service_namespace; experimental_features; fetch_trusted_setup; + verbose; } -> ( ( data_dir, rpc_addr, @@ -215,7 +218,8 @@ let encoding : t Data_encoding.t = service_name, service_namespace, experimental_features, - fetch_trusted_setup ) )) + fetch_trusted_setup, + verbose ) )) (fun ( ( data_dir, rpc_addr, listen_addr, @@ -232,7 +236,8 @@ let encoding : t Data_encoding.t = service_name, service_namespace, experimental_features, - fetch_trusted_setup ) ) -> + fetch_trusted_setup, + verbose ) ) -> { data_dir; rpc_addr; @@ -251,6 +256,7 @@ let encoding : t Data_encoding.t = service_namespace; experimental_features; fetch_trusted_setup; + verbose; }) (merge_objs (obj10 @@ -304,7 +310,7 @@ let encoding : t Data_encoding.t = ~description:"The point for the DAL node metrics server" (Encoding.option
P2p_point.Id.encoding) None)) - (obj7 + (obj8 (dft "history_mode" ~description:"The history mode for the DAL node" @@ -335,7 +341,13 @@ let encoding : t Data_encoding.t = "fetch_trusted_setup" ~description:"Install trusted setup" bool - true))) + true) + (dft + "verbose" + ~description: + "Whether to emit details about frequent logging events" + bool + default.verbose))) module V0 = struct type t = { @@ -490,6 +502,7 @@ let from_v0 v0 = service_namespace = None; experimental_features = default_experimental_features; fetch_trusted_setup = true; + verbose = false; } type error += DAL_node_unable_to_write_configuration_file of string diff --git a/src/bin_dal_node/configuration_file.mli b/src/bin_dal_node/configuration_file.mli index 927998aa311f717a79ac15db32a14fb2db3b3840..3c7f90908a56b36b6b98592aae5dea7494584d8c 100644 --- a/src/bin_dal_node/configuration_file.mli +++ b/src/bin_dal_node/configuration_file.mli @@ -68,6 +68,9 @@ type t = { experimental_features : experimental_features; (** Experimental features. *) fetch_trusted_setup : bool; (** Should the trusted setup be downloaded if not found or has invalid hash. *) + verbose : bool; + (** Whether to emit one event per frequent Gossipsub event (Graft and Prune + messages, notifications to the application) or only periodic summaries. *) } (** [default] is the default configuration.
*) diff --git a/src/bin_dal_node/daemon.ml b/src/bin_dal_node/daemon.ml index 974828fba2016c3c6fdaa182cf2a3cb3f4cbb86a..f98178d717c0d82e080796acee5b4f9460fa2a01 100644 --- a/src/bin_dal_node/daemon.ml +++ b/src/bin_dal_node/daemon.ml @@ -807,7 +807,7 @@ let update_timing_shard_received node_ctxt shards_timing_table slot_id timing let connect_gossipsub_with_p2p proto_parameters gs_worker transport_layer - node_store node_ctxt amplificator = + node_store node_ctxt amplificator ~verbose = let open Gossipsub in let timing_table_size = 2 * proto_parameters.Types.attestation_lag @@ -865,7 +865,8 @@ let connect_gossipsub_with_p2p proto_parameters gs_worker transport_layer Transport_layer_hooks.activate gs_worker transport_layer - ~app_messages_callback:(shards_handler node_store)) + ~app_messages_callback:(shards_handler node_store) + ~verbose) (fun exn -> "[dal_node] error in Daemon.connect_gossipsub_with_p2p: " ^ Printexc.to_string exn @@ -1321,7 +1322,7 @@ let run ~data_dir ~configuration_override = Gossipsub.Worker.( make ~bootstrap_points:get_bootstrap_points - ~events_logging:Logging.event + ~events_logging:(Logging.event ~verbose:config.verbose) ~self rng limits @@ -1509,7 +1510,8 @@ let run ~data_dir ~configuration_override = transport_layer shards_store ctxt - amplificator ; + amplificator + ~verbose:config.verbose ; let*!
() = Gossipsub.Transport_layer.activate ~additional_points:points transport_layer in diff --git a/src/bin_dal_node/main.ml b/src/bin_dal_node/main.ml index 3e71eb6307ea96893a87b6346845f553909eee7c..874703f52d289b69e5fa0847f69e162f3764a409 100644 --- a/src/bin_dal_node/main.ml +++ b/src/bin_dal_node/main.ml @@ -44,6 +44,7 @@ let merge service_namespace; experimental_features; fetch_trusted_setup; + verbose; } configuration = let profile = match profile with @@ -81,6 +82,7 @@ let merge merge_experimental_features experimental_features configuration.experimental_features; + verbose = configuration.verbose || verbose (* the CLI flag can only enable verbosity *); } let wrap_with_error main_promise = diff --git a/src/lib_dal_node/gossipsub/gossipsub.mli b/src/lib_dal_node/gossipsub/gossipsub.mli index aff50c6c985e9edad9c25c17d5760ab8cc79b6db..e57488c616500552103963b555efe2c02d982373 100644 --- a/src/lib_dal_node/gossipsub/gossipsub.mli +++ b/src/lib_dal_node/gossipsub/gossipsub.mli @@ -56,7 +56,7 @@ module Worker : sig and type Point.t = Types.Point.t module Logging : sig - val event : event -> unit Monad.t + val event : verbose:bool -> event -> unit Monad.t end (** A hook to set or update messages and messages IDs validation @@ -196,6 +196,7 @@ module Transport_layer_hooks : sig Transport_layer.t -> app_messages_callback: (Types.Message.t -> Types.Message_id.t -> unit tzresult Lwt.t) -> + verbose:bool -> unit Lwt.t end diff --git a/src/lib_dal_node/gossipsub/gs_logging.ml b/src/lib_dal_node/gossipsub/gs_logging.ml index b93d584eb8c961f76391a49fec4712dbbb2f9b9e..4cbd3dbeebb654653d7dbcb98231d0d7298bd122 100644 --- a/src/lib_dal_node/gossipsub/gs_logging.ml +++ b/src/lib_dal_node/gossipsub/gs_logging.ml @@ -169,6 +169,26 @@ module Events = struct ("backoff", Types.Span.encoding) ("px", list P2p_peer.Id.encoding) + let prune_short = + declare_2 + ~section + ~name:(prefix "prune_short") + ~msg:"Process Prune {peer} for {count} topics" + ~level:Info + ~pp1:P2p_peer.Id.pp + ("peer", P2p_peer.Id.encoding) + ("count",
Data_encoding.int31) + + let graft_short = + declare_2 + ~section + ~name:(prefix "graft_short") + ~msg:"Process Graft {peer} for {count} topics" + ~level:Info + ~pp1:P2p_peer.Id.pp + ("peer", P2p_peer.Id.encoding) + ("count", Data_encoding.int31) + let ihave = declare_3 ~section @@ -195,8 +215,71 @@ module Events = struct ("message_ids", list Types.Message_id.encoding) end -let event = +(* Bounded LRU map counting received Graft/Prune messages per remote peer *) +module Frequent_messages_cache = + Aches.Vache.Map (Aches.Vache.LRU_Sloppy) (Aches.Vache.Strong) + (struct + include Types.Peer + + let hash p = P2p_peer.Id.hash p.peer_id + end) + +let emit_short_events_never_ending_promise cache frequency short_event = + let open Lwt_syntax in + let emit_short_events () = + let stats = + Frequent_messages_cache.fold + (fun key value accu -> (key, value) :: accu) + cache + [] + in + Frequent_messages_cache.clear cache ; + List.iter_s + (fun (from_peer, count_messages) -> + Events.emit short_event (from_peer.Types.Peer.peer_id, count_messages)) + stats + in + let rec loop () = + let* () = Lwt_unix.sleep frequency in + let* () = emit_short_events () in + loop () + in + let p = + Lwt.catch (fun () -> loop ()) (function _exn -> emit_short_events ()) + in + let _id : Lwt_exit.clean_up_callback_id = + Lwt_exit.register_clean_up_callback ~loc:__LOC__ @@ fun _exit_status -> + Lwt.cancel p |> return + in + p + +let update_short_events_cache cache from_peer = + let count_messages = + Frequent_messages_cache.find_opt cache from_peer |> Option.value ~default:0 + in + Frequent_messages_cache.replace cache from_peer (count_messages + 1) ; + Lwt.return_unit + +let event ~verbose = let open Events in + (* The prune_backoff used for bootstrap DAL nodes is 10 seconds. We take half + of that value as the amount of time to wait before emitting an aggregated + graft or prune event. *) + let print_short_events_every = 5.
in + let frequent_graft_messages_cache = Frequent_messages_cache.create 1000 in + let frequent_prune_messages_cache = Frequent_messages_cache.create 1000 in + let _emit_short_grafts_never_ending_promise : unit Lwt.t = + (* No aggregation loop is needed when every event is emitted directly. *) if verbose then Lwt.return_unit else emit_short_events_never_ending_promise + frequent_graft_messages_cache + print_short_events_every + graft_short + in + let _emit_short_prunes_never_ending_promise : unit Lwt.t = + if verbose then Lwt.return_unit else emit_short_events_never_ending_promise + frequent_prune_messages_cache + print_short_events_every + prune_short + in function | Check_unknown_messages -> emit check_unknown_messages () | Heartbeat -> emit heartbeat () @@ -217,15 +300,25 @@ let event = emit message_with_header (from_peer.peer_id, topic, message_id) | Subscribe {topic} -> emit subscribe (from_peer.peer_id, topic) | Unsubscribe {topic} -> emit unsubscribe (from_peer.peer_id, topic) - | Graft {topic} -> emit graft (from_peer.peer_id, topic) + | Graft {topic} -> + if not verbose then + update_short_events_cache + frequent_graft_messages_cache + from_peer + else emit graft (from_peer.peer_id, topic) | Prune {topic; px; backoff} -> - emit - prune - ( from_peer.peer_id, - topic, - backoff, - List.of_seq px - |> List.map (fun Types.Peer.{peer_id; _} -> peer_id) ) + if not verbose then + update_short_events_cache + frequent_prune_messages_cache + from_peer + else + emit + prune + ( from_peer.peer_id, + topic, + backoff, + List.of_seq px + |> List.map (fun Types.Peer.{peer_id; _} -> peer_id) ) | IHave {topic; message_ids} -> emit ihave (from_peer.peer_id, topic, message_ids) | IWant {message_ids} -> emit iwant (from_peer.peer_id, message_ids))) diff --git a/src/lib_dal_node/gossipsub/gs_logging.mli b/src/lib_dal_node/gossipsub/gs_logging.mli index 1b8e61a76b0fe7661a292240c6063c351586636c..0bd320b30c0edd2d66a5cd78d5eef14cd3f0e8e8 100644 --- a/src/lib_dal_node/gossipsub/gs_logging.mli +++ b/src/lib_dal_node/gossipsub/gs_logging.mli @@ -24,5 +24,7 @@ (* *)
(*****************************************************************************) -(** [event e] logs the event [e] of the Gossipsub worker *) -val event : Gs_interface.Worker_instance.event -> unit Lwt.t +(** [event ~verbose e] logs the event [e] of the Gossipsub worker. When + [verbose] is unset, Graft and Prune messages are counted per peer and + only logged periodically as aggregated events. *) +val event : verbose:bool -> Gs_interface.Worker_instance.event -> unit Lwt.t diff --git a/src/lib_dal_node/gossipsub/gs_transport_connection.ml b/src/lib_dal_node/gossipsub/gs_transport_connection.ml index 0846dfcf0af04409797c569503d1e2b85473be72..b99bcde7125bf2469e5a8bd5dd7135505b5307d0 100644 --- a/src/lib_dal_node/gossipsub/gs_transport_connection.ml +++ b/src/lib_dal_node/gossipsub/gs_transport_connection.ml @@ -54,6 +54,14 @@ module Events = struct ~pp1:Worker.GS.Message_id.pp ("message_id", Types.Message_id.encoding) + let messages_notified_to_app_short = + declare_1 + ~section + ~name:(prefix "messages_notified_to_app_short") + ~msg:"Successfully notified {count} messages to the application" + ~level:Info + ("count", Data_encoding.int31) + let app_message_callback_failed = declare_2 ~section @@ -287,23 +295,46 @@ let transport_layer_inputs_handler gs_worker p2p_layer = (** This loop pops messages from application output stream and calls the given [app_messages_callback] on them. *) -let app_messages_handler gs_worker ~app_messages_callback = - let open Lwt_syntax in - let rec loop app_output_stream = - let* Worker.{message; message_id; topic = _} = - Worker.Stream.pop app_output_stream +let app_messages_handler = + (* The min block time used currently on test networks is 4 secs. We take half + of that value as the amount of time to wait before emitting a + messages_notified_to_app_short event. *) + let print_short_events_every = 2.
in + let message_notified_count = (* [Some (count, window_start)]; only flushed when a later message arrives *) ref None in + let may_emit_short_message_notified_to_app_event () = + let now = Unix.gettimeofday () in + let ((count_messages, last_reset) as new_data) = + match !message_notified_count with + | None -> (1, now) + | Some (count, last_reset) -> (count + 1, last_reset) in - let* res = app_messages_callback message message_id in - let* () = - match res with - | Ok () -> Events.(emit message_notified_to_app message_id) - | Error err -> Events.(emit app_message_callback_failed (message_id, err)) - in - loop app_output_stream + if now -. last_reset <= print_short_events_every then ( + message_notified_count := Some new_data ; + Lwt.return_unit) + else ( + message_notified_count := None ; + Events.(emit messages_notified_to_app_short count_messages)) in - Worker.app_output_stream gs_worker |> loop + fun gs_worker ~app_messages_callback ~verbose -> + let open Lwt_syntax in + let rec loop app_output_stream = + let* Worker.{message; message_id; topic = _} = + Worker.Stream.pop app_output_stream + in + let* res = app_messages_callback message message_id in + let* () = + match res with + | Ok () -> + if not verbose then may_emit_short_message_notified_to_app_event () + else Events.(emit message_notified_to_app message_id) + | Error err -> + Events.(emit app_message_callback_failed (message_id, err)) + in + loop app_output_stream + in + Worker.app_output_stream gs_worker |> loop -let activate gs_worker p2p_layer ~app_messages_callback = +let activate gs_worker p2p_layer ~app_messages_callback ~verbose = (* Register a handler to notify new P2P connections to GS.
*) let () = new_connections_handler gs_worker p2p_layer @@ -315,5 +346,5 @@ let activate gs_worker p2p_layer ~app_messages_callback = [ gs_worker_p2p_output_handler gs_worker p2p_layer; transport_layer_inputs_handler gs_worker p2p_layer; - app_messages_handler gs_worker ~app_messages_callback; + app_messages_handler gs_worker ~app_messages_callback ~verbose; ] diff --git a/src/lib_dal_node/gossipsub/gs_transport_connection.mli b/src/lib_dal_node/gossipsub/gs_transport_connection.mli index e9f7c2bc8deaf1e45cb87fc7264b6c9f932e57cf..e6d0eaf1eecb9e0eda3e81659f6042e76e06b75e 100644 --- a/src/lib_dal_node/gossipsub/gs_transport_connection.mli +++ b/src/lib_dal_node/gossipsub/gs_transport_connection.mli @@ -24,7 +24,7 @@ (* *) (*****************************************************************************) -(** [activate gs_worker transport_layer ~app_messages_handler] connects the +(** [activate gs_worker transport_layer ~app_messages_handler ~verbose] connects the given [gs_worker] and [transport_layer]. (Dis)connections and messages of the transport layer are forwarded to the GS worker. P2P output messages and (dis)connection requests are forwarded from the GS worker to the transport @@ -32,7 +32,9 @@ The [app_messages_handler] is invoked when some application messages are put by the Gossipsub worker in the application output stream. -*) + + The [verbose] flag controls the amount of events produced for some frequent + GS messages like the notification of messages to the application layer.
*) val activate : Gs_interface.Worker_instance.t -> ( Gs_interface.Worker_instance.p2p_message, @@ -41,4 +43,5 @@ val activate : P2p.t -> app_messages_callback: (Types.Message.t -> Types.Message_id.t -> unit tzresult Lwt.t) -> + verbose:bool -> unit Lwt.t diff --git a/tezt/lib_tezos/dal_node.ml b/tezt/lib_tezos/dal_node.ml index 4553bb62ca72d5547f9285902306718b9e2433f8..44f77e1b68f03fac7c542a3f81be7a3a775fa8c2 100644 --- a/tezt/lib_tezos/dal_node.ml +++ b/tezt/lib_tezos/dal_node.ml @@ -388,7 +388,7 @@ let run ?env ?event_level node = ?env ?event_level node - ["run"; "--data-dir"; node.persistent_state.data_dir] + ["run"; "--verbose"; "--data-dir"; node.persistent_state.data_dir] let run ?(wait_ready = true) ?env ?event_level node = let* () = run ?env ?event_level node in