use crate::broadcast::{Broadcast, Parent, Sender};
use crate::callbacks::Callbacks;
use crate::children::{Children, ChildrenRef};
use crate::context::BastionId;
use crate::message::{BastionMessage, Deployment, Message};
use bastion_executor::pool;
use futures::prelude::*;
use futures::stream::FuturesOrdered;
use futures::{pending, poll};
use fxhash::FxHashMap;
use lightproc::prelude::*;
use log::Level;
use std::cmp::{Eq, PartialEq};
use std::ops::RangeFrom;
use std::task::Poll;
/// A supervisor that deploys, tracks and recovers a set of supervised
/// elements (children groups and nested supervisors).
///
/// Messages are received through `bcast`; faults reported by supervised
/// elements are handled according to `strategy`.
#[derive(Debug)]
pub struct Supervisor {
/// Broadcast providing this supervisor's id and sender; used to receive
/// messages and to send messages to itself and to its supervised elements.
bcast: Broadcast,
/// Ids of the supervised elements in deployment order. Positions in this
/// vector are the indices stored alongside the handles in `launched`.
order: Vec<BastionId>,
/// Currently launched elements: id -> (position in `order`, handle that
/// yields the element back once its task finishes).
launched: FxHashMap<BastionId, (usize, RecoverableHandle<Supervised>)>,
/// Elements that finished after being stopped, kept so that a later
/// restart can reset and relaunch them.
stopped: FxHashMap<BastionId, Supervised>,
/// Elements that finished after being killed, kept so that a later
/// restart can reset and relaunch them.
killed: FxHashMap<BastionId, Supervised>,
/// Strategy applied when a supervised element faults.
strategy: SupervisionStrategy,
/// Lifecycle callbacks of this supervisor (exposed through `callbacks()`).
callbacks: Callbacks,
/// Set to `true` only by `Supervisor::system`; it is not read anywhere in
/// the code visible in this part of the file.
is_system_supervisor: bool,
/// Messages received before `Start`, replayed in order once it arrives.
pre_start_msgs: Vec<BastionMessage>,
/// Whether `Start` has been received.
started: bool,
}
/// A cloneable handle to a [`Supervisor`], made of the supervisor's id and a
/// sender used to deliver [`BastionMessage`]s to it.
///
/// Two references compare equal when their ids are equal.
#[derive(Debug, Clone)]
pub struct SupervisorRef {
/// Id of the supervisor this reference points to.
id: BastionId,
/// Sender used by `send` to push messages to the supervisor.
sender: Sender,
}
/// How a [`Supervisor`] recovers when one of its supervised elements faults.
///
/// The default strategy is [`SupervisionStrategy::OneForOne`].
#[derive(Debug, Clone)]
pub enum SupervisionStrategy {
/// Only the faulted element is reset and relaunched.
OneForOne,
/// Every supervised element is restarted.
OneForAll,
/// The faulted element and every element deployed after it are restarted.
RestForOne,
}
/// An element that a [`Supervisor`] supervises: either a nested supervisor
/// or a children group.
#[derive(Debug)]
enum Supervised {
/// A nested supervisor.
Supervisor(Supervisor),
/// A children group.
Children(Children),
}
impl Supervisor {
/// Creates a supervisor around `bcast` with no supervised elements, the
/// default supervision strategy, fresh callbacks, no buffered messages and
/// the "not started" state. The supervisor is not a system supervisor.
pub(crate) fn new(bcast: Broadcast) -> Self {
    debug!("Supervisor({}): Initializing.", bcast.id());

    Supervisor {
        bcast,
        order: Vec::new(),
        launched: FxHashMap::default(),
        stopped: FxHashMap::default(),
        killed: FxHashMap::default(),
        strategy: SupervisionStrategy::default(),
        callbacks: Callbacks::new(),
        is_system_supervisor: false,
        pre_start_msgs: Vec::new(),
        started: false,
    }
}
pub(crate) fn system(bcast: Broadcast) -> Self {
let mut supervisor = Supervisor::new(bcast);
supervisor.is_system_supervisor = true;
supervisor
}
/// Builds the (default) `ProcStack` used when spawning this supervisor.
fn stack(&self) -> ProcStack {
    let supervisor_id = self.id();
    trace!("Supervisor({}): Creating ProcStack.", supervisor_id);

    ProcStack::default()
}
/// Resets this supervisor, optionally moving it onto a new `Broadcast`.
///
/// Every supervised element is killed first. The broadcast is then either
/// replaced by `bcast` or, when `bcast` is `None`, cleared of its children.
/// Buffered pre-start messages are dropped, every element is restarted, and
/// whatever is still held in `stopped` and `killed` afterwards is discarded.
pub(crate) async fn reset(&mut self, bcast: Option<Broadcast>) {
    if log_enabled!(Level::Debug) {
        // Without a replacement broadcast the supervisor keeps its own id.
        let target_id = match &bcast {
            Some(new_bcast) => new_bcast.id(),
            None => self.id(),
        };
        debug!(
            "Supervisor({}): Resetting to Supervisor({}).",
            self.id(),
            target_id
        );
    }

    self.kill(0..).await;

    match bcast {
        Some(new_bcast) => self.bcast = new_bcast,
        None => {
            self.bcast.clear_children();
        }
    }

    let pending_msgs = self.pre_start_msgs.len();
    debug!(
        "Supervisor({}): Removing {} pre-start messages.",
        self.id(),
        pending_msgs
    );
    self.pre_start_msgs.clear();
    self.pre_start_msgs.shrink_to_fit();

    self.restart(0..).await;

    let leftover_stopped = self.stopped.len();
    debug!(
        "Supervisor({}): Removing {} stopped elements.",
        self.id(),
        leftover_stopped
    );
    self.stopped.clear();
    self.stopped.shrink_to_fit();

    let leftover_killed = self.killed.len();
    debug!(
        "Supervisor({}): Removing {} killed elements.",
        self.id(),
        leftover_killed
    );
    self.killed.clear();
    self.killed.shrink_to_fit();
}
/// Returns the id of this supervisor, which is the id of its broadcast.
pub fn id(&self) -> &BastionId {
    self.bcast.id()
}
pub(crate) fn bcast(&self) -> &Broadcast {
&self.bcast
}
pub(crate) fn callbacks(&self) -> &Callbacks {
&self.callbacks
}
/// Creates a [`SupervisorRef`] holding a clone of this supervisor's id and
/// a clone of its broadcast's sender.
pub(crate) fn as_ref(&self) -> SupervisorRef {
    let own_id = self.id();
    trace!(
        "Supervisor({}): Creating new SupervisorRef({}).",
        own_id,
        own_id
    );

    SupervisorRef::new(self.bcast.id().clone(), self.bcast.sender().clone())
}
/// Creates a nested supervisor whose parent is this supervisor, lets `init`
/// configure it, and deploys it by sending a deploy message to this
/// supervisor's own broadcast.
///
/// Returns `self` so that calls can be chained.
pub fn supervisor<S>(self, init: S) -> Self
where
    S: FnOnce(Supervisor) -> Supervisor,
{
    debug!("Supervisor({}): Creating supervisor.", self.id());
    let child_bcast = Broadcast::new(Parent::supervisor(self.as_ref()));

    debug!(
        "Supervisor({}): Initializing Supervisor({}).",
        self.id(),
        child_bcast.id()
    );
    let child = init(Supervisor::new(child_bcast));
    debug!("Supervisor({}): Initialized.", child.id());

    debug!(
        "Supervisor({}): Deploying Supervisor({}).",
        self.id(),
        child.id()
    );
    self.bcast
        .send_self(BastionMessage::deploy_supervisor(child));

    self
}
/// Creates a nested supervisor whose parent is this supervisor, lets `init`
/// configure it, and deploys it by sending a deploy message to this
/// supervisor's own broadcast.
///
/// Returns a [`SupervisorRef`] to the newly deployed supervisor.
pub fn supervisor_ref<S>(&mut self, init: S) -> SupervisorRef
where
    S: FnOnce(Supervisor) -> Supervisor,
{
    debug!("Supervisor({}): Creating supervisor.", self.id());
    let child_bcast = Broadcast::new(Parent::supervisor(self.as_ref()));

    debug!(
        "Supervisor({}): Initializing Supervisor({}).",
        self.id(),
        child_bcast.id()
    );
    let child = init(Supervisor::new(child_bcast));
    debug!("Supervisor({}): Initialized.", child.id());

    // The reference must be taken before the supervisor is moved into the
    // deploy message.
    let child_ref = child.as_ref();

    debug!(
        "Supervisor({}): Deploying Supervisor({}).",
        self.id(),
        child.id()
    );
    self.bcast
        .send_self(BastionMessage::deploy_supervisor(child));

    child_ref
}
/// Creates a children group whose parent is this supervisor, lets `init`
/// configure it, launches its elements, and deploys it by sending a deploy
/// message to this supervisor's own broadcast.
///
/// Returns `self` so that calls can be chained.
pub fn children<C>(self, init: C) -> Self
where
    C: FnOnce(Children) -> Children,
{
    debug!("Supervisor({}): Creating children group.", self.id());
    let group_bcast = Broadcast::new(Parent::supervisor(self.as_ref()));

    debug!(
        "Supervisor({}): Initializing Children({}).",
        self.id(),
        group_bcast.id()
    );
    let mut group = init(Children::new(group_bcast));
    debug!("Children({}): Initialized.", group.id());
    group.launch_elems();

    debug!(
        "Supervisor({}): Deploying Children({}).",
        self.id(),
        group.id()
    );
    self.bcast.send_self(BastionMessage::deploy_children(group));

    self
}
/// Creates a children group whose parent is this supervisor, lets `init`
/// configure it, launches its elements, and deploys it by sending a deploy
/// message to this supervisor's own broadcast.
///
/// Returns a [`ChildrenRef`] to the newly deployed group.
pub fn children_ref<C>(&self, init: C) -> ChildrenRef
where
    C: FnOnce(Children) -> Children,
{
    debug!("Supervisor({}): Creating children group.", self.id());
    let group_bcast = Broadcast::new(Parent::supervisor(self.as_ref()));

    debug!(
        "Supervisor({}): Initializing Children({}).",
        self.id(),
        group_bcast.id()
    );
    let mut group = init(Children::new(group_bcast));
    debug!("Children({}): Initialized.", group.id());
    group.launch_elems();

    // The reference must be taken before the group is moved into the deploy
    // message.
    let group_ref = group.as_ref();

    debug!(
        "Supervisor({}): Deploying Children({}).",
        self.id(),
        group.id()
    );
    self.bcast.send_self(BastionMessage::deploy_children(group));

    group_ref
}
/// Sets the strategy used to recover from faults of supervised elements
/// and returns the supervisor for chaining.
pub fn with_strategy(mut self, strategy: SupervisionStrategy) -> Self {
    {
        let supervisor_id = self.id();
        trace!(
            "Supervisor({}): Setting strategy: {:?}",
            supervisor_id,
            strategy
        );
    }

    self.strategy = strategy;
    self
}
/// Replaces the lifecycle callbacks of this supervisor and returns the
/// supervisor for chaining.
pub fn with_callbacks(mut self, callbacks: Callbacks) -> Self {
    {
        let supervisor_id = self.id();
        trace!(
            "Supervisor({}): Setting callbacks: {:?}",
            supervisor_id,
            callbacks
        );
    }

    self.callbacks = callbacks;
    self
}
/// Kills the elements at positions `range` of `self.order`, then resets
/// each of them under a fresh `Broadcast` and relaunches it.
///
/// Elements taken from `stopped` get `before_start` called after their
/// reset. Elements taken from `killed` get `before_restart` called before
/// the reset and `after_restart` after it. Each relaunched element is
/// registered on this supervisor's broadcast, sent `Start` if the
/// supervisor has already started, and appended to `order` under the id it
/// has after the reset.
///
/// # Panics
///
/// Panics if `range` starts past the end of `self.order`, if an id in
/// `range` is found in neither `stopped` nor `killed` once the kill has
/// completed, or if awaiting a reset handle does not yield a value
/// (`unwrap`).
async fn restart(&mut self, range: RangeFrom<usize>) {
debug!("Supervisor({}): Restarting range: {:?}", self.id(), range);
// Make sure nothing in the range is still running before resetting it.
self.kill(range.clone()).await;
// A clone of the id, so the futures below can log it without borrowing
// `self`.
let supervisor_id = &self.id().clone();
let parent = Parent::supervisor(self.as_ref());
let mut reset = FuturesOrdered::new();
// The drained ids are replaced below by the ids of the reset elements.
for id in self.order.drain(range) {
let (killed, supervised) = if let Some(supervised) = self.stopped.remove(&id) {
(false, supervised)
} else if let Some(supervised) = self.killed.remove(&id) {
(true, supervised)
} else {
// The element is neither stopped nor killed: not handled yet (panics).
unimplemented!();
};
if killed {
supervised.callbacks().before_restart();
}
// Every element gets its own new broadcast with this supervisor as parent.
let bcast = Broadcast::new(parent.clone());
// `FuturesOrdered` yields results in the order the futures were pushed.
reset.push(async move {
debug!(
"Supervisor({}): Resetting Supervised({}) (killed={}) to Supervised({}).",
supervisor_id,
supervised.id(),
killed,
bcast.id()
);
let supervised = supervised.reset(bcast).await.unwrap();
if killed {
supervised.callbacks().after_restart();
} else {
supervised.callbacks().before_start();
}
supervised
})
}
trace!(
"Supervisor({}): Resetting {} elements.",
self.id(),
reset.len()
);
while let Some(supervised) = reset.next().await {
self.bcast.register(supervised.bcast());
// Elements deployed before `Start` receive it from `run` instead.
if self.started {
let msg = BastionMessage::start();
self.bcast.send_child(supervised.id(), msg);
}
debug!(
"Supervisor({}): Launching Supervised({}).",
self.id(),
supervised.id()
);
let id = supervised.id().clone();
let launched = supervised.launch();
// The stored index is the position the id takes in `order` just below.
self.launched
.insert(id.clone(), (self.order.len(), launched));
self.order.push(id);
}
}
/// Stops the supervised elements at positions `range` of `self.order` and
/// waits for each of them to finish.
///
/// When `range` starts at `0` a single "stop children" broadcast is sent;
/// otherwise each element in the range is told to stop individually. Every
/// element of the range that was still launched is then awaited, has its
/// `after_stop` callback called and is moved into `self.stopped`.
///
/// A `range` starting past the end of `self.order` selects no element.
///
/// # Panics
///
/// Panics if the task of an awaited element did not yield the element back
/// (this case is not implemented yet).
async fn stop(&mut self, range: RangeFrom<usize>) {
    debug!("Supervisor({}): Stopping range: {:?}", self.id(), range);

    let mut stopping = FuturesOrdered::new();
    {
        // Looked up once; an out-of-bounds range yields an empty selection
        // instead of panicking.
        let ids = self.order.get(range.clone()).unwrap_or(&[]);

        if range.start == 0 {
            self.bcast.stop_children();
        } else {
            for id in ids {
                trace!("Supervisor({}): Stopping Supervised({}).", self.id(), id);
                self.bcast.stop_child(id);
            }
        }

        // Only elements that are still launched have a handle to wait on.
        for id in ids {
            if let Some((_, launched)) = self.launched.remove(id) {
                stopping.push(launched);
            }
        }
    }

    while let Some(supervised) = stopping.next().await {
        match supervised {
            Some(supervised) => {
                trace!(
                    "Supervisor({}): Supervised({}) stopped.",
                    self.id(),
                    supervised.id()
                );
                supervised.callbacks().after_stop();

                let id = supervised.id().clone();
                self.stopped.insert(id, supervised);
            }
            // The element's task did not return it: not handled yet.
            None => unimplemented!(),
        }
    }
}
/// Kills the supervised elements at positions `range` of `self.order` and
/// waits for each of them to finish.
///
/// When `range` starts at `0` a single "kill children" broadcast is sent;
/// otherwise each element in the range is told to die individually. Every
/// element of the range that was still launched is then awaited and moved
/// into `self.killed`. Unlike `stop`, no callback is called here.
///
/// A `range` starting past the end of `self.order` selects no element.
///
/// # Panics
///
/// Panics if the task of an awaited element did not yield the element back
/// (this case is not implemented yet).
async fn kill(&mut self, range: RangeFrom<usize>) {
    debug!("Supervisor({}): Killing range: {:?}", self.id(), range);

    let mut dying = FuturesOrdered::new();
    {
        // Looked up once; an out-of-bounds range yields an empty selection
        // instead of panicking.
        let ids = self.order.get(range.clone()).unwrap_or(&[]);

        if range.start == 0 {
            self.bcast.kill_children();
        } else {
            for id in ids {
                trace!("Supervisor({}): Killing Supervised({}).", self.id(), id);
                self.bcast.kill_child(id);
            }
        }

        // Only elements that are still launched have a handle to wait on.
        for id in ids {
            if let Some((_, launched)) = self.launched.remove(id) {
                dying.push(launched);
            }
        }
    }

    while let Some(supervised) = dying.next().await {
        match supervised {
            Some(supervised) => {
                trace!(
                    "Supervisor({}): Supervised({}) stopped.",
                    self.id(),
                    supervised.id()
                );

                let id = supervised.id().clone();
                self.killed.insert(id, supervised);
            }
            // The element's task did not return it: not handled yet.
            None => unimplemented!(),
        }
    }
}
/// Logs that this supervisor stopped and signals it through its broadcast
/// (`Broadcast::stopped`).
fn stopped(&mut self) {
    {
        let supervisor_id = self.id();
        debug!("Supervisor({}): Stopped.", supervisor_id);
    }

    self.bcast.stopped();
}
/// Logs that this supervisor faulted and signals it through its broadcast
/// (`Broadcast::faulted`).
fn faulted(&mut self) {
    {
        let supervisor_id = self.id();
        debug!("Supervisor({}): Faulted.", supervisor_id);
    }

    self.bcast.faulted();
}
/// Recovers from the fault of the supervised element `id` according to
/// `self.strategy`.
///
/// * `OneForOne`: awaits the faulted element, resets it under a new
///   `Broadcast` and relaunches it at the same position in `order`.
/// * `OneForAll`: restarts every element.
/// * `RestForOne`: restarts the faulted element and every element after it
///   in `order`.
///
/// # Errors
///
/// Returns `Err(())` when `id` is not a launched element. This applies to
/// `OneForOne` and `RestForOne` only; `OneForAll` never looks `id` up.
///
/// # Panics
///
/// With `OneForOne`, panics if awaiting the element's handle or its reset
/// does not yield a value (`unwrap`). The other strategies can panic as
/// `restart` does.
async fn recover(&mut self, id: BastionId) -> Result<(), ()> {
debug!(
"Supervisor({}): Recovering using strategy: {:?}",
self.id(),
self.strategy
);
match self.strategy {
SupervisionStrategy::OneForOne => {
// `order` is the element's position in `self.order`, reused below.
let (order, launched) = self.launched.remove(&id).ok_or(())?;
let supervised = launched.await.unwrap();
supervised.callbacks().before_restart();
// The old id is dropped from the broadcast; the reset element gets a new one.
self.bcast.unregister(supervised.id());
let parent = Parent::supervisor(self.as_ref());
let bcast = Broadcast::new(parent);
// From here on `id` is the id of the new broadcast, not of the faulted element.
let id = bcast.id().clone();
debug!(
"Supervisor({}): Resetting Supervised({}) to Supervised({}).",
self.id(),
supervised.id(),
bcast.id()
);
let supervised = supervised.reset(bcast).await.unwrap();
supervised.callbacks().after_restart();
self.bcast.register(supervised.bcast());
if self.started {
let msg = BastionMessage::start();
self.bcast.send_child(&id, msg);
}
debug!(
"Supervisor({}): Launching Supervised({}).",
self.id(),
supervised.id()
);
let launched = supervised.launch();
// Keep the element at its original position under its new id.
self.launched.insert(id.clone(), (order, launched));
self.order[order] = id;
}
SupervisionStrategy::OneForAll => {
self.restart(0..).await;
self.stopped.shrink_to_fit();
self.killed.shrink_to_fit();
}
SupervisionStrategy::RestForOne => {
// Only the position is needed here; `restart` removes the handle itself.
let (start, _) = self.launched.get(&id).ok_or(())?;
let start = *start;
self.restart(start..).await;
}
}
Ok(())
}
/// Handles a single message received by this supervisor.
///
/// Returns `Err(())` when the supervisor must stop running: after handling
/// `Stop` or `Kill`, or when recovering from a fault failed. Returns
/// `Ok(())` for every other handled message.
///
/// # Panics
///
/// Panics on `Start` (which `run` consumes itself) and on `Prune` (not
/// implemented). Also panics if awaiting the handle of an element that
/// reported `Stopped` does not yield a value (`unwrap`).
async fn handle(&mut self, msg: BastionMessage) -> Result<(), ()> {
match msg {
// `run` matches `Start` before buffering or dispatching any message, so
// it is never passed to this function.
BastionMessage::Start => unreachable!(),
BastionMessage::Stop => {
// Stop every element, signal through `self.stopped()`, then end `run`.
self.stop(0..).await;
self.stopped();
return Err(());
}
BastionMessage::Kill => {
// Kill every element, signal through `self.stopped()`, then end `run`.
self.kill(0..).await;
self.stopped();
return Err(());
}
BastionMessage::Deploy(deployment) => {
// `before_start` is called on the deployed element before it is wrapped.
let supervised = match deployment {
Deployment::Supervisor(supervisor) => {
debug!(
"Supervisor({}): Deploying Supervisor({}).",
self.id(),
supervisor.id()
);
supervisor.callbacks().before_start();
Supervised::supervisor(supervisor)
}
Deployment::Children(children) => {
debug!(
"Supervisor({}): Deploying Children({}).",
self.id(),
children.id()
);
children.callbacks().before_start();
Supervised::children(children)
}
};
self.bcast.register(supervised.bcast());
// Elements deployed before `Start` receive it from `run` instead.
if self.started {
let msg = BastionMessage::start();
self.bcast.send_child(supervised.id(), msg);
}
debug!(
"Supervisor({}): Launching Supervised({}).",
self.id(),
supervised.id()
);
let id = supervised.id().clone();
let launched = supervised.launch();
// The stored index is the position the id takes in `order` just below.
self.launched
.insert(id.clone(), (self.order.len(), launched));
self.order.push(id);
}
BastionMessage::Prune { .. } => unimplemented!(),
BastionMessage::SuperviseWith(strategy) => {
debug!(
"Supervisor({}): Setting strategy: {:?}",
self.id(),
strategy
);
self.strategy = strategy;
}
BastionMessage::Message(ref message) => {
debug!(
"Supervisor({}): Broadcasting a message: {:?}",
self.id(),
message
);
// The whole message is forwarded to the supervised elements.
self.bcast.send_children(msg);
}
BastionMessage::Stopped { id } => {
// Ids that are not launched are ignored.
if let Some((_, launched)) = self.launched.remove(&id) {
debug!("Supervisor({}): Supervised({}) stopped.", self.id(), id);
let supervised = launched.await.unwrap();
supervised.callbacks().after_stop();
self.bcast.unregister(&id);
self.stopped.insert(id, supervised);
}
}
BastionMessage::Faulted { id } => {
if self.launched.contains_key(&id) {
warn!("Supervisor({}): Supervised({}) faulted.", self.id(), id);
}
// NOTE(review): recovery is attempted even when `id` is not launched; with
// `OneForOne`/`RestForOne` that returns `Err` and faults this supervisor.
// Confirm this is intended for stale ids.
if self.recover(id).await.is_err() {
self.kill(0..).await;
self.faulted();
return Err(());
}
}
}
Ok(())
}
/// Runs the supervisor's message loop and returns the supervisor once it
/// ends.
///
/// Until `Start` is received, every other message is buffered in
/// `pre_start_msgs`. On `Start`, the supervisor marks itself started,
/// forwards `Start` to its supervised elements and replays the buffered
/// messages in order. After that, messages are handled as they arrive.
/// The loop ends as soon as `handle` returns an error.
///
/// # Panics
///
/// Panics if the broadcast's message stream ends (`Poll::Ready(None)`).
async fn run(mut self) -> Self {
debug!("Supervisor({}): Launched.", self.id());
loop {
match poll!(&mut self.bcast.next()) {
Poll::Ready(Some(BastionMessage::Start)) => {
trace!(
"Supervisor({}): Received a new message (started=false): {:?}",
self.id(),
BastionMessage::Start
);
debug!("Supervisor({}): Starting.", self.id());
self.started = true;
let msg = BastionMessage::start();
self.bcast.send_children(msg);
// Take the buffered messages out so `handle` can borrow `self` mutably.
let msgs = self.pre_start_msgs.drain(..).collect::<Vec<_>>();
self.pre_start_msgs.shrink_to_fit();
debug!(
"Supervisor({}): Replaying messages received before starting.",
self.id()
);
for msg in msgs {
trace!("Supervisor({}): Replaying message: {:?}", self.id(), msg);
if self.handle(msg).await.is_err() {
return self;
}
}
}
// Not started yet: keep the message for the replay done on `Start`.
Poll::Ready(Some(msg)) if !self.started => {
trace!(
"Supervisor({}): Received a new message (started=false): {:?}",
self.id(),
msg
);
self.pre_start_msgs.push(msg);
}
Poll::Ready(Some(msg)) => {
trace!(
"Supervisor({}): Received a new message (started=true): {:?}",
self.id(),
msg
);
if self.handle(msg).await.is_err() {
return self;
}
}
Poll::Ready(None) => unreachable!(),
// Nothing to process: yield `Poll::Pending` once, then poll again when
// this task is next polled.
Poll::Pending => pending!(),
}
}
}
/// Spawns this supervisor's message loop (`run`) on the executor pool and
/// returns the handle to the spawned task.
pub(crate) fn launch(self) -> RecoverableHandle<Self> {
    debug!("Supervisor({}): Launching.", self.id());

    // The stack has to be built before `run` takes ownership of `self`.
    let proc_stack = self.stack();
    let event_loop = self.run();

    pool::spawn(event_loop, proc_stack)
}
}
impl SupervisorRef {
pub(crate) fn new(id: BastionId, sender: Sender) -> Self {
SupervisorRef { id, sender }
}
pub fn id(&self) -> &BastionId {
&self.id
}
/// Creates a supervisor whose parent is the referenced supervisor, lets
/// `init` configure it, and sends it to the referenced supervisor for
/// deployment.
///
/// # Errors
///
/// Returns `Err(())` if the deploy message could not be sent.
pub fn supervisor<S>(&self, init: S) -> Result<Self, ()>
where
    S: FnOnce(Supervisor) -> Supervisor,
{
    debug!("SupervisorRef({}): Creating supervisor.", self.id());
    let child_bcast = Broadcast::new(Parent::supervisor(self.clone()));

    debug!(
        "SupervisorRef({}): Initializing Supervisor({}).",
        self.id(),
        child_bcast.id()
    );
    let child = init(Supervisor::new(child_bcast));

    // The reference must be taken before the supervisor is moved into the
    // deploy message.
    let child_ref = child.as_ref();
    debug!("Supervisor({}): Initialized.", child.id());

    debug!(
        "SupervisorRef({}): Deploying Supervisor({}).",
        self.id(),
        child.id()
    );
    match self.send(BastionMessage::deploy_supervisor(child)) {
        Ok(()) => Ok(child_ref),
        Err(_) => Err(()),
    }
}
/// Creates a children group whose parent is the referenced supervisor,
/// lets `init` configure it, launches its elements, and sends it to the
/// referenced supervisor for deployment.
///
/// Returns a [`ChildrenRef`] to the new group.
///
/// # Errors
///
/// Returns `Err(())` if the deploy message could not be sent.
pub fn children<C>(&self, init: C) -> Result<ChildrenRef, ()>
where
    C: FnOnce(Children) -> Children,
{
    debug!("SupervisorRef({}): Creating children group.", self.id());
    let parent = Parent::supervisor(self.clone());
    let bcast = Broadcast::new(parent);

    debug!(
        "SupervisorRef({}): Initializing Children({}).",
        self.id(),
        bcast.id()
    );
    let children = Children::new(bcast);
    let mut children = init(children);
    debug!("Children({}): Initialized.", children.id());
    children.launch_elems();

    // The reference must be taken before the group is moved into the deploy
    // message.
    let children_ref = children.as_ref();

    debug!(
        "SupervisorRef({}): Deploying Children({}).",
        self.id(),
        children.id()
    );
    let msg = BastionMessage::deploy_children(children);
    self.send(msg).map_err(|_| ())?;

    Ok(children_ref)
}
/// Asks the referenced supervisor to use `strategy` from now on.
///
/// # Errors
///
/// Returns `Err(())` if the message could not be sent.
pub fn strategy(&self, strategy: SupervisionStrategy) -> Result<(), ()> {
    debug!(
        "SupervisorRef({}): Setting strategy: {:?}",
        self.id(),
        strategy
    );

    // The undelivered message is simply dropped on failure.
    self.send(BastionMessage::supervise_with(strategy))
        .map_err(drop)
}
/// Sends `msg` to the referenced supervisor as a broadcast message.
///
/// # Errors
///
/// Returns the original message back if it could not be sent.
pub fn broadcast<M: Message>(&self, msg: M) -> Result<(), M> {
    debug!(
        "SupervisorRef({}): Broadcasting message: {:?}",
        self.id(),
        msg
    );

    match self.send(BastionMessage::broadcast(msg)) {
        Ok(()) => Ok(()),
        // `send` hands the undelivered message back; unwrap it to the
        // caller's original message type.
        Err(undelivered) => Err(undelivered.into_msg().unwrap()),
    }
}
/// Sends a stop message to the referenced supervisor.
///
/// # Errors
///
/// Returns `Err(())` if the message could not be sent.
pub fn stop(&self) -> Result<(), ()> {
    debug!("SupervisorRef({}): Stopping.", self.id());

    // The undelivered message is simply dropped on failure.
    self.send(BastionMessage::stop()).map_err(drop)
}
/// Sends a kill message to the referenced supervisor.
///
/// # Errors
///
/// Returns `Err(())` if the message could not be sent.
pub fn kill(&self) -> Result<(), ()> {
    debug!("SupervisorRef({}): Killing.", self.id());

    // The undelivered message is simply dropped on failure.
    self.send(BastionMessage::kill()).map_err(drop)
}
/// Pushes `msg` onto the referenced supervisor's sender.
///
/// # Errors
///
/// Returns the message back if the sender rejected it.
pub(crate) fn send(&self, msg: BastionMessage) -> Result<(), BastionMessage> {
    trace!("SupervisorRef({}): Sending message: {:?}", self.id(), msg);

    match self.sender.unbounded_send(msg) {
        Ok(()) => Ok(()),
        Err(send_err) => Err(send_err.into_inner()),
    }
}
}
impl Supervised {
fn supervisor(supervisor: Supervisor) -> Self {
Supervised::Supervisor(supervisor)
}
fn children(children: Children) -> Self {
Supervised::Children(children)
}
/// Builds the (default) `ProcStack` used when spawning tasks for this
/// element.
fn stack(&self) -> ProcStack {
    let element_id = self.id();
    trace!("Supervised({}): Creating ProcStack.", element_id);

    ProcStack::default()
}
/// Spawns a task that resets the wrapped element onto `bcast` and yields
/// the element back, wrapped in the same variant.
fn reset(self, bcast: Broadcast) -> RecoverableHandle<Self> {
    debug!(
        "Supervised({}): Resetting to Supervised({}).",
        self.id(),
        bcast.id()
    );

    // Built before `self` is taken apart by the match below.
    let proc_stack = self.stack();

    match self {
        Supervised::Supervisor(mut supervisor) => {
            let task = async move {
                supervisor.reset(Some(bcast)).await;
                Supervised::Supervisor(supervisor)
            };
            pool::spawn(task, proc_stack)
        }
        Supervised::Children(mut children) => {
            let task = async move {
                children.reset(bcast).await;
                Supervised::Children(children)
            };
            pool::spawn(task, proc_stack)
        }
    }
}
/// Returns the id of the wrapped supervisor or children group.
fn id(&self) -> &BastionId {
    match *self {
        Supervised::Supervisor(ref supervisor) => supervisor.id(),
        Supervised::Children(ref children) => children.id(),
    }
}
/// Returns the broadcast of the wrapped supervisor or children group.
fn bcast(&self) -> &Broadcast {
    match *self {
        Supervised::Supervisor(ref supervisor) => supervisor.bcast(),
        Supervised::Children(ref children) => children.bcast(),
    }
}
/// Returns the callbacks of the wrapped supervisor or children group.
fn callbacks(&self) -> &Callbacks {
    match *self {
        Supervised::Supervisor(ref supervisor) => supervisor.callbacks(),
        Supervised::Children(ref children) => children.callbacks(),
    }
}
/// Launches the wrapped element inside a spawned task that waits for it to
/// finish and yields it back, wrapped in the same variant.
///
/// The spawned task panics if the element's own handle does not yield a
/// value (`unwrap`).
fn launch(self) -> RecoverableHandle<Self> {
    debug!("Supervised({}): Launching.", self.id());

    // Built before `self` is taken apart by the match below.
    let proc_stack = self.stack();

    match self {
        Supervised::Supervisor(supervisor) => {
            let task = async move {
                let finished = supervisor.launch().await.unwrap();
                Supervised::Supervisor(finished)
            };
            pool::spawn(task, proc_stack)
        }
        Supervised::Children(children) => {
            let task = async move {
                let finished = children.launch().await.unwrap();
                Supervised::Children(finished)
            };
            pool::spawn(task, proc_stack)
        }
    }
}
}
impl Default for SupervisionStrategy {
fn default() -> Self {
SupervisionStrategy::OneForOne
}
}
impl PartialEq for SupervisorRef {
    /// Two references are equal when they hold the same id; the senders are
    /// not compared.
    fn eq(&self, other: &Self) -> bool {
        let (lhs, rhs) = (&self.id, &other.id);
        lhs == rhs
    }
}

impl Eq for SupervisorRef {}