Source code for opty.utils

#!/usr/bin/env python

import os
import sys
import shutil
import tempfile
import subprocess
import importlib
from functools import wraps
import warnings
from distutils.ccompiler import new_compiler
from distutils.errors import CompileError
from distutils.sysconfig import customize_compiler

import numpy as np
import sympy as sm
try:
    plt = sm.external.import_module('matplotlib.pyplot',
                                    __import__kwargs={'fromlist': ['']},
                                    catch=(RuntimeError,))
except TypeError:  # SymPy >=1.6
    plt = sm.external.import_module('matplotlib.pyplot',
                                    import_kwargs={'fromlist': ['']},
                                    catch=(RuntimeError,))


def building_docs():
    if 'READTHEDOCS' in os.environ:
        return True
    elif 'SPHINX' in os.environ:
        return True
    else:
        return False


def _optional_plt_dep(func):
    """Decorator that aborts function/method call if matplotlib is not
    installed."""
    @wraps(func)
    def wrapper(*args, **kwargs):
        if plt is None:
            raise ImportError('Install matplotlib for plotting features.')
        else:
            func(*args, **kwargs)
    return wrapper


[docs]def state_derivatives(states): """Returns functions of time which represent the time derivatives of the states.""" return [state.diff() for state in states]
[docs]def f_minus_ma(mass_matrix, forcing_vector, states): """Returns Fr + Fr* from the mass_matrix and forcing vector.""" xdot = sm.Matrix(state_derivatives(states)) return mass_matrix * xdot - forcing_vector
[docs]def parse_free(free, n, q, N): """Parses the free parameters vector and returns it's components. Parameters ---------- free : ndarray, shape(n*N + q*N + r) The free parameters of the system. n : integer The number of states. q : integer The number of free specified inputs. N : integer The number of time steps. Returns ------- states : ndarray, shape(n, N) The array of n states through N time steps. specified_values : ndarray, shape(r, N) or shape(N,), or None The array of r specified inputs through N time steps. constant_values : ndarray, shape(q,) The array of q constants. """ len_states = n * N len_specified = q * N free_states = free[:len_states].reshape((n, N)) if q == 0: free_specified = None else: free_specified = free[len_states:len_states + len_specified] if q > 1: free_specified = free_specified.reshape((q, N)) free_constants = free[len_states + len_specified:] return free_states, free_specified, free_constants
_c_template = """\ #include <math.h> #include "{file_prefix}_h.h" void {routine_name}(double matrix[{matrix_output_size}], {input_args}) {{ {eval_code} }} """ _h_template = """\ void {routine_name}(double matrix[{matrix_output_size}], {input_args}); """ _cython_template = """\ import numpy as np from cython.parallel import prange cimport numpy as np cimport cython cdef extern from "{file_prefix}_h.h"{head_gil}: void {routine_name}(double matrix[{matrix_output_size}], {input_args}) @cython.boundscheck(False) @cython.wraparound(False) def {routine_name}_loop(np.ndarray[np.double_t, ndim=2] matrix, {numpy_typed_input_args}): cdef int n = matrix.shape[0] cdef int i for i in {loop_sig}: {routine_name}(&matrix[i, 0], {indexed_input_args}) return matrix.reshape(n, {num_rows}, {num_cols}) """ _setup_template = """\ import numpy from distutils.core import setup from distutils.extension import Extension from Cython.Build import cythonize extension = Extension(name="{file_prefix}", sources=["{file_prefix}.pyx", "{file_prefix}_c.c"], extra_compile_args=[{compile_args}], extra_link_args=[{link_args}], include_dirs=[numpy.get_include()]) setup(name="{routine_name}", ext_modules=cythonize([extension])) """ module_counter = 0
[docs]def openmp_installed(): """Returns true if openmp is installed, false if not. Modified from: https://stackoverflow.com/questions/16549893/programatically-testing-for-openmp-support-from-a-python-setup-script """ tmpdir = tempfile.mkdtemp(".opty_openmp_check") curdir = os.getcwd() os.chdir(tmpdir) filename = r'test.c' contents = r"""\ #include <omp.h> #include <stdio.h> int main(void) { #pragma omp parallel printf("Hello from thread %d, nthreads %d\n", omp_get_thread_num(), omp_get_num_threads()); }""" with open(filename, 'w') as f: f.write(contents) ccompiler = new_compiler() customize_compiler(ccompiler) try: # .compile() should return ['test.o'] on linux ccompiler.compile([filename], extra_postargs=['-fopenmp']) exit = True except CompileError: exit = False finally: os.chdir(curdir) # NOTE : I can't figure out how to get rmtree to work on Windows, so I # don't delete the directory on Windows. if sys.platform != "win32": shutil.rmtree(tmpdir) return exit
[docs]def ufuncify_matrix(args, expr, const=None, tmp_dir=None, parallel=False): """Returns a function that evaluates a matrix of expressions in a tight loop. Parameters ---------- args : iterable of sympy.Symbol A list of all symbols in expr in the desired order for the output function. expr : sympy.Matrix A matrix of expressions. const : tuple, optional This should include any of the symbols in args that should be constant with respect to the loop. tmp_dir : string, optional The path to a directory in which to store the generated files. If None then the files will be not be retained after the function is compiled. parallel : boolean, optional If True and openmp is installed, the generated code will be parallelized across threads. This is only useful when expr are extremely large. """ # TODO : This is my first ever global variable in Python. It'd probably # be better if this was a class attribute of a Ufuncifier class. And I'm # not sure if this current version counts sequentially. global module_counter matrix_size = expr.shape[0] * expr.shape[1] file_prefix_base = 'ufuncify_matrix' file_prefix = '{}_{}'.format(file_prefix_base, module_counter) if tmp_dir is None: codedir = tempfile.mkdtemp(".opty_ufuncify_compile") else: codedir = os.path.abspath(tmp_dir) if not os.path.exists(codedir): os.makedirs(codedir) taken = False while not taken: try: open(os.path.join(codedir, file_prefix + '.pyx'), 'r') except IOError: taken = True else: file_prefix = '{}_{}'.format(file_prefix_base, module_counter) module_counter += 1 d = {'routine_name': 'eval_matrix', 'file_prefix': file_prefix, 'matrix_output_size': matrix_size, 'num_rows': expr.shape[0], 'num_cols': expr.shape[1]} if parallel: if openmp_installed(): openmp = True else: openmp = False msg = ('openmp is not installed or not working properly, request ' 'for parallel execution ignored.') warnings.warn(msg) if parallel and openmp: d['loop_sig'] = "prange(n, nogil=True)" d['head_gil'] = " nogil" d['compile_args'] = "'-fopenmp'" d['link_args'] = "'-fopenmp'" else: d['loop_sig'] = "range(n)" d['head_gil'] = "" d['compile_args'] = "" d['link_args'] = "" matrix_sym = sm.MatrixSymbol('matrix', expr.shape[0], expr.shape[1]) sub_exprs, simple_mat = sm.cse(expr, sm.numbered_symbols('z_')) sub_expr_code = '\n'.join(['double ' + sm.ccode(sub_expr[1], sub_expr[0]) for sub_expr in sub_exprs]) matrix_code = sm.ccode(simple_mat[0], matrix_sym) d['eval_code'] = ' ' + '\n '.join((sub_expr_code + '\n' + matrix_code).split('\n')) c_indent = len('void {routine_name}('.format(**d)) c_arg_spacer = ',\n' + ' ' * c_indent input_args = ['double {}'.format(sm.ccode(a)) for a in args] d['input_args'] = c_arg_spacer.join(input_args) cython_input_args = [] indexed_input_args = [] for a in args: if const is not None and a in const: typ = 'double' idexy = '{}' else: typ = 'np.ndarray[np.double_t, ndim=1]' idexy = '{}[i]' cython_input_args.append('{} {}'.format(typ, sm.ccode(a))) indexed_input_args.append(idexy.format(sm.ccode(a))) cython_indent = len('def {routine_name}_loop('.format(**d)) cython_arg_spacer = ',\n' + ' ' * cython_indent d['numpy_typed_input_args'] = cython_arg_spacer.join(cython_input_args) d['indexed_input_args'] = ',\n'.join(indexed_input_args) files = {} files[d['file_prefix'] + '_c.c'] = _c_template.format(**d) files[d['file_prefix'] + '_h.h'] = _h_template.format(**d) files[d['file_prefix'] + '.pyx'] = _cython_template.format(**d) files[d['file_prefix'] + '_setup.py'] = _setup_template.format(**d) workingdir = os.getcwd() os.chdir(codedir) try: sys.path.append(codedir) for filename, code in files.items(): with open(filename, 'w') as f: f.write(code) cmd = [sys.executable, d['file_prefix'] + '_setup.py', 'build_ext', '--inplace'] subprocess.call(cmd, stderr=subprocess.STDOUT, stdout=subprocess.PIPE) cython_module = importlib.import_module(d['file_prefix']) finally: module_counter += 1 sys.path.remove(codedir) os.chdir(workingdir) if tmp_dir is None: # NOTE : I can't figure out how to get rmtree to work on Windows, # so I don't delete the directory on Windows. if sys.platform != "win32": shutil.rmtree(codedir) return getattr(cython_module, d['routine_name'] + '_loop')
[docs]def controllable(a, b): """Returns true if the system is controllable and false if not. Parameters ---------- a : array_like, shape(n,n) The state matrix. b : array_like, shape(n,r) The input matrix. Returns ------- controllable : boolean """ a = np.asmatrix(a) b = np.asmatrix(b) n = a.shape[0] controllability_matrix = [] for i in range(n): controllability_matrix.append(a ** i * b) controllability_matrix = np.hstack(controllability_matrix) return np.linalg.matrix_rank(controllability_matrix) == n
[docs]def substitute_matrix(matrix, row_idxs, col_idxs, sub_matrix): """Returns the matrix with the values given by the row and column indices with those in the sub-matrix. Parameters ---------- matrix : ndarray, shape(n, m) A matrix (i.e. 2D array). row_idxs : array_like, shape(p<=n,) The row indices which designate which entries should be replaced by the sub matrix entries. col_idxs : array_like, shape(q<=m,) The column indices which designate which entries should be replaced by the sub matrix entries. sub_matrix : ndarray, shape(p, q) A matrix of values to substitute into the specified rows and columns. Notes ----- This makes a copy of the sub_matrix, so if it is large it may be slower than a more optimal implementation. Examples -------- >>> a = np.zeros((3, 4)) >>> sub = np.arange(4).reshape((2, 2)) >>> substitute_matrix(a, [1, 2], [0, 2], sub) array([[ 0., 0., 0., 0.], [ 0., 0., 1., 0.], [ 2., 0., 3., 0.]]) """ assert sub_matrix.shape == (len(row_idxs), len(col_idxs)) row_idx_permutations = np.repeat(row_idxs, len(col_idxs)) col_idx_permutations = np.array(list(col_idxs) * len(row_idxs)) matrix[row_idx_permutations, col_idx_permutations] = sub_matrix.flatten() return matrix
[docs]def sum_of_sines(sigma, frequencies, time): """Returns a sum of sines centered at zero along with its first and second derivatives. Parameters ========== sigma : float The desired standard deviation of the series. frequencies : iterable of floats The frequencies of the sin curves to be included in the sum. time : array_like, shape(n,) The montonically increasing time vector. Returns ======= sines : ndarray, shape(n,) A sum of sines. sines_prime : ndarray, shape(n,) The first derivative of the sum of sines. sines_double_prime : ndarray, shape(n,) The second derivative of the sum of sines. """ phases = 2.0 * np.pi * np.random.ranf(len(frequencies)) sines = np.zeros_like(time) sines_prime = np.zeros_like(time) sines_double_prime = np.zeros_like(time) amplitude = sigma / 2.0 for w, p in zip(frequencies, phases): sines += amplitude * np.sin(w * time + p) sines_prime += amplitude * w * np.cos(w * time + p) sines_double_prime -= amplitude * w**2 * np.sin(w * time + p) return sines, sines_prime, sines_double_prime