From 745e0dff0494fa36828f4d0f2ee093139d26390d Mon Sep 17 00:00:00 2001 From: Luciano Freitas Date: Wed, 2 Apr 2025 13:38:37 +0200 Subject: [PATCH 1/5] Tezlink: implement monitor_heads MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Raphaël Cauderlier Apply 1 suggestion(s) to 1 file(s) Co-authored-by: Raphaël Cauderlier --- etherlink/bin_node/lib_dev/events.ml | 12 ++++++ etherlink/bin_node/lib_dev/events.mli | 3 ++ .../bin_node/lib_dev/tezlink_services_impl.ml | 41 +++++++++++++++++++ .../EVM node- list events regression.out | 8 ++++ 4 files changed, 64 insertions(+) diff --git a/etherlink/bin_node/lib_dev/events.ml b/etherlink/bin_node/lib_dev/events.ml index c8636142d9ff..cf4e74e04841 100644 --- a/etherlink/bin_node/lib_dev/events.ml +++ b/etherlink/bin_node/lib_dev/events.ml @@ -247,6 +247,16 @@ let missing_chain_id = ~msg:"missing chain id: skipping consistency check with selected network" () +let missing_block = + Internal_event.Simple.declare_1 + ~level:Warning + ~section + ~name:"missing_block" + ~msg: + "missing block: received event saying the block for {level} would be \ + available, but reading it from storage failed" + ("level", Data_encoding.int32) + let importing_snapshot = Internal_event.Simple.declare_1 ~level:Notice @@ -523,6 +533,8 @@ let wasm_pvm_fallback () = emit wasm_pvm_fallback () let missing_chain_id () = emit missing_chain_id () +let missing_block level = emit missing_block level + let multichain_node_singlechain_kernel () = emit multichain_node_singlechain_kernel () diff --git a/etherlink/bin_node/lib_dev/events.mli b/etherlink/bin_node/lib_dev/events.mli index 3089a9560743..7860277f3025 100644 --- a/etherlink/bin_node/lib_dev/events.mli +++ b/etherlink/bin_node/lib_dev/events.mli @@ -144,6 +144,9 @@ val wasm_pvm_fallback : unit -> unit Lwt.t consistency of the stored chain id with the selected network. 
*) val missing_chain_id : unit -> unit Lwt.t +(** [missing_block level] advertises that the node could not find the block at [level] despite having received an event saying it would be available. *) +val missing_block : int32 -> unit Lwt.t + (** [multichain_node_singlechain_kernel ()] warns that the node was configured to be executed in a multichain environment, but was given a kernel for a single chain environment. *) diff --git a/etherlink/bin_node/lib_dev/tezlink_services_impl.ml b/etherlink/bin_node/lib_dev/tezlink_services_impl.ml index cfb147d50a24..0a7d88cbdad6 100644 --- a/etherlink/bin_node/lib_dev/tezlink_services_impl.ml +++ b/etherlink/bin_node/lib_dev/tezlink_services_impl.ml @@ -137,6 +137,47 @@ module Make (Backend : Backend) : Tezlink_backend_sig.S = struct let* block_number = shell_block_param_to_block_number block in Backend.tez_nth_block (Z.of_int32 block_number) + let _monitor_heads chain query = + (* TODO: #7831 + take chain into account + For the moment this implementation only supports the main chain, once + the rpc support of tezlink is more stable, we can add support for other chains *) + ignore (chain, query) ; + + let blueprint_stream, stopper = Broadcast.create_blueprint_stream () in + + let retry_delays_ms = [0.; 50.; 100.; 500.] in + + (* Convert blueprint notifications into full blocks, giving the store a + short grace period if the block is not yet written. + Note that this delay does not correspond to the time between blueprint production + and block application, but rather to the time between the moment the database reports + the data as written to storage and the moment it actually becomes available to be read. *) + let rec fetch_block level = + let open Lwt_syntax in + function + | [] -> + (* After all retries failed, emit warning event. *) + let* () = Events.missing_block @@ Z.to_int32 level in + return_none + | delay_ms :: rest -> ( + let* () = Lwt_unix.sleep (delay_ms /. 1000.) 
in + let* block_result = Backend.tez_nth_block level in + match block_result with + | Ok block -> return_some block + | Error _ -> fetch_block level rest) + in + + let block_stream = + Lwt_stream.filter_map_s + (fun (bp_with_events : Blueprint_types.with_events) -> + (* Extract the level from the blueprint. *) + let (Ethereum_types.Qty level) = bp_with_events.blueprint.number in + fetch_block level retry_delays_ms) + blueprint_stream + in + (block_stream, stopper) + (* TODO: #7963 Support Observer Mode Here the catchup mechanism to fetch blueprints is not taken into account as the observer mode is not supported yet *) diff --git a/etherlink/tezt/tests/expected/evm_rollup.ml/EVM node- list events regression.out b/etherlink/tezt/tests/expected/evm_rollup.ml/EVM node- list events regression.out index 06fa5646ac77..4356467508e5 100644 --- a/etherlink/tezt/tests/expected/evm_rollup.ml/EVM node- list events regression.out +++ b/etherlink/tezt/tests/expected/evm_rollup.ml/EVM node- list events regression.out @@ -1226,6 +1226,14 @@ migrations_from_the_future: { "applied": integer ∈ [-2^30, 2^30], "known": integer ∈ [-2^30, 2^30] } } +missing_block: + description: missing block: received event saying the block for {level} would be available, but reading it from storage failed + level: warning + section: evm_node.dev + json format: + { /* missing_block version 0 */ + "missing_block.v0": integer ∈ [-2^31-1, 2^31] } + missing_blueprints: description: store is missing {count} blueprints in the range [{from}; {to_}] level: error -- GitLab From be5d852a2b4a319099b60ceb2ad80a6dc5b2183f Mon Sep 17 00:00:00 2001 From: Luciano Freitas Date: Wed, 2 Apr 2025 13:46:43 +0200 Subject: [PATCH 2/5] Tezlink: import monitor_heads service definition --- etherlink/bin_node/lib_dev/tezlink/tezos_services.ml | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/etherlink/bin_node/lib_dev/tezlink/tezos_services.ml b/etherlink/bin_node/lib_dev/tezlink/tezos_services.ml index 
7ea27c166814..36f9e08db9f9 100644 --- a/etherlink/bin_node/lib_dev/tezlink/tezos_services.ml +++ b/etherlink/bin_node/lib_dev/tezlink/tezos_services.ml @@ -372,6 +372,17 @@ module Imported_services = struct Block_hash.t * Time.Protocol.t ) Constants_services.RPC_service.t = import_service Tezos_shell_services.Monitor_services.S.bootstrapped + + let _monitor_heads : + ( [`GET], + unit, + unit * chain, + < protocols : Protocol_hash.t list + ; next_protocols : Protocol_hash.t list >, + unit, + Block_hash.t * Block_header.t ) + Tezos_rpc.Service.t = + Tezos_shell_services.Monitor_services.S.heads end let chain_directory_path = Tezos_shell_services.Chain_services.path -- GitLab From ba184518a2564cd7c1b7e7080c281b7d9f5a5881 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rapha=C3=ABl=20Cauderlier?= Date: Fri, 18 Apr 2025 06:33:05 +0200 Subject: [PATCH 3/5] Tezlink/Node: lib-tezlink depends on lwt-watcher --- etherlink/bin_node/lib_dev/tezlink/dune | 3 ++- manifest/product_etherlink.ml | 1 + opam/octez-evm-node-libs.opam | 2 +- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/etherlink/bin_node/lib_dev/tezlink/dune b/etherlink/bin_node/lib_dev/tezlink/dune index af9f1515af5f..2341c8040a6a 100644 --- a/etherlink/bin_node/lib_dev/tezlink/dune +++ b/etherlink/bin_node/lib_dev/tezlink/dune @@ -11,7 +11,8 @@ tezos-protocol-022-PsRiotum.parameters octez-libs.base octez-shell-libs.shell-services - octez-libs.version) + octez-libs.version + lwt-watcher) (flags (:standard) -open Evm_node_lib_dev_encoding diff --git a/manifest/product_etherlink.ml b/manifest/product_etherlink.ml index 1cd0fea80725..e78220e28705 100644 --- a/manifest/product_etherlink.ml +++ b/manifest/product_etherlink.ml @@ -262,6 +262,7 @@ let evm_node_lib_dev_tezlink = octez_base |> open_ ~m:"TzPervasives"; octez_shell_services; octez_version; + lwt_watcher; ] let evm_node_config = diff --git a/opam/octez-evm-node-libs.opam b/opam/octez-evm-node-libs.opam index 90cd268b1648..2a33c685d11f 100644 --- 
a/opam/octez-evm-node-libs.opam +++ b/opam/octez-evm-node-libs.opam @@ -27,9 +27,9 @@ depends: [ "octez-protocol-022-PsRiotum-libs" "tezos-protocol-022-PsRiotum" "octez-shell-libs" + "lwt-watcher" { = "0.2" } "dream" { >= "1.0.0~alpha7" } "octez-version" - "lwt-watcher" { = "0.2" } "lwt-exit" "octez-smart-rollup-wasm-debugger-lib" "tezos-dal-node-services" -- GitLab From 4c4885e5db079e50e7034262dfff22865e13d63b Mon Sep 17 00:00:00 2001 From: Luciano Freitas Date: Thu, 10 Apr 2025 11:16:34 +0200 Subject: [PATCH 4/5] Tezlink: register monitor_heads service MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Raphaël Cauderlier Tezt: update regressions Apply 1 suggestion(s) to 1 file(s) Co-authored-by: Raphaël Cauderlier --- .../lib_dev/tezlink/tezlink_backend_sig.ml | 3 ++ .../lib_dev/tezlink/tezos_services.ml | 38 ++++++++++++++++++- .../bin_node/lib_dev/tezlink_services_impl.ml | 2 +- .../Alpha- Test the -describe endpoint.out | 6 +++ 4 files changed, 47 insertions(+), 2 deletions(-) diff --git a/etherlink/bin_node/lib_dev/tezlink/tezlink_backend_sig.ml b/etherlink/bin_node/lib_dev/tezlink/tezlink_backend_sig.ml index 7e9f2174dad9..f862f82aa481 100644 --- a/etherlink/bin_node/lib_dev/tezlink/tezlink_backend_sig.ml +++ b/etherlink/bin_node/lib_dev/tezlink/tezlink_backend_sig.ml @@ -35,4 +35,7 @@ module type S = sig val block_hash : [`Main] -> block_param -> Ethereum_types.block_hash option tzresult Lwt.t + + val monitor_heads : + [> `Main] -> 'a -> L2_types.Tezos_block.t Lwt_stream.t * Lwt_watcher.stopper end diff --git a/etherlink/bin_node/lib_dev/tezlink/tezos_services.ml b/etherlink/bin_node/lib_dev/tezlink/tezos_services.ml index 36f9e08db9f9..cdd4d226e648 100644 --- a/etherlink/bin_node/lib_dev/tezlink/tezos_services.ml +++ b/etherlink/bin_node/lib_dev/tezlink/tezos_services.ml @@ -373,7 +373,7 @@ module Imported_services = struct Constants_services.RPC_service.t = import_service 
Tezos_shell_services.Monitor_services.S.bootstrapped - let _monitor_heads : + let monitor_heads : ( [`GET], unit, unit * chain, @@ -529,6 +529,41 @@ let register_chain_services ~l2_chain_id base_dir (Tezos_rpc.Directory.prefix chain_directory_path dir) +(** Builds the directory registering the service at `/monitor/heads/`. *) +let register_monitor_heads (module Backend : Tezlink_backend_sig.S) dir = + Tezos_rpc.Directory.gen_register + dir + Imported_services.monitor_heads + (fun ((), chain) query () -> + let stream, stopper = Backend.monitor_heads chain query in + let shutdown () = Lwt_watcher.shutdown stopper in + let next () = + let open Lwt_syntax in + let* block_opt = Lwt_stream.get stream in + match block_opt with + | None -> return_none + | Some block -> ( + match + ( Protocol_types.Block_header.tezlink_block_to_shell_header block, + Protocol_types.ethereum_to_tezos_block_hash block.hash ) + with + | Ok shell, Ok hash -> + return_some + ( hash, + ({ + shell; + protocol_data = + Data_encoding.Binary.to_bytes_exn + Imported_protocol.Block_header_repr + .protocol_data_encoding + Mock.protocol_data; + } + : Block_header.t) ) + | Error _, _ | _, Error _ -> return_none) + in + + Tezos_rpc.Answer.return_stream {next; shutdown}) + (** Builds the root directory. 
*) let build_dir ~l2_chain_id backend = let (module Backend : Tezlink_backend_sig.S) = backend in @@ -542,6 +577,7 @@ let build_dir ~l2_chain_id backend = let open Result_syntax in let* hash = Protocol_types.ethereum_to_tezos_block_hash input_hash in return (hash, input_time)) + |> register_monitor_heads backend |> register ~service:Imported_services.version ~impl:(fun () () () -> version ()) diff --git a/etherlink/bin_node/lib_dev/tezlink_services_impl.ml b/etherlink/bin_node/lib_dev/tezlink_services_impl.ml index 0a7d88cbdad6..669cf93c7f86 100644 --- a/etherlink/bin_node/lib_dev/tezlink_services_impl.ml +++ b/etherlink/bin_node/lib_dev/tezlink_services_impl.ml @@ -137,7 +137,7 @@ module Make (Backend : Backend) : Tezlink_backend_sig.S = struct let* block_number = shell_block_param_to_block_number block in Backend.tez_nth_block (Z.of_int32 block_number) - let _monitor_heads chain query = + let monitor_heads chain query = (* TODO: #7831 take chain into account For the moment this implementation only supports the main chain, once diff --git a/etherlink/tezt/tests/expected/evm_sequencer.ml/Alpha- Test the -describe endpoint.out b/etherlink/tezt/tests/expected/evm_sequencer.ml/Alpha- Test the -describe endpoint.out index e66f5a090a2a..f72c787eb643 100644 --- a/etherlink/tezt/tests/expected/evm_sequencer.ml/Alpha- Test the -describe endpoint.out +++ b/etherlink/tezt/tests/expected/evm_sequencer.ml/Alpha- Test the -describe endpoint.out @@ -60,6 +60,9 @@ Available services: happen during the bootstrapping process, and closing the stream at the end. If the node was already bootstrapped, returns the current head immediately. + - GET /tezlink/monitor/heads/ + Monitor all blocks that are successfully validated and applied by the + node and selected as the new head of the given chain. - GET /tezlink/version Get information on the node version - GET /version @@ -130,6 +133,9 @@ Available services: happen during the bootstrapping process, and closing the stream at the end. 
If the node was already bootstrapped, returns the current head immediately. + - GET /monitor/heads/ + Monitor all blocks that are successfully validated and applied by the + node and selected as the new head of the given chain. - GET /version Get information on the node version -- GitLab From daa736be5930f39c1aa4fba8dec4d5bd847c7d6b Mon Sep 17 00:00:00 2001 From: Luciano Freitas Date: Fri, 18 Apr 2025 11:13:23 +0200 Subject: [PATCH 5/5] Tezlink/Tezt: add monitor_heads test --- etherlink/tezt/tests/evm_sequencer.ml | 88 +++++++++++++++++++++++++++ 1 file changed, 88 insertions(+) diff --git a/etherlink/tezt/tests/evm_sequencer.ml b/etherlink/tezt/tests/evm_sequencer.ml index f0f8f4e38513..e18a5781080c 100644 --- a/etherlink/tezt/tests/evm_sequencer.ml +++ b/etherlink/tezt/tests/evm_sequencer.ml @@ -825,6 +825,93 @@ let test_tezlink_bootstrapped = ~error_msg:"Check timestamp is latest, Expected %R but got %L") ; unit +let test_tezlink_monitor_heads = + register_tezlink_test + ~title:"Test of the monitor/heads RPC" + ~tags:["evm"; "rpc"; "monitor_heads"] + @@ fun {sequencer; client; _} _protocol -> + let open Lwt.Syntax in + (* Prepare the RPC endpoint *) + let rpc_info = Evm_node.rpc_endpoint_record sequencer in + let endpoint = Client.Foreign_endpoint {rpc_info with path = "tezlink"} in + + let total_headers = 4 in + + (* Promise to resolve when we’ve received all expected headers *) + let wait_for_all, notify_all = Lwt.wait () in + let received_count = ref 0 in + + (* Fetch the initial head for monotonicity checks *) + let* initial = + Client.RPC.call ~hooks ~endpoint client @@ RPC.get_chain_block_header () + in + let previous_header = ref initial in + + (* Notifier that is going to be used to indicate the block has been received by the monitor *) + let notify_block_received : (unit -> unit) ref = + ref (fun () -> ()) + (* dummy, will be replaced below *) + in + + (* Process each JSON header line from curl *) + let process_line line process = + 
!notify_block_received () ; + incr received_count ; + Log.info "Received block header #%d" !received_count ; + let current = JSON.parse ~origin:"curl_monitor_heads" line in + check_header + ~previous_header:!previous_header + ~current_header:current + ~current_timestamp:None + ~chain_id:None ; + previous_header := current ; + if !received_count = total_headers then ( + Log.info "Received all %d headers" total_headers ; + Tezt.Process.terminate process ; + Lwt.wakeup_later notify_all ()) ; + unit + in + + (* Start streaming headers via curl and process lines *) + let start_monitor () = + let url = Evm_node.endpoint sequencer ^ "/tezlink/monitor/heads/main" in + let process = Tezt.Process.spawn "curl" ["--no-buffer"; "--silent"; url] in + let ic = Tezt.Process.stdout process in + let rec loop () = + let* line_opt = Lwt_io.read_line_opt ic in + match line_opt with + | None -> Lwt.return_unit + | Some line -> + let* () = process_line line process in + loop () + in + Lwt.async loop ; + unit + in + + (* Produce blocks *) + let rec produce i = + if i = 0 then Lwt.return_unit + else + let wait_block_received, block_notifier = Lwt.wait () in + (notify_block_received := fun () -> Lwt.wakeup_later block_notifier ()) ; + let* _ = Rpc.produce_block sequencer in + let* () = wait_block_received in + produce (i - 1) + in + + (* Timeout in case headers don’t arrive in time *) + let timeout = + let* () = Evm_node.wait_for_blueprint_applied sequencer total_headers in + let* () = Lwt_unix.sleep 10.0 in + Test.fail ~__LOC__ "Timed out waiting for streamed headers" + in + + let* () = start_monitor () and* () = produce total_headers in + let* () = Lwt.pick [wait_for_all; timeout] in + + unit + let test_make_l2_kernel_installer_config chain_family = Protocol.register_test ~__FILE__ @@ -13659,6 +13746,7 @@ let () = test_tezlink_manager_key [Alpha] ; test_tezlink_counter [Alpha] ; test_tezlink_protocols [Alpha] ; + test_tezlink_monitor_heads [Alpha] ; test_tezlink_version [Alpha] ; 
test_tezlink_header [Alpha] ; test_tezlink_constants [Alpha] ; -- GitLab