1use std::io::prelude::*;
3use std::path::{Path, PathBuf};
4#[cfg(any(feature = "async-std", feature = "tokio"))]
5use std::pin::Pin;
6
7use serde_json::Value;
8use ssri::{Algorithm, Integrity};
9
10#[cfg(any(feature = "async-std", feature = "tokio"))]
11use crate::async_lib::{AsyncWrite, AsyncWriteExt};
12use crate::content::write;
13use crate::errors::{Error, IoErrorExt, Result};
14use crate::index;
15
16#[cfg(any(feature = "async-std", feature = "tokio"))]
17use std::task::{Context as TaskContext, Poll};
18
19#[cfg(any(feature = "async-std", feature = "tokio"))]
32pub async fn write<P, D, K>(cache: P, key: K, data: D) -> Result<Integrity>
33where
34 P: AsRef<Path>,
35 D: AsRef<[u8]>,
36 K: AsRef<str>,
37{
38 write_with_algo(Algorithm::Sha256, cache, key, data).await
39}
40
41#[cfg(any(feature = "async-std", feature = "tokio"))]
55pub async fn write_with_algo<P, D, K>(
56 algo: Algorithm,
57 cache: P,
58 key: K,
59 data: D,
60) -> Result<Integrity>
61where
62 P: AsRef<Path>,
63 D: AsRef<[u8]>,
64 K: AsRef<str>,
65{
66 async fn inner(algo: Algorithm, cache: &Path, key: &str, data: &[u8]) -> Result<Integrity> {
67 let mut writer = WriteOpts::new()
68 .algorithm(algo)
69 .size(data.len())
70 .open(cache, key)
71 .await?;
72 writer.write_all(data).await.with_context(|| {
73 format!("Failed to write to cache data for key {key} for cache at {cache:?}")
74 })?;
75 writer.commit().await
76 }
77 inner(algo, cache.as_ref(), key.as_ref(), data.as_ref()).await
78}
79
80#[cfg(any(feature = "async-std", feature = "tokio"))]
93pub async fn write_hash<P, D>(cache: P, data: D) -> Result<Integrity>
94where
95 P: AsRef<Path>,
96 D: AsRef<[u8]>,
97{
98 write_hash_with_algo(Algorithm::Sha256, cache, data).await
99}
100
101#[cfg(any(feature = "async-std", feature = "tokio"))]
115pub async fn write_hash_with_algo<P, D>(algo: Algorithm, cache: P, data: D) -> Result<Integrity>
116where
117 P: AsRef<Path>,
118 D: AsRef<[u8]>,
119{
120 async fn inner(algo: Algorithm, cache: &Path, data: &[u8]) -> Result<Integrity> {
121 let mut writer = WriteOpts::new()
122 .algorithm(algo)
123 .size(data.len())
124 .open_hash(cache)
125 .await?;
126 writer
127 .write_all(data)
128 .await
129 .with_context(|| format!("Failed to write to cache data for cache at {cache:?}"))?;
130 writer.commit().await
131 }
132 inner(algo, cache.as_ref(), data.as_ref()).await
133}
134#[cfg(any(feature = "async-std", feature = "tokio"))]
136pub struct Writer {
137 cache: PathBuf,
138 key: Option<String>,
139 written: usize,
140 pub(crate) writer: write::AsyncWriter,
141 opts: WriteOpts,
142}
143
144#[cfg(any(feature = "async-std", feature = "tokio"))]
145impl AsyncWrite for Writer {
146 fn poll_write(
147 mut self: Pin<&mut Self>,
148 cx: &mut TaskContext<'_>,
149 buf: &[u8],
150 ) -> Poll<std::io::Result<usize>> {
151 let amt = futures::ready!(Pin::new(&mut self.writer).poll_write(cx, buf))?;
152 self.written += amt;
153 Poll::Ready(Ok(amt))
154 }
155
156 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<std::io::Result<()>> {
157 Pin::new(&mut self.writer).poll_flush(cx)
158 }
159
160 #[cfg(feature = "async-std")]
161 fn poll_close(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<std::io::Result<()>> {
162 Pin::new(&mut self.writer).poll_close(cx)
163 }
164
165 #[cfg(feature = "tokio")]
166 fn poll_shutdown(
167 mut self: Pin<&mut Self>,
168 cx: &mut TaskContext<'_>,
169 ) -> Poll<std::io::Result<()>> {
170 Pin::new(&mut self.writer).poll_shutdown(cx)
171 }
172}
173
174#[cfg(any(feature = "async-std", feature = "tokio"))]
175impl Writer {
176 pub async fn create<P, K>(cache: P, key: K) -> Result<Writer>
193 where
194 P: AsRef<Path>,
195 K: AsRef<str>,
196 {
197 Self::create_with_algo(Algorithm::Sha256, cache, key).await
198 }
199
200 pub async fn create_with_algo<P, K>(algo: Algorithm, cache: P, key: K) -> Result<Writer>
218 where
219 P: AsRef<Path>,
220 K: AsRef<str>,
221 {
222 async fn inner(algo: Algorithm, cache: &Path, key: &str) -> Result<Writer> {
223 WriteOpts::new().algorithm(algo).open(cache, key).await
224 }
225 inner(algo, cache.as_ref(), key.as_ref()).await
226 }
227
228 pub async fn commit(mut self) -> Result<Integrity> {
233 let cache = self.cache;
234 let writer_sri = self.writer.close().await?;
235 if let Some(sri) = &self.opts.sri {
236 if sri.matches(&writer_sri).is_none() {
237 return Err(ssri::Error::IntegrityCheckError(sri.clone(), writer_sri).into());
238 }
239 } else {
240 self.opts.sri = Some(writer_sri.clone());
241 }
242 if let Some(size) = self.opts.size {
243 if size != self.written {
244 return Err(Error::SizeMismatch(size, self.written));
245 }
246 }
247 if let Some(key) = self.key {
248 index::insert_async(&cache, &key, self.opts).await
249 } else {
250 Ok(writer_sri)
251 }
252 }
253}
254
255pub fn write_sync<P, D, K>(cache: P, key: K, data: D) -> Result<Integrity>
267where
268 P: AsRef<Path>,
269 D: AsRef<[u8]>,
270 K: AsRef<str>,
271{
272 write_sync_with_algo(Algorithm::Sha256, cache, key, data)
273}
274
275pub fn write_sync_with_algo<P, D, K>(
288 algo: Algorithm,
289 cache: P,
290 key: K,
291 data: D,
292) -> Result<Integrity>
293where
294 P: AsRef<Path>,
295 D: AsRef<[u8]>,
296 K: AsRef<str>,
297{
298 fn inner(algo: Algorithm, cache: &Path, key: &str, data: &[u8]) -> Result<Integrity> {
299 let mut writer = SyncWriter::create_with_algo(algo, cache, key)?;
300 writer.write_all(data).with_context(|| {
301 format!("Failed to write to cache data for key {key} for cache at {cache:?}")
302 })?;
303 writer.written = data.as_ref().len();
304 writer.commit()
305 }
306 inner(algo, cache.as_ref(), key.as_ref(), data.as_ref())
307}
308
309pub fn write_hash_sync<P, D>(cache: P, data: D) -> Result<Integrity>
321where
322 P: AsRef<Path>,
323 D: AsRef<[u8]>,
324{
325 write_hash_sync_with_algo(Algorithm::Sha256, cache, data)
326}
327
328pub fn write_hash_sync_with_algo<P, D>(algo: Algorithm, cache: P, data: D) -> Result<Integrity>
340where
341 P: AsRef<Path>,
342 D: AsRef<[u8]>,
343{
344 fn inner(algo: Algorithm, cache: &Path, data: &[u8]) -> Result<Integrity> {
345 let mut writer = WriteOpts::new()
346 .algorithm(algo)
347 .size(data.len())
348 .open_hash_sync(cache)?;
349 writer
350 .write_all(data)
351 .with_context(|| format!("Failed to write to cache data for cache at {cache:?}"))?;
352 writer.written = data.len();
353 writer.commit()
354 }
355 inner(algo, cache.as_ref(), data.as_ref())
356}
357#[derive(Clone, Default)]
359pub struct WriteOpts {
360 pub(crate) algorithm: Option<Algorithm>,
361 pub(crate) sri: Option<Integrity>,
362 pub(crate) size: Option<usize>,
363 pub(crate) time: Option<u128>,
364 pub(crate) metadata: Option<Value>,
365 pub(crate) raw_metadata: Option<Vec<u8>>,
366}
367
368impl WriteOpts {
369 pub fn new() -> WriteOpts {
371 Default::default()
372 }
373
374 #[cfg(any(feature = "async-std", feature = "tokio"))]
376 pub async fn open<P, K>(self, cache: P, key: K) -> Result<Writer>
377 where
378 P: AsRef<Path>,
379 K: AsRef<str>,
380 {
381 async fn inner(me: WriteOpts, cache: &Path, key: &str) -> Result<Writer> {
382 Ok(Writer {
383 cache: cache.to_path_buf(),
384 key: Some(String::from(key)),
385 written: 0,
386 writer: write::AsyncWriter::new(
387 cache,
388 me.algorithm.unwrap_or(Algorithm::Sha256),
389 None,
390 )
391 .await?,
392 opts: me,
393 })
394 }
395 inner(self, cache.as_ref(), key.as_ref()).await
396 }
397
398 #[cfg(any(feature = "async-std", feature = "tokio"))]
400 pub async fn open_hash<P>(self, cache: P) -> Result<Writer>
401 where
402 P: AsRef<Path>,
403 {
404 async fn inner(me: WriteOpts, cache: &Path) -> Result<Writer> {
405 Ok(Writer {
406 cache: cache.to_path_buf(),
407 key: None,
408 written: 0,
409 writer: write::AsyncWriter::new(
410 cache,
411 me.algorithm.unwrap_or(Algorithm::Sha256),
412 me.size,
413 )
414 .await?,
415 opts: me,
416 })
417 }
418 inner(self, cache.as_ref()).await
419 }
420
421 pub fn open_sync<P, K>(self, cache: P, key: K) -> Result<SyncWriter>
423 where
424 P: AsRef<Path>,
425 K: AsRef<str>,
426 {
427 fn inner(me: WriteOpts, cache: &Path, key: &str) -> Result<SyncWriter> {
428 Ok(SyncWriter {
429 cache: cache.to_path_buf(),
430 key: Some(String::from(key)),
431 written: 0,
432 writer: write::Writer::new(
433 cache,
434 me.algorithm.unwrap_or(Algorithm::Sha256),
435 me.size,
436 )?,
437 opts: me,
438 })
439 }
440 inner(self, cache.as_ref(), key.as_ref())
441 }
442
443 pub fn open_hash_sync<P>(self, cache: P) -> Result<SyncWriter>
445 where
446 P: AsRef<Path>,
447 {
448 fn inner(me: WriteOpts, cache: &Path) -> Result<SyncWriter> {
449 Ok(SyncWriter {
450 cache: cache.to_path_buf(),
451 key: None,
452 written: 0,
453 writer: write::Writer::new(
454 cache,
455 me.algorithm.unwrap_or(Algorithm::Sha256),
456 me.size,
457 )?,
458 opts: me,
459 })
460 }
461 inner(self, cache.as_ref())
462 }
463
464 pub fn algorithm(mut self, algo: Algorithm) -> Self {
466 self.algorithm = Some(algo);
467 self
468 }
469
470 pub fn size(mut self, size: usize) -> Self {
473 self.size = Some(size);
474 self
475 }
476
477 pub fn metadata(mut self, metadata: Value) -> Self {
479 self.metadata = Some(metadata);
480 self
481 }
482
483 pub fn raw_metadata(mut self, metadata: Vec<u8>) -> Self {
485 self.raw_metadata = Some(metadata);
486 self
487 }
488
489 pub fn time(mut self, time: u128) -> Self {
493 self.time = Some(time);
494 self
495 }
496
497 pub fn integrity(mut self, sri: Integrity) -> Self {
501 self.sri = Some(sri);
502 self
503 }
504}
505
506pub struct SyncWriter {
508 cache: PathBuf,
509 key: Option<String>,
510 written: usize,
511 pub(crate) writer: write::Writer,
512 opts: WriteOpts,
513}
514
515impl Write for SyncWriter {
516 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
517 let written = self.writer.write(buf)?;
518 self.written += written;
519 Ok(written)
520 }
521 fn flush(&mut self) -> std::io::Result<()> {
522 self.writer.flush()
523 }
524}
525
526impl SyncWriter {
527 pub fn create<P, K>(cache: P, key: K) -> Result<SyncWriter>
542 where
543 P: AsRef<Path>,
544 K: AsRef<str>,
545 {
546 fn inner(cache: &Path, key: &str) -> Result<SyncWriter> {
547 WriteOpts::new()
548 .algorithm(Algorithm::Sha256)
549 .open_sync(cache, key)
550 }
551 inner(cache.as_ref(), key.as_ref())
552 }
553
554 pub fn create_with_algo<P, K>(algo: Algorithm, cache: P, key: K) -> Result<SyncWriter>
570 where
571 P: AsRef<Path>,
572 K: AsRef<str>,
573 {
574 fn inner(algo: Algorithm, cache: &Path, key: &str) -> Result<SyncWriter> {
575 WriteOpts::new().algorithm(algo).open_sync(cache, key)
576 }
577 inner(algo, cache.as_ref(), key.as_ref())
578 }
579 pub fn commit(mut self) -> Result<Integrity> {
584 let cache = self.cache;
585 let writer_sri = self.writer.close()?;
586 if let Some(sri) = &self.opts.sri {
587 if sri.matches(&writer_sri).is_none() {
588 return Err(ssri::Error::IntegrityCheckError(sri.clone(), writer_sri).into());
589 }
590 } else {
591 self.opts.sri = Some(writer_sri.clone());
592 }
593 if let Some(size) = self.opts.size {
594 if size != self.written {
595 return Err(Error::SizeMismatch(size, self.written));
596 }
597 }
598 if let Some(key) = self.key {
599 index::insert(&cache, &key, self.opts)
600 } else {
601 Ok(writer_sri)
602 }
603 }
604}
605
606#[cfg(test)]
607mod tests {
608 #[cfg(feature = "async-std")]
609 use async_attributes::test as async_test;
610 #[cfg(feature = "tokio")]
611 use tokio::test as async_test;
612
613 #[cfg(any(feature = "async-std", feature = "tokio"))]
614 #[async_test]
615 async fn round_trip() {
616 let tmp = tempfile::tempdir().unwrap();
617 let dir = tmp.path().to_owned();
618 crate::write(&dir, "hello", b"hello").await.unwrap();
619 let data = crate::read(&dir, "hello").await.unwrap();
620 assert_eq!(data, b"hello");
621 }
622
623 #[test]
624 fn round_trip_sync() {
625 let tmp = tempfile::tempdir().unwrap();
626 let dir = tmp.path().to_owned();
627 crate::write_sync(&dir, "hello", b"hello").unwrap();
628 let data = crate::read_sync(&dir, "hello").unwrap();
629 assert_eq!(data, b"hello");
630 }
631
632 #[test]
633 fn hash_write_sync() {
634 let tmp = tempfile::tempdir().unwrap();
635 let dir = tmp.path().to_owned();
636 let original = format!("hello world{}", 5);
637 let integrity = crate::write_hash_sync(&dir, &original)
638 .expect("should be able to write a hash synchronously");
639 let bytes = crate::read_hash_sync(&dir, &integrity)
640 .expect("should be able to read the data we just wrote");
641 let result =
642 String::from_utf8(bytes).expect("we wrote valid utf8 but did not read valid utf8 back");
643 assert_eq!(result, original, "we did not read back what we wrote");
644 }
645
646 #[cfg(any(feature = "async-std", feature = "tokio"))]
647 #[async_test]
648 async fn hash_write_async() {
649 let tmp = tempfile::tempdir().unwrap();
650 let dir = tmp.path().to_owned();
651 let original = format!("hello world{}", 12);
652 let integrity = crate::write_hash(&dir, &original)
653 .await
654 .expect("should be able to write a hash asynchronously");
655 let bytes = crate::read_hash(&dir, &integrity)
656 .await
657 .expect("should be able to read back what we wrote");
658 let result =
659 String::from_utf8(bytes).expect("we wrote valid utf8 but did not read valid utf8 back");
660 assert_eq!(result, original, "we did not read back what we wrote");
661 }
662}