pub(crate) mod platform;
mod scheduled_io;
pub(crate) use scheduled_io::ScheduledIo;
use crate::loom::sync::atomic::AtomicUsize;
use crate::park::{Park, Unpark};
use crate::runtime::context;
use crate::util::slab::{Address, Slab};
use mio::event::Evented;
use std::fmt;
use std::io;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::{Arc, Weak};
use std::task::Waker;
use std::time::Duration;
/// I/O driver: polls `mio` for readiness events and forwards them to the
/// matching `ScheduledIo` entry stored in the shared `Inner` state.
pub(crate) struct Driver {
/// Event buffer that `turn` passes to `mio::Poll::poll` and then iterates.
events: mio::Events,
/// State shared with every `Handle`; handles only hold a `Weak` to it.
inner: Arc<Inner>,
/// Registration half of the wakeup pair, registered under `TOKEN_WAKEUP` in
/// `Driver::new`. It is stored here but never read in this file.
// NOTE(review): presumably held so the registration lives as long as the
// driver does -- confirm against the mio documentation.
_wakeup_registration: mio::Registration,
}
/// Cloneable reference to a `Driver`'s shared state.
///
/// The handle does not keep the state alive: it stores a `Weak`, so
/// `Handle::inner` yields `None` once the state can no longer be upgraded.
#[derive(Clone)]
pub(crate) struct Handle {
/// Weak pointer to the driver's shared `Inner`.
inner: Weak<Inner>,
}
/// State shared between the `Driver` and its `Handle`s.
pub(super) struct Inner {
/// The `mio` poll instance used to register, deregister and poll sources.
io: mio::Poll,
/// Slab of per-resource `ScheduledIo` state. The slab `Address` of an entry
/// doubles as the `mio::Token` the resource is registered with.
pub(super) io_dispatch: Slab<ScheduledIo>,
/// Counter incremented by `add_source` and decremented by `drop_source`.
n_sources: AtomicUsize,
/// Readiness setter for the wakeup registration: `Handle::wakeup` marks it
/// readable and `Driver::turn` clears it again.
wakeup: mio::SetReadiness,
}
/// Direction of interest in an I/O resource.
///
/// Selects which waker slot (`reader` or `writer`) of a `ScheduledIo` is used
/// by `Inner::register` and which readiness bits `Direction::mask` returns.
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub(super) enum Direction {
/// Interest in read-side readiness.
Read,
/// Interest in write-side readiness.
Write,
}
/// Token under which the driver's own wakeup registration is registered.
///
/// It is built from `Address::NULL`. Events carrying this token are handled
/// inside `Driver::turn` instead of being dispatched to a `ScheduledIo`.
const TOKEN_WAKEUP: mio::Token = mio::Token(Address::NULL);
/// Compile-time check that `Handle` is both `Send` and `Sync`.
///
/// The function has no runtime effect; it only needs to type-check.
fn _assert_kinds() {
    fn _assert_send_sync<T>()
    where
        T: Send + Sync,
    {
    }

    _assert_send_sync::<Handle>();
}
impl Driver {
    /// Creates a new driver backed by a fresh `mio::Poll`.
    ///
    /// A wakeup registration pair is created and its registration half is
    /// registered for readable readiness, level-triggered, under
    /// `TOKEN_WAKEUP`.
    ///
    /// # Errors
    ///
    /// Returns the error from `mio::Poll::new` or from registering the wakeup
    /// registration.
    pub(crate) fn new() -> io::Result<Driver> {
        let poll = mio::Poll::new()?;
        let (registration, set_readiness) = mio::Registration::new2();

        poll.register(
            &registration,
            TOKEN_WAKEUP,
            mio::Ready::readable(),
            mio::PollOpt::level(),
        )?;

        let inner = Inner {
            io: poll,
            io_dispatch: Slab::new(),
            n_sources: AtomicUsize::new(0),
            wakeup: set_readiness,
        };

        Ok(Driver {
            events: mio::Events::with_capacity(1024),
            _wakeup_registration: registration,
            inner: Arc::new(inner),
        })
    }

    /// Returns a handle holding a weak reference to this driver's shared state.
    pub(crate) fn handle(&self) -> Handle {
        let inner = Arc::downgrade(&self.inner);
        Handle { inner }
    }

    /// Polls for events once, waiting at most `max_wait` (no limit is passed
    /// to `mio` when it is `None`), and processes every event received.
    ///
    /// # Errors
    ///
    /// Returns the error reported by `mio::Poll::poll`.
    fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<()> {
        self.inner.io.poll(&mut self.events, max_wait)?;

        for event in self.events.iter() {
            let token = event.token();

            if token != TOKEN_WAKEUP {
                self.dispatch(token, event.readiness());
                continue;
            }

            // Wakeup event: clear the readiness that `Handle::wakeup` set.
            self.inner
                .wakeup
                .set_readiness(mio::Ready::empty())
                .unwrap();
        }

        Ok(())
    }

    /// Records `ready` on the `ScheduledIo` addressed by `token` and wakes the
    /// tasks waiting on the affected directions.
    ///
    /// Nothing happens when the slab has no entry for the token, or when
    /// updating the entry's readiness fails.
    fn dispatch(&self, token: mio::Token, ready: mio::Ready) {
        let address = Address::from_usize(token.0);

        let scheduled = match self.inner.io_dispatch.get(address) {
            Some(scheduled) => scheduled,
            None => return,
        };

        let update_failed = scheduled
            .set_readiness(address, |curr| curr | ready.as_usize())
            .is_err();
        if update_failed {
            return;
        }

        // The writer is notified on writable, hang-up and error readiness.
        let notify_writer =
            ready.is_writable() || platform::is_hup(ready) || platform::is_error(ready);
        let write_waker = if notify_writer {
            scheduled.writer.take_waker()
        } else {
            None
        };

        // The reader is notified on any readiness bit other than writable.
        let notify_reader = !(ready & (!mio::Ready::writable())).is_empty();
        let read_waker = if notify_reader {
            scheduled.reader.take_waker()
        } else {
            None
        };

        // Wake only after both wakers have been taken: reader first, then
        // writer.
        if let Some(waker) = read_waker {
            waker.wake();
        }
        if let Some(waker) = write_waker {
            waker.wake();
        }
    }
}
/// Parking the driver means running one turn of the I/O event loop.
impl Park for Driver {
    type Unpark = Handle;
    type Error = io::Error;

    /// Returns a handle that can be used to unpark this driver.
    fn unpark(&self) -> Self::Unpark {
        self.handle()
    }

    /// Runs one turn with no timeout passed to the poll.
    fn park(&mut self) -> io::Result<()> {
        self.turn(None)
    }

    /// Runs one turn, waiting at most `duration` for events.
    fn park_timeout(&mut self, duration: Duration) -> io::Result<()> {
        let max_wait = Some(duration);
        self.turn(max_wait)
    }
}
/// Opaque debug representation: prints only the type name.
impl fmt::Debug for Driver {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Driver")
    }
}
impl Handle {
pub(super) fn current() -> Self {
context::io_handle()
.expect("there is no reactor running, must be called from the context of Tokio runtime")
}
fn wakeup(&self) {
if let Some(inner) = self.inner() {
inner.wakeup.set_readiness(mio::Ready::readable()).unwrap();
}
}
pub(super) fn inner(&self) -> Option<Arc<Inner>> {
self.inner.upgrade()
}
}
impl Unpark for Handle {
fn unpark(&self) {
self.wakeup();
}
}
/// Opaque debug representation: prints only the type name.
impl fmt::Debug for Handle {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Handle")
    }
}
impl Inner {
    /// Registers an I/O resource with the reactor and returns the slab address
    /// that identifies it.
    ///
    /// The source is registered with `mio` for all readiness kinds,
    /// edge-triggered, using the slab address as its token.
    ///
    /// # Errors
    ///
    /// Returns an error when the slab has no free slot, or when the `mio`
    /// registration fails. In the latter case the slot that was just allocated
    /// is released and the source counter is restored, so a failed
    /// registration does not permanently consume reactor capacity.
    pub(super) fn add_source(&self, source: &dyn Evented) -> io::Result<Address> {
        let address = self.io_dispatch.alloc().ok_or_else(|| {
            io::Error::new(
                io::ErrorKind::Other,
                "reactor at max registered I/O resources",
            )
        })?;

        self.n_sources.fetch_add(1, SeqCst);

        let registered = self.io.register(
            source,
            mio::Token(address.to_usize()),
            mio::Ready::all(),
            mio::PollOpt::edge(),
        );

        if let Err(err) = registered {
            // The caller never receives the address on failure, so nobody
            // else could release the slot: undo the allocation here.
            self.drop_source(address);
            return Err(err);
        }

        Ok(address)
    }

    /// Deregisters an I/O resource from the underlying `mio::Poll`.
    ///
    /// This does not free the resource's slab slot; see `drop_source`.
    pub(super) fn deregister_source(&self, source: &dyn Evented) -> io::Result<()> {
        self.io.deregister(source)
    }

    /// Removes the slab entry at `address` and decrements the source counter.
    pub(super) fn drop_source(&self, address: Address) {
        self.io_dispatch.remove(address);
        self.n_sources.fetch_sub(1, SeqCst);
    }

    /// Registers `w` as the waker for the given direction of the resource
    /// identified by `token`.
    ///
    /// # Panics
    ///
    /// Panics when the slab has no entry for `token`.
    pub(super) fn register(&self, token: Address, dir: Direction, w: Waker) {
        let sched = self
            .io_dispatch
            .get(token)
            .unwrap_or_else(|| panic!("IO resource for token {:?} does not exist!", token));

        let waker = match dir {
            Direction::Read => &sched.reader,
            Direction::Write => &sched.writer,
        };

        waker.register(w);
    }
}
impl Direction {
    /// Returns the readiness bits associated with this direction.
    ///
    /// `Write` covers writable plus the platform's hang-up and error bits;
    /// `Read` covers everything in `mio::Ready::all()` except writable.
    pub(super) fn mask(self) -> mio::Ready {
        match self {
            Direction::Write => mio::Ready::writable() | platform::hup() | platform::error(),
            Direction::Read => mio::Ready::all() - mio::Ready::writable(),
        }
    }
}
// Loom model tests checking that slab addresses handed out by `add_source`
// stay distinct under concurrent use. Compiled only for test builds with the
// `loom` cfg enabled.
#[cfg(all(test, loom))]
mod tests {
use super::*;
use loom::thread;
// Dummy source whose register/reregister/deregister calls all succeed without
// doing anything, so the tests exercise only the driver's own bookkeeping.
struct NotEvented;
impl Evented for NotEvented {
fn register(
&self,
_: &mio::Poll,
_: mio::Token,
_: mio::Ready,
_: mio::PollOpt,
) -> io::Result<()> {
Ok(())
}
fn reregister(
&self,
_: &mio::Poll,
_: mio::Token,
_: mio::Ready,
_: mio::PollOpt,
) -> io::Result<()> {
Ok(())
}
fn deregister(&self, _: &mio::Poll) -> io::Result<()> {
Ok(())
}
}
// One thread drops a source while the main thread adds another; the two
// addresses must differ.
#[test]
fn tokens_unique_when_dropped() {
loom::model(|| {
let reactor = Driver::new().unwrap();
let inner = reactor.inner;
let inner2 = inner.clone();
let token_1 = inner.add_source(&NotEvented).unwrap();
let thread = thread::spawn(move || {
inner2.drop_source(token_1);
});
let token_2 = inner.add_source(&NotEvented).unwrap();
thread.join().unwrap();
assert!(token_1 != token_2);
})
}
// Same scenario as above, but 31 sources are added first.
// NOTE(review): presumably 31 + 1 entries fill the slab's first page so the
// dropped slot is a candidate for reuse -- confirm against `util::slab`.
#[test]
fn tokens_unique_when_dropped_on_full_page() {
loom::model(|| {
let reactor = Driver::new().unwrap();
let inner = reactor.inner;
let inner2 = inner.clone();
for _ in 0..31 {
inner.add_source(&NotEvented).unwrap();
}
let token_1 = inner.add_source(&NotEvented).unwrap();
let thread = thread::spawn(move || {
inner2.drop_source(token_1);
});
let token_2 = inner.add_source(&NotEvented).unwrap();
thread.join().unwrap();
assert!(token_1 != token_2);
})
}
// Two threads add a source concurrently; the resulting addresses must differ.
#[test]
fn tokens_unique_concurrent_add() {
loom::model(|| {
let reactor = Driver::new().unwrap();
let inner = reactor.inner;
let inner2 = inner.clone();
let thread = thread::spawn(move || {
let token_2 = inner2.add_source(&NotEvented).unwrap();
token_2
});
let token_1 = inner.add_source(&NotEvented).unwrap();
let token_2 = thread.join().unwrap();
assert!(token_1 != token_2);
})
}
}