From fc690d812e88f67b7d76cf1579ff22f13c92794d Mon Sep 17 00:00:00 2001 From: Thomas Letan Date: Sat, 2 Aug 2025 20:34:02 +0200 Subject: [PATCH] EVM Node: Use close function for monitoring streams * What This patch refactors the EVM node message monitoring service to include an explicit close function. This ensures that the underlying connection used for streaming blueprints is properly terminated when the stream is no longer needed. * Why Previously, the `blueprints_follower` would connect to a streamed RPC to monitor new blueprints. However, the close function returned by that RPC call was discarded, so the connection was never explicitly closed. This could lead to resource leaks, such as dangling network connections, especially when the follower loop restarted due to timeouts or errors. * How A new generic `'a monitor` type is introduced in `Evm_services`. This type encapsulates both a data stream (`Lwt_stream.t`) and a `closefn` function. The `monitor_messages` service is updated to capture the close handle from the underlying RPC call and return it within this new `monitor` record. Consequently, the `blueprints_follower` is updated to work with this `monitor` type instead of a raw stream, reading from it through `Evm_services.get_from_monitor`. It now explicitly calls `Evm_services.close_monitor` on the exit paths of its processing loop (on timeout, end of stream, error, or restart), ensuring the connection is cleanly terminated. --- etherlink/CHANGES_NODE.md | 3 +++ .../bin_node/lib_dev/blueprints_follower.ml | 18 +++++++++++------- etherlink/bin_node/lib_dev/evm_services.ml | 10 ++++++++-- etherlink/bin_node/lib_dev/evm_services.mli | 8 +++++++- 4 files changed, 29 insertions(+), 10 deletions(-) diff --git a/etherlink/CHANGES_NODE.md b/etherlink/CHANGES_NODE.md index 279cc1ca18c7..2bfcababaea5 100644 --- a/etherlink/CHANGES_NODE.md +++ b/etherlink/CHANGES_NODE.md @@ -16,6 +16,9 @@ - Makes the sequencer resilient to GCP KMS authentication tokens invalidation. 
(!18901) +- Fixes a connection leak in the blueprint follower where the monitoring stream + was not closed when the follower restarted after a timeout or an error. This + prevents resource exhaustion and improves the node's long-term stability. ### Storage changes diff --git a/etherlink/bin_node/lib_dev/blueprints_follower.ml b/etherlink/bin_node/lib_dev/blueprints_follower.ml index b8eb190b1d2d..4135c9961b6c 100644 --- a/etherlink/bin_node/lib_dev/blueprints_follower.ml +++ b/etherlink/bin_node/lib_dev/blueprints_follower.ml @@ -218,12 +218,12 @@ let rec catchup ~multichain ~next_blueprint_number ~first_connection params : in match call_result with - | Ok blueprints_stream -> + | Ok monitor -> (stream_loop [@tailcall]) ~multichain next_blueprint_number params - blueprints_stream + monitor | Error _ -> (catchup [@tailcall]) ~multichain @@ -231,13 +231,13 @@ let rec catchup ~multichain ~next_blueprint_number ~first_connection params : ~first_connection:false params) -and stream_loop ~multichain (Qty next_blueprint_number) params stream = +and stream_loop ~multichain (Qty next_blueprint_number) params monitor = let open Lwt_result_syntax in Metrics.stop_bootstrapping () ; let*! candidate = Lwt.pick [ - (let*! res = Lwt_stream.get stream in + (let*!
res = Evm_services.get_from_monitor monitor in return res); timeout_from_tbb params.time_between_blocks; ] @@ -251,7 +251,7 @@ and stream_loop ~multichain (Qty next_blueprint_number) params stream = ~multichain (Qty next_blueprint_number) params - stream + monitor | Ok (Some (Blueprint blueprint)) -> ( let* r = params.on_new_blueprint (Qty next_blueprint_number) blueprint in let* () = @@ -264,8 +264,9 @@ and stream_loop ~multichain (Qty next_blueprint_number) params stream = ~multichain (Qty (Z.succ next_blueprint_number)) params - stream + monitor | `Restart_from level -> + Evm_services.close_monitor monitor ; (catchup [@tailcall]) ~multichain ~next_blueprint_number:level @@ -276,12 +277,15 @@ and stream_loop ~multichain (Qty next_blueprint_number) params stream = true params) | Ok None | Error [Timeout] -> + Evm_services.close_monitor monitor ; (catchup [@tailcall]) ~multichain ~next_blueprint_number:(Qty next_blueprint_number) ~first_connection:false params - | Error err -> fail err + | Error err -> + Evm_services.close_monitor monitor ; + fail err let start ?(ping_tx_pool = true) ~multichain ~time_between_blocks ~evm_node_endpoint ~next_blueprint_number ~on_new_blueprint diff --git a/etherlink/bin_node/lib_dev/evm_services.ml b/etherlink/bin_node/lib_dev/evm_services.ml index 053240a0e3f0..239a53662d21 100644 --- a/etherlink/bin_node/lib_dev/evm_services.ml +++ b/etherlink/bin_node/lib_dev/evm_services.ml @@ -371,11 +371,17 @@ let monitor_blueprints ~evm_node_endpoint Ethereum_types.(Qty level) = in return stream +type 'a monitor = {stream : 'a Lwt_stream.t; closefn : unit -> unit} + +let close_monitor {closefn; _} = closefn () + +let get_from_monitor {stream; _} = Lwt_stream.get stream + let monitor_messages ~evm_node_endpoint Ethereum_types.(Qty level) = let open Lwt_result_syntax in let stream, push = Lwt_stream.create () in let on_chunk v = push (Some v) and on_close () = push None in - let* _spill_all = + let* closefn = 
Tezos_rpc_http_client_unix.RPC_client_unix.call_streamed_service [Media_type.octet_stream] ~base:evm_node_endpoint @@ -386,4 +392,4 @@ let monitor_messages ~evm_node_endpoint Ethereum_types.(Qty level) = (Z.to_int64 level) () in - return stream + return {stream; closefn} diff --git a/etherlink/bin_node/lib_dev/evm_services.mli b/etherlink/bin_node/lib_dev/evm_services.mli index 345807579bdd..96d1ec065431 100644 --- a/etherlink/bin_node/lib_dev/evm_services.mli +++ b/etherlink/bin_node/lib_dev/evm_services.mli @@ -5,6 +5,12 @@ (* *) (*****************************************************************************) +type 'a monitor + +val close_monitor : 'a monitor -> unit + +val get_from_monitor : 'a monitor -> 'a option Lwt.t + val get_smart_rollup_address : keep_alive:bool -> evm_node_endpoint:Uri.t -> @@ -60,4 +66,4 @@ val monitor_blueprints : val monitor_messages : evm_node_endpoint:Uri.t -> Ethereum_types.quantity -> - Broadcast.message Lwt_stream.t tzresult Lwt.t + Broadcast.message monitor tzresult Lwt.t -- GitLab