use crate::{
deadline_clock::{
DeadlineClock,
OnConflict,
},
ports::{
BlockImporter,
BlockProducer,
TransactionPool,
},
Config,
Trigger,
};
use anyhow::{
anyhow,
Context,
};
use fuel_core_services::{
stream::BoxStream,
RunnableService,
RunnableTask,
ServiceRunner,
StateWatcher,
};
use fuel_core_storage::transactional::StorageTransaction;
use fuel_core_types::{
blockchain::{
block::Block,
consensus::{
poa::PoAConsensus,
Consensus,
},
primitives::{
BlockHeight,
SecretKeyWrapper,
},
SealedBlock,
},
fuel_asm::Word,
fuel_crypto::Signature,
fuel_tx::UniqueIdentifier,
secrecy::{
ExposeSecret,
Secret,
},
services::{
block_importer::ImportResult,
executor::{
ExecutionResult,
UncommittedResult as UncommittedExecutionResult,
},
txpool::TxStatus,
Uncommitted,
},
tai64::Tai64,
};
use std::ops::Deref;
use tokio::{
sync::{
mpsc,
oneshot,
},
time::Instant,
};
use tokio_stream::StreamExt;
use tracing::error;
pub type Service<T, B, I> = ServiceRunner<Task<T, B, I>>;
#[derive(Clone)]
pub struct SharedState {
request_sender: mpsc::Sender<Request>,
}
impl SharedState {
pub async fn manually_produce_block(
&self,
block_times: Vec<Option<Tai64>>,
) -> anyhow::Result<()> {
let (sender, receiver) = oneshot::channel();
self.request_sender
.send(Request::ManualBlocks((block_times, sender)))
.await?;
receiver.await?
}
}
enum Request {
ManualBlocks((Vec<Option<Tai64>>, oneshot::Sender<anyhow::Result<()>>)),
}
impl core::fmt::Debug for Request {
fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
write!(f, "Request")
}
}
pub(crate) enum RequestType {
Manual,
Trigger,
}
pub struct Task<T, B, I> {
block_gas_limit: Word,
signing_key: Option<Secret<SecretKeyWrapper>>,
block_producer: B,
block_importer: I,
txpool: T,
tx_status_update_stream: BoxStream<TxStatus>,
request_receiver: mpsc::Receiver<Request>,
shared_state: SharedState,
last_height: BlockHeight,
last_block_created: Instant,
trigger: Trigger,
timer: DeadlineClock,
}
impl<T, B, I> Task<T, B, I>
where
T: TransactionPool,
{
pub fn new(
last_height: BlockHeight,
config: Config,
txpool: T,
block_producer: B,
block_importer: I,
) -> Self {
let tx_status_update_stream = txpool.transaction_status_events();
let (request_sender, request_receiver) = mpsc::channel(100);
Self {
block_gas_limit: config.block_gas_limit,
signing_key: config.signing_key,
txpool,
block_producer,
block_importer,
tx_status_update_stream,
request_receiver,
shared_state: SharedState { request_sender },
last_height,
last_block_created: Instant::now(),
trigger: config.trigger,
timer: DeadlineClock::new(),
}
}
fn next_height(&self) -> BlockHeight {
self.last_height + 1u32.into()
}
}
impl<D, T, B, I> Task<T, B, I>
where
T: TransactionPool,
B: BlockProducer<Database = D>,
I: BlockImporter<Database = D>,
{
async fn signal_produce_block(
&self,
height: BlockHeight,
block_time: Option<Tai64>,
) -> anyhow::Result<UncommittedExecutionResult<StorageTransaction<D>>> {
self.block_producer
.produce_and_execute_block(height, block_time, self.block_gas_limit)
.await
}
pub(crate) async fn produce_next_block(&mut self) -> anyhow::Result<()> {
self.produce_block(self.next_height(), None, RequestType::Trigger)
.await
}
pub(crate) async fn produce_manual_blocks(
&mut self,
block_times: Vec<Option<Tai64>>,
) -> anyhow::Result<()> {
for block_time in block_times {
self.produce_block(self.next_height(), block_time, RequestType::Manual)
.await?;
}
Ok(())
}
pub(crate) async fn produce_block(
&mut self,
height: BlockHeight,
block_time: Option<Tai64>,
request_type: RequestType,
) -> anyhow::Result<()> {
if self.signing_key.is_none() {
return Err(anyhow!("unable to produce blocks without a consensus key"))
}
let (
ExecutionResult {
block,
skipped_transactions,
tx_status,
},
db_transaction,
) = self.signal_produce_block(height, block_time).await?.into();
let mut tx_ids_to_remove = Vec::with_capacity(skipped_transactions.len());
for (tx, err) in skipped_transactions {
error!(
"During block production got invalid transaction {:?} with error {:?}",
tx, err
);
tx_ids_to_remove.push(tx.id());
}
self.txpool.remove_txs(tx_ids_to_remove);
let seal = seal_block(&self.signing_key, &block)?;
let block = SealedBlock {
entity: block,
consensus: seal,
};
self.block_importer.commit_result(Uncommitted::new(
ImportResult {
sealed_block: block,
tx_status,
},
db_transaction,
))?;
self.last_height = height;
self.last_block_created = Instant::now();
match (self.trigger, request_type) {
(Trigger::Never, RequestType::Manual) => (),
(Trigger::Never, RequestType::Trigger) => {
unreachable!("Trigger production will never produce blocks in never mode")
}
(Trigger::Instant, _) => {}
(Trigger::Interval { block_time }, RequestType::Trigger) => {
self.timer.set_timeout(block_time, OnConflict::Min).await;
}
(
Trigger::Hybrid {
max_block_time,
min_block_time,
max_tx_idle_time,
},
RequestType::Trigger,
) => {
let consumable_gas = self.txpool.total_consumable_gas();
if consumable_gas > self.block_gas_limit {
self.timer
.set_timeout(min_block_time, OnConflict::Max)
.await;
} else if self.txpool.pending_number() > 0 {
self.timer
.set_timeout(max_tx_idle_time, OnConflict::Max)
.await;
} else {
self.timer
.set_timeout(max_block_time, OnConflict::Max)
.await;
}
}
(Trigger::Interval { .. }, RequestType::Manual)
| (Trigger::Hybrid { .. }, RequestType::Manual) => {
unreachable!("Trigger types interval and hybrid cannot be used with manual. This is enforced during config validation")
}
}
Ok(())
}
pub(crate) async fn on_txpool_event(
&mut self,
txpool_event: TxStatus,
) -> anyhow::Result<()> {
match txpool_event {
TxStatus::Submitted => match self.trigger {
Trigger::Instant => {
let pending_number = self.txpool.pending_number();
if pending_number > 0 {
self.produce_next_block().await?;
}
Ok(())
}
Trigger::Never | Trigger::Interval { .. } => Ok(()),
Trigger::Hybrid {
max_tx_idle_time,
min_block_time,
..
} => {
let consumable_gas = self.txpool.total_consumable_gas();
if consumable_gas > self.block_gas_limit
&& self.last_block_created + min_block_time < Instant::now()
{
self.produce_next_block().await?;
} else if self.txpool.pending_number() > 0 {
self.timer
.set_timeout(max_tx_idle_time, OnConflict::Min)
.await;
}
Ok(())
}
},
TxStatus::Completed => Ok(()), TxStatus::SqueezedOut { .. } => {
Ok(())
}
}
}
async fn on_timer(&mut self, _at: Instant) -> anyhow::Result<()> {
match self.trigger {
Trigger::Instant | Trigger::Never => {
unreachable!("Timer is never set in this mode");
}
Trigger::Interval { .. } | Trigger::Hybrid { .. } => {
self.produce_next_block().await?;
Ok(())
}
}
}
}
#[async_trait::async_trait]
impl<T, B, I> RunnableService for Task<T, B, I>
where
Self: RunnableTask,
{
const NAME: &'static str = "PoA";
type SharedData = SharedState;
type Task = Task<T, B, I>;
fn shared_data(&self) -> Self::SharedData {
self.shared_state.clone()
}
async fn into_task(self, _: &StateWatcher) -> anyhow::Result<Self::Task> {
match self.trigger {
Trigger::Never | Trigger::Instant => {}
Trigger::Interval { block_time } => {
self.timer
.set_timeout(block_time, OnConflict::Overwrite)
.await;
}
Trigger::Hybrid { max_block_time, .. } => {
self.timer
.set_timeout(max_block_time, OnConflict::Overwrite)
.await;
}
};
Ok(self)
}
}
#[async_trait::async_trait]
impl<D, T, B, I> RunnableTask for Task<T, B, I>
where
T: TransactionPool,
B: BlockProducer<Database = D>,
I: BlockImporter<Database = D>,
{
async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result<bool> {
let should_continue;
tokio::select! {
_ = watcher.while_started() => {
should_continue = false;
}
request = self.request_receiver.recv() => {
if let Some(request) = request {
match request {
Request::ManualBlocks((block_times, response)) => {
let result = self.produce_manual_blocks(block_times).await;
let _ = response.send(result);
}
}
should_continue = true;
} else {
unreachable!("The task is the holder of the `Sender` too")
}
}
txpool_event = self.tx_status_update_stream.next() => {
if let Some(txpool_event) = txpool_event {
self.on_txpool_event(txpool_event).await.context("While processing txpool event")?;
should_continue = true;
} else {
should_continue = false;
}
}
at = self.timer.wait() => {
self.on_timer(at).await.context("While processing timer event")?;
should_continue = true;
}
}
Ok(should_continue)
}
async fn shutdown(self) -> anyhow::Result<()> {
Ok(())
}
}
pub fn new_service<D, T, B, I>(
last_height: BlockHeight,
config: Config,
txpool: T,
block_producer: B,
block_importer: I,
) -> Service<T, B, I>
where
T: TransactionPool + 'static,
B: BlockProducer<Database = D> + 'static,
I: BlockImporter<Database = D> + 'static,
{
Service::new(Task::new(
last_height,
config,
txpool,
block_producer,
block_importer,
))
}
fn seal_block(
signing_key: &Option<Secret<SecretKeyWrapper>>,
block: &Block,
) -> anyhow::Result<Consensus> {
if let Some(key) = signing_key {
let block_hash = block.id();
let message = block_hash.into_message();
let signing_key = key.expose_secret().deref();
let poa_signature = Signature::sign(signing_key, &message);
let seal = Consensus::PoA(PoAConsensus::new(poa_signature));
Ok(seal)
} else {
Err(anyhow!("no PoA signing key configured"))
}
}