From 0d7c578c768337d963b504e0f7eb7e704dce1edb Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Mon, 16 Jun 2025 15:54:27 +0100 Subject: [PATCH 1/2] Gossipsub: Add profiler --- manifest/product_octez.ml | 4 + src/lib_dal_node/daemon.ml | 24 ++++- src/lib_dal_node/gossipsub/gossipsub.ml | 6 +- src/lib_dal_node/gossipsub/gossipsub.mli | 10 ++ src/lib_gossipsub/dune | 8 +- src/lib_gossipsub/gossipsub_profiler.ml | 18 ++++ src/lib_gossipsub/gossipsub_profiler.mli | 17 ++++ src/lib_gossipsub/gossipsub_worker.ml | 121 ++++++++++++++++------- src/lib_gossipsub/tezos_gossipsub.ml | 1 + src/lib_gossipsub/tezos_gossipsub.mli | 2 + 10 files changed, 169 insertions(+), 42 deletions(-) create mode 100644 src/lib_gossipsub/gossipsub_profiler.ml create mode 100644 src/lib_gossipsub/gossipsub_profiler.mli diff --git a/manifest/product_octez.ml b/manifest/product_octez.ml index 529337f7c11e..408ed562bc2f 100644 --- a/manifest/product_octez.ml +++ b/manifest/product_octez.ml @@ -2877,6 +2877,7 @@ let _tezt_self_tests = ~deps:[tezt_lib |> open_ |> open_ ~m:"Base"; tezt_tezos |> open_] let octez_gossipsub = + let (PPX {preprocess; preprocessor_deps}) = ppx_profiler in octez_lib "tezos-gossipsub" ~path:"src/lib_gossipsub" @@ -2891,7 +2892,10 @@ let octez_gossipsub = octez_stdlib_unix |> open_; octez_stdlib |> open_; octez_version; + octez_profiler |> open_; ] + ~preprocess + ~preprocessor_deps let _octez_gossipsub_test = tezt diff --git a/src/lib_dal_node/daemon.ml b/src/lib_dal_node/daemon.ml index f4b9d7f74f00..46df78f06d02 100644 --- a/src/lib_dal_node/daemon.ml +++ b/src/lib_dal_node/daemon.ml @@ -31,13 +31,22 @@ module Profiler = struct Dal_profiler.create_reset_block_section Dal_profiler.dal_profiler end +module Gossipsub_profiler = struct + include + (val Tezos_profiler.Profiler.wrap Gossipsub.Profiler.gossipsub_profiler) + + let[@warning "-32"] reset_block_section = + Gossipsub.Profiler.(create_reset_block_section gossipsub_profiler) +end + let[@warning "-32"] 
may_start_profiler data_dir = match Tezos_profiler_unix.Profiler_instance.selected_backends () with | Some backends -> List.iter (fun Tezos_profiler_unix.Profiler_instance.{instance_maker; _} -> let profiler_maker = instance_maker ~directory:data_dir in - Dal_profiler.init profiler_maker) + Dal_profiler.init profiler_maker ; + Gossipsub.Profiler.init profiler_maker) backends | None -> () @@ -83,7 +92,12 @@ let on_new_finalized_head ctxt cctxt crawler = match next_final_head with | None -> Lwt.fail_with "L1 crawler lib shut down" | Some (finalized_block_hash, finalized_shell_header) -> - () [@profiler.reset_block_section finalized_block_hash] ; + () + [@profiler.reset_block_section + {profiler_module = Profiler} finalized_block_hash] ; + () + [@profiler.reset_block_section + {profiler_module = Gossipsub_profiler} finalized_block_hash] ; let* () = (Block_handler.new_finalized_head ctxt @@ -93,7 +107,9 @@ let on_new_finalized_head ctxt cctxt crawler = finalized_block_hash finalized_shell_header ~launch_time - [@profiler.record_s {verbosity = Notice} "new_finalized_head"]) + [@profiler.record_s + {verbosity = Notice; profiler_module = Profiler} + "new_finalized_head"]) in loop () in @@ -267,7 +283,6 @@ let run ?(disable_logging = false) ?(disable_shard_validation = false) in Tezos_base_unix.Internal_event_unix.init ~config:internal_events () in - () [@profiler.overwrite may_start_profiler data_dir] ; let*! () = Event.emit_starting_node () in let* ({ rpc_addr; @@ -586,5 +601,6 @@ let run ?(disable_logging = false) ?(disable_shard_validation = false) (* Start never-ending monitoring daemons *) let version = Tezos_version_value.Bin_version.octez_version_string in let*! 
() = Event.emit_node_is_ready ~network_name ~version in + () [@profiler.overwrite may_start_profiler data_dir] ; let* () = daemonize [on_new_finalized_head ctxt cctxt crawler] in return_unit diff --git a/src/lib_dal_node/gossipsub/gossipsub.ml b/src/lib_dal_node/gossipsub/gossipsub.ml index 1ab97097c0b6..f8963d0b413f 100644 --- a/src/lib_dal_node/gossipsub/gossipsub.ml +++ b/src/lib_dal_node/gossipsub/gossipsub.ml @@ -24,14 +24,12 @@ (* *) (*****************************************************************************) -include Gs_interface - module Worker = struct module Config = Gs_interface.Worker_config module Default_parameters = Gs_default_parameters module Logging = Gs_logging include Gs_interface.Worker_instance - module Validate_message_hook = Validate_message_hook + module Validate_message_hook = Gs_interface.Validate_message_hook end module Transport_layer = struct @@ -232,3 +230,5 @@ end module Transport_layer_hooks = Gs_transport_connection let version = Transport_layer_interface.version + +module Profiler = Tezos_gossipsub.Profiler diff --git a/src/lib_dal_node/gossipsub/gossipsub.mli b/src/lib_dal_node/gossipsub/gossipsub.mli index 737f486eb276..8f2fd94aedf4 100644 --- a/src/lib_dal_node/gossipsub/gossipsub.mli +++ b/src/lib_dal_node/gossipsub/gossipsub.mli @@ -199,3 +199,13 @@ end (** [version ~network_name] returns the current version of the P2P. 
*) val version : network_name:Distributed_db_version.Name.t -> Network_version.t + +module Profiler : sig + open Tezos_profiler.Profiler + + val gossipsub_profiler : profiler + + val init : (name:string -> instance option) -> unit + + val create_reset_block_section : profiler -> Block_hash.t * metadata -> unit +end diff --git a/src/lib_gossipsub/dune b/src/lib_gossipsub/dune index 3816ea4e11a5..11fea8053909 100644 --- a/src/lib_gossipsub/dune +++ b/src/lib_gossipsub/dune @@ -14,7 +14,10 @@ octez-libs.base.unix octez-libs.stdlib-unix octez-libs.stdlib - octez-libs.version) + octez-libs.version + octez-libs.octez-profiler) + (preprocess (pps octez-libs.ppx_profiler)) + (preprocessor_deps (env_var TEZOS_PPX_PROFILER)) (flags (:standard) -open Tezos_error_monad @@ -22,4 +25,5 @@ -open Tezos_base.TzPervasives -open Tezos_base_unix -open Tezos_stdlib_unix - -open Tezos_stdlib)) + -open Tezos_stdlib + -open Tezos_profiler)) diff --git a/src/lib_gossipsub/gossipsub_profiler.ml b/src/lib_gossipsub/gossipsub_profiler.ml new file mode 100644 index 000000000000..0f6c2dd51a3c --- /dev/null +++ b/src/lib_gossipsub/gossipsub_profiler.ml @@ -0,0 +1,18 @@ +(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* Copyright (c) 2025 Trilitech *) +(* *) +(*****************************************************************************) + +open Profiler + +let gossipsub_profiler = unplugged () + +let init profiler_maker = + match profiler_maker ~name:"gossipsub" with + | Some instance -> plug gossipsub_profiler instance + | None -> () + +let create_reset_block_section = + Profiler.section_maker Block_hash.equal Block_hash.to_b58check diff --git a/src/lib_gossipsub/gossipsub_profiler.mli b/src/lib_gossipsub/gossipsub_profiler.mli new file mode 100644 index 000000000000..3336010602ae --- /dev/null +++ b/src/lib_gossipsub/gossipsub_profiler.mli @@ -0,0 +1,17 @@ 
+(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* Copyright (c) 2025 Trilitech *) +(* *) +(*****************************************************************************) + +open Profiler + +(** GossipSub worker profiler; it stays unplugged until [init] plugs it. *) +val gossipsub_profiler : profiler + +(** [init maker] plugs [gossipsub_profiler] into [maker ~name:"gossipsub"], if it returns an instance. *) +val init : (name:string -> instance option) -> unit + +(** [create_reset_block_section profiler] creates the function that resets the block section of [profiler] for a given block hash. *) +val create_reset_block_section : profiler -> Block_hash.t * metadata -> unit diff --git a/src/lib_gossipsub/gossipsub_worker.ml b/src/lib_gossipsub/gossipsub_worker.ml index 1389e3520bc1..30e181cf5d03 100644 --- a/src/lib_gossipsub/gossipsub_worker.ml +++ b/src/lib_gossipsub/gossipsub_worker.ml @@ -24,6 +24,8 @@ (* *) (*****************************************************************************) +module Profiler = (val Profiler.wrap Gossipsub_profiler.gossipsub_profiler) + (* FIXME: https://gitlab.com/tezos/tezos/-/issues/5165 Add coverage unit tests *) @@ -745,52 +747,87 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : let receive_message = {GS.sender = from_peer; topic; message_id; message} in - GS.handle_receive_message receive_message gossip_state + (GS.handle_receive_message receive_message gossip_state |> update_gossip_state state - |> handle_receive_message receive_message + |> handle_receive_message receive_message) + [@profiler.span_f + {verbosity = Notice} + ["apply_event"; "P2P_input"; "In_message"; "Message_with_header"]] | Graft {topic} -> let graft : GS.graft = {peer = from_peer; topic} in - GS.handle_graft graft gossip_state + (GS.handle_graft graft gossip_state |> update_gossip_state state - |> handle_graft from_peer topic + |> handle_graft from_peer topic) + [@profiler.span_f + 
{verbosity = Notice} + ["apply_event"; "P2P_input"; "In_message"; "Graft"]] | Subscribe {topic} -> let subscribe :
GS.subscribe = {peer = from_peer; topic} in - GS.handle_subscribe subscribe gossip_state - |> update_gossip_state state |> handle_subscribe + (GS.handle_subscribe subscribe gossip_state + |> update_gossip_state state |> handle_subscribe) + [@profiler.span_f + {verbosity = Notice} + ["apply_event"; "P2P_input"; "In_message"; "Subscribe"]] | Unsubscribe {topic} -> let unsubscribe : GS.unsubscribe = {peer = from_peer; topic} in - GS.handle_unsubscribe unsubscribe gossip_state - |> update_gossip_state state |> handle_unsubscribe + (GS.handle_unsubscribe unsubscribe gossip_state + |> update_gossip_state state |> handle_unsubscribe) + [@profiler.span_f + {verbosity = Notice} + ["apply_event"; "P2P_input"; "In_message"; "Unsubscribe"]] | IHave {topic; message_ids} -> (* The automaton should guarantee that the list is not empty. *) let ihave : GS.ihave = {peer = from_peer; topic; message_ids} in - GS.handle_ihave ihave gossip_state - |> update_gossip_state state |> handle_ihave ihave + (GS.handle_ihave ihave gossip_state + |> update_gossip_state state |> handle_ihave ihave) + [@profiler.span_f + {verbosity = Notice} + ["apply_event"; "P2P_input"; "In_message"; "IHave"]] | IWant {message_ids} -> (* The automaton should guarantee that the list is not empty. 
*) let iwant : GS.iwant = {peer = from_peer; message_ids} in - GS.handle_iwant iwant gossip_state - |> update_gossip_state state |> handle_iwant iwant + (GS.handle_iwant iwant gossip_state + |> update_gossip_state state |> handle_iwant iwant) + [@profiler.span_f + {verbosity = Notice} + ["apply_event"; "P2P_input"; "In_message"; "IWant"]] | Prune {topic; px; backoff} -> let prune : GS.prune = {peer = from_peer; topic; px; backoff} in - GS.handle_prune prune gossip_state + (GS.handle_prune prune gossip_state |> update_gossip_state state - |> handle_prune ~self ~from_peer px + |> handle_prune ~self ~from_peer px) + [@profiler.span_f + {verbosity = Notice} + ["apply_event"; "P2P_input"; "In_message"; "Prune"]] | Ping -> (* We treat [Ping] message as a no-op and return the current [state]. *) state + [@profiler.span_f + {verbosity = Notice} + ["apply_event"; "P2P_input"; "In_message"; "Ping"]] (** Handling events received from P2P layer. *) let apply_p2p_event ~self ({gossip_state; _} as state) = function - | New_connection {peer; direct; trusted; bootstrap} -> - GS.add_peer {direct; outbound = trusted; peer; bootstrap} gossip_state - |> update_gossip_state state - |> handle_new_connection peer ~bootstrap ~trusted - | Disconnection {peer} -> - GS.remove_peer {peer} gossip_state - |> update_gossip_state state |> handle_disconnection peer + | New_connection {peer; direct; trusted; bootstrap} -> ( + ((GS.add_peer {direct; outbound = trusted; peer; bootstrap} gossip_state + |> update_gossip_state state + |> handle_new_connection peer ~bootstrap ~trusted) + [@profiler.span_f + {verbosity = Notice} ["apply_event"; "P2P_input"; "New_connection"]]) + ) + | Disconnection {peer} -> ( + ((GS.remove_peer {peer} gossip_state + |> update_gossip_state state |> handle_disconnection peer) + [@profiler.span_f + {verbosity = Notice} ["apply_event"; "P2P_input"; "Disconnection"]])) | In_message {from_peer; p2p_message} -> - apply_p2p_message ~self state from_peer p2p_message + 
apply_p2p_message + ~self + state + from_peer + p2p_message + [@profiler.span_f + {verbosity = Notice} ["apply_event"; "P2P_input"; "In_message"]] let rec check_unknown_messages_id state = match Bounded_message_map.remove_min state.unknown_validity_messages with @@ -838,16 +875,30 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : (* FIXME: https://gitlab.com/tezos/tezos/-/issues/5326 Notify the GS worker about the status of messages sent to peers. *) - | Heartbeat -> - (* TODO: https://gitlab.com/tezos/tezos/-/issues/5170 - - Do we want to detect cases where two successive [Heartbeat] events - would be handled (e.g. because the first one is late)? *) - GS.heartbeat gossip_state |> update_gossip_state state - |> handle_heartbeat - | P2P_input event -> apply_p2p_event ~self state event - | App_input event -> apply_app_event state event - | Check_unknown_messages -> check_unknown_messages_id state + | Heartbeat -> ( + (((* TODO: https://gitlab.com/tezos/tezos/-/issues/5170 + + Do we want to detect cases where two successive [Heartbeat] events + would be handled (e.g. because the first one is late)? 
*) + GS.heartbeat gossip_state + |> update_gossip_state state |> handle_heartbeat) + [@profiler.span_f {verbosity = Notice} ["apply_event"; "Heartbeat"]])) + | P2P_input event -> + apply_p2p_event + ~self + state + event + [@profiler.span_f {verbosity = Notice} ["apply_event"; "P2P_input"]] + | App_input event -> + apply_app_event + state + event + [@profiler.span_f {verbosity = Notice} ["apply_event"; "App_input"]] + | Check_unknown_messages -> + check_unknown_messages_id + state + [@profiler.span_f + {verbosity = Notice} ["apply_event"; "Check_unknown_messages"]] (** A helper function that pushes events in the state *) let push e {status = _; state; self = _} = Stream.push e state.events_stream @@ -894,7 +945,11 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : if !shutdown then return () else let* () = events_logging event in - t.state <- apply_event ~self:t.self t.state event ; + t.state <- + (apply_event + ~self:t.self + t.state + event [@profiler.span_f {verbosity = Notice} ["apply_event"]]) ; loop t in let promise = loop t in diff --git a/src/lib_gossipsub/tezos_gossipsub.ml b/src/lib_gossipsub/tezos_gossipsub.ml index 450e91a8fb11..90a3855dbf42 100644 --- a/src/lib_gossipsub/tezos_gossipsub.ml +++ b/src/lib_gossipsub/tezos_gossipsub.ml @@ -26,3 +26,4 @@ module Gossipsub_intf = Gossipsub_intf module Automaton = Gossipsub_automaton.Make module Worker = Gossipsub_worker.Make +module Profiler = Gossipsub_profiler diff --git a/src/lib_gossipsub/tezos_gossipsub.mli b/src/lib_gossipsub/tezos_gossipsub.mli index cd8032a6f804..56da9454196e 100644 --- a/src/lib_gossipsub/tezos_gossipsub.mli +++ b/src/lib_gossipsub/tezos_gossipsub.mli @@ -42,3 +42,5 @@ module Worker (C : Gossipsub_intf.WORKER_CONFIGURATION) : and module Monad = C.Monad and module Stream = C.Stream and module Point = C.Point + +module Profiler = Gossipsub_profiler -- GitLab From 1ce939bc0352e9bbedeb19c4297e47e8fb42edb6 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Mon, 16 Jun 2025 
16:25:36 +0100 Subject: [PATCH 2/2] Dal_node: Add profiling call to RPC post_slot --- src/lib_dal_node/dal_node_client.ml | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/lib_dal_node/dal_node_client.ml b/src/lib_dal_node/dal_node_client.ml index 7e00ba30217d..b84561ee3802 100644 --- a/src/lib_dal_node/dal_node_client.ml +++ b/src/lib_dal_node/dal_node_client.ml @@ -25,6 +25,8 @@ open Tezos_dal_node_services +module Profiler = (val Profiler.wrap Dal_profiler.dal_profiler) + class type cctxt = object inherit Tezos_rpc.Context.generic end @@ -70,4 +72,9 @@ let post_slot cctxt ?slot_index slot = method slot_index = slot_index end in - call cctxt Services.post_slot () query slot + (call + cctxt + Services.post_slot + () + query + slot [@profiler.aggregate_s {verbosity = Notice} "post_slot"]) -- GitLab