diff --git a/src/lib_gossipsub/gossipsub_intf.ml b/src/lib_gossipsub/gossipsub_intf.ml index 212677dc3bc872101f822c2bf8d5706db2305955..8c6ef48003eeb9bac3210b5d9901ebedc1196164 100644 --- a/src/lib_gossipsub/gossipsub_intf.ml +++ b/src/lib_gossipsub/gossipsub_intf.ml @@ -42,7 +42,7 @@ module type ITERABLE = sig module Map : Map.S with type key = t end -module type AUTOMATON_CONFIG = sig +module type AUTOMATON_SUBCONFIG = sig module Peer : ITERABLE module Topic : ITERABLE @@ -50,6 +50,10 @@ module type AUTOMATON_CONFIG = sig module Message_id : ITERABLE module Message : PRINTABLE +end + +module type AUTOMATON_CONFIG = sig + module Subconfig : AUTOMATON_SUBCONFIG module Span : PRINTABLE @@ -128,6 +132,19 @@ type ('peer, 'message_id, 'span) limits = { attackers from overwhelming our mesh with incoming connections. [degree_out] must be set below {degree_low}, and must not exceed [degree_optimal / 2]. *) + history_length : int; + (** [history_length] controls the size of the message cache used for + gossip. The message cache will remember messages for [history_length] + heartbeats. *) + history_gossip_length : int; + (** [history_gossip_length] controls how many cached message ids the local + peer will advertise in IHAVE gossip messages. When asked for its seen + message ids, the local peer will return only those from the most + recent [history_gossip_length] heartbeats. The slack between + [history_gossip_length] and [history_length] allows the local peer to + avoid advertising messages that will have expired by the time they're + requested. [history_gossip_length] must be positive and at most + [history_length].
*) } type ('peer, 'message_id) parameters = { @@ -424,14 +441,19 @@ module type AUTOMATON = sig type fanout_peers = {peers : Peer.Set.t; last_published_time : Time.t} - module Memory_cache : sig - type value = {message : message; access : int Peer.Map.t} + module Message_cache : sig + type t + + val create : history_slots:int -> gossip_slots:int -> t + + val add_message : Message_id.t -> Message.t -> Topic.t -> t -> t + + val get_message_for_peer : + Peer.t -> Message_id.t -> t -> (t * Message.t * int) option - type t = {messages : value Message_id.Map.t} + val get_message_ids_to_gossip : Topic.t -> t -> Message_id.t list - (** [get_memory_cache_value message_id state] returns the - cached value for [message_id]. *) - val get_value : Message_id.t -> state -> value option + val shift : t -> t end type view = { @@ -443,7 +465,7 @@ module type AUTOMATON = sig mesh : Peer.Set.t Topic.Map.t; fanout : fanout_peers Topic.Map.t; seen_messages : Message_id.Set.t; - memory_cache : Memory_cache.t; + message_cache : Message_cache.t; rng : Random.State.t; heartbeat_ticks : int64; } @@ -548,7 +570,7 @@ module type WORKER = sig val shutdown : t -> unit (** [inject state msg_id msg topic] is used to inject a message [msg] with - ID [msg_id] and that belongs to [topic] to the network. *) + id [msg_id] and that belongs to [topic] to the network. *) val inject : t -> GS.Message_id.t -> GS.Message.t -> GS.Topic.t -> unit (** [join t topics] joins [topics] even if the worker is running. 
*) diff --git a/src/lib_gossipsub/message_cache.ml b/src/lib_gossipsub/message_cache.ml new file mode 100644 index 0000000000000000000000000000000000000000..ce2436f9f12be7aa0d5fe0980bbd9d75f9fb1506 --- /dev/null +++ b/src/lib_gossipsub/message_cache.ml @@ -0,0 +1,169 @@ +(*****************************************************************************) +(* *) +(* Open Source License *) +(* Copyright (c) 2023 Nomadic Labs, *) +(* *) +(* Permission is hereby granted, free of charge, to any person obtaining a *) +(* copy of this software and associated documentation files (the "Software"),*) +(* to deal in the Software without restriction, including without limitation *) +(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *) +(* and/or sell copies of the Software, and to permit persons to whom the *) +(* Software is furnished to do so, subject to the following conditions: *) +(* *) +(* The above copyright notice and this permission notice shall be included *) +(* in all copies or substantial portions of the Software. *) +(* *) +(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*) +(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *) +(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *) +(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*) +(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *) +(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *) +(* DEALINGS IN THE SOFTWARE. 
*) +(* *) +(*****************************************************************************) + +module Make (C : Gossipsub_intf.AUTOMATON_SUBCONFIG) = struct + module Peer = C.Peer + module Topic = C.Topic + module Message_id = C.Message_id + module Message = C.Message + + type message_with_counters = { + message : Message.t; + store_counter : int; + (* This field counts how many times the message has been added to the + cache. Normally the caller ensures that the same message (that is, a + message with the same id) is not inserted twice; however, this module + does not rely on this assumption. *) + access_counters : int Peer.Map.t; + } + + (* A slot is just an index, normally a heartbeat tick, and this is why the + same type, namely [Int64], is used. Note that a bigger slot is a more + recent slot. *) + module SlotMap = Map.Make (Int64) + + type slot_entry = Message_id.t list Topic.Map.t + + type t = { + history_slots : int; + gossip_slots : int; + messages : message_with_counters Message_id.Map.t; + cache : slot_entry SlotMap.t; + (** The cache without the entry for the last slot, which is stored + separately, see next field.
*) + last_slot_entry : slot_entry; + last_slot : Int64.t; + } + + let create ~history_slots ~gossip_slots = + assert (gossip_slots > 0) ; + assert (gossip_slots <= history_slots) ; + { + history_slots; + gossip_slots; + messages = Message_id.Map.empty; + cache = SlotMap.empty; + last_slot_entry = Topic.Map.empty; + last_slot = 0L; + } + + let add_message message_id message topic t = + let last_slot_entry = + Topic.Map.update + topic + (function + | None -> Some [message_id] + | Some message_ids -> Some (message_id :: message_ids)) + t.last_slot_entry + in + let messages = + Message_id.Map.update + message_id + (function + | None -> + Some + {message; store_counter = 1; access_counters = Peer.Map.empty} + | Some {message; store_counter; access_counters} -> + Some {message; store_counter = store_counter + 1; access_counters}) + t.messages + in + {t with messages; last_slot_entry} + + let get_message_for_peer peer message_id t = + match Message_id.Map.find message_id t.messages with + | None -> None + | Some {message; store_counter; access_counters} -> + let counter = ref 1 in + let access_counters = + Peer.Map.update + peer + (function + | None -> Some 1 + | Some c -> + counter := c + 1 ; + Some !counter) + access_counters + in + let t = + { + t with + messages = + Message_id.Map.add + message_id + {message; store_counter; access_counters} + t.messages; + } + in + Some (t, message, !counter) + + let get_message_ids_to_gossip topic t = + SlotMap.to_rev_seq t.cache + |> Seq.take ~when_negative_length:() (t.gossip_slots - 1) + |> WithExceptions.Result.get_ok ~loc:__LOC__ + |> Seq.cons (t.last_slot, t.last_slot_entry) + |> Seq.fold_left + (fun acc_message_ids (_slot, entries) -> + match Topic.Map.find topic entries with + | None -> acc_message_ids + | Some message_ids -> List.rev_append message_ids acc_message_ids) + [] + + let shift t = + let drop_old_messages oldest_entries = + Topic.Map.fold + (fun _topic message_ids messages -> + List.fold_left + (fun messages 
message_id -> + Message_id.Map.update + message_id + (function + | None -> None + | Some {message; store_counter; access_counters} -> + if store_counter = 1 then None + else + Some + { + message; + store_counter = store_counter - 1; + access_counters; + }) + messages) + messages + message_ids) + oldest_entries + t.messages + in + let cache = SlotMap.add t.last_slot t.last_slot_entry t.cache in + let last_slot = Int64.succ t.last_slot in + let cache, messages = + match SlotMap.min_binding cache with + | None -> (* impossible *) (cache, t.messages) + | Some (first_slot, entries) -> + if Int64.(sub last_slot first_slot < of_int t.history_slots) then + (cache, t.messages) + else (SlotMap.remove first_slot cache, drop_old_messages entries) + in + {t with cache; messages; last_slot; last_slot_entry = Topic.Map.empty} +end diff --git a/src/lib_gossipsub/message_cache.mli b/src/lib_gossipsub/message_cache.mli new file mode 100644 index 0000000000000000000000000000000000000000..a1c03094269f13669acccc465d29d56c53b8ff8e --- /dev/null +++ b/src/lib_gossipsub/message_cache.mli @@ -0,0 +1,74 @@ +(*****************************************************************************) +(* *) +(* Open Source License *) +(* Copyright (c) 2023 Nomadic Labs, *) +(* *) +(* Permission is hereby granted, free of charge, to any person obtaining a *) +(* copy of this software and associated documentation files (the "Software"),*) +(* to deal in the Software without restriction, including without limitation *) +(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *) +(* and/or sell copies of the Software, and to permit persons to whom the *) +(* Software is furnished to do so, subject to the following conditions: *) +(* *) +(* The above copyright notice and this permission notice shall be included *) +(* in all copies or substantial portions of the Software. 
*) +(* *) +(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*) +(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *) +(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *) +(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*) +(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *) +(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *) +(* DEALINGS IN THE SOFTWARE. *) +(* *) +(*****************************************************************************) + +(** A sliding window cache of published messages. The module also keeps track of + the number of accesses to a message by a peer, thus indirectly tracking the + number of IWANT requests a peer makes for the same message. + + The module assumes that no two messages have the same message id. However, + the cache stores duplicates; for instance, if [add_message id msg topic] is + called twice, then [id] will appear (at least) twice in + [get_message_ids_to_gossip]'s result (assuming fewer than [gossip_slots] + shifts have been executed in the meantime). +*) +module Make (C : Gossipsub_intf.AUTOMATON_SUBCONFIG) : sig + type t + + (** [create ~history_slots ~gossip_slots] creates a sliding window cache of + length [history_slots]. + + When queried for messages to advertise, the cache only returns the ids of + messages in the last [gossip_slots] slots. [gossip_slots] must be positive + and at most [history_slots]. The slack between [gossip_slots] and + [history_slots] accounts for the reaction time between when a message is + advertised via IHAVE gossip and when the peer pulls it via an IWANT command. + + @raise Assert_failure when [gossip_slots <= 0 || gossip_slots > history_slots] + + TODO: https://gitlab.com/tezos/tezos/-/issues/5129 + Error handling. *) + val create : history_slots:int -> gossip_slots:int -> t + + (** Add message to the most recent cache slot.
If the message already exists + in the cache, the message is not overridden, instead a duplicate is + stored. *) + val add_message : C.Message_id.t -> C.Message.t -> C.Topic.t -> t -> t + + (** Get the message associated to the given message id, increase the access + counter for the peer requesting the message, and also return the updated + counter. *) + val get_message_for_peer : + C.Peer.t -> C.Message_id.t -> t -> (t * C.Message.t * int) option + + (** Get the message ids for the given topic in the last [gossip_slots] slots + of the cache. If there were duplicates added in the cache, then there will + be duplicates in the output. There is no guarantee about the order of + messages in the output. *) + val get_message_ids_to_gossip : C.Topic.t -> t -> C.Message_id.t list + + (** Shift the sliding window by one slot (usually corresponding to one + heartbeat tick). *) + val shift : t -> t +end diff --git a/src/lib_gossipsub/test/test_gossipsub.ml b/src/lib_gossipsub/test/test_gossipsub.ml index a7b341b553b95489fc03803a0d80f3b50892a8cb..aeddc9ce04bd1c92a569ff34e2f304274768e0b6 100644 --- a/src/lib_gossipsub/test/test_gossipsub.ml +++ b/src/lib_gossipsub/test/test_gossipsub.ml @@ -45,6 +45,8 @@ let default_limits = degree_high = 12; degree_score = 4; degree_out = 2; + history_length = 5; + history_gossip_length = 3; } let parameters = {peer_filter = (fun _peer _action -> true)} diff --git a/src/lib_gossipsub/test/test_gossipsub_shared.ml b/src/lib_gossipsub/test/test_gossipsub_shared.ml index f951ed4b02753c21a462187803d072d699898876..08b9c3014c5cda5fa952ba2ceec05ac05cf9781e 100644 --- a/src/lib_gossipsub/test/test_gossipsub_shared.ml +++ b/src/lib_gossipsub/test/test_gossipsub_shared.ml @@ -41,10 +41,10 @@ module Automaton_config : AUTOMATON_CONFIG with type Time.t = int and type Span.t = int - and type Peer.t = int - and type Topic.t = string - and type Message_id.t = int - and type Message.t = string = struct + and type Subconfig.Peer.t = int + and type 
Subconfig.Topic.t = string + and type Subconfig.Message_id.t = int + and type Subconfig.Message.t = string = struct module Span = struct type t = int @@ -85,10 +85,12 @@ module Automaton_config : module Set = Set.Make (String) end - module Peer = Int_iterable - module Topic = String_iterable - module Message_id = Int_iterable - module Message = String_iterable + module Subconfig = struct + module Peer = Int_iterable + module Topic = String_iterable + module Message_id = Int_iterable + module Message = String_iterable + end end module C = Automaton_config @@ -112,6 +114,8 @@ let pp_limits fmtr (l : (GS.Peer.t, GS.Message_id.t, GS.span) limits) = degree_high; degree_score; degree_out; + history_length; + history_gossip_length; } = l in @@ -133,7 +137,9 @@ let pp_limits fmtr (l : (GS.Peer.t, GS.Message_id.t, GS.span) limits) = degree_low = %d;@;\ degree_high = %d;@;\ degree_score = %d;@;\ - degree_out = %d }@]" + degree_out = %d;@;\ + history_length = %d;@;\ + history_gossip_length = %d }@]" max_recv_ihave_per_heartbeat max_sent_iwant_per_heartbeat degree_optimal @@ -156,3 +162,5 @@ let pp_limits fmtr (l : (GS.Peer.t, GS.Message_id.t, GS.span) limits) = degree_high degree_score degree_out + history_length + history_gossip_length diff --git a/src/lib_gossipsub/test/test_pbt.ml b/src/lib_gossipsub/test/test_pbt.ml index 513a1d24586960458f4e3b5f944d5f40ddc5afd9..5154bae52b23638824188f5212a241c244f8bdbc 100644 --- a/src/lib_gossipsub/test/test_pbt.ml +++ b/src/lib_gossipsub/test/test_pbt.ml @@ -28,6 +28,10 @@ open Test_gossipsub_shared open Gossipsub_intf open Tezt_core.Base +module Peer = C.Subconfig.Peer +module Topic = C.Subconfig.Topic +module Message_id = C.Subconfig.Message_id +module Message = C.Subconfig.Message module Basic_fragments = struct open Gossipsub_pbt_generators @@ -59,6 +63,292 @@ module Basic_fragments = struct let heartbeat : t = of_list [Heartbeat] end +module Test_message_cache = struct + module L = GS.Introspection.Message_cache + + module R = 
struct + module M = Map.Make (Int) + + type t = { + mutable ticks : int; + cache : Message.t Message_id.Map.t Topic.Map.t M.t; + history_slots : int; + gossip_slots : int; + } + + let create ~history_slots ~gossip_slots = + assert (gossip_slots > 0) ; + assert (gossip_slots <= history_slots) ; + {ticks = 0; cache = M.empty; history_slots; gossip_slots} + + let add_message message_id message topic t = + { + t with + cache = + M.update + t.ticks + (function + | None -> + Topic.Map.singleton + topic + (Message_id.Map.singleton message_id message) + |> Option.some + | Some map -> + Topic.Map.update + topic + (function + | None -> + Message_id.Map.singleton message_id message + |> Option.some + | Some topic_map -> + Message_id.Map.add message_id message topic_map + |> Option.some) + map + |> Option.some) + t.cache; + } + + let get_message_for_peer _peer message_id t = + let found = ref None in + for x = max 0 (t.ticks - t.history_slots + 1) to t.ticks do + match M.find x t.cache with + | None -> () + | Some topic_map -> + Topic.Map.iter + (fun _topic map -> + match Message_id.Map.find message_id map with + | None -> () + | Some message -> found := Some message) + topic_map + done ; + let r = !found in + Option.map (fun message -> (t, message)) r + + let get_message_ids_to_gossip topic t = + let found = ref Message_id.Set.empty in + for x = max 0 (t.ticks - t.gossip_slots + 1) to t.ticks do + match M.find x t.cache with + | None -> () + | Some topic_map -> ( + match Topic.Map.find topic topic_map with + | None -> () + | Some message_map -> + let set = + message_map |> Message_id.Map.to_seq |> Seq.map fst + |> Message_id.Set.of_seq + in + found := Message_id.Set.union !found set) + done ; + !found + + let shift t = + t.ticks <- t.ticks + 1 ; + t + end + + (* If those numbers are too large, we will miss scenarios with collisions. 
*) + let history_slots = QCheck2.Gen.int_range (-1) 10 + + let gossip_slots = history_slots + + let message_id = QCheck2.Gen.int_range 0 10 + + (* The data structure assumes that the id uniquely identifies the + message. To ease the readability of the test, we use a constant + message. *) + let message = QCheck2.Gen.return "m" + + let peer = QCheck2.Gen.return 0 + + let topic = + let open QCheck2.Gen in + let* chars = + QCheck2.Gen.list_size + (QCheck2.Gen.int_range 1 2) + (QCheck2.Gen.char_range 'a' 'c') + in + return (String.of_seq (List.to_seq chars)) + + type action = + | Add_message of {message_id : int; message : string; topic : string} + | Get_message_for_peer of {peer : int; message_id : int} + | Get_message_ids_to_gossip of {topic : string} + | Shift + + let pp_action fmt = function + | Add_message {message_id; message = _; topic} -> + Format.fprintf fmt "ADD_MESSAGE {id:%d;topic:%s}" message_id topic + | Get_message_for_peer {peer = _; message_id} -> + Format.fprintf fmt "GET_MESSAGE {id:%d}" message_id + | Get_message_ids_to_gossip {topic} -> + Format.fprintf fmt "GET_FOR_TOPIC {topic:%s}" topic + | Shift -> Format.fprintf fmt "SHIFT" + + let add_message = + let open QCheck2.Gen in + let* message_id in + let* message in + let* topic in + return (Add_message {message_id; message; topic}) + + let get_message_for_peer = + let open QCheck2.Gen in + let* message_id in + let* peer in + return (Get_message_for_peer {peer; message_id}) + + let get_message_ids_to_gossip = + let open QCheck2.Gen in + let* topic in + return (Get_message_ids_to_gossip {topic}) + + let action = + QCheck2.Gen.oneof + [ + add_message; + get_message_for_peer; + get_message_ids_to_gossip; + QCheck2.Gen.return Shift; + ] + + let actions = QCheck2.Gen.(list_size (int_range 1 30) action) + + let rec run (left, right) actions = + let remaining_steps = List.length actions in + match actions with + | [] -> None + | Add_message {message_id; message; topic} :: actions -> + let left =
L.add_message message_id message topic left in + let right = R.add_message message_id message topic right in + run (left, right) actions + | Get_message_for_peer {peer; message_id} :: actions -> ( + let left_result = L.get_message_for_peer peer message_id left in + let right_result = R.get_message_for_peer peer message_id right in + match (left_result, right_result) with + | None, None -> run (left, right) actions + | Some (left, _left_message, _left_counter), Some (right, _right_message) + -> + (* By definition of the message generator, messages are equal. *) + run (left, right) actions + | None, Some _ -> + let message = Format.asprintf "Expected: A message. Got: None" in + Some (remaining_steps, message) + | Some _, None -> + let message = + Format.asprintf "Expected: No message. Got: A message" + in + Some (remaining_steps, message)) + | Get_message_ids_to_gossip {topic} :: actions -> + let left_result = L.get_message_ids_to_gossip topic left in + let right_result = R.get_message_ids_to_gossip topic right in + let left_set = Message_id.Set.of_list left_result in + if Message_id.Set.equal left_set right_result then + run (left, right) actions + else + let pp_set fmt s = + if Message_id.Set.is_empty s then Format.fprintf fmt "empty set" + else + s |> Message_id.Set.to_seq |> List.of_seq + |> Format.fprintf + fmt + "%a" + (Format.pp_print_list + ~pp_sep:(fun fmt () -> Format.fprintf fmt " ") + Format.pp_print_int) + in + let message = + Format.asprintf + "Expected: %a@.Got: %a@." + pp_set + right_result + pp_set + left_set + in + Some (remaining_steps, message) + | Shift :: actions -> + let left = L.shift left in + let right = R.shift right in + run (left, right) actions + + let pp fmt trace = + Format.fprintf + fmt + "%a@." 
+ (Format.pp_print_list ~pp_sep:Format.pp_print_newline pp_action) + trace + + let test rng = + Tezt_core.Test.register + ~__FILE__ + ~title:"Gossipsub: check correctness of message cache data structure" + ~tags:["gossipsub"; "message_cache"] + @@ fun () -> + let scenario = + let open QCheck2.Gen in + let* history_slots in + let* gossip_slots in + let* actions in + let left = + try L.create ~history_slots ~gossip_slots |> Either.left + with exn -> Either.right exn + in + let right = + try R.create ~history_slots ~gossip_slots |> Either.left + with exn -> Either.right exn + in + match (left, right) with + | Right _, Right _ -> return None + | Left left, Left right -> ( + match run (left, right) actions with + | None -> return None + | Some (remaining_steps, explanation) -> + let n = List.length actions - remaining_steps + 1 in + let actions = List.take_n n actions in + return @@ Some (history_slots, gossip_slots, explanation, actions) + ) + | Right exn, Left _ -> + let explanation = + Format.asprintf + "Initialisation failed unexpectedly: %s" + (Printexc.to_string exn) + in + return @@ Some (history_slots, gossip_slots, explanation, []) + | Left _, Right exn -> + let explanation = + Format.asprintf + "Initialisation succeeded while it should not.
Expected to fail \ + with: %s" + (Printexc.to_string exn) + in + return @@ Some (history_slots, gossip_slots, explanation, []) + in + let test = + QCheck2.Test.make ~count:500_000 ~name:"Gossipsub: message cache" scenario + @@ function + | None -> true + | Some (history_slots, gossip_slots, explanation, trace) -> + Tezt.Test.fail + ~__LOC__ + "@[Soundness check failed.@;\ + Limits:@;\ + history_slots: %d@;\ + gossip_slots: %d@;\ + @;\ + Dumping trace:@;\ + @[%a@]@;\ + @;\ + Explanation:@;\ + %s@]" + history_slots + gossip_slots + pp + trace + explanation + in + QCheck2.Test.check_exn ~rand:rng test ; + unit +end + (** Test that removing a peer really removes it from the state *) module Test_remove_peer = struct open Gossipsub_pbt_generators @@ -172,4 +462,6 @@ module Test_remove_peer = struct unit end -let register rng limits parameters = Test_remove_peer.test rng limits parameters +let register rng limits parameters = + Test_remove_peer.test rng limits parameters ; + Test_message_cache.test rng diff --git a/src/lib_gossipsub/test/test_unit.ml b/src/lib_gossipsub/test/test_unit.ml index 0e17603e0b0f219981022aaa9d217141e61fbe5f..c98fa93a73090e69ccfdfa16d6e77ae316c2200d 100644 --- a/src/lib_gossipsub/test/test_unit.ml +++ b/src/lib_gossipsub/test/test_unit.ml @@ -29,6 +29,10 @@ open Test_gossipsub_shared open Gossipsub_intf open Tezt open Tezt_core.Base +module Peer = C.Subconfig.Peer +module Topic = C.Subconfig.Topic +module Message_id = C.Subconfig.Message_id +module Message = C.Subconfig.Message let assert_output ~__LOC__ actual expected = (* TODO: https://gitlab.com/tezos/tezos/-/issues/5079 @@ -55,17 +59,24 @@ let assert_fanout_size ~__LOC__ ~topic ~expected_size state = ~error_msg:"Expected %R, got %L" ~__LOC__) -let assert_in_memory_cache ~__LOC__ message_id ~expected_message state = - match GS.Introspection.Memory_cache.get_value message_id state with +(* Note: a new message cache state is returned when inspecting it, but this + function does not return this 
updated state! *) +let assert_in_message_cache ~__LOC__ message_id ~peer ~expected_message state = + let view = GS.Introspection.view state in + match + GS.Introspection.Message_cache.get_message_for_peer + peer + message_id + view.message_cache + with | None -> - Test.fail "Expected entry in memory cache for message id %d" message_id - | Some {message; access = _} -> + Test.fail "Expected entry in message cache for message id %d" message_id + | Some (_message_cache_state, message, _access) -> Check.( (message = expected_message) string ~error_msg:"Expected %R, got %L" - ~__LOC__) ; - unit + ~__LOC__) let assert_mesh_inclusion ~__LOC__ ~topic ~peer ~is_included state = let view = GS.Introspection.view state in @@ -93,10 +104,10 @@ let make_peers ~number = (** [add_and_subscribe_peers topics peers] adds [peers] to the gossipsub connections and subscribes each peer to [topics]. *) -let add_and_subscribe_peers (topics : C.Topic.t list) (peers : C.Peer.t list) - ~(to_subscribe : C.Peer.t * C.Topic.t -> bool) - ?(direct : C.Peer.t -> bool = fun _ -> false) - ?(outbound : C.Peer.t -> bool = fun _ -> false) state = +let add_and_subscribe_peers (topics : Topic.t list) (peers : Peer.t list) + ~(to_subscribe : Peer.t * Topic.t -> bool) + ?(direct : Peer.t -> bool = fun _ -> false) + ?(outbound : Peer.t -> bool = fun _ -> false) state = let subscribe_peer_to_topics peer topics state = List.fold_left (fun state topic -> @@ -119,10 +130,10 @@ let add_and_subscribe_peers (topics : C.Topic.t list) (peers : C.Peer.t list) peers let init_state ~rng ~limits ~parameters ~peers ~topics - ?(to_join : C.Topic.t -> bool = fun _ -> true) - ?(direct : C.Peer.t -> bool = fun _ -> false) - ?(outbound : C.Peer.t -> bool = fun _ -> false) - ~(to_subscribe : C.Peer.t * C.Topic.t -> bool) () = + ?(to_join : Topic.t -> bool = fun _ -> true) + ?(direct : Peer.t -> bool = fun _ -> false) + ?(outbound : Peer.t -> bool = fun _ -> false) + ~(to_subscribe : Peer.t * Topic.t -> bool) () = let state = 
GS.make rng limits parameters in (* Add and subscribe the given peers. *) let state = @@ -277,7 +288,7 @@ let test_join_adds_peers_to_mesh rng limits parameters = (* re-join - there should be peers associated with the topic *) let state, to_graft = match GS.join {topic} state with - | state, Joining_topic {to_graft} -> (state, C.Peer.Set.elements to_graft) + | state, Joining_topic {to_graft} -> (state, Peer.Set.elements to_graft) | _, _ -> Test.fail ~__LOC__ "Expected Join to succeed" in (* should have added [degree_optimal] nodes to the mesh *) @@ -356,7 +367,7 @@ let test_join_adds_fanout_to_mesh rng limits parameters = (* Join to topic0 *) let state, to_graft = match GS.join {topic = "topic0"} state with - | state, Joining_topic {to_graft} -> (state, C.Peer.Set.elements to_graft) + | state, Joining_topic {to_graft} -> (state, Peer.Set.elements to_graft) | _, _ -> Test.fail ~__LOC__ "Expected Join to succeed" in let peers_in_topic = @@ -397,7 +408,7 @@ let test_join_adds_fanout_to_mesh rng limits parameters = (** Tests that publishing to a subscribed topic: - Returns peers to publish to. - - Inserts message into memory cache. + - Inserts message into message cache. Ported from: https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L629 *) @@ -428,21 +439,23 @@ let test_publish_without_flood_publishing rng limits parameters = in (* Should return [degree_optimal] peers to publish to. *) Check.( - (C.Peer.Set.cardinal peers_to_publish = limits.degree_optimal) + (Peer.Set.cardinal peers_to_publish = limits.degree_optimal) int ~error_msg:"Expected %R, got %L" ~__LOC__) ; - (* [message_id] should be added to the memory cache. *) - assert_in_memory_cache + (* [message_id] should be added to the message cache. 
*) + assert_in_message_cache ~__LOC__ message_id + ~peer:(Stdlib.List.hd peers) ~expected_message:publish_data - state + state ; + unit (** Tests that publishing to an unsubscribed topic: - Populate fanout peers. - Return peers to publish to. - - Inserts message into the memory cache. + - Inserts message into the message cache. Ported from: https://github.com/libp2p/rust-libp2p/blob/12b785e94ede1e763dd041a107d3a00d5135a213/protocols/gossipsub/src/behaviour/tests.rs#L715 *) @@ -477,16 +490,18 @@ let test_fanout rng limits parameters = assert_fanout_size ~__LOC__ ~topic ~expected_size:limits.degree_optimal state ; (* Should return [degree_optimal] peers to publish to. *) Check.( - (C.Peer.Set.cardinal peers_to_publish = limits.degree_optimal) + (Peer.Set.cardinal peers_to_publish = limits.degree_optimal) int ~error_msg:"Expected %R, got %L" ~__LOC__) ; - (* [message_id] should be added to the memory cache. *) - assert_in_memory_cache + (* [message_id] should be added to the message cache. *) + assert_in_message_cache ~__LOC__ message_id + ~peer:(Stdlib.List.hd peers) ~expected_message:publish_data - state + state ; + unit (** Tests that a peer is added to our mesh on graft when we are both joined/subscribed to the same topic. @@ -648,7 +663,7 @@ let test_mesh_addition rng limits parameters = (* Heartbeat. *) let _state, Heartbeat {to_graft; _} = GS.heartbeat state in (* There should be two grafting requests to fill the mesh. *) - let grafts = C.Peer.Map.bindings to_graft in + let grafts = Peer.Map.bindings to_graft in Check.((List.length grafts = 2) int ~error_msg:"Expected %R, got %L" ~__LOC__) ; unit @@ -689,7 +704,7 @@ let test_mesh_subtraction rng limits parameters = (* Heartbeat. *) let _state, Heartbeat {to_prune; _} = GS.heartbeat state in (* There should be enough prune requests to bring back the mesh size to [degree_optimal]. 
*) - let prunes = C.Peer.Map.bindings to_prune in + let prunes = Peer.Map.bindings to_prune in Check.( (List.length prunes = peer_number - limits.degree_optimal) int diff --git a/src/lib_gossipsub/tezos_gossipsub.ml b/src/lib_gossipsub/tezos_gossipsub.ml index 399b1eb6ee0fa66b5817fc6f8314013143c0cece..82030a009bc7bf3d29283c96110c853bd9192888 100644 --- a/src/lib_gossipsub/tezos_gossipsub.ml +++ b/src/lib_gossipsub/tezos_gossipsub.ml @@ -36,14 +36,14 @@ module Make (C : AUTOMATON_CONFIG) : AUTOMATON with type Time.t = C.Time.t and module Span = C.Span - and module Peer = C.Peer - and module Topic = C.Topic - and module Message_id = C.Message_id - and module Message = C.Message = struct - module Peer = C.Peer - module Topic = C.Topic - module Message_id = C.Message_id - module Message = C.Message + and module Peer = C.Subconfig.Peer + and module Topic = C.Subconfig.Topic + and module Message_id = C.Subconfig.Message_id + and module Message = C.Subconfig.Message = struct + module Peer = C.Subconfig.Peer + module Topic = C.Subconfig.Topic + module Message_id = C.Subconfig.Message_id + module Message = C.Subconfig.Message module Span = C.Span module Time = C.Time @@ -178,41 +178,10 @@ module Make (C : AUTOMATON_CONFIG) : type connections = connection Peer.Map.t - (* FIXME https://gitlab.com/tezos/tezos/-/issues/4982 - - This module is incomplete. 
*) - module Memory_cache = struct - type value = {message : message; access : int Peer.Map.t} - - type t = {messages : value Message_id.Map.t} - - let create () = {messages = Message_id.Map.empty} - - let record_message_access peer message_id t = - match Message_id.Map.find message_id t.messages with - | None -> None - | Some {message; access} -> - let access = - Peer.Map.update - peer - (function None -> Some 1 | Some x -> Some (x + 1)) - access - in - let t = - { - messages = - Message_id.Map.add message_id {message; access} t.messages; - } - in - Some (t, message) - - let add_message message_id message t = - let value = {message; access = Peer.Map.empty} in - {messages = Message_id.Map.add message_id value t.messages} - end - type fanout_peers = {peers : Peer.Set.t; last_published_time : time} + module Message_cache = Message_cache.Make (C.Subconfig) + (* FIXME https://gitlab.com/tezos/tezos/-/issues/4983 This data-structure should be documented. *) @@ -225,7 +194,7 @@ module Make (C : AUTOMATON_CONFIG) : mesh : Peer.Set.t Topic.Map.t; fanout : fanout_peers Topic.Map.t; seen_messages : Message_id.Set.t; - memory_cache : Memory_cache.t; + message_cache : Message_cache.t; rng : Random.State.t; heartbeat_ticks : int64; } @@ -266,7 +235,9 @@ module Make (C : AUTOMATON_CONFIG) : requirement or delete the todo. 
*) assert (l.degree_score + l.degree_out <= l.degree_optimal) ; assert (l.degree_out < l.degree_low) ; - assert (l.degree_out <= l.degree_optimal / 2) + assert (l.degree_out <= l.degree_optimal / 2) ; + assert (l.history_gossip_length > 0) ; + assert (l.history_gossip_length <= l.history_length) let make : Random.State.t -> limits -> parameters -> state = fun rng limits parameters -> @@ -280,7 +251,10 @@ module Make (C : AUTOMATON_CONFIG) : mesh = Topic.Map.empty; fanout = Topic.Map.empty; seen_messages = Message_id.Set.empty; - memory_cache = Memory_cache.create (); + message_cache = + Message_cache.create + ~history_slots:limits.history_length + ~gossip_slots:limits.history_gossip_length; rng; heartbeat_ticks = 0L; } @@ -337,7 +311,7 @@ module Make (C : AUTOMATON_CONFIG) : let peer_filter state = state.parameters.peer_filter - let memory_cache state = state.memory_cache + let message_cache state = state.message_cache let rng state = state.rng @@ -395,7 +369,7 @@ module Make (C : AUTOMATON_CONFIG) : let {mesh; _} = state in match Topic.Map.find topic mesh with None -> false | Some _ -> true - let set_memory_cache memory_cache state = ({state with memory_cache}, ()) + let set_message_cache message_cache state = ({state with message_cache}, ()) let get_connections_score connections ~default peer = match Peer.Map.find peer connections with @@ -462,12 +436,16 @@ module Make (C : AUTOMATON_CONFIG) : let state = {state with fanout = Topic.Map.remove topic state.fanout} in (state, ()) - let put_message_in_cache message_id message state = + let put_message_in_cache message_id message topic state = let state = { state with - memory_cache = - Memory_cache.add_message message_id message state.memory_cache; + message_cache = + Message_cache.add_message + message_id + message + topic + state.message_cache; } in (state, ()) @@ -611,31 +589,31 @@ module Make (C : AUTOMATON_CONFIG) : Score check is missing. *) let routed_message_ids = Message_id.Map.empty in - let*! 
memory_cache in + let*! message_cache in let*! peer_filter in - let memory_cache, routed_message_ids = + let message_cache, routed_message_ids = (* FIXME https://gitlab.com/tezos/tezos/-/issues/5011 A check should ensure that the number of accesses do not exceed some pre-defined limit. *) List.fold_left - (fun (memory_cache, messages) message_id -> - let memory_cache, info = + (fun (message_cache, messages) message_id -> + let message_cache, info = match - Memory_cache.record_message_access peer message_id memory_cache + Message_cache.get_message_for_peer peer message_id message_cache with - | None -> (memory_cache, `Not_found) - | Some (memory_cache, message) -> - ( memory_cache, + | None -> (message_cache, `Not_found) + | Some (message_cache, message, _access_counter) -> + ( message_cache, if peer_filter peer (`IWant message_id) then `Message message else `Ignored ) in - (memory_cache, Message_id.Map.add message_id info messages)) - (memory_cache, routed_message_ids) + (message_cache, Message_id.Map.add message_id info messages)) + (message_cache, routed_message_ids) message_ids in - let* () = set_memory_cache memory_cache in + let* () = set_message_cache message_cache in On_iwant_messages_to_route {routed_message_ids} |> return end @@ -838,7 +816,7 @@ module Make (C : AUTOMATON_CONFIG) : let handle ~sender topic message_id message : [`Publish] output Monad.t = let open Monad.Syntax in - let* () = put_message_in_cache message_id message in + let* () = put_message_in_cache message_id message topic in let*! mesh_opt = find_mesh topic in let* peers = match mesh_opt with @@ -1298,8 +1276,10 @@ module Make (C : AUTOMATON_CONFIG) : Maintain our fanout for topics we are publishing to, but we have not joined. *) - (* FIXME https://gitlab.com/tezos/tezos/-/issues/4982 - Advance the message history window *) + (* Advance the message history sliding window. *) + let*! 
message_cache in + let* () = Message_cache.shift message_cache |> set_message_cache in + Heartbeat {to_graft; to_prune; noPX_peers} |> return end @@ -1474,13 +1454,7 @@ module Make (C : AUTOMATON_CONFIG) : last_published_time : time; } - module Memory_cache = struct - include Memory_cache - - let get_value message_id state = - Message_id.Map.find_opt message_id state.memory_cache.messages - |> Option.map (fun Memory_cache.{message; access} -> {message; access}) - end + module Message_cache = Message_cache type view = state = { limits : limits; @@ -1491,7 +1465,7 @@ module Make (C : AUTOMATON_CONFIG) : mesh : Peer.Set.t Topic.Map.t; fanout : fanout_peers Topic.Map.t; seen_messages : Message_id.Set.t; - memory_cache : Memory_cache.t; + message_cache : Message_cache.t; rng : Random.State.t; heartbeat_ticks : int64; } diff --git a/src/lib_gossipsub/tezos_gossipsub.mli b/src/lib_gossipsub/tezos_gossipsub.mli index a4e134ae1b02017193ea268c58ff3adaebbf7a52..aa878209026f98827606ca5cd78090da307ab86d 100644 --- a/src/lib_gossipsub/tezos_gossipsub.mli +++ b/src/lib_gossipsub/tezos_gossipsub.mli @@ -29,7 +29,7 @@ module Make (C : Gossipsub_intf.AUTOMATON_CONFIG) : Gossipsub_intf.AUTOMATON with type Time.t = C.Time.t and type Span.t = C.Time.span - and module Peer = C.Peer - and module Topic = C.Topic - and module Message_id = C.Message_id - and module Message = C.Message + and module Peer = C.Subconfig.Peer + and module Topic = C.Subconfig.Topic + and module Message_id = C.Subconfig.Message_id + and module Message = C.Subconfig.Message