use std::{cell::RefCell, collections::HashMap, rc::Rc, sync::Arc, time::Duration};
#[cfg(unix)]
use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd as Borrowed, RawFd as Raw};
#[cfg(windows)]
use std::os::windows::io::{AsRawSocket, AsSocket, BorrowedSocket as Borrowed, RawSocket as Raw};
use polling::{Event, Events, PollMode, Poller};
use crate::loop_logic::{MAX_SOURCES, MAX_SUBSOURCES_TOTAL};
use crate::sources::timer::TimerWheel;
use crate::RegistrationToken;
/// Trigger mode requested for a source when it is registered with a [`Poll`].
///
/// The value is translated into the `polling` crate's `PollMode` by
/// `cvt_mode` before being handed to the underlying poller.
#[derive(Debug, Clone, Copy)]
pub enum Mode {
    /// Translated to `PollMode::Oneshot` when the poller supports other modes.
    OneShot,
    /// Translated to `PollMode::Level` when the poller supports it; otherwise
    /// [`Poll`] emulates it by re-arming the source after each event.
    Level,
    /// Translated to `PollMode::Edge` when the poller supports other modes.
    Edge,
}
/// Which kinds of readiness a source wants to be woken up for.
#[derive(Debug, Clone, Copy)]
pub struct Interest {
    /// Wake up when the source becomes readable.
    pub readable: bool,
    /// Wake up when the source becomes writable.
    pub writable: bool,
}

impl Interest {
    /// Interest in nothing at all.
    pub const EMPTY: Self = Self {
        readable: false,
        writable: false,
    };

    /// Interest in readability only.
    pub const READ: Self = Self {
        readable: true,
        ..Self::EMPTY
    };

    /// Interest in writability only.
    pub const WRITE: Self = Self {
        writable: true,
        ..Self::EMPTY
    };

    /// Interest in both readability and writability.
    pub const BOTH: Self = Self {
        readable: true,
        writable: true,
    };
}
/// Readiness state reported for a source after a poll.
#[derive(Debug, Clone, Copy)]
pub struct Readiness {
    /// The source can be read from.
    pub readable: bool,
    /// The source can be written to.
    pub writable: bool,
    /// The source is in an error state.
    pub error: bool,
}

impl Readiness {
    /// A readiness with every flag cleared.
    pub const EMPTY: Self = Self {
        readable: false,
        writable: false,
        error: false,
    };
}
/// A single event produced by `Poll::poll`: what became ready, and for which token.
#[derive(Debug)]
pub(crate) struct PollEvent {
/// Readiness flags reported for the source.
pub(crate) readiness: Readiness,
/// Token identifying the (sub-)source the event belongs to.
pub(crate) token: Token,
}
/// Hands out unique [`Token`]s for the sub-sources of one event source.
///
/// Every token combines the source's own key with a sub-id that increases by
/// one for each token produced.
#[derive(Debug)]
pub struct TokenFactory {
    /// Key of the source this factory belongs to.
    key: usize,
    /// Sub-id that will be embedded in the next token.
    sub_id: u32,
}

impl TokenFactory {
    /// Creates a factory for the source identified by `key`, starting at sub-id 0.
    pub(crate) fn new(key: usize) -> TokenFactory {
        Self { key, sub_id: 0 }
    }

    /// Returns the registration token built from this factory's source key.
    pub(crate) fn registration_token(&self) -> RegistrationToken {
        RegistrationToken::new(self.key)
    }

    /// Produces the next unique token for this source.
    ///
    /// # Panics
    ///
    /// Panics once the sub-id has reached `MAX_SUBSOURCES_TOTAL`.
    pub fn token(&mut self) -> Token {
        // Refuse to hand out a sub-id that does not fit in the reserved bits.
        assert!(
            self.sub_id < MAX_SUBSOURCES_TOTAL as _,
            "Too many sub-sources for this source"
        );

        // The sub-id lives in the bits above the source key.
        let sub_id = self.sub_id;
        self.sub_id = sub_id + 1;
        Token {
            key: self.key | ((sub_id as usize) << MAX_SOURCES),
        }
    }
}
/// Identifier attached to a registered (sub-)source and reported back with its events.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct Token {
/// Raw key; this is the value used as the event key given to the poller.
pub(crate) key: usize,
}
/// Wrapper around a `polling::Poller` that also tracks timers and, when
/// needed, emulates level-triggered sources.
pub struct Poll {
/// The underlying poller; reference-counted so `Notifier` handles can share it.
poller: Arc<Poller>,
/// Event buffer, cleared and refilled on every call to `poll`.
events: RefCell<Events>,
/// `Some` only when level-triggering has to be emulated. Maps an event key
/// to the raw source handle and the event used to re-arm it after it fires.
level_triggered: Option<RefCell<HashMap<usize, (Raw, polling::Event)>>>,
/// Timer wheel consulted by `poll` to bound the wait and to emit expired timers.
pub(crate) timers: Rc<RefCell<TimerWheel>>,
}
impl std::fmt::Debug for Poll {
    /// Writes a fixed placeholder; none of the internal state is printed.
    #[cfg_attr(feature = "nightly_coverage", coverage(off))]
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "Poll {{ ... }}")
    }
}
impl Poll {
pub(crate) fn new() -> crate::Result<Poll> {
Self::new_inner(false)
}
fn new_inner(force_fallback_lt: bool) -> crate::Result<Poll> {
let poller = Poller::new()?;
let level_triggered = if poller.supports_level() && !force_fallback_lt {
None
} else {
Some(RefCell::new(HashMap::new()))
};
Ok(Poll {
poller: Arc::new(poller),
events: RefCell::new(Events::new()),
timers: Rc::new(RefCell::new(TimerWheel::new())),
level_triggered,
})
}
pub(crate) fn poll(&self, mut timeout: Option<Duration>) -> crate::Result<Vec<PollEvent>> {
let now = std::time::Instant::now();
if let Some(next_timeout) = self.timers.borrow().next_deadline() {
if next_timeout <= now {
timeout = Some(Duration::ZERO);
} else if let Some(deadline) = timeout {
timeout = Some(std::cmp::min(deadline, next_timeout - now));
} else {
timeout = Some(next_timeout - now);
}
};
let mut events = self.events.borrow_mut();
events.clear();
self.poller.wait(&mut events, timeout)?;
let level_triggered = self.level_triggered.as_ref().map(RefCell::borrow);
let mut poll_events = events
.iter()
.map(|ev| {
if let Some(level_triggered) = level_triggered.as_ref() {
if let Some((source, interest)) = level_triggered.get(&ev.key) {
self.poller
.modify(unsafe { Borrowed::borrow_raw(*source) }, *interest)?;
}
}
Ok(PollEvent {
readiness: Readiness {
readable: ev.readable,
writable: ev.writable,
error: false,
},
token: Token { key: ev.key },
})
})
.collect::<std::io::Result<Vec<_>>>()?;
drop(events);
let now = std::time::Instant::now();
let mut timers = self.timers.borrow_mut();
while let Some((_, token)) = timers.next_expired(now) {
poll_events.push(PollEvent {
readiness: Readiness {
readable: true,
writable: false,
error: false,
},
token,
});
}
Ok(poll_events)
}
pub unsafe fn register(
&self,
#[cfg(unix)] fd: impl AsFd,
#[cfg(windows)] fd: impl AsSocket,
interest: Interest,
mode: Mode,
token: Token,
) -> crate::Result<()> {
let raw = {
#[cfg(unix)]
{
fd.as_fd().as_raw_fd()
}
#[cfg(windows)]
{
fd.as_socket().as_raw_socket()
}
};
let ev = cvt_interest(interest, token);
unsafe {
self.poller
.add_with_mode(raw, ev, cvt_mode(mode, self.poller.supports_level()))?;
}
if let (Mode::Level, Some(level_triggered)) = (mode, self.level_triggered.as_ref()) {
let mut level_triggered = level_triggered.borrow_mut();
level_triggered.insert(ev.key, (raw, ev));
}
Ok(())
}
pub fn reregister(
&self,
#[cfg(unix)] fd: impl AsFd,
#[cfg(windows)] fd: impl AsSocket,
interest: Interest,
mode: Mode,
token: Token,
) -> crate::Result<()> {
let (borrowed, raw) = {
#[cfg(unix)]
{
(fd.as_fd(), fd.as_fd().as_raw_fd())
}
#[cfg(windows)]
{
(fd.as_socket(), fd.as_socket().as_raw_socket())
}
};
let ev = cvt_interest(interest, token);
self.poller
.modify_with_mode(borrowed, ev, cvt_mode(mode, self.poller.supports_level()))?;
if let (Mode::Level, Some(level_triggered)) = (mode, self.level_triggered.as_ref()) {
let mut level_triggered = level_triggered.borrow_mut();
level_triggered.insert(ev.key, (raw, ev));
}
Ok(())
}
pub fn unregister(
&self,
#[cfg(unix)] fd: impl AsFd,
#[cfg(windows)] fd: impl AsSocket,
) -> crate::Result<()> {
let (borrowed, raw) = {
#[cfg(unix)]
{
(fd.as_fd(), fd.as_fd().as_raw_fd())
}
#[cfg(windows)]
{
(fd.as_socket(), fd.as_socket().as_raw_socket())
}
};
self.poller.delete(borrowed)?;
if let Some(level_triggered) = self.level_triggered.as_ref() {
let mut level_triggered = level_triggered.borrow_mut();
level_triggered.retain(|_, (source, _)| *source != raw);
}
Ok(())
}
pub(crate) fn notifier(&self) -> Notifier {
Notifier(self.poller.clone())
}
pub(crate) fn poller(&self) -> &Arc<Poller> {
&self.poller
}
}
/// Cloneable handle that shares the poller of a [`Poll`].
#[derive(Clone)]
pub(crate) struct Notifier(Arc<Poller>);

impl Notifier {
    /// Calls `notify` on the shared poller.
    ///
    /// # Errors
    ///
    /// Returns an error if the poller's `notify` call fails.
    pub(crate) fn notify(&self) -> crate::Result<()> {
        Ok(self.0.notify()?)
    }
}
fn cvt_interest(interest: Interest, tok: Token) -> Event {
let mut event = Event::none(tok.key);
event.readable = interest.readable;
event.writable = interest.writable;
event
}
/// Translates a [`Mode`] into the `polling` crate's `PollMode`.
///
/// When `supports_other_modes` is `false`, every mode is downgraded to
/// `PollMode::Oneshot`.
fn cvt_mode(mode: Mode, supports_other_modes: bool) -> PollMode {
    match (supports_other_modes, mode) {
        // The poller can only do oneshot: the requested mode is irrelevant.
        (false, _) => PollMode::Oneshot,
        (true, Mode::Edge) => PollMode::Edge,
        (true, Mode::Level) => PollMode::Level,
        (true, Mode::OneShot) => PollMode::Oneshot,
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{loop_logic::MAX_SOURCES_TOTAL, sources::ping::make_ping, EventSource};

    /// Asking for a token when the sub-id is far past the limit must panic.
    #[should_panic]
    #[test]
    fn overflow_subid() {
        let mut factory = TokenFactory {
            key: 0,
            sub_id: u32::MAX - 1,
        };
        let _ = factory.token();
    }

    /// With level-trigger emulation forced on, a pinged source keeps being
    /// reported on consecutive polls, including after it is re-registered
    /// with a fresh token.
    #[test]
    fn test_fallback_lt() {
        let mut poll = Poll::new_inner(true).unwrap();
        let mut factory = TokenFactory { key: 0, sub_id: 0 };
        let (ping, mut source) = make_ping().unwrap();
        source.register(&mut poll, &mut factory).unwrap();

        let wait = Some(Duration::from_secs(3));
        let mut expected_key = 0;
        for _ in 0..2 {
            ping.ping();

            // Nothing drains the ping, so two polls in a row must both report it.
            for _ in 0..2 {
                let events = poll.poll(wait).unwrap();
                assert_eq!(events.len(), 1);
                assert_eq!(events[0].token, Token { key: expected_key });
            }

            // Re-registering takes the next token from the factory, whose
            // sub-id is one higher than the previous one.
            source.reregister(&mut poll, &mut factory).unwrap();
            expected_key += MAX_SOURCES_TOTAL;
        }

        source.unregister(&mut poll).unwrap();
    }
}