Source code for opinf.lstsq._base

# lstsq/_base.py
"""Base class for solvers for the Operator Inference regression problem."""

__all__ = [
    "lstsq_size",
    "SolverTemplate",
    "PlainSolver",
]

import os
import abc
import copy
import warnings
import numpy as np
import scipy.linalg as la

from .. import errors, utils


_require_trained = utils.requires2(
    attr="data_matrix",
    message="solver not trained, call fit()",
)


[docs] def lstsq_size(modelform, r, m=0, affines=None) -> int: r"""Compute the number of columns in the operator matrix :math:`\Ohat` in the Operator Inference least-squares problem. This is also the number of columns in the data matrix :math:`\D`. Parameters ---------- modelform : str containing 'c', 'A', 'H', 'G', and/or 'B' The structure of the desired reduced-order model. Each character indicates the presence of a different term in the model: 'c' : Constant term c 'A' : Linear state term Ax. 'H' : Quadratic state term H(x⊗x). 'G' : Cubic state term G(x⊗x⊗x). 'B' : Input term Bu. For example, modelform=="AB" means f(x,u) = Ax + Bu. r : int The dimension of the reduced order model. m : int The dimension of the inputs of the model. Must be zero unless 'B' is in `modelform`. affines : dict(str -> list(callables)) Functions that define the structures of the affine operators. Keys must match the modelform: * 'c': Constant term c(µ). * 'A': Linear state matrix A(µ). * 'H': Quadratic state matrix H(µ). * 'G': Cubic state matrix G(µ). * 'B': linear Input matrix B(µ). For example, if the constant term has the affine structure c(µ) = θ1(µ)c1 + θ2(µ)c2 + θ3(µ)c3, then 'c' -> [θ1, θ2, θ3]. Returns ------- ncols : int The number of columns in the Operator Inference least-squares problem. """ if "B" in modelform and m == 0: raise ValueError("argument m > 0 required since 'B' in modelform") if "B" not in modelform and m != 0: raise ValueError(f"argument m={m} invalid since 'B' in modelform") if affines is None: affines = {} qs = [ ( len(affines[op]) if (op in affines and op in modelform) else 1 if op in modelform else 0 ) for op in "cAHGB" ] rs = [1, r, r * (r + 1) // 2, r * (r + 1) * (r + 2) // 6, m] return sum(qq * rr for qq, rr in zip(qs, rs))
[docs] class SolverTemplate(abc.ABC): r"""Template for solvers for the Operator Inference regression :math:`\Z \approx \Ohat\D\trp` (or :math:`\D\Ohat\trp \approx \Z\trp`) for the operator matrix :math:`\Ohat`. Child classes formulate the regression, which may include regularization terms and/or optimization constraints. Hyperparameters should be set in the constructor (regularization terms, etc.). """ def __init__(self): self.__D = None self.__Z = None # Properties: matrices ---------------------------------------------------- @property def data_matrix(self) -> np.ndarray: r""":math:`k \times d` data matrix :math:`\D`.""" return self.__D @property def lhs_matrix(self) -> np.ndarray: r""":math:`r \times k` left-hand side data :math:`\Z`.""" return self.__Z # Properties: matrix dimensions ------------------------------------------- @property def k(self) -> int: r"""Number of equations in the least-squares problem (number of rows of :math:`\D` and number of columns of :math:`\Z`). """ return D.shape[0] if (D := self.data_matrix) is not None else None @property def d(self) -> int: r"""Number of unknowns in each row of the operator matrix (number of columns of :math:`\D` and :math:`\Ohat`). """ return D.shape[1] if (D := self.data_matrix) is not None else None @property def r(self) -> int: r"""Number of operator matrix rows to learn (number of rows of :math:`\Z` and :math:`\Ohat`) """ return Z.shape[0] if (Z := self.lhs_matrix) is not None else None # String representation --------------------------------------------------- def __str__(self) -> str: """String representation: class name + dimensions.""" out = [self.__class__.__name__] if (self.data_matrix is not None) and (self.lhs_matrix is not None): out.append(f" data_matrix: {self.data_matrix.shape}") out.append(f" condition number: {self.cond():.4e}") out.append(f" lhs_matrix: {self.lhs_matrix.shape}") out.append(f" solve().shape: {self.r, self.d}") else: out[0] += " (not trained)" return "\n".join(out) def __repr__(self) -> str: """Unique ID + string representation.""" return utils.str2repr(self) @staticmethod def _print_kwargs(opts: dict) -> str: """Utility for printing routine options in __str__().""" return ", ".join([f"{key}={repr(val)}" for key, val in opts.items()]) # Main methods -----------------------------------------------------------
[docs] def fit(self, data_matrix: np.ndarray, lhs_matrix: np.ndarray): r"""Verify dimensions and save the data matrices. Parameters ---------- data_matrix : (k, d) ndarray Data matrix :math:`\D`. lhs_matrix : (r, k) or (k,) ndarray "Left-hand side" data matrix :math:`\Z` (not its transpose!). If one-dimensional, assume :math:`r = 1`. """ # Verify dimensions. if data_matrix.ndim != 2: raise ValueError("data_matrix must be two-dimensional") if lhs_matrix.ndim == 1: lhs_matrix = lhs_matrix.reshape((1, -1)) if lhs_matrix.ndim != 2: raise ValueError("lhs_matrix must be one- or two-dimensional") if (k1 := lhs_matrix.shape[1]) != (k2 := data_matrix.shape[0]): raise errors.DimensionalityError( "data_matrix and lhs_matrix not aligned " f"(lhs_matrix.shape[-1] = {k1} != {k2} = data_matrix.shape[0])" ) self.__D = data_matrix self.__Z = lhs_matrix return self
[docs] @abc.abstractmethod def solve(self) -> np.ndarray: # pragma: no cover r"""Solve the Operator Inference regression. Returns ------- Ohat : (r, d) ndarray Operator matrix :math:`\Ohat` (not its transpose!). """ raise NotImplementedError
# Post-processing --------------------------------------------------------
[docs] @_require_trained def cond(self) -> float: r"""Compute the :math:`2`-norm condition number of the data matrix :math:`\D`. Returns ------- conditionnumber : float """ return np.linalg.cond(self.data_matrix)
[docs] @_require_trained def residual(self, Ohat: np.ndarray) -> np.ndarray: r"""Compute the residual of the :math:`2`-norm regression objective for each row of the given operator matrix. Specifically, given a potential :math:`\Ohat`, compute .. math:: \|\D\ohat_i - \z_i\|_2^2, \quad i = 1, \ldots, r, where :math:`\ohat_i` and :math:`\z_i` are the :math:`i`-th rows of :math:`\Ohat` and :math:`\Z`, respectively. Parameters ---------- Ohat : (r, d) ndarray Operator matrix :math:`\Ohat`. Returns ------- residuals : (r,) ndarray :math:`2`-norm residuals for each row of the operator matrix. """ if self.r == 1 and Ohat.ndim == 1: Ohat = Ohat.reshape((1, -1)) if Ohat.shape != (shape := (self.r, self.d)): raise errors.DimensionalityError( f"Ohat.shape = {Ohat.shape} != {shape} = (r, d)" ) return np.sum( (self.data_matrix @ Ohat.T - self.lhs_matrix.T) ** 2, axis=0, )
# Persistence -------------------------------------------------------------
[docs] def save(self, savefile: str, overwrite: bool = False): # pragma: no cover """Serialize the solver, saving it in HDF5 format. The model can be recovered with the :meth:`load()` class method. Parameters ---------- savefile : str File to save to, with extension ``.h5`` (HDF5). overwrite : bool If ``True`` and the specified ``savefile`` already exists, overwrite the file. If ``False`` (default) and the specified ``savefile`` already exists, raise an error. """ raise NotImplementedError
[docs] @classmethod def load(cls, loadfile: str): # pragma: no cover """Load a serialized solver from an HDF5 file, created previously from the :meth:`save()` method. Parameters ---------- loadfile : str Path to the file where the model was stored via :meth:`save()`. Returns ------- solver : SolverTemplate Loaded solver. """ raise NotImplementedError
def _save_dict(self, hf, attr="options"): """Helper method for serializing a dictionary in HDF5 format.""" options = hf.create_dataset(attr, shape=(0,)) for key, value in getattr(self, attr).items(): options.attrs[key] = "NULL" if value is None else value @staticmethod def _load_dict(hf, attr="options"): """Helper method for loading a dictionary from HDF5 format. This function is the inverse of :meth:`_save_dict()`.""" options = dict() for key in hf[attr].attrs: value = hf[attr].attrs[key] options[key] = None if value == "NULL" else value return options
[docs] def copy(self): """Make a copy of the solver.""" return copy.deepcopy(self)
# Verification ------------------------------------------------------------
[docs] def verify(self): """Verify the solver. If the solver is already trained, check :meth:`solve()`, :meth:`copy()`, and (if implemented) :meth:`save()` and :meth:`load()`. If the solver is not trained, do nothing. """ if self.data_matrix is None: return print("cannot verify solve() before calling fit()") tempfile = "_solververification.h5" def _make_copy(solver): newsolver = solver.copy() if (s2cls := newsolver.__class__) is not (scls := self.__class__): raise errors.VerificationError( f"{scls.__name__}.copy() returned object " f"of type '{s2cls.__name__}'" ) return newsolver def _check_equality(obj1, obj2, Ohat1, operation): if obj1.r != obj2.r or obj1.d != obj2.d or obj1.k != obj2.k: raise errors.VerificationError( f"{operation} does not preserve problem dimensions" ) if not np.allclose(obj2.solve(), Ohat1): raise errors.VerificationError( f"{operation} does not preserve the result of solve()" ) try: # Verify solve(). Ohat = self.solve() if (shape1 := Ohat.shape) != (shape2 := (self.r, self.d)): raise errors.VerificationError( "solve() did not return array of shape (r, d) " f"(expected {shape2}, got {shape1})" ) # Verify copy(). self2 = _make_copy(self) _check_equality(self, self2, Ohat, "copy()") print("solve() is consistent with problem dimensions") # Verify save()/load(). try: self2.save(tempfile, overwrite=True) self3 = self2.load(tempfile) _check_equality(self2, self3, Ohat, "save()/load()") except NotImplementedError: print("save() and/or load() not implemented") finally: if os.path.isfile(tempfile): os.remove(tempfile)
[docs] class PlainSolver(SolverTemplate): r"""Solve the :math:`2`-norm ordinary least-squares problem without any regularization or constraints. That is, solve .. math:: \argmin_{\Ohat} \|\D\Ohat\trp - \Z\trp\|_F^2. The solution is calculated using :func:`scipy.linalg.lstsq()`. Parameters ---------- cond : float or None Cutoff for 'small' singular values of the data matrix. See :func:`scipy.linalg.lstsq()`. lapack_driver : str or None Which LAPACK driver is used to solve the least-squares problem. See :func:`scipy.linalg.lstsq()`. """ def __init__(self, cond=None, lapack_driver=None): """Store least-squares solver options.""" SolverTemplate.__init__(self) self.__options = dict(cond=cond, lapack_driver=lapack_driver) @property def options(self): """Keyword arguments for :func:`scipy.linalg.lstsq()`.""" return self.__options def __str__(self): """String representation: dimensions + solver options.""" start = SolverTemplate.__str__(self) kwargs = self._print_kwargs(self.options) return start + f"\n solver: scipy.linalg.lstsq({kwargs})" # Main methods ------------------------------------------------------------
[docs] def fit(self, data_matrix, lhs_matrix): r"""Verify dimensions and save the data matrices. Parameters ---------- data_matrix : (k, d) ndarray Data matrix :math:`\D`. lhs_matrix : (r, k) or (k,) ndarray "Left-hand side" data matrix :math:`\Z` (not its transpose!). If one-dimensional, assume :math:`r = 1`. """ SolverTemplate.fit(self, data_matrix, lhs_matrix) if self.k < self.d: warnings.warn( "least-squares system is underdetermined", errors.OpInfWarning, stacklevel=2, ) return self
[docs] def solve(self): r"""Solve the Operator Inference regression via :func:`scipy.linalg.lstsq()`. Returns ------- Ohat : (r, d) ndarray Operator matrix :math:`\Ohat` (not its transpose!). """ results = la.lstsq(self.data_matrix, self.lhs_matrix.T, **self.options) return results[0].T
# Persistence -------------------------------------------------------------
[docs] def save(self, savefile: str, overwrite: bool = False): """Serialize the solver, saving it in HDF5 format. The model can be recovered with the :meth:`load()` class method. Parameters ---------- savefile : str File to save to, with extension ``.h5`` (HDF5). overwrite : bool If ``True`` and the specified ``savefile`` already exists, overwrite the file. If ``False`` (default) and the specified ``savefile`` already exists, raise an error. """ with utils.hdf5_savehandle(savefile, overwrite) as hf: self._save_dict(hf, "options") if self.data_matrix is not None: hf.create_dataset("data_matrix", data=self.data_matrix) hf.create_dataset("lhs_matrix", data=self.lhs_matrix)
[docs] @classmethod def load(cls, loadfile: str): """Load a serialized solver from an HDF5 file, created previously from the :meth:`save()` method. Parameters ---------- loadfile : str Path to the file where the model was stored via :meth:`save()`. Returns ------- solver : SolverTemplate Loaded solver. """ options = dict() with utils.hdf5_loadhandle(loadfile) as hf: options = cls._load_dict(hf, "options") solver = cls(**options) if "data_matrix" in hf: D = hf["data_matrix"][:] Z = hf["lhs_matrix"][:] solver.fit(D, Z) return solver
[docs] def copy(self): """Make a copy of the solver.""" solver = self.__class__( cond=self.options["cond"], lapack_driver=self.options["lapack_driver"], ) if self.data_matrix is not None: SolverTemplate.fit(solver, self.data_matrix, self.lhs_matrix) return solver