From dfcf0b0a9023aeb672a2ad62fc24aedbc0ce9cf9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Proust?= Date: Mon, 29 Mar 2021 11:03:40 +0200 Subject: [PATCH 1/6] Lwtreslib: add ?max_concurrency to `Seq_s.iter_{p,ep}` --- src/lib_lwt_result_stdlib/bare/sigs/seq_s.ml | 3 +- .../bare/structs/monad.ml | 30 +++++ .../bare/structs/monad.mli | 8 ++ .../bare/structs/seq_s.ml | 38 +++++- src/lib_lwt_result_stdlib/test/dune | 1 + .../test/test_fuzzing_helpers.ml | 2 + .../test/test_fuzzing_seq_max_concurrency.ml | 111 ++++++++++++++++++ .../traced/sigs/monad.ml | 2 + .../traced/sigs/seq_s.ml | 1 + .../traced/structs/monad.ml | 2 + .../traced/structs/seq_s.ml | 17 ++- src/lib_test/qcheck_helpers.mli | 8 +- 12 files changed, 213 insertions(+), 10 deletions(-) create mode 100644 src/lib_lwt_result_stdlib/test/test_fuzzing_seq_max_concurrency.ml diff --git a/src/lib_lwt_result_stdlib/bare/sigs/seq_s.ml b/src/lib_lwt_result_stdlib/bare/sigs/seq_s.ml index dfeb30cb839d..9f38631aa3bf 100644 --- a/src/lib_lwt_result_stdlib/bare/sigs/seq_s.ml +++ b/src/lib_lwt_result_stdlib/bare/sigs/seq_s.ml @@ -118,6 +118,7 @@ module type S = sig otherwise - is fulfilled with [Ok ()] if all the promises are. *) val iter_ep : + ?max_concurrency:int -> ('a -> (unit, 'trace) result Lwt.t) -> 'a t -> (unit, 'trace list) result Lwt.t @@ -128,7 +129,7 @@ module type S = sig is resolved only once all the promises of the iteration are. At this point it is either fulfilled if all promises are, or rejected if at least one of them is. *) - val iter_p : ('a -> unit Lwt.t) -> 'a t -> unit Lwt.t + val iter_p : ?max_concurrency:int -> ('a -> unit Lwt.t) -> 'a t -> unit Lwt.t val map : ('a -> 'b) -> 'a t -> 'b t diff --git a/src/lib_lwt_result_stdlib/bare/structs/monad.ml b/src/lib_lwt_result_stdlib/bare/structs/monad.ml index d8671994748f..fd6110c045a6 100644 --- a/src/lib_lwt_result_stdlib/bare/structs/monad.ml +++ b/src/lib_lwt_result_stdlib/bare/structs/monad.ml @@ -156,3 +156,33 @@ let both_ep a b = both_p a b >|= fun (a, b) -> both_e a b let lwt_apply2 f x y = try f x y with exn -> Lwt.fail exn let lwt_apply3 f a x y = try f a x y with exn -> Lwt.fail exn + +module PoolLight = struct + (* Inspired but heavily transformed from Lwt_pool *) + + type t = {max : int; mutable count : int; waiters : unit Lwt.u Queue.t} + + let create m = {max = m; count = 0; waiters = Queue.create ()} + + let release p = + match Queue.take_opt p.waiters with + | Some wakener -> Lwt.wakeup_later wakener () + | None -> p.count <- p.count - 1 + + let acquire p = + assert (p.count <= p.max) ; + if p.count = p.max then ( + let (pending, resolver) = Lwt.task () in + Queue.push resolver p.waiters ; + pending) + else ( + p.count <- p.count + 1 ; + Lwt.return_unit) + + let use p f = + acquire p >>= fun () -> + assert (p.count <= p.max) ; + Lwt.finalize f (fun () -> + release p ; + Lwt.return_unit) +end diff --git a/src/lib_lwt_result_stdlib/bare/structs/monad.mli b/src/lib_lwt_result_stdlib/bare/structs/monad.mli index 80dfc822e756..9d68f9fa1796 100644 --- a/src/lib_lwt_result_stdlib/bare/structs/monad.mli +++ b/src/lib_lwt_result_stdlib/bare/structs/monad.mli @@ -30,3 +30,11 @@ include Bare_sigs.Monad.S val lwt_apply2 : ('a -> 'b -> 'c Lwt.t) -> 'a -> 'b -> 'c Lwt.t val lwt_apply3 : ('a -> 'b -> 'c -> 'd Lwt.t) -> 'a -> 'b -> 'c -> 'd Lwt.t + +module PoolLight : sig + type t + + val create : int -> t + + val use : t -> (unit -> 'a Lwt.t) -> 'a Lwt.t +end diff --git a/src/lib_lwt_result_stdlib/bare/structs/seq_s.ml b/src/lib_lwt_result_stdlib/bare/structs/seq_s.ml index a4e6f3b6734f..ba36f31459c3 100644 --- a/src/lib_lwt_result_stdlib/bare/structs/seq_s.ml +++ b/src/lib_lwt_result_stdlib/bare/structs/seq_s.ml @@ -109,11 +109,39 @@ let rec iter_es f seq = let iter_es f seq = iter_es f @@ protect seq -let iter_ep f seq = - fold_left (fun acc item -> Lwt.apply f item :: acc) [] seq >>= join_ep - -let iter_p f seq = - fold_left (fun acc item -> Lwt.apply f item :: acc) [] seq >>= join_p +let iter_ep ?max_concurrency f seq = + match max_concurrency with + | None -> + fold_left (fun acc item -> Lwt.apply f item :: acc) [] seq >>= join_ep + | Some n -> + let n = max n 1 in + let p = PoolLight.create n in + let f x = PoolLight.use p (fun () -> f x) in + (* In order to force only the part of the seq that is usable, we have to + do a bit of extra work *) + let rec traverse acc seq = + PoolLight.use p seq >>= function + | Nil -> Lwt.return acc + | Cons (item, seq) -> traverse (f item :: acc) seq + in + traverse [] seq >>= join_ep + +let iter_p ?max_concurrency f seq = + match max_concurrency with + | None -> + fold_left (fun acc item -> Lwt.apply f item :: acc) [] seq >>= join_p + | Some n -> + let n = max n 1 in + let p = PoolLight.create n in + let f x = PoolLight.use p (fun () -> f x) in + (* In order to force only the part of the seq that is usable, we have to + do a bit of extra work *) + let rec traverse acc seq = + PoolLight.use p seq >>= function + | Nil -> Lwt.return acc + | Cons (item, seq) -> traverse (f item :: acc) seq + in + traverse [] seq >>= join_p let rec map f seq () = seq () >|= function Nil -> Nil | Cons (item, seq) -> Cons (f item, map f seq) diff --git a/src/lib_lwt_result_stdlib/test/dune b/src/lib_lwt_result_stdlib/test/dune index 3e6cd5719dc2..de95b7b0226b 100644 --- a/src/lib_lwt_result_stdlib/test/dune +++ b/src/lib_lwt_result_stdlib/test/dune @@ -9,6 +9,7 @@ test_fuzzing_set test_fuzzing_seq_tiered test_fuzzing_option + test_fuzzing_seq_max_concurrency ) (libraries tezos-lwt-result-stdlib diff --git a/src/lib_lwt_result_stdlib/test/test_fuzzing_helpers.ml b/src/lib_lwt_result_stdlib/test/test_fuzzing_helpers.ml index 0cde750f245e..39091479e0f0 100644 --- a/src/lib_lwt_result_stdlib/test/test_fuzzing_helpers.ml +++ b/src/lib_lwt_result_stdlib/test/test_fuzzing_helpers.ml @@ -408,6 +408,8 @@ let many = QCheck.(list int) let maybe = QCheck.(option int) +let very_many = QCheck.map (fun l -> l @ l @ l @ l @ l @ l @ l @ l @ l @ l) many + let manymany = let open QCheck in oneof diff --git a/src/lib_lwt_result_stdlib/test/test_fuzzing_seq_max_concurrency.ml b/src/lib_lwt_result_stdlib/test/test_fuzzing_seq_max_concurrency.ml new file mode 100644 index 000000000000..70a12fb95390 --- /dev/null +++ b/src/lib_lwt_result_stdlib/test/test_fuzzing_seq_max_concurrency.ml @@ -0,0 +1,111 @@ +(*****************************************************************************) +(* *) +(* Open Source License *) +(* Copyright (c) 2021 Nomadic Labs *) +(* *) +(* Permission is hereby granted, free of charge, to any person obtaining a *) +(* copy of this software and associated documentation files (the "Software"),*) +(* to deal in the Software without restriction, including without limitation *) +(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *) +(* and/or sell copies of the Software, and to permit persons to whom the *) +(* Software is furnished to do so, subject to the following conditions: *) +(* *) +(* The above copyright notice and this permission notice shall be included *) +(* in all copies or substantial portions of the Software. *) +(* *) +(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*) +(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *) +(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *) +(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*) +(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *) +(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *) +(* DEALINGS IN THE SOFTWARE. *) +(* *) +(*****************************************************************************) + +open Support.Lib +open Test_fuzzing_helpers + +let iter_p_max_concurrency = + let open QCheck in + Test.make + ~name:"Seq_s.iter_p ~max_concurrency: not too many promises" + (pair Lib_test.Qcheck_helpers.uint8 very_many) + (fun (max_concurrency, input) -> + let open Monad in + let current = ref 0 in + let max_concurrency = max_concurrency + 1 in + assert (max_concurrency > 0) ; + let fn x = + assert (!current < max_concurrency) ; + incr current ; + assert (!current <= max_concurrency) ; + log_pause x >>= fun () -> + assert (!current <= max_concurrency) ; + decr current ; + assert (!current < max_concurrency) ; + Lwt.return_unit + in + let p : bool Lwt.t = + Seq_s.iter_p ~max_concurrency fn (Seq_s.of_seq @@ List.to_seq input) + >|= fun () -> true + in + Lwt_main.run p) + +let iter_p_max_concurrency_same_as_p = + let open QCheck in + Test.make + ~name:"Seq.iter_p {~max_concurrency,}" + (triple + (triple Test_fuzzing_helpers.Fn.arith one one) + Lib_test.Qcheck_helpers.int8 + many) + (fun ((Fun (_, fn), const, init), max_concurrency, input) -> + let open Monad in + eq_es + (let acc = ref init in + Seq.iter_p (IterSOf.monotonous acc fn const) (List.to_seq input) + >>= fun () -> Monad.return !acc) + (let acc = ref init in + Seq_s.iter_p + ~max_concurrency + (IterSOf.monotonous acc fn const) + (Seq_s.of_seq @@ List.to_seq input) + >>= fun () -> Monad.return !acc)) + +let iter_ep_max_concurrency_same_as_ep = + let open QCheck in + Test.make + ~name:"Seq.iter_ep {~max_concurrency,}" + (triple + (triple Test_fuzzing_helpers.Fn.arith one one) + Lib_test.Qcheck_helpers.int8 + many) + (fun ((Fun (_, fn), const, init), max_concurrency, input) -> + let open Monad in + eq_es + (let acc = ref init in + Seq.iter_ep (IterESOf.monotonous acc fn const) (List.to_seq input) + >>=? fun () -> Monad.return !acc) + (let acc = ref init in + Seq_s.iter_ep + ~max_concurrency + (IterESOf.monotonous acc fn const) + (Seq_s.of_seq @@ List.to_seq input) + >>=? fun () -> Monad.return !acc)) + +let () = + let name = "Test_fuzzing_seq_max_concurrency" in + let tests = + [ + ( "iter_p_max_is_max", + Lib_test.Qcheck_helpers.qcheck_wrap [iter_p_max_concurrency] ); + ( "Seq_s.iter_p w-w/o max", + Lib_test.Qcheck_helpers.qcheck_wrap [iter_p_max_concurrency_same_as_p] + ); + ( "Seq_s.iter_ep w-w/o max", + Lib_test.Qcheck_helpers.qcheck_wrap [iter_ep_max_concurrency_same_as_ep] + ); + ] + in + Alcotest.run name tests diff --git a/src/lib_lwt_result_stdlib/traced/sigs/monad.ml b/src/lib_lwt_result_stdlib/traced/sigs/monad.ml index 9c26902c2253..fe0c50f4bbb9 100644 --- a/src/lib_lwt_result_stdlib/traced/sigs/monad.ml +++ b/src/lib_lwt_result_stdlib/traced/sigs/monad.ml @@ -70,4 +70,6 @@ module type S = sig ('a, 'error trace) result Lwt.t -> ('b, 'error trace) result Lwt.t -> ('a * 'b, 'error trace) result Lwt.t + + module PoolLight : module type of Bare_structs.Monad.PoolLight end diff --git a/src/lib_lwt_result_stdlib/traced/sigs/seq_s.ml b/src/lib_lwt_result_stdlib/traced/sigs/seq_s.ml index 83c96feba099..15f36dd96051 100644 --- a/src/lib_lwt_result_stdlib/traced/sigs/seq_s.ml +++ b/src/lib_lwt_result_stdlib/traced/sigs/seq_s.ml @@ -48,6 +48,7 @@ module type S = sig - fulfilled with [Error _] if at least one of the promises is, or - fulfilled with [Ok ()] if all the promises are. *) val iter_ep : + ?max_concurrency:int -> ('a -> (unit, 'error trace) result Lwt.t) -> 'a t -> (unit, 'error trace) result Lwt.t diff --git a/src/lib_lwt_result_stdlib/traced/structs/monad.ml b/src/lib_lwt_result_stdlib/traced/structs/monad.ml index 7b9b5a76607e..cfe6fc822a12 100644 --- a/src/lib_lwt_result_stdlib/traced/structs/monad.ml +++ b/src/lib_lwt_result_stdlib/traced/structs/monad.ml @@ -62,4 +62,6 @@ module Make (Trace : Traced_sigs.Trace.S) : let all_ep ts = all_p ts >|= all_e let both_ep a b = both_p a b >|= fun (a, b) -> both_e a b + + module PoolLight = Bare_structs.Monad.PoolLight end diff --git a/src/lib_lwt_result_stdlib/traced/structs/seq_s.ml b/src/lib_lwt_result_stdlib/traced/structs/seq_s.ml index f43f1393fcc7..643be4a5e818 100644 --- a/src/lib_lwt_result_stdlib/traced/structs/seq_s.ml +++ b/src/lib_lwt_result_stdlib/traced/structs/seq_s.ml @@ -30,7 +30,20 @@ module Make (Monad : Traced_sigs.Monad.S) : and type 'a t = 'a Bare_structs.Seq_s.t = struct include Bare_structs.Seq_s - let iter_ep f seq = + let iter_ep ?max_concurrency f seq = let open Monad in - fold_left (fun acc x -> Lwt.apply f x :: acc) [] seq >>= join_ep + match max_concurrency with + | None -> fold_left (fun acc x -> Lwt.apply f x :: acc) [] seq >>= join_ep + | Some n -> + let n = max n 1 in + let p = PoolLight.create n in + let f x = PoolLight.use p (fun () -> f x) in + (* In order to force only the part of the seq that is usable, we have to + do a bit of extra work *) + let rec traverse acc seq = + PoolLight.use p seq >>= function + | Nil -> Lwt.return acc + | Cons (item, seq) -> traverse (f item :: acc) seq + in + traverse [] seq >>= join_ep end diff --git a/src/lib_test/qcheck_helpers.mli b/src/lib_test/qcheck_helpers.mli index ef086f8b5ddf..b12a56d89b47 100644 --- a/src/lib_test/qcheck_helpers.mli +++ b/src/lib_test/qcheck_helpers.mli @@ -90,10 +90,14 @@ val uint16 : int QCheck.arbitrary (** [int16] is an arbitrary of signed int16 arbitrary *) val int16 : int QCheck.arbitrary -(** [uint8] is an arbitrary of unsigned [int8] values *) +(** [uint8] generates an unsigned int8 arbitrary + + - Generation of values is delegated to {!int_range} *) val uint8 : int QCheck.arbitrary -(** [int8] is an arbitrary of signed int8 values *) +(** [int8] generates a signed int8 arbitrary + + - Generation of values is delegated to {!int_range} *) val int8 : int QCheck.arbitrary (** [of_option_shrink shrink_opt] returns a shrinker from an optional one. -- GitLab From 8b6322a125b2bfac9b7e2c3537e615604bb78500 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Proust?= Date: Mon, 24 May 2021 15:41:12 +0200 Subject: [PATCH 2/6] Lwtreslib: Add max_concurrency to Seq.iter_{,e}p --- src/lib_lwt_result_stdlib/bare/sigs/seq.ml | 13 +- src/lib_lwt_result_stdlib/bare/structs/seq.ml | 28 ++++ .../test/test_fuzzing_seq_max_concurrency.ml | 127 +++++++++++++++--- src/lib_lwt_result_stdlib/traced/sigs/seq.ml | 1 + .../traced/structs/seq.ml | 15 +++ 5 files changed, 159 insertions(+), 25 deletions(-) diff --git a/src/lib_lwt_result_stdlib/bare/sigs/seq.ml b/src/lib_lwt_result_stdlib/bare/sigs/seq.ml index a7a549ba3312..fb22d46f96ff 100644 --- a/src/lib_lwt_result_stdlib/bare/sigs/seq.ml +++ b/src/lib_lwt_result_stdlib/bare/sigs/seq.ml @@ -147,8 +147,12 @@ module type S = sig - is rejected if at least one of the promises is, otherwise - is fulfilled with [Error _] if at least one of the promises is, otherwise - - is fulfilled with [Ok ()] if all the promises are. *) + - is fulfilled with [Ok ()] if all the promises are. + + If [max_concurrency] is set, there are never more than that many pending + promises created by the iterating function. *) val iter_ep : + ?max_concurrency:int -> ('a -> (unit, 'trace) result Lwt.t) -> 'a t -> (unit, 'trace list) result Lwt.t @@ -157,8 +161,11 @@ module type S = sig steps of the iteration are started concurrently. The promise [iter_p f s] is resolved only once all the promises of the iteration are. At this point it is either fulfilled if all promises are, or rejected if at least one of - them is. *) - val iter_p : ('a -> unit Lwt.t) -> 'a t -> unit Lwt.t + them is. + + If [max_concurrency] is set, there are never more than that many pending + promises created by the iterating function. *) + val iter_p : ?max_concurrency:int -> ('a -> unit Lwt.t) -> 'a t -> unit Lwt.t val unfold : ('b -> ('a * 'b) option) -> 'b -> 'a t end diff --git a/src/lib_lwt_result_stdlib/bare/structs/seq.ml b/src/lib_lwt_result_stdlib/bare/structs/seq.ml index dfe86fd237b8..0c37d18f4923 100644 --- a/src/lib_lwt_result_stdlib/bare/structs/seq.ml +++ b/src/lib_lwt_result_stdlib/bare/structs/seq.ml @@ -93,6 +93,20 @@ let iter_ep f seq = in iter_ep f seq [] +let iter_ep ?max_concurrency f seq = + match max_concurrency with + | None -> iter_ep f seq + | Some n -> + let n = max n 1 in + let p = PoolLight.create n in + let f x = PoolLight.use p (fun () -> f x) in + let rec traverse acc seq = + PoolLight.use p (fun () -> Lwt.return (seq ())) >>= function + | Nil -> join_ep acc + | Cons (item, seq) -> traverse (f item :: acc) seq + in + traverse [] seq + let iter_p f seq = let rec iter_p f seq acc = match seq () with @@ -101,5 +115,19 @@ let iter_p f seq = in iter_p f seq [] +let iter_p ?max_concurrency f seq = + match max_concurrency with + | None -> iter_p f seq + | Some n -> + let n = max n 1 in + let p = PoolLight.create n in + let f x = PoolLight.use p (fun () -> f x) in + let rec traverse acc seq = + PoolLight.use p (fun () -> Lwt.return (seq ())) >>= function + | Nil -> join_p acc + | Cons (item, seq) -> traverse (f item :: acc) seq + in + traverse [] seq + let rec unfold f a () = match f a with None -> Nil | Some (item, a) -> Cons (item, unfold f a) diff --git a/src/lib_lwt_result_stdlib/test/test_fuzzing_seq_max_concurrency.ml b/src/lib_lwt_result_stdlib/test/test_fuzzing_seq_max_concurrency.ml index 70a12fb95390..a11133046dfe 100644 --- a/src/lib_lwt_result_stdlib/test/test_fuzzing_seq_max_concurrency.ml +++ b/src/lib_lwt_result_stdlib/test/test_fuzzing_seq_max_concurrency.ml @@ -26,10 +26,53 @@ open Support.Lib open Test_fuzzing_helpers -let iter_p_max_concurrency = +module type TESTABLE = sig + type 'a t + + val name : string + + val iter_p : ?max_concurrency:int -> ('a -> unit Lwt.t) -> 'a t -> unit Lwt.t + + val iter_ep : + ?max_concurrency:int -> + ('a -> (unit, 'e Monad.trace) result Lwt.t) -> + 'a t -> + (unit, 'e Monad.trace) result Lwt.t + + val of_list : 'a list -> 'a t +end + +module SeqTestable : TESTABLE = struct + type 'a t = 'a Seq.t + + let name = "Seq" + + let iter_p = Seq.iter_p + + let iter_ep = Seq.iter_ep + + let of_list = List.to_seq +end + +module SeqsTestable : TESTABLE = struct + type 'a t = 'a Seq_s.t + + let name = "Seq_s" + + let iter_p = Seq_s.iter_p + + let iter_ep = Seq_s.iter_ep + + let of_list l = Seq_s.of_seq @@ List.to_seq l +end + +let iter_p_max_is_max (module M : TESTABLE) = let open QCheck in Test.make - ~name:"Seq_s.iter_p ~max_concurrency: not too many promises" + ~name: + (Format.asprintf + "%s.iter_p ~max_concurrency: not too many promises" + M.name) (pair Lib_test.Qcheck_helpers.uint8 very_many) (fun (max_concurrency, input) -> let open Monad in @@ -47,15 +90,53 @@ let iter_p_max_concurrency = Lwt.return_unit in let p : bool Lwt.t = - Seq_s.iter_p ~max_concurrency fn (Seq_s.of_seq @@ List.to_seq input) - >|= fun () -> true + M.iter_p ~max_concurrency fn (M.of_list input) >>= fun () -> + Lwt.return_true + in + Lwt_main.run p) + +let iter_ep_max_is_max (module M : TESTABLE) = + let open QCheck in + Test.make + ~name: + (Format.asprintf + "%s.iter_ep ~max_concurrency: not too many promises" + M.name) + (pair Lib_test.Qcheck_helpers.uint8 very_many) + (fun (max_concurrency, input) -> + let open Monad in + let current = ref 0 in + let max_concurrency = max_concurrency + 1 in + assert (max_concurrency > 0) ; + let fn x = + assert (!current < max_concurrency) ; + incr current ; + assert (!current <= max_concurrency) ; + log_pause x >>= fun () -> + assert (!current <= max_concurrency) ; + decr current ; + assert (!current < max_concurrency) ; + Monad.return_unit + in + let p : bool Lwt.t = + M.iter_ep ~max_concurrency fn (M.of_list input) >>= function + | Ok () -> Lwt.return_true + | Error _ -> assert false in Lwt_main.run p) -let iter_p_max_concurrency_same_as_p = +let max_is_max = + [ + iter_p_max_is_max (module SeqTestable : TESTABLE); + iter_p_max_is_max (module SeqsTestable : TESTABLE); + iter_ep_max_is_max (module SeqTestable : TESTABLE); + iter_ep_max_is_max (module SeqsTestable : TESTABLE); + ] + +let iter_p_w_wo_max (module M : TESTABLE) = let open QCheck in Test.make - ~name:"Seq.iter_p {~max_concurrency,}" + ~name:(Format.asprintf "%s.iter_p {~max_concurrency,}" M.name) (triple (triple Test_fuzzing_helpers.Fn.arith one one) Lib_test.Qcheck_helpers.int8 @@ -64,19 +145,19 @@ let iter_p_max_concurrency_same_as_p = let open Monad in eq_es (let acc = ref init in - Seq.iter_p (IterSOf.monotonous acc fn const) (List.to_seq input) + M.iter_p (IterSOf.monotonous acc fn const) (M.of_list input) >>= fun () -> Monad.return !acc) (let acc = ref init in - Seq_s.iter_p + M.iter_p ~max_concurrency (IterSOf.monotonous acc fn const) - (Seq_s.of_seq @@ List.to_seq input) + (M.of_list input) >>= fun () -> Monad.return !acc)) -let iter_ep_max_concurrency_same_as_ep = +let iter_ep_w_wo_max (module M : TESTABLE) = let open QCheck in Test.make - ~name:"Seq.iter_ep {~max_concurrency,}" + ~name:(Format.asprintf "%s.iter_p {~max_concurrency,}" M.name) (triple (triple Test_fuzzing_helpers.Fn.arith one one) Lib_test.Qcheck_helpers.int8 @@ -85,27 +166,29 @@ let iter_ep_max_concurrency_same_as_ep = let open Monad in eq_es (let acc = ref init in - Seq.iter_ep (IterESOf.monotonous acc fn const) (List.to_seq input) + M.iter_ep (IterESOf.monotonous acc fn const) (M.of_list input) >>=? fun () -> Monad.return !acc) (let acc = ref init in - Seq_s.iter_ep + M.iter_ep ~max_concurrency (IterESOf.monotonous acc fn const) - (Seq_s.of_seq @@ List.to_seq input) + (M.of_list input) >>=? fun () -> Monad.return !acc)) +let w_wo = + [ + iter_p_w_wo_max (module SeqTestable : TESTABLE); + iter_p_w_wo_max (module SeqsTestable : TESTABLE); + iter_ep_w_wo_max (module SeqTestable : TESTABLE); + iter_ep_w_wo_max (module SeqsTestable : TESTABLE); + ] + let () = let name = "Test_fuzzing_seq_max_concurrency" in let tests = [ - ( "iter_p_max_is_max", - Lib_test.Qcheck_helpers.qcheck_wrap [iter_p_max_concurrency] ); - ( "Seq_s.iter_p w-w/o max", - Lib_test.Qcheck_helpers.qcheck_wrap [iter_p_max_concurrency_same_as_p] - ); - ( "Seq_s.iter_ep w-w/o max", - Lib_test.Qcheck_helpers.qcheck_wrap [iter_ep_max_concurrency_same_as_ep] - ); + ("max_is_max", Lib_test.Qcheck_helpers.qcheck_wrap max_is_max); + ("with_without", Lib_test.Qcheck_helpers.qcheck_wrap w_wo); ] in Alcotest.run name tests diff --git a/src/lib_lwt_result_stdlib/traced/sigs/seq.ml b/src/lib_lwt_result_stdlib/traced/sigs/seq.ml index 049a5c5250ee..cce9fa74741c 100644 --- a/src/lib_lwt_result_stdlib/traced/sigs/seq.ml +++ b/src/lib_lwt_result_stdlib/traced/sigs/seq.ml @@ -45,6 +45,7 @@ module type S = sig otherwise - is fulfilled with [Ok ()] if all the promises are. *) val iter_ep : + ?max_concurrency:int -> ('a -> (unit, 'error trace) result Lwt.t) -> 'a t -> (unit, 'error trace) result Lwt.t diff --git a/src/lib_lwt_result_stdlib/traced/structs/seq.ml b/src/lib_lwt_result_stdlib/traced/structs/seq.ml index e62ef795915c..dc83f131e621 100644 --- a/src/lib_lwt_result_stdlib/traced/structs/seq.ml +++ b/src/lib_lwt_result_stdlib/traced/structs/seq.ml @@ -34,4 +34,19 @@ module Make (Monad : Traced_sigs.Monad.S) : | Cons (item, seq) -> iter_ep f seq (Lwt.apply f item :: acc) in iter_ep f seq [] + + let iter_ep ?max_concurrency f seq = + match max_concurrency with + | None -> iter_ep f seq + | Some n -> + let open Monad in + let n = max n 1 in + let p = PoolLight.create n in + let f x = PoolLight.use p (fun () -> f x) in + let rec traverse acc seq = + PoolLight.use p (fun () -> Lwt.return (seq ())) >>= function + | Nil -> join_ep acc + | Cons (item, seq) -> traverse (f item :: acc) seq + in + traverse [] seq end -- GitLab From 3ce3ff9fbd9cd323da3441eb8d79c8312698b96d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Proust?= Date: Mon, 24 May 2021 15:53:45 +0200 Subject: [PATCH 3/6] Lwtreslib: Add max_concurrency to Seq_e.iter_p --- src/lib_lwt_result_stdlib/bare/sigs/seq_e.ml | 11 ++- .../bare/structs/seq_e.ml | 15 ++++ .../test/test_fuzzing_seq_max_concurrency.ml | 70 +++++++++++++++---- .../traced/sigs/seq_e.ml | 2 +- 4 files changed, 80 insertions(+), 18 deletions(-) diff --git a/src/lib_lwt_result_stdlib/bare/sigs/seq_e.ml b/src/lib_lwt_result_stdlib/bare/sigs/seq_e.ml index fb254a3a25b7..9885fe13ace6 100644 --- a/src/lib_lwt_result_stdlib/bare/sigs/seq_e.ml +++ b/src/lib_lwt_result_stdlib/bare/sigs/seq_e.ml @@ -192,8 +192,15 @@ fold_left_e prefix of [seq] have resolved. Note that the behaviour for interrupted sequences is in line with the - best-effort semantic of Lwtreslib. *) - val iter_p : ('a -> unit Lwt.t) -> ('a, 'e) t -> (unit, 'e) result Lwt.t + best-effort semantic of Lwtreslib. + + If [max_concurrency] is set, there are never more than that many pending + promises created by the iterating function. *) + val iter_p : + ?max_concurrency:int -> + ('a -> unit Lwt.t) -> + ('a, 'e) t -> + (unit, 'e) result Lwt.t (** There is no [iter_ep] in [Bare]. The reason is that there can be two sources of failures and there is no satisfying way to combine failures for diff --git a/src/lib_lwt_result_stdlib/bare/structs/seq_e.ml b/src/lib_lwt_result_stdlib/bare/structs/seq_e.ml index ba08cd3b10bf..f98d54ebe333 100644 --- a/src/lib_lwt_result_stdlib/bare/structs/seq_e.ml +++ b/src/lib_lwt_result_stdlib/bare/structs/seq_e.ml @@ -129,6 +129,21 @@ let iter_p f seq = in iter_p [] f seq +let iter_p ?max_concurrency f seq = + match max_concurrency with + | None -> iter_p f seq + | Some n -> + let n = max n 1 in + let p = PoolLight.create n in + let f x = PoolLight.use p (fun () -> f x) in + let rec traverse acc seq = + PoolLight.use p (fun () -> Lwt.return (seq ())) >>= function + | Error _ as e -> join_p acc >>= fun () -> Lwt.return e + | Ok Nil -> join_p acc >>= fun () -> Monad.unit_es + | Ok (Cons (item, seq)) -> traverse (f item :: acc) seq + in + traverse [] seq + let rec map f seq () = seq () >|? function Nil -> Nil | Cons (item, seq) -> Cons (f item, map f seq) diff --git a/src/lib_lwt_result_stdlib/test/test_fuzzing_seq_max_concurrency.ml b/src/lib_lwt_result_stdlib/test/test_fuzzing_seq_max_concurrency.ml index a11133046dfe..c4759811b05e 100644 --- a/src/lib_lwt_result_stdlib/test/test_fuzzing_seq_max_concurrency.ml +++ b/src/lib_lwt_result_stdlib/test/test_fuzzing_seq_max_concurrency.ml @@ -26,13 +26,21 @@ open Support.Lib open Test_fuzzing_helpers -module type TESTABLE = sig +module type TESTABLE_P = sig type 'a t val name : string val iter_p : ?max_concurrency:int -> ('a -> unit Lwt.t) -> 'a t -> unit Lwt.t + val of_list : 'a list -> 'a t +end + +module type TESTABLE_EP = sig + type 'a t + + val name : string + val iter_ep : ?max_concurrency:int -> ('a -> (unit, 'e Monad.trace) result Lwt.t) -> @@ -42,7 +50,23 @@ module type TESTABLE = sig val of_list : 'a list -> 'a t end -module SeqTestable : TESTABLE = struct +module type TESTABLE_P_EP = sig + type 'a t + + val name : string + + val iter_p : ?max_concurrency:int -> ('a -> unit Lwt.t) -> 'a t -> unit Lwt.t + + val iter_ep : + ?max_concurrency:int -> + ('a -> (unit, 'e Monad.trace) result Lwt.t) -> + 'a t -> + (unit, 'e Monad.trace) result Lwt.t + + val of_list : 'a list -> 'a t +end + +module SeqTestable : TESTABLE_P_EP = struct type 'a t = 'a Seq.t let name = "Seq" @@ -54,7 +78,7 @@ module SeqTestable : TESTABLE = struct let of_list = List.to_seq end -module SeqsTestable : TESTABLE = struct +module SeqsTestable : TESTABLE_P_EP = struct type 'a t = 'a Seq_s.t let name = "Seq_s" @@ -66,7 +90,21 @@ module SeqsTestable : TESTABLE = struct let of_list l = Seq_s.of_seq @@ List.to_seq l end -let iter_p_max_is_max (module M : TESTABLE) = +module SeqeTestable : TESTABLE_P = struct + type 'a t = ('a, unit) Seq_e.t + + let name = "Seq_e" + + let iter_p ?max_concurrency f s = + let open Monad in + Seq_e.iter_p ?max_concurrency f s >>= function + | Ok o -> Lwt.return o + | Error () -> assert false + + let of_list l = Seq_e.of_seq @@ List.to_seq l +end + +let iter_p_max_is_max (module M : TESTABLE_P) = let open QCheck in Test.make ~name: @@ -95,7 +133,7 @@ let iter_p_max_is_max (module M : TESTABLE) = in Lwt_main.run p) -let iter_ep_max_is_max (module M : TESTABLE) = +let iter_ep_max_is_max (module M : TESTABLE_EP) = let open QCheck in Test.make ~name: @@ -127,13 +165,14 @@ let iter_ep_max_is_max (module M : TESTABLE) = let max_is_max = [ - iter_p_max_is_max (module SeqTestable : TESTABLE); - iter_p_max_is_max (module SeqsTestable : TESTABLE); - iter_ep_max_is_max (module SeqTestable : TESTABLE); - iter_ep_max_is_max (module SeqsTestable : TESTABLE); + iter_p_max_is_max (module SeqTestable : TESTABLE_P); + iter_p_max_is_max (module SeqsTestable : TESTABLE_P); + iter_p_max_is_max (module SeqeTestable : TESTABLE_P); + iter_ep_max_is_max (module SeqTestable : TESTABLE_EP); + iter_ep_max_is_max (module SeqsTestable : TESTABLE_EP); ] -let iter_p_w_wo_max (module M : TESTABLE) = +let iter_p_w_wo_max (module M : TESTABLE_P) = let open QCheck in Test.make ~name:(Format.asprintf "%s.iter_p {~max_concurrency,}" M.name) @@ -154,7 +193,7 @@ let iter_p_w_wo_max (module M : TESTABLE) = (M.of_list input) >>= fun () -> Monad.return !acc)) -let iter_ep_w_wo_max (module M : TESTABLE) = +let iter_ep_w_wo_max (module M : TESTABLE_EP) = let open QCheck in Test.make ~name:(Format.asprintf "%s.iter_p {~max_concurrency,}" M.name) @@ -177,10 +216,11 @@ let iter_ep_w_wo_max (module M : TESTABLE) = let w_wo = [ - iter_p_w_wo_max (module SeqTestable : TESTABLE); - iter_p_w_wo_max (module SeqsTestable : TESTABLE); - iter_ep_w_wo_max (module SeqTestable : TESTABLE); - iter_ep_w_wo_max (module SeqsTestable : TESTABLE); + iter_p_w_wo_max (module SeqTestable : TESTABLE_P); + iter_p_w_wo_max (module SeqsTestable : TESTABLE_P); + iter_p_w_wo_max (module SeqeTestable : TESTABLE_P); + iter_ep_w_wo_max (module SeqTestable : TESTABLE_EP); + iter_ep_w_wo_max (module SeqsTestable : TESTABLE_EP); ] let () = diff --git a/src/lib_lwt_result_stdlib/traced/sigs/seq_e.ml b/src/lib_lwt_result_stdlib/traced/sigs/seq_e.ml index 8e267bea458b..a79be33ab399 100644 --- a/src/lib_lwt_result_stdlib/traced/sigs/seq_e.ml +++ b/src/lib_lwt_result_stdlib/traced/sigs/seq_e.ml @@ -41,7 +41,7 @@ module type S = following traversors that are not supported in the [Bare] version. We will add those in future versions. - - [iter_p] which returns a [Trace.cons] of the sequence-interruption and the + - [iter_ep] which returns a [Trace.cons] of the sequence-interruption and the [Trace.conp_list] of the iterator errors. We need to decide on a semantic: which error is consed "above" the other? -- GitLab From 97b380198e14640fec333eb125d10cde66cece80 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Proust?= Date: Mon, 24 May 2021 16:06:32 +0200 Subject: [PATCH 4/6] Lwtreslib: Add max_concurrency to Hastbl*.iter_*p --- .../bare/functor_outputs/hashtbl.ml | 8 ++++++-- src/lib_lwt_result_stdlib/bare/structs/hashtbl.ml | 12 ++++++++---- .../traced/functor_outputs/hashtbl.ml | 2 ++ src/lib_lwt_result_stdlib/traced/structs/hashtbl.ml | 6 ++++-- 4 files changed, 20 insertions(+), 8 deletions(-) diff --git a/src/lib_lwt_result_stdlib/bare/functor_outputs/hashtbl.ml b/src/lib_lwt_result_stdlib/bare/functor_outputs/hashtbl.ml index b6a6a39ff227..7f36bb9ef3cf 100644 --- a/src/lib_lwt_result_stdlib/bare/functor_outputs/hashtbl.ml +++ b/src/lib_lwt_result_stdlib/bare/functor_outputs/hashtbl.ml @@ -60,7 +60,8 @@ module type S = sig val iter_s : (key -> 'a -> unit Lwt.t) -> 'a t -> unit Lwt.t - val iter_p : (key -> 'a -> unit Lwt.t) -> 'a t -> unit Lwt.t + val iter_p : + ?max_concurrency:int -> (key -> 'a -> unit Lwt.t) -> 'a t -> unit Lwt.t val iter_e : (key -> 'a -> (unit, 'trace) result) -> 'a t -> (unit, 'trace) result @@ -71,6 +72,7 @@ module type S = sig (unit, 'trace) result Lwt.t val iter_ep : + ?max_concurrency:int -> (key -> 'a -> (unit, 'error) result Lwt.t) -> 'a t -> (unit, 'error list) result Lwt.t @@ -150,7 +152,8 @@ module type SeededS = sig val iter_s : (key -> 'a -> unit Lwt.t) -> 'a t -> unit Lwt.t - val iter_p : (key -> 'a -> unit Lwt.t) -> 'a t -> unit Lwt.t + val iter_p : + ?max_concurrency:int -> (key -> 'a -> unit Lwt.t) -> 'a t -> unit Lwt.t val iter_e : (key -> 'a -> (unit, 'trace) result) -> 'a t -> (unit, 'trace) result @@ -161,6 +164,7 @@ module type SeededS = sig (unit, 'trace) result Lwt.t val iter_ep : + ?max_concurrency:int -> (key -> 'a -> (unit, 'error) result Lwt.t) -> 'a t -> (unit, 'error list) result Lwt.t diff --git a/src/lib_lwt_result_stdlib/bare/structs/hashtbl.ml b/src/lib_lwt_result_stdlib/bare/structs/hashtbl.ml index ca3d41fa6839..82f84d7b696f 100644 --- a/src/lib_lwt_result_stdlib/bare/structs/hashtbl.ml +++ b/src/lib_lwt_result_stdlib/bare/structs/hashtbl.ml @@ -46,9 +46,11 @@ module Make (H : Stdlib.Hashtbl.HashedType) : S with type key = H.t = struct let iter_es f t = iter_es (fun (k, v) -> f k v) (to_seq t) - let iter_p f t = iter_p (fun (k, v) -> f k v) (to_seq t) + let iter_p ?max_concurrency f t = + iter_p ?max_concurrency (fun (k, v) -> f k v) (to_seq t) - let iter_ep f t = iter_ep (fun (k, v) -> f k v) (to_seq t) + let iter_ep ?max_concurrency f t = + iter_ep ?max_concurrency (fun (k, v) -> f k v) (to_seq t) let fold_e f t init = fold_left_e (fun acc (k, v) -> f k v acc) init (to_seq t) @@ -81,9 +83,11 @@ module MakeSeeded (H : Stdlib.Hashtbl.SeededHashedType) : let iter_es f t = iter_es (fun (k, v) -> f k v) (to_seq t) - let iter_ep f t = iter_ep (fun (k, v) -> f k v) (to_seq t) + let iter_ep ?max_concurrency f t = + iter_ep ?max_concurrency (fun (k, v) -> f k v) (to_seq t) - let iter_p f t = iter_p (fun (k, v) -> f k v) (to_seq t) + let iter_p ?max_concurrency f t = + iter_p ?max_concurrency (fun (k, v) -> f k v) (to_seq t) let fold_e f t init = fold_left_e (fun acc (k, v) -> f k v acc) init (to_seq t) diff --git a/src/lib_lwt_result_stdlib/traced/functor_outputs/hashtbl.ml b/src/lib_lwt_result_stdlib/traced/functor_outputs/hashtbl.ml index ddd8ffb0bf08..1a30b8d7e0ce 100644 --- a/src/lib_lwt_result_stdlib/traced/functor_outputs/hashtbl.ml +++ b/src/lib_lwt_result_stdlib/traced/functor_outputs/hashtbl.ml @@ -42,6 +42,7 @@ module type S = sig type 'error trace val iter_ep : + ?max_concurrency:int -> (key -> 'a -> (unit, 'error trace) result Lwt.t) -> 'a t -> (unit, 'error trace) result Lwt.t @@ -66,6 +67,7 @@ module type SeededS = sig type 'error trace val iter_ep : + ?max_concurrency:int -> (key -> 'a -> (unit, 'error trace) result Lwt.t) -> 'a t -> (unit, 'error trace) result Lwt.t diff --git a/src/lib_lwt_result_stdlib/traced/structs/hashtbl.ml b/src/lib_lwt_result_stdlib/traced/structs/hashtbl.ml index c68f651c4e0d..e4d91cf8f09c 100644 --- a/src/lib_lwt_result_stdlib/traced/structs/hashtbl.ml +++ b/src/lib_lwt_result_stdlib/traced/structs/hashtbl.ml @@ -44,7 +44,8 @@ struct module Make (H : Stdlib.Hashtbl.HashedType) : S with type key = H.t = struct include Bare_structs.Hashtbl.Make (H) - let iter_ep f t = Seq.iter_ep (fun (k, v) -> f k v) (to_seq t) + let iter_ep ?max_concurrency f t = + Seq.iter_ep ?max_concurrency (fun (k, v) -> f k v) (to_seq t) end module type SeededS = @@ -55,7 +56,8 @@ struct SeededS with type key = H.t = struct include Bare_structs.Hashtbl.MakeSeeded (H) - let iter_ep f t = Seq.iter_ep (fun (k, v) -> f k v) (to_seq t) + let iter_ep ?max_concurrency f t = + Seq.iter_ep ?max_concurrency (fun (k, v) -> f k v) (to_seq t) end module type S_ES = -- GitLab From 303b003da17430c84f3d0f20752918adc1a904a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Proust?= Date: Tue, 25 May 2021 08:11:28 +0200 Subject: [PATCH 5/6] Lwtreslib: small doc improvement --- src/lib_lwt_result_stdlib/bare/sigs/monad.ml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/lib_lwt_result_stdlib/bare/sigs/monad.ml b/src/lib_lwt_result_stdlib/bare/sigs/monad.ml index 81df6077f1eb..47721f5fbfba 100644 --- a/src/lib_lwt_result_stdlib/bare/sigs/monad.ml +++ b/src/lib_lwt_result_stdlib/bare/sigs/monad.ml @@ -26,7 +26,8 @@ (** {1 Lwt, result, and Lwt-result monad operators} This module provides the necessary functions and operators to use Lwt, - result and Lwt-result as a monad. + result and Lwt-result as a monad. There is some overlap with {!Lwt} and + {!Lwt_result}. {2 Basics} -- GitLab From c5edb69feca3c386963afdf0a79b40dc8f3108e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Proust?= Date: Tue, 1 Jun 2021 09:34:28 +0200 Subject: [PATCH 6/6] Lwtreslib: document `max_concurrency` as a general concept of the lib --- src/lib_lwt_result_stdlib/bare/sigs/seq.ml | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/lib_lwt_result_stdlib/bare/sigs/seq.ml b/src/lib_lwt_result_stdlib/bare/sigs/seq.ml index fb22d46f96ff..09e2189fd370 100644 --- a/src/lib_lwt_result_stdlib/bare/sigs/seq.ml +++ b/src/lib_lwt_result_stdlib/bare/sigs/seq.ml @@ -63,6 +63,19 @@ All the traversal functions that are suffixed with [_ep] are within the combined error-and-Lwt monad. These function traverse the elements concurrently with a best-effort behaviour. + + Some of [_p]- and [_ep]-suffixed functions have a [max_concurrency] + parameter. If this parameter is omitted, there are not limits to the number + of concurrent calls to the traversal function that can happen concurrently. + More specifically, the library does not bound the number of pending promises + created by calls to the traversal function. If this parameter is set, then + there are at most this number (or 1, whichever is highest) pending traversal + promises at any given time. + + Note that setting [max_concurrency] to [1] causes the elements to be + processed one after the other just like with the [_s] (or [_es]) traversor. + However, the error management is different: [_p] (and [_ep]) traversors have + a best-effort semantic even if [max_concurrency] is set. *) (** {2 Special consideration} -- GitLab