diff --git a/src/kernel_sequencer/src/delayed_inbox.rs b/src/kernel_sequencer/src/delayed_inbox.rs index c545cda2befdaeb026c88fc7da5ef056718df0bc..087a506b7f0d251239ab062bf375abfed0be2311 100644 --- a/src/kernel_sequencer/src/delayed_inbox.rs +++ b/src/kernel_sequencer/src/delayed_inbox.rs @@ -4,6 +4,7 @@ // SPDX-License-Identifier: MIT use tezos_data_encoding::nom::NomReader; +use tezos_data_encoding_derive::BinWriter; use tezos_smart_rollup_encoding::smart_rollup::SmartRollupAddress; use tezos_smart_rollup_host::{ input::Message, @@ -13,9 +14,17 @@ use tezos_smart_rollup_host::{ use crate::{ message::{Framed, KernelMessage, Sequence, SequencerMsg, SetSequencer}, + queue::Queue, routing::FilterBehavior, }; +/// Message added to the delayed inbox +#[derive(BinWriter)] +pub struct UserMessage { + timeout_level: u32, + payload: Vec, +} + /// Return a message from the inbox /// /// This function drives the delayed inbox: @@ -25,6 +34,8 @@ use crate::{ pub fn read_input( host: &mut Host, filter_behavior: FilterBehavior, + timeout_window: u32, + delayed_inbox_queue: &mut Queue, ) -> Result, RuntimeError> { let RollupMetadata { raw_rollup_address, .. @@ -52,7 +63,15 @@ pub fn read_input( &raw_rollup_address, ), KernelMessage::DelayedMessage(user_message) => { - handle_message(user_message, filter_behavior, &raw_rollup_address) + let _ = handle_message( + host, + delayed_inbox_queue, + timeout_window, + user_message, + msg.level, + filter_behavior, + &raw_rollup_address, + ); } }, } @@ -83,13 +102,25 @@ fn handle_set_sequencer_message( } /// Handle messages -fn handle_message( +fn handle_message( + host: &mut H, + queue: &mut Queue, + timeout_window: u32, user_message: Vec, + level: u32, filter_behavior: FilterBehavior, rollup_address: &[u8; RAW_ROLLUP_ADDRESS_SIZE], -) { +) -> Result<(), RuntimeError> { // Check if the message should be included in the delayed inbox if filter_behavior.predicate(user_message.as_ref(), rollup_address) { // add the message to the delayed inbox + let user_message = UserMessage { + timeout_level: level + timeout_window, + payload: user_message, + }; + + queue.add(host, &user_message)?; } + + Ok(()) } diff --git a/src/kernel_sequencer/src/lib.rs b/src/kernel_sequencer/src/lib.rs index 078dcca82610255dce778e8fca0ec31e34d0d58a..77feb871e52aad0533e0c7cf0c3d621092235ba3 100644 --- a/src/kernel_sequencer/src/lib.rs +++ b/src/kernel_sequencer/src/lib.rs @@ -4,6 +4,7 @@ mod delayed_inbox; mod message; +mod queue; pub mod routing; mod sequencer_macro; pub mod sequencer_runtime; diff --git a/src/kernel_sequencer/src/queue.rs b/src/kernel_sequencer/src/queue.rs new file mode 100644 index 0000000000000000000000000000000000000000..7ae9c0e8e72ea9bacaf31e79d2c8219c656dd9b8 --- /dev/null +++ b/src/kernel_sequencer/src/queue.rs @@ -0,0 +1,435 @@ +// SPDX-FileCopyrightText: 2023 Marigold +// +// SPDX-License-Identifier: MIT + +use tezos_data_encoding::{enc::BinWriter, nom::NomReader}; +use tezos_smart_rollup_host::{ + path::{concat, OwnedPath, Path, RefPath}, + runtime::{Runtime, RuntimeError}, + Error, +}; + +use crate::storage::sequencer_prefix; + +/// A simple FIFO (First-In-First-Out) queue implementation. +/// +/// The queue can support any type of element. +/// The element has to implement NomReader and BinWriter +/// The queue has a maximum size of 2^32 elements. +/// When the queue is full, it won't accept new elements. +pub struct Queue { + prefix: OwnedPath, + pointer_path: OwnedPath, + pointer: Option, +} + +/// Pointer that indicates where is the head and the tail of the queue. +/// +/// `head` represents the index of the element to be popped out from the queue. +/// `next_add_slot` represents the exclusive tail, +/// or the index of the next element to be added to the queue. +#[derive(NomReader, BinWriter, Copy, Clone)] +struct Pointer { + head: u32, + next_add_slot: u32, +} + +/// Compute the path where the pointer to the `head` and `next_add_slot` is stored. +fn pointer_path(prefix: &impl Path) -> Result { + let pointer_path = RefPath::assert_from(b"/pointer"); + concat(prefix, &pointer_path).map_err(|_| RuntimeError::HostErr(Error::StoreInvalidKey)) +} + +/// Load the pointer from the durable storage. +fn load_pointer( + pointer_path: &OwnedPath, + host: &H, +) -> Result, RuntimeError> { + match host.store_has(pointer_path)? { + None => Ok(None), + Some(_) => { + // 8 bytes because the pointer is 2 u32 + let data = host.store_read(pointer_path, 0, 8)?; + let (_, pointer) = Pointer::nom_read(&data).map_err(|_| RuntimeError::DecodingError)?; + Ok(Some(pointer)) + } + } +} + +impl Queue { + /// Loads a queue from the given path. + /// + /// If the given path does not point to a queue + /// a new empty queue is created. + pub fn new(host: &H, prefix: &P) -> Result { + // Prefix the path of the queue by "__sequencer/" + let prefix = sequencer_prefix(prefix)?; + // Get the pointer path to avoid compute it too many times. + let pointer_path = pointer_path(&prefix)?; + // Load the pointer or use the default one + let pointer = load_pointer(&pointer_path, host)?; + Ok(Queue { + prefix, + pointer_path, + pointer, + }) + } + + /// Compute the path of one element of the queue. + fn element_path(&self, id: u32) -> Result { + let path = OwnedPath::try_from(format!("/elements/{}", id)) + .map_err(|_| RuntimeError::HostErr(Error::StoreInvalidKey))?; + + concat(&self.prefix, &path).map_err(|_| RuntimeError::HostErr(Error::StoreInvalidKey)) + } + + /// Save the pointer of the queue in the durable storage. + fn save_pointer(&self, host: &mut H) -> Result<(), RuntimeError> { + match self.pointer { + None => { + // If the pointer is not present then it's removed from the durable storage. + host.store_delete(&self.pointer_path) + } + Some(pointer) => { + // Serialize the pointer + let mut output = Vec::new(); + pointer + .bin_write(&mut output) + .map_err(|_| RuntimeError::HostErr(Error::GenericInvalidAccess))?; + // Save the pointer to the durable storage + host.store_write(&self.pointer_path, &output, 0)?; + Ok(()) + } + } + } + + /// Returns the next pointer with the `next_add_slot` incremented. + fn increment_next_add_slot_pointer(&self) -> Pointer { + match self.pointer { + None => Pointer { + head: 0, + next_add_slot: 1, + }, + Some(Pointer { + head, + next_add_slot, + }) => Pointer { + head, + next_add_slot: { + let (incremented_next_add_slot, _) = next_add_slot.overflowing_add(1); + incremented_next_add_slot + }, + }, + } + } + + /// Returns the next pointer with an incremented head. + /// + /// Returns None if the pointer indicated an empty queue. + fn increment_head_pointer(&self) -> Option { + match self.pointer { + None => None, + Some(Pointer { + head, + next_add_slot, + }) => { + let (next_head, _) = head.overflowing_add(1); + // Check if the pointer indicates an empty list + if next_head == next_add_slot { + return None; + } + Some(Pointer { + head: next_head, + next_add_slot, + }) + } + } + } + + /// Returns the pointer to the head of the queue. + fn head_pointer(&self) -> u32 { + match self.pointer { + Some(Pointer { + head, + next_add_slot: _, + }) => head, + None => 0, + } + } + + /// Returns the pointer to the `next_add_slot` of the queue. + fn next_add_slot_pointer(&self) -> u32 { + match self.pointer { + Some(Pointer { + head: _, + next_add_slot, + }) => next_add_slot, + None => 0, + } + } + + /// Check if the queue is empty. + fn is_empty(&self) -> bool { + self.pointer.is_none() + } + + /// Check if the queue is full. + fn is_full(&self) -> bool { + match self.pointer { + None => false, // If the pointer is not present, the queue is empty. + Some(Pointer { + head, + next_add_slot, + }) => { + let (next_add_slot, _) = next_add_slot.overflowing_add(1); + next_add_slot == head // The queue is full when the next element slot is equal to the current head. + } + } + } + + /// Add an element to the back of the queue + /// + /// Returns an error when the queue is full. + pub fn add(&mut self, host: &mut H, element: &E) -> Result<(), RuntimeError> + where + E: BinWriter, + { + // Check if the queue is full + if self.is_full() { + return Err(RuntimeError::HostErr(Error::GenericInvalidAccess)); + } + + // Compute the path of the element + let id = self.next_add_slot_pointer(); + let path = self.element_path(id)?; + + // Get the bytes of the element + let mut bytes = Vec::new(); + element + .bin_write(&mut bytes) + .map_err(|_| RuntimeError::DecodingError)?; + + // write the bytes to the store + host.store_write(&path, bytes.as_ref(), 0)?; + + // update the queue pointer + self.pointer = Some(self.increment_next_add_slot_pointer()); + + // save the pointer to the durable storage + self.save_pointer(host)?; + + Ok(()) + } + + /// Remove and returns the first element of the queue. + /// + /// If the queue is empty None is returned + #[allow(dead_code)] + pub fn pop(&mut self, host: &mut H) -> Result, RuntimeError> + where + E: NomReader, + { + // Check if the queue is empty + if self.is_empty() { + return Ok(None); + } + + // Get the path of the element + let id = self.head_pointer(); + let path = self.element_path(id)?; + + // Check if the element is present + let Some(_) = host.store_has(&path)? else {return Ok(None)}; + + // Deserialize the element + let size = host.store_value_size(&path)?; + let bytes = host.store_read(&path, 0, size)?; + let (_, element) = E::nom_read(&bytes).map_err(|_| RuntimeError::DecodingError)?; + host.store_delete(&path)?; + + // Update the pointer + self.pointer = self.increment_head_pointer(); + self.save_pointer(host)?; + + Ok(Some(element)) + } +} + +#[cfg(test)] +mod tests2 { + use crate::storage::DELAYED_INBOX_PATH; + + use super::{Pointer, Queue}; + use tezos_data_encoding_derive::{BinWriter, NomReader}; + use tezos_smart_rollup_host::{ + path::RefPath, + runtime::{Runtime, RuntimeError}, + }; + use tezos_smart_rollup_mock::MockHost; + + #[derive(BinWriter, NomReader, Eq, PartialEq, Debug)] + struct Element { + inner: u32, + } + + impl Element { + fn new(inner: u32) -> Self { + Self { inner } + } + } + + const QUEUE_PATH: RefPath = RefPath::assert_from(b"/queue"); + + /// Initialize a Queue with a custom pointer + fn queue_with_pointer(host: &H, pointer: Pointer) -> Queue { + let queue = Queue::new(host, &QUEUE_PATH).unwrap(); + Queue { + prefix: queue.prefix, + pointer_path: queue.pointer_path, + pointer: Some(pointer), + } + } + + #[test] + fn test_empty() { + let host = MockHost::default(); + let queue = Queue::new(&host, &QUEUE_PATH).unwrap(); + assert!(queue.is_empty()) + } + + #[test] + fn test_is_not_full() { + let host = MockHost::default(); + let queue = Queue::new(&host, &QUEUE_PATH).unwrap(); + assert!(!queue.is_full()) + } + + #[test] + fn test_is_full() { + let mut host = MockHost::default(); + let mut queue = queue_with_pointer( + &host, + Pointer { + head: 0, + next_add_slot: u32::MAX - 3, + }, + ); + + let _ = queue.add(&mut host, &Element::new(0)); + let _ = queue.add(&mut host, &Element::new(0)); + assert!(!queue.is_full()); + let _ = queue.add(&mut host, &Element::new(0)); + assert!(queue.is_full()); + } + + #[test] + fn test_add_element() { + let mut host = MockHost::default(); + let mut queue = Queue::new(&host, &DELAYED_INBOX_PATH).unwrap(); + + queue + .add(&mut host, &Element::new(0)) + .expect("Element should be added"); + assert!(!queue.is_empty()); + } + + #[test] + fn test_cannot_pop_elements_empty_queue() { + let mut host = MockHost::default(); + let mut queue = Queue::new(&host, &DELAYED_INBOX_PATH).unwrap(); + let element: Result, RuntimeError> = queue.pop(&mut host); + assert!(queue.is_empty()); + assert!(matches!(element, Ok(None))) + } + + #[test] + fn test_empty_queue_of_one_element() { + let mut host = MockHost::default(); + let mut queue = Queue::new(&host, &DELAYED_INBOX_PATH).unwrap(); + queue + .add(&mut host, &Element::new(0)) + .expect("adding element should work"); + let _: Option = queue + .pop(&mut host) + .expect("should be able to pop out an element"); + assert!(queue.is_empty()); + } + + #[test] + fn test_cycling_queue() { + let mut host = MockHost::default(); + // The queue starts at u32::MAX - 2 to simulate an overflow of the pointer + let mut queue = queue_with_pointer( + &host, + Pointer { + head: u32::MAX - 2, + next_add_slot: u32::MAX - 2, + }, + ); + // Adding 4 elements + queue + .add(&mut host, &Element::new(0)) + .expect("adding element should work"); + queue + .add(&mut host, &Element::new(1)) + .expect("adding element should work"); + queue + .add(&mut host, &Element::new(2)) + .expect("adding element should work"); + queue + .add(&mut host, &Element::new(3)) + .expect("adding element should work"); + + // Pop out 2 elements + let e1: Element = queue + .pop(&mut host) + .unwrap() + .expect("element should be present"); + let e2: Element = queue + .pop(&mut host) + .unwrap() + .expect("element should be present"); + let e3: Element = queue + .pop(&mut host) + .unwrap() + .expect("element should be present"); + let e4: Element = queue + .pop(&mut host) + .unwrap() + .expect("element should be present"); + + assert_eq!(e1, Element::new(0)); + assert_eq!(e2, Element::new(1)); + assert_eq!(e3, Element::new(2)); + assert_eq!(e4, Element::new(3)); + assert!(queue.is_empty()); + } + + #[test] + fn test_queue_correctly_saved() { + let mut host = MockHost::default(); + let mut queue1 = Queue::new(&host, &QUEUE_PATH).expect("create first queue"); + + queue1 + .add(&mut host, &Element::new(0)) + .expect("Should work"); + + queue1 + .add(&mut host, &Element::new(1)) + .expect("Should work"); + + let mut queue = Queue::new(&host, &QUEUE_PATH).expect("create second queue"); + + let e1: Element = queue + .pop(&mut host) + .unwrap() + .expect("element should be present"); + let e2: Element = queue + .pop(&mut host) + .unwrap() + .expect("element should be present"); + + assert_eq!(e1, Element::new(0)); + assert_eq!(e2, Element::new(1)); + } +} diff --git a/src/kernel_sequencer/src/sequencer_macro.rs b/src/kernel_sequencer/src/sequencer_macro.rs index 9b775341b409e59b558d16d956f7b0c1babdc0ba..e6c43bf129494f5609b3519e6a6840d45ad1a61f 100644 --- a/src/kernel_sequencer/src/sequencer_macro.rs +++ b/src/kernel_sequencer/src/sequencer_macro.rs @@ -24,7 +24,8 @@ macro_rules! sequencer_kernel_entry { pub extern "C" fn kernel_run() { use tezos_smart_rollup_core::rollup_host::RollupHost; let host = unsafe { RollupHost::new() }; // Runtime from the tezos sdk - let mut host = $crate::sequencer_runtime::SequencerRuntime::new(host, $filter_behavior); // create a sequencer runtime that use the RollupHost runtime + let mut host = + $crate::sequencer_runtime::SequencerRuntime::new(host, $filter_behavior, 100); // create a sequencer runtime that use the RollupHost runtime $kernel_run(&mut host) } }; diff --git a/src/kernel_sequencer/src/sequencer_runtime.rs b/src/kernel_sequencer/src/sequencer_runtime.rs index 8b1ea976430cfdd6416aeb25cd34f8f2243e44b8..af7c39a1939de47a599340dfcd446d70524bea92 100644 --- a/src/kernel_sequencer/src/sequencer_runtime.rs +++ b/src/kernel_sequencer/src/sequencer_runtime.rs @@ -11,7 +11,10 @@ use tezos_smart_rollup_host::{ runtime::{Runtime, RuntimeError, ValueType}, }; -use crate::{delayed_inbox::read_input, routing::FilterBehavior, storage::map_user_path}; +use crate::{ + delayed_inbox::read_input, queue::Queue, routing::FilterBehavior, storage::map_user_path, + storage::DELAYED_INBOX_PATH, +}; pub struct SequencerRuntime where @@ -20,6 +23,10 @@ where host: R, /// if true then the input is added to the delayed inbox input_predicate: FilterBehavior, + /// maximum number of level a message can stay in the delayed inbox + timeout_window: u32, + /// The delayed inbox queue + delayed_inbox_queue: Queue, } /// Runtime that handles the delayed inbox and the sequencer protocol. @@ -30,10 +37,13 @@ impl SequencerRuntime where R: Runtime, { - pub fn new(host: R, input_predicate: FilterBehavior) -> Self { + pub fn new(host: R, input_predicate: FilterBehavior, timeout_window: u32) -> Self { + let delayed_inbox_queue = Queue::new(&host, &DELAYED_INBOX_PATH).unwrap(); Self { host, input_predicate, + timeout_window, + delayed_inbox_queue, } } } @@ -51,7 +61,12 @@ where } fn read_input(&mut self) -> Result, RuntimeError> { - read_input(&mut self.host, self.input_predicate) + read_input( + &mut self.host, + self.input_predicate, + self.timeout_window, + &mut self.delayed_inbox_queue, + ) } fn store_has(&self, path: &T) -> Result, RuntimeError> { @@ -165,3 +180,57 @@ where self.host.runtime_version() } } + +#[cfg(test)] +mod tests { + + use super::SequencerRuntime; + use tezos_data_encoding_derive::BinWriter; + use tezos_smart_rollup_host::{path::RefPath, runtime::Runtime}; + use tezos_smart_rollup_mock::MockHost; + + #[derive(BinWriter)] + struct UserMessage { + payload: u32, + } + + impl UserMessage { + fn new(payload: u32) -> UserMessage { + UserMessage { payload } + } + } + + #[test] + fn test_add_user_message() { + let mut mock_host = MockHost::default(); + mock_host.add_external(UserMessage::new(1)); + mock_host.add_external(UserMessage::new(2)); + mock_host.add_external(UserMessage::new(3)); + + let mut sequencer_runtime = + SequencerRuntime::new(mock_host, crate::FilterBehavior::AllowAll, 1); + + let input = sequencer_runtime.read_input().unwrap(); + let SequencerRuntime { host, .. } = sequencer_runtime; + + assert!(input.is_none()); + assert!(host + .store_has(&RefPath::assert_from( + b"/__sequencer/delayed-inbox/elements/0" + )) + .unwrap() + .is_some()); + assert!(host + .store_has(&RefPath::assert_from( + b"/__sequencer/delayed-inbox/elements/1" + )) + .unwrap() + .is_some()); + assert!(host + .store_has(&RefPath::assert_from( + b"/__sequencer/delayed-inbox/elements/2" + )) + .unwrap() + .is_some()); + } +} diff --git a/src/kernel_sequencer/src/storage.rs b/src/kernel_sequencer/src/storage.rs index d81838cc6abde616cbe596bee4b24e442501edc5..a4ae685258c6be0126556a6614fab24da6705e45 100644 --- a/src/kernel_sequencer/src/storage.rs +++ b/src/kernel_sequencer/src/storage.rs @@ -10,12 +10,12 @@ use tezos_smart_rollup_host::{ const SEQUENCER_PREFIX_PATH: RefPath = RefPath::assert_from(b"/__sequencer"); const USER_PREFIX_PATH: RefPath = RefPath::assert_from(b"/u"); +pub const DELAYED_INBOX_PATH: RefPath = RefPath::assert_from(b"/delayed-inbox"); /// Prefix the given path by `/__sequencer`. /// /// This function has to be used when writing/reading storage related to the sequencer kernel. /// Then with this function, all the sequencer kernel storage should be under the path `__sequencer`. -#[allow(dead_code)] pub fn sequencer_prefix(path: &T) -> Result { concat(&SEQUENCER_PREFIX_PATH, path).map_err(|_| RuntimeError::HostErr(Error::StoreInvalidKey)) }