From 4aa4a0b59e00b2dee8ec774e052e8163f6d63b33 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Mon, 23 Sep 2024 14:24:56 +0100 Subject: [PATCH 1/4] Alpha: Profiler: add operation worker profiler --- src/proto_alpha/lib_delegate/baking_profiler.ml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/proto_alpha/lib_delegate/baking_profiler.ml b/src/proto_alpha/lib_delegate/baking_profiler.ml index 2dc54c31b5e2..465171bb13d0 100644 --- a/src/proto_alpha/lib_delegate/baking_profiler.ml +++ b/src/proto_alpha/lib_delegate/baking_profiler.ml @@ -9,11 +9,14 @@ open Profiler let nonce_profiler = unplugged () +let operation_worker_profiler = unplugged () + let node_rpc_profiler = unplugged () let init profiler_maker = plug nonce_profiler (profiler_maker ~name:"nonce") ; - plug node_rpc_profiler (profiler_maker ~name:"node_rpc") + plug node_rpc_profiler (profiler_maker ~name:"node_rpc") ; + plug operation_worker_profiler (profiler_maker ~name:"op_worker") let create_reset_block_section profiler = let last_block = ref None in -- GitLab From 0858f789c04d5b5bd48cbb4574a19dfd1d175933 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Mon, 23 Sep 2024 14:25:13 +0100 Subject: [PATCH 2/4] Alpha: Profiler: plug operation worker profiler --- .../lib_delegate/operation_worker.ml | 75 +++++++++++++------ 1 file changed, 51 insertions(+), 24 deletions(-) diff --git a/src/proto_alpha/lib_delegate/operation_worker.ml b/src/proto_alpha/lib_delegate/operation_worker.ml index 345b3ce63d81..6a0db574bb7b 100644 --- a/src/proto_alpha/lib_delegate/operation_worker.ml +++ b/src/proto_alpha/lib_delegate/operation_worker.ml @@ -27,6 +27,8 @@ open Protocol_client_context open Protocol open Alpha_context +module Profiler = (val Profiler.wrap Baking_profiler.operation_worker_profiler) + module Events = struct include Internal_event.Simple @@ -253,14 +255,14 @@ type t = { let monitor_operations (cctxt : #Protocol_client_context.full) = let open Lwt_result_syntax in let* operation_stream, stream_stopper = - Alpha_block_services.Mempool.monitor_operations - cctxt - ~chain:cctxt#chain - ~validated:true - ~branch_delayed:true - ~branch_refused:false - ~refused:false - () + (Alpha_block_services.Mempool.monitor_operations + cctxt + ~chain:cctxt#chain + ~validated:true + ~branch_delayed:true + ~branch_refused:false + ~refused:false + () [@profiler.record_s "monitor_operations RPC"]) in let operation_stream = Lwt_stream.map @@ -268,11 +270,11 @@ let monitor_operations (cctxt : #Protocol_client_context.full) = operation_stream in let* shell_header = - Shell_services.Blocks.Header.shell_header - cctxt - ~chain:cctxt#chain - ~block:(`Head 0) - () + (Shell_services.Blocks.Header.shell_header + cctxt + ~chain:cctxt#chain + ~block:(`Head 0) + () [@profiler.record_s "shell_header RPC"]) in let round = match Fitness.(round_from_raw shell_header.fitness) with @@ -579,20 +581,36 @@ let update_operations_pool state (head_level, head_round) = let create ?(monitor_node_operations = true) (cctxt : #Protocol_client_context.full) = let open Lwt_syntax in - let state = make_initial_state ~monitor_node_operations () in + let state = + (make_initial_state + ~monitor_node_operations + () [@profiler.record_f "make initial state"]) + in (* TODO should we continue forever ? *) let rec worker_loop () = - let* result = monitor_operations cctxt in + let* result = + (monitor_operations cctxt [@profiler.record_s "monitor operations"]) + in match result with | Error err -> Events.(emit loop_failed err) | Ok (head, operation_stream, op_stream_stopper) -> + () + [@profiler.stop] + [@profiler.record + Format.sprintf + "level : %ld, round : %s" + (fst head) + (Int32.to_string @@ Round.to_int32 @@ snd head)] ; let* () = Events.(emit starting_new_monitoring ()) in state.canceler <- Lwt_canceler.create () ; Lwt_canceler.on_cancel state.canceler (fun () -> - op_stream_stopper () ; - cancel_monitoring state ; + op_stream_stopper () [@profiler.record_f "stream stopped"] ; + cancel_monitoring + state [@profiler.record_f "cancel monitoring state"] ; return_unit) ; - update_operations_pool state head ; + update_operations_pool + state + head [@profiler.record_f "update operations pool"] ; let rec loop () = let* ops = Lwt_stream.get operation_stream in match ops with @@ -600,16 +618,25 @@ let create ?(monitor_node_operations = true) (* When the stream closes, it means a new head has been set, we reset the monitoring and flush current operations *) let* () = Events.(emit end_of_stream ()) in - op_stream_stopper () ; - let* () = reset_monitoring state in + op_stream_stopper () [@profiler.record_f "stream stopped"] ; + let* () = + (reset_monitoring + state [@profiler.record_s "reset monitoring state"]) + in + () [@profiler.stop] ; worker_loop () | Some ops -> - state.operation_pool <- - Operation_pool.add_operations state.operation_pool ops ; - let* () = update_monitoring state ops in + (state.operation_pool <- + Operation_pool.add_operations state.operation_pool ops) + [@profiler.aggregate_f "add operations"] ; + let* () = + (update_monitoring + state + ops [@profiler.aggregate_f "update monitoring state"]) + in loop () in - loop () + (loop () [@profiler.record_s "operations processing"]) in Lwt.dont_wait (fun () -> -- GitLab From 86200c729352974fc1797428d0338088334cebd8 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Mon, 23 Sep 2024 14:26:47 +0100 Subject: [PATCH 3/4] ParisC: Profiler: plug operation worker profiler --- .../lib_delegate/baking_profiler.ml | 5 +- .../lib_delegate/operation_worker.ml | 75 +++++++++++++------ 2 files changed, 55 insertions(+), 25 deletions(-) diff --git a/src/proto_020_PsParisC/lib_delegate/baking_profiler.ml b/src/proto_020_PsParisC/lib_delegate/baking_profiler.ml index 2dc54c31b5e2..465171bb13d0 100644 --- a/src/proto_020_PsParisC/lib_delegate/baking_profiler.ml +++ b/src/proto_020_PsParisC/lib_delegate/baking_profiler.ml @@ -9,11 +9,14 @@ open Profiler let nonce_profiler = unplugged () +let operation_worker_profiler = unplugged () + let node_rpc_profiler = unplugged () let init profiler_maker = plug nonce_profiler (profiler_maker ~name:"nonce") ; - plug node_rpc_profiler (profiler_maker ~name:"node_rpc") + plug node_rpc_profiler (profiler_maker ~name:"node_rpc") ; + plug operation_worker_profiler (profiler_maker ~name:"op_worker") let create_reset_block_section profiler = let last_block = ref None in diff --git a/src/proto_020_PsParisC/lib_delegate/operation_worker.ml b/src/proto_020_PsParisC/lib_delegate/operation_worker.ml index 345b3ce63d81..6a0db574bb7b 100644 --- a/src/proto_020_PsParisC/lib_delegate/operation_worker.ml +++ b/src/proto_020_PsParisC/lib_delegate/operation_worker.ml @@ -27,6 +27,8 @@ open Protocol_client_context open Protocol open Alpha_context +module Profiler = (val Profiler.wrap Baking_profiler.operation_worker_profiler) + module Events = struct include Internal_event.Simple @@ -253,14 +255,14 @@ type t = { let monitor_operations (cctxt : #Protocol_client_context.full) = let open Lwt_result_syntax in let* operation_stream, stream_stopper = - Alpha_block_services.Mempool.monitor_operations - cctxt - ~chain:cctxt#chain - ~validated:true - ~branch_delayed:true - ~branch_refused:false - ~refused:false - () + (Alpha_block_services.Mempool.monitor_operations + cctxt + ~chain:cctxt#chain + ~validated:true + ~branch_delayed:true + ~branch_refused:false + ~refused:false + () [@profiler.record_s "monitor_operations RPC"]) in let operation_stream = Lwt_stream.map @@ -268,11 +270,11 @@ let monitor_operations (cctxt : #Protocol_client_context.full) = operation_stream in let* shell_header = - Shell_services.Blocks.Header.shell_header - cctxt - ~chain:cctxt#chain - ~block:(`Head 0) - () + (Shell_services.Blocks.Header.shell_header + cctxt + ~chain:cctxt#chain + ~block:(`Head 0) + () [@profiler.record_s "shell_header RPC"]) in let round = match Fitness.(round_from_raw shell_header.fitness) with @@ -579,20 +581,36 @@ let update_operations_pool state (head_level, head_round) = let create ?(monitor_node_operations = true) (cctxt : #Protocol_client_context.full) = let open Lwt_syntax in - let state = make_initial_state ~monitor_node_operations () in + let state = + (make_initial_state + ~monitor_node_operations + () [@profiler.record_f "make initial state"]) + in (* TODO should we continue forever ? *) let rec worker_loop () = - let* result = monitor_operations cctxt in + let* result = + (monitor_operations cctxt [@profiler.record_s "monitor operations"]) + in match result with | Error err -> Events.(emit loop_failed err) | Ok (head, operation_stream, op_stream_stopper) -> + () + [@profiler.stop] + [@profiler.record + Format.sprintf + "level : %ld, round : %s" + (fst head) + (Int32.to_string @@ Round.to_int32 @@ snd head)] ; let* () = Events.(emit starting_new_monitoring ()) in state.canceler <- Lwt_canceler.create () ; Lwt_canceler.on_cancel state.canceler (fun () -> - op_stream_stopper () ; - cancel_monitoring state ; + op_stream_stopper () [@profiler.record_f "stream stopped"] ; + cancel_monitoring + state [@profiler.record_f "cancel monitoring state"] ; return_unit) ; - update_operations_pool state head ; + update_operations_pool + state + head [@profiler.record_f "update operations pool"] ; let rec loop () = let* ops = Lwt_stream.get operation_stream in match ops with @@ -600,16 +618,25 @@ let create ?(monitor_node_operations = true) (* When the stream closes, it means a new head has been set, we reset the monitoring and flush current operations *) let* () = Events.(emit end_of_stream ()) in - op_stream_stopper () ; - let* () = reset_monitoring state in + op_stream_stopper () [@profiler.record_f "stream stopped"] ; + let* () = + (reset_monitoring + state [@profiler.record_s "reset monitoring state"]) + in + () [@profiler.stop] ; worker_loop () | Some ops -> - state.operation_pool <- - Operation_pool.add_operations state.operation_pool ops ; - let* () = update_monitoring state ops in + (state.operation_pool <- + Operation_pool.add_operations state.operation_pool ops) + [@profiler.aggregate_f "add operations"] ; + let* () = + (update_monitoring + state + ops [@profiler.aggregate_f "update monitoring state"]) + in loop () in - loop () + (loop () [@profiler.record_s "operations processing"]) in Lwt.dont_wait (fun () -> -- GitLab From 1ada7afddcc21c47652f120b0416accd694a348b Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Mon, 23 Sep 2024 14:28:17 +0100 Subject: [PATCH 4/4] QuebeC: Profiler: plug operation worker profiler --- .../lib_delegate/baking_profiler.ml | 5 +- .../lib_delegate/operation_worker.ml | 75 +++++++++++++------ 2 files changed, 55 insertions(+), 25 deletions(-) diff --git a/src/proto_021_PsquebeC/lib_delegate/baking_profiler.ml b/src/proto_021_PsquebeC/lib_delegate/baking_profiler.ml index 2dc54c31b5e2..465171bb13d0 100644 --- a/src/proto_021_PsquebeC/lib_delegate/baking_profiler.ml +++ b/src/proto_021_PsquebeC/lib_delegate/baking_profiler.ml @@ -9,11 +9,14 @@ open Profiler let nonce_profiler = unplugged () +let operation_worker_profiler = unplugged () + let node_rpc_profiler = unplugged () let init profiler_maker = plug nonce_profiler (profiler_maker ~name:"nonce") ; - plug node_rpc_profiler (profiler_maker ~name:"node_rpc") + plug node_rpc_profiler (profiler_maker ~name:"node_rpc") ; + plug operation_worker_profiler (profiler_maker ~name:"op_worker") let create_reset_block_section profiler = let last_block = ref None in diff --git a/src/proto_021_PsquebeC/lib_delegate/operation_worker.ml b/src/proto_021_PsquebeC/lib_delegate/operation_worker.ml index 345b3ce63d81..6a0db574bb7b 100644 --- a/src/proto_021_PsquebeC/lib_delegate/operation_worker.ml +++ b/src/proto_021_PsquebeC/lib_delegate/operation_worker.ml @@ -27,6 +27,8 @@ open Protocol_client_context open Protocol open Alpha_context +module Profiler = (val Profiler.wrap Baking_profiler.operation_worker_profiler) + module Events = struct include Internal_event.Simple @@ -253,14 +255,14 @@ type t = { let monitor_operations (cctxt : #Protocol_client_context.full) = let open Lwt_result_syntax in let* operation_stream, stream_stopper = - Alpha_block_services.Mempool.monitor_operations - cctxt - ~chain:cctxt#chain - ~validated:true - ~branch_delayed:true - ~branch_refused:false - ~refused:false - () + (Alpha_block_services.Mempool.monitor_operations + cctxt + ~chain:cctxt#chain + ~validated:true + ~branch_delayed:true + ~branch_refused:false + ~refused:false + () [@profiler.record_s "monitor_operations RPC"]) in let operation_stream = Lwt_stream.map @@ -268,11 +270,11 @@ let monitor_operations (cctxt : #Protocol_client_context.full) = operation_stream in let* shell_header = - Shell_services.Blocks.Header.shell_header - cctxt - ~chain:cctxt#chain - ~block:(`Head 0) - () + (Shell_services.Blocks.Header.shell_header + cctxt + ~chain:cctxt#chain + ~block:(`Head 0) + () [@profiler.record_s "shell_header RPC"]) in let round = match Fitness.(round_from_raw shell_header.fitness) with @@ -579,20 +581,36 @@ let update_operations_pool state (head_level, head_round) = let create ?(monitor_node_operations = true) (cctxt : #Protocol_client_context.full) = let open Lwt_syntax in - let state = make_initial_state ~monitor_node_operations () in + let state = + (make_initial_state + ~monitor_node_operations + () [@profiler.record_f "make initial state"]) + in (* TODO should we continue forever ? *) let rec worker_loop () = - let* result = monitor_operations cctxt in + let* result = + (monitor_operations cctxt [@profiler.record_s "monitor operations"]) + in match result with | Error err -> Events.(emit loop_failed err) | Ok (head, operation_stream, op_stream_stopper) -> + () + [@profiler.stop] + [@profiler.record + Format.sprintf + "level : %ld, round : %s" + (fst head) + (Int32.to_string @@ Round.to_int32 @@ snd head)] ; let* () = Events.(emit starting_new_monitoring ()) in state.canceler <- Lwt_canceler.create () ; Lwt_canceler.on_cancel state.canceler (fun () -> - op_stream_stopper () ; - cancel_monitoring state ; + op_stream_stopper () [@profiler.record_f "stream stopped"] ; + cancel_monitoring + state [@profiler.record_f "cancel monitoring state"] ; return_unit) ; - update_operations_pool state head ; + update_operations_pool + state + head [@profiler.record_f "update operations pool"] ; let rec loop () = let* ops = Lwt_stream.get operation_stream in match ops with @@ -600,16 +618,25 @@ let create ?(monitor_node_operations = true) (* When the stream closes, it means a new head has been set, we reset the monitoring and flush current operations *) let* () = Events.(emit end_of_stream ()) in - op_stream_stopper () ; - let* () = reset_monitoring state in + op_stream_stopper () [@profiler.record_f "stream stopped"] ; + let* () = + (reset_monitoring + state [@profiler.record_s "reset monitoring state"]) + in + () [@profiler.stop] ; worker_loop () | Some ops -> - state.operation_pool <- - Operation_pool.add_operations state.operation_pool ops ; - let* () = update_monitoring state ops in + (state.operation_pool <- + Operation_pool.add_operations state.operation_pool ops) + [@profiler.aggregate_f "add operations"] ; + let* () = + (update_monitoring + state + ops [@profiler.aggregate_f "update monitoring state"]) + in loop () in - loop () + (loop () [@profiler.record_s "operations processing"]) in Lwt.dont_wait (fun () -> -- GitLab