Source code for mizani.breaks

"""
All scales have a means by which the values that are mapped
onto the scale are interpreted. Numeric digital scales put
out numbers for direct interpretation, but most scales
cannot do this. What they offer is named markers/ticks that
aid in assessing the values, e.g. the common speedometer has
ticks and values to help gauge the speed of the vehicle.

The named markers are what we call breaks. Properly calculated
breaks make interpretation straightforward. These functions
provide ways to calculate good (hopefully) breaks.
"""

from __future__ import annotations

import sys
from dataclasses import dataclass
from datetime import date, datetime
from itertools import product
from typing import TYPE_CHECKING

import numpy as np
import pandas as pd

from .utils import (
    log,
    min_max,
    round_any,
)

if TYPE_CHECKING:
    from typing import Sequence

    from mizani.typing import (
        DatetimeOffset,
        FloatArrayLike,
        NDArrayFloat,
        Timedelta,
        TimedeltaArrayLike,
        TimedeltaOffset,
        Trans,
    )


__all__ = [
    "breaks_log",
    "breaks_symlog",
    "minor_breaks",
    "minor_breaks_trans",
    "breaks_date",
    "breaks_date_width",
    "breaks_width",
    "breaks_timedelta",
    "breaks_timedelta_width",
    "breaks_extended",
]


@dataclass
class breaks_log:
    """
    Integer breaks on log transformed scales

    Parameters
    ----------
    n : int
        Desired number of breaks
    base : int
        Base of logarithm

    Examples
    --------
    >>> x = np.logspace(3, 6)
    >>> limits = min(x), max(x)
    >>> breaks_log()(limits)
    array([   1000,   10000,  100000, 1000000])
    >>> breaks_log(2)(limits)
    array([  1000, 100000])
    >>> breaks_log()([0.1, 1])
    array([0.1, 0.3, 1. , 3. ])
    """

    n: int = 5
    base: float = 10

    def __call__(self, limits: tuple[float, float]) -> NDArrayFloat:
        """
        Compute breaks

        Parameters
        ----------
        limits : tuple
            Minimum and maximum values

        Returns
        -------
        out : array_like
            Sequence of break points
        """
        if any(np.isinf(limits)):
            return np.array([])

        n = self.n
        base = self.base
        rng = log(limits, base)
        _min = int(np.floor(rng[0]))
        _max = int(np.ceil(rng[1]))

        # Prevent overflow
        if float(base) ** _max > sys.maxsize:
            base = float(base)

        if _max == _min:
            return np.array([base**_min])

        # Try getting breaks at the integer powers of the base
        # e.g. [1, 100, 10000, 1000000]
        # If there are too few breaks, try other points using
        # _breaks_log_sub
        by = int(np.floor((_max - _min) / n)) + 1
        for step in range(by, 0, -1):
            breaks = np.array([base**i for i in range(_min, _max + 1, step)])
            relevant_breaks = (limits[0] <= breaks) & (breaks <= limits[1])
            if np.sum(relevant_breaks) >= n - 2:
                return breaks

        return _breaks_log_sub(n=n, base=base)(limits)
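
# A minimal illustrative sketch, not part of mizani's API: the first stage of
# breaks_log is to take the integer powers of `base` that span the limits,
# thinning them with a larger step when there would be too many. The helper
# name below is hypothetical and exists only to make that stage concrete.
def _sketch_integer_power_breaks(limits=(1e3, 1e6), base=10, step=1):
    rng = log(limits, base)
    _min, _max = int(np.floor(rng[0])), int(np.ceil(rng[1]))
    # For the defaults this yields the powers 10**3 .. 10**6, matching the
    # first doctest of breaks_log above.
    return np.array([base**i for i in range(_min, _max + 1, step)])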

@dataclass
class _breaks_log_sub:
    """
    Breaks for log transformed scales

    Calculate breaks that do not fall on integer powers of
    the base.

    Parameters
    ----------
    n : int
        Desired number of breaks
    base : int | float
        Base of logarithm

    Notes
    -----
    Credit: Thierry Onkelinx (thierry.onkelinx@inbo.be) for the original
    algorithm in the r-scales package.
    """

    n: int = 5
    base: float = 10

    def __call__(self, limits: tuple[float, float]) -> NDArrayFloat:
        base = self.base
        n = self.n
        rng = log(limits, base)
        _min = int(np.floor(rng[0]))
        _max = int(np.ceil(rng[1]))
        steps = [1]

        # Prevent overflow
        if float(base) ** _max > sys.maxsize:
            base = float(base)

        def delta(x):
            """
            Calculates the smallest distance in the log scale between the
            currently selected breaks and a new candidate 'x'
            """
            arr = np.sort(np.hstack([x, steps, base]))
            log_arr = log(arr, base)
            return np.min(np.diff(log_arr))

        if self.base == 2:
            return np.array([base**i for i in range(_min, _max + 1)])

        candidate = np.arange(base + 1)
        candidate = np.compress(
            (candidate > 1) & (candidate < base), candidate
        )

        while len(candidate):
            best = np.argmax([delta(x) for x in candidate])
            steps.append(candidate[best])
            candidate = np.delete(candidate, best)

            _factors = [base**i for i in range(_min, _max + 1)]
            breaks = np.array([f * s for f, s in product(_factors, steps)])
            relevant_breaks = (limits[0] <= breaks) & (breaks <= limits[1])

            if np.sum(relevant_breaks) >= n - 2:
                breaks = np.sort(breaks)
                lower_end = np.max(
                    [
                        np.min(np.where(limits[0] <= breaks)) - 1,
                        0,  # type: ignore
                    ]
                )
                upper_end = np.min(
                    [
                        np.max(np.where(breaks <= limits[1])) + 1,
                        len(breaks),  # type: ignore
                    ]
                )
                return breaks[lower_end : upper_end + 1]
        else:
            return breaks_extended(n=n)(limits)
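
# A minimal illustrative sketch, not part of mizani's API: the greedy step in
# _breaks_log_sub. Candidates between 1 and the base are appended to `steps`
# one at a time, always picking the candidate that maximises the smallest
# log-scale gap. The helper below is hypothetical and only records the order
# in which candidates would be picked; for base 10 the first pick is 3.
def _sketch_sub_break_order(base=10):
    steps = [1.0]

    def smallest_gap(x):
        # smallest log-scale distance between x, the chosen steps and the base
        arr = np.sort(np.hstack([x, steps, base]))
        return np.min(np.diff(log(arr, base)))

    candidates = list(range(2, int(base)))
    order = []
    while candidates:
        best = max(candidates, key=smallest_gap)
        steps.append(best)
        order.append(best)
        candidates.remove(best)
    return order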

@dataclass
class minor_breaks:
    """
    Compute minor breaks

    This is the naive method. It does not take into account
    the transformation.

    Parameters
    ----------
    n : int
        Number of minor breaks between the major breaks.

    Examples
    --------
    >>> major = [1, 2, 3, 4]
    >>> limits = [0, 5]
    >>> minor_breaks()(major, limits)
    array([0.5, 1.5, 2.5, 3.5, 4.5])
    >>> minor_breaks()([1, 2], (1, 2))
    array([1.5])

    More than 1 minor break.

    >>> minor_breaks(3)([1, 2], (1, 2))
    array([1.25, 1.5 , 1.75])
    >>> minor_breaks()([1, 2], (1, 2), 3)
    array([1.25, 1.5 , 1.75])
    """

    n: int = 1

    def __call__(
        self,
        major: FloatArrayLike,
        limits: tuple[float, float] | None = None,
        n: int | None = None,
    ) -> NDArrayFloat:
        """
        Minor breaks

        Parameters
        ----------
        major : array_like
            Major breaks
        limits : array_like | None
            Limits of the scale. If *array_like*, must be of size 2.
            If **None**, then the minimum and maximum of the major
            breaks are used.
        n : int
            Number of minor breaks between the major breaks. If
            **None**, then *self.n* is used.

        Returns
        -------
        out : array_like
            Minor breaks
        """
        if len(major) < 2:
            return np.array([])

        if limits is None:
            low, high = min_max(major)
        else:
            low, high = min_max(limits)

        if n is None:
            n = self.n

        # Try to infer additional major breaks so that
        # minor breaks can be generated beyond the first
        # and last major breaks
        diff = np.diff(major)
        step = diff[0]
        if len(diff) > 1 and all(diff == step):
            major = np.hstack([major[0] - step, major, major[-1] + step])

        mbreaks = []
        factors = np.arange(1, n + 1)
        for lhs, rhs in zip(major[:-1], major[1:]):
            sep = (rhs - lhs) / (n + 1)
            mbreaks.append(lhs + factors * sep)

        minor = np.hstack(mbreaks)
        minor = minor.compress((low <= minor) & (minor <= high))
        return minor
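
# A minimal illustrative sketch, not part of mizani's API: the arithmetic
# behind the naive method is n evenly spaced points strictly between each
# pair of consecutive major breaks. The helper name is hypothetical.
def _sketch_naive_minor(major=(1.0, 2.0, 3.0), n=1):
    major = np.asarray(major, dtype=float)
    factors = np.arange(1, n + 1)
    pieces = [
        lhs + factors * (rhs - lhs) / (n + 1)
        for lhs, rhs in zip(major[:-1], major[1:])
    ]
    # For the defaults this returns array([1.5, 2.5])
    return np.hstack(pieces)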

@dataclass
class minor_breaks_trans:
    """
    Compute minor breaks for transformed scales

    The minor breaks are computed in data space. This, together
    with major breaks computed in transform space, reveals the
    non-linearity of a scale. See the log transforms created
    with :func:`log_trans` like :class:`log10_trans`.

    Parameters
    ----------
    trans : trans or type
        Trans object or trans class.
    n : int
        Number of minor breaks between the major breaks.

    Examples
    --------
    >>> from mizani.transforms import sqrt_trans
    >>> major = [1, 2, 3, 4]
    >>> limits = [0, 5]
    >>> t1 = sqrt_trans()
    >>> t1.minor_breaks(major, limits)
    array([1.58113883, 2.54950976, 3.53553391])

    # Changing the regular `minor_breaks` method

    >>> t2 = sqrt_trans()
    >>> t2.minor_breaks = minor_breaks()
    >>> t2.minor_breaks(major, limits)
    array([0.5, 1.5, 2.5, 3.5, 4.5])

    More than 1 minor break

    >>> major = [1, 10]
    >>> limits = [1, 10]
    >>> t2.minor_breaks(major, limits, 4)
    array([2.8, 4.6, 6.4, 8.2])
    """

    trans: Trans
    n: int = 1

    def __call__(
        self,
        major: FloatArrayLike,
        limits: tuple[float, float] | None = None,
        n: int | None = None,
    ) -> NDArrayFloat:
        """
        Minor breaks for transformed scales

        Parameters
        ----------
        major : array_like
            Major breaks
        limits : array_like | None
            Limits of the scale. If *array_like*, must be of size 2.
            If **None**, then the minimum and maximum of the major
            breaks are used.
        n : int
            Number of minor breaks between the major breaks. If
            **None**, then *self.n* is used.

        Returns
        -------
        out : array_like
            Minor breaks
        """
        if limits is None:
            limits = min_max(major)

        if n is None:
            n = self.n

        major = self._extend_breaks(major)
        major = self.trans.inverse(major)
        limits = self.trans.inverse(limits)
        minor = minor_breaks(n)(major, limits)
        return self.trans.transform(minor)

    def _extend_breaks(self, major: FloatArrayLike) -> FloatArrayLike:
        """
        Append 2 extra breaks at either end of major

        If the major breaks are equidistant in transform space but not
        in data space, :func:`minor_breaks` will not add minor breaks
        beyond the first and last major breaks. The solution is to
        extend those breaks (in transformed space) before the minor
        break call is made. How the breaks are extended depends on the
        type of transform.
        """
        trans = self.trans
        trans = trans if isinstance(trans, type) else trans.__class__
        # so far we are only certain about this extending stuff
        # making sense for log transform
        is_log = trans.__name__.startswith("log")
        diff = np.diff(major)
        step = diff[0]
        if is_log and all(diff == step):
            major = np.hstack([major[0] - step, major, major[-1] + step])
        return major
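
# A minimal illustrative sketch, not part of mizani's API: why _extend_breaks
# works in transform space. For a log10 transform, major breaks at 1, 2, 3
# (i.e. 10, 100, 1000 in data space) are equidistant only in transform space,
# so the extra leading/trailing break can only be inferred there. The helper
# name is hypothetical.
def _sketch_extend_in_transform_space(major=(1.0, 2.0, 3.0)):
    major = np.asarray(major, dtype=float)
    step = np.diff(major)[0]
    extended = np.hstack([major[0] - step, major, major[-1] + step])
    # Back in data space: 1, 10, 100, 1000, 10000
    return 10**extended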

@dataclass
class breaks_date:
    """
    Regularly spaced dates

    Parameters
    ----------
    n :
        Desired number of breaks.

    Examples
    --------
    >>> from datetime import datetime
    >>> limits = (datetime(2010, 1, 1), datetime(2026, 1, 1))

    Default breaks will be regularly spaced but the spacing
    is automatically determined

    >>> breaks = breaks_date(9)
    >>> [d.year for d in breaks(limits)]
    [2010, 2012, 2014, 2016, 2018, 2020, 2022, 2024, 2026]
    """

    n: int = 5

    def __call__(
        self, limits: tuple[datetime, datetime] | tuple[date, date]
    ) -> Sequence[datetime]:
        """
        Compute breaks

        Parameters
        ----------
        limits : tuple
            Minimum and maximum :class:`datetime.datetime` values.

        Returns
        -------
        out : array_like
            Sequence of break points.
        """
        from mizani._datetime.breaks import by_n
        from mizani._datetime.utils import as_datetime

        if pd.isna(limits[0]) or pd.isna(limits[1]):
            return []

        if isinstance(limits[0], np.datetime64) and isinstance(
            limits[1], np.datetime64
        ):
            limits = limits[0].astype(object), limits[1].astype(object)

        limits = as_datetime(limits)
        return by_n(limits, self.n)

@dataclass
class breaks_date_width:
    """
    Regularly spaced dates by width

    Parameters
    ----------
    width : str
        The interval between the breaks. A string of the form
        `"<number> <units>"`. The units are one of::

            microseconds
            milliseconds
            seconds
            minutes
            hours
            days
            weeks
            months
            years
            decades
            centuries

        or their singular forms. `secs` and `mins` or their singular
        forms are also recognised as abbreviations for seconds and
        minutes.
    offset : int | timedelta | str | Sequence[str] | relativedelta | None
        The breaks are set to start at some "nice" value, but by
        applying an offset you can shift them to a value you may
        prefer.

        - If an `int`, the units will be the same as the width.
        - If a `Sequence`, it is of the form
          `("[+-]<number> <units>", "[+-]<number> <units>", ...)`
          e.g. `("1 year", "2 months", ...)`.
        - If a `str`, it is of the form `"[+-]<number> <units>"`
          e.g. `"2 years"`.
        - If `None`, do not shift.

    Examples
    --------
    Breaks at 4 year intervals

    >>> limits = [datetime(2010, 1, 1), datetime(2025, 1, 1)]
    >>> breaks = breaks_date_width("4 years")
    >>> [d.year for d in breaks(limits)]
    [2010, 2014, 2018, 2022, 2026]

    >>> breaks = breaks_date_width("4 years", offset=1)
    >>> [d.year for d in breaks(limits)]
    [2011, 2015, 2019, 2023, 2027]
    """

    width: str
    offset: int | DatetimeOffset = None

    def __call__(
        self, limits: tuple[datetime, datetime] | tuple[date, date]
    ) -> Sequence[datetime]:
        """
        Compute breaks

        Parameters
        ----------
        limits :
            Minimum and maximum :class:`datetime.datetime` values.

        Returns
        -------
        out :
            Sequence of break points.
        """
        from mizani._datetime.breaks import by_width
        from mizani._datetime.utils import as_datetime

        if pd.isna(limits[0]) or pd.isna(limits[1]):
            return []

        if isinstance(limits[0], np.datetime64) and isinstance(
            limits[1], np.datetime64
        ):
            limits = limits[0].astype(object), limits[1].astype(object)

        limits = as_datetime(limits)
        return by_width(limits, self.width, self.offset)
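
# A minimal usage sketch, not part of mizani's API: the forms the `offset`
# parameter accepts, per the docstring above. The helper name is hypothetical
# and no output values are asserted here.
def _sketch_date_width_offsets():
    limits = (datetime(2010, 1, 1), datetime(2025, 1, 1))
    plain = breaks_date_width("4 years")(limits)
    by_int = breaks_date_width("4 years", offset=1)(limits)  # width units
    by_str = breaks_date_width("4 years", offset="1 year")(limits)
    return plain, by_int, by_str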

@dataclass
class breaks_timedelta:
    """
    Timedelta breaks

    Returns
    -------
    out : callable ``f(limits)``
        A function that takes a sequence of two
        :class:`datetime.timedelta` values and returns a sequence
        of break points.

    Examples
    --------
    >>> from datetime import timedelta
    >>> breaks = breaks_timedelta()
    >>> limits = (timedelta(days=0), timedelta(days=345))
    >>> major = breaks(limits)
    >>> [b.days for b in major]
    [0, 70, 140, 210, 280, 350]
    """

    n: int = 5

    def __call__(
        self, limits: tuple[Timedelta, Timedelta]
    ) -> TimedeltaArrayLike:
        """
        Compute breaks

        Parameters
        ----------
        limits : tuple
            Minimum and maximum :class:`datetime.timedelta` values.

        Returns
        -------
        out : array_like
            Sequence of break points.
        """
        from mizani._timedelta.breaks import by_n

        return by_n(limits, self.n)

@dataclass
class breaks_timedelta_width:
    """
    Regularly spaced timedeltas by width

    Parameters
    ----------
    width : str
        The interval between the breaks. A string of the form
        `"<number> <units>"`. The units are one of::

            microseconds
            milliseconds
            seconds
            minutes
            hours
            days
            weeks

    offset :
        Use this to shift the calculated breaks so that they start
        at a value you may prefer.

        - If an `int`, the units will be the same as the width.
        - If a `Sequence`, it is of the form
          `("[+-]<number> <units>", "[+-]<number> <units>", ...)`
          e.g. `("2 days", "12 hours", ...)`.
        - If a `str`, it is of the form `"[+-]<number> <units>"`
          e.g. `"4 hours"`.
        - If `None`, do not shift.
    """

    width: str
    offset: int | TimedeltaOffset = None

    def __call__(
        self, limits: tuple[Timedelta, Timedelta]
    ) -> TimedeltaArrayLike:
        """
        Compute breaks

        Parameters
        ----------
        limits :
            Minimum and maximum :class:`datetime.timedelta` values.

        Returns
        -------
        out :
            Sequence of break points.
        """
        from mizani._timedelta.breaks import by_width

        return by_width(limits, self.width, self.offset)
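
# A minimal usage sketch, not part of mizani's API: breaks_timedelta_width
# takes the same "<number> <units>" strings as breaks_date_width, restricted
# to the units listed above. The helper name is hypothetical and no output
# values are asserted here.
def _sketch_timedelta_width():
    from datetime import timedelta

    limits = (timedelta(days=0), timedelta(days=30))
    return breaks_timedelta_width("1 week", offset="12 hours")(limits)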

@dataclass
class breaks_extended:
    """
    An extension of Wilkinson's tick position algorithm

    Parameters
    ----------
    n : int
        Desired number of breaks
    Q : list
        List of nice numbers
    only_inside : bool
        If ``True``, then all the breaks will be within the given
        range.
    w : list
        Weights applied to the four optimization components
        (simplicity, coverage, density, and legibility). They
        should add up to 1.

    Examples
    --------
    >>> limits = (0, 9)
    >>> breaks_extended()(limits)
    array([ 0. ,  2.5,  5. ,  7.5, 10. ])
    >>> breaks_extended(n=6)(limits)
    array([ 0.,  2.,  4.,  6.,  8., 10.])

    References
    ----------
    - Talbot, J., Lin, S., Hanrahan, P. (2010) An Extension of
      Wilkinson's Algorithm for Positioning Tick Labels on Axes,
      InfoVis 2010.

    Additional Credit to Justin Talbot on whose code this
    implementation is almost entirely based.
    """

    n: int = 5
    Q: Sequence[float] = (1, 5, 2, 2.5, 4, 3)
    only_inside: bool = False
    w: Sequence[float] = (0.25, 0.2, 0.5, 0.05)

    def __post_init__(self):
        # Used for lookups during the computations
        self.Q_index = {q: i for i, q in enumerate(self.Q)}

    def coverage(
        self, dmin: float, dmax: float, lmin: float, lmax: float
    ) -> float:
        p1 = (dmax - lmax) ** 2
        p2 = (dmin - lmin) ** 2
        p3 = (0.1 * (dmax - dmin)) ** 2
        return 1 - 0.5 * (p1 + p2) / p3

    def coverage_max(self, dmin: float, dmax: float, span: float) -> float:
        range = dmax - dmin
        if span > range:
            half = (span - range) / 2.0
            return 1 - (half**2) / (0.1 * range) ** 2
        else:
            return 1

    def density(
        self, k: float, dmin: float, dmax: float, lmin: float, lmax: float
    ) -> float:
        r = (k - 1.0) / (lmax - lmin)
        rt = (self.n - 1) / (max(lmax, dmax) - min(lmin, dmin))
        return 2 - max(r / rt, rt / r)

    def density_max(self, k: float) -> float:
        if k >= self.n:
            return 2 - (k - 1.0) / (self.n - 1.0)
        else:
            return 1

    def simplicity(
        self, q: float, j: float, lmin: float, lmax: float, lstep: float
    ) -> float:
        eps = 1e-10
        n = len(self.Q)
        i = self.Q_index[q] + 1

        if (
            (lmin % lstep < eps or (lstep - lmin % lstep) < eps)
            and lmin <= 0
            and lmax >= 0
        ):
            v = 1
        else:
            v = 0
        return (n - i) / (n - 1.0) + v - j

    def simplicity_max(self, q: float, j: float) -> float:
        n = len(self.Q)
        i = self.Q_index[q] + 1
        v = 1
        return (n - i) / (n - 1.0) + v - j

    def legibility(self, lmin: float, lmax: float, lstep: float) -> float:
        # Legibility depends on fontsize, rotation, overlap ... i.e.
        # it requires drawing or simulating drawn breaks then
        # calculating a score. Return 1 ignores all that.
        return 1

    def __call__(self, limits: tuple[float, float]) -> NDArrayFloat:
        """
        Calculate the breaks

        Parameters
        ----------
        limits : array
            Minimum and maximum values.

        Returns
        -------
        out : array_like
            Sequence of break points.
        """
        Q = self.Q
        w = self.w
        only_inside = self.only_inside
        simplicity_max = self.simplicity_max
        density_max = self.density_max
        coverage_max = self.coverage_max
        simplicity = self.simplicity
        coverage = self.coverage
        density = self.density
        legibility = self.legibility
        log10 = np.log10
        ceil = np.ceil
        floor = np.floor

        # casting prevents the typechecker from mixing
        # float & np.float32
        dmin, dmax = float(limits[0]), float(limits[1])

        if dmin > dmax:
            dmin, dmax = dmax, dmin
        elif dmin == dmax:
            return np.array([dmin])

        best_score = -2.0
        best = (0, 0, 0, 0, 0)  # Gives Empty breaks

        j = 1.0
        while j < float("inf"):
            for q in Q:
                sm = simplicity_max(q, j)

                if w[0] * sm + w[1] + w[2] + w[3] < best_score:
                    j = float("inf")
                    break

                k = 2.0
                while k < float("inf"):
                    dm = density_max(k)

                    if w[0] * sm + w[1] + w[2] * dm + w[3] < best_score:
                        break

                    delta = (dmax - dmin) / (k + 1) / j / q
                    z: float = ceil(log10(delta))

                    while z < float("inf"):
                        step = j * q * (10**z)
                        cm = coverage_max(dmin, dmax, step * (k - 1))

                        if (
                            w[0] * sm + w[1] * cm + w[2] * dm + w[3]
                            < best_score
                        ):
                            break

                        min_start = int(floor(dmax / step) * j - (k - 1) * j)
                        max_start = int(ceil(dmin / step) * j)

                        if min_start > max_start:
                            z = z + 1
                            break

                        for start in range(min_start, max_start + 1):
                            lmin = start * (step / j)
                            lmax = lmin + step * (k - 1)
                            lstep = step

                            s = simplicity(q, j, lmin, lmax, lstep)
                            c = coverage(dmin, dmax, lmin, lmax)
                            d = density(k, dmin, dmax, lmin, lmax)
                            l = legibility(lmin, lmax, lstep)

                            score = w[0] * s + w[1] * c + w[2] * d + w[3] * l

                            if score > best_score and (
                                not only_inside
                                or (lmin >= dmin and lmax <= dmax)
                            ):
                                best_score = score
                                best = (lmin, lmax, lstep, q, k)

                        z = z + 1
                    k = k + 1
            j = j + 1

        locs = best[0] + np.arange(best[4]) * best[2]
        return locs
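
# A minimal usage sketch, not part of mizani's API: the effect of only_inside.
# With the defaults the chosen labels may extend past the data limits (the
# (0, 9) doctest above ends at 10); with only_inside=True every break is kept
# within the limits. The helper name is hypothetical and no new output values
# are asserted here.
def _sketch_only_inside(limits=(0, 9)):
    loose = breaks_extended()(limits)
    tight = breaks_extended(only_inside=True)(limits)
    return loose, tight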

class breaks_symlog:
    """
    Breaks for the Symmetric Logarithm Transform

    Examples
    --------
    >>> limits = (-100, 100)
    >>> breaks_symlog()(limits)
    array([-100,  -10,    0,   10,  100])
    """

    def __call__(self, limits: tuple[float, float]) -> NDArrayFloat:
        def _signed_log10(x):
            return np.round(np.sign(x) * np.log10(np.sign(x) * x)).astype(int)

        l, h = _signed_log10(limits)
        exps = np.arange(l, h + 1, 1)
        return np.sign(exps) * (10 ** np.abs(exps))
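
# A minimal illustrative sketch, not part of mizani's API: the arithmetic
# behind breaks_symlog. The limits map to signed base-10 exponents (here -2
# and 2) and every integer exponent between them, including 0, is turned back
# into a signed power of 10. The helper name is hypothetical.
def _sketch_symlog_by_hand():
    exps = np.arange(-2, 3)  # signed exponents -2 .. 2
    # -> [-100., -10., 0., 10., 100.], the same values as the doctest above
    return np.sign(exps) * 10.0 ** np.abs(exps)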

@dataclass
class breaks_width:
    """
    Regularly spaced breaks by width

    Parameters
    ----------
    width :
        The interval between the breaks.
    offset :
        Shift the calculated breaks by this much.

    Examples
    --------
    Breaks at intervals that are 4 units wide

    >>> limits = [3, 14]
    >>> breaks = breaks_width(width=4)
    >>> breaks(limits)
    array([ 0,  4,  8, 12, 16])
    """

    width: float
    offset: float | None = None

    def __call__(self, limits: tuple[float, float]) -> NDArrayFloat:
        offset = 0 if self.offset is None else self.offset
        start = round_any(limits[0], self.width, np.floor) + offset
        end = round_any(limits[1], self.width, np.ceil) + self.width
        dtype = (
            int
            if isinstance(self.width, int) and isinstance(offset, int)
            else float
        )
        return np.arange(start, end, self.width, dtype=dtype)
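
# A minimal illustrative sketch, not part of mizani's API: the breaks_width
# arithmetic done by hand for the doctest above. The limits (3, 14) snap
# outwards to multiples of the width, and one extra width is added so that
# np.arange, whose stop is exclusive, still includes the snapped upper end.
# The helper name is hypothetical.
def _sketch_width_by_hand(limits=(3, 14), width=4):
    start = np.floor(limits[0] / width) * width       # -> 0.0
    end = np.ceil(limits[1] / width) * width + width  # -> 20.0 (exclusive)
    return np.arange(start, end, width)               # -> [0., 4., 8., 12., 16.]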

# Deprecated
log_breaks = breaks_log
trans_minor_breaks = minor_breaks_trans
date_breaks = breaks_date
timedelta_breaks = breaks_timedelta
extended_breaks = breaks_extended