From 653ac783d41ad5b11e555450dbca515e08d809a9 Mon Sep 17 00:00:00 2001 From: Thomas Letan Date: Tue, 10 Dec 2024 11:55:45 +0100 Subject: [PATCH] EVM Node: Introduce a transaction layer for the `Evm_context` state MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The main job of the `Evm_context` worker is to ensure the node’s view on its head remains consistent with its local store (consisting of an Irmin repository, and a SQLite database). The worker keeps some data about the head in its state, in a type called `session_state`. Following a well-established but tricky pattern of the Octez codebase, this type contains mutable fields, which can be modified in-place without having to return the new state. Although it is handy to reduce the verbosity of the code and the type signature, the cost becomes clear when having to deal with cancelable functions. Every functions interacting with the on-disk store of the node is potentially cancelable or can fail, and we want to be resilient to that. The previous approach was to delay any modification of the `session_state` type to after the SQLite requests and Irmin commits have succeeded. But recent refactorings have made it clear this naive approach is not sustainable. It is too easy to make a mistake. As a consequence, we propose an alternative take, with the introduction of a `Transaction` inner-module allowing to modify the session in-place while still having the possibility to abort and retain the initial session values. --- etherlink/bin_node/lib_dev/evm_context.ml | 231 +++++++++++++--------- 1 file changed, 136 insertions(+), 95 deletions(-) diff --git a/etherlink/bin_node/lib_dev/evm_context.ml b/etherlink/bin_node/lib_dev/evm_context.ml index 20bd652e8a35..6d7e5cd3160d 100644 --- a/etherlink/bin_node/lib_dev/evm_context.ml +++ b/etherlink/bin_node/lib_dev/evm_context.ml @@ -67,18 +67,6 @@ type store_info = { current_number : Ethereum_types.quantity; } -let session_to_head_info session = - { - evm_state = session.evm_state; - finalized_number = session.finalized_number; - next_blueprint_number = session.next_blueprint_number; - current_block_hash = session.current_block_hash; - pending_upgrade = - Option.map - (fun pending_upgrade -> pending_upgrade.Evm_store.kernel_upgrade) - session.pending_upgrade; - } - let pvm_config ctxt = Config.config ~preimage_directory:ctxt.preimages @@ -145,9 +133,101 @@ let lock_data_dir ~data_dir = return_unit module State = struct - let with_store_transaction ctxt k = - Evm_store.use ctxt.store @@ fun conn -> - Evm_store.with_transaction conn @@ fun conn -> k conn + let current_blueprint_number ctxt = + let (Qty next_blueprint_number) = ctxt.session.next_blueprint_number in + Ethereum_types.Qty (Z.pred next_blueprint_number) + + module Transaction : sig + (** [run ctxt k] executes [k ctxt' conn] where [ctxt'] is a copy of [ctxt] + whose [session] field can be safely modified in-place and [conn] is a + handler to a SQLite connection with a pre-started transaction. + + If [k] returns an [Error] or raises an exception, the changes made to + [ctxt'] by [k] are not applied to [ctxt] (i.e., it is left unchanged), + and the SQLite transaction is aborted. *) + val run : t -> (t -> Sqlite.conn -> 'a tzresult Lwt.t) -> 'a tzresult Lwt.t + + val initialize_head_info : t -> unit + end = struct + let with_store_transaction ctxt k = + Evm_store.use ctxt.store @@ fun conn -> + Evm_store.with_transaction conn @@ fun conn -> k conn + + (* [dup session] creates a copy of [session] that can be safely modified + without altering the initial value. *) + let dup + { + context; + finalized_number; + next_blueprint_number; + current_block_hash; + pending_upgrade; + evm_state; + last_split_block; + } = + { + context; + finalized_number; + next_blueprint_number; + current_block_hash; + pending_upgrade; + evm_state; + last_split_block; + } + + (* [apply session session'] modifies [session] in-place to match the content of [session']. *) + let apply session + { + context; + finalized_number; + next_blueprint_number; + current_block_hash; + pending_upgrade; + evm_state; + last_split_block; + } = + session.context <- context ; + session.finalized_number <- finalized_number ; + session.next_blueprint_number <- next_blueprint_number ; + session.current_block_hash <- current_block_hash ; + session.pending_upgrade <- pending_upgrade ; + session.evm_state <- evm_state ; + session.last_split_block <- last_split_block + + let session_to_head_info session = + { + evm_state = session.evm_state; + finalized_number = session.finalized_number; + next_blueprint_number = session.next_blueprint_number; + current_block_hash = session.current_block_hash; + pending_upgrade = + Option.map + (fun pending_upgrade -> pending_upgrade.Evm_store.kernel_upgrade) + session.pending_upgrade; + } + + let run ctxt (k : t -> Sqlite.conn -> 'a tzresult Lwt.t) : 'a tzresult Lwt.t + = + let open Lwt_result_syntax in + let ctxt' = {ctxt with session = dup ctxt.session} in + with_store_transaction ctxt @@ fun conn -> + let*! res = k ctxt' conn in + match res with + | Ok res -> + apply ctxt.session ctxt'.session ; + let*! head_info in + head_info := session_to_head_info ctxt.session ; + let (Qty level) = current_blueprint_number ctxt in + let (Qty finalized_level) = ctxt.session.finalized_number in + Metrics.set_confirmed_level ~level:finalized_level ; + Metrics.set_level ~level ; + return res + | Error err -> fail err + + let initialize_head_info ctxt = + let first_head = ref (session_to_head_info ctxt.session) in + Lwt.wakeup head_info_waker first_head + end let store_path ~data_dir = Filename.Infix.(data_dir // "store") @@ -361,8 +441,6 @@ module State = struct ctxt.session.current_block_hash <- current_block_hash ; ctxt.session.context <- context ; ctxt.session.pending_upgrade <- pending_upgrade ; - let*! head_info in - head_info := session_to_head_info ctxt.session ; return evm_state | None -> let*! () = @@ -370,8 +448,7 @@ module State = struct in tzfail exit_error - let blueprint_applied_event ctxt conn evm_state on_success - latest_finalized_level + let blueprint_applied_event ctxt conn evm_state latest_finalized_level ({number = Qty number; hash = expected_block_hash} : Evm_events.Blueprint_applied.t) = let open Lwt_result_syntax in @@ -397,7 +474,7 @@ module State = struct Evm_events_follower_events.upstream_blueprint_applied (number, expected_block_hash) in - return (evm_state, on_success, latest_finalized_level) + return (evm_state, latest_finalized_level) else let*! () = Evm_events_follower_events.diverged @@ -413,7 +490,7 @@ module State = struct else (* Observers needs to reset at finalized level. *) let* evm_state = reset_to_finalized_level exit_error ctxt conn in - return (evm_state, on_success, latest_finalized_level) + return (evm_state, latest_finalized_level) | None when ctxt.fail_on_missing_blueprint -> let*! () = Evm_events_follower_events.missing_blueprint @@ -428,11 +505,7 @@ module State = struct (Qty number) expected_block_hash in - return (evm_state, on_success, latest_finalized_level) - - let current_blueprint_number ctxt = - let (Qty next_blueprint_number) = ctxt.session.next_blueprint_number in - Ethereum_types.Qty (Z.pred next_blueprint_number) + return (evm_state, latest_finalized_level) let () = register_error_kind @@ -676,9 +749,6 @@ module State = struct split_info ; Broadcast.notify @@ Broadcast.Blueprint blueprint_with_events ; if applied_upgrade then ctxt.session.pending_upgrade <- None ; - let* head_info in - head_info := session_to_head_info ctxt.session ; - Metrics.set_level ~level ; return_unit type error += @@ -715,22 +785,17 @@ module State = struct in return (payload, delayed_transactions) - let rec apply_blueprint ?conn ?(events = []) ctxt timestamp payload + let rec apply_blueprint ?(events = []) ctxt conn timestamp payload delayed_transactions = let open Lwt_result_syntax in let* evm_state, context, current_block, applied_kernel_upgrade, split_info = - let kont conn = - let* () = apply_evm_events conn ctxt events in - apply_blueprint_store_unsafe - ctxt - conn - timestamp - payload - delayed_transactions - in - match conn with - | None -> with_store_transaction ctxt @@ fun conn -> kont conn - | Some conn -> kont conn + let* () = apply_evm_events conn ctxt events in + apply_blueprint_store_unsafe + ctxt + conn + timestamp + payload + delayed_transactions in let kernel_upgrade = match ctxt.session.pending_upgrade with @@ -759,22 +824,18 @@ module State = struct in return_unit - and apply_evm_event_unsafe on_success ctxt conn evm_state event - latest_finalized_level = + and apply_evm_event_unsafe ctxt conn evm_state event latest_finalized_level = let open Lwt_result_syntax in let*! () = Evm_events_follower_events.new_event event in match event with | Evm_events.Upgrade_event upgrade -> - let on_success session = - session.pending_upgrade <- - Some - { - kernel_upgrade = upgrade; - injected_before = ctxt.session.next_blueprint_number; - } ; - background_preemptive_download ctxt upgrade ; - on_success session - in + ctxt.session.pending_upgrade <- + Some + { + kernel_upgrade = upgrade; + injected_before = ctxt.session.next_blueprint_number; + } ; + background_preemptive_download ctxt upgrade ; let payload = Evm_events.Upgrade.to_bytes upgrade |> String.of_bytes in let*! evm_state = Evm_state.modify @@ -789,7 +850,7 @@ module State = struct upgrade in let*! () = Events.pending_upgrade upgrade in - return (evm_state, on_success, latest_finalized_level) + return (evm_state, latest_finalized_level) | Sequencer_upgrade_event sequencer_upgrade -> let payload = Evm_events.Sequencer_upgrade.to_bytes sequencer_upgrade @@ -801,15 +862,9 @@ module State = struct ~value:payload evm_state in - return (evm_state, on_success, latest_finalized_level) + return (evm_state, latest_finalized_level) | Blueprint_applied event -> - blueprint_applied_event - ctxt - conn - evm_state - on_success - latest_finalized_level - event + blueprint_applied_event ctxt conn evm_state latest_finalized_level event | New_delayed_transaction delayed_transaction -> let* evm_state = on_new_delayed_transaction @@ -817,7 +872,7 @@ module State = struct ~delayed_transaction evm_state in - return (evm_state, on_success, latest_finalized_level) + return (evm_state, latest_finalized_level) | Flush_delayed_inbox flushed_blueprint -> let*! () = Evm_events_follower_events.flush_delayed_inbox @@ -825,7 +880,7 @@ module State = struct flushed_blueprint.level in let* evm_state = flush_delayed_inbox ctxt conn flushed_blueprint in - return (evm_state, on_success, latest_finalized_level) + return (evm_state, latest_finalized_level) and apply_evm_events ?finalized_level conn (ctxt : t) events = let open Lwt_result_syntax in @@ -860,49 +915,37 @@ module State = struct {level_received = rollup_block_level; level_expected})) in if needs_process then ( - let* context, evm_state, on_success = - let* on_success, evm_state, context, Qty latest_finalized_number = + let* context, evm_state = + let* evm_state, context, Qty latest_finalized_number = match events with | [] -> (* Avoid an uncessary {!replace_current_commit} if the list is empty. *) return - ( ignore, - ctxt.session.evm_state, + ( ctxt.session.evm_state, ctxt.session.context, ctxt.session.finalized_number ) | events -> - let* on_success, evm_state, latest_finalized_number = + let* evm_state, latest_finalized_number = List.fold_left_es - (fun ( on_success, - evm_state, - Ethereum_types.Qty finalized_number ) - event -> - let* evm_state, on_success, latest_finalized_level = + (fun (evm_state, Ethereum_types.Qty finalized_number) event -> + let* evm_state, latest_finalized_level = apply_evm_event_unsafe - on_success ctxt conn evm_state event finalized_number in - return - ( on_success, - evm_state, - Ethereum_types.Qty latest_finalized_level )) - (ignore, ctxt.session.evm_state, ctxt.session.finalized_number) + return (evm_state, Ethereum_types.Qty latest_finalized_level)) + (ctxt.session.evm_state, ctxt.session.finalized_number) events in let* context = replace_current_commit ctxt conn evm_state in - return (on_success, evm_state, context, latest_finalized_number) + return (evm_state, context, latest_finalized_number) in (* Process the new `latest_finalized_number`. *) - let on_success session = - session.finalized_number <- Qty latest_finalized_number ; - Metrics.set_confirmed_level ~level:latest_finalized_number ; - on_success session - in + ctxt.session.finalized_number <- Qty latest_finalized_number ; let* () = Option.iter_es (fun l1_level -> @@ -913,7 +956,7 @@ module State = struct ~finalized_l2_level:(Qty latest_finalized_number)) finalized_level in - return (context, evm_state, on_success) + return (context, evm_state) in let*! () = Option.iter_s @@ -922,10 +965,7 @@ module State = struct Evm_context_events.processed_l1_level l1_level) finalized_level in - on_success ctxt.session ; on_modified_head ctxt evm_state context ; - let*! head_info in - head_info := session_to_head_info ctxt.session ; return_unit) else return_unit @@ -962,7 +1002,7 @@ module State = struct (* Apply the blueprint. *) let timestamp = flushed_blueprint.Evm_events.Flushed_blueprint.timestamp in let* () = - apply_blueprint ~conn ctxt timestamp payload delayed_transactions + apply_blueprint ctxt conn timestamp payload delayed_transactions in return ctxt.session.evm_state @@ -1446,8 +1486,7 @@ module Handlers = struct in Lwt.wakeup execution_config_waker @@ (ctxt.data_dir, pvm_config ctxt) ; Lwt.wakeup init_status_waker status ; - let first_head = ref (session_to_head_info ctxt.session) in - Lwt.wakeup head_info_waker first_head ; + State.Transaction.initialize_head_info ctxt ; let* () = State.preload_known_kernels ctxt in return ctxt @@ -1460,14 +1499,16 @@ module Handlers = struct | Apply_evm_events {finalized_level; events} -> protect @@ fun () -> let ctxt = Worker.state self in - State.with_store_transaction ctxt @@ fun conn -> + State.Transaction.run ctxt @@ fun ctxt conn -> State.apply_evm_events ?finalized_level conn ctxt events | Apply_blueprint {events; timestamp; payload; delayed_transactions} -> protect @@ fun () -> let ctxt = Worker.state self in + State.Transaction.run ctxt @@ fun ctxt conn -> State.apply_blueprint ?events ctxt + conn timestamp payload delayed_transactions -- GitLab