diff --git a/.gitlab/ci/jobs/packaging/opam_package.yml b/.gitlab/ci/jobs/packaging/opam_package.yml index 7eb340510f70edaa0cdfdc6dc7eeaf2d90bbd020..e993caa462bc8ba109bb844a9dbcee4280016f88 100644 --- a/.gitlab/ci/jobs/packaging/opam_package.yml +++ b/.gitlab/ci/jobs/packaging/opam_package.yml @@ -718,6 +718,13 @@ opam:octez-proxy-server: variables: package: octez-proxy-server +opam:octez-rpc-process: + extends: + - .opam_template + - .rules_template__trigger_all_opam_batch_3 + variables: + package: octez-rpc-process + opam:octez-shell-libs: extends: - .opam_template @@ -835,7 +842,7 @@ opam:tezos-benchmark: opam:tezos-client-demo-counter: extends: - .opam_template - - .rules_template__trigger_all_opam_batch_3 + - .rules_template__trigger_all_opam_batch_4 variables: package: tezos-client-demo-counter @@ -881,7 +888,7 @@ opam:tezos-dal-node-lib: opam:tezos-dal-node-services: extends: - .opam_template - - .rules_template__trigger_all_opam_batch_6 + - .rules_template__trigger_all_opam_batch_7 variables: package: tezos-dal-node-services @@ -935,7 +942,7 @@ opam:tezos-protocol-004-Pt24m4xi: opam:tezos-protocol-005-PsBABY5H: extends: - .opam_template - - .rules_template__trigger_all_opam_batch_4 + - .rules_template__trigger_all_opam_batch_5 variables: package: tezos-protocol-005-PsBABY5H @@ -1026,7 +1033,7 @@ opam:tezos-protocol-015-PtLimaPt: opam:tezos-protocol-016-PtMumbai: extends: - .opam_template - - .rules_template__trigger_all_opam_batch_5 + - .rules_template__trigger_all_opam_batch_6 variables: package: tezos-protocol-016-PtMumbai diff --git a/CHANGES.rst b/CHANGES.rst index 53df1eef83522695059b12a57f5bceea28020bee..0f1fdc76af19e13eb95bdcefe1c6b9819f96834d 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -59,6 +59,13 @@ Node nodes. Also, improved the consistency of ``snapshot`` import errors messages (MR :gl:`!10138`) +- Introduced a new process, forked by the node, that is responsible of + managing the RPC server: the RPC-process. 
It is used by default by + the node. + +- Introduced a new ``--local-rpc-addr`` that starts the RPC server + locally, not using the dedicated RPC-process. + Client ------ diff --git a/dune-project b/dune-project index 331bbef55f64da0b968418e2aa86000cabaabaac..e741f644359484424dcf48dacffb2471cda94897 100644 --- a/dune-project +++ b/dune-project @@ -55,6 +55,7 @@ (package (name octez-protocol-alpha-libs)) (package (name octez-protocol-compiler)) (package (name octez-proxy-server)) +(package (name octez-rpc-process)) (package (name octez-shell-libs)) (package (name octez-shell-tests)(allow_empty)) (package (name octez-signer)) diff --git a/manifest/main.ml b/manifest/main.ml index bd5d52e78fef01403a7140b60c404c53d1838347..128e964559c64142484b9d69426de0670db5a384 100644 --- a/manifest/main.ml +++ b/manifest/main.ml @@ -4382,6 +4382,23 @@ let octez_node_config = octez_validation |> open_; ] +let octez_rpc_process = + public_lib + "octez-rpc-process" + ~path:"src/lib_rpc_process" + ~synopsis:"Tezos: RPC process" + ~deps: + [ + octez_base |> open_ ~m:"TzPervasives" |> open_; + octez_base_unix |> open_; + octez_node_config |> open_; + octez_rpc_http |> open_; + octez_rpc_http_server |> open_; + lwt_unix; + lwt_exit; + prometheus_app; + ] + let octez_crawler = public_lib "octez-crawler" @@ -7562,7 +7579,7 @@ let _octez_node = ~deps: ([ octez_base |> open_ ~m:"TzPervasives" |> open_; - octez_base_unix; + octez_base_unix |> open_; octez_version; octez_version_value; octez_node_config |> open_; @@ -7570,6 +7587,7 @@ let _octez_node = octez_shell_services |> open_; octez_rpc_http |> open_; octez_rpc_http_server |> open_; + octez_rpc_process |> open_; octez_p2p |> open_; octez_shell |> open_; octez_store |> open_; diff --git a/opam/octez-node.opam b/opam/octez-node.opam index 75fd9f4e37360c1ce65fb1428021e8575263d51b..98e8a77c7ed108681ef2e4c3de10a7db26165f00 100644 --- a/opam/octez-node.opam +++ b/opam/octez-node.opam @@ -14,6 +14,7 @@ depends: [ "octez-version" 
"octez-node-config" "octez-shell-libs" + "octez-rpc-process" "cmdliner" { >= "1.1.0" } "fmt" { >= "0.8.7" } "tls-lwt" { >= "0.16.0" } diff --git a/opam/octez-rpc-process.opam b/opam/octez-rpc-process.opam new file mode 100644 index 0000000000000000000000000000000000000000..e7ec2b74a7ae1bf7bc0d2cb0d30c306295d3a378 --- /dev/null +++ b/opam/octez-rpc-process.opam @@ -0,0 +1,24 @@ +# This file was automatically generated, do not edit. +# Edit file manifest/main.ml instead. +opam-version: "2.0" +maintainer: "contact@tezos.com" +authors: ["Tezos devteam"] +homepage: "https://www.tezos.com/" +bug-reports: "https://gitlab.com/tezos/tezos/issues" +dev-repo: "git+https://gitlab.com/tezos/tezos.git" +license: "MIT" +depends: [ + "dune" { >= "3.0" } + "ocaml" { >= "4.14" } + "octez-libs" + "octez-node-config" + "lwt" { >= "5.7.0" } + "lwt-exit" + "prometheus-app" { >= "1.2" } +] +build: [ + ["rm" "-r" "vendors" "contrib"] + ["dune" "build" "-p" name "-j" jobs] + ["dune" "runtest" "-p" name "-j" jobs] {with-test} +] +synopsis: "Tezos: RPC process" diff --git a/src/bin_node/dune b/src/bin_node/dune index 35956a2d1978f875a32d20245880fd0898db9fb4..ad1b852dce858afa228333c9c3ac68e504c8d3eb 100644 --- a/src/bin_node/dune +++ b/src/bin_node/dune @@ -16,6 +16,7 @@ octez-shell-libs.shell-services octez-libs.rpc-http octez-libs.rpc-http-server + octez-rpc-process octez-shell-libs.p2p octez-shell-libs.shell octez-shell-libs.store @@ -147,11 +148,13 @@ (:standard) -open Tezos_base.TzPervasives -open Tezos_base + -open Tezos_base_unix -open Octez_node_config -open Tezos_stdlib_unix -open Tezos_shell_services -open Tezos_rpc_http -open Tezos_rpc_http_server + -open Octez_rpc_process -open Tezos_p2p -open Tezos_shell -open Tezos_store diff --git a/src/bin_node/main.ml b/src/bin_node/main.ml index 1bbdae0a97ce14fa9a08794c39a104f0f783a06b..70e3ef149fcc714ccf28731f0acdb05ec997ceae 100644 --- a/src/bin_node/main.ml +++ b/src/bin_node/main.ml @@ -106,6 +106,10 @@ let () = if Filename.basename 
Sys.argv.(0) = "octez-validator" then Tezos_validation.Command_line.run () +let () = + if Filename.basename Sys.argv.(0) = "octez-rpc-process" then + exit (Cmdliner.Cmd.eval Octez_rpc_process.Main.cmd) + let term = let open Cmdliner.Term in ret (const (`Help (`Pager, None))) diff --git a/src/bin_node/node_run_command.ml b/src/bin_node/node_run_command.ml index e3794a215b3bf23f165cd1cafc94d352d654fad5..9714a96b6cb4c18f52953a28bc00a23ae366ac25 100644 --- a/src/bin_node/node_run_command.ml +++ b/src/bin_node/node_run_command.ml @@ -103,15 +103,22 @@ module Event = struct ~level:Warning () - let starting_rpc_server = - declare_4 + let starting_local_rpc_server = + declare_3 ~section - ~name:"starting_rpc_server" - ~msg:"starting RPC server on {host}:{port} (acl = {acl_policy})" + ~name:"starting_local_rpc_server" + ~msg:"starting local RPC server on {host}:{port} (acl = {acl_policy})" ~level:Notice ("host", Data_encoding.string) ("port", Data_encoding.uint16) - ("tls", Data_encoding.bool) + ("acl_policy", Data_encoding.string) + + let starting_internal_rpc_server = + declare_1 + ~section + ~name:"starting_internal_rpc_server" + ~msg:"starting internal RPC server (acl = {acl_policy})" + ~level:Info ("acl_policy", Data_encoding.string) let starting_metrics_server = @@ -150,12 +157,12 @@ module Event = struct ~level:Notice () - let shutting_down_rpc_server = + let shutting_down_local_rpc_server = declare_0 ~section - ~name:"shutting_down_rpc_server" - ~msg:"shutting down the RPC server" - ~level:Notice + ~name:"shutting_down_local_rpc_server" + ~msg:"shutting down the local RPC server" + ~level:Info () let bye = @@ -368,10 +375,13 @@ let rpc_metrics = module Metrics_server = Prometheus_app.Cohttp (Cohttp_lwt_unix.Server) -let launch_rpc_server ~acl_policy ~media_types (config : Config_file.t) node - (addr, port) = +(* Launches an RPC server depending on the given [mode] (which is + usually TCP, TLS or unix sockets). 
*) +let launch_rpc_server ~mode (config : Config_file.t) node (addr, port) = let open Lwt_result_syntax in let rpc_config = config.rpc in + let media_types = rpc_config.media_type in + let*! acl_policy = RPC_server.Acl.resolve_domain_names rpc_config.acl in let host = Ipaddr.V6.to_string addr in let version = Tezos_version_value.Current_git_info.version in let commit_info = @@ -388,20 +398,20 @@ let launch_rpc_server ~acl_policy ~media_types (config : Config_file.t) node dir Tezos_rpc.Service.description_service in - let mode = - match rpc_config.tls with - | None -> `TCP (`Port port) - | Some {cert; key} -> - `TLS (`Crt_file_path cert, `Key_file_path key, `No_password, `Port port) - in let acl = let open RPC_server.Acl in find_policy acl_policy (Ipaddr.V6.to_string addr, Some port) |> Option.value_f ~default:(fun () -> default addr) in let*! () = - Event.(emit starting_rpc_server) - (host, port, rpc_config.tls <> None, RPC_server.Acl.policy_type acl) + match (mode : Conduit_lwt_unix.server) with + | `Unix_domain_socket _ -> + Event.(emit starting_internal_rpc_server) + (RPC_server.Acl.policy_type acl) + | `TCP _ | `TLS _ -> + Event.(emit starting_local_rpc_server) + (host, port, RPC_server.Acl.policy_type acl) + | _ -> Lwt.return_unit in let cors_headers = sanitize_cors_headers ~default:["Content-Type"] rpc_config.cors_headers @@ -445,23 +455,117 @@ let launch_rpc_server ~acl_policy ~media_types (config : Config_file.t) node tzfail (RPC_Port_already_in_use [(addr, port)]) | exn -> fail_with_exn exn) -let init_rpc (config : Config_file.t) node = +(* Describes the kind of servers that can be handled by the node. + - Local_rpc_server: RPC server is run by the node itself + (this may block the node in case of heavy RPC load), + - External_rpc_server: RPC server is spawned as an external + process, + - No_server: the node is not responding to any RPC. 
*) +type rpc_server_kind = + | Local_rpc_server of RPC_server.server list + | External_rpc_server of (RPC_server.server * Rpc_process_worker.t) list + | No_server + +(* Initializes an RPC server handled by the node main process. *) +let init_local_rpc_server (config : Config_file.t) node = + let open Lwt_result_syntax in + let* servers = + List.concat_map_es + (fun addr -> + let* addrs = Config_file.resolve_rpc_listening_addrs addr in + match addrs with + | [] -> failwith "Cannot resolve listening address: %S" addr + | addrs -> + List.map_es + (fun addr -> + let port = snd addr in + let mode = + match config.rpc.tls with + | None -> `TCP (`Port port) + | Some {cert; key} -> + `TLS + ( `Crt_file_path cert, + `Key_file_path key, + `No_password, + `Port port ) + in + launch_rpc_server ~mode config node addr) + addrs) + config.rpc.local_listen_addrs + in + return (Local_rpc_server servers) + +let rpc_socket_path ~socket_dir ~id ~pid = + let filename = Format.sprintf "octez-external-rpc-socket-%d-%d" pid id in + Filename.concat socket_dir filename + +(* Initializes an RPC server handled by the node process. It will be + used by an external RPC process, identified by [id], to forward + RPCs to the node through a Unix socket. *) +let init_local_rpc_server_for_external_process id (config : Config_file.t) node + addr = let open Lwt_result_syntax in - let media_types = config.rpc.media_type in - List.concat_map_es - (fun addr -> - let* addrs = Config_file.resolve_rpc_listening_addrs addr in - match addrs with - | [] -> failwith "Cannot resolve listening address: %S" addr - | addrs -> - let*! 
acl_policy = - RPC_server.Acl.resolve_domain_names config.rpc.acl - in - List.map_es - (fun addr -> - launch_rpc_server ~acl_policy ~media_types config node addr) - addrs) - config.rpc.listen_addrs + let socket_dir = Tezos_base_unix.Socket.get_temporary_socket_dir () in + let pid = Unix.getpid () in + let comm_socket_path = rpc_socket_path ~id ~socket_dir ~pid in + let mode = `Unix_domain_socket (`File comm_socket_path) in + (* Register a clean up callback to clean the comm_socket_path when + shutting down. Indeed, the socket file is created by the + Conduit-lwt-unix.Conduit_lwt_server.listen function, but the + resource is not cleaned. *) + let _ = + Lwt_exit.register_clean_up_callback ~loc:__LOC__ (fun _ -> + Lwt_unix.unlink comm_socket_path) + in + let* rpc_server = launch_rpc_server ~mode config node addr in + return (rpc_server, comm_socket_path) + +let init_external_rpc_server config node internal_events = + let open Lwt_result_syntax in + (* Start one rpc_process for each rpc endpoint. *) + let id = ref 0 in + let* rpc_servers = + List.concat_map_ep + (fun addr -> + let* addrs = Config_file.resolve_rpc_listening_addrs addr in + match addrs with + | [] -> failwith "Cannot resolve listening address: %S" addr + | addrs -> + List.map_ep + (fun (p2p_point : P2p_point.Id.t) -> + let id = + let curid = !id in + incr id ; + curid + in + let* local_rpc_server, comm_socket_path = + init_local_rpc_server_for_external_process + id + config + node + p2p_point + in + let addr = P2p_point.Id.to_string p2p_point in + (* Update the config sent to the rpc_process to + start so that it contains a single listen + address. 
*) + let config = + {config with rpc = {config.rpc with listen_addrs = [addr]}} + in + let rpc_process = + Octez_rpc_process.Rpc_process_worker.create + ~comm_socket_path + config + internal_events + in + let* () = + Octez_rpc_process.Rpc_process_worker.start rpc_process + in + return (local_rpc_server, rpc_process)) + addrs) + config.rpc.listen_addrs + in + return (External_rpc_server rpc_servers) let metrics_serve metrics_addrs = let open Lwt_result_syntax in @@ -500,6 +604,25 @@ let init_zcash () = "Failed to initialize Zcash parameters: %s" (Printexc.to_string exn)) +let init_rpc (config : Config_file.t) node internal_events = + let open Lwt_result_syntax in + (* Start local RPC server (handled by the node main process) only + when at least one local listen addr is given. *) + let* local_rpc_server = + if config.rpc.local_listen_addrs = [] then return No_server + else init_local_rpc_server config node + in + (* Start RPC process only when at least one listen addr is given. *) + let* rpc_server = + if config.rpc.listen_addrs = [] then return No_server + else + (* Starts the node's local RPC server that aims to handle the + RPCs forwarded by the rpc_process, if they cannot be + processed by the rpc_process itself. *) + init_external_rpc_server config node internal_events + in + return (local_rpc_server :: [rpc_server]) + let run ?verbosity ?sandbox ?target ?(cli_warnings = []) ?ignore_testchain_warning ~singleprocess ~force_history_mode_switch (config : Config_file.t) = @@ -568,14 +691,31 @@ let run ?verbosity ?sandbox ?target ?(cli_warnings = []) Lwt_exit.register_clean_up_callback ~loc:__LOC__ (fun _ -> Event.(emit shutting_down_node) ()) in - let* rpc = init_rpc config node in + let* rpc_servers = init_rpc config node internal_events in let rpc_downer = Lwt_exit.register_clean_up_callback ~loc:__LOC__ ~after:[log_node_downer] (fun _ -> - let*! () = Event.(emit shutting_down_rpc_server) () in - List.iter_p RPC_server.shutdown rpc) + let*! 
() = Event.(emit shutting_down_local_rpc_server) () in + List.iter_s + (function + | No_server -> Lwt.return_unit + | External_rpc_server rpc_servers -> + List.iter_p + (fun (local_server, rpc_process) -> + (* Stop the RPC_process first to avoid requests to + be forwarded to the note with a RPC_server that + is down. *) + let*! () = + Octez_rpc_process.Rpc_process_worker.stop rpc_process + in + let*! () = RPC_server.shutdown local_server in + Lwt.return_unit) + rpc_servers + | Local_rpc_server rpc_server -> + List.iter_p RPC_server.shutdown rpc_server) + rpc_servers) in let node_downer = Lwt_exit.register_clean_up_callback diff --git a/src/lib_base/unix/socket.ml b/src/lib_base/unix/socket.ml index bd23644131227bd0e32e47e727a5bf9cd0284045..d4274f6088ce9558c92d1cbf60fda8b687cbbd78 100644 --- a/src/lib_base/unix/socket.ml +++ b/src/lib_base/unix/socket.ml @@ -25,6 +25,20 @@ open Error_monad +type error += Handshake_failure + +let () = + register_error_kind + `Temporary + ~id:"handshake_failure" + ~title:"Handshake failure" + ~description:"Handshake failed" + ~pp:(fun ppf () -> + Format.fprintf ppf "Hanshake failed: cannot validate magic bytes") + Data_encoding.empty + (function Handshake_failure -> Some () | _ -> None) + (fun () -> Handshake_failure) + type addr = | Unix of string | Tcp of string * string * Unix.getaddrinfo_option list @@ -207,3 +221,14 @@ let recv ?timeout fd encoding = | Ok (read_len, message) -> if read_len <> len then tzfail (Decoding_error Extra_bytes) else return message + +let handshake socket magic_bytes = + let open Lwt_result_syntax in + let* () = send socket Data_encoding.Variable.bytes magic_bytes in + let* magic = recv socket Data_encoding.Variable.bytes in + fail_unless (Bytes.equal magic magic_bytes) Handshake_failure + +let get_temporary_socket_dir () = + match Sys.getenv_opt "XDG_RUNTIME_DIR" with + | Some xdg_runtime_dir when xdg_runtime_dir <> "" -> xdg_runtime_dir + | Some _ | None -> Filename.get_temp_dir_name () diff --git 
a/src/lib_base/unix/socket.mli b/src/lib_base/unix/socket.mli index a06d599cb1ba31f6581d7e02c264925ed6ffcc27..4fff6f542f7b1002405ce366931d60ee4496ccc0 100644 --- a/src/lib_base/unix/socket.mli +++ b/src/lib_base/unix/socket.mli @@ -57,3 +57,18 @@ val recv : Lwt_unix.file_descr -> 'a Data_encoding.t -> 'a tzresult Lwt.t + +(** [handshake socket magic_bytes] is a function to synchronize two + separate processes and start a communication. + + The scenario of the handshake is the following: + - both processes simultaneously send some [magic_bytes], + - both processes wait and checks the received bytes validity, + - handshake is finished. *) +val handshake : Lwt_unix.file_descr -> bytes -> unit tzresult Lwt.t + +(** [get_temporary_socket_dir ()] returns a temporary path for the + socket to be spawned. $XDG_RUNTIME_DIR is returned if the + environment variable is defined. Otherwise, the default temporary + directory is used. *) +val get_temporary_socket_dir : unit -> string diff --git a/src/lib_node_config/config_file.ml b/src/lib_node_config/config_file.ml index 52371cf1582e90e32df682645caafe2d84b15413..da0025a1073e538d247d74701cc66628dc147b9a 100644 --- a/src/lib_node_config/config_file.ml +++ b/src/lib_node_config/config_file.ml @@ -355,6 +355,7 @@ and p2p = { and rpc = { listen_addrs : string list; + local_listen_addrs : string list; cors_origins : string list; cors_headers : string list; tls : tls option; @@ -382,6 +383,7 @@ let default_p2p = let default_rpc = { listen_addrs = []; + local_listen_addrs = []; cors_origins = []; cors_headers = []; tls = None; @@ -549,13 +551,22 @@ let p2p = let rpc : rpc Data_encoding.t = let open Data_encoding in conv - (fun {cors_origins; cors_headers; listen_addrs; tls; acl; media_type} -> + (fun { + cors_origins; + cors_headers; + listen_addrs; + local_listen_addrs; + tls; + acl; + media_type; + } -> let cert, key = match tls with | None -> (None, None) | Some {cert; key} -> (Some cert, Some key) in ( Some listen_addrs, + Some 
local_listen_addrs, None, cors_origins, cors_headers, @@ -564,6 +575,7 @@ let rpc : rpc Data_encoding.t = acl, media_type )) (fun ( listen_addrs, + local_listen_addrs, legacy_listen_addr, cors_origins, cors_headers, @@ -586,14 +598,31 @@ let rpc : rpc Data_encoding.t = "Config file: Use only \"listen-addrs\" and not (legacy) \ \"listen-addr\"." in - {listen_addrs; cors_origins; cors_headers; tls; acl; media_type}) - (obj8 + let local_listen_addrs = + Option.value local_listen_addrs ~default:default_rpc.local_listen_addrs + in + { + listen_addrs; + local_listen_addrs; + cors_origins; + cors_headers; + tls; + acl; + media_type; + }) + (obj9 (opt "listen-addrs" ~description: "Hosts to listen to. If the port is not specified, the default \ port 8732 will be assumed." (list string)) + (opt + "local-listen-addrs" + ~description: + "Hosts to listen to. If the port is not specified, the default \ + port 8732 will be assumed." + (list string)) (opt "listen-addr" ~description:"Legacy value: Host to listen to" string) (dft "cors-origin" @@ -625,6 +654,8 @@ let rpc : rpc Data_encoding.t = Media_type.Command_line.encoding default_rpc.media_type)) +let rpc_encoding = rpc + let encoding = let open Data_encoding in conv @@ -808,9 +839,9 @@ let update ?(disable_config_validation = false) ?data_dir ?min_connections ?expected_connections ?max_connections ?max_download_speed ?max_upload_speed ?binary_chunks_size ?peer_table_size ?expected_pow ?bootstrap_peers ?listen_addr ?advertised_net_port ?discovery_addr ?(rpc_listen_addrs = []) - ?(allow_all_rpc = []) ?(media_type = Media_type.Command_line.Any) - ?(metrics_addr = []) ?operation_metadata_size_limit - ?(private_mode = default_p2p.private_mode) + ?(local_rpc_listen_addrs = []) ?(allow_all_rpc = []) + ?(media_type = Media_type.Command_line.Any) ?(metrics_addr = []) + ?operation_metadata_size_limit ?(private_mode = default_p2p.private_mode) ?(disable_p2p_maintenance = Option.is_none default_p2p.limits.maintenance_idle_time) 
?(disable_p2p_swap = Option.is_none default_p2p.limits.swap_linger) @@ -890,6 +921,8 @@ let update ?(disable_config_validation = false) ?data_dir ?min_connections and rpc : rpc = { listen_addrs = unopt_list ~default:cfg.rpc.listen_addrs rpc_listen_addrs; + local_listen_addrs = + unopt_list ~default:cfg.rpc.local_listen_addrs local_rpc_listen_addrs; cors_origins = unopt_list ~default:cfg.rpc.cors_origins cors_origins; cors_headers = unopt_list ~default:cfg.rpc.cors_headers cors_headers; tls = Option.either rpc_tls cfg.rpc.tls; diff --git a/src/lib_node_config/config_file.mli b/src/lib_node_config/config_file.mli index 5fa1107e2fdc2360c916dc827e1305d407aed34d..9a63c29b3456eb52622270155fd3b32a1286af31 100644 --- a/src/lib_node_config/config_file.mli +++ b/src/lib_node_config/config_file.mli @@ -75,6 +75,7 @@ and p2p = { and rpc = { listen_addrs : string list; + local_listen_addrs : string list; cors_origins : string list; cors_headers : string list; tls : tls option; @@ -116,6 +117,7 @@ val update : ?advertised_net_port:int -> ?discovery_addr:string -> ?rpc_listen_addrs:string list -> + ?local_rpc_listen_addrs:string list -> ?allow_all_rpc:P2p_point.Id.addr_port_id list -> ?media_type:Media_type.Command_line.t -> ?metrics_addr:string list -> @@ -177,6 +179,8 @@ val resolve_metrics_addrs : val resolve_bootstrap_addrs : string list -> (P2p_point.Id.t * P2p_peer.Id.t option) list tzresult Lwt.t +val rpc_encoding : rpc Data_encoding.t + val encoding : t Data_encoding.t (** Return [p2p.bootstrap_peers] if not [None], diff --git a/src/lib_node_config/shared_arg.ml b/src/lib_node_config/shared_arg.ml index 30234945e6160384066ecd26a32beb1ec183442f..8a095fa82f72d24ac01ebcc68828eb96fdb7769b 100644 --- a/src/lib_node_config/shared_arg.ml +++ b/src/lib_node_config/shared_arg.ml @@ -49,6 +49,7 @@ type t = { advertised_net_port : int option; discovery_addr : string option; rpc_listen_addrs : string list; + local_rpc_listen_addrs : string list; private_mode : bool; 
disable_p2p_maintenance : bool; disable_p2p_swap : bool; @@ -184,10 +185,11 @@ let wrap data_dir config_file network connections max_download_speed max_upload_speed binary_chunks_size peer_table_size listen_addr advertised_net_port discovery_addr peers no_bootstrap_peers bootstrap_threshold private_mode disable_p2p_maintenance disable_p2p_swap - disable_mempool enable_testchain expected_pow rpc_listen_addrs rpc_tls - cors_origins cors_headers log_output log_coloring history_mode - synchronisation_threshold latency disable_config_validation allow_all_rpc - media_type metrics_addr operation_metadata_size_limit = + disable_mempool enable_testchain expected_pow rpc_listen_addrs + local_rpc_listen_addrs rpc_tls cors_origins cors_headers log_output + log_coloring history_mode synchronisation_threshold latency + disable_config_validation allow_all_rpc media_type metrics_addr + operation_metadata_size_limit = let actual_data_dir = Option.value ~default:Config_file.default_data_dir data_dir in @@ -215,6 +217,7 @@ let wrap data_dir config_file network connections max_download_speed advertised_net_port; discovery_addr; rpc_listen_addrs; + local_rpc_listen_addrs; private_mode; disable_p2p_maintenance; disable_p2p_swap; @@ -655,6 +658,16 @@ module Term = struct Arg.( value & opt_all string [] & info ~docs ~doc ~docv:"ADDR:PORT" ["rpc-addr"]) + let local_rpc_listen_addrs = + let doc = + "The TCP socket address at which this local RPC server instance can be \ + reached. As a local RPC server is handled by the node itself, calling \ + computational intensive RPCs can affect the performances of the node." + in + Arg.( + value & opt_all string [] + & info ~docs ~doc ~docv:"ADDR:PORT" ["local-rpc-addr"]) + let rpc_tls = let doc = "Enable TLS for this RPC server with the provided certificate and key." 
@@ -718,10 +731,11 @@ module Term = struct $ peer_table_size $ listen_addr $ advertised_net_port $ discovery_addr $ peers $ no_bootstrap_peers $ bootstrap_threshold $ private_mode $ disable_p2p_maintenance $ disable_p2p_swap $ disable_mempool - $ enable_testchain $ expected_pow $ rpc_listen_addrs $ rpc_tls - $ cors_origins $ cors_headers $ log_output $ log_coloring $ history_mode - $ synchronisation_threshold $ latency $ disable_config_validation - $ allow_all_rpc $ media_type $ metrics_addr $ operation_metadata_size_limit + $ enable_testchain $ expected_pow $ rpc_listen_addrs + $ local_rpc_listen_addrs $ rpc_tls $ cors_origins $ cors_headers + $ log_output $ log_coloring $ history_mode $ synchronisation_threshold + $ latency $ disable_config_validation $ allow_all_rpc $ media_type + $ metrics_addr $ operation_metadata_size_limit end let read_config_file args = @@ -852,6 +866,7 @@ let patch_config ?(may_override_network = false) ?(emit = Event.emit) disable_mempool; enable_testchain; rpc_listen_addrs; + local_rpc_listen_addrs; rpc_tls; cors_origins; cors_headers; @@ -1005,6 +1020,7 @@ let patch_config ?(may_override_network = false) ?(emit = Event.emit) ?advertised_net_port ?discovery_addr ~rpc_listen_addrs + ~local_rpc_listen_addrs ~allow_all_rpc ~media_type ~metrics_addr diff --git a/src/lib_node_config/shared_arg.mli b/src/lib_node_config/shared_arg.mli index 5d574693159b335316055180c8b8702db53ee36e..3328959a4b2dc99758b033e53fce5be1b58cef23 100644 --- a/src/lib_node_config/shared_arg.mli +++ b/src/lib_node_config/shared_arg.mli @@ -53,6 +53,8 @@ type t = { discovery_addr : string option; rpc_listen_addrs : string list; (** a list of addresses to listen to RPC requests on *) + local_rpc_listen_addrs : string list; + (** a list of addresses to listen to RPC requests on *) private_mode : bool; (** enables the private mode, see https://tezos.gitlab.io/user/node-configuration.html#private-node *) diff --git a/src/lib_rpc_http/RPC_middleware.ml 
b/src/lib_rpc_http/RPC_middleware.ml index 214d1e6e567a9f064356135f4eccfb3e1085b514..ee9a756bd6e7a8b6ca449f74b5b688b5db0da2ee 100644 --- a/src/lib_rpc_http/RPC_middleware.ml +++ b/src/lib_rpc_http/RPC_middleware.ml @@ -23,7 +23,7 @@ (* *) (*****************************************************************************) -let make_transform_callback forwarding_endpoint callback conn req body = +let make_transform_callback ?ctx forwarding_endpoint callback conn req body = let open Lwt_syntax in let open Cohttp in (* Using a [Cohttp_lwt.Body.t] destructs it. As we may need it @@ -62,6 +62,7 @@ let make_transform_callback forwarding_endpoint callback conn req body = in let* resp, body = Cohttp_lwt_unix.Client.call + ?ctx ~headers ~body:(Cohttp_lwt.Body.of_stream body_stream) (Request.meth req) @@ -104,5 +105,5 @@ let rpc_metrics_transform_callback ~update_metrics dir callback conn req body = (* Otherwise, the call must be done anyway. *) do_call () -let proxy_server_query_forwarder forwarding_endpoint = - make_transform_callback forwarding_endpoint +let proxy_server_query_forwarder ?ctx forwarding_endpoint = + make_transform_callback ?ctx forwarding_endpoint diff --git a/src/lib_rpc_http/RPC_middleware.mli b/src/lib_rpc_http/RPC_middleware.mli index 0424e78d08a4d7a55685c0521fded6e0461e937a..b02be991288e4761553d798df4cac7dc5b2997dd 100644 --- a/src/lib_rpc_http/RPC_middleware.mli +++ b/src/lib_rpc_http/RPC_middleware.mli @@ -30,7 +30,10 @@ that rewrites queries that the proxy server cannot handle and forwards them to the full node at the given [Uri.t]. 
*) val proxy_server_query_forwarder : - Uri.t -> RPC_server.callback -> RPC_server.callback + ?ctx:Cohttp_lwt_unix.Net.ctx -> + Uri.t -> + RPC_server.callback -> + RPC_server.callback (** A Resto middleware that transforms any server callback to an other that handles RPC metrics *) diff --git a/src/lib_rpc_process/dune b/src/lib_rpc_process/dune new file mode 100644 index 0000000000000000000000000000000000000000..76eebe9e388d0dcb04c5a5cca0e995272dd3732b --- /dev/null +++ b/src/lib_rpc_process/dune @@ -0,0 +1,24 @@ +; This file was automatically generated, do not edit. +; Edit file manifest/main.ml instead. + +(library + (name octez_rpc_process) + (public_name octez-rpc-process) + (instrumentation (backend bisect_ppx)) + (libraries + octez-libs.base + octez-libs.base.unix + octez-node-config + octez-libs.rpc-http + octez-libs.rpc-http-server + lwt.unix + lwt-exit + prometheus-app) + (flags + (:standard) + -open Tezos_base.TzPervasives + -open Tezos_base + -open Tezos_base_unix + -open Octez_node_config + -open Tezos_rpc_http + -open Tezos_rpc_http_server)) diff --git a/src/lib_rpc_process/main.ml b/src/lib_rpc_process/main.ml new file mode 100644 index 0000000000000000000000000000000000000000..968acd4143901108168ce9be7713554e0d0b6420 --- /dev/null +++ b/src/lib_rpc_process/main.ml @@ -0,0 +1,280 @@ +(*****************************************************************************) +(* *) +(* Open Source License *) +(* Copyright (c) 2023 Nomadic Labs. 
*) +(* *) +(* Permission is hereby granted, free of charge, to any person obtaining a *) +(* copy of this software and associated documentation files (the "Software"),*) +(* to deal in the Software without restriction, including without limitation *) +(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *) +(* and/or sell copies of the Software, and to permit persons to whom the *) +(* Software is furnished to do so, subject to the following conditions: *) +(* *) +(* The above copyright notice and this permission notice shall be included *) +(* in all copies or substantial portions of the Software. *) +(* *) +(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*) +(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *) +(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *) +(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*) +(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *) +(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *) +(* DEALINGS IN THE SOFTWARE. 
*)
+(*                                                                           *)
+(*****************************************************************************)
+
+type error +=
+  | RPC_Process_Port_already_in_use of P2p_point.Id.t list
+  | Missing_socket_dir
+
+type parameters = {
+  rpc : Config_file.rpc;
+  rpc_comm_socket_path : string;
+  internal_events : Tezos_base.Internal_event_config.t;
+}
+
+let parameters_encoding =
+  let open Data_encoding in
+  conv
+    (fun {rpc; rpc_comm_socket_path; internal_events} ->
+      (rpc, rpc_comm_socket_path, internal_events))
+    (fun (rpc, rpc_comm_socket_path, internal_events) ->
+      {rpc; rpc_comm_socket_path; internal_events})
+    (obj3
+       (req "rpc" Config_file.rpc_encoding)
+       (req "rpc_comm_socket_path" Data_encoding.string)
+       (req "internal_events" Tezos_base.Internal_event_config.encoding))
+
+let () =
+  register_error_kind
+    `Permanent
+    ~id:"rpc_process.main.process_port_already_in_use"
+    ~title:"Cannot start RPC process: RPC port already in use"
+    ~description:
+      "Another octez RPC process is probably running on the same RPC port."
+    ~pp:(fun ppf addrlist ->
+      Format.fprintf
+        ppf
+        "Another octez RPC process is probably running on one of these \
+         addresses (%a). Please choose another RPC port."
+        (Format.pp_print_list P2p_point.Id.pp)
+        addrlist)
+    Data_encoding.(obj1 (req "addrlist" (list P2p_point.Id.encoding)))
+    (function
+      | RPC_Process_Port_already_in_use addrlist -> Some addrlist | _ -> None)
+    (fun addrlist -> RPC_Process_Port_already_in_use addrlist) ;
+  register_error_kind
+    `Permanent
+    ~id:"rpc_process.main.missing_socket_dir"
+    ~title:"Cannot start RPC process: missing socket dir"
+    ~description:"Cannot start RPC process without socket dir."
+    ~pp:(fun ppf () ->
+      Format.fprintf
+        ppf
+        "The path to the communication socket is missing. Use the --socket-dir \
+         option to specify it when running the RPC process.")
+    Data_encoding.empty
+    (function Missing_socket_dir -> Some () | _ -> None)
+    (fun () -> Missing_socket_dir)
+
+module Event = struct
+  include Internal_event.Simple
+
+  let section = ["rpc-server"; "main"]
+
+  let starting_rpc_server =
+    declare_4
+      ~section
+      ~name:"starting_rpc_server"
+      ~msg:"starting RPC server on {host}:{port} (acl = {acl_policy})"
+      ~level:Notice
+      ("host", Data_encoding.string)
+      ("port", Data_encoding.uint16)
+      ("tls", Data_encoding.bool)
+      ("acl_policy", Data_encoding.string)
+end
+
+(* Add default accepted CORS headers. Header names are case-insensitive:
+   lowercase all of them so a default is not duplicated by a user one. *)
+let sanitize_cors_headers ~default headers =
+  let to_set l = List.map String.lowercase_ascii l |> String.Set.of_list in
+  String.Set.elements (String.Set.union (to_set default) (to_set headers))
+
+let launch_rpc_server (config : parameters) (addr, port) =
+  let open Lwt_result_syntax in
+  let media_types = config.rpc.media_type in
+  let*! acl_policy = RPC_server.Acl.resolve_domain_names config.rpc.acl in
+  let host = Ipaddr.V6.to_string addr in
+  let mode =
+    match config.rpc.tls with
+    | None -> `TCP (`Port port)
+    | Some {Config_file.cert; key} ->
+        `TLS (`Crt_file_path cert, `Key_file_path key, `No_password, `Port port)
+  in
+  let acl =
+    let open RPC_server.Acl in
+    find_policy acl_policy (Ipaddr.V6.to_string addr, Some port)
+    |> Option.value_f ~default:(fun () -> default addr)
+  in
+  let*! () =
+    Event.(emit starting_rpc_server)
+      (host, port, config.rpc.tls <> None, RPC_server.Acl.policy_type acl)
+  in
+  let cors_headers =
+    sanitize_cors_headers ~default:["Content-Type"] config.rpc.cors_headers
+  in
+  let cors =
+    Resto_cohttp.Cors.
+      {
+        allowed_origins = config.rpc.cors_origins;
+        allowed_headers = cors_headers;
+      }
+  in
+  let dir = Tezos_rpc.Directory.empty in
+  let server =
+    RPC_server.init_server
+      ~cors
+      ~acl
+      ~media_types:(Media_type.Command_line.of_command_line media_types)
+      dir
+  in
+  let callback (conn : Cohttp_lwt_unix.Server.conn) req body =
+    Tezos_rpc_http_server.RPC_server.resto_callback server conn req body
+  in
+  let callback =
+    let resolver_handle = "octez-node-unix-socket" in
+    let localhost = Format.asprintf "http://%s" resolver_handle in
+    let forwarding_endpoint = Uri.of_string localhost in
+    let resolver =
+      let h = Stdlib.Hashtbl.create 1 in
+      Stdlib.Hashtbl.add
+        h
+        resolver_handle
+        (`Unix_domain_socket config.rpc_comm_socket_path) ;
+      Resolver_lwt_unix.static h
+    in
+    let ctx = Cohttp_lwt_unix.Client.custom_ctx ~resolver () in
+    RPC_middleware.proxy_server_query_forwarder
+      ~ctx
+      forwarding_endpoint
+      callback
+  in
+  Lwt.catch
+    (fun () ->
+      let*! () = RPC_server.launch ~host server ~callback mode in
+      return server)
+    (function
+      (* FIXME: https://gitlab.com/tezos/tezos/-/issues/1312
+         This exception seems to be unreachable.
+      *)
+      | Unix.Unix_error (Unix.EADDRINUSE, "bind", "") ->
+          tzfail (RPC_Process_Port_already_in_use [(addr, port)])
+      | exn -> fail_with_exn exn)
+
+let init_rpc parameters =
+  let open Lwt_result_syntax in
+  let* server =
+    let* p2p_point =
+      match parameters.rpc.listen_addrs with
+      | [addr] -> Config_file.resolve_rpc_listening_addrs addr
+      | _ ->
+          (* We assume that the config contains only one listening
+             address. This should hold thanks to the `init_rpc`
+             function from `bin_node/Node_run_command`. *)
+          assert false
+    in
+    match p2p_point with
+    | [point] -> launch_rpc_server parameters point
+    | _ ->
+        (* Same as above: only one p2p_point is expected here. *)
+        assert false
+  in
+  (* Add a cleanup call back to shutdown the RPC_server gracefully
+     when an exit signal is received. *)
+  let (_ccid : Lwt_exit.clean_up_callback_id) =
+    Lwt_exit.register_clean_up_callback ~loc:__LOC__ (fun _ ->
+        RPC_server.shutdown server)
+  in
+  return_unit
+
+let get_init_socket_path socket_dir pid =
+  let filename = Format.sprintf "init-rpc-socket-%d" pid in
+  Filename.concat socket_dir filename
+
+(* Magic bytes used for the external RPC process handshake. *)
+let socket_magic = Bytes.of_string "TEZOS_RPC_SERVER_MAGIC_0"
+
+let create_init_socket socket_dir =
+  let open Lwt_result_syntax in
+  let* socket_dir =
+    match socket_dir with
+    | Some sd -> return sd
+    | None -> tzfail Missing_socket_dir
+  in
+  let pid = Unix.getpid () in
+  let init_socket_path = get_init_socket_path socket_dir pid in
+  let* init_socket_fd = Socket.connect (Unix init_socket_path) in
+  (* Unlink the socket as soon as both sides have opened it. *)
+  let*! () = Lwt_unix.unlink init_socket_path in
+  let* () = Socket.handshake init_socket_fd socket_magic in
+  return init_socket_fd
+
+let run socket_dir =
+  let open Lwt_result_syntax in
+  let* init_socket_fd = create_init_socket socket_dir in
+  let* parameters = Socket.recv init_socket_fd parameters_encoding in
+  let*! () =
+    Tezos_base_unix.Internal_event_unix.init
+      ~config:parameters.internal_events
+      ()
+  in
+  let* () = init_rpc parameters in
+  (* Send the config ack as synchronization barrier for the init_rpc
+     phase. *)
+  let* () = Socket.send init_socket_fd Data_encoding.unit () in
+  let*! () = Lwt_unix.close init_socket_fd in
+  Lwt_utils.never_ending ()
+
+let process socket_dir =
+  let open Lwt_result_syntax in
+  let main_promise =
+    Lwt.catch (fun () -> run socket_dir) (function exn -> fail_with_exn exn)
+  in
+  Lwt_main.run
+    (let*! r = Lwt_exit.wrap_and_exit main_promise in
+     match r with
+     | Ok () ->
+         let*! _ = Lwt_exit.exit_and_wait 0 in
+         Lwt.return (`Ok ())
+     | Error err ->
+         let*! _ = Lwt_exit.exit_and_wait 1 in
+         Lwt.return @@ `Error (false, Format.asprintf "%a" pp_print_trace err))
+
+module Term = struct
+  let socket_dir =
+    let open Cmdliner in
+    let doc = "Socket directory to communicate with node process" in
+    Arg.(
+      value
+      & opt (some string) None
+      & info ~docs:Shared_arg.Manpage.misc_section ~doc ["socket-dir"])
+
+  let term = Cmdliner.Term.(ret (const process $ socket_dir))
+end
+
+module Manpage = struct
+  let command_description =
+    "The $(b,octez-rpc-process) starts the RPC process that aims to serve as \
+     the default endpoint for RPC queries. This server may communicate with an \
+     Octez node."
+
+  let description = [`S "DESCRIPTION"; `P command_description]
+
+  let man = description
+
+  let info =
+    Cmdliner.Cmd.info ~doc:"Run the Octez rpc process" ~man "octez-rpc-process"
+end
+
+let cmd = Cmdliner.Cmd.v Manpage.info Term.term
diff --git a/src/lib_rpc_process/rpc_process_worker.ml b/src/lib_rpc_process/rpc_process_worker.ml
new file mode 100644
index 0000000000000000000000000000000000000000..f6fe6e9d561a621c4317ebdef04b5fd3e351eea4
--- /dev/null
+++ b/src/lib_rpc_process/rpc_process_worker.ml
@@ -0,0 +1,286 @@
+(*****************************************************************************)
+(*                                                                           *)
+(* Open Source License                                                       *)
+(* Copyright (c) 2023 Nomadic Labs,                                          *)
+(*                                                                           *)
+(* Permission is hereby granted, free of charge, to any person obtaining a   *)
+(* copy of this software and associated documentation files (the "Software"),*)
+(* to deal in the Software without restriction, including without limitation *)
+(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
+(* and/or sell copies of the Software, and to permit persons to whom the     *)
+(* Software is furnished to do so, subject to the following conditions:     *)
+(*                                                                           *)
+(* The above copyright notice and this permission notice shall be included   *)
+(* in all copies or substantial portions of the Software.                    *)
+(*                                                                           *)
+(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
+(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
+(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
+(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
+(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
+(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
+(* DEALINGS IN THE SOFTWARE.                                                 *)
+(*                                                                           *)
+(*****************************************************************************)
+
+module Event = struct
+  include Internal_event.Simple
+
+  let section = ["node"; "main"]
+
+  let shutting_down_rpc_process =
+    declare_0
+      ~section
+      ~name:"shutting_down_rpc_process"
+      ~msg:"shutting down the RPC process"
+      ~level:Notice
+      ()
+
+  let rpc_process_started =
+    declare_1
+      ~section
+      ~name:"rpc_process_started"
+      ~msg:"RPC process was started on pid {pid}"
+      ~level:Notice
+      ("pid", Data_encoding.int31)
+
+  let rpc_process_exited_abnormally =
+    let open Unix in
+    let exit_status_encoding =
+      let open Data_encoding in
+      union
+        [
+          case
+            (Tag 0)
+            ~title:"wexited"
+            int31
+            (function WEXITED i -> Some i | _ -> None)
+            (fun i -> WEXITED i);
+          case
+            (Tag 1)
+            ~title:"wsignaled"
+            int31
+            (function WSIGNALED i -> Some i | _ -> None)
+            (fun i -> WSIGNALED i);
+          case
+            (Tag 2)
+            ~title:"wstopped"
+            int31
+            (function WSTOPPED i -> Some i | _ -> None)
+            (fun i -> WSTOPPED i);
+        ]
+    in
+    declare_2
+      ~section
+      ~level:Error
+      ~name:"rpc_process_exited_status"
+      ~msg:"rpc process (pid {pid}) {status_msg}"
+      ("pid", Data_encoding.int31)
+      ~pp2:(fun fmt status ->
+        match status with
+        | WEXITED i ->
+            Format.fprintf fmt "terminated abnormally with exit code %i" i
+        | WSIGNALED i ->
+            Format.fprintf
+              fmt
+              "was killed by signal %s"
+              (Lwt_exit.signal_name i)
+        | WSTOPPED i ->
+            Format.fprintf
+              fmt
+              "was stopped by signal %s"
+              (Lwt_exit.signal_name i))
+      ("status_msg", exit_status_encoding)
+
+  let cannot_start_rpc_process =
+    declare_1
+      ~section
+      ~name:"cannot_start_rpc_process"
+      ~level:Error
+      ~msg:"cannot start rpc process: {trace}"
+      ("trace", Data_encoding.string)
+
+  let waiting_for_rpc_process_restart =
+    declare_1
+      ~section
+      ~name:"waiting_for_rpc_process_restart"
+      ~level:Error
+      ~msg:"restarting RPC process in {sleep} seconds"
+      ("sleep", Data_encoding.float)
+end
+
+(* State of the worker. *)
+type t = {
+  rpc_config : Config_file.rpc;
+  events_config : Internal_event_config.t;
+  mutable server : Lwt_process.process_none option;
+  stop : (int * Unix.process_status) Lwt.t;
+  stopper : (int * Unix.process_status) Lwt.u;
+  comm_socket_path : string;
+}
+
+let create ~comm_socket_path (config : Config_file.t) events_config =
+  let stop, stopper = Lwt.wait () in
+  {
+    rpc_config = config.rpc;
+    events_config;
+    server = None;
+    stop;
+    stopper;
+    comm_socket_path;
+  }
+
+(* Terminates the current RPC process, if any. *)
+let shutdown t =
+  let open Lwt_syntax in
+  match t.server with
+  | None -> return_unit
+  | Some process ->
+      let* () = Event.(emit shutting_down_rpc_process) () in
+      process#terminate ;
+      return_unit
+
+let stop t =
+  (* [Lwt.wakeup] raises [Invalid_argument] on an already resolved
+     promise: only notify the watch dog on the first call. *)
+  if Lwt.is_sleeping t.stop then
+    Lwt.wakeup t.stopper (0, Lwt_unix.WSTOPPED 0) ;
+  shutdown t
+
+let run_server t () =
+  let open Lwt_result_syntax in
+  let socket_dir = Tezos_base_unix.Socket.get_temporary_socket_dir () in
+  let socket_dir_arg = ["--socket-dir"; socket_dir] in
+  let args = "octez-rpc-process" :: socket_dir_arg in
+  let process =
+    Lwt_process.open_process_none
+      ~stdout:(`FD_copy Unix.stdout)
+      ~stderr:(`FD_copy Unix.stderr)
+      (Sys.executable_name, Array.of_list args)
+  in
+  let pid = process#pid in
+  let init_socket_path = Main.get_init_socket_path socket_dir pid in
+  let parameters =
+    Main.
+      {
+        rpc = t.rpc_config;
+        internal_events = t.events_config;
+        rpc_comm_socket_path = t.comm_socket_path;
+      }
+  in
+  (* Accepts the connection of the spawned process on its init socket,
+     performs the handshake, sends the parameters and waits for the
+     ack. The init socket is closed whatever the outcome. *)
+  let initialize () =
+    let* init_socket_fd =
+      (* NOTE(review): the init socket is bound after the child has been
+         spawned. If the child gives up connecting before [bind] happens,
+         [accept] never returns - TODO confirm [Socket.connect] retries. *)
+      let* fds = Tezos_base_unix.Socket.bind (Unix init_socket_path) in
+      match fds with
+      | [fd] ->
+          let*! init_socket_fd, _ = Lwt_unix.accept ~cloexec:true fd in
+          let*! () = Lwt_unix.close fd in
+          return init_socket_fd
+      | _ ->
+          (* This assertion holds as long as
+             Tezos_base_unix.Socket.bind returns a single list element
+             when binding Unix sockets. *)
+          assert false
+    in
+    Lwt.finalize
+      (fun () ->
+        let* () =
+          Tezos_base_unix.Socket.handshake init_socket_fd Main.socket_magic
+        in
+        let* () =
+          Socket.send init_socket_fd Main.parameters_encoding parameters
+        in
+        Socket.recv init_socket_fd Data_encoding.unit)
+      (fun () -> Lwt_unix.close init_socket_fd)
+  in
+  let* () =
+    protect
+      ~on_error:(fun errs ->
+        (* Do not leave a half-initialized process behind: [may_start]
+           would otherwise spawn another one next to it. *)
+        process#terminate ;
+        Lwt.return_error errs)
+      initialize
+  in
+  let*! () = Event.(emit rpc_process_started) pid in
+  t.server <- Some process ;
+  return t
+
+(* Evaluates [f]. If [f] fails, the error is caught, printed as an
+   error event, and [f] is re-evaluated after a [backoff] delay. The
+   delay increases at each failing try. *)
+let rec may_start backoff f =
+  let open Lwt_result_syntax in
+  let timestamp, sleep = backoff in
+  let now = Time.System.now () in
+  let diff = Ptime.diff now timestamp in
+  if Ptime.Span.to_float_s diff > sleep then
+    protect
+      (fun () -> f ())
+      ~on_error:(function
+        | errs ->
+            let*! () =
+              Event.(
+                emit
+                  cannot_start_rpc_process
+                  (Format.asprintf "%a" pp_print_trace errs))
+            in
+            may_start (Time.System.now (), sleep *. 1.2) f)
+  else
+    let*! () = Event.(emit waiting_for_rpc_process_restart sleep) in
+    let*! () = Lwt_unix.sleep sleep in
+    may_start (timestamp, sleep) f
+
+(* [watch_dog] makes sure that the RPC process is restarted as soon as
+   it dies. *)
+let watch_dog run_server =
+  let open Lwt_result_syntax in
+  let rec loop t =
+    match t.server with
+    | None ->
+        let* new_server = may_start (Time.System.epoch, 0.5) run_server in
+        loop new_server
+    | Some process -> (
+        let wait_pid_t =
+          (* Rely on [process#status] rather than on a second, independent
+             [Lwt_unix.waitpid] on the same pid: [Lwt_process] already
+             waits for its child, and two waiters on one pid may race. *)
+          let*! status = process#status in
+          (* Sleep is necessary here so that the process exit is not
+             observed before the Lwt_exit stack reacts. It avoids the
+             clean_up_starts to be pending while the node shuts down. *)
+          let*! () = Lwt_unix.sleep 1. in
+          Lwt.return (`Wait_pid status)
+        in
+        let stop_t =
+          let*! _ = t.stop in
+          Lwt.return `Stopped
+        in
+        let*! res = Lwt.choose [wait_pid_t; stop_t] in
+        match res with
+        | `Stopped -> return_unit
+        | `Wait_pid _ when not (Lwt.is_sleeping Lwt_exit.clean_up_starts) ->
+            return_unit
+        | `Wait_pid status ->
+            t.server <- None ;
+            let*! () =
+              Event.(emit rpc_process_exited_abnormally) (process#pid, status)
+            in
+            let* new_server = may_start (Time.System.epoch, 0.5) run_server in
+            loop new_server)
+  in
+  loop
+
+let start server =
+  let open Lwt_result_syntax in
+  let* new_server = run_server server () in
+  (* The watch dog runs in the background for the whole lifetime of the
+     worker: its promise is deliberately not awaited. *)
+  let (_ : unit tzresult Lwt.t) = watch_dog (run_server server) new_server in
+  return_unit
diff --git a/src/lib_rpc_process/rpc_process_worker.mli b/src/lib_rpc_process/rpc_process_worker.mli
new file mode 100644
index 0000000000000000000000000000000000000000..4f8159f8cf70fc2c226f19a9e1ece30502d7fb7c
--- /dev/null
+++ b/src/lib_rpc_process/rpc_process_worker.mli
@@ -0,0 +1,54 @@
+(*****************************************************************************)
+(*                                                                           *)
+(* Open Source License                                                       *)
+(* Copyright (c) 2023 Nomadic Labs,                                          *)
+(*                                                                           *)
+(* Permission is hereby granted, free of charge, to any person obtaining a   *)
+(* copy of this software and associated documentation files (the "Software"),*)
+(* to deal in the Software without restriction, including without limitation *)
+(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
+(* and/or sell copies of the Software, and to permit persons to whom the     *)
+(* Software is furnished to do so, subject to the following conditions:     *)
+(*                                                                           *)
+(* The above copyright notice and this permission notice shall be included   *)
+(* in all copies or substantial portions of the Software.
*) +(* *) +(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*) +(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *) +(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *) +(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*) +(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *) +(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *) +(* DEALINGS IN THE SOFTWARE. *) +(* *) +(*****************************************************************************) + +(** Worker that handles the RPC server spawned as an external process + by the node. + + This module defines an RPC process worker type [t] that is + responsible for dealing with the process' lifetime. *) + +(** Type of the RPC process worker. *) +type t + +(** [create ~comm_socket_path config internal_event_config] creates the + worker initial state. [comm_socket_path] is a socket path that + will be used to communicate with the node (note that the socket is + created automatically, but the cleaning of it is not + handled). [config] contains all the RPC server configuration (such + as ACLs, cors_headers, …). *) +val create : + comm_socket_path:string -> Config_file.t -> Internal_event_config.t -> t + +(** Starts the external RPC process using fork+exec calls. It + implements a watch dog that is responsible for restarting the + process if it dies at some point. Additionally, if the process + fails to be restarted, it retries with an exponential back-off + until the restart is successful. + The returned promise resolves once the RPC server is fully + available to answer RPCs; it fails if this first start fails.
*) +val start : t -> unit tzresult Lwt.t + +(** Gracefully stops the RPC process worker. *) +val stop : t -> unit Lwt.t diff --git a/tezt/lib_tezos/cluster.ml b/tezt/lib_tezos/cluster.ml index 4fb61af9e17b0d7b01d52721ba527763eb9a479f..11097fad52e724304d8223230920a0835323659d 100644 --- a/tezt/lib_tezos/cluster.ml +++ b/tezt/lib_tezos/cluster.ml @@ -30,10 +30,15 @@ let fresh_name () = incr next_name ; "cluster" ^ string_of_int index -let create ?path ?name count arguments = +let create ?path ?name count ?rpc_local arguments = let name = match name with None -> fresh_name () | Some name -> name in List.map - (fun i -> Node.create ?path ~name:(name ^ "." ^ string_of_int i) arguments) + (fun i -> + Node.create + ?path + ~name:(name ^ "." ^ string_of_int i) + ?rpc_local + arguments) (range 1 count) let symmetric_add_peer a b = diff --git a/tezt/lib_tezos/cluster.mli b/tezt/lib_tezos/cluster.mli index 53e7cb974cf1f98e6da2163917d58651de999296..3efe285f6a5f303d77e7fd9b30c66d6479e27d38 100644 --- a/tezt/lib_tezos/cluster.mli +++ b/tezt/lib_tezos/cluster.mli @@ -55,13 +55,19 @@ (** Create several nodes. - Usage: [create count arguments] + Usage: [create count ?rpc_local arguments] - Create [count] nodes (using [Node.create]) sharing the same [arguments]. + Create [count] nodes (using [Node.create]) sharing the same [?rpc_local] + and [arguments]. Each node is named [name ^ "." ^ i] where [i] is the index of the node in the resulting list, starting from 1.
*) val create : - ?path:string -> ?name:string -> int -> Node.argument list -> Node.t list + ?path:string -> + ?name:string -> + int -> + ?rpc_local:bool -> + Node.argument list -> + Node.t list (** {2 Topologies} *) diff --git a/tezt/lib_tezos/node.ml b/tezt/lib_tezos/node.ml index ea7cf31f088586288a21cab49868434500c28101..ae3424a35d8e9bfbb06a57637ff093ed2c78d0b3 100644 --- a/tezt/lib_tezos/node.ml +++ b/tezt/lib_tezos/node.ml @@ -56,6 +56,8 @@ type argument = | Cors_origin of string | Disable_mempool | Version + | RPC_additional_addr of string + | RPC_additional_addr_local of string let make_argument = function | Network x -> ["--network"; x] @@ -85,6 +87,8 @@ let make_argument = function | Cors_origin cors_origin -> ["--cors-origin"; cors_origin] | Disable_mempool -> ["--disable-mempool"] | Version -> ["--version"] + | RPC_additional_addr addr -> ["--rpc-addr"; addr] + | RPC_additional_addr_local addr -> ["--local-rpc-addr"; addr] let make_arguments arguments = List.flatten (List.map make_argument arguments) @@ -128,6 +132,8 @@ let is_redundant = function | Metrics_addr _, _ | Cors_origin _, _ | Disable_mempool, _ + | RPC_additional_addr _, _ + | RPC_additional_addr_local _, _ | Version, _ -> false @@ -139,6 +145,7 @@ module Parameters = struct net_addr : string option; mutable net_port : int; advertised_net_port : int option; + rpc_local : bool; rpc_host : string; rpc_port : int; rpc_tls : tls_config option; @@ -189,6 +196,8 @@ let advertised_net_port node = node.persistent_state.advertised_net_port let rpc_scheme node = match node.persistent_state.rpc_tls with Some _ -> "https" | None -> "http" +let rpc_local node = node.persistent_state.rpc_local + let rpc_host node = node.persistent_state.rpc_host let rpc_port node = node.persistent_state.rpc_port @@ -683,7 +692,7 @@ let wait_for_disconnections node disconnections = waiter let create ?runner ?(path = Constant.tezos_node) ?name ?color ?data_dir - ?event_pipe ?net_addr ?net_port ?advertised_net_port +
?event_pipe ?net_addr ?net_port ?advertised_net_port ?(rpc_local = false) ?(rpc_host = "localhost") ?rpc_port ?rpc_tls ?(allow_all_rpc = true) arguments = let name = match name with None -> fresh_name () | Some name -> name in @@ -713,6 +722,7 @@ let create ?runner ?(path = Constant.tezos_node) ?name ?color ?data_dir net_addr; net_port; advertised_net_port; + rpc_local; rpc_host; rpc_port; rpc_tls; @@ -813,7 +823,8 @@ let runlike_command_arguments node command arguments = in command :: "--data-dir" :: node.persistent_state.data_dir :: "--net-addr" :: (net_addr ^ string_of_int node.persistent_state.net_port) - :: "--rpc-addr" + :: (if node.persistent_state.rpc_local then "--local-rpc-addr" + else "--rpc-addr") :: (rpc_addr ^ string_of_int node.persistent_state.rpc_port) :: command_args @@ -874,7 +885,7 @@ let replay ?on_terminate ?event_level ?event_sections_levels ?(strict = false) arguments let init ?runner ?path ?name ?color ?data_dir ?event_pipe ?net_port - ?advertised_net_port ?rpc_host ?rpc_port ?rpc_tls ?event_level + ?advertised_net_port ?rpc_local ?rpc_host ?rpc_port ?rpc_tls ?event_level ?event_sections_levels ?patch_config ?snapshot arguments = (* The single process argument does not exist in the configuration file of the node. It is only known as a command-line option.
As a @@ -892,6 +903,7 @@ let init ?runner ?path ?name ?color ?data_dir ?event_pipe ?net_port ?event_pipe ?net_port ?advertised_net_port + ?rpc_local ?rpc_host ?rpc_port ?rpc_tls diff --git a/tezt/lib_tezos/node.mli b/tezt/lib_tezos/node.mli index b4d04b0eb0f1eaa4f68b2379108681a4652ec8dc..14ec898dec8d629e51455201f031786ea9abc8dd 100644 --- a/tezt/lib_tezos/node.mli +++ b/tezt/lib_tezos/node.mli @@ -84,6 +84,8 @@ type argument = | Cors_origin of string (** [--cors-origin] *) | Disable_mempool (** [--disable-mempool] *) | Version (** [--version] *) + | RPC_additional_addr of string (** [--rpc-addr] *) + | RPC_additional_addr_local of string (** [--local-rpc-addr] *) (** A TLS configuration for the node: paths to a [.crt] and a [.key] file. @@ -112,6 +114,9 @@ type t provided, or a value allowing the local Tezt program to connect to it if it is. + Default [rpc_local] is [false]. If [rpc_local] is [true], the node serves + its RPCs itself ([--local-rpc-addr]) instead of spawning an RPC process. + Default values for [net_port] or [rpc_port] are chosen automatically with values starting from 16384 (configurable with `--starting-port`). They are used by [config_init] @@ -139,6 +144,7 @@ val create : ?net_addr:string -> ?net_port:int -> ?advertised_net_port:int -> + ?rpc_local:bool -> ?rpc_host:string -> ?rpc_port:int -> ?rpc_tls:tls_config -> @@ -214,6 +220,9 @@ val advertised_net_port : t -> int option Returns [https] if node is started with [--rpc-tls], otherwise [http] *) val rpc_scheme : t -> string +(** Returns [true] if the node serves RPCs locally ([--local-rpc-addr]). *) +val rpc_local : t -> bool + (** Get the RPC host given as [--rpc-addr] to a node.
*)
 val rpc_host : t -> string
 
@@ -556,6 +565,7 @@ val init :
   ?event_pipe:string ->
   ?net_port:int ->
   ?advertised_net_port:int ->
+  ?rpc_local:bool ->
   ?rpc_host:string ->
   ?rpc_port:int ->
   ?rpc_tls:tls_config ->
diff --git a/tezt/tests/RPC_test.ml b/tezt/tests/RPC_test.ml
index 8c1ceb0db7a94a0aaf7d6c81a84f92fa66646c09..46d5a7b60d2d8f65318f917b6b3b7f24ec164402 100644
--- a/tezt/tests/RPC_test.ml
+++ b/tezt/tests/RPC_test.ml
@@ -1610,6 +1610,151 @@ let test_no_service_at_valid_prefix address () =
   in
   unit
 
+let point_of_port port = "127.0.0.1:" ^ Int.to_string port
+
+let wait_for_starting_rpc_server_event ~local ?fail node port =
+  let filter =
+    match fail with
+    | None ->
+        fun json ->
+          let event_port = JSON.(json |-> "port" |> as_int) in
+          if port = event_port then Some () else None
+    | Some fail_msg -> fun _ -> Test.fail fail_msg
+  in
+  Node.wait_for
+    node
+    (if local then "starting_local_rpc_server.v0" else "starting_rpc_server.v0")
+    filter
+
+let make_endpoint port =
+  Client.Foreign_endpoint Endpoint.{host = "127.0.0.1"; scheme = "http"; port}
+
+let test_local_rpc_server =
+  Test.register
+    ~__FILE__
+    ~title:"RPC local server"
+    ~tags:["rpc"; "process"; "local_server"]
+  @@ fun () ->
+  let node = Node.create ~rpc_local:true [] in
+  (* Register event watchers for local RPC server before the node is running to
+     ensure they will not be missed. *)
+  let local_event_promise =
+    wait_for_starting_rpc_server_event ~local:true node (Node.rpc_port node)
+  in
+  (* Register event watchers for process RPC server that will make the test
+     fail if detected. *)
+  let _ =
+    wait_for_starting_rpc_server_event
+      ~local:false
+      ~fail:"Process RPC server detected"
+      node
+      (Node.rpc_port node)
+  in
+  (* Run the node *)
+  let* () = Node.run node [] in
+  let* () = Node.wait_for_ready node in
+  Log.info "Node ready." ;
+  let* client = Client.init () in
+  Log.info "Checking if host is available." ;
+  let* _ =
+    Client.RPC.call ~endpoint:(make_endpoint (Node.rpc_port node)) client
+    @@ RPC.get_version
+  in
+  Log.info "Checking if local RPC server has been well started" ;
+  local_event_promise
+
+let test_process_rpc_server =
+  Test.register
+    ~__FILE__
+    ~title:"RPC process server"
+    ~tags:["rpc"; "process"; "local_server"]
+  @@ fun () ->
+  (* By default Node module start a process RPC server. *)
+  let node = Node.create [] in
+  (* Register event watchers for process RPC server before the node is running
+     to ensure they will not be missed. *)
+  let process_event_promise =
+    wait_for_starting_rpc_server_event ~local:false node (Node.rpc_port node)
+  in
+  (* Register event watchers for local RPC server that will make the test fail
+     if detected. *)
+  let _ =
+    wait_for_starting_rpc_server_event
+      ~local:true
+      ~fail:"Local RPC server detected"
+      node
+      (Node.rpc_port node)
+  in
+  (* Run the node *)
+  let* () = Node.run node [] in
+  let* () = Node.wait_for_ready node in
+  Log.info "Node ready." ;
+  let* client = Client.init () in
+  Log.info "Checking if host is available." ;
+  let* _ =
+    Client.RPC.call ~endpoint:(make_endpoint (Node.rpc_port node)) client
+    @@ RPC.get_version
+  in
+  Log.info "Checking if process RPC server has been well started" ;
+  process_event_promise
+
+let test_local_and_process_rpc_servers =
+  Test.register
+    ~__FILE__
+    ~title:"RPC local servers and process servers"
+    ~tags:["rpc"; "process"; "local_server"]
+  @@ fun () ->
+  (* Generate some ports for additional process RPC servers *)
+  let process_rpc_ports = List.init 2 (fun _ -> Port.fresh ()) in
+  (* Generate some ports for additional local RPC servers *)
+  let local_rpc_ports = List.init 2 (fun _ -> Port.fresh ()) in
+  let node =
+    Node.create
+      (List.map
+         (fun port -> Node.RPC_additional_addr (point_of_port port))
+         process_rpc_ports
+      @ List.map
+          (fun port -> Node.RPC_additional_addr_local (point_of_port port))
+          local_rpc_ports)
+  in
+  (* Add the RPC server created by default in Node module. *)
+  let process_rpc_ports = Node.rpc_port node :: process_rpc_ports in
+  (* Register event watchers for both process and local RPC servers before the
+     node is running to ensure they will not be missed. *)
+  let process_event_promises =
+    List.map
+      (wait_for_starting_rpc_server_event ~local:false node)
+      process_rpc_ports
+  in
+  let local_event_promises =
+    List.map
+      (wait_for_starting_rpc_server_event ~local:true node)
+      local_rpc_ports
+  in
+  (* Run the node *)
+  let* () = Node.run node [] in
+  let* () = Node.wait_for_ready node in
+  Log.info "Node ready." ;
+  let* client = Client.init () in
+  Log.info "Checking if process hosts are available." ;
+  let* _ =
+    Lwt_list.map_p
+      (fun port ->
+        Client.RPC.call ~endpoint:(make_endpoint port) client @@ RPC.get_version)
+      process_rpc_ports
+  in
+  Log.info "Checking if local hosts are available." ;
+  let* _ =
+    Lwt_list.map_p
+      (fun port ->
+        Client.RPC.call ~endpoint:(make_endpoint port) client @@ RPC.get_version)
+      local_rpc_ports
+  in
+  Log.info "Checking if process RPC servers have been well started" ;
+  let* () = Lwt_list.iter_p (fun p -> p) process_event_promises in
+  Log.info "Checking if local RPC servers have been well started" ;
+  Lwt_list.iter_p (fun p -> p) local_event_promises
+
 let register protocols =
   Regression.register
     ~__FILE__
diff --git a/tezt/tests/main.ml b/tezt/tests/main.ml
index d2d946f35d756610068eccc9429b278af42d1b3e..3148a4f700b73b697a93030e97113fcb6c631210 100644
--- a/tezt/tests/main.ml
+++ b/tezt/tests/main.ml
@@ -174,6 +174,7 @@ let register_protocol_tests_that_use_supports_correctly () =
   Protocol_limits.register ~protocols ;
   Proxy.register ~protocols ;
   Proxy_server_test.register ~protocols ;
+  Rpc_process.register ~protocols ;
   RPC_test.register protocols ;
   Rpc_versioning_attestation.register ~protocols ;
   Reject_malformed_micheline.register ~protocols ;
diff --git a/tezt/tests/p2p.ml b/tezt/tests/p2p.ml
index d84c72bcb5825a4bec33eb64b967bd912f856d61..a3f83eee39281cfce25e73d11cee8c0bb7c8170b 100644
--- a/tezt/tests/p2p.ml
+++ b/tezt/tests/p2p.ml
@@ -345,11 +345,21 @@ module Maintenance = struct
       max_target
       max_threshold
       max_connections ;
-    let* target_node = Node.init [Connections expected_connections] in
+    (* TODO: https://gitlab.com/tezos/tezos/-/issues/6442
+       This test launches 10 nodes and consumes a large amount of memory.
+       To reduce memory consumption each node launches its RPC server locally.
+       A better way to reduce memory consumption would be to use nodes that
+       only have the p2p layer. *)
+    let* target_node =
+      Node.init ~rpc_local:true [Connections expected_connections]
+    in
     let* target_client = Client.init ~endpoint:(Node target_node) () in
     Log.info "Target created." ;
     let nodes =
-      Cluster.create max_connections [Connections (max_connections - 1)]
+      Cluster.create
+        max_connections
+        ~rpc_local:true
+        [Connections (max_connections - 1)]
     in
     Cluster.clique nodes ;
     let* () = Cluster.start ~public:true nodes in
diff --git a/tezt/tests/rpc_process.ml b/tezt/tests/rpc_process.ml
new file mode 100644
index 0000000000000000000000000000000000000000..3e0d1b8dae1933e52c222788925cb699b4fb99d7
--- /dev/null
+++ b/tezt/tests/rpc_process.ml
@@ -0,0 +1,102 @@
+(*****************************************************************************)
+(*                                                                           *)
+(* Open Source License                                                       *)
+(* Copyright (c) 2023 Nomadic Labs                                           *)
+(*                                                                           *)
+(* Permission is hereby granted, free of charge, to any person obtaining a   *)
+(* copy of this software and associated documentation files (the "Software"),*)
+(* to deal in the Software without restriction, including without limitation *)
+(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
+(* and/or sell copies of the Software, and to permit persons to whom the     *)
+(* Software is furnished to do so, subject to the following conditions:     *)
+(*                                                                           *)
+(* The above copyright notice and this permission notice shall be included   *)
+(* in all copies or substantial portions of the Software.                    *)
+(*                                                                           *)
+(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
+(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
+(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
+(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
+(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
+(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
+(* DEALINGS IN THE SOFTWARE.                                                 *)
+(*                                                                           *)
+(*****************************************************************************)
+
+(* Testing
+   -------
+   Component: Node's RPC_process
+   Invocation: dune exec tezt/tests/main.exe -- --file rpc_process.ml
+   Subject: Tests the resilience of the RPC process
+*)
+
+let wait_for_RPC_process_pid node =
+  let filter json = JSON.(json |> as_int_opt) in
+  Node.wait_for node "rpc_process_started.v0" filter
+
+type signal = SIGABRT | SIGINT | SIGKILL | SIGQUIT | SIGTERM
+
+let signal_to_int = function
+  | SIGABRT -> Sys.sigabrt
+  | SIGINT -> Sys.sigint
+  | SIGKILL -> Sys.sigkill
+  | SIGQUIT -> Sys.sigquit
+  | SIGTERM -> Sys.sigterm
+
+let pp_signal ppf signal =
+  let str =
+    match signal with
+    | SIGABRT -> "sigabrt"
+    | SIGINT -> "sigint"
+    | SIGKILL -> "sigkill"
+    | SIGQUIT -> "sigquit"
+    | SIGTERM -> "sigterm"
+  in
+  Format.fprintf ppf "%s" str
+
+let kill_process ~pid ~signal =
+  Log.info "Kill the rpc process (pid %d) with signal %a" pid pp_signal signal ;
+  Unix.kill pid (signal_to_int signal)
+
+(* Fetches the head header through [client] and fails the test unless
+   its level is [expected_level]. *)
+let head_can_be_requested ~expected_level client =
+  let* json = Client.RPC.call client @@ RPC.get_chain_block_header () in
+  let level = JSON.(json |-> "level" |> as_int) in
+  Check.(
+    (level = expected_level) int ~error_msg:"Expected head level %R, got %L") ;
+  unit
+
+let test_kill =
+  Protocol.register_test
+    ~__FILE__
+    ~title:"RPC process kill"
+    ~tags:["rpc"; "process"; "kill"]
+  @@ fun protocol ->
+  let node = Node.create [] in
+  let wait_RPC_process_pid = wait_for_RPC_process_pid node in
+  let* () = Node.run node [] in
+  let* () = Node.wait_for_ready node in
+  let* client = Client.init ~endpoint:(Node node) () in
+  let* rpc_process_pid = wait_RPC_process_pid in
+  Log.info "RPC process is now running on pid %d" rpc_process_pid ;
+  let* () = Client.activate_protocol_and_wait ~protocol client in
+  Log.info "Wait for level 1" ;
+  let* (_ : int) = Node.wait_for_level node 1 in
+  Log.info "Head can be requested" ;
+  let* () = head_can_be_requested ~expected_level:1 client in
+  let wait_new_rpc_process_pid = wait_for_RPC_process_pid node in
+  let () = kill_process ~pid:rpc_process_pid ~signal:SIGKILL in
+  let* new_rpc_process_pid = wait_new_rpc_process_pid in
+  Log.info
+    "The new RPC process is now running on pid %d"
+    new_rpc_process_pid ;
+  Check.(
+    (new_rpc_process_pid <> rpc_process_pid)
+      int
+      ~error_msg:"Expected a new RPC process pid, got %L again") ;
+  Log.info "Head can still be requested" ;
+  let* () = head_can_be_requested ~expected_level:1 client in
+  unit
+
+let register ~protocols = test_kill protocols