diff --git a/src/bin_dal_node/daemon.ml b/src/bin_dal_node/daemon.ml index 9e50d202604dbb8d2d0de024586303eae559569a..aadf959e352d1bfb064812fcb85fdd823db7488f 100644 --- a/src/bin_dal_node/daemon.ml +++ b/src/bin_dal_node/daemon.ml @@ -1207,6 +1207,8 @@ let run ~data_dir ~configuration_override = in return (fun () -> !bootstrap_points) in + let* p2p_config = Transport_layer_parameters.p2p_config config in + let p2p_limits = Transport_layer_parameters.p2p_limits in (* Create and start a GS worker *) let gs_worker = let rng = @@ -1246,11 +1248,18 @@ let run ~data_dir ~configuration_override = } else limits in + let identity = p2p_config.P2p.identity in + let self = + (* What matters is the identity, the reachable point is more like a placeholder here. *) + Types.Peer. + {peer_id = identity.peer_id; maybe_reachable_point = public_addr} + in let gs_worker = Gossipsub.Worker.( make ~bootstrap_points:get_bootstrap_points ~events_logging:Logging.event + ~self rng limits peer_filter_parameters) @@ -1261,8 +1270,6 @@ let run ~data_dir ~configuration_override = let points = get_bootstrap_points () in (* Create a transport (P2P) layer instance. 
*) let* transport_layer = - let open Transport_layer_parameters in - let* p2p_config = p2p_config config in Gossipsub.Transport_layer.create ~public_addr ~is_bootstrap_peer:(profile = Profile_manager.bootstrap) diff --git a/src/lib_gossipsub/gossipsub_intf.ml b/src/lib_gossipsub/gossipsub_intf.ml index 7459a5bf3c3cccc35e7900caac268ed86e57b797..76675d82c7d9563e609adb6c543560e0c6edb6bc 100644 --- a/src/lib_gossipsub/gossipsub_intf.ml +++ b/src/lib_gossipsub/gossipsub_intf.ml @@ -1181,6 +1181,7 @@ module type WORKER = sig val make : ?events_logging:(event -> unit Monad.t) -> ?bootstrap_points:(unit -> Point.t list) -> + self:GS.Peer.t -> Random.State.t -> (GS.Topic.t, GS.Peer.t, GS.Message_id.t, GS.span) limits -> (GS.Peer.t, GS.Message_id.t) parameters -> diff --git a/src/lib_gossipsub/gossipsub_worker.ml b/src/lib_gossipsub/gossipsub_worker.ml index c659644c4d91b7cf27235a91602a8026cf0a3c82..602c3234da7c3ae3a256e7661aeb44924a2a4570 100644 --- a/src/lib_gossipsub/gossipsub_worker.ml +++ b/src/lib_gossipsub/gossipsub_worker.ml @@ -283,7 +283,11 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : } (** A worker instance is made of its status and state. *) - type t = {mutable status : worker_status; mutable state : worker_state} + type t = { + mutable status : worker_status; + mutable state : worker_state; + self : Peer.t; + } let state {state; _} = GS.Introspection.view state.gossip_state @@ -572,7 +576,7 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : automaton removes that peer from the given topic's mesh. It also filters the given collection of alternative peers to connect to. The worker then asks the P2P part to connect to those peers. 
*) - let handle_prune ~from_peer input_px = + let handle_prune ~self ~from_peer input_px = let forget_all state = emit_p2p_output state @@ -589,6 +593,19 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : state | state, GS.PX peers -> Introspection.update_count_recv_prunes state.stats `Incr ; + let peers = + let current_connections = + let GS.Introspection.{connections; _} = + GS.Introspection.(view state.gossip_state) + in + connections + in + Peer.Set.filter + (fun peer -> + (not (GS.Introspection.Connections.mem peer current_connections)) + && not (Peer.equal peer self)) + peers + in emit_p2p_output state ~mk_output:(fun to_peer -> @@ -681,7 +698,7 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : |> update_gossip_state state |> handle_leave topic (** Handling messages received from the P2P network. *) - let apply_p2p_message ({gossip_state; _} as state) from_peer = function + let apply_p2p_message ~self ({gossip_state; _} as state) from_peer = function | Message_with_header {message; topic; message_id} -> let receive_message = {GS.sender = from_peer; topic; message_id; message} @@ -715,10 +732,11 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : | Prune {topic; px; backoff} -> let prune : GS.prune = {peer = from_peer; topic; px; backoff} in GS.handle_prune prune gossip_state - |> update_gossip_state state |> handle_prune ~from_peer px + |> update_gossip_state state + |> handle_prune ~self ~from_peer px (** Handling events received from P2P layer. 
*) - let apply_p2p_event ({gossip_state; _} as state) = function + let apply_p2p_event ~self ({gossip_state; _} as state) = function | New_connection {peer; direct; trusted; bootstrap} -> GS.add_peer {direct; outbound = trusted; peer; bootstrap} gossip_state |> update_gossip_state state @@ -727,7 +745,7 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : GS.remove_peer {peer} gossip_state |> update_gossip_state state |> handle_disconnection peer | In_message {from_peer; p2p_message} -> - apply_p2p_message state from_peer p2p_message + apply_p2p_message ~self state from_peer p2p_message let rec check_unknown_messages_id state = match Bounded_message_map.remove_min state.unknown_validity_messages with @@ -771,7 +789,7 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : (** This is the main function of the worker. It interacts with the Gossipsub automaton given an event. The function possibly sends messages to the P2P and application layers and returns the new worker's state. *) - let apply_event ({gossip_state; _} as state) = function + let apply_event ~self ({gossip_state; _} as state) = function (* FIXME: https://gitlab.com/tezos/tezos/-/issues/5326 Notify the GS worker about the status of messages sent to peers. *) @@ -782,12 +800,12 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : would be handled (e.g. because the first one is late)? 
*) GS.heartbeat gossip_state |> update_gossip_state state |> handle_heartheat - | P2P_input event -> apply_p2p_event state event + | P2P_input event -> apply_p2p_event ~self state event | App_input event -> apply_app_event state event | Check_unknown_messages -> check_unknown_messages_id state (** A helper function that pushes events in the state *) - let push e {status = _; state} = Stream.push e state.events_stream + let push e {status = _; state; self = _} = Stream.push e state.events_stream let app_input t input = push (App_input input) t @@ -831,7 +849,7 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : if !shutdown then return () else let* () = events_logging event in - t.state <- apply_event t.state event ; + t.state <- apply_event ~self:t.self t.state event ; loop t in let promise = loop t in @@ -879,8 +897,9 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : event_loop_promise let make ?(events_logging = fun _event -> Monad.return ()) - ?(bootstrap_points = fun () -> []) rng limits parameters = + ?(bootstrap_points = fun () -> []) ~self rng limits parameters = { + self; status = Starting; state = { diff --git a/src/lib_gossipsub/test/test_integration_worker.ml b/src/lib_gossipsub/test/test_integration_worker.ml index 5f06cb5b397dbba061e17cfa416f60513f8689c4..2ce350e82b5454c9a0edbd5ca6c1bd944e31c1de 100644 --- a/src/lib_gossipsub/test/test_integration_worker.ml +++ b/src/lib_gossipsub/test/test_integration_worker.ml @@ -146,7 +146,8 @@ let test_worker_start_and_stop rng limits parameters = the expected outputs. *) let expected_p2p_output = Queue.create () in let expected_app_output = Queue.create () in - let worker = Worker.make rng limits parameters in + let self = -1 in + let worker = Worker.make ~self rng limits parameters in let heartbeat_span = limits.heartbeat_interval in let () = Worker.start ["1"; "2"; "3"] worker in let context = @@ -187,7 +188,8 @@ let test_worker_connect_and_graft rng limits parameters = the expected outputs. 
*) let expected_p2p_output = Queue.create () in let expected_app_output = Queue.create () in - let worker = Worker.make rng limits parameters in + let self = -1 in + let worker = Worker.make ~self rng limits parameters in let heartbeat_span = limits.heartbeat_interval in (* 1. The worker joins topic "1" at startup. *) let () = Worker.start [topic] worker in @@ -277,7 +279,8 @@ let test_worker_filter_messages_for_app rng limits parameters = the expected outputs. *) let expected_p2p_output = Queue.create () in let expected_app_output = Queue.create () in - let worker = Worker.make rng limits parameters in + let self = -1 in + let worker = Worker.make ~self rng limits parameters in (* 1. The worker joins topic "1" at startup. *) let () = Worker.start [topic] worker in let heartbeat_span = limits.heartbeat_interval in @@ -365,7 +368,8 @@ let test_worker_join rng limits parameters = ~title:"GS worker: Join topic" ~tags:["gossipsub"; "worker"; "join"] @@ fun () -> - let worker = Worker.make rng limits parameters in + let self = -1 in + let worker = Worker.make ~self rng limits parameters in let () = Worker.start [] worker in let () = Worker.app_input worker (Join "topic") in Check.(