diff --git a/src/lib_p2p/p2p_connect_handler.ml b/src/lib_p2p/p2p_connect_handler.ml index 2880e8e5b2d05e33aa9a06f2ded330d2cd6056b1..bdb953502d04e8267295e575f37a8c446cffc541 100644 --- a/src/lib_p2p/p2p_connect_handler.ml +++ b/src/lib_p2p/p2p_connect_handler.ml @@ -653,7 +653,7 @@ let connect ?trusted ?expected_peer_id ?timeout t point = let* () = fail_unless_disconnected_point point_info in let timestamp = Time.System.now () in P2p_point_state.set_requested ~timestamp point_info canceler ; - let*! fd = P2p_fd.socket PF_INET6 SOCK_STREAM 0 in + let*! fd = P2p_fd.socket () in let uaddr = Lwt_unix.ADDR_INET (Ipaddr_unix.V6.to_inet_addr addr, port) in let*! () = Events.(emit connect_status) ("start", point) in let* () = diff --git a/src/lib_p2p/p2p_fd.ml b/src/lib_p2p/p2p_fd.ml index 40665e5b2b4f6fd731c153078823fb47887c519a..33f7a791583a4bcd14a3e8776aa8f67e42222255 100644 --- a/src/lib_p2p/p2p_fd.ml +++ b/src/lib_p2p/p2p_fd.ml @@ -27,6 +27,57 @@ module Events = P2p_events.P2p_fd +(* This error is not declared in the P2p_errors module of lib_p2p_services because + Unix_error.encoding is part of lib_stdlib_unix, which cannot easily be added + as a dependency of lib_p2p_services. *) +type listening_socket_open_failure = { + reason : Unix.error; + address : P2p_addr.t; + port : int; +} + +type error += Failed_to_open_listening_socket of listening_socket_open_failure + +let () = + register_error_kind + `Permanent + ~id:"p2p.welcome.failed_to_open_listening_socket" + ~title:"Failed to open listening socket" + ~description:"The p2p listening socket could not be opened." + ~pp:(fun ppf (reason, address, port) -> + let tips ppf () = + match reason with + | Unix.EADDRINUSE -> + Format.fprintf + ppf + "Another tezos node is probably running on this address.@;\ + Please choose another P2P port using --net-addr." 
+ | _ -> Format.fprintf ppf "" + in + Format.fprintf + ppf + "@[An error occured while initializing P2P server on this \ + address: %a:%d.@;\ + Reason: %s.@;\ + %a@]" + P2p_addr.pp + address + port + (Unix.error_message reason) + tips + ()) + Data_encoding.( + obj3 + (req "reason" Unix_error.encoding) + (req "address" P2p_addr.encoding) + (req "port" uint16)) + (function + | Failed_to_open_listening_socket {reason; address; port} -> + Some (reason, address, port) + | _ -> None) + (fun (reason, address, port) -> + Failed_to_open_listening_socket {reason; address; port}) + let is_not_windows = Sys.os_type <> "Win32" let () = @@ -88,8 +139,31 @@ let string_of_sockaddr addr = let id t = t.id -let socket proto kind arg = - create (Lwt_unix.socket ~cloexec:true proto kind arg) +let raw_socket () = Lwt_unix.socket ~cloexec:true PF_INET6 SOCK_STREAM 0 + +let socket () = create (raw_socket ()) + +let create_listening_socket ?(reuse_port = false) ~backlog + ?(addr = Ipaddr.V6.unspecified) port = + let open Lwt_result_syntax in + Lwt.catch + (fun () -> + let sock = raw_socket () in + (if reuse_port then Lwt_unix.(setsockopt sock SO_REUSEPORT true)) ; + Lwt_unix.(setsockopt sock SO_REUSEADDR true) ; + let*! 
() = + Lwt_unix.bind + sock + Unix.(ADDR_INET (Ipaddr_unix.V6.to_inet_addr addr, port)) + in + Lwt_unix.listen sock backlog ; + return sock) + (function + | Unix.Unix_error (err, _, _) -> + tzfail + (Failed_to_open_listening_socket + {reason = err; address = addr; port}) + | exn -> Lwt.fail exn) let close t = let open Lwt_result_syntax in @@ -164,11 +238,11 @@ let connect t saddr = Lwt.return_error `Connection_refused | ex -> Lwt.return_error (`Unexpected_error ex)) -let accept sock = +let accept listening_socket = Lwt.catch (fun () -> let open Lwt_syntax in - let* fd, saddr = Lwt_unix.accept sock in + let* fd, saddr = Lwt_unix.accept ~cloexec:true listening_socket in let* t = create fd in let* () = Events.(emit accept_fd) (t.id, string_of_sockaddr saddr) in return_ok (t, saddr)) diff --git a/src/lib_p2p/p2p_fd.mli b/src/lib_p2p/p2p_fd.mli index 8f871268552ad052f1de311ece9a465bcb02a9a4..f07ea62d3411bb3a13beb60c6eb0f3dfd35eb47f 100644 --- a/src/lib_p2p/p2p_fd.mli +++ b/src/lib_p2p/p2p_fd.mli @@ -54,6 +54,17 @@ type close_reason = | `Unexpected_error of exn | `Unexpected_error_when_closing of exn * exn ] +(** Type describing an opening failure for the listening socket. *) +type listening_socket_open_failure = { + reason : Unix.error; (** The error we are re-raising *) + address : P2p_addr.t; (** The interface we are trying to listen to *) + port : int; (** The port we are trying to listen to *) +} + +(** Type of the error returned when the listening + socket fails to open. *) +type error += Failed_to_open_listening_socket of listening_socket_open_failure + val pp_close_reason : Format.formatter -> close_reason -> unit (** [id t] returns a unique, positive, identifier for t. Identifiers @@ -79,7 +90,23 @@ val close : t -> (unit, [`Unexpected_error of exn]) result Lwt.t val write : t -> Bytes.t -> (unit, close_reason) result Lwt.t (** Returns a fresh fd. This call always succeed. 
*) -val socket : Lwt_unix.socket_domain -> Lwt_unix.socket_type -> int -> t Lwt.t +val socket : unit -> t Lwt.t + +(** [create_listening_socket ?reuse_port ~backlog ?addr port] creates + a socket that listens on [port] at [addr], or at + [Ipaddr.V6.unspecified] if [addr] is not provided. + + [reuse_port] is used to set Unix socket option [SO_REUSEPORT]. If + [reuse_port] is not provided this option is set to false. + [SO_REUSEADDR] is set to true. + + [backlog] sets the maximum number of pending connections. *) +val create_listening_socket : + ?reuse_port:bool -> + backlog:int -> + ?addr:Ipaddr.V6.t -> + int -> + Lwt_unix.file_descr tzresult Lwt.t (** [connect fd addr] connect [fd] to [addr]. *) val connect : diff --git a/src/lib_p2p/p2p_socket.ml b/src/lib_p2p/p2p_socket.ml index 8a210825c398c68fd8e3f0d85b364c0c3c7d9259..a2dbf910d405554211b99aeb6e5a17fee365ffe7 100644 --- a/src/lib_p2p/p2p_socket.ml +++ b/src/lib_p2p/p2p_socket.ml @@ -884,7 +884,7 @@ module Internal_for_tests = struct } in let scheduled_conn = - let f2d_t = Lwt_main.run (P2p_fd.socket PF_INET6 SOCK_STREAM 0) in + let f2d_t = Lwt_main.run (P2p_fd.socket ()) in P2p_io_scheduler.register (P2p_io_scheduler.create ~read_buffer_size:0 ()) f2d_t diff --git a/src/lib_p2p/p2p_welcome.ml b/src/lib_p2p/p2p_welcome.ml index f88aa57a73d3e04befdc6235567d30c7c4d886e2..5c7780f0c6a4fa201b88b72bb328983d5733094d 100644 --- a/src/lib_p2p/p2p_welcome.ml +++ b/src/lib_p2p/p2p_welcome.ml @@ -25,54 +25,6 @@ module Events = P2p_events.P2p_welcome -type listening_socket_open_failure = { - reason : Unix.error; - address : P2p_addr.t; - port : int; -} - -type error += Failed_to_open_listening_socket of listening_socket_open_failure - -let () = - register_error_kind - `Permanent - ~id:"p2p.welcome.failed_to_open_listening_socket" - ~title:"Failed to open listening socket" - ~description:"The p2p listening socket could not be opened." 
- ~pp:(fun ppf (reason, address, port) -> - let tips ppf () = - match reason with - | Unix.EADDRINUSE -> - Format.fprintf - ppf - "Another tezos node is probably running on this address.@;\ - Please choose another P2P port using --net-addr." - | _ -> Format.fprintf ppf "" - in - Format.fprintf - ppf - "@[An error occured while initializing P2P server on this \ - address: %a:%d.@;\ - Reason: %s.@;\ - %a@]" - P2p_addr.pp - address - port - (Unix.error_message reason) - tips - ()) - Data_encoding.( - obj3 - (req "reason" Unix_error.encoding) - (req "address" P2p_addr.encoding) - (req "port" uint16)) - (function - | Failed_to_open_listening_socket {reason; address; port} -> - Some (reason, address, port) - | _ -> None) - (fun (reason, address, port) -> - Failed_to_open_listening_socket {reason; address; port}) - type connect_handler = | Connect_handler : ('msg, 'meta, 'meta_conn) P2p_connect_handler.t @@ -130,33 +82,13 @@ let rec worker_loop st = | Error (Canceled :: _) -> Lwt.return_unit | Error err -> Events.(emit unexpected_error) err -let create_listening_socket ?(reuse_port = false) ~backlog - ?(addr = Ipaddr.V6.unspecified) port = - let open Lwt_result_syntax in - Lwt.catch - (fun () -> - let main_socket = Lwt_unix.(socket PF_INET6 SOCK_STREAM 0) in - (if reuse_port then Lwt_unix.(setsockopt main_socket SO_REUSEPORT true)) ; - Lwt_unix.(setsockopt main_socket SO_REUSEADDR true) ; - let*! 
() = - Lwt_unix.bind - main_socket - Unix.(ADDR_INET (Ipaddr_unix.V6.to_inet_addr addr, port)) - in - Lwt_unix.listen main_socket backlog ; - return main_socket) - (function - | Unix.Unix_error (err, _, _) -> - tzfail - (Failed_to_open_listening_socket - {reason = err; address = addr; port}) - | exn -> Lwt.fail exn) - let create ?reuse_port ?addr ~backlog connect_handler port = let open Lwt_result_syntax in Lwt.catch (fun () -> - let* socket = create_listening_socket ?reuse_port ~backlog ?addr port in + let* socket = + P2p_fd.create_listening_socket ?reuse_port ~backlog ?addr port + in let canceler = Lwt_canceler.create () in let st = { diff --git a/src/lib_p2p/p2p_welcome.mli b/src/lib_p2p/p2p_welcome.mli index 0c03cc5101991a138c42fa315b7741b6b2dba259..bfb3fb47d56d94aab9713412c75874a3847e08df 100644 --- a/src/lib_p2p/p2p_welcome.mli +++ b/src/lib_p2p/p2p_welcome.mli @@ -28,18 +28,6 @@ Accept incoming connections and add them to the pool. *) -(** Type discribing an opening failure for the - listening socket. *) -type listening_socket_open_failure = { - reason : Unix.error; (** The error we are re-raising *) - address : P2p_addr.t; (** The interface we are trying to listen to *) - port : int; (** The port we are trying to listen to *) -} - -(** Type of an error in case where the listening - socket fails to open. *) -type error += Failed_to_open_listening_socket of listening_socket_open_failure - (** Type of a welcome worker. 
*) type t diff --git a/src/lib_p2p/test/common/p2p_test_utils.ml b/src/lib_p2p/test/common/p2p_test_utils.ml index b538af3d721f990cc1603f60ea1ef20ac4a549b5..02d440e3ad5854b250272655137b0574573ec9dc 100644 --- a/src/lib_p2p/test/common/p2p_test_utils.ml +++ b/src/lib_p2p/test/common/p2p_test_utils.ml @@ -135,23 +135,19 @@ let conn_meta_config : unit P2p_params.conn_meta_config = let glob_port = ref None let listen ?port addr = - let open Lwt_syntax in - let uaddr = Ipaddr_unix.V6.to_inet_addr addr in - let main_socket = Lwt_unix.(socket ~cloexec:true PF_INET6 SOCK_STREAM 0) in - Lwt_unix.(setsockopt main_socket SO_REUSEADDR true) ; + let open Lwt_result_syntax in (* If [port] is [0], a fresh port is used. *) let port_or_gen = Option.value port ~default:0 in - let* () = Lwt_unix.bind main_socket (ADDR_INET (uaddr, port_or_gen)) in - Lwt_unix.listen main_socket 1 ; + let* sock = P2p_fd.create_listening_socket ~backlog:1 ~addr port_or_gen in let port = match port with | Some port -> port | None -> ( (* if [port] was [0], we get the port generated. *) - let addr = Lwt_unix.getsockname main_socket in + let addr = Lwt_unix.getsockname sock in match addr with ADDR_INET (_, port) -> port | _ -> assert false) in - Lwt.return (main_socket, port) + return (sock, port) let rec sync_nodes nodes = let open Lwt_result_syntax in @@ -179,7 +175,7 @@ let run_nodes ~addr ?port client server = glob_port := Some (p + 1) ; Some p in - let*! main_socket, p = listen ?port:p addr in + let* main_socket, p = listen ?port:p addr in let* server_node = Process.detach ~prefix:"server: " (fun channel -> let sched = P2p_io_scheduler.create ~read_buffer_size:(1 lsl 12) () in @@ -257,7 +253,7 @@ let accept ?(id = id1) ?(proof_of_work_target = proof_of_work_target) sched let raw_connect sched addr port = let open Lwt_result_syntax in - let*! fd = P2p_fd.socket PF_INET6 SOCK_STREAM 0 in + let*! 
fd = P2p_fd.socket () in let uaddr = Lwt_unix.ADDR_INET (Ipaddr_unix.V6.to_inet_addr addr, port) in let* () = P2p_fd.connect fd uaddr in let fd = P2p_io_scheduler.register sched fd in diff --git a/src/lib_p2p/test/test_p2p_connect_handler.ml b/src/lib_p2p/test/test_p2p_connect_handler.ml index 7fd3256a21d24210e22aafca8394e6a7a266b047..0f9effcdc6d376282661dbe87f9099774cf5171f 100644 --- a/src/lib_p2p/test/test_p2p_connect_handler.ml +++ b/src/lib_p2p/test/test_p2p_connect_handler.ml @@ -84,7 +84,7 @@ let test_correct_incoming_connection_number = (`Make_default_pool ()) (`Dependencies dependencies) in - let*! fd = P2p_fd.socket PF_INET6 SOCK_STREAM 0 in + let*! fd = P2p_fd.socket () in Alcotest.( check' int diff --git a/src/lib_p2p/test/test_p2p_io_scheduler.ml b/src/lib_p2p/test/test_p2p_io_scheduler.ml index 9a23e722bec5a0b815e7bec3a6b9db1242aea221..d28f12737624ec3ef0cb2e45885edb99aa2fd5c3 100644 --- a/src/lib_p2p/test/test_p2p_io_scheduler.ml +++ b/src/lib_p2p/test/test_p2p_io_scheduler.ml @@ -39,21 +39,19 @@ end) exception Error of error list let rec listen ?port addr = - let open Lwt_syntax in + let open Lwt_result_syntax in let tentative_port = match port with None -> 0 | Some port -> port in - let uaddr = Ipaddr_unix.V6.to_inet_addr addr in - let main_socket = Lwt_unix.(socket ~cloexec:true PF_INET6 SOCK_STREAM 0) in - Lwt_unix.(setsockopt main_socket SO_REUSEADDR true) ; - Lwt.catch - (fun () -> - let* () = Lwt_unix.bind main_socket (ADDR_INET (uaddr, tentative_port)) in - Lwt_unix.listen main_socket 50 ; - return (main_socket, tentative_port)) - (function - | Unix.Unix_error ((Unix.EADDRINUSE | Unix.EADDRNOTAVAIL), _, _) - when port = None -> - listen addr - | exn -> Lwt.fail exn) + let*! 
sock = + P2p_fd.create_listening_socket ~backlog:50 ~addr tentative_port + in + match sock with + | Ok sock -> return (sock, tentative_port) + | Error + (P2p_fd.Failed_to_open_listening_socket + {reason = Unix.EADDRINUSE | Unix.EADDRNOTAVAIL; _} + :: _) -> + listen ?port addr + | Error err -> Lwt.return_error err let accept main_socket = let open Lwt_syntax in @@ -73,7 +71,7 @@ let rec accept_n main_socket n = let connect addr port = let open Lwt_syntax in - let* fd = P2p_fd.socket PF_INET6 SOCK_STREAM 0 in + let* fd = P2p_fd.socket () in let uaddr = Lwt_unix.ADDR_INET (Ipaddr_unix.V6.to_inet_addr addr, port) in let* r = P2p_fd.connect fd uaddr in match r with @@ -195,25 +193,20 @@ let client ?max_upload_speed ?write_queue_size addr port time _n = *) let run ?display_client_stat ?max_download_speed ?max_upload_speed ~read_buffer_size ?read_queue_size ?write_queue_size addr port time n = - let open Lwt_syntax in - let* () = Tezos_base_unix.Internal_event_unix.init () in + let open Lwt_result_syntax in + let*! () = Tezos_base_unix.Internal_event_unix.init () in let* main_socket, port = listen ?port addr in let* server_node = - let* r = - Process.detach - ~prefix:"server: " - (fun (_ : (unit, unit) Process.Channel.t) -> - server - ?display_client_stat - ?max_download_speed - ~read_buffer_size - ?read_queue_size - main_socket - n) - in - match r with - | Error err -> Lwt.fail (Error err) - | Ok node -> Lwt.return node + Process.detach + ~prefix:"server: " + (fun (_ : (unit, unit) Process.Channel.t) -> + server + ?display_client_stat + ?max_download_speed + ~read_buffer_size + ?read_queue_size + main_socket + n) in let client n = @@ -221,7 +214,9 @@ let run ?display_client_stat ?max_download_speed ?max_upload_speed Process.detach ~prefix (fun (_ : (unit, unit) Process.Channel.t) -> let* () = Lwt.catch - (fun () -> Lwt_unix.close main_socket) + (fun () -> + let*! 
() = Lwt_unix.close main_socket in + return_unit) (function (* the connection was already closed *) | Unix.Unix_error (EBADF, _, _) -> return_unit @@ -229,10 +224,8 @@ let run ?display_client_stat ?max_download_speed ?max_upload_speed in client ?max_upload_speed ?write_queue_size addr port time n) in - let* r = List.map_es client (1 -- n) in - match r with - | Error err -> Lwt.fail (Error err) - | Ok client_nodes -> Process.wait_all (server_node :: client_nodes) + let* client_nodes = List.map_es client (1 -- n) in + Process.wait_all (server_node :: client_nodes) let () = Random.self_init ()