Source code for opinf.basis._pod

# basis/
"""Dimensionality reduction with Proper Orthogonal Decomposition (POD)."""

__all__ = [

import types
import warnings
import numpy as np
import scipy.linalg as la
import scipy.sparse as sparse
import sklearn.utils.extmath as sklmath
import matplotlib.pyplot as plt

from .. import errors, utils
from ._base import BasisTemplate
from ._linear import LinearBasis

# Helper functions ============================================================
requires_svdvals = utils.requires2(
    "no singular value data, call fit()",

def _Wmult(W, arr):
    """Matrix multiply ``W`` and ``arr``, where ``W`` may be a one-dimensional
    array representing diagonals or a two-dimensional array.
    if W.ndim == 1:
        if arr.ndim == 1:
            return W * arr
        elif arr.ndim == 2:
            return W.reshape((-1, 1)) * arr
            raise ValueError("expected one- or two-dimensional array")
    return W @ arr

[docs] def method_of_snapshots( states, inner_product_matrix=None, minthresh: float = 1e-15, **options, ): r"""Use the method of snapshots to compute the left singular values of a collection of state snapshots. For a snapshot matrix :math:`\Q\in\RR^{n\times k}` (usually with :math:`n \ge k`) and a weighting matrix :math:`\W\in\RR^{n\times n}`, the method of snapshots computes the symmetric eigendecomposition .. math:: \Q\trp\W\Q = \bfPsi\bfLambda\bfPsi\trp. The matrix :math:`\bfPsi\in\RR^{k\times k}` consists of the right singular vectors of :math:`\Q` and :math:`\bfLambda\in\RR^{k\times k}` is a diagonal matrix containing the square of the singular values of :math:`\Q`. The (weighted) left singular vectors are then given by :math:`\bfPhi = \Q\bfPsi\bfLambda^{-1/2} \in \RR^{n \times k}` and satisfy :math:`\Q = \bfPhi\bfLambda^{1/2}\bfPsi\trp` and :math:`\bfPhi\trp\W\bfPhi = \I`. Parameters ---------- states : (n, k) ndarray, Snapshot matrix :math:`\Q` from which to compute the POD vectors. inner_product_matrix : (n, n) sparse SPD matrix or None Spatial inner product matrix :math:`\W` for measuring how different indices in the snapshot matrix interact with each other. If not provided, default to the standard Euclidean inner product (:math:`\W = \I`). minthresh : float > 0 Threshold at which to truncate small eigenvalues. Singular vectors corresponding to eigenvalues that are less than this threshold are not included in the returned arrays. options : dict Additional arguments for :func:`scipy.linalg.eigh`. Returns ------- V : (n, k') ndarray, k' <= k Left singular vectors :math:`\bfPhi`. svals : (k',) ndarray, k' <= k Singular values :math:`\operatorname{diag}(\bfLambda^{1/2})` in descending order. eigvecsT : (k', k') ndarray Transposed right singular vectors :math:`\bfPsi\trp`. Notes ----- If, due to numerical precision errors, :func:`scipy.linalg.eigh` returns any negative eigenvalues, then ``minthresh`` is increased to the absolute value of the most negative eigenvalue. """ n_states = states.shape[1] if inner_product_matrix is None: gramian = states.T @ (states / n_states) else: gramian = states.T @ _Wmult(inner_product_matrix, states / n_states) # Compute eigenvalue decomposition, using that the Gramian is symmetric. eigvals, eigvecs = la.eigh(gramian, **options) # Re-order (largest to smallest). eigvals = eigvals[::-1] eigvecs = eigvecs[:, ::-1] # NOTE: By definition the Gramian is symmetric positive semi-definite. # If any eigenvalues are smaller than zero, they are only measuring # numerical error and can be truncated. positives = eigvals > max(minthresh, abs(np.min(eigvals))) eigvecs = eigvecs[:, positives] eigvals = eigvals[positives] # Rescale and square root eigenvalues to get singular values. svals = np.sqrt(eigvals * n_states) V = states @ (eigvecs / svals) return V, svals, eigvecs.T
# Main class ==================================================================
[docs] class PODBasis(LinearBasis): r"""Proper othogonal decomposition basis, consisting of the principal left singular vectors of a collection of states. .. math:: \text{svd}(\Q) = \bfPhi\bfSigma\bfPsi\trp \qquad\Longrightarrow\qquad \text{pod}(\Q, r) = \bfPhi_{:,:r} The low-dimensional approximation is linear, see :class:`LinearBasis`. Here, :math:`\Q\in\mathbb{R}^{n\times k}` is a collection of states, the only argument of :meth:`fit`. The POD basis entries matrix :math:`\Vr = \bfPhi_{:,:r}\in\RR^{n\times r}` always has orthonormal columns, i.e., :math:`\Vr\trp\Vr = \I`. If a weight matrix :math:`\W` is specified, a weighted SVD is computed so that :math:`\Vr\trp\W\Vr = \I`. The columns of the basis entries are also the dominant eigenvectors of :math:`\Q\trp\W\Q` and can be computed through eigendecomposition by setting ``svdsolver="eigh"``. The number of left singular vectors :math:`r` is the dimension of the reduced state and is set by specifying exactly one of the constructor arguments ``num_vectors``, ``svdval_threshold``, ``residual_energy``, ``cumulative_energy``, or ``projection_error``. Once the basis entries are set by calling :meth:`fit`, the reduced state dimension :math:`r` can be updated by calling :meth:`set_dimension`. The POD singular values, which are used to select :math:`r`, are the diagonals of :math:`\bfSigma` and are denoted :math:`\sigma_1,\ldots,\sigma_k`. Parameters ---------- num_vectors : int Set the reduced state dimension :math:`r` to ``num_vectors``. svdval_threshold : float Choose :math:`r` as the number of normalized POD singular values that are greater than the given threshold, i.e., :math:`\sigma_{i}/\sigma_{1} \ge` ``svdval_threshold`` for :math:`i=1,\ldots,r`. cumulative_energy : float Choose :math:`r` as the smallest integer such that :math:`\sum_{i=1}^{r}\sigma_i^2\big/\sum_{j=1}^{k}\sigma_j^2` is greater than or equal to ``cumulative_energy``. residual_energy : float Choose :math:`r` as the smallest integer such that :math:`\sum_{i=r+1}^k\sigma_i^2\big/\sum_{j=1}^k\sigma_j^2` is less than or equal to ``residual_energy``. projection_error : float Choose :math:`r` as the smallest integer such that :math:`\|\Q - \Vr\Vr\trp\Q\|_F \big/ \|\Q\|_F` is less than or equal to ``projection_error``. max_vectors : int Maximum number of POD basis vectors to store. After calling :meth:`fit`, the ``reduced_state_dimension`` can be increased up to ``max_vectors``. If not given (default), record all :math:`k` left singular vectors. svdsolver : str or callable Strategy for computing the thin SVD of the states. **Options:** * ``"dense"`` (default): Use :func:`scipy.linalg.svd` to compute the SVD. May be inefficient for very large state matrices. * ``"randomized"``: Compute an approximate SVD with a randomized approach via :func:`sklearn.utils.extmath.randomized_svd`. May be more efficient but less accurate for very large state matrices. **NOTE**: it is highly recommended to set ``max_vectors`` to limit the number of computed singular vectors. In this case, only ``max_vectors`` singular *values* are computed as well, meaning the cumulative and residual energies cannot be computed exactly. * ``"method-of-snapshots"`` or ``"eigh"``: Compute the basis through a symmetric eigenvalue decomposition, rather than through the SVD, via :func:`scipy.linalg.eigh`. This is how POD was computed when it was orginally introduced. If the state dimension is larger than the number of snapshots, this method is much more efficient than the SVD. Moreover, non-Euclidean inner products (see :attr:`weights`) are handled much more efficiently this way than with an SVD-based approach. **NOTE**: in this case, an additional keyword argument ``minthresh`` defines a threshold at which small eigenvalues are truncated. * callable: If this argument is a callable function, use it for the SVD computation. The signature must match :func:`scipy.linalg.svd`, i.e., ``U, s, Vh = svdsolver(states, **svdsolver_options)`` weights : (n, n) ndarray or (n,) ndarray None Weight matrix :math:`\W` or its diagonals. When provided, a weighted singular value decomposition of the states is used to ensure that the left singular vectors are orthogonal with respect to the weight matrix, i.e., :math:`\bfPhi\trp\W\bfPhi = \I`. If ``None`` (default), set :math:`\W` to the identity. name : str Label for the state variable that this basis approximates. svdsolver_options : dict Options to pass to the SVD solver. """ # Valid SVD solvers. __SVDSOLVERS = types.MappingProxyType( { "dense": la.svd, "randomized": sklmath.randomized_svd, "method-of-snapshots": method_of_snapshots, # "streaming": # TODO } ) # Constructors ------------------------------------------------------------ def __init__( self, num_vectors: int = None, svdval_threshold: float = None, cumulative_energy: float = None, residual_energy: float = None, projection_error: float = None, max_vectors: int = None, svdsolver: str = "dense", weights: np.ndarray = None, name: str = None, **svdsolver_options, ): """Initialize an empty basis.""" # Superclass constructor. LinearBasis.__init__(self, entries=None, weights=weights, name=name) # Store dimension selection criteria. self._set_dimension_selection_criterion( num_vectors=num_vectors, svdval_threshold=svdval_threshold, cumulative_energy=cumulative_energy, residual_energy=residual_energy, projection_error=projection_error, ) self.__energy_is_being_estimated = False # Initialize hyperparameter properties. if max_vectors is not None: max_vectors = int(max_vectors) if max_vectors <= 0: raise ValueError("max_vectors must be a positive integer") self.__max_vectors_desired = max_vectors self.svdsolver = svdsolver self.svdsolver_options = svdsolver_options # Initialize entry properties. self.__leftvecs = None self.__svdvals = None self.__rightvecs = None self.__cumulative_energy = None self.__residual_energy = None # Store weights (separate from LinearBasis.__weights) if ( weights is not None and self.__svdsolverlabel != "method-of-snapshots" ): if weights.ndim == 1: self.__sqrt_weights = np.sqrt(weights) else: # (weights.ndim == 2, checked by LinearBasis) self.__sqrt_weights = la.sqrtm(weights) self.__sqrt_weights_cho = la.cho_factor(self.__sqrt_weights)
[docs] @classmethod def from_svd( cls, leftvecs: np.ndarray, svdvals: np.ndarray, rightvecs: np.ndarray = None, num_vectors: int = None, svdval_threshold: float = None, residual_energy: float = None, cumulative_energy: float = None, projection_error: float = None, max_vectors: int = None, weights: np.ndarray = None, name: str = None, ): r"""Initialize a :class:`PODBasis` from the singular value decomposition of an :math:`n\times k` matrix. Parameters ---------- leftvecs : (n, r) ndarray Left singular vectors. svdvals : (r,) ndarray Singular values. rightvecs : (k, r) ndarray or None Right singular vectors. Each *column* is a singular vector. Returns ------- Initialized :class:`PODBasis` object. Notes ----- See :class:`PODBasis` for details on other arguments. """ # Default dimensionality criterion: use all left singular vectors. if all( arg is None for arg in ( num_vectors, svdval_threshold, residual_energy, cumulative_energy, projection_error, ) ): num_vectors = leftvecs.shape[1] basis = cls( num_vectors=num_vectors, svdval_threshold=svdval_threshold, residual_energy=residual_energy, cumulative_energy=cumulative_energy, projection_error=projection_error, max_vectors=max_vectors, weights=weights, name=name, ) basis._store_svd(leftvecs, svdvals, rightvecs) basis._set_dimension_from_criterion() return basis
# Properties: hyperparameters --------------------------------------------- @property def svdsolver(self) -> str: """Strategy for computing the thin SVD of the states, either ``'dense'``, ``'randomized'``, or ``'custom'``. """ return self.__svdsolverlabel @svdsolver.setter def svdsolver(self, s): if callable(s): self.__svdsolverlabel = "custom" self.__svdengine = s return if s == "eigh": s = "method-of-snapshots" if s not in self.__SVDSOLVERS: raise AttributeError( f"invalid svdsolver '{s}', options: " + ", ".join(self.__SVDSOLVERS.keys()) ) self.__svdsolverlabel = s self.__svdengine = self.__SVDSOLVERS[s] @property def svdsolver_options(self) -> dict: """Options to pass to the SVD solver.""" return self.__svdsolver_options @svdsolver_options.setter def svdsolver_options(self, options): if options is None: options = dict() if not isinstance(options, dict): raise TypeError("svdsolver_options must be a dictionary") self.__svdsolver_options = options # Properties: entries ----------------------------------------------------- @property def leftvecs(self): """Leading left singular vectors of the training data.""" return self.__leftvecs @property def max_vectors(self) -> int: """Number of POD basis vectors stored in memory. The ``reduced_state_dimension`` may be increased up to ``max_vectors``. """ return None if self.leftvecs is None else self.leftvecs.shape[1] @property def svdvals(self): """Normalized singular values of the training data.""" return self.__svdvals @property def rightvecs(self): """Leading *right* singular vectors of the training data, if available. """ return self.__rightvecs @property def cumulative_energy(self) -> float: r"""Amount of singular value energy captured by the basis, :math:`\sum_{i=1}^r\sigma_i^2\big/\sum_{j=1}^k\sigma_j^2`. """ return self.__cumulative_energy @property def residual_energy(self) -> float: r"""Amount of singular value energy *not* captured by the basis, :math:`\sum_{i=r+1}^k\sigma_i^2\big/\sum_{j=1}^k\sigma_j^2`. """ return self.__residual_energy def __str__(self): """String representation: class, dimensions, and singular value energies. """ out = [LinearBasis.__str__(self)] if (ce := self.cumulative_energy) is not None: if self.__energy_is_being_estimated: out.append(f"approximate cumulative energy: {ce:%}") else: out.append(f"cumulative energy: {ce:%}") if (re := self.residual_energy) is not None: if self.__energy_is_being_estimated: out.append(f"approximate residual energy: {re:.4e}") else: out.append(f"residual energy: {re:.4e}") if (mv := self.max_vectors) is not None: out.append(f"{mv:d} basis vectors available") if self.__svdsolverlabel == "dense": out.append("SVD solver: scipy.linalg.svd()") elif self.__svdsolverlabel == "randomized": out.append("SVD solver: sklearn.utils.extmath.randomized_svd()") elif self.__svdsolverlabel == "custom": if (name := self.__svdengine.__name__) == "<lambda>": out.append("SVD solver: custom lambda function") else: out.append(f"SVD solver: {name}()") return "\n ".join(out) # Dimension selection ----------------------------------------------------- def _set_dimension_selection_criterion( self, num_vectors: int = None, svdval_threshold: float = None, cumulative_energy: float = None, residual_energy: float = None, projection_error: float = None, ): args = [ ("num_vectors", num_vectors), ("svdval_threshold", svdval_threshold), ("cumulative_energy", cumulative_energy), ("residual_energy", residual_energy), ("projection_error", projection_error), ] provided = [(arg[1] is not None) for arg in args] # More than one argument provided. if sum(provided) > 1: firstarg = args[np.argmax(provided)] warnings.warn( "received multiple dimension selection criteria, using " f"{firstarg[0]}={firstarg[1]}", errors.OpInfWarning, ) self.__criterion = firstarg return # Return the one provided argument. for name, val in args: if val is not None: self.__criterion = (name, val) return # No arguments provided. raise ValueError( "exactly one dimension selection criterion must be provided" ) @BasisTemplate.reduced_state_dimension.setter def reduced_state_dimension(self, r): """Set the reduced state dimension :math:`r`. If there is SVD data, set the basis entries to the first r left singular vectors. If r > max_vectors, set r = max_vectors and raise a warning. """ r = int(r) # No basis data yet, but when fit() is called there will be r vectors. if self.svdvals is None: self._set_dimension_selection_criterion(num_vectors=r) BasisTemplate.reduced_state_dimension.fset(self, r) return # Basis data already exists, change the dimension and update. if r > (k := self.max_vectors): warnings.warn( "selected reduced dimension exceeds number of stored vectors, " f"setting reduced_state_dimension = max_vectors = {k:d}", errors.OpInfWarning, ) r = self.max_vectors BasisTemplate.reduced_state_dimension.fset(self, r) # Update singular value energies. r = self.reduced_state_dimension svdvals2 = self.svdvals**2 self.__cumulative_energy = np.sum(svdvals2[:r]) / np.sum(svdvals2) self.__residual_energy = 1 - self.__cumulative_energy # Update entries. LinearBasis.__init__( self, entries=self.__leftvecs[:, :r], weights=self.weights, check_orthogonality=True,, ) def _set_dimension_from_criterion(self): """Set the dimension by interpreting the ``__criterion`` attribute.""" criterion, value = self.__criterion svdvals2 = self.svdvals**2 nsvdvals = svdvals2.size energy = np.cumsum(svdvals2) / np.sum(svdvals2) if criterion == "num_vectors": r = value elif criterion == "svdval_threshold": r = np.count_nonzero(self.svdvals >= value) elif criterion == "cumulative_energy": r = int(np.searchsorted(energy, value)) + 1 if self.__energy_is_being_estimated: warnings.warn( "cumulative energy is being estimated from only " f"{nsvdvals:d} singular values", errors.OpInfWarning, ) elif criterion == "residual_energy": r = np.count_nonzero(1 - energy >= value) + 1 if self.__energy_is_being_estimated: warnings.warn( "residual energy is being estimated from only " f"{nsvdvals:d} singular values", errors.OpInfWarning, ) elif criterion == "projection_error": r = np.count_nonzero(np.sqrt(1 - energy) >= value) + 1 if self.__energy_is_being_estimated: warnings.warn( "projection error is being estimated from only " f"{nsvdvals:d} singular values", errors.OpInfWarning, ) self.reduced_state_dimension = r
[docs] def set_dimension( self, num_vectors: int = None, svdval_threshold: float = None, cumulative_energy: float = None, residual_energy: float = None, projection_error: float = None, ): r"""Set the reduced state dimension :math:`r`. Exactly one argument should be specified Parameters ---------- num_vectors : int Set :math:`r` to ``num_vectors``. svdval_threshold : float Choose :math:`r` as the number of normalized POD singular values that are greater than the given threshold, i.e., :math:`\sigma_{i}/\sigma_{1} \ge` ``svdval_threshold`` for :math:`i=1,\ldots,r`. cumulative_energy : float Choose :math:`r` as the smallest integer such that :math:`\sum_{i=1}^{r}\sigma_i^2\big/\sum_{j=1}^{k}\sigma_j^2` is greater than or equal to ``cumulative_energy``. residual_energy : float Choose :math:`r` as the smallest integer such that :math:`\sum_{i=r+1}^k\sigma_i^2\big/\sum_{j=1}^k\sigma_j^2` is less than or equal to ``residual_energy``. projection_error : float Choose :math:`r` as the smallest integer such that :math:`\|\Q - \Vr\Vr\trp\Q\|_F \big/ \|\Q\|_F` is less than or equal to ``projection_error``, where :math:`\Q` is the matrix of training snapshots. """ self._set_dimension_selection_criterion( num_vectors=num_vectors, svdval_threshold=svdval_threshold, residual_energy=residual_energy, cumulative_energy=cumulative_energy, projection_error=projection_error, ) # No basis data yet, do nothing. if self.svdvals is None: return # Basis data exists, set the dimension and update. self._set_dimension_from_criterion()
# Fitting ----------------------------------------------------------------- def _store_svd(self, left, svdvals, right): """Store the singular value decomposition, normalizing the singular values. """ self.__leftvecs = left svdvals = np.sort(svdvals)[::-1] self.__svdvals = svdvals / svdvals[0] self.__rightvecs = right
[docs] def fit(self, states): """Compute the POD basis entries by taking the SVD of the states. Parameters ---------- states : (n, k) ndarray Matrix of :math:`k` :math:`n`-dimensional snapshots. """ if np.ndim(states) == 3: states = np.hstack(states) # Limit maximum number of vectors if needed. rmax = min(states.shape) keep = self.__max_vectors_desired if keep is None: keep = rmax elif keep > rmax: warnings.warn( f"only {rmax:d} singular vectors can be extracted " f"from ({states.shape[0]:d} x {states.shape[1]:d}) snapshots, " f"setting max_vectors={rmax:d}", errors.OpInfWarning, ) keep = rmax # SVD solver settings. self.__energy_is_being_estimated = False options = self.svdsolver_options.copy() if self.__svdsolverlabel == "dense": options["full_matrices"] = False elif self.__svdsolverlabel == "randomized": options["n_components"] = keep if "random_state" not in options: options["random_state"] = None if keep < rmax: self.__energy_is_being_estimated = True elif self.__svdsolverlabel == "method-of-snapshots": options["inner_product_matrix"] = self.weights # Weight the states. if ( self.weights is not None and self.__svdsolverlabel != "method-of-snapshots" ): if states.shape[0] != (nW := self.__sqrt_weights.shape[0]): raise errors.DimensionalityError( f"states not aligned with weights, should have {nW:d} rows" ) states = _Wmult(self.__sqrt_weights, states) # Compute the SVD. V, svdvals, Wt = self.__svdengine(states, **options) # Unweight the basis. if ( self.weights is not None and self.__svdsolverlabel != "method-of-snapshots" ): if self.__sqrt_weights.ndim == 1: V = _Wmult(1 / self.__sqrt_weights, V) else: V = la.cho_solve(self.__sqrt_weights_cho, V) # Store the results. self._store_svd( left=V[:, :keep], svdvals=svdvals, right=Wt[:keep, :].T, ) self._set_dimension_from_criterion() return self
# Visualization ----------------------------------------------------------- @requires_svdvals def _plot_single(self, plotter, **kwargs): """Execute a single plotting routine.""" plotter(self.svdvals, **kwargs) return ax if (ax := kwargs.get("ax")) is not None else plt.gca()
[docs] def plot_svdval_decay( self, threshold=None, right: int = None, ax: plt.Axes = None, **options, ): """Plot the normalized singular value decay. Parameters ---------- threshold : float or list[float] or None Cutoff value(s) to mark on the plot. right : int or None Maximum singular value index to plot (``plt.xlim(right=right)``). ax : matplotlib.Axes or None Axes to plot on. If ``None`` (default), a new single-axes figure is created. options : dict Options to pass to :func:`matplotlib.pyplot.semilogy`. Returns ------- ax : matplotlib.Axes Axes for the plot. """ kwargs = dict(threshold=threshold, plot=True, right=right, ax=ax) kwargs.update(options) return self._plot_single(svdval_decay, **kwargs)
[docs] def plot_cumulative_energy( self, threshold=None, right: int = None, ax: plt.Axes = None, **options, ): r"""Plot the cumulative singular value energy and a function of the basis size. The cumulative energy of :math:`r` singular values is defined by .. math:: \kappa_r = \sum_{i=1}^r\sigma_i^2 \bigg/ \sum_{j=1}^k\sigma_j^2. This method plots :math:`\kappa_r` as a function of :math:`r`. Parameters ---------- threshold : float or list[float] or None Threshold energy value(s) to mark on the plot. right : int or None Maximum singular value index to plot (``plt.xlim(right=right)``). ax : matplotlib.Axes or None Axes to plot on. If ``None`` (default), a new single-axes figure is created. kwargs : dict Options to pass to :func:`matplotlib.pyplot.semilogy`. Returns ------- ax : matplotlib.Axes Axes for the plot. """ kwargs = dict(threshold=threshold, plot=True, right=right, ax=ax) kwargs.update(options) return self._plot_single(cumulative_energy, **kwargs)
[docs] def plot_residual_energy( self, threshold=None, right: int = None, ax: plt.Axes = None, **options, ): r"""Plot the residual singular value energy as a function of the basis size. The residual energy of :math:`r` singular values is defined by .. math:: \epsilon_r = \sum_{i=r+1}^k\sigma_i^2 \bigg/ \sum_{j=1}^k\sigma_j^2 = 1 - \left( \sum_{i=1}^r\sigma_i^2 \bigg/ \sum_{j=1}^k\sigma_j^2 \right) This method plots :math:`\epsilon_r` as a function of :math:`r`. Parameters ---------- threshold : float or list[float] or None Cutoff value(s) to mark on the plot. right : int or None Maximum singular value index to plot (``plt.xlim(right=right)``). ax : matplotlib.Axes or None Axes to plot on. If ``None`` (default), a new single-axes figure is created. options : dict Options to pass to :func:`matplotlib.pyplot.semilogy`. Returns ------- ax : matplotlib.Axes Axes for the plot. """ kwargs = dict(threshold=threshold, plot=True, right=right, ax=ax) kwargs.update(options) return self._plot_single(residual_energy, **kwargs)
[docs] def plot_projection_error( self, threshold=None, right: int = None, ax: plt.Axes = None, **options, ): r"""Plot the relative projection error of the training snapshots as a function of the basis size. The relative projection error for the rank-:math:`r` POD basis corresponding to the training snapshot matrix :math:`\Q\in\RR^{n\times k}` is defined by .. math:: \rho_r = \frac{\|\Q - \Vr\Vr\trp\Q\|_{F}}{\|\Q\|_{F}}, where :math:`\Vr\in\RR^{n \times r}` are the basis entries. This method plots :math:`\rho_r` as a function of :math:`r`; :math:`\rho_r` is calculated via the singular values: .. math:: \rho_r = \sqrt{\frac{\sum_{j = r + 1}^{\ell}\sigma_{j}^{2}}{ \sum_{j=1}^{\ell}\sigma_{j}^{2}}} Parameters ---------- threshold : float or list[float] or None Cutoff value(s) to mark on the plot. right : int or None Maximum singular value index to plot (``plt.xlim(right=right)``). ax : matplotlib.Axes or None Axes to plot on. If ``None`` (default), a new single-axes figure is created. options : dict Options to pass to :func:`matplotlib.pyplot.semilogy`. Returns ------- ax : matplotlib.Axes Axes for the plot. Notes ----- This method shows the projection error of the training snapshots. See :meth:`projection_error` to calculate the projection error for an arbitrary snapshot or collection of snapshots. """ kwargs = dict( threshold=threshold, plot=True, right=right, ax=ax, sqrt=True, ) kwargs.update(options) return self._plot_single(residual_energy, **kwargs)
[docs] @requires_svdvals def plot_energy(self, right: int = None): """Plot the normalized singular values and the cumulative and residual energies. Parameters ---------- right : int or None Maximum singular value index to plot (``plt.xlim(right=right)``). """ r = self.reduced_state_dimension def _rline(ax, ymin): ax.axvline(r, color="gray", linewidth=0.5, zorder=1) ax.text( r + 0.5, ymin, f"r = {r}", color="gray", horizontalalignment="left", verticalalignment="bottom", ) fig, axes = plt.subplots(1, 2, figsize=(13.44, 4.8)) ax = self.plot_svdval_decay(right=right, ax=axes[0]) ax.set_title("POD singular values") _rline(ax, 1.05 * ax.get_ylim()[0]) ax = self.plot_residual_energy(right=right, ax=axes[1]) ax.set_ylabel("Residual energy", color="C1") ax.spines["right"].set_visible(True) ax.tick_params(axis="y", which="both", color="C1", labelcolor="C1") ax.set_title("POD singular value energy") ax = self.plot_cumulative_energy( right=right, ax=axes[1].twinx(), color="C0", ) ax.spines["left"].set_visible(True) ax.set_ylabel("Cumulative energy", color="C0") ax.tick_params(axis="y", which="both", color="C0", labelcolor="C0") _rline(ax, 0.01 + ax.get_ylim()[0]) fig.tight_layout()
# Persistence -------------------------------------------------------------
[docs] def save(self, savefile, overwrite=False): """Save the basis to an HDF5 file. Parameters ---------- savefile : str Path of the file to save the basis to. overwrite : bool If ``True``, overwrite the file if it already exists. If ``False`` (default), raise a ``FileExistsError`` if the file already exists. """ with utils.hdf5_savehandle(savefile, overwrite) as hf: meta = hf.create_dataset("meta", shape=(0,)) if meta.attrs["name"] = meta.attrs["criterion_type"] = self.__criterion[0] hf.create_dataset("criterion_value", data=[self.__criterion[1]]) if self.__max_vectors_desired is not None: hf.create_dataset( "max_vectors", data=[self.__max_vectors_desired], ) if (w := self.weights) is not None: if isinstance(w, sparse.dia_array): w =[0] hf.create_dataset("weights", data=w) if self.leftvecs is not None: hf.create_dataset("leftvecs", data=self.leftvecs) hf.create_dataset("svdvals", data=self.svdvals) hf.create_dataset("rightvecs", data=self.rightvecs)
[docs] @classmethod def load(cls, loadfile, max_vectors=None): """Load a basis from an HDF5 file. Parameters ---------- loadfile : str Path to the file where the basis was stored via :meth:`save`. max_vectors : int or None Maximum number of POD vectors to load. If ``None`` (default), load all stored vectors. Returns ------- PODBasis object """ kwargs = {} with utils.hdf5_loadhandle(loadfile) as hf: attrs = hf["meta"].attrs if "name" in attrs: kwargs["name"] = attrs["name"] kwargs[attrs["criterion_type"]] = hf["criterion_value"][0] if "max_vectors" in hf: kwargs["max_vectors"] = hf["max_vectors"][0] if "weights" in hf: kwargs["weights"] = hf["weights"][:] if "leftvecs" in hf: if (r := max_vectors) is not None: r = min(r, hf["leftvecs"].shape[1]) if "num_vectors" in kwargs: kwargs["num_vectors"] = r kwargs["leftvecs"] = hf["leftvecs"][:, :r] kwargs["svdvals"] = hf["svdvals"][:] kwargs["rightvecs"] = hf["rightvecs"][:, :r] return cls.from_svd(**kwargs) return cls(**kwargs)
# Functional API ==============================================================
[docs] def pod_basis( states, num_vectors: int = None, svdval_threshold: float = None, cumulative_energy: float = None, residual_energy: float = None, projection_error: float = None, svdsolver: str = "dense", weights: np.ndarray = None, return_rightvecs: bool = False, **svdsolver_options, ): r"""Compute a POD basis from the given states. Parameters ---------- states : (n, k) ndarray Matrix of :math:`k` :math:`n`-dimensional snapshots. return_rightvec : bool If ``True``, return the right singular vectors as well. Returns ------- basis : (n, r) ndarray POD basis matrix, the first :math:`r` left singular vectors of the states. svdvals : (n,), (k,), or (r,) ndarray Normalized singular values in descending order. Always returns as many as are calculated: :math:`\min\{n, k\}` for ``svdsolver="dense"``, :math:`r` for ``svdsolver="randomized"``. rightvecs : (k, r) ndarray First :math:`r` **right** singular vectors, as columns. **Only returned** if ``return_rightvecs=True``. Notes ----- See :class:`PODBasis` for the mathematical definition of POD and for details on other function arguments. """ # If no criteria given, use the maximum dimension based on the states. criteria = { num_vectors, svdval_threshold, cumulative_energy, residual_energy, projection_error, } if len(criteria) == 1 and criteria.pop() is None: num_vectors = min(states.shape) basis = PODBasis( num_vectors=num_vectors, svdval_threshold=svdval_threshold, cumulative_energy=cumulative_energy, residual_energy=residual_energy, projection_error=projection_error, svdsolver=svdsolver, weights=weights, **svdsolver_options, ).fit(states) if return_rightvecs: r = basis.reduced_state_dimension return basis.entries, basis.svdvals, basis.rightvecs[:, :r] return basis.entries, basis.svdvals
[docs] def svdval_decay( singular_values, threshold: float = 1e-8, plot: bool = True, right: int = None, ax: plt.Axes = None, **kwargs, ): """Count the number of **normalized** singular values that are greater than a specified threshold. Parameters ---------- singular_values : (n,) ndarray Singular values of a snapshot matrix, e.g., ``scipy.linalg.svdvals(states)``. threshold : float or list[float] Cutoff value(s) for the singular values. plot : bool If ``True``, plot the singular values and the cutoff value(s) against the singular value index. right : int or None Maximum singular value index to plot (``plt.xlim(right=right)``). ax : matplotlib.Axes or None Axes to plot the results on if ``plot=True``. If not given, a new single-axes figure is created. kwargs : dict Options to pass to :func:`matplotlib.pyplot.semilogy`. Returns ------- ranks : int or list[int] Number of singular values greater than the cutoff value(s). """ singular_values = np.sort(singular_values)[::-1] singular_values /= singular_values[0] # Calculate the number of singular values above the cutoff value(s). if threshold: if one_threshold := np.isscalar(threshold): threshold = [threshold] ranks = [np.count_nonzero(singular_values > ep) for ep in threshold] if plot: # Plot singular values. if ax is None: ax = plt.figure().add_subplot(111) options = dict( marker="*", color="k", markersize=8, markeredgewidth=0, zorder=3, ) options.update(kwargs) marker = options.pop("marker") j = np.arange(1, singular_values.size + 1) ax.semilogy(j, singular_values, marker, **options) ax.set_xlim((0, j.size + 1)) if right: right = min(right, j.size) ax.set_xlim(right=right) _idx = min(int(right), singular_values.size - 1) ax.set_ylim(bottom=singular_values[_idx] / 5) # Draw cutoff value(s). if threshold: rborder = ax.get_xlim()[1] ylim = ax.get_ylim() for sigma, r in zip(threshold, ranks): ax.axhline(sigma, color="gray", linewidth=0.5) ax.text( rborder - 0.5, 1.05 * sigma, f"{sigma:.2e}", color="gray", horizontalalignment="right", verticalalignment="bottom", ) ax.axvline(r, color="gray", linewidth=0.5) ax.text( r + 0.5, 1.05 * ylim[0], f"r = {r}", color="gray", horizontalalignment="left", verticalalignment="bottom", ) ax.set_ylim(ylim) ax.set_xlabel("Singular value index") ax.set_ylabel("Normalized singular values") if threshold: return ranks[0] if one_threshold else ranks
[docs] def cumulative_energy( singular_values, threshold: float = 0.9999, plot: bool = True, right: int = None, ax: plt.Axes = None, **kwargs, ): r"""Compute the number of singular values needed to surpass a given cumulative energy threshold. The cumulative energy of :math:`r` singular values is defined by .. math:: \kappa_r = \sum_{i=1}^r\sigma_i^2 \bigg/ \sum_{j=1}^k\sigma_j^2. This function determines the smalled :math:`r` such that :math:`\kappa_r \ge` ``threshold``. Parameters ---------- singular_values : (n,) ndarray Singular values of a snapshot matrix, e.g., ``scipy.linalg.svdvals(states)``. threshold : float or list[float] Energy capture threshold(s). Default is 99.99%. plot : bool If ``True``, plot the cumulative energy and the capture threshold(s) against the singular value index (linear scale). right : int or None Maximum singular value index to plot (``plt.xlim(right=right)``). ax : matplotlib.Axes or None Axes to plot the results on if ``plot=True``. If not given, a new single-axes figure is created. kwargs : dict Options to pass to :func:`matplotlib.pyplot.plot`. Returns ------- ranks : int or list[int] Number of singular values required to capture the specified energy. """ # Calculate the cumulative energy. svdvals2 = np.sort(singular_values)[::-1] ** 2 energy = np.cumsum(svdvals2) / np.sum(svdvals2) energy = np.concatenate(([0], energy)) # Determine the points at which the cumulative energy passes the threshold. if threshold: if one_threshold := np.isscalar(threshold): threshold = [threshold] ranks = [int(np.searchsorted(energy, xi)) for xi in threshold] if plot: # Plot cumulative energy and threshold value(s). if ax is None: ax = plt.figure().add_subplot(111) options = dict( linestyle="-", marker="o", color="C2", markersize=4, markeredgewidth=0, linewidth=0.5, zorder=3, ) options.update(kwargs) j = np.arange(singular_values.size + 1) ax.plot(j, energy, **options) ax.set_xlim(0, j.size) ylim = (ax.get_ylim()[0], 1.05) if right: right = min(right, j.size) ax.set_xlim(right=right) if threshold: rborder = ax.get_xlim()[1] for kappa, r in zip(threshold, ranks): ax.axhline(kappa, color="gray", linewidth=0.5) ax.text( rborder - 0.5, kappa + 0.01, f"{kappa:%}", color="gray", horizontalalignment="right", verticalalignment="bottom", ) ax.axvline(r, color="gray", linewidth=0.5) ax.text( r + 0.5, ylim[0] + 0.01, f"r = {r}", color="gray", horizontalalignment="left", verticalalignment="bottom", ) ax.set_ylim(ylim) ax.set_xlabel(r"Singular value index") ax.set_ylabel(r"Cumulative energy") if threshold: return ranks[0] if one_threshold else ranks
[docs] def residual_energy( singular_values, threshold: float = 1e-6, plot: bool = True, right: int = None, ax: plt.Axes = None, sqrt: bool = False, **kwargs, ): r"""Compute the number of singular values needed such that the residual energy drops beneath a given threshold. The residual energy of :math:`r` singular values is defined by .. math:: \epsilon_r = \sum_{i=r+1}^k\sigma_i^2 \bigg/ \sum_{j=1}^k\sigma_j^2 = 1 - \left( \sum_{i=1}^r\sigma_i^2 \bigg/ \sum_{j=1}^k\sigma_j^2 \right) This function determines the smallest :math:`r` such that :math:`\epsilon_r \le` ``threshold``. Parameters ---------- singular_values : (n,) ndarray Singular values of a snapshot matrix, e.g., ``scipy.linalg.svdvals(states)``. threshold : float or list[float] Energy residual threshold(s). Default is 10^-6. plot : bool If ``True``, plot the residual energy and the threshold(s) against the singular value index. right : int or None Maximum singular value index to plot (``plt.xlim(right=right)``). ax : matplotlib.Axes or None Axes to plot the results on if ``plot=True``. If not given, a new single-axes figure is created. sqrt : bool If ``True``, square root the residual energies to get the projection error of the training snapshots. kwargs : dict Options to pass to :func:`matplotlib.pyplot.semilogy`. Returns ------- ranks : int or list[int] Number of singular values required to shrink the residual energy below the specified threshold. """ # Calculate the residual energy. svdvals2 = np.sort(singular_values)[::-1] ** 2 res_energy = 1 - (np.cumsum(svdvals2) / np.sum(svdvals2)) res_energy = np.concatenate(([1], res_energy)) if sqrt: res_energy = np.sqrt(res_energy) # Determine the points when the residual energy dips under the threshold. if threshold: if one_threshold := np.isscalar(threshold): threshold = [threshold] ranks = [np.count_nonzero(res_energy > eps) for eps in threshold] if plot: # Plot residual energy and threshold value(s). if ax is None: ax = plt.figure().add_subplot(111) options = dict( linestyle="-", marker="s", color="C1", markersize=4, markeredgewidth=0, linewidth=0.5, zorder=3, ) options.update(kwargs) j = np.arange(singular_values.size + 1) ax.semilogy(j, res_energy, **options) ax.set_xlim(0, j.size) bottom = res_energy[-2] if right: right = min(right, j.size) ax.set_xlim(right=right) bottom = res_energy[min(int(right) + 1, res_energy.size - 3)] bottom = max(bottom, 1e-17) ylim = (bottom, 1.1) if threshold: rborder = ax.get_xlim()[1] for epsilon, r in zip(threshold, ranks): ax.axhline(epsilon, color="gray", linewidth=0.5) ax.text( rborder - 0.5, 1.1 * epsilon, f"{epsilon:.2e}", color="gray", horizontalalignment="right", verticalalignment="bottom", ) ax.axvline(r, color="gray", linewidth=0.5) ax.text( r + 0.5, 1.1 * ylim[0], f"r = {r}", color="gray", horizontalalignment="left", verticalalignment="bottom", ) ax.set_ylim(ylim) ax.set_xlabel("Singular value index") if sqrt: ax.set_ylabel("Relative projection error") else: ax.set_ylabel("Residual energy") if threshold: return ranks[0] if one_threshold else ranks