use std::{
collections::BTreeMap,
path::{Path, PathBuf},
str::FromStr,
};
use anyhow::Context;
use chrono::{DateTime, Utc};
use sequoia_openpgp::parse::{stream::DetachedVerifierBuilder, Parse};
use serde::{de::Visitor, Deserialize, Serialize, Serializer};
use tokio::io::AsyncReadExt as _;
use tracing::instrument;
/// `chrono` format string used for the timestamp in generated package names
/// (renders as e.g. `20240131T235959`).
pub const DATETIME_FORMAT: &str = "%Y%m%dT%H%M%S";
/// Name of the content folder. NOTE(review): not used in this file — confirm
/// its role with the packaging code that references it.
pub const CONTENT_FOLDER: &str = "content";
/// Name of the SHA-256 checksum file. NOTE(review): not used in this file —
/// presumably consumed by the packaging/unpacking code.
pub const CHECKSUM_FILE: &str = "checksum.sha256";
/// Name of the unencrypted data archive. NOTE(review): not used in this file.
pub const DATA_FILE: &str = "data.tar.gz";
/// Zip entry holding the encrypted payload; one of the three entries a valid
/// package must contain.
pub const DATA_FILE_ENCRYPTED: &str = "data.tar.gz.gpg";
/// Zip entry holding the JSON-encoded package [`Metadata`].
pub const METADATA_FILE: &str = "metadata.json";
/// Zip entry holding the detached OpenPGP signature over [`METADATA_FILE`].
pub const METADATA_SIG_FILE: &str = "metadata.json.sig";
pub mod state {
    //! Type-state markers recording whether a `Package` has had its metadata
    //! signature checked.
    //!
    //! `State` is sealed: it can only be implemented by the markers defined
    //! here, so no outside code can invent additional states.

    mod sealed {
        /// Private supertrait that prevents external implementations of `State`.
        pub trait Sealed {}

        impl Sealed for super::Verified {}
        impl Sealed for super::Unverified {}
    }

    /// Implemented exclusively by [`Unverified`] and [`Verified`].
    pub trait State: sealed::Sealed {}

    /// Marker for a package whose signature has not (yet) been verified.
    #[derive(Debug, Clone)]
    pub struct Unverified;

    /// Marker for a package whose metadata signature has been verified.
    #[derive(Debug, Clone)]
    pub struct Verified;

    impl State for Unverified {}
    impl State for Verified {}
}
/// Location of a package stored in S3: the client used to reach it plus the
/// bucket and object key.
#[derive(Clone, Debug)]
pub(crate) struct S3Source {
    pub(crate) client: crate::remote::s3::Client,
    pub(crate) bucket: String,
    pub(crate) object: String,
}

/// Where the bytes of a package's zip archive come from.
#[derive(Clone, Debug)]
pub(crate) enum Source {
    /// A file on the local filesystem (`Package::open` stores the
    /// canonicalized path here).
    Local(PathBuf),
    /// An object in an S3 bucket.
    S3(S3Source),
}
/// A data package backed by a zip archive, tagged at the type level with its
/// verification state.
///
/// `State` defaults to [`state::Unverified`]. The fields are private; in this
/// file a `Package<state::Verified>` is only constructed by `Package::verify`.
#[derive(Clone)]
pub struct Package<State: state::State = state::Unverified> {
    /// Human-readable package name: the file name (local) or object key (S3),
    /// as set by `open` / `open_s3`.
    name: String,
    /// Reader over the package archive; also carries the package's `Source`.
    zip: crate::zip::ZipReader,
    /// Zero-sized marker carrying the verification state.
    state: std::marker::PhantomData<State>,
}
impl<State: state::State> std::fmt::Debug for Package<State> {
    /// Shows the package as `Package { name, source }`; the archive reader
    /// itself and the state marker are not printed.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let origin = &self.zip.source;
        let mut builder = f.debug_struct("Package");
        builder.field("name", &self.name);
        builder.field("source", origin);
        builder.finish()
    }
}
impl<State: state::State> Package<State> {
    /// Returns the package name (the file name for packages opened with
    /// `open`, the object key for packages opened with `open_s3`).
    pub(crate) fn name(&self) -> &str {
        &self.name
    }

    /// Returns the local filesystem path of the package archive.
    ///
    /// # Errors
    ///
    /// Fails when the package does not come from a local file (e.g. S3).
    pub(crate) fn path(&self) -> anyhow::Result<&Path> {
        // Exhaustive match so a newly added `Source` variant forces a decision here.
        match &self.zip.source {
            Source::Local(path) => Ok(path),
            Source::S3(_) => Err(anyhow::anyhow!(
                "package path is not available for non-local sources"
            )),
        }
    }
}
/// Reads [`METADATA_FILE`] from `$package`'s archive and deserializes it from JSON.
///
/// Expands to an expression that uses `.await` and `?` and evaluates to
/// `Ok(parsed)`, so it must be invoked inside an `async fn` returning
/// `anyhow::Result<_>`. It is used by both `metadata_unverified` and `metadata`.
macro_rules! read_metadata {
    ($package:expr) => {{
        // The reported size presumably comes from the zip entry header, i.e.
        // from a package that may not be verified yet. Cap the up-front
        // allocation so a forged size cannot abort/OOM the process (and avoid
        // `as usize` truncation on 32-bit); `read_to_end` still grows the
        // buffer as needed for the actual content.
        const MAX_PREALLOC: usize = 1 << 20;
        let (mut reader, size) = $package.zip.get_file_reader(METADATA_FILE).await?;
        let capacity = usize::try_from(size)
            .unwrap_or(usize::MAX)
            .min(MAX_PREALLOC);
        let mut buf = Vec::with_capacity(capacity);
        reader.read_to_end(&mut buf).await?;
        Ok(serde_json::from_slice(&buf)?)
    }};
}
impl Package<state::Unverified> {
#[instrument(fields(path = %path.as_ref().display()), err(Debug, level=tracing::Level::ERROR))]
pub async fn open(path: impl AsRef<Path>) -> anyhow::Result<Self> {
let path = path.as_ref();
anyhow::ensure!(path.exists(), "package '{}' does not exist", path.display());
let path = path.canonicalize()?;
let name = path
.file_name()
.context("Unable to get package file name")?
.to_string_lossy()
.to_string();
Ok(Package {
state: Default::default(),
name,
zip: crate::zip::ZipReader::open(Source::Local(path)).await?,
})
}
#[instrument(err(Debug, level=tracing::Level::ERROR))]
pub async fn open_s3(
client: &crate::remote::s3::Client,
bucket: String,
object: String,
) -> anyhow::Result<Self> {
let source = Source::S3(S3Source {
client: client.clone(),
bucket,
object: object.clone(),
});
Ok(Package {
state: Default::default(),
zip: crate::zip::ZipReader::open(source).await?,
name: object,
})
}
fn verify_format(&self) -> anyhow::Result<()> {
const EXPECTED_FILES: [&str; 3] = [DATA_FILE_ENCRYPTED, METADATA_FILE, METADATA_SIG_FILE];
let error_msg_base = format!(
"A valid data package must contain exactly the following {} files: {}",
EXPECTED_FILES.len(),
EXPECTED_FILES.join(", ")
);
let mut actual_files = Vec::new();
for file_name in self.zip.file_names() {
if EXPECTED_FILES.contains(&file_name) {
actual_files.push(file_name);
} else {
anyhow::bail!(
"invalid data package. Zip archive contains unexpected \
files. {error_msg_base}."
);
}
}
if actual_files.len() == EXPECTED_FILES.len() {
return Ok(());
}
Err(anyhow::anyhow!(
"invalid data package. Zip archive is missing the following \
files: {}. {}.",
EXPECTED_FILES
.into_iter()
.filter(|f| !actual_files.contains(f))
.collect::<Vec<&str>>()
.join(", "),
error_msg_base
))
}
pub async fn verify(
self,
cert_store: &crate::openpgp::certstore::CertStore<'_>,
) -> anyhow::Result<Package<state::Verified>> {
macro_rules! read_inner {
($file:expr) => {{
let (mut reader, size) = self
.zip
.get_file_reader($file)
.await
.with_context(|| format!("{} not found", $file))?;
let mut buffer = Vec::with_capacity(size as usize);
reader.read_to_end(&mut buffer).await?;
buffer
}};
}
self.verify_format()?;
DetachedVerifierBuilder::from_bytes(&read_inner!(METADATA_SIG_FILE))?
.with_policy(
&sequoia_openpgp::policy::StandardPolicy::new(),
None,
crate::openpgp::crypto::VerificationHelper { cert_store },
)?
.verify_bytes(read_inner!(METADATA_FILE))?;
Ok(Package {
state: Default::default(),
zip: self.zip,
name: self.name,
})
}
pub async fn metadata_unverified(&self) -> anyhow::Result<Metadata> {
read_metadata!(self)
}
}
impl Package<state::Verified> {
    /// Reads and parses the package metadata; only available once the
    /// metadata signature has been verified.
    pub async fn metadata(&self) -> anyhow::Result<Metadata> {
        read_metadata!(self)
    }

    /// Opens the encrypted payload entry ([`DATA_FILE_ENCRYPTED`]) for streaming.
    ///
    /// Returns the reader together with the size reported by the archive for
    /// that entry (presumably its uncompressed size — confirm against
    /// `ZipReader::get_file_reader`).
    pub(crate) async fn data(
        &self,
    ) -> anyhow::Result<(Box<dyn tokio::io::AsyncBufRead + Unpin + Sync + Send>, u64)> {
        let (reader, size) = self.zip.get_file_reader(DATA_FILE_ENCRYPTED).await?;
        Ok((reader, size))
    }
}
/// Package metadata, stored as JSON in [`METADATA_FILE`] and covered by the
/// detached signature in [`METADATA_SIG_FILE`].
///
/// Field names and declaration order define the serialized JSON, so they are
/// part of the on-disk format. Fields marked `#[serde(default)]` may be absent
/// from older metadata files.
///
/// NOTE(review): the derived `Default` yields `version == ""`, whereas
/// deserializing JSON without a `version` key yields `default_version()`
/// (`"0.7.2"`) — confirm this difference is intended.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct Metadata {
    /// Package sender. NOTE(review): presumably a key fingerprint or email —
    /// confirm against the packaging code.
    pub sender: String,
    /// Package recipients (same identifier format as `sender`, presumably).
    pub recipients: Vec<String>,
    /// Payload checksum, computed with `checksum_algorithm`. NOTE(review):
    /// presumably a hex digest of the data archive — confirm.
    pub checksum: String,
    /// Timestamp (UTC) recorded in the metadata.
    pub timestamp: DateTime<Utc>,
    /// Metadata format version; `default_version()` when absent from JSON.
    #[serde(default = "default_version")]
    pub version: String,
    /// Checksum algorithm; `SHA256` when absent from JSON.
    #[serde(default)]
    pub checksum_algorithm: ChecksumAlgorithm,
    /// Payload compression; `CompressionAlgorithm::default()` (Zstandard, no
    /// level) when absent from JSON.
    #[serde(default)]
    pub compression_algorithm: CompressionAlgorithm,
    /// Optional transfer identifier; `None` when absent.
    #[serde(default)]
    pub transfer_id: Option<u32>,
    /// Optional data purpose; `None` when absent.
    #[serde(default)]
    pub purpose: Option<Purpose>,
    /// Free-form additional key/value pairs; empty when absent.
    #[serde(default)]
    pub extra: BTreeMap<String, String>,
}
impl Metadata {
    /// Renders the metadata as compact JSON for logging/display.
    ///
    /// If JSON serialization fails, the `Debug` representation is returned
    /// instead, so this never fails.
    pub(crate) fn to_json_or_debug(&self) -> String {
        match serde_json::to_string(self) {
            Ok(json) => json,
            Err(_) => format!("{:?}", self),
        }
    }
}
/// Hash algorithm used for `Metadata::checksum`.
///
/// Serialized by serde under the variant name (`"SHA256"`); renaming the
/// variant would change the metadata format.
#[derive(Deserialize, Serialize, Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum ChecksumAlgorithm {
    /// SHA-256; also the default when the metadata omits the field.
    #[default]
    SHA256,
}
/// Compression applied to the package payload.
///
/// The optional integer on `Gzip` / `Zstandard` is presumably a compression
/// level (`None` = codec default). It is not written out when serializing:
/// only the algorithm name is stored.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CompressionAlgorithm {
    /// No compression.
    Stored,
    /// Gzip, with an optional level.
    Gzip(Option<u32>),
    /// Zstandard, with an optional level.
    Zstandard(Option<i32>),
}

impl Default for CompressionAlgorithm {
    /// Zstandard without an explicit level.
    fn default() -> Self {
        CompressionAlgorithm::Zstandard(None)
    }
}
impl Serialize for CompressionAlgorithm {
    /// Serializes as a lowercase unit variant (`stored`, `gzip`, `zstandard`).
    /// Any level carried by the variant is dropped.
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let (variant_index, variant_name) = match self {
            Self::Stored => (0, "stored"),
            Self::Gzip(_) => (1, "gzip"),
            Self::Zstandard(_) => (2, "zstandard"),
        };
        serializer.serialize_unit_variant("CompressionAlgorithm", variant_index, variant_name)
    }
}
impl<'de> Deserialize<'de> for CompressionAlgorithm {
    /// Parses `stored`, `gzip` or `zstandard`, case-insensitively (via
    /// `str::to_lowercase`). The resulting variant never carries a level.
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
    where
        D: serde::Deserializer<'de>,
    {
        struct NameVisitor;

        impl<'de> Visitor<'de> for NameVisitor {
            type Value = CompressionAlgorithm;

            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
                formatter.write_str("one of `stored`, `gzip`, `zstandard`")
            }

            fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
            where
                E: serde::de::Error,
            {
                let normalized = v.to_lowercase();
                let algorithm = match normalized.as_str() {
                    "stored" => CompressionAlgorithm::Stored,
                    "gzip" => CompressionAlgorithm::Gzip(None),
                    "zstandard" => CompressionAlgorithm::Zstandard(None),
                    _ => return Err(E::custom(format!("unknown variant `{}`", v))),
                };
                Ok(algorithm)
            }
        }

        deserializer.deserialize_str(NameVisitor)
    }
}
/// Metadata format version assumed when a metadata file has no `version` key.
pub fn default_version() -> String {
    String::from("0.7.2")
}
/// Declared purpose of the transferred data.
///
/// serde (de)serializes the exact variant names (`"PRODUCTION"`, `"TEST"`),
/// whereas the `FromStr` impl accepts any letter case.
#[derive(Copy, Clone, Deserialize, Serialize, Debug)]
pub enum Purpose {
    PRODUCTION,
    TEST,
}
impl FromStr for Purpose {
    type Err = anyhow::Error;

    /// Parses `production` or `test`, ignoring letter case.
    ///
    /// # Errors
    ///
    /// Any other input yields an `Invalid purpose: <input>` error.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        let lowered = s.to_lowercase();
        if lowered == "production" {
            Ok(Self::PRODUCTION)
        } else if lowered == "test" {
            Ok(Self::TEST)
        } else {
            Err(anyhow::anyhow!("Invalid purpose: {}", s))
        }
    }
}
/// Builds a package file name of the form `[<prefix>_]<timestamp>.zip`, with
/// the timestamp rendered using [`DATETIME_FORMAT`] (e.g. `20240131T235959`).
pub(crate) fn generate_package_name(timestamp: &DateTime<Utc>, prefix: Option<&str>) -> String {
    let stamp = timestamp.format(DATETIME_FORMAT);
    match prefix {
        Some(prefix) => format!("{prefix}_{stamp}.zip"),
        None => format!("{stamp}.zip"),
    }
}