diff --git a/src/lib_lwt_result_stdlib/bare/functor_outputs/hashtbl.ml b/src/lib_lwt_result_stdlib/bare/functor_outputs/hashtbl.ml index b6a6a39ff227cd3b56eb629c8e2a24249069e688..7f36bb9ef3cf1e53b15b1765bb5f9f58fd035154 100644 --- a/src/lib_lwt_result_stdlib/bare/functor_outputs/hashtbl.ml +++ b/src/lib_lwt_result_stdlib/bare/functor_outputs/hashtbl.ml @@ -60,7 +60,8 @@ module type S = sig val iter_s : (key -> 'a -> unit Lwt.t) -> 'a t -> unit Lwt.t - val iter_p : (key -> 'a -> unit Lwt.t) -> 'a t -> unit Lwt.t + val iter_p : + ?max_concurrency:int -> (key -> 'a -> unit Lwt.t) -> 'a t -> unit Lwt.t val iter_e : (key -> 'a -> (unit, 'trace) result) -> 'a t -> (unit, 'trace) result @@ -71,6 +72,7 @@ module type S = sig (unit, 'trace) result Lwt.t val iter_ep : + ?max_concurrency:int -> (key -> 'a -> (unit, 'error) result Lwt.t) -> 'a t -> (unit, 'error list) result Lwt.t @@ -150,7 +152,8 @@ module type SeededS = sig val iter_s : (key -> 'a -> unit Lwt.t) -> 'a t -> unit Lwt.t - val iter_p : (key -> 'a -> unit Lwt.t) -> 'a t -> unit Lwt.t + val iter_p : + ?max_concurrency:int -> (key -> 'a -> unit Lwt.t) -> 'a t -> unit Lwt.t val iter_e : (key -> 'a -> (unit, 'trace) result) -> 'a t -> (unit, 'trace) result @@ -161,6 +164,7 @@ module type SeededS = sig (unit, 'trace) result Lwt.t val iter_ep : + ?max_concurrency:int -> (key -> 'a -> (unit, 'error) result Lwt.t) -> 'a t -> (unit, 'error list) result Lwt.t diff --git a/src/lib_lwt_result_stdlib/bare/sigs/monad.ml b/src/lib_lwt_result_stdlib/bare/sigs/monad.ml index 81df6077f1ebf169b9a3268c7350fe04a402be90..47721f5fbfbaf88bc2ffcb0192d08d78063833d5 100644 --- a/src/lib_lwt_result_stdlib/bare/sigs/monad.ml +++ b/src/lib_lwt_result_stdlib/bare/sigs/monad.ml @@ -26,7 +26,8 @@ (** {1 Lwt, result, and Lwt-result monad operators} This module provides the necessary functions and operators to use Lwt, - result and Lwt-result as a monad. + result and Lwt-result as a monad. There is some overlap with {!Lwt} and + {!Lwt_result}. {2 Basics} diff --git a/src/lib_lwt_result_stdlib/bare/sigs/seq.ml b/src/lib_lwt_result_stdlib/bare/sigs/seq.ml index a7a549ba3312c4a0581e228fee7d5e41ce12d32a..09e2189fd370f0dc0453757e448ab850b4cad745 100644 --- a/src/lib_lwt_result_stdlib/bare/sigs/seq.ml +++ b/src/lib_lwt_result_stdlib/bare/sigs/seq.ml @@ -63,6 +63,19 @@ All the traversal functions that are suffixed with [_ep] are within the combined error-and-Lwt monad. These function traverse the elements concurrently with a best-effort behaviour. + + Some of [_p]- and [_ep]-suffixed functions have a [max_concurrency] + parameter. If this parameter is omitted, there are not limits to the number + of concurrent calls to the traversal function that can happen concurrently. + More specifically, the library does not bound the number of pending promises + created by calls to the traversal function. If this parameter is set, then + there are at most this number (or 1, whichever is highest) pending traversal + promises at any given time. + + Note that setting [max_concurrency] to [1] causes the elements to be + processed one after the other just like with the [_s] (or [_es]) traversor. + However, the error management is different: [_p] (and [_ep]) traversors have + a best-effort semantic even if [max_concurrency] is set. *) (** {2 Special consideration} @@ -147,8 +160,12 @@ module type S = sig - is rejected if at least one of the promises is, otherwise - is fulfilled with [Error _] if at least one of the promises is, otherwise - - is fulfilled with [Ok ()] if all the promises are. *) + - is fulfilled with [Ok ()] if all the promises are. + + If [max_concurrency] is set, there are never more than that many pending + promises created by the iterating function. *) val iter_ep : + ?max_concurrency:int -> ('a -> (unit, 'trace) result Lwt.t) -> 'a t -> (unit, 'trace list) result Lwt.t @@ -157,8 +174,11 @@ module type S = sig steps of the iteration are started concurrently. The promise [iter_p f s] is resolved only once all the promises of the iteration are. At this point it is either fulfilled if all promises are, or rejected if at least one of - them is. *) - val iter_p : ('a -> unit Lwt.t) -> 'a t -> unit Lwt.t + them is. + + If [max_concurrency] is set, there are never more than that many pending + promises created by the iterating function. *) + val iter_p : ?max_concurrency:int -> ('a -> unit Lwt.t) -> 'a t -> unit Lwt.t val unfold : ('b -> ('a * 'b) option) -> 'b -> 'a t end diff --git a/src/lib_lwt_result_stdlib/bare/sigs/seq_e.ml b/src/lib_lwt_result_stdlib/bare/sigs/seq_e.ml index fb254a3a25b7442db73151b5bd20273e8246dfa9..9885fe13ace6290783ba1ac44d11502cdc407f5f 100644 --- a/src/lib_lwt_result_stdlib/bare/sigs/seq_e.ml +++ b/src/lib_lwt_result_stdlib/bare/sigs/seq_e.ml @@ -192,8 +192,15 @@ fold_left_e prefix of [seq] have resolved. Note that the behaviour for interrupted sequences is in line with the - best-effort semantic of Lwtreslib. *) - val iter_p : ('a -> unit Lwt.t) -> ('a, 'e) t -> (unit, 'e) result Lwt.t + best-effort semantic of Lwtreslib. + + If [max_concurrency] is set, there are never more than that many pending + promises created by the iterating function. *) + val iter_p : + ?max_concurrency:int -> + ('a -> unit Lwt.t) -> + ('a, 'e) t -> + (unit, 'e) result Lwt.t (** There is no [iter_ep] in [Bare]. The reason is that there can be two sources of failures and there is no satisfying way to combine failures for diff --git a/src/lib_lwt_result_stdlib/bare/sigs/seq_s.ml b/src/lib_lwt_result_stdlib/bare/sigs/seq_s.ml index dfeb30cb839dc2b03e63d842e5491ed91a648bfd..9f38631aa3bf4b7fdf4275771ac2fdb0efc754d6 100644 --- a/src/lib_lwt_result_stdlib/bare/sigs/seq_s.ml +++ b/src/lib_lwt_result_stdlib/bare/sigs/seq_s.ml @@ -118,6 +118,7 @@ module type S = sig otherwise - is fulfilled with [Ok ()] if all the promises are. *) val iter_ep : + ?max_concurrency:int -> ('a -> (unit, 'trace) result Lwt.t) -> 'a t -> (unit, 'trace list) result Lwt.t @@ -128,7 +129,7 @@ module type S = sig is resolved only once all the promises of the iteration are. At this point it is either fulfilled if all promises are, or rejected if at least one of them is. *) - val iter_p : ('a -> unit Lwt.t) -> 'a t -> unit Lwt.t + val iter_p : ?max_concurrency:int -> ('a -> unit Lwt.t) -> 'a t -> unit Lwt.t val map : ('a -> 'b) -> 'a t -> 'b t diff --git a/src/lib_lwt_result_stdlib/bare/structs/hashtbl.ml b/src/lib_lwt_result_stdlib/bare/structs/hashtbl.ml index ca3d41fa683997291d0d3d28421cf9b1dbdeb82f..82f84d7b696fcd74f8cf1d77f1457f8075c002ef 100644 --- a/src/lib_lwt_result_stdlib/bare/structs/hashtbl.ml +++ b/src/lib_lwt_result_stdlib/bare/structs/hashtbl.ml @@ -46,9 +46,11 @@ module Make (H : Stdlib.Hashtbl.HashedType) : S with type key = H.t = struct let iter_es f t = iter_es (fun (k, v) -> f k v) (to_seq t) - let iter_p f t = iter_p (fun (k, v) -> f k v) (to_seq t) + let iter_p ?max_concurrency f t = + iter_p ?max_concurrency (fun (k, v) -> f k v) (to_seq t) - let iter_ep f t = iter_ep (fun (k, v) -> f k v) (to_seq t) + let iter_ep ?max_concurrency f t = + iter_ep ?max_concurrency (fun (k, v) -> f k v) (to_seq t) let fold_e f t init = fold_left_e (fun acc (k, v) -> f k v acc) init (to_seq t) @@ -81,9 +83,11 @@ module MakeSeeded (H : Stdlib.Hashtbl.SeededHashedType) : let iter_es f t = iter_es (fun (k, v) -> f k v) (to_seq t) - let iter_ep f t = iter_ep (fun (k, v) -> f k v) (to_seq t) + let iter_ep ?max_concurrency f t = + iter_ep ?max_concurrency (fun (k, v) -> f k v) (to_seq t) - let iter_p f t = iter_p (fun (k, v) -> f k v) (to_seq t) + let iter_p ?max_concurrency f t = + iter_p ?max_concurrency (fun (k, v) -> f k v) (to_seq t) let fold_e f t init = fold_left_e (fun acc (k, v) -> f k v acc) init (to_seq t) diff --git a/src/lib_lwt_result_stdlib/bare/structs/monad.ml b/src/lib_lwt_result_stdlib/bare/structs/monad.ml index d8671994748fd2d44e1c7b4deebd2dba3e575886..fd6110c045a6ad201381408b54b772b99ab36cf5 100644 --- a/src/lib_lwt_result_stdlib/bare/structs/monad.ml +++ b/src/lib_lwt_result_stdlib/bare/structs/monad.ml @@ -156,3 +156,33 @@ let both_ep a b = both_p a b >|= fun (a, b) -> both_e a b let lwt_apply2 f x y = try f x y with exn -> Lwt.fail exn let lwt_apply3 f a x y = try f a x y with exn -> Lwt.fail exn + +module PoolLight = struct + (* Inspired but heavily transformed from Lwt_pool *) + + type t = {max : int; mutable count : int; waiters : unit Lwt.u Queue.t} + + let create m = {max = m; count = 0; waiters = Queue.create ()} + + let release p = + match Queue.take_opt p.waiters with + | Some wakener -> Lwt.wakeup_later wakener () + | None -> p.count <- p.count - 1 + + let acquire p = + assert (p.count <= p.max) ; + if p.count = p.max then ( + let (pending, resolver) = Lwt.task () in + Queue.push resolver p.waiters ; + pending) + else ( + p.count <- p.count + 1 ; + Lwt.return_unit) + + let use p f = + acquire p >>= fun () -> + assert (p.count <= p.max) ; + Lwt.finalize f (fun () -> + release p ; + Lwt.return_unit) +end diff --git a/src/lib_lwt_result_stdlib/bare/structs/monad.mli b/src/lib_lwt_result_stdlib/bare/structs/monad.mli index 80dfc822e756eddd8dd0489db9a2967cab405f4a..9d68f9fa1796ef5d8097a2a31052fce5d84bdb7d 100644 --- a/src/lib_lwt_result_stdlib/bare/structs/monad.mli +++ b/src/lib_lwt_result_stdlib/bare/structs/monad.mli @@ -30,3 +30,11 @@ include Bare_sigs.Monad.S val lwt_apply2 : ('a -> 'b -> 'c Lwt.t) -> 'a -> 'b -> 'c Lwt.t val lwt_apply3 : ('a -> 'b -> 'c -> 'd Lwt.t) -> 'a -> 'b -> 'c -> 'd Lwt.t + +module PoolLight : sig + type t + + val create : int -> t + + val use : t -> (unit -> 'a Lwt.t) -> 'a Lwt.t +end diff --git a/src/lib_lwt_result_stdlib/bare/structs/seq.ml b/src/lib_lwt_result_stdlib/bare/structs/seq.ml index dfe86fd237b8bc7e9be894b1be6b7b4b48d4065c..0c37d18f492301fc248056f37711dd8be0392537 100644 --- a/src/lib_lwt_result_stdlib/bare/structs/seq.ml +++ b/src/lib_lwt_result_stdlib/bare/structs/seq.ml @@ -93,6 +93,20 @@ let iter_ep f seq = in iter_ep f seq [] +let iter_ep ?max_concurrency f seq = + match max_concurrency with + | None -> iter_ep f seq + | Some n -> + let n = max n 1 in + let p = PoolLight.create n in + let f x = PoolLight.use p (fun () -> f x) in + let rec traverse acc seq = + PoolLight.use p (fun () -> Lwt.return (seq ())) >>= function + | Nil -> join_ep acc + | Cons (item, seq) -> traverse (f item :: acc) seq + in + traverse [] seq + let iter_p f seq = let rec iter_p f seq acc = match seq () with @@ -101,5 +115,19 @@ let iter_p f seq = in iter_p f seq [] +let iter_p ?max_concurrency f seq = + match max_concurrency with + | None -> iter_p f seq + | Some n -> + let n = max n 1 in + let p = PoolLight.create n in + let f x = PoolLight.use p (fun () -> f x) in + let rec traverse acc seq = + PoolLight.use p (fun () -> Lwt.return (seq ())) >>= function + | Nil -> join_p acc + | Cons (item, seq) -> traverse (f item :: acc) seq + in + traverse [] seq + let rec unfold f a () = match f a with None -> Nil | Some (item, a) -> Cons (item, unfold f a) diff --git a/src/lib_lwt_result_stdlib/bare/structs/seq_e.ml b/src/lib_lwt_result_stdlib/bare/structs/seq_e.ml index ba08cd3b10bf2dc8ba09dc67c7905ef825c322cd..f98d54ebe333c1b40e3222ba5e83391c7320fe63 100644 --- a/src/lib_lwt_result_stdlib/bare/structs/seq_e.ml +++ b/src/lib_lwt_result_stdlib/bare/structs/seq_e.ml @@ -129,6 +129,21 @@ let iter_p f seq = in iter_p [] f seq +let iter_p ?max_concurrency f seq = + match max_concurrency with + | None -> iter_p f seq + | Some n -> + let n = max n 1 in + let p = PoolLight.create n in + let f x = PoolLight.use p (fun () -> f x) in + let rec traverse acc seq = + PoolLight.use p (fun () -> Lwt.return (seq ())) >>= function + | Error _ as e -> join_p acc >>= fun () -> Lwt.return e + | Ok Nil -> join_p acc >>= fun () -> Monad.unit_es + | Ok (Cons (item, seq)) -> traverse (f item :: acc) seq + in + traverse [] seq + let rec map f seq () = seq () >|? function Nil -> Nil | Cons (item, seq) -> Cons (f item, map f seq) diff --git a/src/lib_lwt_result_stdlib/bare/structs/seq_s.ml b/src/lib_lwt_result_stdlib/bare/structs/seq_s.ml index a4e6f3b6734fa4f2e7135e21922a78fa21571348..ba36f31459c348929e884dc9490be62673e7ff23 100644 --- a/src/lib_lwt_result_stdlib/bare/structs/seq_s.ml +++ b/src/lib_lwt_result_stdlib/bare/structs/seq_s.ml @@ -109,11 +109,39 @@ let rec iter_es f seq = let iter_es f seq = iter_es f @@ protect seq -let iter_ep f seq = - fold_left (fun acc item -> Lwt.apply f item :: acc) [] seq >>= join_ep - -let iter_p f seq = - fold_left (fun acc item -> Lwt.apply f item :: acc) [] seq >>= join_p +let iter_ep ?max_concurrency f seq = + match max_concurrency with + | None -> + fold_left (fun acc item -> Lwt.apply f item :: acc) [] seq >>= join_ep + | Some n -> + let n = max n 1 in + let p = PoolLight.create n in + let f x = PoolLight.use p (fun () -> f x) in + (* In order to force only the part of the seq that is usable, we have to + do a bit of extra work *) + let rec traverse acc seq = + PoolLight.use p seq >>= function + | Nil -> Lwt.return acc + | Cons (item, seq) -> traverse (f item :: acc) seq + in + traverse [] seq >>= join_ep + +let iter_p ?max_concurrency f seq = + match max_concurrency with + | None -> + fold_left (fun acc item -> Lwt.apply f item :: acc) [] seq >>= join_p + | Some n -> + let n = max n 1 in + let p = PoolLight.create n in + let f x = PoolLight.use p (fun () -> f x) in + (* In order to force only the part of the seq that is usable, we have to + do a bit of extra work *) + let rec traverse acc seq = + PoolLight.use p seq >>= function + | Nil -> Lwt.return acc + | Cons (item, seq) -> traverse (f item :: acc) seq + in + traverse [] seq >>= join_p let rec map f seq () = seq () >|= function Nil -> Nil | Cons (item, seq) -> Cons (f item, map f seq) diff --git a/src/lib_lwt_result_stdlib/test/dune b/src/lib_lwt_result_stdlib/test/dune index 3e6cd5719dc229078022bab5d1aff922aa48be4d..de95b7b0226bd46475cb2235194e69003d5d3572 100644 --- a/src/lib_lwt_result_stdlib/test/dune +++ b/src/lib_lwt_result_stdlib/test/dune @@ -9,6 +9,7 @@ test_fuzzing_set test_fuzzing_seq_tiered test_fuzzing_option + test_fuzzing_seq_max_concurrency ) (libraries tezos-lwt-result-stdlib diff --git a/src/lib_lwt_result_stdlib/test/test_fuzzing_helpers.ml b/src/lib_lwt_result_stdlib/test/test_fuzzing_helpers.ml index 0cde750f245eb4243ba5ea398bb10cbafaada3f9..39091479e0f08894381d42e12c3bf3e6c67dff75 100644 --- a/src/lib_lwt_result_stdlib/test/test_fuzzing_helpers.ml +++ b/src/lib_lwt_result_stdlib/test/test_fuzzing_helpers.ml @@ -408,6 +408,8 @@ let many = QCheck.(list int) let maybe = QCheck.(option int) +let very_many = QCheck.map (fun l -> l @ l @ l @ l @ l @ l @ l @ l @ l @ l) many + let manymany = let open QCheck in oneof diff --git a/src/lib_lwt_result_stdlib/test/test_fuzzing_seq_max_concurrency.ml b/src/lib_lwt_result_stdlib/test/test_fuzzing_seq_max_concurrency.ml new file mode 100644 index 0000000000000000000000000000000000000000..c4759811b05e40c5faade826c43cc2402f57f1e3 --- /dev/null +++ b/src/lib_lwt_result_stdlib/test/test_fuzzing_seq_max_concurrency.ml @@ -0,0 +1,234 @@ +(*****************************************************************************) +(* *) +(* Open Source License *) +(* Copyright (c) 2021 Nomadic Labs *) +(* *) +(* Permission is hereby granted, free of charge, to any person obtaining a *) +(* copy of this software and associated documentation files (the "Software"),*) +(* to deal in the Software without restriction, including without limitation *) +(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *) +(* and/or sell copies of the Software, and to permit persons to whom the *) +(* Software is furnished to do so, subject to the following conditions: *) +(* *) +(* The above copyright notice and this permission notice shall be included *) +(* in all copies or substantial portions of the Software. *) +(* *) +(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*) +(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *) +(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *) +(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*) +(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *) +(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *) +(* DEALINGS IN THE SOFTWARE. *) +(* *) +(*****************************************************************************) + +open Support.Lib +open Test_fuzzing_helpers + +module type TESTABLE_P = sig + type 'a t + + val name : string + + val iter_p : ?max_concurrency:int -> ('a -> unit Lwt.t) -> 'a t -> unit Lwt.t + + val of_list : 'a list -> 'a t +end + +module type TESTABLE_EP = sig + type 'a t + + val name : string + + val iter_ep : + ?max_concurrency:int -> + ('a -> (unit, 'e Monad.trace) result Lwt.t) -> + 'a t -> + (unit, 'e Monad.trace) result Lwt.t + + val of_list : 'a list -> 'a t +end + +module type TESTABLE_P_EP = sig + type 'a t + + val name : string + + val iter_p : ?max_concurrency:int -> ('a -> unit Lwt.t) -> 'a t -> unit Lwt.t + + val iter_ep : + ?max_concurrency:int -> + ('a -> (unit, 'e Monad.trace) result Lwt.t) -> + 'a t -> + (unit, 'e Monad.trace) result Lwt.t + + val of_list : 'a list -> 'a t +end + +module SeqTestable : TESTABLE_P_EP = struct + type 'a t = 'a Seq.t + + let name = "Seq" + + let iter_p = Seq.iter_p + + let iter_ep = Seq.iter_ep + + let of_list = List.to_seq +end + +module SeqsTestable : TESTABLE_P_EP = struct + type 'a t = 'a Seq_s.t + + let name = "Seq_s" + + let iter_p = Seq_s.iter_p + + let iter_ep = Seq_s.iter_ep + + let of_list l = Seq_s.of_seq @@ List.to_seq l +end + +module SeqeTestable : TESTABLE_P = struct + type 'a t = ('a, unit) Seq_e.t + + let name = "Seq_e" + + let iter_p ?max_concurrency f s = + let open Monad in + Seq_e.iter_p ?max_concurrency f s >>= function + | Ok o -> Lwt.return o + | Error () -> assert false + + let of_list l = Seq_e.of_seq @@ List.to_seq l +end + +let iter_p_max_is_max (module M : TESTABLE_P) = + let open QCheck in + Test.make + ~name: + (Format.asprintf + "%s.iter_p ~max_concurrency: not too many promises" + M.name) + (pair Lib_test.Qcheck_helpers.uint8 very_many) + (fun (max_concurrency, input) -> + let open Monad in + let current = ref 0 in + let max_concurrency = max_concurrency + 1 in + assert (max_concurrency > 0) ; + let fn x = + assert (!current < max_concurrency) ; + incr current ; + assert (!current <= max_concurrency) ; + log_pause x >>= fun () -> + assert (!current <= max_concurrency) ; + decr current ; + assert (!current < max_concurrency) ; + Lwt.return_unit + in + let p : bool Lwt.t = + M.iter_p ~max_concurrency fn (M.of_list input) >>= fun () -> + Lwt.return_true + in + Lwt_main.run p) + +let iter_ep_max_is_max (module M : TESTABLE_EP) = + let open QCheck in + Test.make + ~name: + (Format.asprintf + "%s.iter_ep ~max_concurrency: not too many promises" + M.name) + (pair Lib_test.Qcheck_helpers.uint8 very_many) + (fun (max_concurrency, input) -> + let open Monad in + let current = ref 0 in + let max_concurrency = max_concurrency + 1 in + assert (max_concurrency > 0) ; + let fn x = + assert (!current < max_concurrency) ; + incr current ; + assert (!current <= max_concurrency) ; + log_pause x >>= fun () -> + assert (!current <= max_concurrency) ; + decr current ; + assert (!current < max_concurrency) ; + Monad.return_unit + in + let p : bool Lwt.t = + M.iter_ep ~max_concurrency fn (M.of_list input) >>= function + | Ok () -> Lwt.return_true + | Error _ -> assert false + in + Lwt_main.run p) + +let max_is_max = + [ + iter_p_max_is_max (module SeqTestable : TESTABLE_P); + iter_p_max_is_max (module SeqsTestable : TESTABLE_P); + iter_p_max_is_max (module SeqeTestable : TESTABLE_P); + iter_ep_max_is_max (module SeqTestable : TESTABLE_EP); + iter_ep_max_is_max (module SeqsTestable : TESTABLE_EP); + ] + +let iter_p_w_wo_max (module M : TESTABLE_P) = + let open QCheck in + Test.make + ~name:(Format.asprintf "%s.iter_p {~max_concurrency,}" M.name) + (triple + (triple Test_fuzzing_helpers.Fn.arith one one) + Lib_test.Qcheck_helpers.int8 + many) + (fun ((Fun (_, fn), const, init), max_concurrency, input) -> + let open Monad in + eq_es + (let acc = ref init in + M.iter_p (IterSOf.monotonous acc fn const) (M.of_list input) + >>= fun () -> Monad.return !acc) + (let acc = ref init in + M.iter_p + ~max_concurrency + (IterSOf.monotonous acc fn const) + (M.of_list input) + >>= fun () -> Monad.return !acc)) + +let iter_ep_w_wo_max (module M : TESTABLE_EP) = + let open QCheck in + Test.make + ~name:(Format.asprintf "%s.iter_p {~max_concurrency,}" M.name) + (triple + (triple Test_fuzzing_helpers.Fn.arith one one) + Lib_test.Qcheck_helpers.int8 + many) + (fun ((Fun (_, fn), const, init), max_concurrency, input) -> + let open Monad in + eq_es + (let acc = ref init in + M.iter_ep (IterESOf.monotonous acc fn const) (M.of_list input) + >>=? fun () -> Monad.return !acc) + (let acc = ref init in + M.iter_ep + ~max_concurrency + (IterESOf.monotonous acc fn const) + (M.of_list input) + >>=? fun () -> Monad.return !acc)) + +let w_wo = + [ + iter_p_w_wo_max (module SeqTestable : TESTABLE_P); + iter_p_w_wo_max (module SeqsTestable : TESTABLE_P); + iter_p_w_wo_max (module SeqeTestable : TESTABLE_P); + iter_ep_w_wo_max (module SeqTestable : TESTABLE_EP); + iter_ep_w_wo_max (module SeqsTestable : TESTABLE_EP); + ] + +let () = + let name = "Test_fuzzing_seq_max_concurrency" in + let tests = + [ + ("max_is_max", Lib_test.Qcheck_helpers.qcheck_wrap max_is_max); + ("with_without", Lib_test.Qcheck_helpers.qcheck_wrap w_wo); + ] + in + Alcotest.run name tests diff --git a/src/lib_lwt_result_stdlib/traced/functor_outputs/hashtbl.ml b/src/lib_lwt_result_stdlib/traced/functor_outputs/hashtbl.ml index ddd8ffb0bf08412497947e42369874bf9c5cab10..1a30b8d7e0cedd55b1ad6a884da115571a355e53 100644 --- a/src/lib_lwt_result_stdlib/traced/functor_outputs/hashtbl.ml +++ b/src/lib_lwt_result_stdlib/traced/functor_outputs/hashtbl.ml @@ -42,6 +42,7 @@ module type S = sig type 'error trace val iter_ep : + ?max_concurrency:int -> (key -> 'a -> (unit, 'error trace) result Lwt.t) -> 'a t -> (unit, 'error trace) result Lwt.t @@ -66,6 +67,7 @@ module type SeededS = sig type 'error trace val iter_ep : + ?max_concurrency:int -> (key -> 'a -> (unit, 'error trace) result Lwt.t) -> 'a t -> (unit, 'error trace) result Lwt.t diff --git a/src/lib_lwt_result_stdlib/traced/sigs/monad.ml b/src/lib_lwt_result_stdlib/traced/sigs/monad.ml index 9c26902c2253557d7f74b63fd85fb2b88d7f113e..fe0c50f4bbb9a9882ce5825739ca2150c647778c 100644 --- a/src/lib_lwt_result_stdlib/traced/sigs/monad.ml +++ b/src/lib_lwt_result_stdlib/traced/sigs/monad.ml @@ -70,4 +70,6 @@ module type S = sig ('a, 'error trace) result Lwt.t -> ('b, 'error trace) result Lwt.t -> ('a * 'b, 'error trace) result Lwt.t + + module PoolLight : module type of Bare_structs.Monad.PoolLight end diff --git a/src/lib_lwt_result_stdlib/traced/sigs/seq.ml b/src/lib_lwt_result_stdlib/traced/sigs/seq.ml index 049a5c5250ee698d767e6507daee920c58ff1d97..cce9fa74741cb91935807eb75ab0a345cf80ddca 100644 --- a/src/lib_lwt_result_stdlib/traced/sigs/seq.ml +++ b/src/lib_lwt_result_stdlib/traced/sigs/seq.ml @@ -45,6 +45,7 @@ module type S = sig otherwise - is fulfilled with [Ok ()] if all the promises are. *) val iter_ep : + ?max_concurrency:int -> ('a -> (unit, 'error trace) result Lwt.t) -> 'a t -> (unit, 'error trace) result Lwt.t diff --git a/src/lib_lwt_result_stdlib/traced/sigs/seq_e.ml b/src/lib_lwt_result_stdlib/traced/sigs/seq_e.ml index 8e267bea458bad687fcc4d7d09b6d568702e41e3..a79be33ab39947ae95df3f3b8e2a5553001acbb1 100644 --- a/src/lib_lwt_result_stdlib/traced/sigs/seq_e.ml +++ b/src/lib_lwt_result_stdlib/traced/sigs/seq_e.ml @@ -41,7 +41,7 @@ module type S = following traversors that are not supported in the [Bare] version. We will add those in future versions. - - [iter_p] which returns a [Trace.cons] of the sequence-interruption and the + - [iter_ep] which returns a [Trace.cons] of the sequence-interruption and the [Trace.conp_list] of the iterator errors. We need to decide on a semantic: which error is consed "above" the other? diff --git a/src/lib_lwt_result_stdlib/traced/sigs/seq_s.ml b/src/lib_lwt_result_stdlib/traced/sigs/seq_s.ml index 83c96feba09948924034fc7c9d664b72789c20d7..15f36dd96051d719d397fd52df2d58e7fff402fc 100644 --- a/src/lib_lwt_result_stdlib/traced/sigs/seq_s.ml +++ b/src/lib_lwt_result_stdlib/traced/sigs/seq_s.ml @@ -48,6 +48,7 @@ module type S = sig - fulfilled with [Error _] if at least one of the promises is, or - fulfilled with [Ok ()] if all the promises are. *) val iter_ep : + ?max_concurrency:int -> ('a -> (unit, 'error trace) result Lwt.t) -> 'a t -> (unit, 'error trace) result Lwt.t diff --git a/src/lib_lwt_result_stdlib/traced/structs/hashtbl.ml b/src/lib_lwt_result_stdlib/traced/structs/hashtbl.ml index c68f651c4e0d1e06b29679f7c2e44a03be30e1e4..e4d91cf8f09caa393dfe8970c587939cd65f9dd7 100644 --- a/src/lib_lwt_result_stdlib/traced/structs/hashtbl.ml +++ b/src/lib_lwt_result_stdlib/traced/structs/hashtbl.ml @@ -44,7 +44,8 @@ struct module Make (H : Stdlib.Hashtbl.HashedType) : S with type key = H.t = struct include Bare_structs.Hashtbl.Make (H) - let iter_ep f t = Seq.iter_ep (fun (k, v) -> f k v) (to_seq t) + let iter_ep ?max_concurrency f t = + Seq.iter_ep ?max_concurrency (fun (k, v) -> f k v) (to_seq t) end module type SeededS = @@ -55,7 +56,8 @@ struct SeededS with type key = H.t = struct include Bare_structs.Hashtbl.MakeSeeded (H) - let iter_ep f t = Seq.iter_ep (fun (k, v) -> f k v) (to_seq t) + let iter_ep ?max_concurrency f t = + Seq.iter_ep ?max_concurrency (fun (k, v) -> f k v) (to_seq t) end module type S_ES = diff --git a/src/lib_lwt_result_stdlib/traced/structs/monad.ml b/src/lib_lwt_result_stdlib/traced/structs/monad.ml index 7b9b5a76607e097480b85521868d53e5e6a0a299..cfe6fc822a12f8035d83215e36b71d6d2bc4cc9b 100644 --- a/src/lib_lwt_result_stdlib/traced/structs/monad.ml +++ b/src/lib_lwt_result_stdlib/traced/structs/monad.ml @@ -62,4 +62,6 @@ module Make (Trace : Traced_sigs.Trace.S) : let all_ep ts = all_p ts >|= all_e let both_ep a b = both_p a b >|= fun (a, b) -> both_e a b + + module PoolLight = Bare_structs.Monad.PoolLight end diff --git a/src/lib_lwt_result_stdlib/traced/structs/seq.ml b/src/lib_lwt_result_stdlib/traced/structs/seq.ml index e62ef795915c80a79bacead526d3abcad2eaf17a..dc83f131e621a5150eac43776a002701ca1f58ce 100644 --- a/src/lib_lwt_result_stdlib/traced/structs/seq.ml +++ b/src/lib_lwt_result_stdlib/traced/structs/seq.ml @@ -34,4 +34,19 @@ module Make (Monad : Traced_sigs.Monad.S) : | Cons (item, seq) -> iter_ep f seq (Lwt.apply f item :: acc) in iter_ep f seq [] + + let iter_ep ?max_concurrency f seq = + match max_concurrency with + | None -> iter_ep f seq + | Some n -> + let open Monad in + let n = max n 1 in + let p = PoolLight.create n in + let f x = PoolLight.use p (fun () -> f x) in + let rec traverse acc seq = + PoolLight.use p (fun () -> Lwt.return (seq ())) >>= function + | Nil -> join_ep acc + | Cons (item, seq) -> traverse (f item :: acc) seq + in + traverse [] seq end diff --git a/src/lib_lwt_result_stdlib/traced/structs/seq_s.ml b/src/lib_lwt_result_stdlib/traced/structs/seq_s.ml index f43f1393fcc7551ed355c156e9460765c76ede3b..643be4a5e8188c340f29ad212b44a06883a6a248 100644 --- a/src/lib_lwt_result_stdlib/traced/structs/seq_s.ml +++ b/src/lib_lwt_result_stdlib/traced/structs/seq_s.ml @@ -30,7 +30,20 @@ module Make (Monad : Traced_sigs.Monad.S) : and type 'a t = 'a Bare_structs.Seq_s.t = struct include Bare_structs.Seq_s - let iter_ep f seq = + let iter_ep ?max_concurrency f seq = let open Monad in - fold_left (fun acc x -> Lwt.apply f x :: acc) [] seq >>= join_ep + match max_concurrency with + | None -> fold_left (fun acc x -> Lwt.apply f x :: acc) [] seq >>= join_ep + | Some n -> + let n = max n 1 in + let p = PoolLight.create n in + let f x = PoolLight.use p (fun () -> f x) in + (* In order to force only the part of the seq that is usable, we have to + do a bit of extra work *) + let rec traverse acc seq = + PoolLight.use p seq >>= function + | Nil -> Lwt.return acc + | Cons (item, seq) -> traverse (f item :: acc) seq + in + traverse [] seq >>= join_ep end diff --git a/src/lib_test/qcheck_helpers.mli b/src/lib_test/qcheck_helpers.mli index ef086f8b5ddf627b4adba228315ad26ffd837d94..b12a56d89b4769d76c61d332f986b729915cfc6f 100644 --- a/src/lib_test/qcheck_helpers.mli +++ b/src/lib_test/qcheck_helpers.mli @@ -90,10 +90,14 @@ val uint16 : int QCheck.arbitrary (** [int16] is an arbitrary of signed int16 arbitrary *) val int16 : int QCheck.arbitrary -(** [uint8] is an arbitrary of unsigned [int8] values *) +(** [uint8] generates an unsigned int8 arbitrary + + - Generation of values is delegated to {!int_range} *) val uint8 : int QCheck.arbitrary -(** [int8] is an arbitrary of signed int8 values *) +(** [int8] generates a signed int8 arbitrary + + - Generation of values is delegated to {!int_range} *) val int8 : int QCheck.arbitrary (** [of_option_shrink shrink_opt] returns a shrinker from an optional one.