use std::collections::VecDeque;
use std::future::Future;
use std::pin::Pin;
use std::task::{self, Poll, Waker};
use std::time::{Duration, Instant};
use std::{cmp, io};
use futures_core::ready;
use tokio_io::{AsyncRead, AsyncWrite, Buf};
use tokio_sync::mpsc;
use tokio_timer::{clock, timer, Delay};
/// A scripted I/O object used in tests.
///
/// All state lives in the private `Inner` value; this file implements
/// `AsyncRead` and `AsyncWrite` for `Mock` on top of it.
#[derive(Debug)]
pub struct Mock {
// Script queue, timer state and the channel receiver backing this mock.
inner: Inner,
}
/// Sending side of the channel over which extra actions are delivered to a
/// `Mock` after it has been built.
#[derive(Debug)]
pub struct Handle {
// Unbounded sender; the matching receiver is stored in `Inner::rx`.
tx: mpsc::UnboundedSender<Action>,
}
/// Accumulates an ordered script of actions from which a `Mock` is built.
#[derive(Debug, Clone, Default)]
pub struct Builder {
// Actions in the order they were added; cloned into each mock that is built.
actions: VecDeque<Action>,
}
/// One step of a mock's script.
#[derive(Debug, Clone)]
enum Action {
// Bytes that the mock hands out to a reader.
Read(Vec<u8>),
// Bytes that a writer is expected to write; compared with `assert_eq!`.
Write(Vec<u8>),
// A pause that must elapse before later actions are processed.
Wait(Duration),
}
/// Private state shared by the read and write halves of a `Mock`.
#[derive(Debug)]
struct Inner {
// Remaining script; the front element is the action currently in effect.
actions: VecDeque<Action>,
// Deadline recorded when a front `Wait` action is first observed.
waiting: Option<Instant>,
// Timer handle used to create the `Delay` stored in `sleep`.
timer_handle: timer::Handle,
// In-flight delay polled at the start of each read/write poll, if any.
sleep: Option<Delay>,
// Waker of a reader that returned `Pending` while a non-read action was in
// front; taken and woken after a successful write.
read_wait: Option<Waker>,
// Receiving half of the channel fed by `Handle`.
rx: mpsc::UnboundedReceiver<Action>,
}
impl Builder {
pub fn new() -> Self {
Self::default()
}
pub fn read(&mut self, buf: &[u8]) -> &mut Self {
self.actions.push_back(Action::Read(buf.into()));
self
}
pub fn write(&mut self, buf: &[u8]) -> &mut Self {
self.actions.push_back(Action::Write(buf.into()));
self
}
pub fn wait(&mut self, duration: Duration) -> &mut Self {
let duration = cmp::max(duration, Duration::from_millis(1));
self.actions.push_back(Action::Wait(duration));
self
}
pub fn build(&mut self) -> Mock {
let (mock, _) = self.build_with_handle();
mock
}
pub fn build_with_handle(&mut self) -> (Mock, Handle) {
let (inner, handle) = Inner::new(self.actions.clone());
let mock = Mock { inner };
(mock, handle)
}
}
impl Handle {
    /// Queues a read step carrying a copy of `buf` on the mock's channel.
    ///
    /// # Panics
    ///
    /// Panics if sending on the channel fails.
    pub fn read(&mut self, buf: &[u8]) -> &mut Self {
        let step = Action::Read(buf.to_vec());
        self.tx.try_send(step).unwrap();
        self
    }

    /// Queues a write expectation carrying a copy of `buf` on the mock's
    /// channel.
    ///
    /// # Panics
    ///
    /// Panics if sending on the channel fails.
    pub fn write(&mut self, buf: &[u8]) -> &mut Self {
        let step = Action::Write(buf.to_vec());
        self.tx.try_send(step).unwrap();
        self
    }
}
impl Inner {
    /// Creates the mock state from an initial script.
    ///
    /// Returns the state together with a `Handle` owning the sending half of
    /// the channel whose receiving half is stored in `rx`.
    fn new(actions: VecDeque<Action>) -> (Inner, Handle) {
        let (tx, rx) = mpsc::unbounded_channel();

        let inner = Inner {
            actions,
            timer_handle: timer::Handle::default(),
            sleep: None,
            read_wait: None,
            rx,
            waiting: None,
        };

        let handle = Handle { tx };

        (inner, handle)
    }

    /// Polls the handle channel for the next action sent through a `Handle`.
    fn poll_action(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<Action>> {
        self.rx.poll_recv(cx)
    }

    /// Copies bytes from the current `Read` action into `dst`.
    ///
    /// Returns the number of bytes copied, `Ok(0)` when no action is left,
    /// and a `WouldBlock` error when the current action is not a read.
    fn read(&mut self, dst: &mut [u8]) -> io::Result<usize> {
        match self.action() {
            Some(Action::Read(data)) => {
                // Hand out as much as fits; the rest stays queued.
                let n = cmp::min(dst.len(), data.len());
                dst[..n].copy_from_slice(&data[..n]);
                data.drain(..n);
                Ok(n)
            }
            Some(_) => Err(io::ErrorKind::WouldBlock.into()),
            None => Ok(0),
        }
    }

    /// Checks `src` against the scripted `Write` actions and consumes them.
    ///
    /// Returns the number of bytes matched. Fails with `BrokenPipe` when the
    /// script is empty and with `WouldBlock` when a `Wait` is in front.
    ///
    /// # Panics
    ///
    /// Panics if the written bytes differ from the expected bytes.
    fn write(&mut self, mut src: &[u8]) -> io::Result<usize> {
        let mut ret = 0;

        if self.actions.is_empty() {
            return Err(io::ErrorKind::BrokenPipe.into());
        }

        if let Some(Action::Wait(..)) = self.action() {
            return Err(io::ErrorKind::WouldBlock.into());
        }

        // Walk the script up to the next wait, matching `src` against every
        // write expectation on the way; read actions are skipped over.
        for action in self.actions.iter_mut() {
            match action {
                Action::Write(expect) => {
                    let n = cmp::min(src.len(), expect.len());

                    assert_eq!(&src[..n], &expect[..n]);

                    expect.drain(..n);
                    src = &src[n..];
                    ret += n;

                    if src.is_empty() {
                        return Ok(ret);
                    }
                }
                Action::Wait(..) => break,
                Action::Read(..) => {}
            }
        }

        Ok(ret)
    }

    /// Returns the configured duration of the `Wait` action in front, if any.
    ///
    /// This is the full scripted duration, not the time left until the
    /// recorded deadline.
    fn remaining_wait(&mut self) -> Option<Duration> {
        match self.action() {
            Some(Action::Wait(dur)) => Some(*dur),
            _ => None,
        }
    }

    /// Returns the action currently in effect, discarding finished ones.
    ///
    /// Read and write actions are finished once their buffer is empty. A wait
    /// is finished once the deadline recorded on first sight has passed.
    fn action(&mut self) -> Option<&mut Action> {
        loop {
            match self.actions.front() {
                None => return None,
                Some(Action::Read(data)) | Some(Action::Write(data)) => {
                    if !data.is_empty() {
                        break;
                    }
                }
                Some(Action::Wait(dur)) => {
                    if let Some(until) = self.waiting {
                        if Instant::now() < until {
                            break;
                        }

                        // The wait is over. Forget its deadline so that the
                        // next `Wait` in the script records a fresh one
                        // instead of being treated as already elapsed.
                        self.waiting = None;
                    } else {
                        self.waiting = Some(Instant::now() + *dur);
                        break;
                    }
                }
            }

            // The front action is finished; drop it and look at the next one.
            self.actions.pop_front();
        }

        self.actions.front_mut()
    }
}
impl Mock {
    /// Wakes a parked reader if the script now allows a read to make progress.
    ///
    /// A read can progress when the current action is a `Read` or when no
    /// action is left at all.
    fn maybe_wakeup_reader(&mut self) {
        let reader_can_progress = match self.inner.action() {
            None | Some(&mut Action::Read(_)) => true,
            Some(_) => false,
        };

        if !reader_can_progress {
            return;
        }

        if let Some(waker) = self.inner.read_wait.take() {
            waker.wake();
        }
    }
}
impl AsyncRead for Mock {
    /// Drives the script forward until bytes can be read into `buf`.
    ///
    /// Resolves with `Ok(0)` once the script is exhausted and the handle
    /// channel yields no further action.
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
        buf: &mut [u8],
    ) -> Poll<io::Result<usize>> {
        let inner = &mut self.get_mut().inner;

        loop {
            // Finish any scripted pause that is still in flight.
            if let Some(sleep) = inner.sleep.as_mut() {
                ready!(Pin::new(sleep).poll(cx));
            }
            inner.sleep = None;

            match inner.read(buf) {
                // Nothing left in the script: look for actions from the handle.
                Ok(0) => match ready!(inner.poll_action(cx)) {
                    Some(action) => inner.actions.push_back(action),
                    None => return Poll::Ready(Ok(0)),
                },
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                    match inner.remaining_wait() {
                        // A wait is in front: arm a delay and poll it on the
                        // next loop iteration.
                        Some(rem) => {
                            let until = clock::now() + rem;
                            inner.sleep = Some(inner.timer_handle.delay(until));
                        }
                        // Some other non-read action is in front: park until
                        // a write wakes the reader.
                        None => {
                            inner.read_wait = Some(cx.waker().clone());
                            return Poll::Pending;
                        }
                    }
                }
                outcome => return Poll::Ready(outcome),
            }
        }
    }
}
impl AsyncWrite for Mock {
    /// Checks `buf` against the scripted write expectations.
    ///
    /// # Panics
    ///
    /// Panics with "unexpected WouldBlock" if the write would block but no
    /// wait is in front, and with "unexpected write" if the script is empty
    /// and the handle channel yields no further action.
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        let this = self.get_mut();

        loop {
            // Finish any scripted pause that is still in flight.
            if let Some(sleep) = this.inner.sleep.as_mut() {
                ready!(Pin::new(sleep).poll(cx));
            }
            this.inner.sleep = None;

            match this.inner.write(buf) {
                Ok(0) => {
                    // NOTE(review): this returns `Pending` without storing
                    // the task's waker anywhere in this block; confirm that
                    // callers are re-polled by other means.
                    if !this.inner.actions.is_empty() {
                        return Poll::Pending;
                    }

                    // Script exhausted: look for actions from the handle.
                    match ready!(this.inner.poll_action(cx)) {
                        Some(action) => this.inner.actions.push_back(action),
                        None => panic!("unexpected write"),
                    }
                }
                Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                    match this.inner.remaining_wait() {
                        // A wait is in front: arm a delay and poll it on the
                        // next loop iteration.
                        Some(rem) => {
                            let until = clock::now() + rem;
                            this.inner.sleep = Some(this.inner.timer_handle.delay(until));
                        }
                        None => panic!("unexpected WouldBlock"),
                    }
                }
                outcome => {
                    // The write may have unblocked a parked reader.
                    this.maybe_wakeup_reader();
                    return Poll::Ready(outcome);
                }
            }
        }
    }

    /// Writes the current chunk of `buf` via `poll_write` and advances `buf`
    /// by the number of bytes accepted.
    fn poll_write_buf<B: Buf>(
        self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
        buf: &mut B,
    ) -> Poll<io::Result<usize>> {
        let written = match self.poll_write(cx, buf.bytes()) {
            Poll::Ready(Ok(n)) => n,
            Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
            Poll::Pending => return Poll::Pending,
        };

        buf.advance(written);
        Poll::Ready(Ok(written))
    }

    /// Always ready; this method performs no work.
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }

    /// Always ready; this method performs no work.
    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
        Poll::Ready(Ok(()))
    }
}