Source code for astroquery.utils.tap.core

# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
=============
TAP plus
=============

@author: Juan Carlos Segovia
@contact: juan.carlos.segovia@sciops.esa.int

European Space Astronomy Centre (ESAC)
European Space Agency (ESA)

Created on 30 jun. 2016


"""
from astroquery.utils.tap import taputils
from astroquery.utils.tap.conn.tapconn import TapConn
from astroquery.utils.tap.xmlparser.tableSaxParser import TableSaxParser
from astroquery.utils.tap.model.job import Job
from datetime import datetime
from astroquery.utils.tap.gui.login import LoginDialog
from astroquery.utils.tap.xmlparser.jobSaxParser import JobSaxParser
from astroquery.utils.tap.xmlparser.jobListSaxParser import JobListSaxParser
from astroquery.utils.tap.xmlparser import utils
from astroquery.utils.tap.model.filter import Filter
import requests

__all__ = ['Tap', 'TapPlus']

VERSION = "1.0.1"
TAP_CLIENT_ID = "aqtappy-" + VERSION


[docs]class Tap(object): """TAP class Provides TAP capabilities """ def __init__(self, url=None, host=None, server_context=None, tap_context=None, port=80, sslport=443, default_protocol_is_https=False, connhandler=None, verbose=False): """Constructor Parameters ---------- url : str, mandatory if no host is specified, default None TAP URL host : str, optional, default None host name server_context : str, optional, default None server context tap_context : str, optional, default None tap context port : int, optional, default '80' HTTP port sslport : int, optional, default '443' HTTPS port default_protocol_is_https : bool, optional, default False Specifies whether the default protocol to be used is HTTPS connhandler connection handler object, optional, default None HTTP(s) connection hander (creator). If no handler is provided, a new one is created. verbose : bool, optional, default 'False' flag to display information about the process """ self.__internalInit() if url is not None: protocol, host, port, server_context, tap_context = self.__parseUrl(url) if protocol == "http": self.__connHandler = TapConn(False, host, server_context, tap_context, port, sslport) else: # https port -> sslPort self.__connHandler = TapConn(True, host, server_context, tap_context, port, port) else: self.__connHandler = TapConn(default_protocol_is_https, host, server_context, tap_context, port, sslport) # if connectionHandler is set, use it (useful for testing) if connhandler is not None: self.__connHandler = connhandler if verbose: print("Created TAP+ (v"+VERSION+") - Connection:\n" + str(self.__connHandler)) def __internalInit(self): self.__connHandler = None
[docs] def load_tables(self, verbose=False): """Loads all public tables Parameters ---------- verbose : bool, optional, default 'False' flag to display information about the process Returns ------- A list of table objects """ return self.__load_tables(verbose=verbose)
def __load_tables(self, only_names=False, include_shared_tables=False, verbose=False): """Loads all public tables Parameters ---------- only_names : bool, TAP+ only, optional, default 'False' True to load table names only include_shared_tables : bool, TAP+, optional, default 'False' True to include shared tables verbose : bool, optional, default 'False' flag to display information about the process Returns ------- A list of table objects """ # share_info=true&share_accessible=true&only_tables=true flags = "" addedItem = False if only_names: flags = "only_tables=true" addedItem = True if include_shared_tables: if addedItem: flags += "&" flags += "share_accessible=true" addedItem = True print("Retrieving tables...") if flags != "": response = self.__connHandler.execute_get("tables?"+flags) else: response = self.__connHandler.execute_get("tables") if verbose: print(response.status, response.reason) isError = self.__connHandler.check_launch_response_status(response, verbose, 200) if isError: print(response.status, response.reason) raise requests.exceptions.HTTPError(response.reason) return None print("Parsing tables...") tsp = TableSaxParser() tsp.parseData(response) print("Done.") return tsp.get_tables()
[docs] def launch_job(self, query, name=None, output_file=None, output_format="votable", verbose=False, dump_to_file=False, upload_resource=None, upload_table_name=None): """Launches a synchronous job Parameters ---------- query : str, mandatory query to be executed output_file : str, optional, default None file name where the results are saved if dumpToFile is True. If this parameter is not provided, the jobid is used instead output_format : str, optional, default 'votable' results format verbose : bool, optional, default 'False' flag to display information about the process dump_to_file : bool, optional, default 'False' if True, the results are saved in a file instead of using memory upload_resource: str, optional, default None resource to be uploaded to UPLOAD_SCHEMA upload_table_name: str, required if uploadResource is provided, default None resource temporary table name associated to the uploaded resource Returns ------- A Job object """ query = taputils.set_top_in_query(query, 2000) if verbose: print("Launched query: '"+str(query)+"'") if upload_resource is not None: if upload_table_name is None: raise ValueError("Table name is required when a resource is uploaded") response = self.__launchJobMultipart(query, upload_resource, upload_table_name, output_format, "sync", verbose, name) else: response = self.__launchJob(query, output_format, "sync", verbose, name) # handle redirection if response.status == 303: # redirection if verbose: print("Redirection found") location = self.__connHandler.find_header( response.getheaders(), "location") if location is None: raise requests.exceptions.HTTPError("No location found after redirection was received (303)") if verbose: print("Redirect to %s", location) subcontext = self.__extract_sync_subcontext(location) response = self.__connHandler.execute_get(subcontext) job = Job(async_job=False, query=query, connhandler=self.__connHandler) isError = self.__connHandler.check_launch_response_status(response, verbose, 200) suitableOutputFile = self.__getSuitableOutputFile(False, output_file, response.getheaders(), isError, output_format) job.outputFile = suitableOutputFile job.parameters['format'] = output_format job.set_response_status(response.status, response.reason) if isError: job.set_failed(True) if dump_to_file: self.__connHandler.dump_to_file(suitableOutputFile, response) raise requests.exceptions.HTTPError(response.reason) else: if verbose: print("Retrieving sync. results...") if dump_to_file: self.__connHandler.dump_to_file(suitableOutputFile, response) else: results = utils.read_http_response(response, output_format) job.set_results(results) if verbose: print("Query finished.") job._phase = 'COMPLETED' return job
[docs] def launch_job_async(self, query, name=None, output_file=None, output_format="votable", verbose=False, dump_to_file=False, background=False, upload_resource=None, upload_table_name=None): """Launches an asynchronous job Parameters ---------- query : str, mandatory query to be executed output_file : str, optional, default None file name where the results are saved if dumpToFile is True. If this parameter is not provided, the jobid is used instead output_format : str, optional, default 'votable' results format verbose : bool, optional, default 'False' flag to display information about the process dump_to_file : bool, optional, default 'False' if True, the results are saved in a file instead of using memory background : bool, optional, default 'False' when the job is executed in asynchronous mode, this flag specifies whether the execution will wait until results are available upload_resource: str, optional, default None resource to be uploaded to UPLOAD_SCHEMA upload_table_name: str, required if uploadResource is provided, default None resource temporary table name associated to the uploaded resource Returns ------- A Job object """ if verbose: print("Launched query: '"+str(query)+"'") if upload_resource is not None: if upload_table_name is None: raise ValueError( "Table name is required when a resource is uploaded") response = self.__launchJobMultipart(query, upload_resource, upload_table_name, output_format, "async", verbose, name) else: response = self.__launchJob(query, output_format, "async", verbose, name) isError = self.__connHandler.check_launch_response_status(response, verbose, 303) job = Job(async_job=True, query=query, connhandler=self.__connHandler) suitableOutputFile = self.__getSuitableOutputFile(True, output_file, response.getheaders(), isError, output_format) job.outputFile = suitableOutputFile job.set_response_status(response.status, response.reason) job.parameters['format'] = output_format if isError: job.set_failed(True) if dump_to_file: self.__connHandler.dump_to_file(suitableOutputFile, response) raise requests.exceptions.HTTPError(response.reason) else: location = self.__connHandler.find_header( response.getheaders(), "location") jobid = self.__getJobId(location) if verbose: print("job " + str(jobid) + ", at: " + str(location)) job.jobid = jobid job.remoteLocation = location if not background: if verbose: print("Retrieving async. results...") # saveResults or getResults will block (not background) if dump_to_file: job.save_results(verbose) else: job.get_results() print("Query finished.") return job
[docs] def load_async_job(self, jobid=None, name=None, verbose=False): """Loads an asynchronous job Parameters ---------- jobid : str, mandatory if no name is provided, default None job identifier name : str, mandatory if no jobid is provided, default None job name verbose : bool, optional, default 'False' flag to display information about the process Returns ------- A Job object """ if name is not None: jobfilter = Filter() jobfilter.add_filter('name', name) jobs = self.search_async_jobs(jobfilter) if jobs is None or len(jobs) < 1: print("No job found for name '"+str(name)+"'") return None jobid = jobs[0].get_jobid() if jobid is None: print("No job identifier found") return None subContext = "async/" + str(jobid) response = self.__connHandler.execute_get(subContext) if verbose: print(response.status, response.reason) print(response.getheaders()) isError = self.__connHandler.check_launch_response_status(response, verbose, 200) if isError: print(response.reason) raise requests.exceptions.HTTPError(response.reason) return None # parse job jsp = JobSaxParser(async_job=True) job = jsp.parseData(response)[0] job.set_connhandler(self.__connHandler) # load resulst job.get_results() return job
[docs] def list_async_jobs(self, verbose=False): """Returns all the asynchronous jobs Parameters ---------- verbose : bool, optional, default 'False' flag to display information about the process Returns ------- A list of Job objects """ subContext = "async" response = self.__connHandler.execute_get(subContext) if verbose: print(response.status, response.reason) print(response.getheaders()) isError = self.__connHandler.check_launch_response_status(response, verbose, 200) if isError: print(response.reason) raise requests.exceptions.HTTPError(response.reason) return None # parse jobs jsp = JobListSaxParser(async_job=True) jobs = jsp.parseData(response) if jobs is not None: for j in jobs: j.connHandler = self.__connHandler return jobs
def __appendData(self, args): data = self.__connHandler.url_encode(args) result = "" firtsTime = True for k in data: if firtsTime: firtsTime = False result = k + '=' + data[k] else: result = result + "&" + k + '=' + data[k] return result
[docs] def save_results(self, job, verbose=False): """Saves job results Parameters ---------- job : Job, mandatory job verbose : bool, optional, default 'False' flag to display information about the process """ job.save_results()
def __getJobId(self, location): pos = location.rfind('/')+1 jobid = location[pos:] return jobid def __launchJobMultipart(self, query, uploadResource, uploadTableName, outputFormat, context, verbose, name=None): uploadValue = str(uploadTableName) + ",param:" + str(uploadTableName) args = { "REQUEST": "doQuery", "LANG": "ADQL", "FORMAT": str(outputFormat), "tapclient": str(TAP_CLIENT_ID), "PHASE": "RUN", "QUERY": str(query), "UPLOAD": ""+str(uploadValue)} if name is not None: args['jobname'] = name f = open(uploadResource, "r") chunk = f.read() f.close() files = [[uploadTableName, uploadResource, chunk]] contentType, body = self.__connHandler.encode_multipart(args, files) response = self.__connHandler.execute_post(context, body, contentType) if verbose: print(response.status, response.reason) print(response.getheaders()) return response def __launchJob(self, query, outputFormat, context, verbose, name=None): args = { "REQUEST": "doQuery", "LANG": "ADQL", "FORMAT": str(outputFormat), "tapclient": str(TAP_CLIENT_ID), "PHASE": "RUN", "QUERY": str(query)} if name is not None: args['jobname'] = name data = self.__connHandler.url_encode(args) response = self.__connHandler.execute_post(context, data) if verbose: print(response.status, response.reason) print(response.getheaders()) return response def __getSuitableOutputFile(self, async_job, outputFile, headers, isError, output_format): dateTime = datetime.now().strftime("%Y%m%d%H%M%S") ext = self.__connHandler.get_suitable_extension(headers) fileName = "" if outputFile is None: if not async_job: fileName = "sync_" + str(dateTime) + ext else: ext = self.__connHandler.get_suitable_extension_by_format( output_format) fileName = "async_" + str(dateTime) + ext else: fileName = outputFile if isError: fileName += ".error" return fileName def __extract_sync_subcontext(self, location): pos = location.find('sync') if pos < 0: return location return location[pos:] def __findCookieInHeader(self, headers, verbose=False): cookies = self.__connHandler.find_header(headers, 'Set-Cookie') if verbose: print(cookies) if cookies is None: return None else: items = cookies.split(';') for i in items: if i.startswith("JSESSIONID="): return i return None def __parseUrl(self, url, verbose=False): isHttps = False if url.startswith("https://"): isHttps = True protocol = "https" else: protocol = "http" if verbose: print("is https: " + str(isHttps)) urlInfoPos = url.find("://") if urlInfoPos < 0: raise ValueError("Invalid URL format") urlInfo = url[(urlInfoPos+3):] items = urlInfo.split("/") if verbose: print("'" + urlInfo + "'") for i in items: print("'" + i + "'") itemsSize = len(items) hostPort = items[0] portPos = hostPort.find(":") if portPos > 0: # port found host = hostPort[0:portPos] port = int(hostPort[portPos+1:]) else: # no port found host = hostPort # no port specified: use defaults if isHttps: port = 443 else: port = 80 if itemsSize == 1: serverContext = "" tapContext = "" elif itemsSize == 2: serverContext = "/"+items[1] tapContext = "" elif itemsSize == 3: serverContext = "/"+items[1] tapContext = "/"+items[2] else: data = [] for i in range(1, itemsSize-1): data.append("/"+items[i]) serverContext = utils.util_create_string_from_buffer(data) tapContext = "/"+items[itemsSize-1] if verbose: print("protocol: '%s'" % protocol) print("host: '%s'" % host) print("port: '%d'" % port) print("server context: '%s'" % serverContext) print("tap context: '%s'" % tapContext) return protocol, host, port, serverContext, tapContext def __str__(self): return ("Created TAP+ (v"+VERSION+") - Connection: \n" + str(self.__connHandler))
[docs]class TapPlus(Tap): """TAP plus class Provides TAP and TAP+ capabilities """ def __init__(self, url=None, host=None, server_context=None, tap_context=None, port=80, sslport=443, default_protocol_is_https=False, connhandler=None, verbose=True): """Constructor Parameters ---------- url : str, mandatory if no host is specified, default None TAP URL host : str, optional, default None host name server_context : str, optional, default None server context tap_context : str, optional, default None tap context port : int, optional, default '80' HTTP port sslport : int, optional, default '443' HTTPS port default_protocol_is_https : bool, optional, default False Specifies whether the default protocol to be used is HTTPS connhandler connection handler object, optional, default None HTTP(s) connection hander (creator). If no handler is provided, a new one is created. verbose : bool, optional, default 'True' flag to display information about the process """ super(TapPlus, self).__init__(url, host, server_context, tap_context, port, sslport, default_protocol_is_https, connhandler, verbose) self.__internalInit() def __internalInit(self): self.__user = None self.__pwd = None self.__isLoggedIn = False
[docs] def load_tables(self, only_names=False, include_shared_tables=False, verbose=False): """Loads all public tables Parameters ---------- only_names : bool, TAP+ only, optional, default 'False' True to load table names only include_shared_tables : bool, TAP+, optional, default 'False' True to include shared tables verbose : bool, optional, default 'False' flag to display information about the process Returns ------- A list of table objects """ return self._Tap__load_tables(only_names=only_names, include_shared_tables=include_shared_tables, verbose=verbose)
[docs] def load_table(self, table, verbose=False): """Loads the specified table Parameters ---------- table : str, mandatory full qualified table name (i.e. schema name + table name) verbose : bool, optional, default 'False' flag to display information about the process Returns ------- A table object """ print("Retrieving table '"+str(table)+"'") connHandler = self.__getconnhandler() response = connHandler.execute_get("tables?tables="+table) if verbose: print(response.status, response.reason) isError = connHandler.check_launch_response_status(response, verbose, 200) if isError: print(response.status, response.reason) raise requests.exceptions.HTTPError(response.reason) return None print("Parsing table '"+str(table)+"'...") tsp = TableSaxParser() tsp.parseData(response) print("Done.") return tsp.get_table()
[docs] def search_async_jobs(self, jobfilter=None, verbose=False): """Searches for jobs applying the specified filter Parameters ---------- jobfilter : JobFilter, optional, default None job filter verbose : bool, optional, default 'False' flag to display information about the process Returns ------- A list of Job objects """ # jobs/list?[&session=][&limit=][&offset=][&order=][&metadata_only=true|false] subContext = "jobs/async" if jobfilter is not None: data = jobfilter.createUrlRequest() if data is not None: subContext = subContext + '?' + self.__appendData(data) connHandler = self.__getconnhandler() response = connHandler.execute_get(subContext) if verbose: print(response.status, response.reason) print(response.getheaders()) isError = connHandler.check_launch_response_status(response, verbose, 200) if isError: print(response.reason) raise requests.exceptions.HTTPError(response.reason) return None # parse jobs jsp = JobSaxParser(async_job=True) jobs = jsp.parseData(response) if jobs is not None: for j in jobs: j.set_connhandler(connHandler) return jobs
[docs] def remove_jobs(self, jobs_list, verbose=False): """Removes the specified jobs Parameters ---------- jobs_list : str, mandatory jobs identifiers to be removed verbose : bool, optional, default 'False' flag to display information about the process """ if jobs_list is None: return jobsIds = None if isinstance(jobs_list, str): jobsIds = jobs_list elif isinstance(jobs_list, list): jobsIds = ','.join(jobs_list) else: raise Exception("Invalid object type") if verbose: print("Jobs to be removed: " + str(jobsIds)) data = "JOB_IDS=" + jobsIds subContext = "deletejobs" connHandler = self.__getconnhandler() response = connHandler.execute_post(subContext, data) if verbose: print(response.status, response.reason) print(response.getheaders()) isError = connHandler.check_launch_response_status(response, verbose, 200) if isError: print(response.reason) raise requests.exceptions.HTTPError(response.reason)
[docs] def login(self, user=None, password=None, credentials_file=None, verbose=False): """Performs a login. User and password can be used or a file that contains user name and password (2 lines: one for user name and the following one for the password) Parameters ---------- user : str, mandatory if 'file' is not provided, default None login name password : str, mandatory if 'file' is not provided, default None user password credentials_file : str, mandatory if no 'user' & 'password' are provided file containing user and password in two lines verbose : bool, optional, default 'False' flag to display information about the process """ if credentials_file is not None: # read file: get user & password with open(credentials_file, "r") as ins: user = ins.readline().strip() password = ins.readline().strip() if user is None: print("Invalid user name") return if password is None: print("Invalid password") return self.__user = user self.__pwd = password self.__dologin(verbose)
[docs] def login_gui(self, verbose=False): """Performs a login using a GUI dialog Parameters ---------- verbose : bool, optional, default 'False' flag to display information about the process """ connHandler = self.__getconnhandler() url = connHandler.get_host_url() loginDialog = LoginDialog(url) loginDialog.show_login() if loginDialog.is_accepted(): self.__user = loginDialog.get_user() self.__pwd = loginDialog.get_password() # execute login self.__dologin(verbose) else: self.__isLoggedIn = False
def __dologin(self, verbose=False): self.__isLoggedIn = False response = self.__execLogin(self.__user, self.__pwd, verbose) # check response connHandler = self.__getconnhandler() isError = connHandler.check_launch_response_status(response, verbose, 200) if isError: print("Login error: " + str(response.reason)) raise requests.exceptions.HTTPError("Login error: " + str(response.reason)) else: # extract cookie cookie = self._Tap__findCookieInHeader(response.getheaders()) if cookie is not None: self.__isLoggedIn = True connHandler.set_cookie(cookie)
[docs] def logout(self, verbose=False): """Performs a logout Parameters ---------- verbose : bool, optional, default 'False' flag to display information about the process """ subContext = "logout" args = {} connHandler = self.__getconnhandler() data = connHandler.url_encode(args) response = connHandler.execute_secure(subContext, data) if verbose: print(response.status, response.reason) print(response.getheaders()) self.__isLoggedIn = False
def __execLogin(self, usr, pwd, verbose=False): subContext = "login" args = { "username": str(usr), "password": str(pwd)} connHandler = self.__getconnhandler() data = connHandler.url_encode(args) response = connHandler.execute_secure(subContext, data) if verbose: print(response.status, response.reason) print(response.getheaders()) return response def __getconnhandler(self): return self._Tap__connHandler