From 8a43d56c86fd27f61a23cd549f3b803166dfa6ca Mon Sep 17 00:00:00 2001 From: "iguerNL@Functori" Date: Tue, 11 Apr 2023 07:28:52 +0200 Subject: [PATCH] DAL/GS/Worker: implement worker shutdown --- src/lib_gossipsub/gossipsub_intf.ml | 4 +- src/lib_gossipsub/gossipsub_worker.ml | 89 ++++++++++++++++++-------- src/lib_gossipsub/gossipsub_worker.mli | 2 +- 3 files changed, 68 insertions(+), 27 deletions(-) diff --git a/src/lib_gossipsub/gossipsub_intf.ml b/src/lib_gossipsub/gossipsub_intf.ml index 79d8a3411d3c..6a604b089e86 100644 --- a/src/lib_gossipsub/gossipsub_intf.ml +++ b/src/lib_gossipsub/gossipsub_intf.ml @@ -636,6 +636,8 @@ module type WORKER = sig (** The state of a gossipsub worker. *) type t + type 'a monad + (** The Gossipsub automaton of the worker. *) module GS : AUTOMATON @@ -698,7 +700,7 @@ module type WORKER = sig val start : heartbeat_span:GS.Span.t -> GS.Topic.t list -> t -> t (** [shutdown state] allows stopping the worker whose [state] is given. *) - val shutdown : t -> unit + val shutdown : t -> unit monad (** [app_input state app_input] adds the given application input [app_input] to the worker's input stream. *) diff --git a/src/lib_gossipsub/gossipsub_worker.ml b/src/lib_gossipsub/gossipsub_worker.ml index c235bc47a1b4..754a5d1f00ba 100644 --- a/src/lib_gossipsub/gossipsub_worker.ml +++ b/src/lib_gossipsub/gossipsub_worker.ml @@ -29,8 +29,12 @@ Add coverage unit tests *) module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : - Gossipsub_intf.WORKER with module GS = C.GS = struct + Gossipsub_intf.WORKER with module GS = C.GS and type 'a monad = 'a C.Monad.t = +struct open C + + type 'a monad = 'a Monad.t + module GS = GS module View = GS.Introspection module Topic = GS.Topic @@ -38,14 +42,22 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : module Message_id = GS.Message_id module Message = GS.Message + type 'a cancellation_handle = { + schedule_cancellation : unit -> 'a Monad.t; + (** A handle for a cancellable looping monad. 
When + [schedule_cancellation ()] is called, the internal monad is + returned. Details on how the returned monad resolves or is cancelled + depend on the loop's implementation. *) + } + (** A worker has one of the following statuses: - [Starting] in case it is initialized with {!make} but not started yet. - [Running] in case the function [start] has been called. *) type worker_status = | Starting | Running of { - heartbeat_handle : unit Monad.t; - event_loop_handle : unit Monad.t; + heartbeat_handle : unit cancellation_handle; + event_loop_handle : unit cancellation_handle; } type full_message = { @@ -364,33 +376,51 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : let p2p_input t input = push (P2P_input input) t - (** This function returns a never-ending loop that periodically pushes - [Heartbeat] events in the stream. *) + (** This function returns a {!cancellation_handle} for a looping monad + that pushes [Heartbeat] events in the [t.events_stream] every + [heartbeat_span]. + + When the loop is canceled, the returned monad may take up to + [heartbeat_span] to resolve. *) let heartbeat_events_producer ~heartbeat_span t = + let open Monad in + let shutdown = ref false in let stream = t.events_stream in let rec loop () = - let open Monad in let* () = Monad.sleep heartbeat_span in Stream.push Heartbeat stream ; - loop () + if !shutdown then return () else loop () in - loop () + let promise = loop () in + let schedule_cancellation () = + shutdown := true ; + promise + in + {schedule_cancellation} + + (** This function returns a {!cancellation_handle} for a looping monad + that consumes pushed events in [t.events_stream], if any. - (** This function returns a never-ending loop that processes the events of the - worker's stream. *) + When the loop is canceled, the returned monad will need an additional + extra event to consume in order to resolve. 
*) let event_loop t = + let open Monad in + let shutdown = ref false in let rev_push stream e = Stream.push e stream in let emit_p2p_msg = rev_push t.p2p_output_stream in let emit_app_msg = rev_push t.app_output_stream in - let rec loop t = - let open Monad in - let* event = Stream.pop t.events_stream in - let gossip_state = - apply_event ~emit_p2p_msg ~emit_app_msg t.gossip_state event - in - loop {t with gossip_state} + let events_stream = t.events_stream in + let rec loop gossip_state = + let* event = Stream.pop events_stream in + if !shutdown then return () + else loop @@ apply_event ~emit_p2p_msg ~emit_app_msg gossip_state event + in + let promise = loop t.gossip_state in + let schedule_cancellation () = + shutdown := true ; + promise in - loop t + {schedule_cancellation} let start ~heartbeat_span topics t = match t.status with @@ -408,15 +438,24 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : Format.eprintf "A worker is already running for this state!@." ; assert false + (** Shutting down may require waiting up to [heartbeat_span] to stop the + heartbeat event loop. *) let shutdown state = + let open C.Monad in match state.status with - | Starting -> () - | Running _ -> - (* FIXME: https://gitlab.com/tezos/tezos/-/issues/5171 - - Implement worker shutdown. - Should we unsubscribe from the callbacks called in start? *) - () + | Starting -> return () + | Running {heartbeat_handle; event_loop_handle; _} -> + (* First, we schedule event_loop cancellation without waiting for the + promise to resolve (i.e. for an input in the stream to be read). *) + let event_loop_promise = event_loop_handle.schedule_cancellation () in + (* Then, we schedule heartbeat cancellation, which will push a last + [Heartbeat] event in the stream before its returned promise is + resolved. *) + let* () = heartbeat_handle.schedule_cancellation () in + (* Now, we are sure that an event has been pushed to the event stream + after [event_loop] cancellation. 
So, [event_loop_promise] can be + resolved. *) + event_loop_promise let make rng limits parameters = { diff --git a/src/lib_gossipsub/gossipsub_worker.mli b/src/lib_gossipsub/gossipsub_worker.mli index 286cda55d5fc..65cd99882606 100644 --- a/src/lib_gossipsub/gossipsub_worker.mli +++ b/src/lib_gossipsub/gossipsub_worker.mli @@ -25,4 +25,4 @@ (*****************************************************************************) module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : - Gossipsub_intf.WORKER with module GS = C.GS + Gossipsub_intf.WORKER with module GS = C.GS and type 'a monad = 'a C.Monad.t -- GitLab