diff --git a/cohttp/cohttp-lwt-unix/src/io.ml b/cohttp/cohttp-lwt-unix/src/io.ml index 6deeeef841aa8aa59f666fb0e058335ffd7943f8..24e2d96694366720f891ac3b4b39032e94435067 100644 --- a/cohttp/cohttp-lwt-unix/src/io.ml +++ b/cohttp/cohttp-lwt-unix/src/io.ml @@ -83,3 +83,32 @@ let catch f = | ex -> Lwt.fail ex) let pp_error = Fmt.exn + +let wait_eof_or_closed conn ic sleep_fn = + let wait_for_cancel () = fst (Lwt.task ()) in + match (conn : Conduit_lwt_unix.flow) with + | Vchan _ -> wait_for_cancel () + | TCP {fd; _} | Domain_socket {fd; _} -> + let peek_buffer = Bytes.create 1 in + let has_recv_eof fd = + (* MSG_PEEK does not consume data from the stream and does not + impact normal read operations; a 0-byte result is reported as EOF *) + Lwt_unix.recv fd peek_buffer 0 1 Unix.[MSG_PEEK] >>= fun n -> + Lwt.return (n = 0) + in + let rec loop fd = + (* Calls [sleep_fn] to allow yielding control to the request handler *) + sleep_fn () >>= fun () -> + if Lwt_io.is_closed ic then + (* The connection was closed locally, so stop waiting for EOF. + The local side has closed the connection and is possibly still doing + some clean-up, which we should not interrupt. Instead, wait + until the promise for the request handling is resolved, after which this + promise will be cancelled. *) + wait_for_cancel () + else + has_recv_eof fd >>= function + | true -> Lwt.return_unit + | false -> loop fd + in + loop fd diff --git a/cohttp/cohttp-lwt/src/s.ml b/cohttp/cohttp-lwt/src/s.ml index 196435f759ba320b49ad1ed63dd62126dcb4e08c..068c61d999b52f37a1c91dbdd60d0b42b0f70304 100644 --- a/cohttp/cohttp-lwt/src/s.ml +++ b/cohttp/cohttp-lwt/src/s.ml @@ -13,6 +13,20 @@ module type IO = sig val catch : (unit -> 'a t) -> ('a, error) result t val pp_error : Format.formatter -> error -> unit + + (** [wait_eof_or_closed conn ic sleep_fn] waits for an EOF or a Closed status + on the input channel [ic]. This function is designed to be used in + [Lwt.pick] to run concurrently with the request handling from the input + channel. 
The function checks for EOF using [MSG_PEEK] on the input channel + without consuming data, thereby not disturbing the request handling. If + the connection is closed locally, Cohttp will stop waiting for EOF and + will wait for the promise to be cancelled. This function ensures that the + monitoring does not spin too quickly and uses CPU efficiently when the + input channel has read activity but the client is not reading it. + + [sleep_fn] is a caller-supplied function used to yield control periodically, + keeping Cohttp platform-independent. *) + val wait_eof_or_closed : conn -> ic -> (unit -> unit t) -> unit t end (** The [Net] module type defines how to connect to a remote node and close the @@ -157,14 +171,29 @@ module type Server = sig type t + (** [make_response_action] creates a set of callbacks used by Cohttp Server. + + - [callback] is called for each request received on a connection accepted by the server + socket. + - [conn_closed] if provided, will be called when the connection is closed, + e.g. when an EOF is received. + - [sleep_fn] if provided, will be used for periodic checks for EOF from + the peer. If this callback is not provided, Cohttp will not detect and + notify the client code about EOF received from the peer while the client code is + handling the new connection. This can lead to a resource leak if the + [callback] is designed to never resolve. If the connection is closed + locally, Cohttp will stop waiting for EOF and will wait for the promise to + be cancelled. 
*) val make_response_action : ?conn_closed:(conn -> unit) -> + ?sleep_fn:(unit -> unit Lwt.t) -> callback:(conn -> Cohttp.Request.t -> Body.t -> response_action Lwt.t) -> unit -> t val make_expert : ?conn_closed:(conn -> unit) -> + ?sleep_fn:(unit -> unit Lwt.t) -> callback: (conn -> Cohttp.Request.t -> @@ -175,6 +204,7 @@ module type Server = sig val make : ?conn_closed:(conn -> unit) -> + ?sleep_fn:(unit -> unit Lwt.t) -> callback: (conn -> Cohttp.Request.t -> Body.t -> (Cohttp.Response.t * Body.t) Lwt.t) -> unit -> diff --git a/cohttp/cohttp-lwt/src/server.ml b/cohttp/cohttp-lwt/src/server.ml index bffc23c0f90cc798f31fa9ee243b36872acc0adf..f03ced593a170a946b6b565ab4863c6c1367dd12 100644 --- a/cohttp/cohttp-lwt/src/server.ml +++ b/cohttp/cohttp-lwt/src/server.ml @@ -19,22 +19,23 @@ module Make (IO : S.IO) = struct type t = { callback : conn -> Cohttp.Request.t -> Body.t -> response_action Lwt.t; conn_closed : conn -> unit; + sleep_fn : (unit -> unit Lwt.t) option; } - let make_response_action ?(conn_closed = ignore) ~callback () = - {conn_closed; callback} + let make_response_action ?(conn_closed = ignore) ?sleep_fn ~callback () = + {conn_closed; callback; sleep_fn} - let make ?conn_closed ~callback () = + let make ?conn_closed ?sleep_fn ~callback () = let callback conn req body = callback conn req body >|= fun rsp -> `Response rsp in - make_response_action ?conn_closed ~callback () + make_response_action ?conn_closed ?sleep_fn ~callback () - let make_expert ?conn_closed ~callback () = + let make_expert ?conn_closed ?sleep_fn ~callback () = let callback conn req body = callback conn req body >|= fun rsp -> `Expert rsp in - make_response_action ?conn_closed ~callback () + make_response_action ?conn_closed ?sleep_fn ~callback () module Transfer_IO = Cohttp__Transfer_io.Make (IO) @@ -117,7 +118,9 @@ module Make (IO : S.IO) = struct `Response rsp)) (fun () -> Body.drain_body body) - let handle_response ~keep_alive oc res body conn_closed handle_client = + type 
conn_action = Call_conn_closed | Call_conn_closed_and_drain of Body.t + + let handle_response ~keep_alive oc res body handle_client = IO.catch (fun () -> let flush = Response.flush res in Response.write @@ -127,36 +130,29 @@ module Make (IO : S.IO) = struct oc) >>= function | Ok () -> - if keep_alive then handle_client oc - else - let () = conn_closed () in - Lwt.return_unit + if keep_alive then handle_client oc else Lwt.return Call_conn_closed | Error e -> Log.info (fun m -> m "IO error while writing body: %a" IO.pp_error e) ; - conn_closed () ; - Body.drain_body body + Lwt.return (Call_conn_closed_and_drain body) let rec handle_client ic oc conn spec = Request.read ic >>= function | `Eof -> - spec.conn_closed conn ; - Lwt.return_unit + Log.debug (fun m -> + m + "Got EOF while handling client: %s" + (Cohttp.Connection.to_string (snd conn))) ; + Lwt.return Call_conn_closed | `Invalid data -> Log.err (fun m -> m "invalid input %s while handling client" data) ; - spec.conn_closed conn ; - Lwt.return_unit + Lwt.return Call_conn_closed | `Ok req -> ( let body = read_body ic req in handle_request spec.callback conn req body >>= function | `Response (res, body) -> let keep_alive = Request.is_keep_alive req in - handle_response - ~keep_alive - oc - res - body - (fun () -> spec.conn_closed conn) - (fun oc -> handle_client ic oc conn spec) + handle_response ~keep_alive oc res body (fun oc -> + handle_client ic oc conn spec) | `Expert (res, io_handler) -> Response.write_header res oc >>= fun () -> io_handler ic oc >>= fun () -> handle_client ic oc conn spec) @@ -164,9 +160,29 @@ module Make (IO : S.IO) = struct let callback spec io_id ic oc = let conn_id = Cohttp.Connection.create () in let conn_closed () = spec.conn_closed (io_id, conn_id) in + let handle () = handle_client ic oc (io_id, conn_id) spec in + let is_conn_closed () = + (* Without a sleep function we cannot safely loop waiting for EOF: [sleep_fn] is what lets the wait yield to the request handler *) + match spec.sleep_fn with + | None -> fst (Lwt.task ()) (* never resolves on its own; wait to be 
cancelled by [Lwt.pick] *) + | Some sleep_fn -> + IO.wait_eof_or_closed io_id ic sleep_fn >>= fun () -> + Log.debug (fun m -> + m + "Client closed the connection, got EOF for %s" + (Cohttp.Connection.to_string conn_id)) ; + Lwt.return Call_conn_closed + in Lwt.catch (fun () -> - IO.catch (fun () -> handle_client ic oc (io_id, conn_id) spec) + IO.catch (fun () -> + Lwt.pick [handle (); is_conn_closed ()] >>= function + | Call_conn_closed -> + conn_closed () ; + Lwt.return_unit + | Call_conn_closed_and_drain body -> + conn_closed () ; + Body.drain_body body) >>= function | Ok () -> Lwt.return_unit | Error e ->