diff --git a/etherlink/bin_node/lib_dev/events.ml b/etherlink/bin_node/lib_dev/events.ml
index c8636142d9ff4868f605821f2d5711b27501f34e..cf4e74e04841b0f54bc88337db8b5e63590cbbca 100644
--- a/etherlink/bin_node/lib_dev/events.ml
+++ b/etherlink/bin_node/lib_dev/events.ml
@@ -247,6 +247,16 @@ let missing_chain_id =
     ~msg:"missing chain id: skipping consistency check with selected network"
     ()
 
+let missing_block =
+  Internal_event.Simple.declare_1
+    ~level:Warning
+    ~section
+    ~name:"missing_block"
+    ~msg:
+      "missing block: received event saying the block for {level} would be \
+       available, but reading it from storage failed"
+    ("level", Data_encoding.int32)
+
 let importing_snapshot =
   Internal_event.Simple.declare_1
     ~level:Notice
@@ -523,6 +533,8 @@ let wasm_pvm_fallback () = emit wasm_pvm_fallback ()
 
 let missing_chain_id () = emit missing_chain_id ()
 
+let missing_block level = emit missing_block level
+
 let multichain_node_singlechain_kernel () =
   emit multichain_node_singlechain_kernel ()
 
diff --git a/etherlink/bin_node/lib_dev/events.mli b/etherlink/bin_node/lib_dev/events.mli
index 3089a956074365809a3c411168c3dc75f4dd3708..7860277f3025a791b40a5af9fca2a28f33b5e950 100644
--- a/etherlink/bin_node/lib_dev/events.mli
+++ b/etherlink/bin_node/lib_dev/events.mli
@@ -144,6 +144,10 @@ val wasm_pvm_fallback : unit -> unit Lwt.t
     consistency of the stored chain id with the selected network. *)
 val missing_chain_id : unit -> unit Lwt.t
 
+(** [missing_block level] warns that the block at [level] could not be read
+    from storage, despite an event announcing that it would be available. *)
+val missing_block : int32 -> unit Lwt.t
+
 (** [multichain_node_singlechain_kernel ()] warns that the node was
     configured to be executed in a multichain environment, but was given
     a kernel for a single chain environment. *)
diff --git a/etherlink/bin_node/lib_dev/tezlink/dune b/etherlink/bin_node/lib_dev/tezlink/dune
index af9f1515af5f72d4186ec88252ab802926cf96fd..2341c8040a6a99154a1f812c9e51569d5e356340 100644
--- a/etherlink/bin_node/lib_dev/tezlink/dune
+++ b/etherlink/bin_node/lib_dev/tezlink/dune
@@ -11,7 +11,8 @@
   tezos-protocol-022-PsRiotum.parameters
   octez-libs.base
   octez-shell-libs.shell-services
-  octez-libs.version)
+  octez-libs.version
+  lwt-watcher)
  (flags
   (:standard)
   -open Evm_node_lib_dev_encoding
diff --git a/etherlink/bin_node/lib_dev/tezlink/tezlink_backend_sig.ml b/etherlink/bin_node/lib_dev/tezlink/tezlink_backend_sig.ml
index 7e9f2174dad9a81c3c608a70e9046b3ec257b766..f862f82aa4811f561f945a3261a9fbd7b5e18d41 100644
--- a/etherlink/bin_node/lib_dev/tezlink/tezlink_backend_sig.ml
+++ b/etherlink/bin_node/lib_dev/tezlink/tezlink_backend_sig.ml
@@ -35,4 +35,9 @@ module type S = sig
 
   val block_hash :
     [`Main] -> block_param -> Ethereum_types.block_hash option tzresult Lwt.t
+
+  (** [monitor_heads chain query] returns a stream of blocks, together with
+      the stopper to use to shut that stream down. *)
+  val monitor_heads :
+    [> `Main] -> 'a -> L2_types.Tezos_block.t Lwt_stream.t * Lwt_watcher.stopper
 end
diff --git a/etherlink/bin_node/lib_dev/tezlink/tezos_services.ml b/etherlink/bin_node/lib_dev/tezlink/tezos_services.ml
index 7ea27c1668145fc23bd251496185a271f686274b..cdd4d226e64836dfb71f257a4b32f7b1ce293984 100644
--- a/etherlink/bin_node/lib_dev/tezlink/tezos_services.ml
+++ b/etherlink/bin_node/lib_dev/tezlink/tezos_services.ml
@@ -372,6 +372,17 @@ module Imported_services = struct
         Block_hash.t * Time.Protocol.t )
       Constants_services.RPC_service.t =
     import_service Tezos_shell_services.Monitor_services.S.bootstrapped
+
+  let monitor_heads :
+      ( [`GET],
+        unit,
+        unit * chain,
+        < protocols : Protocol_hash.t list
+        ; next_protocols : Protocol_hash.t list >,
+        unit,
+        Block_hash.t * Block_header.t )
+      Tezos_rpc.Service.t =
+    Tezos_shell_services.Monitor_services.S.heads
 end
 
 let chain_directory_path = Tezos_shell_services.Chain_services.path
@@ -518,6 +529,41 @@ let register_chain_services ~l2_chain_id
     base_dir
     (Tezos_rpc.Directory.prefix chain_directory_path dir)
 
+(** Builds the directory registering the service at `/monitor/heads/`.
+
+    Each block read from [Backend.monitor_heads] is turned into a
+    (hash, header) pair. [next] returns [None], which ends the stream, when
+    the backend stream is exhausted or a block cannot be converted. *)
+let register_monitor_heads (module Backend : Tezlink_backend_sig.S) dir =
+  Tezos_rpc.Directory.gen_register
+    dir
+    Imported_services.monitor_heads
+    (fun ((), chain) query () ->
+      (* The mocked protocol data is the same for every header: encode it
+         once per request rather than once per streamed block. *)
+      let protocol_data =
+        Data_encoding.Binary.to_bytes_exn
+          Imported_protocol.Block_header_repr.protocol_data_encoding
+          Mock.protocol_data
+      in
+      let stream, stopper = Backend.monitor_heads chain query in
+      let shutdown () = Lwt_watcher.shutdown stopper in
+      let next () =
+        let open Lwt_syntax in
+        let* block_opt = Lwt_stream.get stream in
+        match block_opt with
+        | None -> return_none
+        | Some block -> (
+            match
+              ( Protocol_types.Block_header.tezlink_block_to_shell_header block,
+                Protocol_types.ethereum_to_tezos_block_hash block.hash )
+            with
+            | Ok shell, Ok hash ->
+                return_some (hash, ({shell; protocol_data} : Block_header.t))
+            | Error _, _ | _, Error _ -> return_none)
+      in
+      Tezos_rpc.Answer.return_stream {next; shutdown})
+
 (** Builds the root directory. *)
 let build_dir ~l2_chain_id backend =
   let (module Backend : Tezlink_backend_sig.S) = backend in
@@ -531,6 +577,7 @@ let build_dir ~l2_chain_id backend =
          let open Result_syntax in
          let* hash = Protocol_types.ethereum_to_tezos_block_hash input_hash in
          return (hash, input_time))
+  |> register_monitor_heads backend
   |> register
        ~service:Imported_services.version
        ~impl:(fun () () () -> version ())
diff --git a/etherlink/bin_node/lib_dev/tezlink_services_impl.ml b/etherlink/bin_node/lib_dev/tezlink_services_impl.ml
index cfb147d50a24a82cfc617db0cd26f37b6e7973f6..669cf93c7f868699c1b74cb3d956acb4cf273d3a 100644
--- a/etherlink/bin_node/lib_dev/tezlink_services_impl.ml
+++ b/etherlink/bin_node/lib_dev/tezlink_services_impl.ml
@@ -137,6 +137,51 @@ module Make (Backend : Backend) : Tezlink_backend_sig.S = struct
     let* block_number = shell_block_param_to_block_number block in
     Backend.tez_nth_block (Z.of_int32 block_number)
 
+  let monitor_heads chain query =
+    (* TODO: #7831
+       Take [chain] into account.
+       For the moment this implementation only supports the main chain; once
+       the RPC support of tezlink is more stable, support for other chains
+       can be added. *)
+    ignore (chain, query) ;
+
+    let blueprint_stream, stopper = Broadcast.create_blueprint_stream () in
+
+    (* Delays, in milliseconds, waited before each successive attempt at
+       reading a block. *)
+    let retry_delays_ms = [0.; 50.; 100.; 500.] in
+
+    (* Convert blueprint notifications into full blocks, giving the store a
+       short grace period if the block is not yet written.
+       Note that this delay does not correspond to the time between blueprint
+       production and block application, but to the time between the moment
+       the database reports the data as written to storage and the moment it
+       actually becomes readable. *)
+    let rec fetch_block level =
+      let open Lwt_syntax in
+      function
+      | [] ->
+          (* All retries failed: emit a warning event and drop the block. *)
+          let* () = Events.missing_block @@ Z.to_int32 level in
+          return_none
+      | delay_ms :: rest -> (
+          let* () = Lwt_unix.sleep (delay_ms /. 1000.) in
+          let* block_result = Backend.tez_nth_block level in
+          match block_result with
+          | Ok block -> return_some block
+          | Error _ -> fetch_block level rest)
+    in
+
+    let block_stream =
+      Lwt_stream.filter_map_s
+        (fun (bp_with_events : Blueprint_types.with_events) ->
+          (* Extract the level from the blueprint. *)
+          let (Ethereum_types.Qty level) = bp_with_events.blueprint.number in
+          fetch_block level retry_delays_ms)
+        blueprint_stream
+    in
+    (block_stream, stopper)
+
   (* TODO: #7963 Support Observer Mode
      Here the catchup mechanism to fetch blueprints is not taken into account as
      the observer mode is not supported yet *)
diff --git a/etherlink/tezt/tests/evm_sequencer.ml b/etherlink/tezt/tests/evm_sequencer.ml
index f0f8f4e385134bd5b29a56aab5391d35b8ecc062..e18a5781080cb88bb9c5537f29965e0a54564a2a 100644
--- a/etherlink/tezt/tests/evm_sequencer.ml
+++ b/etherlink/tezt/tests/evm_sequencer.ml
@@ -825,6 +825,108 @@ let test_tezlink_bootstrapped =
       ~error_msg:"Check timestamp is latest, Expected %R but got %L") ;
   unit
 
+let test_tezlink_monitor_heads =
+  register_tezlink_test
+    ~title:"Test of the monitor/heads RPC"
+    ~tags:["evm"; "rpc"; "monitor_heads"]
+  @@ fun {sequencer; client; _} _protocol ->
+  let open Lwt.Syntax in
+  (* Prepare the RPC endpoint *)
+  let rpc_info = Evm_node.rpc_endpoint_record sequencer in
+  let endpoint = Client.Foreign_endpoint {rpc_info with path = "tezlink"} in
+
+  let total_headers = 4 in
+
+  (* Upper bound, in seconds, on the time a streamed header may take to
+     arrive once the corresponding block has been produced. *)
+  let header_timeout = 10.0 in
+
+  (* Promise to resolve when we have received all expected headers *)
+  let wait_for_all, notify_all = Lwt.wait () in
+  let received_count = ref 0 in
+
+  (* Fetch the initial head for monotonicity checks *)
+  let* initial =
+    Client.RPC.call ~hooks ~endpoint client @@ RPC.get_chain_block_header ()
+  in
+  let previous_header = ref initial in
+
+  (* Notifier used to signal that the monitor has received the header of the
+     block being produced. *)
+  let notify_block_received : (unit -> unit) ref =
+    ref (fun () -> ())
+    (* dummy, will be replaced below *)
+  in
+
+  (* Process each JSON header line from curl *)
+  let process_line line process =
+    !notify_block_received () ;
+    incr received_count ;
+    Log.info "Received block header #%d" !received_count ;
+    let current = JSON.parse ~origin:"curl_monitor_heads" line in
+    check_header
+      ~previous_header:!previous_header
+      ~current_header:current
+      ~current_timestamp:None
+      ~chain_id:None ;
+    previous_header := current ;
+    if !received_count = total_headers then (
+      Log.info "Received all %d headers" total_headers ;
+      Tezt.Process.terminate process ;
+      Lwt.wakeup_later notify_all ()) ;
+    unit
+  in
+
+  (* Start streaming headers via curl and process lines *)
+  let start_monitor () =
+    let url = Evm_node.endpoint sequencer ^ "/tezlink/monitor/heads/main" in
+    let process = Tezt.Process.spawn "curl" ["--no-buffer"; "--silent"; url] in
+    let ic = Tezt.Process.stdout process in
+    let rec loop () =
+      let* line_opt = Lwt_io.read_line_opt ic in
+      match line_opt with
+      | None -> Lwt.return_unit
+      | Some line ->
+          let* () = process_line line process in
+          loop ()
+    in
+    Lwt.async loop ;
+    unit
+  in
+
+  (* Produce blocks one at a time, waiting for the header of each block to be
+     streamed before producing the next one. Each wait is bounded, so that a
+     header that never arrives fails the test instead of hanging it. *)
+  let rec produce i =
+    if i = 0 then Lwt.return_unit
+    else
+      let wait_block_received, block_notifier = Lwt.wait () in
+      (notify_block_received := fun () -> Lwt.wakeup_later block_notifier ()) ;
+      let* _ = Rpc.produce_block sequencer in
+      let* () =
+        Lwt.pick
+          [
+            wait_block_received;
+            (let* () = Lwt_unix.sleep header_timeout in
+             Test.fail ~__LOC__ "Timed out waiting for a streamed header");
+          ]
+      in
+      produce (i - 1)
+  in
+
+  (* Final guard: once every blueprint has been applied, fail if the last
+     headers are still missing after [header_timeout] seconds. *)
+  let timeout =
+    let* () = Evm_node.wait_for_blueprint_applied sequencer total_headers in
+    let* () = Lwt_unix.sleep header_timeout in
+    Test.fail ~__LOC__ "Timed out waiting for streamed headers"
+  in
+
+  let* () = start_monitor () and* () = produce total_headers in
+  let* () = Lwt.pick [wait_for_all; timeout] in
+
+  unit
+
 let test_make_l2_kernel_installer_config chain_family =
   Protocol.register_test
     ~__FILE__
@@ -13659,6 +13761,7 @@ let () =
   test_tezlink_manager_key [Alpha] ;
   test_tezlink_counter [Alpha] ;
   test_tezlink_protocols [Alpha] ;
+ test_tezlink_monitor_heads [Alpha] ; test_tezlink_version [Alpha] ; test_tezlink_header [Alpha] ; test_tezlink_constants [Alpha] ; diff --git a/etherlink/tezt/tests/expected/evm_rollup.ml/EVM node- list events regression.out b/etherlink/tezt/tests/expected/evm_rollup.ml/EVM node- list events regression.out index 06fa5646ac7789b97915f0706ad63de540fe08b1..4356467508e588413a7d04c8c47b7ac4b1cfa95d 100644 --- a/etherlink/tezt/tests/expected/evm_rollup.ml/EVM node- list events regression.out +++ b/etherlink/tezt/tests/expected/evm_rollup.ml/EVM node- list events regression.out @@ -1226,6 +1226,14 @@ migrations_from_the_future: { "applied": integer ∈ [-2^30, 2^30], "known": integer ∈ [-2^30, 2^30] } } +missing_block: + description: missing block: received event saying the block for {level} would be available, but reading it from storage failed + level: warning + section: evm_node.dev + json format: + { /* missing_block version 0 */ + "missing_block.v0": integer ∈ [-2^31-1, 2^31] } + missing_blueprints: description: store is missing {count} blueprints in the range [{from}; {to_}] level: error diff --git a/etherlink/tezt/tests/expected/evm_sequencer.ml/Alpha- Test the -describe endpoint.out b/etherlink/tezt/tests/expected/evm_sequencer.ml/Alpha- Test the -describe endpoint.out index e66f5a090a2a24155b1d56daf9b6936e854fad38..f72c787eb6434aa5d1666323e26ea29ddf21d765 100644 --- a/etherlink/tezt/tests/expected/evm_sequencer.ml/Alpha- Test the -describe endpoint.out +++ b/etherlink/tezt/tests/expected/evm_sequencer.ml/Alpha- Test the -describe endpoint.out @@ -60,6 +60,9 @@ Available services: happen during the bootstrapping process, and closing the stream at the end. If the node was already bootstrapped, returns the current head immediately. + - GET /tezlink/monitor/heads/ + Monitor all blocks that are successfully validated and applied by the + node and selected as the new head of the given chain. 
- GET /tezlink/version Get information on the node version - GET /version @@ -130,6 +133,9 @@ Available services: happen during the bootstrapping process, and closing the stream at the end. If the node was already bootstrapped, returns the current head immediately. + - GET /monitor/heads/ + Monitor all blocks that are successfully validated and applied by the + node and selected as the new head of the given chain. - GET /version Get information on the node version diff --git a/manifest/product_etherlink.ml b/manifest/product_etherlink.ml index 1cd0fea80725c4f0ce2741038042720ee7bc6475..e78220e28705b93bbc6b039d9921f3e1c4ad2a91 100644 --- a/manifest/product_etherlink.ml +++ b/manifest/product_etherlink.ml @@ -262,6 +262,7 @@ let evm_node_lib_dev_tezlink = octez_base |> open_ ~m:"TzPervasives"; octez_shell_services; octez_version; + lwt_watcher; ] let evm_node_config = diff --git a/opam/octez-evm-node-libs.opam b/opam/octez-evm-node-libs.opam index 90cd268b1648c0b29479a6f75379402db3dd1066..2a33c685d11f086dfa85c391524e4fa6a005009f 100644 --- a/opam/octez-evm-node-libs.opam +++ b/opam/octez-evm-node-libs.opam @@ -27,9 +27,9 @@ depends: [ "octez-protocol-022-PsRiotum-libs" "tezos-protocol-022-PsRiotum" "octez-shell-libs" + "lwt-watcher" { = "0.2" } "dream" { >= "1.0.0~alpha7" } "octez-version" - "lwt-watcher" { = "0.2" } "lwt-exit" "octez-smart-rollup-wasm-debugger-lib" "tezos-dal-node-services"