diff --git a/manifest/product_octez.ml b/manifest/product_octez.ml index 6aed7f9acb11b464d24b7ef2223679e3e5fae839..b15aff2d844863a87c79be66e1d705317a1c0485 100644 --- a/manifest/product_octez.ml +++ b/manifest/product_octez.ml @@ -8010,8 +8010,8 @@ let _octez_experimental_agnostic_baker = octez_rustzcash_deps; bls12_381_archive; data_encoding |> open_; - octez_base |> open_ ~m:"TzPervasives"; - octez_base_unix; + octez_base |> open_ ~m:"TzPervasives" |> open_; + octez_base_unix |> open_; octez_validation |> open_; octez_client_base_unix |> open_; octez_client_base |> open_; diff --git a/src/bin_agnostic_baker/agnostic_baker_errors.ml b/src/bin_agnostic_baker/agnostic_baker_errors.ml index a4fd7da58ff58deb5e309b8df3bbaeeb270063ae..0ebe3b98d84aa6ae1620c918cf968a3a9afb6222 100644 --- a/src/bin_agnostic_baker/agnostic_baker_errors.ml +++ b/src/bin_agnostic_baker/agnostic_baker_errors.ml @@ -9,6 +9,7 @@ type error += | Lost_node_connection | Cannot_connect_to_node of string | Cannot_decode_node_data of string + | Missing_current_baker let () = Error_monad.register_error_kind @@ -41,4 +42,13 @@ let () = ~pp:(fun ppf err -> Format.fprintf ppf "Cannot decode node data: %s" err) Data_encoding.(obj1 (req "err" string)) (function Cannot_decode_node_data err -> Some err | _ -> None) - (fun err -> Cannot_decode_node_data err) + (fun err -> Cannot_decode_node_data err) ; + Error_monad.register_error_kind + `Permanent + ~id:"agnostic_baker.missing_current_baker" + ~title:"Missing current baker" + ~description:"The current baker binary is missing." 
+ ~pp:(fun ppf () -> Format.fprintf ppf "Missing current baker") + Data_encoding.(unit) + (function Missing_current_baker -> Some () | _ -> None) + (fun () -> Missing_current_baker) diff --git a/src/bin_agnostic_baker/daemon.ml b/src/bin_agnostic_baker/daemon.ml index 52c60f4faf495d83e59512679853f58d90409a9d..abfb1484ca47304cf4f5defa4f2ee5532aa478b6 100644 --- a/src/bin_agnostic_baker/daemon.ml +++ b/src/bin_agnostic_baker/daemon.ml @@ -7,23 +7,52 @@ open Agnostic_baker_errors -module Baker = struct - type t = { - protocol_hash : Protocol_hash.t; - binary_path : string; - process : Lwt_process.process_none; - ccid : Lwt_exit.clean_up_callback_id; - } +type baker = { + protocol_hash : Protocol_hash.t; + binary_path : string; + process : unit Lwt_process_watchdog.t; + ccid : Lwt_exit.clean_up_callback_id; +} + +module type BAKER = sig + module Event : sig + val emit : 'a Agnostic_baker_events.t -> 'a -> unit Lwt.t + + val shutting_down_process : unit Agnostic_baker_events.t + + val process_started : int Agnostic_baker_events.t + + val process_exited_abnormally : + (int * Unix.process_status) Agnostic_baker_events.t + + val cannot_start_process : string Agnostic_baker_events.t + + val waiting_for_process_restart : float Agnostic_baker_events.t + end + + val baker_path : ?user_path:string -> Protocol_hash.t -> string + + val shutdown : baker -> unit Lwt.t + + val spawn_baker : + Protocol_hash.t -> + binaries_directory:string option -> + baker_args:string trace -> + (baker, tztrace) result Lwt.t +end + +module MakeBaker (Name : Lwt_process_watchdog.NAME) : BAKER = struct + module Event = Lwt_process_watchdog.MakeEvent (Name) + module Watchdog = Lwt_process_watchdog.Daemon (Event) let baker_path ?(user_path = "./") proto_hash = let short_name = Parameters.protocol_short_hash proto_hash in Format.sprintf "%soctez-baker-%s" user_path short_name - let shutdown protocol_hash process = + let shutdown baker = let open Lwt_syntax in - let* () = Agnostic_baker_events.(emit 
stopping_baker) protocol_hash in - process#terminate ; - Lwt.return_unit + let* () = Agnostic_baker_events.(emit stopping_baker) baker.protocol_hash in + Watchdog.stop baker.process let spawn_baker protocol_hash ~binaries_directory ~baker_args = let open Lwt_result_syntax in @@ -41,29 +70,38 @@ module Baker = struct let binary_path = baker_path ?user_path:binaries_directory protocol_hash in let baker_args = binary_path :: baker_args in let baker_args = Array.of_list baker_args in - let process = - Lwt_process.open_process_none - ~stdout:`Keep - ~stderr:`Keep - (binary_path, baker_args) + let w = + Lwt_process_watchdog.create + ~parameters:() + ~parameters_encoding:Data_encoding.unit + in + let run_process = + Watchdog.run_process w ~binary_path ~arguments:baker_args in + let* new_baker = run_process () in + let _ = Watchdog.watch_dog ~start_new_server:run_process new_baker in let*! () = Agnostic_baker_events.(emit baker_running) protocol_hash in let ccid = Lwt_exit.register_clean_up_callback ~loc:__LOC__ (fun _ -> - let*! () = shutdown protocol_hash process in + let*! () = + Agnostic_baker_events.(emit stopping_baker) protocol_hash + in + let*! 
() = Watchdog.stop new_baker in Lwt.return_unit) in - return {protocol_hash; binary_path; process; ccid} + return {protocol_hash; binary_path; process = new_baker; ccid} end -type state = { +type baker_instance = Baker : (module BAKER) -> baker_instance + +type 'a state = { binaries_directory : string option; node_endpoint : string; baker_args : string list; - mutable current_baker : Baker.t option; + mutable current_baker : (baker_instance * baker) option; } -type t = state +type 'a t = 'a state let monitor_heads ~node_addr = let open Lwt_result_syntax in @@ -91,25 +129,41 @@ let monitor_heads ~node_addr = let hot_swap_baker ~state ~next_protocol_hash = let open Lwt_result_syntax in + let* (module CurrentBaker : BAKER), current_baker = + match state.current_baker with + | Some (Baker (module Baker), baker) -> + return ((module Baker : BAKER), baker) + | None -> tzfail Missing_current_baker + in let next_proto_status = Parameters.protocol_status next_protocol_hash in let*! () = Agnostic_baker_events.(emit protocol_encountered) (next_proto_status, next_protocol_hash) in + (* Shutdown previous baker *) let*! () = - match state.current_baker with - | None -> Lwt.return_unit (* Could be assert false*) - | Some b -> - let*! () = Baker.shutdown b.protocol_hash b.process in - let () = Lwt_exit.unregister_clean_up_callback b.ccid in - state.current_baker <- None ; - Lwt.return_unit + let*! 
() = CurrentBaker.shutdown current_baker in + let () = Lwt_exit.unregister_clean_up_callback current_baker.ccid in + state.current_baker <- None ; + Lwt.return_unit in let* new_baker = - Baker.spawn_baker - next_protocol_hash - ~binaries_directory:state.binaries_directory - ~baker_args:state.baker_args + let module Name = struct + let base = ["agnostic"; "baker"] + + let component = + [ + "baker"; Format.asprintf "%a" Protocol_hash.pp_short next_protocol_hash; + ] + end in + let module NewBaker = MakeBaker (Name) in + let* new_baker = + NewBaker.spawn_baker + next_protocol_hash + ~binaries_directory:state.binaries_directory + ~baker_args:state.baker_args + in + return (Baker (module NewBaker), new_baker) in state.current_baker <- Some new_baker ; return_unit @@ -130,10 +184,10 @@ let monitor_voting_periods ~state head_stream = let* next_protocol_hash = Rpc_services.get_next_protocol_hash ~node_addr in - let current_protocol_hash = + let* current_protocol_hash = match state.current_baker with - | None -> assert false - | Some v -> v.protocol_hash + | None -> tzfail Missing_current_baker + | Some (_, v) -> return v.protocol_hash in let* () = if not (Protocol_hash.equal current_protocol_hash next_protocol_hash) @@ -171,10 +225,23 @@ let may_start_initial_baker state = match proto_status with | Active -> let* current_baker = - Baker.spawn_baker - protocol_hash - ~binaries_directory:state.binaries_directory - ~baker_args:state.baker_args + let module Name = struct + let base = ["agnostic"; "baker"] + + let component = + [ + "baker"; + Format.asprintf "%a" Protocol_hash.pp_short protocol_hash; + ] + end in + let module NewBaker = MakeBaker (Name) in + let* new_baker = + NewBaker.spawn_baker + protocol_hash + ~binaries_directory:state.binaries_directory + ~baker_args:state.baker_args + in + return (Baker (module NewBaker), new_baker) in state.current_baker <- Some current_baker ; return_unit diff --git a/src/bin_agnostic_baker/daemon.mli 
b/src/bin_agnostic_baker/daemon.mli index 5dbd07c4d46d54e1988b777943e13100cfe2205c..3cdc92dd9ab7b3967633366319ca430b8d266bc1 100644 --- a/src/bin_agnostic_baker/daemon.mli +++ b/src/bin_agnostic_baker/daemon.mli @@ -7,7 +7,7 @@ (** Daemon handling the bakers life cycle. *) -type t +type 'a t (** [create binaries_directory node_endpoint baker_args] returns a non initialized daemon.*) @@ -15,8 +15,8 @@ val create : binaries_directory:string option -> node_endpoint:string -> baker_args:string trace -> - t + 'a t (** [run t] Runs the daemon responsible for the spawn/stop of the baker daemons. *) -val run : t -> unit tzresult Lwt.t +val run : 'a t -> unit tzresult Lwt.t diff --git a/src/bin_agnostic_baker/dune b/src/bin_agnostic_baker/dune index 631eb12d546fd72db84385ed0e6fc69bcd7175b4..642fd938ec2e4307cb105cc9a1cdcf22c44b7aa2 100644 --- a/src/bin_agnostic_baker/dune +++ b/src/bin_agnostic_baker/dune @@ -35,6 +35,8 @@ (:standard) -open Data_encoding -open Tezos_base.TzPervasives + -open Tezos_base + -open Tezos_base_unix -open Tezos_validation -open Tezos_client_base_unix -open Tezos_client_base diff --git a/src/lib_base/unix/lwt_process_watchdog.ml b/src/lib_base/unix/lwt_process_watchdog.ml index e7464e0374e79d95cec48f15f0a2a8789ce46c26..c7c4f939569f6b3f5cae97ebd0f0e34fd9f43893 100644 --- a/src/lib_base/unix/lwt_process_watchdog.ml +++ b/src/lib_base/unix/lwt_process_watchdog.ml @@ -186,6 +186,18 @@ module Daemon (Event : EVENTS) = struct Lwt.wakeup t.stopper (0, Lwt_unix.WSTOPPED 0) ; shutdown t + let run_process t ~binary_path ~arguments () = + let open Lwt_result_syntax in + let process = + Lwt_process.open_process_none + ~stdout:`Keep + ~stderr:`Keep + (binary_path, arguments) + in + let*! 
() = Event.(emit process_started) process#pid in + t.server <- Some process ; + return t + let run_process_with_sockets t ~process_name ?socket_prefix ?executable_name ~handshake () = let open Lwt_result_syntax in diff --git a/src/lib_base/unix/lwt_process_watchdog.mli b/src/lib_base/unix/lwt_process_watchdog.mli index f40a87c47ca634a671c098f87c0b16e97b91f93f..75b3dfa6ca5d4f8fd895ca242d6fa94669d0f910 100644 --- a/src/lib_base/unix/lwt_process_watchdog.mli +++ b/src/lib_base/unix/lwt_process_watchdog.mli @@ -43,6 +43,19 @@ module Daemon : functor (Event : EVENTS) -> sig soon as it is called. *) val stop : 'a t -> unit Lwt.t + (** [run_process t ~binary_path ~arguments ()] starts a + [Lwt_process.process_none] running the executable at [binary_path] + with the given [arguments]. + Note that [arguments] is passed to the process as is: [binary_path] + is not prepended to it, so the caller must include it if needed. Refer + to the [Lwt_process] documentation for more details. *) + val run_process : + 'a t -> + binary_path:string -> + arguments:string array -> + unit -> + 'a t tzresult Lwt.t + (** [run_process_with_sockets t ~process_name ?socket_prefix ?executable_name ~handshake ()] starts a [Lwt_process.process_none] depending on the given [process_name]