From cf1f464da0856957bad5a3f571fab2d237d95e8d Mon Sep 17 00:00:00 2001 From: Sylvain Ribstein Date: Wed, 27 Nov 2024 11:52:11 +0100 Subject: [PATCH 1/2] evm/node: register error of rollup node follower --- .../bin_node/lib_dev/rollup_node_follower.ml | 20 +++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/etherlink/bin_node/lib_dev/rollup_node_follower.ml b/etherlink/bin_node/lib_dev/rollup_node_follower.ml index 7e9da33c63bf..4ffb7fdf2008 100644 --- a/etherlink/bin_node/lib_dev/rollup_node_follower.ml +++ b/etherlink/bin_node/lib_dev/rollup_node_follower.ml @@ -14,6 +14,26 @@ let process_new_block ~proxy ~finalized_level () = type error += Connection_lost | Connection_timeout +let () = + register_error_kind + `Temporary + ~id:"rollup_node_follower_connection_lost" + ~title:"Rollup_node_follower_connection_lost" + ~description:"Connection to the rollup node was lost" + Data_encoding.unit + (function Connection_lost -> Some () | _ -> None) + (fun () -> Connection_lost) ; + register_error_kind + `Temporary + ~id:"rollup_node_follower_connection_timeout" + ~title:"Rollup_node_follower_connection_timeout" + ~description: + "Connection to the rollup node was timeouted, e.g. 
the rollup node \ + stream was staling" + Data_encoding.unit + (function Connection_timeout -> Some () | _ -> None) + (fun () -> Connection_timeout) + (** [process_finalized_level ~oldest_rollup_node_known_l1_level ~finalized_level ~rollup_node_endpoint] process the rollup node block level [finalized_level] iff it's known by the rollup node -- GitLab From c932add110fccee55b34bf56360388ea3ba836df Mon Sep 17 00:00:00 2001 From: Sylvain Ribstein Date: Wed, 27 Nov 2024 11:52:34 +0100 Subject: [PATCH 2/2] evm/node: catchup of evm event is now in the evm_event_follower --- etherlink/CHANGES_NODE.md | 1 + .../bin_node/lib_dev/evm_events_follower.ml | 123 +++++++++++++++--- .../lib_dev/evm_events_follower_events.ml | 13 ++ .../lib_dev/evm_events_follower_types.ml | 19 ++- .../bin_node/lib_dev/rollup_node_follower.ml | 63 ++------- 5 files changed, 148 insertions(+), 71 deletions(-) diff --git a/etherlink/CHANGES_NODE.md b/etherlink/CHANGES_NODE.md index 587d58cbfd96..df8923004381 100644 --- a/etherlink/CHANGES_NODE.md +++ b/etherlink/CHANGES_NODE.md @@ -10,6 +10,7 @@ ### Internals +- Make the rollup node follower more resilient. (!15745) - Use a single RPC (if the rollup node supports it) when fetching EVM events. 
(!15629, !15703) - Private RPC `produceBlock` can produce a block without delayed transactions, diff --git a/etherlink/bin_node/lib_dev/evm_events_follower.ml b/etherlink/bin_node/lib_dev/evm_events_follower.ml index 6ad66aee98ce..680d1f7400b9 100644 --- a/etherlink/bin_node/lib_dev/evm_events_follower.ml +++ b/etherlink/bin_node/lib_dev/evm_events_follower.ml @@ -14,7 +14,12 @@ type parameters = { module StringSet = Set.Make (String) module Types = struct - type state = parameters + type state = { + rollup_node_endpoint : Uri.t; + filter_event : Evm_events.t -> bool; + keep_alive : bool; + mutable last_l1_level : int32 option; + } type nonrec parameters = parameters end @@ -68,7 +73,7 @@ let fetch_event ({rollup_node_endpoint; keep_alive; _} : Types.state) return event_opt let fetch_events_one_by_one - ({rollup_node_endpoint; keep_alive; filter_event} as state : Types.state) + ({rollup_node_endpoint; keep_alive; filter_event; _} as state : Types.state) rollup_block_lvl = let open Lwt_result_syntax in let* nb_of_events_bytes = @@ -100,7 +105,7 @@ let fetch_events_one_by_one events let fetch_events_at_once - ({rollup_node_endpoint; keep_alive; filter_event} : Types.state) + ({rollup_node_endpoint; keep_alive; filter_event; _} : Types.state) rollup_block_lvl = let open Lwt_result_syntax in let path = Durable_storage_path.Evm_events.events in @@ -169,11 +174,80 @@ let fetch_events = let*! 
() = Evm_events_follower_events.fallback () in fetch_events_one_by_one state rollup_block_lvl -let on_new_head state rollup_block_lvl = +let apply_events state rollup_block_lvl = let open Lwt_result_syntax in let* events = fetch_events state rollup_block_lvl in Evm_context.apply_evm_events ~finalized_level:rollup_block_lvl events +type error += Evm_events_follower_is_closed + +let () = + register_error_kind + `Permanent + ~id:"evm_events_follower_is_closed" + ~title:"Evm_events_follower_is_closed" + ~description: + "Tried to process evm events from the rollup node but the worker is \ + closed" + Data_encoding.unit + (function Evm_events_follower_is_closed -> Some () | _ -> None) + (fun () -> Evm_events_follower_is_closed) + +let new_rollup_block worker rollup_level = + let open Lwt_result_syntax in + let state = Worker.state worker in + let add_request level = + let*! (pushed : bool) = + Worker.Queue.push_request worker (Apply_evm_events level) + in + if not pushed then + (* this should not happen as we are called from within the + worker *) + tzfail Evm_events_follower_is_closed + else return_unit + in + (* push one [Apply_evm_events] request per rollup node level going + from [from] to [to_] inclusive. *) + let[@tailrec] rec aux ~from ~to_ = + if from > to_ then + failwith + "Internal error: The catchup of evm_event went too far, it should be \ + impossible. Catching up from %ld to %ld." + from + to_ + else + let* () = add_request from in + if from = to_ then (* [to_] was just pushed: we are caught up *) return_unit + else aux ~from:(Int32.succ from) ~to_ + in + match state.last_l1_level with + | Some last_l1_level + when rollup_level = last_l1_level || rollup_level = Int32.pred last_l1_level + -> + let*! () = + Evm_events_follower_events.rollup_level_is_already_processed + rollup_level + in + return_unit + | Some last_l1_level when rollup_level < last_l1_level -> + failwith + "Internal error: the received block from the rollup node is too old. 
\ + Received level: %ld, last_l1_level: %ld" + rollup_level + last_l1_level + | None | Some _ -> + let* from = + match state.last_l1_level with + | Some last_l1_level -> return @@ Int32.succ last_l1_level + | None -> + let*! () = Evm_store_events.no_l1_latest_level_to_catch_up () in + return rollup_level + in + (* we request all evm events from [last_l1_level + 1] (or from [rollup_level] itself when no level is recorded) to [rollup_level] *) + let* () = aux ~from ~to_:rollup_level in + state.last_l1_level <- Some rollup_level ; + return_unit + module Handlers = struct open Evm_events_follower_types @@ -184,18 +258,22 @@ module Handlers = struct worker -> (r, request_error) Request.t -> (r, request_error) result Lwt.t = fun worker request -> - let open Lwt_result_syntax in match request with - | Request.New_rollup_node_block rollup_block_lvl -> - protect @@ fun () -> - let* () = on_new_head (Worker.state worker) rollup_block_lvl in - return_unit + | Request.New_rollup_node_block rollup_head -> + protect @@ fun () -> new_rollup_block worker rollup_head + | Apply_evm_events rollup_lvl -> + protect @@ fun () -> apply_events (Worker.state worker) rollup_lvl type launch_error = error trace - let on_launch _w () (parameters : Types.parameters) = - let state = parameters in - Lwt_result_syntax.return state + let on_launch _w () + ({rollup_node_endpoint; filter_event; keep_alive} : Types.parameters) = + let open Lwt_result_syntax in + let* last_l1_level = Evm_context.last_known_l1_level () in + let state : Types.state = + {rollup_node_endpoint; filter_event; keep_alive; last_l1_level} + in + return state let on_error : type r request_error. @@ -214,6 +292,13 @@ module Handlers = struct errs in return_unit + | Request.Apply_evm_events _ -> + let*! () = + Evm_events_follower_events.worker_request_failed + (Request.view req) + errs + in + return_unit let on_completion _ _ _ _ = Lwt.return_unit @@ -258,11 +343,19 @@ let shutdown () = let*! 
() = Worker.shutdown w in return_unit +let handle_request_error rq = + let open Lwt_syntax in + let* rq in + match rq with + | Ok res -> return_ok res + | Error (Worker.Request_error errs) -> Lwt.return_error errs + | Error (Closed None) -> Lwt.return_error [Evm_events_follower_is_closed] + | Error (Closed (Some errs)) -> Lwt.return_error errs + | Error (Any exn) -> Lwt.return_error [Exn exn] + let worker_add_request ~request = - let open Lwt_result_syntax in bind_worker @@ fun w -> - let*! (_pushed : bool) = Worker.Queue.push_request w request in - return_unit + Worker.Queue.push_request_and_wait w request |> handle_request_error let new_rollup_block rollup_level = worker_add_request ~request:(New_rollup_node_block rollup_level) diff --git a/etherlink/bin_node/lib_dev/evm_events_follower_events.ml b/etherlink/bin_node/lib_dev/evm_events_follower_events.ml index b68da9bb7773..b2028fb375e5 100644 --- a/etherlink/bin_node/lib_dev/evm_events_follower_events.ml +++ b/etherlink/bin_node/lib_dev/evm_events_follower_events.ml @@ -130,6 +130,16 @@ module Event = struct ("expected", Data_encoding.int31) ("fetched", Data_encoding.int31) + let rollup_level_already_processed = + declare_1 + ~section + ~name:"evm_events_follower_rollup_level_already_processed" + ~msg: + "The rollup level {level} given to the evm events follower was already \ + seen, skipping it." 
+ ~level:Info + ("level", Data_encoding.int32) + let fallback = declare_0 ~section @@ -185,6 +195,9 @@ let unexpected_number_of_events ~expected ~fetched = Event.unexpected_number_of_events (expected, fetched) +let rollup_level_is_already_processed rollup_level = + Internal_event.Simple.emit Event.rollup_level_already_processed rollup_level + let fallback () = Internal_event.Simple.emit Event.fallback () let flush_delayed_inbox ~timestamp Ethereum_types.(Qty level) = diff --git a/etherlink/bin_node/lib_dev/evm_events_follower_types.ml b/etherlink/bin_node/lib_dev/evm_events_follower_types.ml index 5fb3c90c0475..d1be3970c4b0 100644 --- a/etherlink/bin_node/lib_dev/evm_events_follower_types.ml +++ b/etherlink/bin_node/lib_dev/evm_events_follower_types.ml @@ -6,7 +6,9 @@ (*****************************************************************************) module Request = struct - type ('a, 'b) t = New_rollup_node_block : Int32.t -> (unit, error trace) t + type ('a, 'b) t = + | New_rollup_node_block : Int32.t -> (unit, error trace) t + | Apply_evm_events : Int32.t -> (unit, error trace) t type view = View : _ t -> view @@ -23,12 +25,25 @@ module Request = struct (req "request" (constant "new_rollup_node_block")) (req "rollup_head" int32)) (function - | View (New_rollup_node_block rollup_head) -> Some ((), rollup_head)) + | View (New_rollup_node_block rollup_head) -> Some ((), rollup_head) + | _ -> None) (fun ((), rollup_head) -> View (New_rollup_node_block rollup_head)); + case + (Tag 1) + ~title:"Apply_evm_events" + (obj2 + (req "request" (constant "apply_evm_events")) + (req "rollup_lvl" int32)) + (function + | View (Apply_evm_events rollup_lvl) -> Some ((), rollup_lvl) + | _ -> None) + (fun ((), rollup_head) -> View (Apply_evm_events rollup_head)); ] let pp ppf (View r) = match r with | New_rollup_node_block rollup_head -> Format.fprintf ppf "New_rollup_node_block (level %ld)" rollup_head + | Apply_evm_events rollup_lvl -> + Format.fprintf ppf "Apply_evm_events (level 
%ld)" rollup_lvl end diff --git a/etherlink/bin_node/lib_dev/rollup_node_follower.ml b/etherlink/bin_node/lib_dev/rollup_node_follower.ml index 4ffb7fdf2008..c412db67e916 100644 --- a/etherlink/bin_node/lib_dev/rollup_node_follower.ml +++ b/etherlink/bin_node/lib_dev/rollup_node_follower.ml @@ -112,32 +112,16 @@ let[@tailrec] rec connect_to_stream ?(count = 0) ~rollup_node_endpoint () = ~rollup_node_endpoint () -(** [catchup_evm_event ~rollup_node_endpoint ~from ~to_] catchup on - evm events from [from] to [to_] from the rollup node. *) -let[@tailrec] rec catchup_evm_event ~rollup_node_endpoint ~from ~to_ = - let open Lwt_result_syntax in - if from = to_ then (*we are catch up *) return_unit - else if from > to_ then - failwith - "Internal error: The catchup of evm_event went too far, it should be \ - impossible." - else - (* reading event from [from] level then catching up from [from + - 1]. *) - let next_l1_level = Int32.succ from in - let* () = Evm_events_follower.new_rollup_block next_l1_level in - catchup_evm_event ~rollup_node_endpoint ~from:next_l1_level ~to_ - -(** [catchup_and_next_block ~proxy ~catchup_event ~connection] - returns the next block found in [connection.stream]. +(** [get_next_block ~proxy ~connection] returns the next block found + in [connection.stream]. - If the connection drops then it tries to reconnect the stream using [connect_to_stream]. - If the connection timeout (takes more than [connection.timeout]) or if the connection fails then reconnect with [connect_to_stream] - and try to fetch [catchup_and_next_block] with that new stream.*) -let[@tailrec] rec catchup_and_next_block ~proxy ~catchup_event ~connection = + and try to fetch [get_next_block] with that new stream.*) +let[@tailrec] rec get_next_block ~proxy ~connection = let open Lwt_result_syntax in let get_promise () = let*! 
res = Lwt_stream.get connection.stream in @@ -151,25 +135,7 @@ let[@tailrec] rec catchup_and_next_block ~proxy ~catchup_event ~connection = Lwt.pick [get_promise (); timeout_promise connection.timeout] in match get_or_timeout with - | Ok block -> - let* () = - if catchup_event then - let* latest_known_l1_level = Evm_context.last_known_l1_level () in - match latest_known_l1_level with - | None -> - (* sequencer has no value to start from, it must be the - initial start. *) - let*! () = Evm_store_events.no_l1_latest_level_to_catch_up () in - return_unit - | Some from -> - let to_ = Sc_rollup_block.(Int32.(sub block.header.level 2l)) in - catchup_evm_event - ~rollup_node_endpoint:connection.rollup_node_endpoint - ~from - ~to_ - else return_unit - in - return (block, connection) + | Ok block -> return (block, connection) | Error ([(Connection_lost | Connection_timeout)] as errs) -> connection.close () ; let*! () = Rollup_node_follower_events.connection_failed errs in @@ -179,13 +145,7 @@ let[@tailrec] rec catchup_and_next_block ~proxy ~catchup_event ~connection = ~rollup_node_endpoint:connection.rollup_node_endpoint () in - (catchup_and_next_block [@tailcall]) - ~proxy - ~catchup_event:(not proxy) - (* catchup event if not in proxy mode, proxy does not have - `evm_context` and would fail to fetch some data. Else - catchup possible missed event.*) - ~connection + (get_next_block [@tailcall]) ~proxy ~connection | Error errs -> let*! () = Rollup_node_follower_events.stream_failed errs in fail errs @@ -194,15 +154,13 @@ let[@tailrec] rec catchup_and_next_block ~proxy ~catchup_event ~connection = ~oldest_rollup_node_known_l1_level ~connection] main loop to process the block. - get the current rollup node block with [catchup_and_next_block], process it + get the current rollup node block with [get_next_block], process it with [process_finalized_level] then loop over. 
*) -let[@tailrec] rec loop_on_rollup_node_stream ~keep_alive ~catchup_event ~proxy +let[@tailrec] rec loop_on_rollup_node_stream ~keep_alive ~proxy ~oldest_rollup_node_known_l1_level ~connection () = let open Lwt_result_syntax in let start_time = Unix.gettimeofday () in - let* block, connection = - catchup_and_next_block ~proxy ~catchup_event ~connection - in + let* block, connection = get_next_block ~proxy ~connection in let elapsed = Unix.gettimeofday () -. start_time in let connection = update_timeout ~elapsed ~connection in let finalized_level = Sc_rollup_block.(Int32.(sub block.header.level 2l)) in @@ -215,7 +173,6 @@ let[@tailrec] rec loop_on_rollup_node_stream ~keep_alive ~catchup_event ~proxy in (loop_on_rollup_node_stream [@tailcall]) ~keep_alive - ~catchup_event:false ~proxy ~oldest_rollup_node_known_l1_level ~connection @@ -233,8 +190,6 @@ let start ~keep_alive ~proxy ~rollup_node_endpoint () = let* connection = connect_to_stream ~rollup_node_endpoint () in loop_on_rollup_node_stream ~keep_alive - (* when fetching first block it should try to catchup if needed *) - ~catchup_event:(not proxy) ~proxy ~oldest_rollup_node_known_l1_level ~connection -- GitLab