# Source code for opinf.models.mono._parametric

# models/mono/_parametric.py
"""Parametric monolithic dynamical systems models."""

# Public API of this module.
__all__ = [
    "ParametricDiscreteModel",
    "ParametricContinuousModel",
    "InterpDiscreteModel",
    "InterpContinuousModel",
    # Deprecations:
    "InterpolatedDiscreteModel",
    "InterpolatedContinuousModel",
]

import warnings
import numpy as np
import scipy.interpolate as spinterpolate

from ._base import _Model
from ._nonparametric import (
    _FrozenDiscreteModel,
    _FrozenContinuousModel,
)
from ... import errors, utils
from ...operators import (
    OperatorTemplate,
    ParametricOperatorTemplate,
    InterpConstantOperator,
    InterpLinearOperator,
    InterpQuadraticOperator,
    InterpCubicOperator,
    InterpInputOperator,
    InterpStateInputOperator,
    _utils as oputils,
)


# Lookup table from interpolatory operator class names to the classes
# themselves (each key equals the corresponding class's __name__).
_operator_name2class = {
    "InterpConstantOperator": InterpConstantOperator,
    "InterpLinearOperator": InterpLinearOperator,
    "InterpQuadraticOperator": InterpQuadraticOperator,
    "InterpCubicOperator": InterpCubicOperator,
    "InterpInputOperator": InterpInputOperator,
    "InterpStateInputOperator": InterpStateInputOperator,
}


# Base classes ================================================================
class _ParametricModel(_Model):
    """Base class for parametric monolithic models.

    In a parametric model, the operators (and hence the model itself) depend
    on one or more external parameters. Fixing the parameter value via
    :meth:`evaluate` produces a nonparametric model of type ``_ModelClass``.
    """

    _ModelClass = NotImplemented  # Must be specified by child classes.

    # ModelClass properties ---------------------------------------------------
    @property
    def _LHS_ARGNAME(self):  # pragma: no cover
        """Name of LHS argument in fit(), e.g., "ddts"."""
        return self._ModelClass._LHS_ARGNAME

    @property
    def _LHS_LABEL(self):  # pragma: no cover
        """String representation of LHS, e.g., "dq / dt"."""
        return self._ModelClass._LHS_LABEL

    @property
    def _STATE_LABEL(self):  # pragma: no cover
        """String representation of state, e.g., "q(t)"."""
        return self._ModelClass._STATE_LABEL

    @property
    def _INPUT_LABEL(self):  # pragma: no cover
        """String representation of input, e.g., "u(t)"."""
        return self._ModelClass._INPUT_LABEL

    # Properties: operators ---------------------------------------------------
    # No string abbreviations are registered for parametric models.
    _operator_abbreviations = dict()

    def _isvalidoperator(self, op):
        """All monolithic operators are allowed, both nonparametric
        (OperatorTemplate) and parametric (ParametricOperatorTemplate).
        """
        return isinstance(
            op,
            (
                OperatorTemplate,
                ParametricOperatorTemplate,
            ),
        )

    @staticmethod
    def _check_operator_types_unique(ops):
        """Raise a ValueError if any two operators represent the same kind
        of operation (e.g., two constant operators).
        """
        # Compare parametric operators by their underlying nonparametric
        # operator class so that, e.g., an interpolated linear operator and
        # a plain linear operator count as the same kind of operation.
        OpClasses = {
            (op._OperatorClass if oputils.is_parametric(op) else type(op))
            for op in ops
        }
        # The set collapses duplicates, so a size mismatch means a repeat.
        if len(OpClasses) != len(ops):
            raise ValueError("duplicate type in list of operators to infer")

    def _get_operator_of_type(self, OpClass):
        """Return the first operator in the model corresponding to the
        operator class ``OpClass``, or ``None`` if there is no match.
        """
        for op in self.operators:
            if oputils.is_parametric(op) and op._OperatorClass is OpClass:
                return op
            if oputils.is_nonparametric(op) and isinstance(op, OpClass):
                return op

    @property
    def operators(self):
        """Operators comprising the terms of the model."""
        return _Model.operators.fget(self)

    @operators.setter
    def operators(self, ops):
        """Set the operators, warning if a different model class seems more
        appropriate for the given operators.
        """
        _Model.operators.fset(self, ops)

        # Check at least one operator is parametric.
        parametric_operators = [
            op for op in self.operators if oputils.is_parametric(op)
        ]
        if len(parametric_operators) == 0:
            warnings.warn(
                "no parametric operators detected, "
                "consider using a nonparametric model class",
                errors.OpInfWarning,
            )

        # Check that not every operator is interpolated.
        if not isinstance(self, _InterpModel):
            interpolated_operators = [
                op for op in self.operators if oputils.is_interpolated(op)
            ]
            if len(interpolated_operators) == len(self.operators):
                warnings.warn(
                    "all operators interpolatory, "
                    "consider using an InterpModel class",
                    errors.OpInfWarning,
                )
        self._synchronize_parameter_dimensions()

    # Properties: dimensions --------------------------------------------------
    @property
    def parameter_dimension(self):
        r"""Dimension :math:`p` of a parameter vector :math:`\bfmu`
        (``None`` if not yet determined).
        """
        return self.__p

    def _synchronize_parameter_dimensions(self, newdim=None):
        """Synchronize the parameter_dimension attribute for each operator
        and for the model itself.

        Parameters
        ----------
        newdim : int or None
            If given, the parameter dimension implied by the training data;
            it must agree with any dimension already set on the operators.
        """
        # Get any non-None parameter dimensions and check for uniqueness.
        ps = {
            op.parameter_dimension
            for op in self.operators
            if oputils.is_parametric(op) and op.parameter_dimension is not None
        }
        if len(ps) > 1:
            raise errors.DimensionalityError(
                "operators not aligned "
                "(parameter_dimension must be the same for all operators)"
            )
        p = ps.pop() if len(ps) == 1 else None

        # Check operator parameter_dimension matches new parameter_dimension.
        if newdim is not None:
            if p is None:
                p = newdim
            if p != newdim:
                raise errors.DimensionalityError(
                    f"{p} = each operator.parameter_dimension != "
                    f"parameter dimension = {newdim}"
                )

        # Ensure all parametric operators have the same parameter_dimension.
        if p is not None:
            for op in self.operators:
                if (
                    oputils.is_parametric(op)
                    and op.parameter_dimension is None
                ):
                    op.parameter_dimension = p

        # Set the model's parameter_dimension to the same as the operators.
        self.__p = p

    # Fitting -----------------------------------------------------------------
    def _process_fit_arguments(self, parameters, states, lhs, inputs):
        """Prepare training data for Operator Inference by extracting
        dimensions, validating data sizes, and modifying the left-hand side
        data if there are any known operators.

        Returns
        -------
        parameters, states, lhs, inputs
            Validated training data; ``lhs`` has the evaluations of any
            known operators subtracted off.
        """
        # Clear non-intrusive operator data.
        self._clear()

        # Process parameters: scalar parameters give p = 1, parameter
        # vectors give p = len(mu).
        if (dim := len(shape := np.shape(parameters))) == 1:
            p = 1
        elif dim == 2:
            p = shape[1]
        else:
            raise errors.DimensionalityError(
                "'parameters' must be a sequence of scalars or 1D arrays"
            )
        self._synchronize_parameter_dimensions(p)
        n_datasets = len(parameters)

        def _check_valid_dimension0(dataset, label):
            """Dimension 0 must be s (number of training parameters)."""
            if (datalen := len(dataset)) != n_datasets:
                raise errors.DimensionalityError(
                    f"len({label}) = {datalen} "
                    f"!= {n_datasets} = len(parameters)"
                )

        def _check_valid_dimension1(dataset, label):
            """Dimension 1 must be r (state dimensions)."""
            for i, subset in enumerate(dataset):
                if (dim := len(subset)) != (r := self.state_dimension):
                    raise errors.DimensionalityError(
                        f"len({label}[{i}]) = {dim} != {r} = r"
                    )

        # Process states, extract model dimension if needed.
        states = [np.atleast_2d(Q) for Q in states]
        if self.state_dimension is None:
            self.state_dimension = states[0].shape[0]
        _check_valid_dimension0(states, "states")
        _check_valid_dimension1(states, "states")

        def _check_valid_dimension2(dataset, label):
            """Dimension 2 must match across datasets (number of snapshots)."""
            for i, subset in enumerate(dataset):
                if (dim := subset.shape[-1]) != (k := states[i].shape[-1]):
                    raise errors.DimensionalityError(
                        f"{label}[{i}].shape[-1] = {dim} "
                        f"!= {k} = states[{i}].shape[-1]"
                    )

        # Process LHS.
        lhs = [np.atleast_2d(L) for L in lhs]
        _check_valid_dimension0(lhs, self._LHS_ARGNAME)
        _check_valid_dimension1(lhs, self._LHS_ARGNAME)
        _check_valid_dimension2(lhs, self._LHS_ARGNAME)

        # Process inputs, extract input dimension if needed.
        self._check_inputargs(inputs, "inputs")
        if self._has_inputs:
            inputs = [np.atleast_2d(U) for U in inputs]
            if not self.input_dimension:
                self.input_dimension = inputs[0].shape[0]
            _check_valid_dimension0(inputs, "inputs")
            for i, subset in enumerate(inputs):
                if (dim := subset.shape[0]) != (m := self.input_dimension):
                    raise errors.DimensionalityError(
                        f"inputs[{i}].shape[0] = {dim} != {m} = m"
                    )
            _check_valid_dimension2(inputs, "inputs")
        elif inputs is None:
            inputs = [None] * n_datasets

        # Subtract known operator evaluations from the LHS so that the
        # regression only has to learn the remaining (unknown) operators.
        for ell in self._indices_of_known_operators:
            op = self.operators[ell]
            _isparametric = oputils.is_parametric(op)
            for i, lhsi in enumerate(lhs):
                _args = [states[i], inputs[i]]
                if _isparametric:
                    # Parametric operators also need the parameter value.
                    _args.insert(0, parameters[i])
                lhs[i] = lhsi - op.apply(*_args)

        return parameters, states, lhs, inputs

    def _assemble_data_matrix(self, parameters, states, inputs):
        """Assemble the data matrix for operator inference by horizontally
        concatenating the (transposed) data block of each operator that is
        to be inferred.
        """
        blocks = []
        for i in self._indices_of_operators_to_infer:
            op = self.operators[i]
            if not oputils.is_parametric(op):
                # Nonparametric operators: stack the blocks of each dataset.
                block = np.hstack(
                    [op.datablock(Q, U) for Q, U in zip(states, inputs)]
                )
            else:
                block = op.datablock(parameters, states, inputs)
            blocks.append(block.T)
        return np.hstack(blocks)

    def _fit_solver(self, parameters, states, lhs, inputs=None):
        """Construct a solver for the operator inference least-squares
        regression."""
        (
            parameters_,
            states_,
            lhs_,
            inputs_,
        ) = self._process_fit_arguments(parameters, states, lhs, inputs)

        # Set training_parameters for interpolatory operators.
        for op in self.operators:
            if oputils.is_interpolated(op):
                op.set_training_parameters(parameters_)

        # Set up non-intrusive learning.
        D = self._assemble_data_matrix(parameters_, states_, inputs_)
        self.solver.fit(D, np.hstack(lhs_))
        # Record the number of training parameter values; needed by
        # operator_dimension() when extracting parametric operators.
        self.__s = len(parameters_)

    def _extract_operators(self, Ohat):
        """Unpack the operator matrix and populate operator entries."""
        index = 0
        for i in self._indices_of_operators_to_infer:
            op = self.operators[i]
            if oputils.is_parametric(op):
                # Parametric operator blocks are sized by the number of
                # training parameter values as well as by r and m.
                endex = index + op.operator_dimension(
                    self.__s, self.state_dimension, self.input_dimension
                )
                op.set_entries(Ohat[:, index:endex], fromblock=True)
            else:
                endex = index + op.operator_dimension(
                    self.state_dimension, self.input_dimension
                )
                op.set_entries(Ohat[:, index:endex])
            index = endex

    def refit(self):
        """Solve the Operator Inference regression using the data from the
        last :meth:`fit()` call, then extract the inferred operators.

        This method is useful for calibrating the model operators with
        different ``solver`` hyperparameters without reprocessing any training
        data. For example, if ``solver`` is an :class:`opinf.lstsq.L2Solver`,
        changing its ``regularizer`` attribute and calling this method solves
        the regression with the new regression value without re-factorizing the
        data matrix.

        Returns
        -------
        self
        """
        if self._fully_intrusive:
            warnings.warn(
                "all operators initialized explicitly, nothing to learn",
                errors.OpInfWarning,
            )
            return self

        # Execute non-intrusive learning.
        self._extract_operators(self.solver.solve())
        return self

    def fit(self, parameters, states, lhs, inputs=None):
        r"""Learn the model operators from data.

        The operators are inferred by solving the regression problem

        .. math::
           \min_{\Ophat}
           \sum_{i=1}^{s}\sum_{j=0}^{k_{i}-1}\left\|
           \Ophat(\qhat_{i,j}, \u_{i,j}; \bfmu_i) - \dot{\qhat}_{i,j}
           \right\|_2^2

        where
        :math:`\zhat = \Ophat(\qhat, \u; \bfmu)` is the model and

        * :math:`\qhat_j\in\RR^r` is a measurement of the state,
        * :math:`\u_j\in\RR^m` is a measurement of the input, and
        * :math:`\zhat_j\in\RR^r` is a measurement of the left-hand side
          of the model.

        The *operator matrix* :math:`\Ohat\in\RR^{r\times d(r,m)}` is such that
        :math:`\Ophat(\qhat,\u) = \Ohat\d(\qhat,\u)` for some data vector
        :math:`\d(\qhat,\u)\in\RR^{d(r,m)}`; the *data matrix*
        :math:`\D\in\RR^{k\times d(r,m)}` is given by
        :math:`[~\d(\qhat_0,\u_0)~~\cdots~~\d(\qhat_{k-1},\u_{k-1})~]\trp`.
        Finally,
        :math:`\Zhat = [~\zhat_0~~\cdots~~\zhat_{k-1}~]\in\RR^{r\times k}`.
        See the :mod:`opinf.operators` module for more explanation.

        The strategy for solving the regression, as well as any additional
        regularization or constraints, are specified by the ``solver`` in
        the constructor.

        Parameters
        ----------
        parameters : list of s (floats or (p,) ndarrays)
            Parameter values for which training data are available.
        states : list of s (r, k) ndarrays
            Snapshot training data. Each array ``states[i]`` is the data
            corresponding to parameter value ``parameters[i]``; each column
            ``states[i][:, j]`` is a single snapshot.
        lhs : list of s (r, k) ndarrays
            Left-hand side training data. Each array ``lhs[i]`` is the data
            corresponding to parameter value ``parameters[i]``; each column
            ``lhs[i][:, j]`` corresponds to the snapshot ``states[i][:, j]``.
            The interpretation of this argument depends on the setting:
            forcing data for steady-state problems, next iteration for
            discrete-time problems, and time derivatives of the state for
            continuous-time problems.
        inputs : list of s (m, k) or (k,) ndarrays, or None
            Input training data. Each array ``inputs[i]`` is the data
            corresponding to parameter value ``parameters[i]``; each column
            ``inputs[i][:, j]`` corresponds to the snapshot
            ``states[i][:, j]``.
            May be a one-dimensional array if :math:`m=1` (scalar input).

        Returns
        -------
        self
        """
        if self._fully_intrusive:
            warnings.warn(
                "all operators initialized explicitly, nothing to learn",
                errors.OpInfWarning,
            )
            return self

        self._fit_solver(parameters, states, lhs, inputs)
        return self.refit()

    # Parametric evaluation ---------------------------------------------------
    def evaluate(self, parameter):
        """Construct a nonparametric model by fixing the parameter value.

        Parameters
        ----------
        parameter : (p,) ndarray or float
            Parameter value at which to evaluate the model.

        Returns
        -------
        model : _NonparametricModel
            Nonparametric model of type ``ModelClass``.
        """
        # Nonparametric operators pass through unchanged.
        return self._ModelClass(
            [
                op.evaluate(parameter) if oputils.is_parametric(op) else op
                for op in self.operators
            ]
        )

    def rhs(self, parameter, *args, **kwargs):
        r"""Evaluate the right-hand side of the model by applying each operator
        and summing the results.

        This is the function :math:`\Ophat(\qhat, \u; \bfmu)`
        where the model can be written as one of the following:

        * :math:`\ddt\qhat(t; \bfmu) = \Ophat(\qhat(t; \bfmu), \u(t); \bfmu)`
          (continuous time)
        * :math:`\qhat_{j+1}(\bfmu) = \Ophat(\qhat_j(\bfmu), \u_j; \bfmu)`
          (discrete time)
        * :math:`\hat{\mathbf{g}} = \Ophat(\qhat(\bfmu), \u; \bfmu)`
          (steady state)

        Parameters
        ----------
        parameter : (p,) ndarray
            Parameter value :math:`\bfmu`.
        args
            Positional arguments to ``ModelClass.rhs()``.
        kwargs
            Keyword arguments to ``ModelClass.rhs()``.

        Returns
        -------
        evaluation : (r,) ndarray
            Evaluation of the right-hand side of the model.

        Notes
        -----
        For repeated ``rhs()`` calls with the same parameter value, use
        :meth:`evaluate` to first get the nonparametric model corresponding
        to the parameter value.
        """
        return self.evaluate(parameter).rhs(*args, **kwargs)

    def jacobian(self, parameter, *args, **kwargs):
        r"""Construct and sum the state Jacobian of each model operator.

        This is the derivative of the right-hand side of the model with
        respect to the state, i.e., the function
        :math:`\ddqhat\Ophat(\qhat, \u; \bfmu)`
        where the model can be written as one of the following:

        * :math:`\ddt\qhat(t; \bfmu) = \Ophat(\qhat(t; \bfmu), \u(t); \bfmu)`
          (continuous time)
        * :math:`\qhat(\bfmu)_{j+1} = \Ophat(\qhat(\bfmu)_{j}, \u_{j}; \bfmu)`
          (discrete time)
        * :math:`\hat{\mathbf{g}} = \Ophat(\qhat(\bfmu), \u; \bfmu)`
          (steady state)

        Parameters
        ----------
        parameter : (p,) ndarray
            Parameter value :math:`\bfmu`.
        args
            Positional arguments to ``ModelClass.jacobian()``.
        kwargs
            Keyword arguments to ``ModelClass.jacobian()``.

        Returns
        -------
        jac : (r, r) ndarray
            State Jacobian of the right-hand side of the model.

        Notes
        -----
        For repeated ``jacobian()`` calls with the same parameter value, use
        :meth:`evaluate` to first get the nonparametric model corresponding
        to the parameter value.
        """
        return self.evaluate(parameter).jacobian(*args, **kwargs)

    def predict(self, parameter, *args, **kwargs):
        r"""Solve the model at the given parameter value.

        Parameters
        ----------
        parameter : (p,) ndarray
            Parameter value :math:`\bfmu`.
        args
            Positional arguments to ``ModelClass.predict()``.
        kwargs
            Keyword arguments to ``ModelClass.predict()``.

        Notes
        -----
        For repeated ``predict()`` calls with the same parameter value, use
        :meth:`evaluate` to first get the nonparametric model corresponding
        to the parameter value.
        """
        return self.evaluate(parameter).predict(*args, **kwargs)


class _ParametricDiscreteMixin:
    """Mixin class for parametric models of discrete dynamical systems."""

    _ModelClass = _FrozenDiscreteModel

    def fit(self, parameters, states, nextstates=None, inputs=None):
        r"""Learn the model operators from data.

        The operators are inferred by solving the regression problem

        .. math::
           \min_{\Ohat^{(1)},\ldots,\Ohat^{(s)}}
           \sum_{i=1}^{s}\sum_{j=0}^{k_{i}-1}\left\|
           \Ophat(\qhat_{i,j}, \u_{i,j}; \bfmu_i) - \zhat_{i,j}
           \right\|_2^2
           = \min_{\Ohat^{(1)},\ldots,\Ohat^{(s)}}
           \sum_{i=1}^{s}\left\|
           \D^{(i)}(\Ohat^{(i)})\trp
           - [~\zhat_{i,0}~~\cdots~~\zhat_{i,k_{i}-1}~]\trp
           \right\|_F^2

        where
        :math:`\zhat(\bfmu)_{j} = \Ophat(\qhat(\bfmu)_{j}, \u_{j}; \bfmu)
        = \Ohat(\bfmu)\d(\qhat(\bfmu)_{j}, \u_{j})` is the model and

        * :math:`\qhat_{i,j}\in\RR^r` is the :math:`j`-th measurement of the
          state corresponding to training parameter value :math:`\bfmu_i`,
        * :math:`\u_{i,j}\in\RR^m` is the :math:`j`-th measurement of the
          input corresponding to training parameter value :math:`\bfmu_i`,
        * :math:`\zhat_{i,j}\in\RR^r` is a measurement of the next state
          iteration corresponding to the state-input pair
          :math:`(\qhat_{i,j},\u_{i,j})`,
        * :math:`\Ohat^{(i)} = \Ohat(\bfmu_i)` is the operator matrix
          evaluated at training parameter value :math:`\bfmu_i`, and
        * :math:`\D^{(i)}` is the data matrix for data corresponding to
          training parameter value :math:`\bfmu_i`, given by
          :math:`[~\d(\qhat_{i,0},\u_{i,0})~~\cdots~~
          \d(\qhat_{i,k_i-1},\u_{i,k_i-1})~]\trp`.

        When all operators in this model are interpolatory, the
        least-squares problem decouples into :math:`s` individual regressions.
        That is, for :math:`i = 1, \ldots, s`, we solve (independently) the
        regressions

        .. math::
           \min_{\Ohat^{(i)}}\left\|
           \D^{(i)}(\Ohat^{(i)})\trp
           - [~\zhat_{i,0}~~\cdots~~\zhat_{i,k_{i}-1}~]\trp
           \right\|_F^2

        and define the full operator matrix via elementwise interpolation,

        .. math::
           \Ohat(\bfmu) = \textrm{interpolate}(
           (\bfmu_1, \Ohat^{(1)}), \ldots, (\bfmu_s, \Ohat^{(s)}); \bfmu).

        The strategy for solving the regression, as well as any additional
        regularization or constraints, are specified by the ``solver`` in
        the constructor.

        Parameters
        ----------
        parameters : list of s scalars or (p,) 1D ndarrays
            Parameter values for which the operator entries are known
            or will be inferred from data.
        states : list of s (r, k_i) ndarrays
            Snapshot training data. Each array ``states[i]`` is the data
            corresponding to parameter value ``parameters[i]``; each column
            ``states[i][:, j]`` is a single snapshot.
        nextstates : list of s (r, k_i) ndarrays
            Next iteration training data. Each array ``nextstates[i]`` is the
            data corresponding to parameter value ``parameters[i]``; each
            column ``nextstates[i][:, j]`` is the iteration following
            ``states[i][:, j]``.
        inputs : list of s (m, k_i) or (k_i,) ndarrays, or None
            Input training data. Each array ``inputs[i]`` is the data
            corresponding to parameter value ``parameters[i]``; each column
            ``inputs[i][:, j]`` corresponds to the snapshot
            ``states[i][:, j]``.
            May be a one-dimensional array if :math:`m=1` (scalar input).

        Returns
        -------
        self
        """
        # If nextstates are not provided, infer them by shifting the states:
        # nextstates[i][:, j] = states[i][:, j + 1].
        if nextstates is None:
            nextstates = [Q[:, 1:] for Q in states]
            states = [Q[:, :-1] for Q in states]
        # Trim inputs to match the (possibly shifted) number of snapshots.
        if inputs is not None:
            inputs = [U[..., : Q.shape[1]] for U, Q in zip(inputs, states)]
        return super().fit(parameters, states, nextstates, inputs)

    def rhs(self, parameter, state, input_=None):
        r"""Evaluate the right-hand side of the model by applying each operator
        and summing the results.

        This is the function :math:`\fhat(\qhat, \u; \bfmu)`
        where the model is given by
        :math:`\qhat(\bfmu)_{j+1} = \fhat(\qhat(\bfmu)_{j}, \u_{j}; \bfmu)`.

        Parameters
        ----------
        parameter : (p,) ndarray
            Parameter value :math:`\bfmu`.
        state : (r,) ndarray
            State vector :math:`\qhat`.
        input_ : (m,) ndarray or None
            Input vector :math:`\u`.

        Returns
        -------
        nextstate : (r,) ndarray
            Evaluation of the right-hand side of the model.

        Notes
        -----
        For repeated ``rhs()`` calls with the same parameter value, use
        :meth:`evaluate` to first get the nonparametric model corresponding
        to the parameter value.

        .. code-block::

           # Instead of this...
           >>> values = [parametric_model.rhs(parameter, q, input_)
           ...           for q in list_of_states]
           # ...it is faster to do this.
           >>> model_at_parameter = parametric_model.evaluate(parameter)
           >>> values = [model_at_parameter.rhs(q, input_)
           ...           for q in list_of_states]
        """
        return super().rhs(parameter, state, input_)

    def jacobian(self, parameter, state, input_=None):
        r"""Sum the state Jacobian of each model operator.

        This is the derivative of the right-hand side of the model with
        respect to the state, i.e., the function
        :math:`\ddqhat\fhat(\qhat, \u; \bfmu)`
        where the model is given by
        :math:`\qhat(\bfmu)_{j+1} = \fhat(\qhat(\bfmu)_{j}, \u_{j}; \bfmu)`.

        Parameters
        ----------
        parameter : (p,) ndarray
            Parameter value :math:`\bfmu`.
        state : (r,) ndarray
            State vector :math:`\qhat`.
        input_ : (m,) ndarray or None
            Input vector :math:`\u`.

        Returns
        -------
        jac : (r, r) ndarray
            State Jacobian of the right-hand side of the model.

        Notes
        -----
        For repeated ``jacobian()`` calls with the same parameter value, use
        :meth:`evaluate` to first get the nonparametric model corresponding
        to the parameter value.

        .. code-block::

           # Instead of this...
           >>> jacs = [parametric_model.jacobian(parameter, q, input_)
           ...         for q in list_of_states]
           # ...it is faster to do this.
           >>> model_at_parameter = parametric_model.evaluate(parameter)
           >>> jacs = [model_at_parameter.jacobian(q, input_)
           ...         for q in list_of_states]
        """
        return super().jacobian(parameter, state, input_)

    def predict(self, parameter, state0, niters, inputs=None):
        r"""Step forward the discrete dynamical system
        ``niters`` steps. Essentially, this amounts to the following.

        .. code-block:: python

           >>> states[:, 0] = state0
           >>> states[:, 1] = model.rhs(parameter, states[:, 0], inputs[:, 0])
           >>> states[:, 2] = model.rhs(parameter, states[:, 1], inputs[:, 1])
           ...                                     # Repeat `niters` times.

        Parameters
        ----------
        parameter : (p,) ndarray
            Parameter value :math:`\bfmu`.
        state0 : (r,) ndarray
            Initial state.
        niters : int
            Number of times to step the system forward.
        inputs : (m, niters-1) ndarray or None
            Inputs for the next ``niters - 1`` time steps.

        Returns
        -------
        states : (r, niters) ndarray
            Solution to the system, including the initial condition ``state0``.

        Notes
        -----
        For repeated ``predict()`` calls with the same parameter value, use
        :meth:`evaluate` to first get the nonparametric model corresponding
        to the parameter value.
        """
        return super().predict(parameter, state0, niters, inputs=inputs)


class _ParametricContinuousMixin:
    """Mixin class for parametric models of system of ODEs.

    Adapts the parametric base class API (parameter-first call signatures)
    to the continuous-time convention where time ``t`` comes first.
    """

    # Nonparametric model class produced by evaluate() at a fixed parameter.
    _ModelClass = _FrozenContinuousModel

    def fit(self, parameters, states, ddts, inputs=None):
        r"""Learn the model operators from data.

        The operators are inferred by solving the regression problem

        .. math::
           \min_{\Ohat^{(1)},\ldots,\Ohat^{(s)}}
           \sum_{i=1}^{s}\sum_{j=0}^{k_{i}-1}\left\|
           \Ophat(\qhat_{i,j}, \u_{i,j}; \bfmu_i) - \dot{\qhat}_{i,j}
           \right\|_2^2
           = \min_{\Ohat^{(1)},\ldots,\Ohat^{(s)}}
           \sum_{i=1}^{s}\left\|
           \D^{(i)}(\Ohat^{(i)})\trp
           - [~\dot{\qhat}_{i,0}~~\cdots~~\dot{\qhat}_{i,k_{i}-1}~]\trp
           \right\|_F^2

        where
        :math:`\ddt\qhat(t; \bfmu) = \Ophat(\qhat(t; \bfmu), \u(t); \bfmu)
        = \Ohat(\bfmu)\d(\qhat(t; \bfmu), \u(t))` is the model and

        * :math:`\qhat_{i,j}\in\RR^r` is the :math:`j`-th measurement of the
          state corresponding to training parameter value :math:`\bfmu_i`,
        * :math:`\u_{i,j}\in\RR^m` is the :math:`j`-th measurement of the
          input corresponding to training parameter value :math:`\bfmu_i`,
        * :math:`\dot{\qhat}_{i,j}\in\RR^r` is a measurement of the time
          derivative of the state :math:`\qhat_{i,j}`, i.e.,
          :math:`\dot{\qhat}_{i,j} = \ddt\qhat(t; \bfmu_i)\big|_{t=t_{i,j}}`
          where :math:`\qhat_{i,j} = \qhat(t_{i,j};\bfmu_i)`,
        * :math:`\Ohat^{(i)} = \Ohat(\bfmu_i)` is the operator matrix
          evaluated at training parameter value :math:`\bfmu_i`, and
        * :math:`\D^{(i)}` is the data matrix for data corresponding to
          training parameter value :math:`\bfmu_i`, given by
          :math:`[~\d(\qhat_{i,0},\u_{i,0})~~\cdots~~
          \d(\qhat_{i,k_i-1},\u_{i,k_i-1})~]\trp`.

        Because all operators in this model are interpolatory, the
        least-squares problem decouples into :math:`s` individual regressions.
        That is, for :math:`i = 1, \ldots, s`, we solve (independently) the
        regressions

        .. math::
           \min_{\Ohat^{(i)}}\left\|
           \D^{(i)}(\Ohat^{(i)})\trp
           - [~\dot{\qhat}_{i,0}~~\cdots~~\dot{\qhat}_{i,k_{i}-1}~]\trp
           \right\|_F^2

        and define the full operator matrix via elementwise interpolation,

        .. math::
           \Ohat(\bfmu) = \textrm{interpolate}(
           (\bfmu_1, \Ohat^{(1)}), \ldots, (\bfmu_s, \Ohat^{(s)}); \bfmu).

        The strategy for solving the regression, as well as any additional
        regularization or constraints, are specified by the ``solver`` in
        the constructor.

        Parameters
        ----------
        parameters : list of s scalars or (p,) 1D ndarrays
            Parameter values for which the operator entries are known
            or will be inferred from data.
        states : list of s (r, k_i) ndarrays
            Snapshot training data. Each array ``states[i]`` is the data
            corresponding to parameter value ``parameters[i]``; each column
            ``states[i][:, j]`` is a single snapshot.
        ddts : list of s (r, k_i) ndarrays
            Snapshot time derivative data. Each array ``ddts[i]`` is the data
            corresponding to parameter value ``parameters[i]``; each column
            ``ddts[i][:, j]`` corresponds to the snapshot ``states[i][:, j]``.
        inputs : list of s (m, k_i) or (k_i,) ndarrays, or None
            Input training data. Each array ``inputs[i]`` is the data
            corresponding to parameter value ``parameters[i]``; each column
            ``inputs[i][:, j]`` corresponds to the snapshot
            ``states[i][:, j]``.
            May be a two-dimensional array if :math:`m=1` (scalar input).
            Only required if one or more model operators depend on inputs.

        Returns
        -------
        self
        """
        return super().fit(parameters, states, ddts, inputs=inputs)

    def rhs(self, t, parameter, state, input_func=None):
        r"""Evaluate the right-hand side of the model by applying each operator
        and summing the results.

        This is the right-hand side of the model, i.e., the function
        :math:`\fhat(\qhat(t), \u(t); \bfmu)` where the model is given by
        :math:`\ddt \qhat(t; \bfmu) = \fhat(\qhat(t), \u(t); \bfmu)`.

        Parameters
        ----------
        t : float
            Time :math:`t`, a scalar.
        parameter : (p,) ndarray
            Parameter value :math:`\bfmu`.
        state : (r,) ndarray
            State vector :math:`\qhat(t)` corresponding to time ``t``.
        input_func : callable(float) -> (m,), or None
            Input function that maps time ``t`` to the input vector
            :math:`\u(t)`.

        Returns
        -------
        dqdt : (r,) ndarray
            Evaluation of the right-hand side of the model.

        Notes
        -----
        For repeated ``rhs()`` calls with the same parameter value, use
        :meth:`evaluate` to first get the nonparametric model corresponding
        to the parameter value.

        .. code-block::

           # Instead of this...
           >>> values = [parametric_model.rhs(t, parameter, q, input_func)
           ...           for t, q in zip(times, states)]
           # ...it is faster to do this.
           >>> model_at_parameter = parametric_model.evaluate(parameter)
           >>> values = [model_at_parameter.rhs(t, q, input_func)
           ...           for t, q in zip(times, states)]
        """
        # The parametric base class expects the parameter first; the remaining
        # arguments are forwarded to the evaluated nonparametric model.
        return super().rhs(parameter, t, state, input_func)

    def jacobian(self, t, parameter, state, input_func=None):
        r"""Sum the state Jacobian of each model operator.

        This is the derivative of the right-hand side of the model with
        respect to the state, i.e., the function
        :math:`\ddqhat\fhat(\qhat(t), \u(t); \bfmu)` where the model is given
        by :math:`\ddt\qhat(t; \bfmu) = \fhat(\qhat(t), \u(t); \bfmu)`.

        Parameters
        ----------
        t : float
            Time :math:`t`, a scalar.
        parameter : (p,) ndarray
            Parameter value :math:`\bfmu`.
        state : (r,) ndarray
            State vector :math:`\qhat(t)` corresponding to time ``t``.
        input_func : callable(float) -> (m,), or None
            Input function that maps time ``t`` to the input vector
            :math:`\u(t)`.

        Returns
        -------
        jac : (r, r) ndarray
            State Jacobian of the right-hand side of the model.

        Notes
        -----
        For repeated ``jacobian()`` calls with the same parameter value, use
        :meth:`evaluate` to first get the nonparametric model corresponding
        to the parameter value.

        .. code-block::

           # Instead of this...
           >>> jacs = [parametric_model.jacobian(t, parameter, q, input_func)
           ...         for t, q in zip(times, states)]
           # ...it is faster to do this.
           >>> model_at_parameter = parametric_model.evaluate(parameter)
           >>> jacs = [model_at_parameter.jacobian(t, q, input_func)
           ...         for t, q in zip(times, states)]
        """
        # Parameter-first reordering, as in rhs().
        return super().jacobian(parameter, t, state, input_func)

    def predict(self, parameter, state0, t, input_func=None, **options):
        r"""Solve the system of ordinary differential equations.
        This method wraps :func:`scipy.integrate.solve_ivp()`.

        Parameters
        ----------
        parameter : (p,) ndarray
            Parameter value :math:`\bfmu`.
        state0 : (r,) ndarray
            Initial state vector.
        t : (nt,) ndarray
            Time domain over which to integrate the model.
        input_func : callable or (m, nt) ndarray
            Input as a function of time (preferred) or the input values at the
            times ``t``. If given as an array, cubic spline interpolation on
            the known data points is used as needed.
        options
            Arguments for :func:`scipy.integrate.solve_ivp()`.
            Common options:

            * **method : str** ODE solver for the model.

              * ``'RK45'`` (default): Explicit Runge--Kutta method
                of order 5(4).
              * ``'RK23'``: Explicit Runge--Kutta method
                of order 3(2).
              * ``'Radau'``: Implicit Runge--Kutta method
                of the Radau IIA family of order 5.
              * ``'BDF'``: Implicit multi-step variable-order (1 to 5) method
                based on a backward differentiation formula for the derivative.
              * ``'LSODA'``: Adams/BDF method with automatic stiffness
                detection and switching.

            * **max_step : float** Maximum allowed integration step size.

        Returns
        -------
        states : (r, nt) ndarray
            Computed solution to the system over the time domain ``t``.

        Notes
        -----
        For repeated ``predict()`` calls with the same parameter value, use
        :meth:`evaluate` to first get the nonparametric model corresponding
        to the parameter value.
        """
        return super().predict(
            parameter, state0, t, input_func=input_func, **options
        )


# Public classes ==============================================================
class ParametricDiscreteModel(_ParametricDiscreteMixin, _ParametricModel):
    r"""Parametric discrete dynamical system model
    :math:`\qhat(\bfmu)_{j+1} = \Ophat(\qhat(\bfmu)_{j}, \u_{j}; \bfmu)`.

    Here,

    * :math:`\qhat(\bfmu)_j\in\RR^{r}` is the :math:`j`-th iteration of the
      model state,
    * :math:`\u_j\in\RR^{m}` is the (optional) corresponding input, and
    * :math:`\bfmu\in\RR^{p}` is the parameter vector.

    The structure of :math:`\Ophat` is specified through the ``operators``
    attribute.

    Parameters
    ----------
    operators : list of :mod:`opinf.operators` objects
        Operators comprising the terms of the model.
    """

    # All behavior is inherited from the mixin and the parametric base class.
    pass
class ParametricContinuousModel(_ParametricContinuousMixin, _ParametricModel):
    r"""Parametric system of ordinary differential equations
    :math:`\ddt\qhat(t; \bfmu) = \fhat(\qhat(t; \bfmu), \u(t); \bfmu)`.

    Here,

    * :math:`\qhat(t;\bfmu)\in\RR^{r}` is the model state,
    * :math:`\u(t)\in\RR^{m}` is the (optional) input, and
    * :math:`\bfmu\in\RR^{p}` is the parameter vector.

    The structure of :math:`\fhat` is specified through the ``operators``
    argument.

    Parameters
    ----------
    operators : list of :mod:`opinf.operators` objects
        Operators comprising the terms of the model.
    """

    # All behavior is inherited from the mixin and the parametric base class.
    pass
# Special case: completely interpolation-based models =========================
class _InterpModel(_ParametricModel):
    """Base class for parametric monolithic models where all operators MUST be
    interpolation-based parametric operators.

    In this special case, the inference problems completely decouple by
    training parameter.

    Parameters
    ----------
    operators : list of :mod:`opinf.operators` objects
        Operators comprising the terms of the model.
    solver : :mod:`opinf.lstsq` object or float > 0 or None
        Solver for the least-squares regression. Defaults:

        * ``None``: :class:`opinf.lstsq.PlainSolver`, SVD-based solve
          without regularization.
        * float > 0: :class:`opinf.lstsq.L2Solver`, SVD-based solve with
          scalar Tikhonov regularization.
    InterpolatorClass : type or None
        Class for the elementwise interpolation. Must obey the syntax

        .. code-block:: python

           >>> interpolator = InterpolatorClass(data_points, data_values)
           >>> interpolator_evaluation = interpolator(new_data_point)

        This can be, e.g., a class from :mod:`scipy.interpolate`.
        If ``None`` (default), use :class:`scipy.interpolate.CubicSpline`
        for one-dimensional parameters and
        :class:`scipy.interpolate.LinearNDInterpolator` otherwise.
    """

    @property
    def _ModelFitClass(self):
        """Parent of ModelClass that has a callable ``fit()`` method."""
        return self._ModelClass.__bases__[-1]

    def __init__(self, operators, solver=None, InterpolatorClass=None):
        """Define the model structure and set the interpolator class."""
        _ParametricModel.__init__(self, operators, solver=solver)
        self.set_interpolator(InterpolatorClass)
        # Per-parameter nonparametric models, populated by _fit_solver().
        self._submodels = None
        self._training_parameters = None

    @classmethod
    def _from_models(cls, parameters, models, InterpolatorClass: type = None):
        """Interpolate a collection of non-parametric models.

        Parameters
        ----------
        parameters : list of s scalars or (p,) 1D ndarrays
            Parameter values for which the operator entries are known.
        models : list of s :mod:`opinf.models` objects
            Nonparametric models with fully populated operator entries.
        InterpolatorClass : type or None
            Class for the elementwise interpolation. Must obey the syntax

               >>> interpolator = InterpolatorClass(data_points, data_values)
               >>> interpolator_evaluation = interpolator(new_data_point)

            This can be, e.g., a class from :mod:`scipy.interpolate`.
            If ``None`` (default), use :class:`scipy.interpolate.CubicSpline`
            for one-dimensional parameters and
            :class:`scipy.interpolate.LinearNDInterpolator` otherwise.
        """
        # Check for consistency in the model operators.
        opclasses = [type(op) for op in models[0].operators]
        for mdl in models:
            # Operator count and type.
            if len(mdl.operators) != len(opclasses):
                raise ValueError(
                    "models not aligned (inconsistent number of operators)"
                )
            for ell, op in enumerate(mdl.operators):
                if not isinstance(op, opclasses[ell]):
                    raise ValueError(
                        "models not aligned (inconsistent operator types)"
                    )
            # Entries are set.
            mdl._check_is_trained()

        # Extract the operators from the individual models and wrap each
        # nonparametric operator class in its interpolated counterpart.
        return cls(
            operators=[
                oputils.nonparametric_to_interpolated(OpClass)._from_operators(
                    training_parameters=parameters,
                    operators=[mdl.operators[ell] for mdl in models],
                    InterpolatorClass=InterpolatorClass,
                )
                for ell, OpClass in enumerate(opclasses)
            ],
            InterpolatorClass=InterpolatorClass,
        )

    def set_interpolator(self, InterpolatorClass):
        """Set the interpolator for the operator entries.

        Parameters
        ----------
        InterpolatorClass : type
            Class for the elementwise interpolation. Must obey the syntax

               >>> interpolator = InterpolatorClass(data_points, data_values)
               >>> interpolator_evaluation = interpolator(new_data_point)

            This can be, e.g., a class from :mod:`scipy.interpolate`.
            If ``None`` (default), use :class:`scipy.interpolate.CubicSpline`
            for one-dimensional parameters and
            :class:`scipy.interpolate.LinearNDInterpolator` otherwise.
        """
        # If None, leave each operator's default interpolator in place.
        if InterpolatorClass is not None:
            for op in self.operators:
                op.set_interpolator(InterpolatorClass)
        # Name-mangled attribute, also used by save() and copy().
        self.__InterpolatorClass = InterpolatorClass

    # Properties: operators ---------------------------------------------------
    # Short names accepted in place of operator objects in the constructor.
    _operator_abbreviations = {
        "c": InterpConstantOperator,
        "A": InterpLinearOperator,
        "H": InterpQuadraticOperator,
        "G": InterpCubicOperator,
        "B": InterpInputOperator,
        "N": InterpStateInputOperator,
    }

    def _isvalidoperator(self, op):
        """Only interpolated parametric operators are allowed."""
        return oputils.is_interpolated(op)

    # Fitting -----------------------------------------------------------------
    def _assemble_data_matrix(self, *args, **kwargs):  # pragma: no cover
        """Assemble the data matrix for operator inference."""
        # Not used: fitting is delegated to per-parameter submodels.
        raise NotImplementedError(
            "_assemble_data_matrix() not used by this class"
        )

    def _extract_operators(self, *args, **kwargs):  # pragma: no cover
        """Unpack the operator matrix and populate operator entries."""
        # Not used: fitting is delegated to per-parameter submodels.
        raise NotImplementedError(
            "_extract_operators() not used by this class"
        )

    def _fit_solver(self, parameters, states, lhs, inputs=None):
        """Construct a solver for the operator inference least-squares
        regression.

        One nonparametric submodel (with its own solver copy) is created for
        each training parameter value; refit() later gathers their results.
        """
        (
            parameters_,
            states_,
            lhs_,
            inputs_,
        ) = self._process_fit_arguments(parameters, states, lhs, inputs)
        # NOTE(review): counts the raw `parameters`, not the processed
        # `parameters_` — presumably the same length; confirm.
        n_datasets = len(parameters)

        # Distribute training data to individual OpInf problems.
        nonparametric_models = []
        for i in range(n_datasets):
            model_i = self._ModelFitClass(
                operators=[
                    op._OperatorClass(
                        op.entries[i] if op.entries is not None else None
                    )
                    for op in self.operators
                ],
                solver=self.solver.copy(),
            )
            model_i._fit_solver(
                states_[i],
                lhs_[i],
                inputs_[i],
            )
            nonparametric_models.append(model_i)

        self.solvers = [mdl.solver for mdl in nonparametric_models]
        self._submodels = nonparametric_models
        self._training_parameters = parameters_

    def refit(self):
        """Evaluate the least-squares solver and process the results."""
        if self._submodels is None:
            raise RuntimeError("model solvers not set, call fit() first")

        # Solve each independent subproblem.
        # TODO: parallelize?
        for submodel in self._submodels:
            submodel.refit()

        # Interpolate the resulting operators.
        for ell, op in enumerate(self.operators):
            op._clear()
            op.set_training_parameters(self._training_parameters)
            op.set_entries(
                [mdl.operators[ell].entries for mdl in self._submodels]
            )

        return self

    # Model persistence -------------------------------------------------------
    def save(self, savefile: str, overwrite: bool = False) -> None:
        """Serialize the model, saving it in HDF5 format.

        The model can be recovered with the :meth:`load()` class method.

        Parameters
        ----------
        savefile : str
            File to save to, with extension ``.h5`` (HDF5).
        overwrite : bool
            If ``True`` and the specified ``savefile`` already exists,
            overwrite the file. If ``False`` (default) and the specified
            ``savefile`` already exists, raise an error.
        """
        with utils.hdf5_savehandle(savefile, overwrite=overwrite) as hf:
            # Metadata.
            meta = hf.create_dataset("meta", shape=(0,))
            meta.attrs["num_operators"] = len(self.operators)
            meta.attrs["r"] = (
                int(self.state_dimension) if self.state_dimension else 0
            )
            meta.attrs["m"] = (
                int(self.input_dimension) if self.input_dimension else 0
            )

            # Interpolator class. Only the class *name* is stored, so only
            # scipy.interpolate classes can be recovered without help.
            suppress_warnings = False
            InterpolatorClassName = (
                "NoneType"
                if self.__InterpolatorClass is None
                else self.__InterpolatorClass.__name__
            )
            meta.attrs["InterpolatorClass"] = InterpolatorClassName
            if InterpolatorClassName != "NoneType" and not hasattr(
                spinterpolate, InterpolatorClassName
            ):
                warnings.warn(
                    "cannot serialize InterpolatorClass "
                    f"'{InterpolatorClassName}', must pass in the class "
                    "when calling load()",
                    errors.OpInfWarning,
                )
                suppress_warnings = True

            # Operator data.
            hf.create_dataset(
                "indices_infer", data=self._indices_of_operators_to_infer
            )
            hf.create_dataset(
                "indices_known", data=self._indices_of_known_operators
            )
            # Avoid repeating the warning once per operator.
            with warnings.catch_warnings():
                if suppress_warnings:
                    warnings.simplefilter("ignore", errors.OpInfWarning)
                for i, op in enumerate(self.operators):
                    op.save(hf.create_group(f"operator_{i}"))

    @classmethod
    def load(cls, loadfile: str, InterpolatorClass: type = None):
        """Load a serialized model from an HDF5 file, created previously from
        the :meth:`save()` method.

        Parameters
        ----------
        loadfile : str
            Path to the file where the model was stored via :meth:`save()`.
        InterpolatorClass : type
            Class for the elementwise interpolation. Must obey the syntax

               >>> interpolator = InterpolatorClass(data_points, data_values)
               >>> interpolator_evaluation = interpolator(new_data_point)

            Not required if the saved model utilizes a class from
            :mod:`scipy.interpolate`.

        Returns
        -------
        model : _InterpModel
            Initialized model object.
        """
        with utils.hdf5_loadhandle(loadfile) as hf:
            # Load metadata.
            num_operators = int(hf["meta"].attrs["num_operators"])
            indices_infer = [int(i) for i in hf["indices_infer"][:]]
            indices_known = [int(i) for i in hf["indices_known"][:]]

            # Get the InterpolatorClass.
            SavedClassName = hf["meta"].attrs["InterpolatorClass"]
            if InterpolatorClass is None:
                # Load from scipy.interpolate.
                if hasattr(spinterpolate, SavedClassName):
                    InterpolatorClass = getattr(spinterpolate, SavedClassName)
                elif SavedClassName != "NoneType":
                    raise ValueError(
                        f"unknown InterpolatorClass '{SavedClassName}', "
                        f"call load({loadfile}, {SavedClassName})"
                    )
            else:
                # Warn the user if the InterpolatorClass does not match.
                if SavedClassName != (
                    InterpolatorClassName := InterpolatorClass.__name__
                ):
                    warnings.warn(
                        f"InterpolatorClass={InterpolatorClassName} does not "
                        f"match loadfile InterpolatorClass '{SavedClassName}'",
                        errors.OpInfWarning,
                    )

            # Load operators.
            ops = []
            for i in range(num_operators):
                gp = hf[f"operator_{i}"]
                OpClassName = gp["meta"].attrs["class"]
                ops.append(
                    _operator_name2class[OpClassName].load(
                        gp, InterpolatorClass
                    )
                )

            # Construct the model.
            model = cls(ops)
            model._indices_of_operators_to_infer = indices_infer
            model._indices_of_known_operators = indices_known
            if r := int(hf["meta"].attrs["r"]):
                model.state_dimension = r
            if (m := int(hf["meta"].attrs["m"])) and model._has_inputs:
                model.input_dimension = m

        return model

    def copy(self):
        """Make a copy of the model."""
        return self.__class__(
            operators=[op.copy() for op in self.operators],
            InterpolatorClass=self.__InterpolatorClass,
        )
class InterpDiscreteModel(_ParametricDiscreteMixin, _InterpModel):
    r"""Parametric discrete dynamical system model
    :math:`\qhat(\bfmu)_{j+1} = \fhat(\qhat(\bfmu)_{j}, \u_{j}; \bfmu)`
    where the parametric dependence is handled by elementwise interpolation.

    Here,

    * :math:`\qhat_j\in\RR^{r}` is the :math:`j`-th iteration of the model
      state,
    * :math:`\u_j\in\RR^{m}` is the (optional) corresponding input, and
    * :math:`\bfmu\in\RR^{p}` is the parameter vector.

    The structure of :math:`\fhat` is specified through the ``operators``
    attribute.

    Parameters
    ----------
    operators : list of :mod:`opinf.operators` objects
        Operators comprising the terms of the model. For this class, these
        must be interpolated parametric operators.
    InterpolatorClass : type or None
        Class for the elementwise interpolation. Must obey the syntax

           >>> interpolator = InterpolatorClass(data_points, data_values)
           >>> interpolator_evaluation = interpolator(new_data_point)

        This can be, e.g., a class from :mod:`scipy.interpolate`.
        If ``None`` (default), use :class:`scipy.interpolate.CubicSpline`
        for one-dimensional parameters and
        :class:`scipy.interpolate.LinearNDInterpolator` otherwise.
    """

    # All behavior is inherited from the mixin and the interpolation base.
    pass
class InterpContinuousModel(
    _ParametricContinuousMixin,
    _InterpModel,
):
    r"""Parametric system of ordinary differential equations
    :math:`\ddt\qhat(t; \bfmu) = \fhat(\qhat(t; \bfmu), \u(t); \bfmu)`
    where the parametric dependence is handled by elementwise interpolation.

    Here,

    * :math:`\qhat(t;\bfmu)\in\RR^{r}` is the model state,
    * :math:`\u(t)\in\RR^{m}` is the (optional) input, and
    * :math:`\bfmu\in\RR^{p}` is the parameter vector.

    The structure of :math:`\fhat` is specified through the ``operators``
    argument.

    Parameters
    ----------
    operators : list of :mod:`opinf.operators` objects
        Operators comprising the terms of the model. For this class, these
        must be interpolated parametric operators.
    InterpolatorClass : type or None
        Class for the elementwise interpolation. Must obey the syntax

           >>> interpolator = InterpolatorClass(data_points, data_values)
           >>> interpolator_evaluation = interpolator(new_data_point)

        This can be, e.g., a class from :mod:`scipy.interpolate`.
        If ``None`` (default), use :class:`scipy.interpolate.CubicSpline`
        for one-dimensional parameters and
        :class:`scipy.interpolate.LinearNDInterpolator` otherwise.
    """

    # All behavior is inherited from the mixin and the interpolation base.
    pass
# Deprecations ================================================================
class InterpolatedDiscreteModel(InterpDiscreteModel):
    """Deprecated name; use :class:`InterpDiscreteModel` instead."""

    def __init__(self, operators, solver=None, InterpolatorClass=None):
        # Emit the deprecation notice, then defer to the renamed class.
        message = (
            "InterpolatedDiscreteModel has been renamed "
            "and will be removed in an upcoming release, use "
            "InterpDiscreteModel"
        )
        warnings.warn(message, DeprecationWarning)
        InterpDiscreteModel.__init__(
            self,
            operators=operators,
            solver=solver,
            InterpolatorClass=InterpolatorClass,
        )


class InterpolatedContinuousModel(InterpContinuousModel):
    """Deprecated name; use :class:`InterpContinuousModel` instead."""

    def __init__(self, operators, solver=None, InterpolatorClass=None):
        # Emit the deprecation notice, then defer to the renamed class.
        message = (
            "InterpolatedContinuousModel has been renamed "
            "and will be removed in an upcoming release, use "
            "InterpContinuousModel"
        )
        warnings.warn(message, DeprecationWarning)
        InterpContinuousModel.__init__(
            self,
            operators=operators,
            solver=solver,
            InterpolatorClass=InterpolatorClass,
        )