#![allow(deprecated)]

use std::{fmt, io, u32};
use std::cell::UnsafeCell;
use std::os::windows::prelude::*;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicUsize, Ordering, ATOMIC_USIZE_INIT};
use std::time::Duration;

use lazycell::AtomicLazyCell;

use winapi::*;
use miow;
use miow::iocp::{CompletionPort, CompletionStatus};

use event_imp::{Event, Evented, Ready};
use poll::{self, Poll};
use sys::windows::buffer_pool::BufferPool;
use {Token, PollOpt};
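
/// Monotonically increasing counter used to hand each `Selector` a unique id,
/// so that registrations can later be checked against the `Poll` they belong to.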
static NEXT_ID: AtomicUsize = ATOMIC_USIZE_INIT;
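
/// The Windows selector: a cheaply cloneable handle to an I/O completion port
/// together with the state shared with every registered resource.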
pub struct Selector {
    inner: Arc<SelectorInner>,
}

/// State shared between a `Selector` and every `Binding` registered with it.
struct SelectorInner {
    /// Unique id used to verify that resources are registered with one `Poll` only.
    id: usize,
    /// The completion port through which all I/O completions are delivered.
    port: CompletionPort,
    /// A pool of reusable buffers handed out to registered I/O objects.
    buffers: Mutex<BufferPool>,
}

impl Selector {
    pub fn new() -> io::Result<Selector> {
        // Offset by 1 so that a live selector never ends up with an id of 0.
        let id = NEXT_ID.fetch_add(1, Ordering::Relaxed) + 1;

        CompletionPort::new(1).map(|cp| {
            Selector {
                inner: Arc::new(SelectorInner {
                    id: id,
                    port: cp,
                    buffers: Mutex::new(BufferPool::new(256)),
                }),
            }
        })
    }

    pub fn select(&self,
                  events: &mut Events,
                  awakener: Token,
                  timeout: Option<Duration>) -> io::Result<bool> {
        trace!("select; timeout={:?}", timeout);

        // Clear out the previous list of I/O events and poll the port again.
        events.events.truncate(0);

        trace!("polling IOCP");
        let n = match self.inner.port.get_many(&mut events.statuses, timeout) {
            Ok(statuses) => statuses.len(),
            // A timeout simply means no statuses were dequeued this time.
            Err(ref e) if e.raw_os_error() == Some(WAIT_TIMEOUT as i32) => 0,
            Err(e) => return Err(e),
        };

        let mut ret = false;
        for status in events.statuses[..n].iter() {
            // A null overlapped pointer means the status was posted manually
            // to wake us up, which should only ever come from the awakener.
            if status.overlapped() as usize == 0 {
                assert_eq!(status.token(), usize::from(awakener));
                ret = true;
                continue;
            }

            // Otherwise the pointer refers to one of our `Overlapped` values;
            // dispatch to the callback stored alongside it.
            let callback = unsafe {
                (*(status.overlapped() as *mut Overlapped)).callback
            };

            trace!("select; -> got overlapped");
            callback(status.entry());
        }

        trace!("returning");
        Ok(ret)
    }

    pub fn port(&self) -> &CompletionPort {
        &self.inner.port
    }

    pub fn clone_ref(&self) -> Selector {
        Selector { inner: self.inner.clone() }
    }

    pub fn id(&self) -> usize {
        self.inner.id
    }
}

impl SelectorInner {
    /// Returns whether `other` is this exact instance, compared by address.
    fn identical(&self, other: &SelectorInner) -> bool {
        (self as *const SelectorInner) == (other as *const SelectorInner)
    }
}
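
/// A `Binding` is embedded in every I/O object registered with this selector.
/// It records which selector the object has been associated with so that
/// later re-registration and deregistration can be checked against the same
/// `Poll`.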
pub struct Binding {
    selector: AtomicLazyCell<Arc<SelectorInner>>,
}

impl Binding {
    /// Creates a new blank binding, not yet associated with any selector.
    pub fn new() -> Binding {
        Binding { selector: AtomicLazyCell::new() }
    }

    /// Associates the raw `handle` with the `Poll`'s completion port under
    /// `token`, remembering the selector used so later calls can be validated.
    pub unsafe fn register_handle(&self,
                                  handle: &AsRawHandle,
                                  token: Token,
                                  poll: &Poll) -> io::Result<()> {
        let selector = poll::selector(poll);

        // Ignore the error here: if the cell was already filled we will find
        // out whether it was with this selector in the check below.
        drop(self.selector.fill(selector.inner.clone()));
        try!(self.check_same_selector(poll));

        selector.inner.port.add_handle(usize::from(token), handle)
    }

    /// Same as `register_handle`, but for sockets.
    pub unsafe fn register_socket(&self,
                                  handle: &AsRawSocket,
                                  token: Token,
                                  poll: &Poll) -> io::Result<()> {
        let selector = poll::selector(poll);
        drop(self.selector.fill(selector.inner.clone()));
        try!(self.check_same_selector(poll));
        selector.inner.port.add_socket(usize::from(token), handle)
    }

    /// "Re-registration" with an IOCP is a no-op at the OS level, so all that
    /// needs to happen is to verify that the same `Poll` is used as before.
    pub unsafe fn reregister_handle(&self,
                                    _handle: &AsRawHandle,
                                    _token: Token,
                                    poll: &Poll) -> io::Result<()> {
        self.check_same_selector(poll)
    }

    pub unsafe fn reregister_socket(&self,
                                    _socket: &AsRawSocket,
                                    _token: Token,
                                    poll: &Poll) -> io::Result<()> {
        self.check_same_selector(poll)
    }

    /// A handle cannot be detached from a completion port once added, so
    /// deregistration only validates the `Poll`; the association lapses when
    /// the handle itself is closed.
    pub unsafe fn deregister_handle(&self,
                                    _handle: &AsRawHandle,
                                    poll: &Poll) -> io::Result<()> {
        self.check_same_selector(poll)
    }

    pub unsafe fn deregister_socket(&self,
                                    _socket: &AsRawSocket,
                                    poll: &Poll) -> io::Result<()> {
        self.check_same_selector(poll)
    }

    fn check_same_selector(&self, poll: &Poll) -> io::Result<()> {
        let selector = poll::selector(poll);
        match self.selector.borrow() {
            // We were previously bound to this very selector; all good.
            Some(prev) if prev.identical(&selector.inner) => Ok(()),

            // Bound to a different selector, or never successfully bound at
            // all, so this object cannot be used with this `Poll`.
            Some(_) |
            None => Err(other("socket already registered")),
        }
    }
}

impl fmt::Debug for Binding {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "Binding")
    }
}
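
/// Helper that pairs a `Binding` with the `SetReadiness` handle used to drive
/// readiness of an I/O object from its completion callbacks.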
pub struct ReadyBinding {
    binding: Binding,
    readiness: Option<poll::SetReadiness>,
}

impl ReadyBinding {
    /// Creates a new blank binding, ready to be inserted into an I/O object.
    pub fn new() -> ReadyBinding {
        ReadyBinding {
            binding: Binding::new(),
            readiness: None,
        }
    }

    /// Returns whether this binding has been registered with a `Poll` yet.
    pub fn registered(&self) -> bool {
        self.readiness.is_some()
    }

    /// Acquires a buffer of at least `size` capacity, preferring the
    /// selector's shared pool when one is available.
    pub fn get_buffer(&self, size: usize) -> Vec<u8> {
        match self.binding.selector.borrow() {
            Some(i) => i.buffers.lock().unwrap().get(size),
            None => Vec::with_capacity(size),
        }
    }

    /// Returns a buffer to the selector's pool so it can be reused.
    pub fn put_buffer(&self, buf: Vec<u8>) {
        if let Some(i) = self.binding.selector.borrow() {
            i.buffers.lock().unwrap().put(buf);
        }
    }

    /// Marks this I/O object as ready for the given set of operations.
    pub fn set_readiness(&self, set: Ready) {
        if let Some(ref i) = self.readiness {
            trace!("set readiness to {:?}", set);
            i.set_readiness(set).expect("event loop disappeared?");
        }
    }

    /// Returns the current readiness, or an empty set before registration.
    pub fn readiness(&self) -> Ready {
        match self.readiness {
            Some(ref i) => i.readiness(),
            None => Ready::empty(),
        }
    }

    /// Registers `socket` with the `Poll`'s completion port and creates the
    /// user-space registration whose readiness is driven by completion
    /// callbacks.
    pub fn register_socket(&mut self,
                           socket: &AsRawSocket,
                           poll: &Poll,
                           token: Token,
                           events: Ready,
                           opts: PollOpt,
                           registration: &Mutex<Option<poll::Registration>>)
                           -> io::Result<()> {
        trace!("register {:?} {:?}", token, events);

        unsafe {
            try!(self.binding.register_socket(socket, token, poll));
        }

        let (r, s) = poll::new_registration(poll, token, events, opts);
        self.readiness = Some(s);
        *registration.lock().unwrap() = Some(r);
        Ok(())
    }

    /// Re-registration only needs to validate the `Poll` and update the token,
    /// interest and options of the user-space registration.
    pub fn reregister_socket(&mut self,
                             socket: &AsRawSocket,
                             poll: &Poll,
                             token: Token,
                             events: Ready,
                             opts: PollOpt,
                             registration: &Mutex<Option<poll::Registration>>)
                             -> io::Result<()> {
        trace!("reregister {:?} {:?}", token, events);

        unsafe {
            try!(self.binding.reregister_socket(socket, token, poll));
        }

        registration.lock().unwrap()
                    .as_mut().unwrap()
                    .reregister(poll, token, events, opts)
    }

    pub fn deregister(&mut self,
                      socket: &AsRawSocket,
                      poll: &Poll,
                      registration: &Mutex<Option<poll::Registration>>)
                      -> io::Result<()> {
        trace!("deregistering");

        unsafe {
            try!(self.binding.deregister_socket(socket, poll));
        }

        registration.lock().unwrap()
                    .as_ref().unwrap()
                    .deregister(poll)
    }
}

fn other(s: &str) -> io::Error {
    io::Error::new(io::ErrorKind::Other, s)
}
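
/// Event storage used by the Windows selector: `statuses` is the raw buffer of
/// completion statuses filled in by the port, while `events` holds the
/// readiness events surfaced to the user.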
#[derive(Debug)]
pub struct Events {
    /// Raw completion statuses dequeued from the port on each `select`.
    statuses: Box<[CompletionStatus]>,

    /// Readiness events surfaced to the user via `get`.
    events: Vec<Event>,
}

impl Events {
    pub fn with_capacity(cap: usize) -> Events {
        // `events` is only given an initial capacity; it can grow if more
        // than `cap` events are pushed before they are drained.
        Events {
            statuses: vec![CompletionStatus::zero(); cap].into_boxed_slice(),
            events: Vec::with_capacity(cap),
        }
    }

    pub fn is_empty(&self) -> bool {
        self.events.is_empty()
    }

    pub fn len(&self) -> usize {
        self.events.len()
    }

    pub fn capacity(&self) -> usize {
        self.events.capacity()
    }

    pub fn get(&self, idx: usize) -> Option<Event> {
        self.events.get(idx).map(|e| *e)
    }

    pub fn push_event(&mut self, event: Event) {
        self.events.push(event);
    }
}
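
// Converts a raw pointer to a field embedded inside a reference-counted `$t`
// back into the owning `FromRawArc<$t>` by subtracting the field's offset.
// `mem` and `FromRawArc` must be in scope at the call site, and the expansion
// must be used inside an `unsafe` block.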
macro_rules! overlapped2arc {
    ($e:expr, $t:ty, $($field:ident).+) => ({
        let offset = offset_of!($t, $($field).+);
        debug_assert!(offset < mem::size_of::<$t>());
        FromRawArc::from_raw(($e as usize - offset) as *mut $t)
    })
}
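
// Computes the byte offset of a (possibly nested) field within `$t` by taking
// the field's address relative to a `$t` pretended to live at address 0; the
// expansion dereferences a raw pointer and therefore also requires an
// `unsafe` context.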
macro_rules! offset_of {
    ($t:ty, $($field:ident).+) => (
        &(*(0 as *const $t)).$($field).+ as *const _ as usize
    )
}
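
/// A raw `OVERLAPPED` paired with the callback to invoke when the associated
/// I/O operation completes. The type is `repr(C)` with the `OVERLAPPED`
/// storage first, so the pointer handed back by the completion port can be
/// cast straight back to `*mut Overlapped` in `Selector::select`.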
#[repr(C)]
pub struct Overlapped {
    inner: UnsafeCell<miow::Overlapped>,
    callback: fn(&OVERLAPPED_ENTRY),
}

impl Overlapped {
    /// Creates a new zeroed `Overlapped` which will invoke `cb` when the
    /// corresponding I/O operation completes.
    pub fn new(cb: fn(&OVERLAPPED_ENTRY)) -> Overlapped {
        Overlapped {
            inner: UnsafeCell::new(miow::Overlapped::zero()),
            callback: cb,
        }
    }

    /// Returns a pointer to the inner raw `OVERLAPPED`, suitable for passing
    /// to overlapped I/O system calls.
    pub fn as_mut_ptr(&self) -> *mut OVERLAPPED {
        unsafe {
            (*self.inner.get()).raw()
        }
    }
}

impl fmt::Debug for Overlapped {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "Overlapped")
    }
}
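
// The raw pointers inside `OVERLAPPED` keep these impls from being derived
// automatically; access to the buffer is coordinated between the owning I/O
// object and the kernel, so sending and sharing `Overlapped` across threads
// is believed to be sound here.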
unsafe impl Send for Overlapped {}
unsafe impl Sync for Overlapped {}