use crate::future::{assert_future, Either};
use core::pin::Pin;
use futures_core::future::Future;
use futures_core::stream::{Stream, TryStream};
use futures_core::task::{Context, Poll};
#[cfg(feature = "compat")]
use crate::compat::CompatSink;
pub use futures_sink::Sink;
mod close;
pub use self::close::Close;
mod drain;
pub use self::drain::{drain, Drain};
mod fanout;
pub use self::fanout::Fanout;
mod feed;
pub use self::feed::Feed;
mod flush;
pub use self::flush::Flush;
mod err_into;
pub use self::err_into::SinkErrInto;
mod map_err;
pub use self::map_err::SinkMapErr;
mod send;
pub use self::send::Send;
mod send_all;
pub use self::send_all::SendAll;
mod unfold;
pub use self::unfold::{unfold, Unfold};
mod with;
pub use self::with::With;
mod with_flat_map;
pub use self::with_flat_map::WithFlatMap;
#[cfg(feature = "alloc")]
mod buffer;
#[cfg(feature = "alloc")]
pub use self::buffer::Buffer;
impl<T: ?Sized, Item> SinkExt<Item> for T where T: Sink<Item> {}
pub trait SinkExt<Item>: Sink<Item> {
fn with<U, Fut, F, E>(self, f: F) -> With<Self, Item, U, Fut, F>
where
F: FnMut(U) -> Fut,
Fut: Future<Output = Result<Item, E>>,
E: From<Self::Error>,
Self: Sized,
{
assert_sink::<U, E, _>(With::new(self, f))
}
fn with_flat_map<U, St, F>(self, f: F) -> WithFlatMap<Self, Item, U, St, F>
where
F: FnMut(U) -> St,
St: Stream<Item = Result<Item, Self::Error>>,
Self: Sized,
{
assert_sink::<U, Self::Error, _>(WithFlatMap::new(self, f))
}
fn sink_map_err<E, F>(self, f: F) -> SinkMapErr<Self, F>
where
F: FnOnce(Self::Error) -> E,
Self: Sized,
{
assert_sink::<Item, E, _>(SinkMapErr::new(self, f))
}
fn sink_err_into<E>(self) -> err_into::SinkErrInto<Self, Item, E>
where
Self: Sized,
Self::Error: Into<E>,
{
assert_sink::<Item, E, _>(SinkErrInto::new(self))
}
#[cfg(feature = "alloc")]
fn buffer(self, capacity: usize) -> Buffer<Self, Item>
where
Self: Sized,
{
assert_sink::<Item, Self::Error, _>(Buffer::new(self, capacity))
}
fn close(&mut self) -> Close<'_, Self, Item>
where
Self: Unpin,
{
assert_future::<Result<(), Self::Error>, _>(Close::new(self))
}
fn fanout<Si>(self, other: Si) -> Fanout<Self, Si>
where
Self: Sized,
Item: Clone,
Si: Sink<Item, Error = Self::Error>,
{
assert_sink::<Item, Self::Error, _>(Fanout::new(self, other))
}
fn flush(&mut self) -> Flush<'_, Self, Item>
where
Self: Unpin,
{
assert_future::<Result<(), Self::Error>, _>(Flush::new(self))
}
fn send(&mut self, item: Item) -> Send<'_, Self, Item>
where
Self: Unpin,
{
assert_future::<Result<(), Self::Error>, _>(Send::new(self, item))
}
fn feed(&mut self, item: Item) -> Feed<'_, Self, Item>
where
Self: Unpin,
{
assert_future::<Result<(), Self::Error>, _>(Feed::new(self, item))
}
fn send_all<'a, St>(&'a mut self, stream: &'a mut St) -> SendAll<'a, Self, St>
where
St: TryStream<Ok = Item, Error = Self::Error> + Stream + Unpin + ?Sized,
Self: Unpin,
{
SendAll::new(self, stream)
}
fn left_sink<Si2>(self) -> Either<Self, Si2>
where
Si2: Sink<Item, Error = Self::Error>,
Self: Sized,
{
assert_sink::<Item, Self::Error, _>(Either::Left(self))
}
fn right_sink<Si1>(self) -> Either<Si1, Self>
where
Si1: Sink<Item, Error = Self::Error>,
Self: Sized,
{
assert_sink::<Item, Self::Error, _>(Either::Right(self))
}
#[cfg(feature = "compat")]
#[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
fn compat(self) -> CompatSink<Self, Item>
where
Self: Sized + Unpin,
{
CompatSink::new(self)
}
fn poll_ready_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
where
Self: Unpin,
{
Pin::new(self).poll_ready(cx)
}
fn start_send_unpin(&mut self, item: Item) -> Result<(), Self::Error>
where
Self: Unpin,
{
Pin::new(self).start_send(item)
}
fn poll_flush_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
where
Self: Unpin,
{
Pin::new(self).poll_flush(cx)
}
fn poll_close_unpin(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
where
Self: Unpin,
{
Pin::new(self).poll_close(cx)
}
}
pub(crate) fn assert_sink<T, E, S>(sink: S) -> S
where
S: Sink<T, Error = E>,
{
sink
}