diff --git a/src/bin_dal_node/daemon.ml b/src/bin_dal_node/daemon.ml index f98178d717c0d82e080796acee5b4f9460fa2a01..de9f362b492991829bbe7e433f449b51253c3efd 100644 --- a/src/bin_dal_node/daemon.ml +++ b/src/bin_dal_node/daemon.ml @@ -1200,6 +1200,42 @@ let clean_up_store ctxt cctxt ~last_processed_level ~first_seen_level head_level in do_clean_up last_processed_level head_level +(* [clean_up_or_reset_store ctxt cctxt ~last_processed_level + ~first_seen_level head_level proto_parameters] prepares the store when + the node restarts at [head_level]. If the node has an attester-only + profile and more than [Profile_manager.attesters_data_retention_period] + levels elapsed since [last_processed_level], the data it kept on disk is + expected to be outside the retention window: the persistent stores are + wiped ([Store.reset_persistent_stores]) and the last processed level is + set to [head_level - 1]. Otherwise, this is [clean_up_store]. *) +let clean_up_or_reset_store ctxt cctxt ~last_processed_level ~first_seen_level + head_level proto_parameters = + let open Lwt_syntax in + let can_wipe_data = + let attesters_data_retention_period = + Profile_manager.attesters_data_retention_period proto_parameters + in + let delta_levels = Int32.(sub head_level last_processed_level |> to_int) in + Node_context.get_profile_ctxt ctxt + |> Profile_manager.is_attester_only_profile + && delta_levels > attesters_data_retention_period + in + if can_wipe_data then + let store = Node_context.get_store ctxt in + let* () = store |> Store.reset_persistent_stores in + let last_processed_level_store = Store.last_processed_level store in + Store.Last_processed_level.save + last_processed_level_store + (Int32.pred head_level) + else + clean_up_store + ctxt + cctxt + ~last_processed_level + ~first_seen_level + head_level + proto_parameters + (* FIXME: https://gitlab.com/tezos/tezos/-/issues/3605 Improve general architecture, handle L1 disconnection etc *) @@ -1475,7 +1511,7 @@ let run ~data_dir ~configuration_override = match last_processed_level with | None -> (* there's nothing to clean up *) return_unit | Some last_processed_level -> - clean_up_store + clean_up_or_reset_store ctxt cctxt ~last_processed_level diff --git a/src/bin_dal_node/profile_manager.ml b/src/bin_dal_node/profile_manager.ml index e60d6530592ed20b4e2d2ed983ce7449337a7e39..a9c8b8aee58907eba66b2851e678b8eb1f811b30 100644 --- a/src/bin_dal_node/profile_manager.ml +++ b/src/bin_dal_node/profile_manager.ml @@ -204,6 +204,10 @@ let get_refutation_game_period proto_parameters = + 
proto_parameters.commitment_period_in_blocks + proto_parameters.dal_attested_slots_validity_lag +(* Amount of blocks during which an attester retains its data on disk. *) +let attesters_data_retention_period proto_parameters = + 2 * proto_parameters.Types.attestation_lag + (* This function returns whether the node should store skip list cells, in addition to the retention period for attested data (slots, shards, etc). *) let get_attested_data_default_store_period t proto_parameters = @@ -213,7 +217,7 @@ let get_attested_data_default_store_period t proto_parameters = let refutation_game_period = 2 * get_refutation_game_period proto_parameters in - let attestation_period = 2 * proto_parameters.attestation_lag in + let attestation_period = attesters_data_retention_period proto_parameters in let supports_refutations_bis, period = match get_profiles t with | Random_observer -> (false, attestation_period) diff --git a/src/bin_dal_node/profile_manager.mli b/src/bin_dal_node/profile_manager.mli index 68cf706c003046122d0d1b5647a94cc21b55d3f7..3ba878fb5f081bddb71fa418226a1d7e8d7a2757 100644 --- a/src/bin_dal_node/profile_manager.mli +++ b/src/bin_dal_node/profile_manager.mli @@ -120,6 +120,9 @@ val get_profiles : t -> Types.profile observer & slot producer, twice attestation lag for attester) *) val get_attested_data_default_store_period : t -> Types.proto_parameters -> int +(** Amount of blocks during which an attester retains its data on disk. *) +val attesters_data_retention_period : Types.proto_parameters -> int + (** Resolves a profile by either returning it unchanged (for bootstrap and operator profiles) or generating a new observer profile for random observer profile. 
The random slot is selected within the diff --git a/src/bin_dal_node/store.ml b/src/bin_dal_node/store.ml index d247052789bfe0bcd7ba05b6a908560e22ceb71f..0f2c2b5e149a55327d7859715704c37aa48078a7 100644 --- a/src/bin_dal_node/store.ml +++ b/src/bin_dal_node/store.ml @@ -140,6 +140,8 @@ end module Shards = struct type nonrec t = (Types.slot_id, int, Cryptobox.share) KVS.t + let reset = KVS.reset + let file_layout ~root_dir (slot_id : Types.slot_id) = (* FIXME: https://gitlab.com/tezos/tezos/-/issues/7045 @@ -247,6 +249,8 @@ end module Slots = struct type t = (Types.slot_id * int, unit, bytes) KVS.t + let reset = KVS.reset + let file_layout ~root_dir ((slot_id : Types.slot_id), slot_size) = (* FIXME: https://gitlab.com/tezos/tezos/-/issues/7045 @@ -413,6 +417,8 @@ end module Statuses = struct type t = (int32, int, Types.header_status) KVS.t + let reset = KVS.reset + let file_layout ~root_dir slot_level = (* The number of entries per file is the number of slots. We put here the max value (4096) because we don't have a cryptobox @@ -1003,3 +1009,9 @@ let add_slot_headers ~number_of_slots ~block_level slot_headers t = Dal_metrics.slot_waiting_for_attestation ~set:(SI.mem i waiting) i) (0 -- (number_of_slots - 1)) ; return_unit + +let reset_persistent_stores store = + let open Lwt_syntax in + let* () = Shards.reset store.shards in + let* () = Slots.reset store.slots in + Statuses.reset store.slot_header_statuses diff --git a/src/bin_dal_node/store.mli b/src/bin_dal_node/store.mli index d58c2c637c0312e298c482af95f800d9a6fc3890..861a385fa8213800ac10203ae40da69ddc07a5cb 100644 --- a/src/bin_dal_node/store.mli +++ b/src/bin_dal_node/store.mli @@ -226,6 +226,10 @@ val cache_entry : val init : Configuration_file.t -> t tzresult Lwt.t +(** [reset_persistent_stores store] empties the shards, slots and slot + header statuses key-value stores of [store]. *) +val reset_persistent_stores : t -> unit Lwt.t + (** [add_slot_headers ~number_of_slots ~block_level slot_headers store] processes the [slot_headers] published at [block_level]. 
Concretely, for each slot header successfully applied in the L1 block, diff --git a/src/lib_stdlib_unix/key_value_store.ml b/src/lib_stdlib_unix/key_value_store.ml index dfc6e756f0e372fdbad0c1bb88f7ccd596fbda3b..57a47436a81cd18bf0c60f2e1022c386f3c974b5 100644 --- a/src/lib_stdlib_unix/key_value_store.ml +++ b/src/lib_stdlib_unix/key_value_store.ml @@ -211,6 +211,8 @@ module Files : sig val close : 'value t -> unit Lwt.t + val reset : 'value t -> unit Lwt.t + val write : ?override:bool -> 'value t -> @@ -667,6 +669,11 @@ end = struct | _ -> ()) ; Lwt.return_unit + let close_files last_actions lru files = + Table.iter_s + (fun filename _ -> close_file files lru last_actions filename) + files + (* The promise returned by this function is fullfiled once all the current actions are completed and all the opened files are closed. This function should be idempotent. *) @@ -675,11 +682,7 @@ end = struct if !closed then return_unit else ( closed := true ; - let* () = - Table.iter_s - (fun filename _ -> close_file files lru last_actions filename) - files - in + let* () = close_files last_actions lru files in LRU.clear lru ; return_unit) @@ -934,6 +937,14 @@ end = struct | _ -> ()) | _ -> ()) ; return_ok () + + let reset {closed = _; last_actions; files; lru} = + let open Lwt_syntax in + let* () = close_files last_actions lru files in + Table.clear last_actions ; + Table.clear files ; + LRU.clear lru ; + return_unit end let layout ?encoded_value_size ~encoding ~filepath ~eq ~index_of @@ -1043,15 +1054,38 @@ let lockfile_unlock fd = (Lwt_utils_unix.Io_error {action = `Close; unix_code; caller; arg}) | exn -> Lwt.reraise exn) +let lock_file ~root_dir = Filename.concat root_dir ".lock" + let init ~lru_size ~root_dir = let open Lwt_result_syntax in let*! 
() = if not (Sys.file_exists root_dir) then Lwt_utils_unix.create_dir root_dir else Lwt.return_unit in - with_lockfile_lock (Filename.concat root_dir ".lock") @@ fun fd -> + with_lockfile_lock (lock_file ~root_dir) @@ fun fd -> return {files = Files.init ~lru_size; root_dir; lockfile = fd} +(* [reset t] closes every file opened by [t], then removes every entry of + [root_dir] except the lock file, which [t] still holds. *) +let reset {files; root_dir; lockfile = _} = + let open Lwt_syntax in + let* () = Files.reset files in + (* [Lwt_unix.files_of_directory] yields bare entry names, including "." + and "..", not paths: skip the special entries and the lock file, and + unlink relative to [root_dir] rather than to the working directory. + NOTE(review): this assumes the layouts only create regular files + directly under [root_dir]; a subdirectory would make [unlink] fail. *) + let lock_file_name = Filename.basename (lock_file ~root_dir) in + Lwt_stream.iter_s + (fun file -> + if + String.equal file Filename.current_dir_name + || String.equal file Filename.parent_dir_name + || String.equal file lock_file_name + then Lwt.return_unit + else Lwt_unix.unlink (Filename.concat root_dir file)) + (Lwt_unix.files_of_directory root_dir) + let close t = let open Lwt_result_syntax in let*! () = Files.close t.files in diff --git a/src/lib_stdlib_unix/key_value_store.mli b/src/lib_stdlib_unix/key_value_store.mli index 05e88b90efc0fafee22cdb73583b256f1b40c5af..7406924f922957e64bedb558cbb76afec39d6ac6 100644 --- a/src/lib_stdlib_unix/key_value_store.mli +++ b/src/lib_stdlib_unix/key_value_store.mli @@ -123,6 +123,11 @@ type ('file, 'key, 'value) t val init : lru_size:int -> root_dir:string -> ('file, 'key, 'value) t tzresult Lwt.t +(** [reset kvs] closes the files opened by [kvs] and deletes every entry of + its root directory except the lock file, leaving [kvs] empty without + closing it. 
*) +val reset : ('file, 'key, 'value) t -> unit Lwt.t + (** [close kvs] waits until all pending reads and writes are completed and closes the key-value store. *) val close : ('file, 'key, 'value) t -> unit tzresult Lwt.t @@ -214,7 +219,7 @@ val count_values : module View : sig (** Returns the number of files currently opened by the key value store. Do note this number is an upper bound on the number of - file descriptors opened. This number should always be lower than [lru_size]. + file descriptors opened. This number should always be lower than [lru_size]. *) val opened_files : ('file, 'key, 'value) t -> int