Source code for opinf.models.mono._nonparametric

# models/mono/_nonparametric.py
"""Nonparametric monolithic dynamical systems models."""

__all__ = [
    # "SteadyModel",
    "DiscreteModel",
    "ContinuousModel",
]

import abc
import warnings
import numpy as np
import scipy.integrate as spintegrate
import scipy.interpolate as spinterpolate

from ._base import _Model
from ... import errors, utils
from ...operators import (
    ConstantOperator,
    LinearOperator,
    QuadraticOperator,
    CubicOperator,
    InputOperator,
    StateInputOperator,
    _utils as oputils,
)


_operator_name2class = {
    OpClass.__name__: OpClass
    for OpClass in (
        ConstantOperator,
        LinearOperator,
        QuadraticOperator,
        CubicOperator,
        InputOperator,
        StateInputOperator,
    )
}


# Base class ==================================================================
class _NonparametricModel(_Model):
    """Base class for nonparametric monolithic models.

    Parent class: :class:`opinf.models.mono._base._Model`

    Child classes:

    * :class:`opinf.models.DiscreteModel`
    * :class:`opinf.models.ContinuousModel`
    """

    _LHS_ARGNAME = "lhs"  # Name of LHS argument in fit(), e.g., "ddts".
    _LHS_LABEL = None  # String representation of LHS, e.g., "dq / dt".
    _STATE_LABEL = None  # String representation of state, e.g., "q(t)".
    _INPUT_LABEL = None  # String representation of input, e.g., "u(t)".

    # Properties: operators ---------------------------------------------------
    _operator_abbreviations = {
        "c": ConstantOperator,
        "A": LinearOperator,
        "H": QuadraticOperator,
        "G": CubicOperator,
        "B": InputOperator,
        "N": StateInputOperator,
    }

    @staticmethod
    def _isvalidoperator(op):
        """Return True if and only if ``op`` is a valid operator object
        for this class of model.
        """
        return oputils.is_nonparametric(op)

    @staticmethod
    def _check_operator_types_unique(ops):
        """Raise a ValueError if any two operators represent the same kind
        of operation (e.g., two constant operators).
        """
        if len({type(op) for op in ops}) != len(ops):
            raise ValueError("duplicate type in list of operators to infer")

    def _get_operator_of_type(self, OpClass):
        """Return the first operator of type ``OpClass``."""
        for op in self.operators:
            if isinstance(op, OpClass):
                return op

    # String representation ---------------------------------------------------
    def __str__(self):
        """String representation: structure of the model, dimensions, etc."""
        terms = [
            op._str(self._STATE_LABEL, self._INPUT_LABEL)
            for op in self.operators
        ]

        out = [
            self.__class__.__name__,
            f"structure: {self._LHS_LABEL} = " + " + ".join(terms),
            f"state_dimension: {self.state_dimension}",
            f"input_dimension: {self.input_dimension}",
            "operators:",
        ]
        for op in self.operators:
            out.append("  " + "\n    ".join(str(op).split("\n")))
        out.append("solver: " + "\n  ".join(str(self.solver).split("\n")))

        return "\n  ".join(out)

    def __repr__(self):
        """Unique ID + string representation."""
        return utils.str2repr(self)

    # Properties: operator inference ------------------------------------------
    @property
    def operator_matrix(self):
        r""":math:`r \times d(r, m)` operator matrix, e.g.,
        :math:`\Ohat = [~\chat~~\Ahat~~\Hhat~~\Bhat~]`.

        This matrix **does not** includes the entries of any operators whose
        entries are known *a priori*.
        """
        self._check_is_trained()
        return np.column_stack(
            [
                self.operators[i].entries
                for i in self._indices_of_operators_to_infer
            ]
        )

    # Fitting -----------------------------------------------------------------
    def _process_fit_arguments(self, states, lhs, inputs):
        """Prepare training data for Operator Inference by extracting
        dimensions, validating data sizes, and modifying the left-hand side
        data if there are any known operators.
        """
        # Clear non-intrusive operator data.
        self._clear()

        def _check_valid_dimension0(dataset, label):
            """Dimension 0 must be r (state dimensions)."""
            if (dim := dataset.shape[0]) != self.state_dimension:
                raise errors.DimensionalityError(
                    f"{label}.shape[0] = {dim} != r = {self.state_dimension}"
                )

        # Process states, extract model dimension if needed.
        states = np.atleast_2d(states)
        if self.state_dimension is None:
            self.state_dimension = states.shape[0]
        _check_valid_dimension0(states, "states")

        def _check_valid_dimension1(dataset, label):
            """Dimension 1 must be k (number of snapshots)."""
            if (dim := dataset.shape[1]) != (k := states.shape[1]):
                raise errors.DimensionalityError(
                    f"{label}.shape[-1] = {dim} != {k} = states.shape[-1]"
                )

        # Process LHS.
        lhs = np.atleast_2d(lhs)
        _check_valid_dimension0(lhs, self._LHS_ARGNAME)
        _check_valid_dimension1(lhs, self._LHS_ARGNAME)

        # Process inputs, extract input dimension if needed.
        self._check_inputargs(inputs, "inputs")
        if self._has_inputs:
            inputs = np.atleast_2d(inputs)
            if not self.input_dimension:
                self.input_dimension = inputs.shape[0]
            if inputs.shape[0] != self.input_dimension:
                raise errors.DimensionalityError(
                    f"inputs.shape[0] = {inputs.shape[0]} "
                    f"!= {self.input_dimension} = m"
                )
            _check_valid_dimension1(inputs, "inputs")

        # Subtract known operator evaluations from the LHS.
        for i in self._indices_of_known_operators:
            lhs = lhs - self.operators[i].apply(states, inputs)

        return states, lhs, inputs

    def _assemble_data_matrix(self, states, inputs):
        r"""Construct the Operator Inference data matrix :math:`\D`
        from state snapshots and/or input data.

        For example, if the model has the structure
        .. math::
            \ddt\qhat(t)
            = \chat + \Ahat\qhat(t)
            + \Hhat[\qhat(t)\otimes\qhat(t)] + \Bhat\u(t),

        then the data matrix is
        :math:`\D = [~
        \mathbf{1}~~
        \widehat{\Q}\trp~~
        (\widehat{\Q}\odot\widehat{\Q})\trp~~
        \U\trp~]`,

        where :math:`\widehat{\Q}` is ``states``
        and :math:`\U` is `inputs`.

        Parameters
        ----------
        states : (r, k) ndarray
            Snapshot training data.
        inputs : (m, k) ndarray or None
            Inputs corresponding to the snapshots.

        Returns
        -------
        D : (k, d(r, m)) ndarray
            Operator Inference data matrix.
        """
        return np.hstack(
            [
                self.operators[i].datablock(states, inputs).T
                for i in self._indices_of_operators_to_infer
            ]
        )

    def _extract_operators(self, Ohat):
        """Extract and save the inferred operators from the solution to the
        Operator Inference regression problem.

        Parameters
        ----------
        Ohat : (r, d(r, m)) ndarray
            Matrix of operator entries, concatenated horizontally.
        """
        index = 0
        for i in self._indices_of_operators_to_infer:
            endex = index + self.operators[i].operator_dimension(
                self.state_dimension, self.input_dimension
            )
            self.operators[i].set_entries(Ohat[:, index:endex])
            index = endex

    def _fit_solver(self, states, lhs, inputs=None):
        """Construct a solver object mapping the regularizer to solutions
        of the Operator Inference least-squares problem.
        """
        # Set up non-intrusive learning.
        states_, lhs_, inputs_ = self._process_fit_arguments(
            states, lhs, inputs
        )
        D = self._assemble_data_matrix(states_, inputs_)
        self.solver.fit(D, lhs_)

    def refit(self):
        """Solve the Operator Inference regression using the data from the
        last :meth:`fit()` call, then extract the inferred operators.

        This method is useful for calibrating the model operators with
        different ``solver`` hyperparameters without reprocessing any training
        data. For example, if ``solver`` is an :class:`opinf.lstsq.L2Solver`,
        changing its ``regularizer`` attribute and calling this method solves
        the regression with the new regression value without re-factorizing the
        data matrix.
        """
        # Fully intrusive case (nothing to learn).
        if self._fully_intrusive:
            warnings.warn(
                "all operators initialized explicitly, nothing to learn",
                errors.OpInfWarning,
            )
            return self

        # Execute non-intrusive learning.
        self._extract_operators(self.solver.solve())

    def fit(self, states, lhs, inputs=None):
        r"""Learn the model operators from data.

        The operators are inferred by solving the regression problem

        .. math::
           \min_{\Ohat}\sum_{j=0}^{k-1}\left\|
           \fhat(\qhat_j, \u_j) - \zhat_j
           \right\|_2^2
           = \min_{\Ohat}\left\|\D\Ohat\trp - \dot{\Qhat}\trp\right\|_F^2

        where
        :math:`\zhat = \fhat(\qhat, \u)` is the model and

        * :math:`\qhat_j\in\RR^r` is a measurement of the state,
        * :math:`\u_j\in\RR^m` is a measurement of the input, and
        * :math:`\zhat_j\in\RR^r` is a measurement of the left-hand side
          of the model.

        The *operator matrix* :math:`\Ohat\in\RR^{r\times d(r,m)}` is such that
        :math:`\fhat(\q,\u) = \Ohat\d(\qhat,\u)` for some data vector
        :math:`\d(\qhat,\u)\in\RR^{d(r,m)}`; the *data matrix*
        :math:`\D\in\RR^{k\times d(r,m)}` is given by
        :math:`[~\d(\qhat_0,\u_0)~~\cdots~~\d(\qhat_{k-1},\u_{k-1})~]\trp`.
        Finally,
        :math:`\Zhat = [~\zhat_0~~\cdots~~\zhat_{k-1}~]\in\RR^{r\times k}`.
        See the :mod:`opinf.operators` module for more explanation.

        The strategy for solving the regression, as well as any additional
        regularization or constraints, are specified by the ``solver`` set
        in the constructor.

        Parameters
        ----------
        states : (r, k) ndarray
            Snapshot training data. Each column is a single snapshot.
        lhs : (r, k) ndarray
            Left-hand side training data. Each column ``lhs[:, j]``
            corresponds to the snapshot ``states[:, j]``.
            The interpretation of this argument depends on the setting:
            forcing data for steady-state problems, next iteration for
            discrete-time problems, and time derivatives of the state for
            continuous-time problems.
        inputs : (m, k) or (k,) ndarray or None
            Input training data. Each column ``inputs[:, j]`` corresponds
            to the snapshot ``states[:, j]``.
            May be a one-dimensional array if ``m=1`` (scalar input).

        Returns
        -------
        self
        """
        # Fully intrusive case (nothing to learn).
        if self._fully_intrusive:
            warnings.warn(
                "all operators initialized explicitly, nothing to learn",
                errors.OpInfWarning,
            )
            return self

        self._fit_solver(states, lhs, inputs)
        self.refit()
        return self

    # Model evaluation --------------------------------------------------------
    def rhs(self, state, input_=None):
        r"""Evaluate the right-hand side of the model by applying each operator
        and summing the results.

        This is the function :math:`\Ophat(\qhat, \u)`
        where the model can be written as one of the following:

        * :math:`\ddt\qhat(t) = \Ophat(\qhat(t), \u(t))` (continuous time)
        * :math:`\qhat_{j+1} = \Ophat(\qhat_j, \u_j)` (discrete time)
        * :math:`\widehat{\mathbf{g}} = \Ophat(\qhat, \u)` (steady state)

        Parameters
        ----------
        state : (r,) ndarray
            State vector.
        input_ : (m,) ndarray or None
            Input vector corresponding to the state.

        Returns
        -------
        out : (r,) ndarray
            Evaluation of the right-hand side of the model.
        """
        out = np.zeros_like(state)
        for op in self.operators:
            out += op.apply(state, input_)
        return out

    def jacobian(self, state, input_=None):
        r"""Construct and sum the state Jacobian of each model operator.

        This the derivative of the right-hand side of the model with respect
        to the state, i.e., the function :math:`\ddqhat\Ophat(\qhat, \u)`
        where the model can be written as one of the following:

        * :math:`\ddt\qhat(t) = \Ophat(\qhat(t), \u(t))` (continuous time)
        * :math:`\qhat_{j+1} = \Ophat(\qhat_{j}, \u_{j})` (discrete time)
        * :math:`\widehat{\mathbf{g}} = \Ophat(\qhat, \u)` (steady state)

        Parameters
        ----------
        state : (r,) ndarray
            State vector :math:`\qhat`.
        input_ : (m,) ndarray or None
            Input vector :math:`\u`.

        Returns
        -------
        jac : (r, r) ndarray
            State Jacobian of the right-hand side of the model.
        """
        r = self.state_dimension
        out = np.zeros_like(state, shape=(r, r))
        for op in self.operators:
            out += op.jacobian(state, input_)
        return out

    @abc.abstractmethod
    def predict(*args, **kwargs):  # pragma: no cover
        """Solve the model under specified conditions."""
        raise NotImplementedError

    # Model persistence -------------------------------------------------------
    def save(self, savefile, overwrite=False):
        """Serialize the model, saving it in HDF5 format.
        The model can be recovered with the :meth:`load()` class method.

        Parameters
        ----------
        savefile : str
            File to save to, with extension ``.h5`` (HDF5).
        overwrite : bool
            If ``True`` and the specified ``savefile`` already exists,
            overwrite the file. If ``False`` (default) and the specified
            ``savefile`` already exists, raise an error.
        """
        with utils.hdf5_savehandle(savefile, overwrite=overwrite) as hf:
            # Metadata.
            meta = hf.create_dataset("meta", shape=(0,))
            meta.attrs["num_operators"] = len(self.operators)
            meta.attrs["r"] = (
                int(self.state_dimension) if self.state_dimension else 0
            )
            meta.attrs["m"] = (
                int(self.input_dimension) if self.input_dimension else 0
            )

            # Store operator data.
            hf.create_dataset(
                "indices_infer", data=self._indices_of_operators_to_infer
            )
            hf.create_dataset(
                "indices_known", data=self._indices_of_known_operators
            )
            for i, op in enumerate(self.operators):
                op.save(hf.create_group(f"operator_{i}"))
            if self.solver is not None:
                self.solver.save(hf.create_group("solver"))

    @classmethod
    def load(cls, loadfile: str):
        """Load a serialized model from an HDF5 file, created previously from
        the :meth:`save()` method.

        Parameters
        ----------
        loadfile : str
            Path to the file where the model was stored via :meth:`save()`.

        Returns
        -------
        model : _NonparametricModel
            Loaded model.
        """
        with utils.hdf5_loadhandle(loadfile) as hf:
            # Load metadata.
            num_operators = int(hf["meta"].attrs["num_operators"])
            indices_infer = [int(i) for i in hf["indices_infer"][:]]
            indices_known = [int(i) for i in hf["indices_known"][:]]

            # Load operators.
            ops = []
            for i in range(num_operators):
                gp = hf[f"operator_{i}"]
                OpClassName = gp["meta"].attrs["class"]
                ops.append(_operator_name2class[OpClassName].load(gp))

            # Construct the model.
            model = cls(ops)
            model._indices_of_operators_to_infer = indices_infer
            model._indices_of_known_operators = indices_known
            if r := int(hf["meta"].attrs["r"]):
                model.state_dimension = r
            if (m := int(hf["meta"].attrs["m"])) and model._has_inputs:
                model.input_dimension = m

        return model


# Public classes ==============================================================
class SteadyModel(_NonparametricModel):  # pragma: no cover
    r"""Nonparametric steady state model :math:`\zhat = \fhat(\qhat)`.

    Here,

    * :math:`\qhat(t)\in\RR^{r}` is the model state, and
    * :math:`\u(t)\in\RR^{m}` is the (optional) input.

    The structure of :math:`\fhat` is specified through the
    ``operators`` argument.

    Parameters
    ----------
    operators : list of :mod:`opinf.operators` objects
        Operators comprising the terms of the model.
    solver : :mod:`opinf.lstsq` object or float > 0 or None
        Solver for the least-squares regression. Defaults:

        * ``None``: :class:`opinf.lstsq.PlainSolver`.
          SVD-based solve without regularization.
        * float > 0: :class:`opinf.lstsq.L2Solver`.
          SVD-based solve with scalar Tikhonov regularization.
    """

    _LHS_ARGNAME = "forcing"
    _LHS_LABEL = "z"
    _STATE_LABEL = "q"
    _INPUT_LABEL = None
    # TODO: disallow input terms?

    def fit(self, states, forcing=None):
        r"""Learn the model operators from data.

        The operators are inferred by solving the regression problem

        .. math::
           \min_{\Ohat}\sum_{j=0}^{k-1}\left\|
           \fhat(\qhat_j) - \zhat_{j}
           \right\|_2^2
           = \min_{\Ohat}\left\|
           \D\Ohat\trp - [~\zhat_0~~\cdots~~\zhat_{k-1}~]\trp
           \right\|_F^2

        where :math:`\zhat = \fhat(\qhat)` is the model and

        * :math:`\qhat_j\in\RR^r` is a measurement of the state,
        * :math:`\zhat_j\in\RR^r` is a measurement of the forcing term
          corresponding to :math:`\qhat_j`,
        * :math:`\Ohat\in\RR^{r\times d(r,m)}` is the *operator matrix* such
          that :math:`\fhat(\q,\u) = \Ohat\d(\qhat,\u)` for some data vector
          :math:`\d(\qhat,\u)\in\RR^{d(r,m)}`, and
        * :math:`\D\in\RR^{k\times d(r,m)}` is the *data matrix* given by
          :math:`[~\d(\qhat_0,\u_0)~~\cdots~~\d(\qhat_{k-1},\u_{k-1})~]\trp`.

        See the :mod:`opinf.operators` module for further explanation.

        The strategy for solving the regression, as well as any additional
        regularization or constraints, are specified by the ``solver``.

        Parameters
        ----------
        states : (r, k) ndarray
            Snapshot training data. Each column is a single snapshot.
        forcing : (r, k) ndarray or None
            Forcing training data. Each column ``forcing[:, j]``
            corresponds to the snapshot ``states[:, j]``.
            If ``None``, set ``forcing = 0``.

        Returns
        -------
        self
        """
        return _NonparametricModel.fit(self, states, forcing, inputs=None)

    def rhs(self, state):
        r"""Evaluate the right-hand side of the model by applying each operator
        and summing the results.

        This is the function :math:`\fhat(\qhat)` where the model is given by
        :math:`\zhat = \fhat(\qhat)`.

        Parameters
        ----------
        state : (r,) ndarray
            State vector :math:`\qhat`.

        Returns
        -------
        g: (r,) ndarray
            Evaluation of the right-hand-side of the model.
        """
        return _NonparametricModel.rhs(self, state, None)

    def jacobian(self, state):
        r"""Sum the state Jacobian of each model operator.

        This the derivative of the right-hand side of the model with respect
        to the state, i.e., the function :math:`\ddqhat\fhat(\qhat)`
        where the model is given by :math:`\zhat = \fhat(\qhat)`.

        Parameters
        ----------
        state : (r,) ndarray
            State vector :math:`\qhat`.

        Returns
        -------
        jac : (r, r) ndarray
            State Jacobian of the right-hand side of the model.
        """
        return _NonparametricModel.jacobian(self, state, input_=None)

    def predict(self, forcing, guess=None):
        """Solve the model with the given forcing and initial guess."""
        raise NotImplementedError("TODO")


[docs] class DiscreteModel(_NonparametricModel): r"""Nonparametric discrete dynamical system model :math:`\qhat_{j+1} = \fhat(\qhat_{j}, \u_{j})`. Here, * :math:`\qhat_j\in\RR^{r}` is the :math:`j`-th iteration of the model state, and * :math:`\u_j\in\RR^{m}` is the (optional) corresponding input. The structure of :math:`\fhat` is specified through the ``operators`` attribute. Parameters ---------- operators : list of :mod:`opinf.operators` objects Operators comprising the terms of the model. solver : :mod:`opinf.lstsq` object or float > 0 or None Solver for the least-squares regression. Defaults: * ``None``: :class:`opinf.lstsq.PlainSolver`. SVD-based solve without regularization. * float > 0: :class:`opinf.lstsq.L2Solver`. SVD-based solve with scalar Tikhonov regularization. """ _LHS_ARGNAME = "nextstates" _LHS_LABEL = r"q_{j+1}" _STATE_LABEL = r"q_{j}" _INPUT_LABEL = r"u_{j}"
[docs] @staticmethod def stack_trajectories(statelist, inputlist=None): """Translate a collection of state trajectories and (optionally) inputs to arrays that are appropriate arguments for :meth:`fit()`. Parameters ---------- statelist : list of s (r, k_i) ndarrays Collection of state trajectories. inputlist : list of s (m, k_i) ndarrays Collection of inputs corresponding to the state trajectories. Returns ------- states : (r, sum_i(k_i)) ndarray Snapshot matrix with data from all but the final snapshot of each trajectory in ``statelist``. nextstates : (r, sum_i(k_i)) ndarray Snapshot matrix with data from all but the first snapshot of each trajectory in ``statelist``. inputs : (r, sum_i(k_i)) ndarray Input matrix with data from all but the last input for each trajectory. Only returned if ``inputlist`` is provided. """ states = np.hstack([Q[:, :-1] for Q in statelist]) nextstates = np.hstack([Q[:, 1:] for Q in statelist]) if inputlist is not None: inputs = np.hstack( [ U[..., : (S.shape[1] - 1)] for S, U in zip(statelist, inputlist) ] ) return states, nextstates, inputs return states, nextstates
[docs] def fit(self, states, nextstates=None, inputs=None): r"""Learn the model operators from data. The operators are inferred by solving the regression problem .. math:: \min_{\Ohat}\sum_{j=0}^{k-1}\left\| \fhat(\qhat_j, \u_j) - \zhat_{j} \right\|_2^2 = \min_{\Ohat}\left\| \D\Ohat\trp - [~\zhat_0~~\cdots~~\zhat_{k-1}~]\trp \right\|_F^2 where :math:`\zhat_j = \fhat(\qhat_j, \u_j)` is the model and * :math:`\qhat_j\in\RR^r` is a measurement of the state, * :math:`\u_j\in\RR^m` is a measurement of the input, and * :math:`\zhat_j\in\RR^r` is a measurement of the next state iteration after :math:`\qhat_j`, * :math:`\Ohat\in\RR^{r\times d(r,m)}` is the *operator matrix* such that :math:`\fhat(\q,\u) = \Ohat\d(\qhat,\u)` for some data vector :math:`\d(\qhat,\u)\in\RR^{d(r,m)}`, and * :math:`\D\in\RR^{k\times d(r,m)}` is the *data matrix* given by :math:`[~\d(\qhat_0,\u_0)~~\cdots~~\d(\qhat_{k-1},\u_{k-1})~]\trp`, See the :mod:`opinf.operators` module for further explanation. The strategy for solving the regression, as well as any additional regularization or constraints, are specified by the ``solver``. Parameters ---------- states : (r, k) ndarray Snapshot training data. Each column is a single snapshot. nextstates : (r, k) ndarray or None Next iteration training data. Each column ``nextstates[:, j]`` is the iteration following ``states[:, j]``. If ``None``, use ``nextstates[:, j] = states[:, j+1]``. inputs : (m, k) or (k,) ndarray or None Input training data. Each column ``inputs[:, j]`` corresponds to the snapshot ``states[:, j]``. May be a one-dimensional array if ``m=1`` (scalar input). Returns ------- self """ if nextstates is None: nextstates = states[:, 1:] states = states[:, :-1] if inputs is not None: inputs = inputs[..., : states.shape[1]] return _NonparametricModel.fit(self, states, nextstates, inputs=inputs)
[docs] def rhs(self, state, input_=None): r"""Evaluate the right-hand side of the model by applying each operator and summing the results. This is the function :math:`\fhat(\qhat, \u)` where the model is given by :math:`\qhat_{j+1} = \fhat(\qhat_{j}, \u_{j})`. Parameters ---------- state : (r,) ndarray State vector :math:`\qhat`. input_ : (m,) ndarray or None Input vector :math:`\u`. Returns ------- nextstate : (r,) ndarray Evaluation of the right-hand side of the model. """ return _NonparametricModel.rhs(self, state, input_)
[docs] def jacobian(self, state, input_=None): r"""Sum the state Jacobian of each model operator. This the derivative of the right-hand side of the model with respect to the state, i.e., the function :math:`\ddqhat\fhat(\qhat, \u)` where the model is given by :math:`\qhat_{j+1} = \fhat(\qhat_{j}, \u_{j})`. Parameters ---------- state : (r,) ndarray State vector :math:`\qhat`. input_ : (m,) ndarray or None Input vector :math:`\u`. Returns ------- jac : (r, r) ndarray State Jacobian of the right-hand side of the model. """ return _NonparametricModel.jacobian(self, state, input_)
[docs] def predict(self, state0, niters, inputs=None): """Step forward the discrete dynamical system ``niters`` steps. Essentially, this amounts to the following. .. code-block:: python >>> states[:, 0] = state0 >>> states[:, 1] = model.rhs(states[:, 0], inputs[:, 0]) >>> states[:, 2] = model.rhs(states[:, 1], inputs[:, 1]) ... # Repeat `niters` times. Parameters ---------- state0 : (r,) ndarray Initial state. niters : int Number of times to step the system forward. inputs : (m, niters-1) ndarray or None Inputs for the next ``niters - 1`` time steps. Returns ------- states : (r, niters) ndarray Solution to the system, including the initial condition ``state0``. """ self._check_is_trained() # Check initial condition dimension and process inputs. if (_shape := np.shape(state0)) != (self.state_dimension,): raise errors.DimensionalityError( "initial condition not aligned with model " f"(state0.shape = {_shape} != " f"({self.state_dimension},) = (r,))" ) self._check_inputargs(inputs, "inputs") # Verify iteration argument. if not isinstance(niters, int) or niters < 1: raise ValueError("argument 'niters' must be a positive integer") # Create the solution array and fill in the initial condition. states = np.empty((self.state_dimension, niters)) states[:, 0] = state0.copy() # Run the iteration. if self._has_inputs: if callable(inputs): raise TypeError("inputs must be NumPy array, not callable") # Validate shape of input, reshaping if input is 1d. U = np.atleast_2d(inputs) if ( U.ndim != 2 or U.shape[0] != self.input_dimension or U.shape[1] < niters - 1 ): raise ValueError( f"inputs.shape = ({U.shape} " f"!= {(self.input_dimension, niters-1)} = (m, niters-1)" ) for j in range(niters - 1): states[:, j + 1] = self.rhs(states[:, j], U[:, j]) else: for j in range(niters - 1): states[:, j + 1] = self.rhs(states[:, j]) # Return state results. return states
[docs] class ContinuousModel(_NonparametricModel): r"""Nonparametric system of ordinary differential equations :math:`\ddt\qhat(t) = \fhat(\qhat(t), \u(t))`. Here, * :math:`\qhat(t)\in\RR^{r}` is the model state, and * :math:`\u(t)\in\RR^{m}` is the (optional) input. The structure of :math:`\fhat` is specified through the ``operators`` argument. Parameters ---------- operators : list of :mod:`opinf.operators` objects Operators comprising the terms of the model. solver : :mod:`opinf.lstsq` object or float > 0 or None Solver for the least-squares regression. Defaults: * ``None``: :class:`opinf.lstsq.PlainSolver`. SVD-based solve without regularization. * float > 0: :class:`opinf.lstsq.L2Solver`. SVD-based solve with scalar Tikhonov regularization. """ _LHS_ARGNAME = "ddts" _LHS_LABEL = "dq / dt" _STATE_LABEL = "q(t)" _INPUT_LABEL = "u(t)"
[docs] def fit(self, states, ddts, inputs=None): r"""Learn the model operators from data. The operators are inferred by solving the regression problem .. math:: \min_{\Ohat}\sum_{j=0}^{k-1}\left\| \fhat(\qhat_j, \u_j) - \dot{\qhat}_j \right\|_2^2 = \min_{\Ohat}\left\| \D\Ohat\trp - [~\dot{\qhat}_0~~\cdots~~\dot{\qhat}_{k-1}~]\trp \right\|_F^2 where :math:`\ddt\qhat(t) = \fhat(\qhat(t), \u(t))` is the model and * :math:`\qhat_j\in\RR^r` is the state at some time :math:`t_j`, * :math:`\u_j\in\RR^m` is the input at time :math:`t_j`, * :math:`\dot{\qhat}_j\in\RR^r` is the time derivative of the state at time :math:`t_j`, i.e., :math:`\dot{\qhat}_j = \ddt\qhat(t)\big|_{t=t_j}`, * :math:`\Ohat\in\RR^{r\times d(r,m)}` is the *operator matrix* such that :math:`\fhat(\q,\u) = \Ohat\d(\qhat,\u)` for some data vector :math:`\d(\qhat,\u)\in\RR^{d(r,m)}`, and * :math:`\D\in\RR^{k\times d(r,m)}` is the *data matrix* given by :math:`[~\d(\qhat_0,\u_0)~~\cdots~~\d(\qhat_{k-1},\u_{k-1})~]\trp`. See the :mod:`opinf.operators` module for further explanation. The strategy for solving the regression, as well as any additional regularization or constraints, are specified by the ``solver``. Parameters ---------- states : (r, k) ndarray Snapshot training data. Each column is a single snapshot. ddts : (r, k) ndarray Snapshot time derivative data. Each column ``ddts[:, j]`` corresponds to the snapshot ``states[:, j]``. inputs : (m, k) or (k,) ndarray or None Input training data. Each column ``inputs[:, j]`` corresponds to the snapshot ``states[:, j]``. May be a one-dimensional array if ``m=1`` (scalar input). Returns ------- self """ return _NonparametricModel.fit(self, states, ddts, inputs=inputs)
[docs] def rhs(self, t, state, input_func=None): r"""Evaluate the right-hand side of the model by applying each operator and summing the results. This is the right-hand side of the model, i.e., the function :math:`\fhat(\qhat(t), \u(t))` where the model is given by :math:`\ddt \qhat(t) = \fhat(\qhat(t), \u(t))`. Parameters ---------- t : float Time :math:`t`, a scalar. state : (r,) ndarray State vector :math:`\qhat(t)` corresponding to time ``t``. input_func : callable(float) -> (m,), or None Input function that maps time ``t`` to the input vector :math:`\u(t)`. Returns ------- dqdt : (r,) ndarray Evaluation of the right-hand side of the model. """ input_ = None if not self._has_inputs else input_func(t) return _NonparametricModel.rhs(self, state, input_)
[docs] def jacobian(self, t, state, input_func=None): r"""Sum the state Jacobian of each model operator. This the derivative of the right-hand side of the model with respect to the state, i.e., the function :math:`\ddqhat\fhat(\qhat(t), \u(t))` where the model is given by :math:`\ddt\qhat(t) = \fhat(\qhat(t), \u(t))`. Parameters ---------- t : float Time :math:`t`, a scalar. state : (r,) ndarray State vector :math:`\qhat(t)` corresponding to time ``t``. input_func : callable(float) -> (m,), or None Input function that maps time ``t`` to the input vector :math:`\u(t)`. Returns ------- jac : (r, r) ndarray State Jacobian of the right-hand side of the model. """ input_ = None if not self._has_inputs else input_func(t) return _NonparametricModel.jacobian(self, state, input_)
[docs] def predict(self, state0, t, input_func=None, **options): """Solve the system of ordinary differential equations. This method wraps :func:`scipy.integrate.solve_ivp()`. Parameters ---------- state0 : (r,) ndarray Initial state vector. t : (nt,) ndarray Time domain over which to integrate the model. input_func : callable or (m, nt) ndarray Input as a function of time (preferred) or the input values at the times ``t``. If given as an array, cubic spline interpolation on the known data points is used as needed. options Arguments for :func:`scipy.integrate.solve_ivp()`. Common options: * **method : str** ODE solver for the model. * ``'RK45'`` (default): Explicit Runge--Kutta method of order 5(4). * ``'RK23'``: Explicit Runge--Kutta method of order 3(2). * ``'Radau'``: Implicit Runge--Kutta method of the Radau IIA family of order 5. * ``'BDF'``: Implicit multi-step variable-order (1 to 5) method based on a backward differentiation formula for the derivative. * ``'LSODA'``: Adams/BDF method with automatic stiffness detection and switching. * **max_step : float** Maximimum allowed integration step size. Returns ------- states : (r, nt) ndarray Computed solution to the system over the time domain ``t``. A more detailed report on the integration results is stored as the ``predict_result_`` attribute. """ self._check_is_trained() # Check initial condition dimension and process inputs. if (_shape := np.shape(state0)) != (self.state_dimension,): raise errors.DimensionalityError( "initial condition not aligned with model " f"(state0.shape = {_shape} != " f"({self.state_dimension},) = (r,))" ) self._check_inputargs(input_func, "input_func") # Verify time domain. if t.ndim != 1: raise ValueError("time 't' must be one-dimensional") nt = t.shape[0] # Interpret control input argument. if self._has_inputs: if not callable(input_func): # input_func must be (m, nt) ndarray. Interpolate -> callable. U = np.atleast_2d(input_func) if U.shape != (self.input_dimension, nt): raise ValueError( f"input_func.shape = {U.shape} " f"!= {(self.input_dimension, nt)} = (m, len(t))" ) input_func = spinterpolate.CubicSpline(t, U, axis=1) # Check dimension of input_func() outputs. _tmp = input_func(t[0]) if self.input_dimension == 1 and np.isscalar(_tmp): original_input_func = input_func def input_func(t): """Wrap outputs of input_func() as an array.""" return np.array([original_input_func(t)]) _tmp = input_func(t[0]) if not isinstance(_tmp, np.ndarray) or _tmp.shape != ( self.input_dimension, ): raise errors.DimensionalityError( "input_func() must return ndarray" f" of shape (m,) = ({self.input_dimension},)" ) if "method" in options and options["method"] in ( # These methods require the Jacobian. "BDF", "Radau", "LSODA", ): options["jac"] = self.jacobian # Integrate the model. out = spintegrate.solve_ivp( self.rhs, # Integrate this function [t[0], t[-1]], # over this time interval state0, # from this initial condition args=(input_func,), # with this input function t_eval=t, # evaluated at these points **options, # using these solver options. ) # Warn if the integration failed. if not out.success: # pragma: no cover warnings.warn(out.message, spintegrate.IntegrationWarning) # Return state results. self.predict_result_ = out return out.y
# "Frozen" classes for parametric evaluation ================================== class _FrozenMixin: """Mixin for evaluations of parametric models (disables fit()).""" def _clear(self): raise NotImplementedError( "_clear() is disabled for this class, " "call fit() on the parametric model object" ) @property def solver(self): return None @solver.setter def solver(self, solver): pass def fit(*args, **kwargs): raise NotImplementedError( "fit() is disabled for this class, " "call fit() on the parametric model object" ) class _FrozenSteadyModel(_FrozenMixin, SteadyModel): """Nonparametric steady-state model that is the evaluation of a parametric model. """ pass # pragma: no cover class _FrozenDiscreteModel(_FrozenMixin, DiscreteModel): """Nonparametric discrete-time model that is the evaluation of a parametric model. """ pass # pragma: no cover class _FrozenContinuousModel(_FrozenMixin, ContinuousModel): """Nonparametric continuous-time model that is the evaluation of a parametric model. """ pass # pragma: no cover