From 39fec49fb8d2a0d7d9245096785b048b8914aaea Mon Sep 17 00:00:00 2001 From: Julien Sagot Date: Thu, 27 Nov 2025 11:27:46 +0100 Subject: [PATCH] DAL: add a read-only interface for KVS --- src/lib_stdlib_unix/key_value_store.ml | 156 ++++++++++++----------- src/lib_stdlib_unix/key_value_store.mli | 160 ++++++++++++++---------- 2 files changed, 177 insertions(+), 139 deletions(-) diff --git a/src/lib_stdlib_unix/key_value_store.ml b/src/lib_stdlib_unix/key_value_store.ml index 52b8e8300493..1dfbb6314b51 100644 --- a/src/lib_stdlib_unix/key_value_store.ml +++ b/src/lib_stdlib_unix/key_value_store.ml @@ -991,16 +991,6 @@ let layout ?encoded_value_size ~encoding ~filepath ~eq ~index_of invalid_arg "Key_value_store.layout: encoding does not have fixed size") -(* Main data-structure of the store. - - Each physical file may have a different layout. -*) -type ('file, 'key, 'value) t = { - files : 'value Files.t; - root_dir : string; - lockfile : Lwt_unix.file_descr; -} - type ('file, 'key, 'value) file_layout = root_dir:string -> 'file -> ('key, 'value) layout @@ -1071,19 +1061,97 @@ let lockfile_unlock fd = (Lwt_utils_unix.Io_error {action = `Close; unix_code; caller; arg}) | exn -> Lwt.reraise exn) +module Read = struct + (* Main data-structure of the store. + + Each physical file may have a different layout. + *) + type ('file, 'key, 'value) t = { + files : 'value Files.t; + root_dir : string; + lockfile : Lwt_unix.file_descr; + } + + let read_value : type file key value. + (file, key, value) t -> + (file, key, value) file_layout -> + file -> + key -> + value tzresult Lwt.t = + fun {files; root_dir; _} file_layout file key -> + let layout = file_layout ~root_dir file in + Files.read files layout key + + let value_exists : type file key value. 
+ (file, key, value) t -> + (file, key, value) file_layout -> + file -> + key -> + bool tzresult Lwt.t = + fun {files; root_dir; _} file_layout file key -> + let layout = file_layout ~root_dir file in + Files.value_exists files layout key + + let count_values : type file key value. + (file, key, value) t -> + (file, key, value) file_layout -> + file -> + int tzresult Lwt.t = + fun {files; root_dir; _} file_layout file -> + let layout = file_layout ~root_dir file in + Files.count_values files layout + + let read_values t file_layout seq = + let open Lwt_syntax in + Seq_s.of_seq seq + |> Seq_s.S.map (fun (file, key) -> + let* maybe_value = read_value t file_layout file key in + return (file, key, maybe_value)) + + let read_values_from_bytes file_layout bytes seq = + let open Lwt_syntax in + (* Fake root dir for [file_layout] in this case. We don't have a valid + root_dir for data in memory. *) + let root_dir = "memory://dal_shard" in + Seq_s.of_seq seq + |> Seq_s.S.map (fun (file, key) -> + let layout = file_layout ~root_dir file in + let maybe_value = Files.read_value_from (`Bytes bytes) layout key in + return (file, key, maybe_value)) + + let values_exist t file_layout seq = + let open Lwt_syntax in + Seq_s.of_seq seq + |> Seq_s.S.map (fun (file, key) -> + let* maybe_value = value_exists t file_layout file key in + return (file, key, maybe_value)) + + let init ?lockfile ~lru_size root_dir = + let lockfile = + match lockfile with + | Some f -> f + | None -> + Filename.temp_file ((Unix.getpid () |> string_of_int) ^ ".") ".lock" + in + with_lockfile_lock lockfile @@ fun fd -> + Lwt.return_ok {files = Files.init ~lru_size; root_dir; lockfile = fd} + + let close t = + let open Lwt_result_syntax in + let*! () = Files.close t.files in + lockfile_unlock t.lockfile +end + +include Read + let init ~lru_size ~root_dir = let open Lwt_result_syntax in let*! 
() = if not (Sys.file_exists root_dir) then Lwt_utils_unix.create_dir root_dir else Lwt.return_unit in - with_lockfile_lock (Filename.concat root_dir ".lock") @@ fun fd -> - return {files = Files.init ~lru_size; root_dir; lockfile = fd} - -let close t = - let open Lwt_result_syntax in - let*! () = Files.close t.files in - lockfile_unlock t.lockfile + let lockfile = Filename.concat root_dir ".lock" in + init ~lockfile ~lru_size root_dir let root_dir t = t.root_dir @@ -1099,66 +1167,12 @@ let write_value : type file key value. let layout = file_layout ~root_dir file in Files.write ?override files layout key value -let read_value : type file key value. - (file, key, value) t -> - (file, key, value) file_layout -> - file -> - key -> - value tzresult Lwt.t = - fun {files; root_dir; _} file_layout file key -> - let layout = file_layout ~root_dir file in - Files.read files layout key - -let value_exists : type file key value. - (file, key, value) t -> - (file, key, value) file_layout -> - file -> - key -> - bool tzresult Lwt.t = - fun {files; root_dir; _} file_layout file key -> - let layout = file_layout ~root_dir file in - Files.value_exists files layout key - -let count_values : type file key value. - (file, key, value) t -> - (file, key, value) file_layout -> - file -> - int tzresult Lwt.t = - fun {files; root_dir; _} file_layout file -> - let layout = file_layout ~root_dir file in - Files.count_values files layout - let write_values ?override t file_layout seq = Seq.ES.iter (fun (file, key, value) -> write_value ?override t file_layout file key value) seq -let read_values t file_layout seq = - let open Lwt_syntax in - Seq_s.of_seq seq - |> Seq_s.S.map (fun (file, key) -> - let* maybe_value = read_value t file_layout file key in - return (file, key, maybe_value)) - -let read_values_from_bytes file_layout bytes seq = - let open Lwt_syntax in - (* Fake root dir for [file_layout] in this case. We don't have a valid - root_dir for data in memory. 
*) - let root_dir = "memory://dal_shard" in - Seq_s.of_seq seq - |> Seq_s.S.map (fun (file, key) -> - let layout = file_layout ~root_dir file in - let maybe_value = Files.read_value_from (`Bytes bytes) layout key in - return (file, key, maybe_value)) - -let values_exist t file_layout seq = - let open Lwt_syntax in - Seq_s.of_seq seq - |> Seq_s.S.map (fun (file, key) -> - let* maybe_value = value_exists t file_layout file key in - return (file, key, maybe_value)) - let remove_file {files; root_dir; _} file_layout file = let layout = file_layout ~root_dir file in Files.remove files layout diff --git a/src/lib_stdlib_unix/key_value_store.mli b/src/lib_stdlib_unix/key_value_store.mli index e9cd3af4285d..b0aefff33347 100644 --- a/src/lib_stdlib_unix/key_value_store.mli +++ b/src/lib_stdlib_unix/key_value_store.mli @@ -108,29 +108,105 @@ val layout : unit -> ('key, 'value) layout -(** An abstract representation of a file-based key-value store. *) -type ('file, 'key, 'value) t +module Read : sig + (** An abstract representation of a file-based key-value store. *) + type ('file, 'key, 'value) t + + (** [read_value t file_layout file key] reads the value associated to [key] in the + [file] in the store. Fails if no value was attached to this [key]. The + value read is the last one that was produced by a successful write. *) + val read_value : + ('file, 'key, 'value) t -> + ('file, 'key, 'value) file_layout -> + 'file -> + 'key -> + 'value tzresult Lwt.t + + (** [read_values t file_layout keys] produces a sequence of [values] associated to + the sequence of [keys]. This function is almost instantaneous since no reads + are performed. Reads are done when the caller consumes the values of the + sequence returned. 
*) + val read_values : + ('file, 'key, 'value) t -> + ('file, 'key, 'value) file_layout -> + ('file * 'key) Seq.t -> + ('file * 'key * 'value tzresult) Seq_s.t + + (** Variant of {!read_values}, but decodes the shards stored in the given bytes + using the given store's layout. *) + val read_values_from_bytes : + ('file, 'key, 'value) file_layout -> + bytes -> + ('file * 'key) Seq.t -> + ('file * 'key * 'value tzresult) Seq_s.t + + (** Same as {!read_value} except that this function returns whether the given + entry exists without reading it. *) + val value_exists : + ('file, 'key, 'value) t -> + ('file, 'key, 'value) file_layout -> + 'file -> + 'key -> + bool tzresult Lwt.t + + (** Same as {!read_values} except that this function returns whether the given + entries exist without reading them. *) + val values_exist : + ('file, 'key, 'value) t -> + ('file, 'key, 'value) file_layout -> + ('file * 'key) Seq.t -> + ('file * 'key * bool tzresult) Seq_s.t + + (** This function returns the number of entries for a given file. *) + val count_values : + ('file, 'key, 'value) t -> + ('file, 'key, 'value) file_layout -> + 'file -> + int tzresult Lwt.t + + (** [init ?lockfile ~lru_size root_dir] initialises a read-only file-based + key-value store. The [root_dir] must exist on disk. + + [lru_size] is the maximum number of open files kept in the LRU cache. + Choose this value based on your value sizes and available memory. + + The implementation always uses a lockfile to coordinate access. If + [lockfile] is provided, that path is used; if the lockfile is already + held by another process, this function returns an error. If [lockfile] is + not provided, a unique, randomly named lockfile is created in the temporary + directory, allowing concurrent opens even from different processes. + + Note that lockfiles do not prevent concurrent opens by the same process; + use a mutex if needed for intra-process coordination. 
+ + This function is intended for scenarios where multiple processes need + read-only access to the same store. Since the returned store is read-only, + concurrent writes from different processes are not possible, avoiding disk + corruption. However, reads may be inconsistent if performed while another + process is writing to the underlying files. + *) + val init : + ?lockfile:string -> + lru_size:int -> + string -> + ('file, 'key, 'value) t tzresult Lwt.t -(** [init ~lru_size ~root_dir] initialises a file-based key-value store. The - [root_dir] is created on disk if it doesn't exist. All the keys/values - associated to a file are stored in a single physical file. + (** [close kvs] waits until all pending reads and writes are completed + and closes the key-value store. *) + val close : ('file, 'key, 'value) t -> unit tzresult Lwt.t +end - [lru_size] is a parameter that represents maximum number of open files. It - is up to the user of this library to decide this number depending on the - sizes of the values. +(** All functions from the Read module are available at the top level. *) +include module type of Read - Internally creates a lockfile and returns an error if a key value store in - the same [root_dir] is locked by another process. This lockfile does not - prevent concurrent opens by the same process and should be completed by a - mutex if necessary. +(** [init ~lru_size ~root_dir] is the same as {!val:Read.init} but uses a fixed + lockfile path derived from [root_dir]. This prevents opening multiple + read-write stores on the same [root_dir] from different processes. + If [root_dir] is not present, it is created. *) val init : lru_size:int -> root_dir:string -> ('file, 'key, 'value) t tzresult Lwt.t -(** [close kvs] waits until all pending reads and writes are completed - and closes the key-value store. *) -val close : ('file, 'key, 'value) t -> unit tzresult Lwt.t - (** [root_dir t] returns the [root_dir] directory used to create [t]. 
*) val root_dir : ('file, 'key, 'value) t -> string @@ -158,51 +234,6 @@ val write_values : ('file * 'key * 'value) Seq.t -> unit tzresult Lwt.t -(** [read_value t file_layout file key] reads the value associated to [key] in the - [file] in the store. Fails if no value were attached to this [key]. The - value read is the last one that was produced by a successful write. *) -val read_value : - ('file, 'key, 'value) t -> - ('file, 'key, 'value) file_layout -> - 'file -> - 'key -> - 'value tzresult Lwt.t - -(** [read_values t file_layout keys] produces a sequence of [values] associated to - the sequence of [keys]. This function is almost instantaneous since no reads - are performed. Reads are done when the caller consumes the values of the - sequence returned. *) -val read_values : - ('file, 'key, 'value) t -> - ('file, 'key, 'value) file_layout -> - ('file * 'key) Seq.t -> - ('file * 'key * 'value tzresult) Seq_s.t - -(** variant of read_values, but decodes the shards stored in the given bytes - using the given store's layout. *) -val read_values_from_bytes : - ('file, 'key, 'value) file_layout -> - bytes -> - ('file * 'key) Seq.t -> - ('file * 'key * 'value tzresult) Seq_s.t - -(** Same as {!read_value} expect that this function returns whether the given - entry exists without reading it. *) -val value_exists : - ('file, 'key, 'value) t -> - ('file, 'key, 'value) file_layout -> - 'file -> - 'key -> - bool tzresult Lwt.t - -(** Same as {!read_values} expect that this function returns whether the given - entries exist without reading them. *) -val values_exist : - ('file, 'key, 'value) t -> - ('file, 'key, 'value) file_layout -> - ('file * 'key) Seq.t -> - ('file * 'key * bool tzresult) Seq_s.t - (** [remove_file t file_layout] removes the corresponding physical file of [file] from the disk as well as the corresponding keys/values of the store. 
In case of concurrent read/write, this function should @@ -216,13 +247,6 @@ val remove_file : 'file -> unit tzresult Lwt.t -(** This function returns the number of entries for a given file. *) -val count_values : - ('file, 'key, 'value) t -> - ('file, 'key, 'value) file_layout -> - 'file -> - int tzresult Lwt.t - module View : sig (** Returns the number of files currently opened by the key value store. Do note this number is an upper bound on the number of -- GitLab