Source code for binoculars.main

import os
import sys
import argparse

from . import space, backend, util, errors


def parse_args(args):
    """Parse the command line of ``binoculars process``.

    The first positional argument is the configuration file; any further
    positional words are collected into ``command``. Every ``-c`` flag adds
    one override, converted by :func:`parse_commandline_config_option` into
    a ``(section, option, value)`` tuple and appended to ``c``.

    :param args: list of argument strings, without the program name.
    :returns: :class:`argparse.Namespace` with ``c``, ``configfile`` and
        ``command`` attributes.
    """
    cli = argparse.ArgumentParser(prog="binoculars process")
    override_help = "additional configuration option in the form section:option=value"
    cli.add_argument(
        "-c",
        metavar="SECTION:OPTION=VALUE",
        action="append",
        type=parse_commandline_config_option,
        default=[],
        help=override_help,
    )
    cli.add_argument("configfile", help="configuration file")
    cli.add_argument("command", nargs="*", default=[])
    namespace = cli.parse_args(args)
    return namespace
def parse_commandline_config_option(s):
    """Split a ``section:option=value`` override string into its parts.

    Used as the argparse ``type=`` converter for the ``-c`` flag. Only the
    first ``=`` separates the key from the value, so the value may itself
    contain ``=`` (or ``:``). The key must contain exactly one ``:``.

    :param s: the raw override string.
    :returns: tuple ``(section, option, value)``.
    :raises argparse.ArgumentTypeError: if ``s`` is not of that form.
    """
    key, equals, value = s.partition("=")
    section, _, option = key.partition(":")
    if not equals or key.count(":") != 1:
        template = "configuration specification '{0}' not in the form section:option=value"
        raise argparse.ArgumentTypeError(template.format(s))
    return section, option, value
def multiprocessing_main(xxx_todo_changeme):
    """Run one unit of work from a single ``(config, command)`` pair.

    Note the double parenthesis for map() convenience: the function takes
    one tuple argument instead of two, so it can be handed straight to a
    ``map``-style API. A :class:`Main` is built (and run) via
    :meth:`Main.from_object`. The function then returns the result of
    ``config.dispatcher.destination.retrieve()``.

    ``config`` is presumably the ``ConfigSectionGroup`` produced by
    :meth:`Main.clone_config` -- TODO confirm against the dispatchers.
    """
    config, command = xxx_todo_changeme
    Main.from_object(config, command)
    destination = config.dispatcher.destination
    return destination.retrieve()
class Main(object):
    """Drive a ``binoculars process`` run.

    Builds the dispatcher, projection and input backends from a
    configuration. When a non-empty command is given, it runs the
    processing pipeline immediately.
    """

    def __init__(self, config, command):
        """Set up the backends and, if ``command`` is non-empty, run it.

        :param config: a ``util.ConfigSectionGroup`` (as produced by
            :meth:`clone_config`) or a ``util.ConfigFile``.
        :param command: list of command words, forwarded to the input backend.
        :raises ValueError: if ``config`` is neither of the accepted types.
        """
        if isinstance(config, util.ConfigSectionGroup):
            self.config = config.configfile.copy()
        elif isinstance(config, util.ConfigFile):
            self.config = config.copy()
        else:
            raise ValueError("Configfile is the wrong type")

        # distribute the configfile to space and to the metadata instance
        spaceconf = self.config.copy()

        # input from either the configfile or the configsectiongroup is valid
        self.dispatcher = backend.get_dispatcher(
            config.dispatcher, self, default="local"
        )
        self.projection = backend.get_projection(config.projection)
        self.input = backend.get_input(config.input)

        self.dispatcher.config.destination.set_final_options(
            self.input.get_destination_options(command)
        )
        if "limits" in self.config.projection:
            self.dispatcher.config.destination.set_limits(
                self.config.projection["limits"]
            )
        if command:
            self.dispatcher.config.destination.set_config(spaceconf)
            self.run(command)

    @classmethod
    def from_args(cls, args):
        """Build a :class:`Main` from command-line arguments.

        The configuration file is either a gzipped ``.zpi`` file, loaded with
        ``util.zpi_load``, or a text configuration file.
        NOTE: ``-c`` overrides and the command are only passed to the
        text-config loader; a successfully loaded zpi is used as-is.

        :param args: list of argument strings, as accepted by
            :func:`parse_args`.
        :raises errors.FileError: if the configuration file does not exist.
            A ``.zpi`` file is waited for for up to 10 seconds before this
            error is raised.
        """
        args = parse_args(args)
        if not os.path.exists(args.configfile):
            # wait up to 10 seconds if it is a zpi, it might take a
            # while for the file to appear across the network
            if not args.configfile.endswith(".zpi") or not util.wait_for_file(
                args.configfile, 10
            ):
                raise errors.FileError(
                    "configuration file '{0}' does not exist".format(args.configfile)
                )
        configobj = False
        with open(args.configfile, "rb") as fp:
            if cls._has_gzip_magic(fp):
                configobj = util.zpi_load(fp)
        if not configobj:
            # not gzipped (or the zpi yielded nothing): reopen as a text config
            configobj = util.ConfigFile.fromtxtfile(
                args.configfile, command=args.command, overrides=args.c
            )
        return cls(configobj, args.command)

    @staticmethod
    def _has_gzip_magic(fp):
        """Return True if binary file object ``fp`` starts with the gzip marker.

        ``fp`` is rewound to offset 0 afterwards, so the caller can read the
        whole stream. The comparison must be against ``bytes``: data read
        from a file opened in ``"rb"`` mode never equals a ``str``.
        """
        magic = fp.read(2)
        fp.seek(0)
        return magic == b"\x1f\x8b"

    @classmethod
    def from_object(cls, config, command):
        """Build a :class:`Main` from an already-loaded configuration object.

        ``command`` is also stored on ``config`` as ``config.command`` before
        construction.
        """
        config.command = command
        return cls(config, command)

    def run(self, command):
        """Execute ``command`` through the dispatcher.

        If the dispatcher reports a specific task, that task is delegated to
        ``run_specific_task``. Otherwise the flow is:

        - the input backend generates jobs;
        - the dispatcher processes them and sums the resulting tokens into
          ``self.result``;
        - ``self.result`` is stored at the destination.

        Two results are not stored: a result of exactly ``True`` is left
        alone, and a ``space.EmptySpace`` result is reported as an error on
        stderr.
        """
        if self.dispatcher.has_specific_task():
            self.dispatcher.run_specific_task(command)
        else:
            jobs = self.input.generate_jobs(command)
            tokens = self.dispatcher.process_jobs(jobs)
            self.result = self.dispatcher.sum(tokens)
            if self.result is True:
                pass
            elif isinstance(self.result, space.EmptySpace):
                sys.stderr.write("error: output is an empty dataset\n")
            else:
                self.dispatcher.config.destination.store(self.result)

    def process_job(self, job):
        """Project every image of ``job`` and return the summed multiverse.

        Each image produced by the input backend is projected and wrapped in
        a ``space.Multiverse``. The wrapping depends on the projection limits:

        - with no limits, the multiverse holds a single space;
        - otherwise it holds one space per limits entry.

        The multiverses are summed with ``space.chunked_sum`` in chunks of
        25. The input's metadata is then added to every ``space.Space`` in
        the result.
        """

        def generator():
            res = self.projection.config.resolution
            labels = self.projection.get_axis_labels()
            for intensity, weights, params in self.input.process_job(job):
                coords = self.projection.project(*params)
                if self.projection.config.limits is None:
                    yield space.Multiverse(
                        (
                            space.Space.from_image(
                                res, labels, coords, intensity, weights=weights
                            ),
                        )
                    )
                else:
                    yield space.Multiverse(
                        space.Space.from_image(
                            res,
                            labels,
                            coords,
                            intensity,
                            weights=weights,
                            limits=limits,
                        )
                        for limits in self.projection.config.limits
                    )

        jobverse = space.chunked_sum(generator(), chunksize=25)
        for sp in jobverse.spaces:
            if isinstance(sp, space.Space):
                sp.metadata.add_dataset(self.input.metadata)
        return jobverse

    def clone_config(self):
        """Return a ``util.ConfigSectionGroup`` describing this run.

        ``configfile`` is this instance's own ``self.config`` (not copied
        again here). The dispatcher, projection and input sections are copies
        of the respective backend configs.
        """
        config = util.ConfigSectionGroup()
        config.configfile = self.config
        config.dispatcher = self.dispatcher.config.copy()
        config.projection = self.projection.config.copy()
        config.input = self.input.config.copy()
        return config

    def get_reentrant(self):
        """Return the module-level ``multiprocessing_main`` worker function.

        Presumably used by dispatchers to re-enter processing in another
        process -- TODO confirm against the dispatcher backends.
        """
        return multiprocessing_main
class Split(Main):
    """Variant of :class:`Main` that completely ignores the dispatcher.

    Construction builds only the projection and input backends and runs
    nothing. :meth:`run` is a generator that yields one space per image.
    """

    def __init__(self, config, command):
        """Store ``command`` and set up the projection and input backends.

        :param config: a ``util.ConfigSectionGroup`` or a ``util.ConfigFile``.
        :param command: list of command words used by :meth:`run`.
        :raises ValueError: if ``config`` is neither of the accepted types.
        """
        self.command = command
        if isinstance(config, util.ConfigSectionGroup):
            source = config.configfile
        elif isinstance(config, util.ConfigFile):
            source = config
        else:
            raise ValueError("Configfile is the wrong type")
        self.config = source.copy()
        # input from either the configfile or the configsectiongroup is valid
        self.projection = backend.get_projection(config.projection)
        self.input = backend.get_input(config.input)

    def process_job(self, job):
        """Yield one result per image produced by ``job``.

        The type of each result depends on the projection limits:

        - with no limits, each result is a single ``space.Space``;
        - otherwise each result is a ``space.Multiverse`` holding one
          ``Space`` per limits entry.
        """
        resolution = self.projection.config.resolution
        axis_labels = self.projection.get_axis_labels()
        for intensity, weights, params in self.input.process_job(job):
            coordinates = self.projection.project(*params)
            if self.projection.config.limits is not None:
                yield space.Multiverse(
                    space.Space.from_image(
                        resolution,
                        axis_labels,
                        coordinates,
                        intensity,
                        weights=weights,
                        limits=bounds,
                    )
                    for bounds in self.projection.config.limits
                )
                continue
            yield space.Space.from_image(
                resolution, axis_labels, coordinates, intensity, weights=weights
            )

    def run(self):
        """Yield every result of every job generated for ``self.command``."""
        for job in self.input.generate_jobs(self.command):
            for item in self.process_job(job):
                yield item