diff --git a/src/lib_dal_node/gossipsub/gs_interface.ml b/src/lib_dal_node/gossipsub/gs_interface.ml index b150dd904bba5a8ef3b67d317f2818bf79cb06c6..6990c98154fffe6ebeff3c52ccfd0f9178c65162 100644 --- a/src/lib_dal_node/gossipsub/gs_interface.ml +++ b/src/lib_dal_node/gossipsub/gs_interface.ml @@ -105,6 +105,9 @@ module Worker_config : module Monad = Monad module Point = Types.Point + let maybe_reachable_point Types.Peer.{maybe_reachable_point; _} = + maybe_reachable_point + (* TODO: https://gitlab.com/tezos/tezos/-/issues/5596 Use Seq_s instead of Lwt_stream to implement module Stream. *) diff --git a/src/lib_dal_node/gossipsub/gs_transport_connection.ml b/src/lib_dal_node/gossipsub/gs_transport_connection.ml index bce3ae0c6206b67f8c41b7562859ae09cf60e1fd..0846dfcf0af04409797c569503d1e2b85473be72 100644 --- a/src/lib_dal_node/gossipsub/gs_transport_connection.ml +++ b/src/lib_dal_node/gossipsub/gs_transport_connection.ml @@ -175,7 +175,7 @@ let disconnections_handler gs_worker peer_id = | None -> (* Something is off, we should log something probably. *) () | Some (peer, _) -> Worker.(Disconnection {peer} |> p2p_input gs_worker) -let try_connect ?expected_peer_id p2p_layer ~trusted point = +let try_connect ?expected_peer_id gs_worker p2p_layer ~trusted point = let open Lwt_syntax in (* We don't wait for the promise to resolve here, because if the advertised peer is not reachable or is not responding, we might block @@ -194,9 +194,12 @@ let try_connect ?expected_peer_id p2p_layer ~trusted point = Implement a better PX exchange.
*) ignore expected_peer_id ; - let* (_ : _ P2p.connection tzresult) = + let* (result : _ P2p.connection tzresult) = P2p.connect ~trusted p2p_layer point in + Result.iter_error + (fun _ -> Worker.set_unreachable_point gs_worker point) + result ; return_unit) (fun exn -> Format.eprintf @@ -256,9 +259,11 @@ let gs_worker_p2p_output_handler gs_worker p2p_layer = try_connect ~trusted ~expected_peer_id:peer_id + gs_worker p2p_layer maybe_reachable_point - | Connect_point {point} -> try_connect p2p_layer point ~trusted:false + | Connect_point {point} -> + try_connect gs_worker p2p_layer point ~trusted:false | Forget _ -> return_unit | Kick {peer} -> P2p.pool p2p_layer diff --git a/src/lib_gossipsub/gossipsub_intf.ml b/src/lib_gossipsub/gossipsub_intf.ml index 76675d82c7d9563e609adb6c543560e0c6edb6bc..ca9a3cf258152841e1dcbd7f4e641f4f5958978f 100644 --- a/src/lib_gossipsub/gossipsub_intf.ml +++ b/src/lib_gossipsub/gossipsub_intf.ml @@ -1051,6 +1051,8 @@ module type WORKER_CONFIGURATION = sig module Point : ITERABLE + val maybe_reachable_point : GS.Peer.t -> Point.t + (** Abstraction of the IO monad used by the worker. *) module Monad : sig (** The monad type. *) @@ -1198,6 +1200,9 @@ module type WORKER = sig to the worker's input stream. *) val app_input : t -> app_input -> unit + (** [set_unreachable_point state point] declares this point as unreachable. *) + val set_unreachable_point : t -> Point.t -> unit + (** [p2p_input state p2p_input] adds the given P2P input [p2p_input] to the worker's input stream.
*) val p2p_input : t -> p2p_input -> unit diff --git a/src/lib_gossipsub/gossipsub_worker.ml b/src/lib_gossipsub/gossipsub_worker.ml index 602c3234da7c3ae3a256e7661aeb44924a2a4570..5ffa48f61d92e041971d446ee9dd77f7c598e93e 100644 --- a/src/lib_gossipsub/gossipsub_worker.ml +++ b/src/lib_gossipsub/gossipsub_worker.ml @@ -280,6 +280,8 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : app_output_stream : app_output Stream.t; events_logging : event -> unit Monad.t; unknown_validity_messages : Bounded_message_map.t; + unreachable_points : (int64 * int64) Point.Map.t; + (* For each point, stores the next heartbeat tick from which we can try to contact this point again, together with the current retry delay (in heartbeat ticks). *) } (** A worker instance is made of its status and state. *) @@ -289,6 +291,8 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : self : Peer.t; } + let maybe_reachable_point = C.maybe_reachable_point + let state {state; _} = GS.Introspection.view state.gossip_state let emit_app_output state e = Stream.push e state.app_output_stream @@ -594,16 +598,22 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : | state, GS.PX peers -> Introspection.update_count_recv_prunes state.stats `Incr ; let peers = - let current_connections = - let GS.Introspection.{connections; _} = - GS.Introspection.(view state.gossip_state) - in - connections + let GS.Introspection.
{connections = current_connections; heartbeat_ticks; _} = + GS.Introspection.(view state.gossip_state) + in + let can_be_contacted peer = + let point = maybe_reachable_point peer in + match Point.Map.find_opt point state.unreachable_points with + | None -> true + | Some (next_attempt, _delay) -> + Int64.compare heartbeat_ticks next_attempt >= 0 in Peer.Set.filter (fun peer -> (not (GS.Introspection.Connections.mem peer current_connections)) - && not (Peer.equal peer self)) + && (not (Peer.equal peer self)) + && can_be_contacted peer) peers in emit_p2p_output @@ -656,6 +666,16 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : state (IHave {topic; message_ids}) (Seq.return peer)) ; + let point_can_be_contacted point = + match Point.Map.find_opt point state.unreachable_points with + | None -> true + | Some (next_attempt, _delay) -> + Int64.compare gstate_view.heartbeat_ticks next_attempt >= 0 + in + let peer_can_be_contacted peer = + let point = maybe_reachable_point peer in + point_can_be_contacted point + in (* Once every 15 hearbreat ticks, try to reconnect to trusted peers if they are disconnected. Also try to reconnect to bootstrap points. *) if Int64.(equal (rem gstate_view.heartbeat_ticks 15L) 0L) then ( @@ -669,15 +689,24 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : else Seq.cons trusted_peer seq) state.trusted_peers Seq.empty + |> Seq.filter peer_can_be_contacted |> emit_p2p_output state ~mk_output:(fun trusted_peer -> Connect {peer = trusted_peer; origin = Trusted}) ; let p2p_output_stream = state.p2p_output_stream in let bootstrap_points = - state.bootstrap_points () |> Point.Set.of_list + state.bootstrap_points () + |> List.filter point_can_be_contacted + |> Point.Set.of_list in Point.Set.iter (fun point -> Stream.push (Connect_point {point}) p2p_output_stream) bootstrap_points) ; + let state = + (* We reset the map every 6h. This prevents this map from containing outdated points.
*) if Int64.(equal (rem gstate_view.heartbeat_ticks 21600L) 0L) then + {state with unreachable_points = Point.Map.empty} + else state + in state let update_gossip_state state (gossip_state, output) = @@ -913,6 +942,7 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : app_output_stream = Stream.empty (); events_logging; unknown_validity_messages = Bounded_message_map.make ~capacity:10_000; + unreachable_points = Point.Map.empty; }; } @@ -927,6 +957,26 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : let is_subscribed t topic = GS.Introspection.(has_joined topic (view t.state.gossip_state)) + (* Exponential backoff: the first failure delays retries by 5 heartbeat + ticks; each further failure doubles the delay, capped at 300 ticks. *) + let set_unreachable_point t point = + let GS.Introspection.{heartbeat_ticks; _} = + GS.Introspection.(view t.state.gossip_state) + in + let unreachable_points = + Point.Map.update + point + (fun previous -> + let delay = + match previous with + | None -> 5L + | Some (_next_attempt, delay) -> Int64.(min 300L (mul delay 2L)) + in + Some (Int64.add heartbeat_ticks delay, delay)) + t.state.unreachable_points + in + t.state <- {t.state with unreachable_points} + let pp_list pp_elt = Format.pp_print_list ~pp_sep:(fun fmt () -> Format.fprintf fmt "; ") pp_elt diff --git a/src/lib_gossipsub/test/test_gossipsub_shared.ml b/src/lib_gossipsub/test/test_gossipsub_shared.ml index 00a71ff65a0d655d4d4c7ba8c3a8d4002cd7be91..ddb9f11948adc0362cceef90176a62e063c3fa02 100644 --- a/src/lib_gossipsub/test/test_gossipsub_shared.ml +++ b/src/lib_gossipsub/test/test_gossipsub_shared.ml @@ -334,6 +334,9 @@ module Worker_config = struct module GS = GS module Point = Int_iterable + (* This should be modified in the future. *) + let maybe_reachable_point _ = -2 + module Monad = struct type 'a t = 'a Lwt.t