# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
Common utilities for accessing VO simple services.
.. note::
Some functions are not used by Astroquery but kept for
backward-compatibility with ``astropy.vo.client``.
"""
from __future__ import (absolute_import, division, print_function,
unicode_literals)
from astropy.extern import six
from astropy.extern.six.moves import urllib
import fnmatch
import json
import os
import re
import socket
import warnings
from collections import defaultdict
from copy import deepcopy
from astropy.io.votable import parse_single_table, table, tree
from astropy.io.votable.exceptions import vo_raise, vo_warn, E19, W24, W25
from astropy.utils.console import color_print
from astropy.utils.data import get_readable_fileobj
from astropy.utils.data import conf as data_conf
from astropy.utils.exceptions import AstropyUserWarning
from astropy.utils.misc import JsonCustomEncoder
from astropy.utils.xml.unescaper import unescape_all
from .exceptions import (VOSError, MissingCatalog, DuplicateCatalogName,
DuplicateCatalogURL, InvalidAccessURL)
from ..utils.url_helpers import urljoin_keep_path
# Import configurable items declared in __init__.py
from . import conf
__all__ = ['VOSBase', 'VOSCatalog', 'VOSDatabase', 'get_remote_catalog_db',
'call_vo_service', 'list_catalogs']
__dbversion__ = 1
class VOSBase(object):
    """
    Base class for VO catalog and database.

    This is a thin wrapper around a JSON tree. Key look-up, key
    assignment, iteration, and any attribute that is not found on the
    instance itself are forwarded to the wrapped tree.

    Parameters
    ----------
    tree : JSON tree
        Object to wrap. It is stored as-is, not copied.

    """
    def __init__(self, tree):
        self._tree = tree

    def __getattr__(self, what):
        """Expose dictionary attributes."""
        # This hook only runs when normal attribute look-up fails.
        # If ``_tree`` itself is what is missing (instance created
        # without ``__init__``, as ``copy`` and ``pickle`` do), the
        # ``self._tree`` access below would re-enter this method
        # without end, so report the attribute as missing instead.
        if what == '_tree':
            raise AttributeError(what)
        return getattr(self._tree, what)

    def __getitem__(self, what):
        """Expose dictionary key look-up."""
        return self._tree[what]

    def __setitem__(self, what, value):
        """Expose dictionary key assignment."""
        self._tree[what] = value

    def __iter__(self):
        """Expose dictionary iteration."""
        return iter(self._tree)

    def dumps(self):
        """
        Dump the contents into a string.

        Returns
        -------
        s : str
            Contents as JSON string dump, with sorted keys and
            4-space indentation.

        """
        return json.dumps(self._tree, cls=JsonCustomEncoder, sort_keys=True,
                          indent=4)
class VOSCatalog(VOSBase):
    """
    A single VO Service Catalog entry.

    Parameters
    ----------
    tree : JSON tree
        Catalog metadata. Must contain every key listed in
        ``_compulsory_keys``.

    Raises
    ------
    VOSError
        A compulsory key is missing from ``tree``.

    """
    _compulsory_keys = ('title', 'url')

    def __init__(self, tree):
        super(VOSCatalog, self).__init__(tree)

        # Reject the catalog at the first compulsory key that is absent.
        for required in self._compulsory_keys:
            if required in self._tree:
                continue
            raise VOSError('Catalog must have "{0}" key.'.format(required))

    def __str__(self):  # pragma: no cover
        """Show the compulsory keys and their values, one per line."""
        lines = []
        for key in self._compulsory_keys:
            if key in self._tree:
                lines.append('{0}: {1}'.format(key, self._tree[key]))
        return '\n'.join(lines)

    def delete_attribute(self, key):
        """
        Remove a metadata key, and its value, from the catalog.

        Parameters
        ----------
        key : str
            Metadata key to delete.

        Raises
        ------
        KeyError
            Key not found.

        VOSError
            Key is compulsory, therefore cannot be deleted.

        """
        is_protected = key in self._compulsory_keys
        if is_protected:
            raise VOSError('{0} must exist in catalog, therefore cannot be '
                           'deleted.'.format(key))

        del self._tree[key]

    @classmethod
    def create(cls, title, url, **kwargs):
        """
        Build a new VO Service Catalog from user parameters.

        Parameters
        ----------
        title : str
            Title of the catalog.

        url : str
            Access URL of the service. This is used to build queries.

        kwargs : dict
            Additional metadata as keyword-value pairs describing the
            catalog, except 'title' and 'url'.

        Returns
        -------
        cat : `VOSCatalog`
            VO Service Catalog.

        Raises
        ------
        TypeError
            Multiple values given for keyword argument.

        """
        # Compulsory keys go in first, extra metadata afterwards.
        metadata = dict([('title', title), ('url', url)], **kwargs)
        return cls(metadata)
class VOSDatabase(VOSBase):
    """
    A class to represent a collection of :class:`VOSCatalog`.

    Parameters
    ----------
    tree : JSON tree
        Must have a 'catalogs' key that maps catalog name (primary key)
        to catalog metadata, and a '__version__' key.

    Raises
    ------
    VOSError
        If given ``tree`` does not have 'catalogs' key
        or catalog is invalid.

    """
    def __init__(self, tree):
        if 'catalogs' not in tree:
            raise VOSError("Invalid VO service catalog database")

        super(VOSDatabase, self).__init__(tree)
        self._catalogs = tree['catalogs']

        if self.version > __dbversion__:  # pragma: no cover
            vo_warn(W24)

        # Maps access URL to primary key(s).
        # URL is the real key, but we chose title because it is more readable
        # when written out to JSON.
        # NOTE: This is a defaultdict, so ``self._url_keys[url]`` inserts
        # an empty list for an unknown URL. Read-only look-ups in this
        # class therefore use ``.get(url, [])`` instead.
        self._url_keys = defaultdict(list)
        for key, cat in self.get_catalogs():
            self._url_keys[cat['url']].append(key)

    def __str__(self):  # pragma: no cover
        """Show the most important and unique things about a database."""
        return '\n'.join(sorted(self._catalogs))

    def __len__(self):
        """Return the number of catalogs in database."""
        return len(self._catalogs)

    @property
    def version(self):
        """Database version number."""
        return self._tree['__version__']

    def get_catalogs(self):
        """Iterator to get all catalogs as ``(key, VOSCatalog)`` pairs."""
        for key, val in self._catalogs.items():
            yield key, VOSCatalog(val)

    def get_catalogs_by_url(self, url):
        """Like :meth:`get_catalogs` but using access URL look-up."""
        # ``.get`` so that an unknown URL is not added to the mapping.
        keys = self._url_keys.get(url, [])
        for key in keys:
            yield key, VOSCatalog(self._catalogs[key])

    def get_catalog(self, name):
        """
        Get one catalog of given name.

        Parameters
        ----------
        name : str
            Primary key identifying the catalog.

        Returns
        -------
        obj : `VOSCatalog`

        Raises
        ------
        MissingCatalog
            If catalog is not found.

        """
        if name not in self._catalogs:
            raise MissingCatalog("No catalog '{0}' found.".format(name))

        return VOSCatalog(self._catalogs[name])

    def get_catalog_by_url(self, url):
        """
        Like :meth:`get_catalog` but using access URL look-up.
        On multiple matches, only first match is returned.

        Raises
        ------
        MissingCatalog
            If no catalog has the given access URL.

        """
        # ``.get`` so that an unknown URL is not added to the mapping.
        keys = self._url_keys.get(url, [])

        if len(keys) < 1:
            raise MissingCatalog(
                "No catalog with URL '{0}' found.".format(url))

        return VOSCatalog(self._catalogs[keys[0]])

    @staticmethod
    def _match_pattern(all_keys, pattern, sort):
        """
        Used by :meth:`list_catalogs` and :meth:`list_catalogs_by_url`.

        When ``pattern`` is `None` or ``all_keys`` is empty, ``all_keys``
        itself is returned, and it is sorted in place if ``sort`` is set,
        so callers are expected to pass a list they own.

        """
        if pattern is None or len(all_keys) == 0:
            out_arr = all_keys
        else:
            # Wrap in wildcards so the pattern can match anywhere.
            pattern = re.compile(fnmatch.translate('*' + pattern + '*'),
                                 re.IGNORECASE)
            out_arr = [s for s in all_keys if pattern.match(s)]

        if sort:
            out_arr.sort()

        return out_arr

    def list_catalogs(self, pattern=None, sort=True):
        """
        List catalog names.

        Parameters
        ----------
        pattern : str or `None`
            If given string is anywhere in a catalog name, it is
            considered a matching catalog. It accepts patterns as
            in :py:mod:`fnmatch` and is case-insensitive.
            By default, all catalogs are returned.

        sort : bool
            Sort output in alphabetical order. If not sorted, the
            order depends on dictionary hashing. Default is `True`.

        Returns
        -------
        out_arr : list of str
            List of catalog names.

        """
        return self._match_pattern(list(self._catalogs), pattern, sort)

    def list_catalogs_by_url(self, pattern=None, sort=True):
        """Like :meth:`list_catalogs` but using access URL."""
        out_arr = self._match_pattern(list(self._url_keys), pattern, sort)

        # Discard URL that maps to nothing. Such entries can remain
        # after :meth:`delete_catalog` removes the last name for a URL.
        return [a for a in out_arr if len(self._url_keys.get(a, [])) > 0]

    def add_catalog(self, name, cat, allow_duplicate_url=False):
        """
        Add a catalog to database.

        A deep copy of the catalog's tree is stored, so later changes
        to ``cat`` do not affect the database.

        Parameters
        ----------
        name : str
            Primary key for the catalog.

        cat : `VOSCatalog`
            Catalog to add.

        allow_duplicate_url : bool
            Allow catalog with duplicate access URL?

        Raises
        ------
        VOSError
            Invalid catalog.

        DuplicateCatalogName
            Catalog with given name already exists.

        DuplicateCatalogURL
            Catalog with given access URL already exists.

        """
        if not isinstance(cat, VOSCatalog):
            raise VOSError('{0} is not a VO Service Catalog.'.format(cat))

        if name in self._catalogs:
            raise DuplicateCatalogName('{0} already exists.'.format(name))

        url = cat['url']
        # Look up without inserting; the entry is only created below,
        # once the catalog has passed all checks.
        names = self._url_keys.get(url, [])
        if len(names) > 0 and not allow_duplicate_url:
            raise DuplicateCatalogURL(
                '{0} already exists: {1}'.format(url, names))

        self._catalogs[name] = deepcopy(cat._tree)
        self._url_keys[url].append(name)

    def add_catalog_by_url(self, name, url, **kwargs):
        """
        Like :meth:`add_catalog` but the catalog is created with
        only the given name and access URL.

        Parameters
        ----------
        name : str
            Primary key for the catalog. It is also used as the
            catalog title.

        url : str
            Access URL of the service. This is used to build queries.

        kwargs : dict
            Keywords accepted by :meth:`add_catalog`.

        """
        self.add_catalog(name, VOSCatalog.create(name, url), **kwargs)

    def delete_catalog(self, name):
        """
        Delete a catalog from database with given name.

        Parameters
        ----------
        name : str
            Primary key identifying the catalog.

        Raises
        ------
        MissingCatalog
            If catalog is not found.

        """
        if name not in self._catalogs:
            raise MissingCatalog('{0} not found.'.format(name))

        # Keep the URL mapping in sync before dropping the catalog.
        self._url_keys[self._catalogs[name]['url']].remove(name)
        del self._catalogs[name]

    def delete_catalog_by_url(self, url):
        """
        Like :meth:`delete_catalog` but using access URL.
        On multiple matches, all matches are deleted.

        Raises
        ------
        MissingCatalog
            If no catalog has the given access URL.

        """
        # ``sorted`` makes a copy, which is needed because
        # ``delete_catalog`` modifies the underlying list.
        keys = sorted(self._url_keys.get(url, []))

        if len(keys) < 1:
            raise MissingCatalog('{0} not found.'.format(url))

        for key in keys:
            self.delete_catalog(key)

    def merge(self, other, **kwargs):
        """
        Merge two databases together.

        Neither database is modified; catalogs of this database are
        added to a new one first, followed by those of ``other``.

        Parameters
        ----------
        other : `VOSDatabase`
            The other database to merge.

        kwargs : dict
            Keywords accepted by :meth:`add_catalog`.

        Returns
        -------
        db : `VOSDatabase`
            Merged database.

        Raises
        ------
        VOSError
            Invalid database or incompatible version.

        """
        if not isinstance(other, VOSDatabase):
            raise VOSError('{0} is not a VO database.'.format(other))

        if other.version != self.version:
            raise VOSError('Incompatible database version: {0}, '
                           '{1}'.format(self.version, other.version))

        db = VOSDatabase.create_empty()

        for old_db in (self, other):
            for key, cat in old_db.get_catalogs():
                db.add_catalog(key, cat, **kwargs)

        return db

    def to_json(self, filename, overwrite=False):
        """
        Write database content to a JSON file.

        Parameters
        ----------
        filename : str
            JSON file.

        overwrite : bool
            If `True`, overwrite the output file if it exists.

        Raises
        ------
        OSError
            If the file exists and ``overwrite`` is `False`.

        """
        if os.path.exists(filename) and not overwrite:
            raise OSError('{0} exists.'.format(filename))

        with open(filename, 'w') as fd:
            fd.write(self.dumps())

    @classmethod
    def create_empty(cls):
        """
        Create an empty database of VO services.

        Empty database format::

            {
                "__version__": 1,
                "catalogs" : {
                }
            }

        Returns
        -------
        db : `VOSDatabase`
            Empty database.

        """
        return cls({'__version__': __dbversion__, 'catalogs': {}})

    @classmethod
    def from_json(cls, filename, **kwargs):
        """
        Create a database of VO services from a JSON file.

        Example JSON format for Cone Search::

            {
                "__version__": 1,
                "catalogs" : {
                    "My Cone Search": {
                        "capabilityClass": "ConeSearch",
                        "title": "My Cone Search",
                        "url": "http://foo/cgi-bin/search?CAT=bar&",
                        ...
                    },
                    "Another Cone Search": {
                        ...
                    }
                }
            }

        Parameters
        ----------
        filename : str
            JSON file.

        kwargs : dict
            Keywords accepted by
            :func:`~astropy.utils.data.get_readable_fileobj`.

        Returns
        -------
        db : `VOSDatabase`
            Database from given file.

        """
        with get_readable_fileobj(filename, **kwargs) as fd:
            tree = json.load(fd)

        return cls(tree)

    @classmethod
    def from_registry(cls, registry_url, timeout=60, **kwargs):
        """
        Create a database of VO services from VO registry URL.

        This is described in detail in :ref:`vo-sec-validator-build-db`,
        except for the ``validate_xxx`` keys that are added by the
        validator itself.

        Parameters
        ----------
        registry_url : str
            URL of VO registry that returns a VO Table.
            For example, see
            ``astroquery.vo_conesearch.validator.conf.cs_mstr_list``.
            Pedantic is automatically set to `False` for parsing.

        timeout : number
            Temporarily set ``astropy.utils.data.conf.remote_timeout``
            to this value to avoid time out error while reading the
            entire registry.

        kwargs : dict
            Keywords accepted by
            :func:`~astropy.utils.data.get_readable_fileobj`.

        Returns
        -------
        db : `VOSDatabase`
            Database from given registry.

        Raises
        ------
        VOSError
            Invalid VO registry.

        """
        # Download registry as VO table
        with data_conf.set_temp('remote_timeout', timeout):
            with get_readable_fileobj(registry_url, **kwargs) as fd:
                tab_all = parse_single_table(fd, pedantic=False)

        # Registry must have these fields
        compulsory_fields = ['res_title', 'access_url']
        cat_fields = tab_all.array.dtype.names
        for field in compulsory_fields:
            if field not in cat_fields:  # pragma: no cover
                raise VOSError('"{0}" is missing from registry.'.format(field))

        title_counter = defaultdict(int)
        title_fmt = '{0} {1}'
        db = cls.create_empty()

        # Each row in the table becomes a catalog
        for arr in tab_all.array.data:
            cur_cat = {}
            cur_key = ''

            # Process each field and build the catalog.
            # Catalog is completely built before being thrown out
            # because codes need less changes should we decide to
            # allow duplicate URLs in the future.
            for field in cat_fields:

                # For primary key, a number needs to be appended to the title
                # because registry can have multiple entries with the same
                # title but different URLs.
                if field == 'res_title':
                    cur_title = arr['res_title']
                    title_counter[cur_title] += 1  # Starts with 1

                    if isinstance(cur_title, bytes):  # pragma: py3
                        cur_key = title_fmt.format(cur_title.decode('utf-8'),
                                                   title_counter[cur_title])
                    else:  # pragma: py2
                        cur_key = title_fmt.format(cur_title,
                                                   title_counter[cur_title])

                # Special handling of title and access URL,
                # otherwise no change.
                if field == 'access_url':
                    s = unescape_all(arr['access_url'])
                    if isinstance(s, six.binary_type):
                        s = s.decode('utf-8')
                    cur_cat['url'] = s
                elif field == 'res_title':
                    cur_cat['title'] = arr[field]
                else:
                    cur_cat[field] = arr[field]

            # New field to track duplicate access URLs.
            cur_cat['duplicatesIgnored'] = 0

            # Add catalog to database, unless duplicate access URL exists.
            # In that case, the entry is thrown out and the associated
            # counter is updated.
            # ``.get`` so that the look-up does not insert an empty entry.
            dup_keys = db._url_keys.get(cur_cat['url'], [])
            if len(dup_keys) < 1:
                db.add_catalog(
                    cur_key, VOSCatalog(cur_cat), allow_duplicate_url=False)
            else:
                db._catalogs[dup_keys[0]]['duplicatesIgnored'] += 1
                warnings.warn(
                    '{0} is thrown out because it has same access URL as '
                    '{1}.'.format(cur_key, dup_keys[0]), AstropyUserWarning)

        return db
def get_remote_catalog_db(dbname, cache=True, verbose=True):
    """
    Fetch a database of VO services, stored as a JSON file, from a
    remote location.

    Parameters
    ----------
    dbname : str
        Prefix of JSON file to download from
        ``astroquery.vo_conesearch.conf.vos_baseurl``.
        The ``.json`` extension is appended automatically.

    cache : bool
        Use caching for VO Service database. Access to actual VO
        websites referenced by the database still needs internet
        connection.

    verbose : bool
        Show download progress bars.

    Returns
    -------
    db : `VOSDatabase`
        A database of VO services.

    """
    json_url = urljoin_keep_path(conf.vos_baseurl, dbname + '.json')
    download_options = dict(encoding='utf8', cache=cache,
                            show_progress=verbose)
    return VOSDatabase.from_json(json_url, **download_options)
def _get_catalogs(service_type, catalog_db, **kwargs):
    """
    Expand ``catalog_db`` to a list of catalogs.

    Parameters
    ----------
    service_type, catalog_db
        See :func:`call_vo_service`.

    kwargs : dict
        Keywords accepted by :func:`get_remote_catalog_db`.

    Returns
    -------
    catalogs : list of tuple
        Catalogs in the form of ``(key, VOSCatalog)``. When
        ``catalog_db`` is `None` or a database, this is the iterator
        from :meth:`VOSDatabase.get_catalogs` rather than a list.
        Otherwise, the key is `None` and the second item is the
        catalog, catalog name, or URL as given.

    Raises
    ------
    VOSError
        Invalid ``catalog_db``.

    """
    if catalog_db is None:
        catalog_db = get_remote_catalog_db(service_type, **kwargs)
        catalogs = catalog_db.get_catalogs()
    elif isinstance(catalog_db, VOSDatabase):
        catalogs = catalog_db.get_catalogs()
    elif isinstance(catalog_db, (VOSCatalog, six.string_types)):
        catalogs = [(None, catalog_db)]
    elif isinstance(catalog_db, list):
        # Validate with an explicit exception, not ``assert``, so the
        # check still runs under ``python -O`` and raises the
        # documented exception type.
        for x in catalog_db:
            if (not isinstance(x, (VOSCatalog, six.string_types)) or
                    isinstance(x, VOSDatabase)):
                raise VOSError(
                    'catalog_db list items must be a catalog, a catalog '
                    'name, or a URL, not {0}'.format(type(x).__name__))
        catalogs = [(None, x) for x in catalog_db]
    else:  # pragma: no cover
        raise VOSError('catalog_db must be a catalog database, '
                       'a list of catalogs, or a catalog')

    return catalogs
def _vo_service_request(url, pedantic, kwargs, cache=True, verbose=False):
    """
    Build the query URL, fetch it, and parse the response as a VOTable.

    This is called by :func:`call_vo_service`.

    Raises
    ------
    InvalidAccessURL
        Invalid access URL.

    """
    has_params = len(kwargs)
    if has_params and not url.endswith(('?', '&')):
        raise InvalidAccessURL("url should already end with '?' or '&'")

    # Percent-encode every keyword and value, then join them with '&'.
    query_string = '&'.join([
        '{0}={1}'.format(urllib.parse.quote(name),
                         urllib.parse.quote_plus(str(val)))
        for name, val in six.iteritems(kwargs)])
    parsed_url = url + query_string

    with get_readable_fileobj(parsed_url, encoding='binary', cache=cache,
                              show_progress=verbose) as req:
        tab = table.parse(req, filename=parsed_url, pedantic=pedantic)

    # Errors are reported against the base URL, not the full query.
    return vo_tab_parse(tab, url, kwargs)
def vo_tab_parse(tab, url, kwargs):
    """
    Check a VOTable returned by a server and extract its first table.

    In case of errors from the server, a complete and correct
    'stub' VOTable file may still be returned.
    This is to detect that case.

    Parameters
    ----------
    tab : `astropy.io.votable.tree.VOTableFile`

    url : str
        URL used to obtain ``tab``.

    kwargs : dict
        Keywords used to obtain ``tab``, if any.

    Returns
    -------
    out_tab : `astropy.io.votable.tree.Table`
        First table of ``tab``, with ``url`` attached as its
        ``url`` attribute.

    Raises
    ------
    IndexError
        Table iterator fails.

    VOSError
        Server returns error message or invalid table.

    """
    error_fmt = "Catalog server '{0}' returned error '{1}'"

    # A FIELD or PARAM whose ID is "error" (any case) signals failure.
    for item in tab.iter_fields_and_params():
        item_id = item.ID
        if item_id is None or item_id.lower() != 'error':
            continue
        if isinstance(item, tree.Param):
            detail = item.value
        else:  # pragma: no cover
            detail = ''
        raise VOSError(error_fmt.format(url, detail))

    # Same for a top-level INFO named "error".
    for top_info in tab.infos:
        if top_info.name is None:
            continue
        if top_info.name.lower() == 'error':
            raise VOSError(error_fmt.format(url, top_info.value))

    if tab.resources == []:  # pragma: no cover
        vo_raise(E19)

    # The first resource may report a bad query status or an error.
    for res_info in tab.resources[0].infos:
        bad_status = (res_info.name == 'QUERY_STATUS' and
                      res_info.value != 'OK')
        named_error = (res_info.name is not None and
                       res_info.name.lower() == 'error')
        if not (bad_status or named_error):
            continue

        if res_info.content is None:
            long_descr = ''
        else:  # pragma: no cover
            long_descr = ':\n{0}'.format(res_info.content)
        raise VOSError("Catalog server '{0}' returned status "
                       "'{1}'{2}".format(url, res_info.value, long_descr))

    out_tab = tab.get_first_table()

    # Search radius keyword is matched case-insensitively; it is
    # taken as zero when not given.
    sr_keys = [key for key in kwargs if key.lower() == 'sr']
    sr = kwargs.get(sr_keys[0]) if sr_keys else 0

    # An empty table is only an error for a non-zero search radius.
    if sr != 0:
        n_rows = out_tab.array.size
        if n_rows <= 0:
            raise VOSError("Catalog server '{0}' returned {1} result".format(
                url, n_rows))

    out_tab.url = url  # Track the URL
    return out_tab
def call_vo_service(service_type, catalog_db=None, pedantic=None,
                    verbose=True, cache=True, kwargs=None):
    """
    Makes a generic VO service call.

    Catalogs are tried in order; the result of the first successful
    request is returned. A failed request only issues a warning and
    the next catalog is tried.

    Parameters
    ----------
    service_type : str
        Name of the type of service, e.g., 'conesearch_good'.
        Used in error messages and to select a catalog database
        if ``catalog_db`` is not provided.

    catalog_db
        May be one of the following, in order from easiest to
        use to most control:

            - `None`: A database of ``service_type`` catalogs is
              downloaded from ``astroquery.vo_conesearch.conf.vos_baseurl``.
              The first catalog in the database to successfully return a
              result is used.

            - *catalog name*: A name in the database of
              ``service_type`` catalogs at
              ``astroquery.vo_conesearch.conf.vos_baseurl`` is used.
              For a list of acceptable names, use :func:`list_catalogs`.

            - *url*: The prefix of a URL to a IVOA Service for
              ``service_type``. Must end in either '?' or '&'.

            - :class:`VOSCatalog` object: A specific catalog manually
              downloaded and selected from the database
              (see :ref:`vo-sec-client-vos`).

            - Any of the above 3 options combined in a list, in which case
              they are tried in order.

    pedantic : bool or `None`
        When `True`, raise an error when the file violates the spec,
        otherwise issue a warning. Warnings may be controlled using
        :py:mod:`warnings` module. When not provided, uses the
        configuration setting ``astroquery.vo_conesearch.conf.pedantic``,
        which defaults to `False`.

    verbose : bool
        Verbose output.

    cache : bool
        Use caching for VO Service database. Access to actual VO
        websites referenced by the database still needs internet
        connection.

    kwargs : dict or `None`
        Keyword arguments to pass to the catalog service.
        No checking is done that the arguments are accepted by
        the service, etc. `None` (default) means no keyword.

    Returns
    -------
    obj : `astropy.io.votable.tree.Table`
        First table from first successful VO service request.

    Raises
    ------
    VOSError
        If VO service request fails.

    """
    # Avoid a mutable default argument shared between calls.
    if kwargs is None:
        kwargs = {}

    n_timed_out = 0
    catalogs = _get_catalogs(service_type, catalog_db, cache=cache,
                             verbose=verbose)

    if pedantic is None:  # pragma: no cover
        pedantic = conf.pedantic

    # Database used to resolve catalog names. It is only fetched if a
    # name is encountered, and then at most once per call.
    remote_db = None

    for name, catalog in catalogs:
        if isinstance(catalog, six.string_types):
            if catalog.startswith('http'):
                url = catalog
            else:
                # Not a URL, so treat the string as a catalog name.
                if remote_db is None:
                    remote_db = get_remote_catalog_db(
                        service_type, cache=cache, verbose=verbose)
                catalog = remote_db.get_catalog(catalog)
                url = catalog['url']
        else:
            url = catalog['url']

        if verbose:  # pragma: no cover
            color_print('Trying {0}'.format(url), 'green')

        try:
            return _vo_service_request(url, pedantic, kwargs, cache=cache,
                                       verbose=verbose)
        except Exception as e:
            # Deliberately broad: any failure means "try the next
            # catalog", but it is reported as a warning.
            vo_warn(W25, (url, str(e)))
            if hasattr(e, 'reason') and isinstance(e.reason, socket.timeout):
                n_timed_out += 1

    err_msg = 'None of the available catalogs returned valid results.'
    if n_timed_out > 0:
        err_msg += ' ({0} URL(s) timed out.)'.format(n_timed_out)
    raise VOSError(err_msg)
def list_catalogs(service_type, cache=True, verbose=True, **kwargs):
    """
    List the catalogs available for the given service type.

    The database for ``service_type`` is obtained with
    :func:`get_remote_catalog_db` and remaining keywords are passed
    to its ``list_catalogs`` method.

    Parameters
    ----------
    service_type : str
        Name of the type of service, e.g., 'conesearch_good'.

    cache : bool
        Use caching for VO Service database. Access to actual VO
        websites referenced by the database still needs internet
        connection.

    verbose : bool
        Show download progress bars.

    pattern : str or `None`
        If given string is anywhere in a catalog name, it is
        considered a matching catalog. It accepts patterns as
        in :py:mod:`fnmatch` and is case-insensitive.
        By default, all catalogs are returned.

    sort : bool
        Sort output in alphabetical order. If not sorted, the
        order depends on dictionary hashing. Default is `True`.

    Returns
    -------
    arr : list of str
        List of catalog names.

    """
    remote_db = get_remote_catalog_db(service_type, cache=cache,
                                      verbose=verbose)
    return remote_db.list_catalogs(**kwargs)