use std::time::Duration;
use crate::{AppChannelError, AppEventSender, update::UpdatesTrace};
use super::*;
/// Message that carries a deferred `EventUpdate`.
///
/// The update is not stored directly; it is produced on demand by calling the boxed closure held in `args`.
pub(crate) struct EventUpdateMsg {
// One-shot closure that builds the update; the `Send` bound lets the message be moved to another thread.
args: Box<dyn FnOnce() -> EventUpdate + Send>,
}
impl EventUpdateMsg {
    /// Consumes the message and runs its closure, returning the `EventUpdate` it builds.
    pub(crate) fn get(self) -> EventUpdate {
        let Self { args: build_update } = self;
        build_update()
    }
}
impl fmt::Debug for EventUpdateMsg {
    /// Writes only the type name; the boxed closure has no printable state, so no fields are listed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("EventUpdateMsg");
        repr.finish_non_exhaustive()
    }
}
/// Sender bound to a single event.
///
/// Pairs an `AppEventSender` with the `Event<A>` whose args it sends; `send` pushes updates of that
/// event through the app sender.
pub struct EventSender<A>
where
A: EventArgs + Send,
{
/// App channel handle that the update messages are sent through.
pub(super) sender: AppEventSender,
/// The event that every update sent by this sender is built for.
pub(super) event: Event<A>,
}
// Implemented by hand: a derived `Clone` would also require `A: Clone`, which is not needed here.
impl<A> Clone for EventSender<A>
where
    A: EventArgs + Send,
{
    /// Clones the app sender handle and copies the event.
    fn clone(&self) -> Self {
        let Self { sender, event } = self;
        Self {
            sender: sender.clone(),
            event: *event,
        }
    }
}
impl<A> fmt::Debug for EventSender<A>
where
    A: EventArgs + Send,
{
    /// Formats as `EventSender(<event>)`, using the event's `Debug` output; the app sender is omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("EventSender({:?})", self.event))
    }
}
impl<A> EventSender<A>
where
    A: EventArgs + Send,
{
    /// Sends `args` as an update of this sender's event.
    ///
    /// The event is first logged with `UpdatesTrace::log_event`. Then a message holding a closure
    /// that calls `new_update` with `args` is pushed through the app sender; the update itself is
    /// only built when that closure is run.
    ///
    /// # Errors
    ///
    /// Any error returned by `send_event` is reported as `AppChannelError::Disconnected`.
    pub fn send(&self, args: A) -> Result<(), AppChannelError> {
        UpdatesTrace::log_event(self.event.as_any());

        // `Event<A>` is `Copy`; the closure takes its own copy so it does not borrow `self`.
        let target = self.event;
        let msg = EventUpdateMsg {
            args: Box::new(move || target.new_update(args)),
        };

        match self.sender.send_event(msg) {
            Ok(()) => Ok(()),
            Err(_) => Err(AppChannelError::Disconnected),
        }
    }

    /// Returns a copy of the event this sender sends updates for.
    pub fn event(&self) -> Event<A> {
        self.event
    }
}
/// Receiver for the args of a single event.
///
/// Pairs the `Event<A>` with the `flume::Receiver<A>` its args arrive on.
#[must_use = "stops receiving on drop"]
pub struct EventReceiver<A>
where
A: EventArgs + Send,
{
/// The event whose args this receiver yields.
pub(super) event: Event<A>,
/// Receiving end of the flume channel that carries the args.
pub(super) receiver: flume::Receiver<A>,
}
// Implemented by hand: a derived `Clone` would also require `A: Clone`, which is not needed here.
impl<A> Clone for EventReceiver<A>
where
    A: EventArgs + Send,
{
    /// Copies the event and clones the flume receiver handle.
    fn clone(&self) -> Self {
        let Self { event, receiver } = self;
        Self {
            event: *event,
            receiver: receiver.clone(),
        }
    }
}
impl<A> fmt::Debug for EventReceiver<A>
where
    A: EventArgs + Send,
{
    /// Formats as `EventReceiver(<event>)`, using the event's `Debug` output; the channel is omitted.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The label must name this type; it previously read `EventSender`, which made receivers
        // indistinguishable from senders in debug output.
        write!(f, "EventReceiver({:?})", &self.event)
    }
}
impl<A> EventReceiver<A>
where
A: EventArgs + Send,
{
pub fn recv(&self) -> Result<A, AppChannelError> {
self.receiver.recv().map_err(|_| AppChannelError::Disconnected)
}
pub fn try_recv(&self) -> Result<Option<A>, AppChannelError> {
match self.receiver.try_recv() {
Ok(a) => Ok(Some(a)),
Err(e) => match e {
flume::TryRecvError::Empty => Ok(None),
flume::TryRecvError::Disconnected => Err(AppChannelError::Disconnected),
},
}
}
pub fn recv_deadline(&self, deadline: Instant) -> Result<A, AppChannelError> {
self.receiver.recv_deadline(deadline).map_err(AppChannelError::from)
}
pub fn recv_timeout(&self, dur: Duration) -> Result<A, AppChannelError> {
self.receiver.recv_timeout(dur).map_err(AppChannelError::from)
}
pub async fn recv_async(&self) -> Result<A, AppChannelError> {
RecvFut::from(self.receiver.recv_async()).await
}
pub fn into_recv_async(self) -> impl Future<Output = Result<A, AppChannelError>> + Send + Sync + 'static {
RecvFut::from(self.receiver.into_recv_async())
}
pub fn iter(&self) -> flume::Iter<'_, A> {
self.receiver.iter()
}
pub fn try_iter(&self) -> flume::TryIter<'_, A> {
self.receiver.try_iter()
}
pub fn event(&self) -> Event<A> {
self.event
}
}
impl<A> From<EventReceiver<A>> for flume::Receiver<A>
where
A: EventArgs + Send,
{
fn from(e: EventReceiver<A>) -> Self {
e.receiver
}
}
impl<'a, A> IntoIterator for &'a EventReceiver<A>
where
A: EventArgs + Send,
{
type Item = A;
type IntoIter = flume::Iter<'a, A>;
fn into_iter(self) -> Self::IntoIter {
self.receiver.iter()
}
}
impl<A> IntoIterator for EventReceiver<A>
where
A: EventArgs + Send,
{
type Item = A;
type IntoIter = flume::IntoIter<A>;
fn into_iter(self) -> Self::IntoIter {
self.receiver.into_iter()
}
}
/// Wrapper around flume's async receive future; its `Future` impl maps the receive error to `AppChannelError::Disconnected`.
struct RecvFut<'a, M>(flume::r#async::RecvFut<'a, M>);
impl<'a, M> From<flume::r#async::RecvFut<'a, M>> for RecvFut<'a, M> {
fn from(f: flume::r#async::RecvFut<'a, M>) -> Self {
Self(f)
}
}
impl<M> Future for RecvFut<'_, M> {
    type Output = Result<M, AppChannelError>;

    /// Polls the wrapped flume future; a ready error becomes `AppChannelError::Disconnected`,
    /// while a ready value and `Pending` pass through unchanged.
    fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Self::Output> {
        let inner = std::pin::Pin::new(&mut self.0);
        inner
            .poll(cx)
            .map(|received| received.map_err(|_| AppChannelError::Disconnected))
    }
}