pub mod full;
pub mod simple;
use crate::meta::attributes::*;
use crate::compression::{Compression, ByteVec};
use crate::math::*;
use std::io::{Read, Seek, Write};
use crate::error::{Result, Error, PassiveResult, usize_to_i32};
use crate::meta::{MetaData, Header, TileIndices, Blocks};
use crate::chunks::{Chunk, Block, TileBlock, ScanLineBlock, TileCoordinates};
use crate::io::{PeekRead, Tracking};
use rayon::iter::{ParallelIterator, ParallelBridge};
use crate::io::Data;
use smallvec::SmallVec;
use std::ops::Range;
use std::convert::TryFrom;
use std::collections::BTreeMap;
/// Identifies one rectangular block of pixels within a layer of the image.
#[derive(Clone, Copy, Eq, Hash, PartialEq, Debug)]
pub struct BlockIndex {
/// Index into `MetaData::headers` selecting the layer this block belongs to.
pub layer: usize,
/// Pixel coordinate where the block starts (x in `.0`, y in `.1`).
/// NOTE(review): appears to be relative to the data window start, since the
/// scan line y coordinate is computed by adding `data_window.start.1` — confirm.
pub pixel_position: Vec2<usize>,
/// Extent of the block in pixels (width in `.0`, height in `.1`).
pub pixel_size: Vec2<usize>,
/// Resolution level of the block, mirroring `TileCoordinates::level_index`.
pub level: Vec2<usize>,
}
/// The uncompressed bytes of one block together with the location of that block.
/// Created by `decompress_chunk` and consumed by `compress_to_chunk`.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct UncompressedBlock {
/// Which layer, level and pixel rectangle the bytes belong to.
pub index: BlockIndex,
/// The uncompressed bytes of the block.
/// `BlockIndex::line_indices` yields the byte range of every line inside this buffer.
pub data: ByteVec,
}
/// A borrowed slice of sample bytes, paired with the place in the image it belongs to.
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub struct Line<'s> {
/// Layer, channel, level, position and width of this line.
pub location: LineIndex,
/// The raw bytes of this line, borrowed from the data of an uncompressed block.
pub value: &'s [u8],
}
/// Describes where a single line of samples is located within the image.
#[derive(Clone, Copy, Eq, PartialEq, Debug, Hash)]
pub struct LineIndex {
/// Index of the layer, copied from the `BlockIndex` the line was derived from.
pub layer: usize,
/// Index of the channel, counted in the order of `header.channels.list`.
pub channel: usize,
/// Resolution level, copied from the `BlockIndex` the line was derived from.
pub level: Vec2<usize>,
/// Pixel coordinate of the first sample: x is the block's start x, y is the row of this line.
pub position: Vec2<usize>,
/// Number of samples in this line, equal to the pixel width of the containing block.
pub width: usize,
}
impl<'s> Line<'s> {
    /// Decodes the samples of this line from its bytes into `slice`.
    ///
    /// `slice` is expected to contain exactly `self.location.width` elements.
    /// This is only checked in debug builds.
    pub fn read_samples<T: crate::io::Data>(&self, slice: &mut [T]) -> PassiveResult {
        debug_assert_eq!(slice.len(), self.location.width, "line value width bug");

        // Copy the slice reference so that reading advances a local cursor only.
        let mut remaining_bytes = self.value;
        T::read_slice(&mut remaining_bytes, slice)
    }

    /// Returns an iterator that decodes one sample per step,
    /// yielding `self.location.width` items in total.
    /// Each item is the result of reading a single `T` from the line bytes.
    pub fn sample_iter<T: crate::io::Data>(&self) -> impl Iterator<Item = Result<T>> + '_ {
        // The cursor is moved into the closure and advances with every sample read.
        let mut remaining_bytes = self.value;
        let sample_count = self.location.width;

        (0..sample_count).map(move |_| T::read(&mut remaining_bytes))
    }
}
impl LineIndex {
    /// Encodes all samples of `slice` into the byte sink `write`.
    ///
    /// Any error reported by `T::write_slice` is propagated to the caller.
    pub fn write_samples<T>(slice: &[T], write: &mut impl Write) -> PassiveResult
        where T: crate::io::Data
    {
        T::write_slice(write, slice)?;
        Ok(())
    }
}
/// Reads the meta data and every chunk from `read`, and feeds each line
/// of each decompressed block into a value created by `new`.
///
/// * `read` - the byte source, read sequentially.
/// * `parallel` - allows decompressing chunks on multiple threads.
/// * `new` - creates the resulting value from the headers of the file.
/// * `insert` - called once for every line with the value created by `new`.
///
/// Returns the value created by `new`, or the first error encountered.
pub fn read_all_lines_from_buffered<T>(
    read: impl Read + Send,
    parallel: bool,
    new: impl Fn(&[Header]) -> Result<T>,
    mut insert: impl FnMut(&mut T, Line<'_>) -> PassiveResult
) -> Result<T>
{
    let (meta_data, mut next_chunk) = self::read_all_compressed_chunks_from_buffered(read)?;

    // Chunks are pulled lazily: each step asks the reader for one more chunk.
    let headers_source = &meta_data;
    let chunks = std::iter::from_fn(move || next_chunk(headers_source));

    let mut image = new(meta_data.headers.as_slice())?;

    for_lines_in_chunks(chunks, &meta_data, parallel, |line| insert(&mut image, line))?;
    Ok(image)
}
/// Reads the meta data and only those chunks from `read` that are accepted by `filter`,
/// and feeds each line of each decompressed block into a value created by `new`.
///
/// * `read` - the byte source, which must support seeking to skip rejected chunks.
/// * `parallel` - allows decompressing chunks on multiple threads.
/// * `filter` - decides for each block of each header whether it should be read.
/// * `new` - creates the resulting value from the headers of the file.
/// * `insert` - called once for every line with the value created by `new`.
///
/// Returns the value created by `new`, or the first error encountered.
pub fn read_filtered_lines_from_buffered<T>(
    read: impl Read + Seek + Send,
    parallel: bool,
    filter: impl Fn(&Header, &TileIndices) -> bool,
    new: impl Fn(&[Header]) -> Result<T>,
    mut insert: impl FnMut(&mut T, Line<'_>) -> PassiveResult
) -> Result<T>
{
    let (meta_data, mut next_chunk) = self::read_filtered_chunks_from_buffered(read, filter)?;

    // Chunks are pulled lazily: each step asks the reader for the next accepted chunk.
    let chunks = std::iter::from_fn(|| next_chunk(&meta_data));

    let mut image = new(meta_data.headers.as_slice())?;

    for_lines_in_chunks(chunks, &meta_data, parallel, |line| insert(&mut image, line))?;
    Ok(image)
}
/// Decompresses every chunk and calls `for_each` for every line of every resulting block.
///
/// Decompression runs in parallel only if `parallel` is set
/// and at least one header uses a compression other than `Uncompressed`.
/// In that case, lines are delivered after the parallel decompression call has returned.
/// Otherwise, chunks are decompressed and delivered one after another.
///
/// Returns the first error produced by reading, decompressing, or by `for_each`.
fn for_lines_in_chunks(
    chunks: impl Send + Iterator<Item = Result<Chunk>>,
    meta_data: &MetaData,
    parallel: bool,
    mut for_each: impl FnMut(Line<'_>) -> PassiveResult
) -> PassiveResult {
    let any_compressed = meta_data.headers.iter()
        .any(|header| header.compression != Compression::Uncompressed);

    // Hands each line of a single decompressed block to the callback.
    let mut visit_block = |block: UncompressedBlock| -> PassiveResult {
        let header = meta_data.headers.get(block.index.layer)
            .ok_or(Error::invalid("chunk index"))?;

        for (byte_range, location) in block.index.line_indices(header) {
            for_each(Line { location, value: &block.data[byte_range] })?;
        }

        Ok(())
    };

    if parallel && any_compressed {
        let (sender, receiver) = std::sync::mpsc::channel();

        // Decompress on the thread pool and collect the blocks in the channel.
        chunks.par_bridge()
            .map(|chunk| UncompressedBlock::decompress_chunk(chunk?, meta_data))
            .try_for_each_with(sender, |sender, result| {
                result.map(|block: UncompressedBlock| sender.send(block).expect("threading error"))
            })?;

        // The callback is only ever invoked from the calling thread.
        for block in receiver {
            visit_block(block)?;
        }
    }
    else {
        for chunk in chunks {
            let block = UncompressedBlock::decompress_chunk(chunk?, meta_data)?;
            visit_block(block)?;
        }
    }

    Ok(())
}
/// Reads the meta data from `read`, skips the offset tables,
/// and returns the meta data along with a function that reads the next chunk.
///
/// The returned function yields `Some` result for each chunk counted while
/// skipping the offset tables, and `None` once that many chunks were requested.
///
/// # Panics
/// Panics if the chunk count does not fit into a `usize`.
pub fn read_all_compressed_chunks_from_buffered<'m>(
    read: impl Read + Send,
) -> Result<(MetaData, impl FnMut(&'m MetaData) -> Option<Result<Chunk>>)>
{
    let mut read = PeekRead::new(read);
    let meta_data = MetaData::read_from_buffered_peekable(&mut read)?;

    let total_chunk_count = MetaData::skip_offset_tables(&mut read, &meta_data.headers)?;
    let mut chunks_left = usize::try_from(total_chunk_count)
        .expect("too large chunk count for this machine");

    Ok((meta_data, move |meta_data| {
        if chunks_left == 0 {
            return None;
        }

        chunks_left -= 1;
        Some(Chunk::read(&mut read, meta_data))
    }))
}
/// Reads the meta data and offset tables from `read`, and returns the meta data
/// along with a function that reads the next chunk accepted by `filter`.
///
/// `filter` is called once per block of each header, with blocks enumerated by
/// `blocks_increasing_y_order`. The offsets of all accepted blocks are visited
/// in ascending order, seeking to each one before reading the chunk.
/// The returned function yields `None` after all accepted chunks were read.
///
/// # Panics
/// The returned function panics if a chunk offset does not fit into a `usize`.
pub fn read_filtered_chunks_from_buffered<'m>(
    read: impl Read + Seek + Send,
    filter: impl Fn(&Header, &TileIndices) -> bool,
) -> Result<(MetaData, impl FnMut(&'m MetaData) -> Option<Result<Chunk>>)>
{
    let mut read = PeekRead::new(Tracking::new(read));
    let meta_data = MetaData::read_from_buffered_peekable(&mut read)?;
    let offset_tables = MetaData::read_offset_tables(&mut read, &meta_data.headers)?;

    // Collect the file offsets of all blocks that pass the filter.
    let mut offsets = Vec::with_capacity(meta_data.headers.len() * 32);
    for (header_index, header) in meta_data.headers.iter().enumerate() {
        let accepted_offsets = header.blocks_increasing_y_order().enumerate()
            .filter(|(_, block)| filter(header, block))
            .map(|(block_index, _)| offset_tables[header_index][block_index]);

        offsets.extend(accepted_offsets);
    }

    // Ascending offsets, so that the file is traversed front to back.
    offsets.sort_unstable();
    let mut offsets = offsets.into_iter();

    Ok((meta_data, move |meta_data| {
        offsets.next().map(|offset| {
            let position = usize::try_from(offset)
                .expect("too large chunk position for this machine");

            read.skip_to(position)?;
            Chunk::read(&mut read, meta_data)
        })
    }))
}
/// Lazily produces one uncompressed block for each block of each header,
/// paired with the chunk index reported by `enumerate_ordered_blocks`.
///
/// The bytes of every block are gathered by calling `get_line` once per line,
/// which must append the bytes of the requested line to the given vector.
/// In debug builds, the number of appended bytes is checked against the expected line size.
///
/// # Panics
/// Panics if the block coordinates of a header cannot be converted to pixel indices.
pub fn uncompressed_image_blocks_ordered<'l>(
    meta_data: &'l MetaData,
    get_line: &'l (impl Fn(LineIndex, &mut Vec<u8>) + Send + Sync + 'l)
) -> impl Iterator<Item = (usize, UncompressedBlock)> + 'l + Send {
    meta_data.headers.iter().enumerate().flat_map(move |(layer_index, header)| {
        header.enumerate_ordered_blocks().map(move |(chunk_index, tile)| {
            let pixel_bounds = header.get_absolute_block_indices(tile.location)
                .expect("tile coordinate bug");

            let index = BlockIndex {
                layer: layer_index,
                level: tile.location.level_index,
                pixel_position: pixel_bounds.start
                    .to_usize("data indices start")
                    .expect("data index bug"),
                pixel_size: pixel_bounds.size,
            };

            // Every line appends its bytes right behind the previous line.
            let mut data = Vec::with_capacity(header.max_block_byte_size());

            for (byte_range, line_index) in index.line_indices(header) {
                debug_assert_eq!(byte_range.start, data.len(), "line_indices byte range calculation bug");
                get_line(line_index, &mut data);
                debug_assert_eq!(byte_range.end, data.len(), "line_indices byte range calculation bug");
            }

            (chunk_index, UncompressedBlock { index, data })
        })
    })
}
/// Compresses every block of the image and passes each resulting chunk,
/// along with its chunk index, to `write_chunk`.
///
/// * `get_line` - appends the bytes of the requested line to the given vector.
/// * `parallel` - allows compressing on multiple threads. Only has an effect if
///   at least one header uses a compression other than `Uncompressed`.
/// * `write_chunk` - always called from the calling thread.
///
/// When compressing in parallel and any header has a line order other than
/// `Unspecified`, chunks are buffered and passed to `write_chunk` in the order
/// of `enumerate_ordered_blocks`, layer by layer.
///
/// Returns the first error produced by compressing or by `write_chunk`.
pub fn for_compressed_blocks_in_image(
    meta_data: &MetaData, get_line: impl Fn(LineIndex, &mut Vec<u8>) + Send + Sync,
    parallel: bool, mut write_chunk: impl FnMut(usize, Chunk) -> PassiveResult
) -> PassiveResult
{
    let blocks = uncompressed_image_blocks_ordered(meta_data, &get_line);

    let any_compressed = meta_data.headers.iter()
        .any(|header| header.compression != Compression::Uncompressed);

    // Sequential path: compress and write one block after another.
    if !(parallel && any_compressed) {
        for (chunk_index, block) in blocks {
            let chunk = block.compress_to_chunk(meta_data)?;
            write_chunk(chunk_index, chunk)?;
        }

        return Ok(());
    }

    // Parallel path: compress on the thread pool and collect the chunks in a channel.
    let (sender, receiver) = std::sync::mpsc::channel();

    blocks.par_bridge()
        .map(|(chunk_index, block)| {
            block.compress_to_chunk(meta_data).map(|chunk| (chunk_index, chunk))
        })
        .try_for_each_with(sender, |sender, result: Result<(usize, Chunk)>| {
            result.map(|indexed_chunk| sender.send(indexed_chunk).expect("threading error"))
        })?;

    let order_matters = meta_data.headers.iter()
        .any(|header| header.line_order != LineOrder::Unspecified);

    if !order_matters {
        for (chunk_index, chunk) in receiver {
            write_chunk(chunk_index, chunk)?;
        }

        return Ok(());
    }

    // The ids `(layer, chunk)` in the exact order in which chunks must be written.
    let mut expected_ids = meta_data.headers.iter().enumerate()
        .flat_map(|(layer, header)| {
            header.enumerate_ordered_blocks().map(move |(chunk, _)| (layer, chunk))
        });

    let mut awaited_id = expected_ids.next();
    let mut waiting_chunks = BTreeMap::new();

    for (chunk_index, chunk) in receiver {
        waiting_chunks.insert((chunk.layer_index, chunk_index), chunk);

        // Write out as many buffered chunks as are next in line.
        while let Some(id) = awaited_id {
            match waiting_chunks.remove(&id) {
                Some(ready_chunk) => {
                    write_chunk(id.1, ready_chunk)?;
                    awaited_id = expected_ids.next();
                },

                None => break,
            }
        }
    }

    assert!(expected_ids.next().is_none(), "expected more blocks bug");
    assert_eq!(waiting_chunks.len(), 0, "pending blocks left after processing bug");

    Ok(())
}
/// Writes a complete file to `write`: the meta data, the offset tables, and all chunks.
///
/// * `parallel` - allows compressing blocks on multiple threads.
/// * `pedantic` - forwarded to the validation performed while writing the meta data.
/// * `meta_data` - the headers to write. If `parallel` is false, every header with
///   an `Unspecified` line order is written with `Increasing` line order instead.
/// * `get_line` - appends the bytes of the requested line to the given vector.
///
/// Space for the offset tables is reserved first. The tables are filled in
/// while the chunks are written, and are written to the file at the very end.
#[must_use]
pub fn write_all_lines_to_buffered(
    write: impl Write + Seek,
    parallel: bool, pedantic: bool,
    mut meta_data: MetaData,
    get_line: impl Fn(LineIndex, &mut Vec<u8>) + Send + Sync
) -> PassiveResult
{
    if !parallel {
        meta_data.headers.iter_mut()
            .filter(|header| header.line_order == LineOrder::Unspecified)
            .for_each(|header| header.line_order = LineOrder::Increasing);
    }

    let mut write = Tracking::new(write);
    meta_data.write_validating_to_buffered(&mut write, pedantic)?;

    // Leave a gap of one `u64` per chunk, which is filled in after all chunks are written.
    let offset_table_start_byte = write.byte_position();
    let total_chunk_count: usize = meta_data.headers.iter()
        .map(|header| header.chunk_count)
        .sum();

    let offset_table_byte_size = total_chunk_count * std::mem::size_of::<u64>();
    write.seek_write_to(offset_table_start_byte + offset_table_byte_size)?;

    let mut offset_tables: Vec<Vec<u64>> = meta_data.headers.iter()
        .map(|header| vec![0; header.chunk_count])
        .collect();

    for_compressed_blocks_in_image(&meta_data, get_line, parallel, |chunk_index, chunk| {
        // Remember where this chunk starts before writing it.
        let chunk_start_byte = write.byte_position() as u64;
        offset_tables[chunk.layer_index][chunk_index] = chunk_start_byte;

        chunk.write(&mut write, meta_data.headers.as_slice())
    })?;

    // Go back and fill the gap with the collected offsets.
    write.seek_write_to(offset_table_start_byte)?;

    for offset_table in offset_tables {
        u64::write_slice(&mut write, offset_table.as_slice())?;
    }

    Ok(())
}
impl BlockIndex {
pub fn line_indices(&self, header: &Header) -> impl Iterator<Item=(Range<usize>, LineIndex)> {
struct LineIter {
layer: usize, level: Vec2<usize>, width: usize,
end_y: usize, x: usize, channel_sizes: SmallVec<[usize; 8]>,
byte: usize, channel: usize, y: usize,
};
impl Iterator for LineIter {
type Item = (Range<usize>, LineIndex);
fn next(&mut self) -> Option<Self::Item> {
if self.y < self.end_y {
let byte_len = self.channel_sizes[self.channel];
let return_value = (
(self.byte .. self.byte + byte_len),
LineIndex {
channel: self.channel,
layer: self.layer,
level: self.level,
position: Vec2(self.x, self.y),
width: self.width,
}
);
{ self.byte += byte_len;
self.channel += 1;
if self.channel == self.channel_sizes.len() {
self.channel = 0;
self.y += 1;
}
}
Some(return_value)
}
else {
None
}
}
}
let channel_line_sizes: SmallVec<[usize; 8]> = header.channels.list.iter()
.map(move |channel| self.pixel_size.0 * channel.pixel_type.bytes_per_sample()) .collect();
LineIter {
layer: self.layer,
level: self.level,
width: self.pixel_size.0,
x: self.pixel_position.0,
end_y: self.pixel_position.1 + self.pixel_size.1,
channel_sizes: channel_line_sizes,
byte: 0,
channel: 0,
y: self.pixel_position.1
}
}
}
impl UncompressedBlock {
pub fn decompress_chunk(chunk: Chunk, meta_data: &MetaData) -> Result<Self> {
let header: &Header = meta_data.headers.get(chunk.layer_index)
.ok_or(Error::invalid("chunk layer index"))?;
let tile_data_indices = header.get_block_data_indices(&chunk.block)?;
let absolute_indices = header.get_absolute_block_indices(tile_data_indices)?;
absolute_indices.validate(header.data_window.size)?;
match chunk.block {
Block::Tile(TileBlock { compressed_pixels, .. }) |
Block::ScanLine(ScanLineBlock { compressed_pixels, .. }) => Ok(UncompressedBlock {
data: header.compression.decompress_image_section(header, compressed_pixels, absolute_indices)?,
index: BlockIndex {
layer: chunk.layer_index,
pixel_position: absolute_indices.start.to_usize("data indices start")?,
level: tile_data_indices.level_index,
pixel_size: absolute_indices.size,
}
}),
_ => return Err(Error::unsupported("deep data not supported yet"))
}
}
pub fn compress_to_chunk(self, meta_data: &MetaData) -> Result<Chunk> {
let UncompressedBlock { data, index } = self;
let header: &Header = meta_data.headers.get(index.layer)
.expect("block layer index bug");
let expected_byte_size = header.channels.bytes_per_pixel * self.index.pixel_size.area(); if expected_byte_size != data.len() {
panic!("get_line byte size should be {} but was {}", expected_byte_size, data.len());
}
let compressed_data = header.compression.compress_image_section(data)?;
Ok(Chunk {
layer_index: index.layer,
block : match header.blocks {
Blocks::ScanLines => Block::ScanLine(ScanLineBlock {
compressed_pixels: compressed_data,
y_coordinate: usize_to_i32(index.pixel_position.1) + header.data_window.start.1,
}),
Blocks::Tiles(tiles) => Block::Tile(TileBlock {
compressed_pixels: compressed_data,
coordinates: TileCoordinates {
level_index: index.level,
tile_index: index.pixel_position / tiles.tile_size,
},
}),
}
})
}
}