[go: up one dir, main page]

File: base.py

package info (click to toggle)
kiwi 10.2.33-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 7,528 kB
  • sloc: python: 67,299; sh: 3,980; xml: 3,379; ansic: 391; makefile: 354
file content (428 lines) | stat: -rw-r--r-- 13,630 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
# Copyright (c) 2015 SUSE Linux GmbH.  All rights reserved.
#
# This file is part of kiwi.
#
# kiwi is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# kiwi is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with kiwi.  If not, see <http://www.gnu.org/licenses/>
#
import os
import uuid
import random
import logging
import copy
from functools import reduce
from typing import (
    Dict, List, Optional
)

# project
import kiwi.defaults as defaults

from kiwi.defaults import Defaults
from kiwi.utils.sync import DataSync
from kiwi.mount_manager import MountManager
from kiwi.command import Command
from kiwi.storage.device_provider import DeviceProvider
from kiwi.utils.veritysetup import VeritySetup

from kiwi.exceptions import (
    KiwiFileSystemSyncError
)

log = logging.getLogger('kiwi')


class FileSystemBase:
    """
    **Implements base class for filesystem interface**

    :param object device_provider:
        Instance of a class based on DeviceProvider
        required for filesystems which needs a block device for
        creation. In most cases the DeviceProvider is a LoopDevice
    :param string root_dir: root directory path name
    :param dict custom_args: custom filesystem arguments
    """
    def __init__(
        self, device_provider: DeviceProvider,
        root_dir: str = '', custom_args: Dict = {}
    ):
        # filesystems created with a block device stores the mountpoint
        # here. The file name of the file containing the filesystem is
        # stored in the device_provider if the filesystem is represented
        # as a file there
        self.filesystem_mount: Optional[MountManager] = None

        # the underlaying device provider. This is only required if the
        # filesystem required a block device to become created
        self.device_provider = device_provider

        self.root_dir = root_dir

        # filesystems created without a block device stores the result
        # filesystem file name here
        self.filename = ''

        self.custom_args: Dict = {}
        self.post_init(custom_args)
        self.veritysetup: Optional[VeritySetup] = None

    def __enter__(self):
        return self

    def post_init(self, custom_args: Dict):
        """
        Post initialization method

        Store dictionary of custom arguments if not empty. This
        overrides the default custom argument hash

        :param dict custom_args: custom arguments

            .. code:: python

                {
                    'create_options': ['option'],
                    'mount_options': ['option'],
                    'meta_data': {
                        'key': 'value'
                    }
                }
        """
        if custom_args:
            self.custom_args = copy.deepcopy(custom_args)

        if not self.custom_args.get('create_options'):
            self.custom_args['create_options'] = []

        if not self.custom_args.get('meta_data'):
            self.custom_args['meta_data'] = {}

        if not self.custom_args.get('mount_options'):
            self.custom_args['mount_options'] = []

        if not self.custom_args.get('fs_attributes'):
            self.custom_args['fs_attributes'] = []

    def set_uuid(self):
        """
        Create new random filesystem UUID

        Implement in specialized filesystem class for filesystems which
        supports the concept of an UUID and allows to change it
        """
        log.warning(
            'Instance {0} has no support for setting a new UUID label'.format(
                type(self).__name__
            )
        )

    def create_on_device(
        self, label: str = None, size: int = 0, unit: str = defaults.UNIT.kb,
        uuid: str = None
    ):
        """
        Create filesystem on block device

        Implement in specialized filesystem class for filesystems which
        requires a block device for creation, e.g ext4.

        :param str label: label name
        :param int size:
            size value, can also be counted from the end via -X
            The value is interpreted in units of: unit
        :param str unit:
            unit name. Default unit is set to: defaults.UNIT.kb
        :param str uuid: UUID name
        """
        raise NotImplementedError

    def create_on_file(
        self, filename: str, label: str = None, exclude: List[str] = None
    ):
        """
        Create filesystem from root data tree

        Implement in specialized filesystem class for filesystems which
        requires a data tree for creation, e.g squashfs.

        :param string filename: result file path name
        :param string label: label name
        :param list exclude: list of exclude dirs/files
        """
        raise NotImplementedError

    def get_mountpoint(self) -> Optional[str]:
        """
        Provides mount point directory

        Effective use of the directory is guaranteed only after sync_data

        :return: directory path name

        :rtype: string
        """
        if self.filesystem_mount:
            return self.filesystem_mount.mountpoint
        return None

    def sync_data(self, exclude: List[str] = []) -> MountManager:
        """
        Copy data tree into filesystem

        :param list exclude: list of exclude dirs/files
        :return: The mount created for syncing data. It should be used to
            un-mount the filesystem again.
        """
        if not self.root_dir:
            raise KiwiFileSystemSyncError(
                'no root directory specified'
            )
        if not os.path.exists(self.root_dir):
            raise KiwiFileSystemSyncError(
                f'given root directory {self.root_dir} does not exist'
            )
        self.filesystem_mount = MountManager(
            device=self.device_provider.get_device()
        )
        self.filesystem_mount.mount(
            self.custom_args['mount_options']
        )
        self._apply_attributes()
        data = DataSync(
            self.root_dir, self.filesystem_mount.mountpoint
        )
        data.sync_data(
            exclude=exclude, options=Defaults.get_sync_options()
        )
        return self.filesystem_mount

    def create_verity_layer(
        self, blocks: Optional[int] = None, filename: str = None
    ):
        """
        Create veritysetup on device

        :param int block:
            Number of blocks to use for veritysetup.
            If not specified the entire root device is used
        :param str filename:
            Target filename to use for VeritySetup.
            If not specified the filename or block special
            provided at object construction time is used
        """
        on_file_name = filename or self.filename
        self.veritysetup = VeritySetup(
            on_file_name or self.device_provider.get_device(),
            blocks
        )
        log.info(
            '--> Creating dm verity hash ({0} blocks)...'.format(
                blocks or 'all'
            )
        )
        log.debug(
            '--> dm verity metadata: {0}'.format(self.veritysetup.format())
        )

    def create_verification_metadata(self, device_node: str = '') -> None:
        """
        Write verification block at the end of the device

        :param str device_node:
            Target device node, if not specified the root device
            from this instance is used
        """
        if self.veritysetup:
            log.info('--> Creating verification metadata...')
            self.veritysetup.create_verity_verification_metadata()
            log.info('--> Signing verification metadata...')
            self.veritysetup.sign_verification_metadata()
            self.veritysetup.write_verification_metadata(
                device_node or self.device_provider.get_device()
            )

    def mount(self) -> None:
        """
        Mount the filesystem
        """
        if self.filesystem_mount:
            self.filesystem_mount.mount(
                self.custom_args['mount_options']
            )

    def umount(self) -> None:
        """
        Umounts the filesystem in case it is mounted, does nothing otherwise
        """
        if self.filesystem_mount:
            log.info('umount %s instance', type(self).__name__)
            self.filesystem_mount.umount()

    def umount_volumes(self) -> None:
        """
        Consistency layer with regards to VolumeManager classes

        Invokes umount
        """
        self.umount()

    def mount_volumes(self) -> None:
        """
        Consistency layer with regards to VolumeManager classes

        Invokes mount
        """
        self.mount()

    def get_volumes(self) -> Dict:
        """
        Consistency layer with regards to VolumeManager classes

        Raises
        """
        raise NotImplementedError

    def get_root_volume_name(self) -> None:
        """
        Consistency layer with regards to VolumeManager classes

        Raises
        """
        raise NotImplementedError

    def get_fstab(
        self, persistency_type: str = 'by-label', filesystem_name: str = ''
    ) -> List[str]:
        """
        Consistency layer with regards to VolumeManager classes

        Raises
        """
        raise NotImplementedError

    def set_property_readonly_root(self) -> None:
        """
        Consistency layer with regards to VolumeManager classes

        Raises
        """
        raise NotImplementedError

    def _generate_seed_uuid(self, label: str, random_bits: int = 128) -> str:
        """
        Create random UUID. If SOURCE_DATE_EPOCH is present use
        SOURCE_DATE_EPOCH + label name as seed
        """
        sde = os.environ.get('SOURCE_DATE_EPOCH')
        if sde:
            label_seed = reduce(lambda x, y: x + y, map(ord, label))
            epoch_seed = label_seed + int(sde)
            log.info(
                'Using UUID seed SOURCE_DATE_EPOCH:{0} + LABEL:{1}={2}'.format(
                    sde, label, label_seed
                )
            )
            rd = random.Random()
            rd.seed(epoch_seed)
            return format(
                uuid.UUID(int=rd.getrandbits(random_bits))
            )
        else:
            return format(uuid.uuid4())

    def _map_size(self, size: float, from_unit: str, to_unit: str) -> float:
        """
        Return byte size value for given size and unit

        :param float size:
            requested filesystem size. The value is interpreted
            by the given from_unit.

        :param str from_unit: source unit
        :param str to_unit: target unit

        :return: size value in unit: to_unit

        :rtype: float
        """
        unit_map = {
            defaults.UNIT.byte: 1,
            defaults.UNIT.kb: 1024,
            defaults.UNIT.mb: 1048576,
            defaults.UNIT.gb: 1073741824
        }
        byte_size = size * unit_map[from_unit]
        return byte_size / unit_map[to_unit]

    def _fs_size(
        self, size: float, blocksize: int = 1, unit: str = defaults.UNIT.kb
    ) -> str:
        """
        Calculate filesystem size parameter in number of blocksize
        blocks. If the given size is <= 0 the calculation is done from
        the actual size of the block device reduced by the given size

        :param float size:
            requested filesystem size. The value is interpreted
            by the given unit.

        :param int blocksize:
            blocksize as requested from the filesystem creation tool
            for specifying the filesystem size. The value is interpreted
            by the given unit. By default set to: 1

        :param str unit:
            Unit to use for calculations and return value
            Default unit is set to: defaults.UNIT.kb

        :return: an int block count of the specified unit as str

        :rtype: str
        """
        if size > 0:
            result_size = size / blocksize
        else:
            device_name = self.device_provider.get_device()
            device_byte_size = self.device_provider.get_byte_size(device_name)
            requested_byte_size = self._map_size(size, unit, defaults.UNIT.byte)
            result_size = self._map_size(
                (device_byte_size + requested_byte_size) / blocksize,
                from_unit=defaults.UNIT.byte, to_unit=unit
            )
        return format(int(result_size))

    def _apply_attributes(self):
        """
        Apply filesystem attributes
        """
        attribute_map = {
            'synchronous-updates': '+S',
            'no-copy-on-write': '+C'
        }
        for attribute in self.custom_args['fs_attributes']:
            if attribute_map.get(attribute):
                log.info(
                    '--> setting {0} for {1}'.format(
                        attribute, self.filesystem_mount.mountpoint
                    )
                )
                Command.run(
                    [
                        'chattr', attribute_map.get(attribute),
                        self.filesystem_mount.mountpoint
                    ]
                )

    def __exit__(self, exc_type, exc_value, traceback):
        self.umount()