diff --git a/manifest/externals.ml b/manifest/externals.ml index f2b1154c0e013e4780466123985cc8aa47b1380f..15d67b0df32ec74be09988192ea69735eb4c8dbe 100644 --- a/manifest/externals.ml +++ b/manifest/externals.ml @@ -38,6 +38,8 @@ let asetmap = external_lib "asetmap" V.(at_least "0.8.1") let astring = external_lib "astring" V.True +let bam_ppx = external_lib "bam-ppx" V.True + let bheap = external_lib "bheap" V.(at_least "2.0.0") let bigarray_compat = external_lib "bigarray-compat" V.True @@ -287,6 +289,8 @@ let crowbar = external_lib "crowbar" V.(at_least "0.2") let ppx_hash = external_lib "ppx_hash" V.True +let tezt_bam = external_lib "tezt-bam" V.True + let tezt_lib = external_lib "tezt" diff --git a/manifest/product_octez.ml b/manifest/product_octez.ml index b0cc0d4b852512a20cb63624572fc7f464f39021..64fbe08e72cf9bd4d4686b1da709d0f6a5aaf621 100644 --- a/manifest/product_octez.ml +++ b/manifest/product_octez.ml @@ -1438,10 +1438,12 @@ let _octez_stdlib_unix_test = [ "test_key_value_store"; "test_key_value_store_fuzzy"; + "test_key_value_store_fuzzy_bam"; "test_log_config_rules"; ] ~path:"src/lib_stdlib_unix/test/" ~opam:"octez-libs" + ~preprocess:(pps bam_ppx) ~deps: [ octez_error_monad |> open_ |> open_ ~m:"TzLwtreslib"; @@ -1450,6 +1452,7 @@ let _octez_stdlib_unix_test = octez_test_helpers |> open_; qcheck_alcotest; alcotezt; + tezt_bam; ] let octez_dal_config = diff --git a/opam/octez-libs.opam b/opam/octez-libs.opam index a43b93dccb61d421accaa77cc3bf7d7d48333e85..620f2533f8fe8eed627df97b6a5318f5784cb7ce 100644 --- a/opam/octez-libs.opam +++ b/opam/octez-libs.opam @@ -82,6 +82,8 @@ depends: [ "octez-rust-deps" { = version } "lwt-watcher" { = "0.2" } "bigstring" {with-test} + "bam-ppx" {with-test} + "tezt-bam" {with-test} ] x-opam-monorepo-opam-provided: [ "tezos-sapling-parameters" diff --git a/opam/virtual/octez-deps.opam b/opam/virtual/octez-deps.opam index c749f7bed0f401118b9c245463875828f4c6d01e..a9945f8aa584f5e7c608f2f950fa1612243585ff 100644 --- a/opam/virtual/octez-deps.opam +++ b/opam/virtual/octez-deps.opam @@ -15,6 +15,7 @@ depends: [ "alcotest-lwt" { >= "1.5.0" } "asetmap" { >= "0.8.1" } "astring" + "bam-ppx" "base-unix" "bheap" { >= "2.0.0" } "bigarray-compat" @@ -104,6 +105,7 @@ depends: [ "tar-unix" { >= "2.0.1" & < "3.0.0" } "tezos-sapling-parameters" { >= "1.1.0" } "tezt" { >= "4.1.0" & < "5.0.0" } + "tezt-bam" "tls-lwt" { >= "0.16.0" } "uri" { >= "3.1.0" } "uutf" diff --git a/src/bin_dal_node/constants.ml b/src/bin_dal_node/constants.ml index 8158207892ed52a1320e7bead56fdb56c3a8cdba..aacd8b0f02822b577e9ad08891458284da80757a 100644 --- a/src/bin_dal_node/constants.ml +++ b/src/bin_dal_node/constants.ml @@ -32,7 +32,7 @@ let shards_store_lru_size = regular file opening and one via mmap on the bitset region). Note that setting a too high value causes a "Too many open files" error. 
*) let irmin_internals_entries_per_toplevel_entry = 3 in - let number_of_slots = 256 in + let number_of_slots = 32 in let number_of_remembered_levels = 1 in irmin_internals_entries_per_toplevel_entry * number_of_slots * number_of_remembered_levels diff --git a/src/bin_dal_node/dal_metrics.ml b/src/bin_dal_node/dal_metrics.ml index ce3d8b2d94a25fd901439495bf8eef6b7d360ae7..3ef5cbf32b6a4a5c2269c465dae8c24508a5fc8c 100644 --- a/src/bin_dal_node/dal_metrics.ml +++ b/src/bin_dal_node/dal_metrics.ml @@ -148,6 +148,10 @@ module GS = struct module Stats = struct let gs_stats = ref (Gossipsub.Worker.Introspection.empty_stats ()) + let count_kvs_table_file = ref 0 + + let count_kvs_table_action = ref 0 + let input_events_stream_length = ref 0 let p2p_output_stream_length = ref 0 @@ -368,6 +372,14 @@ module GS = struct ~label_names:["peer"] (fun () -> !Stats.scores_of_peers) + let count_kvs_table_file = + metric ~name:"count_kvs_table_file" ~help:"Number of entries in the shards store opened-files table" (fun () -> + !Stats.count_kvs_table_file |> float_of_int) + + let count_kvs_table_action = + metric ~name:"count_kvs_table_action" ~help:"Number of entries in the shards store on-going actions table" (fun () -> + !Stats.count_kvs_table_action |> float_of_int) + let metrics = [ (* Metrics about the stats gathered by the worker *) @@ -394,6 +406,8 @@ module GS = struct (* Other metrics about GS automaton's state *) count_peers_per_topic; scores_of_peers; + count_kvs_table_file; + count_kvs_table_action; ] let () = List.iter add_metric metrics @@ -439,6 +453,10 @@ let layer1_block_finalized_round ~block_round = let update_shards_verification_time f = Prometheus.DefaultHistogram.observe Node_metrics.verify_shard_time f +let update_kvs_table ~file ~action = + GS.Stats.count_kvs_table_file := file ; + GS.Stats.count_kvs_table_action := action + let sample_time ~sampling_frequency ~to_sample ~metric_updater = if sampling_frequency > 0 && Random.int sampling_frequency <> 0 then to_sample () diff --git a/src/bin_dal_node/dal_metrics.mli b/src/bin_dal_node/dal_metrics.mli index 89caa2f254edc54622c39fbfe5a9f77e94fd0138..5005f68975fe89bf76ceb493d65d260a3354c6e2 100644 --- a/src/bin_dal_node/dal_metrics.mli +++ b/src/bin_dal_node/dal_metrics.mli @@ -55,3 +55,5 @@ val sample_time : (** [collect_gossipsub_metrics gs_worker] allows to periodically collect metrics from the given GS Worker state.
*) val collect_gossipsub_metrics : Gossipsub.Worker.t -> unit + +val update_kvs_table : file:int -> action:int -> unit diff --git a/src/bin_dal_node/store.ml b/src/bin_dal_node/store.ml index bacb4e55f23afb502fa72db86be980af88df2f14..b2e8912cff7646b71f8203e5198d917d7b5b7050 100644 --- a/src/bin_dal_node/store.ml +++ b/src/bin_dal_node/store.ml @@ -212,7 +212,11 @@ module Shards = struct let count_values store slot_id = KVS.count_values store file_layout slot_id - let remove store slot_id = KVS.remove_file store file_layout slot_id + let remove store slot_id = + let action = KVS.View.on_going_actions store in + let file = KVS.View.opened_files store in + Dal_metrics.update_kvs_table ~file ~action ; + KVS.remove_file store file_layout slot_id let init node_store_dir shard_store_dir = let root_dir = Filename.concat node_store_dir shard_store_dir in diff --git a/src/lib_stdlib_unix/key_value_store.ml b/src/lib_stdlib_unix/key_value_store.ml index 5376e445a17fa73ee3614fbb4c476d30e2942432..ece0acf4f058cbafa7b619a05d05ff656cd7b828 100644 --- a/src/lib_stdlib_unix/key_value_store.ml +++ b/src/lib_stdlib_unix/key_value_store.ml @@ -227,6 +227,12 @@ module Files : sig val count_values : 'value t -> ('key, 'value) layout -> int tzresult Lwt.t val remove : 'value t -> ('key, 'value) layout -> unit tzresult Lwt.t + + module View : sig + val opened_files : 'value t -> int + + val on_going_actions : 'value t -> int + end end = struct module LRU = Ringo.LRU_Collection @@ -346,6 +352,12 @@ end = struct The store ensures that actions performed on a given file are done sequentially. *) + module View = struct + let opened_files {files; _} = Table.length files + + let on_going_actions {last_actions; _} = Table.length last_actions + end + let init ~lru_size = (* FIXME https://gitlab.com/tezos/tezos/-/issues/6774 @@ -363,26 +375,38 @@ end = struct current action is completed. The promise returns the opened file associated to the action if it exists once the action is completed. *) - let wait_last_action = + let wait_last_action filepath = let open Lwt_syntax in function | Read p -> + Format.eprintf "WAIT READ:%s @." filepath ; let* file, _ = p in + Format.eprintf "END READ:%s @." filepath ; return file | Close p -> + Format.eprintf "WAIT CLOSE:%s @." filepath ; let* () = p in + Format.eprintf "END CLOSE:%s @." filepath ; return_none | Write p -> + Format.eprintf "WAIT WRITE:%s @." filepath ; let* file, _ = p in + Format.eprintf "END WRITE:%s @." filepath ; return_some file | Value_exists p -> + Format.eprintf "WAIT VALUE EXISTS:%s @." filepath ; let* file, _ = p in + Format.eprintf "END VALUE EXISTS:%s @." filepath ; return file | Count_values p -> + Format.eprintf "WAIT COUNT VALUES:%s @." filepath ; let* file, _ = p in + Format.eprintf "END COUNT VALUES:%s @." filepath ; return file | Remove p -> + Format.eprintf "WAIT REMOVE:%s @." filepath ; let* () = p in + Format.eprintf "END REMOVE:%s @." filepath ; return_none (* This function is the only one that calls [Lwt_unix.close]. *) @@ -522,18 +546,25 @@ end = struct (* If an action is happening concurrently on the file, we wait for it to end. The action returns the opened file if any. *) + Format.eprintf "GET FILE FROM LAST ACTION@." ; match last_or_concurrent_action with | None -> ( let file_cached = Table.find_opt files filepath in match file_cached with - | None -> Lwt.return_none + | None -> + Format.eprintf "NO FILE CACHED@." ; + Lwt.return_none | Some (Closing p) -> + Format.eprintf "FILE CACHED IS CLOSING@." 
; let* () = p in + Format.eprintf "FILE CACHED IS CLOSED@." ; Lwt.return_none | Some (Opening p) -> + Format.eprintf "FILE CACHED IS OPENING@." ; let* opened_file = p in + Format.eprintf "FILE CACHED IS OPENED@." ; Lwt.return_some opened_file) - | Some action -> wait_last_action action + | Some action -> wait_last_action filepath action (* Any action on the key value store can be implemented in this way. *) let generic_action files last_actions filepath ~on_file_closed @@ -614,6 +645,7 @@ end = struct end let close_file files last_actions filepath = + Format.eprintf "CLOSE FILE: %s@." filepath ; (* Since this function does not aim to be exposed, we do not check whether the store is closed. This would actually be a mistake since it is used while the store is closing. @@ -635,6 +667,9 @@ end = struct Table.replace files filepath (Closing p) ; Table.replace last_actions filepath (Close p) ; let* () = p in + (* It is not clear why we can't call the [clean_up] function + below. Instead, it is important to just remove this filepath. + It would be nice to explore this further. *) Table.remove files filepath ; (* To avoid any memory leaks, we woud like to remove the corresponding entry from the [last_actions] table. However, while @@ -661,6 +696,16 @@ end = struct if !closed then return_unit else ( closed := true ; + Table.iter + (fun _ value -> + match value with + | Read p -> Lwt.cancel p + | Close p -> Lwt.cancel p + | Write p -> Lwt.cancel p + | Value_exists p -> Lwt.cancel p + | Count_values p -> Lwt.cancel p + | Remove p -> Lwt.cancel p) + last_actions ; let* () = Table.iter_s (fun filename _ -> close_file files last_actions filename) @@ -681,13 +726,17 @@ end = struct is bounded by the size of the LRU. This is why we wait first for the eviction promise to be fulfilled that will close the file evicted. *) + LRU.remove lru lru_node ; let* () = close_file files last_actions filepath in + let lru_node = LRU.add lru filename in + Format.eprintf "CLOSED FILE: %s@." filepath ; return lru_node (* This function aims to be used when the file already exists on the file system. *) let load_file files last_actions lru filename = let open Lwt_syntax in + Format.eprintf "LOAD FILE: %s@." filename ; let* lru_node = add_lru files last_actions lru filename in let* fd = Lwt_unix.openfile filename [O_RDWR; O_CLOEXEC] 0o660 in (* TODO: https://gitlab.com/tezos/tezos/-/issues/6033 @@ -712,6 +761,7 @@ end = struct a file that does not exist yet. *) let initialize_file files last_actions lru layout = let open Lwt_syntax in + Format.eprintf "INITIALIZE FILE: %s@." layout.filepath ; let* lru_node = add_lru files last_actions lru layout.filepath in let* fd = Lwt_unix.openfile @@ -749,6 +799,56 @@ end = struct if b then load_file files last_actions lru layout.filepath else initialize_file files last_actions lru layout + (* Tables must be cleaned up when promises are fulfilled. While in + theory it should be doable on each action, this seems difficult + to do properly. Hence, the current solution. This is not + adequate if the `lru` size gets bigger because at each action we + will go through the whole table. + + It should be possible to optimise this function further and to + modify only the files affected when an action is performed. + + At the time this comment was written, I was not able to find a + proper way to do that. + + Moreover, as mentioned in the [close] function, there is + something weird: this function cannot be called in the [close] + function.
+ *) + let clean_up files last_actions = + Table.filter_map_inplace + (fun _ value -> + match value with + | Closing p -> ( + (* When a closing promise is fulfilled, it can safely be removed. *) + match Lwt.state p with Lwt.Return _ -> None | _ -> Some value) + | Opening _ -> + (* This does not have to be cleaned-up because actions may + use this to know whether it needs to load the file or + not. Hence, the number of files in this state is bounded + by the size of the table `last_actions`. *) + Some value) + files ; + Table.filter_map_inplace + (fun _ value -> + match value with + | Remove p | Close p -> ( + match Lwt.state p with Lwt.Return _ -> None | _ -> Some value) + | Read p -> ( + match Lwt.state p with + | Lwt.Return (None, _) -> None + | _ -> Some value) + | Count_values p -> ( + match Lwt.state p with + | Lwt.Return (None, _) -> None + | _ -> Some value) + | Value_exists p -> ( + match Lwt.state p with + | Lwt.Return (None, _) -> None + | _ -> Some value) + | Write _ -> Some value) + last_actions + let read {files; last_actions; lru; closed} layout key = let open Lwt_syntax in if !closed then @@ -772,7 +872,8 @@ end = struct in let p = Action.read ~on_file_closed files last_actions layout key in Table.replace last_actions layout.filepath (Read p) ; - let+ _file, value = p in + let+ _, value = p in + clean_up files last_actions ; value (* Very similar to [read] action except we only look at the bitset @@ -797,6 +898,7 @@ end = struct in Table.replace last_actions layout.filepath (Value_exists p) ; let+ _, exists = p in + clean_up files last_actions ; exists (* Very similar to [value_exists] action except we look at the @@ -810,7 +912,9 @@ end = struct let on_file_closed ~on_file_opened = let* r = may_load_file files last_actions lru layout.filepath in match r with - | None -> return (None, Ok 0) + | None -> + (* Table.remove files layout.filepath ; *) + return (None, Ok 0) | Some opened_file_promise -> Table.replace files layout.filepath (Opening opened_file_promise) ; let* opened_file = opened_file_promise in @@ -819,6 +923,7 @@ end = struct let p = Action.count_values ~on_file_closed files last_actions layout in Table.replace last_actions layout.filepath (Count_values p) ; let+ _, count = p in + clean_up files last_actions ; count let write ?(override = false) {files; last_actions; lru; closed} layout key @@ -835,6 +940,7 @@ end = struct let* opened_file = opened_file_promise in on_file_opened opened_file in + clean_up files last_actions ; let p = Action.write ~on_file_closed @@ -847,6 +953,7 @@ end = struct in Table.replace last_actions layout.filepath (Write p) ; let+ _file, result = p in + clean_up files last_actions ; result let remove {files; last_actions; lru; closed} layout = @@ -854,7 +961,10 @@ end = struct if !closed then Lwt.return (Error (Error_monad.TzTrace.make (Closed {action = "remove"}))) else - let on_file_closed ~on_file_opened:_ = may_remove_file layout.filepath in + let on_file_closed ~on_file_opened:_ = + let* () = may_remove_file layout.filepath in + Lwt.return_unit + in let p = Action.remove_file ~on_file_closed @@ -866,13 +976,7 @@ end = struct Table.replace last_actions layout.filepath (Remove p) ; Table.replace files layout.filepath (Closing p) ; let* () = p in - (* See [close_file] for an explanation of the lines below. 
*) - (match Table.find_opt last_actions layout.filepath with - | Some (Close p) -> ( - match Lwt.state p with - | Lwt.Return _ -> Table.remove last_actions layout.filepath - | _ -> ()) - | _ -> ()) ; + clean_up files last_actions ; return_ok () end @@ -1066,6 +1170,12 @@ let remove_file {files; root_dir; _} file_layout file = let layout = file_layout ~root_dir file in Files.remove files layout +module View = struct + let opened_files {files; _} = Files.View.opened_files files + + let on_going_actions {files; _} = Files.View.on_going_actions files +end + module Internal_for_tests = struct let init ?(lockfile_prefix = "internal_for_tests") ~lru_size ~root_dir () = let open Lwt_result_syntax in diff --git a/src/lib_stdlib_unix/key_value_store.mli b/src/lib_stdlib_unix/key_value_store.mli index 4c88622b63dcce229a02bf99fe9d21170953e79b..68851e170196fcb95ca8e5060a1726b26ffbe4fa 100644 --- a/src/lib_stdlib_unix/key_value_store.mli +++ b/src/lib_stdlib_unix/key_value_store.mli @@ -208,6 +208,16 @@ val count_values : 'file -> int tzresult Lwt.t +module View : sig + (** Returns the number of files currently opened by the key value + store. Note that this number is an upper bound on the number of + file descriptors opened. + *) + val opened_files : ('file, 'key, 'value) t -> int + + val on_going_actions : ('file, 'key, 'value) t -> int +end + module Internal_for_tests : sig (** Same as {!init} above, except that the user can specify a prefix for the lock file (default is lockfile_prefix = "internal_for_tests") to avoid issues diff --git a/src/lib_stdlib_unix/test/dune b/src/lib_stdlib_unix/test/dune index 274016a3cece66243520b9ea1cef612b0e82f724..22cd78905522334e753841c179507615823141bf 100644 --- a/src/lib_stdlib_unix/test/dune +++ b/src/lib_stdlib_unix/test/dune @@ -11,7 +11,9 @@ octez-libs.event-logging octez-libs.test-helpers qcheck-alcotest - octez-alcotezt) + octez-alcotezt + tezt-bam) + (preprocess (pps bam-ppx)) (library_flags (:standard -linkall)) (flags (:standard) @@ -26,6 +28,7 @@ (modules test_key_value_store test_key_value_store_fuzzy + test_key_value_store_fuzzy_bam test_log_config_rules)) (executable diff --git a/src/lib_stdlib_unix/test/test_key_value_store_fuzzy.ml b/src/lib_stdlib_unix/test/test_key_value_store_fuzzy.ml index ed193cba24d6332c6dca335378f16df4a475ba12..b8598f2e72796e444d1e2be7bfc7e922b254a65e 100644 --- a/src/lib_stdlib_unix/test/test_key_value_store_fuzzy.ml +++ b/src/lib_stdlib_unix/test/test_key_value_store_fuzzy.ml @@ -96,6 +96,12 @@ module type S = sig ('file, 'key, 'value) Key_value_store.file_layout -> 'file -> int tzresult Lwt.t + + module View : sig + val opened_files : ('file, 'key, 'value) t -> int + + val on_going_actions : ('file, 'key, 'value) t -> int + end end let value_size = 1 @@ -142,6 +148,14 @@ module R : S = struct (fun (file', _) _ count -> if file = file' then count + 1 else count) t 0 + + module View = struct + let opened_files table = + table |> Stdlib.Hashtbl.to_seq_keys |> Seq.map fst |> List.of_seq + |> List.sort_uniq compare |> List.length + + let on_going_actions = opened_files + end end module Helpers = struct @@ -374,46 +388,26 @@ module Helpers = struct let pp_scenario fmt (action, next_actions) = let rec pp shift action fmt next_actions = - let shift_str = "|| " in - if shift then Format.fprintf fmt "%s " shift_str ; match next_actions with | [] -> Format.fprintf fmt "%a@."
pp_action action ; if shift then Format.fprintf fmt "Wait" | (Parallel, next_action) :: actions -> - if shift then - Format.fprintf - fmt - "%a@.%a" - pp_action - action - (pp true next_action) - actions - else - Format.fprintf - fmt - "%a@.Wait@.%a" - pp_action - action - (pp true next_action) - actions + Format.fprintf + fmt + "P %a@.%a" + pp_action + action + (pp true next_action) + actions | (Sequential, next_action) :: actions -> - if shift then - Format.fprintf - fmt - "Wait@.%a@.%a@." - pp_action - action - (pp false next_action) - actions - else - Format.fprintf - fmt - "%a@.%a@." - pp_action - action - (pp false next_action) - actions + Format.fprintf + fmt + "S %a@.%a@." + pp_action + action + (pp false next_action) + actions in Format.fprintf fmt "%a" (pp false action) next_actions end @@ -440,6 +434,7 @@ let run_scenario |> Filename.concat "tezos-pbt-tests" |> Filename.concat tmp_dir in + Unix.system @@ Format.asprintf "rm -rf %s" root_dir |> ignore ; let file_layout ~root_dir file = let filepath = Filename.concat root_dir file in Key_value_store.layout @@ -578,7 +573,36 @@ let run_scenario (function Ok () -> return_unit | Error err -> fail err) promises_running_seq in - run_actions action next_actions Seq_s.empty + if L.View.opened_files left = min lru_size (R.View.opened_files right) + then + if + L.View.on_going_actions left + <= min lru_size (R.View.on_going_actions right) + + L.View.opened_files left + then run_actions action next_actions Seq_s.empty + else ( + Format.eprintf + "################ Expected on going actions table to be %d. Got \ + %d (lru size is: %d, of: %d, remaining: %d).@.%a@." + (R.View.on_going_actions right) + (L.View.on_going_actions left) + lru_size + (L.View.opened_files left) + (List.length next_actions + 1) + pp_scenario + scenario ; + failwith + "Expected on going actions table to be %d. Got %d (lru size is: \ + %d)." + (R.View.on_going_actions right) + (L.View.on_going_actions left) + lru_size) + else + failwith + "Expected size of files table to be %d. Got %d (lru size is: %d)." + (R.View.opened_files right) + (L.View.opened_files left) + lru_size | (Parallel, action) :: next_actions -> (* We do not wait for promises to end and append them to the list of promises on-going. *) @@ -614,6 +638,7 @@ let sequential_test = ~retries:1 test_gen (fun (parameters, scenario) -> + Format.eprintf "@@@@@@@@.%a@." pp_scenario scenario ; let promise = let* _ = run_scenario parameters scenario in return_true @@ -636,8 +661,8 @@ let parallel_test = ~print ~name:"key-value store concurrent writes/reads" ~count:10_000 - ~max_fail:1 (* to stop shrinking after [max_fail] failures. *) - ~retries:1 + ~max_fail:(-1) (* to stop shrinking after [max_fail] failures. 
*) + ~retries:0 test_gen (fun (parameters, scenario) -> let promise = diff --git a/src/lib_stdlib_unix/test/test_key_value_store_fuzzy_bam.ml b/src/lib_stdlib_unix/test/test_key_value_store_fuzzy_bam.ml new file mode 100644 index 0000000000000000000000000000000000000000..3268c0305cbd5d4a1ad7f6d5cd32f6bbe79173bc --- /dev/null +++ b/src/lib_stdlib_unix/test/test_key_value_store_fuzzy_bam.ml @@ -0,0 +1,615 @@ +(*****************************************************************************) +(* *) +(* Open Source License *) +(* Copyright (c) 2023 Nomadic Labs, *) +(* *) +(* Permission is hereby granted, free of charge, to any person obtaining a *) +(* copy of this software and associated documentation files (the "Software"),*) +(* to deal in the Software without restriction, including without limitation *) +(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *) +(* and/or sell copies of the Software, and to permit persons to whom the *) +(* Software is furnished to do so, subject to the following conditions: *) +(* *) +(* The above copyright notice and this permission notice shall be included *) +(* in all copies or substantial portions of the Software. *) +(* *) +(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*) +(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *) +(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *) +(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*) +(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *) +(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *) +(* DEALINGS IN THE SOFTWARE. *) +(* *) +(*****************************************************************************) + +(* Testing + ------- + Component: Key-value store + Invocation: dune exec src/lib_stdlib_unix/test/main.exe \ + -- --file test_key_value_store_fuzzy.ml + Subject: Test the key-value store +*) + +open Error_monad + +(* This test file checks the correctness of the key-value store + (module [L]) with respect to the interface [S] using a reference + implementation (see module [R]) which is obviously correct. + + The main property tested is that the implementation agrees with the + reference implementation in a sequential and concurrent + setting. Because the reference implementation does not do I/Os, the + property means that the key-value store is consistent with the + order of the operations (ex: If for a given key, we write the value + 1, and then the value 2, any subsequent read will return the value + 2 for this key). Note that this property is not trivial if the + writes are processed concurrently and actually is false if the + function [write_values] is used (this is a reason why we do not + expose it in the interface of [S]). + + This property is tested on random scenarios, where a scenario is roughly a + list of actions and two consecutive actions can be bound either sequentially + or in parallel. + + We check that both implementations return the same results on the generated + scenarios. 
*) + +module type S = sig + type ('file, 'key, 'value) t + + val init : + lru_size:int -> root_dir:string -> ('file, 'key, 'value) t tzresult Lwt.t + + val close : ('file, 'key, 'value) t -> unit tzresult Lwt.t + + val write_value : + ?override:bool -> + ('file, 'key, 'value) t -> + ('file, 'key, 'value) Key_value_store.file_layout -> + 'file -> + 'key -> + 'value -> + unit tzresult Lwt.t + + val read_value : + ('file, 'key, 'value) t -> + ('file, 'key, 'value) Key_value_store.file_layout -> + 'file -> + 'key -> + 'value tzresult Lwt.t + + val read_values : + ('file, 'key, 'value) t -> + ('file, 'key, 'value) Key_value_store.file_layout -> + ('file * 'key) Seq.t -> + ('file * 'key * 'value tzresult) Seq_s.t + + val remove_file : + ('file, 'key, 'value) t -> + ('file, 'key, 'value) Key_value_store.file_layout -> + 'file -> + unit tzresult Lwt.t + + val count_values : + ('file, 'key, 'value) t -> + ('file, 'key, 'value) Key_value_store.file_layout -> + 'file -> + int tzresult Lwt.t + + module View : sig + val opened_files : ('file, 'key, 'value) t -> int + + val on_going_actions : ('file, 'key, 'value) t -> int + end +end + +module L : S = Key_value_store + +module R : S = struct + type ('file, 'key, 'value) t = ('file * 'key, 'value) Stdlib.Hashtbl.t + + let init ~lru_size:_ ~root_dir:_ = Lwt.return_ok @@ Stdlib.Hashtbl.create 100 + + let close _ = Lwt_syntax.return_ok_unit + + let write_value ?(override = false) t _file_layout file key value = + let open Lwt_result_syntax in + if override || not (Stdlib.Hashtbl.mem t (file, key)) then ( + Stdlib.Hashtbl.replace t (file, key) value ; + return_unit) + else return_unit + + let read_value t _file_layout file key = + let key = (file, key) in + let open Lwt_result_syntax in + match Stdlib.Hashtbl.find_opt t key with + | None -> failwith "key not found" + | Some key -> return key + + let read_values t file_layout seq = + let open Lwt_syntax in + seq |> Seq_s.of_seq + |> Seq_s.S.map (fun (file, key) -> + let* value = read_value t file_layout file key in + Lwt.return (file, key, value)) + + let remove_file t _file_layout file = + Stdlib.Hashtbl.filter_map_inplace + (fun (file', _) value -> if file = file' then None else Some value) + t ; + Lwt.return (Ok ()) + + let count_values t _file_layout file = + Lwt_result_syntax.return + @@ Stdlib.Hashtbl.fold + (fun (file', _) _ count -> if file = file' then count + 1 else count) + t + 0 + + module View = struct + let opened_files table = + table |> Stdlib.Hashtbl.to_seq_keys |> Seq.map fst |> List.of_seq + |> List.sort_uniq compare |> List.length + + let on_going_actions = opened_files + end +end + +module Helpers = struct + open Bam.Std + open Bam.Std.Syntax + + let number_of_files_max = 3 + + let key_max = 4 + + let value_size = 1 + + let make_file id = Printf.sprintf "file_%d" id + + type filename = string + + let gen_filename = + let* n = int ~min:0 ~max:(number_of_files_max - 1) () in + return (make_file n) + + type key = filename * int + + let gen_key = + let* filename = gen_filename in + let* key = int ~min:0 ~max:(key_max - 1) () in + return (filename, key) + + type value = String.t + + let gen_value = string ~size:(int ~min:1 ~max:1 ()) () + + type write_payload = {key : key; override : bool; default : bool} + [@@deriving gen] + + let pp_write_payload fmt {key = file, key; override; default} = + Format.fprintf + fmt + "[key=%s/%d, override=%b, default=%b]" + file + key + override + default + + type action = + | Count_values of filename + | Read_value of key + | Write_value of write_payload + | 
Remove_file of filename + | Read_values of key list [@max 10] + [@@deriving gen] + + let pp_action fmt = function + | Write_value payload -> Format.fprintf fmt "W%a" pp_write_payload payload + | Read_value (file, key) -> Format.fprintf fmt "R[key=%s/%d]" file key + | Read_values keys -> + let str_keys = + String.concat + "; " + (keys + |> List.map (fun (file, key) -> Printf.sprintf "key=%s/%d" file key) + ) + in + Format.fprintf fmt "R[%s]" str_keys + | Remove_file file -> Format.fprintf fmt "REMOVE[file=%s]" file + | Count_values file -> Format.fprintf fmt "COUNT[file=%s]" file + + type bind = Sequential | Parallel [@@deriving gen] + + let values_gen = + let open Bam.Std in + let open Bam.Std.Syntax in + let key_gen = int ~min:0 ~max:(key_max - 1) () in + let value_gen = string ~size:(int ~min:0 ~max:1 ()) () in + let* bindings = + list + ~size:(int ~min:0 ~max:4 ()) + (pair (pair gen_filename key_gen) value_gen) + in + return (bindings |> List.to_seq |> Stdlib.Hashtbl.of_seq) + + type parameters = { + number_of_files : int; + number_of_keys_per_file : int; + read_values_seq_size : int; + lru_size : int; + value_size : int; (* in bytes *) + values : (key, value) Stdlib.Hashtbl.t; + overwritten : (key, value) Stdlib.Hashtbl.t; + } + + let gen_parameters = + let* number_of_files = + int ~min:number_of_files_max ~max:number_of_files_max () + in + let* number_of_keys_per_file = int ~min:key_max ~max:key_max () in + let* read_values_seq_size = int ~min:1 ~max:key_max () in + let lru_size = max 0 (number_of_files - 2) in + let all_keys = + Stdlib.List.init number_of_files (fun file -> + Stdlib.List.init number_of_keys_per_file (fun key -> + (make_file file, key))) + |> List.flatten + in + let values_gen = + let* values = + list + ~size:(int ~min:(List.length all_keys) ~max:(List.length all_keys) ()) + gen_value + in + let bindings = Stdlib.List.combine all_keys values in + return (bindings |> List.to_seq |> Stdlib.Hashtbl.of_seq) + in + let* values = values_gen in + let* overwritten = values_gen in + return + { + number_of_files; + number_of_keys_per_file; + read_values_seq_size; + lru_size; + value_size; + values; + overwritten; + } + + let keys files_max keys_max = + Stdlib.List.init (files_max - 1) (fun file -> + Stdlib.List.init (keys_max - 1) (fun key -> (make_file file, key))) + |> List.flatten |> Array.of_list + + let pp_parameters fmt + { + number_of_files; + number_of_keys_per_file; + read_values_seq_size; + lru_size; + value_size; + values; + overwritten; + } = + let string_of_values values = + values |> Stdlib.Hashtbl.to_seq |> List.of_seq + |> List.map (fun ((file, key), value) -> + Format.asprintf "[key=%s/%d,value=%s]" file key value) + |> String.concat " " + in + Format.fprintf fmt "number of files = %d@." number_of_files ; + Format.fprintf fmt "number of keys per file = %d@." number_of_keys_per_file ; + Format.fprintf fmt "sequence length for reads = %d@." read_values_seq_size ; + Format.fprintf fmt "lru size = %d@." lru_size ; + Format.fprintf fmt "value size = %d (in bytes)@." value_size ; + Format.fprintf fmt "default values = %s@." (string_of_values values) ; + Format.fprintf fmt "override values = %s@." (string_of_values overwritten) + + (* A scenario is a list of actions. The bind elements tells whether + the next bind waits for the previous promises running or is done + in parallel. This datatype does not allow to bind sequentially a + group of parallel actions though. 
*) + type scenario = action * (bind * action) list + + let gen_scenario = + let open Bam.Std in + let bind_list = + list + ~shrinker:Shrinker.Prefix + ~size:(int ~min:1 ~max:50 ()) + (pair gen_bind gen_action) + in + pair gen_action bind_list + + let pp_scenario fmt (action, next_actions) = + let rec pp fmt next_actions = + match next_actions with + | [] -> () + | (Parallel, next_action) :: actions -> + Format.fprintf fmt "P %a@.%a@." pp_action next_action pp actions + | (Sequential, next_action) :: actions -> + Format.fprintf fmt "S %a@.%a@." pp_action next_action pp actions + in + Format.fprintf fmt "%a@." pp_action action ; + Format.fprintf fmt "%a" pp next_actions +end + +include Helpers + +let run_scenario + {lru_size; values; overwritten; number_of_files; number_of_keys_per_file; _} + scenario = + let open Lwt_result_syntax in + let pid = Unix.getpid () in + let tmp_dir = Filename.get_temp_dir_name () in + (* To avoid any conflict with previous runs of this test. *) + let root_dir = + Format.asprintf "key-value-store-test-key-%d" pid + |> Filename.concat "tezos-pbt-tests" + |> Filename.concat tmp_dir + in + Unix.system @@ Format.asprintf "rm -rf %s" root_dir |> ignore ; + let file_layout ~root_dir file = + let filepath = Filename.concat root_dir file in + Key_value_store.layout + ~encoding:(Data_encoding.Fixed.bytes value_size) + ~filepath + ~eq:( = ) + ~index_of:Fun.id + ~number_of_keys_per_file:4096 + () + in + let* left = L.init ~lru_size ~root_dir in + let* right = R.init ~lru_size ~root_dir in + let action, next_actions = scenario in + let compare_tzresult finalization pp_while pp_val left_result right_result = + let pp_result fmt = function + | Ok v -> pp_val fmt v + | Error err -> Error_monad.pp_print_trace fmt err + in + let fail () = + failwith + "%s Unexpected different value while %a.@. At %s:@.Expected: %a@.Got: \ + %a@." + finalization + pp_while + () + root_dir + pp_result + right_result + pp_result + left_result + in + match (left_result, right_result) with + | Ok left_value, Ok right_value -> + if left_value = right_value then return_unit else fail () + | Error _, Error _ -> return_unit + | Ok _, Error _ | Error _, Ok _ -> fail () + in + let compare_result ~finalization (file, key) left_result right_result = + let finalization = if finalization then "(finalization) " else "" in + compare_tzresult + finalization + (fun fmt () -> Format.fprintf fmt "reading key %s/%d" file key) + (fun fmt b -> Format.fprintf fmt "%s" (Bytes.to_string b)) + left_result + right_result + in + let rec run_actions action next_actions promises_running_seq = + Format.eprintf "RUN: %a@." pp_action action ; + let value_of_key ~default file key = + let key = (file, key) in + let table = if default then values else overwritten in + Stdlib.Hashtbl.find table key + in + let promise = + match action with + | Write_value {override; default; key = file, key} -> + let value = value_of_key ~default file key |> Bytes.of_string in + let left_promise = + let* r = L.write_value ~override left file_layout file key value in + Format.eprintf "AFTER: %a@." pp_action action ; + return r + in + let right_promise = + R.write_value ~override right file_layout file key value + in + tzjoin [left_promise; right_promise] + | Read_value (file, key) -> + let left_promise = L.read_value left file_layout file key in + let right_promise = R.read_value right file_layout file key in + let*! left_result = left_promise in + Format.eprintf "AFTER: %a@." pp_action action ; + let*! 
right_result = right_promise in + compare_result + ~finalization:false + (file, key) + left_result + right_result + | Read_values seq -> + let left_promise = + let seq_s = L.read_values left file_layout (List.to_seq seq) in + Seq_s.E.iter (fun _ -> Ok ()) seq_s + in + let left_promise = + let* promise = left_promise in + Format.eprintf "AFTER: %a@." pp_action action ; + return promise + in + let right_promise = + let seq_s = R.read_values right file_layout (List.to_seq seq) in + Seq_s.E.iter (fun _ -> Ok ()) seq_s + in + tzjoin [left_promise; right_promise] + | Remove_file file -> + let left_promise = L.remove_file left file_layout file in + let left_promise = + let* promise = left_promise in + Format.eprintf "AFTER: %a@." pp_action action ; + return promise + in + let right_promise = R.remove_file right file_layout file in + + tzjoin [left_promise; right_promise] + | Count_values file -> + let left_promise = L.count_values left file_layout file in + let left_promise = + let* promise = left_promise in + Format.eprintf "AFTER: %a@." pp_action action ; + return promise + in + let right_promise = R.count_values right file_layout file in + let*! left_result = left_promise in + let*! right_result = right_promise in + compare_tzresult + "" + (fun fmt () -> Format.fprintf fmt "counting values in file %s" file) + (fun fmt -> Format.fprintf fmt "%d") + left_result + right_result + in + let finalize () = + let* left = L.init ~lru_size:number_of_files ~root_dir in + let* () = + Seq.ES.iter + (fun (file, key) -> + let*! left_result = L.read_value left file_layout file key in + let*! right_result = R.read_value right file_layout file key in + compare_result + ~finalization:true + (file, key) + left_result + right_result) + (keys number_of_files number_of_keys_per_file |> Array.to_seq) + in + L.close left |> Lwt.map Result.ok + in + match next_actions with + | [] -> + Format.eprintf "LAST@." ; + let* () = promise in + let* () = + Seq_s.ES.iter + (function Ok () -> return_unit | Error err -> fail err) + promises_running_seq + in + let* _ = finalize () in + return (left, right) + | (Sequential, action) :: next_actions -> + Format.eprintf "WAIT S@." ; + let* () = promise in + let* () = + Seq_s.ES.iter + (function Ok () -> return_unit | Error err -> fail err) + promises_running_seq + in + let*! number_parallel_actions = Seq_s.length promises_running_seq in + (* The following checks ensure that tables are bounded. + However, the reason for the upper bound to be + [lru_size+number_parallel_actions+1] is not 100% clear to me. + + While testing, we can check that the bound + [lru_size+number_parallel_actions] is violated. + + The reason why we need to take into account + [number_parallel_actions] is quite clear. + + When starting a new parallel action, an entry is added in + both tables no matter what. Hence, if we start [n] parallel + actions, there will be [n] entries. + + It does not mean we will have [n] opened file descriptors, + this is guaranteed by the implementation and the LRU (hard + to check here though). + + The reason why a +1 is needed is probably that when + starting an action, it can trigger a closing action for + another file. By doing so, a new entry will be created for + this file in both tables. However, I would have expected + those entries to be cleaned up before the main action ends. + However, we can't call the [clean_up] function while + closing a file. So this is probably related. 
+ *) + let upper_bound = lru_size + number_parallel_actions + 1 in + if L.View.opened_files left <= upper_bound then + if + L.View.on_going_actions left + <= lru_size + number_parallel_actions + 1 + then run_actions action next_actions Seq_s.empty + else + failwith + "Expected size of on going action table to be at most %d. Got %d \ + (remaining actions: %d)." + upper_bound + (L.View.on_going_actions left) + (List.length next_actions + 1) + else + failwith + "Expected size of files table to be at most %d. Got %d (remaining \ + actions: %d)." + upper_bound + (L.View.opened_files left) + (List.length next_actions + 1) + | (Parallel, action) :: next_actions -> + Format.eprintf "WAIT P@." ; + (* We do not wait for promises to end and append them to the + list of promises on-going. *) + let promises_running_seq = Seq_s.cons_s promise promises_running_seq in + run_actions action next_actions promises_running_seq + in + let* result = run_actions action next_actions Seq_s.empty in + let*! _ = L.close left in + return result + +let pp fmt (scenario, parameters) = + Format.fprintf + fmt + "@.Parameters:@.%a@.@.Scenario:@.%a@.@." + pp_parameters + parameters + pp_scenario + scenario + +module SS = Set.Make (struct + type t = scenario * parameters + + let compare = compare +end) + +let set = ref SS.empty + +module SI = Set.Make (struct + type t = int + + let compare = compare +end) + +let count = ref SI.empty + +let printed = ref false + +let sequential_test = + let open Tezt_bam in + let property (scenario, parameters) = + let promise = run_scenario parameters scenario in + match Lwt_main.run promise with + | Ok _ -> Ok () + | Error err -> + Format.eprintf "#######################KO@." ; + Error (`Fail (Format.asprintf "%a" Error_monad.pp_print_trace err)) + in + Pbt.register + ~pp + ~expected_sampling_ratio:(-1.) + ~minimum_number_of_samples:0 + ~stop_after:(`Count 2_000) + ~title:"key-value store sequential writes/reads" + ~__FILE__ + ~tags:["kvs"; "sequential"] + ~on_sample:(fun sample -> Format.eprintf "SAMPLE@.%a@." pp sample) + ~gen:(Bam.Std.pair gen_scenario gen_parameters) + ~property + () diff --git a/tezt/tests/cloud/dal.ml b/tezt/tests/cloud/dal.ml index 8fc91c99e6defdecf1507c7c8a041101b40a2819..b1aa29d8b782bf44341e4efcc4985a5069471af5 100644 --- a/tezt/tests/cloud/dal.ml +++ b/tezt/tests/cloud/dal.ml @@ -530,6 +530,13 @@ module Cli = struct ~unset_long:"no-teztale" ~description:"Runs teztale" false + + let memtrace = + Clap.flag + ~section + ~set_long:"memtrace" + ~description:"Use memtrace on all the services" + false end type configuration = { @@ -1179,7 +1186,12 @@ let init_testnet cloud (configuration : configuration) teztale agent in let* () = Node.wait_for_ready node in let* () = add_source cloud agent ~job_name:"bootstrap" node dal_node in - let* () = Dal_node.run ~event_level:`Notice dal_node in + let* () = + Dal_node.Agent.run + ~memtrace:Cli.memtrace + ~event_level:`Notice + dal_node + in let client = Client.create ~endpoint:(Node node) () in let node_rpc_endpoint = Endpoint. 
@@ -1380,7 +1392,12 @@ let init_sandbox_and_activate_protocol cloud (configuration : configuration) ~bootstrap_profile:true dal_bootstrap_node in - let* () = Dal_node.run ~event_level:`Notice dal_bootstrap_node in + let* () = + Dal_node.Agent.run + ~memtrace:Cli.memtrace + ~event_level:`Notice + dal_bootstrap_node + in let* () = add_source cloud @@ -1442,7 +1459,9 @@ let init_baker cloud (configuration : configuration) ~bootstrap teztale account ~peers:[bootstrap.dal_node_p2p_endpoint] (* no need for peer *) dal_node in - let* () = Dal_node.run ~event_level:`Notice dal_node in + let* () = + Dal_node.Agent.run ~memtrace:Cli.memtrace ~event_level:`Notice dal_node + in Lwt.return dal_node in let* () = @@ -1554,7 +1573,9 @@ let init_producer cloud configuration ~bootstrap teztale ~number_of_slots (* We do not wait on the promise because loading the SRS takes some time. Instead we will publish commitments only once this promise is fulfilled. *) let () = toplog "Init producer: wait for DAL node to be ready" in - let is_ready = Dal_node.run ~event_level:`Notice dal_node in + let is_ready = + Dal_node.Agent.run ~memtrace:Cli.memtrace ~event_level:`Notice dal_node + in let () = toplog "Init producer: DAL node is ready" in let* () = match teztale with @@ -1602,7 +1623,9 @@ let init_observer cloud configuration ~bootstrap teztale ~slot_index i agent = node dal_node in - let* () = Dal_node.run ~event_level:`Notice dal_node in + let* () = + Dal_node.Agent.run ~memtrace:Cli.memtrace ~event_level:`Notice dal_node + in let* () = match teztale with | None -> Lwt.return_unit @@ -1683,7 +1706,7 @@ let init_etherlink_operator_setup cloud configuration name ~bootstrap ~dal_slots ~peers:[bootstrap.dal_node_p2p_endpoint] dal_node in - let* () = Dal_node.run dal_node in + let* () = Dal_node.Agent.run ~memtrace:Cli.memtrace dal_node in some dal_node in let* sc_rollup_node = @@ -2113,7 +2136,7 @@ let on_new_level t level = let baker_to_reconnect = (List.nth t.bakers (b mod nb_bakers)).dal_node in - Dal_node.run baker_to_reconnect) + Dal_node.Agent.run ~memtrace:Cli.memtrace baker_to_reconnect) in Lwt.return {t with disconnection_state = Some disconnection_state} diff --git a/tezt/tests/cloud/tezos.ml b/tezt/tests/cloud/tezos.ml index 266b5b9cadd57c10d2bf061792358f199c3ed1fa..1eed2e971b5fa2983883e4426d3a875a726f6cc9 100644 --- a/tezt/tests/cloud/tezos.ml +++ b/tezt/tests/cloud/tezos.ml @@ -67,6 +67,17 @@ module Dal_node = struct let listen_addr = Format.asprintf "0.0.0.0:%d" net_port in create ?name ~path ?runner ~rpc_port ~metrics_addr ~listen_addr ~node () |> Lwt.return + + let run ?(memtrace = false) ?event_level dal_node = + let name = name dal_node in + let filename = + Format.asprintf "%s/%s-trace.ctf" (Filename.get_temp_dir_name ()) name + in + let env = + if memtrace then Some (String_map.singleton "MEMTRACE" filename) + else None + in + run ?env ?event_level dal_node end end