use super::{Forked, StreamExt as _, Weak};
use futures::{
stream::{self, FusedStream},
Stream, StreamExt as _,
};
use insta::assert_debug_snapshot;
use std::task::Poll;
fn create_test_stream() -> impl Stream<Item = i32> {
let mut current_poll = -1;
stream::poll_fn(move |ctx| {
current_poll += 1;
if current_poll == 8 {
Poll::Ready(None)
} else if current_poll % 2 == 0 {
Poll::Ready(Some(current_poll / 2))
} else {
let waker = ctx.waker().clone();
tokio::task::spawn(async move { waker.wake() });
Poll::Pending
}
})
}
#[tokio::test]
async fn single() {
let fork = create_test_stream().fork();
let data = fork.collect::<Vec<_>>().await;
assert_eq!(data, [0, 1, 2, 3]);
}
#[tokio::test]
async fn multiple_concurrently() {
let fork1 = create_test_stream().fork();
let fork2 = fork1.clone();
let concurrent = stream::select(fork1, fork2);
let data = concurrent.collect::<Vec<_>>().await;
assert_eq!(data, [0, 0, 1, 1, 2, 2, 3, 3]);
}
#[tokio::test]
async fn multiple_consequtively() {
let fork1 = create_test_stream().fork();
let fork2 = fork1.clone();
let data = fork1.collect::<Vec<_>>().await;
assert_eq!(data, [0, 1, 2, 3]);
let data = fork2.collect::<Vec<_>>().await;
assert_eq!(data, [0, 1, 2, 3]);
}
#[tokio::test]
async fn fall_behind() {
let mut fork1 = create_test_stream().fork();
let mut fork2 = fork1.clone();
assert_eq!(fork1.next().await.unwrap(), 0);
assert_eq!(fork1.next().await.unwrap(), 1);
assert_debug_snapshot!(fork1.inner.lock().unwrap().buffer.buf, @r###"
[
BufferEntry {
item: 0,
refs: 1,
},
BufferEntry {
item: 1,
refs: 1,
},
]
"###);
assert_eq!(fork2.next().await.unwrap(), 0);
assert_debug_snapshot!(fork1.inner.lock().unwrap().buffer.buf, @r###"
[
BufferEntry {
item: 1,
refs: 1,
},
]
"###);
assert_eq!(fork2.next().await.unwrap(), 1);
assert_debug_snapshot!(fork1.inner.lock().unwrap().buffer.buf, @"[]");
}
#[tokio::test]
async fn fork_outpaces_source() {
let mut fork1 = create_test_stream().fork();
let mut fork2 = fork1.clone();
assert_eq!(fork1.next().await.unwrap(), 0);
assert_eq!(fork1.next().await.unwrap(), 1);
assert_debug_snapshot!(fork1.inner.lock().unwrap().buffer.buf, @r###"
[
BufferEntry {
item: 0,
refs: 1,
},
BufferEntry {
item: 1,
refs: 1,
},
]
"###);
assert_eq!(fork2.next().await.unwrap(), 0);
assert_eq!(fork2.next().await.unwrap(), 1);
assert_debug_snapshot!(fork1.inner.lock().unwrap().buffer.buf, @"[]");
assert_eq!(fork2.next().await.unwrap(), 2);
assert_eq!(fork2.next().await.unwrap(), 3);
assert_debug_snapshot!(fork1.inner.lock().unwrap().buffer.buf, @r###"
[
BufferEntry {
item: 2,
refs: 1,
},
BufferEntry {
item: 3,
refs: 1,
},
]
"###);
}
#[tokio::test]
async fn fork_created_late() {
let mut fork1 = create_test_stream().fork();
let mut fork2 = fork1.clone();
assert_eq!(fork1.next().await.unwrap(), 0);
assert_eq!(fork1.next().await.unwrap(), 1);
assert_debug_snapshot!(fork1.inner.lock().unwrap().buffer.buf, @r###"
[
BufferEntry {
item: 0,
refs: 1,
},
BufferEntry {
item: 1,
refs: 1,
},
]
"###);
let mut fork3 = fork2.clone();
assert_eq!(fork2.next().await.unwrap(), 0);
assert_eq!(fork2.next().await.unwrap(), 1);
assert_debug_snapshot!(fork1.inner.lock().unwrap().buffer.buf, @r###"
[
BufferEntry {
item: 0,
refs: 1,
},
BufferEntry {
item: 1,
refs: 1,
},
]
"###);
assert_eq!(fork3.next().await.unwrap(), 0);
assert_eq!(fork3.next().await.unwrap(), 1);
assert_debug_snapshot!(fork1.inner.lock().unwrap().buffer.buf, @"[]");
}
#[tokio::test]
async fn fork_dropped_late() {
let mut fork1 = create_test_stream().fork();
let fork2 = fork1.clone();
assert_eq!(fork1.next().await.unwrap(), 0);
assert_eq!(fork1.next().await.unwrap(), 1);
assert_debug_snapshot!(fork1.inner.lock().unwrap().buffer.buf, @r###"
[
BufferEntry {
item: 0,
refs: 1,
},
BufferEntry {
item: 1,
refs: 1,
},
]
"###);
drop(fork2);
assert_debug_snapshot!(fork1.inner.lock().unwrap().buffer.buf, @"[]");
}
#[tokio::test]
async fn weak() {
let mut fork1 = create_test_stream().fork();
let fork2 = fork1.clone();
let weak = fork2.downgrade();
assert_eq!(fork1.next().await.unwrap(), 0);
assert_eq!(fork1.next().await.unwrap(), 1);
assert_debug_snapshot!(fork1.inner.lock().unwrap().buffer.buf, @r###"
[
BufferEntry {
item: 0,
refs: 1,
},
BufferEntry {
item: 1,
refs: 1,
},
]
"###);
let mut fork3 = weak.upgrade().unwrap();
assert_eq!(fork3.next().await.unwrap(), 2);
assert_eq!(fork3.next().await.unwrap(), 3);
assert_debug_snapshot!(fork1.inner.lock().unwrap().buffer.buf, @r###"
[
BufferEntry {
item: 0,
refs: 1,
},
BufferEntry {
item: 1,
refs: 1,
},
BufferEntry {
item: 2,
refs: 2,
},
BufferEntry {
item: 3,
refs: 2,
},
]
"###);
}
#[tokio::test]
async fn weak_does_not_affect_garbage_collection() {
let mut fork1 = create_test_stream().fork();
let weak = fork1.downgrade();
assert_eq!(fork1.next().await.unwrap(), 0);
assert_eq!(fork1.next().await.unwrap(), 1);
assert_debug_snapshot!(fork1.inner.lock().unwrap().buffer.buf, @"[]");
drop(weak);
}
#[tokio::test]
async fn weak_falls_behind() {
let mut fork1 = create_test_stream().fork();
let weak = fork1.downgrade();
assert_eq!(fork1.next().await.unwrap(), 0);
assert_eq!(fork1.next().await.unwrap(), 1);
assert_debug_snapshot!(fork1.inner.lock().unwrap().buffer.buf, @"[]");
let fork2 = weak.upgrade().unwrap();
assert_eq!(fork1.next().await.unwrap(), 2);
assert_eq!(fork1.next().await.unwrap(), 3);
assert_debug_snapshot!(fork1.inner.lock().unwrap().buffer.buf, @r###"
[
BufferEntry {
item: 2,
refs: 1,
},
BufferEntry {
item: 3,
refs: 1,
},
]
"###);
drop(fork2);
}
#[test]
fn weak_none() {
let fork1 = create_test_stream().fork();
let weak = fork1.downgrade();
drop(fork1);
assert!(weak.upgrade().is_none());
}
#[tokio::test]
async fn fused_has_data_from_source() {
let mut fork1 = create_test_stream().fork();
assert_eq!(fork1.next().await.unwrap(), 0);
assert_eq!(fork1.next().await.unwrap(), 1);
assert_eq!(fork1.next().await.unwrap(), 2);
assert_eq!(fork1.next().await.unwrap(), 3);
assert!(!fork1.is_terminated());
}
#[tokio::test]
async fn fused_has_data_from_buffer() {
let mut fork1 = create_test_stream().fork();
let fork2 = fork1.clone();
assert_eq!(fork1.next().await.unwrap(), 0);
assert_eq!(fork1.next().await.unwrap(), 1);
assert_eq!(fork1.next().await.unwrap(), 2);
assert_eq!(fork1.next().await.unwrap(), 3);
assert_eq!(fork1.next().await, None);
assert!(!fork2.is_terminated());
}
#[tokio::test]
async fn fused_has_no_data() {
let mut fork1 = create_test_stream().fork();
assert_eq!(fork1.next().await.unwrap(), 0);
assert_eq!(fork1.next().await.unwrap(), 1);
assert_eq!(fork1.next().await.unwrap(), 2);
assert_eq!(fork1.next().await.unwrap(), 3);
assert_eq!(fork1.next().await, None);
assert!(fork1.is_terminated());
}
#[test]
fn size_hint_from_source() {
let fork1 = stream::iter(0..4).fork();
assert_debug_snapshot!(fork1.size_hint(), @r###"
(
4,
Some(
4,
),
)
"###);
}
#[tokio::test]
async fn test() {
let mut fork1 = stream::iter(0..4).fork();
let fork2 = fork1.clone();
assert_eq!(fork1.next().await.unwrap(), 0);
assert_eq!(fork1.next().await.unwrap(), 1);
assert_debug_snapshot!(fork1.size_hint(), @r###"
(
2,
Some(
2,
),
)
"###);
assert_debug_snapshot!(fork2.size_hint(), @r###"
(
4,
Some(
4,
),
)
"###)
}