diff --git a/etherlink/CHANGES_NODE.md b/etherlink/CHANGES_NODE.md index 1c191df55ae7eeecfcec1aec877f6093c006525b..0e01cb4dcd54febc41679f036d0969ac5a2365cf 100644 --- a/etherlink/CHANGES_NODE.md +++ b/etherlink/CHANGES_NODE.md @@ -29,6 +29,12 @@ - Fixes a Wasm Runtime host function that would copy only instead of moving a subtree. (!16009) +#### Experimental + +- Experimental support for websockets on endpoints (`/ws` and `/private/ws`) for + JSON-RPC requests with feature flag `experimental_features.enable_websocket = + true`. (!15566) + ### Internals ## Version 0.11 (2024-12-111) diff --git a/etherlink/bin_node/lib_dev/events.ml b/etherlink/bin_node/lib_dev/events.ml index e69fc268c5adfa27f06cebd964cf6b33607c95f1..c9bc6f54e6ebb9b1a249d67a069bbacff553d88a 100644 --- a/etherlink/bin_node/lib_dev/events.ml +++ b/etherlink/bin_node/lib_dev/events.ml @@ -66,26 +66,34 @@ let catching_up_evm_event = ("to", Data_encoding.int32) let event_is_ready = - Internal_event.Simple.declare_3 + Internal_event.Simple.declare_4 ~section ~name:"is_ready" - ~msg:"the EVM node RPC server ({backend}) is listening to {addr}:{port}" + ~msg: + "the EVM node RPC server ({backend}) is listening to {addr}:{port} \ + {websockets}" ~level:Notice ("addr", Data_encoding.string) ("port", Data_encoding.uint16) ("backend", Configuration.rpc_server_encoding) + ("websockets", Data_encoding.bool) + ~pp4:(fun fmt b -> + (if b then Format.fprintf else Format.ifprintf) fmt "(websockets enabled)") let event_private_server_is_ready = - declare_3 + declare_4 ~section ~name:"private_server_is_ready" ~msg: "the EVM node private RPC server ({backend}) is listening to \ - {addr}:{port}" + {addr}:{port} {websockets}" ~level:Notice ("addr", Data_encoding.string) ("port", Data_encoding.uint16) ("backend", Configuration.rpc_server_encoding) + ("websockets", Data_encoding.bool) + ~pp4:(fun fmt b -> + (if b then Format.fprintf else Format.ifprintf) fmt "(websockets enabled)") let event_rpc_server_error = declare_1 @@ 
-259,11 +267,11 @@ let ignored_kernel_arg () = emit ignored_kernel_arg () let catching_up_evm_event ~from ~to_ = emit catching_up_evm_event (from, to_) -let is_ready ~rpc_addr ~rpc_port ~backend = - emit event_is_ready (rpc_addr, rpc_port, backend) +let is_ready ~rpc_addr ~rpc_port ~websockets ~backend = + emit event_is_ready (rpc_addr, rpc_port, backend, websockets) -let private_server_is_ready ~rpc_addr ~rpc_port ~backend = - emit event_private_server_is_ready (rpc_addr, rpc_port, backend) +let private_server_is_ready ~rpc_addr ~rpc_port ~websockets ~backend = + emit event_private_server_is_ready (rpc_addr, rpc_port, backend, websockets) let rpc_server_error exn = emit__dont_wait__use_with_care event_rpc_server_error (Printexc.to_string exn) diff --git a/etherlink/bin_node/lib_dev/events.mli b/etherlink/bin_node/lib_dev/events.mli index 5f30c175e23dd5d64db6554ac4f858aa28425ec9..c81f31327aa861a364cbcdbe90f0c845a2f338f7 100644 --- a/etherlink/bin_node/lib_dev/events.mli +++ b/etherlink/bin_node/lib_dev/events.mli @@ -38,20 +38,22 @@ val ignored_kernel_arg : unit -> unit Lwt.t node from L1 level [from] to [to_]. *) val catching_up_evm_event : from:int32 -> to_:int32 -> unit Lwt.t -(** [is_ready ~rpc_addr ~rpc_port ~backend] advertises that the +(** [is_ready ~rpc_addr ~rpc_port ~websockets ~backend] advertises that the sequencer is ready and listens to [rpc_addr]:[rpc_port]. *) val is_ready : rpc_addr:string -> rpc_port:int -> + websockets:bool -> backend:Configuration.rpc_server -> unit Lwt.t -(** [private_server_is_ready ~rpc_addr ~rpc_port ~backend] +(** [private_server_is_ready ~rpc_addr ~rpc_port ~websockets ~backend] advertises that the private rpc server is ready and listens to [rpc_addr]:[rpc_port]. 
*) val private_server_is_ready : rpc_addr:string -> rpc_port:int -> + websockets:bool -> backend:Configuration.rpc_server -> unit Lwt.t diff --git a/etherlink/bin_node/lib_dev/evm_directory.ml b/etherlink/bin_node/lib_dev/evm_directory.ml index 1238dc1ad404fc644c71ee9fb592b4c1e9c664a4..5e4019920dd24b640fef60038ece776e6e8c96cd 100644 --- a/etherlink/bin_node/lib_dev/evm_directory.ml +++ b/etherlink/bin_node/lib_dev/evm_directory.ml @@ -6,15 +6,33 @@ (* *) (*****************************************************************************) -type t = Resto of unit Tezos_rpc.Directory.t | Dream of Dream.route list +module EndpointMap = Map.Make (struct + type t = Cohttp.Code.meth * string + + let compare = Stdlib.compare +end) + +type resto_dir = { + dir : unit Tezos_rpc.Directory.t; + extra : + (Cohttp_lwt_unix.Server.conn -> + Cohttp.Request.t -> + Cohttp_lwt.Body.t -> + Cohttp_lwt_unix.Server.response_action Lwt.t) + EndpointMap.t; +} + +type t = Resto of resto_dir | Dream of Dream.route list let empty = function - | Configuration.Resto -> Resto Tezos_rpc.Directory.empty + | Configuration.Resto -> + Resto {dir = Tezos_rpc.Directory.empty; extra = EndpointMap.empty} | Configuration.Dream -> Dream [] let register dir service handler = match dir with - | Resto dir -> Resto (Tezos_rpc.Directory.register dir service handler) + | Resto {dir; extra} -> + Resto {dir = Tezos_rpc.Directory.register dir service handler; extra} | Dream routes -> let route = Router.make_tz_route service (fun ~params ~query input -> @@ -24,7 +42,8 @@ let register dir service handler = let opt_register dir service handler = match dir with - | Resto dir -> Resto (Tezos_rpc.Directory.opt_register dir service handler) + | Resto {dir; extra} -> + Resto {dir = Tezos_rpc.Directory.opt_register dir service handler; extra} | Dream routes -> let route = Router.make_opt_tz_route service (fun ~params ~query input -> @@ -34,7 +53,8 @@ let opt_register dir service handler = let lwt_register dir service handler = 
match dir with - | Resto dir -> Resto (Tezos_rpc.Directory.lwt_register dir service handler) + | Resto {dir; extra} -> + Resto {dir = Tezos_rpc.Directory.lwt_register dir service handler; extra} | Dream routes -> let route = Router.make_route service (fun ~params ~query input -> @@ -45,14 +65,14 @@ let lwt_register dir service handler = let streamed_register dir service handler = let open Lwt_syntax in match dir with - | Resto dir -> + | Resto {dir; extra} -> let dir = Tezos_rpc.Directory.gen_register dir service (fun params query input -> let* stream, shutdown = handler params query input in let next () = Lwt_stream.get stream in Tezos_rpc.Answer.return_stream {next; shutdown}) in - Resto dir + Resto {dir; extra} | Dream routes -> let route = Router.make_stream_route service (fun ~params ~query input -> @@ -60,6 +80,33 @@ let streamed_register dir service handler = in Dream (route :: routes) +let register_metrics path dir = + match dir with + | Resto {dir; extra} -> + let open Lwt_syntax in + (* Build the response here so that the endpoint honors [path]. *) + let callback _conn _req _body = + let* Metrics.{body; content_type} = Metrics.get_metrics () in + let headers = Cohttp.Header.init_with "Content-Type" content_type in + let+ response = + Cohttp_lwt_unix.Server.respond_string ~status:`OK ~headers ~body () + in + `Response response + in + Resto {dir; extra = EndpointMap.add (`GET, path) callback extra} + | Dream routes -> + let route = Router.make_metrics_route path in + Dream (route :: routes) + +let jsonrpc_websocket_register dir path handler = + match dir with + | Resto dir -> + (* No support for websockets in Resto. 
*) + Resto dir + | Dream routes -> + let route = Router.make_jsonrpc_websocket_route path handler in + Dream (route :: routes) + module Curry = struct type (_, _, _, _, _, _) conv = | Z : (unit, 'g, 'g, unit, 'f, 'f) conv diff --git a/etherlink/bin_node/lib_dev/evm_directory.mli b/etherlink/bin_node/lib_dev/evm_directory.mli index 6d3877801e231da23342ea033e67cea94cac2306..beca87ee4ee9364567185f1e653edada939f94ba 100644 --- a/etherlink/bin_node/lib_dev/evm_directory.mli +++ b/etherlink/bin_node/lib_dev/evm_directory.mli @@ -8,10 +8,22 @@ (** {1 Directories depending on backends} *) +module EndpointMap : Map.S with type key = Cohttp.Code.meth * string + +type resto_dir = { + dir : unit Tezos_rpc.Directory.t; + extra : + (Cohttp_lwt_unix.Server.conn -> + Cohttp.Request.t -> + Cohttp_lwt.Body.t -> + Cohttp_lwt_unix.Server.response_action Lwt.t) + EndpointMap.t; +} + (** The type of RPC directory for EVM node depending on the chosen RPC server backend. *) type t = private - | Resto of unit Tezos_rpc.Directory.t (** A Resto directory *) + | Resto of resto_dir (** A Resto directory *) | Dream of Dream.route trace (** A list of Dream routes *) (** An empty directory depending on the RPC server backend. *) @@ -51,6 +63,15 @@ val streamed_register : ('params -> 'query -> 'input -> ('output Lwt_stream.t * (unit -> unit)) Lwt.t) -> t +(** Register a new endpoint for collecting metrics. *) +val register_metrics : string -> t -> t + +(** Register a new websocket service. The handler should return an initial + JSONRPC response and optionally produce output elements in a stream for + subscription services. This is a no-op on the Resto backend. 
*) +val jsonrpc_websocket_register : + t -> string -> Rpc_encodings.websocket_handler -> t + (** {2 Curried functions with respect to service parameters} *) val register0 : diff --git a/etherlink/bin_node/lib_dev/evm_ro_context.ml b/etherlink/bin_node/lib_dev/evm_ro_context.ml index 59536c98aa706a7a6078bda56cddbf9ead9688f2..edb94ae593415fb48f971e63280bec90104644d8 100644 --- a/etherlink/bin_node/lib_dev/evm_ro_context.ml +++ b/etherlink/bin_node/lib_dev/evm_ro_context.ml @@ -212,7 +212,7 @@ struct call_service ~keep_alive:Ctxt.keep_alive ~base:evm_node_endpoint - (Services.dispatch_service ~path:Resto.Path.root) + (Services.dispatch_batch_service ~path:Resto.Path.root) () () (Batch methods) diff --git a/etherlink/bin_node/lib_dev/injector.ml b/etherlink/bin_node/lib_dev/injector.ml index 6c22c87b536e79a77c73eba362eebc9ba27ee606..760bbd5bb0b9aa9cadeb7aa70e2ed8969f5aa8d3 100644 --- a/etherlink/bin_node/lib_dev/injector.ml +++ b/etherlink/bin_node/lib_dev/injector.ml @@ -26,7 +26,7 @@ let send_raw_transaction ~keep_alive ~base raw_txn = call_service ~keep_alive ~base - (Services.dispatch_service ~path:Resto.Path.root) + (Services.dispatch_batch_service ~path:Resto.Path.root) () () (Singleton (send_raw_transaction_method raw_txn)) diff --git a/etherlink/bin_node/lib_dev/metrics.ml b/etherlink/bin_node/lib_dev/metrics.ml index 6fb4375f2c72cf315a5f52f60dcea7bb67900d87..5925f9218c3b9cfa5a0341c974d05d47568d3af8 100644 --- a/etherlink/bin_node/lib_dev/metrics.ml +++ b/etherlink/bin_node/lib_dev/metrics.ml @@ -15,6 +15,14 @@ open Prometheus let registry = CollectorRegistry.create () +type metrics_body = {body : string; content_type : string} + +let get_metrics () = + let open Lwt_syntax in + let+ data = CollectorRegistry.collect registry in + let body = Fmt.to_to_string Prometheus_app.TextFormat_0_0_4.output data in + {body; content_type = "text/plain; version=0.0.4"} + module Cohttp (Server : Cohttp_lwt.S.Server) = struct let callback _conn req _body = let open Cohttp in @@ 
-22,13 +30,8 @@ module Cohttp (Server : Cohttp_lwt.S.Server) = struct let uri = Request.uri req in match (Request.meth req, Uri.path uri) with | `GET, "/metrics" -> - let* data = CollectorRegistry.collect registry in - let body = - Fmt.to_to_string Prometheus_app.TextFormat_0_0_4.output data - in - let headers = - Header.init_with "Content-Type" "text/plain; version=0.0.4" - in + let* {body; content_type} = get_metrics () in + let headers = Header.init_with "Content-Type" content_type in Server.respond_string ~status:`OK ~headers ~body () | _ -> Server.respond_error ~status:`Bad_request ~body:"Bad request" () end diff --git a/etherlink/bin_node/lib_dev/router.ml b/etherlink/bin_node/lib_dev/router.ml index 3201fbdb76d9323b3218323b1ed737afae3e0b09..1ac7c69ba85e8e50fe1b03b31dab7808c85fe741 100644 --- a/etherlink/bin_node/lib_dev/router.ml +++ b/etherlink/bin_node/lib_dev/router.ml @@ -67,11 +67,29 @@ let encode media encoding = let decode media encoding = match media with - | `Json -> + | `Json -> ( fun s -> - Ezjsonm.value_from_string s |> Data_encoding.Json.destruct encoding + match Ezjsonm.value_from_string_result s with + | Error err -> Error (Ezjsonm.read_error_description err) + | Ok json -> ( + try Ok (Data_encoding.Json.destruct encoding json) + with exn -> + let err = + Format.asprintf + "%a" + (Data_encoding.Json.print_error ~print_unknown:(fun fmt exn -> + Format.fprintf + fmt + "Unknown exception %s" + (Printexc.exn_slot_name exn))) + exn + in + Error err)) | `Octets -> - Data_encoding.Binary.of_string_exn (Data_encoding.dynamic_size encoding) + fun s -> + Data_encoding.Binary.of_string (Data_encoding.dynamic_size encoding) s + |> Result.map_error + (Format.asprintf "%a" Data_encoding.Binary.pp_read_error) let content_header = function | `Json -> Dream.application_json @@ -83,6 +101,14 @@ let respond ?status ?code ?headers media v = Dream.set_header response "Content-type" (content_header media) ; return response +let respond_error ?status ?code ?headers 
media err = + respond + ?status + ?code + ?headers + media + (encode media (Rpc_encodings.JSONRPC.error_encoding Data_encoding.json) err) + let get_params : type params. Dream.request -> (unit, params) Path.t -> (params, string) result = @@ -161,11 +187,13 @@ let make_gen_route : | Ok params -> ( match input_encoding with | No_input -> handler request ~params ~query () - | Input input_encoding -> + | Input input_encoding -> ( let* body = Dream.body request in let media = content_media request in - let input = decode media input_encoding body in - handler request ~params ~query input)) + match decode media input_encoding body with + | Error msg -> + respond_error output_media (Rpc_errors.parse_error msg) + | Ok input -> handler request ~params ~query input))) let make_route service handler = make_gen_route service @@ fun request ~params ~query input -> @@ -231,3 +259,106 @@ let make_stream_route service handler = in shutdown () ; return_unit + +let make_metrics_route path = + let open Lwt_syntax in + Dream.get path @@ fun _request -> + let* {body; content_type} = Metrics.get_metrics () in + let* response = Dream.respond body in + Dream.set_header response "Content-type" content_type ; + return response + +let send_error media websocket error = + let text_or_binary = match media with `Json -> `Text | `Octets -> `Binary in + Dream.send + ~text_or_binary + websocket + (encode + media + (Rpc_encodings.JSONRPC.error_encoding Data_encoding.json) + error) + +let make_jsonrpc_websocket_route path + (handler : Rpc_encodings.websocket_handler) = + let open Lwt_syntax in + let open Rpc_encodings in + Dream.get path @@ fun request -> + let input_media = content_media request in + let output_media = accept_media request in + let text_or_binary = + match output_media with `Json -> `Text | `Octets -> `Binary + in + Dream.websocket @@ fun websocket -> + let subscriptions = Stdlib.Hashtbl.create 17 in + let write_stream {id; stream; stopper} = + Stdlib.Hashtbl.add subscriptions id 
stopper ; + let* () = + Lwt_stream.iter_s + (function + | Ok output -> + let output = + encode output_media Subscription.response_encoding output + in + (* WARNING: https://gitlab.com/tezos/tezos/-/issues/7645 + Dream.send can 100% cpu on closed connections (see + https://github.com/aantron/dream/issues/230). *) + Dream.send ~text_or_binary websocket output + | Error exn -> + send_error + output_media + websocket + (Rpc_errors.internal_error (Printexc.exn_slot_name exn))) + (Lwt_stream.wrap_exn stream) + in + stopper () ; + Stdlib.Hashtbl.remove subscriptions id ; + return_unit + in + let async_write_stream subscription = + Lwt.dont_wait + (fun () -> write_stream subscription) + (fun exn -> + subscription.stopper () ; + Stdlib.Hashtbl.remove subscriptions subscription.id ; + Dream.error @@ fun log -> + log + "Websocket write exception with %s: %s" + (Dream.client request) + (Printexc.to_string exn)) + in + let rec loop () = + (* FIXME: https://gitlab.com/tezos/tezos/-/issues/7645 + Dream.receive may not resolve (with None) on closed connections. *) + let* message = Dream.receive websocket in + match message with + | None -> + (* The websocket is closed, cleaning up. *) + Dream.log + "Websocket connection with %s is closed, cleaning up (%d \ + subscriptions)" + (Dream.client request) + (Stdlib.Hashtbl.length subscriptions) ; + Stdlib.Hashtbl.iter (fun _id stopper -> stopper ()) subscriptions ; + Stdlib.Hashtbl.clear subscriptions ; + Dream.close_websocket websocket + | Some message -> + let* () = + match decode input_media JSONRPC.request_encoding message with + | Error msg -> + send_error output_media websocket (Rpc_errors.parse_error msg) + | Ok input -> + let* ws_response = handler input in + let response = + encode + output_media + JSONRPC.response_encoding + ws_response.response + in + let* () = Dream.send ~text_or_binary websocket response in + (* Support multiple asynchronous requests on websocket. 
*) + Option.iter async_write_stream ws_response.subscription ; + return_unit + in + loop () + in + loop () diff --git a/etherlink/bin_node/lib_dev/router.mli b/etherlink/bin_node/lib_dev/router.mli index 39f2232edf91cf75b9df833ee4396d4cc2fde60b..b792ddf64524a7209b494325394ea04c7fab4548 100644 --- a/etherlink/bin_node/lib_dev/router.mli +++ b/etherlink/bin_node/lib_dev/router.mli @@ -32,7 +32,7 @@ val make_opt_tz_route : (** [make_stream_route service handler] builds a route which streams the response from a handler that constructs an {!Lwt_stream.t}. The output - stream is streamed as chunks in the response body.. *) + stream is streamed as chunks in the response body. *) val make_stream_route : ([< Resto.meth], unit, 'params, 'query, 'input, 'output) Tezos_rpc.Service.t -> (params:'params -> @@ -40,3 +40,17 @@ val make_stream_route : 'input -> ('output Lwt_stream.t * (unit -> unit)) Lwt.t) -> Dream.route + +(** [make_metrics_route path] builds a route that returns collected metrics in + plain text format. *) +val make_metrics_route : string -> Dream.route + +(** {2 JSONRPC specific routes} *) + +(** [make_jsonrpc_websocket_route path handler] builds a route which accepts + websocket connections for JSONRPC requests and subscriptions. The server + reads requests from this websocket and writes a stream of output in + response. Multiple streams can be written and interlaced in the websocket + response. 
*) +val make_jsonrpc_websocket_route : + string -> Rpc_encodings.websocket_handler -> Dream.route diff --git a/etherlink/bin_node/lib_dev/rpc_encodings.ml b/etherlink/bin_node/lib_dev/rpc_encodings.ml index 1b95cd3216aa8fc49904853c5893af4cbedaa3bd..dc310556022b191202bd309b19222f9262aa3837 100644 --- a/etherlink/bin_node/lib_dev/rpc_encodings.ml +++ b/etherlink/bin_node/lib_dev/rpc_encodings.ml @@ -1048,3 +1048,16 @@ let map_method_name ~restrict method_name = if List.mem ~equal:( = ) method_name unsupported_methods then Unsupported else Unknown + +type websocket_subscription = { + id : Ethereum_types.Subscription.id; + stream : Subscription.response Lwt_stream.t; + stopper : unit -> unit; +} + +type websocket_response = { + response : JSONRPC.response; + subscription : websocket_subscription option; +} + +type websocket_handler = JSONRPC.request -> websocket_response Lwt.t diff --git a/etherlink/bin_node/lib_dev/rpc_encodings.mli b/etherlink/bin_node/lib_dev/rpc_encodings.mli index 7c73dde06c25a541ddd7f2d0152f666461e1fafb..3423ef617ea7a5f8b03021c86b419c246f26b56f 100644 --- a/etherlink/bin_node/lib_dev/rpc_encodings.mli +++ b/etherlink/bin_node/lib_dev/rpc_encodings.mli @@ -353,3 +353,16 @@ type map_result = val map_method_name : restrict:Configuration.restricted_rpcs -> string -> map_result + +type websocket_subscription = { + id : Ethereum_types.Subscription.id; + stream : Subscription.response Lwt_stream.t; + stopper : unit -> unit; +} + +type websocket_response = { + response : JSONRPC.response; + subscription : websocket_subscription option; +} + +type websocket_handler = JSONRPC.request -> websocket_response Lwt.t diff --git a/etherlink/bin_node/lib_dev/rpc_errors.ml b/etherlink/bin_node/lib_dev/rpc_errors.ml index 7629625109bd00b6e4448414a32af5b1cec65b26..d528695d95fc90210fef017c686b57d0cf50fa51 100644 --- a/etherlink/bin_node/lib_dev/rpc_errors.ml +++ b/etherlink/bin_node/lib_dev/rpc_errors.ml @@ -29,7 +29,8 @@ open Rpc_encodings type t = Data_encoding.json 
JSONRPC.error -let parse_error = JSONRPC.{code = -32700; message = "Parse error"; data = None} +let parse_error msg = + JSONRPC.{code = -32700; message = "Parse error: " ^ msg; data = None} let invalid_request reason = JSONRPC.{code = -32600; message = reason; data = None} diff --git a/etherlink/bin_node/lib_dev/rpc_server.ml b/etherlink/bin_node/lib_dev/rpc_server.ml index 216ae82d8585d3f3cd060253eaf470e7a24346b7..11b8726615778eea1b9be5188563db84810a4fb5 100644 --- a/etherlink/bin_node/lib_dev/rpc_server.ml +++ b/etherlink/bin_node/lib_dev/rpc_server.ml @@ -17,34 +17,35 @@ type evm_services_methods = { type block_production = [`Single_node | `Threshold_encryption | `Disabled] -let callback server dir = - let open Cohttp in - let open Lwt_syntax in - let callback_log conn req body = - let path = Request.uri req |> Uri.path in - if path = "/metrics" then - let* response = Metrics.Metrics_server.callback conn req body in - Lwt.return (`Response response) - else - let uri = req |> Request.uri |> Uri.to_string in - let meth = req |> Request.meth |> Code.string_of_method in - let* body_str = body |> Cohttp_lwt.Body.to_string in - let* () = Events.callback_log ~uri ~meth ~body:body_str in - Tezos_rpc_http_server.RPC_server.resto_callback - server - conn - req - (Cohttp_lwt.Body.of_string body_str) - in - let update_metrics uri meth = - Prometheus.Summary.(time (labels Metrics.Rpc.metrics [uri; meth]) Sys.time) - in - Tezos_rpc_http_server.RPC_middleware.rpc_metrics_transform_callback - ~update_metrics - dir - callback_log - module Resto = struct + let callback server Evm_directory.{dir; extra} = + let open Cohttp in + let open Lwt_syntax in + let callback_log conn req body = + let path = Request.uri req |> Uri.path in + let meth = Request.meth req in + match Evm_directory.EndpointMap.find (meth, path) extra with + | Some callback -> callback conn req body + | None -> + let uri = req |> Request.uri |> Uri.to_string in + let meth = req |> Request.meth |> 
Code.string_of_method in + let* body_str = body |> Cohttp_lwt.Body.to_string in + let* () = Events.callback_log ~uri ~meth ~body:body_str in + Tezos_rpc_http_server.RPC_server.resto_callback + server + conn + req + (Cohttp_lwt.Body.of_string body_str) + in + let update_metrics uri meth = + Prometheus.Summary.( + time (labels Metrics.Rpc.metrics [uri; meth]) Sys.time) + in + Tezos_rpc_http_server.RPC_middleware.rpc_metrics_transform_callback + ~update_metrics + dir + callback_log + let start_server rpc directory = let open Lwt_result_syntax in let open Tezos_rpc_http_server in @@ -66,7 +67,7 @@ module Resto = struct ~acl ~cors ~media_types:Supported_media_types.all - directory + directory.Evm_directory.dir in let*! () = @@ -130,6 +131,7 @@ let start_public_server ?delegate_health_check_to ?evm_services let directory = Services.directory ?delegate_health_check_to config.public_rpc config ctxt |> register_evm_services + |> Evm_directory.register_metrics "/metrics" in let* finalizer = start_server config.public_rpc directory in let*! () = @@ -137,6 +139,7 @@ let start_public_server ?delegate_health_check_to ?evm_services ~rpc_addr:config.public_rpc.addr ~rpc_port:config.public_rpc.port ~backend:config.experimental_features.rpc_server + ~websockets:config.experimental_features.enable_websocket in return finalizer @@ -146,6 +149,7 @@ let start_private_server ?(block_production = `Disabled) config ctxt = | Some private_rpc -> let directory = Services.private_directory private_rpc ~block_production config ctxt + |> Evm_directory.register_metrics "/metrics" in let* finalizer = start_server private_rpc directory in let*! 
() = @@ -153,6 +157,7 @@ let start_private_server ?(block_production = `Disabled) config ctxt = ~rpc_addr:private_rpc.addr ~rpc_port:private_rpc.port ~backend:config.experimental_features.rpc_server + ~websockets:config.experimental_features.enable_websocket in return finalizer | None -> return (fun () -> Lwt_syntax.return_unit) diff --git a/etherlink/bin_node/lib_dev/services.ml b/etherlink/bin_node/lib_dev/services.ml index 2983a5556a590fcb6b82145cdbfee2c84c88ab5e..55fe05a29d995edc183ec6494b1b6bef76069065 100644 --- a/etherlink/bin_node/lib_dev/services.ml +++ b/etherlink/bin_node/lib_dev/services.ml @@ -148,7 +148,7 @@ let health_check ?delegate_to dir = once. *) type 'a batched_request = Singleton of 'a | Batch of 'a list -let request_encoding kind = +let batch_encoding kind = Data_encoding.( union [ @@ -169,8 +169,15 @@ let request_encoding kind = let dispatch_service ~path = Service.post_service ~query:Query.empty - ~input:(request_encoding JSONRPC.request_encoding) - ~output:(request_encoding JSONRPC.response_encoding) + ~input:JSONRPC.request_encoding + ~output:JSONRPC.response_encoding + path + +let dispatch_batch_service ~path = + Service.post_service + ~query:Query.empty + ~input:(batch_encoding JSONRPC.request_encoding) + ~output:(batch_encoding JSONRPC.response_encoding) path let get_block_by_number ~full_transaction_object block_param @@ -863,9 +870,19 @@ let dispatch_handler (rpc : Configuration.rpc) config ctx dispatch_request let* outputs = List.map_s process requests in return (Batch outputs) +let websocket_response_of_response response = {response; subscription = None} + +let dispatch_private_websocket ~block_production (rpc : Configuration.rpc) + config ctx (input : JSONRPC.request) = + let open Lwt_syntax in + let+ response = + dispatch_private_request ~block_production rpc config ctx input + in + websocket_response_of_response response + let generic_dispatch (rpc : Configuration.rpc) config ctx dir path dispatch_request = - 
Evm_directory.register0 dir (dispatch_service ~path) (fun () input -> + Evm_directory.register0 dir (dispatch_batch_service ~path) (fun () input -> dispatch_handler rpc config ctx dispatch_request input |> Lwt_result.ok) let dispatch_public (rpc : Configuration.rpc) config ctx dir = @@ -881,6 +898,28 @@ let dispatch_private (rpc : Configuration.rpc) ~block_production config ctx dir Path.(add_suffix root "private") (dispatch_private_request rpc ~block_production) +let generic_websocket_dispatch (rpc : Configuration.rpc) + (config : Configuration.t) ctx dir path dispatch_request = + if config.experimental_features.enable_websocket then + Evm_directory.jsonrpc_websocket_register dir path (fun request -> + let open Lwt_syntax in + let+ response = dispatch_request rpc config ctx request in + websocket_response_of_response response) + else dir + +let dispatch_websocket_public (rpc : Configuration.rpc) config ctx dir = + generic_websocket_dispatch rpc config ctx dir "/ws" dispatch_request + +let dispatch_websocket_private (rpc : Configuration.rpc) ~block_production + config ctx dir = + generic_websocket_dispatch + rpc + config + ctx + dir + "/private/ws" + (dispatch_private_request ~block_production) + let directory ?delegate_health_check_to rpc config ((module Rollup_node_rpc : Services_backend_sig.S), smart_rollup_address) = Evm_directory.empty config.experimental_features.rpc_server @@ -890,6 +929,10 @@ let directory ?delegate_health_check_to rpc config rpc config ((module Rollup_node_rpc : Services_backend_sig.S), smart_rollup_address) + |> dispatch_websocket_public + rpc + config + ((module Rollup_node_rpc : Services_backend_sig.S), smart_rollup_address) let private_directory rpc config ((module Rollup_node_rpc : Services_backend_sig.S), smart_rollup_address) @@ -901,6 +944,11 @@ let private_directory rpc config config ((module Rollup_node_rpc : Services_backend_sig.S), smart_rollup_address) ~block_production + |> dispatch_websocket_private + rpc + config + ((module 
Rollup_node_rpc : Services_backend_sig.S), smart_rollup_address) + ~block_production let call (type input output) (module R : Rpc_encodings.METHOD @@ -912,7 +960,7 @@ let call (type input output) Rollup_services.call_service ~keep_alive ~base:evm_node_endpoint - (dispatch_service ~path:Resto.Path.root) + (dispatch_batch_service ~path:Resto.Path.root) () () (Singleton diff --git a/etherlink/bin_node/main.ml b/etherlink/bin_node/main.ml index 57b22bf5e689caf3c1e40f8b704360ac43866c06..4883c1a75a90514ebbf7210f663bb275c0515755 100644 --- a/etherlink/bin_node/main.ml +++ b/etherlink/bin_node/main.ml @@ -45,6 +45,16 @@ module Event = struct ~msg:"Native execution is disabled in sequencer mode" ~level:Warning () + + let buggy_dream_websocket = + Internal_event.Simple.declare_0 + ~section + ~name:"dream_websocket" + ~msg: + "[Warning] Websocket support in Dream is known to be buggy, consider \ + using Resto as an RPC server or disabling websockets." + ~level:Warning + () end module Params = struct @@ -707,12 +717,13 @@ let snapshot_file_arg = is based on the snapshot information." Params.string -(* TODO: https://gitlab.com/tezos/tezos/-/issues/7591 - Remove this whenever we have an experimental websocket server ready to - be tested. *) -let fail_if_websocket_is_enabled ~config = - if config.experimental_features.enable_websocket then - Stdlib.failwith "The experimental websocket server is not implemented yet." 
+let websocket_checks config = + match config.experimental_features with + | {enable_websocket = true; rpc_server = Dream; _} -> + Internal_event.Simple.emit Event.buggy_dream_websocket () |> Lwt_result.ok + | {enable_websocket = true; rpc_server = Resto; _} -> + failwith "Resto RPC server backend does not support websockets" + | _ -> Lwt_result_syntax.return_unit let start_proxy ~data_dir ~keep_alive ?rpc_addr ?rpc_port ?rpc_batch_limit ?cors_origins ?cors_headers ?log_filter_max_nb_blocks @@ -744,7 +755,6 @@ let start_proxy ~data_dir ~keep_alive ?rpc_addr ?rpc_port ?rpc_batch_limit ~verbose () in - fail_if_websocket_is_enabled ~config ; (* We patch [config] to take into account the proxy-specific argument [--read-only]. *) let config = @@ -867,14 +877,6 @@ let start_sequencer ?password_filename ~wallet_dir ~data_dir ?rpc_addr ?rpc_port ~finalized_view () in - fail_if_websocket_is_enabled ~config:configuration ; - let*! configuration = - match sandbox_key with - | None -> - (* We are running in sequencer mode (not in sandbox mode), we need to disable native execution *) - sequencer_disable_native_execution configuration - | _ -> Lwt.return configuration - in let*! () = let open Tezos_base_unix.Internal_event_unix in let config = @@ -896,6 +898,14 @@ let start_sequencer ?password_filename ~wallet_dir ~data_dir ?rpc_addr ?rpc_port in init ~config () in + let*! configuration = + match sandbox_key with + | None -> + (* We are running in sequencer mode (not in sandbox mode), we need to disable native execution *) + sequencer_disable_native_execution configuration + | _ -> Lwt.return configuration + in + let* () = websocket_checks configuration in let*! () = Internal_event.Simple.emit Event.event_starting "sequencer" in Evm_node_lib_dev.Sequencer.main @@ -956,8 +966,6 @@ let start_threshold_encryption_sequencer ?password_filename ~wallet_dir ~finalized_view () in - fail_if_websocket_is_enabled ~config:configuration ; - let*! 
configuration = sequencer_disable_native_execution configuration in let*! () = let open Tezos_base_unix.Internal_event_unix in let config = @@ -979,6 +987,8 @@ let start_threshold_encryption_sequencer ?password_filename ~wallet_dir in init ~config () in + let*! configuration = sequencer_disable_native_execution configuration in + let* () = websocket_checks configuration in let*! () = Internal_event.Simple.emit Event.event_starting "te_sequencer" in Evm_node_lib_dev.Threshold_encryption_sequencer.main ~data_dir @@ -1117,6 +1127,7 @@ let rpc_command = in init ~config () in + let* () = websocket_checks config in let*! () = Internal_event.Simple.emit Event.event_starting "rpc" in Evm_node_lib_dev.Rpc.main ~data_dir ~evm_node_endpoint ~config) @@ -1162,7 +1173,6 @@ let start_observer ~data_dir ~keep_alive ?rpc_addr ?rpc_port ?rpc_batch_limit ~finalized_view () in - fail_if_websocket_is_enabled ~config ; let*! () = let open Tezos_base_unix.Internal_event_unix in let config = @@ -1184,6 +1194,7 @@ let start_observer ~data_dir ~keep_alive ?rpc_addr ?rpc_port ?rpc_batch_limit in init ~config () in + let* () = websocket_checks config in let*! () = Internal_event.Simple.emit Event.event_starting "observer" in Evm_node_lib_dev.Observer.main ~no_sync @@ -1747,6 +1758,12 @@ mode.|} ~finalized_view () in + let*! () = + let open Tezos_base_unix.Internal_event_unix in + let config = make_with_defaults ~verbosity:Warning () in + init ~config () + in + let* () = websocket_checks config in Configuration.save ~force ~data_dir config) let check_config_command = @@ -1759,16 +1776,22 @@ let check_config_command = (args3 data_dir_arg config_path_arg print_config_arg) (prefixes ["check"; "config"] @@ stop) (fun (data_dir, config_path, print_config) () -> - let+ config = + let* config = match config_path with | Some config_path -> load_file ~data_dir:Configuration.default_data_dir config_path | None -> load ~data_dir in - + let*! 
() = + let open Tezos_base_unix.Internal_event_unix in + let config = make_with_defaults ~verbosity:Warning () in + init ~config () + in + let* () = websocket_checks config in if print_config then Format.printf "%a\n" (Configuration.pp_print_json ~data_dir) config - else Format.printf "Configuration has been parsed successfully.\n") + else Format.printf "Configuration has been parsed successfully.\n" ; + return_unit) let config_key_arg ~name ~placeholder = let open Lwt_result_syntax in diff --git a/etherlink/tezt/lib/evm_node.ml b/etherlink/tezt/lib/evm_node.ml index 796969835afaf249d9ce7eddbb7fe314bc0618d9..e0a51e96033838820341f4e76ec96a137c2058a7 100644 --- a/etherlink/tezt/lib/evm_node.ml +++ b/etherlink/tezt/lib/evm_node.ml @@ -113,6 +113,7 @@ module Parameters = struct endpoint : string; runner : Runner.t option; restricted_rpcs : string option; + websockets : bool; } type session_state = {mutable ready : bool} @@ -708,7 +709,8 @@ let mode_with_new_private_rpc (mode : mode) = | _ -> mode let create ?(path = Uses.path Constant.octez_evm_node) ?name ?runner - ?(mode = Proxy) ?data_dir ?rpc_addr ?rpc_port ?restricted_rpcs endpoint = + ?(mode = Proxy) ?data_dir ?rpc_addr ?rpc_port ?restricted_rpcs + ?(websockets = false) endpoint = let arguments, rpc_addr, rpc_port = connection_arguments ?rpc_addr ?rpc_port ?runner () in @@ -747,6 +749,7 @@ let create ?(path = Uses.path Constant.octez_evm_node) ?name ?runner endpoint; restricted_rpcs; runner; + websockets; } in on_event evm_node (handle_is_ready_event evm_node) ; @@ -1183,7 +1186,8 @@ type rpc_server = Resto | Dream let patch_config_with_experimental_feature ?(drop_duplicate_when_injection = false) ?(block_storage_sqlite3 = true) - ?(next_wasm_runtime = true) ?garbage_collector ?rpc_server () = + ?(next_wasm_runtime = true) ?garbage_collector ?rpc_server + ?(enable_websocket = false) () = let conditional_json_put ~name cond value_json json = if cond then JSON.put @@ -1231,9 +1235,10 @@ let 
patch_config_with_experimental_feature |> optional_json_put ~name:"rpc_server" rpc_server (function | Resto -> `String "resto" | Dream -> `String "dream") + |> conditional_json_put enable_websocket ~name:"enable_websocket" (`Bool true) let init ?patch_config ?name ?runner ?mode ?data_dir ?rpc_addr ?rpc_port - ?restricted_rpcs rollup_node = + ?restricted_rpcs ?websockets rollup_node = let evm_node = create ?name @@ -1243,6 +1248,7 @@ let init ?patch_config ?name ?runner ?mode ?data_dir ?rpc_addr ?rpc_port ?rpc_addr ?rpc_port ?restricted_rpcs + ?websockets rollup_node in let* () = Process.check @@ spawn_init_config evm_node in @@ -1251,6 +1257,15 @@ let init ?patch_config ?name ?runner ?mode ?data_dir ?rpc_addr ?rpc_port | Some patch_config -> Config_file.update evm_node patch_config | None -> unit in + let* () = + match websockets with + | Some _ -> + Config_file.update evm_node + @@ patch_config_with_experimental_feature + ?enable_websocket:websockets + () + | _ -> unit + in let* () = run evm_node in return evm_node @@ -1312,7 +1327,35 @@ let call_evm_rpc ?(private_ = false) evm_node request = let batch_evm_rpc ?(private_ = false) evm_node requests = let endpoint = endpoint ~private_ evm_node in - Curl.post endpoint (batch_requests requests) |> Runnable.run + let* json = Curl.post endpoint (batch_requests requests) |> Runnable.run in + return (JSON.as_list json) + +let open_websocket ?(private_ = false) evm_node = + let kind = if private_ then "private" else "public" in + Websocket.connect + ~name:(String.concat "_" ["ws"; kind; evm_node.name]) + (endpoint ~private_ evm_node ^ "/ws") + +let call_evm_websocket websocket request = + Websocket.send_recv websocket (build_request request) + +let batch_evm_websocket websocket requests = + let* l = + Lwt_list.map_s + (fun r -> Websocket.send websocket (build_request r)) + requests + in + Lwt_list.map_s (fun () -> Websocket.recv websocket) l + +let jsonrpc ?websocket ?private_ evm_node = + match websocket with + | None -> 
call_evm_rpc ?private_ evm_node + | Some ws -> call_evm_websocket ws + +let batch_jsonrpc ?websocket ?private_ evm_node = + match websocket with + | None -> batch_evm_rpc ?private_ evm_node + | Some ws -> batch_evm_websocket ws let extract_result json = JSON.(json |-> "result") diff --git a/etherlink/tezt/lib/evm_node.mli b/etherlink/tezt/lib/evm_node.mli index eaa92a3630070ac39ac9af8b5e6a066f0179cb60..d093a355f8dc9b380f81894c8310051d840edcf9 100644 --- a/etherlink/tezt/lib/evm_node.mli +++ b/etherlink/tezt/lib/evm_node.mli @@ -140,8 +140,9 @@ val supports_threshold_encryption : t -> bool rollup_node_endpoint] creates an EVM node server. The server listens to requests at address [rpc_addr] and the port - [rpc_port]. [rpc_addr] defaults to [Constant.default_host] and a fresh port is - chosen if [rpc_port] is not set. + [rpc_port]. [rpc_addr] defaults to [Constant.default_host] and a fresh port + is chosen if [rpc_port] is not set. The EVM node starts a websocket server + for JSON-RPC communication if [websocket] is [true]. The server communicates with a rollup-node and sets its endpoint via [rollup_node_endpoint]. @@ -157,6 +158,7 @@ val create : ?rpc_addr:string -> ?rpc_port:int -> ?restricted_rpcs:string -> + ?websockets:bool -> string -> t @@ -261,7 +263,7 @@ type rpc_server = Resto | Dream (** [patch_config_with_experimental_feature ?drop_duplicate_when_injection ?block_storage_sqlite3 - ?next_wasm_runtime ?rpc_server json_config] patches a config to + ?next_wasm_runtime ?rpc_server ?enable_websocket json_config] patches a config to add experimental feature. Each optional argument add the correspondent experimental feature. 
*) val patch_config_with_experimental_feature : @@ -270,6 +272,7 @@ val patch_config_with_experimental_feature : ?next_wasm_runtime:bool -> ?garbage_collector:garbage_collector -> ?rpc_server:rpc_server -> + ?enable_websocket:bool -> unit -> JSON.t -> JSON.t @@ -287,6 +290,7 @@ val init : ?rpc_addr:string -> ?rpc_port:int -> ?restricted_rpcs:string -> + ?websockets:bool -> string -> t Lwt.t @@ -383,17 +387,44 @@ val endpoint : ?private_:bool -> t -> string (** JSON-RPC request. *) type request = {method_ : string; parameters : JSON.u} -(** [call_evm_rpc ?private_ evm_node ~request] sends a JSON-RPC request to +(** [call_evm_rpc ?private_ evm_node request] sends a JSON-RPC request to the [evm_node], for the given [request]. If [private_] is true, the request is sent to the private RPC server. *) val call_evm_rpc : ?private_:bool -> t -> request -> JSON.t Lwt.t -(** [batch_evm_rpc ?private_ evm_node ~requests] sends multiple JSON-RPC requests +(** [batch_evm_rpc ?private_ evm_node requests] sends multiple JSON-RPC requests to the [evm_node], for the given [requests]. If [private_] is true, the requests are sent to the private RPC server. *) -val batch_evm_rpc : ?private_:bool -> t -> request list -> JSON.t Lwt.t +val batch_evm_rpc : ?private_:bool -> t -> request list -> JSON.t list Lwt.t + +(** Open a websocket connection with the EVM node. If [private_] is true, a + connection is created with the private websocket endpoint of the node. *) +val open_websocket : ?private_:bool -> t -> Websocket.t Lwt.t + +(** [call_evm_websocket ws request] sends a JSON-RPC request on the websocket + connection [ws] and waits for the response. *) +val call_evm_websocket : Websocket.t -> request -> JSON.t Lwt.t + +(** [batch_evm_websocket ws requests] sends multiple JSON-RPC requests on the + websocket connection [ws] without waiting for the responses, then receive + all responses at the end. 
*) +val batch_evm_websocket : Websocket.t -> request list -> JSON.t list Lwt.t + +(** [jsonrpc] uses the [websocket] to make a JSON-RPC call if provided or falls + back to using HTTP RPC request on the EVM node otherwise. *) +val jsonrpc : + ?websocket:Websocket.t -> ?private_:bool -> t -> request -> JSON.t Lwt.t + +(** [batch_jsonrpc] uses the [websocket] to make a JSON-RPC calls if provided or + falls back to using an HTTP RPC batch request on the EVM node otherwise. *) +val batch_jsonrpc : + ?websocket:Websocket.t -> + ?private_:bool -> + t -> + request list -> + JSON.t list Lwt.t (** [extract_result json] expects a JSON-RPC `result` and returns the value. *) val extract_result : JSON.t -> JSON.t diff --git a/etherlink/tezt/lib/helpers.ml b/etherlink/tezt/lib/helpers.ml index 7786613b6ed0e2a200d9b8d8d60cf1f771ca9146..d1806ff66a5f865c58271024cd0342d762b1f0e1 100644 --- a/etherlink/tezt/lib/helpers.ml +++ b/etherlink/tezt/lib/helpers.ml @@ -250,12 +250,14 @@ let bake_until_sync ?__LOC__ ?timeout_in_blocks ?timeout ~sc_rollup_node ~proxy (** [wait_for_transaction_receipt ~evm_node ~transaction_hash] takes an transaction_hash and returns only when the receipt is non null, or [count] blocks have passed and the receipt is still not available. *) -let wait_for_transaction_receipt ?(count = 3) ~evm_node ~transaction_hash () = +let wait_for_transaction_receipt ?websocket ?(count = 3) ~evm_node + ~transaction_hash () = let rec loop count = let* () = Lwt_unix.sleep 5. 
in let* receipt = Evm_node.( - call_evm_rpc + jsonrpc + ?websocket evm_node { method_ = "eth_getTransactionReceipt"; @@ -290,7 +292,7 @@ let wait_for_application ~produce_block apply = in Lwt.pick [application_result; loop 0] -let batch_n_transactions ~evm_node txs = +let batch_n_transactions ?websocket ~evm_node txs = let requests = List.map (fun tx -> @@ -298,16 +300,17 @@ let batch_n_transactions ~evm_node txs = {method_ = "eth_sendRawTransaction"; parameters = `A [`String tx]}) txs in - let* hashes = Evm_node.batch_evm_rpc evm_node requests in + let* hashes = Evm_node.batch_jsonrpc ?websocket evm_node requests in let hashes = - hashes |> JSON.as_list + hashes |> List.map (fun json -> Evm_node.extract_result json |> JSON.as_string) in return (requests, hashes) (* sending more than ~300 tx could fail, because or curl *) -let send_n_transactions ~produce_block ~evm_node ?wait_for_blocks txs = - let* requests, hashes = batch_n_transactions ~evm_node txs in +let send_n_transactions ?websocket ~produce_block ~evm_node ?wait_for_blocks txs + = + let* requests, hashes = batch_n_transactions ?websocket ~evm_node txs in let first_hash = List.hd hashes in (* Let's wait until one of the transactions is injected into a block, and test this block contains the `n` transactions as expected. *) @@ -315,6 +318,7 @@ let send_n_transactions ~produce_block ~evm_node ?wait_for_blocks txs = wait_for_application ~produce_block (wait_for_transaction_receipt + ?websocket ?count:wait_for_blocks ~evm_node ~transaction_hash:first_hash) diff --git a/etherlink/tezt/lib/helpers.mli b/etherlink/tezt/lib/helpers.mli index 9b05c3a922a208468cf9b4336d62ab99c7adf678..da4928210ba777bc92e5e995f5511ddf555b2d0f 100644 --- a/etherlink/tezt/lib/helpers.mli +++ b/etherlink/tezt/lib/helpers.mli @@ -168,6 +168,7 @@ val bake_until_sync : transaction_hash and returns only when the receipt is non null, or [count] blocks have passed and the receipt is still not available. 
*) val wait_for_transaction_receipt : + ?websocket:Websocket.t -> ?count:int -> evm_node:Evm_node.t -> transaction_hash:string -> @@ -184,6 +185,7 @@ val wait_for_application : (** [batch_n_transactions ~evm_node raw_transactions] batches [raw_transactions] to the [evm_node] and returns the requests and transaction hashes. *) val batch_n_transactions : + ?websocket:Websocket.t -> evm_node:Evm_node.t -> string list -> (Evm_node.request list * string list) Lwt.t @@ -193,6 +195,7 @@ val batch_n_transactions : until the first one is applied in a block and returns, or fails if it isn't applied after [wait_for_blocks] blocks. *) val send_n_transactions : + ?websocket:Websocket.t -> produce_block:(unit -> ('a, Rpc.error) result Lwt.t) -> evm_node:Evm_node.t -> ?wait_for_blocks:int -> diff --git a/etherlink/tezt/lib/rpc.ml b/etherlink/tezt/lib/rpc.ml index ca8e09696754328551bd681e30ae7b4c58830ff2..777437720c8beeae9408de2f34a04c29113f44d7 100644 --- a/etherlink/tezt/lib/rpc.ml +++ b/etherlink/tezt/lib/rpc.ml @@ -215,14 +215,15 @@ module Request = struct let coinbase = {method_ = "eth_coinbase"; parameters = `Null} end -let net_version evm_node = - let* json = Evm_node.call_evm_rpc evm_node Request.net_version in +let net_version ?websocket evm_node = + let* json = Evm_node.jsonrpc ?websocket evm_node Request.net_version in return (decode_or_error (fun json -> JSON.(json |-> "result" |> as_string)) json) -let get_transaction_by_hash ~transaction_hash evm_node = +let get_transaction_by_hash ?websocket ~transaction_hash evm_node = let* json = - Evm_node.call_evm_rpc + Evm_node.jsonrpc + ?websocket evm_node (Request.eth_getTransactionByHash ~transaction_hash) in @@ -235,28 +236,32 @@ let get_transaction_by_hash ~transaction_hash evm_node = else Some (Transaction.transaction_object_of_json json))) json) -let get_code ~address evm_node = +let get_code ?websocket ~address evm_node = let* json = - Evm_node.call_evm_rpc evm_node (Request.eth_getCode ~address ~block:Latest) + 
Evm_node.jsonrpc + ?websocket + evm_node + (Request.eth_getCode ~address ~block:Latest) in return (decode_or_error (fun json -> JSON.(json |-> "result" |> as_string)) json) -let block_number evm_node = - let* json = Evm_node.call_evm_rpc evm_node Request.eth_blockNumber in +let block_number ?websocket evm_node = + let* json = Evm_node.jsonrpc ?websocket evm_node Request.eth_blockNumber in return (decode_or_error (fun json -> JSON.(json |-> "result" |> as_int32)) json) -let block_number_opt evm_node = - let* json = Evm_node.call_evm_rpc evm_node Request.eth_blockNumber in +let block_number_opt ?websocket evm_node = + let* json = Evm_node.jsonrpc ?websocket evm_node Request.eth_blockNumber in return (decode_or_error (fun json -> JSON.(json |-> "result" |> as_opt |> Option.map as_int32)) json) -let get_block_by_number ?(full_tx_objects = false) ~block evm_node = +let get_block_by_number ?websocket ?(full_tx_objects = false) ~block evm_node = let* json = - Evm_node.call_evm_rpc + Evm_node.jsonrpc + ?websocket evm_node (Request.eth_getBlockByNumber ~block ~full_tx_objects) in @@ -265,9 +270,10 @@ let get_block_by_number ?(full_tx_objects = false) ~block evm_node = (fun json -> JSON.(json |-> "result" |> Block.of_json)) json) -let get_block_by_hash ?(full_tx_objects = false) ~block evm_node = +let get_block_by_hash ?websocket ?(full_tx_objects = false) ~block evm_node = let* json = - Evm_node.call_evm_rpc + Evm_node.jsonrpc + ?websocket evm_node (Request.eth_getBlockByHash ~block ~full_tx_objects) in @@ -276,9 +282,10 @@ let get_block_by_hash ?(full_tx_objects = false) ~block evm_node = (fun json -> JSON.(json |-> "result" |> Block.of_json)) json) -let get_gas_price evm_node = +let get_gas_price ?websocket evm_node = let* json = - Evm_node.call_evm_rpc + Evm_node.jsonrpc + ?websocket evm_node {method_ = "eth_gasPrice"; parameters = `Null} in @@ -307,9 +314,10 @@ module Syntax = struct | Error err -> f err end -let produce_block ?with_delayed_transactions ?timestamp 
evm_node = +let produce_block ?websocket ?with_delayed_transactions ?timestamp evm_node = let* json = - Evm_node.call_evm_rpc + Evm_node.jsonrpc + ?websocket ~private_:true evm_node (Request.produceBlock ?with_delayed_transactions ?timestamp ()) @@ -319,9 +327,10 @@ let produce_block ?with_delayed_transactions ?timestamp evm_node = (fun json -> Evm_node.extract_result json |> JSON.as_int) json -let produce_proposal ?timestamp evm_node = +let produce_proposal ?websocket ?timestamp evm_node = let* json = - Evm_node.call_evm_rpc + Evm_node.jsonrpc + ?websocket ~private_:true evm_node (Request.produceProposal ?timestamp ()) @@ -333,18 +342,26 @@ let produce_proposal ?timestamp evm_node = if JSON.is_null json then () else ()) json -let state_value evm_node path = +let state_value ?websocket evm_node path = let* json = - Evm_node.call_evm_rpc ~private_:true evm_node (Request.stateValue path) + Evm_node.jsonrpc + ?websocket + ~private_:true + evm_node + (Request.stateValue path) in return @@ decode_or_error (fun json -> Evm_node.extract_result json |> JSON.as_string_opt) json -let state_subkeys evm_node path = +let state_subkeys ?websocket evm_node path = let* json = - Evm_node.call_evm_rpc ~private_:true evm_node (Request.stateSubkeys path) + Evm_node.jsonrpc + ?websocket + ~private_:true + evm_node + (Request.stateSubkeys path) in return @@ decode_or_error @@ -354,18 +371,24 @@ let state_subkeys evm_node path = | None -> None) json -let send_raw_transaction ~raw_tx evm_node = +let send_raw_transaction ?websocket ~raw_tx evm_node = let* response = - Evm_node.call_evm_rpc evm_node (Request.eth_sendRawTransaction ~raw_tx) + Evm_node.jsonrpc + ?websocket + evm_node + (Request.eth_sendRawTransaction ~raw_tx) in return @@ decode_or_error (fun response -> Evm_node.extract_result response |> JSON.as_string) response -let get_transaction_receipt ~tx_hash evm_node = +let get_transaction_receipt ?websocket ~tx_hash evm_node = let* response = - Evm_node.call_evm_rpc evm_node 
(Request.eth_getTransactionReceipt ~tx_hash) + Evm_node.jsonrpc + ?websocket + evm_node + (Request.eth_getTransactionReceipt ~tx_hash) in return @@ decode_or_error @@ -375,9 +398,10 @@ let get_transaction_receipt ~tx_hash evm_node = |> Option.map Transaction.transaction_receipt_of_json) response -let estimate_gas eth_call evm_node = +let estimate_gas ?websocket eth_call evm_node = let* response = - Evm_node.call_evm_rpc + Evm_node.jsonrpc + ?websocket evm_node (Request.eth_estimateGas ~eth_call ~block:"latest") in @@ -386,9 +410,10 @@ let estimate_gas eth_call evm_node = (fun response -> Evm_node.extract_result response |> JSON.as_int64) response -let get_transaction_count ?(block = "latest") ~address evm_node = +let get_transaction_count ?websocket ?(block = "latest") ~address evm_node = let* response = - Evm_node.call_evm_rpc + Evm_node.jsonrpc + ?websocket evm_node (Request.eth_getTransactionCount ~address ~block) in @@ -397,32 +422,36 @@ let get_transaction_count ?(block = "latest") ~address evm_node = (fun response -> Evm_node.extract_result response |> JSON.as_int64) response -let tez_kernelVersion evm_node = - let* response = Evm_node.call_evm_rpc evm_node Request.tez_kernelVersion in +let tez_kernelVersion ?websocket evm_node = + let* response = + Evm_node.jsonrpc ?websocket evm_node Request.tez_kernelVersion + in return @@ decode_or_error (fun response -> Evm_node.extract_result response |> JSON.as_string) response -let tez_kernelRootHash evm_node = - let* response = Evm_node.call_evm_rpc evm_node Request.tez_kernelRootHash in +let tez_kernelRootHash ?websocket evm_node = + let* response = + Evm_node.jsonrpc ?websocket evm_node Request.tez_kernelRootHash + in return @@ decode_or_error (fun response -> Evm_node.extract_result response |> JSON.as_string_opt) response -let call ~to_ ~data ?(block = Latest) evm_node = +let call ?websocket ~to_ ~data ?(block = Latest) evm_node = let* response = - Evm_node.call_evm_rpc evm_node (Request.eth_call ~block ~to_ 
~data) + Evm_node.jsonrpc ?websocket evm_node (Request.eth_call ~block ~to_ ~data) in return @@ decode_or_error (fun response -> Evm_node.extract_result response |> JSON.as_string) response -let get_balance ~address ?(block = Latest) evm_node = +let get_balance ?websocket ~address ?(block = Latest) evm_node = let* response = - Evm_node.call_evm_rpc evm_node (Request.get_balance ~address ~block) + Evm_node.jsonrpc ?websocket evm_node (Request.get_balance ~address ~block) in return @@ decode_or_error @@ -430,22 +459,28 @@ let get_balance ~address ?(block = Latest) evm_node = Evm_node.extract_result response |> JSON.as_string |> Wei.of_string) response -let get_storage_at ~address ?(block = Latest) ~pos evm_node = +let get_storage_at ?websocket ~address ?(block = Latest) ~pos evm_node = let* response = - Evm_node.call_evm_rpc evm_node (Request.get_storage_at ~address ~pos ~block) + Evm_node.jsonrpc + ?websocket + evm_node + (Request.get_storage_at ~address ~pos ~block) in return @@ decode_or_error (fun response -> Evm_node.extract_result response |> JSON.as_string) response -let get_max_priority_fee_per_gas evm_node = - let* json = Evm_node.call_evm_rpc evm_node Request.eth_maxPriorityFeePerGas in +let get_max_priority_fee_per_gas ?websocket evm_node = + let* json = + Evm_node.jsonrpc ?websocket evm_node Request.eth_maxPriorityFeePerGas + in return JSON.(json |-> "result" |> as_int32) -let replay_block blockNumber evm_node = +let replay_block ?websocket blockNumber evm_node = let* response = - Evm_node.call_evm_rpc + Evm_node.jsonrpc + ?websocket ~private_:true evm_node (Request.replayBlock blockNumber) @@ -457,8 +492,8 @@ let replay_block blockNumber evm_node = type txpool_slot = {address : string; transactions : (int64 * JSON.t) list} -let txpool_content evm_node = - let* response = Evm_node.call_evm_rpc evm_node Request.txpool_content in +let txpool_content ?websocket evm_node = + let* response = Evm_node.jsonrpc ?websocket evm_node Request.txpool_content in let 
parse txpool field = let open JSON in let pool = txpool |-> field in @@ -483,18 +518,21 @@ let txpool_content evm_node = (parse txpool "pending", parse txpool "queued")) response -let trace_transaction ~transaction_hash ?tracer ?tracer_config evm_node = +let trace_transaction ?websocket ~transaction_hash ?tracer ?tracer_config + evm_node = let* response = - Evm_node.call_evm_rpc + Evm_node.jsonrpc + ?websocket evm_node (Request.trace_transaction ~transaction_hash ?tracer ?tracer_config ()) in return @@ decode_or_error (fun response -> Evm_node.extract_result response) response -let trace_call ~block ~to_ ~data ?tracer ?tracer_config evm_node = +let trace_call ?websocket ~block ~to_ ~data ?tracer ?tracer_config evm_node = let* response = - Evm_node.call_evm_rpc + Evm_node.jsonrpc + ?websocket evm_node (Request.trace_call ~block ~to_ ~data ?tracer ?tracer_config ()) in @@ -507,10 +545,12 @@ type fee_history = { gas_used_ratio : float list; } -let fee_history block_count newest_block evm_node = +let fee_history ?websocket block_count newest_block evm_node = let* response = - Evm_node.( - call_evm_rpc evm_node (Request.eth_feeHistory ~block_count ~newest_block)) + Evm_node.jsonrpc + ?websocket + evm_node + (Request.eth_feeHistory ~block_count ~newest_block) in let decode_fee_history_result response = @@ -531,8 +571,8 @@ let fee_history block_count newest_block evm_node = return @@ decode_or_error decode_fee_history_result response -let coinbase evm_node = - let* response = Evm_node.call_evm_rpc evm_node Request.coinbase in +let coinbase ?websocket evm_node = + let* response = Evm_node.jsonrpc ?websocket evm_node Request.coinbase in return @@ decode_or_error (fun response -> Evm_node.extract_result response |> JSON.as_string) diff --git a/etherlink/tezt/lib/rpc.mli b/etherlink/tezt/lib/rpc.mli index 91db7d4e0e553552d2da02e5977b826cc34ca780..740de96cc1a9b83789f6c1f517d786930ece9532 100644 --- a/etherlink/tezt/lib/rpc.mli +++ b/etherlink/tezt/lib/rpc.mli @@ -61,30 +61,46 
@@ module Request : sig val coinbase : Evm_node.request end +(** {2 RPC calls wrappers} + + + Calls below are made using an HTTP request to the EVM node by default unless + a websocket is provided, in which case the communication happens on it. +*) + (** [net_version evm_node] calls [net_version]. *) -val net_version : Evm_node.t -> (string, error) result Lwt.t +val net_version : + ?websocket:Websocket.t -> Evm_node.t -> (string, error) result Lwt.t (** [get_transaction_by_hash ~transaction_hash evm_node] calls [eth_getTransactionByHash]. *) val get_transaction_by_hash : + ?websocket:Websocket.t -> transaction_hash:string -> Evm_node.t -> (Transaction.transaction_object option, error) result Lwt.t (** [get_code ~address evm_node] calls [eth_getCode]. *) -val get_code : address:string -> Evm_node.t -> (string, error) result Lwt.t +val get_code : + ?websocket:Websocket.t -> + address:string -> + Evm_node.t -> + (string, error) result Lwt.t (** [block_number evm_node] calls [eth_blockNumber]. *) -val block_number : Evm_node.t -> (int32, error) result Lwt.t +val block_number : + ?websocket:Websocket.t -> Evm_node.t -> (int32, error) result Lwt.t (** [block_number_opt evm_node] calls [eth_blockNumber]. allows None when no block have been produced yet. *) -val block_number_opt : Evm_node.t -> (int32 option, error) result Lwt.t +val block_number_opt : + ?websocket:Websocket.t -> Evm_node.t -> (int32 option, error) result Lwt.t (** [get_block_by_number ?full_tx_objets ~block evm_node] calls [eth_getBlockByNumber]. [full_tx_objects] is false by default, so the block contains the transaction hashes. [block] can be ["latest"] or its number. *) val get_block_by_number : + ?websocket:Websocket.t -> ?full_tx_objects:bool -> block:string -> Evm_node.t -> @@ -92,12 +108,13 @@ val get_block_by_number : (** Same as {!get_block_by_number} but uses the hash instead of the number. 
*) val get_block_by_hash : + ?websocket:Websocket.t -> ?full_tx_objects:bool -> block:string -> Evm_node.t -> (Block.t, error) result Lwt.t -val get_gas_price : Evm_node.t -> Int32.t Lwt.t +val get_gas_price : ?websocket:Websocket.t -> Evm_node.t -> Int32.t Lwt.t module Syntax : sig val ( let*@ ) : ('a, error) result Lwt.t -> ('a -> 'c Lwt.t) -> 'c Lwt.t @@ -112,6 +129,7 @@ end calls the private RPC [produceBlock]. If provided the block will have timestamp [timestamp] (in RFC3339) format. *) val produce_block : + ?websocket:Websocket.t -> ?with_delayed_transactions:bool -> ?timestamp:string -> Evm_node.t -> @@ -120,23 +138,37 @@ val produce_block : (** [produce_proposal ?timestamp evm_node] calls the private RPC [produceProposal]. If provided the block will have timestamp [timestamp] (in RFC3339) format. *) val produce_proposal : - ?timestamp:string -> Evm_node.t -> (unit, error) result Lwt.t + ?websocket:Websocket.t -> + ?timestamp:string -> + Evm_node.t -> + (unit, error) result Lwt.t (** [state_value evm_node path] calls the private RPC [stateValue]. *) -val state_value : Evm_node.t -> string -> (string option, error) result Lwt.t +val state_value : + ?websocket:Websocket.t -> + Evm_node.t -> + string -> + (string option, error) result Lwt.t (** [state_subkeys evm_node path] calls the private RPC [stateSubkeys]. *) val state_subkeys : - Evm_node.t -> string -> (string list option, error) result Lwt.t + ?websocket:Websocket.t -> + Evm_node.t -> + string -> + (string list option, error) result Lwt.t (** [send_raw_transaction ~raw_tx evm_node] calls [eth_sendRawTransaction] with [raw_tx] as argument. *) val send_raw_transaction : - raw_tx:string -> Evm_node.t -> (string, error) result Lwt.t + ?websocket:Websocket.t -> + raw_tx:string -> + Evm_node.t -> + (string, error) result Lwt.t (** [get_transaction_receipt ~tx_hash evm_node] calls [eth_getTransactionReceipt] with [tx_hash] as argument. 
*) val get_transaction_receipt : + ?websocket:Websocket.t -> tx_hash:string -> Evm_node.t -> (Transaction.transaction_receipt option, error) result Lwt.t @@ -144,24 +176,34 @@ val get_transaction_receipt : (** [estimate_gas eth_call evm_node] calls [eth_estimateGas] with [eth_call] as payload. *) val estimate_gas : - (string * Ezjsonm.value) list -> Evm_node.t -> (int64, error) result Lwt.t + ?websocket:Websocket.t -> + (string * Ezjsonm.value) list -> + Evm_node.t -> + (int64, error) result Lwt.t (** [get_transaction_count ?block ~address evm_node] calls [eth_getTransactionCount] with [address] as argument (on [block], default to ["latest"] if omitted). *) val get_transaction_count : - ?block:string -> address:string -> Evm_node.t -> (int64, error) result Lwt.t + ?websocket:Websocket.t -> + ?block:string -> + address:string -> + Evm_node.t -> + (int64, error) result Lwt.t (** [tez_kernelVersion evm_node] calls [tez_kernelVersion]. Returns the kernel commit hash. *) -val tez_kernelVersion : Evm_node.t -> (string, error) result Lwt.t +val tez_kernelVersion : + ?websocket:Websocket.t -> Evm_node.t -> (string, error) result Lwt.t (** [tez_kernelRootHash evm_node] calls [tez_kernelRootHash]. Returns the kernel root hash. *) -val tez_kernelRootHash : Evm_node.t -> (string option, error) result Lwt.t +val tez_kernelRootHash : + ?websocket:Websocket.t -> Evm_node.t -> (string option, error) result Lwt.t (** [call ~to_ ~data ?block evm_node] call [eth_call] with [to] and [data] as argument. [block] defaults to [Latest]. *) val call : + ?websocket:Websocket.t -> to_:string -> data:string -> ?block:block_param -> @@ -171,6 +213,7 @@ val call : (** [get_balance ~address ?block evm_node] calls [eth_getBalance]. [block] defaults to [Latest].*) val get_balance : + ?websocket:Websocket.t -> address:string -> ?block:block_param -> Evm_node.t -> @@ -179,17 +222,20 @@ val get_balance : (** [get_balance ~address ?block ~pos evm_node] calls [eth_getStorageAt]. 
[block] defaults to [Latest]. *) val get_storage_at : + ?websocket:Websocket.t -> address:string -> ?block:block_param -> pos:string -> Evm_node.t -> (string, error) result Lwt.t -val get_max_priority_fee_per_gas : Evm_node.t -> Int32.t Lwt.t +val get_max_priority_fee_per_gas : + ?websocket:Websocket.t -> Evm_node.t -> Int32.t Lwt.t (** [replay_block number evm_node] replays the block [number] and returns its representation. *) -val replay_block : int -> Evm_node.t -> (Block.t, error) result Lwt.t +val replay_block : + ?websocket:Websocket.t -> int -> Evm_node.t -> (Block.t, error) result Lwt.t (** A slot in the transaction pool associates an address to a mapping of nonces to transactions. *) @@ -198,11 +244,14 @@ type txpool_slot = {address : string; transactions : (int64 * JSON.t) list} (** [txpool_content evm_node] returns the transaction hash and nonce contained in the `pending` and `queued` pools. *) val txpool_content : - Evm_node.t -> (txpool_slot list * txpool_slot list, error) result Lwt.t + ?websocket:Websocket.t -> + Evm_node.t -> + (txpool_slot list * txpool_slot list, error) result Lwt.t (** [trace_transaction ~transaction_hash evm_node] replays the given transaction in the same execution context. Doesn't return the trace for now. *) val trace_transaction : + ?websocket:Websocket.t -> transaction_hash:string -> ?tracer:string -> ?tracer_config:(string * JSON.u) list -> @@ -210,6 +259,7 @@ val trace_transaction : (JSON.t, error) result Lwt.t val trace_call : + ?websocket:Websocket.t -> block:block_param -> to_:string -> data:string -> @@ -226,10 +276,15 @@ type fee_history = { (** [fee_history block_count newest_block evm_node] calls [eth_feeHistory]. *) val fee_history : - string -> string -> Evm_node.t -> (fee_history, error) result Lwt.t + ?websocket:Websocket.t -> + string -> + string -> + Evm_node.t -> + (fee_history, error) result Lwt.t (** [coinbase] calls [eth_coinbase]. 
*) -val coinbase : Evm_node.t -> (string, error) result Lwt.t +val coinbase : + ?websocket:Websocket.t -> Evm_node.t -> (string, error) result Lwt.t (** Returns the EVM node configuration. *) val configuration : Evm_node.t -> JSON.t Lwt.t diff --git a/etherlink/tezt/lib/setup.ml b/etherlink/tezt/lib/setup.ml index 10d9629df9181bf2a881f22562f592c4730f6ac5..7adaadc2b39e6abe8c5043e63c44f30689c7623f 100644 --- a/etherlink/tezt/lib/setup.ml +++ b/etherlink/tezt/lib/setup.ml @@ -186,7 +186,7 @@ let setup_sequencer ?max_delayed_inbox_blueprint_length ?next_wasm_runtime ?preimages_dir ?maximum_allowed_ticks ?maximum_gas_per_transaction ?max_blueprint_lookahead_in_seconds ?enable_fa_bridge ?(threshold_encryption = false) ?(drop_duplicate_when_injection = true) - ?history_mode ~enable_dal ?dal_slots ?rpc_server protocol = + ?history_mode ~enable_dal ?dal_slots ?rpc_server ?websockets protocol = let* node, client = setup_l1 ?commitment_period @@ -326,6 +326,7 @@ let setup_sequencer ?max_delayed_inbox_blueprint_length ?next_wasm_runtime ?rpc_port:sequencer_rpc_port ~patch_config ~mode:sequencer_mode + ?websockets (Sc_rollup_node.endpoint sc_rollup_node) in let* observer = @@ -364,8 +365,8 @@ let register_test ~__FILE__ ?max_delayed_inbox_blueprint_length ?enable_fa_bridge ?commitment_period ?challenge_window ?(threshold_encryption = false) ?(uses = uses) ?(additional_uses = []) ?history_mode ~enable_dal - ?(dal_slots = if enable_dal then Some [0; 1; 2; 3] else None) body ~title - ~tags protocols = + ?(dal_slots = if enable_dal then Some [0; 1; 2; 3] else None) ?rpc_server + ?websockets body ~title ~tags protocols = let kernel_tag, kernel_use = Kernel.to_uses_and_tags kernel in let tags = kernel_tag :: tags in let additional_uses = @@ -383,9 +384,10 @@ let register_test ~__FILE__ ?max_delayed_inbox_blueprint_length block_storage_sqlite3 in let rpc_server = - match kernel with - | Mainnet | Ghostnet -> None (* default *) - | Latest -> Some Evm_node.Dream (* test with Dream for 
latest kernel *) + match (rpc_server, kernel) with + | Some _, _ -> rpc_server + | _, (Mainnet | Ghostnet) -> None (* default *) + | _, Latest -> Some Evm_node.Dream (* test with Dream for latest kernel *) in let body protocol = let* sequencer_setup = @@ -419,6 +421,7 @@ let register_test ~__FILE__ ?max_delayed_inbox_blueprint_length ?enable_fa_bridge ~threshold_encryption ?history_mode + ?websockets ~enable_dal ?dal_slots ?rpc_server @@ -463,7 +466,7 @@ let register_test_for_kernels ~__FILE__ ?max_delayed_inbox_blueprint_length ?maximum_allowed_ticks ?maximum_gas_per_transaction ?max_blueprint_lookahead_in_seconds ?enable_fa_bridge ?history_mode ?commitment_period ?challenge_window ?additional_uses ~threshold_encryption - ~enable_dal ?dal_slots ~title ~tags body protocols = + ~enable_dal ?dal_slots ?rpc_server ?websockets ~title ~tags body protocols = List.iter (fun kernel -> register_test @@ -495,6 +498,8 @@ let register_test_for_kernels ~__FILE__ ?max_delayed_inbox_blueprint_length ?max_blueprint_lookahead_in_seconds ?enable_fa_bridge ?additional_uses + ?rpc_server + ?websockets ~threshold_encryption ?history_mode ~enable_dal diff --git a/etherlink/tezt/lib/setup.mli b/etherlink/tezt/lib/setup.mli index 0c0eca819c5a51e29174c7d68e6b52ce9169253f..a355b035e7baefb12f25e7cbe64899d6eabf9062 100644 --- a/etherlink/tezt/lib/setup.mli +++ b/etherlink/tezt/lib/setup.mli @@ -87,6 +87,8 @@ val register_test : ?history_mode:Sc_rollup_node.history_mode -> enable_dal:bool -> ?dal_slots:int list option -> + ?rpc_server:Evm_node.rpc_server -> + ?websockets:bool -> (sequencer_setup -> Protocol.t -> unit Lwt.t) -> title:string -> tags:string list -> @@ -129,6 +131,8 @@ val register_test_for_kernels : threshold_encryption:bool -> enable_dal:bool -> ?dal_slots:int list option -> + ?rpc_server:Evm_node.rpc_server -> + ?websockets:bool -> title:string -> tags:string list -> (sequencer_setup -> Protocol.t -> unit Lwt.t) -> @@ -170,5 +174,6 @@ val setup_sequencer : enable_dal:bool -> 
?dal_slots:int list -> ?rpc_server:Evm_node.rpc_server -> + ?websockets:bool -> Protocol.t -> sequencer_setup Lwt.t diff --git a/etherlink/tezt/tests/evm_rollup.ml b/etherlink/tezt/tests/evm_rollup.ml index 5144790478cf594ae8fe301665148924b72b738a..d15485a9a2494c5df2100c7faec4eb94e9ea8357 100644 --- a/etherlink/tezt/tests/evm_rollup.ml +++ b/etherlink/tezt/tests/evm_rollup.ml @@ -316,7 +316,7 @@ let setup_evm_kernel ?additional_config ?(setup_kernel_root_hash = true) ?tx_pool_timeout_limit ?tx_pool_addr_limit ?tx_pool_tx_per_addr_limit ?max_number_of_chunks ?(setup_mode = Setup_proxy) ?(force_install_kernel = true) ?whitelist ?maximum_allowed_ticks - ?restricted_rpcs ?(enable_dal = false) ?dal_slots protocol = + ?restricted_rpcs ?(enable_dal = false) ?dal_slots ?websockets protocol = let _, kernel_installee = Kernel.to_uses_and_tags kernel in let* node, client = setup_l1 ?commitment_period ?challenge_window ?timestamp protocol @@ -445,6 +445,7 @@ let setup_evm_kernel ?additional_config ?(setup_kernel_root_hash = true) ~patch_config ~mode ?restricted_rpcs + ?websockets (Sc_rollup_node.endpoint sc_rollup_node) in return @@ -492,6 +493,7 @@ let setup_evm_kernel ?additional_config ?(setup_kernel_root_hash = true) ~patch_config ~mode:sequencer_mode ?restricted_rpcs + ?websockets (Sc_rollup_node.endpoint sc_rollup_node) in let produce_block () = Rpc.produce_block sequencer in @@ -501,6 +503,7 @@ let setup_evm_kernel ?additional_config ?(setup_kernel_root_hash = true) Evm_node.create ~data_dir:(Evm_node.data_dir sequencer) ~mode:(Rpc Evm_node.(mode sequencer)) + ?websockets (Evm_node.endpoint sequencer) in let* () = Evm_node.run evm_node in @@ -527,8 +530,8 @@ let register_test ~title ~tags ?(kernels = Kernel.all) ?additional_config ?admin ?(additional_uses = []) ?commitment_period ?challenge_window ?bootstrap_accounts ?whitelist ?da_fee_per_byte ?minimum_base_fee_per_gas ?rollup_operator_key ?maximum_allowed_ticks ?restricted_rpcs ~setup_mode - ~enable_dal ?(dal_slots = 
if enable_dal then Some [4] else None) f protocols - = + ~enable_dal ?(dal_slots = if enable_dal then Some [4] else None) ?websockets + f protocols = let extra_tag = match setup_mode with | Setup_proxy -> "proxy" @@ -578,6 +581,7 @@ let register_test ~title ~tags ?(kernels = Kernel.all) ?additional_config ?admin ~setup_mode ~enable_dal ?dal_slots + ?websockets protocol in f ~protocol ~evm_setup) @@ -587,7 +591,7 @@ let register_test ~title ~tags ?(kernels = Kernel.all) ?additional_config ?admin let register_proxy ~title ~tags ?kernels ?additional_uses ?additional_config ?admin ?commitment_period ?challenge_window ?bootstrap_accounts ?da_fee_per_byte ?minimum_base_fee_per_gas ?whitelist ?rollup_operator_key - ?maximum_allowed_ticks ?restricted_rpcs f protocols = + ?maximum_allowed_ticks ?restricted_rpcs ?websockets f protocols = let register ~enable_dal : unit = register_test ~title @@ -605,6 +609,7 @@ let register_proxy ~title ~tags ?kernels ?additional_uses ?additional_config ?rollup_operator_key ?maximum_allowed_ticks ?restricted_rpcs + ?websockets f protocols ~enable_dal @@ -618,7 +623,8 @@ let register_sequencer ?(return_sequencer = false) ~title ~tags ?kernels ?challenge_window ?bootstrap_accounts ?da_fee_per_byte ?minimum_base_fee_per_gas ?time_between_blocks ?whitelist ?rollup_operator_key ?maximum_allowed_ticks ?restricted_rpcs - ?max_blueprints_ahead ?(block_storage_sqlite3 = false) f protocols = + ?max_blueprints_ahead ?(block_storage_sqlite3 = false) ?websockets f + protocols = let register ~enable_dal : unit = register_test ~title @@ -636,6 +642,7 @@ let register_sequencer ?(return_sequencer = false) ~title ~tags ?kernels ?rollup_operator_key ?maximum_allowed_ticks ?restricted_rpcs + ?websockets f protocols ~enable_dal @@ -656,7 +663,8 @@ let register_both ~title ~tags ?kernels ?additional_uses ?additional_config ?admin ?commitment_period ?challenge_window ?bootstrap_accounts ?da_fee_per_byte ?minimum_base_fee_per_gas ?time_between_blocks ?whitelist 
?rollup_operator_key ?maximum_allowed_ticks ?restricted_rpcs - ?max_blueprints_ahead ?block_storage_sqlite3 f protocols : unit = + ?max_blueprints_ahead ?block_storage_sqlite3 ?websockets f protocols : unit + = register_proxy ~title ~tags @@ -673,6 +681,7 @@ let register_both ~title ~tags ?kernels ?additional_uses ?additional_config ?rollup_operator_key ?maximum_allowed_ticks ?restricted_rpcs + ?websockets f protocols ; register_sequencer @@ -694,6 +703,7 @@ let register_both ~title ~tags ?kernels ?additional_uses ?additional_config ?restricted_rpcs ?max_blueprints_ahead ?block_storage_sqlite3 + ?websockets f protocols @@ -883,10 +893,12 @@ let test_rpc_getBlockByNumber = ~error_msg:"Unexpected block number, should be %%R, but got %%L" ; unit -let get_block_by_hash ?(full_tx_objects = false) evm_setup block_hash = +let get_block_by_hash ?websocket ?(full_tx_objects = false) evm_setup block_hash + = let* block = Evm_node.( - call_evm_rpc + jsonrpc + ?websocket evm_setup.evm_node { method_ = "eth_getBlockByHash"; @@ -910,14 +922,7 @@ let test_rpc_getBlockByHash = assert (block = block') ; unit -let test_rpc_getBlockReceipts = - register_both - ~time_between_blocks:Nothing - ~bootstrap_accounts:Eth_account.lots_of_address - ~tags:["evm"; "rpc"; "get_block_receipts"] - ~title:"RPC method eth_getBlockReceipts" - ~minimum_base_fee_per_gas:base_fee_for_hardcoded_tx - @@ fun ~protocol:_ ~evm_setup:{evm_node; produce_block; _} -> +let test_rpc_getBlockReceipts_aux ?websocket {evm_node; produce_block; _} = let txs = read_tx_from_file () |> List.filteri (fun i _ -> i < 5) @@ -928,7 +933,8 @@ let test_rpc_getBlockReceipts = in let* receipts = Evm_node.( - call_evm_rpc + jsonrpc + ?websocket evm_node { method_ = "eth_getBlockReceipts"; @@ -951,6 +957,15 @@ let test_rpc_getBlockReceipts = assert (List.equal ( = ) txs expected_txs) ; unit +let test_rpc_getBlockReceipts = + register_both + ~time_between_blocks:Nothing + ~bootstrap_accounts:Eth_account.lots_of_address + 
~tags:["evm"; "rpc"; "get_block_receipts"] + ~title:"RPC method eth_getBlockReceipts" + ~minimum_base_fee_per_gas:base_fee_for_hardcoded_tx + @@ fun ~protocol:_ ~evm_setup -> test_rpc_getBlockReceipts_aux evm_setup + let test_rpc_getBlockBy_return_base_fee_per_gas_and_mix_hash = register_both (* TODO: https://gitlab.com/tezos/tezos/-/issues/7285 @@ -1061,7 +1076,7 @@ let test_rpc_getTransactionCountBatch = ~block:"latest"; ] in - match JSON.as_list transaction_count with + match transaction_count with | [transaction_count] -> return JSON.(transaction_count |-> "result" |> as_int64) | _ -> Test.fail "Unexpected result from batching one request" @@ -1083,7 +1098,7 @@ let test_rpc_batch = let* results = Evm_node.batch_evm_rpc evm_node [transaction_count; chain_id] in - match JSON.as_list results with + match results with | [transaction_count; chain_id] -> return ( JSON.(transaction_count |-> "result" |> as_int64), @@ -1511,7 +1526,7 @@ let config_setup evm_setup = let* results = Evm_node.batch_evm_rpc evm_setup.evm_node [web3_clientVersion; chain_id] in - match JSON.as_list results with + match results with | [web3_clientVersion; chain_id] -> (* We don't need to return the web3_clientVersion because, it might change after the upgrade. 
diff --git a/etherlink/tezt/tests/evm_sequencer.ml b/etherlink/tezt/tests/evm_sequencer.ml index 33e5d7ab1ceb464fee6c078776c67face3ed58b5..06f912596a1e084920e5fa488c54ed6f2e75090d 100644 --- a/etherlink/tezt/tests/evm_sequencer.ml +++ b/etherlink/tezt/tests/evm_sequencer.ml @@ -246,7 +246,7 @@ let register_all ?max_delayed_inbox_blueprint_length ?block_storage_sqlite3 ?minimum_base_fee_per_gas ?preimages_dir ?maximum_allowed_ticks ?maximum_gas_per_transaction ?max_blueprint_lookahead_in_seconds ?enable_fa_bridge ?history_mode ?commitment_period ?challenge_window - ?additional_uses + ?additional_uses ?rpc_server ?websockets ?(use_threshold_encryption = default_threshold_encryption_registration) ?(use_dal = default_dal_registration) ~title ~tags body protocols = let dal_cases = @@ -298,6 +298,8 @@ let register_all ?max_delayed_inbox_blueprint_length ?block_storage_sqlite3 ?max_blueprint_lookahead_in_seconds ?enable_fa_bridge ?additional_uses + ?rpc_server + ?websockets ~threshold_encryption ?history_mode ~enable_dal @@ -8264,6 +8266,80 @@ let test_produce_block_with_no_delayed_transactions = unit +let test_websocket_rpcs = + register_all + ~tags:["evm"; "rpc"; "websocket"] + ~title:"RPC methods over websocket" + ~time_between_blocks:Nothing + ~bootstrap_accounts: + ((Array.to_list Eth_account.bootstrap_accounts + |> List.map (fun a -> a.Eth_account.address)) + @ Eth_account.lots_of_address) + ~minimum_base_fee_per_gas:base_fee_for_hardcoded_tx + ~rpc_server:Dream (* Websockets only available in Dream *) + ~websockets:true + @@ fun {sequencer; _} _protocol -> + Log.info "Opening a websocket connection with the node" ; + let* websocket = Evm_node.open_websocket sequencer in + Log.info "getBalance" ; + let*@ balance = + Rpc.get_balance + ~websocket + ~address:Eth_account.bootstrap_accounts.(0).address + sequencer + in + Check.((balance = Helpers.default_bootstrap_account_balance) Wei.typ) + ~error_msg: + (sf + "Expected balance of %s should be %%R, but got %%L" + 
Eth_account.bootstrap_accounts.(0).address) ; + Log.info "getBlockByNumber" ; + let*@ block = Rpc.get_block_by_number ~websocket ~block:"0" sequencer in + Check.((block.number = 0l) int32) + ~error_msg:"Unexpected block number, should be %%R, but got %%L" ; + Log.info "getBlockByHash" ; + let* block' = + let* block = + Evm_node.( + jsonrpc + ~websocket + sequencer + { + method_ = "eth_getBlockByHash"; + parameters = `A [`String block.hash; `Bool false]; + }) + in + return @@ (block |> Evm_node.extract_result |> Block.of_json) + in + assert (block = block') ; + Log.info "blockNumber" ; + let* () = + repeat 2 (fun () -> + let*@ _ = produce_block sequencer in + unit) + in + let*@ block_number = Rpc.block_number ~websocket sequencer in + Check.((block_number = 2l) int32) + ~error_msg:"Expected a block number of %R, but got %L" ; + Log.info "getTransactionCount" ; + let*@ transaction_count = + Rpc.get_transaction_count + ~websocket + ~address:Eth_account.bootstrap_accounts.(0).address + sequencer + in + Check.((transaction_count = 0L) int64) + ~error_msg:"Expected a nonce of %R, but got %L" ; + Log.info "netVersion" ; + let*@ net_version = Rpc.net_version ~websocket sequencer in + Check.((net_version = "1337") string) + ~error_msg:"Expected net_version is %R, but got %L" ; + Log.info "coinbase" ; + let*@ coinbase = Rpc.coinbase ~websocket sequencer in + Check.((coinbase = "0x0000000000000000000000000000000000000000") string) + ~error_msg:"eth_coinbase returned %L, expected %R" ; + unit + let protocols = Protocol.all let () = @@ -8375,4 +8451,5 @@ let () = test_overwrite_simulation_tick_limit protocols ; test_inconsistent_da_fees protocols ; test_produce_block_with_no_delayed_transactions protocols ; - test_observer_reset [Protocol.Alpha] + test_observer_reset [Protocol.Alpha] ; + test_websocket_rpcs [Protocol.Alpha] diff --git a/images/ci/Dockerfile b/images/ci/Dockerfile index 01ef4984f8b8cb7ce6fed13802a94c3a2ace75a8..7cbc78b71cb35f2a27b224079d73fca671285b31 100644 --- 
a/images/ci/Dockerfile +++ b/images/ci/Dockerfile @@ -340,7 +340,7 @@ SHELL ["/bin/ash", "-euo", "pipefail", "-c"] # hadolint ignore=DL3018,DL3019 RUN apk update \ - && apk add --no-cache curl npm git file make jq \ + && apk add --no-cache curl npm git file make jq websocat \ gcc clang lld ca-certificates build-base musl-dev libusb-dev linux-headers \ # We need datadog-ci to send JUnit files to Datadog in tezt # jobs. Cf. [scripts/ci/tezt.sh]. diff --git a/tezt/lib_tezos/websocket.ml b/tezt/lib_tezos/websocket.ml new file mode 100644 index 0000000000000000000000000000000000000000..61ed1574276b65d9e08db56aecc70bdaea69587a --- /dev/null +++ b/tezt/lib_tezos/websocket.ml @@ -0,0 +1,90 @@ +(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* Copyright (c) 2024 Functori *) +(* Copyright (c) 2024 Nomadic Labs *) +(* *) +(*****************************************************************************) + +type t = {process : Process.t; stdin : Lwt_io.output_channel} + +exception Could_not_connect + +let get_unique_name = + let name_counts = ref String_map.empty in + fun name -> + let index = + match String_map.find_opt name !name_counts with None -> 0 | Some i -> i + in + name_counts := String_map.add name (index + 1) !name_counts ; + name ^ "#" ^ string_of_int index + +let connect ?runner ?hooks ?name url = + let name = + match name with Some n -> n | None -> get_unique_name "websocket_client" + in + let url = Uri.with_scheme (Uri.of_string url) (Some "ws") |> Uri.to_string in + let process, stdin = + Process.spawn_with_stdin ~name ?runner ?hooks "websocat" [url] + in + let () = + try Unix.kill (Process.pid process) 0 + with _ -> + Log.error "%s could not connect to %s" name url ; + raise Could_not_connect + in + return {process; stdin} + +let send_msg {stdin; _} msg = + let* () = Lwt_io.write stdin (msg ^ "\n") in + unit + +let read_json ~origin {process; _} = + let max_size = 10 * 1024 * 1024 (* 
10MB *) in + let ch = Process.stdout process in + let buff = Buffer.create 256 in + let rec loop () = + let* line = Lwt_io.read_line_opt ch in + match line with + | None -> failwith "No response on websocket" + | Some line -> ( + Buffer.add_string buff line ; + match JSON.parse_opt ~origin (Buffer.contents buff) with + | None when Buffer.length buff >= max_size -> + Format.ksprintf + failwith + "Could not parse JSON from websocket %d bytes." + max_size + | None -> loop () + | Some json -> return json) + in + loop () + +let close ws = + let* () = Lwt_io.close ws.stdin in + Process.terminate ws.process ; + unit + +let send = + let cpt = ref 0 in + fun ws json -> + incr cpt ; + let msg = JSON.unannotate json |> Ezjsonm.value_to_string ~minify:true in + Log.debug + ~color:Log.Color.bold + "%s(%d): > %s" + (Process.name ws.process) + !cpt + msg ; + send_msg ws msg + +let recv = + let cpt = ref 0 in + fun ws -> + incr cpt ; + let origin = Format.sprintf "%s(%d)" (Process.name ws.process) !cpt in + read_json ~origin ws + +let send_recv ws json = + let* () = send ws json in + recv ws diff --git a/tezt/lib_tezos/websocket.mli b/tezt/lib_tezos/websocket.mli new file mode 100644 index 0000000000000000000000000000000000000000..965ef2af00650bdd78981d1ba5d74c61a4db7189 --- /dev/null +++ b/tezt/lib_tezos/websocket.mli @@ -0,0 +1,33 @@ +(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* Copyright (c) 2024 Functori *) +(* Copyright (c) 2024 Nomadic Labs *) +(* *) +(*****************************************************************************) + +exception Could_not_connect + +(** Type of a websocket client *) +type t + +(** [connect ?runner ?hooks ?name url] connects to a websocket server and returns + the client. *) +val connect : + ?runner:Runner.t -> + ?hooks:Process_hooks.t -> + ?name:string -> + string -> + t Lwt.t + +(** Terminate the client.
*) +val close : t -> unit Lwt.t + +(** Send a JSON object on the websocket. *) +val send : t -> JSON.t -> unit Lwt.t + +(** Receive a JSON object on the websocket. *) +val recv : t -> JSON.t Lwt.t + +(** Send a JSON object on the websocket and wait for the response. *) +val send_recv : t -> JSON.t -> JSON.t Lwt.t