From 10f22f295b6d1dfdf52b2fa49c31d0e3d4e33f17 Mon Sep 17 00:00:00 2001 From: "iguerNL@Functori" Date: Mon, 17 Apr 2023 13:12:43 +0200 Subject: [PATCH 1/2] DAL/GS/Worker: use GS's heartbeat_interval instead of another given one --- src/lib_gossipsub/gossipsub_intf.ml | 8 +++----- src/lib_gossipsub/gossipsub_worker.ml | 5 ++++- src/lib_gossipsub/tezos_gossipsub.ml | 3 +++ 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/src/lib_gossipsub/gossipsub_intf.ml b/src/lib_gossipsub/gossipsub_intf.ml index 2a492eaf7b38..6013b88ea66f 100644 --- a/src/lib_gossipsub/gossipsub_intf.ml +++ b/src/lib_gossipsub/gossipsub_intf.ml @@ -696,11 +696,9 @@ module type WORKER = sig (GS.Peer.t, GS.Message_id.t) parameters -> t - (** [start ~heartbeat_span topics state] runs the (not already started) worker - whose [state] is given. The worker is started with the given - [heartbeat_span] and the initial list of [topics] the caller is interested - in. *) - val start : heartbeat_span:GS.Span.t -> GS.Topic.t list -> t -> t + (** [start topics state] runs the (not already started) worker whose [state] is given, together with the initial list of [topics] the caller is interested in. + The heartbeat span is not a parameter: the worker reads it from the [heartbeat_interval] limit of the gossipsub state it holds. *) + val start : GS.Topic.t list -> t -> t (** [shutdown state] allows stopping the worker whose [state] is given. 
*) val shutdown : t -> unit Monad.t diff --git a/src/lib_gossipsub/gossipsub_worker.ml b/src/lib_gossipsub/gossipsub_worker.ml index 91bc55f4099a..4169ac83a7d7 100644 --- a/src/lib_gossipsub/gossipsub_worker.ml +++ b/src/lib_gossipsub/gossipsub_worker.ml @@ -473,9 +473,12 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : in {schedule_cancellation} - let start ~heartbeat_span topics t = + let start topics t = match t.status with | Starting -> + let heartbeat_span = (* read from the limits of the gossipsub state instead of a caller-supplied argument *) + View.((view t.gossip_state).limits.heartbeat_interval) + in let heartbeat_handle = heartbeat_events_producer ~heartbeat_span t in let event_loop_handle = event_loop t in let status = Running {heartbeat_handle; event_loop_handle} in diff --git a/src/lib_gossipsub/tezos_gossipsub.ml b/src/lib_gossipsub/tezos_gossipsub.ml index dc93da49d5df..d503d6bea971 100644 --- a/src/lib_gossipsub/tezos_gossipsub.ml +++ b/src/lib_gossipsub/tezos_gossipsub.ml @@ -1082,6 +1082,9 @@ module Make (C : AUTOMATON_CONFIG) : let open Monad.Syntax in let*! heartbeat_ticks in let*! heartbeat_interval in + (* FIXME: https://gitlab.com/tezos/tezos/-/issues/5455 + + Move the heartbeat interval/span outside the automaton to the worker. *) let*! 
backoff_cleanup_ticks in (* NOTE: Probably the cleanup can also be done lazily: at use, if a backoff time is expired, then remove it *) -- GitLab From 90a6a40adf0280e4baf79fc0539359a97f676ebf Mon Sep 17 00:00:00 2001 From: "iguerNL@Functori" Date: Thu, 13 Apr 2023 16:13:16 +0200 Subject: [PATCH 2/2] DAL/GS: instantiate the worker functor --- .../test/test_gossipsub_shared.ml | 43 +++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/src/lib_gossipsub/test/test_gossipsub_shared.ml b/src/lib_gossipsub/test/test_gossipsub_shared.ml index efb09bed3ccd..8f2adc4e051b 100644 --- a/src/lib_gossipsub/test/test_gossipsub_shared.ml +++ b/src/lib_gossipsub/test/test_gossipsub_shared.ml @@ -193,3 +193,46 @@ let pp_limits fmtr (l : (GS.Peer.t, GS.Message_id.t, GS.span) limits) = gossip_factor history_length history_gossip_length + +(** Instantiates the worker functor for the tests: [Worker_config] supplies an Lwt-based [Monad] and an in-memory [Stream] backed by a [Queue]. *) +module Worker_config = struct + module GS = GS + + module Monad = struct + type 'a t = 'a Lwt.t + + let ( let* ) = Lwt.bind + + let return = Lwt.return + + let sleep i = Lwt_unix.sleep @@ float_of_int i (* [i] is an integer span; [Lwt_unix.sleep] expects a float number of seconds *) + end + + module Stream = struct + type 'a pending = {promise : 'a Lwt.t; resolver : 'a Lwt.u} + + type 'a t = {elements : 'a Queue.t; mutable pending : 'a pending option} + + let empty () = {elements = Queue.create (); pending = None} + + let push e t = + match t.pending with + | None -> Queue.push e t.elements + | Some {resolver; _} -> (* a [pop] is pending: clear it and resolve its promise with [e], bypassing the queue *) + t.pending <- None ; + Lwt.wakeup resolver e + + let pop t = + if not @@ Queue.is_empty t.elements then + Monad.return @@ Queue.pop t.elements + else + match t.pending with + | Some {promise; _} -> promise (* a [pop] is already waiting: the same promise is returned. NOTE(review): every such caller then receives the same element; presumably a single consumer is assumed, confirm *) + | None -> (* nothing queued and nobody waiting: register a fresh pending promise for the next [push] to resolve *) + let promise, resolver = Lwt.task () in + t.pending <- Some {promise; resolver} ; + promise + end +end + +module Worker = Tezos_gossipsub.Worker (Worker_config) -- GitLab