# ddt/_base.py
"""Template for time derivative estimators."""
__all__ = [
"DerivativeEstimatorTemplate",
]
import abc
import collections
import numpy as np
import scipy.linalg as la
import matplotlib.pyplot as plt
from .. import errors, utils
[docs]
class DerivativeEstimatorTemplate(abc.ABC):
r"""Template for time derivative estimators.
Operator Inference for time-continuous (semi-discrete) models requires
state snapshots and their time derivatives in order to learn operator
entries via regression. This class is a template for estimating the first
time derivative of state snapshots. Specifically, from a collection of
snapshots :math:`\qhat_0,\ldots,\qhat_{k-1}\in\RR^r` representing the state
at time instances :math:`t_0,\ldots,t_{k-1}`, the goal is to estimate
.. math::
\dot{\qhat}_j \approx \ddt\qhat(t)\bigg|_{t = t_j} \in \RR^{r}
for :math:`j = 0, \ldots, k - 1`.
Depending on the estimation strategy, the derivatives may only be computed
for a subset of the states. For example, a first-order backward difference
may omit an estimate for :math:`\dot{\qhat}_0`.
Parameters
----------
time_domain : (k,) ndarray
Time domain of the snapshot data.
"""
__tests = (
(
r"$f(t) = t$",
lambda t: t,
np.ones_like,
),
(
r"$f(t) = t^4 - \frac{1}{3}t^3$",
lambda t: t**4 - (t**3 / 3),
lambda t: 4 * t**3 - t**2,
),
(
r"$f(t) = \sin(t)$",
np.sin,
np.cos,
),
(
r"$f(t) = e^t$",
np.exp,
np.exp,
),
(
r"$f(t) = \frac{1}{1 + t}$",
lambda t: 1 / (t + 1),
lambda t: -1 / (t + 1) ** 2,
),
(
r"$f(t) = t - t^3 + \cos(t) - e^{t/2}$",
lambda t: t - t**3 + np.cos(t) - np.exp(t / 2),
lambda t: 1 - 3 * t**2 - np.sin(t) - np.exp(t / 2) / 2,
),
)
# Constructor -------------------------------------------------------------
def __init__(self, time_domain):
"""Set the time domain."""
if not isinstance(time_domain, np.ndarray) or time_domain.ndim != 1:
raise ValueError("time_domain must be a one-dimensional array")
self.__t = time_domain
# Properties --------------------------------------------------------------
@property
def time_domain(self):
"""Time domain of the snapshot data, a (k,) ndarray."""
return self.__t
def __str__(self):
"""String representation: class name, time domain."""
out = [self.__class__.__name__]
t = self.time_domain
out.append(f"time_domain: {t.size} entries in [{t.min()}, {t.max()}]")
return "\n ".join(out)
def __repr__(self):
"""Unique ID + string representation."""
return utils.str2repr(self)
# Main routine ------------------------------------------------------------
def _check_dimensions(self, states, inputs, check_against_time=True):
"""Check dimensions and alignment of the state and inputs."""
if states.ndim != 2:
raise errors.DimensionalityError("states must be two-dimensional")
if check_against_time and states.shape[-1] != self.time_domain.size:
raise errors.DimensionalityError(
"states not aligned with time_domain"
)
if inputs is not None:
if inputs.ndim == 1:
inputs = inputs.reshape((1, -1))
if inputs.shape[1] != states.shape[1]:
raise errors.DimensionalityError(
"states and inputs not aligned"
)
return states, inputs
[docs]
@abc.abstractmethod
def estimate(self, states, inputs=None):
"""Estimate the first time derivatives of the states.
Parameters
----------
states : (r, k) ndarray
State snapshots, either full or (preferably) reduced.
inputs : (m, k) ndarray or None
Inputs corresponding to the state snapshots, if applicable.
Returns
-------
_states : (r, k') ndarray
Subset of the state snapshots.
ddts : (r, k') ndarray
First time derivatives corresponding to ``_states``.
_inputs : (m, k') ndarray or None
Inputs corresponding to ``_states``, if applicable.
**Only returned** if ``inputs`` is provided.
"""
raise NotImplementedError # pragma: no cover
# Verification ------------------------------------------------------------
[docs]
def verify_shapes(self, r: int = 5, m: int = 3):
"""Verify that :meth:`estimate()` is consistent in the sense that the
all outputs have the same number of columns. This method does **not**
check the accuracy of the derivative estimation.
Parameters
----------
r : int
State dimension to use in the check.
m : int
Number of inputs to use in the check.
The ``inputs`` argument used to verify :meth:`estimate()` is
a two-dimensional array of shape ``(m, k)`` even if ``m = 1``,
where ``k`` is the size of the time domain.
"""
# Get random data. What matters here is the size, not the entries.
k = self.time_domain.size
Q = np.random.random((r, k))
U = np.random.random((m, k))
# Call estimate() with inputs=None.
outputs = self.estimate(Q)
if not isinstance(outputs, tuple) or len(outputs) != 2:
raise errors.VerificationError(
"len(estimate(states, inputs=None)) != 2"
)
_Q, _dQdt = outputs
if _Q.shape[0] != r:
raise errors.VerificationError(
"estimate(states)[0].shape[0] != states.shape[0]"
)
if _dQdt.shape[0] != r:
raise errors.VerificationError(
"estimate(states)[1].shape[0] != states.shape[0]"
)
if _Q.shape[1] != _dQdt.shape[1]:
print(_Q.shape[1], _dQdt.shape[1])
raise errors.VerificationError(
"Q.shape[1] != dQdt.shape[1] "
"where Q, dQdt = estimate(states, inputs=None)"
)
# Call estimate() with non-None inputs.
outputs = self.estimate(Q, U)
if not isinstance(outputs, tuple) or len(outputs) != 3:
raise errors.VerificationError(
"len(estimate(states, inputs)) != 3"
)
_Q, _dQdt, _U = outputs
if _U is None:
raise errors.VerificationError(
"estimates(states, inputs)[2] should not be None"
)
if _U.shape[0] != m:
raise errors.VerificationError(
"estimate(states, inputs)[2].shape[0] != inputs.shape[0]"
)
if _U.shape[1] != _Q.shape[1]:
raise errors.VerificationError(
"Q.shape[1] != U.shape[1] "
"where Q, _, U = estimate(states, inputs)"
)
print("estimate() output shapes are consistent")
[docs]
def verify(self, plot: bool = False, return_errors=False):
"""Verify that :meth:`estimate()` is consistent in the sense that the
all outputs have the same number of columns and test the accuracy of
the results on a few test problems.
Parameters
----------
plot : bool
If ``True``, plot the relative errors of the derivative estimation
errors as a function of the time step.
If ``False`` (default), print a report of the relative errors.
return_errors : bool
If ``True``, return the errors for each test as a dictionary.
If ``False`` (default), return nothing.
Returns
-------
errors : dict
Estimation errors for each test case.
Time steps are listed as ``errors[dts]``.
**Only returned** if ``return_errors=True``.
"""
self.verify_shapes()
time_domain = self.time_domain # Record original time domain.
dts = np.logspace(-12, -1, 12)[::-1]
estimation_errors = collections.defaultdict(list)
estimation_errors["dts"] = dts
t_base = np.arange(1001)
for dt in dts:
# Construct test cases.
t = 1 + (dt * t_base)
Q = np.array([test[1](t) for test in self.__tests])
dQdt = np.array([test[2](t) for test in self.__tests])
self.__t = t
# Call the derivative estimator.
Q_est, dQdt_est = self.estimate(Q, None)
# Use new states to infer the indices of the returns.
start = np.argmin(
la.norm(Q - Q_est[:, 0].reshape((-1, 1))), axis=0
)
s = slice(start, start + Q_est.shape[1])
truth = dQdt[:, s]
# Calculate the relative error of the time derivative estimate.
denom = la.norm(truth, axis=1)
errs = la.norm(dQdt_est - truth, axis=1) / denom
for test, err in zip(self.__tests, errs):
estimation_errors[test[0]].append(err)
if plot:
_, ax = plt.subplots(1, 1)
for test in self.__tests:
name = test[0]
ax.loglog(
dts,
estimation_errors[name],
".-",
linewidth=0.5,
markersize=5,
label=name,
)
ymin, ymax = ax.get_ylim()
ax.set_ylim(bottom=max(ymin, 1e-14), top=min(ymax, 10))
ax.set_xlabel("time step")
ax.set_ylabel("relative error")
ax.legend(ncols=2, frameon=False, fontsize="small")
ax.set_title("Time derivative estimation errors")
else:
print(
(title := "\nTime derivative estimation relative errors"),
"=" * len(title),
sep="\n",
)
for test in self.__tests:
name = test[0]
rawname = name.strip("$")
print(f"\n{rawname}", len(rawname) * "-", sep="\n")
for dt, err in zip(dts, estimation_errors[name]):
print(f"dt = {dt:.1e}:\terror = {err:.4e}")
self.__t = time_domain # Restore original time domain.
if return_errors:
return estimation_errors