use std::{future::Future, pin::Pin, sync::Arc, task, task::Poll, thread};
use actix_rt::System;
use crossbeam_channel as cb_channel;
use futures_core::stream::Stream;
use log::warn;
use tokio::sync::oneshot::Sender as SyncSender;
use crate::{
actor::{Actor, ActorContext, ActorState, Running},
address::{
channel, Addr, AddressReceiver, AddressSenderProducer, Envelope, EnvelopeProxy, ToEnvelope,
},
context::Context,
handler::{Handler, Message, MessageResponse},
};
/// Forwards envelopes from an address channel to a crossbeam queue.
///
/// `start_with_thread_builder` hands clones of the queue's receiving half to
/// the worker threads it spawns, each of which runs a `SyncContext<A>`. The
/// arbiter itself is driven as a `Future` on the current system arbiter.
pub struct SyncArbiter<A>
where
A: Actor<Context = SyncContext<A>>,
{
/// Sending half of the crossbeam work queue. Set to `None` by `poll` once
/// the address channel reports that it is no longer connected.
queue: Option<cb_channel::Sender<Envelope<A>>>,
/// Receiving side of the address channel whose sender backs the `Addr`
/// returned from `start`.
msgs: AddressReceiver<A>,
}
impl<A> SyncArbiter<A>
where
    A: Actor<Context = SyncContext<A>>,
{
    /// Starts `threads` worker threads using default `thread::Builder`s.
    ///
    /// Shorthand for [`SyncArbiter::start_with_thread_builder`] with
    /// `thread::Builder::new` as the builder factory. Returns the address
    /// through which messages are sent to the workers.
    pub fn start<F>(threads: usize, factory: F) -> Addr<A>
    where
        F: Fn() -> A + Send + Sync + 'static,
    {
        Self::start_with_thread_builder(threads, thread::Builder::new, factory)
    }

    /// Starts `threads` worker threads, each configured by a fresh builder.
    ///
    /// `thread_builder_factory` is called once per worker to obtain the
    /// `thread::Builder` used to spawn it. Every worker sets the spawning
    /// thread's current `System` as its own, builds a `SyncContext` around the
    /// shared `factory`, and runs it. All workers receive from one shared
    /// unbounded crossbeam queue. The arbiter that feeds this queue is spawned
    /// on the current system arbiter.
    ///
    /// # Panics
    ///
    /// Panics with "failed to spawn thread" if a worker thread cannot be
    /// spawned.
    pub fn start_with_thread_builder<F, BF>(
        threads: usize,
        mut thread_builder_factory: BF,
        factory: F,
    ) -> Addr<A>
    where
        F: Fn() -> A + Send + Sync + 'static,
        BF: FnMut() -> thread::Builder,
    {
        let shared_factory = Arc::new(factory);
        let (work_tx, work_rx) = cb_channel::unbounded();
        let (addr_tx, addr_rx) = channel::channel(0);

        for _ in 0..threads {
            // Everything moved into the worker closure is prepared up front.
            let worker_factory = Arc::clone(&shared_factory);
            let system = System::current();
            let worker_queue = work_rx.clone();
            let producer = addr_rx.sender_producer();

            let builder = thread_builder_factory();
            builder
                .spawn(move || {
                    System::set_current(system);
                    let mut ctx = SyncContext::new(worker_factory, worker_queue, producer);
                    ctx.run();
                })
                .expect("failed to spawn thread");
        }

        let arbiter = Self {
            queue: Some(work_tx),
            msgs: addr_rx,
        };
        System::current().arbiter().spawn(arbiter);

        Addr::new(addr_tx)
    }
}
/// `SyncArbiter` implements `Actor` with the regular `Context` as its
/// execution context; no lifecycle hooks are overridden here.
impl<A> Actor for SyncArbiter<A>
where
A: Actor<Context = SyncContext<A>>,
{
type Context = Context<Self>;
}
#[doc(hidden)]
impl<A> Future for SyncArbiter<A>
where
    A: Actor<Context = SyncContext<A>>,
{
    type Output = ();

    /// Drains every envelope currently available on the address channel into
    /// the worker queue, then completes only once the address channel is no
    /// longer connected.
    fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();

        loop {
            let msg = match Pin::new(&mut this.msgs).poll_next(cx) {
                Poll::Pending => break,
                Poll::Ready(None) => unreachable!(),
                Poll::Ready(Some(msg)) => msg,
            };

            // With no queue left the envelope is simply dropped.
            if let Some(queue) = this.queue.as_ref() {
                assert!(queue.send(msg).is_ok());
            }
        }

        if !this.msgs.connected() {
            // Dropping the sender disconnects the queue, which makes the
            // workers' blocking `recv` fail so they can shut down.
            this.queue = None;
            return Poll::Ready(());
        }

        Poll::Pending
    }
}
impl<A, M> ToEnvelope<A, M> for SyncContext<A>
where
    A: Actor<Context = Self> + Handler<M>,
    M: Message + Send + 'static,
    M::Result: Send,
{
    /// Wraps `msg` and its optional reply sender in a boxed
    /// `SyncContextEnvelope` and returns it as an `Envelope<A>`.
    fn pack(msg: M, tx: Option<SyncSender<M::Result>>) -> Envelope<A> {
        let proxy = SyncContextEnvelope::new(msg, tx);
        Envelope::with_proxy(Box::new(proxy))
    }
}
/// Execution context for an actor that is driven by a blocking receive loop
/// (see `run`) instead of being polled as a future.
pub struct SyncContext<A>
where
A: Actor<Context = SyncContext<A>>,
{
/// Actor instance produced by the factory in `new`; `run` takes it out.
act: Option<A>,
/// Receiving half of the crossbeam queue that `run` blocks on.
queue: cb_channel::Receiver<Envelope<A>>,
/// Set by `stop`/`terminate`; `run` checks it after each handled envelope
/// and, when set, replaces the actor with a fresh one from `factory`.
stopping: bool,
/// Current lifecycle state, as returned by `state()`.
state: ActorState,
/// Creates actor instances: once in `new` and again on every restart.
factory: Arc<dyn Fn() -> A>,
/// Produces the senders from which `address()` builds `Addr` handles.
address: AddressSenderProducer<A>,
}
impl<A> SyncContext<A>
where
    A: Actor<Context = Self>,
{
    /// Builds a context in the `Started` state, creating the initial actor by
    /// calling `factory` once.
    fn new(
        factory: Arc<dyn Fn() -> A>,
        queue: cb_channel::Receiver<Envelope<A>>,
        address: AddressSenderProducer<A>,
    ) -> Self {
        let initial = factory();
        Self {
            act: Some(initial),
            queue,
            stopping: false,
            state: ActorState::Started,
            factory,
            address,
        }
    }

    /// Runs the actor until the work queue is disconnected.
    ///
    /// Each received envelope is handled against the current actor. If the
    /// handler requested a stop, the actor goes through `stopping`/`stopped`
    /// and is replaced by a fresh instance from the factory, which is then
    /// started. When `recv` fails, the actor is stopped one last time and the
    /// method returns.
    ///
    /// # Panics
    ///
    /// Panics if the actor has already been taken out of the context, i.e.
    /// if `run` is called a second time.
    fn run(&mut self) {
        let mut actor = self.act.take().unwrap();

        A::started(&mut actor, self);
        self.state = ActorState::Running;

        // `recv` blocks; it only errors once the queue is disconnected.
        while let Ok(mut envelope) = self.queue.recv() {
            envelope.handle(&mut actor, self);
            // Release the envelope before any restart logic runs.
            drop(envelope);

            if !self.stopping {
                continue;
            }
            self.stopping = false;

            // Retire the current actor.
            A::stopping(&mut actor, self);
            self.state = ActorState::Stopped;
            A::stopped(&mut actor, self);

            // Bring up its replacement.
            self.state = ActorState::Started;
            actor = (self.factory)();
            A::started(&mut actor, self);
            self.state = ActorState::Running;
        }

        // Queue disconnected: final shutdown of the current actor.
        self.state = ActorState::Stopping;
        if A::stopping(&mut actor, self) != Running::Stop {
            warn!("stopping method is not supported for sync actors");
        }
        self.state = ActorState::Stopped;
        A::stopped(&mut actor, self);
    }

    /// Returns a new `Addr` built from a sender obtained from this context's
    /// sender producer.
    pub fn address(&self) -> Addr<A> {
        let sender = self.address.sender();
        Addr::new(sender)
    }
}
impl<A> ActorContext for SyncContext<A>
where
    A: Actor<Context = Self>,
{
    /// Marks the context as stopping: sets the `stopping` flag and moves the
    /// state to `ActorState::Stopping`.
    fn stop(&mut self) {
        self.state = ActorState::Stopping;
        self.stopping = true;
    }

    /// Performs exactly the same state change as [`ActorContext::stop`].
    fn terminate(&mut self) {
        self.stop();
    }

    /// Returns the current lifecycle state.
    fn state(&self) -> ActorState {
        self.state
    }
}
/// Envelope payload pairing a message with an optional oneshot sender for
/// the handler's result.
pub(crate) struct SyncContextEnvelope<M>
where
M: Message + Send,
{
/// The message; wrapped in `Option` so `handle` can take it by value.
msg: Option<M>,
/// Optional reply sender; taken by `handle` when the envelope is processed.
tx: Option<SyncSender<M::Result>>,
}
impl<M> SyncContextEnvelope<M>
where
    M: Message + Send,
    M::Result: Send,
{
    /// Creates an envelope payload holding `msg` and the optional reply
    /// sender `tx`.
    pub fn new(msg: M, tx: Option<SyncSender<M::Result>>) -> Self {
        Self {
            msg: Some(msg),
            tx,
        }
    }
}
impl<A, M> EnvelopeProxy<A> for SyncContextEnvelope<M>
where
    M: Message + Send + 'static,
    M::Result: Send,
    A: Actor<Context = SyncContext<A>> + Handler<M>,
{
    /// Delivers the wrapped message to `act` and routes the handler's result
    /// to the reply sender, if there is one.
    ///
    /// The handler is skipped entirely when a reply sender is present but its
    /// channel is already closed. A message sent without a reply sender is
    /// always handled. Calling this a second time is a no-op because the
    /// message has already been taken.
    fn handle(&mut self, act: &mut A, ctx: &mut A::Context) {
        let tx = self.tx.take();

        // A single pattern test replaces the `is_some()` + `unwrap()` pair.
        if matches!(&tx, Some(reply) if reply.is_closed()) {
            return;
        }

        if let Some(msg) = self.msg.take() {
            <A as Handler<M>>::handle(act, msg, ctx).handle(ctx, tx)
        }
    }
}
#[cfg(test)]
mod tests {
    use tokio::sync::oneshot;

    use crate::prelude::*;

    /// Inner sync actor: answers a `Msg` through its oneshot sender.
    struct SyncActor2;

    impl Actor for SyncActor2 {
        type Context = SyncContext<Self>;
    }

    /// Outer sync actor: owns the address of a nested `SyncActor2` arbiter
    /// and forwards every `Msg` to it.
    struct SyncActor1(Addr<SyncActor2>);

    impl Actor for SyncActor1 {
        type Context = SyncContext<Self>;
    }

    impl SyncActor1 {
        /// Factory for the outer actor; starts a one-thread arbiter for the
        /// inner actor.
        fn run() -> Self {
            let inner = SyncArbiter::start(1, || SyncActor2);
            Self(inner)
        }
    }

    /// Message carrying the oneshot sender on which the reply is delivered.
    struct Msg(oneshot::Sender<u8>);

    impl Message for Msg {
        type Result = ();
    }

    impl Handler<Msg> for SyncActor1 {
        type Result = ();

        fn handle(&mut self, msg: Msg, _: &mut Self::Context) -> Self::Result {
            let SyncActor1(inner) = self;
            inner.do_send(msg);
        }
    }

    impl Handler<Msg> for SyncActor2 {
        type Result = ();

        fn handle(&mut self, msg: Msg, _: &mut Self::Context) -> Self::Result {
            let Msg(reply) = msg;
            reply.send(233u8).unwrap();
        }
    }

    /// A sync actor running inside one arbiter can start a nested arbiter
    /// and relay messages through it.
    #[test]
    fn nested_sync_arbiters() {
        System::new().block_on(async {
            let (tx, rx) = oneshot::channel();
            let addr = SyncArbiter::start(1, SyncActor1::run);

            addr.send(Msg(tx)).await.unwrap();

            let answer = rx.await.unwrap();
            assert_eq!(233u8, answer);

            System::current().stop();
        })
    }
}