From 47066337b231ea79506780add4e4f1eaa1378e8b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Thir=C3=A9?= Date: Thu, 26 Sep 2024 14:37:46 +0200 Subject: [PATCH 1/9] Tezt/Cloud: Add support for memtrace on the DAL node side --- tezt/tests/cloud/tezos.ml | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/tezt/tests/cloud/tezos.ml b/tezt/tests/cloud/tezos.ml index 266b5b9cadd5..ff1e4cca368b 100644 --- a/tezt/tests/cloud/tezos.ml +++ b/tezt/tests/cloud/tezos.ml @@ -67,6 +67,17 @@ module Dal_node = struct let listen_addr = Format.asprintf "0.0.0.0:%d" net_port in create ?name ~path ?runner ~rpc_port ~metrics_addr ~listen_addr ~node () |> Lwt.return + + let run ?(memtrace = false) ?event_level dal_node = + let name = name dal_node in + let filename = + Format.asprintf "%s/%s-trace.ctf" (Filename.get_temp_dir_name ()) name + in + let env = + if memtrace then None + else Some (String_map.singleton "MEMTRACE" filename) + in + run ?env ?event_level dal_node end end -- GitLab From ef3c1b9807cadea4579eae35d9e058fdccf55f83 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Thir=C3=A9?= Date: Thu, 26 Sep 2024 14:38:03 +0200 Subject: [PATCH 2/9] Tezt/Cloud: Add a memtrace CLI option --- tezt/tests/cloud/dal.ml | 37 ++++++++++++++++++++++++++++++------- 1 file changed, 30 insertions(+), 7 deletions(-) diff --git a/tezt/tests/cloud/dal.ml b/tezt/tests/cloud/dal.ml index 8fc91c99e6de..b1aa29d8b782 100644 --- a/tezt/tests/cloud/dal.ml +++ b/tezt/tests/cloud/dal.ml @@ -530,6 +530,13 @@ module Cli = struct ~unset_long:"no-teztale" ~description:"Runs teztale" false + + let memtrace = + Clap.flag + ~section + ~set_long:"memtrace" + ~description:"Use memtrace on all the services" + false end type configuration = { @@ -1179,7 +1186,12 @@ let init_testnet cloud (configuration : configuration) teztale agent in let* () = Node.wait_for_ready node in let* () = add_source cloud agent ~job_name:"bootstrap" node dal_node in - let* () = Dal_node.run ~event_level:`Notice 
dal_node in + let* () = + Dal_node.Agent.run + ~memtrace:Cli.memtrace + ~event_level:`Notice + dal_node + in let client = Client.create ~endpoint:(Node node) () in let node_rpc_endpoint = Endpoint. @@ -1380,7 +1392,12 @@ let init_sandbox_and_activate_protocol cloud (configuration : configuration) ~bootstrap_profile:true dal_bootstrap_node in - let* () = Dal_node.run ~event_level:`Notice dal_bootstrap_node in + let* () = + Dal_node.Agent.run + ~memtrace:Cli.memtrace + ~event_level:`Notice + dal_bootstrap_node + in let* () = add_source cloud @@ -1442,7 +1459,9 @@ let init_baker cloud (configuration : configuration) ~bootstrap teztale account ~peers:[bootstrap.dal_node_p2p_endpoint] (* no need for peer *) dal_node in - let* () = Dal_node.run ~event_level:`Notice dal_node in + let* () = + Dal_node.Agent.run ~memtrace:Cli.memtrace ~event_level:`Notice dal_node + in Lwt.return dal_node in let* () = @@ -1554,7 +1573,9 @@ let init_producer cloud configuration ~bootstrap teztale ~number_of_slots (* We do not wait on the promise because loading the SRS takes some time. Instead we will publish commitments only once this promise is fulfilled. 
*) let () = toplog "Init producer: wait for DAL node to be ready" in - let is_ready = Dal_node.run ~event_level:`Notice dal_node in + let is_ready = + Dal_node.Agent.run ~memtrace:Cli.memtrace ~event_level:`Notice dal_node + in let () = toplog "Init producer: DAL node is ready" in let* () = match teztale with @@ -1602,7 +1623,9 @@ let init_observer cloud configuration ~bootstrap teztale ~slot_index i agent = node dal_node in - let* () = Dal_node.run ~event_level:`Notice dal_node in + let* () = + Dal_node.Agent.run ~memtrace:Cli.memtrace ~event_level:`Notice dal_node + in let* () = match teztale with | None -> Lwt.return_unit @@ -1683,7 +1706,7 @@ let init_etherlink_operator_setup cloud configuration name ~bootstrap ~dal_slots ~peers:[bootstrap.dal_node_p2p_endpoint] dal_node in - let* () = Dal_node.run dal_node in + let* () = Dal_node.Agent.run ~memtrace:Cli.memtrace dal_node in some dal_node in let* sc_rollup_node = @@ -2113,7 +2136,7 @@ let on_new_level t level = let baker_to_reconnect = (List.nth t.bakers (b mod nb_bakers)).dal_node in - Dal_node.run baker_to_reconnect) + Dal_node.Agent.run ~memtrace:Cli.memtrace baker_to_reconnect) in Lwt.return {t with disconnection_state = Some disconnection_state} -- GitLab From d267aa08cf98cf0c8cb9c2b60fb12a69dd36ae15 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Thir=C3=A9?= Date: Thu, 26 Sep 2024 01:45:54 +0200 Subject: [PATCH 3/9] KVS: Add a view counting the number of "opened" files --- src/lib_stdlib_unix/key_value_store.ml | 12 ++++++++++++ src/lib_stdlib_unix/key_value_store.mli | 8 ++++++++ 2 files changed, 20 insertions(+) diff --git a/src/lib_stdlib_unix/key_value_store.ml b/src/lib_stdlib_unix/key_value_store.ml index 5376e445a17f..7d80958710bf 100644 --- a/src/lib_stdlib_unix/key_value_store.ml +++ b/src/lib_stdlib_unix/key_value_store.ml @@ -227,6 +227,10 @@ module Files : sig val count_values : 'value t -> ('key, 'value) layout -> int tzresult Lwt.t val remove : 'value t -> ('key, 'value) layout -> 
unit tzresult Lwt.t + + module View : sig + val opened_files : 'value t -> int + end end = struct module LRU = Ringo.LRU_Collection @@ -346,6 +350,10 @@ end = struct The store ensures that actions performed on a given file are done sequentially. *) + module View = struct + let opened_files {files; _} = Table.length files + end + let init ~lru_size = (* FIXME https://gitlab.com/tezos/tezos/-/issues/6774 @@ -1066,6 +1074,10 @@ let remove_file {files; root_dir; _} file_layout file = let layout = file_layout ~root_dir file in Files.remove files layout +module View = struct + let opened_files {files; _} = Files.View.opened_files files +end + module Internal_for_tests = struct let init ?(lockfile_prefix = "internal_for_tests") ~lru_size ~root_dir () = let open Lwt_result_syntax in diff --git a/src/lib_stdlib_unix/key_value_store.mli b/src/lib_stdlib_unix/key_value_store.mli index 4c88622b63dc..60b8bf3c5c59 100644 --- a/src/lib_stdlib_unix/key_value_store.mli +++ b/src/lib_stdlib_unix/key_value_store.mli @@ -208,6 +208,14 @@ val count_values : 'file -> int tzresult Lwt.t +module View : sig + (** Returns the number of files currently opened by the key value + store. Do note this number is an upper bound on the number of + file descriptors opened. 
+ *) + val opened_files : ('file, 'key, 'value) t -> int +end + module Internal_for_tests : sig (** Same as {!init} above, except that the user can specify a prefix for the lock file (default is lockfile_prefix = "internal_for_tests") to avoid issues -- GitLab From 8260ed223535d1c9f57f062e71d0bb2267703ccb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Thir=C3=A9?= Date: Thu, 26 Sep 2024 01:46:06 +0200 Subject: [PATCH 4/9] KVS/Test: Ensure the number of opened files is valid --- .../test/test_key_value_store_fuzzy.ml | 67 +++++++++---------- 1 file changed, 32 insertions(+), 35 deletions(-) diff --git a/src/lib_stdlib_unix/test/test_key_value_store_fuzzy.ml b/src/lib_stdlib_unix/test/test_key_value_store_fuzzy.ml index ed193cba24d6..56c56d79aff0 100644 --- a/src/lib_stdlib_unix/test/test_key_value_store_fuzzy.ml +++ b/src/lib_stdlib_unix/test/test_key_value_store_fuzzy.ml @@ -96,6 +96,10 @@ module type S = sig ('file, 'key, 'value) Key_value_store.file_layout -> 'file -> int tzresult Lwt.t + + module View : sig + val opened_files : ('file, 'key, 'value) t -> int + end end let value_size = 1 @@ -142,6 +146,12 @@ module R : S = struct (fun (file', _) _ count -> if file = file' then count + 1 else count) t 0 + + module View = struct + let opened_files table = + table |> Stdlib.Hashtbl.to_seq_keys |> Seq.map fst |> List.of_seq + |> List.sort_uniq compare |> List.length + end end module Helpers = struct @@ -374,46 +384,26 @@ module Helpers = struct let pp_scenario fmt (action, next_actions) = let rec pp shift action fmt next_actions = - let shift_str = "|| " in - if shift then Format.fprintf fmt "%s " shift_str ; match next_actions with | [] -> Format.fprintf fmt "%a@." 
pp_action action ; if shift then Format.fprintf fmt "Wait" | (Parallel, next_action) :: actions -> - if shift then - Format.fprintf - fmt - "%a@.%a" - pp_action - action - (pp true next_action) - actions - else - Format.fprintf - fmt - "%a@.Wait@.%a" - pp_action - action - (pp true next_action) - actions + Format.fprintf + fmt + "P %a@.%a" + pp_action + action + (pp true next_action) + actions | (Sequential, next_action) :: actions -> - if shift then - Format.fprintf - fmt - "Wait@.%a@.%a@." - pp_action - action - (pp false next_action) - actions - else - Format.fprintf - fmt - "%a@.%a@." - pp_action - action - (pp false next_action) - actions + Format.fprintf + fmt + "S %a@.%a@." + pp_action + action + (pp false next_action) + actions in Format.fprintf fmt "%a" (pp false action) next_actions end @@ -578,7 +568,14 @@ let run_scenario (function Ok () -> return_unit | Error err -> fail err) promises_running_seq in - run_actions action next_actions Seq_s.empty + if L.View.opened_files left = min lru_size (R.View.opened_files right) + then run_actions action next_actions Seq_s.empty + else + failwith + "Expected size of files table to be %d. Got %d (lru size is: %d)." + (R.View.opened_files right) + (L.View.opened_files left) + lru_size | (Parallel, action) :: next_actions -> (* We do not wait for promises to end and append them to the list of promises on-going. 
*) -- GitLab From 598f0eba684f7f54219171e15dbb8a7a399a5e41 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Thir=C3=A9?= Date: Mon, 30 Sep 2024 16:57:37 +0200 Subject: [PATCH 5/9] [with debug logs] --- manifest/externals.ml | 4 + manifest/product_octez.ml | 3 + opam/octez-libs.opam | 2 + opam/virtual/octez-deps.opam | 2 + src/lib_stdlib_unix/key_value_store.ml | 80 ++- src/lib_stdlib_unix/key_value_store.mli | 2 + src/lib_stdlib_unix/test/dune | 5 +- .../test/test_key_value_store_fuzzy.ml | 34 +- .../test/test_key_value_store_fuzzy_bam.ml | 610 ++++++++++++++++++ tezt/tests/cloud/tezos.ml | 4 +- 10 files changed, 732 insertions(+), 14 deletions(-) create mode 100644 src/lib_stdlib_unix/test/test_key_value_store_fuzzy_bam.ml diff --git a/manifest/externals.ml b/manifest/externals.ml index f2b1154c0e01..15d67b0df32e 100644 --- a/manifest/externals.ml +++ b/manifest/externals.ml @@ -38,6 +38,8 @@ let asetmap = external_lib "asetmap" V.(at_least "0.8.1") let astring = external_lib "astring" V.True +let bam_ppx = external_lib "bam-ppx" V.True + let bheap = external_lib "bheap" V.(at_least "2.0.0") let bigarray_compat = external_lib "bigarray-compat" V.True @@ -287,6 +289,8 @@ let crowbar = external_lib "crowbar" V.(at_least "0.2") let ppx_hash = external_lib "ppx_hash" V.True +let tezt_bam = external_lib "tezt-bam" V.True + let tezt_lib = external_lib "tezt" diff --git a/manifest/product_octez.ml b/manifest/product_octez.ml index b0cc0d4b8525..64fbe08e72cf 100644 --- a/manifest/product_octez.ml +++ b/manifest/product_octez.ml @@ -1438,10 +1438,12 @@ let _octez_stdlib_unix_test = [ "test_key_value_store"; "test_key_value_store_fuzzy"; + "test_key_value_store_fuzzy_bam"; "test_log_config_rules"; ] ~path:"src/lib_stdlib_unix/test/" ~opam:"octez-libs" + ~preprocess:(pps bam_ppx) ~deps: [ octez_error_monad |> open_ |> open_ ~m:"TzLwtreslib"; @@ -1450,6 +1452,7 @@ let _octez_stdlib_unix_test = octez_test_helpers |> open_; qcheck_alcotest; alcotezt; + tezt_bam; ] let 
octez_dal_config = diff --git a/opam/octez-libs.opam b/opam/octez-libs.opam index a43b93dccb61..620f2533f8fe 100644 --- a/opam/octez-libs.opam +++ b/opam/octez-libs.opam @@ -82,6 +82,8 @@ depends: [ "octez-rust-deps" { = version } "lwt-watcher" { = "0.2" } "bigstring" {with-test} + "bam-ppx" {with-test} + "tezt-bam" {with-test} ] x-opam-monorepo-opam-provided: [ "tezos-sapling-parameters" diff --git a/opam/virtual/octez-deps.opam b/opam/virtual/octez-deps.opam index c749f7bed0f4..a9945f8aa584 100644 --- a/opam/virtual/octez-deps.opam +++ b/opam/virtual/octez-deps.opam @@ -15,6 +15,7 @@ depends: [ "alcotest-lwt" { >= "1.5.0" } "asetmap" { >= "0.8.1" } "astring" + "bam-ppx" "base-unix" "bheap" { >= "2.0.0" } "bigarray-compat" @@ -104,6 +105,7 @@ depends: [ "tar-unix" { >= "2.0.1" & < "3.0.0" } "tezos-sapling-parameters" { >= "1.1.0" } "tezt" { >= "4.1.0" & < "5.0.0" } + "tezt-bam" "tls-lwt" { >= "0.16.0" } "uri" { >= "3.1.0" } "uutf" diff --git a/src/lib_stdlib_unix/key_value_store.ml b/src/lib_stdlib_unix/key_value_store.ml index 7d80958710bf..df9841851844 100644 --- a/src/lib_stdlib_unix/key_value_store.ml +++ b/src/lib_stdlib_unix/key_value_store.ml @@ -230,6 +230,8 @@ module Files : sig module View : sig val opened_files : 'value t -> int + + val on_going_actions : 'value t -> int end end = struct module LRU = Ringo.LRU_Collection @@ -352,6 +354,8 @@ end = struct module View = struct let opened_files {files; _} = Table.length files + + let on_going_actions {last_actions; _} = Table.length last_actions end let init ~lru_size = @@ -371,26 +375,38 @@ end = struct current action is completed. The promise returns the opened file associated to the action if it exists once the action is completed. *) - let wait_last_action = + let wait_last_action filepath = let open Lwt_syntax in function | Read p -> + Format.eprintf "WAIT READ:%s @." filepath ; let* file, _ = p in + Format.eprintf "END READ:%s @." filepath ; return file | Close p -> + Format.eprintf "WAIT CLOSE:%s @." 
filepath ; let* () = p in + Format.eprintf "END CLOSE:%s @." filepath ; return_none | Write p -> + Format.eprintf "WAIT WRITE:%s @." filepath ; let* file, _ = p in + Format.eprintf "END WRITE:%s @." filepath ; return_some file | Value_exists p -> + Format.eprintf "WAIT VALUE EXISTS:%s @." filepath ; let* file, _ = p in + Format.eprintf "END VALUE EXISTS:%s @." filepath ; return file | Count_values p -> + Format.eprintf "WAIT COUNT VALUES:%s @." filepath ; let* file, _ = p in + Format.eprintf "END COUNT VALUES:%s @." filepath ; return file | Remove p -> + Format.eprintf "WAIT REMOVE:%s @." filepath ; let* () = p in + Format.eprintf "END REMOVE:%s @." filepath ; return_none (* This function is the only one that calls [Lwt_unix.close]. *) @@ -530,18 +546,25 @@ end = struct (* If an action is happening concurrently on the file, we wait for it to end. The action returns the opened file if any. *) + Format.eprintf "GET FILE FROM LAST ACTION@." ; match last_or_concurrent_action with | None -> ( let file_cached = Table.find_opt files filepath in match file_cached with - | None -> Lwt.return_none + | None -> + Format.eprintf "NO FILE CACHED@." ; + Lwt.return_none | Some (Closing p) -> + Format.eprintf "FILE CACHED IS CLOSING@." ; let* () = p in + Format.eprintf "FILE CACHED IS CLOSED@." ; Lwt.return_none | Some (Opening p) -> + Format.eprintf "FILE CACHED IS OPENING@." ; let* opened_file = p in + Format.eprintf "FILE CACHED IS OPENED@." ; Lwt.return_some opened_file) - | Some action -> wait_last_action action + | Some action -> wait_last_action filepath action (* Any action on the key value store can be implemented in this way. *) let generic_action files last_actions filepath ~on_file_closed @@ -622,6 +645,7 @@ end = struct end let close_file files last_actions filepath = + Format.eprintf "CLOSE FILE: %s@." filepath ; (* Since this function does not aim to be exposed, we do not check whether the store is closed. 
This would actually be a mistake since it is used while the store is closing. @@ -674,6 +698,16 @@ end = struct (fun filename _ -> close_file files last_actions filename) files in + Table.iter + (fun _ value -> + match value with + | Read p -> Lwt.cancel p + | Close p -> Lwt.cancel p + | Write p -> Lwt.cancel p + | Value_exists p -> Lwt.cancel p + | Count_values p -> Lwt.cancel p + | Remove p -> Lwt.cancel p) + last_actions ; LRU.clear lru ; return_unit) @@ -689,13 +723,17 @@ end = struct is bounded by the size of the LRU. This is why we wait first for the eviction promise to be fulfilled that will close the file evicted. *) + LRU.remove lru lru_node ; let* () = close_file files last_actions filepath in + let lru_node = LRU.add lru filename in + Format.eprintf "CLOSED FILE: %s@." filepath ; return lru_node (* This function aims to be used when the file already exists on the file system. *) let load_file files last_actions lru filename = let open Lwt_syntax in + Format.eprintf "LOAD FILE: %s@." filename ; let* lru_node = add_lru files last_actions lru filename in let* fd = Lwt_unix.openfile filename [O_RDWR; O_CLOEXEC] 0o660 in (* TODO: https://gitlab.com/tezos/tezos/-/issues/6033 @@ -720,6 +758,7 @@ end = struct a file that does not exist yet. *) let initialize_file files last_actions lru layout = let open Lwt_syntax in + Format.eprintf "INITIALIZE FILE: %s@." 
layout.filepath ; let* lru_node = add_lru files last_actions lru layout.filepath in let* fd = Lwt_unix.openfile @@ -780,7 +819,16 @@ end = struct in let p = Action.read ~on_file_closed files last_actions layout key in Table.replace last_actions layout.filepath (Read p) ; - let+ _file, value = p in + let+ is_opened, value = p in + (match is_opened with + | None -> ( + match Table.find_opt last_actions layout.filepath with + | Some (Read p) -> ( + match Lwt.state p with + | Lwt.Return _ -> Table.remove last_actions layout.filepath + | _ -> ()) + | _ -> ()) + | _ -> ()) ; value (* Very similar to [read] action except we only look at the bitset @@ -818,7 +866,9 @@ end = struct let on_file_closed ~on_file_opened = let* r = may_load_file files last_actions lru layout.filepath in match r with - | None -> return (None, Ok 0) + | None -> + (* Table.remove files layout.filepath ; *) + return (None, Ok 0) | Some opened_file_promise -> Table.replace files layout.filepath (Opening opened_file_promise) ; let* opened_file = opened_file_promise in @@ -826,7 +876,16 @@ end = struct in let p = Action.count_values ~on_file_closed files last_actions layout in Table.replace last_actions layout.filepath (Count_values p) ; - let+ _, count = p in + let+ is_opened, count = p in + (match is_opened with + | None -> ( + match Table.find_opt last_actions layout.filepath with + | Some (Count_values p) -> ( + match Lwt.state p with + | Lwt.Return _ -> Table.remove last_actions layout.filepath + | _ -> ()) + | _ -> ()) + | _ -> ()) ; count let write ?(override = false) {files; last_actions; lru; closed} layout key @@ -862,7 +921,10 @@ end = struct if !closed then Lwt.return (Error (Error_monad.TzTrace.make (Closed {action = "remove"}))) else - let on_file_closed ~on_file_opened:_ = may_remove_file layout.filepath in + let on_file_closed ~on_file_opened:_ = + let+ () = may_remove_file layout.filepath in + Table.remove files layout.filepath + in let p = Action.remove_file ~on_file_closed @@ -876,7 
+938,7 @@ end = struct let* () = p in (* See [close_file] for an explanation of the lines below. *) (match Table.find_opt last_actions layout.filepath with - | Some (Close p) -> ( + | Some (Close p) | Some (Remove p) -> ( match Lwt.state p with | Lwt.Return _ -> Table.remove last_actions layout.filepath | _ -> ()) @@ -1076,6 +1138,8 @@ let remove_file {files; root_dir; _} file_layout file = module View = struct let opened_files {files; _} = Files.View.opened_files files + + let on_going_actions {files; _} = Files.View.on_going_actions files end module Internal_for_tests = struct diff --git a/src/lib_stdlib_unix/key_value_store.mli b/src/lib_stdlib_unix/key_value_store.mli index 60b8bf3c5c59..68851e170196 100644 --- a/src/lib_stdlib_unix/key_value_store.mli +++ b/src/lib_stdlib_unix/key_value_store.mli @@ -214,6 +214,8 @@ module View : sig file descriptors opened. *) val opened_files : ('file, 'key, 'value) t -> int + + val on_going_actions : ('file, 'key, 'value) t -> int end module Internal_for_tests : sig diff --git a/src/lib_stdlib_unix/test/dune b/src/lib_stdlib_unix/test/dune index 274016a3cece..22cd78905522 100644 --- a/src/lib_stdlib_unix/test/dune +++ b/src/lib_stdlib_unix/test/dune @@ -11,7 +11,9 @@ octez-libs.event-logging octez-libs.test-helpers qcheck-alcotest - octez-alcotezt) + octez-alcotezt + tezt-bam) + (preprocess (pps bam-ppx)) (library_flags (:standard -linkall)) (flags (:standard) @@ -26,6 +28,7 @@ (modules test_key_value_store test_key_value_store_fuzzy + test_key_value_store_fuzzy_bam test_log_config_rules)) (executable diff --git a/src/lib_stdlib_unix/test/test_key_value_store_fuzzy.ml b/src/lib_stdlib_unix/test/test_key_value_store_fuzzy.ml index 56c56d79aff0..b8598f2e7279 100644 --- a/src/lib_stdlib_unix/test/test_key_value_store_fuzzy.ml +++ b/src/lib_stdlib_unix/test/test_key_value_store_fuzzy.ml @@ -99,6 +99,8 @@ module type S = sig module View : sig val opened_files : ('file, 'key, 'value) t -> int + + val on_going_actions : ('file, 
'key, 'value) t -> int end end @@ -151,6 +153,8 @@ module R : S = struct let opened_files table = table |> Stdlib.Hashtbl.to_seq_keys |> Seq.map fst |> List.of_seq |> List.sort_uniq compare |> List.length + + let on_going_actions = opened_files end end @@ -430,6 +434,7 @@ let run_scenario |> Filename.concat "tezos-pbt-tests" |> Filename.concat tmp_dir in + Unix.system @@ Format.asprintf "rm -rf %s" root_dir |> ignore ; let file_layout ~root_dir file = let filepath = Filename.concat root_dir file in Key_value_store.layout @@ -569,7 +574,29 @@ let run_scenario promises_running_seq in if L.View.opened_files left = min lru_size (R.View.opened_files right) - then run_actions action next_actions Seq_s.empty + then + if + L.View.on_going_actions left + <= min lru_size (R.View.on_going_actions right) + + L.View.opened_files left + then run_actions action next_actions Seq_s.empty + else ( + Format.eprintf + "################ Expected on going actions table to be %d. Got \ + %d (lru size is: %d, of: %d, remaining: %d).@.%a@." + (R.View.on_going_actions right) + (L.View.on_going_actions left) + lru_size + (L.View.opened_files left) + (List.length next_actions + 1) + pp_scenario + scenario ; + failwith + "Expected on going actions table to be %d. Got %d (lru size is: \ + %d)." + (R.View.on_going_actions right) + (L.View.on_going_actions left) + lru_size) else failwith "Expected size of files table to be %d. Got %d (lru size is: %d)." @@ -611,6 +638,7 @@ let sequential_test = ~retries:1 test_gen (fun (parameters, scenario) -> + Format.eprintf "@@@@@@@@.%a@." pp_scenario scenario ; let promise = let* _ = run_scenario parameters scenario in return_true @@ -633,8 +661,8 @@ let parallel_test = ~print ~name:"key-value store concurrent writes/reads" ~count:10_000 - ~max_fail:1 (* to stop shrinking after [max_fail] failures. *) - ~retries:1 + ~max_fail:(-1) (* to stop shrinking after [max_fail] failures. 
*) + ~retries:0 test_gen (fun (parameters, scenario) -> let promise = diff --git a/src/lib_stdlib_unix/test/test_key_value_store_fuzzy_bam.ml b/src/lib_stdlib_unix/test/test_key_value_store_fuzzy_bam.ml new file mode 100644 index 000000000000..20887b2de7cd --- /dev/null +++ b/src/lib_stdlib_unix/test/test_key_value_store_fuzzy_bam.ml @@ -0,0 +1,610 @@ +(*****************************************************************************) +(* *) +(* Open Source License *) +(* Copyright (c) 2023 Nomadic Labs, *) +(* *) +(* Permission is hereby granted, free of charge, to any person obtaining a *) +(* copy of this software and associated documentation files (the "Software"),*) +(* to deal in the Software without restriction, including without limitation *) +(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *) +(* and/or sell copies of the Software, and to permit persons to whom the *) +(* Software is furnished to do so, subject to the following conditions: *) +(* *) +(* The above copyright notice and this permission notice shall be included *) +(* in all copies or substantial portions of the Software. *) +(* *) +(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*) +(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *) +(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *) +(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*) +(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *) +(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *) +(* DEALINGS IN THE SOFTWARE. 
*) +(* *) +(*****************************************************************************) + +(* Testing + ------- + Component: Key-value store + Invocation: dune exec src/lib_stdlib_unix/test/main.exe \ + -- --file test_key_value_store_fuzzy_bam.ml + Subject: Test the key-value store +*) + +open Error_monad + +(* This test file checks the correctness of the key-value store + (module [L]) with respect to the interface [S] using a reference + implementation (see module [R]) which is obviously correct. + + The main property tested is that the implementation agrees with the + reference implementation in a sequential and concurrent + setting. Because the reference implementation does not do I/Os, the + property means that the key-value store is consistent with the + order of the operations (ex: If for a given key, we write the value + 1, and then the value 2, any subsequent read will return the value + 2 for this key). Note that this property is not trivial if the + writes are processed concurrently and actually is false if the + function [write_values] is used (this is a reason why we do not + expose it in the interface of [S]). + + This property is tested on random scenarios, where a scenario is roughly a + list of actions and two consecutive actions can be bound either sequentially + or in parallel. + + We check that both implementations return the same results on the generated + scenarios. 
*) + +module type S = sig + type ('file, 'key, 'value) t + + val init : + lru_size:int -> root_dir:string -> ('file, 'key, 'value) t tzresult Lwt.t + + val close : ('file, 'key, 'value) t -> unit tzresult Lwt.t + + val write_value : + ?override:bool -> + ('file, 'key, 'value) t -> + ('file, 'key, 'value) Key_value_store.file_layout -> + 'file -> + 'key -> + 'value -> + unit tzresult Lwt.t + + val read_value : + ('file, 'key, 'value) t -> + ('file, 'key, 'value) Key_value_store.file_layout -> + 'file -> + 'key -> + 'value tzresult Lwt.t + + val read_values : + ('file, 'key, 'value) t -> + ('file, 'key, 'value) Key_value_store.file_layout -> + ('file * 'key) Seq.t -> + ('file * 'key * 'value tzresult) Seq_s.t + + val remove_file : + ('file, 'key, 'value) t -> + ('file, 'key, 'value) Key_value_store.file_layout -> + 'file -> + unit tzresult Lwt.t + + val count_values : + ('file, 'key, 'value) t -> + ('file, 'key, 'value) Key_value_store.file_layout -> + 'file -> + int tzresult Lwt.t + + module View : sig + val opened_files : ('file, 'key, 'value) t -> int + + val on_going_actions : ('file, 'key, 'value) t -> int + end +end + +module L : S = Key_value_store + +module R : S = struct + type ('file, 'key, 'value) t = ('file * 'key, 'value) Stdlib.Hashtbl.t + + let init ~lru_size:_ ~root_dir:_ = Lwt.return_ok @@ Stdlib.Hashtbl.create 100 + + let close _ = Lwt_syntax.return_ok_unit + + let write_value ?(override = false) t _file_layout file key value = + let open Lwt_result_syntax in + if override || not (Stdlib.Hashtbl.mem t (file, key)) then ( + Stdlib.Hashtbl.replace t (file, key) value ; + return_unit) + else return_unit + + let read_value t _file_layout file key = + let key = (file, key) in + let open Lwt_result_syntax in + match Stdlib.Hashtbl.find_opt t key with + | None -> failwith "key not found" + | Some key -> return key + + let read_values t file_layout seq = + let open Lwt_syntax in + seq |> Seq_s.of_seq + |> Seq_s.S.map (fun (file, key) -> + let* value = 
read_value t file_layout file key in + Lwt.return (file, key, value)) + + let remove_file t _file_layout file = + Stdlib.Hashtbl.filter_map_inplace + (fun (file', _) value -> if file = file' then None else Some value) + t ; + Lwt.return (Ok ()) + + let count_values t _file_layout file = + Lwt_result_syntax.return + @@ Stdlib.Hashtbl.fold + (fun (file', _) _ count -> if file = file' then count + 1 else count) + t + 0 + + module View = struct + let opened_files table = + table |> Stdlib.Hashtbl.to_seq_keys |> Seq.map fst |> List.of_seq + |> List.sort_uniq compare |> List.length + + let on_going_actions = opened_files + end +end + +module Helpers = struct + open Bam.Std + open Bam.Std.Syntax + + let number_of_files_max = 3 + + let key_max = 4 + + let value_size = 1 + + let make_file id = Printf.sprintf "file_%d" id + + type filename = string + + let gen_filename = + let* n = int ~min:0 ~max:(number_of_files_max - 1) () in + return (make_file n) + + type key = filename * int + + let gen_key = + let* filename = gen_filename in + let* key = int ~min:0 ~max:(key_max - 1) () in + return (filename, key) + + type value = String.t + + let gen_value = string ~size:(int ~min:1 ~max:1 ()) () + + type write_payload = {key : key; override : bool; default : bool} + [@@deriving gen] + + let pp_write_payload fmt {key = file, key; override; default} = + Format.fprintf + fmt + "[key=%s/%d, override=%b, default=%b]" + file + key + override + default + + type action = + | Count_values of filename + | Read_value of key + | Write_value of write_payload + | Remove_file of filename + | Read_values of key list [@max 5] + [@@deriving gen] + + let pp_action fmt = function + | Write_value payload -> Format.fprintf fmt "W%a" pp_write_payload payload + | Read_value (file, key) -> Format.fprintf fmt "R[key=%s/%d]" file key + | Read_values keys -> + let str_keys = + String.concat + "; " + (keys + |> List.map (fun (file, key) -> Printf.sprintf "key=%s/%d" file key) + ) + in + Format.fprintf fmt 
"R[%s]" str_keys + | Remove_file file -> Format.fprintf fmt "REMOVE[file=%s]" file + | Count_values file -> Format.fprintf fmt "COUNT[file=%s]" file + + type bind = Sequential | Parallel [@@deriving gen] + + let values_gen = + let open Bam.Std in + let open Bam.Std.Syntax in + let key_gen = int ~min:0 ~max:(key_max - 1) () in + let value_gen = string ~size:(int ~min:0 ~max:1 ()) () in + let* bindings = + list + ~size:(int ~min:0 ~max:4 ()) + (pair (pair gen_filename key_gen) value_gen) + in + return (bindings |> List.to_seq |> Stdlib.Hashtbl.of_seq) + + type parameters = { + number_of_files : int; + number_of_keys_per_file : int; + read_values_seq_size : int; + lru_size : int; + value_size : int; (* in bytes *) + values : (key, value) Stdlib.Hashtbl.t; + overwritten : (key, value) Stdlib.Hashtbl.t; + } + + let gen_parameters = + let* number_of_files = + int ~min:number_of_files_max ~max:number_of_files_max () + in + let* number_of_keys_per_file = int ~min:key_max ~max:key_max () in + let* read_values_seq_size = int ~min:1 ~max:key_max () in + let lru_size = max 0 (number_of_files - 2) in + let all_keys = + Stdlib.List.init number_of_files (fun file -> + Stdlib.List.init number_of_keys_per_file (fun key -> + (make_file file, key))) + |> List.flatten + in + let values_gen = + let* values = + list + ~size:(int ~min:(List.length all_keys) ~max:(List.length all_keys) ()) + gen_value + in + let bindings = Stdlib.List.combine all_keys values in + return (bindings |> List.to_seq |> Stdlib.Hashtbl.of_seq) + in + let* values = values_gen in + let* overwritten = values_gen in + return + { + number_of_files; + number_of_keys_per_file; + read_values_seq_size; + lru_size; + value_size; + values; + overwritten; + } + + let keys files_max keys_max = + Stdlib.List.init (files_max - 1) (fun file -> + Stdlib.List.init (keys_max - 1) (fun key -> (make_file file, key))) + |> List.flatten |> Array.of_list + + let pp_parameters fmt + { + number_of_files; + number_of_keys_per_file; + 
read_values_seq_size; + lru_size; + value_size; + values; + overwritten; + } = + let string_of_values values = + values |> Stdlib.Hashtbl.to_seq |> List.of_seq + |> List.map (fun ((file, key), value) -> + Format.asprintf "[key=%s/%d,value=%s]" file key value) + |> String.concat " " + in + Format.fprintf fmt "number of files = %d@." number_of_files ; + Format.fprintf fmt "number of keys per file = %d@." number_of_keys_per_file ; + Format.fprintf fmt "sequence length for reads = %d@." read_values_seq_size ; + Format.fprintf fmt "lru size = %d@." lru_size ; + Format.fprintf fmt "value size = %d (in bytes)@." value_size ; + Format.fprintf fmt "default values = %s@." (string_of_values values) ; + Format.fprintf fmt "override values = %s@." (string_of_values overwritten) + + (* A scenario is a list of actions. The bind elements tells whether + the next bind waits for the previous promises running or is done + in parallel. This datatype does not allow to bind sequentially a + group of parallel actions though. *) + type scenario = action * ((bind * action) list[@size.max 5]) [@@deriving gen] + + let pp_scenario fmt (action, next_actions) = + let rec pp fmt next_actions = + match next_actions with + | [] -> () + | (Parallel, next_action) :: actions -> + Format.fprintf fmt "P %a@.%a@." pp_action next_action pp actions + | (Sequential, next_action) :: actions -> + Format.fprintf fmt "S %a@.%a@." pp_action next_action pp actions + in + Format.fprintf fmt "%a@." pp_action action ; + Format.fprintf fmt "%a" pp next_actions +end + +include Helpers + +let run_scenario + {lru_size; values; overwritten; number_of_files; number_of_keys_per_file; _} + scenario = + let open Lwt_result_syntax in + let pid = Unix.getpid () in + let tmp_dir = Filename.get_temp_dir_name () in + (* To avoid any conflict with previous runs of this test. 
*) + let root_dir = + Format.asprintf "key-value-store-test-key-%d" pid + |> Filename.concat "tezos-pbt-tests" + |> Filename.concat tmp_dir + in + Unix.system @@ Format.asprintf "rm -rf %s" root_dir |> ignore ; + let file_layout ~root_dir file = + let filepath = Filename.concat root_dir file in + Key_value_store.layout + ~encoding:(Data_encoding.Fixed.bytes value_size) + ~filepath + ~eq:( = ) + ~index_of:Fun.id + ~number_of_keys_per_file:4096 + () + in + let* left = L.init ~lru_size ~root_dir in + let* right = R.init ~lru_size ~root_dir in + let action, next_actions = scenario in + let n = ref 0 in + let compare_tzresult finalization pp_while pp_val left_result right_result = + let pp_result fmt = function + | Ok v -> pp_val fmt v + | Error err -> Error_monad.pp_print_trace fmt err + in + let fail () = + failwith + "%s Unexpected different value while %a.@.For run %d at %s:@.Expected: \ + %a@.Got: %a@." + finalization + pp_while + () + !n + root_dir + pp_result + right_result + pp_result + left_result + in + match (left_result, right_result) with + | Ok left_value, Ok right_value -> + if left_value = right_value then return_unit else fail () + | Error _, Error _ -> return_unit + | Ok _, Error _ | Error _, Ok _ -> fail () + in + let compare_result ~finalization (file, key) left_result right_result = + let finalization = if finalization then "(finalization) " else "" in + compare_tzresult + finalization + (fun fmt () -> Format.fprintf fmt "reading key %s/%d" file key) + (fun fmt b -> Format.fprintf fmt "%s" (Bytes.to_string b)) + left_result + right_result + in + let rec run_actions action next_actions promises_running_seq = + incr n ; + Format.eprintf "RUN: %a@." 
pp_action action ; + let value_of_key ~default file key = + let key = (file, key) in + let table = if default then values else overwritten in + Stdlib.Hashtbl.find table key + in + let promise = + match action with + | Write_value {override; default; key = file, key} -> + let value = value_of_key ~default file key |> Bytes.of_string in + let left_promise = + let* r = L.write_value ~override left file_layout file key value in + Format.eprintf "AFTER: %a@." pp_action action ; + return r + in + let right_promise = + R.write_value ~override right file_layout file key value + in + tzjoin [left_promise; right_promise] + | Read_value (file, key) -> + let left_promise = L.read_value left file_layout file key in + let right_promise = R.read_value right file_layout file key in + let*! left_result = left_promise in + Format.eprintf "AFTER: %a@." pp_action action ; + let*! right_result = right_promise in + compare_result + ~finalization:false + (file, key) + left_result + right_result + | Read_values seq -> + let left_promise = + let seq_s = L.read_values left file_layout (List.to_seq seq) in + Seq_s.E.iter (fun _ -> Ok ()) seq_s + in + let left_promise = + let* promise = left_promise in + Format.eprintf "AFTER: %a@." pp_action action ; + return promise + in + let right_promise = + let seq_s = R.read_values right file_layout (List.to_seq seq) in + Seq_s.E.iter (fun _ -> Ok ()) seq_s + in + tzjoin [left_promise; right_promise] + | Remove_file file -> + let left_promise = L.remove_file left file_layout file in + let left_promise = + let* promise = left_promise in + Format.eprintf "AFTER: %a@." pp_action action ; + return promise + in + let right_promise = R.remove_file right file_layout file in + + tzjoin [left_promise; right_promise] + | Count_values file -> + let left_promise = L.count_values left file_layout file in + let left_promise = + let* promise = left_promise in + Format.eprintf "AFTER: %a@." 
pp_action action ; + return promise + in + let right_promise = R.count_values right file_layout file in + let*! left_result = left_promise in + let*! right_result = right_promise in + compare_tzresult + "" + (fun fmt () -> Format.fprintf fmt "counting values in file %s" file) + (fun fmt -> Format.fprintf fmt "%d") + left_result + right_result + in + let finalize () = + let* left = L.init ~lru_size:number_of_files ~root_dir in + let* () = + Seq.ES.iter + (fun (file, key) -> + let*! left_result = L.read_value left file_layout file key in + let*! right_result = R.read_value right file_layout file key in + compare_result + ~finalization:true + (file, key) + left_result + right_result) + (keys number_of_files number_of_keys_per_file |> Array.to_seq) + in + L.close left |> Lwt.map Result.ok + in + match next_actions with + | [] -> + Format.eprintf "LAST@." ; + let* () = promise in + let* () = + Seq_s.ES.iter + (function Ok () -> return_unit | Error err -> fail err) + promises_running_seq + in + let* _ = finalize () in + return (left, right) + | (Sequential, action) :: next_actions -> + Format.eprintf "WAIT S@." ; + let* () = promise in + let* () = + Seq_s.ES.iter + (function Ok () -> return_unit | Error err -> fail err) + promises_running_seq + in + if L.View.opened_files left <= lru_size then + if L.View.on_going_actions left <= lru_size then + run_actions action next_actions Seq_s.empty + else + failwith + "Expected size of on going action table to be at most %d. Got %d \ + (remaining actions: %d)." + lru_size + (L.View.on_going_actions left) + (List.length next_actions + 1) + else + failwith + "Expected size of files table to be at most %d. Got %d (remaining \ + actions: %d)." + lru_size + (L.View.opened_files left) + (List.length next_actions + 1) + | (Parallel, action) :: next_actions -> + Format.eprintf "WAIT P@." ; + (* We do not wait for promises to end and append them to the + list of promises on-going. 
*) + let promises_running_seq = Seq_s.cons_s promise promises_running_seq in + run_actions action next_actions promises_running_seq + in + let*! result = + Lwt.pick + [ + (let*! () = Lwt_unix.sleep 2. in + Lwt.return_error [Error_monad.Timeout]); + run_actions action next_actions Seq_s.empty; + ] + in + let*! _ = L.close left in + return result + +let pp fmt (scenario, parameters) = + Format.fprintf + fmt + "@.Parameters:@.%a@.@.Scenario:@.%a@.@." + pp_parameters + parameters + pp_scenario + scenario + +module SS = Set.Make (struct + type t = scenario * parameters + + let compare = compare +end) + +let set = ref SS.empty + +module SI = Set.Make (struct + type t = int + + let compare = compare +end) + +let count = ref SI.empty + +let printed = ref false + +let sequential_test = + let open Tezt_bam in + let property (scenario, parameters) = + let promise = run_scenario parameters scenario in + match Lwt_main.run promise with + | Ok _ -> Ok () + | Error err -> + Error (`Fail (Format.asprintf "%a" Error_monad.pp_print_trace err)) + in + Pbt.register + ~pp + ~expected_sampling_ratio:(-1.) + ~minimum_number_of_samples:0 + ~stop_after:(`Count 2_000) + ~title:"key-value store sequential writes/reads" + ~__FILE__ + ~tags:["kvs"; "sequential"] + ~on_sample:(fun sample -> Format.eprintf "SAMPLE@.%a@." pp sample) + ~gen:(Bam.Std.pair gen_scenario gen_parameters) + ~property + () + +(* let parallel_test = *) +(* let open Lwt_result_syntax in *) +(* let open QCheck2 in *) +(* let test_gen = *) +(* Gen.bind parameters_gen (fun parameters -> *) +(* Gen.map *) +(* (fun scenario -> (parameters, scenario)) *) +(* (scenario_gen Concurrency parameters)) *) +(* in *) +(* Test.make *) +(* ~print *) +(* ~name:"key-value store concurrent writes/reads" *) +(* ~count:10_000 *) +(* ~max_fail:(-1) (\* to stop shrinking after [max_fail] failures. 
*\) *) +(* ~retries:0 *) +(* test_gen *) +(* (fun (parameters, scenario) -> *) +(* let promise = *) +(* let* _ = run_scenario parameters scenario in *) +(* return_true *) +(* in *) +(* match Lwt_main.run promise with *) +(* | Ok _ -> true *) +(* | Error err -> *) +(* QCheck2.Test.fail_reportf "%a@." Error_monad.pp_print_trace err) *) diff --git a/tezt/tests/cloud/tezos.ml b/tezt/tests/cloud/tezos.ml index ff1e4cca368b..1eed2e971b5f 100644 --- a/tezt/tests/cloud/tezos.ml +++ b/tezt/tests/cloud/tezos.ml @@ -74,8 +74,8 @@ module Dal_node = struct Format.asprintf "%s/%s-trace.ctf" (Filename.get_temp_dir_name ()) name in let env = - if memtrace then None - else Some (String_map.singleton "MEMTRACE" filename) + if memtrace then Some (String_map.singleton "MEMTRACE" filename) + else None in run ?env ?event_level dal_node end -- GitLab From 5c16a348cd20d1ca76cf200c3b5bd6ec82511827 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Thir=C3=A9?= Date: Mon, 30 Sep 2024 17:30:56 +0200 Subject: [PATCH 6/9] [More stuff] --- src/lib_stdlib_unix/key_value_store.ml | 14 ++--- .../test/test_key_value_store_fuzzy_bam.ml | 53 ++++++------------- 2 files changed, 22 insertions(+), 45 deletions(-) diff --git a/src/lib_stdlib_unix/key_value_store.ml b/src/lib_stdlib_unix/key_value_store.ml index df9841851844..98b9784ba2fb 100644 --- a/src/lib_stdlib_unix/key_value_store.ml +++ b/src/lib_stdlib_unix/key_value_store.ml @@ -693,11 +693,6 @@ end = struct if !closed then return_unit else ( closed := true ; - let* () = - Table.iter_s - (fun filename _ -> close_file files last_actions filename) - files - in Table.iter (fun _ value -> match value with @@ -708,6 +703,11 @@ end = struct | Count_values p -> Lwt.cancel p | Remove p -> Lwt.cancel p) last_actions ; + let* () = + Table.iter_s + (fun filename _ -> close_file files last_actions filename) + files + in LRU.clear lru ; return_unit) @@ -723,9 +723,9 @@ end = struct is bounded by the size of the LRU. 
This is why we wait first for the eviction promise to be fulfilled that will close the file evicted. *) - LRU.remove lru lru_node ; + (* LRU.remove lru lru_node ; *) let* () = close_file files last_actions filepath in - let lru_node = LRU.add lru filename in + (* let lru_node = LRU.add lru filename in *) Format.eprintf "CLOSED FILE: %s@." filepath ; return lru_node diff --git a/src/lib_stdlib_unix/test/test_key_value_store_fuzzy_bam.ml b/src/lib_stdlib_unix/test/test_key_value_store_fuzzy_bam.ml index 20887b2de7cd..3bfbf604186f 100644 --- a/src/lib_stdlib_unix/test/test_key_value_store_fuzzy_bam.ml +++ b/src/lib_stdlib_unix/test/test_key_value_store_fuzzy_bam.ml @@ -355,7 +355,6 @@ let run_scenario let* left = L.init ~lru_size ~root_dir in let* right = R.init ~lru_size ~root_dir in let action, next_actions = scenario in - let n = ref 0 in let compare_tzresult finalization pp_while pp_val left_result right_result = let pp_result fmt = function | Ok v -> pp_val fmt v @@ -363,12 +362,11 @@ let run_scenario in let fail () = failwith - "%s Unexpected different value while %a.@.For run %d at %s:@.Expected: \ - %a@.Got: %a@." + "%s Unexpected different value while %a.@. At %s:@.Expected: %a@.Got: \ + %a@." finalization pp_while () - !n root_dir pp_result right_result @@ -391,7 +389,6 @@ let run_scenario right_result in let rec run_actions action next_actions promises_running_seq = - incr n ; Format.eprintf "RUN: %a@." pp_action action ; let value_of_key ~default file key = let key = (file, key) in @@ -523,15 +520,20 @@ let run_scenario let promises_running_seq = Seq_s.cons_s promise promises_running_seq in run_actions action next_actions promises_running_seq in - let*! result = - Lwt.pick - [ - (let*! () = Lwt_unix.sleep 2. in - Lwt.return_error [Error_monad.Timeout]); - run_actions action next_actions Seq_s.empty; - ] + let timeout = + let*! () = Lwt_unix.sleep 2. in + Format.eprintf "############################YOUPI@." 
; + Lwt.return (Error [Error_monad.Timeout]) in + let main_promise = run_actions action next_actions Seq_s.empty in + let*! result = Lwt.choose [timeout; main_promise] in + Format.eprintf "############################YES@." ; + (match Lwt.state main_promise with + | Lwt.Return _ -> Lwt.cancel timeout + | _ -> ()) ; + Format.eprintf "############################HEY@." ; let*! _ = L.close left in + Format.eprintf "############################HO@." ; return result let pp fmt (scenario, parameters) = @@ -568,6 +570,7 @@ let sequential_test = match Lwt_main.run promise with | Ok _ -> Ok () | Error err -> + Format.eprintf "#######################KO@." ; Error (`Fail (Format.asprintf "%a" Error_monad.pp_print_trace err)) in Pbt.register @@ -582,29 +585,3 @@ let sequential_test = ~gen:(Bam.Std.pair gen_scenario gen_parameters) ~property () - -(* let parallel_test = *) -(* let open Lwt_result_syntax in *) -(* let open QCheck2 in *) -(* let test_gen = *) -(* Gen.bind parameters_gen (fun parameters -> *) -(* Gen.map *) -(* (fun scenario -> (parameters, scenario)) *) -(* (scenario_gen Concurrency parameters)) *) -(* in *) -(* Test.make *) -(* ~print *) -(* ~name:"key-value store concurrent writes/reads" *) -(* ~count:10_000 *) -(* ~max_fail:(-1) (\* to stop shrinking after [max_fail] failures. *\) *) -(* ~retries:0 *) -(* test_gen *) -(* (fun (parameters, scenario) -> *) -(* let promise = *) -(* let* _ = run_scenario parameters scenario in *) -(* return_true *) -(* in *) -(* match Lwt_main.run promise with *) -(* | Ok _ -> true *) -(* | Error err -> *) -(* QCheck2.Test.fail_reportf "%a@." 
Error_monad.pp_print_trace err) *) -- GitLab From ddfa8cf6dba091590f9522289ea3d538680d00f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Thir=C3=A9?= Date: Tue, 1 Oct 2024 16:14:35 +0200 Subject: [PATCH 7/9] [And more] --- src/bin_dal_node/dal_metrics.ml | 18 +++++++ src/bin_dal_node/dal_metrics.mli | 2 + src/bin_dal_node/store.ml | 7 ++- src/lib_stdlib_unix/key_value_store.ml | 24 +++++++-- src/lib_stdlib_unix/key_value_store.mli | 2 + .../test/test_key_value_store_fuzzy_bam.ml | 51 +++++++++---------- 6 files changed, 73 insertions(+), 31 deletions(-) diff --git a/src/bin_dal_node/dal_metrics.ml b/src/bin_dal_node/dal_metrics.ml index ce3d8b2d94a2..3ef5cbf32b6a 100644 --- a/src/bin_dal_node/dal_metrics.ml +++ b/src/bin_dal_node/dal_metrics.ml @@ -148,6 +148,10 @@ module GS = struct module Stats = struct let gs_stats = ref (Gossipsub.Worker.Introspection.empty_stats ()) + let count_kvs_table_file = ref 0 + + let count_kvs_table_action = ref 0 + let input_events_stream_length = ref 0 let p2p_output_stream_length = ref 0 @@ -368,6 +372,14 @@ module GS = struct ~label_names:["peer"] (fun () -> !Stats.scores_of_peers) + let count_kvs_table_file = + metric ~name:"count_kvs_table_file" ~help:"todo" (fun () -> + !Stats.count_kvs_table_file |> float_of_int) + + let count_kvs_table_action = + metric ~name:"count_kvs_table_action" ~help:"todo" (fun () -> + !Stats.count_kvs_table_action |> float_of_int) + let metrics = [ (* Metrics about the stats gathered by the worker *) @@ -394,6 +406,8 @@ module GS = struct (* Other metrics about GS automaton's state *) count_peers_per_topic; scores_of_peers; + count_kvs_table_file; + count_kvs_table_action; ] let () = List.iter add_metric metrics @@ -439,6 +453,10 @@ let layer1_block_finalized_round ~block_round = let update_shards_verification_time f = Prometheus.DefaultHistogram.observe Node_metrics.verify_shard_time f +let update_kvs_table ~file ~action = + GS.Stats.count_kvs_table_file := file ; + 
GS.Stats.count_kvs_table_action := action + let sample_time ~sampling_frequency ~to_sample ~metric_updater = if sampling_frequency > 0 && Random.int sampling_frequency <> 0 then to_sample () diff --git a/src/bin_dal_node/dal_metrics.mli b/src/bin_dal_node/dal_metrics.mli index 89caa2f254ed..5005f68975fe 100644 --- a/src/bin_dal_node/dal_metrics.mli +++ b/src/bin_dal_node/dal_metrics.mli @@ -55,3 +55,5 @@ val sample_time : (** [collect_gossipsub_metrics gs_worker] allows to periodically collect metrics from the given GS Worker state. *) val collect_gossipsub_metrics : Gossipsub.Worker.t -> unit + +val update_kvs_table : file:int -> action:int -> unit diff --git a/src/bin_dal_node/store.ml b/src/bin_dal_node/store.ml index bacb4e55f23a..80958edcb5f3 100644 --- a/src/bin_dal_node/store.ml +++ b/src/bin_dal_node/store.ml @@ -212,7 +212,12 @@ module Shards = struct let count_values store slot_id = KVS.count_values store file_layout slot_id - let remove store slot_id = KVS.remove_file store file_layout slot_id + let remove store slot_id = + KVS.clean_up store ; + let action = KVS.View.on_going_actions store in + let file = KVS.View.opened_files store in + Dal_metrics.update_kvs_table ~file ~action ; + KVS.remove_file store file_layout slot_id let init node_store_dir shard_store_dir = let root_dir = Filename.concat node_store_dir shard_store_dir in diff --git a/src/lib_stdlib_unix/key_value_store.ml b/src/lib_stdlib_unix/key_value_store.ml index 98b9784ba2fb..c39efbff2cbf 100644 --- a/src/lib_stdlib_unix/key_value_store.ml +++ b/src/lib_stdlib_unix/key_value_store.ml @@ -228,6 +228,8 @@ module Files : sig val remove : 'value t -> ('key, 'value) layout -> unit tzresult Lwt.t + val clean_up : 'value t -> unit + module View : sig val opened_files : 'value t -> int @@ -723,9 +725,9 @@ end = struct is bounded by the size of the LRU. This is why we wait first for the eviction promise to be fulfilled that will close the file evicted. 
*) - (* LRU.remove lru lru_node ; *) + LRU.remove lru lru_node ; let* () = close_file files last_actions filepath in - (* let lru_node = LRU.add lru filename in *) + let lru_node = LRU.add lru filename in Format.eprintf "CLOSED FILE: %s@." filepath ; return lru_node @@ -916,14 +918,26 @@ end = struct let+ _file, result = p in result + let clean_up {files; _} = + Table.filter_map_inplace + (fun _ value -> + match value with + | Closing p -> ( + match Lwt.state p with Lwt.Return _ -> None | _ -> Some value) + | _ -> Some value) + files + let remove {files; last_actions; lru; closed} layout = let open Lwt_syntax in if !closed then Lwt.return (Error (Error_monad.TzTrace.make (Closed {action = "remove"}))) else let on_file_closed ~on_file_opened:_ = - let+ () = may_remove_file layout.filepath in - Table.remove files layout.filepath + let* () = may_remove_file layout.filepath in + (* Let's give a chance to yield so that. *) + let* () = Lwt.pause () in + Table.remove files layout.filepath ; + Lwt.return_unit in let p = Action.remove_file @@ -1136,6 +1150,8 @@ let remove_file {files; root_dir; _} file_layout file = let layout = file_layout ~root_dir file in Files.remove files layout +let clean_up {files; _} = Files.clean_up files + module View = struct let opened_files {files; _} = Files.View.opened_files files diff --git a/src/lib_stdlib_unix/key_value_store.mli b/src/lib_stdlib_unix/key_value_store.mli index 68851e170196..d1851e3a9cb4 100644 --- a/src/lib_stdlib_unix/key_value_store.mli +++ b/src/lib_stdlib_unix/key_value_store.mli @@ -208,6 +208,8 @@ val count_values : 'file -> int tzresult Lwt.t +val clean_up : ('file, 'key, 'value) t -> unit + module View : sig (** Returns the number of files current opened by the key value store. 
Do note this number is an upper bound on the number of diff --git a/src/lib_stdlib_unix/test/test_key_value_store_fuzzy_bam.ml b/src/lib_stdlib_unix/test/test_key_value_store_fuzzy_bam.ml index 3bfbf604186f..919652a8c88a 100644 --- a/src/lib_stdlib_unix/test/test_key_value_store_fuzzy_bam.ml +++ b/src/lib_stdlib_unix/test/test_key_value_store_fuzzy_bam.ml @@ -202,7 +202,7 @@ module Helpers = struct | Read_value of key | Write_value of write_payload | Remove_file of filename - | Read_values of key list [@max 5] + | Read_values of key list [@max 10] [@@deriving gen] let pp_action fmt = function @@ -312,7 +312,17 @@ module Helpers = struct the next bind waits for the previous promises running or is done in parallel. This datatype does not allow to bind sequentially a group of parallel actions though. *) - type scenario = action * ((bind * action) list[@size.max 5]) [@@deriving gen] + type scenario = action * (bind * action) list + + let gen_scenario = + let open Bam.Std in + let bind_list = + list + ~shrinker:Shrinker.Prefix + ~size:(int ~min:1 ~max:50 ()) + (pair gen_bind gen_action) + in + pair gen_action bind_list let pp_scenario fmt (action, next_actions) = let rec pp fmt next_actions = @@ -497,22 +507,23 @@ let run_scenario promises_running_seq in if L.View.opened_files left <= lru_size then - if L.View.on_going_actions left <= lru_size then - run_actions action next_actions Seq_s.empty - else - failwith - "Expected size of on going action table to be at most %d. Got %d \ - (remaining actions: %d)." - lru_size - (L.View.on_going_actions left) - (List.length next_actions + 1) - else + (* if L.View.on_going_actions left <= lru_size then *) + run_actions action next_actions Seq_s.empty + (* else *) + (* failwith *) + (* "Expected size of on going action table to be at most %d. Got %d \ *) + (* (remaining actions: %d)." *) + (* lru_size *) + (* (L.View.on_going_actions left) *) + (* (List.length next_actions + 1) *) + else ( + Format.eprintf "HERE@." 
; failwith "Expected size of files table to be at most %d. Got %d (remaining \ actions: %d)." lru_size (L.View.opened_files left) - (List.length next_actions + 1) + (List.length next_actions + 1)) | (Parallel, action) :: next_actions -> Format.eprintf "WAIT P@." ; (* We do not wait for promises to end and append them to the @@ -520,20 +531,8 @@ let run_scenario let promises_running_seq = Seq_s.cons_s promise promises_running_seq in run_actions action next_actions promises_running_seq in - let timeout = - let*! () = Lwt_unix.sleep 2. in - Format.eprintf "############################YOUPI@." ; - Lwt.return (Error [Error_monad.Timeout]) - in - let main_promise = run_actions action next_actions Seq_s.empty in - let*! result = Lwt.choose [timeout; main_promise] in - Format.eprintf "############################YES@." ; - (match Lwt.state main_promise with - | Lwt.Return _ -> Lwt.cancel timeout - | _ -> ()) ; - Format.eprintf "############################HEY@." ; + let* result = run_actions action next_actions Seq_s.empty in let*! _ = L.close left in - Format.eprintf "############################HO@." 
; return result let pp fmt (scenario, parameters) = -- GitLab From 5ae2a0627b85ecd89d19527b12d1b6b82711e98b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Thir=C3=A9?= Date: Tue, 1 Oct 2024 20:46:01 +0200 Subject: [PATCH 8/9] [Finally] --- src/bin_dal_node/store.ml | 1 - src/lib_stdlib_unix/key_value_store.ml | 104 ++++++++++-------- src/lib_stdlib_unix/key_value_store.mli | 2 - .../test/test_key_value_store_fuzzy_bam.ml | 57 +++++++--- 4 files changed, 104 insertions(+), 60 deletions(-) diff --git a/src/bin_dal_node/store.ml b/src/bin_dal_node/store.ml index 80958edcb5f3..b2e8912cff76 100644 --- a/src/bin_dal_node/store.ml +++ b/src/bin_dal_node/store.ml @@ -213,7 +213,6 @@ module Shards = struct let count_values store slot_id = KVS.count_values store file_layout slot_id let remove store slot_id = - KVS.clean_up store ; let action = KVS.View.on_going_actions store in let file = KVS.View.opened_files store in Dal_metrics.update_kvs_table ~file ~action ; diff --git a/src/lib_stdlib_unix/key_value_store.ml b/src/lib_stdlib_unix/key_value_store.ml index c39efbff2cbf..ece0acf4f058 100644 --- a/src/lib_stdlib_unix/key_value_store.ml +++ b/src/lib_stdlib_unix/key_value_store.ml @@ -228,8 +228,6 @@ module Files : sig val remove : 'value t -> ('key, 'value) layout -> unit tzresult Lwt.t - val clean_up : 'value t -> unit - module View : sig val opened_files : 'value t -> int @@ -669,6 +667,9 @@ end = struct Table.replace files filepath (Closing p) ; Table.replace last_actions filepath (Close p) ; let* () = p in + (* It is not clear why we can't call the [clean_up] function + below. Instead, it is important to just remove this filepath. + It would be nice to exploring this further. *) Table.remove files filepath ; (* To avoid any memory leaks, we woud like to remove the corresponding entry from the [last_actions] table. 
However, while @@ -798,6 +799,56 @@ end = struct if b then load_file files last_actions lru layout.filepath else initialize_file files last_actions lru layout + (* Tables must be cleaned up when promises are fulfilled. While in + theory it should be doable on each action, this seems difficult + to do it properly. Hence, the current solution. This is not + adequate if the `lru` size gets bigger because at each action we + will go through all the table. + + It should be possible to optimise this function further and to + modify only the files modified when an action is performed. + + At the moment this comment was written, I was not able to find a + proper way to do that. + + Moreover, as mentioned in the [close] function, there is + something weird: this function cannot be called in the [close] + function. + *) + let clean_up files last_actions = + Table.filter_map_inplace + (fun _ value -> + match value with + | Closing p -> ( + (* When a closing promise is fulfilled, it can safely be removed. *) + match Lwt.state p with Lwt.Return _ -> None | _ -> Some value) + | Opening _ -> + (* This does not have to be cleaned-up because actions may + use this to know whether it needs to load the file or + not. Hence, the number of files in this state is bounded + by the size of the table `last_actions`. 
*) + Some value) + files ; + Table.filter_map_inplace + (fun _ value -> + match value with + | Remove p | Close p -> ( + match Lwt.state p with Lwt.Return _ -> None | _ -> Some value) + | Read p -> ( + match Lwt.state p with + | Lwt.Return (None, _) -> None + | _ -> Some value) + | Count_values p -> ( + match Lwt.state p with + | Lwt.Return (None, _) -> None + | _ -> Some value) + | Value_exists p -> ( + match Lwt.state p with + | Lwt.Return (None, _) -> None + | _ -> Some value) + | Write _ -> Some value) + last_actions + let read {files; last_actions; lru; closed} layout key = let open Lwt_syntax in if !closed then @@ -821,16 +872,8 @@ end = struct in let p = Action.read ~on_file_closed files last_actions layout key in Table.replace last_actions layout.filepath (Read p) ; - let+ is_opened, value = p in - (match is_opened with - | None -> ( - match Table.find_opt last_actions layout.filepath with - | Some (Read p) -> ( - match Lwt.state p with - | Lwt.Return _ -> Table.remove last_actions layout.filepath - | _ -> ()) - | _ -> ()) - | _ -> ()) ; + let+ _, value = p in + clean_up files last_actions ; value (* Very similar to [read] action except we only look at the bitset @@ -855,6 +898,7 @@ end = struct in Table.replace last_actions layout.filepath (Value_exists p) ; let+ _, exists = p in + clean_up files last_actions ; exists (* Very similar to [value_exists] action except we look at the @@ -878,16 +922,8 @@ end = struct in let p = Action.count_values ~on_file_closed files last_actions layout in Table.replace last_actions layout.filepath (Count_values p) ; - let+ is_opened, count = p in - (match is_opened with - | None -> ( - match Table.find_opt last_actions layout.filepath with - | Some (Count_values p) -> ( - match Lwt.state p with - | Lwt.Return _ -> Table.remove last_actions layout.filepath - | _ -> ()) - | _ -> ()) - | _ -> ()) ; + let+ _, count = p in + clean_up files last_actions ; count let write ?(override = false) {files; last_actions; lru; closed} 
layout key @@ -904,6 +940,7 @@ end = struct let* opened_file = opened_file_promise in on_file_opened opened_file in + clean_up files last_actions ; let p = Action.write ~on_file_closed @@ -916,17 +953,9 @@ end = struct in Table.replace last_actions layout.filepath (Write p) ; let+ _file, result = p in + clean_up files last_actions ; result - let clean_up {files; _} = - Table.filter_map_inplace - (fun _ value -> - match value with - | Closing p -> ( - match Lwt.state p with Lwt.Return _ -> None | _ -> Some value) - | _ -> Some value) - files - let remove {files; last_actions; lru; closed} layout = let open Lwt_syntax in if !closed then @@ -934,9 +963,6 @@ end = struct else let on_file_closed ~on_file_opened:_ = let* () = may_remove_file layout.filepath in - (* Let's give a chance to yield so that. *) - let* () = Lwt.pause () in - Table.remove files layout.filepath ; Lwt.return_unit in let p = @@ -950,13 +976,7 @@ end = struct Table.replace last_actions layout.filepath (Remove p) ; Table.replace files layout.filepath (Closing p) ; let* () = p in - (* See [close_file] for an explanation of the lines below. 
*) - (match Table.find_opt last_actions layout.filepath with - | Some (Close p) | Some (Remove p) -> ( - match Lwt.state p with - | Lwt.Return _ -> Table.remove last_actions layout.filepath - | _ -> ()) - | _ -> ()) ; + clean_up files last_actions ; return_ok () end @@ -1150,8 +1170,6 @@ let remove_file {files; root_dir; _} file_layout file = let layout = file_layout ~root_dir file in Files.remove files layout -let clean_up {files; _} = Files.clean_up files - module View = struct let opened_files {files; _} = Files.View.opened_files files diff --git a/src/lib_stdlib_unix/key_value_store.mli b/src/lib_stdlib_unix/key_value_store.mli index d1851e3a9cb4..68851e170196 100644 --- a/src/lib_stdlib_unix/key_value_store.mli +++ b/src/lib_stdlib_unix/key_value_store.mli @@ -208,8 +208,6 @@ val count_values : 'file -> int tzresult Lwt.t -val clean_up : ('file, 'key, 'value) t -> unit - module View : sig (** Returns the number of files current opened by the key value store. Do note this number is an upper bound on the number of diff --git a/src/lib_stdlib_unix/test/test_key_value_store_fuzzy_bam.ml b/src/lib_stdlib_unix/test/test_key_value_store_fuzzy_bam.ml index 919652a8c88a..3268c0305cbd 100644 --- a/src/lib_stdlib_unix/test/test_key_value_store_fuzzy_bam.ml +++ b/src/lib_stdlib_unix/test/test_key_value_store_fuzzy_bam.ml @@ -506,24 +506,53 @@ let run_scenario (function Ok () -> return_unit | Error err -> fail err) promises_running_seq in - if L.View.opened_files left <= lru_size then - (* if L.View.on_going_actions left <= lru_size then *) - run_actions action next_actions Seq_s.empty - (* else *) - (* failwith *) - (* "Expected size of on going action table to be at most %d. Got %d \ *) - (* (remaining actions: %d)." *) - (* lru_size *) - (* (L.View.on_going_actions left) *) - (* (List.length next_actions + 1) *) - else ( - Format.eprintf "HERE@." ; + let*! 
number_parallel_actions = Seq_s.length promises_running_seq in + (* The following checks ensure that tables are bounded. + However, the reason for the upper bound to be + [lru_size+number_parallel_actions+1] is not 100% clear to me. + + While testing, we can check that the bound + [lru_size+number_parallel_actions] is violated. + + The reason why we need to take into account + [number_parallel_actions] is quite clear. + + When starting a new parallel action, an entry is added in + both tables no matter what. Hence, if we start [n] parallel + actions, there will be [n] entries. + + It does not mean we will have [n] opened file descriptors, + this is guaranteed by the implementation and the LRU (hard + to check here though). + + The reason why a +1 is needed is probably that when + starting an action, it can trigger a closing action for + another file. By doing so, a new entry will be created for + this file in both tables. However, I would have expected + those entries to be cleaned up before the main action ends. + However, we can't call the [clean_up] function while + closing a file. So this is probably related. + *) + let upper_bound = lru_size + number_parallel_actions + 1 in + if L.View.opened_files left <= upper_bound then + if + L.View.on_going_actions left + <= lru_size + number_parallel_actions + 1 + then run_actions action next_actions Seq_s.empty + else + failwith + "Expected size of on going action table to be at most %d. Got %d \ + (remaining actions: %d)." + upper_bound + (L.View.on_going_actions left) + (List.length next_actions + 1) + else failwith "Expected size of files table to be at most %d. Got %d (remaining \ actions: %d)." - lru_size + upper_bound (L.View.opened_files left) - (List.length next_actions + 1)) + (List.length next_actions + 1) | (Parallel, action) :: next_actions -> Format.eprintf "WAIT P@." 
; (* We do not wait for promises to end and append them to the -- GitLab From bff9db87f3d011ccf8e9160f5e22856e4d7b0735 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fran=C3=A7ois=20Thir=C3=A9?= Date: Tue, 1 Oct 2024 21:03:44 +0200 Subject: [PATCH 9/9] Reduce the size of the LRU --- src/bin_dal_node/constants.ml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/bin_dal_node/constants.ml b/src/bin_dal_node/constants.ml index 8158207892ed..aacd8b0f0282 100644 --- a/src/bin_dal_node/constants.ml +++ b/src/bin_dal_node/constants.ml @@ -32,7 +32,7 @@ let shards_store_lru_size = regular file opening and one via mmap on the bitset region). Note that setting a too high value causes a "Too many open files" error. *) let irmin_internals_entries_per_toplevel_entry = 3 in - let number_of_slots = 256 in + let number_of_slots = 32 in let number_of_remembered_levels = 1 in irmin_internals_entries_per_toplevel_entry * number_of_slots * number_of_remembered_levels -- GitLab