From fccfdacf8b23dfe835b6c7f42cdf61d2156acb9f Mon Sep 17 00:00:00 2001 From: Alain Mebsout Date: Fri, 20 Dec 2024 23:56:58 +0100 Subject: [PATCH 1/8] Evm node/websocket: limit size of received messages The websocket connection is closed as soon as the message we are receiving exceeds 4MB. This protects against attacks where clients never send a final frame or send too much data on the websocket. --- etherlink/bin_node/lib_dev/evm_websocket.ml | 40 ++++++++++++++++----- 1 file changed, 32 insertions(+), 8 deletions(-) diff --git a/etherlink/bin_node/lib_dev/evm_websocket.ml b/etherlink/bin_node/lib_dev/evm_websocket.ml index bb3389a0eb2f..fa231d6714b1 100644 --- a/etherlink/bin_node/lib_dev/evm_websocket.ml +++ b/etherlink/bin_node/lib_dev/evm_websocket.ml @@ -8,6 +8,11 @@ open Rpc_encodings +(* Maximum message size accepted by the websocket server, hardcoded to 4MB. + This should be enough for messages we expect to receive in the ethereum + JSONRPC protocol. *) +let max_message_length = 4096 * 1024 + type parameters = { push_frame : Websocket.Frame.t option -> unit; http_request : Cohttp.Request.t; @@ -16,11 +21,18 @@ type parameters = { handler : websocket_handler; } +type close_status = Normal_closure | Message_too_big + +(* https://datatracker.ietf.org/doc/html/rfc6455#section-7.4.1 *) +let code_of_close_status = function + | Normal_closure -> 1000 + | Message_too_big -> 1009 + module Types = struct type subscriptions_table = (Ethereum_types.Subscription.id, unit -> unit) Stdlib.Hashtbl.t - type close_info = {reason : string; code : int} + type close_info = {reason : string; status : close_status} type state = { push_frame : Websocket.Frame.t option -> unit; @@ -216,20 +228,20 @@ type worker = Worker.infinite Worker.queue Worker.t let table = Worker.create_table Queue -let default_close_info = {Types.reason = ""; code = 1000} +let default_close_info = {Types.reason = ""; status = Normal_closure} -let shutdown_worker ~reason ?(code = default_close_info.code) w = +let shutdown_worker ~reason ?(status = default_close_info.status) w = let st = Worker.state w in (match st.close_info with | Some _ -> () - | None -> st.close_info <- Some {code; reason}) ; + | None -> st.close_info <- Some {status; reason}) ; Worker.shutdown w -let shutdown ~reason ?code conn = +let shutdown ~reason ?status conn = let open Lwt_syntax in match Worker.find_opt table conn with | None -> return_unit - | Some w -> shutdown_worker ~reason ?code w + | Some w -> shutdown_worker ~reason ?status w let handle_subscription {Types.push_frame; conn_descr; output_media_type; subscriptions; _} opcode @@ -330,6 +342,18 @@ let on_frame worker fr = (* Client has sent a close frame, we shut everything down for this worker *) shutdown_worker ~reason:"Received close frame" worker + | {opcode = Text | Binary; content; _} + when String.length content > max_message_length -> + (* We are receiving a message too big for the server *) + shutdown_worker ~reason:"Message too big" ~status:Message_too_big worker + | {opcode = Continuation; content; _} + when Buffer.length state.message_buffer + String.length content + > max_message_length -> + (* We are receiving a message too big for the server *) + shutdown_worker + ~reason:"Fragmented message too big" + ~status:Message_too_big + worker | {opcode = Text | Binary; content; final = false; _} -> (* New fragmented message *) Buffer.clear state.message_buffer ; @@ -428,13 +452,13 @@ module Handlers = struct Worker.state w in let nb_sub = Stdlib.Hashtbl.length subscriptions in - let {Types.reason; code} = + let {Types.reason; status} = Option.value close_info ~default:default_close_info in let* () = Event.(emit shutdown) (conn_descr, reason, nb_sub) in let () = try - push_frame (Some (Websocket.Frame.close code)) ; + push_frame (Some (Websocket.Frame.close (code_of_close_status status))) ; push_frame None with _ -> (* Websocket already closed *) () in -- GitLab From adf56cb4c8fd927c542ddbcc1389c99f26562e98 Mon Sep 17 00:00:00 2001 From: Alain Mebsout Date: Sun, 22 Dec 2024 12:45:14 +0100 Subject: [PATCH 2/8] Ocaml-websocket: allow to provide a maximum frame length --- websocket/core/websocket.ml | 16 +++++++++++----- websocket/core/websocket.mli | 2 ++ websocket/lwt/websocket_cohttp_lwt.ml | 8 ++++---- websocket/lwt/websocket_cohttp_lwt.mli | 1 + websocket/lwt/websocket_lwt_unix.ml | 3 ++- websocket/lwt/websocket_lwt_unix.mli | 1 + 6 files changed, 21 insertions(+), 10 deletions(-) diff --git a/websocket/core/websocket.ml b/websocket/core/websocket.ml index 86ec1311f32a..222e577b022f 100644 --- a/websocket/core/websocket.ml +++ b/websocket/core/websocket.ml @@ -155,6 +155,7 @@ module type S = sig type mode = Client of (int -> string) | Server val make_read_frame : + ?max_len:int -> ?buf:Buffer.t -> mode:mode -> IO.ic -> IO.oc -> unit -> Frame.t IO.t val write_frame_to_buf : mode:mode -> Buffer.t -> Frame.t -> unit @@ -177,6 +178,7 @@ module type S = sig type t val create : + ?max_len:int -> ?read_buf:Buffer.t -> ?write_buf:Buffer.t -> Cohttp.Request.t -> @@ -265,7 +267,7 @@ module Make (IO : Cohttp.S.IO) = struct write_frame_to_buf ~mode buf @@ Frame.close code; write oc @@ Buffer.contents buf - let read_frame ic oc buf mode hdr = + let read_frame ?max_len ic oc buf mode hdr = let hdr_part1 = EndianString.BigEndian.get_int8 hdr 0 in let hdr_part2 = EndianString.BigEndian.get_int8 hdr 1 in let final = is_bit_set 7 hdr_part1 in @@ -291,6 +293,10 @@ module Make (IO : Cohttp.S.IO) = struct else if extension <> 0 then close_with_code mode buf oc 1002 >>= fun () -> proto_error "unsupported extension" + else if (match max_len with Some max -> payload_len > max | None -> false) + then + close_with_code mode buf oc 1009 >>= fun () -> + proto_error "frame payload too big" else if Frame.Opcode.is_ctrl opcode && payload_len > 125 then close_with_code mode buf oc 1002 >>= fun () -> proto_error "control frame too big" @@ -315,11 +321,11 @@ module Make (IO : Cohttp.S.IO) = struct let frame = Frame.of_bytes ~opcode ~extension ~final payload in return frame) - let make_read_frame ?(buf = Buffer.create 128) ~mode ic oc () = + let make_read_frame ?max_len ?(buf = Buffer.create 128) ~mode ic oc () = Buffer.clear buf; read_exactly ic 2 buf >>= function | None -> raise End_of_file - | Some hdr -> read_frame ic oc buf mode hdr + | Some hdr -> read_frame ?max_len ic oc buf mode hdr module Request = Cohttp.Request.Make (IO) module Response = Cohttp.Response.Make (IO) @@ -337,9 +343,9 @@ module Make (IO : Cohttp.S.IO) = struct let source { endp; _ } = endp - let create ?read_buf ?(write_buf = Buffer.create 128) http_request endp ic + let create ?max_len ?read_buf ?(write_buf = Buffer.create 128) http_request endp ic oc = - let read_frame = make_read_frame ?buf:read_buf ~mode:Server ic oc in + let read_frame = make_read_frame ?max_len ?buf:read_buf ~mode:Server ic oc in { buffer = write_buf; endp; diff --git a/websocket/core/websocket.mli b/websocket/core/websocket.mli index 04f1689646bc..359761b34fcb 100644 --- a/websocket/core/websocket.mli +++ b/websocket/core/websocket.mli @@ -84,6 +84,7 @@ module type S = sig type mode = Client of (int -> string) | Server val make_read_frame : + ?max_len:int -> ?buf:Buffer.t -> mode:mode -> IO.ic -> IO.oc -> unit -> Frame.t IO.t val write_frame_to_buf : mode:mode -> Buffer.t -> Frame.t -> unit @@ -106,6 +107,7 @@ module type S = sig type t val create : + ?max_len:int -> ?read_buf:Buffer.t -> ?write_buf:Buffer.t -> Cohttp.Request.t -> diff --git a/websocket/lwt/websocket_cohttp_lwt.ml b/websocket/lwt/websocket_cohttp_lwt.ml index f3b5b9ca03b8..21376b943616 100644 --- a/websocket/lwt/websocket_cohttp_lwt.ml +++ b/websocket/lwt/websocket_cohttp_lwt.ml @@ -29,12 +29,12 @@ let send_frames stream oc = in Lwt_stream.iter_s send_frame stream -let read_frames ic oc handler_fn = - let read_frame = Lwt_IO.make_read_frame ~mode:Server ic oc in +let read_frames ?max_len ic oc handler_fn = + let read_frame = Lwt_IO.make_read_frame ?max_len ~mode:Server ic oc in let rec inner () = read_frame () >>= Lwt.wrap1 handler_fn >>= inner in inner () -let upgrade_connection request incoming_handler io_error_handler = +let upgrade_connection ?max_frame_length request incoming_handler io_error_handler = let headers = Cohttp.Request.headers request in (match Cohttp.Header.get headers "sec-websocket-key" with | None -> @@ -61,7 +61,7 @@ let upgrade_connection request incoming_handler io_error_handler = [ (* input: data from the client is read from the input channel * of the tcp connection; pass it to handler function *) - read_frames ic oc incoming_handler; + read_frames ?max_len:max_frame_length ic oc incoming_handler; (* output: data for the client is written to the output * channel of the tcp connection *) send_frames frames_out_stream oc; diff --git a/websocket/lwt/websocket_cohttp_lwt.mli b/websocket/lwt/websocket_cohttp_lwt.mli index af333b7a48a6..22f3f3233995 100644 --- a/websocket/lwt/websocket_cohttp_lwt.mli +++ b/websocket/lwt/websocket_cohttp_lwt.mli @@ -19,6 +19,7 @@ open Websocket val upgrade_connection : + ?max_frame_length:int -> Cohttp.Request.t -> (Frame.t -> unit) -> (exn -> unit Lwt.t) -> diff --git a/websocket/lwt/websocket_lwt_unix.ml b/websocket/lwt/websocket_lwt_unix.ml index 4080aa7b7bde..608509b219ae 100644 --- a/websocket/lwt/websocket_lwt_unix.ml +++ b/websocket/lwt/websocket_lwt_unix.ml @@ -101,11 +101,12 @@ let write { write_frame; _ } frame = write_frame frame let close_transport { oc; _ } = Lwt_io.close oc let connect ?(extra_headers = Cohttp.Header.init ()) + ?max_frame_length ?(random_string = Websocket.Rng.init ()) ?(ctx = Lazy.force Conduit_lwt_unix.default_ctx) ?buf client url = let nonce = Base64.encode_exn (random_string 16) in connect ctx client url nonce extra_headers >|= fun (ic, oc) -> - let read_frame = make_read_frame ?buf ~mode:(Client random_string) ic oc in + let read_frame = make_read_frame ?max_len:max_frame_length ?buf ~mode:(Client random_string) ic oc in let read_frame () = Lwt.catch read_frame (fun exn -> Lwt.async (fun () -> Lwt_io.close ic); diff --git a/websocket/lwt/websocket_lwt_unix.mli b/websocket/lwt/websocket_lwt_unix.mli index 2ad273690240..53a342e2bd81 100644 --- a/websocket/lwt/websocket_lwt_unix.mli +++ b/websocket/lwt/websocket_lwt_unix.mli @@ -39,6 +39,7 @@ val close_transport : conn -> unit Lwt.t val connect : ?extra_headers:Cohttp.Header.t -> + ?max_frame_length:int -> ?random_string:(int -> string) -> ?ctx:Conduit_lwt_unix.ctx -> ?buf:Buffer.t -> -- GitLab From 573371b68654170475037fc1c6df12ac673e471c Mon Sep 17 00:00:00 2001 From: Alain Mebsout Date: Sun, 22 Dec 2024 12:48:56 +0100 Subject: [PATCH 3/8] Ocaml-websocket: save patch for maximum frame length --- ...allow-to-provide-a-maximum-frame-len.patch | 173 ++++++++++++++++++ 1 file changed, 173 insertions(+) create mode 100644 websocket/patches/0002-Ocaml-websocket-allow-to-provide-a-maximum-frame-len.patch diff --git a/websocket/patches/0002-Ocaml-websocket-allow-to-provide-a-maximum-frame-len.patch b/websocket/patches/0002-Ocaml-websocket-allow-to-provide-a-maximum-frame-len.patch new file mode 100644 index 000000000000..b7aaec477aeb --- /dev/null +++ b/websocket/patches/0002-Ocaml-websocket-allow-to-provide-a-maximum-frame-len.patch @@ -0,0 +1,173 @@ +From 35e2ba071a2e0ac1fb39e58f71010200734a796f Mon Sep 17 00:00:00 2001 +From: Alain Mebsout +Date: Sun, 22 Dec 2024 12:45:14 +0100 +Subject: [PATCH 1/1] Ocaml-websocket: allow to provide a maximum frame length + +--- + websocket/core/websocket.ml | 16 +++++++++++----- + websocket/core/websocket.mli | 2 ++ + websocket/lwt/websocket_cohttp_lwt.ml | 8 ++++---- + websocket/lwt/websocket_cohttp_lwt.mli | 1 + + websocket/lwt/websocket_lwt_unix.ml | 3 ++- + websocket/lwt/websocket_lwt_unix.mli | 1 + + 6 files changed, 21 insertions(+), 10 deletions(-) + +diff --git a/websocket/core/websocket.ml b/websocket/core/websocket.ml +index 86ec1311f32..222e577b022 100644 +--- a/websocket/core/websocket.ml ++++ b/websocket/core/websocket.ml +@@ -155,6 +155,7 @@ module type S = sig + type mode = Client of (int -> string) | Server + + val make_read_frame : ++ ?max_len:int -> + ?buf:Buffer.t -> mode:mode -> IO.ic -> IO.oc -> unit -> Frame.t IO.t + + val write_frame_to_buf : mode:mode -> Buffer.t -> Frame.t -> unit +@@ -177,6 +178,7 @@ module type S = sig + type t + + val create : ++ ?max_len:int -> + ?read_buf:Buffer.t -> + ?write_buf:Buffer.t -> + Cohttp.Request.t -> +@@ -265,7 +267,7 @@ module Make (IO : Cohttp.S.IO) = struct + write_frame_to_buf ~mode buf @@ Frame.close code; + write oc @@ Buffer.contents buf + +- let read_frame ic oc buf mode hdr = ++ let read_frame ?max_len ic oc buf mode hdr = + let hdr_part1 = EndianString.BigEndian.get_int8 hdr 0 in + let hdr_part2 = EndianString.BigEndian.get_int8 hdr 1 in + let final = is_bit_set 7 hdr_part1 in +@@ -291,6 +293,10 @@ module Make (IO : Cohttp.S.IO) = struct + else if extension <> 0 then + close_with_code mode buf oc 1002 >>= fun () -> + proto_error "unsupported extension" ++ else if (match max_len with Some max -> payload_len > max | None -> false) ++ then ++ close_with_code mode buf oc 1009 >>= fun () -> ++ proto_error "frame payload too big" + else if Frame.Opcode.is_ctrl opcode && payload_len > 125 then + close_with_code mode buf oc 1002 >>= fun () -> + proto_error "control frame too big" +@@ -315,11 +321,11 @@ module Make (IO : Cohttp.S.IO) = struct + let frame = Frame.of_bytes ~opcode ~extension ~final payload in + return frame) + +- let make_read_frame ?(buf = Buffer.create 128) ~mode ic oc () = ++ let make_read_frame ?max_len ?(buf = Buffer.create 128) ~mode ic oc () = + Buffer.clear buf; + read_exactly ic 2 buf >>= function + | None -> raise End_of_file +- | Some hdr -> read_frame ic oc buf mode hdr ++ | Some hdr -> read_frame ?max_len ic oc buf mode hdr + + module Request = Cohttp.Request.Make (IO) + module Response = Cohttp.Response.Make (IO) +@@ -337,9 +343,9 @@ module Make (IO : Cohttp.S.IO) = struct + + let source { endp; _ } = endp + +- let create ?read_buf ?(write_buf = Buffer.create 128) http_request endp ic ++ let create ?max_len ?read_buf ?(write_buf = Buffer.create 128) http_request endp ic + oc = +- let read_frame = make_read_frame ?buf:read_buf ~mode:Server ic oc in ++ let read_frame = make_read_frame ?max_len ?buf:read_buf ~mode:Server ic oc in + { + buffer = write_buf; + endp; +diff --git a/websocket/core/websocket.mli b/websocket/core/websocket.mli +index 04f1689646b..359761b34fc 100644 +--- a/websocket/core/websocket.mli ++++ b/websocket/core/websocket.mli +@@ -84,6 +84,7 @@ module type S = sig + type mode = Client of (int -> string) | Server + + val make_read_frame : ++ ?max_len:int -> + ?buf:Buffer.t -> mode:mode -> IO.ic -> IO.oc -> unit -> Frame.t IO.t + + val write_frame_to_buf : mode:mode -> Buffer.t -> Frame.t -> unit +@@ -106,6 +107,7 @@ module type S = sig + type t + + val create : ++ ?max_len:int -> + ?read_buf:Buffer.t -> + ?write_buf:Buffer.t -> + Cohttp.Request.t -> +diff --git a/websocket/lwt/websocket_cohttp_lwt.ml b/websocket/lwt/websocket_cohttp_lwt.ml +index f3b5b9ca03b..21376b94361 100644 +--- a/websocket/lwt/websocket_cohttp_lwt.ml ++++ b/websocket/lwt/websocket_cohttp_lwt.ml +@@ -29,12 +29,12 @@ let send_frames stream oc = + in + Lwt_stream.iter_s send_frame stream + +-let read_frames ic oc handler_fn = +- let read_frame = Lwt_IO.make_read_frame ~mode:Server ic oc in ++let read_frames ?max_len ic oc handler_fn = ++ let read_frame = Lwt_IO.make_read_frame ?max_len ~mode:Server ic oc in + let rec inner () = read_frame () >>= Lwt.wrap1 handler_fn >>= inner in + inner () + +-let upgrade_connection request incoming_handler io_error_handler = ++let upgrade_connection ?max_frame_length request incoming_handler io_error_handler = + let headers = Cohttp.Request.headers request in + (match Cohttp.Header.get headers "sec-websocket-key" with + | None -> +@@ -61,7 +61,7 @@ let upgrade_connection request incoming_handler io_error_handler = + [ + (* input: data from the client is read from the input channel + * of the tcp connection; pass it to handler function *) +- read_frames ic oc incoming_handler; ++ read_frames ?max_len:max_frame_length ic oc incoming_handler; + (* output: data for the client is written to the output + * channel of the tcp connection *) + send_frames frames_out_stream oc; +diff --git a/websocket/lwt/websocket_cohttp_lwt.mli b/websocket/lwt/websocket_cohttp_lwt.mli +index af333b7a48a..22f3f323399 100644 +--- a/websocket/lwt/websocket_cohttp_lwt.mli ++++ b/websocket/lwt/websocket_cohttp_lwt.mli +@@ -19,6 +19,7 @@ + open Websocket + + val upgrade_connection : ++ ?max_frame_length:int -> + Cohttp.Request.t -> + (Frame.t -> unit) -> + (exn -> unit Lwt.t) -> +diff --git a/websocket/lwt/websocket_lwt_unix.ml b/websocket/lwt/websocket_lwt_unix.ml +index 4080aa7b7bd..608509b219a 100644 +--- a/websocket/lwt/websocket_lwt_unix.ml ++++ b/websocket/lwt/websocket_lwt_unix.ml +@@ -101,11 +101,12 @@ let write { write_frame; _ } frame = write_frame frame + let close_transport { oc; _ } = Lwt_io.close oc + + let connect ?(extra_headers = Cohttp.Header.init ()) ++ ?max_frame_length + ?(random_string = Websocket.Rng.init ()) + ?(ctx = Lazy.force Conduit_lwt_unix.default_ctx) ?buf client url = + let nonce = Base64.encode_exn (random_string 16) in + connect ctx client url nonce extra_headers >|= fun (ic, oc) -> +- let read_frame = make_read_frame ?buf ~mode:(Client random_string) ic oc in ++ let read_frame = make_read_frame ?max_len:max_frame_length ?buf ~mode:(Client random_string) ic oc in + let read_frame () = + Lwt.catch read_frame (fun exn -> + Lwt.async (fun () -> Lwt_io.close ic); +diff --git a/websocket/lwt/websocket_lwt_unix.mli b/websocket/lwt/websocket_lwt_unix.mli +index 2ad27369024..53a342e2bd8 100644 +--- a/websocket/lwt/websocket_lwt_unix.mli ++++ b/websocket/lwt/websocket_lwt_unix.mli +@@ -39,6 +39,7 @@ val close_transport : conn -> unit Lwt.t + + val connect : + ?extra_headers:Cohttp.Header.t -> ++ ?max_frame_length:int -> + ?random_string:(int -> string) -> + ?ctx:Conduit_lwt_unix.ctx -> + ?buf:Buffer.t -> +-- +2.44.0 -- GitLab From c11ee71514e2d18e2e22dc677ce68bf0bcc99f40 Mon Sep 17 00:00:00 2001 From: Alain Mebsout Date: Sun, 22 Dec 2024 12:46:18 +0100 Subject: [PATCH 4/8] EVM node/websocket: don't attempt to read frame above 4MB --- etherlink/bin_node/lib_dev/evm_websocket.ml | 1 + 1 file changed, 1 insertion(+) diff --git a/etherlink/bin_node/lib_dev/evm_websocket.ml b/etherlink/bin_node/lib_dev/evm_websocket.ml index fa231d6714b1..1283600528cb 100644 --- a/etherlink/bin_node/lib_dev/evm_websocket.ml +++ b/etherlink/bin_node/lib_dev/evm_websocket.ml @@ -496,6 +496,7 @@ let cohttp_callback handler (conn : Cohttp_lwt_unix.Server.conn) req _body = let* response_action, push_frame = Websocket_cohttp_lwt.upgrade_connection req + ~max_frame_length:max_message_length (new_frame conn_name) (fun io_exn -> let reason = -- GitLab From 58e1c5b68301d11709c57a4b54a76140d06de62e Mon Sep 17 00:00:00 2001 From: Alain Mebsout Date: Mon, 6 Jan 2025 09:12:35 +0100 Subject: [PATCH 5/8] Evm node: configurable maximum websocket message length --- etherlink/bin_node/config/configuration.ml | 23 ++++++++++++--- etherlink/bin_node/config/configuration.mli | 1 + etherlink/bin_node/lib_dev/evm_directory.ml | 6 ++-- etherlink/bin_node/lib_dev/evm_directory.mli | 2 +- etherlink/bin_node/lib_dev/evm_websocket.ml | 31 +++++++++++++------- etherlink/bin_node/lib_dev/evm_websocket.mli | 9 ++++-- etherlink/bin_node/lib_dev/services.ml | 2 ++ 7 files changed, 53 insertions(+), 21 deletions(-) diff --git a/etherlink/bin_node/config/configuration.ml b/etherlink/bin_node/config/configuration.ml index fd91ee51dd36..2c7bfb141ae8 100644 --- a/etherlink/bin_node/config/configuration.ml +++ b/etherlink/bin_node/config/configuration.ml @@ -54,6 +54,7 @@ type experimental_features = { history_mode : history_mode; rpc_server : rpc_server; enable_websocket : bool; + max_websocket_message_length : int; } type sequencer = { @@ -140,6 +141,10 @@ let default_enable_send_raw_transaction = true let default_garbage_collector_parameters = {split_frequency_in_seconds = 86_400; number_of_chunks = 14} +(* This should be enough for messages we expect to receive in the ethereum + JSONRPC protocol. *) +let default_max_socket_message_length = 4096 * 1024 + let default_experimental_features = { enable_send_raw_transaction = default_enable_send_raw_transaction; @@ -152,6 +157,7 @@ let default_experimental_features = history_mode = Archive; rpc_server = Resto; enable_websocket = false; + max_websocket_message_length = default_max_socket_message_length; } let default_data_dir = Filename.concat (Sys.getenv "HOME") ".octez-evm-node" @@ -736,6 +742,7 @@ let experimental_features_encoding = history_mode; rpc_server; enable_websocket; + max_websocket_message_length; } -> ( ( drop_duplicate_on_injection, blueprints_publisher_order_enabled, @@ -747,7 +754,7 @@ let experimental_features_encoding = garbage_collector_parameters, history_mode, None ), - (rpc_server, enable_websocket) )) + (rpc_server, enable_websocket, max_websocket_message_length) )) (fun ( ( drop_duplicate_on_injection, blueprints_publisher_order_enabled, enable_send_raw_transaction, @@ -758,7 +765,7 @@ let experimental_features_encoding = garbage_collector_parameters, history_mode, _next_wasm_runtime ), - (rpc_server, enable_websocket) ) -> + (rpc_server, enable_websocket, max_websocket_message_length) ) -> { drop_duplicate_on_injection; blueprints_publisher_order_enabled; @@ -770,6 +777,7 @@ let experimental_features_encoding = history_mode; rpc_server; enable_websocket; + max_websocket_message_length; }) (merge_objs (obj10 @@ -844,7 +852,7 @@ let experimental_features_encoding = DEPRECATED: You should remove this option from your \ configuration file." bool)) - (obj2 + (obj3 (dft "rpc_server" ~description: @@ -856,7 +864,14 @@ let experimental_features_encoding = "enable_websocket" ~description:"Enable or disable the experimental websocket server" bool - default_experimental_features.enable_websocket))) + default_experimental_features.enable_websocket) + (dft + "max_websocket_message_length" + ~description: + "Maximum message size accepted by the websocket server (only \ + for Resto backend)" + int31 + default_max_socket_message_length))) let proxy_encoding = let open Data_encoding in diff --git a/etherlink/bin_node/config/configuration.mli b/etherlink/bin_node/config/configuration.mli index 2dafde1e141d..1d3ebb1c06d2 100644 --- a/etherlink/bin_node/config/configuration.mli +++ b/etherlink/bin_node/config/configuration.mli @@ -86,6 +86,7 @@ type experimental_features = { history_mode : history_mode; rpc_server : rpc_server; enable_websocket : bool; + max_websocket_message_length : int; } type sequencer = { diff --git a/etherlink/bin_node/lib_dev/evm_directory.ml b/etherlink/bin_node/lib_dev/evm_directory.ml index 92ee192b25dc..6a93331091ff 100644 --- a/etherlink/bin_node/lib_dev/evm_directory.ml +++ b/etherlink/bin_node/lib_dev/evm_directory.ml @@ -93,10 +93,12 @@ let register_metrics path dir = let route = Router.make_metrics_route path in Dream (route :: routes) -let jsonrpc_websocket_register dir path handler = +let jsonrpc_websocket_register ~max_message_length dir path handler = match dir with | Resto {dir; extra} -> - let callback = Evm_websocket.cohttp_callback handler in + let callback = + Evm_websocket.cohttp_callback ~max_message_length handler + in Resto {dir; extra = EndpointMap.add (`GET, path) callback extra} | Dream routes -> let route = Router.make_jsonrpc_websocket_route path handler in diff --git a/etherlink/bin_node/lib_dev/evm_directory.mli b/etherlink/bin_node/lib_dev/evm_directory.mli index beca87ee4ee9..fa5a0c2b623a 100644 --- a/etherlink/bin_node/lib_dev/evm_directory.mli +++ b/etherlink/bin_node/lib_dev/evm_directory.mli @@ -70,7 +70,7 @@ val register_metrics : string -> t -> t JSONRPC response and optionally produce output elements in a stream for subscription services. *) val jsonrpc_websocket_register : - t -> string -> Rpc_encodings.websocket_handler -> t + max_message_length:int -> t -> string -> Rpc_encodings.websocket_handler -> t (** {2 Curried functions with respect to service parameters} *) diff --git a/etherlink/bin_node/lib_dev/evm_websocket.ml b/etherlink/bin_node/lib_dev/evm_websocket.ml index 1283600528cb..ee8b29a81958 100644 --- a/etherlink/bin_node/lib_dev/evm_websocket.ml +++ b/etherlink/bin_node/lib_dev/evm_websocket.ml @@ -8,16 +8,12 @@ open Rpc_encodings -(* Maximum message size accepted by the websocket server, hardcoded to 4MB. - This should be enough for messages we expect to receive in the ethereum - JSONRPC protocol. *) -let max_message_length = 4096 * 1024 - type parameters = { push_frame : Websocket.Frame.t option -> unit; http_request : Cohttp.Request.t; conn : Cohttp_lwt_unix.Server.conn; medias : Media_type.t list; + max_message_length : int; handler : websocket_handler; } @@ -39,6 +35,7 @@ module Types = struct conn_descr : string; input_media_type : Media_type.t; output_media_type : Media_type.t; + max_message_length : int; handler : websocket_handler; message_buffer : Buffer.t; subscriptions : subscriptions_table; @@ -296,7 +293,14 @@ let opcode_of_media media = let on_frame worker fr = let open Lwt_syntax in let state = Worker.state worker in - let {Types.push_frame; input_media_type; output_media_type; handler; _} = + let { + Types.push_frame; + input_media_type; + output_media_type; + max_message_length; + handler; + _; + } = state in let push_frame f = push_frame (Some f) in @@ -390,7 +394,8 @@ module Handlers = struct type launch_error = [`Not_acceptable | `Unsupported_media_type of string] let on_launch _w _name - ({push_frame; http_request; conn; medias; handler} : Types.parameters) = + ({push_frame; http_request; conn; medias; max_message_length; handler} : + Types.parameters) = let open Lwt_result_syntax in let headers = Cohttp.Request.headers http_request in let medias = @@ -421,6 +426,7 @@ module Handlers = struct conn_descr; input_media_type; output_media_type; + max_message_length; handler; message_buffer = Buffer.create 256; subscriptions = Stdlib.Hashtbl.create 3; @@ -468,14 +474,14 @@ module Handlers = struct end let start (conn : Cohttp_lwt_unix.Server.conn) (http_request : Cohttp.Request.t) - medias handler push_frame = + medias ~max_message_length handler push_frame = let open Lwt_result_syntax in let name = Cohttp.Connection.to_string (snd conn) in let* (_worker : _ Worker.t) = Worker.launch table name - {push_frame; http_request; conn; medias; handler} + {push_frame; http_request; conn; medias; max_message_length; handler} (module Handlers) in return_unit @@ -489,7 +495,8 @@ let new_frame conn fr = cases. In this case we could lose frames. *) Worker.Queue.push_request_now w (Request.Frame fr) -let cohttp_callback handler (conn : Cohttp_lwt_unix.Server.conn) req _body = +let cohttp_callback ~max_message_length handler + (conn : Cohttp_lwt_unix.Server.conn) req _body = let open Lwt_syntax in let media_types = Supported_media_types.all in let conn_name = conn |> snd |> Cohttp.Connection.to_string in @@ -504,7 +511,9 @@ let cohttp_callback handler (conn : Cohttp_lwt_unix.Server.conn) req _body = in shutdown ~reason conn_name) in - let+ res = start conn req media_types handler push_frame in + let+ res = + start conn req media_types ~max_message_length handler push_frame + in match res with | Ok () -> response_action | Error err -> diff --git a/etherlink/bin_node/lib_dev/evm_websocket.mli b/etherlink/bin_node/lib_dev/evm_websocket.mli index 39581d7e51bc..4115b1493a03 100644 --- a/etherlink/bin_node/lib_dev/evm_websocket.mli +++ b/etherlink/bin_node/lib_dev/evm_websocket.mli @@ -7,10 +7,13 @@ (*****************************************************************************) (** Callback to use for a websocket endpoint in a Cohttp server. - [cohttp_callback handler conn req body] upgrades the connection for request - [req] to the websocket protocol and starts worker that processes incoming - frames and writes in return in the websocket. *) + [cohttp_callback ~max_message_length handler conn req body] upgrades the + connection for request [req] to the websocket protocol and starts worker + that processes incoming frames and writes in return in the + websocket. [max_message_length] specifies the maximum size of message + accepted by the server, over which the connection will be closed. *) val cohttp_callback : + max_message_length:int -> Rpc_encodings.websocket_handler -> Cohttp_lwt_unix.Server.conn -> Cohttp.Request.t -> diff --git a/etherlink/bin_node/lib_dev/services.ml b/etherlink/bin_node/lib_dev/services.ml index 5f13e2147098..be4634bd24af 100644 --- a/etherlink/bin_node/lib_dev/services.ml +++ b/etherlink/bin_node/lib_dev/services.ml @@ -995,6 +995,8 @@ let generic_websocket_dispatch (rpc : Configuration.rpc) (config : Configuration.t) ctx dir path dispatch_websocket = if config.experimental_features.enable_websocket then Evm_directory.jsonrpc_websocket_register + ~max_message_length: + config.experimental_features.max_websocket_message_length dir path (dispatch_websocket rpc config ctx) -- GitLab From 5d48f6a0f716b57c806983a765e3452e4633447f Mon Sep 17 00:00:00 2001 From: Alain Mebsout Date: Mon, 6 Jan 2025 10:07:02 +0100 Subject: [PATCH 6/8] Test/Etherlink: websocket server receiving message larger than max --- etherlink/tezt/lib/evm_node.ml | 6 +++- etherlink/tezt/lib/evm_node.mli | 11 ++++--- etherlink/tezt/tests/evm_sequencer.ml | 44 +++++++++++++++++++++++++++ tezt/lib_tezos/websocket.ml | 8 +++-- tezt/lib_tezos/websocket.mli | 5 +++ 5 files changed, 65 insertions(+), 9 deletions(-) diff --git a/etherlink/tezt/lib/evm_node.ml b/etherlink/tezt/lib/evm_node.ml index 7d4da43ea69f..67e121a185b2 100644 --- a/etherlink/tezt/lib/evm_node.ml +++ b/etherlink/tezt/lib/evm_node.ml @@ -1226,7 +1226,7 @@ let patch_config_with_experimental_feature ?(blueprints_publisher_order_enabled = false) ?(block_storage_sqlite3 = true) ?(next_wasm_runtime = true) ?garbage_collector_parameters ?history_mode ?rpc_server - ?(enable_websocket = false) () = + ?(enable_websocket = false) ?max_websocket_message_length () = let conditional_json_put ~name cond value_json json = if cond then JSON.put @@ -1282,6 +1282,10 @@ let patch_config_with_experimental_feature | Resto -> `String "resto" | Dream -> `String "dream") |> conditional_json_put enable_websocket ~name:"enable_websocket" (`Bool true) + |> optional_json_put + max_websocket_message_length + ~name:"max_websocket_message_length" + (fun max -> `Float (float_of_int max)) let init ?patch_config ?name ?runner ?mode ?data_dir ?rpc_addr ?rpc_port ?restricted_rpcs ?websockets rollup_node = diff --git a/etherlink/tezt/lib/evm_node.mli b/etherlink/tezt/lib/evm_node.mli index 70d976ae53ae..b9b574f76d92 100644 --- a/etherlink/tezt/lib/evm_node.mli +++ b/etherlink/tezt/lib/evm_node.mli @@ -297,11 +297,11 @@ type history_mode = Archive | Rolling type rpc_server = Resto | Dream -(** [patch_config_with_experimental_feature - ?drop_duplicate_when_injection ?block_storage_sqlite3 - ?next_wasm_runtime ?rpc_server ?enable_websocket json_config] patches a config to - add experimental feature. Each optional argument add the - correspondent experimental feature. *) +(** [patch_config_with_experimental_feature ?drop_duplicate_when_injection + ?block_storage_sqlite3 ?next_wasm_runtime ?rpc_server ?enable_websocket + ?max_websocket_message_length json_config] patches a config to add + experimental feature. Each optional argument add the correspondent + experimental feature. *) val patch_config_with_experimental_feature : ?drop_duplicate_when_injection:bool -> ?blueprints_publisher_order_enabled:bool -> @@ -311,6 +311,7 @@ val patch_config_with_experimental_feature : ?history_mode:history_mode -> ?rpc_server:rpc_server -> ?enable_websocket:bool -> + ?max_websocket_message_length:int -> unit -> JSON.t -> JSON.t diff --git a/etherlink/tezt/tests/evm_sequencer.ml b/etherlink/tezt/tests/evm_sequencer.ml index 8f7ff32277a1..98ae12536bc5 100644 --- a/etherlink/tezt/tests/evm_sequencer.ml +++ b/etherlink/tezt/tests/evm_sequencer.ml @@ -9028,6 +9028,49 @@ let test_websocket_cleanup = ~error_msg:"All subscriptions should have been cleaned up from node" ; unit +let test_websocket_max_message_length () = + let max_websocket_message_length = 512 in + let patch_config = + Evm_node.patch_config_with_experimental_feature + ~rpc_server:Resto (* The limit is not implemented for Dream *) + ~enable_websocket:true + ~max_websocket_message_length + () + in + register_sandbox + ~tags:["evm"; "rpc"; "websocket"; "max"] + ~title:"Websocket server does not accept messages larger than maximum" + ~patch_config + @@ fun sequencer -> + let* websocket = Evm_node.open_websocket sequencer in + let shutdown = + Lwt.pick + [ + ( Evm_node.wait_for sequencer "websocket_shutdown.v0" @@ fun json -> + Some JSON.(json |-> "reason" |> as_string) ); + (let* () = Lwt_unix.sleep 5. in + Test.fail ~__LOC__ "Websocket worker did not close."); + ] + in + let* () = + Websocket.send_raw + websocket + (String.make (max_websocket_message_length + 1) '\000') + in + let* reason = shutdown in + Log.info "Websocket shutdown reason: %S." reason ; + Check.(reason =~ rex "too big") + ~error_msg:"Expected reason to match %R, got %L" ; + Lwt.catch + (fun () -> + let* _ = Websocket.recv websocket in + Test.fail ~__LOC__ "Connection was not closed.") + (function + | Websocket.Connection_closed -> + Log.info "Connection was closed on client end." ; + unit + | e -> Lwt.reraise e) + let test_websocket_newPendingTransactions_event = register_all ~tags:["evm"; "rpc"; "websocket"; "new_pending_transactions"] @@ -9499,6 +9542,7 @@ let () = [Protocol.Alpha] ; test_websocket_newHeads_event [Protocol.Alpha] ; test_websocket_cleanup [Protocol.Alpha] ; + test_websocket_max_message_length () ; test_websocket_newPendingTransactions_event [Protocol.Alpha] ; test_websocket_logs_event [Protocol.Alpha] ; test_node_correctly_uses_batcher_heap [Protocol.Alpha] ; diff --git a/tezt/lib_tezos/websocket.ml b/tezt/lib_tezos/websocket.ml index 61fd9970124e..e6e7e6d4eb6a 100644 --- a/tezt/lib_tezos/websocket.ml +++ b/tezt/lib_tezos/websocket.ml @@ -10,6 +10,8 @@ type t = {process : Process.t; stdin : Lwt_io.output_channel} exception Could_not_connect +exception Connection_closed + let get_unique_name = let name_counts = ref String_map.empty in fun name -> @@ -35,7 +37,7 @@ let connect ?runner ?hooks ?name url = in return {process; stdin} -let send_msg {stdin; _} msg = +let send_raw {stdin; _} msg = let* () = Lwt_io.write stdin (msg ^ "\n") in unit @@ -46,7 +48,7 @@ let read_json ~origin {process; _} = let rec loop () = let* line = Lwt_io.read_line_opt ch in match line with - | None -> failwith "No response on websocket" + | None -> raise Connection_closed | Some line -> ( Buffer.add_string buff line ; match JSON.parse_opt ~origin (Buffer.contents buff) with @@ -77,7 +79,7 @@ let send = (Process.name ws.process) !cpt msg ; - send_msg ws msg + send_raw ws msg let recv = let cpt = ref 0 in diff --git a/tezt/lib_tezos/websocket.mli b/tezt/lib_tezos/websocket.mli index 965ef2af0065..0002c7738b48 100644 --- a/tezt/lib_tezos/websocket.mli +++ b/tezt/lib_tezos/websocket.mli @@ -8,6 +8,8 @@ exception Could_not_connect +exception Connection_closed + (** Type of a websocket client *) type t @@ -26,6 +28,9 @@ val close : t -> unit Lwt.t (** Send a JSON object on the websocket. *) val send : t -> JSON.t -> unit Lwt.t +(** Send a raw string on the websocket. *) +val send_raw : t -> string -> unit Lwt.t + (** Receive a JSON object on the websocket. *) val recv : t -> JSON.t Lwt.t -- GitLab From 1efc4ea99c5c4c56a4affdf9dc5ca3c205ca6612 Mon Sep 17 00:00:00 2001 From: Alain Mebsout Date: Mon, 6 Jan 2025 17:24:07 +0100 Subject: [PATCH 7/8] Tests: update regressions --- .../evm_sequencer.ml/Alpha- Configuration RPC.out | 9 ++++++--- .../evm_sequencer.ml/EVM Node- describe config.out | 6 +++++- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/etherlink/tezt/tests/expected/evm_sequencer.ml/Alpha- Configuration RPC.out b/etherlink/tezt/tests/expected/evm_sequencer.ml/Alpha- Configuration RPC.out index 58fc43cce0f2..887424cf1879 100644 --- a/etherlink/tezt/tests/expected/evm_sequencer.ml/Alpha- Configuration RPC.out +++ b/etherlink/tezt/tests/expected/evm_sequencer.ml/Alpha- Configuration RPC.out @@ -23,7 +23,8 @@ }, "history_mode": "archive", "rpc_server": "resto", - "enable_websocket": false + "enable_websocket": false, + "max_websocket_message_length": 4194304 }, "proxy": { "ignore_block_param": false @@ -80,7 +81,8 @@ }, "history_mode": "archive", "rpc_server": "resto", - "enable_websocket": false + "enable_websocket": false, + "max_websocket_message_length": 4194304 }, "proxy": { "ignore_block_param": false @@ -130,7 +132,8 @@ }, "history_mode": "archive", "rpc_server": "resto", - "enable_websocket": false + "enable_websocket": false, + "max_websocket_message_length": 4194304 }, "proxy": { "evm_node_endpoint": "hidden", diff --git a/etherlink/tezt/tests/expected/evm_sequencer.ml/EVM Node- describe config.out b/etherlink/tezt/tests/expected/evm_sequencer.ml/EVM Node- describe config.out index 29d4b1cf880c..6997a9eb81de 100644 --- a/etherlink/tezt/tests/expected/evm_sequencer.ml/EVM Node- describe config.out +++ b/etherlink/tezt/tests/expected/evm_sequencer.ml/EVM Node- describe config.out @@ -163,7 +163,11 @@ "rpc_server"?: "dream" | "resto", "enable_websocket"?: boolean - /* Enable or disable the experimental websocket server */ }, + /* Enable or disable the experimental websocket server */, + "max_websocket_message_length"?: + integer ∈ [-2^30, 2^30] + /* Maximum message size accepted by the websocket server (only for + Resto backend) */ }, "proxy"?: { "finalized_view"?: boolean -- GitLab From 474e35a5d4d6948a9f34fd7dcc41e41bc046140a Mon Sep 17 00:00:00 2001 From: Alain Mebsout Date: Mon, 6 Jan 2025 16:27:05 +0100 Subject: [PATCH 8/8] Doc: changelog --- etherlink/CHANGES_NODE.md | 1 + 1 file changed, 1 insertion(+) diff --git a/etherlink/CHANGES_NODE.md b/etherlink/CHANGES_NODE.md index 8026402aebfb..91aee5991517 100644 --- a/etherlink/CHANGES_NODE.md +++ b/etherlink/CHANGES_NODE.md @@ -39,6 +39,7 @@ you start using them, you probably want to use `octez-evm-node check config to receive real-time notifications of incoming pending transactions. (!15991) - Added support for the WebSocket event `logs`, enabling clients to receive real-time notifications for contract events filtered by address and topics. (!16011) +- Configurable maximum websocket message length (defaults to 4MB). (!16070) - History mode can now be selected with the parameter `history_mode`, `archive` and `rolling`. If the mode is `rolling` it will use the field `garbage_collect_parameters` to prune blocks, operations and states. (!16044) -- GitLab