Crate diatomic_waker
Async, fast synchronization primitives for task wakeup.
diatomic-waker is similar to Atomic Waker in that it
enables concurrent updates and notifications to a wrapped Waker. Unlike
the latter, however, it does not use spinlocks[1] and is faster,
especially when the consumer is notified periodically rather than just once.
In particular, it can be used as a very fast, single-consumer eventcount to
turn a non-blocking data structure into an asynchronous one (see the MPSC
channel receiver example below).
The API distinguishes between the entity that registers wakers
(WakeSink) and the possibly many entities that notify the waker
(WakeSource).
Note that WakeSink and WakeSource readily store a shared
DiatomicWaker within an
Arc. You may instead elect to allocate a DiatomicWaker
yourself, but will then need to ensure by other means that waker
registration cannot be performed concurrently.
Examples
A multi-producer, single-consumer channel of capacity 1 for sending
NonZeroUsize values, with an asynchronous receiver:

    use std::num::NonZeroUsize;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    use diatomic_waker::{WakeSink, WakeSource};

    // The sending side of the channel.
    #[derive(Clone)]
    struct Sender {
        wake_src: WakeSource,
        value: Arc<AtomicUsize>,
    }

    // The receiving side of the channel.
    struct Receiver {
        wake_sink: WakeSink,
        value: Arc<AtomicUsize>,
    }

    // Creates an empty channel.
    fn channel() -> (Sender, Receiver) {
        let value = Arc::new(AtomicUsize::new(0));
        let wake_sink = WakeSink::new();
        let wake_src = wake_sink.source();

        (
            Sender {
                wake_src,
                value: value.clone(),
            },
            Receiver { wake_sink, value },
        )
    }

    impl Sender {
        // Sends a value if the channel is empty.
        fn try_send(&self, value: NonZeroUsize) -> bool {
            let success = self
                .value
                .compare_exchange(0, value.get(), Ordering::Relaxed, Ordering::Relaxed)
                .is_ok();
            if success {
                self.wake_src.notify()
            };

            success
        }
    }

    impl Receiver {
        // Receives a value asynchronously.
        async fn recv(&mut self) -> NonZeroUsize {
            // Wait until the predicate returns `Some(value)`, i.e. when the atomic
            // value becomes non-zero.
            self.wake_sink
                .wait_until(|| NonZeroUsize::new(self.value.swap(0, Ordering::Relaxed)))
                .await
        }
    }

In some cases, it may be necessary to use the lower-level
register and unregister
methods rather than the wait_until convenience
method. This is how the behavior of the above recv method could be
reproduced with a hand-coded future:

    // Reuses `Receiver`, `NonZeroUsize` and `Ordering` from the previous example.
    use std::future::Future;
    use std::pin::Pin;
    use std::task::{Context, Poll};

    struct Recv<'a> {
        receiver: &'a mut Receiver,
    }

    impl Future for Recv<'_> {
        type Output = NonZeroUsize;

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<NonZeroUsize> {
            // Avoid waker registration if a value is readily available.
            let value = NonZeroUsize::new(self.receiver.value.swap(0, Ordering::Relaxed));
            if let Some(value) = value {
                return Poll::Ready(value);
            }

            // Register the waker to be polled again once a value is available.
            self.receiver.wake_sink.register(cx.waker());

            // Check again after registering the waker to prevent a race condition.
            let value = NonZeroUsize::new(self.receiver.value.swap(0, Ordering::Relaxed));
            if let Some(value) = value {
                // Avoid a spurious wake-up.
                self.receiver.wake_sink.unregister();
                return Poll::Ready(value);
            }

            Poll::Pending
        }
    }

[1] The implementation of AtomicWaker yields to the runtime on contention, which is in effect an executor-mediated spinlock.
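
The check, register, check-again pattern used by the hand-coded future can be tried out without any external crate. The following is only a sketch under stated assumptions: it uses the standard library exclusively, and `Slot`, `send`, `recv`, `ThreadWaker`, `block_on` and `demo` are hypothetical names that are not part of diatomic-waker. The waker slot is a plain `Mutex<Option<Waker>>` standing in for a `WakeSink`/`WakeSource` pair, so it illustrates the protocol rather than the crate's lock-free implementation.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

// Hypothetical helper: a waker that unparks the thread blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

// Hypothetical minimal executor: polls one future to completion on the
// current thread, parking between polls.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
        // A spurious unpark only causes one extra poll.
        thread::park();
    }
}

// Stand-in for the channel state (a mutex-based slot, NOT diatomic-waker).
// A value of 0 means "empty".
#[derive(Default)]
struct Slot {
    value: AtomicUsize,
    waker: Mutex<Option<Waker>>,
}

// Publishes a non-zero value, then wakes the registered waker, if any.
fn send(slot: &Slot, v: usize) {
    slot.value.store(v, Ordering::Release);
    let waker = slot.waker.lock().unwrap().take();
    if let Some(w) = waker {
        w.wake();
    }
}

// Waits for a non-zero value using the check, register, check-again pattern.
async fn recv(slot: &Slot) -> usize {
    std::future::poll_fn(|cx| {
        // Fast path: no registration if a value is already there.
        let v = slot.value.swap(0, Ordering::AcqRel);
        if v != 0 {
            return Poll::Ready(v);
        }
        // Register the waker before checking again.
        *slot.waker.lock().unwrap() = Some(cx.waker().clone());
        // Check again: the sender may have published between the first
        // check and the registration.
        let v = slot.value.swap(0, Ordering::AcqRel);
        if v != 0 {
            // Drop the registration to avoid a spurious wake-up.
            let _ = slot.waker.lock().unwrap().take();
            return Poll::Ready(v);
        }
        Poll::Pending
    })
    .await
}

// Sends 7 from another thread and receives it on the calling thread.
fn demo() -> usize {
    let slot = Arc::new(Slot::default());
    let producer = {
        let slot = slot.clone();
        thread::spawn(move || send(&slot, 7))
    };
    let got = block_on(recv(&slot));
    producer.join().unwrap();
    got
}
```

Calling `demo()` returns 7 whether the producer runs before, during or after the first poll: either the second check observes the value, or the sender finds the registered waker and unparks the receiving thread. Skipping the second check would leave a window in which a value published just before registration is never noticed.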
Modules
Structs
WakeSink
WakeSource