use futures_util::io::AsyncRead;
use futures_util::Stream;
use wasm_bindgen::prelude::*;
use wasm_bindgen::JsCast;
pub use byob_reader::ReadableStreamBYOBReader;
pub use default_reader::ReadableStreamDefaultReader;
pub use into_async_read::IntoAsyncRead;
pub use into_stream::IntoStream;
use into_underlying_source::IntoUnderlyingSource;
pub use pipe_options::PipeOptions;
use crate::queuing_strategy::QueuingStrategy;
use crate::readable::into_underlying_byte_source::IntoUnderlyingByteSource;
use crate::util::promise_to_void_future;
use crate::writable::WritableStream;
mod byob_reader;
mod default_reader;
mod into_async_read;
mod into_stream;
mod into_underlying_byte_source;
mod into_underlying_source;
mod pipe_options;
pub mod sys;
/// Owning wrapper around a raw [`sys::ReadableStream`] handle.
///
/// All methods on this type delegate to the wrapped raw stream; use
/// `as_raw` / `into_raw` to reach the underlying handle directly.
#[derive(Debug)]
pub struct ReadableStream {
// The wrapped raw stream from the `sys` submodule.
raw: sys::ReadableStream,
}
impl ReadableStream {
#[inline]
pub fn from_raw(raw: sys::ReadableStream) -> Self {
Self { raw }
}
pub fn from_stream<St>(stream: St) -> Self
where
St: Stream<Item = Result<JsValue, JsValue>> + 'static,
{
let source = IntoUnderlyingSource::new(Box::new(stream));
let strategy = QueuingStrategy::new(0.0);
let raw = sys::ReadableStream::new_with_source(source, strategy);
Self { raw }
}
pub fn from_async_read<R>(async_read: R, default_buffer_len: usize) -> Self
where
R: AsyncRead + 'static,
{
let source = IntoUnderlyingByteSource::new(Box::new(async_read), default_buffer_len);
let raw = sys::ReadableStream::new_with_byte_source(source)
.expect_throw("readable byte streams not supported");
Self { raw }
}
#[inline]
pub fn as_raw(&self) -> &sys::ReadableStream {
&self.raw
}
#[inline]
pub fn into_raw(self) -> sys::ReadableStream {
self.raw
}
#[inline]
pub fn is_locked(&self) -> bool {
self.as_raw().is_locked()
}
pub async fn cancel(&mut self) -> Result<(), JsValue> {
promise_to_void_future(self.as_raw().cancel()).await
}
pub async fn cancel_with_reason(&mut self, reason: &JsValue) -> Result<(), JsValue> {
promise_to_void_future(self.as_raw().cancel_with_reason(reason)).await
}
#[inline]
pub fn get_reader(&mut self) -> ReadableStreamDefaultReader {
self.try_get_reader()
.expect_throw("already locked to a reader")
}
pub fn try_get_reader(&mut self) -> Result<ReadableStreamDefaultReader, js_sys::Error> {
ReadableStreamDefaultReader::new(self)
}
pub fn get_byob_reader(&mut self) -> ReadableStreamBYOBReader {
self.try_get_byob_reader()
.expect_throw("already locked to a reader, or not a readable byte stream")
}
pub fn try_get_byob_reader(&mut self) -> Result<ReadableStreamBYOBReader, js_sys::Error> {
ReadableStreamBYOBReader::new(self)
}
pub async fn pipe_to<'a>(&'a mut self, dest: &'a mut WritableStream) -> Result<(), JsValue> {
self.pipe_to_with_options(dest, &PipeOptions::default())
.await
}
pub async fn pipe_to_with_options<'a>(
&'a mut self,
dest: &'a mut WritableStream,
options: &PipeOptions,
) -> Result<(), JsValue> {
let promise = self
.as_raw()
.pipe_to(dest.as_raw(), options.clone().into_raw());
promise_to_void_future(promise).await
}
pub fn tee(self) -> (ReadableStream, ReadableStream) {
self.try_tee().expect_throw("already locked to a reader")
}
pub fn try_tee(self) -> Result<(ReadableStream, ReadableStream), (js_sys::Error, Self)> {
let branches = self.as_raw().tee().map_err(|err| (err, self))?;
debug_assert_eq!(branches.length(), 2);
let (left, right) = (branches.get(0), branches.get(1));
Ok((
Self::from_raw(left.unchecked_into()),
Self::from_raw(right.unchecked_into()),
))
}
#[inline]
pub fn into_stream(self) -> IntoStream<'static> {
self.try_into_stream()
.expect_throw("already locked to a reader")
}
pub fn try_into_stream(mut self) -> Result<IntoStream<'static>, (js_sys::Error, Self)> {
let reader = ReadableStreamDefaultReader::new(&mut self).map_err(|err| (err, self))?;
Ok(IntoStream::new(reader, true))
}
#[inline]
pub fn into_async_read(self) -> IntoAsyncRead<'static> {
self.try_into_async_read()
.expect_throw("already locked to a reader, or not a readable byte stream")
}
pub fn try_into_async_read(mut self) -> Result<IntoAsyncRead<'static>, (js_sys::Error, Self)> {
let reader = ReadableStreamBYOBReader::new(&mut self).map_err(|err| (err, self))?;
Ok(IntoAsyncRead::new(reader, true))
}
}
impl<St> From<St> for ReadableStream
where
St: Stream<Item = Result<JsValue, JsValue>> + 'static,
{
#[inline]
fn from(stream: St) -> Self {
Self::from_stream(stream)
}
}