From 5f2ea5111d5e2183361a68ac5c17dd0c3b4696d4 Mon Sep 17 00:00:00 2001 From: Victor Allombert Date: Mon, 28 Oct 2024 15:45:49 +0100 Subject: [PATCH 01/10] Lib_rpc_process: pave the way to genericity -- step 1 (params) --- src/bin_node/node_run_command.ml | 2 +- src/lib_rpc_process/rpc_process_worker.ml | 29 ++++++++++++---------- src/lib_rpc_process/rpc_process_worker.mli | 8 +++--- 3 files changed, 21 insertions(+), 18 deletions(-) diff --git a/src/bin_node/node_run_command.ml b/src/bin_node/node_run_command.ml index 111fb9cda8ba..988d863b2557 100644 --- a/src/bin_node/node_run_command.ml +++ b/src/bin_node/node_run_command.ml @@ -513,7 +513,7 @@ let launch_rpc_server ?middleware (config : Config_file.t) dir rpc_server_kind - No_server: the node is not responding to any RPC. *) type rpc_server_kind = | Local_rpc_server of RPC_server.server list - | External_rpc_server of (RPC_server.server * Rpc_process_worker.t) list + | External_rpc_server of (RPC_server.server * Rpc_process_worker.process) list | No_server (* Initializes an RPC server handled by the node main process. *) diff --git a/src/lib_rpc_process/rpc_process_worker.ml b/src/lib_rpc_process/rpc_process_worker.ml index 56a8c2b8de96..c37d2623d2d6 100644 --- a/src/lib_rpc_process/rpc_process_worker.ml +++ b/src/lib_rpc_process/rpc_process_worker.ml @@ -109,13 +109,18 @@ module Event = struct end (* State of the worker. *) -type t = { +type 'a t = { mutable server : Lwt_process.process_none option; + (* Promise that aims to be resolved as soon as the server is + shutting down. *) stop : (int * Unix.process_status) Lwt.t; + (* Resolver that will wakeup the above stop promise. 
*) stopper : (int * Unix.process_status) Lwt.u; - external_process_parameters : Parameters.t; + parameters : 'a; } +type process = Parameters.t t + let create ~comm_socket_path (config : Config_file.t) node_version events_config = let stop, stopper = Lwt.wait () in @@ -123,13 +128,14 @@ let create ~comm_socket_path (config : Config_file.t) node_version events_config server = None; stop; stopper; - external_process_parameters = - { - internal_events = events_config; - config; - rpc_comm_socket_path = comm_socket_path; - node_version; - }; + parameters = + Parameters. + { + internal_events = events_config; + config; + rpc_comm_socket_path = comm_socket_path; + node_version; + }; } let shutdown t = @@ -173,10 +179,7 @@ let run_server t () = in let* () = Tezos_base_unix.Socket.handshake init_socket_fd Main.socket_magic in let* () = - Socket.send - init_socket_fd - Parameters.parameters_encoding - t.external_process_parameters + Socket.send init_socket_fd Parameters.parameters_encoding t.parameters in (* FIXME: https://gitlab.com/tezos/tezos/-/issues/6579 Workaround: increase default timeout. If the timeout is still not diff --git a/src/lib_rpc_process/rpc_process_worker.mli b/src/lib_rpc_process/rpc_process_worker.mli index 6d4500a4bee6..aae8e52743df 100644 --- a/src/lib_rpc_process/rpc_process_worker.mli +++ b/src/lib_rpc_process/rpc_process_worker.mli @@ -30,7 +30,7 @@ responsible of dealing with the process' lifetime. *) (** Type of the RPC process worker*) -type t +type process (** [create ~comm_socket_path config node_version internal_event_config] creates the worker initial state. [comm_socket_path] is a socket path that will be @@ -44,7 +44,7 @@ val create : Config_file.t -> Tezos_version.Octez_node_version.t -> Internal_event_config.t -> - t + process (** Starts the external RPC process using fork+exec calls. It implements a watch dog that is responsible of restarting the @@ -53,7 +53,7 @@ val create : until the restart is successful. 
The promise is blocking until the RPC server is fully available to answer to RPCs. *) -val start : t -> unit tzresult Lwt.t +val start : process -> unit tzresult Lwt.t (** Stops gracefully the RPC process worker*) -val stop : t -> unit Lwt.t +val stop : process -> unit Lwt.t -- GitLab From 3d33c72681106221f5d6e275af46c09cc88dc9fe Mon Sep 17 00:00:00 2001 From: Victor Allombert Date: Mon, 28 Oct 2024 16:45:39 +0100 Subject: [PATCH 02/10] Lib_rpc_process: pave the way to genericity -- step 2 (launchers) --- src/lib_rpc_process/rpc_process_worker.ml | 38 ++++++++++++++--------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/src/lib_rpc_process/rpc_process_worker.ml b/src/lib_rpc_process/rpc_process_worker.ml index c37d2623d2d6..05027cf5a981 100644 --- a/src/lib_rpc_process/rpc_process_worker.ml +++ b/src/lib_rpc_process/rpc_process_worker.ml @@ -151,11 +151,11 @@ let stop t = Lwt.wakeup t.stopper (0, Lwt_unix.WSTOPPED 0) ; shutdown t -let run_server t () = +let run_process t ~process_name () = let open Lwt_result_syntax in let socket_dir = Tezos_base_unix.Socket.get_temporary_socket_dir () in let socket_dir_arg = ["--socket-dir"; socket_dir] in - let args = "octez-rpc-process" :: socket_dir_arg in + let args = process_name :: socket_dir_arg in let process = Lwt_process.open_process_none ~stdout:(`FD_copy Unix.stdout) @@ -203,10 +203,11 @@ let run_server t () = t.server <- Some process ; return t -(* Evaluates [f]. If [f] fails, the error is caught, printed as an - error event, and [f] is re-evaluated after a [backoff] delay. The - delay increases at each failing try. *) -let rec may_start backoff f = +(* [run_with_backoff ~backoff f] evaluates [f]. If [f] fails, the + error is caught, printed as an error event, and [f] is re-evaluated + after a [backoff] delay. The delay increases at each failing + try. 
*) +let rec run_with_backoff ~backoff ~f = let open Lwt_result_syntax in let timestamp, sleep = backoff in let now = Time.System.now () in @@ -222,20 +223,22 @@ let rec may_start backoff f = cannot_start_rpc_process (Format.asprintf "%a" pp_print_trace errs)) in - may_start (Time.System.now (), sleep *. 1.2) f) + run_with_backoff ~backoff:(Time.System.now (), sleep *. 1.2) ~f) else let*! () = Event.(emit waiting_for_rpc_process_restart sleep) in let*! () = Lwt_unix.sleep sleep in - may_start (timestamp, sleep) f + run_with_backoff ~backoff:(timestamp, sleep) ~f -(* Watch_dog make sure that the RPC process is restarted as soon as it +(* [watch_dog ~start_new_server] make sure that the RPC process is restarted as soon as it dies. *) -let watch_dog run_server = +let watch_dog ~start_new_server = let open Lwt_result_syntax in let rec loop t = match t.server with | None -> - let* new_server = may_start (Time.System.epoch, 0.5) run_server in + let* new_server = + run_with_backoff ~backoff:(Time.System.epoch, 0.5) ~f:start_new_server + in loop new_server | Some process -> ( let wait_pid_t = @@ -260,13 +263,18 @@ let watch_dog run_server = let*! 
() = Event.(emit rpc_process_exited_abnormally) (process#pid, status) in - let* new_server = may_start (Time.System.epoch, 0.5) run_server in + let* new_server = + run_with_backoff + ~backoff:(Time.System.epoch, 0.5) + ~f:start_new_server + in loop new_server) in loop -let start server = +let start parameters = let open Lwt_result_syntax in - let* new_server = run_server server () in - let _ = watch_dog (run_server server) new_server in + let run_process = run_process parameters ~process_name:"octez-rpc-process" in + let* new_server = run_process () in + let _ = watch_dog ~start_new_server:run_process new_server in return_unit -- GitLab From 9ebbacfc7de609d3edefa655019d94eb850f908f Mon Sep 17 00:00:00 2001 From: Victor Allombert Date: Tue, 29 Oct 2024 10:26:12 +0100 Subject: [PATCH 03/10] Lib_rpc_process: pave the way to genericity -- step 3 (utilities) --- src/lib_rpc_process/main.ml | 19 ++++++------ src/lib_rpc_process/rpc_process_worker.ml | 35 +++++++++++++++++----- src/lib_rpc_process/rpc_process_worker.mli | 14 +++++++++ 3 files changed, 52 insertions(+), 16 deletions(-) diff --git a/src/lib_rpc_process/main.ml b/src/lib_rpc_process/main.ml index c835de639bc4..b62f2a0cd45e 100644 --- a/src/lib_rpc_process/main.ml +++ b/src/lib_rpc_process/main.ml @@ -222,13 +222,6 @@ let init_rpc dynamic_store parameters head_watcher applied_blocks_watcher = in return_unit -let get_init_socket_path socket_dir pid = - let filename = Format.sprintf "init-rpc-socket-%d" pid in - Filename.concat socket_dir filename - -(* Magic bytes used for the external RPC process handshake. 
*) -let socket_magic = Bytes.of_string "TEZOS_RPC_SERVER_MAGIC_0" - let create_init_socket socket_dir = let open Lwt_result_syntax in let* socket_dir = @@ -237,11 +230,19 @@ let create_init_socket socket_dir = | None -> tzfail Missing_socket_dir in let pid = Unix.getpid () in - let init_socket_path = get_init_socket_path socket_dir pid in + let init_socket_path = + Rpc_process_worker.get_init_socket_path + ~socket_dir + ~socket_prefix:Rpc_process_worker.rpc_process_socket_prefix + ~pid + () + in let* init_socket_fd = Socket.connect (Unix init_socket_path) in (* Unlink the socket as soon as both sides have opened it.*) let*! () = Lwt_unix.unlink init_socket_path in - let* () = Socket.handshake init_socket_fd socket_magic in + let* () = + Socket.handshake init_socket_fd Rpc_process_worker.rpc_process_socket_magic + in return init_socket_fd let run socket_dir = diff --git a/src/lib_rpc_process/rpc_process_worker.ml b/src/lib_rpc_process/rpc_process_worker.ml index 05027cf5a981..af1df898d8e9 100644 --- a/src/lib_rpc_process/rpc_process_worker.ml +++ b/src/lib_rpc_process/rpc_process_worker.ml @@ -117,6 +117,7 @@ type 'a t = { (* Resolver that will wakeup the above stop promise. 
*) stopper : (int * Unix.process_status) Lwt.u; parameters : 'a; + parameters_encoding : 'a Data_encoding.t; } type process = Parameters.t t @@ -136,6 +137,7 @@ let create ~comm_socket_path (config : Config_file.t) node_version events_config rpc_comm_socket_path = comm_socket_path; node_version; }; + parameters_encoding = Parameters.parameters_encoding; } let shutdown t = @@ -151,7 +153,20 @@ let stop t = Lwt.wakeup t.stopper (0, Lwt_unix.WSTOPPED 0) ; shutdown t -let run_process t ~process_name () = +let get_init_socket_path ~socket_dir ?socket_prefix ~pid () = + let socket_prefix = + match socket_prefix with + | Some v -> v + | None -> Filename.(temp_file ~temp_dir:"" "" "") + in + let filename = Format.sprintf "%s-%d" socket_prefix pid in + Filename.concat socket_dir filename + +let rpc_process_socket_magic = Bytes.of_string "TEZOS_RPC_SERVER_MAGIC_0" + +let rpc_process_socket_prefix = "init-rpc-socket" + +let run_process t ~process_name ?socket_prefix ~handshake () = let open Lwt_result_syntax in let socket_dir = Tezos_base_unix.Socket.get_temporary_socket_dir () in let socket_dir_arg = ["--socket-dir"; socket_dir] in @@ -163,7 +178,9 @@ let run_process t ~process_name () = (Sys.executable_name, Array.of_list args) in let pid = process#pid in - let init_socket_path = Main.get_init_socket_path socket_dir pid in + let init_socket_path = + get_init_socket_path ~socket_dir ?socket_prefix ~pid () + in let* init_socket_fd = let* fds = Tezos_base_unix.Socket.bind (Unix init_socket_path) in match fds with @@ -177,10 +194,8 @@ let run_process t ~process_name () = when binding Unix sockets. 
*) assert false in - let* () = Tezos_base_unix.Socket.handshake init_socket_fd Main.socket_magic in - let* () = - Socket.send init_socket_fd Parameters.parameters_encoding t.parameters - in + let* () = Tezos_base_unix.Socket.handshake init_socket_fd handshake in + let* () = Socket.send init_socket_fd t.parameters_encoding t.parameters in (* FIXME: https://gitlab.com/tezos/tezos/-/issues/6579 Workaround: increase default timeout. If the timeout is still not enough and an Lwt_unix.Timeout is triggered, we display a @@ -274,7 +289,13 @@ let watch_dog ~start_new_server = let start parameters = let open Lwt_result_syntax in - let run_process = run_process parameters ~process_name:"octez-rpc-process" in + let run_process = + run_process + parameters + ~socket_prefix:rpc_process_socket_prefix + ~process_name:"octez-rpc-process" + ~handshake:rpc_process_socket_magic + in let* new_server = run_process () in let _ = watch_dog ~start_new_server:run_process new_server in return_unit diff --git a/src/lib_rpc_process/rpc_process_worker.mli b/src/lib_rpc_process/rpc_process_worker.mli index aae8e52743df..87f51373e1d6 100644 --- a/src/lib_rpc_process/rpc_process_worker.mli +++ b/src/lib_rpc_process/rpc_process_worker.mli @@ -57,3 +57,17 @@ val start : process -> unit tzresult Lwt.t (** Stops gracefully the RPC process worker*) val stop : process -> unit Lwt.t + +(** [get_init_socket_path ~socket_dir ?socket_prefix ~pid ()] + generates the socket path in which the socket will be created. The + socket will be named from the [?socket_prefix] (a random filename + is generated if none) and the [pid] and will be located in + [socket_dir]. *) +val get_init_socket_path : + socket_dir:string -> ?socket_prefix:string -> pid:int -> unit -> string + +(** Magic bytes used for the external RPC process handshake. *) +val rpc_process_socket_magic : bytes + +(** Name of the shared socket prefix of the RPC process. 
*) +val rpc_process_socket_prefix : string -- GitLab From 55bafe115328c681aa14d9e16ddd3a18aa4056a4 Mon Sep 17 00:00:00 2001 From: Victor Allombert Date: Tue, 29 Oct 2024 15:07:11 +0100 Subject: [PATCH 04/10] Lib_rpc_process: pave the way to genericity -- step 4 (events) --- src/lib_rpc_process/rpc_process_worker.ml | 62 +++++++++++++++-------- 1 file changed, 40 insertions(+), 22 deletions(-) diff --git a/src/lib_rpc_process/rpc_process_worker.ml b/src/lib_rpc_process/rpc_process_worker.ml index af1df898d8e9..55bf4b29c6fb 100644 --- a/src/lib_rpc_process/rpc_process_worker.ml +++ b/src/lib_rpc_process/rpc_process_worker.ml @@ -22,28 +22,44 @@ let () = (function RPC_process_init_too_slow -> Some () | _ -> None) (fun () -> RPC_process_init_too_slow) -module Event = struct +module type NAME = sig + val base : string list + + val component : string list +end + +module Name : NAME = struct + let base = ["node"; "main"] + + let component = ["rpc"; "process"] +end + +module MakeEvent (N : NAME) = struct include Internal_event.Simple - let section = ["node"; "main"] + let section = N.base + + let component_base_name = String.concat "_" N.component - let shutting_down_rpc_process = + let component_name = String.concat " " N.component + + let shutting_down_process = declare_0 ~section - ~name:"shutting_down_rpc_process" - ~msg:"shutting down the RPC process" + ~name:(Format.sprintf "shutting_down_%s" component_base_name) + ~msg:(Format.sprintf "shutting down the %s" component_name) ~level:Notice () - let rpc_process_started = + let process_started = declare_1 ~section - ~name:"rpc_process_started" - ~msg:"RPC process was started on pid {pid}" + ~name:(Format.sprintf "%s_started" component_base_name) + ~msg:(Format.sprintf "%s was started on pid {pid}" component_name) ~level:Notice ("pid", Data_encoding.int31) - let rpc_process_exited_abnormally = + let process_exited_abnormally = let open Unix in let exit_status_encoding = let open Data_encoding in @@ -72,8 +88,8 @@ module 
Event = struct declare_2 ~section ~level:Error - ~name:"rpc_process_exited_status" - ~msg:"rpc process (pid {pid}) {status_msg}" + ~name:(Format.sprintf "%s_exited_status" component_base_name) + ~msg:(Format.sprintf "%s (pid {pid}) {status_msg}" component_name) ("pid", Data_encoding.int31) ~pp2:(fun fmt status -> match status with @@ -91,23 +107,25 @@ module Event = struct (Lwt_exit.signal_name i)) ("status_msg", exit_status_encoding) - let cannot_start_rpc_process = + let cannot_start_process = declare_1 ~section - ~name:"cannot_start_rpc_process" + ~name:(Format.sprintf "cannot_start_%s" component_base_name) ~level:Error - ~msg:"cannot start rpc process: {trace}" + ~msg:(Format.sprintf "cannot start %s: {trace}" component_name) ("trace", Data_encoding.string) - let waiting_for_rpc_process_restart = + let waiting_for_process_restart = declare_1 ~section - ~name:"waiting_for_rpc_process_restart" + ~name:(Format.sprintf "waiting_for_%s_restart" component_base_name) ~level:Error - ~msg:"restarting RPC process in {sleep} seconds" + ~msg:(Format.sprintf "restarting %s in {sleep} seconds" component_name) ("sleep", Data_encoding.float) end +module Event = MakeEvent (Name) + (* State of the worker. *) type 'a t = { mutable server : Lwt_process.process_none option; @@ -145,7 +163,7 @@ let shutdown t = match t.server with | None -> return_unit | Some process -> - let* () = Event.(emit shutting_down_rpc_process) () in + let* () = Event.(emit shutting_down_process) () in process#terminate ; return_unit @@ -214,7 +232,7 @@ let run_process t ~process_name ?socket_prefix ~handshake () = | e -> fail e) in let*! () = Lwt_unix.close init_socket_fd in - let*! () = Event.(emit rpc_process_started) pid in + let*! () = Event.(emit process_started) pid in t.server <- Some process ; return t @@ -235,12 +253,12 @@ let rec run_with_backoff ~backoff ~f = let*! 
() = Event.( emit - cannot_start_rpc_process + cannot_start_process (Format.asprintf "%a" pp_print_trace errs)) in run_with_backoff ~backoff:(Time.System.now (), sleep *. 1.2) ~f) else - let*! () = Event.(emit waiting_for_rpc_process_restart sleep) in + let*! () = Event.(emit waiting_for_process_restart sleep) in let*! () = Lwt_unix.sleep sleep in run_with_backoff ~backoff:(timestamp, sleep) ~f @@ -276,7 +294,7 @@ let watch_dog ~start_new_server = | `Wait_pid status -> t.server <- None ; let*! () = - Event.(emit rpc_process_exited_abnormally) (process#pid, status) + Event.(emit process_exited_abnormally) (process#pid, status) in let* new_server = run_with_backoff -- GitLab From 4a0d23805235a7be692ff86ba22eb15a41e3b48f Mon Sep 17 00:00:00 2001 From: Victor Allombert Date: Tue, 29 Oct 2024 15:11:38 +0100 Subject: [PATCH 05/10] Lib_rpc_process: pave the way to genericity -- step 5 (errors) --- src/lib_rpc_process/rpc_process_worker.ml | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/lib_rpc_process/rpc_process_worker.ml b/src/lib_rpc_process/rpc_process_worker.ml index 55bf4b29c6fb..f08f711799c1 100644 --- a/src/lib_rpc_process/rpc_process_worker.ml +++ b/src/lib_rpc_process/rpc_process_worker.ml @@ -5,22 +5,22 @@ (* *) (*****************************************************************************) -type error += RPC_process_init_too_slow +type error += Process_init_too_slow let () = register_error_kind `Permanent - ~id:"rpc_process_worker.RPC_process_init_too_slow" - ~title:"RPC process init too slow" - ~description:"RPC process init too slow" + ~id:"process_worker.Process_init_too_slow" + ~title:"Process init too slow" + ~description:"Process init too slow" ~pp:(fun ppf () -> Format.fprintf ppf - "RPC process init timeout: too slow to start. This is certainly due to \ - the slow DAL initialization.") + "Process init timeout: too slow to start. 
This is certainly due to the \ + slow DAL initialization.") Data_encoding.unit - (function RPC_process_init_too_slow -> Some () | _ -> None) - (fun () -> RPC_process_init_too_slow) + (function Process_init_too_slow -> Some () | _ -> None) + (fun () -> Process_init_too_slow) module type NAME = sig val base : string list @@ -228,7 +228,7 @@ let run_process t ~process_name ?socket_prefix ~handshake () = when List.exists (function Exn Lwt_unix.Timeout -> true | _ -> false) err -> - tzfail RPC_process_init_too_slow + tzfail Process_init_too_slow | e -> fail e) in let*! () = Lwt_unix.close init_socket_fd in -- GitLab From a59c550857732a3d63ce98f0627bb2d3298e7017 Mon Sep 17 00:00:00 2001 From: Victor Allombert Date: Tue, 29 Oct 2024 16:32:44 +0100 Subject: [PATCH 06/10] Base: move process_watchdog to dedicated library --- src/lib_base/unix/lwt_process_watchdog.ml | 304 ++++++++++++++++++++ src/lib_base/unix/lwt_process_watchdog.mli | 66 +++++ src/lib_rpc_process/main.ml | 2 +- src/lib_rpc_process/rpc_process_worker.ml | 308 ++------------------- src/lib_rpc_process/rpc_process_worker.mli | 8 - 5 files changed, 394 insertions(+), 294 deletions(-) create mode 100644 src/lib_base/unix/lwt_process_watchdog.ml create mode 100644 src/lib_base/unix/lwt_process_watchdog.mli diff --git a/src/lib_base/unix/lwt_process_watchdog.ml b/src/lib_base/unix/lwt_process_watchdog.ml new file mode 100644 index 000000000000..5316effceaf5 --- /dev/null +++ b/src/lib_base/unix/lwt_process_watchdog.ml @@ -0,0 +1,304 @@ +(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* Copyright (c) 2023-2024 Nomadic Labs *) +(* *) +(*****************************************************************************) + +type error += Process_init_too_slow + +let () = + register_error_kind + `Permanent + ~id:"process_worker.Process_init_too_slow" + ~title:"Process init too slow" + ~description:"Process init too slow" + ~pp:(fun ppf () -> + 
Format.fprintf + ppf + "Process init timeout: too slow to start. This is certainly due to the \ + slow DAL initialization.") + Data_encoding.unit + (function Process_init_too_slow -> Some () | _ -> None) + (fun () -> Process_init_too_slow) + +module type NAME = sig + val base : string list + + val component : string list +end + +module type EVENTS = sig + open Internal_event.Simple + + val emit : 'a t -> 'a -> unit Lwt.t + + val shutting_down_process : unit t + + val process_started : int t + + val process_exited_abnormally : (int * Unix.process_status) t + + val cannot_start_process : string t + + val waiting_for_process_restart : float t +end + +module MakeEvent (N : NAME) : EVENTS = struct + let emit = Internal_event.Simple.emit + + let section = N.base + + let component_base_name = String.concat "_" N.component + + let component_name = String.concat " " N.component + + let shutting_down_process = + let open Internal_event.Simple in + declare_0 + ~section + ~name:(Format.sprintf "shutting_down_%s" component_base_name) + ~msg:(Format.sprintf "shutting down the %s" component_name) + ~level:Notice + () + + let process_started = + let open Internal_event.Simple in + declare_1 + ~section + ~name:(Format.sprintf "%s_started" component_base_name) + ~msg:(Format.sprintf "%s was started on pid {pid}" component_name) + ~level:Notice + ("pid", Data_encoding.int31) + + let process_exited_abnormally = + let open Unix in + let exit_status_encoding = + let open Data_encoding in + union + [ + case + (Tag 0) + ~title:"wexited" + int31 + (function WEXITED i -> Some i | _ -> None) + (fun i -> WEXITED i); + case + (Tag 1) + ~title:"wsignaled" + int31 + (function WSIGNALED i -> Some i | _ -> None) + (fun i -> WSIGNALED i); + case + (Tag 2) + ~title:"wstopped" + int31 + (function WSTOPPED i -> Some i | _ -> None) + (fun i -> WSTOPPED i); + ] + in + let open Internal_event.Simple in + declare_2 + ~section + ~level:Error + ~name:(Format.sprintf "%s_exited_status" component_base_name) + 
~msg:(Format.sprintf "%s (pid {pid}) {status_msg}" component_name) + ("pid", Data_encoding.int31) + ~pp2:(fun fmt status -> + match status with + | WEXITED i -> + Format.fprintf fmt "terminated abnormally with exit code %i" i + | WSIGNALED i -> + Format.fprintf + fmt + "was killed by signal %s" + (Lwt_exit.signal_name i) + | WSTOPPED i -> + Format.fprintf + fmt + "was stopped by signal %s" + (Lwt_exit.signal_name i)) + ("status_msg", exit_status_encoding) + + let cannot_start_process = + let open Internal_event.Simple in + declare_1 + ~section + ~name:(Format.sprintf "cannot_start_%s" component_base_name) + ~level:Error + ~msg:(Format.sprintf "cannot start %s: {trace}" component_name) + ("trace", Data_encoding.string) + + let waiting_for_process_restart = + let open Internal_event.Simple in + declare_1 + ~section + ~name:(Format.sprintf "waiting_for_%s_restart" component_base_name) + ~level:Error + ~msg:(Format.sprintf "restarting %s in {sleep} seconds" component_name) + ("sleep", Data_encoding.float) +end + +(* State of the worker. *) +type 'a t = { + mutable server : Lwt_process.process_none option; + (* Promise that aims to be resolved as soon as the server is + shutting down. *) + stop : (int * Unix.process_status) Lwt.t; + (* Resolver that will wakeup the above stop promise. *) + stopper : (int * Unix.process_status) Lwt.u; + parameters : 'a; + parameters_encoding : 'a Data_encoding.t; +} + +(** [get_init_socket_path ~socket_dir ?socket_prefix ~pid ()] + generates the socket path in which the socket will be created. The + socket will be named from the [?socket_prefix] (a random filename + is generated if none) and the [pid] and will be located in + [socket_dir]. 
*) +let get_init_socket_path ~socket_dir ?socket_prefix ~pid () = + let socket_prefix = + match socket_prefix with + | Some v -> v + | None -> Filename.(temp_file ~temp_dir:"" "" "") + in + let filename = Format.sprintf "%s-%d" socket_prefix pid in + Filename.concat socket_dir filename + +module Daemon (Event : EVENTS) = struct + let shutdown t = + let open Lwt_syntax in + match t.server with + | None -> return_unit + | Some process -> + let* () = Event.(emit shutting_down_process) () in + process#terminate ; + return_unit + + let stop t = + Lwt.wakeup t.stopper (0, Lwt_unix.WSTOPPED 0) ; + shutdown t + + let run_process t ~process_name ?socket_prefix ~handshake () = + let open Lwt_result_syntax in + let socket_dir = Socket.get_temporary_socket_dir () in + let socket_dir_arg = ["--socket-dir"; socket_dir] in + let args = process_name :: socket_dir_arg in + let process = + Lwt_process.open_process_none + ~stdout:(`FD_copy Unix.stdout) + ~stderr:(`FD_copy Unix.stderr) + (Sys.executable_name, Array.of_list args) + in + let pid = process#pid in + let init_socket_path = + get_init_socket_path ~socket_dir ?socket_prefix ~pid () + in + let* init_socket_fd = + let* fds = Socket.bind (Unix init_socket_path) in + match fds with + | [fd] -> + let*! init_socket_fd, _ = Lwt_unix.accept ~cloexec:true fd in + let*! () = Lwt_unix.close fd in + return init_socket_fd + | _ -> + (* This assertions holds as long as + Tezos_base_unix.Socket.bind returns a single list element + when binding Unix sockets. *) + assert false + in + let* () = Socket.handshake init_socket_fd handshake in + let* () = Socket.send init_socket_fd t.parameters_encoding t.parameters in + (* FIXME: https://gitlab.com/tezos/tezos/-/issues/6579 + Workaround: increase default timeout. If the timeout is still not + enough and an Lwt_unix.Timeout is triggered, we display a + comprehensive message. 
+ *) + let timeout = Ptime.Span.of_int_s 120 in + let* () = + protect + (fun () -> Socket.recv ~timeout init_socket_fd Data_encoding.unit) + ~on_error:(function + | err + when List.exists + (function Exn Lwt_unix.Timeout -> true | _ -> false) + err -> + tzfail Process_init_too_slow + | e -> fail e) + in + let*! () = Lwt_unix.close init_socket_fd in + let*! () = Event.(emit process_started) pid in + t.server <- Some process ; + return t + + (* [run_with_backoff ~backoff f] evaluates [f]. If [f] fails, the + error is caught, printed as an error event, and [f] is re-evaluated + after a [backoff] delay. The delay increases at each failing + try. *) + let rec run_with_backoff ~backoff ~f = + let open Lwt_result_syntax in + let timestamp, sleep = backoff in + let now = Time.System.now () in + let diff = Ptime.diff now timestamp in + if Ptime.Span.to_float_s diff > sleep then + protect + (fun () -> f ()) + ~on_error:(function + | errs -> + let*! () = + Event.( + emit + cannot_start_process + (Format.asprintf "%a" pp_print_trace errs)) + in + run_with_backoff ~backoff:(Time.System.now (), sleep *. 1.2) ~f) + else + let*! () = Event.(emit waiting_for_process_restart sleep) in + let*! () = Lwt_unix.sleep sleep in + run_with_backoff ~backoff:(timestamp, sleep) ~f + + (* [watch_dog ~start_new_server] make sure that the RPC process is restarted as soon as it + dies. *) + let watch_dog ~start_new_server = + let open Lwt_result_syntax in + let rec loop t = + match t.server with + | None -> + let* new_server = + run_with_backoff + ~backoff:(Time.System.epoch, 0.5) + ~f:start_new_server + in + loop new_server + | Some process -> ( + let wait_pid_t = + let*! _, status = Lwt_unix.waitpid [] process#pid in + (* Sleep is necessary here to avoid waitpid to be faster than + the Lwt_exit stack. It avoids the clean_up_starts to be + pending while the node is properly shutting down. *) + let*! () = Lwt_unix.sleep 1. in + Lwt.return (`Wait_pid status) + in + let stop_t = + let*! 
_ = t.stop in + Lwt.return `Stopped + in + let*! res = Lwt.choose [wait_pid_t; stop_t] in + match res with + | `Stopped -> return_unit + | `Wait_pid _ when not (Lwt.is_sleeping Lwt_exit.clean_up_starts) -> + return_unit + | `Wait_pid status -> + t.server <- None ; + let*! () = + Event.(emit process_exited_abnormally) (process#pid, status) + in + let* new_server = + run_with_backoff + ~backoff:(Time.System.epoch, 0.5) + ~f:start_new_server + in + loop new_server) + in + loop +end diff --git a/src/lib_base/unix/lwt_process_watchdog.mli b/src/lib_base/unix/lwt_process_watchdog.mli new file mode 100644 index 000000000000..5d5d91c12c1d --- /dev/null +++ b/src/lib_base/unix/lwt_process_watchdog.mli @@ -0,0 +1,66 @@ +(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* Copyright (c) 2023-2024 Nomadic Labs *) +(* *) +(*****************************************************************************) + +module type NAME = sig + val base : string list + + val component : string list +end + +module type EVENTS = sig + open Internal_event.Simple + + val emit : 'a t -> 'a -> unit Lwt.t + + val shutting_down_process : unit t + + val process_started : int t + + val process_exited_abnormally : (int * Unix.process_status) t + + val cannot_start_process : string t + + val waiting_for_process_restart : float t +end + +module MakeEvent : functor (N : NAME) -> EVENTS + +type 'a t = { + mutable server : Lwt_process.process_none option; + (* Promise that aims to be resolved as soon as the server is + shutting down. *) + stop : (int * Unix.process_status) Lwt.t; + (* Resolver that will wakeup the above stop promise. 
*) + stopper : (int * Unix.process_status) Lwt.u; + parameters : 'a; + parameters_encoding : 'a Data_encoding.t; +} + +val get_init_socket_path : + socket_dir:string -> ?socket_prefix:string -> pid:int -> unit -> string + +module Daemon : functor (Event : EVENTS) -> sig + val stop : 'a t -> unit Lwt.t + + val run_process : + 'a t -> + process_name:string -> + ?socket_prefix:string -> + handshake:bytes -> + unit -> + 'a t tzresult Lwt.t + + val run_with_backoff : + backoff:Ptime.t * float -> + f:(unit -> 'a tzresult Lwt.t) -> + 'a tzresult Lwt.t + + val watch_dog : + start_new_server:(unit -> 'a t tzresult Lwt.t) -> + 'a t -> + (unit, tztrace) result Lwt.t +end diff --git a/src/lib_rpc_process/main.ml b/src/lib_rpc_process/main.ml index b62f2a0cd45e..78002b6fa568 100644 --- a/src/lib_rpc_process/main.ml +++ b/src/lib_rpc_process/main.ml @@ -231,7 +231,7 @@ let create_init_socket socket_dir = in let pid = Unix.getpid () in let init_socket_path = - Rpc_process_worker.get_init_socket_path + Lwt_process_watchdog.get_init_socket_path ~socket_dir ~socket_prefix:Rpc_process_worker.rpc_process_socket_prefix ~pid diff --git a/src/lib_rpc_process/rpc_process_worker.ml b/src/lib_rpc_process/rpc_process_worker.ml index f08f711799c1..3621ed5252e9 100644 --- a/src/lib_rpc_process/rpc_process_worker.ml +++ b/src/lib_rpc_process/rpc_process_worker.ml @@ -5,315 +5,53 @@ (* *) (*****************************************************************************) -type error += Process_init_too_slow - -let () = - register_error_kind - `Permanent - ~id:"process_worker.Process_init_too_slow" - ~title:"Process init too slow" - ~description:"Process init too slow" - ~pp:(fun ppf () -> - Format.fprintf - ppf - "Process init timeout: too slow to start. 
This is certainly due to the \ - slow DAL initialization.") - Data_encoding.unit - (function Process_init_too_slow -> Some () | _ -> None) - (fun () -> Process_init_too_slow) - -module type NAME = sig - val base : string list - - val component : string list -end - -module Name : NAME = struct +module Name : Lwt_process_watchdog.NAME = struct let base = ["node"; "main"] let component = ["rpc"; "process"] end -module MakeEvent (N : NAME) = struct - include Internal_event.Simple - - let section = N.base - - let component_base_name = String.concat "_" N.component - - let component_name = String.concat " " N.component - - let shutting_down_process = - declare_0 - ~section - ~name:(Format.sprintf "shutting_down_%s" component_base_name) - ~msg:(Format.sprintf "shutting down the %s" component_name) - ~level:Notice - () - - let process_started = - declare_1 - ~section - ~name:(Format.sprintf "%s_started" component_base_name) - ~msg:(Format.sprintf "%s was started on pid {pid}" component_name) - ~level:Notice - ("pid", Data_encoding.int31) - - let process_exited_abnormally = - let open Unix in - let exit_status_encoding = - let open Data_encoding in - union - [ - case - (Tag 0) - ~title:"wexited" - int31 - (function WEXITED i -> Some i | _ -> None) - (fun i -> WEXITED i); - case - (Tag 1) - ~title:"wsignaled" - int31 - (function WSIGNALED i -> Some i | _ -> None) - (fun i -> WSIGNALED i); - case - (Tag 2) - ~title:"wstopped" - int31 - (function WSTOPPED i -> Some i | _ -> None) - (fun i -> WSTOPPED i); - ] - in - declare_2 - ~section - ~level:Error - ~name:(Format.sprintf "%s_exited_status" component_base_name) - ~msg:(Format.sprintf "%s (pid {pid}) {status_msg}" component_name) - ("pid", Data_encoding.int31) - ~pp2:(fun fmt status -> - match status with - | WEXITED i -> - Format.fprintf fmt "terminated abnormally with exit code %i" i - | WSIGNALED i -> - Format.fprintf - fmt - "was killed by signal %s" - (Lwt_exit.signal_name i) - | WSTOPPED i -> - Format.fprintf - fmt - 
"was stopped by signal %s" - (Lwt_exit.signal_name i)) - ("status_msg", exit_status_encoding) - - let cannot_start_process = - declare_1 - ~section - ~name:(Format.sprintf "cannot_start_%s" component_base_name) - ~level:Error - ~msg:(Format.sprintf "cannot start %s: {trace}" component_name) - ("trace", Data_encoding.string) - - let waiting_for_process_restart = - declare_1 - ~section - ~name:(Format.sprintf "waiting_for_%s_restart" component_base_name) - ~level:Error - ~msg:(Format.sprintf "restarting %s in {sleep} seconds" component_name) - ("sleep", Data_encoding.float) -end - -module Event = MakeEvent (Name) +module Event : Lwt_process_watchdog.EVENTS = + Lwt_process_watchdog.MakeEvent (Name) -(* State of the worker. *) -type 'a t = { - mutable server : Lwt_process.process_none option; - (* Promise that aims to be resolved as soon as the server is - shutting down. *) - stop : (int * Unix.process_status) Lwt.t; - (* Resolver that will wakeup the above stop promise. *) - stopper : (int * Unix.process_status) Lwt.u; - parameters : 'a; - parameters_encoding : 'a Data_encoding.t; -} - -type process = Parameters.t t +type process = Parameters.t Lwt_process_watchdog.t let create ~comm_socket_path (config : Config_file.t) node_version events_config = let stop, stopper = Lwt.wait () in - { - server = None; - stop; - stopper; - parameters = - Parameters. 
- { - internal_events = events_config; - config; - rpc_comm_socket_path = comm_socket_path; - node_version; - }; - parameters_encoding = Parameters.parameters_encoding; - } - -let shutdown t = - let open Lwt_syntax in - match t.server with - | None -> return_unit - | Some process -> - let* () = Event.(emit shutting_down_process) () in - process#terminate ; - return_unit - -let stop t = - Lwt.wakeup t.stopper (0, Lwt_unix.WSTOPPED 0) ; - shutdown t - -let get_init_socket_path ~socket_dir ?socket_prefix ~pid () = - let socket_prefix = - match socket_prefix with - | Some v -> v - | None -> Filename.(temp_file ~temp_dir:"" "" "") - in - let filename = Format.sprintf "%s-%d" socket_prefix pid in - Filename.concat socket_dir filename + Lwt_process_watchdog. + { + server = None; + stop; + stopper; + parameters = + Parameters. + { + internal_events = events_config; + config; + rpc_comm_socket_path = comm_socket_path; + node_version; + }; + parameters_encoding = Parameters.parameters_encoding; + } let rpc_process_socket_magic = Bytes.of_string "TEZOS_RPC_SERVER_MAGIC_0" let rpc_process_socket_prefix = "init-rpc-socket" -let run_process t ~process_name ?socket_prefix ~handshake () = - let open Lwt_result_syntax in - let socket_dir = Tezos_base_unix.Socket.get_temporary_socket_dir () in - let socket_dir_arg = ["--socket-dir"; socket_dir] in - let args = process_name :: socket_dir_arg in - let process = - Lwt_process.open_process_none - ~stdout:(`FD_copy Unix.stdout) - ~stderr:(`FD_copy Unix.stderr) - (Sys.executable_name, Array.of_list args) - in - let pid = process#pid in - let init_socket_path = - get_init_socket_path ~socket_dir ?socket_prefix ~pid () - in - let* init_socket_fd = - let* fds = Tezos_base_unix.Socket.bind (Unix init_socket_path) in - match fds with - | [fd] -> - let*! init_socket_fd, _ = Lwt_unix.accept ~cloexec:true fd in - let*! 
() = Lwt_unix.close fd in - return init_socket_fd - | _ -> - (* This assertions holds as long as - Tezos_base_unix.Socket.bind returns a single list element - when binding Unix sockets. *) - assert false - in - let* () = Tezos_base_unix.Socket.handshake init_socket_fd handshake in - let* () = Socket.send init_socket_fd t.parameters_encoding t.parameters in - (* FIXME: https://gitlab.com/tezos/tezos/-/issues/6579 - Workaround: increase default timeout. If the timeout is still not - enough and an Lwt_unix.Timeout is triggered, we display a - comprehensive message. - *) - let timeout = Ptime.Span.of_int_s 120 in - let* () = - protect - (fun () -> Socket.recv ~timeout init_socket_fd Data_encoding.unit) - ~on_error:(function - | err - when List.exists - (function Exn Lwt_unix.Timeout -> true | _ -> false) - err -> - tzfail Process_init_too_slow - | e -> fail e) - in - let*! () = Lwt_unix.close init_socket_fd in - let*! () = Event.(emit process_started) pid in - t.server <- Some process ; - return t - -(* [run_with_backoff ~backoff f] evaluates [f]. If [f] fails, the - error is caught, printed as an error event, and [f] is re-evaluated - after a [backoff] delay. The delay increases at each failing - try. *) -let rec run_with_backoff ~backoff ~f = - let open Lwt_result_syntax in - let timestamp, sleep = backoff in - let now = Time.System.now () in - let diff = Ptime.diff now timestamp in - if Ptime.Span.to_float_s diff > sleep then - protect - (fun () -> f ()) - ~on_error:(function - | errs -> - let*! () = - Event.( - emit - cannot_start_process - (Format.asprintf "%a" pp_print_trace errs)) - in - run_with_backoff ~backoff:(Time.System.now (), sleep *. 1.2) ~f) - else - let*! () = Event.(emit waiting_for_process_restart sleep) in - let*! () = Lwt_unix.sleep sleep in - run_with_backoff ~backoff:(timestamp, sleep) ~f +module Watchdog = Lwt_process_watchdog.Daemon (Event) -(* [watch_dog ~start_new_server] make sure that the RPC process is restarted as soon as it - dies. 
*) -let watch_dog ~start_new_server = - let open Lwt_result_syntax in - let rec loop t = - match t.server with - | None -> - let* new_server = - run_with_backoff ~backoff:(Time.System.epoch, 0.5) ~f:start_new_server - in - loop new_server - | Some process -> ( - let wait_pid_t = - let*! _, status = Lwt_unix.waitpid [] process#pid in - (* Sleep is necessary here to avoid waitpid to be faster than - the Lwt_exit stack. It avoids the clean_up_starts to be - pending while the node is properly shutting down. *) - let*! () = Lwt_unix.sleep 1. in - Lwt.return (`Wait_pid status) - in - let stop_t = - let*! _ = t.stop in - Lwt.return `Stopped - in - let*! res = Lwt.choose [wait_pid_t; stop_t] in - match res with - | `Stopped -> return_unit - | `Wait_pid _ when not (Lwt.is_sleeping Lwt_exit.clean_up_starts) -> - return_unit - | `Wait_pid status -> - t.server <- None ; - let*! () = - Event.(emit process_exited_abnormally) (process#pid, status) - in - let* new_server = - run_with_backoff - ~backoff:(Time.System.epoch, 0.5) - ~f:start_new_server - in - loop new_server) - in - loop +let stop (t : process) = Watchdog.stop t let start parameters = let open Lwt_result_syntax in let run_process = - run_process + Watchdog.run_process parameters ~socket_prefix:rpc_process_socket_prefix ~process_name:"octez-rpc-process" ~handshake:rpc_process_socket_magic in let* new_server = run_process () in - let _ = watch_dog ~start_new_server:run_process new_server in + let _ = Watchdog.watch_dog ~start_new_server:run_process new_server in return_unit diff --git a/src/lib_rpc_process/rpc_process_worker.mli b/src/lib_rpc_process/rpc_process_worker.mli index 87f51373e1d6..f66d2bb5f949 100644 --- a/src/lib_rpc_process/rpc_process_worker.mli +++ b/src/lib_rpc_process/rpc_process_worker.mli @@ -58,14 +58,6 @@ val start : process -> unit tzresult Lwt.t (** Stops gracefully the RPC process worker*) val stop : process -> unit Lwt.t -(** [get_init_socket_path ~socket_dir ?socket_prefix ~pid ()] - generates 
the socket path in which the socket will be created. The - socket will be named from the [?socket_prefix] (a random filename - is generated if none) and the [pid] and will be located in - [socket_dir]. *) -val get_init_socket_path : - socket_dir:string -> ?socket_prefix:string -> pid:int -> unit -> string - (** Magic bytes used for the external RPC process handshake. *) val rpc_process_socket_magic : bytes -- GitLab From b6cfbf9e1e0d6e598d12afb82f4741a592f84666 Mon Sep 17 00:00:00 2001 From: Victor Allombert Date: Wed, 30 Oct 2024 16:07:18 +0100 Subject: [PATCH 07/10] Lwt_process_watchdog: abstract create function --- src/lib_base/unix/lwt_process_watchdog.ml | 4 ++++ src/lib_base/unix/lwt_process_watchdog.mli | 15 ++++-------- src/lib_rpc_process/rpc_process_worker.ml | 28 ++++++++++------------ 3 files changed, 21 insertions(+), 26 deletions(-) diff --git a/src/lib_base/unix/lwt_process_watchdog.ml b/src/lib_base/unix/lwt_process_watchdog.ml index 5316effceaf5..7ea08c621c79 100644 --- a/src/lib_base/unix/lwt_process_watchdog.ml +++ b/src/lib_base/unix/lwt_process_watchdog.ml @@ -151,6 +151,10 @@ type 'a t = { parameters_encoding : 'a Data_encoding.t; } +let create ~parameters ~parameters_encoding = + let stop, stopper = Lwt.wait () in + {server = None; stop; stopper; parameters; parameters_encoding} + (** [get_init_socket_path ~socket_dir ?socket_prefix ~pid ()] generates the socket path in which the socket will be created. The socket will be named from the [?socket_prefix] (a random filename diff --git a/src/lib_base/unix/lwt_process_watchdog.mli b/src/lib_base/unix/lwt_process_watchdog.mli index 5d5d91c12c1d..6397954e4a1e 100644 --- a/src/lib_base/unix/lwt_process_watchdog.mli +++ b/src/lib_base/unix/lwt_process_watchdog.mli @@ -29,16 +29,11 @@ end module MakeEvent : functor (N : NAME) -> EVENTS -type 'a t = { - mutable server : Lwt_process.process_none option; - (* Promise that aims to be resolved as soon as the server is - shutting down. 
*) - stop : (int * Unix.process_status) Lwt.t; - (* Resolver that will wakeup the above stop promise. *) - stopper : (int * Unix.process_status) Lwt.u; - parameters : 'a; - parameters_encoding : 'a Data_encoding.t; -} +type 'a t + +(** [create ~parameters ~parameters_encoding] creates a watchdog + state, ready to be passed to the [Daemon] runner. *) +val create : parameters:'a -> parameters_encoding:'a encoding -> 'a t val get_init_socket_path : socket_dir:string -> ?socket_prefix:string -> pid:int -> unit -> string diff --git a/src/lib_rpc_process/rpc_process_worker.ml b/src/lib_rpc_process/rpc_process_worker.ml index 3621ed5252e9..a11cb427ada3 100644 --- a/src/lib_rpc_process/rpc_process_worker.ml +++ b/src/lib_rpc_process/rpc_process_worker.ml @@ -18,22 +18,18 @@ type process = Parameters.t Lwt_process_watchdog.t let create ~comm_socket_path (config : Config_file.t) node_version events_config = - let stop, stopper = Lwt.wait () in - Lwt_process_watchdog. - { - server = None; - stop; - stopper; - parameters = - Parameters. - { - internal_events = events_config; - config; - rpc_comm_socket_path = comm_socket_path; - node_version; - }; - parameters_encoding = Parameters.parameters_encoding; - } + let parameters = + Parameters. 
+ { + internal_events = events_config; + config; + rpc_comm_socket_path = comm_socket_path; + node_version; + } + in + Lwt_process_watchdog.create + ~parameters + ~parameters_encoding:Parameters.parameters_encoding let rpc_process_socket_magic = Bytes.of_string "TEZOS_RPC_SERVER_MAGIC_0" -- GitLab From 4c3df37de7f16a349787cef476d6d45a168281b5 Mon Sep 17 00:00:00 2001 From: Victor Allombert Date: Wed, 30 Oct 2024 16:12:38 +0100 Subject: [PATCH 08/10] Lwt_process_watchdog: run_process now dedicated to socket's flavour --- src/lib_base/unix/lwt_process_watchdog.ml | 8 ++++++-- src/lib_base/unix/lwt_process_watchdog.mli | 17 ++++++++++++++++- src/lib_rpc_process/rpc_process_worker.ml | 2 +- 3 files changed, 23 insertions(+), 4 deletions(-) diff --git a/src/lib_base/unix/lwt_process_watchdog.ml b/src/lib_base/unix/lwt_process_watchdog.ml index 7ea08c621c79..c75f055c3cf1 100644 --- a/src/lib_base/unix/lwt_process_watchdog.ml +++ b/src/lib_base/unix/lwt_process_watchdog.ml @@ -183,16 +183,20 @@ module Daemon (Event : EVENTS) = struct Lwt.wakeup t.stopper (0, Lwt_unix.WSTOPPED 0) ; shutdown t - let run_process t ~process_name ?socket_prefix ~handshake () = + let run_process_with_sockets t ~process_name ?socket_prefix ?executable_name + ~handshake () = let open Lwt_result_syntax in let socket_dir = Socket.get_temporary_socket_dir () in let socket_dir_arg = ["--socket-dir"; socket_dir] in let args = process_name :: socket_dir_arg in + let executable_name = + Option.value executable_name ~default:Sys.executable_name + in let process = Lwt_process.open_process_none ~stdout:(`FD_copy Unix.stdout) ~stderr:(`FD_copy Unix.stderr) - (Sys.executable_name, Array.of_list args) + (executable_name, Array.of_list args) in let pid = process#pid in let init_socket_path = diff --git a/src/lib_base/unix/lwt_process_watchdog.mli b/src/lib_base/unix/lwt_process_watchdog.mli index 6397954e4a1e..a1d5510abdd6 100644 --- a/src/lib_base/unix/lwt_process_watchdog.mli +++ 
b/src/lib_base/unix/lwt_process_watchdog.mli @@ -41,10 +41,25 @@ val get_init_socket_path : module Daemon : functor (Event : EVENTS) -> sig val stop : 'a t -> unit Lwt.t - val run_process : + (** [run_process_with_sockets t ~process_name ?socket_prefix + ?executable_name ~handshake ()] starts a + [Lwt_process.process_none] depending on the given [process_name] + and [executable_name] parameters. If [executable_name] is + passed, then the process will be run from the path to this + binary. Otherwise, the current binary name will be used as a + forked process. [process_name] aims to be the entry point of the + binary, which may differ from the [executable_name] in case of + fork. The [stdout] and [stderr] are redirected to the default + Unix streams. + + [socket_prefix] and [handshake] are used to set up the + communication, through a socket, with the created process. The + values are expected to be defined consistently on both sides. *) + val run_process_with_sockets : 'a t -> process_name:string -> ?socket_prefix:string -> + ?executable_name:string -> handshake:bytes -> unit -> 'a t tzresult Lwt.t diff --git a/src/lib_rpc_process/rpc_process_worker.ml b/src/lib_rpc_process/rpc_process_worker.ml index a11cb427ada3..1f68ec7f96e4 100644 --- a/src/lib_rpc_process/rpc_process_worker.ml +++ b/src/lib_rpc_process/rpc_process_worker.ml @@ -42,7 +42,7 @@ let stop (t : process) = Watchdog.stop t let start parameters = let open Lwt_result_syntax in let run_process = - Watchdog.run_process + Watchdog.run_process_with_sockets parameters ~socket_prefix:rpc_process_socket_prefix ~process_name:"octez-rpc-process" -- GitLab From c3ed55f3736b9e51872d50599625c9953ece5d51 Mon Sep 17 00:00:00 2001 From: Victor Allombert Date: Wed, 30 Oct 2024 16:24:04 +0100 Subject: [PATCH 09/10] Lwt_process_watchdog: document interface --- src/lib_base/unix/lwt_process_watchdog.ml | 3 +++ src/lib_base/unix/lwt_process_watchdog.mli | 10 +++++----- 2 files changed, 8 insertions(+), 5 deletions(-)
diff --git a/src/lib_base/unix/lwt_process_watchdog.ml b/src/lib_base/unix/lwt_process_watchdog.ml index c75f055c3cf1..f1404f2892fb 100644 --- a/src/lib_base/unix/lwt_process_watchdog.ml +++ b/src/lib_base/unix/lwt_process_watchdog.ml @@ -147,7 +147,10 @@ type 'a t = { stop : (int * Unix.process_status) Lwt.t; (* Resolver that will wakeup the above stop promise. *) stopper : (int * Unix.process_status) Lwt.u; + (* The parameters that will be passed to the process after the + handshake. *) parameters : 'a; + (* The parameters encoding associated with the above field. *) parameters_encoding : 'a Data_encoding.t; } diff --git a/src/lib_base/unix/lwt_process_watchdog.mli b/src/lib_base/unix/lwt_process_watchdog.mli index a1d5510abdd6..f40a87c47ca6 100644 --- a/src/lib_base/unix/lwt_process_watchdog.mli +++ b/src/lib_base/unix/lwt_process_watchdog.mli @@ -39,6 +39,8 @@ val get_init_socket_path : socket_dir:string -> ?socket_prefix:string -> pid:int -> unit -> string module Daemon : functor (Event : EVENTS) -> sig + (** [stop t] stops the process handled by the watchdog daemon as + soon as it is called. *) val stop : 'a t -> unit Lwt.t (** [run_process_with_sockets t ~process_name ?socket_prefix @@ -64,11 +66,9 @@ module Daemon : functor (Event : EVENTS) -> sig unit -> 'a t tzresult Lwt.t - val run_with_backoff : - backoff:Ptime.t * float -> - f:(unit -> 'a tzresult Lwt.t) -> - 'a tzresult Lwt.t - + (** [watch_dog ~start_new_server t] takes a running process [t] and + makes sure it keeps running. If it crashes, it restarts the + process using the given [start_new_server] callback.
*) val watch_dog : start_new_server:(unit -> 'a t tzresult Lwt.t) -> 'a t -> -- GitLab From 6b57cbd03d3c888cc45cbac4fc395cc44dcce056 Mon Sep 17 00:00:00 2001 From: Victor Allombert Date: Wed, 13 Nov 2024 11:03:57 +0100 Subject: [PATCH 10/10] Lwt_process_watchdog: minor refactoring --- src/lib_base/unix/lwt_process_watchdog.ml | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/lib_base/unix/lwt_process_watchdog.ml b/src/lib_base/unix/lwt_process_watchdog.ml index f1404f2892fb..e7464e0374e7 100644 --- a/src/lib_base/unix/lwt_process_watchdog.ml +++ b/src/lib_base/unix/lwt_process_watchdog.ml @@ -272,13 +272,12 @@ module Daemon (Event : EVENTS) = struct dies. *) let watch_dog ~start_new_server = let open Lwt_result_syntax in + let initial_backoff = (Time.System.epoch, 0.5) in let rec loop t = match t.server with | None -> let* new_server = - run_with_backoff - ~backoff:(Time.System.epoch, 0.5) - ~f:start_new_server + run_with_backoff ~backoff:initial_backoff ~f:start_new_server in loop new_server | Some process -> ( @@ -305,9 +304,7 @@ module Daemon (Event : EVENTS) = struct Event.(emit process_exited_abnormally) (process#pid, status) in let* new_server = - run_with_backoff - ~backoff:(Time.System.epoch, 0.5) - ~f:start_new_server + run_with_backoff ~backoff:initial_backoff ~f:start_new_server in loop new_server) in -- GitLab