1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167
|
# Copyright (c) 2015 SUSE Linux GmbH. All rights reserved.
#
# This file is part of kiwi.
#
# kiwi is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# kiwi is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with kiwi. If not, see <http://www.gnu.org/licenses/>
#
import os
from collections import namedtuple
import hashlib
import encodings.ascii as encoding
# project
from kiwi.api_helper import decommissioned
from kiwi.utils.compress import Compress
from kiwi.utils.primes import factors
from kiwi.exceptions import (
KiwiFileNotFound
)
class Checksum:
"""
**Manage checksum creation for files**
:param str source_filename: source file name to build checksum for
:param str checksum_filename: target file with checksum information
"""
def __init__(self, source_filename):
if not os.path.exists(source_filename):
raise KiwiFileNotFound(
f'checksum source file {source_filename} not found'
)
self.source_filename = source_filename
self.checksum_filename = None
self.ascii = encoding.getregentry().name
def matches(self, checksum, filename):
"""
Compare given checksum with reference checksum stored
in the provided filename. If the checksum matches the
method returns True, or False in case it does not match
:param str checksum: checksum string to compare
:param str filename: filename containing checksum
:return: True or False
:rtype: bool
"""
if not os.path.exists(filename):
return False
with open(filename, encoding=self.ascii) as checksum_file:
checksum_from_file = checksum_file.read()
# checksum is expected to be stored in the first field
# separated by space, other information might contain
# the filename or blocklist data which is not of interest
# for the plain checksum match
if checksum_from_file.split(' ')[0] == checksum:
return True
return False
@decommissioned
def md5(self, filename=None):
pass # pragma: no cover
def sha256(self, filename=None):
"""
Create sha256 checksum
:param str filename: filename for checksum
"""
sha256_checksum = self._calculate_hash_hexdigest(
hashlib.sha256(), self.source_filename
)
if filename:
self._create_checksum_file(
sha256_checksum, filename
)
return sha256_checksum
def _create_checksum_file(self, checksum, filename):
"""
Creates the text file that contains the checksum
:param str checksum: checksum to include into the file
:param str filename: filename of the output file
"""
compressed_blocks = None
compress = Compress(self.source_filename)
if compress.get_format():
compressed_blocks = self._block_list(
os.path.getsize(self.source_filename)
)
compress.uncompress(temporary=True)
blocks = self._block_list(
os.path.getsize(compress.uncompressed_filename)
)
checksum = self._calculate_hash_hexdigest(
hashlib.sha256(), compress.uncompressed_filename
)
else:
blocks = self._block_list(
os.path.getsize(self.source_filename)
)
with open(filename, encoding=self.ascii, mode='w') as checksum_file:
if compressed_blocks:
checksum_file.write(
'%s %s %s %s %s\n' % (
checksum, blocks.blocks, blocks.blocksize,
compressed_blocks.blocks, compressed_blocks.blocksize
)
)
else:
checksum_file.write(
f'{checksum} {blocks.blocks} {blocks.blocksize}\n'
)
def _calculate_hash_hexdigest(self, digest, filename, digest_blocks=128):
"""
Calculates the hash hexadecimal digest for a given file.
:param func digest: Digest function for hash calculation
:param str filename: File to compute
:param int digest_blocks: Number of blocks processed at a time
"""
chunk_size = digest_blocks * digest.block_size
with open(filename, 'rb') as source:
for chunk in iter(lambda: source.read(chunk_size), b''):
digest.update(chunk)
return digest.hexdigest()
def _block_list(self, file_size):
"""
Calculates the number of blocks and the block size for a given file
size in bytes.
:param int file_size: files size in bytes.
:return: int:blocksize, int:blocks
:rtype: tuple
"""
blocksize = 1
for factor in factors(file_size, 8192):
if blocksize * factor > 8192:
break
blocksize *= factor
blocks = int(file_size / blocksize)
block_list = namedtuple(
'block_list', ['blocksize', 'blocks']
)
return block_list(
blocksize=blocksize,
blocks=blocks
)
|