From e679be22cb7d3410961e6a5a0cce39091f9f3aa0 Mon Sep 17 00:00:00 2001 From: Albin Coquereau Date: Thu, 9 Oct 2025 14:48:28 +0200 Subject: [PATCH 1/5] stdlib/utils: add errors in retry print message --- src/lib_stdlib_unix/utils.ml | 18 +++++++++++++----- src/lib_stdlib_unix/utils.mli | 16 ++++++++-------- .../lib_delegate/baking_scheduling.ml | 5 +++-- .../lib_delegate/baking_scheduling.mli | 2 +- .../lib_delegate/operation_worker.ml | 2 +- .../lib_delegate/baking_scheduling.ml | 5 +++-- .../lib_delegate/baking_scheduling.mli | 2 +- .../lib_delegate/operation_worker.ml | 2 +- .../lib_delegate/baking_scheduling.ml | 5 +++-- .../lib_delegate/baking_scheduling.mli | 2 +- .../lib_delegate/operation_worker.ml | 2 +- 11 files changed, 36 insertions(+), 25 deletions(-) diff --git a/src/lib_stdlib_unix/utils.ml b/src/lib_stdlib_unix/utils.ml index 7bf05ec6b41c..92b7b452e4c2 100644 --- a/src/lib_stdlib_unix/utils.ml +++ b/src/lib_stdlib_unix/utils.ml @@ -115,14 +115,20 @@ let copy_dir ?(perm = 0o755) ?progress src dst = let copy_file = copy_file ~count_progress:(fun _ -> ()) -let rec retry ?max_delay ~delay ~factor ~tries ~is_error ~emit ?(msg = "") f x = +let rec retry ?max_delay ~delay ~factor ~tries ~is_error ~emit + ?(msg = fun _ -> "") f x = let open Lwt.Syntax in let* result = f x in match result with | Ok _ as r -> Lwt.return r - | Error (err :: _) as errs when tries > 0 && is_error err -> ( + | Error (err :: _ as errs) when tries > 0 && is_error err -> ( let* () = - emit (Format.sprintf "%sRetrying in %.2f seconds..." msg delay) + emit + (Format.sprintf + "%sRetrying in %.2f seconds, %d attempts left..." + (msg errs) + delay + tries) in let* result = Lwt.pick @@ -134,7 +140,7 @@ let rec retry ?max_delay ~delay ~factor ~tries ~is_error ~emit ?(msg = "") f x = ] in match result with - | `Killed -> Lwt.return errs + | `Killed -> Lwt.return_error errs | `Continue -> let next_delay = delay *. factor in let delay = @@ -153,4 +159,6 @@ let rec retry ?max_delay ~delay ~factor ~tries ~is_error ~emit ?(msg = "") f x = ~emit f x) - | Error _ as err -> Lwt.return err + | Error errs as err -> + let* () = emit (Format.sprintf "%sNo attempts left." (msg errs)) in + Lwt.return err diff --git a/src/lib_stdlib_unix/utils.mli b/src/lib_stdlib_unix/utils.mli index 738c02bf50d3..94d6e2fbb308 100644 --- a/src/lib_stdlib_unix/utils.mli +++ b/src/lib_stdlib_unix/utils.mli @@ -59,13 +59,13 @@ val copy_file : src:string -> dst:string -> unit val copy_dir : ?perm:int -> ?progress:string * Terminal.Color.t -> string -> string -> unit -(** [retry ?max_delay ~delay ~factor ~tries ~is_error ~emit ?msg f x] - retries applying [f x] [tries] until it succeeds or returns an error - when [is_error] is false, at most [tries] number of times. After - each try it waits for a number of seconds, but not more than [max_delay], if - given. The wait time between tries is given by the initial [delay], - multiplied by [factor] at each subsequent try. At each failure, [msg] - together with the current delay is printed using [emit]. *) +(** [retry ?max_delay ~delay ~factor ~tries ~is_error ~emit ?msg f x] retries + applying [f x] [tries] until it succeeds or returns an error when [is_error] + is false, at most [tries] number of times. After each try it waits for a + number of seconds, but not more than [max_delay], if given. The wait time + between tries is given by the initial [delay], multiplied by [factor] at + each subsequent try. At each failure, [msg] can print the error together + with the current delay using [emit]. *) val retry : ?max_delay:float -> delay:float -> @@ -73,7 +73,7 @@ val retry : tries:int -> is_error:('err -> bool) -> emit:(string -> unit Lwt.t) -> - ?msg:string -> + ?msg:('err list -> string) -> ('a -> ('b, 'err list) result Lwt.t) -> 'a -> ('b, 'err list) result Lwt.t diff --git a/src/proto_022_PsRiotum/lib_delegate/baking_scheduling.ml b/src/proto_022_PsRiotum/lib_delegate/baking_scheduling.ml index 503a14f3555e..c4748657f384 100644 --- a/src/proto_022_PsRiotum/lib_delegate/baking_scheduling.ml +++ b/src/proto_022_PsRiotum/lib_delegate/baking_scheduling.ml @@ -872,7 +872,7 @@ let perform_sanity_check cctxt ~chain_id = return_unit let retry (cctxt : #Protocol_client_context.full) ?max_delay ~delay ~factor - ~tries ?(msg = "Connection failed. ") f x = + ~tries ?(msg = fun _errs -> "Connection failed. ") f x = Utils.retry ~emit:(cctxt#message "%s") ?max_delay @@ -962,7 +962,8 @@ let register_dal_profiles cctxt dal_node_rpc_ctxt delegates = ~delay:1. ~factor:2. ~tries:max_int - ~msg:"Failed to register profiles, DAL node is not reachable. " + ~msg:(fun _errs -> + "Failed to register profiles, DAL node is not reachable. ") (fun () -> register dal_ctxt) ()) dal_node_rpc_ctxt diff --git a/src/proto_022_PsRiotum/lib_delegate/baking_scheduling.mli b/src/proto_022_PsRiotum/lib_delegate/baking_scheduling.mli index b6f52af181ca..498031186a8d 100644 --- a/src/proto_022_PsRiotum/lib_delegate/baking_scheduling.mli +++ b/src/proto_022_PsRiotum/lib_delegate/baking_scheduling.mli @@ -46,7 +46,7 @@ val retry : delay:float -> factor:float -> tries:int -> - ?msg:string -> + ?msg:(tztrace -> string) -> ('a -> 'b tzresult Lwt.t) -> 'a -> 'b tzresult Lwt.t diff --git a/src/proto_022_PsRiotum/lib_delegate/operation_worker.ml b/src/proto_022_PsRiotum/lib_delegate/operation_worker.ml index 872233eb9aad..3e26796b5d95 100644 --- a/src/proto_022_PsRiotum/lib_delegate/operation_worker.ml +++ b/src/proto_022_PsRiotum/lib_delegate/operation_worker.ml @@ -715,7 +715,7 @@ let run ?(monitor_node_operations = true) ~constants ~factor:2. ~tries:5 ~is_error:(function _ -> true) - ~msg:"unable to call monitor operations RPC." + ~msg:(fun _ -> "unable to call monitor operations RPC.") (fun () -> (monitor_operations cctxt diff --git a/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.ml b/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.ml index aae0b43742ec..7161b69ddb77 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.ml +++ b/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.ml @@ -873,7 +873,7 @@ let perform_sanity_check cctxt ~chain_id = return_unit let retry (cctxt : #Protocol_client_context.full) ?max_delay ~delay ~factor - ~tries ?(msg = "Connection failed. ") f x = + ~tries ?(msg = fun _errs -> "Connection failed. ") f x = Utils.retry ~emit:(cctxt#message "%s") ?max_delay @@ -969,7 +969,8 @@ let register_dal_profiles cctxt dal_node_rpc_ctxt delegates = ~delay:1. ~factor:2. ~tries:max_int - ~msg:"Failed to register profiles, DAL node is not reachable. " + ~msg:(fun _errs -> + "Failed to register profiles, DAL node is not reachable. ") (fun () -> register dal_ctxt) ()) dal_node_rpc_ctxt diff --git a/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.mli b/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.mli index 924476911a88..11d9b2a8558b 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.mli +++ b/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.mli @@ -46,7 +46,7 @@ val retry : delay:float -> factor:float -> tries:int -> - ?msg:string -> + ?msg:(tztrace -> string) -> ('a -> 'b tzresult Lwt.t) -> 'a -> 'b tzresult Lwt.t diff --git a/src/proto_023_PtSeouLo/lib_delegate/operation_worker.ml b/src/proto_023_PtSeouLo/lib_delegate/operation_worker.ml index 139c96af3c33..ed02e0c4fe27 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/operation_worker.ml +++ b/src/proto_023_PtSeouLo/lib_delegate/operation_worker.ml @@ -737,7 +737,7 @@ let run ?(monitor_node_operations = true) ~constants ~factor:2. ~tries:5 ~is_error:(function _ -> true) - ~msg:"unable to call monitor operations RPC." + ~msg:(fun _ -> "unable to call monitor operations RPC.") (fun () -> (monitor_operations cctxt diff --git a/src/proto_alpha/lib_delegate/baking_scheduling.ml b/src/proto_alpha/lib_delegate/baking_scheduling.ml index b007199c1930..a2decd1a4e87 100644 --- a/src/proto_alpha/lib_delegate/baking_scheduling.ml +++ b/src/proto_alpha/lib_delegate/baking_scheduling.ml @@ -883,7 +883,7 @@ let perform_sanity_check cctxt ~chain_id = return_unit let retry (cctxt : #Protocol_client_context.full) ?max_delay ~delay ~factor - ~tries ?(msg = "Connection failed. ") f x = + ~tries ?(msg = fun _errs -> "Connection failed. ") f x = Utils.retry ~emit:(cctxt#message "%s") ?max_delay @@ -979,7 +979,8 @@ let register_dal_profiles cctxt dal_node_rpc_ctxt delegates = ~delay:1. ~factor:2. ~tries:max_int - ~msg:"Failed to register profiles, DAL node is not reachable. " + ~msg:(fun _errs -> + "Failed to register profiles, DAL node is not reachable. ") (fun () -> register dal_ctxt) ()) dal_node_rpc_ctxt diff --git a/src/proto_alpha/lib_delegate/baking_scheduling.mli b/src/proto_alpha/lib_delegate/baking_scheduling.mli index 151f47d87ee9..47427edc3af9 100644 --- a/src/proto_alpha/lib_delegate/baking_scheduling.mli +++ b/src/proto_alpha/lib_delegate/baking_scheduling.mli @@ -46,7 +46,7 @@ val retry : delay:float -> factor:float -> tries:int -> - ?msg:string -> + ?msg:(tztrace -> string) -> ('a -> 'b tzresult Lwt.t) -> 'a -> 'b tzresult Lwt.t diff --git a/src/proto_alpha/lib_delegate/operation_worker.ml b/src/proto_alpha/lib_delegate/operation_worker.ml index 139c96af3c33..ed02e0c4fe27 100644 --- a/src/proto_alpha/lib_delegate/operation_worker.ml +++ b/src/proto_alpha/lib_delegate/operation_worker.ml @@ -737,7 +737,7 @@ let run ?(monitor_node_operations = true) ~constants ~factor:2. ~tries:5 ~is_error:(function _ -> true) - ~msg:"unable to call monitor operations RPC." + ~msg:(fun _ -> "unable to call monitor operations RPC.") (fun () -> (monitor_operations cctxt -- GitLab From a6402fa170b00f8791dad1f8981f0790c7e0e230 Mon Sep 17 00:00:00 2001 From: Albin Coquereau Date: Thu, 9 Oct 2025 14:52:37 +0200 Subject: [PATCH 2/5] stdlib/utils: make tries count optional --- src/lib_agnostic_baker/daemon.ml | 1 - src/lib_stdlib_unix/utils.ml | 13 ++++++++----- src/lib_stdlib_unix/utils.mli | 2 +- .../lib_delegate/baking_scheduling.ml | 5 ++--- .../lib_delegate/baking_scheduling.mli | 2 +- .../lib_delegate/client_daemon.ml | 7 +------ .../lib_delegate/baking_scheduling.ml | 5 ++--- .../lib_delegate/baking_scheduling.mli | 2 +- .../lib_delegate/client_daemon.ml | 7 +------ src/proto_alpha/lib_delegate/baking_scheduling.ml | 5 ++--- src/proto_alpha/lib_delegate/baking_scheduling.mli | 2 +- src/proto_alpha/lib_delegate/client_daemon.ml | 7 +------ 12 files changed, 21 insertions(+), 37 deletions(-) diff --git a/src/lib_agnostic_baker/daemon.ml b/src/lib_agnostic_baker/daemon.ml index 665873610373..118bacb3ca5f 100644 --- a/src/lib_agnostic_baker/daemon.ml +++ b/src/lib_agnostic_baker/daemon.ml @@ -136,7 +136,6 @@ module Make_daemon (Agent : AGENT) : ~max_delay:10. ~delay:1. ~factor:1.5 - ~tries:max_int ~is_error:(function Cannot_connect_to_node _ -> true | _ -> false) (fun node_addr -> Rpc_services.get_level ~node_addr) node_addr diff --git a/src/lib_stdlib_unix/utils.ml b/src/lib_stdlib_unix/utils.ml index 92b7b452e4c2..ff00029801fd 100644 --- a/src/lib_stdlib_unix/utils.ml +++ b/src/lib_stdlib_unix/utils.ml @@ -115,20 +115,23 @@ let copy_dir ?(perm = 0o755) ?progress src dst = let copy_file = copy_file ~count_progress:(fun _ -> ()) -let rec retry ?max_delay ~delay ~factor ~tries ~is_error ~emit +let rec retry ?max_delay ~delay ~factor ?tries ~is_error ~emit ?(msg = fun _ -> "") f x = let open Lwt.Syntax in let* result = f x in + let should_retry = match tries with None -> true | Some i -> i > 0 in match result with | Ok _ as r -> Lwt.return r - | Error (err :: _ as errs) when tries > 0 && is_error err -> ( + | Error (err :: _ as errs) when should_retry && is_error err -> ( let* () = emit (Format.sprintf - "%sRetrying in %.2f seconds, %d attempts left..." + "%sRetrying in %.2f seconds%s..." (msg errs) delay - tries) + (match tries with + | None -> "" + | Some i -> Format.sprintf ", %d attempts left" i)) in let* result = Lwt.pick @@ -154,7 +157,7 @@ let rec retry ?max_delay ~delay ~factor ~tries ~is_error ~emit ~delay ~factor ~msg - ~tries:(tries - 1) + ?tries:(Option.map pred tries) ~is_error ~emit f diff --git a/src/lib_stdlib_unix/utils.mli b/src/lib_stdlib_unix/utils.mli index 94d6e2fbb308..89e42133edca 100644 --- a/src/lib_stdlib_unix/utils.mli +++ b/src/lib_stdlib_unix/utils.mli @@ -70,7 +70,7 @@ val retry : ?max_delay:float -> delay:float -> factor:float -> - tries:int -> + ?tries:int -> is_error:('err -> bool) -> emit:(string -> unit Lwt.t) -> ?msg:('err list -> string) -> diff --git a/src/proto_022_PsRiotum/lib_delegate/baking_scheduling.ml b/src/proto_022_PsRiotum/lib_delegate/baking_scheduling.ml index c4748657f384..332ff6af5b5d 100644 --- a/src/proto_022_PsRiotum/lib_delegate/baking_scheduling.ml +++ b/src/proto_022_PsRiotum/lib_delegate/baking_scheduling.ml @@ -872,13 +872,13 @@ let perform_sanity_check cctxt ~chain_id = return_unit let retry (cctxt : #Protocol_client_context.full) ?max_delay ~delay ~factor - ~tries ?(msg = fun _errs -> "Connection failed. ") f x = + ?tries ?(msg = fun _errs -> "Connection failed. ") f x = Utils.retry ~emit:(cctxt#message "%s") ?max_delay ~delay ~factor - ~tries + ?tries ~msg ~is_error:(function | RPC_client_errors.Request_failed {error = Connection_failed _; _} -> @@ -961,7 +961,6 @@ let register_dal_profiles cctxt dal_node_rpc_ctxt delegates = ~max_delay:2. ~delay:1. ~factor:2. - ~tries:max_int ~msg:(fun _errs -> "Failed to register profiles, DAL node is not reachable. ") (fun () -> register dal_ctxt) diff --git a/src/proto_022_PsRiotum/lib_delegate/baking_scheduling.mli b/src/proto_022_PsRiotum/lib_delegate/baking_scheduling.mli index 498031186a8d..cab28a15d13e 100644 --- a/src/proto_022_PsRiotum/lib_delegate/baking_scheduling.mli +++ b/src/proto_022_PsRiotum/lib_delegate/baking_scheduling.mli @@ -45,7 +45,7 @@ val retry : ?max_delay:float -> delay:float -> factor:float -> - tries:int -> + ?tries:int -> ?msg:(tztrace -> string) -> ('a -> 'b tzresult Lwt.t) -> 'a -> diff --git a/src/proto_022_PsRiotum/lib_delegate/client_daemon.ml b/src/proto_022_PsRiotum/lib_delegate/client_daemon.ml index 944a5862626f..d589e9dc4d2b 100644 --- a/src/proto_022_PsRiotum/lib_delegate/client_daemon.ml +++ b/src/proto_022_PsRiotum/lib_delegate/client_daemon.ml @@ -37,12 +37,7 @@ let rec retry_on_disconnection (cctxt : #Protocol_client_context.full) f = let* () = Client_confirmations.wait_for_bootstrapped ~retry: - (Baking_scheduling.retry - cctxt - ~max_delay:10. - ~delay:1. - ~factor:1.5 - ~tries:max_int) + (Baking_scheduling.retry cctxt ~max_delay:10. ~delay:1. ~factor:1.5) cctxt in retry_on_disconnection cctxt f diff --git a/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.ml b/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.ml index 7161b69ddb77..cb06f599e86a 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.ml +++ b/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.ml @@ -873,13 +873,13 @@ let perform_sanity_check cctxt ~chain_id = return_unit let retry (cctxt : #Protocol_client_context.full) ?max_delay ~delay ~factor - ~tries ?(msg = fun _errs -> "Connection failed. ") f x = + ?tries ?(msg = fun _errs -> "Connection failed. ") f x = Utils.retry ~emit:(cctxt#message "%s") ?max_delay ~delay ~factor - ~tries + ?tries ~msg ~is_error:(function | RPC_client_errors.Request_failed {error = Connection_failed _; _} -> @@ -968,7 +968,6 @@ let register_dal_profiles cctxt dal_node_rpc_ctxt delegates = ~max_delay:2. ~delay:1. ~factor:2. - ~tries:max_int ~msg:(fun _errs -> "Failed to register profiles, DAL node is not reachable. ") (fun () -> register dal_ctxt) diff --git a/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.mli b/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.mli index 11d9b2a8558b..cad08d349607 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.mli +++ b/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.mli @@ -45,7 +45,7 @@ val retry : ?max_delay:float -> delay:float -> factor:float -> - tries:int -> + ?tries:int -> ?msg:(tztrace -> string) -> ('a -> 'b tzresult Lwt.t) -> 'a -> diff --git a/src/proto_023_PtSeouLo/lib_delegate/client_daemon.ml b/src/proto_023_PtSeouLo/lib_delegate/client_daemon.ml index 1e30f06735e2..714e0c6b3ce5 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/client_daemon.ml +++ b/src/proto_023_PtSeouLo/lib_delegate/client_daemon.ml @@ -37,12 +37,7 @@ let rec retry_on_disconnection (cctxt : #Protocol_client_context.full) f = let* () = Client_confirmations.wait_for_bootstrapped ~retry: - (Baking_scheduling.retry - cctxt - ~max_delay:10. - ~delay:1. - ~factor:1.5 - ~tries:max_int) + (Baking_scheduling.retry cctxt ~max_delay:10. ~delay:1. ~factor:1.5) cctxt in retry_on_disconnection cctxt f diff --git a/src/proto_alpha/lib_delegate/baking_scheduling.ml b/src/proto_alpha/lib_delegate/baking_scheduling.ml index a2decd1a4e87..4a32d353647c 100644 --- a/src/proto_alpha/lib_delegate/baking_scheduling.ml +++ b/src/proto_alpha/lib_delegate/baking_scheduling.ml @@ -883,13 +883,13 @@ let perform_sanity_check cctxt ~chain_id = return_unit let retry (cctxt : #Protocol_client_context.full) ?max_delay ~delay ~factor - ~tries ?(msg = fun _errs -> "Connection failed. ") f x = + ?tries ?(msg = fun _errs -> "Connection failed. ") f x = Utils.retry ~emit:(cctxt#message "%s") ?max_delay ~delay ~factor - ~tries + ?tries ~msg ~is_error:(function | RPC_client_errors.Request_failed {error = Connection_failed _; _} -> @@ -978,7 +978,6 @@ let register_dal_profiles cctxt dal_node_rpc_ctxt delegates = ~max_delay:2. ~delay:1. ~factor:2. - ~tries:max_int ~msg:(fun _errs -> "Failed to register profiles, DAL node is not reachable. ") (fun () -> register dal_ctxt) diff --git a/src/proto_alpha/lib_delegate/baking_scheduling.mli b/src/proto_alpha/lib_delegate/baking_scheduling.mli index 47427edc3af9..5707a7fea4a0 100644 --- a/src/proto_alpha/lib_delegate/baking_scheduling.mli +++ b/src/proto_alpha/lib_delegate/baking_scheduling.mli @@ -45,7 +45,7 @@ val retry : ?max_delay:float -> delay:float -> factor:float -> - tries:int -> + ?tries:int -> ?msg:(tztrace -> string) -> ('a -> 'b tzresult Lwt.t) -> 'a -> diff --git a/src/proto_alpha/lib_delegate/client_daemon.ml b/src/proto_alpha/lib_delegate/client_daemon.ml index 1e30f06735e2..714e0c6b3ce5 100644 --- a/src/proto_alpha/lib_delegate/client_daemon.ml +++ b/src/proto_alpha/lib_delegate/client_daemon.ml @@ -37,12 +37,7 @@ let rec retry_on_disconnection (cctxt : #Protocol_client_context.full) f = let* () = Client_confirmations.wait_for_bootstrapped ~retry: - (Baking_scheduling.retry - cctxt - ~max_delay:10. - ~delay:1. - ~factor:1.5 - ~tries:max_int) + (Baking_scheduling.retry cctxt ~max_delay:10. ~delay:1. ~factor:1.5) cctxt in retry_on_disconnection cctxt f -- GitLab From 205a84fcc773cba074242860fecdb0d6aa1be279 Mon Sep 17 00:00:00 2001 From: Adam Allombert-Goget Date: Fri, 10 Oct 2025 14:08:43 +0200 Subject: [PATCH 3/5] 023_PtSeouLo/baker/operation_worker: fix baker shutdown Porting to proto 023_PtSeouLo df5198c7eed268022a4189f4f2fca89935162236 - baker/operation_worker: fix baker shutdown --- .../lib_delegate/operation_worker.ml | 55 ++++++++++++------- 1 file changed, 35 insertions(+), 20 deletions(-) diff --git a/src/proto_023_PtSeouLo/lib_delegate/operation_worker.ml b/src/proto_023_PtSeouLo/lib_delegate/operation_worker.ml index ed02e0c4fe27..3473ada7a098 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/operation_worker.ml +++ b/src/proto_023_PtSeouLo/lib_delegate/operation_worker.ml @@ -36,14 +36,24 @@ module Events = struct let pp_int = Format.pp_print_int - let loop_failed = - declare_1 + let node_unreachable_crash = + declare_0 ~section - ~name:"loop_failed" + ~name:"node_unreachable_crash" ~level:Error - ~msg:"loop failed with {trace}" - ~pp1:Error_monad.pp_print_trace - ("trace", Error_monad.trace_encoding) + ~msg: + "Node unreachable via the monitor_operations RPC. Unable to monitor \ + quorum. Shutting down baker..." + () + + let monitor_operations_retry = + declare_1 + ~section + ~name:"monitor_operations_retry" + ~level:Warning + ~msg:"{msg}" + ~pp1:Format.pp_print_string + ("msg", Data_encoding.string) let ended = declare_1 @@ -731,13 +741,17 @@ let run ?(monitor_node_operations = true) ~constants (* If the call to [monitor_operations] RPC fails, retry 5 times during 25 seconds before crashing the worker . *) Utils.retry - ~emit:(cctxt#message "%s") + ~emit:Events.(emit monitor_operations_retry) ~max_delay:10. ~delay:1. ~factor:2. - ~tries:5 + ~tries:10 ~is_error:(function _ -> true) - ~msg:(fun _ -> "unable to call monitor operations RPC.") + ~msg:(fun errs -> + Format.asprintf + "Failed to reach the node via the monitor_operations RPC@,%a" + pp_print_trace + errs) (fun () -> (monitor_operations cctxt @@ -745,7 +759,13 @@ let run ?(monitor_node_operations = true) ~constants () in match result with - | Error err -> Events.(emit loop_failed err) + | Error _ -> + (* The baker failed to reach the node via the monitor_operations + RPC after multiple retries. Because it can no longer monitor the + consensus, it is unable to attest or bake. Rather than remain in this + degraded state or retry indefinitely, we shut it down explicitly. *) + let* () = Events.(emit node_unreachable_crash ()) in + Lwt_exit.exit_and_raise (*ECONNREFUSED*) 111 | Ok (head, operation_stream, op_stream_stopper) -> () [@profiler.stop] ; () @@ -802,16 +822,11 @@ let run ?(monitor_node_operations = true) ~constants (loop () [@profiler.record_s {verbosity = Notice} "operations processing"]) in - Lwt.dont_wait - (fun () -> - Lwt.finalize - (fun () -> - if state.monitor_node_operations then worker_loop () else return_unit) - (fun () -> - let* _ = shutdown_worker state in - return_unit)) - (fun exn -> - Events.(emit__dont_wait__use_with_care ended (Printexc.to_string exn))) ; + if state.monitor_node_operations then + Lwt.dont_wait + (fun () -> worker_loop ()) + (fun exn -> + Events.(emit__dont_wait__use_with_care ended (Printexc.to_string exn))) ; return state let retrieve_pending_operations cctxt state = -- GitLab From 75e5224d9e0273a4b1f4f2052744fb469cb9625e Mon Sep 17 00:00:00 2001 From: Adam Allombert-Goget Date: Fri, 10 Oct 2025 14:08:57 +0200 Subject: [PATCH 4/5] 023_PtSeouLo/baker/operation_worker: refresh monitor_operations RPC on timeout Porting to proto 023_PtSeouLo 75a6f8edc8fe978f7d4b69b01a7997bd7835b3f3 - baker/operation_worker: refresh monitor_operations RPC on timeout --- .../lib_delegate/operation_worker.ml | 41 +++++++++++++++++-- 1 file changed, 37 insertions(+), 4 deletions(-) diff --git a/src/proto_023_PtSeouLo/lib_delegate/operation_worker.ml b/src/proto_023_PtSeouLo/lib_delegate/operation_worker.ml index 3473ada7a098..8f49440688ac 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/operation_worker.ml +++ b/src/proto_023_PtSeouLo/lib_delegate/operation_worker.ml @@ -55,6 +55,17 @@ module Events = struct ~pp1:Format.pp_print_string ("msg", Data_encoding.string) + let monitor_operations_stream_timeout = + declare_1 + ~section + ~name:"monitor_operations_stream_timeout" + ~level:Warning + ~msg: + "No data received from monitor_operations RPC for {timeout} seconds. \ + Assuming the stream is stalled and refreshing it." + ~pp1:Format.pp_print_float + ("timeout", Data_encoding.float) + let ended = declare_1 ~section @@ -789,10 +800,32 @@ let run ?(monitor_node_operations = true) ~constants state head [@profiler.record_f {verbosity = Notice} "update operations pool"] ; + (* TODO: make this value round dependent *) + let stream_timeout = 20. in let rec loop () = - let* ops = Lwt_stream.get operation_stream in - match ops with - | None -> + let* result = + Lwt.pick + [ + (let* ops = Lwt_stream.get operation_stream in + return (`Stream ops)); + (let* () = Lwt_unix.sleep stream_timeout in + return `Timeout); + ] + in + match result with + | `Timeout -> + (* The monitor_operations RPC has neither produced new data nor + closed the stream for some time. This can occur naturally, but + may also indicate a stalled stream. Restarting it is + inexpensive and can prevent the baker from hanging + indefinitely. *) + let* () = + Events.(emit monitor_operations_stream_timeout stream_timeout) + in + op_stream_stopper () ; + let* () = reset_monitoring state in + worker_loop () + | `Stream None -> (* When the stream closes, it means a new head has been set, we reset the monitoring and flush current operations *) let* () = Events.(emit end_of_stream ()) in @@ -806,7 +839,7 @@ let run ?(monitor_node_operations = true) ~constants in () [@profiler.stop] ; worker_loop () - | Some ops -> + | `Stream (Some ops) -> (state.operation_pool <- Operation_pool.add_operations state.operation_pool ops) [@profiler.aggregate_f {verbosity = Info} "add operations"] ; -- GitLab From c5b165ca9d4042d9d6f35ecd60dec0881febe75d Mon Sep 17 00:00:00 2001 From: Adam Allombert-Goget Date: Fri, 10 Oct 2025 18:13:47 +0200 Subject: [PATCH 5/5] 023_PtSeouLo/baker/operation_worker: RPC timeout depend of the round_duration Porting to proto 023_PtSeouLo 93e5eec596d567d37c9cc35b9d6222c8fa8990b6 - baker/operation_worker: RPC timeout depend of the round_duration --- .../testnet_experiment_tools/tool_023_PtSeouLo.ml | 14 +++++++++++++- src/proto_023_PtSeouLo/lib_delegate/baking_lib.ml | 13 ++++++++++++- .../lib_delegate/baking_scheduling.ml | 12 +++++++----- .../lib_delegate/baking_scheduling.mli | 1 + .../lib_delegate/operation_worker.ml | 10 ++++++---- .../lib_delegate/operation_worker.mli | 8 +++++--- 6 files changed, 44 insertions(+), 14 deletions(-) diff --git a/devtools/testnet_experiment_tools/tool_023_PtSeouLo.ml b/devtools/testnet_experiment_tools/tool_023_PtSeouLo.ml index 6249f3c50bb7..f6a61d08a58e 100644 --- a/devtools/testnet_experiment_tools/tool_023_PtSeouLo.ml +++ b/devtools/testnet_experiment_tools/tool_023_PtSeouLo.ml @@ -255,8 +255,18 @@ let create_state cctxt ?synchronize ?monitor_node_mempool ~config let* constants = Alpha_services.Constants.all cctxt (`Hash chain_id, `Head 0) in + let*? round_durations = + Round.Durations.create + ~first_round_duration:constants.parametric.minimal_block_delay + ~delay_increment_per_round:constants.parametric.delay_increment_per_round + |> Environment.wrap_tzresult + in let*! operation_worker = - Operation_worker.run ?monitor_node_operations ~constants cctxt + Operation_worker.run + ?monitor_node_operations + ~round_durations + ~constants + cctxt in Baking_scheduling.create_initial_state cctxt @@ -264,7 +274,9 @@ let create_state cctxt ?synchronize ?monitor_node_mempool ~config ~chain config operation_worker + ~constants ~current_proposal + round_durations delegates let compute_current_round_duration round_durations diff --git a/src/proto_023_PtSeouLo/lib_delegate/baking_lib.ml b/src/proto_023_PtSeouLo/lib_delegate/baking_lib.ml index b5e992bfa3a4..9793e00e21ad 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/baking_lib.ml +++ b/src/proto_023_PtSeouLo/lib_delegate/baking_lib.ml @@ -57,8 +57,18 @@ let create_state cctxt ?dal_node_rpc_ctxt ?synchronize ?monitor_node_mempool let* constants = Alpha_services.Constants.all cctxt (`Hash chain_id, `Head 0) in + let*? round_durations = + Round.Durations.create + ~first_round_duration:constants.parametric.minimal_block_delay + ~delay_increment_per_round:constants.parametric.delay_increment_per_round + |> Environment.wrap_tzresult + in let*! operation_worker = - Operation_worker.run ?monitor_node_operations ~constants cctxt + Operation_worker.run + ?monitor_node_operations + ~constants + ~round_durations + cctxt in Baking_scheduling.create_initial_state cctxt @@ -67,6 +77,7 @@ let create_state cctxt ?dal_node_rpc_ctxt ?synchronize ?monitor_node_mempool ~chain config operation_worker + round_durations ~current_proposal ~constants delegates diff --git a/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.ml b/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.ml index cb06f599e86a..6a66761d424d 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.ml +++ b/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.ml @@ -608,8 +608,8 @@ let create_round_durations constants = (Round.Durations.create ~first_round_duration ~delay_increment_per_round) let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain - config operation_worker ~(current_proposal : Baking_state.proposal) - ?constants delegates = + config operation_worker round_durations + ~(current_proposal : Baking_state.proposal) ?constants delegates = let open Lwt_result_syntax in (* FIXME: https://gitlab.com/tezos/tezos/-/issues/7391 consider saved attestable value *) @@ -620,7 +620,6 @@ let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain | Some c -> return c | None -> Alpha_services.Constants.all cctxt (`Hash chain_id, `Head 0) in - let*? round_durations = create_round_durations constants in let* validation_mode = Baking_state.( match config.Baking_configuration.validation with @@ -724,7 +723,6 @@ let create_initial_state cctxt ?dal_node_rpc_ctxt ?(synchronize = true) ~chain in let* round_state = if synchronize then - let*? round_durations = create_round_durations constants in let*? current_round = Baking_actions.compute_round current_proposal round_durations in @@ -997,7 +995,10 @@ let run cctxt ?dal_node_rpc_ctxt ?canceler ?(stop_on_event = fun _ -> false) | Some current_head -> return current_head | None -> failwith "head stream unexpectedly ended" in - let*! operation_worker = Operation_worker.run ~constants cctxt in + let*? round_durations = create_round_durations constants in + let*! operation_worker = + Operation_worker.run ~constants ~round_durations cctxt + in Option.iter (fun canceler -> Lwt_canceler.on_cancel canceler (fun () -> @@ -1011,6 +1012,7 @@ let run cctxt ?dal_node_rpc_ctxt ?canceler ?(stop_on_event = fun _ -> false) ~chain config operation_worker + round_durations ~current_proposal ~constants delegates diff --git a/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.mli b/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.mli index cad08d349607..d468d0c36d54 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.mli +++ b/src/proto_023_PtSeouLo/lib_delegate/baking_scheduling.mli @@ -145,6 +145,7 @@ val create_initial_state : chain:Chain_services.chain -> Baking_configuration.t -> Operation_worker.t -> + Round.round_durations -> current_proposal:proposal -> ?constants:Constants.t -> Baking_state_types.Key.t list -> diff --git a/src/proto_023_PtSeouLo/lib_delegate/operation_worker.ml b/src/proto_023_PtSeouLo/lib_delegate/operation_worker.ml index 8f49440688ac..5d5079aae645 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/operation_worker.ml +++ b/src/proto_023_PtSeouLo/lib_delegate/operation_worker.ml @@ -738,7 +738,7 @@ let flush_operation_pool state (head_level, head_round) = let operation_pool = {Operation_pool.empty with consensus = attestations} in state.operation_pool <- operation_pool -let run ?(monitor_node_operations = true) ~constants +let run ?(monitor_node_operations = true) ~constants ~round_durations (cctxt : #Protocol_client_context.full) = let open Lwt_syntax in let state = @@ -777,7 +777,7 @@ let run ?(monitor_node_operations = true) ~constants degraded state or retry indefinitely, we shut it down explicitly. *) let* () = Events.(emit node_unreachable_crash ()) in Lwt_exit.exit_and_raise (*ECONNREFUSED*) 111 - | Ok (head, operation_stream, op_stream_stopper) -> + | Ok (((_, round) as head), operation_stream, op_stream_stopper) -> () [@profiler.stop] ; () [@profiler.record @@ -800,8 +800,10 @@ let run ?(monitor_node_operations = true) ~constants state head [@profiler.record_f {verbosity = Notice} "update operations pool"] ; - (* TODO: make this value round dependent *) - let stream_timeout = 20. in + let stream_timeout = + Round.round_duration round_durations (Round.succ round) + |> Period.to_seconds |> Int64.to_float + in let rec loop () = let* result = Lwt.pick diff --git a/src/proto_023_PtSeouLo/lib_delegate/operation_worker.mli b/src/proto_023_PtSeouLo/lib_delegate/operation_worker.mli index 944bfbd54fb9..aa29853afa64 100644 --- a/src/proto_023_PtSeouLo/lib_delegate/operation_worker.mli +++ b/src/proto_023_PtSeouLo/lib_delegate/operation_worker.mli @@ -55,16 +55,18 @@ type event = (** {1 Constructors}*) -(** [run ?monitor_node_operations cctxt] spawns an operation worker. +(** [run ?monitor_node_operations ~constants ~round_durations cctxt] spawns an + operation worker. @param monitor_node_operations monitor operations on the node (defaults: [true]). Set [monitor_node_operations] to [false] to only consider externally provided (non-node) operations. *) val run : ?monitor_node_operations:bool -> - constants:Constants.t -> + constants:Protocol.Alpha_context.Constants.t -> + round_durations:Protocol.Alpha_context.Round.round_durations -> #Protocol_client_context.full -> - t Lwt.t + t Environment.Lwt.t (** {1 Utilities} *) -- GitLab