diff --git a/etherlink/CHANGES_NODE.md b/etherlink/CHANGES_NODE.md index 8026402aebfb8b98f07a72027ebef2f5187e6e08..91aee599151722e339a09e291cc89624e07ba90c 100644 --- a/etherlink/CHANGES_NODE.md +++ b/etherlink/CHANGES_NODE.md @@ -39,6 +39,7 @@ you start using them, you probably want to use `octez-evm-node check config to receive real-time notifications of incoming pending transactions. (!15991) - Added support for the WebSocket event `logs`, enabling clients to receive real-time notifications for contract events filtered by address and topics. (!16011) +- Configurable maximum websocket message length (defaults to 4MB). (!16070) - History mode can now be selected with the parameter `history_mode`, `archive` and `rolling`. If the mode is `rolling` it will use the field `garbage_collect_parameters` to prune blocks, operations and states. (!16044) diff --git a/etherlink/bin_node/config/configuration.ml b/etherlink/bin_node/config/configuration.ml index fd91ee51dd367e057c268f606199d91f5ffbf039..2c7bfb141ae83ad810118d7fd4608eed9e93adf5 100644 --- a/etherlink/bin_node/config/configuration.ml +++ b/etherlink/bin_node/config/configuration.ml @@ -54,6 +54,7 @@ type experimental_features = { history_mode : history_mode; rpc_server : rpc_server; enable_websocket : bool; + max_websocket_message_length : int; } type sequencer = { @@ -140,6 +141,10 @@ let default_enable_send_raw_transaction = true let default_garbage_collector_parameters = {split_frequency_in_seconds = 86_400; number_of_chunks = 14} +(* This should be enough for messages we expect to receive in the ethereum + JSONRPC protocol. *) +let default_max_socket_message_length = 4096 * 1024 + let default_experimental_features = { enable_send_raw_transaction = default_enable_send_raw_transaction; @@ -152,6 +157,7 @@ let default_experimental_features = history_mode = Archive; rpc_server = Resto; enable_websocket = false; + max_websocket_message_length = default_max_socket_message_length; } let default_data_dir = Filename.concat (Sys.getenv "HOME") ".octez-evm-node" @@ -736,6 +742,7 @@ let experimental_features_encoding = history_mode; rpc_server; enable_websocket; + max_websocket_message_length; } -> ( ( drop_duplicate_on_injection, blueprints_publisher_order_enabled, @@ -747,7 +754,7 @@ let experimental_features_encoding = garbage_collector_parameters, history_mode, None ), - (rpc_server, enable_websocket) )) + (rpc_server, enable_websocket, max_websocket_message_length) )) (fun ( ( drop_duplicate_on_injection, blueprints_publisher_order_enabled, enable_send_raw_transaction, @@ -758,7 +765,7 @@ let experimental_features_encoding = garbage_collector_parameters, history_mode, _next_wasm_runtime ), - (rpc_server, enable_websocket) ) -> + (rpc_server, enable_websocket, max_websocket_message_length) ) -> { drop_duplicate_on_injection; blueprints_publisher_order_enabled; @@ -770,6 +777,7 @@ let experimental_features_encoding = history_mode; rpc_server; enable_websocket; + max_websocket_message_length; }) (merge_objs (obj10 @@ -844,7 +852,7 @@ let experimental_features_encoding = DEPRECATED: You should remove this option from your \ configuration file." bool)) - (obj2 + (obj3 (dft "rpc_server" ~description: @@ -856,7 +864,14 @@ let experimental_features_encoding = "enable_websocket" ~description:"Enable or disable the experimental websocket server" bool - default_experimental_features.enable_websocket))) + default_experimental_features.enable_websocket) + (dft + "max_websocket_message_length" + ~description: + "Maximum message size accepted by the websocket server (only \ + for Resto backend)" + int31 + default_max_socket_message_length))) let proxy_encoding = let open Data_encoding in diff --git a/etherlink/bin_node/config/configuration.mli b/etherlink/bin_node/config/configuration.mli index 2dafde1e141d798d347894fef560991841d9dff1..1d3ebb1c06d2b7396c2c1cbc85110391913e164b 100644 --- a/etherlink/bin_node/config/configuration.mli +++ b/etherlink/bin_node/config/configuration.mli @@ -86,6 +86,7 @@ type experimental_features = { history_mode : history_mode; rpc_server : rpc_server; enable_websocket : bool; + max_websocket_message_length : int; } type sequencer = { diff --git a/etherlink/bin_node/lib_dev/evm_directory.ml b/etherlink/bin_node/lib_dev/evm_directory.ml index 92ee192b25dcae5b32ae1ebb3215225ee7c2d478..6a93331091ff975a7bbcd9092ef7c2842a3d9638 100644 --- a/etherlink/bin_node/lib_dev/evm_directory.ml +++ b/etherlink/bin_node/lib_dev/evm_directory.ml @@ -93,10 +93,12 @@ let register_metrics path dir = let route = Router.make_metrics_route path in Dream (route :: routes) -let jsonrpc_websocket_register dir path handler = +let jsonrpc_websocket_register ~max_message_length dir path handler = match dir with | Resto {dir; extra} -> - let callback = Evm_websocket.cohttp_callback handler in + let callback = + Evm_websocket.cohttp_callback ~max_message_length handler + in Resto {dir; extra = EndpointMap.add (`GET, path) callback extra} | Dream routes -> let route = Router.make_jsonrpc_websocket_route path handler in diff --git a/etherlink/bin_node/lib_dev/evm_directory.mli b/etherlink/bin_node/lib_dev/evm_directory.mli index beca87ee4ee9364567185f1e653edada939f94ba..fa5a0c2b623a1d0e6ead573aa10589785eec9d4f 100644 --- a/etherlink/bin_node/lib_dev/evm_directory.mli +++ b/etherlink/bin_node/lib_dev/evm_directory.mli @@ -70,7 +70,7 @@ val register_metrics : string -> t -> t JSONRPC response and optionally produce output elements in a stream for subscription services. *) val jsonrpc_websocket_register : - t -> string -> Rpc_encodings.websocket_handler -> t + max_message_length:int -> t -> string -> Rpc_encodings.websocket_handler -> t (** {2 Curried functions with respect to service parameters} *) diff --git a/etherlink/bin_node/lib_dev/evm_websocket.ml b/etherlink/bin_node/lib_dev/evm_websocket.ml index bb3389a0eb2f2b8d7ccb55186691f34cfd8f193b..ee8b29a819588e1b9066b8548b77862de73c0f17 100644 --- a/etherlink/bin_node/lib_dev/evm_websocket.ml +++ b/etherlink/bin_node/lib_dev/evm_websocket.ml @@ -13,20 +13,29 @@ type parameters = { http_request : Cohttp.Request.t; conn : Cohttp_lwt_unix.Server.conn; medias : Media_type.t list; + max_message_length : int; handler : websocket_handler; } +type close_status = Normal_closure | Message_too_big + +(* https://datatracker.ietf.org/doc/html/rfc6455#section-7.4.1 *) +let code_of_close_status = function + | Normal_closure -> 1000 + | Message_too_big -> 1009 + module Types = struct type subscriptions_table = (Ethereum_types.Subscription.id, unit -> unit) Stdlib.Hashtbl.t - type close_info = {reason : string; code : int} + type close_info = {reason : string; status : close_status} type state = { push_frame : Websocket.Frame.t option -> unit; conn_descr : string; input_media_type : Media_type.t; output_media_type : Media_type.t; + max_message_length : int; handler : websocket_handler; message_buffer : Buffer.t; subscriptions : subscriptions_table; @@ -216,20 +225,20 @@ type worker = Worker.infinite Worker.queue Worker.t let table = Worker.create_table Queue -let default_close_info = {Types.reason = ""; code = 1000} +let default_close_info = {Types.reason = ""; status = Normal_closure} -let shutdown_worker ~reason ?(code = default_close_info.code) w = +let shutdown_worker ~reason ?(status = default_close_info.status) w = let st = Worker.state w in (match st.close_info with | Some _ -> () - | None -> st.close_info <- Some {code; reason}) ; + | None -> st.close_info <- Some {status; reason}) ; Worker.shutdown w -let shutdown ~reason ?code conn = +let shutdown ~reason ?status conn = let open Lwt_syntax in match Worker.find_opt table conn with | None -> return_unit - | Some w -> shutdown_worker ~reason ?code w + | Some w -> shutdown_worker ~reason ?status w let handle_subscription {Types.push_frame; conn_descr; output_media_type; subscriptions; _} opcode @@ -284,7 +293,14 @@ let opcode_of_media media = let on_frame worker fr = let open Lwt_syntax in let state = Worker.state worker in - let {Types.push_frame; input_media_type; output_media_type; handler; _} = + let { + Types.push_frame; + input_media_type; + output_media_type; + max_message_length; + handler; + _; + } = state in let push_frame f = push_frame (Some f) in @@ -330,6 +346,18 @@ let on_frame worker fr = (* Client has sent a close frame, we shut everything down for this worker *) shutdown_worker ~reason:"Received close frame" worker + | {opcode = Text | Binary; content; _} + when String.length content > max_message_length -> + (* We are receiving a message too big for the server *) + shutdown_worker ~reason:"Message too big" ~status:Message_too_big worker + | {opcode = Continuation; content; _} + when Buffer.length state.message_buffer + String.length content + > max_message_length -> + (* We are receiving a message too big for the server *) + shutdown_worker + ~reason:"Fragmented message too big" + ~status:Message_too_big + worker | {opcode = Text | Binary; content; final = false; _} -> (* New fragmented message *) Buffer.clear state.message_buffer ; @@ -366,7 +394,8 @@ module Handlers = struct type launch_error = [`Not_acceptable | `Unsupported_media_type of string] let on_launch _w _name - ({push_frame; http_request; conn; medias; handler} : Types.parameters) = + ({push_frame; http_request; conn; medias; max_message_length; handler} : + Types.parameters) = let open Lwt_result_syntax in let headers = Cohttp.Request.headers http_request in let medias = @@ -397,6 +426,7 @@ module Handlers = struct conn_descr; input_media_type; output_media_type; + max_message_length; handler; message_buffer = Buffer.create 256; subscriptions = Stdlib.Hashtbl.create 3; @@ -428,13 +458,13 @@ module Handlers = struct Worker.state w in let nb_sub = Stdlib.Hashtbl.length subscriptions in - let {Types.reason; code} = + let {Types.reason; status} = Option.value close_info ~default:default_close_info in let* () = Event.(emit shutdown) (conn_descr, reason, nb_sub) in let () = try - push_frame (Some (Websocket.Frame.close code)) ; + push_frame (Some (Websocket.Frame.close (code_of_close_status status))) ; push_frame None with _ -> (* Websocket already closed *) () in @@ -444,14 +474,14 @@ module Handlers = struct end let start (conn : Cohttp_lwt_unix.Server.conn) (http_request : Cohttp.Request.t) - medias handler push_frame = + medias ~max_message_length handler push_frame = let open Lwt_result_syntax in let name = Cohttp.Connection.to_string (snd conn) in let* (_worker : _ Worker.t) = Worker.launch table name - {push_frame; http_request; conn; medias; handler} + {push_frame; http_request; conn; medias; max_message_length; handler} (module Handlers) in return_unit @@ -465,13 +495,15 @@ let new_frame conn fr = cases. In this case we could lose frames. *) Worker.Queue.push_request_now w (Request.Frame fr) -let cohttp_callback handler (conn : Cohttp_lwt_unix.Server.conn) req _body = +let cohttp_callback ~max_message_length handler + (conn : Cohttp_lwt_unix.Server.conn) req _body = let open Lwt_syntax in let media_types = Supported_media_types.all in let conn_name = conn |> snd |> Cohttp.Connection.to_string in let* response_action, push_frame = Websocket_cohttp_lwt.upgrade_connection req + ~max_frame_length:max_message_length (new_frame conn_name) (fun io_exn -> let reason = @@ -479,7 +511,9 @@ let cohttp_callback handler (conn : Cohttp_lwt_unix.Server.conn) req _body = in shutdown ~reason conn_name) in - let+ res = start conn req media_types handler push_frame in + let+ res = + start conn req media_types ~max_message_length handler push_frame + in match res with | Ok () -> response_action | Error err -> diff --git a/etherlink/bin_node/lib_dev/evm_websocket.mli b/etherlink/bin_node/lib_dev/evm_websocket.mli index 39581d7e51bc541b499f09753b628462e7004173..4115b1493a03f9a0530899b49d1c3d6b7e2295dd 100644 --- a/etherlink/bin_node/lib_dev/evm_websocket.mli +++ b/etherlink/bin_node/lib_dev/evm_websocket.mli @@ -7,10 +7,13 @@ (*****************************************************************************) (** Callback to use for a websocket endpoint in a Cohttp server. - [cohttp_callback handler conn req body] upgrades the connection for request - [req] to the websocket protocol and starts worker that processes incoming - frames and writes in return in the websocket. *) + [cohttp_callback ~max_message_length handler conn req body] upgrades the + connection for request [req] to the websocket protocol and starts worker + that processes incoming frames and writes in return in the + websocket. [max_message_length] specifies the maximum size of message + accepted by the server, over which the connection will be closed. *) val cohttp_callback : + max_message_length:int -> Rpc_encodings.websocket_handler -> Cohttp_lwt_unix.Server.conn -> Cohttp.Request.t -> diff --git a/etherlink/bin_node/lib_dev/services.ml b/etherlink/bin_node/lib_dev/services.ml index 5f13e2147098a67a29df81b40534e31ef7f3cc38..be4634bd24af2d15dd67b0984b75f232c31f1215 100644 --- a/etherlink/bin_node/lib_dev/services.ml +++ b/etherlink/bin_node/lib_dev/services.ml @@ -995,6 +995,8 @@ let generic_websocket_dispatch (rpc : Configuration.rpc) (config : Configuration.t) ctx dir path dispatch_websocket = if config.experimental_features.enable_websocket then Evm_directory.jsonrpc_websocket_register + ~max_message_length: + config.experimental_features.max_websocket_message_length dir path (dispatch_websocket rpc config ctx) diff --git a/etherlink/tezt/lib/evm_node.ml b/etherlink/tezt/lib/evm_node.ml index 7d4da43ea69fe5aeccfd1f778057bbc9eda440c2..67e121a185b26a14a850a5ae8d13f22db315674b 100644 --- a/etherlink/tezt/lib/evm_node.ml +++ b/etherlink/tezt/lib/evm_node.ml @@ -1226,7 +1226,7 @@ let patch_config_with_experimental_feature ?(blueprints_publisher_order_enabled = false) ?(block_storage_sqlite3 = true) ?(next_wasm_runtime = true) ?garbage_collector_parameters ?history_mode ?rpc_server - ?(enable_websocket = false) () = + ?(enable_websocket = false) ?max_websocket_message_length () = let conditional_json_put ~name cond value_json json = if cond then JSON.put @@ -1282,6 +1282,10 @@ let patch_config_with_experimental_feature | Resto -> `String "resto" | Dream -> `String "dream") |> conditional_json_put enable_websocket ~name:"enable_websocket" (`Bool true) + |> optional_json_put + max_websocket_message_length + ~name:"max_websocket_message_length" + (fun max -> `Float (float_of_int max)) let init ?patch_config ?name ?runner ?mode ?data_dir ?rpc_addr ?rpc_port ?restricted_rpcs ?websockets rollup_node = diff --git a/etherlink/tezt/lib/evm_node.mli b/etherlink/tezt/lib/evm_node.mli index 70d976ae53ae239ad09ffec45b5cf5ebe8424370..b9b574f76d926781658b462479b1166b277ff299 100644 --- a/etherlink/tezt/lib/evm_node.mli +++ b/etherlink/tezt/lib/evm_node.mli @@ -297,11 +297,11 @@ type history_mode = Archive | Rolling type rpc_server = Resto | Dream -(** [patch_config_with_experimental_feature - ?drop_duplicate_when_injection ?block_storage_sqlite3 - ?next_wasm_runtime ?rpc_server ?enable_websocket json_config] patches a config to - add experimental feature. Each optional argument add the - correspondent experimental feature. *) +(** [patch_config_with_experimental_feature ?drop_duplicate_when_injection + ?block_storage_sqlite3 ?next_wasm_runtime ?rpc_server ?enable_websocket + ?max_websocket_message_length json_config] patches a config to add + experimental feature. Each optional argument add the correspondent + experimental feature. *) val patch_config_with_experimental_feature : ?drop_duplicate_when_injection:bool -> ?blueprints_publisher_order_enabled:bool -> @@ -311,6 +311,7 @@ val patch_config_with_experimental_feature : ?history_mode:history_mode -> ?rpc_server:rpc_server -> ?enable_websocket:bool -> + ?max_websocket_message_length:int -> unit -> JSON.t -> JSON.t diff --git a/etherlink/tezt/tests/evm_sequencer.ml b/etherlink/tezt/tests/evm_sequencer.ml index 8f7ff32277a1deac7ee2763fdfb3c58a9fa56d46..98ae12536bc50b3babce836cd32508b289e148e4 100644 --- a/etherlink/tezt/tests/evm_sequencer.ml +++ b/etherlink/tezt/tests/evm_sequencer.ml @@ -9028,6 +9028,49 @@ let test_websocket_cleanup = ~error_msg:"All subscriptions should have been cleaned up from node" ; unit +let test_websocket_max_message_length () = + let max_websocket_message_length = 512 in + let patch_config = + Evm_node.patch_config_with_experimental_feature + ~rpc_server:Resto (* The limit is not implemented for Dream *) + ~enable_websocket:true + ~max_websocket_message_length + () + in + register_sandbox + ~tags:["evm"; "rpc"; "websocket"; "max"] + ~title:"Websocket server does not accept messages larger than maximum" + ~patch_config + @@ fun sequencer -> + let* websocket = Evm_node.open_websocket sequencer in + let shutdown = + Lwt.pick + [ + ( Evm_node.wait_for sequencer "websocket_shutdown.v0" @@ fun json -> + Some JSON.(json |-> "reason" |> as_string) ); + (let* () = Lwt_unix.sleep 5. in + Test.fail ~__LOC__ "Websocket worker did not close."); + ] + in + let* () = + Websocket.send_raw + websocket + (String.make (max_websocket_message_length + 1) '\000') + in + let* reason = shutdown in + Log.info "Websocket shutdown reason: %S." reason ; + Check.(reason =~ rex "too big") + ~error_msg:"Expected reason to match %R, got %L" ; + Lwt.catch + (fun () -> + let* _ = Websocket.recv websocket in + Test.fail ~__LOC__ "Connection was not closed.") + (function + | Websocket.Connection_closed -> + Log.info "Connection was closed on client end." ; + unit + | e -> Lwt.reraise e) + let test_websocket_newPendingTransactions_event = register_all ~tags:["evm"; "rpc"; "websocket"; "new_pending_transactions"] @@ -9499,6 +9542,7 @@ let () = [Protocol.Alpha] ; test_websocket_newHeads_event [Protocol.Alpha] ; test_websocket_cleanup [Protocol.Alpha] ; + test_websocket_max_message_length () ; test_websocket_newPendingTransactions_event [Protocol.Alpha] ; test_websocket_logs_event [Protocol.Alpha] ; test_node_correctly_uses_batcher_heap [Protocol.Alpha] ; diff --git a/etherlink/tezt/tests/expected/evm_sequencer.ml/Alpha- Configuration RPC.out b/etherlink/tezt/tests/expected/evm_sequencer.ml/Alpha- Configuration RPC.out index 58fc43cce0f24b2ae6aa89875fcd61e2cdb5f06d..887424cf1879d543e33e50ee524eb79d6da3837f 100644 --- a/etherlink/tezt/tests/expected/evm_sequencer.ml/Alpha- Configuration RPC.out +++ b/etherlink/tezt/tests/expected/evm_sequencer.ml/Alpha- Configuration RPC.out @@ -23,7 +23,8 @@ }, "history_mode": "archive", "rpc_server": "resto", - "enable_websocket": false + "enable_websocket": false, + "max_websocket_message_length": 4194304 }, "proxy": { "ignore_block_param": false @@ -80,7 +81,8 @@ }, "history_mode": "archive", "rpc_server": "resto", - "enable_websocket": false + "enable_websocket": false, + "max_websocket_message_length": 4194304 }, "proxy": { "ignore_block_param": false @@ -130,7 +132,8 @@ }, "history_mode": "archive", "rpc_server": "resto", - "enable_websocket": false + "enable_websocket": false, + "max_websocket_message_length": 4194304 }, "proxy": { "evm_node_endpoint": "hidden", diff --git a/etherlink/tezt/tests/expected/evm_sequencer.ml/EVM Node- describe config.out b/etherlink/tezt/tests/expected/evm_sequencer.ml/EVM Node- describe config.out index 29d4b1cf880c42733c8e600d9828dc02c0f72b9b..6997a9eb81de4d3a2b466af2db5901feee3387cd 100644 --- a/etherlink/tezt/tests/expected/evm_sequencer.ml/EVM Node- describe config.out +++ b/etherlink/tezt/tests/expected/evm_sequencer.ml/EVM Node- describe config.out @@ -163,7 +163,11 @@ "rpc_server"?: "dream" | "resto", "enable_websocket"?: boolean - /* Enable or disable the experimental websocket server */ }, + /* Enable or disable the experimental websocket server */, + "max_websocket_message_length"?: + integer ∈ [-2^30, 2^30] + /* Maximum message size accepted by the websocket server (only for + Resto backend) */ }, "proxy"?: { "finalized_view"?: boolean diff --git a/tezt/lib_tezos/websocket.ml b/tezt/lib_tezos/websocket.ml index 61fd9970124ef35c3c2b650010c5b35baa1489d9..e6e7e6d4eb6aa88a661213742f48cc2d6608e76f 100644 --- a/tezt/lib_tezos/websocket.ml +++ b/tezt/lib_tezos/websocket.ml @@ -10,6 +10,8 @@ type t = {process : Process.t; stdin : Lwt_io.output_channel} exception Could_not_connect +exception Connection_closed + let get_unique_name = let name_counts = ref String_map.empty in fun name -> @@ -35,7 +37,7 @@ let connect ?runner ?hooks ?name url = in return {process; stdin} -let send_msg {stdin; _} msg = +let send_raw {stdin; _} msg = let* () = Lwt_io.write stdin (msg ^ "\n") in unit @@ -46,7 +48,7 @@ let read_json ~origin {process; _} = let rec loop () = let* line = Lwt_io.read_line_opt ch in match line with - | None -> failwith "No response on websocket" + | None -> raise Connection_closed | Some line -> ( Buffer.add_string buff line ; match JSON.parse_opt ~origin (Buffer.contents buff) with @@ -77,7 +79,7 @@ let send = (Process.name ws.process) !cpt msg ; - send_msg ws msg + send_raw ws msg let recv = let cpt = ref 0 in diff --git a/tezt/lib_tezos/websocket.mli b/tezt/lib_tezos/websocket.mli index 965ef2af00650bdd78981d1ba5d74c61a4db7189..0002c7738b489425a9b4651bc1bf6c9f86debfc3 100644 --- a/tezt/lib_tezos/websocket.mli +++ b/tezt/lib_tezos/websocket.mli @@ -8,6 +8,8 @@ exception Could_not_connect +exception Connection_closed + (** Type of a websocket client *) type t @@ -26,6 +28,9 @@ val close : t -> unit Lwt.t (** Send a JSON object on the websocket. *) val send : t -> JSON.t -> unit Lwt.t +(** Send a raw string on the websocket. *) +val send_raw : t -> string -> unit Lwt.t + (** Receive a JSON object on the websocket. *) val recv : t -> JSON.t Lwt.t diff --git a/websocket/core/websocket.ml b/websocket/core/websocket.ml index 86ec1311f32a64a4e77e382e77155f9fc18d193b..222e577b022f1a2fcd6b63fd5717396dfce2f650 100644 --- a/websocket/core/websocket.ml +++ b/websocket/core/websocket.ml @@ -155,6 +155,7 @@ module type S = sig type mode = Client of (int -> string) | Server val make_read_frame : + ?max_len:int -> ?buf:Buffer.t -> mode:mode -> IO.ic -> IO.oc -> unit -> Frame.t IO.t val write_frame_to_buf : mode:mode -> Buffer.t -> Frame.t -> unit @@ -177,6 +178,7 @@ module type S = sig type t val create : + ?max_len:int -> ?read_buf:Buffer.t -> ?write_buf:Buffer.t -> Cohttp.Request.t -> @@ -265,7 +267,7 @@ module Make (IO : Cohttp.S.IO) = struct write_frame_to_buf ~mode buf @@ Frame.close code; write oc @@ Buffer.contents buf - let read_frame ic oc buf mode hdr = + let read_frame ?max_len ic oc buf mode hdr = let hdr_part1 = EndianString.BigEndian.get_int8 hdr 0 in let hdr_part2 = EndianString.BigEndian.get_int8 hdr 1 in let final = is_bit_set 7 hdr_part1 in @@ -291,6 +293,10 @@ module Make (IO : Cohttp.S.IO) = struct else if extension <> 0 then close_with_code mode buf oc 1002 >>= fun () -> proto_error "unsupported extension" + else if (match max_len with Some max -> payload_len > max | None -> false) + then + close_with_code mode buf oc 1009 >>= fun () -> + proto_error "frame payload too big" else if Frame.Opcode.is_ctrl opcode && payload_len > 125 then close_with_code mode buf oc 1002 >>= fun () -> proto_error "control frame too big" @@ -315,11 +321,11 @@ module Make (IO : Cohttp.S.IO) = struct let frame = Frame.of_bytes ~opcode ~extension ~final payload in return frame) - let make_read_frame ?(buf = Buffer.create 128) ~mode ic oc () = + let make_read_frame ?max_len ?(buf = Buffer.create 128) ~mode ic oc () = Buffer.clear buf; read_exactly ic 2 buf >>= function | None -> raise End_of_file - | Some hdr -> read_frame ic oc buf mode hdr + | Some hdr -> read_frame ?max_len ic oc buf mode hdr module Request = Cohttp.Request.Make (IO) module Response = Cohttp.Response.Make (IO) @@ -337,9 +343,9 @@ module Make (IO : Cohttp.S.IO) = struct let source { endp; _ } = endp - let create ?read_buf ?(write_buf = Buffer.create 128) http_request endp ic + let create ?max_len ?read_buf ?(write_buf = Buffer.create 128) http_request endp ic oc = - let read_frame = make_read_frame ?buf:read_buf ~mode:Server ic oc in + let read_frame = make_read_frame ?max_len ?buf:read_buf ~mode:Server ic oc in { buffer = write_buf; endp; diff --git a/websocket/core/websocket.mli b/websocket/core/websocket.mli index 04f1689646bc8637a7f77c76a921c80ab9ccb463..359761b34fcbbeddbbed4cf06ad6bc6eba0d7ffa 100644 --- a/websocket/core/websocket.mli +++ b/websocket/core/websocket.mli @@ -84,6 +84,7 @@ module type S = sig type mode = Client of (int -> string) | Server val make_read_frame : + ?max_len:int -> ?buf:Buffer.t -> mode:mode -> IO.ic -> IO.oc -> unit -> Frame.t IO.t val write_frame_to_buf : mode:mode -> Buffer.t -> Frame.t -> unit @@ -106,6 +107,7 @@ module type S = sig type t val create : + ?max_len:int -> ?read_buf:Buffer.t -> ?write_buf:Buffer.t -> Cohttp.Request.t -> diff --git a/websocket/lwt/websocket_cohttp_lwt.ml b/websocket/lwt/websocket_cohttp_lwt.ml index f3b5b9ca03b817e9af7e695aeccc3c0ccfdd1d77..21376b94361633ffba1b3f5d51e7200f25419337 100644 --- a/websocket/lwt/websocket_cohttp_lwt.ml +++ b/websocket/lwt/websocket_cohttp_lwt.ml @@ -29,12 +29,12 @@ let send_frames stream oc = in Lwt_stream.iter_s send_frame stream -let read_frames ic oc handler_fn = - let read_frame = Lwt_IO.make_read_frame ~mode:Server ic oc in +let read_frames ?max_len ic oc handler_fn = + let read_frame = Lwt_IO.make_read_frame ?max_len ~mode:Server ic oc in let rec inner () = read_frame () >>= Lwt.wrap1 handler_fn >>= inner in inner () -let upgrade_connection request incoming_handler io_error_handler = +let upgrade_connection ?max_frame_length request incoming_handler io_error_handler = let headers = Cohttp.Request.headers request in (match Cohttp.Header.get headers "sec-websocket-key" with | None -> @@ -61,7 +61,7 @@ let upgrade_connection request incoming_handler io_error_handler = [ (* input: data from the client is read from the input channel * of the tcp connection; pass it to handler function *) - read_frames ic oc incoming_handler; + read_frames ?max_len:max_frame_length ic oc incoming_handler; (* output: data for the client is written to the output * channel of the tcp connection *) send_frames frames_out_stream oc; diff --git a/websocket/lwt/websocket_cohttp_lwt.mli b/websocket/lwt/websocket_cohttp_lwt.mli index af333b7a48a6c3f388e56b5388004c16ba35552a..22f3f32339955ddecd39c668768c0d6c4f4f0768 100644 --- a/websocket/lwt/websocket_cohttp_lwt.mli +++ b/websocket/lwt/websocket_cohttp_lwt.mli @@ -19,6 +19,7 @@ open Websocket val upgrade_connection : + ?max_frame_length:int -> Cohttp.Request.t -> (Frame.t -> unit) -> (exn -> unit Lwt.t) -> diff --git a/websocket/lwt/websocket_lwt_unix.ml b/websocket/lwt/websocket_lwt_unix.ml index 4080aa7b7bde85a142b3d97f09329fdeeef2661c..608509b219ae4545b86e9a0be66f88f29a880c63 100644 --- a/websocket/lwt/websocket_lwt_unix.ml +++ b/websocket/lwt/websocket_lwt_unix.ml @@ -101,11 +101,12 @@ let write { write_frame; _ } frame = write_frame frame let close_transport { oc; _ } = Lwt_io.close oc let connect ?(extra_headers = Cohttp.Header.init ()) + ?max_frame_length ?(random_string = Websocket.Rng.init ()) ?(ctx = Lazy.force Conduit_lwt_unix.default_ctx) ?buf client url = let nonce = Base64.encode_exn (random_string 16) in connect ctx client url nonce extra_headers >|= fun (ic, oc) -> - let read_frame = make_read_frame ?buf ~mode:(Client random_string) ic oc in + let read_frame = make_read_frame ?max_len:max_frame_length ?buf ~mode:(Client random_string) ic oc in let read_frame () = Lwt.catch read_frame (fun exn -> Lwt.async (fun () -> Lwt_io.close ic); diff --git a/websocket/lwt/websocket_lwt_unix.mli b/websocket/lwt/websocket_lwt_unix.mli index 2ad273690240b821c82dbeec4383093d869acfc0..53a342e2bd81b0db40e8d457039a4414efc4631b 100644 --- a/websocket/lwt/websocket_lwt_unix.mli +++ b/websocket/lwt/websocket_lwt_unix.mli @@ -39,6 +39,7 @@ val close_transport : conn -> unit Lwt.t val connect : ?extra_headers:Cohttp.Header.t -> + ?max_frame_length:int -> ?random_string:(int -> string) -> ?ctx:Conduit_lwt_unix.ctx -> ?buf:Buffer.t -> diff --git a/websocket/patches/0002-Ocaml-websocket-allow-to-provide-a-maximum-frame-len.patch b/websocket/patches/0002-Ocaml-websocket-allow-to-provide-a-maximum-frame-len.patch new file mode 100644 index 0000000000000000000000000000000000000000..b7aaec477aebd76e017314db0795ed8be4dc7c89 --- /dev/null +++ b/websocket/patches/0002-Ocaml-websocket-allow-to-provide-a-maximum-frame-len.patch @@ -0,0 +1,173 @@ +From 35e2ba071a2e0ac1fb39e58f71010200734a796f Mon Sep 17 00:00:00 2001 +From: Alain Mebsout +Date: Sun, 22 Dec 2024 12:45:14 +0100 +Subject: [PATCH 1/1] Ocaml-websocket: allow to provide a maximum frame length + +--- + websocket/core/websocket.ml | 16 +++++++++++----- + websocket/core/websocket.mli | 2 ++ + websocket/lwt/websocket_cohttp_lwt.ml | 8 ++++---- + websocket/lwt/websocket_cohttp_lwt.mli | 1 + + websocket/lwt/websocket_lwt_unix.ml | 3 ++- + websocket/lwt/websocket_lwt_unix.mli | 1 + + 6 files changed, 21 insertions(+), 10 deletions(-) + +diff --git a/websocket/core/websocket.ml b/websocket/core/websocket.ml +index 86ec1311f32..222e577b022 100644 +--- a/websocket/core/websocket.ml ++++ b/websocket/core/websocket.ml +@@ -155,6 +155,7 @@ module type S = sig + type mode = Client of (int -> string) | Server + + val make_read_frame : ++ ?max_len:int -> + ?buf:Buffer.t -> mode:mode -> IO.ic -> IO.oc -> unit -> Frame.t IO.t + + val write_frame_to_buf : mode:mode -> Buffer.t -> Frame.t -> unit +@@ -177,6 +178,7 @@ module type S = sig + type t + + val create : ++ ?max_len:int -> + ?read_buf:Buffer.t -> + ?write_buf:Buffer.t -> + Cohttp.Request.t -> +@@ -265,7 +267,7 @@ module Make (IO : Cohttp.S.IO) = struct + write_frame_to_buf ~mode buf @@ Frame.close code; + write oc @@ Buffer.contents buf + +- let read_frame ic oc buf mode hdr = ++ let read_frame ?max_len ic oc buf mode hdr = + let hdr_part1 = EndianString.BigEndian.get_int8 hdr 0 in + let hdr_part2 = EndianString.BigEndian.get_int8 hdr 1 in + let final = is_bit_set 7 hdr_part1 in +@@ -291,6 +293,10 @@ module Make (IO : Cohttp.S.IO) = struct + else if extension <> 0 then + close_with_code mode buf oc 1002 >>= fun () -> + proto_error "unsupported extension" ++ else if (match max_len with Some max -> payload_len > max | None -> false) ++ then ++ close_with_code mode buf oc 1009 >>= fun () -> ++ proto_error "frame payload too big" + else if Frame.Opcode.is_ctrl opcode && payload_len > 125 then + close_with_code mode buf oc 1002 >>= fun () -> + proto_error "control frame too big" +@@ -315,11 +321,11 @@ module Make (IO : Cohttp.S.IO) = struct + let frame = Frame.of_bytes ~opcode ~extension ~final payload in + return frame) + +- let make_read_frame ?(buf = Buffer.create 128) ~mode ic oc () = ++ let make_read_frame ?max_len ?(buf = Buffer.create 128) ~mode ic oc () = + Buffer.clear buf; + read_exactly ic 2 buf >>= function + | None -> raise End_of_file +- | Some hdr -> read_frame ic oc buf mode hdr ++ | Some hdr -> read_frame ?max_len ic oc buf mode hdr + + module Request = Cohttp.Request.Make (IO) + module Response = Cohttp.Response.Make (IO) +@@ -337,9 +343,9 @@ module Make (IO : Cohttp.S.IO) = struct + + let source { endp; _ } = endp + +- let create ?read_buf ?(write_buf = Buffer.create 128) http_request endp ic ++ let create ?max_len ?read_buf ?(write_buf = Buffer.create 128) http_request endp ic + oc = +- let read_frame = make_read_frame ?buf:read_buf ~mode:Server ic oc in ++ let read_frame = make_read_frame ?max_len ?buf:read_buf ~mode:Server ic oc in + { + buffer = write_buf; + endp; +diff --git a/websocket/core/websocket.mli b/websocket/core/websocket.mli +index 04f1689646b..359761b34fc 100644 +--- a/websocket/core/websocket.mli ++++ b/websocket/core/websocket.mli +@@ -84,6 +84,7 @@ module type S = sig + type mode = Client of (int -> string) | Server + + val make_read_frame : ++ ?max_len:int -> + ?buf:Buffer.t -> mode:mode -> IO.ic -> IO.oc -> unit -> Frame.t IO.t + + val write_frame_to_buf : mode:mode -> Buffer.t -> Frame.t -> unit +@@ -106,6 +107,7 @@ module type S = sig + type t + + val create : ++ ?max_len:int -> + ?read_buf:Buffer.t -> + ?write_buf:Buffer.t -> + Cohttp.Request.t -> +diff --git a/websocket/lwt/websocket_cohttp_lwt.ml b/websocket/lwt/websocket_cohttp_lwt.ml +index f3b5b9ca03b..21376b94361 100644 +--- a/websocket/lwt/websocket_cohttp_lwt.ml ++++ b/websocket/lwt/websocket_cohttp_lwt.ml +@@ -29,12 +29,12 @@ let send_frames stream oc = + in + Lwt_stream.iter_s send_frame stream + +-let read_frames ic oc handler_fn = +- let read_frame = Lwt_IO.make_read_frame ~mode:Server ic oc in ++let read_frames ?max_len ic oc handler_fn = ++ let read_frame = Lwt_IO.make_read_frame ?max_len ~mode:Server ic oc in + let rec inner () = read_frame () >>= Lwt.wrap1 handler_fn >>= inner in + inner () + +-let upgrade_connection request incoming_handler io_error_handler = ++let upgrade_connection ?max_frame_length request incoming_handler io_error_handler = + let headers = Cohttp.Request.headers request in + (match Cohttp.Header.get headers "sec-websocket-key" with + | None -> +@@ -61,7 +61,7 @@ let upgrade_connection request incoming_handler io_error_handler = + [ + (* input: data from the client is read from the input channel + * of the tcp connection; pass it to handler function *) +- read_frames ic oc incoming_handler; ++ read_frames ?max_len:max_frame_length ic oc incoming_handler; + (* output: data for the client is written to the output + * channel of the tcp connection *) + send_frames frames_out_stream oc; +diff --git a/websocket/lwt/websocket_cohttp_lwt.mli b/websocket/lwt/websocket_cohttp_lwt.mli +index af333b7a48a..22f3f323399 100644 +--- a/websocket/lwt/websocket_cohttp_lwt.mli ++++ b/websocket/lwt/websocket_cohttp_lwt.mli +@@ -19,6 +19,7 @@ + open Websocket + + val upgrade_connection : ++ ?max_frame_length:int -> + Cohttp.Request.t -> + (Frame.t -> unit) -> + (exn -> unit Lwt.t) -> +diff --git a/websocket/lwt/websocket_lwt_unix.ml b/websocket/lwt/websocket_lwt_unix.ml +index 4080aa7b7bd..608509b219a 100644 +--- a/websocket/lwt/websocket_lwt_unix.ml ++++ b/websocket/lwt/websocket_lwt_unix.ml +@@ -101,11 +101,12 @@ let write { write_frame; _ } frame = write_frame frame + let close_transport { oc; _ } = Lwt_io.close oc + + let connect ?(extra_headers = Cohttp.Header.init ()) ++ ?max_frame_length + ?(random_string = Websocket.Rng.init ()) + ?(ctx = Lazy.force Conduit_lwt_unix.default_ctx) ?buf client url = + let nonce = Base64.encode_exn (random_string 16) in + connect ctx client url nonce extra_headers >|= fun (ic, oc) -> +- let read_frame = make_read_frame ?buf ~mode:(Client random_string) ic oc in ++ let read_frame = make_read_frame ?max_len:max_frame_length ?buf ~mode:(Client random_string) ic oc in + let read_frame () = + Lwt.catch read_frame (fun exn -> + Lwt.async (fun () -> Lwt_io.close ic); +diff --git a/websocket/lwt/websocket_lwt_unix.mli b/websocket/lwt/websocket_lwt_unix.mli +index 2ad27369024..53a342e2bd8 100644 +--- a/websocket/lwt/websocket_lwt_unix.mli ++++ b/websocket/lwt/websocket_lwt_unix.mli +@@ -39,6 +39,7 @@ val close_transport : conn -> unit Lwt.t + + val connect : + ?extra_headers:Cohttp.Header.t -> ++ ?max_frame_length:int -> + ?random_string:(int -> string) -> + ?ctx:Conduit_lwt_unix.ctx -> + ?buf:Buffer.t -> +-- +2.44.0