From 6f38b158f2df7d4cdb62eb9dec060d6fff45426d Mon Sep 17 00:00:00 2001 From: mattiasdrp Date: Mon, 16 Sep 2024 12:27:17 +0200 Subject: [PATCH 1/2] Brassaia: Making Io a wrapper around Unix Io was previously a functor that was only used with Unix as a backend and made the code complicated for no reason in octez --- .../unix/append_only_file.ml | 287 ++++---- .../unix/append_only_file_intf.ml | 11 +- .../lib_brassaia_pack/unix/chunked_suffix.ml | 645 +++++++++--------- .../unix/chunked_suffix_intf.ml | 12 +- .../lib_brassaia_pack/unix/control_file.ml | 4 +- .../unix/control_file_intf.ml | 6 +- brassaia/lib_brassaia_pack/unix/dispatcher.ml | 4 +- .../lib_brassaia_pack/unix/dispatcher_intf.ml | 4 +- .../lib_brassaia_pack/unix/file_manager.ml | 42 +- .../unix/file_manager_intf.ml | 30 +- brassaia/lib_brassaia_pack/unix/gc.ml | 17 +- brassaia/lib_brassaia_pack/unix/gc.mli | 4 +- brassaia/lib_brassaia_pack/unix/gc_args.ml | 3 +- brassaia/lib_brassaia_pack/unix/gc_worker.ml | 35 +- brassaia/lib_brassaia_pack/unix/gc_worker.mli | 2 +- brassaia/lib_brassaia_pack/unix/io.ml | 3 - brassaia/lib_brassaia_pack/unix/io_errors.ml | 140 ++-- brassaia/lib_brassaia_pack/unix/io_intf.ml | 2 - brassaia/lib_brassaia_pack/unix/lower.ml | 343 +++++----- brassaia/lib_brassaia_pack/unix/lower.mli | 5 +- brassaia/lib_brassaia_pack/unix/lower_intf.ml | 22 +- .../lib_brassaia_pack/unix/pack_index_intf.ml | 5 +- brassaia/lib_brassaia_pack/unix/pack_store.ml | 15 +- .../lib_brassaia_pack/unix/pack_store_intf.ml | 3 +- brassaia/lib_brassaia_pack/unix/snapshot.ml | 4 +- .../lib_brassaia_pack/unix/sparse_file.ml | 451 ++++++------ .../unix/sparse_file_intf.ml | 10 +- brassaia/lib_brassaia_pack/unix/store.ml | 76 ++- brassaia/lib_brassaia_pack/unix/store_intf.ml | 8 +- .../unix/traverse_pack_file.ml | 9 +- brassaia/test/brassaia-pack/common.ml | 27 +- brassaia/test/brassaia-pack/common.mli | 9 +- .../test/brassaia-pack/test_dispatcher.ml | 6 +- .../test/brassaia-pack/test_flush_reload.ml | 6 +- 
brassaia/test/brassaia-pack/test_inode.ml | 26 +- brassaia/test/brassaia-pack/test_lower.ml | 13 +- brassaia/test/brassaia-pack/test_mapping.ml | 18 +- brassaia/test/brassaia-pack/test_pack.ml | 8 +- .../brassaia-pack/test_pack_version_bump.ml | 6 +- 39 files changed, 1130 insertions(+), 1191 deletions(-) diff --git a/brassaia/lib_brassaia_pack/unix/append_only_file.ml b/brassaia/lib_brassaia_pack/unix/append_only_file.ml index 1f2151742b92..4b2c574486d0 100644 --- a/brassaia/lib_brassaia_pack/unix/append_only_file.ml +++ b/brassaia/lib_brassaia_pack/unix/append_only_file.ml @@ -15,41 +15,37 @@ *) open Import -include Append_only_file_intf - -module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct - module Io = Io - module Errs = Errs - - type t = { - io : Io.t; - mutable persisted_end_poff : int63; - dead_header_size : int63; - rw_perm : rw_perm option; +module Io = Io.Unix + +type t = { + io : Io.t; + mutable persisted_end_poff : int63; + dead_header_size : int63; + rw_perm : rw_perm option; +} + +and auto_flush_procedure = [ `Internal | `External of t -> unit ] + +and rw_perm = { + buf : Buffer.t; + auto_flush_threshold : int; + auto_flush_procedure : auto_flush_procedure; +} +(** [rw_perm] contains the data necessary to operate in readwrite mode. *) + +let create_rw ~path ~overwrite ~auto_flush_threshold ~auto_flush_procedure = + let open Result_syntax in + let+ io = Io.create ~path ~overwrite in + let persisted_end_poff = Int63.zero in + let buf = Buffer.create 0 in + { + io; + persisted_end_poff; + dead_header_size = Int63.zero; + rw_perm = Some { buf; auto_flush_threshold; auto_flush_procedure }; } - and auto_flush_procedure = [ `Internal | `External of t -> unit ] - - and rw_perm = { - buf : Buffer.t; - auto_flush_threshold : int; - auto_flush_procedure : auto_flush_procedure; - } - (** [rw_perm] contains the data necessary to operate in readwrite mode. 
*) - - let create_rw ~path ~overwrite ~auto_flush_threshold ~auto_flush_procedure = - let open Result_syntax in - let+ io = Io.create ~path ~overwrite in - let persisted_end_poff = Int63.zero in - let buf = Buffer.create 0 in - { - io; - persisted_end_poff; - dead_header_size = Int63.zero; - rw_perm = Some { buf; auto_flush_threshold; auto_flush_procedure }; - } - - (** A store is consistent if the real offset of the suffix/dict files is the +(** A store is consistent if the real offset of the suffix/dict files is the one recorded in the control file. When opening the store, the offset from the control file is passed as the [end_poff] argument to the [open_ro], [open_rw] functions. The [end_poff] from the control file is then used as @@ -59,117 +55,116 @@ module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct the real offset. We cannot recover otherwise, because we have no guarantees that the last object fsynced to disk is written entirely to disk. *) - let check_consistent_store ~end_poff ~dead_header_size io = - let open Result_syntax in - let* real_offset = Io.read_size io in - let dead_header_size = Int63.of_int dead_header_size in - let real_offset_without_header = - Int63.Syntax.(real_offset - dead_header_size) - in - if real_offset_without_header < end_poff then Error `Inconsistent_store - else ( - if real_offset_without_header > end_poff then - [%log.warn - "The end offset in the control file %a is smaller than the offset on \ - disk %a for %s; the store was closed in a inconsistent state." 
- Int63.pp end_poff Int63.pp real_offset_without_header (Io.path io)]; - Ok ()) - - let open_rw ~path ~end_poff ~dead_header_size ~auto_flush_threshold - ~auto_flush_procedure = - let open Result_syntax in - let* io = Io.open_ ~path ~readonly:false in - let+ () = check_consistent_store ~end_poff ~dead_header_size io in - let persisted_end_poff = end_poff in - let dead_header_size = Int63.of_int dead_header_size in - let buf = Buffer.create 0 in - { - io; - persisted_end_poff; - dead_header_size; - rw_perm = Some { buf; auto_flush_threshold; auto_flush_procedure }; - } - - let open_ro ~path ~end_poff ~dead_header_size = - let open Result_syntax in - let* io = Io.open_ ~path ~readonly:true in - let+ () = check_consistent_store ~end_poff ~dead_header_size io in - let persisted_end_poff = end_poff in - let dead_header_size = Int63.of_int dead_header_size in - { io; persisted_end_poff; dead_header_size; rw_perm = None } - - let empty_buffer = function - | { rw_perm = Some { buf; _ }; _ } when Buffer.length buf > 0 -> false - | _ -> true - - let close t = - if not @@ empty_buffer t then Error `Pending_flush else Io.close t.io - - let readonly t = Io.readonly t.io - let path t = Io.path t.io - - let auto_flush_threshold = function - | { rw_perm = None; _ } -> None - | { rw_perm = Some rw_perm; _ } -> Some rw_perm.auto_flush_threshold - - let end_poff t = - match t.rw_perm with - | None -> t.persisted_end_poff - | Some rw_perm -> - let open Int63.Syntax in - t.persisted_end_poff + (Buffer.length rw_perm.buf |> Int63.of_int) - - let refresh_end_poff t new_end_poff = - match t.rw_perm with - | Some _ -> Error `Rw_not_allowed - | None -> - t.persisted_end_poff <- new_end_poff; - Ok () - - let flush t = - match t.rw_perm with - | None -> Error `Ro_not_allowed - | Some rw_perm -> - let open Result_syntax in - let open Int63.Syntax in - let s = Buffer.contents rw_perm.buf in - let off = t.persisted_end_poff + t.dead_header_size in - let+ () = Io.write_string t.io ~off s in - 
t.persisted_end_poff <- - t.persisted_end_poff + (String.length s |> Int63.of_int); - (* [truncate] is semantically identical to [clear], except that - [truncate] doesn't deallocate the internal buffer. We use - [clear] in legacy_io. *) - Buffer.truncate rw_perm.buf 0 - - let fsync t = Io.fsync t.io - - let read_exn t ~off ~len b = - let open Int63.Syntax in - let off' = off + Int63.of_int len in - if off' > t.persisted_end_poff then - raise (Errors.Pack_error `Read_out_of_bounds); +let check_consistent_store ~end_poff ~dead_header_size io = + let open Result_syntax in + let* real_offset = Io.read_size io in + let dead_header_size = Int63.of_int dead_header_size in + let real_offset_without_header = + Int63.Syntax.(real_offset - dead_header_size) + in + if real_offset_without_header < end_poff then Error `Inconsistent_store + else ( + if real_offset_without_header > end_poff then + [%log.warn + "The end offset in the control file %a is smaller than the offset on \ + disk %a for %s; the store was closed in a inconsistent state." 
+ Int63.pp end_poff Int63.pp real_offset_without_header (Io.path io)]; + Ok ()) + +let open_rw ~path ~end_poff ~dead_header_size ~auto_flush_threshold + ~auto_flush_procedure = + let open Result_syntax in + let* io = Io.open_ ~path ~readonly:false in + let+ () = check_consistent_store ~end_poff ~dead_header_size io in + let persisted_end_poff = end_poff in + let dead_header_size = Int63.of_int dead_header_size in + let buf = Buffer.create 0 in + { + io; + persisted_end_poff; + dead_header_size; + rw_perm = Some { buf; auto_flush_threshold; auto_flush_procedure }; + } + +let open_ro ~path ~end_poff ~dead_header_size = + let open Result_syntax in + let* io = Io.open_ ~path ~readonly:true in + let+ () = check_consistent_store ~end_poff ~dead_header_size io in + let persisted_end_poff = end_poff in + let dead_header_size = Int63.of_int dead_header_size in + { io; persisted_end_poff; dead_header_size; rw_perm = None } + +let empty_buffer = function + | { rw_perm = Some { buf; _ }; _ } when Buffer.length buf > 0 -> false + | _ -> true + +let close t = + if not @@ empty_buffer t then Error `Pending_flush else Io.close t.io + +let readonly t = Io.readonly t.io +let path t = Io.path t.io + +let auto_flush_threshold = function + | { rw_perm = None; _ } -> None + | { rw_perm = Some rw_perm; _ } -> Some rw_perm.auto_flush_threshold + +let end_poff t = + match t.rw_perm with + | None -> t.persisted_end_poff + | Some rw_perm -> + let open Int63.Syntax in + t.persisted_end_poff + (Buffer.length rw_perm.buf |> Int63.of_int) + +let refresh_end_poff t new_end_poff = + match t.rw_perm with + | Some _ -> Error `Rw_not_allowed + | None -> + t.persisted_end_poff <- new_end_poff; + Ok () + +let flush t = + match t.rw_perm with + | None -> Error `Ro_not_allowed + | Some rw_perm -> + let open Result_syntax in + let open Int63.Syntax in + let s = Buffer.contents rw_perm.buf in + let off = t.persisted_end_poff + t.dead_header_size in + let+ () = Io.write_string t.io ~off s in + 
t.persisted_end_poff <- + t.persisted_end_poff + (String.length s |> Int63.of_int); + (* [truncate] is semantically identical to [clear], except that + [truncate] doesn't deallocate the internal buffer. We use + [clear] in legacy_io. *) + Buffer.truncate rw_perm.buf 0 + +let fsync t = Io.fsync t.io + +let read_exn t ~off ~len b = + let open Int63.Syntax in + let off' = off + Int63.of_int len in + if off' > t.persisted_end_poff then + raise (Errors.Pack_error `Read_out_of_bounds); + let off = off + t.dead_header_size in + Io.read_exn t.io ~off ~len b + +let read_to_string t ~off ~len = + let open Int63.Syntax in + let off' = off + Int63.of_int len in + if off' > t.persisted_end_poff then Error `Read_out_of_bounds + else let off = off + t.dead_header_size in - Io.read_exn t.io ~off ~len b - - let read_to_string t ~off ~len = - let open Int63.Syntax in - let off' = off + Int63.of_int len in - if off' > t.persisted_end_poff then Error `Read_out_of_bounds - else - let off = off + t.dead_header_size in - Io.read_to_string t.io ~off ~len - - let append_exn t s = - match t.rw_perm with - | None -> raise Errors.RO_not_allowed - | Some rw_perm -> ( - assert (Buffer.length rw_perm.buf < rw_perm.auto_flush_threshold); - Buffer.add_string rw_perm.buf s; - if Buffer.length rw_perm.buf >= rw_perm.auto_flush_threshold then - match rw_perm.auto_flush_procedure with - | `Internal -> flush t |> Errs.raise_if_error - | `External cb -> - cb t; - assert (empty_buffer t)) -end + Io.read_to_string t.io ~off ~len + +let append_exn t s = + match t.rw_perm with + | None -> raise Errors.RO_not_allowed + | Some rw_perm -> ( + assert (Buffer.length rw_perm.buf < rw_perm.auto_flush_threshold); + Buffer.add_string rw_perm.buf s; + if Buffer.length rw_perm.buf >= rw_perm.auto_flush_threshold then + match rw_perm.auto_flush_procedure with + | `Internal -> flush t |> Io_errors.raise_if_error + | `External cb -> + cb t; + assert (empty_buffer t)) diff --git 
a/brassaia/lib_brassaia_pack/unix/append_only_file_intf.ml b/brassaia/lib_brassaia_pack/unix/append_only_file_intf.ml index f46ca32dba18..96071a50166f 100644 --- a/brassaia/lib_brassaia_pack/unix/append_only_file_intf.ml +++ b/brassaia/lib_brassaia_pack/unix/append_only_file_intf.ml @@ -15,6 +15,7 @@ *) open Import +module Io = Io.Unix module type S = sig (** Abstraction for brassaia-pack's append only files (i.e. suffix and dict). @@ -25,9 +26,6 @@ module type S = sig It comprises a persistent file, an append buffer and take care of automatically shifting offsets to deal with legacy file headers. *) - module Io : Io.S - module Errs : Io_errors.S - type t type auto_flush_procedure = [ `Internal | `External of t -> unit ] @@ -184,9 +182,4 @@ module type S = sig val path : t -> string end -module type Sigs = sig - module type S = S - - module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) : - S with module Io = Io and module Errs = Errs -end +module type Sigs = S diff --git a/brassaia/lib_brassaia_pack/unix/chunked_suffix.ml b/brassaia/lib_brassaia_pack/unix/chunked_suffix.ml index 036964fbf2ce..82a137e5c3f1 100644 --- a/brassaia/lib_brassaia_pack/unix/chunked_suffix.ml +++ b/brassaia/lib_brassaia_pack/unix/chunked_suffix.ml @@ -15,345 +15,338 @@ *) open Import -include Chunked_suffix_intf - -module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct - module Io = Io - module Errs = Errs - module Ao = Append_only_file.Make (Io) (Errs) - - type chunk = { idx : int; suffix_off : int63; ao : Ao.t } - type create_error = Io.create_error - - type open_error = - [ Io.open_error - | `Closed - | `Invalid_argument - | `Inconsistent_store - | `Read_out_of_bounds ] - - type add_new_error = - [ open_error - | Io.close_error - | `Pending_flush - | `File_exists of string - | `Multiple_empty_chunks ] - - (** A simple container for chunks. 
*) - module Inventory : sig - type t - - val init : int -> (int -> chunk) -> t - val appendable : t -> chunk - - val find : off:int63 -> t -> chunk * int63 - (** [find ~off t] returns the chunk that contains suffix offset [off], along +module Io = Io.Unix +module Ao = Append_only_file + +type chunk = { idx : int; suffix_off : int63; ao : Ao.t } +type create_error = Io.create_error + +type open_error = + [ Io.open_error + | `Closed + | `Invalid_argument + | `Inconsistent_store + | `Read_out_of_bounds ] + +type add_new_error = + [ open_error + | Io.close_error + | `Pending_flush + | `File_exists of string + | `Multiple_empty_chunks ] + +(** A simple container for chunks. *) +module Inventory : sig + type t + + val init : int -> (int -> chunk) -> t + val appendable : t -> chunk + + val find : off:int63 -> t -> chunk * int63 + (** [find ~off t] returns the chunk that contains suffix offset [off], along with the corresponding [poff] within the chunk. Raises `Read_out_of_bounds exception. *) - val fold : - (acc:'a -> is_appendable:bool -> chunk:chunk -> 'a) -> 'a -> t -> 'a - - val open_ : - start_idx:int -> - chunk_num:int -> - open_chunk: - (chunk_idx:int -> - is_legacy:bool -> - is_appendable:bool -> - (Ao.t, open_error) result) -> - (t, [> open_error ]) result - - val close : t -> (unit, [> Io.close_error | `Pending_flush ]) result - - val add_new_appendable : - open_chunk: - (chunk_idx:int -> - is_legacy:bool -> - is_appendable:bool -> - (Ao.t, add_new_error) result) -> - t -> - (unit, [> add_new_error ]) result - - val length : t -> int63 - (** [length t] is the length of bytes for all chunks *) - - val start_idx : t -> int - (** [start_idx t] is the idx of the first chunk *) - - val count : t -> int - (** [count t] is the number of chunks *) - end = struct - type t = { mutable chunks : chunk Array.t } - - exception OpenInventoryError of open_error - - let init num create = { chunks = Array.init num create } - let appendable t = Array.get t.chunks (Array.length 
t.chunks - 1) - - let find ~off t = - let open Int63.Syntax in - let suffix_off_to_chunk_poff c = off - c.suffix_off in - let find c = - let end_poff = Ao.end_poff c.ao in - let poff = suffix_off_to_chunk_poff c in - Int63.zero <= poff && poff < end_poff - in - match Array.find_opt find t.chunks with - | None -> raise (Errors.Pack_error `Read_out_of_bounds) - | Some c -> (c, suffix_off_to_chunk_poff c) - - let end_offset_of_chunk start_offset ao = - let chunk_len = Ao.end_poff ao in - Int63.Syntax.(start_offset + chunk_len) - - let is_legacy chunk_idx = chunk_idx = 0 - - let fold f acc t = - let appendable_idx = (appendable t).idx in - Array.fold_left - (fun acc chunk -> - let is_appendable = chunk.idx = appendable_idx in - f ~acc ~is_appendable ~chunk) - acc t.chunks - - let open_ ~start_idx ~chunk_num ~open_chunk = - let off_acc = ref Int63.zero in - let create_chunk i = - let suffix_off = !off_acc in - let is_appendable = i = chunk_num - 1 in - let chunk_idx = start_idx + i in - let is_legacy = is_legacy chunk_idx in - let open_result = open_chunk ~chunk_idx ~is_legacy ~is_appendable in - match open_result with - | Error err -> raise (OpenInventoryError err) - | Ok ao -> - off_acc := end_offset_of_chunk suffix_off ao; - { idx = chunk_idx; suffix_off; ao } - in - try Ok (init chunk_num create_chunk) - with OpenInventoryError err -> - Error (err : open_error :> [> open_error ]) - - let close t = - (* Close immutable chunks, ignoring errors. *) - let _ = - Array.sub t.chunks 0 (Array.length t.chunks - 1) - |> Array.iter @@ fun chunk -> - let _ = Ao.close chunk.ao in - () - in - (* Close appendable chunk and keep error since this - is the one that can have a pending flush. *) - (appendable t).ao |> Ao.close - - let wrap_error result = - Result.map_error - (fun err -> (err : add_new_error :> [> add_new_error ])) - result - - let reopen_last_chunk ~open_chunk t = - (* Close the previous appendable chunk and reopen as non-appendable. 
*) - let open Result_syntax in - let ({ idx; ao; suffix_off } as last_chunk) = appendable t in - let is_legacy = is_legacy idx in - (* Compute the suffix_off for the following chunk. *) - let length = end_offset_of_chunk suffix_off ao in - let* () = Ao.close ao in - let* ao = - open_chunk ~chunk_idx:idx ~is_legacy ~is_appendable:false |> wrap_error - in - let pos = Array.length t.chunks - 1 in - t.chunks.(pos) <- { last_chunk with ao }; - Ok length - - let create_appendable_chunk ~open_chunk t suffix_off = - let open Result_syntax in - let next_id = succ (appendable t).idx in - let* ao = - open_chunk ~chunk_idx:next_id ~is_legacy:false ~is_appendable:true - in - Ok { idx = next_id; suffix_off; ao } - - let add_new_appendable ~open_chunk t = - let open Result_syntax in - let* next_suffix_off = reopen_last_chunk ~open_chunk t in - let* chunk = - create_appendable_chunk ~open_chunk t next_suffix_off |> wrap_error - in - t.chunks <- Array.append t.chunks [| chunk |]; - Ok () - - let length t = - let open Int63.Syntax in - Array.fold_left (fun sum c -> sum + Ao.end_poff c.ao) Int63.zero t.chunks - - let count t = Array.length t.chunks - let start_idx t = t.chunks.(0).idx - end - - type t = { inventory : Inventory.t; root : string; dead_header_size : int } - - let chunk_path = Layout.V4.suffix_chunk - - let create_rw ~root ~start_idx ~overwrite ~auto_flush_threshold - ~auto_flush_procedure = - let open Result_syntax in - let chunk_idx = start_idx in - let path = chunk_path ~root ~chunk_idx in - let+ ao = - Ao.create_rw ~path ~overwrite ~auto_flush_threshold ~auto_flush_procedure + val fold : + (acc:'a -> is_appendable:bool -> chunk:chunk -> 'a) -> 'a -> t -> 'a + + val open_ : + start_idx:int -> + chunk_num:int -> + open_chunk: + (chunk_idx:int -> + is_legacy:bool -> + is_appendable:bool -> + (Ao.t, open_error) result) -> + (t, [> open_error ]) result + + val close : t -> (unit, [> Io.close_error | `Pending_flush ]) result + + val add_new_appendable : + open_chunk: + 
(chunk_idx:int -> + is_legacy:bool -> + is_appendable:bool -> + (Ao.t, add_new_error) result) -> + t -> + (unit, [> add_new_error ]) result + + val length : t -> int63 + (** [length t] is the length of bytes for all chunks *) + + val start_idx : t -> int + (** [start_idx t] is the idx of the first chunk *) + + val count : t -> int + (** [count t] is the number of chunks *) +end = struct + type t = { mutable chunks : chunk Array.t } + + exception OpenInventoryError of open_error + + let init num create = { chunks = Array.init num create } + let appendable t = Array.get t.chunks (Array.length t.chunks - 1) + + let find ~off t = + let open Int63.Syntax in + let suffix_off_to_chunk_poff c = off - c.suffix_off in + let find c = + let end_poff = Ao.end_poff c.ao in + let poff = suffix_off_to_chunk_poff c in + Int63.zero <= poff && poff < end_poff in - let chunk = { idx = chunk_idx; suffix_off = Int63.zero; ao } in - let inventory = Inventory.init 1 (Fun.const chunk) in - { inventory; root; dead_header_size = 0 } - - (** A module to adjust values when mapping from chunks to append-only files *) - module Ao_shim = struct - type t = { dead_header_size : int; end_poff : int63 } - - let init ~path ~appendable_chunk_poff ~dead_header_size ~is_legacy - ~is_appendable = - let open Result_syntax in - (* Only use the legacy dead_header_size for legacy chunks. *) - let dead_header_size = if is_legacy then dead_header_size else 0 in - (* The appendable chunk uses the provided [appendable_chunk_poff]; but the others - read their size on disk. TODO: this is needed for the Ao module's current - APIs but could perhaps be removed by future Ao API modifications. *) - let+ end_poff = - if is_appendable then Ok appendable_chunk_poff - else - match Io.size_of_path path with - (* Subtract [dead_header_size] because the poff value stored in the - control file does the same. 
*) - | Ok s -> Ok Int63.Syntax.(s - Int63.of_int dead_header_size) - | Error _ as e -> e - in - { dead_header_size; end_poff } - end - - let open_rw ~root ~appendable_chunk_poff ~start_idx ~chunk_num - ~dead_header_size ~auto_flush_threshold ~auto_flush_procedure = - let open Result_syntax in - let open_chunk ~chunk_idx ~is_legacy ~is_appendable = - let path = chunk_path ~root ~chunk_idx in - let* { dead_header_size; end_poff } = - Ao_shim.init ~path ~appendable_chunk_poff ~dead_header_size ~is_legacy - ~is_appendable - in - match is_appendable with - | true -> - Ao.open_rw ~path ~end_poff ~dead_header_size ~auto_flush_threshold - ~auto_flush_procedure - | false -> Ao.open_ro ~path ~end_poff ~dead_header_size + match Array.find_opt find t.chunks with + | None -> raise (Errors.Pack_error `Read_out_of_bounds) + | Some c -> (c, suffix_off_to_chunk_poff c) + + let end_offset_of_chunk start_offset ao = + let chunk_len = Ao.end_poff ao in + Int63.Syntax.(start_offset + chunk_len) + + let is_legacy chunk_idx = chunk_idx = 0 + + let fold f acc t = + let appendable_idx = (appendable t).idx in + Array.fold_left + (fun acc chunk -> + let is_appendable = chunk.idx = appendable_idx in + f ~acc ~is_appendable ~chunk) + acc t.chunks + + let open_ ~start_idx ~chunk_num ~open_chunk = + let off_acc = ref Int63.zero in + let create_chunk i = + let suffix_off = !off_acc in + let is_appendable = i = chunk_num - 1 in + let chunk_idx = start_idx + i in + let is_legacy = is_legacy chunk_idx in + let open_result = open_chunk ~chunk_idx ~is_legacy ~is_appendable in + match open_result with + | Error err -> raise (OpenInventoryError err) + | Ok ao -> + off_acc := end_offset_of_chunk suffix_off ao; + { idx = chunk_idx; suffix_off; ao } + in + try Ok (init chunk_num create_chunk) + with OpenInventoryError err -> Error (err : open_error :> [> open_error ]) + + let close t = + (* Close immutable chunks, ignoring errors. 
*) + let _ = + Array.sub t.chunks 0 (Array.length t.chunks - 1) + |> Array.iter @@ fun chunk -> + let _ = Ao.close chunk.ao in + () in - let+ inventory = Inventory.open_ ~start_idx ~chunk_num ~open_chunk in - { inventory; root; dead_header_size } + (* Close appendable chunk and keep error since this + is the one that can have a pending flush. *) + (appendable t).ao |> Ao.close - let open_ro ~root ~appendable_chunk_poff ~dead_header_size ~start_idx - ~chunk_num = + let wrap_error result = + Result.map_error + (fun err -> (err : add_new_error :> [> add_new_error ])) + result + + let reopen_last_chunk ~open_chunk t = + (* Close the previous appendable chunk and reopen as non-appendable. *) let open Result_syntax in - let open_chunk ~chunk_idx ~is_legacy ~is_appendable = - let path = chunk_path ~root ~chunk_idx in - let* { dead_header_size; end_poff } = - Ao_shim.init ~path ~appendable_chunk_poff ~dead_header_size ~is_legacy - ~is_appendable - in - Ao.open_ro ~path ~end_poff ~dead_header_size + let ({ idx; ao; suffix_off } as last_chunk) = appendable t in + let is_legacy = is_legacy idx in + (* Compute the suffix_off for the following chunk. *) + let length = end_offset_of_chunk suffix_off ao in + let* () = Ao.close ao in + let* ao = + open_chunk ~chunk_idx:idx ~is_legacy ~is_appendable:false |> wrap_error in - let+ inventory = Inventory.open_ ~start_idx ~chunk_num ~open_chunk in - { inventory; root; dead_header_size } - - let start_idx t = Inventory.start_idx t.inventory - let chunk_num t = Inventory.count t.inventory - let appendable_ao t = (Inventory.appendable t.inventory).ao - let appendable_chunk_poff t = appendable_ao t |> Ao.end_poff - let end_soff t = Inventory.length t.inventory - - let read_exn t ~off ~len buf = - let rec read progress_off suffix_off len_requested = - let open Int63.Syntax in - (* Find chunk with [suffix_off] and calculate length we can read. 
*) - let chunk, poff = Inventory.find ~off:suffix_off t.inventory in - let chunk_end_poff = Ao.end_poff chunk.ao in - let read_end_poff = poff + len_requested in - let len_read = - if read_end_poff > chunk_end_poff then chunk_end_poff - poff - else len_requested - in - - (* Perform read. If this is the first read, we can use [buf]; otherwise, - we create a new buffer and transfer after the read. *) - let len_i = Int63.to_int len_read in - let is_first_read = progress_off = Int63.zero in - let ao_buf = if is_first_read then buf else Bytes.create len_i in - Ao.read_exn chunk.ao ~off:poff ~len:len_i ao_buf; - if not is_first_read then - Bytes.blit ao_buf 0 buf (Int63.to_int progress_off) len_i; - - (* Read more if any is [rem]aining. *) - let rem = len_requested - len_read in - if rem > Int63.zero then - read (progress_off + len_read) (suffix_off + len_read) rem - else () + let pos = Array.length t.chunks - 1 in + t.chunks.(pos) <- { last_chunk with ao }; + Ok length + + let create_appendable_chunk ~open_chunk t suffix_off = + let open Result_syntax in + let next_id = succ (appendable t).idx in + let* ao = + open_chunk ~chunk_idx:next_id ~is_legacy:false ~is_appendable:true in - read Int63.zero off (Int63.of_int len) - - let read_range_exn t ~off ~min_len ~max_len buf = - let len = - let max_off = end_soff t in - let bytes_after_off = Int63.(to_int Syntax.(max_off - off)) in - if bytes_after_off < min_len then - raise (Errors.Pack_error `Read_out_of_bounds) - else if bytes_after_off > max_len then max_len - else bytes_after_off + Ok { idx = next_id; suffix_off; ao } + + let add_new_appendable ~open_chunk t = + let open Result_syntax in + let* next_suffix_off = reopen_last_chunk ~open_chunk t in + let* chunk = + create_appendable_chunk ~open_chunk t next_suffix_off |> wrap_error in - read_exn t ~off ~len buf; - len + t.chunks <- Array.append t.chunks [| chunk |]; + Ok () + + let length t = + let open Int63.Syntax in + Array.fold_left (fun sum c -> sum + Ao.end_poff 
c.ao) Int63.zero t.chunks + + let count t = Array.length t.chunks + let start_idx t = t.chunks.(0).idx +end + +type t = { inventory : Inventory.t; root : string; dead_header_size : int } - let append_exn t s = Ao.append_exn (appendable_ao t) s +let chunk_path = Layout.V4.suffix_chunk - let add_chunk ~auto_flush_threshold ~auto_flush_procedure t = +let create_rw ~root ~start_idx ~overwrite ~auto_flush_threshold + ~auto_flush_procedure = + let open Result_syntax in + let chunk_idx = start_idx in + let path = chunk_path ~root ~chunk_idx in + let+ ao = + Ao.create_rw ~path ~overwrite ~auto_flush_threshold ~auto_flush_procedure + in + let chunk = { idx = chunk_idx; suffix_off = Int63.zero; ao } in + let inventory = Inventory.init 1 (Fun.const chunk) in + { inventory; root; dead_header_size = 0 } + +(** A module to adjust values when mapping from chunks to append-only files *) +module Ao_shim = struct + type t = { dead_header_size : int; end_poff : int63 } + + let init ~path ~appendable_chunk_poff ~dead_header_size ~is_legacy + ~is_appendable = let open Result_syntax in - let* () = - let end_poff = appendable_chunk_poff t in - if Int63.(equal end_poff zero) then Error `Multiple_empty_chunks - else Ok () - in - let root = t.root in - let dead_header_size = t.dead_header_size in - let open_chunk ~chunk_idx ~is_legacy ~is_appendable = - let path = chunk_path ~root ~chunk_idx in - let* { dead_header_size; end_poff } = - Ao_shim.init ~path ~appendable_chunk_poff:Int63.zero ~dead_header_size - ~is_legacy ~is_appendable - in - match is_appendable with - | true -> - Ao.create_rw ~path ~overwrite:true ~auto_flush_threshold - ~auto_flush_procedure - | false -> Ao.open_ro ~path ~end_poff ~dead_header_size + (* Only use the legacy dead_header_size for legacy chunks. *) + let dead_header_size = if is_legacy then dead_header_size else 0 in + (* The appendable chunk uses the provided [appendable_chunk_poff]; but the others + read their size on disk. 
TODO: this is needed for the Ao module's current + APIs but could perhaps be removed by future Ao API modifications. *) + let+ end_poff = + if is_appendable then Ok appendable_chunk_poff + else + match Io.size_of_path path with + (* Subtract [dead_header_size] because the poff value stored in the + control file does the same. *) + | Ok s -> Ok Int63.Syntax.(s - Int63.of_int dead_header_size) + | Error _ as e -> e in - Inventory.add_new_appendable ~open_chunk t.inventory - - let close t = Inventory.close t.inventory - let empty_buffer t = appendable_ao t |> Ao.empty_buffer - let flush t = appendable_ao t |> Ao.flush - let fsync t = appendable_ao t |> Ao.fsync - - let refresh_appendable_chunk_poff t new_poff = - Ao.refresh_end_poff (appendable_ao t) new_poff - - let readonly t = appendable_ao t |> Ao.readonly - let auto_flush_threshold t = appendable_ao t |> Ao.auto_flush_threshold - - let fold_chunks f acc t = - Inventory.fold - (fun ~acc ~is_appendable ~chunk -> - let len = Ao.end_poff chunk.ao in - let start_suffix_off = chunk.suffix_off in - let end_suffix_off = Int63.Syntax.(start_suffix_off + len) in - f ~acc ~idx:chunk.idx ~start_suffix_off ~end_suffix_off ~is_appendable) - acc t.inventory + { dead_header_size; end_poff } end + +let open_rw ~root ~appendable_chunk_poff ~start_idx ~chunk_num ~dead_header_size + ~auto_flush_threshold ~auto_flush_procedure = + let open Result_syntax in + let open_chunk ~chunk_idx ~is_legacy ~is_appendable = + let path = chunk_path ~root ~chunk_idx in + let* { dead_header_size; end_poff } = + Ao_shim.init ~path ~appendable_chunk_poff ~dead_header_size ~is_legacy + ~is_appendable + in + match is_appendable with + | true -> + Ao.open_rw ~path ~end_poff ~dead_header_size ~auto_flush_threshold + ~auto_flush_procedure + | false -> Ao.open_ro ~path ~end_poff ~dead_header_size + in + let+ inventory = Inventory.open_ ~start_idx ~chunk_num ~open_chunk in + { inventory; root; dead_header_size } + +let open_ro ~root ~appendable_chunk_poff 
~dead_header_size ~start_idx ~chunk_num + = + let open Result_syntax in + let open_chunk ~chunk_idx ~is_legacy ~is_appendable = + let path = chunk_path ~root ~chunk_idx in + let* { dead_header_size; end_poff } = + Ao_shim.init ~path ~appendable_chunk_poff ~dead_header_size ~is_legacy + ~is_appendable + in + Ao.open_ro ~path ~end_poff ~dead_header_size + in + let+ inventory = Inventory.open_ ~start_idx ~chunk_num ~open_chunk in + { inventory; root; dead_header_size } + +let start_idx t = Inventory.start_idx t.inventory +let chunk_num t = Inventory.count t.inventory +let appendable_ao t = (Inventory.appendable t.inventory).ao +let appendable_chunk_poff t = appendable_ao t |> Ao.end_poff +let end_soff t = Inventory.length t.inventory + +let read_exn t ~off ~len buf = + let rec read progress_off suffix_off len_requested = + let open Int63.Syntax in + (* Find chunk with [suffix_off] and calculate length we can read. *) + let chunk, poff = Inventory.find ~off:suffix_off t.inventory in + let chunk_end_poff = Ao.end_poff chunk.ao in + let read_end_poff = poff + len_requested in + let len_read = + if read_end_poff > chunk_end_poff then chunk_end_poff - poff + else len_requested + in + + (* Perform read. If this is the first read, we can use [buf]; otherwise, + we create a new buffer and transfer after the read. *) + let len_i = Int63.to_int len_read in + let is_first_read = progress_off = Int63.zero in + let ao_buf = if is_first_read then buf else Bytes.create len_i in + Ao.read_exn chunk.ao ~off:poff ~len:len_i ao_buf; + if not is_first_read then + Bytes.blit ao_buf 0 buf (Int63.to_int progress_off) len_i; + + (* Read more if any is [rem]aining. 
*) + let rem = len_requested - len_read in + if rem > Int63.zero then + read (progress_off + len_read) (suffix_off + len_read) rem + else () + in + read Int63.zero off (Int63.of_int len) + +let read_range_exn t ~off ~min_len ~max_len buf = + let len = + let max_off = end_soff t in + let bytes_after_off = Int63.(to_int Syntax.(max_off - off)) in + if bytes_after_off < min_len then + raise (Errors.Pack_error `Read_out_of_bounds) + else if bytes_after_off > max_len then max_len + else bytes_after_off + in + read_exn t ~off ~len buf; + len + +let append_exn t s = Ao.append_exn (appendable_ao t) s + +let add_chunk ~auto_flush_threshold ~auto_flush_procedure t = + let open Result_syntax in + let* () = + let end_poff = appendable_chunk_poff t in + if Int63.(equal end_poff zero) then Error `Multiple_empty_chunks else Ok () + in + let root = t.root in + let dead_header_size = t.dead_header_size in + let open_chunk ~chunk_idx ~is_legacy ~is_appendable = + let path = chunk_path ~root ~chunk_idx in + let* { dead_header_size; end_poff } = + Ao_shim.init ~path ~appendable_chunk_poff:Int63.zero ~dead_header_size + ~is_legacy ~is_appendable + in + match is_appendable with + | true -> + Ao.create_rw ~path ~overwrite:true ~auto_flush_threshold + ~auto_flush_procedure + | false -> Ao.open_ro ~path ~end_poff ~dead_header_size + in + Inventory.add_new_appendable ~open_chunk t.inventory + +let close t = Inventory.close t.inventory +let empty_buffer t = appendable_ao t |> Ao.empty_buffer +let flush t = appendable_ao t |> Ao.flush +let fsync t = appendable_ao t |> Ao.fsync + +let refresh_appendable_chunk_poff t new_poff = + Ao.refresh_end_poff (appendable_ao t) new_poff + +let readonly t = appendable_ao t |> Ao.readonly +let auto_flush_threshold t = appendable_ao t |> Ao.auto_flush_threshold + +let fold_chunks f acc t = + Inventory.fold + (fun ~acc ~is_appendable ~chunk -> + let len = Ao.end_poff chunk.ao in + let start_suffix_off = chunk.suffix_off in + let end_suffix_off = 
Int63.Syntax.(start_suffix_off + len) in + f ~acc ~idx:chunk.idx ~start_suffix_off ~end_suffix_off ~is_appendable) + acc t.inventory diff --git a/brassaia/lib_brassaia_pack/unix/chunked_suffix_intf.ml b/brassaia/lib_brassaia_pack/unix/chunked_suffix_intf.ml index c294151a3ff8..ec46d5e4b314 100644 --- a/brassaia/lib_brassaia_pack/unix/chunked_suffix_intf.ml +++ b/brassaia/lib_brassaia_pack/unix/chunked_suffix_intf.ml @@ -25,9 +25,8 @@ module type S = sig - [start_idx] and [chunk_num] for the open functions to know the starting file name and how many files there are. *) - module Io : Io.S - module Errs : Io_errors.S - module Ao : Append_only_file.S + module Io = Io.Unix + module Ao = Append_only_file type t type create_error = Io.create_error @@ -133,9 +132,4 @@ module type S = sig 'a end -module type Sigs = sig - module type S = S - - module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) : - S with module Io = Io and module Errs = Errs -end +module type Sigs = S diff --git a/brassaia/lib_brassaia_pack/unix/control_file.ml b/brassaia/lib_brassaia_pack/unix/control_file.ml index c2df9dfc4acb..bfa8fff038be 100644 --- a/brassaia/lib_brassaia_pack/unix/control_file.ml +++ b/brassaia/lib_brassaia_pack/unix/control_file.ml @@ -317,8 +317,8 @@ module Serde = struct end end -module Make (Serde : Serde.S) (Io : Io.S) = struct - module Io = Io +module Make (Serde : Serde.S) = struct + module Io = Io.Unix type payload = Serde.payload diff --git a/brassaia/lib_brassaia_pack/unix/control_file_intf.ml b/brassaia/lib_brassaia_pack/unix/control_file_intf.ml index ccf437c49ece..a73ee924f277 100644 --- a/brassaia/lib_brassaia_pack/unix/control_file_intf.ml +++ b/brassaia/lib_brassaia_pack/unix/control_file_intf.ml @@ -297,7 +297,7 @@ module type S = sig None of the functions raise exceptions. 
*) - module Io : Io.S + module Io = Io.Unix type payload type raw_payload @@ -418,6 +418,6 @@ module type Sigs = sig module type Upper = Upper module type Volume = Volume - module Upper (Io : Io.S) : Upper with module Io = Io - module Volume (Io : Io.S) : Volume with module Io = Io + module Upper : Upper + module Volume : Volume end diff --git a/brassaia/lib_brassaia_pack/unix/dispatcher.ml b/brassaia/lib_brassaia_pack/unix/dispatcher.ml index a1a72fdfe43e..0d12ace598b7 100644 --- a/brassaia/lib_brassaia_pack/unix/dispatcher.ml +++ b/brassaia/lib_brassaia_pack/unix/dispatcher.ml @@ -19,14 +19,12 @@ include Dispatcher_intf module Payload = Control_file.Payload.Upper.Latest (* The following [with module Io = Io.Unix] forces unix *) -module Make (File_manager : File_manager.S with module Io = Io.Unix) : +module Make (File_manager : File_manager.S) : S with module File_manager = File_manager = struct module File_manager = File_manager module Io = File_manager.Io module Suffix = File_manager.Suffix module Sparse = File_manager.Sparse - module Lower = File_manager.Lower - module Errs = File_manager.Errs module Control = File_manager.Control type t = { file_manager : File_manager.t } diff --git a/brassaia/lib_brassaia_pack/unix/dispatcher_intf.ml b/brassaia/lib_brassaia_pack/unix/dispatcher_intf.ml index 7c4f2cb6d5cd..c27b953ff88b 100644 --- a/brassaia/lib_brassaia_pack/unix/dispatcher_intf.ml +++ b/brassaia/lib_brassaia_pack/unix/dispatcher_intf.ml @@ -21,7 +21,7 @@ module type S = sig type t - val init : File_manager.t -> (t, [> File_manager.Errs.t ]) result + val init : File_manager.t -> (t, [> Io_errors.t ]) result val read_exn : t -> @@ -100,6 +100,6 @@ end module type Sigs = sig module type S = S - module Make (File_manager : File_manager.S with module Io = Io.Unix) : + module Make (File_manager : File_manager.S) : S with module File_manager = File_manager end diff --git a/brassaia/lib_brassaia_pack/unix/file_manager.ml 
b/brassaia/lib_brassaia_pack/unix/file_manager.ml index 54b861c0240e..e7f248059c40 100644 --- a/brassaia/lib_brassaia_pack/unix/file_manager.ml +++ b/brassaia/lib_brassaia_pack/unix/file_manager.ml @@ -101,21 +101,18 @@ module Events = struct ("chunk_num", Data_encoding.int64) end -module Make - (Io : Io.S) - (Index : Pack_index.S with module Io = Io) - (Errs : Io_errors.S with module Io = Io) = -struct - module Io = Errs.Io +module Make (Index : Pack_index.S) = struct + module Io = Io.Unix module Index = Index - module Errs = Io_errors.Make (Io) - module Control = Control_file.Upper (Io) - module Dict = Append_only_file.Make (Io) (Errs) - module Suffix = Chunked_suffix.Make (Io) (Errs) - module Sparse = Sparse_file.Make (Io) - module Lower = Lower.Make (Io) (Errs) - - type after_reload_consumer = { after_reload : unit -> (unit, Errs.t) result } + module Control = Control_file.Upper + module Dict = Append_only_file + module Suffix = Chunked_suffix + module Sparse = Sparse_file + + type after_reload_consumer = { + after_reload : unit -> (unit, Io_errors.t) result; + } + type after_flush_consumer = { after_flush : unit -> unit } type t = { @@ -170,7 +167,7 @@ struct List.fold_left (fun acc { after_reload } -> Result.bind acc after_reload) (Ok ()) consumers - |> Result.map_error (fun err -> (err : Errs.t :> [> Errs.t ])) + |> Result.map_error (fun err -> (err : Io_errors.t :> [> Io_errors.t ])) (** Flush stages ************************************************************* @@ -259,20 +256,20 @@ struct that the file manager flushes. *) let dict_requires_a_flush_exn t = Stats.incr_fm_field Auto_dict; - flush_dict t |> Errs.raise_if_error + flush_dict t |> Io_errors.raise_if_error (** Is expected to be called by the suffix when its append buffer is full so that the file manager flushes. 
*) let suffix_requires_a_flush_exn t = Stats.incr_fm_field Auto_suffix; - flush_suffix_and_its_deps t |> Errs.raise_if_error + flush_suffix_and_its_deps t |> Io_errors.raise_if_error (** Is expected to be called by the index when its append buffer is full so that the dependendies of index are flushes. When the function returns, index will flush itself. *) let index_is_about_to_auto_flush_exn t = Stats.incr_fm_field Auto_index; - flush_suffix_and_its_deps t |> Errs.raise_if_error + flush_suffix_and_its_deps t |> Io_errors.raise_if_error (* Explicit flush ********************************************************* *) @@ -551,7 +548,7 @@ struct Please manually remove %s." path) | `No_such_file_or_directory, _ -> Io.mkdir path - | (`File | `Other), _ -> Errs.raise_error (`Not_a_directory path)) + | (`File | `Other), _ -> Io_errors.raise_error (`Not_a_directory path)) let create_rw ~overwrite config = let open Result_syntax in @@ -761,7 +758,7 @@ struct (* Bytes 0-7 contains the offset. Bytes 8-15 contain the version. *) let* io = Io.open_ ~path ~readonly:true in Errors.finalise (fun _ -> - Io.close io |> Errs.log_if_error "FM: read_offset_from_legacy_file") + Io.close io |> Io_errors.log_if_error "FM: read_offset_from_legacy_file") @@ fun () -> let* s = Io.read_to_string io ~off:Int63.zero ~len:8 in let x = Int63.decode ~off:0 s in @@ -772,7 +769,8 @@ struct (* Bytes 0-7 contains the offset. Bytes 8-15 contain the version. *) let* io = Io.open_ ~path ~readonly:true in Errors.finalise (fun _ -> - Io.close io |> Errs.log_if_error "FM: read_version_from_legacy_file") + Io.close io + |> Io_errors.log_if_error "FM: read_version_from_legacy_file") @@ fun () -> let off = Int63.of_int 8 in let* s = Io.read_to_string io ~off ~len:8 in @@ -1105,7 +1103,7 @@ struct let* () = Suffix.close suffix in (* Step 3. Create the control file and close it. 
*) let status = Payload.Gced gced in - let dict_end_poff = Io.size_of_path dst_dict |> Errs.raise_if_error in + let dict_end_poff = Io.size_of_path dst_dict |> Io_errors.raise_if_error in let pl = { Payload.dict_end_poff; diff --git a/brassaia/lib_brassaia_pack/unix/file_manager_intf.ml b/brassaia/lib_brassaia_pack/unix/file_manager_intf.ml index bf9ec417cad1..4629f7dbb94b 100644 --- a/brassaia/lib_brassaia_pack/unix/file_manager_intf.ml +++ b/brassaia/lib_brassaia_pack/unix/file_manager_intf.ml @@ -58,14 +58,12 @@ module type S = sig 3. and 5. are highly critical. *) - module Io : Io.S - module Control : Control_file.Upper with module Io = Io - module Dict : Append_only_file.S with module Io = Io - module Suffix : Chunked_suffix.S with module Io = Io + module Io = Io.Unix + module Control = Control_file.Upper + module Dict = Append_only_file + module Suffix = Chunked_suffix module Index : Pack_index.S - module Errs : Io_errors.S with module Io = Io - module Sparse : Sparse_file.S with module Io = Io - module Lower : Lower.S with module Io = Io + module Sparse = Sparse_file type t @@ -232,16 +230,16 @@ module type S = sig type reload_stages := [ `After_index | `After_control | `After_suffix ] - val reload : ?hook:reload_stages hook -> t -> (unit, [> Errs.t ]) result + val reload : ?hook:reload_stages hook -> t -> (unit, [> Io_errors.t ]) result (** Execute the reload routine. Is a no-op if the control file did not change. 
*) val register_dict_consumer : - t -> after_reload:(unit -> (unit, Errs.t) result) -> unit + t -> after_reload:(unit -> (unit, Io_errors.t) result) -> unit val register_prefix_consumer : - t -> after_reload:(unit -> (unit, Errs.t) result) -> unit + t -> after_reload:(unit -> (unit, Io_errors.t) result) -> unit val register_suffix_consumer : t -> after_flush:(unit -> unit) -> unit @@ -270,7 +268,7 @@ module type S = sig suffix_dead_bytes:int63 -> latest_gc_target_offset:int63 -> volume:Lower.volume_identifier option -> - (unit, [> Errs.t ]) result + (unit, [> Io_errors.t ]) result (** Swaps to using files from the GC [generation]. The values [suffix_start_offset], [chunk_start_idx], [chunk_num], and [suffix_dead_bytes] are used to properly load and read the suffix after a @@ -280,8 +278,8 @@ module type S = sig val readonly : t -> bool val generation : t -> int val gc_allowed : t -> bool - val split : t -> (unit, [> Errs.t ]) result - val add_volume : t -> (unit, [> Errs.t ]) result + val split : t -> (unit, [> Io_errors.t ]) result + val add_volume : t -> (unit, [> Io_errors.t ]) result val gc_behaviour : t -> [ `Delete | `Archive ] (** Decides if the GC will delete or archive the garbage data, depending on @@ -305,9 +303,5 @@ end module type Sigs = sig module type S = S - module Make - (Io : Io.S) - (Index : Pack_index.S with module Io = Io) - (Errs : Io_errors.S with module Io = Io) : - S with module Io = Io and module Index = Index and module Errs = Errs + module Make (Index : Pack_index.S) : S with module Index = Index end diff --git a/brassaia/lib_brassaia_pack/unix/gc.ml b/brassaia/lib_brassaia_pack/unix/gc.ml index 0cad0aed79c2..bbc085f0a5e4 100644 --- a/brassaia/lib_brassaia_pack/unix/gc.ml +++ b/brassaia/lib_brassaia_pack/unix/gc.ml @@ -21,7 +21,7 @@ module Make (Args : Gc_args.S) = struct module Args = Args open Args module Io = File_manager.Io - module Ao = Append_only_file.Make (Io) (Errs) + module Ao = Append_only_file module Worker = Gc_worker.Make 
(Args) type t = { @@ -30,8 +30,8 @@ module Make (Args : Gc_args.S) = struct task : Async.t; unlink : bool; new_suffix_start_offset : int63; - resolver : (Stats.Latest_gc.stats, Errs.t) result Lwt.u; - promise : (Stats.Latest_gc.stats, Errs.t) result Lwt.t; + resolver : (Stats.Latest_gc.stats, Io_errors.t) result Lwt.u; + promise : (Stats.Latest_gc.stats, Io_errors.t) result Lwt.t; dispatcher : Dispatcher.t; file_manager : File_manager.t; contents : read Contents_store.t; @@ -239,16 +239,19 @@ module Make (Args : Gc_args.S) = struct Ok string in let read_error err = - `Corrupted_gc_result_file (Brassaia.Type.to_string Errs.t err) + `Corrupted_gc_result_file (Brassaia.Type.to_string Io_errors.t err) + in + let gc_error err = + `Gc_process_error (Brassaia.Type.to_string Io_errors.t err) in - let gc_error err = `Gc_process_error (Brassaia.Type.to_string Errs.t err) in let* s = read_file () |> Result.map_error read_error in match Brassaia.Type.of_json_string Worker.gc_output_t s with | Error (`Msg error) -> Error (`Corrupted_gc_result_file error) | Ok ok -> ok |> Result.map_error gc_error let clean_after_abort t = - File_manager.cleanup t.file_manager |> Errs.log_if_error "clean_after_abort" + File_manager.cleanup t.file_manager + |> Io_errors.log_if_error "clean_after_abort" let finalise ~wait t = match t.resulting_stats with @@ -328,7 +331,7 @@ module Make (Args : Gc_args.S) = struct mapping_end_poff = Some gc_results.mapping_size; } | _ -> - let r = gc_errors status gc_output |> Errs.raise_if_error in + let r = gc_errors status gc_output |> Io_errors.raise_if_error in Lwt.return r let on_finalise t f = diff --git a/brassaia/lib_brassaia_pack/unix/gc.mli b/brassaia/lib_brassaia_pack/unix/gc.mli index ab57095d3c3e..81b0ce72977c 100644 --- a/brassaia/lib_brassaia_pack/unix/gc.mli +++ b/brassaia/lib_brassaia_pack/unix/gc.mli @@ -42,14 +42,14 @@ module Make val finalise : wait:bool -> t -> - ([> `Running | `Finalised of Stats.Latest_gc.stats ], Args.Errs.t) result + ([> 
`Running | `Finalised of Stats.Latest_gc.stats ], Io_errors.t) result Lwt.t (** [finalise ~wait t] returns the state of the GC process. If [wait = true], the call will block until GC finishes. *) val on_finalise : - t -> ((Stats.Latest_gc.stats, Args.Errs.t) result -> unit Lwt.t) -> unit + t -> ((Stats.Latest_gc.stats, Io_errors.t) result -> unit Lwt.t) -> unit (** Attaches a callback to the GC process, which will be called when the GC finalises. *) diff --git a/brassaia/lib_brassaia_pack/unix/gc_args.ml b/brassaia/lib_brassaia_pack/unix/gc_args.ml index 942109d36571..65603508cca6 100644 --- a/brassaia/lib_brassaia_pack/unix/gc_args.ml +++ b/brassaia/lib_brassaia_pack/unix/gc_args.ml @@ -17,10 +17,9 @@ open! Import module type S = sig - module File_manager : File_manager.S with module Io = Io.Unix + module File_manager : File_manager.S module Async : Async.S module Dict : Dict.S with module File_manager = File_manager - module Errs : Io_errors.S with module Io = File_manager.Io module Dispatcher : Dispatcher.S with module File_manager = File_manager type hash diff --git a/brassaia/lib_brassaia_pack/unix/gc_worker.ml b/brassaia/lib_brassaia_pack/unix/gc_worker.ml index 557c00f663f7..e9bc3f4bf067 100644 --- a/brassaia/lib_brassaia_pack/unix/gc_worker.ml +++ b/brassaia/lib_brassaia_pack/unix/gc_worker.ml @@ -22,9 +22,8 @@ exception Pack_error = Errors.Pack_error module Make (Args : Gc_args.S) = struct open Args module Io = File_manager.Io - module Lower = File_manager.Lower module Sparse = Dispatcher.File_manager.Sparse - module Ao = Append_only_file.Make (File_manager.Io) (Errs) + module Ao = Append_only_file let string_of_key = Brassaia.Type.to_string key_t @@ -211,7 +210,7 @@ module Make (Args : Gc_args.S) = struct } [@@deriving brassaia] - type gc_output = (gc_results, Args.Errs.t) result [@@deriving brassaia] + type gc_output = (gc_results, Io_errors.t) result [@@deriving brassaia] let run ~lower_root ~generation ~new_files_path root commit_key 
new_suffix_start_offset = @@ -228,13 +227,15 @@ module Make (Args : Gc_args.S) = struct report_old_file_sizes ~root ~generation:(generation - 1) stats |> ignore in - let file_manager = File_manager.open_ro config |> Errs.raise_if_error in + let file_manager = + File_manager.open_ro config |> Io_errors.raise_if_error + in Errors.finalise_exn (fun _outcome -> File_manager.close file_manager - |> Errs.log_if_error "GC: Close File_manager") + |> Io_errors.log_if_error "GC: Close File_manager") @@ fun () -> - let dict = Dict.init file_manager |> Errs.raise_if_error in - let dispatcher = Dispatcher.init file_manager |> Errs.raise_if_error in + let dict = Dict.init file_manager |> Io_errors.raise_if_error in + let dispatcher = Dispatcher.init file_manager |> Io_errors.raise_if_error in let node_store = Node_store.init ~config ~file_manager ~dict ~dispatcher ~lru:None in @@ -250,7 +251,8 @@ module Make (Args : Gc_args.S) = struct Commit_store.unsafe_find ~check_integrity:false commit_store commit_key with | None -> - Errs.raise_error (`Commit_key_is_dangling (string_of_key commit_key)) + Io_errors.raise_error + (`Commit_key_is_dangling (string_of_key commit_key)) | Some commit -> commit in @@ -276,14 +278,16 @@ module Make (Args : Gc_args.S) = struct Brassaia_pack.Layout.V4.prefix ~root:new_files_path ~generation in let mapping_size = - let prefix = Sparse.Ao.create ~mapping ~data |> Errs.raise_if_error in + let prefix = + Sparse.Ao.create ~mapping ~data |> Io_errors.raise_if_error + in (* Step 5. Transfer to the new prefix, flush and close. *) [%log.debug "GC: transfering to the new prefix"]; stats := Gc_stats.Worker.finish_current_step !stats "prefix: transfer"; Errors.finalise_exn (fun _ -> Sparse.Ao.flush prefix >>= (fun _ -> Sparse.Ao.close prefix) - |> Errs.log_if_error "GC: Close prefix after data copy") + |> Io_errors.log_if_error "GC: Close prefix after data copy") @@ fun () -> (* Step 5.1. Transfer all. 
*) Ranges.iter @@ -301,12 +305,13 @@ module Make (Args : Gc_args.S) = struct Gc_stats.Worker.finish_current_step !stats "prefix: rewrite commit parents"; let prefix = - Sparse.Wo.open_wo ~mapping_size ~mapping ~data |> Errs.raise_if_error + Sparse.Wo.open_wo ~mapping_size ~mapping ~data + |> Io_errors.raise_if_error in Errors.finalise_exn (fun _outcome -> Sparse.Wo.fsync prefix >>= (fun _ -> Sparse.Wo.close prefix) - |> Errs.log_if_error "GC: Close prefix after parent rewrite") + |> Io_errors.log_if_error "GC: Close prefix after parent rewrite") @@ fun () -> let write_exn = Sparse.Wo.write_exn prefix in List.iter @@ -435,13 +440,13 @@ module Make (Args : Gc_args.S) = struct let run_and_output_result ~lower_root ~generation ~new_files_path root commit_key new_suffix_start_offset = let result = - Errs.catch (fun () -> + Io_errors.catch (fun () -> run ~lower_root ~generation ~new_files_path root commit_key new_suffix_start_offset) in - Errs.log_if_error "gc run" result; + Io_errors.log_if_error "gc run" result; let write_result = write_gc_output ~root ~generation result in - write_result |> Errs.log_if_error "writing gc output" + write_result |> Io_errors.log_if_error "writing gc output" (* No need to raise or log if [result] is [Error _], we've written it in the file. 
*) end diff --git a/brassaia/lib_brassaia_pack/unix/gc_worker.mli b/brassaia/lib_brassaia_pack/unix/gc_worker.mli index fc9d4d498050..ee00f5609aaa 100644 --- a/brassaia/lib_brassaia_pack/unix/gc_worker.mli +++ b/brassaia/lib_brassaia_pack/unix/gc_worker.mli @@ -48,6 +48,6 @@ module Make } [@@deriving brassaia] - type gc_output = (gc_results, Args.Errs.t) result [@@deriving brassaia] + type gc_output = (gc_results, Io_errors.t) result [@@deriving brassaia] end with module Args := Args diff --git a/brassaia/lib_brassaia_pack/unix/io.ml b/brassaia/lib_brassaia_pack/unix/io.ml index 554113f3aa68..673d1a5e87a2 100644 --- a/brassaia/lib_brassaia_pack/unix/io.ml +++ b/brassaia/lib_brassaia_pack/unix/io.ml @@ -15,7 +15,6 @@ *) open! Import -open Io_intf module Syscalls = Brassaia_index_unix.Index_unix.Syscalls (* File utils, taken from index.unix package. @@ -48,8 +47,6 @@ module Util = struct aux fd_offset 0 length end -module type S = S - module Unix = struct type misc_error = Unix.error * string * string diff --git a/brassaia/lib_brassaia_pack/unix/io_errors.ml b/brassaia/lib_brassaia_pack/unix/io_errors.ml index 062b42f6e9da..d6ca1f8230e0 100644 --- a/brassaia/lib_brassaia_pack/unix/io_errors.ml +++ b/brassaia/lib_brassaia_pack/unix/io_errors.ml @@ -20,7 +20,7 @@ open Errors (** Error manager for errors and exceptions defined in {!Errors} and {!Io.S.misc_error} *) module type S = sig - module Io : Io.S + module Io = Io.Unix type t = [ Base.t | `Io_misc of Io.misc_error ] [@@deriving brassaia] @@ -31,79 +31,77 @@ module type S = sig val log_if_error : string -> ('a, [< t ]) result -> unit end -module Make (Io : Io.S) : S with module Io = Io = struct - module Io = Io +module Io = Io.Unix - (* Inline the definition of the polymorphic variant for the ppx. 
*) - type t = - [ `Double_close - | `File_exists of string - | `Invalid_parent_directory - | `No_such_file_or_directory of string - | `Not_a_file - | `Read_out_of_bounds - | `Invalid_argument - | `Decoding_error - | `Not_a_directory of string - | `Index_failure of string - | `Invalid_layout - | `Corrupted_legacy_file - | `Corrupted_mapping_file of string - | `Pending_flush - | `Rw_not_allowed - | `Migration_needed - | `Migration_to_lower_not_allowed - | `Corrupted_control_file of string - | `Sys_error of string - | `V3_store_from_the_future - | `Gc_forbidden_during_batch - | `Unknown_major_pack_version of string - | `Only_minimal_indexing_strategy_allowed - | `Commit_key_is_dangling of string - | `Dangling_key of string - | `Gc_disallowed of string - | `Node_or_contents_key_is_indexed of string - | `Gc_process_error of string - | `Corrupted_gc_result_file of string - | `Gc_process_died_without_result_file of string - | `Gc_forbidden_on_32bit_platforms - | `Invalid_prefix_read of string - | `Invalid_sparse_read of [ `After | `Before | `Hole ] * int63 - | `Invalid_volume_read of [ `Empty | `Closed ] * int63 - | `Inconsistent_store - | `Closed - | `Ro_not_allowed - | `Io_misc of Io.misc_error - | `Split_forbidden_during_batch - | `Split_disallowed - | `Multiple_empty_chunks - | `Forbidden_during_gc - | `Multiple_empty_volumes - | `Volume_missing of string - | `Volume_history_newer_than_archived_data of int63 * int63 - | `Lower_has_no_volume - | `Add_volume_forbidden_during_gc - | `Add_volume_requires_lower - | `Volume_not_found of string - | `No_tmp_path_provided ] - [@@deriving brassaia] +(* Inline the definition of the polymorphic variant for the ppx. 
*) +type t = + [ `Double_close + | `File_exists of string + | `Invalid_parent_directory + | `No_such_file_or_directory of string + | `Not_a_file + | `Read_out_of_bounds + | `Invalid_argument + | `Decoding_error + | `Not_a_directory of string + | `Index_failure of string + | `Invalid_layout + | `Corrupted_legacy_file + | `Corrupted_mapping_file of string + | `Pending_flush + | `Rw_not_allowed + | `Migration_needed + | `Migration_to_lower_not_allowed + | `Corrupted_control_file of string + | `Sys_error of string + | `V3_store_from_the_future + | `Gc_forbidden_during_batch + | `Unknown_major_pack_version of string + | `Only_minimal_indexing_strategy_allowed + | `Commit_key_is_dangling of string + | `Dangling_key of string + | `Gc_disallowed of string + | `Node_or_contents_key_is_indexed of string + | `Gc_process_error of string + | `Corrupted_gc_result_file of string + | `Gc_process_died_without_result_file of string + | `Gc_forbidden_on_32bit_platforms + | `Invalid_prefix_read of string + | `Invalid_sparse_read of [ `After | `Before | `Hole ] * int63 + | `Invalid_volume_read of [ `Empty | `Closed ] * int63 + | `Inconsistent_store + | `Closed + | `Ro_not_allowed + | `Io_misc of Io.misc_error + | `Split_forbidden_during_batch + | `Split_disallowed + | `Multiple_empty_chunks + | `Forbidden_during_gc + | `Multiple_empty_volumes + | `Volume_missing of string + | `Volume_history_newer_than_archived_data of int63 * int63 + | `Lower_has_no_volume + | `Add_volume_forbidden_during_gc + | `Add_volume_requires_lower + | `Volume_not_found of string + | `No_tmp_path_provided ] +[@@deriving brassaia] - let raise_error = function - | `Io_misc e -> Io.raise_misc_error e - | #error as e -> Base.raise_error e +let raise_error = function + | `Io_misc e -> Io.raise_misc_error e + | #error as e -> Base.raise_error e - let log_error context e = - [%log.err "%s failed: %a" context (Brassaia.Type.pp t) (e :> t)] +let log_error context e = + [%log.err "%s failed: %a" context (Brassaia.Type.pp 
t) (e :> t)] - let catch f = - match Base.catch (fun () -> Io.catch_misc_error f) with - | Ok (Ok v) -> Ok v - | Ok (Error e) -> Error (e :> t) - | Error e -> Error (e :> t) +let catch f = + match Base.catch (fun () -> Io.catch_misc_error f) with + | Ok (Ok v) -> Ok v + | Ok (Error e) -> Error (e :> t) + | Error e -> Error (e :> t) - let raise_if_error = function Ok x -> x | Error e -> raise_error e +let raise_if_error = function Ok x -> x | Error e -> raise_error e - let log_if_error context = function - | Ok _ -> () - | Error e -> log_error context e -end +let log_if_error context = function + | Ok _ -> () + | Error e -> log_error context e diff --git a/brassaia/lib_brassaia_pack/unix/io_intf.ml b/brassaia/lib_brassaia_pack/unix/io_intf.ml index 768bed8e4a46..965b01ffa326 100644 --- a/brassaia/lib_brassaia_pack/unix/io_intf.ml +++ b/brassaia/lib_brassaia_pack/unix/io_intf.ml @@ -156,7 +156,5 @@ module type S = sig end module type Sigs = sig - module type S = S - module Unix : S with type misc_error = Unix.error * string * string end diff --git a/brassaia/lib_brassaia_pack/unix/lower.ml b/brassaia/lib_brassaia_pack/unix/lower.ml index 3281f16dc896..ccf3fb7df154 100644 --- a/brassaia/lib_brassaia_pack/unix/lower.ml +++ b/brassaia/lib_brassaia_pack/unix/lower.ml @@ -15,22 +15,19 @@ *) open Import -include Lower_intf module Layout = Brassaia_pack.Layout.V5.Volume module Payload = Control_file.Payload.Volume.Latest +module Io = Io.Unix +module Errs = Io_errors +module Control = Control_file.Volume -module Make_volume (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct - module Io = Io - module Errs = Errs - module Control = Control_file.Volume (Io) - module Sparse = Sparse_file.Make (Io) - +module Volume = struct type t = | Empty of { path : string } | Nonempty of { path : string; control : Payload.t; - mutable sparse : Sparse.t option; + mutable sparse : Sparse_file.t option; } type open_error = @@ -88,7 +85,7 @@ module Make_volume (Io : Io.S) (Errs : 
Io_errors.S with module Io = Io) = struct let* () = Io.mkdir root in let* () = Io.move_file ~src ~dst:data in let* mapping_end_poff = - Sparse.Wo.create_from_data ~mapping ~dead_header_size ~size ~data + Sparse_file.Wo.create_from_data ~mapping ~dead_header_size ~size ~data in let payload = { @@ -126,7 +123,7 @@ module Make_volume (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct let mapping = Layout.mapping ~root in let data = Layout.data ~root in let mapping_size = Int63.to_int control.Payload.mapping_end_poff in - let+ sparse = Sparse.open_ro ~mapping_size ~mapping ~data in + let+ sparse = Sparse_file.open_ro ~mapping_size ~mapping ~data in t.sparse <- Some sparse) let close = function @@ -136,7 +133,7 @@ module Make_volume (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct | None -> Error `Double_close | Some s -> let open Result_syntax in - let+ () = Sparse.close s in + let+ () = Sparse_file.close s in t.sparse <- None) let identifier t = path t @@ -148,7 +145,7 @@ module Make_volume (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct | Nonempty { sparse; _ } -> ( match sparse with | None -> Errs.raise_error (`Invalid_volume_read (`Closed, off)) - | Some s -> Sparse.read_range_exn s ~off ~min_len ~max_len b) + | Some s -> Sparse_file.read_range_exn s ~off ~min_len ~max_len b) let archive_seq ~upper_root ~generation ~is_first ~to_archive ~first_off t = let open Result_syntax in @@ -186,14 +183,14 @@ module Make_volume (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct | Nonempty { control; _ } -> Ok control.mapping_end_poff in (* Append archived data *) - let* ao = Sparse.Ao.open_ao ~mapping_size ~mapping ~data in + let* ao = Sparse_file.Ao.open_ao ~mapping_size ~mapping ~data in List.iter - (fun (off, seq) -> Sparse.Ao.append_seq_exn ao ~off seq) + (fun (off, seq) -> Sparse_file.Ao.append_seq_exn ao ~off seq) to_archive; - let end_offset = Sparse.Ao.end_off ao in - let mapping_end_poff = Sparse.Ao.mapping_size ao in 
- let* () = Sparse.Ao.flush ao in - let* () = Sparse.Ao.close ao in + let end_offset = Sparse_file.Ao.end_off ao in + let mapping_end_poff = Sparse_file.Ao.mapping_size ao in + let* () = Sparse_file.Ao.flush ao in + let* () = Sparse_file.Ao.close ao in (* Prepare new control file *) let start_offset = match t with @@ -252,167 +249,161 @@ module Make_volume (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct path t |> Sys.readdir |> Array.to_list |> List.iter_result clean end -module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) = struct - module Io = Io - module Errs = Errs - module Volume = Make_volume (Io) (Errs) - - type t = { - root : string; - mutable readonly : bool; - mutable volumes : Volume.t array; - mutable open_volume : Volume.t option; - } - - type open_error = [ Volume.open_error | `Volume_missing of string ] - type close_error = [ | Io.close_error ] - type nonrec volume_identifier = volume_identifier [@@deriving brassaia] - - type add_error = - [ open_error - | `Ro_not_allowed - | `Multiple_empty_volumes - | `File_exists of string - | `Invalid_parent_directory ] - - let close_open_volume t = - match t.open_volume with +type t = { + root : string; + mutable readonly : bool; + mutable volumes : Volume.t array; + mutable open_volume : Volume.t option; +} + +type open_error = [ Volume.open_error | `Volume_missing of string ] +type close_error = [ | Io.close_error ] +type volume_identifier = string [@@deriving brassaia] + +type add_error = + [ open_error + | `Ro_not_allowed + | `Multiple_empty_volumes + | `File_exists of string + | `Invalid_parent_directory ] + +let close_open_volume t = + match t.open_volume with + | None -> Ok () + | Some v -> + let open Result_syntax in + let+ _ = Volume.close v in + t.open_volume <- None + +exception LoadVolumeError of open_error + +let load_volumes ~volume_num t = + let open Result_syntax in + let* () = close_open_volume t in + let* volumes = + let root = t.root in + let volume i = + let path = 
Layout.directory ~root ~idx:i in + match Io.classify_path path with + | `File | `Other | `No_such_file_or_directory -> + raise (LoadVolumeError (`Volume_missing path)) + | `Directory -> ( + match Volume.open_volume path with + | Error e -> raise (LoadVolumeError e) + | Ok v -> v) + in + try Ok (Array.init volume_num volume) + with LoadVolumeError err -> Error (err : open_error :> [> open_error ]) + in + t.volumes <- volumes; + Ok t + +let open_volumes ~readonly ~volume_num root = + load_volumes ~volume_num + { root; readonly; volumes = [||]; open_volume = None } + +let reload ~volume_num t = + let open Result_syntax in + let* _ = load_volumes ~volume_num t in + Ok () + +let set_readonly t flag = t.readonly <- flag +let close = close_open_volume +let volume_num t = Array.length t.volumes + +let appendable_volume t = + match volume_num t with 0 -> None | num -> Some t.volumes.(num - 1) + +let add_volume t = + let open Result_syntax in + let* () = if t.readonly then Error `Ro_not_allowed else Ok () in + let* () = + match appendable_volume t with | None -> Ok () | Some v -> - let open Result_syntax in - let+ _ = Volume.close v in - t.open_volume <- None - - exception LoadVolumeError of open_error - - let load_volumes ~volume_num t = - let open Result_syntax in - let* () = close_open_volume t in - let* volumes = - let root = t.root in - let volume i = - let path = Layout.directory ~root ~idx:i in - match Io.classify_path path with - | `File | `Other | `No_such_file_or_directory -> - raise (LoadVolumeError (`Volume_missing path)) - | `Directory -> ( - match Volume.open_volume path with - | Error e -> raise (LoadVolumeError e) - | Ok v -> v) - in - try Ok (Array.init volume_num volume) - with LoadVolumeError err -> Error (err : open_error :> [> open_error ]) - in - t.volumes <- volumes; - Ok t - - let open_volumes ~readonly ~volume_num root = - load_volumes ~volume_num - { root; readonly; volumes = [||]; open_volume = None } - - let reload ~volume_num t = - let open 
Result_syntax in - let* _ = load_volumes ~volume_num t in - Ok () - - let set_readonly t flag = t.readonly <- flag - let close = close_open_volume - let volume_num t = Array.length t.volumes - - let appendable_volume t = - match volume_num t with 0 -> None | num -> Some t.volumes.(num - 1) - - let add_volume t = + if Volume.is_empty v then Error `Multiple_empty_volumes else Ok () + in + let volume_path = + let next_idx = volume_num t in + Layout.directory ~root:t.root ~idx:next_idx + in + let* vol = Volume.create volume_path in + t.volumes <- Array.append t.volumes [| vol |]; + Ok vol + +let find_volume ~off t = Array.find_opt (Volume.contains ~off) t.volumes + +let find_volume_by_offset_exn ~off t = + match find_volume ~off t with + | None -> + let err = Fmt.str "Looking for offset %d" (Int63.to_int off) in + Errs.raise_error (`Volume_not_found err) + | Some v -> v + +let find_volume_by_identifier ~id t = + match Array.find_opt (Volume.identifier_eq ~id) t.volumes with + | None -> + let err = Fmt.str "Looking for identifier %s" id in + Error (`Volume_not_found err) + | Some v -> Ok v + +let find_volume_by_identifier_exn ~id t = + find_volume_by_identifier ~id t |> Errs.raise_if_error + +let read_range_exn ~off ~min_len ~max_len ?volume t b = + [%log.debug + "read_range_exn ~off:%a ~min_len:%i ~max_len:%i" Int63.pp off min_len + max_len]; + let set_open_volume t v = + (* Maintain one open volume at a time. 
*) let open Result_syntax in - let* () = if t.readonly then Error `Ro_not_allowed else Ok () in let* () = - match appendable_volume t with + match t.open_volume with | None -> Ok () - | Some v -> - if Volume.is_empty v then Error `Multiple_empty_volumes else Ok () - in - let volume_path = - let next_idx = volume_num t in - Layout.directory ~root:t.root ~idx:next_idx - in - let* vol = Volume.create volume_path in - t.volumes <- Array.append t.volumes [| vol |]; - Ok vol - - let find_volume ~off t = Array.find_opt (Volume.contains ~off) t.volumes - - let find_volume_by_offset_exn ~off t = - match find_volume ~off t with - | None -> - let err = Fmt.str "Looking for offset %d" (Int63.to_int off) in - Errs.raise_error (`Volume_not_found err) - | Some v -> v - - let find_volume_by_identifier ~id t = - match Array.find_opt (Volume.identifier_eq ~id) t.volumes with - | None -> - let err = Fmt.str "Looking for identifier %s" id in - Error (`Volume_not_found err) - | Some v -> Ok v - - let find_volume_by_identifier_exn ~id t = - find_volume_by_identifier ~id t |> Errs.raise_if_error - - let read_range_exn ~off ~min_len ~max_len ?volume t b = - [%log.debug - "read_range_exn ~off:%a ~min_len:%i ~max_len:%i" Int63.pp off min_len - max_len]; - let set_open_volume t v = - (* Maintain one open volume at a time. 
*) - let open Result_syntax in - let* () = - match t.open_volume with - | None -> Ok () - | Some v0 -> if Volume.eq v0 v then Ok () else close_open_volume t - in - let+ _ = Volume.open_ v in - t.open_volume <- Some v + | Some v0 -> if Volume.eq v0 v then Ok () else close_open_volume t in - let volume = - match volume with - | None -> find_volume_by_offset_exn t ~off - | Some id -> find_volume_by_identifier_exn t ~id - in - set_open_volume t volume |> Errs.raise_if_error; - let len = Volume.read_range_exn ~off ~min_len ~max_len b volume in - (len, Volume.identifier volume) - - let archive_seq_exn ~upper_root ~generation ~to_archive t = - Errs.raise_if_error - (let open Result_syntax in - let* () = if t.readonly then Error `Ro_not_allowed else Ok () in - let* v = - match appendable_volume t with - | None -> Error `Lower_has_no_volume - | Some v -> Ok v - in - let* () = - match t.open_volume with - | None -> Ok () - | Some v0 -> if Volume.eq v0 v then close_open_volume t else Ok () - in - let is_first = volume_num t = 1 in - Volume.archive_seq ~upper_root ~generation ~to_archive ~is_first v) - - let read_exn ~off ~len ?volume t b = - let _, volume = read_range_exn ~off ~min_len:len ~max_len:len ?volume t b in - volume - - let create_from = Volume.create_from - - let swap ~volume ~generation ~volume_num t = - let open Result_syntax in - let* vol = find_volume_by_identifier ~id:volume t in - let* () = Volume.swap ~generation vol in - reload ~volume_num t - - let cleanup ~generation t = - match appendable_volume t with - | None -> Ok [%log.warn "Attempted to cleanup but lower has no volumes"] - | Some v -> Volume.cleanup ~generation v -end + let+ _ = Volume.open_ v in + t.open_volume <- Some v + in + let volume = + match volume with + | None -> find_volume_by_offset_exn t ~off + | Some id -> find_volume_by_identifier_exn t ~id + in + set_open_volume t volume |> Errs.raise_if_error; + let len = Volume.read_range_exn ~off ~min_len ~max_len b volume in + (len, 
Volume.identifier volume) + +let archive_seq_exn ~upper_root ~generation ~to_archive t = + Errs.raise_if_error + (let open Result_syntax in + let* () = if t.readonly then Error `Ro_not_allowed else Ok () in + let* v = + match appendable_volume t with + | None -> Error `Lower_has_no_volume + | Some v -> Ok v + in + let* () = + match t.open_volume with + | None -> Ok () + | Some v0 -> if Volume.eq v0 v then close_open_volume t else Ok () + in + let is_first = volume_num t = 1 in + Volume.archive_seq ~upper_root ~generation ~to_archive ~is_first v) + +let read_exn ~off ~len ?volume t b = + let _, volume = read_range_exn ~off ~min_len:len ~max_len:len ?volume t b in + volume + +let create_from = Volume.create_from + +let swap ~volume ~generation ~volume_num t = + let open Result_syntax in + let* vol = find_volume_by_identifier ~id:volume t in + let* () = Volume.swap ~generation vol in + reload ~volume_num t + +let cleanup ~generation t = + match appendable_volume t with + | None -> Ok [%log.warn "Attempted to cleanup but lower has no volumes"] + | Some v -> Volume.cleanup ~generation v diff --git a/brassaia/lib_brassaia_pack/unix/lower.mli b/brassaia/lib_brassaia_pack/unix/lower.mli index 1db18f08e88e..5a6c0421be64 100644 --- a/brassaia/lib_brassaia_pack/unix/lower.mli +++ b/brassaia/lib_brassaia_pack/unix/lower.mli @@ -14,5 +14,8 @@ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. *) -include Lower_intf.Sigs +include Lower_intf.S + +type volume_identifier = string [@@deriving brassaia] + (** @inline *) diff --git a/brassaia/lib_brassaia_pack/unix/lower_intf.ml b/brassaia/lib_brassaia_pack/unix/lower_intf.ml index fdc8b254ccef..2b25b8afa8ea 100644 --- a/brassaia/lib_brassaia_pack/unix/lower_intf.ml +++ b/brassaia/lib_brassaia_pack/unix/lower_intf.ml @@ -19,9 +19,9 @@ open! 
Import type volume_identifier = string [@@deriving brassaia] module type Volume = sig - module Io : Io.S - module Errs : Io_errors.S - module Sparse : Sparse_file.S + module Io = Io.Unix + module Errs = Io_errors + module Sparse = Sparse_file type t @@ -49,9 +49,9 @@ module type Volume = sig end module type S = sig - module Io : Io.S + module Io = Io.Unix module Errs : Io_errors.S - module Volume : Volume with module Io = Io + module Volume : Volume type t type open_error = [ Volume.open_error | `Volume_missing of string ] @@ -177,15 +177,3 @@ module type S = sig (** [cleanup ~generation t] will attempt to cleanup the appendable volume if a GC crash has occurred. *) end - -module type Sigs = sig - module type S = S - - type nonrec volume_identifier = volume_identifier [@@deriving brassaia] - - module Make_volume (Io : Io.S) (Errs : Io_errors.S with module Io = Io) : - Volume with module Io = Io and module Errs = Errs - - module Make (Io : Io.S) (Errs : Io_errors.S with module Io = Io) : - S with module Io = Io and module Errs = Errs -end diff --git a/brassaia/lib_brassaia_pack/unix/pack_index_intf.ml b/brassaia/lib_brassaia_pack/unix/pack_index_intf.ml index c2194b29cc02..37b035b7b808 100644 --- a/brassaia/lib_brassaia_pack/unix/pack_index_intf.ml +++ b/brassaia/lib_brassaia_pack/unix/pack_index_intf.ml @@ -25,7 +25,7 @@ module type S = sig type value = int63 * int * Pack_value.Kind.t include Index.S with type value := value and type t := t and type key := key - module Io : Io.S + module Io = Io.Unix val init_exn : ?flush_callback:(unit -> unit) -> @@ -71,6 +71,5 @@ end module type Sigs = sig module type S = S - module Make (K : Brassaia.Hash.S) : - S with type key = K.t and module Io = Io.Unix + module Make (K : Brassaia.Hash.S) : S with type key = K.t end diff --git a/brassaia/lib_brassaia_pack/unix/pack_store.ml b/brassaia/lib_brassaia_pack/unix/pack_store.ml index 7fc22c12eaea..72935175cd00 100644 --- a/brassaia/lib_brassaia_pack/unix/pack_store.ml +++ 
b/brassaia/lib_brassaia_pack/unix/pack_store.ml @@ -15,12 +15,13 @@ *) open Import -include Pack_store_intf exception Invalid_read of string exception Corrupted_store of string exception Dangling_hash +module type S = Pack_store_intf.S + let invalid_read fmt = Fmt.kstr (fun s -> raise (Invalid_read s)) fmt let corrupted_store fmt = Fmt.kstr (fun s -> raise (Corrupted_store s)) fmt @@ -54,8 +55,7 @@ module Make_without_close_checks (Hash : Brassaia.Hash.S with type t = File_manager.Index.key) (Val : Pack_value.Persistent with type hash := Hash.t - and type key := Hash.t Pack_key.t) - (Errs : Io_errors.S with module Io = File_manager.Io) = + and type key := Hash.t Pack_key.t) = struct module Tbl = Table (Hash) module Control = File_manager.Control @@ -450,7 +450,7 @@ struct "[pack] calling batch directory on a store is not recommended. Use \ repo.batch instead."]; let on_success res = - File_manager.flush t.file_manager |> Errs.raise_if_error; + File_manager.flush t.file_manager |> Io_errors.raise_if_error; Lwt.return res in let on_fail exn = @@ -462,7 +462,8 @@ struct | Error err -> [%log.err "[pack] batch failed and flush failed. Silencing flush fail. 
(%a)" - (Brassaia.Type.pp Errs.t) err] + (Brassaia.Type.pp Io_errors.t) + err] in raise exn in @@ -545,12 +546,10 @@ module Make (Hash : Brassaia.Hash.S with type t = File_manager.Index.key) (Val : Pack_value.Persistent with type hash := Hash.t - and type key := Hash.t Pack_key.t) - (Errs : Io_errors.S with module Io = File_manager.Io) = + and type key := Hash.t Pack_key.t) = struct module Inner = Make_without_close_checks (File_manager) (Dict) (Dispatcher) (Hash) (Val) - (Errs) include Inner include Indexable.Closeable (Inner) diff --git a/brassaia/lib_brassaia_pack/unix/pack_store_intf.ml b/brassaia/lib_brassaia_pack/unix/pack_store_intf.ml index dea21d46d543..531cee162abd 100644 --- a/brassaia/lib_brassaia_pack/unix/pack_store_intf.ml +++ b/brassaia/lib_brassaia_pack/unix/pack_store_intf.ml @@ -96,8 +96,7 @@ module type Sigs = sig (Hash : Brassaia.Hash.S with type t = File_manager.Index.key) (Val : Pack_value.Persistent with type hash := Hash.t - and type key := Hash.t Pack_key.t) - (Errs : Io_errors.S with module Io = File_manager.Io) : + and type key := Hash.t Pack_key.t) : S with type key = Hash.t Pack_key.t and type hash = Hash.t diff --git a/brassaia/lib_brassaia_pack/unix/snapshot.ml b/brassaia/lib_brassaia_pack/unix/snapshot.ml index 01270834e7ec..7ef15e7a3444 100644 --- a/brassaia/lib_brassaia_pack/unix/snapshot.ml +++ b/brassaia/lib_brassaia_pack/unix/snapshot.ml @@ -61,10 +61,10 @@ module Make (Args : Args) = struct files: suffix and control. We just open the file manager for simplicity. 
*) let file_manager = - File_manager.open_ro config |> File_manager.Errs.raise_if_error + File_manager.open_ro config |> Io_errors.raise_if_error in let dispatcher = - Dispatcher.init file_manager |> File_manager.Errs.raise_if_error + Dispatcher.init file_manager |> Io_errors.raise_if_error in let log_size = Conf.index_log_size config in { file_manager; dispatcher; log_size; inode_pack; contents_pack } diff --git a/brassaia/lib_brassaia_pack/unix/sparse_file.ml b/brassaia/lib_brassaia_pack/unix/sparse_file.ml index 36f2790ada52..5bc1c1e38f53 100644 --- a/brassaia/lib_brassaia_pack/unix/sparse_file.ml +++ b/brassaia/lib_brassaia_pack/unix/sparse_file.ml @@ -15,12 +15,13 @@ *) open Import -include Sparse_file_intf module BigArr1 = Bigarray.Array1 type int64_bigarray = (int64, Bigarray.int64_elt, Bigarray.c_layout) BigArr1.t -module Int64_mmap (Io : Io.S) : sig +module Int64_mmap : sig + module Io = Io.Unix + type t val open_ro : fn:string -> sz:int -> (t, [> Io.open_error ]) result @@ -28,6 +29,8 @@ module Int64_mmap (Io : Io.S) : sig val get : t -> int -> Int64.t val close : t -> (unit, [> Io.close_error ]) result end = struct + module Io = Io.Unix + type t = { fn : string; fd : Io.t; @@ -70,235 +73,229 @@ end = struct t.arr.{i} end -module Make (Io : Io.S) = struct - module Io = Io - module Errs = Io_errors.Make (Io) - - module Mapping_file = struct - module Int64_mmap = Int64_mmap (Io) - - let ( .%{} ) = Int64_mmap.get - - type t = Int64_mmap.t - - let open_map ~path ~size = - match Io.classify_path path with - | `File -> - let open Result_syntax in - let* mmap = Int64_mmap.open_ro ~fn:path ~sz:size in - if Int64_mmap.length mmap mod 3 = 0 then Ok mmap - else - Error - (`Corrupted_mapping_file - (__FILE__ ^ ": mapping mmap size did not meet size requirements")) - | _ -> Error (`No_such_file_or_directory path) - - let close = Int64_mmap.close - let entry_count t = Int64_mmap.length t / 3 - let entry_idx i = i * 3 - let entry_off t i = t.%{entry_idx i} |> 
Int63.of_int64 - let entry_poff t i = t.%{entry_idx i + 1} |> Int63.of_int64 - let entry_len t i = t.%{entry_idx i + 2} |> Int64.to_int - - let iter_exn t f = - for i = 0 to entry_count t - 1 do - f ~off:(entry_off t i) ~len:(entry_len t i) - done - - let iter t f = Errs.catch (fun () -> iter_exn t f) - - type entry = { off : int63; poff : int63; len : int } - - let find_nearest_geq arr off = - let get arr i = - let start = arr.%{entry_idx i} |> Int64.to_int in - let len = entry_len arr i in - start + len - 1 - in - match - Utils.nearest_geq ~arr ~get ~lo:0 - ~hi:(entry_count arr - 1) - ~key:(Int63.to_int off) - with - | None -> `After - | Some i -> - let entry = - { - off = entry_off arr i; - poff = entry_poff arr i; - len = entry_len arr i; - } - in - if i == 0 && entry.off > off then `Before entry else `Inside entry - end - - type t = { mapping : Mapping_file.t; data : Io.t } - - let open_ ~readonly ~mapping_size ~mapping ~data = - let open Result_syntax in - let* mapping = Mapping_file.open_map ~path:mapping ~size:mapping_size in - let+ data = Io.open_ ~path:data ~readonly in - { mapping; data } +module Io = Io.Unix +module Errs = Io_errors - let open_ro ~mapping_size ~mapping ~data = - open_ ~readonly:true ~mapping_size ~mapping ~data +module Mapping_file = struct + let ( .%{} ) = Int64_mmap.get - let close t = - let open Result_syntax in - let* () = Mapping_file.close t.mapping in - Io.close t.data - - let iter t fn = Mapping_file.iter t.mapping fn - - let get_poff { mapping; _ } ~off = - match Mapping_file.find_nearest_geq mapping off with - | `After -> raise (Errors.Pack_error (`Invalid_sparse_read (`After, off))) - | `Before _ -> - raise (Errors.Pack_error (`Invalid_sparse_read (`Before, off))) - | `Inside entry when entry.off > off -> - raise (Errors.Pack_error (`Invalid_sparse_read (`Hole, off))) - | `Inside entry -> - let open Int63.Syntax in - let shift_in_entry = off - entry.off in - let max_entry_len = Int63.of_int entry.len - shift_in_entry in - let 
poff = entry.poff + off - entry.off in - (poff, Int63.to_int max_entry_len) + type t = Int64_mmap.t - let read_exn t ~off ~len buf = - let poff, max_entry_len = get_poff t ~off in - if max_entry_len < len then raise (Errors.Pack_error `Read_out_of_bounds); - Io.read_exn t.data ~off:poff ~len buf + let open_map ~path ~size = + match Io.classify_path path with + | `File -> + let open Result_syntax in + let* mmap = Int64_mmap.open_ro ~fn:path ~sz:size in + if Int64_mmap.length mmap mod 3 = 0 then Ok mmap + else + Error + (`Corrupted_mapping_file + (__FILE__ ^ ": mapping mmap size did not meet size requirements")) + | _ -> Error (`No_such_file_or_directory path) + + let close = Int64_mmap.close + let entry_count t = Int64_mmap.length t / 3 + let entry_idx i = i * 3 + let entry_off t i = t.%{entry_idx i} |> Int63.of_int64 + let entry_poff t i = t.%{entry_idx i + 1} |> Int63.of_int64 + let entry_len t i = t.%{entry_idx i + 2} |> Int64.to_int + + let iter_exn t f = + for i = 0 to entry_count t - 1 do + f ~off:(entry_off t i) ~len:(entry_len t i) + done + + let iter t f = Errs.catch (fun () -> iter_exn t f) + + type entry = { off : int63; poff : int63; len : int } + + let find_nearest_geq arr off = + let get arr i = + let start = arr.%{entry_idx i} |> Int64.to_int in + let len = entry_len arr i in + start + len - 1 + in + match + Utils.nearest_geq ~arr ~get ~lo:0 + ~hi:(entry_count arr - 1) + ~key:(Int63.to_int off) + with + | None -> `After + | Some i -> + let entry = + { + off = entry_off arr i; + poff = entry_poff arr i; + len = entry_len arr i; + } + in + if i == 0 && entry.off > off then `Before entry else `Inside entry +end - let read_range_exn t ~off ~min_len ~max_len buf = - [%log.debug - "read_range_exn ~off:%a ~min_len:%i ~max_len:%i" Int63.pp off min_len - max_len]; +type t = { mapping : Mapping_file.t; data : Io.t } + +let open_ ~readonly ~mapping_size ~mapping ~data = + let open Result_syntax in + let* mapping = Mapping_file.open_map ~path:mapping 
~size:mapping_size in + let+ data = Io.open_ ~path:data ~readonly in + { mapping; data } + +let open_ro ~mapping_size ~mapping ~data = + open_ ~readonly:true ~mapping_size ~mapping ~data + +let close t = + let open Result_syntax in + let* () = Mapping_file.close t.mapping in + Io.close t.data + +let iter t fn = Mapping_file.iter t.mapping fn + +let get_poff { mapping; _ } ~off = + match Mapping_file.find_nearest_geq mapping off with + | `After -> raise (Errors.Pack_error (`Invalid_sparse_read (`After, off))) + | `Before _ -> raise (Errors.Pack_error (`Invalid_sparse_read (`Before, off))) + | `Inside entry when entry.off > off -> + raise (Errors.Pack_error (`Invalid_sparse_read (`Hole, off))) + | `Inside entry -> + let open Int63.Syntax in + let shift_in_entry = off - entry.off in + let max_entry_len = Int63.of_int entry.len - shift_in_entry in + let poff = entry.poff + off - entry.off in + (poff, Int63.to_int max_entry_len) + +let read_exn t ~off ~len buf = + let poff, max_entry_len = get_poff t ~off in + if max_entry_len < len then raise (Errors.Pack_error `Read_out_of_bounds); + Io.read_exn t.data ~off:poff ~len buf + +let read_range_exn t ~off ~min_len ~max_len buf = + [%log.debug + "read_range_exn ~off:%a ~min_len:%i ~max_len:%i" Int63.pp off min_len + max_len]; + let poff, max_entry_len = get_poff t ~off in + if max_entry_len < min_len then raise (Errors.Pack_error `Read_out_of_bounds); + let len = min max_len max_entry_len in + Io.read_exn t.data ~off:poff ~len buf; + len + +let next_valid_offset { mapping; _ } ~off = + match Mapping_file.find_nearest_geq mapping off with + | `After -> None + | `Before entry -> Some entry.off + | `Inside entry -> + let open Int63.Syntax in + Some (if entry.off < off then off else entry.off) + +let make_entry ~off ~poff ~len = + if Int64.(equal zero) len then "" + else + let buf = Bytes.create (3 * 8) in + Bytes.set_int64_le buf 0 off; + Bytes.set_int64_le buf 8 poff; + Bytes.set_int64_le buf 16 len; + Bytes.unsafe_to_string 
buf + +module Wo = struct + type nonrec t = t + + let open_wo ~mapping_size ~mapping ~data = + open_ ~readonly:false ~mapping_size ~mapping ~data + + let write_exn t ~off ~len str = let poff, max_entry_len = get_poff t ~off in - if max_entry_len < min_len then - raise (Errors.Pack_error `Read_out_of_bounds); - let len = min max_len max_entry_len in - Io.read_exn t.data ~off:poff ~len buf; - len - - let next_valid_offset { mapping; _ } ~off = - match Mapping_file.find_nearest_geq mapping off with - | `After -> None - | `Before entry -> Some entry.off - | `Inside entry -> + assert (len <= max_entry_len); + Io.write_exn t.data ~off:poff ~len str + + let fsync t = Io.fsync t.data + let close = close + + let create_from_data ~mapping ~dead_header_size ~size ~data:_ = + let open Result_syntax in + let entry = + make_entry ~off:Int64.zero + ~poff:(Int64.of_int dead_header_size) + ~len:(Int63.to_int64 size) + in + let* mapping = Io.create ~path:mapping ~overwrite:false in + let* () = Io.write_string mapping ~off:Int63.zero entry in + let+ () = Io.close mapping in + Int63.of_int (String.length entry) +end + +module Ao = struct + module Ao = Append_only_file + + type t = { mapping : Ao.t; data : Ao.t; mutable end_off : Int63.t } + + let end_off t = t.end_off + let mapping_size t = Ao.end_poff t.mapping + + let create ~mapping ~data = + let open Result_syntax in + let ao_create path = + Ao.create_rw ~path ~overwrite:false ~auto_flush_threshold:1_000_000 + ~auto_flush_procedure:`Internal + in + let* mapping = ao_create mapping in + let+ data = ao_create data in + { mapping; data; end_off = Int63.zero } + + let open_ao ~mapping_size ~mapping ~data = + let open Result_syntax in + let ao_open ~end_poff path = + Ao.open_rw ~path ~end_poff ~dead_header_size:0 + ~auto_flush_threshold:1_000_000 ~auto_flush_procedure:`Internal + in + let* ao_mapping = ao_open ~end_poff:mapping_size mapping in + let* end_off, end_poff = + if mapping_size <= Int63.zero then Ok (Int63.zero, Int63.zero) + 
else + let entry_len = 3 * 8 in + let+ entry = + Ao.read_to_string ao_mapping + ~off:Int63.(Syntax.(mapping_size - of_int entry_len)) + ~len:entry_len + in + let entry = Bytes.of_string entry in + let end_off = Bytes.get_int64_le entry 0 |> Int63.of_int64 in + let end_poff = Bytes.get_int64_le entry 8 |> Int63.of_int64 in + let len = Bytes.get_int64_le entry 16 |> Int63.of_int64 in let open Int63.Syntax in - Some (if entry.off < off then off else entry.off) - - let make_entry ~off ~poff ~len = - if Int64.(equal zero) len then "" - else - let buf = Bytes.create (3 * 8) in - Bytes.set_int64_le buf 0 off; - Bytes.set_int64_le buf 8 poff; - Bytes.set_int64_le buf 16 len; - Bytes.unsafe_to_string buf - - module Wo = struct - type nonrec t = t - - let open_wo ~mapping_size ~mapping ~data = - open_ ~readonly:false ~mapping_size ~mapping ~data - - let write_exn t ~off ~len str = - let poff, max_entry_len = get_poff t ~off in - assert (len <= max_entry_len); - Io.write_exn t.data ~off:poff ~len str - - let fsync t = Io.fsync t.data - let close = close - - let create_from_data ~mapping ~dead_header_size ~size ~data:_ = - let open Result_syntax in - let entry = - make_entry ~off:Int64.zero - ~poff:(Int64.of_int dead_header_size) - ~len:(Int63.to_int64 size) - in - let* mapping = Io.create ~path:mapping ~overwrite:false in - let* () = Io.write_string mapping ~off:Int63.zero entry in - let+ () = Io.close mapping in - Int63.of_int (String.length entry) - end - - module Ao = struct - module Ao = Append_only_file.Make (Io) (Errs) - - type t = { mapping : Ao.t; data : Ao.t; mutable end_off : Int63.t } - - let end_off t = t.end_off - let mapping_size t = Ao.end_poff t.mapping - - let create ~mapping ~data = - let open Result_syntax in - let ao_create path = - Ao.create_rw ~path ~overwrite:false ~auto_flush_threshold:1_000_000 - ~auto_flush_procedure:`Internal - in - let* mapping = ao_create mapping in - let+ data = ao_create data in - { mapping; data; end_off = Int63.zero } - - let 
open_ao ~mapping_size ~mapping ~data = - let open Result_syntax in - let ao_open ~end_poff path = - Ao.open_rw ~path ~end_poff ~dead_header_size:0 - ~auto_flush_threshold:1_000_000 ~auto_flush_procedure:`Internal - in - let* ao_mapping = ao_open ~end_poff:mapping_size mapping in - let* end_off, end_poff = - if mapping_size <= Int63.zero then Ok (Int63.zero, Int63.zero) - else - let entry_len = 3 * 8 in - let+ entry = - Ao.read_to_string ao_mapping - ~off:Int63.(Syntax.(mapping_size - of_int entry_len)) - ~len:entry_len - in - let entry = Bytes.of_string entry in - let end_off = Bytes.get_int64_le entry 0 |> Int63.of_int64 in - let end_poff = Bytes.get_int64_le entry 8 |> Int63.of_int64 in - let len = Bytes.get_int64_le entry 16 |> Int63.of_int64 in - let open Int63.Syntax in - (end_off + len, end_poff + len) - in - let+ ao_data = ao_open ~end_poff data in - { mapping = ao_mapping; data = ao_data; end_off } - - let check_offset_exn { end_off; _ } ~off = - if Int63.Syntax.(end_off > off) then - Fmt.failwith - "Sparse.Ao.append_exn at offset %a, smaller than latest offset %a" - Int63.pp off Int63.pp end_off - - let append_seq_exn t ~off seq = - check_offset_exn t ~off; - let poff = Ao.end_poff t.data in - let len = - Seq.fold_left - (fun len str -> - Ao.append_exn t.data str; - len + String.length str) - 0 seq - in - let entry = - make_entry ~off:(Int63.to_int64 off) ~poff:(Int63.to_int64 poff) - ~len:(Int64.of_int len) - in - Ao.append_exn t.mapping entry; - t.end_off <- Int63.(Syntax.(off + of_int len)) - - let flush t = - let open Result_syntax in - let* () = Ao.flush t.data in - Ao.flush t.mapping - - let close t = - let open Result_syntax in - let* () = Ao.close t.data in - Ao.close t.mapping - end + (end_off + len, end_poff + len) + in + let+ ao_data = ao_open ~end_poff data in + { mapping = ao_mapping; data = ao_data; end_off } + + let check_offset_exn { end_off; _ } ~off = + if Int63.Syntax.(end_off > off) then + Fmt.failwith + "Sparse.Ao.append_exn at offset 
%a, smaller than latest offset %a" + Int63.pp off Int63.pp end_off + + let append_seq_exn t ~off seq = + check_offset_exn t ~off; + let poff = Ao.end_poff t.data in + let len = + Seq.fold_left + (fun len str -> + Ao.append_exn t.data str; + len + String.length str) + 0 seq + in + let entry = + make_entry ~off:(Int63.to_int64 off) ~poff:(Int63.to_int64 poff) + ~len:(Int64.of_int len) + in + Ao.append_exn t.mapping entry; + t.end_off <- Int63.(Syntax.(off + of_int len)) + + let flush t = + let open Result_syntax in + let* () = Ao.flush t.data in + Ao.flush t.mapping + + let close t = + let open Result_syntax in + let* () = Ao.close t.data in + Ao.close t.mapping end diff --git a/brassaia/lib_brassaia_pack/unix/sparse_file_intf.ml b/brassaia/lib_brassaia_pack/unix/sparse_file_intf.ml index 83840512de74..8dbeb5c7115c 100644 --- a/brassaia/lib_brassaia_pack/unix/sparse_file_intf.ml +++ b/brassaia/lib_brassaia_pack/unix/sparse_file_intf.ml @@ -17,8 +17,8 @@ open! Import module type S = sig - module Io : Io.S - module Errs : Io_errors.S with module Io = Io + module Io : module type of Io.Unix + module Errs : Io_errors.S type t type open_error := [ Io.open_error | `Corrupted_mapping_file of string ] @@ -150,8 +150,4 @@ module type S = sig end end -module type Sigs = sig - module type S = S - - module Make (Io : Io.S) : S with module Io = Io -end +module type Sigs = S diff --git a/brassaia/lib_brassaia_pack/unix/store.ml b/brassaia/lib_brassaia_pack/unix/store.ml index d3b456af2548..310c43b56b03 100644 --- a/brassaia/lib_brassaia_pack/unix/store.ml +++ b/brassaia/lib_brassaia_pack/unix/store.ml @@ -32,8 +32,7 @@ module Maker (Config : Conf.S) = struct module H = Schema.Hash module Io = Io.Unix module Index = Pack_index.Make (H) - module Errs = Io_errors.Make (Io) - module File_manager = File_manager.Make (Io) (Index) (Errs) + module File_manager = File_manager.Make (Index) module Dict = Dict.Make (File_manager) module Dispatcher = Dispatcher.Make (File_manager) module XKey 
= Pack_key.Make (H) @@ -49,7 +48,6 @@ module Maker (Config : Conf.S) = struct module CA = Pack_store.Make (File_manager) (Dict) (Dispatcher) (H) (Pack_value) - (Errs) include Brassaia.Contents.Store_indexable (CA) (H) (C) end @@ -63,7 +61,6 @@ module Maker (Config : Conf.S) = struct module Pack' = Pack_store.Make (File_manager) (Dict) (Dispatcher) (H) (Inter.Raw) - (Errs) include Inode.Make_persistent (H) (Value) (Inter) (Pack') end @@ -90,7 +87,6 @@ module Maker (Config : Conf.S) = struct module CA = Pack_store.Make (File_manager) (Dict) (Dispatcher) (H) (Pack_value) - (Errs) include Brassaia.Commit.Generic_key.Store (Schema.Info) (Node) (CA) (H) @@ -129,7 +125,6 @@ module Maker (Config : Conf.S) = struct module Gc = Gc.Make (struct module Async = Async.Unix module File_manager = File_manager - module Errs = Errs module Dict = Dict module Dispatcher = Dispatcher module Hash = Schema.Hash @@ -172,22 +167,24 @@ module Maker (Config : Conf.S) = struct let fresh = Brassaia_pack.Conf.fresh config in let file_manager = let readonly = Brassaia_pack.Conf.readonly config in - if readonly then File_manager.open_ro config |> Errs.raise_if_error + if readonly then + File_manager.open_ro config |> Io_errors.raise_if_error else match (Io.classify_path root, fresh) with | `No_such_file_or_directory, _ -> File_manager.create_rw ~overwrite:false config - |> Errs.raise_if_error + |> Io_errors.raise_if_error | `Directory, true -> File_manager.create_rw ~overwrite:true config - |> Errs.raise_if_error + |> Io_errors.raise_if_error | `Directory, false -> - File_manager.open_rw config |> Errs.raise_if_error - | (`File | `Other), _ -> Errs.raise_error (`Not_a_directory root) + File_manager.open_rw config |> Io_errors.raise_if_error + | (`File | `Other), _ -> + Io_errors.raise_error (`Not_a_directory root) in - let dict = Dict.init file_manager |> Errs.raise_if_error in + let dict = Dict.init file_manager |> Io_errors.raise_if_error in let dispatcher = - Dispatcher.init file_manager |> 
Errs.raise_if_error + Dispatcher.init file_manager |> Io_errors.raise_if_error in let lru = Lru.create config in let contents = @@ -225,10 +222,14 @@ module Maker (Config : Conf.S) = struct } let flush t = - File_manager.flush ?hook:None t.file_manager |> Errs.raise_if_error + File_manager.flush ?hook:None t.file_manager + |> Io_errors.raise_if_error - let fsync t = File_manager.fsync t.file_manager |> Errs.raise_if_error - let reload t = File_manager.reload t.file_manager |> Errs.raise_if_error + let fsync t = + File_manager.fsync t.file_manager |> Io_errors.raise_if_error + + let reload t = + File_manager.reload t.file_manager |> Io_errors.raise_if_error module Gc = struct let is_allowed { file_manager; _ } = @@ -294,7 +295,7 @@ module Maker (Config : Conf.S) = struct in match result with | Ok _ -> Lwt.return true - | Error e -> Errs.raise_error e) + | Error e -> Io_errors.raise_error e) let finalise_exn ?(wait = false) t = let* result = @@ -312,7 +313,7 @@ module Maker (Config : Conf.S) = struct | Ok waited -> Lwt.return waited | Error e -> t.running_gc <- None; - Errs.raise_error e + Io_errors.raise_error e let is_finished t = Option.is_none t.running_gc @@ -360,11 +361,11 @@ module Maker (Config : Conf.S) = struct match Io.classify_path path with | `Directory -> () | `No_such_file_or_directory -> - Io.mkdir path |> Errs.raise_if_error - | _ -> Errs.raise_error `Invalid_layout + Io.mkdir path |> Io_errors.raise_if_error + | _ -> Io_errors.raise_error `Invalid_layout in let commit_key = - direct_commit_key t commit_key |> Errs.raise_if_error + direct_commit_key t commit_key |> Io_errors.raise_if_error in (* The GC action here does not matter, since we'll not fully finalise it *) @@ -373,7 +374,7 @@ module Maker (Config : Conf.S) = struct commit_key in let () = - if not launched then Errs.raise_error `Forbidden_during_gc + if not launched then Io_errors.raise_error `Forbidden_during_gc in let* gced = match t.running_gc with @@ -386,7 +387,7 @@ module Maker 
(Config : Conf.S) = struct let () = File_manager.create_one_commit_store t.file_manager config gced commit_key - |> Errs.raise_if_error + |> Io_errors.raise_if_error in let branch_path = Brassaia_pack.Layout.V4.branch ~root:path in let* branch_store = @@ -411,21 +412,21 @@ module Maker (Config : Conf.S) = struct in File_manager.split t.file_manager - let split_exn repo = split repo |> Errs.raise_if_error + let split_exn repo = split repo |> Io_errors.raise_if_error let add_volume_exn t = let () = if Brassaia_pack.Conf.readonly t.config then - Errs.raise_error `Ro_not_allowed; + Io_errors.raise_error `Ro_not_allowed; if Gc.is_finished t = false then - Errs.raise_error `Add_volume_forbidden_during_gc + Io_errors.raise_error `Add_volume_forbidden_during_gc in - File_manager.add_volume t.file_manager |> Errs.raise_if_error + File_manager.add_volume t.file_manager |> Io_errors.raise_if_error let batch t f = [%log.debug "[pack] batch start"]; let readonly = Brassaia_pack.Conf.readonly t.config in - if readonly then Errs.raise_error `Ro_not_allowed + if readonly then Io_errors.raise_error `Ro_not_allowed else let c0 = Mtime_clock.counter () in let try_finalise () = Gc.try_auto_finalise_exn t in @@ -441,7 +442,7 @@ module Maker (Config : Conf.S) = struct let s = Mtime_clock.count c0 |> Mtime.span_to_s in [%log.info "[pack] batch completed in %.6fs" s]; t.during_batch <- false; - File_manager.flush t.file_manager |> Errs.raise_if_error; + File_manager.flush t.file_manager |> Io_errors.raise_if_error; let* _ = try_finalise () in Lwt.return res in @@ -457,7 +458,8 @@ module Maker (Config : Conf.S) = struct [%log.err "[pack] batch failed and flush failed. Silencing flush \ fail. (%a)" - (Brassaia.Type.pp Errs.t) err] + (Brassaia.Type.pp Io_errors.t) + err] in (* Kill gc process in at_exit. 
*) raise exn @@ -468,7 +470,9 @@ module Maker (Config : Conf.S) = struct (* Step 1 - Kill the gc process if it is running *) let _ = Gc.cancel t in (* Step 2 - Close the files *) - let () = File_manager.close t.file_manager |> Errs.raise_if_error in + let () = + File_manager.close t.file_manager |> Io_errors.raise_if_error + in Branch.close t.branch >>= fun () -> (* Step 3 - Close the in-memory abstractions *) Dict.close t.dict; @@ -631,9 +635,10 @@ module Maker (Config : Conf.S) = struct let error_msg = Fmt.str "[%s] resulted in error: %s" context err in Lwt.return_error (`Msg error_msg) - let map_errors context (error : Errs.t) = + let map_errors context (error : Io_errors.t) = let err_msg = - Fmt.str "[%s] resulted in error: %a" context (Brassaia.Type.pp Errs.t) + Fmt.str "[%s] resulted in error: %a" context + (Brassaia.Type.pp Io_errors.t) error in `Msg err_msg @@ -729,7 +734,7 @@ module Maker (Config : Conf.S) = struct Export.run ?on_disk export f_contents f_nodes (key, Pack_value.Kind.Inode_v2_root) in - Export.close export |> Errs.raise_if_error; + Export.close export |> Io_errors.raise_if_error; Lwt.return total end @@ -757,10 +762,9 @@ module Maker (Config : Conf.S) = struct end module Internal = struct + module File_manager = File_manager module Io = Io - module Errs = Errs module Index = Index - module File_manager = File_manager let file_manager (repo : X.Repo.t) = repo.file_manager diff --git a/brassaia/lib_brassaia_pack/unix/store_intf.ml b/brassaia/lib_brassaia_pack/unix/store_intf.ml index bcc011ccfe58..66b95671a102 100644 --- a/brassaia/lib_brassaia_pack/unix/store_intf.ml +++ b/brassaia/lib_brassaia_pack/unix/store_intf.ml @@ -294,14 +294,8 @@ module type S = sig to implement or test inodes. 
*) module Io = Io.Unix - module Errs : Io_errors.S with module Io = Io module Index : Pack_index.S with type key = hash - - module File_manager : - File_manager.S - with module Io = Io - and module Errs = Errs - and module Index = Index + module File_manager : File_manager.S with module Index = Index val file_manager : repo -> File_manager.t diff --git a/brassaia/lib_brassaia_pack/unix/traverse_pack_file.ml b/brassaia/lib_brassaia_pack/unix/traverse_pack_file.ml index 791a89030dc5..9fe90f006690 100644 --- a/brassaia/lib_brassaia_pack/unix/traverse_pack_file.ml +++ b/brassaia/lib_brassaia_pack/unix/traverse_pack_file.ml @@ -85,7 +85,6 @@ module Make (Args : Args) : sig unit end = struct open Args - module Errs = Io_errors.Make (File_manager.Io) let pp_key = Brassaia.Type.pp Hash.t let decode_key = Brassaia.Type.(unstage (decode_bin Hash.t)) @@ -385,8 +384,10 @@ end = struct (iter_pack_entry ~always v, finalise v, "Checking and fixing index") in let run_duration = Mtime_clock.counter () in - let file_manager = File_manager.open_ro config |> Errs.raise_if_error in - let dispatcher = Dispatcher.init file_manager |> Errs.raise_if_error in + let file_manager = + File_manager.open_ro config |> Io_errors.raise_if_error + in + let dispatcher = Dispatcher.init file_manager |> Io_errors.raise_if_error in let total = Dispatcher.end_offset dispatcher in let ingest_data progress = if File_manager.gc_allowed file_manager then @@ -405,7 +406,7 @@ end = struct Progress.(with_reporter bar) ingest_data in finalise (); - File_manager.close file_manager |> Errs.raise_if_error; + File_manager.close file_manager |> Io_errors.raise_if_error; let run_duration = Mtime_clock.count run_duration in let store_stats fmt = Fmt.pf fmt "Store statistics:@, @[%a@]" Stats.pp stats diff --git a/brassaia/test/brassaia-pack/common.ml b/brassaia/test/brassaia-pack/common.ml index 08f1a8f0b347..32d263df4fc9 100644 --- a/brassaia/test/brassaia-pack/common.ml +++ b/brassaia/test/brassaia-pack/common.ml @@ 
-84,16 +84,17 @@ module I = Brassaia_pack_unix.Index module Index = Brassaia_pack_unix.Index.Make (Schema.Hash) module Key = Brassaia_pack_unix.Pack_key.Make (Schema.Hash) module Io = Brassaia_pack_unix.Io.Unix -module Errs = Brassaia_pack_unix.Io_errors.Make (Io) -module File_manager = Brassaia_pack_unix.File_manager.Make (Io) (Index) (Errs) +module Io_errors = Brassaia_pack_unix.Io_errors +module File_manager = Brassaia_pack_unix.File_manager.Make (Index) module Dict = Brassaia_pack_unix.Dict.Make (File_manager) module Dispatcher = Brassaia_pack_unix.Dispatcher.Make (File_manager) +module Io_Errors = Brassaia_pack_unix.Io_errors +module Lower = Brassaia_pack_unix.Lower module Pack = Brassaia_pack_unix.Pack_store.Make (File_manager) (Dict) (Dispatcher) (Schema.Hash) (Contents) - (Errs) module Branch = Brassaia_pack_unix.Atomic_write.Make_persistent @@ -125,22 +126,24 @@ struct (* TODO : remove duplication with brassaia_pack/ext.ml *) let get_file_manager config = let readonly = Brassaia_pack.Conf.readonly config in - if readonly then File_manager.open_ro config |> Errs.raise_if_error + if readonly then File_manager.open_ro config |> Io_errors.raise_if_error else let fresh = Brassaia_pack.Conf.fresh config in if fresh then ( let root = Brassaia_pack.Conf.root config in mkdir_dash_p root; - File_manager.create_rw ~overwrite:true config |> Errs.raise_if_error) - else File_manager.open_rw config |> Errs.raise_if_error + File_manager.create_rw ~overwrite:true config + |> Io_errors.raise_if_error) + else File_manager.open_rw config |> Io_errors.raise_if_error let get_dict ?name ~readonly ~fresh () = let name = Option.value name ~default:(fresh_name "dict") in let file_manager = config ~readonly ~fresh name |> get_file_manager in - let dict = Dict.init file_manager |> Errs.raise_if_error in + let dict = Dict.init file_manager |> Io_errors.raise_if_error in { name; dict; file_manager } - let close_dict d = File_manager.close d.file_manager |> Errs.raise_if_error + let 
close_dict d = + File_manager.close d.file_manager |> Io_errors.raise_if_error type t = { name : string; @@ -154,13 +157,13 @@ struct let f = ref (fun () -> ()) in let config = config ~readonly ~fresh name in let file_manager = get_file_manager config in - let dispatcher = Dispatcher.init file_manager |> Errs.raise_if_error in + let dispatcher = Dispatcher.init file_manager |> Io_errors.raise_if_error in (* open the index created by the fm. *) let index = File_manager.index file_manager in - let dict = Dict.init file_manager |> Errs.raise_if_error in + let dict = Dict.init file_manager |> Io_errors.raise_if_error in let lru = Some (Brassaia_pack_unix.Lru.create config) in let pack = Pack.init ~config ~file_manager ~dict ~dispatcher ~lru in - (f := fun () -> File_manager.flush file_manager |> Errs.raise_if_error); + (f := fun () -> File_manager.flush file_manager |> Io_errors.raise_if_error); { name; index; pack; dict; file_manager } |> Lwt.return let get_rw_pack () = @@ -172,7 +175,7 @@ struct let close_pack t = Index.close_exn t.index; - File_manager.close t.file_manager |> Errs.raise_if_error; + File_manager.close t.file_manager |> Io_errors.raise_if_error; (* closes pack and dict *) Lwt.return_unit end diff --git a/brassaia/test/brassaia-pack/common.mli b/brassaia/test/brassaia-pack/common.mli index e9dd7b835794..f67f7390b4dd 100644 --- a/brassaia/test/brassaia-pack/common.mli +++ b/brassaia/test/brassaia-pack/common.mli @@ -19,12 +19,9 @@ module Int63 = Optint.Int63 module Dict : Brassaia_pack_unix.Dict.S module I = Brassaia_pack_unix.Index module Conf : Brassaia_pack.Conf.S - -module File_manager : - Brassaia_pack_unix.File_manager.S with module Io = Brassaia_pack_unix.Io.Unix - -module Errs : - Brassaia_pack_unix.Io_errors.S with module Io = Brassaia_pack_unix.Io.Unix +module File_manager : Brassaia_pack_unix.File_manager.S +module Io_errors = Brassaia_pack_unix.Io_errors +module Lower = Brassaia_pack_unix.Lower module Schema : Brassaia.Schema.Extended diff 
--git a/brassaia/test/brassaia-pack/test_dispatcher.ml b/brassaia/test/brassaia-pack/test_dispatcher.ml index bac710d52524..3a87928dd439 100644 --- a/brassaia/test/brassaia-pack/test_dispatcher.ml +++ b/brassaia/test/brassaia-pack/test_dispatcher.ml @@ -77,8 +77,8 @@ let check_hex msg buf expected = let test_read () = let* config = setup_store () in - let fm = File_manager.open_ro config |> Errs.raise_if_error in - let dsp = Dispatcher.init fm |> Errs.raise_if_error in + let fm = File_manager.open_ro config |> Io_errors.raise_if_error in + let dsp = Dispatcher.init fm |> Io_errors.raise_if_error in let _ = Alcotest.check_raises "cannot read node_1" (Brassaia_pack_unix.Errors.Pack_error @@ -97,7 +97,7 @@ let test_read () = test_accessor "commit_2" commit_2; test_accessor "node_3" node_3; - File_manager.close fm |> Errs.raise_if_error; + File_manager.close fm |> Io_errors.raise_if_error; Lwt.return_unit let tests = [ Alcotest_lwt.test_case "read" `Quick (fun _switch -> test_read) ] diff --git a/brassaia/test/brassaia-pack/test_flush_reload.ml b/brassaia/test/brassaia-pack/test_flush_reload.ml index 4cefbf3e38b5..3d87e3069e37 100644 --- a/brassaia/test/brassaia-pack/test_flush_reload.ml +++ b/brassaia/test/brassaia-pack/test_flush_reload.ml @@ -125,7 +125,8 @@ let test_one t ~(ro_reload_at : phase_flush) = in let () = Store.S.Internal.( - File_manager.flush ~hook (file_manager rw) |> Errs.raise_if_error) + File_manager.flush ~hook (file_manager rw) + |> Io_errors.raise_if_error) in let () = aux S4_after_flush in Lwt.return_unit) @@ -207,7 +208,8 @@ let test_one t ~(rw_flush_at : phase_reload) = in let () = Store.S.Internal.( - File_manager.reload ~hook (file_manager ro) |> Errs.raise_if_error) + File_manager.reload ~hook (file_manager ro) + |> Io_errors.raise_if_error) in let () = aux S5_after_reload in Lwt.return_unit) diff --git a/brassaia/test/brassaia-pack/test_inode.ml b/brassaia/test/brassaia-pack/test_inode.ml index 66f2c7ebb1d7..61fbee7cc5ce 100644 --- 
a/brassaia/test/brassaia-pack/test_inode.ml +++ b/brassaia/test/brassaia-pack/test_inode.ml @@ -56,8 +56,8 @@ struct end module Io = Brassaia_pack_unix.Io.Unix - module Errs = Brassaia_pack_unix.Io_errors.Make (Io) - module File_manager = Brassaia_pack_unix.File_manager.Make (Io) (Index) (Errs) + module Io_errors = Brassaia_pack_unix.Io_errors + module File_manager = Brassaia_pack_unix.File_manager.Make (Index) module Dict = Brassaia_pack_unix.Dict.Make (File_manager) module Dispatcher = Brassaia_pack_unix.Dispatcher.Make (File_manager) @@ -65,13 +65,11 @@ struct Brassaia_pack_unix.Pack_store.Make (File_manager) (Dict) (Dispatcher) (Schema.Hash) (Inter.Raw) - (Errs) module Pack_mock = Brassaia_pack_unix.Pack_store.Make (File_manager) (Dict) (Dispatcher) (Schema.Hash) (Inter_mock.Raw) - (Errs) module Inode = Brassaia_pack_unix.Inode.Make_persistent (Schema.Hash) (Node) (Inter) (Pack) @@ -88,7 +86,6 @@ struct Brassaia_pack_unix.Pack_store.Make (File_manager) (Dict) (Dispatcher) (Schema.Hash) (Contents_value) - (Errs) module Context_make (Inode : Brassaia_pack_unix.Inode.Persistent @@ -117,7 +114,7 @@ struct let get_file_manager config = let readonly = Brassaia_pack.Conf.readonly config in - if readonly then File_manager.open_ro config |> Errs.raise_if_error + if readonly then File_manager.open_ro config |> Io_errors.raise_if_error else let fresh = Brassaia_pack.Conf.fresh config in let root = Brassaia_pack.Conf.root config in @@ -130,20 +127,23 @@ struct match (Io.classify_path root, fresh) with | `No_such_file_or_directory, _ -> File_manager.create_rw ~overwrite:false config - |> Errs.raise_if_error + |> Io_errors.raise_if_error | `Directory, true -> - File_manager.create_rw ~overwrite:true config |> Errs.raise_if_error + File_manager.create_rw ~overwrite:true config + |> Io_errors.raise_if_error | `Directory, false -> - File_manager.open_rw config |> Errs.raise_if_error - | (`File | `Other), _ -> Errs.raise_error (`Not_a_directory root) + File_manager.open_rw 
config |> Io_errors.raise_if_error + | (`File | `Other), _ -> Io_errors.raise_error (`Not_a_directory root) let get_store ~indexing_strategy () = [%log.app "Constructing a fresh context for use by the test"]; rm_dir root; let config = config ~indexing_strategy ~readonly:false ~fresh:true root in let file_manager = get_file_manager config in - let dict = Dict.init file_manager |> Errs.raise_if_error in - let dispatcher = Dispatcher.init file_manager |> Errs.raise_if_error in + let dict = Dict.init file_manager |> Io_errors.raise_if_error in + let dispatcher = + Dispatcher.init file_manager |> Io_errors.raise_if_error + in let lru = Some (Brassaia_pack_unix.Lru.create config) in let store = Inode.init ~config ~file_manager ~dict ~dispatcher ~lru in let store_contents = @@ -159,7 +159,7 @@ struct { store; store_contents; file_manager; foo; bar } let close t = - File_manager.close t.file_manager |> Errs.raise_if_error; + File_manager.close t.file_manager |> Io_errors.raise_if_error; (* closes dict, inodes and contents store. 
*) Lwt.return_unit end diff --git a/brassaia/test/brassaia-pack/test_lower.ml b/brassaia/test/brassaia-pack/test_lower.ml index db1600fee379..968cc4321da5 100644 --- a/brassaia/test/brassaia-pack/test_lower.ml +++ b/brassaia/test/brassaia-pack/test_lower.ml @@ -25,10 +25,9 @@ module Io = Brassaia_pack_unix.Io.Unix let ( let$ ) res f = f @@ Result.get_ok res module Direct_tc = struct - module Control = Brassaia_pack_unix.Control_file.Volume (Io) - module Errs = Brassaia_pack_unix.Io_errors.Make (Io) - module Lower = Brassaia_pack_unix.Lower.Make (Io) (Errs) - module Sparse = Brassaia_pack_unix.Sparse_file.Make (Io) + module Control = Brassaia_pack_unix.Control_file.Volume + module Lower = Brassaia_pack_unix.Lower + module Sparse = Brassaia_pack_unix.Sparse_file let create_control volume_path payload = let path = Brassaia_pack.Layout.V5.Volume.control ~root:volume_path in @@ -188,7 +187,7 @@ module Store_tc = struct let open Store.Internal in file_manager repo |> File_manager.lower - |> Option.map File_manager.Lower.volume_num + |> Option.map Lower.volume_num |> Option.value ~default:0 let volume_path repo offset = @@ -197,11 +196,11 @@ module Store_tc = struct let volume = match lower with | None -> Alcotest.fail "expected lower" - | Some l -> File_manager.Lower.find_volume ~off:offset l + | Some l -> Lower.find_volume ~off:offset l in match volume with | None -> Alcotest.fail "expected volume" - | Some v -> File_manager.Lower.Volume.path v + | Some v -> Lower.Volume.path v let generation repo = let open Store.Internal in diff --git a/brassaia/test/brassaia-pack/test_mapping.ml b/brassaia/test/brassaia-pack/test_mapping.ml index b914ec26c723..03be5758a2bc 100644 --- a/brassaia/test/brassaia-pack/test_mapping.ml +++ b/brassaia/test/brassaia-pack/test_mapping.ml @@ -17,8 +17,8 @@ open! 
Import module Int63 = Optint.Int63 module Io = Brassaia_pack_unix.Io.Unix -module Errs = Brassaia_pack_unix.Io_errors.Make (Io) -module Sparse_file = Brassaia_pack_unix.Sparse_file.Make (Io) +module Io_errors = Brassaia_pack_unix.Io_errors +module Sparse_file = Brassaia_pack_unix.Sparse_file let test_dir = Filename.concat "_build" "test-pack-mapping" @@ -34,7 +34,9 @@ let process_on_disk pairs = Io.unlink mapping |> ignore; let data = Brassaia_pack.Layout.V5.prefix ~root:test_dir ~generation:1 in Io.unlink data |> ignore; - let sparse = Sparse_file.Ao.create ~mapping ~data |> Errs.raise_if_error in + let sparse = + Sparse_file.Ao.create ~mapping ~data |> Io_errors.raise_if_error + in List.iter (fun (off, len) -> Format.printf "%i (+%i) => %i@." off len (off + len); @@ -43,15 +45,15 @@ let process_on_disk pairs = Sparse_file.Ao.append_seq_exn sparse ~off str) (List.rev pairs); let mapping_size = Int63.to_int (Sparse_file.Ao.mapping_size sparse) in - Sparse_file.Ao.flush sparse |> Errs.raise_if_error; - Sparse_file.Ao.close sparse |> Errs.raise_if_error; + Sparse_file.Ao.flush sparse |> Io_errors.raise_if_error; + Sparse_file.Ao.close sparse |> Io_errors.raise_if_error; let sparse = - Sparse_file.open_ro ~mapping_size ~mapping ~data |> Errs.raise_if_error + Sparse_file.open_ro ~mapping_size ~mapping ~data |> Io_errors.raise_if_error in let l = ref [] in let f ~off ~len = l := (Int63.to_int off, len) :: !l in - Sparse_file.iter sparse f |> Errs.raise_if_error; - Sparse_file.close sparse |> Errs.raise_if_error; + Sparse_file.iter sparse f |> Io_errors.raise_if_error; + Sparse_file.close sparse |> Io_errors.raise_if_error; !l |> List.rev (** Emulate the behaviour of the [Mapping_file] routines to process [pairs] *) diff --git a/brassaia/test/brassaia-pack/test_pack.ml b/brassaia/test/brassaia-pack/test_pack.ml index 4fabf42ab33f..b5d317656bf7 100644 --- a/brassaia/test/brassaia-pack/test_pack.ml +++ b/brassaia/test/brassaia-pack/test_pack.ml @@ -94,8 +94,8 @@ module 
Context = Make_context (struct let root = test_dir end) -let flush fm = File_manager.flush fm |> Errs.raise_if_error -let reload fm = File_manager.reload fm |> Errs.raise_if_error +let flush fm = File_manager.flush fm |> Io_errors.raise_if_error +let reload fm = File_manager.reload fm |> Io_errors.raise_if_error module Dict = struct let test_dict () = @@ -354,7 +354,7 @@ module Pack = struct let k2 = Pack.unsafe_append ~ensure_unique:true ~overcommit:false w h2 x2 in - Index.flush t.index ~with_fsync:false |> Errs.raise_if_error; + Index.flush t.index ~with_fsync:false |> Io_errors.raise_if_error; let+ y2 = Pack.find t'.pack k2 in Alcotest.(check (option string)) "reload after flush" (Some x2) y2 in @@ -392,7 +392,7 @@ module Pack = struct let k3 = Pack.unsafe_append ~ensure_unique:true ~overcommit:false w h3 x3 in - Index.flush t.index ~with_fsync:false |> Errs.raise_if_error; + Index.flush t.index ~with_fsync:false |> Io_errors.raise_if_error; check k2 x2 "find after flush" >>= fun () -> flush t.file_manager; reload t'.file_manager; diff --git a/brassaia/test/brassaia-pack/test_pack_version_bump.ml b/brassaia/test/brassaia-pack/test_pack_version_bump.ml index 2b824bc2db5c..4c45b14ff88e 100644 --- a/brassaia/test/brassaia-pack/test_pack_version_bump.ml +++ b/brassaia/test/brassaia-pack/test_pack_version_bump.ml @@ -28,8 +28,8 @@ module Private = struct (* the following modules are necessary to expose the File_manager.*) module Index = Brassaia_pack_unix.Index.Make (Schema.Hash) module Io = Brassaia_pack_unix.Io.Unix - module Errs = Brassaia_pack_unix.Io_errors.Make (Io) - module File_manager = Brassaia_pack_unix.File_manager.Make (Io) (Index) (Errs) + module Io_errors = Brassaia_pack_unix.Io_errors + module File_manager = Brassaia_pack_unix.File_manager.Make (Index) end module Util = struct @@ -93,7 +93,7 @@ module Util = struct (** Get the version of the underlying file; file is assumed to exist; file is assumed to be an Brassaia_pack.IO.Unix file *) let 
io_get_version ~root : [ `V1 | `V2 | `V3 | `V4 | `V5 ] = - File_manager.version ~root |> Errs.raise_if_error + File_manager.version ~root |> Io_errors.raise_if_error let alco_check_version ~pos:_ ~expected ~actual = Alcotest.check_repr Brassaia_pack.Version.t "" expected actual -- GitLab From aa67cbbc390efe2dd94c687a60dccd8898187147 Mon Sep 17 00:00:00 2001 From: mattiasdrp Date: Mon, 16 Sep 2024 12:27:39 +0200 Subject: [PATCH 2/2] Brassaia: Remove useless intf files associated to Io --- .../unix/append_only_file.mli | 168 +++++++++++++++- .../unix/append_only_file_intf.ml | 185 ------------------ .../lib_brassaia_pack/unix/chunked_suffix.mli | 117 ++++++++++- .../unix/chunked_suffix_intf.ml | 135 ------------- brassaia/lib_brassaia_pack/unix/io.mli | 141 ++++++++++++- brassaia/lib_brassaia_pack/unix/io_intf.ml | 160 --------------- brassaia/lib_brassaia_pack/unix/lower.mli | 154 ++++++++++++++- brassaia/lib_brassaia_pack/unix/lower_intf.ml | 179 ----------------- .../lib_brassaia_pack/unix/sparse_file.mli | 132 ++++++++++++- .../unix/sparse_file_intf.ml | 153 --------------- 10 files changed, 701 insertions(+), 823 deletions(-) delete mode 100644 brassaia/lib_brassaia_pack/unix/append_only_file_intf.ml delete mode 100644 brassaia/lib_brassaia_pack/unix/chunked_suffix_intf.ml delete mode 100644 brassaia/lib_brassaia_pack/unix/io_intf.ml delete mode 100644 brassaia/lib_brassaia_pack/unix/lower_intf.ml delete mode 100644 brassaia/lib_brassaia_pack/unix/sparse_file_intf.ml diff --git a/brassaia/lib_brassaia_pack/unix/append_only_file.mli b/brassaia/lib_brassaia_pack/unix/append_only_file.mli index 47ec6aea4031..6769ba6bc062 100644 --- a/brassaia/lib_brassaia_pack/unix/append_only_file.mli +++ b/brassaia/lib_brassaia_pack/unix/append_only_file.mli @@ -14,5 +14,169 @@ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. *) -include Append_only_file_intf.Sigs -(** @inline *) +open Import + +(** Abstraction for brassaia-pack's append only files (i.e. 
suffix and dict). + + It is parameterized with [Io], a file system abstraction (e.g. unix, + mirage, eio_linux). + + It comprises a persistent file, an append buffer and takes care of + automatically shifting offsets to deal with legacy file headers. *) + +module Io = Io.Unix + +type t + +type auto_flush_procedure = [ `Internal | `External of t -> unit ] +(** [auto_flush_procedure] defines behavior when the flush threshold is + reached. + + - Use [`Internal] to have the buffer automatically flushed. + - Use [`External f] to have [f] called when the flush threshold is + reached. It is the responsibility of [f] to call flush, in addition to + any other processing it does. *) + +val create_rw : + path:string -> + overwrite:bool -> + auto_flush_threshold:int -> + auto_flush_procedure:auto_flush_procedure -> + (t, [> Io.create_error ]) result +(** Create a rw instance of [t] by creating the file at [path]. *) + +val open_rw : + path:string -> + end_poff:int63 -> + dead_header_size:int -> + auto_flush_threshold:int -> + auto_flush_procedure:auto_flush_procedure -> + ( t, + [> Io.open_error + | `Closed + | `Invalid_argument + | `Read_out_of_bounds + | `Inconsistent_store ] ) + result +(** Create a rw instance of [t] by opening an existing file at [path]. + + {3 End Offset} + + The file has an end offset at which new data will be saved. While this + information could be computed by looking at the size of the file, we + prefer storing that information elsewhere (i.e. in the control file). This + is why [open_rw] and [open_ro] take an [end_poff] parameter, and also why + [refresh_end_poff] exists. The abstractions above [Append_only_file] are + responsible for reading/writing the offsets from/to the control file. + + {3 [dead_header_size]} + + Designates a small area at the beginning of the file that should be + ignored. The offsets start after that area. + + The actual persisted size of a file is [end_poff + dead_header_size]. 
+ + This concept exists in order to keep supporting [`V1] and [`V2] pack + stores with [`V3]. + + {3 Auto Flushes} + + One of the goals of the [Append_only_file] abstraction is to provide + buffered appends. [auto_flush_threshold] is the soft cap after which the + buffer should be flushed. When a call to [append_exn] fills the buffer, + either the buffer will be flushed automatically, if + [auto_flush_procedure = `Internal], or the supplied external function [f] + will be called, if [auto_flush_procedure = `External f]. *) + +val open_ro : + path:string -> + end_poff:int63 -> + dead_header_size:int -> + ( t, + [> Io.open_error + | `Closed + | `Inconsistent_store + | `Invalid_argument + | `Read_out_of_bounds ] ) + result +(** Create a ro instance of [t] by opening an existing file at [path] *) + +val close : t -> (unit, [> Io.close_error | `Pending_flush ]) result +(** Close the underlying file. + + The internal buffer is expected to be in a flushed state when [close] is + called. Otherwise, an error is returned. *) + +val end_poff : t -> int63 +(** [end_poff t] is the number of bytes of the file. That function doesn't + perform IO. + + {3 RW mode} + + It also counts the bytes not flushed yet. + + {3 RO mode} + + This information originates from the latest reload of the control file. + Calling [refresh_end_poff t] updates [end_poff]. *) + +val read_to_string : + t -> off:int63 -> len:int -> (string, [> Io.read_error ]) result + +val read_exn : t -> off:int63 -> len:int -> bytes -> unit +(** [read_exn t ~off ~len b] puts the [len] bytes of [t] at [off] to [b]. + + [read_to_string] should always be favored over [read_exn], except when + performance matters. + + It is not possible to read from an offset further than [end_poff t]. + + Raises [Io.Read_error] and [Errors.Pack_error `Read_out_of_bounds]. + + {3 RW mode} + + Attempting to read from the append buffer results in an + [`Read_out_of_bounds] error. 
This feature could easily be implemented in + the future if ever needed. It was not needed with io_legacy. *) + +val append_exn : t -> string -> unit +(** [append_exn t ~off b] writes [b] to the end of [t]. Might trigger an auto + flush. + + Grows [end_poff], but the parent abstraction is expected to persist this + somewhere (e.g. in the control file). + + Post-condition: [end_poff t - end_poff (old t) = String.length b]. + + Raises [Io.Write_error] + + {3 RO mode} + + Always raises [Errors.RO_not_allowed] *) + +val flush : t -> (unit, [> Io.write_error ]) result +(** Flush the append buffer. Does not call [fsync]. + + {3 RO mode} + + Always returns [Error `Ro_not_allowed]. *) + +val fsync : t -> (unit, [> Io.write_error ]) result +(** Tell the os to flush its internal buffers. Does not call [flush]. + + {3 RO mode} + + Always returns [Error `Ro_not_allowed]. *) + +val refresh_end_poff : t -> int63 -> (unit, [> `Rw_not_allowed ]) result +(** Ingest the new end offset of the file. Typically happens in RO mode when + the control file has been re-read. + + {3 RW mode} + + Always returns [Error `Rw_not_allowed]. *) + +val readonly : t -> bool +val auto_flush_threshold : t -> int option +val empty_buffer : t -> bool +val path : t -> string diff --git a/brassaia/lib_brassaia_pack/unix/append_only_file_intf.ml b/brassaia/lib_brassaia_pack/unix/append_only_file_intf.ml deleted file mode 100644 index 96071a50166f..000000000000 --- a/brassaia/lib_brassaia_pack/unix/append_only_file_intf.ml +++ /dev/null @@ -1,185 +0,0 @@ -(* - * Copyright (c) 2022-2022 Tarides - * - * Permission to use, copy, modify, and distribute this software for any - * purpose with or without fee is hereby granted, provided that the above - * copyright notice and this permission notice appear in all copies. - * - * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS. 
IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - *) - -open Import -module Io = Io.Unix - -module type S = sig - (** Abstraction for brassaia-pack's append only files (i.e. suffix and dict). - - It is parameterized with [Io], a file system abstraction (e.g. unix, - mirage, eio_linux). - - It comprises a persistent file, an append buffer and take care of - automatically shifting offsets to deal with legacy file headers. *) - - type t - - type auto_flush_procedure = [ `Internal | `External of t -> unit ] - (** [auto_flush_procedure] defines behavior when the flush threshold is - reached. - - - Use [`Internal] to have the buffer automatically flushed. - - Use [`External f] to have [f] called when the flush threshold is - reached. It is the responsibility of [f] to call flush, in addition to - any other processing it does. *) - - val create_rw : - path:string -> - overwrite:bool -> - auto_flush_threshold:int -> - auto_flush_procedure:auto_flush_procedure -> - (t, [> Io.create_error ]) result - (** Create a rw instance of [t] by creating the file at [path]. *) - - val open_rw : - path:string -> - end_poff:int63 -> - dead_header_size:int -> - auto_flush_threshold:int -> - auto_flush_procedure:auto_flush_procedure -> - ( t, - [> Io.open_error - | `Closed - | `Invalid_argument - | `Read_out_of_bounds - | `Inconsistent_store ] ) - result - (** Create a rw instance of [t] by opening an existing file at [path]. - - {3 End Offset} - - The file has an end offset at which new data will be saved. While this - information could be computed by looking at the size of the file, we - prefer storing that information elsewhere (i.e. in the control file). 
This - is why [open_rw] and [open_ro] take an [end_poff] parameter, and also why - [refresh_end_poff] exists. The abstractions above [Append_only_file] are - responsible for reading/writing the offsets from/to the control file. - - {3 [dead_header_size]} - - Designates a small area at the beginning of the file that should be - ignored. The offsets start after that area. - - The actual persisted size of a file is [end_poff + dead_header_size]. - - This concept exists in order to keep supporting [`V1] and [`V2] pack - stores with [`V3]. - - {3 Auto Flushes} - - One of the goals of the [Append_only_file] abstraction is to provide - buffered appends. [auto_flush_threshold] is the soft cap after which the - buffer should be flushed. When a call to [append_exn] fills the buffer, - either the buffer will be flushed automatically, if - [auto_flush_procedure = `Internal], or the supplied external function [f] - will be called, if [auto_flush_procedure = `External f]. *) - - val open_ro : - path:string -> - end_poff:int63 -> - dead_header_size:int -> - ( t, - [> Io.open_error - | `Closed - | `Inconsistent_store - | `Invalid_argument - | `Read_out_of_bounds ] ) - result - (** Create a ro instance of [t] by opening an existing file at [path] *) - - val close : t -> (unit, [> Io.close_error | `Pending_flush ]) result - (** Close the underlying file. - - The internal buffer is expected to be in a flushed state when [close] is - called. Otherwise, an error is returned. *) - - val end_poff : t -> int63 - (** [end_poff t] is the number of bytes of the file. That function doesn't - perform IO. - - {3 RW mode} - - It also counts the bytes not flushed yet. - - {3 RO mode} - - This information originates from the latest reload of the control file. - Calling [refresh_end_poff t] updates [end_poff]. 
*) - - val read_to_string : - t -> off:int63 -> len:int -> (string, [> Io.read_error ]) result - - val read_exn : t -> off:int63 -> len:int -> bytes -> unit - (** [read_exn t ~off ~len b] puts the [len] bytes of [t] at [off] to [b]. - - [read_to_string] should always be favored over [read_exn], except when - performences matter. - - It is not possible to read from an offset further than [end_poff t]. - - Raises [Io.Read_error] and [Errors.Pack_error `Read_out_of_bounds]. - - {3 RW mode} - - Attempting to read from the append buffer results in an - [`Read_out_of_bounds] error. This feature could easily be implemented in - the future if ever needed. It was not needed with io_legacy. *) - - val append_exn : t -> string -> unit - (** [append_exn t ~off b] writes [b] to the end of [t]. Might trigger an auto - flush. - - Grows [end_poff], but the parent abstraction is expected to persist this - somewhere (e.g. in the control file). - - Post-condition: [end_poff t - end_poff (old t) = String.length b]. - - Raises [Io.Write_error] - - {3 RW mode} - - Always raises [Errors.RO_not_allowed] *) - - val flush : t -> (unit, [> Io.write_error ]) result - (** Flush the append buffer. Does not call [fsync]. - - {3 RO mode} - - Always returns [Error `Ro_not_allowed]. *) - - val fsync : t -> (unit, [> Io.write_error ]) result - (** Tell the os to fush its internal buffers. Does not call [flush]. - - {3 RO mode} - - Always returns [Error `Ro_not_allowed]. *) - - val refresh_end_poff : t -> int63 -> (unit, [> `Rw_not_allowed ]) result - (** Ingest the new end offset of the file. Typically happens in RO mode when - the control file has been re-read. - - {3 RW mode} - - Always returns [Error `Rw_not_allowed]. 
*) - - val readonly : t -> bool - val auto_flush_threshold : t -> int option - val empty_buffer : t -> bool - val path : t -> string -end - -module type Sigs = S diff --git a/brassaia/lib_brassaia_pack/unix/chunked_suffix.mli b/brassaia/lib_brassaia_pack/unix/chunked_suffix.mli index 36e443f191bc..50553d5cd43b 100644 --- a/brassaia/lib_brassaia_pack/unix/chunked_suffix.mli +++ b/brassaia/lib_brassaia_pack/unix/chunked_suffix.mli @@ -14,5 +14,118 @@ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. *) -include Chunked_suffix_intf.Sigs -(** @inline *) +open Import + +(** Abstraction for a chunked suffix. It is functionally equivalent to + {!Append_only_file} but with a chunked implementation that is + parameterized by + + - [start_idx] for {!create_rw} to know the starting file name, and + - [start_idx] and [chunk_num] for the open functions to know the starting + file name and how many files there are. *) + +module Io = Io.Unix +module Ao = Append_only_file + +type t +type create_error = Io.create_error + +type open_error = + [ Io.open_error + | `Closed + | `Invalid_argument + | `Inconsistent_store + | `Read_out_of_bounds ] + +type add_new_error = + [ open_error + | Io.close_error + | `Pending_flush + | `File_exists of string + | `Multiple_empty_chunks ] + +val create_rw : + root:string -> + start_idx:int -> + overwrite:bool -> + auto_flush_threshold:int -> + auto_flush_procedure:Ao.auto_flush_procedure -> + (t, [> create_error ]) result + +val open_rw : + root:string -> + appendable_chunk_poff:int63 -> + start_idx:int -> + chunk_num:int -> + dead_header_size:int -> + auto_flush_threshold:int -> + auto_flush_procedure:Ao.auto_flush_procedure -> + (t, [> open_error ]) result + +val open_ro : + root:string -> + appendable_chunk_poff:int63 -> + dead_header_size:int -> + start_idx:int -> + chunk_num:int -> + (t, [> open_error ]) result + +val add_chunk : + auto_flush_threshold:int -> + auto_flush_procedure:Ao.auto_flush_procedure -> + t -> + (unit, [> 
add_new_error ]) result + +val start_idx : t -> int +val chunk_num : t -> int +val close : t -> (unit, [> Io.close_error | `Pending_flush ]) result +val empty_buffer : t -> bool +val flush : t -> (unit, [> Io.write_error ]) result +val fsync : t -> (unit, [> Io.write_error ]) result + +val appendable_chunk_poff : t -> int63 +(** [appendable_chunk_poff t] is the number of bytes of the chunk file that is + currently appendable. It does not perform IO. + + {3 RW mode} + + It also counts the bytes not flushed yet. + + {3 RO mode} + + This information originates from the latest reload of the control file. + Calling [refresh_appendable_chunk_poff t] updates [appendable_chunk_poff]. *) + +val refresh_appendable_chunk_poff : + t -> int63 -> (unit, [> `Rw_not_allowed ]) result +(** Ingest the new end offset of the appendable chunk file. Typically happens + in RO mode when the control file has been re-read. + + {3 RW mode} + + Always returns [Error `Rw_not_allowed]. *) + +val end_soff : t -> int63 +(** [end_soff t] is the end offset for the chunked suffix. The valid range of + offsets is 0 <= off < end_soff. Therefore, [end_soff] also represents the + length of the chunked suffix. 
*) + +val read_exn : t -> off:int63 -> len:int -> bytes -> unit + +val read_range_exn : + t -> off:int63 -> min_len:int -> max_len:int -> bytes -> int + +val append_exn : t -> string -> unit +val readonly : t -> bool +val auto_flush_threshold : t -> int option + +val fold_chunks : + (acc:'a -> + idx:int -> + start_suffix_off:int63 -> + end_suffix_off:int63 -> + is_appendable:bool -> + 'a) -> + 'a -> + t -> + 'a diff --git a/brassaia/lib_brassaia_pack/unix/chunked_suffix_intf.ml b/brassaia/lib_brassaia_pack/unix/chunked_suffix_intf.ml deleted file mode 100644 index ec46d5e4b314..000000000000 --- a/brassaia/lib_brassaia_pack/unix/chunked_suffix_intf.ml +++ /dev/null @@ -1,135 +0,0 @@ -(* - * Copyright (c) 2022-2022 Tarides - * - * Permission to use, copy, modify, and distribute this software for any - * purpose with or without fee is hereby granted, provided that the above - * copyright notice and this permission notice appear in all copies. - * - * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - *) - -open Import - -module type S = sig - (** Abstraction for a chunked suffix. It is functionally equivalent to - {!Append_only_file} but with a chunked implementation that is - parameterized by - - - [start_idx] for {!create_rw} to know the starting file name, and - - [start_idx] and [chunk_num] for the open functions to know the starting - file name and how many files there are. 
*) - - module Io = Io.Unix - module Ao = Append_only_file - - type t - type create_error = Io.create_error - - type open_error = - [ Io.open_error - | `Closed - | `Invalid_argument - | `Inconsistent_store - | `Read_out_of_bounds ] - - type add_new_error = - [ open_error - | Io.close_error - | `Pending_flush - | `File_exists of string - | `Multiple_empty_chunks ] - - val create_rw : - root:string -> - start_idx:int -> - overwrite:bool -> - auto_flush_threshold:int -> - auto_flush_procedure:Ao.auto_flush_procedure -> - (t, [> create_error ]) result - - val open_rw : - root:string -> - appendable_chunk_poff:int63 -> - start_idx:int -> - chunk_num:int -> - dead_header_size:int -> - auto_flush_threshold:int -> - auto_flush_procedure:Ao.auto_flush_procedure -> - (t, [> open_error ]) result - - val open_ro : - root:string -> - appendable_chunk_poff:int63 -> - dead_header_size:int -> - start_idx:int -> - chunk_num:int -> - (t, [> open_error ]) result - - val add_chunk : - auto_flush_threshold:int -> - auto_flush_procedure:Ao.auto_flush_procedure -> - t -> - (unit, [> add_new_error ]) result - - val start_idx : t -> int - val chunk_num : t -> int - val close : t -> (unit, [> Io.close_error | `Pending_flush ]) result - val empty_buffer : t -> bool - val flush : t -> (unit, [> Io.write_error ]) result - val fsync : t -> (unit, [> Io.write_error ]) result - - val appendable_chunk_poff : t -> int63 - (** [appendable_chunk_poff t] is the number of bytes of the chunk file that is - currently appendable. It does not perform IO. - - {3 RW mode} - - It also counts the bytes not flushed yet. - - {3 RO mode} - - This information originates from the latest reload of the control file. - Calling [refresh_appendable_chunk_poff t] updates [appendable_chunk_poff]. *) - - val refresh_appendable_chunk_poff : - t -> int63 -> (unit, [> `Rw_not_allowed ]) result - (** Ingest the new end offset of the appendable chunk file. Typically happens - in RO mode when the control file has been re-read. 
- - {3 RW mode} - - Always returns [Error `Rw_not_allowed]. *) - - val end_soff : t -> int63 - (** [end_soff t] is the end offset for the chunked suffix. The valid range of - offsets is 0 <= off < end_soff. Therefore, [end_soff] also represents the - length of the chunked suffix. *) - - val read_exn : t -> off:int63 -> len:int -> bytes -> unit - - val read_range_exn : - t -> off:int63 -> min_len:int -> max_len:int -> bytes -> int - - val append_exn : t -> string -> unit - val readonly : t -> bool - val auto_flush_threshold : t -> int option - - val fold_chunks : - (acc:'a -> - idx:int -> - start_suffix_off:int63 -> - end_suffix_off:int63 -> - is_appendable:bool -> - 'a) -> - 'a -> - t -> - 'a -end - -module type Sigs = S diff --git a/brassaia/lib_brassaia_pack/unix/io.mli b/brassaia/lib_brassaia_pack/unix/io.mli index f6ec8493a227..e43cd7ef8c9d 100644 --- a/brassaia/lib_brassaia_pack/unix/io.mli +++ b/brassaia/lib_brassaia_pack/unix/io.mli @@ -13,6 +13,143 @@ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. *) +open Import -include Io_intf.Sigs -(** @inline *) +module Unix : sig + (** Low level IO abstraction. A typical implementation is unix. This + abstraction is meant to be dead simple. Not a lot of documentation is + required. + + It is not resistant to race condictions. There should not be concurrent + modifications of the files. + + These functions are essentially invoking the underlying functions from + {!Unix} directly; there is no buffering for example. *) + + type t + + (** {1 Errors} *) + + type misc_error = Unix.error * string * string [@@deriving brassaia] + (** An abstract error type that contains the IO-backend specific errors. (e.g. 
+ [Unix.error]) *) + + type create_error = [ `Io_misc of misc_error | `File_exists of string ] + + type open_error = + [ `Io_misc of misc_error + | `No_such_file_or_directory of string + | `Not_a_file ] + + type read_error = + [ `Io_misc of misc_error + | `Read_out_of_bounds + | `Closed + | `Invalid_argument ] + + type write_error = [ `Io_misc of misc_error | `Ro_not_allowed | `Closed ] + type close_error = [ `Io_misc of misc_error | `Double_close ] + + type mkdir_error = + [ `Io_misc of misc_error + | `File_exists of string + | `No_such_file_or_directory of string + | `Invalid_parent_directory ] + + (** {1 Safe Functions} + + None of the functions in this section raise exceptions. They may however + perform effects that are always continued. + + {2 Life Cycle} *) + + val create : path:string -> overwrite:bool -> (t, [> create_error ]) result + val open_ : path:string -> readonly:bool -> (t, [> open_error ]) result + val close : t -> (unit, [> close_error ]) result + + (** {2 Write Functions} *) + + val write_string : t -> off:int63 -> string -> (unit, [> write_error ]) result + (** [write_string t ~off s] writes [s] at [offset] in [t]. *) + + val fsync : t -> (unit, [> write_error ]) result + (** [fsync t] persists to the file system the effects of previous [create] or + write. *) + + val move_file : + src:string -> dst:string -> (unit, [> `Sys_error of string ]) result + + val copy_file : + src:string -> dst:string -> (unit, [> `Sys_error of string ]) result + + val mkdir : string -> (unit, [> mkdir_error ]) result + val unlink : string -> (unit, [> `Sys_error of string ]) result + + val unlink_dont_wait : on_exn:(exn -> unit) -> string -> unit + (** [unlink_dont_wait file] attempts to unlink the named file but doesn't wait + for the completion of the unlink operation. + + If unlink raises an exception it is passed to [on_exn]. 
*) + + (** {2 Read Functions} *) + + val read_to_string : + t -> off:int63 -> len:int -> (string, [> read_error ]) result + (** [read_to_string t ~off ~len] are the [len] bytes of [t] at [off]. *) + + val read_all_to_string : + t -> (string, [> `Io_misc of misc_error | `Closed ]) result + (** [read_to_string t] is the contents full contents of the file. + + The individual pages are not guaranteed to be read atomically. *) + + val read_size : t -> (int63, [> read_error ]) result + (** [read_size t] is the number of bytes of the file handled by [t]. + + This function is expensive in the unix implementation because it performs + syscalls. *) + + val size_of_path : + string -> + ( int63, + [> `Io_misc of misc_error + | `No_such_file_or_directory of string + | `Not_a_file ] ) + result + + val classify_path : + string -> [> `File | `Directory | `No_such_file_or_directory | `Other ] + + (** {1 MISC.} *) + + val readonly : t -> bool + val path : t -> string + val page_size : int + + (** {1 Unsafe Functions} + + These functions are equivalents to exising safe ones, but using exceptions + instead of the result monad for performances reasons. *) + + val read_exn : t -> off:int63 -> len:int -> bytes -> unit + (** [read_exn t ~off ~len b] reads the [len] bytes of [t] at [off] to [b]. + + Raises [Errors.Pack_error] and [Errors.RO_not_allowed]. + + Also raises backend-specific exceptions (e.g. [Unix.Unix_error] for the + unix backend). *) + + val write_exn : t -> off:int63 -> len:int -> string -> unit + (** [write_exn t ~off ~len b] writes the first [len] bytes pf [b] to [t] at + offset [off]. + + Raises [Errors.Pack_error] and [Errors.RO_not_allowed]. + + Also raises backend-specific exceptions (e.g. [Unix.Unix_error] for the + unix backend). 
*) + + val raise_misc_error : misc_error -> 'a + + val catch_misc_error : + (unit -> 'a) -> ('a, [> `Io_misc of misc_error ]) result +end diff --git a/brassaia/lib_brassaia_pack/unix/io_intf.ml b/brassaia/lib_brassaia_pack/unix/io_intf.ml deleted file mode 100644 index 965b01ffa326..000000000000 --- a/brassaia/lib_brassaia_pack/unix/io_intf.ml +++ /dev/null @@ -1,160 +0,0 @@ -(* - * Copyright (c) 2022-2023 Tarides - * - * Permission to use, copy, modify, and distribute this software for any - * purpose with or without fee is hereby granted, provided that the above - * copyright notice and this permission notice appear in all copies. - * - * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - *) - -open Import - -module type S = sig - (** Low level IO abstraction. A typical implementation is unix. This - abstraction is meant to be dead simple. Not a lot of documentation is - required. - - It is not resistant to race condictions. There should not be concurrent - modifications of the files. - - These functions are essentially invoking the underlying functions from - {!Unix} directly; there is no buffering for example. *) - - type t - - (** {1 Errors} *) - - type misc_error [@@deriving brassaia] - (** An abstract error type that contains the IO-backend specific errors. (e.g. 
- [Unix.error]) *) - - type create_error = [ `Io_misc of misc_error | `File_exists of string ] - - type open_error = - [ `Io_misc of misc_error - | `No_such_file_or_directory of string - | `Not_a_file ] - - type read_error = - [ `Io_misc of misc_error - | `Read_out_of_bounds - | `Closed - | `Invalid_argument ] - - type write_error = [ `Io_misc of misc_error | `Ro_not_allowed | `Closed ] - type close_error = [ `Io_misc of misc_error | `Double_close ] - - type mkdir_error = - [ `Io_misc of misc_error - | `File_exists of string - | `No_such_file_or_directory of string - | `Invalid_parent_directory ] - - (** {1 Safe Functions} - - None of the functions in this section raise exceptions. They may however - perform effects that are always continued. - - {2 Life Cycle} *) - - val create : path:string -> overwrite:bool -> (t, [> create_error ]) result - val open_ : path:string -> readonly:bool -> (t, [> open_error ]) result - val close : t -> (unit, [> close_error ]) result - - (** {2 Write Functions} *) - - val write_string : t -> off:int63 -> string -> (unit, [> write_error ]) result - (** [write_string t ~off s] writes [s] at [offset] in [t]. *) - - val fsync : t -> (unit, [> write_error ]) result - (** [fsync t] persists to the file system the effects of previous [create] or - write. *) - - val move_file : - src:string -> dst:string -> (unit, [> `Sys_error of string ]) result - - val copy_file : - src:string -> dst:string -> (unit, [> `Sys_error of string ]) result - - val mkdir : string -> (unit, [> mkdir_error ]) result - val unlink : string -> (unit, [> `Sys_error of string ]) result - - val unlink_dont_wait : on_exn:(exn -> unit) -> string -> unit - (** [unlink_dont_wait file] attempts to unlink the named file but doesn't wait - for the completion of the unlink operation. - - If unlink raises an exception it is passed to [on_exn]. 
*) - - (** {2 Read Functions} *) - - val read_to_string : - t -> off:int63 -> len:int -> (string, [> read_error ]) result - (** [read_to_string t ~off ~len] are the [len] bytes of [t] at [off]. *) - - val read_all_to_string : - t -> (string, [> `Io_misc of misc_error | `Closed ]) result - (** [read_to_string t] is the contents full contents of the file. - - The individual pages are not guaranteed to be read atomically. *) - - val read_size : t -> (int63, [> read_error ]) result - (** [read_size t] is the number of bytes of the file handled by [t]. - - This function is expensive in the unix implementation because it performs - syscalls. *) - - val size_of_path : - string -> - ( int63, - [> `Io_misc of misc_error - | `No_such_file_or_directory of string - | `Not_a_file ] ) - result - - val classify_path : - string -> [> `File | `Directory | `No_such_file_or_directory | `Other ] - - (** {1 MISC.} *) - - val readonly : t -> bool - val path : t -> string - val page_size : int - - (** {1 Unsafe Functions} - - These functions are equivalents to exising safe ones, but using exceptions - instead of the result monad for performances reasons. *) - - val read_exn : t -> off:int63 -> len:int -> bytes -> unit - (** [read_exn t ~off ~len b] reads the [len] bytes of [t] at [off] to [b]. - - Raises [Errors.Pack_error] and [Errors.RO_not_allowed]. - - Also raises backend-specific exceptions (e.g. [Unix.Unix_error] for the - unix backend). *) - - val write_exn : t -> off:int63 -> len:int -> string -> unit - (** [write_exn t ~off ~len b] writes the first [len] bytes pf [b] to [t] at - offset [off]. - - Raises [Errors.Pack_error] and [Errors.RO_not_allowed]. - - Also raises backend-specific exceptions (e.g. [Unix.Unix_error] for the - unix backend). 
*) - - val raise_misc_error : misc_error -> 'a - - val catch_misc_error : - (unit -> 'a) -> ('a, [> `Io_misc of misc_error ]) result -end - -module type Sigs = sig - module Unix : S with type misc_error = Unix.error * string * string -end diff --git a/brassaia/lib_brassaia_pack/unix/lower.mli b/brassaia/lib_brassaia_pack/unix/lower.mli index 5a6c0421be64..be7805598128 100644 --- a/brassaia/lib_brassaia_pack/unix/lower.mli +++ b/brassaia/lib_brassaia_pack/unix/lower.mli @@ -13,9 +13,157 @@ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. *) - -include Lower_intf.S +open Import +module Io = Io.Unix +module Errs : Io_errors.S type volume_identifier = string [@@deriving brassaia] -(** @inline *) +module Volume : sig + type t + + type open_error = + [ Io.open_error + | `Closed + | `Double_close + | `Corrupted_control_file of string + | `Unknown_major_pack_version of string ] + + val open_volume : string -> (t, [> open_error ]) result + (** [open_volume path] loads the volume at [path] in read-only. *) + + val path : t -> string + (** [path t] is the directory that contains the volume. *) + + val is_empty : t -> bool + (** [is_empty t] returns whether [t] is empty or not. *) + + val control : t -> Control_file.Payload.Volume.Latest.t option + (** [control t] returns the control file payload for the volume. *) + + val identifier : t -> volume_identifier + (** [identifier t] is a unique idendifier for the volume. *) +end + +type t +type open_error = [ Volume.open_error | `Volume_missing of string ] +type close_error = [ | Io.close_error ] + +type add_error = + [ open_error + | `Ro_not_allowed + | `Multiple_empty_volumes + | `File_exists of string + | `Invalid_parent_directory ] + +val open_volumes : + readonly:bool -> volume_num:int -> string -> (t, [> open_error ]) result +(** [open_volumes ~readonly ~volume_num lower_root] loads all volumes located in the + directory [lower_root]. 
+ + [volume_num] is the number of volumes that are expected in [lower_root]. 0 + is valid for an empty lower. + + [Error `Volume_missing path] is returned if an expected volume is not on + disk, with the path to first volume that is missing. This can happen if + [volume_num] is larger than the number of volumes on disk, or if one of + the volume directories has been renamed accidentally. + + If [readonly] is false, no write operations are allowed. *) + +val reload : volume_num:int -> t -> (unit, [> open_error ]) result +(** [reload ~volume_num t] reloads volumes located in the root directory of + [t], using [volume_num] as the expected number of volumes. *) + +val close : t -> (unit, [> close_error ]) result +(** [close t] closes all resources opened by [t]. *) + +val volume_num : t -> int +(** [volume_num t] returns the number of volumes in the lower [t]. *) + +val add_volume : t -> (Volume.t, [> add_error ]) result +(** [add_volume t] adds a new empty volume to [t]. + + If there is already an empty volume, [Error `Multiple_empty_volumes] is + returned. Only one empty volume is allowed. + + If [t] is read-only, [Error `Ro_not_allowed] is returned. *) + +val find_volume : off:int63 -> t -> Volume.t option +(** [find_volume ~off t] returns the {!Volume} that contains [off]. *) + +val read_exn : + off:int63 -> + len:int -> + ?volume:volume_identifier -> + t -> + bytes -> + volume_identifier +(** [read_exn ~off ~len ~volume t b] will read [len] bytes from a global [off] + located in the volume with identifier [volume]. The volume identifier of + the volume where the read occurs is returned. + + If [volume] is not provided, {!find_volume} will be used to attempt to + locate the correct volume for the read. *) + +val set_readonly : t -> bool -> unit +(** [set_readonly t flag] changes the writing permission of the lower layer + (where [true] is read only). This should only be called by the GC worker + to temporarily allow RW before calling {!archive_seq_exn}. 
*) + +val archive_seq_exn : + upper_root:string -> + generation:int -> + to_archive:(int63 * string Seq.t) list -> + t -> + volume_identifier +(** [archive_seq ~upper_root ~generation ~to_archive t] is called by the GC + worker during the creation of the new [generation] to archive [to_archive] + in the lower layer and returns the identifier of the volume where data was + appended. + + It is the only write operation allowed on the lower layer, and it makes no + observable change as the control file is left untouched : instead new + changes are written to volume.gen.control, which is swapped during GC + finalization. *) + +val read_range_exn : + off:int63 -> + min_len:int -> + max_len:int -> + ?volume:volume_identifier -> + t -> + bytes -> + int * volume_identifier +(** Same as [read_exn] but will read at least [min_len] bytes and at most + [max_len]. Returns the read length and the volume identifier from which + the data was fetched. *) + +type create_error := + [ open_error | close_error | add_error | `Sys_error of string ] + +val create_from : + src:string -> + dead_header_size:int -> + size:Int63.t -> + string -> + (unit, [> create_error ]) result +(** [create_from ~src ~dead_header_size ~size lower_root] initializes the + first lower volume in the directory [lower_root] by moving the suffix file + [src] with end offset [size]. *) + +val swap : + volume:volume_identifier -> + generation:int -> + volume_num:int -> + t -> + ( unit, + [> `Volume_not_found of string | `Sys_error of string | open_error ] ) + result +(** [swap ~volume ~generation ~volume_num t] will rename a new volume control + file in [volume] for [generation] of GC and then reload the lower with + [volume_num] volumes. *) + +val cleanup : generation:int -> t -> (unit, [> `Sys_error of string ]) result +(** [cleanup ~generation t] will attempt to cleanup the appendable volume if a + GC crash has occurred. 
*) diff --git a/brassaia/lib_brassaia_pack/unix/lower_intf.ml b/brassaia/lib_brassaia_pack/unix/lower_intf.ml deleted file mode 100644 index 2b25b8afa8ea..000000000000 --- a/brassaia/lib_brassaia_pack/unix/lower_intf.ml +++ /dev/null @@ -1,179 +0,0 @@ -(* - * Copyright (c) 2023 Tarides - * - * Permission to use, copy, modify, and distribute this software for any - * purpose with or without fee is hereby granted, provided that the above - * copyright notice and this permission notice appear in all copies. - * - * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - *) - -open! Import - -type volume_identifier = string [@@deriving brassaia] - -module type Volume = sig - module Io = Io.Unix - module Errs = Io_errors - module Sparse = Sparse_file - - type t - - type open_error = - [ Io.open_error - | `Closed - | `Double_close - | `Corrupted_control_file of string - | `Unknown_major_pack_version of string ] - - val open_volume : string -> (t, [> open_error ]) result - (** [open_volume path] loads the volume at [path] in read-only. *) - - val path : t -> string - (** [path t] is the directory that contains the volume. *) - - val is_empty : t -> bool - (** [is_empty t] returns whether [t] is empty or not. *) - - val control : t -> Control_file.Payload.Volume.Latest.t option - (** [control t] returns the control file payload for the volume. *) - - val identifier : t -> volume_identifier - (** [identifier t] is a unique idendifier for the volume. 
*) -end - -module type S = sig - module Io = Io.Unix - module Errs : Io_errors.S - module Volume : Volume - - type t - type open_error = [ Volume.open_error | `Volume_missing of string ] - type close_error = [ | Io.close_error ] - type nonrec volume_identifier = volume_identifier [@@deriving brassaia] - - type add_error = - [ open_error - | `Ro_not_allowed - | `Multiple_empty_volumes - | `File_exists of string - | `Invalid_parent_directory ] - - val open_volumes : - readonly:bool -> volume_num:int -> string -> (t, [> open_error ]) result - (** [open_volumes ~readonly ~volume_num lower_root] loads all volumes located in the - directory [lower_root]. - - [volume_num] is the number of volumes that are expected in [lower_root]. 0 - is valid for an empty lower. - - [Error `Volume_missing path] is returned if an expected volume is not on - disk, with the path to first volume that is missing. This can happen if - [volume_num] is larger than the number of volumes on disk, or if one of - the volume directories has been renamed accidentally. - - If [readonly] is false, no write operations are allowed. *) - - val reload : volume_num:int -> t -> (unit, [> open_error ]) result - (** [reload ~volume_num t] reloads volumes located in the root directory of - [t], using [volume_num] as the expected number of volumes. *) - - val close : t -> (unit, [> close_error ]) result - (** [close t] closes all resources opened by [t]. *) - - val volume_num : t -> int - (** [volume_num t] returns the number of volumes in the lower [t]. *) - - val add_volume : t -> (Volume.t, [> add_error ]) result - (** [add_volume t] adds a new empty volume to [t]. - - If there is already an empty volume, [Error `Multiple_empty_volumes] is - returned. Only one empty volume is allowed. - - If [t] is read-only, [Error `Ro_not_allowed] is returned. *) - - val find_volume : off:int63 -> t -> Volume.t option - (** [find_volume ~off t] returns the {!Volume} that contains [off]. 
*) - - val read_exn : - off:int63 -> - len:int -> - ?volume:volume_identifier -> - t -> - bytes -> - volume_identifier - (** [read_exn ~off ~len ~volume t b] will read [len] bytes from a global [off] - located in the volume with identifier [volume]. The volume identifier of - the volume where the read occurs is returned. - - If [volume] is not provided, {!find_volume} will be used to attempt to - locate the correct volume for the read. *) - - val set_readonly : t -> bool -> unit - (** [set_readonly t flag] changes the writing permission of the lower layer - (where [true] is read only). This should only be called by the GC worker - to temporarily allow RW before calling {!archive_seq_exn}. *) - - val archive_seq_exn : - upper_root:string -> - generation:int -> - to_archive:(int63 * string Seq.t) list -> - t -> - volume_identifier - (** [archive_seq ~upper_root ~generation ~to_archive t] is called by the GC - worker during the creation of the new [generation] to archive [to_archive] - in the lower layer and returns the identifier of the volume where data was - appended. - - It is the only write operation allowed on the lower layer, and it makes no - observable change as the control file is left untouched : instead new - changes are written to volume.gen.control, which is swapped during GC - finalization. *) - - val read_range_exn : - off:int63 -> - min_len:int -> - max_len:int -> - ?volume:volume_identifier -> - t -> - bytes -> - int * volume_identifier - (** Same as [read_exn] but will read at least [min_len] bytes and at most - [max_len]. Returns the read length and the volume identifier from which - the data was fetched. 
*) - - type create_error := - [ open_error | close_error | add_error | `Sys_error of string ] - - val create_from : - src:string -> - dead_header_size:int -> - size:Int63.t -> - string -> - (unit, [> create_error ]) result - (** [create_from ~src ~dead_header_size ~size lower_root] initializes the - first lower volume in the directory [lower_root] by moving the suffix file - [src] with end offset [size]. *) - - val swap : - volume:volume_identifier -> - generation:int -> - volume_num:int -> - t -> - ( unit, - [> `Volume_not_found of string | `Sys_error of string | open_error ] ) - result - (** [swap ~volume ~generation ~volume_num t] will rename a new volume control - file in [volume] for [generation] of GC and then reload the lower with - [volume_num] volumes. *) - - val cleanup : generation:int -> t -> (unit, [> `Sys_error of string ]) result - (** [cleanup ~generation t] will attempt to cleanup the appendable volume if a - GC crash has occurred. *) -end diff --git a/brassaia/lib_brassaia_pack/unix/sparse_file.mli b/brassaia/lib_brassaia_pack/unix/sparse_file.mli index c950f74a4915..7e0adc31cc2a 100644 --- a/brassaia/lib_brassaia_pack/unix/sparse_file.mli +++ b/brassaia/lib_brassaia_pack/unix/sparse_file.mli @@ -13,6 +13,134 @@ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. *) +open! Import +module Io = Io.Unix +module Errs = Io_errors -include Sparse_file_intf.Sigs -(** @inline *) +type t +type open_error := [ Io.open_error | `Corrupted_mapping_file of string ] + +val open_ro : + mapping_size:int -> + mapping:string -> + data:string -> + (t, [> open_error ]) result +(** [open_ro ~mapping_size ~mapping ~data] returns a new read-only view of the + sparse file, represented on disk by two files named [mapping] and [data]. + The mapping file is expected to have size at least [mapping_size] (and the + rest is ignored if the file is larger). 
*) + +val close : t -> (unit, [> Io.close_error ]) result +(** Close the underlying files. *) + +val read_exn : t -> off:int63 -> len:int -> bytes -> unit +(** [read_exn t ~off ~len buffer] writes into [buffer] the bytes from [off] to + [off+len]. *) + +val read_range_exn : + t -> off:int63 -> min_len:int -> max_len:int -> bytes -> int +(** Same as [read_exn], the amount read is [max_len] if possible or at least + [min_len] if reading more would step over a hole in the sparse file. + + Returns the actually read length. *) + +val next_valid_offset : t -> off:int63 -> int63 option +(** [next_valid_offset t ~off] returns [Some off'] such that [off'] is the + smallest readable offset larger or equal to [off]. This enables jumping + over a sparse hole to the next compact range of data. *) + +val iter : t -> (off:int63 -> len:int -> unit) -> (unit, Errs.t) result +(** [iter t f] calls [f] on each [(off,len)] pair in [mapping]. Only used for + testing and debugging. + + It is guaranteed for the offsets to be iterated in monotonic order. + + It is guaranteed that entries don't overlap. + + The exceptions raised by [f] are caught and returned (as long as they are + known by [Errs]). *) + +module Wo : sig + type t + + val open_wo : + mapping_size:int -> + mapping:string -> + data:string -> + (t, [> open_error ]) result + (** [open_wo ~mapping_size ~mapping ~data] returns a write-only instance of + the sparse file. + + Note: This is unsafe and is only used by the GC to mark the parent + commits as dangling. One must ensure that no read-only instance is + opened at the same time, as otherwise the writes would be observable by + it. *) + + val write_exn : t -> off:int63 -> len:int -> string -> unit + (** [write_exn t ~off ~len str] writes the first [len] bytes of [str] to [t] + at the virtual offset [off]. *) + + val fsync : t -> (unit, [> Io.write_error ]) result + (** [fsync t] persists to the file system the effects of previous writes. 
*) + + val close : t -> (unit, [> Io.close_error ]) result + (** Close the underlying files. *) + + val create_from_data : + mapping:string -> + dead_header_size:int -> + size:Int63.t -> + data:string -> + (int63, [> Io.create_error | Io.write_error | Io.close_error ]) result + (** [create_from_data ~mapping ~dead_header_size ~size ~data] initializes a + new sparse file on disk from the existing file [data], by creating the + corresponding [mapping] file. The first [dead_header_size] bytes are + ignored and the remaining [size] bytes of [data] are made available. + + On success, returns the size of the [mapping] file to be stored in the + control file for consistency checking on open. *) +end + +module Ao : sig + type t + + val end_off : t -> Int63.t + (** [end_off t] returns the largest virtual offset contained in the sparse + file [t]. Attempting to append with a strictly smaller virtual offset + will fail. *) + + val mapping_size : t -> Int63.t + (** [end_off t] returns the current size of the mapping file associated to + the sparse file [t] including additions not yet flushed to the file + system. It can be passed to {!open_ao} as [mapping_size] when opening + the file again. *) + + val create : mapping:string -> data:string -> (t, [> Io.create_error ]) result + (** [create ~mapping ~data] initializes a new empty sparse file, represented + on disk by two files named [mapping] and [data]. *) + + val open_ao : + mapping_size:Int63.t -> + mapping:string -> + data:string -> + ( t, + [> Io.open_error + | `Closed + | `Invalid_argument + | `Read_out_of_bounds + | `Inconsistent_store ] ) + result + (** [open_ao ~mapping_size ~mapping ~data] returns an append-only instance + of the sparse file. *) + + val append_seq_exn : t -> off:int63 -> string Seq.t -> unit + (** [append_seq_exn t ~off seq] appends the sequence of strings [seq] to the + sparse file [t], at the virtual offset [off] which must be larger than + the previously appended offsets. 
*) + + val flush : t -> (unit, [> Io.write_error ]) result + (** Flush the append buffer. Does not call [fsync]. *) + + val close : t -> (unit, [> Io.close_error | `Pending_flush ]) result + (** Close the underlying files. *) +end diff --git a/brassaia/lib_brassaia_pack/unix/sparse_file_intf.ml b/brassaia/lib_brassaia_pack/unix/sparse_file_intf.ml deleted file mode 100644 index 8dbeb5c7115c..000000000000 --- a/brassaia/lib_brassaia_pack/unix/sparse_file_intf.ml +++ /dev/null @@ -1,153 +0,0 @@ -(* - * Copyright (c) 2022 Tarides - * - * Permission to use, copy, modify, and distribute this software for any - * purpose with or without fee is hereby granted, provided that the above - * copyright notice and this permission notice appear in all copies. - * - * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - *) - -open! Import - -module type S = sig - module Io : module type of Io.Unix - module Errs : Io_errors.S - - type t - type open_error := [ Io.open_error | `Corrupted_mapping_file of string ] - - val open_ro : - mapping_size:int -> - mapping:string -> - data:string -> - (t, [> open_error ]) result - (** [open_ro ~mapping_size ~mapping ~data] returns a new read-only view of the - sparse file, represented on disk by two files named [mapping] and [data]. - The mapping file is expected to have size at least [mapping_size] (and the - rest is ignored if the file is larger). *) - - val close : t -> (unit, [> Io.close_error ]) result - (** Close the underlying files. 
*) - - val read_exn : t -> off:int63 -> len:int -> bytes -> unit - (** [read_exn t ~off ~len buffer] writes into [buffer] the bytes from [off] to - [off+len]. *) - - val read_range_exn : - t -> off:int63 -> min_len:int -> max_len:int -> bytes -> int - (** Same as [read_exn], the amount read is [max_len] if possible or at least - [min_len] if reading more would step over a hole in the sparse file. - - Returns the actually read length. *) - - val next_valid_offset : t -> off:int63 -> int63 option - (** [next_valid_offset t ~off] returns [Some off'] such that [off'] is the - smallest readable offset larger or equal to [off]. This enables jumping - over a sparse hole to the next compact range of data. *) - - val iter : t -> (off:int63 -> len:int -> unit) -> (unit, Errs.t) result - (** [iter t f] calls [f] on each [(off,len)] pair in [mapping]. Only used for - testing and debugging. - - It is guaranteed for the offsets to be iterated in monotonic order. - - It is guaranteed that entries don't overlap. - - The exceptions raised by [f] are caught and returned (as long as they are - known by [Errs]). *) - - module Wo : sig - type t - - val open_wo : - mapping_size:int -> - mapping:string -> - data:string -> - (t, [> open_error ]) result - (** [open_wo ~mapping_size ~mapping ~data] returns a write-only instance of - the sparse file. - - Note: This is unsafe and is only used by the GC to mark the parent - commits as dangling. One must ensure that no read-only instance is - opened at the same time, as otherwise the writes would be observable by - it. *) - - val write_exn : t -> off:int63 -> len:int -> string -> unit - (** [write_exn t ~off ~len str] writes the first [len] bytes of [str] to [t] - at the virtual offset [off]. *) - - val fsync : t -> (unit, [> Io.write_error ]) result - (** [fsync t] persists to the file system the effects of previous writes. *) - - val close : t -> (unit, [> Io.close_error ]) result - (** Close the underlying files. 
*) - - val create_from_data : - mapping:string -> - dead_header_size:int -> - size:Int63.t -> - data:string -> - (int63, [> Io.create_error | Io.write_error | Io.close_error ]) result - (** [create_from_data ~mapping ~dead_header_size ~size ~data] initializes a - new sparse file on disk from the existing file [data], by creating the - corresponding [mapping] file. The first [dead_header_size] bytes are - ignored and the remaining [size] bytes of [data] are made available. - - On success, returns the size of the [mapping] file to be stored in the - control file for consistency checking on open. *) - end - - module Ao : sig - type t - - val end_off : t -> Int63.t - (** [end_off t] returns the largest virtual offset contained in the sparse - file [t]. Attempting to append with a strictly smaller virtual offset - will fail. *) - - val mapping_size : t -> Int63.t - (** [end_off t] returns the current size of the mapping file associated to - the sparse file [t] including additions not yet flushed to the file - system. It can be passed to {!open_ao} as [mapping_size] when opening - the file again. *) - - val create : - mapping:string -> data:string -> (t, [> Io.create_error ]) result - (** [create ~mapping ~data] initializes a new empty sparse file, represented - on disk by two files named [mapping] and [data]. *) - - val open_ao : - mapping_size:Int63.t -> - mapping:string -> - data:string -> - ( t, - [> Io.open_error - | `Closed - | `Invalid_argument - | `Read_out_of_bounds - | `Inconsistent_store ] ) - result - (** [open_ao ~mapping_size ~mapping ~data] returns an append-only instance - of the sparse file. *) - - val append_seq_exn : t -> off:int63 -> string Seq.t -> unit - (** [append_seq_exn t ~off seq] appends the sequence of strings [seq] to the - sparse file [t], at the virtual offset [off] which must be larger than - the previously appended offsets. *) - - val flush : t -> (unit, [> Io.write_error ]) result - (** Flush the append buffer. Does not call [fsync]. 
*) - - val close : t -> (unit, [> Io.close_error | `Pending_flush ]) result - (** Close the underlying files. *) - end -end - -module type Sigs = S -- GitLab