use crate::child_ref::ChildRef;
use crate::children_ref::ChildrenRef;
use crate::dispatcher::{BroadcastTarget, DispatcherType, NotificationType};
use crate::envelope::{Envelope, RefAddr, SignedMessage};
use crate::message::{Answer, BastionMessage, Message, Msg};
use crate::supervisor::SupervisorRef;
use crate::system::SYSTEM;
use async_mutex::Mutex;
use futures::pending;
#[cfg(feature = "scaling")]
use lever::table::lotable::LOTable;
use std::collections::VecDeque;
use std::fmt::{self, Display, Formatter};
use std::pin::Pin;
#[cfg(feature = "scaling")]
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use tracing::{debug, trace};
use uuid::Uuid;
/// A [`BastionId`] wrapping the nil UUID (`Uuid::nil()`).
pub const NIL_ID: BastionId = BastionId(Uuid::nil());
/// Identifier newtype around a [`Uuid`].
///
/// It is hashable and comparable for equality, cloneable, and printable
/// through both `Debug` and `Display` (the latter delegates to the inner
/// UUID). The wrapped value is only reachable from within this crate.
#[derive(Hash, Eq, PartialEq, Debug, Clone)]
pub struct BastionId(pub(crate) Uuid);
/// Bundle of references and shared state exposed through the accessor,
/// receive and send methods of this type.
///
/// The accessors `current()`, `parent()` and `supervisor()` hand out
/// borrows of the `child`, `children` and `supervisor` fields respectively;
/// `try_recv()` / `recv()` pop messages out of the shared `state`.
#[derive(Debug)]
pub struct BastionContext {
/// Identifier interpolated into this context's log records.
id: BastionId,
/// Reference returned by `current()`; its path and sender are also used
/// to build the signature attached to outgoing messages.
child: ChildRef,
/// Reference returned by `parent()`.
children: ChildrenRef,
/// Optional supervisor reference returned by `supervisor()`.
supervisor: Option<SupervisorRef>,
/// Shared, mutex-protected state holding the queue of pending messages.
state: Arc<Mutex<Pin<Box<ContextState>>>>,
}
/// Crate-internal mutable state behind a [`BastionContext`]: a FIFO queue of
/// signed messages plus, when the `scaling` feature is enabled, two shared
/// statistics handles.
#[derive(Debug)]
pub(crate) struct ContextState {
/// Pending messages; pushed at the back and popped from the front.
messages: VecDeque<SignedMessage>,
/// Shared atomic counter (only with the `scaling` feature).
/// NOTE(review): what the counter measures is not visible in this file.
#[cfg(feature = "scaling")]
stats: Arc<AtomicU64>,
/// Shared table mapping a `BastionId` to a `u32` (only with the `scaling`
/// feature) — presumably per-actor mailbox sizes; confirm against callers.
#[cfg(feature = "scaling")]
actor_stats: Arc<LOTable<BastionId, u32>>,
}
impl BastionId {
pub(crate) fn new() -> Self {
let uuid = Uuid::new_v4();
BastionId(uuid)
}
}
impl BastionContext {
    /// Assembles a context from its already-built parts.
    ///
    /// A `debug!` record naming `id` is emitted before the value is built.
    pub(crate) fn new(
        id: BastionId,
        child: ChildRef,
        children: ChildrenRef,
        supervisor: Option<SupervisorRef>,
        state: Arc<Mutex<Pin<Box<ContextState>>>>,
    ) -> Self {
        debug!("BastionContext({}): Creating.", id);

        Self {
            id,
            child,
            children,
            supervisor,
            state,
        }
    }

    /// Borrows the [`ChildRef`] stored in this context.
    pub fn current(&self) -> &ChildRef {
        &self.child
    }

    /// Borrows the [`ChildrenRef`] stored in this context.
    pub fn parent(&self) -> &ChildrenRef {
        &self.children
    }

    /// Borrows the [`SupervisorRef`] stored in this context, if there is one.
    pub fn supervisor(&self) -> Option<&SupervisorRef> {
        self.supervisor.as_ref()
    }

    /// Pops the oldest queued message, if any, without waiting for one.
    ///
    /// The only suspension point is the acquisition of the state lock.
    /// Returns `None` when the queue is empty at the time it is inspected.
    pub async fn try_recv(&self) -> Option<SignedMessage> {
        debug!("BastionContext({}): Trying to receive message.", self.id);

        // The guard stays alive until the end of the function, so the lock is
        // held while the outcome is traced.
        let mut mailbox = self.state.lock().await;
        match mailbox.pop_message() {
            Some(received) => {
                trace!(
                    "BastionContext({}): Received message: {:?}",
                    self.id,
                    received
                );
                Some(received)
            }
            None => {
                trace!("BastionContext({}): Received no message.", self.id);
                None
            }
        }
    }

    /// Waits until a message is queued and returns it.
    ///
    /// Each round locks the state and tries to pop a message; when the queue
    /// is empty the lock is released and the future yields `Pending` once
    /// (via `futures::pending!`) before trying again on the next poll.
    ///
    /// # Errors
    ///
    /// No code path in this function produces `Err`.
    pub async fn recv(&self) -> Result<SignedMessage, ()> {
        debug!("BastionContext({}): Waiting to receive message.", self.id);

        loop {
            {
                // Inner scope: the guard must be gone before we yield below.
                let mut mailbox = self.state.lock().await;
                if let Some(received) = mailbox.pop_message() {
                    trace!(
                        "BastionContext({}): Received message: {:?}",
                        self.id,
                        received
                    );
                    return Ok(received);
                }
            }

            pending!();
        }
    }

    /// Builds the [`RefAddr`] identifying this context as a sender, from
    /// clones of the current child's path and sender.
    pub fn signature(&self) -> RefAddr {
        let me = self.current();
        RefAddr::new(me.path().clone(), me.sender().clone())
    }

    /// Sends `msg` to `to`, signed with [`signature`](Self::signature),
    /// without requesting an answer.
    ///
    /// # Errors
    ///
    /// If the send fails, the original message is recovered from the rejected
    /// envelope and handed back as `Err`.
    ///
    /// # Panics
    ///
    /// Panics if the message cannot be recovered from the rejected envelope.
    /// NOTE(review): presumably impossible since the envelope was built from
    /// `msg` just above — confirm against `into_msg`.
    pub fn tell<M: Message>(&self, to: &RefAddr, msg: M) -> Result<(), M> {
        debug!(
            "{:?}: Telling message: {:?} to: {:?}",
            self.current().path(),
            msg,
            to.path()
        );

        let wrapped = BastionMessage::tell(msg);
        let envelope = Envelope::new_with_sign(wrapped, self.signature());

        match to.sender().unbounded_send(envelope) {
            Ok(()) => Ok(()),
            Err(rejected) => Err(rejected.into_inner().into_msg().unwrap()),
        }
    }

    /// Sends `msg` to `to`, signed with [`signature`](Self::signature), and
    /// returns the [`Answer`] handle created alongside the outgoing message.
    ///
    /// # Errors
    ///
    /// If the send fails, the original message is recovered from the rejected
    /// envelope and handed back as `Err`.
    ///
    /// # Panics
    ///
    /// Panics if the message cannot be recovered from the rejected envelope.
    /// NOTE(review): presumably impossible since the envelope was built from
    /// `msg` just above — confirm against `into_msg`.
    pub fn ask<M: Message>(&self, to: &RefAddr, msg: M) -> Result<Answer, M> {
        debug!(
            "{:?}: Asking message: {:?} to: {:?}",
            self.current().path(),
            msg,
            to
        );

        let (wrapped, answer) = BastionMessage::ask(msg);
        let envelope = Envelope::new_with_sign(wrapped, self.signature());

        match to.sender().unbounded_send(envelope) {
            Ok(()) => Ok(answer),
            Err(rejected) => Err(rejected.into_inner().into_msg().unwrap()),
        }
    }

    /// Forwards a notification of `notification_type`, originating from the
    /// current child, to the given `dispatchers` through the system-wide
    /// dispatcher.
    pub fn notify(&self, dispatchers: &[DispatcherType], notification_type: NotificationType) {
        SYSTEM
            .dispatcher()
            .notify(self.current(), dispatchers, notification_type);
    }

    /// Wraps `message` as a broadcast, signs it with
    /// [`signature`](Self::signature) and hands it to the system-wide
    /// dispatcher for delivery to `target`.
    pub fn broadcast_message<M: Message>(&self, target: BroadcastTarget, message: M) {
        // Build the payload before the signature, then share both via `Arc`.
        let payload = Msg::broadcast(message);
        let sign = self.signature();
        let signed = Arc::new(SignedMessage { msg: payload, sign });

        SYSTEM.dispatcher().broadcast_message(target, &signed);
    }
}
impl ContextState {
    /// Creates a state with an empty message queue.
    ///
    /// With the `scaling` feature enabled, the counter starts at zero and the
    /// per-actor table starts empty.
    pub(crate) fn new() -> Self {
        Self {
            messages: VecDeque::new(),
            #[cfg(feature = "scaling")]
            stats: Arc::new(AtomicU64::new(0)),
            #[cfg(feature = "scaling")]
            actor_stats: Arc::new(LOTable::new()),
        }
    }

    /// Replaces the shared counter handle with `stats`.
    #[cfg(feature = "scaling")]
    pub(crate) fn set_stats(&mut self, stats: Arc<AtomicU64>) {
        self.stats = stats;
    }

    /// Replaces the shared per-actor table handle with `actor_stats`.
    #[cfg(feature = "scaling")]
    pub(crate) fn set_actor_stats(&mut self, actor_stats: Arc<LOTable<BastionId, u32>>) {
        self.actor_stats = actor_stats;
    }

    /// Returns another handle to the shared counter.
    #[cfg(feature = "scaling")]
    pub(crate) fn stats(&self) -> Arc<AtomicU64> {
        Arc::clone(&self.stats)
    }

    /// Returns another handle to the shared per-actor table.
    #[cfg(feature = "scaling")]
    pub(crate) fn actor_stats(&self) -> Arc<LOTable<BastionId, u32>> {
        Arc::clone(&self.actor_stats)
    }

    /// Signs `msg` with `sign` and appends it to the back of the queue.
    pub(crate) fn push_message(&mut self, msg: Msg, sign: RefAddr) {
        self.messages.push_back(SignedMessage::new(msg, sign));
    }

    /// Removes and returns the message at the front of the queue, or `None`
    /// when the queue is empty.
    pub(crate) fn pop_message(&mut self) -> Option<SignedMessage> {
        self.messages.pop_front()
    }

    /// Number of queued messages, saturating at `u32::MAX`.
    ///
    /// A plain `as` cast from `usize` would silently wrap for queues longer
    /// than `u32::MAX` on 64-bit targets and report a bogus small size, so
    /// the length is clamped before narrowing.
    #[cfg(feature = "scaling")]
    pub(crate) fn mailbox_size(&self) -> u32 {
        self.messages.len().min(u32::MAX as usize) as u32
    }
}
impl Display for BastionId {
fn fmt(&self, fmt: &mut Formatter) -> fmt::Result {
self.0.fmt(fmt)
}
}