Source code for opinf.lstsq._total

# lstsq/_total.py
"""Operator Inference total least-squares solver."""

__all__ = [
    "TotalLeastSquaresSolver",
]

import types
import numpy as np
import scipy.linalg as la

from .. import utils
from ._base import SolverTemplate, _require_trained


[docs] class TotalLeastSquaresSolver(SolverTemplate): r"""Solve the total least-squares problem without any regularization, i.e., .. math:: \argmin_{\Ohat} \left\|[~\Delta_\D~~\Delta_{\Z\trp}~]\right\|_{F}^{2} \quad\text{such that}\quad (\D + \Delta_\D)\Ohat\trp = \Z\trp + \Delta_{\Z\trp}. The solution is computed via the singular value decomposition of the augmented :math:`k \times (d + r)` matrix :math:`[~\D~~\Z\trp~]`: if :math:`\bfPhi\bfSigma\bfPsi\trp = [~\D~~\Z\trp~]`, write the :math:`(d + r) \times (d + r)` right singular vector matrix :math:`\bfPsi` as .. math:: \bfPsi = \left[\begin{array}{cc} \bfPsi_{\D\D} & \bfPsi_{\D\Z} \\ \bfPsi_{\Z\D} & \bfPsi_{\Z\Z} \end{array}\right], where :math:`\bfPsi_{\D\D}` is :math:`d \times d`, :math:`\bfPsi_{\D\Z}` is :math:`d \times r`, :math:`\bfPsi_{\Z\D}` is :math:`r \times d`, and :math:`\bfPsi_{\Z\Z}` is :math:`r \times r`. Then the optimal solution is given by :math:`\Ohat = -\bfPsi_{\Z\Z}^{-\mathsf{T}}\bfPsi_{\D\Z}\trp`. Parameters ---------- lapack_driver : str LAPACK routine for computing the SVD. See :func:`scipy.linalg.svd()`. """ def __init__(self, lapack_driver: str = "gesdd"): SolverTemplate.__init__(self) self.__options = types.MappingProxyType( dict(full_matrices=False, lapack_driver=lapack_driver) ) # Properties -------------------------------------------------------------- @property def options(self) -> dict: """Keyword arguments for :func:`scipy.linalg.svd()`.""" return self.__options def __str__(self) -> str: """String representation: dimensions + solver options.""" start = SolverTemplate.__str__(self) if self.data_matrix is not None: lines = start.split("\n") lines.insert( 4, f" Augmented condition number: {self._augcond:.4e}", ) start = "\n".join(lines) kwargs = self._print_kwargs(self.options) return start + f"\n SVD solver: scipy.linalg.svd({kwargs})" # Main methods ------------------------------------------------------------
[docs] def fit(self, data_matrix: np.ndarray, lhs_matrix: np.ndarray): r"""Verify dimensions, compute the singular value decomposition of the data matrix, and solve the problem. Parameters ---------- data_matrix : (k, d) ndarray Data matrix :math:`\D`. lhs_matrix : (r, k) or (k,) ndarray "Left-hand side" data matrix :math:`\Z` (not its transpose!). If one-dimensional, assume :math:`r = 1`. """ SolverTemplate.fit(self, data_matrix, lhs_matrix) # Compute the SVD of the concatenation [D Z^T]. D_Zt = np.hstack((self.data_matrix, self.lhs_matrix.T)) Phi, _svals, PsiT = la.svd(D_Zt, **self.options) self._augcond = _svals.max() / _svals.min() # Extract the relevant blocks of the singular vector matrices. Psi_ZZt = PsiT[self.d :, self.d :] Psi_DZt = PsiT[self.d :, : self.d] Phi_Zt_Sig_ZT = Phi[:, self.d :] * _svals[self.d :] # Solve the problem and compute the residuals. self._Ohat = -la.solve(Psi_ZZt, Psi_DZt) Deltas = -Phi_Zt_Sig_ZT @ np.hstack((Psi_ZZt, Psi_DZt)) self._norm_of_errors = la.norm(Deltas) return self
[docs] @_require_trained def solve(self) -> np.ndarray: r"""Return the total least-squares solution to the Operator Inference regression. Returns ------- Ohat : (r, d) ndarray Operator matrix :math:`\Ohat` (not its transpose!). """ return self._Ohat
# Post-processing --------------------------------------------------------- @property @_require_trained def augcond(self) -> float: r""":math:`2`-norm condition number of the augmented data matrix :math:`[~\D~~\Z\trp~]`. """ return self._augcond @property @_require_trained def error(self) -> float: r"""Frobenius norm of the error matrices, :math:`\|[~\Delta_\D~~\Delta_{\Z\trp}~\|_F`. """ return self._norm_of_errors # Persistence -------------------------------------------------------------
[docs] def save(self, savefile: str, overwrite: bool = False): """Serialize the solver, saving it in HDF5 format. The model can be recovered with the :meth:`load()` class method. Parameters ---------- savefile : str File to save to, with extension ``.h5`` (HDF5). overwrite : bool If ``True`` and the specified ``savefile`` already exists, overwrite the file. If ``False`` (default) and the specified ``savefile`` already exists, raise an error. """ with utils.hdf5_savehandle(savefile, overwrite) as hf: self._save_dict(hf, "options") if self.data_matrix is not None: for attr in ("data_matrix", "lhs_matrix", "_Ohat"): hf.create_dataset(attr, data=getattr(self, attr)) for attr in ("_augcond", "_norm_of_errors"): hf.create_dataset(attr, data=[getattr(self, attr)])
[docs] @classmethod def load(cls, loadfile: str): """Load a serialized solver from an HDF5 file, created previously from the :meth:`save()` method. Parameters ---------- loadfile : str Path to the file where the model was stored via :meth:`save()`. Returns ------- solver : L2Solver Loaded solver. """ with utils.hdf5_loadhandle(loadfile) as hf: options = cls._load_dict(hf, "options") solver = cls(lapack_driver=options["lapack_driver"]) if "data_matrix" in hf: D = hf["data_matrix"][:] Z = hf["lhs_matrix"][:] SolverTemplate.fit(solver, D, Z) solver._Ohat = hf["_Ohat"][:] for attr in ("_augcond", "_norm_of_errors"): setattr(solver, attr, float(hf[attr][0])) return solver
[docs] def copy(self): """Make a copy of the solver.""" solver = self.__class__(lapack_driver=self.options["lapack_driver"]) if self.data_matrix is not None: SolverTemplate.fit(solver, self.data_matrix, self.lhs_matrix) for attr in ("_Ohat", "_augcond", "_norm_of_errors"): setattr(solver, attr, getattr(self, attr)) return solver