cfg_signal_internal_and_unix! {
mod signal;
}
cfg_io_uring! {
mod uring;
use uring::UringContext;
use crate::loom::sync::atomic::AtomicUsize;
}
use crate::io::interest::Interest;
use crate::io::ready::Ready;
use crate::loom::sync::Mutex;
use crate::runtime::driver;
use crate::runtime::io::registration_set;
use crate::runtime::io::{IoDriverMetrics, RegistrationSet, ScheduledIo};
use mio::event::Source;
use std::fmt;
use std::io;
use std::sync::Arc;
use std::time::Duration;
/// The polling half of the I/O driver: owns the `mio::Poll` instance and the
/// event buffer that each call to `turn` fills and drains.
pub(crate) struct Driver {
/// Set to `true` by `turn` when an event carrying `TOKEN_SIGNAL` is observed.
/// NOTE(review): nothing visible here clears it — presumably the `signal`
/// submodule consumes and resets the flag; confirm.
signal_ready: bool,
/// Buffer handed to `mio::Poll::poll`; its capacity is the `nevents` argument
/// of `Driver::new`.
events: mio::Events,
/// The poller itself. `Handle::registry` is a clone of this poller's registry.
poll: mio::Poll,
}
/// Shared reference to the I/O driver, used to register and deregister event
/// sources and to wake the driver. `_assert_kinds` checks at compile time that
/// this type is `Send + Sync`.
pub(crate) struct Handle {
/// Clone of the driver's `mio::Poll` registry; sources are registered and
/// deregistered through it.
registry: mio::Registry,
/// Set of `ScheduledIo` entries handed out by `add_source`.
registrations: RegistrationSet,
/// Part of the registration set's state that is only touched while this
/// mutex is held; it is passed as `&mut` to the `RegistrationSet` methods.
synced: Mutex<registration_set::Synced>,
/// Waker registered under `TOKEN_WAKEUP`; `unpark` calls `wake` on it.
/// Not compiled on WASI.
#[cfg(not(target_os = "wasi"))]
waker: mio::Waker,
/// Driver metrics: the fd count is adjusted in `add_source` /
/// `deregister_source`, the ready count in `Driver::turn`.
pub(crate) metrics: IoDriverMetrics,
/// io_uring context; `Driver::turn` locks it (via `get_uring`) to dispatch
/// completions. Only present with the unstable io-uring configuration on Linux.
#[cfg(all(
tokio_unstable,
feature = "io-uring",
feature = "rt",
feature = "fs",
target_os = "linux",
))]
pub(crate) uring_context: Mutex<UringContext>,
/// io_uring state word, initialised to `0` in `Driver::new`.
/// NOTE(review): the meaning of the individual values is defined outside
/// this view — confirm in the `uring` module.
#[cfg(all(
tokio_unstable,
feature = "io-uring",
feature = "rt",
feature = "fs",
target_os = "linux",
))]
pub(crate) uring_state: AtomicUsize,
}
/// A readiness observation for a registered I/O resource.
#[derive(Debug)]
pub(crate) struct ReadyEvent {
/// Tick value associated with this observation.
/// NOTE(review): presumably the driver tick at which the readiness was
/// recorded, used when clearing readiness (see `Tick::Clear`) — confirm
/// against `ScheduledIo`.
pub(super) tick: u8,
/// The readiness set carried by this event.
pub(crate) ready: Ready,
/// Shutdown flag carried along with the event.
/// NOTE(review): presumably `true` once the I/O driver has shut down — confirm.
pub(super) is_shutdown: bool,
}
cfg_net_unix!(
    impl ReadyEvent {
        /// Returns a copy of this event whose readiness is replaced by `ready`.
        ///
        /// The `tick` and `is_shutdown` values are carried over unchanged.
        pub(crate) fn with_ready(&self, ready: Ready) -> Self {
            let (tick, is_shutdown) = (self.tick, self.is_shutdown);
            Self {
                tick,
                ready,
                is_shutdown,
            }
        }
    }
);
/// The direction of an I/O operation; `Direction::mask` maps each variant to
/// the readiness bits relevant to it.
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub(super) enum Direction {
/// Read side: readable / read-closed readiness.
Read,
/// Write side: writable / write-closed readiness.
Write,
}
/// Tick operation passed to `ScheduledIo::set_readiness`.
pub(super) enum Tick {
/// Used by `Driver::turn` when it merges newly reported readiness into a
/// `ScheduledIo`.
Set,
/// Carries a tick value.
/// NOTE(review): no use is visible here — presumably the tick taken from a
/// `ReadyEvent` when readiness is being cleared; confirm against `ScheduledIo`.
Clear(u8),
}
// Token under which the `mio::Waker` is registered in `Driver::new`. Events
// carrying it only unblock the poll; `Driver::turn` does nothing else with them.
const TOKEN_WAKEUP: mio::Token = mio::Token(0);
// Token whose events make `Driver::turn` set `signal_ready`.
// NOTE(review): presumably the `signal` submodule registers its source under
// this token — confirm.
const TOKEN_SIGNAL: mio::Token = mio::Token(1);
/// Compile-time check that [`Handle`] is both `Send` and `Sync`.
///
/// The function has no runtime effect; it only has to type-check.
fn _assert_kinds() {
    fn _require_send_sync<T>()
    where
        T: Send + Sync,
    {
    }

    _require_send_sync::<Handle>();
}
impl Driver {
pub(crate) fn new(nevents: usize) -> io::Result<(Driver, Handle)> {
let poll = mio::Poll::new()?;
#[cfg(not(target_os = "wasi"))]
let waker = mio::Waker::new(poll.registry(), TOKEN_WAKEUP)?;
let registry = poll.registry().try_clone()?;
let driver = Driver {
signal_ready: false,
events: mio::Events::with_capacity(nevents),
poll,
};
let (registrations, synced) = RegistrationSet::new();
let handle = Handle {
registry,
registrations,
synced: Mutex::new(synced),
#[cfg(not(target_os = "wasi"))]
waker,
metrics: IoDriverMetrics::default(),
#[cfg(all(
tokio_unstable,
feature = "io-uring",
feature = "rt",
feature = "fs",
target_os = "linux",
))]
uring_context: Mutex::new(UringContext::new()),
#[cfg(all(
tokio_unstable,
feature = "io-uring",
feature = "rt",
feature = "fs",
target_os = "linux",
))]
uring_state: AtomicUsize::new(0),
};
Ok((driver, handle))
}
pub(crate) fn park(&mut self, rt_handle: &driver::Handle) {
let handle = rt_handle.io();
self.turn(handle, None);
}
pub(crate) fn park_timeout(&mut self, rt_handle: &driver::Handle, duration: Duration) {
let handle = rt_handle.io();
self.turn(handle, Some(duration));
}
pub(crate) fn shutdown(&mut self, rt_handle: &driver::Handle) {
let handle = rt_handle.io();
let ios = handle.registrations.shutdown(&mut handle.synced.lock());
for io in ios {
io.shutdown();
}
}
fn turn(&mut self, handle: &Handle, max_wait: Option<Duration>) {
debug_assert!(!handle.registrations.is_shutdown(&handle.synced.lock()));
handle.release_pending_registrations();
let events = &mut self.events;
match self.poll.poll(events, max_wait) {
Ok(()) => {}
Err(ref e) if e.kind() == io::ErrorKind::Interrupted => {}
#[cfg(target_os = "wasi")]
Err(e) if e.kind() == io::ErrorKind::InvalidInput => {
}
Err(e) => panic!("unexpected error when polling the I/O driver: {e:?}"),
}
let mut ready_count = 0;
for event in events.iter() {
let token = event.token();
if token == TOKEN_WAKEUP {
} else if token == TOKEN_SIGNAL {
self.signal_ready = true;
} else {
let ready = Ready::from_mio(event);
let ptr = super::EXPOSE_IO.from_exposed_addr(token.0);
let io: &ScheduledIo = unsafe { &*ptr };
io.set_readiness(Tick::Set, |curr| curr | ready);
io.wake(ready);
ready_count += 1;
}
}
#[cfg(all(
tokio_unstable,
feature = "io-uring",
feature = "rt",
feature = "fs",
target_os = "linux",
))]
{
let mut guard = handle.get_uring().lock();
let ctx = &mut *guard;
ctx.dispatch_completions();
}
handle.metrics.incr_ready_count_by(ready_count);
}
}
impl fmt::Debug for Driver {
    /// Writes the bare type name; no field is exposed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Driver")
    }
}
impl Handle {
    /// Wakes the I/O driver out of a blocking poll.
    ///
    /// On WASI this is a no-op because no waker exists there.
    ///
    /// # Panics
    ///
    /// Panics if the underlying `mio::Waker` fails to wake.
    pub(crate) fn unpark(&self) {
        #[cfg(not(target_os = "wasi"))]
        {
            self.waker.wake().expect("failed to wake I/O driver");
        }
    }

    /// Registers `source` with the poller for `interest` and returns the
    /// `ScheduledIo` that tracks it.
    ///
    /// # Errors
    ///
    /// Returns the error from allocating the `ScheduledIo`, or the error from
    /// `mio::Registry::register`. In the latter case the freshly allocated
    /// entry is removed from the registration set before returning.
    pub(super) fn add_source(
        &self,
        source: &mut impl mio::event::Source,
        interest: Interest,
    ) -> io::Result<Arc<ScheduledIo>> {
        let scheduled = self.registrations.allocate(&mut self.synced.lock())?;

        match self
            .registry
            .register(source, scheduled.token(), interest.to_mio())
        {
            Ok(()) => {
                self.metrics.incr_fd_count();
                Ok(scheduled)
            }
            Err(err) => {
                // Registration with the poller failed, so undo the allocation.
                //
                // SAFETY (assumed — confirm against `RegistrationSet::remove`):
                // `scheduled` was just returned by `allocate` on this same set
                // and has not been removed since.
                unsafe {
                    self.registrations
                        .remove(&mut self.synced.lock(), &scheduled)
                };
                Err(err)
            }
        }
    }

    /// Deregisters `source` from the poller and `registration` from the
    /// registration set.
    ///
    /// # Errors
    ///
    /// Returns the error from `mio::Registry::deregister`; in that case the
    /// registration set and the metrics are left untouched.
    pub(super) fn deregister_source(
        &self,
        registration: &Arc<ScheduledIo>,
        source: &mut impl Source,
    ) -> io::Result<()> {
        // The poller is told first; everything below only runs on success.
        self.registry.deregister(source)?;

        // The lock guard is a temporary of this statement and is released
        // before `unpark` is called.
        let wake_driver = self
            .registrations
            .deregister(&mut self.synced.lock(), registration);

        // NOTE(review): presumably the wake-up lets the driver release pending
        // registrations at the start of its next turn — confirm.
        if wake_driver {
            self.unpark();
        }

        self.metrics.dec_fd_count();
        Ok(())
    }

    /// Releases pending registrations, taking the lock only when the
    /// registration set reports that a release is needed.
    fn release_pending_registrations(&self) {
        if !self.registrations.needs_release() {
            return;
        }

        self.registrations.release(&mut self.synced.lock());
    }
}
impl fmt::Debug for Handle {
    /// Writes the bare type name; no field is exposed.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("Handle")
    }
}
impl Direction {
    /// Returns the readiness bits that belong to this direction: the "ready"
    /// bit combined with the matching "closed" bit.
    pub(super) fn mask(self) -> Ready {
        let (open, closed) = match self {
            Direction::Read => (Ready::READABLE, Ready::READ_CLOSED),
            Direction::Write => (Ready::WRITABLE, Ready::WRITE_CLOSED),
        };

        open | closed
    }
}