#![allow(deprecated, warnings)]
use std::fmt;
use std::io::{self, Read, Write};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::Mutex;
use futures::{task, Async, Poll};
use mio::event::Evented;
use mio::Ready;
use tokio_io::{AsyncRead, AsyncWrite};
use reactor::{Handle, Registration};
/// Deprecated wrapper that pairs an I/O object `E` with reactor registration state.
///
/// It stores the wrapped object, the registration together with the cached
/// read/write readiness, and a clone of the `Handle` that was passed to `new`.
#[deprecated(since = "0.1.2", note = "PollEvented2 instead")]
#[doc(hidden)]
pub struct PollEvented<E> {
// The wrapped I/O object; exposed through `get_ref`, `get_mut` and `into_inner`.
io: E,
// Registration plus the cached readiness for each direction.
inner: Inner,
// Clone of the handle given to `new`; returned by `handle()`.
handle: Handle,
}
/// Registration state used by both the read and the write paths of `PollEvented`.
struct Inner {
// Locked by every poll and by `deregister` before the registration is used.
registration: Mutex<Registration>,
// Cached read readiness, encoded with `ready2usize`; 0 means nothing is cached.
read_readiness: AtomicUsize,
// Cached write readiness, same encoding; 0 means nothing is cached.
write_readiness: AtomicUsize,
}
/// Debug output shows only the wrapped I/O object; the registration state and
/// handle are intentionally left out of the rendered struct.
impl<E: fmt::Debug> fmt::Debug for PollEvented<E> {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut rendered = f.debug_struct("PollEvented");
        rendered.field("io", &self.io);
        rendered.finish()
    }
}
impl<E> PollEvented<E> {
    /// Creates a new `PollEvented` wrapping `io`.
    ///
    /// A fresh `Registration` is created and `io` is registered with it. The
    /// `handle` argument is only cloned and stored so that it can later be
    /// returned from `handle()`.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `Registration::register`.
    pub fn new(io: E, handle: &Handle) -> io::Result<PollEvented<E>>
    where
        E: Evented,
    {
        let registration = Registration::new();
        registration.register(&io)?;

        Ok(PollEvented {
            io,
            inner: Inner {
                registration: Mutex::new(registration),
                // Zero means "no readiness cached" for both directions.
                read_readiness: AtomicUsize::new(0),
                write_readiness: AtomicUsize::new(0),
            },
            handle: handle.clone(),
        })
    }

    /// Reports whether the read side is ready, discarding the readiness details.
    ///
    /// # Panics
    ///
    /// Panics under the same conditions as the internal read poll: a poisoned
    /// registration mutex or an error reported by the registration.
    pub fn poll_read(&mut self) -> Async<()> {
        if self.poll_read2().is_ready() {
            return ().into();
        }

        Async::NotReady
    }

    /// Polls read readiness and returns the full `Ready` set.
    ///
    /// If a non-zero readiness value is cached, it is merged with whatever
    /// `take_read_ready` reports, the merged value is re-cached and returned.
    /// Otherwise the registration is polled with `poll_read_ready` and a ready
    /// result is cached before being returned.
    ///
    /// # Panics
    ///
    /// Panics if the registration mutex is poisoned or the registration
    /// returns an error.
    fn poll_read2(&self) -> Async<Ready> {
        let r = self.inner.registration.lock().unwrap();

        // Fast path: something is already cached.
        match self.inner.read_readiness.load(Relaxed) {
            0 => {}
            mut n => {
                // Fold in anything the registration has recorded since.
                if let Some(ready) = r.take_read_ready().unwrap() {
                    n |= ready2usize(ready);
                    self.inner.read_readiness.store(n, Relaxed);
                }

                return usize2ready(n).into();
            }
        }

        let ready = match r.poll_read_ready().unwrap() {
            Async::Ready(r) => r,
            _ => return Async::NotReady,
        };

        // Remember the readiness so later polls take the fast path.
        self.inner.read_readiness.store(ready2usize(ready), Relaxed);

        ready.into()
    }

    /// Reports whether the write side is ready.
    ///
    /// Mirrors the read poll: a non-zero cached value is merged with
    /// `take_write_ready` and reported as ready; otherwise the registration is
    /// polled with `poll_write_ready` and a ready result is cached.
    ///
    /// # Panics
    ///
    /// Panics if the registration mutex is poisoned or the registration
    /// returns an error.
    pub fn poll_write(&mut self) -> Async<()> {
        let r = self.inner.registration.lock().unwrap();

        match self.inner.write_readiness.load(Relaxed) {
            0 => {}
            mut n => {
                // Fold in anything the registration has recorded since.
                if let Some(ready) = r.take_write_ready().unwrap() {
                    n |= ready2usize(ready);
                    self.inner.write_readiness.store(n, Relaxed);
                }

                return ().into();
            }
        }

        let ready = match r.poll_write_ready().unwrap() {
            Async::Ready(r) => r,
            _ => return Async::NotReady,
        };

        // Remember the readiness so later polls take the fast path.
        self.inner
            .write_readiness
            .store(ready2usize(ready), Relaxed);

        ().into()
    }

    /// Polls for any of the readiness kinds in `mask`.
    ///
    /// Writable readiness is answered by `poll_write`; every other bit in
    /// `mask` is answered from the read-side readiness. Returns the subset of
    /// `mask` that is ready, or `Async::NotReady` when none of it is. An empty
    /// `mask` yields an immediately ready empty set.
    ///
    /// When nothing is ready, `need_write` / `need_read` are invoked for the
    /// directions that were requested. The writable check uses the caller's
    /// original mask: testing the mask after writable has been subtracted can
    /// never succeed, which would make the `need_write` call unreachable.
    pub fn poll_ready(&mut self, mask: Ready) -> Async<Ready> {
        let mut ret = Ready::empty();

        if mask.is_empty() {
            return ret.into();
        }

        if mask.is_writable() {
            if self.poll_write().is_ready() {
                ret = Ready::writable();
            }
        }

        // Everything except writable is served by the read-side readiness.
        let read_mask = mask - Ready::writable();

        if !read_mask.is_empty() {
            if let Async::Ready(v) = self.poll_read2() {
                ret |= v & read_mask;
            }
        }

        if ret.is_empty() {
            if mask.is_writable() {
                let _ = self.need_write();
            }

            if mask.is_readable() {
                let _ = self.need_read();
            }

            Async::NotReady
        } else {
            ret.into()
        }
    }

    /// Clears the cached read readiness and polls the read side again.
    ///
    /// If the fresh poll is already ready, the current task is notified so it
    /// gets polled again. Always returns `Ok(())`.
    pub fn need_read(&mut self) -> io::Result<()> {
        self.inner.read_readiness.store(0, Relaxed);

        if self.poll_read().is_ready() {
            // Readiness arrived in the meantime; make sure the task re-polls.
            task::current().notify();
        }

        Ok(())
    }

    /// Clears the cached write readiness and polls the write side again.
    ///
    /// If the fresh poll is already ready, the current task is notified so it
    /// gets polled again. Always returns `Ok(())`.
    pub fn need_write(&mut self) -> io::Result<()> {
        self.inner.write_readiness.store(0, Relaxed);

        if self.poll_write().is_ready() {
            // Readiness arrived in the meantime; make sure the task re-polls.
            task::current().notify();
        }

        Ok(())
    }

    /// Returns the handle that was cloned and stored by `new`.
    pub fn handle(&self) -> &Handle {
        &self.handle
    }

    /// Returns a shared reference to the wrapped I/O object.
    pub fn get_ref(&self) -> &E {
        &self.io
    }

    /// Returns a mutable reference to the wrapped I/O object.
    pub fn get_mut(&mut self) -> &mut E {
        &mut self.io
    }

    /// Consumes the wrapper and returns the wrapped I/O object.
    pub fn into_inner(self) -> E {
        self.io
    }

    /// Deregisters the wrapped I/O object from its registration.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `Registration::deregister`.
    ///
    /// # Panics
    ///
    /// Panics if the registration mutex is poisoned.
    pub fn deregister(&self) -> io::Result<()>
    where
        E: Evented,
    {
        self.inner.registration.lock().unwrap().deregister(&self.io)
    }
}
/// Non-blocking `Read`: reports `WouldBlock` while the read side is not ready,
/// and re-arms read interest when the wrapped object itself would block.
impl<E: Read> Read for PollEvented<E> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // Do not touch the wrapped object until readiness has been observed.
        if !self.poll_read().is_ready() {
            return Err(io::Error::from(io::ErrorKind::WouldBlock));
        }

        let outcome = self.io.read(buf);

        // The cached readiness was stale; clear it and poll again.
        if is_wouldblock(&outcome) {
            self.need_read()?;
        }

        outcome
    }
}
/// Non-blocking `Write`: both methods report `WouldBlock` while the write side
/// is not ready, and re-arm write interest when the wrapped object would block.
impl<E: Write> Write for PollEvented<E> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // Do not touch the wrapped object until readiness has been observed.
        if !self.poll_write().is_ready() {
            return Err(io::Error::from(io::ErrorKind::WouldBlock));
        }

        let outcome = self.io.write(buf);

        // The cached readiness was stale; clear it and poll again.
        if is_wouldblock(&outcome) {
            self.need_write()?;
        }

        outcome
    }

    fn flush(&mut self) -> io::Result<()> {
        if !self.poll_write().is_ready() {
            return Err(io::Error::from(io::ErrorKind::WouldBlock));
        }

        let outcome = self.io.flush();

        if is_wouldblock(&outcome) {
            self.need_write()?;
        }

        outcome
    }
}
// No methods are overridden here; the trait's default implementations are used as-is.
impl<E: Read> AsyncRead for PollEvented<E> {}
impl<E: Write> AsyncWrite for PollEvented<E> {
fn shutdown(&mut self) -> Poll<(), io::Error> {
Ok(().into())
}
}
/// Returns `true` only when `r` is an `Err` whose kind is `io::ErrorKind::WouldBlock`.
fn is_wouldblock<T>(r: &io::Result<T>) -> bool {
    r.as_ref()
        .err()
        .map_or(false, |err| err.kind() == io::ErrorKind::WouldBlock)
}
// Bit flags used when a `Ready` set is packed into a `usize` by `ready2usize`
// and unpacked by `usize2ready`. Bits 2 and above are assigned by the unix
// `platform` module.
const READ: usize = 1 << 0;
const WRITE: usize = 1 << 1;
/// Packs a `Ready` set into a `usize` bitmask.
///
/// The readable and writable flags map to `READ` and `WRITE`; any further bits
/// come from `platform::ready2usize`.
fn ready2usize(ready: Ready) -> usize {
    let read_bit = if ready.is_readable() { READ } else { 0 };
    let write_bit = if ready.is_writable() { WRITE } else { 0 };

    read_bit | write_bit | platform::ready2usize(ready)
}
/// Unpacks a bitmask produced by `ready2usize` back into a `Ready` set.
///
/// `READ` and `WRITE` become the readable and writable flags; the remaining
/// bits are decoded by `platform::usize2ready`.
fn usize2ready(bits: usize) -> Ready {
    let mut generic = Ready::empty();

    if (bits & READ) != 0 {
        generic |= Ready::readable();
    }
    if (bits & WRITE) != 0 {
        generic |= Ready::writable();
    }

    generic | platform::usize2ready(bits)
}
/// Unix-only helpers that encode and decode the extra readiness kinds
/// (hup, error, aio, lio) alongside the generic read/write bits.
#[cfg(unix)]
mod platform {
    use mio::unix::UnixReady;
    use mio::Ready;

    // Bit flags for the unix-specific readiness kinds; they start at bit 2,
    // above the generic read/write flags defined by the parent module.
    const HUP: usize = 1 << 2;
    const ERROR: usize = 1 << 3;
    const AIO: usize = 1 << 4;
    const LIO: usize = 1 << 5;

    // NOTE(review): `is_aio` is only compiled for dragonfly/freebsd, while
    // `usize2ready_aio` below also covers ios/macos — confirm the asymmetry
    // is intentional.
    #[cfg(any(target_os = "dragonfly", target_os = "freebsd"))]
    fn is_aio(ready: &Ready) -> bool {
        let unix = UnixReady::from(*ready);
        unix.is_aio()
    }

    /// AIO readiness is never reported on the remaining targets.
    #[cfg(not(any(target_os = "dragonfly", target_os = "freebsd")))]
    fn is_aio(_ready: &Ready) -> bool {
        false
    }

    #[cfg(target_os = "freebsd")]
    fn is_lio(ready: &Ready) -> bool {
        let unix = UnixReady::from(*ready);
        unix.is_lio()
    }

    /// LIO readiness is never reported outside freebsd.
    #[cfg(not(target_os = "freebsd"))]
    fn is_lio(_ready: &Ready) -> bool {
        false
    }

    /// Encodes the unix-specific parts of `ready` as `AIO`/`LIO`/`ERROR`/`HUP` bits.
    pub fn ready2usize(ready: Ready) -> usize {
        let unix = UnixReady::from(ready);

        let aio = if is_aio(&ready) { AIO } else { 0 };
        let lio = if is_lio(&ready) { LIO } else { 0 };
        let error = if unix.is_error() { ERROR } else { 0 };
        let hup = if unix.is_hup() { HUP } else { 0 };

        aio | lio | error | hup
    }

    #[cfg(any(
        target_os = "dragonfly",
        target_os = "freebsd",
        target_os = "ios",
        target_os = "macos"
    ))]
    fn usize2ready_aio(ready: &mut UnixReady) {
        ready.insert(UnixReady::aio());
    }

    /// No-op on targets where the AIO flag is not decoded.
    #[cfg(not(any(
        target_os = "dragonfly",
        target_os = "freebsd",
        target_os = "ios",
        target_os = "macos"
    )))]
    fn usize2ready_aio(_ready: &mut UnixReady) {}

    #[cfg(target_os = "freebsd")]
    fn usize2ready_lio(ready: &mut UnixReady) {
        ready.insert(UnixReady::lio());
    }

    /// No-op outside freebsd.
    #[cfg(not(target_os = "freebsd"))]
    fn usize2ready_lio(_ready: &mut UnixReady) {}

    /// Decodes the `AIO`/`LIO`/`HUP`/`ERROR` bits of `bits` into a `Ready` set.
    pub fn usize2ready(bits: usize) -> Ready {
        let has = |flag: usize| bits & flag != 0;
        let mut unix = UnixReady::from(Ready::empty());

        if has(AIO) {
            usize2ready_aio(&mut unix);
        }
        if has(LIO) {
            usize2ready_lio(&mut unix);
        }
        if has(HUP) {
            unix.insert(UnixReady::hup());
        }
        if has(ERROR) {
            unix.insert(UnixReady::error());
        }

        Ready::from(unix)
    }
}
// Windows build of the platform helpers: every function returns an empty or
// zero value, so only the generic READ and WRITE bits are ever encoded.
#[cfg(windows)]
mod platform {
use mio::Ready;
// Always the empty set; not referenced elsewhere in the visible source.
pub fn all() -> Ready {
Ready::empty()
}
// Always the empty set; not referenced elsewhere in the visible source.
pub fn hup() -> Ready {
Ready::empty()
}
// Contributes no platform-specific bits to the encoded readiness.
pub fn ready2usize(_r: Ready) -> usize {
0
}
// Decodes no platform-specific bits; always returns the empty set.
pub fn usize2ready(_r: usize) -> Ready {
Ready::empty()
}
}