From 4fe4659f0c5a1291b87d0a7c37354bdeb92e4eb6 Mon Sep 17 00:00:00 2001 From: Eugen Zalinescu Date: Fri, 24 Mar 2023 13:52:16 +0100 Subject: [PATCH 1/5] Gossipsub: rename Memory_cache to Message_cache --- src/lib_gossipsub/gossipsub_intf.ml | 6 ++-- src/lib_gossipsub/test/test_unit.ml | 18 +++++----- src/lib_gossipsub/tezos_gossipsub.ml | 49 +++++++++++++++------------- 3 files changed, 38 insertions(+), 35 deletions(-) diff --git a/src/lib_gossipsub/gossipsub_intf.ml b/src/lib_gossipsub/gossipsub_intf.ml index 212677dc3bc8..6bc2cd626824 100644 --- a/src/lib_gossipsub/gossipsub_intf.ml +++ b/src/lib_gossipsub/gossipsub_intf.ml @@ -424,12 +424,12 @@ module type AUTOMATON = sig type fanout_peers = {peers : Peer.Set.t; last_published_time : Time.t} - module Memory_cache : sig + module Message_cache : sig type value = {message : message; access : int Peer.Map.t} type t = {messages : value Message_id.Map.t} - (** [get_memory_cache_value message_id state] returns the + (** [get_message_cache_value message_id state] returns the cached value for [message_id]. 
*) val get_value : Message_id.t -> state -> value option end @@ -443,7 +443,7 @@ module type AUTOMATON = sig mesh : Peer.Set.t Topic.Map.t; fanout : fanout_peers Topic.Map.t; seen_messages : Message_id.Set.t; - memory_cache : Memory_cache.t; + message_cache : Message_cache.t; rng : Random.State.t; heartbeat_ticks : int64; } diff --git a/src/lib_gossipsub/test/test_unit.ml b/src/lib_gossipsub/test/test_unit.ml index 0e17603e0b0f..d0765af4c806 100644 --- a/src/lib_gossipsub/test/test_unit.ml +++ b/src/lib_gossipsub/test/test_unit.ml @@ -55,10 +55,10 @@ let assert_fanout_size ~__LOC__ ~topic ~expected_size state = ~error_msg:"Expected %R, got %L" ~__LOC__) -let assert_in_memory_cache ~__LOC__ message_id ~expected_message state = - match GS.Introspection.Memory_cache.get_value message_id state with +let assert_in_message_cache ~__LOC__ message_id ~expected_message state = + match GS.Introspection.Message_cache.get_value message_id state with | None -> - Test.fail "Expected entry in memory cache for message id %d" message_id + Test.fail "Expected entry in message cache for message id %d" message_id | Some {message; access = _} -> Check.( (message = expected_message) @@ -397,7 +397,7 @@ let test_join_adds_fanout_to_mesh rng limits parameters = (** Tests that publishing to a subscribed topic: - Returns peers to publish to. - - Inserts message into memory cache. + - Inserts message into message cache. Ported from: https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L629 *) @@ -432,8 +432,8 @@ let test_publish_without_flood_publishing rng limits parameters = int ~error_msg:"Expected %R, got %L" ~__LOC__) ; - (* [message_id] should be added to the memory cache. *) - assert_in_memory_cache + (* [message_id] should be added to the message cache. 
*) + assert_in_message_cache ~__LOC__ message_id ~expected_message:publish_data @@ -442,7 +442,7 @@ let test_publish_without_flood_publishing rng limits parameters = (** Tests that publishing to an unsubscribed topic: - Populate fanout peers. - Return peers to publish to. - - Inserts message into the memory cache. + - Inserts message into the message cache. Ported from: https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L715 *) @@ -481,8 +481,8 @@ let test_fanout rng limits parameters = int ~error_msg:"Expected %R, got %L" ~__LOC__) ; - (* [message_id] should be added to the memory cache. *) - assert_in_memory_cache + (* [message_id] should be added to the message cache. *) + assert_in_message_cache ~__LOC__ message_id ~expected_message:publish_data diff --git a/src/lib_gossipsub/tezos_gossipsub.ml b/src/lib_gossipsub/tezos_gossipsub.ml index 399b1eb6ee0f..f541542e7154 100644 --- a/src/lib_gossipsub/tezos_gossipsub.ml +++ b/src/lib_gossipsub/tezos_gossipsub.ml @@ -181,7 +181,7 @@ module Make (C : AUTOMATON_CONFIG) : (* FIXME https://gitlab.com/tezos/tezos/-/issues/4982 This module is incomplete. 
*) - module Memory_cache = struct + module Message_cache = struct type value = {message : message; access : int Peer.Map.t} type t = {messages : value Message_id.Map.t} @@ -225,7 +225,7 @@ module Make (C : AUTOMATON_CONFIG) : mesh : Peer.Set.t Topic.Map.t; fanout : fanout_peers Topic.Map.t; seen_messages : Message_id.Set.t; - memory_cache : Memory_cache.t; + message_cache : Message_cache.t; rng : Random.State.t; heartbeat_ticks : int64; } @@ -280,7 +280,7 @@ module Make (C : AUTOMATON_CONFIG) : mesh = Topic.Map.empty; fanout = Topic.Map.empty; seen_messages = Message_id.Set.empty; - memory_cache = Memory_cache.create (); + message_cache = Message_cache.create (); rng; heartbeat_ticks = 0L; } @@ -337,7 +337,7 @@ module Make (C : AUTOMATON_CONFIG) : let peer_filter state = state.parameters.peer_filter - let memory_cache state = state.memory_cache + let message_cache state = state.message_cache let rng state = state.rng @@ -395,7 +395,7 @@ module Make (C : AUTOMATON_CONFIG) : let {mesh; _} = state in match Topic.Map.find topic mesh with None -> false | Some _ -> true - let set_memory_cache memory_cache state = ({state with memory_cache}, ()) + let set_message_cache message_cache state = ({state with message_cache}, ()) let get_connections_score connections ~default peer = match Peer.Map.find peer connections with @@ -466,8 +466,8 @@ module Make (C : AUTOMATON_CONFIG) : let state = { state with - memory_cache = - Memory_cache.add_message message_id message state.memory_cache; + message_cache = + Message_cache.add_message message_id message state.message_cache; } in (state, ()) @@ -611,31 +611,34 @@ module Make (C : AUTOMATON_CONFIG) : Score check is missing. *) let routed_message_ids = Message_id.Map.empty in - let*! memory_cache in + let*! message_cache in let*! 
peer_filter in - let memory_cache, routed_message_ids = + let message_cache, routed_message_ids = (* FIXME https://gitlab.com/tezos/tezos/-/issues/5011 A check should ensure that the number of accesses do not exceed some pre-defined limit. *) List.fold_left - (fun (memory_cache, messages) message_id -> - let memory_cache, info = + (fun (message_cache, messages) message_id -> + let message_cache, info = match - Memory_cache.record_message_access peer message_id memory_cache + Message_cache.record_message_access + peer + message_id + message_cache with - | None -> (memory_cache, `Not_found) - | Some (memory_cache, message) -> - ( memory_cache, + | None -> (message_cache, `Not_found) + | Some (message_cache, message) -> + ( message_cache, if peer_filter peer (`IWant message_id) then `Message message else `Ignored ) in - (memory_cache, Message_id.Map.add message_id info messages)) - (memory_cache, routed_message_ids) + (message_cache, Message_id.Map.add message_id info messages)) + (message_cache, routed_message_ids) message_ids in - let* () = set_memory_cache memory_cache in + let* () = set_message_cache message_cache in On_iwant_messages_to_route {routed_message_ids} |> return end @@ -1474,12 +1477,12 @@ module Make (C : AUTOMATON_CONFIG) : last_published_time : time; } - module Memory_cache = struct - include Memory_cache + module Message_cache = struct + include Message_cache let get_value message_id state = - Message_id.Map.find_opt message_id state.memory_cache.messages - |> Option.map (fun Memory_cache.{message; access} -> {message; access}) + Message_id.Map.find_opt message_id state.message_cache.messages + |> Option.map (fun Message_cache.{message; access} -> {message; access}) end type view = state = { @@ -1491,7 +1494,7 @@ module Make (C : AUTOMATON_CONFIG) : mesh : Peer.Set.t Topic.Map.t; fanout : fanout_peers Topic.Map.t; seen_messages : Message_id.Set.t; - memory_cache : Memory_cache.t; + message_cache : Message_cache.t; rng : Random.State.t; 
heartbeat_ticks : int64; } -- GitLab From 3c0c851e2b0b9d242893443433b28ba1695ec345 Mon Sep 17 00:00:00 2001 From: Eugen Zalinescu Date: Fri, 24 Mar 2023 15:32:22 +0100 Subject: [PATCH 2/5] Gossipsub: complete module Message_cache --- src/lib_gossipsub/gossipsub_intf.ml | 30 +++- src/lib_gossipsub/message_cache.ml | 169 ++++++++++++++++++ src/lib_gossipsub/message_cache.mli | 78 ++++++++ src/lib_gossipsub/test/test_gossipsub.ml | 2 + .../test/test_gossipsub_shared.ml | 8 +- src/lib_gossipsub/test/test_unit.ml | 25 ++- src/lib_gossipsub/tezos_gossipsub.ml | 70 +++----- 7 files changed, 318 insertions(+), 64 deletions(-) create mode 100644 src/lib_gossipsub/message_cache.ml create mode 100644 src/lib_gossipsub/message_cache.mli diff --git a/src/lib_gossipsub/gossipsub_intf.ml b/src/lib_gossipsub/gossipsub_intf.ml index 6bc2cd626824..808f38e2d0cf 100644 --- a/src/lib_gossipsub/gossipsub_intf.ml +++ b/src/lib_gossipsub/gossipsub_intf.ml @@ -128,6 +128,19 @@ type ('peer, 'message_id, 'span) limits = { attackers from overwhelming our mesh with incoming connections. [degree_out] must be set below {degree_low}, and must not exceed [degree_optimal / 2]. *) + history_length : int; + (** [history_length] controls the size of the message cache used for + gossip. The message cache will remember messages for [history_length] + heartbeats. *) + history_gossip_length : int; + (** [history_gossip_length] controls how many cached message ids the local + peer will advertise in IHAVE gossip messages. When asked for its seen + message ids, the local peer will return only those from the most + recent [history_gossip_length] heartbeats. The slack between + [history_gossip_length] and [history_length] allows the local peer to + avoid advertising messages that will be expired by the time they're + requested. [history_gossip_length] must be less than or equal to + [history_length]. 
*) } type ('peer, 'message_id) parameters = { @@ -425,13 +438,18 @@ module type AUTOMATON = sig type fanout_peers = {peers : Peer.Set.t; last_published_time : Time.t} module Message_cache : sig - type value = {message : message; access : int Peer.Map.t} + type t - type t = {messages : value Message_id.Map.t} + val create : history_slots:int -> gossip_slots:int -> t - (** [get_message_cache_value message_id state] returns the - cached value for [message_id]. *) - val get_value : Message_id.t -> state -> value option + val add_message : Message_id.t -> Message.t -> Topic.t -> t -> t + + val get_message_for_peer : + Peer.t -> Message_id.t -> t -> (t * Message.t * int) option + + val get_message_ids_to_gossip : Topic.t -> t -> Message_id.t list + + val shift : t -> t end type view = { @@ -548,7 +566,7 @@ module type WORKER = sig val shutdown : t -> unit (** [inject state msg_id msg topic] is used to inject a message [msg] with - ID [msg_id] and that belongs to [topic] to the network. *) + id [msg_id] and that belongs to [topic] to the network. *) val inject : t -> GS.Message_id.t -> GS.Message.t -> GS.Topic.t -> unit (** [join t topics] joins [topics] even if the worker is running. 
*) diff --git a/src/lib_gossipsub/message_cache.ml b/src/lib_gossipsub/message_cache.ml new file mode 100644 index 000000000000..0d36dbd9c3c4 --- /dev/null +++ b/src/lib_gossipsub/message_cache.ml @@ -0,0 +1,169 @@ +(*****************************************************************************) +(* *) +(* Open Source License *) +(* Copyright (c) 2023 Nomadic Labs, *) +(* *) +(* Permission is hereby granted, free of charge, to any person obtaining a *) +(* copy of this software and associated documentation files (the "Software"),*) +(* to deal in the Software without restriction, including without limitation *) +(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *) +(* and/or sell copies of the Software, and to permit persons to whom the *) +(* Software is furnished to do so, subject to the following conditions: *) +(* *) +(* The above copyright notice and this permission notice shall be included *) +(* in all copies or substantial portions of the Software. *) +(* *) +(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*) +(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *) +(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *) +(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*) +(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *) +(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *) +(* DEALINGS IN THE SOFTWARE. *) +(* *) +(*****************************************************************************) + +module Make + (Peer : Gossipsub_intf.ITERABLE) + (Topic : Gossipsub_intf.ITERABLE) + (Message_id : Gossipsub_intf.ITERABLE) + (Message : Gossipsub_intf.PRINTABLE) = +struct + type message_with_counters = { + message : Message.t; + store_counter : int; + (* This field is used to count how many times the message has been added to + the cache. 
Normally the caller ensures that the same message (that is, + with the same id) is not inserted twice; however, this module does not make + this assumption. *) + access_counters : int Peer.Map.t; + } + + (* A slot is just an index, normally a heartbeat tick, and this is why the + same type, namely [Int64], is used. Note that a bigger slot is a more + recent slot. *) + module SlotMap = Map.Make (Int64) + + type slot_entry = Message_id.t list Topic.Map.t + + type t = { + history_slots : int; + gossip_slots : int; + messages : message_with_counters Message_id.Map.t; + cache : slot_entry SlotMap.t; + (** The cache without the entry for the last slot, which is stored + separately, see next field. *) + last_slot_entry : slot_entry; + last_slot : Int64.t; + } + + let create ~history_slots ~gossip_slots = + assert (gossip_slots > 0) ; + assert (gossip_slots <= history_slots) ; + { + history_slots; + gossip_slots; + messages = Message_id.Map.empty; + cache = SlotMap.empty; + last_slot_entry = Topic.Map.empty; + last_slot = 0L; + } + + let add_message message_id message topic t = + let last_slot_entry = + Topic.Map.update + topic + (function + | None -> Some [message_id] + | Some message_ids -> Some (message_id :: message_ids)) + t.last_slot_entry + in + let messages = + Message_id.Map.update + message_id + (function + | None -> + Some + {message; store_counter = 1; access_counters = Peer.Map.empty} + | Some {message; store_counter; access_counters} -> + Some {message; store_counter = store_counter + 1; access_counters}) + t.messages + in + {t with messages; last_slot_entry} + + let get_message_for_peer peer message_id t = + match Message_id.Map.find message_id t.messages with + | None -> None + | Some {message; store_counter; access_counters} -> + let counter = ref 1 in + let access_counters = + Peer.Map.update + peer + (function + | None -> Some 1 + | Some c -> + counter := c + 1 ; + Some !counter) + access_counters + in + let t = + { + t with + messages = + 
Message_id.Map.add + message_id + {message; store_counter; access_counters} + t.messages; + } + in + Some (t, message, !counter) + + let get_message_ids_to_gossip topic t = + SlotMap.to_rev_seq t.cache + |> Seq.take ~when_negative_length:() (t.gossip_slots - 1) + |> WithExceptions.Result.get_ok ~loc:__LOC__ + |> Seq.cons (t.last_slot, t.last_slot_entry) + |> Seq.fold_left + (fun acc_message_ids (_slot, entries) -> + match Topic.Map.find topic entries with + | None -> acc_message_ids + | Some message_ids -> List.rev_append message_ids acc_message_ids) + [] + + let shift t = + let drop_old_messages oldest_entries = + Topic.Map.fold + (fun _topic message_ids messages -> + List.fold_left + (fun messages message_id -> + Message_id.Map.update + message_id + (function + | None -> None + | Some {message; store_counter; access_counters} -> + if store_counter = 1 then None + else + Some + { + message; + store_counter = store_counter - 1; + access_counters; + }) + messages) + messages + message_ids) + oldest_entries + t.messages + in + let cache = SlotMap.add t.last_slot t.last_slot_entry t.cache in + let last_slot = Int64.succ t.last_slot in + let cache, messages = + match SlotMap.min_binding cache with + | None -> (* impossible *) (cache, t.messages) + | Some (first_slot, entries) -> + if Int64.(sub last_slot first_slot < of_int t.history_slots) then + (cache, t.messages) + else (SlotMap.remove first_slot cache, drop_old_messages entries) + in + {t with cache; messages; last_slot; last_slot_entry = Topic.Map.empty} +end diff --git a/src/lib_gossipsub/message_cache.mli b/src/lib_gossipsub/message_cache.mli new file mode 100644 index 000000000000..1ea910fad6d3 --- /dev/null +++ b/src/lib_gossipsub/message_cache.mli @@ -0,0 +1,78 @@ +(*****************************************************************************) +(* *) +(* Open Source License *) +(* Copyright (c) 2023 Nomadic Labs, *) +(* *) +(* Permission is hereby granted, free of charge, to any person obtaining a *) +(* 
copy of this software and associated documentation files (the "Software"),*) +(* to deal in the Software without restriction, including without limitation *) +(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *) +(* and/or sell copies of the Software, and to permit persons to whom the *) +(* Software is furnished to do so, subject to the following conditions: *) +(* *) +(* The above copyright notice and this permission notice shall be included *) +(* in all copies or substantial portions of the Software. *) +(* *) +(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*) +(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *) +(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *) +(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*) +(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *) +(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *) +(* DEALINGS IN THE SOFTWARE. *) +(* *) +(*****************************************************************************) + +(** A sliding window cache of published messages. The module also keeps track of + the number of accesses to a message by a peer, thus indirectly tracking the + number of IWANT requests a peer makes for the same message. + + The module assumes that no two messages have the same message id. However, + the cache stores duplicates; for instance, if [add_message id msg topic] is + called twice, then [id] will appear (at least) twice in + [get_message_ids_to_gossip]'s result (assuming fewer than [gossip_slots] + shifts have been executed in the meantime). +*) +module Make + (Peer : Gossipsub_intf.ITERABLE) + (Topic : Gossipsub_intf.ITERABLE) + (Message_id : Gossipsub_intf.ITERABLE) + (Message : Gossipsub_intf.PRINTABLE) : sig + type t + + (** [create ~history_slots ~gossip_slots] creates a sliding window cache of + length [history_slots]. 
+ + When queried for messages to advertise, the cache only returns messages in + the last [gossip_slots] slots. [gossip_slots] must be smaller than or equal to + [history_slots]. The slack between [gossip_slots] and [history_slots] + accounts for the reaction time between when a message is advertised via + IHAVE gossip, and when the peer pulls it via an IWANT command. + + @raise Assert_failure when [gossip_slots <= 0 || gossip_slots > history_slots] + + TODO: https://gitlab.com/tezos/tezos/-/issues/5129 + Error handling. *) + val create : history_slots:int -> gossip_slots:int -> t + + (** Add message to the most recent cache slot. If the message already exists + in the cache, the message is not overridden, instead a duplicate is + stored. *) + val add_message : Message_id.t -> Message.t -> Topic.t -> t -> t + + (** Get the message associated with the given message id, increase the access + counter for the peer requesting the message, and also return the updated + counter. *) + val get_message_for_peer : + Peer.t -> Message_id.t -> t -> (t * Message.t * int) option + + (** Get the message ids for the given topic in the last [gossip_slots] slots + of the cache. If there were duplicates added in the cache, then there will + be duplicates in the output. There is no guarantee about the order of + messages in the output. *) + val get_message_ids_to_gossip : Topic.t -> t -> Message_id.t list + + (** Shift the sliding window by one slot (usually corresponding to one + heartbeat tick). 
*) + val shift : t -> t +end diff --git a/src/lib_gossipsub/test/test_gossipsub.ml b/src/lib_gossipsub/test/test_gossipsub.ml index a7b341b553b9..aeddc9ce04bd 100644 --- a/src/lib_gossipsub/test/test_gossipsub.ml +++ b/src/lib_gossipsub/test/test_gossipsub.ml @@ -45,6 +45,8 @@ let default_limits = degree_high = 12; degree_score = 4; degree_out = 2; + history_length = 5; + history_gossip_length = 3; } let parameters = {peer_filter = (fun _peer _action -> true)} diff --git a/src/lib_gossipsub/test/test_gossipsub_shared.ml b/src/lib_gossipsub/test/test_gossipsub_shared.ml index f951ed4b0275..8540e59d1d81 100644 --- a/src/lib_gossipsub/test/test_gossipsub_shared.ml +++ b/src/lib_gossipsub/test/test_gossipsub_shared.ml @@ -112,6 +112,8 @@ let pp_limits fmtr (l : (GS.Peer.t, GS.Message_id.t, GS.span) limits) = degree_high; degree_score; degree_out; + history_length; + history_gossip_length; } = l in @@ -133,7 +135,9 @@ let pp_limits fmtr (l : (GS.Peer.t, GS.Message_id.t, GS.span) limits) = degree_low = %d;@;\ degree_high = %d;@;\ degree_score = %d;@;\ - degree_out = %d }@]" + degree_out = %d;@;\ + history_length = %d;@;\ + history_gossip_length = %d }@]" max_recv_ihave_per_heartbeat max_sent_iwant_per_heartbeat degree_optimal @@ -156,3 +160,5 @@ let pp_limits fmtr (l : (GS.Peer.t, GS.Message_id.t, GS.span) limits) = degree_high degree_score degree_out + history_length + history_gossip_length diff --git a/src/lib_gossipsub/test/test_unit.ml b/src/lib_gossipsub/test/test_unit.ml index d0765af4c806..cf86daa8fe9a 100644 --- a/src/lib_gossipsub/test/test_unit.ml +++ b/src/lib_gossipsub/test/test_unit.ml @@ -55,17 +55,24 @@ let assert_fanout_size ~__LOC__ ~topic ~expected_size state = ~error_msg:"Expected %R, got %L" ~__LOC__) -let assert_in_message_cache ~__LOC__ message_id ~expected_message state = - match GS.Introspection.Message_cache.get_value message_id state with +(* Note: a new message cache state is returned when inspecting it, but this + function does not return this 
updated state! *) +let assert_in_message_cache ~__LOC__ message_id ~peer ~expected_message state = + let view = GS.Introspection.view state in + match + GS.Introspection.Message_cache.get_message_for_peer + peer + message_id + view.message_cache + with | None -> Test.fail "Expected entry in message cache for message id %d" message_id - | Some {message; access = _} -> + | Some (_message_cache_state, message, _access) -> Check.( (message = expected_message) string ~error_msg:"Expected %R, got %L" - ~__LOC__) ; - unit + ~__LOC__) let assert_mesh_inclusion ~__LOC__ ~topic ~peer ~is_included state = let view = GS.Introspection.view state in @@ -436,8 +443,10 @@ let test_publish_without_flood_publishing rng limits parameters = assert_in_message_cache ~__LOC__ message_id + ~peer:(Stdlib.List.hd peers) ~expected_message:publish_data - state + state ; + unit (** Tests that publishing to an unsubscribed topic: - Populate fanout peers. @@ -485,8 +494,10 @@ let test_fanout rng limits parameters = assert_in_message_cache ~__LOC__ message_id + ~peer:(Stdlib.List.hd peers) ~expected_message:publish_data - state + state ; + unit (** Tests that a peer is added to our mesh on graft when we are both joined/subscribed to the same topic. diff --git a/src/lib_gossipsub/tezos_gossipsub.ml b/src/lib_gossipsub/tezos_gossipsub.ml index f541542e7154..51d568630941 100644 --- a/src/lib_gossipsub/tezos_gossipsub.ml +++ b/src/lib_gossipsub/tezos_gossipsub.ml @@ -178,41 +178,11 @@ module Make (C : AUTOMATON_CONFIG) : type connections = connection Peer.Map.t - (* FIXME https://gitlab.com/tezos/tezos/-/issues/4982 - - This module is incomplete. 
*) - module Message_cache = struct - type value = {message : message; access : int Peer.Map.t} - - type t = {messages : value Message_id.Map.t} - - let create () = {messages = Message_id.Map.empty} - - let record_message_access peer message_id t = - match Message_id.Map.find message_id t.messages with - | None -> None - | Some {message; access} -> - let access = - Peer.Map.update - peer - (function None -> Some 1 | Some x -> Some (x + 1)) - access - in - let t = - { - messages = - Message_id.Map.add message_id {message; access} t.messages; - } - in - Some (t, message) - - let add_message message_id message t = - let value = {message; access = Peer.Map.empty} in - {messages = Message_id.Map.add message_id value t.messages} - end - type fanout_peers = {peers : Peer.Set.t; last_published_time : time} + module Message_cache = + Message_cache.Make (Peer) (Topic) (Message_id) (Message) + (* FIXME https://gitlab.com/tezos/tezos/-/issues/4983 This data-structure should be documented. *) @@ -266,7 +236,9 @@ module Make (C : AUTOMATON_CONFIG) : requirement or delete the todo. 
*) assert (l.degree_score + l.degree_out <= l.degree_optimal) ; assert (l.degree_out < l.degree_low) ; - assert (l.degree_out <= l.degree_optimal / 2) + assert (l.degree_out <= l.degree_optimal / 2) ; + assert (l.history_gossip_length > 0) ; + assert (l.history_gossip_length <= l.history_length) let make : Random.State.t -> limits -> parameters -> state = fun rng limits parameters -> @@ -280,7 +252,10 @@ module Make (C : AUTOMATON_CONFIG) : mesh = Topic.Map.empty; fanout = Topic.Map.empty; seen_messages = Message_id.Set.empty; - message_cache = Message_cache.create (); + message_cache = + Message_cache.create + ~history_slots:limits.history_length + ~gossip_slots:limits.history_gossip_length; rng; heartbeat_ticks = 0L; } @@ -462,12 +437,16 @@ module Make (C : AUTOMATON_CONFIG) : let state = {state with fanout = Topic.Map.remove topic state.fanout} in (state, ()) - let put_message_in_cache message_id message state = + let put_message_in_cache message_id message topic state = let state = { state with message_cache = - Message_cache.add_message message_id message state.message_cache; + Message_cache.add_message + message_id + message + topic + state.message_cache; } in (state, ()) @@ -622,13 +601,10 @@ module Make (C : AUTOMATON_CONFIG) : (fun (message_cache, messages) message_id -> let message_cache, info = match - Message_cache.record_message_access - peer - message_id - message_cache + Message_cache.get_message_for_peer peer message_id message_cache with | None -> (message_cache, `Not_found) - | Some (message_cache, message) -> + | Some (message_cache, message, _access_counter) -> ( message_cache, if peer_filter peer (`IWant message_id) then `Message message @@ -841,7 +817,7 @@ module Make (C : AUTOMATON_CONFIG) : let handle ~sender topic message_id message : [`Publish] output Monad.t = let open Monad.Syntax in - let* () = put_message_in_cache message_id message in + let* () = put_message_in_cache message_id message topic in let*! 
mesh_opt = find_mesh topic in let* peers = match mesh_opt with @@ -1477,13 +1453,7 @@ module Make (C : AUTOMATON_CONFIG) : last_published_time : time; } - module Message_cache = struct - include Message_cache - - let get_value message_id state = - Message_id.Map.find_opt message_id state.message_cache.messages - |> Option.map (fun Message_cache.{message; access} -> {message; access}) - end + module Message_cache = Message_cache type view = state = { limits : limits; -- GitLab From 63ae40cb5a47272a12ff16e7ba2975f86102b867 Mon Sep 17 00:00:00 2001 From: Eugen Zalinescu Date: Fri, 24 Mar 2023 16:06:04 +0100 Subject: [PATCH 3/5] Gossipsub: advance the message history sliding window --- src/lib_gossipsub/tezos_gossipsub.ml | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/lib_gossipsub/tezos_gossipsub.ml b/src/lib_gossipsub/tezos_gossipsub.ml index 51d568630941..30a6595b083a 100644 --- a/src/lib_gossipsub/tezos_gossipsub.ml +++ b/src/lib_gossipsub/tezos_gossipsub.ml @@ -1277,8 +1277,10 @@ module Make (C : AUTOMATON_CONFIG) : Maintain our fanout for topics we are publishing to, but we have not joined. *) - (* FIXME https://gitlab.com/tezos/tezos/-/issues/4982 - Advance the message history window *) + (* Advance the message history sliding window. *) + let*! 
message_cache in + let* () = Message_cache.shift message_cache |> set_message_cache in + Heartbeat {to_graft; to_prune; noPX_peers} |> return end -- GitLab From 5296794a6c41f2fa67bd544d72bf386d30cd8774 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Thir=C3=A9?= Date: Fri, 24 Mar 2023 18:46:20 +0100 Subject: [PATCH 4/5] Gossipsub: Add a PBT test for soundness --- src/lib_gossipsub/test/test_pbt.ml | 290 ++++++++++++++++++++++++++++- 1 file changed, 289 insertions(+), 1 deletion(-) diff --git a/src/lib_gossipsub/test/test_pbt.ml b/src/lib_gossipsub/test/test_pbt.ml index 513a1d245869..02183b38c0f3 100644 --- a/src/lib_gossipsub/test/test_pbt.ml +++ b/src/lib_gossipsub/test/test_pbt.ml @@ -59,6 +59,292 @@ module Basic_fragments = struct let heartbeat : t = of_list [Heartbeat] end +module Test_message_cache = struct + module L = GS.Introspection.Message_cache + + module R = struct + module M = Map.Make (Int) + + type t = { + mutable ticks : int; + cache : C.Message.t C.Message_id.Map.t C.Topic.Map.t M.t; + history_slots : int; + gossip_slots : int; + } + + let create ~history_slots ~gossip_slots = + assert (gossip_slots > 0) ; + assert (gossip_slots <= history_slots) ; + {ticks = 0; cache = M.empty; history_slots; gossip_slots} + + let add_message message_id message topic t = + { + t with + cache = + M.update + t.ticks + (function + | None -> + C.Topic.Map.singleton + topic + (C.Message_id.Map.singleton message_id message) + |> Option.some + | Some map -> + C.Topic.Map.update + topic + (function + | None -> + C.Message_id.Map.singleton message_id message + |> Option.some + | Some topic_map -> + C.Message_id.Map.add message_id message topic_map + |> Option.some) + map + |> Option.some) + t.cache; + } + + let get_message_for_peer _peer message_id t = + let found = ref None in + for x = max 0 (t.ticks - t.history_slots + 1) to t.ticks do + match M.find x t.cache with + | None -> () + | Some topic_map -> + C.Topic.Map.iter + (fun _topic map -> + match 
C.Message_id.Map.find message_id map with + | None -> () + | Some message -> found := Some message) + topic_map + done ; + let r = !found in + Option.map (fun message -> (t, message)) r + + let get_message_ids_to_gossip topic t = + let found = ref C.Message_id.Set.empty in + for x = max 0 (t.ticks - t.gossip_slots + 1) to t.ticks do + match M.find x t.cache with + | None -> () + | Some topic_map -> ( + match C.Topic.Map.find topic topic_map with + | None -> () + | Some message_map -> + let set = + message_map |> C.Message_id.Map.to_seq |> Seq.map fst + |> C.Message_id.Set.of_seq + in + found := C.Message_id.Set.union !found set) + done ; + !found + + let shift t = + t.ticks <- t.ticks + 1 ; + t + end + + (* If those numbers are too large, we will miss scenarios with collisions. *) + let history_slots = QCheck2.Gen.int_range (-1) 10 + + let gossip_slots = history_slots + + let message_id = QCheck2.Gen.int_range 0 10 + + (* The data-structure assumes that the id uniquely identifies the + message. To ease the readability of the test we consider a + constant message. 
*) + let message = QCheck2.Gen.return "m" + + let peer = QCheck2.Gen.return 0 + + let topic = + let open QCheck2.Gen in + let* chars = + QCheck2.Gen.list_size + (QCheck2.Gen.int_range 1 2) + (QCheck2.Gen.char_range 'a' 'c') + in + return (String.of_seq (List.to_seq chars)) + + type action = + | Add_message of {message_id : int; message : string; topic : string} + | Get_message_for_peer of {peer : int; message_id : int} + | Get_message_ids_to_gossip of {topic : string} + | Shift + + let pp_action fmt = function + | Add_message {message_id; message = _; topic} -> + Format.fprintf fmt "ADD_MESSAGE {id:%d;topic:%s}" message_id topic + | Get_message_for_peer {peer = _; message_id} -> + Format.fprintf fmt "GET_MESSAGE {id:%d}" message_id + | Get_message_ids_to_gossip {topic} -> + Format.fprintf fmt "GET_FOR_TOPIC {topic:%s}" topic + | Shift -> Format.fprintf fmt "SHIFT" + + let add_message = + let open QCheck2.Gen in + let* message_id in + let* message in + let* topic in + return (Add_message {message_id; message; topic}) + + let get_message_for_peer = + let open QCheck2.Gen in + let* message_id in + let* peer in + return (Get_message_for_peer {peer; message_id}) + + let get_message_ids_to_gossip = + let open QCheck2.Gen in + let* topic in + return (Get_message_ids_to_gossip {topic}) + + let action = + QCheck2.Gen.oneof + [ + add_message; + get_message_for_peer; + get_message_ids_to_gossip; + QCheck2.Gen.return Shift; + ] + + let actions = QCheck2.Gen.(list_size (int_range 1 30) action) + + let rec run (left, right) actions = + let remaining_steps = List.length actions in + match actions with + | [] -> None + | Add_message {message_id; message; topic} :: actions -> + let left = L.add_message message_id message topic left in + let right = R.add_message message_id message topic right in + run (left, right) actions + | Get_message_for_peer {peer; message_id} :: actions -> ( + let left_result = L.get_message_for_peer peer message_id left in + let right_result = 
R.get_message_for_peer peer message_id right in + match (left_result, right_result) with + | None, None -> run (left, right) actions + | Some (left, _left_message, _left_counter), Some (right, _right_message) + -> + (* By definition of the message generator, messages are equal. *) + run (left, right) actions + | None, Some _ -> + let message = Format.asprintf "Expected: A message. Got: None" in + Some (remaining_steps, message) + | Some _, None -> + let message = + Format.asprintf "Expected: No message. Got: A message" + in + Some (remaining_steps, message)) + | Get_message_ids_to_gossip {topic} :: actions -> + let left_result = L.get_message_ids_to_gossip topic left in + let right_result = R.get_message_ids_to_gossip topic right in + let left_set = C.Message_id.Set.of_list left_result in + if C.Message_id.Set.equal left_set right_result then + run (left, right) actions + else + let pp_set fmt s = + if C.Message_id.Set.is_empty s then Format.fprintf fmt "empty set" + else + s |> C.Message_id.Set.to_seq |> List.of_seq + |> Format.fprintf + fmt + "%a" + (Format.pp_print_list + ~pp_sep:(fun fmt () -> Format.fprintf fmt " ") + Format.pp_print_int) + in + let message = + Format.asprintf + "Expected: %a@.Got: %a@." + pp_set + right_result + pp_set + left_set + in + Some (remaining_steps, message) + | Shift :: actions -> + let left = L.shift left in + let right = R.shift right in + run (left, right) actions + + let pp fmt trace = + Format.fprintf + fmt + "%a@." 
+ (Format.pp_print_list ~pp_sep:Format.pp_print_newline pp_action) + trace + + let test rng = + Tezt_core.Test.register + ~__FILE__ + ~title:"Gossipsub: check correction of message cache data structure" + ~tags:["gossipsub"; "message_cache"] + @@ fun () -> + let scenario = + let open QCheck2.Gen in + let* history_slots in + let* gossip_slots in + let* actions in + let left = + try L.create ~history_slots ~gossip_slots |> Either.left + with exn -> Either.right exn + in + let right = + try R.create ~history_slots ~gossip_slots |> Either.left + with exn -> Either.right exn + in + match (left, right) with + | Right _, Right _ -> return None + | Left left, Left right -> ( + match run (left, right) actions with + | None -> return None + | Some (remaining_steps, explanation) -> + let n = List.length actions - remaining_steps + 1 in + let actions = List.take_n n actions in + return @@ Some (history_slots, gossip_slots, explanation, actions) + ) + | Right exn, Left _ -> + let explanation = + Format.asprintf + "Initialisation failed unexpectedly: %s" + (Printexc.to_string exn) + in + return @@ Some (history_slots, gossip_slots, explanation, []) + | Left _, Right exn -> + let explanation = + Format.asprintf + "Initialisation succeeded while it should not. 
Expected to fail \ + with: %s" + (Printexc.to_string exn) + in + return @@ Some (history_slots, gossip_slots, explanation, []) + in + let test = + QCheck2.Test.make ~count:500_000 ~name:"Gossipsub: message cache" scenario + @@ function + | None -> true + | Some (history_slots, gossip_slots, explanation, trace) -> + Tezt.Test.fail + ~__LOC__ + "@[Soundness check failed.@;\ + Limits:@;\ + history_slots: %d@;\ + gossip_slots: %d@;\ + @;\ + Dumping trace:@;\ + @[%a@]@;\ + @;\ + Explanation:@;\ + %s@]" + history_slots + gossip_slots + pp + trace + explanation + in + QCheck2.Test.check_exn ~rand:rng test ; + unit +end + (** Test that removing a peer really removes it from the state *) module Test_remove_peer = struct open Gossipsub_pbt_generators @@ -172,4 +458,6 @@ module Test_remove_peer = struct unit end -let register rng limits parameters = Test_remove_peer.test rng limits parameters +let register rng limits parameters = + Test_remove_peer.test rng limits parameters ; + Test_message_cache.test rng -- GitLab From bf2cf940b988ec1196b4947df53b27047d361095 Mon Sep 17 00:00:00 2001 From: Eugen Zalinescu Date: Wed, 29 Mar 2023 14:32:00 +0200 Subject: [PATCH 5/5] Gossipsub: add a subsignature to AUTOMATON_CONFIG --- src/lib_gossipsub/gossipsub_intf.ml | 6 ++- src/lib_gossipsub/message_cache.ml | 12 +++--- src/lib_gossipsub/message_cache.mli | 12 ++---- .../test/test_gossipsub_shared.ml | 18 +++++---- src/lib_gossipsub/test/test_pbt.ml | 38 ++++++++++--------- src/lib_gossipsub/test/test_unit.ml | 32 +++++++++------- src/lib_gossipsub/tezos_gossipsub.ml | 19 +++++----- src/lib_gossipsub/tezos_gossipsub.mli | 8 ++-- 8 files changed, 77 insertions(+), 68 deletions(-) diff --git a/src/lib_gossipsub/gossipsub_intf.ml b/src/lib_gossipsub/gossipsub_intf.ml index 808f38e2d0cf..8c6ef48003ee 100644 --- a/src/lib_gossipsub/gossipsub_intf.ml +++ b/src/lib_gossipsub/gossipsub_intf.ml @@ -42,7 +42,7 @@ module type ITERABLE = sig module Map : Map.S with type key = t end -module type 
AUTOMATON_CONFIG = sig +module type AUTOMATON_SUBCONFIG = sig module Peer : ITERABLE module Topic : ITERABLE @@ -50,6 +50,10 @@ module type AUTOMATON_CONFIG = sig module Message_id : ITERABLE module Message : PRINTABLE +end + +module type AUTOMATON_CONFIG = sig + module Subconfig : AUTOMATON_SUBCONFIG module Span : PRINTABLE diff --git a/src/lib_gossipsub/message_cache.ml b/src/lib_gossipsub/message_cache.ml index 0d36dbd9c3c4..ce2436f9f12b 100644 --- a/src/lib_gossipsub/message_cache.ml +++ b/src/lib_gossipsub/message_cache.ml @@ -23,12 +23,12 @@ (* *) (*****************************************************************************) -module Make - (Peer : Gossipsub_intf.ITERABLE) - (Topic : Gossipsub_intf.ITERABLE) - (Message_id : Gossipsub_intf.ITERABLE) - (Message : Gossipsub_intf.PRINTABLE) = -struct +module Make (C : Gossipsub_intf.AUTOMATON_SUBCONFIG) = struct + module Peer = C.Peer + module Topic = C.Topic + module Message_id = C.Message_id + module Message = C.Message + type message_with_counters = { message : Message.t; store_counter : int; diff --git a/src/lib_gossipsub/message_cache.mli b/src/lib_gossipsub/message_cache.mli index 1ea910fad6d3..a1c03094269f 100644 --- a/src/lib_gossipsub/message_cache.mli +++ b/src/lib_gossipsub/message_cache.mli @@ -33,11 +33,7 @@ [get_message_ids_to_gossip]'s result (assuming not more than [gossip_slots] shifts have been executed in the meanwhile). *) -module Make - (Peer : Gossipsub_intf.ITERABLE) - (Topic : Gossipsub_intf.ITERABLE) - (Message_id : Gossipsub_intf.ITERABLE) - (Message : Gossipsub_intf.PRINTABLE) : sig +module Make (C : Gossipsub_intf.AUTOMATON_SUBCONFIG) : sig type t (** [create ~history_slots ~gossip_slots] creates a sliding window cache of @@ -58,19 +54,19 @@ module Make (** Add message to the most recent cache slot. If the message already exists in the cache, the message is not overridden, instead a duplicate is stored. 
*) - val add_message : Message_id.t -> Message.t -> Topic.t -> t -> t + val add_message : C.Message_id.t -> C.Message.t -> C.Topic.t -> t -> t (** Get the message associated to the given message id, increase the access counter for the peer requesting the message, and also return the updated counter. *) val get_message_for_peer : - Peer.t -> Message_id.t -> t -> (t * Message.t * int) option + C.Peer.t -> C.Message_id.t -> t -> (t * C.Message.t * int) option (** Get the message ids for the given topic in the last [gossip_slots] slots of the cache. If there were duplicates added in the cache, then there will be duplicates in the output. There is no guarantee about the order of messages in the output. *) - val get_message_ids_to_gossip : Topic.t -> t -> Message_id.t list + val get_message_ids_to_gossip : C.Topic.t -> t -> C.Message_id.t list (** Shift the sliding window by one slot (usually corresponding to one heartbeat tick). *) diff --git a/src/lib_gossipsub/test/test_gossipsub_shared.ml b/src/lib_gossipsub/test/test_gossipsub_shared.ml index 8540e59d1d81..08b9c3014c5c 100644 --- a/src/lib_gossipsub/test/test_gossipsub_shared.ml +++ b/src/lib_gossipsub/test/test_gossipsub_shared.ml @@ -41,10 +41,10 @@ module Automaton_config : AUTOMATON_CONFIG with type Time.t = int and type Span.t = int - and type Peer.t = int - and type Topic.t = string - and type Message_id.t = int - and type Message.t = string = struct + and type Subconfig.Peer.t = int + and type Subconfig.Topic.t = string + and type Subconfig.Message_id.t = int + and type Subconfig.Message.t = string = struct module Span = struct type t = int @@ -85,10 +85,12 @@ module Automaton_config : module Set = Set.Make (String) end - module Peer = Int_iterable - module Topic = String_iterable - module Message_id = Int_iterable - module Message = String_iterable + module Subconfig = struct + module Peer = Int_iterable + module Topic = String_iterable + module Message_id = Int_iterable + module Message = String_iterable + 
end end module C = Automaton_config diff --git a/src/lib_gossipsub/test/test_pbt.ml b/src/lib_gossipsub/test/test_pbt.ml index 02183b38c0f3..5154bae52b23 100644 --- a/src/lib_gossipsub/test/test_pbt.ml +++ b/src/lib_gossipsub/test/test_pbt.ml @@ -28,6 +28,10 @@ open Test_gossipsub_shared open Gossipsub_intf open Tezt_core.Base +module Peer = C.Subconfig.Peer +module Topic = C.Subconfig.Topic +module Message_id = C.Subconfig.Message_id +module Message = C.Subconfig.Message module Basic_fragments = struct open Gossipsub_pbt_generators @@ -67,7 +71,7 @@ module Test_message_cache = struct type t = { mutable ticks : int; - cache : C.Message.t C.Message_id.Map.t C.Topic.Map.t M.t; + cache : Message.t Message_id.Map.t Topic.Map.t M.t; history_slots : int; gossip_slots : int; } @@ -85,19 +89,19 @@ module Test_message_cache = struct t.ticks (function | None -> - C.Topic.Map.singleton + Topic.Map.singleton topic - (C.Message_id.Map.singleton message_id message) + (Message_id.Map.singleton message_id message) |> Option.some | Some map -> - C.Topic.Map.update + Topic.Map.update topic (function | None -> - C.Message_id.Map.singleton message_id message + Message_id.Map.singleton message_id message |> Option.some | Some topic_map -> - C.Message_id.Map.add message_id message topic_map + Message_id.Map.add message_id message topic_map |> Option.some) map |> Option.some) @@ -110,9 +114,9 @@ module Test_message_cache = struct match M.find x t.cache with | None -> () | Some topic_map -> - C.Topic.Map.iter + Topic.Map.iter (fun _topic map -> - match C.Message_id.Map.find message_id map with + match Message_id.Map.find message_id map with | None -> () | Some message -> found := Some message) topic_map @@ -121,19 +125,19 @@ module Test_message_cache = struct Option.map (fun message -> (t, message)) r let get_message_ids_to_gossip topic t = - let found = ref C.Message_id.Set.empty in + let found = ref Message_id.Set.empty in for x = max 0 (t.ticks - t.gossip_slots + 1) to t.ticks do match 
M.find x t.cache with | None -> () | Some topic_map -> ( - match C.Topic.Map.find topic topic_map with + match Topic.Map.find topic topic_map with | None -> () | Some message_map -> let set = - message_map |> C.Message_id.Map.to_seq |> Seq.map fst - |> C.Message_id.Set.of_seq + message_map |> Message_id.Map.to_seq |> Seq.map fst + |> Message_id.Set.of_seq in - found := C.Message_id.Set.union !found set) + found := Message_id.Set.union !found set) done ; !found @@ -237,14 +241,14 @@ module Test_message_cache = struct | Get_message_ids_to_gossip {topic} :: actions -> let left_result = L.get_message_ids_to_gossip topic left in let right_result = R.get_message_ids_to_gossip topic right in - let left_set = C.Message_id.Set.of_list left_result in - if C.Message_id.Set.equal left_set right_result then + let left_set = Message_id.Set.of_list left_result in + if Message_id.Set.equal left_set right_result then run (left, right) actions else let pp_set fmt s = - if C.Message_id.Set.is_empty s then Format.fprintf fmt "empty set" + if Message_id.Set.is_empty s then Format.fprintf fmt "empty set" else - s |> C.Message_id.Set.to_seq |> List.of_seq + s |> Message_id.Set.to_seq |> List.of_seq |> Format.fprintf fmt "%a" diff --git a/src/lib_gossipsub/test/test_unit.ml b/src/lib_gossipsub/test/test_unit.ml index cf86daa8fe9a..c98fa93a7309 100644 --- a/src/lib_gossipsub/test/test_unit.ml +++ b/src/lib_gossipsub/test/test_unit.ml @@ -29,6 +29,10 @@ open Test_gossipsub_shared open Gossipsub_intf open Tezt open Tezt_core.Base +module Peer = C.Subconfig.Peer +module Topic = C.Subconfig.Topic +module Message_id = C.Subconfig.Message_id +module Message = C.Subconfig.Message let assert_output ~__LOC__ actual expected = (* TODO: https://gitlab.com/tezos/tezos/-/issues/5079 @@ -100,10 +104,10 @@ let make_peers ~number = (** [add_and_subscribe_peers topics peers] adds [peers] to the gossipsub connections and subscribes each peer to [topics]. 
*) -let add_and_subscribe_peers (topics : C.Topic.t list) (peers : C.Peer.t list) - ~(to_subscribe : C.Peer.t * C.Topic.t -> bool) - ?(direct : C.Peer.t -> bool = fun _ -> false) - ?(outbound : C.Peer.t -> bool = fun _ -> false) state = +let add_and_subscribe_peers (topics : Topic.t list) (peers : Peer.t list) + ~(to_subscribe : Peer.t * Topic.t -> bool) + ?(direct : Peer.t -> bool = fun _ -> false) + ?(outbound : Peer.t -> bool = fun _ -> false) state = let subscribe_peer_to_topics peer topics state = List.fold_left (fun state topic -> @@ -126,10 +130,10 @@ let add_and_subscribe_peers (topics : C.Topic.t list) (peers : C.Peer.t list) peers let init_state ~rng ~limits ~parameters ~peers ~topics - ?(to_join : C.Topic.t -> bool = fun _ -> true) - ?(direct : C.Peer.t -> bool = fun _ -> false) - ?(outbound : C.Peer.t -> bool = fun _ -> false) - ~(to_subscribe : C.Peer.t * C.Topic.t -> bool) () = + ?(to_join : Topic.t -> bool = fun _ -> true) + ?(direct : Peer.t -> bool = fun _ -> false) + ?(outbound : Peer.t -> bool = fun _ -> false) + ~(to_subscribe : Peer.t * Topic.t -> bool) () = let state = GS.make rng limits parameters in (* Add and subscribe the given peers. 
*) let state = @@ -284,7 +288,7 @@ let test_join_adds_peers_to_mesh rng limits parameters = (* re-join - there should be peers associated with the topic *) let state, to_graft = match GS.join {topic} state with - | state, Joining_topic {to_graft} -> (state, C.Peer.Set.elements to_graft) + | state, Joining_topic {to_graft} -> (state, Peer.Set.elements to_graft) | _, _ -> Test.fail ~__LOC__ "Expected Join to succeed" in (* should have added [degree_optimal] nodes to the mesh *) @@ -363,7 +367,7 @@ let test_join_adds_fanout_to_mesh rng limits parameters = (* Join to topic0 *) let state, to_graft = match GS.join {topic = "topic0"} state with - | state, Joining_topic {to_graft} -> (state, C.Peer.Set.elements to_graft) + | state, Joining_topic {to_graft} -> (state, Peer.Set.elements to_graft) | _, _ -> Test.fail ~__LOC__ "Expected Join to succeed" in let peers_in_topic = @@ -435,7 +439,7 @@ let test_publish_without_flood_publishing rng limits parameters = in (* Should return [degree_optimal] peers to publish to. *) Check.( - (C.Peer.Set.cardinal peers_to_publish = limits.degree_optimal) + (Peer.Set.cardinal peers_to_publish = limits.degree_optimal) int ~error_msg:"Expected %R, got %L" ~__LOC__) ; @@ -486,7 +490,7 @@ let test_fanout rng limits parameters = assert_fanout_size ~__LOC__ ~topic ~expected_size:limits.degree_optimal state ; (* Should return [degree_optimal] peers to publish to. *) Check.( - (C.Peer.Set.cardinal peers_to_publish = limits.degree_optimal) + (Peer.Set.cardinal peers_to_publish = limits.degree_optimal) int ~error_msg:"Expected %R, got %L" ~__LOC__) ; @@ -659,7 +663,7 @@ let test_mesh_addition rng limits parameters = (* Heartbeat. *) let _state, Heartbeat {to_graft; _} = GS.heartbeat state in (* There should be two grafting requests to fill the mesh. 
*) - let grafts = C.Peer.Map.bindings to_graft in + let grafts = Peer.Map.bindings to_graft in Check.((List.length grafts = 2) int ~error_msg:"Expected %R, got %L" ~__LOC__) ; unit @@ -700,7 +704,7 @@ let test_mesh_subtraction rng limits parameters = (* Heartbeat. *) let _state, Heartbeat {to_prune; _} = GS.heartbeat state in (* There should be enough prune requests to bring back the mesh size to [degree_optimal]. *) - let prunes = C.Peer.Map.bindings to_prune in + let prunes = Peer.Map.bindings to_prune in Check.( (List.length prunes = peer_number - limits.degree_optimal) int diff --git a/src/lib_gossipsub/tezos_gossipsub.ml b/src/lib_gossipsub/tezos_gossipsub.ml index 30a6595b083a..82030a009bc7 100644 --- a/src/lib_gossipsub/tezos_gossipsub.ml +++ b/src/lib_gossipsub/tezos_gossipsub.ml @@ -36,14 +36,14 @@ module Make (C : AUTOMATON_CONFIG) : AUTOMATON with type Time.t = C.Time.t and module Span = C.Span - and module Peer = C.Peer - and module Topic = C.Topic - and module Message_id = C.Message_id - and module Message = C.Message = struct - module Peer = C.Peer - module Topic = C.Topic - module Message_id = C.Message_id - module Message = C.Message + and module Peer = C.Subconfig.Peer + and module Topic = C.Subconfig.Topic + and module Message_id = C.Subconfig.Message_id + and module Message = C.Subconfig.Message = struct + module Peer = C.Subconfig.Peer + module Topic = C.Subconfig.Topic + module Message_id = C.Subconfig.Message_id + module Message = C.Subconfig.Message module Span = C.Span module Time = C.Time @@ -180,8 +180,7 @@ module Make (C : AUTOMATON_CONFIG) : type fanout_peers = {peers : Peer.Set.t; last_published_time : time} - module Message_cache = - Message_cache.Make (Peer) (Topic) (Message_id) (Message) + module Message_cache = Message_cache.Make (C.Subconfig) (* FIXME https://gitlab.com/tezos/tezos/-/issues/4983 diff --git a/src/lib_gossipsub/tezos_gossipsub.mli b/src/lib_gossipsub/tezos_gossipsub.mli index a4e134ae1b02..aa878209026f 100644 --- 
a/src/lib_gossipsub/tezos_gossipsub.mli +++ b/src/lib_gossipsub/tezos_gossipsub.mli @@ -29,7 +29,7 @@ module Make (C : Gossipsub_intf.AUTOMATON_CONFIG) : Gossipsub_intf.AUTOMATON with type Time.t = C.Time.t and type Span.t = C.Time.span - and module Peer = C.Peer - and module Topic = C.Topic - and module Message_id = C.Message_id - and module Message = C.Message + and module Peer = C.Subconfig.Peer + and module Topic = C.Subconfig.Topic + and module Message_id = C.Subconfig.Message_id + and module Message = C.Subconfig.Message -- GitLab