diff --git a/src/lib_store/unix/block_store.ml b/src/lib_store/unix/block_store.ml index 108bf78642fb120369874dab1adeffbefef60af3..371cac582dca0e9be3853faa46be9daf03acc489 100644 --- a/src/lib_store/unix/block_store.ml +++ b/src/lib_store/unix/block_store.ml @@ -875,6 +875,36 @@ let compute_new_caboose block_store history_mode ~new_savepoint module BlocksLAFL = Set.Make (Int32) +(* Limits the maximum number of elements that can be added into a + cycle. + This is mandatory when cementing metadata. Indeed, the current + version of camlzip supports only 32-bit zip files, that is, files + smaller than ~4GB or containing less than 65_535 entries. When + cementing cycles, we might reach that limit. We set it to 2^16 - 1. *) +let default_cycle_size_limit = 65_535l + +(* May shrink the given cycles so that none exceeds [cycle_size_limit] + blocks (camlzip 32-bit limitation): an oversized cycle is halved, + recursively, until every part fits. A non-positive limit disables + the shrinking, as no split could ever satisfy it. *) +let may_shrink_cycles cycles ~cycle_size_limit = + let rec loop acc cycles = + match cycles with + | [] -> List.rev acc + | ((cycle_start, cycle_end) as hd) :: tl -> + let diff = Int32.(sub cycle_end cycle_start) in + if cycle_size_limit > 0l && diff >= cycle_size_limit then + let mid = Int32.(div diff 2l) in + let left_cycle_upper_bound = Int32.(add cycle_start mid) in + let left_cycle = (cycle_start, left_cycle_upper_bound) in + let right_cycle = + (Int32.(add left_cycle_upper_bound 1l), cycle_end) + in + loop acc (left_cycle :: right_cycle :: tl) + else loop (hd :: acc) tl + in + loop [] cycles + (* FIXME: update doc *) (* [update_floating_stores block_store ~history_mode ~ro_store ~rw_store ~new_store ~new_head ~new_head_lafl @@ -886,7 +916,7 @@ module BlocksLAFL = Set.Make (Int32) savepoint and caboose candidates.
*) let update_floating_stores block_store ~history_mode ~ro_store ~rw_store ~new_store ~new_head ~new_head_lafl ~lowest_bound_to_preserve_in_floating - ~cementing_highwatermark = + ~cementing_highwatermark ~cycle_size_limit = let open Lwt_result_syntax in let*! () = Store_events.(emit start_updating_floating_stores) () in let* lafl_block = @@ -1025,7 +1055,11 @@ let update_floating_stores block_store ~history_mode ~ro_store ~rw_store let sorted_lafl = List.sort Compare.Int32.compare (BlocksLAFL.elements !blocks_lafl) in - let* cycles_to_cement = loop [] initial_pred sorted_lafl in + + let* cycles_to_cement = + let* cycles = loop [] initial_pred sorted_lafl in + return (may_shrink_cycles cycles ~cycle_size_limit) + in let* new_savepoint = compute_new_savepoint block_store @@ -1197,7 +1231,7 @@ let instanciate_temporary_floating_store block_store = let create_merging_thread block_store ~history_mode ~old_ro_store ~old_rw_store ~new_head ~new_head_lafl ~lowest_bound_to_preserve_in_floating - ~cementing_highwatermark = + ~cementing_highwatermark ~cycle_size_limit = let open Lwt_result_syntax in let*! () = Store_events.(emit start_merging_thread) () in let*! new_ro_store = @@ -1217,6 +1251,7 @@ let create_merging_thread block_store ~history_mode ~old_ro_store ~old_rw_store ~new_head_lafl ~lowest_bound_to_preserve_in_floating ~cementing_highwatermark + ~cycle_size_limit in let cycle_reader = read_iterator_block_range_in_floating_stores @@ -1328,9 +1363,9 @@ let split_context block_store new_head_lafl = let*! 
() = Store_events.(emit start_context_split new_head_lafl) in split () -let merge_stores block_store ~(on_error : tztrace -> unit tzresult Lwt.t) - ~finalizer ~history_mode ~new_head ~new_head_metadata - ~cementing_highwatermark = +let merge_stores ?(cycle_size_limit = default_cycle_size_limit) block_store + ~(on_error : tztrace -> unit tzresult Lwt.t) ~finalizer ~history_mode + ~new_head ~new_head_metadata ~cementing_highwatermark = let open Lwt_result_syntax in let* () = fail_when block_store.readonly Cannot_write_in_readonly in (* Do not allow multiple merges: force waiting for a potential @@ -1390,6 +1425,7 @@ let merge_stores block_store ~(on_error : tztrace -> unit tzresult Lwt.t) let* new_ro_store, new_savepoint, new_caboose = create_merging_thread block_store + ~cycle_size_limit ~history_mode ~old_ro_store ~old_rw_store diff --git a/src/lib_store/unix/block_store.mli b/src/lib_store/unix/block_store.mli index 034fe00320e94cf1b7ccb2ff4a14bd20f5e7dcc6..be8a2c777c281ab554ef7e75debee46ae108ba24 100644 --- a/src/lib_store/unix/block_store.mli +++ b/src/lib_store/unix/block_store.mli @@ -270,6 +270,9 @@ val move_floating_store : in [block_store] to finish if any. *) val await_merging : block_store -> unit Lwt.t +(** Default maximum size of a cemented cycle, i.e. [2^16 - 1] blocks. *) +val default_cycle_size_limit : int32 + (** (* TODO UPDATE MERGE DOC *) [merge_stores block_store ?finalizer ~nb_blocks_to_preserve @@ -293,11 +296,15 @@ val await_merging : block_store -> unit Lwt.t If a merge thread is already occurring, this function will first wait for the previous merge to be done. + The cemented cycles will have a max size of [cycle_size_limit] + blocks, which defaults to [default_cycle_size_limit]. + {b Warning} For a given [block_store], the caller must wait for this function termination before calling it again or it may result in concurrent intertwining causing the cementing to be out of order.
*) val merge_stores : + ?cycle_size_limit:int32 -> block_store -> on_error:(tztrace -> unit tzresult Lwt.t) -> finalizer:(int32 -> unit tzresult Lwt.t) -> diff --git a/src/lib_store/unix/test/test_block_store.ml b/src/lib_store/unix/test/test_block_store.ml index 7b8452ea10e5f9405072e8b3c9673131632ff83c..551864b991281755a16d5f5823f81cfbfdf815d3 100644 --- a/src/lib_store/unix/test/test_block_store.ml +++ b/src/lib_store/unix/test/test_block_store.ml @@ -468,8 +468,9 @@ let test_merge_with_branches block_store = in assert_absence_in_block_store block_store (List.flatten blocks_to_gc) -let perform_n_cycles_merge ?(cycle_length = 10) block_store history_mode - nb_cycles = +let perform_n_cycles_merge ?(cycle_length = 10) + ?(cycle_size_limit = Block_store.default_cycle_size_limit) block_store + history_mode nb_cycles = let open Lwt_result_syntax in let*! cycles, head = make_n_initial_consecutive_cycles block_store ~cycle_length ~nb_cycles @@ -486,6 +487,7 @@ let perform_n_cycles_merge ?(cycle_length = 10) block_store history_mode let* () = Block_store.merge_stores block_store + ~cycle_size_limit ~on_error:(fun err -> Assert.fail_msg "merging failed: %a" pp_print_trace err) ~finalizer:(fun _ -> return_unit) @@ -695,7 +697,41 @@ let test_rolling_2_merge block_store = Assert.Int32.equal ~msg:"caboose" expected_savepoint (snd caboose) ; return_unit -let wrap_test ?(keep_dir = false) (name, g) = +let test_split_cycle_merge block_store = + let open Lwt_result_syntax in + let* cycles = + perform_n_cycles_merge + ~cycle_size_limit:3l + ~cycle_length:5 + block_store + Archive + 5 + in + (* All blocks w/ metadata should be present *) + let* () = + assert_presence_in_block_store + ~with_metadata:true + block_store + (List.concat cycles) + in + let cemented_cycles = + Cemented_block_store.cemented_blocks_files + (Block_store.cemented_block_store block_store) + |> function + | Some a -> Array.to_list a + | None -> [] + in + let () = + (* Ensure that cemented cycles are split as 2 
or 3 block chunks *) + List.iter + (fun {Cemented_block_store.start_level; end_level; _} -> + let diff_nb = succ Int32.(sub end_level start_level |> to_int) in + assert (diff_nb >= 2 && diff_nb <= 3)) + cemented_cycles + in + return_unit + let wrap_test ?(keep_dir = false) (name, g) = let open Lwt_result_syntax in let f dir_path = let genesis_block = @@ -757,6 +793,7 @@ let tests : string * unit Alcotest_lwt.test_case list = ("consecutive merge (Full + 2 cycles)", test_full_2_merge); ("consecutive merge (Rolling + 0 cycles)", test_rolling_0_merge); ("consecutive merge (Rolling + 2 cycles)", test_rolling_2_merge); + ("split cycle merge", test_split_cycle_merge); ] in ("block store", test_cases)