diff --git a/.gitlab/ci/jobs/packaging/opam_package.yml b/.gitlab/ci/jobs/packaging/opam_package.yml index f7f00ab62324731378050683af407d9e47468202..4648dcf53981d3aa8d80262cadd4ad37e8f332bc 100644 --- a/.gitlab/ci/jobs/packaging/opam_package.yml +++ b/.gitlab/ci/jobs/packaging/opam_package.yml @@ -840,6 +840,10 @@ opam:tezos-event-logging-test-helpers: # Ignoring unreleased package tezos-expect-helper. +# Ignoring unreleased package tezos-gossipsub. + +# Ignoring unreleased package tezos-gossipsub-test. + opam:tezos-hacl: extends: - .opam_template diff --git a/dune-project b/dune-project index 6baad27fb6fddfba39ea07add7fc4ce47a9a34e8..66096be0d874fada87271a90f8d2780fc37ef5cd 100644 --- a/dune-project +++ b/dune-project @@ -116,6 +116,8 @@ (package (name tezos-event-logging)) (package (name tezos-event-logging-test-helpers)) (package (name tezos-expect-helper)) +(package (name tezos-gossipsub)) +(package (name tezos-gossipsub-test)(allow_empty)) (package (name tezos-hacl)) (package (name tezos-injector-015-PtLimaPt)) (package (name tezos-layer2-store)(allow_empty)) diff --git a/manifest/main.ml b/manifest/main.ml index a12df2f9b85078fa940de8eef0988a499a32cf4c..3801a206e8438d42814e239ad40eed049aa37a60 100644 --- a/manifest/main.ml +++ b/manifest/main.ml @@ -1800,6 +1800,37 @@ let _octez_p2p_tests = ]; ]) +let octez_gossipsub = + public_lib + "tezos-gossipsub" + ~path:"src/lib_gossipsub" + ~synopsis:"Tezos: An implementation of gossipsub" + ~deps: + [ + ringo; + aches; + octez_error_monad |> open_ |> open_ ~m:"TzLwtreslib"; + octez_base |> open_ ~m:"TzPervasives"; + octez_base_unix |> open_; + octez_stdlib_unix |> open_; + octez_stdlib |> open_; + octez_version; + ] + +let _octez_gossipsub_test = + test + "test_gossipsub" + ~path:"src/lib_gossipsub/test" + ~opam:"tezos-gossipsub-test" + ~synopsis:"Tests for the gossipsub algorithm" + ~deps: + [ + octez_base |> open_ ~m:"TzPervasives"; + octez_base_unix; + octez_gossipsub |> open_; + tezt_lib; + ] + let octez_wasmer = public_lib "tezos-wasmer" diff --git a/opam/tezos-gossipsub-test.opam b/opam/tezos-gossipsub-test.opam new file mode 100644 index 0000000000000000000000000000000000000000..56740689cc28bb1e704613431fab4af7c8e630d4 --- /dev/null +++ b/opam/tezos-gossipsub-test.opam @@ -0,0 +1,22 @@ +# This file was automatically generated, do not edit. +# Edit file manifest/main.ml instead. +opam-version: "2.0" +maintainer: "contact@tezos.com" +authors: ["Tezos devteam"] +homepage: "https://www.tezos.com/" +bug-reports: "https://gitlab.com/tezos/tezos/issues" +dev-repo: "git+https://gitlab.com/tezos/tezos.git" +license: "MIT" +depends: [ + "dune" { >= "3.0" } + "ocaml" { >= "4.14" } + "tezos-base" {with-test} + "tezos-gossipsub" {with-test} + "tezt" { with-test & >= "3.0.0" } +] +build: [ + ["rm" "-r" "vendors"] + ["dune" "build" "-p" name "-j" jobs] + ["dune" "runtest" "-p" name "-j" jobs] {with-test} +] +synopsis: "Tests for the gossipsub algorithm" diff --git a/opam/tezos-gossipsub.opam b/opam/tezos-gossipsub.opam new file mode 100644 index 0000000000000000000000000000000000000000..1df172ce587dd3e9c0243dfee4bca27d0a0061a7 --- /dev/null +++ b/opam/tezos-gossipsub.opam @@ -0,0 +1,26 @@ +# This file was automatically generated, do not edit. +# Edit file manifest/main.ml instead. +opam-version: "2.0" +maintainer: "contact@tezos.com" +authors: ["Tezos devteam"] +homepage: "https://www.tezos.com/" +bug-reports: "https://gitlab.com/tezos/tezos/issues" +dev-repo: "git+https://gitlab.com/tezos/tezos.git" +license: "MIT" +depends: [ + "dune" { >= "3.0" } + "ocaml" { >= "4.14" } + "ringo" { >= "1.0.0" } + "aches" { >= "1.0.0" } + "tezos-error-monad" + "tezos-base" + "tezos-stdlib-unix" + "tezos-stdlib" + "tezos-version" +] +build: [ + ["rm" "-r" "vendors"] + ["dune" "build" "-p" name "-j" jobs] + ["dune" "runtest" "-p" name "-j" jobs] {with-test} +] +synopsis: "Tezos: An implementation of gossipsub" diff --git a/src/lib_gossipsub/dune b/src/lib_gossipsub/dune new file mode 100644 index 0000000000000000000000000000000000000000..d3c27ba1934f770cdcc1d3e27d65dd75d51e161c --- /dev/null +++ b/src/lib_gossipsub/dune @@ -0,0 +1,24 @@ +; This file was automatically generated, do not edit. +; Edit file manifest/main.ml instead. + +(library + (name tezos_gossipsub) + (public_name tezos-gossipsub) + (instrumentation (backend bisect_ppx)) + (libraries + ringo + aches + tezos-error-monad + tezos-base + tezos-base.unix + tezos-stdlib-unix + tezos-stdlib + tezos-version) + (flags + (:standard) + -open Tezos_error_monad + -open Tezos_error_monad.TzLwtreslib + -open Tezos_base.TzPervasives + -open Tezos_base_unix + -open Tezos_stdlib_unix + -open Tezos_stdlib)) diff --git a/src/lib_gossipsub/test/dune b/src/lib_gossipsub/test/dune new file mode 100644 index 0000000000000000000000000000000000000000..3999bff50ea5a251b709efc406b16c61b5a4c86e --- /dev/null +++ b/src/lib_gossipsub/test/dune @@ -0,0 +1,19 @@ +; This file was automatically generated, do not edit. +; Edit file manifest/main.ml instead. + +(executable + (name test_gossipsub) + (libraries + tezos-base + tezos-base.unix + tezos-gossipsub + tezt) + (flags + (:standard) + -open Tezos_base.TzPervasives + -open Tezos_gossipsub)) + +(rule + (alias runtest) + (package tezos-gossipsub-test) + (action (run %{dep:./test_gossipsub.exe}))) diff --git a/src/lib_gossipsub/test/test_gossipsub.ml b/src/lib_gossipsub/test/test_gossipsub.ml new file mode 100644 index 0000000000000000000000000000000000000000..abdbbab8ad25a28495224237a749a950c08a5b3d --- /dev/null +++ b/src/lib_gossipsub/test/test_gossipsub.ml @@ -0,0 +1,103 @@ +(*****************************************************************************) +(* *) +(* Open Source License *) +(* Copyright (c) 2023 Nomadic Labs, *) +(* *) +(* Permission is hereby granted, free of charge, to any person obtaining a *) +(* copy of this software and associated documentation files (the "Software"),*) +(* to deal in the Software without restriction, including without limitation *) +(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *) +(* and/or sell copies of the Software, and to permit persons to whom the *) +(* Software is furnished to do so, subject to the following conditions: *) +(* *) +(* The above copyright notice and this permission notice shall be included *) +(* in all copies or substantial portions of the Software. *) +(* *) +(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*) +(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *) +(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *) +(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*) +(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *) +(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *) +(* DEALINGS IN THE SOFTWARE. *) +(* *) +(*****************************************************************************) + +include Tezos_gossipsub + +module Configuration : + CONFIGURATION + with type Time.t = int + and type Time.span = int + and type Peer.t = int + and type Topic.t = int + and type Message_id.t = int + and type Message.t = int = struct + module Time = struct + type span = int + + let now = + let cpt = ref (-1) in + fun () -> + incr cpt ; + !cpt + + let add = ( + ) + + include Compare.Int + end + + module Peer = struct + type t = int + + module Map = Map.Make (Int) + module Set = Set.Make (Int) + end + + module Topic = Peer + module Message_id = Peer + module Message = Peer +end + +module GS = Make (Configuration) + +let limits = + { + max_recv_ihave_per_heartbeat = 0; + max_sent_iwant_per_heartbeat = 0; + expected_peers_per_topic = 0; + gossip_publish_threshold = 0.; + accept_px_threshold = 0.; + unsuscribe_backoff = 0; + graft_flood_backoff = 0; + prune_backoff = 0; + retain_duration = 0; + } + +let parameters = {peer_filter = (fun _peer _action -> true)} + +(* This is to use a seed with Tezt. *) +let _seed = + match + Tezt_core.Cli.get + ~default:None + (fun x -> + try int_of_string x |> Option.some |> Option.some + with _ -> Option.none) + "seed" + with + | None -> + Random.self_init () ; + Random.bits () + | Some seed -> seed + +(* This dummy test can be removed once the first unit tests are + registered. *) +let () = + Tezt_core.Test.register ~__FILE__ ~title:"simple" ~tags:["simple"] + @@ fun () -> + let rng = Random.get_state () in + let _state = GS.make rng limits parameters in + Tezt_core.Base.unit + +let () = Tezt.Test.run () diff --git a/src/lib_gossipsub/tezos_gossipsub.ml b/src/lib_gossipsub/tezos_gossipsub.ml new file mode 100644 index 0000000000000000000000000000000000000000..5349df143dcae6c6bcc84869f989b62b9faa5fa4 --- /dev/null +++ b/src/lib_gossipsub/tezos_gossipsub.ml @@ -0,0 +1,1020 @@ +(*****************************************************************************) +(* *) +(* Open Source License *) +(* Copyright (c) 2023 Nomadic Labs, *) +(* *) +(* Permission is hereby granted, free of charge, to any person obtaining a *) +(* copy of this software and associated documentation files (the "Software"),*) +(* to deal in the Software without restriction, including without limitation *) +(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *) +(* and/or sell copies of the Software, and to permit persons to whom the *) +(* Software is furnished to do so, subject to the following conditions: *) +(* *) +(* The above copyright notice and this permission notice shall be included *) +(* in all copies or substantial portions of the Software. *) +(* *) +(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*) +(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *) +(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *) +(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*) +(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *) +(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *) +(* DEALINGS IN THE SOFTWARE. *) +(* *) +(*****************************************************************************) + +(* FIXME https://gitlab.com/tezos/tezos/-/issues/4966 + + We should decide whether we want to implement a tracing mechanism + as in the go implementation. +*) + +module type ITERABLE = sig + type t + + module Set : Set.S with type elt = t + + module Map : Map.S with type key = t +end + +module type CONFIGURATION = sig + module Peer : ITERABLE + + module Topic : ITERABLE + + module Message_id : ITERABLE + + module Message : sig + type t + end + + module Time : sig + include Compare.S + + type span + + val now : unit -> t + + val add : t -> span -> t + end +end + +type ('peer, 'message_id, 'span) limits = { + max_recv_ihave_per_heartbeat : int; + max_sent_iwant_per_heartbeat : int; + expected_peers_per_topic : int; + gossip_publish_threshold : float; + accept_px_threshold : float; + unsuscribe_backoff : 'span; + graft_flood_backoff : 'span; + (* WRT to go implementation, the value of this constant is + actually [graft_flood_threshold - prune_backoff] *) + prune_backoff : 'span; + retain_duration : 'span; +} + +type ('peer, 'message_id) parameters = { + peer_filter : + 'peer -> [`IHave of 'message_id | `IWant of 'message_id | `Graft] -> bool; +} + +(** This module allows to compute a score for each peers. *) +module Score : sig + (* FIXME https://gitlab.com/tezos/tezos/-/issues/4967 + + This is incomplete *) + type t + + val float : t -> float + + val zero : t + + val penality : t -> int -> t +end = struct + type t = {behaviour_penality : int} + + let zero = {behaviour_penality = 0} + + let float {behaviour_penality} = behaviour_penality |> float_of_int + + let penality {behaviour_penality} penality = + {behaviour_penality = behaviour_penality + penality} +end + +module type S = sig + (* FIXME https://gitlab.com/tezos/tezos/-/issues/5012 + + Maybe it would be better if the interface would be more generic + and would look like an automaton? *) + type peer + + type topic + + type message_id + + type message + + type time + + type span + + type state + + type limits := (peer, message_id, span) limits + + type parameters := (peer, message_id) parameters + + val make : Random.State.t -> limits -> parameters -> state + + type 'a output + + type 'a monad := state -> state * 'a output + + val add_peer : direct:bool -> outbound:bool -> peer -> [`Add_peer] monad + + val remove_peer : peer -> [`Remove_peer] monad + + val handle_ihave : peer -> topic -> message_id list -> [`IHave] monad + + val handle_iwant : peer -> message_id list -> [`IWant] monad + + val handle_graft : peer -> topic -> [`Graft] monad + + val handle_prune : + peer -> topic -> px:peer Seq.t -> backoff:span -> [`Prune] monad + + val publish : + sender:peer option -> topic -> message_id -> message -> [`Publish] monad + + val heartbeat : [`Heartbeat] monad + + val join : topic -> [`Join] monad + + val leave : topic -> [`Leave] monad +end + +module Make (C : CONFIGURATION) : + S + with type time = C.Time.t + and type span = C.Time.span + and type peer = C.Peer.t + and type topic = C.Topic.t + and type message_id = C.Message_id.t + and type message = C.Message.t = struct + type peer = C.Peer.t + + type topic = C.Topic.t + + type message_id = C.Message_id.t + + type message = C.Message.t + + type time = C.Time.t + + type span = C.Time.span + + type nonrec limits = (peer, message_id, C.Time.span) limits + + type nonrec parameters = (peer, message_id) parameters + + (* FIXME not sure subtyping for output is useful. If it is, it is + probably for few ouputs and could be removed. *) + type _ output = + | Negative_peer_score : Score.t -> [`IHave] output + | Too_many_recv_ihave_messages : {count : int; max : int} -> [`IHave] output + | Too_many_sent_iwant_messages : {count : int; max : int} -> [`IHave] output + | Message_topic_not_tracked : [`IHave] output + | Message_requested_message_ids : message_id list -> [`IHave] output + | On_iwant_messages_to_route : { + peer : peer; + routed_message_ids : + [`Ignored | `Not_found | `Message of message] C.Message_id.Map.t; + } + -> [`IWant] output + | Peer_filtered : [`Graft] output + | Unknown_topic : [`Graft] output + | Peer_already_in_mesh : [`Graft] output + | Grafting_direct_peer : [`Graft] output + | Unexpected_grafting_peer : [`Graft] output + | Grafting_peer_with_negative_score : [`Graft] output + | Grafting_successfully : [`Graft] output + | Peer_backed_off : [`Graft] output + | No_peer_in_mesh : [`Prune] output + | Ignore_PX_score_too_low : Score.t -> [`Prune] output + | No_PX : [`Prune] output + | PX : C.Peer.Set.t -> [`Prune] output + | Publish_message : C.Peer.Set.t -> [`Publish] output + | Already_subscribed : [`Join] output + | Joining_topic : C.Peer.Set.t -> [`Join] output + | Not_subscribed : [`Leave] output + | Leaving_topic : {to_prune : C.Peer.Set.t} -> [`Leave] output + | Peer_added : [`Add_peer] output + | Peer_already_known : [`Add_peer] output + | Removing_peer : [`Remove_peer] output + + type connection = { + topics : C.Topic.Set.t; + (* FIXME https://gitlab.com/tezos/tezos/-/issues/4980 + + When does this field should be updated? *) + direct : bool; + (** A direct connection is a connection to which we forward all the messages. *) + outbound : bool; + (** An outbound connection is a connection we + connected to. *) + backoff : C.Time.t C.Topic.Map.t; + (** The backoff times associated to this peer for each topic *) + score : Score.t; (** The score associated to this peer. *) + expire : C.Time.t option; + (** The expiring time after having being disconnected from this peer. *) + } + + type connections = connection C.Peer.Map.t + + (* FIXME https://gitlab.com/tezos/tezos/-/issues/4982 + + This module is incomplete. *) + module Memory_cache = struct + type value = {message : message; access : int C.Peer.Map.t} + + type t = {messages : value C.Message_id.Map.t} + + let create () = {messages = C.Message_id.Map.empty} + + let record_message_access peer message_id t = + match C.Message_id.Map.find message_id t.messages with + | None -> None + | Some {message; access} -> + let access = + C.Peer.Map.update + peer + (function None -> Some 1 | Some x -> Some (x + 1)) + access + in + let t = + { + messages = + C.Message_id.Map.add message_id {message; access} t.messages; + } + in + Some (t, message) + + let add_message message_id message t = + let value = {message; access = C.Peer.Map.empty} in + {messages = C.Message_id.Map.add message_id value t.messages} + end + + (* FIXME https://gitlab.com/tezos/tezos/-/issues/4983 + + This data-structure should be documented. *) + type state = { + limits : limits; + parameters : parameters; + connections : connections; + ihave_per_heartbeat : int C.Peer.Map.t; + iwant_per_heartbeat : int C.Peer.Map.t; + mesh : C.Peer.Set.t C.Topic.Map.t; + fanout : C.Peer.Set.t C.Topic.Map.t; + last_published_time : C.Time.t C.Topic.Map.t; + seen_messages : C.Message_id.Set.t; + memory_cache : Memory_cache.t; + rng : Random.State.t; + } + (* Invariants: + + - Forall t set, C.Topic.Map.find t mesh = Some set -> C.Peer.Set set <> C.Peer.Set.empty + + - Forall t set, C.Topic.Map.find t fanout = Some set -> C.Peer.Set set <> C.Peer.Set.empty + *) + + (* FIXME https://gitlab.com/tezos/tezos/-/issues/4984 + + Test the those invariants + *) + + module Monad = struct + (* FIXME https://gitlab.com/tezos/tezos/-/issues/5013 + + This module could be put outside of the `Make` functor. + *) + type 'a t = state -> state * 'a + + type 'c check = [`Pass of 'a | `Fail of 'b] t + constraint 'c = < pass : 'a ; fail : 'b > + + let bind m f state = + let state, res = m state in + f res state + + let get p f state = + let res = p state in + f res state + + let return x state = (state, x) + + let check c f = + bind c (function `Pass res -> f res | `Fail output -> return output) + + let return_pass x = `Pass x |> return + + let return_fail x = `Fail x |> return + + module Syntax = struct + let ( let* ) = bind + + let ( let*? ) = check + + let ( let*! ) = get + + let return = return + + let pass = return_pass + + let unit = pass () + + let fail = return_fail + end + end + + let make : Random.State.t -> limits -> parameters -> state = + fun rng limits parameters -> + { + limits; + parameters; + connections = C.Peer.Map.empty; + ihave_per_heartbeat = C.Peer.Map.empty; + iwant_per_heartbeat = C.Peer.Map.empty; + mesh = C.Topic.Map.empty; + fanout = C.Topic.Map.empty; + last_published_time = C.Topic.Map.empty; + seen_messages = C.Message_id.Set.empty; + memory_cache = Memory_cache.create (); + rng; + } + + module Helpers = struct + (* Those projections enable let-punning. *) + let max_recv_ihave_per_heartbeat state = + state.limits.max_recv_ihave_per_heartbeat + + let max_sent_iwant_per_heartbeat state = + state.limits.max_sent_iwant_per_heartbeat + + let expected_peers_per_topic state = state.limits.expected_peers_per_topic + + let gossip_publish_threshold state = state.limits.gossip_publish_threshold + + let accept_px_threshold state = state.limits.accept_px_threshold + + let unsuscribe_backoff state = state.limits.unsuscribe_backoff + + let graft_flood_backoff state = state.limits.graft_flood_backoff + + let retain_duration state = state.limits.retain_duration + + let mesh state = state.mesh + + let fanout state = state.fanout + + let connections state = state.connections + + let seen_messages state = state.seen_messages + + let peer_filter state = state.parameters.peer_filter + + let memory_cache state = state.memory_cache + + let rng state = state.rng + + let update ?(delta = 1) key map = + C.Peer.Map.update + key + (function None -> Some delta | Some n -> Some (n + delta)) + map + + let update_and_get ?(delta = 1) key map = + let res = ref delta in + C.Peer.Map.update + key + (function + | None -> Some delta + | Some n -> + let value = n + delta in + res := value ; + Some value) + map + |> fun x -> (x, !res) + + let update_and_get_ihave_per_heartbeat ?delta key state = + let ihave_per_heartbeat, res = + update_and_get ?delta key state.ihave_per_heartbeat + in + let state = {state with ihave_per_heartbeat} in + (state, res) + + let update_iwant_per_heartbeat ?delta key state = + let iwant_per_heartbeat = update ?delta key state.ihave_per_heartbeat in + let state = {state with iwant_per_heartbeat} in + (state, ()) + + let find ?(default = 0) key map = + match C.Peer.Map.find key map with None -> default | Some n -> n + + let find_iwant_per_heartbeat ?default key state = + find ?default key state.iwant_per_heartbeat + + let set_connections connections state = ({state with connections}, ()) + + let topic_is_tracked topic state = + let {mesh; _} = state in + match C.Topic.Map.find topic mesh with None -> false | Some _ -> true + + let set_memory_cache memory_cache state = ({state with memory_cache}, ()) + + let get_score ~default peer state = + match C.Peer.Map.find peer state.connections with + | None -> default + | Some connection -> connection.score + + let get_peers topic ~filter ~max = + let open Monad.Syntax in + let*! connections in + let*! rng in + C.Peer.Map.to_seq connections + |> Seq.filter_map (fun (peer, connections) -> + let topics = connections.topics in + if filter peer connections && C.Topic.Set.mem topic topics then + Some peer + else None) + |> List.of_seq |> List.shuffle ~rng |> List.take_n max + |> C.Peer.Set.of_list |> return + + let set_mesh_topic topic peers state = + let state = {state with mesh = C.Topic.Map.add topic peers state.mesh} in + (state, ()) + + let set_mesh mesh state = + let state = {state with mesh} in + (state, ()) + + let find_mesh topic state = C.Topic.Map.find topic state.mesh + + let find_fanout topic state = C.Topic.Map.find topic state.fanout + + let set_fanout_topic topic peers state = + if C.Peer.Set.is_empty peers then (state, ()) + else + let state = + {state with fanout = C.Topic.Map.add topic peers state.fanout} + in + (state, ()) + + let set_fanout fanout state = + let state = {state with fanout} in + (state, ()) + + let delete_mesh topic state = + let state = {state with mesh = C.Topic.Map.remove topic state.mesh} in + (state, ()) + + let delete_fanout topic state = + let state = {state with fanout = C.Topic.Map.remove topic state.fanout} in + (state, ()) + + let set_last_published_time topic time state = + let state = + { + state with + last_published_time = + C.Topic.Map.add topic time state.last_published_time; + } + in + (state, ()) + + let delete_last_published_time topic state = + let state = + { + state with + last_published_time = + C.Topic.Map.remove topic state.last_published_time; + } + in + (state, ()) + + let put_message_in_cache message_id message state = + let state = + { + state with + memory_cache = + Memory_cache.add_message message_id message state.memory_cache; + } + in + (state, ()) + + let update_backoff peer topic expire connections = + C.Peer.Map.update + peer + (function + | None -> None + | Some connection -> + let backoff = + C.Topic.Map.update + topic + (function + | None -> Some expire + | Some old_backoff -> + if C.Time.(old_backoff < expire) then Some expire + else Some old_backoff) + connection.backoff + in + Some {connection with backoff}) + connections + + let add_connections_backoff time topic peer connections = + let now = C.Time.now () in + let expire = C.Time.add now time in + update_backoff peer topic expire connections + + let add_backoff time topic peer = + let open Monad.Syntax in + let*! connections in + let connections = add_connections_backoff time topic peer connections in + set_connections connections + + let add_connections_score peer score = + C.Peer.Map.update + peer + (Option.map (fun connection -> {connection with score})) + + let add_score peer score = + let open Monad.Syntax in + let*! connections in + let connections = add_connections_score peer score connections in + set_connections connections + + let _check_peer_score peer = + let open Monad.Syntax in + let*! peer_score = get_score ~default:Score.zero peer in + if Compare.Float.(peer_score |> Score.float < 0.) then + Negative_peer_score peer_score |> fail + else unit + end + + include Helpers + + module IHave = struct + let check_too_many_recv_ihave_message count = + let open Monad.Syntax in + let*! max_recv_ihave_per_heartbeat in + if count > max_recv_ihave_per_heartbeat then + Too_many_recv_ihave_messages {count; max = max_recv_ihave_per_heartbeat} + |> fail + else unit + + (* FIXME https://gitlab.com/tezos/tezos/-/issues/5016 + + This check is not correct if the distant peer uses a different + value for [max_recv_ihave_per_heartbeat] then our value for + [max_sent_iwant_per_heartbeat]. *) + let check_too_many_sent_iwant_message count = + let open Monad.Syntax in + let*! max_sent_iwant_per_heartbeat in + if count > max_sent_iwant_per_heartbeat then + Too_many_sent_iwant_messages {count; max = max_sent_iwant_per_heartbeat} + |> fail + else unit + + let check_topic_tracked topic = + let open Monad.Syntax in + let*! is_topic_tracked = topic_is_tracked topic in + if not is_topic_tracked then Message_topic_not_tracked |> fail else unit + + let check_not_empty iwant_message_ids = + let open Monad.Syntax in + match iwant_message_ids with + | [] -> Message_requested_message_ids [] |> fail + | _ -> unit + + let filter peer message_ids : message_id list Monad.t = + let open Monad.Syntax in + let*! peer_filter in + let*! seen_messages in + let should_handle_message_id message_id : bool = + (not (C.Message_id.Set.mem message_id seen_messages)) + && peer_filter peer (`IHave message_id) + in + List.filter should_handle_message_id message_ids |> return + + let shuffle_and_trunc message_ids ~limit : (int * message_id list) Monad.t = + let open Monad.Syntax in + let*! rng in + let iwant_message_ids_len = List.length message_ids in + (* Do not send more messages than [max_sent_iwant_per_heartbeat] *) + let iwant_ids_to_send_n = min iwant_message_ids_len limit in + let shuffle_iwant_ids = List.shuffle ~rng message_ids in + let requested_message_ids = + List.take_n iwant_ids_to_send_n shuffle_iwant_ids + in + return (iwant_ids_to_send_n, requested_message_ids) + + let handle peer topic message_ids : [`IHave] output Monad.t = + let open Monad.Syntax in + (* FIXME https://gitlab.com/tezos/tezos/-/issues/5009 + + Score check is missing. + *) + let* count_ihave_received = update_and_get_ihave_per_heartbeat peer in + let*! count_iwant_sent = find_iwant_per_heartbeat peer in + let*? () = check_too_many_recv_ihave_message count_ihave_received in + let*? () = check_too_many_sent_iwant_message count_iwant_sent in + let*? () = check_topic_tracked topic in + let* iwant_message_ids = filter peer message_ids in + let*? () = check_not_empty iwant_message_ids in + let*! max_sent_iwant_per_heartbeat in + let limit = max_sent_iwant_per_heartbeat - count_iwant_sent in + (* Invariant: limit > 0 *) + let* length, requested_message_ids = + shuffle_and_trunc iwant_message_ids ~limit + in + let* () = update_iwant_per_heartbeat ~delta:length peer in + (* FIXME https://gitlab.com/tezos/tezos/-/issues/4966 + + The go implementation traces some of the messages + requested. *) + Message_requested_message_ids requested_message_ids |> return + end + + let handle_ihave : peer -> topic -> message_id list -> [`IHave] output Monad.t + = + IHave.handle + + module IWant = struct + let handle peer message_ids : [`IWant] output Monad.t = + let open Monad.Syntax in + (* FIXME https://gitlab.com/tezos/tezos/-/issues/5008 + + Score check is missing. + *) + let routed_message_ids = C.Message_id.Map.empty in + let*! memory_cache in + let*! peer_filter in + let memory_cache, routed_message_ids = + (* FIXME https://gitlab.com/tezos/tezos/-/issues/5011 + + A check should ensure that the number of accesses do not + exceed some pre-defined limit. *) + List.fold_left + (fun (memory_cache, messages) message_id -> + match + Memory_cache.record_message_access peer message_id memory_cache + with + | None -> + ( memory_cache, + C.Message_id.Map.add message_id `Not_found messages ) + | Some (memory_cache, message) -> + let messages = + if peer_filter peer (`IWant message_id) then + C.Message_id.Map.add message_id (`Message message) messages + else C.Message_id.Map.add message_id `Ignored messages + in + (memory_cache, messages)) + (memory_cache, routed_message_ids) + message_ids + in + let* () = set_memory_cache memory_cache in + On_iwant_messages_to_route {peer; routed_message_ids} |> return + end + + let handle_iwant : peer -> message_id list -> [`IWant] output Monad.t = + IWant.handle + + module Graft = struct + let check_filter peer = + let open Monad.Syntax in + let*! peer_filter in + if peer_filter peer `Graft then unit else Peer_filtered |> fail + + let check_topic_known mesh_opt = + let open Monad.Syntax in + match mesh_opt with + | None -> Unknown_topic |> fail + | Some mesh -> pass mesh + + let check_not_in_mesh mesh peer = + let open Monad.Syntax in + if C.Peer.Set.mem peer mesh then Peer_already_in_mesh |> fail else unit + + let check_not_direct peer = + let open Monad.Syntax in + let*! connections in + match C.Peer.Map.find peer connections with + | None -> Unexpected_grafting_peer |> fail + | Some ({direct; _} as connection) -> + if direct then Grafting_direct_peer |> fail else pass connection + + let check_score peer topic {score; _} = + let open Monad.Syntax in + let*! unsuscribe_backoff in + if Score.(score >= zero) then unit + else + let* () = add_backoff unsuscribe_backoff topic peer in + Grafting_peer_with_negative_score |> fail + + let check_backoff peer topic {backoff; score; _} = + let open Monad.Syntax in + let*! unsuscribe_backoff in + match C.Topic.Map.find topic backoff with + | None -> unit + | Some backoff -> + let current = C.Time.now () in + if C.Time.(current >= backoff) then unit + else + let score = Score.penality score 1 in + let*! graft_flood_backoff in + let score = + if C.Time.(current < add backoff graft_flood_backoff) then + Score.penality score 1 + else score + in + let* () = add_backoff unsuscribe_backoff topic peer in + let* () = add_score peer score in + fail Peer_backed_off + + let handle peer topic = + let open Monad.Syntax in + let*? () = check_filter peer in + let*! mesh_opt = find_mesh topic in + let*? mesh = check_topic_known mesh_opt in + let*? () = check_not_in_mesh mesh peer in + let*? connection = check_not_direct peer in + let*? () = check_backoff peer topic connection in + let*? () = check_score peer topic connection in + let* () = set_mesh_topic topic (C.Peer.Set.add peer mesh) in + (* FIXME https://gitlab.com/tezos/tezos/-/issues/5007 + + Handle negative score and the size check is missing *) + Grafting_successfully |> return + end + + let handle_graft : peer -> topic -> [`Graft] output Monad.t = Graft.handle + + module Prune = struct + let check_px_score peer = + let open Monad.Syntax in + let*! accept_px_threshold in + let*! score = get_score ~default:Score.zero peer in + if Compare.Float.(score |> Score.float < accept_px_threshold) then + Ignore_PX_score_too_low score |> fail + else unit + + let handle peer topic ~px ~backoff = + let open Monad.Syntax in + let*! mesh_opt = find_mesh topic in + match mesh_opt with + | None -> return No_peer_in_mesh + | Some mesh -> + (* FIXME https://gitlab.com/tezos/tezos/-/issues/5006 + + backoff computation. *) + let mesh = C.Peer.Set.remove peer mesh in + let* () = set_mesh_topic topic mesh in + let* _ = add_backoff backoff topic peer in + let px = C.Peer.Set.of_seq px in + if C.Peer.Set.is_empty px then No_PX |> return + else + let*? () = check_px_score peer in + return (PX px) + end + + let handle_prune : + peer -> + topic -> + px:peer Seq.t -> + backoff:C.Time.span -> + [`Prune] output Monad.t = + Prune.handle + + module Publish = struct + let get_peers_for_unsubscribed_topic topic = + let open Monad.Syntax in + let*! fanout_opt = find_fanout topic in + let now = C.Time.now () in + let* () = set_last_published_time topic now in + let*! gossip_publish_threshold in + let*! expected_peers_per_topic in + match fanout_opt with + | None -> + let filter _peer {direct; score; _} = + (not direct) + && Compare.Float.(score |> Score.float >= gossip_publish_threshold) + in + let* not_direct_peers = + get_peers topic ~filter ~max:expected_peers_per_topic + in + let* () = set_fanout_topic topic not_direct_peers in + (* FIXME https://gitlab.com/tezos/tezos/-/issues/5010 + + We could avoid this call by directly having a map + of direct peers in the state. *) + let filter peer ({direct; _} as connection) = + filter peer connection || direct + in + let* peers = get_peers topic ~filter ~max:Int.max_int in + (* FIXME https://gitlab.com/tezos/tezos/-/issues/4988 + + lastpub field *) + return peers + | Some peers -> return peers + + let handle ~sender topic message_id message : [`Publish] output Monad.t = + let open Monad.Syntax in + let* () = put_message_in_cache message_id message in + let*! mesh_opt = find_mesh topic in + let* peers = + match mesh_opt with + | Some peers -> return peers + | None -> get_peers_for_unsubscribed_topic topic + in + let peers = + Option.fold + ~none:peers + ~some:(fun peer -> C.Peer.Set.remove peer peers) + sender + in + Publish_message peers |> return + end + + let publish : + sender:C.Peer.t option -> + topic -> + message_id -> + message -> + [`Publish] output Monad.t = + Publish.handle + + module Join = struct + let check_is_not_subscribed topic : + < fail : [`Join] output ; pass : unit > Monad.check = + let open Monad.Syntax in + let*! mesh in + match C.Topic.Map.find topic mesh with + | None -> unit + | Some _ -> Already_subscribed |> fail + + let get_more_peers topic ~max = + let filter _peer {backoff; score; direct; _} = + not (direct || C.Topic.Map.mem topic backoff || Score.(score < zero)) + in + get_peers topic ~filter ~max + + let init_mesh topic : [`Join] output Monad.t = + let open Monad.Syntax in + let*! expected_peers_per_topic in + let*! connections in + let is_valid peer = + match C.Peer.Map.find peer connections with + | None -> + (* FIXME https://gitlab.com/tezos/tezos/-/issues/5005 + + Not supposed to happen. But maybe it is better to + return a value for defensive programming. *) + false + | Some {backoff; score; _} -> + not (C.Topic.Map.mem topic backoff || Score.(score < zero)) + in + let*! fanout in + let valid_fanout_peers = + match C.Topic.Map.find topic fanout with + | None -> C.Peer.Set.empty + | Some fanout_peers -> C.Peer.Set.filter is_valid fanout_peers + in + let* peers = + (* We prioritize fanout peers to be in the mesh for this + topic. If we need more peers, we look at all our peers + subscribed to this topic. *) + if C.Peer.Set.cardinal valid_fanout_peers >= expected_peers_per_topic + then return valid_fanout_peers + else + let max = + max + 0 + (expected_peers_per_topic - C.Peer.Set.cardinal valid_fanout_peers) + in + let* more_peers = get_more_peers topic ~max in + return (C.Peer.Set.union more_peers valid_fanout_peers) + in + let* () = set_mesh_topic topic peers in + let* () = delete_fanout topic in + let* () = delete_last_published_time topic in + Joining_topic peers |> return + + let handle topic : [`Join] output Monad.t = + let open Monad.Syntax in + let*? () = check_is_not_subscribed topic in + init_mesh topic + end + + let join : topic -> [`Join] output Monad.t = Join.handle + + module Leave = struct + type mesh = C.Peer.Set.t + + let check_already_subscribed topic : + < fail : [`Leave] output ; pass : mesh > Monad.check = + let open Monad.Syntax in + let*! mesh in + match C.Topic.Map.find topic mesh with + | None -> Not_subscribed |> fail + | Some mesh -> pass mesh + + let handle_mesh topic mesh : [`Leave] output Monad.t = + let open Monad.Syntax in + let*! unsuscribe_backoff in + let*! connections in + let connections = + C.Peer.Set.fold + (fun peer connections -> + add_connections_backoff unsuscribe_backoff topic peer connections) + mesh + connections + in + let* () = set_connections connections in + Leaving_topic {to_prune = mesh} |> return + + let handle topic : [`Leave] output Monad.t = + let open Monad.Syntax in + let*? mesh = check_already_subscribed topic in + let* () = delete_mesh topic in + handle_mesh topic mesh + end + + let leave : topic -> [`Leave] output Monad.t = Leave.handle + + module Heartbeat = struct + let handle _ = + (* FIXME https://gitlab.com/tezos/tezos/-/issues/4949 + + Implement this. *) + assert false + end + + let heartbeat : [`Heartbeat] output Monad.t = Heartbeat.handle + + module Add_peer = struct + let handle ~direct ~outbound peer : [`Add_peer] output Monad.t = + let open Monad.Syntax in + let*! connections in + match C.Peer.Map.find peer connections with + | None -> + let connection = + { + direct; + score = Score.zero; + backoff = C.Topic.Map.empty; + topics = C.Topic.Set.empty; + outbound; + expire = None; + } + in + let connections = C.Peer.Map.add peer connection connections in + let* () = set_connections connections in + return Peer_added + | Some _ -> return Peer_already_known + end + + let add_peer : + direct:bool -> outbound:bool -> peer -> [`Add_peer] output Monad.t = + Add_peer.handle + + module Remove_peer = struct + let handle peer : [`Remove_peer] output Monad.t = + let open Monad.Syntax in + let*! mesh in + let mesh = + C.Topic.Map.map (fun peers -> C.Peer.Set.remove peer peers) mesh + in + let* () = set_mesh mesh in + let*! fanout in + let fanout = + C.Topic.Map.map (fun peers -> C.Peer.Set.remove peer peers) fanout + in + let* () = set_fanout fanout in + let*! connections in + let*! retain_duration in + let connections = + C.Peer.Map.update + peer + (function + | None -> None + | Some connection -> + let now = C.Time.now () in + let expire = Some (C.Time.add now retain_duration) in + Some {connection with expire}) + connections + in + let* () = set_connections connections in + Removing_peer |> return + end + + let remove_peer : peer -> [`Remove_peer] output Monad.t = Remove_peer.handle +end diff --git a/src/lib_gossipsub/tezos_gossipsub.mli b/src/lib_gossipsub/tezos_gossipsub.mli new file mode 100644 index 0000000000000000000000000000000000000000..68c172991f1464934cf29c4900d54a7dc93c8916 --- /dev/null +++ b/src/lib_gossipsub/tezos_gossipsub.mli @@ -0,0 +1,181 @@ +(*****************************************************************************) +(* *) +(* Open Source License *) +(* Copyright (c) 2023 Nomadic Labs, *) +(* *) +(* Permission is hereby granted, free of charge, to any person obtaining a *) +(* copy of this software and associated documentation files (the "Software"),*) +(* to deal in the Software without restriction, including without limitation *) +(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *) +(* and/or sell copies of the Software, and to permit persons to whom the *) +(* Software is furnished to do so, subject to the following conditions: *) +(* *) +(* The above copyright notice and this permission notice shall be included *) +(* in all copies or substantial portions of the Software. *) +(* *) +(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*) +(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *) +(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *) +(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*) +(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *) +(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *) +(* DEALINGS IN THE SOFTWARE. *) +(* *) +(*****************************************************************************) + +module type ITERABLE = sig + type t + + module Set : Set.S with type elt = t + + module Map : Map.S with type key = t +end + +module type CONFIGURATION = sig + module Peer : ITERABLE + + module Topic : ITERABLE + + module Message_id : ITERABLE + + module Message : sig + type t + end + + module Time : sig + include Compare.S + + type span + + val now : unit -> t + + val add : t -> span -> t + end +end + +type ('peer, 'message_id, 'span) limits = { + max_recv_ihave_per_heartbeat : int; + (** The maximum number of control message [IHave] we can receive + from our peers between two heartbeats. *) + max_sent_iwant_per_heartbeat : int; + (** The maximum number of control messages [IWant] we can sent + to our peers between two heartbeats. *) + expected_peers_per_topic : int; + (** The number of expected full connections per topic. *) + gossip_publish_threshold : float; + (** The threshold value (as a score) from which we can publish a + message to our peers. *) + accept_px_threshold : float; + (** The threshold value (as a score) from which we accept peer exchanges. *) + unsuscribe_backoff : 'span; + (** The duration that prevent reconnections after leaving a topic to our full connections. *) + graft_flood_backoff : 'span; + (** The duration added when a peer tries to graft our connection + too soon. *) + prune_backoff : 'span; (** The duration added when we prune a peer. *) + retain_duration : 'span; + (** The duration added to remove metadata + about a disconnected peer. *) +} + +type ('peer, 'message_id) parameters = { + peer_filter : + 'peer -> [`IHave of 'message_id | `IWant of 'message_id | `Graft] -> bool; +} + +module type S = sig + (** Type for peers *) + type peer + + (** Type for topic *) + type topic + + (** Type for message_id *) + type message_id + + (** Type for message *) + type message + + (** Type for time *) + type time + + (** Type for time duration *) + type span + + (** The state managed by the gossipsub automaton. The state is + purely functional. *) + type state + + (** Limits of the gossipsub protocol. *) + type limits := (peer, message_id, span) limits + + (** Parameters of the gossipsub protocol. *) + type parameters := (peer, message_id) parameters + + (** Output produced by one of the actions below. *) + type 'a output + + (** A type alias for the state monad. *) + type 'a monad := state -> state * 'a output + + (** Initialise a state. *) + val make : Random.State.t -> limits -> parameters -> state + + (** [add_peer ~direct ~outbound peer] is called to notify a new + connection. If [direct] is [true], the gossipsub always + forward messages to those peers. [outbound] is [true] if it is + an outbound connection. *) + val add_peer : direct:bool -> outbound:bool -> peer -> [`Add_peer] monad + + (** [remove_peer peer] notifies gossipsub that we are disconnected + from a peer. Do note that the [state] still maintain information + for this connection for [retain_duration] seconds. *) + val remove_peer : peer -> [`Remove_peer] monad + + (** [handle_ihave peer topic message_ids] handles the gossip message + [IHave] emitted by [peer] for [topic] with the [message_ids]. *) + val handle_ihave : peer -> topic -> message_id list -> [`IHave] monad + + (** [handle_iwant peer message_ids] handles the gossip message + [IWant] emitted by [peer] for [topic] with the [message_ids]. *) + val handle_iwant : peer -> message_id list -> [`IWant] monad + + (** [handle_graft peer topic] handles the gossip message [Graft] + emitted by [peer] for [topic]. This action allows to graft a + connection to a full connection allowing the transmission of + full messages for the given topic. *) + val handle_graft : peer -> topic -> [`Graft] monad + + (** [handle_prune peer topic ~px ~backoff] handles the gossip + message [Prune] emitted by [peer] for [topic]. This action + allows to prune a full connection. In that case, the remote peer + can send a list of peers to connect to as well as a backoff + time, which is a duration for which we cannot [Graft] this peer + on this topic. *) + val handle_prune : + peer -> topic -> px:peer Seq.t -> backoff:span -> [`Prune] monad + + (** [publish ~sender topic message_id message] allows to route a + message on the gossip network. If [sender=None], the message + comes from the application layer and we are the sender. *) + val publish : + sender:peer option -> topic -> message_id -> message -> [`Publish] monad + + (** [heartbeat] executes the heartbeat routine of the algorithm. *) + val heartbeat : [`Heartbeat] monad + + (** [join topic] join/subscribe to a new topic. *) + val join : topic -> [`Join] monad + + (** [leave topic] leave/unscribe a topic. *) + val leave : topic -> [`Leave] monad +end + +module Make (C : CONFIGURATION) : + S + with type time = C.Time.t + and type span = C.Time.span + and type peer = C.Peer.t + and type topic = C.Topic.t + and type message_id = C.Message_id.t + and type message = C.Message.t