From 6705b96ed3f79e006ae75f770184602a004d2a45 Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Wed, 31 Jul 2024 10:14:50 +0100 Subject: [PATCH] RPC: FIX: RPC middleware and resto use call_with_closefn() and sleep_fn() Co-authored-by: Diana Savvatina (Diana Savvatina was the main author of this, I merely rebased) Adding a callback for sleep_fn() requesting Cohttp to wait for EOF from the client. Adding a callback for conn_closed() allowing Cohttp to notify the RPC middleware if the client closes the connection or dies. RPC middleware requests and saves the close_fn callback which is called when the created connection should be closed. --- resto/src/server.ml | 10 ++- resto/src/server.mli | 17 ++-- src/bin_proxy_server/proxy_server_main_run.ml | 4 + src/lib_rpc_http/RPC_middleware.ml | 77 ++++++++++++++++--- src/lib_rpc_http/RPC_middleware.mli | 8 ++ src/lib_rpc_http/RPC_server.ml | 18 ++++- src/lib_rpc_process/forward_handler.ml | 4 +- src/lib_rpc_process/forward_handler.mli | 15 ++-- src/lib_rpc_process/main.ml | 5 ++ 9 files changed, 132 insertions(+), 26 deletions(-) diff --git a/resto/src/server.ml b/resto/src/server.ml index 75f9cac66e74..1eef5ed6fc44 100644 --- a/resto/src/server.ml +++ b/resto/src/server.ml @@ -529,7 +529,7 @@ module Make (Encoding : Resto.ENCODING) (Log : LOGGING) = struct (* Promise a running RPC server. 
*) let launch ?(host = "::") server ?(conn_closed = ignore) - ?(callback = resto_callback server) mode = + ?(callback = resto_callback server) ?sleep_fn mode = Conduit_lwt_unix.init ~src:host () >>= fun ctx -> let ctx = Cohttp_lwt_unix.Net.init ~ctx () in server.worker <- @@ -581,8 +581,12 @@ module Make (Encoding : Resto.ENCODING) (Log : LOGGING) = struct ~ctx ~mode ~on_exn - (Cohttp_lwt_unix.Server.make_response_action ~callback ~conn_closed ())) ; - Log.lwt_log_info "Server started (agent: %s)" server.agent + (Cohttp_lwt_unix.Server.make_response_action + ~callback + ~conn_closed + ?sleep_fn + ())) ; + Log.lwt_log_info "Resto server started (agent: %s)" server.agent let init_and_launch ?(host = "::") ?(cors = Cors.default) ?(agent = Agent.default_agent) ?(acl = Acl.Allow_all {except = []}) diff --git a/resto/src/server.mli b/resto/src/server.mli index 15dd0dd8eb98..2b3551a0e561 100644 --- a/resto/src/server.mli +++ b/resto/src/server.mli @@ -113,19 +113,26 @@ module Make (Encoding : Resto.ENCODING) (Log : LOGGING) : sig val resto_callback : server -> callback (** [launch server ?conn_closed ?callback listening_protocol] starts - the given resto [server] initiating the listening loop using the + the given Resto [server], initiating the listening loop using the [listening_protocol]. - @param [callback] overwrites (if given) the default handler of - resto each http query will be treated by. + @param [callback] overwrites (if given) the default handler of + Resto each HTTP query will be treated by. - @param [conn_closed] is an optional function that is called when - a connection is closed. *) + @param [conn_closed] is an optional function that is called when + a connection is closed. + + @param [sleep_fn] is an optional function used to actively wait + for EOF on the established connection. If not provided, EOF will not be + awaited while [callback] is executed. 
This can lead to inefficient + resource usage while serving an abandoned request or can lead to a + resource leak if the [callback] promise is never resolved for some reason. *) val launch : ?host:string -> server -> ?conn_closed:(Cohttp_lwt_unix.Server.conn -> unit) -> ?callback:callback -> + ?sleep_fn:(unit -> unit Lwt.t) -> Conduit_lwt_unix.server -> unit Lwt.t diff --git a/src/bin_proxy_server/proxy_server_main_run.ml b/src/bin_proxy_server/proxy_server_main_run.ml index 2eb5d9fc3882..c4ed9772bede 100644 --- a/src/bin_proxy_server/proxy_server_main_run.ml +++ b/src/bin_proxy_server/proxy_server_main_run.ml @@ -51,8 +51,12 @@ let launch_rpc_server dir {address; port; tls_cert_and_key; forwarding_endpoint} dir ~media_types:Tezos_rpc_http.Media_type.all_media_types in + let forwarder_resources = + Tezos_rpc_http_server.RPC_middleware.init_forwarder () + in let middleware = Tezos_rpc_http_server.RPC_middleware.proxy_server_query_forwarder + forwarder_resources forwarding_endpoint in let callback conn req body = diff --git a/src/lib_rpc_http/RPC_middleware.ml b/src/lib_rpc_http/RPC_middleware.ml index 4e376293d59d..f58555667d0e 100644 --- a/src/lib_rpc_http/RPC_middleware.ml +++ b/src/lib_rpc_http/RPC_middleware.ml @@ -37,10 +37,35 @@ module Events = struct ("path", Data_encoding.string) end -let make_transform_callback ?ctx ?forwarder_events forwarding_endpoint callback - conn req body = +module ForwarderConnMap = Map.Make (Cohttp.Connection) + +type forwarder_resources = { + mutable opened_conns : (unit -> unit) ForwarderConnMap.t; +} + +let src = Logs.Src.create "rpc_middleware" ~doc:"RPC middleware" + +module Log = (val Logs.src_log src : Logs.LOG) + +let init_forwarder () = {opened_conns = ForwarderConnMap.empty} + +let forwarding_conn_closed forwarder_resources (_, con) : unit = + if ForwarderConnMap.mem con forwarder_resources.opened_conns then + let callback = ForwarderConnMap.find con forwarder_resources.opened_conns in + match callback with + | Some f -> f 
() + | None -> + Log.warn (fun m -> + m + "(%s) got conn closed, shutdown callback not found" + (Cohttp.Connection.to_string con)) + else () + +let make_transform_callback ?ctx ?forwarder_events forwarder_resources + forwarding_endpoint callback conn req body = let open Lwt_syntax in let open Cohttp in + let _, con = conn in (* Using a [Cohttp_lwt.Body.t] destructs it. As we may need it twice, we explicitly clone the underlying [Lwt_stream.t]. *) let body_stream = Cohttp_lwt.Body.to_stream body in @@ -66,7 +91,7 @@ let make_transform_callback ?ctx ?forwarder_events forwarding_endpoint callback | `Expert (response, _) | `Response (response, _) -> Response.status response = `Not_found in - if answer_has_not_found_status answer then + if answer_has_not_found_status answer then ( let* () = match forwarder_events with | Some {on_forwarding; _} -> on_forwarding req @@ -98,17 +123,30 @@ let make_transform_callback ?ctx ?forwarder_events forwarding_endpoint callback let* () = Events.(emit forwarding_accepted_conn) ( Int32.of_int (Unix.getpid ()), - Cohttp.Connection.to_string (snd conn), + Cohttp.Connection.to_string con, Uri.to_string uri ) in - let* resp, body = - Cohttp_lwt_unix.Client.call + let* t, closefn = + Cohttp_lwt_unix.Client.call_with_closefn ?ctx ~headers ~body:(Cohttp_lwt.Body.of_stream body_stream) (Request.meth req) uri in + let shutdown () = + Log.debug (fun m -> + m + "(%s) got conn closed, closing bound conn for %s" + (Cohttp.Connection.to_string con) + (Uri.path uri)) ; + forwarder_resources.opened_conns <- + ForwarderConnMap.remove con forwarder_resources.opened_conns ; + closefn () + in + forwarder_resources.opened_conns <- + ForwarderConnMap.add con shutdown forwarder_resources.opened_conns ; + let* resp, body = t in let status = Response.status resp in let headers = Response.headers resp |> fun h -> @@ -116,8 +154,18 @@ let make_transform_callback ?ctx ?forwarder_events forwarding_endpoint callback Header.remove h "content-length" |> fun h -> 
Header.remove h "connection" in let* answer = Cohttp_lwt_unix.Server.respond ~headers ~status ~body () in - Lwt.return (`Response answer) + Lwt.return (`Response answer)) else + let shutdown = + Log.debug (fun m -> + m + "(%s) conn closed for locally handled, do nothing: %s" + (Cohttp.Connection.to_string con) + (Uri.path (Request.uri req))) ; + fun () -> () + in + forwarder_resources.opened_conns <- + ForwarderConnMap.add con shutdown forwarder_resources.opened_conns ; let* () = match forwarder_events with | Some {on_locally_handled; _} -> on_locally_handled req @@ -126,7 +174,7 @@ let make_transform_callback ?ctx ?forwarder_events forwarding_endpoint callback Lwt.return answer let make_transform_callback_with_acl ~acl ?ctx ?forwarder_events - forwarding_endpoint callback conn req body = + forwarder_resources forwarding_endpoint callback conn req body = let allowed = let path = Resto.Utils.decode_split_path (Uri.path @@ Cohttp.Request.uri req) @@ -139,6 +187,7 @@ let make_transform_callback_with_acl ~acl ?ctx ?forwarder_events make_transform_callback ?ctx ?forwarder_events + forwarder_resources forwarding_endpoint callback conn @@ -185,16 +234,22 @@ let rpc_metrics_transform_callback ~update_metrics dir callback conn req body = (* Otherwise, the call must be done anyway. *) do_call () -let proxy_server_query_forwarder ?acl ?ctx ?forwarder_events forwarding_endpoint - = +let proxy_server_query_forwarder ?acl ?ctx ?forwarder_events forwarder_resources + forwarding_endpoint = match acl with | Some acl -> make_transform_callback_with_acl ~acl ?ctx ?forwarder_events + forwarder_resources + forwarding_endpoint + | None -> + make_transform_callback + ?ctx + ?forwarder_events + forwarder_resources forwarding_endpoint - | None -> make_transform_callback ?ctx ?forwarder_events forwarding_endpoint module Http_cache_headers = struct (* Using Regex to parse the url path is dirty. 
Instead, we want to re-use diff --git a/src/lib_rpc_http/RPC_middleware.mli b/src/lib_rpc_http/RPC_middleware.mli index ea3661f85f3a..c732e524c35e 100644 --- a/src/lib_rpc_http/RPC_middleware.mli +++ b/src/lib_rpc_http/RPC_middleware.mli @@ -37,6 +37,13 @@ type forwarder_events = { (without any forwarding). *) } +type forwarder_resources + +val init_forwarder : unit -> forwarder_resources + +val forwarding_conn_closed : + forwarder_resources -> 'a * Cohttp.Connection.t -> unit + (** A Resto middleware that transforms any callback to an other that rewrites queries that the proxy server cannot handle and forwards them to the full node at the given [Uri.t]. If [acl] parameter is provided, the forwarding @@ -45,6 +52,7 @@ val proxy_server_query_forwarder : ?acl:RPC_server.Acl.t -> ?ctx:Cohttp_lwt_unix.Net.ctx -> ?forwarder_events:forwarder_events -> + forwarder_resources -> Uri.t -> RPC_server.callback -> RPC_server.callback diff --git a/src/lib_rpc_http/RPC_server.ml b/src/lib_rpc_http/RPC_server.ml index 50530986972d..05aa39349799 100644 --- a/src/lib_rpc_http/RPC_server.ml +++ b/src/lib_rpc_http/RPC_server.ml @@ -354,6 +354,16 @@ module Max_active_rpc_connections = struct | Limited limit -> Format.fprintf ppf "%l" limit end +(* [launch] starts a Resto server which will handle the incoming connections. + If the client dies or drops the connection while we are handling the request, + this can lead to resources being spent on a request which is already abandoned. + Some requests are potentially endless, e.g., streamed RPC. In this case, + the resources are never released unless we are actively waiting for EOF. + The check for EOF is done periodically with [sleep_fn]. + The selected period is 1 second which is a balance between CPU load for checks + and resource usage for abandoned requests. 
*) +let eof_active_wait_delay = 1.0 + let launch ?host server ?conn_closed ?callback ?(max_active_connections = Max_active_rpc_connections.default) mode = (* TODO: backport max_active_connections in resto *) @@ -361,4 +371,10 @@ let launch ?host server ?conn_closed ?callback | Unlimited -> () | Limited max_active_connections -> Conduit_lwt_unix.set_max_active max_active_connections) ; - launch ?host server ?conn_closed ?callback mode + launch + ?host + server + ?conn_closed + ?callback + ~sleep_fn:(fun () -> Lwt_unix.sleep eof_active_wait_delay) + mode diff --git a/src/lib_rpc_process/forward_handler.ml b/src/lib_rpc_process/forward_handler.ml index 73e8a5d8d96a..cf6976006a49 100644 --- a/src/lib_rpc_process/forward_handler.ml +++ b/src/lib_rpc_process/forward_handler.ml @@ -2,6 +2,7 @@ (* *) (* Open Source License *) (* Copyright (c) 2023 Nomadic Labs. *) +(* Copyright (c) 2024 TriliTech *) (* *) (* Permission is hereby granted, free of charge, to any person obtaining a *) (* copy of this software and associated documentation files (the "Software"),*) @@ -35,7 +36,7 @@ let build_socket_redirection_ctx socket_path = in Cohttp_lwt_unix.Client.custom_ctx ~resolver () -let callback ~acl server socket_path = +let callback ~acl server forwarder_resources socket_path = let callback (conn : Cohttp_lwt_unix.Server.conn) req body = Tezos_rpc_http_server.RPC_server.resto_callback server conn req body in @@ -51,5 +52,6 @@ let callback ~acl server socket_path = ~acl ~ctx ~forwarder_events:{on_forwarding; on_locally_handled} + forwarder_resources forwarding_endpoint callback diff --git a/src/lib_rpc_process/forward_handler.mli b/src/lib_rpc_process/forward_handler.mli index b4de110a1a80..870fd0c8f509 100644 --- a/src/lib_rpc_process/forward_handler.mli +++ b/src/lib_rpc_process/forward_handler.mli @@ -2,6 +2,7 @@ (* *) (* Open Source License *) (* Copyright (c) 2023 Nomadic Labs. 
*) +(* Copyright (c) 2024 TriliTech *) (* *) (* Permission is hereby granted, free of charge, to any person obtaining a *) (* copy of this software and associated documentation files (the "Software"),*) @@ -23,12 +24,16 @@ (* *) (*****************************************************************************) -(** [callback server socket_path] redirects all the traffic received by the given - [server] to the socket determined by [socket_path] if it fails to be - resolved locally. The forwarding happens only if the [acl] rules are - satisfied. *) +(** [callback server forwarder_resources socket_path] redirects all the traffic + received by the given [server] to the socket determined by [socket_path] if + it fails to be resolved locally. The forwarding happens only if the [acl] + rules are satisfied. *) val callback : - acl:RPC_server.Acl.t -> RPC_server.server -> string -> RPC_server.callback + acl:RPC_server.Acl.t -> + RPC_server.server -> + RPC_middleware.forwarder_resources -> + string -> + RPC_server.callback val socket_forwarding_uri : string diff --git a/src/lib_rpc_process/main.ml b/src/lib_rpc_process/main.ml index ffa3ffbbc82a..c835de639bc4 100644 --- a/src/lib_rpc_process/main.ml +++ b/src/lib_rpc_process/main.ml @@ -2,6 +2,7 @@ (* *) (* Open Source License *) (* Copyright (c) 2023 Nomadic Labs. 
*) +(* Copyright (c) 2024 TriliTech *) (* *) (* Permission is hereby granted, free of charge, to any person obtaining a *) (* copy of this software and associated documentation files (the "Software"),*) @@ -126,6 +127,7 @@ let launch_rpc_server dynamic_store (params : Parameters.t) (addr, port) ~media_types:(Media_type.Command_line.of_command_line media_types) dir in + let forwarder_resources = RPC_middleware.init_forwarder () in let callback (conn : Cohttp_lwt_unix.Server.conn) req body = let path = Cohttp.Request.uri req |> Uri.path in if path = "/metrics" then @@ -135,11 +137,13 @@ let launch_rpc_server dynamic_store (params : Parameters.t) (addr, port) Forward_handler.callback ~acl server + forwarder_resources params.rpc_comm_socket_path conn req body in + let conn_closed = RPC_middleware.forwarding_conn_closed forwarder_resources in let update_metrics uri meth callback = Prometheus.Summary.(time (labels rpc_metrics [uri; meth]) Sys.time) callback in @@ -170,6 +174,7 @@ let launch_rpc_server dynamic_store (params : Parameters.t) (addr, port) RPC_server.launch ~host server + ~conn_closed ~callback ~max_active_connections:params.config.rpc.max_active_rpc_connections mode -- GitLab