From 180a58e16f3eb20ac8b6a7e0c4458ed4d1304470 Mon Sep 17 00:00:00 2001 From: Pierrick Couderc Date: Mon, 3 Mar 2025 16:32:36 +0100 Subject: [PATCH 1/4] P2P: p2p_discovery.Answer worker --- src/lib_p2p/p2p.ml | 23 +++--- src/lib_p2p/p2p_discovery.ml | 127 +++++++++++++++++++++++----------- src/lib_p2p/p2p_discovery.mli | 2 +- 3 files changed, 101 insertions(+), 51 deletions(-) diff --git a/src/lib_p2p/p2p.ml b/src/lib_p2p/p2p.ml index dc335c99efd0..ab79648200e6 100644 --- a/src/lib_p2p/p2p.ml +++ b/src/lib_p2p/p2p.ml @@ -105,19 +105,22 @@ let create_connect_handler config limits pool msg_cfg conn_meta_cfg io_sched ~answerer let may_create_discovery_worker _limits config pool = + let open Lwt_result_syntax in match (config.listening_port, config.discovery_port, config.discovery_addr) with | Some listening_port, Some discovery_port, Some discovery_addr -> - Some - (P2p_discovery.create - pool - config.identity.peer_id - ~listening_port - ~discovery_port - ~discovery_addr - ~trust_discovered_peers:config.trust_discovered_peers) - | _, _, _ -> None + let* discovery = + P2p_discovery.create + pool + config.identity.peer_id + ~listening_port + ~discovery_port + ~discovery_addr + ~trust_discovered_peers:config.trust_discovered_peers + in + return_some discovery + | _, _, _ -> return_none let create_maintenance_worker limits connect_handler config triggers log = let open P2p_limits in @@ -142,7 +145,7 @@ let create_maintenance_worker limits connect_handler config triggers log = } in let pool = P2p_connect_handler.get_pool connect_handler in - let discovery = may_create_discovery_worker limits config pool in + let* discovery = may_create_discovery_worker limits config pool in let* p2p_maintenance = P2p_maintenance.create ?discovery diff --git a/src/lib_p2p/p2p_discovery.ml b/src/lib_p2p/p2p_discovery.ml index 4fd7a6a23ac7..e27d76bb458e 100644 --- a/src/lib_p2p/p2p_discovery.ml +++ b/src/lib_p2p/p2p_discovery.ml @@ -43,22 +43,40 @@ module Message = struct end module Answer = struct - type t = { + type inner_state = { my_peer_id : P2p_peer.Id.t; pool : pool; discovery_port : int; - canceler : Lwt_canceler.t; trust_discovered_peers : bool; - mutable worker : unit Lwt.t; } - let create_socket st = + module Name = P2p_workers.Unique_name_maker (struct + let base = ["p2p_discovery"; "answer"] + end) + + module Types = struct + type state = inner_state + + type parameters = { + my_peer_id : P2p_peer.Id.t; + pool : pool; + discovery_port : int; + trust_discovered_peers : bool; + } + end + + module Worker = P2p_workers.Make (Name) (P2p_workers.Loop_request) (Types) + + type t = Worker.activated_worker + + let create_socket worker = let open Lwt_syntax in + let st = Worker.state worker in Lwt.catch (fun () -> let socket = Lwt_unix.socket PF_INET SOCK_DGRAM 0 in Lwt_unix.set_close_on_exec socket ; - Lwt_canceler.on_cancel st.canceler (fun () -> + Lwt_canceler.on_cancel (Worker.canceler worker) (fun () -> let* r = Lwt_utils_unix.safe_close socket in Result.iter_error (Format.eprintf "Uncaught error: %a\n%!" pp_print_trace) @@ -73,17 +91,18 @@ module Answer = struct let* () = Events.(emit create_socket_error) () in Lwt.fail exn) - let loop st = + let loop worker = let open Lwt_result_syntax in + let st = Worker.state worker in let* socket = - protect ~canceler:st.canceler (fun () -> - Lwt_result.ok @@ create_socket st) + protect ~canceler:(Worker.canceler worker) (fun () -> + Lwt_result.ok @@ create_socket worker) in (* Infinite loop, should never exit. *) let rec aux () = let buf = Bytes.create Message.length in let* rd = - protect ~canceler:st.canceler (fun () -> + protect ~canceler:(Worker.canceler worker) (fun () -> let*! content = Lwt_unix.recvfrom socket buf 0 Message.length [] in let*! () = Events.(emit message_received) () in return content) @@ -114,35 +133,62 @@ module Answer = struct in aux () - let worker_loop st = - let open Lwt_syntax in - let* r = loop st in - match r with - | Error (Canceled :: _) -> return_unit - | Error err -> - let* () = Events.(emit unexpected_error) ("answer", err) in - Error_monad.cancel_with_exceptions st.canceler - | Ok () -> - let* () = Events.(emit unexpected_exit) () in - Error_monad.cancel_with_exceptions st.canceler + module Handlers = struct + type self = Worker.callback Worker.t - let create my_peer_id pool ~trust_discovered_peers ~discovery_port = - { - canceler = Lwt_canceler.create (); - my_peer_id; - discovery_port; - trust_discovered_peers; - pool = Pool pool; - worker = Lwt.return_unit; - } + type launch_error = tztrace - let activate st = - st.worker <- - Lwt_utils.worker - "discovery_answer" - ~on_event:Internal_event.Lwt_worker_logger.on_event - ~run:(fun () -> worker_loop st) - ~cancel:(fun () -> Error_monad.cancel_with_exceptions st.canceler) + let on_launch : + self -> + Name.t -> + Types.parameters -> + (Types.state, launch_error) result Lwt.t = + fun _self _name {my_peer_id; discovery_port; trust_discovered_peers; pool} -> + Lwt.return_ok {my_peer_id; discovery_port; trust_discovered_peers; pool} + + let on_request : + type response error. + self -> + (response, error) P2p_workers.Loop_request.t -> + (response, error) result Lwt.t = + fun self Loop -> loop self + + let on_no_request _self = Lwt.return_unit + + let on_close _self = Lwt.return_unit + + let on_error : + type response error. + self -> + _ -> + (response, error) P2p_workers.Loop_request.t -> + error -> + [`Continue | `Shutdown] tzresult Lwt.t = + fun _self _ Loop error -> + let open Lwt_result_syntax in + match error with + | Canceled :: _ -> return `Shutdown + | err -> + let*! () = Events.(emit unexpected_error) ("answer", err) in + return `Shutdown + + (* Considering the request loop should never exit, any completion that is + not an error should be considered as an unexpected exit. + + Note that this will be removed by the next commit as [loop] will no + longer be an infinite loop, to let the worker handle it. *) + let on_completion self _request _result _status = + let open Lwt_syntax in + let* () = Events.(emit unexpected_exit) () in + Worker.trigger_shutdown self ; + return_unit + end + + let create my_peer_id pool ~trust_discovered_peers ~discovery_port = + Worker.create + () + {my_peer_id; discovery_port; trust_discovered_peers; pool = Pool pool} + (module Handlers) end (* ************************************************************ *) @@ -253,7 +299,8 @@ type t = {answer : Answer.t; sender : Sender.t} let create ~listening_port ~discovery_port ~discovery_addr ~trust_discovered_peers pool my_peer_id = - let answer = + let open Lwt_result_syntax in + let* answer = Answer.create my_peer_id pool ~discovery_port ~trust_discovered_peers in let sender = @@ -264,10 +311,10 @@ let create ~listening_port ~discovery_port ~discovery_addr ~discovery_port ~discovery_addr in - {answer; sender} + return {answer; sender} let activate {answer; sender} = - Answer.activate answer ; + Answer.Worker.activate answer ; Sender.activate sender let wakeup t = Lwt_condition.signal t.sender.restart_discovery () @@ -275,6 +322,6 @@ let wakeup t = Lwt_condition.signal t.sender.restart_discovery () let shutdown t = Lwt.join [ - Error_monad.cancel_with_exceptions t.answer.canceler; + Answer.Worker.shutdown t.answer; Error_monad.cancel_with_exceptions t.sender.canceler; ] diff --git a/src/lib_p2p/p2p_discovery.mli b/src/lib_p2p/p2p_discovery.mli index efb5cb0ed6ef..04e5e1950e2a 100644 --- a/src/lib_p2p/p2p_discovery.mli +++ b/src/lib_p2p/p2p_discovery.mli @@ -49,7 +49,7 @@ val create : trust_discovered_peers:bool -> ('a, 'b, 'c) P2p_pool.t -> P2p_peer.Table.key -> - t + t tzresult Lwt.t val activate : t -> unit -- GitLab From 016194f5dad9cf84847f300fbc134c8a111a229d Mon Sep 17 00:00:00 2001 From: Pierrick Couderc Date: Wed, 5 Mar 2025 17:08:23 +0100 Subject: [PATCH 2/4] P2P: P2p_discovery.Answer refactor loop Also remove the cancelers that are not useful anymore --- src/lib_p2p/p2p_discovery.ml | 95 ++++++++++++++++-------------------- 1 file changed, 42 insertions(+), 53 deletions(-) diff --git a/src/lib_p2p/p2p_discovery.ml b/src/lib_p2p/p2p_discovery.ml index e27d76bb458e..8077e06772f5 100644 --- a/src/lib_p2p/p2p_discovery.ml +++ b/src/lib_p2p/p2p_discovery.ml @@ -48,6 +48,7 @@ module Answer = struct pool : pool; discovery_port : int; trust_discovered_peers : bool; + socket : Lwt_unix.file_descr; } module Name = P2p_workers.Unique_name_maker (struct @@ -69,14 +70,13 @@ module Answer = struct type t = Worker.activated_worker - let create_socket worker = + let create_socket discovery_port canceler = let open Lwt_syntax in - let st = Worker.state worker in Lwt.catch (fun () -> let socket = Lwt_unix.socket PF_INET SOCK_DGRAM 0 in Lwt_unix.set_close_on_exec socket ; - Lwt_canceler.on_cancel (Worker.canceler worker) (fun () -> + Lwt_canceler.on_cancel canceler (fun () -> let* r = Lwt_utils_unix.safe_close socket in Result.iter_error (Format.eprintf "Uncaught error: %a\n%!" pp_print_trace) @@ -84,7 +84,7 @@ module Answer = struct return_unit) ; Lwt_unix.setsockopt socket SO_BROADCAST true ; Lwt_unix.setsockopt socket SO_REUSEADDR true ; - let addr = Lwt_unix.ADDR_INET (Unix.inet_addr_any, st.discovery_port) in + let addr = Lwt_unix.ADDR_INET (Unix.inet_addr_any, discovery_port) in let* () = Lwt_unix.bind socket addr in return socket) (fun exn -> @@ -94,44 +94,36 @@ module Answer = struct let loop worker = let open Lwt_result_syntax in let st = Worker.state worker in - let* socket = - protect ~canceler:(Worker.canceler worker) (fun () -> - Lwt_result.ok @@ create_socket worker) - in - (* Infinite loop, should never exit. *) - let rec aux () = - let buf = Bytes.create Message.length in - let* rd = - protect ~canceler:(Worker.canceler worker) (fun () -> - let*! content = Lwt_unix.recvfrom socket buf 0 Message.length [] in - let*! () = Events.(emit message_received) () in - return content) - in - match rd with - | len, Lwt_unix.ADDR_INET (remote_addr, _) - when Compare.Int.equal len Message.length -> ( - match Data_encoding.Binary.of_bytes_opt Message.encoding buf with - | Some (key, remote_peer_id, remote_port) - when Compare.String.equal key Message.key - && not (P2p_peer.Id.equal remote_peer_id st.my_peer_id) -> ( - let s_addr = Unix.string_of_inet_addr remote_addr in - match P2p_addr.of_string_opt s_addr with - | None -> - let*! () = Events.(emit parse_error) s_addr in - aux () - | Some addr -> - let (Pool pool) = st.pool in - let*! () = Events.(emit register_new) (addr, remote_port) in - P2p_pool.register_new_point - ~trusted:st.trust_discovered_peers - pool - (addr, remote_port) - |> ignore ; - aux ()) - | _ -> aux ()) - | _ -> aux () + let buf = Bytes.create Message.length in + let* rd = + protect (fun () -> + let*! content = Lwt_unix.recvfrom st.socket buf 0 Message.length [] in + let*! () = Events.(emit message_received) () in + return content) in - aux () + match rd with + | len, Lwt_unix.ADDR_INET (remote_addr, _) + when Compare.Int.equal len Message.length -> ( + match Data_encoding.Binary.of_bytes_opt Message.encoding buf with + | Some (key, remote_peer_id, remote_port) + when Compare.String.equal key Message.key + && not (P2p_peer.Id.equal remote_peer_id st.my_peer_id) -> ( + let s_addr = Unix.string_of_inet_addr remote_addr in + match P2p_addr.of_string_opt s_addr with + | None -> + let*! () = Events.(emit parse_error) s_addr in + return_unit + | Some addr -> + let (Pool pool) = st.pool in + let*! () = Events.(emit register_new) (addr, remote_port) in + P2p_pool.register_new_point + ~trusted:st.trust_discovered_peers + pool + (addr, remote_port) + |> ignore ; + return_unit) + | _ -> return_unit) + | _ -> return_unit module Handlers = struct type self = Worker.callback Worker.t @@ -143,8 +135,14 @@ module Answer = struct Name.t -> Types.parameters -> (Types.state, launch_error) result Lwt.t = - fun _self _name {my_peer_id; discovery_port; trust_discovered_peers; pool} -> - Lwt.return_ok {my_peer_id; discovery_port; trust_discovered_peers; pool} + fun self _name {my_peer_id; discovery_port; trust_discovered_peers; pool} -> + let open Lwt_result_syntax in + let* socket = + protect (fun () -> + Lwt_result.ok @@ create_socket discovery_port (Worker.canceler self)) + in + Lwt.return_ok + {my_peer_id; discovery_port; trust_discovered_peers; pool; socket} let on_request : type response error. @@ -172,16 +170,7 @@ module Answer = struct let*! () = Events.(emit unexpected_error) ("answer", err) in return `Shutdown - (* Considering the request loop should never exit, any completion that is - not an error should be considered as an unexpected exit. - - Note that this will be removed by the next commit as [loop] will no - longer be an infinite loop, to let the worker handle it. *) - let on_completion self _request _result _status = - let open Lwt_syntax in - let* () = Events.(emit unexpected_exit) () in - Worker.trigger_shutdown self ; - return_unit + let on_completion _self _request _result _status = Lwt.return_unit end let create my_peer_id pool ~trust_discovered_peers ~discovery_port = -- GitLab From 604e1d4f398018645a0d3c5dc14ab56a51e181cb Mon Sep 17 00:00:00 2001 From: Pierrick Couderc Date: Tue, 4 Mar 2025 15:28:31 +0100 Subject: [PATCH 3/4] P2P: prepare P2p_discovery.Sender for workerification --- src/lib_p2p/p2p_discovery.ml | 53 +++++++++++++++++++++--------------- 1 file changed, 31 insertions(+), 22 deletions(-) diff --git a/src/lib_p2p/p2p_discovery.ml b/src/lib_p2p/p2p_discovery.ml index 8077e06772f5..7b67f4f47a1b 100644 --- a/src/lib_p2p/p2p_discovery.ml +++ b/src/lib_p2p/p2p_discovery.ml @@ -184,17 +184,6 @@ end (* Sender *) module Sender = struct - type t = { - canceler : Lwt_canceler.t; - my_peer_id : P2p_peer.Id.t; - listening_port : int; - discovery_port : int; - discovery_addr : Ipaddr.V4.t; - pool : pool; - restart_discovery : unit Lwt_condition.t; - mutable worker : unit Lwt.t; - } - module Config = struct type t = {delay : float; loop : int} @@ -205,6 +194,18 @@ module Sender = struct let max_loop = 10 end + type inner_state = { + mutable config : Config.t; + canceler : Lwt_canceler.t; + my_peer_id : P2p_peer.Id.t; + listening_port : int; + discovery_port : int; + discovery_addr : Ipaddr.V4.t; + pool : pool; + restart_discovery : unit Lwt_condition.t; + mutable worker : unit Lwt.t; + } + let broadcast_message st = let open Lwt_syntax in let msg = Message.make st.my_peer_id st.listening_port in @@ -231,8 +232,9 @@ module Sender = struct return_unit) (fun _exn -> Events.(emit broadcast_error) ()) - let rec worker_loop sender_config st = + let rec worker_loop st = let open Lwt_syntax in + let sender_config = st.config in let* r = Lwt_result.bind (protect ~canceler:st.canceler (fun () -> @@ -250,19 +252,26 @@ module Sender = struct ]) in match r with - | Ok config when config.Config.loop = Config.max_loop -> - let new_sender_config = {config with Config.loop = pred config.loop} in - worker_loop new_sender_config st - | Ok config -> - let new_sender_config = Config.increase_delay config in - worker_loop new_sender_config st - | Error (Canceled :: _) -> return_unit - | Error err -> + | Ok config -> worker_loop_completion config st + | Error trace -> worker_loop_error trace st + + and worker_loop_completion config st = + if config.Config.loop = Config.max_loop then + st.config <- {config with Config.loop = pred config.loop} + else st.config <- Config.increase_delay config ; + worker_loop st + + and worker_loop_error err st = + let open Lwt_syntax in + match err with + | Canceled :: _ -> return_unit + | err -> let* () = Events.(emit unexpected_error) ("sender", err) in Error_monad.cancel_with_exceptions st.canceler let create my_peer_id pool ~listening_port ~discovery_port ~discovery_addr = { + config = Config.initial; canceler = Lwt_canceler.create (); my_peer_id; listening_port; @@ -278,13 +287,13 @@ module Sender = struct Lwt_utils.worker "discovery_sender" ~on_event:Internal_event.Lwt_worker_logger.on_event - ~run:(fun () -> worker_loop Config.initial st) + ~run:(fun () -> worker_loop st) ~cancel:(fun () -> Error_monad.cancel_with_exceptions st.canceler) end (* ********************************************************************** *) -type t = {answer : Answer.t; sender : Sender.t} +type t = {answer : Answer.t; sender : Sender.inner_state} let create ~listening_port ~discovery_port ~discovery_addr ~trust_discovered_peers pool my_peer_id = -- GitLab From b4a5727538167d89bd472cb6be1e344543ddfc90 Mon Sep 17 00:00:00 2001 From: Pierrick Couderc Date: Tue, 4 Mar 2025 16:14:08 +0100 Subject: [PATCH 4/4] P2P: P2p_discovery.Sender is a worker now --- src/lib_p2p/p2p_discovery.ml | 187 ++++++++++++++++++++++++----------- 1 file changed, 127 insertions(+), 60 deletions(-) diff --git a/src/lib_p2p/p2p_discovery.ml b/src/lib_p2p/p2p_discovery.ml index 7b67f4f47a1b..7c73e87bc4c5 100644 --- a/src/lib_p2p/p2p_discovery.ml +++ b/src/lib_p2p/p2p_discovery.ml @@ -196,23 +196,58 @@ module Sender = struct type inner_state = { mutable config : Config.t; - canceler : Lwt_canceler.t; my_peer_id : P2p_peer.Id.t; listening_port : int; discovery_port : int; discovery_addr : Ipaddr.V4.t; pool : pool; restart_discovery : unit Lwt_condition.t; - mutable worker : unit Lwt.t; } - let broadcast_message st = + module Name = P2p_workers.Unique_name_maker (struct + let base = ["p2p_discovery"; "sender"] + end) + + module Types = struct + type state = inner_state + + type parameters = { + my_peer_id : P2p_peer.Id.t; + listening_port : int; + discovery_port : int; + discovery_addr : Ipaddr.V4.t; + pool : pool; + } + end + + module Request = struct + type ('response, 'error) t = Loop : (Config.t, tztrace) t + + type view = View : ('response, 'error) t -> view + + let view r = View r + + let encoding = + let open Data_encoding in + conv (fun (View Loop) -> ()) (fun () -> View Loop) unit + + let pp ppf (View Loop) = Format.fprintf ppf "loop" + + let default_callback_value = View Loop + end + + module Worker = P2p_workers.Make (Name) (Request) (Types) + + type t = Worker.activated_worker + + let broadcast_message worker = let open Lwt_syntax in + let st = Worker.state worker in let msg = Message.make st.my_peer_id st.listening_port in Lwt.catch (fun () -> let socket = Lwt_unix.(socket PF_INET SOCK_DGRAM 0) in - Lwt_canceler.on_cancel st.canceler (fun () -> + Lwt_canceler.on_cancel (Worker.canceler worker) (fun () -> let* r = Lwt_utils_unix.safe_close socket in Result.iter_error (Format.eprintf "Uncaught error: %a\n%!" pp_print_trace) @@ -232,68 +267,101 @@ module Sender = struct return_unit) (fun _exn -> Events.(emit broadcast_error) ()) - let rec worker_loop st = - let open Lwt_syntax in + let loop worker = + let open Lwt_result_syntax in + let st = Worker.state worker in let sender_config = st.config in - let* r = - Lwt_result.bind - (protect ~canceler:st.canceler (fun () -> - Lwt_result.ok @@ broadcast_message st)) - @@ fun () -> - protect ~canceler:st.canceler (fun () -> - Lwt_result.ok - @@ Lwt.pick - [ - (let* () = Lwt_condition.wait st.restart_discovery in - return Config.initial); - (let* () = Lwt_unix.sleep sender_config.Config.delay in - return - {sender_config with Config.loop = succ sender_config.loop}); - ]) - in - match r with - | Ok config -> worker_loop_completion config st - | Error trace -> worker_loop_error trace st - - and worker_loop_completion config st = + Lwt_result.bind + (protect (fun () -> Lwt_result.ok @@ broadcast_message worker)) + @@ fun () -> + protect (fun () -> + Lwt.pick + [ + (let*! () = Lwt_condition.wait st.restart_discovery in + return Config.initial); + (let*! () = Lwt_unix.sleep sender_config.Config.delay in + return {sender_config with Config.loop = succ sender_config.loop}); + ]) + + let loop_completion config st = if config.Config.loop = Config.max_loop then st.config <- {config with Config.loop = pred config.loop} else st.config <- Config.increase_delay config ; - worker_loop st + Lwt.return_unit - and worker_loop_error err st = + let loop_error err = let open Lwt_syntax in match err with | Canceled :: _ -> return_unit - | err -> - let* () = Events.(emit unexpected_error) ("sender", err) in - Error_monad.cancel_with_exceptions st.canceler + | err -> Events.(emit unexpected_error) ("sender", err) - let create my_peer_id pool ~listening_port ~discovery_port ~discovery_addr = - { - config = Config.initial; - canceler = Lwt_canceler.create (); - my_peer_id; - listening_port; - discovery_port; - discovery_addr; - restart_discovery = Lwt_condition.create (); - pool = Pool pool; - worker = Lwt.return_unit; - } + module Handlers = struct + type self = Worker.callback Worker.t + + type launch_error = tztrace - let activate st = - st.worker <- - Lwt_utils.worker - "discovery_sender" - ~on_event:Internal_event.Lwt_worker_logger.on_event - ~run:(fun () -> worker_loop st) - ~cancel:(fun () -> Error_monad.cancel_with_exceptions st.canceler) + let on_launch : + self -> + Name.t -> + Types.parameters -> + (Types.state, launch_error) result Lwt.t = + fun _self + _name + {my_peer_id; listening_port; discovery_port; discovery_addr; pool} -> + Lwt.return_ok + { + config = Config.initial; + my_peer_id; + listening_port; + discovery_port; + discovery_addr; + restart_discovery = Lwt_condition.create (); + pool; + } + + let on_request : + type response error. + self -> (response, error) Request.t -> (response, error) result Lwt.t = + fun self Loop -> loop self + + let on_no_request _self = Lwt.return_unit + + let on_close _self = Lwt.return_unit + + let on_error : + type response error. + self -> + _ -> + (response, error) Request.t -> + error -> + [`Continue | `Shutdown] tzresult Lwt.t = + fun _self _ Loop error -> + let open Lwt_result_syntax in + let*! () = loop_error error in + return `Shutdown + + let on_completion : + type resp err. self -> (resp, err) Request.t -> resp -> _ -> unit Lwt.t + = + fun self Loop config _status -> loop_completion config (Worker.state self) + end + + let create my_peer_id pool ~listening_port ~discovery_port ~discovery_addr = + Worker.create + () + { + my_peer_id; + listening_port; + discovery_port; + discovery_addr; + pool = Pool pool; + } + (module Handlers) end (* ********************************************************************** *) -type t = {answer : Answer.t; sender : Sender.inner_state} +type t = {answer : Answer.t; sender : Sender.t} let create ~listening_port ~discovery_port ~discovery_addr ~trust_discovered_peers pool my_peer_id = @@ -301,7 +369,7 @@ let create ~listening_port ~discovery_port ~discovery_addr let* answer = Answer.create my_peer_id pool ~discovery_port ~trust_discovered_peers in - let sender = + let* sender = Sender.create my_peer_id pool @@ -313,13 +381,12 @@ let create ~listening_port ~discovery_port ~discovery_addr let activate {answer; sender} = Answer.Worker.activate answer ; - Sender.activate sender + Sender.Worker.activate sender -let wakeup t = Lwt_condition.signal t.sender.restart_discovery () +let wakeup t = + Lwt_condition.signal + (Sender.Worker.state t.sender.worker_state).restart_discovery + () let shutdown t = - Lwt.join - [ - Answer.Worker.shutdown t.answer; - Error_monad.cancel_with_exceptions t.sender.canceler; - ] + Lwt.join [Answer.Worker.shutdown t.answer; Sender.Worker.shutdown t.sender] -- GitLab