diff --git a/src/lib_dal_node/daemon.ml b/src/lib_dal_node/daemon.ml index 46df78f06d0252c06539479210359d27ec062b22..0728fb08b8d2073bc1449e48f133687b3173e30a 100644 --- a/src/lib_dal_node/daemon.ml +++ b/src/lib_dal_node/daemon.ml @@ -131,7 +131,6 @@ let daemonize handlers = let connect_gossipsub_with_p2p proto_parameters gs_worker transport_layer node_store node_ctxt amplificator ~verbose = - let open Gossipsub in let timing_table_size = 2 * proto_parameters.Types.attestation_lag * proto_parameters.cryptobox_parameters.number_of_shards @@ -150,51 +149,65 @@ let connect_gossipsub_with_p2p proto_parameters gs_worker transport_layer Types.Message_id.{commitment; shard_index; level; slot_index; _} -> let open Lwt_result_syntax in let slot_id : Types.slot_id = {slot_level = level; slot_index} in - let* () = - Seq.return {Cryptobox.share; index = shard_index} - |> save_and_notify slot_id |> Errors.to_tzresult - in - let number_of_shards = - proto_parameters.cryptobox_parameters.number_of_shards - in - (* Introduce a new store read at each received shard. Not sure it can be - a problem, though *) - let* number_of_already_stored_shards = - Store.Shards.count_values node_store slot_id - in - let slot_metrics = - Dal_metrics.update_timing_shard_received - (Node_context.get_cryptobox node_ctxt) - shards_timing_table - slot_id - ~number_of_already_stored_shards - ~number_of_shards - in - match - Profile_manager.get_profiles @@ Node_context.get_profile_ctxt node_ctxt - with - | Controller profile - when Controller_profiles.is_observed_slot slot_index profile -> ( - match amplificator with - | None -> - let*! () = - if not disable_amplification then - Event.emit_amplificator_uninitialized () - else Lwt.return_unit - in - return_unit - | Some amplificator -> - assert (not disable_amplification) ; - Amplificator.try_amplification - commitment - slot_metrics - slot_id - amplificator) - | _ -> return_unit + (let* () = + (Seq.return {Cryptobox.share; index = shard_index} + |> save_and_notify slot_id |> Errors.to_tzresult) + [@profiler.aggregate_s + {verbosity = Notice; profiler_module = Profiler} "save_and_notify"] + in + let number_of_shards = + proto_parameters.cryptobox_parameters.number_of_shards + in + (* Introduce a new store read at each received shard. Not sure it can be + a problem, though *) + let* number_of_already_stored_shards = + (Store.Shards.count_values + node_store + slot_id + [@profiler.aggregate_s + {verbosity = Notice; profiler_module = Profiler} "count_values"]) + in + let slot_metrics = + (Dal_metrics.update_timing_shard_received + (Node_context.get_cryptobox node_ctxt) + shards_timing_table + slot_id + ~number_of_already_stored_shards + ~number_of_shards + [@profiler.aggregate_f + {verbosity = Notice; profiler_module = Profiler} + "update_timing_shard_received"]) + in + match + Profile_manager.get_profiles @@ Node_context.get_profile_ctxt node_ctxt + with + | Controller profile + when Controller_profiles.is_observed_slot slot_index profile -> ( + match amplificator with + | None -> + let*! () = + if not disable_amplification then + Event.emit_amplificator_uninitialized () + else Lwt.return_unit + in + return_unit + | Some amplificator -> + assert (not disable_amplification) ; + Amplificator.try_amplification + commitment + slot_metrics + slot_id + amplificator + [@profiler.aggregate_s + {verbosity = Notice; profiler_module = Profiler} + "try_amplification"]) + | _ -> return_unit) + [@profiler.aggregate_s + {verbosity = Notice; profiler_module = Profiler} "shards_handler"] in Lwt.dont_wait (fun () -> - Transport_layer_hooks.activate + Gossipsub.Transport_layer_hooks.activate gs_worker transport_layer ~app_messages_callback:(shards_handler node_store) diff --git a/src/lib_dal_node/store.ml b/src/lib_dal_node/store.ml index 3af3871875039284ceb95706ce526dfbe47de1c3..4cd3c7e0a68f9a808b45e47624d7ab44cb92520d 100644 --- a/src/lib_dal_node/store.ml +++ b/src/lib_dal_node/store.ml @@ -23,6 +23,8 @@ (* *) (*****************************************************************************) +module Profiler = (val Profiler.wrap Dal_profiler.dal_profiler) + module Version = struct type t = int @@ -186,12 +188,22 @@ module Shards = struct Seq.ES.iter (fun {Cryptobox.index; share} -> let* exists = - KVS.value_exists shards_store file_layout slot_id index + (KVS.value_exists + shards_store + file_layout + slot_id + index [@profiler.aggregate_s {verbosity = Notice} "value_exists"]) in if exists then return_unit else let* () = - KVS.write_value shards_store file_layout slot_id index share + (KVS.write_value + shards_store + file_layout + slot_id + index + share + [@profiler.aggregate_s {verbosity = Notice} "write_value"]) in let () = Dal_metrics.shard_stored () in let*! () =