From 8956c31772d02def19a97e5c7c6fcdee8e4deedb Mon Sep 17 00:00:00 2001 From: Thomas Gazagnaire Date: Tue, 11 Dec 2018 18:41:55 +0100 Subject: [PATCH 1/6] irmin-lmdb/GC: add a generational, stop-the-world GC --- src/lib_shell/state.ml | 1 + src/lib_storage/context.ml | 29 +- src/lib_storage/context.mli | 2 + src/lib_storage/test/test_context.ml | 1 + vendors/irmin-lmdb/dune | 2 +- vendors/irmin-lmdb/irmin-lmdb.opam | 2 + vendors/irmin-lmdb/irmin_lmdb.ml | 830 +++++++++++++++++++-------- vendors/irmin-lmdb/irmin_lmdb.mli | 35 +- vendors/irmin-lmdb/test/dune | 3 + vendors/irmin-lmdb/test/test.ml | 131 +++++ 10 files changed, 800 insertions(+), 236 deletions(-) create mode 100644 vendors/irmin-lmdb/test/dune create mode 100644 vendors/irmin-lmdb/test/test.ml diff --git a/src/lib_shell/state.ml b/src/lib_shell/state.ml index 7740042bbbaf..41ba842c9fc0 100644 --- a/src/lib_shell/state.ml +++ b/src/lib_shell/state.ml @@ -1302,6 +1302,7 @@ let init return (state, main_chain_state, context_index) let close { global_data } = + Context.close () >>= fun () -> Shared.use global_data begin fun { global_store } -> Store.close global_store ; Lwt.return_unit diff --git a/src/lib_storage/context.ml b/src/lib_storage/context.ml index 6db05a472fc3..2e58a16c2029 100644 --- a/src/lib_storage/context.ml +++ b/src/lib_storage/context.ml @@ -339,10 +339,33 @@ let hash ~time ?(message = "") context = (* FIXME: this doesn't have to be lwt *) Lwt.return x +let commits = ref 0 +let gc_in_progress = ref None + let commit ~time ?message context = - raw_commit ~time ?message context >>= fun commit -> - let h = GitStore.Commit.hash commit in - Lwt.return h + raw_commit ~time ?message context >|= fun commit -> + (* we do 1 GC per cycle and only 1 at a time *) + (* CR(samoht): we should find a better place to add the GC hook *) + if !commits mod 4096 = 0 && !gc_in_progress = None then + Lwt.async (fun () -> + (* we keep 5 cycles *) + let keep = 5 * 4096 in + (* CR(samoht): replace Fmt.pr by lwt_log 
function *) + Fmt.pr "GC: promoting %d contexts\n%!" keep; + let switch = Lwt_switch.create () in + gc_in_progress := Some switch; + GitStore.gc ~repo:context.index.repo ~keep ~switch commit + >|= fun stats -> + gc_in_progress := None; + Fmt.pr "GC: live objects: %a\n%!" GitStore.pp_stats stats + ); + incr commits; + GitStore.Commit.hash commit + +let close () = + match !gc_in_progress with + | None -> Lwt.return () + | Some s -> Lwt_switch.turn_off s (*-- Generic Store Primitives ------------------------------------------------*) diff --git a/src/lib_storage/context.mli b/src/lib_storage/context.mli index 309a0318aa51..fca3a0ab3014 100644 --- a/src/lib_storage/context.mli +++ b/src/lib_storage/context.mli @@ -40,6 +40,8 @@ val init: string -> index Lwt.t +val close: unit -> unit Lwt.t + val commit_genesis: index -> chain_id:Chain_id.t -> diff --git a/src/lib_storage/test/test_context.ml b/src/lib_storage/test/test_context.ml index 224b70afe042..22cd565d0d71 100644 --- a/src/lib_storage/test/test_context.ml +++ b/src/lib_storage/test/test_context.ml @@ -112,6 +112,7 @@ let wrap_context_init f _ () = create_block3a idx block2 >>= fun block3a -> create_block3b idx block2 >>= fun block3b -> f { idx; genesis; block2 ; block3a; block3b } >>= fun result -> + Context.close () >>= fun () -> Lwt.return result end diff --git a/vendors/irmin-lmdb/dune b/vendors/irmin-lmdb/dune index ab0458feb96d..aa9e9066612e 100644 --- a/vendors/irmin-lmdb/dune +++ b/vendors/irmin-lmdb/dune @@ -1,5 +1,5 @@ (library (name irmin_lmdb) (public_name irmin-lmdb) - (libraries irmin lmdb) + (libraries irmin lmdb hashset lwt.unix ocplib-endian) (flags (:standard -safe-string))) diff --git a/vendors/irmin-lmdb/irmin-lmdb.opam b/vendors/irmin-lmdb/irmin-lmdb.opam index 4e83e575193e..2c2a4a0d1f74 100644 --- a/vendors/irmin-lmdb/irmin-lmdb.opam +++ b/vendors/irmin-lmdb/irmin-lmdb.opam @@ -15,6 +15,8 @@ build-test: ["dune" "runtest" "-p" name] depends: [ "dune" {build & >= "1.0.1"} + "ocplib-endian" {>= 
1.0"} + "hashset" "irmin" {>= "1.4.0"} "lmdb" {>= "0.1"} ] diff --git a/vendors/irmin-lmdb/irmin_lmdb.ml b/vendors/irmin-lmdb/irmin_lmdb.ml index 9bf0f145246e..42fd078f7b0e 100644 --- a/vendors/irmin-lmdb/irmin_lmdb.ml +++ b/vendors/irmin-lmdb/irmin_lmdb.ml @@ -22,6 +22,11 @@ module Option = struct let value_map ~default ~f = function | None -> default | Some v -> f v + + let get = function + | Some v -> v + | None -> failwith "no value" + end module Result = struct @@ -30,24 +35,51 @@ module Result = struct | Error err -> Error err end -let cstruct_of_ba_copy ba = - let cs = Cstruct.of_bigarray ba in - let len = Cstruct.len cs in - let cs_copy = Cstruct.create_unsafe len in - Cstruct.blit cs 0 cs_copy 0 len ; - cs_copy - open Lwt.Infix +type wtxn = Lmdb.rw Lmdb.txn * Lmdb.db + +(* The GC has 3 modes: + - normal: all reads and writes are done normally on the main database file. + - promotion: a (concurrent) promotion to a different database file is in + progress. + - pivot: i.e. "stop-the-world": all the operations are stopped and the database + files are swapped on disk.
*) + +type mode = + | Normal + | Promotion + | Pivot + +type resource = { + c: unit Lwt_condition.t; + m: Lwt_mutex.t; + mutable n: int; + mutable ready: bool; +} + +let resource () = { + c = Lwt_condition.create (); + m = Lwt_mutex.create (); + n = 0; + ready = true; +} + type t = { - db: Lmdb.t ; root: string ; - mutable wtxn: (Lmdb.rw Lmdb.txn * Lmdb.db) option; + readonly: bool; + readers: resource; + writers: resource; + mutable db: Lmdb.t ; + mutable gc_mode: mode; + mutable wtxn: wtxn option; + mutable concurrent_adds: string (* =key *) list; + mutable generation: int; } -let of_result = function - | Ok v -> Lwt.return v - | Error err -> Lwt.fail_with (Lmdb.string_of_error err) +let of_result op = function + | Ok v -> Lwt.return v + | Error err -> Fmt.kstrf Lwt.fail_with "%s: %a" op Lmdb.pp_error err let (|>>) v f = match v with @@ -63,27 +95,6 @@ let get_wtxn db = db.wtxn <- Some (txn, ddb); Ok (txn, ddb) -let commit_wtxn db = - match db.wtxn with - | None -> Ok () - | Some (t, _ddb) -> - db.wtxn <- None; - Lmdb.commit_txn t - -let add db k v = - get_wtxn db |>> fun (txn, ddb) -> - Lmdb.put_string txn ddb k v - -let add db k v = - of_result @@ add db k v - -let add_cstruct db k v = - get_wtxn db |>> fun (txn, ddb) -> - Lmdb.put txn ddb k (Cstruct.to_bigarray v) - -let add_cstruct db k v = - of_result @@ add_cstruct db k v - let src = Logs.Src.create "irmin.lmdb" ~doc:"Irmin in a Lmdb store" module Log = (val Logs.src_log src : Logs.LOG) @@ -105,8 +116,27 @@ module Conf = struct let root = Irmin.Private.Conf.root let mapsize = Irmin.Private.Conf.key "mapsize" int64_converter 40_960_000_000L - let readonly = - Irmin.Private.Conf.key "readonly" bool_converter false + let readonly = + Irmin.Private.Conf.key "readonly" bool_converter false + + type t = { + root : string ; + mapsize : int64 ; + readonly: bool ; + (* TODO *) + (* ?write_buffer_size:int -> *) + (* ?max_open_files:int -> *) + (* ?block_size:int -> *) + (* ?block_restart_interval:int -> *) + (* 
?cache_size:int *) + } + + let of_config c = + let root = Irmin.Private.Conf.get c root in + let mapsize = Irmin.Private.Conf.get c mapsize in + let readonly = Irmin.Private.Conf.get c readonly in + let root = match root with None -> "irmin" | Some root -> root in + { root ; mapsize ; readonly } end @@ -117,7 +147,55 @@ let config let config = C.add config Conf.readonly readonly in Option.value_map mapsize ~default:config ~f:(C.add config Conf.mapsize) -type ('r) reader = { f : 'k. 'k Lmdb.txn -> Lmdb.db -> ('r, Lmdb.error) result } [@@unboxed] +let open_db ~root ~mapsize ~readonly = + if not (Sys.file_exists root) then Unix.mkdir root 0o755; + let flags = if readonly then [ Lmdb.RdOnly ] else [] in + let sync_flag = + match Sys.getenv_opt "TEZOS_CONTEXT_SYNC" with + | None -> [] + | Some s -> + match String.lowercase_ascii s with + | "nosync" -> [ Lmdb.NoSync ] + | "nometasync" -> [ Lmdb.NoMetaSync ] + | _ -> + Printf.eprintf "Unrecognized TEZOS_SYNC option : %s\n\ + allowed: nosync nometasync" s; + [] + in + let flags = sync_flag @ Lmdb.NoRdAhead :: Lmdb.NoTLS :: flags in + let file_flags = if readonly then 0o444 else 0o644 in + match Lmdb.opendir ~mapsize ~flags root file_flags with + | Ok db -> db + | Error err -> + Fmt.failwith "open {%s} %a" (Filename.basename root) Lmdb.pp_error err + +let dbs = Hashtbl.create 3 + +let make conf = + let { Conf.root ; mapsize ; readonly } = Conf.of_config conf in + try Hashtbl.find dbs (root, readonly) + with Not_found -> + let db = open_db ~root ~mapsize ~readonly in + let db = { + db; root; readonly; + gc_mode = Normal; + readers = resource (); + writers = resource (); + wtxn = None; + concurrent_adds = []; + generation = 0; + } in + Hashtbl.add dbs (root, readonly) db; + db + +let close t = + Hashtbl.remove dbs (t.root, t.readonly); + assert (t.readers.n = 0); + assert (t.writers.n = 0); + Lmdb.closedir t.db + +type ('r) reader = + { f : 'k. 
'k Lmdb.txn -> Lmdb.db -> ('r, Lmdb.error) result } [@@unboxed] let with_read_db db ~f = match db.wtxn with @@ -126,15 +204,145 @@ let with_read_db db ~f = | Some (txn, ddb) -> f.f txn ddb -let mem db k = - with_read_db db ~f:{ f = fun txn db -> Lmdb.mem txn db k } |> - of_result +let get txn db k = + Result.map ~f:Cstruct.of_bigarray (Lmdb.get txn db k) let find_bind db k ~f = - match with_read_db db ~f:{ f = fun txn db -> Result.map ~f (Lmdb.get txn db k) } with - | Error KeyNotFound -> Lwt.return_none - | Error err -> Lwt.fail_with (Lmdb.string_of_error err) - | Ok v -> Lwt.return v + match + with_read_db db ~f:{ f = fun txn db -> Result.map ~f (get txn db k) } + with + | Error KeyNotFound -> Ok None + | Error err -> Error err + | Ok v -> Ok v + +module Raw = struct + + let mem db k = + with_read_db db ~f:{ f = fun txn db -> Lmdb.mem txn db k } + |> of_result "mem" + + let find db key of_ba = + find_bind db key ~f:(fun v -> Option.of_result (of_ba v)) + |> of_result "find" + + let add_string db k v = + (get_wtxn db |>> fun (txn, ddb) -> + Lmdb.put_string txn ddb k v) + |> of_result "add_string" + + let add_cstruct db k v = + (get_wtxn db |>> fun (txn, ddb) -> + Lmdb.put txn ddb k (Cstruct.to_bigarray v)) + |> of_result "add_ba" + + let add db k = function + | `String v -> add_string db k v + | `Cstruct v -> add_cstruct db k v + + let remove db k = + (get_wtxn db |>> fun (txn, ddb) -> + match Lmdb.del txn ddb k with + | Ok () | Error Lmdb.KeyNotFound -> Ok () + | x -> x) + |> of_result "remove" + + let commit op db = + (match db.wtxn with + | None -> Ok () + | Some (t, _ddb) -> + db.wtxn <- None; + Lmdb.commit_txn t) + |> of_result op + + let fsync db = + Lmdb.sync ~force:true db.db + |> of_result "fsync" + +end + +let with_resource r f = + Lwt_mutex.lock r.m >>= fun () -> + let wait () = + if r.ready then Lwt.return () + else Lwt_condition.wait ~mutex:r.m r.c + in + wait () >>= fun () -> + r.n <- r.n + 1; + Lwt_mutex.unlock r.m; + Lwt.finalize f (fun () -> + 
Lwt_mutex.with_lock r.m (fun () -> + r.n <- r.n - 1; + Lwt_condition.signal r.c (); + Lwt.return ())) + +let disable r = + Lwt_mutex.lock r.m >>= fun () -> + r.ready <- false; + let wait () = + if r.n = 0 then Lwt.return () + else Lwt_condition.wait ~mutex:r.m r.c + in + wait () >>= fun () -> + Lwt_condition.signal r.c (); + Lwt_mutex.unlock r.m; + Lwt.return () + +let enable r = + Lwt_mutex.with_lock r.m @@ fun () -> + r.ready <- true; + Lwt_condition.signal r.c (); + Lwt.return () + +let maybe_with_read_lock db f = match db.gc_mode with + | Pivot -> with_resource db.readers f + | _ -> f () + +let maybe_with_write_lock db f = match db.gc_mode with + | Pivot -> with_resource db.writers f + | _ -> f () + +let maybe_remember db k = match db.gc_mode with + | Promotion -> db.concurrent_adds <- k :: db.concurrent_adds + | _ -> () + +module AO (K: Irmin.Hash.S) (V: Irmin.Contents.S0) (Conv: sig + val of_key: K.t -> string + val to_value: Cstruct.t -> (V.t, [`Msg of string]) result + val of_value: V.t -> [`String of string | `Cstruct of Cstruct.t] + val digest: V.t -> K.t + end) = struct + + include Conv + + type nonrec t = t + type key = K.t + type value = V.t + + let mem db key = + maybe_with_read_lock db @@ fun () -> + Raw.mem db (Conv.of_key key) + + let unsafe_find db key = + Raw.find db (Conv.of_key key) @@ fun v -> + Conv.to_value v + + let find db key = + maybe_with_read_lock db @@ fun () -> + unsafe_find db key + + let unsafe_add db v = + let k = Conv.digest v in + let v = Conv.of_value v in + Raw.add db (Conv.of_key k) v >|= fun () -> + k + + let add db v = + maybe_with_write_lock db @@ fun () -> + unsafe_add db v >|= fun k -> + maybe_remember db (Conv.of_key k); + k + +end module Irmin_value_store (M: Irmin.Metadata.S) @@ -143,34 +351,14 @@ module Irmin_value_store (P: Irmin.Path.S) = struct module XContents = struct - - type nonrec t = t - type key = H.t - type value = C.t - - let lmdb_of_key h = - "contents/" ^ Cstruct.to_string (H.to_raw h) - - let mem db 
key = - let key = lmdb_of_key key in - mem db key - - let find db key = - let key = lmdb_of_key key in - find_bind db key ~f:begin fun v -> - Option.of_result (C.of_string Cstruct.(to_string (of_bigarray v))) - end - - let to_string = Fmt.to_to_string C.pp - - let add db v = - let k = H.digest C.t v in - let k_lmdb = lmdb_of_key k in - let v = to_string v in - add db k_lmdb v >|= fun () -> k - module Val = C module Key = H + include AO (Key) (Val) (struct + let of_key h = "contents/" ^ Cstruct.to_string (Key.to_raw h) + let to_value v = Val.of_string (Cstruct.(to_string v)) + let of_value s = `String (Fmt.to_to_string Val.pp s) + let digest v = Key.digest Val.t v + end) end module Contents = Irmin.Contents.Store(XContents) @@ -401,36 +589,25 @@ module Irmin_value_store let t = Irmin.Type.like N.t of_n to_n end - module AO = struct - - type nonrec t = t - type key = H.t - type value = Val.t + include AO (Key) (Val) (struct - let lmdb_of_key h = - "node/" ^ Cstruct.to_string (H.to_raw h) + let of_key h = "node/" ^ Cstruct.to_string (H.to_raw h) - let mem db key = - let key = lmdb_of_key key in - mem db key + let to_value v = + Irmin.Type.decode_cstruct (Irmin.Type.list Val.entry_t) v - let of_cstruct v = - Irmin.Type.decode_cstruct (Irmin.Type.list Val.entry_t) v |> - Option.of_result + let of_value v = + let c = Irmin.Type.encode_cstruct (Irmin.Type.list Val.entry_t) v in + `Cstruct c - let find db key = - let key = lmdb_of_key key in - find_bind db key ~f:(fun v -> of_cstruct (cstruct_of_ba_copy v)) + let digest v = + let v = Irmin.Type.encode_cstruct (Irmin.Type.list Val.entry_t) v in + H.digest Irmin.Type.cstruct v - let add db v = - let v = Irmin.Type.encode_cstruct (Irmin.Type.list Val.entry_t) v in - let k = H.digest Irmin.Type.cstruct v in - let k_lmdb = lmdb_of_key k in - add_cstruct db k_lmdb v >|= fun () -> k - end - include AO + end) end + module Node = Irmin.Private.Node.Store(Contents)(P)(M)(XNode) module XCommit = struct @@ -464,36 +641,19 @@ module 
Irmin_value_store module Key = H - module AO = struct + include AO (Key) (Val) (struct + let of_key h = "commit/" ^ Cstruct.to_string (H.to_raw h) + let of_value v = `Cstruct (Irmin.Type.encode_cstruct Val.t v) + let to_value v = Irmin.Type.decode_cstruct Val.t v + let digest v = + let v = Irmin.Type.encode_cstruct Val.t v in + H.digest Irmin.Type.cstruct v + end) - let lmdb_of_key h = - "commit/" ^ Cstruct.to_string (H.to_raw h) - - type nonrec t = t - type key = H.t - type value = Val.t - - let mem db key = - let key = lmdb_of_key key in - mem db key - - let of_cstruct v = - Irmin.Type.decode_cstruct Val.t v |> - Option.of_result - - let find db key = - let key = lmdb_of_key key in - find_bind db key ~f:(fun v -> of_cstruct (cstruct_of_ba_copy v)) - - let add db v = - let v = Irmin.Type.encode_cstruct Val.t v in - let k = H.digest Irmin.Type.cstruct v in - let k_lmdb = lmdb_of_key k in - add_cstruct db k_lmdb v >>= fun () -> - of_result @@ commit_wtxn db >|= fun () -> k - - end - include AO + let add db v = + add db v >>= fun k -> + Raw.commit "Commit.add" db >|= fun () -> + k end module Commit = Irmin.Private.Commit.Store(Node)(XCommit) @@ -523,49 +683,39 @@ module Irmin_branch_store (B: Branch) (H: Irmin.Hash.S) = struct module Val = H module W = Irmin.Private.Watch.Make(Key)(Val) + module L = Irmin.Private.Lock.Make(B) type nonrec t = { db: t; w: W.t; + l: L.t; } let watches = Hashtbl.create 10 type key = Key.t type value = Val.t - type watch = W.watch * (unit -> unit Lwt.t) + type watch = W.watch - (* let branch_of_lmdb r = *) - (* let str = String.trim @@ Git.Reference.to_raw r in *) - (* match B.of_ref str with *) - (* | Ok r -> Some r *) - (* | Error (`Msg _) -> None *) - - let lmdb_of_branch r = Fmt.to_to_string B.pp_ref r + let lmdb_of_branch r = "refs/" ^ Fmt.to_to_string B.pp_ref r + let hash_of_lmdb v = H.of_raw v + let lmdb_of_hash r = H.to_raw r let mem db r = - mem db.db (lmdb_of_branch r) - - let find db r = - find_bind db.db (lmdb_of_branch r) - 
~f:(fun v -> Some (H.of_raw (cstruct_of_ba_copy v))) + maybe_with_read_lock db.db @@ fun () -> + Raw.mem db.db (lmdb_of_branch r) - let listen_dir _ = - Lwt.return (fun () -> Lwt.return_unit) + let unsafe_find db r = + Raw.find db.db (lmdb_of_branch r) (fun x -> Ok (hash_of_lmdb x)) - let watch_key t key ?init f = - listen_dir t >>= fun stop -> - W.watch_key t.w key ?init f >|= fun w -> - (w, stop) + let find db r = + maybe_with_read_lock db.db @@ fun () -> + unsafe_find db r - let watch t ?init f = - listen_dir t >>= fun stop -> - W.watch t.w ?init f >|= fun w -> - (w, stop) + let watch_key t key ?init f = W.watch_key t.w key ?init f - let unwatch t (w, stop) = - stop () >>= fun () -> - W.unwatch t.w w + let watch t ?init f = W.watch t.w ?init f + let unwatch t w = W.unwatch t.w w let v db (* ~head *) = let w = @@ -576,48 +726,56 @@ module Irmin_branch_store (B: Branch) (H: Irmin.Hash.S) = struct Hashtbl.add watches db.root w; w in - Lwt.return { db ; w } - - let list _ = Lwt.return_nil (* TODO, or not *) - - (* let write_index _t _gr _gk = *) - (* Lwt.return_unit *) - - let set _t r _k = - Log.debug (fun f -> f "update %a" B.pp r); - Lwt.return_unit - (* let gr = git_of_branch r in *) - (* let gk = git_of_commit k in *) - (* G.write_reference t.t gr gk >>= fun () -> *) - (* W.notify t.w r (Some k) >>= fun () -> *) - (* write_index t gr (Git.Hash.to_commit gk) *) - - let remove _t r = + let l = L.v () in + Lwt.return { db ; w; l } + + let list _ = + (* CR(samoht): normally this should return the references, but + Tezos don't use that function, so just skip it. 
*) + Lwt.return_nil (* TODO, or not *) + + let set_unsafe t r k = + let r = lmdb_of_branch r in + let k = lmdb_of_hash k in + Raw.add_cstruct t.db r k >|= fun () -> + maybe_remember t.db r + + let set t r k = + Log.debug (fun f -> f "set %a" B.pp r); + L.with_lock t.l r @@ fun () -> + set_unsafe t r k >>= fun () -> + Raw.commit "set" t.db + + let remove_unsafe t r = + let r = lmdb_of_branch r in + Raw.remove t.db r >|= fun () -> + maybe_remember t.db r + + let remove t r = Log.debug (fun f -> f "remove %a" B.pp r); - Lwt.return_unit - (* G.remove_reference t.t (git_of_branch r) >>= fun () -> *) - (* W.notify t.w r None *) + L.with_lock t.l r @@ fun () -> + remove_unsafe t r >>= fun () -> + Raw.commit "remove" t.db - let test_and_set _t _r ~test:_ ~set:_ = + let eq_hashes = Irmin.Type.equal H.t + + let test_and_set t r ~test ~set = Log.debug (fun f -> f "test_and_set"); - Lwt.return_true - (* let gr = git_of_branch r in *) - (* let c = function None -> None | Some h -> Some (git_of_commit h) in *) - (* G.test_and_set_reference t.t gr ~test:(c test) ~set:(c set) >>= fun b -> *) - (* (if b then W.notify t.w r set else Lwt.return_unit) >>= fun () -> *) - (* begin *) - (* We do not protect [write_index] because it can take a log - time and we don't want to hold the lock for too long. Would - be safer to grab a lock, although the expanded filesystem - is not critical for Irmin consistency (it's only a - convenience for the user). 
*) - (* if b then match set with *) - (* | None -> Lwt.return_unit *) - (* | Some v -> write_index t gr (Git.Hash.to_commit (git_of_commit v)) *) - (* else *) - (* Lwt.return_unit *) - (* end >|= fun () -> *) - (* b *) + maybe_with_write_lock t.db @@ fun () -> + L.with_lock t.l r @@ fun () -> + find t r >>= fun v -> + let set () = + (match set with + | None -> remove_unsafe t r + | Some v -> set_unsafe t r v) + >>= fun () -> + Raw.commit "test_and_set" t.db >|= fun () -> + true + in + match test, v with + | None , None -> set () + | Some v, Some v'-> if eq_hashes v v' then set () else Lwt.return false + | __ -> Lwt.return false end @@ -654,55 +812,265 @@ module Make let node_t t : Node.t = contents_t t, t.db let commit_t t : Commit.t = node_t t, t.db - type config = { - root : string option ; - mapsize : int64 ; - readonly : bool ; - (* TODO *) - (* ?write_buffer_size:int -> *) - (* ?max_open_files:int -> *) - (* ?block_size:int -> *) - (* ?block_restart_interval:int -> *) - (* ?cache_size:int *) - } - - let config c = - let root = Irmin.Private.Conf.get c Conf.root in - let mapsize = Irmin.Private.Conf.get c Conf.mapsize in - let readonly = Irmin.Private.Conf.get c Conf.readonly in - { root ; mapsize ; readonly } - - let v conf = - let { root ; mapsize ; readonly } = config conf in - let root = match root with None -> "irmin.ldb" | Some root -> root in - if not (Sys.file_exists root) then Unix.mkdir root 0o755 ; - let flags = if readonly then [ Lmdb.RdOnly ] else [] in - let sync_flag = - match Sys.getenv_opt "TEZOS_CONTEXT_SYNC" with - | None -> [] - | Some s -> - match String.lowercase_ascii s with - | "nosync" -> [ Lmdb.NoSync ] - | "nometasync" -> [ Lmdb.NoMetaSync ] - | _ -> - Printf.eprintf "Unrecognized TEZOS_SYNC option : %s\n\ - allowed: nosync nometasync" s; - [] - in - let flags = sync_flag @ Lmdb.NoRdAhead :: Lmdb.NoTLS :: flags in - let file_flags = if readonly then 0o444 else 0o644 in - match Lmdb.opendir ~mapsize ~flags root file_flags with - | 
Error err -> Lwt.fail_with (Lmdb.string_of_error err) - | Ok db -> - let db = { db ; root ; wtxn = None } in - Branch.v db >|= fun branch -> - { db; branch; config = conf } + let v config = + let db = make config in + Branch.v db >|= fun branch -> + { db; branch; config } end end include Irmin.Make_ext(P) -end + type stats = { + mutable promoted_contents: int; + mutable promoted_nodes : int; + mutable promoted_commits: int; + mutable width: int; + mutable depth: int; + } -include Conf + let promoted_contents t = t.promoted_contents + let promoted_nodes t = t.promoted_nodes + let promoted_commits t = t.promoted_commits + + let pp_stats ppf t = + Fmt.pf ppf "[%d blobs/%d nodes/%d commits] depth:%d width:%d" + t.promoted_contents + t.promoted_nodes + t.promoted_commits + t.depth + t.width + + let stats () = { + promoted_contents = 0; + promoted_nodes = 0; + promoted_commits = 0; + width = 0; + depth = 0; + } + + (* poor-man GC *) + module Irmin_GC = struct + + let incr_contents s = s.promoted_contents <- s.promoted_contents + 1 + let incr_nodes s = s.promoted_nodes <- s.promoted_nodes + 1 + let incr_commits s = s.promoted_commits <- s.promoted_commits + 1 + let update_width s c = s.width <- max s.width (List.length c) + let update_depth s n = s.depth <- max s.depth n + + module Tbl = Hashset.Make(struct + type t = string + let equal x y = String.equal x y + let hash t = + assert (String.length t > H.digest_size); + EndianString.NativeEndian.get_int64 t (String.length t - 8) + |> Int64.to_int + end) + + type t = { + tbl : Tbl.t; + new_db: P.Contents.t; + old_db: P.Contents.t; + stats : stats; + switch: Lwt_switch.t option; + } + + let close t = + close t.new_db; + close t.old_db + + let new_root repo = + let c = Conf.of_config repo.P.Repo.config in + c.Conf.root ^ ".1" + + let v ?switch repo = + let config = + let root = new_root repo in + Irmin.Private.Conf.add repo.P.Repo.config Conf.root (Some root) + in + let new_db = make config in + let tbl = Tbl.create 10_123 in 
+ let stats = stats () in + { tbl; stats; new_db; old_db = repo.db; switch } + + let promote_val t k v = + Raw.add_cstruct t.new_db k v + + let promote msg t k = + Raw.find t.old_db k (fun x -> Ok x) >>= function + | Some v -> promote_val t k v + | None -> + let k = H.of_raw (Cstruct.of_string k) in + Fmt.failwith "promote %s: cannot promote key %a\n%!" msg H.pp k + + let mem t k = + if Tbl.mem t.tbl k then Lwt.return true + else Raw.mem t.new_db k + + let copy_contents t k = + Lwt_switch.check t.switch; + let k = P.XContents.of_key k in + mem t k >>= function + | true -> Lwt.return () + | false -> + Tbl.add t.tbl k; + incr_contents t.stats; + promote "contents" t k + + let copy_node t k = + let rec aux x = + Lwt_switch.check t.switch; + match x with + | [] -> Lwt.return () + | (ks, _, `Black, _) :: todo -> + promote "node" t ks >>= fun () -> + aux todo + | (ks, k, `Gray , n) :: todo -> + mem t ks >>= function + | true -> aux todo + | false -> + Tbl.add t.tbl ks; + P.XNode.unsafe_find t.old_db k >|= Option.get >>= fun v -> + let children = P.Node.Val.list v in + incr_nodes t.stats; + update_width t.stats children; + update_depth t.stats n; + let todo = ref ((ks, k, `Black, n) :: todo) in + Lwt_list.iter_p (fun (_, c) -> match c with + | `Contents (k, _) -> copy_contents t k + | `Node k -> + let ks = P.XNode.of_key k in + todo := (ks, k, `Gray, n+1) :: !todo; + Lwt.return () + ) children + >>= fun () -> + aux !todo + in + let ks = P.XNode.of_key k in + mem t ks >>= function + | true -> Lwt.return () + | false -> aux [ks, k, `Gray, 0] + + let copy_commit t k = + Lwt_switch.check t.switch; + P.XCommit.unsafe_find t.old_db k >|= Option.get >>= fun v -> + let k = P.XCommit.of_key k in + (* we ignore the parents *) + copy_node t (P.Commit.Val.node v) >>= fun () -> + incr_commits t.stats; + promote "commit" t k + + let root repo = + let c = repo.P.Repo.config in + match Irmin.Private.Conf.get c Conf.root with + | None -> "context" + | Some r -> r + + let pivot ~branches 
repo t = + let rename () = + let old_data = Filename.concat (root repo) "data.mdb" in + let new_data = Filename.concat (new_root repo) "data.mdb" in + let old_lock = Filename.concat (root repo) "lock.mdb" in + let new_lock = Filename.concat (new_root repo) "lock.mdb" in + Lwt_unix.rename new_data old_data >>= fun () -> + Lwt_unix.unlink new_lock >>= fun () -> + Lwt_unix.unlink old_lock + in + disable repo.db.writers >>= fun () -> + try + (* flush the concurrent writes to the new file *) + Lwt_list.iter_s + (promote "concurrent writes" t) + (List.rev repo.db.concurrent_adds) + >>= fun () -> + repo.db.concurrent_adds <- []; + + (* promote the live refs *) + Lwt_list.iter_p (fun br -> + let k = P.Branch.lmdb_of_branch br in + promote "refs" t k + ) branches + >>= fun () -> + + (* fsync *) + Raw.commit "pivot" t.new_db >>= fun () -> + Raw.fsync t.new_db >>= fun () -> + + (* rename *) + disable repo.db.readers >>= fun () -> + close t; + rename () >>= fun () -> + + (* re-open the database *) + P.Repo.v repo.config >>= fun x -> + repo.db.db <- x.db.db; + repo.db.generation <- repo.db.generation + 1; + + enable repo.db.readers >>= fun () -> + enable repo.db.writers + with e -> + enable repo.db.readers >>= fun () -> + enable repo.db.writers >>= fun () -> + raise e + + end + + let roots ~root ~keep = + let rec aux (n, acc) root = + let return () = Lwt.return (List.rev (Commit.hash root :: acc)) in + Commit.parents root >>= function + | [] -> return () + | [h] -> + if n < keep then aux (n+1, Commit.hash root :: acc) h + else return () + | _ -> assert false + in + aux (1, []) root + + let promote_all ~(repo:repo) ?before_pivot ~branches t roots = + repo.db.gc_mode <- Promotion; + let last_update = ref (Unix.gettimeofday ()) in + Lwt_list.iteri_s (fun i k -> + Irmin_GC.copy_commit t k >>= fun () -> + if i mod 100 = 0 || i = List.length roots - 1 then ( + let last = !last_update in + last_update := Unix.gettimeofday (); + Fmt.pr "GC: ETA=%2.0fmin %5d/%d %a (concurrent 
writes=%d)\n%!" + ((!last_update -. last) /. 100. + *. (float (List.length roots - 1 - i)) + /. 60.) + (i + 1) (List.length roots) pp_stats t.stats + (List.length repo.db.concurrent_adds); + (* flush to disk regularly to not hold too much data into RAM *) + if i mod 1000 = 0 then ( + Irmin_GC.Tbl.clear t.tbl; + Raw.commit "flush roots" t.new_db + ) else + Lwt.return () + ) else + Lwt.return (); + ) roots + >>= fun () -> + (match before_pivot with + | None -> Lwt.return () + | Some t -> t () + ) >>= fun () -> + repo.db.gc_mode <- Pivot; + Irmin_GC.pivot ~branches repo t >|= fun () -> + assert (repo.db.concurrent_adds = []); + repo.db.gc_mode <- Normal; + t.stats + + let gc ~repo ?before_pivot ~keep ?(branches=[]) ?switch root = + let t, w = Lwt.task () in + Lwt_switch.add_hook switch (fun () -> t); + roots ~root ~keep >>= fun roots -> + let gc = Irmin_GC.v ?switch repo in + Lwt.catch + (fun () -> promote_all ~repo ?before_pivot ~branches gc roots) + (function + | Lwt_switch.Off -> Lwt.wakeup w (); Lwt.return gc.stats + | e -> Lwt.fail e) + +end diff --git a/vendors/irmin-lmdb/irmin_lmdb.mli b/vendors/irmin-lmdb/irmin_lmdb.mli index 34a1a3cbdb4f..a381575be0e1 100644 --- a/vendors/irmin-lmdb/irmin_lmdb.mli +++ b/vendors/irmin-lmdb/irmin_lmdb.mli @@ -20,4 +20,37 @@ val config: ?config:Irmin.config -> ?mapsize:int64 -> ?readonly:bool -> string -> Irmin.config -module Make : Irmin.S_MAKER +module Make + (M: Irmin.Metadata.S) + (C: Irmin.Contents.S) + (P: Irmin.Path.S) + (B: Irmin.Branch.S) + (H: Irmin.Hash.S): sig + include Irmin.S + with type key = P.t + and type step = P.step + and module Key = P + and type metadata = M.t + and type contents = C.t + and type branch = B.t + and type Commit.Hash.t = H.t + and type Tree.Hash.t = H.t + and type Contents.Hash.t = H.t + + type stats + + val promoted_contents: stats -> int + val promoted_nodes: stats -> int + val promoted_commits: stats -> int + + val pp_stats: stats Fmt.t + + val gc: + repo:Repo.t -> + ?before_pivot:(unit -> 
unit Lwt.t) -> + keep:int -> + ?branches:B.t list -> + ?switch:Lwt_switch.t -> + commit -> stats Lwt.t + +end diff --git a/vendors/irmin-lmdb/test/dune b/vendors/irmin-lmdb/test/dune new file mode 100644 index 000000000000..17dfc25abf4a --- /dev/null +++ b/vendors/irmin-lmdb/test/dune @@ -0,0 +1,3 @@ +(executable + (name test) + (libraries irmin-lmdb alcotest-lwt)) diff --git a/vendors/irmin-lmdb/test/test.ml b/vendors/irmin-lmdb/test/test.ml new file mode 100644 index 000000000000..08ffc27bdb42 --- /dev/null +++ b/vendors/irmin-lmdb/test/test.ml @@ -0,0 +1,131 @@ +open Irmin +open Lwt.Infix + +module Store = Irmin_lmdb.Make + (Metadata.None) + (Contents.String) + (Path.String_list) + (Branch.String) + (Hash.SHA1) + +let config = Irmin_lmdb.config "/tmp/irmin-lmdb" + +let date = ref 0L + +let info () = + date := Int64.add !date 1L; + Info.v ~date:!date ~author:"test lmdb" "commit" + +let fill t entries = + Lwt_list.iter_s (fun (k, v) -> Store.set ~info t k v) entries + +let store () = + Store.Repo.v config >>= fun repo -> + Store.Branch.remove repo Store.Branch.master >>= fun () -> + Store.master repo + +let branches = [Store.Branch.master] + +let test_basics _switch () = + let promote_1 () = + store () >>= fun t -> + fill t [ + ["foo"] , "a"; + ["bar";"toto"], "1"; + ["bar";"titi"], "2"; + ] >>= fun () -> + Store.Head.get t >>= fun root -> + Store.gc ~branches ~repo:(Store.repo t) ~keep:1 root >>= fun stats -> + Alcotest.(check int) "1: promoted commits" 1 (Store.promoted_commits stats); + Alcotest.(check int) "1: promoted nodes" 2 (Store.promoted_nodes stats); + Alcotest.(check int) "1: promoted contents" 3 (Store.promoted_contents stats); + Store.find t ["foo"] >>= fun v -> + Alcotest.(check (option string)) "foo" (Some "a") v; + Store.find t ["bar";"toto"] >>= fun v -> + Alcotest.(check (option string)) "bar/toto" (Some "1") v; + Store.find t ["bar";"titi"] >>= fun v -> + Alcotest.(check (option string)) "bar/titi" (Some "2") v; + Store.Commit.parents root >|= 
function + | [] -> () + | _ -> Alcotest.fail "parent should not exists!" + in + let promote_2 () = + store () >>= fun t -> + fill t [ + ["foo"] , "a"; + ["bar";"toto"], "1"; + ["bar";"titi"], "2"; + ["bar";"titi"], "3"; + ] >>= fun () -> + Store.Head.get t >>= fun root -> + Store.gc ~branches ~repo:(Store.repo t) ~keep:2 root >>= fun stats -> + Alcotest.(check int) "2: promoted commits" 2 (Store.promoted_commits stats); + Alcotest.(check int) "2: promoted nodes" 4 (Store.promoted_nodes stats); + Alcotest.(check int) "2: promoted contents" 4 (Store.promoted_contents stats); + Store.find t ["foo"] >>= fun v -> + Alcotest.(check (option string)) "foo" (Some "a") v; + Store.find t ["bar";"toto"] >>= fun v -> + Alcotest.(check (option string)) "bar/toto" (Some "1") v; + Store.find t ["bar";"titi"] >>= fun v -> + Alcotest.(check (option string)) "bar/titi" (Some "3") v; + Store.Commit.parents root >>= function + | [] -> Alcotest.fail "parent should exist" + | [p] -> + (Store.Commit.parents p >|= function + | [] -> () + | _ -> Alcotest.fail "grand-parents should not exist") + | _ -> Alcotest.fail "too many parents!" 
+ in + promote_1 () >>= + promote_2 + +let test_basics_loop sw () = + let rec loop = function + | 0 -> Lwt.return () + | n -> + test_basics sw () >>= fun () -> + loop (n-1) + in + loop 100 + +let test_concurrency _ () = + store () >>= fun t -> + fill t [ + ["foo"] , "a"; + ["bar";"toto"], "1"; + ["bar";"titi"], "2"; + ["bar";"titi"], "3"; + ] >>= fun () -> + let before_pivot () = + fill t [ + ["bar"; "yo0"], "a"; + ["bar"; "yo1"], "a"; + ["bar"; "yo1"], "b"; + ] >>= fun () -> + (* check that we can read the new values *) + Store.get t ["bar";"yo1"] >|= fun v -> + Alcotest.(check string) "writing to the store still works" "b" v; + in + Store.Head.get t >>= fun root -> + let wait_for_gc, gc = Lwt.task () in + Lwt.async (fun () -> + Store.gc ~branches ~repo:(Store.repo t) ~before_pivot ~keep:2 root >|= fun _stats -> + Lwt.wakeup gc () + ); + wait_for_gc >>= fun () -> + + (* check that the new values are there *) + + Store.get t ["bar";"titi"] >>= fun v -> + Alcotest.(check string) "bar/titi" "3" v; + Store.get t ["bar";"yo1"] >|= fun v -> + Alcotest.(check string) "new values are still here after a pivot" "b" v + +let () = + Alcotest.run "irmin-lmdb" [ + "GC", [ + Alcotest_lwt.test_case "basics" `Quick test_basics; + Alcotest_lwt.test_case "basic loop" `Quick test_basics_loop; + Alcotest_lwt.test_case "concurrency" `Quick test_concurrency; + + ]] -- GitLab From 4b0166718aa7c72236f966b86457a87576684b74 Mon Sep 17 00:00:00 2001 From: Thomas Gazagnaire Date: Fri, 21 Dec 2018 19:03:41 +0100 Subject: [PATCH 2/6] irmin-lmdb/GC: vendor hashset From: https://github.com/backtracking/hashset commit: f2b4b3bf7d8482cbaf76142162d131a120645a0b --- vendors/irmin-lmdb/dune | 2 +- vendors/irmin-lmdb/hashset.ml | 174 +++++++++++++++++++++++++++++ vendors/irmin-lmdb/hashset.mli | 107 ++++++++++++++++++ vendors/irmin-lmdb/irmin-lmdb.opam | 1 - 4 files changed, 282 insertions(+), 2 deletions(-) create mode 100644 vendors/irmin-lmdb/hashset.ml create mode 100644 
vendors/irmin-lmdb/hashset.mli diff --git a/vendors/irmin-lmdb/dune b/vendors/irmin-lmdb/dune index aa9e9066612e..8a5132794f40 100644 --- a/vendors/irmin-lmdb/dune +++ b/vendors/irmin-lmdb/dune @@ -1,5 +1,5 @@ (library (name irmin_lmdb) (public_name irmin-lmdb) - (libraries irmin lmdb hashset lwt.unix ocplib-endian) + (libraries irmin lmdb lwt.unix ocplib-endian) (flags (:standard -safe-string))) diff --git a/vendors/irmin-lmdb/hashset.ml b/vendors/irmin-lmdb/hashset.ml new file mode 100644 index 000000000000..5033100ea2f6 --- /dev/null +++ b/vendors/irmin-lmdb/hashset.ml @@ -0,0 +1,174 @@ +(**************************************************************************) +(* *) +(* Copyright (C) Jean-Christophe Filliatre *) +(* *) +(* This software is free software; you can redistribute it and/or *) +(* modify it under the terms of the GNU Library General Public *) +(* License version 2.1, with the special exception on linking *) +(* described in file LICENSE. *) +(* *) +(* This software is distributed in the hope that it will be useful, *) +(* but WITHOUT ANY WARRANTY; without even the implied warranty of *) +(* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. *) +(* *) +(**************************************************************************) + +(* Sets as hash tables. Code adapted from ocaml's Hashtbl. *) + +(* We do dynamic hashing, and resize the table and rehash the elements + when buckets become too long. 
*) + +type 'a t = + { mutable size: int; (* number of elements *) + mutable data: 'a list array } (* the buckets *) + +let create initial_size = + let s = min (max 1 initial_size) Sys.max_array_length in + { size = 0; data = Array.make s [] } + +let clear h = + for i = 0 to Array.length h.data - 1 do + h.data.(i) <- [] + done; + h.size <- 0 + +let copy h = + { size = h.size; + data = Array.copy h.data } + +let resize hashfun tbl = + let odata = tbl.data in + let osize = Array.length odata in + let nsize = min (2 * osize + 1) Sys.max_array_length in + if nsize <> osize then begin + let ndata = Array.create nsize [] in + let rec insert_bucket = function + [] -> () + | key :: rest -> + insert_bucket rest; (* preserve original order of elements *) + let nidx = (hashfun key) mod nsize in + ndata.(nidx) <- key :: ndata.(nidx) in + for i = 0 to osize - 1 do + insert_bucket odata.(i) + done; + tbl.data <- ndata; + end + +let add h key = + let i = (Hashtbl.hash key) mod (Array.length h.data) in + let bucket = h.data.(i) in + if not (List.mem key bucket) then begin + h.data.(i) <- key :: bucket; + h.size <- succ h.size; + if h.size > Array.length h.data lsl 1 then resize Hashtbl.hash h + end + +let remove h key = + let rec remove_bucket = function + [] -> + [] + | k :: next -> + if k = key + then begin h.size <- pred h.size; next end + else k :: remove_bucket next in + let i = (Hashtbl.hash key) mod (Array.length h.data) in + h.data.(i) <- remove_bucket h.data.(i) + +let mem h key = + List.mem key h.data.((Hashtbl.hash key) mod (Array.length h.data)) + +let cardinal h = + let d = h.data in + let c = ref 0 in + for i = 0 to Array.length d - 1 do + c := !c + List.length d.(i) + done; + !c + +let iter f h = + let d = h.data in + for i = 0 to Array.length d - 1 do + List.iter f d.(i) + done + +let fold f h init = + let rec do_bucket b accu = + match b with + [] -> + accu + | k :: rest -> + do_bucket rest (f k accu) in + let d = h.data in + let accu = ref init in + for i = 0 to 
Array.length d - 1 do + accu := do_bucket d.(i) !accu + done; + !accu + +(* Functorial interface *) + +module type HashedType = + sig + type t + val equal: t -> t -> bool + val hash: t -> int + end + +module type S = + sig + type elt + type t + val create: int -> t + val clear: t -> unit + val copy: t -> t + val add: t -> elt -> unit + val remove: t -> elt -> unit + val mem : t -> elt -> bool + val cardinal: t -> int + val iter: (elt -> unit) -> t -> unit + val fold: (elt -> 'a -> 'a) -> t -> 'a -> 'a + end + +module Make(H: HashedType): (S with type elt = H.t) = + struct + type elt = H.t + type set = elt t + type t = set + let create = create + let clear = clear + let copy = copy + + let safehash key = (H.hash key) land max_int + + let rec mem_in_bucket key = function + | [] -> false + | x :: r -> H.equal key x || mem_in_bucket key r + + let add h key = + let i = (safehash key) mod (Array.length h.data) in + let bucket = h.data.(i) in + if not (mem_in_bucket key bucket) then begin + h.data.(i) <- key :: bucket; + h.size <- succ h.size; + if h.size > Array.length h.data lsl 1 then resize safehash h + end + + let remove h key = + let rec remove_bucket = function + [] -> + [] + | k :: next -> + if H.equal k key + then begin h.size <- pred h.size; next end + else k :: remove_bucket next in + let i = (safehash key) mod (Array.length h.data) in + h.data.(i) <- remove_bucket h.data.(i) + + let mem h key = + mem_in_bucket key h.data.((safehash key) mod (Array.length h.data)) + + let cardinal = cardinal + + let iter = iter + let fold = fold + end diff --git a/vendors/irmin-lmdb/hashset.mli b/vendors/irmin-lmdb/hashset.mli new file mode 100644 index 000000000000..5d7eb8aba898 --- /dev/null +++ b/vendors/irmin-lmdb/hashset.mli @@ -0,0 +1,107 @@ +(**************************************************************************) +(* *) +(* Copyright (C) Jean-Christophe Filliatre *) +(* *) +(* This software is free software; you can redistribute it and/or *) +(* modify it under the 
terms of the GNU Library General Public *) +(* License version 2.1, with the special exception on linking *) +(* described in file LICENSE. *) +(* *) +(* This software is distributed in the hope that it will be useful, *) +(* but WITHOUT ANY WARRANTY; without even the implied warranty of *) +(* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. *) +(* *) +(**************************************************************************) + +(* This module implements imperative sets as hash tables. + Operations like union, intersection or difference are not provided, + since there is no way to implement them easily (i.e. more easily than + iterating over sets). *) + +(*s Generic interface *) + +type 'a t +(* The type of sets. Elements have type ['a]. *) + +val create : int -> 'a t +(* [Hashset.create n] creates a new, empty set. + For best results, [n] should be on the + order of the expected number of elements that will be in + the set. The internal structure grows as needed, so [n] is just an + initial guess. *) + +val clear : 'a t -> unit +(* Empty a set. *) + +val add : 'a t -> 'a -> unit +(* [Hashset.add s x] adds [x] into the set [s]. *) + +val copy : 'a t -> 'a t +(* Return a copy of the given set. *) + +val mem : 'a t -> 'a -> bool +(* [Hashset.mem s x] checks if [x] belongs to [s]. *) + +val remove : 'a t -> 'a -> unit +(* [Hashset.remove s x] removes [x] from [s]. + It does nothing if [x] does not belong to [s]. *) + +val cardinal : 'a t -> int +(* [Hashset.cardinal s] returns the cardinal of [s]. *) + +val iter : ('a -> unit) -> 'a t -> unit +(* [Hashset.iter f s] applies [f] to all elements in [s]. *) + +val fold : ('a -> 'b -> 'b) -> 'a t -> 'b -> 'b +(* [Hashset.fold f s init] computes + [(f eN ... (f e1 init)...)], + where [e1 ... eN] are the elements in [s]. + The order in which the elements are passed to [f] is unspecified. *) + + +(*s Functorial interface *) + +module type HashedType = + sig + type t + (* The type of the elements. 
*) + val equal : t -> t -> bool + (* The equality predicate used to compare elements. *) + val hash : t -> int + (* A hashing function on elements. It must be such that if two elements are + equal according to [equal], then they have identical hash values + as computed by [hash]. + Examples: suitable ([equal], [hash]) pairs for arbitrary element + types include + ([(=)], {!Hashset.hash}) for comparing objects by structure, and + ([(==)], {!Hashset.hash}) for comparing objects by addresses + (e.g. for mutable or cyclic keys). *) + end + +(* The input signature of the functor {!Hashset.Make}. *) + +module type S = + sig + type elt + type t + val create : int -> t + val clear : t -> unit + val copy : t -> t + val add : t -> elt -> unit + val remove : t -> elt -> unit + val mem : t -> elt -> bool + val cardinal : t -> int + val iter : (elt -> unit) -> t -> unit + val fold : (elt -> 'a -> 'a) -> t -> 'a -> 'a + end +(* The output signature of the functor {!Hashset.Make}. *) + +module Make (H : HashedType) : S with type elt = H.t +(* Functor building an implementation of the hashtable structure. + The functor [Hashset.Make] returns a structure containing + a type [elt] of elements and a type [t] of hash sets. + The operations perform similarly to those of the generic + interface, but use the hashing and equality functions + specified in the functor argument [H] instead of generic + equality and hashing. 
*) + diff --git a/vendors/irmin-lmdb/irmin-lmdb.opam b/vendors/irmin-lmdb/irmin-lmdb.opam index 2c2a4a0d1f74..809d3b8e49b0 100644 --- a/vendors/irmin-lmdb/irmin-lmdb.opam +++ b/vendors/irmin-lmdb/irmin-lmdb.opam @@ -16,7 +16,6 @@ build-test: ["dune" "runtest" "-p" name] depends: [ "dune" {build & >= "1.0.1"} "ocplib-endian" {>= "1.0"} - "hashset" "irmin" {>= "1.4.0"} "lmdb" {>= "0.1"} ] -- GitLab From 5f678df53ae28fd1974c02ca425ccb25fa8fef3e Mon Sep 17 00:00:00 2001 From: Thomas Gazagnaire Date: Fri, 21 Dec 2018 19:09:20 +0100 Subject: [PATCH 3/6] irmin-lmdb/GC: fix upstream compilation warning for hashset --- vendors/irmin-lmdb/hashset.ml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/vendors/irmin-lmdb/hashset.ml b/vendors/irmin-lmdb/hashset.ml index 5033100ea2f6..020d07094582 100644 --- a/vendors/irmin-lmdb/hashset.ml +++ b/vendors/irmin-lmdb/hashset.ml @@ -41,7 +41,7 @@ let resize hashfun tbl = let osize = Array.length odata in let nsize = min (2 * osize + 1) Sys.max_array_length in if nsize <> osize then begin - let ndata = Array.create nsize [] in + let ndata = Array.make nsize [] in let rec insert_bucket = function [] -> () | key :: rest -> -- GitLab From 9dccd13da60648bbe1d15e948870874aa71be00d Mon Sep 17 00:00:00 2001 From: Thomas Gazagnaire Date: Fri, 21 Dec 2018 20:22:22 +0100 Subject: [PATCH 4/6] irmin-lmdb: multi-threaded I/O --- src/lib_storage/context.mli | 1 - vendors/irmin-lmdb/dune | 2 +- vendors/irmin-lmdb/irmin-lmdb.opam | 1 + vendors/irmin-lmdb/irmin_lmdb.ml | 152 +++++++++++++++++++---------- 4 files changed, 103 insertions(+), 53 deletions(-) diff --git a/src/lib_storage/context.mli b/src/lib_storage/context.mli index fca3a0ab3014..1f536a52d255 100644 --- a/src/lib_storage/context.mli +++ b/src/lib_storage/context.mli @@ -88,7 +88,6 @@ val commit: val set_head: index -> Chain_id.t -> Context_hash.t -> unit Lwt.t val set_master: index -> Context_hash.t -> unit Lwt.t - (** {2 Predefined Fields} 
****************************************************) val get_protocol: context -> Protocol_hash.t Lwt.t diff --git a/vendors/irmin-lmdb/dune b/vendors/irmin-lmdb/dune index 8a5132794f40..7a7ee303a80c 100644 --- a/vendors/irmin-lmdb/dune +++ b/vendors/irmin-lmdb/dune @@ -1,5 +1,5 @@ (library (name irmin_lmdb) (public_name irmin-lmdb) - (libraries irmin lmdb lwt.unix ocplib-endian) + (libraries irmin lmdb lwt.unix ocplib-endian mwt) (flags (:standard -safe-string))) diff --git a/vendors/irmin-lmdb/irmin-lmdb.opam b/vendors/irmin-lmdb/irmin-lmdb.opam index 809d3b8e49b0..87e8d3254be3 100644 --- a/vendors/irmin-lmdb/irmin-lmdb.opam +++ b/vendors/irmin-lmdb/irmin-lmdb.opam @@ -18,6 +18,7 @@ depends: [ "ocplib-endian" {>= "1.0"} "irmin" {>= "1.4.0"} "lmdb" {>= "0.1"} + "mwt" ] available: [ocaml-version >= "4.01.0"] diff --git a/vendors/irmin-lmdb/irmin_lmdb.ml b/vendors/irmin-lmdb/irmin_lmdb.ml index 42fd078f7b0e..d9695b312a9b 100644 --- a/vendors/irmin-lmdb/irmin_lmdb.ml +++ b/vendors/irmin-lmdb/irmin_lmdb.ml @@ -37,7 +37,14 @@ end open Lwt.Infix -type wtxn = Lmdb.rw Lmdb.txn * Lmdb.db +type db_r = { + db: Lmdb.t; +} + +type db_w = { + db: Lmdb.t; + mutable wtxn: (Lmdb.rw Lmdb.txn * Lmdb.db) option; +} (* The GC has 3 modes: - normal: all reads and writes are done normally on the main database file. @@ -70,9 +77,9 @@ type t = { readonly: bool; readers: resource; writers: resource; - mutable db: Lmdb.t ; mutable gc_mode: mode; - mutable wtxn: wtxn option; + mutable db_r: db_r Lazy.t Mwt.t; + mutable db_w: db_w Lazy.t Mwt.t; mutable concurrent_adds: string (* =key *) list; mutable generation: int; } @@ -86,14 +93,20 @@ let (|>>) v f = | Ok v -> f v | Error e -> Error e -let get_wtxn db = - match db.wtxn with - | Some t -> Ok t - | None -> - Lmdb.create_rw_txn db.db |>> fun txn -> - Lmdb.opendb txn |>> fun ddb -> - db.wtxn <- Some (txn, ddb); - Ok (txn, ddb) +let (!!) 
= Lazy.force + +let get_wtxn op db = + Mwt.run db.db_w (fun t -> + let t = !!t in + match t.wtxn with + | Some t -> Ok t + | None -> + Lmdb.create_rw_txn t.db |>> fun txn -> + Lmdb.opendb txn |>> fun ddb -> + t.wtxn <- Some (txn, ddb); + Ok (txn, ddb) + ) + >>= of_result op let src = Logs.Src.create "irmin.lmdb" ~doc:"Irmin in a Lmdb store" module Log = (val Logs.src_log src : Logs.LOG) @@ -147,7 +160,19 @@ let config let config = C.add config Conf.readonly readonly in Option.value_map mapsize ~default:config ~f:(C.add config Conf.mapsize) +let open_m = Mutex.create () +let with_lock m f = + Mutex.lock m; + try + let r = f () in + Mutex.unlock m; + r + with e -> + Mutex.unlock m; + raise e + let open_db ~root ~mapsize ~readonly = + with_lock open_m @@ fun () -> if not (Sys.file_exists root) then Unix.mkdir root 0o755; let flags = if readonly then [ Lmdb.RdOnly ] else [] in let sync_flag = @@ -171,17 +196,39 @@ let open_db ~root ~mapsize ~readonly = let dbs = Hashtbl.create 3 +let concurrent_writers = 1 +let concurrent_readers = 10 + let make conf = let { Conf.root ; mapsize ; readonly } = Conf.of_config conf in - try Hashtbl.find dbs (root, readonly) + try Lwt.return (Hashtbl.find dbs (root, readonly)) with Not_found -> - let db = open_db ~root ~mapsize ~readonly in + Mwt.make + ~init:(fun () -> + lazy {db=open_db ~root ~mapsize ~readonly; wtxn=None}) + ~at_exit:(fun db -> if Lazy.is_val db then ( + let t = !!db in + Lmdb.closedir t.db; + t.wtxn <- None)) + concurrent_writers + >>= fun db_w -> + Mwt.make + ~init:(fun () -> + lazy {db=open_db ~root ~mapsize ~readonly:true}) + ~at_exit:(fun db -> + if Lazy.is_val db then Lmdb.closedir !!db.db) + concurrent_readers + >>= fun db_r -> + (* force read-write handlers to make sure the file permissions are + correct *) + (if readonly then Lwt.return () + else Mwt.run db_w (fun db -> ignore !!db)) + >|= fun () -> let db = { - db; root; readonly; + db_r; db_w; root; readonly; gc_mode = Normal; readers = resource (); writers = 
resource (); - wtxn = None; concurrent_adds = []; generation = 0; } in @@ -192,25 +239,24 @@ let close t = Hashtbl.remove dbs (t.root, t.readonly); assert (t.readers.n = 0); assert (t.writers.n = 0); - Lmdb.closedir t.db + Mwt.close t.db_r >>= fun () -> + Mwt.close t.db_w type ('r) reader = { f : 'k. 'k Lmdb.txn -> Lmdb.db -> ('r, Lmdb.error) result } [@@unboxed] +(* Warning: it reads only committed data. This is fine for Irmin + "transactions" and fine for the GC as we keep an in-memory cache + of the promoted keys (see Tbl below). *) let with_read_db db ~f = - match db.wtxn with - | None -> - Lmdb.with_ro_db db.db ~f:f.f - | Some (txn, ddb) -> - f.f txn ddb + Mwt.run db.db_r (fun t -> Lmdb.with_ro_db !!t.db ~f:f.f) let get txn db k = Result.map ~f:Cstruct.of_bigarray (Lmdb.get txn db k) let find_bind db k ~f = - match - with_read_db db ~f:{ f = fun txn db -> Result.map ~f (get txn db k) } - with + with_read_db db ~f:{ f = fun txn db -> Result.map ~f (get txn db k) } + >|= function | Error KeyNotFound -> Ok None | Error err -> Error err | Ok v -> Ok v @@ -219,44 +265,47 @@ module Raw = struct let mem db k = with_read_db db ~f:{ f = fun txn db -> Lmdb.mem txn db k } - |> of_result "mem" + >>= of_result "mem" let find db key of_ba = find_bind db key ~f:(fun v -> Option.of_result (of_ba v)) - |> of_result "find" + >>= of_result "find" let add_string db k v = - (get_wtxn db |>> fun (txn, ddb) -> - Lmdb.put_string txn ddb k v) - |> of_result "add_string" + get_wtxn "add_string" db >>= fun (txn, ddb) -> + Mwt.run db.db_w (fun _ -> Lmdb.put_string txn ddb k v) + >>= of_result "add_string" let add_cstruct db k v = - (get_wtxn db |>> fun (txn, ddb) -> - Lmdb.put txn ddb k (Cstruct.to_bigarray v)) - |> of_result "add_ba" + get_wtxn "add_cstruct" db >>= fun (txn, ddb) -> + Mwt.run db.db_w (fun _ -> Lmdb.put txn ddb k (Cstruct.to_bigarray v)) + >>= of_result "add_cstruct" let add db k = function | `String v -> add_string db k v | `Cstruct v -> add_cstruct db k v let remove db
k = - (get_wtxn db |>> fun (txn, ddb) -> - match Lmdb.del txn ddb k with - | Ok () | Error Lmdb.KeyNotFound -> Ok () - | x -> x) - |> of_result "remove" + get_wtxn "remove" db >>= fun (txn, ddb) -> + Mwt.run db.db_w (fun _ -> + match Lmdb.del txn ddb k with + | Ok () | Error Lmdb.KeyNotFound -> Ok () + | x -> x) + >>= of_result "remove" let commit op db = - (match db.wtxn with - | None -> Ok () - | Some (t, _ddb) -> - db.wtxn <- None; - Lmdb.commit_txn t) - |> of_result op + Mwt.run db.db_w (fun t -> + let db = !!t in + match db.wtxn with + | None -> Ok () + | Some (t, _ddb) -> + db.wtxn <- None; + Lmdb.commit_txn t) + >>= of_result op let fsync db = - Lmdb.sync ~force:true db.db - |> of_result "fsync" + Mwt.run db.db_w (fun db -> Lmdb.sync ~force:true !!db.db) + >>= of_result "fsync" end @@ -813,7 +862,7 @@ module Make let commit_t t : Commit.t = node_t t, t.db let v config = - let db = make config in + make config >>= fun db -> Branch.v db >|= fun branch -> { db; branch; config } @@ -877,7 +926,7 @@ module Make } let close t = - close t.new_db; + close t.new_db >>= fun () -> close t.old_db let new_root repo = @@ -889,7 +938,7 @@ module Make let root = new_root repo in Irmin.Private.Conf.add repo.P.Repo.config Conf.root (Some root) in - let new_db = make config in + make config >|= fun new_db -> let tbl = Tbl.create 10_123 in let stats = stats () in { tbl; stats; new_db; old_db = repo.db; switch } @@ -999,12 +1048,13 @@ module Make (* rename *) disable repo.db.readers >>= fun () -> - close t; + close t >>= fun () -> rename () >>= fun () -> (* re-open the database *) P.Repo.v repo.config >>= fun x -> - repo.db.db <- x.db.db; + repo.db.db_r <- x.db.db_r; + repo.db.db_w <- x.db.db_w; repo.db.generation <- repo.db.generation + 1; enable repo.db.readers >>= fun () -> @@ -1066,7 +1116,7 @@ module Make let t, w = Lwt.task () in Lwt_switch.add_hook switch (fun () -> t); roots ~root ~keep >>= fun roots -> - let gc = Irmin_GC.v ?switch repo in + Irmin_GC.v ?switch repo >>= 
fun gc -> Lwt.catch (fun () -> promote_all ~repo ?before_pivot ~branches gc roots) (function -- GitLab From 1b2e0581312ec2b1f99a9a86b4eee0340a107073 Mon Sep 17 00:00:00 2001 From: Thomas Gazagnaire Date: Fri, 21 Dec 2018 20:23:02 +0100 Subject: [PATCH 5/6] irmin-lmdb: vendor mwt from: https://github.com/hcarty/mwt commit: fb7d8b0238b444d9cd5bae90896415911c9ed11b --- vendors/irmin-lmdb/dune | 2 +- vendors/irmin-lmdb/irmin-lmdb.opam | 1 - vendors/irmin-lmdb/mwt.ml | 206 +++++++++++++++++++++++++++++ vendors/irmin-lmdb/mwt.mli | 64 +++++++++ 4 files changed, 271 insertions(+), 2 deletions(-) create mode 100644 vendors/irmin-lmdb/mwt.ml create mode 100644 vendors/irmin-lmdb/mwt.mli diff --git a/vendors/irmin-lmdb/dune b/vendors/irmin-lmdb/dune index 7a7ee303a80c..8a5132794f40 100644 --- a/vendors/irmin-lmdb/dune +++ b/vendors/irmin-lmdb/dune @@ -1,5 +1,5 @@ (library (name irmin_lmdb) (public_name irmin-lmdb) - (libraries irmin lmdb lwt.unix ocplib-endian mwt) + (libraries irmin lmdb lwt.unix ocplib-endian) (flags (:standard -safe-string))) diff --git a/vendors/irmin-lmdb/irmin-lmdb.opam b/vendors/irmin-lmdb/irmin-lmdb.opam index 87e8d3254be3..809d3b8e49b0 100644 --- a/vendors/irmin-lmdb/irmin-lmdb.opam +++ b/vendors/irmin-lmdb/irmin-lmdb.opam @@ -18,7 +18,6 @@ depends: [ "ocplib-endian" {>= "1.0"} "irmin" {>= "1.4.0"} "lmdb" {>= "0.1"} - "mwt" ] available: [ocaml-version >= "4.01.0"] diff --git a/vendors/irmin-lmdb/mwt.ml b/vendors/irmin-lmdb/mwt.ml new file mode 100644 index 000000000000..5b986161ccc2 --- /dev/null +++ b/vendors/irmin-lmdb/mwt.ml @@ -0,0 +1,206 @@ +(* Mwt, originally derived from the Lwt project's lwt_preemptive.ml *) + +[@@@ocaml.warning "-3"] + +module Lwt_sequence = Lwt_sequence + +[@@@ocaml.warning "+3"] + +let noop () = () + +type 'state worker = + { (* Channel used to communicate notification id and tasks to the worker + thread. *) + task_channel: [`Task of int * ('state -> unit) | `Quit] Event.channel + ; (* The worker thread. 
*) + mutable thread: Thread.t + ; (* The worker's parent thread pool *) + pool: 'state t + ; (* Wake this up when the worker quits *) + quit: int + ; (* This will resolve once the worker quits *) + complete: unit Lwt.t } + +and 'state t = + { (* Has the pool been closed? *) + mutable closed: bool + ; (* Number of preemptive threads in the pool *) + num_threads: int + ; (* Initialization function for a new thread *) + init: unit -> 'state + ; (* Function to call when a thread is disposed of *) + at_exit: 'state -> unit + ; (* Queue of worker threads *) + workers: 'state worker Queue.t + ; (* Queue of clients waiting for a worker to be available *) + waiters: 'state worker Lwt.u Lwt_sequence.t + ; (* All workers in the pool, even those in use *) + all_workers: 'state worker Weak.t } + +(* Code executed by a worker *) +let worker_loop init_complete init_result worker = + match worker.pool.init () with + | exception exn -> + init_result := Error exn ; + Lwt_unix.send_notification init_complete + | state -> + init_result := Ok () ; + Lwt_unix.send_notification init_complete ; + let finally () = + worker.pool.at_exit state ; + Lwt_unix.send_notification worker.quit + in + try + while not worker.pool.closed do + match Event.sync (Event.receive worker.task_channel) with + | `Task (id, task) -> + task state ; + (* Tell the main thread that work is done *) + Lwt_unix.send_notification id + | `Quit -> () + done ; + finally () + with exn -> finally () ; raise exn + +(* Create a new worker *) +let make_worker pool = + let worker = + let complete, waiter = Lwt.wait () in + let quit = + Lwt_unix.make_notification ~once:true (fun () -> Lwt.wakeup waiter ()) + in + { task_channel= Event.new_channel () + ; thread= Thread.self () + ; pool + ; quit + ; complete } + in + let ready, init_waiter = Lwt.wait () in + let init_result = ref (Error (Failure "Mwt.make")) in + let init_complete = + Lwt_unix.make_notification ~once:true (fun () -> + Lwt.wakeup_result init_waiter !init_result ) + 
in + worker.thread <- Thread.create (worker_loop init_complete init_result) worker ; + (worker, ready) + +(* Add a worker to the pool *) +let add_worker pool worker = + match Lwt_sequence.take_opt_l pool.waiters with + | None -> Queue.add worker pool.workers + | Some w -> Lwt.wakeup w worker + +(* Wait for worker to be available, then return it *) +let get_worker pool = + if not (Queue.is_empty pool.workers) then + Lwt.return (Queue.take pool.workers) + else ( Lwt.add_task_r pool.waiters [@ocaml.warning "-3"] ) + +let run pool f = + let%lwt () = + if pool.closed then Lwt.fail_invalid_arg "Mwt.run" else Lwt.return_unit + in + let result = ref (Error (Failure "Mwt.run")) in + (* The task for the worker thread: *) + let task state = + try result := Ok (f state) with exn -> result := Error exn + in + let%lwt worker = get_worker pool in + let waiter, wakener = Lwt.wait () in + let id = + Lwt_unix.make_notification ~once:true (fun () -> + Lwt.wakeup_result wakener !result ) + in + Lwt.finalize + (fun () -> + (* Send the id and the task to the worker: *) + Event.sync (Event.send worker.task_channel (`Task (id, task))) ; + waiter ) + (fun () -> add_worker pool worker ; Lwt.return_unit) + +let close_async pool = + (* Close all the available worker threads in the pool *) + pool.closed <- true ; + Queue.iter + (fun worker -> Event.sync (Event.send worker.task_channel `Quit)) + pool.workers + +let close pool = + close_async pool ; + (* Wait for all the workers to signal that they are done *) + Array.init (Weak.length pool.all_workers) (fun i -> + Weak.get pool.all_workers i ) + |> Array.fold_left + (fun l worker -> + match worker with None -> l | Some w -> w.complete :: l ) + [] + |> Lwt.join + +let make ~init ~at_exit num_threads = + if num_threads < 1 then + invalid_arg + (Format.asprintf "Mwt.make: number of threads is %d, must be >= 1" + num_threads) ; + let pool = + { num_threads + ; init + ; at_exit + ; workers= Queue.create () + ; waiters= Lwt_sequence.create () + ; 
closed= false + ; all_workers= Weak.create num_threads } + in + let ready = ref [] in + for _ = 1 to num_threads do + let worker, init = make_worker pool in + ready := init :: !ready ; + Queue.add worker pool.workers + done ; + let i = ref 0 in + Queue.iter + (fun worker -> + Weak.set pool.all_workers !i (Some worker) ; + incr i ) + pool.workers ; + let%lwt () = Lwt.join !ready in + Lwt.return pool + +(* Calling back into the main thread *) +(* Queue of [unit -> unit Lwt.t] functions. *) +let jobs = Queue.create () + +(* Mutex to protect access to [jobs]. *) +let jobs_mutex = Mutex.create () + +let job_notification = + Lwt_unix.make_notification (fun () -> + (* Take the first job. The queue is never empty at this point. *) + Mutex.lock jobs_mutex ; + let thunk = Queue.take jobs in + Mutex.unlock jobs_mutex ; + ignore (thunk ()) ) + +let run_in_main f = + let channel = Event.new_channel () in + (* Create the job. *) + let job () = + (* Execute [f] and wait for its result. *) + let%lwt result = + match%lwt f () with + | ret -> Lwt.return (Ok ret) + | exception exn -> Lwt.return (Error exn) + in + (* Send the result. *) + Event.sync (Event.send channel result) ; + Lwt.return_unit + in + (* Add the job to the queue. *) + Mutex.lock jobs_mutex ; + Queue.add job jobs ; + Mutex.unlock jobs_mutex ; + (* Notify the main thread. *) + Lwt_unix.send_notification job_notification ; + (* Wait for the result. *) + match Event.sync (Event.receive channel) with + | Ok ret -> ret + | Error exn -> raise exn diff --git a/vendors/irmin-lmdb/mwt.mli b/vendors/irmin-lmdb/mwt.mli new file mode 100644 index 000000000000..facfd36e7b77 --- /dev/null +++ b/vendors/irmin-lmdb/mwt.mli @@ -0,0 +1,64 @@ +(** {1 Preemptive threads managed from Lwt} + + This module provides a way to call a function from Lwt and have that + function in a separate OCaml preemptive thread. *) + +(** {2 Preemptive thread pools} + + Pools of threads which may be delegated to from an Lwt application. 
Each + thread in the pool carries its own local state which is passed to any + function running within a pool's thread. *) + +(** {3 Creating a thread pool} *) + +(** Preemptive thread pool carrying some local state type *) +type 'state t + +val make : + init:(unit -> 'state) -> at_exit:('state -> unit) -> int -> 'state t Lwt.t +(** [make ~init ~at_exit num_threads] creates a new pool of [num_threads] + preemptive threads. If no special per-thread state is required you can + pass {!noop} to [init] and [at_exit]. The returned promise will resolve + once all [num_threads] calls to [init] have completed. If any instance of + [init] raises an exception then the resulting promise will resolve to the + same exception. + + @param init is run from within each newly created thread in the pool. Its + return value defines the initial state of the newly initialized thread. + @param at_exit is run within each thread in the pool immediately before + that thread exits. Its argument is the state of the exiting thread. + + @raise Invalid_argument if [num_threads < 1]. *) + +val noop : unit -> unit +(** [noop] may be passed to the [init] and [at_exit] parameters of {!make} when + no special per-thread initialization or cleanup is required. [noop] is + [fun () -> ()]. *) + +(** {3 Using a thread pool} *) + +val run : 'state t -> ('state -> 'a) -> 'a Lwt.t +(** [run pool f] will run [f state] in one of the preemptive threads in + [pool], where [state] carries the state of the thread [f] runs under. If + there is a thread available in [pool] then the call will block until [f] + completes. If no thread is available from [pool] then the call to [f] will + be queued until a thread in [pool] is available. + + [f] may use {!run_in_main} to run code in a program's Lwt context. *) + +val close : _ t -> unit Lwt.t +(** [close pool] immediately marks [pool] as closed, quits all idle threads in + the pool and blocks until all in-use threads have terminated. 
Any further + uses of [pool] will raise [Invalid_argument]. *) + +val close_async : _ t -> unit +(** [close_async pool] marks [pool] as closed and sends a "quit" signal to all + idle threads. Any threads currently in use will quit once they are done + with their current task. This function does not block. *) + +(** {2 Calling back into Lwt from a preemptive thread} *) + +val run_in_main : (unit -> 'a Lwt.t) -> 'a +(** [run_in_main f] can be used from within a preemptive thread to run [f ()] + in the program's main Lwt context. It can be seen as a dual to {!run} + and {!run_thread}. *) -- GitLab From 0622a3389d9854c11ebb8b24ed188e17d2b54097 Mon Sep 17 00:00:00 2001 From: Thomas Gazagnaire Date: Fri, 21 Dec 2018 20:29:25 +0100 Subject: [PATCH 6/6] irmin-lmdb: fix compilation of mwt without ppx_let --- vendors/irmin-lmdb/mwt.ml | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/vendors/irmin-lmdb/mwt.ml b/vendors/irmin-lmdb/mwt.ml index 5b986161ccc2..a54ea2feacea 100644 --- a/vendors/irmin-lmdb/mwt.ml +++ b/vendors/irmin-lmdb/mwt.ml @@ -1,5 +1,7 @@ (* Mwt, originally derived from the Lwt project's lwt_preemptive.ml *) +open Lwt.Infix + [@@@ocaml.warning "-3"] module Lwt_sequence = Lwt_sequence @@ -97,15 +99,14 @@ let get_worker pool = else ( Lwt.add_task_r pool.waiters [@ocaml.warning "-3"] ) let run pool f = - let%lwt () = - if pool.closed then Lwt.fail_invalid_arg "Mwt.run" else Lwt.return_unit - in + (if pool.closed then Lwt.fail_invalid_arg "Mwt.run" else Lwt.return_unit) + >>= fun () -> let result = ref (Error (Failure "Mwt.run")) in (* The task for the worker thread: *) let task state = try result := Ok (f state) with exn -> result := Error exn in - let%lwt worker = get_worker pool in + get_worker pool >>= fun worker -> let waiter, wakener = Lwt.wait () in let id = Lwt_unix.make_notification ~once:true (fun () -> @@ -162,7 +163,7 @@ let make ~init ~at_exit num_threads = Weak.set pool.all_workers !i (Some worker) ; incr i ) 
pool.workers ; - let%lwt () = Lwt.join !ready in + Lwt.join !ready >>= fun () -> Lwt.return pool (* Calling back into the main thread *) @@ -185,11 +186,10 @@ let run_in_main f = (* Create the job. *) let job () = (* Execute [f] and wait for its result. *) - let%lwt result = - match%lwt f () with - | ret -> Lwt.return (Ok ret) - | exception exn -> Lwt.return (Error exn) - in + Lwt.catch + (fun () -> f () >|= fun ret -> Ok ret) + (fun exn -> Lwt.return (Error exn)) + >>= fun result -> (* Send the result. *) Event.sync (Event.send channel result) ; Lwt.return_unit -- GitLab