From adf83ea50dc91482b338a97aa87e0f0c6bbd3a39 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Thir=C3=A9?= Date: Fri, 13 Dec 2024 15:17:10 +0100 Subject: [PATCH 1/5] Gossipsub: Enable to set unreachable points --- src/lib_gossipsub/gossipsub_intf.ml | 3 +++ src/lib_gossipsub/gossipsub_worker.ml | 19 +++++++++++++++++++ 2 files changed, 22 insertions(+) diff --git a/src/lib_gossipsub/gossipsub_intf.ml b/src/lib_gossipsub/gossipsub_intf.ml index 76675d82c7d9..7e1342fe7ae7 100644 --- a/src/lib_gossipsub/gossipsub_intf.ml +++ b/src/lib_gossipsub/gossipsub_intf.ml @@ -1198,6 +1198,9 @@ module type WORKER = sig to the worker's input stream. *) val app_input : t -> app_input -> unit + (** [set_unreachable_point state point] declares this point as unreachable. *) + val set_unreachable_point : t -> Point.t -> unit + (** [p2p_input state p2p_input] adds the given P2P input [p2p_input] to the worker's input stream. *) val p2p_input : t -> p2p_input -> unit diff --git a/src/lib_gossipsub/gossipsub_worker.ml b/src/lib_gossipsub/gossipsub_worker.ml index 602c3234da7c..7f952df78737 100644 --- a/src/lib_gossipsub/gossipsub_worker.ml +++ b/src/lib_gossipsub/gossipsub_worker.ml @@ -280,6 +280,8 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : app_output_stream : app_output Stream.t; events_logging : event -> unit Monad.t; unknown_validity_messages : Bounded_message_map.t; + unreachable_points : int64 Point.Map.t; + (* For each point, stores the next heartbeat tick when we can try to recontact this point again. *) } (** A worker instance is made of its status and state. 
*) @@ -913,6 +915,7 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : app_output_stream = Stream.empty (); events_logging; unknown_validity_messages = Bounded_message_map.make ~capacity:10_000; + unreachable_points = Point.Map.empty; }; } @@ -927,6 +930,22 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : let is_subscribed t topic = GS.Introspection.(has_joined topic (view t.state.gossip_state)) + let set_unreachable_point t point = + let GS.Introspection.{heartbeat_ticks; _} = + GS.Introspection.(view t.state.gossip_state) + in + let unreachable_points = + Point.Map.update + point + (function + | None -> Some (Int64.add 5L heartbeat_ticks) + | Some x -> (* NOTE(review): x is the previously stored next-attempt tick (an absolute tick, not a delay), so the offset added to heartbeat_ticks is the larger of 2x and 300: 300 acts as a floor and the offset grows with the absolute tick count. If a capped exponential backoff was intended, confirm whether Int64.min and a stored delay were meant. *) + Some + (Int64.mul x 2L |> Int64.max 300L |> Int64.add heartbeat_ticks)) + t.state.unreachable_points + in + t.state <- {t.state with unreachable_points} + let pp_list pp_elt = Format.pp_print_list ~pp_sep:(fun fmt () -> Format.fprintf fmt "; ") pp_elt -- GitLab From d1e382947cf8d86fe5e3887fadedd1c65907b057 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Thir=C3=A9?= Date: Fri, 13 Dec 2024 15:27:47 +0100 Subject: [PATCH 2/5] DAL/Node: Register unreachable points --- src/lib_dal_node/gossipsub/gs_transport_connection.ml | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/lib_dal_node/gossipsub/gs_transport_connection.ml b/src/lib_dal_node/gossipsub/gs_transport_connection.ml index bce3ae0c6206..0846dfcf0af0 100644 --- a/src/lib_dal_node/gossipsub/gs_transport_connection.ml +++ b/src/lib_dal_node/gossipsub/gs_transport_connection.ml @@ -175,7 +175,7 @@ let disconnections_handler gs_worker peer_id = | None -> (* Something is off, we should log something probably. 
*) () | Some (peer, _) -> Worker.(Disconnection {peer} |> p2p_input gs_worker) -let try_connect ?expected_peer_id p2p_layer ~trusted point = +let try_connect ?expected_peer_id gs_worker p2p_layer ~trusted point = let open Lwt_syntax in (* We don't wait for the promise to resolve here, because if the advertised peer is not reachable or is not responding, we might block @@ -194,9 +194,12 @@ let try_connect ?expected_peer_id p2p_layer ~trusted point = Implement a better PX exchange. *) ignore expected_peer_id ; - let* (_ : _ P2p.connection tzresult) = + let* (result : _ P2p.connection tzresult) = P2p.connect ~trusted p2p_layer point in + Result.iter_error + (fun _ -> Worker.set_unreachable_point gs_worker point) + result ; return_unit) (fun exn -> Format.eprintf @@ -256,9 +259,11 @@ let gs_worker_p2p_output_handler gs_worker p2p_layer = try_connect ~trusted ~expected_peer_id:peer_id + gs_worker p2p_layer maybe_reachable_point - | Connect_point {point} -> try_connect p2p_layer point ~trusted:false + | Connect_point {point} -> + try_connect gs_worker p2p_layer point ~trusted:false | Forget _ -> return_unit | Kick {peer} -> P2p.pool p2p_layer -- GitLab From d3f3349df453f63e4d6f1cc7dadfb7b3816049b6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Thir=C3=A9?= Date: Fri, 13 Dec 2024 15:37:04 +0100 Subject: [PATCH 3/5] Gossipsub/Worker: Add a `maybe_reachable_point` function --- src/lib_dal_node/gossipsub/gs_interface.ml | 3 +++ src/lib_gossipsub/gossipsub_intf.ml | 2 ++ src/lib_gossipsub/gossipsub_worker.ml | 2 ++ src/lib_gossipsub/test/test_gossipsub_shared.ml | 3 +++ 4 files changed, 10 insertions(+) diff --git a/src/lib_dal_node/gossipsub/gs_interface.ml b/src/lib_dal_node/gossipsub/gs_interface.ml index b150dd904bba..6990c98154ff 100644 --- a/src/lib_dal_node/gossipsub/gs_interface.ml +++ b/src/lib_dal_node/gossipsub/gs_interface.ml @@ -105,6 +105,9 @@ module Worker_config : module Monad = Monad module Point = Types.Point + let maybe_reachable_point 
Types.Peer.{maybe_reachable_point; _} = + maybe_reachable_point + (* TODO: https://gitlab.com/tezos/tezos/-/issues/5596 Use Seq_s instead of Lwt_stream to implement module Stream. *) diff --git a/src/lib_gossipsub/gossipsub_intf.ml b/src/lib_gossipsub/gossipsub_intf.ml index 7e1342fe7ae7..ca9a3cf25815 100644 --- a/src/lib_gossipsub/gossipsub_intf.ml +++ b/src/lib_gossipsub/gossipsub_intf.ml @@ -1051,6 +1051,8 @@ module type WORKER_CONFIGURATION = sig module Point : ITERABLE + val maybe_reachable_point : GS.Peer.t -> Point.t + (** Abstraction of the IO monad used by the worker. *) module Monad : sig (** The monad type. *) diff --git a/src/lib_gossipsub/gossipsub_worker.ml b/src/lib_gossipsub/gossipsub_worker.ml index 7f952df78737..2ed1f639b7d1 100644 --- a/src/lib_gossipsub/gossipsub_worker.ml +++ b/src/lib_gossipsub/gossipsub_worker.ml @@ -291,6 +291,8 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : self : Peer.t; } + let maybe_reachable_point = C.maybe_reachable_point + let state {state; _} = GS.Introspection.view state.gossip_state let emit_app_output state e = Stream.push e state.app_output_stream diff --git a/src/lib_gossipsub/test/test_gossipsub_shared.ml b/src/lib_gossipsub/test/test_gossipsub_shared.ml index 00a71ff65a0d..ddb9f11948ad 100644 --- a/src/lib_gossipsub/test/test_gossipsub_shared.ml +++ b/src/lib_gossipsub/test/test_gossipsub_shared.ml @@ -334,6 +334,9 @@ module Worker_config = struct module GS = GS module Point = Int_iterable + (* This should be modified in the future. 
*) + let maybe_reachable_point _ = -2 + module Monad = struct type 'a t = 'a Lwt.t -- GitLab From 36c5c1794ab694f759a7f07a71658c39a5d1daba Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Thir=C3=A9?= Date: Fri, 13 Dec 2024 15:41:28 +0100 Subject: [PATCH 4/5] Gossipsub/Worker: Memorize unreachable peers --- src/lib_gossipsub/gossipsub_worker.ml | 33 +++++++++++++++++++++------ 1 file changed, 26 insertions(+), 7 deletions(-) diff --git a/src/lib_gossipsub/gossipsub_worker.ml b/src/lib_gossipsub/gossipsub_worker.ml index 2ed1f639b7d1..92da341f63d3 100644 --- a/src/lib_gossipsub/gossipsub_worker.ml +++ b/src/lib_gossipsub/gossipsub_worker.ml @@ -598,16 +598,22 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : | state, GS.PX peers -> Introspection.update_count_recv_prunes state.stats `Incr ; let peers = - let current_connections = - let GS.Introspection.{connections; _} = - GS.Introspection.(view state.gossip_state) - in - connections + let GS.Introspection. + {connections = current_connections; heartbeat_ticks; _} = + GS.Introspection.(view state.gossip_state) + in + let can_be_contacted peer = + let point = maybe_reachable_point peer in + match Point.Map.find_opt point state.unreachable_points with + | None -> true + | Some next_attempt -> + Int64.compare heartbeat_ticks next_attempt >= 0 in Peer.Set.filter (fun peer -> (not (GS.Introspection.Connections.mem peer current_connections)) - && not (Peer.equal peer self)) + && (not (Peer.equal peer self)) + && can_be_contacted peer) peers in emit_p2p_output @@ -660,6 +666,16 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : state (IHave {topic; message_ids}) (Seq.return peer)) ; + let point_can_be_contacted point = + match Point.Map.find_opt point state.unreachable_points with + | None -> true + | Some next_attempt -> + Int64.compare gstate_view.heartbeat_ticks next_attempt >= 0 + in + let peer_can_be_contacted peer = + let point = maybe_reachable_point peer in + point_can_be_contacted point + 
in (* Once every 15 hearbreat ticks, try to reconnect to trusted peers if they are disconnected. Also try to reconnect to bootstrap points. *) if Int64.(equal (rem gstate_view.heartbeat_ticks 15L) 0L) then ( @@ -673,11 +689,14 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : else Seq.cons trusted_peer seq) state.trusted_peers Seq.empty + |> Seq.filter peer_can_be_contacted |> emit_p2p_output state ~mk_output:(fun trusted_peer -> Connect {peer = trusted_peer; origin = Trusted}) ; let p2p_output_stream = state.p2p_output_stream in let bootstrap_points = - state.bootstrap_points () |> Point.Set.of_list + state.bootstrap_points () + |> List.filter point_can_be_contacted + |> Point.Set.of_list in Point.Set.iter (fun point -> Stream.push (Connect_point {point}) p2p_output_stream) -- GitLab From d8791e665902d5487ab5a5a27f339fe1e1b32f47 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Thir=C3=A9?= Date: Mon, 16 Dec 2024 16:16:50 +0100 Subject: [PATCH 5/5] Gossipsub/Worker: Clean up the map every 6h --- src/lib_gossipsub/gossipsub_worker.ml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/lib_gossipsub/gossipsub_worker.ml b/src/lib_gossipsub/gossipsub_worker.ml index 92da341f63d3..5ffa48f61d92 100644 --- a/src/lib_gossipsub/gossipsub_worker.ml +++ b/src/lib_gossipsub/gossipsub_worker.ml @@ -701,6 +701,12 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : Point.Set.iter (fun point -> Stream.push (Connect_point {point}) p2p_output_stream) bootstrap_points) ; + let state = + (* We reset the map every 6h. This prevents the map from containing outdated points. *) + if Int64.(equal (rem gstate_view.heartbeat_ticks 21600L) 0L) then + {state with unreachable_points = Point.Map.empty} + else state + in state let update_gossip_state state (gossip_state, output) = -- GitLab