diff --git a/src/lib_dac_node/RPC_handlers.ml b/src/lib_dac_node/RPC_handlers.ml new file mode 100644 index 0000000000000000000000000000000000000000..db36003baa9ffd3e7a0a57cbcd9b1491906d69f8 --- /dev/null +++ b/src/lib_dac_node/RPC_handlers.ml @@ -0,0 +1,258 @@ +(*****************************************************************************) +(* *) +(* Open Source License *) +(* Copyright (c) 2023 Trili Tech *) +(* Copyright (c) 2023 Marigold *) +(* *) +(* Permission is hereby granted, free of charge, to any person obtaining a *) +(* copy of this software and associated documentation files (the "Software"),*) +(* to deal in the Software without restriction, including without limitation *) +(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *) +(* and/or sell copies of the Software, and to permit persons to whom the *) +(* Software is furnished to do so, subject to the following conditions: *) +(* *) +(* The above copyright notice and this permission notice shall be included *) +(* in all copies or substantial portions of the Software. *) +(* *) +(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*) +(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *) +(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *) +(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*) +(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *) +(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *) +(* DEALINGS IN THE SOFTWARE. 
*) +(* *) +(*****************************************************************************) + +type error += + | DAC_node_not_ready of string + | Cannot_construct_external_message + | Cannot_deserialize_external_message + +let () = + register_error_kind + `Permanent + ~id:"dac_node_not_ready" + ~title:"DAC Node is not ready" + ~description: + "RPC server of DAC node is not started and plugin is not resolved." + ~pp:(fun ppf message -> + Format.fprintf ppf "DAC Node is not ready, current status is: %s" message) + Data_encoding.(obj1 (req "value" string)) + (function DAC_node_not_ready message -> Some message | _ -> None) + (fun message -> DAC_node_not_ready message) ; + register_error_kind + `Permanent + ~id:"dac_cannot_construct_external_message" + ~title:"External rollup message could not be constructed" + ~description:"External rollup message could not be constructed" + ~pp:(fun ppf () -> + Format.fprintf ppf "External rollup message could not be constructed") + Data_encoding.unit + (function Cannot_construct_external_message -> Some () | _ -> None) + (fun () -> Cannot_construct_external_message) ; + register_error_kind + `Permanent + ~id:"dac_cannot_deserialize_rollup_external_message" + ~title:"External rollup message could not be deserialized" + ~description:"External rollup message could not be deserialized" + ~pp:(fun ppf () -> + Format.fprintf ppf "External rollup message could not be deserialized") + Data_encoding.unit + (function Cannot_deserialize_external_message -> Some () | _ -> None) + (fun () -> Cannot_deserialize_external_message) + +let handle_get_health_live node_ctxt = + match Node_context.get_status node_ctxt with + | Ready _ | Starting -> Lwt_result_syntax.return true + +let handle_get_health_ready node_ctxt = + match Node_context.get_status node_ctxt with + | Ready _ -> Lwt_result_syntax.return true + | Starting -> Lwt_result_syntax.tzfail @@ DAC_node_not_ready "starting" + +module Shared_by_V0_and_V1 = struct + (** [handle_get_page] is a 
handler shared by both "GET v0/preimage" + and "GET v1/pages". It fetches a page that corresponds + to a given [raw_hash]. *) + let handle_get_page dac_plugin page_store raw_hash = + let open Lwt_result_syntax in + let*? hash = Dac_plugin.raw_to_hash dac_plugin raw_hash in + Page_store.Filesystem.load dac_plugin page_store hash +end + +module V0 = struct + let handle_post_store_preimage dac_plugin cctxt dac_sk_uris page_store + hash_streamer (data, pagination_scheme) = + let open Lwt_result_syntax in + let open Pages_encoding in + let* root_hash = + match pagination_scheme with + | Pagination_scheme.Merkle_tree_V0 -> + (* FIXME: https://gitlab.com/tezos/tezos/-/issues/4897 + Once new "PUT /preimage" endpoint is implemented, pushing + a new root hash to the data streamer should be moved there. + Tezt for testing streaming of root hashes should also use + the new endpoint. *) + let* root_hash = + Merkle_tree.V0.Filesystem.serialize_payload + dac_plugin + ~page_store + data + in + let () = + Data_streamer.publish + hash_streamer + (Dac_plugin.hash_to_raw root_hash) + in + let*! () = + Event.emit_root_hash_pushed_to_data_streamer dac_plugin root_hash + in + return root_hash + | Pagination_scheme.Hash_chain_V0 -> + Hash_chain.V0.serialize_payload + dac_plugin + ~for_each_page:(fun (hash, content) -> + Page_store.Filesystem.save dac_plugin page_store ~hash ~content) + data + in + let* signature, witnesses = + Signature_manager.Legacy.sign_root_hash + dac_plugin + cctxt + dac_sk_uris + root_hash + in + let raw_root_hash = Dac_plugin.hash_to_raw root_hash in + let*! 
external_message = + External_message.Default.make dac_plugin root_hash signature witnesses + in + match external_message with + | Ok external_message -> return @@ (raw_root_hash, external_message) + | Error _ -> tzfail @@ Cannot_construct_external_message + + let handle_get_verify_signature dac_plugin public_keys_opt encoded_l1_message + = + let open Lwt_result_syntax in + let ((module Plugin) : Dac_plugin.t) = dac_plugin in + let external_message = + let open Option_syntax in + let* encoded_l1_message in + let* as_bytes = Hex.to_bytes @@ `Hex encoded_l1_message in + External_message.Default.of_bytes Plugin.encoding as_bytes + in + match external_message with + | None -> tzfail @@ Cannot_deserialize_external_message + | Some {root_hash; signature; witnesses} -> + Signature_manager.verify + dac_plugin + ~public_keys_opt + (Dac_plugin.hash_to_raw root_hash) + signature + witnesses + + let handle_monitor_root_hashes hash_streamer = + let open Lwt_syntax in + let stream, stopper = Data_streamer.handle_subscribe hash_streamer in + let shutdown () = Lwt_watcher.shutdown stopper in + let next () = Lwt_stream.get stream in + let* () = Event.(emit handle_new_subscription_to_hash_streamer ()) in + Tezos_rpc.Answer.return_stream {next; shutdown} + + let handle_get_certificate dac_plugin node_store raw_root_hash = + let open Lwt_result_syntax in + let*? 
root_hash = Dac_plugin.raw_to_hash dac_plugin raw_root_hash in + + let+ value_opt = Store.Certificate_store.find node_store root_hash in + Option.map + (fun Store.{aggregate_signature; witnesses} -> + Certificate_repr.( + V0 (V0.make raw_root_hash aggregate_signature witnesses))) + value_opt + + module Coordinator = struct + let handle_post_preimage dac_plugin page_store hash_streamer payload = + let open Lwt_result_syntax in + let* root_hash = + Pages_encoding.Merkle_tree.V0.Filesystem.serialize_payload + dac_plugin + ~page_store + payload + in + let () = + Data_streamer.publish hash_streamer (Dac_plugin.hash_to_raw root_hash) + in + let*! () = + Event.emit_root_hash_pushed_to_data_streamer dac_plugin root_hash + in + return @@ Dac_plugin.hash_to_raw root_hash + + let handle_monitor_certificate dac_plugin ro_node_store + certificate_streamers raw_root_hash committee_members = + let open Lwt_result_syntax in + let*? stream, stopper = + Certificate_streamers.handle_subscribe + dac_plugin + certificate_streamers + raw_root_hash + in + let*? root_hash = Dac_plugin.raw_to_hash dac_plugin raw_root_hash in + let*! () = Event.emit_new_subscription_to_certificate_updates root_hash in + let shutdown () = Lwt_watcher.shutdown stopper in + let next () = Lwt_stream.get stream in + (* Push the certificate currently in the store, if any, to the streamer so + that a subscriber gets one even if no update happens for a long time; + the streamer is closed if every committee member already signed. *) + let*! 
current_certificate_store_value_res = + Store.Certificate_store.find ro_node_store root_hash + in + match current_certificate_store_value_res with + | Ok current_certificate_store_value -> + let () = + Option.iter + (fun Store.{aggregate_signature; witnesses} -> + let certificate = + Certificate_repr.( + V0 (V0.make raw_root_hash aggregate_signature witnesses)) + in + let _ = + Certificate_streamers.push + dac_plugin + certificate_streamers + raw_root_hash + certificate + in + if + Certificate_repr.all_committee_members_have_signed + committee_members + certificate + then + let _ = + Certificate_streamers.close + dac_plugin + certificate_streamers + raw_root_hash + in + () + else ()) + current_certificate_store_value + in + return (next, shutdown) + | Error e -> fail e + end + + module Observer = struct + let handle_get_missing_page timeout cctxts page_store dac_plugin + raw_root_hash = + let open Lwt_result_syntax in + let*? root_hash = Dac_plugin.raw_to_hash dac_plugin raw_root_hash in + let remote_store = + Page_store.Remote_with_flooding.(init {timeout; cctxts; page_store}) + in + let* preimage = + Page_store.Remote_with_flooding.load dac_plugin remote_store root_hash + in + let*! 
() = Event.emit_fetched_missing_page root_hash in + return preimage + end +end diff --git a/src/lib_dac_node/RPC_handlers.mli b/src/lib_dac_node/RPC_handlers.mli new file mode 100644 index 0000000000000000000000000000000000000000..ece545af4db87c85c7057e7501a3088e6bb488a5 --- /dev/null +++ b/src/lib_dac_node/RPC_handlers.mli @@ -0,0 +1,122 @@ +(*****************************************************************************) +(* *) +(* Open Source License *) +(* Copyright (c) 2023 Trili Tech *) +(* Copyright (c) 2023 Marigold *) +(* *) +(* Permission is hereby granted, free of charge, to any person obtaining a *) +(* copy of this software and associated documentation files (the "Software"),*) +(* to deal in the Software without restriction, including without limitation *) +(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *) +(* and/or sell copies of the Software, and to permit persons to whom the *) +(* Software is furnished to do so, subject to the following conditions: *) +(* *) +(* The above copyright notice and this permission notice shall be included *) +(* in all copies or substantial portions of the Software. *) +(* *) +(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*) +(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *) +(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *) +(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*) +(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *) +(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *) +(* DEALINGS IN THE SOFTWARE. *) +(* *) +(*****************************************************************************) + +(** [RPC_handlers] is a module with handlers for DAC API endpoints. 
*) + +type error += + | DAC_node_not_ready of string + | Cannot_construct_external_message + | Cannot_deserialize_external_message + +(** [handle_get_health_live] is a handler for "GET dac/health/live". *) +val handle_get_health_live : Node_context.t -> (bool, error trace) result Lwt.t + +(** [handle_get_health_ready] is a handler for "GET dac/health/ready". *) +val handle_get_health_ready : Node_context.t -> (bool, error trace) result Lwt.t + +(** [Shared_by_V0_and_V1] encapsulates handlers that are common to + both [V0] and [V1] API. *) +module Shared_by_V0_and_V1 : sig + (** [handle_get_page] is a handler shared by both "GET v0/preimage" + and "GET v1/pages". It fetches a page that corresponds + to a given [raw_hash]. *) + val handle_get_page : + Dac_plugin.t -> + Page_store.Filesystem.t -> + Dac_plugin.raw_hash -> + (bytes, tztrace) result Lwt.t +end + +(** [V0] encapsulates handlers specific to [V0] API. *) +module V0 : sig + (** [handle_post_store_preimage] is a handler for "POST v0/store_preimage". *) + val handle_post_store_preimage : + Dac_plugin.t -> + #Client_context.wallet -> + Client_keys.aggregate_sk_uri option trace -> + Page_store.Filesystem.t -> + Dac_plugin.raw_hash Data_streamer.t -> + bytes * Pagination_scheme.t -> + (Dac_plugin.raw_hash * bytes, tztrace) result Lwt.t + + (** [handle_get_verify_signature] is a handler for "GET v0/verify_signature". *) + val handle_get_verify_signature : + Dac_plugin.t -> + Tezos_crypto.Aggregate_signature.public_key option trace -> + string option -> + (bool, error trace) result Lwt.t + + (** [handle_monitor_root_hashes] is a handler for subscribing to the + streaming of root hashes via "GET v0/monitor/root_hashes" RPC call. *) + val handle_monitor_root_hashes : + Dac_plugin.raw_hash Data_streamer.t -> + Dac_plugin.raw_hash Tezos_rpc.Answer.t Lwt.t + + (** [handle_get_certificate] is a handler for "GET v0/certificate". 
*) + val handle_get_certificate : + Dac_plugin.t -> + [> `Read] Store.Irmin_store.t -> + Dac_plugin.raw_hash -> + (Certificate_repr.t option, tztrace) result Lwt.t + + (** [Coordinator] encapsulates the handlers of the [V0] API that are + specific to Coordinator mode. *) + module Coordinator : sig + (** [handle_post_preimage] is a handler for "PUT v0/preimage". NOTE(review): named [post]; confirm the verb. *) + val handle_post_preimage : + Dac_plugin.t -> + Page_store.Filesystem.t -> + Dac_plugin.raw_hash Data_streamer.t -> + bytes -> + (Dac_plugin.raw_hash, tztrace) result Lwt.t + + (** [handle_monitor_certificate] is a handler for subscribing to the stream + of certificate updates via "GET v0/monitor/certificate" RPC call. *) + val handle_monitor_certificate : + Dac_plugin.t -> + [> `Read] Store.Irmin_store.t -> + Certificate_streamers.t -> + Dac_plugin.raw_hash -> + 'a trace -> + ( (unit -> Certificate_repr.t option Lwt.t) * (unit -> unit), + tztrace ) + result + Lwt.t + end + + (** [Observer] encapsulates the handlers of the [V0] API that are + specific to Observer mode. *) + module Observer : sig + (** [handle_get_missing_page] is a handler for "GET v0/missing_page". 
*) + val handle_get_missing_page : + float -> + Dac_node_client.cctxt trace -> + Page_store.Filesystem.t -> + Dac_plugin.t -> + Dac_plugin.raw_hash -> + (bytes, tztrace) result Lwt.t + end +end diff --git a/src/lib_dac_node/RPC_server.ml b/src/lib_dac_node/RPC_server.ml index 8bcc33a66cd74d8613264239fabfb23a26aa703e..d07c3bed36aa4484c4566b76e91aac711c558aa1 100644 --- a/src/lib_dac_node/RPC_server.ml +++ b/src/lib_dac_node/RPC_server.ml @@ -28,419 +28,235 @@ open Tezos_rpc_http open Tezos_rpc_http_server -type error += - | Cannot_construct_external_message - | Cannot_deserialize_external_message - | DAC_node_not_ready of string - -let () = - register_error_kind - `Permanent - ~id:"dac_cannot_construct_external_message" - ~title:"External rollup message could not be constructed" - ~description:"External rollup message could not be constructed" - ~pp:(fun ppf () -> - Format.fprintf ppf "External rollup message could not be constructed") - Data_encoding.unit - (function Cannot_construct_external_message -> Some () | _ -> None) - (fun () -> Cannot_construct_external_message) ; - register_error_kind - `Permanent - ~id:"dac_cannot_deserialize_rollup_external_message" - ~title:"External rollup message could not be deserialized" - ~description:"External rollup message could not be deserialized" - ~pp:(fun ppf () -> - Format.fprintf ppf "External rollup message could not be deserialized") - Data_encoding.unit - (function Cannot_deserialize_external_message -> Some () | _ -> None) - (fun () -> Cannot_deserialize_external_message) ; - register_error_kind - `Permanent - ~id:"dac_node_not_ready" - ~title:"DAC Node is not ready" - ~description: - "RPC server of DAC node is not started and plugin is not resolved." 
- ~pp:(fun ppf message -> - Format.fprintf ppf "DAC Node is not ready, current status is: %s" message) - Data_encoding.(obj1 (req "value" string)) - (function DAC_node_not_ready message -> Some message | _ -> None) - (fun message -> DAC_node_not_ready message) - let add_service registerer service handler directory = registerer directory service handler -let handle_get_health_live node_ctxt = - match Node_context.get_status node_ctxt with - | Ready _ | Starting -> Lwt_result_syntax.return true - -let handle_get_health_ready node_ctxt = - match Node_context.get_status node_ctxt with - | Ready _ -> Lwt_result_syntax.return true - | Starting -> Lwt_result_syntax.tzfail @@ DAC_node_not_ready "starting" - -let handle_post_store_preimage dac_plugin cctxt dac_sk_uris page_store - hash_streamer (data, pagination_scheme) = - let open Lwt_result_syntax in - let open Pages_encoding in - let* root_hash = - match pagination_scheme with - | Pagination_scheme.Merkle_tree_V0 -> - (* FIXME: https://gitlab.com/tezos/tezos/-/issues/4897 - Once new "PUT /preimage" endpoint is implemented, pushing - a new root hash to the data streamer should be moved there. - Tezt for testing streaming of root hashes should also use - the new endpoint. *) - let* root_hash = - Merkle_tree.V0.Filesystem.serialize_payload - dac_plugin - ~page_store - data - in - let () = - Data_streamer.publish hash_streamer (Dac_plugin.hash_to_raw root_hash) - in - let*! () = - Event.emit_root_hash_pushed_to_data_streamer dac_plugin root_hash - in - return root_hash - | Pagination_scheme.Hash_chain_V0 -> - Hash_chain.V0.serialize_payload - dac_plugin - ~for_each_page:(fun (hash, content) -> - Page_store.Filesystem.save dac_plugin page_store ~hash ~content) - data - in - let* signature, witnesses = - Signature_manager.Legacy.sign_root_hash - dac_plugin - cctxt - dac_sk_uris - root_hash - in - let raw_root_hash = Dac_plugin.hash_to_raw root_hash in - let*! 
external_message = - External_message.Default.make dac_plugin root_hash signature witnesses - in - match external_message with - | Ok external_message -> return @@ (raw_root_hash, external_message) - | Error _ -> tzfail @@ Cannot_construct_external_message - -let handle_get_verify_signature dac_plugin public_keys_opt encoded_l1_message = - let open Lwt_result_syntax in - let ((module Plugin) : Dac_plugin.t) = dac_plugin in - let external_message = - let open Option_syntax in - let* encoded_l1_message in - let* as_bytes = Hex.to_bytes @@ `Hex encoded_l1_message in - External_message.Default.of_bytes Plugin.encoding as_bytes - in - match external_message with - | None -> tzfail @@ Cannot_deserialize_external_message - | Some {root_hash; signature; witnesses} -> - Signature_manager.verify - dac_plugin - ~public_keys_opt - (Dac_plugin.hash_to_raw root_hash) - signature - witnesses - -(** [handle_get_preimage] is shared by both [V0] and [V1] API. *) -let handle_get_preimage dac_plugin page_store raw_hash = - let open Lwt_result_syntax in - let*? hash = Dac_plugin.raw_to_hash dac_plugin raw_hash in - Page_store.Filesystem.load dac_plugin page_store hash - -(* Handler for subscribing to the streaming of root hashes via - GET monitor/root_hashes RPC call. *) -let handle_monitor_root_hashes hash_streamer = - let open Lwt_syntax in - let stream, stopper = Data_streamer.handle_subscribe hash_streamer in - let shutdown () = Lwt_watcher.shutdown stopper in - let next () = Lwt_stream.get stream in - let* () = Event.(emit handle_new_subscription_to_hash_streamer ()) in - Tezos_rpc.Answer.return_stream {next; shutdown} - -let handle_get_certificate dac_plugin node_store raw_root_hash = - let open Lwt_result_syntax in - let*? 
root_hash = Dac_plugin.raw_to_hash dac_plugin raw_root_hash in - - let+ value_opt = Store.Certificate_store.find node_store root_hash in - Option.map - (fun Store.{aggregate_signature; witnesses} -> - Certificate_repr.( - V0 (V0.make raw_root_hash aggregate_signature witnesses))) - value_opt - -let handle_get_missing_page timeout cctxts page_store dac_plugin raw_root_hash = - let open Lwt_result_syntax in - let*? root_hash = Dac_plugin.raw_to_hash dac_plugin raw_root_hash in - let remote_store = - Page_store.Remote_with_flooding.(init {timeout; cctxts; page_store}) - in - let* preimage = - Page_store.Remote_with_flooding.load dac_plugin remote_store root_hash - in - let*! () = Event.emit_fetched_missing_page root_hash in - return preimage - let register_get_health_live cctxt directory = directory |> add_service Tezos_rpc.Directory.register0 RPC_services.get_health_live - (fun () () -> handle_get_health_live cctxt) + (fun () () -> RPC_handlers.handle_get_health_live cctxt) let register_get_health_ready cctxt directory = directory |> add_service Tezos_rpc.Directory.register0 RPC_services.get_health_ready - (fun () () -> handle_get_health_ready cctxt) - -let register_post_store_preimage ctx cctxt dac_sk_uris page_store hash_streamer - directory = - directory - |> add_service - Tezos_rpc.Directory.register0 - RPC_services.V0.post_store_preimage - (fun () input -> - handle_post_store_preimage - ctx - cctxt - dac_sk_uris - page_store - hash_streamer - input) - -let register_get_verify_signature dac_plugin public_keys_opt directory = - directory - |> add_service - Tezos_rpc.Directory.register0 - RPC_services.V0.get_verify_signature - (fun external_message () -> - handle_get_verify_signature dac_plugin public_keys_opt external_message) - -let register_get_preimage dac_plugin page_store = - add_service - Tezos_rpc.Directory.register1 - RPC_services.V0.get_preimage - (fun hash () () -> handle_get_preimage dac_plugin page_store hash) - -let register_monitor_root_hashes 
hash_streamer dir = - Tezos_rpc.Directory.gen_register - dir - Monitor_services.V0.S.root_hashes - (fun () () () -> handle_monitor_root_hashes hash_streamer) - -let register_get_certificate node_store dac_plugin = - add_service - Tezos_rpc.Directory.register1 - RPC_services.V0.get_certificate - (fun root_hash () () -> - handle_get_certificate dac_plugin node_store root_hash) - -let register_get_missing_page dac_plugin page_store cctxts timeout = - add_service - Tezos_rpc.Directory.register1 - RPC_services.V0.get_missing_page - (fun root_hash () () -> - handle_get_missing_page timeout cctxts page_store dac_plugin root_hash) - -let register_get_pages dac_plugin page_store = - add_service - Tezos_rpc.Directory.register1 - RPC_services.V1.get_pages - (fun hash () () -> handle_get_preimage dac_plugin page_store hash) - -module Coordinator = struct - let handle_post_preimage dac_plugin page_store hash_streamer payload = - let open Lwt_result_syntax in - let* root_hash = - Pages_encoding.Merkle_tree.V0.Filesystem.serialize_payload - dac_plugin - ~page_store - payload - in - let () = - Data_streamer.publish hash_streamer (Dac_plugin.hash_to_raw root_hash) - in - let*! () = - Event.emit_root_hash_pushed_to_data_streamer dac_plugin root_hash - in - return @@ Dac_plugin.hash_to_raw root_hash - - let handle_monitor_certificate dac_plugin ro_node_store certificate_streamers - raw_root_hash committee_members = - let open Lwt_result_syntax in - let*? stream, stopper = - Certificate_streamers.handle_subscribe - dac_plugin - certificate_streamers - raw_root_hash - in - let*? root_hash = Dac_plugin.raw_to_hash dac_plugin raw_root_hash in - let*! () = Event.emit_new_subscription_to_certificate_updates root_hash in - let shutdown () = Lwt_watcher.shutdown stopper in - let next () = Lwt_stream.get stream in - (* Add the current certificate to the streamer, if any, to ensure that - a certificate is returned even in the case that no updates to the - certificate happen for a long time. 
*) - let*! current_certificate_store_value_res = - Store.Certificate_store.find ro_node_store root_hash - in - match current_certificate_store_value_res with - | Ok current_certificate_store_value -> - let () = - Option.iter - (fun Store.{aggregate_signature; witnesses} -> - let certificate = - Certificate_repr.( - V0 (V0.make raw_root_hash aggregate_signature witnesses)) - in - let _ = - Certificate_streamers.push - dac_plugin - certificate_streamers - raw_root_hash - certificate - in - if - Certificate_repr.all_committee_members_have_signed - committee_members - certificate - then - let _ = - Certificate_streamers.close - dac_plugin - certificate_streamers - raw_root_hash - in - () - else ()) - current_certificate_store_value - in - return (next, shutdown) - | Error e -> fail e + (fun () () -> RPC_handlers.handle_get_health_ready cctxt) + +module V0 = struct + let register_post_store_preimage ctx cctxt dac_sk_uris page_store + hash_streamer directory = + directory + |> add_service + Tezos_rpc.Directory.register0 + RPC_services.V0.post_store_preimage + (fun () input -> + RPC_handlers.V0.handle_post_store_preimage + ctx + cctxt + dac_sk_uris + page_store + hash_streamer + input) + + let register_get_verify_signature dac_plugin public_keys_opt directory = + directory + |> add_service + Tezos_rpc.Directory.register0 + RPC_services.V0.get_verify_signature + (fun external_message () -> + RPC_handlers.V0.handle_get_verify_signature + dac_plugin + public_keys_opt + external_message) + + let register_get_preimage dac_plugin page_store = + add_service + Tezos_rpc.Directory.register1 + RPC_services.V0.get_preimage + (fun hash () () -> + RPC_handlers.Shared_by_V0_and_V1.handle_get_page + dac_plugin + page_store + hash) - let register_monitor_certificate dac_plugin ro_node_store - certificate_streamers committee_members dir = + let register_monitor_root_hashes hash_streamer dir = Tezos_rpc.Directory.gen_register dir - Monitor_services.V0.S.certificate - (fun ((), root_hash) 
() () -> - let open Lwt_result_syntax in - let*! handler = - handle_monitor_certificate - dac_plugin - ro_node_store - certificate_streamers - root_hash - committee_members - in - match handler with - | Ok (next, shutdown) -> Tezos_rpc.Answer.return_stream {next; shutdown} - | Error e -> Tezos_rpc.Answer.fail e) - - let register_post_preimage dac_plugin hash_streamer page_store = - add_service - Tezos_rpc.Directory.register0 - RPC_services.V0.Coordinator.post_preimage - (fun () payload -> - handle_post_preimage dac_plugin page_store hash_streamer payload) + Monitor_services.V0.S.root_hashes + (fun () () () -> RPC_handlers.V0.handle_monitor_root_hashes hash_streamer) - let register_put_dac_member_signature ctx dac_plugin rw_node_store page_store - = + let register_get_certificate node_store dac_plugin = add_service - Tezos_rpc.Directory.register0 - RPC_services.V0.put_dac_member_signature - (fun () dac_member_signature -> - Signature_manager.Coordinator.handle_put_dac_member_signature - ctx - dac_plugin - rw_node_store - page_store - dac_member_signature) - - let dynamic_rpc_dir dac_plugin rw_store page_store coordinator_node_ctxt = - let hash_streamer = - coordinator_node_ctxt.Node_context.Coordinator.hash_streamer - in - let certificate_streamers = coordinator_node_ctxt.certificate_streamers in - let committee_members = coordinator_node_ctxt.committee_members in - Tezos_rpc.Directory.empty - |> register_post_preimage dac_plugin hash_streamer page_store - |> register_get_preimage dac_plugin page_store - |> register_monitor_root_hashes hash_streamer - |> register_monitor_certificate - dac_plugin - rw_store - certificate_streamers - committee_members - |> register_put_dac_member_signature - coordinator_node_ctxt - dac_plugin - rw_store - page_store - |> register_get_certificate rw_store dac_plugin - |> register_get_pages dac_plugin page_store -end + Tezos_rpc.Directory.register1 + RPC_services.V0.get_certificate + (fun root_hash () () -> + 
RPC_handlers.V0.handle_get_certificate dac_plugin node_store root_hash) + + module Coordinator = struct + let register_monitor_certificate dac_plugin ro_node_store + certificate_streamers committee_members dir = + Tezos_rpc.Directory.gen_register + dir + Monitor_services.V0.S.certificate + (fun ((), root_hash) () () -> + let open Lwt_result_syntax in + let*! handler = + RPC_handlers.V0.Coordinator.handle_monitor_certificate + dac_plugin + ro_node_store + certificate_streamers + root_hash + committee_members + in + match handler with + | Ok (next, shutdown) -> + Tezos_rpc.Answer.return_stream {next; shutdown} + | Error e -> Tezos_rpc.Answer.fail e) + + let register_post_preimage dac_plugin hash_streamer page_store = + add_service + Tezos_rpc.Directory.register0 + RPC_services.V0.Coordinator.post_preimage + (fun () payload -> + RPC_handlers.V0.Coordinator.handle_post_preimage + dac_plugin + page_store + hash_streamer + payload) + + let register_put_dac_member_signature ctx dac_plugin rw_node_store + page_store = + add_service + Tezos_rpc.Directory.register0 + RPC_services.V0.put_dac_member_signature + (fun () dac_member_signature -> + Signature_manager.Coordinator.handle_put_dac_member_signature + ctx + dac_plugin + rw_node_store + page_store + dac_member_signature) -module Committee_member = struct - let dynamic_rpc_dir dac_plugin page_store = - Tezos_rpc.Directory.empty - |> register_get_preimage dac_plugin page_store - |> register_get_pages dac_plugin page_store -end + let dynamic_rpc_dir dac_plugin rw_store page_store coordinator_node_ctxt = + let hash_streamer = + coordinator_node_ctxt.Node_context.Coordinator.hash_streamer + in + let certificate_streamers = coordinator_node_ctxt.certificate_streamers in + let committee_members = coordinator_node_ctxt.committee_members in + Tezos_rpc.Directory.empty + |> register_post_preimage dac_plugin hash_streamer page_store + |> register_get_preimage dac_plugin page_store + |> register_monitor_root_hashes hash_streamer + |> 
register_monitor_certificate + dac_plugin + rw_store + certificate_streamers + committee_members + |> register_put_dac_member_signature + coordinator_node_ctxt + dac_plugin + rw_store + page_store + |> register_get_certificate rw_store dac_plugin + end + + module Committee_member = struct + let dynamic_rpc_dir dac_plugin page_store = + Tezos_rpc.Directory.empty |> register_get_preimage dac_plugin page_store + end + + module Observer = struct + let register_get_missing_page dac_plugin page_store cctxts timeout = + add_service + Tezos_rpc.Directory.register1 + RPC_services.V0.get_missing_page + (fun root_hash () () -> + RPC_handlers.V0.Observer.handle_get_missing_page + timeout + cctxts + page_store + dac_plugin + root_hash) -module Observer = struct - let dynamic_rpc_dir dac_plugin committee_member_cctxts timeout page_store = - Tezos_rpc.Directory.empty - |> register_get_preimage dac_plugin page_store - |> register_get_missing_page - dac_plugin - page_store - committee_member_cctxts - timeout + let dynamic_rpc_dir dac_plugin committee_member_cctxts timeout page_store = + Tezos_rpc.Directory.empty + |> register_get_preimage dac_plugin page_store + |> register_get_missing_page + dac_plugin + page_store + committee_member_cctxts + timeout + end + + module Legacy = struct + let register_put_dac_member_signature ctx dac_plugin rw_node_store + page_store = + add_service + Tezos_rpc.Directory.register0 + RPC_services.V0.put_dac_member_signature + (fun () dac_member_signature -> + Signature_manager.Legacy.handle_put_dac_member_signature + ctx + dac_plugin + rw_node_store + page_store + dac_member_signature) + + let dynamic_rpc_dir dac_plugin rw_store page_store cctxt legacy_node_ctxt = + let hash_streamer = legacy_node_ctxt.Node_context.Legacy.hash_streamer in + let public_keys_opt = + Node_context.Legacy.public_keys_opt legacy_node_ctxt + in + let secret_key_uris_opt = + Node_context.Legacy.secret_key_uris_opt legacy_node_ctxt + in + Tezos_rpc.Directory.empty + |> 
register_post_store_preimage + dac_plugin + cctxt + secret_key_uris_opt + page_store + hash_streamer + |> register_get_verify_signature dac_plugin public_keys_opt + |> register_get_preimage dac_plugin page_store + |> register_monitor_root_hashes hash_streamer + |> register_put_dac_member_signature + legacy_node_ctxt + dac_plugin + rw_store + page_store + |> register_get_certificate rw_store dac_plugin + end end -module Legacy = struct - let register_put_dac_member_signature ctx dac_plugin rw_node_store page_store - = +module V1 = struct + let register_get_pages dac_plugin page_store = add_service - Tezos_rpc.Directory.register0 - RPC_services.V0.put_dac_member_signature - (fun () dac_member_signature -> - Signature_manager.Legacy.handle_put_dac_member_signature - ctx + Tezos_rpc.Directory.register1 + RPC_services.V1.get_pages + (fun hash () () -> + RPC_handlers.Shared_by_V0_and_V1.handle_get_page dac_plugin - rw_node_store page_store - dac_member_signature) - - let dynamic_rpc_dir dac_plugin rw_store page_store cctxt legacy_node_ctxt = - let hash_streamer = legacy_node_ctxt.Node_context.Legacy.hash_streamer in - let public_keys_opt = - Node_context.Legacy.public_keys_opt legacy_node_ctxt - in - let secret_key_uris_opt = - Node_context.Legacy.secret_key_uris_opt legacy_node_ctxt - in - Tezos_rpc.Directory.empty - |> register_post_store_preimage - dac_plugin - cctxt - secret_key_uris_opt - page_store - hash_streamer - |> register_get_verify_signature dac_plugin public_keys_opt - |> register_get_preimage dac_plugin page_store - |> register_monitor_root_hashes hash_streamer - |> register_put_dac_member_signature - legacy_node_ctxt - dac_plugin - rw_store - page_store - |> register_get_certificate rw_store dac_plugin + hash) + + module Coordinator = struct + let dynamic_rpc_dir dac_plugin page_store = + Tezos_rpc.Directory.empty |> register_get_pages dac_plugin page_store + end + + module Committee_member = struct + let dynamic_rpc_dir dac_plugin page_store = + 
Tezos_rpc.Directory.empty |> register_get_pages dac_plugin page_store + end + + module Observer = struct + let dynamic_rpc_dir dac_plugin page_store = + Tezos_rpc.Directory.empty |> register_get_pages dac_plugin page_store + end end let start ~rpc_address ~rpc_port node_ctxt = @@ -451,21 +267,27 @@ let start ~rpc_address ~rpc_port node_ctxt = let register_dynamic_rpc dac_plugin = match Node_context.get_mode node_ctxt with | Coordinator coordinator_node_ctxt -> - Coordinator.dynamic_rpc_dir - dac_plugin - rw_store - page_store - coordinator_node_ctxt + Tezos_rpc.Directory.merge + (V0.Coordinator.dynamic_rpc_dir + dac_plugin + rw_store + page_store + coordinator_node_ctxt) + (V1.Coordinator.dynamic_rpc_dir dac_plugin page_store) | Committee_member _committee_member_node_ctxt -> - Committee_member.dynamic_rpc_dir dac_plugin page_store + Tezos_rpc.Directory.merge + (V0.Committee_member.dynamic_rpc_dir dac_plugin page_store) + (V1.Committee_member.dynamic_rpc_dir dac_plugin page_store) | Observer {committee_cctxts; timeout; _} -> - Observer.dynamic_rpc_dir - dac_plugin - committee_cctxts - (Float.of_int timeout) - page_store + Tezos_rpc.Directory.merge + (V0.Observer.dynamic_rpc_dir + dac_plugin + committee_cctxts + (Float.of_int timeout) + page_store) + (V1.Observer.dynamic_rpc_dir dac_plugin page_store) | Legacy legacy_node_ctxt -> - Legacy.dynamic_rpc_dir + V0.Legacy.dynamic_rpc_dir dac_plugin rw_store page_store