From 94a911c83c23ed8cb4bd0f13454e04c656c5d56d Mon Sep 17 00:00:00 2001 From: Alain Mebsout Date: Thu, 13 Feb 2025 09:55:11 +0100 Subject: [PATCH 1/6] Build: Add websocket_lwt_unix library for outbox monitoring dependencies --- manifest/externals.ml | 2 ++ manifest/product_websocket.ml | 8 ++++++++ manifest/product_websocket.mli | 2 ++ opam/octez-evm-node-libs.opam | 1 + opam/virtual/octez-deps.opam | 1 + websocket/lwt/dune | 10 ++++++++++ 6 files changed, 24 insertions(+) diff --git a/manifest/externals.ml b/manifest/externals.ml index 34f44386a9f2..6dd6ccc54ba8 100644 --- a/manifest/externals.ml +++ b/manifest/externals.ml @@ -171,6 +171,8 @@ let lwt_canceler = let lwt_exit = external_lib "lwt-exit" V.True +let lwt_log = external_lib "lwt_log" V.True + let lwt_ppx = external_lib "lwt_ppx" V.True let lwt_unix = external_sublib lwt "lwt.unix" diff --git a/manifest/product_websocket.ml b/manifest/product_websocket.ml index c52fb2fa0e5d..26756acaaa76 100644 --- a/manifest/product_websocket.ml +++ b/manifest/product_websocket.ml @@ -33,3 +33,11 @@ let websocket_cohttp_lwt = ~path:"websocket/lwt" ~modules:["websocket_cohttp_lwt"] ~deps:[cohttp_lwt_unix; websocket] + +let websocket_lwt_unix = + public_lib + "octez-evm-node-libs.websocket_lwt_unix" + ~internal_name:"websocket_lwt_unix" + ~path:"websocket/lwt" + ~modules:["websocket_lwt_unix"] + ~deps:[lwt_log; cohttp_lwt_unix; websocket] diff --git a/manifest/product_websocket.mli b/manifest/product_websocket.mli index ae57062ef2eb..bc375b4656e7 100644 --- a/manifest/product_websocket.mli +++ b/manifest/product_websocket.mli @@ -11,3 +11,5 @@ val product_source : string list val websocket : Manifest.target val websocket_cohttp_lwt : Manifest.target + +val websocket_lwt_unix : Manifest.target diff --git a/opam/octez-evm-node-libs.opam b/opam/octez-evm-node-libs.opam index ecf84047dd14..c7ecc880ed78 100644 --- a/opam/octez-evm-node-libs.opam +++ b/opam/octez-evm-node-libs.opam @@ -17,6 +17,7 @@ depends: [ "cohttp" { >= 
"5.3.1" } "mirage-crypto-rng" { >= "1.0.0" } "octez-libs" + "lwt_log" "octez-l2-libs" "crunch" { >= "3.3.0" } "caqti-lwt" { >= "2.0.1" } diff --git a/opam/virtual/octez-deps.opam b/opam/virtual/octez-deps.opam index 27f309f3a743..3ca158e61358 100644 --- a/opam/virtual/octez-deps.opam +++ b/opam/virtual/octez-deps.opam @@ -70,6 +70,7 @@ depends: [ "lwt-canceler" { >= "0.3" & < "0.4" } "lwt-exit" "lwt-watcher" { = "0.2" } + "lwt_log" "lwt_ppx" "magic-mime" { >= "1.3.1" } "memtrace" diff --git a/websocket/lwt/dune b/websocket/lwt/dune index 02778f45183b..ca8836dd8688 100644 --- a/websocket/lwt/dune +++ b/websocket/lwt/dune @@ -9,3 +9,13 @@ octez-libs.cohttp-lwt-unix octez-evm-node-libs.websocket) (modules websocket_cohttp_lwt)) + +(library + (name websocket_lwt_unix) + (public_name octez-evm-node-libs.websocket_lwt_unix) + (instrumentation (backend bisect_ppx)) + (libraries + lwt_log + octez-libs.cohttp-lwt-unix + octez-evm-node-libs.websocket) + (modules websocket_lwt_unix)) -- GitLab From 17ce2fe4bbe27489bfac0b8fd68b8c6dbdb643cd Mon Sep 17 00:00:00 2001 From: Alain Mebsout Date: Tue, 18 Feb 2025 12:11:40 +0100 Subject: [PATCH 2/6] Build: update opam lock --- opam/virtual/octez-deps.opam.locked | 1 + 1 file changed, 1 insertion(+) diff --git a/opam/virtual/octez-deps.opam.locked b/opam/virtual/octez-deps.opam.locked index 74f780d1cc08..cc3bbfb83d8c 100644 --- a/opam/virtual/octez-deps.opam.locked +++ b/opam/virtual/octez-deps.opam.locked @@ -134,6 +134,7 @@ depends: [ "lwt-dllist" {= "1.0.1"} "lwt-exit" {= "1.0"} "lwt-watcher" {= "0.2"} + "lwt_log" {= "1.1.2"} "lwt_ppx" {= "5.8.0"} "lwt_react" {= "1.2.0"} "lwt_ssl" {= "1.2.0"} -- GitLab From 3be7f9ba63cdbdf13b40f43beef9a79402c663ee Mon Sep 17 00:00:00 2001 From: Alain Mebsout Date: Wed, 12 Feb 2025 17:54:06 +0100 Subject: [PATCH 3/6] Websocket: fix for new conduit version --- websocket/lwt/websocket_lwt_unix.ml | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/websocket/lwt/websocket_lwt_unix.ml 
b/websocket/lwt/websocket_lwt_unix.ml index 608509b219ae..63f7c30e7f4c 100644 --- a/websocket/lwt/websocket_lwt_unix.ml +++ b/websocket/lwt/websocket_lwt_unix.ml @@ -195,8 +195,13 @@ let establish_server ?read_buf ?write_buf ?timeout ?stop set_tcp_nodelay flow; Lwt.catch (fun () -> + let endp : Conduit.endp = + match Conduit_lwt_unix.endp_of_flow flow with + | #Conduit.endp as endp -> endp + | _ -> failwith "Unsupported tls tunnel" + in server_fun ?read_buf ?write_buf check_request - (Conduit_lwt_unix.endp_of_flow flow) + endp ic oc react) (function | End_of_file -> Lwt.return_unit -- GitLab From 931c623a5720eafbb791476e17ad5b4cce465abd Mon Sep 17 00:00:00 2001 From: Alain Mebsout Date: Tue, 18 Feb 2025 15:21:04 +0100 Subject: [PATCH 4/6] EVM node: websocket client for JSONRPC requests and subscriptions --- etherlink/bin_node/index.mld | 1 + etherlink/bin_node/lib_dev/client/dune | 19 ++ .../lib_dev/client/websocket_client.ml | 309 ++++++++++++++++++ .../lib_dev/client/websocket_client.mli | 77 +++++ manifest/product_etherlink.ml | 14 + src/lib_rpc_http/media_type.mli | 2 + 6 files changed, 422 insertions(+) create mode 100644 etherlink/bin_node/lib_dev/client/dune create mode 100644 etherlink/bin_node/lib_dev/client/websocket_client.ml create mode 100644 etherlink/bin_node/lib_dev/client/websocket_client.mli diff --git a/etherlink/bin_node/index.mld b/etherlink/bin_node/index.mld index 27ca51cd9eec..ac40407dade2 100644 --- a/etherlink/bin_node/index.mld +++ b/etherlink/bin_node/index.mld @@ -6,6 +6,7 @@ It contains the following libraries: - {{!module-Evm_node_config}Evm_node_config}: Configuration for the EVM node - {{!module-Evm_node_lib_dev}Evm_node_lib_dev}: An implementation of a subset of Ethereum JSON-RPC API for the EVM rollup [dev version] +- {{!module-Evm_node_lib_dev_client}Evm_node_lib_dev_client}: Client library for communicating with an EVM node - {{!module-Evm_node_lib_dev_encoding}Evm_node_lib_dev_encoding}: EVM encodings for the EVM node and 
plugin for the WASM Debugger [dev version] - {{!module-Evm_node_migrations}Evm_node_migrations}: SQL migrations for the EVM node store - {{!module-Evm_node_rust_deps}Evm_node_rust_deps}: WASM runtime foreign archive diff --git a/etherlink/bin_node/lib_dev/client/dune b/etherlink/bin_node/lib_dev/client/dune new file mode 100644 index 000000000000..366df9e2b8e5 --- /dev/null +++ b/etherlink/bin_node/lib_dev/client/dune @@ -0,0 +1,19 @@ +; This file was automatically generated, do not edit. +; Edit file manifest/main.ml instead. + +(library + (name evm_node_lib_dev_client) + (public_name octez-evm-node-libs.evm_node_lib_dev_client) + (instrumentation (backend bisect_ppx)) + (libraries + octez-libs.base + octez-evm-node-libs.websocket_lwt_unix + octez-evm-node-libs.evm_node_lib_dev_encoding + octez-evm-node-libs.evm_node_lib_dev + octez-libs.rpc-http) + (flags + (:standard) + -open Tezos_base.TzPervasives + -open Evm_node_lib_dev_encoding + -open Evm_node_lib_dev + -open Tezos_rpc_http)) diff --git a/etherlink/bin_node/lib_dev/client/websocket_client.ml b/etherlink/bin_node/lib_dev/client/websocket_client.ml new file mode 100644 index 000000000000..b3c565358d37 --- /dev/null +++ b/etherlink/bin_node/lib_dev/client/websocket_client.ml @@ -0,0 +1,309 @@ +(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* SPDX-FileCopyrightText: 2025 Functori *) +(* SPDX-FileCopyrightText: 2025 Nomadic Labs *) +(* *) +(*****************************************************************************) + +open Rpc_encodings + +type response = + | Response of JSONRPC.response + | Notification of Subscription.notification + +type t = { + media : Media_type.t; + conn : Websocket_lwt_unix.conn; + response_stream : response Lwt_stream.t; + mutable id : int; +} + +type 'a subscription = { + stream : 'a Lwt_stream.t; + unsubscribe : unit -> bool tzresult Lwt.t; +} + +type error += + | No_response of JSONRPC.request + | 
Request_failed of JSONRPC.request * JSONRPC.error + +let () = + register_error_kind + `Temporary + ~id:"websocket_client.no_response" + ~title:"No response for the JSONRPC request" + ~description:"No response for the JSONRPC request." + ~pp:(fun ppf r -> + Format.fprintf + ppf + "No response on websocket for request %a" + Data_encoding.Json.pp + (Data_encoding.Json.construct JSONRPC.request_encoding r)) + Data_encoding.(obj1 (req "request" JSONRPC.request_encoding)) + (function No_response r -> Some r | _ -> None) + (fun r -> No_response r) ; + register_error_kind + `Temporary + ~id:"websocket_client.request_failed" + ~title:"JSONRPC request failed" + ~description:"JSONRPC request failed." + ~pp:(fun ppf (r, e) -> + Format.fprintf + ppf + "JSONRPC request %a on websocket failed for %a" + Data_encoding.Json.pp + (Data_encoding.Json.construct JSONRPC.request_encoding r) + Data_encoding.Json.pp + (Data_encoding.Json.construct JSONRPC.error_encoding e)) + Data_encoding.( + obj2 + (req "request" JSONRPC.request_encoding) + (req "error" JSONRPC.error_encoding)) + (function Request_failed (r, e) -> Some (r, e) | _ -> None) + (fun (r, e) -> Request_failed (r, e)) + +let opcode_of_media media = + match Media_type.name media with + | "application/octet-stream" -> Websocket.Frame.Opcode.Binary + | _ -> Websocket.Frame.Opcode.Text + +let new_id client = + let id = client.id in + client.id <- client.id + 1 ; + JSONRPC.Id_float (float_of_int id) + +let handle_message (media : Media_type.t) message = + match media.destruct JSONRPC.response_encoding message with + | Ok resp -> Lwt.return_some (Response resp) + | Error _ -> ( + match media.destruct Subscription.notification_encoding message with + | Ok notif -> Lwt.return_some (Notification notif) + | Error _ -> + (* TODO: log *) + Lwt.return_none) + +let disconnect conn = + let open Lwt_syntax in + Lwt.catch + (fun () -> + (* TODO: log *) + let* () = Websocket_lwt_unix.write conn (Websocket.Frame.close 1000) in + let* () = 
Websocket_lwt_unix.close_transport conn in + (* TODO: log *) + Format.eprintf "Closed websocket connection@." ; + return_unit) + (fun e -> + (* TODO: log *) + Format.eprintf "Disconnection failed: %s@." (Printexc.to_string e) ; + return_unit) + +let connect media uri = + let open Lwt_syntax in + let* endp = Resolver_lwt.resolve_uri ~uri Resolver_lwt_unix.system in + let ctx = Lazy.force Conduit_lwt_unix.default_ctx in + let* client = Conduit_lwt_unix.endp_to_client ~ctx endp in + let accept_header = Tezos_rpc_http.Media_type.accept_header [media] in + let content_header = Tezos_rpc_http.Media_type.acceptable_encoding [media] in + let extra_headers = + Cohttp.Header.of_list + [("Accept", accept_header); ("Content-type", content_header)] + in + let* conn = Websocket_lwt_unix.connect ~ctx client uri ~extra_headers in + let message_buffer = Buffer.create 256 in + let frame_stream = + Lwt_stream.from (fun () -> + Lwt.catch + (fun () -> + let* frame = Websocket_lwt_unix.read conn in + match frame.opcode with + | Close -> + let* () = disconnect conn in + return_none + | _ -> return_some frame) + (fun e -> + (* TODO: Log eof *) + Format.eprintf "Read: %s@." (Printexc.to_string e) ; + let* () = disconnect conn in + return_none)) + in + let response_stream = + Lwt_stream.filter_map_s + (function + | {Websocket.Frame.opcode = Ping; content; _} -> + (* We must answer ping frames from the server with a pong frame + with the same content. *) + let* () = + Websocket_lwt_unix.write + conn + (Websocket.Frame.create ~opcode:Pong ~content ()) + in + return_none + | {opcode = Pong; _} -> return_none + | {opcode = Close; _} -> + (* Cannot happen because frame_stream is closed when we receive this + opcode. 
*) + return_none + | {opcode = Text | Binary; content; final = false; _} -> + (* New fragmented message *) + Buffer.clear message_buffer ; + Buffer.add_string message_buffer content ; + return_none + | {opcode = Continuation; content; final = false; _} -> + (* Non final fragment of message, we add the content to the + buffer *) + Buffer.add_string message_buffer content ; + return_none + | {opcode = Text | Binary; content; final = true; _} -> + (* Complete message in frame *) + handle_message media content + | {opcode = Continuation; content; final = true; _} -> + (* Final data frame of fragmented message, the complete message is + the buffer + new content *) + let message = Buffer.contents message_buffer ^ content in + handle_message media message + | {opcode = Ctrl _ | Nonctrl _; _} -> + (* Ignore other frames *) + return_none) + frame_stream + in + return {media; conn; response_stream; id = 0} + +let disconnect {conn; _} = disconnect conn + +let send_jsonrpc_request_helper client (request : JSONRPC.request) = + let open Lwt_result_syntax in + let message = client.media.construct JSONRPC.request_encoding request in + let opcode = opcode_of_media client.media in + let*! () = + Websocket_lwt_unix.write + client.conn + (Websocket.Frame.create ~opcode ~content:message ()) + in + let stream = Lwt_stream.clone client.response_stream in + let*! response = + Lwt_stream.find_map + (function + | Response {id; value} when id = request.id -> Some value | _ -> None) + stream + in + match response with + | None -> tzfail (No_response request) + | Some (Error e) -> tzfail (Request_failed (request, e)) + | Some (Ok resp) -> return resp + +type (_, _) call = + | Call : + (module METHOD with type input = 'input and type output = 'output) + * 'input + -> ('input, 'output) call + +let send_jsonrpc : + type input output. 
t -> (input, output) call -> output tzresult Lwt.t = + fun client (Call ((module M), input)) -> + let open Lwt_result_syntax in + let id = new_id client in + let request = + JSONRPC. + { + method_ = M.method_; + parameters = Some (Data_encoding.Json.construct M.input_encoding input); + id = Some id; + } + in + let+ response = send_jsonrpc_request_helper client request in + Data_encoding.Json.destruct M.output_encoding response + +let subscribe client (kind : Ethereum_types.Subscription.kind) = + let open Lwt_result_syntax in + let id = new_id client in + let request = + JSONRPC. + { + method_ = Subscribe.method_; + parameters = + Some (Data_encoding.Json.construct Subscribe.input_encoding kind); + id = Some id; + } + in + let message = client.media.construct JSONRPC.request_encoding request in + let opcode = opcode_of_media client.media in + let*! () = + Websocket_lwt_unix.write + client.conn + (Websocket.Frame.create ~opcode ~content:message ()) + in + let stream = Lwt_stream.clone client.response_stream in + let stop, stop_resolver = Lwt.task () in + let stream = + Lwt_stream.from (fun () -> Lwt.choose [stop; Lwt_stream.get stream]) + in + let*! 
subscription_response = + Lwt_stream.find_map + (function + | Response {id; value} when id = request.id -> ( + match value with + | Ok json -> + Some + (Ok + (Data_encoding.Json.destruct + Subscribe.output_encoding + json)) + | Error e -> Some (Error e)) + | _ -> None) + stream + in + let+ (subscription_id : Ethereum_types.Subscription.id) = + match subscription_response with + | None -> tzfail (No_response request) + | Some (Error e) -> tzfail (Request_failed (request, e)) + | Some (Ok id) -> return id + in + let unsubscribe () = + let* r = + send_jsonrpc client (Call ((module Unsubscribe), subscription_id)) + in + if Lwt.is_sleeping stop then Lwt.wakeup_later stop_resolver None ; + return r + in + let stream = + Lwt_stream.filter_map + (function + | Notification notif when notif.params.subscription = subscription_id -> + let event = + Data_encoding.Json.destruct + (Ethereum_types.Subscription.output_encoding + Transaction_object.encoding) + notif.params.result + in + Some event + | _ -> None) + stream + in + {stream; unsubscribe} + +let subscribe_filter client kind filter = + let open Lwt_result_syntax in + let+ {stream; unsubscribe} = subscribe client kind in + let stream = Lwt_stream.filter_map filter stream in + {stream; unsubscribe} + +let subscribe_newHeads client = + subscribe_filter client NewHeads @@ function + | Ethereum_types.Subscription.NewHeads h -> Some h + | _ -> None + +let subscribe_newPendingTransactions client = + subscribe_filter client NewPendingTransactions @@ function + | Ethereum_types.Subscription.NewPendingTransactions tx -> Some tx + | _ -> None + +let subscribe_syncing client = + subscribe_filter client Syncing @@ function + | Ethereum_types.Subscription.Syncing s -> Some s + | _ -> None + +let subscribe_logs ?address ?topics client = + subscribe_filter client (Logs {address; topics}) @@ function + | Ethereum_types.Subscription.Logs log -> Some log + | _ -> None diff --git a/etherlink/bin_node/lib_dev/client/websocket_client.mli 
b/etherlink/bin_node/lib_dev/client/websocket_client.mli new file mode 100644 index 000000000000..3feef4355fe3 --- /dev/null +++ b/etherlink/bin_node/lib_dev/client/websocket_client.mli @@ -0,0 +1,77 @@ +(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* SPDX-FileCopyrightText: 2025 Functori *) +(* SPDX-FileCopyrightText: 2025 Nomadic Labs *) +(* *) +(*****************************************************************************) + +(** Client to communicate with an EVM node over websockets *) + +open Rpc_encodings + +(** Type of client connected to a websocket server. *) +type t + +type error += + | No_response of JSONRPC.request + | Request_failed of JSONRPC.request * JSONRPC.error + +(** Subscriptions returned by [subscribe]. *) +type 'a subscription = { + stream : 'a Lwt_stream.t; (** The stream of events. *) + unsubscribe : unit -> bool tzresult Lwt.t; + (** A function to unsubscribe from events notifications. *) +} + +(** Wrapper type for calling a JSONRPC method with an input *) +type (_, _) call = + | Call : + (module METHOD with type input = 'input and type output = 'output) + * 'input + -> ('input, 'output) call + +(** [connect media uri] connects to an EVM node websocket server on [uri], + communication is either JSON or binary depending on [media]. *) +val connect : Media_type.t -> Uri.t -> t Lwt.t + +(** Disconnect the websocket client by sending a close frame and closing the + connection. *) +val disconnect : t -> unit Lwt.t + +(** [send_jsonrpc client (Call ((module Method), input))] makes a JSONRPC + request with the provided [Method] and [input] to the websocket [client]. It + returns the corresponding response. *) +val send_jsonrpc : t -> ('input, 'output) call -> 'output tzresult Lwt.t + +(** [subscribe client kind] creates a subscription of [kind] with the + websocket [client]. It returns a stream with the notifications and a + function to unsubscribe. 
*) +val subscribe : + t -> + Subscribe.input -> + Transaction_object.t Ethereum_types.Subscription.output subscription tzresult + Lwt.t + +(** [subscribe_newHeads client] is like [subscribe] but specialized for + newHeads events. *) +val subscribe_newHeads : + t -> Transaction_object.t Ethereum_types.block subscription tzresult Lwt.t + +(** [subscribe_newPendingTransactions client] is like [subscribe] but + specialized for newPendingTransactions events. *) +val subscribe_newPendingTransactions : + t -> Ethereum_types.hash subscription tzresult Lwt.t + +(** [subscribe_syncing client] is like [subscribe] but specialized for syncing + events. *) +val subscribe_syncing : + t -> Ethereum_types.Subscription.sync_output subscription tzresult Lwt.t + +(** [subscribe_logs ?address ?topics client] is like [subscribe] but + specialized for logs events filtered by [address] and/or [topics]. *) +val subscribe_logs : + ?address:Ethereum_types.Filter.filter_address -> + ?topics:Ethereum_types.Filter.topic option list -> + t -> + Ethereum_types.transaction_log subscription tzresult Lwt.t diff --git a/manifest/product_etherlink.ml b/manifest/product_etherlink.ml index 4f805a040aa5..72a0436046b8 100644 --- a/manifest/product_etherlink.ml +++ b/manifest/product_etherlink.ml @@ -299,6 +299,20 @@ let evm_node_lib_dev = performance_metrics; ] +let _evm_node_lib_dev_client = + octez_evm_node_lib + "evm_node_lib_dev_client" + ~path:"etherlink/bin_node/lib_dev/client" + ~synopsis:"Client library for communicating with an EVM node" + ~deps: + [ + octez_base |> open_ ~m:"TzPervasives"; + websocket_lwt_unix; + evm_node_lib_dev_encoding |> open_; + evm_node_lib_dev |> open_; + octez_rpc_http |> open_; + ] + let _octez_evm_node_tests = tests [ diff --git a/src/lib_rpc_http/media_type.mli b/src/lib_rpc_http/media_type.mli index 45c4c8045c8a..b2432ceb0943 100644 --- a/src/lib_rpc_http/media_type.mli +++ b/src/lib_rpc_http/media_type.mli @@ -44,6 +44,8 @@ val all_media_types : t list val 
accept_header : t list -> string +val acceptable_encoding : t list -> string + val first_complete_media : t list -> ((string * string) * t) option val encoding : t Data_encoding.t -- GitLab From c80fa8937283f9a725ace13523e1f00710cb22d8 Mon Sep 17 00:00:00 2001 From: Alain Mebsout Date: Thu, 13 Feb 2025 09:49:18 +0100 Subject: [PATCH 5/6] Outbox monitor: test websocket client by subscribing to new heads --- etherlink/bin_outbox_monitor/dune | 10 ++++--- etherlink/bin_outbox_monitor/main.ml | 40 ++++++++++++++++++++++++++-- manifest/product_etherlink.ml | 5 +++- opam/etherlink-outbox-monitor.opam | 2 +- 4 files changed, 50 insertions(+), 7 deletions(-) diff --git a/etherlink/bin_outbox_monitor/dune b/etherlink/bin_outbox_monitor/dune index 660f873848a8..9f6332f6fe18 100644 --- a/etherlink/bin_outbox_monitor/dune +++ b/etherlink/bin_outbox_monitor/dune @@ -8,6 +8,7 @@ (instrumentation (backend bisect_ppx)) (libraries bls12-381.archive + octez-evm-node-libs.evm_node_rust_deps octez-libs.base octez-libs.base.unix octez-version.value @@ -17,16 +18,19 @@ caqti-lwt re octez-l2-libs.sqlite - octez-evm-node-libs.evm_node_lib_dev_encoding) + octez-evm-node-libs.evm_node_lib_dev_encoding + octez-evm-node-libs.evm_node_lib_dev_client) (link_flags (:standard) - (:include %{workspace_root}/static-link-flags.sexp)) + (:include %{workspace_root}/static-link-flags.sexp) + (:include %{workspace_root}/macos-link-flags.sexp)) (flags (:standard) -open Tezos_base.TzPervasives -open Tezos_rpc_http -open Octez_sqlite - -open Evm_node_lib_dev_encoding)) + -open Evm_node_lib_dev_encoding + -open Evm_node_lib_dev_client)) (rule (target migrations.ml) diff --git a/etherlink/bin_outbox_monitor/main.ml b/etherlink/bin_outbox_monitor/main.ml index 862fafa8dbd9..d6b7511d7a65 100644 --- a/etherlink/bin_outbox_monitor/main.ml +++ b/etherlink/bin_outbox_monitor/main.ml @@ -12,6 +12,11 @@ let default_data_dir = Filename.concat (Sys.getenv "HOME") ".outbox-monitor" let default_global = {data_dir = 
default_data_dir; verbosity = Notice} +module Parameter = struct + let endpoint = + Tezos_clic.parameter (fun _ uri -> Lwt.return_ok (Uri.of_string uri)) +end + module Arg = struct let verbose = Tezos_clic.switch @@ -23,6 +28,20 @@ module Arg = struct let debug = Tezos_clic.switch ~long:"debug" ~doc:"Sets logging level to debug" () + let endpoint ~default ~long ~doc = + Tezos_clic.default_arg + ~default + ~long + ~doc + ~placeholder:"URL" + Parameter.endpoint + + let evm_node_endpoint = + endpoint + ~default:"http://127.0.0.1:8545/ws" + ~long:"evm-node" + ~doc:"Websocket endpoint to reach EVM node" + let data_dir = Tezos_clic.arg ~long:"data-dir" @@ -77,12 +96,29 @@ let run_command = let open Tezos_clic in om_command ~desc:"Start monitoring outbox" - no_options + (args1 Arg.evm_node_endpoint) (prefixes ["run"] @@ stop) - (fun {data_dir; verbosity} () _ -> + (fun {data_dir; verbosity} evm_node_endpoint _ -> let open Lwt_result_syntax in let*! () = log_config ~verbosity () in let* _db = Db.init ~data_dir `Read_write in + let*! ws_client = + Websocket_client.connect Media_type.json evm_node_endpoint + in + let* heads_subscription = Websocket_client.subscribe_newHeads ws_client in + let cpt = ref 0 in + let*! () = + Lwt_stream.iter_s + (fun (head : _ Ethereum_types.block) -> + incr cpt ; + Format.eprintf "Block %a@." Ethereum_types.pp_quantity head.number ; + if !cpt = 10 then + let*! 
_ = heads_subscription.unsubscribe () in + Lwt.return_unit + else Lwt.return_unit) + heads_subscription.stream + in + let*! () = Websocket_client.disconnect ws_client in return_unit) let commands = [run_command] diff --git a/manifest/product_etherlink.ml b/manifest/product_etherlink.ml index 72a0436046b8..90a024acaa32 100644 --- a/manifest/product_etherlink.ml +++ b/manifest/product_etherlink.ml @@ -299,7 +299,7 @@ let evm_node_lib_dev = performance_metrics; ] -let _evm_node_lib_dev_client = +let evm_node_lib_dev_client = octez_evm_node_lib "evm_node_lib_dev_client" ~path:"etherlink/bin_node/lib_dev/client" @@ -499,11 +499,13 @@ let _outbox_monitor = ~path:"etherlink/bin_outbox_monitor" ~opam:"etherlink-outbox-monitor" ~release_status:Unreleased + ~with_macos_security_framework:true ~synopsis: "A binary to monitor withdrawals in the outbox and their execution" ~deps: [ bls12_381_archive; + evm_node_rust_deps; octez_base |> open_ ~m:"TzPervasives"; octez_base_unix; octez_version_value; @@ -515,6 +517,7 @@ let _outbox_monitor = re; octez_sqlite |> open_; evm_node_lib_dev_encoding |> open_; + evm_node_lib_dev_client |> open_; ] ~dune: Dune. 
diff --git a/opam/etherlink-outbox-monitor.opam b/opam/etherlink-outbox-monitor.opam index 4024e01ded83..8ec94cc6c342 100644 --- a/opam/etherlink-outbox-monitor.opam +++ b/opam/etherlink-outbox-monitor.opam @@ -11,13 +11,13 @@ depends: [ "dune" { >= "3.11.1" } "ocaml" { >= "4.14" } "bls12-381" + "octez-evm-node-libs" { = version } "octez-libs" "octez-version" "caqti-lwt" { >= "2.0.1" } "crunch" { >= "3.3.0" } "re" { >= "1.10.0" } "octez-l2-libs" - "octez-evm-node-libs" { = version } ] build: [ ["rm" "-r" "vendors" "contrib"] -- GitLab From 4686d2a6200ab63a39c79a4fc609a116f0324857 Mon Sep 17 00:00:00 2001 From: Alain Mebsout Date: Thu, 13 Feb 2025 14:36:07 +0100 Subject: [PATCH 6/6] Outbox monitor: events for websocket client --- .../lib_dev/client/websocket_client.ml | 127 +++++++++++++++--- etherlink/bin_node/lib_dev/encodings/dune | 1 + .../lib_dev/encodings/websocket_encodings.ml | 76 +++++++++++ .../lib_dev/encodings/websocket_encodings.mli | 9 ++ etherlink/bin_node/lib_dev/evm_websocket.ml | 74 +--------- manifest/product_etherlink.ml | 1 + 6 files changed, 199 insertions(+), 89 deletions(-) create mode 100644 etherlink/bin_node/lib_dev/encodings/websocket_encodings.ml create mode 100644 etherlink/bin_node/lib_dev/encodings/websocket_encodings.mli diff --git a/etherlink/bin_node/lib_dev/client/websocket_client.ml b/etherlink/bin_node/lib_dev/client/websocket_client.ml index b3c565358d37..b41eba09d349 100644 --- a/etherlink/bin_node/lib_dev/client/websocket_client.ml +++ b/etherlink/bin_node/lib_dev/client/websocket_client.ml @@ -16,6 +16,7 @@ type t = { media : Media_type.t; conn : Websocket_lwt_unix.conn; response_stream : response Lwt_stream.t; + binary : bool; mutable id : int; } @@ -24,6 +25,81 @@ type 'a subscription = { unsubscribe : unit -> bool tzresult Lwt.t; } +module Event = struct + include Internal_event.Simple + + let section = ["evm"; "client"; "websocket"] + + let exn = + let open Data_encoding in + conv Printexc.to_string (fun s -> Failure s) 
string + + let pp_exn fmt e = Format.pp_print_string fmt (Printexc.to_string e) + + let disconnecting = + declare_0 + ~section + ~name:"websocket_client_disconnecting" + ~msg:"disconnecting websocket client" + ~level:Debug + () + + let disconnected = + declare_0 + ~section + ~name:"websocket_client_disconnected" + ~msg:"websocket client disconnected" + ~level:Debug + () + + let disconnection_error = + declare_1 + ~section + ~name:"websocket_client_disconnection_error" + ~msg:"websocket client disconnection error {exn}" + ~level:Debug + ("exn", exn) + ~pp1:pp_exn + + let decoding_error = + declare_2 + ~section + ~name:"websocket_client_decoding_error" + ~msg:"websocket client decoding error {error1}, {error2}" + ~level:Error + ("error1", Data_encoding.string) + ("error2", Data_encoding.string) + ~pp1:Format.pp_print_string + ~pp2:Format.pp_print_string + + let connection_closed = + declare_1 + ~section + ~name:"websocket_client_connection_closed" + ~msg:"websocket client connection was closed because {exn}" + ~level:Notice + ("exn", exn) + ~pp1:pp_exn + + let received_frame = + declare_1 + ~section + ~name:"websocket_client_received_frame" + ~msg:"websocket client received {frame}" + ~level:Debug + ("frame", Websocket_encodings.frame_encoding) + ~pp1:Websocket.Frame.pp + + let send_frame = + declare_1 + ~section + ~name:"websocket_client_send_frame" + ~msg:"websocket client sending {frame}" + ~level:Debug + ("frame", Websocket_encodings.frame_encoding) + ~pp1:Websocket.Frame.pp +end + type error += | No_response of JSONRPC.request | Request_failed of JSONRPC.request * JSONRPC.error @@ -63,10 +139,25 @@ let () = (function Request_failed (r, e) -> Some (r, e) | _ -> None) (fun (r, e) -> Request_failed (r, e)) -let opcode_of_media media = +module Websocket_lwt_unix = struct + include Websocket_lwt_unix + + let write conn fr = + let open Lwt_syntax in + let* () = Event.(emit send_frame) fr in + Websocket_lwt_unix.write conn fr + + let read conn = + let open Lwt_syntax 
in + let* fr = Websocket_lwt_unix.read conn in + let* () = Event.(emit received_frame) fr in + return fr +end + +let is_binary media = match Media_type.name media with - | "application/octet-stream" -> Websocket.Frame.Opcode.Binary - | _ -> Websocket.Frame.Opcode.Text + | "application/octet-stream" -> true + | _ -> false let new_id client = let id = client.id in @@ -74,28 +165,27 @@ let new_id client = JSONRPC.Id_float (float_of_int id) let handle_message (media : Media_type.t) message = + let open Lwt_syntax in match media.destruct JSONRPC.response_encoding message with - | Ok resp -> Lwt.return_some (Response resp) - | Error _ -> ( + | Ok resp -> return_some (Response resp) + | Error e1 -> ( match media.destruct Subscription.notification_encoding message with - | Ok notif -> Lwt.return_some (Notification notif) - | Error _ -> - (* TODO: log *) - Lwt.return_none) + | Ok notif -> return_some (Notification notif) + | Error e2 -> + let* () = Event.(emit decoding_error) (e1, e2) in + return_none) let disconnect conn = let open Lwt_syntax in Lwt.catch (fun () -> - (* TODO: log *) + let* () = Event.(emit disconnecting) () in let* () = Websocket_lwt_unix.write conn (Websocket.Frame.close 1000) in let* () = Websocket_lwt_unix.close_transport conn in - (* TODO: log *) - Format.eprintf "Closed websocket connection@." ; + let* () = Event.(emit disconnected) () in return_unit) (fun e -> - (* TODO: log *) - Format.eprintf "Disconnection failed: %s@." (Printexc.to_string e) ; + let* () = Event.(emit disconnection_error) e in return_unit) let connect media uri = @@ -122,8 +212,7 @@ let connect media uri = return_none | _ -> return_some frame) (fun e -> - (* TODO: Log eof *) - Format.eprintf "Read: %s@." 
(Printexc.to_string e) ; + let* () = Event.(emit connection_closed) e in let* () = disconnect conn in return_none)) in @@ -167,14 +256,14 @@ let connect media uri = return_none) frame_stream in - return {media; conn; response_stream; id = 0} + return {media; conn; response_stream; id = 0; binary = is_binary media} let disconnect {conn; _} = disconnect conn let send_jsonrpc_request_helper client (request : JSONRPC.request) = let open Lwt_result_syntax in let message = client.media.construct JSONRPC.request_encoding request in - let opcode = opcode_of_media client.media in + let opcode = if client.binary then Websocket.Frame.Opcode.Binary else Text in let*! () = Websocket_lwt_unix.write client.conn @@ -227,7 +316,7 @@ let subscribe client (kind : Ethereum_types.Subscription.kind) = } in let message = client.media.construct JSONRPC.request_encoding request in - let opcode = opcode_of_media client.media in + let opcode = if client.binary then Websocket.Frame.Opcode.Binary else Text in let*! 
() = Websocket_lwt_unix.write client.conn diff --git a/etherlink/bin_node/lib_dev/encodings/dune b/etherlink/bin_node/lib_dev/encodings/dune index bb3394bb1b43..1fda77e6749d 100644 --- a/etherlink/bin_node/lib_dev/encodings/dune +++ b/etherlink/bin_node/lib_dev/encodings/dune @@ -8,6 +8,7 @@ (libraries octez-libs.base octez-smart-rollup-wasm-debugger-plugin + octez-evm-node-libs.websocket re uuidm) (flags diff --git a/etherlink/bin_node/lib_dev/encodings/websocket_encodings.ml b/etherlink/bin_node/lib_dev/encodings/websocket_encodings.ml new file mode 100644 index 000000000000..4ed0b30df379 --- /dev/null +++ b/etherlink/bin_node/lib_dev/encodings/websocket_encodings.ml @@ -0,0 +1,76 @@ +(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* SPDX-FileCopyrightText: 2025 Functori *) +(* SPDX-FileCopyrightText: 2025 Nomadic Labs *) +(* *) +(*****************************************************************************) + +let opcode_encoding : Websocket.Frame.Opcode.t Data_encoding.t = + let open Data_encoding in + let open Websocket.Frame.Opcode in + union + [ + case + (Tag 0) + ~title:"continuation" + (constant "continuation") + (function Continuation -> Some () | _ -> None) + (fun () -> Continuation); + case + (Tag 1) + ~title:"text" + (constant "text") + (function Text -> Some () | _ -> None) + (fun () -> Text); + case + (Tag 2) + ~title:"binary" + (constant "binary") + (function Binary -> Some () | _ -> None) + (fun () -> Binary); + case + (Tag 8) + ~title:"close" + (constant "close") + (function Close -> Some () | _ -> None) + (fun () -> Close); + case + (Tag 9) + ~title:"ping" + (constant "ping") + (function Ping -> Some () | _ -> None) + (fun () -> Ping); + case + (Tag 10) + ~title:"pong" + (constant "pong") + (function Pong -> Some () | _ -> None) + (fun () -> Pong); + case + (Tag 15) + ~title:"ctrl" + (obj1 (req "ctrl" int31)) + (function Ctrl i -> Some i | _ -> None) + (fun i -> Ctrl i); 
+ case + (Tag 255) + ~title:"nonctrl" + (obj1 (req "nonctrl" int31)) + (function Nonctrl i -> Some i | _ -> None) + (fun i -> Nonctrl i); + ] + +let frame_encoding : Websocket.Frame.t Data_encoding.t = + let open Data_encoding in + let open Websocket.Frame in + conv + (fun {opcode; extension; final; content} -> + (opcode, extension, final, content)) + (fun (opcode, extension, final, content) -> + {opcode; extension; final; content}) + @@ obj4 + (req "opcode" opcode_encoding) + (req "extension" int31) + (req "final" bool) + (req "content" string) diff --git a/etherlink/bin_node/lib_dev/encodings/websocket_encodings.mli b/etherlink/bin_node/lib_dev/encodings/websocket_encodings.mli new file mode 100644 index 000000000000..3fde7f785143 --- /dev/null +++ b/etherlink/bin_node/lib_dev/encodings/websocket_encodings.mli @@ -0,0 +1,9 @@ +(*****************************************************************************) +(* *) +(* SPDX-License-Identifier: MIT *) +(* SPDX-FileCopyrightText: 2025 Functori *) +(* SPDX-FileCopyrightText: 2025 Nomadic Labs *) +(* *) +(*****************************************************************************) + +val frame_encoding : Websocket.Frame.t Data_encoding.t diff --git a/etherlink/bin_node/lib_dev/evm_websocket.ml b/etherlink/bin_node/lib_dev/evm_websocket.ml index 467c23769928..41c91158d923 100644 --- a/etherlink/bin_node/lib_dev/evm_websocket.ml +++ b/etherlink/bin_node/lib_dev/evm_websocket.ml @@ -75,78 +75,12 @@ module Request = struct let view (req : _ t) = View req - let opcode_encoding : Websocket.Frame.Opcode.t Data_encoding.t = - let open Data_encoding in - let open Websocket.Frame.Opcode in - union - [ - case - (Tag 0) - ~title:"continuation" - (constant "continuation") - (function Continuation -> Some () | _ -> None) - (fun () -> Continuation); - case - (Tag 1) - ~title:"text" - (constant "text") - (function Text -> Some () | _ -> None) - (fun () -> Text); - case - (Tag 2) - ~title:"binary" - (constant "binary") - (function 
Binary -> Some () | _ -> None) - (fun () -> Binary); - case - (Tag 8) - ~title:"close" - (constant "close") - (function Close -> Some () | _ -> None) - (fun () -> Close); - case - (Tag 9) - ~title:"ping" - (constant "ping") - (function Ping -> Some () | _ -> None) - (fun () -> Ping); - case - (Tag 10) - ~title:"pong" - (constant "pong") - (function Pong -> Some () | _ -> None) - (fun () -> Pong); - case - (Tag 15) - ~title:"ctrl" - (obj1 (req "ctrl" int31)) - (function Ctrl i -> Some i | _ -> None) - (fun i -> Ctrl i); - case - (Tag 255) - ~title:"nonctrl" - (obj1 (req "nonctrl" int31)) - (function Nonctrl i -> Some i | _ -> None) - (fun i -> Nonctrl i); - ] - - let frame_encoding : Websocket.Frame.t Data_encoding.t = - let open Data_encoding in - let open Websocket.Frame in - conv - (fun {opcode; extension; final; content} -> - (opcode, extension, final, content)) - (fun (opcode, extension, final, content) -> - {opcode; extension; final; content}) - @@ obj4 - (req "opcode" opcode_encoding) - (req "extension" int31) - (req "final" bool) - (req "content" string) - let encoding = let open Data_encoding in - conv (fun (View (Frame r)) -> r) (fun r -> View (Frame r)) frame_encoding + conv + (fun (View (Frame r)) -> r) + (fun r -> View (Frame r)) + Websocket_encodings.frame_encoding let pp ppf (View (Frame r)) = Websocket.Frame.pp ppf r end diff --git a/manifest/product_etherlink.ml b/manifest/product_etherlink.ml index 90a024acaa32..b72c6d10a6ac 100644 --- a/manifest/product_etherlink.ml +++ b/manifest/product_etherlink.ml @@ -237,6 +237,7 @@ let evm_node_lib_dev_encoding = [ octez_base |> open_ ~m:"TzPervasives"; octez_scoru_wasm_debugger_plugin; + websocket; re; uuidm; ] -- GitLab