Source code for binoculars.space

from typing import Generator, Iterable, List, Optional, Sequence, Tuple, Union

import math

import numpy

from functools import reduce
from itertools import chain
from numbers import Number

from numpy import ndarray
from numpy.ma import MaskedArray
from vtk import vtkImageData, vtkXMLImageDataWriter
from vtk.util import numpy_support

from . import util, errors

basestring = (str, bytes)


[docs]def silence_numpy_errors(): """Silence numpy warnings about zero division. Normal usage of Space() will trigger these warnings.""" numpy.seterr(divide="ignore", invalid="ignore")
[docs]def sum_onto(a, axis): """Numpy convenience. Project all dimensions of an array onto an axis, i.e. apply sum() to all axes except the one given.""" for i in reversed(list(range(len(a.shape)))): if axis != i: a = a.sum(axis=i) return a
[docs]class Axis(object): """Represents a single dimension finite discrete grid centered at 0. Important attributes: min lower bound max upper bound res step size / resolution label human-readable identifier min, max and res are floats, but internally only integer operations are used. In particular min = imin * res, max = imax * res """ def __init__(self, min, max, res, label=None): self.res = float(res) if isinstance(min, int): self.imin = min else: self.imin = math.floor(min / self.res) if isinstance(max, int): self.imax = max else: self.imax = math.ceil(max / self.res) self.label = label @property def max(self): return self.imax * self.res @property def min(self): return self.imin * self.res def __len__(self): return self.imax - self.imin + 1 def __iter__(self): return iter(self[index] for index in range(len(self))) def __getitem__(self, key): if isinstance(key, slice): if key.step is not None: raise IndexError("stride not supported") if key.start is None: start = 0 elif key.start < 0: raise IndexError("key out of range") elif isinstance(key.start, int): start = key.start else: raise IndexError("key start must be integer") if key.stop is None: stop = len(self) elif key.stop > len(self): raise IndexError("key out of range") elif isinstance(key.stop, int): stop = key.stop else: raise IndexError("slice stop must be integer") return self.__class__( self.imin + start, self.imin + stop - 1, self.res, self.label ) elif isinstance(key, int) or isinstance(key, numpy.int64): if key >= len(self): # to support iteration raise IndexError("key out of range") return (self.imin + key) * self.res else: raise IndexError("unknown key {0!r}".format(key)) def _get_index_float(self, value: float) -> int: intvalue = int(round(value / self.res)) if self.imin <= intvalue <= self.imax: return intvalue - self.imin raise ValueError( "cannot get index: value {0} not in range [{1}, {2}]".format( value, self.min, self.max ) )
[docs] def get_index(self, value: Union[float, slice, ndarray]) -> Union[int, slice, ndarray]: if isinstance(value, (int, float)): # nice mypy accept int as float but not isinstance return self._get_index_float(value) elif isinstance(value, slice): if value.step is not None: raise IndexError("stride not supported") if value.start is None: start = None else: start = self._get_index_float(value.start) if value.stop is None: stop = None else: stop = self._get_index_float(value.stop) if start is not None and stop is not None and start > stop: start, stop = stop, start return slice(start, stop) else: # TODO it seems that we have maskedarray here... intvalue = numpy.around(value / self.res).astype(int) if ((self.imin <= intvalue) & (intvalue <= self.imax)).all(): return intvalue - self.imin raise ValueError( "cannot get indices, values from [{0}, {1}], axes range [{2}, {3}]".format( value.min(), value.max(), self.min, self.max ) )
def __or__(self, other): # union operation if not isinstance(other, Axis): return NotImplemented if not self.is_compatible(other): raise ValueError("cannot unite axes with different resolution/label") return self.__class__( min(self.imin, other.imin), max(self.imax, other.imax), self.res, self.label ) def __eq__(self, other): if not isinstance(other, Axis): return NotImplemented return ( self.res == other.res and self.imin == other.imin and self.imax == other.imax and self.label == other.label ) def __hash__(self): return hash(self.imin) ^ hash(self.imax) ^ hash(self.res) ^ hash(self.label)
[docs] def is_compatible(self, other): if not isinstance(other, Axis): return False return self.res == other.res and self.label == other.label
def __contains__(self, other): if isinstance(other, Number): return self.min <= other <= self.max elif isinstance(other, Axis): return ( self.is_compatible(other) and self.imin <= other.imin and self.imax >= other.imax )
[docs] def rebound(self, min, max): return self.__class__(min, max, self.res, self.label)
[docs] def rebin(self, factor): # for integers the following relations hold: a // b == floor(a / b), -(-a // b) == ceil(a / b) new = self.__class__( self.imin // factor, -(-self.imax // factor), factor * self.res, self.label ) return self.imin % factor, -self.imax % factor, new
def __repr__(self): return "{0.__class__.__name__} {0.label} (min={0.min}, max={0.max}, res={0.res}, count={1})".format( self, len(self) )
[docs] def restrict_slice(self, value: slice) -> slice: # TODO rename into clip_slice if value.step is not None: raise IndexError("stride not supported") v = value.start # type: Optional[float] if value.start is None: start = None else: start = numpy.clip(value.start, self.min, self.max) if value.stop is None: stop = None else: stop = numpy.clip(value.stop, self.min, self.max) # TODO this produce a wrong slice sementic, the stop value can not be None. # if value.stop == self.max: # stop = None if start is not None and stop is not None and start > stop: start, stop = stop, start # TODO par of the code necessite that start and stop are not None. # this is not the usual slice semantic assert start is not None, f"1-{start}" assert stop is not None, f"2-{stop}" return slice(start, stop)
[docs] def restrict(self, value: Union[float, slice]) -> Union[float, slice]: if isinstance(value, float): return numpy.clip(value, self.min, self.max) elif isinstance(value, slice): return self.restrict_slice(value)
[docs]class Axes(object): """Luxurious tuple of Axis objects.""" def __init__(self, axes): self.axes = tuple(axes) if len(self.axes) > 1 and any(axis.label is None for axis in self.axes): raise ValueError("axis label is required for multidimensional space") def __iter__(self): return iter(self.axes) @property def dimension(self): return len(self.axes) @property def npoints(self): return numpy.array([len(ax) for ax in self.axes]).prod() @property def memory_size(self): # assuming double precision floats for photons, 32 bit integers for contributions return (8 + 4) * self.npoints
[docs] @classmethod def fromfile(cls, filename: str) -> 'Axes': with util.open_h5py(filename, "r") as fp: try: if "axes" in fp and "axes_labels" in fp: # oldest style, float min/max return cls( tuple( Axis(min, max, res, lbl) for ((min, max, res), lbl) in zip( fp["axes"], fp["axes_labels"] ) ) ) elif "axes" in fp: # new try: axes = tuple( Axis(int(imin), int(imax), res, lbl) for ((index, fmin, fmax, res, imin, imax), lbl) in zip( fp["axes"].values(), fp["axes"].keys() ) ) return cls( tuple( axes[int(values[0])] for values in fp["axes"].values() ) ) # reorder the axes to the way in which they were saved except ValueError: return cls( tuple( Axis(int(imin), int(imax), res, lbl) for ((imin, imax, res), lbl) in zip( fp["axes"].values(), fp["axes"].keys() ) ) ) else: # older style, integer min/max return cls( tuple( Axis(imin, imax, res, lbl) for ((imin, imax), res, lbl) in zip( fp["axes_range"], fp["axes_res"], fp["axes_labels"] ) ) ) except (KeyError, TypeError) as e: raise errors.HDF5FileError( "unable to load axes definition from HDF5 file {0}, is it a valid BINoculars file? (original error: {1!r})".format( filename, e ) )
[docs] def tofile(self, filename): with util.open_h5py(filename, "w") as fp: axes = fp.create_group("axes") for index, ax in enumerate(self.axes): axes.create_dataset( ax.label, data=[index, ax.min, ax.max, ax.res, ax.imin, ax.imax] )
[docs] def toarray(self): return numpy.vstack( numpy.hstack([str(ax.imin), str(ax.imax), str(ax.res), ax.label]) for ax in self.axes )
[docs] @classmethod def fromarray(cls, arr): return cls( tuple( Axis(int(imin), int(imax), float(res), lbl) for (imin, imax, res, lbl) in arr ) )
[docs] def index(self, obj: Union[Axis, int, str]) -> int: if isinstance(obj, Axis): return self.axes.index(obj) elif isinstance(obj, int) and 0 <= obj < len(self.axes): return obj elif isinstance(obj, basestring): label = obj.lower() matches = tuple( i for i, axis in enumerate(self.axes) if axis.label.lower() == label ) if len(matches) == 0: raise ValueError("no matching axis found") elif len(matches) == 1: return matches[0] else: raise ValueError("ambiguous axis label {0}".format(label)) else: raise ValueError("invalid axis identifier {0!r}".format(obj))
def __contains__(self, obj): if isinstance(obj, Axis): return obj in self.axes elif isinstance(obj, int): return 0 <= obj < len(self.axes) elif isinstance(obj, basestring): label = obj.lower() return any(axis.label.lower() == label for axis in self.axes) else: raise ValueError("invalid axis identifier {0!r}".format(obj)) def __len__(self): return len(self.axes) def __getitem__(self, key): return self.axes[key] def __eq__(self, other): if not isinstance(other, Axes): return NotImplemented return self.axes == other.axes def __ne__(self, other): if not isinstance(other, Axes): return NotImplemented return self.axes != other.axes def __repr__(self): return "{0.__class__.__name__} ({0.dimension} dimensions, {0.npoints} points, {1}) {{\n {2}\n}}".format( self, util.format_bytes(self.memory_size), "\n ".join(repr(ax) for ax in self.axes), )
[docs] def restricted_key(self, key): if len(key) == 0: return None if len(key) == len(self.axes): return tuple(ax.restrict(s) for s, ax in zip(key, self.axes)) else: raise IndexError("dimension mismatch")
[docs]class EmptySpace(object): """Convenience object for sum() and friends. Treated as zero for addition. Does not share a base class with Space for simplicity.""" def __init__(self, config=None, metadata=None): self.config = config self.metadata = metadata def __add__(self, other): if not isinstance(other, Space) and not isinstance(other, EmptySpace): return NotImplemented return other def __radd__(self, other): if not isinstance(other, Space) and not isinstance(other, EmptySpace): return NotImplemented return other def __iadd__(self, other): if not isinstance(other, Space) and not isinstance(other, EmptySpace): return NotImplemented return other
[docs] def tofile(self, filename): """Store EmptySpace in HDF5 file.""" with util.atomic_write(filename) as tmpname: with util.open_h5py(tmpname, "w") as fp: fp.attrs["type"] = "Empty"
def __repr__(self): return "{0.__class__.__name__}".format(self)
[docs]class Space(object): """Main data-storing object in BINoculars. Data is represented on an n-dimensional rectangular grid. Per grid point, the number of photons (~ intensity) and the number of original data points (pixels) contribution is stored. Important attributes: axes Axes instances describing range and stepsizes of each of the dimensions photons n-dimension numpy float array, total intensity per grid point contribitions n-dimensional numpy integer array, number of original datapoints (pixels) per grid point dimension n""" def __init__(self, axes, config=None, metadata=None): if not isinstance(axes, Axes): self.axes = Axes(axes) else: self.axes = axes self.config = config self.metadata = metadata self.photons = numpy.zeros([len(ax) for ax in self.axes], order="C") self.contributions = numpy.zeros(self.photons.shape, order="C") @property def dimension(self): return self.axes.dimension @property def npoints(self): return self.photons.size @property def memory_size(self): """Returns approximate memory consumption of this Space. Only considers size of .photons and .contributions, does not take into account the overhead.""" return self.photons.nbytes + self.contributions.nbytes @property def config(self): """util.ConfigFile instance describing configuration file used to create this Space instance""" return self._config @config.setter def config(self, conf): if isinstance(conf, util.ConfigFile): self._config = conf elif not conf: self._config = util.ConfigFile() else: raise TypeError("'{0!r}' is not a util.ConfigFile".format(conf)) @property def metadata(self): """util.MetaData instance describing metadata used to create this Space instance""" return self._metadata @metadata.setter def metadata(self, metadata): if isinstance(metadata, util.MetaData): self._metadata = metadata elif not metadata: self._metadata = util.MetaData() else: raise TypeError("'{0!r}' is not a util.MetaData".format(metadata))
[docs] def copy(self): """Returns a copy of self. Numpy data is not shared, but the Axes object is.""" new = self.__class__(self.axes, self.config, self.metadata) new.photons[:] = self.photons new.contributions[:] = self.contributions return new
[docs] def get(self): """Returns normalized photon count.""" return self.photons / self.contributions
def __repr__(self): return "{0.__class__.__name__} ({0.dimension} dimensions, {0.npoints} points, {1}) {{\n {2}\n}}".format( self, util.format_bytes(self.memory_size), "\n ".join(repr(ax) for ax in self.axes), ) def __getitem__(self, key): """Slicing only! space[-0.2:0.2, 0.9:1.1] does exactly what the syntax implies. Ellipsis operator '...' is not supported.""" newkey = self.get_key(key) newaxes = tuple( ax[k] for k, ax in zip(newkey, self.axes) if isinstance(ax[k], Axis) ) if not newaxes: return self.photons[newkey] / self.contributions[newkey] newspace = self.__class__(newaxes, self.config, self.metadata) newspace.photons = self.photons[newkey].copy() newspace.contributions = self.contributions[newkey].copy() return newspace
[docs] def get_key(self, key: Union[float, slice, Tuple[Union[float, slice, ndarray], ...], List[Union[float, slice, ndarray]]] ) -> Tuple[Union[int, slice, ndarray], ...]: # needed in the fitaid for visualising the interpolated data """Convert the n-dimensional interval described by key (as used by e.g. __getitem__()) from data coordinates to indices.""" if isinstance(key, (int, float)) or isinstance(key, slice): if not len(self.axes) == 1: raise IndexError("dimension mismatch") else: key = [key] elif not (isinstance(key, tuple) or isinstance(key, list)) or not len( key ) == len(self.axes): raise IndexError("dimension mismatch") return tuple(ax.get_index(k) for k, ax in zip(key, self.axes))
[docs] def project(self, axis: Union[str, int], *more_axes) -> 'Space': """Reduce dimensionality of Space by projecting onto 'axis'. All data (photons, contributions) is summed along this axis. axis the label of the axis or the index *more_axis also project on these axes""" index = self.axes.index(axis) newaxes = list(self.axes) newaxes.pop(index) newspace = self.__class__(newaxes, self.config, self.metadata) newspace.photons = self.photons.sum(axis=index) newspace.contributions = self.contributions.sum(axis=index) if more_axes: return newspace.project(more_axes[0], *more_axes[1:]) else: return newspace
[docs] def slice(self, axis, key): """Single-axis slice. axis label or index of axis to slice key something like slice(lower_data_range, upper_data_range)""" axindex = self.axes.index(axis) newkey = list(slice(None) for ax in self.axes) newkey[axindex] = key return self.__getitem__(tuple(newkey))
[docs] def get_masked(self) -> MaskedArray: """Returns photons/contributions, but with divide-by-zero's masked out.""" return numpy.ma.array(data=self.get(), mask=(self.contributions == 0))
[docs] def get_variance(self): return numpy.ma.array( data=1 / self.contributions, mask=(self.contributions == 0) )
[docs] def get_grid(self) -> Tuple[ndarray]: """Returns the data coordinates of each grid point, as n-tuple of n-dimensinonal arrays. Basically numpy.mgrid() in data coordinates.""" igrid = numpy.mgrid[tuple(slice(0, len(ax)) for ax in self.axes)] grid = tuple( numpy.array((grid + ax.imin) * ax.res) for grid, ax in zip(igrid, self.axes) ) return grid
[docs] def max(self, axis=None): """Returns maximum intensity.""" return self.get_masked().max(axis=axis)
[docs] def argmax(self): """Returns data coordinates of grid point with maximum intensity.""" array = self.get_masked() return tuple( ax[key] for ax, key in zip( self.axes, numpy.unravel_index(numpy.argmax(array), array.shape) ) )
def __add__(self, other): if isinstance(other, Number): new = self.copy() new.photons += other * self.contributions return new if not isinstance(other, Space): return NotImplemented if not len(self.axes) == len(other.axes) or not all( a.is_compatible(b) for (a, b) in zip(self.axes, other.axes) ): raise ValueError( "cannot add spaces with different dimensionality or resolution" ) new = self.__class__([a | b for (a, b) in zip(self.axes, other.axes)]) new += self new += other return new def __iadd__(self, other): if isinstance(other, Number): self.photons += other * self.contributions return self if not isinstance(other, Space): return NotImplemented if not len(self.axes) == len(other.axes) or not all( a.is_compatible(b) for (a, b) in zip(self.axes, other.axes) ): raise ValueError( "cannot add spaces with different dimensionality or resolution" ) if not all( other_ax in self_ax for (self_ax, other_ax) in zip(self.axes, other.axes) ): return self.__add__(other) index = tuple( slice( self_ax.get_index(other_ax.min), self_ax.get_index(other_ax.min) + len(other_ax), ) for (self_ax, other_ax) in zip(self.axes, other.axes) ) self.photons[index] += other.photons self.contributions[index] += other.contributions self.metadata += other.metadata return self def __sub__(self, other): return self.__add__(other * -1) def __isub__(self, other): return self.__iadd__(other * -1) def __mul__(self, other): if isinstance(other, Number): new = self.__class__(self.axes, self.config, self.metadata) # we would like to keep 1/contributions as the variance # var(aX) = a**2var(X) new.photons = self.photons / other new.contributions = self.contributions / other ** 2 return new else: return NotImplemented
[docs] def trim(self): """Reduce total size of Space by trimming zero-contribution data points on the boundaries.""" mask = self.contributions > 0 lims = ( numpy.flatnonzero(sum_onto(mask, i)) for (i, ax) in enumerate(self.axes) ) lims = tuple((i.min(), i.max()) for i in lims) self.axes = Axes( ax.rebound(min + ax.imin, max + ax.imin) for (ax, (min, max)) in zip(self.axes, lims) ) slices = tuple(slice(min, max + 1) for (min, max) in lims) self.photons = self.photons[slices].copy() self.contributions = self.contributions[slices].copy()
[docs] def rebin(self, resolutions): """Change bin size. resolution n-tuple of floats, new resolution of each axis""" if not len(resolutions) == len(self.axes): raise ValueError("cannot rebin space with different dimensionality") if resolutions == tuple(ax.res for ax in self.axes): return self # gather data and transform labels = list(ax.label for ax in self.axes) coords = self.get_grid() intensity = self.get() weights = self.contributions return self.from_image(resolutions, labels, coords, intensity, weights)
[docs] def reorder(self, labels): """Change order of axes.""" if not self.dimension == len(labels): raise ValueError("dimension mismatch") newindices = list(self.axes.index(label) for label in labels) new = self.__class__( tuple(self.axes[index] for index in newindices), self.config, self.metadata ) new.photons = numpy.transpose(self.photons, axes=newindices) new.contributions = numpy.transpose(self.contributions, axes=newindices) return new
[docs] def transform_coordinates(self, resolutions, labels, transformation): # gather data and transform coords = self.get_grid() transcoords = transformation(*coords) intensity = self.get() weights = self.contributions # get rid of invalid coords valid = reduce( numpy.bitwise_and, chain((numpy.isfinite(t) for t in transcoords)), (weights > 0), ) transcoords = tuple(t[valid] for t in transcoords) return self.from_image( resolutions, labels, transcoords, intensity[valid], weights[valid] )
[docs] def process_image(self, coordinates, intensity, weights): """Load image data into Space. coordinates n-tuple of data coordinate arrays intensity data intensity array weights weights array, supply numpy.ones_like(intensity) for equal weights""" if len(coordinates) != len(self.axes): raise ValueError("dimension mismatch between coordinates and axes") intensity = numpy.nan_to_num( intensity ).flatten() # invalids can be handeled by setting weight to 0, this ensures the weights can do that weights = weights.flatten() indices = numpy.array( tuple(ax.get_index(coord) for (ax, coord) in zip(self.axes, coordinates)) ) for i in range(0, len(self.axes)): for j in range(i + 1, len(self.axes)): indices[i, :] *= len(self.axes[j]) indices = indices.sum(axis=0).astype(int).flatten() photons = numpy.bincount(indices, weights=intensity * weights) contributions = numpy.bincount(indices, weights=weights) self.photons.ravel()[: photons.size] += photons self.contributions.ravel()[: contributions.size] += contributions
[docs] @classmethod def from_image( cls, resolutions, labels, coordinates, intensity, weights, limits=None ): """Create Space from image data. resolutions n-tuple of axis resolutions labels n-tuple of axis labels coordinates n-tuple of data coordinate arrays intensity data intensity array""" if limits is not None: invalid = numpy.zeros(intensity.shape).astype(numpy.bool) for coord, sl in zip(coordinates, limits): if sl.start is None and sl.stop is not None: invalid += coord > sl.stop elif sl.start is not None and sl.stop is None: invalid += coord < sl.start elif sl.start is not None and sl.stop is not None: invalid += numpy.bitwise_or(coord < sl.start, coord > sl.stop) if numpy.all(invalid == True): return EmptySpace() coordinates = tuple(coord[~invalid] for coord in coordinates) intensity = intensity[~invalid] weights = weights[~invalid] axes = tuple( Axis(coord.min(), coord.max(), res, label) for res, label, coord in zip(resolutions, labels, coordinates) ) newspace = cls(axes) newspace.process_image(coordinates, intensity, weights) return newspace
[docs] def tovti(self, filename: str) -> None: assert self.dimension == 3, "only array with three dimensions are allowed" data = self.photons / self.contributions spacing = tuple(a.res for a in self.axes.axes) origin = tuple(a.min for a in self.axes.axes) name = str(tuple(a.label for a in self.axes.axes)) image_data = vtkImageData() image_data.SetSpacing(spacing) image_data.SetOrigin(origin) image_data.SetDimensions(data.shape) temp_array = numpy.transpose(numpy.flip(data, 2)).flatten() temp_array = numpy_support.numpy_to_vtk(temp_array, deep=1) pd = image_data.GetPointData() pd.SetScalars(temp_array) pd.GetArray(0).SetName(name) # export data to file writer = vtkXMLImageDataWriter() writer.SetFileName(filename) writer.SetInputData(image_data) writer.Write()
[docs] def tofile(self, filename): """Store Space in HDF5 file.""" with util.atomic_write(filename) as tmpname: with util.open_h5py(tmpname, "w") as fp: fp.attrs["type"] = "Space" self.config.tofile(fp) self.axes.tofile(fp) self.metadata.tofile(fp) fp.create_dataset( "counts", self.photons.shape, dtype=self.photons.dtype, compression="gzip", ).write_direct(self.photons) fp.create_dataset( "contributions", self.contributions.shape, dtype=self.contributions.dtype, compression="gzip", ).write_direct(self.contributions)
[docs] @classmethod def fromfile(cls, file: str, key: Optional[Sequence[slice]]=None) -> 'Space': """Load Space from HDF5 file. file filename string or h5py.Group instance key sliced (subset) loading, should be an n-tuple of slice()s in data coordinates""" try: with util.open_h5py(file, "r") as fp: if "type" in fp.attrs.keys(): if fp.attrs["type"] == "Empty": return EmptySpace() axes = Axes.fromfile(fp) config = util.ConfigFile.fromfile(fp) metadata = util.MetaData.fromfile(fp) if key: if len(axes) != len(key): raise ValueError( "dimensionality of 'key' does not match dimensionality of Space in HDF5 file {0}".format( file ) ) key = tuple(ax.get_index(k) for k, ax in zip(key, axes)) for index, sl in enumerate(key): if sl.start == sl.stop and sl.start is not None: raise KeyError("key results in empty space") axes = tuple( ax[k] for k, ax in zip(key, axes) if isinstance(k, slice) ) else: key = Ellipsis space = cls(axes, config, metadata) try: fp["counts"].read_direct(space.photons, key) fp["contributions"].read_direct(space.contributions, key) except (KeyError, TypeError) as e: raise errors.HDF5FileError( "unable to load Space from HDF5 file {0}, is it a valid BINoculars file? (original error: {1!r})".format( file, e ) ) except IOError as e: raise errors.HDF5FileError( "unable to open '{0}' as HDF5 file (original error: {1!r})".format( file, e ) ) return space
[docs]class Multiverse(object): """A collection of spaces with basic support for addition. Only to be used when processing data. This makes it possible to process multiple limit sets in a combination of scans""" def __init__(self, spaces): self.spaces = list(spaces) @property def dimension(self): return len(self.spaces) def __add__(self, other): if not isinstance(other, Multiverse): return NotImplemented if not self.dimension == other.dimension: raise ValueError("cannot add multiverses with different dimensionality") return self.__class__(tuple(s + o for s, o in zip(self.spaces, other.spaces))) def __iadd__(self, other): if not isinstance(other, Multiverse): return NotImplemented if not self.dimension == other.dimension: raise ValueError("cannot add multiverses with different dimensionality") for index, o in enumerate(other.spaces): self.spaces[index] += o return self
[docs] def tofile(self, filename): with util.atomic_write(filename) as tmpname: with util.open_h5py(tmpname, "w") as fp: fp.attrs["type"] = "Multiverse" for index, sp in enumerate(self.spaces): spacegroup = fp.create_group("space_{0}".format(index)) sp.tofile(spacegroup)
[docs] @classmethod def fromfile(cls, file): """Load Multiverse from HDF5 file.""" try: with util.open_h5py(file, "r") as fp: if "type" in fp.attrs: if fp.attrs["type"] == "Multiverse": return cls(tuple(Space.fromfile(fp[label]) for label in fp)) else: raise TypeError("This is not a multiverse") else: raise TypeError("This is not a multiverse") except IOError as e: raise errors.HDF5FileError( "unable to open '{0}' as HDF5 file (original error: {1!r})".format( file, e ) )
def __repr__(self): return "{0.__class__.__name__}\n{1}".format(self, self.spaces)
[docs]class EmptyVerse(object): """Convenience object for sum() and friends. Treated as zero for addition.""" spaces = [EmptySpace()] @property def dimension(self): return len(self.spaces) def __add__(self, other): if not isinstance(other, Multiverse): return NotImplemented return other def __radd__(self, other): if not isinstance(other, Multiverse): return NotImplemented return other def __iadd__(self, other): if not isinstance(other, Multiverse): return NotImplemented return other
[docs]def union_axes(axes): axes = tuple(axes) if len(axes) == 1: return axes[0] if not all(isinstance(ax, Axis) for ax in axes): raise TypeError("not all objects are Axis instances") if len(set(ax.res for ax in axes)) != 1 or len(set(ax.label for ax in axes)) != 1: raise ValueError("cannot unite axes with different resolution/label") mi = min(ax.min for ax in axes) ma = max(ax.max for ax in axes) first = axes[0] return first.__class__(mi, ma, first.res, first.label)
[docs]def union_unequal_axes(axes): axes = tuple(axes) if len(axes) == 1: return axes[0] if not all(isinstance(ax, Axis) for ax in axes): raise TypeError("not all objects are Axis instances") if len(set(ax.label for ax in axes)) != 1: raise ValueError("cannot unite axes with different label") mi = min(ax.min for ax in axes) ma = max(ax.max for ax in axes) res = min( ax.res for ax in axes ) # making it easier to use the sliderwidget otherwise this hase no meaning first = axes[0] return first.__class__(mi, ma, res, first.label)
[docs]def sum(spaces): """Calculate sum of iterable of Space instances.""" spaces = tuple(space for space in spaces if not isinstance(space, EmptySpace)) if len(spaces) == 0: return EmptySpace() if len(spaces) == 1: return spaces[0] if len(set(space.dimension for space in spaces)) != 1: raise TypeError("dimension mismatch in spaces") first = spaces[0] axes = tuple( union_axes(space.axes[i] for space in spaces) for i in range(first.dimension) ) newspace = first.__class__(axes) for space in spaces: newspace += space return newspace
[docs]def verse_sum(verses: Generator[Multiverse, None, None]) -> Multiverse: i = iter(M.spaces for M in verses) return Multiverse(sum(spaces) for spaces in zip(*i))
# hybrid sum() / __iadd__()
[docs]def chunked_sum(verses: Iterable[Multiverse], chunksize=10) -> Union[EmptyVerse, Multiverse]: """Calculate sum of iterable of Multiverse instances. Creates intermediate sums to avoid growing a large space at every summation. verses iterable of Multiverse instances chunksize number of Multiverse instances in each intermediate sum""" result = EmptyVerse() for chunk in util.grouper(verses, chunksize): result += verse_sum(M for M in chunk) return result
[docs]def iterate_over_axis(space, axis, resolution=None): ax = space.axes[space.axes.index(axis)] if resolution: bins = get_bins(ax, resolution) for start, stop in zip(bins[:-1], bins[1:]): yield space.slice(axis, slice(start, stop)) else: for value in ax: yield space.slice(axis, value)
[docs]def get_axis_values(axes, axis, resolution=None) -> ndarray: ax = axes[axes.index(axis)] if resolution: bins = get_bins(ax, resolution) return (bins[:-1] + bins[1:]) / 2 else: return numpy.array(list(ax))
[docs]def iterate_over_axis_keys(axes, axis, resolution=None): axindex = axes.index(axis) ax = axes[axindex] k = [slice(None) for i in axes] if resolution: bins = get_bins(ax, resolution) for start, stop in zip(bins[:-1], bins[1:]): k[axindex] = slice(start, stop) yield k else: for value in ax: k[axindex] = value yield k
[docs]def get_bins(ax: Axis, resolution: float) -> ndarray: if float(resolution) < ax.res: raise ValueError( "interval {0} to low, minimum interval is {1}".format(resolution, ax.res) ) mi, ma = ax.min, ax.max if(resolution >= (ma - mi)): return numpy.array([mi, ma]) else: return numpy.linspace(mi, ma, math.ceil(1.0 / resolution * (ma - mi)))
[docs]def dstack(spaces, dindices, dlabel, dresolution): def transform(space, dindex): resolutions = list(ax.res for ax in space.axes) resolutions.append(dresolution) labels = list(ax.label for ax in space.axes) labels.append(dlabel) exprs = list(ax.label for ax in space.axes) exprs.append("ones_like({0}) * {1}".format(labels[0], dindex)) transformation = util.transformation_from_expressions(space, exprs) return space.transform_coordinates(resolutions, labels, transformation) return sum(transform(space, dindex) for space, dindex in zip(spaces, dindices))
[docs]def axis_offset(space, label, offset): exprs = list(ax.label for ax in space.axes) index = space.axes.index(label) exprs[index] += "+ {0}".format(offset) transformation = util.transformation_from_expressions(space, exprs) return space.transform_coordinates( (ax.res for ax in space.axes), (ax.label for ax in space.axes), transformation )
[docs]def bkgsubtract(space, bkg): if space.dimension == bkg.dimension: bkg.photons = bkg.photons * space.contributions / bkg.contributions bkg.photons[bkg.contributions == 0] = 0 bkg.contributions = space.contributions return space - bkg else: photons = numpy.broadcast_arrays(space.photons, bkg.photons)[1] contributions = numpy.broadcast_arrays(space.contributions, bkg.contributions)[ 1 ] bkg = Space(space.axes) bkg.photons = photons bkg.contributions = contributions return bkgsubtract(space, bkg)
[docs]def make_compatible(spaces): if not numpy.alen(numpy.unique(len(space.axes) for space in spaces)) == 1: raise ValueError("cannot make spaces with different dimensionality compatible") ax0 = tuple(ax.label for ax in spaces[0].axes) resmax = tuple( numpy.vstack( tuple(ax.res for ax in space.reorder(ax0).axes) for space in spaces ).max(axis=0) ) resmin = tuple( numpy.vstack( tuple(ax.res for ax in space.reorder(ax0).axes) for space in spaces ).min(axis=0) ) if not resmax == resmin: print( "Warning: Not all spaces have the same resolution. Resolution will be changed to: {0}".format( resmax ) ) return tuple(space.reorder(ax0).rebin2(resmax) for space in spaces)