#!/usr/bin/env python
import os
import sys
import shutil
import tempfile
import subprocess
import importlib
from functools import wraps
import warnings
from distutils.ccompiler import new_compiler
from distutils.errors import CompileError
from distutils.sysconfig import customize_compiler
from collections import Counter
from timeit import default_timer as timer
import logging
import numpy as np
import sympy as sm
import sympy.physics.mechanics as me
from sympy.utilities.iterables import numbered_symbols
try:
plt = sm.external.import_module('matplotlib.pyplot',
__import__kwargs={'fromlist': ['']},
catch=(RuntimeError,))
except TypeError: # SymPy >=1.6
plt = sm.external.import_module('matplotlib.pyplot',
import_kwargs={'fromlist': ['']},
catch=(RuntimeError,))
def _forward_jacobian(expr, wrt):
def add_to_cache(node):
if node in expr_to_replacement_cache:
replacement_symbol = expr_to_replacement_cache[node]
return replacement_symbol, replacement_to_reduced_expr_cache[replacement_symbol]
elif node in replacement_to_reduced_expr_cache:
return node, replacement_to_reduced_expr_cache[node]
elif isinstance(node, sm.Tuple):
return None, None
elif not node.free_symbols:
return node, node
replacement_symbol = replacement_symbols.__next__()
replaced_subexpr = node.xreplace(expr_to_replacement_cache)
replacement_to_reduced_expr_cache[replacement_symbol] = replaced_subexpr
expr_to_replacement_cache[node] = replacement_symbol
return replacement_symbol, replaced_subexpr
if not isinstance(expr, sm.ImmutableDenseMatrix):
msg = (
'The forward Jacobian differentiation algorithm can only be used '
'to differentiate a single matrix expression at a time.'
)
raise NotImplementedError(msg)
elif expr.shape[1] != 1:
msg = 'Can only compute the Jacobian for column matrices.'
raise NotImplementedError(msg)
elif not isinstance(wrt, sm.ImmutableDenseMatrix) or wrt.shape[1] != 1:
msg = (
'The forward Jacobian differentiation algorithm can compute '
'Jacobians with respect to column matrices.'
)
raise NotImplementedError
replacement_symbols = numbered_symbols(
prefix='_z',
cls=sm.Symbol,
exclude=expr.free_symbols,
)
expr_to_replacement_cache = {}
replacement_to_reduced_expr_cache = {}
logging.info('Adding expression nodes to cache...')
start = timer()
replacements, reduced_exprs = sm.cse(expr.args[2], replacement_symbols)
for replacement_symbol, reduced_subexpr in replacements:
replaced_subexpr = reduced_subexpr.xreplace(expr_to_replacement_cache)
replacement_to_reduced_expr_cache[replacement_symbol] = replaced_subexpr
expr_to_replacement_cache[reduced_subexpr] = replacement_symbol
for node in sm.postorder_traversal(reduced_subexpr):
_ = add_to_cache(node)
for reduced_expr in reduced_exprs:
for node in reduced_expr:
_ = add_to_cache(node)
finish = timer()
logging.info(f'Completed in {finish - start:.2f}s')
reduced_matrix = sm.ImmutableDenseMatrix(reduced_exprs).xreplace(expr_to_replacement_cache)
replacements = list(replacement_to_reduced_expr_cache.items())
partial_derivative_mapping = {}
absolute_derivative_mapping = {}
for i, wrt_symbol in enumerate(wrt.args[2]):
absolute_derivative = [sm.S.Zero] * len(wrt)
absolute_derivative[i] = sm.S.One
absolute_derivative_mapping[wrt_symbol] = sm.ImmutableDenseMatrix([absolute_derivative])
logging.info('Differentiating expression nodes...')
start = timer()
zeros = sm.ImmutableDenseMatrix.zeros(1, len(wrt))
for symbol, subexpr in replacements:
free_symbols = subexpr.free_symbols
absolute_derivative = zeros
for free_symbol in free_symbols:
replacement_symbol, partial_derivative = add_to_cache(subexpr.diff(free_symbol))
absolute_derivative += partial_derivative * absolute_derivative_mapping.get(free_symbol, zeros)
absolute_derivative_mapping[symbol] = sm.ImmutableDenseMatrix([[add_to_cache(a)[0] for a in absolute_derivative]])
replaced_jacobian = sm.ImmutableDenseMatrix.vstack(*[absolute_derivative_mapping[e] for e in reduced_matrix])
finish = timer()
logging.info(f'Completed in {finish - start:.2f}s')
logging.info('Determining required replacements...')
start = timer()
required_replacement_symbols = set()
stack = [entry for entry in replaced_jacobian if entry.free_symbols]
while stack:
entry = stack.pop()
if entry in required_replacement_symbols or entry in wrt:
continue
children = list(replacement_to_reduced_expr_cache.get(entry, entry).free_symbols)
for child in children:
if child not in required_replacement_symbols and child not in wrt:
stack.append(child)
required_replacement_symbols.add(entry)
finish = timer()
logging.info(f'Completed in {finish - start:.2f}s')
required_replacements_dense = {
replacement_symbol: replaced_subexpr
for replacement_symbol, replaced_subexpr in replacement_to_reduced_expr_cache.items()
if replacement_symbol in required_replacement_symbols
}
counter = Counter(replaced_jacobian.free_symbols)
for replaced_subexpr in required_replacements_dense.values():
counter.update(replaced_subexpr.free_symbols)
logging.info('Substituting required replacements...')
required_replacements = {}
unrequired_replacements = {}
for replacement_symbol, replaced_subexpr in required_replacements_dense.items():
if isinstance(replaced_subexpr, sm.Symbol) or counter[replacement_symbol] == 1:
unrequired_replacements[replacement_symbol] = replaced_subexpr.xreplace(unrequired_replacements)
else:
required_replacements[replacement_symbol] = replaced_subexpr.xreplace(unrequired_replacements)
finish = timer()
logging.info(f'Completed in {finish - start:.2f}s')
return (list(required_replacements.items()), [replaced_jacobian.xreplace(unrequired_replacements)])
def building_docs():
if 'READTHEDOCS' in os.environ:
return True
elif 'SPHINX' in os.environ:
return True
else:
return False
def _optional_plt_dep(func):
"""Decorator that aborts function/method call if matplotlib is not
installed."""
@wraps(func)
def wrapper(*args, **kwargs):
if plt is None:
raise ImportError('Install matplotlib for plotting features.')
else:
func(*args, **kwargs)
return wrapper
[docs]
def state_derivatives(states):
"""Returns functions of time which represent the time derivatives of the
states."""
return [state.diff() for state in states]
[docs]
def f_minus_ma(mass_matrix, forcing_vector, states):
"""Returns Fr + Fr* from the mass_matrix and forcing vector."""
xdot = sm.Matrix(state_derivatives(states))
return mass_matrix * xdot - forcing_vector
[docs]
def parse_free(free, n, q, N):
"""Parses the free parameters vector and returns it's components.
Parameters
----------
free : ndarray, shape(n*N + q*N + r)
The free parameters of the system.
n : integer
The number of states.
q : integer
The number of free specified inputs.
N : integer
The number of time steps.
Returns
-------
states : ndarray, shape(n, N)
The array of n states through N time steps.
specified_values : ndarray, shape(q, N) or shape(N,), or None
The array of q specified inputs through N time steps.
constant_values : ndarray, shape(r,)
The array of r constants.
"""
len_states = n * N
len_specified = q * N
free_states = free[:len_states].reshape((n, N))
if q == 0:
free_specified = None
else:
free_specified = free[len_states:len_states + len_specified]
if q > 1:
free_specified = free_specified.reshape((q, N))
free_constants = free[len_states + len_specified:]
return free_states, free_specified, free_constants
[docs]
def create_objective_function(
objective, state_symbols, input_symbols, unknown_symbols,
N, node_time_interval=1.0, integration_method="backward euler"):
"""Creates function to evaluate the objective and objective gradient.
Parameters
----------
objective : sympy.Expr
The objective function to be minimized. It should solely depend on the
states, inputs, and unknown symbols. Any known symbols should be
substituted beforehand. Additionally, the objective function can contain
non-nested indefinite integrals of time, e.g. ``Integral(f(t)**2, t)``.
state_symbols : iterable of symbols
The state variables.
input_symbols : iterable of symbols
The input variables.
unknown_symbols : iterable of symbols
The unknown parameters.
N : int
Number of collocation nodes, i.e. the number of time steps.
node_time_interval : float
The value of the time interval. The default is 1.0, as this term only
appears in the objective function as a scaling factor.
integration_method : str, optional
The method used to integrate the system. The default is "backward
euler".
"""
def lambdify_function(expr, multiplication_array, take_sum):
if take_sum:
def integration_function(x):
return node_time_interval * np.sum(x * multiplication_array)
else:
def integration_function(x):
return node_time_interval * x * multiplication_array
return sm.lambdify(
(states, inputs, params), expr,
modules=[{int_placeholder.name: integration_function}, "numpy"],
cse=True)
def parse_expr(expr, in_integral=False):
if not expr.args:
return expr
if isinstance(expr, sm.Integral):
if in_integral:
raise NotImplementedError("Nested integrals are not supported.")
if expr.limits != ((me.dynamicsymbols._t,),):
raise NotImplementedError(
"Only indefinite integrals of time are supported.")
return int_placeholder(parse_expr(expr.function, True))
return expr.func(*(parse_expr(arg) for arg in expr.args))
# Parse function arguments
states = sm.ImmutableMatrix(state_symbols)
inputs = sm.ImmutableMatrix(sort_sympy(input_symbols))
params = sm.ImmutableMatrix(sort_sympy(unknown_symbols))
if states.shape[1] > 1 or inputs.shape[1] > 1 or params.shape[1] > 1:
raise ValueError(
'The state, input, and unknown symbols must be column matrices.')
n, q = states.shape[0], inputs.shape[0]
i_idx, r_idx = n * N, (n + q) * N
# Compute analytical gradient of the objective function
objective_grad = sm.ImmutableMatrix([objective]).jacobian(
states[:] + inputs[:] + params[:])
# Replace the integral with a custom function
int_placeholder = sm.Function("_IntegralFunction")
objective = parse_expr(objective)
objective_grad = tuple(parse_expr(objective_grad))
# Replace zeros with an array of zeros, otherwise lambdify will return a
# scalar zero instead of an array of zeros.
objective_grad = tuple(
np.zeros(N) if grad == 0 else grad for grad in objective_grad[:n + q]
) + tuple(objective_grad[n + q:])
# Define evaluation functions based on the integration method
if integration_method == "backward euler":
obj_expr_eval = lambdify_function(
objective, np.hstack((0, np.ones(N - 1))), True)
obj_grad_time_expr_eval = lambdify_function(
objective_grad[:n + q], np.hstack((0, np.ones(N - 1))), False)
obj_grad_param_expr_eval = lambdify_function(
objective_grad[n + q:], np.hstack((0, np.ones(N - 1))), True)
def obj(free):
states = free[:i_idx].reshape((n, N))
inputs = free[i_idx:r_idx].reshape((q, N))
return obj_expr_eval(states, inputs, free[r_idx:])
def obj_grad(free):
states = free[:i_idx].reshape((n, N))
inputs = free[i_idx:r_idx].reshape((q, N))
return np.hstack((
*obj_grad_time_expr_eval(states, inputs, free[r_idx:]),
obj_grad_param_expr_eval(states, inputs, free[r_idx:])
))
elif integration_method == "midpoint":
obj_expr_eval = lambdify_function(
objective, np.ones(N - 1), True)
obj_grad_time_expr_eval = lambdify_function(
objective_grad[:n + q], np.hstack((0.5, np.ones(N - 2), 0.5)),
False)
obj_grad_param_expr_eval = lambdify_function(
objective_grad[n + q:], np.ones(N - 1), True)
def obj(free):
states = free[:i_idx].reshape((n, N))
states_mid = 0.5 * (states[:, :-1] + states[:, 1:])
inputs = free[i_idx:r_idx].reshape((q, N))
inputs_mid = 0.5 * (inputs[:, :-1] + inputs[:, 1:])
return obj_expr_eval(states_mid, inputs_mid, free[r_idx:])
def obj_grad(free):
states = free[:i_idx].reshape((n, N))
states_mid = 0.5 * (states[:, :-1] + states[:, 1:])
inputs = free[i_idx:r_idx].reshape((q, N))
inputs_mid = 0.5 * (inputs[:, :-1] + inputs[:, 1:])
return np.hstack((
*obj_grad_time_expr_eval(states, inputs, free[r_idx:]),
obj_grad_param_expr_eval(states_mid, inputs_mid, free[r_idx:])
))
else:
raise NotImplementedError(
f"Integration method '{integration_method}' is not implemented.")
return obj, obj_grad
[docs]
def sort_sympy(seq):
"""Returns a sorted list of the symbols."""
seq = list(seq)
try: # symbols
seq.sort(key=lambda x: x.name)
except AttributeError: # functions
seq.sort(key=lambda x: x.__class__.__name__)
return seq
_c_template = """\
#include <math.h>
#include "{file_prefix}_h.h"
void {routine_name}(double matrix[{matrix_output_size}], {input_args})
{{
{eval_code}
}}
"""
_h_template = """\
void {routine_name}(double matrix[{matrix_output_size}], {input_args});
"""
_cython_template = """\
import numpy as np
from cython.parallel import prange
cimport numpy as np
cimport cython
cdef extern from "{file_prefix}_h.h"{head_gil}:
void {routine_name}(double matrix[{matrix_output_size}], {input_args})
@cython.boundscheck(False)
@cython.wraparound(False)
def {routine_name}_loop(np.ndarray[np.double_t, ndim=2] matrix, {numpy_typed_input_args}):
cdef int n = matrix.shape[0]
cdef int i
for i in {loop_sig}:
{routine_name}(&matrix[i, 0], {indexed_input_args})
return matrix.reshape(n, {num_rows}, {num_cols})
"""
_setup_template = """\
import numpy
from distutils.core import setup
from distutils.extension import Extension
from Cython.Build import cythonize
extension = Extension(name="{file_prefix}",
sources=["{file_prefix}.pyx",
"{file_prefix}_c.c"],
extra_compile_args=[{compile_args}],
extra_link_args=[{link_args}],
include_dirs=[numpy.get_include()])
setup(name="{routine_name}",
ext_modules=cythonize([extension]))
"""
module_counter = 0
[docs]
def openmp_installed():
"""Returns true if openmp is installed, false if not.
Modified from:
https://stackoverflow.com/questions/16549893/programatically-testing-for-openmp-support-from-a-python-setup-script
"""
tmpdir = tempfile.mkdtemp(".opty_openmp_check")
curdir = os.getcwd()
os.chdir(tmpdir)
filename = r'test.c'
contents = r"""\
#include <omp.h>
#include <stdio.h>
int main(void) {
#pragma omp parallel
printf("Hello from thread %d, nthreads %d\n",
omp_get_thread_num(), omp_get_num_threads());
}"""
with open(filename, 'w') as f:
f.write(contents)
ccompiler = new_compiler()
customize_compiler(ccompiler)
try:
# .compile() should return ['test.o'] on linux
ccompiler.compile([filename], extra_postargs=['-fopenmp'])
exit = True
except CompileError:
exit = False
finally:
os.chdir(curdir)
# NOTE : I can't figure out how to get rmtree to work on Windows, so I
# don't delete the directory on Windows.
if sys.platform != "win32":
shutil.rmtree(tmpdir)
return exit
[docs]
def ufuncify_matrix(args, expr, const=None, tmp_dir=None, parallel=False):
"""Returns a function that evaluates a matrix of expressions in a tight
loop.
Parameters
----------
args : iterable of sympy.Symbol
A list of all symbols in expr in the desired order for the output
function.
expr : sympy.Matrix
A matrix of expressions.
const : tuple, optional
This should include any of the symbols in args that should be
constant with respect to the loop.
tmp_dir : string, optional
The path to a directory in which to store the generated files. If
None then the files will be not be retained after the function is
compiled.
parallel : boolean, optional
If True and openmp is installed, the generated code will be
parallelized across threads. This is only useful when expr are
extremely large.
"""
# TODO : This is my first ever global variable in Python. It'd probably
# be better if this was a class attribute of a Ufuncifier class. And I'm
# not sure if this current version counts sequentially.
global module_counter
if hasattr(expr, 'shape'):
num_rows = expr.shape[0]
num_cols = expr.shape[1]
else:
num_rows = expr[1][0].shape[0]
num_cols = expr[1][0].shape[1]
matrix_size = num_rows * num_cols
file_prefix_base = 'ufuncify_matrix'
file_prefix = '{}_{}'.format(file_prefix_base, module_counter)
if tmp_dir is None:
codedir = tempfile.mkdtemp(".opty_ufuncify_compile")
else:
codedir = os.path.abspath(tmp_dir)
if not os.path.exists(codedir):
os.makedirs(codedir)
taken = False
while not taken:
try:
open(os.path.join(codedir, file_prefix + '.pyx'), 'r')
except IOError:
taken = True
else:
file_prefix = '{}_{}'.format(file_prefix_base, module_counter)
module_counter += 1
d = {'routine_name': 'eval_matrix',
'file_prefix': file_prefix,
'matrix_output_size': matrix_size,
'num_rows': num_rows,
'num_cols': num_cols}
if parallel:
if openmp_installed():
openmp = True
else:
openmp = False
msg = ('openmp is not installed or not working properly, request '
'for parallel execution ignored.')
warnings.warn(msg)
if parallel and openmp:
d['loop_sig'] = "prange(n, nogil=True)"
d['head_gil'] = " nogil"
d['compile_args'] = "'-fopenmp'"
d['link_args'] = "'-fopenmp'"
else:
d['loop_sig'] = "range(n)"
d['head_gil'] = ""
d['compile_args'] = ""
d['link_args'] = ""
matrix_sym = sm.MatrixSymbol('matrix', num_rows, num_cols)
if isinstance(expr, tuple) and len(expr) == 2:
sub_exprs, simple_mat = expr
else:
sub_exprs, simple_mat = sm.cse(expr, sm.numbered_symbols('z_'))
sub_expr_code = '\n'.join(['double ' + sm.ccode(sub_expr[1], sub_expr[0])
for sub_expr in sub_exprs])
matrix_code = sm.ccode(simple_mat[0], matrix_sym)
d['eval_code'] = ' ' + '\n '.join((sub_expr_code + '\n' +
matrix_code).split('\n'))
c_indent = len('void {routine_name}('.format(**d))
c_arg_spacer = ',\n' + ' ' * c_indent
input_args = ['double {}'.format(sm.ccode(a)) for a in args]
d['input_args'] = c_arg_spacer.join(input_args)
cython_input_args = []
indexed_input_args = []
for a in args:
if const is not None and a in const:
typ = 'double'
idexy = '{}'
else:
typ = 'np.ndarray[np.double_t, ndim=1]'
idexy = '{}[i]'
cython_input_args.append('{} {}'.format(typ, sm.ccode(a)))
indexed_input_args.append(idexy.format(sm.ccode(a)))
cython_indent = len('def {routine_name}_loop('.format(**d))
cython_arg_spacer = ',\n' + ' ' * cython_indent
d['numpy_typed_input_args'] = cython_arg_spacer.join(cython_input_args)
d['indexed_input_args'] = ',\n'.join(indexed_input_args)
files = {}
files[d['file_prefix'] + '_c.c'] = _c_template.format(**d)
files[d['file_prefix'] + '_h.h'] = _h_template.format(**d)
files[d['file_prefix'] + '.pyx'] = _cython_template.format(**d)
files[d['file_prefix'] + '_setup.py'] = _setup_template.format(**d)
workingdir = os.getcwd()
os.chdir(codedir)
try:
sys.path.append(codedir)
for filename, code in files.items():
with open(filename, 'w') as f:
f.write(code)
cmd = [sys.executable, d['file_prefix'] + '_setup.py', 'build_ext',
'--inplace']
subprocess.call(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE)
cython_module = importlib.import_module(d['file_prefix'])
finally:
module_counter += 1
sys.path.remove(codedir)
os.chdir(workingdir)
if tmp_dir is None:
# NOTE : I can't figure out how to get rmtree to work on Windows,
# so I don't delete the directory on Windows.
if sys.platform != "win32":
shutil.rmtree(codedir)
return getattr(cython_module, d['routine_name'] + '_loop')
[docs]
def controllable(a, b):
"""Returns true if the system is controllable and false if not.
Parameters
----------
a : array_like, shape(n,n)
The state matrix.
b : array_like, shape(n,r)
The input matrix.
Returns
-------
controllable : boolean
"""
a = np.asmatrix(a)
b = np.asmatrix(b)
n = a.shape[0]
controllability_matrix = []
for i in range(n):
controllability_matrix.append(a ** i * b)
controllability_matrix = np.hstack(controllability_matrix)
return np.linalg.matrix_rank(controllability_matrix) == n
[docs]
def substitute_matrix(matrix, row_idxs, col_idxs, sub_matrix):
"""Returns the matrix with the values given by the row and column
indices with those in the sub-matrix.
Parameters
----------
matrix : ndarray, shape(n, m)
A matrix (i.e. 2D array).
row_idxs : array_like, shape(p<=n,)
The row indices which designate which entries should be replaced by
the sub matrix entries.
col_idxs : array_like, shape(q<=m,)
The column indices which designate which entries should be replaced
by the sub matrix entries.
sub_matrix : ndarray, shape(p, q)
A matrix of values to substitute into the specified rows and
columns.
Notes
-----
This makes a copy of the sub_matrix, so if it is large it may be slower
than a more optimal implementation.
Examples
--------
>>> a = np.zeros((3, 4))
>>> sub = np.arange(4).reshape((2, 2))
>>> substitute_matrix(a, [1, 2], [0, 2], sub)
array([[ 0., 0., 0., 0.],
[ 0., 0., 1., 0.],
[ 2., 0., 3., 0.]])
"""
assert sub_matrix.shape == (len(row_idxs), len(col_idxs))
row_idx_permutations = np.repeat(row_idxs, len(col_idxs))
col_idx_permutations = np.array(list(col_idxs) * len(row_idxs))
matrix[row_idx_permutations, col_idx_permutations] = sub_matrix.flatten()
return matrix
[docs]
def sum_of_sines(sigma, frequencies, time):
"""Returns a sum of sines centered at zero along with its first and
second derivatives.
Parameters
==========
sigma : float
The desired standard deviation of the series.
frequencies : iterable of floats
The frequencies of the sin curves to be included in the sum.
time : array_like, shape(n,)
The montonically increasing time vector.
Returns
=======
sines : ndarray, shape(n,)
A sum of sines.
sines_prime : ndarray, shape(n,)
The first derivative of the sum of sines.
sines_double_prime : ndarray, shape(n,)
The second derivative of the sum of sines.
"""
phases = 2.0 * np.pi * np.random.ranf(len(frequencies))
sines = np.zeros_like(time)
sines_prime = np.zeros_like(time)
sines_double_prime = np.zeros_like(time)
amplitude = sigma / 2.0
for w, p in zip(frequencies, phases):
sines += amplitude * np.sin(w * time + p)
sines_prime += amplitude * w * np.cos(w * time + p)
sines_double_prime -= amplitude * w**2 * np.sin(w * time + p)
return sines, sines_prime, sines_double_prime