use futures::{Async, Future, Poll};
use mio::unix::UnixReady;
use mio_aio;
pub use mio_aio::{BufRef, LioError};
use nix;
use tokio::reactor::{Handle, PollEvented2};
use std::{fs, io, mem};
use std::borrow::{Borrow, BorrowMut};
use std::os::unix::io::AsRawFd;
use std::path::Path;
/// The operation carried by an `AioFut`.
///
/// Every variant wraps the same payload: an `AioCb` control block registered
/// with the reactor through `PollEvented2`.  The variant only records which
/// request `AioFut::poll` must submit and how the final status is reported.
#[derive(Debug)]
enum AioOp {
// Built by `File::sync_all`; submitted with `fsync(O_SYNC)`.
Fsync(PollEvented2<mio_aio::AioCb<'static>>),
// Built by `File::read_at`; submitted with `read()`.
Read(PollEvented2<mio_aio::AioCb<'static>>),
// Built by `File::write_at`; submitted with `write()`.
Write(PollEvented2<mio_aio::AioCb<'static>>),
}
/// Progress of an `AioFut` or `LioFut` through its lifetime.
#[derive(Debug)]
enum AioState {
// Freshly constructed; nothing has been submitted yet.  The first call to
// `poll` performs the submission.
Allocated,
// Submission succeeded; `poll` is waiting for the readiness notification.
InProgress,
// Set only by `LioFut::poll`, when `submit` reported
// `LioError::EINCOMPLETE`.  `poll` calls `resubmit` on the next readiness
// notification until it succeeds.
Incomplete,
}
/// A future for a single asynchronous read, write, or fsync on a `File`.
///
/// Created by `File::read_at`, `File::write_at` and `File::sync_all`.  The
/// operation is not submitted until the future is first polled.  On
/// completion it resolves to an `AioResult`.
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub struct AioFut {
// The control block, tagged with the kind of operation it performs.
op: AioOp,
// Whether the operation has been submitted yet.
state: AioState,
}
impl AioFut {
    /// Fetches the final status of the underlying control block.
    ///
    /// For an fsync the status value is discarded and `Ok(None)` is produced;
    /// for a read or a write the value reported by the control block's
    /// `aio_return` is passed through as `Ok(Some(_))`.  Errors are forwarded
    /// unchanged in every case.
    #[doc(hidden)]
    fn aio_return(&mut self) -> Result<Option<isize>, nix::Error> {
        match self.op {
            AioOp::Fsync(ref evented) => {
                evented.get_ref().aio_return().map(|_| None)
            }
            AioOp::Read(ref evented) | AioOp::Write(ref evented) => {
                evented.get_ref().aio_return().map(Some)
            }
        }
    }
}
/// The outcome of one completed asynchronous operation.
///
/// Produced by `AioFut`, and once per buffer by the iterator that `LioFut`
/// resolves to.
pub struct AioResult {
// Status reported by `aio_return`: `None` for an fsync, `Some(_)` for reads
// and writes (presumably the number of bytes transferred -- confirm against
// the mio_aio documentation).
pub value: Option<isize>,
// Reference to the buffer that was handed to the operation, as returned by
// the control block's `buf_ref()`.
pub buf: BufRef
}
impl AioResult {
pub fn into_buf_ref(self) -> BufRef {
self.buf
}
}
/// A future for a list of asynchronous operations submitted together.
///
/// Created by `File::readv_at` and `File::writev_at`.  The list is not
/// submitted until the future is first polled.  On completion it resolves to
/// a boxed iterator yielding one `AioResult` per buffer.
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub struct LioFut {
// The registered list control block.  It is an `Option` so that `poll` can
// move it out once the operations have completed; it is `None` afterwards.
op: Option<PollEvented2<mio_aio::LioCb>>,
// Whether the list has been submitted, fully or only partially.
state: AioState,
}
impl Future for LioFut {
type Item = Box<Iterator<Item = AioResult>>;
type Error = nix::Error;
fn poll(&mut self) -> Poll<Box<Iterator<Item = AioResult>>, nix::Error> {
if let AioState::Allocated = self.state {
let result = self.op.as_mut().unwrap().get_mut().submit();
match result {
Ok(()) => self.state = AioState::InProgress,
Err(LioError::EINCOMPLETE) => {
self.state = AioState::Incomplete;
},
Err(LioError::EAGAIN) =>
return Err(nix::Error::Sys(nix::errno::Errno::EAGAIN)),
Err(LioError::EIO) =>
return Err(nix::Error::Sys(nix::errno::Errno::EIO)),
}
}
let poll_result = self.op
.as_mut()
.unwrap()
.poll_read_ready(UnixReady::lio().into())
.unwrap();
if poll_result == Async::NotReady {
return Ok(Async::NotReady);
}
if let AioState::Incomplete = self.state {
let result = self.op.as_mut().unwrap().get_mut().resubmit();
self.op
.as_mut()
.unwrap()
.clear_read_ready(UnixReady::lio().into())
.unwrap();
match result {
Ok(()) => {
self.state = AioState::InProgress;
return Ok(Async::NotReady);
},
Err(LioError::EINCOMPLETE) => {
return Ok(Async::NotReady);
},
Err(LioError::EAGAIN) =>
return Err(nix::Error::Sys(nix::errno::Errno::EAGAIN)),
Err(LioError::EIO) =>
return Err(nix::Error::Sys(nix::errno::Errno::EIO)),
}
}
let mut op = None;
mem::swap(&mut op, &mut self.op);
let iter = op.unwrap().into_inner().unwrap().into_results(|iter| {
iter.map(|lr| {
AioResult{
value: Some(lr.result.expect("aio_return")),
buf: lr.buf_ref
}
})
});
Ok(Async::Ready(Box::new(iter)))
}
}
/// A file on which asynchronous reads, writes and syncs can be started.
///
/// Each operation returns a future (`AioFut` or `LioFut`) that is registered
/// with the reactor identified by `handle`.
#[derive(Debug)]
pub struct File {
// The underlying standard-library file; supplies the raw descriptor.
file: fs::File,
// Reactor handle with which every new control block is registered.
handle: Handle
}
impl File {
    /// Returns the metadata of the underlying file.
    pub fn metadata(&self) -> io::Result<fs::Metadata> {
        self.file.metadata()
    }

    /// Opens `path` for both reading and writing, creating the file if it
    /// does not already exist.
    ///
    /// Futures created from the returned `File` are registered with the
    /// reactor behind `h`.
    pub fn open<P: AsRef<Path>>(path: P, h: Handle) -> io::Result<File> {
        let file = fs::OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .open(path)?;
        Ok(File { file, handle: h })
    }

    /// Prepares an asynchronous read into `buf` starting at `offset`.
    ///
    /// Nothing is submitted until the returned future is polled.
    ///
    /// # Errors
    ///
    /// Fails if the control block cannot be registered with the reactor.
    pub fn read_at(&self, buf: Box<BorrowMut<[u8]>>,
                   offset: u64) -> io::Result<AioFut> {
        let fd = self.file.as_raw_fd();
        let aiocb = mio_aio::AioCb::from_boxed_mut_slice(
            fd, offset, buf, 0, mio_aio::LioOpcode::LIO_NOP);
        let evented = PollEvented2::new_with_handle(aiocb, &self.handle)?;
        Ok(AioFut {
            op: AioOp::Read(evented),
            state: AioState::Allocated,
        })
    }

    /// Prepares an asynchronous vectored read.
    ///
    /// The first buffer is read from `offset`; each subsequent buffer is read
    /// from the position immediately after the previous buffer's range.
    /// Nothing is submitted until the returned future is polled.
    ///
    /// # Errors
    ///
    /// Fails if the control block cannot be registered with the reactor.
    pub fn readv_at(&self, bufs: Vec<Box<BorrowMut<[u8]>>>,
                    offset: u64) -> io::Result<LioFut> {
        let fd = self.file.as_raw_fd();
        let mut liocb = mio_aio::LioCb::with_capacity(bufs.len());
        let mut next_offset = offset;
        for mut buf in bufs {
            // Measure the buffer before ownership moves into the list.  The
            // two-step borrow first reaches through the `Box` to the trait
            // object, then asks the trait object for its slice.
            let len = {
                let inner: &mut BorrowMut<[u8]> = buf.borrow_mut();
                let bytes: &mut [u8] = inner.borrow_mut();
                bytes.len()
            };
            liocb.emplace_boxed_mut_slice(fd, next_offset, buf, 0,
                                          mio_aio::LioOpcode::LIO_READ);
            next_offset += len as u64;
        }
        let evented = PollEvented2::new_with_handle(liocb, &self.handle)?;
        Ok(LioFut {
            op: Some(evented),
            state: AioState::Allocated,
        })
    }

    /// Prepares an asynchronous write of `buf` starting at `offset`.
    ///
    /// Nothing is submitted until the returned future is polled.
    ///
    /// # Errors
    ///
    /// Fails if the control block cannot be registered with the reactor.
    pub fn write_at(&self, buf: Box<Borrow<[u8]>>,
                    offset: u64) -> io::Result<AioFut> {
        let fd = self.file.as_raw_fd();
        let aiocb = mio_aio::AioCb::from_boxed_slice(
            fd, offset, buf, 0, mio_aio::LioOpcode::LIO_NOP);
        let evented = PollEvented2::new_with_handle(aiocb, &self.handle)?;
        Ok(AioFut {
            op: AioOp::Write(evented),
            state: AioState::Allocated,
        })
    }

    /// Prepares an asynchronous vectored write.
    ///
    /// The first buffer is written at `offset`; each subsequent buffer is
    /// written immediately after the previous buffer's range.  Nothing is
    /// submitted until the returned future is polled.
    ///
    /// # Errors
    ///
    /// Fails if the control block cannot be registered with the reactor.
    pub fn writev_at(&self, bufs: Vec<Box<Borrow<[u8]>>>,
                     offset: u64) -> io::Result<LioFut> {
        let fd = self.file.as_raw_fd();
        let mut liocb = mio_aio::LioCb::with_capacity(bufs.len());
        let mut next_offset = offset;
        for buf in bufs {
            // Measure the buffer before ownership moves into the list (see
            // `readv_at` for why the borrow takes two steps).
            let len = {
                let inner: &Borrow<[u8]> = buf.borrow();
                let bytes: &[u8] = inner.borrow();
                bytes.len()
            };
            liocb.emplace_boxed_slice(fd, next_offset, buf, 0,
                                      mio_aio::LioOpcode::LIO_WRITE);
            next_offset += len as u64;
        }
        let evented = PollEvented2::new_with_handle(liocb, &self.handle)?;
        Ok(LioFut {
            op: Some(evented),
            state: AioState::Allocated,
        })
    }

    /// Prepares an asynchronous sync of the file.
    ///
    /// Nothing is submitted until the returned future is polled; at that
    /// point `AioFut::poll` issues the request with `AioFsyncMode::O_SYNC`.
    ///
    /// # Errors
    ///
    /// Fails if the control block cannot be registered with the reactor.
    pub fn sync_all(&self) -> io::Result<AioFut> {
        let fd = self.file.as_raw_fd();
        let aiocb = mio_aio::AioCb::from_fd(fd, 0);
        let evented = PollEvented2::new_with_handle(aiocb, &self.handle)?;
        Ok(AioFut {
            op: AioOp::Fsync(evented),
            state: AioState::Allocated,
        })
    }
}
impl Future for AioFut {
    type Item = AioResult;
    type Error = nix::Error;

    /// Drives the single operation to completion.
    ///
    /// * On the first call the request is submitted: `fsync(O_SYNC)`,
    ///   `read()` or `write()` depending on the kind of operation.
    /// * Every call then checks the reactor for an AIO readiness event and
    ///   returns `NotReady` if none has arrived.
    /// * Once ready, the final status is collected with `aio_return` and
    ///   paired with the control block's buffer reference.
    ///
    /// # Errors
    ///
    /// Returns the error reported by the submission, or by `aio_return` once
    /// the operation has finished.
    ///
    /// # Panics
    ///
    /// Panics if the reactor reports an I/O error while checking readiness.
    fn poll(&mut self) -> Poll<AioResult, nix::Error> {
        if let AioState::Allocated = self.state {
            let submitted = match self.op {
                AioOp::Fsync(ref pe) => {
                    pe.get_ref().fsync(mio_aio::AioFsyncMode::O_SYNC)
                }
                AioOp::Read(ref pe) => pe.get_ref().read(),
                AioOp::Write(ref pe) => pe.get_ref().write(),
            };
            // A failed submission leaves the state at `Allocated`.
            submitted?;
            self.state = AioState::InProgress;
        }

        // All three variants carry the same payload, so one arm serves them.
        let readiness = match self.op {
            AioOp::Fsync(ref mut io)
            | AioOp::Read(ref mut io)
            | AioOp::Write(ref mut io) => {
                io.poll_read_ready(UnixReady::aio().into())
            }
        }.unwrap();
        if let Async::NotReady = readiness {
            return Ok(Async::NotReady);
        }

        // Collect the status first, then the buffer reference; the buffer is
        // fetched even when the status is an error, as before.
        let status = self.aio_return();
        let buf = match self.op {
            AioOp::Fsync(ref mut op)
            | AioOp::Read(ref mut op)
            | AioOp::Write(ref mut op) => op.get_mut().buf_ref(),
        };
        status.map(|value| Async::Ready(AioResult { value, buf }))
    }
}