Source code for snapm.manager._boot

# Copyright Red Hat
#
# snapm/manager/boot.py - Snapshot Manager boot support
#
# This file is part of the snapm project.
#
# SPDX-License-Identifier: Apache-2.0
"""
Boot integration for snapshot manager
"""
from os import uname
from os.path import exists as path_exists
import logging

import boom
import boom.cache
import boom.command
from boom.config import load_boom_config, BoomConfigError
from boom.bootloader import (
    OPTIONAL_KEYS,
    optional_key_default,
    key_to_bls_name,
)
from boom.osprofile import match_os_profile_by_version

from snapm import (
    SnapmNotFoundError,
    SnapmCalloutError,
    ETC_FSTAB,
)

_log = logging.getLogger(__name__)

_log_debug = _log.debug
_log_info = _log.info
_log_warn = _log.warning
_log_error = _log.error

#: Path to the system machine-id file
_MACHINE_ID = "/etc/machine-id"
#: Path to the legacy system machine-id file
_DBUS_MACHINE_ID = "/var/lib/dbus/machine-id"
#: Path to the system os-release file
_OS_RELEASE = "/etc/os-release"

#: Snapshot set kernel command line argument
SNAPSET_ARG = "snapm.snapset"

#: Snapshot set revert kernel command line argument
REVERT_ARG = "snapm.revert"

#: /dev path prefix
_DEV_PREFIX = "/dev/"


[docs]def _get_uts_release(): """ Return the UTS release (kernel version) of the running system. :returns: A string representation of the UTS release value. """ return uname()[2]
[docs]def _get_machine_id(): """ Return the current host's machine-id. Get the machine-id value for the running system by reading from ``/etc/machine-id`` and return it as a string. :returns: The ``machine_id`` as a string :rtype: str """ if path_exists(_MACHINE_ID): path = _MACHINE_ID elif path_exists(_DBUS_MACHINE_ID): path = _DBUS_MACHINE_ID else: return None with open(path, "r", encoding="utf8") as file: try: machine_id = file.read().strip() except OSError as err: _log_error("Could not read machine-id from '%s': %s", path, err) machine_id = None return machine_id
[docs]def _find_snapset_root(snapset): """ Find the device that backs the root filesystem for snapshot set ``snapset``. If the snapset does not include the root volume look the device up via the fstab. """ for snapshot in snapset.snapshots: if snapshot.mount_point == "/": return snapshot # Note: add fstab lookup for non-root snapsets # needs either root=UUID/LABEL support in boom or a lookup to resolve any # UUID/LABEL found in the file. raise SnapmNotFoundError(f"Could not find root device for snapset {snapset.name}")
[docs]def _create_default_os_profile(): """ Create a default boom OsProfile for the running system. This uses the boom API to run the equivalent of: ``boom profile create --from-host``. """ options = boom.command.os_options_from_cmdline() _log_info( "Creating default boot profile from %s with options %s", _OS_RELEASE, options ) return boom.command.create_profile( None, None, None, None, profile_file=_OS_RELEASE, options=options )
[docs]def _build_snapset_mount_list(snapset): """ Build a list of command line mount unit definitions for the snapshot set ``snapset``. Mount points that are not part of the snapset are substituted from /etc/fstab. :param snapset: The snapshot set to build a mount list for. """ mounts = [] snapset_mounts = snapset.mount_points with open(ETC_FSTAB, "r", encoding="utf8") as fstab: for line in fstab.readlines(): if line == "\n" or line.startswith("#"): continue what, where, fstype, options, _, _ = line.split() if where == "/": continue if where in snapset_mounts: snapshot = snapset.snapshot_by_mount_point(where) mounts.append(f"{snapshot.devpath}:{where}:{fstype}:{options}") else: mounts.append(f"{what}:{where}:{fstype}:{options}") return mounts
[docs]def _create_boom_boot_entry( version, title, tag_arg, root_device, lvm_root_lv=None, mounts=None ): """ Create a boom boot entry according to the passed arguments. :param version: The UTS release name for the boot entry :param title: The title for the boot entry. :param tag_arg: A tag argument to be added to the kernel command line and used to associate the entry with a snapshot set name or UUID. :param root_device: The root device for the entry. Passed to root=... :param lvm_root_lv: An optional LVM2 root logical volume. :param mounts: An optional list of mount specifications to use for the boot entry. If defined fstab=no will be appended to the generated kernel command line. """ assert title is not None, "Boot entry argument title must have a value" assert version is not None, "Boot entry argument version must have a value" assert tag_arg is not None, "Boot entry argument tag_arg must have a value" assert root_device is not None, "Boot entry argument root_device must have a value" machine_id = _get_machine_id() osp = match_os_profile_by_version(version) if not osp: try: osp = _create_default_os_profile() except ValueError as err: raise SnapmCalloutError( f"Error calling boom to create default OsProfile: {err}" ) from err if mounts: add_opts = f"rw {tag_arg}" del_opts = "ro" else: add_opts = tag_arg del_opts = None entry = boom.command.create_entry( title, version, machine_id, root_device, lvm_root_lv=lvm_root_lv, profile=osp, add_opts=add_opts, del_opts=del_opts, write=False, images=boom.command.I_BACKUP, no_fstab=bool(mounts), mounts=mounts, ) # Apply defaults for optional keys enabled in profile for opt_key in OPTIONAL_KEYS: bls_key = key_to_bls_name(opt_key) if bls_key in osp.optional_keys: setattr(entry, bls_key, optional_key_default(opt_key)) # Write BLS snippet for entry entry.write_entry() return entry
[docs]def create_snapset_boot_entry(snapset, title=None): """ Create a boom boot entry to boot into the snapshot set represented by ``snapset``. :param snapset: The snapshot set for which to create a boot entry. :param title: An optional title for the boot entry. If ``title`` is ``None`` the boot entry will be titled as "Snapshot snapset_name snapset_time". """ version = _get_uts_release() title = title or f"Snapshot {snapset.name} {snapset.time} ({version})" root_snapshot = _find_snapset_root(snapset) root_device = root_snapshot.devpath if root_snapshot.provider.name in ("lvm2-cow", "lvm2-thin"): lvm_root_lv = root_snapshot.name else: lvm_root_lv = None mounts = _build_snapset_mount_list(snapset) snapset.boot_entry = _create_boom_boot_entry( version, title, f"{SNAPSET_ARG}={snapset.uuid}", root_device, lvm_root_lv=lvm_root_lv, mounts=mounts, ) _log_debug( "Created boot entry '%s' for snapshot set with UUID=%s", title, snapset.uuid )
[docs]def create_snapset_revert_entry(snapset, title=None): """ Create a boom boot entry to revert the snapshot set represented by ``snapset``. :param snapset: The snapshot set for which to create a revert entry. :param title: An optional title for the revert entry. If ``title`` is ``None`` the revert entry will be titled as "Revert snapset_name snapset_time". """ version = _get_uts_release() title = title or f"Revert {snapset.name} {snapset.time} ({version})" root_snapshot = _find_snapset_root(snapset) root_device = root_snapshot.origin if root_snapshot.provider.name in ("lvm2-cow", "lvm2-thin"): lvm_root_lv = root_device.removeprefix(_DEV_PREFIX) else: lvm_root_lv = None snapset.revert_entry = _create_boom_boot_entry( version, title, f"{REVERT_ARG}={snapset.uuid}", root_device, lvm_root_lv=lvm_root_lv, ) _log_debug( "Created revert entry '%s' for snapshot set with UUID=%s", title, snapset.uuid )
[docs]def _delete_boot_entry(boot_id): """ Delete a boom boot entry by ID. :param boot_id: The boot identifier to delete. """ selection = boom.Selection(boot_id=boot_id) boom.command.delete_entries(selection=selection) boom.cache.clean_cache()
[docs]def delete_snapset_boot_entry(snapset): """ Delete the boot entry corresponding to ``snapset``. :param snapset: The snapshot set for which to remove a boot entry. """ if snapset.boot_entry is None: return _delete_boot_entry(snapset.boot_entry.boot_id)
[docs]def delete_snapset_revert_entry(snapset): """ Delete the revert entry corresponding to ``snapset``. :param snapset: The snapshot set for which to remove a revert entry. """ if snapset.revert_entry is None: return _delete_boot_entry(boot_id=snapset.revert_entry.boot_id)
[docs]def check_boom_config(): """ Check for boom configuration and create the default config if not found. """ not_found_err = SnapmCalloutError("No usable boom configuration found") try: load_boom_config() except ValueError: _log_warn( "No boom configuration found: attempting to generate defaults in /boot" ) try: boom.command.create_config() except AttributeError as err: _log_error("Installed boom version does not support create_config()") raise not_found_err from err except (BoomConfigError, FileExistsError) as err: _log_error("Missing or invalid boom configuration: %s", err) raise not_found_err from err except OSError as err: _log_error("Error creating boom configuration: %s", err) raise not_found_err from err
[docs]class BootEntryCache(dict): """ Cache mapping snapshot sets to boom ``BootEntry`` instances. Boot entries in the cache are either snapshot set boot entries or revert entries depending on the value of ``entry_arg``. """
[docs] def __init__(self, entry_arg): super().__init__() self.entry_arg = entry_arg self.refresh_cache()
[docs] def _parse_entry(self, boot_entry): """ Parse a boom ``BootEntry`` options string and return the value of the ``snapm.snapset`` argument if present. :param boot_entry: The boot entry to process. :returns: The snapset name for the boot entry. """ for word in boot_entry.options.split(): if word.startswith(self.entry_arg): _, value = word.split("=") return value return None
[docs] def refresh_cache(self): """ Generate mappings from snapshot sets to boot entries. """ self.clear() entries = boom.command.find_entries() for boot_entry in entries: snapset = self._parse_entry(boot_entry) if snapset: self[snapset] = boot_entry
[docs]class BootCache: """ Set of caches mapping snapshot sets to boot entries and revert entries. """
[docs] def __init__(self): self.entry_cache = BootEntryCache(SNAPSET_ARG) _log_debug( "Initialised boot entry cache with %d entries", len(self.entry_cache) ) self.revert_cache = BootEntryCache(REVERT_ARG) _log_debug( "Initialised revert boot entry cache with %d entries", len(self.revert_cache), )
[docs] def refresh_cache(self): """ Refresh the cache of boot entry mappings held by this ``BootCache`` instance. """ self.entry_cache.refresh_cache() _log_debug("Refreshed boot entry cache with %d entries", len(self.entry_cache)) self.revert_cache.refresh_cache() _log_debug( "Refreshed revert boot entry cache with %d entries", len(self.revert_cache), )