diff --git a/src/lib_dal_node/gossipsub/gossipsub.ml b/src/lib_dal_node/gossipsub/gossipsub.ml index e1693b30a9a82b910644c0a4cedd2d9a9052db1b..5bb7763d3886f6196f703a3b8cf6d36f50735693 100644 --- a/src/lib_dal_node/gossipsub/gossipsub.ml +++ b/src/lib_dal_node/gossipsub/gossipsub.ml @@ -39,7 +39,7 @@ module Transport_layer = struct module Default_parameters = Transport_layer_default_parameters type t = - ( Interface.p2p_message, + ( Gs_interface.Worker_instance.p2p_message, Types.P2P.Metadata.Peer.t, Types.P2P.Metadata.Connection.t ) P2p.t diff --git a/src/lib_dal_node/gossipsub/gs_logging.ml b/src/lib_dal_node/gossipsub/gs_logging.ml index 61e50e0c4381c4222edab4e5c8aadcb0ef40ec2b..6625749fde752451538f916c953034ba2e5f8097 100644 --- a/src/lib_dal_node/gossipsub/gs_logging.ml +++ b/src/lib_dal_node/gossipsub/gs_logging.ml @@ -207,17 +207,23 @@ let event = | P2P_input event -> ( match event with | New_connection {peer; direct; trusted; bootstrap} -> - emit new_connection (peer, direct, trusted, bootstrap) - | Disconnection {peer} -> emit disconnection peer + emit new_connection (peer.peer_id, direct, trusted, bootstrap) + | Disconnection {peer} -> emit disconnection peer.peer_id | In_message {from_peer; p2p_message} -> ( match p2p_message with | Message_with_header {message = _; topic; message_id} -> - emit message_with_header (from_peer, topic, message_id) - | Subscribe {topic} -> emit subscribe (from_peer, topic) - | Unsubscribe {topic} -> emit unsubscribe (from_peer, topic) - | Graft {topic} -> emit graft (from_peer, topic) + emit message_with_header (from_peer.peer_id, topic, message_id) + | Subscribe {topic} -> emit subscribe (from_peer.peer_id, topic) + | Unsubscribe {topic} -> emit unsubscribe (from_peer.peer_id, topic) + | Graft {topic} -> emit graft (from_peer.peer_id, topic) | Prune {topic; px; backoff} -> - emit prune (from_peer, topic, backoff, List.of_seq px) + emit + prune + ( from_peer.peer_id, + topic, + backoff, + List.of_seq px + |> List.map (fun 
Types.Peer.{peer_id; _} -> peer_id) ) | IHave {topic; message_ids} -> - emit ihave (from_peer, topic, message_ids) - | IWant {message_ids} -> emit iwant (from_peer, message_ids))) + emit ihave (from_peer.peer_id, topic, message_ids) + | IWant {message_ids} -> emit iwant (from_peer.peer_id, message_ids))) diff --git a/src/lib_dal_node/gossipsub/gs_transport_connection.ml b/src/lib_dal_node/gossipsub/gs_transport_connection.ml index 3a155715be790d1d8770dc230ee6ea3fad7a50a8..2f4c192bd3fdacab92455b2e11d378845be2f76e 100644 --- a/src/lib_dal_node/gossipsub/gs_transport_connection.ml +++ b/src/lib_dal_node/gossipsub/gs_transport_connection.ml @@ -88,116 +88,22 @@ module Events = struct ("message", Transport_layer_interface.p2p_message_encoding) end -(** This module implements a cache of alternative peers (aka PX peers), that is, - peers we are not (yet) connected to, advertised by P2P neighbors within - Prune messages. - - The cache associates a P2P point to each pair made of the advertised peer - and the peer that advertised it. In case of redundancy, the last advertised - point is kept in the cache. - - The cache is in theory not bounded. However, some invariants on the way - {!insert} and {!drop} are used below ensures that we remove the added entries very - quickly. -*) -module PX_cache : sig - (** The cache data structure for advertised alternative PXs. *) - type t - - type origin = Worker.peer_origin - - (** Create a new cache data structure. The [size] parameter is an indication - on the size of internal table storing data. Its default value is - [2048]. *) - val create : ?size:int -> unit -> t - - (** [insert t ~origin ~px_peer point] associates the given [point] to - [(origin, px_peer)] pair of peers. If a point already exists for the pair, - it is overwritten. 
*) - val insert : - t -> origin:origin -> px_peer:P2p_peer.Id.t -> P2p_point.Id.t -> unit - - (** [find_opt t ~origin ~px_peer] returns the content associated to the entry - [(origin, px)] in the cache, if any. *) - val find_opt : - t -> origin:origin -> px_peer:P2p_peer.Id.t -> P2p_point.Id.t option - - (** [drop t ~origin ~px_peer] drops the entry [(origin, px_peer)] from the cache. *) - val drop : t -> origin:origin -> px_peer:P2p_peer.Id.t -> unit -end = struct - type origin = Worker.peer_origin - - type key = {origin : origin; px_peer : P2p_peer.Id.t} - - module Table = Hashtbl.Make (struct - type t = key - - let equal {origin; px_peer} k2 = - P2p_peer.Id.equal px_peer k2.px_peer - && - match (origin, k2.origin) with - | PX peer1, PX peer2 -> P2p_peer.Id.equal peer1 peer2 - | Trusted, Trusted -> true - | PX _, _ | Trusted, _ -> false - - let hash {origin; px_peer} = - match origin with - | PX peer -> (P2p_peer.Id.hash px_peer * 31) + P2p_peer.Id.hash peer - | Trusted -> P2p_peer.Id.hash px_peer * 17 - end) - - type t = P2p_point.Id.t Table.t - - let create ?(size = 2048) () = Table.create size - - let insert table ~origin ~px_peer point = - Table.replace table {px_peer; origin} point - - let drop table ~origin ~px_peer = Table.remove table {origin; px_peer} - - let find_opt table ~origin ~px_peer = Table.find_opt table {origin; px_peer} -end - -(* [px_peer_of_peer p2p_layer peer] returns the public IP address and port at which - [peer] could be reached. For that, it first inspects information transmitted - via connection metadata by the remote [peer]. If the address or port are - missing from the connection's metadata, the function inspects the Internet - connection link's information. It returns [None] if it does manage to get - those information with both methods. 
*) -let px_peer_of_peer p2p_layer peer = - let open Option_syntax in - let open Transport_layer_interface in - let* conn = P2p.find_connection_by_peer_id p2p_layer peer in - (* In general, people either provide an address and a port, or just a port. - In any case, we use `P2p.connection_remote_metadata` to get the address and - the port of the provided values, and we fall back to the values given by - `P2p.connection_info` if the former are not available - (respectively/independently for the address and the port). The first case - is covered by {!P2p.connection_remote_metadata}. But if the IP address is - not explicitly given, we rely on the function {!P2p.connection_info - p2p_layer conn}. *) - let Types.P2P.Metadata.Connection. - {advertised_net_addr; advertised_net_port; is_bootstrap_peer = _} = - P2p.connection_remote_metadata p2p_layer conn - in - let {P2p_connection.Info.id_point = conn_addr, conn_port_opt; _} = +let peer_of_connection p2p_layer conn = + let open Types.P2P.Metadata.Connection in + let {P2p_connection.Info.peer_id; id_point; remote_metadata; _} = P2p.connection_info p2p_layer conn in - let addr = Option.value advertised_net_addr ~default:conn_addr in - let* port = Option.either advertised_net_port conn_port_opt in - return {point = (addr, port); peer} - -(* FIXME: https://gitlab.com/tezos/tezos/-/issues/6637 - - When adding an RPC to trust peers/points. Use this function when trusting a peer - via an RPC. *) - -(* This function inserts the point of a trusted peer into the [px_cache] - table. 
*) -let cache_point_of_trusted_peer p2p_layer px_cache peer = - px_peer_of_peer p2p_layer peer - |> Option.iter (fun Transport_layer_interface.{point; peer} -> - PX_cache.insert px_cache ~origin:Trusted ~px_peer:peer point) + let default_port = + Transport_layer_default_parameters.P2p_config.listening_port + in + let addr = + Option.value remote_metadata.advertised_net_addr ~default:(fst id_point) + in + let port = + Option.value remote_metadata.advertised_net_port ~default:default_port + in + let maybe_reachable_point = (addr, port) in + Types.Peer.{peer_id; maybe_reachable_point} (** This handler forwards information about connections established by the P2P layer to the Gossipsub worker. @@ -212,7 +118,7 @@ let cache_point_of_trusted_peer p2p_layer px_cache peer = outbound to avoid possible love bombing attacks. The Rust version also implements a way to mitigate this risk, but not the Go implementation. *) -let new_connections_handler px_cache gs_worker p2p_layer peer conn = +let new_connections_handler gs_worker p2p_layer peer_id conn = let P2p_connection.Info.{id_point = addr, port_opt; _} = P2p.connection_info p2p_layer conn in @@ -226,7 +132,7 @@ let new_connections_handler px_cache gs_worker p2p_layer peer conn = ~none:true (* It doesn't matter in fake networks where pool is None *) ~some:(fun pool -> f pool arg) in - let trusted_peer = fold_pool_opt P2p_pool.Peers.get_trusted peer in + let trusted_peer = fold_pool_opt P2p_pool.Peers.get_trusted peer_id in let trusted_point = Option.fold port_opt ~none:false ~some:(fun port -> fold_pool_opt P2p_pool.Points.get_trusted (addr, port)) @@ -236,126 +142,65 @@ let new_connections_handler px_cache gs_worker p2p_layer peer conn = Add the ability to have direct peers. 
*) let direct = false in - if trusted then cache_point_of_trusted_peer p2p_layer px_cache peer ; + let peer = peer_of_connection p2p_layer conn in Worker.( New_connection {peer; direct; trusted; bootstrap} |> p2p_input gs_worker) (** This handler forwards information about P2P disconnections to the Gossipsub worker. *) -let disconnections_handler gs_worker peer = - Worker.(Disconnection {peer} |> p2p_input gs_worker) - -(* This function translates a Worker p2p_message to the type of messages sent - via the P2P layer. The two types don't coincide because of Prune. *) -let wrap_p2p_message p2p_layer = - let module W = Worker in - let open Transport_layer_interface in - function - | W.Graft {topic} -> Graft {topic} - | W.Prune {topic; px; backoff} -> - let px = Seq.filter_map (fun peer -> px_peer_of_peer p2p_layer peer) px in - Prune {topic; px; backoff} - | W.IHave {topic; message_ids} -> IHave {topic; message_ids} - | W.IWant {message_ids} -> IWant {message_ids} - | W.Subscribe {topic} -> Subscribe {topic} - | W.Unsubscribe {topic} -> Unsubscribe {topic} - | W.Message_with_header {message; topic; message_id} -> - Message_with_header {message; topic; message_id} - -(* This function translates a message received via the P2P layer to a Worker - p2p_message. The two types don't coincide because of Prune. 
*) -let unwrap_p2p_message p2p_layer ~from_peer px_cache = - let open Worker in - let module I = Transport_layer_interface in - function - | I.Graft {topic} -> Graft {topic} - | I.Prune {topic; px; backoff} -> - let px = - Seq.map - (fun I.{point; peer} -> - if Option.is_none @@ P2p.find_connection_by_peer_id p2p_layer peer - then - PX_cache.insert - px_cache - ~origin:(PX from_peer) - ~px_peer:peer - point ; - peer) - px - in - Prune {topic; px; backoff} - | I.IHave {topic; message_ids} -> IHave {topic; message_ids} - | I.IWant {message_ids} -> IWant {message_ids} - | I.Subscribe {topic} -> Subscribe {topic} - | I.Unsubscribe {topic} -> Unsubscribe {topic} - | I.Message_with_header {message; topic; message_id} -> - Message_with_header {message; topic; message_id} - -let try_connect_point ?expected_peer_id p2p_layer point = - let open Lwt_syntax in - match P2p.pool p2p_layer with - | None -> return_unit - | Some pool -> - if Option.is_some @@ P2p_pool.Connection.find_by_point pool point then - return_unit (* already connected. *) - else - (* We don't wait for the promise to resolve here, because if the - advertised peer is not reachable or is not responding, we might block - until connection timeout is reached (we observed a timeout of 10 - seconds in some case). Blocking here means that processing of other - messages from p2p_output_stream (including shards propagation) will - be delayed. *) - Lwt.dont_wait - (fun () -> - let* (_ : _ P2p.connection tzresult) = - P2p.connect ?expected_peer_id p2p_layer point - in - return_unit) - (fun exn -> - Format.eprintf - "Warning: got an exception while trying to connect to %a: %s@." - Point.pp - point - (Printexc.to_string exn)) - |> return +let disconnections_handler gs_worker peer_id = + let open GS.Introspection in + (* When this callback is called, we only have the [peer_id] and not + the [maybe_reachable_point]. 
+ + It can be reconstructed in many ways: + + - We could do it via the octez-p2p + + - We can find it in the automaton state (chosen option) + + Last option does not have the good complexity but is simple enough. + *) + let view = Worker.state gs_worker in + let {connections; _} = view in + (* Complexity is wrong, but the number of connections should not be + too large, so it should be ok in practice. *) + let bindings = Connections.bindings connections in + let value = + List.find_opt + (fun (peer, _) -> P2p_peer.Id.equal peer.Types.Peer.peer_id peer_id) + bindings + in + match value with + | None -> (* Something is off, we should log something probably. *) () + | Some (peer, _) -> Worker.(Disconnection {peer} |> p2p_input gs_worker) -let try_connect p2p_layer px_cache ~px_peer ~origin = +let try_connect ?expected_peer_id p2p_layer point = let open Lwt_syntax in - (* If there is some [point] associated to [px_peer] and advertised by [origin] - on the [px_cache], we will try to connect to it. *) - match PX_cache.find_opt px_cache ~px_peer ~origin with - | Some point -> - (* FIXME: https://gitlab.com/tezos/tezos/-/issues/5799 - - We may have an issue as described by the following scenario: - - A legit pair [(point, peer)] is already known by P2P, but the remote - peer is disconnected for some reason; - - Some (malicious) peer advertises a fake px_peer [(point, peer')], where - [peer'] is not the real peer id associated to the [point]; - - We try to connect to [point], setting at the same time that [peer'] - is the expected peer id for this point; - - The connection fails, but the old (correct) association [(point, - peer)] is lost. - - We may want to revert [peer'] association to [point]. But we sill have - an issue in case [(point, peer)] is actually the fake info and [(point, - peer')] the legit association but the node is disconnected when trying - to connect to it. 
- - This implementation will be hardened once we add the notion of "signed - records" found, e.g., in Rust version, to check that the advertised - (peer, point) pair alongside a timestamp are not faked. *) - let* () = try_connect_point ~expected_peer_id:px_peer p2p_layer point in - (match origin with - | Trusted -> () (* Don't drop trusted points. *) - | PX _ -> PX_cache.drop px_cache ~px_peer ~origin) ; - return_unit - | _ -> return_unit + (* We don't wait for the promise to resolve here, because if the + advertised peer is not reachable or is not responding, we might block + until connection timeout is reached (we observed a timeout of 10 + seconds in some case). Blocking here means that processing of other + messages from p2p_output_stream (including shards propagation) will + be delayed. *) + Lwt.dont_wait + (fun () -> + let* (_ : _ P2p.connection tzresult) = + P2p.connect ?expected_peer_id p2p_layer point + in + return_unit) + (fun exn -> + Format.eprintf + "Warning: got an exception while trying to connect to %a: %s@." + Point.pp + point + (Printexc.to_string exn)) + |> return (** This handler pops and processes the items put by the worker in the p2p output stream. The out messages are sent to the corresponding peers and the directives to the P2P layer to connect or disconnect peers are handled. 
*) -let gs_worker_p2p_output_handler gs_worker p2p_layer px_cache = +let gs_worker_p2p_output_handler gs_worker p2p_layer = let open Lwt_syntax in (* only log sending of GS control messages *) let log_sending_message = function @@ -367,7 +212,7 @@ let gs_worker_p2p_output_handler gs_worker p2p_layer px_cache = let* () = match p2p_output with | Worker.Out_message {to_peer; p2p_message} -> ( - let conn = P2p.find_connection_by_peer_id p2p_layer to_peer in + let conn = P2p.find_connection_by_peer_id p2p_layer to_peer.peer_id in match conn with | None -> (* This could happen when the peer is disconnected or the @@ -377,34 +222,33 @@ let gs_worker_p2p_output_handler gs_worker p2p_layer px_cache = Are there weird cases in which there is no connection associated to the peer, but the peer is still registered as connected on the GS side? *) - Events.(emit no_connection_for_peer to_peer) + Events.(emit no_connection_for_peer to_peer.peer_id) | Some conn -> ( let* (res : unit tzresult) = - let msg = wrap_p2p_message p2p_layer p2p_message in let* () = if log_sending_message p2p_message then - Events.(emit send_p2p_message (to_peer, msg)) + Events.( + emit send_p2p_message (to_peer.peer_id, p2p_message)) else return_unit in - P2p.send p2p_layer conn msg + P2p.send p2p_layer conn p2p_message in match res with | Ok () -> return_unit | Error err -> - Events.(emit send_p2p_message_failed (to_peer, err)))) + Events.(emit send_p2p_message_failed (to_peer.peer_id, err)))) | Disconnect {peer} -> - P2p.find_connection_by_peer_id p2p_layer peer + P2p.find_connection_by_peer_id p2p_layer peer.peer_id |> Option.iter_s (P2p.disconnect ~reason:"disconnected by Gossipsub" p2p_layer) - | Connect {peer; origin} -> - try_connect p2p_layer px_cache ~px_peer:peer ~origin - | Connect_point {point} -> try_connect_point p2p_layer point - | Forget {peer; origin} -> - PX_cache.drop px_cache ~px_peer:peer ~origin:(PX origin) ; - return_unit + | Connect {peer; origin = _} -> + let 
Types.Peer.{maybe_reachable_point; peer_id} = peer in + try_connect ~expected_peer_id:peer_id p2p_layer maybe_reachable_point + | Connect_point {point} -> try_connect p2p_layer point + | Forget _ -> return_unit | Kick {peer} -> P2p.pool p2p_layer - |> Option.iter_s (fun pool -> P2p_pool.Peers.ban pool peer) + |> Option.iter_s (fun pool -> P2p_pool.Peers.ban pool peer.peer_id) in loop output_stream in @@ -412,21 +256,12 @@ let gs_worker_p2p_output_handler gs_worker p2p_layer px_cache = (** This handler forwards p2p messages received via Octez p2p to the Gossipsub worker. *) -let transport_layer_inputs_handler gs_worker p2p_layer advertised_px_cache = +let transport_layer_inputs_handler gs_worker p2p_layer = let open Lwt_syntax in let rec loop () = - let* conn, msg = P2p.recv_any p2p_layer in - let {P2p_connection.Info.peer_id = from_peer; _} = - P2p.connection_info p2p_layer conn - in - Worker.( - In_message - { - from_peer; - p2p_message = - unwrap_p2p_message p2p_layer ~from_peer advertised_px_cache msg; - } - |> p2p_input gs_worker) ; + let* conn, p2p_message = P2p.recv_any p2p_layer in + let from_peer = peer_of_connection p2p_layer conn in + Worker.(In_message {from_peer; p2p_message} |> p2p_input gs_worker) ; loop () in loop () @@ -450,17 +285,16 @@ let app_messages_handler gs_worker ~app_messages_callback = Worker.app_output_stream gs_worker |> loop let activate gs_worker p2p_layer ~app_messages_callback = - let px_cache = PX_cache.create () in (* Register a handler to notify new P2P connections to GS. *) let () = - new_connections_handler px_cache gs_worker p2p_layer + new_connections_handler gs_worker p2p_layer |> P2p.on_new_connection p2p_layer in (* Register a handler to notify P2P disconnections to GS. 
*) let () = disconnections_handler gs_worker |> P2p.on_disconnection p2p_layer in Lwt.join [ - gs_worker_p2p_output_handler gs_worker p2p_layer px_cache; - transport_layer_inputs_handler gs_worker p2p_layer px_cache; + gs_worker_p2p_output_handler gs_worker p2p_layer; + transport_layer_inputs_handler gs_worker p2p_layer; app_messages_handler gs_worker ~app_messages_callback; ] diff --git a/src/lib_dal_node/gossipsub/gs_transport_connection.mli b/src/lib_dal_node/gossipsub/gs_transport_connection.mli index a9de70f67293ddcca792dd74e48b0eb6cec09608..e9f7c2bc8deaf1e45cb87fc7264b6c9f932e57cf 100644 --- a/src/lib_dal_node/gossipsub/gs_transport_connection.mli +++ b/src/lib_dal_node/gossipsub/gs_transport_connection.mli @@ -35,7 +35,7 @@ *) val activate : Gs_interface.Worker_instance.t -> - ( Transport_layer_interface.p2p_message, + ( Gs_interface.Worker_instance.p2p_message, Types.P2P.Metadata.Peer.t, Types.P2P.Metadata.Connection.t ) P2p.t -> diff --git a/src/lib_dal_node/gossipsub/transport_layer_interface.ml b/src/lib_dal_node/gossipsub/transport_layer_interface.ml index 4b48d16fcf6c7255bd738b48eb5d5b5256300530..1f2bb98034c101363e1c2e5c51ba3bc0e3b8bede 100644 --- a/src/lib_dal_node/gossipsub/transport_layer_interface.ml +++ b/src/lib_dal_node/gossipsub/transport_layer_interface.ml @@ -30,32 +30,18 @@ module Types = Tezos_dal_node_services.Types Version this type to ease future migrations. 
*) module P2p_message_V1 = struct - type px_peer = {point : P2p_point.Id.t; peer : P2p_peer.Id.t} - - type p2p_message = - | Graft of {topic : Types.Topic.t} - | Prune of { - topic : Types.Topic.t; - px : px_peer Seq.t; - backoff : Types.Span.t; - } - | IHave of {topic : Types.Topic.t; message_ids : Types.Message_id.t list} - | IWant of {message_ids : Types.Message_id.t list} - | Subscribe of {topic : Types.Topic.t} - | Unsubscribe of {topic : Types.Topic.t} - | Message_with_header of { - message : Types.Message.t; - topic : Types.Topic.t; - message_id : Types.Message_id.t; - } + type px_peer = Types.Peer.t + + type p2p_message = Gs_interface.Worker_instance.p2p_message let px_peer_encoding = let open Data_encoding in conv - (fun {point; peer} -> (point, peer)) - (fun (point, peer) -> {point; peer}) + (fun Types.Peer.{maybe_reachable_point; peer_id} -> + (maybe_reachable_point, peer_id)) + (fun (maybe_reachable_point, peer_id) -> {maybe_reachable_point; peer_id}) (obj2 - (req "point" P2p_point.Id.encoding) + (req "maybe_reachable_point" P2p_point.Id.encoding) (req "peer" P2p_peer.Id.encoding)) (* FIXME: https://gitlab.com/tezos/tezos/-/issues/5564 @@ -63,6 +49,7 @@ module P2p_message_V1 = struct DAL/GS: bound the lists/seqs in exchanged p2p messages. 
*) let p2p_message_app_encoding = let open Data_encoding in + let open Gs_interface.Worker_instance in let case ?max_length ~tag ~title encoding unwrap wrap = P2p_params.Encoding {tag; title; encoding; wrap; unwrap; max_length} in @@ -152,7 +139,9 @@ module P2p_message_V1 = struct let pp_list pp_elt = Format.pp_print_list ~pp_sep:(fun fmt () -> Format.fprintf fmt "; ") pp_elt - let pp_p2p_message fmt = function + let pp_p2p_message fmt = + let open Gs_interface.Worker_instance in + function | Graft {topic} -> Format.fprintf fmt "Graft{topic=%a}" Types.Topic.pp topic | Prune {topic; px; backoff} -> Format.fprintf @@ -161,7 +150,7 @@ module P2p_message_V1 = struct Types.Topic.pp topic (pp_list Types.Peer.pp) - (List.of_seq px |> List.map (fun px_peer -> px_peer.peer)) + (List.of_seq px) Types.Span.pp backoff | IHave {topic; message_ids} -> diff --git a/src/lib_dal_node/gossipsub/transport_layer_interface.mli b/src/lib_dal_node/gossipsub/transport_layer_interface.mli index b33114a0c2fabadc5976d0fcb6cfd69c449c3f4b..508275946999237a8e20654e6b3201729eef939c 100644 --- a/src/lib_dal_node/gossipsub/transport_layer_interface.mli +++ b/src/lib_dal_node/gossipsub/transport_layer_interface.mli @@ -41,31 +41,17 @@ module Types = Tezos_dal_node_services.Types (** Peers exchanged via PX. [point] represents the (address, port) pair of the exchanged peer, while [peer] represents the cryptographic identity of the peer. *) -type px_peer = {point : P2p_point.Id.t; peer : P2p_peer.Id.t} +type px_peer = Types.Peer.t -(** Without piggybacking, {!p2p_message} is almost identical to - {!Gs_interface.p2p_message}, except that for the [Prune] case, - {!P2p_peer.Id.t} elements in [px] are augmented by their {!P2p_point.Id.t} - counterpart. 
*) -type p2p_message = - | Graft of {topic : Types.Topic.t} - | Prune of {topic : Types.Topic.t; px : px_peer Seq.t; backoff : Types.Span.t} - | IHave of {topic : Types.Topic.t; message_ids : Types.Message_id.t list} - | IWant of {message_ids : Types.Message_id.t list} - | Subscribe of {topic : Types.Topic.t} - | Unsubscribe of {topic : Types.Topic.t} - | Message_with_header of { - message : Types.Message.t; - topic : Types.Topic.t; - message_id : Types.Message_id.t; - } +val p2p_message_encoding : + Gs_interface.Worker_instance.p2p_message Data_encoding.t -val p2p_message_encoding : p2p_message Data_encoding.t - -val pp_p2p_message : Format.formatter -> p2p_message -> unit +val pp_p2p_message : + Format.formatter -> Gs_interface.Worker_instance.p2p_message -> unit (** A P2P message config is parameterized by the network's name. *) val message_config : - network_name:string -> p2p_message P2p_params.message_config + network_name:string -> + Gs_interface.Worker_instance.p2p_message P2p_params.message_config val version : network_name:string -> Network_version.t diff --git a/src/lib_dal_node_services/types.ml b/src/lib_dal_node_services/types.ml index 2ccfe330815ae376f8cc04b34a6c1da57178eca8..cdfd69bc07a466e098434cbcbbb4200f62797a12 100644 --- a/src/lib_dal_node_services/types.ml +++ b/src/lib_dal_node_services/types.ml @@ -181,21 +181,41 @@ module Message = struct end module Peer = struct - type t = P2p_peer.Id.t + type t = {peer_id : P2p_peer.Id.t; maybe_reachable_point : P2p_point.Id.t} module Cmp = struct type nonrec t = t - let compare p1 p2 = P2p_peer.Id.compare p1 p2 + (* We only compare the peer id here. The reason is that it is safe to assume: + + 1. For connected peers, they can be identified by a unique IP address. + + 2. For not connected peers we want to advertise only one point + that can change with time. 
*) + let compare p1 p2 = P2p_peer.Id.compare p1.peer_id p2.peer_id end include Compare.Make (Cmp) module Set = Set.Make (Cmp) module Map = Map.Make (Cmp) - let pp = P2p_peer.Id.pp + let encoding = + let open Data_encoding in + conv + (fun {peer_id; maybe_reachable_point} -> (peer_id, maybe_reachable_point)) + (fun (peer_id, maybe_reachable_point) -> {peer_id; maybe_reachable_point}) + (obj2 + (req "peer_id" P2p_peer.Id.encoding) + (req "maybe_reachable_point" P2p_point.Id.encoding)) - let encoding = P2p_peer.Id.encoding + let pp fmt {peer_id; maybe_reachable_point} = + Format.fprintf + fmt + "{peer_id=%a;@,maybe_reachable_point=%a}" + P2p_peer.Id.pp + peer_id + P2p_point.Id.pp + maybe_reachable_point end module Point = struct diff --git a/src/lib_dal_node_services/types.mli b/src/lib_dal_node_services/types.mli index 72dce0649ebefc66dbea72ff1c26f9fb10fd658e..f9de61cfd5b90f622b51048bcd8a5cf4a099a32e 100644 --- a/src/lib_dal_node_services/types.mli +++ b/src/lib_dal_node_services/types.mli @@ -112,13 +112,21 @@ module Message : sig include ENCODABLE with type t := t end -(** From the Gossipsub point of view, a peer is given by a cryptographic node - identity {!P2p_peer.Id.t}. It's up to the caller to associate the - {!P2p_peer.Id.t} to a {!P2p_point.Id.t} if needed (to e.g. implement peers - exchange, which needs addresses and ports instead of cryptographic - identities). *) +(** A peer from the point of view of gossipsub. *) module Peer : sig - type t = P2p_peer.Id.t + (** For incoming connections, we know the peer is reachable, and the peer's + address and the port are provided by the octez-p2p layer. + + For outgoing connections, if the remote peer has not specified + its address, nor its port, then the peer's address is + [connection_address:default_listening_port] (at the time of + writing 11732). However, the remote node's user can override both + the address and the port if desired and in that case, the + specified values will be used. 
*) + type t = {peer_id : P2p_peer.Id.t; maybe_reachable_point : P2p_point.Id.t} + + (** Comparison is not a structural one, instead only the [peer_id] + is used. *) include PRINTABLE with type t := t diff --git a/tezt/lib_tezos/dal_common.ml b/tezt/lib_tezos/dal_common.ml index 434a4dfcbe7123d0713b999a8a9fcfb649e85db9..7b7a243af9b9e82f0dad29b66c1315158a70d9a4 100644 --- a/tezt/lib_tezos/dal_common.ml +++ b/tezt/lib_tezos/dal_common.ml @@ -430,7 +430,10 @@ module Dal_RPC = struct in let as_topic_and_peers json = let topic = get "topic" json |> as_topic in - let peers = get "peers" json |> as_list |> List.map as_string in + let peers = + get "peers" json |> as_list + |> List.map (fun x -> x |-> "peer_id" |> as_string) + in (topic, peers) in make ~query_string GET ["p2p"; "gossipsub"; "topics"; "peers"] (fun json -> diff --git a/tezt/tests/dal.ml b/tezt/tests/dal.ml index 456072ba1d1789c0e4152f7df0a55ee8bc76faef..fd28c30388d4b531ad72ddeb1f15ad6049ff1b36 100644 --- a/tezt/tests/dal.ml +++ b/tezt/tests/dal.ml @@ -6876,7 +6876,7 @@ let test_rpc_get_connections _protocol dal_parameters _cryptobox node client |> List.map JSON.( fun json -> - let peer = json |-> "peer" |> as_string in + let peer = json |-> "peer" |-> "peer_id" |> as_string in let connection = json |-> "connection" in let bootstrap = connection |-> "bootstrap" |> as_bool in let topics =