# Copyright Red Hat
#
# snapm/manager/_schedule.py - Snapshot Manager boot support
#
# This file is part of the snapm project.
#
# SPDX-License-Identifier: Apache-2.0
"""
Snapshot set scheduling abstractions for Snapshot Manager.
"""
from datetime import datetime, timedelta
from typing import List, Union, Dict
from dataclasses import dataclass
from json import dumps, loads
from os.path import join
from enum import Enum
from math import ceil
import tempfile
import textwrap
import logging
import os
from snapm import (
SnapmSystemError,
SnapmArgumentError,
SnapshotSet,
)
from ._boot import (
delete_snapset_boot_entry,
delete_snapset_revert_entry,
)
from ._timers import TimerStatus, TimerType, Timer
_log = logging.getLogger(__name__)
_log_debug = _log.debug
_log_info = _log.info
_log_warn = _log.warning
_log_error = _log.error
#: Garbage collect timer default calendarspec
_GC_CALENDAR_SPEC = "*-*-* *:10:00"
#: File mode for schdule configuration files
_SCHEDULE_CONF_FILE_MODE = 0o644
[docs]
class GcPolicyType(Enum):
"""
Garbage collection policy types enum.
"""
ALL = "All"
COUNT = "Count"
AGE = "Age"
TIMELINE = "Timeline"
[docs]
class GcPolicyParams:
"""
Abstract base class for garbage collection policy parameters.
"""
[docs]
def __str__(self):
"""
Return a human readable string representation of this
``GcPolicyParams`` instance.
:returns: A human readable string.
:rtype: ``str``
"""
return ", ".join(
f"{key}={val}" for key, val in self.__dict__.items() if val != 0
)
[docs]
def to_dict(self):
"""
Return a dictionary representation of this ``GcPolicyParams``.
:returns: This ``GcPolicyParams`` as a dictionary.
:rtype: ``dict``
"""
return self.__dict__.copy()
[docs]
def json(self, pretty=False):
"""
Return a JSON representation of this ``GcPolicyParams``.
:returns: This ``GcPolicyParams`` as a JSON string.
:rtype: ``str``
"""
return dumps(self.to_dict(), indent=4 if pretty else None)
[docs]
def evaluate(self, sets: List[SnapshotSet]) -> List[SnapshotSet]:
"""
Evaluate the list of ``SnapshotSet`` objects in ``sets``
against this set of ``GcPolicyParams`` and return a list of
``SnapshotSet`` objects that should be garbage collected.
:param sets: The list of ``SnapshotSet`` objects to evaluate,
sorted in order of increasing creation date.
:type sets: ``list[SnapshotSet]``.
:returns: A list of ``SnapshotSet`` objects to garbage collect.
:rtype: ``list[SnapshotSet]``
"""
raise NotImplementedError
@dataclass
class GcPolicyParamsAll(GcPolicyParams):
"""
Policy parameters for the ALL policy type.
"""
_type = GcPolicyType.ALL
def evaluate(self, sets: List[SnapshotSet]) -> List[SnapshotSet]:
"""
Evaluate the list of ``SnapshotSet`` objects in ``sets``
against this ``GcPolicyParamsAll`` instance and return a list of
``SnapshotSet`` objects that should be garbage collected.
Since this ``GcPolicyType`` always retains all ``SnapshotSet`` objects
passed to it it will always return the empty list.
:param sets: The list of ``SnapshotSet`` objects to evaluate,
sorted in order of increasing creation date.
:type sets: ``list[SnapshotSet]``.
:returns: A list of ``SnapshotSet`` objects to garbage collect.
:rtype: ``list[SnapshotSet]``
"""
return []
@dataclass
class GcPolicyParamsCount(GcPolicyParams):
"""
Policy parameters for the COUNT policy type.
``keep_count`` (int): The count of snapshot sets to keep.
"""
_type = GcPolicyType.COUNT
#: Retain ``keep_count`` number of snapshot sets.
keep_count: int = 0
def evaluate(self, sets: List[SnapshotSet]) -> List[SnapshotSet]:
"""
Evaluate the list of ``SnapshotSet`` objects in ``sets``
against this ``GcPolicyParamsCount`` instance and return a list of
``SnapshotSet`` objects that should be garbage collected according
to the configured ``keep_count``.
:param sets: The list of ``SnapshotSet`` objects to evaluate,
sorted in order of increasing creation date.
:type sets: ``list[SnapshotSet]``.
:returns: A list of ``SnapshotSet`` objects to garbage collect.
:rtype: ``list[SnapshotSet]``
"""
return sets[0 : len(sets) - self.keep_count]
@dataclass
class GcPolicyParamsAge(GcPolicyParams):
"""
Policy parameters for the AGE policy type.
``keep_years:`` (int): The age in years to retain snapshot sets.
``keep_months:`` (int): The age in months to retain snapshot sets.
``keep_weeks:`` (int): The age in weeks to retain snapshot sets.
``keep_days:`` (int): The age in days to retain snapshot sets.
"""
_type = GcPolicyType.AGE
#: The maximum age in years to retain snapshot sets.
keep_years: int = 0
#: The maximum age in months to retain snapshot sets.
keep_months: int = 0
#: The maximum age in weeks to retain snapshot sets.
keep_weeks: int = 0
#: The maximum age in days to retain snapshot sets.
keep_days: int = 0
def to_days(self):
"""
Return the total number of days represented by this
``GcPolicyParamsAge`` as an integer.
:returns: The number of days to retain snapshot sets according to
this ``GcPolicyParamsAge`` object.
:rtype: ``int``
"""
return ceil(
self.keep_years * 365.25
+ self.keep_months * 30.44
+ self.keep_weeks * 7
+ self.keep_days
)
def to_timedelta(self):
"""
Return this ``GcPolicyParamsAge`` object as a ``datetime.timedelta``
object.
:returns: The time to retain snapshot sets as a ``timedelta`` object.
:rtype: ``datetime.timedelta``
"""
return timedelta(days=self.to_days())
def evaluate(self, sets: List[SnapshotSet]) -> List[SnapshotSet]:
"""
Evaluate the list of ``SnapshotSet`` objects in ``sets``
against this ``GcPolicyParamsAge`` instance and return a list of
``SnapshotSet`` objects that should be garbage collected according
to the configured ``keep_years``, ``keep_months``, ``keep_weeks``,
and ``keep_days``
:param sets: The list of ``SnapshotSet`` objects to evaluate,
sorted in order of increasing creation date.
:type sets: ``list[SnapshotSet]``.
:returns: A list of ``SnapshotSet`` objects to garbage collect.
:rtype: ``list[SnapshotSet]``
"""
limit = datetime.now() - self.to_timedelta()
return [sset for sset in sets if sset.datetime < limit]
@dataclass
class GcPolicyParamsTimeline(GcPolicyParams):
"""
Policy parameters for the TIMELINE policy type.
``keep_hourly`` (int): The maximum number of hourly snapshot sets to keep.
``keep_daily`` (int): The maximum nuber of daily snapshot sets to keep.
``keep_weekly`` (int): The maximum number of weekly snapshot sets to keep.
``keep_monthly`` (int): The maximum nuber of monthly snapshot sets to keep.
``keep_quarterly`` (int): The maximum number of quarterly snapshot sets keep.
``keep_yearly`` (int): The maximum number of yearly snapshot sets to keep.
"""
_type = GcPolicyType.TIMELINE
#: The maximum number of yearly snapshot sets to keep.
keep_yearly: int = 0
#: The maximum number of quaterly snapshot sets to keep.
keep_quarterly: int = 0
#: The maximum number of monthly snapshot sets to keep.
keep_monthly: int = 0
#: The maximum number of weekly snapshot sets to keep.
keep_weekly: int = 0
#: The maximum number of daily snapshot sets to keep.
keep_daily: int = 0
#: The maximum number of hourly snapshot sets to keep.
keep_hourly: int = 0
def evaluate(self, sets: List[SnapshotSet]) -> List[SnapshotSet]:
"""
Evaluate the list of ``SnapshotSet`` objects in ``sets``
against this ``GcPolicyParamsTimeline`` instance and return a list of
``SnapshotSet`` objects that should be garbage collected according
to the configured ``keep_yearly``, ``keey_quarterly``, ``keep_monthly``,
``keep_weekly``, ``keep_daily``, and ``keep_hourly``.
:param sets: The list of ``SnapshotSet`` objects to evaluate,
sorted in order of increasing creation date.
:type sets: ``list[SnapshotSet]``.
:returns: A list of ``SnapshotSet`` objects to garbage collect.
:rtype: ``list[SnapshotSet]``
"""
def classify_snapshot_sets(sets: List[SnapshotSet]) -> Dict[str, List]:
"""
Classify snapshot sets into one of six categories based on creation
time:
* yearly (first snapshot set after midnight 1st Jan)
* quarterly (first snapshot set after midnight 1st Apr, Jul, Oct)
* monthly (first snapshot set after midnight 1st of month)
* weekly (first snapshot set after midnight Monday)
* daily (first snapshot set after midnight)
* hourly (first snapshot set after top of hour)
Snapshot sets are assigned only to the first matching category (for
e.g. a snapshot set taken at 'Thu 1 Jan 00:05:00 1970' is assigned
to "yearly").
Any snapshot sets that are not classified by these rules (e.g. a
second snapshot set taken within a single hour) are assigned to
a catch-all "unclassified" list.
:param sets: The list of ``SnapshotSet`` objects to classify.
:type sets: ``List[SnapshotSet]``
:returns: a dictionary mapping category names to lists of
``SnapshotSet`` objects.
:rtype: ``Dict[str, List]``
"""
categories = ["yearly", "quarterly", "monthly", "weekly", "daily", "hourly"]
classified = {category: [] for category in categories}
seen_boundaries = {category: set() for category in categories}
# Catch-all for snapshot sets that are not classified as one of the fixed
# timeline categories.
classified["unclassified"] = []
# pylint: disable=too-many-return-statements
def get_boundary(dt, category):
if category == "yearly":
return datetime(dt.year, 1, 1)
if category == "quarterly":
q_month = ((dt.month - 1) // 3) * 3 + 1
return datetime(dt.year, q_month, 1)
if category == "monthly":
return datetime(dt.year, dt.month, 1)
if category == "weekly":
monday = dt - timedelta(days=dt.weekday())
return datetime(monday.year, monday.month, monday.day)
if category == "daily":
return datetime(dt.year, dt.month, dt.day)
if category == "hourly":
return datetime(dt.year, dt.month, dt.day, dt.hour)
return None
for sset in sets:
dt = sset.datetime
have_classified = False
for category in categories:
if category == "quarterly" and dt.month not in (1, 4, 7, 10):
continue
if category == "monthly" and dt.month in (1, 4, 7, 10):
continue
if category == "weekly" and dt.weekday() != 0:
continue
boundary = get_boundary(dt, category)
if dt > boundary and boundary not in seen_boundaries[category]:
classified[category].append(sset)
seen_boundaries[category].add(boundary)
have_classified = True
break # Do not allow multiple labels per snapshot set
if not have_classified:
classified["unclassified"].append(sset)
return classified
# Build lists of categorised snapshot sets
classified_sets = classify_snapshot_sets(sets)
# Short cuts for each category
yearly = classified_sets["yearly"]
quarterly = classified_sets["quarterly"]
monthly = classified_sets["monthly"]
weekly = classified_sets["weekly"]
daily = classified_sets["daily"]
hourly = classified_sets["hourly"]
if classified_sets["unclassified"]:
_log_warn(
"Found %d unclassified snapshot sets for GcPolicyParamsTimeline",
len(classified_sets["unclassified"]),
)
# Select snapshot sets for garbage collection based on category and
# per-category keep count values.
to_delete = []
to_delete += yearly[0 : len(yearly) - self.keep_yearly]
to_delete += quarterly[0 : len(quarterly) - self.keep_quarterly]
to_delete += monthly[0 : len(monthly) - self.keep_monthly]
to_delete += weekly[0 : len(weekly) - self.keep_weekly]
to_delete += daily[0 : len(daily) - self.keep_daily]
to_delete += hourly[0 : len(hourly) - self.keep_hourly]
return to_delete
#: Mapping from ``GcPolicyType`` values to ``GcPolicyParams`` subclasses.
_TYPE_MAP = {
GcPolicyType.ALL: GcPolicyParamsAll,
GcPolicyType.COUNT: GcPolicyParamsCount,
GcPolicyType.AGE: GcPolicyParamsAge,
GcPolicyType.TIMELINE: GcPolicyParamsTimeline,
}
[docs]
class GcPolicy:
"""
An instance of a garbage collection policy.
"""
[docs]
def __init__(self, policy_name: str, policy_type: GcPolicyType, params: dict):
"""
Initialise a new GcPolicy.
:param policy_type: The policy type.
:type policy_type: ``GcPolicyType``
:param params: A dictionary of parameters suitable for ``policy_type``.
:type params: ``dict``
"""
if policy_type not in _TYPE_MAP:
raise SnapmArgumentError(f"Invalid GcPolicyType: {policy_type}")
if not policy_name:
raise SnapmArgumentError("GcPolicy name cannot be empty")
self._name = policy_name
self._type = policy_type
self._params = _TYPE_MAP[policy_type](**params)
self._timer = Timer(TimerType.GC, policy_name, _GC_CALENDAR_SPEC)
[docs]
def __str__(self):
"""
Return a human readable string representation of this ``GcPolicy``.
:returns: A human readable string.
:rtype: ``str``
"""
return (
f"Name: {self.name}\n"
f"Type: {self.type.value}\n"
f"Params: {str(self.params)}"
)
[docs]
def __repr__(self):
"""
Return a machine readable string representation of this ``GcPolicy``.
:returns: A machine readable string.
:rtype: ``str``
"""
return (
f"GcPolicy('{self.name}', "
f"GcPolicyType.{self.type.name}, "
f"{self.params.to_dict()})"
)
[docs]
def __eq__(self, other):
"""
Test for ``GcPolicy`` equality.
"""
return (
self.name == other.name
and self.type == other.type
and self.params == other.params
)
[docs]
def to_dict(self):
"""
Return a dictionary representation of this ``GcPolicy``.
:returns: This ``GcPolicy`` as a dictionary.
:rtype: ``dict``
"""
params_dict = {"policy_name": self._name, "policy_type": self.type.name}
params_dict.update(self.params.to_dict())
return params_dict
[docs]
def json(self, pretty=False):
"""
Return a JSON representation of this ``GcPolicy``.
:returns: This ``GcPolicy`` as a JSON string.
:rtype: ``str``
"""
return dumps(self.to_dict(), indent=4 if pretty else None)
@property
def name(self):
"""
This ``GcPolicy`` instance's name.
"""
return self._name
@property
def params(self):
"""
This ``GcPolicy`` instance's ``GcPolicyParams`` value.
"""
return self._params
@property
def type(self):
"""
This ``GcPolicy`` instance's ``GcPolicyType`` value.
"""
return self._type
@property
def enabled(self):
"""
Return ``True`` if this ``GcPolicy`` and its corresponding timer are
enabled, and ``False`` otherwise.
"""
return self._timer.status in (TimerStatus.ENABLED, TimerStatus.RUNNING)
@property
def running(self):
"""
Return ``True`` if this ``GcPolicy`` and its corresponding timer are
enabled, and ``False`` otherwise.
"""
return self._timer.status == TimerStatus.RUNNING
[docs]
def enable(self):
"""
Enable this ``GcPolicy`` and its corresponding timer.
"""
self._timer.enable()
[docs]
def start(self):
"""
Start this the timer for this ``GcPolicy``.
"""
self._timer.start()
[docs]
def stop(self):
"""
Stop the timer for this ``GcPolicy``.
"""
self._timer.stop()
[docs]
def disable(self):
"""
Disable this ``GcPolicy`` and its corresponding timer.
"""
self._timer.disable()
[docs]
@classmethod
def from_dict(cls, data):
"""
Instantiate a ``GcPolicy`` object from values in ``data``.
:param data: A dictionary describing a ``GcPolicy`` instance.
:type data: ``dict``
:returns: An instance of ``GcPolicy`` reflecting the values in
``data``.
:rtype: ``GcPolicy``
"""
policy_type = GcPolicyType[data["policy_type"]]
data.pop("policy_type")
policy_name = data["policy_name"]
data.pop("policy_name")
return cls(policy_name, policy_type, data)
[docs]
@classmethod
def from_json(cls, value):
"""
Instantiate a ``GcPolicy`` object from a JSON string in
``value``.
:param value: A JSON string describing a ``GcPolicyParams`` instance.
:type data: ``str``
:returns: An instance of a ``GcPolicyParams`` subclass reflecting
the values in the JSON string ``value``.
:rtype: ``GcPolicyParams``
"""
return cls.from_dict(loads(value))
[docs]
@classmethod
def from_cmd_args(cls, cmd_args):
"""
Initialise garbage collection policy from command line arguments.
Construct a new ``GcPolicy`` object from the command line
arguments in ``cmd_args``. The ``GcPolicyType`` is determined by
the ``policy_type`` argument, and the type-specific policy
parameters are initialised from the remaining arguments.
:param cmd_args: The command line selection arguments.
:returns: A new ``GcPolicy`` instance
:rtype: ``GcPolicy``
"""
policy_type = cmd_args.policy_type.upper()
name = cmd_args.schedule_name
if policy_type == GcPolicyType.ALL.name:
params = {}
elif policy_type == GcPolicyType.COUNT.name:
params = {"keep_count": cmd_args.keep_count}
elif policy_type == GcPolicyType.AGE.name:
params = {
"keep_years": cmd_args.keep_years,
"keep_months": cmd_args.keep_months,
"keep_weeks": cmd_args.keep_weeks,
"keep_days": cmd_args.keep_days,
}
elif policy_type == GcPolicyType.TIMELINE.name:
params = {
"keep_yearly": cmd_args.keep_yearly,
"keep_quarterly": cmd_args.keep_quarterly,
"keep_monthly": cmd_args.keep_monthly,
"keep_weekly": cmd_args.keep_weekly,
"keep_daily": cmd_args.keep_daily,
"keep_hourly": cmd_args.keep_hourly,
}
else:
raise SnapmArgumentError(f"Unknown policy type: {policy_type}")
return GcPolicy(name, GcPolicyType[policy_type], params)
[docs]
def evaluate(self, sets: List[SnapshotSet]):
"""
Evaluate the list of ``SnapshotSet`` objects in ``sets``
against this ``GcPolicy`` and return a list of ``SnapshotSet`` objects
that should be garbage collected.
:param sets: The list of ``SnapshotSet`` objects to evaluate,
sorted in order of increasing creation date.
:type sets: ``list[SnapshotSet]``.
:returns: A list of ``SnapshotSet`` objects to garbage collect.
:rtype: ``list[SnapshotSet]``
"""
return self.params.evaluate(sets)
# pylint: disable=too-many-public-methods
[docs]
class Schedule:
"""
An individual snapshot schedule instance with create and garbage
collection timers. Tracks timer configuration, name, sources, size
policies, enabled/disabled, nr snapshots, next elapse.
"""
[docs]
def __init__(
self,
name: str,
sources: List[str],
default_size_policy: Union[str, None],
autoindex: bool,
calendarspec: str,
gc_policy: GcPolicy,
boot=False,
revert=False,
):
"""
Initialse a new ``Schedule`` instance.
:param name: The name of the ``Schedule``.
:type name: ``str``
:param sources: The souce specs to include in this ``Schedule``.
:type sources: ``list[str]``
:param default_size_policy: The default size policy for this
``Schedule``.
:type default_size_policy: ``str``
:param autoindex: Enable autoindex names for this ``Schedule``.
:type autoindex: ``bool``
:param calendarspec: The ``OnCalendar`` expression for this ``Schedule``.
:type calendarspec: ``str``
:param policy: The garbage collection policy for this ``Schedule``.
:type policy: ``GcPolicy``
:returns: The new ``Schedule`` instance.
:rtype: ``Schedule``
"""
self._name = name
self._sources = sources
self._default_size_policy = default_size_policy or None
self._autoindex = autoindex
self._gc_policy = gc_policy
self._timer = Timer(TimerType.CREATE, name, calendarspec)
self._boot = boot
self._revert = revert
self._sched_path = None
[docs]
def __str__(self):
"""
Return a human-readable representation of this ``Schedule``.
:returns: A human-readable string.
:rtype: ``str``
"""
as_dict = self.to_dict()
as_dict["enabled"] = "yes" if self.enabled else "no"
as_dict["running"] = "yes" if self.running else "no"
as_dict["boot"] = "yes" if self.boot else "no"
as_dict["revert"] = "yes" if self.revert else "no"
as_dict["gc_policy"] = textwrap.indent("\n" + str(self.gc_policy), 4 * " ")
as_dict["sources"] = ", ".join(as_dict["sources"])
as_dict["next_elapse"] = self.next_elapse
as_dict.pop("autoindex")
return "\n".join(
f"{key.title().replace('_', '')}: {value}" for key, value in as_dict.items()
)
[docs]
def __repr__(self):
"""
Return a machine-readable representation of this ``Schedule``.
:returns: A machine-readable string.
:rtype: ``str``
"""
return (
f"Schedule('{self.name}', {self.sources}, "
f"'{self.default_size_policy}', {self.autoindex}, "
f"'{self.calendarspec}', {repr(self.gc_policy)})"
)
[docs]
def __eq__(self, other):
"""
Test for ``Schedule`` equality.
"""
return (
self.name == other.name
and self.sources == other.sources
and self.default_size_policy == other.default_size_policy
and self.autoindex == other.autoindex
and self.calendarspec == other.calendarspec
and self.gc_policy == other.gc_policy
and self.boot == other.boot
and self.revert == other.revert
)
[docs]
def to_dict(self):
"""
Return a dictionary representation of this ``Schedule``.
:returns: This ``Schedule`` as a dictionary.
:rtype: ``dict``
"""
attrs = [
"name",
"sources",
"default_size_policy",
"autoindex",
"calendarspec",
"boot",
"revert",
]
sched_dict = {attr: getattr(self, attr) for attr in attrs}
sched_dict["gc_policy"] = self.gc_policy.to_dict()
return sched_dict
[docs]
def json(self, pretty=False):
"""
Return a JSON representation of this ``Schedule``.
:returns: This ``Schedule`` as a JSON string.
:rtype: ``str``
"""
return dumps(self.to_dict(), indent=4 if pretty else None)
@property
def name(self):
"""
The name of this ``Schedule``.
"""
return self._name
@property
def sources(self):
"""
The source specs value for this ``Schedule``.
"""
return self._sources
@property
def default_size_policy(self):
"""
The default size policy for this ``Schedule``.
"""
return self._default_size_policy
@property
def autoindex(self):
"""
The autoindex property for this ``Schedule``.
"""
return self._autoindex
@property
def gc_policy(self):
"""
The garbage collection policy for this ``Schedule``.
"""
return self._gc_policy
@property
def calendarspec(self):
"""
The OnCalendar expression for the timer associated with this
``Schedule`` instance.
"""
return self._timer.calendarspec.original
@property
def next_elapse(self):
"""
Return the ``next_elapse`` time of this ``Schedule``'s timer.
"""
return self._timer.next_elapse
@property
def enabled(self):
"""
Return ``True`` if this ``Schedule`` and its corresponding timer are
enabled, and ``False`` otherwise.
"""
return self._timer.status in (TimerStatus.ENABLED, TimerStatus.RUNNING)
@property
def running(self):
"""
Return ``True`` if this ``Schedule`` and its corresponding timer are
enabled, and ``False`` otherwise.
"""
return self._timer.status == TimerStatus.RUNNING and self.gc_policy.running
@property
def boot(self):
"""
Retrun ``True`` if this ``Schedule`` is configuree to enable snapshot
set boot entries when creating snapshot sets.
"""
return self._boot
@property
def revert(self):
"""
Retrun ``True`` if this ``Schedule`` is configuree to enable snapshot
set revert entries when creating snapshot sets.
"""
return self._revert
[docs]
def enable(self):
"""
Enable this ``Schedule`` and its corresponding timers.
"""
self._timer.enable()
self.gc_policy.enable()
[docs]
def start(self):
"""
Start this the timer for this ``Schedule``.
"""
self._timer.start()
self.gc_policy.start()
[docs]
def stop(self):
"""
Stop the timer for this ``Schedule``.
"""
self._timer.stop()
self.gc_policy.stop()
[docs]
def disable(self):
"""
Disable this ``Schedule`` and its corresponding timer.
"""
self._timer.stop()
self._timer.disable()
self.gc_policy.stop()
self.gc_policy.disable()
[docs]
def gc(self, sets: List[SnapshotSet]):
"""
Apply the configured garbage collection policy for this ``Schedule``.
"""
to_delete = self.gc_policy.evaluate(sets)
for snapshot_set in to_delete:
delete_snapset_boot_entry(snapshot_set)
delete_snapset_revert_entry(snapshot_set)
snapshot_set.delete()
[docs]
def write_config(self, sched_dir: str):
"""
Write this ``Schedule``'s configuration to disk.
:param sched_dir: The path at which to write the configuration file.
:type sched_dir: ``str``
"""
json = self.json(pretty=True)
sched_path = join(sched_dir, f"{self.name}.json")
try:
# Write the schedule configuration file atomically
fd, tmp_path = tempfile.mkstemp(dir=sched_dir, prefix=".tmp_", text=True)
try:
with os.fdopen(fd, "w", encoding="utf8") as f:
f.write(json)
f.flush()
os.fdatasync(f.fileno())
os.rename(tmp_path, sched_path)
os.chmod(sched_path, _SCHEDULE_CONF_FILE_MODE)
# Ensure directory metadata is written to disk
dir_fd = os.open(sched_dir, os.O_DIRECTORY)
try:
os.fsync(dir_fd)
finally:
os.close(dir_fd)
except OSError as err: # pragma: no cover
os.unlink(tmp_path)
raise SnapmSystemError(
f"Filesystem error writing schedule file '{sched_path}': {err}"
) from err
except OSError as err: # pragma: no cover
raise SnapmSystemError(
f"Filesystem error writing schedule temporary file '{tmp_path}': {err}"
) from err
self._sched_path = sched_path
[docs]
def delete_config(self):
"""
Delete this ``Schedule``'s on-disk configration.
"""
if not self._sched_path:
return
os.unlink(self._sched_path)
[docs]
@classmethod
def from_dict(cls, data):
"""
Instantiate a ``Schedule`` object from values in ``data``.
:param data: A dictionary describing a ``Schedule`` instance.
:type data: ``dict``
:returns: An instance of ``Schedule`` reflecting the values in
``data``.
:rtype: ``GcPolicyParams``
"""
policy = GcPolicy.from_dict(data.pop("gc_policy"))
return Schedule(
data["name"],
data["sources"],
data["default_size_policy"],
data["autoindex"],
data["calendarspec"],
policy,
boot=data["boot"],
revert=data["revert"],
)
[docs]
@classmethod
def from_file(cls, sched_file):
"""
Initialise a new ``Schedule`` instance from an on-disk JSON
configuration file.
:param sched_file: The path to the schedule configuration file.
:type sched_file: ``str``
:returns: A new ``Schedule`` instance.
:rtype: ``Schedule``
"""
try:
with open(sched_file, "r", encoding="utf8") as fp:
json = fp.read()
sched_dict = loads(json)
except OSError as err: # pragma: no cover
raise SnapmSystemError(
f"Filesystem error reading schedule file '{sched_file}': {err}"
) from err
schedule = Schedule.from_dict(sched_dict)
schedule._sched_path = sched_file
return schedule
__all__ = [
"Schedule",
"GcPolicy",
"GcPolicyType",
"GcPolicyParams",
]