From 70558d59fbf628e7b422a4ed25880e4b3e6674bc Mon Sep 17 00:00:00 2001 From: Gabriel Moise Date: Mon, 23 Jun 2025 14:44:32 +0100 Subject: [PATCH] Dal_node: Add cache for shards storage for Attester profile --- src/lib_dal_node/daemon.ml | 2 +- src/lib_dal_node/store.ml | 154 +++++++++++++++++++++++++++++++++++-- src/lib_dal_node/store.mli | 21 +++-- 3 files changed, 164 insertions(+), 13 deletions(-) diff --git a/src/lib_dal_node/daemon.ml b/src/lib_dal_node/daemon.ml index 81cc76429dc4..1bd77af5d714 100644 --- a/src/lib_dal_node/daemon.ml +++ b/src/lib_dal_node/daemon.ml @@ -462,7 +462,7 @@ let run ?(disable_shard_validation = false) ~ignore_pkhs ~data_dir Gossipsub.Transport_layer.shutdown transport_layer) in (* Initialize store *) - let* store = Store.init config in + let* store = Store.init config profile_ctxt proto_parameters in let* last_processed_level = let last_processed_level_store = Store.last_processed_level store in Store.Last_processed_level.load last_processed_level_store diff --git a/src/lib_dal_node/store.ml b/src/lib_dal_node/store.ml index 4cd3c7e0a68f..65d46498b4c0 100644 --- a/src/lib_dal_node/store.ml +++ b/src/lib_dal_node/store.ml @@ -139,7 +139,7 @@ module Stores_dirs = struct let skip_list_cells = "skip_list_store" end -module Shards = struct +module Shards_disk = struct type nonrec t = (Types.slot_id, int, Cryptobox.share) KVS.t let file_layout ~root_dir (slot_id : Types.slot_id) = @@ -252,6 +252,150 @@ module Shards = struct KVS.init ~lru_size:Constants.shards_store_lru_size ~root_dir end +module Shards_cache = struct + (** Bounded in-memory map, with a FIFO replacement policy, from slot_id to a map from shard index to share. This is
used as the alternative to the on-disk storage [Shards_disk] for shards; the [Shards] module below selects it for attester-only profiles. *) + module Slot_map = + Aches.Vache.Map (Aches.Vache.FIFO_Precise) (Aches.Vache.Strong) + (struct + type t = Types.Slot_id.t + + let equal = Types.Slot_id.equal + + let hash = Types.Slot_id.hash + end) + + module Int_map = Map.Make (Int) + + type t = Cryptobox.share Int_map.t Slot_map.t + + (** [init size] creates an empty cache holding the shards of at most [size] slot ids; eviction follows a FIFO policy. *) + let init = Slot_map.create + + (** [number_of_shards_available cache slot_id shard_indexes] counts how many entries of [shard_indexes] are cached for [slot_id]; a repeated index is counted once per occurrence. *) + let number_of_shards_available cache slot_id shard_indexes = + Lwt_result_syntax.return + @@ + match Slot_map.find_opt cache slot_id with + | None -> 0 + | Some shards -> + List.fold_left + (fun count shard_index -> + if Int_map.mem shard_index shards then count + 1 else count) + 0 + shard_indexes + + (** [write_all cache slot_id shards] merges [shards] into the entry of [slot_id] and emits a [stored_slot_shard] event for each newly added shard. A shard whose index is already cached (or appears earlier in [shards]) is kept as is, and no event is emitted for it. *) + let write_all cache slot_id shards = + let open Lwt_result_syntax in + let cached_shards = + match + Slot_map.find_opt + cache + slot_id [@profiler.aggregate_f {verbosity = Notice} "find_opt"] + with + | None -> Int_map.empty + | Some shards -> shards + in + (let* new_shards = + Seq.ES.fold_left + (fun shards_map {Cryptobox.index; share} -> + if Int_map.mem index shards_map then return shards_map + else + let shards_map = + (Int_map.add + index + share + shards_map + [@profiler.aggregate_f {verbosity = Notice} "add shard"]) + in + let*!
() = + Event.emit_stored_slot_shard + ~published_level:slot_id.slot_level + ~slot_index:slot_id.slot_index + ~shard_index:index + in + return shards_map) + cached_shards + shards + in + Slot_map.replace cache slot_id new_shards ; + return_unit) + |> Errors.other_lwt_result + + (** [read_all cache slot_id] yields the shards cached for [slot_id], by increasing shard index; indexes that are not cached are not reported. *) + let read_all cache slot_id = + (match Slot_map.find_opt cache slot_id with + | None -> Seq.empty + | Some shards -> + Int_map.bindings shards |> List.to_seq + |> Seq.map (fun (i, share) -> (slot_id, i, Ok share))) + |> Seq_s.of_seq + + (** [read cache slot_id shard_id] fails with [Errors.not_found] when [slot_id] has no entry or [shard_id] is not cached for it. *) + let read cache slot_id shard_id = + let open Lwt_result_syntax in + match Slot_map.find_opt cache slot_id with + | Some shards -> ( + match Int_map.find_opt shard_id shards with + | Some share -> return {Cryptobox.share; index = shard_id} + | None -> fail Errors.not_found) + | None -> fail Errors.not_found + + (** [count_values cache slot_id] is the number of shards cached for [slot_id], or 0 if it has no entry. *) + let count_values cache slot_id = + Lwt_result_syntax.return + @@ + match Slot_map.find_opt cache slot_id with + | None -> 0 + | Some shards -> Int_map.cardinal shards + + (** [remove cache slot_id] drops the entry of [slot_id] together with all its shards. *) + let remove cache slot_id = + Lwt_result_syntax.return @@ Slot_map.remove cache slot_id +end + +module Shards = struct + module Disk = Shards_disk + module Cache = Shards_cache + + (** The shards store is backed either by the on-disk key-value store or by the in-memory cache. *) + type t = Disk of Disk.t | Cache of Cache.t + + (** [init ~profile_ctxt ~proto_parameters node_store_dir shard_store_dir] uses the in-memory cache for an attester-only profile and the on-disk store otherwise; the directories are only used by the on-disk store. *) + let init ~profile_ctxt ~proto_parameters node_store_dir shard_store_dir = + let open Lwt_result_syntax in + if Profile_manager.is_attester_only_profile profile_ctxt then + let storage_period = + Profile_manager.get_attested_data_default_store_period + profile_ctxt + proto_parameters + in + (* One cache entry per slot id: room for [number_of_slots] slot ids per unit of [storage_period] (presumably levels -- TODO confirm). *) + let cache_size = storage_period * proto_parameters.number_of_slots in + let cache = Cache.init cache_size in + return (Cache cache) + else + let* store = Disk.init node_store_dir shard_store_dir in + return (Disk store) + + let number_of_shards_available = function + | Disk store -> Disk.number_of_shards_available store + | Cache cache -> Cache.number_of_shards_available cache + + let write_all t slot_id shards = + match t with + | Disk store -> Disk.write_all store slot_id
shards + | Cache cache -> Cache.write_all cache slot_id shards + + let read_all t slot_id ~number_of_shards = + match t with + | Disk store -> Disk.read_all store slot_id ~number_of_shards + (* The cache backend ignores [number_of_shards]: it only yields the shards it holds. *) + | Cache cache -> Cache.read_all cache slot_id + + let read = function + | Disk store -> Disk.read store + | Cache cache -> Cache.read cache + + let count_values = function + | Disk store -> Disk.count_values store + | Cache cache -> Cache.count_values cache + + let remove = function + | Disk store -> Disk.remove store + | Cache cache -> Cache.remove cache +end + module Slots = struct type t = (Types.slot_id * int, unit, bytes) KVS.t @@ -702,14 +846,14 @@ let check_version_and_may_upgrade base_dir = (Version.Invalid_data_dir_version {actual = version; expected = Version.current_version}) -(** [init config] inits the store on the filesystem using the - given [config]. *) -let init config = +let init config profile_ctxt proto_parameters = let open Lwt_result_syntax in let base_dir = Configuration_file.store_path config in let* () = check_version_and_may_upgrade base_dir in let* slot_header_statuses = Statuses.init base_dir Stores_dirs.status in - let* shards = Shards.init base_dir Stores_dirs.shard in + let* shards = + Shards.init ~profile_ctxt ~proto_parameters base_dir Stores_dirs.shard + in let* slots = Slots.init base_dir Stores_dirs.slot in let* () = Version.write_version_file ~base_dir in let traps = Traps.create ~capacity:Constants.traps_cache_size in diff --git a/src/lib_dal_node/store.mli b/src/lib_dal_node/store.mli index 1bc13f5b336f..c0f57de01c16 100644 --- a/src/lib_dal_node/store.mli +++ b/src/lib_dal_node/store.mli @@ -6,15 +6,16 @@ (*****************************************************************************) (** This module handles the on-disk storage of the DAL node. We rely - on the [Key_value_store] module from lib_stdlib_unix. *) + on the [Key_value_store] module from lib_stdlib_unix. For shards storage, we
rely instead on an in-memory cache when the node runs an attester-only profile. *) open Cryptobox module Shards : sig - (** A shard of some slot id consist in a shard index (a number + (** A shard of some slot id consists of a shard index (a number between 0 and the number_of_shards protocol parameter) and a share. The shard store is a mapping associating 0 or 1 share to - each (slot_id, shard index) pair. *) + each (slot_id, shard index) pair. *) type t @@ -24,8 +25,8 @@ module Shards : sig val number_of_shards_available : t -> Types.slot_id -> int list -> int tzresult Lwt.t - (** [write_all store slot_id shards] adds to the shard store all the given - shards of the given slot id. *) + (** [write_all store slot_id shards] adds to the shard store all the + given shards of the given slot id. *) val write_all : t -> Types.slot_id -> shard Seq.t -> (unit, [> Errors.other]) result Lwt.t @@ -49,7 +50,7 @@ module Shards : sig val count_values : t -> Types.slot_id -> int tzresult Lwt.t (** [remove store slot_id] removes the shards associated to the given - slot id from the store *) + slot id from the store. *) val remove : t -> Types.slot_id -> unit tzresult Lwt.t end @@ -211,7 +212,13 @@ val cache_entry : Cryptobox.shard_proof array -> unit -val init : Configuration_file.t -> t tzresult Lwt.t +(** [init config profile_ctxt proto_parameters] initializes the store under the store path of [config]. + Shards are kept in a bounded in-memory cache, sized from [profile_ctxt] and [proto_parameters], when [profile_ctxt] is an attester-only profile, and on disk otherwise. *) +val init : + Configuration_file.t -> + Profile_manager.t -> + Types.proto_parameters -> + t tzresult Lwt.t (** [add_slot_headers ~number_of_slots ~block_level slot_headers store] processes the [slot_headers] published at [block_level]. Concretely, for -- GitLab