[go: up one dir, main page]

actix-http 3.0.0-rc.1

HTTP primitives for the Actix ecosystem
Documentation
use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};

use actix_codec::{AsyncRead, AsyncWrite, Framed};
use pin_project_lite::pin_project;

use crate::{
    body::{BodySize, MessageBody},
    h1::{Codec, Message},
    Error, Response,
};

pin_project! {
    /// Send HTTP/1 response
    pub struct SendResponse<T, B> {
        res: Option<Message<(Response<()>, BodySize)>>,

        #[pin]
        body: Option<B>,

        #[pin]
        framed: Option<Framed<T, Codec>>,
    }
}

impl<T, B> SendResponse<T, B>
where
    B: MessageBody,
    B::Error: Into<Error>,
{
    pub fn new(framed: Framed<T, Codec>, response: Response<B>) -> Self {
        let (res, body) = response.into_parts();

        SendResponse {
            res: Some((res, body.size()).into()),
            body: Some(body),
            framed: Some(framed),
        }
    }
}

impl<T, B> Future for SendResponse<T, B>
where
    T: AsyncRead + AsyncWrite + Unpin,
    B: MessageBody,
    B::Error: Into<Error>,
{
    type Output = Result<Framed<T, Codec>, Error>;

    // TODO: rethink if we need loops in polls
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.as_mut().project();

        let mut body_done = this.body.is_none();
        loop {
            let mut body_ready = !body_done;

            // send body
            if this.res.is_none() && body_ready {
                while body_ready
                    && !body_done
                    && !this
                        .framed
                        .as_ref()
                        .as_pin_ref()
                        .unwrap()
                        .is_write_buf_full()
                {
                    let next = match this.body.as_mut().as_pin_mut().unwrap().poll_next(cx) {
                        Poll::Ready(Some(Ok(item))) => Poll::Ready(Some(item)),
                        Poll::Ready(Some(Err(err))) => return Poll::Ready(Err(err.into())),
                        Poll::Ready(None) => Poll::Ready(None),
                        Poll::Pending => Poll::Pending,
                    };

                    match next {
                        Poll::Ready(item) => {
                            // body is done when item is None
                            body_done = item.is_none();
                            if body_done {
                                this.body.set(None);
                            }
                            let framed = this.framed.as_mut().as_pin_mut().unwrap();
                            framed
                                .write(Message::Chunk(item))
                                .map_err(|err| Error::new_send_response().with_cause(err))?;
                        }
                        Poll::Pending => body_ready = false,
                    }
                }
            }

            let framed = this.framed.as_mut().as_pin_mut().unwrap();

            // flush write buffer
            if !framed.is_write_buf_empty() {
                match framed
                    .flush(cx)
                    .map_err(|err| Error::new_send_response().with_cause(err))?
                {
                    Poll::Ready(_) => {
                        if body_ready {
                            continue;
                        } else {
                            return Poll::Pending;
                        }
                    }
                    Poll::Pending => return Poll::Pending,
                }
            }

            // send response
            if let Some(res) = this.res.take() {
                framed
                    .write(res)
                    .map_err(|err| Error::new_send_response().with_cause(err))?;
                continue;
            }

            if !body_done {
                if body_ready {
                    continue;
                } else {
                    return Poll::Pending;
                }
            } else {
                break;
            }
        }

        let framed = this.framed.take().unwrap();

        Poll::Ready(Ok(framed))
    }
}