Source code for opinf.pre._multi

# pre/_multi.py
"""Transformer for states with multiple variables."""

__all__ = [
    "TransformerPipeline",
    "NullTransformer",
    "TransformerMulti",
]

import warnings
import numpy as np

from .. import errors, utils
from ._base import TransformerTemplate, requires_trained


# Horizontal joining ==========================================================
class TransformerPipeline(TransformerTemplate):
    r"""Chain multiple transformers.

    Given :math:`\tau\in\NN` transformers
    :math:`\mathcal{T}_1,\ldots,\mathcal{T}_{\tau}`, this class defines the
    compositional transformer
    :math:`\mathcal{T} = \mathcal{T}_{\tau}\circ\cdots\circ\mathcal{T}_1`.

    Parameters
    ----------
    transformers : list or tuple of instantiated Transformer objects
        Transformers to be chained together; `transformers[0]` is applied
        first, then `transformers[1]`, and so on.
    name : str or None
        Label for the state variable that this transformer acts on.

    Notes
    -----
    This class connects multiple transformers "horizontally"; see
    :class:`TransformerMulti` to connect multiple transformers "vertically",
    i.e., to assign different transformations for different parts of the
    state.
    """

    def __init__(self, transformers, name=None):
        """Set the transformers."""
        super().__init__(name=name)

        if isinstance(transformers, list):
            transformers = tuple(transformers)
        elif not isinstance(transformers, tuple):
            raise TypeError("'transformers' should be a list or tuple")

        statedims = set()
        for i, tf in enumerate(transformers):
            if not isinstance(tf, (TransformerTemplate, TransformerMulti)):
                warnings.warn(
                    f"transformers[{i}] does not inherit from "
                    "TransformerTemplate, unexpected behavior may occur",
                    errors.OpInfWarning,
                )
            statedims.add(tf.state_dimension)

        statedims.discard(None)
        if len(statedims) > 1:
            raise ValueError("transformers have inconsistent state_dimension")

        if len(transformers) == 1:
            warnings.warn(
                "only one transformer provided to TransformerPipeline",
                errors.OpInfWarning,
            )

        self.__transformers = tuple(transformers)

    # Properties ---------------------------------------------------------
    @property
    def transformers(self) -> tuple:
        """Transformers being chained together; `transformers[0]` is applied
        first, then `transformers[1]`, and so on.
        """
        return self.__transformers

    @property
    def num_transformers(self) -> int:
        """Number of transformers chained together."""
        return len(self.transformers)

    @property
    def state_dimension(self) -> int:
        r"""Dimension :math:`n` of the state."""
        for tf in self.transformers:
            if (r := tf.state_dimension) is not None:
                return r
        return None

    def __str__(self):
        lines = super().__str__().split("\n ")
        lines.append(f"num_transformers: {self.num_transformers}")
        lines.append("transformers")
        for tf in self.transformers:
            tfstr = str(tf).split("\n ")
            tfstr[0] = f" {tfstr[0]}"
            lines.append("\n ".join(tfstr))
        return "\n ".join(lines)

    def __len__(self):
        return self.num_transformers

    # Main routines ------------------------------------------------------
    def _chain(self, method, states, inplace=False, **kwargs):
        """Apply the specified method for each transformer (forward)."""
        func = getattr(self.transformers[0], method)
        Y = func(states, inplace=inplace, **kwargs)
        for tf in self.transformers[1:]:
            func = getattr(tf, method)
            Y = func(Y, inplace=True, **kwargs)
        return Y
    def fit_transform(self, states, inplace=False):
        """Learn and apply the chained transformation to the states."""
        return self._chain("fit_transform", states, inplace=inplace)

    def transform(self, states, inplace=False):
        """Apply the learned chained transformation to the states."""
        return self._chain("transform", states, inplace=inplace)

    def inverse_transform(self, states_transformed, inplace=False, locs=None):
        """Apply the inverse of each transformation in reverse order."""
        Y = self.transformers[-1].inverse_transform(
            states_transformed,
            inplace=inplace,
            locs=locs,
        )
        for tf in reversed(self.transformers[:-1]):
            Y = tf.inverse_transform(Y, inplace=True, locs=locs)
        return Y

    def transform_ddts(self, ddts, inplace=False):
        """Apply the chained transformation to snapshot time derivatives."""
        return self._chain("transform_ddts", ddts, inplace=inplace)
    # Model persistence --------------------------------------------------
    def save(self, savefile: str, overwrite: bool = False):
        """Save the chained transformers to an HDF5 file."""
        with utils.hdf5_savehandle(savefile, overwrite) as hf:
            meta = hf.create_dataset("meta", shape=(0,))
            meta.attrs["name"] = str(self.name)
            meta.attrs["num_transformers"] = self.num_transformers

            for i, tf in enumerate(self.transformers):
                tf.save(hf.create_group(f"transformer_{i:0>2d}"))

    @classmethod
    def load(cls, loadfile: str, TransformerClasses):
        """Load a previously saved transformer from an HDF5 file.

        Parameters
        ----------
        loadfile : str
            File where the transformer was stored via :meth:`save()`.
        TransformerClasses : Iterable[type]
            Classes of the chained transformers, in the order they are
            applied.

        Returns
        -------
        TransformerPipeline
        """
        with utils.hdf5_loadhandle(loadfile) as hf:
            meta = hf["meta"]
            name = str(meta.attrs["name"])
            num_transformers = int(meta.attrs["num_transformers"])

            if (nclasses := len(TransformerClasses)) != num_transformers:
                raise ValueError(
                    f"file contains {num_transformers:d} transformers "
                    f"but {nclasses:d} classes provided"
                )

            transformers = [
                TransformerClasses[i].load(hf[f"transformer_{i:0>2d}"])
                for i in range(num_transformers)
            ]

            return cls(transformers, name=name)
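# Example usage (illustrative sketch, not part of the module): chaining a
# centering step with a scaling step so that the second transformation acts
# on the output of the first. This assumes the sibling class
# ``opinf.pre.ShiftScaleTransformer`` is available; any TransformerTemplate
# subclass can be used in the same way.
#
#   >>> import numpy as np
#   >>> import opinf
#   >>> Q = np.random.random((50, 200))          # 50 DOFs, 200 snapshots.
#   >>> pipeline = opinf.pre.TransformerPipeline(
#   ...     [
#   ...         opinf.pre.ShiftScaleTransformer(centering=True),
#   ...         opinf.pre.ShiftScaleTransformer(scaling="maxabs"),
#   ...     ]
#   ... )
#   >>> Q_transformed = pipeline.fit_transform(Q)    # T_2(T_1(Q)).
#   >>> np.allclose(pipeline.inverse_transform(Q_transformed), Q)
#   True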
# Vertical joining ============================================================
class NullTransformer(TransformerTemplate):
    r"""Identity transformation :math:`\q\mapsto\q`.

    This transformer can be used in conjunction with :class:`TransformerMulti`
    if separate transformations are desired for individual state variables
    but one of those state variables is to remain unchanged.

    Parameters
    ----------
    name : str or None
        Label for the state variable that this transformer "acts" on.
    """

    # Main routines ------------------------------------------------------
    def fit(self, states):
        """Set the state dimension.

        Parameters
        ----------
        states : (n, k) ndarray
            Matrix of `k` `n`-dimensional snapshots.

        Returns
        -------
        self
        """
        self.state_dimension = states.shape[0]
        return self
    def fit_transform(self, states, inplace: bool = True):
        """Do nothing but set the state dimension; this transformation does
        not affect the states.

        Parameters
        ----------
        states : (n, ...) ndarray
            Matrix of `n`-dimensional snapshots, or a single snapshot.
        inplace : bool
            If ``True`` (default), return ``states``. If ``False``, return a
            copy of ``states``.

        Returns
        -------
        states : (n, ...) ndarray
            State snapshots, or a copy of them if ``inplace=False``.
        """
        self.fit(states)
        return states if inplace else states.copy()
    def transform(self, states, inplace: bool = False):
        """Do nothing; this transformation does not affect the states.

        Parameters
        ----------
        states : (n, ...) ndarray
            Matrix of `n`-dimensional snapshots, or a single snapshot.
        inplace : bool
            If ``True``, return ``states``. If ``False`` (default), return a
            copy of ``states``.

        Returns
        -------
        states : (n, ...) ndarray
            State snapshots, or a copy of them if ``inplace=False``.

        Raises
        ------
        ValueError
            If the ``states`` do not align with the :attr:`state_dimension`.
        """
        self._check_shape(states)
        return states if inplace else states.copy()
    def transform_ddts(self, ddts, inplace: bool = True):
        r"""Do nothing; this transformation does not affect derivatives.

        Parameters
        ----------
        ddts : (n, ...) ndarray
            Matrix of `n`-dimensional snapshot time derivatives, or a single
            snapshot time derivative.
        inplace : bool
            If ``True`` (default), return ``ddts``. If ``False``, return a
            copy of ``ddts``.

        Returns
        -------
        ddts : (n, ...) ndarray
            Snapshot time derivatives, or a copy of them if
            ``inplace=False``.

        Raises
        ------
        ValueError
            If the ``ddts`` do not align with the :attr:`state_dimension`.
        """
        self._check_shape(ddts)
        return ddts if inplace else ddts.copy()
    def inverse_transform(self, states_transformed, inplace=False, locs=None):
        """Do nothing; this transformation does not affect the states.

        Parameters
        ----------
        states_transformed : (n, ...) or (p, ...) ndarray
            Matrix of `n`-dimensional transformed snapshots, or a single
            transformed snapshot.
        inplace : bool
            If ``True``, return ``states_transformed``. If ``False``
            (default), return a copy of ``states_transformed``.
        locs : slice or (p,) ndarray of integers or None
            If given, assume ``states_transformed`` contains the transformed
            snapshots at only the `p` indices described by ``locs``.

        Returns
        -------
        states_transformed : (n, ...) or (p, ...) ndarray
            Transformed states, or a copy of them if ``inplace=False``.

        Raises
        ------
        ValueError
            If the ``states_transformed`` do not align with the ``locs``
            (when provided) or the :attr:`state_dimension` (when ``locs`` is
            not provided).
        """
        if locs is not None:
            locs = self._check_locs(locs, states_transformed)
        else:
            self._check_shape(states_transformed)
        return states_transformed if inplace else states_transformed.copy()
    # Model persistence --------------------------------------------------
    def save(self, savefile, overwrite=False):
        """Save the transformer to an HDF5 file."""
        with utils.hdf5_savehandle(savefile, overwrite) as hf:
            meta = hf.create_dataset("meta", shape=(0,))
            meta.attrs["name"] = str(self.name)
            if (n := self.state_dimension) is not None:
                meta.attrs["state_dimension"] = n

    @classmethod
    def load(cls, loadfile):
        """Load a previously saved transformer from an HDF5 file."""
        with utils.hdf5_loadhandle(loadfile) as hf:
            meta = hf["meta"]
            name = meta.attrs["name"]
            ntf = cls(name=(None if name == "None" else name))
            if "state_dimension" in meta.attrs:
                ntf.state_dimension = int(meta.attrs["state_dimension"])
            return ntf
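# Example usage (illustrative sketch, not part of the module): a
# NullTransformer records the state dimension but leaves the data untouched,
# which makes it a convenient placeholder inside a TransformerMulti (defined
# below).
#
#   >>> import numpy as np
#   >>> tf = NullTransformer(name="pressure")
#   >>> Q = np.random.random((30, 100))
#   >>> tf.fit_transform(Q) is Q      # inplace=True by default here.
#   True
#   >>> tf.state_dimension
#   30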
class TransformerMulti:
    r"""Join transformers together for states with multiple variables.

    This class is for states that can be written (after discretization) as

    .. math::
       \q = \left[\begin{array}{c}
       \q_{0} \\ \q_{1} \\ \vdots \\ \q_{n_q - 1}
       \end{array}\right]
       \in \RR^{n},

    where each :math:`\q_{i} \in \RR^{n_i}` represents a single discretized
    state variable of dimension :math:`n_i \in \NN`. The dimension of the
    joint state :math:`\q` is :math:`n = \sum_{i=0}^{n_q - 1} n_i`, i.e.,
    ``state_dimension = sum(variable_sizes)``. Individual transformers are
    calibrated for each state variable.

    Parameters
    ----------
    transformers : list
        Initialized (but not necessarily trained) transformer objects, one
        for each state variable. Entries of this list can be ``None``, in
        which case a :class:`NullTransformer` is used.
    variable_sizes : list or None
        Dimensions for each state variable, :math:`n_0,\ldots,n_{n_q-1}`.
        If ``None`` (default), set :math:`n_i` to
        ``transformers[i].state_dimension``; if any of these are ``None``,
        assume all state variables have the same dimension, i.e.,
        :math:`n_0 = n_1 = \cdots = n_{n_q-1} = n_x \in \NN` with
        :math:`n_x` to be determined in :meth:`fit`. In this case,
        :math:`n = n_q n_x`.

    Notes
    -----
    This class connects multiple transformers "vertically" by assigning
    different transformations for different parts of the state; see
    :class:`TransformerPipeline` to connect multiple transformers
    "horizontally", i.e., to perform one transformation after another on the
    entire state.
    """

    def __init__(self, transformers, variable_sizes=None):
        """Initialize the transformers."""
        if (num_variables := len(transformers)) == 0:
            raise ValueError("at least one transformer required")
        elif num_variables == 1:
            warnings.warn("only one variable detected", errors.OpInfWarning)

        # Check inheritance and set default variable names.
        tfs = [NullTransformer() if tf is None else tf for tf in transformers]
        for i, tf in enumerate(tfs):
            if not isinstance(tf, TransformerTemplate):
                warnings.warn(
                    f"transformers[{i}] does not inherit from "
                    "TransformerTemplate, unexpected behavior may occur",
                    errors.OpInfWarning,
                )
            if tf.name is None:
                tf.name = f"variable {i}"

        # Check variable sizes.
        if variable_sizes is not None:
            if len(variable_sizes) != num_variables:
                raise ValueError("len(variable_sizes) != len(transformers)")
            for i, (tf, ni) in enumerate(zip(tfs, variable_sizes)):
                if tf.state_dimension is None:
                    tf.state_dimension = ni
                elif (tf_n := tf.state_dimension) != ni:
                    raise ValueError(
                        f"transformers[{i}].state_dimension = {tf_n} "
                        f"!= {ni} = variable_sizes[{i}]"
                    )

        # Store transformers and set slice dimensions.
        self.__nq = num_variables
        self.__transformers = tuple(tfs)

    # Properties ---------------------------------------------------------
    @property
    def transformers(self) -> tuple:
        """Transformers for each state variable."""
        return self.__transformers

    @property
    def num_variables(self) -> int:
        r"""Number of state variables :math:`n_q \in \NN`."""
        return self.__nq

    @property
    def variable_names(self) -> tuple:
        """Names for each state variable."""
        return tuple(tf.name for tf in self.transformers)

    @property
    def variable_sizes(self) -> tuple:
        r"""Dimensions of each state variable,
        :math:`n_0,\ldots,n_{n_q-1}\in\NN`.
        """
        return tuple(tf.state_dimension for tf in self.transformers)

    @property
    def state_dimension(self) -> int:
        r"""Total dimension :math:`n = \sum_{i=0}^{n_q-1} n_i \in \NN` of the
        joint state.
        """
        if None in (sizes := self.variable_sizes):
            return None
        return sum(sizes)

    # Magic methods ------------------------------------------------------
    def __len__(self) -> int:
        """Length = number of state variables."""
        return self.num_variables

    def __getitem__(self, key):
        """Get the transformer for variable i."""
        if key in self.variable_names:
            key = self.variable_names.index(key)
        return self.transformers[key]

    def __eq__(self, other) -> bool:
        """Test two TransformerMulti objects for equality."""
        if not (
            isinstance(other, self.__class__)
            or isinstance(self, other.__class__)
        ):
            return False
        if self.num_variables != other.num_variables:
            return False
        return all(
            t1 == t2
            for t1, t2 in zip(self.transformers, other.transformers)
        )

    def __str__(self) -> str:
        """String representation: str() of each transformer."""
        lines = [self.__class__.__name__]
        if (n := self.state_dimension) is not None:
            lines.append(f"state_dimension: {n}")
        lines.append(f"num_variables: {self.num_variables}")
        lines.append("transformers")
        for tf in self.transformers:
            tfstr = str(tf).split("\n ")
            tfstr[0] = f" {tfstr[0]}"
            lines.append("\n ".join(tfstr))
        return "\n ".join(lines)

    def __repr__(self) -> str:
        """Unique ID + string representation."""
        return utils.str2repr(self)

    # Access convenience methods -------------------------------------------
    @requires_trained
    def _check_shape(self, states):
        """Verify the shape of the snapshot set Q."""
        if (nQ := len(states)) != (n := self.state_dimension):
            raise errors.DimensionalityError(
                f"len(states) = {nQ:d} != {n:d} = state_dimension"
            )

    def _slices(self, varindex=None):
        """Get slices for one or all state variable(s).

        Parameters
        ----------
        varindex : int
            Index of the variable to get a slice for. If ``None`` (default),
            get slices for all state variables.

        Returns
        -------
        slice or tuple
            Slice for the state variable at index ``varindex`` or a tuple of
            slices for all state variables.
        """
        dimsum = np.cumsum((0,) + self.variable_sizes)
        if varindex is not None:
            return slice(dimsum[varindex], dimsum[varindex + 1])
        return tuple(
            [
                slice(dimsum[i], dimsum[i + 1])
                for i in range(self.num_variables)
            ]
        )
    def get_var(self, var, states):
        """Extract a single variable from the joint state.

        Parameters
        ----------
        var : int or str
            Index or name of the variable to extract.
        states : (n, ...) ndarray
            Joint state vector or snapshot matrix.

        Returns
        -------
        state_variable : (n_i, ...) ndarray
            One state variable, extracted from ``states``.
        """
        self._check_shape(states)
        if var in (names := self.variable_names):
            var = names.index(var)
        return states[self._slices(var)]
    def split(self, states):
        """Split the joint state into the individual state variables.

        Parameters
        ----------
        states : (n, ...) ndarray
            Joint state vector or snapshot matrix.

        Returns
        -------
        state_variables : list of nq (n_i, ...) ndarrays
            Individual state variables, extracted from ``states``.
        """
        self._check_shape(states)
        return [states[s] for s in self._slices()]
    # Main routines ------------------------------------------------------
    def fit(self, states):
        """Learn (but do not apply) the transformation.

        Parameters
        ----------
        states : (n, k) ndarray
            Matrix of `k` `n`-dimensional snapshots.

        Returns
        -------
        self
        """
        self.fit_transform(states, inplace=False)
        return self
    def fit_transform(self, states, inplace=False):
        """Learn and apply the transformation.

        Parameters
        ----------
        states : (n, k) ndarray
            Matrix of `k` `n`-dimensional snapshots. The first
            ``variable_sizes[0]`` entries correspond to the first state
            variable, the next ``variable_sizes[1]`` entries correspond to
            the second state variable, and so on. If ``variable_sizes`` are
            not yet prescribed, assume that each state variable has the same
            dimension.
        inplace : bool
            If ``True``, overwrite ``states`` data during transformation.
            If ``False``, create a copy of the data to transform.

        Returns
        -------
        states_transformed : (n, k) ndarray
            Matrix of `k` transformed `n`-dimensional snapshots.
        """
        if self.state_dimension is None:
            # Assume all state dimensions are equal.
            nx, remainder = divmod(states.shape[0], self.num_variables)
            if remainder != 0:
                raise errors.DimensionalityError(
                    "len(states) must be evenly divisible by "
                    f"the number of variables n_q = {self.num_variables}"
                )
            for tf in self.transformers:
                TransformerTemplate.state_dimension.fset(tf, nx)

        new_states = np.concatenate(
            [
                transformer.fit_transform(state_variable, inplace=inplace)
                for transformer, state_variable in zip(
                    self.transformers, self.split(states)
                )
            ],
            axis=0,
        )

        return states if inplace else new_states
    def transform(self, states, inplace=False):
        """Apply the learned transformation.

        Parameters
        ----------
        states : (n, ...) ndarray
            Matrix of `n`-dimensional snapshots, or a single snapshot.
        inplace : bool
            If ``True``, overwrite ``states`` during transformation.
            If ``False``, create a copy of the data to transform.

        Returns
        -------
        states_transformed : (n, ...) ndarray
            Matrix of `n`-dimensional transformed snapshots, or a single
            transformed snapshot.
        """
        states_transformed = np.concatenate(
            [
                tf.transform(Q, inplace=inplace)
                for tf, Q in zip(self.transformers, self.split(states))
            ]
        )
        return states if inplace else states_transformed
    def transform_ddts(self, ddts, inplace: bool = False):
        r"""Apply the learned transformation to snapshot time derivatives.

        If the transformation is denoted by :math:`\mathcal{T}(q)`, this
        function implements :math:`\mathcal{T}'` such that
        :math:`\mathcal{T}'(\ddt q) = \ddt \mathcal{T}(q)`.

        Parameters
        ----------
        ddts : (n, ...) ndarray
            Matrix of `n`-dimensional snapshot time derivatives, or a single
            snapshot time derivative.
        inplace : bool
            If ``True``, overwrite ``ddts`` during the transformation.
            If ``False``, create a copy of the data to transform.

        Returns
        -------
        ddts_transformed : (n, ...) ndarray
            Transformed snapshot time derivatives.
        """
        ddts_transformed = [
            tf.transform_ddts(dQdt, inplace=inplace)
            for tf, dQdt in zip(self.transformers, self.split(ddts))
        ]
        if any(dt is NotImplemented for dt in ddts_transformed):
            return NotImplemented
        return ddts if inplace else np.concatenate(ddts_transformed)
    @requires_trained
    def inverse_transform(self, states_transformed, inplace=False, locs=None):
        """Apply the inverse of the learned transformation.

        Parameters
        ----------
        states_transformed : (n, ...) or (num_variables*p, ...) ndarray
            Matrix of `n`-dimensional transformed snapshots, or a single
            transformed snapshot.
        inplace : bool
            If ``True``, overwrite ``states_transformed`` during the inverse
            transformation. If ``False``, create a copy of the data to
            untransform.
        locs : slice or (p,) ndarray of integers or None
            If given, assume ``states_transformed`` contains the `p` entries
            of each transformed state variable at the indices specified by
            ``locs``. This option requires each state variable to have the
            same dimension.

        Returns
        -------
        states_untransformed : (n, ...) or (num_variables*p, ...) ndarray
            Matrix of `n`-dimensional untransformed snapshots, or the
            :math:`n_q p` entries of such at the indices specified by
            ``locs``.
        """
        if locs is not None:
            if len(set(self.variable_sizes)) > 1:
                raise ValueError(
                    "'locs != None' requires that "
                    "all transformers have the same state_dimension"
                )
            variables_transformed = np.split(
                states_transformed,
                self.num_variables,
                axis=0,
            )
        else:
            variables_transformed = self.split(states_transformed)

        states = np.concatenate(
            [
                tf.inverse_transform(Q, inplace=inplace, locs=locs)
                for tf, Q in zip(self.transformers, variables_transformed)
            ]
        )
        return states_transformed if inplace else states
    # Model persistence --------------------------------------------------
    def save(self, savefile: str, overwrite: bool = False):
        """Save the transformers to an HDF5 file."""
        with utils.hdf5_savehandle(savefile, overwrite) as hf:
            hf.create_dataset("num_variables", data=[self.num_variables])
            for i, tf in enumerate(self.transformers):
                tf.save(hf.create_group(f"variable{i}"))

    @classmethod
    def load(cls, loadfile: str, TransformerClasses):
        """Load a previously saved transformer from an HDF5 file.

        Parameters
        ----------
        loadfile : str
            File where the transformer was stored via :meth:`save()`.
        TransformerClasses : Iterable[type]
            Classes of the transformers for each state variable.

        Returns
        -------
        TransformerMulti
        """
        with utils.hdf5_loadhandle(loadfile) as hf:
            num_variables = int(hf["num_variables"][0])

            if isinstance(TransformerClasses, type):
                TransformerClasses = [TransformerClasses] * num_variables
            if (nclasses := len(TransformerClasses)) != num_variables:
                raise ValueError(
                    f"file contains {num_variables:d} transformers "
                    f"but {nclasses:d} classes provided"
                )

            transformers = [
                TransformerClasses[i].load(hf[f"variable{i}"])
                for i in range(num_variables)
            ]

            return cls(transformers)
    # Verification ---------------------------------------------------------
    def verify(self, tol: float = 1e-4):
        r"""Verify that :meth:`transform()` and :meth:`inverse_transform()`
        are consistent and that :meth:`transform_ddts()`, if implemented in
        each transformer, is consistent with :meth:`transform()`.

        * The :meth:`transform()` / :meth:`inverse_transform()` consistency
          check verifies that
          ``inverse_transform(transform(states)) == states``.
        * The :meth:`transform_ddts()` consistency check uses
          :meth:`opinf.ddt.ddt()` to estimate the time derivatives of the
          states and the transformed states, then verifies that the relative
          difference between
          ``transform_ddts(opinf.ddt.ddt(states, t))`` and
          ``opinf.ddt.ddt(transform(states), t)`` is less than ``tol``.
          If this check fails, consider using a finer time mesh.

        Parameters
        ----------
        tol : float > 0
            Tolerance for the finite difference check of
            :meth:`transform_ddts()`. Only used if :meth:`transform_ddts()`
            is implemented.
        """
        for tf, name in zip(self.transformers, self.variable_names):
            print(f"{name}:", end="\t")
            tf.verify(tol)
        return TransformerTemplate.verify(self, tol)
    def _verify_locs(self, states, states_transformed):
        """Verify :meth:`inverse_transform()` with ``locs != None``."""
        if len(sizes := set(self.variable_sizes)) != 1:
            return  # Cannot use locs unless all variable sizes are equal.
        nx = sizes.pop()
        locs = np.sort(np.random.choice(nx, size=(nx // 3), replace=False))
        states_at_locs = np.concatenate([Q[locs] for Q in self.split(states)])
        states_transformed_at_locs = np.concatenate(
            [Qt[locs] for Qt in self.split(states_transformed)]
        )
        states_recovered_at_locs = self.inverse_transform(
            states_transformed_at_locs,
            locs=locs,
        )

        if states_recovered_at_locs.shape != states_at_locs.shape:
            raise errors.VerificationError(  # pragma: no cover
                "inverse_transform(states_transformed_at_locs, locs).shape "
                "!= states_at_locs.shape"
            )
        if not np.allclose(states_recovered_at_locs, states_at_locs):
            raise errors.VerificationError(  # pragma: no cover
                "transform() and inverse_transform() are not inverses "
                "(locs != None)"
            )
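# Example usage (illustrative sketch, not part of the module): different
# preprocessing for each variable of a two-variable joint state. This assumes
# the sibling class ``opinf.pre.ShiftScaleTransformer`` is available; the
# snapshot data and variable layout below are made up for illustration.
#
#   >>> import numpy as np
#   >>> import opinf
#   >>> n_x = 100                              # DOFs per state variable.
#   >>> Q = np.random.random((2 * n_x, 500))   # Two stacked state variables.
#   >>> tfmulti = opinf.pre.TransformerMulti(
#   ...     [
#   ...         opinf.pre.ShiftScaleTransformer(centering=True),
#   ...         None,   # Second variable left unchanged via NullTransformer.
#   ...     ]
#   ... )
#   >>> Q_transformed = tfmulti.fit_transform(Q)   # Each variable gets n_x DOFs.
#   >>> np.allclose(tfmulti.inverse_transform(Q_transformed), Q)
#   True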