From 3eb35a59cfd811f86a085d2c5b105b0303f94b5d Mon Sep 17 00:00:00 2001 From: Victor Allombert Date: Thu, 26 Sep 2024 09:19:57 +0200 Subject: [PATCH 1/2] Store: remove may_shrink_cycles while cementing cycles --- src/lib_store/unix/block_store.ml | 60 +++++---------------- src/lib_store/unix/block_store.mli | 7 --- src/lib_store/unix/test/test_block_store.ml | 41 +------------- 3 files changed, 14 insertions(+), 94 deletions(-) diff --git a/src/lib_store/unix/block_store.ml b/src/lib_store/unix/block_store.ml index 433a6d097f09..d7d33b2a1026 100644 --- a/src/lib_store/unix/block_store.ml +++ b/src/lib_store/unix/block_store.ml @@ -879,36 +879,6 @@ let compute_new_caboose block_store history_mode ~new_savepoint module BlocksLPBL = Set.Make (Int32) -(* Limits the maximum number of elements that can be added into a - cycle. - This is mandatory when cementing metadata. Indeed, the current - version of camlzip support only 32bits zip files, that are files - smaller that ~4GB or containing less that 65_535 entries. When - cementing cycles, we might reach that limit. We set it to 2^16 - 1. *) -let default_cycle_size_limit = 65_535l - -(* May shrink the size of the given cycles to make sure that the size - of a cycle never exceeds the camlzip 32bits limitation. The shrink - consist in dividing the cycles in two even parts, recursively, - until the limit is not exceeded anymore. 
*) -let may_shrink_cycles cycles ~cycle_size_limit = - let rec loop acc cycles = - match cycles with - | [] -> List.rev acc - | ((cycle_start, cycle_end) as hd) :: tl -> - let diff = Int32.(sub cycle_end cycle_start) in - if diff >= cycle_size_limit then - let mid = Int32.(div diff 2l) in - let left_cycle_upper_bound = Int32.(add cycle_start mid) in - let left_cycle = (cycle_start, left_cycle_upper_bound) in - let right_cycle = - (Int32.(add left_cycle_upper_bound 1l), cycle_end) - in - loop acc (left_cycle :: right_cycle :: tl) - else loop (hd :: acc) tl - in - loop [] cycles - (* FIXME: update doc *) (* [update_floating_stores block_store ~history_mode ~ro_store ~rw_store ~new_store ~new_head ~new_head_lpbl @@ -920,7 +890,7 @@ let may_shrink_cycles cycles ~cycle_size_limit = savepoint and caboose candidates. *) let update_floating_stores block_store ~history_mode ~ro_store ~rw_store ~new_store ~new_head ~new_head_lpbl ~lowest_bound_to_preserve_in_floating - ~cementing_highwatermark ~cycle_size_limit = + ~cementing_highwatermark = let open Lwt_result_syntax in let*! 
() = Store_events.(emit start_updating_floating_stores) () in let* lpbl_block = @@ -1099,16 +1069,13 @@ let update_floating_stores block_store ~history_mode ~ro_store ~rw_store List.sort Compare.Int32.compare (BlocksLPBL.elements !blocks_lpbl) in let* cycles_to_cement = - let* cycles = - (loop - [] - initial_pred - sorted_lpbl - [@profiler.record_s - {verbosity = Debug; metadata = [("prometheus", "")]} - "retrieve cycle to cement"]) - in - return (may_shrink_cycles cycles ~cycle_size_limit) + (loop + [] + initial_pred + sorted_lpbl + [@profiler.record_s + {verbosity = Debug; metadata = [("prometheus", "")]} + "retrieve cycle to cement"]) in let* new_savepoint = (compute_new_savepoint @@ -1318,7 +1285,7 @@ let unlock lockfile = Lwt_unix.lockf lockfile Unix.F_ULOCK 0 let create_merging_thread block_store ~history_mode ~old_ro_store ~old_rw_store ~new_head ~new_head_lpbl ~lowest_bound_to_preserve_in_floating - ~cementing_highwatermark ~cycle_size_limit = + ~cementing_highwatermark = let open Lwt_result_syntax in let*! () = Store_events.(emit start_merging_thread) () in let*! new_ro_store = @@ -1344,7 +1311,6 @@ let create_merging_thread block_store ~history_mode ~old_ro_store ~old_rw_store ~new_head_lpbl ~lowest_bound_to_preserve_in_floating ~cementing_highwatermark - ~cycle_size_limit [@profiler.record_s {verbosity = Debug; metadata = [("prometheus", "")]} "update floating stores"]) @@ -1480,10 +1446,9 @@ let split_context block_store new_head_lpbl = let*! 
() = Store_events.(emit start_context_split new_head_lpbl) in split () -let merge_stores ?(cycle_size_limit = default_cycle_size_limit) block_store - ~(on_error : tztrace -> unit tzresult Lwt.t) ~finalizer ~history_mode - ~new_head ~new_head_metadata ~cementing_highwatermark - ~disable_context_pruning = +let merge_stores block_store ~(on_error : tztrace -> unit tzresult Lwt.t) + ~finalizer ~history_mode ~new_head ~new_head_metadata + ~cementing_highwatermark ~disable_context_pruning = let open Lwt_result_syntax in let* () = fail_when block_store.readonly Cannot_write_in_readonly in (* Do not allow multiple merges: force waiting for a potential @@ -1573,7 +1538,6 @@ let merge_stores ?(cycle_size_limit = default_cycle_size_limit) block_store let* new_ro_store, new_savepoint, new_caboose = (create_merging_thread block_store - ~cycle_size_limit ~history_mode ~old_ro_store ~old_rw_store diff --git a/src/lib_store/unix/block_store.mli b/src/lib_store/unix/block_store.mli index 284bc4fe677f..cdfae1aa8085 100644 --- a/src/lib_store/unix/block_store.mli +++ b/src/lib_store/unix/block_store.mli @@ -264,9 +264,6 @@ val move_floating_store : in [block_store] to finish if any. *) val await_merging : block_store -> unit Lwt.t -(** Default cemented cycles maximum size. I.e.: [2^16 - 1] *) -val default_cycle_size_limit : int32 - (** (* TODO UPDATE MERGE DOC *) [merge_stores block_store ?finalizer ~nb_blocks_to_preserve @@ -292,15 +289,11 @@ val default_cycle_size_limit : int32 If a merge thread is already occurring, this function will first wait for the previous merge to be done. - The cemented cycles will have a max size of [cycle_size_limit] - blocks which default to [default_cycle_size_limit]. - {b Warning} For a given [block_store], the caller must wait for this function termination before calling it again or it may result in concurrent intertwining causing the cementing to be out of order. 
*) val merge_stores : - ?cycle_size_limit:int32 -> block_store -> on_error:(tztrace -> unit tzresult Lwt.t) -> finalizer:(int32 -> unit tzresult Lwt.t) -> diff --git a/src/lib_store/unix/test/test_block_store.ml b/src/lib_store/unix/test/test_block_store.ml index 23b79c2e02be..0683640728bd 100644 --- a/src/lib_store/unix/test/test_block_store.ml +++ b/src/lib_store/unix/test/test_block_store.ml @@ -472,9 +472,8 @@ let test_merge_with_branches block_store = in assert_absence_in_block_store block_store (List.flatten blocks_to_gc) -let perform_n_cycles_merge ?(cycle_length = 10) - ?(cycle_size_limit = Block_store.default_cycle_size_limit) block_store - history_mode nb_cycles = +let perform_n_cycles_merge ?(cycle_length = 10) block_store history_mode + nb_cycles = let open Lwt_result_syntax in let*! cycles, head = make_n_initial_consecutive_cycles block_store ~cycle_length ~nb_cycles @@ -491,7 +490,6 @@ let perform_n_cycles_merge ?(cycle_length = 10) let* () = Block_store.merge_stores block_store - ~cycle_size_limit ~on_error:(fun err -> Assert.fail_msg "merging failed: %a" pp_print_trace err) ~finalizer:(fun _ -> return_unit) @@ -702,40 +700,6 @@ let test_rolling_2_merge block_store = Assert.Int32.equal ~msg:"caboose" expected_savepoint (snd caboose) ; return_unit -let test_split_cycle_merge block_store = - let open Lwt_result_syntax in - let* cycles = - perform_n_cycles_merge - ~cycle_size_limit:3l - ~cycle_length:5 - block_store - Archive - 5 - in - (* All blocks w/ metadata should be present *) - let* () = - assert_presence_in_block_store - ~with_metadata:true - block_store - (List.concat cycles) - in - let cemented_cycles = - Cemented_block_store.cemented_blocks_files - (Block_store.cemented_block_store block_store) - |> function - | Some a -> Array.to_list a - | None -> [] - in - let () = - (* Ensure that cemented cycles are split as 2 or 3 block chunks *) - List.iter - (fun {Cemented_block_store.start_level; end_level; _} -> - let diff_nb = succ Int32.(sub 
end_level start_level |> to_int) in - assert (diff_nb >= 2 && diff_nb <= 3)) - cemented_cycles - in - return_unit - let wrap_test ?(keep_dir = true) (name, g) = let open Lwt_result_syntax in let f dir_path = @@ -800,7 +764,6 @@ let tests : string * unit Alcotest_lwt.test_case list = ("consecutive merge (Full + 2 cycles)", test_full_2_merge); ("consecutive merge (Rolling + 0 cycles)", test_rolling_0_merge); ("consecutive merge (Rolling + 2 cycles)", test_rolling_2_merge); - ("split cycle merge", test_split_cycle_merge); ] in ("block store", test_cases) -- GitLab From e0e9f0bba5a9728c0c59ee8db471b37d416d48be Mon Sep 17 00:00:00 2001 From: Victor Allombert Date: Mon, 30 Sep 2024 11:08:51 +0200 Subject: [PATCH 2/2] Store: improve block_store.mli comments --- src/lib_store/unix/block_store.mli | 36 ++++++++++++++++-------------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/src/lib_store/unix/block_store.mli b/src/lib_store/unix/block_store.mli index cdfae1aa8085..af331148fe47 100644 --- a/src/lib_store/unix/block_store.mli +++ b/src/lib_store/unix/block_store.mli @@ -212,7 +212,7 @@ val resulting_context_hash : (** [read_block block_store ~read_metadata key] reads the block [key] in [block_store] if present. Return [None] if the block is - unknown. If [read_metadata] is set to [true] it tries to retreive + unknown. If [read_metadata] is set to [true] it tries to retrieve the metadata but do not fail if it is not available. *) val read_block : block_store -> read_metadata:bool -> key -> Block_repr.t option tzresult Lwt.t @@ -264,21 +264,19 @@ val move_floating_store : in [block_store] to finish if any. *) val await_merging : block_store -> unit Lwt.t -(** - (* TODO UPDATE MERGE DOC *) - [merge_stores block_store ?finalizer ~nb_blocks_to_preserve - ~history_mode ~from_block ~to_block] triggers a merge as described - in the above description. 
This will result, {b asynchronously}, - in: +(** [merge_stores block_store ~on_error ~finalizer ~history_mode + ~new_head ~new_head_metadata ~cementing_highwatermark + ~disable_context_pruning] triggers a merge as described in the + above description. This will result, {b asynchronously}, in: - - the cementing (if needs be) of a cycle from [from_block] to - [to_block] (included) + - the cementing (if needs be) of a cycle from [new_head] to + [cementing_highwatermark] (included) - - trimming the floating stores and preserves [to_block] - - [nb_blocks_to_preserve] blocks (iff these blocks are present or - the longest suffix otherwise) along with their metadata in the - floating store. It may potentially have duplicates in the - cemented block store. + - trimming the floating stores while preserving blocks (iff these + blocks are present or the longest suffix otherwise) along with + their metadata in the floating store, depending on the + [last_preserved_block_level] from the [new_head_metadata]. It may + potentially have duplicates in the cemented block store. After the cementing, {!Cemented_block_store.trigger_gc} will be called with the given [history_mode]. When the merging thread @@ -289,6 +287,10 @@ val await_merging : block_store -> unit Lwt.t If a merge thread is already occurring, this function will first wait for the previous merge to be done. + [on_error] is called as soon as an error occurs during + [merge_stores]. It is typically used to unlock resources that are + locked during the [finalizer] call to avoid deadlocks. + {b Warning} For a given [block_store], the caller must wait for this function termination before calling it again or it may result in concurrent intertwining causing the cementing to be out of @@ -349,8 +351,8 @@ val unlock_block_store : t -> unit Lwt.t (** [sync ?last_status block_store] updates the [block_store] internal file descriptors so that the return block store points to the - latest fds. 
This is useful to keep track of a block store opened - in readonly mode that is updated by another read/write + latest values. This is useful to keep track of a block store + opened in readonly mode that is updated by another read/write instance. If [last_status] is provided, it gives a hint about the previous sync call and may avoid unnecessary synchronizations. As a result, @@ -394,5 +396,5 @@ val may_recover_merge : block_store -> unit tzresult Lwt.t val stat_metadata_cycles : t -> (string * metadata_stat list) list tzresult Lwt.t -(** Upgrade the block_store_status *) +(** Upgrade the block_store_status. *) val v_3_1_upgrade : [`Chain_dir] Naming.directory -> unit tzresult Lwt.t -- GitLab