From 149e554e293296b7b5ca27adee65a468d2e1f5ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Thir=C3=A9?= Date: Fri, 13 Dec 2024 10:43:38 +0100 Subject: [PATCH 1/3] DAL/Node: Filter peers that we already know --- src/lib_gossipsub/gossipsub_worker.ml | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/lib_gossipsub/gossipsub_worker.ml b/src/lib_gossipsub/gossipsub_worker.ml index c659644c4d91..7eeb7415e96e 100644 --- a/src/lib_gossipsub/gossipsub_worker.ml +++ b/src/lib_gossipsub/gossipsub_worker.ml @@ -589,6 +589,18 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : state | state, GS.PX peers -> Introspection.update_count_recv_prunes state.stats `Incr ; + let peers = + let current_connections = + let GS.Introspection.{connections; _} = + GS.Introspection.(view state.gossip_state) + in + connections + in + Peer.Set.filter + (fun peer -> + not (GS.Introspection.Connections.mem peer current_connections)) + peers + in emit_p2p_output state ~mk_output:(fun to_peer -> -- GitLab From 0ab31b3824afa63eeb53ecf4132b1d78213611f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Thir=C3=A9?= Date: Fri, 13 Dec 2024 10:58:42 +0100 Subject: [PATCH 2/3] DAL/Node: The gossipsub worker knows about itself --- src/bin_dal_node/daemon.ml | 11 +++++++++-- src/lib_gossipsub/gossipsub_intf.ml | 1 + src/lib_gossipsub/gossipsub_worker.ml | 11 ++++++++--- src/lib_gossipsub/test/test_integration_worker.ml | 12 ++++++++---- 4 files changed, 26 insertions(+), 9 deletions(-) diff --git a/src/bin_dal_node/daemon.ml b/src/bin_dal_node/daemon.ml index 9e50d202604d..aadf959e352d 100644 --- a/src/bin_dal_node/daemon.ml +++ b/src/bin_dal_node/daemon.ml @@ -1207,6 +1207,8 @@ let run ~data_dir ~configuration_override = in return (fun () -> !bootstrap_points) in + let* p2p_config = Transport_layer_parameters.p2p_config config in + let p2p_limits = Transport_layer_parameters.p2p_limits in (* Create and start a GS worker *) let gs_worker = let rng = @@ -1246,11 
+1248,18 @@ let run ~data_dir ~configuration_override = } else limits in + let identity = p2p_config.P2p.identity in + let self = + (* What matters is the identity; the reachable point is more of a placeholder here. *) + Types.Peer. + {peer_id = identity.peer_id; maybe_reachable_point = public_addr} + in let gs_worker = Gossipsub.Worker.( make ~bootstrap_points:get_bootstrap_points ~events_logging:Logging.event + ~self rng limits peer_filter_parameters) @@ -1261,8 +1270,6 @@ let run ~data_dir ~configuration_override = let points = get_bootstrap_points () in (* Create a transport (P2P) layer instance. *) let* transport_layer = - let open Transport_layer_parameters in - let* p2p_config = p2p_config config in Gossipsub.Transport_layer.create ~public_addr ~is_bootstrap_peer:(profile = Profile_manager.bootstrap) diff --git a/src/lib_gossipsub/gossipsub_intf.ml b/src/lib_gossipsub/gossipsub_intf.ml index 7459a5bf3c3c..76675d82c7d9 100644 --- a/src/lib_gossipsub/gossipsub_intf.ml +++ b/src/lib_gossipsub/gossipsub_intf.ml @@ -1181,6 +1181,7 @@ module type WORKER = sig val make : ?events_logging:(event -> unit Monad.t) -> ?bootstrap_points:(unit -> Point.t list) -> + self:GS.Peer.t -> Random.State.t -> (GS.Topic.t, GS.Peer.t, GS.Message_id.t, GS.span) limits -> (GS.Peer.t, GS.Message_id.t) parameters -> diff --git a/src/lib_gossipsub/gossipsub_worker.ml b/src/lib_gossipsub/gossipsub_worker.ml index 7eeb7415e96e..a3ff26c9e3c2 100644 --- a/src/lib_gossipsub/gossipsub_worker.ml +++ b/src/lib_gossipsub/gossipsub_worker.ml @@ -283,7 +283,11 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : } (** A worker instance is made of its status and state. 
*) - type t = {mutable status : worker_status; mutable state : worker_state} + type t = { + mutable status : worker_status; + mutable state : worker_state; + self : Peer.t; + } let state {state; _} = GS.Introspection.view state.gossip_state @@ -799,7 +803,7 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : | Check_unknown_messages -> check_unknown_messages_id state (** A helper function that pushes events in the state *) - let push e {status = _; state} = Stream.push e state.events_stream + let push e {status = _; state; self = _} = Stream.push e state.events_stream let app_input t input = push (App_input input) t @@ -891,8 +895,9 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : event_loop_promise let make ?(events_logging = fun _event -> Monad.return ()) - ?(bootstrap_points = fun () -> []) rng limits parameters = + ?(bootstrap_points = fun () -> []) ~self rng limits parameters = { + self; status = Starting; state = { diff --git a/src/lib_gossipsub/test/test_integration_worker.ml b/src/lib_gossipsub/test/test_integration_worker.ml index 5f06cb5b397d..2ce350e82b54 100644 --- a/src/lib_gossipsub/test/test_integration_worker.ml +++ b/src/lib_gossipsub/test/test_integration_worker.ml @@ -146,7 +146,8 @@ let test_worker_start_and_stop rng limits parameters = the expected outputs. *) let expected_p2p_output = Queue.create () in let expected_app_output = Queue.create () in - let worker = Worker.make rng limits parameters in + let self = -1 in + let worker = Worker.make ~self rng limits parameters in let heartbeat_span = limits.heartbeat_interval in let () = Worker.start ["1"; "2"; "3"] worker in let context = @@ -187,7 +188,8 @@ let test_worker_connect_and_graft rng limits parameters = the expected outputs. 
*) let expected_p2p_output = Queue.create () in let expected_app_output = Queue.create () in - let worker = Worker.make rng limits parameters in + let self = -1 in + let worker = Worker.make ~self rng limits parameters in let heartbeat_span = limits.heartbeat_interval in (* 1. The worker joins topic "1" at startup. *) let () = Worker.start [topic] worker in @@ -277,7 +279,8 @@ let test_worker_filter_messages_for_app rng limits parameters = the expected outputs. *) let expected_p2p_output = Queue.create () in let expected_app_output = Queue.create () in - let worker = Worker.make rng limits parameters in + let self = -1 in + let worker = Worker.make ~self rng limits parameters in (* 1. The worker joins topic "1" at startup. *) let () = Worker.start [topic] worker in let heartbeat_span = limits.heartbeat_interval in @@ -365,7 +368,8 @@ let test_worker_join rng limits parameters = ~title:"GS worker: Join topic" ~tags:["gossipsub"; "worker"; "join"] @@ fun () -> - let worker = Worker.make rng limits parameters in + let self = -1 in + let worker = Worker.make ~self rng limits parameters in let () = Worker.start [] worker in let () = Worker.app_input worker (Join "topic") in Check.( -- GitLab From 66e0f2547b8ce7ab164a0ed73bf56185be346f23 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Thir=C3=A9?= Date: Fri, 13 Dec 2024 11:01:27 +0100 Subject: [PATCH 3/3] DAL/Node: Avoid trying to connect to itself --- src/lib_gossipsub/gossipsub_worker.ml | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/lib_gossipsub/gossipsub_worker.ml b/src/lib_gossipsub/gossipsub_worker.ml index a3ff26c9e3c2..602c3234da7c 100644 --- a/src/lib_gossipsub/gossipsub_worker.ml +++ b/src/lib_gossipsub/gossipsub_worker.ml @@ -576,7 +576,7 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : automaton removes that peer from the given topic's mesh. It also filters the given collection of alternative peers to connect to. 
The worker then asks the P2P part to connect to those peeers. *) - let handle_prune ~from_peer input_px = + let handle_prune ~self ~from_peer input_px = let forget_all state = emit_p2p_output state @@ -602,7 +602,8 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : in Peer.Set.filter (fun peer -> - not (GS.Introspection.Connections.mem peer current_connections)) + (not (GS.Introspection.Connections.mem peer current_connections)) + && not (Peer.equal peer self)) peers in emit_p2p_output @@ -697,7 +698,7 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : |> update_gossip_state state |> handle_leave topic (** Handling messages received from the P2P network. *) - let apply_p2p_message ({gossip_state; _} as state) from_peer = function + let apply_p2p_message ~self ({gossip_state; _} as state) from_peer = function | Message_with_header {message; topic; message_id} -> let receive_message = {GS.sender = from_peer; topic; message_id; message} @@ -731,10 +732,11 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : | Prune {topic; px; backoff} -> let prune : GS.prune = {peer = from_peer; topic; px; backoff} in GS.handle_prune prune gossip_state - |> update_gossip_state state |> handle_prune ~from_peer px + |> update_gossip_state state + |> handle_prune ~self ~from_peer px (** Handling events received from P2P layer. 
*) - let apply_p2p_event ({gossip_state; _} as state) = function + let apply_p2p_event ~self ({gossip_state; _} as state) = function | New_connection {peer; direct; trusted; bootstrap} -> GS.add_peer {direct; outbound = trusted; peer; bootstrap} gossip_state |> update_gossip_state state @@ -743,7 +745,7 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : GS.remove_peer {peer} gossip_state |> update_gossip_state state |> handle_disconnection peer | In_message {from_peer; p2p_message} -> - apply_p2p_message state from_peer p2p_message + apply_p2p_message ~self state from_peer p2p_message let rec check_unknown_messages_id state = match Bounded_message_map.remove_min state.unknown_validity_messages with @@ -787,7 +789,7 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : (** This is the main function of the worker. It interacts with the Gossipsub automaton given an event. The function possibly sends messages to the P2P and application layers and returns the new worker's state. *) - let apply_event ({gossip_state; _} as state) = function + let apply_event ~self ({gossip_state; _} as state) = function (* FIXME: https://gitlab.com/tezos/tezos/-/issues/5326 Notify the GS worker about the status of messages sent to peers. *) @@ -798,7 +800,7 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : would be handled (e.g. because the first one is late)? *) GS.heartbeat gossip_state |> update_gossip_state state |> handle_heartheat - | P2P_input event -> apply_p2p_event state event + | P2P_input event -> apply_p2p_event ~self state event | App_input event -> apply_app_event state event | Check_unknown_messages -> check_unknown_messages_id state @@ -847,7 +849,7 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : if !shutdown then return () else let* () = events_logging event in - t.state <- apply_event t.state event ; + t.state <- apply_event ~self:t.self t.state event ; loop t in let promise = loop t in -- GitLab