use std::collections::VecDeque;
use std::fmt;
use std::io;
use std::net::{SocketAddr, ToSocketAddrs};
use std::pin::Pin;
use futures::future::BoxFuture;
use futures::io::*;
use futures::prelude::*;
use futures::ready;
use futures::task::{Context, Poll};
#[derive(Debug)]
pub struct TcpStream {
inner: Pin<Box<dyn runtime_raw::TcpStream>>,
}
impl TcpStream {
pub fn connect<A: ToSocketAddrs>(addr: A) -> ConnectFuture {
ConnectFuture {
addrs: Some(addr.to_socket_addrs().map(|iter| iter.collect())),
last_err: None,
future: None,
runtime: runtime_raw::current_runtime(),
}
}
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.inner.local_addr()
}
pub fn peer_addr(&self) -> io::Result<SocketAddr> {
self.inner.peer_addr()
}
pub fn shutdown(&self, how: std::net::Shutdown) -> std::io::Result<()> {
self.inner.shutdown(how)
}
}
impl AsyncRead for TcpStream {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<io::Result<usize>> {
self.inner.as_mut().poll_read(cx, buf)
}
fn poll_read_vectored(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &mut [IoSliceMut<'_>],
) -> Poll<io::Result<usize>> {
self.inner.as_mut().poll_read_vectored(cx, bufs)
}
}
impl AsyncWrite for TcpStream {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
self.inner.as_mut().poll_write(cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.inner.as_mut().poll_flush(cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.inner.as_mut().poll_close(cx)
}
fn poll_write_vectored(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<io::Result<usize>> {
self.inner.as_mut().poll_write_vectored(cx, bufs)
}
}
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ConnectFuture {
addrs: Option<io::Result<VecDeque<SocketAddr>>>,
last_err: Option<io::Error>,
future: Option<BoxFuture<'static, io::Result<Pin<Box<dyn runtime_raw::TcpStream>>>>>,
runtime: &'static dyn runtime_raw::Runtime,
}
impl Future for ConnectFuture {
type Output = io::Result<TcpStream>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
if let Some(future) = self.future.as_mut() {
match future.as_mut().poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Ok(inner)) => return Poll::Ready(Ok(TcpStream { inner })),
Poll::Ready(Err(err)) => self.last_err = Some(err),
}
}
let addrs = match self.addrs.as_mut().expect("polled a completed future") {
Ok(addrs) => addrs,
Err(_) => {
return Poll::Ready(Err(self.addrs.take().unwrap().err().unwrap()));
}
};
let addr = match addrs.pop_front() {
Some(addr) => addr,
None => {
let err = self.last_err.take().unwrap_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidInput,
"could not resolve to any addresses",
)
});
return Poll::Ready(Err(err));
}
};
self.future = Some(self.runtime.connect_tcp_stream(&addr));
}
}
}
impl fmt::Debug for ConnectFuture {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Connect")
.field("addrs", &self.addrs)
.finish()
}
}
#[derive(Debug)]
pub struct TcpListener {
inner: Pin<Box<dyn runtime_raw::TcpListener>>,
}
impl TcpListener {
pub fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
let mut last_err = None;
for addr in addr.to_socket_addrs()? {
match runtime_raw::current_runtime().bind_tcp_listener(&addr) {
Ok(inner) => return Ok(TcpListener { inner }),
Err(e) => last_err = Some(e),
}
}
Err(last_err.unwrap_or_else(|| {
io::Error::new(
io::ErrorKind::InvalidInput,
"could not resolve to any addresses",
)
}))
}
pub fn local_addr(&self) -> io::Result<SocketAddr> {
self.inner.local_addr()
}
pub fn incoming(&mut self) -> IncomingStream<'_> {
IncomingStream { inner: self }
}
pub fn accept(&mut self) -> AcceptFuture<'_> {
let incoming = self.incoming();
AcceptFuture { inner: incoming }
}
}
#[must_use = "futures do nothing unless you `.await` or poll them"]
#[derive(Debug)]
pub struct AcceptFuture<'stream> {
inner: IncomingStream<'stream>,
}
impl<'stream> Future for AcceptFuture<'stream> {
type Output = io::Result<(TcpStream, SocketAddr)>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let stream = ready!(self.inner.poll_next_unpin(cx)).unwrap()?;
let addr = stream.peer_addr().unwrap();
Poll::Ready(Ok((stream, addr)))
}
}
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct IncomingStream<'listener> {
inner: &'listener mut TcpListener,
}
impl<'listener> Stream for IncomingStream<'listener> {
type Item = io::Result<TcpStream>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let inner = ready!(self.inner.inner.as_mut().poll_accept(cx)?);
Poll::Ready(Some(Ok(TcpStream { inner })))
}
}
#[cfg(unix)]
mod sys {
use super::{TcpListener, TcpStream};
use std::os::unix::prelude::*;
impl AsRawFd for TcpListener {
fn as_raw_fd(&self) -> RawFd {
self.inner.as_raw_fd()
}
}
impl AsRawFd for TcpStream {
fn as_raw_fd(&self) -> RawFd {
self.inner.as_raw_fd()
}
}
}