diff --git a/src/kernel_sequencer/src/delayed_inbox.rs b/src/kernel_sequencer/src/delayed_inbox.rs index 0504b886b29d250708d6f52c1a9d5d155be44ffd..784a72bf08461ad76c583bd942ca60f7b01b6c6a 100644 --- a/src/kernel_sequencer/src/delayed_inbox.rs +++ b/src/kernel_sequencer/src/delayed_inbox.rs @@ -3,8 +3,8 @@ // // SPDX-License-Identifier: MIT +use tezos_data_encoding::enc::BinWriter; use tezos_data_encoding::nom::NomReader; -use tezos_data_encoding_derive::BinWriter; use tezos_smart_rollup_debug::debug_msg; use tezos_smart_rollup_encoding::smart_rollup::SmartRollupAddress; use tezos_smart_rollup_host::{ @@ -17,13 +17,14 @@ use crate::{ message::{Framed, KernelMessage, Sequence, SequencerMsg, SetSequencer}, queue::Queue, routing::FilterBehavior, + state::update_state, }; /// Message added to the delayed inbox -#[derive(BinWriter)] +#[derive(BinWriter, NomReader)] pub struct UserMessage { - timeout_level: u32, - payload: Vec, + pub(crate) timeout_level: u32, + pub(crate) payload: Vec, } /// Return a message from the inbox @@ -47,6 +48,11 @@ pub fn read_input( None => return Ok(None), // No more messages to be processed Some(msg) => { let payload = msg.as_ref(); + // Verify the state of the delayed inbox on SoL + if let [0x00, 0x00, ..] 
= payload { + update_state(host, delayed_inbox_queue, msg.level)?; + } + let message = KernelMessage::nom_read(payload); match message { Err(_) => {} diff --git a/src/kernel_sequencer/src/lib.rs b/src/kernel_sequencer/src/lib.rs index 77feb871e52aad0533e0c7cf0c3d621092235ba3..0b025124ff91d7f55eb0a39f8a471ae8f7d6cf14 100644 --- a/src/kernel_sequencer/src/lib.rs +++ b/src/kernel_sequencer/src/lib.rs @@ -8,6 +8,7 @@ mod queue; pub mod routing; mod sequencer_macro; pub mod sequencer_runtime; +mod state; mod storage; pub use routing::FilterBehavior; diff --git a/src/kernel_sequencer/src/queue.rs b/src/kernel_sequencer/src/queue.rs index 7ae9c0e8e72ea9bacaf31e79d2c8219c656dd9b8..2a695975170ec0454393f15541be00f24fcb96a8 100644 --- a/src/kernel_sequencer/src/queue.rs +++ b/src/kernel_sequencer/src/queue.rs @@ -253,6 +253,27 @@ impl Queue { Ok(Some(element)) } + + /// Read the element at the head of the queue (the oldest element still queued), or `None` if the queue is empty + /// + /// It does not remove it + pub fn head(&self, host: &Host) -> Result, RuntimeError> + where + E: NomReader, + { + let Some(Pointer { head, .. 
}) = self.pointer else {return Ok(None)}; + + // get the path of the stored element + let path = self.element_path(head)?; + // get the size of this element + let size = host.store_value_size(&path)?; + // get the bytes from the durable storage + let bytes = host.store_read(&path, 0, size)?; + // deserialize the bytes into the corresponding type + let (_, data) = E::nom_read(&bytes).map_err(|_| RuntimeError::DecodingError)?; + // returns the parsed type + Ok(Some(data)) + } } #[cfg(test)] @@ -432,4 +453,43 @@ mod tests2 { assert_eq!(e1, Element::new(0)); assert_eq!(e2, Element::new(1)); } + + #[test] + fn test_head_empty_queue() { + let host = MockHost::default(); + let queue = Queue::new(&host, &QUEUE_PATH).expect("create first queue"); + + let head: Option = queue.head(&host).expect("should not be runtime work"); + + assert!(queue.is_empty()); + assert_eq!(head, None); + } + + #[test] + fn test_head_one_element_queue() { + let mut host = MockHost::default(); + let mut queue = Queue::new(&host, &QUEUE_PATH).expect("create first queue"); + + queue.add(&mut host, &Element::new(0)).unwrap(); + + let head: Option = queue.head(&host).expect("should not be runtime error"); + + assert_eq!(head, Some(Element::new(0))); + } + + #[test] + fn test_head_two_elements_queue() { + let mut host = MockHost::default(); + let mut queue = Queue::new(&host, &QUEUE_PATH).expect("create first queue"); + + queue.add(&mut host, &Element::new(0)).unwrap(); + queue.add(&mut host, &Element::new(1)).unwrap(); + + let e1: Option = queue.head(&host).expect("should not be runtime error"); + let _: Option = queue.pop(&mut host).expect("Can pop element"); + let e2: Option = queue.head(&host).expect("should not be runtime error"); + + assert_eq!(e1, Some(Element::new(0))); + assert_eq!(e2, Some(Element::new(1))); + } } diff --git a/src/kernel_sequencer/src/state.rs b/src/kernel_sequencer/src/state.rs new file mode 100644 index 
0000000000000000000000000000000000000000..629a53a51e40126a2846599b39a854d6da486ddd --- /dev/null +++ b/src/kernel_sequencer/src/state.rs @@ -0,0 +1,120 @@ +// SPDX-FileCopyrightText: 2023 Marigold +// +// SPDX-License-Identifier: MIT + +use tezos_data_encoding::{enc::BinWriter, nom::NomReader}; +use tezos_smart_rollup_encoding::public_key::PublicKey; +use tezos_smart_rollup_host::runtime::{Runtime, RuntimeError}; + +use crate::{delayed_inbox::UserMessage, queue::Queue, storage::write_state}; + +/// Represents the state of the delayed inbox +/// +/// The delayed inbox has 2 states: +/// - Sequenced(PublicKey): +/// the delayed inbox accepts messages from the registered sequencer +/// - Fallback: +/// the kernel will process by itself the messages from the delayed inbox, +/// it's also the default mode +#[derive(Debug, PartialEq, Eq, NomReader, BinWriter, Clone)] +pub enum State { + Sequenced(PublicKey), + Fallback, +} + +impl Default for State { + fn default() -> Self { + State::Fallback + } +} + +/// Switches the state to Fallback when the head of the delayed inbox has timed out, i.e. its `timeout_level` is strictly below `current_level`; an empty queue or a head that has not timed out leaves the state untouched. +pub fn update_state( + host: &mut H, + delayed_inbox_queue: &Queue, + current_level: u32, +) -> Result<(), RuntimeError> { + let head = delayed_inbox_queue.head(host)?; + + match head { + None => Ok(()), + Some(UserMessage { timeout_level, .. 
}) => { + if timeout_level < current_level { + write_state(host, State::Fallback)?; + Ok(()) + } else { + Ok(()) + } + } + } +} + +#[cfg(test)] +mod tests { + use tezos_smart_rollup_encoding::public_key::PublicKey; + use tezos_smart_rollup_host::path::RefPath; + use tezos_smart_rollup_mock::MockHost; + + use crate::{ + delayed_inbox::UserMessage, + queue::Queue, + state::State, + storage::{read_state, write_state}, + }; + + use super::update_state; + + fn sequenced_state() -> State { + State::Sequenced( + PublicKey::from_b58check("edpkuDMUm7Y53wp4gxeLBXuiAhXZrLn8XB1R83ksvvesH8Lp8bmCfK") + .expect("decoding should work"), + ) + } + + #[test] + fn test_switch_to_fallback() { + let mut mock_host = MockHost::default(); + let mut delayed_inbox_queue = Queue::new(&mock_host, &RefPath::assert_from(b"/queue")) + .expect("queue should be created"); + let message = UserMessage { + timeout_level: 32, + payload: Vec::default(), + }; + write_state(&mut mock_host, sequenced_state()).expect("Writing a new state should work"); + + delayed_inbox_queue + .add(&mut mock_host, &message) + .expect("Element should be added to the queue"); + + update_state(&mut mock_host, &delayed_inbox_queue, 100) + .expect("Updating the state should work"); + + let state = read_state(&mut mock_host).expect("reading the state should work"); + + assert_eq!(state, State::Fallback); + } + + #[test] + fn test_same_state() { + let mut mock_host = MockHost::default(); + let mut delayed_inbox_queue = Queue::new(&mock_host, &RefPath::assert_from(b"/queue")) + .expect("queue should be created"); + let message = UserMessage { + timeout_level: 32, + payload: Vec::default(), + }; + + write_state(&mut mock_host, sequenced_state()).expect("Writing a new state should work"); + + delayed_inbox_queue + .add(&mut mock_host, &message) + .expect("Element should be added to the queue"); + + update_state(&mut mock_host, &delayed_inbox_queue, 16) + .expect("Updating the state should work"); + + let read_state = 
read_state(&mut mock_host).expect("reading the state should work"); + + assert_eq!(read_state, sequenced_state()); + } +} diff --git a/src/kernel_sequencer/src/storage.rs b/src/kernel_sequencer/src/storage.rs index a4ae685258c6be0126556a6614fab24da6705e45..26b8420bbc5abe487ace56cbab4665a8f99a0128 100644 --- a/src/kernel_sequencer/src/storage.rs +++ b/src/kernel_sequencer/src/storage.rs @@ -2,15 +2,20 @@ // // SPDX-License-Identifier: MIT +use crate::state::State; +use tezos_data_encoding::enc::BinWriter; +#[cfg(test)] +use tezos_data_encoding::nom::NomReader; use tezos_smart_rollup_host::{ path::{concat, OwnedPath, Path, RefPath, PATH_SEPARATOR}, - runtime::RuntimeError, + runtime::{Runtime, RuntimeError}, Error, }; const SEQUENCER_PREFIX_PATH: RefPath = RefPath::assert_from(b"/__sequencer"); const USER_PREFIX_PATH: RefPath = RefPath::assert_from(b"/u"); pub const DELAYED_INBOX_PATH: RefPath = RefPath::assert_from(b"/delayed-inbox"); +const STATE: RefPath = RefPath::assert_from(b"/state"); /// Prefix the given path by `/__sequencer`. /// @@ -33,11 +38,46 @@ pub fn map_user_path(path: &T) -> Result { } } +/// Write the sequencer kernel state under /__sequencer/state +pub fn write_state(host: &mut H, state: State) -> Result<(), RuntimeError> { + let path = sequencer_prefix(&STATE)?; + let mut bytes = Vec::default(); + state + .bin_write(&mut bytes) + .map_err(|_| RuntimeError::DecodingError)?; + + host.store_write(&path, &bytes, 0) +} + +/// Returns the state of the sequencer kernel. +/// +/// Or the default value if it is not present in storage. 
+#[cfg(test)] +pub fn read_state(host: &mut H) -> Result { + let path = sequencer_prefix(&STATE)?; + + let is_present = host.store_has(&path)?; + match is_present { + None => Ok(State::default()), + Some(_) => { + // read the size of the state + let size = host.store_value_size(&path)?; + // read the according bytes + let bytes = host.store_read(&path, 0, size)?; + // deserialize the state + let (_, state) = State::nom_read(&bytes).map_err(|_| RuntimeError::DecodingError)?; + Ok(state) + } + } +} + #[cfg(test)] mod tests { + use super::{map_user_path, sequencer_prefix, write_state, USER_PREFIX_PATH}; + use crate::state::State; use tezos_smart_rollup_host::path::{concat, OwnedPath, RefPath}; - - use super::{map_user_path, sequencer_prefix, USER_PREFIX_PATH}; + use tezos_smart_rollup_host::runtime::Runtime; + use tezos_smart_rollup_mock::MockHost; #[test] fn test_sequencer_prefixed() { @@ -62,4 +102,16 @@ mod tests { let expected = concat(&USER_PREFIX_PATH, &user_path).unwrap(); assert_eq!(expected, prefixed_user_path); } + + #[test] + fn test_write_state() { + let path = RefPath::assert_from(b"/__sequencer/state"); + let state = State::Fallback; + let mut mock_host = MockHost::default(); + + write_state(&mut mock_host, state).expect("write_state should succeed"); + + let is_present = mock_host.store_has(&path).expect("Store has should work"); + assert!(is_present.is_some()); + } }