use std::marker::PhantomData;
use futures::stream::Stream;
use wasm_bindgen::prelude::*;
use wasm_bindgen::{throw_val, JsCast};
use wasm_bindgen_futures::JsFuture;
pub use into_stream::IntoStream;
use into_underlying_source::IntoUnderlyingSource;
pub use pipe_options::PipeOptions;
use crate::queuing_strategy::QueuingStrategy;
use crate::util::promise_to_void_future;
use crate::writable::WritableStream;
mod into_stream;
mod into_underlying_source;
mod pipe_options;
pub mod sys;
/// A Rust-side handle around a raw [`sys::ReadableStream`].
///
/// Instances are created either from an existing raw stream (`from_raw`) or
/// from a Rust `Stream` of `Result<JsValue, JsValue>` items (`from_stream`).
/// The handle owns the raw stream object and can give it back via `into_raw`.
#[derive(Debug)]
pub struct ReadableStream {
/// The wrapped raw stream object that every method delegates to.
raw: sys::ReadableStream,
}
impl ReadableStream {
#[inline]
pub fn from_raw(raw: sys::ReadableStream) -> Self {
Self { raw }
}
pub fn from_stream<St>(stream: St) -> Self
where
St: Stream<Item = Result<JsValue, JsValue>> + 'static,
{
let source = IntoUnderlyingSource::new(Box::new(stream));
let strategy = QueuingStrategy::new(0.0);
let raw = sys::ReadableStream::new_with_source(source, strategy);
Self { raw }
}
#[inline]
pub fn as_raw(&self) -> &sys::ReadableStream {
&self.raw
}
#[inline]
pub fn into_raw(self) -> sys::ReadableStream {
self.raw
}
#[inline]
pub fn is_locked(&self) -> bool {
self.as_raw().is_locked()
}
pub async fn cancel(&mut self) -> Result<(), JsValue> {
promise_to_void_future(self.as_raw().cancel()).await
}
pub async fn cancel_with_reason(&mut self, reason: &JsValue) -> Result<(), JsValue> {
promise_to_void_future(self.as_raw().cancel_with_reason(reason)).await
}
#[inline]
pub fn get_reader(&mut self) -> ReadableStreamDefaultReader {
self.try_get_reader()
.expect_throw("already locked to a reader")
}
pub fn try_get_reader(&mut self) -> Result<ReadableStreamDefaultReader, js_sys::Error> {
Ok(ReadableStreamDefaultReader {
raw: self.as_raw().get_reader()?,
_stream: PhantomData,
})
}
pub async fn pipe_to<'a>(&'a mut self, dest: &'a mut WritableStream) -> Result<(), JsValue> {
self.pipe_to_with_options(dest, &PipeOptions::default())
.await
}
pub async fn pipe_to_with_options<'a>(
&'a mut self,
dest: &'a mut WritableStream,
options: &PipeOptions,
) -> Result<(), JsValue> {
let promise = self
.as_raw()
.pipe_to(dest.as_raw(), options.clone().into_raw());
promise_to_void_future(promise).await
}
pub fn tee(self) -> (ReadableStream, ReadableStream) {
self.try_tee().expect_throw("already locked to a reader")
}
pub fn try_tee(self) -> Result<(ReadableStream, ReadableStream), (js_sys::Error, Self)> {
let branches = match self.as_raw().tee() {
Ok(branches) => branches,
Err(err) => return Err((err, self)),
};
debug_assert_eq!(branches.length(), 2);
let (left, right) = (branches.get(0), branches.get(1));
Ok((
Self::from_raw(left.unchecked_into()),
Self::from_raw(right.unchecked_into()),
))
}
#[inline]
pub fn into_stream(self) -> IntoStream<'static> {
self.try_into_stream()
.expect_throw("already locked to a reader")
}
pub fn try_into_stream(self) -> Result<IntoStream<'static>, (js_sys::Error, Self)> {
let raw_reader = match self.as_raw().get_reader() {
Ok(raw_reader) => raw_reader,
Err(err) => return Err((err, self)),
};
let reader = ReadableStreamDefaultReader {
raw: raw_reader,
_stream: PhantomData,
};
Ok(reader.into_stream())
}
}
impl<St> From<St> for ReadableStream
where
St: Stream<Item = Result<JsValue, JsValue>> + 'static,
{
#[inline]
fn from(stream: St) -> Self {
Self::from_stream(stream)
}
}
/// A Rust-side handle around a raw [`sys::ReadableStreamDefaultReader`].
///
/// The `'stream` lifetime links the reader to the [`ReadableStream`] it was
/// obtained from. Readers handed out by `ReadableStream::get_reader` borrow
/// the stream mutably for as long as they live.
#[derive(Debug)]
pub struct ReadableStreamDefaultReader<'stream> {
/// The wrapped raw reader object that every method delegates to.
raw: sys::ReadableStreamDefaultReader,
/// Zero-sized marker carrying the `&'stream mut ReadableStream` borrow;
/// no actual reference is stored.
_stream: PhantomData<&'stream mut ReadableStream>,
}
impl<'stream> ReadableStreamDefaultReader<'stream> {
#[inline]
pub fn as_raw(&self) -> &sys::ReadableStreamDefaultReader {
&self.raw
}
pub async fn closed(&self) -> Result<(), JsValue> {
promise_to_void_future(self.as_raw().closed()).await
}
pub async fn cancel(&mut self) -> Result<(), JsValue> {
promise_to_void_future(self.as_raw().cancel()).await
}
pub async fn cancel_with_reason(&mut self, reason: &JsValue) -> Result<(), JsValue> {
promise_to_void_future(self.as_raw().cancel_with_reason(reason)).await
}
pub async fn read(&mut self) -> Result<Option<JsValue>, JsValue> {
let promise = self.as_raw().read();
let js_value = JsFuture::from(promise).await?;
let result = sys::ReadableStreamReadResult::from(js_value);
if result.is_done() {
Ok(None)
} else {
Ok(Some(result.value()))
}
}
#[inline]
pub fn release_lock(mut self) {
self.release_lock_mut()
}
fn release_lock_mut(&mut self) {
self.as_raw()
.release_lock()
.unwrap_or_else(|error| throw_val(error.into()))
}
#[inline]
pub fn try_release_lock(self) -> Result<(), (js_sys::Error, Self)> {
self.as_raw().release_lock().map_err(|error| (error, self))
}
#[inline]
pub fn into_stream(self) -> IntoStream<'stream> {
IntoStream::new(self)
}
}
impl Drop for ReadableStreamDefaultReader<'_> {
fn drop(&mut self) {
self.release_lock_mut();
}
}