diff --git a/src/lib_gossipsub/gossipsub_intf.ml b/src/lib_gossipsub/gossipsub_intf.ml index 2a492eaf7b38eeac3bedbaef759016f5cf7b05b7..6013b88ea66f2e1ce9564d7286a0292cfaaf9fc5 100644 --- a/src/lib_gossipsub/gossipsub_intf.ml +++ b/src/lib_gossipsub/gossipsub_intf.ml @@ -696,11 +696,9 @@ module type WORKER = sig (GS.Peer.t, GS.Message_id.t) parameters -> t - (** [start ~heartbeat_span topics state] runs the (not already started) worker - whose [state] is given. The worker is started with the given - [heartbeat_span] and the initial list of [topics] the caller is interested - in. *) - val start : heartbeat_span:GS.Span.t -> GS.Topic.t list -> t -> t + (** [start topics state] runs the (not already started) worker whose [state] + is given together with the initial list of [topics] the caller is interested in. *) + val start : GS.Topic.t list -> t -> t (** [shutdown state] allows stopping the worker whose [state] is given. *) val shutdown : t -> unit Monad.t diff --git a/src/lib_gossipsub/gossipsub_worker.ml b/src/lib_gossipsub/gossipsub_worker.ml index 91bc55f4099a19842bf684dbb6bdda58acd24fd8..4169ac83a7d73e8d3f44442475eddfbf728f9f3c 100644 --- a/src/lib_gossipsub/gossipsub_worker.ml +++ b/src/lib_gossipsub/gossipsub_worker.ml @@ -473,9 +473,12 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : in {schedule_cancellation} - let start ~heartbeat_span topics t = + let start topics t = match t.status with | Starting -> + (* The heartbeat span is read from the [heartbeat_interval] field of the limits exposed by the view of the worker's gossip state. *) let heartbeat_span = + View.((view t.gossip_state).limits.heartbeat_interval) + in let heartbeat_handle = heartbeat_events_producer ~heartbeat_span t in let event_loop_handle = event_loop t in let status = Running {heartbeat_handle; event_loop_handle} in diff --git a/src/lib_gossipsub/test/test_gossipsub_shared.ml b/src/lib_gossipsub/test/test_gossipsub_shared.ml index efb09bed3ccd5dec0f0045d8f3355cfcb3abbd2c..8f2adc4e051bb6e28e19fa31aa25817a297444d2 100644 --- a/src/lib_gossipsub/test/test_gossipsub_shared.ml +++ 
b/src/lib_gossipsub/test/test_gossipsub_shared.ml @@ -193,3 +193,46 @@ let pp_limits fmtr (l : (GS.Peer.t, GS.Message_id.t, GS.span) limits) = gossip_factor history_length history_gossip_length + +(** Configuration used to instantiate the worker functor: the shared [GS] module, an Lwt-based monad and a queue-backed stream. *) +module Worker_config = struct + module GS = GS + + (** Monad backed by [Lwt.t]: bind is [Lwt.bind] and return is [Lwt.return]. *) module Monad = struct + type 'a t = 'a Lwt.t + + let ( let* ) = Lwt.bind + + let return = Lwt.return + + (** [sleep i] converts the integer [i] to a float and passes it to [Lwt_unix.sleep]. *) let sleep i = Lwt_unix.sleep @@ float_of_int i + end + + (** Stream made of a [Queue.t] of available elements and an optional pending promise/resolver pair, registered by a pop that found the queue empty. *) module Stream = struct + type 'a pending = {promise : 'a Lwt.t; resolver : 'a Lwt.u} + + type 'a t = {elements : 'a Queue.t; mutable pending : 'a pending option} + + let empty () = {elements = Queue.create (); pending = None} + + (** [push e t] enqueues [e] when no pop is pending; otherwise it clears the pending slot and wakes up the pending promise with [e], without enqueuing it. *) let push e t = + match t.pending with + | None -> Queue.push e t.elements + | Some {resolver; _} -> + t.pending <- None ; + Lwt.wakeup resolver e + + (** [pop t] returns the head of the queue when it is non-empty. Otherwise it returns the already registered pending promise if there is one, or creates a new one with [Lwt.task] and registers it. NOTE(review): every pop issued while a promise is pending gets that same promise, hence the same element; confirm that a single consumer is intended. *) let pop t = + if not @@ Queue.is_empty t.elements then + Monad.return @@ Queue.pop t.elements + else + match t.pending with + | Some {promise; _} -> promise + | None -> + let promise, resolver = Lwt.task () in + t.pending <- Some {promise; resolver} ; + promise + end +end + +module Worker = Tezos_gossipsub.Worker (Worker_config) diff --git a/src/lib_gossipsub/tezos_gossipsub.ml b/src/lib_gossipsub/tezos_gossipsub.ml index dc93da49d5df1d7ead582f4f41de6ef6d89b1ab8..d503d6bea9712432e2fdeae540434d1da21c69b4 100644 --- a/src/lib_gossipsub/tezos_gossipsub.ml +++ b/src/lib_gossipsub/tezos_gossipsub.ml @@ -1082,6 +1082,9 @@ module Make (C : AUTOMATON_CONFIG) : let open Monad.Syntax in let*! heartbeat_ticks in let*! heartbeat_interval in + (* FIXME: https://gitlab.com/tezos/tezos/-/issues/5455 + + Move the heartbeat interval/span outside the automaton to the worker. *) let*! backoff_cleanup_ticks in (* NOTE: Probably the cleanup can also be done lazily: at use, if a backoff time is expired, then remove it *)