From 6d16d5aec5639f8373b9f9757c06be5284e49af3 Mon Sep 17 00:00:00 2001 From: Adam Allombert-Goget Date: Thu, 9 Oct 2025 23:11:28 +0200 Subject: [PATCH 01/11] baker/operation_worker: fix baker shutdown --- .../lib_delegate/operation_worker.ml | 55 ++++++++++++------- 1 file changed, 35 insertions(+), 20 deletions(-) diff --git a/src/proto_alpha/lib_delegate/operation_worker.ml b/src/proto_alpha/lib_delegate/operation_worker.ml index 1df6ccc4365f..14a101c626a1 100644 --- a/src/proto_alpha/lib_delegate/operation_worker.ml +++ b/src/proto_alpha/lib_delegate/operation_worker.ml @@ -38,14 +38,24 @@ module Events = struct let pp_int_64 fmt = Format.fprintf fmt "%Ld" - let loop_failed = - declare_1 + let node_unreachable_crash = + declare_0 ~section - ~name:"loop_failed" + ~name:"node_unreachable_crash" ~level:Error - ~msg:"loop failed with {trace}" - ~pp1:Error_monad.pp_print_trace - ("trace", Error_monad.trace_encoding) + ~msg: + "Node unreachable via the monitor_operations RPC. Unable to monitor \ + quorum. Shutting down baker..." + () + + let monitor_operations_retry = + declare_1 + ~section + ~name:"monitor_operations_retry" + ~level:Warning + ~msg:"{msg}" + ~pp1:Format.pp_print_string + ("msg", Data_encoding.string) let ended = declare_1 @@ -741,13 +751,17 @@ let run ?(monitor_node_operations = true) (* If the call to [monitor_operations] RPC fails, retry 5 times during 25 seconds before crashing the worker . *) Utils.retry - ~emit:(cctxt#message "%s") + ~emit:Events.(emit monitor_operations_retry) ~max_delay:10. ~delay:1. ~factor:2. 
- ~tries:5 + ~tries:10 ~is_error:(function _ -> true) - ~msg:(fun _ -> "unable to call monitor operations RPC.") + ~msg:(fun errs -> + Format.asprintf + "Failed to reach the node via the monitor_operations RPC@,%a" + pp_print_trace + errs) (fun () -> (monitor_operations cctxt @@ -755,7 +769,13 @@ let run ?(monitor_node_operations = true) () in match result with - | Error err -> Events.(emit loop_failed err) + | Error _ -> + (* The baker failed to reach the node via the monitor_operations + RPC after multiple retries. Because it can no longer monitor the + consensus, it is unable to attest or bake. Rather than remain in this + degraded state or retry indefinitely, we shut it down explicitly. *) + let* () = Events.(emit node_unreachable_crash ()) in + Lwt_exit.exit_and_raise (*ECONNREFUSED*) 111 | Ok (head, operation_stream, op_stream_stopper) -> () [@profiler.stop] ; () @@ -812,16 +832,11 @@ let run ?(monitor_node_operations = true) (loop () [@profiler.record_s {verbosity = Notice} "operations processing"]) in - Lwt.dont_wait - (fun () -> - Lwt.finalize - (fun () -> - if state.monitor_node_operations then worker_loop () else return_unit) - (fun () -> - let* _ = shutdown_worker state in - return_unit)) - (fun exn -> - Events.(emit__dont_wait__use_with_care ended (Printexc.to_string exn))) ; + if state.monitor_node_operations then + Lwt.dont_wait + (fun () -> worker_loop ()) + (fun exn -> + Events.(emit__dont_wait__use_with_care ended (Printexc.to_string exn))) ; return state let retrieve_pending_operations cctxt state = -- GitLab From 43157db0d3fe797b4717baefdfbf11ac8445bc60 Mon Sep 17 00:00:00 2001 From: Adam Allombert-Goget Date: Fri, 10 Oct 2025 09:26:20 +0200 Subject: [PATCH 02/11] baker/operation_worker: refresh monitor_operations RPC on timeout --- .../lib_delegate/operation_worker.ml | 41 +++++++++++++++++-- 1 file changed, 37 insertions(+), 4 deletions(-) diff --git a/src/proto_alpha/lib_delegate/operation_worker.ml 
b/src/proto_alpha/lib_delegate/operation_worker.ml index 14a101c626a1..3d1bd1d0532f 100644 --- a/src/proto_alpha/lib_delegate/operation_worker.ml +++ b/src/proto_alpha/lib_delegate/operation_worker.ml @@ -57,6 +57,17 @@ module Events = struct ~pp1:Format.pp_print_string ("msg", Data_encoding.string) + let monitor_operations_stream_timeout = + declare_1 + ~section + ~name:"monitor_operations_stream_timeout" + ~level:Warning + ~msg: + "No data received from monitor_operations RPC for {timeout} seconds. \ + Assuming the stream is stalled and refreshing it." + ~pp1:Format.pp_print_float + ("timeout", Data_encoding.float) + let ended = declare_1 ~section @@ -799,10 +810,32 @@ let run ?(monitor_node_operations = true) state head [@profiler.record_f {verbosity = Notice} "update operations pool"] ; + (* TODO: make this value round dependent *) + let stream_timeout = 20. in let rec loop () = - let* ops = Lwt_stream.get operation_stream in - match ops with - | None -> + let* result = + Lwt.pick + [ + (let* ops = Lwt_stream.get operation_stream in + return (`Stream ops)); + (let* () = Lwt_unix.sleep stream_timeout in + return `Timeout); + ] + in + match result with + | `Timeout -> + (* The monitor_operations RPC has neither produced new data nor + closed the stream for some time. This can occur naturally, but + may also indicate a stalled stream. Restarting it is + inexpensive and can prevent the baker from hanging + indefinitely. 
*) + let* () = + Events.(emit monitor_operations_stream_timeout stream_timeout) + in + op_stream_stopper () ; + let* () = reset_monitoring state in + worker_loop () + | `Stream None -> (* When the stream closes, it means a new head has been set, we reset the monitoring and flush current operations *) let* () = Events.(emit end_of_stream ()) in @@ -816,7 +849,7 @@ let run ?(monitor_node_operations = true) in () [@profiler.stop] ; worker_loop () - | Some ops -> + | `Stream (Some ops) -> (state.operation_pool <- Operation_pool.add_operations state.operation_pool ops) [@profiler.aggregate_f {verbosity = Info} "add operations"] ; -- GitLab From 3d5e39571afdb7c62712cbdff0b34581ee9798de Mon Sep 17 00:00:00 2001 From: Adam Allombert-Goget Date: Fri, 10 Oct 2025 12:50:36 +0200 Subject: [PATCH 03/11] baker/operation_worker: RPC timeout depend of the round_duration --- devtools/testnet_experiment_tools/tool_alpha.ml | 14 +++++++++++++- src/proto_alpha/lib_delegate/baking_lib.ml | 9 ++++++++- src/proto_alpha/lib_delegate/baking_scheduling.ml | 10 +++++----- src/proto_alpha/lib_delegate/baking_scheduling.mli | 1 + src/proto_alpha/lib_delegate/operation_worker.ml | 10 ++++++---- src/proto_alpha/lib_delegate/operation_worker.mli | 8 ++++++-- 6 files changed, 39 insertions(+), 13 deletions(-) diff --git a/devtools/testnet_experiment_tools/tool_alpha.ml b/devtools/testnet_experiment_tools/tool_alpha.ml index 8f6fa38c75e7..ea69a2c2add5 100644 --- a/devtools/testnet_experiment_tools/tool_alpha.ml +++ b/devtools/testnet_experiment_tools/tool_alpha.ml @@ -248,16 +248,28 @@ let create_state cctxt ?synchronize ?monitor_node_mempool ~config let open Lwt_result_syntax in let chain = cctxt#chain in let monitor_node_operations = monitor_node_mempool in + let* chain_id = Shell_services.Chain.chain_id cctxt ~chain () in + let* constants = + Alpha_services.Constants.all cctxt (`Hash chain_id, `Head 0) + in + let*? 
round_durations = + Round.Durations.create + ~first_round_duration:constants.parametric.minimal_block_delay + ~delay_increment_per_round:constants.parametric.delay_increment_per_round + |> Environment.wrap_tzresult + in let*! operation_worker = - Operation_worker.run ?monitor_node_operations cctxt + Operation_worker.run ?monitor_node_operations ~round_durations cctxt in Baking_scheduling.create_initial_state cctxt ?synchronize + ~constants ~chain config operation_worker ~current_proposal + round_durations delegates let compute_current_round_duration round_durations diff --git a/src/proto_alpha/lib_delegate/baking_lib.ml b/src/proto_alpha/lib_delegate/baking_lib.ml index a71be658395f..46cba2c0d65a 100644 --- a/src/proto_alpha/lib_delegate/baking_lib.ml +++ b/src/proto_alpha/lib_delegate/baking_lib.ml @@ -57,8 +57,14 @@ let create_state cctxt ?dal_node_rpc_ctxt ?synchronize ?monitor_node_mempool let* constants = Alpha_services.Constants.all cctxt (`Hash chain_id, `Head 0) in + let*? round_durations = + Round.Durations.create + ~first_round_duration:constants.parametric.minimal_block_delay + ~delay_increment_per_round:constants.parametric.delay_increment_per_round + |> Environment.wrap_tzresult + in let*! 
operation_worker = - Operation_worker.run ?monitor_node_operations cctxt + Operation_worker.run ?monitor_node_operations ~round_durations cctxt in Baking_scheduling.create_initial_state cctxt @@ -67,6 +73,7 @@ let create_state cctxt ?dal_node_rpc_ctxt ?synchronize ?monitor_node_mempool ~chain config operation_worker + round_durations ~current_proposal ~constants delegates diff --git a/src/proto_alpha/lib_delegate/baking_scheduling.ml b/src/proto_alpha/lib_delegate/baking_scheduling.ml index 97b9dc591e43..e604e3d4b151 100644 --- a/src/proto_alpha/lib_delegate/baking_scheduling.ml +++ b/src/proto_alpha/lib_delegate/baking_scheduling.ml @@ -616,8 +616,8 @@ let create_round_durations constants = (Round.Durations.create ~first_round_duration ~delay_increment_per_round) let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain - config operation_worker ~(current_proposal : Baking_state.proposal) - ?constants delegates = + config operation_worker round_durations + ~(current_proposal : Baking_state.proposal) ?constants delegates = let open Lwt_result_syntax in (* FIXME: https://gitlab.com/tezos/tezos/-/issues/7391 consider saved attestable value *) @@ -628,7 +628,6 @@ let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain | Some c -> return c | None -> Alpha_services.Constants.all cctxt (`Hash chain_id, `Head 0) in - let*? round_durations = create_round_durations constants in let* validation_mode = Baking_state.( match config.Baking_configuration.validation with @@ -732,7 +731,6 @@ let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain in let* round_state = if synchronize then - let*? round_durations = create_round_durations constants in let*? 
current_round = Baking_actions.compute_round current_proposal round_durations in @@ -1015,7 +1013,8 @@ let run cctxt ?dal_node_rpc_ctxt ?canceler ?(stop_on_event = fun _ -> false) | Some current_head -> return current_head | None -> failwith "head stream unexpectedly ended" in - let*! operation_worker = Operation_worker.run cctxt in + let*? round_durations = create_round_durations constants in + let*! operation_worker = Operation_worker.run ~round_durations cctxt in Option.iter (fun canceler -> Lwt_canceler.on_cancel canceler (fun () -> @@ -1029,6 +1028,7 @@ let run cctxt ?dal_node_rpc_ctxt ?canceler ?(stop_on_event = fun _ -> false) ~chain config operation_worker + round_durations ~current_proposal ~constants delegates diff --git a/src/proto_alpha/lib_delegate/baking_scheduling.mli b/src/proto_alpha/lib_delegate/baking_scheduling.mli index 5707a7fea4a0..41030065dc63 100644 --- a/src/proto_alpha/lib_delegate/baking_scheduling.mli +++ b/src/proto_alpha/lib_delegate/baking_scheduling.mli @@ -154,6 +154,7 @@ val create_initial_state : chain:Chain_services.chain -> Baking_configuration.t -> Operation_worker.t -> + Round.round_durations -> current_proposal:proposal -> ?constants:Constants.t -> Baking_state_types.Key.t list -> diff --git a/src/proto_alpha/lib_delegate/operation_worker.ml b/src/proto_alpha/lib_delegate/operation_worker.ml index 3d1bd1d0532f..452c7152431b 100644 --- a/src/proto_alpha/lib_delegate/operation_worker.ml +++ b/src/proto_alpha/lib_delegate/operation_worker.ml @@ -749,7 +749,7 @@ let flush_operation_pool state (head_level, head_round) = let operation_pool = {Operation_pool.empty with consensus = attestations} in state.operation_pool <- operation_pool -let run ?(monitor_node_operations = true) +let run ?(monitor_node_operations = true) ~round_durations (cctxt : #Protocol_client_context.full) = let open Lwt_syntax in let state = @@ -787,7 +787,7 @@ let run ?(monitor_node_operations = true) degraded state or retry indefinitely, we shut it down 
explicitly. *) let* () = Events.(emit node_unreachable_crash ()) in Lwt_exit.exit_and_raise (*ECONNREFUSED*) 111 - | Ok (head, operation_stream, op_stream_stopper) -> + | Ok (((_, round) as head), operation_stream, op_stream_stopper) -> () [@profiler.stop] ; () [@profiler.record @@ -810,8 +810,10 @@ let run ?(monitor_node_operations = true) state head [@profiler.record_f {verbosity = Notice} "update operations pool"] ; - (* TODO: make this value round dependent *) - let stream_timeout = 20. in + let stream_timeout = + Round.round_duration round_durations (Round.succ round) + |> Period.to_seconds |> Int64.to_float + in let rec loop () = let* result = Lwt.pick diff --git a/src/proto_alpha/lib_delegate/operation_worker.mli b/src/proto_alpha/lib_delegate/operation_worker.mli index a76a818fb910..325923937a4f 100644 --- a/src/proto_alpha/lib_delegate/operation_worker.mli +++ b/src/proto_alpha/lib_delegate/operation_worker.mli @@ -55,13 +55,17 @@ type event = (** {1 Constructors}*) -(** [run ?monitor_node_operations cctxt] spawns an operation worker. +(** [run ?monitor_node_operations ~round_durations cctxt] spawns an operation + worker. @param monitor_node_operations monitor operations on the node (defaults: [true]). Set [monitor_node_operations] to [false] to only consider externally provided (non-node) operations. 
*) val run : - ?monitor_node_operations:bool -> #Protocol_client_context.full -> t Lwt.t + ?monitor_node_operations:bool -> + round_durations:Protocol.Alpha_context.Round.round_durations -> + #Protocol_client_context.full -> + t Lwt.t (** {1 Utilities} *) -- GitLab From 8a7586931b4d9746b88565378f9b012e9601db53 Mon Sep 17 00:00:00 2001 From: Adam Allombert-Goget Date: Fri, 10 Oct 2025 14:08:43 +0200 Subject: [PATCH 04/11] 023_PtSeouLo/baker/operation_worker: fix baker shutdown Porting to proto 023_PtSeouLo df5198c7eed268022a4189f4f2fca89935162236 - baker/operation_worker: fix baker shutdown --- .../lib_delegate/operation_worker.ml | 55 ++++++++++++------- 1 file changed, 35 insertions(+), 20 deletions(-) diff --git a/src/proto_023_PtSeouLo/lib_delegate/operation_worker.ml b/src/proto_023_PtSeouLo/lib_delegate/operation_worker.ml index 9d31ac0a019a..737daf308b27 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/operation_worker.ml +++ b/src/proto_023_PtSeouLo/lib_delegate/operation_worker.ml @@ -36,14 +36,24 @@ module Events = struct let pp_int = Format.pp_print_int - let loop_failed = - declare_1 + let node_unreachable_crash = + declare_0 ~section - ~name:"loop_failed" + ~name:"node_unreachable_crash" ~level:Error - ~msg:"loop failed with {trace}" - ~pp1:Error_monad.pp_print_trace - ("trace", Error_monad.trace_encoding) + ~msg: + "Node unreachable via the monitor_operations RPC. Unable to monitor \ + quorum. Shutting down baker..." + () + + let monitor_operations_retry = + declare_1 + ~section + ~name:"monitor_operations_retry" + ~level:Warning + ~msg:"{msg}" + ~pp1:Format.pp_print_string + ("msg", Data_encoding.string) let ended = declare_1 @@ -733,13 +743,17 @@ let run ?(monitor_node_operations = true) ~constants (* If the call to [monitor_operations] RPC fails, retry 5 times during 25 seconds before crashing the worker . *) Utils.retry - ~emit:(cctxt#message "%s") + ~emit:Events.(emit monitor_operations_retry) ~max_delay:10. ~delay:1. ~factor:2. 
- ~tries:5 + ~tries:10 ~is_error:(function _ -> true) - ~msg:(fun _ -> "unable to call monitor operations RPC.") + ~msg:(fun errs -> + Format.asprintf + "Failed to reach the node via the monitor_operations RPC@,%a" + pp_print_trace + errs) (fun () -> (monitor_operations cctxt @@ -747,7 +761,13 @@ let run ?(monitor_node_operations = true) ~constants () in match result with - | Error err -> Events.(emit loop_failed err) + | Error _ -> + (* The baker failed to reach the node via the monitor_operations + RPC after multiple retries. Because it can no longer monitor the + consensus, it is unable to attest or bake. Rather than remain in this + degraded state or retry indefinitely, we shut it down explicitly. *) + let* () = Events.(emit node_unreachable_crash ()) in + Lwt_exit.exit_and_raise (*ECONNREFUSED*) 111 | Ok (head, operation_stream, op_stream_stopper) -> () [@profiler.stop] ; () @@ -804,16 +824,11 @@ let run ?(monitor_node_operations = true) ~constants (loop () [@profiler.record_s {verbosity = Notice} "operations processing"]) in - Lwt.dont_wait - (fun () -> - Lwt.finalize - (fun () -> - if state.monitor_node_operations then worker_loop () else return_unit) - (fun () -> - let* _ = shutdown_worker state in - return_unit)) - (fun exn -> - Events.(emit__dont_wait__use_with_care ended (Printexc.to_string exn))) ; + if state.monitor_node_operations then + Lwt.dont_wait + (fun () -> worker_loop ()) + (fun exn -> + Events.(emit__dont_wait__use_with_care ended (Printexc.to_string exn))) ; return state let retrieve_pending_operations cctxt state = -- GitLab From d411906263e19c2470dde1744091f8df00f803f2 Mon Sep 17 00:00:00 2001 From: Adam Allombert-Goget Date: Fri, 10 Oct 2025 14:08:57 +0200 Subject: [PATCH 05/11] 023_PtSeouLo/baker/operation_worker: refresh monitor_operations RPC on timeout Porting to proto 023_PtSeouLo 75a6f8edc8fe978f7d4b69b01a7997bd7835b3f3 - baker/operation_worker: refresh monitor_operations RPC on timeout --- .../lib_delegate/operation_worker.ml | 41 
+++++++++++++++++-- 1 file changed, 37 insertions(+), 4 deletions(-) diff --git a/src/proto_023_PtSeouLo/lib_delegate/operation_worker.ml b/src/proto_023_PtSeouLo/lib_delegate/operation_worker.ml index 737daf308b27..4fa31a9c1702 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/operation_worker.ml +++ b/src/proto_023_PtSeouLo/lib_delegate/operation_worker.ml @@ -55,6 +55,17 @@ module Events = struct ~pp1:Format.pp_print_string ("msg", Data_encoding.string) + let monitor_operations_stream_timeout = + declare_1 + ~section + ~name:"monitor_operations_stream_timeout" + ~level:Warning + ~msg: + "No data received from monitor_operations RPC for {timeout} seconds. \ + Assuming the stream is stalled and refreshing it." + ~pp1:Format.pp_print_float + ("timeout", Data_encoding.float) + let ended = declare_1 ~section @@ -791,10 +802,32 @@ let run ?(monitor_node_operations = true) ~constants state head [@profiler.record_f {verbosity = Notice} "update operations pool"] ; + (* TODO: make this value round dependent *) + let stream_timeout = 20. in let rec loop () = - let* ops = Lwt_stream.get operation_stream in - match ops with - | None -> + let* result = + Lwt.pick + [ + (let* ops = Lwt_stream.get operation_stream in + return (`Stream ops)); + (let* () = Lwt_unix.sleep stream_timeout in + return `Timeout); + ] + in + match result with + | `Timeout -> + (* The monitor_operations RPC has neither produced new data nor + closed the stream for some time. This can occur naturally, but + may also indicate a stalled stream. Restarting it is + inexpensive and can prevent the baker from hanging + indefinitely. 
*) + let* () = + Events.(emit monitor_operations_stream_timeout stream_timeout) + in + op_stream_stopper () ; + let* () = reset_monitoring state in + worker_loop () + | `Stream None -> (* When the stream closes, it means a new head has been set, we reset the monitoring and flush current operations *) let* () = Events.(emit end_of_stream ()) in @@ -808,7 +841,7 @@ let run ?(monitor_node_operations = true) ~constants in () [@profiler.stop] ; worker_loop () - | Some ops -> + | `Stream (Some ops) -> (state.operation_pool <- Operation_pool.add_operations state.operation_pool ops) [@profiler.aggregate_f {verbosity = Info} "add operations"] ; -- GitLab From d0a04e8f99e4cfb2c5b53805c806fd0172c2fff2 Mon Sep 17 00:00:00 2001 From: Adam Allombert-Goget Date: Fri, 10 Oct 2025 18:13:47 +0200 Subject: [PATCH 06/11] 023_PtSeouLo/baker/operation_worker: RPC timeout depend of the round_duration Porting to proto 023_PtSeouLo 93e5eec596d567d37c9cc35b9d6222c8fa8990b6 - baker/operation_worker: RPC timeout depend of the round_duration --- .../testnet_experiment_tools/tool_023_PtSeouLo.ml | 14 +++++++++++++- src/proto_023_PtSeouLo/lib_delegate/baking_lib.ml | 13 ++++++++++++- .../lib_delegate/baking_scheduling.ml | 12 +++++++----- .../lib_delegate/baking_scheduling.mli | 1 + .../lib_delegate/operation_worker.ml | 10 ++++++---- .../lib_delegate/operation_worker.mli | 8 +++++--- 6 files changed, 44 insertions(+), 14 deletions(-) diff --git a/devtools/testnet_experiment_tools/tool_023_PtSeouLo.ml b/devtools/testnet_experiment_tools/tool_023_PtSeouLo.ml index 4e3faee5456b..9f20c8ab86d5 100644 --- a/devtools/testnet_experiment_tools/tool_023_PtSeouLo.ml +++ b/devtools/testnet_experiment_tools/tool_023_PtSeouLo.ml @@ -253,8 +253,18 @@ let create_state cctxt ?synchronize ?monitor_node_mempool ~config let* constants = Alpha_services.Constants.all cctxt (`Hash chain_id, `Head 0) in + let*? 
round_durations = + Round.Durations.create + ~first_round_duration:constants.parametric.minimal_block_delay + ~delay_increment_per_round:constants.parametric.delay_increment_per_round + |> Environment.wrap_tzresult + in let*! operation_worker = - Operation_worker.run ?monitor_node_operations ~constants cctxt + Operation_worker.run + ?monitor_node_operations + ~round_durations + ~constants + cctxt in Baking_scheduling.create_initial_state cctxt @@ -262,7 +272,9 @@ let create_state cctxt ?synchronize ?monitor_node_mempool ~config ~chain config operation_worker + ~constants ~current_proposal + round_durations delegates let compute_current_round_duration round_durations diff --git a/src/proto_023_PtSeouLo/lib_delegate/baking_lib.ml b/src/proto_023_PtSeouLo/lib_delegate/baking_lib.ml index ccfe3895e162..7d118ae47b55 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/baking_lib.ml +++ b/src/proto_023_PtSeouLo/lib_delegate/baking_lib.ml @@ -57,8 +57,18 @@ let create_state cctxt ?dal_node_rpc_ctxt ?synchronize ?monitor_node_mempool let* constants = Alpha_services.Constants.all cctxt (`Hash chain_id, `Head 0) in + let*? round_durations = + Round.Durations.create + ~first_round_duration:constants.parametric.minimal_block_delay + ~delay_increment_per_round:constants.parametric.delay_increment_per_round + |> Environment.wrap_tzresult + in let*! 
operation_worker = - Operation_worker.run ?monitor_node_operations ~constants cctxt + Operation_worker.run + ?monitor_node_operations + ~constants + ~round_durations + cctxt in Baking_scheduling.create_initial_state cctxt @@ -67,6 +77,7 @@ let create_state cctxt ?dal_node_rpc_ctxt ?synchronize ?monitor_node_mempool ~chain config operation_worker + round_durations ~current_proposal ~constants delegates diff --git a/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.ml b/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.ml index 10fd988b5893..329ca0c9ebc8 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.ml +++ b/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.ml @@ -618,8 +618,8 @@ let create_round_durations constants = (Round.Durations.create ~first_round_duration ~delay_increment_per_round) let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain - config operation_worker ~(current_proposal : Baking_state.proposal) - ?constants delegates = + config operation_worker round_durations + ~(current_proposal : Baking_state.proposal) ?constants delegates = let open Lwt_result_syntax in (* FIXME: https://gitlab.com/tezos/tezos/-/issues/7391 consider saved attestable value *) @@ -630,7 +630,6 @@ let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain | Some c -> return c | None -> Alpha_services.Constants.all cctxt (`Hash chain_id, `Head 0) in - let*? round_durations = create_round_durations constants in let* validation_mode = Baking_state.( match config.Baking_configuration.validation with @@ -734,7 +733,6 @@ let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain in let* round_state = if synchronize then - let*? round_durations = create_round_durations constants in let*? 
current_round = Baking_actions.compute_round current_proposal round_durations in @@ -1007,7 +1005,10 @@ let run cctxt ?dal_node_rpc_ctxt ?canceler ?(stop_on_event = fun _ -> false) | Some current_head -> return current_head | None -> failwith "head stream unexpectedly ended" in - let*! operation_worker = Operation_worker.run ~constants cctxt in + let*? round_durations = create_round_durations constants in + let*! operation_worker = + Operation_worker.run ~constants ~round_durations cctxt + in Option.iter (fun canceler -> Lwt_canceler.on_cancel canceler (fun () -> @@ -1021,6 +1022,7 @@ let run cctxt ?dal_node_rpc_ctxt ?canceler ?(stop_on_event = fun _ -> false) ~chain config operation_worker + round_durations ~current_proposal ~constants delegates diff --git a/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.mli b/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.mli index 5707a7fea4a0..41030065dc63 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.mli +++ b/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.mli @@ -154,6 +154,7 @@ val create_initial_state : chain:Chain_services.chain -> Baking_configuration.t -> Operation_worker.t -> + Round.round_durations -> current_proposal:proposal -> ?constants:Constants.t -> Baking_state_types.Key.t list -> diff --git a/src/proto_023_PtSeouLo/lib_delegate/operation_worker.ml b/src/proto_023_PtSeouLo/lib_delegate/operation_worker.ml index 4fa31a9c1702..874a8f15a46d 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/operation_worker.ml +++ b/src/proto_023_PtSeouLo/lib_delegate/operation_worker.ml @@ -740,7 +740,7 @@ let flush_operation_pool state (head_level, head_round) = let operation_pool = {Operation_pool.empty with consensus = attestations} in state.operation_pool <- operation_pool -let run ?(monitor_node_operations = true) ~constants +let run ?(monitor_node_operations = true) ~constants ~round_durations (cctxt : #Protocol_client_context.full) = let open Lwt_syntax in let state = @@ -779,7 +779,7 
@@ let run ?(monitor_node_operations = true) ~constants degraded state or retry indefinitely, we shut it down explicitly. *) let* () = Events.(emit node_unreachable_crash ()) in Lwt_exit.exit_and_raise (*ECONNREFUSED*) 111 - | Ok (head, operation_stream, op_stream_stopper) -> + | Ok (((_, round) as head), operation_stream, op_stream_stopper) -> () [@profiler.stop] ; () [@profiler.record @@ -802,8 +802,10 @@ let run ?(monitor_node_operations = true) ~constants state head [@profiler.record_f {verbosity = Notice} "update operations pool"] ; - (* TODO: make this value round dependent *) - let stream_timeout = 20. in + let stream_timeout = + Round.round_duration round_durations (Round.succ round) + |> Period.to_seconds |> Int64.to_float + in let rec loop () = let* result = Lwt.pick diff --git a/src/proto_023_PtSeouLo/lib_delegate/operation_worker.mli b/src/proto_023_PtSeouLo/lib_delegate/operation_worker.mli index 944bfbd54fb9..aa29853afa64 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/operation_worker.mli +++ b/src/proto_023_PtSeouLo/lib_delegate/operation_worker.mli @@ -55,16 +55,18 @@ type event = (** {1 Constructors}*) -(** [run ?monitor_node_operations cctxt] spawns an operation worker. +(** [run ?monitor_node_operations ~constants ~round_durations cctxt] spawns an + operation worker. @param monitor_node_operations monitor operations on the node (defaults: [true]). Set [monitor_node_operations] to [false] to only consider externally provided (non-node) operations. 
*) val run : ?monitor_node_operations:bool -> - constants:Constants.t -> + constants:Protocol.Alpha_context.Constants.t -> + round_durations:Protocol.Alpha_context.Round.round_durations -> #Protocol_client_context.full -> - t Lwt.t + t Environment.Lwt.t (** {1 Utilities} *) -- GitLab From 086ea6a77ec7de39d938bb85befa40a2ecf016ee Mon Sep 17 00:00:00 2001 From: Adam Allombert-Goget Date: Thu, 23 Oct 2025 10:05:36 +0200 Subject: [PATCH 07/11] 024_PsD5wVTJ/baker/operation_worker: fix baker shutdown Porting to proto 024_PsD5wVTJ 6d16d5aec5639f8373b9f9757c06be5284e49af3 - baker/operation_worker: fix baker shutdown --- .../lib_delegate/operation_worker.ml | 55 ++++++++++++------- 1 file changed, 35 insertions(+), 20 deletions(-) diff --git a/src/proto_024_PsD5wVTJ/lib_delegate/operation_worker.ml b/src/proto_024_PsD5wVTJ/lib_delegate/operation_worker.ml index 1df6ccc4365f..14a101c626a1 100644 --- a/src/proto_024_PsD5wVTJ/lib_delegate/operation_worker.ml +++ b/src/proto_024_PsD5wVTJ/lib_delegate/operation_worker.ml @@ -38,14 +38,24 @@ module Events = struct let pp_int_64 fmt = Format.fprintf fmt "%Ld" - let loop_failed = - declare_1 + let node_unreachable_crash = + declare_0 ~section - ~name:"loop_failed" + ~name:"node_unreachable_crash" ~level:Error - ~msg:"loop failed with {trace}" - ~pp1:Error_monad.pp_print_trace - ("trace", Error_monad.trace_encoding) + ~msg: + "Node unreachable via the monitor_operations RPC. Unable to monitor \ + quorum. Shutting down baker..." + () + + let monitor_operations_retry = + declare_1 + ~section + ~name:"monitor_operations_retry" + ~level:Warning + ~msg:"{msg}" + ~pp1:Format.pp_print_string + ("msg", Data_encoding.string) let ended = declare_1 @@ -741,13 +751,17 @@ let run ?(monitor_node_operations = true) (* If the call to [monitor_operations] RPC fails, retry 5 times during 25 seconds before crashing the worker . *) Utils.retry - ~emit:(cctxt#message "%s") + ~emit:Events.(emit monitor_operations_retry) ~max_delay:10. ~delay:1. 
~factor:2. - ~tries:5 + ~tries:10 ~is_error:(function _ -> true) - ~msg:(fun _ -> "unable to call monitor operations RPC.") + ~msg:(fun errs -> + Format.asprintf + "Failed to reach the node via the monitor_operations RPC@,%a" + pp_print_trace + errs) (fun () -> (monitor_operations cctxt @@ -755,7 +769,13 @@ let run ?(monitor_node_operations = true) () in match result with - | Error err -> Events.(emit loop_failed err) + | Error _ -> + (* The baker failed to reach the node via the monitor_operations + RPC after multiple retries. Because it can no longer monitor the + consensus, it is unable to attest or bake. Rather than remain in this + degraded state or retry indefinitely, we shut it down explicitly. *) + let* () = Events.(emit node_unreachable_crash ()) in + Lwt_exit.exit_and_raise (*ECONNREFUSED*) 111 | Ok (head, operation_stream, op_stream_stopper) -> () [@profiler.stop] ; () @@ -812,16 +832,11 @@ let run ?(monitor_node_operations = true) (loop () [@profiler.record_s {verbosity = Notice} "operations processing"]) in - Lwt.dont_wait - (fun () -> - Lwt.finalize - (fun () -> - if state.monitor_node_operations then worker_loop () else return_unit) - (fun () -> - let* _ = shutdown_worker state in - return_unit)) - (fun exn -> - Events.(emit__dont_wait__use_with_care ended (Printexc.to_string exn))) ; + if state.monitor_node_operations then + Lwt.dont_wait + (fun () -> worker_loop ()) + (fun exn -> + Events.(emit__dont_wait__use_with_care ended (Printexc.to_string exn))) ; return state let retrieve_pending_operations cctxt state = -- GitLab From 42ee284ac9b5f1648996651a9887bef8bd78c9b2 Mon Sep 17 00:00:00 2001 From: Adam Allombert-Goget Date: Thu, 23 Oct 2025 10:05:58 +0200 Subject: [PATCH 08/11] 024_PsD5wVTJ/baker/operation_worker: refresh monitor_operations RPC on timeout Porting to proto 024_PsD5wVTJ 43157db0d3fe797b4717baefdfbf11ac8445bc60 - baker/operation_worker: refresh monitor_operations RPC on timeout --- .../lib_delegate/operation_worker.ml | 41 
+++++++++++++++++-- 1 file changed, 37 insertions(+), 4 deletions(-) diff --git a/src/proto_024_PsD5wVTJ/lib_delegate/operation_worker.ml b/src/proto_024_PsD5wVTJ/lib_delegate/operation_worker.ml index 14a101c626a1..3d1bd1d0532f 100644 --- a/src/proto_024_PsD5wVTJ/lib_delegate/operation_worker.ml +++ b/src/proto_024_PsD5wVTJ/lib_delegate/operation_worker.ml @@ -57,6 +57,17 @@ module Events = struct ~pp1:Format.pp_print_string ("msg", Data_encoding.string) + let monitor_operations_stream_timeout = + declare_1 + ~section + ~name:"monitor_operations_stream_timeout" + ~level:Warning + ~msg: + "No data received from monitor_operations RPC for {timeout} seconds. \ + Assuming the stream is stalled and refreshing it." + ~pp1:Format.pp_print_float + ("timeout", Data_encoding.float) + let ended = declare_1 ~section @@ -799,10 +810,32 @@ let run ?(monitor_node_operations = true) state head [@profiler.record_f {verbosity = Notice} "update operations pool"] ; + (* TODO: make this value round dependent *) + let stream_timeout = 20. in let rec loop () = - let* ops = Lwt_stream.get operation_stream in - match ops with - | None -> + let* result = + Lwt.pick + [ + (let* ops = Lwt_stream.get operation_stream in + return (`Stream ops)); + (let* () = Lwt_unix.sleep stream_timeout in + return `Timeout); + ] + in + match result with + | `Timeout -> + (* The monitor_operations RPC has neither produced new data nor + closed the stream for some time. This can occur naturally, but + may also indicate a stalled stream. Restarting it is + inexpensive and can prevent the baker from hanging + indefinitely. 
*) + let* () = + Events.(emit monitor_operations_stream_timeout stream_timeout) + in + op_stream_stopper () ; + let* () = reset_monitoring state in + worker_loop () + | `Stream None -> (* When the stream closes, it means a new head has been set, we reset the monitoring and flush current operations *) let* () = Events.(emit end_of_stream ()) in @@ -816,7 +849,7 @@ let run ?(monitor_node_operations = true) in () [@profiler.stop] ; worker_loop () - | Some ops -> + | `Stream (Some ops) -> (state.operation_pool <- Operation_pool.add_operations state.operation_pool ops) [@profiler.aggregate_f {verbosity = Info} "add operations"] ; -- GitLab From c5ad4fbf773399ac0e57067b5cbf80b8d6242e08 Mon Sep 17 00:00:00 2001 From: Adam Allombert-Goget Date: Thu, 23 Oct 2025 10:26:38 +0200 Subject: [PATCH 09/11] 024_PsD5wVTJ/baker/operation_worker: RPC timeout depend of the round_duration --- .../testnet_experiment_tools/tool_024_PsD5wVTJ.ml | 14 +++++++++++++- src/proto_024_PsD5wVTJ/lib_delegate/baking_lib.ml | 9 ++++++++- .../lib_delegate/baking_scheduling.ml | 10 +++++----- .../lib_delegate/baking_scheduling.mli | 1 + .../lib_delegate/operation_worker.ml | 10 ++++++---- .../lib_delegate/operation_worker.mli | 8 ++++++-- 6 files changed, 39 insertions(+), 13 deletions(-) diff --git a/devtools/testnet_experiment_tools/tool_024_PsD5wVTJ.ml b/devtools/testnet_experiment_tools/tool_024_PsD5wVTJ.ml index adb11d52843c..07d62098f806 100644 --- a/devtools/testnet_experiment_tools/tool_024_PsD5wVTJ.ml +++ b/devtools/testnet_experiment_tools/tool_024_PsD5wVTJ.ml @@ -249,16 +249,28 @@ let create_state cctxt ?synchronize ?monitor_node_mempool ~config let open Lwt_result_syntax in let chain = cctxt#chain in let monitor_node_operations = monitor_node_mempool in + let* chain_id = Shell_services.Chain.chain_id cctxt ~chain () in + let* constants = + Alpha_services.Constants.all cctxt (`Hash chain_id, `Head 0) + in + let*? 
round_durations = + Round.Durations.create + ~first_round_duration:constants.parametric.minimal_block_delay + ~delay_increment_per_round:constants.parametric.delay_increment_per_round + |> Environment.wrap_tzresult + in let*! operation_worker = - Operation_worker.run ?monitor_node_operations cctxt + Operation_worker.run ?monitor_node_operations ~round_durations cctxt in Baking_scheduling.create_initial_state cctxt ?synchronize + ~constants ~chain config operation_worker ~current_proposal + round_durations delegates let compute_current_round_duration round_durations diff --git a/src/proto_024_PsD5wVTJ/lib_delegate/baking_lib.ml b/src/proto_024_PsD5wVTJ/lib_delegate/baking_lib.ml index a71be658395f..46cba2c0d65a 100644 --- a/src/proto_024_PsD5wVTJ/lib_delegate/baking_lib.ml +++ b/src/proto_024_PsD5wVTJ/lib_delegate/baking_lib.ml @@ -57,8 +57,14 @@ let create_state cctxt ?dal_node_rpc_ctxt ?synchronize ?monitor_node_mempool let* constants = Alpha_services.Constants.all cctxt (`Hash chain_id, `Head 0) in + let*? round_durations = + Round.Durations.create + ~first_round_duration:constants.parametric.minimal_block_delay + ~delay_increment_per_round:constants.parametric.delay_increment_per_round + |> Environment.wrap_tzresult + in let*! 
operation_worker = - Operation_worker.run ?monitor_node_operations cctxt + Operation_worker.run ?monitor_node_operations ~round_durations cctxt in Baking_scheduling.create_initial_state cctxt @@ -67,6 +73,7 @@ let create_state cctxt ?dal_node_rpc_ctxt ?synchronize ?monitor_node_mempool ~chain config operation_worker + round_durations ~current_proposal ~constants delegates diff --git a/src/proto_024_PsD5wVTJ/lib_delegate/baking_scheduling.ml b/src/proto_024_PsD5wVTJ/lib_delegate/baking_scheduling.ml index 97b9dc591e43..e604e3d4b151 100644 --- a/src/proto_024_PsD5wVTJ/lib_delegate/baking_scheduling.ml +++ b/src/proto_024_PsD5wVTJ/lib_delegate/baking_scheduling.ml @@ -616,8 +616,8 @@ let create_round_durations constants = (Round.Durations.create ~first_round_duration ~delay_increment_per_round) let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain - config operation_worker ~(current_proposal : Baking_state.proposal) - ?constants delegates = + config operation_worker round_durations + ~(current_proposal : Baking_state.proposal) ?constants delegates = let open Lwt_result_syntax in (* FIXME: https://gitlab.com/tezos/tezos/-/issues/7391 consider saved attestable value *) @@ -628,7 +628,6 @@ let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain | Some c -> return c | None -> Alpha_services.Constants.all cctxt (`Hash chain_id, `Head 0) in - let*? round_durations = create_round_durations constants in let* validation_mode = Baking_state.( match config.Baking_configuration.validation with @@ -732,7 +731,6 @@ let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain in let* round_state = if synchronize then - let*? round_durations = create_round_durations constants in let*? 
current_round = Baking_actions.compute_round current_proposal round_durations in @@ -1015,7 +1013,8 @@ let run cctxt ?dal_node_rpc_ctxt ?canceler ?(stop_on_event = fun _ -> false) | Some current_head -> return current_head | None -> failwith "head stream unexpectedly ended" in - let*! operation_worker = Operation_worker.run cctxt in + let*? round_durations = create_round_durations constants in + let*! operation_worker = Operation_worker.run ~round_durations cctxt in Option.iter (fun canceler -> Lwt_canceler.on_cancel canceler (fun () -> @@ -1029,6 +1028,7 @@ let run cctxt ?dal_node_rpc_ctxt ?canceler ?(stop_on_event = fun _ -> false) ~chain config operation_worker + round_durations ~current_proposal ~constants delegates diff --git a/src/proto_024_PsD5wVTJ/lib_delegate/baking_scheduling.mli b/src/proto_024_PsD5wVTJ/lib_delegate/baking_scheduling.mli index 5707a7fea4a0..41030065dc63 100644 --- a/src/proto_024_PsD5wVTJ/lib_delegate/baking_scheduling.mli +++ b/src/proto_024_PsD5wVTJ/lib_delegate/baking_scheduling.mli @@ -154,6 +154,7 @@ val create_initial_state : chain:Chain_services.chain -> Baking_configuration.t -> Operation_worker.t -> + Round.round_durations -> current_proposal:proposal -> ?constants:Constants.t -> Baking_state_types.Key.t list -> diff --git a/src/proto_024_PsD5wVTJ/lib_delegate/operation_worker.ml b/src/proto_024_PsD5wVTJ/lib_delegate/operation_worker.ml index 3d1bd1d0532f..452c7152431b 100644 --- a/src/proto_024_PsD5wVTJ/lib_delegate/operation_worker.ml +++ b/src/proto_024_PsD5wVTJ/lib_delegate/operation_worker.ml @@ -749,7 +749,7 @@ let flush_operation_pool state (head_level, head_round) = let operation_pool = {Operation_pool.empty with consensus = attestations} in state.operation_pool <- operation_pool -let run ?(monitor_node_operations = true) +let run ?(monitor_node_operations = true) ~round_durations (cctxt : #Protocol_client_context.full) = let open Lwt_syntax in let state = @@ -787,7 +787,7 @@ let run ?(monitor_node_operations = true) 
degraded state or retry indefinitely, we shut it down explicitly. *) let* () = Events.(emit node_unreachable_crash ()) in Lwt_exit.exit_and_raise (*ECONNREFUSED*) 111 - | Ok (head, operation_stream, op_stream_stopper) -> + | Ok (((_, round) as head), operation_stream, op_stream_stopper) -> () [@profiler.stop] ; () [@profiler.record @@ -810,8 +810,10 @@ let run ?(monitor_node_operations = true) state head [@profiler.record_f {verbosity = Notice} "update operations pool"] ; - (* TODO: make this value round dependent *) - let stream_timeout = 20. in + let stream_timeout = + Round.round_duration round_durations (Round.succ round) + |> Period.to_seconds |> Int64.to_float + in let rec loop () = let* result = Lwt.pick diff --git a/src/proto_024_PsD5wVTJ/lib_delegate/operation_worker.mli b/src/proto_024_PsD5wVTJ/lib_delegate/operation_worker.mli index a76a818fb910..325923937a4f 100644 --- a/src/proto_024_PsD5wVTJ/lib_delegate/operation_worker.mli +++ b/src/proto_024_PsD5wVTJ/lib_delegate/operation_worker.mli @@ -55,13 +55,17 @@ type event = (** {1 Constructors}*) -(** [run ?monitor_node_operations cctxt] spawns an operation worker. +(** [run ?monitor_node_operations ~round_durations cctxt] spawns an operation + worker. [round_durations] sets the [monitor_operations] stream timeout. @param monitor_node_operations monitor operations on the node (defaults: [true]). Set [monitor_node_operations] to [false] to only consider externally provided (non-node) operations.
*) val run : - ?monitor_node_operations:bool -> #Protocol_client_context.full -> t Lwt.t + ?monitor_node_operations:bool -> + round_durations:Protocol.Alpha_context.Round.round_durations -> + #Protocol_client_context.full -> + t Lwt.t (** {1 Utilities} *) -- GitLab From 81e90c86c795a170743d2da9a937e5685fa4a4da Mon Sep 17 00:00:00 2001 From: Adam Allombert-Goget Date: Mon, 13 Oct 2025 15:49:07 +0200 Subject: [PATCH 10/11] tezt/baker_test: add a test for operation worker shutdown --- tezt/lib_tezos/agnostic_baker.ml | 9 +++++++ tezt/lib_tezos/agnostic_baker.mli | 3 +++ tezt/tests/baker_test.ml | 41 ++++++++++++++++++++++++++++++- 3 files changed, 52 insertions(+), 1 deletion(-) diff --git a/tezt/lib_tezos/agnostic_baker.ml b/tezt/lib_tezos/agnostic_baker.ml index 42b609c2339b..564e254c99e8 100644 --- a/tezt/lib_tezos/agnostic_baker.ml +++ b/tezt/lib_tezos/agnostic_baker.ml @@ -321,6 +321,15 @@ let wait_for_ready agnostic_baker = resolver :: agnostic_baker.persistent_state.pending_ready ; check_event agnostic_baker "starting baker daemon" promise +let wait_for_termination (baker : t) : unit Lwt.t = + match baker.status with + | Not_running -> Lwt.return_unit + | Running {event_loop_promise = Some p; _} -> p + | Running {event_loop_promise = None; _} -> + invalid_arg + "Agnostic_baker.wait_for_termination called before Agnostic_baker.run \ + returned" + let init ?env ?runner ?(path = Uses.path Constant.octez_agnostic_baker) ?name ?color ?event_level ?event_pipe ?event_sections_levels ?(delegates = []) ?votefile ?liquidity_baking_toggle_vote ?force_apply_from_round ?remote_mode diff --git a/tezt/lib_tezos/agnostic_baker.mli b/tezt/lib_tezos/agnostic_baker.mli index 7568c53329ee..103e3e12d641 100644 --- a/tezt/lib_tezos/agnostic_baker.mli +++ b/tezt/lib_tezos/agnostic_baker.mli @@ -38,6 +38,9 @@ val wait_for : ?where:string -> t -> string -> (JSON.t -> 'a option) -> 'a Lwt.t *) val wait_for_ready : t -> unit Lwt.t +(* Wait for agnostic baker termination. 
*) +val wait_for_termination : t -> unit Lwt.t + (** Raw events. *) type event = {name : string; value : JSON.t; timestamp : float} diff --git a/tezt/tests/baker_test.ml b/tezt/tests/baker_test.ml index d597d63009b6..5c8b5236dcc6 100644 --- a/tezt/tests/baker_test.ml +++ b/tezt/tests/baker_test.ml @@ -1273,6 +1273,44 @@ let aggregated_operations_retrival_from_block_content = in unit +let unable_to_reach_node_mempool = + Protocol.register_test + ~__FILE__ + ~title:"Baker shuts down when unable to reach the node mempool" + ~tags:[team; "operation_worker"; "shutdown"] + ~uses:(fun _protocol -> [Constant.octez_agnostic_baker]) + ~supports:Protocol.(From_protocol 023) + @@ fun protocol -> + log_step 1 "Initialize a node with the mempool disabled" ; + let* node, client = + Client.init_with_protocol + ~nodes_args:[Connections 0; Synchronisation_threshold 0; Disable_mempool] + `Client + ~timestamp:Now + ~protocol + () + in + log_step 2 "Run a baker" ; + let* baker = Agnostic_baker.init node client in + log_step 3 "Wait for baker termination" ; + let* outcome = + Lwt.choose + [ + (let* () = Agnostic_baker.wait_for_termination baker in + return `Terminated); + (* Saves some CI time by failing after 2 minutes if the baker fails to + shut down as expected.*) + (let* () = Lwt_unix.sleep 120. 
in + return `Timeout); + ] + in + match outcome with + | `Terminated -> unit + | `Timeout -> + Test.fail + "The baker failed to shut down after being unable to reach the node \ + mempool for 2 minutes" + let register ~protocols = check_node_version_check_bypass_test protocols ; check_node_version_allowed_test protocols ; @@ -1290,4 +1328,5 @@ let register ~protocols = prequorum_check_levels protocols ; attestations_aggregation_on_reproposal_local_context protocols ; attestations_aggregation_on_reproposal_remote_node protocols ; - aggregated_operations_retrival_from_block_content protocols + aggregated_operations_retrival_from_block_content protocols ; + unable_to_reach_node_mempool protocols -- GitLab From 071d6eef29e00db9621cfa19c579a079cef031c3 Mon Sep 17 00:00:00 2001 From: Adam Allombert-Goget Date: Mon, 13 Oct 2025 17:36:35 +0200 Subject: [PATCH 11/11] tezt/baker_test: add a test for stream refresh on timeout --- tezt/tests/baker_test.ml | 64 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 63 insertions(+), 1 deletion(-) diff --git a/tezt/tests/baker_test.ml b/tezt/tests/baker_test.ml index 5c8b5236dcc6..ec617254ace3 100644 --- a/tezt/tests/baker_test.ml +++ b/tezt/tests/baker_test.ml @@ -1311,6 +1311,67 @@ let unable_to_reach_node_mempool = "The baker failed to shut down after being unable to reach the node \ mempool for 2 minutes" +let stream_is_refreshed_on_timeout = + Protocol.register_test + ~__FILE__ + ~title:"Operation stream is refreshed on timeout" + ~tags:[team; "operation_worker"; "timeout"] + ~uses:(fun _protocol -> [Constant.octez_agnostic_baker]) + ~supports:Protocol.(From_protocol 023) + @@ fun protocol -> + log_step 1 "Initialize a node with mainnet constants" ; + let* parameter_file = + Protocol.write_parameter_file + ~base:(Either.Right (protocol, Some Constants_mainnet)) + [] + in + let* node, client = + Client.init_with_protocol + `Client + ~protocol + ~parameter_file + ~timestamp:Now + () + in + log_step 2 "Run a baker" ; + let* 
baker = + Agnostic_baker.init + ~delegates:Constant.[activator.public_key_hash] + node + client + in + log_step 3 "Bake 2 levels" ; + let* () = + Client.bake_for_and_wait ~keys:[] ~minimal_timestamp:true ~count:2 client + in + (* Since the node will neither switch heads nor forward new operations, the + operation worker is expected to refresh the monitor_operations stream after + the equivalent of two rounds of inactivity. *) + log_step 4 "Wait for stream_timeout event" ; + let wait_for_stream_timeout () = + Agnostic_baker.wait_for + baker + "monitor_operations_stream_timeout.v0" + (fun _ -> Some ()) + in + let* waiter = + Lwt.choose + [ + (let* () = wait_for_stream_timeout () in + return `Stream_timeout); + (* Saves some CI time by failing after 2 minutes if the baker fails to + refresh the stream as expected.*) + (let* () = Lwt_unix.sleep 120. in + return `Timeout); + ] + in + match waiter with + | `Stream_timeout -> unit + | `Timeout -> + Test.fail + "The baker did not refresh the streamed RPC after prolonged inactivity \ + from the mempool" + let register ~protocols = check_node_version_check_bypass_test protocols ; check_node_version_allowed_test protocols ; @@ -1329,4 +1390,5 @@ let register ~protocols = attestations_aggregation_on_reproposal_local_context protocols ; attestations_aggregation_on_reproposal_remote_node protocols ; aggregated_operations_retrival_from_block_content protocols ; - unable_to_reach_node_mempool protocols + unable_to_reach_node_mempool protocols ; + stream_is_refreshed_on_timeout protocols -- GitLab