diff --git a/etherlink/bin_node/lib_dev/blueprint_events.ml b/etherlink/bin_node/lib_dev/blueprint_events.ml index c00ff3c2c9dbe1a24bcd261981ac0358b4aa7db9..a3cc3013902999d8dff8fa0ea51ecea74d142585 100644 --- a/etherlink/bin_node/lib_dev/blueprint_events.ml +++ b/etherlink/bin_node/lib_dev/blueprint_events.ml @@ -44,6 +44,22 @@ let blueprint_injection = ~level:Info ("level", Data_encoding.n) +let blueprint_injection_on_inbox = + declare_1 + ~section + ~name:"blueprint_injection_on_inbox" + ~msg:"Injecting on the shared inbox a blueprint for level {level}" + ~level:Debug + ("level", Data_encoding.n) + +let blueprint_injection_on_DAL = + declare_1 + ~section + ~name:"blueprint_injection_on_DAL" + ~msg:"Injecting on the DAL a blueprint for level {level}" + ~level:Debug + ("level", Data_encoding.n) + let blueprint_injection_failure = declare_2 ~section @@ -107,6 +123,10 @@ let publisher_shutdown () = emit publisher_shutdown () let blueprint_injected level = emit blueprint_injection level +let blueprint_injected_on_inbox level = emit blueprint_injection_on_inbox level + +let blueprint_injected_on_DAL level = emit blueprint_injection_on_DAL level + let blueprint_injection_failed level trace = emit blueprint_injection_failure (level, trace) diff --git a/etherlink/bin_node/lib_dev/blueprint_events.mli b/etherlink/bin_node/lib_dev/blueprint_events.mli index cc76270dc7a0c32a42c9056fc86626d593ce0cc6..7b70c0e15fe2ecb100ece7a5756e6e22b7f788d1 100644 --- a/etherlink/bin_node/lib_dev/blueprint_events.mli +++ b/etherlink/bin_node/lib_dev/blueprint_events.mli @@ -25,6 +25,16 @@ val blueprint_applied : Z.t * Ethereum_types.block_hash -> unit Lwt.t [level] has been forwarded to a rollup node *) val blueprint_injected : Z.t -> unit Lwt.t +(** [blueprint_injected_on_inbox level] advertizes that a blueprint + for level [level] has been forwarded to a rollup node for + injection on the shared inbox *) +val blueprint_injected_on_inbox : Z.t -> unit Lwt.t + +(** [blueprint_injected_on_DAL 
level] advertizes that a blueprint + for level [level] has been forwarded to a rollup node for + injection on the DAL *) +val blueprint_injected_on_DAL : Z.t -> unit Lwt.t + (** [blueprint_injection_failed level trace] advertizes that a blueprint could not be injected for level [level]. *) val blueprint_injection_failed : Z.t -> tztrace -> unit Lwt.t diff --git a/etherlink/bin_node/lib_dev/blueprints_publisher.ml b/etherlink/bin_node/lib_dev/blueprints_publisher.ml index 8e4e60927b16bee410cc4e1d702fe1d7d8efb74f..b5d1b5edd6d8ab3c8a0c517491e2bdd94e6013ba 100644 --- a/etherlink/bin_node/lib_dev/blueprints_publisher.ml +++ b/etherlink/bin_node/lib_dev/blueprints_publisher.ml @@ -28,6 +28,7 @@ type state = { (** Do not try to catch-up if [cooldown] is not equal to 0 *) enable_dal : bool; dal_slots : int list option; + mutable dal_last_used : Z.t; } module Types = struct @@ -98,7 +99,7 @@ module Worker = struct let current = current_cooldown worker in if on_cooldown worker then set_cooldown worker (current - 1) else () - let publish self payload level = + let publish self payload level ~use_dal_if_enabled = let open Lwt_result_syntax in let rollup_node_endpoint = rollup_node_endpoint self in (* We do not check if we succeed or not: this will be done when new L2 @@ -107,7 +108,27 @@ module Worker = struct let*! res = (* We do not check if we succeed or not: this will be done when new L2 heads come from the rollup node. *) - Rollup_services.publish ~keep_alive:false ~rollup_node_endpoint payload + match (state self, payload) with + | ( { + enable_dal = true; + dal_slots = Some (slot_index :: _); + dal_last_used; + _; + }, + [`External payload] ) + when use_dal_if_enabled && dal_last_used < level -> + (state self).dal_last_used <- level ; + let*! () = Blueprint_events.blueprint_injected_on_DAL level in + Rollup_services.publish_on_dal + ~rollup_node_endpoint + ~slot_index + payload + | _ -> + let*! 
() = Blueprint_events.blueprint_injected_on_inbox level in + Rollup_services.publish + ~keep_alive:false + ~rollup_node_endpoint + payload in let*! () = match res with @@ -155,7 +176,7 @@ module Worker = struct let* () = List.iter_es (fun (Ethereum_types.Qty current, payload) -> - publish worker payload current) + publish worker payload current ~use_dal_if_enabled:false) blueprints in @@ -224,6 +245,7 @@ module Handlers = struct keep_alive; enable_dal = Option.is_some dal_slots; dal_slots; + dal_last_used = Z.zero; } let on_request : @@ -233,7 +255,7 @@ module Handlers = struct let open Lwt_result_syntax in match request with | Publish {level; payload} -> - let* () = Worker.publish self payload level in + let* () = Worker.publish self payload level ~use_dal_if_enabled:true in return_unit | New_rollup_node_block rollup_block_lvl -> ( let* () = diff --git a/etherlink/bin_node/lib_dev/rollup_services.ml b/etherlink/bin_node/lib_dev/rollup_services.ml index 0f87dfd7ead9019cad25b9b36fb95d762698e619..ae6403a59e68971e20b2cb90319944f0af66ec5e 100644 --- a/etherlink/bin_node/lib_dev/rollup_services.ml +++ b/etherlink/bin_node/lib_dev/rollup_services.ml @@ -153,6 +153,22 @@ let batcher_injection : (list string)) (open_root / "local" / "batcher" / "injection") +let dal_injection = + let input_encoding = + Data_encoding.( + obj2 + (req "slot_content" Data_encoding.Variable.string) + (req "slot_index" uint8)) + in + Tezos_rpc.Service.post_service + ~description:"Inject the given slot in the DAL queue" + ~query:Tezos_rpc.Query.empty + ~input: + Data_encoding.( + def "dal_slot" ~description:"Slot to inject" input_encoding) + ~output:Data_encoding.unit + (open_root / "local" / "dal" / "injection") + let simulation : ( [`POST], unit, @@ -269,6 +285,24 @@ let publish : in return_unit +let publish_on_dal : + rollup_node_endpoint:Uri.t -> + slot_index:int -> + string -> + unit tzresult Lwt.t = + fun ~rollup_node_endpoint ~slot_index inputs -> + let open Lwt_result_syntax in + let* 
_answer = + call_service + ~keep_alive:false + ~base:rollup_node_endpoint + dal_injection + () + () + (inputs, slot_index) + in + return_unit + let durable_state_subkeys : ( [`GET], unit, diff --git a/etherlink/kernel_evm/kernel/src/dal.rs b/etherlink/kernel_evm/kernel/src/dal.rs new file mode 100644 index 0000000000000000000000000000000000000000..4fc25b9d68f6d9c61293292c88cbdf1e1374a6bc --- /dev/null +++ b/etherlink/kernel_evm/kernel/src/dal.rs @@ -0,0 +1,113 @@ +use crate::configuration::DalConfiguration; +use crate::parsing::{ + Input::ModeSpecific, InputResult::Input, SequencerInput::SequencerBlueprint, + SequencerParsingContext, +}; +use crate::storage::read_l1_level; +use anyhow::Ok; +use rlp::{DecoderError, PayloadInfo}; +use tezos_evm_logging::{log, Level::*}; +use tezos_smart_rollup_host::dal_parameters::RollupDalParameters; +use tezos_smart_rollup_host::metadata::RAW_ROLLUP_ADDRESS_SIZE; + +use tezos_smart_rollup_host::runtime::Runtime; + +// Import all the pages of a DAL slot and concatenate them. +fn import_dal_slot<Host: Runtime>( + host: &mut Host, + params: &RollupDalParameters, + published_level: i32, + slot_index: u8, +) -> Option<Vec<u8>> { + // Without this the rollup node hangs. + if published_level < 0 { + return None; + }; + let page_size = params.page_size as usize; + let slot_size = params.slot_size as usize; + let mut slot: Vec<u8> = vec![0u8; slot_size]; + let number_of_pages = (params.slot_size / params.page_size) as i16; + let mut page_start = 0usize; + for page_index in 0..number_of_pages { + let imported_page_len = host + .reveal_dal_page( + published_level, + slot_index, + page_index, + &mut slot[page_start..page_start + page_size], + ) + .unwrap_or(0); + if imported_page_len == page_size { + page_start += imported_page_len + } else { + return None; + } + } + Some(slot) +} + +// data is assumed to be one RLP object followed by some padding.
+// this function returns the length of the RLP object, including its +// length prefix +fn rlp_length(data: &[u8]) -> Result<usize, DecoderError> { + let PayloadInfo { + header_len, + value_len, + } = PayloadInfo::from(data)?; + Result::Ok(header_len + value_len) +} + +pub fn fetch_and_parse_sequencer_blueprints_from_dal<Host: Runtime>( + host: &mut Host, + smart_rollup_address: [u8; RAW_ROLLUP_ADDRESS_SIZE], + dal: DalConfiguration, + parsing_context: &mut SequencerParsingContext, +) -> anyhow::Result<()> { + let params = host.reveal_dal_parameters(); + let attestation_lag = params.attestation_lag as i32; + let level = read_l1_level(host).unwrap_or_default() as i32; + let published_level = level - attestation_lag - 1; + for slot_index in dal.slot_indices { + if let Some(slot) = import_dal_slot(host, &params, published_level, slot_index) { + log!( + host, + Info, + "DAL slot at level {} and index {} successfully imported", + published_level, + slot_index + ); + + // DAL slots are padded with zeros to have a constant + // size, we need to remove this padding before parsing the + // slot as a blueprint chunk. + + // The expected format is: + + // 0 (1B) / rollup_address (RAW_ROLLUP_ADDRESS_SIZE B) / blueprint tag (1B) / blueprint chunk (variable) / padding + + // To remove the padding we need to measure the length of + // the RLP-encoded blueprint chunk which starts at + // position 2 + RAW_ROLLUP_ADDRESS_SIZE + if let Result::Ok(chunk_length) = + rlp_length(&slot[2 + RAW_ROLLUP_ADDRESS_SIZE..]) + { + // Padding removal + let slot = &slot[0..2 + RAW_ROLLUP_ADDRESS_SIZE + chunk_length]; + let res = crate::parsing::InputResult::parse_external( + slot, + &smart_rollup_address, + parsing_context, + ); + if let Input(ModeSpecific(SequencerBlueprint(chunk))) = res { + log!( + host, + Info, + "DAL slot successfully parsed as a blueprint chunk" + ); + crate::blueprint_storage::store_sequencer_blueprint(host, chunk)?
+ } + } + } + } + Ok(()) +} diff --git a/etherlink/kernel_evm/kernel/src/inbox.rs b/etherlink/kernel_evm/kernel/src/inbox.rs index 80f1d17b2e67ef3ad6cd3d0149d4abbc12b57762..226f1f1c0a9df3fa4ab0b22d944ae60f6be63e07 100644 --- a/etherlink/kernel_evm/kernel/src/inbox.rs +++ b/etherlink/kernel_evm/kernel/src/inbox.rs @@ -6,7 +6,8 @@ // SPDX-License-Identifier: MIT use crate::blueprint_storage::store_sequencer_blueprint; -use crate::configuration::{fetch_limits, TezosContracts}; +use crate::configuration::{fetch_limits, DalConfiguration, TezosContracts}; +use crate::dal::fetch_and_parse_sequencer_blueprints_from_dal; use crate::delayed_inbox::DelayedInbox; use crate::parsing::{ Input, InputResult, Parsable, ProxyInput, SequencerInput, SequencerParsingContext, @@ -620,6 +621,7 @@ pub enum StageOneStatus { Skipped, } +#[allow(clippy::too_many_arguments)] pub fn read_sequencer_inbox( host: &mut Host, smart_rollup_address: [u8; 20], @@ -628,6 +630,7 @@ pub fn read_sequencer_inbox( sequencer: PublicKey, delayed_inbox: &mut DelayedInbox, enable_fa_bridge: bool, + dal: Option, ) -> Result { // The mutable variable is used to retrieve the information of whether the // inbox was empty or not. As we consume all the inbox in one go, if the @@ -642,6 +645,14 @@ pub fn read_sequencer_inbox( .maximum_allowed_ticks .saturating_sub(TICKS_FOR_BLUEPRINT_INTERCEPT), }; + if let Some(dal_config) = dal { + fetch_and_parse_sequencer_blueprints_from_dal( + host, + smart_rollup_address, + dal_config, + &mut parsing_context, + )?; + }; loop { // Checks there will be enough ticks to handle at least another chunk of // full size. If it is not the case, asks for reboot. 
diff --git a/etherlink/kernel_evm/kernel/src/lib.rs b/etherlink/kernel_evm/kernel/src/lib.rs index 80fe70a16b471d5f7605f7ac1c22002f05d7e704..c700f33f8ac3618f7e8896dee99b0336c99e5d8c 100644 --- a/etherlink/kernel_evm/kernel/src/lib.rs +++ b/etherlink/kernel_evm/kernel/src/lib.rs @@ -45,6 +45,7 @@ mod block_in_progress; mod blueprint; mod blueprint_storage; mod configuration; +mod dal; mod delayed_inbox; mod error; mod event; diff --git a/etherlink/kernel_evm/kernel/src/parsing.rs b/etherlink/kernel_evm/kernel/src/parsing.rs index 6823ad4ab3ff0f471d1eb6b75cd9c2ab3c6d5e55..93f5249a2aa0fc23a980684316e239174fb07fcd 100644 --- a/etherlink/kernel_evm/kernel/src/parsing.rs +++ b/etherlink/kernel_evm/kernel/src/parsing.rs @@ -397,7 +397,7 @@ impl InputResult { /// // External message structure : // FRAMING_PROTOCOL_TARGETTED 21B / MESSAGE_TAG 1B / DATA - fn parse_external( + pub fn parse_external( input: &[u8], smart_rollup_address: &[u8], context: &mut Mode::Context, diff --git a/etherlink/kernel_evm/kernel/src/stage_one.rs b/etherlink/kernel_evm/kernel/src/stage_one.rs index 2664a2d8b5adc80c33862cd7977bd6fd99706d98..c3a49fa675d0e960f43586d3e9cb1622cc1516f8 100644 --- a/etherlink/kernel_evm/kernel/src/stage_one.rs +++ b/etherlink/kernel_evm/kernel/src/stage_one.rs @@ -105,11 +105,6 @@ fn fetch_sequencer_blueprints( dal: Option, enable_fa_bridge: bool, ) -> Result { - if let Some(_dal_config) = dal { - log!(host, Info, "Revealing DAL parameters"); - let params = host.reveal_dal_parameters(); - log!(host, Info, "DAL params: {:?}", params); - }; match read_sequencer_inbox( host, smart_rollup_address, @@ -118,6 +113,7 @@ fn fetch_sequencer_blueprints( sequencer, delayed_inbox, enable_fa_bridge, + dal, )? 
{ StageOneStatus::Done => { // Check if there are timed-out transactions in the delayed inbox diff --git a/etherlink/tezt/lib/helpers.ml b/etherlink/tezt/lib/helpers.ml index d63fdc2c0ffca04bcc22f565b8ffb048a4af7949..d928493ffe11dc7a5350c5c4691410af0f609c67 100644 --- a/etherlink/tezt/lib/helpers.ml +++ b/etherlink/tezt/lib/helpers.ml @@ -195,7 +195,7 @@ let sequencer_upgrade ~sc_rollup_address ~sequencer_admin in Client.bake_for_and_wait ~keys:[] client -let bake_until ?__LOC__ ?(timeout_in_blocks = 5) ?(timeout = 30.) ~bake +let bake_until ?__LOC__ ?(timeout_in_blocks = 20) ?(timeout = 30.) ~bake ~result_f () = let res = ref None in let rec go counter_block = diff --git a/etherlink/tezt/tests/evm_sequencer.ml b/etherlink/tezt/tests/evm_sequencer.ml index 93cd7b940afc52285c5771cf1a5eca3b8211e200..c03f551e71e3f74169fc1cdeae86f56cf5ecac81 100644 --- a/etherlink/tezt/tests/evm_sequencer.ml +++ b/etherlink/tezt/tests/evm_sequencer.ml @@ -83,6 +83,7 @@ type sequencer_setup = { l1_contracts : l1_contracts; boot_sector : string; kernel : Uses.t; + enable_dal : bool; } let setup_l1_contracts ?(dictator = Constant.bootstrap2) client = @@ -362,6 +363,7 @@ let setup_sequencer ?sequencer_rpc_port ?sequencer_private_rpc_port sc_rollup_node; boot_sector = output; kernel; + enable_dal; } let send_transaction (transaction : unit -> 'a Lwt.t) sequencer : 'a Lwt.t = @@ -751,16 +753,18 @@ let test_publish_blueprints = ~tags:["evm"; "sequencer"; "data"] ~title:"Sequencer publishes the blueprints to L1" ~use_dal:ci_enabled_dal_registration - @@ fun {sequencer; proxy; client; sc_rollup_node; _} _protocol -> + @@ fun {sequencer; proxy; client; sc_rollup_node; enable_dal; _} _protocol -> let* _ = repeat 5 (fun () -> let*@ _ = produce_block sequencer in unit) in - let* () = Evm_node.wait_for_blueprint_injected ~timeout:5. sequencer 5 in + (* Wait more to avoid flakiness, in particular with DAL *) + let timeout = if enable_dal then 50. else 5. 
in + let* () = Evm_node.wait_for_blueprint_injected ~timeout sequencer 5 in - (* At this point, the evm node should called the batcher endpoint to publish + (* At this point, the evm node should call the batcher endpoint to publish all the blueprints. Stopping the node is then not a problem. *) let* () = bake_until_sync ~sc_rollup_node ~client ~sequencer ~proxy () in @@ -771,6 +775,81 @@ let test_publish_blueprints = let* () = Lwt_unix.sleep 2. in check_head_consistency ~left:sequencer ~right:proxy () +(* This test is similar to test_publish_blueprints but it also checks + that all 5 blueprints sent from the sequencer were published on the + DAL (and none on the inbox). *) +let test_publish_blueprints_on_dal = + register_all + ~time_between_blocks:Nothing + ~tags:["evm"; "sequencer"; "data"] + ~title:"Sequencer publishes the blueprints to the DAL" + (* We want this test in the CI so we put no extra tags when DAL + is active to avoid having the [ci_disabled] or [slow] tag. *) + ~use_dal:(Register_both {extra_tags_with = []; extra_tags_without = []}) + @@ fun {sequencer; proxy; client; sc_rollup_node; enable_dal; _} _protocol -> + let number_of_blueprints = 5 in + + let number_of_blueprints_sent_to_inbox = ref 0 in + let number_of_blueprints_sent_to_dal = ref 0 in + + let count_event event counter = + Evm_node.wait_for sequencer event (fun _level -> + incr counter ; + (* We return None here to keep the loop running *) + None) + in + + let inbox_counter_p = + count_event + "blueprint_injection_on_inbox.v0" + number_of_blueprints_sent_to_inbox + in + + let dal_counter_p = + count_event "blueprint_injection_on_DAL.v0" number_of_blueprints_sent_to_dal + in + + let* _ = + repeat number_of_blueprints (fun () -> + let*@ _ = produce_block sequencer in + unit) + in + + (* Wait more to avoid flakiness, in particular with DAL *) + let timeout = if enable_dal then 50. else 5. 
in + let* () = + Evm_node.wait_for_blueprint_injected ~timeout sequencer number_of_blueprints + in + + (* At this point, the evm node should call the batcher endpoint to publish + all the blueprints. Stopping the node is then not a problem. *) + let* () = + bake_until_sync ~__LOC__ ~sc_rollup_node ~client ~sequencer ~proxy () + in + + (* We have unfortunately noticed that the test can be flaky. Sometimes, + the following RPC is done before the proxy being initialised, even though + we wait for it. The source of flakiness is unknown but happens very rarely, + we put a small sleep to make the least flaky possible. *) + let* () = Lwt_unix.sleep 2. in + let* () = check_head_consistency ~left:sequencer ~right:proxy () in + let expected_nb_of_bp_on_dal, expected_nb_of_bp_on_inbox = + if enable_dal then (number_of_blueprints, 0) else (0, number_of_blueprints) + in + Check.(expected_nb_of_bp_on_dal = !number_of_blueprints_sent_to_dal) + ~__LOC__ + Check.int + ~error_msg: + "Wrong number of blueprints published on the DAL; Expected %L, got %R." ; + Check.(expected_nb_of_bp_on_inbox = !number_of_blueprints_sent_to_inbox) + ~__LOC__ + Check.int + ~error_msg: + "Wrong number of blueprints published on the inbox; Expected %L, got %R." 
; + Lwt.cancel dal_counter_p ; + Lwt.cancel inbox_counter_p ; + unit + let test_sequencer_too_ahead = let max_blueprints_ahead = 5 in register_all @@ -2686,9 +2765,7 @@ let test_delayed_transfer_timeout = sc_rollup_node; sequencer; proxy; - observer = _; - boot_sector = _; - kernel = _; + _; } _protocol -> (* Kill the sequencer *) @@ -2846,9 +2923,7 @@ let test_delayed_transfer_timeout_fails_l1_levels = sc_rollup_node; sequencer; proxy; - observer = _; - boot_sector = _; - kernel = _; + _; } _protocol -> (* Kill the sequencer *) @@ -3106,9 +3181,7 @@ let test_delayed_inbox_flushing = sc_rollup_node; sequencer; proxy; - observer = _; - boot_sector = _; - kernel = _; + _; } _protocol -> (* Kill the sequencer *) @@ -3246,7 +3319,7 @@ let test_timestamp_from_the_future = ~tags:["evm"; "sequencer"; "block"; "timestamp"] ~title:"Timestamp from the future are refused" ~use_dal:ci_enabled_dal_registration - @@ fun {sequencer; proxy; sc_rollup_node; client; _} _protocol -> + @@ fun {sequencer; proxy; sc_rollup_node; client; enable_dal; _} _protocol -> (* In this test the time between blocks is 1 second. *) (* Producing a block 4:50 minutes after the L1 timestamp will be accepted. We @@ -3268,8 +3341,11 @@ let test_timestamp_from_the_future = Tezos_base.Time.Protocol.(add current_l1_timestamp 330L |> to_notation) in let*@ (_ : int) = produce_block ~timestamp:refused_timestamp sequencer in + (* We wait more in case of DAL because 5 blocks are not enough to + send the blueprint through the DAL. 
*) + let number_of_blocks_to_wait = if enable_dal then 20 else 5 in let* _ = - repeat 5 (fun () -> + repeat number_of_blocks_to_wait (fun () -> let* _l1_lvl = next_rollup_node_level ~sc_rollup_node ~client in unit) in @@ -4826,6 +4902,7 @@ let () = test_remove_sequencer protocols ; test_persistent_state protocols ; test_publish_blueprints protocols ; + test_publish_blueprints_on_dal protocols ; test_sequencer_too_ahead protocols ; test_resilient_to_rollup_node_disconnect protocols ; test_can_fetch_smart_rollup_address protocols ;