diff --git a/src/lib_crypto_dal/cryptobox.ml b/src/lib_crypto_dal/cryptobox.ml index 3517949b0f0530f68887723667003b111c076380..0397dfab4a94e758a824eb28175af92f08184854 100644 --- a/src/lib_crypto_dal/cryptobox.ml +++ b/src/lib_crypto_dal/cryptobox.ml @@ -148,7 +148,9 @@ module Inner = struct type page = bytes - type share = Scalar.t array + (* A share is an array of scalars; [Compare.Array] supplies its comparison functions, derived from the comparison on [Scalar]. *) module Share = Compare.Array (Scalar) + + type share = Share.t type shard = {index : int; share : share} diff --git a/src/lib_crypto_dal/cryptobox.mli b/src/lib_crypto_dal/cryptobox.mli index df89a5429615a92a710372e689394de1f0a6d8f1..471f975ae2ed52fd79c1a3094f95787c30e62619 100644 --- a/src/lib_crypto_dal/cryptobox.mli +++ b/src/lib_crypto_dal/cryptobox.mli @@ -199,14 +199,18 @@ val string_of_commit_error : string (** A portion of the data represented by a polynomial. *) -type share +module Share : Compare.COMPARABLE + +(** The type of shares, exposed as [Share.t] so that shares can be compared with [Share.compare]. *) type share = Share.t (** A shard is share with its index (see {!val:shards_from_polynomial}). *) type shard = {index : int; share : share} (** A proof that a shard belongs to some commitment. 
*) -type shard_proof +module Proof : Compare.COMPARABLE + +type shard_proof = Proof.t module Verifier : VERIFIER diff --git a/src/lib_dal_node/message_validation.ml b/src/lib_dal_node/message_validation.ml index facb572fb98236875ed9516f546d15d50a0bf1b0..13af6944508e7fa7384e50de524693eeead0bfc6 100644 --- a/src/lib_dal_node/message_validation.ml +++ b/src/lib_dal_node/message_validation.ml @@ -326,12 +326,23 @@ let gossipsub_batch_validation ctxt cryptobox ~head_level proto_parameters batch (List.hd batch_list) in let res = - Dal_metrics.sample_time - ~sampling_frequency: - Constants.shards_verification_sampling_frequency - ~metric_updater:Dal_metrics.update_shards_verification_time - ~to_sample:(fun () -> - Cryptobox.verify_shard_multi cryptobox commitment shards proofs) + (* When [Node_context.get_disable_shard_validation ctxt] holds, [res] is [Ok ()] and [Cryptobox.verify_shard_multi] is not called; otherwise the verification runs under [Dal_metrics.sample_time] as before. *) if Node_context.get_disable_shard_validation ctxt then Ok () + else + Dal_metrics.sample_time + ~sampling_frequency: + Constants.shards_verification_sampling_frequency + ~metric_updater:Dal_metrics.update_shards_verification_time + ~to_sample:(fun () -> + Cryptobox.verify_shard_multi + cryptobox + commitment + shards + proofs) + [@profiler.wrap_f + {driver_ids = [Opentelemetry]} + (Opentelemetry_helpers.trace_slot + ~name:"verify_shards" + Types.Slot_id.{slot_level = level; slot_index})] in match res with | Ok () -> diff --git a/src/lib_dal_node_services/types.ml b/src/lib_dal_node_services/types.ml index 9970810cdc0776e477d319965a5e8d12537f5ff1..f76114b59f52efb31ff6d38e6cf701261a6efda5 100644 --- a/src/lib_dal_node_services/types.ml +++ b/src/lib_dal_node_services/types.ml @@ -178,6 +178,16 @@ module Message = struct (obj2 (req "share" Cryptobox.share_encoding) (req "shard_proof" Cryptobox.shard_proof_encoding)) + + module Cmp = struct + type nonrec t = t + + (* Compares the [share] fields first, then the [shard_proof] fields through [Compare.or_else]. NOTE(review): presumably the proofs are only compared when the shares are equal - confirm against [Compare.or_else]. *) let compare t1 t2 = + Compare.or_else (Cryptobox.Share.compare t1.share t2.share) (fun () -> + Cryptobox.Proof.compare t1.shard_proof t2.shard_proof) + end + + include Compare.Make (Cmp) end module Peer = struct diff --git 
a/src/lib_dal_node_services/types.mli b/src/lib_dal_node_services/types.mli index e2f3249afb29a7fc125125140d838dff0aef1ac4..4c668b155f7f6e3425f875b70bcf8164cdfd9ca3 100644 --- a/src/lib_dal_node_services/types.mli +++ b/src/lib_dal_node_services/types.mli @@ -116,6 +116,8 @@ module Message : sig include PRINTABLE with type t := t include ENCODABLE with type t := t + + (* Comparison functions on messages; the implementation in types.ml compares the share first, then the shard proof. *) include COMPARABLE with type t := t end (** A peer from the point of view of gossipsub. *) diff --git a/src/lib_gossipsub/gossipsub_intf.ml b/src/lib_gossipsub/gossipsub_intf.ml index 3eebe69ca4ceeab077d8ac0bd7e17a8345f30f02..6045c6ccf61e58355f7389a255708813c02f7810 100644 --- a/src/lib_gossipsub/gossipsub_intf.ml +++ b/src/lib_gossipsub/gossipsub_intf.ml @@ -53,7 +53,11 @@ module type AUTOMATON_SUBCONFIG = sig end module Message : sig - include PRINTABLE + type t + + include PRINTABLE with type t := t + + (* Comparison functions on messages. NOTE(review): presumably required so that the worker can use messages as part of a map key when batching - confirm. *) include COMPARABLE with type t := t (** [valid] performs an application layer-level validity check on a message id and a message if given. 
@@ -564,7 +568,13 @@ module type AUTOMATON = sig end (** Module for message *) - module Message : PRINTABLE + module Message : sig + type t + + include PRINTABLE with type t := t + + include COMPARABLE with type t := t + end (** Module for time *) module Time : PRINTABLE diff --git a/src/lib_gossipsub/gossipsub_worker.ml b/src/lib_gossipsub/gossipsub_worker.ml index 363da1901f910580338de0012b94f5e384c8224b..6692f0838856930aeb13c131e911c208f9598479 100644 --- a/src/lib_gossipsub/gossipsub_worker.ml +++ b/src/lib_gossipsub/gossipsub_worker.ml @@ -270,9 +270,17 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : Some (value, t) end - type batch_state = - | Pending - | Accumulating of (GS.receive_message * Peer.Set.t) list + (* Maps keyed by received messages. Keys are compared on their [topic], [message_id] and [message] fields, in that order; no other field of [GS.receive_message] takes part in the comparison, so two keys that agree on these three fields share a single binding. *) module ReceiveMsgMap = Map.Make (struct + type t = GS.receive_message + + let compare (m1 : t) (m2 : t) = + Compare.or_else + (Compare.or_else (Topic.compare m1.topic m2.topic) (fun () -> + Message_id.compare m1.message_id m2.message_id)) + (fun () -> Message.compare m1.message m2.message) + end) + + (* State of the current batch: either [Pending], or [Accumulating] a map that binds each received message to a set of peers. *) type batch_state = Pending | Accumulating of Peer.Set.t ReceiveMsgMap.t (** The worker's state is made of the gossipsub automaton's state, and a stream of events to process. 
It also has two output streams to @@ -779,7 +787,7 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : let current_batch = ref Pending in fun output time_interval events_stream -> match output with - | GS.To_include_in_batch content -> ( + | GS.To_include_in_batch (msg, peers) -> ( match !current_batch with | Pending -> let open Lwt_syntax in @@ -787,15 +795,24 @@ module Make (C : Gossipsub_intf.WORKER_CONFIGURATION) : let* () = Lwt_unix.sleep (GS.Span.to_float_s time_interval) in let batch = match !current_batch with - | Accumulating batch -> batch + (* The accumulated map is turned back into a list of (message, peers) pairs before being pushed as [Process_batch]. *) | Accumulating batch -> ReceiveMsgMap.bindings batch | Pending -> [] in current_batch := Pending ; Stream.push (Process_batch batch) events_stream ; return_unit) ; - current_batch := Accumulating [content] + let content_map = ReceiveMsgMap.singleton msg peers in + current_batch := Accumulating content_map | Accumulating prev_contents -> - current_batch := Accumulating (content :: prev_contents)) + let new_contents = + ReceiveMsgMap.update + msg + (* No binding yet for this key: record [peers]. Otherwise keep only the peers that belong to both the new set and the previously recorded one. *) (function + | None -> Some peers + | Some prev_peers -> Some (Peer.Set.inter peers prev_peers)) + prev_contents + in + current_batch := Accumulating new_contents) | _ -> () (** Handling messages received from the P2P network. 
*) diff --git a/src/lib_stdlib/compare.ml b/src/lib_stdlib/compare.ml index e041882af1ffb4d211c2f5c86547b45685139a3c..741bd9a3bba1265612aaad900ae52ddae7c0987f 100644 --- a/src/lib_stdlib/compare.ml +++ b/src/lib_stdlib/compare.ml @@ -91,6 +91,22 @@ module List (P : COMPARABLE) = Make (struct if hd <> 0 then hd else compare xs ys end) +(* Comparison of arrays, element by element from index 0: the first pair of elements on which [P.compare] is non-zero decides the result. When one array is a strict prefix of the other, the shorter one is the smaller; arrays of equal length with pairwise equal elements compare as 0. Inside this definition [Array.length] refers to the [Array] module already in scope, since the module being defined is not visible in its own body. *) module Array (P : COMPARABLE) = Make (struct + type t = P.t array + + let compare a b = + let len_a = Array.length a in + let len_b = Array.length b in + let rec loop i = + if i = len_a then if i = len_b then 0 else -1 + else if i = len_b then 1 + else + let c = P.compare a.(i) b.(i) in + if c <> 0 then c else loop (i + 1) + in + loop 0 +end) + module Option (P : COMPARABLE) = Make (struct type t = P.t option diff --git a/src/lib_stdlib/compare.mli b/src/lib_stdlib/compare.mli index 424b2709a4722164986dbafe3648538ab044774c..4d851ffa7b044a1cd3e9dc77b9e6207aec21a571 100644 --- a/src/lib_stdlib/compare.mli +++ b/src/lib_stdlib/compare.mli @@ -151,6 +151,8 @@ module Q : S with type t = Q.t module List (P : COMPARABLE) : S with type t = P.t list +(* Comparison functions on arrays whose elements are compared with [P]. *) module Array (P : COMPARABLE) : S with type t = P.t array + module Option (P : COMPARABLE) : S with type t = P.t option module Result (Ok : COMPARABLE) (Error : COMPARABLE) :