use crate::child_ref::ChildRef;
use crate::children_ref::ChildrenRef;
use crate::envelope::{Envelope, RefAddr, SignedMessage};
use crate::message::{Answer, BastionMessage, Message, Msg};
use crate::supervisor::SupervisorRef;
use futures::pending;
use qutex::{Guard, Qutex};
use std::collections::VecDeque;
use std::fmt::{self, Display, Formatter};
use uuid::Uuid;
/// The [`BastionId`] that wraps the nil UUID (`Uuid::nil()`).
pub const NIL_ID: BastionId = BastionId(Uuid::nil());
/// Identifier newtype wrapping a [`Uuid`].
///
/// The wrapped value is private; the type can be hashed, compared for
/// equality, cloned and debug-printed through the derived impls below.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct BastionId(Uuid);
/// Bundle of handles and message state exposed through the accessor and
/// messaging methods of this type (`current`, `parent`, `supervisor`,
/// `recv`, `try_recv`, `tell`, `ask`).
#[derive(Debug)]
pub struct BastionContext {
    /// Identifier of this context; used as the prefix of its log messages.
    id: BastionId,
    /// Handle returned by [`BastionContext::current`].
    child: ChildRef,
    /// Handle returned by [`BastionContext::parent`].
    children: ChildrenRef,
    /// Optional handle returned by [`BastionContext::supervisor`].
    supervisor: Option<SupervisorRef>,
    /// Lock-protected state holding the queue of received messages.
    state: Qutex<ContextState>,
}
/// Mutable state of a `BastionContext`: the queue of messages waiting to be
/// received.
#[derive(Debug)]
pub(crate) struct ContextState {
    /// Pending signed messages. `push_msg` appends at the back and the
    /// receive methods of `BastionContext` pop from the front.
    msgs: VecDeque<SignedMessage>,
}
impl BastionId {
    /// Generates a fresh identifier backed by a version-4 UUID
    /// (`Uuid::new_v4()`).
    pub(crate) fn new() -> Self {
        BastionId(Uuid::new_v4())
    }
}
impl BastionContext {
    /// Assembles a context from its already-constructed parts and logs its
    /// creation at debug level.
    pub(crate) fn new(
        id: BastionId,
        child: ChildRef,
        children: ChildrenRef,
        supervisor: Option<SupervisorRef>,
        state: Qutex<ContextState>,
    ) -> Self {
        debug!("BastionContext({}): Creating.", id);
        BastionContext {
            id,
            child,
            children,
            supervisor,
            state,
        }
    }

    /// Returns a reference to the `ChildRef` held by this context.
    pub fn current(&self) -> &ChildRef {
        &self.child
    }

    /// Returns a reference to the `ChildrenRef` held by this context.
    pub fn parent(&self) -> &ChildrenRef {
        &self.children
    }

    /// Returns a reference to the `SupervisorRef` held by this context, or
    /// `None` if the context was created without one.
    pub fn supervisor(&self) -> Option<&SupervisorRef> {
        self.supervisor.as_ref()
    }

    /// Pops the oldest queued message, if any, without waiting for one to
    /// arrive.
    ///
    /// Returns `None` both when the queue is empty and when the state lock
    /// could not be acquired.
    pub async fn try_recv(&self) -> Option<SignedMessage> {
        debug!("BastionContext({}): Trying to receive message.", self.id);
        let mut state = self.state.clone().lock_async().await.ok()?;

        if let Some(msg) = state.msgs.pop_front() {
            trace!("BastionContext({}): Received message: {:?}", self.id, msg);
            Some(msg)
        } else {
            trace!("BastionContext({}): Received no message.", self.id);
            None
        }
    }

    /// Waits until a message is queued, then pops and returns the oldest one.
    ///
    /// # Errors
    ///
    /// Returns `Err(())` if the state lock could not be acquired. This
    /// mirrors `try_recv`, which also treats a failed lock as "no message"
    /// rather than panicking.
    pub async fn recv(&self) -> Result<SignedMessage, ()> {
        debug!("BastionContext({}): Waiting to receive message.", self.id);
        loop {
            // Surface a failed lock through the declared error type instead
            // of panicking inside the caller's future.
            let mut state = self.state.clone().lock_async().await.map_err(|_| ())?;

            if let Some(msg) = state.msgs.pop_front() {
                trace!("BastionContext({}): Received message: {:?}", self.id, msg);
                return Ok(msg);
            }

            // Release the lock before yielding so the queue can be filled
            // while this future is suspended.
            Guard::unlock(state);

            // NOTE(review): `pending!()` yields once without arranging a
            // wake-up, so this loop presumably relies on whoever drives this
            // future to poll it again — confirm against the executor.
            pending!();
        }
    }

    /// Builds a `RefAddr` from clones of the current child's path and sender.
    pub fn signature(&self) -> RefAddr {
        RefAddr::new(
            self.current().path().clone(),
            self.current().sender().clone(),
        )
    }

    /// Sends `msg` to the address `to` as a "tell" message, signed with this
    /// context's [`signature`](Self::signature).
    ///
    /// # Errors
    ///
    /// Returns the original message back if the envelope could not be
    /// handed to `to`'s sender.
    ///
    /// # Panics
    ///
    /// Panics if the message cannot be recovered from the rejected envelope
    /// (the result of `into_msg()` is unwrapped).
    pub fn tell<M: Message>(&self, to: &RefAddr, msg: M) -> Result<(), M> {
        debug!(
            "{:?}: Telling message: {:?} to: {:?}",
            self.current().path(),
            msg,
            to.path()
        );
        let msg = BastionMessage::tell(msg);
        let env = Envelope::new_with_sign(msg, self.signature());
        // NOTE(review): the envelope was just built from an `M`, so
        // `into_msg()` is presumably always able to give it back — confirm.
        to.sender()
            .unbounded_send(env)
            .map_err(|err| err.into_inner().into_msg().unwrap())
    }

    /// Sends `msg` to the address `to` as an "ask" message, signed with this
    /// context's [`signature`](Self::signature), and returns the `Answer`
    /// produced by `BastionMessage::ask`.
    ///
    /// # Errors
    ///
    /// Returns the original message back if the envelope could not be
    /// handed to `to`'s sender.
    ///
    /// # Panics
    ///
    /// Panics if the message cannot be recovered from the rejected envelope
    /// (the result of `into_msg()` is unwrapped).
    pub fn ask<M: Message>(&self, to: &RefAddr, msg: M) -> Result<Answer, M> {
        debug!(
            "{:?}: Asking message: {:?} to: {:?}",
            self.current().path(),
            msg,
            to
        );
        let (msg, answer) = BastionMessage::ask(msg);
        let env = Envelope::new_with_sign(msg, self.signature());
        // NOTE(review): the envelope was just built from an `M`, so
        // `into_msg()` is presumably always able to give it back — confirm.
        to.sender()
            .unbounded_send(env)
            .map_err(|err| err.into_inner().into_msg().unwrap())?;
        Ok(answer)
    }
}
impl ContextState {
    /// Creates a state whose message queue is empty.
    pub(crate) fn new() -> Self {
        Self {
            msgs: VecDeque::new(),
        }
    }

    /// Wraps `msg` and its sender address `sign` into a `SignedMessage` and
    /// appends it to the back of the queue.
    pub(crate) fn push_msg(&mut self, msg: Msg, sign: RefAddr) {
        let signed = SignedMessage::new(msg, sign);
        self.msgs.push_back(signed);
    }
}
impl Display for BastionId {
fn fmt(&self, fmt: &mut Formatter) -> fmt::Result {
self.0.fmt(fmt)
}
}