#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]
use crate::sync::notify::Notify;
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::atomic::Ordering::Relaxed;
use crate::loom::sync::{Arc, RwLock, RwLockReadGuard};
use std::mem;
use std::ops;
#[derive(Debug)]
pub struct Receiver<T> {
shared: Arc<Shared<T>>,
version: Version,
}
#[derive(Debug)]
pub struct Sender<T> {
shared: Arc<Shared<T>>,
}
#[derive(Debug)]
pub struct Ref<'a, T> {
inner: RwLockReadGuard<'a, T>,
}
#[derive(Debug)]
struct Shared<T> {
value: RwLock<T>,
state: AtomicState,
ref_count_rx: AtomicUsize,
notify_rx: Notify,
notify_tx: Notify,
}
pub mod error {
use std::fmt;
#[derive(Debug)]
pub struct SendError<T>(pub T);
impl<T: fmt::Debug> fmt::Display for SendError<T> {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "channel closed")
}
}
impl<T: fmt::Debug> std::error::Error for SendError<T> {}
#[derive(Debug)]
pub struct RecvError(pub(super) ());
impl fmt::Display for RecvError {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(fmt, "channel closed")
}
}
impl std::error::Error for RecvError {}
}
use self::state::{AtomicState, Version};
mod state {
use crate::loom::sync::atomic::AtomicUsize;
use crate::loom::sync::atomic::Ordering::SeqCst;
const CLOSED: usize = 1;
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub(super) struct Version(usize);
#[derive(Copy, Clone, Debug)]
pub(super) struct StateSnapshot(usize);
#[derive(Debug)]
pub(super) struct AtomicState(AtomicUsize);
impl Version {
pub(super) fn initial() -> Self {
Version(0)
}
}
impl StateSnapshot {
pub(super) fn version(self) -> Version {
Version(self.0 & !CLOSED)
}
pub(super) fn is_closed(self) -> bool {
(self.0 & CLOSED) == CLOSED
}
}
impl AtomicState {
pub(super) fn new() -> Self {
AtomicState(AtomicUsize::new(0))
}
pub(super) fn load(&self) -> StateSnapshot {
StateSnapshot(self.0.load(SeqCst))
}
pub(super) fn increment_version(&self) {
self.0.fetch_add(2, SeqCst);
}
pub(super) fn set_closed(&self) {
self.0.fetch_or(CLOSED, SeqCst);
}
}
}
pub fn channel<T>(init: T) -> (Sender<T>, Receiver<T>) {
let shared = Arc::new(Shared {
value: RwLock::new(init),
state: AtomicState::new(),
ref_count_rx: AtomicUsize::new(1),
notify_rx: Notify::new(),
notify_tx: Notify::new(),
});
let tx = Sender {
shared: shared.clone(),
};
let rx = Receiver {
shared,
version: Version::initial(),
};
(tx, rx)
}
impl<T> Receiver<T> {
fn from_shared(version: Version, shared: Arc<Shared<T>>) -> Self {
shared.ref_count_rx.fetch_add(1, Relaxed);
Self { shared, version }
}
pub fn borrow(&self) -> Ref<'_, T> {
let inner = self.shared.value.read().unwrap();
Ref { inner }
}
pub fn borrow_and_update(&mut self) -> Ref<'_, T> {
let inner = self.shared.value.read().unwrap();
self.version = self.shared.state.load().version();
Ref { inner }
}
pub async fn changed(&mut self) -> Result<(), error::RecvError> {
loop {
let notified = self.shared.notify_rx.notified();
if let Some(ret) = maybe_changed(&self.shared, &mut self.version) {
return ret;
}
notified.await;
}
}
cfg_process_driver! {
pub(crate) fn try_has_changed(&mut self) -> Option<Result<(), error::RecvError>> {
maybe_changed(&self.shared, &mut self.version)
}
}
}
fn maybe_changed<T>(
shared: &Shared<T>,
version: &mut Version,
) -> Option<Result<(), error::RecvError>> {
let state = shared.state.load();
let new_version = state.version();
if *version != new_version {
*version = new_version;
return Some(Ok(()));
}
if state.is_closed() {
return Some(Err(error::RecvError(())));
}
None
}
impl<T> Clone for Receiver<T> {
fn clone(&self) -> Self {
let version = self.version;
let shared = self.shared.clone();
Self::from_shared(version, shared)
}
}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
if 1 == self.shared.ref_count_rx.fetch_sub(1, Relaxed) {
self.shared.notify_tx.notify_waiters();
}
}
}
impl<T> Sender<T> {
pub fn send(&self, value: T) -> Result<(), error::SendError<T>> {
if 0 == self.receiver_count() {
return Err(error::SendError(value));
}
self.send_replace(value);
Ok(())
}
pub fn send_replace(&self, value: T) -> T {
let old = {
let mut lock = self.shared.value.write().unwrap();
let old = mem::replace(&mut *lock, value);
self.shared.state.increment_version();
drop(lock);
old
};
self.shared.notify_rx.notify_waiters();
old
}
pub fn borrow(&self) -> Ref<'_, T> {
let inner = self.shared.value.read().unwrap();
Ref { inner }
}
pub fn is_closed(&self) -> bool {
self.receiver_count() == 0
}
pub async fn closed(&self) {
while self.receiver_count() > 0 {
let notified = self.shared.notify_tx.notified();
if self.receiver_count() == 0 {
return;
}
notified.await;
}
}
pub fn subscribe(&self) -> Receiver<T> {
let shared = self.shared.clone();
let version = shared.state.load().version();
Receiver::from_shared(version, shared)
}
pub fn receiver_count(&self) -> usize {
self.shared.ref_count_rx.load(Relaxed)
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
self.shared.state.set_closed();
self.shared.notify_rx.notify_waiters();
}
}
impl<T> ops::Deref for Ref<'_, T> {
type Target = T;
fn deref(&self) -> &T {
self.inner.deref()
}
}
#[cfg(all(test, loom))]
mod tests {
use futures::future::FutureExt;
use loom::thread;
#[test]
fn watch_spurious_wakeup() {
loom::model(|| {
let (send, mut recv) = crate::sync::watch::channel(0i32);
send.send(1).unwrap();
let send_thread = thread::spawn(move || {
send.send(2).unwrap();
send
});
recv.changed().now_or_never();
let send = send_thread.join().unwrap();
let recv_thread = thread::spawn(move || {
recv.changed().now_or_never();
recv.changed().now_or_never();
recv
});
send.send(3).unwrap();
let mut recv = recv_thread.join().unwrap();
let send_thread = thread::spawn(move || {
send.send(2).unwrap();
});
recv.changed().now_or_never();
send_thread.join().unwrap();
});
}
#[test]
fn watch_borrow() {
loom::model(|| {
let (send, mut recv) = crate::sync::watch::channel(0i32);
assert!(send.borrow().eq(&0));
assert!(recv.borrow().eq(&0));
send.send(1).unwrap();
assert!(send.borrow().eq(&1));
let send_thread = thread::spawn(move || {
send.send(2).unwrap();
send
});
recv.changed().now_or_never();
let send = send_thread.join().unwrap();
let recv_thread = thread::spawn(move || {
recv.changed().now_or_never();
recv.changed().now_or_never();
recv
});
send.send(3).unwrap();
let recv = recv_thread.join().unwrap();
assert!(recv.borrow().eq(&3));
assert!(send.borrow().eq(&3));
send.send(2).unwrap();
thread::spawn(move || {
assert!(recv.borrow().eq(&2));
});
assert!(send.borrow().eq(&2));
});
}
}