Source code for compas.com.matlab_.process

from __future__ import print_function
from __future__ import absolute_import
from __future__ import division

import sys
import time

from subprocess import Popen
from subprocess import PIPE

try:
    from scipy.io import savemat
    from scipy.io import loadmat
except ImportError:
    if 'ironpython' not in sys.version.lower():
        raise


__author__     = ['Tom Van Mele <vanmelet@ethz.ch>', ]
__copyright__  = 'Copyright 2014, Block Research Group - ETH Zurich'
__license__    = 'MIT License'
__email__      = 'vanmelet@ethz.ch'


__all__ = ['MatlabProcess', ]


class MatlabProcessError(Exception):
    def __init__(self, message=None):
        if not message:
            message = """"""
        super(MatlabProcessError, self).__init__(message)


[docs]class MatlabProcess(object): """Communicate with Matlab through a subprocess. Parameters ---------- matlab_exec : str, optional Path to the Matlab executable. Defaults to ``matlab``. ws_data : dict, optional Workspace data to be loaded at startup. Defaults to an empty dict. ws_filename : str, optional Filename for workspace storage. Defaults to ``'./workspace.mat'``. Examples -------- >>> m = MatlabProcess() >>> m.start() >>> m.write_value('a', 37) >>> m.run_command('tf = isprime(a);') >>> m.read_workspace() >>> m.stop() >>> print(m.ws_data) >>> m.write_value('a', 17) >>> m.run_command('res = isprime(a);') >>> m.read_value('res') True >>> m.run_command('res = isprime(a);', ivars={'a': 17}) >>> m.read_value('res') True >>> m.run_command('res = isprime(a);', ivars={'a': 17}, ovars={'res': None}) {'res': True} """
[docs] def __init__(self, matlab_exec=None, ws_data=None, ws_filename=None, timeout=None, verbose=True): """""" self.matlab_exec = matlab_exec or 'matlab' self.matlab_options = ['-nosplash'] self.ws_data = ws_data or {} self.ws_filename = ws_filename or './workspace.mat' self.timeout = timeout or 20 self.process = None self.verbose = verbose
[docs] def init(self): pass
[docs] def start(self, options=None): """Start the subprocess. Parameters: options (list, optional) : A list of command line options for the Matlab executable. Available options: * -nosplash * -wait (Windows) * ... """ options = options or self.matlab_options if self.verbose: print('create workspace file.') with open(self.ws_filename, 'wb'): pass if self.verbose: print('starting Matlab process...') pargs = [self.matlab_exec] pargs.extend(options) self.process = Popen(pargs, stdout=PIPE, stderr=PIPE, stdin=PIPE) self._wait_until('__READY__') if self.verbose: print('=' * 79)
def _wait_until(self, str_to_wait_for): self.process.stdin.write("'{0}'\n".format(str_to_wait_for)) t0 = time.time() while True: line = self.process.stdout.readline() if line.strip() == str_to_wait_for: return if time.time() - t0 > self.timeout: return
[docs] def stop(self): if self.verbose: print('=' * 79) print('stopping Matlab process...') self.process.stdin.write("exit;\n") self.process.terminate() if self.verbose: print('closing streams...') self.process.stdin.close() self.process.stdout.close() self.process.stderr.close()
[docs] def run_command(self, command, ivars=None, ovars=None): """Run a command in Matlab. Parameters: command (str) : The command string. ivars (dict, optional) : A dictionary of pairs of variable names and values to write to the Matlab workspace. """ if self.verbose: print('run Matlab command: {0}'.format(command)) if ivars: for name, value in ivars.items(): self.write_value(name, value) command = command.strip() + '\n' self.process.stdin.write(command) self._wait_until('__COMPLETED__') if ovars: for name, value in ovars.items(): ovars[name] = self.read_value(name, value) return ovars
[docs] def write_value(self, name, value): if self.verbose: print('write Matlab value: {0} => {1}'.format(name, value)) self.process.stdin.write("{0}={1};\n".format(name, value))
[docs] def read_value(self, name, default=None): if self.verbose: print('read Matlab value: {0}'.format(name)) self.process.stdin.write("save('{0}', '{1}');\n".format(self.ws_filename, name)) self._wait_until('__SAVED__') loadmat(self.ws_filename, mdict=self.ws_data) value = self.ws_data.get(name) if value: return value[0][0] return default
[docs] def write_workspace(self): if not self.ws_data: return if self.verbose: print('write Matlab workspace.') savemat(self.ws_filename, self.ws_data) self.process.stdin.write("load({0});\n".format(self.ws_filename)) self._wait_until('__LOADED__')
[docs] def read_workspace(self): if self.verbose: print('read Matlab workspace.') self.process.stdin.write("save('{0}');\n".format(self.ws_filename)) self._wait_until('__SAVED__') loadmat(self.ws_filename, mdict=self.ws_data)
# ============================================================================== # Main # ============================================================================== if __name__ == "__main__": m = MatlabProcess() m.start() m.write_value('a', 37) m.run_command('res = isprime(a);') print(m.read_value('res')) print(m.run_command('res = isprime(a);', ivars={'a': 17}, ovars={'res': None})) # m.read_workspace() m.stop() print(m.ws_data)