diff --git a/CHANGES.rst b/CHANGES.rst index 6d1c3ed1acc86518fba96226ecddcc23f820403e..c10ae1126ab4a99cda4f86aa66e6c8395aa4d141 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -122,10 +122,17 @@ DAL node node's store. If traps are detected within the slot, then it should not be attested, so the id is not sent via the stream. (MR :gl:`!19459`) -- The DAL node now starts propagating shards one level after the inclusion of the +- The DAL node now starts propagating shards one level after the inclusion of the corresponding published slot header operation (i.e., when the operation is finalized), instead of two levels after, when the block is finalized. (MR :gl:`!19366`) +- **Breaking change** Slot statuses are no longer stored in dedicated files on + disk; they are now found in a cache and the skip list. A consequence is that + the ``/levels/<slot_level>/slots/<slot_index>/status`` RPC only works with + nodes that store the skip list, and therefore not with observer nodes. Also, + the RPC now answers with a 500 error instead of a 404 error when querying a + level at which the DAL was not supported. (MR :gl:`!19471`) + - **Breaking change** Enforced stricter validation for the JSON configuration file. Previously, the parser would silently ignore any content that appeared after the first valid JSON object.
Now, any extraneous data will cause the diff --git a/src/lib_dal_node/RPC_server.ml b/src/lib_dal_node/RPC_server.ml index 3abc918b5391c3fe243ba9b69c193889e6727146..62db830e3559e6fe46cd426ea74660deaf79b2e2 100644 --- a/src/lib_dal_node/RPC_server.ml +++ b/src/lib_dal_node/RPC_server.ml @@ -253,9 +253,8 @@ module Slots_handlers = struct let get_slot_status ctxt slot_level slot_index () () = call_handler1 (fun () -> - let store = Node_context.get_store ctxt in let slot_id : Types.slot_id = {slot_level; slot_index} in - Slot_manager.get_slot_status ~slot_id store) + Slot_manager.get_slot_status ~slot_id ctxt) let get_slot_shard ctxt slot_level slot_index shard_index () () = call_handler1 (fun () -> @@ -313,10 +312,9 @@ module Profile_handlers = struct let get_assigned_shard_indices ctxt pkh level () () = Node_context.fetch_assigned_shard_indices ctxt ~level ~pkh - let warn_missing_shards store attester published_level + let warn_missing_shards ctxt attester published_level expected_number_of_shards number_of_stored_shards_per_slot = let open Lwt_syntax in - let statuses_store = Store.slot_header_statuses store in let* problems = List.filter_map_s (fun (Types.Slot_id.{slot_index; _}, num_stored) -> @@ -324,8 +322,8 @@ module Profile_handlers = struct Lwt.return_some (`Ok (slot_index, num_stored)) else let* res = - Store.Statuses.get_slot_status - statuses_store + Slot_manager.get_slot_status + ctxt ~slot_id:{slot_level = published_level; slot_index} in match res with @@ -484,7 +482,7 @@ module Profile_handlers = struct Lwt.dont_wait (fun () -> warn_missing_shards - store + ctxt pkh published_level number_of_assigned_shards diff --git a/src/lib_dal_node/block_handler.ml b/src/lib_dal_node/block_handler.ml index cc65cc346b7da8cbd340a3d5c61e3e225f29386c..d0e21aaba0b6fb643852ed9bcfe4af6e6563b4c0 100644 --- a/src/lib_dal_node/block_handler.ml +++ b/src/lib_dal_node/block_handler.ml @@ -90,17 +90,6 @@ let remove_old_level_stored_data proto_parameters ctxt current_level = 
else return_unit in let number_of_slots = proto_parameters.Types.number_of_slots in - let* () = - let* res = - Store.Statuses.remove_level_status - ~level:oldest_level - (Store.slot_header_statuses store) - in - match res with - | Ok () -> Event.emit_removed_status ~level:oldest_level - | Error error -> - Event.emit_removing_status_failed ~level:oldest_level ~error - in List.iter_s (fun slot_index -> let slot_id : Types.slot_id = @@ -382,7 +371,7 @@ let process_commitments ctxt cctxt store proto_parameters block_level ~block_level cctxt [@profiler.record_s {verbosity = Notice} "slot_headers"]) in - let* () = + let*! () = (Slot_manager.store_slot_headers ~number_of_slots:proto_parameters.Types.number_of_slots ~block_level @@ -443,14 +432,14 @@ let process_finalized_block_data ctxt cctxt store ~prev_proto_parameters (Plugin.dal_attestation block_info [@profiler.record_f {verbosity = Notice} "dal_attestation"]) in - let* () = + let () = (Slot_manager.update_selected_slot_headers_statuses ~block_level ~attestation_lag:proto_parameters.Types.attestation_lag ~number_of_slots:proto_parameters.number_of_slots (Plugin.is_attested dal_attestation) store - [@profiler.record_s + [@profiler.record_f {verbosity = Notice} "update_selected_slot_headers_statuses"]) in let*! () = diff --git a/src/lib_dal_node/constants.ml b/src/lib_dal_node/constants.ml index 8f9f353c6e9821bbc8061bc9d8254eec512fccfa..aec6dec9f4cef87698e94ce9c311aeb7817f2554 100644 --- a/src/lib_dal_node/constants.ml +++ b/src/lib_dal_node/constants.ml @@ -58,6 +58,16 @@ let cache_size = slots multiplied by the attestation lag, which sounds reasonable. *) let slot_id_cache_size = 32 * 8 +(* This cache is used for transient info. Permanent info is stored on disk. + i.e. you need [number_of_slots * (attestation_lag + 1) * 2]. + That's because the slot status of a slot published at [L] is first added + to the cache at level [L + 1], and then updated at level + [L + attestation_lag + tb_finality]. 
Also, when a block is finalized, the slots will be updated with an attested/unattested status, and you don't want it to erase later levels from the statuses cache. Using twice the cache size solves this problem. *) +let statuses_cache_size = 32 * (8 + 1) * 2 + let shards_verification_sampling_frequency = 100 let amplification_timeout = 120. diff --git a/src/lib_dal_node/constants.mli b/src/lib_dal_node/constants.mli index 014c228644bb05534a08914135f7a86d09abe7aa..e02f22d85a1d633165442e20f7573e183fd04a1f 100644 --- a/src/lib_dal_node/constants.mli +++ b/src/lib_dal_node/constants.mli @@ -43,6 +43,10 @@ val cache_size : int associate commitments with slot ids at a given level. *) val slot_id_cache_size : int +(** [statuses_cache_size] is the size (in number of slots) of the cache + that associates slot ids with slot statuses. *) +val statuses_cache_size : int + (** The frequency at which we sample the time spent in shards crypto verification. *) val shards_verification_sampling_frequency : int diff --git a/src/lib_dal_node/event.ml b/src/lib_dal_node/event.ml index 71e74f183444026876ab48bdf60447245849aea8..4563203a263d958bcedbec8c14956944298e4038 100644 --- a/src/lib_dal_node/event.ml +++ b/src/lib_dal_node/event.ml @@ -376,19 +376,6 @@ open struct let cached_slot_shard = cached_or_stored_slot_shard ~kind:"cached" - let stored_slot_status = - declare_3 - ~section - ~prefix_name_with_section:true - ~name:"stored_slot_status" - ~msg: - "stored slot status for level {published_level} and index \ - {slot_index}: {status}" - ~level:Debug - ("published_level", Data_encoding.int32) - ("slot_index", Data_encoding.int31) - ("status", Types.header_status_encoding) - let removed_slot_shards = declare_2 ~section @@ -409,15 +396,6 @@ open struct ("published_level", Data_encoding.int32) ("slot_index", Data_encoding.int31) - let removed_status = - declare_1 - ~section - ~prefix_name_with_section:true - ~name:"removed_status" - ~msg:"removed statuses for level {level}" - ~level:Debug -
("level", Data_encoding.int32) - let slot_header_status_storage_error = declare_3 ~section @@ -483,16 +461,6 @@ open struct ("slot_index", Data_encoding.int31) ("error", Error_monad.trace_encoding) - let removing_status_failed = - declare_2 - ~section - ~prefix_name_with_section:true - ~name:"removing_status_failed" - ~level:Warning - ~msg:"removing status file for level {level} failed: {error}" - ("level", Data_encoding.int32) - ("error", Error_monad.trace_encoding) - let removing_skip_list_cells_failed = declare_2 ~section @@ -1416,17 +1384,12 @@ let emit_stored_slot_shard ~published_level ~slot_index ~shard_index = let emit_cached_slot_shard ~published_level ~slot_index ~shard_index = emit cached_slot_shard (published_level, slot_index, shard_index) -let emit_stored_slot_status ~published_level ~slot_index ~status = - emit stored_slot_status (published_level, slot_index, status) - let emit_removed_slot_shards ~published_level ~slot_index = emit removed_slot_shards (published_level, slot_index) let emit_removed_slot ~published_level ~slot_index = emit removed_slot (published_level, slot_index) -let emit_removed_status ~level = emit removed_status level - let emit_slot_header_status_storage_error ~published_level ~slot_index ~error = emit slot_header_status_storage_error (published_level, slot_index, error) @@ -1444,9 +1407,6 @@ let emit_removing_shards_failed ~published_level ~slot_index ~error = let emit_removing_slot_failed ~published_level ~slot_index ~error = emit removing_slot_failed (published_level, slot_index, error) -let emit_removing_status_failed ~level ~error = - emit removing_status_failed (level, error) - let emit_removing_skip_list_cells_failed ~level ~error = emit removing_skip_list_cells_failed (level, error) diff --git a/src/lib_dal_node/slot_manager.ml b/src/lib_dal_node/slot_manager.ml index 3bccdcfedae49513437ac6ce65866a18775a3109..2f8d12e801d9a1efd2649b8bb7812a4dd84f51ab 100644 --- a/src/lib_dal_node/slot_manager.ml +++ 
b/src/lib_dal_node/slot_manager.ml @@ -773,19 +773,67 @@ let publish_slot_data ctxt ~level_committee ~slot_size gs_worker let store_slot_headers ~number_of_slots ~block_level slot_headers node_store = Store.(add_slot_headers ~number_of_slots ~block_level slot_headers node_store) +module Statuses = struct + let get_status_from_skip_list ctxt (slot_id : Types.slot_id) = + let open Lwt_result_syntax in + let* slot_cell = + Store.Skip_list_cells.find_by_slot_id_opt + (Node_context.get_store ctxt) + slot_id + in + match slot_cell with + | None -> return_none + | Some cell -> ( + let*? proto_parameters = + Node_context.get_proto_parameters + ctxt + ~level:(`Level slot_id.slot_level) + in + let attested_level = + Int32.( + add slot_id.slot_level (of_int proto_parameters.attestation_lag)) + in + let*? (module Plugin : Dal_plugin.T) = + Node_context.get_plugin_for_level ctxt ~level:attested_level + in + let cell = + Dal_proto_types.Skip_list_cell.to_proto + Plugin.Skip_list.cell_encoding + cell + in + match Plugin.Skip_list.proto_attestation_status cell with + | None -> + (* Old protocols that do not expose the information *) + return_none + | Some `Unpublished -> return_none + | Some `Attested -> return_some `Attested + | Some `Unattested -> return_some `Unattested) + + let find_status ctxt (slot_id : Types.slot_id) = + let open Lwt_result_syntax in + let store = Node_context.get_store ctxt in + let statuses_cache = Store.statuses_cache store in + match Store.Statuses_cache.get_slot_status statuses_cache slot_id with + | Some status -> return status + | None -> ( + let*! 
status = get_status_from_skip_list ctxt slot_id in + match status with + | Ok (Some res) -> return res + | Ok None -> fail `Not_found + | Error e -> fail (`Other e)) +end + let update_selected_slot_headers_statuses ~block_level ~attestation_lag ~number_of_slots attested_slots node_store = - let slot_header_statuses_store = Store.slot_header_statuses node_store in - Store.Statuses.update_selected_slot_headers_statuses + let statuses_cache = Store.statuses_cache node_store in + Store.Statuses_cache.update_selected_slot_headers_statuses ~block_level ~attestation_lag ~number_of_slots attested_slots - slot_header_statuses_store + statuses_cache -let get_slot_status ~slot_id node_store = - let slot_header_statuses_store = Store.slot_header_statuses node_store in - Store.Statuses.get_slot_status ~slot_id slot_header_statuses_store +let get_slot_status ~slot_id ctxt = Statuses.find_status ctxt slot_id let get_slot_shard (store : Store.t) (slot_id : Types.slot_id) shard_index = Store.Shards.read (Store.shards store) slot_id shard_index diff --git a/src/lib_dal_node/slot_manager.mli b/src/lib_dal_node/slot_manager.mli index 4c83796fb03e836034c164fa17968235c2db787a..1ae5cfb180cc644b8d25815560d1bd361f7ae6d4 100644 --- a/src/lib_dal_node/slot_manager.mli +++ b/src/lib_dal_node/slot_manager.mli @@ -161,7 +161,7 @@ val store_slot_headers : block_level:int32 -> Dal_plugin.slot_header list -> Store.t -> - unit tzresult Lwt.t + unit Lwt.t (** [update_selected_slot_headers_statuses ~block_level ~attestation_lag ~number_of_slots attested_slots store] updates the statuses of the @@ -179,15 +179,16 @@ val update_selected_slot_headers_statuses : number_of_slots:int -> (Dal_plugin.slot_index -> bool) -> Store.t -> - unit tzresult Lwt.t + unit (** [get_slot_status ~slot_id store] returns the status associated to the accepted slot of id [slot_id] or [None] if no status is currently - stored for that slot id. + stored for that slot id. Relies on a cache and the skip list store. + i.e. 
only works for operator nodes. *) val get_slot_status : slot_id:Types.slot_id -> - Store.t -> + Node_context.t -> (Types.header_status, [Errors.other | Errors.not_found]) result Lwt.t (** [get_slot_shard store slot_id shard_index] returns the shard at diff --git a/src/lib_dal_node/store.ml b/src/lib_dal_node/store.ml index 39c2a3283e535f7dcaef8b4ca7ed72516f80387b..453d8dd19a335a02fa44cbdbfe5d0e842cd0bdbc 100644 --- a/src/lib_dal_node/store.ml +++ b/src/lib_dal_node/store.ml @@ -25,6 +25,17 @@ module Profiler = (val Profiler.wrap Dal_profiler.dal_profiler) +(** FIFO-keyed map with slot_id as keys. *) +module Slot_map = + Aches.Vache.Map (Aches.Vache.FIFO_Precise) (Aches.Vache.Strong) + (struct + type t = Types.Slot_id.t + + let equal = Types.Slot_id.equal + + let hash = Types.Slot_id.hash + end) + module Version = struct type t = int @@ -46,8 +57,9 @@ module Version = struct - 0: came with octez release v20; used Irmin for storing slots - 1: removed Irmin dependency; added slot and status stores; changed layout of shard store by indexing on slot ids instead of commitments - - 2: switch the KVS skip list store for a sqlite3 one. *) - let current_version = 2 + - 2: switch the KVS skip list store for a sqlite3 one. + - 3: remove status store, keep cache. 
*) + let current_version = 3 type error += Could_not_read_data_dir_version of string @@ -119,11 +131,11 @@ module Version = struct (fun () -> Data_encoding.Json.destruct encoding json |> return) (fun _ -> tzfail (Could_not_read_data_dir_version file_path)) - let write_version_file ~base_dir = + let write_version_file ~version ~base_dir = let version_file = version_file_path ~base_dir in Lwt_utils_unix.Json.write_file version_file - (Data_encoding.Json.construct encoding current_version) + (Data_encoding.Json.construct encoding version) |> trace (Could_not_write_version_file version_file) end @@ -134,8 +146,6 @@ module Stores_dirs = struct let slot = "slot_store" - let status = "status_store" - let skip_list_cells = "skip_list_store" end @@ -320,18 +330,6 @@ module Shards_disk = struct end module Shards_cache = struct - (** Underlying FIFO-keyed map from slot_id -> map from shard index to share. This is - used as the alternative to the on-disk storage [Shards_disk] for shards. *) - module Slot_map = - Aches.Vache.Map (Aches.Vache.FIFO_Precise) (Aches.Vache.Strong) - (struct - type t = Types.Slot_id.t - - let equal = Types.Slot_id.equal - - let hash = Types.Slot_id.hash - end) - module Int_map = Map.Make (Int) type t = Cryptobox.share Int_map.t Slot_map.t @@ -663,91 +661,39 @@ module Traps = struct [] end -module Statuses = struct - type t = (int32, int, Types.header_status) KVS.t +module Statuses_cache = struct + type t = Types.header_status Slot_map.t - let file_layout ~root_dir slot_level = - (* The number of entries per file is the number of slots. We put - here the max value (4096) because we don't have a cryptobox - at hand to get the number_of_slots parameter. 
*) - let number_of_keys_per_file = 4096 in - let level_string = Format.asprintf "%ld" slot_level in - let filepath = Filename.concat root_dir level_string in - Key_value_store.layout - ~encoding:Types.header_status_encoding - ~filepath - ~eq:Stdlib.( = ) - ~index_of:Fun.id - ~number_of_keys_per_file - () - - let init node_store_dir status_store_dir = - let root_dir = Filename.concat node_store_dir status_store_dir in - KVS.init ~lru_size:Constants.status_store_lru_size ~root_dir + let init = Slot_map.create let add_status t status (slot_id : Types.slot_id) = - let open Lwt_result_syntax in - let* () = - KVS.write_value - ~override:true - t - file_layout - slot_id.slot_level - slot_id.slot_index - status - |> Errors.other_lwt_result - in - let*! () = - Event.emit_stored_slot_status - ~published_level:slot_id.slot_level - ~slot_index:slot_id.slot_index - ~status - in - return_unit + Slot_map.replace t slot_id status - let find_status t (slot_id : Types.slot_id) = - let open Lwt_result_syntax in - let*! 
res = - KVS.read_value t file_layout slot_id.slot_level slot_id.slot_index - in - match res with - | Ok status -> return status - | Error [KVS.Missing_stored_kvs_data _] -> fail Errors.not_found - | Error err -> - let data_kind = Types.Store.Header_status in - fail @@ Errors.decoding_failed data_kind err + let get_slot_status = Slot_map.find_opt let update_slot_headers_attestation ~published_level ~number_of_slots t attested = - let open Lwt_result_syntax in - List.iter_es + List.iter (fun slot_index -> let index = Types.Slot_id.{slot_level = published_level; slot_index} in if attested slot_index then ( Dal_metrics.slot_attested ~set:true slot_index ; - add_status t `Attested index |> Errors.to_tzresult) + add_status t `Attested index) else - let* old_data_opt = - find_status t index |> Errors.to_option_tzresult - in + let old_data_opt = get_slot_status t index in Dal_metrics.slot_attested ~set:false slot_index ; - if Option.is_some old_data_opt then - add_status t `Unattested index |> Errors.to_tzresult + if Option.is_some old_data_opt then add_status t `Unattested index else (* There is no header that has been included in a block and selected for this index. So, the slot cannot be attested or unattested. 
*) - return_unit) + ()) (0 -- (number_of_slots - 1)) let update_selected_slot_headers_statuses ~block_level ~attestation_lag ~number_of_slots attested t = let published_level = Int32.(sub block_level (of_int attestation_lag)) in update_slot_headers_attestation ~published_level ~number_of_slots t attested - - let get_slot_status ~slot_id t = find_status t slot_id - - let remove_level_status ~level t = KVS.remove_file t file_layout level end module Commitment_indexed_cache = @@ -848,7 +794,7 @@ end (** Store context *) type t = { - slot_header_statuses : Statuses.t; + statuses_cache : Statuses_cache.t; shards : Shards.t; slots : Slots.t; traps : Traps.t; @@ -877,7 +823,7 @@ let shards {shards; _} = shards let skip_list_cells t = t.skip_list_cells_store -let slot_header_statuses {slot_header_statuses; _} = slot_header_statuses +let statuses_cache {statuses_cache; _} = statuses_cache let slots {slots; _} = slots @@ -944,12 +890,35 @@ let cache_entry node_store commitment slot shares shard_proofs = commitment (slot, shares, shard_proofs) +let upgrade_from_v2_to_v3 ~base_dir = + let open Lwt_result_syntax in + let*! () = + Event.emit_store_upgrade_start + ~old_version:(Version.make 2) + ~new_version:(Version.make 3) + in + let file_path = Filename.concat base_dir "status_store" in + let*! exists = Lwt_unix.file_exists file_path in + let*! () = + if exists then Lwt_utils_unix.remove_dir file_path else Lwt.return_unit + in + Version.write_version_file ~base_dir ~version:3 + +(* [upgradable old_version new_version] returns an upgrade function if + the store is upgradable from [old_version] to [new_version]. Otherwise it + returns [None]. *) +let upgradable old_version new_version : + (base_dir:string -> unit tzresult Lwt.t) option = + match (old_version, new_version) with + | 2, 3 -> Some upgrade_from_v2_to_v3 + | _ -> None + (* Checks the version of the store with the respect to the current version. 
Returns [None] if the store does not need an upgrade and [Some upgrade] if the store is upgradable, where [upgrade] is a function that can be used to upgrade the store. It returns an error if the version is incompatible with the current one. *) -let check_version_and_may_upgrade base_dir = +let rec check_version_and_may_upgrade base_dir = let open Lwt_result_syntax in let file_path = Version.version_file_path ~base_dir in let*! exists = Lwt_unix.file_exists file_path in @@ -958,26 +927,37 @@ let check_version_and_may_upgrade base_dir = else (* In the absence of a version file, we use an heuristic to determine the version. *) - let*! exists = Lwt_unix.file_exists (Filename.concat base_dir "index") in - return - @@ if exists then Version.make 0 else Version.make Version.current_version + let*! index = Lwt_unix.file_exists (Filename.concat base_dir "index") in + if index then return (Version.make 0) + else + let*! status = + Lwt_unix.file_exists (Filename.concat base_dir "status_store") + in + if status then return (Version.make 2) + else return (Version.make Version.current_version) in if Version.(equal version current_version) then return_unit else - tzfail - (Version.Invalid_data_dir_version - {actual = version; expected = Version.current_version}) + match upgradable version Version.current_version with + | Some upgrade -> + let* () = upgrade ~base_dir in + (* Now that we upgraded, check version again *) + check_version_and_may_upgrade base_dir + | None -> + tzfail + (Version.Invalid_data_dir_version + {actual = version; expected = Version.current_version}) let init config profile_ctxt proto_parameters = let open Lwt_result_syntax in let base_dir = Configuration_file.store_path config in let* () = check_version_and_may_upgrade base_dir in - let* slot_header_statuses = Statuses.init base_dir Stores_dirs.status in + let statuses_cache = Statuses_cache.init Constants.statuses_cache_size in let* shards = Shards.init ~profile_ctxt ~proto_parameters base_dir 
Stores_dirs.shard in let* slots = Slots.init base_dir Stores_dirs.slot in - let* () = Version.write_version_file ~base_dir in + let* () = Version.(write_version_file ~base_dir ~version:current_version) in let traps = Traps.create ~capacity:Constants.traps_cache_size in let* chain_id = Chain_id.init ~root_dir:base_dir in let* last_processed_level = Last_processed_level.init ~root_dir:base_dir in @@ -989,7 +969,7 @@ let init config profile_ctxt proto_parameters = shards; slots; traps; - slot_header_statuses; + statuses_cache; cache = Commitment_indexed_cache.create Constants.cache_size; finalized_commitments = Slot_id_cache.create ~capacity:Constants.slot_id_cache_size; @@ -1001,10 +981,10 @@ let init config profile_ctxt proto_parameters = let add_slot_headers ~number_of_slots ~block_level slot_headers t = let module SI = Set.Make (Int) in - let open Lwt_result_syntax in - let slot_header_statuses = t.slot_header_statuses in + let open Lwt_syntax in + let statuses_cache = t.statuses_cache in let* waiting = - List.fold_left_es + List.fold_left_s (fun waiting slot_header -> let Dal_plugin.{slot_index; commitment = _; published_level} = slot_header @@ -1012,9 +992,8 @@ let add_slot_headers ~number_of_slots ~block_level slot_headers t = (* This invariant should hold. 
*) assert (Int32.equal published_level block_level) ; let index = Types.Slot_id.{slot_level = published_level; slot_index} in - let* () = - Statuses.add_status slot_header_statuses `Waiting_attestation index - |> Errors.to_tzresult + let () = + Statuses_cache.add_status statuses_cache `Waiting_attestation index in Slot_id_cache.add ~number_of_slots t.finalized_commitments slot_header ; return (SI.add slot_index waiting)) diff --git a/src/lib_dal_node/store.mli b/src/lib_dal_node/store.mli index 3f91eb64228dff8f3ec61f3f3c7f314284b84c8c..69c9c4c81a1d4993faa8d9820ae654b8ea0426fa 100644 --- a/src/lib_dal_node/store.mli +++ b/src/lib_dal_node/store.mli @@ -88,8 +88,8 @@ module Slot_id_cache : sig val find_opt : t -> Types.slot_id -> commitment option end -module Statuses : sig - (** A store keeping the attestation status of slot ids. *) +module Statuses_cache : sig + (** A cache keeping the attestation status of slot ids. *) type t @@ -104,19 +104,11 @@ module Statuses : sig number_of_slots:int -> (int -> bool) -> t -> - unit tzresult Lwt.t - - (** [get_slot_status ~slot_id store] returns the status associated - to the given accepted [slot_id], or [None] if no status is - associated to the [slot_id]. *) - val get_slot_status : - slot_id:Types.slot_id -> - t -> - (Types.header_status, [> Errors.other | Errors.not_found]) result Lwt.t + unit - (** [remove_level_status ~level store] removes the status of all the - slot ids published at the given level. *) - val remove_level_status : level:int32 -> t -> unit tzresult Lwt.t + (** [get_slot_status cache slot_id] returns [Some status] if a status is + cached for [slot_id], and [None] otherwise. *) + val get_slot_status : t -> Types.slot_id -> Types.header_status option end module Commitment_indexed_cache : sig @@ -196,9 +188,9 @@ val shards : t -> Shards.t with the store [t].
*) val skip_list_cells : t -> Dal_store_sqlite3.Skip_list_cells.t -(** [slot_header_statuses t] returns the statuses store associated with the store +(** [statuses_cache t] returns the statuses cache associated with the store [t]. *) -val slot_header_statuses : t -> Statuses.t +val statuses_cache : t -> Statuses_cache.t (** [slots t] returns the slots store associated with the store [t]. *) @@ -241,7 +233,7 @@ val add_slot_headers : block_level:int32 -> Dal_plugin.slot_header list -> t -> - unit tzresult Lwt.t + unit Lwt.t (** [Skip_list_cells] manages the storage of [Skip_list_cell.t] *) module Skip_list_cells : sig diff --git a/src/lib_dal_node_services/services.ml b/src/lib_dal_node_services/services.ml index 2025f3bcb086d327c1c227264bc5197d34931d6a..05ade1d81993680bd2d7def9a28aaf9d88dc82dc 100644 --- a/src/lib_dal_node_services/services.ml +++ b/src/lib_dal_node_services/services.ml @@ -289,7 +289,8 @@ let get_slot_status : ; query : unit > service = Tezos_rpc.Service.get_service - ~description:"Return the status for the given slot." + ~description: + "Return the status for the given slot. For operator nodes only." ~query:Tezos_rpc.Query.empty ~output:header_status_encoding Tezos_rpc.Path.( diff --git a/src/lib_dal_node_services/services.mli b/src/lib_dal_node_services/services.mli index 4cd913c2fc2619268fcc572aa77ad72250cdd916..96e4edb993427f3c40b5af00273ca0596c064c80 100644 --- a/src/lib_dal_node_services/services.mli +++ b/src/lib_dal_node_services/services.mli @@ -137,7 +137,10 @@ val get_slot_commitment : ; query : unit > service -(** Return the status for the given slot. *) +(** Return the status of the given slot. It first looks for the status in its + cache, which is available only during the attestation window. If not found + in the cache, it fetches the status in the corresponding cell in the + skip-list store; this therefore works only on operator nodes. 
*) val get_slot_status : < meth : [`GET] ; input : unit diff --git a/tezt/tests/expected/dal.ml/Alpha- Testing DAL node (dal node list RPCs).out b/tezt/tests/expected/dal.ml/Alpha- Testing DAL node (dal node list RPCs).out index afece719412c115c8f3c40fee701e9069f3308cb..618dfd3794d4b8608b975e81b95181dfb1264f66 100644 --- a/tezt/tests/expected/dal.ml/Alpha- Testing DAL node (dal node list RPCs).out +++ b/tezt/tests/expected/dal.ml/Alpha- Testing DAL node (dal node list RPCs).out @@ -23,7 +23,7 @@ Available services: - GET /levels//slots//shards//content Fetch shard as bytes - GET /levels//slots//status - Return the status for the given slot. + Return the status for the given slot. For operator nodes only. - GET /monitor/synchronized Returns the stream of synchronization statuses of the DAL node with the L1 node. diff --git a/tezt/tests/expected/dal.ml/S023-- Testing DAL node (dal node list RPCs).out b/tezt/tests/expected/dal.ml/S023-- Testing DAL node (dal node list RPCs).out index afece719412c115c8f3c40fee701e9069f3308cb..618dfd3794d4b8608b975e81b95181dfb1264f66 100644 --- a/tezt/tests/expected/dal.ml/S023-- Testing DAL node (dal node list RPCs).out +++ b/tezt/tests/expected/dal.ml/S023-- Testing DAL node (dal node list RPCs).out @@ -23,7 +23,7 @@ Available services: - GET /levels//slots//shards//content Fetch shard as bytes - GET /levels//slots//status - Return the status for the given slot. + Return the status for the given slot. For operator nodes only. - GET /monitor/synchronized Returns the stream of synchronization statuses of the DAL node with the L1 node. 
diff --git a/tezt/tests/expected/dal.ml/T024-- Testing DAL node (dal node list RPCs).out b/tezt/tests/expected/dal.ml/T024-- Testing DAL node (dal node list RPCs).out index afece719412c115c8f3c40fee701e9069f3308cb..618dfd3794d4b8608b975e81b95181dfb1264f66 100644 --- a/tezt/tests/expected/dal.ml/T024-- Testing DAL node (dal node list RPCs).out +++ b/tezt/tests/expected/dal.ml/T024-- Testing DAL node (dal node list RPCs).out @@ -23,7 +23,7 @@ Available services: - GET /levels//slots//shards//content Fetch shard as bytes - GET /levels//slots//status - Return the status for the given slot. + Return the status for the given slot. For operator nodes only. - GET /monitor/synchronized Returns the stream of synchronization statuses of the DAL node with the L1 node.