diff --git a/src/lib_p2p/p2p.ml b/src/lib_p2p/p2p.ml index dc335c99efd09c6b64d1ec897f13f4e7e673a5bc..ab79648200e6bdb5bed4f3da6e1a0abd211ee8c2 100644 --- a/src/lib_p2p/p2p.ml +++ b/src/lib_p2p/p2p.ml @@ -105,19 +105,22 @@ let create_connect_handler config limits pool msg_cfg conn_meta_cfg io_sched ~answerer let may_create_discovery_worker _limits config pool = + let open Lwt_result_syntax in match (config.listening_port, config.discovery_port, config.discovery_addr) with | Some listening_port, Some discovery_port, Some discovery_addr -> - Some - (P2p_discovery.create - pool - config.identity.peer_id - ~listening_port - ~discovery_port - ~discovery_addr - ~trust_discovered_peers:config.trust_discovered_peers) - | _, _, _ -> None + let* discovery = + P2p_discovery.create + pool + config.identity.peer_id + ~listening_port + ~discovery_port + ~discovery_addr + ~trust_discovered_peers:config.trust_discovered_peers + in + return_some discovery + | _, _, _ -> return_none let create_maintenance_worker limits connect_handler config triggers log = let open P2p_limits in @@ -142,7 +145,7 @@ let create_maintenance_worker limits connect_handler config triggers log = } in let pool = P2p_connect_handler.get_pool connect_handler in - let discovery = may_create_discovery_worker limits config pool in + let* discovery = may_create_discovery_worker limits config pool in let* p2p_maintenance = P2p_maintenance.create ?discovery diff --git a/src/lib_p2p/p2p_discovery.ml b/src/lib_p2p/p2p_discovery.ml index 4fd7a6a23ac70f95729f8666baf98f657e3b3505..7c73e87bc4c5e77cac7d79a6be1522611a17e21d 100644 --- a/src/lib_p2p/p2p_discovery.ml +++ b/src/lib_p2p/p2p_discovery.ml @@ -43,22 +43,40 @@ module Message = struct end module Answer = struct - type t = { + type inner_state = { my_peer_id : P2p_peer.Id.t; pool : pool; discovery_port : int; - canceler : Lwt_canceler.t; trust_discovered_peers : bool; - mutable worker : unit Lwt.t; + socket : Lwt_unix.file_descr; } - let create_socket st = + module Name = P2p_workers.Unique_name_maker (struct + let base = ["p2p_discovery"; "answer"] + end) + + module Types = struct + type state = inner_state + + type parameters = { + my_peer_id : P2p_peer.Id.t; + pool : pool; + discovery_port : int; + trust_discovered_peers : bool; + } + end + + module Worker = P2p_workers.Make (Name) (P2p_workers.Loop_request) (Types) + + type t = Worker.activated_worker + + let create_socket discovery_port canceler = let open Lwt_syntax in Lwt.catch (fun () -> let socket = Lwt_unix.socket PF_INET SOCK_DGRAM 0 in Lwt_unix.set_close_on_exec socket ; - Lwt_canceler.on_cancel st.canceler (fun () -> + Lwt_canceler.on_cancel canceler (fun () -> let* r = Lwt_utils_unix.safe_close socket in Result.iter_error (Format.eprintf "Uncaught error: %a\n%!" pp_print_trace) @@ -66,117 +84,170 @@ module Answer = struct return_unit) ; Lwt_unix.setsockopt socket SO_BROADCAST true ; Lwt_unix.setsockopt socket SO_REUSEADDR true ; - let addr = Lwt_unix.ADDR_INET (Unix.inet_addr_any, st.discovery_port) in + let addr = Lwt_unix.ADDR_INET (Unix.inet_addr_any, discovery_port) in let* () = Lwt_unix.bind socket addr in return socket) (fun exn -> let* () = Events.(emit create_socket_error) () in Lwt.fail exn) - let loop st = + let loop worker = let open Lwt_result_syntax in - let* socket = - protect ~canceler:st.canceler (fun () -> - Lwt_result.ok @@ create_socket st) + let st = Worker.state worker in + let buf = Bytes.create Message.length in + let* rd = + protect (fun () -> + let*! content = Lwt_unix.recvfrom st.socket buf 0 Message.length [] in + let*! () = Events.(emit message_received) () in + return content) in - (* Infinite loop, should never exit. *) - let rec aux () = - let buf = Bytes.create Message.length in - let* rd = - protect ~canceler:st.canceler (fun () -> - let*! content = Lwt_unix.recvfrom socket buf 0 Message.length [] in - let*! () = Events.(emit message_received) () in - return content) + match rd with + | len, Lwt_unix.ADDR_INET (remote_addr, _) + when Compare.Int.equal len Message.length -> ( + match Data_encoding.Binary.of_bytes_opt Message.encoding buf with + | Some (key, remote_peer_id, remote_port) + when Compare.String.equal key Message.key + && not (P2p_peer.Id.equal remote_peer_id st.my_peer_id) -> ( + let s_addr = Unix.string_of_inet_addr remote_addr in + match P2p_addr.of_string_opt s_addr with + | None -> + let*! () = Events.(emit parse_error) s_addr in + return_unit + | Some addr -> + let (Pool pool) = st.pool in + let*! () = Events.(emit register_new) (addr, remote_port) in + P2p_pool.register_new_point + ~trusted:st.trust_discovered_peers + pool + (addr, remote_port) + |> ignore ; + return_unit) + | _ -> return_unit) + | _ -> return_unit + + module Handlers = struct + type self = Worker.callback Worker.t + + type launch_error = tztrace + + let on_launch : + self -> + Name.t -> + Types.parameters -> + (Types.state, launch_error) result Lwt.t = + fun self _name {my_peer_id; discovery_port; trust_discovered_peers; pool} -> + let open Lwt_result_syntax in + let* socket = + protect (fun () -> + Lwt_result.ok @@ create_socket discovery_port (Worker.canceler self)) in - match rd with - | len, Lwt_unix.ADDR_INET (remote_addr, _) - when Compare.Int.equal len Message.length -> ( - match Data_encoding.Binary.of_bytes_opt Message.encoding buf with - | Some (key, remote_peer_id, remote_port) - when Compare.String.equal key Message.key - && not (P2p_peer.Id.equal remote_peer_id st.my_peer_id) -> ( - let s_addr = Unix.string_of_inet_addr remote_addr in - match P2p_addr.of_string_opt s_addr with - | None -> - let*! () = Events.(emit parse_error) s_addr in - aux () - | Some addr -> - let (Pool pool) = st.pool in - let*! () = Events.(emit register_new) (addr, remote_port) in - P2p_pool.register_new_point - ~trusted:st.trust_discovered_peers - pool - (addr, remote_port) - |> ignore ; - aux ()) - | _ -> aux ()) - | _ -> aux () - in - aux () + Lwt.return_ok + {my_peer_id; discovery_port; trust_discovered_peers; pool; socket} - let worker_loop st = - let open Lwt_syntax in - let* r = loop st in - match r with - | Error (Canceled :: _) -> return_unit - | Error err -> - let* () = Events.(emit unexpected_error) ("answer", err) in - Error_monad.cancel_with_exceptions st.canceler - | Ok () -> - let* () = Events.(emit unexpected_exit) () in - Error_monad.cancel_with_exceptions st.canceler + let on_request : + type response error. + self -> + (response, error) P2p_workers.Loop_request.t -> + (response, error) result Lwt.t = + fun self Loop -> loop self - let create my_peer_id pool ~trust_discovered_peers ~discovery_port = - { - canceler = Lwt_canceler.create (); - my_peer_id; - discovery_port; - trust_discovered_peers; - pool = Pool pool; - worker = Lwt.return_unit; - } + let on_no_request _self = Lwt.return_unit + + let on_close _self = Lwt.return_unit - let activate st = - st.worker <- - Lwt_utils.worker - "discovery_answer" - ~on_event:Internal_event.Lwt_worker_logger.on_event - ~run:(fun () -> worker_loop st) - ~cancel:(fun () -> Error_monad.cancel_with_exceptions st.canceler) + let on_error : + type response error. + self -> + _ -> + (response, error) P2p_workers.Loop_request.t -> + error -> + [`Continue | `Shutdown] tzresult Lwt.t = + fun _self _ Loop error -> + let open Lwt_result_syntax in + match error with + | Canceled :: _ -> return `Shutdown + | err -> + let*! () = Events.(emit unexpected_error) ("answer", err) in + return `Shutdown + + let on_completion _self _request _result _status = Lwt.return_unit + end + + let create my_peer_id pool ~trust_discovered_peers ~discovery_port = + Worker.create + () + {my_peer_id; discovery_port; trust_discovered_peers; pool = Pool pool} + (module Handlers) end (* ************************************************************ *) (* Sender *) module Sender = struct - type t = { - canceler : Lwt_canceler.t; + module Config = struct + type t = {delay : float; loop : int} + + let initial = {delay = 0.1; loop = 0} + + let increase_delay config = {config with delay = 2.0 *. config.delay} + + let max_loop = 10 + end + + type inner_state = { + mutable config : Config.t; my_peer_id : P2p_peer.Id.t; listening_port : int; discovery_port : int; discovery_addr : Ipaddr.V4.t; pool : pool; restart_discovery : unit Lwt_condition.t; - mutable worker : unit Lwt.t; } - module Config = struct - type t = {delay : float; loop : int} + module Name = P2p_workers.Unique_name_maker (struct + let base = ["p2p_discovery"; "sender"] + end) - let initial = {delay = 0.1; loop = 0} + module Types = struct + type state = inner_state - let increase_delay config = {config with delay = 2.0 *. config.delay} + type parameters = { + my_peer_id : P2p_peer.Id.t; + listening_port : int; + discovery_port : int; + discovery_addr : Ipaddr.V4.t; + pool : pool; + } + end - let max_loop = 10 + module Request = struct + type ('response, 'error) t = Loop : (Config.t, tztrace) t + + type view = View : ('response, 'error) t -> view + + let view r = View r + + let encoding = + let open Data_encoding in + conv (fun (View Loop) -> ()) (fun () -> View Loop) unit + + let pp ppf (View Loop) = Format.fprintf ppf "loop" + + let default_callback_value = View Loop end - let broadcast_message st = + module Worker = P2p_workers.Make (Name) (Request) (Types) + + type t = Worker.activated_worker + + let broadcast_message worker = let open Lwt_syntax in + let st = Worker.state worker in let msg = Message.make st.my_peer_id st.listening_port in Lwt.catch (fun () -> let socket = Lwt_unix.(socket PF_INET SOCK_DGRAM 0) in - Lwt_canceler.on_cancel st.canceler (fun () -> + Lwt_canceler.on_cancel (Worker.canceler worker) (fun () -> let* r = Lwt_utils_unix.safe_close socket in Result.iter_error (Format.eprintf "Uncaught error: %a\n%!" pp_print_trace) @@ -196,55 +267,96 @@ module Sender = struct return_unit) (fun _exn -> Events.(emit broadcast_error) ()) - let rec worker_loop sender_config st = + let loop worker = + let open Lwt_result_syntax in + let st = Worker.state worker in + let sender_config = st.config in + Lwt_result.bind + (protect (fun () -> Lwt_result.ok @@ broadcast_message worker)) + @@ fun () -> + protect (fun () -> + Lwt.pick + [ + (let*! () = Lwt_condition.wait st.restart_discovery in + return Config.initial); + (let*! () = Lwt_unix.sleep sender_config.Config.delay in + return {sender_config with Config.loop = succ sender_config.loop}); + ]) + + let loop_completion config st = + if config.Config.loop = Config.max_loop then + st.config <- {config with Config.loop = pred config.loop} + else st.config <- Config.increase_delay config ; + Lwt.return_unit + + let loop_error err = let open Lwt_syntax in - let* r = - Lwt_result.bind - (protect ~canceler:st.canceler (fun () -> - Lwt_result.ok @@ broadcast_message st)) - @@ fun () -> - protect ~canceler:st.canceler (fun () -> - Lwt_result.ok - @@ Lwt.pick - [ - (let* () = Lwt_condition.wait st.restart_discovery in - return Config.initial); - (let* () = Lwt_unix.sleep sender_config.Config.delay in - return - {sender_config with Config.loop = succ sender_config.loop}); - ]) - in - match r with - | Ok config when config.Config.loop = Config.max_loop -> - let new_sender_config = {config with Config.loop = pred config.loop} in - worker_loop new_sender_config st - | Ok config -> - let new_sender_config = Config.increase_delay config in - worker_loop new_sender_config st - | Error (Canceled :: _) -> return_unit - | Error err -> - let* () = Events.(emit unexpected_error) ("sender", err) in - Error_monad.cancel_with_exceptions st.canceler + match err with + | Canceled :: _ -> return_unit + | err -> Events.(emit unexpected_error) ("sender", err) - let create my_peer_id pool ~listening_port ~discovery_port ~discovery_addr = - { - canceler = Lwt_canceler.create (); - my_peer_id; - listening_port; - discovery_port; - discovery_addr; - restart_discovery = Lwt_condition.create (); - pool = Pool pool; - worker = Lwt.return_unit; - } + module Handlers = struct + type self = Worker.callback Worker.t + + type launch_error = tztrace + + let on_launch : + self -> + Name.t -> + Types.parameters -> + (Types.state, launch_error) result Lwt.t = + fun _self + _name + {my_peer_id; listening_port; discovery_port; discovery_addr; pool} -> + Lwt.return_ok + { + config = Config.initial; + my_peer_id; + listening_port; + discovery_port; + discovery_addr; + restart_discovery = Lwt_condition.create (); + pool; + } - let activate st = - st.worker <- - Lwt_utils.worker - "discovery_sender" - ~on_event:Internal_event.Lwt_worker_logger.on_event - ~run:(fun () -> worker_loop Config.initial st) - ~cancel:(fun () -> Error_monad.cancel_with_exceptions st.canceler) + let on_request : + type response error. + self -> (response, error) Request.t -> (response, error) result Lwt.t = + fun self Loop -> loop self + + let on_no_request _self = Lwt.return_unit + + let on_close _self = Lwt.return_unit + + let on_error : + type response error. + self -> + _ -> + (response, error) Request.t -> + error -> + [`Continue | `Shutdown] tzresult Lwt.t = + fun _self _ Loop error -> + let open Lwt_result_syntax in + let*! () = loop_error error in + return `Shutdown + + let on_completion : + type resp err. self -> (resp, err) Request.t -> resp -> _ -> unit Lwt.t + = + fun self Loop config _status -> loop_completion config (Worker.state self) + end + + let create my_peer_id pool ~listening_port ~discovery_port ~discovery_addr = + Worker.create + () + { + my_peer_id; + listening_port; + discovery_port; + discovery_addr; + pool = Pool pool; + } + (module Handlers) end (* ********************************************************************** *) @@ -253,10 +365,11 @@ type t = {answer : Answer.t; sender : Sender.t} let create ~listening_port ~discovery_port ~discovery_addr ~trust_discovered_peers pool my_peer_id = - let answer = + let open Lwt_result_syntax in + let* answer = Answer.create my_peer_id pool ~discovery_port ~trust_discovered_peers in - let sender = + let* sender = Sender.create my_peer_id pool @@ -264,17 +377,16 @@ let create ~listening_port ~discovery_port ~discovery_addr ~discovery_port ~discovery_addr in - {answer; sender} + return {answer; sender} let activate {answer; sender} = - Answer.activate answer ; - Sender.activate sender + Answer.Worker.activate answer ; + Sender.Worker.activate sender -let wakeup t = Lwt_condition.signal t.sender.restart_discovery () +let wakeup t = + Lwt_condition.signal + (Sender.Worker.state t.sender.worker_state).restart_discovery + () let shutdown t = - Lwt.join - [ - Error_monad.cancel_with_exceptions t.answer.canceler; - Error_monad.cancel_with_exceptions t.sender.canceler; - ] + Lwt.join [Answer.Worker.shutdown t.answer; Sender.Worker.shutdown t.sender] diff --git a/src/lib_p2p/p2p_discovery.mli b/src/lib_p2p/p2p_discovery.mli index efb5cb0ed6efa64c34fdf1fc236c605727d42da3..04e5e1950e2a4eba99803731f8294d6d7b1bd96e 100644 --- a/src/lib_p2p/p2p_discovery.mli +++ b/src/lib_p2p/p2p_discovery.mli @@ -49,7 +49,7 @@ val create : trust_discovered_peers:bool -> ('a, 'b, 'c) P2p_pool.t -> P2p_peer.Table.key -> - t + t tzresult Lwt.t val activate : t -> unit