"""Source code for binoculars.dispatcher: job dispatchers (single core, local multiprocessing, OAR cluster)."""

import sys
import os
import time
import itertools
import subprocess
import multiprocessing

from . import util, errors, space

# Python 3 support: True when running under Python 3.x. Used to choose
# between itertools.imap (py2) and the multiprocessing pool path in
# Local.process_jobs.
PY3 = sys.version_info > (3,)

class Destination(object):
    """Describes where the processed output (a Multiverse) must be stored.

    A Destination is configured by the dispatcher through the ``set_*``
    methods and then handed the final result via :meth:`store`. Three
    storage types exist:

    * ``'memory'`` -- keep the result on this object; :meth:`retrieve`
      returns it.
    * ``'tmp'``    -- write the whole Multiverse to a temporary file.
    * ``'final'``  -- write every Space to its own output file, optionally
      one file per configured limit label.
    """

    # Class-level defaults; the setters below assign instance attributes,
    # so the shared {} default for opts is only ever read, never mutated.
    type = filename = overwrite = value = config = limits = None
    opts = {}

    def set_final_filename(self, filename, overwrite):
        """Configure 'final' storage: write spaces to `filename`.

        When `overwrite` is falsy, numbered unused filenames are generated
        instead of clobbering existing files (see final_filenames())."""
        self.type = 'final'
        self.filename = filename
        self.overwrite = overwrite

    def set_final_options(self, opts):
        """Set the str.format() substitutions applied to the filename."""
        # False means "no options given"; keep the current/default opts.
        if opts is not False:
            self.opts = opts

    def set_limits(self, limits):
        """Set the limits; one output file per limit label is produced."""
        self.limits = limits

    def set_config(self, conf):
        """Set the config metadata attached to every stored Space."""
        self.config = conf

    def set_tmp_filename(self, filename):
        """Configure 'tmp' storage: dump the whole Multiverse to one file."""
        self.type = 'tmp'
        self.filename = filename

    def set_memory(self):
        """Configure 'memory' storage: keep the result on this object."""
        self.type = 'memory'

    def store(self, verse):
        """Store the Multiverse `verse` according to the configured type.

        Raises ValueError when the verse is empty."""
        self.value = None
        if verse.dimension == 0:
            raise ValueError('Empty output, Multiverse contains no spaces')
        if self.type == 'memory':
            self.value = verse
        elif self.type == 'tmp':
            verse.tofile(self.filename)
        elif self.type == 'final':
            for sp, fn in zip(verse.spaces, self.final_filenames()):
                sp.config = self.config
                sp.tofile(fn)

    def retrieve(self):
        """Return the stored value ('memory' type only, None otherwise)."""
        if self.type == 'memory':
            return self.value

    def final_filenames(self):
        """Return the list of output filenames, one per limit label (or a
        single name when no limits are configured)."""
        fns = []
        if self.limits is not None:  # was `not self.limits == None`
            base, ext = os.path.splitext(self.filename)
            for limlabel in util.limit_to_filelabel(self.limits):
                fn = (base + '_' + limlabel + ext).format(**self.opts)
                fns.append(self._respect_overwrite(fn))
        else:
            fn = self.filename.format(**self.opts)
            fns.append(self._respect_overwrite(fn))
        return fns

    def _respect_overwrite(self, fn):
        # When overwriting is disabled, pick a numbered, unused variant.
        if not self.overwrite:
            fn = util.find_unused_filename(fn)
        return fn
class DispatcherBase(util.ConfigurableObject):
    """Base class for all dispatchers.

    Owns the output Destination and the optional network preview that
    sends finished spaces to a listening gui."""

    def __init__(self, config, main):
        self.main = main
        super(DispatcherBase, self).__init__(config)

    def parse_config(self, config):
        super(DispatcherBase, self).parse_config(config)
        self.config.destination = Destination()
        destination = config.pop('destination', 'output.hdf5')  # optional, 'output.hdf5' by default
        overwrite = util.parse_bool(config.pop('overwrite', 'false'))  # by default: numbered files in the form output_###.hdf5
        self.config.destination.set_final_filename(destination, overwrite)
        # explicitly parsing the options first helps with the debugging
        self.config.host = config.pop('host', None)  # ip address of the running gui awaiting the spaces
        self.config.port = config.pop('port', None)  # port of the running gui awaiting the spaces
        self.config.send_to_gui = util.parse_bool(config.pop('send_to_gui', 'false'))  # previewing the data; if true, also specify host and port

    def send(self, verses):
        """Yield each Multiverse unchanged, optionally sending its spaces
        to the gui over the network first."""
        # BUGFIX: the original condition tested `host is not None` twice;
        # the second test must be on the port, otherwise a missing port
        # slips through and int(self.config.port) crashes below.
        if self.config.send_to_gui or (self.config.host is not None and self.config.port is not None):
            # only continue if ip is specified or send_to_gui is flagged
            for M in verses:
                if self.config.destination.limits is None:
                    sp = M.spaces[0]
                    if isinstance(sp, space.Space):
                        util.socket_send(self.config.host, int(self.config.port),
                                         util.serialize(sp, ','.join(self.main.config.command)))
                else:
                    for sp, label in zip(M.spaces, util.limit_to_filelabel(self.config.destination.limits)):
                        if isinstance(sp, space.Space):
                            util.socket_send(self.config.host, int(self.config.port),
                                             util.serialize(sp, '{0}_{1}'.format(','.join(self.main.config.command), label)))
                yield M
        else:
            for M in verses:
                yield M

    def has_specific_task(self):
        """Return True when this dispatcher wants run_specific_task()."""
        return False

    def process_jobs(self, jobs):
        """Process the given jobs; must be implemented by subclasses."""
        raise NotImplementedError

    def sum(self, results):
        """Combine job results into one output; implemented by subclasses."""
        raise NotImplementedError
# The simplest possible dispatcher. Does the work all by itself on a single # thread/core/node. 'Local' will most likely suit your needs better.
class SingleCore(DispatcherBase):
    """Run every job sequentially in the current process/thread."""

    def process_jobs(self, jobs):
        """Return a lazy stream of per-job results."""
        return (self.main.process_job(job) for job in jobs)

    def sum(self, results):
        """Stream the results through send() and fold them into one sum."""
        return space.chunked_sum(self.send(results))
# Base class for Dispatchers using subprocesses to do some work.
class ReentrantBase(DispatcherBase):
    """Base class for dispatchers that re-invoke binoculars in subprocesses."""

    actions = 'user',

    def parse_config(self, config):
        super(ReentrantBase, self).parse_config(config)
        self.config.action = config.pop('action', 'user').lower()
        if self.config.action not in self.actions:
            raise errors.ConfigError('action {0} not recognized for {1}'.format(self.config.action, self.__class__.__name__))

    def has_specific_task(self):
        """Any action other than the default 'user' is a specific task."""
        return self.config.action != 'user'

    def run_specific_task(self, command):
        """Execute the configured action; must be implemented by subclasses."""
        raise NotImplementedError
# Dispatch multiple worker processes locally, while doing the summation in the main process
# Dispatch multiple worker processes locally, while doing the summation in the main process
class Local(ReentrantBase):
    """Run jobs in parallel on the local machine via multiprocessing."""

    ### OFFICIAL API
    actions = 'user', 'job'

    def parse_config(self, config):
        super(Local, self).parse_config(config)
        # optionally, specify number of cores (autodetect by default)
        self.config.ncores = int(config.pop('ncores', 0))
        if self.config.ncores <= 0:
            self.config.ncores = multiprocessing.cpu_count()

    def process_jobs(self, jobs):
        """Yield results of processing `jobs` in worker subprocesses."""
        configs = (self.prepare_config(job) for job in jobs)
        if self.config.ncores == 1 and not PY3:
            # note: SingleCore will be marginally faster
            for result in itertools.imap(self.main.get_reentrant(), configs):
                yield result
        else:
            pool = multiprocessing.Pool(self.config.ncores)
            try:
                for result in pool.imap_unordered(self.main.get_reentrant(), configs):
                    yield result
            finally:
                # BUGFIX: the pool was never closed, leaking worker
                # processes; close/join once all results are consumed
                # (or the generator is discarded).
                pool.close()
                pool.join()

    def sum(self, results):
        """Stream results through send() and fold them into one sum."""
        return space.chunked_sum(self.send(results))

    def run_specific_task(self, command):
        """Handle the 'job' action: process one job and store the result."""
        if command:
            raise errors.SubprocessError("invalid command, too many parameters: '{0}'".format(command))
        if self.config.action == 'job':
            result = self.main.process_job(self.config.job)
            self.config.destination.store(result)

    ### UTILITY

    def prepare_config(self, job):
        # Clone the main config and retarget it at a single in-memory job;
        # the tuple's empty second element is the (no) command for the child.
        config = self.main.clone_config()
        config.dispatcher.destination.set_memory()
        config.dispatcher.action = 'job'
        config.dispatcher.job = job
        return config, ()
# Dispatch many worker processes on an Oar cluster.
class Oar(ReentrantBase):
    """Dispatch many worker processes on an OAR cluster.

    Each cluster of jobs is serialized to a config file, submitted with
    ``oarsub``, and writes its partial result to an intermediate hdf5
    file; a final submitted job sums all intermediates."""

    ### OFFICIAL API
    actions = 'user', 'process'

    def parse_config(self, config):
        super(Oar, self).parse_config(config)
        self.config.tmpdir = config.pop('tmpdir', os.getcwd())  # optional, current directory by default
        self.config.oarsub_options = config.pop('oarsub_options', 'walltime=0:15')  # optionally, tweak oarsub parameters
        self.config.executable = config.pop('executable', ' '.join(util.get_python_executable()))  # optionally, override default location of python and/or BINoculars installation

    def process_jobs(self, jobs):
        """Submit one OAR job per job cluster, then a final summing job.

        Yields the oarsub job id (or False) for every submission."""
        self.configfiles = []
        self.intermediates = []
        clusters = util.cluster_jobs2(jobs, self.main.input.config.target_weight)
        for jobscluster in clusters:
            uniq = util.uniqid()
            jobconfig = os.path.join(self.config.tmpdir, 'binoculars-{0}-jobcfg.zpi'.format(uniq))
            self.configfiles.append(jobconfig)
            config = self.main.clone_config()
            interm = os.path.join(self.config.tmpdir, 'binoculars-{0}-jobout.hdf5'.format(uniq))
            self.intermediates.append(interm)
            config.dispatcher.destination.set_tmp_filename(interm)
            config.dispatcher.sum = ()
            config.dispatcher.action = 'process'
            config.dispatcher.jobs = jobscluster
            util.zpi_save(config, jobconfig)
            yield self.oarsub(jobconfig)
        # if all jobs are sent to the cluster, send the process that sums all other jobs
        uniq = util.uniqid()
        jobconfig = os.path.join(self.config.tmpdir, 'binoculars-{0}-jobcfg.zpi'.format(uniq))
        self.configfiles.append(jobconfig)
        config = self.main.clone_config()
        config.dispatcher.sum = self.intermediates
        config.dispatcher.action = 'process'
        config.dispatcher.jobs = ()
        util.zpi_save(config, jobconfig)
        yield self.oarsub(jobconfig)

    def sum(self, results):
        """Wait for all submitted OAR jobs, then clean up temp files."""
        jobs = list(results)
        jobscopy = jobs[:]  # oarwait mutates its list; keep a copy for cleanup
        self.oarwait(jobs)
        self.oar_cleanup(jobscopy)
        return True

    def run_specific_task(self, command):
        """Handle the 'process' action inside a submitted cluster job:
        process this job's clusters and/or sum the intermediate files."""
        if self.config.action != 'process' or (not self.config.jobs and not self.config.sum) or command:
            raise errors.SubprocessError("invalid command, too many parameters or no jobs/sum given")
        jobs = sum = space.EmptyVerse()
        if self.config.jobs:
            jobs = space.verse_sum(self.send(self.main.process_job(job) for job in self.config.jobs))
        if self.config.sum:
            sum = space.chunked_sum(space.Multiverse.fromfile(src) for src in util.yield_when_exists(self.config.sum))
        self.config.destination.store(jobs + sum)

    ### calling OAR

    @staticmethod
    def subprocess_run(*command):
        """Run `command`, return (returncode, combined stdout+stderr text)."""
        # BUGFIX: universal_newlines=True makes communicate() return str on
        # Python 3 (it returned bytes, breaking the string parsing in
        # oarsub()/oarstat()); on Python 2 behaviour is unchanged.
        process = subprocess.Popen(command, stdout=subprocess.PIPE,
                                   stderr=subprocess.STDOUT, universal_newlines=True)
        output, unused_err = process.communicate()
        retcode = process.poll()
        return retcode, output

    def oarsub(self, *args):
        """Submit a binoculars 'process' job; return its id or False."""
        command = '{0} process {1}'.format(self.config.executable, ' '.join(args))
        ret, output = self.subprocess_run('oarsub', '-l {0}'.format(self.config.oarsub_options), command)
        if ret == 0:
            lines = output.split('\n')
            for line in lines:
                if line.startswith('OAR_JOB_ID='):
                    void, jobid = line.split('=')
                    util.status('{0}: Launched job {1}'.format(time.ctime(), jobid))
                    return jobid.strip()
        return False

    def oarstat(self, jobid):
        """Return the OAR status string of `jobid` ('Unknown' on failure)."""
        # % oarstat -s -j 5651374
        # 5651374: Running
        # % oarstat -s -j 5651374
        # 5651374: Finishing
        ret, output = self.subprocess_run('oarstat', '-s', '-j', str(jobid))
        if ret == 0:
            for n in output.split('\n'):
                if n.startswith(str(jobid)):
                    # split once: only the first ':' separates id and status
                    job, status = n.split(':', 1)
                    return status.strip()
        else:
            return 'Unknown'

    def oarwait(self, jobs, remaining=0):
        """Poll OAR until at most `remaining` of `jobs` are still pending.

        Finished jobs are removed from `jobs` in place."""
        if len(jobs) > remaining:
            util.status('{0}: getting status of {1} jobs...'.format(time.ctime(), len(jobs)))
        else:
            return
        delay = util.loop_delayer(30)  # poll every 30 seconds
        while len(jobs) > remaining:
            next(delay)
            i = 0
            R = 0
            W = 0
            U = 0
            while i < len(jobs):
                state = self.oarstat(jobs[i])
                if state == 'Running':
                    R += 1
                elif state in ('Waiting', 'toLaunch', 'Launching'):
                    W += 1
                elif state == 'Unknown':
                    U += 1
                else:
                    # assume state == 'Finishing' or 'Terminated' but don't wait on something unknown
                    del jobs[i]
                    i -= 1  # otherwise it skips a job
                i += 1
            util.status('{0}: {1} jobs to go. {2} waiting, {3} running, {4} unknown.'.format(time.ctime(), len(jobs), W, R, U))
        util.statuseol()

    def oar_cleanup(self, jobs):
        """Remove temp config/intermediate files and report failed jobs
        by inspecting the OAR stderr log of every submitted job."""
        for f in itertools.chain(self.configfiles, self.intermediates):
            try:
                os.remove(f)
            except Exception as e:
                print("unable to remove {0}: {1}".format(f, e))
        errorfn = []
        for jobid in jobs:
            errorfilename = 'OAR.{0}.stderr'.format(jobid)
            if os.path.exists(errorfilename):
                with open(errorfilename, 'r') as fp:
                    errormsg = fp.read()
                if len(errormsg) > 0:
                    errorfn.append(errorfilename)
                    print('Critical error: OAR Job {0} failed with the following error: \n{1}'.format(jobid, errormsg))
        if len(errorfn) > 0:
            print('Warning! {0} job(s) failed. See above for the details or the error log files: {1}'.format(len(errorfn), ', '.join(errorfn)))