Source code for opinf.utils._gridsearch

# _gridsearch.py
"""Routine for performing a grid search, followed by a derivative-free
optimization.
"""

__all__ = [
    "gridsearch",
    "DiscreteRegTest",
    "ContinuousRegTest",
]

import abc
import typing
import warnings
import dataclasses
import numpy as np
import scipy.optimize

from .. import errors


__MAXOPTVAL = 1e20


[docs] def gridsearch( func: typing.Callable, candidates: np.ndarray, gridsearch_only: bool = False, label: str = "parameter", verbose: bool = False, ) -> typing.Union[float, np.ndarray]: r"""Minimize a function by first checking a collection of candidates, then following up with a derivative-free optimization routine. If the candidates are one-dimensional, meaning ``func()`` is a scalar map :math:`f:\RR_+\to\RR`, the optimization is carried out via :func:`scipy.optimize.minimize_scalar` with ``method='brent'``. Otherwise (:math:`f:\RR_+^p\to\RR` for some :math:`p > 1`), the optimization is carried out via :func:`scipy.optimize.minimize` with ``method='Nelder-Mead'``. Parameters ---------- func : callable(candidate) -> float Function to minimize. candidates : list of floats or ndarrays Candidates to check before starting the optimization routine. gridsearch_only : bool If ``True``, stop after checking all ``candidates`` and do not follow up with optimization. label : str Label for the types of candidates being checked. verbose : bool If ``True``, print information at each iteration. Returns ------- winner : float or ndarray Optimized candidate. Raises ------ RuntimeError If the grid search fails because ``func()`` returns ``np.inf`` for each candidate. Warns ----- OpInfWarning 1) If the ``candidates`` are one-dimensional and the grid search winner is the smallest or largest candidate, or 2) If the minimization fails or does not result in a solution that improves upon the grid search. Notes ----- The optimization minimizes ``func(x)`` by varying ``x`` logarithmically. Unless ``gridsearch_only=True``, it is assumed that all entries of the argument ``x`` are positive. If only one scalar candidate is given, the (one-dimensional) optimization sets the bounds to ``[candidate / 100, candidate * 100]``. """ # Process the candidates. candidates = np.atleast_1d(candidates) if np.shape(candidates[0]) == (1,): candidates = [reg[0] for reg in candidates] scalar_regularization = np.ndim(candidates[0]) == 0 def pstr(params): if np.isscalar(params): return f"{params:.4e}" else: return "[" + ", ".join([f"{p:.3e}" for p in params]) + "]" def linfunc(params): if verbose: print(f"{label.title()} {pstr(params)}...", end="", flush=True) out = func(params) if verbose: if out == np.inf: print("UNSTABLE") else: print(f"{out:.8%} error") return out def logfunc(log10params): params = 10**log10params return min(__MAXOPTVAL, linfunc(params)) # Grid search. num_tests = len(candidates) if verbose: print(f"\nGRID SEARCH ({num_tests} candidates)") ndigits = len(str(num_tests)) winning_error, winner_index = np.inf, None if scalar_regularization: candidates = np.sort(candidates) for i, candidate in enumerate(candidates): if verbose: print(f"({i+1: >{ndigits}d}/{num_tests:d}) ", end="") if (error := linfunc(candidate)) < winning_error: winning_error = error winner_index = i if winner_index is None: raise RuntimeError(f"{label} grid search failed") gridsearch_winner = candidates[winner_index] if verbose: print( f"Best {label} candidate via grid search:", pstr(gridsearch_winner), ) if gridsearch_only: return gridsearch_winner # Post-process results if the candidates are scalars. if scalar_regularization: if num_tests == 1: search_bounds = [gridsearch_winner / 1e2, 1e2 * gridsearch_winner] elif winner_index == 0: warnings.warn( f"smallest {label} candidate won grid search, " f"consider using smaller candidates", errors.OpInfWarning, ) search_bounds = [gridsearch_winner / 1e2, candidates[1]] elif winner_index == num_tests - 1: warnings.warn( f"largest {label} candidate won grid search, " f"consider using larger candidates", errors.OpInfWarning, ) search_bounds = [candidates[-2], 1e2 * gridsearch_winner] else: search_bounds = [ candidates[winner_index - 1], candidates[winner_index + 1], ] # Follow up grid search with minimization-based search. if verbose: print("OPTIMIZATION") if scalar_regularization: opt_result = scipy.optimize.minimize_scalar( logfunc, method="bounded", bounds=np.log10(search_bounds), ) else: opt_result = scipy.optimize.minimize( logfunc, x0=np.log10(gridsearch_winner), method="Nelder-Mead", ) # Report results. success = opt_result.success and opt_result.fun != __MAXOPTVAL if not success or (winning_error < opt_result.fun): warnings.warn( f"{label} grid search performed better than optimization, " "falling back on grid search solution", errors.OpInfWarning, ) return gridsearch_winner optimization_winner = 10**opt_result.x if verbose: print( f"Best {label} candidate via optimization:", pstr(optimization_winner), ) return optimization_winner
# Additional regularization test cases ======================================== class _RegTest(abc.ABC): """Base class for regularization selection test cases.""" def evaluate(self, model, **predict_options): """Evaluate the ``model`` under the test case conditions. Parameters ---------- model : opinf.models object Instantiated model, ready for prediction. predict_options : dict Additional keyword arguments for ``model.predict()``. Returns ------- result : bool ``True`` if the ``model`` produces a stable solution under the test case conditions, ``False`` otherwise. """ with warnings.catch_warnings(): warnings.simplefilter("ignore") solution = model.predict(*self.predict_args, **predict_options) return not self.unstable(solution) @abc.abstractmethod def unstable(self, Q): """Return ``True`` if the trajectory ``Q`` is unstable.""" pass # pragma: no cover @abc.abstractmethod def copy(self, newICs): """Return a copy of this test case with new initial conditions.""" pass # pragma: no cover
[docs] @dataclasses.dataclass(frozen=True) class DiscreteRegTest(_RegTest): """Test case for regularization selection with fully discrete models. The ``test_cases`` argument of :meth:`opinf.roms.ROM.fit_regselect_discrete` is a list of these. Parameters ---------- initial_conditions : (n,) ndarray Initial conditions to be tested. niters : int Number of iterations to step forward from the initial conditions. parameters : (p,) ndarray or float or None Parameter value to be tested. inputs : (m, niters) ndarray or None Inputs to use in the forward prediction, if the model takes inputs. bound : float or None Amount that the forward prediction is allowed to deviate from the initial conditions without the trajectory being classified as unstable. """ initial_conditions: np.ndarray niters: int parameters: np.ndarray = None inputs: np.ndarray = None bound: float = None predict_args: tuple = dataclasses.field(init=False) def __post_init__(self): predict_args = [ self.initial_conditions, self.niters, self.inputs, ] if self.parameters is not None: predict_args.insert(0, self.parameters) object.__setattr__(self, "predict_args", tuple(predict_args))
[docs] def copy(self, newICs): return self.__class__( newICs, self.niters, self.parameters, self.inputs, self.bound, )
[docs] def unstable(self, Q): """Return ``True`` if the trajectory ``Q`` is unstable.""" if np.isnan(Q).any() or np.isinf(Q).any(): return True if (B := self.bound) is not None: Qshifted = Q - self.initial_conditions.reshape((-1, 1)) if np.any(np.abs(Qshifted) > B): return True return False
[docs] @dataclasses.dataclass(frozen=True) class ContinuousRegTest(_RegTest): """Test case for regularization selection with time-continuous models. The ``test_cases`` argument of :meth:`opinf.roms.ROM.fit_regselect_continuous` is a list of these. Parameters ---------- initial_conditions : (n,) ndarray Initial conditions to be tested. time_domain : (k,) ndarray Time domain over which to solve the model forward in time. parameters : (p,) ndarray or float or None Parameter value to be tested. inputs : callable or None Input function, mapping time to input vectors, to use in the forward prediction, if the model takes inputs. bound : float or None Amount that the forward prediction is allowed to deviate from the initial conditions without the trajectory being classified as unstable. """ initial_conditions: np.ndarray time_domain: np.ndarray parameters: np.ndarray = None input_function: typing.Callable = None bound: float = None predict_args: tuple = dataclasses.field(init=False) def __post_init__(self): predict_args = [ self.initial_conditions, self.time_domain, self.input_function, ] if self.parameters is not None: predict_args.insert(0, self.parameters) object.__setattr__(self, "predict_args", tuple(predict_args))
[docs] def copy(self, newICs): return self.__class__( newICs, self.time_domain, self.parameters, self.input_function, self.bound, )
[docs] def unstable(self, Q): """Return ``True`` if the trajectory ``Q`` is unstable.""" if Q.shape[-1] != self.time_domain.size: return True if np.isnan(Q).any() or np.isinf(Q).any(): return True if (B := self.bound) is not None: Qshifted = Q - self.initial_conditions.reshape((-1, 1)) if np.any(np.abs(Qshifted) > B): return True return False