diff --git a/src/lib_p2p/p2p.ml b/src/lib_p2p/p2p.ml index dc335c99efd09c6b64d1ec897f13f4e7e673a5bc..2b040b71465644282ce36a726334b05d66fa3504 100644 --- a/src/lib_p2p/p2p.ml +++ b/src/lib_p2p/p2p.ml @@ -54,7 +54,7 @@ let create_scheduler limits = ?write_queue_size:limits.write_queue_size () -let create_connection_pool config limits meta_cfg log triggers = +let create_connection_pool ?fd_pool config limits meta_cfg log triggers = let open P2p_limits in let pool_cfg = { @@ -69,7 +69,7 @@ let create_connection_pool config limits meta_cfg log triggers = ip_greylist_cleanup_delay = limits.ip_greylist_cleanup_delay; } in - P2p_pool.create pool_cfg meta_cfg ~log triggers + P2p_pool.create ?fd_pool pool_cfg meta_cfg ~log triggers let create_connect_handler config limits pool msg_cfg conn_meta_cfg io_sched triggers log answerer = @@ -188,14 +188,16 @@ module Real = struct let pool net = P2p_connect_handler.get_pool net.connect_handler - let create ~config ~limits ?received_msg_hook ?sent_msg_hook + let create ?fd_pool ~config ~limits ?received_msg_hook ?sent_msg_hook ?broadcasted_msg_hook meta_cfg msg_cfg conn_meta_cfg = let open Lwt_result_syntax in let io_sched = create_scheduler limits in let watcher = Lwt_watcher.create_input () in let log event = Lwt_watcher.notify watcher event in let triggers = P2p_trigger.create () in - let*! pool = create_connection_pool config limits meta_cfg log triggers in + let*! pool = + create_connection_pool ?fd_pool config limits meta_cfg log triggers + in (* There is a mutual recursion between an answerer and connect_handler, for the default answerer. Because of the swap request mechanism, the default answerer needs to initiate new connections using the @@ -601,12 +603,13 @@ let check_limits = in return_unit -let create ~config ~limits ?received_msg_hook ?sent_msg_hook +let create ?fd_pool ~config ~limits ?received_msg_hook ?sent_msg_hook ?broadcasted_msg_hook peer_cfg conn_cfg msg_cfg = let open Lwt_result_syntax in let*? () = check_limits limits in let* net = Real.create + ?fd_pool ~config ~limits ?received_msg_hook diff --git a/src/lib_p2p/p2p.mli b/src/lib_p2p/p2p.mli index f6d083b3e0c13a2e552c21ea1d3a6f18cba13967..489f660ce8e27cfca3e76cc69d034efdd48c89f7 100644 --- a/src/lib_p2p/p2p.mli +++ b/src/lib_p2p/p2p.mli @@ -150,6 +150,7 @@ val faked_network : ['msg] is broadcasted. *) val create : + ?fd_pool:P2p_fd.fd_pool -> config:config -> limits:Tezos_p2p_services.P2p_limits.t -> ?received_msg_hook:(('msg, 'peer_meta, 'conn_meta) connection -> 'msg -> unit) -> diff --git a/src/lib_p2p/p2p_connect_handler.ml b/src/lib_p2p/p2p_connect_handler.ml index 8a3ac05095150ded1ee1a305e062e3c6cd8ce960..3a167c5a83ec18e5622838fea4dd2cc90918f0a1 100644 --- a/src/lib_p2p/p2p_connect_handler.ml +++ b/src/lib_p2p/p2p_connect_handler.ml @@ -719,7 +719,8 @@ let connect ?trusted ?expected_peer_id ?timeout t point = let* () = fail_unless_disconnected_point point_info in let timestamp = Time.System.now () in P2p_point_state.set_requested ~timestamp point_info canceler ; - let*! fd = P2p_fd.socket () in + let fd_pool = P2p_pool.get_fd_pool t.pool in + let* fd = P2p_fd.socket ?fd_pool () in P2p_fd.set_point ~point fd ; let uaddr = Lwt_unix.ADDR_INET (Ipaddr_unix.V6.to_inet_addr addr, port) in let*! () = Events.(emit connect_status) ("start", point) in diff --git a/src/lib_p2p/p2p_fd.ml b/src/lib_p2p/p2p_fd.ml index 5ee6d98a88ec8be0537d96e5d6ed4c5b4865a930..0baef60b4ff4c335eeeb43207e42d361ca88ddcb 100644 --- a/src/lib_p2p/p2p_fd.ml +++ b/src/lib_p2p/p2p_fd.ml @@ -36,7 +36,9 @@ type listening_socket_open_failure = { port : int; } -type error += Failed_to_open_listening_socket of listening_socket_open_failure +type error += + | Failed_to_open_listening_socket of listening_socket_open_failure + | Full_waiting_queue let () = register_error_kind @@ -76,7 +78,20 @@ let () = Some (reason, address, port) | _ -> None) (fun (reason, address, port) -> - Failed_to_open_listening_socket {reason; address; port}) + Failed_to_open_listening_socket {reason; address; port}) ; + let description = + "Not only the maximum number of connection has been reached, but the \ + waiting queue for connections is already full." + in + register_error_kind + `Permanent + ~id:"p2p.fd.Full_waiting_queue" + ~title:"Connection rejected since waiting queue is full" + ~description + ~pp:(fun ppf () -> Format.fprintf ppf "%s" description) + Data_encoding.empty + (function Full_waiting_queue -> Some () | _ -> None) + (fun () -> Full_waiting_queue) let is_not_windows = Sys.os_type <> "Win32" @@ -103,11 +118,21 @@ type connect_error = | unexpected_error ] type accept_error = - [`System_error of exn | `Socket_error of exn | unexpected_error] + [ `System_error of exn + | `Socket_error of exn + | `Full_waiting_queue + | unexpected_error ] + +let accept_error_to_tzerror = function + | `System_error exn | `Socket_error exn | `Unexpected_error exn -> + error_of_exn exn + | `Full_waiting_queue -> Full_waiting_queue type t = { fd : Lwt_unix.file_descr; id : int; + wakener : unit Lwt.u option; + (** Used to put the token back in the fd_pool after closing the file descriptor. *) mutable point : P2p_point.Id.t option; mutable peer_id : P2p_peer.Id.t option; mutable nread : int; @@ -115,6 +140,10 @@ type t = { mutable closing_reasons : P2p_disconnection_reason.t list; } +type fd_pool = unit Lwt_pool.t + +let create_fd_pool ~capacity = Lwt_pool.create capacity Lwt.return + let set_point ~point t = t.point <- Some point let set_peer_id ~peer_id t = t.peer_id <- Some peer_id @@ -138,8 +167,26 @@ let pp_read_write_error fmt = function let create = let counter = ref 0 in - fun fd -> - let open Lwt_syntax in + fun ?fd_pool fd -> + let open Lwt_result_syntax in + let* wakener = + match fd_pool with + | None -> return None + | Some fd_pool -> + (* [1000] is quite arbitrary. It is chosen to be in the same range as + the typical pools. The pools are expected to be around 1000 + as the number of opened file descriptors on Linux is limited to 1024. *) + if Lwt_pool.wait_queue_length fd_pool > 1000 then + Lwt_result.fail `Full_waiting_queue + else + let p, u = Lwt.task () in + (* We grab an element of the pool, which will be freed only when + [u] will be waken up. *) + let _ : unit Lwt.t = Lwt_pool.use fd_pool (fun () -> p) in + (* We wait until the pool is not full. *) + let* () = Lwt_pool.use fd_pool return in + return (Some u) + in incr counter ; let t = { @@ -150,9 +197,10 @@ let create = nread = 0; nwrit = 0; closing_reasons = []; + wakener; } in - let* () = Events.(emit create_fd) t.id in + let*! () = Events.(emit create_fd) t.id in return t let string_of_sockaddr addr = @@ -258,10 +306,12 @@ let raw_socket () = let* sock = socket_setopt_user_timeout sock in socket_setopt_keepalive sock -let socket () = +let socket ?fd_pool () = let open Lwt_syntax in let* socket = raw_socket () in - create socket + Lwt_result.map_error + (fun `Full_waiting_queue -> [Full_waiting_queue]) + (create ?fd_pool socket) let create_listening_socket ?(reuse_port = false) ~backlog ?(addr = Ipaddr.V6.unspecified) port = @@ -326,7 +376,12 @@ let close ?reason t = Lwt.catch (fun () -> (* Guarantee idempotency. *) - if Lwt_unix.state t.fd = Closed then return_unit else close_socket t.fd) + if Lwt_unix.state t.fd = Closed then return_unit + else + let open Lwt_syntax in + let* () = close_socket t.fd in + Option.iter (fun wakener -> Lwt.wakeup wakener ()) t.wakener ; + return_unit) (function (* From [man 2 close] [close] can fail with [EBADF, EINTR, EIO, ENOSPC, EDQUOT] Unix errors. @@ -439,7 +494,7 @@ let connect t saddr = in Lwt.return_error (`Unexpected_error ex)) -let accept listening_socket = +let accept ?fd_pool listening_socket = let open Lwt_result_syntax in let* fd, saddr = Lwt.catch @@ -482,7 +537,7 @@ let accept listening_socket = Lwt.return_error @@ `Socket_error e | ex -> Lwt.return_error @@ `Unexpected_error ex) in - let*! t = create fd in + let* t = create ?fd_pool fd in let*! () = Events.(emit accept_fd) (t.id, string_of_sockaddr saddr) in return (t, saddr) diff --git a/src/lib_p2p/p2p_fd.mli b/src/lib_p2p/p2p_fd.mli index db5cff2c8ca7cc887d74687ade0c5918d7a158ee..7194f8e327b152b02a6676b38eb7bb8860ddbba4 100644 --- a/src/lib_p2p/p2p_fd.mli +++ b/src/lib_p2p/p2p_fd.mli @@ -32,7 +32,9 @@ type listening_socket_open_failure = { (** Type of an error in case of the listening socket fails to open. *) -type error += Failed_to_open_listening_socket of listening_socket_open_failure +type error += + | Failed_to_open_listening_socket of listening_socket_open_failure + | Full_waiting_queue (** This module defines a type [t] which wraps a file descriptor. Most functions simply call the underlying file descriptor function and generate @@ -66,7 +68,7 @@ type read_write_error = | `Connection_locally_closed | unexpected_error ] -(** +(** - Unreachable can be issued when we don't know how to reach to this address @@ -77,7 +79,7 @@ type read_write_error = the behaviour of the firewall: If the firewall drops packets, it will be canceled because of a timeout, otherwise it can refuse the connection and we get this error. - + *) type connect_error = @@ -101,7 +103,12 @@ type connect_error = For most socket-errors you can just log them and call accept again to be ready for the next connection. *) type accept_error = - [`System_error of exn | `Socket_error of exn | unexpected_error] + [ `System_error of exn + | `Socket_error of exn + | `Full_waiting_queue + | unexpected_error ] + +val accept_error_to_tzerror : accept_error -> error (** Pretty printer for read_write_error. *) val pp_read_write_error : Format.formatter -> read_write_error -> unit @@ -136,8 +143,19 @@ val read : t -> Bytes.t -> int -> int -> (int, read_write_error) result Lwt.t underlying socket has been closed. *) val write : t -> Bytes.t -> (unit, read_write_error) result Lwt.t -(** Returns a fresh fd. This call always succeed. *) -val socket : unit -> t Lwt.t +(** [fd_pool] is the type of file descriptors pool used to control the number + of connections opened simultaneously. *) +type fd_pool + +(** [create_fd_pool capacity] creates a file descriptors pool accepting at most + [capacity] connections. *) +val create_fd_pool : capacity:int -> fd_pool + +(** [socket ?fd_pool ()] returns a fresh fd. + If [fd_pool] is [None], this call succeeds immediatly. + If [fd_pool] is [Some p], it waits until [p] has a free element which + can be taken, and succeds. *) +val socket : ?fd_pool:fd_pool -> unit -> t tzresult Lwt.t (** [create_listening_socket ?reuse_port ~backlog ?addr port] creates a socket that listens on [addr] or [Ipaddr.V6.unspecified] if @@ -159,8 +177,13 @@ val create_listening_socket : closed. *) val connect : t -> Lwt_unix.sockaddr -> (unit, connect_error) result Lwt.t -(** [accept sock] accepts connections on socket [sock]. *) +(** [accept ?fd_pool sock] accepts connections on socket [sock]. + If [fd_pool] is [None], it does not perform more checks. + If [fd_pool] is [Some p], it waits until [p] has a free element which + can be taken, and succeeds. *) val accept : - Lwt_unix.file_descr -> (t * Lwt_unix.sockaddr, accept_error) result Lwt.t + ?fd_pool:fd_pool -> + Lwt_unix.file_descr -> + (t * Lwt_unix.sockaddr, accept_error) result Lwt.t module Table : Hashtbl.S with type key = t diff --git a/src/lib_p2p/p2p_pool.ml b/src/lib_p2p/p2p_pool.ml index 722640bccda5bb23b2fe0925b6ac6d9dec1605a9..a9c6a12d9bf2fa22d861716c5d3a54ff50dadd3c 100644 --- a/src/lib_p2p/p2p_pool.ml +++ b/src/lib_p2p/p2p_pool.ml @@ -73,8 +73,11 @@ type ('msg, 'peer, 'conn) t = { triggers : P2p_trigger.t; log : P2p_connection.P2p_event.t -> unit; acl : P2p_acl.t; + fd_pool : P2p_fd.fd_pool option; } +let get_fd_pool t = t.fd_pool + module Gc_point_set = Bounded_heap.Make (struct type t = Time.System.t * P2p_point.Id.t @@ -551,7 +554,7 @@ let score {peer_meta_config = {score; _}; _} meta = score meta let active_connections pool = P2p_peer.Table.length pool.connected_peer_ids -let create config peer_meta_config triggers ~log = +let create ?fd_pool config peer_meta_config triggers ~log = let open Lwt_syntax in let pool = { @@ -569,6 +572,7 @@ let create config peer_meta_config triggers ~log = ~ip_size:config.ip_greylist_size_in_kilobytes ~ip_cleanup_delay:config.ip_greylist_cleanup_delay; log; + fd_pool; } in List.iter (Points.set_trusted pool) config.trusted_points ; @@ -743,7 +747,7 @@ module Internal_for_tests = struct ip_greylist_cleanup_delay; } - let create peer_meta_encoding initial_peer = + let create ?fd_pool peer_meta_encoding initial_peer = let triggers = P2p_trigger.create () in let config = dumb_config in let peer_meta_config : 'peer P2p_params.peer_meta_config = @@ -770,5 +774,6 @@ module Internal_for_tests = struct ~ip_size:config.ip_greylist_size_in_kilobytes ~ip_cleanup_delay:config.ip_greylist_cleanup_delay; log; + fd_pool; } end diff --git a/src/lib_p2p/p2p_pool.mli b/src/lib_p2p/p2p_pool.mli index 2870df1c52b3114bc5ac99de2dcadd5a22fb1b4d..f2eb640f01f95a892ae9bd618903dcf1a0bc3398 100644 --- a/src/lib_p2p/p2p_pool.mli +++ b/src/lib_p2p/p2p_pool.mli @@ -72,6 +72,7 @@ type config = { } val create : + ?fd_pool:P2p_fd.fd_pool -> config -> 'peer P2p_params.peer_meta_config -> P2p_trigger.t -> @@ -429,10 +430,19 @@ val add_to_id_points : ('msg, 'peer, 'conn) t -> P2p_point.Id.t -> unit val set_expected_peer_id : ('msg, 'peer, 'conn) t -> P2p_point.Id.t -> P2p_peer.Id.t -> unit Lwt.t +(** [get_fd_pool t] returns the file descriptors pool contained in [t], if any.*) +val get_fd_pool : ('msg, 'peer, 'conn) t -> P2p_fd.fd_pool option + (**/**) module Internal_for_tests : sig - (** [create peer_encoding peer] returns a pool. [peer_encoding] and [peer] are needed as some functions of [P2p_pool] - return it. *) - val create : 'peer Data_encoding.t -> 'peer -> ('msg, 'peer, 'conn) t + (** [create ?fd_pool peer_encoding peer] returns a pool. + [peer_encoding] and [peer] are needed as some functions of [P2p_pool] + return it. + [fd_pool] is the field containing the file descriptors pool of the constructed pool.*) + val create : + ?fd_pool:P2p_fd.fd_pool -> + 'peer Data_encoding.t -> + 'peer -> + ('msg, 'peer, 'conn) t end diff --git a/src/lib_p2p/p2p_socket.ml b/src/lib_p2p/p2p_socket.ml index 9b5f9980da5e74183f30b0187c91c44ed5636f58..4dd172b33d4f44ba1824cbece518da98ddb339f5 100644 --- a/src/lib_p2p/p2p_socket.ml +++ b/src/lib_p2p/p2p_socket.ml @@ -908,7 +908,11 @@ module Internal_for_tests = struct } in let scheduled_conn = - let f2d_t = Tezos_base_unix.Event_loop.main_run P2p_fd.socket in + let socket = Tezos_base_unix.Event_loop.main_run P2p_fd.socket in + let f2d_t = + match socket with Ok socket -> socket | Error _ -> assert false + (* [P2p_fd.socket] cannot fail when not given a fd_pool. *) + in P2p_io_scheduler.register (P2p_io_scheduler.create ~read_buffer_size:0 ()) f2d_t diff --git a/src/lib_p2p/p2p_welcome.ml b/src/lib_p2p/p2p_welcome.ml index abd7ec1655b2836f9bd083795d42e44a0fedc725..7c21a69c5539272076a7bc38a291f3597bc894c0 100644 --- a/src/lib_p2p/p2p_welcome.ml +++ b/src/lib_p2p/p2p_welcome.ml @@ -47,7 +47,10 @@ let accept st = let* () = Lwt_mutex.lock accept_lock in let* res = protect @@ fun () -> - let* r = P2p_fd.accept st.socket in + let fd_pool = + P2p_pool.get_fd_pool @@ P2p_connect_handler.get_pool connect_handler + in + let* r = P2p_fd.accept ?fd_pool st.socket in Result.fold ~ok:(fun (fd, addr) -> let point = @@ -77,10 +80,7 @@ let accept st = (TzTrace.make (error_of_exn ex), "socket") in Lwt.return (Ok ()) - | `Unexpected_error ex -> - (* TODO: https://gitlab.com/tezos/tezos/-/issues/5632 - Losing some information here... *) - Lwt.return_error (TzTrace.make (error_of_exn ex))) + | ex -> Lwt_result_syntax.tzfail (P2p_fd.accept_error_to_tzerror ex)) r in Lwt_mutex.unlock accept_lock ; diff --git a/src/lib_p2p/test/common/p2p_test_utils.ml b/src/lib_p2p/test/common/p2p_test_utils.ml index 282858cbcdfd0af873fa91cf5aac57303ded5857..c9f47469d661ec178f39c6750ed6a589fea795f7 100644 --- a/src/lib_p2p/test/common/p2p_test_utils.ml +++ b/src/lib_p2p/test/common/p2p_test_utils.ml @@ -264,8 +264,7 @@ let raw_accept sched main_socket = let open Lwt_syntax in let* r = P2p_fd.accept main_socket in match r with - | Error (`Socket_error ex | `System_error ex | `Unexpected_error ex) -> - Lwt.fail ex + | Error e -> Lwt_result_syntax.tzfail (P2p_fd.accept_error_to_tzerror e) | Ok (fd, sockaddr) -> let fd = P2p_io_scheduler.register sched fd in let point = @@ -285,8 +284,7 @@ let accept ?(id = id1) ?(proof_of_work_target = proof_of_work_target) sched let* r = raw_accept sched main_socket in let* id1 = id in match r with - | Error (`Socket_error ex | `System_error ex | `Unexpected_error ex) -> - Lwt.fail ex + | Error e -> Lwt_result.fail e | Ok (fd, point) -> P2p_socket.authenticate ~canceler @@ -300,11 +298,11 @@ let accept ?(id = id1) ?(proof_of_work_target = proof_of_work_target) sched let raw_connect sched addr port = let open Lwt_result_syntax in - let*! fd = P2p_fd.socket () in + let* fd = Lwt_result.map_error Either.right (P2p_fd.socket ()) in let uaddr = Lwt_unix.ADDR_INET (Ipaddr_unix.V6.to_inet_addr addr, port) in - let* () = P2p_fd.connect fd uaddr in + let* () = Lwt_result.map_error Either.left (P2p_fd.connect fd uaddr) in let fd = P2p_io_scheduler.register sched fd in - Lwt.return_ok fd + return fd exception Cant_connect @@ -316,9 +314,11 @@ let connect ?(proof_of_work_target = proof_of_work_target) sched addr port id = let*! r = raw_connect sched addr port in match r with | Error - ( `Network_unreachable | `Connection_unreachable | `Connection_refused - | `Connection_canceled | `Unexpected_error _ ) -> + (Left + ( `Network_unreachable | `Connection_unreachable | `Connection_refused + | `Connection_canceled | `Unexpected_error _ )) -> Lwt.fail Cant_connect + | Error (Right e) -> Lwt_result.fail e | Ok fd -> P2p_socket.authenticate ~canceler diff --git a/src/lib_p2p/test/common/p2p_test_utils.mli b/src/lib_p2p/test/common/p2p_test_utils.mli index ec868525c1f1543df3f0b672d41e1a0cdf3116da..894dd95ddd85d629e0040ac36e14077dd8c63824 100644 --- a/src/lib_p2p/test/common/p2p_test_utils.mli +++ b/src/lib_p2p/test/common/p2p_test_utils.mli @@ -111,8 +111,7 @@ val run_nodes_fd : val raw_accept : P2p_io_scheduler.t -> Lwt_unix.file_descr -> - (P2p_io_scheduler.connection * (P2p_addr.t * int), P2p_fd.accept_error) result - Lwt.t + (P2p_io_scheduler.connection * (P2p_addr.t * int)) tzresult Lwt.t val accept : ?id:P2p_identity.t Lwt.t -> @@ -128,7 +127,8 @@ val raw_connect : P2p_io_scheduler.t -> P2p_addr.t -> int -> - (P2p_io_scheduler.connection, P2p_fd.connect_error) result Lwt.t + (P2p_io_scheduler.connection, (P2p_fd.connect_error, tztrace) Either.t) result + Lwt.t val connect : ?proof_of_work_target:Tezos_crypto.Crypto_box.pow_target -> diff --git a/src/lib_p2p/test/test_p2p_connect_handler.ml b/src/lib_p2p/test/test_p2p_connect_handler.ml index 349d3fb98dd948a53e2b31937ccc21a236e0ce32..9144cfc4cb9bc213dbba6591e54d52f22f233505 100644 --- a/src/lib_p2p/test/test_p2p_connect_handler.ml +++ b/src/lib_p2p/test/test_p2p_connect_handler.ml @@ -84,7 +84,7 @@ let test_correct_incoming_connection_number = (`Make_default_pool ()) (`Dependencies dependencies) in - let*! fd = P2p_fd.socket () in + let* fd = P2p_fd.socket () in Alcotest.( check' int diff --git a/src/lib_p2p/test/test_p2p_io_scheduler.ml b/src/lib_p2p/test/test_p2p_io_scheduler.ml index 2734734878827e4ab43a8ce5859cd0a6b72135ef..6c03e99b413d16b75991194d30c6cf7fb05ecdca 100644 --- a/src/lib_p2p/test/test_p2p_io_scheduler.ml +++ b/src/lib_p2p/test/test_p2p_io_scheduler.ml @@ -50,15 +50,14 @@ let rec listen ?port addr = | Error err -> Lwt.return_error err let accept main_socket = - let open Lwt_syntax in - let* r = P2p_fd.accept main_socket in + let open Lwt_result_syntax in + let*! r = P2p_fd.accept main_socket in match r with - | Error (`Socket_error ex | `System_error ex | `Unexpected_error ex) -> - Lwt.fail ex - | Ok (fd, _) -> Lwt.return fd + | Error e -> tzfail (P2p_fd.accept_error_to_tzerror e) + | Ok (fd, _) -> return fd let rec accept_n main_socket n = - let open Lwt_syntax in + let open Lwt_result_syntax in if n <= 0 then return_nil else let* acc = accept_n main_socket (n - 1) in @@ -68,6 +67,13 @@ let rec accept_n main_socket n = let connect addr port = let open Lwt_syntax in let* fd = P2p_fd.socket () in + let fd = + match fd with + | Ok fd -> fd + | Error _ -> + assert + false (* [P2p_fd.socket] cannot fail when not given a fd_semaphore. *) + in let uaddr = Lwt_unix.ADDR_INET (Ipaddr_unix.V6.to_inet_addr addr, port) in let* r = P2p_fd.connect fd uaddr in match r with @@ -107,7 +113,7 @@ let receive conn = let server ?(display_client_stat = true) ?max_download_speed ?read_queue_size ~read_buffer_size main_socket n = - let open Lwt_syntax in + let open Lwt_result_syntax in let sched = P2p_io_scheduler.create ?max_download_speed @@ -127,8 +133,10 @@ let server ?(display_client_stat = true) ?max_download_speed ?read_queue_size (* Accept and read message until the connection is closed. *) let* conns = accept_n main_socket n in let conns = List.map (P2p_io_scheduler.register sched) conns in - let* () = List.iter_p receive (List.map P2p_io_scheduler.to_readable conns) in - let* r = + let*! () = + List.iter_p receive (List.map P2p_io_scheduler.to_readable conns) + in + let*! r = List.iter_ep (P2p_io_scheduler.close ~reason:(User "shutdown from server")) conns @@ -136,7 +144,7 @@ let server ?(display_client_stat = true) ?max_download_speed ?read_queue_size match r with | Ok () -> Tezt.Log.debug "OK %a" P2p_stat.pp (P2p_io_scheduler.global_stat sched) ; - return_ok () + return () | Error _ -> Lwt.fail Alcotest.Test_error let max_size ?max_upload_speed () = diff --git a/src/lib_p2p/test/test_p2p_socket.ml b/src/lib_p2p/test/test_p2p_socket.ml index 808d10c266b812f713953b7365465d76df4f1845..c5ae7fbb7f1fa193fbb4eca1712a68909383fbce 100644 --- a/src/lib_p2p/test/test_p2p_socket.ml +++ b/src/lib_p2p/test/test_p2p_socket.ml @@ -224,10 +224,7 @@ module Low_level = struct let msg = Bytes.create (Bytes.length simple_msg) in let*! r = raw_connect sched addr port in match r with - | Error - ( `Network_unreachable | `Connection_unreachable | `Connection_refused - | `Connection_canceled | `Unexpected_error _ ) -> - Lwt.fail Alcotest.Test_error + | Error _ -> Lwt.fail Alcotest.Test_error | Ok fd -> let* () = P2p_buffer_reader.( @@ -241,8 +238,7 @@ module Low_level = struct let open Lwt_result_syntax in let*! r = raw_accept sched socket in match r with - | Error (`Socket_error ex | `System_error ex | `Unexpected_error ex) -> - Lwt.fail ex + | Error e -> fail e | Ok (fd, _point) -> let* () = P2p_io_scheduler.write fd simple_msg in let* () = sync ch in diff --git a/src/lib_p2p/tezt/test_p2p_fd.ml b/src/lib_p2p/tezt/test_p2p_fd.ml index cdfc6a294c8cd8afa8c9c4f365a88d0beb20285f..63660ae06fd466bed606d5b62a1028f927621a5d 100644 --- a/src/lib_p2p/tezt/test_p2p_fd.ml +++ b/src/lib_p2p/tezt/test_p2p_fd.ml @@ -37,13 +37,12 @@ let test_connect () = let server _ch listening_fd = let*! r = P2p_fd.accept listening_fd in match r with - | Error (`Socket_error ex | `System_error ex | `Unexpected_error ex) -> - Lwt.return_error (TzTrace.make (error_of_exn ex)) + | Error e -> tzfail (P2p_fd.accept_error_to_tzerror e) | Ok (_fd, _sockaddr) -> return_unit in let client _ch addr port = - let*! fd = P2p_fd.socket () in + let* fd = P2p_fd.socket () in let uaddr = Lwt_unix.ADDR_INET (Ipaddr_unix.V6.to_inet_addr addr, port) in let*! r = P2p_fd.connect fd uaddr in match r with @@ -80,8 +79,7 @@ let test_read_write () = let server data _ch listening_fd = let*! r = P2p_fd.accept listening_fd in match r with - | Error (`Socket_error ex | `System_error ex | `Unexpected_error ex) -> - Lwt.return_error (TzTrace.make (error_of_exn ex)) + | Error e -> tzfail (P2p_fd.accept_error_to_tzerror e) | Ok (fd, _sockaddr) -> ( let*! r = P2p_fd.write fd data in match r with @@ -95,7 +93,7 @@ let test_read_write () = in let client data _ch addr port = - let*! fd = P2p_fd.socket () in + let* fd = P2p_fd.socket () in let uaddr = Lwt_unix.ADDR_INET (Ipaddr_unix.V6.to_inet_addr addr, port) in let*! r = P2p_fd.connect fd uaddr in match r with @@ -156,8 +154,7 @@ let test_closed_by_peer_read_outgoing () = let server ch listening_fd = let*! r = P2p_fd.accept listening_fd in match r with - | Error (`Socket_error ex | `System_error ex | `Unexpected_error ex) -> - Lwt.return_error (TzTrace.make (error_of_exn ex)) + | Error e -> tzfail (P2p_fd.accept_error_to_tzerror e) | Ok (fd, _sockaddr) -> let*! () = P2p_fd.close ~reason:(User "server explicit close") fd in let* () = sync ch in @@ -165,7 +162,7 @@ let test_closed_by_peer_read_outgoing () = in let client ch addr port = - let*! fd = P2p_fd.socket () in + let* fd = P2p_fd.socket () in let uaddr = Lwt_unix.ADDR_INET (Ipaddr_unix.V6.to_inet_addr addr, port) in let*! r = P2p_fd.connect fd uaddr in match r with @@ -215,8 +212,7 @@ let test_closed_by_peer_read_incoming () = let server ch listening_fd = let*! r = P2p_fd.accept listening_fd in match r with - | Error (`Socket_error ex | `System_error ex | `Unexpected_error ex) -> - Lwt.return_error (TzTrace.make (error_of_exn ex)) + | Error e -> tzfail (P2p_fd.accept_error_to_tzerror e) | Ok (fd, _sockaddr) -> ( let* () = sync ch in let data_length = 10 in @@ -232,7 +228,7 @@ let test_closed_by_peer_read_incoming () = in let client ch addr port = - let*! fd = P2p_fd.socket () in + let* fd = P2p_fd.socket () in let uaddr = Lwt_unix.ADDR_INET (Ipaddr_unix.V6.to_inet_addr addr, port) in let*! r = P2p_fd.connect fd uaddr in match r with @@ -274,13 +270,12 @@ let test_locally_closed_read_outgoing () = let server _ch listening_fd = let*! r = P2p_fd.accept listening_fd in match r with - | Error (`Socket_error ex | `System_error ex | `Unexpected_error ex) -> - Lwt.return_error (TzTrace.make (error_of_exn ex)) + | Error e -> tzfail (P2p_fd.accept_error_to_tzerror e) | Ok (_fd, _sockaddr) -> return_unit in let client _ch addr port = - let*! fd = P2p_fd.socket () in + let* fd = P2p_fd.socket () in let uaddr = Lwt_unix.ADDR_INET (Ipaddr_unix.V6.to_inet_addr addr, port) in let*! r = P2p_fd.connect fd uaddr in match r with @@ -330,8 +325,7 @@ let test_locally_closed_read_incoming () = let server _ch listening_fd = let*! r = P2p_fd.accept listening_fd in match r with - | Error (`Socket_error ex | `System_error ex | `Unexpected_error ex) -> - Lwt.return_error (TzTrace.make (error_of_exn ex)) + | Error e -> tzfail (P2p_fd.accept_error_to_tzerror e) | Ok (fd, _sockaddr) -> ( let*! () = P2p_fd.close ~reason:(User "server explicit close") fd in let data_length = 10 in @@ -347,7 +341,7 @@ let test_locally_closed_read_incoming () = in let client _ch addr port = - let*! fd = P2p_fd.socket () in + let* fd = P2p_fd.socket () in let uaddr = Lwt_unix.ADDR_INET (Ipaddr_unix.V6.to_inet_addr addr, port) in let*! r = P2p_fd.connect fd uaddr in match r with @@ -386,13 +380,12 @@ let test_locally_closed_write_outgoing () = let server _ch listening_fd = let*! r = P2p_fd.accept listening_fd in match r with - | Error (`Socket_error ex | `System_error ex | `Unexpected_error ex) -> - Lwt.return_error (TzTrace.make (error_of_exn ex)) + | Error e -> tzfail (P2p_fd.accept_error_to_tzerror e) | Ok (_fd, _sockaddr) -> return_unit in let client _ch addr port = - let*! fd = P2p_fd.socket () in + let* fd = P2p_fd.socket () in let uaddr = Lwt_unix.ADDR_INET (Ipaddr_unix.V6.to_inet_addr addr, port) in let*! r = P2p_fd.connect fd uaddr in match r with @@ -441,8 +434,7 @@ let test_locally_closed_write_incoming () = let server _ch listening_fd = let*! r = P2p_fd.accept listening_fd in match r with - | Error (`Socket_error ex | `System_error ex | `Unexpected_error ex) -> - Lwt.return_error (TzTrace.make (error_of_exn ex)) + | Error e -> tzfail (P2p_fd.accept_error_to_tzerror e) | Ok (fd, _sockaddr) -> ( let*! () = P2p_fd.close ~reason:(User "server explicit close") fd in let data = Bytes.of_string "test" in @@ -457,7 +449,7 @@ let test_locally_closed_write_incoming () = in let client _ch addr port = - let*! fd = P2p_fd.socket () in + let* fd = P2p_fd.socket () in let uaddr = Lwt_unix.ADDR_INET (Ipaddr_unix.V6.to_inet_addr addr, port) in let*! r = P2p_fd.connect fd uaddr in match r with diff --git a/src/lib_p2p/tezt/test_p2p_socket.ml b/src/lib_p2p/tezt/test_p2p_socket.ml index 2bc076532fb08baaaf4b40242e050f57b24dc689..84db1e0d1adac1a15fd2a18270e4ae278c25865a 100644 --- a/src/lib_p2p/tezt/test_p2p_socket.ml +++ b/src/lib_p2p/tezt/test_p2p_socket.ml @@ -221,7 +221,9 @@ module Self_identification = struct in let*! conn = P2p_test_utils.raw_connect sched addr port in match conn with - | Error e -> Test.fail "First connection failed: %a" pp_connect_error e + | Error (Left e) -> + Test.fail "First connection failed: %a" pp_connect_error e + | Error (Right _) -> Test.fail "First connection failed unexpectedly" | Ok conn -> ( let canceler = Lwt_canceler.create () in (* During the first connection, the client get the connection message @@ -240,8 +242,10 @@ module Self_identification = struct connection message instead of its own. *) let*! conn = P2p_test_utils.raw_connect sched addr port in match conn with - | Error e -> + | Error (Left e) -> Test.fail "Second connection failed: %a" pp_connect_error e + | Error (Right _) -> + Test.fail "Second connection failed: unexpectedly" | Ok conn -> let canceler = Lwt_canceler.create () in let* sent_msg =