diff --git a/.gitignore b/.gitignore index c888daac6e421b86e234c4919e9bcb884cca6d3e..ccf3e2779e7aa8707627047876d69826dcf802a7 100644 --- a/.gitignore +++ b/.gitignore @@ -96,6 +96,7 @@ target evm_kernel.wasm evm_installer.wasm -sequenced_kernel.wasm +sequenced_empty_kernel.wasm +sequenced_concat_kernel.wasm smart-rollup-installer _evm_installer_preimages/ diff --git a/kernels.mk b/kernels.mk index b87bad1fdb69b2b24205b2c1f9a9a6a305309660..bbbb151749de984c4aff8b63c31418ea763b3c2e 100644 --- a/kernels.mk +++ b/kernels.mk @@ -1,4 +1,4 @@ -KERNELS = evm_kernel.wasm +KERNELS = evm_kernel.wasm sequenced_empty_kernel.wasm sequenced_concat_kernel.wasm SDK_DIR=src/kernel_sdk EVM_DIR=src/kernel_evm SEQUENCER_DIR=src/kernel_sequencer @@ -31,13 +31,18 @@ endif evm_installer_dev.wasm:: @${MAKE} -f kernels.mk EVM_CONFIG=src/kernel_evm/config/dev.yaml evm_installer.wasm -sequenced_kernel.wasm: +sequenced_empty_kernel.wasm: @make -C src/kernel_sequencer build - @cp src/kernel_sequencer/target/wasm32-unknown-unknown/release/examples/sequenced_kernel.wasm $@ + @cp src/kernel_sequencer/target/wasm32-unknown-unknown/release/examples/sequenced_empty_kernel.wasm $@ + @wasm-strip $@ + +sequenced_concat_kernel.wasm: + @make -C src/kernel_sequencer build + @cp src/kernel_sequencer/target/wasm32-unknown-unknown/release/examples/sequenced_concat_kernel.wasm $@ @wasm-strip $@ .PHONY: build -build: ${KERNELS} kernel_sdk sequenced_kernel.wasm +build: ${KERNELS} kernel_sdk .PHONY: build-dev-deps build-dev-deps: build-deps diff --git a/manifest/main.ml b/manifest/main.ml index 8bf90160c657c3d4a70e49a8dbb008712e9292ec..8d969fe89bc00c8b9d8d22213e81654a28d0c943 100644 --- a/manifest/main.ml +++ b/manifest/main.ml @@ -7518,6 +7518,8 @@ let octez_scoru_sequencer = octez_workers |> open_; octez_smart_rollup_node_lib |> open_; octez_smart_rollup_lib |> open_; + octez_client_base |> open_; + octez_layer2_store |> open_; octez_rpc; octez_rpc_http; octez_rpc_http_server; diff --git 
a/opam/octez-smart-rollup-sequencer.opam b/opam/octez-smart-rollup-sequencer.opam index 974b5b44db3d9f1949a5ada3ac9b332df499fb27..7fba9c628f064b2f75719563180bcf6a9c2f5c80 100644 --- a/opam/octez-smart-rollup-sequencer.opam +++ b/opam/octez-smart-rollup-sequencer.opam @@ -17,6 +17,8 @@ depends: [ "tezos-workers" "octez-smart-rollup-node-lib" "octez-smart-rollup" + "tezos-client-base" + "tezos-layer2-store" "tezos-rpc" "tezos-rpc-http" "tezos-rpc-http-server" diff --git a/src/bin_sequencer_node/main_sequencer_node.ml b/src/bin_sequencer_node/main_sequencer_node.ml index a054fe89dd49d34678526be0de6f0ea169012966..27d4cfcd9882e506138c14cf4195ddb040208af1 100644 --- a/src/bin_sequencer_node/main_sequencer_node.ml +++ b/src/bin_sequencer_node/main_sequencer_node.ml @@ -44,9 +44,9 @@ let sc_operator_pkh next = ~desc:"Public key hash, or alias, of a sequencer node operator." ( Tezos_clic.parameter @@ fun cctxt s -> let parse_pkh s = - let from_alias s = Client_keys.Public_key_hash.find cctxt s in + let from_alias s = Client_keys.V0.Public_key_hash.find cctxt s in let from_key s = - match Signature.Public_key_hash.of_b58check_opt s with + match Signature.V0.Public_key_hash.of_b58check_opt s with | None -> failwith "Could not read public key hash for sequencer operator" | Some pkh -> return pkh @@ -58,7 +58,7 @@ let sc_operator_pkh next = match String.split ~limit:1 ':' s with | [_] -> let+ pkh = parse_pkh s in - `Default pkh + `Default (Signature.Of_V0.public_key_hash pkh) | [_purpose; _operator_s] -> failwith "Purposes are not supported for a sequencer operator" | _ -> diff --git a/src/kernel_sdk/CHANGES.md b/src/kernel_sdk/CHANGES.md index dffc7cb9d985d8c6a657e279407647a937ce867a..d12a5b4ae52f6e66c020558da02cb4281a0a3831 100644 --- a/src/kernel_sdk/CHANGES.md +++ b/src/kernel_sdk/CHANGES.md @@ -16,6 +16,11 @@ ### Installer client/kernel - Add option `--display-root-hash` to display the root hash of the kernel that will be installed. 
+## Version 0.3.0 + +### SDK + +- Implements `PublicKeySignatureVerifier` for `PublicKey` ## Version 0.2.0 diff --git a/src/kernel_sdk/encoding/src/public_key.rs b/src/kernel_sdk/encoding/src/public_key.rs index 16e06156a743583f4c7109f552b2034e45ec1a31..3e7d90268fc880fbc0740a6df0fd963bb6c0eeb1 100644 --- a/src/kernel_sdk/encoding/src/public_key.rs +++ b/src/kernel_sdk/encoding/src/public_key.rs @@ -5,7 +5,10 @@ //! Public Key of Layer1. use std::fmt::Display; -use tezos_crypto_rs::hash::{PublicKeyEd25519, PublicKeyP256, PublicKeySecp256k1}; +use tezos_crypto_rs::hash::{ + PublicKeyEd25519, PublicKeyP256, PublicKeySecp256k1, Signature, +}; +use tezos_crypto_rs::{CryptoError, PublicKeySignatureVerifier}; use tezos_data_encoding::enc::BinWriter; use tezos_data_encoding::encoding::HasEncoding; use tezos_data_encoding::nom::NomReader; @@ -80,6 +83,23 @@ impl TryFrom<&str> for PublicKey { } } +impl PublicKeySignatureVerifier for PublicKey { + type Signature = Signature; + type Error = CryptoError; + + fn verify_signature( + &self, + signature: &Self::Signature, + msg: &[u8], + ) -> Result { + match self { + PublicKey::Ed25519(ed25519) => ed25519.verify_signature(signature, msg), + PublicKey::Secp256k1(secp256k1) => secp256k1.verify_signature(signature, msg), + PublicKey::P256(p256) => p256.verify_signature(signature, msg), + } + } +} + #[cfg(test)] mod test { use super::*; @@ -182,4 +202,108 @@ mod test { assert_eq!(2_u8, bin[0]); assert_eq!(public_key, deserde_pk); } + + #[test] + fn tz1_signature_signature_verification_succeeds() { + let tz1 = PublicKey::from_b58check( + "edpkvWR5truf7AMF3PZVCXx7ieQLCW4MpNDzM3VwPfmFWVbBZwswBw", + ) + .expect("public key decoding should work"); + let sig: Signature = Signature::from_base58_check( + "sigdGBG68q2vskMuac4AzyNb1xCJTfuU8MiMbQtmZLUCYydYrtTd5Lessn1EFLTDJzjXoYxRasZxXbx6tHnirbEJtikcMHt3" + ).expect("signature decoding should work"); + let msg = hex::decode( + "bcbb7b77cb0712e4cd02160308cfd53e8dde8a7980c4ff28b62deb12304913c2", + 
) + .expect("payload decoding should work"); + + let result = tz1 + .verify_signature(&sig, &msg) + .expect("signature should be correct"); + assert!(result); + } + + #[test] + fn tz1_signature_signature_verification_fails() { + let tz1 = PublicKey::from_b58check( + "edpkuDMUm7Y53wp4gxeLBXuiAhXZrLn8XB1R83ksvvesH8Lp8bmCfK", + ) + .expect("public key decoding should work"); + let sig = Signature::from_base58_check( + "sigdGBG68q2vskMuac4AzyNb1xCJTfuU8MiMbQtmZLUCYydYrtTd5Lessn1EFLTDJzjXoYxRasZxXbx6tHnirbEJtikcMHt3" + ).expect("signature decoding should work"); + let msg = hex::decode( + "bcbb7b77cb0712e4cd02160308cfd53e8dde8a7980c4ff28b62deb12304913c2", + ) + .expect("payload decoding should work"); + + let result = tz1.verify_signature(&sig, &msg); + assert!(result.is_err()); + } + + #[test] + fn tz2_signature_signature_verification_succeeds() { + let tz2 = PublicKey::from_b58check( + "sppk7cwkTzCPptCSxSTvGNg4uqVcuTbyWooLnJp4yxJNH5DReUGxYvs", + ) + .expect("public key decoding should work"); + let sig = Signature::from_base58_check("sigrJ2jqanLupARzKGvzWgL1Lv6NGUqDovHKQg9MX4PtNtHXgcvG6131MRVzujJEXfvgbuRtfdGbXTFaYJJjuUVLNNZTf5q1").expect("signature decoding should work"); + let msg = hex::decode( + "5538e2cc90c9b053a12e2d2f3a985aff1809eac59501db4d644e4bb381b06b4b", + ) + .expect("payload decoding should work"); + + let result = tz2.verify_signature(&sig, &msg).unwrap(); + assert!(result); + } + + #[test] + fn tz2_signature_signature_verification_fails() { + let tz2 = "sppk7Zik17H7AxECMggqD1FyXUQdrGRFtz9X7aR8W2BhaJoWwSnPEGA"; + let tz2 = PublicKey::from_b58check(tz2).expect("parsing should world"); + let sig = Signature::from_base58_check("sigrJ2jqanLupARzKGvzWgL1Lv6NGUqDovHKQg9MX4PtNtHXgcvG6131MRVzujJEXfvgbuRtfdGbXTFaYJJjuUVLNNZTf5q1").expect("signature decoding should work"); + let msg = hex::decode( + "5538e2cc90c9b053a12e2d2f3a985aff1809eac59501db4d644e4bb381b06b4b", + ) + .expect("payload decoding should work"); + + let result = tz2.verify_signature(&sig, 
&msg).unwrap(); + assert!(!result); + } + + #[test] + fn tz3_signature_signature_verification_succeeds() { + let tz3 = PublicKey::from_b58check( + "p2pk67Cwb5Ke6oSmqeUbJxURXMe3coVnH9tqPiB2xD84CYhHbBKs4oM", + ) + .expect("decoding public key should work"); + let sig = Signature::from_base58_check( + "sigNCaj9CnmD94eZH9C7aPPqBbVCJF72fYmCFAXqEbWfqE633WNFWYQJFnDUFgRUQXR8fQ5tKSfJeTe6UAi75eTzzQf7AEc1" + ).expect("signature decoding should work"); + let msg = hex::decode( + "5538e2cc90c9b053a12e2d2f3a985aff1809eac59501db4d644e4bb381b06b4b", + ) + .expect("payload decoding should work"); + + let result = tz3.verify_signature(&sig, &msg).unwrap(); + assert!(result); + } + + #[test] + fn tz3_signature_signature_verification_fails() { + let tz3 = PublicKey::from_b58check( + "p2pk67VpBjWwoPULwXCpayec6rFxaAKv8VjJ8cVMHmLDCYARu31zx5Z", + ) + .expect("decoding public key should work"); + let sig = Signature::from_base58_check( + "sigNCaj9CnmD94eZH9C7aPPqBbVCJF72fYmCFAXqEbWfqE633WNFWYQJFnDUFgRUQXR8fQ5tKSfJeTe6UAi75eTzzQf7AEc1" + ).expect("signature decoding should work"); + let msg = hex::decode( + "5538e2cc90c9b053a12e2d2f3a985aff1809eac59501db4d644e4bb381b06b4b", + ) + .expect("payload decoding should work"); + + let result = tz3.verify_signature(&sig, &msg).unwrap(); + assert!(!result); + } } diff --git a/src/kernel_sequencer/Cargo.lock b/src/kernel_sequencer/Cargo.lock index ec9b9d95b25b33bd6587ae5b7f5187a9d9049a50..001360d5afabf5a997c0adbdb1c3640aa161d780 100644 --- a/src/kernel_sequencer/Cargo.lock +++ b/src/kernel_sequencer/Cargo.lock @@ -346,8 +346,10 @@ checksum = "453ad9f582a441959e5f0d088b02ce04cfe8d51a8eaf077f12ac6d3e94164ca6" name = "kernel-sequencer" version = "0.1.0" dependencies = [ + "hex", "nom", "tezos-smart-rollup-core", + "tezos-smart-rollup-debug", "tezos-smart-rollup-encoding", "tezos-smart-rollup-host", "tezos-smart-rollup-mock", @@ -766,6 +768,14 @@ checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" name = 
"tezos-smart-rollup-core" version = "0.2.0" +[[package]] +name = "tezos-smart-rollup-debug" +version = "0.2.0" +dependencies = [ + "tezos-smart-rollup-core", + "tezos-smart-rollup-host", +] + [[package]] name = "tezos-smart-rollup-encoding" version = "0.2.0" diff --git a/src/kernel_sequencer/Cargo.toml b/src/kernel_sequencer/Cargo.toml index 6a9a8784a239f0ca32fb1d8d436aaa98c46607b5..4ab2b63b298beaef563b49ea62dc4cd358b581c1 100644 --- a/src/kernel_sequencer/Cargo.toml +++ b/src/kernel_sequencer/Cargo.toml @@ -31,7 +31,7 @@ version = "0.5" [dependencies.tezos_data_encoding_derive] version = "0.5" -[dependencies.nom] +[dependencies.nom] version = "7.1" [dependencies.tezos_crypto_rs] @@ -42,13 +42,25 @@ default-features = false path = "../kernel_sdk/encoding" version = "0.2.0" +[dependencies.tezos-smart-rollup-debug] +path = "../kernel_sdk/debug" +version = "0.2.0" + +[dependencies.hex] +version = "0.4" + [dev-dependencies.tezos_crypto_rs] version = "0.5.0" default-features = false + [features] default = [] [[example]] -name = "sequenced_kernel" +name = "sequenced_empty_kernel" +crate-type = ["cdylib", "rlib"] + +[[example]] +name = "sequenced_concat_kernel" crate-type = ["cdylib", "rlib"] diff --git a/src/kernel_sequencer/Makefile b/src/kernel_sequencer/Makefile index fddf296387134f8180506da944e921be5d0800ad..cdc3a788d9c2d17dd7791b1ad23857e4ddf4898a 100644 --- a/src/kernel_sequencer/Makefile +++ b/src/kernel_sequencer/Makefile @@ -8,7 +8,8 @@ all: build check test .PHONY: build build: - @cargo build --release --target wasm32-unknown-unknown --example sequenced_kernel + @cargo build --release --target wasm32-unknown-unknown --example sequenced_empty_kernel + @cargo build --release --target wasm32-unknown-unknown --example sequenced_concat_kernel .PHONY: check check: diff --git a/src/kernel_sequencer/examples/sequenced_concat_kernel.rs b/src/kernel_sequencer/examples/sequenced_concat_kernel.rs new file mode 100644 index 
0000000000000000000000000000000000000000..ca5373d4db18d1747077262b0e31fcaf8e4be33b --- /dev/null +++ b/src/kernel_sequencer/examples/sequenced_concat_kernel.rs @@ -0,0 +1,88 @@ +// SPDX-FileCopyrightText: 2023 TriliTech +// +// SPDX-License-Identifier: MIT + +use kernel_sequencer::{sequencer_kernel_entry, Framed}; +use tezos_data_encoding::nom::{NomReader, NomResult}; +use tezos_smart_rollup_debug::debug_msg; +use tezos_smart_rollup_encoding::{ + inbox::{InboxMessage, InternalInboxMessage}, + michelson::MichelsonUnit, +}; +use tezos_smart_rollup_host::{ + path::RefPath, + runtime::{Runtime, RuntimeError}, +}; + +const CONCAT_PATH: RefPath = RefPath::assert_from("/concat".as_bytes()); + +struct RawBytes(Vec); + +impl NomReader for RawBytes { + fn nom_read(input: &[u8]) -> NomResult { + Ok((&[], RawBytes(Vec::from(input)))) + } +} + +fn concat_msg(host: &mut impl Runtime, bytes: &[u8]) -> Result<(), RuntimeError> { + debug_msg!( + host, + "store_write WRITING TO {}, VALUE: {:?}", + &CONCAT_PATH, + String::from_utf8(Vec::from(bytes)) + ); + let offset = match host.store_read_all(&CONCAT_PATH) { + Err(_) => 0, + Ok(result) => result.len(), + }; + + host.store_write(&CONCAT_PATH, bytes, offset)?; + host.store_write(&CONCAT_PATH, "; ".as_bytes(), bytes.len() + offset) +} + +pub fn kernel_loop(host: &mut Host) { + while let Ok(Some(message)) = host.read_input() { + debug_msg!( + host, + "Processing MessageData {} at level {} with payload {:?}", + message.id, + message.level, + message.as_ref() + ); + + let result = match InboxMessage::::parse(message.as_ref()) { + Err(_e) => Ok(debug_msg!(host, "Parsing error")), + Ok((_, InboxMessage::External(s))) => match Framed::::nom_read(s) { + Err(_err) => Ok(debug_msg!(host, "Couldn't parse Framing headers")), + Ok((_, raw_bytes)) => concat_msg(host, &raw_bytes.payload.0), + }, + Ok((_, InboxMessage::Internal(InternalInboxMessage::StartOfLevel))) => { + concat_msg(host, format!("[SoL {}", message.level).as_bytes()) + } + Ok((_, 
InboxMessage::Internal(InternalInboxMessage::InfoPerLevel(_)))) => { + concat_msg(host, format!("IpL {}", message.level).as_bytes()) + } + Ok((_, InboxMessage::Internal(InternalInboxMessage::EndOfLevel))) => { + concat_msg(host, format!("EoL {}]", message.level).as_bytes()) + } + Ok((_, _)) => Ok(debug_msg!(host, "Ignore Transfer message")), + }; + + if result.is_err() { + debug_msg!( + host, + "Failed to process message with error {}", + result.unwrap_err() + ); + } + + if let Err(e) = host.mark_for_reboot() { + debug_msg!(host, "Could not mark host for reboot: {}", e); + } + } +} + +sequencer_kernel_entry!( + kernel_loop, + kernel_sequencer::FilterBehavior::OnlyThisRollup +); diff --git a/src/kernel_sequencer/examples/sequenced_empty_kernel.rs b/src/kernel_sequencer/examples/sequenced_empty_kernel.rs new file mode 100644 index 0000000000000000000000000000000000000000..4bb5c05a7d011e79267b37bda60c2ab896596957 --- /dev/null +++ b/src/kernel_sequencer/examples/sequenced_empty_kernel.rs @@ -0,0 +1,27 @@ +// SPDX-FileCopyrightText: 2023 TriliTech +// +// SPDX-License-Identifier: MIT + +use kernel_sequencer::sequencer_kernel_entry; +use tezos_smart_rollup_debug::debug_msg; +use tezos_smart_rollup_host::runtime::Runtime; + +pub fn kernel_loop(host: &mut Host) { + while let Ok(Some(message)) = host.read_input() { + debug_msg!( + host, + "Processing MessageData {} at level {}", + message.id, + message.level + ); + + if let Err(e) = host.mark_for_reboot() { + debug_msg!(host, "Could not mark host for reboot: {}", e); + } + } +} + +sequencer_kernel_entry!( + kernel_loop, + kernel_sequencer::FilterBehavior::OnlyThisRollup +); diff --git a/src/kernel_sequencer/examples/sequenced_kernel.rs b/src/kernel_sequencer/examples/sequenced_kernel.rs deleted file mode 100644 index d7acb38371cb660fb612ef13fe83b1020ff38a72..0000000000000000000000000000000000000000 --- a/src/kernel_sequencer/examples/sequenced_kernel.rs +++ /dev/null @@ -1,13 +0,0 @@ -// SPDX-FileCopyrightText: 2023 
TriliTech -// -// SPDX-License-Identifier: MIT - -use kernel_sequencer::sequencer_kernel_entry; -use tezos_smart_rollup_host::runtime::Runtime; - -pub fn kernel_loop(_host: &mut Host) {} - -sequencer_kernel_entry!( - kernel_loop, - kernel_sequencer::FilterBehavior::OnlyThisRollup -); diff --git a/src/kernel_sequencer/src/delayed_inbox.rs b/src/kernel_sequencer/src/delayed_inbox.rs index c545cda2befdaeb026c88fc7da5ef056718df0bc..efcf8c4d457fdbc9e1af233f0186628f105459fb 100644 --- a/src/kernel_sequencer/src/delayed_inbox.rs +++ b/src/kernel_sequencer/src/delayed_inbox.rs @@ -3,7 +3,10 @@ // // SPDX-License-Identifier: MIT +use tezos_crypto_rs::PublicKeySignatureVerifier; +use tezos_data_encoding::enc::BinWriter; use tezos_data_encoding::nom::NomReader; +use tezos_smart_rollup_debug::debug_msg; use tezos_smart_rollup_encoding::smart_rollup::SmartRollupAddress; use tezos_smart_rollup_host::{ input::Message, @@ -13,9 +16,28 @@ use tezos_smart_rollup_host::{ use crate::{ message::{Framed, KernelMessage, Sequence, SequencerMsg, SetSequencer}, + queue::Queue, routing::FilterBehavior, + state::{update_state, State}, + storage::read_state, }; +/// Message added to the delayed inbox +#[derive(BinWriter, NomReader)] +pub struct UserMessage { + pub(crate) timeout_level: u32, + pub(crate) payload: Vec, +} + +/// Message saved in the pending inbox +#[derive(Debug, BinWriter, NomReader)] +pub struct PendingUserMessage { + level: u32, + id: u32, + #[encoding(dynamic, list)] + payload: Vec, +} + /// Return a message from the inbox /// /// This function drives the delayed inbox: @@ -25,6 +47,10 @@ use crate::{ pub fn read_input( host: &mut Host, filter_behavior: FilterBehavior, + timeout_window: u32, + delayed_inbox_queue: &mut Queue, + pending_inbox_queue: &mut Queue, + pending_inbox_index: &mut u32, ) -> Result, RuntimeError> { let RollupMetadata { raw_rollup_address, .. 
@@ -32,9 +58,16 @@ pub fn read_input( loop { let msg = host.read_input()?; match msg { - None => return Ok(None), // No more messages to be processed Some(msg) => { let payload = msg.as_ref(); + // Verify the state of the delayed inbox on SoL + if let [0x00, 0x00, ..] = payload { + update_state(host, delayed_inbox_queue, msg.level)?; + } + + // The state can change at each iteration + let state = read_state(host)?; + let message = KernelMessage::nom_read(payload); match message { Err(_) => {} @@ -42,7 +75,19 @@ pub fn read_input( KernelMessage::Sequencer(Framed { destination, payload: SequencerMsg::Sequence(sequence), - }) => handle_sequence_message(sequence, destination, &raw_rollup_address), + }) => { + let _ = handle_sequence_message( + host, + sequence, + destination, + delayed_inbox_queue, + pending_inbox_queue, + pending_inbox_index, + &raw_rollup_address, + state, + msg.level, + ); + } KernelMessage::Sequencer(Framed { destination, payload: SequencerMsg::SetSequencer(set_sequence), @@ -52,24 +97,117 @@ pub fn read_input( &raw_rollup_address, ), KernelMessage::DelayedMessage(user_message) => { - handle_message(user_message, filter_behavior, &raw_rollup_address) + let _ = handle_message( + host, + delayed_inbox_queue, + timeout_window, + user_message, + msg.level, + filter_behavior, + &raw_rollup_address, + ); } }, } } + None => return handle_pending_inbox(host, pending_inbox_queue), } } } /// Handle Sequence message -fn handle_sequence_message( - _sequence: Sequence, +#[allow(clippy::too_many_arguments)] +fn handle_sequence_message( + host: &mut H, + sequence: Sequence, destination: SmartRollupAddress, + delayed_inbox_queue: &mut Queue, + pending_inbox_queue: &mut Queue, + pending_inbox_index: &mut u32, rollup_address: &[u8; RAW_ROLLUP_ADDRESS_SIZE], -) { - if destination.hash().as_ref() == rollup_address { - // process the sequence + state: State, + level: u32, +) -> Result<(), RuntimeError> { + let State::Sequenced(sequencer_address) = state else {return 
Ok(())}; + + // Verify if the destination is for this rollup + let true = destination.hash().as_ref() == rollup_address else {return Ok(())}; + + // Get the hash of the message + let Ok(hash) = sequence.hash(destination) else {return Ok(());}; + + // Verify if the signature is correct + // Verifying the signature also verifies that the sequence comes from the right sequencer + let Ok(true) = sequencer_address.verify_signature(sequence.signature(), hash.as_ref()) else {return Ok(());}; + + // Add the messages to the pending inbox + // The pending inbox will be emptied at the end of the shared-inbox (when None is returned by read_input) + + debug_msg!( + host, + "Received a sequence message {:?} targeting our rollup", + &sequence + ); + + let Sequence { + delayed_messages_prefix, + delayed_messages_suffix, + messages, + .. + } = sequence; + + // First pop elements from the delayed inbox indicated by the prefix + for _ in 0..delayed_messages_prefix { + // pop the head of the delayed inbox + let delayed_user_msg: Option = delayed_inbox_queue.pop(host)?; + // break the loop if the delayed inbox is empty + let Some(delayed_user_msg) = delayed_user_msg else {break;}; + // add the payload to the pending inbox + let UserMessage { payload, .. 
} = delayed_user_msg; + pending_inbox_queue.add( + host, + &PendingUserMessage { + level, + id: *pending_inbox_index, + payload, + }, + )?; + *pending_inbox_index += 1; } + + // Then add messages to the pending_inbox_queue + for bytes in messages { + pending_inbox_queue.add( + host, + &PendingUserMessage { + level, + id: *pending_inbox_index, + payload: bytes.inner, + }, + )?; + *pending_inbox_index += 1; + } + + // Finally, pop elements from the delayed inbox indicated by the suffix + for _ in 0..delayed_messages_suffix { + // pop the head of the delayed inbox + let delayed_user_msg: Option = delayed_inbox_queue.pop(host)?; + // break the loop if the delayed inbox is empty + let Some(delayed_user_msg) = delayed_user_msg else {break;}; + // add the payload to the pending inbox + let UserMessage { payload, .. } = delayed_user_msg; + pending_inbox_queue.add( + host, + &PendingUserMessage { + level, + id: *pending_inbox_index, + payload, + }, + )?; + *pending_inbox_index += 1; + } + + Ok(()) } fn handle_set_sequencer_message( @@ -83,13 +221,185 @@ fn handle_set_sequencer_message( } /// Handle messages -fn handle_message( +fn handle_message( + host: &mut H, + queue: &mut Queue, + timeout_window: u32, user_message: Vec, + level: u32, filter_behavior: FilterBehavior, rollup_address: &[u8; RAW_ROLLUP_ADDRESS_SIZE], -) { +) -> Result<(), RuntimeError> { // Check if the message should be included in the delayed inbox if filter_behavior.predicate(user_message.as_ref(), rollup_address) { + debug_msg!( + host, + "Received user message {:?} targeting our rollup, hence, will be added to the delayed inbox", + user_message + ); + // add the message to the delayed inbox + let user_message = UserMessage { + timeout_level: level + timeout_window, + payload: user_message, + }; + + queue.add(host, &user_message)?; + } + + Ok(()) +} + +/// Pop the next message from the pending inbox and return it, if any +fn handle_pending_inbox( + host: &mut H, + pending_inbox_queue: &mut Queue, +) -> Result, 
RuntimeError> { + let pending_message = pending_inbox_queue.pop(host)?; + let Some(PendingUserMessage {id, level, payload}) = pending_message else {return Ok(None)}; + + let msg = Message::new(level, id, payload); + Ok(Some(msg)) +} + +#[cfg(test)] +mod tests { + use tezos_crypto_rs::hash::SecretKeyEd25519; + use tezos_crypto_rs::{blake2b, hash::SeedEd25519}; + use tezos_data_encoding::enc::BinWriter; + use tezos_data_encoding::{enc, nom::NomReader}; + use tezos_smart_rollup_encoding::public_key::PublicKey; + use tezos_smart_rollup_encoding::smart_rollup::SmartRollupAddress; + use tezos_smart_rollup_host::metadata::RollupMetadata; + use tezos_smart_rollup_host::runtime::Runtime; + use tezos_smart_rollup_mock::MockHost; + + use crate::message::SequencerMsg; + use crate::state::State; + use crate::storage::write_state; + use crate::{ + message::{Bytes, Framed, Sequence}, + sequencer_runtime::SequencerRuntime, + }; + + #[derive(BinWriter)] + struct Msg { + inner: u32, + } + + impl Msg { + fn new(inner: u32) -> Self { + Self { inner } + } + } + + fn prepare() -> (MockHost, SecretKeyEd25519) { + // create a mock host + let mut mock_host = MockHost::default(); + // generate a secret and public key + let seed = SeedEd25519::from_base58_check( + "edsk31vznjHSSpGExDMHYASz45VZqXN4DPxvsa4hAyY8dHM28cZzp6", + ) + .unwrap(); + let (pk, sk) = seed.keypair().unwrap(); + let pk = PublicKey::Ed25519(pk); + // set the mode of the kernel to Sequenced + write_state(&mut mock_host, State::Sequenced(pk)).unwrap(); + (mock_host, sk) + } + + fn make_sequence( + sk: SecretKeyEd25519, + rollup_address: [u8; 20], + delayed_messages_prefix: u32, + delayed_messages_suffix: u32, + ) -> Framed { + let nonce = 0; + + let mut bytes = Vec::default(); + // add the rollup address + bytes.append(&mut rollup_address.to_vec()); + enc::u32(&nonce, &mut bytes).unwrap(); + enc::u32(&delayed_messages_suffix, &mut bytes).unwrap(); + enc::u32(&delayed_messages_prefix, &mut bytes).unwrap(); + 
enc::list(Bytes::bin_write)(Vec::default(), &mut bytes) + .map_err(|_| ()) + .unwrap(); + let hash = blake2b::digest_256(&bytes).unwrap(); + + let signature = sk.sign(hash).unwrap(); + + Framed { + destination: SmartRollupAddress::nom_read(&rollup_address).unwrap().1, + payload: SequencerMsg::Sequence(Sequence { + nonce, + delayed_messages_prefix, + delayed_messages_suffix, + messages: Vec::default(), + signature, + }), + } + } + + /// check if the given message is equal to the given byte representation + fn assert_external_eq(message: M, payload: &[u8]) { + let mut bytes = Vec::default(); + message.bin_write(&mut bytes).unwrap(); + + match payload { + [0x01, remaining @ ..] => assert_eq!(bytes, remaining), + _ => panic!(), + } + } + + #[test] + fn test_add_message() { + let (mut mock_host, _) = prepare(); + + mock_host.add_external(Msg::new(0x01)); + + let mut runtime = SequencerRuntime::new(mock_host, crate::FilterBehavior::AllowAll, 1); + let msg = runtime.read_input().unwrap(); + + assert!(msg.is_none()) + } + + #[test] + fn test_add_sequence() { + let (mut mock_host, sk) = prepare(); + let RollupMetadata { + raw_rollup_address, .. + } = mock_host.reveal_metadata(); + + mock_host.add_external(Msg::new(0x01)); + mock_host.add_external(Msg::new(0x02)); + mock_host.add_external(Msg::new(0x03)); + mock_host.add_external(make_sequence(sk, raw_rollup_address, 2, 0)); + + let mut runtime = SequencerRuntime::new(mock_host, crate::FilterBehavior::AllowAll, 1); + let msg1 = runtime.read_input().unwrap().unwrap(); + let msg2 = runtime.read_input().unwrap().unwrap(); + + assert_external_eq(Msg::new(0x01), msg1.as_ref()); + assert_external_eq(Msg::new(0x02), msg2.as_ref()); + } + + #[test] + fn test_sequence_between_messages() { + let (mut mock_host, sk) = prepare(); + let RollupMetadata { + raw_rollup_address, .. 
+ } = mock_host.reveal_metadata(); + + mock_host.add_external(Msg::new(0x01)); + mock_host.add_external(make_sequence(sk, raw_rollup_address, 2, 0)); + mock_host.add_external(Msg::new(0x02)); + + let mut runtime = SequencerRuntime::new(mock_host, crate::FilterBehavior::AllowAll, 1); + let msg1 = runtime.read_input().unwrap().unwrap(); + let msg2 = runtime.read_input().unwrap(); + + assert_external_eq(Msg::new(0x01), msg1.as_ref()); + assert!(msg2.is_none()); } } diff --git a/src/kernel_sequencer/src/lib.rs b/src/kernel_sequencer/src/lib.rs index 078dcca82610255dce778e8fca0ec31e34d0d58a..853fec1c538ce7542bae25e2439bf3d0847016ba 100644 --- a/src/kernel_sequencer/src/lib.rs +++ b/src/kernel_sequencer/src/lib.rs @@ -4,9 +4,12 @@ mod delayed_inbox; mod message; +mod queue; pub mod routing; mod sequencer_macro; pub mod sequencer_runtime; +mod state; mod storage; +pub use message::Framed; pub use routing::FilterBehavior; diff --git a/src/kernel_sequencer/src/message.rs b/src/kernel_sequencer/src/message.rs index 8ecc5d5e80e18a772c65ab3265f070721afc05d0..82ab138f415524db887b390f2587563382ab6e36 100644 --- a/src/kernel_sequencer/src/message.rs +++ b/src/kernel_sequencer/src/message.rs @@ -8,7 +8,7 @@ use nom::{ combinator::{all_consuming, map}, sequence::preceded, }; -use tezos_crypto_rs::hash::Signature; +use tezos_crypto_rs::{blake2b, hash::Signature}; use tezos_data_encoding::{ enc::{self, BinResult, BinWriter}, nom::{NomReader, NomResult}, @@ -32,7 +32,7 @@ pub struct Framed

{ #[derive(NomReader, BinWriter, Clone, Debug, PartialEq, Eq)] pub struct Bytes { #[encoding(dynamic, list)] - inner: Vec, + pub inner: Vec, } /// Sequence of messages sent by the sequencer @@ -40,7 +40,7 @@ pub struct Bytes { /// The sequence contains the number of messages /// that should be processed from the delayed inbox /// and the messages from the sequencer -/// +/// /// The number of messages that should be processed /// from the delayed inbox is divided into two parts /// delayed_messages_prefix and delayed_messages_suffix @@ -53,12 +53,43 @@ pub struct Bytes { /// will be processed at the end #[derive(NomReader, BinWriter, Clone, Debug, PartialEq, Eq)] pub struct Sequence { - nonce: u32, - delayed_messages_prefix: u32, - delayed_messages_suffix: u32, + pub nonce: u32, + pub delayed_messages_prefix: u32, + pub delayed_messages_suffix: u32, #[encoding(dynamic, list)] - messages: Vec, - signature: Signature, + pub messages: Vec, + pub signature: Signature, +} + +impl Sequence { + // Returns the signature of the sequence + pub fn signature(&self) -> &Signature { + &self.signature + } +} + +impl Sequence { + /// Returns the hash of the framed sequence + pub fn hash(&self, rollup_address: SmartRollupAddress) -> Result, ()> { + let mut bytes = Vec::default(); + // from Framed + enc::u8(&0, &mut bytes).map_err(|_| ())?; + // address of the rollup + rollup_address.bin_write(&mut bytes).map_err(|_| ())?; + + // Sequence tag + enc::u8(&0, &mut bytes).map_err(|_| ())?; + // nonce of the message + enc::u32(&self.nonce, &mut bytes).map_err(|_| ())?; + // number of delayed messages indicated by the prefix + enc::u32(&self.delayed_messages_prefix, &mut bytes).map_err(|_| ())?; + // number of delayed messages indicated by the suffix + enc::u32(&self.delayed_messages_suffix, &mut bytes).map_err(|_| ())?; + // payload messages + enc::dynamic(enc::list(Bytes::bin_write))(&self.messages, &mut bytes).map_err(|_| ())?; + + blake2b::digest_256(&bytes).map_err(|_| ()) + } } /// 
Message to set the appropriate sequencer diff --git a/src/kernel_sequencer/src/queue.rs b/src/kernel_sequencer/src/queue.rs new file mode 100644 index 0000000000000000000000000000000000000000..5bfa07a701d380c911996d31ad4728ed56e871f5 --- /dev/null +++ b/src/kernel_sequencer/src/queue.rs @@ -0,0 +1,421 @@ +// SPDX-FileCopyrightText: 2023 Marigold +// +// SPDX-License-Identifier: MIT + +use tezos_data_encoding::{enc::BinWriter, nom::NomReader}; +use tezos_smart_rollup_host::{ + path::{concat, OwnedPath, Path, RefPath}, + runtime::{Runtime, RuntimeError}, + Error, +}; + +use crate::storage::sequencer_prefix; + +/// A simple FIFO (First-In-First-Out) queue implementation. +/// +/// The queue can support any type of element +/// The element has to implement NomReader and BinWriter +pub struct Queue { + prefix: OwnedPath, + pointer: Option, +} + +/// Pointer that indicates where is the head and tail of the queue. +/// +/// The head and the tail are both inclusive. +#[derive(NomReader, BinWriter, Copy, Clone)] +struct Pointer { + head: u32, + tail: u32, +} + +/// Compute the path where the pointer to the head and tail is stored +fn pointer_path(prefix: &impl Path) -> Result { + let pointer_path = RefPath::assert_from(b"/pointer"); + concat(prefix, &pointer_path).map_err(|_| RuntimeError::HostErr(Error::StoreInvalidKey)) +} + +/// Load the pointer from the durable storage +fn load_pointer(prefix: &OwnedPath, host: &H) -> Result, RuntimeError> { + let pointer_path = pointer_path(prefix)?; + match host.store_has(&pointer_path)? { + None => Ok(None), + Some(_) => { + // 8 bytes because the pointer is 2 u32 + let data = host.store_read(&pointer_path, 0, 8)?; + let (_, pointer) = Pointer::nom_read(&data).map_err(|_| RuntimeError::DecodingError)?; + Ok(Some(pointer)) + } + } +} + +impl Queue { + /// Creates a queue. 
+ /// + /// If there is an existing queue at the given path, + /// then the queue is loaded from the durable storage + /// Otherwise a new fresh queue is created + pub fn new(host: &Host, path: OwnedPath) -> Result { + // Prefix the path of the queue by "__sequencer/" + let path = sequencer_prefix(&path)?; + let pointer = load_pointer(&path, host)?; + Ok(Queue { + prefix: path, + pointer, + }) + } + + /// Compute the path of one element of the queue + fn element_path(&self, id: u32) -> Result { + let path = OwnedPath::try_from(format!("/elements/{}", id)) + .map_err(|_| RuntimeError::HostErr(Error::StoreInvalidKey))?; + + concat(&self.prefix, &path).map_err(|_| RuntimeError::HostErr(Error::StoreInvalidKey)) + } + + /// Save the pointer of the queue in the durable storage + fn save_pointer(&self, host: &mut H) -> Result<(), RuntimeError> { + let pointer_path = pointer_path(&self.prefix)?; + + match &self.pointer { + None => { + // If the pointer is not present then the path has to be removed + host.store_delete(&pointer_path)?; + Ok(()) + } + Some(pointer) => { + // Serialize the pointer + let mut output = Vec::new(); + pointer + .bin_write(&mut output) + .map_err(|_| RuntimeError::HostErr(Error::GenericInvalidAccess))?; + // Save the pointer to the durable storage + host.store_write(&pointer_path, &output, 0)?; + Ok(()) + } + } + } + + /// Add an element to the back of the queue + pub fn add( + &mut self, + host: &mut Host, + element: &E, + ) -> Result<(), RuntimeError> + where + E: BinWriter, + { + let Queue { pointer, .. 
} = self; + + // Compute the next value of head and tail + // the element is added to the tail, so the tail pointer has to be incremented + // the head pointer does not change + let next_pointer = pointer + .map(|Pointer { head, tail }| Pointer { + head, + tail: tail + 1, + }) + .unwrap_or(Pointer { head: 0, tail: 0 }); + + // Compute the path of the element + let id = next_pointer.tail; + let path = self.element_path(id)?; + + // Get the bytes of the element + let mut bytes = Vec::new(); + element + .bin_write(&mut bytes) + .map_err(|_| RuntimeError::DecodingError)?; + + // write the bytes to the store + host.store_write(&path, bytes.as_ref(), 0)?; + + // update the queue pointer + self.pointer = Some(next_pointer); + + // save the pointer to the durable storage + self.save_pointer(host)?; + + Ok(()) + } + + /// Read the first element of the queue (its head) + /// + /// It does not remove it + pub fn head(&self, host: &Host) -> Result, RuntimeError> + where + E: NomReader, + { + match self.pointer { + None => Ok(None), + Some(Pointer { head, .. 
}) => { + // get the path of the stored element + let path = self.element_path(head)?; + // get the size of this element + let size = host.store_value_size(&path)?; + // get the bytes from the durable storage + let bytes = host.store_read(&path, 0, size)?; + // deserialize the bytes into the corresponding type + let (_, data) = E::nom_read(&bytes).map_err(|_| RuntimeError::DecodingError)?; + // returns the parsed type + Ok(Some(data)) + } + } + } + + /// Remove the head of the queue and returns it + /// + /// If the queue is empty, it returns None + pub fn pop(&mut self, host: &mut Host) -> Result, RuntimeError> + where + E: NomReader, + { + match self.pointer { + None => Ok(None), + Some(Pointer { head, tail }) => { + // get the path of the stored element + let path = self.element_path(head)?; + // get the size of this element + let size = host.store_value_size(&path)?; + // get the bytes from the durable storage + let bytes = host.store_read(&path, 0, size)?; + // deserialize the bytes into the corresponding type + let (_, data) = E::nom_read(&bytes).map_err(|_| RuntimeError::DecodingError)?; + // update the pointer + let next_pointer = Pointer { + head: head + 1, + tail, + }; + // check if the pointer is empty or not + let next_pointer = if next_pointer.head <= next_pointer.tail { + Some(next_pointer) + } else { + None + }; + // delete the element + host.store_delete(&path)?; + // save the pointer + self.pointer = next_pointer; + self.save_pointer(host)?; + + // returns the parsed type + Ok(Some(data)) + } + } + } +} + +#[cfg(test)] +mod tests { + use crate::queue::Pointer; + + use super::Queue; + use tezos_data_encoding_derive::{BinWriter, NomReader}; + use tezos_smart_rollup_host::{ + path::OwnedPath, + runtime::{Runtime, RuntimeError}, + }; + use tezos_smart_rollup_mock::MockHost; + + /// Element of the queue for test purpose + #[derive(BinWriter, NomReader, Debug, PartialEq, Eq)] + struct Element { + inner: u32, + } + + impl Element { + fn new(inner: u32) -> 
Self { + Self { inner } + } + } + + /// Generate the queue path + fn queue_path() -> OwnedPath { + OwnedPath::try_from("/queue".to_string()).unwrap() + } + + /// Returns the number of elements in the queue. + pub fn queue_size(queue: &Queue) -> u32 { + match &queue.pointer { + None => 0, + Some(Pointer { head, tail }) => tail - head + 1, + } + } + + /// Read the last element of the queue + /// + /// It does not remove it + pub fn queue_tail(queue: &Queue, host: &H) -> Result, RuntimeError> + where + E: tezos_data_encoding::nom::NomReader, + { + match queue.pointer { + None => Ok(None), + Some(Pointer { tail, .. }) => { + let path = queue.element_path(tail)?; + let size = host.store_value_size(&path)?; + let data = host.store_read(&path, 0, size)?; + let (_, elt) = E::nom_read(&data).map_err(|_| RuntimeError::DecodingError)?; + Ok(Some(elt)) + } + } + } + + #[test] + fn test_init_empty() { + let mock_host = MockHost::default(); + let queue = Queue::new(&mock_host, queue_path()).unwrap(); + + assert_eq!(0, queue_size(&queue)) + } + + #[test] + fn test_add_element() { + let mut mock_host = MockHost::default(); + let mut queue = Queue::new(&mock_host, queue_path()).unwrap(); + + let data = Element::new(1); + + queue.add(&mut mock_host, &data).unwrap(); + + let head: Element = queue.head(&mock_host).unwrap().unwrap(); + let tail: Element = queue_tail(&queue, &mock_host).unwrap().unwrap(); + + assert_eq!(queue_size(&queue), 1); + assert_eq!(head, data); + assert_eq!(tail, data); + } + + #[test] + fn test_queue_correctly_saved() { + let mut mock_host = MockHost::default(); + let mut first_queue = Queue::new(&mock_host, queue_path()).unwrap(); + let data = Element::new(1); + first_queue.add(&mut mock_host, &data).unwrap(); + + // Initiate a new queue of the same path + let second_queue = Queue::new(&mock_host, queue_path()).unwrap(); + + // get head and tail from the first queue + let e1_fst: Option = first_queue.head(&mock_host).unwrap(); + let e2_fst: Option = 
queue_tail(&first_queue, &mock_host).unwrap(); + + // get head and tail from the second queue + let e1_snd: Option = second_queue.head(&mock_host).unwrap(); + let e2_snd: Option = queue_tail(&second_queue, &mock_host).unwrap(); + + assert_eq!(e1_fst, e1_snd); + assert_eq!(e2_fst, e2_snd); + } + + #[test] + fn test_add_two_element() { + let mut mock_host = MockHost::default(); + let mut queue = Queue::new(&mock_host, queue_path()).unwrap(); + + let e1 = Element::new(1); + let e2 = Element::new(2); + + queue.add(&mut mock_host, &e1).unwrap(); + queue.add(&mut mock_host, &e2).unwrap(); + + let head: Element = queue.head(&mock_host).unwrap().unwrap(); + let tail: Element = queue_tail(&queue, &mock_host).unwrap().unwrap(); + + assert_eq!(queue_size(&queue), 2); + assert_eq!(head, e1); + assert_eq!(tail, e2); + } + + #[test] + fn test_add_three_element() { + let mut mock_host = MockHost::default(); + let mut queue = Queue::new(&mock_host, queue_path()).unwrap(); + + let e1 = Element::new(1); + let e2 = Element::new(2); + let e3 = Element::new(3); + + queue.add(&mut mock_host, &e1).unwrap(); + queue.add(&mut mock_host, &e2).unwrap(); + queue.add(&mut mock_host, &e3).unwrap(); + + let head: Element = queue.head(&mock_host).unwrap().unwrap(); + let tail: Element = queue_tail(&queue, &mock_host).unwrap().unwrap(); + + assert_eq!(queue_size(&queue), 3); + assert_eq!(head, e1); + assert_eq!(tail, e3); + } + + #[test] + fn test_pop_empty() { + let mut mock_host = MockHost::default(); + let mut queue = Queue::new(&mock_host, queue_path()).unwrap(); + + let e: Option = queue.pop(&mut mock_host).unwrap(); + + assert!(e.is_none()) + } + + #[test] + fn test_pop_one_element() { + let mut mock_host = MockHost::default(); + let mut queue = Queue::new(&mock_host, queue_path()).unwrap(); + + let e1 = Element::new(1); + let e2 = Element::new(2); + + queue.add(&mut mock_host, &e1).unwrap(); + queue.add(&mut mock_host, &e2).unwrap(); + + let fst: Element = queue.pop(&mut 
mock_host).unwrap().unwrap(); + + assert_eq!(queue_size(&queue), 1); + assert_eq!(fst, e1); + } + + #[test] + fn test_pop_until_empty() { + let mut mock_host = MockHost::default(); + let mut queue = Queue::new(&mock_host, queue_path()).unwrap(); + + let e1 = Element::new(1); + let e2 = Element::new(2); + + queue.add(&mut mock_host, &e1).unwrap(); + queue.add(&mut mock_host, &e2).unwrap(); + + let fst: Element = queue.pop(&mut mock_host).unwrap().unwrap(); + let snd: Element = queue.pop(&mut mock_host).unwrap().unwrap(); + + assert_eq!(queue_size(&queue), 0); + assert_eq!(fst, e1); + assert_eq!(snd, e2); + } + + #[test] + fn test_pop_add() { + let mut mock_host = MockHost::default(); + let mut queue = Queue::new(&mock_host, queue_path()).unwrap(); + + let e1 = Element::new(1); + let e2 = Element::new(2); + let e3 = Element::new(3); + let e4 = Element::new(4); + + queue.add(&mut mock_host, &e1).unwrap(); + queue.add(&mut mock_host, &e2).unwrap(); + + let e1_read: Element = queue.pop(&mut mock_host).unwrap().unwrap(); + queue.add(&mut mock_host, &e3).unwrap(); + let e2_read: Element = queue.pop(&mut mock_host).unwrap().unwrap(); + queue.add(&mut mock_host, &e4).unwrap(); + let e3_read: Element = queue.pop(&mut mock_host).unwrap().unwrap(); + + assert_eq!(queue_size(&queue), 1); + assert_eq!(e1_read, e1); + assert_eq!(e2_read, e2); + assert_eq!(e3_read, e3); + } +} diff --git a/src/kernel_sequencer/src/sequencer_macro.rs b/src/kernel_sequencer/src/sequencer_macro.rs index 9b775341b409e59b558d16d956f7b0c1babdc0ba..e6c43bf129494f5609b3519e6a6840d45ad1a61f 100644 --- a/src/kernel_sequencer/src/sequencer_macro.rs +++ b/src/kernel_sequencer/src/sequencer_macro.rs @@ -24,7 +24,8 @@ macro_rules! 
sequencer_kernel_entry { pub extern "C" fn kernel_run() { use tezos_smart_rollup_core::rollup_host::RollupHost; let host = unsafe { RollupHost::new() }; // Runtime from the tezos sdk - let mut host = $crate::sequencer_runtime::SequencerRuntime::new(host, $filter_behavior); // create a sequencer runtime that use the RollupHost runtime + let mut host = + $crate::sequencer_runtime::SequencerRuntime::new(host, $filter_behavior, 100); // create a sequencer runtime that use the RollupHost runtime $kernel_run(&mut host) } }; diff --git a/src/kernel_sequencer/src/sequencer_runtime.rs b/src/kernel_sequencer/src/sequencer_runtime.rs index 8b1ea976430cfdd6416aeb25cd34f8f2243e44b8..0d92e55108a4f6032f2e309cbaadcf917052e791 100644 --- a/src/kernel_sequencer/src/sequencer_runtime.rs +++ b/src/kernel_sequencer/src/sequencer_runtime.rs @@ -11,7 +11,13 @@ use tezos_smart_rollup_host::{ runtime::{Runtime, RuntimeError, ValueType}, }; -use crate::{delayed_inbox::read_input, routing::FilterBehavior, storage::map_user_path}; +use crate::{ + delayed_inbox::read_input, + queue::Queue, + routing::FilterBehavior, + storage::map_user_path, + storage::{DELAYED_INBOX_PATH, PENDING_INBOX_PATH}, +}; pub struct SequencerRuntime where @@ -20,6 +26,20 @@ where host: R, /// if true then the input is added to the delayed inbox input_predicate: FilterBehavior, + /// maximum number of level a message can stay in the delayed inbox + timeout_window: u32, + /// The delayed inbox queue + delayed_inbox_queue: Queue, + /// The pending inbox + /// A buffer that contains messages removed from the delayed inbox + /// Or messages sent by the user + /// This queue is empty at the end of the delayed inbox + pending_inbox_queue: Queue, + /// Index of the pending inbox + /// When a message is added to the pending-inbox + /// Its index is the following field + /// And then the index is incremented + pending_inbox_index: u32, } /// Runtime that handles the delayed inbox and the sequencer protocol. 
@@ -30,10 +50,16 @@ impl SequencerRuntime where R: Runtime, { - pub fn new(host: R, input_predicate: FilterBehavior) -> Self { + pub fn new(host: R, input_predicate: FilterBehavior, timeout_window: u32) -> Self { + let delayed_inbox_queue = Queue::new(&host, DELAYED_INBOX_PATH.into()).unwrap(); + let pending_inbox_queue = Queue::new(&host, PENDING_INBOX_PATH.into()).unwrap(); Self { host, input_predicate, + timeout_window, + delayed_inbox_queue, + pending_inbox_queue, + pending_inbox_index: 0, } } } @@ -51,7 +77,14 @@ where } fn read_input(&mut self) -> Result, RuntimeError> { - read_input(&mut self.host, self.input_predicate) + read_input( + &mut self.host, + self.input_predicate, + self.timeout_window, + &mut self.delayed_inbox_queue, + &mut self.pending_inbox_queue, + &mut self.pending_inbox_index, + ) } fn store_has(&self, path: &T) -> Result, RuntimeError> { @@ -165,3 +198,57 @@ where self.host.runtime_version() } } + +#[cfg(test)] +mod tests { + + use super::SequencerRuntime; + use tezos_data_encoding_derive::BinWriter; + use tezos_smart_rollup_host::{path::RefPath, runtime::Runtime}; + use tezos_smart_rollup_mock::MockHost; + + #[derive(BinWriter)] + struct UserMessage { + payload: u32, + } + + impl UserMessage { + fn new(payload: u32) -> UserMessage { + UserMessage { payload } + } + } + + #[test] + fn test_add_user_message() { + let mut mock_host = MockHost::default(); + mock_host.add_external(UserMessage::new(1)); + mock_host.add_external(UserMessage::new(2)); + mock_host.add_external(UserMessage::new(3)); + + let mut sequencer_runtime = + SequencerRuntime::new(mock_host, crate::FilterBehavior::AllowAll, 1); + + let input = sequencer_runtime.read_input().unwrap(); + let SequencerRuntime { host, .. 
} = sequencer_runtime; + + assert!(input.is_none()); + assert!(host + .store_has(&RefPath::assert_from( + b"/__sequencer/delayed-inbox/elements/0" + )) + .unwrap() + .is_some()); + assert!(host + .store_has(&RefPath::assert_from( + b"/__sequencer/delayed-inbox/elements/1" + )) + .unwrap() + .is_some()); + assert!(host + .store_has(&RefPath::assert_from( + b"/__sequencer/delayed-inbox/elements/2" + )) + .unwrap() + .is_some()); + } +} diff --git a/src/kernel_sequencer/src/state.rs b/src/kernel_sequencer/src/state.rs new file mode 100644 index 0000000000000000000000000000000000000000..fe31e81c16a237054395254f924bd70a9ecc14d6 --- /dev/null +++ b/src/kernel_sequencer/src/state.rs @@ -0,0 +1,122 @@ +// SPDX-FileCopyrightText: 2023 Marigold +// +// SPDX-License-Identifier: MIT + +use tezos_data_encoding::{enc::BinWriter, nom::NomReader}; +use tezos_smart_rollup_encoding::public_key::PublicKey; +use tezos_smart_rollup_host::runtime::{Runtime, RuntimeError}; + +use crate::{delayed_inbox::UserMessage, queue::Queue, storage::write_state}; + +/// Represents the state of the delayed inbox +/// +/// The delayed inbox has 2 states: +/// - Sequenced(PublicKey): +/// the delayed inbox accepts messages from the registered sequencer +/// - Fallback: +/// the kernel will process by itself the messages from the delayed inbox, +/// it's also the default mode +#[derive(Debug, PartialEq, Eq, NomReader, BinWriter, Clone)] +pub enum State { + Sequenced(PublicKey), + Fallback, +} + +impl Default for State { + fn default() -> Self { + State::Fallback + } +} + +/// If the head of the delayed inbox queue is too old, the state switches to the Fallback mode. +pub fn update_state( + host: &mut H, + delayed_inbox_queue: &Queue, + current_level: u32, +) -> Result<(), RuntimeError> { + let head = delayed_inbox_queue.head(host)?; + + match head { + None => Ok(()), + Some(UserMessage { timeout_level, .. 
}) => { + if timeout_level < current_level { + write_state(host, State::Fallback)?; + Ok(()) + } else { + Ok(()) + } + } + } +} + +#[cfg(test)] +mod tests { + use tezos_smart_rollup_encoding::public_key::PublicKey; + use tezos_smart_rollup_host::path::RefPath; + use tezos_smart_rollup_mock::MockHost; + + use crate::{ + delayed_inbox::UserMessage, + queue::Queue, + state::State, + storage::{read_state, write_state}, + }; + + use super::update_state; + + fn sequenced_state() -> State { + State::Sequenced( + PublicKey::from_b58check("edpkuDMUm7Y53wp4gxeLBXuiAhXZrLn8XB1R83ksvvesH8Lp8bmCfK") + .expect("decoding should work"), + ) + } + + #[test] + fn test_switch_to_fallback() { + let mut mock_host = MockHost::default(); + let mut delayed_inbox_queue = + Queue::new(&mock_host, RefPath::assert_from(b"/queue").into()) + .expect("queue should be created"); + let message = UserMessage { + timeout_level: 32, + payload: Vec::default(), + }; + write_state(&mut mock_host, sequenced_state()).expect("Writing a new state should work"); + + delayed_inbox_queue + .add(&mut mock_host, &message) + .expect("Element should be added to the queue"); + + update_state(&mut mock_host, &delayed_inbox_queue, 100) + .expect("Updating the state should work"); + + let state = read_state(&mut mock_host).expect("reading the state should work"); + + assert_eq!(state, State::Fallback); + } + + #[test] + fn test_same_state() { + let mut mock_host = MockHost::default(); + let mut delayed_inbox_queue = + Queue::new(&mock_host, RefPath::assert_from(b"/queue").into()) + .expect("queue should be created"); + let message = UserMessage { + timeout_level: 32, + payload: Vec::default(), + }; + + write_state(&mut mock_host, sequenced_state()).expect("Writing a new state should work"); + + delayed_inbox_queue + .add(&mut mock_host, &message) + .expect("Element should be added to the queue"); + + update_state(&mut mock_host, &delayed_inbox_queue, 16) + .expect("Updating the state should work"); + + let read_state 
= read_state(&mut mock_host).expect("reading the state should work"); + + assert_eq!(read_state, sequenced_state()); + } +} diff --git a/src/kernel_sequencer/src/storage.rs b/src/kernel_sequencer/src/storage.rs index d81838cc6abde616cbe596bee4b24e442501edc5..85a91228a15860f016995817d405c8b6f6a01094 100644 --- a/src/kernel_sequencer/src/storage.rs +++ b/src/kernel_sequencer/src/storage.rs @@ -2,20 +2,25 @@ // // SPDX-License-Identifier: MIT +use tezos_data_encoding::{enc::BinWriter, nom::NomReader}; use tezos_smart_rollup_host::{ path::{concat, OwnedPath, Path, RefPath, PATH_SEPARATOR}, - runtime::RuntimeError, + runtime::{Runtime, RuntimeError}, Error, }; +use crate::state::State; + const SEQUENCER_PREFIX_PATH: RefPath = RefPath::assert_from(b"/__sequencer"); const USER_PREFIX_PATH: RefPath = RefPath::assert_from(b"/u"); +pub const DELAYED_INBOX_PATH: RefPath = RefPath::assert_from(b"/delayed-inbox"); +const STATE: RefPath = RefPath::assert_from(b"/state"); +pub const PENDING_INBOX_PATH: RefPath = RefPath::assert_from(b"/pending-inbox"); /// Prefix the given path by `/__sequencer`. /// /// This function has to be used when writing/reading storage related to the sequencer kernel. /// Then with this function, all the sequencer kernel storage should be under the path `__sequencer`. 
-#[allow(dead_code)] pub fn sequencer_prefix(path: &T) -> Result { concat(&SEQUENCER_PREFIX_PATH, path).map_err(|_| RuntimeError::HostErr(Error::StoreInvalidKey)) } @@ -33,11 +38,46 @@ pub fn map_user_path(path: &T) -> Result { } } +/// Write the sequencer kernel state under /__sequencer/state +pub fn write_state(host: &mut H, state: State) -> Result<(), RuntimeError> { + let path = sequencer_prefix(&STATE)?; + let mut bytes = Vec::default(); + state + .bin_write(&mut bytes) + .map_err(|_| RuntimeError::DecodingError)?; + + host.store_write(&path, &bytes, 0) +} + +/// Returns the state of the sequencer kernel +/// +/// By default the Fallback mode is returned +#[allow(dead_code)] +pub fn read_state(host: &mut H) -> Result { + let path = sequencer_prefix(&STATE)?; + + let is_present = host.store_has(&path)?; + match is_present { + None => Ok(State::Fallback), + Some(_) => { + // read the size of the state + let size = host.store_value_size(&path)?; + // read the according bytes + let bytes = host.store_read(&path, 0, size)?; + // deserialize the state + let (_, state) = State::nom_read(&bytes).map_err(|_| RuntimeError::DecodingError)?; + Ok(state) + } + } +} + #[cfg(test)] mod tests { + use super::{map_user_path, sequencer_prefix, write_state, USER_PREFIX_PATH}; + use crate::state::State; use tezos_smart_rollup_host::path::{concat, OwnedPath, RefPath}; - - use super::{map_user_path, sequencer_prefix, USER_PREFIX_PATH}; + use tezos_smart_rollup_host::runtime::Runtime; + use tezos_smart_rollup_mock::MockHost; #[test] fn test_sequencer_prefixed() { @@ -62,4 +102,16 @@ mod tests { let expected = concat(&USER_PREFIX_PATH, &user_path).unwrap(); assert_eq!(expected, prefixed_user_path); } + + #[test] + fn test_write_state() { + let path = RefPath::assert_from(b"/__sequencer/state"); + let state = State::Fallback; + let mut mock_host = MockHost::default(); + + write_state(&mut mock_host, state).expect("write_state should succeed"); + + let is_present = 
mock_host.store_has(&path).expect("Store has should work"); + assert!(is_present.is_some()); + } } diff --git a/src/lib_scoru_sequencer/dune b/src/lib_scoru_sequencer/dune index 05361fbd177d17d3890bf150091143af68f190ea..07a47c26bdedc69073a5461cf2a64e128044dda6 100644 --- a/src/lib_scoru_sequencer/dune +++ b/src/lib_scoru_sequencer/dune @@ -13,6 +13,8 @@ tezos-workers octez-smart-rollup-node-lib octez-smart-rollup + tezos-client-base + tezos_layer2_store tezos-rpc tezos-rpc-http tezos-rpc-http-server) @@ -24,4 +26,6 @@ -open Tezos_protocol_alpha -open Tezos_workers -open Octez_smart_rollup_node - -open Octez_smart_rollup)) + -open Octez_smart_rollup + -open Tezos_client_base + -open Tezos_layer2_store)) diff --git a/src/lib_scoru_sequencer/durable_state.ml b/src/lib_scoru_sequencer/durable_state.ml new file mode 100644 index 0000000000000000000000000000000000000000..d18021625104d011177c3085be3cd230aab92b04 --- /dev/null +++ b/src/lib_scoru_sequencer/durable_state.ml @@ -0,0 +1,136 @@ +(*****************************************************************************) +(* *) +(* Open Source License *) +(* Copyright (c) 2022-2023 TriliTech *) +(* *) +(* Permission is hereby granted, free of charge, to any person obtaining a *) +(* copy of this software and associated documentation files (the "Software"),*) +(* to deal in the Software without restriction, including without limitation *) +(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *) +(* and/or sell copies of the Software, and to permit persons to whom the *) +(* Software is furnished to do so, subject to the following conditions: *) +(* *) +(* The above copyright notice and this permission notice shall be included *) +(* in all copies or substantial portions of the Software. 
*) +(* *) +(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*) +(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *) +(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *) +(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*) +(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *) +(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *) +(* DEALINGS IN THE SOFTWARE. *) +(* *) +(*****************************************************************************) + +open Protocol +open Alpha_context +open Octez_smart_rollup_node_alpha +open Kernel_durable +module Durable_state = Wasm_2_0_0_pvm.Durable_state + +(* User state is supposed to be stored under /u *) +let lookup_user_kernel ctxt_tree key = + Durable_state.lookup ctxt_tree @@ "/u" ^ key + +(* User state is supposed to be stored under /u *) +let list_user_kernel ctxt_tree key = Durable_state.list ctxt_tree @@ "/u" ^ key + +let lookup_queue_pointer ctxt_tree = + let open Lwt_syntax in + let* pointer_bytes = + Durable_state.lookup ctxt_tree Delayed_inbox.Pointer.path + in + match pointer_bytes with + | None -> return Delayed_inbox.Pointer.empty + | Some pointer_bytes -> + return + @@ Option.value ~default:Delayed_inbox.Pointer.empty + @@ Data_encoding.Binary.of_bytes_opt + Delayed_inbox.Pointer.encoding + pointer_bytes + +let lookup_queue_element ctxt_tree element_id = + let open Lwt_syntax in + let* element = + Durable_state.lookup ctxt_tree @@ Delayed_inbox.Element.path element_id + in + Lwt.return + @@ Option.bind element (fun el_bytes -> + Option.map (fun x -> x.Delayed_inbox.Element.user_message) + @@ Data_encoding.Binary.of_bytes_opt + Delayed_inbox.Element.encoding + el_bytes) + +let context_of node_ctxt (head : Layer1.head) = + let open Lwt_result_syntax in + if head.level < node_ctxt.Node_context.genesis_info.level then + (* This is before we have interpreted the 
boot sector, so we start + with an empty context in genesis *) + return (Context.empty node_ctxt.context) + else Node_context.checkout_context node_ctxt head.hash + +let get_delayed_inbox_pointer node_ctxt (at : Layer1.head) = + let open Lwt_result_syntax in + let* ctxt = context_of node_ctxt at in + let* _ctxt, state = Interpreter.state_of_head node_ctxt ctxt at in + let*! pointer = lookup_queue_pointer state in + return pointer + +let get_pointer_elements state (p : Delayed_inbox.Pointer.t) = + let open Lwt_result_syntax in + let size = Int32.to_int @@ Delayed_inbox.Pointer.size p in + List.init_ep + ~when_negative_length: + (Exn (Failure "Unexpected negative length of delayed inbox difference")) + size + (fun i -> + let element_id = Int32.(add (of_int i) p.head) in + let*! opt_el = lookup_queue_element state element_id in + match opt_el with + | None -> + tzfail + @@ Exn + (Failure + (Format.asprintf + "Couldn't obtain delayed inbox element with index %ld" + element_id)) + | Some el -> return @@ Sc_rollup.Inbox_message.unsafe_of_string el) + +let get_previous_delayed_inbox_pointer node_ctxt (head : Layer1.head) = + let open Lwt_result_syntax in + let*? () = + error_unless + (head.level >= node_ctxt.Node_context.genesis_info.level) + (Exn (Failure "Cannot obtain delayed inbox before origination level")) + in + let* previous_head = Node_context.get_predecessor node_ctxt head in + get_delayed_inbox_pointer node_ctxt previous_head + +(* Returns newly added elements in the delayed inbox + at block corresponding to the passed head. 
*) +let get_delayed_inbox_diff node_ctxt (head : Layer1.head) = + let open Lwt_result_syntax in + let level = head.level in + if head.level < node_ctxt.Node_context.genesis_info.level then + tzfail + (Exn + (Failure + (Format.asprintf + "Cannot obtain delayed inbox difference for level %ld" + level))) + else if level = node_ctxt.Node_context.genesis_info.level then + return Delayed_inbox.{pointer = Delayed_inbox.Pointer.empty; elements = []} + else + let* previous_head = Node_context.get_predecessor node_ctxt head in + let* previous_pointer = get_delayed_inbox_pointer node_ctxt previous_head in + let* current_pointer = get_delayed_inbox_pointer node_ctxt head in + let new_block_head = Int32.succ previous_pointer.tail in + let new_block_tail = current_pointer.tail in + let* ctxt = context_of node_ctxt head in + let* _ctxt, state = Interpreter.state_of_head node_ctxt ctxt head in + let pointer = + Delayed_inbox.Pointer.{head = new_block_head; tail = new_block_tail} + in + let+ elements = get_pointer_elements state pointer in + Delayed_inbox.{pointer; elements} diff --git a/src/lib_scoru_sequencer/kernel_durable.ml b/src/lib_scoru_sequencer/kernel_durable.ml index ac5ed0a68c84a8772f151fbbc23ae927c9fc5abf..5e15a04bf8236906c0ed8d7a5ca1b52ce1b2d7b0 100644 --- a/src/lib_scoru_sequencer/kernel_durable.ml +++ b/src/lib_scoru_sequencer/kernel_durable.ml @@ -23,17 +23,50 @@ (* *) (*****************************************************************************) +open Protocol +open Alpha_context + let sequencer_prefix = "/__sequencer" -let delayed_inbox_prefix = String.concat "/" [sequencer_prefix; "delayed-inbox"] +module Delayed_inbox = struct + let path = String.concat "/" [sequencer_prefix; "delayed-inbox"] + + module Pointer = struct + let path = String.concat "/" [path; "pointer"] + + type t = {head : int32; tail : int32} + + let empty = {head = 0l; tail = -1l} + + let is_adjacent left right = + Compare.Int32.(Int32.succ left.tail = right.head) + + let size x = Int32.(succ 
@@ sub x.tail x.head) + + let encoding = + let open Data_encoding in + conv (fun {head; tail} -> (head, tail)) (fun (head, tail) -> {head; tail}) + @@ obj2 (req "head" int32) (req "tail" int32) + + let pp ppf p = Format.fprintf ppf "{head: %ld; tail: %ld}" p.head p.tail + end + + module Element = struct + let path element_id = + String.concat "/" [path; "elements"; Int32.to_string element_id] -module Delayed_inbox_pointer = struct - let path = String.concat "/" [delayed_inbox_prefix; "pointer"] + type t = {timeout_level : int32; user_message : string} - type t = {head : int32; tail : int32} + let encoding = + let open Data_encoding in + conv + (fun {timeout_level; user_message} -> (timeout_level, user_message)) + (fun (timeout_level, user_message) -> {timeout_level; user_message}) + @@ obj2 (req "timeout_level" int32) (req "user_message" Variable.string) + end - let encoding = - let open Data_encoding in - conv (fun {head; tail} -> (head, tail)) (fun (head, tail) -> {head; tail}) - @@ obj2 (req "head" int32) (req "tail" int32) + type queue_slice = { + pointer : Pointer.t; + elements : Sc_rollup.Inbox_message.serialized list; + } end diff --git a/src/lib_scoru_sequencer/kernel_message.ml b/src/lib_scoru_sequencer/kernel_message.ml index d7ee9f7f000b48c688ff94d735702d5ed666ea69..5009139193b3c754fb8a0222932baec78151b645 100644 --- a/src/lib_scoru_sequencer/kernel_message.ml +++ b/src/lib_scoru_sequencer/kernel_message.ml @@ -144,15 +144,25 @@ module Framed_message = Framed (struct && List.equal String.equal x.l2_messages y.l2_messages end) -include Signed (Framed_message) +type unsigned = Framed_message.t -(* Signature consisting of zeroes *) -let dummy_signature = - Signature.V0.of_bytes_exn @@ Bytes.make Signature.V0.size @@ Char.chr 0 +let unsigned_encoding = Framed_message.encoding -let encode_sequence_message rollup_address ~prefix ~suffix - (l2_messages : Sc_rollup.Inbox_message.serialized list) : string = - (* Fix: actually sign a message *) +module Signed_raw 
= Signed (struct + type t = string + + let encoding = Data_encoding.Variable.string + + let equal = String.equal +end) + +type signed_raw = Signed_raw.t + +let signed_raw_encoding = Signed_raw.encoding + +let encode_sequence_message rollup_address ~nonce ~prefix ~suffix + (l2_messages : Sc_rollup.Inbox_message.serialized list) : + [`Unsigned_encoded of string] = let unsigned_payload = Framed_message. { @@ -160,7 +170,7 @@ let encode_sequence_message rollup_address ~prefix ~suffix payload = Sequence { - nonce = 0l; + nonce; delayed_messages_prefix = prefix; delayed_messages_suffix = suffix; l2_messages = @@ -168,18 +178,51 @@ let encode_sequence_message rollup_address ~prefix ~suffix }; } in - let signed_framed_sequence = - {unsigned_payload; signature = dummy_signature} + `Unsigned_encoded + (Data_encoding.Binary.to_string_exn + Framed_message.encoding + unsigned_payload) + +let sign_sequence cctxt signer (`Unsigned_encoded unsigned_sequence) = + let open Lwt_result_syntax in + let+ signature = + Client_keys.V0.sign cctxt signer (String.to_bytes unsigned_sequence) + in + Data_encoding.Binary.to_string_exn + signed_raw_encoding + {unsigned_payload = unsigned_sequence; signature} + +let encode_and_sign_sequence (cctx, signer) rollup_address ~nonce ~prefix + ~suffix serialized_msgs = + let encoded_sequence = + encode_sequence_message + rollup_address + ~nonce + ~prefix + ~suffix + serialized_msgs in - Data_encoding.Binary.to_string_exn encoding signed_framed_sequence + sign_sequence cctx signer encoded_sequence let single_l2_message_overhead = let dummy_address = Sc_rollup.Address.of_b58check_exn "sr1EzLeJYWrvch2Mhvrk1nUVYrnjGQ8A4qdb" in - String.length - @@ encode_sequence_message - dummy_address - ~prefix:1000l - ~suffix:1000l - [Sc_rollup.Inbox_message.unsafe_of_string ""] + (* Signature consisting of zeroes *) + let dummy_signature = + Signature.V0.of_bytes_exn @@ Bytes.make Signature.V0.size @@ Char.chr 0 + in + let (`Unsigned_encoded encoded) = + 
encode_sequence_message + dummy_address + ~nonce:0l + ~prefix:1000l + ~suffix:1000l + [Sc_rollup.Inbox_message.unsafe_of_string ""] + in + let dummy_signed = + Data_encoding.Binary.to_string_exn + signed_raw_encoding + {unsigned_payload = encoded; signature = dummy_signature} + in + String.length dummy_signed diff --git a/src/lib_scoru_sequencer/optimistic_simulation.ml b/src/lib_scoru_sequencer/optimistic_simulation.ml new file mode 100644 index 0000000000000000000000000000000000000000..df53250061b71591c35fab230b29e41493047094 --- /dev/null +++ b/src/lib_scoru_sequencer/optimistic_simulation.ml @@ -0,0 +1,299 @@ +(*****************************************************************************) +(* *) +(* Open Source License *) +(* Copyright (c) 2022-2023 TriliTech *) +(* *) +(* Permission is hereby granted, free of charge, to any person obtaining a *) +(* copy of this software and associated documentation files (the "Software"),*) +(* to deal in the Software without restriction, including without limitation *) +(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *) +(* and/or sell copies of the Software, and to permit persons to whom the *) +(* Software is furnished to do so, subject to the following conditions: *) +(* *) +(* The above copyright notice and this permission notice shall be included *) +(* in all copies or substantial portions of the Software. *) +(* *) +(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*) +(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *) +(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *) +(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*) +(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *) +(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *) +(* DEALINGS IN THE SOFTWARE. 
*) +(* *) +(*****************************************************************************) + +open Protocol +open Alpha_context +open Octez_smart_rollup_node_alpha +open Kernel_durable +module Fueled_pvm = Fueled_pvm.Free + +type t = { + current_block_diff : Delayed_inbox.Pointer.t; + inbox_level : int32; + ctxt : Context.ro; + state : Context.tree; + tot_messages_consumed : int; + accumulated_messages : Sc_rollup.Inbox_message.serialized list; + block_beginning : Context.tree; +} + +module type Messages_encoder = sig + type signer_ctxt + + val encode_sequence : + signer_ctxt -> + nonce:int32 -> + prefix:int32 -> + suffix:int32 -> + Sc_rollup.Inbox_message.serialized list -> + string tzresult Lwt.t +end + +module type S = sig + type signer_ctxt + + val init_ctxt : + signer_ctxt -> + Node_context.ro -> + Delayed_inbox.queue_slice -> + t tzresult Lwt.t + + val new_block : + signer_ctxt -> + Node_context.ro -> + t -> + Delayed_inbox.queue_slice -> + t tzresult Lwt.t + + val append_messages : + signer_ctxt -> + Node_context.ro -> + t -> + Sc_rollup.Inbox_message.serialized list -> + t tzresult Lwt.t +end + +module Simple = struct + include Internal_event.Simple + + let section = ["sequencer_node"] + + let simulation_kernel_debug = + declare_1 + ~section + ~name:"simulation_kernel_debug" + ~level:Info + ~msg:"Simulation debug: {log}" + ("log", Data_encoding.string) + ~pp1:Format.pp_print_string +end + +let simulation_kernel_debug msg = Simple.(emit simulation_kernel_debug) msg + +let init_empty_ctxt node_ctxt (first_block : Delayed_inbox.Pointer.t) = + let open Lwt_result_syntax in + let genesis_level = node_ctxt.Node_context.genesis_info.level in + let* genesis_hash = Node_context.hash_of_level node_ctxt genesis_level in + let genesis_head = Layer1.{hash = genesis_hash; level = genesis_level} in + let* ctxt = Node_context.checkout_context node_ctxt genesis_hash in + let+ ctxt, state = Interpreter.state_of_head node_ctxt ctxt genesis_head in + { + ctxt; + (* It will be 
incremented in new_block_impl straight away *) + inbox_level = node_ctxt.Node_context.genesis_info.level; + state; + current_block_diff = first_block; + tot_messages_consumed = 0; + accumulated_messages = []; + block_beginning = state; + } + +let simulate (node_ctxt : Node_context.ro) + ({ctxt; inbox_level; accumulated_messages; block_beginning; _} as sim) + messages = + let open Lwt_result_syntax in + let module PVM = (val Pvm.of_kind node_ctxt.kind) in + let*! block_beginning_hash = PVM.state_hash block_beginning in + let*! tick = PVM.get_tick block_beginning in + let accumulated_messages = List.rev_append messages accumulated_messages in + let*? eol = + Sc_rollup.Inbox_message.serialize + (Sc_rollup.Inbox_message.Internal End_of_level) + |> Environment.wrap_tzresult + in + let eval_state = + Fueled_pvm. + { + state = block_beginning; + state_hash = block_beginning_hash; + tick; + inbox_level; + message_counter_offset = 0; + remaining_fuel = Fuel.Free.of_ticks 0L; + remaining_messages = + List.map Sc_rollup.Inbox_message.unsafe_to_string + @@ List.rev (eol :: accumulated_messages); + } + in + let node_ctxt = + Node_context.{node_ctxt with kernel_debug_logger = simulation_kernel_debug} + in + let* eval_result = Fueled_pvm.eval_messages node_ctxt eval_state in + (* Build new state *) + let Fueled_pvm.{state = {state; _}; _} = + Delayed_write_monad.ignore eval_result + in + let*!
ctxt = PVM.State.set ctxt state in + return {sim with ctxt; state; accumulated_messages} + +let ensure_diff_is_valid (current_diff : Delayed_inbox.Pointer.t option) + (new_diff : Delayed_inbox.queue_slice) = + let open Lwt_result_syntax in + let* () = + match current_diff with + | None -> + fail_unless + Compare.Int32.(new_diff.pointer.head = 0l) + (Exn + (Failure + "First block's messages have to be the first in the delayed \ + inbox queue")) + | Some current_diff -> + fail_unless + (Delayed_inbox.Pointer.is_adjacent current_diff new_diff.pointer) + (Exn + (Failure + "Two consecutive blocks' message have to be adjacent in the \ + delayed inbox queue")) + in + let open Sc_rollup.Inbox_message in + let els = new_diff.elements in + match (els, List.last_opt els) with + | sol :: ipl :: _, Some eol -> ( + let*? sol_ = Environment.wrap_tzresult @@ deserialize sol in + let*? ipl_ = Environment.wrap_tzresult @@ deserialize ipl in + let*? eol_ = Environment.wrap_tzresult @@ deserialize eol in + match (sol_, ipl_, eol_) with + | ( Internal Start_of_level, + Internal (Info_per_level _), + Internal End_of_level ) -> + return (List.take_n (List.length els - 1) els, eol) + | _ -> + tzfail + (Exn + (Failure + "Boundaries messages are expected to be SoL, IpL, .. EoL"))) + | _ -> + tzfail + (Exn + (Failure + "Block diff has to include at least SoL, IpL and EoL messages")) + +module Make (Enc : Messages_encoder) = struct + (* First supply block diff to the sequencer kernel but the last EoL, + all of them will land in the delayed inbox queue. + Then supply Sequence that consumes all those messages from the delayed inbox. *) + let new_block_impl ?current_block_diff signer_ctxt node_ctxt sim_ctxt + new_block_diff = + let open Lwt_result_syntax in + let* all_but_eol, _eol = + ensure_diff_is_valid current_block_diff new_block_diff + in + (* If we already started a first block, we have to close it up with EoL, + which is supposed to be on the head of the queue.
*) + let is_first_block = Option.is_none current_block_diff in + (* First, move sim_ctxt to state where EoL of the current block is supplied. + To do so we just need to update block beginning state to state as it's basically needed one, + reset accumulated messages and update current_block_diff *) + let sim_ctxt = + { + sim_ctxt with + accumulated_messages = []; + block_beginning = sim_ctxt.state; + inbox_level = Int32.succ sim_ctxt.inbox_level; + current_block_diff = new_block_diff.pointer; + } + in + let* populated_delayed_inbox_ctxt = + simulate node_ctxt sim_ctxt all_but_eol + in + let diff_size = + Kernel_durable.Delayed_inbox.Pointer.size new_block_diff.pointer + in + let to_consume = + if is_first_block then + (* In this case, it's our first block, hence, no EoL from previous level *) + Int32.pred diff_size + else + (* In this case delayed inbox looks like: + EoL[from previous level], SoL[new level], IpL[new level], ..., . + We need to consume first EoL and the rest of the messages, + what brings us to diff_size elements to consume. + *) diff_size + in + (* TODO: fetch optimistic nonce *) + let* consume_inbox_sequence = + Enc.encode_sequence signer_ctxt ~nonce:0l ~prefix:to_consume ~suffix:0l [] + in + let*? 
wrapping_sequence_external = + Environment.wrap_tzresult + @@ Sc_rollup.Inbox_message.(serialize @@ External consume_inbox_sequence) + in + let* consumed_delayed_inbox_ctxt = + simulate + node_ctxt + populated_delayed_inbox_ctxt + [wrapping_sequence_external] + in + return + { + consumed_delayed_inbox_ctxt with + tot_messages_consumed = + consumed_delayed_inbox_ctxt.tot_messages_consumed + + Int32.to_int to_consume; + } + + let init_ctxt signer_ctxt node_ctxt (first_block : Delayed_inbox.queue_slice) + = + let open Lwt_result_syntax in + let* init_ctxt = init_empty_ctxt node_ctxt first_block.pointer in + new_block_impl signer_ctxt node_ctxt init_ctxt first_block + + let new_block signer_ctxt node_ctxt sim_state block_delayed_inbox_diff = + new_block_impl + ~current_block_diff:sim_state.current_block_diff + signer_ctxt + node_ctxt + sim_state + block_delayed_inbox_diff + + (* Construct a Sequence consisting of the passed messages and pass it to the sequencer kernel *) + let append_messages signer_ctxt node_ctxt sim_state + external_serialized_messages = + let open Lwt_result_syntax in + (* TODO fetch nonce from simulation durable storage *) + let* encoded_wrapping_sequence = + Enc.encode_sequence + signer_ctxt + ~nonce:0l + ~prefix:0l + ~suffix:0l + external_serialized_messages + in + let*?
wrapping_sequence_external = + Environment.wrap_tzresult + @@ Sc_rollup.Inbox_message.( + serialize @@ External encoded_wrapping_sequence) + in + let+ consumed_ctxt = + simulate node_ctxt sim_state [wrapping_sequence_external] + in + { + consumed_ctxt with + tot_messages_consumed = + consumed_ctxt.tot_messages_consumed + + List.length external_serialized_messages; + } +end diff --git a/src/lib_scoru_sequencer/optimistic_simulation.mli b/src/lib_scoru_sequencer/optimistic_simulation.mli new file mode 100644 index 0000000000000000000000000000000000000000..169cc72fc394e20cdb18246ca7d0756700c0f956 --- /dev/null +++ b/src/lib_scoru_sequencer/optimistic_simulation.mli @@ -0,0 +1,106 @@ +(*****************************************************************************) +(* *) +(* Open Source License *) +(* Copyright (c) 2022-2023 TriliTech *) +(* *) +(* Permission is hereby granted, free of charge, to any person obtaining a *) +(* copy of this software and associated documentation files (the "Software"),*) +(* to deal in the Software without restriction, including without limitation *) +(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *) +(* and/or sell copies of the Software, and to permit persons to whom the *) +(* Software is furnished to do so, subject to the following conditions: *) +(* *) +(* The above copyright notice and this permission notice shall be included *) +(* in all copies or substantial portions of the Software. *) +(* *) +(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*) +(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *) +(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *) +(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*) +(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *) +(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *) +(* DEALINGS IN THE SOFTWARE. 
*) +(* *) +(*****************************************************************************) + +open Protocol +open Alpha_context +open Kernel_durable + +(** This module simulates the sequence of messages in the order + in which they will eventually be supplied to the sequencer kernel. + It does so by replicating the exact same order + defined in the seq_batcher, basically reacting to two types of events: + - new finalized block added to the simulation + - user messages injected through sequencer RPC added to the simulation + + Currently, the simulation works in a quadratic way, + re-simulating the whole sequence of messages whenever new messages arrive. + This will be fixed once https://gitlab.com/tezos/tezos/-/issues/6020 is done. +*) + +(** Provides capability to encode well formed sequence, + which will be supplied to the simulation PVM *) +module type Messages_encoder = sig + type signer_ctxt + + val encode_sequence : + signer_ctxt -> + nonce:int32 -> + prefix:int32 -> + suffix:int32 -> + Sc_rollup.Inbox_message.serialized list -> + string tzresult Lwt.t +end + +type t = { + current_block_diff : Delayed_inbox.Pointer.t; + (** The range of the delayed inbox queue, + which have been added to the delayed inbox by the current block. + Head of this pointer corresponds to the head of the queue. + *) + inbox_level : int32; + (** Current inbox level, incremented when a new block arrives *) + ctxt : Context.ro; + state : Context.tree; + (** State of the PVM corresponding to all [accumulated_messages] @@ EoL *) + tot_messages_consumed : int; + (** How many messages have been fed to the user kernel by the sequencer kernel *) + accumulated_messages : Sc_rollup.Inbox_message.serialized list; + (** All the messages, which have to be fed to the sequencer kernel since the last block.
+ Workaround causing quadratic complexity of the simulation, will be removed when + https://gitlab.com/tezos/tezos/-/issues/6020 is done *) + block_beginning : Context.tree; + (** State of the PVM corresponding to the previous block. + Will be removed when https://gitlab.com/tezos/tezos/-/issues/6020 is done *) +} + +module type S = sig + type signer_ctxt + + (** Init simulation context with the first block, having level genesis + 1. *) + val init_ctxt : + signer_ctxt -> + Node_context.ro -> + Delayed_inbox.queue_slice -> + t tzresult Lwt.t + + (** New block arrives, adding a difference to the delayed inbox. *) + val new_block : + signer_ctxt -> + Node_context.ro -> + t -> + Delayed_inbox.queue_slice -> + t tzresult Lwt.t + + (** Append more messages to the simulation. *) + val append_messages : + signer_ctxt -> + Node_context.ro -> + t -> + Sc_rollup.Inbox_message.serialized list -> + t tzresult Lwt.t +end + +module Make (Enc : Messages_encoder) : + S with type signer_ctxt := Enc.signer_ctxt diff --git a/src/lib_scoru_sequencer/seq_batcher.ml b/src/lib_scoru_sequencer/seq_batcher.ml index bddb569f604d2259e90883f61919ab2b6c9f6ec8..b74e0ccf374b0ef66634b65b9f9ad5a70469a9ea 100644 --- a/src/lib_scoru_sequencer/seq_batcher.ml +++ b/src/lib_scoru_sequencer/seq_batcher.ml @@ -28,149 +28,164 @@ open Protocol open Alpha_context open Octez_smart_rollup_node_alpha open Octez_smart_rollup_node_alpha.Batcher_worker_types +open Durable_state module Message_queue = Hash_queue.Make (L2_message.Hash) (L2_message) -module Durable_state = Wasm_2_0_0_pvm.Durable_state let worker_name = "seq_batcher" -module Batcher_events = Batcher_events.Declare (struct - let worker_name = worker_name -end) - -module L2_batched_message = struct - type t = {content : string; l1_hash : Injector.Inj_operation.hash} +module Batcher_events = struct + include Batcher_events.Declare (struct + let worker_name = worker_name + end) + + let batched = + declare_3 + ~section + ~name:"batched_sequence" + ~msg: + 
"Batched Sequence consuming messages from delayed inbox queue in range \ + {delayed_inbox_pointer}, and {l2_messages} user messages. Scheduled \ + for injection as {l1_operation_hash}" + ~level:Notice + ("delayed_inbox_pointer", Kernel_durable.Delayed_inbox.Pointer.encoding) + ("l2_messages", Data_encoding.int32) + ("l1_operation_hash", Injector.Inj_operation.Hash.encoding) + + let optimistic_simulation_advanced = + declare_3 + ~section + ~name:"optimistic_simulation_advanced" + ~msg: + "Optimistic simulation advanced, {total_consumed} messages are \ + expected to be fed to the user kernel so far. Next message to process \ + has level: {last_level} and index {last_index}" + ~level:Notice + ("total_consumed", Data_encoding.int32) + ("last_level", Data_encoding.int32) + ("last_index", Data_encoding.int32) + + let emit_optimistic_simulation_advanced (sim_ctxt : Optimistic_simulation.t) = + (emit optimistic_simulation_advanced) + ( Int32.of_int sim_ctxt.tot_messages_consumed, + sim_ctxt.inbox_level, + Int32.of_int @@ List.length sim_ctxt.accumulated_messages ) end -module Batched_messages = Hash_queue.Make (L2_message.Hash) (L2_batched_message) - type state = { node_ctxt : Node_context.ro; - signer : Signature.public_key_hash; - messages : Message_queue.t; - batched : Batched_messages.t; - mutable simulation_ctxt : Simulation.t option; + signer : Signature.V0.public_key_hash * Client_keys.sk_uri; + (* This one is V0 because sequencer kernel doesn't support BLS signatures yet *) + block_finality : int; + (** Having block with level X, a block with level X - [block_finality] and less cannot reorg. *) + mutable last_injected_nonce : int32; + (** Sequence with [last_injected_nonce] nonce has been scheduled for injection. 
*) + mutable new_messages : Sc_rollup.Inbox_message.serialized list; + (** User messages received from RPC which will be included in the next Sequence *) + mutable simulation_ctxt : Optimistic_simulation.t option; } -(* Takes sequencer message to inject and L2 messages included into it *) -let inject_sequence state (sequencer_message, l2_messages) = - let open Lwt_result_syntax in - let operation = L1_operation.Add_messages {messages = [sequencer_message]} in - let+ l1_hash = - Injector.add_pending_operation ~source:state.signer operation - in - List.iter - (fun msg -> - let content = L2_message.content msg in - let hash = L2_message.hash msg in - Batched_messages.replace state.batched hash {content; l1_hash}) - l2_messages +let encode_sequence state = + Kernel_message.encode_and_sign_sequence + (state.node_ctxt.cctxt, snd state.signer) + state.node_ctxt.rollup_address -let inject_batches state = List.iter_es (inject_sequence state) +module Optimistic_simulation = Optimistic_simulation.Make (struct + type signer_ctxt = state -let get_previous_delayed_inbox_size node_ctxt (head : Layer1.head) = + let encode_sequence = encode_sequence +end) + +let get_simulation_ctxt_exn state = let open Lwt_result_syntax in - let*? () = - error_unless - (head.level >= node_ctxt.Node_context.genesis_info.level) - (Exn (Failure "Cannot obtain delayed inbox before origination level")) - in - let* previous_head = Node_context.get_predecessor node_ctxt head in - let first_inbox_level = Int32.succ node_ctxt.genesis_info.level in - let* ctxt = - if previous_head.level < first_inbox_level then - (* This is before we have interpreted the boot sector, so we start - with an empty context in genesis *) - return (Context.empty node_ctxt.context) - else Node_context.checkout_context node_ctxt previous_head.hash - in - let* _ctxt, state = Interpreter.state_of_head node_ctxt ctxt previous_head in - let open Kernel_durable in - let*! 
pointer_bytes = Durable_state.lookup state Delayed_inbox_pointer.path in - match pointer_bytes with - | None -> return 0 - | Some pointer_bytes -> - return - @@ Option.fold ~none:0 ~some:(fun x -> - Int32.(to_int @@ succ @@ sub x.Delayed_inbox_pointer.tail x.head)) - @@ Data_encoding.Binary.of_bytes_opt - Delayed_inbox_pointer.encoding - pointer_bytes - -let get_batch_sequences state head = + match state.simulation_ctxt with + | None -> + failwith "Simulation context of sequencer hasn't been initialized yet" + | Some opt -> return opt + +(** Represents sequence of messages ready to be injected. *) +type sequence_batch = { + delayed_inbox_pointer : Kernel_durable.Delayed_inbox.Pointer.t; + (** Delayed inbox range which will be consumed by the sequence *) + l2_messages : Sc_rollup.Inbox_message.serialized list; + (** Messages to be sent within the sequence *) + nonce : int32; + encoded_sequence : string; (** Encoded and signed sequence message *) +} + +(* Schedule sequencer message for injection, return L1 operation hash. *) +let inject_sequence ~source {encoded_sequence; _} = + let operation = L1_operation.Add_messages {messages = [encoded_sequence]} in + Injector.add_pending_operation ~source operation + +(* Create Sequence message out of received messages and + finalized delayed inbox messages. *) +let cut_sequence state = let open Lwt_result_syntax in - let* delayed_inbox_size = - get_previous_delayed_inbox_size state.node_ctxt head - in + let* sim_ctxt = get_simulation_ctxt_exn state in (* Assuming at the moment that all the registered messages fit into a single L2 message. - This logic will be extended later. - *) - let l2_messages = Message_queue.elements state.messages in - let*? l2_messages_serialized = - List.map_e - (fun m -> - Sc_rollup.Inbox_message.(serialize (External (L2_message.content m)))) + This logic will be extended later. 
*) + let delayed_inbox_pointer = sim_ctxt.current_block_diff in + let l2_messages = List.rev @@ state.new_messages in + let delayed_inbox_size = + Kernel_durable.Delayed_inbox.Pointer.size delayed_inbox_pointer + in + let new_nonce = Int32.succ state.last_injected_nonce in + let+ sequence_signed = + encode_sequence + state + ~nonce:new_nonce + ~prefix:(Int32.pred delayed_inbox_size) + ~suffix:1l l2_messages - |> Environment.wrap_tzresult in - return - ( [ - ( Kernel_message.encode_sequence_message - state.node_ctxt.rollup_address - ~prefix:(Int32.of_int (delayed_inbox_size - 1)) - ~suffix:1l - l2_messages_serialized, - l2_messages ); - ], - delayed_inbox_size ) - -let produce_batch_sequences state head = - let open Lwt_result_syntax in - let* batches, total_delayed_inbox_sizes = get_batch_sequences state head in - match batches with - | [] -> return_unit - | _ -> - let* () = inject_batches state batches in - let*! () = - Batcher_events.(emit batched) - (* As we add ALL the messages to the Sequence for now, - the number of messages is equal to length of state.messages *) - ( List.length batches, - total_delayed_inbox_sizes + Message_queue.length state.messages ) - in - Message_queue.clear state.messages ; - return_unit + { + delayed_inbox_pointer; + l2_messages; + nonce = new_nonce; + encoded_sequence = sequence_signed; + } -let simulate node_ctxt simulation_ctxt (messages : L2_message.t list) = +(* This one finalises current Sequence, schedule it for injection + and update optimistic simulation context with new head *) +let batch_sequence state new_head = let open Lwt_result_syntax in - let*? 
ext_messages = - Environment.wrap_tzresult - @@ List.map_e - (fun m -> - let open Result_syntax in - let open Sc_rollup.Inbox_message in - let+ msg = serialize @@ External (L2_message.content m) in - unsafe_to_string msg) - messages + let* sequence = cut_sequence state in + let* l1_op_hash = + inject_sequence + ~source:(Signature.Of_V0.public_key_hash @@ fst state.signer) + sequence + in + let*! () = + Batcher_events.(emit batched) + ( sequence.delayed_inbox_pointer, + Int32.of_int @@ List.length sequence.l2_messages, + l1_op_hash ) in - let+ simulation_ctxt, _ticks = - Simulation.simulate_messages node_ctxt simulation_ctxt ext_messages + let* new_block_delayed_inbox_diff = + get_delayed_inbox_diff state.node_ctxt new_head in - simulation_ctxt + let* cur_sim_ctxt = get_simulation_ctxt_exn state in + let* new_sim_ctxt = + Optimistic_simulation.new_block + state + state.node_ctxt + cur_sim_ctxt + new_block_delayed_inbox_diff + in + state.last_injected_nonce <- sequence.nonce ; + state.new_messages <- [] ; + state.simulation_ctxt <- Some new_sim_ctxt ; + let*! () = Batcher_events.emit_optimistic_simulation_advanced new_sim_ctxt in + return_unit (* Maximum size of single L2 message. - If L2 message size exceeds it, it means we won't be able to form a Sequence with solely this message -*) + If a L2 message size exceeds it, + it means we won't be even able to create a Sequence with this message only. 
*) let max_single_l2_msg_size = Protocol.Constants_repr.sc_rollup_message_size_limit - Kernel_message.single_l2_message_overhead - 4 (* each L2 message prepended with it size *) -let get_simulation_context state = - let open Lwt_result_syntax in - match state.simulation_ctxt with - | None -> failwith "Simulation context of sequencer not initialized" - | Some simulation_ctxt -> return simulation_ctxt - (*** HANDLERS IMPLEMENTATION ***) let on_register_messages state (messages : string list) = let open Lwt_result_syntax in @@ -185,39 +200,66 @@ let on_register_messages state (messages : string list) = else Ok (L2_message.make message)) messages in - let* simulation_ctxt = get_simulation_context state in - let* advanced_simulation_ctxt = - simulate state.node_ctxt simulation_ctxt messages - in - state.simulation_ctxt <- Some advanced_simulation_ctxt ; - let*! () = Batcher_events.(emit queue) (List.length messages) in - let hashes = - List.map - (fun message -> - let msg_hash = L2_message.hash message in - Message_queue.replace state.messages msg_hash message ; - msg_hash) + let*? external_serialized_messages = + List.map_e + (fun m -> + Sc_rollup.Inbox_message.(serialize @@ External (L2_message.content m))) messages + |> Environment.wrap_tzresult + in + let* current_simulation_ctxt = get_simulation_ctxt_exn state in + let* new_sim_ctxt = + Optimistic_simulation.append_messages + state + state.node_ctxt + current_simulation_ctxt + external_serialized_messages in - return hashes + state.new_messages <- + List.rev_append external_serialized_messages state.new_messages ; + state.simulation_ctxt <- Some new_sim_ctxt ; + let*! () = Batcher_events.(emit queue) (List.length messages) in + let*! 
() = Batcher_events.emit_optimistic_simulation_advanced new_sim_ctxt in + return @@ List.map L2_message.hash messages -let on_new_head state head = +let on_new_head state (head : Layer1.head) = let open Lwt_result_syntax in - let* () = produce_batch_sequences state head in - when_ (head.level >= state.node_ctxt.genesis_info.level) @@ fun () -> - let* simulation_ctxt = - Simulation.start_simulation ~reveal_map:None state.node_ctxt head + let genesis_level = state.node_ctxt.genesis_info.level in + let first_block_finalization_level = + Int32.add (Int32.succ genesis_level) (Int32.of_int state.block_finality) in - state.simulation_ctxt <- Some simulation_ctxt ; - return_unit + if head.level = Int32.succ genesis_level then ( + (* Init simulation context *) + let* first_delayed_inbox_diff = + get_delayed_inbox_diff state.node_ctxt head + in + let* sim_ctxt = + Optimistic_simulation.init_ctxt + state + state.node_ctxt + first_delayed_inbox_diff + in + state.simulation_ctxt <- Some sim_ctxt ; + let*! () = Batcher_events.emit_optimistic_simulation_advanced sim_ctxt in + return_unit) + else if head.level > first_block_finalization_level then + batch_sequence state head + else return_unit let init_batcher_state node_ctxt ~signer = - Lwt.return + let open Lwt_result_syntax in + let* _alias, _pk, sk = + Client_keys.V0.get_key node_ctxt.Node_context.cctxt signer + in + return { node_ctxt; - signer; - messages = Message_queue.create 100_000 (* ~ 400MB *); - batched = Batched_messages.create 100_000 (* ~ 400MB *); + signer = (signer, sk); + (* TODO: restore all the variables below from persistent storage *) + last_injected_nonce = 0l; + (* TODO: Make block finality argument of init *) + block_finality = 0; + new_messages = []; simulation_ctxt = None; } @@ -265,7 +307,18 @@ module Handlers = struct let on_launch _w () Types.{node_ctxt; signer} = let open Lwt_result_syntax in - let*! 
state = init_batcher_state node_ctxt ~signer in + let to_v0_exn pkh = + match + Signature.V0.Public_key_hash.of_bytes + @@ Signature.Public_key_hash.to_bytes pkh + with + | Error _ -> + invalid_arg + "Only Ed25519, Secp256k1, P256 keys are supported as an operator \ + key" + | Ok x -> x + in + let* state = init_batcher_state node_ctxt ~signer:(to_v0_exn signer) in return state let on_error (type a b) _w st (r : (a, b) Request.t) (errs : b) : @@ -351,17 +404,17 @@ let get_simulation_state () = let open Lwt_result_syntax in let*? w = Lazy.force worker in let state = Worker.state w in - let+ simulation_ctxt = get_simulation_context state in - simulation_ctxt.state + let+ sim_ctxt = get_simulation_ctxt_exn state in + sim_ctxt.state let get_simulated_state_value key = let open Lwt_result_syntax in let* sim_state = get_simulation_state () in - let*! result = Durable_state.lookup sim_state key in + let*! result = lookup_user_kernel sim_state key in return result let get_simulated_state_subkeys key = let open Lwt_result_syntax in let* sim_state = get_simulation_state () in - let*! result = Durable_state.list sim_state key in + let*! 
result = list_user_kernel sim_state key in return result diff --git a/src/lib_scoru_sequencer/test/test_kernel_message.ml b/src/lib_scoru_sequencer/test/test_kernel_message.ml index 1d733a8818fa15d8e2271e2821eae3bceaeeb929..ee545a35f3d6345192a74fca4c86c275821d1ed9 100644 --- a/src/lib_scoru_sequencer/test/test_kernel_message.ml +++ b/src/lib_scoru_sequencer/test/test_kernel_message.ml @@ -35,9 +35,18 @@ open Alpha_context open Tztest module KM = Octez_smart_rollup_sequencer.Kernel_message -let test_signed ?loc expected_hex encoded_msg = +(* TODO fix it: actually sign it *) +let test_encoding ?loc expected_hex (`Unsigned_encoded encoded_msg) = let open Lwt_result_syntax in - let result_hex = Hex.show @@ Hex.of_string encoded_msg in + let dummy_signature = + Signature.V0.of_bytes_exn @@ Bytes.make Signature.V0.size @@ Char.chr 0 + in + let dummy_signed = + Data_encoding.Binary.to_string_exn + KM.signed_raw_encoding + {unsigned_payload = encoded_msg; signature = dummy_signature} + in + let result_hex = Hex.show @@ Hex.of_string dummy_signed in Assert.equal ?loc ~msg:"Encoded hex don't match" expected_hex result_hex ; return_unit @@ -52,23 +61,29 @@ let test_empty_suffix_n_prefix () = "006227a8721213bd7ddb9b56227e3acb01161b1e67000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000" in let* () = - test_signed + test_encoding ~loc:__LOC__ empty_l2_messages_hex - (KM.encode_sequence_message rollup_address ~prefix:0l ~suffix:0l []) + (KM.encode_sequence_message + rollup_address + ~prefix:0l + ~suffix:0l + ~nonce:0l + []) in let single_empty_l2_message_hex = "006227a8721213bd7ddb9b56227e3acb01161b1e6700000000000000000000000000000000040000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000" in let* () = - test_signed + test_encoding ~loc:__LOC__ single_empty_l2_message_hex 
(KM.encode_sequence_message rollup_address ~prefix:0l ~suffix:0l + ~nonce:0l [Sc_rollup.Inbox_message.unsafe_of_string ""]) in @@ -76,26 +91,28 @@ let test_empty_suffix_n_prefix () = "006227a8721213bd7ddb9b56227e3acb01161b1e6700000000000000000000000000000000090000000568656c6c6f00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000" in let* () = - test_signed + test_encoding ~loc:__LOC__ hello_l2_message_hex (KM.encode_sequence_message rollup_address ~prefix:0l ~suffix:0l + ~nonce:0l [Sc_rollup.Inbox_message.unsafe_of_string "hello"]) in let hello_world_l2_message_hex = "006227a8721213bd7ddb9b56227e3acb01161b1e6700000000000000000000000000000000120000000568656c6c6f00000005776f726c6400000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000" in - test_signed + test_encoding ~loc:__LOC__ hello_world_l2_message_hex (KM.encode_sequence_message rollup_address ~prefix:0l ~suffix:0l + ~nonce:0l (List.map Sc_rollup.Inbox_message.unsafe_of_string ["hello"; "world"])) let tests = diff --git a/src/proto_016_PtMumbai/lib_sc_rollup_node/daemon.ml b/src/proto_016_PtMumbai/lib_sc_rollup_node/daemon.ml index 2238a82fa858c3ea6e7e4ab8a5b00991bcb81568..596ced976206395ba99011603f2f2ff047b365f1 100644 --- a/src/proto_016_PtMumbai/lib_sc_rollup_node/daemon.ml +++ b/src/proto_016_PtMumbai/lib_sc_rollup_node/daemon.ml @@ -592,7 +592,7 @@ let run node_ctxt configuration let open Lwt_result_syntax in let (module Components) = daemon_components in let* () = check_initial_state_hash node_ctxt in - let* rpc_server = RPC_server.start node_ctxt configuration in + let* rpc_server = Components.RPC_server.start node_ctxt configuration in let (_ : Lwt_exit.clean_up_callback_id) = install_finalizer daemon_components node_ctxt rpc_server in diff --git a/src/proto_017_PtNairob/lib_sc_rollup_node/daemon.ml b/src/proto_017_PtNairob/lib_sc_rollup_node/daemon.ml 
index 0320a9683f18fb5dd20f2caea286a5bea7d2d0aa..4fe71ecfb60d3437688d5efc95e85f8671522e6d 100644 --- a/src/proto_017_PtNairob/lib_sc_rollup_node/daemon.ml +++ b/src/proto_017_PtNairob/lib_sc_rollup_node/daemon.ml @@ -588,7 +588,7 @@ let run node_ctxt configuration let open Lwt_result_syntax in let (module Components) = daemon_components in let* () = check_initial_state_hash node_ctxt in - let* rpc_server = RPC_server.start node_ctxt configuration in + let* rpc_server = Components.RPC_server.start node_ctxt configuration in let (_ : Lwt_exit.clean_up_callback_id) = install_finalizer daemon_components node_ctxt rpc_server in diff --git a/src/proto_018_Proxford/lib_sc_rollup_node/daemon.ml b/src/proto_018_Proxford/lib_sc_rollup_node/daemon.ml index 47e7201f25a0fa3d5bc40b2bb94d17a79a20b9fe..11d84737cc74d83e9f1ccd39c0b7ad46daaf7c44 100644 --- a/src/proto_018_Proxford/lib_sc_rollup_node/daemon.ml +++ b/src/proto_018_Proxford/lib_sc_rollup_node/daemon.ml @@ -588,7 +588,7 @@ let run node_ctxt configuration let open Lwt_result_syntax in let (module Components) = daemon_components in let* () = check_initial_state_hash node_ctxt in - let* rpc_server = RPC_server.start node_ctxt configuration in + let* rpc_server = Components.RPC_server.start node_ctxt configuration in let (_ : Lwt_exit.clean_up_callback_id) = install_finalizer daemon_components node_ctxt rpc_server in diff --git a/src/proto_alpha/lib_sc_rollup_node/daemon.ml b/src/proto_alpha/lib_sc_rollup_node/daemon.ml index ce639a407cff766237c0dbd58539b48c9a127177..5b6722ee8e14d66b4eff2e7b020536a24e7943a7 100644 --- a/src/proto_alpha/lib_sc_rollup_node/daemon.ml +++ b/src/proto_alpha/lib_sc_rollup_node/daemon.ml @@ -594,7 +594,7 @@ let run node_ctxt configuration let open Lwt_result_syntax in let (module Components) = daemon_components in let* () = check_initial_state_hash node_ctxt in - let* rpc_server = RPC_server.start node_ctxt configuration in + let* rpc_server = Components.RPC_server.start node_ctxt configuration in 
let (_ : Lwt_exit.clean_up_callback_id) = install_finalizer daemon_components node_ctxt rpc_server in diff --git a/tezt/lib_tezos/sc_rollup_client.ml b/tezt/lib_tezos/sc_rollup_client.ml index d28264fce24476970a2dfec7cacb85404f5c1fe1..5385037e5a80d87528b1334533e0900326cbb8a3 100644 --- a/tezt/lib_tezos/sc_rollup_client.ml +++ b/tezt/lib_tezos/sc_rollup_client.ml @@ -282,17 +282,6 @@ let inspect_durable_state_value : rpc_req () |> Runnable.map (fun json -> List.map JSON.as_string (JSON.as_list json)) -(* match expected with - | Expected_success -> ( - let*! json = rpc_req () in - match operation with - | Value -> return (JSON.as_string json : a) - | Length -> return (JSON.as_int64 json : a) - | Subkeys -> return (List.map JSON.as_string (JSON.as_list json))) - | Expected_error msg -> - let*? process = rpc_req () in - Process.check_error ~msg process *) - let ticks ?hooks ?(block = "head") sc_client = let res = rpc_get ?hooks sc_client ["global"; "block"; block; "ticks"] in Runnable.map JSON.as_int res diff --git a/tezt/lib_tezos/sc_rollup_node.ml b/tezt/lib_tezos/sc_rollup_node.ml index f14af4155d5d8d2f52481d2370b3f4e6a93efc3c..6b3bc7f1c97c7fdd6c0d6374fba38a85ef466743 100644 --- a/tezt/lib_tezos/sc_rollup_node.ml +++ b/tezt/lib_tezos/sc_rollup_node.ml @@ -392,6 +392,16 @@ let run ?legacy ?event_level ?event_sections_levels ?loser_mode let* () = if wait_ready then wait_for_ready node else unit in return () +let run_sequencer ?event_level ?event_sections_levels ?(wait_ready = true) node + rollup_address extra_arguments = + let cmd = + ["run"; "for"; rollup_address; "with"; "operator"] + @ operators_params node @ common_node_args node @ extra_arguments + in + let* () = do_runlike_command ?event_level ?event_sections_levels node cmd in + let* () = if wait_ready then wait_for_ready node else unit in + return () + let spawn_run node rollup_address extra_arguments = let mode, args = node_args node rollup_address in spawn_command node (["run"; mode] @ args @ 
extra_arguments) diff --git a/tezt/lib_tezos/sc_rollup_node.mli b/tezt/lib_tezos/sc_rollup_node.mli index 5db40c7f74eb4c642e35a2535cb183e535c04d8d..54dc9db2b32c59aa685e64955da2b42c69742bec 100644 --- a/tezt/lib_tezos/sc_rollup_node.mli +++ b/tezt/lib_tezos/sc_rollup_node.mli @@ -144,6 +144,15 @@ val run : string list -> unit Lwt.t +val run_sequencer : + ?event_level:Daemon.Level.default_level -> + ?event_sections_levels:(string * Daemon.Level.level) list -> + ?wait_ready:bool -> + t -> + string -> + string list -> + unit Lwt.t + (** [spawn_run node rollup_address arguments] is a lightweight version of {!run} that spawns a process. *) val spawn_run : t -> string -> string list -> Process.t diff --git a/tezt/tests/main.ml b/tezt/tests/main.ml index 7257c1dde12998d80dafc30dd3d789f690a21288..9e959dbf8e01af29e2b330702ac1df61df510e8d 100644 --- a/tezt/tests/main.ml +++ b/tezt/tests/main.ml @@ -224,6 +224,7 @@ let register_protocol_specific_because_regression_tests () = (* TODO: https://gitlab.com/tezos/tezos/-/issues/4652 re-enable Mumbai when DAC is separated from Dal node. 
*) Tx_sc_rollup.register ~protocols:[Alpha] ; + Sc_sequencer.register ~protocols:[Alpha] ; Snoop_codegen.register ~protocols:[Alpha] ; Timelock.register ~protocols:[Alpha] ; Timelock_disabled.register ~protocols:[Mumbai] diff --git a/tezt/tests/sc_sequencer.ml b/tezt/tests/sc_sequencer.ml new file mode 100644 index 0000000000000000000000000000000000000000..3aed61137e12ba0fe57c5ef3bdbee6f66311d7ed --- /dev/null +++ b/tezt/tests/sc_sequencer.ml @@ -0,0 +1,454 @@ +(*****************************************************************************) +(* *) +(* Open Source License *) +(* Copyright (c) 2023 Nomadic Labs *) +(* Copyright (c) 2023 TriliTech *) +(* *) +(* Permission is hereby granted, free of charge, to any person obtaining a *) +(* copy of this software and associated documentation files (the "Software"),*) +(* to deal in the Software without restriction, including without limitation *) +(* the rights to use, copy, modify, merge, publish, distribute, sublicense, *) +(* and/or sell copies of the Software, and to permit persons to whom the *) +(* Software is furnished to do so, subject to the following conditions: *) +(* *) +(* The above copyright notice and this permission notice shall be included *) +(* in all copies or substantial portions of the Software. *) +(* *) +(* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR*) +(* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, *) +(* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL *) +(* THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER*) +(* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING *) +(* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER *) +(* DEALINGS IN THE SOFTWARE. 
*) +(* *) +(*****************************************************************************) + +(* Testing + ------- + Component: Smart Optimistic Rollups: Sequencer + Invocation: dune exec tezt/tests/main.exe -- --file sc_sequencer.ml +*) +open Sc_rollup_helpers +open Tezos_protocol_alpha.Protocol + +let pvm_kind = "wasm_2_0_0" + +type full_sequencer_setup = { + node : Node.t; + client : Client.t; + sc_sequencer_node : Sc_rollup_node.t; + sc_rollup_client : Sc_rollup_client.t; + sc_rollup_address : string; + originator_key : string; + sequencer_key : string; +} + +let next_rollup_level {node; client; sc_sequencer_node; _} = + let* () = Client.bake_for_and_wait client in + Sc_rollup_node.wait_for_level + ~timeout:30. + sc_sequencer_node + (Node.get_level node) + +let setup_sequencer_kernel + ?(originator_key = Constant.bootstrap1.public_key_hash) + ?(sequencer_key = Constant.bootstrap1.public_key_hash) sequenced_kernel + protocol = + let* node, client = setup_l1 protocol in + let sc_sequencer_node = + Sc_rollup_node.create + Custom + node + ~path:"./octez-smart-rollup-sequencer-node" + ~base_dir:(Client.base_dir client) + ~default_operator:sequencer_key + in + (* Prepare sequencer kernel & originate it *) + let* boot_sector = + prepare_installer_kernel + ~base_installee:"./" + ~config: + [ + Installer_kernel_config.Set + { + value = + (* encodings of State::Sequenced(edpkuBknW28nW72KG6RoHtYW7p12T6GKc7nAbwYX5m8Wd9sDVC9yav) *) + "00004798d2cc98473d7e250c898885718afd2e4efbcb1a1595ab9730761ed830de0f"; + to_ = "/__sequencer/state"; + }; + ] + ~preimages_dir: + (Filename.concat + (Sc_rollup_node.data_dir sc_sequencer_node) + "wasm_2_0_0") + sequenced_kernel + in + let* sc_rollup_address = + originate_sc_rollup + ~kind:pvm_kind + ~boot_sector + ~parameters_ty:"unit" + ~src:originator_key + client + in + (* Start a sequencer node *) + let* () = + Sc_rollup_node.run_sequencer + sc_sequencer_node + sc_rollup_address + ["--log-kernel-debug"] + in + let sc_rollup_client = 
Sc_rollup_client.create ~protocol sc_sequencer_node in + let setup = + { + node; + client; + sc_sequencer_node; + sc_rollup_client; + sc_rollup_address; + originator_key; + sequencer_key; + } + in + Lwt.return setup + +let wrap_with_framed rollup_address msg = + (* Byte from framing protocol, then smart rollup address, then message bytes *) + String.concat + "" + [ + "\000"; + Data_encoding.Binary.to_string_exn + Sc_rollup_repr.Address.encoding + rollup_address; + msg; + ] + +let send_message ~src client raw_msg = + Client.Sc_rollup.send_message + ~hooks + ~src + ~msg:(sf "hex:[%S]" @@ hex_encode raw_msg) + client + +let wait_for_sequence_debug_message sc_node = + Sc_rollup_node.wait_for sc_node "kernel_debug.v0" @@ fun json -> + let message = JSON.as_string json in + if String.starts_with ~prefix:"Received a sequence message" message then + Some message + else None + +let wait_for_optimistic_simulation_advanced sc_node expected_tot_messages = + Sc_rollup_node.wait_for sc_node "optimistic_simulation_advanced.v0" + @@ fun json -> + let tot_messages = JSON.(json |-> "total_consumed" |> as_int32) in + if tot_messages >= expected_tot_messages then Some () else None + +let pp_sequencer_msg_body ppf (seq, signature) = + let open Octez_smart_rollup_sequencer.Kernel_message in + match seq with + | Sequence seq -> + Format.fprintf + ppf + "Sequence { nonce: %ld, delayed_messages_prefix: %ld, \ + delayed_messages_suffix: %ld, messages: [], signature: \ + Signature(\"%s\") }" + seq.nonce + seq.delayed_messages_prefix + seq.delayed_messages_suffix + signature + +let test_delayed_inbox_consumed = + Protocol.register_test + ~__FILE__ + ~tags:["sequencer"] + ~title:"Originate sequencer kernel & consume delayed inbox messages" + @@ fun protocol -> + let* ({client; sc_rollup_address; sc_sequencer_node; _} as setup) = + setup_sequencer_kernel "sequenced_empty_kernel" protocol + in + let sc_rollup_address = + Sc_rollup_repr.Address.of_b58check_exn sc_rollup_address + in + let* () = + 
send_message ~src:Constant.bootstrap2.alias client + @@ wrap_with_framed sc_rollup_address "\000\000\000" + in + let* () = + send_message ~src:Constant.bootstrap3.alias client + @@ wrap_with_framed sc_rollup_address "\000\000\001" + in + + (* Start async collection of sequence debug messages from the kernel *) + let collected_sequences = ref [] in + let _ = + let rec collect_sequences () = + let* c = wait_for_sequence_debug_message sc_sequencer_node in + collected_sequences := c :: !collected_sequences ; + collect_sequences () + in + collect_sequences () + in + + (* Bake block with those user messages, which has level 3, origination level is 2. + + This block will incorporate "\000\000\000" and "\000\000\001" *) + let* _ = next_rollup_level setup in + + (* ------------------------ NEW BLOCK level = 3 ------------------------ *) + + (* At this moment delayed inbox corresponding to the previous block is empty, + hence, a Sequence with 0 delayed inbox messages and 0 user messages has been batched, + which is denoted as S0. *) + + (* Bake a block with level 4 *) + let* _ = next_rollup_level setup in + + (* ------------------------ NEW BLOCK level = 4 ------------------------ *) + + (* At this moment delayed inbox corresponding to the previous block has 5 messages: + [SoL3, IpL3, "\000\000\000", "\000\000\001", EoL3]. + Seq_batcher has batched a Sequence with + 5 delayed inbox messages and 0 L2 messages, which is denoted S1. + *) + + (* Bake a block with level 5, incorporating S0 *) + let* _ = next_rollup_level setup in + + (* ------------------------ NEW BLOCK level = 5 ------------------------ *) + + (* At this moment delayed inbox corresponding to the previous block has 3 messages: + [SoL4, IpL4, EoL4]. + 5 delayed inbox messages have been consumed by the previous block. + Seq_batcher has batched a Sequence with the 3 delayed inbox messages + and 0 L2 messages, which is denoted S2. 
+ *) + + (* Inject S1 into an upcoming block with level 6 *) + + (* Bake a block with level 6, incorporating S1 *) + let* _ = next_rollup_level setup in + + (* ------------------------ NEW BLOCK level = 6 ------------------------ *) + + (* Feed to the sequencer kernel S1 sequence *) + + (* Inject S2 into an upcoming block with level 7 *) + (* Following S3 is not going to be injected within this test, so we ignore consideration of it *) + + (* Bake a block with level 7, incorporating S2 *) + let* _ = next_rollup_level setup in + + (* ------------------------ NEW BLOCK level = 7 ------------------------ *) + + (* Feed to the sequencer kernel S2 sequence *) + let open Octez_smart_rollup_sequencer.Kernel_message in + let expected_sequences = + List.map + (Format.asprintf + "Received a sequence message %a targeting our rollup" + pp_sequencer_msg_body) + @@ [ + ( Sequence + { + nonce = 1l; + delayed_messages_prefix = 4l; + delayed_messages_suffix = 1l; + l2_messages = []; + }, + "sigj7K7PS9KBuhf8r5ddaxHFFzuHZ38T5VHi1oL5cZUtUm1U832gvZvgP1f3u6bPA5neqX1qCWPBJ7hsxmEyixNiAseWxurt" + ); + ( Sequence + { + nonce = 2l; + delayed_messages_prefix = 2l; + delayed_messages_suffix = 1l; + l2_messages = []; + }, + "sigvJUVDTbXNk47175jHb9CS8Vy1EwMkUMFh14QcuNNp2AgMTVo5y1THPYaGYtmhfa8bSKhJFy2GoaAJmsPdU5DyLXxc1Pjd" + ); + ] + in + Check.( + ( = ) + expected_sequences + (List.rev !collected_sequences) + ~__LOC__ + (list string) + ~error_msg:"Unexpected debug messages emitted") ; + Lwt.return_unit + +let rpc_inject_messages = Sc_rollup_client.inject ~hooks + +let rpc_get_optimistic_storage_value sc_client key = + Sc_rollup_client.rpc_get_rich + ~hooks + sc_client + ["local"; "durable"; "wasm_2_0_0"; "value"] + [("key", key)] + +let test_optimistic_state ?(allow_prefix = false) ~__LOC__ sc_rollup_client + expected_prefix = + let*! 
concated_string = + rpc_get_optimistic_storage_value sc_rollup_client "/concat" + in + let res_concated_string = + Hex.to_string @@ `Hex (JSON.as_string concated_string) + in + Check.is_true + ~__LOC__ + (if allow_prefix then + String.starts_with ~prefix:expected_prefix res_concated_string + else expected_prefix = res_concated_string) + ~error_msg: + (Format.asprintf "Unexpected optimistic state: %s" res_concated_string) ; + return () + +let test_optimistic_state_computed_correctly = + Protocol.register_test + ~__FILE__ + ~tags:["sequencer"] + ~title: + "Supply messages via RPC and through L1 directly, making sure optimistic \ + durable state equals to the expected one" + @@ fun protocol -> + let* ({client; sc_rollup_address; sc_rollup_client; sc_sequencer_node; _} as + setup) = + setup_sequencer_kernel "sequenced_concat_kernel" protocol + in + let final_concat_msgs = + [ + "[SoL 3"; + "IpL 3"; + "L1_message2"; + "L1_message1"; + "RPC_message1"; + "RPC_message2"; + "EoL 4]"; + (* TODO: EoL 4 should be EoL 3 instead, + it will be fixed when corresponding bug in the sequencer kernel fixed *) + "[SoL 4"; + "IpL 4"; + "L1_message3"; + "RPC_message3"; + "RPC_message4"; + "EoL 5]"; + (* TODO: EoL 5 should be EoL 4 instead, + it will be fixed when corresponding bug in the sequencer kernel fixed *) + "[SoL 5"; + "IpL 5"; + "EoL 6]"; + ] + in + let test_concat_prefix ?(allow_prefix = false) ~__LOC__ last_el = + let indexed = List.mapi (fun i x -> (i, x)) final_concat_msgs in + let position = fst @@ List.find (fun (_, x) -> x = last_el) indexed in + let expected = + (String.concat "; " + @@ Tezos_stdlib.TzList.take_n (position + 1) final_concat_msgs) + ^ "; " + in + test_optimistic_state ~allow_prefix ~__LOC__ sc_rollup_client expected + in + (* Before we start, let's create Lwt tasks to wait for event logs + which we will need further in the test *) + let sim_event = wait_for_optimistic_simulation_advanced sc_sequencer_node in + let sim_ev4 = sim_event 4l in + let sim_ev6 = 
sim_event 6l in + let sim_ev10 = sim_event 10l in + let sim_ev12 = sim_event 12l in + let sim_ev15 = sim_event 15l in + let sc_rollup_address = + Sc_rollup_repr.Address.of_b58check_exn sc_rollup_address + in + let* () = + send_message ~src:Constant.bootstrap2.alias client + @@ wrap_with_framed sc_rollup_address "L1_message1" + in + let* () = + send_message ~src:Constant.bootstrap3.alias client + @@ wrap_with_framed sc_rollup_address "L1_message2" + in + + (* Bake block with the L1 user messages, which has level 3, origination level is 2. + + This block will incorporate "L1_message1" and "L1_message2" *) + let* _ = next_rollup_level setup in + + (* ------------------------ NEW BLOCK level = 3 ------------------------ *) + + (* At this moment we just initialized optimistic simulation context *) + let* () = sim_ev4 in + let* () = test_concat_prefix ~__LOC__ "L1_message1" in + + (* Inject messages through RPC *) + let*! _messages_hashes = + rpc_inject_messages sc_rollup_client + @@ List.map + (wrap_with_framed sc_rollup_address) + ["RPC_message1"; "RPC_message2"] + in + + let* () = sim_ev6 in + let* () = test_concat_prefix ~__LOC__ "RPC_message2" in + + (* Inject more messages directly in L1 *) + let* () = + send_message ~src:Constant.bootstrap2.alias client + @@ wrap_with_framed sc_rollup_address "L1_message3" + in + + (* Bake a block with level 4, that will incorporate "L1_message3" *) + let* _ = next_rollup_level setup in + + (* ------------------------ NEW BLOCK level = 4 ------------------------ *) + let* () = sim_ev10 in + let* () = test_concat_prefix ~__LOC__ "L1_message3" in + + (* At this moment delayed inbox corresponding to the previous block have 5 messages: + [SoL, IpL, "L1_message1", "L1_message2", EoL]. + Seq_batcher has batched a Sequence with those 5 delayed inbox messages + and ["RPC_message1"; "RPC_message2"] which denoted S1. + *) + + (* Inject more messages through RPC *) + let*! 
_messages_hashes = + rpc_inject_messages sc_rollup_client + @@ List.map + (wrap_with_framed sc_rollup_address) + ["RPC_message3"; "RPC_message4"] + in + + let* () = sim_ev12 in + let* () = test_concat_prefix ~__LOC__ "RPC_message4" in + + (* Bake a block with level 5, incorporating S0 and "L1_message3" *) + let* _ = next_rollup_level setup in + + (* ------------------------ NEW BLOCK level = 5 ------------------------ *) + let* () = sim_ev15 in + let* () = test_concat_prefix ~__LOC__ "IpL 5" in + + (* At this moment delayed inbox corresponding to the previous block have 4 messages: + [SoL4, IpL4, "L1_message3", EoL4]. + 5 delayed inbox messages have been consumed by the previous block. + Seq_batcher has batched a Sequence with the 4 delayed inbox messages + and ["RPC_message3"; "RPC_message4"] user messages, which denoted S2. + *) + + (* Inject S1 into an upcoming block with level 6 *) + + (* Bake a block with level 6, containing S1 *) + let* _ = next_rollup_level setup in + + (* ------------------------ NEW BLOCK level = 6 ------------------------ *) + let* () = wait_for_optimistic_simulation_advanced sc_sequencer_node 16l in + let* () = test_concat_prefix ~allow_prefix:true ~__LOC__ "EoL 6]" in + + (* Feed to the sequencer kernel S1 sequence *) + Lwt.return_unit + +let register ~protocols = + test_delayed_inbox_consumed protocols ; + test_optimistic_state_computed_correctly protocols