use std::{cell::RefCell, collections::HashMap, rc::Rc, sync::Arc, time::Duration};
#[cfg(unix)]
use std::os::unix::io::{AsRawFd, RawFd as Raw};
#[cfg(windows)]
use std::os::windows::io::{AsRawSocket, RawSocket as Raw};
#[cfg(unix)]
use io_lifetimes::AsFd;
#[cfg(windows)]
use io_lifetimes::AsSocket;
use polling::{Event, PollMode, Poller};
use crate::loop_logic::{MAX_SOURCES, MAX_SUBSOURCES_TOTAL};
use crate::sources::timer::TimerWheel;
#[derive(Copy, Clone, Debug)]
pub enum Mode {
OneShot,
Level,
Edge,
}
#[derive(Copy, Clone, Debug)]
pub struct Interest {
pub readable: bool,
pub writable: bool,
}
impl Interest {
pub const EMPTY: Interest = Interest {
readable: false,
writable: false,
};
pub const READ: Interest = Interest {
readable: true,
writable: false,
};
pub const WRITE: Interest = Interest {
readable: false,
writable: true,
};
pub const BOTH: Interest = Interest {
readable: true,
writable: true,
};
}
#[derive(Copy, Clone, Debug)]
pub struct Readiness {
pub readable: bool,
pub writable: bool,
pub error: bool,
}
impl Readiness {
pub const EMPTY: Readiness = Readiness {
readable: false,
writable: false,
error: false,
};
}
#[derive(Debug)]
pub(crate) struct PollEvent {
pub(crate) readiness: Readiness,
pub(crate) token: Token,
}
#[derive(Debug)]
pub struct TokenFactory {
key: usize,
sub_id: u32,
}
impl TokenFactory {
pub(crate) fn new(key: usize) -> TokenFactory {
TokenFactory { key, sub_id: 0 }
}
pub fn token(&mut self) -> Token {
if self.sub_id >= MAX_SUBSOURCES_TOTAL as _ {
panic!("Too many sub-sources for this source");
}
let mut key = self.key;
key |= (self.sub_id as usize) << MAX_SOURCES;
let token = Token { key };
self.sub_id += 1;
token
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Token {
pub(crate) key: usize,
}
pub struct Poll {
poller: Arc<Poller>,
events: RefCell<Vec<Event>>,
level_triggered: Option<RefCell<HashMap<usize, (Raw, polling::Event)>>>,
pub(crate) timers: Rc<RefCell<TimerWheel>>,
}
impl std::fmt::Debug for Poll {
#[cfg_attr(feature = "nightly_coverage", no_coverage)]
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("Poll { ... }")
}
}
impl Poll {
pub(crate) fn new() -> crate::Result<Poll> {
Self::new_inner(false)
}
fn new_inner(force_fallback_lt: bool) -> crate::Result<Poll> {
let poller = Poller::new()?;
let level_triggered = if poller.supports_level() && !force_fallback_lt {
None
} else {
Some(RefCell::new(HashMap::new()))
};
Ok(Poll {
poller: Arc::new(poller),
events: RefCell::new(Vec::new()),
timers: Rc::new(RefCell::new(TimerWheel::new())),
level_triggered,
})
}
pub(crate) fn poll(&self, mut timeout: Option<Duration>) -> crate::Result<Vec<PollEvent>> {
let now = std::time::Instant::now();
if let Some(next_timeout) = self.timers.borrow().next_deadline() {
if next_timeout <= now {
timeout = Some(Duration::ZERO);
} else if let Some(deadline) = timeout {
timeout = Some(std::cmp::min(deadline, next_timeout - now));
} else {
timeout = Some(next_timeout - now);
}
};
let mut events = self.events.borrow_mut();
self.poller.wait(&mut events, timeout)?;
let level_triggered = self.level_triggered.as_ref().map(RefCell::borrow);
let mut poll_events = events
.drain(..)
.map(|ev| {
if let Some(level_triggered) = level_triggered.as_ref() {
if let Some((source, interest)) = level_triggered.get(&ev.key) {
self.poller.modify(source, *interest)?;
}
}
Ok(PollEvent {
readiness: Readiness {
readable: ev.readable,
writable: ev.writable,
error: false,
},
token: Token { key: ev.key },
})
})
.collect::<std::io::Result<Vec<_>>>()?;
drop(events);
let now = std::time::Instant::now();
let mut timers = self.timers.borrow_mut();
while let Some((_, token)) = timers.next_expired(now) {
poll_events.push(PollEvent {
readiness: Readiness {
readable: true,
writable: false,
error: false,
},
token,
});
}
Ok(poll_events)
}
pub fn register(
&self,
#[cfg(unix)] fd: impl AsFd,
#[cfg(windows)] fd: impl AsSocket,
interest: Interest,
mode: Mode,
token: Token,
) -> crate::Result<()> {
let raw = {
#[cfg(unix)]
{
fd.as_fd().as_raw_fd()
}
#[cfg(windows)]
{
fd.as_socket().as_raw_socket()
}
};
let ev = cvt_interest(interest, token);
self.poller
.add_with_mode(raw, ev, cvt_mode(mode, self.poller.supports_level()))?;
if let (Mode::Level, Some(level_triggered)) = (mode, self.level_triggered.as_ref()) {
let mut level_triggered = level_triggered.borrow_mut();
level_triggered.insert(ev.key, (raw, ev));
}
Ok(())
}
pub fn reregister(
&self,
#[cfg(unix)] fd: impl AsFd,
#[cfg(windows)] fd: impl AsSocket,
interest: Interest,
mode: Mode,
token: Token,
) -> crate::Result<()> {
let raw = {
#[cfg(unix)]
{
fd.as_fd().as_raw_fd()
}
#[cfg(windows)]
{
fd.as_socket().as_raw_socket()
}
};
let ev = cvt_interest(interest, token);
self.poller
.modify_with_mode(raw, ev, cvt_mode(mode, self.poller.supports_level()))?;
if let (Mode::Level, Some(level_triggered)) = (mode, self.level_triggered.as_ref()) {
let mut level_triggered = level_triggered.borrow_mut();
level_triggered.insert(ev.key, (raw, ev));
}
Ok(())
}
pub fn unregister(
&self,
#[cfg(unix)] fd: impl AsFd,
#[cfg(windows)] fd: impl AsSocket,
) -> crate::Result<()> {
let raw = {
#[cfg(unix)]
{
fd.as_fd().as_raw_fd()
}
#[cfg(windows)]
{
fd.as_socket().as_raw_socket()
}
};
self.poller.delete(raw)?;
if let Some(level_triggered) = self.level_triggered.as_ref() {
let mut level_triggered = level_triggered.borrow_mut();
level_triggered.retain(|_, (source, _)| *source != raw);
}
Ok(())
}
pub(crate) fn notifier(&self) -> Notifier {
Notifier(self.poller.clone())
}
}
#[derive(Clone)]
pub(crate) struct Notifier(Arc<Poller>);
impl Notifier {
pub(crate) fn notify(&self) -> crate::Result<()> {
self.0.notify()?;
Ok(())
}
}
fn cvt_interest(interest: Interest, tok: Token) -> Event {
Event {
readable: interest.readable,
writable: interest.writable,
key: tok.key,
}
}
fn cvt_mode(mode: Mode, supports_other_modes: bool) -> PollMode {
if !supports_other_modes {
return PollMode::Oneshot;
}
match mode {
Mode::Edge => PollMode::Edge,
Mode::Level => PollMode::Level,
Mode::OneShot => PollMode::Oneshot,
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{loop_logic::MAX_SOURCES_TOTAL, sources::ping::make_ping, EventSource};
#[should_panic]
#[test]
fn overflow_subid() {
let mut gen = TokenFactory {
key: 0,
sub_id: u32::MAX - 1,
};
let _ = gen.token();
}
#[test]
fn test_fallback_lt() {
let mut poll = Poll::new_inner(true).unwrap();
let mut gen = TokenFactory { key: 0, sub_id: 0 };
let (dst, mut src) = make_ping().unwrap();
src.register(&mut poll, &mut gen).unwrap();
let mut key = 0;
for _ in 0..2 {
dst.ping();
let events = poll.poll(Some(Duration::from_secs(3))).unwrap();
assert_eq!(events.len(), 1);
assert_eq!(events[0].token, Token { key });
let events = poll.poll(Some(Duration::from_secs(3))).unwrap();
assert_eq!(events.len(), 1);
assert_eq!(events[0].token, Token { key });
src.reregister(&mut poll, &mut gen).unwrap();
key += MAX_SOURCES_TOTAL;
}
src.unregister(&mut poll).unwrap();
}
}