use crate::broadcast::{Broadcast, Parent, Sender};
use crate::callbacks::Callbacks;
use crate::children::Children;
use crate::children_ref::ChildrenRef;
use crate::context::{BastionId, ContextState};
use crate::envelope::Envelope;
use crate::message::{BastionMessage, Deployment, Message};
use crate::path::{BastionPath, BastionPathElement};
use async_mutex::Mutex;
use bastion_executor::pool;
use futures::prelude::*;
use futures::stream::FuturesOrdered;
use futures::{pending, poll};
use futures_timer::Delay;
use fxhash::FxHashMap;
use lightproc::prelude::*;
use std::cmp::{Eq, PartialEq};
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;
use std::time::Duration;
use tracing::{debug, trace, warn};
#[derive(Debug)]
pub struct Supervisor {
bcast: Broadcast,
order: Vec<BastionId>,
tracked_groups: FxHashMap<BastionId, Vec<TrackedChildState>>,
tracked_groups_order: FxHashMap<BastionId, usize>,
launched: FxHashMap<BastionId, (usize, RecoverableHandle<Supervised>)>,
stopped: FxHashMap<BastionId, Supervised>,
killed: FxHashMap<BastionId, Supervised>,
strategy: SupervisionStrategy,
restart_strategy: RestartStrategy,
callbacks: Callbacks,
is_system_supervisor: bool,
pre_start_msgs: Vec<Envelope>,
started: bool,
subtree_restarts: usize,
subtree_restarts_limit: usize,
}
#[derive(Debug, Clone)]
struct TrackedChildState {
id: BastionId,
state: Arc<Mutex<Pin<Box<ContextState>>>>,
restarts_counts: usize,
}
#[derive(Debug)]
enum RestartedElement {
Supervisor(BastionId),
Child { id: BastionId, parent_id: BastionId },
}
#[derive(Debug)]
enum ActorSearchMethod {
OneActor { id: BastionId, parent_id: BastionId },
FromActor { id: BastionId, parent_id: BastionId },
All,
}
#[derive(Debug, Clone)]
pub struct SupervisorRef {
id: BastionId,
sender: Sender,
path: Arc<BastionPath>,
}
#[derive(Debug, Clone)]
pub enum SupervisionStrategy {
OneForOne,
OneForAll,
RestForOne,
}
#[derive(Debug)]
enum Supervised {
Supervisor(Supervisor),
Children(Children),
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum RestartPolicy {
Always,
Never,
Tries(usize),
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct RestartStrategy {
restart_policy: RestartPolicy,
strategy: ActorRestartStrategy,
}
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum ActorRestartStrategy {
Immediate,
LinearBackOff {
timeout: Duration,
},
ExponentialBackOff {
timeout: Duration,
multiplier: u64,
},
}
impl Supervisor {
pub(crate) fn new(bcast: Broadcast) -> Self {
debug!("Supervisor({}): Initializing.", bcast.id());
let order = Vec::new();
let tracked_groups = FxHashMap::default();
let tracked_groups_order = FxHashMap::default();
let launched = FxHashMap::default();
let stopped = FxHashMap::default();
let killed = FxHashMap::default();
let strategy = SupervisionStrategy::default();
let restart_strategy = RestartStrategy::default();
let callbacks = Callbacks::new();
let is_system_supervisor = false;
let pre_start_msgs = Vec::new();
let started = false;
let subtree_restarts = 0;
let subtree_restarts_limit = 3;
Supervisor {
bcast,
order,
tracked_groups,
tracked_groups_order,
launched,
stopped,
killed,
strategy,
restart_strategy,
callbacks,
is_system_supervisor,
pre_start_msgs,
started,
subtree_restarts,
subtree_restarts_limit,
}
}
pub(crate) fn system(bcast: Broadcast) -> Self {
let mut supervisor = Supervisor::new(bcast);
supervisor.is_system_supervisor = true;
supervisor
}
fn stack(&self) -> ProcStack {
trace!("Supervisor({}): Creating ProcStack.", self.id());
ProcStack::default()
}
pub(crate) async fn reset(&mut self, bcast: Option<Broadcast>) {
if let Some(bcast) = &bcast {
debug!(
"Supervisor({}): Resetting to Supervisor({}).",
self.id(),
bcast.id()
);
} else {
debug!(
"Supervisor({}): Resetting to Supervisor({}).",
self.id(),
self.id()
);
}
self.kill(0..self.order.len()).await;
if let Some(bcast) = bcast {
self.bcast = bcast;
} else {
self.bcast.clear_children();
}
debug!(
"Supervisor({}): Removing {} pre-start messages.",
self.id(),
self.pre_start_msgs.len()
);
self.pre_start_msgs.clear();
self.pre_start_msgs.shrink_to_fit();
let restarted_objects = self.search_restarted_objects(ActorSearchMethod::All);
self.restart(restarted_objects).await;
debug!(
"Supervisor({}): Removing {} stopped elements.",
self.id(),
self.stopped.len()
);
self.stopped.clear();
self.stopped.shrink_to_fit();
debug!(
"Supervisor({}): Removing {} killed elements.",
self.id(),
self.killed.len()
);
self.killed.clear();
self.killed.shrink_to_fit();
}
pub fn id(&self) -> &BastionId {
&self.bcast.id()
}
pub(crate) fn bcast(&self) -> &Broadcast {
&self.bcast
}
pub(crate) fn callbacks(&self) -> &Callbacks {
&self.callbacks
}
pub(crate) fn as_ref(&self) -> SupervisorRef {
trace!(
"Supervisor({}): Creating new SupervisorRef({}).",
self.id(),
self.id()
);
let id = self.bcast.id().clone();
let sender = self.bcast.sender().clone();
let path = self.bcast.path().clone();
SupervisorRef::new(id, sender, path)
}
pub fn supervisor<S>(self, init: S) -> Self
where
S: FnOnce(Supervisor) -> Supervisor,
{
debug!("Supervisor({}): Creating supervisor.", self.id());
let parent = Parent::supervisor(self.as_ref());
let bcast = Broadcast::new(parent, BastionPathElement::Supervisor(BastionId::new()));
debug!(
"Supervisor({}): Initializing Supervisor({}).",
self.id(),
bcast.id()
);
let supervisor = Supervisor::new(bcast);
let supervisor = init(supervisor);
debug!("Supervisor({}): Initialized.", supervisor.id());
debug!(
"Supervisor({}): Deploying Supervisor({}).",
self.id(),
supervisor.id()
);
let msg = BastionMessage::deploy_supervisor(supervisor);
let env = Envelope::new(msg, self.bcast.path().clone(), self.bcast.sender().clone());
self.bcast.send_self(env);
self
}
pub fn supervisor_ref<S>(&mut self, init: S) -> SupervisorRef
where
S: FnOnce(Supervisor) -> Supervisor,
{
debug!("Supervisor({}): Creating supervisor.", self.id());
let parent = Parent::supervisor(self.as_ref());
let bcast = Broadcast::new(parent, BastionPathElement::Supervisor(BastionId::new()));
debug!(
"Supervisor({}): Initializing Supervisor({}).",
self.id(),
bcast.id()
);
let supervisor = Supervisor::new(bcast);
let supervisor = init(supervisor);
debug!("Supervisor({}): Initialized.", supervisor.id());
let supervisor_ref = supervisor.as_ref();
debug!(
"Supervisor({}): Deploying Supervisor({}).",
self.id(),
supervisor.id()
);
let msg = BastionMessage::deploy_supervisor(supervisor);
let env = Envelope::new(msg, self.bcast.path().clone(), self.bcast.sender().clone());
self.bcast.send_self(env);
supervisor_ref
}
pub fn children<C>(self, init: C) -> Self
where
C: FnOnce(Children) -> Children,
{
debug!("Supervisor({}): Creating children group.", self.id());
let parent = Parent::supervisor(self.as_ref());
let bcast = Broadcast::new(parent, BastionPathElement::Children(BastionId::new()));
debug!(
"Supervisor({}): Initializing Children({}).",
self.id(),
bcast.id()
);
let children = Children::new(bcast);
let mut children = init(children);
debug!("Children({}): Initialized.", children.id());
if let Err(e) = children.register_dispatchers() {
warn!("couldn't register all dispatchers into the registry: {}", e);
};
children.launch_elems();
debug!(
"Supervisor({}): Deploying Children({}).",
self.id(),
children.id()
);
let msg = BastionMessage::deploy_children(children);
let env = Envelope::new(msg, self.bcast.path().clone(), self.bcast.sender().clone());
self.bcast.send_self(env);
self
}
pub fn children_ref<C>(&self, init: C) -> ChildrenRef
where
C: FnOnce(Children) -> Children,
{
debug!("Supervisor({}): Creating children group.", self.id());
let parent = Parent::supervisor(self.as_ref());
let bcast = Broadcast::new(parent, BastionPathElement::Children(BastionId::new()));
debug!(
"Supervisor({}): Initializing Children({}).",
self.id(),
bcast.id()
);
let children = Children::new(bcast);
let mut children = init(children);
debug!("Children({}): Initialized.", children.id());
children.launch_elems();
let children_ref = children.as_ref();
debug!(
"Supervisor({}): Deploying Children({}).",
self.id(),
children.id()
);
let msg = BastionMessage::deploy_children(children);
let env = Envelope::new(msg, self.bcast.path().clone(), self.bcast.sender().clone());
self.bcast.send_self(env);
children_ref
}
pub fn with_strategy(mut self, strategy: SupervisionStrategy) -> Self {
trace!(
"Supervisor({}): Setting strategy: {:?}",
self.id(),
strategy
);
self.strategy = strategy;
self
}
pub fn with_restart_strategy(mut self, restart_strategy: RestartStrategy) -> Self {
trace!(
"Supervisor({}): Setting actor restart strategy: {:?}",
self.id(),
restart_strategy
);
self.restart_strategy = restart_strategy;
self
}
pub fn with_callbacks(mut self, callbacks: Callbacks) -> Self {
trace!(
"Supervisor({}): Setting callbacks: {:?}",
self.id(),
callbacks
);
self.callbacks = callbacks;
self
}
async fn restart(&mut self, objects: Vec<RestartedElement>) {
debug!(
"Supervisor({}): Restarting {:?} elements",
self.id(),
objects.len()
);
let mut restart_futures = FuturesOrdered::new();
for object in objects {
match object {
RestartedElement::Supervisor(supervisor_id) => {
let msg = BastionMessage::restart_subtree();
let env =
Envelope::new(msg, self.bcast.path().clone(), self.bcast.sender().clone());
self.bcast.send_child(&supervisor_id, env);
}
RestartedElement::Child { id, parent_id } => {
let index = match self.tracked_groups_order.get(&id) {
Some(index) => *index,
None => continue,
};
let childs = match self.tracked_groups.get_mut(&parent_id) {
Some(childs) => childs,
None => continue,
};
let tracked_state = match childs.get_mut(index) {
Some(tracked_state) => tracked_state,
None => continue,
};
let restarts_count = tracked_state.restarts_count();
let restart_required = match self.restart_strategy.restart_policy() {
RestartPolicy::Always => true,
RestartPolicy::Never => false,
RestartPolicy::Tries(max_retries) => restarts_count < max_retries,
};
let msg = match restart_required {
true => {
tracked_state.increase_restarts_counter();
let state = tracked_state.state();
BastionMessage::restore_child(id, state)
}
false => {
self.remove_child(&id.clone(), &parent_id.clone());
BastionMessage::drop_child(id)
}
};
let restart_strategy = self.restart_strategy.clone();
restart_futures.push(async move {
if restart_required {
restart_strategy.apply_strategy(restarts_count).await;
}
(parent_id, msg)
});
}
}
}
while let Some((receiver, msg)) = restart_futures.next().await {
let env = Envelope::new(msg, self.bcast.path().clone(), self.bcast.sender().clone());
self.bcast.send_child(&receiver, env);
}
}
fn remove_child(&mut self, id: &BastionId, parent_id: &BastionId) {
let index = match self.tracked_groups_order.get(id) {
Some(index) => *index,
None => return,
};
let childs = match self.tracked_groups.get_mut(parent_id) {
Some(childs) => childs,
None => return,
};
childs.remove(index);
for (new_index, state) in childs.iter().enumerate() {
let child_id = state.id.clone();
self.tracked_groups_order.insert(child_id, new_index);
}
}
async fn stop(&mut self, range: Range<usize>) {
debug!("Supervisor({}): Stopping range: {:?}", self.id(), range);
if range.start == 0 {
self.bcast.stop_children();
} else {
for id in self.order.get(range.clone()).unwrap() {
trace!("Supervised({}): Stopping Supervised({}).", self.id(), id);
self.bcast.stop_child(id);
}
}
let mut supervised = FuturesOrdered::new();
for id in self.order.get(range.clone()).unwrap() {
if let Some((_, launched)) = self.launched.remove(&id) {
supervised.push(launched);
}
}
while let Some(supervised) = supervised.next().await {
match supervised {
Some(supervised) => {
trace!(
"Supervisor({}): Supervised({}) stopped.",
self.id(),
supervised.id()
);
supervised.callbacks().after_stop();
let id = supervised.id().clone();
self.stopped.insert(id, supervised);
}
None => unimplemented!(),
}
}
}
async fn kill(&mut self, range: Range<usize>) {
debug!("Supervisor({}): Killing range: {:?}", self.id(), range);
if range.start == 0 {
self.bcast.kill_children();
} else {
for id in self.order.get(range.clone()).unwrap() {
trace!("Supervised({}): Killing Supervised({}).", self.id(), id);
self.bcast.kill_child(id);
}
}
let mut supervised = FuturesOrdered::new();
for id in self.order.get(range.clone()).unwrap() {
if let Some((_, launched)) = self.launched.remove(&id) {
supervised.push(launched);
}
}
while let Some(supervised) = supervised.next().await {
match supervised {
Some(supervised) => {
trace!(
"Supervisor({}): Supervised({}) stopped.",
self.id(),
supervised.id()
);
let id = supervised.id().clone();
self.killed.insert(id, supervised);
}
None => unimplemented!(),
}
}
}
fn stopped(&mut self) {
debug!("Supervisor({}): Stopped.", self.id());
self.bcast.stopped();
}
fn faulted(&mut self) {
debug!("Supervisor({}): Faulted.", self.id());
self.bcast.faulted();
}
async fn recover(&mut self, id: BastionId, parent_id: BastionId) -> Result<(), ()> {
debug!(
"Supervisor({}): Recovering using strategy: {:?}",
self.id(),
self.strategy
);
match self.strategy {
SupervisionStrategy::OneForOne => {
let search_method = ActorSearchMethod::OneActor { id, parent_id };
let objects = self.search_restarted_objects(search_method);
self.restart(objects).await;
}
SupervisionStrategy::OneForAll => {
let search_method = ActorSearchMethod::All;
let objects = self.search_restarted_objects(search_method);
self.restart(objects).await;
self.stopped.shrink_to_fit();
self.killed.shrink_to_fit();
}
SupervisionStrategy::RestForOne => {
let search_method = ActorSearchMethod::FromActor { id, parent_id };
let objects = self.search_restarted_objects(search_method);
self.restart(objects).await;
}
}
Ok(())
}
fn search_restarted_objects(&self, search_method: ActorSearchMethod) -> Vec<RestartedElement> {
let mut objects = Vec::new();
match search_method {
ActorSearchMethod::OneActor { id, parent_id } => {
let element = match self.tracked_groups.contains_key(&parent_id) {
true => RestartedElement::Child { id, parent_id },
false => RestartedElement::Supervisor(id),
};
objects.push(element)
}
ActorSearchMethod::FromActor { id, parent_id } => {
let childs = self.tracked_groups.get(&parent_id).unwrap();
let start_index = *self.tracked_groups_order.get(&id).unwrap();
childs.iter().skip(start_index).for_each(|tracked_state| {
let id = tracked_state.id();
let element = RestartedElement::Child {
id,
parent_id: parent_id.clone(),
};
objects.push(element)
});
let (rest_index, _) = self.launched.get(&parent_id).unwrap();
for index in *rest_index + 1..self.order.len() {
let element_id = &self.order[index];
match self.tracked_groups.get(element_id) {
Some(childs) => {
for tracked_state in childs {
let restarted_element = RestartedElement::Child {
id: tracked_state.id(),
parent_id: element_id.clone(),
};
objects.push(restarted_element);
}
}
None => {
let restarted_element = RestartedElement::Supervisor(id.clone());
objects.push(restarted_element);
}
}
}
}
ActorSearchMethod::All => {
for id in self.order.iter() {
match self.tracked_groups.get(&id) {
Some(childs) => {
for tracked_state in childs {
let restarted_element = RestartedElement::Child {
id: tracked_state.id(),
parent_id: id.clone(),
};
objects.push(restarted_element);
}
}
None => {
let restarted_element = RestartedElement::Supervisor(id.clone());
objects.push(restarted_element);
}
}
}
}
}
objects
}
async fn restart_subtree(&mut self) {
if self.subtree_restarts < self.subtree_restarts_limit {
self.subtree_restarts += 1;
let restarted_objects = self.search_restarted_objects(ActorSearchMethod::All);
self.restart(restarted_objects).await;
}
}
async fn deinit_with_stop(&mut self) {
self.stop(0..self.order.len()).await;
self.stopped();
}
async fn deinit_with_kill(&mut self) {
self.kill(0..self.order.len()).await;
self.stopped();
}
async fn deploy_supervised_object(&mut self, deployment: Box<Deployment>) {
let supervised = match *deployment {
Deployment::Supervisor(supervisor) => {
debug!(
"Supervisor({}): Deploying Supervisor({}).",
self.id(),
supervisor.id()
);
supervisor.callbacks().before_start();
Supervised::supervisor(supervisor)
}
Deployment::Children(children) => {
debug!(
"Supervisor({}): Deploying Children({}).",
self.id(),
children.id()
);
children.callbacks().before_start();
Supervised::children(children)
}
};
self.bcast.register(supervised.bcast());
if self.started {
let msg = BastionMessage::start();
let env = Envelope::new(msg, self.bcast.path().clone(), self.bcast.sender().clone());
self.bcast.send_child(supervised.id(), env);
}
debug!(
"Supervisor({}): Launching Supervised({}).",
self.id(),
supervised.id()
);
let id = supervised.id().clone();
let launched = supervised.launch();
self.launched
.insert(id.clone(), (self.order.len(), launched));
self.order.push(id);
}
async fn cleanup_supervised_object(&mut self, id: BastionId) {
if let Some((_, launched)) = self.launched.remove(&id) {
debug!("Supervisor({}): Supervised({}) stopped.", self.id(), id);
let supervised = launched.await.unwrap();
supervised.callbacks().after_stop();
self.bcast.unregister(&id);
self.stopped.insert(id.clone(), supervised);
}
}
async fn recover_supervised_object(
&mut self,
id: BastionId,
parent_id: BastionId,
) -> Result<(), ()> {
if self.launched.contains_key(&id) {
warn!("Supervisor({}): Supervised({}) faulted.", self.id(), id);
}
if self.recover(id, parent_id).await.is_err() {
self.kill(0..self.order.len()).await;
self.faulted();
return Err(());
}
Ok(())
}
async fn handle(&mut self, env: Envelope) -> Result<(), ()> {
match env {
Envelope {
msg: BastionMessage::Start,
..
} => unreachable!(),
Envelope {
msg: BastionMessage::Stop,
..
} => {
self.deinit_with_stop().await;
return Err(());
}
Envelope {
msg: BastionMessage::Kill,
..
} => {
self.deinit_with_kill().await;
return Err(());
}
Envelope {
msg: BastionMessage::Deploy(deployment),
..
} => self.deploy_supervised_object(deployment).await,
Envelope {
msg: BastionMessage::Prune { .. },
..
} => unimplemented!(),
Envelope {
msg: BastionMessage::SuperviseWith(strategy),
..
} => {
debug!(
"Supervisor({}): Setting strategy: {:?}",
self.id(),
strategy
);
self.strategy = strategy;
}
Envelope {
msg: BastionMessage::ApplyCallback { .. },
..
} => unreachable!(),
Envelope {
msg:
BastionMessage::InstantiatedChild {
parent_id,
child_id,
state,
},
..
} => {
let child_state = TrackedChildState::new(child_id.clone(), state);
match self.tracked_groups.get_mut(&parent_id) {
Some(childs) => {
childs.push(child_state);
self.tracked_groups_order.insert(child_id, childs.len() - 1);
}
None => {
self.tracked_groups.insert(parent_id, vec![child_state]);
self.tracked_groups_order.insert(child_id, 0);
}
}
}
Envelope {
msg: BastionMessage::Message(ref message),
..
} => {
debug!(
"Supervisor({}): Broadcasting a message: {:?}",
self.id(),
message
);
self.bcast.send_children(env);
}
Envelope {
msg: BastionMessage::RestartRequired { id, parent_id },
..
} => {
if self.recover_supervised_object(id, parent_id).await.is_err() {
return Err(());
}
}
Envelope {
msg: BastionMessage::FinishedChild { id, parent_id },
..
} => self.remove_child(&id, &parent_id),
Envelope {
msg: BastionMessage::RestartSubtree,
..
} => self.restart_subtree().await,
Envelope {
msg: BastionMessage::RestoreChild { .. },
..
} => unreachable!(),
Envelope {
msg: BastionMessage::DropChild { .. },
..
} => unreachable!(),
Envelope {
msg: BastionMessage::SetState { .. },
..
} => unreachable!(),
Envelope {
msg: BastionMessage::Stopped { id },
..
} => self.cleanup_supervised_object(id).await,
Envelope {
msg: BastionMessage::Faulted { id },
..
} => self.cleanup_supervised_object(id).await,
Envelope {
msg: BastionMessage::Heartbeat,
..
} => unreachable!(),
}
Ok(())
}
async fn initialize(&mut self) -> Result<(), ()> {
trace!(
"Supervisor({}): Received a new message (started=false): {:?}",
self.id(),
BastionMessage::Start
);
debug!("Supervisor({}): Starting.", self.id());
self.started = true;
let msg = BastionMessage::start();
let env = Envelope::new(msg, self.bcast.path().clone(), self.bcast.sender().clone());
self.bcast.send_children(env);
let msgs = self.pre_start_msgs.drain(..).collect::<Vec<_>>();
self.pre_start_msgs.shrink_to_fit();
debug!(
"Supervisor({}): Replaying messages received before starting.",
self.id()
);
for msg in msgs {
trace!("Supervisor({}): Replaying message: {:?}", self.id(), msg);
if self.handle(msg).await.is_err() {
return Err(());
}
}
Ok(())
}
async fn run(mut self) -> Self {
debug!("Supervisor({}): Launched.", self.id());
loop {
match poll!(&mut self.bcast.next()) {
Poll::Ready(Some(Envelope {
msg: BastionMessage::Start,
..
})) => {
if self.initialize().await.is_err() {
return self;
}
}
Poll::Ready(Some(msg)) if !self.started => {
trace!(
"Supervisor({}): Received a new message (started=false): {:?}",
self.id(),
msg
);
self.pre_start_msgs.push(msg);
}
Poll::Ready(Some(msg)) => {
trace!(
"Supervisor({}): Received a new message (started=true): {:?}",
self.id(),
msg
);
if self.handle(msg).await.is_err() {
return self;
}
}
Poll::Ready(None) => unreachable!(),
Poll::Pending => pending!(),
}
}
}
pub(crate) fn launch(self) -> RecoverableHandle<Self> {
debug!("Supervisor({}): Launching.", self.id());
let stack = self.stack();
pool::spawn(self.run(), stack)
}
}
impl SupervisorRef {
pub(crate) fn new(id: BastionId, sender: Sender, path: Arc<BastionPath>) -> Self {
SupervisorRef { id, sender, path }
}
pub fn id(&self) -> &BastionId {
&self.id
}
pub fn supervisor<S>(&self, init: S) -> Result<Self, ()>
where
S: FnOnce(Supervisor) -> Supervisor,
{
debug!("SupervisorRef({}): Creating supervisor.", self.id());
let parent = Parent::supervisor(self.clone());
let bcast = Broadcast::new(parent, BastionPathElement::Supervisor(BastionId::new()));
debug!(
"SupervisorRef({}): Initializing Supervisor({}).",
self.id(),
bcast.id()
);
let supervisor = Supervisor::new(bcast);
let supervisor = init(supervisor);
let supervisor_ref = supervisor.as_ref();
debug!("Supervisor({}): Initialized.", supervisor.id());
debug!(
"SupervisorRef({}): Deploying Supervisor({}).",
self.id(),
supervisor.id()
);
let msg = BastionMessage::deploy_supervisor(supervisor);
let env = Envelope::new(msg, self.path.clone(), self.sender.clone());
self.send(env).map_err(|_| ())?;
Ok(supervisor_ref)
}
pub fn children<C>(&self, init: C) -> Result<ChildrenRef, ()>
where
C: FnOnce(Children) -> Children,
{
self.children_with_id(BastionId::new(), init)
}
pub(crate) fn children_with_id<C>(&self, id: BastionId, init: C) -> Result<ChildrenRef, ()>
where
C: FnOnce(Children) -> Children,
{
debug!("SupervisorRef({}): Creating children group.", self.id());
let parent = Parent::supervisor(self.clone());
let bcast = Broadcast::new(parent, BastionPathElement::Children(id));
debug!(
"SupervisorRef({}): Initializing Children({}).",
self.id(),
bcast.id()
);
let children = Children::new(bcast);
let mut children = init(children);
debug!("Children({}): Initialized.", children.id());
children.launch_elems();
let children_ref = children.as_ref();
debug!(
"SupervisorRef({}): Deplying Children({}).",
self.id(),
children.id()
);
let msg = BastionMessage::deploy_children(children);
let env = Envelope::new(msg, self.path.clone(), self.sender.clone());
self.send(env).map_err(|_| ())?;
Ok(children_ref)
}
pub fn strategy(&self, strategy: SupervisionStrategy) -> Result<(), ()> {
debug!(
"SupervisorRef({}): Setting strategy: {:?}",
self.id(),
strategy
);
let msg = BastionMessage::supervise_with(strategy);
let env = Envelope::from_dead_letters(msg);
self.send(env).map_err(|_| ())
}
pub fn broadcast<M: Message>(&self, msg: M) -> Result<(), M> {
debug!(
"SupervisorRef({}): Broadcasting message: {:?}",
self.id(),
msg
);
let msg = BastionMessage::broadcast(msg);
let env = Envelope::from_dead_letters(msg);
self.send(env).map_err(|env| env.into_msg().unwrap())
}
pub fn stop(&self) -> Result<(), ()> {
debug!("SupervisorRef({}): Stopping.", self.id());
let msg = BastionMessage::stop();
let env = Envelope::from_dead_letters(msg);
self.send(env).map_err(|_| ())
}
pub fn kill(&self) -> Result<(), ()> {
debug!("SupervisorRef({}): Killing.", self.id());
let msg = BastionMessage::kill();
let env = Envelope::from_dead_letters(msg);
self.send(env).map_err(|_| ())
}
pub(crate) fn send(&self, env: Envelope) -> Result<(), Envelope> {
trace!("SupervisorRef({}): Sending message: {:?}", self.id(), env);
self.sender
.unbounded_send(env)
.map_err(|err| err.into_inner())
}
pub(crate) fn path(&self) -> &Arc<BastionPath> {
&self.path
}
}
impl TrackedChildState {
fn new(id: BastionId, state: Arc<Mutex<Pin<Box<ContextState>>>>) -> Self {
TrackedChildState {
id,
state,
restarts_counts: 0,
}
}
fn id(&self) -> BastionId {
self.id.clone()
}
fn state(&self) -> Arc<Mutex<Pin<Box<ContextState>>>> {
self.state.clone()
}
fn restarts_count(&self) -> usize {
self.restarts_counts
}
fn increase_restarts_counter(&mut self) {
self.restarts_counts += 1;
}
}
impl Supervised {
fn supervisor(supervisor: Supervisor) -> Self {
Supervised::Supervisor(supervisor)
}
fn children(children: Children) -> Self {
Supervised::Children(children)
}
fn stack(&self) -> ProcStack {
trace!("Supervised({}): Creating ProcStack.", self.id());
ProcStack::default()
}
fn id(&self) -> &BastionId {
match self {
Supervised::Supervisor(supervisor) => supervisor.id(),
Supervised::Children(children) => children.id(),
}
}
fn bcast(&self) -> &Broadcast {
match self {
Supervised::Supervisor(supervisor) => supervisor.bcast(),
Supervised::Children(children) => children.bcast(),
}
}
fn callbacks(&self) -> &Callbacks {
match self {
Supervised::Supervisor(supervisor) => supervisor.callbacks(),
Supervised::Children(children) => children.callbacks(),
}
}
fn launch(self) -> RecoverableHandle<Self> {
debug!("Supervised({}): Launching.", self.id());
let stack = self.stack();
match self {
Supervised::Supervisor(supervisor) => {
pool::spawn(
async {
let supervisor = supervisor.launch().await.unwrap();
Supervised::Supervisor(supervisor)
},
stack,
)
}
Supervised::Children(children) => {
pool::spawn(
async {
let children = children.launch().await.unwrap();
Supervised::Children(children)
},
stack,
)
}
}
}
}
impl RestartStrategy {
pub fn new(restart_policy: RestartPolicy, strategy: ActorRestartStrategy) -> Self {
RestartStrategy {
restart_policy,
strategy,
}
}
pub fn restart_policy(&self) -> RestartPolicy {
self.restart_policy.clone()
}
pub fn strategy(&self) -> ActorRestartStrategy {
self.strategy.clone()
}
pub fn with_restart_policy(mut self, restart_policy: RestartPolicy) -> Self {
self.restart_policy = restart_policy;
self
}
pub fn with_actor_restart_strategy(mut self, strategy: ActorRestartStrategy) -> Self {
self.strategy = strategy;
self
}
pub(crate) async fn apply_strategy(&self, restarts_count: usize) {
match self.strategy {
ActorRestartStrategy::LinearBackOff { timeout } => {
let start_in = timeout.as_secs() + (timeout.as_secs() * restarts_count as u64);
Delay::new(Duration::from_secs(start_in)).await;
}
ActorRestartStrategy::ExponentialBackOff {
timeout,
multiplier,
} => {
let start_in =
timeout.as_secs() + (timeout.as_secs() * multiplier * restarts_count as u64);
Delay::new(Duration::from_secs(start_in)).await;
}
_ => {}
};
}
}
impl Default for SupervisionStrategy {
fn default() -> Self {
SupervisionStrategy::OneForOne
}
}
impl Default for RestartStrategy {
fn default() -> Self {
RestartStrategy {
restart_policy: RestartPolicy::Always,
strategy: ActorRestartStrategy::default(),
}
}
}
impl Default for ActorRestartStrategy {
fn default() -> Self {
ActorRestartStrategy::Immediate
}
}
impl PartialEq for SupervisorRef {
fn eq(&self, other: &Self) -> bool {
self.id == other.id
}
}
impl Eq for SupervisorRef {}