use super::Result;
use bytes::Bytes;
use futures::{stream::StreamExt, Stream};
#[cfg(any(feature = "aws", feature = "gcp", feature = "azure"))]
/// Converts an optional path prefix into a `String` terminated by
/// `crate::path::DELIMITER`.
///
/// Returns `None` when `prefix` is `None` or when its string form is empty, so
/// callers never receive a bare delimiter.
pub fn format_prefix(prefix: Option<&crate::path::Path>) -> Option<String> {
    let raw = prefix?.as_ref();
    if raw.is_empty() {
        None
    } else {
        Some(format!("{}{}", raw, crate::path::DELIMITER))
    }
}
#[cfg(any(feature = "aws", feature = "gcp"))]
/// Renders `range` as the value of an HTTP `Range` request header, e.g. `bytes=0-9`
/// for `0..10`.
///
/// HTTP byte ranges include their last byte while Rust's `Range` excludes `end`, so
/// the upper bound written is `end - 1`, saturating at zero.
///
/// NOTE(review): an empty range such as `0..0` therefore renders as `bytes=0-0`,
/// which requests one byte; callers are assumed never to pass empty ranges.
pub fn format_http_range(range: std::ops::Range<usize>) -> String {
    let std::ops::Range { start, end } = range;
    let last_inclusive = end.saturating_sub(1);
    format!("bytes={}-{}", start, last_inclusive)
}
pub async fn collect_bytes<S>(mut stream: S, size_hint: Option<usize>) -> Result<Bytes>
where
S: Stream<Item = Result<Bytes>> + Send + Unpin,
{
let first = stream.next().await.transpose()?.unwrap_or_default();
match stream.next().await.transpose()? {
None => Ok(first),
Some(second) => {
let size_hint = size_hint.unwrap_or_else(|| first.len() + second.len());
let mut buf = Vec::with_capacity(size_hint);
buf.extend_from_slice(&first);
buf.extend_from_slice(&second);
while let Some(maybe_bytes) = stream.next().await {
buf.extend_from_slice(&maybe_bytes?);
}
Ok(buf.into())
}
}
}
/// Runs the blocking closure `f`, off the async executor when possible.
///
/// When called from within a tokio runtime, `f` is handed to that runtime's
/// `spawn_blocking` and awaited. Outside a runtime it is simply invoked on the
/// current thread.
///
/// # Errors
///
/// Returns whatever error `f` returns. A failure to join the spawned task is
/// propagated with `?`. NOTE(review): this relies on a conversion from tokio's
/// `JoinError` into the crate error type defined elsewhere.
pub async fn maybe_spawn_blocking<F, T>(f: F) -> Result<T>
where
    F: FnOnce() -> Result<T> + Send + 'static,
    T: Send + 'static,
{
    if let Ok(handle) = tokio::runtime::Handle::try_current() {
        let joined = handle.spawn_blocking(f).await;
        return joined?;
    }
    f()
}
/// A default coalescing threshold of 1 MiB (`1 << 20` bytes).
///
/// NOTE(review): presumably the default `coalesce` argument for
/// [`coalesce_ranges`]; no caller is visible in this file.
pub const OBJECT_STORE_COALESCE_DEFAULT: usize = 1 << 20;
/// Fetches each of `ranges`, merging nearby requests into fewer calls to `fetch`.
///
/// Ranges are visited in the order given and fetched one run at a time. A run of
/// consecutive entries is served by a single `fetch` call when each next range starts
/// at or after the furthest end seen so far in the run, and at most `coalesce` bytes
/// past it. A range that overlaps, or that precedes the run, starts a new run.
///
/// Returns one [`Bytes`] per input range, in input order. Each is sliced out of the
/// buffer fetched for its run, so no data is copied.
///
/// # Errors
///
/// Returns the first error produced by `fetch`. No further fetches are issued after it.
pub async fn coalesce_ranges<F, Fut>(
    ranges: &[std::ops::Range<usize>],
    mut fetch: F,
    coalesce: usize,
) -> Result<Vec<Bytes>>
where
    F: Send + FnMut(std::ops::Range<usize>) -> Fut,
    Fut: std::future::Future<Output = Result<Bytes>> + Send,
{
    let mut ret = Vec::with_capacity(ranges.len());
    let mut start_idx = 0;
    while start_idx < ranges.len() {
        let fetch_start = ranges[start_idx].start;
        let mut fetch_end = ranges[start_idx].end;
        let mut end_idx = start_idx + 1;

        // Measure each gap from the run's running end rather than from its first
        // range. This lets chains of close ranges merge. It also guarantees that, for
        // well-formed ranges (`start <= end`), the fetched span covers every member,
        // so the slices below stay in bounds even for out-of-order input.
        while let Some(next) = ranges.get(end_idx) {
            match next.start.checked_sub(fetch_end) {
                Some(gap) if gap <= coalesce => {
                    fetch_end = fetch_end.max(next.end);
                    end_idx += 1;
                }
                _ => break,
            }
        }

        let bytes = fetch(fetch_start..fetch_end).await?;
        ret.extend(
            ranges[start_idx..end_idx]
                .iter()
                .map(|r| bytes.slice(r.start - fetch_start..r.end - fetch_start)),
        );
        start_idx = end_idx;
    }
    Ok(ret)
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::ops::Range;

    /// Drives `coalesce_ranges` against an in-memory object whose byte at offset `i`
    /// is `i as u8`. Asserts that every requested range comes back with the right
    /// contents, and returns the ranges that were actually passed to `fetch`.
    async fn recorded_fetches(ranges: Vec<Range<usize>>, coalesce: usize) -> Vec<Range<usize>> {
        let object_len = ranges.iter().map(|r| r.end).max().unwrap_or(0);
        let object: Vec<u8> = (0..object_len).map(|offset| offset as u8).collect();

        let mut requested = Vec::new();
        let results = coalesce_ranges(
            &ranges,
            |span| {
                requested.push(span.clone());
                futures::future::ready(Ok(Bytes::from(object[span].to_vec())))
            },
            coalesce,
        )
        .await
        .unwrap();

        assert_eq!(ranges.len(), results.len());
        for (range, bytes) in ranges.iter().zip(results) {
            assert_eq!(bytes.as_ref(), &object[range.clone()]);
        }
        requested
    }

    #[tokio::test]
    async fn test_coalesce_ranges() {
        // Each case: (requested ranges, coalesce threshold, expected calls to `fetch`).
        let cases: Vec<(Vec<Range<usize>>, usize, Vec<Range<usize>>)> = vec![
            (vec![], 0, vec![]),
            (vec![0..3], 0, vec![0..3]),
            (vec![0..2, 3..5], 0, vec![0..2, 3..5]),
            (vec![0..1, 1..2], 0, vec![0..2]),
            (vec![0..1, 2..72], 1, vec![0..72]),
            (vec![0..1, 56..72, 73..75], 1, vec![0..1, 56..75]),
            (
                vec![0..1, 5..6, 7..9, 2..3, 4..6],
                1,
                vec![0..1, 5..9, 2..6],
            ),
        ];
        for (ranges, coalesce, expected) in cases {
            assert_eq!(recorded_fetches(ranges, coalesce).await, expected);
        }
    }
}