From af99358b88e2486d0a93bde0abd4bdd18d984076 Mon Sep 17 00:00:00 2001 From: Rodi-Can Bozman Date: Thu, 7 Nov 2024 11:07:42 +0100 Subject: [PATCH 1/9] EVM/Node: prepare subscribe related features to be dispatched --- etherlink/bin_node/lib_dev/services.ml | 53 ++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/etherlink/bin_node/lib_dev/services.ml b/etherlink/bin_node/lib_dev/services.ml index 55fe05a29d99..49fcc3b9b874 100644 --- a/etherlink/bin_node/lib_dev/services.ml +++ b/etherlink/bin_node/lib_dev/services.ml @@ -213,6 +213,59 @@ let block_transaction_count block = | TxHash l -> List.length l | TxFull l -> List.length l +type sub_stream = { + kind : Ethereum_types.Subscription.kind; + stream : Ethereum_types.Subscription.output Lwt_stream.t; + stopper : unit -> unit; +} + +let subscriptions : + (Ethereum_types.Subscription.id, sub_stream) Stdlib.Hashtbl.t = + (* 10 seems like a reasonable number since there is only + four types of subscription, and only `logs` make sense + to be sent multiple times. *) + Stdlib.Hashtbl.create 10 + +let () = Random.self_init () + +let encode_id bytes = + let id_hex = Hex.of_bytes bytes |> Hex.show in + let buf = Buffer.create (String.length id_hex) in + (* Trimming leading zeros. *) + String.fold_left + (fun only_zeros -> function + | '0' when only_zeros -> only_zeros + | c -> + Buffer.add_char buf c ; + false) + true + id_hex + |> ignore ; + Buffer.contents buf + +let make_id ~id = Ethereum_types.Subscription.(Id (Hex id)) + +(* [generate_id]'s implementation is inspired by geth's one. + See: + https://github.com/ethereum/go-ethereum/blob/master/rpc/subscription.go *) +let generate_id () = + let id = Bytes.make 16 '\000' in + Bytes.iteri (fun i _ -> Bytes.set_uint8 id i (Random.int 256)) id ; + encode_id id + +let eth_subscribe ~kind = + let id = make_id ~id:(generate_id ()) in + Stdlib.Hashtbl.add subscriptions id kind ; + id + +let eth_unsubscribe ~id = + match Stdlib.Hashtbl.find_opt subscriptions id with + | Some {stopper; _} -> + stopper () ; + Stdlib.Hashtbl.remove subscriptions id ; + true + | None -> false + let decode : type a. (module METHOD with type input = a) -> Data_encoding.json -> a = fun (module M) v -> Data_encoding.Json.destruct M.input_encoding v -- GitLab From acecfd3df28a3312aea0c3642d928b5e32275d18 Mon Sep 17 00:00:00 2001 From: Rodi-Can Bozman Date: Fri, 8 Nov 2024 11:17:30 +0100 Subject: [PATCH 2/9] EVM/Node: add a watcher for newHeads events --- etherlink/bin_node/lib_dev/evm_context.ml | 4 ++++ etherlink/bin_node/lib_dev/evm_context.mli | 3 +++ etherlink/bin_node/lib_dev/services.ml | 25 ++++++++++++++++++---- 3 files changed, 28 insertions(+), 4 deletions(-) diff --git a/etherlink/bin_node/lib_dev/evm_context.ml b/etherlink/bin_node/lib_dev/evm_context.ml index 1274d7e7a8c6..75c2a5d45ca5 100644 --- a/etherlink/bin_node/lib_dev/evm_context.ml +++ b/etherlink/bin_node/lib_dev/evm_context.ml @@ -132,6 +132,9 @@ let lock_data_dir ~data_dir = in return_unit +let head_watcher : Ethereum_types.Subscription.output Lwt_watcher.input = + Lwt_watcher.create_input () + module State = struct let current_blueprint_number ctxt = let (Qty next_blueprint_number) = ctxt.session.next_blueprint_number in @@ -759,6 +762,7 @@ module State = struct ctxt.session.context <- context ; ctxt.session.next_blueprint_number <- Qty (Z.succ level) ; ctxt.session.current_block_hash <- Ethereum_types.(block.hash) ; + Lwt_watcher.notify head_watcher (Ethereum_types.Subscription.NewHeads block) ; Option.iter (fun (split_level, split_timestamp) -> ctxt.session.last_split_block <- Some (split_level, split_timestamp)) diff --git a/etherlink/bin_node/lib_dev/evm_context.mli b/etherlink/bin_node/lib_dev/evm_context.mli index f61ad3cbb9f8..f940036ccce4 100644 --- a/etherlink/bin_node/lib_dev/evm_context.mli +++ b/etherlink/bin_node/lib_dev/evm_context.mli @@ -190,3 +190,6 @@ module State : sig (** Path of EVM state store. *) val store_path : data_dir:string -> string end + +(** Watcher that gets notified each time a new block is produced. *) +val head_watcher : Ethereum_types.Subscription.output Lwt_watcher.input diff --git a/etherlink/bin_node/lib_dev/services.ml b/etherlink/bin_node/lib_dev/services.ml index 49fcc3b9b874..2c3b01dd1ddf 100644 --- a/etherlink/bin_node/lib_dev/services.ml +++ b/etherlink/bin_node/lib_dev/services.ml @@ -253,16 +253,33 @@ let generate_id () = Bytes.iteri (fun i _ -> Bytes.set_uint8 id i (Random.int 256)) id ; encode_id id -let eth_subscribe ~kind = +let eth_subscribe ~(kind : Ethereum_types.Subscription.kind) = let id = make_id ~id:(generate_id ()) in - Stdlib.Hashtbl.add subscriptions id kind ; - id + let stream, stopper = + match kind with + | NewHeads -> Lwt_watcher.create_stream Evm_context.head_watcher + | Logs _ -> + (* TODO: https://gitlab.com/tezos/tezos/-/issues/7640 *) + Stdlib.failwith "The websocket event [logs] is not implemented yet." + | NewPendingTransactions -> + (* TODO: https://gitlab.com/tezos/tezos/-/issues/7641 *) + Stdlib.failwith + "The websocket event [newPendingTransactions] is not implemented yet." + | Syncing -> + (* TODO: https://gitlab.com/tezos/tezos/-/issues/7642 *) + Stdlib.failwith "The websocket event [syncing] is not implemented yet." + in + let stopper () = + Stdlib.Hashtbl.remove subscriptions id ; + Lwt_watcher.shutdown stopper + in + Stdlib.Hashtbl.add subscriptions id {kind; stream; stopper} ; + (id, (stream, stopper)) let eth_unsubscribe ~id = match Stdlib.Hashtbl.find_opt subscriptions id with | Some {stopper; _} -> stopper () ; - Stdlib.Hashtbl.remove subscriptions id ; true | None -> false -- GitLab From 0ca4b8ab68849524be8eb0b386d4c12f4150c775 Mon Sep 17 00:00:00 2001 From: Rodi-Can Bozman Date: Wed, 11 Dec 2024 10:31:26 +0100 Subject: [PATCH 3/9] EVM/Node: handle websocket stream/event Co-authored-by: default avatarAlain Mebsout --- etherlink/bin_node/lib_dev/services.ml | 62 +++++++++++++++++++++++--- 1 file changed, 55 insertions(+), 7 deletions(-) diff --git a/etherlink/bin_node/lib_dev/services.ml b/etherlink/bin_node/lib_dev/services.ml index 2c3b01dd1ddf..9be4bfc6cade 100644 --- a/etherlink/bin_node/lib_dev/services.ml +++ b/etherlink/bin_node/lib_dev/services.ml @@ -942,6 +942,54 @@ let dispatch_handler (rpc : Configuration.rpc) config ctx dispatch_request let websocket_response_of_response response = {response; subscription = None} +let encode_subscription_response subscription r = + let result = + Data_encoding.Json.construct Ethereum_types.Subscription.output_encoding r + in + Rpc_encodings.Subscription.{params = {result; subscription}} + +let empty_stream = + let stream, push = Lwt_stream.create () in + push None ; + (stream, fun () -> ()) + +let empty_sid = Ethereum_types.(Subscription.Id (Hex "")) + +let dispatch_websocket (rpc : Configuration.rpc) config ctx + (input : JSONRPC.request) = + let open Lwt_syntax in + match map_method_name ~restrict:rpc.restricted_rpcs input.method_ with + | Method (Subscribe.Method, module_) -> + let sub_stream = ref empty_stream in + let sid = ref empty_sid in + let f (kind : Ethereum_types.Subscription.kind) = + let id, stream = eth_subscribe ~kind in + (* This is an optimization to avoid having to search in the map + of subscriptions for `stream` and `id`. *) + sub_stream := stream ; + sid := id ; + rpc_ok id + in + let* value = build_with_input ~f module_ input.parameters in + let response = JSONRPC.{value; id = input.id} in + let subscription_id = !sid in + let stream, stopper = !sub_stream in + let stream = + Lwt_stream.map (encode_subscription_response subscription_id) stream + in + return + {response; subscription = Some {id = subscription_id; stream; stopper}} + | Method (Unsubscribe.Method, module_) -> + let f (id : Ethereum_types.Subscription.id) = + let status = eth_unsubscribe ~id in + rpc_ok status + in + let+ value = build_with_input ~f module_ input.parameters in + websocket_response_of_response JSONRPC.{value; id = input.id} + | _ -> + let+ response = dispatch_request rpc config ctx input in + websocket_response_of_response response + let dispatch_private_websocket ~block_production (rpc : Configuration.rpc) config ctx (input : JSONRPC.request) = let open Lwt_syntax in @@ -969,16 +1017,16 @@ let dispatch_private (rpc : Configuration.rpc) ~block_production config ctx dir (dispatch_private_request rpc ~block_production) let generic_websocket_dispatch (rpc : Configuration.rpc) - (config : Configuration.t) ctx dir path dispatch_request = + (config : Configuration.t) ctx dir path dispatch_websocket = if config.experimental_features.enable_websocket then - Evm_directory.jsonrpc_websocket_register dir path (fun request -> - let open Lwt_syntax in - let+ response = dispatch_request rpc config ctx request in - websocket_response_of_response response) + Evm_directory.jsonrpc_websocket_register + dir + path + (dispatch_websocket rpc config ctx) else dir let dispatch_websocket_public (rpc : Configuration.rpc) config ctx dir = - generic_websocket_dispatch rpc config ctx dir "/ws" dispatch_request + generic_websocket_dispatch rpc config ctx dir "/ws" dispatch_websocket let dispatch_websocket_private (rpc : Configuration.rpc) ~block_production config ctx dir = @@ -988,7 +1036,7 @@ let dispatch_websocket_private (rpc : Configuration.rpc) ~block_production ctx dir "/private/ws" - (dispatch_private_request ~block_production) + (dispatch_private_websocket ~block_production) let directory ?delegate_health_check_to rpc config ((module Rollup_node_rpc : Services_backend_sig.S), smart_rollup_address) = -- GitLab From 7a9a65104c7d279ea6c65398849d804917a5df83 Mon Sep 17 00:00:00 2001 From: Rodi-Can Bozman Date: Tue, 10 Dec 2024 16:02:23 +0100 Subject: [PATCH 4/9] EVM/Tezt: add helpers to subscribe/unsubscribe from websocket event --- etherlink/tezt/lib/rpc.ml | 41 ++++++++++++++++++++++++++++++++++++++ etherlink/tezt/lib/rpc.mli | 21 +++++++++++++++++++ 2 files changed, 62 insertions(+) diff --git a/etherlink/tezt/lib/rpc.ml b/etherlink/tezt/lib/rpc.ml index 777437720c8b..e812942b2295 100644 --- a/etherlink/tezt/lib/rpc.ml +++ b/etherlink/tezt/lib/rpc.ml @@ -213,6 +213,35 @@ module Request = struct } let coinbase = {method_ = "eth_coinbase"; parameters = `Null} + + type logs_input_param = {address : string; topics : string list} + + type subscription_kind = + | NewHeads + | Logs of logs_input_param + | NewPendingTransactions + | Syncing + + let param_of_sub_kind = function + | NewHeads -> `A [`String "newHeads"] + | Logs {address; topics} -> + `A + [ + `String "logs"; + `O + [ + ("address", `String address); + ("topics", `A (List.map (fun topic -> `String topic) topics)); + ]; + ] + | NewPendingTransactions -> `A [`String "newPendingTransactions"] + | Syncing -> `A [`String "syncing"] + + let eth_subscribe ~kind = + {method_ = "eth_subscribe"; parameters = param_of_sub_kind kind} + + let eth_unsubscribe ~id = + {method_ = "eth_unsubscribe"; parameters = `A [`String id]} end let net_version ?websocket evm_node = @@ -291,6 +320,18 @@ let get_gas_price ?websocket evm_node = in return JSON.(json |-> "result" |> as_string |> Int32.of_string) +let subscribe ?websocket ~kind evm_node = + let* json = + Evm_node.jsonrpc ?websocket evm_node (Request.eth_subscribe ~kind) + in + return JSON.(json |-> "result" |> as_string) + +let unsubscribe ?websocket ~id evm_node = + let* json = + Evm_node.jsonrpc ?websocket evm_node (Request.eth_unsubscribe ~id) + in + return JSON.(json |-> "result" |> as_bool) + module Syntax = struct let ( let*@ ) x f = let* r = x in diff --git a/etherlink/tezt/lib/rpc.mli b/etherlink/tezt/lib/rpc.mli index 740de96cc1a9..1d125f9b5032 100644 --- a/etherlink/tezt/lib/rpc.mli +++ b/etherlink/tezt/lib/rpc.mli @@ -59,6 +59,18 @@ module Request : sig block_count:string -> newest_block:string -> Evm_node.request val coinbase : Evm_node.request + + type logs_input_param = {address : string; topics : string list} + + type subscription_kind = + | NewHeads + | Logs of logs_input_param + | NewPendingTransactions + | Syncing + + val eth_subscribe : kind:subscription_kind -> Evm_node.request + + val eth_unsubscribe : id:string -> Evm_node.request end (** {2 RPC calls wrappers} @@ -116,6 +128,15 @@ val get_block_by_hash : val get_gas_price : ?websocket:Websocket.t -> Evm_node.t -> Int32.t Lwt.t +val subscribe : + ?websocket:Websocket.t -> + kind:Request.subscription_kind -> + Evm_node.t -> + string Lwt.t + +val unsubscribe : + ?websocket:Websocket.t -> id:string -> Evm_node.t -> bool Lwt.t + module Syntax : sig val ( let*@ ) : ('a, error) result Lwt.t -> ('a -> 'c Lwt.t) -> 'c Lwt.t -- GitLab From f6a1e951be1f0d77fbd89caaa05e1840d77a416a Mon Sep 17 00:00:00 2001 From: Rodi-Can Bozman Date: Wed, 11 Dec 2024 11:49:43 +0100 Subject: [PATCH 5/9] EVM/Tezt: enable websocket for observers --- etherlink/tezt/lib/setup.ml | 23 ++++++++++++++++++++--- etherlink/tezt/lib/setup.mli | 2 ++ 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/etherlink/tezt/lib/setup.ml b/etherlink/tezt/lib/setup.ml index 7adaadc2b39e..002e5fc89ae3 100644 --- a/etherlink/tezt/lib/setup.ml +++ b/etherlink/tezt/lib/setup.ml @@ -130,7 +130,7 @@ let run_new_rpc_endpoint evm_node = return rpc_node let run_new_observer_node ?(finalized_view = false) ?(patch_config = Fun.id) - ~sc_rollup_node evm_node = + ~sc_rollup_node ?rpc_server ?websockets evm_node = let preimages_dir = Evm_node.preimages_dir evm_node in let initial_kernel = Evm_node.initial_kernel evm_node in let patch_config = @@ -142,6 +142,14 @@ let run_new_observer_node ?(finalized_view = false) ?(patch_config = Fun.id) (patch_config json)) else patch_config in + let patch_config = + match rpc_server with + | None -> patch_config + | Some rpc_server -> + fun c -> + Evm_node.patch_config_with_experimental_feature ~rpc_server () + @@ patch_config c + in let* observer_mode = if Evm_node.supports_threshold_encryption evm_node then let bundler = @@ -167,7 +175,11 @@ let run_new_observer_node ?(finalized_view = false) ?(patch_config = Fun.id) }) in let* observer = - Evm_node.init ~patch_config ~mode:observer_mode (Evm_node.endpoint evm_node) + Evm_node.init + ~patch_config + ~mode:observer_mode + ?websockets + (Evm_node.endpoint evm_node) in return observer @@ -330,7 +342,12 @@ let setup_sequencer ?max_delayed_inbox_blueprint_length ?next_wasm_runtime (Sc_rollup_node.endpoint sc_rollup_node) in let* observer = - run_new_observer_node ~patch_config ~sc_rollup_node sequencer + run_new_observer_node + ~patch_config + ~sc_rollup_node + ?rpc_server + ?websockets + sequencer in let* proxy = Evm_node.init diff --git a/etherlink/tezt/lib/setup.mli b/etherlink/tezt/lib/setup.mli index a355b035e7ba..2c8a94c62c3f 100644 --- a/etherlink/tezt/lib/setup.mli +++ b/etherlink/tezt/lib/setup.mli @@ -47,6 +47,8 @@ val run_new_observer_node : ?finalized_view:bool -> ?patch_config:(Tezt_wrapper.JSON.t -> Tezt_wrapper.JSON.t) -> sc_rollup_node:Sc_rollup_node.t -> + ?rpc_server:Evm_node.rpc_server -> + ?websockets:bool -> Evm_node.t -> Evm_node.t Lwt.t -- GitLab From 68a25f65c198e215492992a422e0355b09fe849e Mon Sep 17 00:00:00 2001 From: Rodi-Can Bozman Date: Wed, 11 Dec 2024 11:06:49 +0100 Subject: [PATCH 6/9] EVM/Tezt: websocket subscription rpcs can't be called via http requests --- etherlink/tezt/tests/evm_sequencer.ml | 31 ++++++++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/etherlink/tezt/tests/evm_sequencer.ml b/etherlink/tezt/tests/evm_sequencer.ml index 06f912596a1e..2ef8effcaf22 100644 --- a/etherlink/tezt/tests/evm_sequencer.ml +++ b/etherlink/tezt/tests/evm_sequencer.ml @@ -8340,6 +8340,33 @@ let test_websocket_rpcs = ~error_msg:"eth_coinbase returned %L, expected %R" ; unit +let test_websocket_subscription_rpcs_cant_be_called_via_http_requests = + register_all + ~tags:["evm"; "rpc"; "websocket"; "http"] + ~title: + "Check that subscriptions rpcs can't be called via regular http requests" + ~time_between_blocks:Nothing + ~bootstrap_accounts: + ((Array.to_list Eth_account.bootstrap_accounts + |> List.map (fun a -> a.Eth_account.address)) + @ Eth_account.lots_of_address) + ~minimum_base_fee_per_gas:base_fee_for_hardcoded_tx + ~rpc_server:Dream (* Websockets only available in Dream *) + ~websockets:true + @@ fun {sequencer; _} _protocol -> + let* () = + Lwt.catch + (fun () -> + let* _ = Rpc.subscribe ~kind:NewHeads sequencer in + failwith "eth_subscribe shouldn't be callable via http requests") + (fun _ -> unit) + in + Lwt.catch + (fun () -> + let* _ = Rpc.unsubscribe ~id:"0x0" sequencer in + failwith "eth_unsubscribe shouldn't be callable via http requests") + (fun _ -> unit) + let protocols = Protocol.all let () = @@ -8452,4 +8479,6 @@ let () = test_inconsistent_da_fees protocols ; test_produce_block_with_no_delayed_transactions protocols ; test_observer_reset [Protocol.Alpha] ; - test_websocket_rpcs [Protocol.Alpha] + test_websocket_rpcs [Protocol.Alpha] ; + test_websocket_subscription_rpcs_cant_be_called_via_http_requests + [Protocol.Alpha] \ No newline at end of file -- GitLab From f2a5acf008145a485bbba09d153a47af5d9d3844 Mon Sep 17 00:00:00 2001 From: Rodi-Can Bozman Date: Tue, 10 Dec 2024 16:02:46 +0100 Subject: [PATCH 7/9] EVM/Tezt: test the newHeads websocket event --- etherlink/tezt/tests/evm_sequencer.ml | 58 ++++++++++++++++++++++++++- 1 file changed, 57 insertions(+), 1 deletion(-) diff --git a/etherlink/tezt/tests/evm_sequencer.ml b/etherlink/tezt/tests/evm_sequencer.ml index 2ef8effcaf22..5d7f15f3f0f0 100644 --- a/etherlink/tezt/tests/evm_sequencer.ml +++ b/etherlink/tezt/tests/evm_sequencer.ml @@ -8367,6 +8367,61 @@ let test_websocket_subscription_rpcs_cant_be_called_via_http_requests = failwith "eth_unsubscribe shouldn't be callable via http requests") (fun _ -> unit) +let test_websocket_newHeads_event = + register_all + ~tags:["evm"; "rpc"; "websocket"; "new_heads"] + ~title:"Check that websocket event `newHeads` is behaving correctly" + ~time_between_blocks:Nothing + ~bootstrap_accounts: + ((Array.to_list Eth_account.bootstrap_accounts + |> List.map (fun a -> a.Eth_account.address)) + @ Eth_account.lots_of_address) + ~minimum_base_fee_per_gas:base_fee_for_hardcoded_tx + ~rpc_server:Dream (* Websockets only available in Dream *) + ~websockets:true + @@ fun {sequencer; observer; _} _protocol -> + let scenario evm_node (lvl, lvl') = + let* websocket = Evm_node.open_websocket evm_node in + let* id = Rpc.subscribe ~websocket ~kind:NewHeads evm_node in + let check_block_number ~level = + let*@ _ = produce_block sequencer in + let* block = Websocket.recv websocket in + let Block.{number; _} = + JSON.(block |-> "params" |-> "result" |> Block.of_json) + in + Check.((number = level) int32) + ~error_msg:"Received block level was %L, expected %R" ; + unit + in + let* () = check_block_number ~level:lvl in + let* () = check_block_number ~level:lvl' in + let* sub_status = Rpc.unsubscribe ~websocket ~id evm_node in + Check.((sub_status = true) bool) + ~error_msg:"Unsubscription from newHeads should be successful" ; + (* After unsubbing to newHeads, we shouldn't receive data anymore. *) + let*@ _ = produce_block sequencer in + let* result = + Lwt.pick + [ + (let* never_return = Websocket.recv websocket in + return (Error never_return)); + (let* () = Lwt_unix.sleep 2. in + return (Ok ())); + ] + in + (match result with + | Ok () -> () + | Error _ -> failwith "The websocket shouldn't have received any new data.") ; + (* If we try to unsubscribe from this event again, it should return false as + we were already unsubbed. *) + let* sub_status = Rpc.unsubscribe ~websocket ~id evm_node in + Check.((sub_status = false) bool) + ~error_msg:"Unsubscribing from the same event twice should return false" ; + unit + in + let* () = scenario sequencer (1l, 2l) in + scenario observer (4l, 5l) + let protocols = Protocol.all let () = @@ -8481,4 +8536,5 @@ let () = test_observer_reset [Protocol.Alpha] ; test_websocket_rpcs [Protocol.Alpha] ; test_websocket_subscription_rpcs_cant_be_called_via_http_requests - [Protocol.Alpha] \ No newline at end of file + [Protocol.Alpha] ; + test_websocket_newHeads_event [Protocol.Alpha] -- GitLab From 82d26cf467cd76c04dc09850670d590c3d3aaf75 Mon Sep 17 00:00:00 2001 From: Alain Mebsout Date: Mon, 16 Dec 2024 13:52:44 +0100 Subject: [PATCH 8/9] EVM/Tezt: test clean up of subscription on disconnection --- etherlink/tezt/tests/evm_sequencer.ml | 36 ++++++++++++++++++++++++++- tezt/lib_tezos/websocket.ml | 3 ++- 2 files changed, 37 insertions(+), 2 deletions(-) diff --git a/etherlink/tezt/tests/evm_sequencer.ml b/etherlink/tezt/tests/evm_sequencer.ml index 5d7f15f3f0f0..1465da507e8d 100644 --- a/etherlink/tezt/tests/evm_sequencer.ml +++ b/etherlink/tezt/tests/evm_sequencer.ml @@ -8422,6 +8422,39 @@ let test_websocket_newHeads_event = let* () = scenario sequencer (1l, 2l) in scenario observer (4l, 5l) +(* This test is flaky because Dream may not correctly detect websocket + connections closed by the client. *) +let test_websocket_cleanup = + register_all + ~tags:["evm"; "rpc"; "websocket"; "cleanup"; Tag.flaky] + ~title:"Check that websocket subscriptions are cleaned up on close" + ~time_between_blocks:Nothing + ~bootstrap_accounts: + ((Array.to_list Eth_account.bootstrap_accounts + |> List.map (fun a -> a.Eth_account.address)) + @ Eth_account.lots_of_address) + ~minimum_base_fee_per_gas:base_fee_for_hardcoded_tx + ~rpc_server:Dream (* Websockets only available in Dream *) + ~websockets:true + @@ fun {sequencer; _} _protocol -> + let* websocket = Evm_node.open_websocket sequencer in + let* id1 = Rpc.subscribe ~websocket ~kind:NewHeads sequencer in + let* id2 = Rpc.subscribe ~websocket ~kind:NewHeads sequencer in + let*@ _ = produce_block sequencer in + let*@ _ = produce_block sequencer in + Log.info "Closing websocket" ; + let* () = Websocket.close websocket in + let* () = Lwt_unix.sleep 1. in + Log.info "New websocket connection" ; + let* websocket = Evm_node.open_websocket sequencer in + let* sub_status = Rpc.unsubscribe ~websocket ~id:id2 sequencer in + Check.((sub_status = false) bool) + ~error_msg:"Subscription should have been cleaned up from node" ; + let* sub_status = Rpc.unsubscribe ~websocket ~id:id1 sequencer in + Check.((sub_status = false) bool) + ~error_msg:"All subscriptions should have been cleaned up from node" ; + unit + let protocols = Protocol.all let () = @@ -8537,4 +8570,5 @@ let () = test_websocket_rpcs [Protocol.Alpha] ; test_websocket_subscription_rpcs_cant_be_called_via_http_requests [Protocol.Alpha] ; - test_websocket_newHeads_event [Protocol.Alpha] + test_websocket_newHeads_event [Protocol.Alpha] ; + test_websocket_cleanup [Protocol.Alpha] diff --git a/tezt/lib_tezos/websocket.ml b/tezt/lib_tezos/websocket.ml index 61ed1574276b..61fd9970124e 100644 --- a/tezt/lib_tezos/websocket.ml +++ b/tezt/lib_tezos/websocket.ml @@ -25,7 +25,7 @@ let connect ?runner ?hooks ?name url = in let url = Uri.with_scheme (Uri.of_string url) (Some "ws") |> Uri.to_string in let process, stdin = - Process.spawn_with_stdin ~name ?runner ?hooks "websocat" [url] + Process.spawn_with_stdin ~name ?runner ?hooks "websocat" [url; "-E"] in let () = try Unix.kill (Process.pid process) 0 @@ -63,6 +63,7 @@ let read_json ~origin {process; _} = let close ws = let* () = Lwt_io.close ws.stdin in Process.terminate ws.process ; + let* _ = Process.wait ws.process in unit let send = -- GitLab From b759edcb1028dd5edf98d0ac1566ae09e0cef0a8 Mon Sep 17 00:00:00 2001 From: Rodi-Can Bozman Date: Tue, 17 Dec 2024 10:31:07 +0100 Subject: [PATCH 9/9] EVM/Changes: add an entry in Experimental --- etherlink/CHANGES_NODE.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/etherlink/CHANGES_NODE.md b/etherlink/CHANGES_NODE.md index 0e01cb4dcd54..4fa02945af44 100644 --- a/etherlink/CHANGES_NODE.md +++ b/etherlink/CHANGES_NODE.md @@ -82,7 +82,9 @@ you start using them, you probably want to use `octez-evm-node check config - Experimental support for alternative RPC server backend [Dream](https://aantron.github.io/dream) with feature flag `experimental_features.rpc_server = "dream"`. (!15560) - +- Added support for the WebSocket event `newHeads`, allowing clients to receive + real-time notifications of new blocks. (!15899) + ### Bug fixes #### RPCs -- GitLab