From f57bf636419da0fe96980e5083324b054bf32685 Mon Sep 17 00:00:00 2001 From: Julien Sagot Date: Fri, 10 Oct 2025 11:16:31 +0200 Subject: [PATCH 1/5] DAL: use Slot_manager instead of direct Store access in RPC_server --- src/lib_dal_node/RPC_server.ml | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/lib_dal_node/RPC_server.ml b/src/lib_dal_node/RPC_server.ml index 3abc918b5391..7ace0fc96366 100644 --- a/src/lib_dal_node/RPC_server.ml +++ b/src/lib_dal_node/RPC_server.ml @@ -316,7 +316,6 @@ module Profile_handlers = struct let warn_missing_shards store attester published_level expected_number_of_shards number_of_stored_shards_per_slot = let open Lwt_syntax in - let statuses_store = Store.slot_header_statuses store in let* problems = List.filter_map_s (fun (Types.Slot_id.{slot_index; _}, num_stored) -> @@ -324,8 +323,8 @@ module Profile_handlers = struct Lwt.return_some (`Ok (slot_index, num_stored)) else let* res = - Store.Statuses.get_slot_status - statuses_store + Slot_manager.get_slot_status + store ~slot_id:{slot_level = published_level; slot_index} in match res with -- GitLab From 4e47f8b01c69132c18dab02309db3c6d4b2ff65b Mon Sep 17 00:00:00 2001 From: Julien Sagot Date: Fri, 10 Oct 2025 11:34:30 +0200 Subject: [PATCH 2/5] DAL: find status in skip_list with fallback to status store --- src/lib_dal_node/RPC_server.ml | 9 +++--- src/lib_dal_node/slot_manager.ml | 53 +++++++++++++++++++++++++++++-- src/lib_dal_node/slot_manager.mli | 2 +- 3 files changed, 55 insertions(+), 9 deletions(-) diff --git a/src/lib_dal_node/RPC_server.ml b/src/lib_dal_node/RPC_server.ml index 7ace0fc96366..62db830e3559 100644 --- a/src/lib_dal_node/RPC_server.ml +++ b/src/lib_dal_node/RPC_server.ml @@ -253,9 +253,8 @@ module Slots_handlers = struct let get_slot_status ctxt slot_level slot_index () () = call_handler1 (fun () -> - let store = Node_context.get_store ctxt in let slot_id : Types.slot_id = {slot_level; slot_index} in - Slot_manager.get_slot_status 
~slot_id store) + Slot_manager.get_slot_status ~slot_id ctxt) let get_slot_shard ctxt slot_level slot_index shard_index () () = call_handler1 (fun () -> @@ -313,7 +312,7 @@ module Profile_handlers = struct let get_assigned_shard_indices ctxt pkh level () () = Node_context.fetch_assigned_shard_indices ctxt ~level ~pkh - let warn_missing_shards store attester published_level + let warn_missing_shards ctxt attester published_level expected_number_of_shards number_of_stored_shards_per_slot = let open Lwt_syntax in let* problems = @@ -324,7 +323,7 @@ module Profile_handlers = struct else let* res = Slot_manager.get_slot_status - store + ctxt ~slot_id:{slot_level = published_level; slot_index} in match res with @@ -483,7 +482,7 @@ module Profile_handlers = struct Lwt.dont_wait (fun () -> warn_missing_shards - store + ctxt pkh published_level number_of_assigned_shards diff --git a/src/lib_dal_node/slot_manager.ml b/src/lib_dal_node/slot_manager.ml index 3bccdcfedae4..ca61dd7fab64 100644 --- a/src/lib_dal_node/slot_manager.ml +++ b/src/lib_dal_node/slot_manager.ml @@ -773,6 +773,55 @@ let publish_slot_data ctxt ~level_committee ~slot_size gs_worker let store_slot_headers ~number_of_slots ~block_level slot_headers node_store = Store.(add_slot_headers ~number_of_slots ~block_level slot_headers node_store) +module Statuses = struct + let get_status_from_skip_list ctxt (slot_id : Types.slot_id) = + let open Lwt_result_syntax in + let* slot_cell = + Store.Skip_list_cells.find_by_slot_id_opt + (Node_context.get_store ctxt) + slot_id + in + match slot_cell with + | None -> return_none + | Some cell -> ( + let*? proto_parameters = + Node_context.get_proto_parameters + ctxt + ~level:(`Level slot_id.slot_level) + in + let attested_level = + Int32.( + add slot_id.slot_level (of_int proto_parameters.attestation_lag)) + in + let*? 
(module Plugin : Dal_plugin.T) = + Node_context.get_plugin_for_level ctxt ~level:attested_level + in + let cell = + Dal_proto_types.Skip_list_cell.to_proto + Plugin.Skip_list.cell_encoding + cell + in + match Plugin.Skip_list.proto_attestation_status cell with + | None -> + (* Old protocols that do not expose the information *) + return_none + | Some `Unpublished -> return_none + | Some `Attested -> return_some `Attested + | Some `Unattested -> return_some `Unattested) + + let get_status_from_store ctxt (slot_id : Types.slot_id) = + let node_store = Node_context.get_store ctxt in + let slot_header_statuses_store = Store.slot_header_statuses node_store in + Store.Statuses.get_slot_status ~slot_id slot_header_statuses_store + + let find_status ctxt (slot_id : Types.slot_id) = + let open Lwt_result_syntax in + let*! status = get_status_from_skip_list ctxt slot_id in + match status with + | Ok (Some res) -> return res + | Ok None | Error _ -> get_status_from_store ctxt slot_id +end + let update_selected_slot_headers_statuses ~block_level ~attestation_lag ~number_of_slots attested_slots node_store = let slot_header_statuses_store = Store.slot_header_statuses node_store in @@ -783,9 +832,7 @@ let update_selected_slot_headers_statuses ~block_level ~attestation_lag attested_slots slot_header_statuses_store -let get_slot_status ~slot_id node_store = - let slot_header_statuses_store = Store.slot_header_statuses node_store in - Store.Statuses.get_slot_status ~slot_id slot_header_statuses_store +let get_slot_status ~slot_id ctxt = Statuses.find_status ctxt slot_id let get_slot_shard (store : Store.t) (slot_id : Types.slot_id) shard_index = Store.Shards.read (Store.shards store) slot_id shard_index diff --git a/src/lib_dal_node/slot_manager.mli b/src/lib_dal_node/slot_manager.mli index 4c83796fb03e..4a0751ec5d45 100644 --- a/src/lib_dal_node/slot_manager.mli +++ b/src/lib_dal_node/slot_manager.mli @@ -187,7 +187,7 @@ val update_selected_slot_headers_statuses : *) val 
get_slot_status : slot_id:Types.slot_id -> - Store.t -> + Node_context.t -> (Types.header_status, [Errors.other | Errors.not_found]) result Lwt.t (** [get_slot_shard store slot_id shard_index] returns the shard at -- GitLab From e03a699c3e4a199a3adbfab5e366f1af371b7525 Mon Sep 17 00:00:00 2001 From: Julien Sagot Date: Fri, 10 Oct 2025 15:46:46 +0200 Subject: [PATCH 3/5] DAL: remove statuses storage - Only cache recent slots status - Retrieve data from skip list for older slots --- src/lib_dal_node/block_handler.ml | 17 +---- src/lib_dal_node/constants.ml | 10 +++ src/lib_dal_node/constants.mli | 4 + src/lib_dal_node/event.ml | 40 ---------- src/lib_dal_node/slot_manager.ml | 25 +++--- src/lib_dal_node/slot_manager.mli | 7 +- src/lib_dal_node/store.ml | 121 ++++++++---------------------- src/lib_dal_node/store.mli | 26 +++---- 8 files changed, 76 insertions(+), 174 deletions(-) diff --git a/src/lib_dal_node/block_handler.ml b/src/lib_dal_node/block_handler.ml index cc65cc346b7d..d0e21aaba0b6 100644 --- a/src/lib_dal_node/block_handler.ml +++ b/src/lib_dal_node/block_handler.ml @@ -90,17 +90,6 @@ let remove_old_level_stored_data proto_parameters ctxt current_level = else return_unit in let number_of_slots = proto_parameters.Types.number_of_slots in - let* () = - let* res = - Store.Statuses.remove_level_status - ~level:oldest_level - (Store.slot_header_statuses store) - in - match res with - | Ok () -> Event.emit_removed_status ~level:oldest_level - | Error error -> - Event.emit_removing_status_failed ~level:oldest_level ~error - in List.iter_s (fun slot_index -> let slot_id : Types.slot_id = @@ -382,7 +371,7 @@ let process_commitments ctxt cctxt store proto_parameters block_level ~block_level cctxt [@profiler.record_s {verbosity = Notice} "slot_headers"]) in - let* () = + let*! 
() = (Slot_manager.store_slot_headers ~number_of_slots:proto_parameters.Types.number_of_slots ~block_level @@ -443,14 +432,14 @@ let process_finalized_block_data ctxt cctxt store ~prev_proto_parameters (Plugin.dal_attestation block_info [@profiler.record_f {verbosity = Notice} "dal_attestation"]) in - let* () = + let () = (Slot_manager.update_selected_slot_headers_statuses ~block_level ~attestation_lag:proto_parameters.Types.attestation_lag ~number_of_slots:proto_parameters.number_of_slots (Plugin.is_attested dal_attestation) store - [@profiler.record_s + [@profiler.record_f {verbosity = Notice} "update_selected_slot_headers_statuses"]) in let*! () = diff --git a/src/lib_dal_node/constants.ml b/src/lib_dal_node/constants.ml index 8f9f353c6e98..aec6dec9f4ce 100644 --- a/src/lib_dal_node/constants.ml +++ b/src/lib_dal_node/constants.ml @@ -58,6 +58,16 @@ let cache_size = slots multiplied by the attestation lag, which sounds reasonable. *) let slot_id_cache_size = 32 * 8 +(* This cache only holds transient information: older slot statuses are + retrieved from the skip list store. It needs room for + [number_of_slots * (attestation_lag + 1) * 2] entries, because the status + of a slot published at level [L] is first added to the cache at level + [L + 1], and then updated at level [L + attestation_lag + tb_finality]. + Moreover, when a block is finalized, slots are updated with an + attested/unattested status; doubling the size prevents these updates from + evicting the entries of later levels. Assumes 32 slots and a lag of 8. *) +let statuses_cache_size = 32 * (8 + 1) * 2 + let shards_verification_sampling_frequency = 100 let amplification_timeout = 120. diff --git a/src/lib_dal_node/constants.mli b/src/lib_dal_node/constants.mli index 014c228644bb..e02f22d85a1d 100644 --- a/src/lib_dal_node/constants.mli +++ b/src/lib_dal_node/constants.mli @@ -43,6 +43,10 @@ val cache_size : int associate commitments with slot ids at a given level.
*) val slot_id_cache_size : int +(** [statuses_cache_size] is the size (in number of slots) of the cache + to associate slot ids to slot status. *) +val statuses_cache_size : int + (** The frequency at which we sample the time spent in shards crypto verification. *) val shards_verification_sampling_frequency : int diff --git a/src/lib_dal_node/event.ml b/src/lib_dal_node/event.ml index 71e74f183444..4563203a263d 100644 --- a/src/lib_dal_node/event.ml +++ b/src/lib_dal_node/event.ml @@ -376,19 +376,6 @@ open struct let cached_slot_shard = cached_or_stored_slot_shard ~kind:"cached" - let stored_slot_status = - declare_3 - ~section - ~prefix_name_with_section:true - ~name:"stored_slot_status" - ~msg: - "stored slot status for level {published_level} and index \ - {slot_index}: {status}" - ~level:Debug - ("published_level", Data_encoding.int32) - ("slot_index", Data_encoding.int31) - ("status", Types.header_status_encoding) - let removed_slot_shards = declare_2 ~section @@ -409,15 +396,6 @@ open struct ("published_level", Data_encoding.int32) ("slot_index", Data_encoding.int31) - let removed_status = - declare_1 - ~section - ~prefix_name_with_section:true - ~name:"removed_status" - ~msg:"removed statuses for level {level}" - ~level:Debug - ("level", Data_encoding.int32) - let slot_header_status_storage_error = declare_3 ~section @@ -483,16 +461,6 @@ open struct ("slot_index", Data_encoding.int31) ("error", Error_monad.trace_encoding) - let removing_status_failed = - declare_2 - ~section - ~prefix_name_with_section:true - ~name:"removing_status_failed" - ~level:Warning - ~msg:"removing status file for level {level} failed: {error}" - ("level", Data_encoding.int32) - ("error", Error_monad.trace_encoding) - let removing_skip_list_cells_failed = declare_2 ~section @@ -1416,17 +1384,12 @@ let emit_stored_slot_shard ~published_level ~slot_index ~shard_index = let emit_cached_slot_shard ~published_level ~slot_index ~shard_index = emit cached_slot_shard (published_level, 
slot_index, shard_index) -let emit_stored_slot_status ~published_level ~slot_index ~status = - emit stored_slot_status (published_level, slot_index, status) - let emit_removed_slot_shards ~published_level ~slot_index = emit removed_slot_shards (published_level, slot_index) let emit_removed_slot ~published_level ~slot_index = emit removed_slot (published_level, slot_index) -let emit_removed_status ~level = emit removed_status level - let emit_slot_header_status_storage_error ~published_level ~slot_index ~error = emit slot_header_status_storage_error (published_level, slot_index, error) @@ -1444,9 +1407,6 @@ let emit_removing_shards_failed ~published_level ~slot_index ~error = let emit_removing_slot_failed ~published_level ~slot_index ~error = emit removing_slot_failed (published_level, slot_index, error) -let emit_removing_status_failed ~level ~error = - emit removing_status_failed (level, error) - let emit_removing_skip_list_cells_failed ~level ~error = emit removing_skip_list_cells_failed (level, error) diff --git a/src/lib_dal_node/slot_manager.ml b/src/lib_dal_node/slot_manager.ml index ca61dd7fab64..2f8d12e801d9 100644 --- a/src/lib_dal_node/slot_manager.ml +++ b/src/lib_dal_node/slot_manager.ml @@ -809,28 +809,29 @@ module Statuses = struct | Some `Attested -> return_some `Attested | Some `Unattested -> return_some `Unattested) - let get_status_from_store ctxt (slot_id : Types.slot_id) = - let node_store = Node_context.get_store ctxt in - let slot_header_statuses_store = Store.slot_header_statuses node_store in - Store.Statuses.get_slot_status ~slot_id slot_header_statuses_store - let find_status ctxt (slot_id : Types.slot_id) = let open Lwt_result_syntax in - let*! 
status = get_status_from_skip_list ctxt slot_id in - match status with - | Ok (Some res) -> return res - | Ok None | Error _ -> get_status_from_store ctxt slot_id + let store = Node_context.get_store ctxt in + let statuses_cache = Store.statuses_cache store in + match Store.Statuses_cache.get_slot_status statuses_cache slot_id with + | Some status -> return status + | None -> ( + let*! status = get_status_from_skip_list ctxt slot_id in + match status with + | Ok (Some res) -> return res + | Ok None -> fail `Not_found + | Error e -> fail (`Other e)) end let update_selected_slot_headers_statuses ~block_level ~attestation_lag ~number_of_slots attested_slots node_store = - let slot_header_statuses_store = Store.slot_header_statuses node_store in - Store.Statuses.update_selected_slot_headers_statuses + let statuses_cache = Store.statuses_cache node_store in + Store.Statuses_cache.update_selected_slot_headers_statuses ~block_level ~attestation_lag ~number_of_slots attested_slots - slot_header_statuses_store + statuses_cache let get_slot_status ~slot_id ctxt = Statuses.find_status ctxt slot_id diff --git a/src/lib_dal_node/slot_manager.mli b/src/lib_dal_node/slot_manager.mli index 4a0751ec5d45..1ae5cfb180cc 100644 --- a/src/lib_dal_node/slot_manager.mli +++ b/src/lib_dal_node/slot_manager.mli @@ -161,7 +161,7 @@ val store_slot_headers : block_level:int32 -> Dal_plugin.slot_header list -> Store.t -> - unit tzresult Lwt.t + unit Lwt.t (** [update_selected_slot_headers_statuses ~block_level ~attestation_lag ~number_of_slots attested_slots store] updates the statuses of the @@ -179,11 +179,12 @@ val update_selected_slot_headers_statuses : number_of_slots:int -> (Dal_plugin.slot_index -> bool) -> Store.t -> - unit tzresult Lwt.t + unit (** [get_slot_status ~slot_id store] returns the status associated to the accepted slot of id [slot_id] or [None] if no status is currently - stored for that slot id. + stored for that slot id. Relies on a cache and the skip list store. + i.e. 
only works for operator nodes. *) val get_slot_status : slot_id:Types.slot_id -> diff --git a/src/lib_dal_node/store.ml b/src/lib_dal_node/store.ml index 39c2a3283e53..36386ebd0f62 100644 --- a/src/lib_dal_node/store.ml +++ b/src/lib_dal_node/store.ml @@ -25,6 +25,17 @@ module Profiler = (val Profiler.wrap Dal_profiler.dal_profiler) +(** FIFO-keyed map with slot_id as keys. *) +module Slot_map = + Aches.Vache.Map (Aches.Vache.FIFO_Precise) (Aches.Vache.Strong) + (struct + type t = Types.Slot_id.t + + let equal = Types.Slot_id.equal + + let hash = Types.Slot_id.hash + end) + module Version = struct type t = int @@ -46,8 +57,9 @@ module Version = struct - 0: came with octez release v20; used Irmin for storing slots - 1: removed Irmin dependency; added slot and status stores; changed layout of shard store by indexing on slot ids instead of commitments - - 2: switch the KVS skip list store for a sqlite3 one. *) - let current_version = 2 + - 2: switch the KVS skip list store for a sqlite3 one. + - 3: remove status store, keep cache. *) + let current_version = 3 type error += Could_not_read_data_dir_version of string @@ -134,8 +146,6 @@ module Stores_dirs = struct let slot = "slot_store" - let status = "status_store" - let skip_list_cells = "skip_list_store" end @@ -320,18 +330,6 @@ module Shards_disk = struct end module Shards_cache = struct - (** Underlying FIFO-keyed map from slot_id -> map from shard index to share. This is - used as the alternative to the on-disk storage [Shards_disk] for shards. 
*) - module Slot_map = - Aches.Vache.Map (Aches.Vache.FIFO_Precise) (Aches.Vache.Strong) - (struct - type t = Types.Slot_id.t - - let equal = Types.Slot_id.equal - - let hash = Types.Slot_id.hash - end) - module Int_map = Map.Make (Int) type t = Cryptobox.share Int_map.t Slot_map.t @@ -663,91 +661,39 @@ module Traps = struct [] end -module Statuses = struct - type t = (int32, int, Types.header_status) KVS.t +module Statuses_cache = struct + type t = Types.header_status Slot_map.t - let file_layout ~root_dir slot_level = - (* The number of entries per file is the number of slots. We put - here the max value (4096) because we don't have a cryptobox - at hand to get the number_of_slots parameter. *) - let number_of_keys_per_file = 4096 in - let level_string = Format.asprintf "%ld" slot_level in - let filepath = Filename.concat root_dir level_string in - Key_value_store.layout - ~encoding:Types.header_status_encoding - ~filepath - ~eq:Stdlib.( = ) - ~index_of:Fun.id - ~number_of_keys_per_file - () - - let init node_store_dir status_store_dir = - let root_dir = Filename.concat node_store_dir status_store_dir in - KVS.init ~lru_size:Constants.status_store_lru_size ~root_dir + let init = Slot_map.create let add_status t status (slot_id : Types.slot_id) = - let open Lwt_result_syntax in - let* () = - KVS.write_value - ~override:true - t - file_layout - slot_id.slot_level - slot_id.slot_index - status - |> Errors.other_lwt_result - in - let*! () = - Event.emit_stored_slot_status - ~published_level:slot_id.slot_level - ~slot_index:slot_id.slot_index - ~status - in - return_unit + Slot_map.replace t slot_id status - let find_status t (slot_id : Types.slot_id) = - let open Lwt_result_syntax in - let*! 
res = - KVS.read_value t file_layout slot_id.slot_level slot_id.slot_index - in - match res with - | Ok status -> return status - | Error [KVS.Missing_stored_kvs_data _] -> fail Errors.not_found - | Error err -> - let data_kind = Types.Store.Header_status in - fail @@ Errors.decoding_failed data_kind err + let get_slot_status = Slot_map.find_opt let update_slot_headers_attestation ~published_level ~number_of_slots t attested = - let open Lwt_result_syntax in - List.iter_es + List.iter (fun slot_index -> let index = Types.Slot_id.{slot_level = published_level; slot_index} in if attested slot_index then ( Dal_metrics.slot_attested ~set:true slot_index ; - add_status t `Attested index |> Errors.to_tzresult) + add_status t `Attested index) else - let* old_data_opt = - find_status t index |> Errors.to_option_tzresult - in + let old_data_opt = get_slot_status t index in Dal_metrics.slot_attested ~set:false slot_index ; - if Option.is_some old_data_opt then - add_status t `Unattested index |> Errors.to_tzresult + if Option.is_some old_data_opt then add_status t `Unattested index else (* There is no header that has been included in a block and selected for this index. So, the slot cannot be attested or unattested. 
*) - return_unit) + ()) (0 -- (number_of_slots - 1)) let update_selected_slot_headers_statuses ~block_level ~attestation_lag ~number_of_slots attested t = let published_level = Int32.(sub block_level (of_int attestation_lag)) in update_slot_headers_attestation ~published_level ~number_of_slots t attested - - let get_slot_status ~slot_id t = find_status t slot_id - - let remove_level_status ~level t = KVS.remove_file t file_layout level end module Commitment_indexed_cache = @@ -848,7 +794,7 @@ end (** Store context *) type t = { - slot_header_statuses : Statuses.t; + statuses_cache : Statuses_cache.t; shards : Shards.t; slots : Slots.t; traps : Traps.t; @@ -877,7 +823,7 @@ let shards {shards; _} = shards let skip_list_cells t = t.skip_list_cells_store -let slot_header_statuses {slot_header_statuses; _} = slot_header_statuses +let statuses_cache {statuses_cache; _} = statuses_cache let slots {slots; _} = slots @@ -972,7 +918,7 @@ let init config profile_ctxt proto_parameters = let open Lwt_result_syntax in let base_dir = Configuration_file.store_path config in let* () = check_version_and_may_upgrade base_dir in - let* slot_header_statuses = Statuses.init base_dir Stores_dirs.status in + let statuses_cache = Statuses_cache.init Constants.statuses_cache_size in let* shards = Shards.init ~profile_ctxt ~proto_parameters base_dir Stores_dirs.shard in @@ -989,7 +935,7 @@ let init config profile_ctxt proto_parameters = shards; slots; traps; - slot_header_statuses; + statuses_cache; cache = Commitment_indexed_cache.create Constants.cache_size; finalized_commitments = Slot_id_cache.create ~capacity:Constants.slot_id_cache_size; @@ -1001,10 +947,10 @@ let init config profile_ctxt proto_parameters = let add_slot_headers ~number_of_slots ~block_level slot_headers t = let module SI = Set.Make (Int) in - let open Lwt_result_syntax in - let slot_header_statuses = t.slot_header_statuses in + let open Lwt_syntax in + let statuses_cache = t.statuses_cache in let* waiting = - 
List.fold_left_es + List.fold_left_s (fun waiting slot_header -> let Dal_plugin.{slot_index; commitment = _; published_level} = slot_header @@ -1012,9 +958,8 @@ let add_slot_headers ~number_of_slots ~block_level slot_headers t = (* This invariant should hold. *) assert (Int32.equal published_level block_level) ; let index = Types.Slot_id.{slot_level = published_level; slot_index} in - let* () = - Statuses.add_status slot_header_statuses `Waiting_attestation index - |> Errors.to_tzresult + let () = + Statuses_cache.add_status statuses_cache `Waiting_attestation index in Slot_id_cache.add ~number_of_slots t.finalized_commitments slot_header ; return (SI.add slot_index waiting)) diff --git a/src/lib_dal_node/store.mli b/src/lib_dal_node/store.mli index 3f91eb64228d..69c9c4c81a1d 100644 --- a/src/lib_dal_node/store.mli +++ b/src/lib_dal_node/store.mli @@ -88,8 +88,8 @@ module Slot_id_cache : sig val find_opt : t -> Types.slot_id -> commitment option end -module Statuses : sig - (** A store keeping the attestation status of slot ids. *) +module Statuses_cache : sig + (** A cache keeping the attestation status of slot ids. *) type t @@ -104,19 +104,11 @@ module Statuses : sig number_of_slots:int -> (int -> bool) -> t -> - unit tzresult Lwt.t - - (** [get_slot_status ~slot_id store] returns the status associated - to the given accepted [slot_id], or [None] if no status is - associated to the [slot_id]. *) - val get_slot_status : - slot_id:Types.slot_id -> - t -> - (Types.header_status, [> Errors.other | Errors.not_found]) result Lwt.t + unit - (** [remove_level_status ~level store] removes the status of all the - slot ids published at the given level. *) - val remove_level_status : level:int32 -> t -> unit tzresult Lwt.t + (** [get_slot_status cache slot_id] returns the status associated + to the given [slot_id] in [cache], if any.
*) + val get_slot_status : t -> Types.slot_id -> Types.header_status option end module Commitment_indexed_cache : sig @@ -196,9 +188,9 @@ val shards : t -> Shards.t with the store [t]. *) val skip_list_cells : t -> Dal_store_sqlite3.Skip_list_cells.t -(** [slot_header_statuses t] returns the statuses store associated with the store +(** [statuses_cache t] returns the statuses cache associated with the store [t]. *) -val slot_header_statuses : t -> Statuses.t +val statuses_cache : t -> Statuses_cache.t (** [slots t] returns the slots store associated with the store [t]. *) @@ -241,7 +233,7 @@ val add_slot_headers : block_level:int32 -> Dal_plugin.slot_header list -> t -> - unit tzresult Lwt.t + unit Lwt.t (** [Skip_list_cells] manages the storage of [Skip_list_cell.t] *) module Skip_list_cells : sig -- GitLab From 01775838bc1d2c67dcde26b72fbfc6b0d52cadcd Mon Sep 17 00:00:00 2001 From: Julien Sagot Date: Wed, 15 Oct 2025 16:17:53 +0200 Subject: [PATCH 4/5] DAL: implement store upgrade from version 2 to 3 --- src/lib_dal_node/store.ml | 54 +++++++++++++++++++++++++++++++-------- 1 file changed, 44 insertions(+), 10 deletions(-) diff --git a/src/lib_dal_node/store.ml b/src/lib_dal_node/store.ml index 36386ebd0f62..453d8dd19a33 100644 --- a/src/lib_dal_node/store.ml +++ b/src/lib_dal_node/store.ml @@ -131,11 +131,11 @@ module Version = struct (fun () -> Data_encoding.Json.destruct encoding json |> return) (fun _ -> tzfail (Could_not_read_data_dir_version file_path)) - let write_version_file ~base_dir = + let write_version_file ~version ~base_dir = let version_file = version_file_path ~base_dir in Lwt_utils_unix.Json.write_file version_file - (Data_encoding.Json.construct encoding current_version) + (Data_encoding.Json.construct encoding version) |> trace (Could_not_write_version_file version_file) end @@ -890,12 +890,35 @@ let cache_entry node_store commitment slot shares shard_proofs = commitment (slot, shares, shard_proofs) +let upgrade_from_v2_to_v3 ~base_dir = + let 
open Lwt_result_syntax in + let*! () = + Event.emit_store_upgrade_start + ~old_version:(Version.make 2) + ~new_version:(Version.make 3) + in + let file_path = Filename.concat base_dir "status_store" in + let*! exists = Lwt_unix.file_exists file_path in + let*! () = + if exists then Lwt_utils_unix.remove_dir file_path else Lwt.return_unit + in + Version.write_version_file ~base_dir ~version:3 + +(* [upgradable old_version new_version] returns an upgrade function if + the store is upgradable from [old_version] to [new_version]. Otherwise it + returns [None]. *) +let upgradable old_version new_version : + (base_dir:string -> unit tzresult Lwt.t) option = + match (old_version, new_version) with + | 2, 3 -> Some upgrade_from_v2_to_v3 + | _ -> None + (* Checks the version of the store with the respect to the current version. Returns [None] if the store does not need an upgrade and [Some upgrade] if the store is upgradable, where [upgrade] is a function that can be used to upgrade the store. It returns an error if the version is incompatible with the current one. *) -let check_version_and_may_upgrade base_dir = +let rec check_version_and_may_upgrade base_dir = let open Lwt_result_syntax in let file_path = Version.version_file_path ~base_dir in let*! exists = Lwt_unix.file_exists file_path in @@ -904,15 +927,26 @@ let check_version_and_may_upgrade base_dir = else (* In the absence of a version file, we use an heuristic to determine the version. *) - let*! exists = Lwt_unix.file_exists (Filename.concat base_dir "index") in - return - @@ if exists then Version.make 0 else Version.make Version.current_version + let*! index = Lwt_unix.file_exists (Filename.concat base_dir "index") in + if index then return (Version.make 0) + else + let*! 
status = + Lwt_unix.file_exists (Filename.concat base_dir "status_store") + in + if status then return (Version.make 2) + else return (Version.make Version.current_version) in if Version.(equal version current_version) then return_unit else - tzfail - (Version.Invalid_data_dir_version - {actual = version; expected = Version.current_version}) + match upgradable version Version.current_version with + | Some upgrade -> + let* () = upgrade ~base_dir in + (* Now that we upgraded, check version again *) + check_version_and_may_upgrade base_dir + | None -> + tzfail + (Version.Invalid_data_dir_version + {actual = version; expected = Version.current_version}) let init config profile_ctxt proto_parameters = let open Lwt_result_syntax in @@ -923,7 +957,7 @@ let init config profile_ctxt proto_parameters = Shards.init ~profile_ctxt ~proto_parameters base_dir Stores_dirs.shard in let* slots = Slots.init base_dir Stores_dirs.slot in - let* () = Version.write_version_file ~base_dir in + let* () = Version.(write_version_file ~base_dir ~version:current_version) in let traps = Traps.create ~capacity:Constants.traps_cache_size in let* chain_id = Chain_id.init ~root_dir:base_dir in let* last_processed_level = Last_processed_level.init ~root_dir:base_dir in -- GitLab From 593fe903cd86ca06b4e610fda97b5ed92d364017 Mon Sep 17 00:00:00 2001 From: Julien Sagot Date: Wed, 15 Oct 2025 09:26:07 +0200 Subject: [PATCH 5/5] DAL: document RPC behavior change --- CHANGES.rst | 9 ++++++++- src/lib_dal_node_services/services.ml | 3 ++- src/lib_dal_node_services/services.mli | 5 ++++- .../Alpha- Testing DAL node (dal node list RPCs).out | 2 +- .../S023-- Testing DAL node (dal node list RPCs).out | 2 +- .../T024-- Testing DAL node (dal node list RPCs).out | 2 +- 6 files changed, 17 insertions(+), 6 deletions(-) diff --git a/CHANGES.rst b/CHANGES.rst index 6d1c3ed1acc8..c10ae1126ab4 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -122,10 +122,17 @@ DAL node node's store. 
If traps are detected within the slot, then it should not be attested, so the id is not sent via the stream. (MR :gl:`!19459`) -- The DAL node now starts propagating shards one level after the inclusion of the +- The DAL node now starts propagating shards one level after the inclusion of the corresponding published slot header operation (i.e., when the operation is finalized), instead of two levels after, when the block is finalized. (MR :gl:`!19366`) +- **Breaking change** Slots status are not stored in dedicated files on disk + anymore, but found in a cache and the skip list. A consequence of this is that + the ``/levels//slots//status`` will only work with nodes that store the + skip list, and therefore not with observer nodes. Also, the RPC will now answer + a 500 error if querying a level at which the DAL was not supported instead + of a 404 error. (MR :gl:`!19471`) + - **Breaking change** Enforced stricter validation for the JSON configuration file. Previously, the parser would silently ignore any content that appeared after the first valid JSON object. Now, any extraneous data will cause the diff --git a/src/lib_dal_node_services/services.ml b/src/lib_dal_node_services/services.ml index 2025f3bcb086..05ade1d81993 100644 --- a/src/lib_dal_node_services/services.ml +++ b/src/lib_dal_node_services/services.ml @@ -289,7 +289,8 @@ let get_slot_status : ; query : unit > service = Tezos_rpc.Service.get_service - ~description:"Return the status for the given slot." + ~description: + "Return the status for the given slot. For operator nodes only." ~query:Tezos_rpc.Query.empty ~output:header_status_encoding Tezos_rpc.Path.( diff --git a/src/lib_dal_node_services/services.mli b/src/lib_dal_node_services/services.mli index 4cd913c2fc26..96e4edb99342 100644 --- a/src/lib_dal_node_services/services.mli +++ b/src/lib_dal_node_services/services.mli @@ -137,7 +137,10 @@ val get_slot_commitment : ; query : unit > service -(** Return the status for the given slot. 
*) +(** Return the status of the given slot. It first looks for the status in its + cache, which is available only during the attestation window. If not found + in the cache, it fetches the status in the corresponding cell in the + skip-list store; this therefore works only on operator nodes. *) val get_slot_status : < meth : [`GET] ; input : unit diff --git a/tezt/tests/expected/dal.ml/Alpha- Testing DAL node (dal node list RPCs).out b/tezt/tests/expected/dal.ml/Alpha- Testing DAL node (dal node list RPCs).out index afece719412c..618dfd3794d4 100644 --- a/tezt/tests/expected/dal.ml/Alpha- Testing DAL node (dal node list RPCs).out +++ b/tezt/tests/expected/dal.ml/Alpha- Testing DAL node (dal node list RPCs).out @@ -23,7 +23,7 @@ Available services: - GET /levels//slots//shards//content Fetch shard as bytes - GET /levels//slots//status - Return the status for the given slot. + Return the status for the given slot. For operator nodes only. - GET /monitor/synchronized Returns the stream of synchronization statuses of the DAL node with the L1 node. diff --git a/tezt/tests/expected/dal.ml/S023-- Testing DAL node (dal node list RPCs).out b/tezt/tests/expected/dal.ml/S023-- Testing DAL node (dal node list RPCs).out index afece719412c..618dfd3794d4 100644 --- a/tezt/tests/expected/dal.ml/S023-- Testing DAL node (dal node list RPCs).out +++ b/tezt/tests/expected/dal.ml/S023-- Testing DAL node (dal node list RPCs).out @@ -23,7 +23,7 @@ Available services: - GET /levels//slots//shards//content Fetch shard as bytes - GET /levels//slots//status - Return the status for the given slot. + Return the status for the given slot. For operator nodes only. - GET /monitor/synchronized Returns the stream of synchronization statuses of the DAL node with the L1 node. 
diff --git a/tezt/tests/expected/dal.ml/T024-- Testing DAL node (dal node list RPCs).out b/tezt/tests/expected/dal.ml/T024-- Testing DAL node (dal node list RPCs).out index afece719412c..618dfd3794d4 100644 --- a/tezt/tests/expected/dal.ml/T024-- Testing DAL node (dal node list RPCs).out +++ b/tezt/tests/expected/dal.ml/T024-- Testing DAL node (dal node list RPCs).out @@ -23,7 +23,7 @@ Available services: - GET /levels//slots//shards//content Fetch shard as bytes - GET /levels//slots//status - Return the status for the given slot. + Return the status for the given slot. For operator nodes only. - GET /monitor/synchronized Returns the stream of synchronization statuses of the DAL node with the L1 node. -- GitLab