# rom/_bayes.py
"""Classes supporting Bayesian operator inference."""
__all__ = [
"OperatorPosterior",
"BayesianROM",
]
import warnings
import numpy as np
import scipy.stats
from .. import errors, lstsq, post, utils
from ..models import _utils as modutils
from ._base import _identity
from ._nonparametric import ROM
VALID_SOLVERS = (
lstsq.L2Solver,
lstsq.L2DecoupledSolver,
lstsq.TikhonovSolver,
lstsq.TikhonovDecoupledSolver,
)
# Posterior ===================================================================
class OperatorPosterior:
r"""Posterior distribution for operator matrices.
Operator inference models are uniquely determined by operator matrices
:math:`\Ohat\in\RR^{r\times d}` that concatenate the entries of all
operators in the model. For example, the time-continuous model
.. math::
\ddt\qhat(t) = \chat + \Ahat\qhat(t) + \Hhat[\qhat(t)\otimes\qhat(t)]
is uniquely determined by the operator matrix
.. math::
\Ohat = [~\chat~~\Ahat~~\Hhat~] \in \RR^{r \times d}.
Typical *deterministic* operator inference learns a single operator matrix
:math:`\Ohat` from state measurements, while *probabilistic* or *Bayesian*
operator inference constructs a distribution of operator matrices,
:math:`p(\Ohat)`. This class implements an operator matrix distribution
where the rows of :math:`\Ohat` are multivariate Normal (Gaussian) random
variables, i.e.,
.. math::
p(\ohat_{i}) = \mathcal{N}(\ohat_i\mid\bfmu_i,\bfSigma_i),
\\
\bfmu_i \in \RR^{d},
\quad
\bfSigma_i \in \RR^{d\times d},
\quad
i = 0, \ldots, r-1,
where :math:`\ohat_i \in \RR^{d}` is the :math:`i`-th row of :math:`\Ohat`.
The :class:`BayesianROM` class has a ``posterior`` attribute that is an
``OperatorPosterior`` object.
Parameters
----------
means : list of r (d,) ndarrays
Mean values for each row of the operator matrix.
precisions : list of r (d, d) ndarrays
**INVERSE** covariance matrices for each row of the operator matrix.
alreadyinverted : bool
If ``True``, assume ``precisions`` is the collection of covariance
matrices, not their inverses.
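Examples
--------
A minimal sketch with hypothetical dimensions (two rows of length
three); the diagonal precision matrices below are placeholders, not
values produced by an actual operator inference.
>>> import numpy as np
>>> means = [np.zeros(3), np.ones(3)]
>>> precisions = [np.eye(3), 2 * np.eye(3)]
>>> posterior = OperatorPosterior(means, precisions)
>>> posterior.nrows
2
>>> Ohat = posterior.rvs()  # Draw one operator matrix.
>>> Ohat.shape
(2, 3)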
"""
def __init__(self, means, precisions, *, alreadyinverted=False):
"""Store and pre-process the distribution parameters."""
if (r := len(means)) != (_r2 := len(precisions)):
raise ValueError(f"len(means) = {r} != {_r2} = len(precisions)")
self.__r = r
self.__randomvariables = []
for i in range(self.__r):
# Verify dimensions.
mean_i, cov_i = means[i], precisions[i]
if not isinstance(mean_i, np.ndarray) or mean_i.ndim != 1:
raise ValueError(f"means[{i}] should be a 1D ndarray")
if not isinstance(cov_i, np.ndarray) or cov_i.ndim != 2:
raise ValueError(f"precisions[{i}] should be a 2D ndarray")
d = mean_i.shape[0]
if cov_i.shape != (d, d):
raise ValueError(f"means[{i}] and precisions[{i}] not aligned")
# Make a multivariate Normal distribution for this operator row.
if not alreadyinverted:
cov_i = scipy.stats.Covariance.from_precision(cov_i)
self.__randomvariables.append(
scipy.stats.multivariate_normal(mean=mean_i, cov=cov_i)
)
# If operator rows are all the same size, wrap rvs() output as array.
self.__rvsasarray = False
d = means[0].size
if all(mean.size == d for mean in means):
self.__rvsasarray = True
# Properties --------------------------------------------------------------
@property
def nrows(self) -> int:
"""Number of rows :math:`r` in the data matrix. This is also the state
dimension of the corresponding model.
"""
return self.__r
@property
def randomvariables(self) -> list:
"""Multivariate normal random variables for the rows of the operator
matrix.
"""
return self.__randomvariables
@property
def means(self) -> list:
r"""Mean vectors :math:`\bfmu_0,\ldots,\bfmu_{r-1}\in\RR^{d}` for the
rows of the operator matrix.
"""
return [rv.mean for rv in self.randomvariables]
@property
def covs(self) -> list:
r"""Covariance matrices
:math:`\bfSigma_0,\ldots,\bfSigma_{r-1}\in\RR^{d\times d}`
for the rows of the operator matrix.
"""
return [rv.cov for rv in self.randomvariables]
def __eq__(self, other):
    if not isinstance(other, OperatorPosterior):
        return NotImplemented
    if self.nrows != other.nrows:
        return False
for m1, m2 in zip(self.means, other.means):
if m1.shape != m2.shape or not np.all(m1 == m2):
return False
for C1, C2 in zip(self.covs, other.covs):
if C1.shape != C2.shape or not np.all(C1 == C2):
return False
return True
# Random draws ------------------------------------------------------------
def rvs(self):
r"""Draw a random operator matrix from the posterior operator
distribution.
Returns
-------
Ohat : (r, d) ndarray
Operator matrix sampled from :math:`p(\Ohat)`.
"""
ohats = [rv.rvs() for rv in self.randomvariables]  # Each draw is (d,).
return np.array(ohats) if self.__rvsasarray else ohats
# Model persistence ------------------------------------------------------
def save(self, savefile, overwrite=False):
"""Save the posterior operator distribution.
Parameters
----------
savefile : str
File to save data to.
overwrite : bool
If ``False`` (default) and ``savefile`` exists, raise an exception.
"""
with utils.hdf5_savehandle(savefile, overwrite) as hf:
hf.create_dataset("state_dimension", data=[self.nrows])
for i, (mean_i, cov_i) in enumerate(zip(self.means, self.covs)):
hf.create_dataset(f"means_{i}", data=mean_i)
hf.create_dataset(f"covs_{i}", data=cov_i)
@classmethod
def load(cls, loadfile):
"""Load a previously saved posterior operator distribution.
Parameters
----------
loadfile : str
File to load data from.
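Returns
-------
OperatorPosterior
    The loaded posterior operator distribution.
Examples
--------
Round-trip sketch, assuming an existing ``posterior`` object and a
writable (hypothetical) path ``"posterior.h5"``.
>>> posterior.save("posterior.h5", overwrite=True)
>>> reloaded = OperatorPosterior.load("posterior.h5")
>>> reloaded == posterior
True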
"""
with utils.hdf5_loadhandle(loadfile) as hf:
r = int(hf["state_dimension"][0])
means = [hf[f"means_{i}"][:] for i in range(r)]
covs = [hf[f"covs_{i}"][:] for i in range(r)]
return cls(means, covs, alreadyinverted=True)
# Bayesian ROMs ===============================================================
class _BayesianROMMixin:
"""Mixin for ROM classes with a Bayesian operator posterior."""
def __init__(self):
self.__posterior = None
self._validate_model_solver(self.model)
@staticmethod
def _validate_model_solver(model):
if modutils.is_interpolatory(model):
raise AttributeError(
"Fully interpolatory parametric models are not supported "
"for Bayesian ROMs"
)
if not hasattr(model, "solver") or not isinstance(
model.solver, VALID_SOLVERS
):
types = ", ".join(f"lstsq.{s.__name__}" for s in VALID_SOLVERS)
raise AttributeError(
"'model' must have a 'solver' attribute "
f"of one of the following types: {types}"
)
@property
def posterior(self) -> OperatorPosterior:
"""Posterior distribution for the operator matrices."""
return self.__posterior
def _initialize_posterior(self):
"""Set the operator posterior if numerically possible."""
means, precisions = self.model.solver.posterior()
try:
self.__posterior = OperatorPosterior(means, precisions)
except np.linalg.LinAlgError as ex:
    if ex.args[0] == "Matrix is not positive definite":
        self.__posterior = None
    else:
        raise
def draw_operators(self):
"""Set the :attr:`model` operators to a new random draw from the
:attr:`posterior` operator distribution.
"""
self.model._extract_operators(self.posterior.rvs())
def fit_regselect_continuous(
self,
candidates: list,
train_time_domains: list,
parameters: list,
states: list,
ddts: list = None,
input_functions: list = None,
fit_transformer: bool = True,
fit_basis: bool = True,
regularizer_factory=None,
gridsearch_only: bool = False,
test_time_length: float = 0,
stability_margin: float = 5.0,
num_posterior_draws: int = 20,
test_cases: list = None,
verbose: bool = False,
**predict_options: dict,
):
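"""Bayesian implementation of regularization selection for
time-continuous models. See
:meth:`BayesianROM.fit_regselect_continuous` for the full
parameter documentation.
"""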
if not self._iscontinuous:
raise AttributeError(
"this method is for time-continuous models only, "
"use fit_regselect_discrete()"
)
if parameters is None:
states, ddts, input_functions = self._fix_single_trajectory(
states, ddts, input_functions
)
# Validate arguments.
if np.isscalar(train_time_domains[0]):
train_time_domains = [train_time_domains] * len(states)
for t, Q in zip(train_time_domains, states):
if t.shape != (Q.shape[1],):
raise errors.DimensionalityError(
"train_time_domains and states not aligned"
)
if input_functions is not None:
if callable(input_functions): # one global input function.
input_functions = [input_functions] * len(states)
if not callable(input_functions[0]):
raise TypeError(
"argument 'input_functions' must be sequence of callables"
)
inputs = [ # evaluate the inputs over the time domain.
np.column_stack([u(tt) for tt in t])
for u, t in zip(input_functions, train_time_domains)
]
else:
inputs = None
if test_time_length < 0:
raise ValueError("argument 'test_time_length' must be nonnegative")
if regularizer_factory is None:
regularizer_factory = _identity
processed_test_cases = self._process_test_cases(
test_cases, utils.ContinuousRegTest
)
# Fit the model for the first time.
states = self._fit_and_return_training_data(
parameters=parameters,
states=states,
lhs=ddts,
inputs=inputs,
fit_transformer=fit_transformer,
fit_basis=fit_basis,
)
# Set up the regularization selection.
shifts, limits = self._get_stability_limits(states, stability_margin)
def unstable(_Q, ell, size):
"""Return ``True`` if the solution is unstable."""
if _Q.shape[-1] != size:
return True
return np.abs(_Q - shifts[ell]).max() > limits[ell]
# Extend the training time domains by the testing time length.
if test_time_length > 0:
time_domains = []
for t_train in train_time_domains:
dt = np.mean(np.diff(t_train))
t_test = t_train[-1] + np.linspace(
dt,
dt + test_time_length,
int(test_time_length / dt),
)
time_domains.append(np.concatenate((t_train, t_test)))
else:
time_domains = train_time_domains
if input_functions is None:
input_functions = [None] * len(states)
loop_collections = [states, input_functions, time_domains]
if is_parametric := parameters is not None:
loop_collections.insert(0, parameters)
def update_model(reg_params):
"""Reset the regularizer and refit the model operators."""
self.model.solver.regularizer = regularizer_factory(reg_params)
self._initialize_posterior()
def training_error(reg_params):
"""Compute the training error for a single regularization
candidate by solving the model, checking for stability, and
comparing to available training data.
"""
update_model(reg_params)
if self.posterior is None:
return np.inf
# Pass stability checks.
for tcase in processed_test_cases:
for _ in range(num_posterior_draws):
self.draw_operators()
if not tcase.evaluate(self.model, **predict_options):
return np.inf
# Compute training error.
error = 0
for ell, entries in enumerate(zip(*loop_collections)):
if is_parametric:
params, Q, input_func, t = entries
predict_args = (params, Q[:, 0], t, input_func)
else:
Q, input_func, t = entries
predict_args = (Q[:, 0], t, input_func)
draws = []
trainsize = Q.shape[-1]
for _ in range(num_posterior_draws):
self.draw_operators()
with warnings.catch_warnings():
warnings.simplefilter("ignore")
solution = self.model.predict(
*predict_args, **predict_options
)
if unstable(solution, ell, t.size):
return np.inf
draws.append(solution[:, :trainsize])
solution_train = np.mean(draws, axis=0)
error += post.Lp_error(Q, solution_train, t[:trainsize])[1]
return error / len(states)
best_regularization = utils.gridsearch(
training_error,
candidates,
gridsearch_only=gridsearch_only,
label="regularization",
verbose=verbose,
)
update_model(best_regularization)
return self
def fit_regselect_discrete(
self,
candidates: list,
parameters: list,
states: list,
inputs: list = None,
fit_transformer: bool = True,
fit_basis: bool = True,
regularizer_factory=None,
gridsearch_only: bool = False,
num_test_iters: int = 0,
stability_margin: float = 5.0,
num_posterior_draws: int = 20,
test_cases: list = None,
verbose: bool = False,
):
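"""Bayesian implementation of regularization selection for fully
discrete models. See :meth:`BayesianROM.fit_regselect_discrete`
for the full parameter documentation.
"""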
if self._iscontinuous:
raise AttributeError(
"this method is for fully discrete models only, "
"use fit_regselect_continuous()"
)
if parameters is None:
states, _, inputs = self._fix_single_trajectory(
states, None, inputs
)
# Validate arguments.
if num_test_iters < 0:
raise ValueError(
"argument 'num_test_iters' must be a nonnegative integer"
)
if inputs is not None:
if len(inputs) != len(states):
raise errors.DimensionalityError(
f"{len(states)} state trajectories but "
f"{len(inputs)} input trajectories detected"
)
for Q, U in zip(states, inputs):
if U.shape[-1] < Q.shape[1] + num_test_iters:
raise ValueError(
"argument 'inputs' must contain enough data for "
f"{num_test_iters} iterations after the training data"
)
if regularizer_factory is None:
regularizer_factory = _identity
processed_test_cases = self._process_test_cases(
test_cases, utils.DiscreteRegTest
)
# Fit the model for the first time.
states = self._fit_and_return_training_data(
parameters=parameters,
states=states,
lhs=None,
inputs=inputs,
fit_transformer=fit_transformer,
fit_basis=fit_basis,
)
# Set up the regularization selection.
shifts, limits = self._get_stability_limits(states, stability_margin)
def unstable(_Q, ell):
"""Return ``True`` if the solution is unstable."""
if np.isnan(_Q).any() or np.isinf(_Q).any():
return True
return np.any(np.abs(_Q - shifts[ell]).max() > limits[ell])
# Extend the iteration counts by the number of testing iterations.
num_iters = [Q.shape[-1] for Q in states]
if num_test_iters > 0:
num_iters = [n + num_test_iters for n in num_iters]
if inputs is None:
inputs = [None] * len(states)
loop_collections = [states, inputs, num_iters]
if is_parametric := parameters is not None:
loop_collections.insert(0, parameters)
def update_model(reg_params):
"""Reset the regularizer and refit the model operators."""
self.model.solver.regularizer = regularizer_factory(reg_params)
self._initialize_posterior()
def training_error(reg_params):
"""Compute the mean training error for a single regularization
candidate by solving the model, checking for stability, and
comparing to available training data.
"""
update_model(reg_params)
if self.posterior is None:
return np.inf
# Pass stability checks.
for tcase in processed_test_cases:
for _ in range(num_posterior_draws):
self.draw_operators()
if not tcase.evaluate(self.model):
return np.inf
# Solve the model, check for stability, and compute training error.
error = 0
for ell, entries in enumerate(zip(*loop_collections)):
if is_parametric:
params, Q, U, niter = entries
predict_args = (params, Q[:, 0], niter, U)
else:
Q, U, niter = entries
predict_args = (Q[:, 0], niter, U)
draws = []
for _ in range(num_posterior_draws):
self.draw_operators()
with warnings.catch_warnings():
warnings.simplefilter("ignore")
solution = self.model.predict(*predict_args)
if unstable(solution, ell):
return np.inf
draws.append(solution[:, : Q.shape[-1]])
error += post.frobenius_error(Q, np.mean(draws, axis=0))[1]
return error / len(states)
best_regularization = utils.gridsearch(
training_error,
candidates,
gridsearch_only=gridsearch_only,
label="regularization",
verbose=verbose,
)
update_model(best_regularization)
return self
class BayesianROM(ROM, _BayesianROMMixin):
r"""Probabilistic nonparametric reduced-order model.
This class connects classes from the various submodules to form a complete
reduced-order modeling workflow for probabilistic models.
High-dimensional data
:math:`\to` transformed / preprocessed data
:math:`\to` compressed data
:math:`\to` low-dimensional probabilistic model.
Operator inference models are uniquely determined by operator matrices
:math:`\Ohat\in\RR^{r\times d}` that concatenate the entries of all
operators in the model. For example, the time-continuous model
.. math::
\ddt\qhat(t) = \chat + \Ahat\qhat(t) + \Hhat[\qhat(t)\otimes\qhat(t)]
is uniquely determined by the operator matrix
.. math::
\Ohat = [~\chat~~\Ahat~~\Hhat~] \in \RR^{r \times d}.
Typical *deterministic* operator inference learns a single operator matrix
:math:`\Ohat` from state measurements, while *probabilistic* or *Bayesian*
operator inference constructs a distribution of operator matrices,
:math:`p(\Ohat)`. This class solves a Bayesian linear inference problem
to define an :class:`OperatorPosterior` and facilitates sampling from
the posterior.
See :cite:`guo2022bayesopinf`.
Parameters
----------
model : :mod:`opinf.models` object
Nonparametric system model, an instance of one of the following:
* :class:`opinf.models.ContinuousModel`
* :class:`opinf.models.DiscreteModel`
The model must have a ``solver`` of one of the following types:
* :class:`opinf.lstsq.L2Solver`
* :class:`opinf.lstsq.L2DecoupledSolver`
* :class:`opinf.lstsq.TikhonovSolver`
* :class:`opinf.lstsq.TikhonovDecoupledSolver`
lifter : :mod:`opinf.lift` object or None
Lifting transformation.
transformer : :mod:`opinf.pre` object or None
Preprocessor.
basis : :mod:`opinf.basis` object or None
Dimensionality reducer.
ddt_estimator : :mod:`opinf.ddt` object or None
Time derivative estimator.
Ignored if ``model`` is not time continuous.
Notes
-----
The ``operators`` attribute of the :attr:`model` represents a single draw
from the operator distribution and is modified every time
:meth:`draw_operators` or :meth:`predict` are called.
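Examples
--------
Construction sketch; the operator specification ``"cAH"``, the
regularization constant, and the basis size are placeholder choices,
not recommended values.
>>> import opinf
>>> model = opinf.models.ContinuousModel(
...     "cAH", solver=opinf.lstsq.L2Solver(regularizer=1e-6)
... )
>>> rom = BayesianROM(model, basis=opinf.basis.PODBasis(num_vectors=10))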
"""
def __init__(
self,
model,
*,
lifter=None,
transformer=None,
basis=None,
ddt_estimator=None,
):
ROM.__init__(
self,
model,
lifter=lifter,
transformer=transformer,
basis=basis,
ddt_estimator=ddt_estimator,
)
_BayesianROMMixin.__init__(self)
def fit(
self,
states,
lhs=None,
inputs=None,
fit_transformer: bool = True,
fit_basis: bool = True,
):
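"""Calibrate the model to training data and initialize the operator
:attr:`posterior`. Arguments are the same as :meth:`ROM.fit`.
"""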
ROM.fit(
self,
states=states,
lhs=lhs,
inputs=inputs,
fit_transformer=fit_transformer,
fit_basis=fit_basis,
)
self._initialize_posterior()
def fit_regselect_continuous(
self,
candidates: list,
train_time_domains: list,
states: list,
ddts: list = None,
input_functions: list = None,
fit_transformer: bool = True,
fit_basis: bool = True,
regularizer_factory=None,
gridsearch_only: bool = False,
test_time_length: float = 0,
stability_margin: float = 5,
num_posterior_draws: int = 20,
test_cases: list = None,
verbose: bool = False,
**predict_options: dict,
):
"""Calibrate the time-continuous model to training data, selecting the
regularization hyperparameter(s) that minimize the sample mean training
error while maintaining stability over the testing regime.
This method requires the :attr:`model` to be time-continuous; use
:meth:`fit_regselect_discrete` for discrete models. The
``model.solver.regularizer`` is repeatedly adjusted, and the operator
:attr:`posterior` is recalibrated, until a best regularization is
selected. Training error is measured by comparing training data to the
sample mean of ``num_posterior_draws`` model predictions. Stability is
required for each of the individual model predictions.
See :cite:`mcquarrie2021combustion,guo2022bayesopinf`.
Parameters
----------
candidates : list of regularization hyperparameters
Regularization hyperparameters to check before carrying out a
derivative-free optimization.
train_time_domains : list of s (k_i,) ndarrays
Time domain corresponding to the training states.
states : list of s (n, k_i) ndarrays
State snapshots in the original state space. Each array
``states[i]`` is data corresponding to a different trajectory;
each column ``states[i][:, j]`` is one snapshot.
ddts : list of s (n, k_i) ndarrays or None
Snapshot time derivative data. Each array ``ddts[i]`` are the time
derivatives of ``states[i]``; each column ``ddts[i][:, j]``
corresponds to the snapshot ``states[i][:, j]``. If ``None``
(default), these are estimated using :attr:`ddt_estimator`.
input_functions : list of s callables or None
Input functions mapping time to input vectors. Only required if the
:attr:`model` takes external inputs. Each ``input_functions[i]``
is the function corresponding to ``states[i]``, and
``input_functions[i](train_time_domains[i][j])`` is the input
vector corresponding to the snapshot ``states[i][:, j]``.
fit_transformer : bool
If ``True`` (default), calibrate the preprocessing transformation
using the ``states``.
If ``False``, assume the transformer is already calibrated.
fit_basis : bool
If ``True`` (default), calibrate the high-to-low dimensional
mapping using the ``states``.
If ``False``, assume the basis is already calibrated.
regularizer_factory : callable or None
Function mapping regularization hyperparameters to the full
regularizer. Specifically, ``regularizer_factory(candidates[i])``
will be assigned to ``model.solver.regularizer`` for each ``i``.
If ``None`` (default), set ``regularizer_factory()`` to the
identity function.
gridsearch_only : bool
If ``True``, stop after checking all regularization ``candidates``
and do not follow up with optimization.
test_time_length : float
Amount of time after the training regime in which to require model
stability.
stability_margin : float
Factor by which the predicted reduced states may deviate from the
range of the training reduced states without the trajectory being
classified as unstable.
num_posterior_draws : int
Number of draws from the operator :attr:`posterior` for stability
checks and for estimating the sample mean of model predictions.
test_cases : list of ContinuousRegTest objects
Additional test cases for which the model is required to be stable.
See :class:`opinf.utils.ContinuousRegTest`.
verbose : bool
If ``True``, print information during the regularization selection.
predict_options : dict or None
Extra arguments for :meth:`opinf.models.ContinuousModel.predict`.
Notes
-----
If there is only one trajectory of training data (s = 1), ``states``
may be provided as an (n, k) ndarray. In this case, it is assumed that
``ddts`` (if provided) is an (n, k) ndarray.
The ``train_time_domains`` may be a single one-dimensional array, in
which case it is assumed that each trajectory ``states[i]`` corresponds
to the same time domain. Similarly, if ``input_functions`` is a single
callable, it is assumed to be the input function for each trajectory.
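Examples
--------
Sketch of a typical call, in which ``rom``, the training time domain
``t``, and the snapshot matrix ``Q`` are assumed to exist; the
candidate grid is a placeholder.
>>> import numpy as np
>>> rom.fit_regselect_continuous(
...     candidates=np.logspace(-8, 2, 11),
...     train_time_domains=t,
...     states=Q,
...     test_time_length=1.0,
... )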
"""
return _BayesianROMMixin.fit_regselect_continuous(
self,
candidates=candidates,
train_time_domains=train_time_domains,
parameters=None,
states=states,
ddts=ddts,
input_functions=input_functions,
fit_transformer=fit_transformer,
fit_basis=fit_basis,
regularizer_factory=regularizer_factory,
gridsearch_only=gridsearch_only,
test_time_length=test_time_length,
stability_margin=stability_margin,
num_posterior_draws=num_posterior_draws,
test_cases=test_cases,
verbose=verbose,
**predict_options,
)
def fit_regselect_discrete(
self,
candidates: list,
states: list,
inputs: list = None,
fit_transformer: bool = True,
fit_basis: bool = True,
regularizer_factory=None,
gridsearch_only: bool = False,
num_test_iters: int = 0,
stability_margin: float = 5,
num_posterior_draws: int = 20,
test_cases: list = None,
verbose: bool = False,
):
"""Calibrate the fully discrete model to training data, selecting the
regularization hyperparameter(s) that minimize the sample mean training
error while maintaining stability over the testing regime.
This method requires the :attr:`model` to be fully discrete; use
:meth:`fit_regselect_continuous` for time-continuous models. The
``model.solver.regularizer`` is repeatedly adjusted, and the operator
:attr:`posterior` is recalibrated, until a best regularization is
selected. Training error is measured by comparing training data to the
sample mean of ``num_posterior_draws`` model predictions. Stability is
required for each of the individual model predictions.
See :cite:`mcquarrie2021combustion,guo2022bayesopinf`.
Parameters
----------
candidates : list of regularization hyperparameters
Regularization hyperparameters to check. If a single hyperparameter
is given, use it as the start of an optimization-based search.
states : list of s (n, k_i) ndarrays
State snapshots in the original state space. Each array
``states[i]`` is data corresponding to a different trajectory;
each column ``states[i][:, j]`` is one snapshot. This method
assumes the snapshots are sequential, i.e., the model maps
``states[i][:, j]`` to ``states[i][:, j+1]``.
inputs : list of s (m, k_i + num_test_iters) ndarrays
Inputs corresponding to the training data, together with inputs
for the testing regime. Only required if the :attr:`model` takes
external inputs.
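fit_transformer : bool
If ``True`` (default), calibrate the preprocessing transformation
using the ``states``.
If ``False``, assume the transformer is already calibrated.
fit_basis : bool
If ``True`` (default), calibrate the high-to-low dimensional
mapping using the ``states``.
If ``False``, assume the basis is already calibrated.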
regularizer_factory : callable or None
Function mapping regularization hyperparameters to the full
regularizer. Specifically, ``regularizer_factory(candidates[i])``
will be assigned to ``model.solver.regularizer`` for each ``i``.
If ``None`` (default), set ``regularizer_factory()`` to the
identity function.
gridsearch_only : bool
If ``True``, stop after checking all regularization ``candidates``
and do not follow up with optimization.
num_test_iters : int
Number of iterations after the training data in which to require
model stability.
stability_margin : float
Factor by which the reduced states may deviate from the range of
the training data without being flagged as unstable.
num_posterior_draws : int
Number of draws from the operator :attr:`posterior` for stability
checks and for estimating the sample mean of model predictions.
test_cases : list of DiscreteRegTest objects
Additional test cases for which the model is required to be stable.
See :class:`opinf.utils.DiscreteRegTest`.
verbose : bool
If ``True``, print information during the regularization selection.
Notes
-----
If there is only one trajectory of training data (s = 1), ``states``
may be provided as an (n, k) ndarray. In this case, it is assumed that
``inputs`` (if provided) is a single (m, k + num_test_iters) ndarray.
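Examples
--------
Sketch of a typical call, in which ``rom`` and the snapshot matrix
``Q`` are assumed to exist; the candidate grid is a placeholder.
>>> import numpy as np
>>> rom.fit_regselect_discrete(
...     candidates=np.logspace(-8, 2, 11),
...     states=Q,
...     num_test_iters=10,
... )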
"""
return _BayesianROMMixin.fit_regselect_discrete(
self,
candidates=candidates,
parameters=None,
states=states,
inputs=inputs,
fit_transformer=fit_transformer,
fit_basis=fit_basis,
regularizer_factory=regularizer_factory,
gridsearch_only=gridsearch_only,
num_test_iters=num_test_iters,
stability_margin=stability_margin,
num_posterior_draws=num_posterior_draws,
test_cases=test_cases,
verbose=verbose,
)
def predict(self, state0, *args, **kwargs):
"""Draw from the operator posterior and evaluate the resulting model.
Arguments are the same as the ``predict()`` method of :attr:`model`.
Parameters
----------
state0 : (n,) ndarray
Initial state, expressed in the original state space.
*args : list
Other positional arguments to the ``predict()`` method of
:attr:`model`.
**kwargs : dict
Keyword arguments to the ``predict()`` method of :attr:`model`.
Returns
-------
states : (n, k) ndarray
Solution to the drawn model, expressed in the original state space.
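Examples
--------
Each call draws a fresh operator matrix, so repeated calls produce an
ensemble of trajectories whose spread reflects the posterior
uncertainty. Here ``q0`` and ``t`` are an assumed initial condition
and time domain, and the number of draws is a placeholder.
>>> import numpy as np
>>> draws = [rom.predict(q0, t) for _ in range(100)]
>>> mean_traj = np.mean(draws, axis=0)
>>> std_traj = np.std(draws, axis=0)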
"""
self.draw_operators()
return ROM.predict(self, state0, *args, **kwargs)