diff --git a/src/bin_dac_node/main_dac.ml b/src/bin_dac_node/main_dac.ml
index 704dc7312a878a16c744ecb1397075d0d88da578..4f521573594082b9d3cbbc2e0e47db08a6278323 100644
--- a/src/bin_dac_node/main_dac.ml
+++ b/src/bin_dac_node/main_dac.ml
@@ -88,7 +88,13 @@ let config_init_command =
           data_dir;
           rpc_address;
           rpc_port;
-          mode = Legacy {threshold = 0; dac_members_addresses = []};
+          mode =
+            Legacy
+              {
+                threshold = 0;
+                dac_members_addresses = [];
+                dac_cctxt_config = None;
+              };
           reveal_data_dir = default_reveal_data_dir;
         }
       in
diff --git a/src/lib_dac_node/RPC_server.ml b/src/lib_dac_node/RPC_server.ml
index 00137e6fddaef04dfc1c1602eae27ba06577831b..a8a4204708ac8aea69d81bbca5f36880f5c25a47 100644
--- a/src/lib_dac_node/RPC_server.ml
+++ b/src/lib_dac_node/RPC_server.ml
@@ -58,13 +58,25 @@ let add_service registerer service handler directory =
   registerer directory service handler
 
 let handle_serialize_dac_store_preimage dac_plugin cctxt dac_sk_uris page_store
-    (data, pagination_scheme) =
+    hash_streamer (data, pagination_scheme) =
   let open Lwt_result_syntax in
   let open Pages_encoding in
   let* root_hash =
     match pagination_scheme with
     | Merkle_tree_V0 ->
-        Merkle_tree.V0.serialize_payload dac_plugin ~page_store data
+        (* FIXME: https://gitlab.com/tezos/tezos/-/issues/4897
+           Once the new "PUT /preimage" endpoint is implemented, pushing
+           a new root hash to the data streamer should be moved there.
+           The Tezt test for the streaming of root hashes should also use
+           the new endpoint. *)
+        let* root_hash =
+          Merkle_tree.V0.serialize_payload dac_plugin ~page_store data
+        in
+        let* () = Data_streamer.publish hash_streamer root_hash in
+        let*! () =
+          Event.emit_root_hash_pushed_to_data_streamer dac_plugin root_hash
+        in
+        return root_hash
     | Hash_chain_V0 ->
         Hash_chain.V0.serialize_payload
           dac_plugin
@@ -106,7 +118,7 @@ let handle_retrieve_preimage dac_plugin page_store hash =
   Page_store.Filesystem.load dac_plugin page_store ~hash
 
 let register_serialize_dac_store_preimage ctx cctxt dac_sk_uris page_store
-    directory =
+    hash_streamer directory =
   directory
   |> add_service
        Tezos_rpc.Directory.register0
@@ -117,6 +129,7 @@ let register_serialize_dac_store_preimage ctx cctxt dac_sk_uris page_store
            cctxt
            dac_sk_uris
            page_store
+           hash_streamer
           input)
 
 let register_verify_external_message_signature ctx public_keys_opt directory =
@@ -136,7 +149,27 @@ let register_retrieve_preimage dac_plugin page_store =
      (RPC_services.retrieve_preimage dac_plugin)
      (fun hash () () -> handle_retrieve_preimage dac_plugin page_store hash)
 
-let register dac_plugin reveal_data_dir cctxt dac_public_keys_opt dac_sk_uris =
+let register_monitor_root_hashes dac_plugin hash_streamer dir =
+  (* Handler for subscribing to the streaming of root hashes via the
+     GET /monitor/root_hashes RPC call. *)
+  let handle_monitor_root_hashes =
+    let open Lwt_syntax in
+    let* handle = Data_streamer.handle_subscribe hash_streamer in
+    match handle with
+    | Ok (stream, stopper) ->
+        let* () = Event.(emit handle_new_subscription_to_hash_streamer ()) in
+        let shutdown () = Lwt_watcher.shutdown stopper in
+        let next () = Lwt_stream.get stream in
+        Tezos_rpc.Answer.return_stream {next; shutdown}
+    | Error error -> Tezos_rpc.Answer.fail error
+  in
+  Tezos_rpc.Directory.gen_register
+    dir
+    (Monitor_services.S.root_hashes dac_plugin)
+    (fun () () () -> handle_monitor_root_hashes)
+
+let register dac_plugin reveal_data_dir cctxt dac_public_keys_opt dac_sk_uris
+    hash_streamer =
   let page_store = Page_store.Filesystem.init reveal_data_dir in
   Tezos_rpc.Directory.empty
   |> register_serialize_dac_store_preimage
@@ -144,8 +177,10 @@ let register dac_plugin reveal_data_dir cctxt dac_public_keys_opt dac_sk_uris =
        cctxt
        dac_sk_uris
        page_store
+       hash_streamer
   |> register_verify_external_message_signature dac_plugin dac_public_keys_opt
   |> register_retrieve_preimage dac_plugin page_store
+  |> register_monitor_root_hashes dac_plugin hash_streamer
 
 (* TODO: https://gitlab.com/tezos/tezos/-/issues/4750
    Move this to RPC_server.Legacy once all operating modes are supported. *)
@@ -158,7 +193,7 @@ let start_legacy ~rpc_address ~rpc_port ~reveal_data_dir ~threshold cctxt ctxt
       Tezos_rpc.Path.open_root
       (fun () ->
         match Node_context.get_status ctxt with
-        | Ready {dac_plugin = (module Dac_plugin); _} ->
+        | Ready {dac_plugin = (module Dac_plugin); hash_streamer} ->
            let _threshold = threshold in
            Lwt.return
              (register
@@ -166,7 +201,8 @@ let start_legacy ~rpc_address ~rpc_port ~reveal_data_dir ~threshold cctxt ctxt
                 reveal_data_dir
                 cctxt
                 dac_pks_opt
-                dac_sk_uris)
+                dac_sk_uris
+                hash_streamer)
        | Starting -> Lwt.return Tezos_rpc.Directory.empty)
   in
   let rpc_address = P2p_addr.of_string_exn rpc_address in
diff --git a/src/lib_dac_node/RPC_services.ml b/src/lib_dac_node/RPC_services.ml
index 3af3feee3b917b3b48dc84ef76452761937ef598..a7efc0b4c504fe99bb0b312283e3a829dea8c045 100644
--- a/src/lib_dac_node/RPC_services.ml
+++ b/src/lib_dac_node/RPC_services.ml
@@ -58,7 +58,7 @@ let external_message_query =
   |> seal
 
 let dac_store_preimage ctx =
-  Tezos_rpc.Service.put_service
+  Tezos_rpc.Service.post_service
     ~description:"Split DAC reveal data"
     ~query:Tezos_rpc.Query.empty
     ~input:store_preimage_request_encoding
diff --git a/src/lib_dac_node/configuration.ml b/src/lib_dac_node/configuration.ml
index 88f15234582e86d8a36ff051f6d199f4d482212e..b1f2455ab127095379015dfa0bfca8ba5a4d059a 100644
--- a/src/lib_dac_node/configuration.ml
+++ b/src/lib_dac_node/configuration.ml
@@ -24,6 +24,8 @@
 (*                                                                           *)
 (*****************************************************************************)
 
+type host_and_port = {host : string; port : int}
+
 type coordinator = {
   threshold : int;
   dac_members_addresses : Tezos_crypto.Aggregate_signature.public_key_hash list;
@@ -40,6 +42,7 @@ type observer = {coordinator_rpc_address : string; coordinator_rpc_port : int}
 type legacy = {
   threshold : int;
   dac_members_addresses : Tezos_crypto.Aggregate_signature.public_key_hash list;
+  dac_cctxt_config : host_and_port option;
 }
 
 type mode =
@@ -77,6 +80,13 @@ let default_reveal_data_dir =
     (Filename.concat (Sys.getenv "HOME") ".tezos-smart-rollup-node")
     "wasm_2_0_0"
 
+let host_and_port_encoding =
+  let open Data_encoding in
+  conv
+    (fun {host; port} -> (host, port))
+    (fun (host, port) -> {host; port})
+    (obj2 (req "rpc-host" string) (req "rpc-port" uint16))
+
 let coordinator_encoding =
   Data_encoding.(
     conv_with_guard
@@ -120,17 +130,18 @@ let observer_encoding =
 let legacy_encoding =
   Data_encoding.(
     conv_with_guard
-      (fun {threshold; dac_members_addresses} ->
-        (threshold, dac_members_addresses, true))
-      (fun (threshold, dac_members_addresses, legacy) ->
-        if legacy then Ok {threshold; dac_members_addresses}
-        else Error "legacy flag should be set to true")
-      (obj3
+      (fun {threshold; dac_members_addresses; dac_cctxt_config} ->
+        (threshold, dac_members_addresses, dac_cctxt_config, true))
+      (fun (threshold, dac_members_addresses, dac_cctxt_config, legacy) ->
+        if legacy then Ok {threshold; dac_members_addresses; dac_cctxt_config}
+        else Error "'legacy' flag should be set to true")
+      (obj4
         (dft "threshold" uint8 default_dac_threshold)
         (dft
            "dac_members"
           (list Tezos_crypto.Aggregate_signature.Public_key_hash.encoding)
           default_dac_addresses)
+        (opt "dac_cctxt_config" host_and_port_encoding)
         (req "legacy" bool)))
 
 let mode_config_encoding =
diff --git a/src/lib_dac_node/configuration.mli b/src/lib_dac_node/configuration.mli
index 98b2f633d0abd204e0fb9d171e270f8c3d7fc814..166c67eeba75205e41600448eb2c730d0ddee3e4 100644
--- a/src/lib_dac_node/configuration.mli
+++ b/src/lib_dac_node/configuration.mli
@@ -24,6 +24,10 @@
 (*                                                                           *)
 (*****************************************************************************)
 
+(** [host_and_port] holds the server configuration details of another dac node
+    required to instantiate [Dac_node_client.(#cctxt)]. *)
+type host_and_port = {host : string; port : int}
+
 (** Configuration type for coordinators. *)
 type coordinator = {
   threshold : int;
@@ -54,6 +58,10 @@ type legacy = {
          message valid. *)
   dac_members_addresses : Tezos_crypto.Aggregate_signature.public_key_hash list;
      (** The list of tz4 addresses denoting the dac members. *)
+  dac_cctxt_config : host_and_port option;
+      (** When running integration tests with multiple dac nodes in the
+          [legacy] mode, [dac_cctxt_config] is used to create a
+          [Dac_node_client.cctxt] for the node that mimics the coordinator. *)
 }
 
 (* TODO: https://gitlab.com/tezos/tezos/-/issues/4707.
diff --git a/src/lib_dac_node/dac_node_client.ml b/src/lib_dac_node/dac_node_client.ml
index f92b7ce43baddd77f5b617f7bd150ed7441023cd..b4f778c9e79c32477c7fd19d1479375444d5d630 100644
--- a/src/lib_dac_node/dac_node_client.ml
+++ b/src/lib_dac_node/dac_node_client.ml
@@ -23,9 +23,6 @@
 (*                                                                           *)
 (*****************************************************************************)
 
-(* TODO https://gitlab.com/tezos/tezos/-/issues/4705
-   Dac node client context should be properly tested *)
-
 class type cctxt =
   object
     inherit Tezos_rpc.Context.generic
@@ -41,13 +38,7 @@ class unix_cctxt ~rpc_config : cctxt =
   end
 
 let make_unix_cctxt ~scheme ~host ~port =
-  let endpoint =
-    Uri.with_uri
-      ~scheme:(Some scheme)
-      ~host:(Some host)
-      ~port:(Some port)
-      Uri.empty
-  in
+  let endpoint = Uri.make ~scheme ~host ~port () in
   let rpc_config =
     {Tezos_rpc_http_client_unix.RPC_client_unix.default_config with endpoint}
   in
diff --git a/src/lib_dac_node/daemon.ml b/src/lib_dac_node/daemon.ml
index bff5471abf00202676aaed53f5c6e64e26e5b685..273342f06a418f76c05be690970c08fb0c240ce8 100644
--- a/src/lib_dac_node/daemon.ml
+++ b/src/lib_dac_node/daemon.ml
@@ -113,6 +113,22 @@ module Handler = struct
     make_stream_daemon
       handler
       (Tezos_shell_services.Monitor_services.heads cctxt `Main)
+
+  (** This handler will be invoked only when a [coordinator_cctxt] is specified
+      in the DAC node configuration. The DAC node tries to subscribe to the
+      stream of root hashes via the streamed GET /monitor/root_hashes RPC call
+      to the dac node corresponding to [coordinator_cctxt]. *)
+  let new_root_hash ctxt coordinator_cctxt =
+    let open Lwt_result_syntax in
+    let handler dac_plugin _stopper root_hash =
+      let*! () = Event.emit_new_root_hash_received dac_plugin root_hash in
+      return_unit
+    in
+    let*? dac_plugin = Node_context.get_dac_plugin ctxt in
+    let*! () = Event.(emit subscribed_to_root_hashes_stream ()) in
+    make_stream_daemon
+      (handler dac_plugin)
+      (Monitor_services.root_hashes coordinator_cctxt dac_plugin)
 end
 
 let daemonize handlers =
@@ -140,10 +156,11 @@ let run ~data_dir cctxt =
     Configuration.load ~data_dir
   in
   let* () = Dac_manager.Storage.ensure_reveal_data_dir_exists reveal_data_dir in
-  let* addresses, threshold =
+  let* addresses, threshold, coordinator_config_opt =
     match mode with
-    | Configuration.Legacy {dac_members_addresses; threshold} ->
-        return (dac_members_addresses, threshold)
+    | Configuration.Legacy {dac_members_addresses; threshold; dac_cctxt_config}
+      ->
+        return (dac_members_addresses, threshold, dac_cctxt_config)
     | Configuration.Coordinator _ -> tzfail @@ Mode_not_supported "coordinator"
     | Configuration.Dac_member _ -> tzfail @@ Mode_not_supported "dac_member"
    | Configuration.Observer _ -> tzfail @@ Mode_not_supported "observer"
@@ -159,7 +176,13 @@ let run ~data_dir cctxt =
        | Some (_pkh, pk_opt, sk_uri) -> (pk_opt, Some sk_uri))
    |> List.split
   in
-  let ctxt = Node_context.init config cctxt in
+  let coordinator_cctxt_opt =
+    Option.map
+      (fun Configuration.{host; port} ->
+        Dac_node_client.make_unix_cctxt ~scheme:"http" ~host ~port)
+      coordinator_config_opt
+  in
+  let ctxt = Node_context.init config cctxt coordinator_cctxt_opt in
   let* rpc_server =
     RPC_server.(
       start_legacy
@@ -178,5 +201,13 @@ let run ~data_dir cctxt =
   in
   (* Start daemon to resolve current protocol plugin *)
   let* () = daemonize [Handler.resolve_plugin_and_set_ready ctxt cctxt] in
-  (* Start never-ending monitoring daemons *)
-  daemonize [Handler.new_head ctxt cctxt]
+  (* Start never-ending monitoring daemons. [coordinator_cctxt] is required to
+     monitor new root hashes in legacy mode. *)
+  match coordinator_cctxt_opt with
+  | None -> daemonize [Handler.new_head ctxt cctxt]
+  | Some coordinator_cctxt ->
+      daemonize
+        [
+          Handler.new_head ctxt cctxt;
+          Handler.new_root_hash ctxt coordinator_cctxt;
+        ]
diff --git a/src/lib_dac_node/event.ml b/src/lib_dac_node/event.ml
index b70f250a097b915977abd98c9b41741e9ffc98bc..592ed75b942e07324e9ad829c44cfa2ac9063f39 100644
--- a/src/lib_dac_node/event.ml
+++ b/src/lib_dac_node/event.ml
@@ -163,6 +163,40 @@ let dac_account_cannot_sign =
     ~level:Warning
     ("tz4_account", Tezos_crypto.Aggregate_signature.Public_key_hash.encoding)
 
+let handle_new_subscription_to_hash_streamer =
+  declare_0
+    ~section
+    ~name:"handle_new_subscription_to_hash_streamer"
+    ~msg:
+      "Subscription of another dac node to the hash streamer handled \
+       successfully."
+    ~level:Notice
+    ()
+
+let subscribed_to_root_hashes_stream =
+  declare_0
+    ~section
+    ~name:"subscribed_to_root_hashes_stream"
+    ~msg:"Subscribed to root hashes stream"
+    ~level:Notice
+    ()
+
+let new_root_hash_received =
+  declare_1
+    ~section
+    ~name:"dac_node_new_root_hash_received"
+    ~msg:"Received new root hash via monitoring rpc {root_hash}"
+    ~level:Notice
+    ("root_hash", Data_encoding.string)
+
+let new_hash_pushed_to_data_streamer =
+  declare_1
+    ~section
+    ~name:"root_hash_pushed_to_the_data_streamer"
+    ~msg:"New root hash pushed to the data streamer: {root_hash}"
+    ~level:Notice
+    ("root_hash", Data_encoding.string)
+
 let proto_short_hash_string hash =
   Format.asprintf "%a" Protocol_hash.pp_short hash
 
@@ -174,3 +208,9 @@ let emit_protocol_plugin_not_resolved current_protocol next_protocol =
     protocol_plugin_not_resolved
       ( proto_short_hash_string current_protocol,
        proto_short_hash_string next_protocol )
+
+let emit_new_root_hash_received ((module P) : Dac_plugin.t) hash =
+  emit new_root_hash_received (P.to_hex hash)
+
+let emit_root_hash_pushed_to_data_streamer ((module P) : Dac_plugin.t) hash =
+  emit new_hash_pushed_to_data_streamer (P.to_hex hash)
diff --git a/src/lib_dac_node/monitor_services.ml b/src/lib_dac_node/monitor_services.ml
new file mode 100644
index 0000000000000000000000000000000000000000..e39debbace9260ff559522f557dc652d2485fbb7
--- /dev/null
+++ b/src/lib_dac_node/monitor_services.ml
@@ -0,0 +1,43 @@
+(*****************************************************************************)
+(*                                                                           *)
+(* Open Source License                                                       *)
+(* Copyright (c) 2023 Marigold                                               *)
+(*                                                                           *)
+(* Permission is hereby granted, free of charge, to any person obtaining a   *)
+(* copy of this software and associated documentation files (the "Software"),*)
+(* to deal in the Software without restriction, including without limitation *)
+(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
+(* and/or sell copies of the Software, and to permit persons to whom the     *)
+(* Software is furnished to do so, subject to the following conditions:      *)
+(*                                                                           *)
+(* The above copyright notice and this permission notice shall be included   *)
+(* in all copies or substantial portions of the Software.                    *)
+(*                                                                           *)
+(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
+(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
+(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
+(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
+(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
+(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
+(* DEALINGS IN THE SOFTWARE.                                                 *)
+(*                                                                           *)
+(*****************************************************************************)
+
+module S = struct
+  let root_hashes ((module P) : Dac_plugin.t) =
+    Tezos_rpc.Service.get_service
+      ~description:
+        "Monitor a stream of root hashes that are produced by another dac node \
+         responsible for the serialization of the dac payload (coordinator)."
+      ~query:Tezos_rpc.Query.empty
+      ~output:P.encoding
+      Tezos_rpc.Path.(open_root / "monitor" / "root_hashes")
+end
+
+let root_hashes dac_node_cctxt dac_plugin =
+  Tezos_rpc.Context.make_streamed_call
+    (S.root_hashes dac_plugin)
+    dac_node_cctxt
+    ()
+    ()
+    ()
diff --git a/src/lib_dac_node/monitor_services.mli b/src/lib_dac_node/monitor_services.mli
new file mode 100644
index 0000000000000000000000000000000000000000..074db259889ab3a81c5f920576a0b0bdb8df9a48
--- /dev/null
+++ b/src/lib_dac_node/monitor_services.mli
@@ -0,0 +1,43 @@
+(*****************************************************************************)
+(*                                                                           *)
+(* Open Source License                                                       *)
+(* Copyright (c) 2023 Marigold                                               *)
+(*                                                                           *)
+(* Permission is hereby granted, free of charge, to any person obtaining a   *)
+(* copy of this software and associated documentation files (the "Software"),*)
+(* to deal in the Software without restriction, including without limitation *)
+(* the rights to use, copy, modify, merge, publish, distribute, sublicense,  *)
+(* and/or sell copies of the Software, and to permit persons to whom the     *)
+(* Software is furnished to do so, subject to the following conditions:      *)
+(*                                                                           *)
+(* The above copyright notice and this permission notice shall be included   *)
+(* in all copies or substantial portions of the Software.                    *)
+(*                                                                           *)
+(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*)
+(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,  *)
+(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL   *)
+(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*)
+(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING   *)
+(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER       *)
+(* DEALINGS IN THE SOFTWARE.                                                 *)
+(*                                                                           *)
+(*****************************************************************************)
+
+module S : sig
+  (** Definition of the GET /monitor/root_hashes RPC. *)
+  val root_hashes :
+    Dac_plugin.t ->
+    ([`GET], unit, unit, unit, unit, Dac_plugin.hash) Tezos_rpc.Service.service
+end
+
+(** [root_hashes streamed_cctxt dac_plugin] returns a stream of root hashes
+    and a stopper for it.
+
+    The stream is produced by calling the GET /monitor/root_hashes RPC.
+*)
+val root_hashes :
+  #Tezos_rpc.Context.streamed ->
+  Dac_plugin.t ->
+  (Dac_plugin.hash Lwt_stream.t * Tezos_rpc.Context.stopper)
+  Error_monad.tzresult
+  Lwt.t
diff --git a/src/lib_dac_node/node_context.ml b/src/lib_dac_node/node_context.ml
index bb416dbc6911e46242fd4155e01c400b0608a733..0460c34483c8d65511790db5406c0a087bd6ad61 100644
--- a/src/lib_dac_node/node_context.ml
+++ b/src/lib_dac_node/node_context.ml
@@ -27,7 +27,13 @@ exception Status_already_ready
 
 type dac_plugin_module = (module Dac_plugin.T)
 
-type ready_ctxt = {dac_plugin : dac_plugin_module}
+type ready_ctxt = {
+  dac_plugin : dac_plugin_module;
+  hash_streamer : Dac_plugin.hash Data_streamer.t;
+      (* FIXME: https://gitlab.com/tezos/tezos/-/issues/4895
+         This could be problematic in case the coordinator and member/observer
+         use two different plugins that bind different underlying hashes. *)
+}
 
 type status = Ready of ready_ctxt | Starting
 
@@ -35,9 +41,20 @@ type t = {
   mutable status : status;
   config : Configuration.t;
   tezos_node_cctxt : Client_context.full;
+  coordinator_opt : Dac_node_client.cctxt option;
+      (* TODO: https://gitlab.com/tezos/tezos/-/issues/4896
+         [coordinator_opt] is meant to be used for running integration tests
+         in the multiple dac node setup, where all nodes are in the [legacy]
+         mode. In this setup, one node normally mimics the role of the
+         coordinator, while the others interact with it.
+         This is done via [Dac_node_client.cctxt].
+
+         Eventually, once the legacy mode is removed, we should revisit the
+         need for this field. *)
 }
 
-let init config cctxt = {status = Starting; config; tezos_node_cctxt = cctxt}
+let init config cctxt coordinator_opt =
+  {status = Starting; config; tezos_node_cctxt = cctxt; coordinator_opt}
 
 let set_ready ctxt dac_plugin =
   match ctxt.status with
@@ -46,7 +63,7 @@ let set_ready ctxt dac_plugin =
         Currently, Dac only supports coordinator functionalities but we
         might want to filter this capability out depending on the
         profile. *)
-      ctxt.status <- Ready {dac_plugin}
+      ctxt.status <- Ready {dac_plugin; hash_streamer = Data_streamer.init ()}
  | Ready _ -> raise Status_already_ready
 
 type error += Node_not_ready
@@ -80,5 +97,5 @@ let get_tezos_node_cctxt ctxt = ctxt.tezos_node_cctxt
 let get_dac_plugin ctxt =
   let open Result_syntax in
   match ctxt.status with
-  | Ready {dac_plugin} -> Ok dac_plugin
+  | Ready {dac_plugin; hash_streamer = _} -> Ok dac_plugin
  | Starting -> tzfail Node_not_ready
diff --git a/src/lib_dac_node/node_context.mli b/src/lib_dac_node/node_context.mli
index 1a039db028dd58db07a862ff20cf65f0935ab134..e01aa61c9ef62cdb0c79feef72fd9d58851a5748 100644
--- a/src/lib_dac_node/node_context.mli
+++ b/src/lib_dac_node/node_context.mli
@@ -24,9 +24,15 @@
 (*****************************************************************************)
 
 type dac_plugin_module = (module Dac_plugin.T)
 
-(** A [ready_ctx] value contains globally needed informations for a running dac
-    node. It is available when the DAC plugin has been loaded. *)
-type ready_ctxt = {dac_plugin : dac_plugin_module}
+(** A [ready_ctxt] value contains globally needed information for a running dac
+    node. It is available when the DAC plugin has been loaded. Additionally,
+    it contains an instance of [Dac_plugin.hash Data_streamer.t] - a
+    component for streaming root hashes, produced during the serialization of
+    the dac payload. *)
+type ready_ctxt = {
+  dac_plugin : dac_plugin_module;
+  hash_streamer : Dac_plugin.hash Data_streamer.t;
+}
 
 (** The status of the dac node. *)
 type status = Ready of ready_ctxt | Starting
@@ -35,10 +41,13 @@ type status = Ready of ready_ctxt | Starting
    fields are available through accessors. *)
 type t
 
-(** [init config cctx] creates a [t] with a status set to [Starting]
-    using the given dac node configuration [config],
-    and tezos node client context [cctx]. *)
-val init : Configuration.t -> Client_context.full -> t
+(** [init config cctxt dac_node_cctxt] creates a [t] with a status set to
+    [Starting] using the given dac node configuration [config],
+    tezos node client context [cctxt], and optional client context of
+    another dac node [dac_node_cctxt], which can be used for writing
+    tests with two dac nodes running in the legacy mode. *)
+val init :
+  Configuration.t -> Client_context.full -> Dac_node_client.cctxt option -> t
 
 (** Raised by [set_ready] when the status is already [Ready _] *)
 exception Status_already_ready
diff --git a/tezt/lib_tezos/rollup.ml b/tezt/lib_tezos/rollup.ml
index bbc2ff08a2b235cddd08bba23656c0fbd41d8c12..8f110a854e207fbc21eef5367fe6e700d49abdf3 100644
--- a/tezt/lib_tezos/rollup.ml
+++ b/tezt/lib_tezos/rollup.ml
@@ -901,7 +901,7 @@ module Dac = struct
            pagination_scheme)
      in
      let data : RPC_core.data = Data (JSON.unannotate preimage) in
-      make ~data PUT ["store_preimage"] @@ fun json ->
+      make ~data POST ["store_preimage"] @@ fun json ->
      JSON.
        ( json |-> "root_hash" |> as_string,
          json |-> "external_message" |> get_bytes_from_json_string_node )
diff --git a/tezt/tests/dac.ml b/tezt/tests/dac.ml
index 33a80b7f1d42d55062a09988ad7f0a9cfb091322..500dfeaefe12e288bc820a9b7313b310b04874a3 100644
--- a/tezt/tests/dac.ml
+++ b/tezt/tests/dac.ml
@@ -214,6 +214,22 @@ let wait_for_layer1_block_processing dac_node level =
   Dac_node.wait_for dac_node "dac_node_layer_1_new_head.v0" (fun e ->
      if JSON.(e |-> "level" |> as_int) = level then Some () else None)
 
+let wait_for_root_hash_pushed_to_data_streamer dac_node root_hash =
+  Dac_node.wait_for
+    dac_node
+    "root_hash_pushed_to_the_data_streamer.v0"
+    (fun json -> if JSON.(json |> as_string) = root_hash then Some () else None)
+
+let wait_for_received_root_hash dac_node root_hash =
+  Dac_node.wait_for dac_node "dac_node_new_root_hash_received.v0" (fun json ->
+      if JSON.(json |> as_string) = root_hash then Some () else None)
+
+let wait_for_handle_new_subscription_to_hash_streamer dac_node =
+  Dac_node.wait_for
+    dac_node
+    "handle_new_subscription_to_hash_streamer.v0"
+    (fun _ -> Some ())
+
 type status = Applied | Failed of {error_id : string}
 
 let pp fmt = function
@@ -574,6 +590,129 @@ let test_dac_node_dac_threshold_not_reached =
   let* () = error_promise in
   Dac_node.terminate dac_node
 
+(** This module encapsulates tests where two dac nodes, both running in the
+    legacy mode, interact with each other. One node mimics the coordinator
+    and the other mimics a signer or observer. Note that both nodes still run
+    in the [legacy] mode, in which there is no notion of profiles. Once we
+    have fully working profiles, the tests from this module should be
+    refactored. *)
+module Legacy = struct
+  let set_coordinator dac_node coordinator =
+    let coordinator =
+      `O
+        [
+          ("rpc-host", `String (Dac_node.rpc_host coordinator));
+          ("rpc-port", `Float (float_of_int (Dac_node.rpc_port coordinator)));
+        ]
+    in
+    let mode_updated =
+      Dac_node.Config_file.read dac_node
+      |> JSON.get "mode"
+      |> JSON.put
+           ( "dac_cctxt_config",
+             JSON.annotate ~origin:"dac_node_config" coordinator )
+    in
+    Dac_node.Config_file.update dac_node (JSON.put ("mode", mode_updated))
+
+  let test_streaming_of_root_hashes _protocol node client coordinator =
+    let coordinator_serializes_payload ~payload ~expected_rh =
+      let* actual_rh, _l1_operation =
+        RPC.call
+          coordinator
+          (Rollup.Dac.RPC.dac_store_preimage
+             ~payload
+             ~pagination_scheme:"Merkle_tree_V0")
+      in
+      return @@ check_valid_root_hash expected_rh actual_rh
+    in
+    (* 1. Create two new dac nodes, [observer_1] and [observer_2].
+       2. Initialize their default configuration.
+       3. Update their configuration so that their dac node client context
+          points to [coordinator]. *)
+    let observer_1 = Dac_node.create ~node ~client () in
+    let observer_2 = Dac_node.create ~node ~client () in
+    let* _ = Dac_node.init_config observer_1 in
+    let* _ = Dac_node.init_config observer_2 in
+    let () = set_coordinator observer_1 coordinator in
+    let () = set_coordinator observer_2 coordinator in
+    let payload_1 = "test_1" in
+    let expected_rh_1 =
+      "00b29d7d1e6668fb35a9ff6d46fa321d227e9b93dae91c4649b53168e8c10c1827"
+    in
+    let payload_2 = "test_2" in
+    let expected_rh_2 =
+      "00f2f47f480fec0e4180930790e52a54b2dbd7676b5fa2a25dd93bf22969f22e33"
+    in
+    let push_promise_1 =
+      wait_for_root_hash_pushed_to_data_streamer coordinator expected_rh_1
+    in
+    let push_promise_2 =
+      wait_for_root_hash_pushed_to_data_streamer coordinator expected_rh_2
+    in
+    let observer_1_promise_1 =
+      wait_for_received_root_hash observer_1 expected_rh_1
+    in
+    let observer_1_promise_2 =
+      wait_for_received_root_hash observer_1 expected_rh_2
+    in
+    let observer_2_promise_1 =
+      wait_for_received_root_hash observer_2 expected_rh_1
+    in
+    let observer_2_promise_2 =
+      wait_for_received_root_hash observer_2 expected_rh_2
+    in
+
+    (* Test starts here *)
+
+    (* Start running [observer_1]. From now on we expect [observer_1] to
+       monitor the streamed root hashes produced by [coordinator], which
+       produces and pushes them as a side effect of serializing the dac
+       payload. *)
+    let observer_1_is_subscribed =
+      wait_for_handle_new_subscription_to_hash_streamer coordinator
+    in
+    let* () = Dac_node.run observer_1 in
+    let* () = observer_1_is_subscribed in
+    (* [coordinator] serializes [payload_1]. We expect it to push
+       [expected_rh_1] to all attached subscribers, i.e. to [observer_1]. *)
+    let* () =
+      coordinator_serializes_payload
+        ~payload:payload_1
+        ~expected_rh:expected_rh_1
+    in
+    (* Assert that [coordinator] emitted the event that [expected_rh_1] was
+       pushed to the data_streamer. *)
+    let* () = push_promise_1 in
+    (* Assert that [observer_1] emitted the event of receiving [expected_rh_1]. *)
+    let* () = observer_1_promise_1 in
+    (* Start running [observer_2]. We expect that from now on [observer_2]
+       will also monitor the streamed root hashes from [coordinator]. *)
+    let observer_2_is_subscribed =
+      wait_for_handle_new_subscription_to_hash_streamer coordinator
+    in
+    let* () = Dac_node.run observer_2 in
+    let* () = observer_2_is_subscribed in
+    (* [coordinator] serializes [payload_2]. We expect it to push
+       [expected_rh_2] to all attached subscribers,
+       i.e. to both [observer_1] and [observer_2] this time. *)
+    let* () =
+      coordinator_serializes_payload
+        ~payload:payload_2
+        ~expected_rh:expected_rh_2
+    in
+    (* Assert that [coordinator] emitted the event. *)
+    let* () = push_promise_2 in
+    (* Assert that both [observer_1] and [observer_2] received [expected_rh_2]. *)
+    let* () = observer_1_promise_2 in
+    let* () = observer_2_promise_2 in
+    (* Since [observer_2] was not running when [expected_rh_1] was generated
+       and streamed by [coordinator], we expect that it never received it.
+       We assert this by making sure that the promise of [observer_2] that
+       waits for the event with payload [expected_rh_1] is still not
+       resolved after the promise [observer_2_promise_2] has been resolved. *)
+    assert (Lwt.is_sleeping observer_2_promise_1) ;
+    unit
+end
+
 let register ~protocols =
   (* Tests with layer1 and dac nodes *)
   test_dac_node_startup protocols ;
@@ -602,4 +741,9 @@ let register ~protocols =
    ~tags:["dac"; "dac_node"]
    "dac_rollup_arith_wrong_hash"
    test_reveals_fails_on_wrong_hash
-    protocols
+    protocols ;
+  scenario_with_layer1_and_dac_nodes
+    ~tags:["dac"; "dac_node"]
+    "dac_streaming_of_root_hashes_in_legacy_mode"
+    Legacy.test_streaming_of_root_hashes
+    protocols
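
As a usage illustration of the new streamed endpoint (not part of the change above): a subscriber holding a [Dac_node_client.cctxt] built with [make_unix_cctxt] and an already-resolved [Dac_plugin.t] could consume the stream directly, without going through [Daemon.Handler.new_root_hash]. The wrapper name [consume_root_hashes] below is hypothetical; everything else it calls is introduced by this diff.

(* Illustrative sketch only: subscribe to GET /monitor/root_hashes and
   process hashes until the coordinator closes the stream. *)
let consume_root_hashes cctxt (dac_plugin : Dac_plugin.t) =
  let open Lwt_result_syntax in
  (* Subscribe to the coordinator's stream of root hashes. *)
  let* stream, stopper = Monitor_services.root_hashes cctxt dac_plugin in
  let rec loop () =
    let*! next = Lwt_stream.get stream in
    match next with
    | Some root_hash ->
        (* A real observer would fetch and verify the pages behind this
           hash; here we only emit the same event the daemon handler uses. *)
        let*! () = Event.emit_new_root_hash_received dac_plugin root_hash in
        loop ()
    | None ->
        (* The stream was closed on the coordinator side: stop the call. *)
        stopper () ;
        return_unit
  in
  loop ()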