#[cfg(any(feature = "async-std", feature = "tokio"))]
use crate::async_lib::AsyncRead;
#[cfg(any(feature = "async-std", feature = "tokio"))]
use crate::async_lib::AsyncReadExt;
use crate::content::linkto;
use crate::errors::{Error, IoErrorExt, Result};
use crate::{index, WriteOpts};
use ssri::{Algorithm, Integrity};
use std::io::Read;
use std::path::{Path, PathBuf};
#[cfg(any(feature = "async-std", feature = "tokio"))]
use std::pin::Pin;
#[cfg(any(feature = "async-std", feature = "tokio"))]
use std::task::{Context as TaskContext, Poll};
// Size in bytes of the buffer used to drain the rest of the target once the
// initial probe read has returned data.
const BUF_SIZE: usize = 16 * 1024;
// Size in bytes of the small first read done before the larger buffer is used.
const PROBE_SIZE: usize = 8;
/// Opens an async `ToLinker` for `target` in the cache at `cache` under `key`
/// and commits it right away.
///
/// Returns whatever `ToLinker::commit` yields; any error from opening or
/// committing is propagated unchanged.
#[cfg(any(feature = "async-std", feature = "tokio"))]
pub async fn link_to<P, K, T>(cache: P, key: K, target: T) -> Result<Integrity>
where
    P: AsRef<Path>,
    K: AsRef<str>,
    T: AsRef<Path>,
{
    let handle = ToLinker::open(cache, key, target).await?;
    handle.commit().await
}
/// Opens an async `ToLinker` for `target` in the cache at `cache` without an
/// index key and commits it right away.
///
/// Returns whatever `ToLinker::commit` yields; any error from opening or
/// committing is propagated unchanged.
#[cfg(any(feature = "async-std", feature = "tokio"))]
pub async fn link_to_hash<P, T>(cache: P, target: T) -> Result<Integrity>
where
    P: AsRef<Path>,
    T: AsRef<Path>,
{
    let handle = ToLinker::open_hash(cache, target).await?;
    handle.commit().await
}
/// Blocking counterpart of `link_to`: opens a `SyncToLinker` for `target` in
/// the cache at `cache` under `key` and commits it right away.
///
/// Returns whatever `SyncToLinker::commit` yields; any error from opening or
/// committing is propagated unchanged.
pub fn link_to_sync<P, K, T>(cache: P, key: K, target: T) -> Result<Integrity>
where
    P: AsRef<Path>,
    K: AsRef<str>,
    T: AsRef<Path>,
{
    let handle = SyncToLinker::open(cache, key, target)?;
    handle.commit()
}
/// Blocking counterpart of `link_to_hash`: opens a `SyncToLinker` for `target`
/// in the cache at `cache` without an index key and commits it right away.
///
/// Returns whatever `SyncToLinker::commit` yields; any error from opening or
/// committing is propagated unchanged.
pub fn link_to_hash_sync<P, T>(cache: P, target: T) -> Result<Integrity>
where
    P: AsRef<Path>,
    T: AsRef<Path>,
{
    let handle = SyncToLinker::open_hash(cache, target)?;
    handle.commit()
}
impl WriteOpts {
    /// Builds an async `ToLinker` for `target` in the cache at `cache`,
    /// remembering `key` so that committing can insert an index entry.
    ///
    /// The hashing algorithm is taken from these options and defaults to
    /// SHA-256 when none was set. Errors from constructing the underlying
    /// `linkto::AsyncToLinker` are propagated.
    #[cfg(any(feature = "async-std", feature = "tokio"))]
    pub async fn link_to<P, K, T>(self, cache: P, key: K, target: T) -> Result<ToLinker>
    where
        P: AsRef<Path>,
        K: AsRef<str>,
        T: AsRef<Path>,
    {
        // Non-generic worker: the generic shim above only converts arguments.
        async fn inner(
            opts: WriteOpts,
            cache: &Path,
            key: &str,
            target: &Path,
        ) -> Result<ToLinker> {
            let algorithm = opts.algorithm.unwrap_or(Algorithm::Sha256);
            let linker = linkto::AsyncToLinker::new(cache, algorithm, target).await?;
            Ok(ToLinker {
                cache: cache.to_path_buf(),
                key: Some(key.to_owned()),
                read: 0,
                linker,
                opts,
            })
        }

        let (cache, key, target) = (cache.as_ref(), key.as_ref(), target.as_ref());
        inner(self, cache, key, target).await
    }

    /// Builds an async `ToLinker` for `target` in the cache at `cache` with no
    /// key, so committing returns the computed integrity without touching the
    /// index.
    ///
    /// The hashing algorithm is taken from these options and defaults to
    /// SHA-256 when none was set. Errors from constructing the underlying
    /// `linkto::AsyncToLinker` are propagated.
    #[cfg(any(feature = "async-std", feature = "tokio"))]
    pub async fn link_to_hash<P, T>(self, cache: P, target: T) -> Result<ToLinker>
    where
        P: AsRef<Path>,
        T: AsRef<Path>,
    {
        // Non-generic worker: the generic shim above only converts arguments.
        async fn inner(opts: WriteOpts, cache: &Path, target: &Path) -> Result<ToLinker> {
            let algorithm = opts.algorithm.unwrap_or(Algorithm::Sha256);
            let linker = linkto::AsyncToLinker::new(cache, algorithm, target).await?;
            Ok(ToLinker {
                cache: cache.to_path_buf(),
                key: None,
                read: 0,
                linker,
                opts,
            })
        }

        let (cache, target) = (cache.as_ref(), target.as_ref());
        inner(self, cache, target).await
    }

    /// Blocking counterpart of `link_to`: builds a `SyncToLinker` for `target`
    /// in the cache at `cache`, remembering `key` for the index entry.
    ///
    /// The hashing algorithm defaults to SHA-256 when none was set. Errors
    /// from constructing the underlying `linkto::ToLinker` are propagated.
    pub fn link_to_sync<P, K, T>(self, cache: P, key: K, target: T) -> Result<SyncToLinker>
    where
        P: AsRef<Path>,
        K: AsRef<str>,
        T: AsRef<Path>,
    {
        // Non-generic worker: the generic shim above only converts arguments.
        fn inner(opts: WriteOpts, cache: &Path, key: &str, target: &Path) -> Result<SyncToLinker> {
            let algorithm = opts.algorithm.unwrap_or(Algorithm::Sha256);
            let linker = linkto::ToLinker::new(cache, algorithm, target)?;
            Ok(SyncToLinker {
                cache: cache.to_path_buf(),
                key: Some(key.to_owned()),
                read: 0,
                linker,
                opts,
            })
        }

        let (cache, key, target) = (cache.as_ref(), key.as_ref(), target.as_ref());
        inner(self, cache, key, target)
    }

    /// Blocking counterpart of `link_to_hash`: builds a `SyncToLinker` for
    /// `target` in the cache at `cache` with no key.
    ///
    /// The hashing algorithm defaults to SHA-256 when none was set. Errors
    /// from constructing the underlying `linkto::ToLinker` are propagated.
    pub fn link_to_hash_sync<P, T>(self, cache: P, target: T) -> Result<SyncToLinker>
    where
        P: AsRef<Path>,
        T: AsRef<Path>,
    {
        // Non-generic worker: the generic shim above only converts arguments.
        fn inner(opts: WriteOpts, cache: &Path, target: &Path) -> Result<SyncToLinker> {
            let algorithm = opts.algorithm.unwrap_or(Algorithm::Sha256);
            let linker = linkto::ToLinker::new(cache, algorithm, target)?;
            Ok(SyncToLinker {
                cache: cache.to_path_buf(),
                key: None,
                read: 0,
                linker,
                opts,
            })
        }

        let (cache, target) = (cache.as_ref(), target.as_ref());
        inner(self, cache, target)
    }
}
/// Async handle over a target file being linked into the cache.
///
/// Reading from the handle forwards to the inner linker and counts the bytes
/// returned; `commit` finishes the operation.
#[cfg(any(feature = "async-std", feature = "tokio"))]
pub struct ToLinker {
// Path of the cache this handle was opened against.
cache: PathBuf,
// Index key to insert on commit; `None` when opened without a key.
key: Option<String>,
// Running total of bytes returned by reads through this handle.
read: usize,
// Inner linker that reads are forwarded to and that is committed on `commit`.
pub(crate) linker: linkto::AsyncToLinker,
// Options the handle was built from; `sri` and `size` are checked on commit.
opts: WriteOpts,
}
/// Forwards reads to the inner linker while keeping a running count of the
/// bytes produced, which `commit` later compares against the expected size.
#[cfg(any(feature = "async-std", feature = "tokio"))]
impl AsyncRead for ToLinker {
    /// async-std flavour: the inner poll reports how many bytes were read.
    #[cfg(feature = "async-std")]
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut TaskContext<'_>,
        buf: &mut [u8],
    ) -> Poll<std::io::Result<usize>> {
        match Pin::new(&mut self.linker).poll_read(cx, buf) {
            Poll::Ready(Ok(count)) => {
                self.read += count;
                Poll::Ready(Ok(count))
            }
            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
            Poll::Pending => Poll::Pending,
        }
    }

    /// tokio flavour: the byte count is derived from how much the filled
    /// region of `buf` grew during the inner poll.
    #[cfg(feature = "tokio")]
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut TaskContext<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> Poll<tokio::io::Result<()>> {
        let filled_before = buf.filled().len();
        match Pin::new(&mut self.linker).poll_read(cx, buf) {
            Poll::Ready(Ok(())) => {
                let filled_after = buf.filled().len();
                self.read += filled_after - filled_before;
                Poll::Ready(Ok(()))
            }
            Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
            Poll::Pending => Poll::Pending,
        }
    }
}
fn filesize(target: &Path) -> Result<usize> {
Ok(target
.metadata()
.with_context(|| format!("Failed to get metadata of {}", target.display()))?
.len() as usize)
}
#[cfg(any(feature = "async-std", feature = "tokio"))]
impl ToLinker {
    /// Opens a handle for `target` in the cache at `cache` under `key`.
    ///
    /// The options used are SHA-256 plus an expected size taken from the
    /// target's current metadata via `filesize`.
    pub async fn open<P, K, T>(cache: P, key: K, target: T) -> Result<Self>
    where
        P: AsRef<Path>,
        K: AsRef<str>,
        T: AsRef<Path>,
    {
        // Non-generic worker: the generic shim above only converts arguments.
        async fn inner(cache: &Path, key: &str, target: &Path) -> Result<ToLinker> {
            let expected_len = filesize(target)?;
            let opts = WriteOpts::new()
                .algorithm(Algorithm::Sha256)
                .size(expected_len);
            opts.link_to(cache, key, target).await
        }

        let (cache, key, target) = (cache.as_ref(), key.as_ref(), target.as_ref());
        inner(cache, key, target).await
    }

    /// Opens a handle for `target` in the cache at `cache` without a key.
    ///
    /// The options used are SHA-256 plus an expected size taken from the
    /// target's current metadata via `filesize`.
    pub async fn open_hash<P, T>(cache: P, target: T) -> Result<Self>
    where
        P: AsRef<Path>,
        T: AsRef<Path>,
    {
        // Non-generic worker: the generic shim above only converts arguments.
        async fn inner(cache: &Path, target: &Path) -> Result<ToLinker> {
            let expected_len = filesize(target)?;
            let opts = WriteOpts::new()
                .algorithm(Algorithm::Sha256)
                .size(expected_len);
            opts.link_to_hash(cache, target).await
        }

        let (cache, target) = (cache.as_ref(), target.as_ref());
        inner(cache, target).await
    }

    /// Finishes the operation and returns the resulting integrity.
    ///
    /// Steps, in order: drain whatever has not been read yet, commit the inner
    /// linker, check the computed integrity against `opts.sri` (or record it
    /// when none was given), check the number of bytes read against
    /// `opts.size`, and finally insert an index entry when a key is present.
    /// Without a key the computed integrity is returned directly.
    ///
    /// # Errors
    ///
    /// Returns an integrity-check error or `Error::SizeMismatch` when the
    /// respective check fails, and propagates read, commit and index errors.
    pub async fn commit(mut self) -> Result<Integrity> {
        self.consume().await?;
        let computed = self.linker.commit().await?;

        match &self.opts.sri {
            Some(expected) if expected.matches(&computed).is_none() => {
                return Err(ssri::Error::IntegrityCheckError(expected.clone(), computed).into());
            }
            Some(_) => {}
            None => self.opts.sri = Some(computed.clone()),
        }

        match self.opts.size {
            Some(expected) if expected != self.read => {
                return Err(Error::SizeMismatch(expected, self.read));
            }
            _ => {}
        }

        match self.key {
            Some(key) => index::insert(&self.cache, &key, self.opts),
            None => Ok(computed),
        }
    }

    /// Reads the handle until a read returns zero bytes, discarding the data.
    ///
    /// A small probe read comes first so the larger buffer is only set up when
    /// there is still something left to read.
    async fn consume(&mut self) -> Result<()> {
        let mut probe = [0; PROBE_SIZE];
        if self.context_read(&mut probe).await? == 0 {
            return Ok(());
        }

        let mut chunk = [0; BUF_SIZE];
        loop {
            if self.context_read(&mut chunk).await? == 0 {
                return Ok(());
            }
        }
    }

    /// Performs one read through this handle, attaching a descriptive message
    /// to any I/O error.
    async fn context_read(&mut self, buf: &mut [u8]) -> Result<usize> {
        let outcome = AsyncReadExt::read(self, buf).await;
        outcome.with_context(|| {
            String::from("Failed to read target file contents while calculating integrity")
        })
    }
}
/// Blocking handle over a target file being linked into the cache.
///
/// Reading from the handle forwards to the inner linker and counts the bytes
/// returned; `commit` finishes the operation.
pub struct SyncToLinker {
// Path of the cache this handle was opened against.
cache: PathBuf,
// Index key to insert on commit; `None` when opened without a key.
key: Option<String>,
// Running total of bytes returned by reads through this handle.
read: usize,
// Inner linker that reads are forwarded to and that is committed on `commit`.
pub(crate) linker: linkto::ToLinker,
// Options the handle was built from; `sri` and `size` are checked on commit.
opts: WriteOpts,
}
/// Forwards reads to the inner linker while keeping a running count of the
/// bytes produced, which `commit` later compares against the expected size.
impl std::io::Read for SyncToLinker {
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        match self.linker.read(buf) {
            Ok(count) => {
                self.read += count;
                Ok(count)
            }
            Err(err) => Err(err),
        }
    }
}
impl SyncToLinker {
pub fn open<P, K, T>(cache: P, key: K, target: T) -> Result<Self>
where
P: AsRef<Path>,
K: AsRef<str>,
T: AsRef<Path>,
{
fn inner(cache: &Path, key: &str, target: &Path) -> Result<SyncToLinker> {
let size = filesize(target)?;
WriteOpts::new()
.algorithm(Algorithm::Sha256)
.size(size)
.link_to_sync(cache, key, target)
}
inner(cache.as_ref(), key.as_ref(), target.as_ref())
}
pub fn open_hash<P, T>(cache: P, target: T) -> Result<Self>
where
P: AsRef<Path>,
T: AsRef<Path>,
{
fn inner(cache: &Path, target: &Path) -> Result<SyncToLinker> {
let size = filesize(target)?;
WriteOpts::new()
.algorithm(Algorithm::Sha256)
.size(size)
.link_to_hash_sync(cache, target)
}
inner(cache.as_ref(), target.as_ref())
}
pub fn commit(mut self) -> Result<Integrity> {
self.consume()?;
let cache = self.cache;
let linker_sri = self.linker.commit()?;
if let Some(sri) = &self.opts.sri {
if sri.matches(&linker_sri).is_none() {
return Err(ssri::Error::IntegrityCheckError(sri.clone(), linker_sri).into());
}
} else {
self.opts.sri = Some(linker_sri.clone());
}
if let Some(size) = self.opts.size {
if size != self.read {
return Err(Error::SizeMismatch(size, self.read));
}
}
if let Some(key) = self.key {
index::insert(&cache, &key, self.opts)
} else {
Ok(linker_sri)
}
}
fn consume(&mut self) -> Result<()> {
let mut probe = [0; PROBE_SIZE];
if self.context_read(&mut probe)? > 0 {
let mut buf = [0; BUF_SIZE];
while self.context_read(&mut buf)? > 0 {}
}
Ok(())
}
fn context_read(&mut self, buf: &mut [u8]) -> Result<usize> {
self.read(buf).with_context(|| {
"Failed to read target file contents while calculating integrity".into()
})
}
}
#[cfg(test)]
mod tests {
    use super::*;

    #[cfg(feature = "async-std")]
    use async_attributes::test as async_test;
    #[cfg(feature = "tokio")]
    use tokio::test as async_test;

    /// Payload written to the target file and expected back from the cache.
    const CONTENTS: &[u8] = b"hello world";

    /// Writes `buf` to a file named `target-file` inside `tmp` and returns its path.
    fn create_tmpfile(tmp: &tempfile::TempDir, buf: &[u8]) -> PathBuf {
        let target = tmp.path().join("target-file");
        std::fs::create_dir_all(target.parent().unwrap()).unwrap();
        std::fs::write(&target, buf).unwrap();
        target
    }

    #[cfg(any(feature = "async-std", feature = "tokio"))]
    #[async_test]
    async fn test_link() {
        // Keep both temp dirs alive for the whole test.
        let source_dir = tempfile::tempdir().unwrap();
        let target = create_tmpfile(&source_dir, CONTENTS);
        let cache_dir = tempfile::tempdir().unwrap();
        let cache = cache_dir.path().to_owned();

        crate::link_to(&cache, "my-key", target).await.unwrap();

        let stored = crate::read(&cache, "my-key").await.unwrap();
        assert_eq!(stored, CONTENTS);
    }

    #[cfg(any(feature = "async-std", feature = "tokio"))]
    #[async_test]
    async fn test_link_to_hash() {
        let source_dir = tempfile::tempdir().unwrap();
        let target = create_tmpfile(&source_dir, CONTENTS);
        let cache_dir = tempfile::tempdir().unwrap();
        let cache = cache_dir.path().to_owned();

        let sri = crate::link_to_hash(&cache, target).await.unwrap();

        let stored = crate::read_hash(&cache, &sri).await.unwrap();
        assert_eq!(stored, CONTENTS);
    }

    #[test]
    fn test_link_to_sync() {
        let source_dir = tempfile::tempdir().unwrap();
        let target = create_tmpfile(&source_dir, CONTENTS);
        let cache_dir = tempfile::tempdir().unwrap();
        let cache = cache_dir.path().to_owned();

        crate::link_to_sync(&cache, "my-key", target).unwrap();

        let stored = crate::read_sync(&cache, "my-key").unwrap();
        assert_eq!(stored, CONTENTS);
    }

    #[test]
    fn test_link_to_hash_sync() {
        let source_dir = tempfile::tempdir().unwrap();
        let target = create_tmpfile(&source_dir, CONTENTS);
        let cache_dir = tempfile::tempdir().unwrap();
        let cache = cache_dir.path().to_owned();

        let sri = crate::link_to_hash_sync(&cache, target).unwrap();

        let stored = crate::read_hash_sync(&cache, &sri).unwrap();
        assert_eq!(stored, CONTENTS);
    }

    #[cfg(any(feature = "async-std", feature = "tokio"))]
    #[async_test]
    async fn test_open() {
        let source_dir = tempfile::tempdir().unwrap();
        let target = create_tmpfile(&source_dir, CONTENTS);
        let cache_dir = tempfile::tempdir().unwrap();
        let cache = cache_dir.path().to_owned();

        // Read everything through the handle first, then commit it.
        let mut handle = crate::ToLinker::open(&cache, "my-key", target).await.unwrap();
        let mut streamed = Vec::new();
        handle.read_to_end(&mut streamed).await.unwrap();
        handle.commit().await.unwrap();
        assert_eq!(streamed, CONTENTS);

        let stored = crate::read_sync(&cache, "my-key").unwrap();
        assert_eq!(stored, CONTENTS);
    }

    #[cfg(any(feature = "async-std", feature = "tokio"))]
    #[async_test]
    async fn test_open_hash() {
        let source_dir = tempfile::tempdir().unwrap();
        let target = create_tmpfile(&source_dir, CONTENTS);
        let cache_dir = tempfile::tempdir().unwrap();
        let cache = cache_dir.path().to_owned();

        // Read everything through the handle first, then commit it.
        let mut handle = crate::ToLinker::open_hash(&cache, target).await.unwrap();
        let mut streamed = Vec::new();
        handle.read_to_end(&mut streamed).await.unwrap();
        let sri = handle.commit().await.unwrap();
        assert_eq!(streamed, CONTENTS);

        let stored = crate::read_hash_sync(&cache, &sri).unwrap();
        assert_eq!(stored, CONTENTS);
    }

    #[test]
    fn test_open_sync() {
        let source_dir = tempfile::tempdir().unwrap();
        let target = create_tmpfile(&source_dir, CONTENTS);
        let cache_dir = tempfile::tempdir().unwrap();
        let cache = cache_dir.path().to_owned();

        // Read everything through the handle first, then commit it.
        let mut handle = crate::SyncToLinker::open(&cache, "my-key", target).unwrap();
        let mut streamed = Vec::new();
        handle.read_to_end(&mut streamed).unwrap();
        handle.commit().unwrap();
        assert_eq!(streamed, CONTENTS);

        let stored = crate::read_sync(&cache, "my-key").unwrap();
        assert_eq!(stored, CONTENTS);
    }

    #[test]
    fn test_open_hash_sync() {
        let source_dir = tempfile::tempdir().unwrap();
        let target = create_tmpfile(&source_dir, CONTENTS);
        let cache_dir = tempfile::tempdir().unwrap();
        let cache = cache_dir.path().to_owned();

        // Read everything through the handle first, then commit it.
        let mut handle = crate::SyncToLinker::open_hash(&cache, target).unwrap();
        let mut streamed = Vec::new();
        handle.read_to_end(&mut streamed).unwrap();
        let sri = handle.commit().unwrap();
        assert_eq!(streamed, CONTENTS);

        let stored = crate::read_hash_sync(&cache, &sri).unwrap();
        assert_eq!(stored, CONTENTS);
    }
}