From 52203fdacf5eb1b0c6d2d4ae5f9c21006062c0a4 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Wed, 12 Nov 2025 17:18:47 +0000 Subject: [PATCH] Alpha/Baker: Add profiling for DAL attestable slots worker --- .../lib_delegate/baking_actions.ml | 14 ++++-- .../lib_delegate/baking_profiler.ml | 3 ++ .../dal_attestable_slots_worker.ml | 45 +++++++++++++++++-- 3 files changed, 54 insertions(+), 8 deletions(-) diff --git a/src/proto_alpha/lib_delegate/baking_actions.ml b/src/proto_alpha/lib_delegate/baking_actions.ml index f1e8140cc916..d7077fec6d4f 100644 --- a/src/proto_alpha/lib_delegate/baking_actions.ml +++ b/src/proto_alpha/lib_delegate/baking_actions.ml @@ -544,10 +544,16 @@ let may_get_dal_content state consensus_vote = ~default_value:None (fun _dal_node_rpc_ctxt -> let*! dal_attestable_slots = - Dal_attestable_slots_worker.get_dal_attestable_slots - state.global_state.dal_attestable_slots_worker - ~delegate_id - ~attestation_level:level + (Dal_attestable_slots_worker.get_dal_attestable_slots + state.global_state.dal_attestable_slots_worker + ~delegate_id + ~attestation_level:level + [@profiler.record_s + {verbosity = Debug} + (Format.asprintf + "get_dal_attestable_slots - delegate_id : %a" + Delegate_id.pp + delegate_id)]) in process_dal_rpc_result state delegate_id level round dal_attestable_slots) diff --git a/src/proto_alpha/lib_delegate/baking_profiler.ml b/src/proto_alpha/lib_delegate/baking_profiler.ml index 1f476de4d111..e1ab2d1af4ee 100644 --- a/src/proto_alpha/lib_delegate/baking_profiler.ml +++ b/src/proto_alpha/lib_delegate/baking_profiler.ml @@ -11,6 +11,8 @@ let nonce_profiler = unplugged () let operation_worker_profiler = unplugged () +let dal_attestable_slots_worker_profiler = unplugged () + let node_rpc_profiler = unplugged () (* This is the main profiler for the baker *) @@ -24,6 +26,7 @@ let all_profilers = [ ("nonce", [nonce_profiler]); ("op_worker", [operation_worker_profiler]); + ("dal_slots_worker", [dal_attestable_slots_worker_profiler]); ("node_rpc", [node_rpc_profiler]); ("baker", [baker_profiler; environment_profiler]); ] diff --git a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml index c19136dac688..1fbd736af672 100644 --- a/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml +++ b/src/proto_alpha/lib_delegate/dal_attestable_slots_worker.ml @@ -8,6 +8,9 @@ open Baking_state_types open Tezos_dal_node_services +module Profiler = + (val Profiler.wrap Baking_profiler.dal_attestable_slots_worker_profiler) + module Events = struct include Internal_event.Simple @@ -157,7 +160,12 @@ let update_cache_backfill_payload state ~delegate_id ~backfill_payload = let E.{slot_ids; no_shards_attestation_levels} = backfill_payload in List.iter (fun slot_id -> - update_cache_with_attestable_slot state ~delegate_id ~slot_id) + (update_cache_with_attestable_slot + state + ~delegate_id + ~slot_id + [@profiler.record_f + {verbosity = Debug} "update_cache_with_attestable_slot"])) slot_ids ; List.iter (fun attestation_level -> @@ -206,7 +214,12 @@ let rec consume_stream state stream_handle ~delegate_id = Delegate_id.Table.remove state.streams delegate_id ; return_unit | Some (E.Attestable_slot {slot_id}) -> - update_cache_with_attestable_slot state ~delegate_id ~slot_id ; + update_cache_with_attestable_slot + state + ~delegate_id + ~slot_id + [@profiler.aggregate_f + {verbosity = Debug} "update_cache_with_attestable_slot"] ; consume_stream state ~delegate_id stream_handle | Some (No_shards_assigned {attestation_level}) -> update_cache_no_shards_assigned state ~delegate_id ~attestation_level ; @@ -235,7 +248,15 @@ let subscribe_to_new_streams state dal_node_rpc_ctxt ~delegate_ids_to_add = List.filter_map_p (fun delegate_id -> let* res = - Node_rpc.monitor_attestable_slots dal_node_rpc_ctxt ~delegate_id + (Node_rpc.monitor_attestable_slots + dal_node_rpc_ctxt + ~delegate_id + [@profiler.record_s + {verbosity = Info} + (Format.asprintf + "monitor_attestable_slots : %a" + Delegate_id.pp + delegate_id)]) in match res with | Ok (stream, stopper) -> return_some (delegate_id, {stream; stopper}) @@ -258,7 +279,18 @@ let subscribe_to_new_streams state dal_node_rpc_ctxt ~delegate_ids_to_add = (fun (delegate_id, stream_handle) -> Lwt.dont_wait (fun () -> - let* () = consume_backfill_stream state ~delegate_id stream_handle in + let* () = + (consume_backfill_stream + state + ~delegate_id + stream_handle + [@profiler.record_s + {verbosity = Info} + (Format.asprintf + "consume_backfill_stream : %a" + Delegate_id.pp + delegate_id)]) + in consume_stream state ~delegate_id stream_handle) (fun exn -> Events.( @@ -283,6 +315,11 @@ let update_streams_subscriptions state dal_node_rpc_ctxt ~delegate_ids = let get_dal_attestable_slots state ~delegate_id ~attestation_level = let open Lwt_syntax in + () [@profiler.stop] ; + () + [@profiler.record + {verbosity = Notice} + (Format.sprintf "attestation_level : %ld" attestation_level)] ; match Level_map.find_opt state.cache attestation_level with | None -> let* () = Events.(emit no_attestable_slot_at_level attestation_level) in -- GitLab