From 369347fb679f0fa2a7cbd2d3e41cc5999b1b7da8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9my=20El=20Siba=C3=AFe?= Date: Fri, 2 Jun 2023 12:11:28 +0200 Subject: [PATCH 1/8] Socket: add simple handshake function --- src/lib_base/unix/socket.ml | 25 +++++++++++++++++++++++++ src/lib_base/unix/socket.mli | 15 +++++++++++++++ 2 files changed, 40 insertions(+) diff --git a/src/lib_base/unix/socket.ml b/src/lib_base/unix/socket.ml index bd2364413122..d4274f6088ce 100644 --- a/src/lib_base/unix/socket.ml +++ b/src/lib_base/unix/socket.ml @@ -25,6 +25,20 @@ open Error_monad +type error += Handshake_failure + +let () = + register_error_kind + `Temporary + ~id:"handshake_failure" + ~title:"Handshake failure" + ~description:"Handshake failed" + ~pp:(fun ppf () -> + Format.fprintf ppf "Hanshake failed: cannot validate magic bytes") + Data_encoding.empty + (function Handshake_failure -> Some () | _ -> None) + (fun () -> Handshake_failure) + type addr = | Unix of string | Tcp of string * string * Unix.getaddrinfo_option list @@ -207,3 +221,14 @@ let recv ?timeout fd encoding = | Ok (read_len, message) -> if read_len <> len then tzfail (Decoding_error Extra_bytes) else return message + +let handshake socket magic_bytes = + let open Lwt_result_syntax in + let* () = send socket Data_encoding.Variable.bytes magic_bytes in + let* magic = recv socket Data_encoding.Variable.bytes in + fail_unless (Bytes.equal magic magic_bytes) Handshake_failure + +let get_temporary_socket_dir () = + match Sys.getenv_opt "XDG_RUNTIME_DIR" with + | Some xdg_runtime_dir when xdg_runtime_dir <> "" -> xdg_runtime_dir + | Some _ | None -> Filename.get_temp_dir_name () diff --git a/src/lib_base/unix/socket.mli b/src/lib_base/unix/socket.mli index a06d599cb1ba..4fff6f542f7b 100644 --- a/src/lib_base/unix/socket.mli +++ b/src/lib_base/unix/socket.mli @@ -57,3 +57,18 @@ val recv : Lwt_unix.file_descr -> 'a Data_encoding.t -> 'a tzresult Lwt.t + +(** [handshake socket magic_bytes] is a function to synchronize 
two + separate processes and start a communication. + + The scenario of the handshake is the following: + - both processes simultaneously send some [magic_bytes], + - both processes wait for and check the validity of the received bytes, + - handshake is finished. *) +val handshake : Lwt_unix.file_descr -> bytes -> unit tzresult Lwt.t + +(** [get_temporary_socket_dir ()] returns a temporary path for the + socket to be spawned. $XDG_RUNTIME_DIR is returned if the + environment variable is defined and non-empty. Otherwise, the default temporary + directory is used. *) +val get_temporary_socket_dir : unit -> string -- GitLab From e08d6d2bec7a2a1f7923c5239ac5c8add06e4069 Mon Sep 17 00:00:00 2001 From: Victor Allombert Date: Tue, 20 Jun 2023 09:43:02 +0200 Subject: [PATCH 2/8] Node: remove useless event argument --- src/bin_node/node_run_command.ml | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/bin_node/node_run_command.ml b/src/bin_node/node_run_command.ml index e3794a215b3b..104d0a22ed7f 100644 --- a/src/bin_node/node_run_command.ml +++ b/src/bin_node/node_run_command.ml @@ -104,14 +104,13 @@ module Event = struct () let starting_rpc_server = - declare_4 + declare_3 ~section ~name:"starting_rpc_server" ~msg:"starting RPC server on {host}:{port} (acl = {acl_policy})" ~level:Notice ("host", Data_encoding.string) ("port", Data_encoding.uint16) - ("tls", Data_encoding.bool) ("acl_policy", Data_encoding.string) let starting_metrics_server = @@ -400,8 +399,7 @@ let launch_rpc_server ~acl_policy ~media_types (config : Config_file.t) node |> Option.value_f ~default:(fun () -> default addr) in let*! 
() = - Event.(emit starting_rpc_server) - (host, port, rpc_config.tls <> None, RPC_server.Acl.policy_type acl) + Event.(emit starting_rpc_server) (host, port, RPC_server.Acl.policy_type acl) in let cors_headers = sanitize_cors_headers ~default:["Content-Type"] rpc_config.cors_headers -- GitLab From 143c6439f7d9d945a33a6c22c8a65874d4256d29 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9my=20El=20Siba=C3=AFe?= Date: Tue, 6 Jun 2023 11:43:10 +0200 Subject: [PATCH 3/8] Config_file: show rpc config encoding in interface --- src/lib_node_config/config_file.ml | 2 ++ src/lib_node_config/config_file.mli | 2 ++ 2 files changed, 4 insertions(+) diff --git a/src/lib_node_config/config_file.ml b/src/lib_node_config/config_file.ml index 52371cf1582e..f65daab11f00 100644 --- a/src/lib_node_config/config_file.ml +++ b/src/lib_node_config/config_file.ml @@ -625,6 +625,8 @@ let rpc : rpc Data_encoding.t = Media_type.Command_line.encoding default_rpc.media_type)) +let rpc_encoding = rpc + let encoding = let open Data_encoding in conv diff --git a/src/lib_node_config/config_file.mli b/src/lib_node_config/config_file.mli index 5fa1107e2fdc..6100bb26263c 100644 --- a/src/lib_node_config/config_file.mli +++ b/src/lib_node_config/config_file.mli @@ -177,6 +177,8 @@ val resolve_metrics_addrs : val resolve_bootstrap_addrs : string list -> (P2p_point.Id.t * P2p_peer.Id.t option) list tzresult Lwt.t +val rpc_encoding : rpc Data_encoding.t + val encoding : t Data_encoding.t (** Return [p2p.bootstrap_peers] if not [None], -- GitLab From 358b71a97af66df1df67d344127e680c6dfff96c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9my=20El=20Siba=C3=AFe?= Date: Fri, 9 Jun 2023 14:28:53 +0200 Subject: [PATCH 4/8] RPC_middleware: expose ctx parameter to be able to call sockets --- src/lib_rpc_http/RPC_middleware.ml | 7 ++++--- src/lib_rpc_http/RPC_middleware.mli | 5 ++++- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/lib_rpc_http/RPC_middleware.ml 
b/src/lib_rpc_http/RPC_middleware.ml index 214d1e6e567a..ee9a756bd6e7 100644 --- a/src/lib_rpc_http/RPC_middleware.ml +++ b/src/lib_rpc_http/RPC_middleware.ml @@ -23,7 +23,7 @@ (* *) (*****************************************************************************) -let make_transform_callback forwarding_endpoint callback conn req body = +let make_transform_callback ?ctx forwarding_endpoint callback conn req body = let open Lwt_syntax in let open Cohttp in (* Using a [Cohttp_lwt.Body.t] destructs it. As we may need it @@ -62,6 +62,7 @@ let make_transform_callback forwarding_endpoint callback conn req body = in let* resp, body = Cohttp_lwt_unix.Client.call + ?ctx ~headers ~body:(Cohttp_lwt.Body.of_stream body_stream) (Request.meth req) @@ -104,5 +105,5 @@ let rpc_metrics_transform_callback ~update_metrics dir callback conn req body = (* Otherwise, the call must be done anyway. *) do_call () -let proxy_server_query_forwarder forwarding_endpoint = - make_transform_callback forwarding_endpoint +let proxy_server_query_forwarder ?ctx forwarding_endpoint = + make_transform_callback ?ctx forwarding_endpoint diff --git a/src/lib_rpc_http/RPC_middleware.mli b/src/lib_rpc_http/RPC_middleware.mli index 0424e78d08a4..b02be991288e 100644 --- a/src/lib_rpc_http/RPC_middleware.mli +++ b/src/lib_rpc_http/RPC_middleware.mli @@ -30,7 +30,10 @@ that rewrites queries that the proxy server cannot handle and forwards them to the full node at the given [Uri.t]. 
*) val proxy_server_query_forwarder : - Uri.t -> RPC_server.callback -> RPC_server.callback + ?ctx:Cohttp_lwt_unix.Net.ctx -> + Uri.t -> + RPC_server.callback -> + RPC_server.callback (** A Resto middleware that transforms any server callback to an other that handles RPC metrics *) -- GitLab From 5113d7bccef057c59902fdd4cc49c7454e5492d6 Mon Sep 17 00:00:00 2001 From: Victor Allombert Date: Wed, 10 May 2023 13:57:26 +0200 Subject: [PATCH 5/8] Manifest: introduce lib_rpc_process --- .gitlab/ci/jobs/packaging/opam_package.yml | 21 ++++++++++++------- dune-project | 1 + manifest/main.ml | 20 +++++++++++++++++- opam/octez-node.opam | 1 + opam/octez-rpc-process.opam | 24 ++++++++++++++++++++++ src/bin_node/dune | 3 +++ src/lib_rpc_process/dune | 24 ++++++++++++++++++++++ 7 files changed, 86 insertions(+), 8 deletions(-) create mode 100644 opam/octez-rpc-process.opam create mode 100644 src/lib_rpc_process/dune diff --git a/.gitlab/ci/jobs/packaging/opam_package.yml b/.gitlab/ci/jobs/packaging/opam_package.yml index 87eab65c646a..e540d2cbf327 100644 --- a/.gitlab/ci/jobs/packaging/opam_package.yml +++ b/.gitlab/ci/jobs/packaging/opam_package.yml @@ -535,14 +535,14 @@ opam:octez-libs: opam:octez-node: extends: - .opam_template - - .rules_template__trigger_exec_opam_batch_2 + - .rules_template__trigger_exec_opam_batch_1 variables: package: octez-node opam:octez-node-config: extends: - .opam_template - - .rules_template__trigger_all_opam_batch_2 + - .rules_template__trigger_all_opam_batch_3 variables: package: octez-node-config @@ -560,6 +560,13 @@ opam:octez-proxy-server: variables: package: octez-proxy-server +opam:octez-rpc-process: + extends: + - .opam_template + - .rules_template__trigger_all_opam_batch_2 + variables: + package: octez-rpc-process + opam:octez-signer: extends: - .opam_template @@ -643,7 +650,7 @@ opam:octez-smart-rollup-wasm-debugger: opam:tezos-017-PtNairob-test-helpers: extends: - .opam_template - - .rules_template__trigger_all_opam_batch_1 + - 
.rules_template__trigger_all_opam_batch_2 variables: package: tezos-017-PtNairob-test-helpers @@ -1111,7 +1118,7 @@ opam:tezos-embedded-protocol-016-PtMumbai: opam:tezos-embedded-protocol-017-PtNairob: extends: - .opam_template - - .rules_template__trigger_all_opam_batch_4 + - .rules_template__trigger_all_opam_batch_5 variables: package: tezos-embedded-protocol-017-PtNairob @@ -1296,7 +1303,7 @@ opam:tezos-protocol-009-PsFLoren: opam:tezos-protocol-010-PtGRANAD: extends: - .opam_template - - .rules_template__trigger_all_opam_batch_6 + - .rules_template__trigger_all_opam_batch_7 variables: package: tezos-protocol-010-PtGRANAD @@ -1491,7 +1498,7 @@ opam:tezos-protocol-plugin-013-PtJakart-registerer: opam:tezos-protocol-plugin-014-PtKathma: extends: - .opam_template - - .rules_template__trigger_all_opam_batch_5 + - .rules_template__trigger_all_opam_batch_6 variables: package: tezos-protocol-plugin-014-PtKathma @@ -1540,7 +1547,7 @@ opam:tezos-protocol-plugin-017-PtNairob: opam:tezos-protocol-plugin-017-PtNairob-registerer: extends: - .opam_template - - .rules_template__trigger_all_opam_batch_3 + - .rules_template__trigger_all_opam_batch_4 variables: package: tezos-protocol-plugin-017-PtNairob-registerer diff --git a/dune-project b/dune-project index 0b8d168b52ec..2131d1a88b78 100644 --- a/dune-project +++ b/dune-project @@ -31,6 +31,7 @@ (package (name octez-node-config)) (package (name octez-protocol-compiler)) (package (name octez-proxy-server)) +(package (name octez-rpc-process)) (package (name octez-signer)) (package (name octez-smart-rollup)) (package (name octez-smart-rollup-client-Proxford)) diff --git a/manifest/main.ml b/manifest/main.ml index f7a59387627e..fd94e06041b3 100644 --- a/manifest/main.ml +++ b/manifest/main.ml @@ -4389,6 +4389,23 @@ let octez_node_config = octez_validation |> open_; ] +let octez_rpc_process = + public_lib + "octez-rpc-process" + ~path:"src/lib_rpc_process" + ~synopsis:"Tezos: RPC process" + ~deps: + [ + octez_base |> open_ 
~m:"TzPervasives" |> open_; + octez_base_unix |> open_; + octez_node_config |> open_; + octez_rpc_http |> open_; + octez_rpc_http_server |> open_; + lwt_unix; + lwt_exit; + prometheus_app; + ] + let octez_crawler = public_lib "octez-crawler" @@ -7309,7 +7326,7 @@ let _octez_node = ~deps: ([ octez_base |> open_ ~m:"TzPervasives" |> open_; - octez_base_unix; + octez_base_unix |> open_; octez_version; octez_version_value; octez_node_config |> open_; @@ -7317,6 +7334,7 @@ let _octez_node = octez_shell_services |> open_; octez_rpc_http |> open_; octez_rpc_http_server |> open_; + octez_rpc_process |> open_; octez_p2p |> open_; octez_shell |> open_; octez_store |> open_; diff --git a/opam/octez-node.opam b/opam/octez-node.opam index 8f1bd14da295..7304aa4c66d4 100644 --- a/opam/octez-node.opam +++ b/opam/octez-node.opam @@ -13,6 +13,7 @@ depends: [ "octez-libs" "tezos-version" "octez-node-config" + "octez-rpc-process" "tezos-shell" "tezos-store" "tezos-validation" diff --git a/opam/octez-rpc-process.opam b/opam/octez-rpc-process.opam new file mode 100644 index 000000000000..e7ec2b74a7ae --- /dev/null +++ b/opam/octez-rpc-process.opam @@ -0,0 +1,24 @@ +# This file was automatically generated, do not edit. +# Edit file manifest/main.ml instead. 
+opam-version: "2.0" +maintainer: "contact@tezos.com" +authors: ["Tezos devteam"] +homepage: "https://www.tezos.com/" +bug-reports: "https://gitlab.com/tezos/tezos/issues" +dev-repo: "git+https://gitlab.com/tezos/tezos.git" +license: "MIT" +depends: [ + "dune" { >= "3.0" } + "ocaml" { >= "4.14" } + "octez-libs" + "octez-node-config" + "lwt" { >= "5.7.0" } + "lwt-exit" + "prometheus-app" { >= "1.2" } +] +build: [ + ["rm" "-r" "vendors" "contrib"] + ["dune" "build" "-p" name "-j" jobs] + ["dune" "runtest" "-p" name "-j" jobs] {with-test} +] +synopsis: "Tezos: RPC process" diff --git a/src/bin_node/dune b/src/bin_node/dune index 7362b2a2dfe7..17e59755fbf9 100644 --- a/src/bin_node/dune +++ b/src/bin_node/dune @@ -16,6 +16,7 @@ octez-libs.tezos-shell-services octez-libs.tezos-rpc-http octez-libs.tezos-rpc-http-server + octez-rpc-process octez-libs.tezos-p2p tezos-shell tezos-store @@ -147,11 +148,13 @@ (:standard) -open Tezos_base.TzPervasives -open Tezos_base + -open Tezos_base_unix -open Octez_node_config -open Tezos_stdlib_unix -open Tezos_shell_services -open Tezos_rpc_http -open Tezos_rpc_http_server + -open Octez_rpc_process -open Tezos_p2p -open Tezos_shell -open Tezos_store diff --git a/src/lib_rpc_process/dune b/src/lib_rpc_process/dune new file mode 100644 index 000000000000..83e3e3a6ade8 --- /dev/null +++ b/src/lib_rpc_process/dune @@ -0,0 +1,24 @@ +; This file was automatically generated, do not edit. +; Edit file manifest/main.ml instead. 
+ +(library + (name octez_rpc_process) + (public_name octez-rpc-process) + (instrumentation (backend bisect_ppx)) + (libraries + octez-libs.tezos-base + octez-libs.tezos-base.unix + octez-node-config + octez-libs.tezos-rpc-http + octez-libs.tezos-rpc-http-server + lwt.unix + lwt-exit + prometheus-app) + (flags + (:standard) + -open Tezos_base.TzPervasives + -open Tezos_base + -open Tezos_base_unix + -open Octez_node_config + -open Tezos_rpc_http + -open Tezos_rpc_http_server)) -- GitLab From e4389303be7860fcf422b443a0eea684819b1c28 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9my=20El=20Siba=C3=AFe?= Date: Fri, 9 Jun 2023 11:12:36 +0200 Subject: [PATCH 6/8] lib_rpc_process: naive implementation that forwards everything MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Victor Allombert Co-authored-by: Rémy El Sibaïe --- src/bin_node/main.ml | 4 + src/bin_node/node_run_command.ml | 212 +++++++++++++--- src/lib_node_config/config_file.ml | 48 +++- src/lib_node_config/config_file.mli | 2 + src/lib_node_config/shared_arg.ml | 29 ++- src/lib_node_config/shared_arg.mli | 2 + src/lib_rpc_process/main.ml | 280 +++++++++++++++++++++ src/lib_rpc_process/rpc_process_worker.ml | 254 +++++++++++++++++++ src/lib_rpc_process/rpc_process_worker.mli | 54 ++++ 9 files changed, 835 insertions(+), 50 deletions(-) create mode 100644 src/lib_rpc_process/main.ml create mode 100644 src/lib_rpc_process/rpc_process_worker.ml create mode 100644 src/lib_rpc_process/rpc_process_worker.mli diff --git a/src/bin_node/main.ml b/src/bin_node/main.ml index 1bbdae0a97ce..70e3ef149fcc 100644 --- a/src/bin_node/main.ml +++ b/src/bin_node/main.ml @@ -106,6 +106,10 @@ let () = if Filename.basename Sys.argv.(0) = "octez-validator" then Tezos_validation.Command_line.run () +let () = + if Filename.basename Sys.argv.(0) = "octez-rpc-process" then + exit (Cmdliner.Cmd.eval Octez_rpc_process.Main.cmd) + let term = let open Cmdliner.Term in ret (const 
(`Help (`Pager, None))) diff --git a/src/bin_node/node_run_command.ml b/src/bin_node/node_run_command.ml index 104d0a22ed7f..9714a96b6cb4 100644 --- a/src/bin_node/node_run_command.ml +++ b/src/bin_node/node_run_command.ml @@ -103,16 +103,24 @@ module Event = struct ~level:Warning () - let starting_rpc_server = + let starting_local_rpc_server = declare_3 ~section - ~name:"starting_rpc_server" - ~msg:"starting RPC server on {host}:{port} (acl = {acl_policy})" + ~name:"starting_local_rpc_server" + ~msg:"starting local RPC server on {host}:{port} (acl = {acl_policy})" ~level:Notice ("host", Data_encoding.string) ("port", Data_encoding.uint16) ("acl_policy", Data_encoding.string) + let starting_internal_rpc_server = + declare_1 + ~section + ~name:"starting_internal_rpc_server" + ~msg:"starting internal RPC server (acl = {acl_policy})" + ~level:Info + ("acl_policy", Data_encoding.string) + let starting_metrics_server = declare_2 ~section @@ -149,12 +157,12 @@ module Event = struct ~level:Notice () - let shutting_down_rpc_server = + let shutting_down_local_rpc_server = declare_0 ~section - ~name:"shutting_down_rpc_server" - ~msg:"shutting down the RPC server" - ~level:Notice + ~name:"shutting_down_local_rpc_server" + ~msg:"shutting down the local RPC server" + ~level:Info () let bye = @@ -367,10 +375,13 @@ let rpc_metrics = module Metrics_server = Prometheus_app.Cohttp (Cohttp_lwt_unix.Server) -let launch_rpc_server ~acl_policy ~media_types (config : Config_file.t) node - (addr, port) = +(* Launches an RPC server depending on the given [mode] (which is + usually TCP, TLS or unix sockets). *) +let launch_rpc_server ~mode (config : Config_file.t) node (addr, port) = let open Lwt_result_syntax in let rpc_config = config.rpc in + let media_types = rpc_config.media_type in + let*! 
acl_policy = RPC_server.Acl.resolve_domain_names rpc_config.acl in let host = Ipaddr.V6.to_string addr in let version = Tezos_version_value.Current_git_info.version in let commit_info = @@ -387,19 +398,20 @@ let launch_rpc_server ~acl_policy ~media_types (config : Config_file.t) node dir Tezos_rpc.Service.description_service in - let mode = - match rpc_config.tls with - | None -> `TCP (`Port port) - | Some {cert; key} -> - `TLS (`Crt_file_path cert, `Key_file_path key, `No_password, `Port port) - in let acl = let open RPC_server.Acl in find_policy acl_policy (Ipaddr.V6.to_string addr, Some port) |> Option.value_f ~default:(fun () -> default addr) in let*! () = - Event.(emit starting_rpc_server) (host, port, RPC_server.Acl.policy_type acl) + match (mode : Conduit_lwt_unix.server) with + | `Unix_domain_socket _ -> + Event.(emit starting_internal_rpc_server) + (RPC_server.Acl.policy_type acl) + | `TCP _ | `TLS _ -> + Event.(emit starting_local_rpc_server) + (host, port, RPC_server.Acl.policy_type acl) + | _ -> Lwt.return_unit in let cors_headers = sanitize_cors_headers ~default:["Content-Type"] rpc_config.cors_headers @@ -443,23 +455,117 @@ let launch_rpc_server ~acl_policy ~media_types (config : Config_file.t) node tzfail (RPC_Port_already_in_use [(addr, port)]) | exn -> fail_with_exn exn) -let init_rpc (config : Config_file.t) node = +(* Describes the kind of servers that can be handled by the node. + - Local_rpc_server: RPC server is run by the node itself + (this may block the node in case of heavy RPC load), + - External_rpc_server: RPC server is spawned as an external + process, + - No_server: the node is not responding to any RPC. *) +type rpc_server_kind = + | Local_rpc_server of RPC_server.server list + | External_rpc_server of (RPC_server.server * Rpc_process_worker.t) list + | No_server + +(* Initializes an RPC server handled by the node main process. 
*) +let init_local_rpc_server (config : Config_file.t) node = + let open Lwt_result_syntax in + let* servers = + List.concat_map_es + (fun addr -> + let* addrs = Config_file.resolve_rpc_listening_addrs addr in + match addrs with + | [] -> failwith "Cannot resolve listening address: %S" addr + | addrs -> + List.map_es + (fun addr -> + let port = snd addr in + let mode = + match config.rpc.tls with + | None -> `TCP (`Port port) + | Some {cert; key} -> + `TLS + ( `Crt_file_path cert, + `Key_file_path key, + `No_password, + `Port port ) + in + launch_rpc_server ~mode config node addr) + addrs) + config.rpc.local_listen_addrs + in + return (Local_rpc_server servers) + +let rpc_socket_path ~socket_dir ~id ~pid = + let filename = Format.sprintf "octez-external-rpc-socket-%d-%d" pid id in + Filename.concat socket_dir filename + +(* Initializes an RPC server handled by the node process. It will be + used by an external RPC process, identified by [id], to forward + RPCs to the node through a Unix socket. *) +let init_local_rpc_server_for_external_process id (config : Config_file.t) node + addr = + let open Lwt_result_syntax in + let socket_dir = Tezos_base_unix.Socket.get_temporary_socket_dir () in + let pid = Unix.getpid () in + let comm_socket_path = rpc_socket_path ~id ~socket_dir ~pid in + let mode = `Unix_domain_socket (`File comm_socket_path) in + (* Register a clean up callback to clean the comm_socket_path when + shutting down. Indeed, the socket file is created by the + Conduit-lwt-unix.Conduit_lwt_server.listen function, but the + resource is not cleaned. 
*) + let _ = + Lwt_exit.register_clean_up_callback ~loc:__LOC__ (fun _ -> + Lwt_unix.unlink comm_socket_path) + in + let* rpc_server = launch_rpc_server ~mode config node addr in + return (rpc_server, comm_socket_path) + +let init_external_rpc_server config node internal_events = let open Lwt_result_syntax in - let media_types = config.rpc.media_type in - List.concat_map_es - (fun addr -> - let* addrs = Config_file.resolve_rpc_listening_addrs addr in - match addrs with - | [] -> failwith "Cannot resolve listening address: %S" addr - | addrs -> - let*! acl_policy = - RPC_server.Acl.resolve_domain_names config.rpc.acl - in - List.map_es - (fun addr -> - launch_rpc_server ~acl_policy ~media_types config node addr) - addrs) - config.rpc.listen_addrs + (* Start one rpc_process for each rpc endpoint. *) + let id = ref 0 in + let* rpc_servers = + List.concat_map_ep + (fun addr -> + let* addrs = Config_file.resolve_rpc_listening_addrs addr in + match addrs with + | [] -> failwith "Cannot resolve listening address: %S" addr + | addrs -> + List.map_ep + (fun (p2p_point : P2p_point.Id.t) -> + let id = + let curid = !id in + incr id ; + curid + in + let* local_rpc_server, comm_socket_path = + init_local_rpc_server_for_external_process + id + config + node + p2p_point + in + let addr = P2p_point.Id.to_string p2p_point in + (* Update the config sent to the rpc_process to + start so that it contains a single listen + address. 
*) + let config = + {config with rpc = {config.rpc with listen_addrs = [addr]}} + in + let rpc_process = + Octez_rpc_process.Rpc_process_worker.create + ~comm_socket_path + config + internal_events + in + let* () = + Octez_rpc_process.Rpc_process_worker.start rpc_process + in + return (local_rpc_server, rpc_process)) + addrs) + config.rpc.listen_addrs + in + return (External_rpc_server rpc_servers) let metrics_serve metrics_addrs = let open Lwt_result_syntax in @@ -498,6 +604,25 @@ let init_zcash () = "Failed to initialize Zcash parameters: %s" (Printexc.to_string exn)) +let init_rpc (config : Config_file.t) node internal_events = + let open Lwt_result_syntax in + (* Start local RPC server (handled by the node main process) only + when at least one local listen addr is given. *) + let* local_rpc_server = + if config.rpc.local_listen_addrs = [] then return No_server + else init_local_rpc_server config node + in + (* Start RPC process only when at least one listen addr is given. *) + let* rpc_server = + if config.rpc.listen_addrs = [] then return No_server + else + (* Starts the node's local RPC server that aims to handle the + RPCs forwarded by the rpc_process, if they cannot be + processed by the rpc_process itself. *) + init_external_rpc_server config node internal_events + in + return (local_rpc_server :: [rpc_server]) + let run ?verbosity ?sandbox ?target ?(cli_warnings = []) ?ignore_testchain_warning ~singleprocess ~force_history_mode_switch (config : Config_file.t) = @@ -566,14 +691,31 @@ let run ?verbosity ?sandbox ?target ?(cli_warnings = []) Lwt_exit.register_clean_up_callback ~loc:__LOC__ (fun _ -> Event.(emit shutting_down_node) ()) in - let* rpc = init_rpc config node in + let* rpc_servers = init_rpc config node internal_events in let rpc_downer = Lwt_exit.register_clean_up_callback ~loc:__LOC__ ~after:[log_node_downer] (fun _ -> - let*! () = Event.(emit shutting_down_rpc_server) () in - List.iter_p RPC_server.shutdown rpc) + let*! 
() = Event.(emit shutting_down_local_rpc_server) () in + List.iter_s + (function + | No_server -> Lwt.return_unit + | External_rpc_server rpc_servers -> + List.iter_p + (fun (local_server, rpc_process) -> + (* Stop the RPC_process first to avoid requests + being forwarded to the node with an RPC_server that + is down. *) + let*! () = + Octez_rpc_process.Rpc_process_worker.stop rpc_process + in + let*! () = RPC_server.shutdown local_server in + Lwt.return_unit) + rpc_servers + | Local_rpc_server rpc_server -> + List.iter_p RPC_server.shutdown rpc_server) + rpc_servers) in let node_downer = Lwt_exit.register_clean_up_callback diff --git a/src/lib_node_config/config_file.ml b/src/lib_node_config/config_file.ml index f65daab11f00..e68e327725d1 100644 --- a/src/lib_node_config/config_file.ml +++ b/src/lib_node_config/config_file.ml @@ -355,6 +355,7 @@ and p2p = { and rpc = { listen_addrs : string list; + local_listen_addrs : string list; cors_origins : string list; cors_headers : string list; tls : tls option; @@ -382,6 +383,7 @@ let default_p2p = let default_rpc = { listen_addrs = []; + local_listen_addrs = []; cors_origins = []; cors_headers = []; tls = None; @@ -549,13 +551,22 @@ let p2p = let rpc : rpc Data_encoding.t = let open Data_encoding in conv - (fun {cors_origins; cors_headers; listen_addrs; tls; acl; media_type} -> + (fun { + cors_origins; + cors_headers; + listen_addrs; + local_listen_addrs; + tls; + acl; + media_type; + } -> let cert, key = match tls with | None -> (None, None) | Some {cert; key} -> (Some cert, Some key) in ( Some listen_addrs, + Some local_listen_addrs, None, cors_origins, cors_headers, @@ -564,6 +575,7 @@ let rpc : rpc Data_encoding.t = acl, media_type )) (fun ( listen_addrs, + local_listen_addrs, legacy_listen_addr, cors_origins, cors_headers, @@ -586,13 +598,31 @@ let rpc : rpc Data_encoding.t = "Config file: Use only \"listen-addrs\" and not (legacy) \ \"listen-addr\"." 
in - {listen_addrs; cors_origins; cors_headers; tls; acl; media_type}) - (obj8 + let local_listen_addrs = + Option.value local_listen_addrs ~default:default_rpc.local_listen_addrs + in + { + listen_addrs; + local_listen_addrs; + cors_origins; + cors_headers; + tls; + acl; + media_type; + }) + (obj9 (opt "listen-addrs" ~description: - "Hosts to listen to. If the port is not specified, the default \ - port 8732 will be assumed." + "Hosts to listen to for the RPC server. If the port is not \ + specified, the default port 8732 will be assumed." + (list string)) + (opt + "local-listen-addrs" + ~description: + "Hosts to listen to for the local RPC server, that is an RPC \ + server hosted into the node. If the port is not specified, the \ + default port 8732 will be assumed." (list string)) (opt "listen-addr" ~description:"Legacy value: Host to listen to" string) (dft @@ -810,9 +840,9 @@ let update ?(disable_config_validation = false) ?data_dir ?min_connections ?expected_connections ?max_connections ?max_download_speed ?max_upload_speed ?binary_chunks_size ?peer_table_size ?expected_pow ?bootstrap_peers ?listen_addr ?advertised_net_port ?discovery_addr ?(rpc_listen_addrs = []) - ?(allow_all_rpc = []) ?(media_type = Media_type.Command_line.Any) - ?(metrics_addr = []) ?operation_metadata_size_limit - ?(private_mode = default_p2p.private_mode) + ?(local_rpc_listen_addrs = []) ?(allow_all_rpc = []) + ?(media_type = Media_type.Command_line.Any) ?(metrics_addr = []) + ?operation_metadata_size_limit ?(private_mode = default_p2p.private_mode) ?(disable_p2p_maintenance = Option.is_none default_p2p.limits.maintenance_idle_time) ?(disable_p2p_swap = Option.is_none default_p2p.limits.swap_linger) @@ -892,6 +922,8 @@ let update ?(disable_config_validation = false) ?data_dir ?min_connections and rpc : rpc = { listen_addrs = unopt_list ~default:cfg.rpc.listen_addrs rpc_listen_addrs; + local_listen_addrs = + unopt_list ~default:cfg.rpc.local_listen_addrs local_rpc_listen_addrs; 
cors_origins = unopt_list ~default:cfg.rpc.cors_origins cors_origins; cors_headers = unopt_list ~default:cfg.rpc.cors_headers cors_headers; tls = Option.either rpc_tls cfg.rpc.tls; diff --git a/src/lib_node_config/config_file.mli b/src/lib_node_config/config_file.mli index 6100bb26263c..9a63c29b3456 100644 --- a/src/lib_node_config/config_file.mli +++ b/src/lib_node_config/config_file.mli @@ -75,6 +75,7 @@ and p2p = { and rpc = { listen_addrs : string list; + local_listen_addrs : string list; cors_origins : string list; cors_headers : string list; tls : tls option; @@ -116,6 +117,7 @@ val update : ?advertised_net_port:int -> ?discovery_addr:string -> ?rpc_listen_addrs:string list -> + ?local_rpc_listen_addrs:string list -> ?allow_all_rpc:P2p_point.Id.addr_port_id list -> ?media_type:Media_type.Command_line.t -> ?metrics_addr:string list -> diff --git a/src/lib_node_config/shared_arg.ml b/src/lib_node_config/shared_arg.ml index 7bbc0c1e99f4..b076884e9ed0 100644 --- a/src/lib_node_config/shared_arg.ml +++ b/src/lib_node_config/shared_arg.ml @@ -49,6 +49,7 @@ type t = { advertised_net_port : int option; discovery_addr : string option; rpc_listen_addrs : string list; + local_rpc_listen_addrs : string list; private_mode : bool; disable_p2p_maintenance : bool; disable_p2p_swap : bool; @@ -196,9 +197,10 @@ let wrap data_dir config_file network connections max_download_speed advertised_net_port discovery_addr peers no_bootstrap_peers bootstrap_threshold private_mode disable_p2p_maintenance disable_p2p_swap disable_mempool disable_mempool_precheck enable_testchain expected_pow - rpc_listen_addrs rpc_tls cors_origins cors_headers log_output log_coloring - history_mode synchronisation_threshold latency disable_config_validation - allow_all_rpc media_type metrics_addr operation_metadata_size_limit = + rpc_listen_addrs local_rpc_listen_addrs rpc_tls cors_origins cors_headers + log_output log_coloring history_mode synchronisation_threshold latency + disable_config_validation 
allow_all_rpc media_type metrics_addr + operation_metadata_size_limit = let actual_data_dir = Option.value ~default:Config_file.default_data_dir data_dir in @@ -226,6 +228,7 @@ let wrap data_dir config_file network connections max_download_speed advertised_net_port; discovery_addr; rpc_listen_addrs; + local_rpc_listen_addrs; private_mode; disable_p2p_maintenance; disable_p2p_swap; @@ -673,6 +676,16 @@ module Term = struct Arg.( value & opt_all string [] & info ~docs ~doc ~docv:"ADDR:PORT" ["rpc-addr"]) + let local_rpc_listen_addrs = + let doc = + "The TCP socket address at which this local RPC server instance can be \ + reached. As a local RPC server is handled by the node itself, calling \ + computational intensive RPCs can affect the performances of the node." + in + Arg.( + value & opt_all string [] + & info ~docs ~doc ~docv:"ADDR:PORT" ["local-rpc-addr"]) + let rpc_tls = let doc = "Enable TLS for this RPC server with the provided certificate and key." @@ -737,10 +750,10 @@ module Term = struct $ peers $ no_bootstrap_peers $ bootstrap_threshold $ private_mode $ disable_p2p_maintenance $ disable_p2p_swap $ disable_mempool $ disable_mempool_precheck $ enable_testchain $ expected_pow - $ rpc_listen_addrs $ rpc_tls $ cors_origins $ cors_headers $ log_output - $ log_coloring $ history_mode $ synchronisation_threshold $ latency - $ disable_config_validation $ allow_all_rpc $ media_type $ metrics_addr - $ operation_metadata_size_limit + $ rpc_listen_addrs $ local_rpc_listen_addrs $ rpc_tls $ cors_origins + $ cors_headers $ log_output $ log_coloring $ history_mode + $ synchronisation_threshold $ latency $ disable_config_validation + $ allow_all_rpc $ media_type $ metrics_addr $ operation_metadata_size_limit end let read_config_file args = @@ -872,6 +885,7 @@ let patch_config ?(may_override_network = false) ?(emit = Event.emit) disable_mempool_precheck; enable_testchain; rpc_listen_addrs; + local_rpc_listen_addrs; rpc_tls; cors_origins; cors_headers; @@ -1030,6 +1044,7 
@@ let patch_config ?(may_override_network = false) ?(emit = Event.emit) ?advertised_net_port ?discovery_addr ~rpc_listen_addrs + ~local_rpc_listen_addrs ~allow_all_rpc ~media_type ~metrics_addr diff --git a/src/lib_node_config/shared_arg.mli b/src/lib_node_config/shared_arg.mli index 73446c0b4f29..328f7b9b4076 100644 --- a/src/lib_node_config/shared_arg.mli +++ b/src/lib_node_config/shared_arg.mli @@ -53,6 +53,8 @@ type t = { discovery_addr : string option; rpc_listen_addrs : string list; (** a list of addresses to listen to RPC requests on *) + local_rpc_listen_addrs : string list; + (** a list of addresses to listen to local RPC requests on (the local RPC server is handled by the node itself) *) private_mode : bool; (** enables the private mode, see https://tezos.gitlab.io/user/node-configuration.html#private-node *) diff --git a/src/lib_rpc_process/main.ml b/src/lib_rpc_process/main.ml new file mode 100644 index 000000000000..968acd414390 --- /dev/null +++ b/src/lib_rpc_process/main.ml @@ -0,0 +1,280 @@ +(*****************************************************************************) +(* *) +(* Open Source License *) +(* Copyright (c) 2023 Nomadic Labs. *) +(* *) +(* Permission is hereby granted, free of charge, to any person obtaining a *) +(* copy of this software and associated documentation files (the "Software"),*) +(* to deal in the Software without restriction, including without limitation *) +(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *) +(* and/or sell copies of the Software, and to permit persons to whom the *) +(* Software is furnished to do so, subject to the following conditions: *) +(* *) +(* The above copyright notice and this permission notice shall be included *) +(* in all copies or substantial portions of the Software. *) +(* *) +(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*) +(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *) +(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
IN NO EVENT SHALL *) +(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*) +(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *) +(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *) +(* DEALINGS IN THE SOFTWARE. *) +(* *) +(*****************************************************************************) + +type error += + | RPC_Process_Port_already_in_use of P2p_point.Id.t list + | Missing_socket_dir + +type parameters = { + rpc : Config_file.rpc; + rpc_comm_socket_path : string; + internal_events : Tezos_base.Internal_event_config.t; +} + +let parameters_encoding = + let open Data_encoding in + conv + (fun {rpc; rpc_comm_socket_path; internal_events} -> + (rpc, rpc_comm_socket_path, internal_events)) + (fun (rpc, rpc_comm_socket_path, internal_events) -> + {rpc; rpc_comm_socket_path; internal_events}) + (obj3 + (req "rpc" Config_file.rpc_encoding) + (req "rpc_comm_socket_path" Data_encoding.string) + (req "internal_events" Tezos_base.Internal_event_config.encoding)) + +let () = + register_error_kind + `Permanent + ~id:"rpc_process.main.process_port_already_in_use" + ~title:"Cannot start RPC process: RPC port already in use" + ~description: + "Another octez RPC process is probably running on the same RPC port." + ~pp:(fun ppf addrlist -> + Format.fprintf + ppf + "Another octez RPC process is probably running on one of these \ + addresses (%a). Please choose another RPC port." + (Format.pp_print_list P2p_point.Id.pp) + addrlist) + Data_encoding.(obj1 (req "addrlist" (list P2p_point.Id.encoding))) + (function + | RPC_Process_Port_already_in_use addrlist -> Some addrlist | _ -> None) + (fun addrlist -> RPC_Process_Port_already_in_use addrlist) ; + register_error_kind + `Permanent + ~id:"rpc_process.main.missing_socket_dir" + ~title:"Cannot start RPC process: missing socket dir" + ~description:"Cannot start RPC process without socket dir." 
+ ~pp:(fun ppf () -> + Format.fprintf + ppf + "The path to the communication socket is missing. Use the --socket-dir \ + option to specify it when running the RPC process. ") + Data_encoding.empty + (function Missing_socket_dir -> Some () | _ -> None) + (fun () -> Missing_socket_dir) + +module Event = struct + include Internal_event.Simple + + let section = ["rpc-server"; "main"] + + let starting_rpc_server = + declare_4 + ~section + ~name:"starting_rpc_server" + ~msg:"starting RPC server on {host}:{port} (acl = {acl_policy})" + ~level:Notice + ("host", Data_encoding.string) + ("port", Data_encoding.uint16) + ("tls", Data_encoding.bool) + ("acl_policy", Data_encoding.string) +end + +(* Add default accepted CORS headers *) +let sanitize_cors_headers ~default headers = + List.map String.lowercase_ascii headers + |> String.Set.of_list + |> String.Set.(union (of_list default)) + |> String.Set.elements + +let launch_rpc_server (config : parameters) (addr, port) = + let open Lwt_result_syntax in + let media_types = config.rpc.media_type in + let*! acl_policy = RPC_server.Acl.resolve_domain_names config.rpc.acl in + let host = Ipaddr.V6.to_string addr in + let mode = + match config.rpc.tls with + | None -> `TCP (`Port port) + | Some {Config_file.cert; key} -> + `TLS (`Crt_file_path cert, `Key_file_path key, `No_password, `Port port) + in + let acl = + let open RPC_server.Acl in + find_policy acl_policy (Ipaddr.V6.to_string addr, Some port) + |> Option.value_f ~default:(fun () -> default addr) + in + let*! () = + Event.(emit starting_rpc_server) + (host, port, config.rpc.tls <> None, RPC_server.Acl.policy_type acl) + in + let cors_headers = + sanitize_cors_headers ~default:["Content-Type"] config.rpc.cors_headers + in + let cors = + Resto_cohttp.Cors. 
+ { + allowed_origins = config.rpc.cors_origins; + allowed_headers = cors_headers; + } + in + let dir = Tezos_rpc.Directory.empty in + let server = + RPC_server.init_server + ~cors + ~acl + ~media_types:(Media_type.Command_line.of_command_line media_types) + dir + in + let callback (conn : Cohttp_lwt_unix.Server.conn) req body = + Tezos_rpc_http_server.RPC_server.resto_callback server conn req body + in + let callback = + let resolver_handle = "octez-node-unix-socket" in + let localhost = Format.asprintf "http://%s" resolver_handle in + let forwarding_endpoint = Uri.of_string localhost in + let resolver = + let h = Stdlib.Hashtbl.create 1 in + Stdlib.Hashtbl.add + h + resolver_handle + (`Unix_domain_socket config.rpc_comm_socket_path) ; + Resolver_lwt_unix.static h + in + let ctx = Cohttp_lwt_unix.Client.custom_ctx ~resolver () in + RPC_middleware.proxy_server_query_forwarder + ~ctx + forwarding_endpoint + callback + in + Lwt.catch + (fun () -> + let*! () = RPC_server.launch ~host server ~callback mode in + return server) + (function + (* FIXME: https://gitlab.com/tezos/tezos/-/issues/1312 + This exception seems to be unreachable. + *) + | Unix.Unix_error (Unix.EADDRINUSE, "bind", "") -> + tzfail (RPC_Process_Port_already_in_use [(addr, port)]) + | exn -> fail_with_exn exn) + +let init_rpc parameters = + let open Lwt_result_syntax in + let* server = + let* p2p_point = + match parameters.rpc.listen_addrs with + | [addr] -> Config_file.resolve_rpc_listening_addrs addr + | _ -> + (* We assume that the config contains only one listening + address. This should hold thanks to the `init_rpc` + function from `bin_node/Node_run_command`. *) + assert false + in + match p2p_point with + | [point] -> launch_rpc_server parameters point + | _ -> + (* Same as above: only one p2p_point is expected here. *) + assert false + in + (* Add a cleanup call back to shutdown the RPC_server gracefully + when an exit signal is received. 
*) + let (_ccid : Lwt_exit.clean_up_callback_id) = + Lwt_exit.register_clean_up_callback ~loc:__LOC__ (fun _ -> + RPC_server.shutdown server) + in + return_unit + +let get_init_socket_path socket_dir pid = + let filename = Format.sprintf "init-rpc-socket-%d" pid in + Filename.concat socket_dir filename + +(* Magic bytes used for the external RPC process handshake. *) +let socket_magic = Bytes.of_string "TEZOS_RPC_SERVER_MAGIC_0" + +let create_init_socket socket_dir = + let open Lwt_result_syntax in + let* socket_dir = + match socket_dir with + | Some sd -> return sd + | None -> tzfail Missing_socket_dir + in + let pid = Unix.getpid () in + let init_socket_path = get_init_socket_path socket_dir pid in + let* init_socket_fd = Socket.connect (Unix init_socket_path) in + (* Unlink the socket as soon as both sides have opened it.*) + let*! () = Lwt_unix.unlink init_socket_path in + let* () = Socket.handshake init_socket_fd socket_magic in + return init_socket_fd + +let run socket_dir = + let open Lwt_result_syntax in + let* init_socket_fd = create_init_socket socket_dir in + let* parameters = Socket.recv init_socket_fd parameters_encoding in + let*! () = + Tezos_base_unix.Internal_event_unix.init + ~config:parameters.internal_events + () + in + let* () = init_rpc parameters in + (* Send the config ack as synchronization barrier for the init_rpc + phase. *) + let* () = Socket.send init_socket_fd Data_encoding.unit () in + let*! () = Lwt_unix.close init_socket_fd in + Lwt_utils.never_ending () + +let process socket_dir = + let open Lwt_result_syntax in + let main_promise = + Lwt.catch (fun () -> run socket_dir) (function exn -> fail_with_exn exn) + in + Lwt_main.run + (let*! r = Lwt_exit.wrap_and_exit main_promise in + match r with + | Ok () -> + let*! _ = Lwt_exit.exit_and_wait 0 in + Lwt.return (`Ok ()) + | Error err -> + let*! 
_ = Lwt_exit.exit_and_wait 1 in + Lwt.return @@ `Error (false, Format.asprintf "%a" pp_print_trace err)) + +module Term = struct + let socket_dir = + let open Cmdliner in + let doc = "Socket directory to communicate with node process" in + Arg.( + value + & opt (some string) None + & info ~docs:Shared_arg.Manpage.misc_section ~doc ["socket-dir"]) + + let term = Cmdliner.Term.(ret (const process $ socket_dir)) +end + +module Manpage = struct + let command_description = + "The $(b, octez-rpc-process) starts the RPC process that aims to serve as \ + the default endpoint for RPC queries. This server may communicate with an \ + Octez node." + + let description = [`S "DESCRIPTION"; `P command_description] + + let man = description + + let info = + Cmdliner.Cmd.info ~doc:"Run the Octez rpc process" ~man "octez-rpc-process" +end + +let cmd = Cmdliner.Cmd.v Manpage.info Term.term diff --git a/src/lib_rpc_process/rpc_process_worker.ml b/src/lib_rpc_process/rpc_process_worker.ml new file mode 100644 index 000000000000..f6fe6e9d561a --- /dev/null +++ b/src/lib_rpc_process/rpc_process_worker.ml @@ -0,0 +1,254 @@ +(*****************************************************************************) +(* *) +(* Open Source License *) +(* Copyright (c) 2023 Nomadic Labs, *) +(* *) +(* Permission is hereby granted, free of charge, to any person obtaining a *) +(* copy of this software and associated documentation files (the "Software"),*) +(* to deal in the Software without restriction, including without limitation *) +(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *) +(* and/or sell copies of the Software, and to permit persons to whom the *) +(* Software is furnished to do so, subject to the following conditions: *) +(* *) +(* The above copyright notice and this permission notice shall be included *) +(* in all copies or substantial portions of the Software. 
*) +(* *) +(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*) +(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *) +(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *) +(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*) +(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *) +(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *) +(* DEALINGS IN THE SOFTWARE. *) +(* *) +(*****************************************************************************) + +module Event = struct + include Internal_event.Simple + + let section = ["node"; "main"] + + let shutting_down_rpc_process = + declare_0 + ~section + ~name:"shutting_down_rpc_process" + ~msg:"shutting down the RPC process" + ~level:Notice + () + + let rpc_process_started = + declare_1 + ~section + ~name:"rpc_process_started" + ~msg:"RPC process was started on pid {pid}" + ~level:Notice + ("pid", Data_encoding.int31) + + let rpc_process_exited_abnormally = + let open Unix in + let exit_status_encoding = + let open Data_encoding in + union + [ + case + (Tag 0) + ~title:"wexited" + int31 + (function WEXITED i -> Some i | _ -> None) + (fun i -> WEXITED i); + case + (Tag 1) + ~title:"wsignaled" + int31 + (function WSIGNALED i -> Some i | _ -> None) + (fun i -> WSIGNALED i); + case + (Tag 2) + ~title:"wstopped" + int31 + (function WSTOPPED i -> Some i | _ -> None) + (fun i -> WSTOPPED i); + ] + in + declare_2 + ~section + ~level:Error + ~name:"rpc_process_exited_status" + ~msg:"rpc process (pid {pid}) {status_msg}" + ("pid", Data_encoding.int31) + ~pp2:(fun fmt status -> + match status with + | WEXITED i -> + Format.fprintf fmt "terminated abnormally with exit code %i" i + | WSIGNALED i -> + Format.fprintf + fmt + "was killed by signal %s" + (Lwt_exit.signal_name i) + | WSTOPPED i -> + Format.fprintf + fmt + "was stopped by signal %s" + (Lwt_exit.signal_name i)) + 
("status_msg", exit_status_encoding) + + let cannot_start_rpc_process = + declare_1 + ~section + ~name:"cannot_start_rpc_process" + ~level:Error + ~msg:"cannot start rpc process: {trace}" + ("trace", Data_encoding.string) + + let waiting_for_rpc_process_restart = + declare_1 + ~section + ~name:"waiting_for_rpc_process_restart" + ~level:Error + ~msg:"restarting RPC process in {sleep} seconds" + ("sleep", Data_encoding.float) +end + +(* State of the worker. *) +type t = { + rpc_config : Config_file.rpc; + events_config : Internal_event_config.t; + mutable server : Lwt_process.process_none option; + stop : (int * Unix.process_status) Lwt.t; + stopper : (int * Unix.process_status) Lwt.u; + comm_socket_path : string; +} + +let create ~comm_socket_path (config : Config_file.t) events_config = + let stop, stopper = Lwt.wait () in + { + rpc_config = config.rpc; + events_config; + server = None; + stop; + stopper; + comm_socket_path; + } + +let shutdown t = + let open Lwt_syntax in + match t.server with + | None -> return_unit + | Some process -> + let* () = Event.(emit shutting_down_rpc_process) () in + process#terminate ; + return_unit + +let stop t = + Lwt.wakeup t.stopper (0, Lwt_unix.WSTOPPED 0) ; + shutdown t + +let run_server t () = + let open Lwt_result_syntax in + let socket_dir = Tezos_base_unix.Socket.get_temporary_socket_dir () in + let socket_dir_arg = ["--socket-dir"; socket_dir] in + let args = "octez-rpc-process" :: socket_dir_arg in + let process = + Lwt_process.open_process_none + ~stdout:(`FD_copy Unix.stdout) + ~stderr:(`FD_copy Unix.stderr) + (Sys.executable_name, Array.of_list args) + in + let pid = process#pid in + let init_socket_path = Main.get_init_socket_path socket_dir pid in + let* init_socket_fd = + let* fds = Tezos_base_unix.Socket.bind (Unix init_socket_path) in + match fds with + | [fd] -> + let*! init_socket_fd, _ = Lwt_unix.accept ~cloexec:true fd in + let*! 
() = Lwt_unix.close fd in + return init_socket_fd + | _ -> + (* This assertion holds as long as + Tezos_base_unix.Socket.bind returns a single list element + when binding Unix sockets. *) + assert false + in + let* () = Tezos_base_unix.Socket.handshake init_socket_fd Main.socket_magic in + let parameters = + Main. + { + rpc = t.rpc_config; + internal_events = t.events_config; + rpc_comm_socket_path = t.comm_socket_path; + } + in + let* () = Socket.send init_socket_fd Main.parameters_encoding parameters in + let* () = Socket.recv init_socket_fd Data_encoding.unit in + let*! () = Lwt_unix.close init_socket_fd in + let*! () = Event.(emit rpc_process_started) pid in + t.server <- Some process ; + return t + +(* Evaluates [f]. If [f] fails, the error is caught, printed as an + error event, and [f] is re-evaluated after a [backoff] delay. The + delay increases at each failing try. *) +let rec may_start backoff f = + let open Lwt_result_syntax in + let timestamp, sleep = backoff in + let now = Time.System.now () in + let diff = Ptime.diff now timestamp in + if Ptime.Span.to_float_s diff > sleep then + protect + (fun () -> f ()) + ~on_error:(function + | errs -> + let*! () = + Event.( + emit + cannot_start_rpc_process + (Format.asprintf "%a" pp_print_trace errs)) + in + may_start (Time.System.now (), sleep *. 1.2) f) + else + let*! () = Event.(emit waiting_for_rpc_process_restart sleep) in + let*! () = Lwt_unix.sleep sleep in + may_start (timestamp, sleep) f + +(* Watch_dog makes sure that the RPC process is restarted as soon as it + dies. *) +let watch_dog run_server = + let open Lwt_result_syntax in + let rec loop t = + match t.server with + | None -> + let* new_server = may_start (Time.System.epoch, 0.5) run_server in + loop new_server + | Some process -> ( + let wait_pid_t = + let*! _, status = Lwt_unix.waitpid [] process#pid in + (* Sleep is necessary here to avoid waitpid to be faster than + the Lwt_exit stack.
It avoids the clean_up_starts to be + pending while the node is properly shutting down. *) + let*! () = Lwt_unix.sleep 1. in + Lwt.return (`Wait_pid status) + in + let stop_t = + let*! _ = t.stop in + Lwt.return `Stopped + in + let*! res = Lwt.choose [wait_pid_t; stop_t] in + match res with + | `Stopped -> return_unit + | `Wait_pid _ when not (Lwt.is_sleeping Lwt_exit.clean_up_starts) -> + return_unit + | `Wait_pid status -> + t.server <- None ; + let*! () = + Event.(emit rpc_process_exited_abnormally) (process#pid, status) + in + let* new_server = may_start (Time.System.epoch, 0.5) run_server in + loop new_server) + in + loop + +let start server = + let open Lwt_result_syntax in + let* new_server = run_server server () in + let _ = watch_dog (run_server server) new_server in + return_unit diff --git a/src/lib_rpc_process/rpc_process_worker.mli b/src/lib_rpc_process/rpc_process_worker.mli new file mode 100644 index 000000000000..4f8159f8cf70 --- /dev/null +++ b/src/lib_rpc_process/rpc_process_worker.mli @@ -0,0 +1,54 @@ +(*****************************************************************************) +(* *) +(* Open Source License *) +(* Copyright (c) 2023 Nomadic Labs, *) +(* *) +(* Permission is hereby granted, free of charge, to any person obtaining a *) +(* copy of this software and associated documentation files (the "Software"),*) +(* to deal in the Software without restriction, including without limitation *) +(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *) +(* and/or sell copies of the Software, and to permit persons to whom the *) +(* Software is furnished to do so, subject to the following conditions: *) +(* *) +(* The above copyright notice and this permission notice shall be included *) +(* in all copies or substantial portions of the Software. 
*) +(* *) +(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*) +(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *) +(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *) +(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*) +(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *) +(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *) +(* DEALINGS IN THE SOFTWARE. *) +(* *) +(*****************************************************************************) + +(** Worker that handles the RPC server spawned as an external process + by the node. + + This module defines an RPC process worker type [t] that is + responsible for dealing with the process' lifetime. *) + +(** Type of the RPC process worker*) +type t + +(** [create ~comm_socket_path config internal_event_config] creates the + worker initial state. [comm_socket_path] is a socket path that + will be used to communicate with the node (note that the socket is + created automatically, but the cleaning of it is not + handled). [config] contains all the RPC server configuration (such + as ACLs, cors_headers, …). *) +val create : + comm_socket_path:string -> Config_file.t -> Internal_event_config.t -> t + +(** Starts the external RPC process using fork+exec calls. It + implements a watch dog that is responsible for restarting the + process if it dies at some point. Additionally, if the process + fails to be restarted, it retries with an exponential back-off + until the restart is successful. + The promise is blocking until the RPC server is fully + available to answer to RPCs.
*) +val start : t -> unit tzresult Lwt.t + +(** Stops gracefully the RPC process worker*) +val stop : t -> unit Lwt.t -- GitLab From 4ec2c5cde0db4641ce7157cbe6d5ad3695b81965 Mon Sep 17 00:00:00 2001 From: Victor Allombert Date: Tue, 13 Jun 2023 14:13:59 +0200 Subject: [PATCH 7/8] Tezt: test the octez' Rpc-process resilience --- tezt/tests/main.ml | 1 + tezt/tests/rpc_process.ml | 92 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 93 insertions(+) create mode 100644 tezt/tests/rpc_process.ml diff --git a/tezt/tests/main.ml b/tezt/tests/main.ml index a1c4bac5d21c..9d0e96504b80 100644 --- a/tezt/tests/main.ml +++ b/tezt/tests/main.ml @@ -173,6 +173,7 @@ let register_protocol_tests_that_use_supports_correctly () = Protocol_limits.register ~protocols ; Proxy.register ~protocols ; Proxy_server_test.register ~protocols ; + Rpc_process.register ~protocols ; RPC_test.register protocols ; Rpc_versioning_attestation.register ~protocols ; Reject_malformed_micheline.register ~protocols ; diff --git a/tezt/tests/rpc_process.ml b/tezt/tests/rpc_process.ml new file mode 100644 index 000000000000..d40e051407bd --- /dev/null +++ b/tezt/tests/rpc_process.ml @@ -0,0 +1,92 @@ +(*****************************************************************************) +(* *) +(* Open Source License *) +(* Copyright (c) 2023 Nomadic Labs *) +(* *) +(* Permission is hereby granted, free of charge, to any person obtaining a *) +(* copy of this software and associated documentation files (the "Software"),*) +(* to deal in the Software without restriction, including without limitation *) +(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *) +(* and/or sell copies of the Software, and to permit persons to whom the *) +(* Software is furnished to do so, subject to the following conditions: *) +(* *) +(* The above copyright notice and this permission notice shall be included *) +(* in all copies or substantial portions of the Software. 
*) +(* *) +(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*) +(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *) +(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *) +(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*) +(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *) +(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *) +(* DEALINGS IN THE SOFTWARE. *) +(* *) +(*****************************************************************************) + +(* Testing + ------- + Component: Node's RPC_process + Invocation: dune exec tezt/tests/main.exe -- --file rpc_process.ml + Subject: Tests the resilience of the RPC process +*) + +let wait_for_RPC_process_pid node = + let filter json = JSON.(json |> as_int_opt) in + Node.wait_for node "rpc_process_started.v0" filter + +type signal = SIGABRT | SIGINT | SIGKILL | SIGQUIT | SIGTERM + +let signal_to_int = function + | SIGABRT -> Sys.sigabrt + | SIGINT -> Sys.sigint + | SIGKILL -> Sys.sigkill + | SIGQUIT -> Sys.sigquit + | SIGTERM -> Sys.sigterm + +let pp_signal ppf signal = + let str = + match signal with + | SIGABRT -> "sigabrt" + | SIGINT -> "sigint" + | SIGKILL -> "sigkill" + | SIGQUIT -> "sigquit" + | SIGTERM -> "sigterm" + in + Format.fprintf ppf "%s" str + +let kill_process ~pid ~signal = + Log.info "Kill the rpc process (pid %d) with signal %a" pid pp_signal signal ; + Unix.kill pid (signal_to_int signal) + +let head_can_be_requested ~expected_level node = + let* json = RPC.call node @@ RPC.get_chain_block_header () in + let v = JSON.(json |-> "level" |> as_int) in + return (v = expected_level) + +let test_kill = + Protocol.register_test + ~__FILE__ + ~title:"RPC process kill" + ~tags:["rpc"; "process"; "kill"] + @@ fun protocol -> + let node = Node.create [] in + let wait_RPC_process_pid = wait_for_RPC_process_pid node in + let* () = Node.run node [] in + let* () 
= Node.wait_for_ready node in + let* client = Client.init ~endpoint:(Node node) () in + let* rpc_process_pid = wait_RPC_process_pid in + Log.info "RPC process is now runnig on pid %d" rpc_process_pid ; + let* () = Client.activate_protocol_and_wait ~protocol client in + Log.info "Wait for level 1" ; + let* (_ : int) = Node.wait_for_level node 1 in + Log.info "Head can be requested" ; + let* (_ : bool) = head_can_be_requested ~expected_level:1 node in + let wait_new_rpc_process_pid = wait_for_RPC_process_pid node in + let () = kill_process ~pid:rpc_process_pid ~signal:SIGKILL in + let* new_rpc_process_pid = wait_new_rpc_process_pid in + Log.info "The new RPC process is now runnig on pid %d" new_rpc_process_pid ; + Log.info "Head can still be requested" ; + let* (_ : bool) = head_can_be_requested ~expected_level:1 node in + unit + +let register ~protocols = test_kill protocols -- GitLab From 9170cfc8a5684c559149d8963c4bb4747be38c43 Mon Sep 17 00:00:00 2001 From: Victor Allombert Date: Tue, 29 Aug 2023 17:08:06 +0200 Subject: [PATCH 8/8] Changelog: introduce RPC-process --- CHANGES.rst | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/CHANGES.rst b/CHANGES.rst index ba8d4606e109..ed3443ed80cd 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -33,6 +33,13 @@ Node ../helpers/scripts/simulate_operation`` default version to version ``1``. Version ``0`` can still be used with ``?version=0`` argument. (MR :gl:`!9840`) +- Introduced a new process, forked by the node, that is responsible for + managing the RPC server: the RPC-process. It is used by default by + the node. + +- Introduced a new ``--local-rpc-addr`` option that starts the RPC server + locally, not using the dedicated RPC-process. + Client ------ -- GitLab