use {sys, Token};
use event_imp::{self as event, Ready, Event, Evented, PollOpt};
use std::{fmt, io, ptr, usize};
use std::cell::UnsafeCell;
use std::{mem, ops, isize};
#[cfg(unix)]
use std::os::unix::io::AsRawFd;
#[cfg(unix)]
use std::os::unix::io::RawFd;
use std::sync::{Arc, Mutex, Condvar};
use std::sync::atomic::{AtomicUsize, AtomicPtr, AtomicBool};
use std::sync::atomic::Ordering::{self, Acquire, Release, AcqRel, Relaxed, SeqCst};
use std::time::{Duration, Instant};
#[cfg(unix)]
use sys::unix::UnixReady;
/// Polls for readiness events on all registered values.
///
/// `selector` drives OS-level readiness; `readiness_queue` delivers
/// user-space readiness set through `SetReadiness` handles. The
/// `lock_state`/`lock`/`condvar` trio implements a custom lock that lets
/// only one thread at a time run the actual `poll` (see `Poll::poll`).
pub struct Poll {
selector: sys::Selector,
readiness_queue: ReadinessQueue,
// Bit 0: a thread currently holds the poll "lock".
// Remaining bits: number of waiting threads, counted in units of 2.
lock_state: AtomicUsize,
// Mutex/condvar pair used only to park and wake waiting pollers; the
// mutex guards no data of its own.
lock: Mutex<()>,
condvar: Condvar,
}
/// Handle to a user-space registration; paired with a `SetReadiness` that
/// shares the same heap-allocated `ReadinessNode`. Dropping the
/// `Registration` flags the node as dropped so the poller releases it
/// (see the `Drop` impl below).
pub struct Registration {
inner: RegistrationInner,
}
// NOTE(review): soundness relies on `ReadinessNode`'s discipline — all of
// its fields are atomics, or `UnsafeCell`s guarded by the `update_lock`
// flag and the token read/write position protocol.
unsafe impl Send for Registration {}
unsafe impl Sync for Registration {}
/// Cloneable handle used to set a `Registration`'s readiness from any
/// thread; shares the same `ReadinessNode` as its `Registration`.
#[derive(Clone)]
pub struct SetReadiness {
inner: RegistrationInner,
}
// NOTE(review): same justification as for `Registration` — the shared
// node is only mutated through atomics or under `update_lock`.
unsafe impl Send for SetReadiness {}
unsafe impl Sync for SetReadiness {}
/// Records which selector (by id) a socket has been registered with, so a
/// second registration against a different `Poll` can be rejected.
#[derive(Debug)]
pub struct SelectorId {
// 0 = not yet associated with any selector.
id: AtomicUsize,
}
/// Shared handle to a heap-allocated `ReadinessNode`; behaves like a
/// hand-rolled `Arc` (see the `Clone`/`Drop` impls and `release_node`).
struct RegistrationInner {
node: *mut ReadinessNode,
}
/// Handle to the shared queue of nodes with pending user-space readiness.
#[derive(Clone)]
struct ReadinessQueue {
inner: Arc<ReadinessQueueInner>,
}
// NOTE(review): pushes go through atomics from any thread; only the
// polling side dereferences `tail_readiness` (see `ReadinessQueueInner`),
// which is what makes the `UnsafeCell` tail sound — confirm no other
// consumer exists outside this file.
unsafe impl Send for ReadinessQueue {}
unsafe impl Sync for ReadinessQueue {}
/// Core of the readiness queue: an intrusive MPSC linked list of
/// `ReadinessNode`s plus an `Awakener` used to interrupt a selector that
/// is blocked in `select` when readiness arrives from another thread.
struct ReadinessQueueInner {
awakener: sys::Awakener,
// Producer end (head): any thread pushes here via CAS.
head_readiness: AtomicPtr<ReadinessNode>,
// Consumer end (tail): only touched while draining the queue.
tail_readiness: UnsafeCell<*mut ReadinessNode>,
// Sentinel nodes: `end_marker` = queue empty, `sleep_marker` = the
// poller is (about to go) asleep and needs an awakener wakeup,
// `closed_marker` = queue dropped, further pushes are refused.
end_marker: Box<ReadinessNode>,
sleep_marker: Box<ReadinessNode>,
closed_marker: Box<ReadinessNode>,
}
/// Node shared between a `Registration`, its `SetReadiness` handles, and
/// the readiness queue.
struct ReadinessNode {
// Packed readiness/interest/opts/token-slot/queued/dropped bits (see
// the shift constants below the struct definitions).
state: AtomicState,
// Three token slots; `state` tracks which slot the poller reads and
// which slot was last written, so token updates never race a read.
token_0: UnsafeCell<Token>,
token_1: UnsafeCell<Token>,
token_2: UnsafeCell<Token>,
// Intrusive link used while the node sits in the readiness queue.
next_readiness: AtomicPtr<ReadinessNode>,
// Flag serializing concurrent `update` calls (see
// `RegistrationInner::update`).
update_lock: AtomicBool,
// Type-erased `Arc<ReadinessQueueInner>` this node is bound to; null
// until the node is first associated with a `Poll`.
readiness_queue: AtomicPtr<()>,
// Manual reference count; freed by `release_node`.
ref_count: AtomicUsize,
}
/// Atomic cell holding a packed `ReadinessState` word.
struct AtomicState {
inner: AtomicUsize,
}
// Masks for the 2-bit and 4-bit fields of the packed state word.
const MASK_2: usize = 4 - 1;
const MASK_4: usize = 16 - 1;
const QUEUED_MASK: usize = 1 << QUEUED_SHIFT;
const DROPPED_MASK: usize = 1 << DROPPED_SHIFT;
// Bit layout of `ReadinessState`:
//   [0..4)   current readiness
//   [4..8)   interest
//   [8..12)  poll options
//   [12..14) token read slot   (slot the poller reads from)
//   [14..16) token write slot  (slot last written by `update`)
//   [16]     queued flag       (node is in the readiness queue)
//   [17]     dropped flag      (owning `Registration` was dropped)
const READINESS_SHIFT: usize = 0;
const INTEREST_SHIFT: usize = 4;
const POLL_OPT_SHIFT: usize = 8;
const TOKEN_RD_SHIFT: usize = 12;
const TOKEN_WR_SHIFT: usize = 14;
const QUEUED_SHIFT: usize = 16;
const DROPPED_SHIFT: usize = 17;
/// Plain (non-atomic) snapshot of a node's packed state word.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
struct ReadinessState(usize);
/// Result of one pop attempt on the MPSC readiness queue.
enum Dequeue {
Data(*mut ReadinessNode),
Empty,
// A producer is mid-push; retrying shortly will succeed.
Inconsistent,
}
// Token reserved for the internal awakener; rejected by `validate_args`.
const AWAKEN: Token = Token(usize::MAX);
// Reference-count ceiling, mirroring `Arc`'s overflow guard.
const MAX_REFCOUNT: usize = (isize::MAX) as usize;
impl Poll {
pub fn new() -> io::Result<Poll> {
is_send::<Poll>();
is_sync::<Poll>();
let poll = Poll {
selector: try!(sys::Selector::new()),
readiness_queue: try!(ReadinessQueue::new()),
lock_state: AtomicUsize::new(0),
lock: Mutex::new(()),
condvar: Condvar::new(),
};
try!(poll.readiness_queue.inner.awakener.register(&poll, AWAKEN, Ready::readable(), PollOpt::edge()));
Ok(poll)
}
pub fn register<E: ?Sized>(&self, handle: &E, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()>
where E: Evented
{
try!(validate_args(token, interest));
trace!("registering with poller");
try!(handle.register(self, token, interest, opts));
Ok(())
}
pub fn reregister<E: ?Sized>(&self, handle: &E, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()>
where E: Evented
{
try!(validate_args(token, interest));
trace!("registering with poller");
try!(handle.reregister(self, token, interest, opts));
Ok(())
}
pub fn deregister<E: ?Sized>(&self, handle: &E) -> io::Result<()>
where E: Evented
{
trace!("deregistering handle with poller");
try!(handle.deregister(self));
Ok(())
}
pub fn poll(&self, events: &mut Events, mut timeout: Option<Duration>) -> io::Result<usize> {
let zero = Some(Duration::from_millis(0));
let mut curr = self.lock_state.compare_and_swap(0, 1, SeqCst);
if 0 != curr {
let mut lock = self.lock.lock().unwrap();
let mut inc = false;
loop {
if curr & 1 == 0 {
let mut next = curr | 1;
if inc {
next -= 2;
}
let actual = self.lock_state.compare_and_swap(curr, next, SeqCst);
if actual != curr {
curr = actual;
continue;
}
break;
}
if timeout == zero {
if inc {
self.lock_state.fetch_sub(2, SeqCst);
}
return Ok(0);
}
if !inc {
let next = curr.checked_add(2).expect("overflow");
let actual = self.lock_state.compare_and_swap(curr, next, SeqCst);
if actual != curr {
curr = actual;
continue;
}
inc = true;
}
lock = match timeout {
Some(to) => {
let now = Instant::now();
let (l, _) = self.condvar.wait_timeout(lock, to).unwrap();
let elapsed = now.elapsed();
if elapsed >= to {
timeout = zero;
} else {
timeout = Some(to - elapsed);
}
l
}
None => {
self.condvar.wait(lock).unwrap()
}
};
curr = self.lock_state.load(SeqCst);
}
}
let ret = self.poll2(events, timeout);
if 1 != self.lock_state.fetch_and(!1, Release) {
let _lock = self.lock.lock().unwrap();
self.condvar.notify_one();
}
ret
}
#[inline]
fn poll2(&self, events: &mut Events, timeout: Option<Duration>) -> io::Result<usize> {
let timeout = if timeout == Some(Duration::from_millis(0)) {
timeout
} else if self.readiness_queue.prepare_for_sleep() {
timeout
} else {
Some(Duration::from_millis(0))
};
let res = self.selector.select(&mut events.inner, AWAKEN, timeout);
if try!(res) {
self.readiness_queue.inner.awakener.cleanup();
}
self.readiness_queue.poll(&mut events.inner);
Ok(events.len())
}
}
#[cfg(unix)]
fn registerable(interest: Ready) -> bool {
    // An interest set is registerable when it selects at least one kind of
    // readiness the selector can report: read, write, or AIO completion.
    let interest = UnixReady::from(interest);
    interest.is_readable() || interest.is_writable() || interest.is_aio()
}
#[cfg(not(unix))]
fn registerable(interest: Ready) -> bool {
    // Off Unix, only read and write readiness can be selected.
    let readable = interest.is_readable();
    let writable = interest.is_writable();
    readable || writable
}
/// Rejects the reserved awakener token and interest sets that cannot be
/// registered with the selector.
fn validate_args(token: Token, interest: Ready) -> io::Result<()> {
    if token == AWAKEN {
        Err(io::Error::new(io::ErrorKind::Other, "invalid token"))
    } else if !registerable(interest) {
        Err(io::Error::new(io::ErrorKind::Other, "interest must include readable or writable or aio"))
    } else {
        Ok(())
    }
}
impl fmt::Debug for Poll {
    /// `Poll` holds no user-meaningful state, so the debug output is just
    /// the type name.
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.write_str("Poll")
    }
}
#[cfg(unix)]
impl AsRawFd for Poll {
// Exposes the underlying OS selector's file descriptor.
fn as_raw_fd(&self) -> RawFd {
self.selector.as_raw_fd()
}
}
/// Buffer of readiness events, filled by `Poll::poll`.
pub struct Events {
inner: sys::Events,
}
/// Borrowing iterator over the events in an `Events` buffer.
#[derive(Debug)]
pub struct Iter<'a> {
inner: &'a Events,
// Index of the next event to yield.
pos: usize,
}
impl Events {
    /// Creates an empty buffer able to report up to `capacity` events per
    /// call to `Poll::poll`.
    pub fn with_capacity(capacity: usize) -> Events {
        let inner = sys::Events::with_capacity(capacity);
        Events { inner: inner }
    }
    /// Returns the event at `idx`, or `None` when out of bounds.
    pub fn get(&self, idx: usize) -> Option<Event> {
        self.inner.get(idx)
    }
    /// Number of events currently buffered.
    pub fn len(&self) -> usize {
        self.inner.len()
    }
    /// Maximum number of events the buffer can hold.
    pub fn capacity(&self) -> usize {
        self.inner.capacity()
    }
    /// True when the buffer holds no events.
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }
    /// Iterates over the buffered events, yielding them by value.
    pub fn iter(&self) -> Iter {
        Iter { inner: self, pos: 0 }
    }
}
impl<'a> IntoIterator for &'a Events {
type Item = Event;
type IntoIter = Iter<'a>;
fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}
impl<'a> Iterator for Iter<'a> {
    type Item = Event;
    fn next(&mut self) -> Option<Event> {
        // Advance the cursor unconditionally; `get` yields `None` once the
        // cursor is past the last buffered event.
        let pos = self.pos;
        self.pos = pos + 1;
        self.inner.get(pos)
    }
}
impl fmt::Debug for Events {
    /// Reports only the buffer's length and capacity; individual events
    /// are not printed.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut dbg = f.debug_struct("Events");
        dbg.field("len", &self.len());
        dbg.field("capacity", &self.capacity());
        dbg.finish()
    }
}
// Free-function accessor for a poll's selector, kept out of the public
// `Poll` API surface.
pub fn selector(poll: &Poll) -> &sys::Selector {
&poll.selector
}
// Crate-internal constructor for a registration already bound to `poll`;
// thin wrapper over `Registration::new_priv`.
#[allow(dead_code)]
pub fn new_registration(poll: &Poll, token: Token, ready: Ready, opt: PollOpt)
-> (Registration, SetReadiness)
{
Registration::new_priv(poll, token, ready, opt)
}
impl Registration {
/// Creates an unbound registration/readiness pair. The node starts
/// with no queue association and empty token/interest/options; these
/// are filled in when the `Registration` is registered with a `Poll`.
pub fn new2() -> (Registration, SetReadiness) {
// ref_count 2: one for the `Registration`, one for `SetReadiness`.
let node = Box::into_raw(Box::new(ReadinessNode::new(
ptr::null_mut(), Token(0), Ready::empty(), PollOpt::empty(), 2)));
let registration = Registration {
inner: RegistrationInner {
node: node,
},
};
let set_readiness = SetReadiness {
inner: RegistrationInner {
node: node,
},
};
(registration, set_readiness)
}
#[deprecated(since = "0.6.5", note = "use `new2` instead")]
#[cfg(feature = "with-deprecated")]
#[doc(hidden)]
pub fn new(poll: &Poll, token: Token, interest: Ready, opt: PollOpt)
-> (Registration, SetReadiness)
{
Registration::new_priv(poll, token, interest, opt)
}
/// Creates a registration/readiness pair already bound to `poll`'s
/// readiness queue.
fn new_priv(poll: &Poll, token: Token, interest: Ready, opt: PollOpt)
-> (Registration, SetReadiness)
{
is_send::<Registration>();
is_sync::<Registration>();
is_send::<SetReadiness>();
is_sync::<SetReadiness>();
// Leak a clone of the queue Arc into a raw pointer; the node stores
// it type-erased and `release_node` reconstitutes it on last drop.
let queue = poll.readiness_queue.inner.clone();
let queue: *mut () = unsafe { mem::transmute(queue) };
// ref_count 3: Registration + SetReadiness + the reference the bound
// readiness queue is considered to hold (matches the extra count
// `RegistrationInner::update` adds when binding lazily).
let node = Box::into_raw(Box::new(ReadinessNode::new(
queue, token, interest, opt, 3)));
let registration = Registration {
inner: RegistrationInner {
node: node,
},
};
let set_readiness = SetReadiness {
inner: RegistrationInner {
node: node,
},
};
(registration, set_readiness)
}
#[deprecated(since = "0.6.5", note = "use `Evented` impl")]
#[cfg(feature = "with-deprecated")]
#[doc(hidden)]
pub fn update(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
self.inner.update(poll, token, interest, opts)
}
#[deprecated(since = "0.6.5", note = "use `Evented` impl")]
#[cfg(feature = "with-deprecated")]
#[doc(hidden)]
pub fn deregister(&self, poll: &Poll) -> io::Result<()> {
// Deregistration is modeled as an empty-interest update.
self.inner.update(poll, Token(0), Ready::empty(), PollOpt::empty())
}
}
impl Evented for Registration {
// All three operations funnel into `RegistrationInner::update`;
// deregistering is an empty-interest update under `Token(0)`.
fn register(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
self.inner.update(poll, token, interest, opts)
}
fn reregister(&self, poll: &Poll, token: Token, interest: Ready, opts: PollOpt) -> io::Result<()> {
self.inner.update(poll, token, interest, opts)
}
fn deregister(&self, poll: &Poll) -> io::Result<()> {
self.inner.update(poll, Token(0), Ready::empty(), PollOpt::empty())
}
}
impl Drop for Registration {
fn drop(&mut self) {
// Mark the node dropped; if it was not already queued, enqueue it
// (with a wakeup) so the poller observes the drop and releases the
// queue's reference to the node.
if self.inner.state.flag_as_dropped() {
let _ = self.inner.enqueue_with_wakeup();
}
}
}
impl fmt::Debug for Registration {
    /// The node pointer carries no user-meaningful state; print just the
    /// type name (identical output to an empty `debug_struct`).
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        write!(fmt, "Registration")
    }
}
impl SetReadiness {
/// Returns the registration's current raw readiness (not masked by
/// interest).
pub fn readiness(&self) -> Ready {
self.inner.readiness()
}
/// Replaces the readiness value; if the new value overlaps the current
/// interest, the node is queued for delivery on the next poll.
pub fn set_readiness(&self, ready: Ready) -> io::Result<()> {
self.inner.set_readiness(ready)
}
}
impl fmt::Debug for SetReadiness {
    /// Prints just the type name (same output as `write!`).
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.debug_struct("SetReadiness").finish()
    }
}
impl RegistrationInner {
/// Current raw readiness (not masked by interest).
fn readiness(&self) -> Ready {
self.state.load(Relaxed).readiness()
}
/// Stores `ready` into the node's state word via a CAS loop. If the
/// new effective readiness (readiness & interest) is non-empty, the
/// queued flag is set; a not-queued -> queued transition means this
/// caller is responsible for actually enqueuing the node.
fn set_readiness(&self, ready: Ready) -> io::Result<()> {
let mut state = self.state.load(Acquire);
let mut next;
loop {
next = state;
if state.is_dropped() {
// The Registration is gone; readiness changes are ignored.
return Ok(());
}
next.set_readiness(ready);
if !next.effective_readiness().is_empty() {
next.set_queued();
}
let actual = self.state.compare_and_swap(state, next, AcqRel);
if state == actual {
break;
}
state = actual;
}
// Only the thread that flipped queued from 0 to 1 enqueues.
if !state.is_queued() && next.is_queued() {
try!(self.enqueue_with_wakeup());
}
Ok(())
}
/// Binds the node to `poll`'s readiness queue (first registration
/// wins; a different queue is an error) and publishes the new
/// token/interest/options.
fn update(&self, poll: &Poll, token: Token, interest: Ready, opt: PollOpt) -> io::Result<()> {
let mut queue = self.readiness_queue.load(Relaxed);
// View `poll`'s Arc<ReadinessQueueInner> as a raw pointer for
// identity comparison without touching its reference count.
let other: &*mut () = unsafe { mem::transmute(&poll.readiness_queue.inner) };
let other = *other;
debug_assert!(mem::size_of::<Arc<ReadinessQueueInner>>() == mem::size_of::<*mut ()>());
if queue.is_null() {
// First association: race to install the queue pointer.
let actual = self.readiness_queue.compare_and_swap(
queue, other, Release);
if actual.is_null() {
// We won: the node now owns one extra ref (for the queue) and
// a leaked clone of the queue Arc (released in release_node).
self.ref_count.fetch_add(1, Relaxed);
mem::forget(poll.readiness_queue.clone());
} else {
// Lost the race; it must have been to the same queue.
if actual != other {
return Err(io::Error::new(io::ErrorKind::Other, "registration handle associated with another `Poll` instance"));
}
}
queue = other;
} else if queue != other {
return Err(io::Error::new(io::ErrorKind::Other, "registration handle associated with another `Poll` instance"));
}
unsafe {
let actual = &poll.readiness_queue.inner as *const _ as *const usize;
debug_assert_eq!(queue as usize, *actual);
}
// Serialize concurrent updates; a concurrent updater that loses this
// race returns Ok(()) without applying its values.
if self.update_lock.compare_and_swap(false, true, Acquire) {
return Ok(());
}
let mut state = self.state.load(Relaxed);
let mut next;
let curr_token_pos = state.token_write_pos();
let curr_token = unsafe { self::token(self, curr_token_pos) };
let mut next_token_pos = curr_token_pos;
if token != curr_token {
// Write the new token into a slot the poller is guaranteed not to
// be reading, then publish the slot index via the state CAS below.
next_token_pos = state.next_token_pos();
match next_token_pos {
0 => unsafe { *self.token_0.get() = token },
1 => unsafe { *self.token_1.get() = token },
2 => unsafe { *self.token_2.get() = token },
_ => unreachable!(),
}
}
loop {
next = state;
debug_assert!(!state.is_dropped());
next.set_token_write_pos(next_token_pos);
next.set_interest(interest);
next.set_poll_opt(opt);
if !next.effective_readiness().is_empty() {
next.set_queued();
}
let actual = self.state.compare_and_swap(state, next, Release);
if actual == state {
break;
}
// Holding update_lock means nobody else can move the write pos.
debug_assert_eq!(curr_token_pos, actual.token_write_pos());
state = actual;
}
self.update_lock.store(false, Release);
// Same enqueue responsibility rule as set_readiness.
if !state.is_queued() && next.is_queued() {
try!(enqueue_with_wakeup(queue, self));
}
Ok(())
}
}
impl ops::Deref for RegistrationInner {
type Target = ReadinessNode;
// `node` stays valid for the handle's lifetime: the manual ref count
// (see Clone/Drop below) keeps the allocation alive.
fn deref(&self) -> &ReadinessNode {
unsafe { &*self.node }
}
}
impl Clone for RegistrationInner {
fn clone(&self) -> RegistrationInner {
// Mirrors Arc::clone: Relaxed is sufficient because the caller
// already holds a reference; abort (panic) if the count would pass
// MAX_REFCOUNT, guarding against overflow.
let old_size = self.ref_count.fetch_add(1, Relaxed);
if old_size & !MAX_REFCOUNT != 0 {
panic!();
}
RegistrationInner {
node: self.node.clone(),
}
}
}
impl Drop for RegistrationInner {
fn drop(&mut self) {
// Drop one shared reference; the last one frees the node.
release_node(self.node);
}
}
impl ReadinessQueue {
/// Creates an empty queue: both head and tail point at `end_marker`.
fn new() -> io::Result<ReadinessQueue> {
is_send::<Self>();
is_sync::<Self>();
let end_marker = Box::new(ReadinessNode::marker());
let sleep_marker = Box::new(ReadinessNode::marker());
let closed_marker = Box::new(ReadinessNode::marker());
let ptr = &*end_marker as *const _ as *mut _;
Ok(ReadinessQueue {
inner: Arc::new(ReadinessQueueInner {
awakener: try!(sys::Awakener::new()),
head_readiness: AtomicPtr::new(ptr),
tail_readiness: UnsafeCell::new(ptr),
end_marker: end_marker,
sleep_marker: sleep_marker,
closed_marker: closed_marker,
})
})
}
/// Drains queued nodes into `dst`, up to its capacity.
fn poll(&self, dst: &mut sys::Events) {
// First node re-enqueued during this drain (level-triggered nodes
// stay queued); stopping when we see it again bounds the loop.
let mut until = ptr::null_mut();
'outer:
while dst.len() < dst.capacity() {
let ptr = match unsafe { self.inner.dequeue_node(until) } {
Dequeue::Empty | Dequeue::Inconsistent => break,
Dequeue::Data(ptr) => ptr,
};
let node = unsafe { &*ptr };
// CAS loop: atomically decide the node's post-dequeue state.
let mut state = node.state.load(Acquire);
let mut next;
let mut readiness;
let mut opt;
loop {
next = state;
debug_assert!(state.is_queued());
if state.is_dropped() {
// Registration was dropped: release the queue's reference.
release_node(ptr);
continue 'outer;
}
readiness = state.effective_readiness();
opt = state.poll_opt();
if opt.is_edge() {
// Edge-triggered nodes leave the queue; oneshot also clears
// the interest after a delivered event.
next.set_dequeued();
if opt.is_oneshot() && !readiness.is_empty() {
next.disarm();
}
} else if readiness.is_empty() {
// Level-triggered with nothing ready: leave the queue.
next.set_dequeued();
}
// Publish the write slot as the slot tokens are read from.
next.update_token_read_pos();
if state == next {
break;
}
let actual = node.state.compare_and_swap(state, next, AcqRel);
if actual == state {
break;
}
state = actual;
}
if next.is_queued() {
// Still queued (level-triggered, ready): push it back and note
// it as the stopping point for this drain.
if until.is_null() {
until = ptr;
}
self.inner.enqueue_node(node);
}
if !readiness.is_empty() {
let token = unsafe { token(node, next.token_read_pos()) };
dst.push_event(Event::new(readiness, token));
}
}
}
/// Attempts to flag the queue as "poller going to sleep". Returns true
/// only when the queue is empty and the sleep marker is installed, so
/// producers know to fire the awakener.
fn prepare_for_sleep(&self) -> bool {
let end_marker = self.inner.end_marker();
let sleep_marker = self.inner.sleep_marker();
let tail = unsafe { *self.inner.tail_readiness.get() };
if tail == sleep_marker {
// Already flagged; still empty only if head matches too.
return self.inner.head_readiness.load(Acquire) == sleep_marker;
}
if tail != end_marker {
// Nodes are queued; must not sleep.
return false;
}
self.inner.sleep_marker.next_readiness.store(ptr::null_mut(), Relaxed);
// Swing the head from end_marker to sleep_marker; failure means a
// producer pushed concurrently.
let actual = self.inner.head_readiness.compare_and_swap(
end_marker, sleep_marker, AcqRel);
debug_assert!(actual != sleep_marker);
if actual != end_marker {
return false;
}
debug_assert!(unsafe { *self.inner.tail_readiness.get() == end_marker });
debug_assert!(self.inner.end_marker.next_readiness.load(Relaxed).is_null());
unsafe { *self.inner.tail_readiness.get() = sleep_marker; }
true
}
}
// NOTE(review): `ReadinessQueue` derives `Clone`, and this drop closes the
// SHARED queue — it assumes only `Poll`'s handle is ever dropped (the
// clone taken in `RegistrationInner::update` is `mem::forget`-ten, and
// `release_node` drops the inner Arc directly). Confirm no other clones
// can be dropped.
impl Drop for ReadinessQueue {
fn drop(&mut self) {
// Close the queue so producers stop pushing, then drain it and drop
// the queue's reference to every remaining node.
self.inner.enqueue_node(&*self.inner.closed_marker);
loop {
let ptr = match unsafe { self.inner.dequeue_node(ptr::null_mut()) } {
Dequeue::Empty => break,
Dequeue::Inconsistent => {
// A producer is mid-push; spin until its link is visible.
continue;
}
Dequeue::Data(ptr) => ptr,
};
let node = unsafe { &*ptr };
let state = node.state.load(Acquire);
debug_assert!(state.is_queued());
release_node(ptr);
}
}
}
impl ReadinessQueueInner {
/// Fires the awakener, interrupting a selector blocked in `select`.
fn wakeup(&self) -> io::Result<()> {
self.awakener.wakeup()
}
/// Pushes `node` and wakes the poller if it was flagged asleep.
fn enqueue_node_with_wakeup(&self, node: &ReadinessNode) -> io::Result<()> {
if self.enqueue_node(node) {
try!(self.wakeup());
}
Ok(())
}
/// MPSC push (intrusive, Vyukov-style): swing `head` to the new node,
/// then link the old head to it. Returns true when the displaced head
/// was the sleep marker, i.e. the poller needs a wakeup.
fn enqueue_node(&self, node: &ReadinessNode) -> bool {
let node_ptr = node as *const _ as *mut _;
node.next_readiness.store(ptr::null_mut(), Relaxed);
unsafe {
let mut prev = self.head_readiness.load(Acquire);
loop {
if prev == self.closed_marker() {
// Queue is shut down: refuse the push and drop the queue's
// would-be reference to the node (markers excepted).
debug_assert!(node_ptr != self.closed_marker());
debug_assert!(node_ptr != self.sleep_marker());
if node_ptr != self.end_marker() {
debug_assert!(node.ref_count.load(Relaxed) >= 2);
release_node(node_ptr);
}
return false;
}
let act = self.head_readiness.compare_and_swap(prev, node_ptr, AcqRel);
if prev == act {
break;
}
prev = act;
}
// Between the CAS above and this store the list is momentarily
// unlinked; consumers observe that as `Dequeue::Inconsistent`.
debug_assert!((*prev).next_readiness.load(Relaxed).is_null());
(*prev).next_readiness.store(node_ptr, Release);
prev == self.sleep_marker()
}
}
/// MPSC pop. `until` (optionally) marks a node at which to stop, used
/// by `ReadinessQueue::poll` to avoid re-processing re-enqueued
/// level-triggered nodes forever.
///
/// Safety: must only be called from the single consumer (the thread
/// draining the queue), since it mutates `tail_readiness`.
unsafe fn dequeue_node(&self, until: *mut ReadinessNode) -> Dequeue {
let mut tail = *self.tail_readiness.get();
let mut next = (*tail).next_readiness.load(Acquire);
if tail == self.end_marker() || tail == self.sleep_marker() || tail == self.closed_marker() {
// Skip over a marker sitting at the tail.
if next.is_null() {
return Dequeue::Empty;
}
*self.tail_readiness.get() = next;
tail = next;
next = (*next).next_readiness.load(Acquire);
}
if tail == until {
// Reached the re-enqueue stopping point for this drain.
return Dequeue::Empty;
}
if !next.is_null() {
*self.tail_readiness.get() = next;
return Dequeue::Data(tail);
}
if self.head_readiness.load(Acquire) != tail {
// A producer has CAS'd the head but not yet linked its node.
return Dequeue::Inconsistent;
}
// Single element left: push the end marker behind it so the tail is
// never left dangling, then pop the element.
self.enqueue_node(&*self.end_marker);
next = (*tail).next_readiness.load(Acquire);
if !next.is_null() {
*self.tail_readiness.get() = next;
return Dequeue::Data(tail);
}
Dequeue::Inconsistent
}
// Raw pointers to the three sentinel nodes, for identity comparisons.
fn end_marker(&self) -> *mut ReadinessNode {
&*self.end_marker as *const ReadinessNode as *mut ReadinessNode
}
fn sleep_marker(&self) -> *mut ReadinessNode {
&*self.sleep_marker as *const ReadinessNode as *mut ReadinessNode
}
fn closed_marker(&self) -> *mut ReadinessNode {
&*self.closed_marker as *const ReadinessNode as *mut ReadinessNode
}
}
impl ReadinessNode {
/// Creates a node with `ref_count` initial references (2 for the
/// queue-less `new2` path, 3 when bound to a queue in `new_priv`).
/// The initial token goes in slot 0, the initial read/write position.
fn new(queue: *mut (),
token: Token,
interest: Ready,
opt: PollOpt,
ref_count: usize) -> ReadinessNode
{
ReadinessNode {
state: AtomicState::new(interest, opt),
token_0: UnsafeCell::new(token),
token_1: UnsafeCell::new(Token(0)),
token_2: UnsafeCell::new(Token(0)),
next_readiness: AtomicPtr::new(ptr::null_mut()),
update_lock: AtomicBool::new(false),
readiness_queue: AtomicPtr::new(queue),
ref_count: AtomicUsize::new(ref_count),
}
}
/// Creates a sentinel node (end/sleep/closed marker): empty state, no
/// queue association, zero ref count (markers are owned by the queue).
fn marker() -> ReadinessNode {
ReadinessNode {
state: AtomicState::new(Ready::empty(), PollOpt::empty()),
token_0: UnsafeCell::new(Token(0)),
token_1: UnsafeCell::new(Token(0)),
token_2: UnsafeCell::new(Token(0)),
next_readiness: AtomicPtr::new(ptr::null_mut()),
update_lock: AtomicBool::new(false),
readiness_queue: AtomicPtr::new(ptr::null_mut()),
ref_count: AtomicUsize::new(0),
}
}
/// Pushes this node onto its bound queue (no-op if it was never
/// registered with a `Poll`) and wakes a sleeping poller.
fn enqueue_with_wakeup(&self) -> io::Result<()> {
let queue = self.readiness_queue.load(Acquire);
if queue.is_null() {
// Not bound to a Poll yet; nothing to deliver to.
return Ok(());
}
enqueue_with_wakeup(queue, self)
}
}
// Re-borrows the type-erased queue pointer as `&Arc<ReadinessQueueInner>`
// WITHOUT taking ownership: transmuting a reference to the local `queue`
// avoids touching the Arc's reference count.
fn enqueue_with_wakeup(queue: *mut (), node: &ReadinessNode) -> io::Result<()> {
debug_assert!(!queue.is_null());
let queue: &Arc<ReadinessQueueInner> = unsafe { mem::transmute(&queue) };
queue.enqueue_node_with_wakeup(node)
}
/// Reads the token slot selected by `pos`.
///
/// Safety: `pos` must be a slot the state-word protocol guarantees no
/// `update` is concurrently writing (a published read or write position).
unsafe fn token(node: &ReadinessNode, pos: usize) -> Token {
match pos {
0 => *node.token_0.get(),
1 => *node.token_1.get(),
2 => *node.token_2.get(),
_ => unreachable!(),
}
}
/// Drops one reference to the node; the final reference frees the node
/// and releases the queue `Arc` that was leaked into it.
fn release_node(ptr: *mut ReadinessNode) {
unsafe {
// AcqRel pairs the decrements so the final owner observes all prior
// writes to the node before freeing it.
if (*ptr).ref_count.fetch_sub(1, AcqRel) != 1 {
return;
}
let node = Box::from_raw(ptr);
let queue = node.readiness_queue.load(Acquire);
if queue.is_null() {
// Never bound to a Poll; nothing else to release.
return;
}
// Reconstitute the Arc leaked by new_priv/update so its count drops.
let _: Arc<ReadinessQueueInner> = mem::transmute(queue);
}
}
impl AtomicState {
/// Initial state: the given interest/options, empty readiness.
fn new(interest: Ready, opt: PollOpt) -> AtomicState {
let state = ReadinessState::new(interest, opt);
AtomicState {
inner: AtomicUsize::new(state.into()),
}
}
fn load(&self, order: Ordering) -> ReadinessState {
self.inner.load(order).into()
}
fn compare_and_swap(&self, current: ReadinessState, new: ReadinessState, order: Ordering) -> ReadinessState {
self.inner.compare_and_swap(current.into(), new.into(), order).into()
}
/// Atomically sets both DROPPED and QUEUED. Returns true when the node
/// was NOT previously queued — in that case the caller must enqueue it
/// so the poller sees the drop and releases the queue's reference.
fn flag_as_dropped(&self) -> bool {
let prev: ReadinessState = self.inner.fetch_or(DROPPED_MASK | QUEUED_MASK, Release).into();
debug_assert!(!prev.is_dropped());
!prev.is_queued()
}
}
impl ReadinessState {
    /// Packs `interest` and `opt` into a fresh state word. Readiness
    /// starts empty and both token slots start at position 0.
    #[inline]
    fn new(interest: Ready, opt: PollOpt) -> ReadinessState {
        let interest = event::ready_as_usize(interest);
        let opt = event::opt_as_usize(opt);
        debug_assert!(interest <= MASK_4);
        debug_assert!(opt <= MASK_4);
        ReadinessState((interest << INTEREST_SHIFT) | (opt << POLL_OPT_SHIFT))
    }
    /// Extracts the field selected by `mask`/`shift`.
    #[inline]
    fn get(&self, mask: usize, shift: usize) -> usize {
        (self.0 >> shift) & mask
    }
    /// Writes `val` into the field selected by `mask`/`shift`.
    #[inline]
    fn set(&mut self, val: usize, mask: usize, shift: usize) {
        let cleared = self.0 & !(mask << shift);
        self.0 = cleared | (val << shift);
    }
    /// Raw readiness bits (not masked by interest).
    #[inline]
    fn readiness(&self) -> Ready {
        event::ready_from_usize(self.get(MASK_4, READINESS_SHIFT))
    }
    /// Readiness the current interest set actually selects.
    #[inline]
    fn effective_readiness(&self) -> Ready {
        self.readiness() & self.interest()
    }
    #[inline]
    fn set_readiness(&mut self, v: Ready) {
        self.set(event::ready_as_usize(v), MASK_4, READINESS_SHIFT);
    }
    #[inline]
    fn interest(&self) -> Ready {
        event::ready_from_usize(self.get(MASK_4, INTEREST_SHIFT))
    }
    #[inline]
    fn set_interest(&mut self, v: Ready) {
        self.set(event::ready_as_usize(v), MASK_4, INTEREST_SHIFT);
    }
    /// Clears the interest so no further readiness is delivered (used for
    /// oneshot registrations after an event fires).
    #[inline]
    fn disarm(&mut self) {
        self.set_interest(Ready::empty());
    }
    #[inline]
    fn poll_opt(&self) -> PollOpt {
        event::opt_from_usize(self.get(MASK_4, POLL_OPT_SHIFT))
    }
    #[inline]
    fn set_poll_opt(&mut self, v: PollOpt) {
        self.set(event::opt_as_usize(v), MASK_4, POLL_OPT_SHIFT);
    }
    /// True when the node is (logically) in the readiness queue.
    #[inline]
    fn is_queued(&self) -> bool {
        self.0 & QUEUED_MASK != 0
    }
    #[inline]
    fn set_queued(&mut self) {
        // A dropped node must never be re-queued.
        debug_assert!(!self.is_dropped());
        self.0 |= QUEUED_MASK;
    }
    #[inline]
    fn set_dequeued(&mut self) {
        debug_assert!(self.is_queued());
        self.0 &= !QUEUED_MASK
    }
    /// True once the owning `Registration` has been dropped.
    #[inline]
    fn is_dropped(&self) -> bool {
        self.0 & DROPPED_MASK != 0
    }
    #[inline]
    fn token_read_pos(&self) -> usize {
        self.get(MASK_2, TOKEN_RD_SHIFT)
    }
    #[inline]
    fn token_write_pos(&self) -> usize {
        self.get(MASK_2, TOKEN_WR_SHIFT)
    }
    /// Picks the slot the next token update writes into: always a slot
    /// that is neither being read nor currently published for writing.
    #[inline]
    fn next_token_pos(&self) -> usize {
        let rd = self.token_read_pos();
        let wr = self.token_write_pos();
        match (wr, rd) {
            (0, 1) => 2,
            (0, 2) | (0, 0) => 1,
            (1, 0) | (1, 1) => 2,
            (1, 2) => 0,
            (2, 0) => 1,
            (2, 1) | (2, 2) => 0,
            _ => unreachable!(),
        }
    }
    #[inline]
    fn set_token_write_pos(&mut self, val: usize) {
        self.set(val, MASK_2, TOKEN_WR_SHIFT);
    }
    /// Publishes the write slot as the slot tokens are read from.
    #[inline]
    fn update_token_read_pos(&mut self) {
        let wr = self.token_write_pos();
        self.set(wr, MASK_2, TOKEN_RD_SHIFT);
    }
}
// Unpack a state word back to its raw bits.
impl From<ReadinessState> for usize {
fn from(src: ReadinessState) -> usize {
src.0
}
}
// Interpret raw bits as a packed state word.
impl From<usize> for ReadinessState {
fn from(src: usize) -> ReadinessState {
ReadinessState(src)
}
}
// Compile-time assertions that a type is Send/Sync; called from the
// constructors above purely for their trait bounds.
fn is_send<T: Send>() {}
fn is_sync<T: Sync>() {}
impl SelectorId {
/// Creates an id not yet associated with any selector (0 = unset).
pub fn new() -> SelectorId {
SelectorId {
id: AtomicUsize::new(0),
}
}
/// Binds this id to `poll`'s selector, failing if it is already bound
/// to a different one. Re-associating with the same selector is a no-op
/// (besides the redundant store).
pub fn associate_selector(&self, poll: &Poll) -> io::Result<()> {
// NOTE(review): this load/check/store sequence is not atomic; two
// threads associating concurrently could both pass the check. A
// compare_exchange on 0 would close the race — confirm whether
// callers can actually race here.
let selector_id = self.id.load(Ordering::SeqCst);
if selector_id != 0 && selector_id != poll.selector.id() {
Err(io::Error::new(io::ErrorKind::Other, "socket already registered"))
} else {
self.id.store(poll.selector.id(), Ordering::SeqCst);
Ok(())
}
}
}
impl Clone for SelectorId {
    fn clone(&self) -> SelectorId {
        // Snapshot the current id; the clone gets its own atomic cell.
        let id = self.id.load(Ordering::SeqCst);
        SelectorId { id: AtomicUsize::new(id) }
    }
}
// Smoke test: a freshly created `Poll` exposes a plausible (positive) fd.
#[test]
#[cfg(unix)]
pub fn as_raw_fd() {
let poll = Poll::new().unwrap();
assert!(poll.as_raw_fd() > 0);
}