use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
use std::mem;
use std::pin::Pin;
use std::task::{Context, Poll};
#[doc(no_inline)]
pub use futures_core::stream::Stream;
use pin_project_lite::pin_project;
use crate::ready;
pub fn empty<T>() -> Empty<T> {
Empty {
_marker: PhantomData,
}
}
#[derive(Debug)]
pub struct Empty<T> {
_marker: PhantomData<T>,
}
impl<T> Unpin for Empty<T> {}
impl<T> Stream for Empty<T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(None)
}
}
pub fn iter<I: IntoIterator>(iter: I) -> Iter<I::IntoIter> {
Iter {
iter: iter.into_iter(),
}
}
#[derive(Debug)]
pub struct Iter<I> {
iter: I,
}
impl<I> Unpin for Iter<I> {}
impl<I: Iterator> Stream for Iter<I> {
type Item = I::Item;
fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(self.iter.next())
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.iter.size_hint()
}
}
pub fn once<T>(t: T) -> Once<T> {
Once { value: Some(t) }
}
pin_project! {
#[derive(Debug)]
pub struct Once<T> {
value: Option<T>,
}
}
impl<T> Stream for Once<T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
Poll::Ready(self.project().value.take())
}
fn size_hint(&self) -> (usize, Option<usize>) {
if self.value.is_some() {
(1, Some(1))
} else {
(0, Some(0))
}
}
}
pub fn pending<T>() -> Pending<T> {
Pending {
_marker: PhantomData,
}
}
#[derive(Debug)]
pub struct Pending<T> {
_marker: PhantomData<T>,
}
impl<T> Unpin for Pending<T> {}
impl<T> Stream for Pending<T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
Poll::Pending
}
}
pub fn poll_fn<T, F>(f: F) -> PollFn<F>
where
F: FnMut(&mut Context<'_>) -> Poll<Option<T>>,
{
PollFn { f }
}
pub struct PollFn<F> {
f: F,
}
impl<F> Unpin for PollFn<F> {}
impl<F> fmt::Debug for PollFn<F> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PollFn").finish()
}
}
impl<T, F> Stream for PollFn<F>
where
F: FnMut(&mut Context<'_>) -> Poll<Option<T>>,
{
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
(&mut self.f)(cx)
}
}
pub fn repeat<T: Clone>(item: T) -> Repeat<T> {
Repeat { item }
}
#[derive(Debug)]
pub struct Repeat<T> {
item: T,
}
impl<T> Unpin for Repeat<T> {}
impl<T: Clone> Stream for Repeat<T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(Some(self.item.clone()))
}
}
pub fn repeat_with<T, F>(repeater: F) -> RepeatWith<F>
where
F: FnMut() -> T,
{
RepeatWith { f: repeater }
}
#[derive(Debug)]
pub struct RepeatWith<F> {
f: F,
}
impl<F> Unpin for RepeatWith<F> {}
impl<T, F> Stream for RepeatWith<F>
where
F: FnMut() -> T,
{
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let item = (&mut self.f)();
Poll::Ready(Some(item))
}
}
pub fn unfold<T, F, Fut, Item>(seed: T, f: F) -> Unfold<T, F, Fut>
where
F: FnMut(T) -> Fut,
Fut: Future<Output = Option<(Item, T)>>,
{
Unfold {
f,
state: Some(seed),
fut: None,
}
}
pin_project! {
pub struct Unfold<T, F, Fut> {
f: F,
state: Option<T>,
#[pin]
fut: Option<Fut>,
}
}
impl<T, F, Fut> fmt::Debug for Unfold<T, F, Fut>
where
T: fmt::Debug,
Fut: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Unfold")
.field("state", &self.state)
.field("fut", &self.fut)
.finish()
}
}
impl<T, F, Fut, Item> Stream for Unfold<T, F, Fut>
where
F: FnMut(T) -> Fut,
Fut: Future<Output = Option<(Item, T)>>,
{
type Item = Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
if let Some(state) = this.state.take() {
this.fut.set(Some((this.f)(state)));
}
let step = ready!(this
.fut
.as_mut()
.as_pin_mut()
.expect("`Unfold` must not be polled after it returned `Poll::Ready(None)`")
.poll(cx));
this.fut.set(None);
if let Some((item, next_state)) = step {
*this.state = Some(next_state);
Poll::Ready(Some(item))
} else {
Poll::Ready(None)
}
}
}
pub fn try_unfold<T, E, F, Fut, Item>(init: T, f: F) -> TryUnfold<T, F, Fut>
where
F: FnMut(T) -> Fut,
Fut: Future<Output = Result<Option<(Item, T)>, E>>,
{
TryUnfold {
f,
state: Some(init),
fut: None,
}
}
pin_project! {
pub struct TryUnfold<T, F, Fut> {
f: F,
state: Option<T>,
#[pin]
fut: Option<Fut>,
}
}
impl<T, F, Fut> fmt::Debug for TryUnfold<T, F, Fut>
where
T: fmt::Debug,
Fut: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("TryUnfold")
.field("state", &self.state)
.field("fut", &self.fut)
.finish()
}
}
impl<T, E, F, Fut, Item> Stream for TryUnfold<T, F, Fut>
where
F: FnMut(T) -> Fut,
Fut: Future<Output = Result<Option<(Item, T)>, E>>,
{
type Item = Result<Item, E>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
if let Some(state) = this.state.take() {
this.fut.set(Some((this.f)(state)));
}
match this.fut.as_mut().as_pin_mut() {
None => {
Poll::Ready(None)
}
Some(future) => {
let step = ready!(future.poll(cx));
this.fut.set(None);
match step {
Ok(Some((item, next_state))) => {
*this.state = Some(next_state);
Poll::Ready(Some(Ok(item)))
}
Ok(None) => Poll::Ready(None),
Err(e) => Poll::Ready(Some(Err(e))),
}
}
}
}
}
pub trait StreamExt: Stream {
fn next(&mut self) -> NextFuture<'_, Self>
where
Self: Unpin,
{
NextFuture { stream: self }
}
fn collect<C: Default + Extend<Self::Item>>(self) -> CollectFuture<Self, C>
where
Self: Sized,
{
CollectFuture {
stream: self,
collection: Default::default(),
}
}
fn try_collect<T, C: Default + Extend<T>>(self) -> TryCollectFuture<Self, C>
where
Self: Sized,
Self::Item: try_hack::Result<Ok = T>,
{
TryCollectFuture {
stream: self,
items: Default::default(),
}
}
}
impl<T: ?Sized> StreamExt for T where T: Stream {}
#[derive(Debug)]
pub struct NextFuture<'a, T: Unpin + ?Sized> {
stream: &'a mut T,
}
impl<St: ?Sized + Unpin> Unpin for NextFuture<'_, St> {}
impl<T: Stream + Unpin + ?Sized> Future for NextFuture<'_, T> {
type Output = Option<T::Item>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut *self.stream).poll_next(cx)
}
}
pin_project! {
#[derive(Debug)]
pub struct CollectFuture<St, C> {
#[pin]
stream: St,
collection: C,
}
}
impl<St, C> Future for CollectFuture<St, C>
where
St: Stream,
C: Default + Extend<St::Item>,
{
type Output = C;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C> {
let mut this = self.as_mut().project();
loop {
match ready!(this.stream.as_mut().poll_next(cx)) {
Some(e) => this.collection.extend(Some(e)),
None => {
return Poll::Ready({
mem::replace(self.project().collection, Default::default())
})
}
}
}
}
}
pin_project! {
#[derive(Debug)]
pub struct TryCollectFuture<St, C> {
#[pin]
stream: St,
items: C,
}
}
impl<T, E, St, C> Future for TryCollectFuture<St, C>
where
St: Stream<Item = Result<T, E>>,
C: Default + Extend<T>,
{
type Output = Result<C, E>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
Poll::Ready(Ok(loop {
match ready!(this.stream.as_mut().poll_next(cx)?) {
Some(x) => this.items.extend(Some(x)),
None => break mem::replace(this.items, Default::default()),
}
}))
}
}
mod try_hack {
pub trait Result {
type Ok;
type Err;
}
impl<T, E> Result for std::result::Result<T, E> {
type Ok = T;
type Err = E;
}
}