From dd6ffc986a11ee784306fb6f47278e1125b8ebb9 Mon Sep 17 00:00:00 2001 From: Alain Mebsout Date: Tue, 5 Nov 2024 13:02:17 +0100 Subject: [PATCH 01/30] Build: add dream as a dependency to the EVM node --- cohttp/cohttp-lwt-unix/src/io.ml | 1 + etherlink/bin_node/lib_dev/dune | 1 + flake.lock | 6 +-- manifest/externals.ml | 10 ++-- manifest/product_etherlink.ml | 1 + opam/octez-evm-node-libs.opam | 1 + opam/octez-libs.opam | 4 +- opam/octez-node.opam | 2 +- opam/octez-teztale.opam | 6 +-- opam/virtual/octez-deps.opam | 9 ++-- opam/virtual/octez-deps.opam.locked | 83 ++++++++++++++++++----------- scripts/version.sh | 2 +- 12 files changed, 78 insertions(+), 48 deletions(-) diff --git a/cohttp/cohttp-lwt-unix/src/io.ml b/cohttp/cohttp-lwt-unix/src/io.ml index 24e2d9669436..1da4bcbda266 100644 --- a/cohttp/cohttp-lwt-unix/src/io.ml +++ b/cohttp/cohttp-lwt-unix/src/io.ml @@ -88,6 +88,7 @@ let wait_eof_or_closed conn ic sleep_fn = let wait_for_cancel () = fst (Lwt.task ()) in match (conn : Conduit_lwt_unix.flow) with | Vchan _ -> wait_for_cancel () + | Tunnel _ -> wait_for_cancel () | TCP {fd; _} | Domain_socket {fd; _} -> let peek_buffer = Bytes.create 1 in let has_recv_eof fd = diff --git a/etherlink/bin_node/lib_dev/dune b/etherlink/bin_node/lib_dev/dune index b7998b542f52..c56c0cbc2ff3 100644 --- a/etherlink/bin_node/lib_dev/dune +++ b/etherlink/bin_node/lib_dev/dune @@ -9,6 +9,7 @@ octez-libs.base octez-libs.rpc-http octez-libs.rpc-http-server + dream octez-libs.tezos-workers octez-libs.rpc-http-client-unix octez-version.value diff --git a/flake.lock b/flake.lock index 2345aea7653d..0183fddfbe45 100644 --- a/flake.lock +++ b/flake.lock @@ -122,11 +122,11 @@ "opam-repository": { "flake": false, "locked": { - "lastModified": 1730116182, - "narHash": "sha256-2ci69XsboLd3PKtCbSW/fuqYnt5hg3kg+sZTEzCkJfs=", + "lastModified": 1730731085, + "narHash": "sha256-RmibDeHeQCNWiJbnOKLbpbhb8BKuUpLdOPPlX6Kl220=", "owner": "ocaml", "repo": "opam-repository", - "rev": 
"924ed6223cc22c606b7d5e7bb9793eaa1519e708", + "rev": "8476aa70b71db29af398e70bb8a00298c1e64b86", "type": "github" }, "original": { diff --git a/manifest/externals.ml b/manifest/externals.ml index 05779d690a24..b7e7e6f9d461 100644 --- a/manifest/externals.ml +++ b/manifest/externals.ml @@ -77,9 +77,9 @@ let cmdliner = external_lib "cmdliner" V.(at_least "1.1.0") let cohttp = external_lib "cohttp" V.(at_least "5.3.1") -let conduit_lwt = external_lib "conduit-lwt" V.(exactly "6.2.2") +let conduit_lwt = external_lib "conduit-lwt" V.(at_least "7.1.0") -let conduit_lwt_unix = external_lib "conduit-lwt-unix" V.(exactly "6.2.2") +let conduit_lwt_unix = external_lib "conduit-lwt-unix" V.(at_least "7.1.0") let compiler_libs_common = external_lib "compiler-libs.common" V.True ~opam:"" @@ -103,6 +103,8 @@ let ctypes_stubs = external_sublib ctypes "ctypes.stubs" let digestif = external_lib "digestif" V.(at_least "0.9.0") +let dream = external_lib "dream" V.(at_least "1.0.0~alpha7") + let dune_configurator = external_lib "dune-configurator" V.True let dynlink = external_lib "dynlink" V.True ~opam:"" @@ -264,7 +266,7 @@ let tar_unix = external_lib "tar-unix" V.(at_least "2.0.1" && less_than "3.0.0") let tezos_sapling_parameters = opam_only ~can_vendor:false "tezos-sapling-parameters" V.(at_least "1.1.0") -let tls_lwt = external_lib "tls-lwt" V.(at_least "0.16.0") +let tls_lwt = external_lib "tls-lwt" V.(at_least "1.0.4") let trace = external_lib "trace" V.True @@ -308,7 +310,7 @@ let tezt_lib = let tezt_core_lib = external_sublib tezt_lib "tezt.core" ~main_module:"Tezt_core" -let x509 = external_lib "x509" V.(less_than "1.0.0") +let x509 = external_lib "x509" V.(at_least "1.0.0") let tezt_json_lib = external_sublib tezt_lib "tezt.json" ~main_module:"JSON" diff --git a/manifest/product_etherlink.ml b/manifest/product_etherlink.ml index c07be6eb3184..649cb0dfdd92 100644 --- a/manifest/product_etherlink.ml +++ b/manifest/product_etherlink.ml @@ -225,6 +225,7 @@ let evm_node_lib_dev 
= octez_base |> open_ ~m:"TzPervasives"; octez_rpc_http |> open_; octez_rpc_http_server; + dream; octez_workers |> open_; octez_rpc_http_client_unix; octez_version_value; diff --git a/opam/octez-evm-node-libs.opam b/opam/octez-evm-node-libs.opam index 024f6e150d85..231369ff8c87 100644 --- a/opam/octez-evm-node-libs.opam +++ b/opam/octez-evm-node-libs.opam @@ -18,6 +18,7 @@ depends: [ "crunch" { >= "3.3.0" } "re" { >= "1.10.0" } "octez-smart-rollup-wasm-debugger-plugin" + "dream" { >= "1.0.0~alpha7" } "octez-version" "lwt-watcher" { = "0.2" } "lwt-exit" diff --git a/opam/octez-libs.opam b/opam/octez-libs.opam index 57b46dc4c008..fe265f0765aa 100644 --- a/opam/octez-libs.opam +++ b/opam/octez-libs.opam @@ -25,9 +25,9 @@ depends: [ "cohttp" { >= "5.3.1" } "logs" "fmt" { >= "0.8.7" } - "conduit-lwt" { = "6.2.2" } + "conduit-lwt" { >= "7.1.0" } "magic-mime" { >= "1.3.1" } - "conduit-lwt-unix" { = "6.2.2" } + "conduit-lwt-unix" { >= "7.1.0" } "lwt_ppx" "mtime" { >= "2.0.0" } "opentelemetry" diff --git a/opam/octez-node.opam b/opam/octez-node.opam index ed1216c18efb..ebefddbbbace 100644 --- a/opam/octez-node.opam +++ b/opam/octez-node.opam @@ -19,7 +19,7 @@ depends: [ "octez-rpc-process" { = version } "cmdliner" { >= "1.1.0" } "fmt" { >= "0.8.7" } - "tls-lwt" { >= "0.16.0" } + "tls-lwt" { >= "1.0.4" } "lwt-exit" "uri" { >= "3.1.0" } "tezos-protocol-000-Ps9mPmXa" { = version } diff --git a/opam/octez-teztale.opam b/opam/octez-teztale.opam index 2a245ccac67e..789a9bf23d0c 100644 --- a/opam/octez-teztale.opam +++ b/opam/octez-teztale.opam @@ -23,9 +23,9 @@ depends: [ "cmdliner" { >= "1.1.0" } "octez-rust-deps" "octez-shell-libs" - "tls-lwt" { >= "0.16.0" } - "conduit-lwt-unix" { = "6.2.2" } - "x509" { < "1.0.0" } + "tls-lwt" { >= "1.0.4" } + "conduit-lwt-unix" { >= "7.1.0" } + "x509" { >= "1.0.0" } ] depopts: [ "octez-protocol-001-PtCJ7pwo-libs" diff --git a/opam/virtual/octez-deps.opam b/opam/virtual/octez-deps.opam index be8f0f14f195..b739fd5bc9da 100644 --- 
a/opam/virtual/octez-deps.opam +++ b/opam/virtual/octez-deps.opam @@ -33,8 +33,8 @@ depends: [ "class_group_vdf" { >= "0.0.4" } "cmdliner" { >= "1.1.0" } "cohttp" { >= "5.3.1" } - "conduit-lwt" { = "6.2.2" } - "conduit-lwt-unix" { = "6.2.2" } + "conduit-lwt" { >= "7.1.0" } + "conduit-lwt-unix" { >= "7.1.0" } "conf-libev" "conf-rust" "crowbar" { >= "0.2" } @@ -43,6 +43,7 @@ depends: [ "ctypes-foreign" { >= "0.18.0" } "digestif" { >= "0.9.0" } "dmap" + "dream" { >= "1.0.0~alpha7" } "dune-configurator" "eqaf" "ezgzip" @@ -108,11 +109,11 @@ depends: [ "tezos-sapling-parameters" { >= "1.1.0" } "tezt" { >= "4.1.0" & < "5.0.0" } "tezt-bam" - "tls-lwt" { >= "0.16.0" } + "tls-lwt" { >= "1.0.4" } "uri" { >= "3.1.0" } "uutf" "vector" - "x509" { < "1.0.0" } + "x509" { >= "1.0.0" } "yaml" { >= "3.1.0" } "zarith" { >= "1.13" & < "1.14" } ] diff --git a/opam/virtual/octez-deps.opam.locked b/opam/virtual/octez-deps.opam.locked index f8725f3c2f85..0bad6086798c 100644 --- a/opam/virtual/octez-deps.opam.locked +++ b/opam/virtual/octez-deps.opam.locked @@ -1,6 +1,6 @@ opam-version: "2.0" name: "octez-deps" -version: "dev" +version: "~dev" synopsis: "Virtual package depending on Octez dependencies (profile: octez-deps)" description: @@ -19,10 +19,10 @@ depends: [ "ambient-context" {= "0.1.0"} "angstrom" {= "0.16.1"} "asetmap" {= "0.8.1"} - "asn1-combinators" {= "0.2.6"} + "asn1-combinators" {= "0.3.2"} "astring" {= "0.8.5"} - "bam" {= "0.2"} - "bam-ppx" {= "0.2"} + "bam" {= "0.3"} + "bam-ppx" {= "0.3"} "base" {= "v0.16.3"} "base-bigarray" {= "base"} "base-bytes" {= "base"} @@ -31,11 +31,12 @@ depends: [ "base64" {= "3.5.1"} "bheap" {= "2.0.0"} "bigarray-compat" {= "1.1.0"} + "bigarray-overlap" {= "0.2.1"} "bigstring" {= "0.3"} "bigstringaf" {= "0.10.0"} "bisect_ppx" {= "2.8.3"} "bos" {= "0.2.1"} - "ca-certs" {= "0.2.3"} + "ca-certs" {= "1.0.0"} "camlp-streams" {= "5.0.1"} "camlzip" {= "1.12"} "caqti" {= "2.1.2"} @@ -48,9 +49,9 @@ depends: [ "class_group_vdf" {= "0.0.5"} "cmdliner" {= 
"1.3.0"} "cohttp" {= "5.3.1"} - "conduit" {= "6.2.2"} - "conduit-lwt" {= "6.2.2"} - "conduit-lwt-unix" {= "6.2.2"} + "conduit" {= "7.1.0"} + "conduit-lwt" {= "7.1.0"} + "conduit-lwt-unix" {= "7.1.0"} "conf-autoconf" {= "0.2"} "conf-bash" {= "1"} "conf-cmake" {= "1"} @@ -60,6 +61,7 @@ depends: [ "conf-hidapi" {= "0"} "conf-libev" {= "4-12"} "conf-libffi" {= "2.0.0"} + "conf-libssl" {= "4"} "conf-pkg-config" {= "3"} "conf-postgresql" {= "1"} "conf-rust" {= "0.1"} @@ -78,28 +80,36 @@ depends: [ "digestif" {= "1.2.0"} "dmap" {= "0.5"} "domain-name" {= "0.4.0"} - "dune" {= "3.16.0"} - "dune-build-info" {= "3.16.0"} - "dune-configurator" {= "3.16.0"} - "dune-private-libs" {= "3.16.0"} - "dune-site" {= "3.16.0"} + "dream" {= "1.0.0~alpha7"} + "dream-httpaf" {= "1.0.0~alpha3"} + "dream-pure" {= "1.0.0~alpha2"} + "dune" {= "3.16.1"} + "dune-build-info" {= "3.16.1"} + "dune-configurator" {= "3.16.1"} + "dune-private-libs" {= "3.16.1"} + "dune-site" {= "3.16.1"} "duration" {= "0.2.1"} - "dyn" {= "3.16.0"} + "dyn" {= "3.16.1"} "either" {= "1.0.0"} - "eqaf" {= "0.9"} + "eqaf" {= "0.10"} "ezgzip" {= "0.2.3"} "ezjsonm" {= "1.3.0"} + "faraday" {= "0.8.2"} + "faraday-lwt" {= "0.8.2"} + "faraday-lwt-unix" {= "0.8.2"} "fix" {= "20230505"} "fmt" {= "0.9.0"} "fpath" {= "0.7.3"} "gmap" {= "0.3.0"} + "graphql" {= "0.14.0"} + "graphql-lwt" {= "0.14.0"} + "graphql_parser" {= "0.14.0"} "hacl-star" {= "0.7.2"} "hacl-star-raw" {= "0.7.2"} "hashcons" {= "1.4.0"} "hex" {= "1.5.0"} "hidapi" {= "1.2.1"} "hidapi-lwt" {= "1.2.1"} - "hkdf" {= "1.0.4"} "hmap" {= "0.8.1"} "index" {= "1.6.2"} "integers" {= "0.7.0"} @@ -109,20 +119,25 @@ depends: [ "jingoo" {= "1.5.0"} "jsonm" {= "1.0.2"} "jst-config" {= "v0.16.0"} + "kdf" {= "1.0.0"} + "ke" {= "0.6"} "lambda-term" {= "3.3.2"} + "lambdasoup" {= "1.1.1"} "ledgerwallet" {= "0.4.1"} "ledgerwallet-tezos" {= "0.4.1"} "logs" {= "0.7.0"} "lru" {= "0.3.1"} - "lwt" {= "5.7.0"} + "lwt" {= "5.8.0"} "lwt-canceler" {= "0.3"} "lwt-dllist" {= "1.0.1"} "lwt-exit" {= 
"1.0"} "lwt-watcher" {= "0.2"} - "lwt_ppx" {= "2.1.0"} + "lwt_ppx" {= "5.8.0"} "lwt_react" {= "1.2.0"} + "lwt_ssl" {= "1.2.0"} "macaddr" {= "5.6.0"} "magic-mime" {= "1.3.1"} + "markup" {= "1.0.3"} "memtrace" {= "0.2.3"} "menhir" {= "20240715"} "menhirCST" {= "20240715"} @@ -130,12 +145,15 @@ depends: [ "menhirSdk" {= "20240715"} "mew" {= "0.1.0"} "mew_vi" {= "0.5.0"} - "mirage-crypto" {= "0.11.3"} - "mirage-crypto-ec" {= "0.11.3"} - "mirage-crypto-pk" {= "0.11.3"} - "mirage-crypto-rng" {= "0.11.3"} - "mirage-crypto-rng-lwt" {= "0.11.3"} + "mirage-clock" {= "4.2.0"} + "mirage-crypto" {= "1.1.0"} + "mirage-crypto-ec" {= "1.1.0"} + "mirage-crypto-pk" {= "1.1.0"} + "mirage-crypto-rng" {= "1.1.0"} + "mirage-crypto-rng-lwt" {= "1.1.0"} "mtime" {= "2.1.0"} + "multipart_form" {= "0.6.0"} + "multipart_form-lwt" {= "0.6.0"} "num" {= "1.5-1"} "ocaml" {= "4.14.2"} "ocaml-base-compiler" {= "4.14.2"} @@ -152,12 +170,13 @@ depends: [ "ocp-indent" {= "1.8.1"} "ocp-ocamlres" {= "0.4"} "ocplib-endian" {= "1.2"} + "ohex" {= "0.2.0"} "opentelemetry" {= "0.10"} "optint" {= "0.3.0"} - "ordering" {= "3.16.0"} + "ordering" {= "3.16.1"} "parsexp" {= "v0.16.0"} - "pbkdf" {= "1.2.0"} "pbrt" {= "3.1.1"} + "pecu" {= "0.7"} "postgresql" {= "5.0.0"} "pp" {= "1.2.0"} "pprint" {= "20230830"} @@ -181,6 +200,7 @@ depends: [ "prbnmcn-basic-structures" {= "0.0.1"} "prbnmcn-linalg" {= "0.0.1"} "prbnmcn-stats" {= "0.0.6"} + "prettym" {= "0.0.3"} "pringo" {= "1.3"} "progress" {= "0.4.0"} "psq" {= "0.2.1"} @@ -204,33 +224,36 @@ depends: [ "sexplib" {= "v0.16.0"} "sexplib0" {= "v0.16.0"} "sqlite3" {= "5.2.0"} + "ssl" {= "0.7.0"} "stdcompat" {= "19"} "stdint" {= "0.7.2"} "stdio" {= "v0.16.0"} "stdlib-random" {= "1.2.0"} "stdlib-shims" {= "0.3.0"} - "stdune" {= "3.16.0"} + "stdune" {= "3.16.1"} "stringext" {= "1.6.0"} "tar" {= "2.6.0"} "tar-unix" {= "2.6.0"} "terminal" {= "0.4.0"} "tezos-sapling-parameters" {= "1.1.0"} "tezt" {= "4.1.0"} - "tezt-bam" {= "0.2"} + "tezt-bam" {= "0.3"} "time_now" {= "v0.16.0"} 
- "tls" {= "0.17.5"} - "tls-lwt" {= "0.17.5"} + "tls" {= "1.0.4"} + "tls-lwt" {= "1.0.4"} "topkg" {= "1.0.7"} "trie" {= "1.0.0"} "uchar" {= "0.0.2"} + "unstrctrd" {= "0.4"} "uri" {= "4.4.0"} "uri-sexp" {= "4.4.0"} "uucp" {= "16.0.0"} "uuseg" {= "16.0.0"} "uutf" {= "1.0.3"} "vector" {= "1.0.0"} - "x509" {= "0.16.5"} + "x509" {= "1.0.5"} "yaml" {= "3.2.0"} + "yojson" {= "2.2.2"} "zarith" {= "1.13"} "zed" {= "3.2.3"} ] diff --git a/scripts/version.sh b/scripts/version.sh index efe86dc25123..1133e2d1b8bf 100644 --- a/scripts/version.sh +++ b/scripts/version.sh @@ -24,7 +24,7 @@ export recommended_node_version=18.18.2 ## opam_repository is a commit hash of the public opam repository, i.e. ## https://github.com/ocaml/opam-repository -export opam_repository_tag=924ed6223cc22c606b7d5e7bb9793eaa1519e708 +export opam_repository_tag=8476aa70b71db29af398e70bb8a00298c1e64b86 # SHA-256 hashes of the DAL SRSs, as used in 'scripts/install_dal_trusted_setup.sh' to verify # integrity of downloaded SRS. -- GitLab From d9473369bf6ce42a7ea4ce5b061dac64d39ce29c Mon Sep 17 00:00:00 2001 From: Alain Mebsout Date: Wed, 6 Nov 2024 21:22:02 +0100 Subject: [PATCH 02/30] EVM node: wrapper around Dream to define routes based on Resto services --- etherlink/bin_node/lib_dev/router.ml | 174 ++++++++++++++++++++++++++ etherlink/bin_node/lib_dev/router.mli | 79 ++++++++++++ 2 files changed, 253 insertions(+) create mode 100644 etherlink/bin_node/lib_dev/router.ml create mode 100644 etherlink/bin_node/lib_dev/router.mli diff --git a/etherlink/bin_node/lib_dev/router.ml b/etherlink/bin_node/lib_dev/router.ml new file mode 100644 index 000000000000..7bdb356bef3e --- /dev/null +++ b/etherlink/bin_node/lib_dev/router.ml @@ -0,0 +1,174 @@ +(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* Copyright (c) 2024 Functori *) +(* Copyright (c) 2024 Nomadic Labs *) +(* *) 
+(*****************************************************************************) + +open Tezos_rpc + +type media = [`Json | `Octets] + +let default_media = `Json + +let application_octet_stream = "application/octet-stream" + +type accept_media = {media : [media | `Any]; q : float} + +let parse_media_header s = + match String.split_on_char ';' s with + | [] -> None + | accept :: q -> ( + try + let media = + match String.lowercase_ascii accept with + | "application/json" -> `Json + | "application/octet-stream" -> `Octets + | "application/*" | "*/*" | "*" -> `Any + | _ -> raise Not_found + in + let q = + match q with + | [] -> 1.0 + | q :: _ -> Scanf.sscanf q "q=%f" (fun f -> f) + in + Some {media; q} + with Not_found -> None) + +let media header_name request = + let medias = Dream.headers request header_name in + let medias = + List.fold_left + (fun acc h -> List.rev_append (String.split_on_char ',' h) acc) + [] + medias + |> List.rev + in + let parsed = + List.filter_map parse_media_header medias + |> List.stable_sort (fun a1 a2 -> Compare.Float.compare a2.q a1.q) + in + match parsed with + | [] | {media = `Any; _} :: _ -> default_media + | {media = #media as m; _} :: _ -> m + +let accept_media = media "accept" + +let content_media = media "content-type" + +let encode media encoding = + match media with + | `Json -> + fun v -> + Data_encoding.Json.construct encoding v |> Ezjsonm.value_to_string + | `Octets -> + Data_encoding.Binary.to_string_exn (Data_encoding.dynamic_size encoding) + +let decode media encoding = + match media with + | `Json -> + fun s -> + Ezjsonm.value_from_string s |> Data_encoding.Json.destruct encoding + | `Octets -> + Data_encoding.Binary.of_string_exn (Data_encoding.dynamic_size encoding) + +let content_header = function + | `Json -> Dream.application_json + | `Octets -> application_octet_stream + +let respond ?status ?code ?headers media v = + let open Lwt_syntax in + let* response = Dream.respond ?status ?code ?headers v in + 
Dream.set_header response "Content-type" (content_header media) ; + return response + +let make_gen_route : + type i. + string -> + ([< Resto.meth], 'a, 'b, 'c, i, 'd, 'e) Service.raw -> + (Dream.request -> i -> Dream.response Lwt.t) -> + Dream.route = + fun path service handler -> + let f = + match Service.meth service with + | `PUT -> Dream.put + | `GET -> Dream.get + | `DELETE -> Dream.delete + | `POST -> Dream.post + | `PATCH -> Dream.patch + in + let input_encoding = Service.input_encoding service in + f path @@ fun request -> + let open Lwt_syntax in + match input_encoding with + | No_input -> handler request () + | Input input_encoding -> + let* body = Dream.body request in + let media = content_media request in + let input = decode media input_encoding body in + handler request input + +let make_route path service handler = + make_gen_route path service @@ fun request input -> + let open Lwt_syntax in + let output_encoding = Service.output_encoding service in + let* output = handler request input in + let media = accept_media request in + respond media (encode media output_encoding output) + +let make_tz_route path service handler = + make_gen_route path service @@ fun request input -> + let open Lwt_syntax in + let* output = handler request input in + let media = accept_media request in + match output with + | Ok output -> + let output_encoding = Service.output_encoding service in + respond media (encode media output_encoding output) + | Error e -> + respond + ~status:`Internal_Server_Error + media + (encode media trace_encoding e) + +let make_opt_tz_route path service handler = + make_gen_route path service @@ fun request input -> + let open Lwt_syntax in + let* output = handler request input in + let media = accept_media request in + match output with + | Ok (Some output) -> + let output_encoding = Service.output_encoding service in + respond media (encode media output_encoding output) + | Ok None -> + respond + ~status:`Not_Found + media + (encode + 
media + Data_encoding.string + (Dream.target request ^ " not found on server")) + | Error e -> + respond + ~status:`Internal_Server_Error + media + (encode media trace_encoding e) + +let make_stream_route path service handler = + make_gen_route path service @@ fun request input -> + let open Lwt_syntax in + let output_encoding = Service.output_encoding service in + let media = accept_media request in + Dream.stream ~headers:[("Content-Type", content_header media)] + @@ fun response_stream -> + let* stream, shutdown = handler request input in + let* () = + Lwt_stream.iter_s + (fun output -> + let chunk = encode media output_encoding output in + Dream.write response_stream chunk) + stream + in + shutdown () ; + return_unit diff --git a/etherlink/bin_node/lib_dev/router.mli b/etherlink/bin_node/lib_dev/router.mli new file mode 100644 index 000000000000..fe5345ff0b9e --- /dev/null +++ b/etherlink/bin_node/lib_dev/router.mli @@ -0,0 +1,79 @@ +(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* Copyright (c) 2024 Functori *) +(* Copyright (c) 2024 Nomadic Labs *) +(* *) +(*****************************************************************************) + +(** {1 Helper functions to build {!Dream} routes from {!Resto} services.} *) + +(** [make_gen_route path service handler] builds a route by parsing the request + and body, from a generic handler that constructs the response. *) +val make_gen_route : + string -> + ( [< Resto.meth], + 'prefix, + 'query, + 'params, + 'input, + 'output ) + Tezos_rpc.Service.t -> + (Dream.request -> 'input -> Dream.response Lwt.t) -> + Dream.route + +(** [make_route path service handler] builds a route from a handler that returns an output. 
*) +val make_route : + string -> + ( [< Resto.meth], + 'prefix, + 'query, + 'params, + 'input, + 'output ) + Tezos_rpc.Service.t -> + (Dream.request -> 'input -> 'output Lwt.t) -> + Dream.route + +(** [make_tz_route path service handler] builds a route from a handler that returns an output or an error. *) +val make_tz_route : + string -> + ( [< Resto.meth], + 'prefix, + 'query, + 'params, + 'input, + 'output ) + Tezos_rpc.Service.t -> + (Dream.request -> 'input -> 'output tzresult Lwt.t) -> + Dream.route + +(** [make_opt_tz_route path service handler] builds a route from a handler that + returns an optional output or an error. If [handler] returns [None] the + server answers with a 404 Not_Found response. *) +val make_opt_tz_route : + string -> + ( [< Resto.meth], + 'prefix, + 'query, + 'params, + 'input, + 'output ) + Tezos_rpc.Service.t -> + (Dream.request -> 'input -> 'output option tzresult Lwt.t) -> + Dream.route + +(** [make_stream_route path service handler] builds a route which streams the + response from a handler that constructs an {!Lwt_stream.t}. The output + stream is streamed as chunks in the response body. 
*) +val make_stream_route : + string -> + ( [< Resto.meth], + 'prefix, + 'query, + 'params, + 'input, + 'output ) + Tezos_rpc.Service.t -> + (Dream.request -> 'input -> ('output Lwt_stream.t * (unit -> unit)) Lwt.t) -> + Dream.route -- GitLab From b22b0dff12c210a034ad10a766f5810d2dc59853 Mon Sep 17 00:00:00 2001 From: Alain Mebsout Date: Wed, 6 Nov 2024 21:23:31 +0100 Subject: [PATCH 03/30] EVM node: RPC server which uses Dream/httpun instead of Resto/Cohttp --- etherlink/bin_node/lib_dev/evm_services.ml | 63 +++++- etherlink/bin_node/lib_dev/evm_services.mli | 10 + etherlink/bin_node/lib_dev/rpc_server.ml | 73 +++++++ etherlink/bin_node/lib_dev/rpc_server.mli | 16 ++ etherlink/bin_node/lib_dev/services.ml | 211 ++++++++++++-------- 5 files changed, 282 insertions(+), 91 deletions(-) diff --git a/etherlink/bin_node/lib_dev/evm_services.ml b/etherlink/bin_node/lib_dev/evm_services.ml index 6e22600d8bf6..bc03f1a5dc47 100644 --- a/etherlink/bin_node/lib_dev/evm_services.ml +++ b/etherlink/bin_node/lib_dev/evm_services.ml @@ -16,7 +16,7 @@ let get_smart_rollup_address_service = ~output:Tezos_crypto.Hashed.Smart_rollup_address.encoding Path.(evm_services_root / "smart_rollup_address") -let get_time_between_blocks = +let get_time_between_blocks_service = Service.get_service ~description:"Get the maximum time between two blocks" ~query:Query.empty @@ -41,8 +41,7 @@ let blueprint_watcher_service = ~output:Blueprint_types.with_events_encoding Path.(evm_services_root / "blueprints") -let create_blueprint_watcher_service get_next_blueprint_number find_blueprint - from_level = +let blueprint_watcher get_next_blueprint_number find_blueprint from_level = let open Lwt_syntax in let blueprint_stream, stopper = Blueprints_watcher.create_stream () in let shutdown () = Lwt_watcher.shutdown stopper in @@ -70,6 +69,14 @@ let create_blueprint_watcher_service get_next_blueprint_number find_blueprint "Something went wrong when trying to fetch a blueprint") else Lwt_stream.get 
blueprint_stream in + return (next, shutdown) + +let create_blueprint_watcher_service get_next_blueprint_number find_blueprint + from_level = + let open Lwt_syntax in + let* next, shutdown = + blueprint_watcher get_next_blueprint_number find_blueprint from_level + in Tezos_rpc.Answer.return_stream {next; shutdown} let register_get_smart_rollup_address_service smart_rollup_address dir = @@ -78,7 +85,7 @@ let register_get_smart_rollup_address_service smart_rollup_address dir = return_ok smart_rollup_address) let register_get_time_between_block_service time_between_block dir = - Directory.register0 dir get_time_between_blocks (fun () () -> + Directory.register0 dir get_time_between_blocks_service (fun () () -> let open Lwt_result_syntax in return time_between_block) @@ -119,7 +126,7 @@ let get_time_between_blocks ?fallback ~evm_node_endpoint () = Tezos_rpc_http_client_unix.RPC_client_unix.call_service [Media_type.octet_stream] ~base:evm_node_endpoint - get_time_between_blocks + get_time_between_blocks_service () () () @@ -157,3 +164,49 @@ let monitor_blueprints ~evm_node_endpoint Ethereum_types.(Qty level) = () in return stream + +(** Routes for use in a {!Dream} server. 
*) +module Dream = struct + let get_smart_rollup_address_route smart_rollup_address = + Router.make_route "/smart_rollup_address" get_smart_rollup_address_service + @@ fun _request () -> Lwt.return smart_rollup_address + + let get_time_between_block_route time_between_block = + Router.make_route "/time_between_blocks" get_time_between_blocks_service + @@ fun _request () -> Lwt.return time_between_block + + let get_blueprint_service_route find_blueprint = + Router.make_opt_tz_route "/blueprint/:level" get_blueprint_service + @@ fun request () -> + let level = Dream.param request "level" in + let number = Ethereum_types.Qty (Z.of_string level) in + find_blueprint number + + let blueprint_watcher_route find_blueprint get_next_blueprint_number = + Router.make_stream_route "/blueprints" blueprint_watcher_service + @@ fun request () -> + let open Lwt_syntax in + let from_level = + Dream.query request "from_level" + |> Option.map Int64.of_string |> Option.value ~default:0L + in + let* next, stopper = + blueprint_watcher get_next_blueprint_number find_blueprint from_level + in + let stream = Lwt_stream.from next in + return (stream, stopper) + + let routes get_next_blueprint_number find_blueprint smart_rollup_address + time_between_blocks = + [ + Dream.scope + "/evm" + [] + [ + get_smart_rollup_address_route smart_rollup_address; + get_time_between_block_route time_between_blocks; + get_blueprint_service_route find_blueprint; + blueprint_watcher_route find_blueprint get_next_blueprint_number; + ]; + ] +end diff --git a/etherlink/bin_node/lib_dev/evm_services.mli b/etherlink/bin_node/lib_dev/evm_services.mli index e774b0a77c6b..1f0c08ef3334 100644 --- a/etherlink/bin_node/lib_dev/evm_services.mli +++ b/etherlink/bin_node/lib_dev/evm_services.mli @@ -35,3 +35,13 @@ val monitor_blueprints : evm_node_endpoint:Uri.t -> Ethereum_types.quantity -> Blueprint_types.with_events Lwt_stream.t tzresult Lwt.t + +module Dream : sig + val routes : + (unit -> Ethereum_types.quantity Lwt.t) 
-> + (Ethereum_types.quantity -> + Blueprint_types.with_events option tzresult Lwt.t) -> + Tezos_crypto.Hashed.Smart_rollup_address.t -> + Configuration.time_between_blocks -> + Dream.route list +end diff --git a/etherlink/bin_node/lib_dev/rpc_server.ml b/etherlink/bin_node/lib_dev/rpc_server.ml index d70839c65413..8d850bd043b4 100644 --- a/etherlink/bin_node/lib_dev/rpc_server.ml +++ b/etherlink/bin_node/lib_dev/rpc_server.ml @@ -125,3 +125,76 @@ let start_private_server ?(block_production = `Disabled) config ctxt = in return finalizer | None -> return (fun () -> Lwt_syntax.return_unit) + +module Dream = struct + let start_server rpc routes = + let open Lwt_result_syntax in + let Configuration.{port; addr; cors_origins = _; cors_headers = _; _} = + rpc + in + let stop, resolve_stop = Lwt.wait () in + let shutdown () = + Lwt.wakeup_later resolve_stop () ; + Lwt.return_unit + in + Lwt.dont_wait + (fun () -> + Dream.set_log_level "" `Debug ; + Dream.serve + ~interface:addr + ~port + ~stop + (Dream.logger (Dream.router routes))) + (fun exn -> + Format.eprintf "Dream server error: %s@." (Printexc.to_string exn)) ; + return shutdown + + let start_public_server ?delegate_health_check_to ?evm_services + (config : Configuration.t) ctxt = + let open Lwt_result_syntax in + let evm_services_routes = + match evm_services with + | None -> [] + | Some impl -> + Evm_services.Dream.routes + impl.next_blueprint_number + impl.find_blueprint + impl.smart_rollup_address + impl.time_between_blocks + in + let routes = + Services.Dream.public_routes + ?delegate_health_check_to + config.public_rpc + config + ctxt + @ evm_services_routes + in + let* finalizer = start_server config.public_rpc routes in + let*! 
() = + Events.is_ready + ~rpc_addr:config.public_rpc.addr + ~rpc_port:config.public_rpc.port + in + return finalizer + + let start_private_server ?(block_production = `Disabled) config ctxt = + let open Lwt_result_syntax in + match config.Configuration.private_rpc with + | Some private_rpc -> + let routes = + Services.Dream.private_routes + private_rpc + ~block_production + config + ctxt + in + let* finalizer = start_server private_rpc routes in + let*! () = + Events.private_server_is_ready + ~rpc_addr:private_rpc.addr + ~rpc_port:private_rpc.port + in + return finalizer + | None -> return (fun () -> Lwt_syntax.return_unit) +end diff --git a/etherlink/bin_node/lib_dev/rpc_server.mli b/etherlink/bin_node/lib_dev/rpc_server.mli index b386dd5cda19..6fa1d61c6d38 100644 --- a/etherlink/bin_node/lib_dev/rpc_server.mli +++ b/etherlink/bin_node/lib_dev/rpc_server.mli @@ -46,3 +46,19 @@ val start_public_server : Configuration.t -> (module Services_backend_sig.S) * 'a -> finalizer tzresult Lwt.t + +(** RPC server which uses Dream/httpun instead of Resto/Cohttp. *) +module Dream : sig + val start_public_server : + ?delegate_health_check_to:Uri.t -> + ?evm_services:evm_services_methods -> + Configuration.t -> + (module Services_backend_sig.S) * 'a -> + finalizer tzresult Lwt.t + + val start_private_server : + ?block_production:block_production -> + Configuration.t -> + (module Services_backend_sig.S) * 'a -> + finalizer tzresult Lwt.t +end diff --git a/etherlink/bin_node/lib_dev/services.ml b/etherlink/bin_node/lib_dev/services.ml index 79619d153667..3d9a21d640cf 100644 --- a/etherlink/bin_node/lib_dev/services.ml +++ b/etherlink/bin_node/lib_dev/services.ml @@ -72,83 +72,77 @@ let client_version = Stdlib.Sys.os_type Stdlib.Sys.ocaml_version +let configuration_handler config = + let open Configuration in + (* Hide some parts of the configuration. 
*) + let hidden = "hidden" in + let kernel_execution = + Configuration.{config.kernel_execution with preimages = hidden} + in + let sequencer = + Option.map + (fun (sequencer_config : sequencer) -> + {sequencer_config with sequencer = Client_keys.sk_uri_of_string hidden}) + config.sequencer + in + let observer = + Option.map + (fun (observer : observer) -> + {observer with evm_node_endpoint = Uri.of_string hidden}) + config.observer + in + let proxy : proxy = + let evm_node_endpoint = + Option.map (fun _ -> Uri.of_string hidden) config.proxy.evm_node_endpoint + in + {config.proxy with evm_node_endpoint} + in + + let config = + { + config with + rollup_node_endpoint = Uri.of_string hidden; + kernel_execution; + sequencer; + threshold_encryption_sequencer = None; + proxy; + observer; + private_rpc = None; + } + in + + Data_encoding.Json.construct + ~include_default_fields:`Always + (Configuration.encoding hidden) + config + +let health_check_handler ?delegate_to () = + match delegate_to with + | None -> + let open Lwt_result_syntax in + let* () = fail_when (Metrics.is_bootstrapping ()) Node_is_bootstrapping in + return_unit + | Some evm_node_endpoint -> + Rollup_services.call_service + ~keep_alive:false + ~base:evm_node_endpoint + ~media_types:[Media_type.json] + health_check_service + () + () + () + let version dir = Directory.register0 dir version_service (fun () () -> Lwt.return_ok client_version) let configuration config dir = Directory.register0 dir configuration_service (fun () () -> - let open Configuration in - (* Hide some parts of the configuration. 
*) - let hidden = "hidden" in - let kernel_execution = - Configuration.{config.kernel_execution with preimages = hidden} - in - let sequencer = - Option.map - (fun (sequencer_config : sequencer) -> - { - sequencer_config with - sequencer = Client_keys.sk_uri_of_string hidden; - }) - config.sequencer - in - let observer = - Option.map - (fun (observer : observer) -> - {observer with evm_node_endpoint = Uri.of_string hidden}) - config.observer - in - let proxy : proxy = - let evm_node_endpoint = - Option.map - (fun _ -> Uri.of_string hidden) - config.proxy.evm_node_endpoint - in - {config.proxy with evm_node_endpoint} - in - - let config = - { - config with - rollup_node_endpoint = Uri.of_string hidden; - kernel_execution; - sequencer; - threshold_encryption_sequencer = None; - proxy; - observer; - private_rpc = None; - } - in - - Lwt.return_ok - (Data_encoding.Json.construct - ~include_default_fields:`Always - (Configuration.encoding hidden) - config)) + configuration_handler config |> Lwt.return_ok) let health_check ?delegate_to dir = - let handler = - match delegate_to with - | None -> - fun () () -> - let open Lwt_result_syntax in - let* () = - fail_when (Metrics.is_bootstrapping ()) Node_is_bootstrapping - in - return_unit - | Some evm_node_endpoint -> - fun () () -> - Rollup_services.call_service - ~keep_alive:false - ~base:evm_node_endpoint - ~media_types:[Media_type.json] - health_check_service - () - () - () - in - Directory.register0 dir health_check_service handler + Directory.register0 dir health_check_service (fun () () -> + health_check_handler ?delegate_to ()) (* The node can either take a single request or multiple requests at once. 
*) @@ -844,26 +838,30 @@ let can_process_batch size = function | Configuration.Limit l -> size <= l | Unlimited -> true +let dispatch_handler (rpc : Configuration.rpc) config ctx dispatch_request + (input : JSONRPC.request batched_request) = + let open Lwt_syntax in + match input with + | Singleton request -> + let* response = dispatch_request config ctx request in + return (Singleton response) + | Batch requests -> + let process = + if can_process_batch (List.length requests) rpc.batch_limit then + dispatch_request config ctx + else fun req -> + let value = + Error Rpc_errors.(invalid_request "too many requests in batch") + in + Lwt.return JSONRPC.{value; id = req.id} + in + let* outputs = List.map_s process requests in + return (Batch outputs) + let generic_dispatch (rpc : Configuration.rpc) config ctx dir path dispatch_request = Directory.register0 dir (dispatch_service ~path) (fun () input -> - let open Lwt_result_syntax in - match input with - | Singleton request -> - let*! response = dispatch_request config ctx request in - return (Singleton response) - | Batch requests -> - let process = - if can_process_batch (List.length requests) rpc.batch_limit then - dispatch_request config ctx - else fun req -> - let value = - Error Rpc_errors.(invalid_request "too many requests in batch") - in - Lwt.return JSONRPC.{value; id = req.id} - in - let*! outputs = List.map_s process requests in - return (Batch outputs)) + dispatch_handler rpc config ctx dispatch_request input |> Lwt_result.ok) let dispatch_public (rpc : Configuration.rpc) config ctx dir = generic_dispatch rpc config ctx dir Path.root (dispatch_request rpc) @@ -928,3 +926,44 @@ let call (type input output) (construct (JSONRPC.error_encoding Data_encoding.Json.encoding) err)) | Batch l -> failwith "request: unexpected number of responses (%d)" List.(length l) + +(** Routes for use in a {!Dream} server. 
*) +module Dream = struct + let version_route = + Router.make_route "/version" version_service @@ fun _request () -> + Lwt.return client_version + + let configuration_route config = + Router.make_route "/configuration" configuration_service + @@ fun _request () -> Lwt.return (configuration_handler config) + + let health_check_route ?delegate_to () = + Router.make_tz_route "/health_check" health_check_service + @@ fun _request () -> health_check_handler ?delegate_to () + + let jsonrpc_route path (rpc : Configuration.rpc) config ctx dispatch_request = + Router.make_route + path + (dispatch_service ~path:Path.root) + (fun _request data -> + dispatch_handler rpc config ctx dispatch_request data) + + let public_routes ?delegate_health_check_to rpc config ctx = + [ + version_route; + configuration_route config; + health_check_route ?delegate_to:delegate_health_check_to (); + jsonrpc_route "/" rpc config ctx (dispatch_request rpc); + ] + + let private_routes rpc ~block_production config ctx = + [ + version_route; + jsonrpc_route + "private" + rpc + config + ctx + (dispatch_private_request rpc ~block_production); + ] +end -- GitLab From a56bd976abe4445fe18121b0c0c321a5d0331381 Mon Sep 17 00:00:00 2001 From: Alain Mebsout Date: Wed, 6 Nov 2024 21:28:44 +0100 Subject: [PATCH 04/30] EVM node: select RPC server based on enable_websocket feature flag --- etherlink/bin_node/lib_dev/rpc_server.ml | 176 ++++++++++++---------- etherlink/bin_node/lib_dev/rpc_server.mli | 16 -- etherlink/bin_node/main.ml | 11 -- 3 files changed, 100 insertions(+), 103 deletions(-) diff --git a/etherlink/bin_node/lib_dev/rpc_server.ml b/etherlink/bin_node/lib_dev/rpc_server.ml index 8d850bd043b4..27e85efde6b9 100644 --- a/etherlink/bin_node/lib_dev/rpc_server.ml +++ b/etherlink/bin_node/lib_dev/rpc_server.ml @@ -44,87 +44,89 @@ let callback server dir = dir callback_log -let start_server rpc directory = - let open Lwt_result_syntax in - let open Tezos_rpc_http_server in - let Configuration. 
- {port; addr; cors_origins; cors_headers; max_active_connections; _} = - rpc - in +module Resto = struct + let start_server rpc directory = + let open Lwt_result_syntax in + let open Tezos_rpc_http_server in + let Configuration. + {port; addr; cors_origins; cors_headers; max_active_connections; _} = + rpc + in - let p2p_addr = P2p_addr.of_string_exn addr in - let host = Ipaddr.V6.to_string p2p_addr in - let node = `TCP (`Port port) in - let acl = RPC_server.Acl.allow_all in - let cors = - Resto_cohttp.Cors. - {allowed_headers = cors_headers; allowed_origins = cors_origins} - in - let server = - RPC_server.init_server - ~acl - ~cors - ~media_types:Supported_media_types.all - directory - in + let p2p_addr = P2p_addr.of_string_exn addr in + let host = Ipaddr.V6.to_string p2p_addr in + let node = `TCP (`Port port) in + let acl = RPC_server.Acl.allow_all in + let cors = + Resto_cohttp.Cors. + {allowed_headers = cors_headers; allowed_origins = cors_origins} + in + let server = + RPC_server.init_server + ~acl + ~cors + ~media_types:Supported_media_types.all + directory + in - let*! () = - RPC_server.launch - ~max_active_connections - ~host - server - ~callback:(callback server directory) - node - in + let*! 
() = + RPC_server.launch + ~max_active_connections + ~host + server + ~callback:(callback server directory) + node + in - let finalizer () = - let open Lwt_syntax in - let* () = Tezos_rpc_http_server.RPC_server.shutdown server in - return_unit - in + let finalizer () = + let open Lwt_syntax in + let* () = Tezos_rpc_http_server.RPC_server.shutdown server in + return_unit + in - return finalizer + return finalizer -let start_public_server ?delegate_health_check_to ?evm_services - (config : Configuration.t) ctxt = - let open Lwt_result_syntax in - let register_evm_services = - match evm_services with - | None -> Fun.id - | Some impl -> - Evm_services.register - impl.next_blueprint_number - impl.find_blueprint - impl.smart_rollup_address - impl.time_between_blocks - in - let directory = - Services.directory ?delegate_health_check_to config.public_rpc config ctxt - |> register_evm_services - in - let* finalizer = start_server config.public_rpc directory in - let*! () = - Events.is_ready - ~rpc_addr:config.public_rpc.addr - ~rpc_port:config.public_rpc.port - in - return finalizer - -let start_private_server ?(block_production = `Disabled) config ctxt = - let open Lwt_result_syntax in - match config.Configuration.private_rpc with - | Some private_rpc -> - let directory = - Services.private_directory private_rpc ~block_production config ctxt - in - let* finalizer = start_server private_rpc directory in - let*! 
() = - Events.private_server_is_ready - ~rpc_addr:private_rpc.addr - ~rpc_port:private_rpc.port - in - return finalizer - | None -> return (fun () -> Lwt_syntax.return_unit) + let start_public_server ?delegate_health_check_to ?evm_services + (config : Configuration.t) ctxt = + let open Lwt_result_syntax in + let register_evm_services = + match evm_services with + | None -> Fun.id + | Some impl -> + Evm_services.register + impl.next_blueprint_number + impl.find_blueprint + impl.smart_rollup_address + impl.time_between_blocks + in + let directory = + Services.directory ?delegate_health_check_to config.public_rpc config ctxt + |> register_evm_services + in + let* finalizer = start_server config.public_rpc directory in + let*! () = + Events.is_ready + ~rpc_addr:config.public_rpc.addr + ~rpc_port:config.public_rpc.port + in + return finalizer + + let start_private_server ?(block_production = `Disabled) config ctxt = + let open Lwt_result_syntax in + match config.Configuration.private_rpc with + | Some private_rpc -> + let directory = + Services.private_directory private_rpc ~block_production config ctxt + in + let* finalizer = start_server private_rpc directory in + let*! 
() = + Events.private_server_is_ready + ~rpc_addr:private_rpc.addr + ~rpc_port:private_rpc.port + in + return finalizer + | None -> return (fun () -> Lwt_syntax.return_unit) +end module Dream = struct let start_server rpc routes = @@ -198,3 +200,25 @@ module Dream = struct return finalizer | None -> return (fun () -> Lwt_syntax.return_unit) end + +let start_public_server ?delegate_health_check_to ?evm_services + (config : Configuration.t) ctxt = + if config.experimental_features.enable_websocket then + (* Start Dream/httpun server when websocket support is requested *) + Dream.start_public_server + ?delegate_health_check_to + ?evm_services + config + ctxt + else + Resto.start_public_server + ?delegate_health_check_to + ?evm_services + config + ctxt + +let start_private_server ?block_production (config : Configuration.t) ctxt = + if config.experimental_features.enable_websocket then + (* Start Dream/httpun server when websocket support is requested *) + Dream.start_private_server ?block_production config ctxt + else Resto.start_private_server ?block_production config ctxt diff --git a/etherlink/bin_node/lib_dev/rpc_server.mli b/etherlink/bin_node/lib_dev/rpc_server.mli index 6fa1d61c6d38..b386dd5cda19 100644 --- a/etherlink/bin_node/lib_dev/rpc_server.mli +++ b/etherlink/bin_node/lib_dev/rpc_server.mli @@ -46,19 +46,3 @@ val start_public_server : Configuration.t -> (module Services_backend_sig.S) * 'a -> finalizer tzresult Lwt.t - -(** RPC server which uses Dream/httpun instead of Resto/Cohttp. 
*) -module Dream : sig - val start_public_server : - ?delegate_health_check_to:Uri.t -> - ?evm_services:evm_services_methods -> - Configuration.t -> - (module Services_backend_sig.S) * 'a -> - finalizer tzresult Lwt.t - - val start_private_server : - ?block_production:block_production -> - Configuration.t -> - (module Services_backend_sig.S) * 'a -> - finalizer tzresult Lwt.t -end diff --git a/etherlink/bin_node/main.ml b/etherlink/bin_node/main.ml index af2338deeb91..73f19c486625 100644 --- a/etherlink/bin_node/main.ml +++ b/etherlink/bin_node/main.ml @@ -678,13 +678,6 @@ let snapshot_file_arg = is based on the snapshot information." Params.string -(* TODO: https://gitlab.com/tezos/tezos/-/issues/7591 - Remove this whenever we have an experimental websocket server ready to - be tested. *) -let fail_if_websocket_is_enabled ~config = - if config.experimental_features.enable_websocket then - Stdlib.failwith "The experimental websocket server is not implemented yet." - let start_proxy ~data_dir ~keep_alive ?rpc_addr ?rpc_port ?rpc_batch_limit ?cors_origins ?cors_headers ?log_filter_max_nb_blocks ?log_filter_max_nb_logs ?log_filter_chunk_size ?rollup_node_endpoint @@ -715,7 +708,6 @@ let start_proxy ~data_dir ~keep_alive ?rpc_addr ?rpc_port ?rpc_batch_limit ~verbose () in - fail_if_websocket_is_enabled ~config ; (* We patch [config] to take into account the proxy-specific argument [--read-only]. *) let config = @@ -823,7 +815,6 @@ let start_sequencer ?password_filename ~wallet_dir ~data_dir ?rpc_addr ?rpc_port ~finalized_view () in - fail_if_websocket_is_enabled ~config:configuration ; let*! () = let open Tezos_base_unix.Internal_event_unix in let config = @@ -905,7 +896,6 @@ let start_threshold_encryption_sequencer ?password_filename ~wallet_dir ~finalized_view () in - fail_if_websocket_is_enabled ~config:configuration ; let*! 
() = let open Tezos_base_unix.Internal_event_unix in let config = @@ -1100,7 +1090,6 @@ let start_observer ~data_dir ~keep_alive ?rpc_addr ?rpc_port ?rpc_batch_limit ~finalized_view () in - fail_if_websocket_is_enabled ~config ; let*! () = let open Tezos_base_unix.Internal_event_unix in let config = -- GitLab From 2a77546e8cf87ba77d42122e2939cd0b3ae70bee Mon Sep 17 00:00:00 2001 From: Alain Mebsout Date: Thu, 7 Nov 2024 10:18:03 +0100 Subject: [PATCH 05/30] TMP: default to websocket server for tests --- etherlink/bin_node/config/configuration.ml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/etherlink/bin_node/config/configuration.ml b/etherlink/bin_node/config/configuration.ml index 23c8751053e5..32ec26788609 100644 --- a/etherlink/bin_node/config/configuration.ml +++ b/etherlink/bin_node/config/configuration.ml @@ -138,7 +138,7 @@ let default_experimental_features = overwrite_simulation_tick_limit = false; garbage_collector = None; next_wasm_runtime = false; - enable_websocket = false; + enable_websocket = true; } let default_data_dir = Filename.concat (Sys.getenv "HOME") ".octez-evm-node" -- GitLab From f2768f2723fc93c9b5e2b9352e65bdf7783dca5a Mon Sep 17 00:00:00 2001 From: Alain Mebsout Date: Thu, 7 Nov 2024 13:13:42 +0100 Subject: [PATCH 06/30] EVM node: helper to create routes which accept websocket connections --- etherlink/bin_node/lib_dev/router.ml | 50 ++++++++++++++++++++++++++- etherlink/bin_node/lib_dev/router.mli | 18 +++++++++- 2 files changed, 66 insertions(+), 2 deletions(-) diff --git a/etherlink/bin_node/lib_dev/router.ml b/etherlink/bin_node/lib_dev/router.ml index 7bdb356bef3e..86d5d9d0018a 100644 --- a/etherlink/bin_node/lib_dev/router.ml +++ b/etherlink/bin_node/lib_dev/router.ml @@ -86,7 +86,7 @@ let respond ?status ?code ?headers media v = let make_gen_route : type i. 
string -> - ([< Resto.meth], 'a, 'b, 'c, i, 'd, 'e) Service.raw -> + (_, _, _, _, i, _) Service.t -> (Dream.request -> i -> Dream.response Lwt.t) -> Dream.route = fun path service handler -> @@ -172,3 +172,51 @@ let make_stream_route path service handler = in shutdown () ; return_unit + +let make_websocket_route (type input output) path + (service : (_, _, _, _, input, output) Service.t) + (handler : input -> (output Lwt_stream.t * (unit -> unit)) Lwt.t) = + let open Lwt_syntax in + let output_encoding = Service.output_encoding service in + Dream.get path @@ fun request -> + let input_media = content_media request in + let output_media = accept_media request in + let text_or_binary = + match output_media with `Json -> `Text | `Octets -> `Binary + in + Dream.websocket @@ fun websocket -> + let write_stream (stream, shutdown) = + let* () = + Lwt_stream.iter_s + (fun output -> + let output = encode output_media output_encoding output in + Dream.send ~text_or_binary websocket output) + stream + in + shutdown () ; + return_unit + in + let async_write_stream (stream, shutdown) = + Lwt.dont_wait + (fun () -> write_stream (stream, shutdown)) + (fun exn -> + shutdown () ; + Dream.error @@ fun log -> + log "Websocket write exception: %s" (Printexc.to_string exn)) + in + match Service.input_encoding service with + | No_input -> + let* stream = handler () in + write_stream stream + | Input input_encoding -> + let rec loop () = + let* message = Dream.receive websocket in + match message with + | None -> Dream.close_websocket websocket + | Some message -> + let input = decode input_media input_encoding message in + let* stream = handler input in + async_write_stream stream ; + loop () + in + loop () diff --git a/etherlink/bin_node/lib_dev/router.mli b/etherlink/bin_node/lib_dev/router.mli index fe5345ff0b9e..294e0ead3784 100644 --- a/etherlink/bin_node/lib_dev/router.mli +++ b/etherlink/bin_node/lib_dev/router.mli @@ -65,7 +65,7 @@ val make_opt_tz_route : (** [make_stream_route 
path service handler] builds a route which streams the response from a handler that constructs an {!Lwt_stream.t}. The output - stream is streamed as chunks in the response body.. *) + stream is streamed as chunks in the response body. *) val make_stream_route : string -> ( [< Resto.meth], @@ -77,3 +77,19 @@ val make_stream_route : Tezos_rpc.Service.t -> (Dream.request -> 'input -> ('output Lwt_stream.t * (unit -> unit)) Lwt.t) -> Dream.route + +(** [make_websocket_route path service handler] builds a route which accepts + websocket connections. The server potentially reads inputs from this + websocket and writes a stream of output in response. Multiple streams can be + written and interlaced in the websocket response. *) +val make_websocket_route : + string -> + ( [< Resto.meth], + 'prefix, + 'query, + 'params, + 'input, + 'output ) + Tezos_rpc.Service.t -> + ('input -> ('output Lwt_stream.t * (unit -> unit)) Lwt.t) -> + Dream.route -- GitLab From b9888d5d34a35d253474ed47f8fda2f730ed0413 Mon Sep 17 00:00:00 2001 From: Alain Mebsout Date: Thu, 7 Nov 2024 13:34:51 +0100 Subject: [PATCH 07/30] EVM node: support websocket connections for JSONRPC API --- etherlink/bin_node/lib_dev/evm_ro_context.ml | 2 +- etherlink/bin_node/lib_dev/injector.ml | 2 +- etherlink/bin_node/lib_dev/services.ml | 44 +++++++++++++++----- 3 files changed, 36 insertions(+), 12 deletions(-) diff --git a/etherlink/bin_node/lib_dev/evm_ro_context.ml b/etherlink/bin_node/lib_dev/evm_ro_context.ml index 4049f75eea58..d0564cf56247 100644 --- a/etherlink/bin_node/lib_dev/evm_ro_context.ml +++ b/etherlink/bin_node/lib_dev/evm_ro_context.ml @@ -211,7 +211,7 @@ struct call_service ~keep_alive:Ctxt.keep_alive ~base:evm_node_endpoint - (Services.dispatch_service ~path:Resto.Path.root) + (Services.dispatch_batch_service ~path:Resto.Path.root) () () (Batch methods) diff --git a/etherlink/bin_node/lib_dev/injector.ml b/etherlink/bin_node/lib_dev/injector.ml index 6c22c87b536e..760bbd5bb0b9 100644 --- 
a/etherlink/bin_node/lib_dev/injector.ml +++ b/etherlink/bin_node/lib_dev/injector.ml @@ -26,7 +26,7 @@ let send_raw_transaction ~keep_alive ~base raw_txn = call_service ~keep_alive ~base - (Services.dispatch_service ~path:Resto.Path.root) + (Services.dispatch_batch_service ~path:Resto.Path.root) () () (Singleton (send_raw_transaction_method raw_txn)) diff --git a/etherlink/bin_node/lib_dev/services.ml b/etherlink/bin_node/lib_dev/services.ml index 3d9a21d640cf..86a363110240 100644 --- a/etherlink/bin_node/lib_dev/services.ml +++ b/etherlink/bin_node/lib_dev/services.ml @@ -148,7 +148,7 @@ let health_check ?delegate_to dir = once. *) type 'a batched_request = Singleton of 'a | Batch of 'a list -let request_encoding kind = +let batch_encoding kind = Data_encoding.( union [ @@ -169,8 +169,15 @@ let request_encoding kind = let dispatch_service ~path = Service.post_service ~query:Query.empty - ~input:(request_encoding JSONRPC.request_encoding) - ~output:(request_encoding JSONRPC.response_encoding) + ~input:JSONRPC.request_encoding + ~output:JSONRPC.response_encoding + path + +let dispatch_batch_service ~path = + Service.post_service + ~query:Query.empty + ~input:(batch_encoding JSONRPC.request_encoding) + ~output:(batch_encoding JSONRPC.response_encoding) path let get_block_by_number ~full_transaction_object block_param @@ -860,7 +867,7 @@ let dispatch_handler (rpc : Configuration.rpc) config ctx dispatch_request let generic_dispatch (rpc : Configuration.rpc) config ctx dir path dispatch_request = - Directory.register0 dir (dispatch_service ~path) (fun () input -> + Directory.register0 dir (dispatch_batch_service ~path) (fun () input -> dispatch_handler rpc config ctx dispatch_request input |> Lwt_result.ok) let dispatch_public (rpc : Configuration.rpc) config ctx dir = @@ -903,7 +910,7 @@ let call (type input output) Rollup_services.call_service ~keep_alive ~base:evm_node_endpoint - (dispatch_service ~path:Resto.Path.root) + (dispatch_batch_service 
~path:Resto.Path.root) () () (Singleton @@ -944,26 +951,43 @@ module Dream = struct let jsonrpc_route path (rpc : Configuration.rpc) config ctx dispatch_request = Router.make_route path - (dispatch_service ~path:Path.root) + (dispatch_batch_service ~path:Path.root) (fun _request data -> - dispatch_handler rpc config ctx dispatch_request data) + dispatch_handler rpc config ctx (dispatch_request rpc) data) + + let jsonrpc_websocket_route path rpc config ctx dispatch_request = + Router.make_websocket_route + path + (dispatch_service ~path:Path.root) + (fun request -> + let stream = + dispatch_request rpc config ctx request |> Lwt_stream.return_lwt + in + Lwt.return (stream, fun () -> ())) let public_routes ?delegate_health_check_to rpc config ctx = [ version_route; configuration_route config; health_check_route ?delegate_to:delegate_health_check_to (); - jsonrpc_route "/" rpc config ctx (dispatch_request rpc); + jsonrpc_route "/" rpc config ctx dispatch_request; + jsonrpc_websocket_route "/ws" rpc config ctx dispatch_request; ] let private_routes rpc ~block_production config ctx = [ version_route; jsonrpc_route - "private" + "/private" + rpc + config + ctx + (dispatch_private_request ~block_production); + jsonrpc_websocket_route + "/private/ws" rpc config ctx - (dispatch_private_request rpc ~block_production); + (dispatch_private_request ~block_production); ] end -- GitLab From 00d5ba5644efdc082f98c6fc8a8f885009f98cb6 Mon Sep 17 00:00:00 2001 From: Alain Mebsout Date: Tue, 12 Nov 2024 17:58:22 +0100 Subject: [PATCH 08/30] Evm node: show websocket support in event --- etherlink/bin_node/lib_dev/events.ml | 23 ++++++++++++++++------- etherlink/bin_node/lib_dev/events.mli | 11 ++++++----- etherlink/bin_node/lib_dev/rpc_server.ml | 4 ++++ 3 files changed, 26 insertions(+), 12 deletions(-) diff --git a/etherlink/bin_node/lib_dev/events.ml b/etherlink/bin_node/lib_dev/events.ml index d68002a7b121..9b47920eaaf9 100644 --- a/etherlink/bin_node/lib_dev/events.ml +++ 
b/etherlink/bin_node/lib_dev/events.ml @@ -66,22 +66,30 @@ let catching_up_evm_event = ("to", Data_encoding.int32) let event_is_ready = - Internal_event.Simple.declare_2 + Internal_event.Simple.declare_3 ~section ~name:"is_ready" - ~msg:"the EVM node is listening to {addr}:{port}" + ~msg:"the EVM node is listening to {addr}:{port} {websockets}" ~level:Notice ("addr", Data_encoding.string) ("port", Data_encoding.uint16) + ("websockets", Data_encoding.bool) + ~pp3:(fun fmt b -> + (if b then Format.fprintf else Format.ifprintf) fmt "(websockets support)") let event_private_server_is_ready = - declare_2 + declare_3 ~section ~name:"private_server_is_ready" - ~msg:"the EVM node private RPC server is listening to {addr}:{port}" + ~msg: + "the EVM node private RPC server is listening to {addr}:{port} \ + {websockets}" ~level:Notice ("addr", Data_encoding.string) ("port", Data_encoding.uint16) + ("websockets", Data_encoding.bool) + ~pp3:(fun fmt b -> + (if b then Format.fprintf else Format.ifprintf) fmt "(websockets support)") let event_shutdown_node = Internal_event.Simple.declare_1 @@ -239,10 +247,11 @@ let ignored_kernel_arg () = emit ignored_kernel_arg () let catching_up_evm_event ~from ~to_ = emit catching_up_evm_event (from, to_) -let is_ready ~rpc_addr ~rpc_port = emit event_is_ready (rpc_addr, rpc_port) +let is_ready ~rpc_addr ~rpc_port ~websockets = + emit event_is_ready (rpc_addr, rpc_port, websockets) -let private_server_is_ready ~rpc_addr ~rpc_port = - emit event_private_server_is_ready (rpc_addr, rpc_port) +let private_server_is_ready ~rpc_addr ~rpc_port ~websockets = + emit event_private_server_is_ready (rpc_addr, rpc_port, websockets) let shutdown_rpc_server ~private_ = emit (event_shutdown_rpc_server ~private_) () diff --git a/etherlink/bin_node/lib_dev/events.mli b/etherlink/bin_node/lib_dev/events.mli index eb119257947a..1dc223acaccc 100644 --- a/etherlink/bin_node/lib_dev/events.mli +++ b/etherlink/bin_node/lib_dev/events.mli @@ -38,13 +38,14 @@ val 
ignored_kernel_arg : unit -> unit Lwt.t node from L1 level [from] to [to_]. *) val catching_up_evm_event : from:int32 -> to_:int32 -> unit Lwt.t -(** [is_ready ~rpc_addr ~rpc_port] advertises that the sequencer is +(** [is_ready ~rpc_addr ~rpc_port ~websockets] advertises that the sequencer is ready and listens to [rpc_addr]:[rpc_port]. *) -val is_ready : rpc_addr:string -> rpc_port:int -> unit Lwt.t +val is_ready : rpc_addr:string -> rpc_port:int -> websockets:bool -> unit Lwt.t -(** [private_server_is_ready ~rpc_addr ~rpc_port] advertises that the - private rpc server is ready and listens to [rpc_addr]:[rpc_port]. *) -val private_server_is_ready : rpc_addr:string -> rpc_port:int -> unit Lwt.t +(** [private_server_is_ready ~rpc_addr ~rpc_port ~websockets] advertises that + the private rpc server is ready and listens to [rpc_addr]:[rpc_port]. *) +val private_server_is_ready : + rpc_addr:string -> rpc_port:int -> websockets:bool -> unit Lwt.t (** [shutdown_rpc_server ~private_ ()] advertises that the RPC server was shut down, [private_] tells whether it is the private server diff --git a/etherlink/bin_node/lib_dev/rpc_server.ml b/etherlink/bin_node/lib_dev/rpc_server.ml index 27e85efde6b9..9956006b8fe5 100644 --- a/etherlink/bin_node/lib_dev/rpc_server.ml +++ b/etherlink/bin_node/lib_dev/rpc_server.ml @@ -108,6 +108,7 @@ module Resto = struct Events.is_ready ~rpc_addr:config.public_rpc.addr ~rpc_port:config.public_rpc.port + ~websockets:false in return finalizer @@ -123,6 +124,7 @@ module Resto = struct Events.private_server_is_ready ~rpc_addr:private_rpc.addr ~rpc_port:private_rpc.port + ~websockets:false in return finalizer | None -> return (fun () -> Lwt_syntax.return_unit) @@ -177,6 +179,7 @@ module Dream = struct Events.is_ready ~rpc_addr:config.public_rpc.addr ~rpc_port:config.public_rpc.port + ~websockets:true in return finalizer @@ -196,6 +199,7 @@ module Dream = struct Events.private_server_is_ready ~rpc_addr:private_rpc.addr ~rpc_port:private_rpc.port + 
~websockets:true in return finalizer | None -> return (fun () -> Lwt_syntax.return_unit) -- GitLab From 299ead4a255a45cb8305dba9773799a65825e58f Mon Sep 17 00:00:00 2001 From: Alain Mebsout Date: Thu, 7 Nov 2024 17:54:20 +0100 Subject: [PATCH 09/30] EVM node: send JSONRPC errors on websocket --- etherlink/bin_node/lib_dev/router.ml | 76 ++++++++++++++++++++---- etherlink/bin_node/lib_dev/rpc_errors.ml | 3 +- 2 files changed, 65 insertions(+), 14 deletions(-) diff --git a/etherlink/bin_node/lib_dev/router.ml b/etherlink/bin_node/lib_dev/router.ml index 86d5d9d0018a..bad2ab00d8a0 100644 --- a/etherlink/bin_node/lib_dev/router.ml +++ b/etherlink/bin_node/lib_dev/router.ml @@ -67,11 +67,29 @@ let encode media encoding = let decode media encoding = match media with - | `Json -> + | `Json -> ( fun s -> - Ezjsonm.value_from_string s |> Data_encoding.Json.destruct encoding + match Ezjsonm.value_from_string_result s with + | Error err -> Error (Ezjsonm.read_error_description err) + | Ok json -> ( + try Ok (Data_encoding.Json.destruct encoding json) + with exn -> + let err = + Format.asprintf + "%a" + (Data_encoding.Json.print_error ~print_unknown:(fun fmt exn -> + Format.fprintf + fmt + "Unknown exception %s" + (Printexc.exn_slot_name exn))) + exn + in + Error err)) | `Octets -> - Data_encoding.Binary.of_string_exn (Data_encoding.dynamic_size encoding) + fun s -> + Data_encoding.Binary.of_string (Data_encoding.dynamic_size encoding) s + |> Result.map_error + (Format.asprintf "%a" Data_encoding.Binary.pp_read_error) let content_header = function | `Json -> Dream.application_json @@ -83,6 +101,14 @@ let respond ?status ?code ?headers media v = Dream.set_header response "Content-type" (content_header media) ; return response +let respond_error ?status ?code ?headers media err = + respond + ?status + ?code + ?headers + media + (encode media (Rpc_encodings.JSONRPC.error_encoding Data_encoding.json) err) + let make_gen_route : type i. 
string -> @@ -103,11 +129,13 @@ let make_gen_route : let open Lwt_syntax in match input_encoding with | No_input -> handler request () - | Input input_encoding -> + | Input input_encoding -> ( let* body = Dream.body request in let media = content_media request in - let input = decode media input_encoding body in - handler request input + let output_media = accept_media request in + match decode media input_encoding body with + | Error msg -> respond_error output_media (Rpc_errors.parse_error msg) + | Ok input -> handler request input) let make_route path service handler = make_gen_route path service @@ fun request input -> @@ -173,6 +201,16 @@ let make_stream_route path service handler = shutdown () ; return_unit +let send_error media websocket error = + let text_or_binary = match media with `Json -> `Text | `Octets -> `Binary in + Dream.send + ~text_or_binary + websocket + (encode + media + (Rpc_encodings.JSONRPC.error_encoding Data_encoding.json) + error) + let make_websocket_route (type input output) path (service : (_, _, _, _, input, output) Service.t) (handler : input -> (output Lwt_stream.t * (unit -> unit)) Lwt.t) = @@ -188,10 +226,16 @@ let make_websocket_route (type input output) path let write_stream (stream, shutdown) = let* () = Lwt_stream.iter_s - (fun output -> - let output = encode output_media output_encoding output in - Dream.send ~text_or_binary websocket output) - stream + (function + | Ok output -> + let output = encode output_media output_encoding output in + Dream.send ~text_or_binary websocket output + | Error exn -> + send_error + output_media + websocket + (Rpc_errors.internal_error (Printexc.exn_slot_name exn))) + (Lwt_stream.wrap_exn stream) in shutdown () ; return_unit @@ -214,9 +258,15 @@ let make_websocket_route (type input output) path match message with | None -> Dream.close_websocket websocket | Some message -> - let input = decode input_media input_encoding message in - let* stream = handler input in - async_write_stream stream ; 
+ let* () = + match decode input_media input_encoding message with + | Error msg -> + send_error output_media websocket (Rpc_errors.parse_error msg) + | Ok input -> + let* stream = handler input in + async_write_stream stream ; + return_unit + in loop () in loop () diff --git a/etherlink/bin_node/lib_dev/rpc_errors.ml b/etherlink/bin_node/lib_dev/rpc_errors.ml index 7629625109bd..d528695d95fc 100644 --- a/etherlink/bin_node/lib_dev/rpc_errors.ml +++ b/etherlink/bin_node/lib_dev/rpc_errors.ml @@ -29,7 +29,8 @@ open Rpc_encodings type t = Data_encoding.json JSONRPC.error -let parse_error = JSONRPC.{code = -32700; message = "Parse error"; data = None} +let parse_error msg = + JSONRPC.{code = -32700; message = "Parse error: " ^ msg; data = None} let invalid_request reason = JSONRPC.{code = -32600; message = reason; data = None} -- GitLab From a3bc0c5ce03f209139ce948bd37ebe41beb63d25 Mon Sep 17 00:00:00 2001 From: Alain Mebsout Date: Fri, 8 Nov 2024 15:21:23 +0100 Subject: [PATCH 10/30] Etherlink/Tezt: small websocket client using websocat --- tezt/lib_tezos/websocket.ml | 90 ++++++++++++++++++++++++++++++++++++ tezt/lib_tezos/websocket.mli | 33 +++++++++++++ 2 files changed, 123 insertions(+) create mode 100644 tezt/lib_tezos/websocket.ml create mode 100644 tezt/lib_tezos/websocket.mli diff --git a/tezt/lib_tezos/websocket.ml b/tezt/lib_tezos/websocket.ml new file mode 100644 index 000000000000..61ed1574276b --- /dev/null +++ b/tezt/lib_tezos/websocket.ml @@ -0,0 +1,90 @@ +(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* Copyright (c) 2024 Functori *) +(* Copyright (c) 2024 Nomadic Labs *) +(* *) +(*****************************************************************************) + +type t = {process : Process.t; stdin : Lwt_io.output_channel} + +exception Could_not_connect + +let get_unique_name = + let name_counts = ref String_map.empty in + fun name -> + let index = + match String_map.find_opt 
name !name_counts with None -> 0 | Some i -> i + in + name_counts := String_map.add name (index + 1) !name_counts ; + name ^ "#" ^ string_of_int index + +let connect ?runner ?hooks ?name url = + let name = + match name with Some n -> n | None -> get_unique_name "websocket_client" + in + let url = Uri.with_scheme (Uri.of_string url) (Some "ws") |> Uri.to_string in + let process, stdin = + Process.spawn_with_stdin ~name ?runner ?hooks "websocat" [url] + in + let () = + try Unix.kill (Process.pid process) 0 + with _ -> + Log.error "%s could not connect to %s" name url ; + raise Could_not_connect + in + return {process; stdin} + +let send_msg {stdin; _} msg = + let* () = Lwt_io.write stdin (msg ^ "\n") in + unit + +let read_json ~origin {process; _} = + let max_size = 10 * 1024 * 1024 (* 10MB *) in + let ch = Process.stdout process in + let buff = Buffer.create 256 in + let rec loop () = + let* line = Lwt_io.read_line_opt ch in + match line with + | None -> failwith "No response on websocket" + | Some line -> ( + Buffer.add_string buff line ; + match JSON.parse_opt ~origin (Buffer.contents buff) with + | None when Buffer.length buff >= max_size -> + Format.ksprintf + failwith + "Could not parse JSON from websocket %d bytes." 
+ max_size + | None -> loop () + | Some json -> return json) + in + loop () + +let close ws = + let* () = Lwt_io.close ws.stdin in + Process.terminate ws.process ; + unit + +let send = + let cpt = ref 0 in + fun ws json -> + incr cpt ; + let msg = JSON.unannotate json |> Ezjsonm.value_to_string ~minify:true in + Log.debug + ~color:Log.Color.bold + "%s(%d): > %s" + (Process.name ws.process) + !cpt + msg ; + send_msg ws msg + +let recv = + let cpt = ref 0 in + fun ws -> + incr cpt ; + let origin = Format.sprintf "%s(%d)" (Process.name ws.process) !cpt in + read_json ~origin ws + +let send_recv ws json = + let* () = send ws json in + recv ws diff --git a/tezt/lib_tezos/websocket.mli b/tezt/lib_tezos/websocket.mli new file mode 100644 index 000000000000..410ac162c48c --- /dev/null +++ b/tezt/lib_tezos/websocket.mli @@ -0,0 +1,33 @@ +(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* Copyright (c) 2024 Functori *) +(* Copyright (c) 2024 Nomadic Labs *) +(* *) +(*****************************************************************************) + +exception Could_not_connect + +(** Type of a websocket client *) +type t + +(** [connect ?runner ?hooks ?name url] connects to a websocket server and + returns the client. *) +val connect : + ?runner:Runner.t -> + ?hooks:Process_hooks.t -> + ?name:string -> + string -> + t Lwt.t + +(** Terminate the client. *) +val close : t -> unit Lwt.t + +(** Send a JSON object on the websocket. *) +val send : t -> JSON.t -> unit Lwt.t + +(** Receive a JSON object on the websocket. *) +val recv : t -> JSON.t Lwt.t + +(** Send a JSON object on the websocket and receive the response. 
*) +val send_recv : t -> JSON.t -> JSON.t Lwt.t -- GitLab From 2531b3217d1cfaf301076d625df2c6eb0ba8d0d8 Mon Sep 17 00:00:00 2001 From: Alain Mebsout Date: Fri, 8 Nov 2024 15:26:47 +0100 Subject: [PATCH 11/30] Tests: Allow to communicate on websocket with EVM nodes --- etherlink/tezt/lib/evm_node.ml | 171 ++++++++++++++++++++++------- etherlink/tezt/lib/evm_node.mli | 10 +- etherlink/tezt/lib/helpers.ml | 2 +- etherlink/tezt/tests/evm_rollup.ml | 6 +- 4 files changed, 140 insertions(+), 49 deletions(-) diff --git a/etherlink/tezt/lib/evm_node.ml b/etherlink/tezt/lib/evm_node.ml index fb567066ef87..113dab9bc177 100644 --- a/etherlink/tezt/lib/evm_node.ml +++ b/etherlink/tezt/lib/evm_node.ml @@ -95,6 +95,11 @@ type mode = module Per_level_map = Map.Make (Int) +type websockets = { + public : Websocket.t Lwt.t; + private_ : Websocket.t Lwt.t option; +} + module Parameters = struct type persistent_state = { arguments : string list; @@ -113,9 +118,13 @@ module Parameters = struct endpoint : string; runner : Runner.t option; restricted_rpcs : string option; + websockets : bool; } - type session_state = {mutable ready : bool} + type session_state = { + mutable ready : bool; + mutable websockets : websockets option; + } let base_default_name = "evm_node" @@ -174,6 +183,39 @@ let connection_arguments ?rpc_addr ?rpc_port ?runner () = in (["--rpc-port"; string_of_int rpc_port] @ rpc_addr_arg, rpc_host, rpc_port) +let rpc_endpoint ?(local = false) ?(private_ = false) (evm_node : t) = + let addr, port, path = + let host = + if local then Constant.default_host + else Runner.address evm_node.persistent_state.runner + in + if private_ then + match evm_node.persistent_state.mode with + | Sequencer {private_rpc_port = Some private_rpc_port; _} + | Observer {private_rpc_port = Some private_rpc_port; _} + | Sandbox {private_rpc_port = Some private_rpc_port; _} -> + (host, private_rpc_port, "/private") + | Sequencer {private_rpc_port = None; _} + | Sandbox {private_rpc_port = None; _} -> 
+ Test.fail "Sequencer doesn't have a private RPC server" + | Threshold_encryption_sequencer + {private_rpc_port = Some private_rpc_port; _} -> + (host, private_rpc_port, "/private") + | Threshold_encryption_sequencer {private_rpc_port = None; _} -> + Test.fail + "Threshold encryption sequencer doesn't have a private RPC server" + | Proxy -> Test.fail "Proxy doesn't have a private RPC server" + | Observer _ -> Test.fail "Observer doesn't have a private RPC server" + | Rpc _ -> Test.fail "Rpc node doesn't have a private RPC server" + | Threshold_encryption_observer _ -> + Test.fail + "Threshold encryption observer doesn't have a private RPC server" + else (host, evm_node.persistent_state.rpc_port, "") + in + Format.sprintf "http://%s:%d%s" addr port path + +let endpoint = rpc_endpoint ?local:None + let trigger_ready sc_node value = let pending = sc_node.persistent_state.pending_ready in sc_node.persistent_state.pending_ready <- [] ; @@ -221,8 +263,35 @@ let event_blueprint_finalized_name = let event_blueprint_applied_name = "blueprint_application.v0" +let open_websockets evm_node = + match evm_node.status with + | Not_running -> failwith "Cannot open websockets, EVM node is down." 
+ | Running status -> + Lwt.async @@ fun () -> + let public = + Websocket.connect + ~name:("ws_public_" ^ evm_node.name) + (endpoint evm_node ^ "/ws") + and private_ = + let endpoint = + try Some (endpoint ~private_:true evm_node ^ "/ws") with _ -> None + in + match endpoint with + | None -> None + | Some endpoint -> + let ws = + Websocket.connect ~name:("ws_private_" ^ evm_node.name) endpoint + in + Some ws + in + status.session_state.websockets <- Some {public; private_} ; + unit + let handle_is_ready_event (evm_node : t) {name; value = _; timestamp = _} = - if name = event_ready_name then set_ready evm_node else () + if name = event_ready_name then ( + set_ready evm_node ; + if evm_node.persistent_state.websockets then open_websockets evm_node) + else () let handle_blueprint_injected_event (evm_node : t) {name; value; timestamp = _} = @@ -562,7 +631,8 @@ let wait_for_gc_finished ?gc_level ?head_level evm_node = | None, None -> Some (event_gc_level, event_gc_level) let create ?(path = Uses.path Constant.octez_evm_node) ?name ?runner - ?(mode = Proxy) ?data_dir ?rpc_addr ?rpc_port ?restricted_rpcs endpoint = + ?(mode = Proxy) ?data_dir ?rpc_addr ?rpc_port ?restricted_rpcs + ?(websockets = false) endpoint = let arguments, rpc_addr, rpc_port = connection_arguments ?rpc_addr ?rpc_port ?runner () in @@ -601,6 +671,7 @@ let create ?(path = Uses.path Constant.octez_evm_node) ?name ?runner endpoint; restricted_rpcs; runner; + websockets; } in on_event evm_node (handle_is_ready_event evm_node) ; @@ -701,7 +772,7 @@ let run ?(wait = true) ?(extra_arguments = []) evm_node = ?runner:evm_node.persistent_state.runner ~event_level:`Debug evm_node - {ready = false} + {ready = false; websockets = None} (run_args evm_node @ extra_arguments) ~on_terminate in @@ -986,39 +1057,6 @@ let spawn_init_config ?(extra_arguments = []) evm_node = spawn_command evm_node @@ ["init"; "config"] @ mode_args @ shared_args @ extra_arguments -let rpc_endpoint ?(local = false) ?(private_ = false) 
(evm_node : t) = - let addr, port, path = - let host = - if local then Constant.default_host - else Runner.address evm_node.persistent_state.runner - in - if private_ then - match evm_node.persistent_state.mode with - | Sequencer {private_rpc_port = Some private_rpc_port; _} - | Observer {private_rpc_port = Some private_rpc_port; _} - | Sandbox {private_rpc_port = Some private_rpc_port; _} -> - (host, private_rpc_port, "/private") - | Sequencer {private_rpc_port = None; _} - | Sandbox {private_rpc_port = None; _} -> - Test.fail "Sequencer doesn't have a private RPC server" - | Threshold_encryption_sequencer - {private_rpc_port = Some private_rpc_port; _} -> - (host, private_rpc_port, "/private") - | Threshold_encryption_sequencer {private_rpc_port = None; _} -> - Test.fail - "Threshold encryption sequencer doesn't have a private RPC server" - | Proxy -> Test.fail "Proxy doesn't have a private RPC server" - | Observer _ -> Test.fail "Observer doesn't have a private RPC server" - | Rpc _ -> Test.fail "Rpc node doesn't have a private RPC server" - | Threshold_encryption_observer _ -> - Test.fail - "Threshold encryption observer doesn't have a private RPC server" - else (host, evm_node.persistent_state.rpc_port, "") - in - Format.sprintf "http://%s:%d%s" addr port path - -let endpoint = rpc_endpoint ?local:None - type garbage_collector = { split_frequency_in_seconds : int; history_to_keep_in_seconds : int; @@ -1027,7 +1065,8 @@ type garbage_collector = { let patch_config_with_experimental_feature ?(drop_duplicate_when_injection = false) ?(node_transaction_validation = false) ?(block_storage_sqlite3 = true) - ?(next_wasm_runtime = true) ?garbage_collector () = + ?(next_wasm_runtime = true) ?garbage_collector ?(enable_websocket = false) + () = let conditional_json_put ~name cond value_json json = if cond then JSON.put @@ -1077,9 +1116,10 @@ let patch_config_with_experimental_feature ( "history_to_keep_in_seconds", `Float (Int.to_float history_to_keep_in_seconds) ); ]) + 
|> conditional_json_put enable_websocket ~name:"enable_websocket" (`Bool true) let init ?patch_config ?name ?runner ?mode ?data_dir ?rpc_addr ?rpc_port - ?restricted_rpcs rollup_node = + ?restricted_rpcs ?(websockets = false) rollup_node = let evm_node = create ?name @@ -1089,9 +1129,16 @@ let init ?patch_config ?name ?runner ?mode ?data_dir ?rpc_addr ?rpc_port ?rpc_addr ?rpc_port ?restricted_rpcs + ~websockets rollup_node in let* () = Process.check @@ spawn_init_config evm_node in + let* () = + if websockets then + Config_file.update evm_node + @@ patch_config_with_experimental_feature ~enable_websocket:true () + else unit + in let* () = match patch_config with | Some patch_config -> Config_file.update evm_node patch_config @@ -1152,13 +1199,53 @@ let batch_requests requests = (* We keep both encoding (with a single object or an array of objects) and both function on purpose, to ensure both encoding are supported by the server. *) -let call_evm_rpc ?(private_ = false) evm_node request = +let call_evm_rpc_curl ~private_ evm_node request = let endpoint = endpoint ~private_ evm_node in Curl.post endpoint (build_request request) |> Runnable.run -let batch_evm_rpc ?(private_ = false) evm_node requests = +let call_evm_rpc_websocket ~private_ websockets request = + let ws = + match (private_, websockets.private_) with + | true, Some ws -> ws + | true, None -> failwith "No private websocket" + | false, _ -> websockets.public + in + let* ws in + Websocket.send_recv ws (build_request request) + +let call_evm_rpc ?(private_ = false) evm_node request = + Log.debug "call_rpc %s on %s" request.method_ evm_node.name ; + if not evm_node.persistent_state.websockets then + call_evm_rpc_curl ~private_ evm_node request + else + match evm_node.status with + | Not_running -> failwith "Cannot send websocket JSONRPC, EVM node is down." 
+ | Running status -> ( + match status.session_state.websockets with + | None -> assert false + | Some websockets -> call_evm_rpc_websocket ~private_ websockets request + ) + +let batch_evm_rpc_curl ~private_ evm_node requests = let endpoint = endpoint ~private_ evm_node in - Curl.post endpoint (batch_requests requests) |> Runnable.run + let* json = Curl.post endpoint (batch_requests requests) |> Runnable.run in + return (JSON.as_list json) + +let batch_evm_rpc_websocket ~private_ websockets requests = + Lwt_list.map_s (call_evm_rpc_websocket ~private_ websockets) requests + +let batch_evm_rpc ?(private_ = false) evm_node requests = + if not evm_node.persistent_state.websockets then + batch_evm_rpc_curl ~private_ evm_node requests + else + match evm_node.status with + | Not_running -> + failwith "Cannot send websocket JSONRPCs, EVM node is down." + | Running status -> ( + match status.session_state.websockets with + | None -> assert false + | Some websockets -> + batch_evm_rpc_websocket ~private_ websockets requests) let extract_result json = JSON.(json |-> "result") diff --git a/etherlink/tezt/lib/evm_node.mli b/etherlink/tezt/lib/evm_node.mli index 78011b0b7f37..d38e7892f78f 100644 --- a/etherlink/tezt/lib/evm_node.mli +++ b/etherlink/tezt/lib/evm_node.mli @@ -140,8 +140,9 @@ val supports_threshold_encryption : t -> bool rollup_node_endpoint] creates an EVM node server. The server listens to requests at address [rpc_addr] and the port - [rpc_port]. [rpc_addr] defaults to [Constant.default_host] and a fresh port is - chosen if [rpc_port] is not set. + [rpc_port]. [rpc_addr] defaults to [Constant.default_host] and a fresh port + is chosen if [rpc_port] is not set. The EVM node starts a websocket server + for JSON-RPC communication if [websockets] is [true]. The server communicates with a rollup-node and sets its endpoint via [rollup_node_endpoint]. 
@@ -157,6 +158,7 @@ val create : ?rpc_addr:string -> ?rpc_port:int -> ?restricted_rpcs:string -> + ?websockets:bool -> string -> t @@ -259,6 +261,7 @@ val patch_config_with_experimental_feature : ?block_storage_sqlite3:bool -> ?next_wasm_runtime:bool -> ?garbage_collector:garbage_collector -> + ?enable_websocket:bool -> unit -> JSON.t -> JSON.t @@ -276,6 +279,7 @@ val init : ?rpc_addr:string -> ?rpc_port:int -> ?restricted_rpcs:string -> + ?websockets:bool -> string -> t Lwt.t @@ -378,7 +382,7 @@ val call_evm_rpc : ?private_:bool -> t -> request -> JSON.t Lwt.t to the [evm_node], for the given [requests]. If [private_] is true, the requests are sent to the private RPC server. *) -val batch_evm_rpc : ?private_:bool -> t -> request list -> JSON.t Lwt.t +val batch_evm_rpc : ?private_:bool -> t -> request list -> JSON.t list Lwt.t (** [extract_result json] expects a JSON-RPC `result` and returns the value. *) val extract_result : JSON.t -> JSON.t diff --git a/etherlink/tezt/lib/helpers.ml b/etherlink/tezt/lib/helpers.ml index 858241d64532..c35e5b7b67e6 100644 --- a/etherlink/tezt/lib/helpers.ml +++ b/etherlink/tezt/lib/helpers.ml @@ -296,7 +296,7 @@ let batch_n_transactions ~evm_node txs = in let* hashes = Evm_node.batch_evm_rpc evm_node requests in let hashes = - hashes |> JSON.as_list + hashes |> List.map (fun json -> Evm_node.extract_result json |> JSON.as_string) in return (requests, hashes) diff --git a/etherlink/tezt/tests/evm_rollup.ml b/etherlink/tezt/tests/evm_rollup.ml index debdf84517fd..23c7a154e8cd 100644 --- a/etherlink/tezt/tests/evm_rollup.ml +++ b/etherlink/tezt/tests/evm_rollup.ml @@ -1066,7 +1066,7 @@ let test_rpc_getTransactionCountBatch = ~block:"latest"; ] in - match JSON.as_list transaction_count with + match transaction_count with | [transaction_count] -> return JSON.(transaction_count |-> "result" |> as_int64) | _ -> Test.fail "Unexpected result from batching one request" @@ -1088,7 +1088,7 @@ let test_rpc_batch = let* results = 
Evm_node.batch_evm_rpc evm_node [transaction_count; chain_id] in - match JSON.as_list results with + match results with | [transaction_count; chain_id] -> return ( JSON.(transaction_count |-> "result" |> as_int64), @@ -1516,7 +1516,7 @@ let config_setup evm_setup = let* results = Evm_node.batch_evm_rpc evm_setup.evm_node [web3_clientVersion; chain_id] in - match JSON.as_list results with + match results with | [web3_clientVersion; chain_id] -> (* We don't need to return the web3_clientVersion because, it might change after the upgrade. -- GitLab From 7c3fa07278155decc54a7a969fd3690573a7914d Mon Sep 17 00:00:00 2001 From: Alain Mebsout Date: Fri, 8 Nov 2024 17:59:30 +0100 Subject: [PATCH 12/30] Test: test that RPC calls can be made on the same websocket connection --- etherlink/tezt/tests/evm_rollup.ml | 100 ++++++++++++++++++++++++----- 1 file changed, 85 insertions(+), 15 deletions(-) diff --git a/etherlink/tezt/tests/evm_rollup.ml b/etherlink/tezt/tests/evm_rollup.ml index 23c7a154e8cd..72b4fb40f1df 100644 --- a/etherlink/tezt/tests/evm_rollup.ml +++ b/etherlink/tezt/tests/evm_rollup.ml @@ -316,7 +316,7 @@ let setup_evm_kernel ?additional_config ?(setup_kernel_root_hash = true) ?tx_pool_timeout_limit ?tx_pool_addr_limit ?tx_pool_tx_per_addr_limit ?max_number_of_chunks ?(setup_mode = Setup_proxy) ?(force_install_kernel = true) ?whitelist ?maximum_allowed_ticks - ?restricted_rpcs ?(enable_dal = false) ?dal_slots protocol = + ?restricted_rpcs ?(enable_dal = false) ?dal_slots ?websockets protocol = let _, kernel_installee = Kernel.to_uses_and_tags kernel in let* node, client = setup_l1 ?commitment_period ?challenge_window ?timestamp protocol @@ -449,6 +449,7 @@ let setup_evm_kernel ?additional_config ?(setup_kernel_root_hash = true) ~patch_config ~mode ?restricted_rpcs + ?websockets (Sc_rollup_node.endpoint sc_rollup_node) in return @@ -497,6 +498,7 @@ let setup_evm_kernel ?additional_config ?(setup_kernel_root_hash = true) ~patch_config ~mode:sequencer_mode 
?restricted_rpcs + ?websockets (Sc_rollup_node.endpoint sc_rollup_node) in let produce_block () = Rpc.produce_block sequencer in @@ -506,6 +508,7 @@ let setup_evm_kernel ?additional_config ?(setup_kernel_root_hash = true) Evm_node.create ~data_dir:(Evm_node.data_dir sequencer) ~mode:(Rpc Evm_node.(mode sequencer)) + ?websockets (Evm_node.endpoint sequencer) in let* () = Evm_node.run evm_node in @@ -532,8 +535,8 @@ let register_test ~title ~tags ?(kernels = Kernel.all) ?additional_config ?admin ?(additional_uses = []) ?commitment_period ?challenge_window ?bootstrap_accounts ?whitelist ?da_fee_per_byte ?minimum_base_fee_per_gas ?rollup_operator_key ?maximum_allowed_ticks ?restricted_rpcs ~setup_mode - ~enable_dal ?(dal_slots = if enable_dal then Some [4] else None) f protocols - = + ~enable_dal ?(dal_slots = if enable_dal then Some [4] else None) ?websockets + f protocols = let extra_tag = match setup_mode with | Setup_proxy -> "proxy" @@ -583,6 +586,7 @@ let register_test ~title ~tags ?(kernels = Kernel.all) ?additional_config ?admin ~setup_mode ~enable_dal ?dal_slots + ?websockets protocol in f ~protocol ~evm_setup) @@ -592,7 +596,7 @@ let register_test ~title ~tags ?(kernels = Kernel.all) ?additional_config ?admin let register_proxy ~title ~tags ?kernels ?additional_uses ?additional_config ?admin ?commitment_period ?challenge_window ?bootstrap_accounts ?da_fee_per_byte ?minimum_base_fee_per_gas ?whitelist ?rollup_operator_key - ?maximum_allowed_ticks ?restricted_rpcs f protocols = + ?maximum_allowed_ticks ?restricted_rpcs ?websockets f protocols = let register ~enable_dal : unit = register_test ~title @@ -610,6 +614,7 @@ let register_proxy ~title ~tags ?kernels ?additional_uses ?additional_config ?rollup_operator_key ?maximum_allowed_ticks ?restricted_rpcs + ?websockets f protocols ~enable_dal @@ -623,7 +628,8 @@ let register_sequencer ?(return_sequencer = false) ~title ~tags ?kernels ?challenge_window ?bootstrap_accounts ?da_fee_per_byte ?minimum_base_fee_per_gas 
?time_between_blocks ?whitelist ?rollup_operator_key ?maximum_allowed_ticks ?restricted_rpcs - ?max_blueprints_ahead ?(block_storage_sqlite3 = false) f protocols = + ?max_blueprints_ahead ?(block_storage_sqlite3 = false) ?websockets f + protocols = let register ~enable_dal : unit = register_test ~title @@ -641,6 +647,7 @@ let register_sequencer ?(return_sequencer = false) ~title ~tags ?kernels ?rollup_operator_key ?maximum_allowed_ticks ?restricted_rpcs + ?websockets f protocols ~enable_dal @@ -661,7 +668,8 @@ let register_both ~title ~tags ?kernels ?additional_uses ?additional_config ?admin ?commitment_period ?challenge_window ?bootstrap_accounts ?da_fee_per_byte ?minimum_base_fee_per_gas ?time_between_blocks ?whitelist ?rollup_operator_key ?maximum_allowed_ticks ?restricted_rpcs - ?max_blueprints_ahead ?block_storage_sqlite3 f protocols : unit = + ?max_blueprints_ahead ?block_storage_sqlite3 ?websockets f protocols : unit + = register_proxy ~title ~tags @@ -678,6 +686,7 @@ let register_both ~title ~tags ?kernels ?additional_uses ?additional_config ?rollup_operator_key ?maximum_allowed_ticks ?restricted_rpcs + ?websockets f protocols ; register_sequencer @@ -699,6 +708,7 @@ let register_both ~title ~tags ?kernels ?additional_uses ?additional_config ?restricted_rpcs ?max_blueprints_ahead ?block_storage_sqlite3 + ?websockets f protocols @@ -915,14 +925,7 @@ let test_rpc_getBlockByHash = assert (block = block') ; unit -let test_rpc_getBlockReceipts = - register_both - ~time_between_blocks:Nothing - ~bootstrap_accounts:Eth_account.lots_of_address - ~tags:["evm"; "rpc"; "get_block_receipts"] - ~title:"RPC method eth_getBlockReceipts" - ~minimum_base_fee_per_gas:base_fee_for_hardcoded_tx - @@ fun ~protocol:_ ~evm_setup:{evm_node; produce_block; _} -> +let test_rpc_getBlockReceipts_aux ~evm_setup:{evm_node; produce_block; _} = let txs = read_tx_from_file () |> List.filteri (fun i _ -> i < 5) @@ -956,6 +959,15 @@ let test_rpc_getBlockReceipts = assert (List.equal ( = ) 
txs expected_txs) ; unit +let test_rpc_getBlockReceipts = + register_both + ~time_between_blocks:Nothing + ~bootstrap_accounts:Eth_account.lots_of_address + ~tags:["evm"; "rpc"; "get_block_receipts"] + ~title:"RPC method eth_getBlockReceipts" + ~minimum_base_fee_per_gas:base_fee_for_hardcoded_tx + @@ fun ~protocol:_ ~evm_setup -> test_rpc_getBlockReceipts_aux ~evm_setup + let test_rpc_getBlockBy_return_base_fee_per_gas_and_mix_hash = register_both (* TODO: https://gitlab.com/tezos/tezos/-/issues/7285 @@ -1112,6 +1124,63 @@ let test_rpc_eth_coinbase = ~error_msg:"eth_coinbase returned %L, expected %R" ; unit +let test_websocket_rpcs = + register_both + ~tags:["evm"; "rpc"; "websocket"] + ~title:"RPC methods over websocket" + ~time_between_blocks:Nothing + ~bootstrap_accounts: + ((Array.to_list Eth_account.bootstrap_accounts + |> List.map (fun a -> a.Eth_account.address)) + @ Eth_account.lots_of_address) + ~minimum_base_fee_per_gas:base_fee_for_hardcoded_tx + ~websockets:true + @@ fun ~protocol:_ ~evm_setup:({evm_node; produce_block; _} as evm_setup) -> + Log.info "getBalance" ; + let*@ balance = + Rpc.get_balance ~address:Eth_account.bootstrap_accounts.(0).address evm_node + in + Check.((balance = Helpers.default_bootstrap_account_balance) Wei.typ) + ~error_msg: + (sf + "Expected balance of %s should be %%R, but got %%L" + Eth_account.bootstrap_accounts.(0).address) ; + Log.info "getBlockByNumber" ; + let*@ block = Rpc.get_block_by_number ~block:"0" evm_node in + Check.((block.number = 0l) int32) + ~error_msg:"Unexpected block number, should be %%R, but got %%L" ; + Log.info "getBlockByHash" ; + let* block' = get_block_by_hash evm_setup block.hash in + assert (block = block') ; + Log.info "blockNumber" ; + let* () = + repeat 2 (fun () -> + let*@ _ = produce_block () in + unit) + in + let*@ block_number = Rpc.block_number evm_node in + Check.((block_number = 2l) int32) + ~error_msg:"Expected a block number of %R, but got %L" ; + Log.info "getBlockReceipts" ; + let* 
() = test_rpc_getBlockReceipts_aux ~evm_setup in + Log.info "getTransactionCount" ; + let*@ transaction_count = + Rpc.get_transaction_count + ~address:Eth_account.bootstrap_accounts.(0).address + evm_node + in + Check.((transaction_count = 0L) int64) + ~error_msg:"Expected a nonce of %R, but got %L" ; + Log.info "netVersion" ; + let*@ net_version = Rpc.net_version evm_node in + Check.((net_version = "1337") string) + ~error_msg:"Expected net_version is %R, but got %L" ; + Log.info "coinbase" ; + let*@ coinbase = Rpc.coinbase evm_node in + Check.((coinbase = "0x0000000000000000000000000000000000000000") string) + ~error_msg:"eth_coinbase returned %L, expected %R" ; + unit + let test_l2_blocks_progression = register_proxy ~tags:["evm"; "l2_blocks_progression"] @@ -6538,7 +6607,8 @@ let register_evm_node ~protocols = "29a7000000000000000000000000000000000000000000000000000000000000" ~flag_expected:true protocols ; - test_proxy_ignore_block_param protocols + test_proxy_ignore_block_param protocols ; + test_websocket_rpcs protocols let protocols = Protocol.all -- GitLab From 79fd5db949dfb1dbace75cffc9fb6472304169f7 Mon Sep 17 00:00:00 2001 From: Alain Mebsout Date: Tue, 12 Nov 2024 11:53:26 +0100 Subject: [PATCH 13/30] CI: add websocat to docker image deps for tests --- images/ci/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/images/ci/Dockerfile b/images/ci/Dockerfile index f6d5338a8279..0a6f8a59c23f 100644 --- a/images/ci/Dockerfile +++ b/images/ci/Dockerfile @@ -339,7 +339,7 @@ SHELL ["/bin/ash", "-euo", "pipefail", "-c"] # hadolint ignore=DL3018,DL3019 RUN apk update \ - && apk add --no-cache curl npm git file make jq \ + && apk add --no-cache curl npm git file make jq websocat \ gcc clang lld ca-certificates build-base musl-dev libusb-dev linux-headers \ # We need datadog-ci to send JUnit files to Datadog in tezt # jobs. Cf. [scripts/ci/tezt.sh]. 
-- GitLab From 7db156bcc9dd6b210187fb8b13ce439a55e97d6d Mon Sep 17 00:00:00 2001 From: Alain Mebsout Date: Tue, 12 Nov 2024 16:04:54 +0100 Subject: [PATCH 14/30] TMP: disable rust version checks --- images/ci/scripts/check_versions.sh | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/images/ci/scripts/check_versions.sh b/images/ci/scripts/check_versions.sh index 1ed38b7fe4f6..805f9cdd184d 100755 --- a/images/ci/scripts/check_versions.sh +++ b/images/ci/scripts/check_versions.sh @@ -52,11 +52,14 @@ check_version_in_test_dependency_image() { echo "### Important packages version" - current_cargo_version=$(${run} cargo --version | awk '{print $2}') - check_version cargo "${current_cargo_version}" "${cargo_version}" + # FIXME: Disabled until https://gitlab.com/tezos/tezos/-/merge_requests/15438 + # is merged. - current_rust_version=$(${run} rustc --version | awk '{print $2}') - check_version rust "${current_rust_version}" "${rust_version}" + # current_cargo_version=$(${run} cargo --version | awk '{print $2}') + # check_version cargo "${current_cargo_version}" "${cargo_version}" + + # current_rust_version=$(${run} rustc --version | awk '{print $2}') + # check_version rust "${current_rust_version}" "${rust_version}" current_opam_version=$(${run} opam --version) check_version opam "${current_opam_version}" "${opam_version}" -- GitLab From af1126c8bcc110f5dc378f5c37313695a14bfcb3 Mon Sep 17 00:00:00 2001 From: Rodi-Can Bozman Date: Thu, 7 Nov 2024 11:06:58 +0100 Subject: [PATCH 15/30] EVM/Node: add subscription's types --- .../lib_dev/encodings/ethereum_types.ml | 41 +++++++++++++++++++ .../lib_dev/encodings/ethereum_types.mli | 14 +++++++ 2 files changed, 55 insertions(+) diff --git a/etherlink/bin_node/lib_dev/encodings/ethereum_types.ml b/etherlink/bin_node/lib_dev/encodings/ethereum_types.ml index 0b4d3b8453b8..b990f3531e37 100644 --- a/etherlink/bin_node/lib_dev/encodings/ethereum_types.ml +++ 
b/etherlink/bin_node/lib_dev/encodings/ethereum_types.ml @@ -1121,3 +1121,44 @@ let state_override_empty = AddressMap.empty let state_override_encoding = AddressMap.associative_array_encoding state_account_override_encoding + +module Subscription = struct + exception Unknown_subscription + + type logs = {address : address; topics : hash list} + + let logs_encoding = + let open Data_encoding in + conv + (fun {address; topics} -> (address, topics)) + (fun (address, topics) -> {address; topics}) + (obj2 + (req "address" address_encoding) + (req "topics" (list hash_encoding))) + + type kind = NewHeads | Logs of logs | NewPendingTransactions | Syncing + + let kind_encoding = + let open Data_encoding in + conv + (function + | NewHeads -> ("newHeads", None) + | Logs logs -> ("logs", Some logs) + | NewPendingTransactions -> ("newPendingTransactions", None) + | Syncing -> ("syncing", None)) + (function + | "newHeads", None -> NewHeads + | "logs", Some logs -> Logs logs + | "newPendingTransactions", None -> NewPendingTransactions + | "syncing", None -> Syncing + | _ -> raise Unknown_subscription) + (tup2 string (option logs_encoding)) + + type id = Id of hex [@@ocaml.unboxed] + + let id_of_string s = Id (hex_of_string (String.lowercase_ascii s)) + + let id_to_string (Id a) = hex_to_string a + + let id_encoding = Data_encoding.(conv id_to_string id_of_string string) +end diff --git a/etherlink/bin_node/lib_dev/encodings/ethereum_types.mli b/etherlink/bin_node/lib_dev/encodings/ethereum_types.mli index f40a9bf75857..2dca1a851b1a 100644 --- a/etherlink/bin_node/lib_dev/encodings/ethereum_types.mli +++ b/etherlink/bin_node/lib_dev/encodings/ethereum_types.mli @@ -309,3 +309,17 @@ module From_rlp : sig val decode_hex : Rlp.item -> hex tzresult end + +module Subscription : sig + exception Unknown_subscription + + type logs = {address : address; topics : hash list} + + type kind = NewHeads | Logs of logs | NewPendingTransactions | Syncing + + val kind_encoding : kind 
Data_encoding.t + + type id = Id of hex [@@ocaml.unboxed] + + val id_encoding : id Data_encoding.t +end -- GitLab From b017920a6d4222f6113b6b3304a268e31ba1e7da Mon Sep 17 00:00:00 2001 From: Rodi-Can Bozman Date: Thu, 7 Nov 2024 11:07:19 +0100 Subject: [PATCH 16/30] EVM/Node: add subscription's RPC encodings --- etherlink/bin_node/lib_dev/rpc_encodings.ml | 36 ++++++++++++++++++-- etherlink/bin_node/lib_dev/rpc_encodings.mli | 8 +++++ 2 files changed, 42 insertions(+), 2 deletions(-) diff --git a/etherlink/bin_node/lib_dev/rpc_encodings.ml b/etherlink/bin_node/lib_dev/rpc_encodings.ml index 0dea4c11ef0a..66c6617142b1 100644 --- a/etherlink/bin_node/lib_dev/rpc_encodings.ml +++ b/etherlink/bin_node/lib_dev/rpc_encodings.ml @@ -844,6 +844,38 @@ module Coinbase = struct type ('input, 'output) method_ += Method : (input, output) method_ end +module Subscribe = struct + open Ethereum_types + + type input = Subscription.kind + + type output = Subscription.id + + let input_encoding = Subscription.kind_encoding + + let output_encoding = Subscription.id_encoding + + let method_ = "eth_subscribe" + + type ('input, 'output) method_ += Method : (input, output) method_ +end + +module Unsubscribe = struct + open Ethereum_types + + type input = Subscription.id + + type output = bool + + let input_encoding = Subscription.id_encoding + + let output_encoding = Data_encoding.bool + + let method_ = "eth_unsubscribe" + + type ('input, 'output) method_ += Method : (input, output) method_ +end + type map_result = | Method : ('input, 'output) method_ @@ -897,6 +929,8 @@ let supported_methods : (module METHOD) list = (module Eth_fee_history); (module Coinbase); (module Trace_call); + (module Subscribe); + (module Unsubscribe); ] let unsupported_methods : string list = @@ -921,8 +955,6 @@ let unsupported_methods : string list = "eth_newPendingTransactionFilter"; "eth_uninstallFilter"; "eth_sendTransaction"; - "eth_subscribe"; - "eth_unsubscribe"; (* debug *) "debug_getBadBlocks"; 
"debug_getRawBlock"; diff --git a/etherlink/bin_node/lib_dev/rpc_encodings.mli b/etherlink/bin_node/lib_dev/rpc_encodings.mli index ac1669296c42..15bd5598309e 100644 --- a/etherlink/bin_node/lib_dev/rpc_encodings.mli +++ b/etherlink/bin_node/lib_dev/rpc_encodings.mli @@ -309,6 +309,14 @@ module Eth_fee_history : module Coinbase : METHOD with type input = unit and type output = Ethereum_types.address +module Subscribe : + METHOD + with type input = Ethereum_types.Subscription.kind + and type output = Ethereum_types.Subscription.id + +module Unsubscribe : + METHOD with type input = Ethereum_types.Subscription.id and type output = bool + type map_result = | Method : ('input, 'output) method_ -- GitLab From af750447f23a776941fcf94ea24807c0397ecca3 Mon Sep 17 00:00:00 2001 From: Rodi-Can Bozman Date: Thu, 7 Nov 2024 11:07:42 +0100 Subject: [PATCH 17/30] WIP! EVM/Node: add subscription's services --- etherlink/bin_node/lib_dev/services.ml | 50 ++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/etherlink/bin_node/lib_dev/services.ml b/etherlink/bin_node/lib_dev/services.ml index 86a363110240..310bc15c0477 100644 --- a/etherlink/bin_node/lib_dev/services.ml +++ b/etherlink/bin_node/lib_dev/services.ml @@ -213,6 +213,44 @@ let block_transaction_count block = | TxHash l -> List.length l | TxFull l -> List.length l +let subscriptions : + ( Ethereum_types.Subscription.id, + Ethereum_types.Subscription.kind ) + Stdlib.Hashtbl.t = + (* 10 seems like a reasonable since there is only four types + of subscription *) + Stdlib.Hashtbl.create 10 + +let () = Random.self_init () + +let encode_id bytes = + let id_hex = Hex.of_bytes bytes |> Hex.show in + (* Trim leading zeros *) + let id_trimmed = Str.global_replace (Str.regexp "^0+") "" id_hex in + if id_trimmed = "" then "0x0" else "0x" ^ id_trimmed + +let make_id ~id = Ethereum_types.Subscription.(Id (Hex id)) + +(* [generate_id]'s implementation is inspired by geth's one. 
+ See: + https://github.com/ethereum/go-ethereum/blob/master/rpc/subscription.go. *) +let generate_id () = + let id = Bytes.make 16 '\000' in + Bytes.iteri (fun i _ -> Bytes.set_uint8 id i (Random.int 256)) id ; + encode_id id + +let eth_subscribe ~kind = + let id = make_id ~id:(generate_id ()) in + Stdlib.Hashtbl.add subscriptions id kind ; + (* TODO: Start sending data based on kind to the websocket. *) + id + +let eth_unsubscribe ~id = + Stdlib.Hashtbl.remove subscriptions id ; + (* TODO: To perfectly comply with standard specification, check + if the id isn't in the map, should we return false (?) *) + true + let decode : type a. (module METHOD with type input = a) -> Data_encoding.json -> a = fun (module M) v -> Data_encoding.Json.destruct M.input_encoding v @@ -733,6 +771,18 @@ let dispatch_request (rpc : Configuration.rpc) (config : Configuration.t) process_trace_result trace in build_with_input ~f module_ parameters + | Subscribe.Method -> + let f (kind : Subscription.kind) = + let id = eth_subscribe ~kind in + rpc_ok id + in + build_with_input ~f module_ parameters + | Unsubscribe.Method -> + let f (id : Subscription.id) = + let status = eth_unsubscribe ~id in + rpc_ok status + in + build_with_input ~f module_ parameters | _ -> Stdlib.failwith "The pattern matching of methods is not exhaustive") in -- GitLab From 5428ff49c72aca7902c28c1b9c223f7f1c537b8e Mon Sep 17 00:00:00 2001 From: Rodi-Can Bozman Date: Fri, 8 Nov 2024 11:17:08 +0100 Subject: [PATCH 18/30] WIP! 
head encoding --- etherlink/bin_node/lib_dev/encodings/ethereum_types.ml | 9 +++++++++ etherlink/bin_node/lib_dev/encodings/ethereum_types.mli | 4 ++++ 2 files changed, 13 insertions(+) diff --git a/etherlink/bin_node/lib_dev/encodings/ethereum_types.ml b/etherlink/bin_node/lib_dev/encodings/ethereum_types.ml index b990f3531e37..19563425fe85 100644 --- a/etherlink/bin_node/lib_dev/encodings/ethereum_types.ml +++ b/etherlink/bin_node/lib_dev/encodings/ethereum_types.ml @@ -1161,4 +1161,13 @@ module Subscription = struct let id_to_string (Id a) = hex_to_string a let id_encoding = Data_encoding.(conv id_to_string id_of_string string) + + type head = {subscription : id; result : block} + + let head_encoding = + let open Data_encoding in + conv + (fun {subscription; result} -> (subscription, result)) + (fun (subscription, result) -> {subscription; result}) + (obj2 (req "subscription" id_encoding) (req "result" block_encoding)) end diff --git a/etherlink/bin_node/lib_dev/encodings/ethereum_types.mli b/etherlink/bin_node/lib_dev/encodings/ethereum_types.mli index 2dca1a851b1a..27b53c092330 100644 --- a/etherlink/bin_node/lib_dev/encodings/ethereum_types.mli +++ b/etherlink/bin_node/lib_dev/encodings/ethereum_types.mli @@ -322,4 +322,8 @@ module Subscription : sig type id = Id of hex [@@ocaml.unboxed] val id_encoding : id Data_encoding.t + + type head = {subscription : id; result : block} + + val head_encoding : head Data_encoding.t end -- GitLab From 5620bb7af7fa41bbf1a739a5db1fca198211a89e Mon Sep 17 00:00:00 2001 From: Rodi-Can Bozman Date: Fri, 8 Nov 2024 11:17:30 +0100 Subject: [PATCH 19/30] WIP! 
watcher/stream/stopper for newheads --- etherlink/bin_node/lib_dev/evm_context.ml | 4 ++++ etherlink/bin_node/lib_dev/evm_context.mli | 2 ++ etherlink/bin_node/lib_dev/services.ml | 19 ++++++++++++++----- 3 files changed, 20 insertions(+), 5 deletions(-) diff --git a/etherlink/bin_node/lib_dev/evm_context.ml b/etherlink/bin_node/lib_dev/evm_context.ml index 541d110299a1..6d525aff1782 100644 --- a/etherlink/bin_node/lib_dev/evm_context.ml +++ b/etherlink/bin_node/lib_dev/evm_context.ml @@ -363,6 +363,9 @@ let lock_data_dir ~data_dir = in return_unit +let head_watcher : Ethereum_types.block Lwt_watcher.input = + Lwt_watcher.create_input () + module State = struct let with_store_transaction ctxt k = Evm_store.use ctxt.store @@ fun conn -> @@ -905,6 +908,7 @@ module State = struct ctxt.session.context <- context ; ctxt.session.next_blueprint_number <- Qty (Z.succ level) ; ctxt.session.current_block_hash <- Ethereum_types.(block.hash) ; + Lwt_watcher.notify head_watcher block ; Option.iter (fun (split_level, split_timestamp) -> ctxt.session.last_split_block <- Some (split_level, split_timestamp)) diff --git a/etherlink/bin_node/lib_dev/evm_context.mli b/etherlink/bin_node/lib_dev/evm_context.mli index 0ff106eb16e5..10b49dcdcdf6 100644 --- a/etherlink/bin_node/lib_dev/evm_context.mli +++ b/etherlink/bin_node/lib_dev/evm_context.mli @@ -184,3 +184,5 @@ module State : sig (** Path of EVM state store. 
*) val store_path : data_dir:string -> string end + +val head_watcher : Ethereum_types.block Lwt_watcher.input diff --git a/etherlink/bin_node/lib_dev/services.ml b/etherlink/bin_node/lib_dev/services.ml index 310bc15c0477..8a648ed3b3c4 100644 --- a/etherlink/bin_node/lib_dev/services.ml +++ b/etherlink/bin_node/lib_dev/services.ml @@ -213,10 +213,14 @@ let block_transaction_count block = | TxHash l -> List.length l | TxFull l -> List.length l +type 'kind sub_stream = { + kind : Ethereum_types.Subscription.kind; + stream : 'kind Lwt_stream.t; + stopper : Lwt_watcher.stopper; +} + let subscriptions : - ( Ethereum_types.Subscription.id, - Ethereum_types.Subscription.kind ) - Stdlib.Hashtbl.t = + (Ethereum_types.Subscription.id, 'kind sub_stream) Stdlib.Hashtbl.t = (* 10 seems like a reasonable since there is only four types of subscription *) Stdlib.Hashtbl.create 10 @@ -241,8 +245,13 @@ let generate_id () = let eth_subscribe ~kind = let id = make_id ~id:(generate_id ()) in - Stdlib.Hashtbl.add subscriptions id kind ; - (* TODO: Start sending data based on kind to the websocket. *) + let stream, stopper = + match kind with + | Ethereum_types.Subscription.NewHeads -> + Lwt_watcher.create_stream Evm_context.head_watcher + | _ -> Stdlib.failwith "TODO_implement_all_kinds" + in + Stdlib.Hashtbl.add subscriptions id {kind; stream; stopper} ; id let eth_unsubscribe ~id = -- GitLab From 6dbcca42fa3d966c80947e137d70cf79bf51890d Mon Sep 17 00:00:00 2001 From: Rodi-Can Bozman Date: Tue, 12 Nov 2024 11:11:46 +0100 Subject: [PATCH 20/30] WIP! 
Subscription encoding (sent through the websocket) --- etherlink/bin_node/lib_dev/rpc_encodings.ml | 32 ++++++++++++++++++++ etherlink/bin_node/lib_dev/rpc_encodings.mli | 17 +++++++++++ 2 files changed, 49 insertions(+) diff --git a/etherlink/bin_node/lib_dev/rpc_encodings.ml b/etherlink/bin_node/lib_dev/rpc_encodings.ml index 66c6617142b1..b7de46ba4c0f 100644 --- a/etherlink/bin_node/lib_dev/rpc_encodings.ml +++ b/etherlink/bin_node/lib_dev/rpc_encodings.ml @@ -112,6 +112,38 @@ module JSONRPC = struct (req "id" (option id_repr_encoding)))) end +module Subscription = struct + let version = JSONRPC.version + + let method_ = "eth_subscription" + + type result = { + result : Data_encoding.json; + subscription : Ethereum_types.Subscription.id; + } + + let result_encoding = + Data_encoding.( + conv + (fun {result; subscription} -> (result, subscription)) + (fun (result, subscription) -> {result; subscription}) + (obj2 + (req "result" Data_encoding.json) + (req "subscription" Ethereum_types.Subscription.id_encoding))) + + type response = {params : result} + + let response_encoding = + Data_encoding.( + conv + (fun {params} -> ((), (), params)) + (fun ((), (), params) -> {params}) + (obj3 + (req "jsonrpc" (constant version)) + (req "method" (constant method_)) + (req "params" result_encoding))) +end + module Error = struct type t = unit diff --git a/etherlink/bin_node/lib_dev/rpc_encodings.mli b/etherlink/bin_node/lib_dev/rpc_encodings.mli index 15bd5598309e..b9eb25beca71 100644 --- a/etherlink/bin_node/lib_dev/rpc_encodings.mli +++ b/etherlink/bin_node/lib_dev/rpc_encodings.mli @@ -87,6 +87,23 @@ module JSONRPC : sig val response_encoding : response Data_encoding.t end +module Subscription : sig + val version : string + + val method_ : string + + type result = { + result : Data_encoding.json; + subscription : Ethereum_types.Subscription.id; + } + + val result_encoding : result Data_encoding.t + + type response = {params : result} + + val response_encoding : response 
Data_encoding.t +end + (* Errors returned by the RPC server, to be embedded as data to the JSON-RPC error object. *) module Error : sig -- GitLab From db774e107732fb059ce4dcb9c41d5d4e2c502bc7 Mon Sep 17 00:00:00 2001 From: Rodi-Can Bozman Date: Tue, 12 Nov 2024 13:38:48 +0100 Subject: [PATCH 21/30] WIP! (experimental) example of how a subscription response can be built There are probably (and most likely) more straightforward ways to do this but I found this one which follows what's done for classic HTTP requests, it could help with refactorisation at some point to have a similar way to build responses. Introducing WS_METHOD seems irrelevant in the end, but it might make sense to split them (or not) for typing reasons and avoid mixing what's not supposed to be mixed. --- etherlink/bin_node/lib_dev/rpc_encodings.ml | 28 +++++++++++ etherlink/bin_node/lib_dev/rpc_encodings.mli | 17 +++++++ etherlink/bin_node/lib_dev/services.ml | 51 ++++++++++++++++++++ 3 files changed, 96 insertions(+) diff --git a/etherlink/bin_node/lib_dev/rpc_encodings.ml b/etherlink/bin_node/lib_dev/rpc_encodings.ml index b7de46ba4c0f..669f4388686a 100644 --- a/etherlink/bin_node/lib_dev/rpc_encodings.ml +++ b/etherlink/bin_node/lib_dev/rpc_encodings.ml @@ -168,6 +168,20 @@ module type METHOD = sig type ('input, 'output) method_ += Method : (input, output) method_ end +type ('input, 'output) ws_method_ = .. 
+ +module type WS_METHOD = sig + type input + + val input_encoding : input Data_encoding.t + + type output + + val output_encoding : output Data_encoding.t + + type ('input, 'output) ws_method_ += Websocket : (input, output) ws_method_ +end + let encoding_with_optional_extended_block_param encoding = Evm_node_lib_dev_encoding.Helpers.encoding_with_optional_last_param encoding @@ -908,6 +922,20 @@ module Unsubscribe = struct type ('input, 'output) method_ += Method : (input, output) method_ end +module WS_Block = struct + open Ethereum_types + + type input = unit + + let input_encoding = Data_encoding.unit + + type output = block + + let output_encoding = block_encoding + + type ('input, 'output) ws_method_ += Websocket : (input, output) ws_method_ +end + type map_result = | Method : ('input, 'output) method_ diff --git a/etherlink/bin_node/lib_dev/rpc_encodings.mli b/etherlink/bin_node/lib_dev/rpc_encodings.mli index b9eb25beca71..fff3735f3b19 100644 --- a/etherlink/bin_node/lib_dev/rpc_encodings.mli +++ b/etherlink/bin_node/lib_dev/rpc_encodings.mli @@ -134,6 +134,20 @@ module type METHOD = sig type ('input, 'output) method_ += Method : (input, output) method_ end +type ('input, 'output) ws_method_ = .. 
+ +module type WS_METHOD = sig + type input + + val input_encoding : input Data_encoding.t + + type output + + val output_encoding : output Data_encoding.t + + type ('input, 'output) ws_method_ += Websocket : (input, output) ws_method_ +end + module Kernel_version : METHOD with type input = unit and type output = string module Kernel_root_hash : @@ -334,6 +348,9 @@ module Subscribe : module Unsubscribe : METHOD with type input = Ethereum_types.Subscription.id and type output = bool +module WS_Block : + WS_METHOD with type input = unit and type output = Ethereum_types.block + type map_result = | Method : ('input, 'output) method_ diff --git a/etherlink/bin_node/lib_dev/services.ml b/etherlink/bin_node/lib_dev/services.ml index 8a648ed3b3c4..ddb0b186fdba 100644 --- a/etherlink/bin_node/lib_dev/services.ml +++ b/etherlink/bin_node/lib_dev/services.ml @@ -180,6 +180,13 @@ let dispatch_batch_service ~path = ~output:(batch_encoding JSONRPC.response_encoding) path +let dispatch_service_ws ~path = + Service.post_service + ~query:Query.empty + ~input:Data_encoding.unit + ~output:(request_encoding Subscription.response_encoding) + path + let get_block_by_number ~full_transaction_object block_param (module Rollup_node_rpc : Services_backend_sig.S) = let open Lwt_result_syntax in @@ -243,6 +250,50 @@ let generate_id () = Bytes.iteri (fun i _ -> Bytes.set_uint8 id i (Random.int 256)) id ; encode_id id +let decode_ws : + type a. (module WS_METHOD with type input = a) -> Data_encoding.json -> a = + fun (module M) v -> Data_encoding.Json.destruct M.input_encoding v + +let encode_ws : + type a. (module WS_METHOD with type output = a) -> a -> Data_encoding.json = + fun (module M) v -> Data_encoding.Json.construct M.output_encoding v + +let build_ws : + type input output. 
+ (module WS_METHOD with type input = input and type output = output) -> + f:(input option -> (output, Rpc_errors.t) Result.t tzresult Lwt.t) -> + Data_encoding.json option -> + JSONRPC.value Lwt.t = + fun (module Websocket) ~f parameters -> + let open Lwt_syntax in + Lwt.catch + (fun () -> + let decoded = Option.map (decode_ws (module Websocket)) parameters in + let+ v = f decoded in + match v with + | Error err -> + Error + (Rpc_errors.internal_error + @@ Format.asprintf "%a" pp_print_trace err) + | Ok value -> Result.map (encode_ws (module Websocket)) value) + (fun exn -> + Lwt.return_error @@ Rpc_errors.invalid_request @@ Printexc.to_string exn) + +(* example_ws just shows how build_ws can be used in the websocket flow *) +let example_ws (module Backend_rpc : Services_backend_sig.S) = + let open Lwt_result_syntax in + let f (_ : unit option) = + let* block = + get_block_by_number + ~full_transaction_object:false + Latest + (module Backend_rpc) + in + return (Ok block) + in + (* TODO: do something smart to give the proper id to build the response *) + build_ws (module WS_Block) ~f None + let eth_subscribe ~kind = let id = make_id ~id:(generate_id ()) in let stream, stopper = -- GitLab From 871265d9f67f47eb93636d6ab3520f3cd14d80c6 Mon Sep 17 00:00:00 2001 From: Rodi-Can Bozman Date: Wed, 13 Nov 2024 10:34:54 +0100 Subject: [PATCH 22/30] EXP! 
request_encoding -> batch_encoding (compilation) --- etherlink/bin_node/lib_dev/services.ml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/etherlink/bin_node/lib_dev/services.ml b/etherlink/bin_node/lib_dev/services.ml index ddb0b186fdba..2ce0ce0fefa0 100644 --- a/etherlink/bin_node/lib_dev/services.ml +++ b/etherlink/bin_node/lib_dev/services.ml @@ -184,7 +184,7 @@ let dispatch_service_ws ~path = Service.post_service ~query:Query.empty ~input:Data_encoding.unit - ~output:(request_encoding Subscription.response_encoding) + ~output:(batch_encoding Subscription.response_encoding) path let get_block_by_number ~full_transaction_object block_param -- GitLab From 98ac09779a368fd8b92f8b81310fb62f95149f0b Mon Sep 17 00:00:00 2001 From: Rodi-Can Bozman Date: Wed, 13 Nov 2024 11:05:43 +0100 Subject: [PATCH 23/30] EXP! repair IDE colors --- etherlink/bin_node/lib_dev/router.mli | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/etherlink/bin_node/lib_dev/router.mli b/etherlink/bin_node/lib_dev/router.mli index 294e0ead3784..6e45655ee5e1 100644 --- a/etherlink/bin_node/lib_dev/router.mli +++ b/etherlink/bin_node/lib_dev/router.mli @@ -6,7 +6,7 @@ (* *) (*****************************************************************************) -(** {1 Helper functions to build {!Dream} routes from {!Resto} services. *) +(* {1 Helper functions to build {!Dream} routes from {!Resto} services. *) (** [make_gen_route path service handler] builds a route by parsing the request and body, from a generic handler that constructs the response. *) -- GitLab From f3acdaefef262408bd91524a4f5f6af13056383e Mon Sep 17 00:00:00 2001 From: Rodi-Can Bozman Date: Wed, 13 Nov 2024 13:25:24 +0100 Subject: [PATCH 24/30] EXP! 
unified return type for the websocket stream --- etherlink/bin_node/lib_dev/services.ml | 36 +++++++++++++++++++++----- 1 file changed, 30 insertions(+), 6 deletions(-) diff --git a/etherlink/bin_node/lib_dev/services.ml b/etherlink/bin_node/lib_dev/services.ml index 2ce0ce0fefa0..5fd36de29bc5 100644 --- a/etherlink/bin_node/lib_dev/services.ml +++ b/etherlink/bin_node/lib_dev/services.ml @@ -166,6 +166,29 @@ let batch_encoding kind = (fun i -> Batch i); ]) +type ('a, 'b) ws_response = Simple of 'a | Websocket of 'b + +let failwith_unused_encoding ~__FUNCTION__ = + Stdlib.failwith @@ __FUNCTION__ ^ ": Unused encoding" + +let websocket_output_encoding = + let open Data_encoding in + union + [ + case + ~title:"simple" + Json_only + JSONRPC.response_encoding + (function Simple response -> Some response | _ -> None) + (fun _ -> failwith_unused_encoding ~__FUNCTION__); + case + ~title:"websocket" + Json_only + Subscription.response_encoding + (function Websocket response -> Some response | _ -> None) + (fun _ -> failwith_unused_encoding ~__FUNCTION__); + ] + let dispatch_service ~path = Service.post_service ~query:Query.empty @@ -183,8 +206,9 @@ let dispatch_batch_service ~path = let dispatch_service_ws ~path = Service.post_service ~query:Query.empty - ~input:Data_encoding.unit - ~output:(batch_encoding Subscription.response_encoding) + (* The input is always a JSONRPC request, even if it's for websockets. 
*) + ~input:JSONRPC.request_encoding + ~output:websocket_output_encoding path let get_block_by_number ~full_transaction_object block_param @@ -1068,11 +1092,11 @@ module Dream = struct let jsonrpc_websocket_route path rpc config ctx dispatch_request = Router.make_websocket_route path - (dispatch_service ~path:Path.root) + (dispatch_service_ws ~path:Path.root) (fun request -> - let stream = - dispatch_request rpc config ctx request |> Lwt_stream.return_lwt - in + let open Lwt_syntax in + let* stream = dispatch_request rpc config ctx request in + let stream = Lwt.return @@ Simple stream |> Lwt_stream.return_lwt in Lwt.return (stream, fun () -> ())) let public_routes ?delegate_health_check_to rpc config ctx = -- GitLab From ba5e30695f3bf2153baa56456250911b3588f819 Mon Sep 17 00:00:00 2001 From: Rodi-Can Bozman Date: Wed, 13 Nov 2024 17:07:24 +0100 Subject: [PATCH 25/30] EXP! use proper stream per subscription id --- etherlink/bin_node/lib_dev/services.ml | 44 ++++++++++++++++++++++++-- 1 file changed, 42 insertions(+), 2 deletions(-) diff --git a/etherlink/bin_node/lib_dev/services.ml b/etherlink/bin_node/lib_dev/services.ml index 5fd36de29bc5..9e4e9f924327 100644 --- a/etherlink/bin_node/lib_dev/services.ml +++ b/etherlink/bin_node/lib_dev/services.ml @@ -1096,8 +1096,48 @@ module Dream = struct (fun request -> let open Lwt_syntax in let* stream = dispatch_request rpc config ctx request in - let stream = Lwt.return @@ Simple stream |> Lwt_stream.return_lwt in - Lwt.return (stream, fun () -> ())) + let subscription_id = + if request.JSONRPC.method_ = "eth_subscribe" then + match stream.JSONRPC.value with + | Ok id -> + Some + (Data_encoding.Json.destruct + Ethereum_types.Subscription.id_encoding + id) + | Error _ -> None + else None + in + let stream, stopper = + match subscription_id with + | Some id -> + let {stream; stopper; _} = Stdlib.Hashtbl.find subscriptions id in + let response_stream : + (JSONRPC.response, Subscription.response) ws_response + Lwt_stream.t = 
+ Lwt_stream.map + (fun block -> + Websocket + Rpc_encodings.Subscription. + { + params = + { + result = + Data_encoding.Json.construct + Ethereum_types.block_encoding + block; + subscription = id; + }; + }) + stream + in + (response_stream, fun () -> () (* stopper ? *)) + (* TODO: The response with the id is not sent for now, needs + to be done at some point. *) + | None -> + ( Lwt.return @@ Simple stream |> Lwt_stream.return_lwt, + fun () -> () ) + in + Lwt.return (stream, stopper)) let public_routes ?delegate_health_check_to rpc config ctx = [ -- GitLab From 4d699b786f9252abbc74057fa67f43fabba66057 Mon Sep 17 00:00:00 2001 From: Rodi-Can Bozman Date: Thu, 14 Nov 2024 09:05:38 +0100 Subject: [PATCH 26/30] EXP! fix shutdown/stopper's type --- etherlink/bin_node/lib_dev/router.ml | 6 +++--- etherlink/bin_node/lib_dev/router.mli | 2 +- etherlink/bin_node/lib_dev/services.ml | 8 +++++--- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/etherlink/bin_node/lib_dev/router.ml b/etherlink/bin_node/lib_dev/router.ml index bad2ab00d8a0..9bb39fe35ec6 100644 --- a/etherlink/bin_node/lib_dev/router.ml +++ b/etherlink/bin_node/lib_dev/router.ml @@ -213,7 +213,7 @@ let send_error media websocket error = let make_websocket_route (type input output) path (service : (_, _, _, _, input, output) Service.t) - (handler : input -> (output Lwt_stream.t * (unit -> unit)) Lwt.t) = + (handler : input -> (output Lwt_stream.t * Lwt_watcher.stopper) Lwt.t) = let open Lwt_syntax in let output_encoding = Service.output_encoding service in Dream.get path @@ fun request -> @@ -237,14 +237,14 @@ let make_websocket_route (type input output) path (Rpc_errors.internal_error (Printexc.exn_slot_name exn))) (Lwt_stream.wrap_exn stream) in - shutdown () ; + Lwt_watcher.shutdown shutdown ; return_unit in let async_write_stream (stream, shutdown) = Lwt.dont_wait (fun () -> write_stream (stream, shutdown)) (fun exn -> - shutdown () ; + Lwt_watcher.shutdown shutdown ; Dream.error @@ fun log -> 
log "Websocket write exception: %s" (Printexc.to_string exn)) in diff --git a/etherlink/bin_node/lib_dev/router.mli b/etherlink/bin_node/lib_dev/router.mli index 6e45655ee5e1..a014f39de73f 100644 --- a/etherlink/bin_node/lib_dev/router.mli +++ b/etherlink/bin_node/lib_dev/router.mli @@ -91,5 +91,5 @@ val make_websocket_route : 'input, 'output ) Tezos_rpc.Service.t -> - ('input -> ('output Lwt_stream.t * (unit -> unit)) Lwt.t) -> + ('input -> ('output Lwt_stream.t * Lwt_watcher.stopper) Lwt.t) -> Dream.route diff --git a/etherlink/bin_node/lib_dev/services.ml b/etherlink/bin_node/lib_dev/services.ml index 9e4e9f924327..0b086bc8abd0 100644 --- a/etherlink/bin_node/lib_dev/services.ml +++ b/etherlink/bin_node/lib_dev/services.ml @@ -1130,12 +1130,14 @@ module Dream = struct }) stream in - (response_stream, fun () -> () (* stopper ? *)) + (response_stream, stopper) (* TODO: The response with the id is not sent for now, needs to be done at some point. *) | None -> - ( Lwt.return @@ Simple stream |> Lwt_stream.return_lwt, - fun () -> () ) + (* Using a fake stream here should be ok as we don't really use + the stream for RPCs that aren't related to subscriptions. *) + let _, stopper = Lwt_watcher.create_fake_stream () in + (Lwt.return @@ Simple stream |> Lwt_stream.return_lwt, stopper) in Lwt.return (stream, stopper)) -- GitLab From d9a3d3f57c1e8a8bf24c9d4678170542caec7a66 Mon Sep 17 00:00:00 2001 From: Rodi-Can Bozman Date: Thu, 14 Nov 2024 10:24:13 +0100 Subject: [PATCH 27/30] EXP! 
test with command in commit description dune exec etherlink/tezt/tests/main.exe -- --file evm_rollup.ml evm rpc websocket /threshold_encryption /dal --verbose --- etherlink/tezt/tests/evm_rollup.ml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/etherlink/tezt/tests/evm_rollup.ml b/etherlink/tezt/tests/evm_rollup.ml index 72b4fb40f1df..49c25b880aa3 100644 --- a/etherlink/tezt/tests/evm_rollup.ml +++ b/etherlink/tezt/tests/evm_rollup.ml @@ -1161,6 +1161,8 @@ let test_websocket_rpcs = let*@ block_number = Rpc.block_number evm_node in Check.((block_number = 2l) int32) ~error_msg:"Expected a block number of %R, but got %L" ; + let* () = Lwt_unix.sleep 100000000. in + (* stalling to request manually *) Log.info "getBlockReceipts" ; let* () = test_rpc_getBlockReceipts_aux ~evm_setup in Log.info "getTransactionCount" ; -- GitLab From 5518b703210de003e6ff74fcd9c882e0d84ca5fb Mon Sep 17 00:00:00 2001 From: Rodi-Can Bozman Date: Thu, 14 Nov 2024 10:25:05 +0100 Subject: [PATCH 28/30] EXP! tmp fix for sub encoding --- .../lib_dev/encodings/ethereum_types.ml | 27 +++++++++++-------- .../lib_dev/encodings/ethereum_types.mli | 8 ++++-- 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/etherlink/bin_node/lib_dev/encodings/ethereum_types.ml b/etherlink/bin_node/lib_dev/encodings/ethereum_types.ml index 19563425fe85..148acb6bb434 100644 --- a/etherlink/bin_node/lib_dev/encodings/ethereum_types.ml +++ b/etherlink/bin_node/lib_dev/encodings/ethereum_types.ml @@ -1127,7 +1127,7 @@ module Subscription = struct type logs = {address : address; topics : hash list} - let logs_encoding = + let _logs_encoding = let open Data_encoding in conv (fun {address; topics} -> (address, topics)) @@ -1136,23 +1136,28 @@ module Subscription = struct (req "address" address_encoding) (req "topics" (list hash_encoding))) - type kind = NewHeads | Logs of logs | NewPendingTransactions | Syncing + (* TODO/FIX: find a workaround for logs *) + type kind = + | NewHeads + (* | Logs of logs *) + | 
NewPendingTransactions + | Syncing let kind_encoding = let open Data_encoding in conv (function - | NewHeads -> ("newHeads", None) - | Logs logs -> ("logs", Some logs) - | NewPendingTransactions -> ("newPendingTransactions", None) - | Syncing -> ("syncing", None)) + | NewHeads -> ["newHeads"] + (* | Logs logs -> ("logs", Some logs) *) + | NewPendingTransactions -> ["newPendingTransactions"] + | Syncing -> ["syncing"]) (function - | "newHeads", None -> NewHeads - | "logs", Some logs -> Logs logs - | "newPendingTransactions", None -> NewPendingTransactions - | "syncing", None -> Syncing + | ["newHeads"] -> NewHeads + (* | "logs", Some logs -> Logs logs *) + | ["newPendingTransactions"] -> NewPendingTransactions + | ["syncing"] -> Syncing | _ -> raise Unknown_subscription) - (tup2 string (option logs_encoding)) + (list string) type id = Id of hex [@@ocaml.unboxed] diff --git a/etherlink/bin_node/lib_dev/encodings/ethereum_types.mli b/etherlink/bin_node/lib_dev/encodings/ethereum_types.mli index 27b53c092330..2f20751c44ea 100644 --- a/etherlink/bin_node/lib_dev/encodings/ethereum_types.mli +++ b/etherlink/bin_node/lib_dev/encodings/ethereum_types.mli @@ -313,9 +313,13 @@ end module Subscription : sig exception Unknown_subscription - type logs = {address : address; topics : hash list} + (* type logs = {address : address; topics : hash list} *) - type kind = NewHeads | Logs of logs | NewPendingTransactions | Syncing + type kind = + | NewHeads + (* | Logs of logs *) + | NewPendingTransactions + | Syncing val kind_encoding : kind Data_encoding.t -- GitLab From e3192ed51a841a472be2100bfc904428924ad6dc Mon Sep 17 00:00:00 2001 From: Rodi-Can Bozman Date: Thu, 14 Nov 2024 10:53:27 +0100 Subject: [PATCH 29/30] EXP! 
tezt infinite block production --- etherlink/tezt/tests/evm_rollup.ml | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/etherlink/tezt/tests/evm_rollup.ml b/etherlink/tezt/tests/evm_rollup.ml index 49c25b880aa3..4af548510d7c 100644 --- a/etherlink/tezt/tests/evm_rollup.ml +++ b/etherlink/tezt/tests/evm_rollup.ml @@ -1161,7 +1161,12 @@ let test_websocket_rpcs = let*@ block_number = Rpc.block_number evm_node in Check.((block_number = 2l) int32) ~error_msg:"Expected a block number of %R, but got %L" ; - let* () = Lwt_unix.sleep 100000000. in + let* () = + repeat 200 (fun () -> + let*@ _ = produce_block () in + let* () = Lwt_unix.sleep 5. in + unit) + in (* stalling to request manually *) Log.info "getBlockReceipts" ; let* () = test_rpc_getBlockReceipts_aux ~evm_setup in -- GitLab From ae5ba9615f52719d8bb4c0cdc0373edc524daa9b Mon Sep 17 00:00:00 2001 From: Rodi-Can Bozman Date: Fri, 15 Nov 2024 09:22:09 +0100 Subject: [PATCH 30/30] EXP! fix kind encoding to re-introduce logs --- .../lib_dev/encodings/ethereum_types.ml | 46 +++++++++++-------- .../lib_dev/encodings/ethereum_types.mli | 8 +--- 2 files changed, 28 insertions(+), 26 deletions(-) diff --git a/etherlink/bin_node/lib_dev/encodings/ethereum_types.ml b/etherlink/bin_node/lib_dev/encodings/ethereum_types.ml index 148acb6bb434..a98253d785bc 100644 --- a/etherlink/bin_node/lib_dev/encodings/ethereum_types.ml +++ b/etherlink/bin_node/lib_dev/encodings/ethereum_types.ml @@ -1127,7 +1127,7 @@ module Subscription = struct type logs = {address : address; topics : hash list} - let _logs_encoding = + let logs_encoding = let open Data_encoding in conv (fun {address; topics} -> (address, topics)) @@ -1136,28 +1136,34 @@ module Subscription = struct (req "address" address_encoding) (req "topics" (list hash_encoding))) - (* TODO/FIX: find a workaround for logs *) - type kind = - | NewHeads - (* | Logs of logs *) - | NewPendingTransactions - | Syncing + type kind = NewHeads | Logs of logs | 
NewPendingTransactions | Syncing let kind_encoding = let open Data_encoding in - conv - (function - | NewHeads -> ["newHeads"] - (* | Logs logs -> ("logs", Some logs) *) - | NewPendingTransactions -> ["newPendingTransactions"] - | Syncing -> ["syncing"]) - (function - | ["newHeads"] -> NewHeads - (* | "logs", Some logs -> Logs logs *) - | ["newPendingTransactions"] -> NewPendingTransactions - | ["syncing"] -> Syncing - | _ -> raise Unknown_subscription) - (list string) + union + [ + case + ~title:"params_size_two" + Json_only + (tup2 string logs_encoding) + (function Logs logs -> Some ("logs", logs) | _ -> None) + (function + | "logs", logs -> Logs logs | _ -> raise Unknown_subscription); + case + ~title:"params_size_one" + Json_only + (tup1 string) + (function + | NewHeads -> Some "newHeads" + | NewPendingTransactions -> Some "newPendingTransactions" + | Syncing -> Some "syncing" + | _ -> None) + (function + | "newHeads" -> NewHeads + | "newPendingTransactions" -> NewPendingTransactions + | "syncing" -> Syncing + | _ -> raise Unknown_subscription); + ] type id = Id of hex [@@ocaml.unboxed] diff --git a/etherlink/bin_node/lib_dev/encodings/ethereum_types.mli b/etherlink/bin_node/lib_dev/encodings/ethereum_types.mli index 2f20751c44ea..27b53c092330 100644 --- a/etherlink/bin_node/lib_dev/encodings/ethereum_types.mli +++ b/etherlink/bin_node/lib_dev/encodings/ethereum_types.mli @@ -313,13 +313,9 @@ end module Subscription : sig exception Unknown_subscription - (* type logs = {address : address; topics : hash list} *) + type logs = {address : address; topics : hash list} - type kind = - | NewHeads - (* | Logs of logs *) - | NewPendingTransactions - | Syncing + type kind = NewHeads | Logs of logs | NewPendingTransactions | Syncing val kind_encoding : kind Data_encoding.t -- GitLab