use std::{
collections::BTreeMap,
io::{self, Write as _},
path::{Path, PathBuf},
};
use anyhow::Context as _;
use bytes::BytesMut;
use chrono::{DateTime, Utc};
use flate2::{write::GzEncoder, Compression};
use sequoia_openpgp::policy::StandardPolicy;
use tokio::io::AsyncWriteExt;
use tracing::{debug, info, instrument, trace};
use crate::{
destination::Destination,
filesystem::{check_space, check_writeable, get_common_path},
package,
task::{Mode, Status},
utils::{Progress, ProgressReader},
zip::ZipArchive,
};
/// Options for a single encryption task (consumed by `encrypt`).
///
/// `T` is the progress-reporter type; `U` is the password callback forwarded to the
/// signing-key lookup (in `encrypt` it must be `Fn(PasswordHint) -> impl Future<Output = Password>`).
pub struct EncryptOpts<T, U> {
    /// Files and/or directories to package; each entry is walked recursively.
    pub files: Vec<PathBuf>,
    /// Fingerprints of the recipients' certificates, resolved through `cert_store`.
    pub recipients: Vec<crate::openpgp::cert::Fingerprint>,
    /// Fingerprint of the signer's certificate, resolved through `cert_store`.
    pub signer: crate::openpgp::cert::Fingerprint,
    /// Certificate store used to look up `signer` and `recipients`.
    pub cert_store: crate::openpgp::certstore::CertStore<'static>,
    /// Key store from which the signer's signing key is obtained.
    pub key_store: crate::openpgp::keystore::KeyStore,
    /// Password callback passed along when obtaining the signing key.
    pub password: U,
    /// Compression applied to the tarball before it is signed and encrypted.
    pub compression_algorithm: package::CompressionAlgorithm,
    /// With `Mode::Check`, inputs and destination are only verified and nothing is written.
    pub mode: Mode,
    /// Optional progress reporter; its length is set to the total input size and it is
    /// advanced as input files are read.
    pub progress: Option<T>,
    /// Optional purpose recorded in the package metadata.
    pub purpose: Option<package::Purpose>,
    /// Optional transfer ID recorded in the package metadata.
    pub transfer_id: Option<u32>,
    /// Timestamp recorded in the package metadata and used to generate the package name.
    pub timestamp: DateTime<Utc>,
    /// Optional prefix passed to the package-name generator.
    pub prefix: Option<String>,
    /// Additional key/value pairs stored in the `extra` field of the package metadata.
    pub extra_metadata: BTreeMap<String, String>,
}
impl<T: Progress, U> std::fmt::Debug for EncryptOpts<T, U> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("EncryptOpts")
.field("files", &self.files)
.field("recipients", &self.recipients)
.field("signer", &self.signer)
.field("compression_algorithm", &self.compression_algorithm)
.field("mode", &self.mode)
.field("purpose", &self.purpose)
.field("transfer_id", &self.transfer_id)
.field("extra_metadata", &self.extra_metadata)
.finish()
}
}
/// Encrypts and signs `opts.files` into a data package and delivers it to `dest`.
///
/// The signer's and recipients' certificates are looked up in `opts.cert_store`. The signing
/// key is obtained from `opts.key_store`, with `opts.password` passed along. The work is then
/// handed to the routine for the chosen destination. With `Mode::Check` nothing is written
/// and `Status::Checked` is returned; otherwise the result is `Status::Completed`.
///
/// # Errors
/// Fails if a certificate or key cannot be resolved, or if the destination routine fails.
#[instrument(err(Debug, level=tracing::Level::ERROR))]
pub async fn encrypt<T, F, Fut>(
    mut opts: EncryptOpts<T, F>,
    dest: Destination,
) -> anyhow::Result<Status>
where
    T: Progress + Send + 'static,
    F: Fn(crate::openpgp::types::PasswordHint) -> Fut,
    Fut: std::future::Future<Output = crate::openpgp::types::Password>,
{
    trace!("Extract signing and encryption certificates");
    let policy = StandardPolicy::new();
    let signer_cert = opts.cert_store.get_cert_by_fingerprint(&opts.signer)?;

    let mut recipient_certs = Vec::with_capacity(opts.recipients.len());
    for fingerprint in &opts.recipients {
        recipient_certs.push(opts.cert_store.get_cert_by_fingerprint(fingerprint)?.0);
    }

    let signing_key = crate::openpgp::crypto::get_signing_capable_key(
        &signer_cert.0,
        &policy,
        &mut opts.key_store,
        &opts.password,
    )
    .await?;
    let signer_fingerprint = signer_cert.0.fingerprint();
    let encryption_keys =
        crate::openpgp::crypto::get_encryption_keys(&recipient_certs, &policy)?;
    // NOTE(review): keys and fingerprints are paired positionally. This assumes
    // `get_encryption_keys` yields exactly one key per certificate, in order — confirm.
    let recipient_fingerprints = recipient_certs.iter().map(|cert| cert.fingerprint());
    let keys = EncryptKeys {
        signer: (signing_key, signer_fingerprint),
        recipients: encryption_keys
            .into_iter()
            .zip(recipient_fingerprints)
            .collect(),
    };

    let status = match dest {
        Destination::Stdout => encrypt_stdout(opts, keys).await?,
        Destination::Local(local) => encrypt_local(opts, &local, keys).await?,
        Destination::Sftp(sftp) => encrypt_sftp(opts, &sftp, keys).await?,
        Destination::S3(s3) => encrypt_s3(opts, &s3, keys).await?,
    };

    // Field names below are part of the structured log output; keep them stable.
    match &status {
        Status::Completed {
            destination,
            source_size,
            destination_size,
            metadata,
        } => info!(
            destination,
            source_size,
            destination_size,
            metadata = metadata.to_json_or_debug(),
            "Successfully created encrypted data package"
        ),
        Status::Checked {
            destination,
            source_size,
        } => {
            debug!(destination, source_size, "Checked encryption task input")
        }
    }
    Ok(status)
}
/// Key material resolved by `encrypt` and consumed by the encryption task.
struct EncryptKeys {
    /// Signing key obtained from the key store, together with the fingerprint of the
    /// signer's certificate. The fingerprint (hex) is recorded as `sender` in the metadata.
    signer: (sequoia_keystore::Key, sequoia_openpgp::Fingerprint),
    /// Encryption keys, each paired with the fingerprint of the recipient certificate it
    /// came from. The fingerprints (hex) are recorded as `recipients` in the metadata.
    ///
    /// NOTE(review): `encrypt` builds these pairs by zipping the output of
    /// `get_encryption_keys` with the certificate list. That assumes exactly one key per
    /// certificate, in the same order — confirm.
    recipients: Vec<(
        sequoia_openpgp::packet::Key<
            sequoia_openpgp::packet::key::PublicParts,
            sequoia_openpgp::packet::key::UnspecifiedRole,
        >,
        sequoia_openpgp::Fingerprint,
    )>,
}
/// Computes the path of every file in `files` inside the archive.
///
/// Each path is made relative to the common path of all the files' parent folders (as
/// returned by `get_common_path`) and placed under `package::CONTENT_FOLDER`.
///
/// # Errors
/// Fails if a file has no parent folder, if `get_common_path` fails, or if a file does not
/// lie under the common path.
fn get_archive_paths(files: &[PathBuf]) -> anyhow::Result<Vec<PathBuf>> {
    let mut parents = Vec::with_capacity(files.len());
    for file in files {
        let parent = file
            .parent()
            .with_context(|| format!("Unable to find parent folder of {file:?}"))?;
        parents.push(parent);
    }
    let root_dir = get_common_path(&parents)?;
    files
        .iter()
        .map(|file| -> anyhow::Result<PathBuf> {
            let relative = file.strip_prefix(&root_dir)?;
            Ok(Path::new(package::CONTENT_FOLDER).join(relative))
        })
        .collect()
}
/// Builds the OpenPGP streaming pipeline on top of `writer`.
///
/// Data written to the returned `Message` is wrapped in a literal data packet, signed with
/// the signer's key, and encrypted for every recipient key in `keys`.
fn init_message<'a, W: io::Write + Send + Sync>(
    writer: &'a mut W,
    keys: &'a EncryptKeys,
) -> anyhow::Result<sequoia_openpgp::serialize::stream::Message<'a>> {
    use sequoia_openpgp::serialize::stream::{
        Encryptor2, LiteralWriter, Message, Recipient, Signer,
    };
    let sink = Message::new(writer);
    let recipients = keys.recipients.iter().map(|(key, _)| Recipient::from(key));
    let encryptor = Encryptor2::for_recipients(sink, recipients).build()?;
    let signer = Signer::new(encryptor, keys.signer.0.clone()).build()?;
    LiteralWriter::new(signer).build()
}
/// Appends `package::CHECKSUM_FILE` to `archive`.
///
/// The file contains one `<checksum> <path>` line per entry of `content`. It is stored as a
/// regular file with mode `0o644` and the current time as its modification time.
fn add_checksum_file(
    archive: &mut tar::Builder<impl io::Write>,
    content: &[(String, String)],
) -> anyhow::Result<()> {
    let listing: Vec<u8> = content
        .iter()
        .map(|(checksum, path)| format!("{checksum} {path}\n"))
        .collect::<String>()
        .into_bytes();
    let mut header = tar::Header::new_gnu();
    header.set_entry_type(tar::EntryType::file());
    header.set_size(u64::try_from(listing.len())?);
    header.set_mtime(u64::try_from(Utc::now().timestamp())?);
    header.set_mode(0o644);
    header.set_cksum();
    archive.append_data(&mut header, package::CHECKSUM_FILE, listing.as_slice())?;
    Ok(())
}
/// Writes a tar archive of `file_path` into `writer`, followed by a checksum listing.
///
/// Each entry of `file_path` is `(path on disk, path inside the archive)`. File contents are
/// hashed with SHA-256 on a helper thread while they are archived. Once all files are added,
/// the collected checksums are appended via `add_checksum_file` and the archive is finished.
/// `progress`, if given, is advanced by the number of bytes read from each file.
///
/// # Errors
/// Fails if a file cannot be opened or read, the archive cannot be written, or hashing
/// fails. The hashing thread is always joined. If it failed, its error is returned in
/// preference to the (secondary) channel error the archiving side observes.
fn create_tarball(
    writer: &mut impl io::Write,
    file_path: &[(PathBuf, PathBuf)],
    progress: Option<&mut impl Progress>,
) -> anyhow::Result<()> {
    enum Message {
        Payload(BytesMut),
        Finalize(std::path::PathBuf),
    }
    impl crate::io::Message for Message {
        fn from_bytes(bytes: BytesMut) -> Self {
            Self::Payload(bytes)
        }
    }

    /// Appends every file to `archive`, teeing its contents to the hashing thread, and
    /// sends a `Finalize` message after each file so the thread can close that file's digest.
    fn append_files<W: io::Write, P: Progress>(
        archive: &mut tar::Builder<W>,
        file_path: &[(PathBuf, PathBuf)],
        mut progress: Option<&mut P>,
        pool_rx: &mut tokio::sync::mpsc::Receiver<BytesMut>,
        result_tx: &tokio::sync::mpsc::Sender<Message>,
    ) -> anyhow::Result<()> {
        for (fs_path, archive_path) in file_path {
            let f = std::fs::File::open(fs_path)?;
            let mut header = tar::Header::new_gnu();
            header.set_metadata(&f.metadata()?);
            header.set_cksum();
            let mut hash_reader = crate::io::Tee::new(f, &mut *pool_rx, result_tx)?;
            if let Some(p) = progress.as_mut() {
                let mut progress_reader = ProgressReader::new(&mut hash_reader, |len| {
                    p.inc(len.try_into()?);
                    Ok(())
                });
                archive.append_data(&mut header, archive_path, &mut progress_reader)?;
            } else {
                archive.append_data(&mut header, archive_path, &mut hash_reader)?;
            }
            result_tx.blocking_send(Message::Finalize(archive_path.clone()))?;
        }
        Ok(())
    }

    // `result_*` carries file data (and end-of-file markers) to the hashing thread;
    // `pool_*` recycles the data buffers back to the reader side.
    let (result_tx, mut result_rx) = tokio::sync::mpsc::channel(4);
    let (pool_tx, mut pool_rx) = tokio::sync::mpsc::channel(4);
    for _ in 0..4 {
        pool_tx.blocking_send(BytesMut::with_capacity(1 << 19))?;
    }
    let checksums_len = file_path.len();
    let checksums_handle = std::thread::spawn(move || -> anyhow::Result<_> {
        use sequoia_openpgp::{crypto::hash::Digest as _, types::HashAlgorithm::SHA256};
        let mut checksums = Vec::with_capacity(checksums_len);
        let mut hasher = SHA256.context()?;
        while let Some(message) = result_rx.blocking_recv() {
            match message {
                Message::Payload(payload) => {
                    hasher.update(&payload);
                    if pool_tx.blocking_send(payload).is_err() {
                        trace!("Failed to send chunk back to the pool (channel closed)");
                    }
                }
                Message::Finalize(path) => {
                    let path = crate::filesystem::to_posix_path(&path)
                        .with_context(|| format!("Path contains non-UTF-8 characters: {path:?}"))?
                        .to_string();
                    // Start a fresh digest for the next file.
                    let cs = std::mem::replace(&mut hasher, SHA256.context()?);
                    checksums.push((crate::utils::to_hex_string(&cs.into_digest()?), path));
                }
            }
        }
        Ok(checksums)
    });

    let mut archive = tar::Builder::new(writer);
    let append_result = append_files(&mut archive, file_path, progress, &mut pool_rx, &result_tx);
    // Dropping the sender lets the hashing thread drain the channel and exit. It is joined
    // even when appending failed so that the thread's own error is not lost.
    drop(result_tx);
    let checksum_result = checksums_handle
        .join()
        .map_err(|_| anyhow::anyhow!("checksum thread panicked"))?;
    let checksums = match (append_result, checksum_result) {
        (Ok(()), Ok(checksums)) => checksums,
        // A failed hashing thread closes its channel ends. The archiving side then presumably
        // fails with a less informative channel error, so report the thread's error instead.
        (_, Err(thread_err)) => return Err(thread_err),
        (Err(append_err), Ok(_)) => return Err(append_err),
    };
    add_checksum_file(&mut archive, &checksums)?;
    archive.finish()?;
    Ok(())
}
/// Adds the package metadata and its detached signature to `archive`.
///
/// The metadata is serialized to JSON and stored as `package::METADATA_FILE`. An
/// ASCII-armored detached signature over exactly those bytes, made with `signer_key`, is
/// stored as `package::METADATA_SIG_FILE`.
fn add_metadata_file<S: sequoia_openpgp::crypto::Signer + Send + Sync>(
    archive: &mut ZipArchive<impl io::Write>,
    metadata: &package::Metadata,
    signer_key: S,
) -> anyhow::Result<()> {
    let entry_options = Default::default();

    archive.add(package::METADATA_FILE, &entry_options)?;
    let json_bytes = serde_json::to_vec(metadata)?;
    archive.write_all(&json_bytes)?;

    archive.add(package::METADATA_SIG_FILE, &entry_options)?;
    let armored_signature = crate::openpgp::crypto::sign_detached(
        &json_bytes,
        signer_key,
        crate::openpgp::types::SerializationFormat::AsciiArmored,
    )?;
    archive.write_all(&armored_signature)?;
    Ok(())
}
/// Expands `files` into the list of regular files to package.
///
/// Every entry of `files` is walked recursively (a plain file yields just itself). Only
/// regular files are kept; they are canonicalized and de-duplicated, so a file reachable
/// through several of the given paths is packaged, and counted, only once. Files are
/// returned in sorted path order, which makes the archive layout deterministic.
///
/// Returns the `(filesystem path, archive path)` pairs and the total size in bytes of the
/// selected files.
///
/// # Errors
/// Fails if a path cannot be walked, stat-ed or canonicalized, if `get_archive_paths` fails,
/// or if an archive path is not valid UTF-8.
fn process_files<P: AsRef<Path>>(files: &[P]) -> anyhow::Result<(Vec<(PathBuf, PathBuf)>, u64)> {
    let mut total_input_size = 0u64;
    // A BTreeSet both de-duplicates and yields a stable, sorted iteration order.
    let mut unique_files = std::collections::BTreeSet::new();
    for entry in files.iter().flat_map(walkdir::WalkDir::new) {
        let entry = entry?;
        if entry.file_type().is_file() {
            let size = entry.metadata()?.len();
            // Count each file only on its first occurrence. Otherwise the total, which is used
            // for the disk-space check and the progress length, overstates the input.
            if unique_files.insert(entry.path().canonicalize()?) {
                total_input_size += size;
            }
        }
    }
    let files: Vec<PathBuf> = unique_files.into_iter().collect();
    let archive_paths = get_archive_paths(&files)?;
    if let Some(p) = archive_paths.iter().find(|p| p.to_str().is_none()) {
        anyhow::bail!("Path contains non-UTF-8 characters: {p:?}");
    }
    let file_archive_path = files.into_iter().zip(archive_paths).collect();
    Ok((file_archive_path, total_input_size))
}
/// Spawns the blocking task that builds the encrypted data package.
///
/// The package is a ZIP archive written through a `ChannelWriter`: empty buffers come from
/// `source` and filled chunks are sent to `sink`. It contains:
/// - `package::DATA_FILE_ENCRYPTED`: a tarball of `file_path`, compressed according to
///   `opts.compression_algorithm`, then signed and encrypted with `keys` (see `init_message`);
/// - the metadata file and its detached signature (see `add_metadata_file`).
///
/// A SHA-256 checksum of the encrypted data entry is computed on a helper thread and recorded
/// in the metadata. The task resolves to the size returned by `ZipArchive::finish` and the
/// metadata.
fn spawn_encryption_task<T: Progress + Send + 'static, F>(
    mut opts: EncryptOpts<T, F>,
    file_path: Vec<(PathBuf, PathBuf)>,
    total_input_size: u64,
    keys: EncryptKeys,
    source: crate::io::Source,
    sink: tokio::sync::mpsc::Sender<BytesMut>,
) -> tokio::task::JoinHandle<anyhow::Result<(u64, package::Metadata)>> {
    struct Message(BytesMut);
    impl crate::io::Message for Message {
        fn from_bytes(bytes: BytesMut) -> Self {
            Self(bytes)
        }
    }
    tokio::task::spawn_blocking(move || -> anyhow::Result<_> {
        let writer = crate::io::ChannelWriter::new(source, sink)?;
        let mut zip = ZipArchive::new(writer);
        zip.add(package::DATA_FILE_ENCRYPTED, &Default::default())?;
        // Encrypted output is teed to a helper thread that hashes it and returns the
        // buffers to the pool.
        let (result_tx, mut result_rx) = tokio::sync::mpsc::channel::<Message>(4);
        let (pool_tx, mut pool_rx) = tokio::sync::mpsc::channel(4);
        for _ in 0..4 {
            pool_tx.blocking_send(BytesMut::with_capacity(1 << 19))?;
        }
        let checksums_handle = std::thread::spawn(move || -> anyhow::Result<_> {
            use sequoia_openpgp::{crypto::hash::Digest as _, types::HashAlgorithm::SHA256};
            let mut hasher = SHA256.context()?;
            while let Some(message) = result_rx.blocking_recv() {
                hasher.update(&message.0);
                if pool_tx.blocking_send(message.0).is_err() {
                    trace!("Failed to send chunk back to the pool (channel closed)");
                }
            }
            Ok(crate::utils::to_hex_string(&hasher.into_digest()?))
        });
        let mut checksum_writer = crate::io::Tee::new(&mut zip, &mut pool_rx, &result_tx)?;
        let mut encrypted_message = init_message(&mut checksum_writer, &keys)?;
        crate::io::write_parallel(&mut encrypted_message, |w| {
            let mut progress = opts.progress.as_mut();
            if let Some(p) = &mut progress {
                p.set_length(total_input_size);
            }
            match opts.compression_algorithm {
                package::CompressionAlgorithm::Stored => create_tarball(w, &file_path, progress),
                package::CompressionAlgorithm::Gzip(level) => {
                    let mut enc = GzEncoder::new(
                        w,
                        Compression::new(level.unwrap_or_else(|| Compression::default().level())),
                    );
                    create_tarball(&mut enc, &file_path, progress)?;
                    // Finish explicitly. Dropping a `GzEncoder` also writes the gzip trailer,
                    // but silently discards any error that occurs while doing so.
                    enc.finish()?;
                    Ok(())
                }
                package::CompressionAlgorithm::Zstandard(level) => {
                    let mut enc = zstd::stream::write::Encoder::new(
                        w,
                        level.unwrap_or(zstd::DEFAULT_COMPRESSION_LEVEL),
                    )?;
                    enc.multithread(2)?;
                    create_tarball(&mut enc, &file_path, progress)?;
                    enc.finish()?;
                    Ok(())
                }
            }
        })?;
        encrypted_message.finalize()?;
        checksum_writer.flush_channel()?;
        // Closing the sender ends the hashing loop so the digest can be collected.
        drop(result_tx);
        let checksum = checksums_handle
            .join()
            .map_err(|_| anyhow::anyhow!("checksum thread in encrypt panicked"))??;
        let metadata = package::Metadata {
            sender: keys.signer.1.to_hex(),
            recipients: keys.recipients.iter().map(|(_, fp)| fp.to_hex()).collect(),
            checksum,
            timestamp: opts.timestamp,
            version: package::default_version(),
            checksum_algorithm: Default::default(),
            compression_algorithm: opts.compression_algorithm,
            transfer_id: opts.transfer_id,
            purpose: opts.purpose,
            extra: opts.extra_metadata,
        };
        add_metadata_file(&mut zip, &metadata, keys.signer.0.clone())?;
        let size = zip.finish()?;
        if let Some(p) = &mut opts.progress {
            p.finish();
        }
        Ok((size, metadata))
    })
}
/// Runs the encryption task and streams its output into the async writer `output`.
///
/// Three 4 MiB buffers circulate between this function and the task. Filled chunks arrive on
/// `result_rx`, are written to `output`, and are handed back through `buf_pool_tx` for reuse.
/// After the task succeeds, `output` is flushed. Async writers such as `tokio::io::Stdout`
/// may still hold, or be completing, the last write, and an error from that write only
/// surfaces on flush.
async fn encrypt_to_writer<T: Progress + Send + 'static, F>(
    output: impl tokio::io::AsyncWrite,
    opts: EncryptOpts<T, F>,
    file_path: Vec<(PathBuf, PathBuf)>,
    total_input_size: u64,
    keys: EncryptKeys,
) -> anyhow::Result<(u64, package::Metadata)> {
    let (result_tx, mut result_rx) = tokio::sync::mpsc::channel(3);
    let (buf_pool_tx, buf_pool_rx) = tokio::sync::mpsc::channel(3);
    for _ in 0..3 {
        buf_pool_tx.send(BytesMut::with_capacity(1 << 22)).await?;
    }
    let encrypt_task_handle = spawn_encryption_task(
        opts,
        file_path,
        total_input_size,
        keys,
        crate::io::Source::Channel(buf_pool_rx),
        result_tx,
    );
    tokio::pin!(output);
    while let Some(chunk) = result_rx.recv().await {
        output.write_all(&chunk).await?;
        if buf_pool_tx.send(chunk).await.is_err() {
            trace!("Failed to send chunk back to the pool (channel closed)");
        }
    }
    let encrypted = encrypt_task_handle.await??;
    output.flush().await?;
    Ok(encrypted)
}
/// Runs the encryption task and streams its output into the blocking writer `output`.
///
/// This is the `std::io::Write` counterpart of `encrypt_to_writer`. Chunks produced by the
/// task are written to `output`, and their buffers are handed back to the task for reuse.
/// This function does not flush `output`.
///
/// NOTE(review): `output.write_all` is a blocking call made directly from this async
/// function, so it occupies an executor thread for the duration of each write.
async fn encrypt_to_blocking_writer<T: Progress + Send + 'static, F>(
    output: &mut impl std::io::Write,
    opts: EncryptOpts<T, F>,
    file_path: Vec<(PathBuf, PathBuf)>,
    total_input_size: u64,
    keys: EncryptKeys,
) -> anyhow::Result<(u64, package::Metadata)> {
    // Number of 4 MiB buffers circulating between this function and the encryption task.
    const POOL_SIZE: usize = 3;
    let (chunk_tx, mut chunk_rx) = tokio::sync::mpsc::channel(POOL_SIZE);
    let (recycle_tx, recycle_rx) = tokio::sync::mpsc::channel(POOL_SIZE);
    for _ in 0..POOL_SIZE {
        recycle_tx.send(BytesMut::with_capacity(1 << 22)).await?;
    }
    let encryption = spawn_encryption_task(
        opts,
        file_path,
        total_input_size,
        keys,
        crate::io::Source::Channel(recycle_rx),
        chunk_tx,
    );
    while let Some(chunk) = chunk_rx.recv().await {
        output.write_all(&chunk)?;
        let recycled = recycle_tx.send(chunk).await;
        if recycled.is_err() {
            trace!("Failed to send chunk back to the pool (channel closed)");
        }
    }
    encryption.await?
}
async fn encrypt_local<T: Progress + Send + 'static, F>(
opts: EncryptOpts<T, F>,
dest: &crate::destination::Local,
keys: EncryptKeys,
) -> anyhow::Result<Status> {
trace!("Verify files to encrypt");
let (file_path, source_size) = process_files(&opts.files)?;
let output_path = dest
.path
.join(dest.package_name(&opts.timestamp, opts.prefix.as_deref()));
let destination = output_path.to_string_lossy().into_owned();
let parent = output_path
.parent()
.context("Unable to find output directory")?;
trace!("Verify available disk space");
check_space(source_size, parent, opts.mode)?;
trace!("Verify whether destination is writeable");
check_writeable(parent, opts.mode)?;
Ok(if let Mode::Check = opts.mode {
Status::Checked {
destination,
source_size,
}
} else {
let output = tokio::fs::File::create(&output_path).await?;
let mut buffered_output = tokio::io::BufWriter::with_capacity(1 << 22, output);
let (package_size, metadata) =
encrypt_to_writer(&mut buffered_output, opts, file_path, source_size, keys)
.await
.with_context(|| {
let err_msg = "encryption failed";
if let Err(msg) = std::fs::remove_file(&output_path) {
anyhow::anyhow!(
"{}. Additionally, failed to clean partial file '{}', reason: {}",
err_msg,
&destination,
msg
)
} else {
anyhow::anyhow!(err_msg)
}
})?;
buffered_output.flush().await?;
Status::Completed {
destination,
source_size,
destination_size: package_size,
metadata,
}
})
}
/// Encrypts `opts.files` and writes the resulting package to standard output.
///
/// With `Mode::Check`, only the input files are verified and `Status::Checked` is returned.
async fn encrypt_stdout<T: Progress + Send + 'static, F>(
    opts: EncryptOpts<T, F>,
    keys: EncryptKeys,
) -> anyhow::Result<Status> {
    trace!("Verify files to encrypt");
    let (file_path, source_size) = process_files(&opts.files)?;
    let destination = String::from("stdout");
    if matches!(opts.mode, Mode::Check) {
        return Ok(Status::Checked {
            destination,
            source_size,
        });
    }
    let (destination_size, metadata) =
        encrypt_to_writer(tokio::io::stdout(), opts, file_path, source_size, keys).await?;
    Ok(Status::Completed {
        destination,
        source_size,
        destination_size,
        metadata,
    })
}
/// Encrypts `opts.files` and uploads the package to an S3 bucket.
///
/// The object name is generated from `opts.timestamp` and `opts.prefix`. The reported
/// destination is `<endpoint>/<bucket>/<object name>`. The encryption task emits chunks of
/// `compute_chunk_size(source_size)` bytes, which are streamed to `put_object`. With
/// `Mode::Check`, only the input files are verified.
///
/// NOTE(review): if encryption fails midway, its chunk channel closes early. Whether
/// `put_object` then completes a truncated object depends on its implementation — confirm.
/// The encryption error is only surfaced after `put_object` returns.
async fn encrypt_s3<T: Progress + Send + 'static, F>(
    opts: EncryptOpts<T, F>,
    dest: &crate::destination::S3,
    keys: EncryptKeys,
) -> anyhow::Result<Status> {
    let object_name = package::generate_package_name(&opts.timestamp, opts.prefix.as_deref());
    trace!("Verify files to encrypt");
    let (file_path, source_size) = process_files(&opts.files)?;
    let destination = format!(
        "{}/{}/{}",
        dest.client().endpoint.as_str(),
        dest.bucket(),
        object_name
    );
    if matches!(opts.mode, Mode::Check) {
        return Ok(Status::Checked {
            destination,
            source_size,
        });
    }
    let (chunk_tx, chunk_rx) = tokio::sync::mpsc::channel(3);
    let chunk_size = crate::remote::s3::compute_chunk_size(source_size);
    let encryption = spawn_encryption_task(
        opts,
        file_path,
        source_size,
        keys,
        crate::io::Source::New(chunk_size),
        chunk_tx,
    );
    dest.client()
        .put_object(dest.bucket(), &object_name, chunk_rx)
        .await?;
    let (destination_size, metadata) = encryption.await??;
    Ok(Status::Completed {
        destination,
        source_size,
        destination_size,
        metadata,
    })
}
async fn encrypt_sftp<T: Progress + Send + 'static, F>(
opts: EncryptOpts<T, F>,
dest: &crate::destination::Sftp,
keys: EncryptKeys,
) -> anyhow::Result<Status> {
trace!("Verify files to encrypt");
let (file_path, source_size) = process_files(&opts.files)?;
let client = dest.client().connect()?;
let upload_dir = crate::remote::sftp::UploadDir::new(dest.base_path(), &client);
let dpkg_path = crate::remote::sftp::DpkgPath::new(
&upload_dir.path,
package::generate_package_name(&opts.timestamp, opts.prefix.as_deref()),
&client,
);
let destination = client.get_url(&dpkg_path.path);
if let Mode::Check = opts.mode {
return Ok(Status::Checked {
destination,
source_size,
});
}
upload_dir.create(None)?;
let mut buffered_output =
std::io::BufWriter::with_capacity(1 << 22, client.inner.create(&dpkg_path.tmp)?);
let (package_size, metadata) =
encrypt_to_blocking_writer(&mut buffered_output, opts, file_path, source_size, keys)
.await
.with_context(|| {
let err_msg = "encryption failed";
if let Err(msg) = upload_dir.delete() {
anyhow::anyhow!(
"{}. Additionally, failed to clean partially uploaded data \
at '{}', reason: {}",
err_msg,
destination,
msg
)
} else {
anyhow::anyhow!(err_msg)
}
})?;
buffered_output.flush()?;
dpkg_path.finalize()?;
upload_dir.finalize()?;
Ok(Status::Completed {
source_size,
destination_size: package_size,
destination,
metadata,
})
}