This document describes the HydrOffice BAG library and tools (0.2). For the source code, go here.

Source code for hydroffice.bag.bag

from __future__ import absolute_import, division, print_function  # , unicode_literals

import os
import sys
import logging
import numpy as np
import h5py
from lxml import etree

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

from .base import is_bag, File
from .helper import BAGError, Helper
from .meta import Meta


class BAGFile(File):
    """Represents a BAG file.

    Extends the low-level ``File`` with helpers to read the elevation,
    uncertainty, tracking list and XML metadata stored under ``BAG_root``,
    and to validate the metadata against the BAG XSD and schematron rules.
    """

    # Paths of the groups/datasets and names of their attributes.
    _bag_root = "BAG_root"
    _bag_version = "Bag Version"
    _bag_version_number = b'1.5.3'
    _bag_elevation = "BAG_root/elevation"
    _bag_elevation_min_ev = "Minimum Elevation Value"
    _bag_elevation_max_ev = "Maximum Elevation Value"
    _bag_metadata = "BAG_root/metadata"
    _bag_tracking_list = "BAG_root/tracking_list"
    _bag_tracking_list_len = "Tracking List Length"
    # Compound record type used for the tracking list dataset.
    _bag_tracking_list_type = np.dtype([('row', np.uint32),
                                        ('col', np.uint32),
                                        ('depth', np.float32),
                                        ('uncertainty', np.float32),
                                        ('track_code', np.byte),
                                        ('list_series', np.uint16)])
    _bag_uncertainty = "BAG_root/uncertainty"
    _bag_uncertainty_min_uv = "Minimum Uncertainty Value"
    _bag_uncertainty_max_uv = "Maximum Uncertainty Value"

    # Value that elevation()/uncertainty() replace with NaN when masking.
    BAG_NAN = 1000000

    # File name used by extract_metadata() when none is given.
    default_metadata_file = "BAG_metadata.xml"

    def __init__(self, name, mode=None, driver=None, libver=None, userblock_size=None, swmr=False, **kwds):
        """Create a new file object.

        See the low level bag.File for a detailed explanation of the options.

        When a mode is given and it does not contain 'w', the file must
        already be a BAG (as reported by ``is_bag``); otherwise a BAGError
        is raised. No such check is done when mode is None.
        """
        if (mode is not None) and ('w' not in mode) and (not is_bag(name)):
            raise BAGError("The passed file %s is not a BAG file" % name)

        super(BAGFile, self).__init__(name=name, mode=mode, driver=driver, libver=libver,
                                      userblock_size=userblock_size, swmr=swmr, **kwds)

        self.meta = None  # populated on demand by populate_metadata()
        self.meta_errors = list()  # filled by validate_metadata()
        self._str = None  # scratch buffer used while building __str__

    @classmethod
    def create_template(cls, name):
        """Create a BAG file with empty template structure.

        name
            Path of the file to create (opened in 'w' mode).

        Returns the open low-level ``File`` object (not a ``BAGFile``).
        Raises BAGError if the file or any of its items cannot be created.
        """
        log.debug("create new BAG file: %s", name)

        new_bag = None
        try:
            new_bag = File(name, 'w')
            new_bag.create_group(cls._bag_root)
            new_bag.attrs.create(cls._bag_version, cls._bag_version_number, shape=(), dtype="S5")

            elevation = new_bag.create_dataset(cls._bag_elevation, shape=(), dtype=np.float32)
            elevation.attrs.create(cls._bag_elevation_min_ev, 0.0, shape=(), dtype=np.float32)
            elevation.attrs.create(cls._bag_elevation_max_ev, 0.0, shape=(), dtype=np.float32)

            new_bag.create_dataset(cls._bag_metadata, shape=(1, ), dtype="S1")

            tracking_list = new_bag.create_dataset(cls._bag_tracking_list, shape=(),
                                                   dtype=cls._bag_tracking_list_type)
            tracking_list.attrs.create(cls._bag_tracking_list_len, 0, shape=(), dtype=np.uint32)

            uncertainty = new_bag.create_dataset(cls._bag_uncertainty, shape=(), dtype=np.float32)
            uncertainty.attrs.create(cls._bag_uncertainty_min_uv, 0.0, shape=(), dtype=np.float32)
            uncertainty.attrs.create(cls._bag_uncertainty_max_uv, 0.0, shape=(), dtype=np.float32)

        except (BAGError, OSError) as e:
            # do not leave a half-built file open behind the raised error
            if new_bag is not None:
                new_bag.close()
            raise BAGError("Unable to create the BAG file %s: %s" % (name, e))

        return new_bag

    @staticmethod
    def _validated_rows(row_range, nr_rows):
        """Check a row selector against the number of rows and return a slice with explicit bounds."""
        if not isinstance(row_range, slice):
            raise BAGError("Invalid type of slice selector: %s" % type(row_range))

        # an open-ended slice (e.g. slice(None, 10)) selects from/to the layer border
        start = 0 if row_range.start is None else row_range.start
        stop = nr_rows if row_range.stop is None else row_range.stop

        if (start < 0) or (start >= nr_rows) or (stop < 0) or (stop > nr_rows) or (start > stop):
            raise BAGError("Invalid values for slice selector: %s" % row_range)

        return slice(start, stop, row_range.step)

    def _read_layer(self, path, mask_nan, row_range):
        """Read the dataset at ``path`` (all rows or a validated row slice), optionally masking BAG_NAN."""
        dataset = self[path]

        if row_range:
            values = dataset[self._validated_rows(row_range, dataset.shape[0])]
        else:
            values = dataset[:]

        if mask_nan:
            values[values == BAGFile.BAG_NAN] = np.nan

        return values

    def elevation(self, mask_nan=True, row_range=None):
        """Return the elevation as numpy array.

        mask_nan
            If True, apply a mask using the BAG nan value
        row_range
            If present, a slice of rows to read from

        Raises BAGError if row_range is not a slice or is outside the layer rows.
        """
        return self._read_layer(BAGFile._bag_elevation, mask_nan, row_range)

    def elevation_shape(self):
        """Return the shape of the elevation dataset (without reading its values)."""
        return self[BAGFile._bag_elevation].shape

    def uncertainty(self, mask_nan=True, row_range=None):
        """Return the uncertainty as numpy array.

        mask_nan
            If True, apply a mask using the BAG nan value
        row_range
            If present, a slice of rows to read from

        Raises BAGError if row_range is not a slice or is outside the layer rows.
        """
        return self._read_layer(BAGFile._bag_uncertainty, mask_nan, row_range)

    def uncertainty_shape(self):
        """Return the shape of the uncertainty dataset (without reading its values)."""
        return self[BAGFile._bag_uncertainty].shape

    def tracking_list(self):
        """Return the tracking list as numpy array."""
        return self[BAGFile._bag_tracking_list][:]

    def tracking_list_fields(self):
        """Return the tracking list field names."""
        return self[BAGFile._bag_tracking_list].dtype.names

    def metadata(self, as_string=True, as_pretty_xml=True):
        """Return the metadata.

        as_string
            If True, convert the metadata from a dataset of characters to a string
        as_pretty_xml
            If True, return the xml in a pretty format (this takes precedence over as_string)

        When both flags are False the raw dataset content is returned.
        Returns None if a RuntimeError is raised while reading/converting.
        """
        if not (as_string or as_pretty_xml):
            return self[BAGFile._bag_metadata][:]

        try:
            # tobytes() replaces the deprecated ndarray.tostring()
            raw_xml = self[BAGFile._bag_metadata][:].tobytes()
            if not as_pretty_xml:
                return raw_xml
            return etree.tostring(etree.fromstring(raw_xml), pretty_print=True)

        except RuntimeError as e:
            log.info("exception raised: %s", e)
            return None

    def extract_metadata(self, name=None):
        """Save metadata on disk.

        name
            The file path where the metadata will be saved. If None, use a default name.
        """
        meta_xml = self.metadata(as_pretty_xml=True)
        if meta_xml is None:
            log.info("unable to access the metadata")
            return

        if name is None:
            name = self.default_metadata_file

        # the pretty-printed XML is a byte string, so the file is written in binary mode
        with open(os.path.abspath(name), 'wb') as fid:
            fid.write(meta_xml)

    def validate_metadata(self):
        """Validate metadata based on XML Schemas and schematron.

        Returns True when both the XSD and the schematron validations pass,
        False otherwise. The reasons of a failure are collected in
        ``self.meta_errors`` (which is reset at each call).
        """
        # clean metadata error list
        self.meta_errors = list()
        # assuming a valid BAG
        is_valid = True

        meta_xml = self.metadata(as_pretty_xml=True)
        if meta_xml is None:
            msg = "unable to access the metadata"
            log.warning(msg)
            self.meta_errors.append(msg)
            return False

        try:
            xml_tree = etree.fromstring(meta_xml)
        except etree.Error as e:
            log.warning("unabled to parse XML metadata: %s", e)
            self.meta_errors.append(str(e))
            return False

        # - XML Schema validation
        try:
            schema_path = os.path.join(Helper.iso19139_folder(), 'bag', 'bag.xsd')
            schema_doc = etree.parse(schema_path)
            schema = etree.XMLSchema(schema_doc)
        except (etree.Error, IOError) as e:
            log.warning("unabled to parse XML schema: %s", e)
            self.meta_errors.append(str(e))
            return False

        try:
            schema.assertValid(xml_tree)
        except etree.DocumentInvalid as e:
            log.warning("invalid metadata based on XML schema: %s", e)
            self.meta_errors.append(str(e))
            for entry in schema.error_log:
                self.meta_errors.append(entry)
            is_valid = False

        if is_valid:
            log.debug("xsd validated")

        # - schematron validation (run even if the XSD validation failed,
        #   so that all the issues are reported at once)
        try:
            schematron_path = os.path.join(Helper.iso19757_3_folder(), 'bag_metadata_profile.sch')
            schematron_doc = etree.parse(schematron_path)
        except (etree.Error, IOError) as e:
            log.warning("unabled to parse BAG schematron: %s", e)
            self.meta_errors.append(str(e))
            return False

        try:
            from lxml import isoschematron
        except (IOError, ImportError) as e:
            msg = "Unable to load lxml isoschematron files"
            log.warning("%s: %s", msg, e)
            self.meta_errors.append(str(e))
            return False

        try:
            schematron = isoschematron.Schematron(schematron_doc, store_report=True)
        except etree.Error as e:
            log.warning("unabled to load BAG schematron: %s", e)
            self.meta_errors.append(str(e))
            return False

        if schematron.validate(xml_tree):
            log.debug("schematron validated")
            return is_valid

        log.warning("invalid metadata based on Schematron")
        is_valid = False
        ns = {
            'svrl': 'http://purl.oclc.org/dsdl/svrl',
        }
        for entry in schematron.error_log:
            err_tree = etree.fromstring(entry.message)
            texts = err_tree.xpath('/svrl:failed-assert/svrl:text', namespaces=ns)
            if texts and texts[0].text:
                err_msg = texts[0].text.strip()
            else:
                # not a failed-assert with text: report the raw log message
                err_msg = entry.message
            log.warning(err_msg)
            self.meta_errors.append(err_msg)

        return is_valid

    def validation_info(self):
        """Return a message string with the result of the validation.

        Runs validate_metadata() and, when invalid, lists the collected reasons.
        """
        msg = str()

        msg += "XML input source: %s\nValidation output: " % self._bag_metadata
        if self.validate_metadata():
            msg += "VALID"
        else:
            msg += "INVALID\nReasons:\n"
            for err_msg in self.meta_errors:
                msg += " - %s\n" % err_msg

        return msg

    def populate_metadata(self):
        """Populate metadata class.

        The Meta object is built once from the pretty-printed XML and then
        cached in ``self.meta``; later calls return the cached object.
        """
        if self.meta is None:
            self.meta = Meta(meta_xml=self.metadata(as_pretty_xml=True))
        return self.meta

    def _str_group_info(self, grp):
        """Append to the string buffer a description of the visited item and of its attributes."""
        if grp == self._bag_root:
            self._str += " <root>\n"

        elif grp == self._bag_elevation:
            # use the dataset shape: no need to load the whole layer
            self._str += " <elevation shape=%s>\n" % str(self.elevation_shape())

        elif grp == self._bag_uncertainty:
            self._str += " <uncertainty shape=%s>\n" % str(self.uncertainty_shape())

        elif grp == self._bag_tracking_list:
            self._str += " <tracking list shape=%s>\n" % str(self[self._bag_tracking_list].shape)

        elif (grp == self._bag_metadata) and (self.meta is not None):
            self._str += " %s\n" % str(self.meta)

        else:
            self._str += " <%s>\n" % grp

        if grp == self._bag_metadata:
            return

        attrs = self[grp].attrs
        for atr in attrs:
            atr_val = attrs[atr]
            # attribute values that are not numpy objects have no shape/dtype
            self._str += " <%s: %s (%s, %s)>\n" % (atr, atr_val,
                                                   getattr(atr_val, 'shape', None),
                                                   getattr(atr_val, 'dtype', None))

    def __str__(self):
        """Return the base class description followed by one entry per item in the file."""
        self._str = super(BAGFile, self).__str__()
        self.visit(self._str_group_info)
        return self._str