diff --git a/src/bin_dac_node/main_dac.ml b/src/bin_dac_node/main_dac.ml
index d0d71228bf2f1924d496f97997686c4bb3c3e7c7..b3bab30ae79cf0ab07632018c907cd7a38f36cca 100644
--- a/src/bin_dac_node/main_dac.ml
+++ b/src/bin_dac_node/main_dac.ml
@@ -113,6 +113,13 @@ let positive_int_parameter =
       in
       if i < 0 then tzfail @@ Invalid_positive_int_parameter p else return i)
 
+let timeout ~doc =
+  Tezos_clic.arg
+    ~long:"timeout"
+    ~placeholder:"timeout"
+    ~doc
+    positive_int_parameter
+
 let threshold_param ?(name = "DAC threshold parameter")
     ?(desc =
       "Number of DAC member signatures required to validate a root page hash") =
@@ -280,12 +287,22 @@ module Config_init = struct
       command
         ~group
         ~desc:"Configure DAC node in observer mode."
-        (args4 data_dir_arg rpc_address_arg rpc_port_arg reveal_data_dir_arg)
+        (args5
+           data_dir_arg
+           rpc_address_arg
+           rpc_port_arg
+           reveal_data_dir_arg
+           (timeout
+              ~doc:
+                (Format.sprintf
+                   "The timeout in seconds for requesting a missing page from \
+                    Committee Member. Defaults to %i seconds."
+                   Configuration.Observer.default_timeout)))
         (prefixes ["configure"; "as"; "observer"; "with"; "coordinator"]
         @@ coordinator_rpc_param
         @@ prefixes ["and"; "committee"; "member"; "rpc"; "addresses"]
         @@ seq_of_param @@ committee_rpc_addresses_param)
-        (fun (data_dir, rpc_address, rpc_port, reveal_data_dir)
+        (fun (data_dir, rpc_address, rpc_port, reveal_data_dir, timeout)
              (coordinator_rpc_address, coordinator_rpc_port)
              committee_rpc_addresses
              cctxt ->
@@ -296,6 +313,7 @@ module Config_init = struct
             ~rpc_port
             (Configuration.make_observer
                ~committee_rpc_addresses
+               ?timeout
                coordinator_rpc_address
                coordinator_rpc_port)
             cctxt)
diff --git a/src/lib_dac_node/RPC_server.ml b/src/lib_dac_node/RPC_server.ml
index 10023c2be93ca86d82e2f8be860dfd343c24f18d..8bcc33a66cd74d8613264239fabfb23a26aa703e 100644
--- a/src/lib_dac_node/RPC_server.ml
+++ b/src/lib_dac_node/RPC_server.ml
@@ -171,16 +171,16 @@ let handle_get_certificate dac_plugin node_store raw_root_hash =
                   V0 (V0.make raw_root_hash aggregate_signature witnesses)))
        value_opt
 
-let handle_get_missing_page cctxt page_store dac_plugin raw_root_hash =
+let handle_get_missing_page timeout cctxts page_store dac_plugin raw_root_hash =
   let open Lwt_result_syntax in
   let*? root_hash = Dac_plugin.raw_to_hash dac_plugin raw_root_hash in
-  let remote_store = Page_store.Remote.(init {cctxt; page_store}) in
+  let remote_store =
+    Page_store.Remote_with_flooding.(init {timeout; cctxts; page_store})
+  in
   let* preimage =
-    (* TODO: https://gitlab.com/tezos/tezos/-/issues/5142
-       Retrieve missing page from dac committee via "flooding". *)
-    Page_store.Remote.load dac_plugin remote_store root_hash
+    Page_store.Remote_with_flooding.load dac_plugin remote_store root_hash
   in
-  let*! () = Event.(emit fetched_missing_page raw_root_hash) in
+  let*! () = Event.emit_fetched_missing_page root_hash in
   return preimage
 
 let register_get_health_live cctxt directory =
@@ -239,12 +239,12 @@ let register_get_certificate node_store dac_plugin =
     (fun root_hash () () ->
       handle_get_certificate dac_plugin node_store root_hash)
 
-let register_get_missing_page dac_plugin page_store cctxt =
+let register_get_missing_page dac_plugin page_store cctxts timeout =
   add_service
     Tezos_rpc.Directory.register1
     RPC_services.V0.get_missing_page
     (fun root_hash () () ->
-      handle_get_missing_page cctxt page_store dac_plugin root_hash)
+      handle_get_missing_page timeout cctxts page_store dac_plugin root_hash)
 
 let register_get_pages dac_plugin page_store =
   add_service
@@ -393,10 +393,14 @@ module Committee_member = struct
 end
 
 module Observer = struct
-  let dynamic_rpc_dir dac_plugin coordinator_cctxt page_store =
+  let dynamic_rpc_dir dac_plugin committee_member_cctxts timeout page_store =
     Tezos_rpc.Directory.empty
     |> register_get_preimage dac_plugin page_store
-    |> register_get_missing_page dac_plugin page_store coordinator_cctxt
+    |> register_get_missing_page
+         dac_plugin
+         page_store
+         committee_member_cctxts
+         timeout
     |> register_get_pages dac_plugin page_store
 end
 
@@ -422,13 +426,6 @@ module Legacy = struct
     let secret_key_uris_opt =
       Node_context.Legacy.secret_key_uris_opt legacy_node_ctxt
     in
-    let register_get_missing_page =
-      match legacy_node_ctxt.coordinator_cctxt with
-      | None -> fun dir -> dir
-      | Some cctxt ->
-          fun dir ->
-            dir |> register_get_missing_page dac_plugin page_store cctxt
-    in
     Tezos_rpc.Directory.empty
     |> register_post_store_preimage
          dac_plugin
@@ -445,7 +442,6 @@ module Legacy = struct
          rw_store
          page_store
     |> register_get_certificate rw_store dac_plugin
-    |> register_get_missing_page
 end
 
 let start ~rpc_address ~rpc_port node_ctxt =
@@ -463,8 +459,12 @@ let start ~rpc_address ~rpc_port node_ctxt =
           coordinator_node_ctxt
     | Committee_member _committee_member_node_ctxt ->
         Committee_member.dynamic_rpc_dir dac_plugin page_store
-    | Observer {coordinator_cctxt; _} ->
-        Observer.dynamic_rpc_dir dac_plugin coordinator_cctxt page_store
+    | Observer {committee_cctxts; timeout; _} ->
+        Observer.dynamic_rpc_dir
+          dac_plugin
+          committee_cctxts
+          (Float.of_int timeout)
+          page_store
     | Legacy legacy_node_ctxt ->
         Legacy.dynamic_rpc_dir
           dac_plugin
diff --git a/src/lib_dac_node/configuration.ml b/src/lib_dac_node/configuration.ml
index 911681b2aa24fc519e071e5362bdc12ffda306d6..5673df1b075f9a58e33bbeb227eb98bca9017011 100644
--- a/src/lib_dac_node/configuration.ml
+++ b/src/lib_dac_node/configuration.ml
@@ -101,11 +101,19 @@ module Observer = struct
     coordinator_rpc_address : string;
     coordinator_rpc_port : int;
     committee_rpc_addresses : (string * int) list;
+    timeout : int;
   }
 
-  let make ~committee_rpc_addresses coordinator_rpc_address coordinator_rpc_port
-      =
-    {coordinator_rpc_address; coordinator_rpc_port; committee_rpc_addresses}
+  let default_timeout = 6
+
+  let make ~committee_rpc_addresses ?(timeout = default_timeout)
+      coordinator_rpc_address coordinator_rpc_port =
+    {
+      coordinator_rpc_address;
+      timeout;
+      coordinator_rpc_port;
+      committee_rpc_addresses;
+    }
 
   let encoding =
     Data_encoding.(
@@ -114,25 +122,30 @@ module Observer = struct
                coordinator_rpc_address;
                coordinator_rpc_port;
                committee_rpc_addresses;
+               timeout;
              } ->
           ( coordinator_rpc_address,
             coordinator_rpc_port,
-            committee_rpc_addresses ))
+            committee_rpc_addresses,
+            timeout ))
         (fun ( coordinator_rpc_address,
                coordinator_rpc_port,
-               committee_rpc_addresses ) ->
+               committee_rpc_addresses,
+               timeout ) ->
           {
             coordinator_rpc_address;
             coordinator_rpc_port;
             committee_rpc_addresses;
+            timeout;
           })
-        (obj3
+        (obj4
            (req "coordinator_rpc_address" string)
            (req "coordinator_rpc_port" uint16)
            (req
               "committee_rpc_addresses"
               (Data_encoding.list
-                 (obj2 (req "rpc_address" string) (req "rpc_port" uint16))))))
+                 (obj2 (req "rpc_address" string) (req "rpc_port" uint16))))
+           (dft "timeout" int31 default_timeout)))
 
   let name = "Observer"
 end
@@ -225,11 +238,12 @@ let make_committee_member coordinator_rpc_address coordinator_rpc_port
       coordinator_rpc_port
       committee_member_address)
 
-let make_observer ~committee_rpc_addresses coordinator_rpc_address
+let make_observer ~committee_rpc_addresses ?timeout coordinator_rpc_address
     coordinator_rpc_port =
   Observer
     (Observer.make
        ~committee_rpc_addresses
+       ?timeout
        coordinator_rpc_address
        coordinator_rpc_port)
 
diff --git a/src/lib_dac_node/configuration.mli b/src/lib_dac_node/configuration.mli
index 320090c1c4b35aa1038a90b14e9afd969e9c92ef..0d177948dc5545d66f9f050cfb202b7e29aff3f6 100644
--- a/src/lib_dac_node/configuration.mli
+++ b/src/lib_dac_node/configuration.mli
@@ -59,7 +59,11 @@ module Observer : sig
     coordinator_rpc_address : string;
     coordinator_rpc_port : int;
     committee_rpc_addresses : (string * int) list;
+    timeout : int;
   }
+
+  (** Default timeout for fetching a missing page from Committee members. *)
+  val default_timeout : int
 end
 
 (** Legacy specific configuration. *)
@@ -131,7 +135,11 @@ val make_committee_member :
     endpoints to [committee_endpoints] and Coordinator endpoint to
     [(coordinator_rpc_address * coordinator_rpc_port)] as the coordinator. *)
 val make_observer :
-  committee_rpc_addresses:(string * int) list -> string -> int -> mode
+  committee_rpc_addresses:(string * int) list ->
+  ?timeout:int ->
+  string ->
+  int ->
+  mode
 
 (** [make_legacy ?coordinator_host_and_port threshold
     committee_members_addresses]
diff --git a/src/lib_dac_node/event.ml b/src/lib_dac_node/event.ml
index 402e4a5c6ce703c12e8fda321c35a08c83f6a651..7814bf54186415d26c2c59e52809bea7f36a79c4 100644
--- a/src/lib_dac_node/event.ml
+++ b/src/lib_dac_node/event.ml
@@ -281,6 +281,14 @@ let cannot_retrieve_keys_from_address =
     ~level:Notice
     ("address", Tezos_crypto.Aggregate_signature.Public_key_hash.encoding)
 
+let fetched_missing_page =
+  declare_1
+    ~section
+    ~name:"missing_page_fetched"
+    ~msg:"Successfully fetched missing page for hash: {hash}"
+    ~level:Notice
+    ("hash", Data_encoding.(string' Hex))
+
 let proto_short_hash_string hash =
   Format.asprintf "%a" Protocol_hash.pp_short hash
 
@@ -321,10 +329,6 @@ let emit_rpc_started = emit rpc_server_started
 let emit_cannot_retrieve_keys_from_address address =
   emit cannot_retrieve_keys_from_address address
 
-let fetched_missing_page =
-  declare_1
-    ~section
-    ~name:"missing_page_fetched"
-    ~msg:"Successfully fetched missing page for hash: {hash}"
-    ~level:Notice
-    ("hash", Dac_plugin.raw_hash_encoding)
+let emit_fetched_missing_page root_hash =
+  let (`Hex root_hash) = Dac_plugin.hash_to_hex root_hash in
+  emit fetched_missing_page root_hash
diff --git a/src/lib_dac_node/node_context.ml b/src/lib_dac_node/node_context.ml
index 53ce7e7930b9ee4508099c912a883c95b3ef33dc..9f40e7e710d94b32339b1d624a30fa0ffc697ea2 100644
--- a/src/lib_dac_node/node_context.ml
+++ b/src/lib_dac_node/node_context.ml
@@ -99,6 +99,7 @@ module Observer = struct
   type t = {
     coordinator_cctxt : Dac_node_client.cctxt;
     committee_cctxts : Dac_node_client.cctxt list;
+    timeout : int;
   }
 
   let init observer_config =
@@ -108,6 +109,7 @@ module Observer = struct
         coordinator_rpc_address;
         coordinator_rpc_port;
         committee_rpc_addresses;
+        timeout;
       } =
       observer_config
     in
@@ -123,7 +125,7 @@ module Observer = struct
           Dac_node_client.make_unix_cctxt ~scheme:"http" ~host ~port)
         committee_rpc_addresses
     in
-    return {coordinator_cctxt; committee_cctxts}
+    return {coordinator_cctxt; committee_cctxts; timeout}
 end
 
 module Legacy = struct
diff --git a/src/lib_dac_node/node_context.mli b/src/lib_dac_node/node_context.mli
index aaacfe4f5a4249848402b23340dee2020e5bddab..a98480aa0396e9bdd1b69fc8313594fc8be24bd9 100644
--- a/src/lib_dac_node/node_context.mli
+++ b/src/lib_dac_node/node_context.mli
@@ -83,6 +83,8 @@ module Observer : sig
     committee_cctxts : Dac_node_client.cctxt list;
         (** The list of [Dac_node_client.cctxt] used by the [Observer] node to
             send requests to each [Committee_member] node respectively. *)
+    timeout : int;
+        (** Timeout in seconds for fetching a missing page from [Committee_member]. *)
   }
 end
 
diff --git a/src/lib_dac_node/page_store.ml b/src/lib_dac_node/page_store.ml
index 679b450a72d555cf8a6ccd0111a28be6aa8ebbe1..4f47f84a4f65d908012a434401bee5de7771dccf 100644
--- a/src/lib_dac_node/page_store.ml
+++ b/src/lib_dac_node/page_store.ml
@@ -33,6 +33,9 @@ type error +=
       expected : Dac_plugin.raw_hash;
       actual : Dac_plugin.raw_hash;
     }
+  | Cannot_fetch_remote_page_with_flooding_strategy of {
+      hash : Dac_plugin.raw_hash;
+    }
 
 let () =
   register_error_kind
@@ -112,7 +115,26 @@ let () =
     (function
       | Incorrect_page_hash {expected; actual} -> Some (expected, actual)
       | _ -> None)
-    (fun (expected, actual) -> Incorrect_page_hash {expected; actual})
+    (fun (expected, actual) -> Incorrect_page_hash {expected; actual}) ;
+
+  register_error_kind
+    `Permanent
+    ~id:"remote_with_flooding_load_failed"
+    ~title:"Cannot load page content from remote store using flooding strategy"
+    ~description:
+      "Cannot load page content from remote store using flooding strategy."
+    ~pp:(fun ppf hash ->
+      Format.fprintf
+        ppf
+        "Cannot load page content from remote store using flooding strategy \
+         for hash %a."
+        Dac_plugin.pp_raw_hash
+        hash)
+    Data_encoding.(obj1 (req "hash" Dac_plugin.raw_hash_encoding))
+    (function
+      | Cannot_fetch_remote_page_with_flooding_strategy {hash} -> Some hash
+      | _ -> None)
+    (fun hash -> Cannot_fetch_remote_page_with_flooding_strategy {hash})
 
 let ensure_reveal_data_dir_exists reveal_data_dir =
   let open Lwt_result_syntax in
@@ -289,6 +311,66 @@ module Remote : S with type configuration = remote_configuration = struct
   let init {cctxt; page_store} = Internal.init (cctxt, page_store)
 end
 
+type remote_with_flooding_configuration = {
+  timeout : float;
+  cctxts : Dac_node_client.cctxt list;
+  page_store : Filesystem.t;
+}
+
+module Remote_with_flooding :
+  S with type configuration = remote_with_flooding_configuration = struct
+  module F = Filesystem_with_integrity_check
+
+  module Internal :
+    S
+      with type configuration =
+        (float * Dac_node_client.cctxt list) * Filesystem.t
+       and type t = (float * Dac_node_client.cctxt list) * Filesystem.t =
+    With_remote_fetch
+      (struct
+        type remote_context = float * Dac_node_client.cctxt list
+
+        (** TODO: https://gitlab.com/tezos/tezos/-/issues/5673
+            Optimize flooding.
+        *)
+        let fetch _dac_plugin (timeout, remote_cctxts) hash =
+          let open Lwt_result_syntax in
+          let page_hash = Dac_plugin.hash_to_raw hash in
+          let fetch_page cctxt =
+            Lwt_unix.with_timeout timeout (fun () ->
+                Dac_node_client.V0.get_preimage cctxt ~page_hash)
+          in
+          let*! results =
+            List.filter_map_p
+              (fun committee_member_cctxt ->
+                Lwt.catch
+                  (fun () ->
+                    let*! page_data = fetch_page committee_member_cctxt in
+                    match page_data with
+                    | Ok page_data -> Lwt.return (Some page_data)
+                    | Error _ -> Lwt.return_none)
+                  (fun _ -> Lwt.return_none))
+              remote_cctxts
+          in
+          match List.hd results with
+          | Some a -> return a
+          | None ->
+              tzfail
+                (Cannot_fetch_remote_page_with_flooding_strategy
+                   {hash = Dac_plugin.hash_to_raw hash})
+      end)
+      (F)
+
+  include Internal
+
+  type t = Internal.t
+
+  type configuration = remote_with_flooding_configuration
+
+  let init {timeout; cctxts; page_store} =
+    Internal.init ((timeout, cctxts), page_store)
+end
+
 module Internal_for_tests = struct
   module With_data_integrity_check (P : S) :
     S with type configuration = P.configuration and type t = P.t =
diff --git a/src/lib_dac_node/page_store.mli b/src/lib_dac_node/page_store.mli
index da379d62de43fd751393d01472f3e31381062dec..1d2d67266a9751ed335dac29c78c26f67628e081 100644
--- a/src/lib_dac_node/page_store.mli
+++ b/src/lib_dac_node/page_store.mli
@@ -33,6 +33,9 @@ type error +=
       expected : Dac_plugin.raw_hash;
       actual : Dac_plugin.raw_hash;
     }
+  | Cannot_fetch_remote_page_with_flooding_strategy of {
+      hash : Dac_plugin.raw_hash;
+    }
 
 (** [S] is the module type defining the backend required for persisting DAC
     pages data onto the page storage. *)
@@ -95,6 +98,26 @@ type remote_configuration = {
     saved locally. *)
 module Remote : S with type configuration = remote_configuration
 
+type remote_with_flooding_configuration = {
+  timeout : float;
+  cctxts : Dac_node_client.cctxt list;
+  page_store : Filesystem.t;
+}
+
+(* TODO: https://gitlab.com/tezos/tezos/-/issues/5672
+   Merge Remote_with_flooding into Remote. *)
+
+(** A [Page_store] implementation backed by the local filesystem, which uses
+    connections to a collection of DAC nodes to retrieve pages that are not
+    saved locally. It retrieves pages by sending the same page request to each
+    remote node, each request being bounded by [timeout] seconds, and saves
+    only one copy of the page locally.
+
+    It is typically used by the Observer node to retrieve missing pages from
+    the DAC committee members. *)
+module Remote_with_flooding :
+  S with type configuration = remote_with_flooding_configuration
+
 (**/**)
 
 module Internal_for_tests : sig
diff --git a/tezt/lib_tezos/dac_node.ml b/tezt/lib_tezos/dac_node.ml
index 8e27c5fd8e5253b492a08f0c0195f7405623c7cd..77765c6fefed07454e50a43b8fa37c11a6e09e3f 100644
--- a/tezt/lib_tezos/dac_node.ml
+++ b/tezt/lib_tezos/dac_node.ml
@@ -43,6 +43,7 @@ module Parameters = struct
     coordinator_rpc_host : string;
     coordinator_rpc_port : int;
     committee_member_rpcs : (string * int) list;
+    timeout : int option;
   }
 
   type mode_settings =
@@ -114,6 +115,8 @@ let spawn_command dac_node =
 
 let raw_rpc (host, port) = Printf.sprintf "%s:%d" host port
 
+let localhost = "127.0.0.1"
+
 let spawn_config_init dac_node =
   let arg_command =
     [
@@ -177,7 +180,17 @@ let spawn_config_init dac_node =
             committee_member_params.address;
           ]
     | Observer
-        {coordinator_rpc_host; coordinator_rpc_port; committee_member_rpcs} ->
+        {
+          coordinator_rpc_host;
+          coordinator_rpc_port;
+          committee_member_rpcs;
+          timeout;
+        } ->
+        let with_timeout =
+          match timeout with
+          | Some t -> ["--timeout"; Int.to_string t]
+          | None -> []
+        in
         let coordinator_host =
           coordinator_rpc_host ^ ":" ^ Int.to_string coordinator_rpc_port
         in
@@ -195,7 +208,7 @@ let spawn_config_init dac_node =
           "rpc";
           "addresses";
         ]
-        @ committee_member_rpcs
+        @ committee_member_rpcs @ with_timeout
   in
   spawn_command dac_node (mode_command @ arg_command)
 
@@ -347,8 +360,8 @@ let create_coordinator ?(path = Constant.dac_node) ?name ?color ?data_dir
     ()
 
 let create_committee_member ?(path = Constant.dac_node) ?name ?color ?data_dir
-    ?event_pipe ?(rpc_host = "127.0.0.1") ?rpc_port ?reveal_data_dir
-    ?(coordinator_rpc_host = "127.0.0.1") ?coordinator_rpc_port ~address ~node
+    ?event_pipe ?(rpc_host = localhost) ?rpc_port ?reveal_data_dir
+    ?(coordinator_rpc_host = localhost) ?coordinator_rpc_port ~address ~node
     ~client () =
   let coordinator_rpc_port =
     match coordinator_rpc_port with None -> Port.fresh () | Some port -> port
@@ -371,14 +384,20 @@ let create_committee_member ?(path = Constant.dac_node) ?name ?color ?data_dir
     ()
 
 let create_observer ?(path = Constant.dac_node) ?name ?color ?data_dir
-    ?event_pipe ?(rpc_host = "127.0.0.1") ?rpc_port ?reveal_data_dir
-    ?(coordinator_rpc_host = "127.0.0.1") ?coordinator_rpc_port
+    ?event_pipe ?(rpc_host = localhost) ?rpc_port ?reveal_data_dir
+    ?(coordinator_rpc_host = localhost) ?coordinator_rpc_port ?timeout
     ~committee_member_rpcs ~node ~client () =
   let coordinator_rpc_port =
     match coordinator_rpc_port with None -> Port.fresh () | Some port -> port
   in
   let mode =
-    Observer {coordinator_rpc_host; coordinator_rpc_port; committee_member_rpcs}
+    Observer
+      {
+        coordinator_rpc_host;
+        coordinator_rpc_port;
+        committee_member_rpcs;
+        timeout;
+      }
   in
   create
     ~path
@@ -426,3 +445,33 @@ let run ?(wait_ready = true) ?env node =
   let* () = run ?env node in
   let* () = if wait_ready then wait_for_ready node else Lwt.return_unit in
   return ()
+
+let with_sleeping_node ?rpc_port ?(rpc_address = localhost) ~timeout f =
+  let make_host str =
+    match Ipaddr.of_string str with
+    | Ok (Ipaddr.V4 addr) -> Ipaddr.v6_of_v4 addr
+    | Ok (V6 addr) -> addr
+    | Error (`Msg _) ->
+        raise
+          (Invalid_argument
+             "Invalid rpc_address when initializing sleeping_node.")
+  in
+  let open Cohttp_lwt_unix in
+  let callback _conn _req _body =
+    let* _ = Lwt_unix.sleep timeout in
+    Server.respond_string ~status:`OK ~body:"ok" ()
+  in
+  let stop, resolver = Lwt.task () in
+  let stopper () = Lwt.wakeup_later resolver () in
+  let rpc_port = Option.value rpc_port ~default:(Port.fresh ()) in
+  let port = `Port rpc_port in
+  let host = Ipaddr.V6.to_string (make_host rpc_address) in
+  let server = Server.make ~callback () in
+  let _ =
+    let* ctx = Conduit_lwt_unix.init ~src:host () in
+    let ctx = Cohttp_lwt_unix.Net.init ~ctx () in
+    Server.create ~ctx ~stop ~mode:(`TCP port) server
+  in
+  Lwt.finalize
+    (fun () -> f (rpc_address, rpc_port))
+    (fun () -> return (stopper ()))
diff --git a/tezt/lib_tezos/dac_node.mli b/tezt/lib_tezos/dac_node.mli
index 7925dbdcef8e2d983e95d3bdaa8501d193c77889..27096e25cd45a225bf5c9bc989e6f6c50d07ee61 100644
--- a/tezt/lib_tezos/dac_node.mli
+++ b/tezt/lib_tezos/dac_node.mli
@@ -98,6 +98,7 @@ val create_observer :
   ?reveal_data_dir:string ->
   ?coordinator_rpc_host:string ->
   ?coordinator_rpc_port:int ->
+  ?timeout:int ->
   committee_member_rpcs:(string * int) list ->
   node:Node.t ->
   client:Client.t ->
@@ -177,3 +178,13 @@ module Config_file : sig
       running, it needs to be restarted manually. *)
   val update : t -> (JSON.t -> JSON.t) -> unit
 end
+
+(** [with_sleeping_node ~timeout f] runs an embedded HTTP server that answers
+    "ok" to any request after sleeping [timeout] seconds, calls [f (address,
+    port)], then stops the server. Used to test client timeouts. *)
+val with_sleeping_node :
+  ?rpc_port:int ->
+  ?rpc_address:string ->
+  timeout:float ->
+  (string * int -> unit Lwt.t) ->
+  unit Lwt.t
diff --git a/tezt/tests/dac.ml b/tezt/tests/dac.ml
index fbbe642ee5667fd81019f19092d44f3cb09eadf5..dbf11be05ade93c56357777470729d079303fb17 100644
--- a/tezt/tests/dac.ml
+++ b/tezt/tests/dac.ml
@@ -921,50 +921,6 @@ module Legacy = struct
     let* () = fetch_root_hash_promise in
     check_downloaded_preimage coordinator observer expected_rh
 
-  (* 1. Observer should fetch missing page from Coordinator when GET /missing_page/{hash}
-     is called.
-     2. As a side effect, Observer should save fetched page into its page store before
-     returning it in the response. This can be observed by checking the result of
-     retrieving preimage before and after the GET /missing_page/{hash} call.*)
-  let test_observer_get_missing_page _protocol node client coordinator threshold
-      _committee_members =
-    let root_hash =
-      "00649d431e829f4adc68edecb8d8d8071154b57086cc124b465f6f6600a4bc91c7"
-    in
-    let root_hash_stream_promise =
-      wait_for_root_hash_pushed_to_data_streamer coordinator root_hash
-    in
-    let* hex_root_hash =
-      init_hex_root_hash ~payload:"test payload abc 123" coordinator
-    in
-
-    assert (root_hash = Hex.show hex_root_hash) ;
-    let* () = root_hash_stream_promise in
-    let observer =
-      Dac_node.create_legacy ~threshold ~committee_members:[] ~node ~client ()
-    in
-    let* _ = Dac_node.init_config observer in
-    let () = set_coordinator observer coordinator in
-    let* () = Dac_node.run observer in
-    let* () =
-      assert_lwt_failure
-        ~__LOC__
-        "Expected retrieve_preimage"
-        (RPC.call observer (Dac_rpc.V0.get_preimage (Hex.show hex_root_hash)))
-    in
-    let* missing_page =
-      RPC.call observer (Dac_rpc.V0.get_missing_page ~hex_root_hash)
-    in
-    let* coordinator_page =
-      RPC.call coordinator (Dac_rpc.V0.get_preimage (Hex.show hex_root_hash))
-    in
-    check_preimage coordinator_page missing_page ;
-    let* observer_preimage =
-      RPC.call observer (Dac_rpc.V0.get_preimage (Hex.show hex_root_hash))
-    in
-    check_preimage coordinator_page observer_preimage ;
-    unit
-
   module Signature_manager = struct
     let test_non_committee_signer_should_fail tz_client
         (coordinator_node, hex_root_hash, _dac_committee) =
@@ -1656,8 +1612,159 @@ module Full_infrastructure = struct
        one previously output by the client. *)
     check_raw_certificate get_certificate last_certificate_update ;
     return ()
+
+  (* 1. Observer should fetch missing page from Committee Members when GET /missing_page/{hash}
+     is called.
+     2. As a side effect, Observer should save fetched page into its page store before
+     returning it in the response. This can be observed by checking the result of
+     retrieving preimage before and after the GET /missing_page/{hash} call.*)
+  let test_observer_get_missing_page scenario =
+    let Scenarios.{coordinator_node; committee_members_nodes; observer_nodes; _}
+        =
+      scenario
+    in
+    Check.(
+      (List.length observer_nodes = 1)
+        ~__LOC__
+        int
+        ~error_msg:"Expected number of Observers to be 1.") ;
+    let observer_node = List.hd observer_nodes in
+    let payload, expected_rh = sample_payload "preimage" in
+    (* Initialize Committee Member nodes and wait for each node to subscribe to Coordinator's
+       root hash stream. *)
+    let* _ =
+      Lwt_list.iter_s
+        (fun committee_member_node ->
+          let* _ = Dac_node.init_config committee_member_node in
+          let wait_subscribe =
+            wait_for_handle_new_subscription_to_hash_streamer coordinator_node
+          in
+          let* () = Dac_node.run ~wait_ready:true committee_member_node in
+          let* _ = wait_subscribe in
+          return ())
+        committee_members_nodes
+    in
+
+    let committee_members_processed_rh_promise =
+      List.map
+        (fun committee_member_node ->
+          wait_for_received_root_hash_processed
+            committee_member_node
+            expected_rh)
+        committee_members_nodes
+    in
+
+    (* Post payload with Observer offline so that Observer will not receive root hash. Wait
+       for root hash to finish processing by each committee member before continuing. *)
+    let* root_hash =
+      RPC.call coordinator_node (Dac_rpc.V0.Coordinator.post_preimage ~payload)
+    in
+    assert (root_hash = expected_rh) ;
+    let hex_root_hash = `Hex root_hash in
+    let* _ = Lwt.join committee_members_processed_rh_promise in
+
+    (* Initialize Observer node and wait for each node to subscribe to hash streamer. Assert
+       that each node starts cold (without preimage of root_hash in disk) ie. get_preimage of
+       Observer node should fail.
+    *)
+    let* _ =
+      let* _ = Dac_node.init_config observer_node in
+      let wait_subscribe =
+        wait_for_handle_new_subscription_to_hash_streamer coordinator_node
+      in
+      let* _ = Dac_node.run ~wait_ready:true observer_node in
+      let* _ = wait_subscribe in
+      assert_lwt_failure
+        ~__LOC__
+        "Expected get_preimage to fail."
+        (RPC.call
+           observer_node
+           (Dac_rpc.V0.get_preimage (Hex.show hex_root_hash)))
+    in
+
+    (* Get missing page then assert that the retrieved missing page from Observer is the
+       same as the get_preimage page from Coordinator. *)
+    let* missing_page =
+      RPC.call observer_node (Dac_rpc.V0.get_missing_page ~hex_root_hash)
+    in
+    let* coordinator_page =
+      RPC.call coordinator_node (Dac_rpc.V0.get_preimage root_hash)
+    in
+    check_preimage coordinator_page missing_page ;
+
+    (* Now, Observer get_preimage should pass. This means get_missing_page
+       saved the missing page on disk as a side effect.*)
+    let* observer_preimage =
+      RPC.call observer_node (Dac_rpc.V0.get_preimage (Hex.show hex_root_hash))
+    in
+    check_preimage coordinator_page observer_preimage ;
+    unit
 end
 
+let test_observer_times_out_when_page_cannot_be_fetched _protocol node client
+    _key =
+  with_coordinator_node ~name:"coordinator" ~committee_members:[] node client
+  @@ fun coordinator_node _ ->
+  (* Root hash that will never be found. *)
+  let missing_root_hash =
+    "00a3703854279d2f377d689163d1ec911a840d84b56c4c6f6cafdf0610394df7c6"
+  in
+  let sleeping_node_timeout = 20. in
+  (* Fake our committee member connection to point to the sleeping node. *)
+  Dac_node.with_sleeping_node ~timeout:sleeping_node_timeout @@ fun rpc ->
+  let committee_member_rpcs = [rpc] in
+  let observer_timeout = 4 in
+  let wait_connected_to_coordinator =
+    wait_for_handle_new_subscription_to_hash_streamer coordinator_node
+  in
+  let observer_node =
+    Dac_node.create_observer
+      ~name:"observer"
+      ~coordinator_rpc_port:(Dac_node.rpc_port coordinator_node)
+      ~coordinator_rpc_host:(Dac_node.rpc_host coordinator_node)
+      ~timeout:observer_timeout
+      ~committee_member_rpcs
+      ~node
+      ~client
+      ()
+  in
+  let* _dir = Dac_node.init_config observer_node in
+  let* () = Dac_node.run observer_node ~wait_ready:true in
+  let* () = wait_connected_to_coordinator in
+  (* Capture the time it takes for the endpoint to fail. It should fail after
+     [observer_timeout] seconds but before [sleeping_node_timeout] seconds *)
+  let start_time = ref 0. in
+  let end_time = ref 0. in
+  let* _ =
+    Lwt.catch
+      (fun () ->
+        start_time := Unix.gettimeofday () ;
+        Lwt.map
+          (fun _ -> ())
+          (RPC.call
+             observer_node
+             (Dac_rpc.V0.get_missing_page
+                ~hex_root_hash:(`Hex missing_root_hash))))
+      (fun _ ->
+        end_time := Unix.gettimeofday () ;
+        Lwt.return_unit)
+  in
+  let request_time = Float.sub !end_time !start_time in
+  let observer_timeout = Float.of_int observer_timeout in
+  let result =
+    request_time >= observer_timeout && request_time < sleeping_node_timeout
+  in
+  Check.(
+    (result = true)
+      bool
+      ~__LOC__
+      ~error_msg:
+        (Printf.sprintf
+           "Expected timeout after %f seconds but less than %f seconds."
+           observer_timeout
+           sleeping_node_timeout)) ;
+  unit
+
 (* Modified from tezt/tests/tx_sc_rollup.ml *)
 module Tx_kernel_e2e = struct
   open Sc_rollup_helpers
@@ -2740,13 +2847,19 @@ let register ~protocols =
     "dac_coordinator_post_preimage_endpoint"
     Full_infrastructure.test_coordinator_post_preimage_endpoint
     protocols ;
-  scenario_with_layer1_and_legacy_dac_nodes
+  scenario_with_full_dac_infrastructure
     ~__FILE__
-    ~threshold:0
-    ~committee_size:1
+    ~observers:1
+    ~committee_size:2
     ~tags:["dac"; "dac_node"]
     "dac_observer_get_missing_page"
-    Legacy.test_observer_get_missing_page
+    Full_infrastructure.test_observer_get_missing_page
+    protocols ;
+  scenario_with_layer1_node
+    ~__FILE__
+    ~tags:["dac"; "dac_node"]
+    "dac_observer_times_out_when_page_cannot_be_fetched"
+    test_observer_times_out_when_page_cannot_be_fetched
     protocols ;
   scenario_with_full_dac_infrastructure
     ~__FILE__