diff --git a/src/bin_dal_node/worker_parameters.ml b/src/bin_dal_node/worker_parameters.ml index 6748d133c2e924e074f1fd9c4b27b3aee655fc43..165b6662e81adad4b813b9c3e395a875b9e427a0 100644 --- a/src/bin_dal_node/worker_parameters.ml +++ b/src/bin_dal_node/worker_parameters.ml @@ -52,6 +52,7 @@ let limits = retain_duration; fanout_ttl; heartbeat_interval; + heartbeat_ping_interval; backoff_cleanup_ticks; score_cleanup_ticks; degree_low; diff --git a/src/lib_dal_node/gossipsub/gs_default_parameters.ml b/src/lib_dal_node/gossipsub/gs_default_parameters.ml index 5dca52a5059b56534994023a733763f10d91570a..3313e33308d1058ce9cf8b10f7c83d6b91dfcfba 100644 --- a/src/lib_dal_node/gossipsub/gs_default_parameters.ml +++ b/src/lib_dal_node/gossipsub/gs_default_parameters.ml @@ -117,6 +117,25 @@ module Limits = struct let heartbeat_interval = Types.Span.of_int_s 1 + (* The heartbeat handler pings every connected peer roughly every + 45s, that is every [heartbeat_ping_interval] heartbeat ticks (45 + ticks when [heartbeat_interval] = 1s). + + A 45-second ping interval is a reasonable choice as it sits in + the middle of the commonly recommended 30-60 second range for P2P + networks. + + This timing provides a good balance between maintaining + reliable connections and managing network overhead.
+ + It's frequent enough to detect stale peers before most + NAT/firewall timeouts (which typically occur after 60-120 + seconds). + *) + let heartbeat_ping_interval = + let heartbeat_interval = Types.Span.to_int_s heartbeat_interval in + max 1 (45 / max 1 heartbeat_interval) (* clamped to >= 1: avoids a division by zero here and a zero ping period, which the worker uses as a modulus *) + let backoff_cleanup_ticks = 15 let score_cleanup_ticks = 1 diff --git a/src/lib_dal_node/gossipsub/gs_logging.ml b/src/lib_dal_node/gossipsub/gs_logging.ml index 279b39223bc68c41a4d04999ad4a4c7055f9ee05..844004be7dfe6838628bd02460c26de89cad4c54 100644 --- a/src/lib_dal_node/gossipsub/gs_logging.ml +++ b/src/lib_dal_node/gossipsub/gs_logging.ml @@ -130,6 +130,15 @@ module Events = struct ("peer", P2p_peer.Id.encoding) ("topic", Types.Topic.encoding) + let ping = + declare_1 + ~section + ~name:(prefix "ping") + ~msg:"Process Ping from {peer}" + ~level:Info + ~pp1:P2p_peer.Id.pp + ("peer", P2p_peer.Id.encoding) + let unsubscribe = declare_2 ~section @@ -211,6 +220,9 @@ let event ~verbose = | Disconnection {peer} -> emit disconnection peer.peer_id | In_message {from_peer; p2p_message} -> ( match p2p_message with + | Ping -> + if not verbose then Lwt.return_unit + else emit ping from_peer.peer_id | Message_with_header {message = _; topic; message_id} -> emit message_with_header (from_peer.peer_id, topic, message_id) | Subscribe {topic} -> emit subscribe (from_peer.peer_id, topic) diff --git a/src/lib_dal_node/gossipsub/transport_layer_interface.ml b/src/lib_dal_node/gossipsub/transport_layer_interface.ml index 1f2bb98034c101363e1c2e5c51ba3bc0e3b8bede..4f4c8221c88451906f5ecba0092417ab2d369581 100644 --- a/src/lib_dal_node/gossipsub/transport_layer_interface.ml +++ b/src/lib_dal_node/gossipsub/transport_layer_interface.ml @@ -44,6 +44,11 @@ module P2p_message_V1 = struct (req "maybe_reachable_point" P2p_point.Id.encoding) (req "peer" P2p_peer.Id.encoding)) + (* Dummy topic sent as the payload of [Ping] (see the [Subscribe] + encoding below); it uses slot index 0 and the zero pkh address.
*) + let ping_topic = + Types.Topic.{slot_index = 0; pkh = Signature.Public_key_hash.zero} + (* FIXME: https://gitlab.com/tezos/tezos/-/issues/5564 DAL/GS: bound the lists/seqs in exchanged p2p messages. *) @@ -95,14 +100,26 @@ module P2p_message_V1 = struct (req "message_ids" (list Types.Message_id.encoding))) (function IWant {message_ids} -> Some ((), message_ids) | _ -> None) (fun ((), message_ids) -> IWant {message_ids}); + (* In the following encoding, [Ping] is special-cased: it is encoded + as a [Subscribe] carrying the dummy topic [ping_topic], so that the p2p + message encoding remains backward compatible. Note that a genuine [Subscribe] to [ping_topic] is therefore decoded as [Ping]. + + FIXME/DAL: https://gitlab.com/tezos/tezos/-/issues/7768 + + Give [Ping] its own message encoding when bumping the p2p message version. + *) case ~tag:0x14 ~title:"Subscribe" (obj2 (req "kind" (constant "subscribe")) (req "topic" Types.Topic.encoding)) - (function Subscribe {topic} -> Some ((), topic) | _ -> None) - (fun ((), topic) -> Subscribe {topic}); + (function + | Subscribe {topic} -> Some ((), topic) + | Ping -> Some ((), ping_topic) + | _ -> None) + (fun ((), topic) -> + if Types.Topic.(topic = ping_topic) then Ping else Subscribe {topic}); case ~tag:0x15 ~title:"Unsubscribe" @@ -177,6 +194,7 @@ module P2p_message_V1 = struct "FullMessage{message_id=%a}" Types.Message_id.pp message_id + | Ping -> Format.fprintf fmt "Ping" let distributed_db_version = Distributed_db_version.zero diff --git a/src/lib_gossipsub/gossipsub_intf.ml b/src/lib_gossipsub/gossipsub_intf.ml index 9f35c46cbd46032f5aaa0f7bbf9933b90ab59a14..73804c0caf7e61cdc5992af161e6bb0e1eea8c60 100644 --- a/src/lib_gossipsub/gossipsub_intf.ml +++ b/src/lib_gossipsub/gossipsub_intf.ml @@ -247,6 +247,8 @@ type ('topic, 'peer, 'message_id, 'span) limits = { subscribed to, then we don't track that topic anymore, that is, we delete it from the fanout map. *) heartbeat_interval : 'span; (** The time between heartbeats.
*) + heartbeat_ping_interval : int; + (** Number of heartbeat ticks between two consecutive rounds of pings to the connected peers. Should be positive; the worker treats values below 1 as 1. *) backoff_cleanup_ticks : int; (** The number of heartbeat ticks setting the frequency at which the backoffs are checked and potentially cleared. *) @@ -1121,6 +1123,7 @@ module type WORKER = sig | Subscribe of {topic : GS.Topic.t} | Unsubscribe of {topic : GS.Topic.t} | Message_with_header of message_with_header + | Ping (** The different kinds of input events that could be received from the P2P layer. *) diff --git a/src/lib_gossipsub/gossipsub_worker.ml b/src/lib_gossipsub/gossipsub_worker.ml index 3ccb3aa7737bd2d06677799c67a4e7ec3e9a5cf0..7cb53c56d9fa952132aa6e5566050c8f0fb09f68 100644 --- a/src/lib_gossipsub/gossipsub_worker.ml +++ b/src/lib_gossipsub/gossipsub_worker.ml @@ -181,6 +181,7 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : | Subscribe of {topic : Topic.t} | Unsubscribe of {topic : Topic.t} | Message_with_header of message_with_header + | Ping (* FIXME: https://gitlab.com/tezos/tezos/-/issues/5323 @@ -307,7 +308,7 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : | IWant _ -> Introspection.update_count_sent_iwants stats `Incr | Message_with_header _ -> Introspection.update_count_sent_app_messages stats `Incr - | Subscribe _ | Unsubscribe _ -> ()) + | Subscribe _ | Unsubscribe _ | Ping -> ()) | Connect _ | Connect_point _ | Disconnect _ | Forget _ | Kick _ -> () in fun {connected_bootstrap_peers; p2p_output_stream; stats; _} ~mk_output -> @@ -323,7 +324,7 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : (* Don't emit app messages, send IHave messages or respond to IWant if the remote peer has a bootstrap profile.
*) false - | Graft _ | Prune _ | Subscribe _ | Unsubscribe _ -> true) + | Graft _ | Prune _ | Subscribe _ | Unsubscribe _ | Ping -> true) | Connect _ | Connect_point _ | Disconnect _ | Forget _ | Kick _ -> true in @@ -632,13 +633,29 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : Add more verification/attacks protections as done in Rust. *) - (** On a [Heartbeat] events, the worker sends graft and prune messages - following the automaton's output. It also sends [IHave] messages (computed - by the automaton as well). *) - let handle_heartheat = function + let may_emit_pings state = + let gstate = state.gossip_state in + let gstate_view = View.view gstate in + let heartbeat_ping_interval = (* clamped to >= 1: [Int64.rem] by 0 raises [Division_by_zero] *) + Int64.of_int (max 1 gstate_view.limits.heartbeat_ping_interval) + in + let heartbeat_ticks = gstate_view.heartbeat_ticks in + let rem = Int64.rem heartbeat_ticks heartbeat_ping_interval in + if Int64.equal rem 0L then (* once every [heartbeat_ping_interval] ticks, ping each connected peer *) + GS.Introspection.Connections.iter + (fun peer _connection -> emit_p2p_message state Ping (Seq.return peer)) + gstate_view.connections + + (** On a [Heartbeat] event, the worker sends graft and prune + messages following the automaton's output. It also sends [IHave] + messages (computed by the automaton as well). Every + [heartbeat_ping_interval] ticks, it also sends a [Ping] message to + each connected peer (see [may_emit_pings]). *) + let handle_heartbeat = function | state, GS.Heartbeat {to_graft; to_prune; noPX_peers} -> let gstate = state.gossip_state in let gstate_view = View.view gstate in + may_emit_pings state ; let iter pmap mk_msg = Peer.Map.iter (fun peer topicset -> @@ -763,6 +780,9 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : GS.handle_prune prune gossip_state |> update_gossip_state state |> handle_prune ~self ~from_peer px + | Ping -> + (* A [Ping] is a no-op for the automaton: the current [state] is returned unchanged. *) + state (** Handling events received from P2P layer.
*) let apply_p2p_event ~self ({gossip_state; _} as state) = function @@ -828,7 +848,7 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : Do we want to detect cases where two successive [Heartbeat] events would be handled (e.g. because the first one is late)? *) GS.heartbeat gossip_state |> update_gossip_state state - |> handle_heartheat + |> handle_heartbeat | P2P_input event -> apply_p2p_event ~self state event | App_input event -> apply_app_event state event | Check_unknown_messages -> check_unknown_messages_id state @@ -1032,6 +1052,7 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : Format.fprintf fmt "Unsubscribe{topic=%a}" Topic.pp topic | Message_with_header message_with_header -> pp_message_with_header fmt message_with_header + | Ping -> Format.fprintf fmt "Ping" let pp_p2p_output fmt = function | Disconnect {peer} -> Format.fprintf fmt "Disconnect{peer=%a}" Peer.pp peer diff --git a/src/lib_gossipsub/test/default_limits.ml b/src/lib_gossipsub/test/default_limits.ml index fda29164be0cfe856cf4368b94c2f499be2b4115..2fab2da57d726fdbfc20cf68e7f256fce22db096 100644 --- a/src/lib_gossipsub/test/default_limits.ml +++ b/src/lib_gossipsub/test/default_limits.ml @@ -90,6 +90,8 @@ let default_limits ?(time_in_mesh_weight = 1.0) retain_duration = Milliseconds.Span.of_int_s 10; fanout_ttl = Milliseconds.Span.of_int_s 60; heartbeat_interval = Milliseconds.Span.of_int_s 1; + heartbeat_ping_interval = 45 / 1; + (* ping roughly every 45s: 45s divided by the 1s [heartbeat_interval] set above, in heartbeat ticks *) backoff_cleanup_ticks = 15; score_cleanup_ticks = 1; degree_low = 5; diff --git a/src/lib_gossipsub/test/test_gossipsub_shared.ml b/src/lib_gossipsub/test/test_gossipsub_shared.ml index ddb9f11948adc0362cceef90176a62e063c3fa02..9ba07142f11e2feed35a38279365c77127fe967f 100644 --- a/src/lib_gossipsub/test/test_gossipsub_shared.ml +++ b/src/lib_gossipsub/test/test_gossipsub_shared.ml @@ -240,6 +240,7 @@ let pp_limits fmtr retain_duration; fanout_ttl; heartbeat_interval; + heartbeat_ping_interval; backoff_cleanup_ticks;
score_cleanup_ticks; degree_low; @@ -276,6 +277,7 @@ let pp_limits fmtr retain_duration = %a;@;\ fanout_ttl = %a;@;\ heartbeat_interval = %a;@;\ + heartbeat_ping_interval = %d;@;\ backoff_cleanup_ticks = %d;@;\ score_cleanup_ticks = %d;@;\ degree_low = %d;@;\ @@ -312,6 +314,7 @@ let pp_limits fmtr fanout_ttl GS.Span.pp heartbeat_interval + heartbeat_ping_interval backoff_cleanup_ticks score_cleanup_ticks degree_low