From 1e2d1f22d2f7b11fe6af997bb12c834e05eed785 Mon Sep 17 00:00:00 2001 From: Martin Tomazic Date: Thu, 25 May 2023 13:11:20 +0200 Subject: [PATCH 1/7] DAC: Move `handle_get_missing_page` to `Observer` module --- src/lib_dac_node/RPC_server.ml | 39 +++++++++++++++++----------------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/src/lib_dac_node/RPC_server.ml b/src/lib_dac_node/RPC_server.ml index 8bcc33a66cd7..9c51dec6f2e8 100644 --- a/src/lib_dac_node/RPC_server.ml +++ b/src/lib_dac_node/RPC_server.ml @@ -171,18 +171,6 @@ let handle_get_certificate dac_plugin node_store raw_root_hash = V0 (V0.make raw_root_hash aggregate_signature witnesses))) value_opt -let handle_get_missing_page timeout cctxts page_store dac_plugin raw_root_hash = - let open Lwt_result_syntax in - let*? root_hash = Dac_plugin.raw_to_hash dac_plugin raw_root_hash in - let remote_store = - Page_store.Remote_with_flooding.(init {timeout; cctxts; page_store}) - in - let* preimage = - Page_store.Remote_with_flooding.load dac_plugin remote_store root_hash - in - let*! () = Event.emit_fetched_missing_page root_hash in - return preimage - let register_get_health_live cctxt directory = directory |> add_service @@ -239,13 +227,6 @@ let register_get_certificate node_store dac_plugin = (fun root_hash () () -> handle_get_certificate dac_plugin node_store root_hash) -let register_get_missing_page dac_plugin page_store cctxts timeout = - add_service - Tezos_rpc.Directory.register1 - RPC_services.V0.get_missing_page - (fun root_hash () () -> - handle_get_missing_page timeout cctxts page_store dac_plugin root_hash) - let register_get_pages dac_plugin page_store = add_service Tezos_rpc.Directory.register1 @@ -393,6 +374,26 @@ module Committee_member = struct end module Observer = struct + let handle_get_missing_page timeout cctxts page_store dac_plugin raw_root_hash + = + let open Lwt_result_syntax in + let*? root_hash = Dac_plugin.raw_to_hash dac_plugin raw_root_hash in + let remote_store = + Page_store.Remote_with_flooding.(init {timeout; cctxts; page_store}) + in + let* preimage = + Page_store.Remote_with_flooding.load dac_plugin remote_store root_hash + in + let*! () = Event.emit_fetched_missing_page root_hash in + return preimage + + let register_get_missing_page dac_plugin page_store cctxts timeout = + add_service + Tezos_rpc.Directory.register1 + RPC_services.V0.get_missing_page + (fun root_hash () () -> + handle_get_missing_page timeout cctxts page_store dac_plugin root_hash) + let dynamic_rpc_dir dac_plugin committee_member_cctxts timeout page_store = Tezos_rpc.Directory.empty |> register_get_preimage dac_plugin page_store -- GitLab From 724fab118373c70d8329e220fb9e4d4f9ed07d36 Mon Sep 17 00:00:00 2001 From: Martin Tomazic Date: Thu, 25 May 2023 13:15:26 +0200 Subject: [PATCH 2/7] DAC: Introduce `RPC_handlers` module --- src/lib_dac_node/RPC_handlers.ml | 25 +++++++++++++++++++++++++ src/lib_dac_node/RPC_handlers.mli | 27 +++++++++++++++++++++++++++ 2 files changed, 52 insertions(+) create mode 100644 src/lib_dac_node/RPC_handlers.ml create mode 100644 src/lib_dac_node/RPC_handlers.mli diff --git a/src/lib_dac_node/RPC_handlers.ml b/src/lib_dac_node/RPC_handlers.ml new file mode 100644 index 000000000000..cec5df2189c5 --- /dev/null +++ b/src/lib_dac_node/RPC_handlers.ml @@ -0,0 +1,25 @@ +(*****************************************************************************) +(* *) +(* Open Source License *) +(* Copyright (c) 2023 Trili Tech *) +(* Copyright (c) 2023 Marigold *) +(* *) +(* Permission is hereby granted, free of charge, to any person obtaining a *) +(* copy of this software and associated documentation files (the "Software"),*) +(* to deal in the Software without restriction, including without limitation *) +(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *) +(* and/or sell copies of the Software, and to permit persons to whom the *) +(* Software is furnished to do so, subject to the following conditions: *) +(* *) +(* The above copyright notice and this permission notice shall be included *) +(* in all copies or substantial portions of the Software. *) +(* *) +(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*) +(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *) +(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *) +(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*) +(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *) +(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *) +(* DEALINGS IN THE SOFTWARE. *) +(* *) +(*****************************************************************************) diff --git a/src/lib_dac_node/RPC_handlers.mli b/src/lib_dac_node/RPC_handlers.mli new file mode 100644 index 000000000000..af51cd1e5eff --- /dev/null +++ b/src/lib_dac_node/RPC_handlers.mli @@ -0,0 +1,27 @@ +(*****************************************************************************) +(* *) +(* Open Source License *) +(* Copyright (c) 2023 Trili Tech *) +(* Copyright (c) 2023 Marigold *) +(* *) +(* Permission is hereby granted, free of charge, to any person obtaining a *) +(* copy of this software and associated documentation files (the "Software"),*) +(* to deal in the Software without restriction, including without limitation *) +(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *) +(* and/or sell copies of the Software, and to permit persons to whom the *) +(* Software is furnished to do so, subject to the following conditions: *) +(* *) +(* The above copyright notice and this permission notice shall be included *) +(* in all copies or substantial portions of the Software. *) +(* *) +(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*) +(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *) +(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *) +(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*) +(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *) +(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *) +(* DEALINGS IN THE SOFTWARE. *) +(* *) +(*****************************************************************************) + +(** [RPC_handlers] is a module with handlers for DAC API endpoints. *) -- GitLab From 990c8bf61c71f5639212decebff1c1b8cec70aa1 Mon Sep 17 00:00:00 2001 From: Martin Tomazic Date: Thu, 25 May 2023 13:23:56 +0200 Subject: [PATCH 3/7] DAC: Move health handlers to `RPC_handlers` --- src/lib_dac_node/RPC_handlers.ml | 24 ++++++++++++++++ src/lib_dac_node/RPC_handlers.mli | 8 ++++++ src/lib_dac_node/RPC_server.ml | 47 +++++++++---------------------- 3 files changed, 45 insertions(+), 34 deletions(-) diff --git a/src/lib_dac_node/RPC_handlers.ml b/src/lib_dac_node/RPC_handlers.ml index cec5df2189c5..08e9bff54120 100644 --- a/src/lib_dac_node/RPC_handlers.ml +++ b/src/lib_dac_node/RPC_handlers.ml @@ -23,3 +23,27 @@ (* DEALINGS IN THE SOFTWARE. *) (* *) (*****************************************************************************) + +type error += DAC_node_not_ready of string + +let () = + register_error_kind + `Permanent + ~id:"dac_node_not_ready" + ~title:"DAC Node is not ready" + ~description: + "RPC server of DAC node is not started and plugin is not resolved." + ~pp:(fun ppf message -> + Format.fprintf ppf "DAC Node is not ready, current status is: %s" message) + Data_encoding.(obj1 (req "value" string)) + (function DAC_node_not_ready message -> Some message | _ -> None) + (fun message -> DAC_node_not_ready message) + +let handle_get_health_live node_ctxt = + match Node_context.get_status node_ctxt with + | Ready _ | Starting -> Lwt_result_syntax.return true + +let handle_get_health_ready node_ctxt = + match Node_context.get_status node_ctxt with + | Ready _ -> Lwt_result_syntax.return true + | Starting -> Lwt_result_syntax.tzfail @@ DAC_node_not_ready "starting" diff --git a/src/lib_dac_node/RPC_handlers.mli b/src/lib_dac_node/RPC_handlers.mli index af51cd1e5eff..b430c9b1b475 100644 --- a/src/lib_dac_node/RPC_handlers.mli +++ b/src/lib_dac_node/RPC_handlers.mli @@ -25,3 +25,11 @@ (*****************************************************************************) (** [RPC_handlers] is a module with handlers for DAC API endpoints. *) + +type error += DAC_node_not_ready of string + +(** [handle_get_health_live] is a handler for "GET dac/health/live". *) +val handle_get_health_live : Node_context.t -> (bool, error trace) result Lwt.t + +(** [handle_get_health_ready] is a handler for "GET dac/health/ready". *) +val handle_get_health_ready : Node_context.t -> (bool, error trace) result Lwt.t diff --git a/src/lib_dac_node/RPC_server.ml b/src/lib_dac_node/RPC_server.ml index 9c51dec6f2e8..819f49dae7e0 100644 --- a/src/lib_dac_node/RPC_server.ml +++ b/src/lib_dac_node/RPC_server.ml @@ -31,7 +31,6 @@ open Tezos_rpc_http_server type error += | Cannot_construct_external_message | Cannot_deserialize_external_message - | DAC_node_not_ready of string let () = register_error_kind @@ -53,30 +52,24 @@ let () = Format.fprintf ppf "External rollup message could not be deserialized") Data_encoding.unit (function Cannot_deserialize_external_message -> Some () | _ -> None) - (fun () -> Cannot_deserialize_external_message) ; - register_error_kind - `Permanent - ~id:"dac_node_not_ready" - ~title:"DAC Node is not ready" - ~description: - "RPC server of DAC node is not started and plugin is not resolved." - ~pp:(fun ppf message -> - Format.fprintf ppf "DAC Node is not ready, current status is: %s" message) - Data_encoding.(obj1 (req "value" string)) - (function DAC_node_not_ready message -> Some message | _ -> None) - (fun message -> DAC_node_not_ready message) + (fun () -> Cannot_deserialize_external_message) let add_service registerer service handler directory = registerer directory service handler -let handle_get_health_live node_ctxt = - match Node_context.get_status node_ctxt with - | Ready _ | Starting -> Lwt_result_syntax.return true +let register_get_health_live cctxt directory = + directory + |> add_service + Tezos_rpc.Directory.register0 + RPC_services.get_health_live + (fun () () -> RPC_handlers.handle_get_health_live cctxt) -let handle_get_health_ready node_ctxt = - match Node_context.get_status node_ctxt with - | Ready _ -> Lwt_result_syntax.return true - | Starting -> Lwt_result_syntax.tzfail @@ DAC_node_not_ready "starting" +let register_get_health_ready cctxt directory = + directory + |> add_service + Tezos_rpc.Directory.register0 + RPC_services.get_health_ready + (fun () () -> RPC_handlers.handle_get_health_ready cctxt) let handle_post_store_preimage dac_plugin cctxt dac_sk_uris page_store hash_streamer (data, pagination_scheme) = @@ -171,20 +164,6 @@ let handle_get_certificate dac_plugin node_store raw_root_hash = V0 (V0.make raw_root_hash aggregate_signature witnesses))) value_opt -let register_get_health_live cctxt directory = - directory - |> add_service - Tezos_rpc.Directory.register0 - RPC_services.get_health_live - (fun () () -> handle_get_health_live cctxt) - -let register_get_health_ready cctxt directory = - directory - |> add_service - Tezos_rpc.Directory.register0 - RPC_services.get_health_ready - (fun () () -> handle_get_health_ready cctxt) - let register_post_store_preimage ctx cctxt dac_sk_uris page_store hash_streamer directory = directory -- GitLab From bc6a95654914789bf6d51d0b71a62c600d8274d7 Mon Sep 17 00:00:00 2001 From: Martin Tomazic Date: Thu, 25 May 2023 13:31:44 +0200 Subject: [PATCH 4/7] DAC: Move handlers common to `V0` and `V1` to `RPC_handlers` --- src/lib_dac_node/RPC_handlers.ml | 10 ++++++++++ src/lib_dac_node/RPC_handlers.mli | 13 +++++++++++++ src/lib_dac_node/RPC_server.ml | 18 ++++++++++-------- 3 files changed, 33 insertions(+), 8 deletions(-) diff --git a/src/lib_dac_node/RPC_handlers.ml b/src/lib_dac_node/RPC_handlers.ml index 08e9bff54120..2e1e99e3eab7 100644 --- a/src/lib_dac_node/RPC_handlers.ml +++ b/src/lib_dac_node/RPC_handlers.ml @@ -47,3 +47,13 @@ let handle_get_health_ready node_ctxt = match Node_context.get_status node_ctxt with | Ready _ -> Lwt_result_syntax.return true | Starting -> Lwt_result_syntax.tzfail @@ DAC_node_not_ready "starting" + +module Shared_by_V0_and_V1 = struct + (** [handle_get_page] is a handler shared by both "GET v0/preimage" + and "GET v1/pages". It fetches a page that corresponds + to a given [raw_hash]. *) + let handle_get_page dac_plugin page_store raw_hash = + let open Lwt_result_syntax in + let*? hash = Dac_plugin.raw_to_hash dac_plugin raw_hash in + Page_store.Filesystem.load dac_plugin page_store hash +end diff --git a/src/lib_dac_node/RPC_handlers.mli b/src/lib_dac_node/RPC_handlers.mli index b430c9b1b475..0459af3254e4 100644 --- a/src/lib_dac_node/RPC_handlers.mli +++ b/src/lib_dac_node/RPC_handlers.mli @@ -33,3 +33,16 @@ val handle_get_health_live : Node_context.t -> (bool, error trace) result Lwt.t (** [handle_get_health_ready] is a handler for "GET dac/health/ready". *) val handle_get_health_ready : Node_context.t -> (bool, error trace) result Lwt.t + +(** [Shared_by_V0_and_V1] encapsulates handlers that are common to + both [V0] and [V1] API. *) +module Shared_by_V0_and_V1 : sig + (** [handle_get_page] is a handler shared by both "GET v0/preimage" + and "GET v1/pages". It fetches a page that corresponds + to a given [raw_hash]. *) + val handle_get_page : + Dac_plugin.t -> + Page_store.Filesystem.t -> + Dac_plugin.raw_hash -> + (bytes, tztrace) result Lwt.t +end diff --git a/src/lib_dac_node/RPC_server.ml b/src/lib_dac_node/RPC_server.ml index 819f49dae7e0..344bd0927a8f 100644 --- a/src/lib_dac_node/RPC_server.ml +++ b/src/lib_dac_node/RPC_server.ml @@ -137,12 +137,6 @@ let handle_get_verify_signature dac_plugin public_keys_opt encoded_l1_message = signature witnesses -(** [handle_get_preimage] is shared by both [V0] and [V1] API. *) -let handle_get_preimage dac_plugin page_store raw_hash = - let open Lwt_result_syntax in - let*? hash = Dac_plugin.raw_to_hash dac_plugin raw_hash in - Page_store.Filesystem.load dac_plugin page_store hash - (* Handler for subscribing to the streaming of root hashes via GET monitor/root_hashes RPC call. *) let handle_monitor_root_hashes hash_streamer = @@ -191,7 +185,11 @@ let register_get_preimage dac_plugin page_store = add_service Tezos_rpc.Directory.register1 RPC_services.V0.get_preimage - (fun hash () () -> handle_get_preimage dac_plugin page_store hash) + (fun hash () () -> + RPC_handlers.Shared_by_V0_and_V1.handle_get_page + dac_plugin + page_store + hash) let register_monitor_root_hashes hash_streamer dir = Tezos_rpc.Directory.gen_register @@ -210,7 +208,11 @@ let register_get_pages dac_plugin page_store = add_service Tezos_rpc.Directory.register1 RPC_services.V1.get_pages - (fun hash () () -> handle_get_preimage dac_plugin page_store hash) + (fun hash () () -> + RPC_handlers.Shared_by_V0_and_V1.handle_get_page + dac_plugin + page_store + hash) module Coordinator = struct let handle_post_preimage dac_plugin page_store hash_streamer payload = -- GitLab From 1796bfc7ed06966f98aafeda646baf47addee454 Mon Sep 17 00:00:00 2001 From: Martin Tomazic Date: Thu, 25 May 2023 13:45:09 +0200 Subject: [PATCH 5/7] DAC: Move `V0` handlers to `RPC_handlers` --- src/lib_dac_node/RPC_handlers.ml | 203 ++++++++++++++++++++++++++- src/lib_dac_node/RPC_handlers.mli | 76 +++++++++- src/lib_dac_node/RPC_server.ml | 221 +++--------------------------- 3 files changed, 295 insertions(+), 205 deletions(-) diff --git a/src/lib_dac_node/RPC_handlers.ml b/src/lib_dac_node/RPC_handlers.ml index 2e1e99e3eab7..db36003baa9f 100644 --- a/src/lib_dac_node/RPC_handlers.ml +++ b/src/lib_dac_node/RPC_handlers.ml @@ -24,7 +24,10 @@ (* *) (*****************************************************************************) -type error += DAC_node_not_ready of string +type error += + | DAC_node_not_ready of string + | Cannot_construct_external_message + | Cannot_deserialize_external_message let () = register_error_kind @@ -37,7 +40,27 @@ let () = Format.fprintf ppf "DAC Node is not ready, current status is: %s" message) Data_encoding.(obj1 (req "value" string)) (function DAC_node_not_ready message -> Some message | _ -> None) - (fun message -> DAC_node_not_ready message) + (fun message -> DAC_node_not_ready message) ; + register_error_kind + `Permanent + ~id:"dac_cannot_construct_external_message" + ~title:"External rollup message could not be constructed" + ~description:"External rollup message could not be constructed" + ~pp:(fun ppf () -> + Format.fprintf ppf "External rollup message could not be constructed") + Data_encoding.unit + (function Cannot_construct_external_message -> Some () | _ -> None) + (fun () -> Cannot_construct_external_message) ; + register_error_kind + `Permanent + ~id:"dac_cannot_deserialize_rollup_external_message" + ~title:"External rollup message could not be deserialized" + ~description:"External rollup message could not be deserialized" + ~pp:(fun ppf () -> + Format.fprintf ppf "External rollup message could not be deserialized") + Data_encoding.unit + (function Cannot_deserialize_external_message -> Some () | _ -> None) + (fun () -> Cannot_deserialize_external_message) let handle_get_health_live node_ctxt = match Node_context.get_status node_ctxt with @@ -57,3 +80,179 @@ module Shared_by_V0_and_V1 = struct let*? hash = Dac_plugin.raw_to_hash dac_plugin raw_hash in Page_store.Filesystem.load dac_plugin page_store hash end + +module V0 = struct + let handle_post_store_preimage dac_plugin cctxt dac_sk_uris page_store + hash_streamer (data, pagination_scheme) = + let open Lwt_result_syntax in + let open Pages_encoding in + let* root_hash = + match pagination_scheme with + | Pagination_scheme.Merkle_tree_V0 -> + (* FIXME: https://gitlab.com/tezos/tezos/-/issues/4897 + Once new "PUT /preimage" endpoint is implemented, pushing + a new root hash to the data streamer should be moved there. + Tezt for testing streaming of root hashes should also use + the new endpoint. *) + let* root_hash = + Merkle_tree.V0.Filesystem.serialize_payload + dac_plugin + ~page_store + data + in + let () = + Data_streamer.publish + hash_streamer + (Dac_plugin.hash_to_raw root_hash) + in + let*! () = + Event.emit_root_hash_pushed_to_data_streamer dac_plugin root_hash + in + return root_hash + | Pagination_scheme.Hash_chain_V0 -> + Hash_chain.V0.serialize_payload + dac_plugin + ~for_each_page:(fun (hash, content) -> + Page_store.Filesystem.save dac_plugin page_store ~hash ~content) + data + in + let* signature, witnesses = + Signature_manager.Legacy.sign_root_hash + dac_plugin + cctxt + dac_sk_uris + root_hash + in + let raw_root_hash = Dac_plugin.hash_to_raw root_hash in + let*! external_message = + External_message.Default.make dac_plugin root_hash signature witnesses + in + match external_message with + | Ok external_message -> return @@ (raw_root_hash, external_message) + | Error _ -> tzfail @@ Cannot_construct_external_message + + let handle_get_verify_signature dac_plugin public_keys_opt encoded_l1_message + = + let open Lwt_result_syntax in + let ((module Plugin) : Dac_plugin.t) = dac_plugin in + let external_message = + let open Option_syntax in + let* encoded_l1_message in + let* as_bytes = Hex.to_bytes @@ `Hex encoded_l1_message in + External_message.Default.of_bytes Plugin.encoding as_bytes + in + match external_message with + | None -> tzfail @@ Cannot_deserialize_external_message + | Some {root_hash; signature; witnesses} -> + Signature_manager.verify + dac_plugin + ~public_keys_opt + (Dac_plugin.hash_to_raw root_hash) + signature + witnesses + + let handle_monitor_root_hashes hash_streamer = + let open Lwt_syntax in + let stream, stopper = Data_streamer.handle_subscribe hash_streamer in + let shutdown () = Lwt_watcher.shutdown stopper in + let next () = Lwt_stream.get stream in + let* () = Event.(emit handle_new_subscription_to_hash_streamer ()) in + Tezos_rpc.Answer.return_stream {next; shutdown} + + let handle_get_certificate dac_plugin node_store raw_root_hash = + let open Lwt_result_syntax in + let*? root_hash = Dac_plugin.raw_to_hash dac_plugin raw_root_hash in + + let+ value_opt = Store.Certificate_store.find node_store root_hash in + Option.map + (fun Store.{aggregate_signature; witnesses} -> + Certificate_repr.( + V0 (V0.make raw_root_hash aggregate_signature witnesses))) + value_opt + + module Coordinator = struct + let handle_post_preimage dac_plugin page_store hash_streamer payload = + let open Lwt_result_syntax in + let* root_hash = + Pages_encoding.Merkle_tree.V0.Filesystem.serialize_payload + dac_plugin + ~page_store + payload + in + let () = + Data_streamer.publish hash_streamer (Dac_plugin.hash_to_raw root_hash) + in + let*! () = + Event.emit_root_hash_pushed_to_data_streamer dac_plugin root_hash + in + return @@ Dac_plugin.hash_to_raw root_hash + + let handle_monitor_certificate dac_plugin ro_node_store + certificate_streamers raw_root_hash committee_members = + let open Lwt_result_syntax in + let*? stream, stopper = + Certificate_streamers.handle_subscribe + dac_plugin + certificate_streamers + raw_root_hash + in + let*? root_hash = Dac_plugin.raw_to_hash dac_plugin raw_root_hash in + let*! () = Event.emit_new_subscription_to_certificate_updates root_hash in + let shutdown () = Lwt_watcher.shutdown stopper in + let next () = Lwt_stream.get stream in + (* Add the current certificate to the streamer, if any, to ensure that + a certificate is returned even in the case that no updates to the + certificate happen for a long time. *) + let*! current_certificate_store_value_res = + Store.Certificate_store.find ro_node_store root_hash + in + match current_certificate_store_value_res with + | Ok current_certificate_store_value -> + let () = + Option.iter + (fun Store.{aggregate_signature; witnesses} -> + let certificate = + Certificate_repr.( + V0 (V0.make raw_root_hash aggregate_signature witnesses)) + in + let _ = + Certificate_streamers.push + dac_plugin + certificate_streamers + raw_root_hash + certificate + in + if + Certificate_repr.all_committee_members_have_signed + committee_members + certificate + then + let _ = + Certificate_streamers.close + dac_plugin + certificate_streamers + raw_root_hash + in + () + else ()) + current_certificate_store_value + in + return (next, shutdown) + | Error e -> fail e + end + + module Observer = struct + let handle_get_missing_page timeout cctxts page_store dac_plugin + raw_root_hash = + let open Lwt_result_syntax in + let*? root_hash = Dac_plugin.raw_to_hash dac_plugin raw_root_hash in + let remote_store = + Page_store.Remote_with_flooding.(init {timeout; cctxts; page_store}) + in + let* preimage = + Page_store.Remote_with_flooding.load dac_plugin remote_store root_hash + in + let*! () = Event.emit_fetched_missing_page root_hash in + return preimage + end +end diff --git a/src/lib_dac_node/RPC_handlers.mli b/src/lib_dac_node/RPC_handlers.mli index 0459af3254e4..ece545af4db8 100644 --- a/src/lib_dac_node/RPC_handlers.mli +++ b/src/lib_dac_node/RPC_handlers.mli @@ -26,7 +26,10 @@ (** [RPC_handlers] is a module with handlers for DAC API endpoints. *) -type error += DAC_node_not_ready of string +type error += + | DAC_node_not_ready of string + | Cannot_construct_external_message + | Cannot_deserialize_external_message (** [handle_get_health_live] is a handler for "GET dac/health/live". *) val handle_get_health_live : Node_context.t -> (bool, error trace) result Lwt.t @@ -46,3 +49,74 @@ module Shared_by_V0_and_V1 : sig Dac_plugin.raw_hash -> (bytes, tztrace) result Lwt.t end + +(** [V0] encapsulates handlers specific to [V0] API. *) +module V0 : sig + (** [handle_post_store_preimage] is a handler for "POST v0/store_preimage". *) + val handle_post_store_preimage : + Dac_plugin.t -> + #Client_context.wallet -> + Client_keys.aggregate_sk_uri option trace -> + Page_store.Filesystem.t -> + Dac_plugin.raw_hash Data_streamer.t -> + bytes * Pagination_scheme.t -> + (Dac_plugin.raw_hash * bytes, tztrace) result Lwt.t + + (** [handle_get_verify_signature] is a handler for "GET v0/verify_signature". *) + val handle_get_verify_signature : + Dac_plugin.t -> + Tezos_crypto.Aggregate_signature.public_key option trace -> + string option -> + (bool, error trace) result Lwt.t + + (** [handle_monitor_root_hashes] is a handler for subscribing to the + streaming of root hashes via "GET v0/monitor/root_hashes" RPC call. *) + val handle_monitor_root_hashes : + Dac_plugin.raw_hash Data_streamer.t -> + Dac_plugin.raw_hash Tezos_rpc__RPC_answer.t Lwt.t + + (** [handle_get_certificate] is a handler for "GET v0/certificate". *) + val handle_get_certificate : + Dac_plugin.t -> + [> `Read] Store.Irmin_store.t -> + Dac_plugin.raw_hash -> + (Certificate_repr.t option, tztrace) result Lwt.t + + (** [Coordinator] encapsulates, Coordinator's mode specific handlers + of [V0] API. *) + module Coordinator : sig + (** [handle_post_preimage] is a handler for "PUT v0/preimage". *) + val handle_post_preimage : + Dac_plugin.t -> + Page_store.Filesystem.t -> + Dac_plugin.raw_hash Data_streamer.t -> + bytes -> + (Dac_plugin.raw_hash, tztrace) result Lwt.t + + (** [handle_monitor_certificate] is a handler for subscribing to the stream + of certificate updates via "GET v0/monitor/certificate" RPC call. *) + val handle_monitor_certificate : + Dac_plugin.t -> + [> `Read] Store.Irmin_store.t -> + Certificate_streamers.t -> + Dac_plugin.raw_hash -> + 'a trace -> + ( (unit -> Certificate_repr.t option Lwt.t) * (unit -> unit), + tztrace ) + result + Lwt.t + end + + (** [Observer] encapsulates, Observer's mode specific handlers + of [V0] API. *) + module Observer : sig + (** [handle_get_missing_page] is a handler for "GET v0/missing_page". *) + val handle_get_missing_page : + float -> + Dac_node_client.cctxt trace -> + Page_store.Filesystem.t -> + Dac_plugin.t -> + Dac_plugin.raw_hash -> + (bytes, tztrace) result Lwt.t + end +end diff --git a/src/lib_dac_node/RPC_server.ml b/src/lib_dac_node/RPC_server.ml index 344bd0927a8f..88ded481fec1 100644 --- a/src/lib_dac_node/RPC_server.ml +++ b/src/lib_dac_node/RPC_server.ml @@ -28,32 +28,6 @@ open Tezos_rpc_http open Tezos_rpc_http_server -type error += - | Cannot_construct_external_message - | Cannot_deserialize_external_message - -let () = - register_error_kind - `Permanent - ~id:"dac_cannot_construct_external_message" - ~title:"External rollup message could not be constructed" - ~description:"External rollup message could not be constructed" - ~pp:(fun ppf () -> - Format.fprintf ppf "External rollup message could not be constructed") - Data_encoding.unit - (function Cannot_construct_external_message -> Some () | _ -> None) - (fun () -> Cannot_construct_external_message) ; - register_error_kind - `Permanent - ~id:"dac_cannot_deserialize_rollup_external_message" - ~title:"External rollup message could not be deserialized" - ~description:"External rollup message could not be deserialized" - ~pp:(fun ppf () -> - Format.fprintf ppf "External rollup message could not be deserialized") - Data_encoding.unit - (function Cannot_deserialize_external_message -> Some () | _ -> None) - (fun () -> Cannot_deserialize_external_message) - let add_service registerer service handler directory = registerer directory service handler @@ -71,93 +45,6 @@ let register_get_health_ready cctxt directory = RPC_services.get_health_ready (fun () () -> RPC_handlers.handle_get_health_ready cctxt) -let handle_post_store_preimage dac_plugin cctxt dac_sk_uris page_store - hash_streamer (data, pagination_scheme) = - let open Lwt_result_syntax in - let open Pages_encoding in - let* root_hash = - match pagination_scheme with - | Pagination_scheme.Merkle_tree_V0 -> - (* FIXME: https://gitlab.com/tezos/tezos/-/issues/4897 - Once new "PUT /preimage" endpoint is implemented, pushing - a new root hash to the data streamer should be moved there. - Tezt for testing streaming of root hashes should also use - the new endpoint. *) - let* root_hash = - Merkle_tree.V0.Filesystem.serialize_payload - dac_plugin - ~page_store - data - in - let () = - Data_streamer.publish hash_streamer (Dac_plugin.hash_to_raw root_hash) - in - let*! () = - Event.emit_root_hash_pushed_to_data_streamer dac_plugin root_hash - in - return root_hash - | Pagination_scheme.Hash_chain_V0 -> - Hash_chain.V0.serialize_payload - dac_plugin - ~for_each_page:(fun (hash, content) -> - Page_store.Filesystem.save dac_plugin page_store ~hash ~content) - data - in - let* signature, witnesses = - Signature_manager.Legacy.sign_root_hash - dac_plugin - cctxt - dac_sk_uris - root_hash - in - let raw_root_hash = Dac_plugin.hash_to_raw root_hash in - let*! external_message = - External_message.Default.make dac_plugin root_hash signature witnesses - in - match external_message with - | Ok external_message -> return @@ (raw_root_hash, external_message) - | Error _ -> tzfail @@ Cannot_construct_external_message - -let handle_get_verify_signature dac_plugin public_keys_opt encoded_l1_message = - let open Lwt_result_syntax in - let ((module Plugin) : Dac_plugin.t) = dac_plugin in - let external_message = - let open Option_syntax in - let* encoded_l1_message in - let* as_bytes = Hex.to_bytes @@ `Hex encoded_l1_message in - External_message.Default.of_bytes Plugin.encoding as_bytes - in - match external_message with - | None -> tzfail @@ Cannot_deserialize_external_message - | Some {root_hash; signature; witnesses} -> - Signature_manager.verify - dac_plugin - ~public_keys_opt - (Dac_plugin.hash_to_raw root_hash) - signature - witnesses - -(* Handler for subscribing to the streaming of root hashes via - GET monitor/root_hashes RPC call. *) -let handle_monitor_root_hashes hash_streamer = - let open Lwt_syntax in - let stream, stopper = Data_streamer.handle_subscribe hash_streamer in - let shutdown () = Lwt_watcher.shutdown stopper in - let next () = Lwt_stream.get stream in - let* () = Event.(emit handle_new_subscription_to_hash_streamer ()) in - Tezos_rpc.Answer.return_stream {next; shutdown} - -let handle_get_certificate dac_plugin node_store raw_root_hash = - let open Lwt_result_syntax in - let*? root_hash = Dac_plugin.raw_to_hash dac_plugin raw_root_hash in - - let+ value_opt = Store.Certificate_store.find node_store root_hash in - Option.map - (fun Store.{aggregate_signature; witnesses} -> - Certificate_repr.( - V0 (V0.make raw_root_hash aggregate_signature witnesses))) - value_opt - let register_post_store_preimage ctx cctxt dac_sk_uris page_store hash_streamer directory = directory @@ -165,7 +52,7 @@ let register_post_store_preimage ctx cctxt dac_sk_uris page_store hash_streamer Tezos_rpc.Directory.register0 RPC_services.V0.post_store_preimage (fun () input -> - handle_post_store_preimage + RPC_handlers.V0.handle_post_store_preimage ctx cctxt dac_sk_uris @@ -179,7 +66,10 @@ let register_get_verify_signature dac_plugin public_keys_opt directory = Tezos_rpc.Directory.register0 RPC_services.V0.get_verify_signature (fun external_message () -> - handle_get_verify_signature dac_plugin public_keys_opt external_message) + RPC_handlers.V0.handle_get_verify_signature + dac_plugin + public_keys_opt + external_message) let register_get_preimage dac_plugin page_store = add_service @@ -195,14 +85,14 @@ let register_monitor_root_hashes hash_streamer dir = Tezos_rpc.Directory.gen_register dir Monitor_services.V0.S.root_hashes - (fun () () () -> handle_monitor_root_hashes hash_streamer) + (fun () () () -> RPC_handlers.V0.handle_monitor_root_hashes hash_streamer) let register_get_certificate node_store dac_plugin = add_service Tezos_rpc.Directory.register1 RPC_services.V0.get_certificate (fun root_hash () () -> - handle_get_certificate dac_plugin node_store root_hash) + RPC_handlers.V0.handle_get_certificate dac_plugin node_store root_hash) let register_get_pages dac_plugin page_store = add_service @@ -215,75 +105,6 @@ let register_get_pages dac_plugin page_store = hash) module Coordinator = struct - let handle_post_preimage dac_plugin page_store hash_streamer payload = - let open Lwt_result_syntax in - let* root_hash = - Pages_encoding.Merkle_tree.V0.Filesystem.serialize_payload - dac_plugin - ~page_store - payload - in - let () = - Data_streamer.publish hash_streamer (Dac_plugin.hash_to_raw root_hash) - in - let*! () = - Event.emit_root_hash_pushed_to_data_streamer dac_plugin root_hash - in - return @@ Dac_plugin.hash_to_raw root_hash - - let handle_monitor_certificate dac_plugin ro_node_store certificate_streamers - raw_root_hash committee_members = - let open Lwt_result_syntax in - let*? stream, stopper = - Certificate_streamers.handle_subscribe - dac_plugin - certificate_streamers - raw_root_hash - in - let*? root_hash = Dac_plugin.raw_to_hash dac_plugin raw_root_hash in - let*! () = Event.emit_new_subscription_to_certificate_updates root_hash in - let shutdown () = Lwt_watcher.shutdown stopper in - let next () = Lwt_stream.get stream in - (* Add the current certificate to the streamer, if any, to ensure that - a certificate is returned even in the case that no updates to the - certificate happen for a long time. *) - let*! current_certificate_store_value_res = - Store.Certificate_store.find ro_node_store root_hash - in - match current_certificate_store_value_res with - | Ok current_certificate_store_value -> - let () = - Option.iter - (fun Store.{aggregate_signature; witnesses} -> - let certificate = - Certificate_repr.( - V0 (V0.make raw_root_hash aggregate_signature witnesses)) - in - let _ = - Certificate_streamers.push - dac_plugin - certificate_streamers - raw_root_hash - certificate - in - if - Certificate_repr.all_committee_members_have_signed - committee_members - certificate - then - let _ = - Certificate_streamers.close - dac_plugin - certificate_streamers - raw_root_hash - in - () - else ()) - current_certificate_store_value - in - return (next, shutdown) - | Error e -> fail e - let register_monitor_certificate dac_plugin ro_node_store certificate_streamers committee_members dir = Tezos_rpc.Directory.gen_register @@ -292,7 +113,7 @@ module Coordinator = struct (fun ((), root_hash) () () -> let open Lwt_result_syntax in let*! handler = - handle_monitor_certificate + RPC_handlers.V0.Coordinator.handle_monitor_certificate dac_plugin ro_node_store certificate_streamers @@ -308,7 +129,11 @@ module Coordinator = struct Tezos_rpc.Directory.register0 RPC_services.V0.Coordinator.post_preimage (fun () payload -> - handle_post_preimage dac_plugin page_store hash_streamer payload) + RPC_handlers.V0.Coordinator.handle_post_preimage + dac_plugin + page_store + hash_streamer + payload) let register_put_dac_member_signature ctx dac_plugin rw_node_store page_store = @@ -355,25 +180,17 @@ module Committee_member = struct end module Observer = struct - let handle_get_missing_page timeout cctxts page_store dac_plugin raw_root_hash - = - let open Lwt_result_syntax in - let*? root_hash = Dac_plugin.raw_to_hash dac_plugin raw_root_hash in - let remote_store = - Page_store.Remote_with_flooding.(init {timeout; cctxts; page_store}) - in - let* preimage = - Page_store.Remote_with_flooding.load dac_plugin remote_store root_hash - in - let*! () = Event.emit_fetched_missing_page root_hash in - return preimage - let register_get_missing_page dac_plugin page_store cctxts timeout = add_service Tezos_rpc.Directory.register1 RPC_services.V0.get_missing_page (fun root_hash () () -> - handle_get_missing_page timeout cctxts page_store dac_plugin root_hash) + RPC_handlers.V0.Observer.handle_get_missing_page + timeout + cctxts + page_store + dac_plugin + root_hash) let dynamic_rpc_dir dac_plugin committee_member_cctxts timeout page_store = Tezos_rpc.Directory.empty -- GitLab From a3b1ff6ce6565408d302510f7a783f07dd5a92bc Mon Sep 17 00:00:00 2001 From: Martin Tomazic Date: Thu, 25 May 2023 14:17:59 +0200 Subject: [PATCH 6/7] DAC/RPC_server: Encapsulate `V1` API --- src/lib_dac_node/RPC_server.ml | 70 ++++++++++++++++++++++------------ 1 file changed, 45 insertions(+), 25 deletions(-) diff --git a/src/lib_dac_node/RPC_server.ml b/src/lib_dac_node/RPC_server.ml index 88ded481fec1..1c0493e2ab37 100644 --- a/src/lib_dac_node/RPC_server.ml +++ b/src/lib_dac_node/RPC_server.ml @@ -94,16 +94,6 @@ let register_get_certificate node_store dac_plugin = (fun root_hash () () -> RPC_handlers.V0.handle_get_certificate dac_plugin node_store root_hash) -let register_get_pages dac_plugin page_store = - add_service - Tezos_rpc.Directory.register1 - RPC_services.V1.get_pages - (fun hash () () -> - RPC_handlers.Shared_by_V0_and_V1.handle_get_page - dac_plugin - page_store - hash) - module Coordinator = struct let register_monitor_certificate dac_plugin ro_node_store certificate_streamers committee_members dir = @@ -169,14 +159,11 @@ module Coordinator = struct rw_store page_store |> register_get_certificate rw_store dac_plugin - |> register_get_pages dac_plugin page_store end module Committee_member = struct let dynamic_rpc_dir dac_plugin page_store = - Tezos_rpc.Directory.empty - |> register_get_preimage dac_plugin page_store - |> register_get_pages dac_plugin page_store + Tezos_rpc.Directory.empty |> register_get_preimage dac_plugin page_store end module Observer = struct @@ -242,6 +229,33 @@ module Legacy = struct |> register_get_certificate rw_store dac_plugin end +module V1 = struct + let register_get_pages dac_plugin page_store = + add_service + Tezos_rpc.Directory.register1 + RPC_services.V1.get_pages + (fun hash () () -> + RPC_handlers.Shared_by_V0_and_V1.handle_get_page + dac_plugin + page_store + hash) + + module Coordinator = struct + let dynamic_rpc_dir dac_plugin page_store = + Tezos_rpc.Directory.empty |> register_get_pages dac_plugin page_store + end + + module Committee_member = struct + let dynamic_rpc_dir dac_plugin page_store = + Tezos_rpc.Directory.empty |> register_get_pages dac_plugin page_store + end + + module Observer = struct + let dynamic_rpc_dir dac_plugin page_store = + Tezos_rpc.Directory.empty |> register_get_pages dac_plugin page_store + end +end + let start ~rpc_address ~rpc_port node_ctxt = let open Lwt_syntax in let rw_store = Node_context.get_node_store node_ctxt Store_sigs.Read_write in @@ -250,19 +264,25 @@ let start ~rpc_address ~rpc_port node_ctxt = let register_dynamic_rpc dac_plugin = match Node_context.get_mode node_ctxt with | Coordinator coordinator_node_ctxt -> - Coordinator.dynamic_rpc_dir - dac_plugin - rw_store - page_store - coordinator_node_ctxt + Tezos_rpc.Directory.merge + (Coordinator.dynamic_rpc_dir + dac_plugin + rw_store + page_store + coordinator_node_ctxt) + (V1.Coordinator.dynamic_rpc_dir dac_plugin page_store) | Committee_member _committee_member_node_ctxt -> - Committee_member.dynamic_rpc_dir dac_plugin page_store + Tezos_rpc.Directory.merge + (Committee_member.dynamic_rpc_dir dac_plugin page_store) + (V1.Committee_member.dynamic_rpc_dir dac_plugin page_store) | Observer {committee_cctxts; timeout; _} -> - Observer.dynamic_rpc_dir - dac_plugin - committee_cctxts - (Float.of_int timeout) - page_store + Tezos_rpc.Directory.merge + (Observer.dynamic_rpc_dir + dac_plugin + committee_cctxts + (Float.of_int timeout) + page_store) + (V1.Observer.dynamic_rpc_dir dac_plugin page_store) | Legacy legacy_node_ctxt -> Legacy.dynamic_rpc_dir dac_plugin -- GitLab From 3cee5d170724bb9b8327fb3048c26d6e28eb88fd Mon Sep 17 00:00:00 2001 From: Martin Tomazic Date: Thu, 25 May 2023 14:21:50 +0200 Subject: [PATCH 7/7] DAC/RPC_server: Encapsulate `V0` API --- src/lib_dac_node/RPC_server.ml | 347 +++++++++++++++++---------------- 1 file changed, 175 insertions(+), 172 deletions(-) diff --git a/src/lib_dac_node/RPC_server.ml b/src/lib_dac_node/RPC_server.ml index 1c0493e2ab37..d07c3bed36aa 100644 --- a/src/lib_dac_node/RPC_server.ml +++ b/src/lib_dac_node/RPC_server.ml @@ -45,188 +45,191 @@ let register_get_health_ready cctxt directory = RPC_services.get_health_ready (fun () () -> RPC_handlers.handle_get_health_ready cctxt) -let register_post_store_preimage ctx cctxt dac_sk_uris page_store hash_streamer - directory = - directory - |> add_service - Tezos_rpc.Directory.register0 - RPC_services.V0.post_store_preimage - (fun () input -> - RPC_handlers.V0.handle_post_store_preimage - ctx - cctxt - dac_sk_uris - page_store - hash_streamer - input) - -let register_get_verify_signature dac_plugin public_keys_opt directory = - directory - |> add_service - Tezos_rpc.Directory.register0 - RPC_services.V0.get_verify_signature - (fun external_message () -> - RPC_handlers.V0.handle_get_verify_signature - dac_plugin - public_keys_opt - external_message) - -let register_get_preimage dac_plugin page_store = - add_service - Tezos_rpc.Directory.register1 - RPC_services.V0.get_preimage - (fun hash () () -> - RPC_handlers.Shared_by_V0_and_V1.handle_get_page - dac_plugin - page_store - hash) +module V0 = struct + let register_post_store_preimage ctx cctxt dac_sk_uris page_store + hash_streamer directory = + directory + |> add_service + Tezos_rpc.Directory.register0 + RPC_services.V0.post_store_preimage + (fun () input -> + RPC_handlers.V0.handle_post_store_preimage + ctx + cctxt + dac_sk_uris + page_store + hash_streamer + input) -let register_monitor_root_hashes hash_streamer dir = - Tezos_rpc.Directory.gen_register - dir - Monitor_services.V0.S.root_hashes - (fun () () () -> RPC_handlers.V0.handle_monitor_root_hashes hash_streamer) + let register_get_verify_signature dac_plugin public_keys_opt directory = + directory + |> add_service + Tezos_rpc.Directory.register0 + RPC_services.V0.get_verify_signature + (fun external_message () -> + RPC_handlers.V0.handle_get_verify_signature + dac_plugin + public_keys_opt + external_message) -let register_get_certificate node_store dac_plugin = - add_service - Tezos_rpc.Directory.register1 - RPC_services.V0.get_certificate - (fun root_hash () () -> - RPC_handlers.V0.handle_get_certificate dac_plugin node_store root_hash) + let register_get_preimage dac_plugin page_store = + add_service + Tezos_rpc.Directory.register1 + RPC_services.V0.get_preimage + (fun hash () () -> + RPC_handlers.Shared_by_V0_and_V1.handle_get_page + dac_plugin + page_store + hash) -module Coordinator = struct - let register_monitor_certificate dac_plugin ro_node_store - certificate_streamers committee_members dir = + let register_monitor_root_hashes hash_streamer dir = Tezos_rpc.Directory.gen_register dir - Monitor_services.V0.S.certificate - (fun ((), root_hash) () () -> - let open Lwt_result_syntax in - let*! handler = - RPC_handlers.V0.Coordinator.handle_monitor_certificate - dac_plugin - ro_node_store - certificate_streamers - root_hash - committee_members - in - match handler with - | Ok (next, shutdown) -> Tezos_rpc.Answer.return_stream {next; shutdown} - | Error e -> Tezos_rpc.Answer.fail e) + Monitor_services.V0.S.root_hashes + (fun () () () -> RPC_handlers.V0.handle_monitor_root_hashes hash_streamer) - let register_post_preimage dac_plugin hash_streamer page_store = + let register_get_certificate node_store dac_plugin = add_service - Tezos_rpc.Directory.register0 - RPC_services.V0.Coordinator.post_preimage - (fun () payload -> - RPC_handlers.V0.Coordinator.handle_post_preimage - dac_plugin - page_store - hash_streamer - payload) + Tezos_rpc.Directory.register1 + RPC_services.V0.get_certificate + (fun root_hash () () -> + RPC_handlers.V0.handle_get_certificate dac_plugin node_store root_hash) - let register_put_dac_member_signature ctx dac_plugin rw_node_store page_store - = - add_service - Tezos_rpc.Directory.register0 - RPC_services.V0.put_dac_member_signature - (fun () dac_member_signature -> - Signature_manager.Coordinator.handle_put_dac_member_signature - ctx - dac_plugin - rw_node_store - page_store - dac_member_signature) + module Coordinator = struct + let register_monitor_certificate dac_plugin ro_node_store + certificate_streamers committee_members dir = + Tezos_rpc.Directory.gen_register + dir + Monitor_services.V0.S.certificate + (fun ((), root_hash) () () -> + let open Lwt_result_syntax in + let*! handler = + RPC_handlers.V0.Coordinator.handle_monitor_certificate + dac_plugin + ro_node_store + certificate_streamers + root_hash + committee_members + in + match handler with + | Ok (next, shutdown) -> + Tezos_rpc.Answer.return_stream {next; shutdown} + | Error e -> Tezos_rpc.Answer.fail e) - let dynamic_rpc_dir dac_plugin rw_store page_store coordinator_node_ctxt = - let hash_streamer = - coordinator_node_ctxt.Node_context.Coordinator.hash_streamer - in - let certificate_streamers = coordinator_node_ctxt.certificate_streamers in - let committee_members = coordinator_node_ctxt.committee_members in - Tezos_rpc.Directory.empty - |> register_post_preimage dac_plugin hash_streamer page_store - |> register_get_preimage dac_plugin page_store - |> register_monitor_root_hashes hash_streamer - |> register_monitor_certificate - dac_plugin - rw_store - certificate_streamers - committee_members - |> register_put_dac_member_signature - coordinator_node_ctxt - dac_plugin - rw_store - page_store - |> register_get_certificate rw_store dac_plugin -end + let register_post_preimage dac_plugin hash_streamer page_store = + add_service + Tezos_rpc.Directory.register0 + RPC_services.V0.Coordinator.post_preimage + (fun () payload -> + RPC_handlers.V0.Coordinator.handle_post_preimage + dac_plugin + page_store + hash_streamer + payload) -module Committee_member = struct - let dynamic_rpc_dir dac_plugin page_store = - Tezos_rpc.Directory.empty |> register_get_preimage dac_plugin page_store -end + let register_put_dac_member_signature ctx dac_plugin rw_node_store + page_store = + add_service + Tezos_rpc.Directory.register0 + RPC_services.V0.put_dac_member_signature + (fun () dac_member_signature -> + Signature_manager.Coordinator.handle_put_dac_member_signature + ctx + dac_plugin + rw_node_store + page_store + dac_member_signature) -module Observer = struct - let register_get_missing_page dac_plugin page_store cctxts timeout = - add_service - Tezos_rpc.Directory.register1 - RPC_services.V0.get_missing_page - (fun root_hash () () -> - RPC_handlers.V0.Observer.handle_get_missing_page - timeout - cctxts - page_store - dac_plugin - root_hash) + let dynamic_rpc_dir dac_plugin rw_store page_store coordinator_node_ctxt = + let hash_streamer = + coordinator_node_ctxt.Node_context.Coordinator.hash_streamer + in + let certificate_streamers = coordinator_node_ctxt.certificate_streamers in + let committee_members = coordinator_node_ctxt.committee_members in + Tezos_rpc.Directory.empty + |> register_post_preimage dac_plugin hash_streamer page_store + |> register_get_preimage dac_plugin page_store + |> register_monitor_root_hashes hash_streamer + |> register_monitor_certificate + dac_plugin + rw_store + certificate_streamers + committee_members + |> register_put_dac_member_signature + coordinator_node_ctxt + dac_plugin + rw_store + page_store + |> register_get_certificate rw_store dac_plugin + end - let dynamic_rpc_dir dac_plugin committee_member_cctxts timeout page_store = - Tezos_rpc.Directory.empty - |> register_get_preimage dac_plugin page_store - |> register_get_missing_page - dac_plugin - page_store - committee_member_cctxts - timeout -end + module Committee_member = struct + let dynamic_rpc_dir dac_plugin page_store = + Tezos_rpc.Directory.empty |> register_get_preimage dac_plugin page_store + end -module Legacy = struct - let register_put_dac_member_signature ctx dac_plugin rw_node_store page_store - = - add_service - Tezos_rpc.Directory.register0 - RPC_services.V0.put_dac_member_signature - (fun () dac_member_signature -> - Signature_manager.Legacy.handle_put_dac_member_signature - ctx - dac_plugin - rw_node_store - page_store - dac_member_signature) + module Observer = struct + let register_get_missing_page dac_plugin page_store cctxts timeout = + add_service + Tezos_rpc.Directory.register1 + RPC_services.V0.get_missing_page + (fun root_hash () () -> + RPC_handlers.V0.Observer.handle_get_missing_page + timeout + cctxts + page_store + dac_plugin + root_hash) - let dynamic_rpc_dir dac_plugin rw_store page_store cctxt legacy_node_ctxt = - let hash_streamer = legacy_node_ctxt.Node_context.Legacy.hash_streamer in - let public_keys_opt = - Node_context.Legacy.public_keys_opt legacy_node_ctxt - in - let secret_key_uris_opt = - Node_context.Legacy.secret_key_uris_opt legacy_node_ctxt - in - Tezos_rpc.Directory.empty - |> register_post_store_preimage - dac_plugin - cctxt - secret_key_uris_opt - page_store - hash_streamer - |> register_get_verify_signature dac_plugin public_keys_opt - |> register_get_preimage dac_plugin page_store - |> register_monitor_root_hashes hash_streamer - |> register_put_dac_member_signature - legacy_node_ctxt - dac_plugin - rw_store - page_store - |> register_get_certificate rw_store dac_plugin + let dynamic_rpc_dir dac_plugin committee_member_cctxts timeout page_store = + Tezos_rpc.Directory.empty + |> register_get_preimage dac_plugin page_store + |> register_get_missing_page + dac_plugin + page_store + committee_member_cctxts + timeout + end + + module Legacy = struct + let register_put_dac_member_signature ctx dac_plugin rw_node_store + page_store = + add_service + Tezos_rpc.Directory.register0 + RPC_services.V0.put_dac_member_signature + (fun () dac_member_signature -> + Signature_manager.Legacy.handle_put_dac_member_signature + ctx + dac_plugin + rw_node_store + page_store + dac_member_signature) + + let dynamic_rpc_dir dac_plugin rw_store page_store cctxt legacy_node_ctxt = + let hash_streamer = legacy_node_ctxt.Node_context.Legacy.hash_streamer in + let public_keys_opt = + Node_context.Legacy.public_keys_opt legacy_node_ctxt + in + let secret_key_uris_opt = + Node_context.Legacy.secret_key_uris_opt legacy_node_ctxt + in + Tezos_rpc.Directory.empty + |> register_post_store_preimage + dac_plugin + cctxt + secret_key_uris_opt + page_store + hash_streamer + |> register_get_verify_signature dac_plugin public_keys_opt + |> register_get_preimage dac_plugin page_store + |> register_monitor_root_hashes hash_streamer + |> register_put_dac_member_signature + legacy_node_ctxt + dac_plugin + rw_store + page_store + |> register_get_certificate rw_store dac_plugin + end end module V1 = struct @@ -265,7 +268,7 @@ let start ~rpc_address ~rpc_port node_ctxt = match Node_context.get_mode node_ctxt with | Coordinator coordinator_node_ctxt -> Tezos_rpc.Directory.merge - (Coordinator.dynamic_rpc_dir + (V0.Coordinator.dynamic_rpc_dir dac_plugin rw_store page_store @@ -273,18 +276,18 @@ let start ~rpc_address ~rpc_port node_ctxt = (V1.Coordinator.dynamic_rpc_dir dac_plugin page_store) | Committee_member _committee_member_node_ctxt -> Tezos_rpc.Directory.merge - (Committee_member.dynamic_rpc_dir dac_plugin page_store) + (V0.Committee_member.dynamic_rpc_dir dac_plugin page_store) (V1.Committee_member.dynamic_rpc_dir dac_plugin page_store) | Observer {committee_cctxts; timeout; _} -> Tezos_rpc.Directory.merge - (Observer.dynamic_rpc_dir + (V0.Observer.dynamic_rpc_dir dac_plugin committee_cctxts (Float.of_int timeout) page_store) (V1.Observer.dynamic_rpc_dir dac_plugin page_store) | Legacy legacy_node_ctxt -> - Legacy.dynamic_rpc_dir + V0.Legacy.dynamic_rpc_dir dac_plugin rw_store page_store -- GitLab