use std::{
pin::Pin,
task::{Context, Poll},
};
use futures_core::Stream;
use pin_project::pin_project;
pub trait StreamExt: Stream {
fn merge<U>(self, other: U) -> Merge<Self, U>
where
Self: Sized,
U: Stream<Item = Self::Item> + Sized,
{
Merge::new(self, other)
}
}
impl<S> StreamExt for S where S: Stream {}
#[pin_project]
#[derive(Debug)]
pub struct Merge<L, R> {
#[pin]
left: Fuse<L>,
#[pin]
right: Fuse<R>,
}
impl<L: Stream, R: Stream> Merge<L, R> {
pub(crate) fn new(left: L, right: R) -> Self {
Self {
left: Fuse::new(left),
right: Fuse::new(right),
}
}
}
impl<L, R, T> Stream for Merge<L, R>
where
L: Stream<Item = T>,
R: Stream<Item = T>,
{
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
if random(2) == 0 {
poll_next_in_order(this.left, this.right, cx)
} else {
poll_next_in_order(this.right, this.left, cx)
}
}
}
fn poll_next_in_order<F, S, I>(
first: Pin<&mut F>,
second: Pin<&mut S>,
cx: &mut Context<'_>,
) -> Poll<Option<I>>
where
F: Stream<Item = I>,
S: Stream<Item = I>,
{
match first.poll_next(cx) {
Poll::Ready(None) => second.poll_next(cx),
Poll::Ready(item) => Poll::Ready(item),
Poll::Pending => match second.poll_next(cx) {
Poll::Ready(None) | Poll::Pending => Poll::Pending,
Poll::Ready(item) => Poll::Ready(item),
},
}
}
pub(crate) fn random(n: u32) -> u32 {
use std::cell::Cell;
use std::num::Wrapping;
thread_local! {
static RNG: Cell<Wrapping<u32>> = {
let mut x = 0i32;
let r = &mut x;
let addr = r as *mut i32 as usize;
Cell::new(Wrapping(addr as u32))
}
}
RNG.with(|rng| {
let mut x = rng.get();
x ^= x << 13;
x ^= x >> 17;
x ^= x << 5;
rng.set(x);
((u64::from(x.0)).wrapping_mul(u64::from(n)) >> 32) as u32
})
}
#[pin_project]
#[derive(Clone, Debug)]
pub struct Fuse<S> {
#[pin]
pub(crate) stream: S,
pub(crate) done: bool,
}
impl<S> Fuse<S> {
pub(super) fn new(stream: S) -> Self {
Self {
stream,
done: false,
}
}
}
impl<S: Stream> Stream for Fuse<S> {
type Item = S::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
let this = self.project();
if *this.done {
Poll::Ready(None)
} else {
let next = futures_core::ready!(this.stream.poll_next(cx));
if next.is_none() {
*this.done = true;
}
Poll::Ready(next)
}
}
}