From cfbbb3428794ba16cba1cebf322ff0a1acd57625 Mon Sep 17 00:00:00 2001 From: Baudouin Raoult Date: Fri, 11 May 2018 21:35:28 +0100 Subject: Use logging --- cdsapi/api.py | 125 ++++++++++++++++++++++++++++++++-------------------------- 1 file changed, 68 insertions(+), 57 deletions(-) (limited to 'cdsapi') diff --git a/cdsapi/api.py b/cdsapi/api.py index 915bd47..5428b57 100644 --- a/cdsapi/api.py +++ b/cdsapi/api.py @@ -3,6 +3,7 @@ import json import time import datetime import os +import logging def bytes_to_string(n): @@ -11,27 +12,21 @@ def bytes_to_string(n): while n >= 1024: n /= 1024.0 i += 1 - return "%g%s" % (int(n * 10 + 0.5) / 10.0, u[i]) - - -def robust(call): - - def wrapped(*args, **kwargs): - return call(*args, **kwargs) - - return wrapped + return '%g%s' % (int(n * 10 + 0.5) / 10.0, u[i]) class Client(object): + logger = logging.getLogger('cdsapi') + def __init__(self, - end_point=os.environ.get("CDSAPI_URL"), - api_key=os.environ.get("CDSAPI_KEY"), + end_point=os.environ.get('CDSAPI_URL'), + api_key=os.environ.get('CDSAPI_KEY'), verbose=False, verify=None, timeout=None, full_stack=False): - dotrc = os.environ.get("CDSAPI_RC", os.path.expanduser('~/.cdsapirc')) + dotrc = os.environ.get('CDSAPI_RC', os.path.expanduser('~/.cdsapirc')) if end_point is None or api_key is None: if os.path.exists(dotrc): @@ -53,7 +48,7 @@ class Client(object): verify = int(config.get('verify', 1)) if end_point is None or api_key is None or api_key is None: - raise Exception("Missing/incomplete configuration file: %s" % (dotrc)) + raise Exception('Missing/incomplete configuration file: %s' % (dotrc)) self.end_point = end_point self.api_key = api_key @@ -63,25 +58,28 @@ class Client(object): self.timeout = timeout self.sleep_max = 120 self.full_stack = full_stack + self.delete = False - self._trace(dict(end_point=self.end_point, - api_key=self.api_key, - verbose=self.verbose, - verify=self.verify, - timeout=self.timeout, - sleep_max=self.sleep_max, - full_stack=self.full_stack, - )) + self.logger.debug("CDSAPI %s" % dict(end_point=self.end_point, + api_key=self.api_key, + verbose=self.verbose, + verify=self.verify, + timeout=self.timeout, + sleep_max=self.sleep_max, + full_stack=self.full_stack, + )) def get_resource(self, name, request, target=None): - self._api("%s/resources/%s" % (self.end_point, name), request, target) + self._api('%s/resources/%s' % (self.end_point, name), request, target) def _download(self, url, size, local_filename=None): if local_filename is None: local_filename = url.split('/')[-1] - r = robust(requests.get)(url, stream=True, verify=self.verify) + self.logger.debug('Downloading %s to %s (%s)' % (url, local_filename, bytes_to_string(size))) + start = time.time() + r = self.robust(requests.get)(url, stream=True, verify=self.verify) total = 0 with open(local_filename, 'wb') as f: for chunk in r.iter_content(chunk_size=1024): @@ -90,6 +88,10 @@ class Client(object): total += len(chunk) assert total == size + + elapsed = time.time() - start + if elapsed: + self.logger.debug('Download rate %s/s' % (bytes_to_string(size / elapsed))) return local_filename def _api(self, url, request, target): @@ -97,16 +99,23 @@ class Client(object): session = requests.Session() session.auth = tuple(self.api_key.split(':', 2)) - self._trace("POST %s %s" % (url, json.dumps(request))) - result = robust(session.post)(url, json=request, verify=self.verify) - reply = {} - print(result) + self.logger.debug('POST %s %s' % (url, json.dumps(request))) + result = self.robust(session.post)(url, json=request, verify=self.verify) + reply = None try: - reply = result.json() result.raise_for_status() reply = result.json() except Exception: + + if reply is None: + try: + reply = result.json() + except Exception: + reply = dict(message=result.text) + + self.logger.debug(json.dumps(reply)) + if 'message' in reply: error = reply['message'] @@ -125,67 +134,69 @@ class Client(object): while True: - self._trace(reply) + self.logger.debug("REPLY %s", reply) if reply['state'] == 'completed': if target: self._download(reply['location'], int(reply['content_length']), target) else: - self._trace("HEAD %s" % (reply['location'],)) - metadata = robust(session.head)(reply['location'], verify=self.verify) + self.logger.debug('HEAD %s' % (reply['location'],)) + metadata = self.robust(session.head)(reply['location'], verify=self.verify) metadata.raise_for_status() - self._trace(metadata.headers) + self.logger.debug(metadata.headers) if 'request_id' in reply: rid = reply['request_id'] - task_url = "%s/tasks/%s" % (self.end_point, rid) - self._trace("DELETE %s" % (task_url,)) - delete = session.delete(task_url, verify=self.verify) - self._trace("DELETE returns %s %s" % (delete.status_code, delete.reason)) - try: - delete.raise_for_status() - except Exception: - self._warning("DELETE %s returns %s %s" % (task_url, delete.status_code, delete.reason)) - - self._trace("Done") + if self.delete: + task_url = '%s/tasks/%s' % (self.end_point, rid) + self.logger.debug('DELETE %s' % (task_url,)) + delete = session.delete(task_url, verify=self.verify) + self.logger.debug('DELETE returns %s %s' % (delete.status_code, delete.reason)) + try: + delete.raise_for_status() + except Exception: + self._warning('DELETE %s returns %s %s' % (task_url, delete.status_code, delete.reason)) + + self.logger.debug('Done') return if reply['state'] in ('queued', 'running'): rid = reply['request_id'] if self.timeout and (time.time() - start > self.timeout): - raise Exception("TIMEOUT") + raise Exception('TIMEOUT') - self._trace("Request ID is %s, sleep %s" % (rid, sleep)) + self.logger.debug('Request ID is %s, sleep %s' % (rid, sleep)) time.sleep(sleep) sleep *= 1.5 if sleep > self.sleep_max: sleep = self.sleep_max - task_url = "%s/tasks/%s" % (self.end_point, rid) - self._trace("GET %s" % (task_url,)) + task_url = '%s/tasks/%s' % (self.end_point, rid) + self.logger.debug('GET %s' % (task_url,)) - result = robust(session.get)(task_url, verify=self.verify) + result = self.robust(session.get)(task_url, verify=self.verify) result.raise_for_status() reply = result.json() continue if reply['state'] in ('failed',): - print("Message: %s" % (reply['error'].get("message"),)) - print("Reason: %s" % (reply['error'].get("reason"),)) + print('Message: %s' % (reply['error'].get('message'),)) + print('Reason: %s' % (reply['error'].get('reason'),)) for n in reply.get('error', {}).get('context', {}).get('traceback', '').split('\n'): if n.strip() == '' and not self.full_stack: break - print(" %s" % (n,)) - raise Exception(reply['error'].get("reason"),) + print(' %s' % (n,)) + raise Exception(reply['error'].get('reason'),) + + raise Exception('Unknown API state [%s]' % (reply['state'],)) - raise Exception("Unknown API state [%s]" % (reply['state'],)) + def robust(self, call): - def _trace(self, what): - if isinstance(what, (dict, list)): - what = json.dumps(what, indent=4, sort_keys=True) + def wrapped(*args, **kwargs): + r = call(*args, **kwargs) + return r - ts = "{:%Y-%m-%d %H:%M:%S}".format(datetime.datetime.now()) - print('CDS-API %s %s' % (ts, what)) + return wrapped -- cgit v1.2.3