diff --git a/devtools/testnet_experiment_tools/tool_023_PtSeouLo.ml b/devtools/testnet_experiment_tools/tool_023_PtSeouLo.ml index 4e3faee5456b7b1ce91e92247409ff36d10291ee..9f20c8ab86d5b771285e38bb4aa712cbf2282b5f 100644 --- a/devtools/testnet_experiment_tools/tool_023_PtSeouLo.ml +++ b/devtools/testnet_experiment_tools/tool_023_PtSeouLo.ml @@ -253,8 +253,18 @@ let create_state cctxt ?synchronize ?monitor_node_mempool ~config let* constants = Alpha_services.Constants.all cctxt (`Hash chain_id, `Head 0) in + let*? round_durations = + Round.Durations.create + ~first_round_duration:constants.parametric.minimal_block_delay + ~delay_increment_per_round:constants.parametric.delay_increment_per_round + |> Environment.wrap_tzresult + in let*! operation_worker = - Operation_worker.run ?monitor_node_operations ~constants cctxt + Operation_worker.run + ?monitor_node_operations + ~round_durations + ~constants + cctxt in Baking_scheduling.create_initial_state cctxt @@ -262,7 +272,9 @@ let create_state cctxt ?synchronize ?monitor_node_mempool ~config ~chain config operation_worker + ~constants ~current_proposal + round_durations delegates let compute_current_round_duration round_durations diff --git a/devtools/testnet_experiment_tools/tool_024_PsD5wVTJ.ml b/devtools/testnet_experiment_tools/tool_024_PsD5wVTJ.ml index adb11d52843c3b94ee8b4913055b9e058c692cb8..07d62098f806ee1f36ff3cbb0708f7133f6db4a9 100644 --- a/devtools/testnet_experiment_tools/tool_024_PsD5wVTJ.ml +++ b/devtools/testnet_experiment_tools/tool_024_PsD5wVTJ.ml @@ -249,16 +249,28 @@ let create_state cctxt ?synchronize ?monitor_node_mempool ~config let open Lwt_result_syntax in let chain = cctxt#chain in let monitor_node_operations = monitor_node_mempool in + let* chain_id = Shell_services.Chain.chain_id cctxt ~chain () in + let* constants = + Alpha_services.Constants.all cctxt (`Hash chain_id, `Head 0) + in + let*? round_durations = + Round.Durations.create + ~first_round_duration:constants.parametric.minimal_block_delay + ~delay_increment_per_round:constants.parametric.delay_increment_per_round + |> Environment.wrap_tzresult + in let*! operation_worker = - Operation_worker.run ?monitor_node_operations cctxt + Operation_worker.run ?monitor_node_operations ~round_durations cctxt in Baking_scheduling.create_initial_state cctxt ?synchronize + ~constants ~chain config operation_worker ~current_proposal + round_durations delegates let compute_current_round_duration round_durations diff --git a/devtools/testnet_experiment_tools/tool_alpha.ml b/devtools/testnet_experiment_tools/tool_alpha.ml index 8f6fa38c75e7e739d6cedb089619909790c7ec20..ea69a2c2add523fe4847e59507db9604444c6d88 100644 --- a/devtools/testnet_experiment_tools/tool_alpha.ml +++ b/devtools/testnet_experiment_tools/tool_alpha.ml @@ -248,16 +248,28 @@ let create_state cctxt ?synchronize ?monitor_node_mempool ~config let open Lwt_result_syntax in let chain = cctxt#chain in let monitor_node_operations = monitor_node_mempool in + let* chain_id = Shell_services.Chain.chain_id cctxt ~chain () in + let* constants = + Alpha_services.Constants.all cctxt (`Hash chain_id, `Head 0) + in + let*? round_durations = + Round.Durations.create + ~first_round_duration:constants.parametric.minimal_block_delay + ~delay_increment_per_round:constants.parametric.delay_increment_per_round + |> Environment.wrap_tzresult + in let*! operation_worker = - Operation_worker.run ?monitor_node_operations cctxt + Operation_worker.run ?monitor_node_operations ~round_durations cctxt in Baking_scheduling.create_initial_state cctxt ?synchronize + ~constants ~chain config operation_worker ~current_proposal + round_durations delegates let compute_current_round_duration round_durations diff --git a/src/proto_023_PtSeouLo/lib_delegate/baking_lib.ml b/src/proto_023_PtSeouLo/lib_delegate/baking_lib.ml index ccfe3895e16251d931e8b7b6ca1958fa5dfb6945..7d118ae47b55363567f34832912797445bf808bb 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/baking_lib.ml +++ b/src/proto_023_PtSeouLo/lib_delegate/baking_lib.ml @@ -57,8 +57,18 @@ let create_state cctxt ?dal_node_rpc_ctxt ?synchronize ?monitor_node_mempool let* constants = Alpha_services.Constants.all cctxt (`Hash chain_id, `Head 0) in + let*? round_durations = + Round.Durations.create + ~first_round_duration:constants.parametric.minimal_block_delay + ~delay_increment_per_round:constants.parametric.delay_increment_per_round + |> Environment.wrap_tzresult + in let*! operation_worker = - Operation_worker.run ?monitor_node_operations ~constants cctxt + Operation_worker.run + ?monitor_node_operations + ~constants + ~round_durations + cctxt in Baking_scheduling.create_initial_state cctxt @@ -67,6 +77,7 @@ let create_state cctxt ?dal_node_rpc_ctxt ?synchronize ?monitor_node_mempool ~chain config operation_worker + round_durations ~current_proposal ~constants delegates diff --git a/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.ml b/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.ml index 10fd988b5893b09e28ac0c3e4c66b67079c942a9..329ca0c9ebc817d00cfa8f177afd4b40502fd334 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.ml +++ b/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.ml @@ -618,8 +618,8 @@ let create_round_durations constants = (Round.Durations.create ~first_round_duration ~delay_increment_per_round) let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain - config operation_worker ~(current_proposal : Baking_state.proposal) - ?constants delegates = + config operation_worker round_durations + ~(current_proposal : Baking_state.proposal) ?constants delegates = let open Lwt_result_syntax in (* FIXME: https://gitlab.com/tezos/tezos/-/issues/7391 consider saved attestable value *) @@ -630,7 +630,6 @@ let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain | Some c -> return c | None -> Alpha_services.Constants.all cctxt (`Hash chain_id, `Head 0) in - let*? round_durations = create_round_durations constants in let* validation_mode = Baking_state.( match config.Baking_configuration.validation with @@ -734,7 +733,6 @@ let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain in let* round_state = if synchronize then - let*? round_durations = create_round_durations constants in let*? current_round = Baking_actions.compute_round current_proposal round_durations in @@ -1007,7 +1005,10 @@ let run cctxt ?dal_node_rpc_ctxt ?canceler ?(stop_on_event = fun _ -> false) | Some current_head -> return current_head | None -> failwith "head stream unexpectedly ended" in - let*! operation_worker = Operation_worker.run ~constants cctxt in + let*? round_durations = create_round_durations constants in + let*! operation_worker = + Operation_worker.run ~constants ~round_durations cctxt + in Option.iter (fun canceler -> Lwt_canceler.on_cancel canceler (fun () -> @@ -1021,6 +1022,7 @@ let run cctxt ?dal_node_rpc_ctxt ?canceler ?(stop_on_event = fun _ -> false) ~chain config operation_worker + round_durations ~current_proposal ~constants delegates diff --git a/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.mli b/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.mli index 5707a7fea4a0c92cec61ea7a57e89b58a6d82fe2..41030065dc63ea67d5b083c1152959019b22ea6d 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.mli +++ b/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.mli @@ -154,6 +154,7 @@ val create_initial_state : chain:Chain_services.chain -> Baking_configuration.t -> Operation_worker.t -> + Round.round_durations -> current_proposal:proposal -> ?constants:Constants.t -> Baking_state_types.Key.t list -> diff --git a/src/proto_023_PtSeouLo/lib_delegate/operation_worker.ml b/src/proto_023_PtSeouLo/lib_delegate/operation_worker.ml index 9d31ac0a019a88878b8019fcabf6cc7fa4d19a81..874a8f15a46dbfd77e5fd4a02927f21b8b748d74 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/operation_worker.ml +++ b/src/proto_023_PtSeouLo/lib_delegate/operation_worker.ml @@ -36,14 +36,35 @@ module Events = struct let pp_int = Format.pp_print_int - let loop_failed = - declare_1 + let node_unreachable_crash = + declare_0 ~section - ~name:"loop_failed" + ~name:"node_unreachable_crash" ~level:Error - ~msg:"loop failed with {trace}" - ~pp1:Error_monad.pp_print_trace - ("trace", Error_monad.trace_encoding) + ~msg: + "Node unreachable via the monitor_operations RPC. Unable to monitor \ + quorum. Shutting down baker..." + () + + let monitor_operations_retry = + declare_1 + ~section + ~name:"monitor_operations_retry" + ~level:Warning + ~msg:"{msg}" + ~pp1:Format.pp_print_string + ("msg", Data_encoding.string) + + let monitor_operations_stream_timeout = + declare_1 + ~section + ~name:"monitor_operations_stream_timeout" + ~level:Warning + ~msg: + "No data received from monitor_operations RPC for {timeout} seconds. \ + Assuming the stream is stalled and refreshing it." + ~pp1:Format.pp_print_float + ("timeout", Data_encoding.float) let ended = declare_1 @@ -719,7 +740,7 @@ let flush_operation_pool state (head_level, head_round) = let operation_pool = {Operation_pool.empty with consensus = attestations} in state.operation_pool <- operation_pool -let run ?(monitor_node_operations = true) ~constants +let run ?(monitor_node_operations = true) ~constants ~round_durations (cctxt : #Protocol_client_context.full) = let open Lwt_syntax in let state = @@ -733,13 +754,17 @@ let run ?(monitor_node_operations = true) ~constants (* If the call to [monitor_operations] RPC fails, retry 5 times during 25 seconds before crashing the worker . *) Utils.retry - ~emit:(cctxt#message "%s") + ~emit:Events.(emit monitor_operations_retry) ~max_delay:10. ~delay:1. ~factor:2. - ~tries:5 + ~tries:10 ~is_error:(function _ -> true) - ~msg:(fun _ -> "unable to call monitor operations RPC.") + ~msg:(fun errs -> + Format.asprintf + "Failed to reach the node via the monitor_operations RPC@,%a" + pp_print_trace + errs) (fun () -> (monitor_operations cctxt @@ -747,8 +772,14 @@ let run ?(monitor_node_operations = true) ~constants () in match result with - | Error err -> Events.(emit loop_failed err) - | Ok (head, operation_stream, op_stream_stopper) -> + | Error _ -> + (* The baker failed to reach the node via the monitor_operations + RPC after multiple retries. Because it can no longer monitor the + consensus, it is unable to attest or bake. Rather than remain in this + degraded state or retry indefinitely, we shut it down explicitly. *) + let* () = Events.(emit node_unreachable_crash ()) in + Lwt_exit.exit_and_raise (*ECONNREFUSED*) 111 + | Ok (((_, round) as head), operation_stream, op_stream_stopper) -> () [@profiler.stop] ; () [@profiler.record @@ -771,10 +802,34 @@ let run ?(monitor_node_operations = true) ~constants state head [@profiler.record_f {verbosity = Notice} "update operations pool"] ; + let stream_timeout = + Round.round_duration round_durations (Round.succ round) + |> Period.to_seconds |> Int64.to_float + in let rec loop () = - let* ops = Lwt_stream.get operation_stream in - match ops with - | None -> + let* result = + Lwt.pick + [ + (let* ops = Lwt_stream.get operation_stream in + return (`Stream ops)); + (let* () = Lwt_unix.sleep stream_timeout in + return `Timeout); + ] + in + match result with + | `Timeout -> + (* The monitor_operations RPC has neither produced new data nor + closed the stream for some time. This can occur naturally, but + may also indicate a stalled stream. Restarting it is + inexpensive and can prevent the baker from hanging + indefinitely. *) + let* () = + Events.(emit monitor_operations_stream_timeout stream_timeout) + in + op_stream_stopper () ; + let* () = reset_monitoring state in + worker_loop () + | `Stream None -> (* When the stream closes, it means a new head has been set, we reset the monitoring and flush current operations *) let* () = Events.(emit end_of_stream ()) in @@ -788,7 +843,7 @@ let run ?(monitor_node_operations = true) ~constants in () [@profiler.stop] ; worker_loop () - | Some ops -> + | `Stream (Some ops) -> (state.operation_pool <- Operation_pool.add_operations state.operation_pool ops) [@profiler.aggregate_f {verbosity = Info} "add operations"] ; @@ -804,16 +859,11 @@ let run ?(monitor_node_operations = true) ~constants (loop () [@profiler.record_s {verbosity = Notice} "operations processing"]) in - Lwt.dont_wait - (fun () -> - Lwt.finalize - (fun () -> - if state.monitor_node_operations then worker_loop () else return_unit) - (fun () -> - let* _ = shutdown_worker state in - return_unit)) - (fun exn -> - Events.(emit__dont_wait__use_with_care ended (Printexc.to_string exn))) ; + if state.monitor_node_operations then + Lwt.dont_wait + (fun () -> worker_loop ()) + (fun exn -> + Events.(emit__dont_wait__use_with_care ended (Printexc.to_string exn))) ; return state let retrieve_pending_operations cctxt state = diff --git a/src/proto_023_PtSeouLo/lib_delegate/operation_worker.mli b/src/proto_023_PtSeouLo/lib_delegate/operation_worker.mli index 944bfbd54fb98cba72f247e0cc8f089603c6fb23..aa29853afa64fdd307c3b1b9db0e7340a1bcdda8 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/operation_worker.mli +++ b/src/proto_023_PtSeouLo/lib_delegate/operation_worker.mli @@ -55,16 +55,18 @@ type event = (** {1 Constructors}*) -(** [run ?monitor_node_operations cctxt] spawns an operation worker. +(** [run ?monitor_node_operations ~constants ~round_durations cctxt] spawns an + operation worker. @param monitor_node_operations monitor operations on the node (defaults: [true]). Set [monitor_node_operations] to [false] to only consider externally provided (non-node) operations. *) val run : ?monitor_node_operations:bool -> - constants:Constants.t -> + constants:Protocol.Alpha_context.Constants.t -> + round_durations:Protocol.Alpha_context.Round.round_durations -> #Protocol_client_context.full -> - t Lwt.t + t Environment.Lwt.t (** {1 Utilities} *) diff --git a/src/proto_024_PsD5wVTJ/lib_delegate/baking_lib.ml b/src/proto_024_PsD5wVTJ/lib_delegate/baking_lib.ml index a71be658395fbe6192560140b9be9fb3d7becac4..46cba2c0d65a83d170068dea0614148dfb97b432 100644 --- a/src/proto_024_PsD5wVTJ/lib_delegate/baking_lib.ml +++ b/src/proto_024_PsD5wVTJ/lib_delegate/baking_lib.ml @@ -57,8 +57,14 @@ let create_state cctxt ?dal_node_rpc_ctxt ?synchronize ?monitor_node_mempool let* constants = Alpha_services.Constants.all cctxt (`Hash chain_id, `Head 0) in + let*? round_durations = + Round.Durations.create + ~first_round_duration:constants.parametric.minimal_block_delay + ~delay_increment_per_round:constants.parametric.delay_increment_per_round + |> Environment.wrap_tzresult + in let*! operation_worker = - Operation_worker.run ?monitor_node_operations cctxt + Operation_worker.run ?monitor_node_operations ~round_durations cctxt in Baking_scheduling.create_initial_state cctxt @@ -67,6 +73,7 @@ let create_state cctxt ?dal_node_rpc_ctxt ?synchronize ?monitor_node_mempool ~chain config operation_worker + round_durations ~current_proposal ~constants delegates diff --git a/src/proto_024_PsD5wVTJ/lib_delegate/baking_scheduling.ml b/src/proto_024_PsD5wVTJ/lib_delegate/baking_scheduling.ml index 97b9dc591e4323af1a4d594b8504448e2f662b61..e604e3d4b151c849cccdeacabc33bc7c3f5a0745 100644 --- a/src/proto_024_PsD5wVTJ/lib_delegate/baking_scheduling.ml +++ b/src/proto_024_PsD5wVTJ/lib_delegate/baking_scheduling.ml @@ -616,8 +616,8 @@ let create_round_durations constants = (Round.Durations.create ~first_round_duration ~delay_increment_per_round) let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain - config operation_worker ~(current_proposal : Baking_state.proposal) - ?constants delegates = + config operation_worker round_durations + ~(current_proposal : Baking_state.proposal) ?constants delegates = let open Lwt_result_syntax in (* FIXME: https://gitlab.com/tezos/tezos/-/issues/7391 consider saved attestable value *) @@ -628,7 +628,6 @@ let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain | Some c -> return c | None -> Alpha_services.Constants.all cctxt (`Hash chain_id, `Head 0) in - let*? round_durations = create_round_durations constants in let* validation_mode = Baking_state.( match config.Baking_configuration.validation with @@ -732,7 +731,6 @@ let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain in let* round_state = if synchronize then - let*? round_durations = create_round_durations constants in let*? current_round = Baking_actions.compute_round current_proposal round_durations in @@ -1015,7 +1013,8 @@ let run cctxt ?dal_node_rpc_ctxt ?canceler ?(stop_on_event = fun _ -> false) | Some current_head -> return current_head | None -> failwith "head stream unexpectedly ended" in - let*! operation_worker = Operation_worker.run cctxt in + let*? round_durations = create_round_durations constants in + let*! operation_worker = Operation_worker.run ~round_durations cctxt in Option.iter (fun canceler -> Lwt_canceler.on_cancel canceler (fun () -> @@ -1029,6 +1028,7 @@ let run cctxt ?dal_node_rpc_ctxt ?canceler ?(stop_on_event = fun _ -> false) ~chain config operation_worker + round_durations ~current_proposal ~constants delegates diff --git a/src/proto_024_PsD5wVTJ/lib_delegate/baking_scheduling.mli b/src/proto_024_PsD5wVTJ/lib_delegate/baking_scheduling.mli index 5707a7fea4a0c92cec61ea7a57e89b58a6d82fe2..41030065dc63ea67d5b083c1152959019b22ea6d 100644 --- a/src/proto_024_PsD5wVTJ/lib_delegate/baking_scheduling.mli +++ b/src/proto_024_PsD5wVTJ/lib_delegate/baking_scheduling.mli @@ -154,6 +154,7 @@ val create_initial_state : chain:Chain_services.chain -> Baking_configuration.t -> Operation_worker.t -> + Round.round_durations -> current_proposal:proposal -> ?constants:Constants.t -> Baking_state_types.Key.t list -> diff --git a/src/proto_024_PsD5wVTJ/lib_delegate/operation_worker.ml b/src/proto_024_PsD5wVTJ/lib_delegate/operation_worker.ml index 1df6ccc4365fe9e9810c994daa3c41e70c886f6d..452c7152431b46e4865b88efeb72ba58d1db30c4 100644 --- a/src/proto_024_PsD5wVTJ/lib_delegate/operation_worker.ml +++ b/src/proto_024_PsD5wVTJ/lib_delegate/operation_worker.ml @@ -38,14 +38,35 @@ module Events = struct let pp_int_64 fmt = Format.fprintf fmt "%Ld" - let loop_failed = - declare_1 + let node_unreachable_crash = + declare_0 ~section - ~name:"loop_failed" + ~name:"node_unreachable_crash" ~level:Error - ~msg:"loop failed with {trace}" - ~pp1:Error_monad.pp_print_trace - ("trace", Error_monad.trace_encoding) + ~msg: + "Node unreachable via the monitor_operations RPC. Unable to monitor \ + quorum. Shutting down baker..." + () + + let monitor_operations_retry = + declare_1 + ~section + ~name:"monitor_operations_retry" + ~level:Warning + ~msg:"{msg}" + ~pp1:Format.pp_print_string + ("msg", Data_encoding.string) + + let monitor_operations_stream_timeout = + declare_1 + ~section + ~name:"monitor_operations_stream_timeout" + ~level:Warning + ~msg: + "No data received from monitor_operations RPC for {timeout} seconds. \ + Assuming the stream is stalled and refreshing it." + ~pp1:Format.pp_print_float + ("timeout", Data_encoding.float) let ended = declare_1 @@ -728,7 +749,7 @@ let flush_operation_pool state (head_level, head_round) = let operation_pool = {Operation_pool.empty with consensus = attestations} in state.operation_pool <- operation_pool -let run ?(monitor_node_operations = true) +let run ?(monitor_node_operations = true) ~round_durations (cctxt : #Protocol_client_context.full) = let open Lwt_syntax in let state = @@ -741,13 +762,17 @@ let run ?(monitor_node_operations = true) (* If the call to [monitor_operations] RPC fails, retry 5 times during 25 seconds before crashing the worker . *) Utils.retry - ~emit:(cctxt#message "%s") + ~emit:Events.(emit monitor_operations_retry) ~max_delay:10. ~delay:1. ~factor:2. - ~tries:5 + ~tries:10 ~is_error:(function _ -> true) - ~msg:(fun _ -> "unable to call monitor operations RPC.") + ~msg:(fun errs -> + Format.asprintf + "Failed to reach the node via the monitor_operations RPC@,%a" + pp_print_trace + errs) (fun () -> (monitor_operations cctxt @@ -755,8 +780,14 @@ let run ?(monitor_node_operations = true) () in match result with - | Error err -> Events.(emit loop_failed err) - | Ok (head, operation_stream, op_stream_stopper) -> + | Error _ -> + (* The baker failed to reach the node via the monitor_operations + RPC after multiple retries. Because it can no longer monitor the + consensus, it is unable to attest or bake. Rather than remain in this + degraded state or retry indefinitely, we shut it down explicitly. *) + let* () = Events.(emit node_unreachable_crash ()) in + Lwt_exit.exit_and_raise (*ECONNREFUSED*) 111 + | Ok (((_, round) as head), operation_stream, op_stream_stopper) -> () [@profiler.stop] ; () [@profiler.record @@ -779,10 +810,34 @@ let run ?(monitor_node_operations = true) state head [@profiler.record_f {verbosity = Notice} "update operations pool"] ; + let stream_timeout = + Round.round_duration round_durations (Round.succ round) + |> Period.to_seconds |> Int64.to_float + in let rec loop () = - let* ops = Lwt_stream.get operation_stream in - match ops with - | None -> + let* result = + Lwt.pick + [ + (let* ops = Lwt_stream.get operation_stream in + return (`Stream ops)); + (let* () = Lwt_unix.sleep stream_timeout in + return `Timeout); + ] + in + match result with + | `Timeout -> + (* The monitor_operations RPC has neither produced new data nor + closed the stream for some time. This can occur naturally, but + may also indicate a stalled stream. Restarting it is + inexpensive and can prevent the baker from hanging + indefinitely. *) + let* () = + Events.(emit monitor_operations_stream_timeout stream_timeout) + in + op_stream_stopper () ; + let* () = reset_monitoring state in + worker_loop () + | `Stream None -> (* When the stream closes, it means a new head has been set, we reset the monitoring and flush current operations *) let* () = Events.(emit end_of_stream ()) in @@ -796,7 +851,7 @@ let run ?(monitor_node_operations = true) in () [@profiler.stop] ; worker_loop () - | Some ops -> + | `Stream (Some ops) -> (state.operation_pool <- Operation_pool.add_operations state.operation_pool ops) [@profiler.aggregate_f {verbosity = Info} "add operations"] ; @@ -812,16 +867,11 @@ let run ?(monitor_node_operations = true) (loop () [@profiler.record_s {verbosity = Notice} "operations processing"]) in - Lwt.dont_wait - (fun () -> - Lwt.finalize - (fun () -> - if state.monitor_node_operations then worker_loop () else return_unit) - (fun () -> - let* _ = shutdown_worker state in - return_unit)) - (fun exn -> - Events.(emit__dont_wait__use_with_care ended (Printexc.to_string exn))) ; + if state.monitor_node_operations then + Lwt.dont_wait + (fun () -> worker_loop ()) + (fun exn -> + Events.(emit__dont_wait__use_with_care ended (Printexc.to_string exn))) ; return state let retrieve_pending_operations cctxt state = diff --git a/src/proto_024_PsD5wVTJ/lib_delegate/operation_worker.mli b/src/proto_024_PsD5wVTJ/lib_delegate/operation_worker.mli index a76a818fb910812716f3036714862725e838a8e3..325923937a4f52115a31a8e6a2be7ab43079b33a 100644 --- a/src/proto_024_PsD5wVTJ/lib_delegate/operation_worker.mli +++ b/src/proto_024_PsD5wVTJ/lib_delegate/operation_worker.mli @@ -55,13 +55,17 @@ type event = (** {1 Constructors}*) -(** [run ?monitor_node_operations cctxt] spawns an operation worker. +(** [run ?monitor_node_operations ~round_durations cctxt] spawns an operation + worker. @param monitor_node_operations monitor operations on the node (defaults: [true]). Set [monitor_node_operations] to [false] to only consider externally provided (non-node) operations. *) val run : - ?monitor_node_operations:bool -> #Protocol_client_context.full -> t Lwt.t + ?monitor_node_operations:bool -> + round_durations:Protocol.Alpha_context.Round.round_durations -> + #Protocol_client_context.full -> + t Lwt.t (** {1 Utilities} *) diff --git a/src/proto_alpha/lib_delegate/baking_lib.ml b/src/proto_alpha/lib_delegate/baking_lib.ml index a71be658395fbe6192560140b9be9fb3d7becac4..46cba2c0d65a83d170068dea0614148dfb97b432 100644 --- a/src/proto_alpha/lib_delegate/baking_lib.ml +++ b/src/proto_alpha/lib_delegate/baking_lib.ml @@ -57,8 +57,14 @@ let create_state cctxt ?dal_node_rpc_ctxt ?synchronize ?monitor_node_mempool let* constants = Alpha_services.Constants.all cctxt (`Hash chain_id, `Head 0) in + let*? round_durations = + Round.Durations.create + ~first_round_duration:constants.parametric.minimal_block_delay + ~delay_increment_per_round:constants.parametric.delay_increment_per_round + |> Environment.wrap_tzresult + in let*! operation_worker = - Operation_worker.run ?monitor_node_operations cctxt + Operation_worker.run ?monitor_node_operations ~round_durations cctxt in Baking_scheduling.create_initial_state cctxt @@ -67,6 +73,7 @@ let create_state cctxt ?dal_node_rpc_ctxt ?synchronize ?monitor_node_mempool ~chain config operation_worker + round_durations ~current_proposal ~constants delegates diff --git a/src/proto_alpha/lib_delegate/baking_scheduling.ml b/src/proto_alpha/lib_delegate/baking_scheduling.ml index 97b9dc591e4323af1a4d594b8504448e2f662b61..e604e3d4b151c849cccdeacabc33bc7c3f5a0745 100644 --- a/src/proto_alpha/lib_delegate/baking_scheduling.ml +++ b/src/proto_alpha/lib_delegate/baking_scheduling.ml @@ -616,8 +616,8 @@ let create_round_durations constants = (Round.Durations.create ~first_round_duration ~delay_increment_per_round) let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain - config operation_worker ~(current_proposal : Baking_state.proposal) - ?constants delegates = + config operation_worker round_durations + ~(current_proposal : Baking_state.proposal) ?constants delegates = let open Lwt_result_syntax in (* FIXME: https://gitlab.com/tezos/tezos/-/issues/7391 consider saved attestable value *) @@ -628,7 +628,6 @@ let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain | Some c -> return c | None -> Alpha_services.Constants.all cctxt (`Hash chain_id, `Head 0) in - let*? round_durations = create_round_durations constants in let* validation_mode = Baking_state.( match config.Baking_configuration.validation with @@ -732,7 +731,6 @@ let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain in let* round_state = if synchronize then - let*? round_durations = create_round_durations constants in let*? current_round = Baking_actions.compute_round current_proposal round_durations in @@ -1015,7 +1013,8 @@ let run cctxt ?dal_node_rpc_ctxt ?canceler ?(stop_on_event = fun _ -> false) | Some current_head -> return current_head | None -> failwith "head stream unexpectedly ended" in - let*! operation_worker = Operation_worker.run cctxt in + let*? round_durations = create_round_durations constants in + let*! operation_worker = Operation_worker.run ~round_durations cctxt in Option.iter (fun canceler -> Lwt_canceler.on_cancel canceler (fun () -> @@ -1029,6 +1028,7 @@ let run cctxt ?dal_node_rpc_ctxt ?canceler ?(stop_on_event = fun _ -> false) ~chain config operation_worker + round_durations ~current_proposal ~constants delegates diff --git a/src/proto_alpha/lib_delegate/baking_scheduling.mli b/src/proto_alpha/lib_delegate/baking_scheduling.mli index 5707a7fea4a0c92cec61ea7a57e89b58a6d82fe2..41030065dc63ea67d5b083c1152959019b22ea6d 100644 --- a/src/proto_alpha/lib_delegate/baking_scheduling.mli +++ b/src/proto_alpha/lib_delegate/baking_scheduling.mli @@ -154,6 +154,7 @@ val create_initial_state : chain:Chain_services.chain -> Baking_configuration.t -> Operation_worker.t -> + Round.round_durations -> current_proposal:proposal -> ?constants:Constants.t -> Baking_state_types.Key.t list -> diff --git a/src/proto_alpha/lib_delegate/operation_worker.ml b/src/proto_alpha/lib_delegate/operation_worker.ml index 1df6ccc4365fe9e9810c994daa3c41e70c886f6d..452c7152431b46e4865b88efeb72ba58d1db30c4 100644 --- a/src/proto_alpha/lib_delegate/operation_worker.ml +++ b/src/proto_alpha/lib_delegate/operation_worker.ml @@ -38,14 +38,35 @@ module Events = struct let pp_int_64 fmt = Format.fprintf fmt "%Ld" - let loop_failed = - declare_1 + let node_unreachable_crash = + declare_0 ~section - ~name:"loop_failed" + ~name:"node_unreachable_crash" ~level:Error - ~msg:"loop failed with {trace}" - ~pp1:Error_monad.pp_print_trace - ("trace", Error_monad.trace_encoding) + ~msg: + "Node unreachable via the monitor_operations RPC. Unable to monitor \ + quorum. Shutting down baker..." + () + + let monitor_operations_retry = + declare_1 + ~section + ~name:"monitor_operations_retry" + ~level:Warning + ~msg:"{msg}" + ~pp1:Format.pp_print_string + ("msg", Data_encoding.string) + + let monitor_operations_stream_timeout = + declare_1 + ~section + ~name:"monitor_operations_stream_timeout" + ~level:Warning + ~msg: + "No data received from monitor_operations RPC for {timeout} seconds. \ + Assuming the stream is stalled and refreshing it." + ~pp1:Format.pp_print_float + ("timeout", Data_encoding.float) let ended = declare_1 @@ -728,7 +749,7 @@ let flush_operation_pool state (head_level, head_round) = let operation_pool = {Operation_pool.empty with consensus = attestations} in state.operation_pool <- operation_pool -let run ?(monitor_node_operations = true) +let run ?(monitor_node_operations = true) ~round_durations (cctxt : #Protocol_client_context.full) = let open Lwt_syntax in let state = @@ -741,13 +762,17 @@ let run ?(monitor_node_operations = true) (* If the call to [monitor_operations] RPC fails, retry 5 times during 25 seconds before crashing the worker . *) Utils.retry - ~emit:(cctxt#message "%s") + ~emit:Events.(emit monitor_operations_retry) ~max_delay:10. ~delay:1. ~factor:2. - ~tries:5 + ~tries:10 ~is_error:(function _ -> true) - ~msg:(fun _ -> "unable to call monitor operations RPC.") + ~msg:(fun errs -> + Format.asprintf + "Failed to reach the node via the monitor_operations RPC@,%a" + pp_print_trace + errs) (fun () -> (monitor_operations cctxt @@ -755,8 +780,14 @@ let run ?(monitor_node_operations = true) () in match result with - | Error err -> Events.(emit loop_failed err) - | Ok (head, operation_stream, op_stream_stopper) -> + | Error _ -> + (* The baker failed to reach the node via the monitor_operations + RPC after multiple retries. Because it can no longer monitor the + consensus, it is unable to attest or bake. Rather than remain in this + degraded state or retry indefinitely, we shut it down explicitly. *) + let* () = Events.(emit node_unreachable_crash ()) in + Lwt_exit.exit_and_raise (*ECONNREFUSED*) 111 + | Ok (((_, round) as head), operation_stream, op_stream_stopper) -> () [@profiler.stop] ; () [@profiler.record @@ -779,10 +810,34 @@ let run ?(monitor_node_operations = true) state head [@profiler.record_f {verbosity = Notice} "update operations pool"] ; + let stream_timeout = + Round.round_duration round_durations (Round.succ round) + |> Period.to_seconds |> Int64.to_float + in let rec loop () = - let* ops = Lwt_stream.get operation_stream in - match ops with - | None -> + let* result = + Lwt.pick + [ + (let* ops = Lwt_stream.get operation_stream in + return (`Stream ops)); + (let* () = Lwt_unix.sleep stream_timeout in + return `Timeout); + ] + in + match result with + | `Timeout -> + (* The monitor_operations RPC has neither produced new data nor + closed the stream for some time. This can occur naturally, but + may also indicate a stalled stream. Restarting it is + inexpensive and can prevent the baker from hanging + indefinitely. *) + let* () = + Events.(emit monitor_operations_stream_timeout stream_timeout) + in + op_stream_stopper () ; + let* () = reset_monitoring state in + worker_loop () + | `Stream None -> (* When the stream closes, it means a new head has been set, we reset the monitoring and flush current operations *) let* () = Events.(emit end_of_stream ()) in @@ -796,7 +851,7 @@ let run ?(monitor_node_operations = true) in () [@profiler.stop] ; worker_loop () - | Some ops -> + | `Stream (Some ops) -> (state.operation_pool <- Operation_pool.add_operations state.operation_pool ops) [@profiler.aggregate_f {verbosity = Info} "add operations"] ; @@ -812,16 +867,11 @@ let run ?(monitor_node_operations = true) (loop () [@profiler.record_s {verbosity = Notice} "operations processing"]) in - Lwt.dont_wait - (fun () -> - Lwt.finalize - (fun () -> - if state.monitor_node_operations then worker_loop () else return_unit) - (fun () -> - let* _ = shutdown_worker state in - return_unit)) - (fun exn -> - Events.(emit__dont_wait__use_with_care ended (Printexc.to_string exn))) ; + if state.monitor_node_operations then + Lwt.dont_wait + (fun () -> worker_loop ()) + (fun exn -> + Events.(emit__dont_wait__use_with_care ended (Printexc.to_string exn))) ; return state let retrieve_pending_operations cctxt state = diff --git a/src/proto_alpha/lib_delegate/operation_worker.mli b/src/proto_alpha/lib_delegate/operation_worker.mli index a76a818fb910812716f3036714862725e838a8e3..325923937a4f52115a31a8e6a2be7ab43079b33a 100644 --- a/src/proto_alpha/lib_delegate/operation_worker.mli +++ b/src/proto_alpha/lib_delegate/operation_worker.mli @@ -55,13 +55,17 @@ type event = (** {1 Constructors}*) -(** [run ?monitor_node_operations cctxt] spawns an operation worker. +(** [run ?monitor_node_operations ~round_durations cctxt] spawns an operation + worker. @param monitor_node_operations monitor operations on the node (defaults: [true]). Set [monitor_node_operations] to [false] to only consider externally provided (non-node) operations. *) val run : - ?monitor_node_operations:bool -> #Protocol_client_context.full -> t Lwt.t + ?monitor_node_operations:bool -> + round_durations:Protocol.Alpha_context.Round.round_durations -> + #Protocol_client_context.full -> + t Lwt.t (** {1 Utilities} *) diff --git a/tezt/lib_tezos/agnostic_baker.ml b/tezt/lib_tezos/agnostic_baker.ml index 42b609c2339b77a3960a74866635478d03cda2f2..564e254c99e88b0868ce2d43fd84feb9c1a835be 100644 --- a/tezt/lib_tezos/agnostic_baker.ml +++ b/tezt/lib_tezos/agnostic_baker.ml @@ -321,6 +321,15 @@ let wait_for_ready agnostic_baker = resolver :: agnostic_baker.persistent_state.pending_ready ; check_event agnostic_baker "starting baker daemon" promise +let wait_for_termination (baker : t) : unit Lwt.t = + match baker.status with + | Not_running -> Lwt.return_unit + | Running {event_loop_promise = Some p; _} -> p + | Running {event_loop_promise = None; _} -> + invalid_arg + "Agnostic_baker.wait_for_termination called before Agnostic_baker.run \ + returned" + let init ?env ?runner ?(path = Uses.path Constant.octez_agnostic_baker) ?name ?color ?event_level ?event_pipe ?event_sections_levels ?(delegates = []) ?votefile ?liquidity_baking_toggle_vote ?force_apply_from_round ?remote_mode diff --git a/tezt/lib_tezos/agnostic_baker.mli b/tezt/lib_tezos/agnostic_baker.mli index 7568c53329eeaf29e8fe4d860c7e0c9cf8480b4d..103e3e12d641c5a5ec4bfb746a440798519847bc 100644 --- a/tezt/lib_tezos/agnostic_baker.mli +++ b/tezt/lib_tezos/agnostic_baker.mli @@ -38,6 +38,9 @@ val wait_for : ?where:string -> t -> string -> (JSON.t -> 'a option) -> 'a Lwt.t *) val wait_for_ready : t -> unit Lwt.t +(* Wait for agnostic baker termination. *) +val wait_for_termination : t -> unit Lwt.t + (** Raw events. *) type event = {name : string; value : JSON.t; timestamp : float} diff --git a/tezt/tests/baker_test.ml b/tezt/tests/baker_test.ml index d597d63009b60237e14c9658ed4adb873d6507c6..ec617254ace3a5f31f80c1c516e80b2c9d50ef0e 100644 --- a/tezt/tests/baker_test.ml +++ b/tezt/tests/baker_test.ml @@ -1273,6 +1273,105 @@ let aggregated_operations_retrival_from_block_content = in unit +let unable_to_reach_node_mempool = + Protocol.register_test + ~__FILE__ + ~title:"Baker shuts down when unable to reach the node mempool" + ~tags:[team; "operation_worker"; "shutdown"] + ~uses:(fun _protocol -> [Constant.octez_agnostic_baker]) + ~supports:Protocol.(From_protocol 023) + @@ fun protocol -> + log_step 1 "Initialize a node with the mempool disabled" ; + let* node, client = + Client.init_with_protocol + ~nodes_args:[Connections 0; Synchronisation_threshold 0; Disable_mempool] + `Client + ~timestamp:Now + ~protocol + () + in + log_step 2 "Run a baker" ; + let* baker = Agnostic_baker.init node client in + log_step 3 "Wait for baker termination" ; + let* outcome = + Lwt.choose + [ + (let* () = Agnostic_baker.wait_for_termination baker in + return `Terminated); + (* Saves some CI time by failing after 2 minutes if the baker fails to + shut down as expected.*) + (let* () = Lwt_unix.sleep 120. in + return `Timeout); + ] + in + match outcome with + | `Terminated -> unit + | `Timeout -> + Test.fail + "The baker failed to shut down after being unable to reach the node \ + mempool for 2 minutes" + +let stream_is_refreshed_on_timeout = + Protocol.register_test + ~__FILE__ + ~title:"Operation stream is refreshed on timeout" + ~tags:[team; "operation_worker"; "timeout"] + ~uses:(fun _protocol -> [Constant.octez_agnostic_baker]) + ~supports:Protocol.(From_protocol 023) + @@ fun protocol -> + log_step 1 "Initialize a node with mainnet constants" ; + let* parameter_file = + Protocol.write_parameter_file + ~base:(Either.Right (protocol, Some Constants_mainnet)) + [] + in + let* node, client = + Client.init_with_protocol + `Client + ~protocol + ~parameter_file + ~timestamp:Now + () + in + log_step 2 "Run a baker" ; + let* baker = + Agnostic_baker.init + ~delegates:Constant.[activator.public_key_hash] + node + client + in + log_step 3 "Bake 2 levels" ; + let* () = + Client.bake_for_and_wait ~keys:[] ~minimal_timestamp:true ~count:2 client + in + (* Since the node will neither switch heads nor forward new operations, the + operation worker is expected to refresh the monitor_operations stream after + the equivalent of two rounds of inactivity. *) + log_step 4 "Wait for stream_timeout event" ; + let wait_for_stream_timeout () = + Agnostic_baker.wait_for + baker + "monitor_operations_stream_timeout.v0" + (fun _ -> Some ()) + in + let* waiter = + Lwt.choose + [ + (let* () = wait_for_stream_timeout () in + return `Stream_timeout); + (* Saves some CI time by failing after 2 minutes if the baker fails to + refresh the stream as expected.*) + (let* () = Lwt_unix.sleep 120. in + return `Timeout); + ] + in + match waiter with + | `Stream_timeout -> unit + | `Timeout -> + Test.fail + "The baker did not refresh the streamed RPC after prolonged inactivity \ + from the mempool" + let register ~protocols = check_node_version_check_bypass_test protocols ; check_node_version_allowed_test protocols ; @@ -1290,4 +1389,6 @@ let register ~protocols = prequorum_check_levels protocols ; attestations_aggregation_on_reproposal_local_context protocols ; attestations_aggregation_on_reproposal_remote_node protocols ; - aggregated_operations_retrival_from_block_content protocols + aggregated_operations_retrival_from_block_content protocols ; + unable_to_reach_node_mempool protocols ; + stream_is_refreshed_on_timeout protocols