# operators/_base.py
"""Abstract base classes for operators."""
__all__ = [
"InputMixin",
"OperatorTemplate",
"OpInfOperator",
"ParametricOperatorTemplate",
"ParametricOpInfOperator",
]
import os
import abc
import copy
import types
import numpy as np
import scipy.linalg as la
import scipy.sparse as sparse
import matplotlib.pyplot as plt
from .. import errors, utils
def has_inputs(obj) -> bool:
r"""Return ``True`` if ``obj`` is an operator object whose ``apply()``
method acts on the ``input_`` argument, i.e.,
:math:`\Ophat_{\ell}(\qhat,\u)` depends on :math:`\u`.
"""
return isinstance(obj, InputMixin)
# Nonparametric operators =====================================================
[docs]
class OperatorTemplate(abc.ABC):
r"""Template for general operators :math:`\Ophat_{\ell}(\qhat,\u).`
In this package, an "operator" is a function
:math:`\Ophat_{\ell}: \RR^r \times \RR^m \to \RR^r` that acts on a state
vector :math:`\qhat\in\RR^r` and (optionally) an input vector
:math:`\u\in\RR^m`.
Models are defined as the sum of several operators,
for example, an :class:`opinf.models.ContinuousModel` object represents a
system of ordinary differential equations:
.. math::
\ddt\qhat(t)
= \sum_{\ell=1}^{n_\textrm{terms}}\Ophat_{\ell}(\qhat(t),\u(t)).
Notes
-----
This class can be used for custom nonparametric model terms that are not
learnable with Operator Inference.
For parametric model terms, see :class:`ParametricOperatorTemplate`.
For model terms that can be learned with Operator Inference, see
:class:`OpInfOperator` or :class:`ParametricOpInfOperator`.
"""
# Properties --------------------------------------------------------------
@property
@abc.abstractmethod
def state_dimension(self) -> int:
r"""Dimension :math:`r` of the state :math:`\qhat` that the operator
acts on.
"""
raise NotImplementedError # pragma: no cover
def __str__(self) -> str:
"""String representation: class name + dimensions."""
out = [self.__class__.__name__]
out.append(f"state_dimension: {self.state_dimension}")
if has_inputs(self):
out.append(f"input_dimension: {self.input_dimension}")
return "\n ".join(out)
@staticmethod
def _str(statestr, inputstr=None):
"""String representation of the operator, used when printing out the
structure of a model.
Parameters
----------
statestr : str
String representation of the state, e.g., ``"q(t)"`` or ``"q_j"``.
inputstr : str
String representation of the input, e.g., ``"u(t)"`` or ``"u_j"``.
Returns
-------
opstr : str
String representation of the operator acting on the state/input,
e.g., ``"Aq(t)"`` or ``"Bu(t)"`` or ``"H[q(t) ⊗ q(t)]"``.
"""
return f"f({statestr}, {inputstr})"
# Evaluation --------------------------------------------------------------
[docs]
@abc.abstractmethod
def apply(self, state: np.ndarray, input_=None) -> np.ndarray:
"""Apply the operator mapping to the given state / input.
Parameters
----------
state : (r,) or (r, k) ndarray
State vector or matrix of state vectors.
input_ : (m,) or (m, k) ndarray or None
Input vector or matrix of input vectors.
Returns
-------
out : (r,) or (r, k) ndarray
Application of the operator to the state / input, with the same
number of dimensions as ``state`` and (if provided) ``input_``.
"""
raise NotImplementedError # pragma: no cover
[docs]
def jacobian(self, state: np.ndarray, input_=None) -> np.ndarray:
r"""Construct the state Jacobian of the operator.
If :math:`[\![\q]\!]_{i}` denotes the :math:`i`-th entry of a vector
:math:`\q`, then the :math:`(i,j)`-th entry of the state Jacobian is
given by
.. math::
[\![\ddqhat\Ophat(\qhat,\u)]\!]_{i,j}
= \frac{\partial}{\partial[\![\qhat]\!]_j}
[\![\Ophat(\qhat,\u)]\!]_i.
Parameters
----------
state : (r,) ndarray
State vector.
input_ : (m,) ndarray or float or None
Input vector.
Returns
-------
jac : (r, r) ndarray
State Jacobian.
"""
raise NotImplementedError # pragma: no cover
# Dimensionality reduction ------------------------------------------------
[docs]
def galerkin(self, Vr: np.ndarray, Wr=None):
r"""Get the (Petrov-)Galerkin projection of this operator.
Consider an operator :math:`\Op(\q,\u)`, where :math:`\q\in\RR^n`
is the state and :math:`\u\in\RR^m` is the input.
Given a *trial basis* :math:`\Vr\in\RR^{n\times r}` and a *test basis*
:math:`\Wr\in\RR^{n\times r}`, the Petrov-Galerkin projection of
:math:`\Op` is the operator :math:`\Ophat:\RR^r\times\RR^m\to\RR^r`
defined by
.. math::
\Ophat(\qhat, \u) = (\Wr\trp\Vr)^{-1}\Wr\trp\Op(\Vr\qhat, \u)
where :math:`\qhat\in\RR^n` approximates the original state via
:math:`\q \approx \Vr\qhat`.
Parameters
----------
Vr : (n, r) ndarray
Basis for the trial space :math:`\Vr`.
Wr : (n, r) ndarray or None
Basis for the test space :math:`\Wr`.
If ``None`` (default), use ``Vr`` as the test basis.
Returns
-------
op : :class:`OperatorTemplate`
New operator object whose ``state_dimension``
attribute equals ``r``. If this operator acts on inputs, the
``input_dimension`` attribute of the new operator should be
``self.input_dimension``.
"""
raise NotImplementedError # pragma: no cover
# Model persistence -------------------------------------------------------
[docs]
def copy(self):
"""Return a copy of the operator using :func:`copy.deepcopy()`."""
return copy.deepcopy(self)
[docs]
def save(self, savefile: str, overwrite: bool = False) -> None:
"""Save the operator to an HDF5 file.
Parameters
----------
savefile : str
Path of the file to save the basis in.
overwrite : bool
If ``True``, overwrite the file if it already exists. If ``False``
(default), raise a ``FileExistsError`` if the file already exists.
"""
raise NotImplementedError # pragma: no cover
[docs]
@classmethod
def load(cls, loadfile: str):
"""Load an operator from an HDF5 file.
Parameters
----------
loadfile : str
Path to the file where the operator was stored via :meth:`save()`.
"""
raise NotImplementedError # pragma: no cover
# Verification ------------------------------------------------------------
[docs]
def verify(
self,
plot: bool = False,
*,
k: int = 10,
ntests: int = 4,
) -> None:
"""Verify consistency between dimension properties and required
methods.
This method verifies :meth:`apply()` and, if implemented,
:meth:`jacobian()`, :meth:`galerkin()`, :meth:`copy()`,
:meth:`save()`, and :meth:`load()`.
Parameters
----------
plot : bool
If ``True``, plot the relative errors of the finite difference
check for :meth:`jacobian()` as a function of the perturbation
size.
If ``False`` (default), print a report of the relative errors.
Nothing is plotted or printed if :meth:`jacobian()` is not
implemented.
Notes
-----
This method does **not** verify the correctness of :meth:`apply()`,
only that it returns an output with the expected shape. However,
if :meth:`jacobian()` is implemented, a finite difference check is
applied to check that :meth:`apply()` and :meth:`jacobian()` are
consistent.
"""
# Verify dimensions exist and are valid.
if not isinstance((r := self.state_dimension), int) or r <= 0:
raise errors.VerificationError(
"state_dimension must be a positive integer "
f"(current value: {repr(r)}, of type '{type(r).__name__}')"
)
if hasinputs := has_inputs(self):
if not isinstance((m := self.input_dimension), int) or m <= 0:
raise errors.VerificationError(
"input_dimension must be a positive integer "
f"(current value: {repr(m)}, of type '{type(r).__name__}')"
)
else:
m = 0
# Verify apply() - - - - - - - - - - - - - - - - - - - - - - - - - - -
Q = np.random.random((r, k))
q = Q[:, 0]
U, u = None, None
if hasinputs:
U = np.random.random((m, k))
u = U[:, 0]
out = self.apply(q, u)
if not isinstance(out, np.ndarray) or out.shape != (r,):
_message = [
"apply(q, u) must return array of shape (state_dimension,)",
"when q.shape = (state_dimension,)",
"and u = None",
]
if hasinputs:
_message[-1] = "and u.shape = (input_dimension,)"
raise errors.VerificationError(" ".join(_message))
out = self.apply(Q, U)
if not isinstance(out, np.ndarray) or out.shape != (r, k):
_message = [
"apply(Q, U) must return array of shape (state_dimension, k)",
"when Q.shape = (state_dimension, k)",
"and U = None",
]
if hasinputs:
_message[-1] = "and U.shape = (input_dimension, k)"
raise errors.VerificationError(" ".join(_message))
# Report successes.
def _report_isconsistent(method):
_message = f"{method} is consistent with state_dimension"
if hasinputs:
_message += " and input_dimension"
print(_message)
_report_isconsistent("apply()")
# Verify jacobian() - - - - - - - - - - - - - - - - - - - - - - - - - -
def _gradient(f, x, h=1e-8):
"""Estimate the Jacobian of f:R^n -> R^m at x using perturbations
of magnitude h.
"""
E = np.eye((n := x.size))
return np.array([(f(x + h * E[i]) - f(x)) / h for i in range(n)]).T
def _finite_difference_check(f, df, x, hs=None):
"""Compare analytical and numerical derivatives."""
dfx = df(x)
if hs is None:
hs = np.logspace(-10, -1, 10)[::-1]
return hs, np.array(
[la.norm(_gradient(f, x, h) - dfx) / la.norm(dfx) for h in hs]
)
try:
out = self.jacobian(q, u)
except NotImplementedError:
print("jacobian() not implemented")
else:
if np.isscalar(out) and out == 0:
print("jacobian() = 0")
elif not isinstance(out, np.ndarray) or out.shape != (r, r):
_message = [
"jacobian(q, u) must return array",
"of shape (state_dimension, state_dimension)",
"when q.shape = (state_dimension,)",
"and u = None",
]
if hasinputs:
_message[-1] = "and u.shape = (input_dimension,)"
raise errors.VerificationError(" ".join(_message))
else:
_report_isconsistent("jacobian()")
# Finite difference check.
hs, diffs = _finite_difference_check(
lambda x: self.apply(x, u),
lambda x: self.jacobian(x, u),
np.random.standard_normal(r),
)
if plot:
plt.loglog(hs, diffs, ".-", markersize=5, linewidth=0.5)
else:
print(
"jacobian() finite difference relative errors",
" ------------------------------------------",
sep="\n",
)
for h, err in zip(hs, diffs):
print(f" h = {h:.2e}\terror = {err:.4e}")
# Verify galerkin() - - - - - - - - - - - - - - - - - - - - - - - - - -
def _orth(n, k):
"""Get an n x k matrix with orthonormal columns."""
return la.qr(np.random.standard_normal((n, k)), mode="economic")[0]
if r > 1:
rnew = r // 2
Vr = _orth(r, rnew)
Wr = _orth(r, rnew)
try:
out = self.galerkin(Vr, Wr)
except NotImplementedError:
print("galerkin() not implemented")
else:
if not isinstance(out, OperatorTemplate):
raise errors.VerificationError(
"galerkin() must return object "
"whose class inherits from OperatorTemplate"
)
if out.state_dimension != rnew:
raise errors.VerificationError(
"galerkin(Vr, Wr).state_dimension != Vr.shape[1]"
)
if hasinputs and out.input_dimension != m:
raise errors.VerificationError(
"self.galerkin(Vr, Wr).input_dimension "
"!= self.input_dimension"
)
WrTVr_LU = la.lu_factor(Wr.T @ Vr)
for _ in range(ntests):
qr = np.random.random(rnew)
full = la.lu_solve(WrTVr_LU, Wr.T @ self.apply(Vr @ qr, u))
reduced = out.apply(qr, u)
if not np.allclose(reduced, full):
raise errors.VerificationError(
"op2.apply(qr, u) != "
"inv(Wr.T @ Vr) @ Wr.T @ self.apply(Vr @ qr, u) "
"where op2 = self.galerkin(Vr, Wr)"
)
out = self.galerkin(Vr)
for _ in range(ntests):
qr = np.random.random(rnew)
full = Vr.T @ self.apply(Vr @ qr, u)
reduced = out.apply(qr, u)
if not np.allclose(reduced, full):
raise errors.VerificationError(
"op2.apply(qr, u) != "
"Vr.T @ self.apply(Vr @ qr, u) "
"where op2 = self.galerkin(Vr) and Vr.T @ Vr = I"
)
print("galerkin() is consistent with apply()")
else:
print("cannot test galerkin() when state_dimension = 1")
# Verify copy() - - - - - - - - - - - - - - - - - - - - - - - - - - - -
out = self.copy()
if out is self:
raise errors.VerificationError("self.copy() is self")
if out.__class__ is not self.__class__:
raise errors.VerificationError(
"type(self.copy()) is not type(self)"
)
if out.state_dimension != r:
raise errors.VerificationError(
"self.copy().state_dimension != self.state_dimension"
)
if hasinputs and out.input_dimension != m:
raise errors.VerificationError(
"self.copy().input_dimension != self.input_dimension"
)
_report_isconsistent("copy()")
for _ in range(ntests):
q = np.random.random(r)
if not np.allclose(out.apply(q, u), self.apply(q, u)):
raise errors.VerificationError(
"self.copy().apply() not consistent with self.apply()"
)
print("copy() preserves the results of apply()")
# Verify save()/load() - - - - - - - - - - - - - - - - - - - - - - - -
tempfile = "_operatorverification.h5"
try:
self.save(tempfile)
out = self.load(tempfile)
except NotImplementedError:
print("save() and/or load() not implemented")
else:
if out.__class__ is not self.__class__:
raise errors.VerificationError(
"save()/load() does not preserve object type"
)
if out.state_dimension != r:
raise errors.VerificationError(
"save()/load() does not preserve state_dimension"
)
if hasinputs and out.input_dimension != m:
raise errors.VerificationError(
"save()/load() does not preserve input_dimension"
)
_report_isconsistent("save()/load()")
for _ in range(ntests):
q = np.random.random(r)
if not np.allclose(out.apply(q, u), self.apply(q, u)):
raise errors.VerificationError(
"save()/load() does not preserve the result of apply()"
)
print("save()/load() preserves the results of apply()")
if os.path.isfile(tempfile): # pragma: no cover
os.remove(tempfile)
def is_nonparametric(obj) -> bool:
"""Return ``True`` if ``obj`` is a nonparametric operator object."""
return isinstance(obj, OperatorTemplate)
[docs]
class OpInfOperator(OperatorTemplate):
r"""Template for nonparametric operators that can be calibrated through
Operator Inference, i.e.,
:math:`\Ophat_{\ell}(\qhat, \u) = \Ohat_{\ell}\d(\qhat, \u)`.
In this package, an "operator" is a function
:math:`\Ophat_{\ell}: \RR^r \times \RR^m \to \RR^r` that acts on a state
vector :math:`\qhat\in\RR^r` and (optionally) an input vector
:math:`\u\in\RR^m`.
Models are defined as the sum of several operators,
for example, an :class:`opinf.models.ContinuousModel` object represents a
system of ordinary differential equations:
.. math::
\ddt\qhat(t)
= \sum_{\ell=1}^{n_\textrm{terms}}\Ophat_{\ell}(\qhat(t),\u(t)).
Operator Inference calibrates operators that can be written as the product
of a matrix and some known (possibly nonlinear) function of the state
and/or input:
.. math::
\Ophat_{\ell}(\qhat, \u)
= \Ohat_{\ell}\d(\qhat, \u),
where :math:`\Ohat_{\ell}\in\RR^{r\times d}` is a constant matrix, called
the *operator entries*, and :math:`\d_{\ell}:\RR^r\times\RR^m\to\RR^d`.
Notes
-----
* To define operators with a more general structure than
:math:`\Ohat_{\ell}\d(\qhat, \u),` see :class:`OperatorTemplate`.
* If the operator entries :math:`\Ohat_{\ell}` depend on one or more
external parameters, it is called a *parametric operator*.
See :class:`ParametricOpInfOperator`.
"""
# Initialization ----------------------------------------------------------
def __init__(self, entries=None):
"""Initialize an empty operator."""
self._clear()
if entries is not None:
self.set_entries(entries)
def _clear(self):
"""Delete operator ``entries`` and related attributes."""
self.__entries = None
@staticmethod
def _validate_entries(entries):
"""Ensure argument is a NumPy array and screen for NaN, Inf entries."""
if sparse.issparse(entries):
return
if not isinstance(entries, np.ndarray):
raise TypeError(
"operator entries must be NumPy or scipy.sparse array"
)
if np.any(np.isnan(entries)):
raise ValueError("operator entries must not be NaN")
elif np.any(np.isinf(entries)):
raise ValueError("operator entries must not be Inf")
[docs]
def set_entries(self, entries) -> None:
"""Set the :attr:`entries` attribute."""
self.__entries = entries
# Properties --------------------------------------------------------------
@property
def entries(self) -> np.ndarray:
r"""Discrete representation of the operator,
the matrix :math:`\Ohat`.
"""
return self.__entries
@entries.setter
def entries(self, entries):
"""Set the ``entries`` attribute."""
self.set_entries(entries)
@entries.deleter
def entries(self):
"""Reset the ``entries`` attribute."""
self._clear()
@property
def shape(self) -> tuple:
"""Shape of the operator matrix."""
return None if self.entries is None else self.entries.shape
@property
def state_dimension(self) -> int:
r"""Dimension :math:`r` of the state :math:`\qhat` that the operator
acts on.
"""
return None if self.entries is None else self.entries.shape[0]
# Magic methods -----------------------------------------------------------
def __getitem__(self, key):
"""Slice into the entries of the operator."""
return None if self.entries is None else self.entries[key]
def __eq__(self, other):
"""Two operator objects are equal if they are of the same class
and have the same ``entries`` array.
"""
if not isinstance(other, self.__class__):
return False
if (self.entries is None and other.entries is not None) or (
self.entries is not None and other.entries is None
):
return False
if self.entries is not None:
if self.shape != other.shape:
return False
return np.all(self.entries == other.entries)
return True
def __add__(self, other):
"""Nonparametric operators are linear in their entries."""
if (ocls := other.__class__) is not (scls := self.__class__):
raise TypeError(
f"can't add object of type '{ocls.__name__}' "
f"to object of type '{scls.__name__}'"
)
return scls(self.entries + other.entries)
def __str__(self):
out = OperatorTemplate.__str__(self)
return out + f"\n entries.shape: {self.shape}"
# Evaluation --------------------------------------------------------------
[docs]
@utils.requires("entries")
def jacobian(self, state, input_=None) -> np.ndarray: # pragma: no cover
r"""Construct the state Jacobian of the operator.
If :math:`[\![\q]\!]_{i}` denotes the :math:`i`-th entry of a vector
:math:`\q`, then the :math:`(i,j)`-th entry of the state Jacobian is
given by
.. math::
[\![\ddqhat\Ophat(\qhat,\u)]\!]_{i,j}
= \frac{\partial}{\partial[\![\qhat]\!]_j}
[\![\Ophat(\qhat,\u)]\!]_i.
If a child class does not implement this method, it is assumed that
the Jacobian is zero (i.e., the operator does not act on the state).
Parameters
----------
state : (r,) ndarray
State vector.
input_ : (m,) ndarray or float or None
Input vector.
Returns
-------
jac : (r, r) ndarray
State Jacobian.
"""
return 0
# Dimensionality reduction ------------------------------------------------
def _galerkin(self, Vr, Wr, func):
r"""Get the (Petrov-)Galerkin projection of this operator.
Subclasses may implement this function as follows:
.. code-block:: python
@utils.requires("entries")
def galerkin(self, Vr, Wr=None):
'''Docstring'''
return self._galerkin(Vr, Wr, lambda A, V: <TODO>)
Parameters
----------
Vr : (n, r) ndarray
Basis for the trial space.
Wr : (n, r) ndarray or None
Basis for the test space. If ``None``, defaults to ``Vr``.
func : callable
Function that accepts the operator ``entries`` and the trial
basis ``Vr`` and computes the right side of the projection,
i.e, the result of substituting :math:`\q` with :math:`\Vr\qhat`.
For example, for linear operator :math:`\Op_\ell(\q) = \A\q`,
``func`` should return :math:`\A\Vr`.
Returns
-------
op : operator
New object of the same class as ``self``.
"""
if Wr is None:
Wr = Vr
n, r = Wr.shape
if self.entries.shape[0] != n:
raise errors.DimensionalityError("basis and operator not aligned")
if Vr.shape[1] != r:
raise errors.DimensionalityError(
"trial and test bases not aligned"
)
entries = Wr.T @ func(self.entries, Vr)
if not np.allclose((WrTVr := Wr.T @ Vr), np.eye(r)):
entries = la.solve(WrTVr, entries)
return self.__class__(entries)
# Operator inference ------------------------------------------------------
[docs]
@staticmethod
@abc.abstractmethod
def operator_dimension(r: int, m: int = None) -> int:
r"""Column dimension of the operator entries.
Child classes should decorate this method with ``@staticmethod``.
Parameters
----------
r : int
State dimension.
m : int or None
Input dimension.
Returns
-------
d : int
Number of columns in the operator entries matrix.
This is also the number of rows in the data matrix block.
"""
raise NotImplementedError # pragma: no cover
[docs]
@staticmethod
@abc.abstractmethod
def datablock(states: np.ndarray, inputs=None) -> np.ndarray:
r"""Construct the data matrix block corresponding to the operator.
For a nonparametric operator
:math:`\Ophat_{\ell}(\qhat,\u) = \Ohat_{\ell}\d_{\ell}(\qhat, \u)`,
the data matrix block corresponding to the data pairs
:math:`\{(\qhat_j,\u_j)\}_{j=0}^{k-1}` is the matrix
.. math::
\D\trp = \left[\begin{array}{c|c|c|c}
& & & \\
\d_{\ell}(\qhat_0,\u_0) & \d_{\ell}(\qhat_1,\u_1)
& \cdots &
\\d_{\ell}(\qhat_{k-1},\u_{k-1})
\\ & & &
\end{array}\right]
\in \RR^{d \times k}.
Here, ``states`` is the snapshot matrix
:math:`[~\qhat_0~~\cdots~~\qhat_{k-1}~]`
and ``inputs`` is the (optional) input matrix
:math:`[~\u_0~~\cdots~~\u_{k-1}~]`.
Child classes should decorate this method with ``@staticmethod``.
Parameters
----------
states : (r, k) or (k,) ndarray
State vectors. Each column is a single state vector.
If one dimensional, it is assumed that :math:`r = 1`.
inputs : (m, k) or (k,) ndarray or None
Input vectors. Each column is a single input vector.
If one dimensional, it is assumed that :math:`m = 1`.
Returns
-------
block : (d, k) or (d,) ndarray
Data matrix block. Here, :math:`d` is ``entries.shape[1]``.
"""
raise NotImplementedError # pragma: no cover
# Model persistence -------------------------------------------------------
[docs]
def copy(self):
"""Return a copy of the operator."""
entries = self.entries.copy() if self.entries is not None else None
return self.__class__(entries)
[docs]
def save(self, savefile: str, overwrite: bool = False) -> None:
"""Save the operator to an HDF5 file.
Parameters
----------
savefile : str
Path of the file to save the basis in.
overwrite : bool
If ``True``, overwrite the file if it already exists. If ``False``
(default), raise a ``FileExistsError`` if the file already exists.
"""
with utils.hdf5_savehandle(savefile, overwrite) as hf:
meta = hf.create_dataset("meta", shape=(0,))
meta.attrs["class"] = self.__class__.__name__
if self.entries is not None:
hf.create_dataset("entries", data=self.entries)
[docs]
@classmethod
def load(cls, loadfile: str):
"""Load an operator from an HDF5 file.
Parameters
----------
loadfile : str
Path to the file where the operator was stored via :meth:`save()`.
"""
with utils.hdf5_loadhandle(loadfile) as hf:
if (ClassName := hf["meta"].attrs["class"]) != cls.__name__:
raise TypeError(
f"file '{loadfile}' contains '{ClassName}' "
f"object, use '{ClassName}.load()"
)
return cls(hf["entries"][:] if "entries" in hf else None)
# Verification ------------------------------------------------------------
[docs]
def verify(
self,
plot: bool = False,
*,
k: int = 10,
ntests: int = 4,
r: int = 6,
m: int = 3,
) -> None:
"""Verify consistency between dimension properties and required
methods.
This method verifies :meth:`apply()` and, if implemented,
:meth:`jacobian()`, :meth:`galerkin()`, :meth:`copy()`,
:meth:`save()`, and :meth:`load()`.
If the :attr:`entries` are not set, no checks are made.
Parameters
----------
plot : bool
If ``True``, plot the relative errors of the finite difference
check for :meth:`jacobian()` as a function of the perturbation
size.
If ``False`` (default), print a report of the relative errors.
Nothing is plotted or printed if :meth:`jacobian()` is not
implemented.
Notes
-----
This method does **not** verify the correctness of :meth:`apply()`,
only that it returns an output with the expected shape. However,
if :meth:`jacobian()` is implemented, a finite difference check is
applied to check that :meth:`apply()` and :meth:`jacobian()` are
consistent.
"""
# Verify operator_dimension() - - - - - - - - - - - - - - - - - - - - -
if not isinstance(self.operator_dimension, types.FunctionType):
raise errors.VerificationError(
"operator_dimension() must have @staticmethod decorator"
)
d = self.operator_dimension(r, m)
if not isinstance(d, int) or d <= 0:
raise errors.VerificationError(
"operator_dimension() must return a positive integer"
)
# Verify datablock() - - - - - - - - - - - - - - - - - - - - - - - - -
if not isinstance(self.datablock, types.FunctionType):
raise errors.VerificationError(
"datablock() must have @staticmethod decorator"
)
Dt = self.datablock(np.random.random((r, k)), np.random.random((m, k)))
if not isinstance(Dt, np.ndarray) or Dt.ndim != 2:
raise errors.VerificationError(
"datablock() must return a two-dimensional array"
)
if Dt.shape != (d, k):
raise errors.VerificationError(
"datablock().shape[0] != operator_dimension()"
)
print("operator_dimension() is consistent with datablock()")
# Verify instance methods - - - - - - - - - - - - - - - - - - - - - - -
if self.entries is None:
return print("cannot verify apply() when entries=None")
OperatorTemplate.verify(self, plot=plot, k=k, ntests=ntests)
# Parametric operators ========================================================
[docs]
class ParametricOperatorTemplate(abc.ABC):
r"""Template for operators that depend on external parameters,
:math:`\Ophat_{\ell}(\qhat,\u;\bfmu).`
In this package, a parametric "operator" is a function
:math:`\Ophat_{\ell}: \RR^r \times \RR^m \times \RR^p \to \RR^r` that acts
on a state vector :math:`\qhat\in\RR^r`, an (optional) input vector
:math:`\u\in\RR^m`, and a parameter vector :math:`\bfmu\in\RR^p`.
Parametric models are defined as the sum of several operators, at least
one of which is parametric.
For example, a system of ODEs:
.. math::
\ddt\qhat(t;\bfmu)
= \sum_{\ell=1}^{n_\textrm{terms}}\Ophat_{\ell}(\qhat(t),\u(t);\bfmu).
Notes
-----
This class can be used for custom nonparametric model terms that are not
learnable with Operator Inference.
For nonparametric model terms, see :class:`OperatorTemplate`.
For model terms that can be learned with Operator Inference, see
:class:`OpInfOperator` or :class:`ParametricOpInfOperator`.
"""
# Nonparametric operator class that this parametric operator evaluates to.
_OperatorClass = NotImplemented
# Properties --------------------------------------------------------------
@property
@abc.abstractmethod
def state_dimension(self) -> int:
r"""Dimension :math:`r` of the state :math:`\qhat` that the operator
acts on.
"""
raise NotImplementedError # pragma: no cover
@property
@abc.abstractmethod
def parameter_dimension(self) -> int:
r"""Dimension :math:`p` of the parameter vector :math:`\bfmu` that the
operator matrix depends on.
"""
raise NotImplementedError # pragma: no cover
def __str__(self) -> str:
"""String representation: class name, dimensions, evaluation type."""
out = [self.__class__.__name__]
out.append(f"state_dimension: {self.state_dimension}")
if has_inputs(self):
out.append(f"input_dimension: {self.input_dimension}")
out.append(f"parameter_dimension: {self.parameter_dimension}")
out.append(f"evaluate(parameter) -> {self._OperatorClass.__name__}")
return "\n ".join(out)
def __repr__(self) -> str:
return utils.str2repr(self)
# Evaluation --------------------------------------------------------------
def _check_parametervalue_dimension(self, parameter):
"""Ensure a new parameter value has the expected shape."""
if (pdim := self.parameter_dimension) is None:
raise RuntimeError("parameter_dimension not set")
if np.atleast_1d(parameter).shape[0] != pdim:
raise ValueError(f"expected parameter of shape ({pdim:d},)")
[docs]
@abc.abstractmethod
def evaluate(self, parameter):
r"""Evaluate the operator at the given parameter value,
resulting in a nonparametric operator.
Parameters
----------
parameter : (p,) ndarray or float
Parameter value :math:`\bfmu` at which to evalute the operator.
Returns
-------
op : nonparametric :mod:`opinf.operators` operator
Nonparametric operator corresponding to the parameter value.
This should be an instance of :class:`OperatorTemplate` (or
a class that inherits from it).
"""
raise NotImplementedError # pragma: no cover
[docs]
def apply(self, parameter, state, input_=None):
r"""Apply the operator to the given state and input
at the specified parameter value, :math:`\Ophat_\ell(\qhat,\u;\bfmu)`.
Parameters
----------
parameter : (p,) ndarray or float
Parameter value.
state : (r,) ndarray
State vector.
input_ : (m,) ndarray or float or None
Input vector.
Returns
-------
(r,) ndarray
Notes
-----
For repeated calls with the same parameter value, use
:meth:`evaluate()` to first get the nonparametric operator
corresponding to the parameter value.
.. code-block::
# Instead of this...
>>> values = [parametric_operator.apply(parameter, q, u)
... for q, u in zip(list_of_states, list_of_inputs)]
# ...it is faster to do this.
>>> operator_at_parameter = parametric_operator.evaluate(parameter)
>>> values = [operator_at_parameter.apply(q, u)
... for q, u in zip(list_of_states, list_of_inputs)]
"""
return self.evaluate(parameter).apply(state, input_)
[docs]
def jacobian(self, parameter, state, input_=None):
r"""Construct the state Jacobian of the operator,
:math:`\ddqhat\Ophat_\ell(\qhat,\u;\bfmu)`.
If :math:`[\![\q]\!]_{i}` denotes the entry :math:`i` of a vector
:math:`\q`, then the entries of the state Jacobian are given by
.. math::
[\![\ddqhat\Ophat_\ell(\qhat,\u;\bfmu)]\!]_{i,j}
= \frac{\partial}{\partial[\![\qhat]\!]_j}
[\![\Ophat_\ell(\qhat,\u;\bfmu)]\!]_i.
Parameters
----------
parameter : (p,) ndarray or float
Parameter value.
state : (r,) ndarray
State vector.
input_ : (m,) ndarray or float or None
Input vector.
Returns
-------
jac : (r, r) ndarray
State Jacobian.
Notes
-----
For repeated calls with the same parameter value, use
:meth:`evaluate()` to first get the nonparametric operator
corresponding to the parameter value.
.. code-block::
# Instead of this...
>>> values = [parametric_operator.jacobian(parameter, q, u)
... for q, u in zip(list_of_states, list_of_inputs)]
# ...it is faster to do this.
>>> operator_at_parameter = parametric_operator.evaluate(parameter)
>>> values = [operator_at_parameter.jacobian(q, u)
... for q, u in zip(list_of_states, list_of_inputs)]
"""
return self.evaluate(parameter).jacobian(state, input_)
# Dimensionality reduction ------------------------------------------------
[docs]
def galerkin(self, Vr, Wr=None):
r"""Get the (Petrov-)Galerkin projection of this operator.
Consider an operator :math:`\Op(\q,\u)`, where :math:`\q\in\RR^n`
is the state and :math:`\u\in\RR^m` is the input.
Given a *trial basis* :math:`\Vr\in\RR^{n\times r}` and a *test basis*
:math:`\Wr\in\RR^{n\times r}`, the Petrov-Galerkin projection of
:math:`\Op` is the operator :math:`\Ophat:\RR^r\times\RR^m\to\RR^r`
defined by
.. math::
\Ophat(\qhat, \u) = (\Wr\trp\Vr)^{-1}\Wr\trp\Op(\Vr\qhat, \u)
where :math:`\qhat\in\RR^n` approximates the original state via
:math:`\q \approx \Vr\qhat`.
Parameters
----------
Vr : (n, r) ndarray
Basis for the trial space.
Wr : (n, r) ndarray or None
Basis for the test space. If ``None``, defaults to ``Vr``.
Returns
-------
op : operator
New object of the same class as ``self`` whose ``state_dimension``
attribute equals ``r``. If this operator acts on inputs, the
``input_dimension`` attribute of the new operator should be
``self.input_dimension``.
"""
raise NotImplementedError # pragma: no cover
# Model persistence -------------------------------------------------------
[docs]
def copy(self):
"""Return a copy of the operator."""
return copy.deepcopy(self) # pragma: no cover
[docs]
def save(self, savefile: str, overwrite: bool = False) -> None:
"""Save the operator to an HDF5 file.
Parameters
----------
savefile : str
Path of the file to save the basis in.
overwrite : bool
If ``True``, overwrite the file if it already exists. If ``False``
(default), raise a ``FileExistsError`` if the file already exists.
"""
raise NotImplementedError # pragma: no cover
[docs]
@classmethod
def load(cls, loadfile: str):
"""Load an operator from an HDF5 file.
Parameters
----------
loadfile : str
Path to the file where the operator was stored via :meth:`save()`.
"""
raise NotImplementedError # pragma: no cover
# Verification ------------------------------------------------------------
[docs]
def verify(self, testparam=None):
"""Verify dimension attributes and :meth:`evaluate()`.
Parameters
----------
testparam : (p,) ndarray or None
Test parameter at which to evaluate the operator.
If ``None`` (default), draw test parameter entries from the
standard Normal distribution.
"""
# Check the _OperatorClass.
if not issubclass(self._OperatorClass, OperatorTemplate):
raise errors.VerificationError(
"_OperatorClass must be a nonparametric operator type"
)
# Verify dimensions exist and are valid.
if not isinstance((r := self.state_dimension), int) or r <= 0:
raise errors.VerificationError(
"state_dimension must be a positive integer "
f"(current value: {repr(r)}, of type '{type(r).__name__}')"
)
if hasinputs := has_inputs(self):
if not isinstance((m := self.input_dimension), int) or m <= 0:
raise errors.VerificationError(
"input_dimension must be a positive integer "
f"(current value: {repr(m)}, of type '{type(r).__name__}')"
)
else:
m = 0
# Get a test parameter.
if testparam is None:
testparam = np.random.standard_normal(self.parameter_dimension)
if np.shape(testparam) != (self.parameter_dimension,):
raise ValueError("testparam.shape != (parameter_dimension,)")
# Evaluate the operator at the test parameter.
op_evaluated = self.evaluate(testparam)
if not isinstance(op_evaluated, self._OperatorClass):
raise errors.VerificationError(
"evaluate() must return instance of type _OperatorClass"
)
if not is_nonparametric(op_evaluated):
raise errors.VerificationError(
"_OperatorClass must be a nonparametric operator type"
)
if op_evaluated.state_dimension != self.state_dimension:
raise errors.VerificationError(
"result of evaluate() does not retain the state_dimension"
)
if hasinputs:
if not has_inputs(op_evaluated):
raise errors.VerificationError(
"result of evaluate() should depend on inputs"
)
if op_evaluated.input_dimension != m:
raise errors.VerificationError(
"result of evaluate() does not retain the input_dimension"
)
else:
if has_inputs(op_evaluated):
raise errors.VerificationError(
"result of evaluate() should not depend on inputs"
)
def is_parametric(obj) -> bool:
"""Return ``True`` if ``obj`` is a parametric operator object."""
return isinstance(obj, ParametricOperatorTemplate)
[docs]
class ParametricOpInfOperator(ParametricOperatorTemplate):
r"""Template for operators that depend on external parameters, and which
can be calibrated through operator inference, i.e.,
:math:`\Ophat_\ell(\qhat,\u;\bfmu) = \Ohat_\ell(\bfmu)\d_\ell(\qhat,\u)`.
"""
# Initialization ----------------------------------------------------------
def __init__(self):
"""Initialize the parameter_dimension."""
self._clear()
def _clear(self) -> None:
"""Reset the operator to its post-constructor state."""
self.__p = None
self.__entries = None
def _set_parameter_dimension_from_values(self, parameters) -> None:
"""Extract and save the dimension of the parameter space from a set of
one or more parameter values.
Parameters
----------
parameters : (s, p) or (s,) ndarray
Parameter value(s).
"""
if (dim := len(shape := np.shape(parameters))) == 1:
self.__p = 1
elif dim == 2:
self.__p = shape[1]
else:
raise ValueError("parameter values must be scalars or 1D arrays")
# Verification ------------------------------------------------------------
@staticmethod
def _check_shape_consistency(iterable, prefix: str) -> None:
"""Ensure that each array in `iterable` has the same shape."""
shape = np.shape(iterable[0])
if any(np.shape(A) != shape for A in iterable):
raise ValueError(f"{prefix} shapes inconsistent")
# Properties --------------------------------------------------------------
@property
def state_dimension(self) -> int:
r"""Dimension :math:`r` of the state :math:`\qhat` that the operator
acts on.
"""
return None if self.entries is None else self.entries[0].shape[0]
@property
def parameter_dimension(self) -> int:
r"""Dimension :math:`p` of the parameter vector :math:`\bfmu` that the
operator matrix depends on.
"""
return self.__p
@parameter_dimension.setter
def parameter_dimension(self, p):
"""Set :attr:`parameter_dimension`.
Only allowed if :attr:`parameter_dimension` is currently ``None``.
"""
if self.__p is not None:
raise AttributeError(
"can't set property 'parameter_dimension' twice"
)
if not isinstance(p, int) or p < 1:
raise ValueError("parameter_dimension must be a positive integer")
self.__p = p
@property
def shape(self) -> tuple:
"""Shape of the operator matrix when evaluated
at a parameter value.
"""
return None if self.entries is None else self.entries[0].shape
@property
def entries(self):
r"""Arrays that define the operator matrix as a function of the
parameter vector.
"""
return self.__entries
[docs]
@abc.abstractmethod
def set_entries(self, entries, fromblock: bool = False) -> None:
r"""Set the arrays that define the operator matrix as a function of
the parameter vector.
Parameters
----------
entries : list of s (r, d) ndarrays, or (r, sd) ndarray
Operator entries, either as a list of arrays
(``fromblock=False``, default)
or as a horizontal concatenatation of arrays (``fromblock=True``).
fromblock : bool
If ``True``, interpret ``entries`` as a horizontal concatenation
of arrays; if ``False`` (default), interpret ``entries`` as a list
of arrays.
"""
self.__entries = entries
# Operator inference ------------------------------------------------------
[docs]
@abc.abstractmethod
def operator_dimension(self, s: int, r: int, m: int = None) -> int:
r"""Number of columns in the total operator matrix.
Parameters
----------
s : int
Number of training parameter values.
r : int
State dimension.
m : int or None
Input dimension.
Returns
-------
d : int
Number of columns in the total operator entries matrix.
This is also the number of rows in the data matrix block.
"""
raise NotImplementedError # pragma: no cover
[docs]
@abc.abstractmethod
def datablock(self, parameters, states, inputs=None):
r"""Return the data matrix block corresponding to the operator.
Parameters
----------
parameters : (s, p) ndarray
Traning parameter values :math:`\bfmu_{0},\ldots,\bfmu_{s-1}`.
states : list of s (r, k_i) ndarrays
State snapshots for each of the :math:`s` training parameter
values.
inputs : list of s (m, k_i) ndarrays
Inputs corresponding to the state snapshots.
Returns
-------
block : (D, K) ndarray
Data block for the parametric operator.
Here, :math:`D` is the total operator matrix dimension and
:math:`K = \sum_{i=0}^{s-1}k_i`, the total number of state
snapshots.
"""
raise NotImplementedError # pragma: no cover
def is_uncalibrated(obj) -> bool:
"""Return ``True`` if ``obj`` is an OpInf operator with empty entries."""
return (
isinstance(obj, (OpInfOperator, ParametricOpInfOperator))
and obj.entries is None
)