From 92efc26f1a40d87819cd19b5c3e4a5e98f82e6e1 Mon Sep 17 00:00:00 2001 From: Martin Tomazic Date: Tue, 7 Feb 2023 18:02:53 +0100 Subject: [PATCH 01/11] DAC: Move data_streamer to lib_dac_node --- src/{bin_dac_node => lib_dac_node}/data_streamer.ml | 0 src/{bin_dac_node => lib_dac_node}/data_streamer.mli | 0 2 files changed, 0 insertions(+), 0 deletions(-) rename src/{bin_dac_node => lib_dac_node}/data_streamer.ml (100%) rename src/{bin_dac_node => lib_dac_node}/data_streamer.mli (100%) diff --git a/src/bin_dac_node/data_streamer.ml b/src/lib_dac_node/data_streamer.ml similarity index 100% rename from src/bin_dac_node/data_streamer.ml rename to src/lib_dac_node/data_streamer.ml diff --git a/src/bin_dac_node/data_streamer.mli b/src/lib_dac_node/data_streamer.mli similarity index 100% rename from src/bin_dac_node/data_streamer.mli rename to src/lib_dac_node/data_streamer.mli -- GitLab From fd800405e8372aa03e1989507a8cff6f7132aa1a Mon Sep 17 00:00:00 2001 From: Martin Tomazic Date: Thu, 9 Feb 2023 17:08:44 +0100 Subject: [PATCH 02/11] DAC: Make data streamer interface polymorphic via 'a t --- src/lib_dac_node/data_streamer.ml | 10 ++++------ src/lib_dac_node/data_streamer.mli | 26 +++++++++++++------------- 2 files changed, 17 insertions(+), 19 deletions(-) diff --git a/src/lib_dac_node/data_streamer.ml b/src/lib_dac_node/data_streamer.ml index 642726795818..2771b494f6b8 100644 --- a/src/lib_dac_node/data_streamer.ml +++ b/src/lib_dac_node/data_streamer.ml @@ -27,17 +27,15 @@ Implement a useful Root_hash_streamer *) module Root_hash_streamer = struct - type t = unit + type 'a t = unit type configuration = unit let init (_configuration : configuration) = () - let publish (_streamer : t) (_hash : Dac_plugin.Dac_hash.t) = - Lwt_result_syntax.return_unit + let publish (_streamer : 'a t) (_hash : 'a) = Lwt_result_syntax.return_unit - let make_subscription (_streamer : t) : - (Dac_plugin.Dac_hash.t Lwt_stream.t * Lwt_watcher.stopper) tzresult Lwt.t - = + let make_subscription (_streamer : 'a t) : + ('a Lwt_stream.t * Lwt_watcher.stopper) tzresult Lwt.t = Lwt_result_syntax.return @@ Lwt_watcher.create_fake_stream () end diff --git a/src/lib_dac_node/data_streamer.mli b/src/lib_dac_node/data_streamer.mli index 76f920a159a3..0541d754e8eb 100644 --- a/src/lib_dac_node/data_streamer.mli +++ b/src/lib_dac_node/data_streamer.mli @@ -23,29 +23,29 @@ (* *) (*****************************************************************************) -(** [Root_hash_streamer] manages the pub-sub mechanism for streaming root - page hashes from publishers to subscribers. Root hash refers to the - root hash of the DAC payload Merkle tree. +(** [Root_hash_streamer] is an in-memory data structure for handling pub-sub + mechanism of streaming data from publishers to subscribers. *) module Root_hash_streamer : sig - type t + (** ['a t] represents an instance of [Root_hash_streamer], where ['a] + is the type of the data that we stream. *) + type 'a t (* Streamer configuration. *) type configuration (** Initializes a [Root_hash_streamer.t] *) - val init : configuration -> t + val init : configuration -> 'a t - (** [publish streamer root_hash] publishes a [root_hash] to all attached - subscribers in [streamer]. + (** [publish streamer data] publishes [data] to all attached + subscribers of the [streamer]. *) - val publish : t -> Dac_plugin.Dac_hash.t -> unit tzresult Lwt.t + val publish : 'a t -> 'a -> unit tzresult Lwt.t - (** [make_subscription streamer] returns a new stream of hashes for the subscriber to - consume. An [Lwt_watcher.stopper] function is also returned for the - subscriber to close the stream. + (** [make_subscription streamer] returns a new stream of data for the + subscriber to consume. An [Lwt_watcher.stopper] function is also returned + for the subscriber to close the stream. *) val make_subscription : - t -> - (Dac_plugin.Dac_hash.t Lwt_stream.t * Lwt_watcher.stopper) tzresult Lwt.t + 'a t -> ('a Lwt_stream.t * Lwt_watcher.stopper) tzresult Lwt.t end -- GitLab From 4d9340d12833ddb86f39ee7d13c09cc558e4ac23 Mon Sep 17 00:00:00 2001 From: Martin Tomazic Date: Thu, 9 Feb 2023 17:28:16 +0100 Subject: [PATCH 03/11] DAC: Remove configuration from Root_hash_streamer --- src/lib_dac_node/data_streamer.ml | 4 +--- src/lib_dac_node/data_streamer.mli | 5 +---- 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/src/lib_dac_node/data_streamer.ml b/src/lib_dac_node/data_streamer.ml index 2771b494f6b8..2507cef9fbef 100644 --- a/src/lib_dac_node/data_streamer.ml +++ b/src/lib_dac_node/data_streamer.ml @@ -29,9 +29,7 @@ module Root_hash_streamer = struct type 'a t = unit - type configuration = unit - - let init (_configuration : configuration) = () + let init () = () let publish (_streamer : 'a t) (_hash : 'a) = Lwt_result_syntax.return_unit diff --git a/src/lib_dac_node/data_streamer.mli b/src/lib_dac_node/data_streamer.mli index 0541d754e8eb..458432f87045 100644 --- a/src/lib_dac_node/data_streamer.mli +++ b/src/lib_dac_node/data_streamer.mli @@ -31,11 +31,8 @@ module Root_hash_streamer : sig is the type of the data that we stream. *) type 'a t - (* Streamer configuration. *) - type configuration - (** Initializes a [Root_hash_streamer.t] *) - val init : configuration -> 'a t + val init : unit -> 'a t (** [publish streamer data] publishes [data] to all attached subscribers of the [streamer]. -- GitLab From 1ae5cafd305376c0b51372eb67d54f24fc4bdbb6 Mon Sep 17 00:00:00 2001 From: Martin Tomazic Date: Thu, 9 Feb 2023 17:30:24 +0100 Subject: [PATCH 04/11] DAC: Rename data streamer make_subscription to handle_subscribe --- src/lib_dac_node/data_streamer.ml | 2 +- src/lib_dac_node/data_streamer.mli | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/lib_dac_node/data_streamer.ml b/src/lib_dac_node/data_streamer.ml index 2507cef9fbef..21e57f50cdc4 100644 --- a/src/lib_dac_node/data_streamer.ml +++ b/src/lib_dac_node/data_streamer.ml @@ -33,7 +33,7 @@ module Root_hash_streamer = struct let publish (_streamer : 'a t) (_hash : 'a) = Lwt_result_syntax.return_unit - let make_subscription (_streamer : 'a t) : + let handle_subscribe (_streamer : 'a t) : ('a Lwt_stream.t * Lwt_watcher.stopper) tzresult Lwt.t = Lwt_result_syntax.return @@ Lwt_watcher.create_fake_stream () end diff --git a/src/lib_dac_node/data_streamer.mli b/src/lib_dac_node/data_streamer.mli index 458432f87045..769d79a6e6cb 100644 --- a/src/lib_dac_node/data_streamer.mli +++ b/src/lib_dac_node/data_streamer.mli @@ -39,10 +39,10 @@ module Root_hash_streamer : sig *) val publish : 'a t -> 'a -> unit tzresult Lwt.t - (** [make_subscription streamer] returns a new stream of data for the + (** [handle_subscribe streamer] returns a new stream of data for the subscriber to consume. An [Lwt_watcher.stopper] function is also returned for the subscriber to close the stream. *) - val make_subscription : + val handle_subscribe : 'a t -> ('a Lwt_stream.t * Lwt_watcher.stopper) tzresult Lwt.t end -- GitLab From 6584be6165c0f789f79e890696f776966892b7f7 Mon Sep 17 00:00:00 2001 From: Martin Tomazic Date: Thu, 9 Feb 2023 17:32:51 +0100 Subject: [PATCH 05/11] DAC: Remove redundant module nesting of Data_streamer --- src/lib_dac_node/data_streamer.ml | 15 +++++++------- src/lib_dac_node/data_streamer.mli | 33 ++++++++++++++---------------- 2 files changed, 22 insertions(+), 26 deletions(-) diff --git a/src/lib_dac_node/data_streamer.ml b/src/lib_dac_node/data_streamer.ml index 21e57f50cdc4..d7745d2d7899 100644 --- a/src/lib_dac_node/data_streamer.ml +++ b/src/lib_dac_node/data_streamer.ml @@ -26,14 +26,13 @@ (** FIXME: https://gitlab.com/tezos/tezos/-/issues/4740 Implement a useful Root_hash_streamer *) -module Root_hash_streamer = struct - type 'a t = unit - let init () = () +type 'a t = unit - let publish (_streamer : 'a t) (_hash : 'a) = Lwt_result_syntax.return_unit +let init () = () - let handle_subscribe (_streamer : 'a t) : - ('a Lwt_stream.t * Lwt_watcher.stopper) tzresult Lwt.t = - Lwt_result_syntax.return @@ Lwt_watcher.create_fake_stream () -end +let publish (_streamer : 'a t) (_hash : 'a) = Lwt_result_syntax.return_unit + +let handle_subscribe (_streamer : 'a t) : + ('a Lwt_stream.t * Lwt_watcher.stopper) tzresult Lwt.t = + Lwt_result_syntax.return @@ Lwt_watcher.create_fake_stream () diff --git a/src/lib_dac_node/data_streamer.mli b/src/lib_dac_node/data_streamer.mli index 769d79a6e6cb..f2498f87c768 100644 --- a/src/lib_dac_node/data_streamer.mli +++ b/src/lib_dac_node/data_streamer.mli @@ -23,26 +23,23 @@ (* *) (*****************************************************************************) -(** [Root_hash_streamer] is an in-memory data structure for handling pub-sub +(** [Data_streamer] is an in-memory data structure for handling pub-sub mechanism of streaming data from publishers to subscribers. *) -module Root_hash_streamer : sig - (** ['a t] represents an instance of [Root_hash_streamer], where ['a] - is the type of the data that we stream. *) - type 'a t - (** Initializes a [Root_hash_streamer.t] *) - val init : unit -> 'a t +(** ['a t] represents an instance of [Data_streamer], where ['a] + is the type of the data that we stream. *) +type 'a t - (** [publish streamer data] publishes [data] to all attached - subscribers of the [streamer]. - *) - val publish : 'a t -> 'a -> unit tzresult Lwt.t +(** Initializes an instance of [Data_streamer.t]. *) +val init : unit -> 'a t - (** [handle_subscribe streamer] returns a new stream of data for the - subscriber to consume. An [Lwt_watcher.stopper] function is also returned - for the subscriber to close the stream. - *) - val handle_subscribe : - 'a t -> ('a Lwt_stream.t * Lwt_watcher.stopper) tzresult Lwt.t -end +(** [publish streamer data] publishes [data] to all attached + subscribers of the [streamer]. *) +val publish : 'a t -> 'a -> unit tzresult Lwt.t + +(** [handle_subscribe streamer] returns a new stream of data for the + subscriber to consume. An [Lwt_watcher.stopper] function is also returned + for the subscriber to close the stream. *) +val handle_subscribe : + 'a t -> ('a Lwt_stream.t * Lwt_watcher.stopper) tzresult Lwt.t -- GitLab From bd7cdabe954a8dc298a89398e9d23e04cedae127 Mon Sep 17 00:00:00 2001 From: Martin Tomazic Date: Tue, 7 Feb 2023 18:17:43 +0100 Subject: [PATCH 06/11] DAC: Implement Data_streamer using Lwt_watcher --- src/lib_dac_node/data_streamer.ml | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/src/lib_dac_node/data_streamer.ml b/src/lib_dac_node/data_streamer.ml index d7745d2d7899..d84209b2cea7 100644 --- a/src/lib_dac_node/data_streamer.ml +++ b/src/lib_dac_node/data_streamer.ml @@ -2,6 +2,7 @@ (* *) (* Open Source License *) (* Copyright (c) 2023 TriliTech, *) +(* Copyright (c) 2023 Marigold *) (* *) (* Permission is hereby granted, free of charge, to any person obtaining a *) (* copy of this software and associated documentation files (the "Software"),*) @@ -23,16 +24,12 @@ (* *) (*****************************************************************************) -(** FIXME: https://gitlab.com/tezos/tezos/-/issues/4740 - Implement a useful Root_hash_streamer -*) +type 'a t = 'a Lwt_watcher.input -type 'a t = unit +let init () = Lwt_watcher.create_input () -let init () = () +let publish streamer hash = + Lwt_result_syntax.return @@ Lwt_watcher.notify streamer hash -let publish (_streamer : 'a t) (_hash : 'a) = Lwt_result_syntax.return_unit - -let handle_subscribe (_streamer : 'a t) : - ('a Lwt_stream.t * Lwt_watcher.stopper) tzresult Lwt.t = - Lwt_result_syntax.return @@ Lwt_watcher.create_fake_stream () +let handle_subscribe streamer = + Lwt_result_syntax.return @@ Lwt_watcher.create_stream streamer -- GitLab From 35b655648f72dbcd730c33e1f223308c7b608385 Mon Sep 17 00:00:00 2001 From: Martin Tomazic Date: Mon, 13 Feb 2023 19:23:43 +0100 Subject: [PATCH 07/11] Main: Add _octez_dac_node_lib_tests --- .gitlab/ci/jobs/packaging/opam_package.yml | 2 + dune-project | 1 + manifest/main.ml | 17 ++++++++ opam/tezos-dac-node-lib-test.opam | 26 ++++++++++++ src/lib_dac_node/test/dune | 27 +++++++++++++ src/lib_dac_node/test/main.ml | 47 ++++++++++++++++++++++ 6 files changed, 120 insertions(+) create mode 100644 opam/tezos-dac-node-lib-test.opam create mode 100644 src/lib_dac_node/test/dune create mode 100644 src/lib_dac_node/test/main.ml diff --git a/.gitlab/ci/jobs/packaging/opam_package.yml b/.gitlab/ci/jobs/packaging/opam_package.yml index f18421168fb0..7c4053d2bc47 100644 --- a/.gitlab/ci/jobs/packaging/opam_package.yml +++ b/.gitlab/ci/jobs/packaging/opam_package.yml @@ -611,6 +611,8 @@ opam:tezos-crypto-dal: # Ignoring unreleased package tezos-dac-node-lib. +# Ignoring unreleased package tezos-dac-node-lib-test. + # Ignoring unreleased package tezos-dal-016-PtMumbai. # Ignoring unreleased package tezos-dal-alpha. diff --git a/dune-project b/dune-project index 86074daf942d..4565a0c8033c 100644 --- a/dune-project +++ b/dune-project @@ -79,6 +79,7 @@ (package (name tezos-crypto-dal)) (package (name tezos-dac-alpha)) (package (name tezos-dac-node-lib)(allow_empty)) +(package (name tezos-dac-node-lib-test)(allow_empty)) (package (name tezos-dal-016-PtMumbai)) (package (name tezos-dal-alpha)) (package (name tezos-dal-node-lib)(allow_empty)) diff --git a/manifest/main.ml b/manifest/main.ml index ba3ce039ae85..96c122bf344e 100644 --- a/manifest/main.ml +++ b/manifest/main.ml @@ -3293,6 +3293,23 @@ let octez_dac_node_lib = octez_stdlib_unix |> open_; ] +let _octez_dac_node_lib_tests = + test + "main" + ~path:"src/lib_dac_node/test" + ~opam:"tezos-dac-node-lib-test" + ~synopsis:"Test for dac node lib" + ~deps: + [ + octez_stdlib |> open_; + octez_stdlib_unix |> open_; + octez_base |> open_ |> open_ ~m:"TzPervasives"; + octez_test_helpers |> open_; + octez_base_test_helpers |> open_; + octez_dac_node_lib |> open_; + alcotest_lwt; + ] + let octez_node_config = public_lib "octez-node-config" diff --git a/opam/tezos-dac-node-lib-test.opam b/opam/tezos-dac-node-lib-test.opam new file mode 100644 index 000000000000..f21b671a59dd --- /dev/null +++ b/opam/tezos-dac-node-lib-test.opam @@ -0,0 +1,26 @@ +# This file was automatically generated, do not edit. +# Edit file manifest/main.ml instead. +opam-version: "2.0" +maintainer: "contact@tezos.com" +authors: ["Tezos devteam"] +homepage: "https://www.tezos.com/" +bug-reports: "https://gitlab.com/tezos/tezos/issues" +dev-repo: "git+https://gitlab.com/tezos/tezos.git" +license: "MIT" +depends: [ + "dune" { >= "3.0" } + "ocaml" { >= "4.14" } + "tezos-stdlib" {with-test} + "tezos-stdlib-unix" {with-test} + "tezos-base" {with-test} + "tezos-test-helpers" {with-test} + "tezos-base-test-helpers" {with-test} + "tezos-dac-node-lib" {with-test} + "alcotest-lwt" { with-test & >= "1.5.0" } +] +build: [ + ["rm" "-r" "vendors"] + ["dune" "build" "-p" name "-j" jobs] + ["dune" "runtest" "-p" name "-j" jobs] {with-test} +] +synopsis: "Test for dac node lib" diff --git a/src/lib_dac_node/test/dune b/src/lib_dac_node/test/dune new file mode 100644 index 000000000000..1205238ef9a2 --- /dev/null +++ b/src/lib_dac_node/test/dune @@ -0,0 +1,27 @@ +; This file was automatically generated, do not edit. +; Edit file manifest/main.ml instead. + +(executable + (name main) + (libraries + tezos-stdlib + tezos-stdlib-unix + tezos-base + tezos-test-helpers + tezos-base-test-helpers + tezos_dac_node_lib + alcotest-lwt) + (flags + (:standard) + -open Tezos_stdlib + -open Tezos_stdlib_unix + -open Tezos_base + -open Tezos_base.TzPervasives + -open Tezos_test_helpers + -open Tezos_base_test_helpers + -open Tezos_dac_node_lib)) + +(rule + (alias runtest) + (package tezos-dac-node-lib-test) + (action (run %{dep:./main.exe}))) diff --git a/src/lib_dac_node/test/main.ml b/src/lib_dac_node/test/main.ml new file mode 100644 index 000000000000..ac7db01b4851 --- /dev/null +++ b/src/lib_dac_node/test/main.ml @@ -0,0 +1,47 @@ +(*****************************************************************************) +(* *) +(* Open Source License *) +(* Copyright (c) 2023 Marigold *) +(* *) +(* Permission is hereby granted, free of charge, to any person obtaining a *) +(* copy of this software and associated documentation files (the "Software"),*) +(* to deal in the Software without restriction, including without limitation *) +(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *) +(* and/or sell copies of the Software, and to permit persons to whom the *) +(* Software is furnished to do so, subject to the following conditions: *) +(* *) +(* The above copyright notice and this permission notice shall be included *) +(* in all copies or substantial portions of the Software. *) +(* *) +(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*) +(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *) +(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *) +(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*) +(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *) +(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *) +(* DEALINGS IN THE SOFTWARE. *) +(* *) +(*****************************************************************************) + +module Unit_test : sig + (** + * Example: [spec "Data_streamer.ml" Test_data_streamer.tests] + * Unit tests needs tag in log (like "[UNIT] some test description here...") + * This function handles such meta data *) + val spec : + string -> + unit Alcotest_lwt.test_case list -> + string * unit Alcotest_lwt.test_case list + + (** Tests with description string without [Unit] are skipped *) + val _skip : + string -> + unit Alcotest_lwt.test_case list -> + string * unit Alcotest_lwt.test_case list +end = struct + let spec unit_name test_cases = ("[Unit] " ^ unit_name, test_cases) + + let _skip unit_name test_cases = ("[SKIPPED] " ^ unit_name, test_cases) +end + +let () = Alcotest_lwt.run "protocol > unit" [] |> Lwt_main.run -- GitLab From 30aea7d09a296a7325bcdd31f9c2764b440f16b4 Mon Sep 17 00:00:00 2001 From: Martin Tomazic Date: Tue, 7 Feb 2023 18:32:45 +0100 Subject: [PATCH 08/11] DAC/test: Add simple pub sub test for data streamer --- src/lib_dac_node/test/main.ml | 6 ++- src/lib_dac_node/test/test_data_streamer.ml | 56 +++++++++++++++++++++ 2 files changed, 61 insertions(+), 1 deletion(-) create mode 100644 src/lib_dac_node/test/test_data_streamer.ml diff --git a/src/lib_dac_node/test/main.ml b/src/lib_dac_node/test/main.ml index ac7db01b4851..51faa35f86c2 100644 --- a/src/lib_dac_node/test/main.ml +++ b/src/lib_dac_node/test/main.ml @@ -44,4 +44,8 @@ end = struct let _skip unit_name test_cases = ("[SKIPPED] " ^ unit_name, test_cases) end -let () = Alcotest_lwt.run "protocol > unit" [] |> Lwt_main.run +let () = + Alcotest_lwt.run + "protocol > unit" + [Unit_test.spec "Data_streamer.ml" Test_data_streamer.tests] + |> Lwt_main.run diff --git a/src/lib_dac_node/test/test_data_streamer.ml b/src/lib_dac_node/test/test_data_streamer.ml new file mode 100644 index 000000000000..1f01d3b6386c --- /dev/null +++ b/src/lib_dac_node/test/test_data_streamer.ml @@ -0,0 +1,56 @@ +(*****************************************************************************) +(* *) +(* Open Source License *) +(* Copyright (c) 2023 Marigold *) +(* *) +(* Permission is hereby granted, free of charge, to any person obtaining a *) +(* copy of this software and associated documentation files (the "Software"),*) +(* to deal in the Software without restriction, including without limitation *) +(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *) +(* and/or sell copies of the Software, and to permit persons to whom the *) +(* Software is furnished to do so, subject to the following conditions: *) +(* *) +(* The above copyright notice and this permission notice shall be included *) +(* in all copies or substantial portions of the Software. *) +(* *) +(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*) +(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *) +(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *) +(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*) +(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *) +(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *) +(* DEALINGS IN THE SOFTWARE. *) +(* *) +(*****************************************************************************) + +(** Testing + ------- + Component: Lib_dac_node Data_streamer + Invocation: dune exec src/lib_dac_node/test/main.exe \ + -- test "^\[Unit\] Data_streamer.ml$" + Subject: Tests for the data streamer component. +*) + +let dac_hash_1 = "hash_1" + +let assert_equal_string expected actual = + Assert.equal + ~loc:__LOC__ + ~eq:String.equal + ~pp:Format.pp_print_string + expected + actual + +let test_simple_pub_sub () = + let open Lwt_result_syntax in + let open Data_streamer in + let streamer = init () in + let* stream, stopper = handle_subscribe streamer in + let* () = publish streamer dac_hash_1 in + let*! next = Lwt_stream.next stream in + let () = Lwt_watcher.shutdown stopper in + let*! is_empty = Lwt_stream.is_empty stream in + let () = Assert.assert_true "Expected empty stream." is_empty in + return @@ assert_equal_string dac_hash_1 next + +let tests = [Tztest.tztest "Simple pub sub" `Quick test_simple_pub_sub] -- GitLab From dcce04bdb53a11ba68e578f3576b3cb54dbbfca9 Mon Sep 17 00:00:00 2001 From: Martin Tomazic Date: Wed, 8 Feb 2023 16:33:52 +0100 Subject: [PATCH 09/11] DAC/test: Test subscription time matters --- src/lib_dac_node/test/test_data_streamer.ml | 36 ++++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/src/lib_dac_node/test/test_data_streamer.ml b/src/lib_dac_node/test/test_data_streamer.ml index 1f01d3b6386c..a80829503259 100644 --- a/src/lib_dac_node/test/test_data_streamer.ml +++ b/src/lib_dac_node/test/test_data_streamer.ml @@ -33,6 +33,8 @@ let dac_hash_1 = "hash_1" +let dac_hash_2 = "hash_2" + let assert_equal_string expected actual = Assert.equal ~loc:__LOC__ @@ -53,4 +55,36 @@ let test_simple_pub_sub () = let () = Assert.assert_true "Expected empty stream." is_empty in return @@ assert_equal_string dac_hash_1 next -let tests = [Tztest.tztest "Simple pub sub" `Quick test_simple_pub_sub] +let test_subscription_time () = + (* 1. subscriber_1 subscribes to the streamer component. + 2. [dac_hash_1] is published via the streamer component. + 3. subscriber_2 subscribes to the streamer component. + 4. [dac_hash_2] is published via streamer component. + + As such we expect: + - subscriber_1 first element inside the stream is [dac_hash_1] + and second one is [dac_hash_2]. + - subscriber_2 first element inside the stream is [dac_hash_2]. + + Note that subscriber_2 does not receive [dac_hash_1] as it was published + before his/her subscription to the streaming component. + *) + let open Lwt_result_syntax in + let open Data_streamer in + let streamer = init () in + let* stream_1, _stopper_1 = handle_subscribe streamer in + let* () = publish streamer dac_hash_1 in + let* stream_2, _stopper_2 = handle_subscribe streamer in + let* () = publish streamer dac_hash_2 in + let*! subscriber_1_first = Lwt_stream.next stream_1 in + let () = assert_equal_string dac_hash_1 subscriber_1_first in + let*! subscriber_1_second = Lwt_stream.next stream_1 in + let () = assert_equal_string dac_hash_2 subscriber_1_second in + let*! subscriber_2_first = Lwt_stream.next stream_2 in + return @@ assert_equal_string dac_hash_2 subscriber_2_first + +let tests = + [ + Tztest.tztest "Simple pub sub" `Quick test_simple_pub_sub; + Tztest.tztest "Test order of subscription" `Quick test_subscription_time; + ] -- GitLab From 7557f9c005cd0eab6655a5d8777a3393a0c8d39f Mon Sep 17 00:00:00 2001 From: Martin Tomazic Date: Mon, 13 Feb 2023 13:28:14 +0100 Subject: [PATCH 10/11] DAC/test: Test closing subscriber stream --- src/lib_dac_node/test/test_data_streamer.ml | 26 +++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/src/lib_dac_node/test/test_data_streamer.ml b/src/lib_dac_node/test/test_data_streamer.ml index a80829503259..e518c7b3fee6 100644 --- a/src/lib_dac_node/test/test_data_streamer.ml +++ b/src/lib_dac_node/test/test_data_streamer.ml @@ -83,8 +83,34 @@ let test_subscription_time () = let*! subscriber_2_first = Lwt_stream.next stream_2 in return @@ assert_equal_string dac_hash_2 subscriber_2_first +let test_closing_subscriber_stream () = + (* 1. subscriber_1 subscribes to the streamer component. + 2. subscriber_2 subscribes to the streamer component. + 3. subscriber_1's [stream_1] is closed. + 4. [dac_hash_1] is published via streamer component. + + As such we expect: + - subscriber_1's [stream_1] is empty. + - subscriber_2 first element inside the stream is [dac_hash_1]. + *) + let open Lwt_result_syntax in + let open Data_streamer in + let streamer = init () in + let* stream_1, stopper_1 = handle_subscribe streamer in + let* stream_2, _stopper_2 = handle_subscribe streamer in + let () = Lwt_watcher.shutdown stopper_1 in + let* () = publish streamer dac_hash_1 in + let*! is_empty = Lwt_stream.is_empty stream_1 in + let () = Assert.assert_true "[stream_1]: expected empty stream." is_empty in + let*! subscriber_2_first = Lwt_stream.next stream_2 in + return @@ assert_equal_string dac_hash_1 subscriber_2_first + let tests = [ Tztest.tztest "Simple pub sub" `Quick test_simple_pub_sub; Tztest.tztest "Test order of subscription" `Quick test_subscription_time; + Tztest.tztest + "Test closing subscriber stream" + `Quick + test_closing_subscriber_stream; ] -- GitLab From 2254216f10ad781b008eb839004af1501e1b1f7d Mon Sep 17 00:00:00 2001 From: Martin Tomazic Date: Mon, 13 Feb 2023 19:38:25 +0100 Subject: [PATCH 11/11] DAC: Add issue for making data streamer stopper abstract --- src/lib_dac_node/data_streamer.mli | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/lib_dac_node/data_streamer.mli b/src/lib_dac_node/data_streamer.mli index f2498f87c768..361511e47376 100644 --- a/src/lib_dac_node/data_streamer.mli +++ b/src/lib_dac_node/data_streamer.mli @@ -27,6 +27,12 @@ mechanism of streaming data from publishers to subscribers. *) +(* TODO https://gitlab.com/tezos/tezos/-/issues/4848 + To make [Data_streamer] interface implementation agnostic we should + replace [Lwt_watcher.stopper] with an abstract [stopper] type, + and add [unsubscribe] method that uses it. +*) + (** ['a t] represents an instance of [Data_streamer], where ['a] is the type of the data that we stream. *) type 'a t -- GitLab