diff --git a/manifest/product_octez.ml b/manifest/product_octez.ml index 207afc99ce8ed848eec54ae0b1e82edc47bd3d49..8f01a0beb07964cefe04e5e4bc4a0c3cc1a38f2a 100644 --- a/manifest/product_octez.ml +++ b/manifest/product_octez.ml @@ -4745,6 +4745,10 @@ let octez_rpc_process = octez_node_config |> open_; octez_rpc_http |> open_; octez_rpc_http_server |> open_; + octez_rpc_http_client_unix |> open_; + octez_rpc_http_client |> open_; + octez_shell_services; + octez_store |> open_; lwt_unix; lwt_exit; prometheus_app; diff --git a/src/lib_rpc_process/directory.ml b/src/lib_rpc_process/directory.ml index 6c838511f128a733db54688ec67a47a2e4c3111a..d5c80065eb18c72227b9fedb0cd0b5b9fd82c17c 100644 --- a/src/lib_rpc_process/directory.ml +++ b/src/lib_rpc_process/directory.ml @@ -5,16 +5,16 @@ (* *) (*****************************************************************************) -let build_rpc_directory node_version config = - let dir = Tezos_shell.Version_directory.rpc_directory node_version in - let dir = +let build_rpc_directory node_version config _store = + let static_dir = Tezos_shell.Version_directory.rpc_directory node_version in + let static_dir = Tezos_shell.Config_directory.build_rpc_directory_for_rpc_process ~user_activated_upgrades: config.Config_file.blockchain_network.user_activated_upgrades ~user_activated_protocol_overrides: config.blockchain_network.user_activated_protocol_overrides ~dal_config:config.blockchain_network.dal_config - dir + static_dir in - Tezos_rpc.Directory.register0 dir Node_services.S.config (fun () () -> + Tezos_rpc.Directory.register0 static_dir Node_services.S.config (fun () () -> Lwt.return_ok config) diff --git a/src/lib_rpc_process/directory.mli b/src/lib_rpc_process/directory.mli index ee21ec7c5f3c642389084fbd2a3e85c53286c975..546fa8d0b97f108a0d665d1ecb84496ccd6c8878 100644 --- a/src/lib_rpc_process/directory.mli +++ b/src/lib_rpc_process/directory.mli @@ -5,10 +5,12 @@ (* *) (*****************************************************************************) -(** [build_rpc_directory node_version config] builds the Tezos RPC directory for - the rpc process. RPCs handled here are not forwarded to the node. +(** [build_rpc_directory node_version config store] builds the + Tezos RPC directory for the rpc process. RPCs handled here are not + forwarded to the node. *) val build_rpc_directory : Tezos_version.Octez_node_version.t -> Octez_node_config.Config_file.t -> + Store.t option ref -> unit Tezos_rpc.Directory.t diff --git a/src/lib_rpc_process/dune b/src/lib_rpc_process/dune index 7cf83b04e7255fded5e9e586649c3c574efd5153..08ac42481b8f968ebba29aff23cb50826e767c84 100644 --- a/src/lib_rpc_process/dune +++ b/src/lib_rpc_process/dune @@ -12,6 +12,10 @@ octez-node-config octez-libs.rpc-http octez-libs.rpc-http-server + octez-libs.rpc-http-client-unix + octez-libs.rpc-http-client + octez-shell-libs.shell-services + octez-shell-libs.store lwt.unix lwt-exit prometheus-app) @@ -23,4 +27,7 @@ -open Tezos_base_unix -open Octez_node_config -open Tezos_rpc_http - -open Tezos_rpc_http_server)) + -open Tezos_rpc_http_server + -open Tezos_rpc_http_client_unix + -open Tezos_rpc_http_client + -open Tezos_store)) diff --git a/src/lib_rpc_process/forward_handler.mli b/src/lib_rpc_process/forward_handler.mli index 3141e835dc2fd41b7540c7efdc5a8c6acdcef81b..b4de110a1a80d0737ec5d38e3086b99027586569 100644 --- a/src/lib_rpc_process/forward_handler.mli +++ b/src/lib_rpc_process/forward_handler.mli @@ -29,3 +29,7 @@ satisfied. *) val callback : acl:RPC_server.Acl.t -> RPC_server.server -> string -> RPC_server.callback + +val socket_forwarding_uri : string + +val build_socket_redirection_ctx : string -> Cohttp_lwt_unix.Net.ctx diff --git a/src/lib_rpc_process/head_daemon.ml b/src/lib_rpc_process/head_daemon.ml new file mode 100644 index 0000000000000000000000000000000000000000..2e97a69a8856b41eef1d0dd9545c8334da429900 --- /dev/null +++ b/src/lib_rpc_process/head_daemon.ml @@ -0,0 +1,119 @@ +(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* Copyright (c) 2024 Nomadic Labs *) +(* *) +(*****************************************************************************) + +open Parameters + +module Events = struct + include Internal_event.Simple + + let section = ["octez_rpc_server"] + + let daemon_error = + declare_1 + ~section + ~name:"octez_rpc_server_daemon_error" + ~msg:"Daemon thrown an error: {error}" + ~level:Notice + ~pp1:Error_monad.pp_print_trace + ("error", Error_monad.trace_encoding) + + let new_head = + declare_1 + ~section + ~name:"new_head" + ~msg:"New head received at level ({level})" + ~level:Notice + ("level", Data_encoding.int32) + + let synchronized = + declare_1 + ~section + ~name:"synchronized" + ~msg:"Store synchronized up to level {level}" + ~level:Notice + ("level", Data_encoding.int32) + + let shutting_head_daemon = + declare_0 + ~section + ~name:"shutting_head_daemon" + ~msg:"shutting down head daemon" + ~level:Info + () +end + +module Daemon = struct + type t = { + daemon : unit tzresult Lwt.t; + head_stream_stopper : Tezos_rpc.Context.stopper; + } + + (** [make_stream_daemon ~on_head ~head_stream] calls [on_head] on + each newly received value from [head_stream]. + + It returns a couple [(p, stopper)] where [p] is a promise + resolving when the stream closes and [stopper] is a function + closing the stream. *) + let make_stream_daemon ~on_head ~head_stream = + let open Lwt_result_syntax in + let* head_stream, head_stream_stopper = head_stream in + let rec stream_processor () = + let*! head_element = Lwt_stream.get head_stream in + match head_element with + | None -> return_unit + | Some element -> + let*! processed_head = on_head element in + let*! () = + match processed_head with + | Ok () -> Lwt.return_unit + | Error trace -> Events.(emit daemon_error) trace + in + stream_processor () + in + return {daemon = stream_processor (); head_stream_stopper} + + let shutdown {head_stream_stopper; _} = + let open Lwt_syntax in + let* () = Events.(emit shutting_head_daemon) () in + head_stream_stopper () ; + return_unit +end + +let handle_new_head _dynamic_store _parameters + (_block_hash, (header : Tezos_base.Block_header.t)) = + let open Lwt_result_syntax in + let*! () = Events.(emit new_head) header.shell.level in + (* TODO: Synchronize the store *) + return_unit + +let init dynamic_store parameters = + let ctx = + Forward_handler.build_socket_redirection_ctx parameters.rpc_comm_socket_path + in + let module CustomRetryClient = struct + include RPC_client_unix.RetryClient + + let call ?ctx:_ = call ~ctx + end in + let module Custom_rpc_client = + RPC_client.Make (Resto_cohttp_client.Client.OfCohttp (CustomRetryClient)) in + let rpc_config = + Custom_rpc_client. + { + media_type = Media_type.Command_line.Any; + endpoint = Uri.of_string Forward_handler.socket_forwarding_uri; + logger = null_logger; + } + in + let rpc_ctxt = + new Custom_rpc_client.http_ctxt + rpc_config + (Media_type.Command_line.of_command_line rpc_config.media_type) + in + Daemon.make_stream_daemon + ~on_head:(handle_new_head dynamic_store parameters) + ~head_stream:(Tezos_shell_services.Monitor_services.heads rpc_ctxt `Main) diff --git a/src/lib_rpc_process/main.ml b/src/lib_rpc_process/main.ml index 60fa896ef9ac189c91804925ffbefc18578f5261..de14ad765f9d837f01c206a2ba9bbcd61b20f9bd 100644 --- a/src/lib_rpc_process/main.ml +++ b/src/lib_rpc_process/main.ml @@ -66,8 +66,7 @@ let sanitize_cors_headers ~default headers = |> String.Set.(union (of_list default)) |> String.Set.elements -let launch_rpc_server (params : Parameters.t) (addr, port) = - let open Config_file in +let launch_rpc_server dynamic_store (params : Parameters.t) (addr, port) = let open Lwt_result_syntax in let media_types = params.config.rpc.media_type in let*! acl_policy = @@ -101,7 +100,12 @@ let launch_rpc_server (params : Parameters.t) (addr, port) = allowed_headers = cors_headers; } in - let dir = Directory.build_rpc_directory params.node_version params.config in + let dir = + Directory.build_rpc_directory + params.node_version + params.config + dynamic_store + in let server = RPC_server.init_server ~cors @@ -131,7 +135,7 @@ let launch_rpc_server (params : Parameters.t) (addr, port) = tzfail (RPC_Process_Port_already_in_use [(addr, port)]) | exn -> fail_with_exn exn) -let init_rpc parameters = +let init_rpc dynamic_store parameters = let open Lwt_result_syntax in let* server = let* p2p_point = @@ -146,7 +150,7 @@ let init_rpc parameters = assert false in match p2p_point with - | [point] -> launch_rpc_server parameters point + | [point] -> launch_rpc_server dynamic_store parameters point | _ -> (* Same as above: only one p2p_point is expected here. *) assert false @@ -190,11 +194,16 @@ let run socket_dir = ~config:parameters.Parameters.internal_events () in - let* () = init_rpc parameters in - (* Send the params ack as synchronization barrier for the init_rpc + let dynamic_store : Store.t option ref = ref None in + let* () = init_rpc dynamic_store parameters in + (* Send the config ack as synchronisation barrier for the init_rpc phase. *) let* () = Socket.send init_socket_fd Data_encoding.unit () in - let*! () = Lwt_unix.close init_socket_fd in + let* daemon = Head_daemon.init dynamic_store parameters in + let (_ccid : Lwt_exit.clean_up_callback_id) = + Lwt_exit.register_clean_up_callback ~loc:__LOC__ (fun _ -> + Head_daemon.Daemon.shutdown daemon) + in Lwt_utils.never_ending () let process socket_dir =