Source code for opinf.pre._shiftscale

# pre/_shiftscale.py
"""Preprocessing transformations based on elementary shifts and scalings."""

__all__ = [
    "shift",
    "scale",
    "ShiftTransformer",
    "ScaleTransformer",
    "ShiftScaleTransformer",
]

import numbers
import warnings
import numpy as np

from .. import errors, utils
from ._base import TransformerTemplate, requires_trained


# Functional paradigm =========================================================
def shift(states: np.ndarray, shift_by: np.ndarray = None):
    """Subtract a vector from every column of a snapshot matrix.

    Parameters
    ----------
    states : (n, k) ndarray
        Matrix of k snapshots. Each column is a single snapshot.
    shift_by : (n,) ndarray or None
        Vector that is the same size as a single snapshot. A single-column
        ``(n, 1)`` array is also accepted and is flattened.
        If ``None`` (default), use the mean of the columns of ``states``.

    Returns
    -------
    states_shifted : (n, k) ndarray
        Shifted state matrix, i.e.,
        ``states_shifted[:, j] = states[:, j] - shift_by``.
    shift_by : (n,) ndarray
        Shift factor, returned only if ``shift_by=None``.
        Since this is a one-dimensional array, it must be reshaped to be
        applied to a matrix, for example,
        ``states_shifted = states - shift_by.reshape(-1, 1)``.

    Raises
    ------
    ValueError
        If ``states`` is not two-dimensional, or if ``shift_by`` is neither
        one-dimensional nor a single column.

    Examples
    --------
    >>> import opinf

    # Shift Q by its mean, then shift Y by the same mean.
    >>> Q_shifted, qbar = opinf.pre.shift(Q)
    >>> Y_shifted = opinf.pre.shift(Y, qbar)

    # Shift Q by its mean, then undo the transformation by an inverse shift.
    >>> Q_shifted, qbar = opinf.pre.shift(Q)
    >>> Q_again = opinf.pre.shift(Q_shifted, -qbar)
    """
    if states.ndim != 2:
        raise ValueError("'states' must be two-dimensional")

    # With no shift vector given, learn it: the mean column of the data.
    compute_mean = shift_by is None
    if compute_mean:
        shift_by = np.mean(states, axis=1)

    # A single column is treated as a vector; anything else must be 1D.
    if shift_by.ndim == 2 and shift_by.shape[1] == 1:
        shift_by = shift_by[:, 0]
    elif shift_by.ndim != 1:
        raise ValueError("'shift_by' must be one-dimensional")

    # Broadcast the vector across the columns.
    states_shifted = states - shift_by[:, np.newaxis]
    if compute_mean:
        return states_shifted, shift_by
    return states_shifted
def scale(states: np.ndarray, scale_to: tuple, scale_from: tuple = None):
    r"""Affinely map the entries of a snapshot matrix between two intervals.

    The scaling from the interval :math:`[a, b]` to the interval
    :math:`[a', b']` is given by

    .. math::
       q' = \frac{q - a}{b - a}(b' - a') + a',

    where :math:`q` is the original variable and :math:`q'` is the
    transformed variable. This follows
    :class:`sklearn.preprocessing.MinMaxScaler`.

    Parameters
    ----------
    states : (n, k) ndarray
        Matrix of k snapshots to be scaled. Each column is a single snapshot.
    scale_to : (float, float)
        Desired minimum and maximum of the scaled data, i.e.,
        :math:`[a', b']`.
    scale_from : (float, float)
        Minimum and maximum of the snapshot data, i.e., :math:`[a, b]`.
        If ``None`` (default), learn the scaling from the data:
        ``scale_from[0] = min(states)``; ``scale_from[1] = max(states)``.

    Returns
    -------
    states_scaled : (n, k) ndarray
        Scaled snapshot matrix.
    scaled_to : (float, float)
        Bounds that the snapshot matrix was scaled to, i.e.,
        ``scaled_to[0] = min(states_scaled)``;
        ``scaled_to[1] = max(states_scaled)``.
        Only returned if ``scale_from = None``.
    scaled_from : (float, float)
        Minimum and maximum of the snapshot data, i.e., the bounds that
        the data was scaled from. Only returned if ``scale_from = None``.

    Raises
    ------
    ValueError
        If ``scale_to`` or ``scale_from`` does not have exactly two entries.

    Examples
    --------
    >>> import opinf

    # Scale Q to [-1, 1] and then scale Y with the same transformation.
    >>> Qscaled, scaled_to, scaled_from = opinf.pre.scale(Q, (-1, 1))
    >>> Yscaled = opinf.pre.scale(Y, scaled_to, scaled_from)

    # Scale Q to [0, 1], then undo the transformation by an inverse scaling.
    >>> Qscaled, scaled_to, scaled_from = opinf.pre.scale(Q, (0, 1))
    >>> Q_again = opinf.pre.scale(Qscaled, scaled_from, scaled_to)
    """
    # Without explicit source bounds, take them from the data itself.
    learning = scale_from is None
    if learning:
        scale_from = (np.min(states), np.max(states))

    # Both intervals must be given by exactly two endpoints.
    if len(scale_to) != 2:
        raise ValueError("scale_to must have exactly 2 elements")
    if len(scale_from) != 2:
        raise ValueError("scale_from must have exactly 2 elements")

    # q' = slope * q + offset, with slope = (b' - a') / (b - a).
    new_lo, new_hi = scale_to
    old_lo, old_hi = scale_from
    slope = (new_hi - new_lo) / (old_hi - old_lo)
    offset = new_lo - old_lo * slope
    states_scaled = states * slope + offset

    if learning:
        return states_scaled, scale_to, scale_from
    return states_scaled
# Object-oriented paradigm ====================================================
class ShiftTransformer(TransformerTemplate):
    r"""Shift snapshots by a given reference snapshot :math:`\bar{\q}`.

    For a vector :math:`\q\in\RR^n`, this transformation is
    :math:`\q \mapsto \q' = \q - \bar{\q}` with inverse transformation
    :math:`\q' \mapsto \q = \q' + \bar{\q}`.

    For a matrix :math:`\Q\in\RR^{n \times k}`, the transformation is applied
    columnwise. Writing :math:`\Q = [~\q_0~~\q_1~~\cdots~~\q_{k-1}~]`,

    .. math::
       \Q \mapsto \Q' = \Q - \bar{\q}\mathbf{1}_k\trp
       = \left[\begin{array}{c|c|c|c}
       &&& \\
       \q_0 - \bar{\q} & \q_1 - \bar{\q} & \cdots & \q_{k-1} - \bar{\q}
       \\ &&&
       \end{array}\right],

    with the inverse transformation defined similarly.

    Parameters
    ----------
    reference_snapshot : (n,) ndarray
        Reference snapshot :math:`\bar{\q}\in\RR^n`.
    name : str or None
        Label for the state variable that this transformer acts on.

    Raises
    ------
    TypeError
        If ``reference_snapshot`` is not a one-dimensional ``ndarray``.

    Notes
    -----
    In this class, the reference snapshot :math:`\bar{\q}` is provided
    explicitly. Use :class:`ShiftScaleTransformer` to define :math:`\bar{\q}`
    as the average training snapshot.
    """

    def __init__(self, reference_snapshot, /, name=None):
        """Set the reference snapshot."""
        super().__init__(name=name)

        if (
            not isinstance(reference_snapshot, np.ndarray)
            or reference_snapshot.ndim != 1
        ):
            raise TypeError(
                "reference snapshot must be a one-dimensional array"
            )
        # The array is stored by reference (not copied).
        self.__qbar = reference_snapshot

    # Properties --------------------------------------------------------------
    @property
    def reference(self):
        r"""Reference snapshot :math:`\bar{\q}\in\RR^n`."""
        return self.__qbar

    @property
    def state_dimension(self):
        r"""Dimension :math:`n` of the state."""
        return self.reference.shape[0]

    @state_dimension.setter
    def state_dimension(self, n):
        # The dimension is fixed by the reference snapshot, so the only
        # "assignment" tolerated is one that restates the current value.
        if not isinstance(n, numbers.Number) or n != self.state_dimension:
            raise AttributeError(
                "can't set attribute 'state_dimension'"
                f" to {n} != {self.state_dimension} = reference.size"
            )

    # Main routines -----------------------------------------------------------
    def fit(self, states):
        """Do nothing; this transformation is not learned from data.

        Parameters
        ----------
        states : (n, k) ndarray
            Matrix of `k` `n`-dimensional snapshots.

        Returns
        -------
        self

        Raises
        ------
        ValueError
            If the ``states`` do not align with the :attr:`state_dimension`.
        """
        self._check_shape(states)
        return self

    def fit_transform(self, states, inplace: bool = False):
        """Apply the shift.

        This method is equivalent to :meth:`transform` because the
        transformation is not learned from data (there is nothing to "fit").

        Parameters
        ----------
        states : (n, ...) ndarray
            Matrix of `n`-dimensional snapshots, or a single snapshot.
        inplace : bool
            If ``True``, overwrite ``states`` during transformation.
            If ``False``, create a copy of the data to transform.

        Returns
        -------
        states_shifted: (n, ...) ndarray
            Matrix of `n`-dimensional shifted snapshots, or a single shifted
            snapshot.

        Raises
        ------
        ValueError
            If the ``states`` do not align with the :attr:`state_dimension`.
        """
        return self.transform(states, inplace=inplace)

    def transform(self, states, inplace: bool = False):
        """Apply the shift.

        Parameters
        ----------
        states : (n, ...) ndarray
            Matrix of `n`-dimensional snapshots, or a single snapshot.
        inplace : bool
            If ``True``, overwrite ``states`` during transformation.
            If ``False``, create a copy of the data to transform.

        Returns
        -------
        states_shifted: (n, ...) ndarray
            Matrix of `n`-dimensional shifted snapshots, or a single shifted
            snapshot.

        Raises
        ------
        ValueError
            If the ``states`` do not align with the :attr:`state_dimension`.
        """
        self._check_shape(states)
        Y = states if inplace else states.copy()
        # Column-shape the reference so it broadcasts over snapshot columns.
        Y -= self.reference.reshape((-1, 1)) if Y.ndim > 1 else self.reference
        return Y

    def transform_ddts(self, ddts, inplace: bool = True):
        r"""Do nothing; this transformation does not affect derivatives.

        Parameters
        ----------
        ddts : (n, ...) ndarray
            Matrix of `n`-dimensional snapshot time derivatives, or a single
            snapshot time derivative.
        inplace : bool
            If ``True`` (default), return ``ddts``.
            If ``False``, return a copy of ``ddts``.

        Returns
        -------
        ddts : (n, ...) ndarray
            Snapshot time derivatives, or a copy of them if ``inplace=False``.

        Raises
        ------
        ValueError
            If the ``ddts`` do not align with the :attr:`state_dimension`.
        """
        # Validate the shape as documented (and as the other transformers in
        # this module do), even though the values pass through unchanged.
        self._check_shape(ddts)
        return ddts if inplace else ddts.copy()

    def inverse_transform(self, states_shifted, inplace=False, locs=None):
        """Apply the inverse shift.

        Parameters
        ----------
        states_shifted : (n, ...) or (p, ...) ndarray
            Matrix of `n`-dimensional shifted snapshots, or a single shifted
            snapshot.
        inplace : bool
            If ``True``, overwrite ``states_shifted`` during the inverse
            transformation. If ``False``, create a copy of the data to
            untransform.
        locs : slice or (p,) ndarray of integers or None
            If given, assume ``states_shifted`` contains the transformed
            snapshots at only the `p` indices described by ``locs``.

        Returns
        -------
        states_unshifted: (n, ...) or (p, ...) ndarray
            Matrix of `n`-dimensional unshifted snapshots, or the `p` entries
            of such at the indices specified by ``locs``.

        Raises
        ------
        ValueError
            If the ``states_shifted`` do not align with the ``locs`` (when
            provided) or the :attr:`state_dimension` (when ``locs`` is not
            provided).
        """
        if locs is not None:
            locs = self._check_locs(locs, states_shifted, "states_shifted")
        else:
            self._check_shape(states_shifted)

        Y = states_shifted if inplace else states_shifted.copy()
        # Only the selected entries of the reference are added back when the
        # input holds a subset of the state.
        qbar = self.reference if locs is None else self.reference[locs]
        Y += qbar.reshape((-1, 1)) if Y.ndim > 1 else qbar
        return Y

    # Model persistence -------------------------------------------------------
    def save(self, savefile, overwrite=False):
        """Save the transformer name and reference snapshot.

        Parameters
        ----------
        savefile
            Target passed to ``utils.hdf5_savehandle``.
        overwrite : bool
            Overwrite flag passed to ``utils.hdf5_savehandle``.
        """
        with utils.hdf5_savehandle(savefile, overwrite) as hf:
            # An empty dataset is used purely as a holder for attributes.
            meta = hf.create_dataset("meta", shape=(0,))
            # The name is stored as text, so ``None`` becomes "None".
            meta.attrs["name"] = str(self.name)
            hf.create_dataset("reference_snapshot", data=self.reference)

    @classmethod
    def load(cls, loadfile):
        """Load a transformer written by :meth:`save`.

        Parameters
        ----------
        loadfile
            Source passed to ``utils.hdf5_loadhandle``.

        Returns
        -------
        ShiftTransformer
            New transformer with the stored reference snapshot and name.
        """
        with utils.hdf5_loadhandle(loadfile) as hf:
            name = hf["meta"].attrs["name"]
            return cls(
                hf["reference_snapshot"][:],
                # Undo the str(None) conversion done in save().
                name=(None if name == "None" else name),
            )
class ScaleTransformer(TransformerTemplate):
    r"""Scale (nondimensionalize) snapshots as a whole or by row.

    If the provided :attr:`scaler` is a number :math:`\alpha \neq 0`,
    this transformation simply multiplies the input by that scaler.
    For a vector :math:`\q\in\RR^n`, the transformation is
    :math:`\q \mapsto \q' = \alpha\q` with inverse transformation
    :math:`\q' \mapsto \q = \frac{1}{\alpha}\q'`, and similarly for matrices.

    If the :attr:`scaler` is a vector :math:`\boldsymbol{\alpha}\in\RR^{n}`,
    this transformation multiplies each row of the input by the corresponding
    entry of :math:`\boldsymbol{\alpha}`.
    For a vector :math:`\q\in\RR^n`, the transformation is
    :math:`\q\mapsto\q' = \boldsymbol{\alpha}\ast\q` where :math:`\ast` is the
    elementwise (Hadamard) product (``*`` in NumPy). The inverse
    transformation performs elementwise division (``/`` in NumPy).
    For a matrix :math:`\Q\in\RR^{n \times k}`, the transformation is applied
    columnwise: writing :math:`\Q = [~\q_0~~\q_1~~\cdots~~\q_{k-1}~]`,

    .. math::
       \Q \mapsto \Q' = \Q \ast \boldsymbol{\alpha}\1\trp
       = \left[\begin{array}{c|c|c|c}
       &&& \\
       \q_0 \ast \boldsymbol{\alpha} &
       \q_1 \ast \boldsymbol{\alpha} & \cdots &
       \q_{k-1} \ast \boldsymbol{\alpha}
       \\ &&&
       \end{array}\right],

    with the inverse transformation defined similarly.

    Parameters
    ----------
    scaler : float or (n,) ndarray
        Scaling factor. If a float, data are scaled as a whole; if an array,
        data are scaled by row. Must be nonzero or have all nonzero entries.
    name : str or None
        Label for the state variable that this transformer acts on.

    Raises
    ------
    TypeError
        If ``scaler`` is not a nonzero number or a one-dimensional
        ``ndarray`` with all nonzero entries.

    Notes
    -----
    In this class, the scaler :math:`\alpha` or :math:`\boldsymbol{\alpha}` is
    provided explicitly. Use :class:`ShiftScaleTransformer` to learn different
    types of scaling from training data.
    """

    def __init__(self, scaler, /, name=None):
        """Set the scaler."""
        super().__init__(name=name)

        if not (
            (isinstance(scaler, numbers.Number) and scaler != 0)
            or (
                isinstance(scaler, np.ndarray)
                and scaler.ndim == 1
                and np.count_nonzero(scaler) == scaler.size
            )
        ):
            raise TypeError(
                "scaler must be a nonzero scalar or one-dimensional array"
            )
        self.__scl = scaler

        # An array scaler fixes the state dimension immediately. The base
        # class setter is called directly because the setter defined below
        # refuses to change the dimension when scaling by row.
        if self.byrow:
            TransformerTemplate.state_dimension.fset(self, scaler.size)

    # Properties --------------------------------------------------------------
    @property
    def scaler(self):
        """Scaling factor. If a float, data are scaled as a whole; if an
        array, data are scaled by row. Must be nonzero or have all nonzero
        entries.
        """
        return self.__scl

    @property
    def byrow(self):
        """Whether data are scaled by row (``True``, :attr:`scaler` is an
        array) or as a whole (``False``, :attr:`scaler` is a float).
        """
        return isinstance(self.scaler, np.ndarray)

    @property
    def state_dimension(self):
        r"""Dimension :math:`n` of the state."""
        return TransformerTemplate.state_dimension.fget(self)

    @state_dimension.setter
    def state_dimension(self, n):
        # When scaling by row the dimension is tied to the scaler size, so
        # only a restatement of the current value is accepted.
        if self.byrow and (
            not isinstance(n, numbers.Number) or n != self.state_dimension
        ):
            raise AttributeError(
                "can't set attribute 'state_dimension'"
                f" to {n} != {self.state_dimension} = scaler.size"
            )
        TransformerTemplate.state_dimension.fset(self, n)

    def __str__(self):
        # NOTE(review): the separator must match the one used by
        # TransformerTemplate.__str__() -- confirm against the base class.
        lines = super().__str__().split("\n ")
        lines.append(
            "scaling by row" if self.byrow else f"scaler: {self.scaler:.4e}"
        )
        return "\n ".join(lines)

    # Main routines -----------------------------------------------------------
    def fit(self, states):
        """Set the :attr:`state_dimension` if :attr:`scaler` is not an array,
        otherwise do nothing.

        Parameters
        ----------
        states : (n, k) ndarray
            Matrix of `k` `n`-dimensional snapshots.

        Returns
        -------
        self

        Raises
        ------
        ValueError
            If the ``states`` do not align with the :attr:`state_dimension`
            (only when :attr:`scaler` is an array)
        """
        if not self.byrow:
            self.state_dimension = states.shape[0]
        self._check_shape(states)
        return self

    def fit_transform(self, states, inplace=False):
        """Set the :attr:`state_dimension` if :attr:`scaler` is not an array,
        and apply the scaling.

        Parameters
        ----------
        states : (n, ...) ndarray
            Matrix of `n`-dimensional snapshots, or a single snapshot.
        inplace : bool
            If ``True``, overwrite ``states`` during transformation.
            If ``False``, create a copy of the data to transform.

        Returns
        -------
        states_scaled: (n, ...) ndarray
            Matrix of `n`-dimensional scaled snapshots, or a single scaled
            snapshot.

        Raises
        ------
        ValueError
            If the ``states`` do not align with the :attr:`state_dimension`
            (only when :attr:`scaler` is an array).
        """
        self.fit(states)
        return self.transform(states, inplace=inplace)

    @requires_trained
    def transform(self, states, inplace=False):
        """Apply the scaling.

        Parameters
        ----------
        states : (n, ...) ndarray
            Matrix of `n`-dimensional snapshots, or a single snapshot.
        inplace : bool
            If ``True``, overwrite ``states`` during transformation.
            If ``False``, create a copy of the data to transform.

        Returns
        -------
        states_scaled: (n, ...) ndarray
            Matrix of `n`-dimensional scaled snapshots, or a single scaled
            snapshot.

        Raises
        ------
        AttributeError
            If :attr:`scaler` is a number (not an array) but :meth:`fit` or
            :meth:`fit_transform` have not been called yet.
        ValueError
            If the ``states`` do not align with the :attr:`state_dimension`.
        """
        self._check_shape(states)
        Y = states if inplace else states.copy()
        # A row-wise scaler needs column shape to broadcast over a matrix.
        _flip = self.byrow and Y.ndim > 1
        Y *= self.scaler.reshape((-1, 1)) if _flip else self.scaler
        return Y

    @requires_trained
    def transform_ddts(self, ddts, inplace=False):
        """Apply the scaling; the transformation for derivatives is the same
        as for snapshots.

        Parameters
        ----------
        ddts : (n, ...) ndarray
            Matrix of `n`-dimensional snapshot time derivatives, or a single
            snapshot time derivative.
        inplace : bool
            If ``True``, modify ``ddts`` inplace.
            If ``False`` (default), return a new array.

        Returns
        -------
        ddts_scaled : (n, ...) ndarray
            Scaled snapshot time derivatives.

        Raises
        ------
        AttributeError
            If :attr:`scaler` is a number (not an array) but :meth:`fit` or
            :meth:`fit_transform` have not been called yet.
        ValueError
            If the ``ddts`` do not align with the :attr:`state_dimension`.
        """
        return self.transform(ddts, inplace=inplace)

    @requires_trained
    def inverse_transform(self, states_scaled, inplace=False, locs=None):
        """Apply the inverse scaling.

        Parameters
        ----------
        states_scaled : (n, ...) or (p, ...) ndarray
            Matrix of `n`-dimensional scaled snapshots, or a single scaled
            snapshot.
        inplace : bool
            If ``True``, overwrite ``states_scaled`` during the inverse
            transformation. If ``False``, create a copy of the data to
            untransform.
        locs : slice or (p,) ndarray of integers or None
            If given, assume ``states_scaled`` contains the transformed
            snapshots at only the `p` indices described by ``locs``.

        Returns
        -------
        states_unscaled: (n, ...) or (p, ...) ndarray
            Matrix of `n`-dimensional unscaled snapshots, or the `p` entries
            of such at the indices specified by ``locs``.

        Raises
        ------
        AttributeError
            If :attr:`scaler` is a number (not an array) but :meth:`fit` or
            :meth:`fit_transform` have not been called yet.
        ValueError
            If the ``states_scaled`` do not align with the ``locs`` (when
            provided) or the :attr:`state_dimension` (when ``locs`` is not
            provided).
        """
        scaler_ = self.scaler
        if locs is not None:
            # Pass the argument label, as ShiftTransformer does, so that the
            # validation refers to this method's parameter name.
            locs = self._check_locs(locs, states_scaled, "states_scaled")
            if self.byrow:
                # Keep only the scaler entries for the selected rows.
                scaler_ = scaler_[locs]
        else:
            self._check_shape(states_scaled)

        Y = states_scaled if inplace else states_scaled.copy()
        _flip = self.byrow and Y.ndim > 1
        Y /= scaler_.reshape((-1, 1)) if _flip else scaler_
        return Y

    # Model persistence -------------------------------------------------------
    def save(self, savefile, overwrite=False):
        """Save the transformer name, state dimension (if set), and scaler.

        Parameters
        ----------
        savefile
            Target passed to ``utils.hdf5_savehandle``.
        overwrite : bool
            Overwrite flag passed to ``utils.hdf5_savehandle``.
        """
        with utils.hdf5_savehandle(savefile, overwrite) as hf:
            # An empty dataset is used purely as a holder for attributes.
            meta = hf.create_dataset("meta", shape=(0,))
            # The name is stored as text, so ``None`` becomes "None".
            meta.attrs["name"] = str(self.name)
            if (n := self.state_dimension) is not None:
                meta.attrs["state_dimension"] = n
            # A scalar scaler is stored as a length-1 dataset.
            scaler = self.scaler if self.byrow else [self.scaler]
            hf.create_dataset("scaler", data=scaler)

    @classmethod
    def load(cls, loadfile):
        """Load a transformer written by :meth:`save`.

        Parameters
        ----------
        loadfile
            Source passed to ``utils.hdf5_loadhandle``.

        Returns
        -------
        ScaleTransformer
            New transformer with the stored scaler, name, and (for scalar
            scalers) state dimension if one was stored.
        """
        with utils.hdf5_loadhandle(loadfile) as hf:
            meta = hf["meta"]
            name = meta.attrs["name"]
            scaler = hf["scaler"][:]
            # A length-1 dataset is read back as a scalar scaler.
            # NOTE(review): a by-row scaler of size 1 is therefore also
            # restored as a scalar -- confirm this is acceptable.
            if scaler.shape == (1,):
                scaler = scaler[0]
            out = cls(scaler, name=(None if name == "None" else name))
            if not out.byrow and "state_dimension" in meta.attrs:
                out.state_dimension = int(meta.attrs["state_dimension"])
            return out
class ShiftScaleTransformer(TransformerTemplate):
    r"""Process snapshots by vector centering and/or affine scaling (in that
    order).

    Transformations with this class are notated below as

    .. math::
       \Q \mapsto \Q' ~\text{(centered)}~
       \mapsto \Q'' ~\text{(centered/scaled)},

    where :math:`\Q\in\RR^{n \times k}` is the snapshot matrix to be
    transformed and :math:`\Q''\in\RR^{n \times k}` is the transformed
    snapshot matrix. Transformation parameters are learned from a training
    data set, not provided explicitly by the user as in
    :class:`ShiftTransformer` or :class:`ScaleTransformer`.

    All transformations with this class are *affine* and hence can be written
    componentwise as :math:`\Q_{i,j}'' = \alpha_{i,j} \Q_{i,j} + \beta_{i,j}`
    for some choice of :math:`\alpha_{i,j},\beta_{i,j}\in\RR`.

    Parameters
    ----------
    centering : bool
        If ``True``, shift the snapshots by the mean training snapshot, i.e.,

        .. math::
           \Q'_{:,j} = \Q_{:,j} - \frac{1}{k}\sum_{j=0}^{k-1}\Q_{:,j}.

        Otherwise, :math:`\Q' = \Q` (default).
    scaling : str or None
        If given, scale (non-dimensionalize) the centered snapshot entries.
        Otherwise, :math:`\Q'' = \Q'` (default).

        All scaling options multiply :math:`\Q'` by a constant; some
        (symmetric scalings, ``'standard'`` and those ending in ``'sym'``)
        shift the entries of :math:`\Q'` by a constant (the mean entry) as
        well. This is different from setting ``centering=True``, which shifts
        each column of :math:`\Q` by a vector; however, when
        ``centering=True`` symmetric scaling options are equivalent to their
        non-symmetric counterparts because in that case the mean of
        :math:`\Q'` is zero.

        **Options:**

        .. dropdown:: ``'standard'``
            Standardize to zero mean and unit standard deviation

            .. list-table::

                * - Formula
                  - .. math::
                       \Q'' = \frac{\Q' - \mean(\Q')}{\std(\Q')}
                * - ``byrow=False``
                  - :math:`\mean(\Q'') = 0` and :math:`\std(\Q'') = 1`
                * - ``byrow=True``
                  - :math:`\mean_{j}(\Q_{i,j}'') = 0` and
                    :math:`\std_j(\Q_{i,j}'') = 1`
                    for each row index :math:`i`

        .. dropdown:: ``'minmax'``
            Minmax scaling to :math:`[0, 1]`

            .. list-table::

                * - Formula
                  - .. math::
                       \Q'' = \frac{\Q'-\min(\Q')}{\max(\Q')-\min(\Q')}
                * - ``byrow=False``
                  - :math:`\min(\Q'') = 0` and :math:`\max(\Q'') = 1`
                * - ``byrow=True``
                  - :math:`\min_{j}(\Q_{i,j}'') = 0` and
                    :math:`\max_{j}(\Q_{i,j}'') = 1`
                    for each row index :math:`i`

        .. dropdown:: ``'minmaxsym'``
            Minmax scaling to :math:`[-1, 1]`

            .. list-table::

                * - Formula
                  - .. math::
                       \Q'' = 2\frac{\Q'-\min(\Q')}{\max(\Q')-\min(\Q')}-1
                * - ``byrow=False``
                  - :math:`\min(\Q'') = -1` and :math:`\max(\Q'') = 1`
                * - ``byrow=True``
                  - :math:`\min_{j}(\Q_{i,j}'') = -1` and
                    :math:`\max_{j}(\Q_{i,j}'') = 1`
                    for each row index :math:`i`

        .. dropdown:: ``'maxabs'``
            Maximum absolute scaling to :math:`[-1, 1]`
            without scalar mean shift

            .. list-table::

                * - Formula
                  - .. math::
                       \Q'' = \frac{1}{\max(\text{abs}(\Q'))}\Q'
                * - ``byrow=False``
                  - :math:`\mean(\Q'')=\frac{\mean(\Q')}{\max(\text{abs}(\Q'))}`
                    and :math:`\max(\text{abs}(\Q'')) = 1`
                * - ``byrow=True``
                  - :math:`\mean_{j}(\Q_{i,j}'') =
                    \frac{\mean_j(\Q_{i,j}')}{\max_j(\text{abs}(\Q_{i,j}'))}`
                    and :math:`\max_{j}(\text{abs}(\Q_{i,j}'')) = 1`
                    for each row index :math:`i`

        .. dropdown:: ``'maxabssym'``
            Maximum absolute scaling to :math:`[-1, 1]`
            with scalar mean shift

            .. list-table::

                * - Formula
                  - .. math::
                       \Q'' = \frac{\Q' - \mean(\Q')}{
                       \max(\text{abs}(\Q' - \mean(\Q')))}
                * - ``byrow=False``
                  - :math:`\mean(\Q'')=0` and
                    :math:`\max(\text{abs}(\Q''))=1`
                * - ``byrow=True``
                  - :math:`\mean_j(\Q_{i,j}'') = 0` and
                    :math:`\max_j(\text{abs}(\Q_{i,j}'')) = 1`
                    for each row index :math:`i`

        .. dropdown:: ``'maxnorm'``
            Maximum Euclidean norm scaling to :math:`[0, 1]`
            without scalar mean shift

            .. list-table::

                * - Formula
                  - .. math::
                       \Q'' = \frac{1}{\max_j(\|\Q'_{:,j}\|_2)}\Q'
                * - ``byrow=False``
                  - :math:`\mean(\Q'')=\frac{\mean(\Q')}{\max_j(\|\Q'_{:,j}\|)}`
                    and :math:`\max_j(\|\Q''_{:,j}\|) = 1`
                * - ``byrow=True``
                  - ``ValueError``: use ``'maxabs'`` instead

        .. dropdown:: ``'maxnormsym'``
            Maximum Euclidean norm scaling to :math:`[0, 1]`
            with scalar mean shift

            .. list-table::

                * - Formula
                  - .. math::
                       \Q'' = \frac{\Q' - \text{mean}(\Q')}{
                       \max_j(\|\Q'_{:,j} - \text{mean}(\Q')\|_2)}
                * - ``byrow=False``
                  - :math:`\mean(\Q'')=0` and
                    :math:`\max_j(\|\Q''_{:,j}\|) = 1`
                * - ``byrow=True``
                  - ``ValueError``: use ``'maxabssym'`` instead

    byrow : bool
        If ``True``, scale each row of the snapshot matrix separately when a
        scaling is specified. Otherwise, scale the entire matrix at once
        (default).
    name : str or None
        Label passed to the ``TransformerTemplate`` constructor.
    verbose : bool
        If ``True``, print information upon learning a transformation.

    Raises
    ------
    TypeError
        If ``scaling`` is neither ``None`` nor a string.
    ValueError
        If ``scaling`` is not one of the valid options, or if ``byrow=True``
        is combined with ``'maxnorm'`` or ``'maxnormsym'``.

    Notes
    -----
    A custom shifting vector (i.e., the mean snapshot) can be specified by
    setting the ``mean_`` attribute. Similarly, the scaling
    :math:`\q'\mapsto \q'' = \alpha \q' + \beta` can be adjusted by setting
    the ``scale_`` (:math:`\alpha`) and ``shift_`` (:math:`\beta`)
    attributes. However, calling :meth:`fit()` or :meth:`fit_transform()`
    will overwrite all three attributes.

    A cleaner alternative is to use a :class:`ShiftTransformer`, which takes
    a custom shifting vector, and/or a :class:`ScaleTransformer`, which takes
    a custom scaling. These can be joined with a
    :class:`TransformerPipeline`.
    """

    _VALID_SCALINGS = frozenset(
        (
            "standard",
            "minmax",
            "minmaxsym",
            "maxabs",
            "maxabssym",
            "maxnorm",
            "maxnormsym",
        )
    )

    # NOTE(review): the column labels below are narrower than the dashed
    # rule and the 10-character fields written by _statistics_report();
    # confirm the intended padding of this header.
    _table_header = (
        " | min | mean | max | std\n"
        "----|------------|------------|------------|------------"
    )

    # TODO: allow scaling to be a tuple [a, b] to scale to (as in scale()).
    def __init__(
        self,
        centering: bool = False,
        scaling: str = None,
        byrow: bool = False,
        name: str = None,
        verbose: bool = False,
    ):
        """Set transformation hyperparameters."""
        # Centering is always a boolean.
        self.__centering = bool(centering)

        # Verify scaling.
        if scaling is not None:
            if not isinstance(scaling, str):
                raise TypeError("'scaling' must be None or of type 'str'")
            if scaling not in self._VALID_SCALINGS:
                opts = ", ".join([f"'{v}'" for v in self._VALID_SCALINGS])
                raise ValueError(
                    f"invalid scaling '{scaling}'; valid options are {opts}"
                )
        self.__scaling = scaling

        # Set byrow, warn if not applied.
        self.__byrow = bool(byrow)
        if self.__byrow and self.scaling is None:
            warnings.warn(
                "scaling=None --> byrow=True will have no effect",
                errors.OpInfWarning,
            )
        # Norm-based scalings act on whole snapshots, not on single rows.
        if self.__byrow and self.__scaling in ("maxnorm", "maxnormsym"):
            raise ValueError(
                f"scaling '{self.__scaling}' is invalid when byrow=True"
            )

        # Set other properties.
        self.verbose = verbose
        self.__qbar = None
        self.__alpha = None
        self.__beta = None

        TransformerTemplate.__init__(self, name)

    # Properties: transformation directives -----------------------------------
    @property
    def centering(self) -> bool:
        """If ``True``, center the snapshots by the mean training snapshot."""
        return self.__centering

    @property
    def scaling(self) -> str:
        """Type of scaling (non-dimensionalization)."""
        return self.__scaling

    @property
    def byrow(self) -> bool:
        """If ``True``, scale each row of the snapshot matrix separately."""
        return self.__byrow

    @property
    def verbose(self) -> bool:
        """If ``True``, print information upon learning a transformation."""
        return self.__verbose

    @verbose.setter
    def verbose(self, vbs):
        """Set the verbosity."""
        self.__verbose = bool(vbs)

    # Properties: calibrated quantities ---------------------------------------
    @property
    def mean_(self):
        """Mean training snapshot. ``None`` unless ``centering = True``."""
        return self.__qbar

    @mean_.setter
    def mean_(self, mean):
        """Set the mean vector."""
        if not self.centering:
            raise AttributeError("cannot set mean_ (centering=False)")
        # The first calibrated vector fixes the state dimension.
        if self.state_dimension is None:
            if np.ndim(mean) != 1:
                raise ValueError("expected one-dimensional mean_")
            self.state_dimension = mean.shape[0]
        if np.shape(mean) != ((n := self.state_dimension),):
            raise ValueError(f"expected mean_ to be ({n:d},) ndarray")
        self.__qbar = mean

    @property
    def scale_(self):
        r"""Multiplicative factor of the scaling, the :math:`\alpha` of
        :math:`q'' = \alpha q' + \beta`.
        """
        return self.__alpha

    @scale_.setter
    def scale_(self, alpha):
        """Set the multiplicative factor of the scaling."""
        if self.scaling is None:
            raise AttributeError("cannot set scale_ (scaling=None)")
        # Shapes are validated only for row-wise scaling.
        if self.byrow:
            if self.state_dimension is None:
                if np.ndim(alpha) != 1:
                    raise ValueError("expected one-dimensional scale_")
                self.state_dimension = alpha.shape[0]
            if np.shape(alpha) != ((n := self.state_dimension),):
                raise ValueError(f"expected scale_ to be ({n:d},) ndarray")
        self.__alpha = alpha

    @property
    def shift_(self):
        r"""Additive factor of the scaling, the :math:`\beta` of
        :math:`q'' = \alpha q' + \beta`.
        """
        return self.__beta

    @shift_.setter
    def shift_(self, beta):
        """Set the additive factor of the scaling."""
        if self.scaling is None:
            raise AttributeError("cannot set shift_ (scaling=None)")
        # Shapes are validated only for row-wise scaling.
        if self.byrow:
            if self.state_dimension is None:
                if np.ndim(beta) != 1:
                    raise ValueError("expected one-dimensional shift_")
                self.state_dimension = beta.shape[0]
            if np.shape(beta) != ((n := self.state_dimension),):
                raise ValueError(f"expected shift_ to be ({n:d},) ndarray")
        self.__beta = beta

    def __eq__(self, other) -> bool:
        """Test two ShiftScaleTransformers for equality.

        Two transformers are equal if they have the same hyperparameters
        (``centering``, ``scaling``, ``byrow``), the same state dimension,
        and the same calibrated quantities. A quantity that is set on one
        transformer but not on the other makes them unequal, regardless of
        which operand it is missing from.
        """
        if not isinstance(other, self.__class__):
            return False
        for attr in ("centering", "scaling", "byrow"):
            if getattr(self, attr) != getattr(other, attr):
                return False
        if self.state_dimension != other.state_dimension:
            return False

        # Only the calibrated quantities relevant to the (shared)
        # hyperparameters are compared.
        calibrated = []
        if self.centering:
            calibrated.append("mean_")
        if self.scaling:
            calibrated.extend(("scale_", "shift_"))

        for attr in calibrated:
            mine, theirs = getattr(self, attr), getattr(other, attr)
            if mine is None or theirs is None:
                # Equal only if unset on both sides; checking one side
                # alone would make a == b and b == a disagree.
                if mine is not theirs:
                    return False
            elif not np.all(mine == theirs):
                return False
        return True

    # Printing ----------------------------------------------------------------
    @staticmethod
    def _statistics_report(Q) -> str:
        """Return a string of basic statistics (min, mean, max, standard
        deviation) about a data set.
        """
        return " | ".join(
            [f"{f(Q):>10.3e}" for f in (np.min, np.mean, np.max, np.std)]
        )

    def __str__(self) -> str:
        # NOTE(review): the separator must match the one used by
        # TransformerTemplate.__str__() -- confirm against the base class.
        out = super().__str__().split("\n ")
        out.append(f"centering: {self.centering}")
        s = " None" if self.scaling is None else f"'{self.scaling}'"
        out.append(f"scaling: {s}")
        if self.scaling is not None:
            out.append(f"byrow: {self.byrow}")
        return "\n ".join(out)

    # Main routines -----------------------------------------------------------
    def _is_trained(self) -> bool:
        """Return True if transform() and inverse_transform() are ready."""
        if self.centering and self.mean_ is None:
            return False
        if self.scaling and any(
            getattr(self, attr) is None for attr in ("scale_", "shift_")
        ):
            return False
        return True

    def _check_is_trained(self):
        """Raise an exception if the transformer is not trained.

        Raises
        ------
        AttributeError
            If a required calibrated quantity has not been set.
        """
        if not self._is_trained():
            raise AttributeError(
                "transformer not trained, call fit() or fit_transform()"
            )
def fit_transform(self, states, inplace: bool = False):
    """Learn and apply the transformation.

    This sets :attr:`state_dimension` from the training data. It also sets
    ``mean_`` when ``centering`` is enabled, and ``scale_`` and ``shift_``
    when a ``scaling`` is specified. If ``verbose`` is ``True``, a report of
    the data statistics before and after each step is printed.

    Parameters
    ----------
    states : (n, k) ndarray
        Matrix of `k` `n`-dimensional snapshots.
    inplace : bool
        If ``True``, overwrite ``states`` during transformation.
        If ``False``, create a copy of the data to transform.

    Returns
    -------
    states_transformed: (n, k) ndarray
        Matrix of `k` `n`-dimensional transformed snapshots.

    Raises
    ------
    ValueError
        If the ``states`` are not two-dimensional.
    """
    if states.ndim != 2:
        raise ValueError("2D array required to fit transformer")
    self.state_dimension = states.shape[0]

    Y = states if inplace else states.copy()
    # Row-wise statistics reduce over columns (axis=1); otherwise the
    # statistics are taken over all entries at once (axis=None).
    axis = 1 if self.byrow else None

    # Record statistics of the training data.
    if self.verbose:
        report = ["No transformation learned"]
        report.append(self._table_header)
        report.append(f"Q | {self._statistics_report(Y)}")

    # Center the snapshots by the mean training snapshot: Q' = Q - mean_.
    if self.centering:
        self.mean_ = np.mean(Y, axis=1)
        Y -= self.mean_.reshape((-1, 1))

        if self.verbose:
            report[0] = "Learned mean centering Q -> Q'"
            report.append(f"Q' | {self._statistics_report(Y)}")

    # Scale (non-dimensionalize) the centered snapshot entries. Each option
    # below only learns the factors of Q'' = scale_ * Q' + shift_; the
    # factors are applied to the data after the if/elif chain.
    # NOTE(review): none of the denominators below is guarded against zero
    # (e.g., constant training data) -- confirm this is acceptable.
    if self.scaling:
        # Standard: Q'' = (Q' - mu) / sigma
        if self.scaling == "standard":
            mu = np.mean(Y, axis=axis)
            sigma = np.std(Y, axis=axis)
            self.scale_ = 1 / sigma
            self.shift_ = -mu * self.scale_

        # Min-max: Q'' = (Q' - min(Q')) / (max(Q') - min(Q'))
        elif self.scaling == "minmax":
            Ymin = np.min(Y, axis=axis)
            Ymax = np.max(Y, axis=axis)
            self.scale_ = 1 / (Ymax - Ymin)
            self.shift_ = -Ymin * self.scale_

        # Symmetric min-max: Q'' = (Q' - min(Q')) * 2 / (max(Q') - min(Q')) - 1
        elif self.scaling == "minmaxsym":
            Ymin = np.min(Y, axis=axis)
            Ymax = np.max(Y, axis=axis)
            self.scale_ = 2 / (Ymax - Ymin)
            self.shift_ = -Ymin * self.scale_ - 1

        # MaxAbs: Q'' = Q' / max(abs(Q'))
        elif self.scaling == "maxabs":
            self.scale_ = 1 / np.max(np.abs(Y), axis=axis)
            self.shift_ = (
                0 if axis is None else np.zeros(self.state_dimension)
            )

        # Symmetric MaxAbs: Q'' = (Q' - mean(Q')) / max(abs(Q' - mean(Q')))
        elif self.scaling == "maxabssym":
            mu = np.mean(Y, axis=axis)
            # Temporarily remove the mean in place to measure the maximum
            # deviation, then restore it so that the generic "apply the
            # scaling" step below acts on the centered data Q'.
            Y -= mu if axis is None else mu.reshape((-1, 1))
            self.scale_ = 1 / np.max(np.abs(Y), axis=axis)
            self.shift_ = -mu * self.scale_
            Y += mu if axis is None else mu.reshape((-1, 1))

        # MaxNorm: Q'' = Q' / max_j(norm(Q'[:, j]))
        elif self.scaling == "maxnorm":
            # scale such that the norm of each snapshot is <= 1
            if self.byrow:  # pragma: nocover
                raise RuntimeError(
                    f"invalid scaling '{self.scaling}' for byrow=True"
                )
            self.scale_ = 1 / np.max(np.linalg.norm(Y, axis=0, ord=2))
            self.shift_ = 0

        # Symmetric MaxNorm:
        # Q'' = (Q' - mean(Q')) / max_j(norm(Q'[:, j] - mean(Q')))
        elif self.scaling == "maxnormsym":
            if self.byrow:  # pragma: nocover
                raise RuntimeError(
                    f"invalid scaling '{self.scaling}' for byrow=True"
                )
            mu = np.mean(Y)
            self.scale_ = 1 / np.max(np.linalg.norm(Y - mu, axis=0, ord=2))
            self.shift_ = -mu * self.scale_

        else:  # pragma: nocover
            raise RuntimeError(f"invalid scaling '{self.scaling}'")

        # Apply the scaling. Row-wise factors are reshaped to columns so
        # that they broadcast across the snapshots.
        Y *= self.scale_ if axis is None else self.scale_.reshape((-1, 1))
        Y += self.shift_ if axis is None else self.shift_.reshape((-1, 1))

        if self.verbose:
            if self.centering:
                report[0] += f" and {self.scaling} scaling Q' -> Q''"
            else:
                report[0] = f"Learned {self.scaling} scaling Q -> Q''"
            report.append(f"Q'' | {self._statistics_report(Y)}")

    # Print the report, preceded by the transformer name if there is one.
    if self.verbose:
        if self.name is not None:
            report.insert(0, f"<{self.name}>")
        print("\n".join(report) + "\n")

    return Y
def transform(self, states, inplace: bool = False):
    """Apply the previously learned centering and scaling to new data.

    Parameters
    ----------
    states : (n, ...) ndarray
        Matrix of `n`-dimensional snapshots, or a single snapshot.
    inplace : bool
        If ``True``, overwrite ``states`` during transformation.
        If ``False``, create a copy of the data to transform.

    Returns
    -------
    states_transformed: (n, ...) ndarray
        Matrix of `n`-dimensional transformed snapshots, or a single
        transformed snapshot.

    Raises
    ------
    AttributeError
        If the transformer is not ready because :meth:`fit` or
        :meth:`fit_transform` have not been called.
    ValueError
        If the ``states`` do not align with the :attr:`state_dimension`.
    """
    self._check_is_trained()
    self._check_shape(states)

    Y = states if inplace else states.copy()
    is_matrix = Y.ndim > 1

    # Subtract the mean training snapshot (as a column for matrix input).
    if self.centering:
        mean_ = self.mean_
        if is_matrix:
            mean_ = mean_.reshape((-1, 1))
        Y -= mean_

    # Apply the learned affine scaling, Y -> scale_ * Y + shift_.
    if self.scaling is not None:
        scale_, shift_ = self.scale_, self.shift_
        # Row-wise parameters are vectors; make them columns so that they
        # broadcast across the snapshots of a matrix.
        if self.byrow and is_matrix:
            scale_ = scale_.reshape((-1, 1))
            shift_ = shift_.reshape((-1, 1))
        Y *= scale_
        Y += shift_

    return Y
def transform_ddts(self, ddts, inplace: bool = False):
    r"""Apply the learned transformation to snapshot time derivatives.

    Denoting the transformation by
    :math:`\mathcal{T}(\q) = \alpha(\q - \bar{\q}) + \beta`,
    this is the function :math:`\mathcal{T}'(\z) = \alpha\z`.
    Hence, :math:`\mathcal{T}'(\ddt q) = \ddt \mathcal{T}(q)`.
    Only the multiplicative factor is applied; the centering vector and
    the additive shift are not.

    Parameters
    ----------
    ddts : (n, ...) ndarray
        Matrix of `n`-dimensional snapshot time derivatives, or a
        single snapshot time derivative.
    inplace : bool
        If True, overwrite ``ddts`` during the transformation.
        If False, create a copy of the data to transform.

    Returns
    -------
    ddts_transformed : (n, ...) ndarray
        Transformed `n`-dimensional snapshot time derivatives.

    Raises
    ------
    AttributeError
        If the transformer is not ready because :meth:`fit` or
        :meth:`fit_transform` have not been called.
    ValueError
        If the ``ddts`` do not align with the :attr:`state_dimension`.
    """
    self._check_is_trained()
    self._check_shape(ddts)

    Z = ddts if inplace else ddts.copy()

    # Without a scaling the derivative map is the identity.
    if self.scaling is None:
        return Z

    scale_ = self.scale_
    # Row-wise scale factors must be a column to broadcast over a matrix.
    if self.byrow and Z.ndim > 1:
        scale_ = scale_.reshape((-1, 1))
    Z *= scale_

    return Z
def inverse_transform(
    self,
    states_transformed,
    inplace: bool = False,
    locs=None,
):
    """Apply the inverse of the learned transformation.

    The scaling is undone first (subtract ``shift_``, divide by
    ``scale_``), then the mean training snapshot is added back.

    Parameters
    ----------
    states_transformed : (n, ...) or (p, ...) ndarray
        Matrix of `n`-dimensional transformed snapshots, or a single
        transformed snapshot.
    inplace : bool
        If ``True``, overwrite ``states_transformed`` during the inverse
        transformation. If ``False``, create a copy of the data to
        untransform.
    locs : slice or (p,) ndarray of integers or None
        If given, assume ``states_transformed`` contains the transformed
        snapshots at only the `p` indices described by ``locs``.

    Returns
    -------
    states_untransformed: (n, ...) or (p, ...) ndarray
        Matrix of `n`-dimensional untransformed snapshots, or the `p`
        entries of such at the indices specified by ``locs``.

    Raises
    ------
    AttributeError
        If the transformer is not ready because :meth:`fit` or
        :meth:`fit_transform` have not been called.
    ValueError
        If the ``states_transformed`` do not align with the ``locs``
        (when provided) or the :attr:`state_dimension` (when ``locs``
        is not provided).
    """
    self._check_is_trained()

    if locs is not None:
        locs = self._check_locs(locs, states_transformed)
    else:
        self._check_shape(states_transformed)

    Y = states_transformed if inplace else states_transformed.copy()

    # Unscale (re-dimensionalize) the data.
    if self.scaling:
        shift_, scale_ = self.shift_, self.scale_
        # Restrict row-wise parameters to the requested rows. This is done
        # only inside the scaling branch: without a scaling, fit_transform
        # never assigns scale_/shift_, so there is nothing to index.
        if self.byrow and locs is not None:
            shift_ = shift_[locs]
            scale_ = scale_[locs]
        _flip = self.byrow and Y.ndim > 1
        Y -= shift_.reshape((-1, 1)) if _flip else shift_
        Y /= scale_.reshape((-1, 1)) if _flip else scale_

    # Uncenter the unscaled snapshots.
    if self.centering:
        mean_ = self.mean_ if locs is None else self.mean_[locs]
        Y += mean_.reshape((-1, 1)) if Y.ndim > 1 else mean_

    return Y
# Model persistence -------------------------------------------------------
def save(self, savefile: str, overwrite: bool = False) -> None:
    """Write the transformer's settings and learned parameters to a file.

    Hyperparameters are stored as attributes of an empty ``"meta"``
    dataset; learned arrays are stored under ``"transformation/"``.

    Parameters
    ----------
    savefile : str
        Destination file, passed to ``utils.hdf5_savehandle()``.
    overwrite : bool
        Overwrite flag, passed to ``utils.hdf5_savehandle()``.
    """
    with utils.hdf5_savehandle(savefile, overwrite) as hf:
        # Store transformation hyperparameter metadata.
        meta = hf.create_dataset("meta", shape=(0,))
        n = self.state_dimension
        # Unset values (no scaling, no state dimension) are stored as False.
        settings = {
            "centering": self.centering,
            "scaling": self.scaling or False,
            "byrow": self.byrow,
            "verbose": self.verbose,
            "name": str(self.name),
            "state_dimension": False if n is None else n,
        }
        for key, value in settings.items():
            meta.attrs[key] = value

        # Store learned transformation parameters.
        if self.centering and self.mean_ is not None:
            hf.create_dataset("transformation/mean_", data=self.mean_)

        if self.scaling and self.scale_ is not None:
            scale_, shift_ = self.scale_, self.shift_
            # Parameters that are not row-wise are wrapped in a
            # one-element list before being written.
            if not self.byrow:
                scale_, shift_ = [scale_], [shift_]
            hf.create_dataset("transformation/scale_", data=scale_)
            hf.create_dataset("transformation/shift_", data=shift_)
@classmethod
def load(cls, loadfile: str):
    """Reconstruct a transformer from a file written by :meth:`save`.

    Parameters
    ----------
    loadfile : str
        File to read, passed to ``utils.hdf5_loadhandle()``.

    Returns
    -------
    transformer
        New instance of ``cls`` with the stored hyperparameters and, when
        present in the file, the learned ``mean_``, ``scale_``, and
        ``shift_``.
    """
    with utils.hdf5_loadhandle(loadfile) as hf:
        # Load transformation hyperparameters.
        attrs = hf["meta"].attrs
        scaling = attrs["scaling"]
        name = attrs["name"]

        # Instantiate transformer. A stored False scaling means "none";
        # the string "None" is how an unset name was written.
        transformer = cls(
            centering=bool(attrs["centering"]),
            scaling=(scaling or None),
            byrow=attrs["byrow"],
            name=(name if name != "None" else None),
            verbose=attrs["verbose"],
        )

        # Load learned transformation parameters.
        n = attrs["state_dimension"]
        transformer.state_dimension = n if n else None

        if transformer.centering and "transformation/mean_" in hf:
            transformer.mean_ = hf["transformation/mean_"][:]

        if transformer.scaling and "transformation/scale_" in hf:
            # Row-wise parameters are read whole; otherwise take the
            # single stored entry.
            if transformer.byrow:
                ind = slice(None)
            else:
                ind = 0
            transformer.scale_ = hf["transformation/scale_"][ind]
            transformer.shift_ = hf["transformation/shift_"][ind]

        return transformer