use crate::broadcast::{Broadcast, Parent, Sender};
use crate::callbacks::Callbacks;
use crate::context::{BastionContext, BastionId, ContextState};
use crate::message::{Answer, BastionMessage, Message};
use bastion_executor::pool;
use futures::pending;
use futures::poll;
use futures::prelude::*;
use futures::stream::{FuturesOrdered, FuturesUnordered};
use fxhash::FxHashMap;
use lightproc::prelude::*;
use qutex::Qutex;
use std::cmp::{Eq, PartialEq};
use std::fmt::{self, Debug, Formatter};
use std::future::Future;
use std::iter::FromIterator;
use std::pin::Pin;
use std::task::{Context, Poll};
struct Init(Box<dyn Fn(BastionContext) -> Exec + Send + Sync>);
struct Exec(Pin<Box<dyn Future<Output = Result<(), ()>> + Send>>);
#[derive(Debug)]
pub struct Children {
bcast: Broadcast,
launched: FxHashMap<BastionId, (Sender, RecoverableHandle<()>)>,
init: Init,
redundancy: usize,
callbacks: Callbacks,
pre_start_msgs: Vec<BastionMessage>,
started: bool,
}
#[derive(Debug, Clone)]
pub struct ChildrenRef {
id: BastionId,
sender: Sender,
children: Vec<ChildRef>,
}
#[derive(Debug)]
pub(crate) struct Child {
bcast: Broadcast,
exec: Exec,
state: Qutex<ContextState>,
pre_start_msgs: Vec<BastionMessage>,
started: bool,
}
#[derive(Debug, Clone)]
pub struct ChildRef {
id: BastionId,
sender: Sender,
}
impl Init {
fn new<C, F>(init: C) -> Self
where
C: Fn(BastionContext) -> F + Send + Sync + 'static,
F: Future<Output = Result<(), ()>> + Send + 'static,
{
let init = Box::new(move |ctx: BastionContext| {
let fut = init(ctx);
let exec = Box::pin(fut);
Exec(exec)
});
Init(init)
}
}
impl Children {
pub(crate) fn new(bcast: Broadcast) -> Self {
debug!("Children({}): Initializing.", bcast.id());
let launched = FxHashMap::default();
let init = Init::default();
let redundancy = 1;
let callbacks = Callbacks::new();
let pre_start_msgs = Vec::new();
let started = false;
Children {
bcast,
launched,
init,
redundancy,
callbacks,
pre_start_msgs,
started,
}
}
fn stack(&self) -> ProcStack {
trace!("Children({}): Creating ProcStack.", self.id());
ProcStack::default()
}
pub(crate) async fn reset(&mut self, bcast: Broadcast) {
debug!(
"Children({}): Resetting to Children({}).",
self.id(),
bcast.id()
);
self.kill().await;
self.bcast = bcast;
self.started = false;
trace!(
"Children({}): Removing {} pre-start messages.",
self.id(),
self.pre_start_msgs.len()
);
self.pre_start_msgs.clear();
self.pre_start_msgs.shrink_to_fit();
self.launch_elems();
}
pub fn id(&self) -> &BastionId {
self.bcast.id()
}
pub(crate) fn bcast(&self) -> &Broadcast {
&self.bcast
}
pub(crate) fn callbacks(&self) -> &Callbacks {
&self.callbacks
}
pub(crate) fn as_ref(&self) -> ChildrenRef {
trace!(
"Children({}): Creating new ChildrenRef({}).",
self.id(),
self.id()
);
let id = self.bcast.id().clone();
let sender = self.bcast.sender().clone();
let mut children = Vec::with_capacity(self.launched.len());
for (id, (sender, _)) in &self.launched {
trace!("Children({}): Creating new ChildRef({}).", self.id(), id);
let child = ChildRef::new(id.clone(), sender.clone());
children.push(child);
}
ChildrenRef::new(id, sender, children)
}
pub fn with_exec<I, F>(mut self, init: I) -> Self
where
I: Fn(BastionContext) -> F + Send + Sync + 'static,
F: Future<Output = Result<(), ()>> + Send + 'static,
{
trace!("Children({}): Setting exec closure.", self.id());
self.init = Init::new(init);
self
}
pub fn with_redundancy(mut self, redundancy: usize) -> Self {
trace!(
"Children({}): Setting redundancy: {}",
self.id(),
redundancy
);
if redundancy == std::usize::MIN {
self.redundancy = redundancy.saturating_add(1);
} else {
self.redundancy = redundancy;
}
self
}
pub fn with_callbacks(mut self, callbacks: Callbacks) -> Self {
trace!(
"Children({}): Setting callbacks: {:?}",
self.id(),
callbacks
);
self.callbacks = callbacks;
self
}
async fn stop(&mut self) {
debug!("Children({}): Stopping.", self.id());
self.bcast.stop_children();
let launched = self.launched.drain().map(|(_, (_, launched))| launched);
FuturesUnordered::from_iter(launched)
.for_each_concurrent(None, |_| {
async {
trace!("Children({}): Unknown child stopped.", self.id());
}
})
.await;
}
async fn kill(&mut self) {
debug!("Children({}): Killing.", self.id());
self.bcast.kill_children();
let mut children = FuturesOrdered::new();
for (_, (_, launched)) in self.launched.drain() {
launched.cancel();
children.push(launched);
}
children
.for_each_concurrent(None, |_| {
async {
trace!("Children({}): Unknown child stopped.", self.id());
}
})
.await;
}
fn stopped(&mut self) {
debug!("Children({}): Stopped.", self.id());
self.bcast.stopped();
}
fn faulted(&mut self) {
debug!("Children({}): Faulted.", self.id());
self.bcast.faulted();
}
async fn handle(&mut self, msg: BastionMessage) -> Result<(), ()> {
match msg {
BastionMessage::Start => unreachable!(),
BastionMessage::Stop => {
self.stop().await;
self.stopped();
return Err(());
}
BastionMessage::Kill => {
self.kill().await;
self.stopped();
return Err(());
}
BastionMessage::Deploy(_) => unimplemented!(),
BastionMessage::Prune { .. } => unimplemented!(),
BastionMessage::SuperviseWith(_) => unimplemented!(),
BastionMessage::Message(ref message) => {
debug!(
"Children({}): Broadcasting a message: {:?}",
self.id(),
message
);
self.bcast.send_children(msg);
}
BastionMessage::Stopped { id } => {
if self.launched.contains_key(&id) {
debug!("Children({}): Child({}) stopped.", self.id(), id);
self.stop().await;
self.stopped();
return Err(());
}
}
BastionMessage::Faulted { id } => {
if self.launched.contains_key(&id) {
warn!("Children({}): Child({}) faulted.", self.id(), id);
self.kill().await;
self.faulted();
return Err(());
}
}
}
Ok(())
}
async fn run(mut self) -> Self {
debug!("Children({}): Launched.", self.id());
loop {
for (_, launched) in self.launched.values_mut() {
let _ = poll!(launched);
}
match poll!(&mut self.bcast.next()) {
Poll::Ready(Some(BastionMessage::Start)) => {
trace!(
"Children({}): Received a new message (started=false): {:?}",
self.id(),
BastionMessage::Start
);
debug!("Children({}): Starting.", self.id());
self.started = true;
let msg = BastionMessage::start();
self.bcast.send_children(msg);
let msgs = self.pre_start_msgs.drain(..).collect::<Vec<_>>();
self.pre_start_msgs.shrink_to_fit();
debug!(
"Children({}): Replaying messages received before starting.",
self.id()
);
for msg in msgs {
trace!("Children({}): Replaying message: {:?}", self.id(), msg);
if self.handle(msg).await.is_err() {
return self;
}
}
}
Poll::Ready(Some(msg)) if !self.started => {
trace!(
"Children({}): Received a new message (started=false): {:?}",
self.id(),
msg
);
self.pre_start_msgs.push(msg);
}
Poll::Ready(Some(msg)) => {
trace!(
"Children({}): Received a new message (started=true): {:?}",
self.id(),
msg
);
if self.handle(msg).await.is_err() {
return self;
}
}
Poll::Ready(None) => unreachable!(),
Poll::Pending => pending!(),
}
}
}
pub(crate) fn launch_elems(&mut self) {
debug!("Children({}): Launching elements.", self.id());
for _ in 0..self.redundancy {
let parent = Parent::children(self.as_ref());
let bcast = Broadcast::new(parent);
let id = bcast.id().clone();
let sender = bcast.sender().clone();
let child_ref = ChildRef::new(id.clone(), sender.clone());
let children = self.as_ref();
let supervisor = self.bcast.parent().clone().into_supervisor();
let state = ContextState::new();
let state = Qutex::new(state);
let ctx = BastionContext::new(id, child_ref, children, supervisor, state.clone());
let exec = (self.init.0)(ctx);
self.bcast.register(&bcast);
debug!(
"Children({}): Initializing Child({}).",
self.id(),
bcast.id()
);
let child = Child::new(exec, bcast, state);
debug!("Children({}): Launching Child({}).", self.id(), child.id());
let id = child.id().clone();
let launched = child.launch();
self.launched.insert(id, (sender, launched));
}
}
pub(crate) fn launch(self) -> RecoverableHandle<Self> {
debug!("Children({}): Launching.", self.id());
let stack = self.stack();
pool::spawn(self.run(), stack)
}
}
impl ChildrenRef {
fn new(id: BastionId, sender: Sender, children: Vec<ChildRef>) -> Self {
ChildrenRef {
id,
sender,
children,
}
}
pub fn id(&self) -> &BastionId {
&self.id
}
pub fn elems(&self) -> &[ChildRef] {
&self.children
}
pub fn broadcast<M: Message>(&self, msg: M) -> Result<(), M> {
debug!(
"ChildrenRef({}): Broadcasting message: {:?}",
self.id(),
msg
);
let msg = BastionMessage::broadcast(msg);
self.send(msg).map_err(|err| err.into_msg().unwrap())
}
pub fn stop(&self) -> Result<(), ()> {
debug!("ChildrenRef({}): Stopping.", self.id());
let msg = BastionMessage::stop();
self.send(msg).map_err(|_| ())
}
pub fn kill(&self) -> Result<(), ()> {
debug!("ChildrenRef({}): Killing.", self.id());
let msg = BastionMessage::kill();
self.send(msg).map_err(|_| ())
}
pub(crate) fn send(&self, msg: BastionMessage) -> Result<(), BastionMessage> {
trace!("ChildrenRef({}): Sending message: {:?}", self.id(), msg);
self.sender
.unbounded_send(msg)
.map_err(|err| err.into_inner())
}
}
impl Child {
fn new(exec: Exec, bcast: Broadcast, state: Qutex<ContextState>) -> Self {
debug!("Child({}): Initializing.", bcast.id());
let pre_start_msgs = Vec::new();
let started = false;
Child {
bcast,
exec,
state,
pre_start_msgs,
started,
}
}
fn stack(&self) -> ProcStack {
trace!("Child({}): Creating ProcStack.", self.id());
let id = self.bcast.id().clone();
let parent = self.bcast.parent().clone().into_children().unwrap();
ProcStack::default().with_after_panic(move || {
let id = id.clone();
warn!("Child({}): Panicked.", id);
let msg = BastionMessage::faulted(id);
parent.send(msg).ok();
})
}
fn id(&self) -> &BastionId {
self.bcast.id()
}
fn stopped(&mut self) {
debug!("Child({}): Stopped.", self.id());
self.bcast.stopped();
}
fn faulted(&mut self) {
debug!("Child({}): Faulted.", self.id());
self.bcast.faulted();
}
async fn handle(&mut self, msg: BastionMessage) -> Result<(), ()> {
match msg {
BastionMessage::Start => unreachable!(),
BastionMessage::Stop => {
self.stopped();
return Err(());
}
BastionMessage::Kill => {
self.stopped();
return Err(());
}
BastionMessage::Deploy(_) => unimplemented!(),
BastionMessage::Prune { .. } => unimplemented!(),
BastionMessage::SuperviseWith(_) => unimplemented!(),
BastionMessage::Message(msg) => {
debug!("Child({}): Received a message: {:?}", self.id(), msg);
let mut state = self.state.clone().lock_async().await.map_err(|_| ())?;
state.push_msg(msg);
}
BastionMessage::Stopped { .. } => unimplemented!(),
BastionMessage::Faulted { .. } => unimplemented!(),
}
Ok(())
}
async fn run(mut self) {
debug!("Child({}): Launched.", self.id());
loop {
match poll!(&mut self.bcast.next()) {
Poll::Ready(Some(BastionMessage::Start)) => {
trace!(
"Child({}): Received a new message (started=false): {:?}",
self.id(),
BastionMessage::Start
);
debug!("Child({}): Starting.", self.id());
self.started = true;
let msgs = self.pre_start_msgs.drain(..).collect::<Vec<_>>();
self.pre_start_msgs.shrink_to_fit();
debug!(
"Child({}): Replaying messages received before starting.",
self.id()
);
for msg in msgs {
trace!("Child({}): Replaying message: {:?}", self.id(), msg);
if self.handle(msg).await.is_err() {
return;
}
}
continue;
}
Poll::Ready(Some(msg)) if !self.started => {
trace!(
"Child({}): Received a new message (started=false): {:?}",
self.id(),
msg
);
self.pre_start_msgs.push(msg);
continue;
}
Poll::Ready(Some(msg)) => {
trace!(
"Child({}): Received a new message (started=true): {:?}",
self.id(),
msg
);
if self.handle(msg).await.is_err() {
return;
}
continue;
}
Poll::Ready(None) => unreachable!(),
Poll::Pending => (),
}
if !self.started {
pending!();
continue;
}
match poll!(&mut self.exec) {
Poll::Ready(Ok(())) => {
debug!(
"Child({}): The future finished executing successfully.",
self.id()
);
return self.stopped();
}
Poll::Ready(Err(())) => {
warn!("Child({}): The future returned an error.", self.id());
return self.faulted();
}
Poll::Pending => (),
}
pending!();
}
}
fn launch(self) -> RecoverableHandle<()> {
let stack = self.stack();
pool::spawn(self.run(), stack)
}
}
impl ChildRef {
fn new(id: BastionId, sender: Sender) -> ChildRef {
ChildRef { id, sender }
}
pub fn id(&self) -> &BastionId {
&self.id
}
pub fn tell<M: Message>(&self, msg: M) -> Result<(), M> {
debug!("ChildRef({}): Telling message: {:?}", self.id(), msg);
let msg = BastionMessage::tell(msg);
self.send(msg).map_err(|msg| msg.into_msg().unwrap())
}
pub fn ask<M: Message>(&self, msg: M) -> Result<Answer, M> {
debug!("ChildRef({}): Asking message: {:?}", self.id(), msg);
let (msg, answer) = BastionMessage::ask(msg);
self.send(msg).map_err(|msg| msg.into_msg().unwrap())?;
Ok(answer)
}
pub fn stop(&self) -> Result<(), ()> {
debug!("ChildRef({}): Stopping.", self.id);
let msg = BastionMessage::stop();
self.send(msg).map_err(|_| ())
}
pub fn kill(&self) -> Result<(), ()> {
debug!("ChildRef({}): Killing.", self.id());
let msg = BastionMessage::kill();
self.send(msg).map_err(|_| ())
}
pub(crate) fn send(&self, msg: BastionMessage) -> Result<(), BastionMessage> {
trace!("ChildRef({}): Sending message: {:?}", self.id(), msg);
self.sender
.unbounded_send(msg)
.map_err(|err| err.into_inner())
}
}
impl Future for Exec {
type Output = Result<(), ()>;
fn poll(self: Pin<&mut Self>, ctx: &mut Context) -> Poll<Self::Output> {
Pin::new(&mut self.get_mut().0).poll(ctx)
}
}
impl Default for Init {
fn default() -> Self {
Init::new(|_| async { Ok(()) })
}
}
impl PartialEq for ChildrenRef {
fn eq(&self, other: &Self) -> bool {
self.id == other.id
}
}
impl PartialEq for ChildRef {
fn eq(&self, other: &Self) -> bool {
self.id == other.id
}
}
impl Eq for ChildrenRef {}
impl Eq for ChildRef {}
impl Debug for Init {
fn fmt(&self, fmt: &mut Formatter) -> fmt::Result {
fmt.debug_struct("Init").finish()
}
}
impl Debug for Exec {
fn fmt(&self, fmt: &mut Formatter) -> fmt::Result {
fmt.debug_struct("Exec").finish()
}
}