use std::io;
use std::ops::{Deref, DerefMut};
use std::sync::Arc;
use futures::{Async, Poll, Stream, Sink, StartSend, AsyncSink};
use io::Io;
#[derive(Clone)]
pub struct EasyBuf {
buf: Arc<Vec<u8>>,
start: usize,
end: usize,
}
pub struct EasyBufMut<'a> {
buf: &'a mut Vec<u8>,
end: &'a mut usize,
}
impl EasyBuf {
pub fn new() -> EasyBuf {
EasyBuf::with_capacity(8 * 1024)
}
pub fn with_capacity(cap: usize) -> EasyBuf {
EasyBuf {
buf: Arc::new(Vec::with_capacity(cap)),
start: 0,
end: 0,
}
}
fn set_start(&mut self, start: usize) -> &mut EasyBuf {
assert!(start <= self.buf.as_ref().len());
assert!(start <= self.end);
self.start = start;
self
}
fn set_end(&mut self, end: usize) -> &mut EasyBuf {
assert!(end <= self.buf.len());
assert!(self.start <= end);
self.end = end;
self
}
pub fn len(&self) -> usize {
self.end - self.start
}
pub fn as_slice(&self) -> &[u8] {
self.as_ref()
}
pub fn split_off(&mut self, at: usize) -> EasyBuf {
let mut other = EasyBuf { buf: self.buf.clone(), ..*self };
let idx = self.start + at;
other.set_start(idx);
self.set_end(idx);
return other
}
pub fn drain_to(&mut self, at: usize) -> EasyBuf {
let mut other = EasyBuf { buf: self.buf.clone(), ..*self };
let idx = self.start + at;
other.set_end(idx);
self.set_start(idx);
return other
}
pub fn get_mut(&mut self) -> EasyBufMut {
if Arc::get_mut(&mut self.buf).is_some() {
let buf = Arc::get_mut(&mut self.buf).unwrap();
buf.drain(..self.start);
self.start = 0;
return EasyBufMut { buf: buf, end: &mut self.end }
}
let mut v = Vec::with_capacity(self.buf.capacity());
v.extend_from_slice(self.as_ref());
self.start = 0;
self.buf = Arc::new(v);
EasyBufMut {
buf: Arc::get_mut(&mut self.buf).unwrap(),
end: &mut self.end,
}
}
}
impl AsRef<[u8]> for EasyBuf {
fn as_ref(&self) -> &[u8] {
&self.buf[self.start..self.end]
}
}
impl<'a> Deref for EasyBufMut<'a> {
type Target = Vec<u8>;
fn deref(&self) -> &Vec<u8> {
self.buf
}
}
impl<'a> DerefMut for EasyBufMut<'a> {
fn deref_mut(&mut self) -> &mut Vec<u8> {
self.buf
}
}
impl From<Vec<u8>> for EasyBuf {
fn from(vec: Vec<u8>) -> EasyBuf {
let end = vec.len();
EasyBuf {
buf: Arc::new(vec),
start: 0,
end: end,
}
}
}
impl<'a> Drop for EasyBufMut<'a> {
fn drop(&mut self) {
*self.end = self.buf.len();
}
}
pub trait Codec {
type In;
type Out;
fn decode(&mut self, buf: &mut EasyBuf) -> Result<Option<Self::In>, io::Error>;
fn decode_eof(&mut self, buf: &mut EasyBuf) -> io::Result<Self::In> {
match try!(self.decode(buf)) {
Some(frame) => Ok(frame),
None => Err(io::Error::new(io::ErrorKind::Other,
"bytes remaining on stream")),
}
}
fn encode(&mut self, msg: Self::Out, buf: &mut Vec<u8>) -> io::Result<()>;
}
pub struct Framed<T, C> {
upstream: T,
codec: C,
eof: bool,
is_readable: bool,
rd: EasyBuf,
wr: Vec<u8>,
}
impl<T: Io, C: Codec> Stream for Framed<T, C> {
type Item = C::In;
type Error = io::Error;
fn poll(&mut self) -> Poll<Option<C::In>, io::Error> {
loop {
if self.is_readable {
if self.eof {
if self.rd.len() == 0 {
return Ok(None.into())
} else {
let frame = try!(self.codec.decode_eof(&mut self.rd));
return Ok(Async::Ready(Some(frame)))
}
}
trace!("attempting to decode a frame");
if let Some(frame) = try!(self.codec.decode(&mut self.rd)) {
trace!("frame decoded from buffer");
return Ok(Async::Ready(Some(frame)));
}
self.is_readable = false;
}
assert!(!self.eof);
let before = self.rd.len();
let ret = self.upstream.read_to_end(&mut self.rd.get_mut());
match ret {
Ok(_n) => self.eof = true,
Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
if self.rd.len() == before {
return Ok(Async::NotReady)
}
}
Err(e) => return Err(e),
}
self.is_readable = true;
}
}
}
impl<T: Io, C: Codec> Sink for Framed<T, C> {
type SinkItem = C::Out;
type SinkError = io::Error;
fn start_send(&mut self, item: C::Out) -> StartSend<C::Out, io::Error> {
if self.wr.len() > 8 * 1024 {
try!(self.poll_complete());
if self.wr.len() > 8 * 1024 {
return Ok(AsyncSink::NotReady(item));
}
}
try!(self.codec.encode(item, &mut self.wr));
Ok(AsyncSink::Ready)
}
fn poll_complete(&mut self) -> Poll<(), io::Error> {
trace!("flushing framed transport");
while !self.wr.is_empty() {
trace!("writing; remaining={}", self.wr.len());
let n = try_nb!(self.upstream.write(&self.wr));
if n == 0 {
return Err(io::Error::new(io::ErrorKind::WriteZero,
"failed to write frame to transport"));
}
self.wr.drain(..n);
}
try_nb!(self.upstream.flush());
trace!("framed transport flushed");
return Ok(Async::Ready(()));
}
}
pub fn framed<T, C>(io: T, codec: C) -> Framed<T, C> {
Framed {
upstream: io,
codec: codec,
eof: false,
is_readable: false,
rd: EasyBuf::new(),
wr: Vec::with_capacity(8 * 1024),
}
}
impl<T, C> Framed<T, C> {
pub fn get_ref(&self) -> &T {
&self.upstream
}
pub fn get_mut(&mut self) -> &mut T {
&mut self.upstream
}
pub fn into_inner(self) -> T {
self.upstream
}
}