Source code for opty.direct_collocation

#!/usr/bin/env python

from functools import wraps
import logging

import numpy as np
import sympy as sm
from sympy.physics import mechanics as me
import cyipopt
try:
    plt = sm.external.import_module('matplotlib.pyplot',
                                    __import__kwargs={'fromlist': ['']},
                                    catch=(RuntimeError,))
except TypeError:  # SymPy >=1.6
    plt = sm.external.import_module('matplotlib.pyplot',
                                    import_kwargs={'fromlist': ['']},
                                    catch=(RuntimeError,))

from .utils import (ufuncify_matrix, parse_free, _optional_plt_dep,
                    _forward_jacobian, sort_sympy)

__all__ = ['Problem', 'ConstraintCollocator']


class _DocInherit(object):
    """
    Docstring inheriting method descriptor

    The class itself is also used as a decorator

    Taken from https://stackoverflow.com/questions/2025562/inherit-docstrings-in-python-class-inheritance

    This is the rather complex solution to using the super classes method
    docstring and modifying it.
    """

    def __init__(self, mthd):
        self.mthd = mthd
        self.name = mthd.__name__

    def __get__(self, obj, cls):
        if obj:
            return self.get_with_inst(obj, cls)
        else:
            return self.get_no_inst(cls)

    def get_with_inst(self, obj, cls):

        overridden = getattr(super(cls, obj), self.name, None)

        @wraps(self.mthd, assigned=('__name__', '__module__'))
        def f(*args, **kwargs):
            return self.mthd(obj, *args, **kwargs)

        return self.use_parent_doc(f, overridden)

    def get_no_inst(self, cls):

        for parent in cls.__mro__[1:]:
            overridden = getattr(parent, self.name, None)
            if overridden:
                break

        @wraps(self.mthd, assigned=('__name__', '__module__'))
        def f(*args, **kwargs):
            return self.mthd(*args, **kwargs)

        return self.use_parent_doc(f, overridden)

    def use_parent_doc(self, func, source):
        if source is None:
            raise NameError("Can't find '%s' in parents" % self.name)
        func.__doc__ = self._combine_docs(self.mthd.__doc__,
                                          ConstraintCollocator.__init__.__doc__)
        return func

    @staticmethod
    def _combine_docs(prob_doc, coll_doc):
        beg, end = prob_doc.split('bounds')
        _, middle = coll_doc.split('Parameters\n        ==========\n        ')
        return beg + middle[:-9] + '        bounds' + end

_doc_inherit = _DocInherit


[docs] class Problem(cyipopt.Problem): """This class allows the user to instantiate a problem object with the essential data required to solve a direct collocation optinal control or parameter identification problem.""" INF = 10e19 @_doc_inherit def __init__(self, obj, obj_grad, *args, **kwargs): """ Parameters ========== obj : function Returns the value of the objective function given the free vector. obj_grad : function Returns the gradient of the objective function given the free vector. bounds : dictionary, optional This dictionary should contain a mapping from any of the symbolic states, unknown trajectories, or unknown parameters to a 2-tuple of floats, the first being the lower bound and the second the upper bound for that free variable, e.g. ``{x(t): (-1.0, 5.0)}``. """ self.bounds = kwargs.pop('bounds', None) self.collocator = ConstraintCollocator(*args, **kwargs) self.obj = obj self.obj_grad = obj_grad self.con = self.collocator.generate_constraint_function() logging.info('Constraint function generated.') self.con_jac = self.collocator.generate_jacobian_function() logging.info('Jacobian function generated.') self.con_jac_rows, self.con_jac_cols = \ self.collocator.jacobian_indices() self.num_free = self.collocator.num_free self.num_constraints = self.collocator.num_constraints self._generate_bound_arrays() # All constraints are expected to be equal to zero. con_bounds = np.zeros(self.num_constraints) super(Problem, self).__init__(n=self.num_free, m=self.num_constraints, lb=self.lower_bound, ub=self.upper_bound, cl=con_bounds, cu=con_bounds) self.obj_value = [] def _generate_bound_arrays(self): lb = -self.INF * np.ones(self.num_free) ub = self.INF * np.ones(self.num_free) N = self.collocator.num_collocation_nodes num_state_nodes = N * self.collocator.num_states num_non_par_nodes = N * (self.collocator.num_states + self.collocator.num_unknown_input_trajectories) state_syms = self.collocator.state_symbols unk_traj = self.collocator.unknown_input_trajectories unk_par = self.collocator.unknown_parameters if self.bounds is not None: for var, bounds in self.bounds.items(): if var in state_syms: i = state_syms.index(var) start = i * N stop = start + N lb[start:stop] = bounds[0] * np.ones(N) ub[start:stop] = bounds[1] * np.ones(N) elif var in unk_traj: i = unk_traj.index(var) start = num_state_nodes + i * N stop = start + N lb[start:stop] = bounds[0] * np.ones(N) ub[start:stop] = bounds[1] * np.ones(N) elif var in unk_par: i = unk_par.index(var) idx = num_non_par_nodes + i lb[idx] = bounds[0] ub[idx] = bounds[1] self.lower_bound = lb self.upper_bound = ub
[docs] def objective(self, free): """Returns the value of the objective function given a solution to the problem. Parameters ========== free : ndarray, (n*N + q*N + r, ) A solution to the optimization problem in the canonical form. Returns ======= obj_val : float The value of the objective function. Notes ===== - N : number of collocation nodes - n : number of unknown state trajectories - q : number of unknown input trajectories - r : number of unknown parameters """ return self.obj(free)
[docs] def gradient(self, free): """Returns the value of the gradient of the objective function given a solution to the problem. Parameters ========== free : ndarray, (n*N + q*N + r, ) A solution to the optimization problem in the canonical form. Returns ======= gradient_val : ndarray, shape(n*N + q*N + r, 1) The value of the gradient of the objective function. Notes ===== - N : number of collocation nodes - n : number of unknown state trajectories - q : number of unknown input trajectories - r : number of unknown parameters """ # This should return a column vector. return self.obj_grad(free)
[docs] def constraints(self, free): """Returns the value of the constraint functions given a solution to the problem. Parameters ========== free : ndarray, (n*N + q*N + r, ) A solution to the optimization problem in the canonical form. Returns ======= constraints_val : ndarray, shape(n*(N - 1) + o) The value of the constraint function. Notes ===== - N : number of collocation nodes - n : number of unknown state trajectories - q : number of unknown input trajectories - r : number of unknown parameters - o : number of instance constraints """ # This should return a column vector. return self.con(free)
[docs] def jacobianstructure(self): """Returns the sparsity structur of the Jacobian of the constraint function. Returns ======= jac_row_idxs : ndarray, shape(2*n + q + r,) The row indices for the non-zero values in the Jacobian. jac_col_idxs : ndarray, shape(n,) The column indices for the non-zero values in the Jacobian. """ return (self.con_jac_rows, self.con_jac_cols)
[docs] def jacobian(self, free): """Returns the non-zero values of the Jacobian of the constraint function. Returns ======= jac_vals = ndarray, shape() """ return self.con_jac(free)
[docs] def intermediate(self, *args): """This method is called at every optimization iteration. Not for pubic use.""" self.obj_value.append(args[2])
[docs] @_optional_plt_dep def plot_trajectories(self, vector, axes=None): """Returns the axes for two plots. The first plot displays the state trajectories versuse time and the second plot displays the input trjaectories versus time. Parameters ========== vector : ndarray, (n*N + q*N + r, ) The initial guess, solution, or any other vector that is in the canonical form. axes : ndarray of AxesSubplot, shape(n + m, ) An array of matplotlib axes to plot to. Returns ======= axes : ndarray of AxesSubplot A matplotlib axes with the state and input trajectories plotted. Notes ===== - N : number of collocation nodes - n : number of unknown state trajectories - m : number of input trajectories - q : number of unknown input trajectories - r : number of unknown parameters """ state_traj, input_traj, constants = \ parse_free(vector, self.collocator.num_states, self.collocator.num_unknown_input_trajectories, self.collocator.num_collocation_nodes) time = np.linspace(0, self.collocator.num_collocation_nodes * self.collocator.node_time_interval, num=self.collocator.num_collocation_nodes) num_axes = (self.collocator.num_states + self.collocator.num_input_trajectories) traj_syms = (self.collocator.state_symbols + self.collocator.input_trajectories) trajectories = np.vstack((state_traj, input_traj)) if axes is None: fig, axes = plt.subplots(num_axes, 1, sharex=True) for ax, traj, symbol in zip(axes, trajectories, traj_syms): ax.plot(time, traj) ax.set_ylabel(sm.latex(symbol, mode='inline')) ax.set_xlabel('Time') axes[0].set_title('State Trajectories') axes[self.collocator.num_states].set_title('Input Trajectories') return axes
[docs] @_optional_plt_dep def plot_constraint_violations(self, vector): """Returns an axis with the state constraint violations plotted versus node number and the instance constraints as a bar graph. Parameters ========== vector : ndarray, (n*N + q*N + r, ) The initial guess, solution, or any other vector that is in the canonical form. Returns ======= axes : ndarray of AxesSubplot A matplotlib axes with the constraint violations plotted. Notes ===== - N : number of collocation nodes - n : number of unknown state trajectories - q : number of unknown input trajectories - r : number of unknown parameters """ con_violations = self.con(vector) con_nodes = range(self.collocator.num_states, self.collocator.num_collocation_nodes + 1) N = len(con_nodes) fig, axes = plt.subplots(self.collocator.num_states + 1) for i, (ax, symbol) in enumerate(zip(axes[:-1], self.collocator.state_symbols)): ax.plot(con_nodes, con_violations[i * N:i * N + N]) ax.set_ylabel(sm.latex(symbol, mode='inline')) axes[0].set_title('Constraint Violations') axes[-2].set_xlabel('Node Number') left = range(len(con_violations[self.collocator.num_states * N:])) axes[-1].bar(left, con_violations[self.collocator.num_states * N:], tick_label=[sm.latex(s, mode='inline') for s in self.collocator.instance_constraints]) axes[-1].set_ylabel('Instance') axes[-1].set_xticklabels(axes[-1].get_xticklabels(), rotation=-10) return axes
[docs] @_optional_plt_dep def plot_objective_value(self): """Returns an axis with the objective value plotted versus the optimization iteration. solve() must be run first.""" fig, ax = plt.subplots(1) ax.set_title('Objective Value') ax.plot(self.obj_value) ax.set_ylabel('Objective Value') ax.set_xlabel('Iteration Number') return ax
[docs] class ConstraintCollocator(object): """This class is responsible for generating the constraint function and the sparse Jacobian of the constraint function using direct collocation methods for a non-linear programming problem where the essential constraints are defined from the equations of motion of the system. Notes ===== - N : number of collocation nodes - n : number of states - m : number of input trajectories - p : number of parameters - q : number of unknown input trajectories - r : number of unknown parameters - o : number of instance constraints - nN + qN + r : number of free variables - n(N - 1) + o : number of constraints """ time_interval_symbol = sm.Symbol('h', real=True) def __init__(self, equations_of_motion, state_symbols, num_collocation_nodes, node_time_interval, known_parameter_map={}, known_trajectory_map={}, instance_constraints=None, time_symbol=None, tmp_dir=None, integration_method='backward euler', parallel=False): """Instantiates a ConstraintCollocator object. Parameters ========== equations_of_motion : sympy.Matrix, shape(n, 1) A column matrix of SymPy expressions defining the right hand side of the equations of motion when the left hand side is zero, e.g. 0 = x'(t) - f(x(t), u(t), p) or 0 = f(x'(t), x(t), u(t), p). These should be in first order form but not necessairly explicit. state_symbols : iterable An iterable containing all of the SymPy functions of time which represent the states in the equations of motion. num_collocation_nodes : integer The number of collocation nodes, N. All known trajectory arrays should be of this length. node_time_interval : float The time interval between collocation nodes. known_parameter_map : dictionary, optional A dictionary that maps the SymPy symbols representing the known constant parameters to floats. Any parameters in the equations of motion not provided in this dictionary will become free optimization variables. known_trajectory_map : dictionary, optional A dictionary that maps the non-state SymPy functions of time to ndarrays of floats of shape(N,). Any time varying parameters in the equations of motion not provided in this dictionary will become free trajectories optimization variables. instance_constraints : iterable of SymPy expressions, optional These expressions are for constraints on the states at specific time points. They can be expressions with any state instance and any of the known parameters found in the equations of motion. All states should be evaluated at a specific instant of time. For example, the constraint x(0) = 5.0 would be specified as x(0) - 5.0 and the constraint x(0) = x(5.0) would be specified as x(0) - x(5.0). Unknown parameters and time varying parameters other than the states are currently not supported. time_symbol : SymPy Symbol, optional The symbol representating time in the equations of motion. If not given, it is assumed to be the default stored in dynamicsymbols._t. tmp_dir : string, optional If you want to see the generated Cython and C code for the constraint and constraint Jacobian evaluations, pass in a path to a directory here. integration_method : string, optional The integration method to use, either `backward euler` or `midpoint`. parallel : boolean, optional If true and openmp is installed, constraints and the Jacobian of the constraints will be executed across multiple threads. This is only useful when the equations of motion have an extremely large number of operations. """ self.eom = equations_of_motion if time_symbol is not None: self.time_symbol = time_symbol else: self.time_symbol = me.dynamicsymbols._t self.state_symbols = tuple(state_symbols) self.state_derivative_symbols = tuple([s.diff(self.time_symbol) for s in state_symbols]) self.num_states = len(self.state_symbols) self.num_collocation_nodes = num_collocation_nodes self.node_time_interval = node_time_interval self.known_parameter_map = known_parameter_map self.known_trajectory_map = known_trajectory_map self.instance_constraints = instance_constraints self.num_constraints = self.num_states * (num_collocation_nodes - 1) self.tmp_dir = tmp_dir self.parallel = parallel self._sort_parameters() self._check_known_trajectories() self._sort_trajectories() self.num_free = ((self.num_states + self.num_unknown_input_trajectories) * self.num_collocation_nodes + self.num_unknown_parameters) self.integration_method = integration_method if instance_constraints is not None: self.num_instance_constraints = len(instance_constraints) self.num_constraints += self.num_instance_constraints self._identify_functions_in_instance_constraints() self._find_closest_free_index() self.eval_instance_constraints = \ self._instance_constraints_func() self.eval_instance_constraints_jacobian_values = \ self._instance_constraints_jacobian_values_func() @property def integration_method(self): return self._integration_method @integration_method.setter def integration_method(self, method): """The method can be ``'backward euler'`` or ``'midpoint'``.""" if method not in ['backward euler', 'midpoint']: msg = ("{} is not a valid integration method.") raise ValueError(msg.format(method)) else: self._integration_method = method self._discrete_symbols() self._discretize_eom() @staticmethod def _parse_inputs(all_syms, known_syms): """Returns sets of symbols and their counts, based on if the known symbols exist in the set of all symbols. Parameters ---------- all_syms : sequence A set of SymPy symbols or functions. known_syms : sequence A set of SymPy symbols or functions. Returns ------- known : tuple The set of known symbols. num_known : integer The number of known symbols. unknown : tuple The set of unknown symbols in all_syms. num_unknown :integer The number of unknown symbols. """ all_syms = set(all_syms) known_syms = known_syms if not all_syms: # if empty sequence if known_syms: msg = '{} are not in the provided equations of motion.' raise ValueError(msg.format(known_syms)) else: known = tuple() num_known = 0 unknown = tuple() num_unknown = 0 else: if known_syms: known = tuple(known_syms) # don't sort known syms num_known = len(known) unknown = tuple(sort_sympy(all_syms.difference(known))) num_unknown = len(unknown) else: known = tuple() num_known = 0 unknown = tuple(sort_sympy(all_syms)) num_unknown = len(unknown) return known, num_known, unknown, num_unknown def _sort_parameters(self): """Finds and counts all of the parameters in the equations of motion and categorizes them based on which parameters the user supplies. The unknown parameters are sorted by name.""" parameters = self.eom.free_symbols.copy() parameters.remove(self.time_symbol) res = self._parse_inputs(parameters, self.known_parameter_map.keys()) self.known_parameters = res[0] self.num_known_parameters = res[1] self.unknown_parameters = res[2] self.num_unknown_parameters = res[3] self.parameters = res[0] + res[2] self.num_parameters = len(self.parameters) def _check_known_trajectories(self): """Raises and error if the known tracjectories are not the correct length.""" N = self.num_collocation_nodes for k, v in self.known_trajectory_map.items(): if len(v) != N: msg = 'The known parameter {} is not length {}' raise ValueError(msg.format(k, N)) def _sort_trajectories(self): """Finds and counts all of the non-state, time varying parameters in the equations of motion and categorizes them based on which parameters the user supplies. The unknown parameters are sorted by name.""" states = set(self.state_symbols) states_derivatives = set(self.state_derivative_symbols) time_varying_symbols = me.find_dynamicsymbols(self.eom) state_related = states.union(states_derivatives) non_states = time_varying_symbols.difference(state_related) res = self._parse_inputs(non_states, self.known_trajectory_map.keys()) self.known_input_trajectories = res[0] self.num_known_input_trajectories = res[1] self.unknown_input_trajectories = res[2] self.num_unknown_input_trajectories = res[3] self.input_trajectories = res[0] + res[2] self.num_input_trajectories = len(self.input_trajectories) def _discrete_symbols(self): """Instantiates discrete symbols for each time varying variable in the euqations of motion. Instantiates ------------ previous_discrete_state_symbols : tuple of sympy.Symbols The n symbols representing the system's (ith - 1) states. current_discrete_state_symbols : tuple of sympy.Symbols The n symbols representing the system's ith states. next_discrete_state_symbols : tuple of sympy.Symbols The n symbols representing the system's (ith + 1) states. current_known_discrete_specified_symbols : tuple of sympy.Symbols The symbols representing the system's ith known input trajectories. next_known_discrete_specified_symbols : tuple of sympy.Symbols The symbols representing the system's (ith + 1) known input trajectories. current_unknown_discrete_specified_symbols : tuple of sympy.Symbols The symbols representing the system's ith unknown input trajectories. next_unknown_discrete_specified_symbols : tuple of sympy.Symbols The symbols representing the system's (ith + 1) unknown input trajectories. current_discrete_specified_symbols : tuple of sympy.Symbols The m symbols representing the system's ith specified inputs. next_discrete_specified_symbols : tuple of sympy.Symbols The m symbols representing the system's (ith + 1) specified inputs. """ # The previus, current, and next states. self.previous_discrete_state_symbols = \ tuple([sm.Symbol(f.__class__.__name__ + 'p', real=True) for f in self.state_symbols]) self.current_discrete_state_symbols = \ tuple([sm.Symbol(f.__class__.__name__ + 'i', real=True) for f in self.state_symbols]) self.next_discrete_state_symbols = \ tuple([sm.Symbol(f.__class__.__name__ + 'n', real=True) for f in self.state_symbols]) # The current and next known input trajectories. self.current_known_discrete_specified_symbols = \ tuple([sm.Symbol(f.__class__.__name__ + 'i', real=True) for f in self.known_input_trajectories]) self.next_known_discrete_specified_symbols = \ tuple([sm.Symbol(f.__class__.__name__ + 'n', real=True) for f in self.known_input_trajectories]) # The current and next unknown input trajectories. self.current_unknown_discrete_specified_symbols = \ tuple([sm.Symbol(f.__class__.__name__ + 'i', real=True) for f in self.unknown_input_trajectories]) self.next_unknown_discrete_specified_symbols = \ tuple([sm.Symbol(f.__class__.__name__ + 'n', real=True) for f in self.unknown_input_trajectories]) self.current_discrete_specified_symbols = \ (self.current_known_discrete_specified_symbols + self.current_unknown_discrete_specified_symbols) self.next_discrete_specified_symbols = \ (self.next_known_discrete_specified_symbols + self.next_unknown_discrete_specified_symbols) def _discretize_eom(self): """Instantiates the constraint equations in a discretized form using backward Euler discretization. Instantiates ------------ discrete_eoms : sympy.Matrix, shape(n, 1) The column vector of the discretized equations of motion. """ logging.info('Discretizing the equations of motion.') x = self.state_symbols xd = self.state_derivative_symbols u = self.input_trajectories xp = self.previous_discrete_state_symbols xi = self.current_discrete_state_symbols xn = self.next_discrete_state_symbols ui = self.current_discrete_specified_symbols un = self.next_discrete_specified_symbols h = self.time_interval_symbol if self.integration_method == 'backward euler': deriv_sub = {d: (i - p) / h for d, i, p in zip(xd, xi, xp)} func_sub = dict(zip(x + u, xi + ui)) self.discrete_eom = me.msubs(self.eom, deriv_sub, func_sub) elif self.integration_method == 'midpoint': xdot_sub = {d: (n - i) / h for d, i, n in zip(xd, xi, xn)} x_sub = {d: (i + n) / 2 for d, i, n in zip(x, xi, xn)} u_sub = {d: (i + n) / 2 for d, i, n in zip(u, ui, un)} self.discrete_eom = me.msubs(self.eom, xdot_sub, x_sub, u_sub) def _identify_functions_in_instance_constraints(self): """Instantiates a set containing all of the instance functions, i.e. x(1.0) in the instance constraints.""" all_funcs = set() for con in self.instance_constraints: all_funcs = all_funcs.union(con.atoms(sm.Function)) self.instance_constraint_function_atoms = all_funcs def _find_closest_free_index(self): """Instantiates a dictionary mapping the instance functions to the nearest index in the free variables vector.""" def determine_free_index(time_index, state): state_index = self.state_symbols.index(state) return time_index + state_index * self.num_collocation_nodes N = self.num_collocation_nodes h = self.node_time_interval duration = h * (N - 1) time_vector = np.linspace(0.0, duration, num=N) node_map = {} for func in self.instance_constraint_function_atoms: time_value = func.args[0] time_index = np.argmin(np.abs(time_vector - time_value)) free_index = determine_free_index(time_index, func.__class__(self.time_symbol)) node_map[func] = free_index self.instance_constraints_free_index_map = node_map def _instance_constraints_func(self): """Returns a function that evaluates the instance constraints given the free optimization variables.""" free = sm.DeferredVector('FREE') def_map = {k: free[v] for k, v in self.instance_constraints_free_index_map.items()} subbed_constraints = [con.subs(def_map) for con in self.instance_constraints] f = sm.lambdify(([free] + list(self.known_parameter_map.keys())), subbed_constraints, modules=[{'ImmutableMatrix': np.array}, "numpy"]) return lambda free: f(free, *self.known_parameter_map.values()) def _instance_constraints_jacobian_indices(self): """Returns the row and column indices of the non-zero values in the Jacobian of the constraints.""" idx_map = self.instance_constraints_free_index_map num_eom_constraints = self.num_states*(self.num_collocation_nodes - 1) rows = [] cols = [] for i, con in enumerate(self.instance_constraints): funcs = con.atoms(sm.Function) indices = [idx_map[f] for f in funcs] row_idxs = num_eom_constraints + i * np.ones(len(indices), dtype=int) rows += list(row_idxs) cols += indices return np.array(rows), np.array(cols) def _instance_constraints_jacobian_values_func(self): """Returns the non-zero values of the constraint Jacobian associated with the instance constraints.""" free = sm.DeferredVector('FREE') def_map = {k: free[v] for k, v in self.instance_constraints_free_index_map.items()} funcs = [] num_vals_per_func = [] for con in self.instance_constraints: partials = list(con.atoms(sm.Function)) num_vals_per_func.append(len(partials)) jac = sm.Matrix([con]).jacobian(partials) jac = jac.subs(def_map) funcs.append(sm.lambdify(([free] + list(self.known_parameter_map.keys())), jac, modules=[{'ImmutableMatrix': np.array}, "numpy"])) l = np.sum(num_vals_per_func) def wrapped(free): arr = np.zeros(l) j = 0 for i, (f, num) in enumerate(zip(funcs, num_vals_per_func)): arr[j:j + num] = f(free, *self.known_parameter_map.values()) j += num return arr return wrapped def _gen_multi_arg_con_func(self): """Instantiates a function that evaluates the constraints given all of the arguments of the functions, i.e. not just the free optimization variables. Instantiates ------------ _multi_arg_con_func : function A function which returns the numerical values of the constraints at collocation nodes 2,...,N. Notes ----- args: all current states (x1i, ..., xni) all previous states (x1p, ... xnp) all current specifieds (s1i, ..., smi) parameters (c1, ..., cb) time interval (h) args: (x1i, ..., xni, x1p, ... xnp, s1i, ..., smi, c1, ..., cb, h) n: num states m: num specified b: num parameters The function should evaluate and return an array: [con_1_2, ..., con_1_N, con_2_2, ..., con_2_N, ..., con_n_2, ..., con_n_N] for n states and N-1 constraints at the time points. """ xi_syms = self.current_discrete_state_symbols xp_syms = self.previous_discrete_state_symbols xn_syms = self.next_discrete_state_symbols si_syms = self.current_discrete_specified_symbols sn_syms = self.next_discrete_specified_symbols h_sym = self.time_interval_symbol constant_syms = self.known_parameters + self.unknown_parameters if self.integration_method == 'backward euler': args = [x for x in xi_syms] + [x for x in xp_syms] args += [s for s in si_syms] + list(constant_syms) + [h_sym] current_start = 1 current_stop = None adjacent_start = None adjacent_stop = -1 elif self.integration_method == 'midpoint': args = [x for x in xi_syms] + [x for x in xn_syms] args += [s for s in si_syms] + [s for s in sn_syms] args += list(constant_syms) + [h_sym] current_start = None current_stop = -1 adjacent_start = 1 adjacent_stop = None logging.info('Compiling the constraint function.') f = ufuncify_matrix(args, self.discrete_eom, const=constant_syms + (h_sym,), tmp_dir=self.tmp_dir, parallel=self.parallel) def constraints(state_values, specified_values, constant_values, interval_value): """Returns a vector of constraint values given all of the unknowns in the equations of motion over the 2, ..., N time steps. Parameters ---------- states : ndarray, shape(n, N) The array of n states through N time steps. specified_values : ndarray, shape(m, N) or shape(N,) The array of m specifieds through N time steps. constant_values : ndarray, shape(b,) The array of b parameters. interval_value : float The value of the discretization time interval. Returns ------- constraints : ndarray, shape(N-1,) The array of constraints from t = 2, ..., N. [con_1_2, ..., con_1_N, con_2_2, ..., con_2_N, ..., con_n_2, ..., con_n_N] """ if state_values.shape[0] < 2: raise ValueError('There should always be at least two states.') assert state_values.shape == (self.num_states, self.num_collocation_nodes) x_current = state_values[:, current_start:current_stop] # n x N - 1 x_adjacent = state_values[:, adjacent_start:adjacent_stop] # n x N - 1 # 2n x N - 1 args = [x for x in x_current] + [x for x in x_adjacent] # 2n + m x N - 1 if len(specified_values.shape) == 2: assert specified_values.shape == \ (self.num_input_trajectories, self.num_collocation_nodes) si = specified_values[:, current_start:current_stop] args += [s for s in si] if self.integration_method == 'midpoint': sn = specified_values[:, adjacent_start:adjacent_stop] args += [s for s in sn] elif len(specified_values.shape) == 1 and specified_values.size != 0: assert specified_values.shape == \ (self.num_collocation_nodes,) si = specified_values[current_start:current_stop] args += [si] if self.integration_method == 'midpoint': sn = specified_values[adjacent_start:adjacent_stop] args += [sn] args += [c for c in constant_values] args += [interval_value] num_constraints = state_values.shape[1] - 1 # TODO : Move this to an attribute of the class so that it is # only initialized once and just reuse it on each evaluation of # this function. result = np.empty((num_constraints, state_values.shape[0])) return f(result, *args).T.flatten() self._multi_arg_con_func = constraints
[docs] def jacobian_indices(self): """Returns the row and column indices for the non-zero values in the constraint Jacobian. Returns ------- jac_row_idxs : ndarray, shape(2*n + q + r,) The row indices for the non-zero values in the Jacobian. jac_col_idxs : ndarray, shape(n,) The column indices for the non-zero values in the Jacobian. """ N = self.num_collocation_nodes n = self.num_states num_constraint_nodes = N - 1 if self.integration_method == 'backward euler': num_partials = n * (2 * n + self.num_unknown_input_trajectories + self.num_unknown_parameters) elif self.integration_method == 'midpoint': num_partials = n * (2 * n + 2 * self.num_unknown_input_trajectories + self.num_unknown_parameters) num_non_zero_values = num_constraint_nodes * num_partials if self.instance_constraints is not None: ins_row_idxs, ins_col_idxs = \ self._instance_constraints_jacobian_indices() num_non_zero_values += len(ins_row_idxs) jac_row_idxs = np.empty(num_non_zero_values, dtype=int) jac_col_idxs = np.empty(num_non_zero_values, dtype=int) """ The symbolic derivative matrix for a single constraint node follows these patterns: Backward Euler -------------- i: ith, p: ith-1 For example: x1i = the first state at the ith constraint node uqi = the qth state at the ith constraint node uqn = the qth state at the ith+1 constraint node [x1] [x1i, ..., xni, x1p, ..., xnp, u1i, .., uqi, p1, ..., pr] [. ] [. ] [. ] [xn] Midpoint -------- i: ith, n: ith+1 [x1] [x1i, ..., xni, x1n, ..., xnn, u1i, .., uqi, u1n, ..., uqn, p1, ..., pp] [. ] [. ] [. ] [xn] Each of these matrices are evaulated at N-1 constraint nodes and then the 3D matrix is flattened into a 1d array. The backward euler uses nodes 1 <= i <= N-1 and the midpoint uses 0 <= i <= N - 2. So the flattened arrays looks like: M = N-1 P = N-2 Backward Euler -------------- i=1 x1 | [x11, ..., xn1, x10, ..., xn0, u11, .., uq1, p1, ..., pr, x2 | x11, ..., xn1, x10, ..., xn0, u11, .., uq1, p1, ..., pr, ... | ..., xn | x11, ..., xn1, x10, ..., xn0, u11, .., uq1, p1, ..., pr, i=2 x1 | x12, ..., xn2, x11, ..., xn1, u12, .., uq2, p1, ..., pr, x2 | x12, ..., xn2, x11, ..., xn1, u12, .., uq2, p1, ..., pr, ... | ..., xn | x12, ..., xn2, x11, ..., xn1, u12, .., uq2, p1, ..., pr, | ..., i=M x1 | x1M, ..., xnM, x1P, ..., xnP, u1M, .., uqM, p1, ..., pr, x2 | x1M, ..., xnM, x1P, ..., xnP, u1M, .., uqM, p1, ..., pr, ... | ..., xn | x1M, ..., xnM, x1P, ..., xnP, u1M, .., uqM, p1, ..., pr] Midpoint -------- i=0 x1 | [x10, ..., xn0, x11, ..., xn1, u10, .., uq0, u11, .., uq1, p1, ..., pr, x2 | x10, ..., xn0, x11, ..., xn1, u10, .., uq0, u11, .., uq1, p1, ..., pr, ... | ..., xn | x10, ..., xn0, x11, ..., xn1, u10, .., uq0, u11, .., uq1, p1, ..., pr, i=1 x1 | x11, ..., xn1, x12, ..., xn2, u11, .., uq1, u12, .., uq2, p1, ..., pr, x2 | x11, ..., xn1, x12, ..., xn2, u11, .., uq1, u12, .., uq2, p1, ..., pr, ... | ..., xn | x11, ..., xn1, x12, ..., xn2, u11, .., uq1, u12, .., uq2, p1, ..., pr, ... | ..., i=P x1 | x1P, ..., xnP, x1M, ..., xnM, u1P, .., uqP, u1M, .., uqM, p1, ..., pr, x2 | x1P, ..., xnP, x1M, ..., xnM, u1P, .., uqP, u1M, .., uqM, p1, ..., pr, ... | ..., xn | x1P, ..., xnP, x1M, ..., xnM, u1P, .., uqP, u1M, .., uqM, p1, ..., pr] These two arrays contain of the non-zero values of the sparse Jacobian[#]_. .. [#] Some of the partials can be equal to zero and could be excluded from the array. These could be a significant number. Now we need to generate the triplet format indices of the full sparse Jacobian for each one of the entries in these arrays. The format of the Jacobian matrix is: Backward Euler -------------- [x10, ..., x1N-1, ..., xn0, ..., xnN-1, u10, ..., u1N-1, ..., uq0, ..., uqN-1, p1, ..., pr] [x11] [x12] [...] [x1M] [...] [xn1] [xn2] [...] [xnM] Midpoint -------- [x10, ..., x1N-1, ..., xn0, ..., xnN-1, u10, ..., u1N-1, ..., uq0, ..., uqN-1, p1, ..., pr] [x10] [x11] [...] [x1P] [...] [xn0] [xn1] [...] [xnP] """ for i in range(num_constraint_nodes): # n : number of states # m : number of input trajectories # p : number of parameters # q : number of unknown input trajectories # r : number of unknown parameters # the states repeat every N - 1 constraints # row_idxs = [0 * (N - 1), 1 * (N - 1), 2 * (N - 1), ..., n * (N - 1)] # This gives the Jacobian row indices matching the ith # constraint node for each state. ith corresponds to the loop # indice. row_idxs = [j * (num_constraint_nodes) + i for j in range(n)] # first row, the columns indices mapping is: # [1, N + 1, ..., N - 1] : [x1p, x1i, 0, ..., 0] # [0, N, ..., 2 * (N - 1)] : [x2p, x2i, 0, ..., 0] # [-p:] : p1,..., pp the free constants # i=0: [1, ..., n * N + 1, 0, ..., n * N + 0, n * N:n * N + p] # i=1: [2, ..., n * N + 2, 1, ..., n * N + 1, n * N:n * N + p] # i=2: [3, ..., n * N + 3, 2, ..., n * N + 2, n * N:n * N + p] if self.integration_method == 'backward euler': col_idxs = [j * N + i + 1 for j in range(n)] col_idxs += [j * N + i for j in range(n)] col_idxs += [n * N + j * N + i + 1 for j in range(self.num_unknown_input_trajectories)] col_idxs += [(n + self.num_unknown_input_trajectories) * N + j for j in range(self.num_unknown_parameters)] elif self.integration_method == 'midpoint': col_idxs = [j * N + i for j in range(n)] col_idxs += [j * N + i + 1 for j in range(n)] col_idxs += [n * N + j * N + i for j in range(self.num_unknown_input_trajectories)] col_idxs += [n * N + j * N + i + 1 for j in range(self.num_unknown_input_trajectories)] col_idxs += [(n + self.num_unknown_input_trajectories) * N + j for j in range(self.num_unknown_parameters)] row_idx_permutations = np.repeat(row_idxs, len(col_idxs)) col_idx_permutations = np.array(list(col_idxs) * len(row_idxs), dtype=int) start = i * num_partials stop = (i + 1) * num_partials jac_row_idxs[start:stop] = row_idx_permutations jac_col_idxs[start:stop] = col_idx_permutations if self.instance_constraints is not None: jac_row_idxs[-len(ins_row_idxs):] = ins_row_idxs jac_col_idxs[-len(ins_col_idxs):] = ins_col_idxs return jac_row_idxs, jac_col_idxs
def _gen_multi_arg_con_jac_func(self): """Instantiates a function that evaluates the Jacobian of the constraints. Instantiates ------------ _multi_arg_con_jac_func : function A function which returns the numerical values of the constraints at time points 2,...,N. """ xi_syms = self.current_discrete_state_symbols xp_syms = self.previous_discrete_state_symbols xn_syms = self.next_discrete_state_symbols si_syms = self.current_discrete_specified_symbols sn_syms = self.next_discrete_specified_symbols ui_syms = self.current_unknown_discrete_specified_symbols un_syms = self.next_unknown_discrete_specified_symbols h_sym = self.time_interval_symbol constant_syms = self.known_parameters + self.unknown_parameters if self.integration_method == 'backward euler': # The free parameters are always the n * (N - 1) state values, # the unknown input trajectories, and the unknown model # constants, so the base Jacobian needs to be taken with respect # to the ith, and ith - 1 states, and the free model constants. wrt = (xi_syms + xp_syms + ui_syms + self.unknown_parameters) # The arguments to the Jacobian function include all of the free # Symbols/Functions in the matrix expression. args = xi_syms + xp_syms + si_syms + constant_syms + (h_sym,) current_start = 1 current_stop = None adjacent_start = None adjacent_stop = -1 elif self.integration_method == 'midpoint': wrt = (xi_syms + xn_syms + ui_syms + un_syms + self.unknown_parameters) # The arguments to the Jacobian function include all of the free # Symbols/Functions in the matrix expression. args = (xi_syms + xn_syms + si_syms + sn_syms + constant_syms + (h_sym,)) current_start = None current_stop = -1 adjacent_start = 1 adjacent_stop = None # This creates a matrix with all of the symbolic partial derivatives # necessary to compute the full Jacobian. logging.info('Differentiating the constraint function.') discrete_eom_matrix = sm.ImmutableDenseMatrix(self.discrete_eom) wrt_matrix = sm.ImmutableDenseMatrix([list(wrt)]) symbolic_partials = _forward_jacobian(discrete_eom_matrix, wrt_matrix.T) # This generates a numerical function that evaluates the matrix of # partial derivatives. This function returns the non-zero elements # needed to build the sparse constraint Jacobian. logging.info('Compiling the Jacobian function.') eval_partials = ufuncify_matrix(args, symbolic_partials, const=constant_syms + (h_sym,), tmp_dir=self.tmp_dir, parallel=self.parallel) if isinstance(symbolic_partials, tuple) and len(symbolic_partials) == 2: num_rows = symbolic_partials[1][0].shape[0] num_cols = symbolic_partials[1][0].shape[1] else: num_rows = symbolic_partials.shape[0] num_cols = symbolic_partials.shape[1] result = np.empty((self.num_collocation_nodes - 1, num_rows * num_cols)) def constraints_jacobian(state_values, specified_values, parameter_values, interval_value): """Returns the values of the sparse constraing Jacobian matrix given all of the values for each variable in the equations of motion over the N - 1 nodes. Parameters ---------- states : ndarray, shape(n, N) The array of n states through N time steps. There are always at least two states. specified_values : ndarray, shape(m, N) or shape(N,) The array of m specified inputs through N time steps. parameter_values : ndarray, shape(p,) The array of p parameter. interval_value : float The value of the discretization time interval. Returns ------- constraint_jacobian_values : ndarray, shape(see below,) backward euler: shape((N - 1) * n * (2*n + q + r),) midpoint: shape((N - 1) * n * (2*n + 2*q + r),) The values of the non-zero entries of the constraints Jacobian. These correspond to the triplet formatted indices returned from jacobian_indices. Notes ----- - N : number of collocation nodes - n : number of states - m : number of input trajectories - p : number of parameters - q : number of unknown input trajectories - r : number of unknown parameters - n*(N - 1) : number of constraints """ if state_values.shape[0] < 2: raise ValueError('There should always be at least two states.') # Each of these arrays are shape(n, N - 1). The x_adjacent is # either the previous value of the state or the next value of # the state, depending on the integration method. x_current = state_values[:, current_start:current_stop] x_adjacent = state_values[:, adjacent_start:adjacent_stop] # 2n x N - 1 args = [x for x in x_current] + [x for x in x_adjacent] # 2n + m x N - 1 if len(specified_values.shape) == 2: si = specified_values[:, current_start:current_stop] args += [s for s in si] if self.integration_method == 'midpoint': sn = specified_values[:, adjacent_start:adjacent_stop] args += [s for s in sn] elif len(specified_values.shape) == 1 and specified_values.size != 0: si = specified_values[current_start:current_stop] args += [si] if self.integration_method == 'midpoint': sn = specified_values[adjacent_start:adjacent_stop] args += [sn] args += [c for c in parameter_values] args += [interval_value] # backward euler: shape(N - 1, n, 2*n + q + r) # midpoint: shape(N - 1, n, 2*n + 2*q + r) non_zero_derivatives = eval_partials(result, *args) return non_zero_derivatives.ravel() self._multi_arg_con_jac_func = constraints_jacobian @staticmethod def _merge_fixed_free(syms, fixed, free, typ): """Returns an array with the fixed and free values combined. This just takes the known and unknown values and combines them for the function evaluation. This assumes that you have the free constants in the correct order. Parameters ---------- syms : iterable of SymPy Symbols or Functions fixed : dictionary A mapping from Symbols to floats or Functions to 1d ndarrays. free : ndarray, (N,) or shape(n,N) An array typ : string traj or par """ merged = [] n = 0 # syms is order as known (fixed) then unknown (free) for i, s in enumerate(syms): if s in fixed.keys(): merged.append(fixed[s]) else: if typ == 'traj' and len(free.shape) == 1: merged.append(free) else: merged.append(free[n]) n += 1 return np.array(merged) def _wrap_constraint_funcs(self, func, typ): """Returns a function that evaluates all of the constraints or Jacobian of the constraints given the free optimization variables. Parameters ---------- func : function A function that takes the full parameter set and evaluates the constraint functions or the Jacobian of the contraint functions. i.e. the output of _gen_multi_arg_con_func or _gen_multi_arg_con_jac_func. Returns ------- func : function A function which returns constraint values given the system's free optimization variables. """ def constraints(free): free_states, free_specified, free_constants = \ parse_free(free, self.num_states, self.num_unknown_input_trajectories, self.num_collocation_nodes) all_specified = self._merge_fixed_free(self.input_trajectories, self.known_trajectory_map, free_specified, 'traj') all_constants = self._merge_fixed_free(self.parameters, self.known_parameter_map, free_constants, 'par') eom_con_vals = func(free_states, all_specified, all_constants, self.node_time_interval) if self.instance_constraints is not None: if typ == 'con': ins_con_vals = self.eval_instance_constraints(free) elif typ == 'jac': ins_con_vals = \ self.eval_instance_constraints_jacobian_values(free) return np.hstack((eom_con_vals, ins_con_vals)) else: return eom_con_vals intro, second = func.__doc__.split('Parameters') params, returns = second.split('Returns') new_doc = '{}Parameters\n----------\nfree : ndarray, shape()\n\nReturns\n{}' constraints.__doc__ = new_doc.format(intro, returns) return constraints
[docs] def generate_constraint_function(self): """Returns a function which evaluates the constraints given the array of free optimization variables.""" logging.info('Generating constraint function.') self._gen_multi_arg_con_func() return self._wrap_constraint_funcs(self._multi_arg_con_func, 'con')
[docs] def generate_jacobian_function(self): """Returns a function which evaluates the Jacobian of the constraints given the array of free optimization variables.""" logging.info('Generating jacobian function.') self._gen_multi_arg_con_jac_func() return self._wrap_constraint_funcs(self._multi_arg_con_jac_func, 'jac')