summaryrefslogtreecommitdiff
path: root/cdsapi
diff options
context:
space:
mode:
authorBaudouin Raoult <baudouin.raoult@ecmwf.int>2018-05-11 21:35:28 +0100
committerBaudouin Raoult <baudouin.raoult@ecmwf.int>2018-05-11 21:35:28 +0100
commitcfbbb3428794ba16cba1cebf322ff0a1acd57625 (patch)
tree0d7a4a285ac7c9fb8f8c02a81dc6e88604bb2633 /cdsapi
parent8d86d0bb128a3734985196ea7514e4c53055ad12 (diff)
Use logging
Diffstat (limited to 'cdsapi')
-rw-r--r--cdsapi/api.py125
1 files changed, 68 insertions, 57 deletions
diff --git a/cdsapi/api.py b/cdsapi/api.py
index 915bd47..5428b57 100644
--- a/cdsapi/api.py
+++ b/cdsapi/api.py
@@ -3,6 +3,7 @@ import json
import time
import datetime
import os
+import logging
def bytes_to_string(n):
@@ -11,27 +12,21 @@ def bytes_to_string(n):
while n >= 1024:
n /= 1024.0
i += 1
- return "%g%s" % (int(n * 10 + 0.5) / 10.0, u[i])
-
-
-def robust(call):
-
- def wrapped(*args, **kwargs):
- return call(*args, **kwargs)
-
- return wrapped
+ return '%g%s' % (int(n * 10 + 0.5) / 10.0, u[i])
class Client(object):
+ logger = logging.getLogger('cdsapi')
+
def __init__(self,
- end_point=os.environ.get("CDSAPI_URL"),
- api_key=os.environ.get("CDSAPI_KEY"),
+ end_point=os.environ.get('CDSAPI_URL'),
+ api_key=os.environ.get('CDSAPI_KEY'),
verbose=False,
verify=None,
timeout=None, full_stack=False):
- dotrc = os.environ.get("CDSAPI_RC", os.path.expanduser('~/.cdsapirc'))
+ dotrc = os.environ.get('CDSAPI_RC', os.path.expanduser('~/.cdsapirc'))
if end_point is None or api_key is None:
if os.path.exists(dotrc):
@@ -53,7 +48,7 @@ class Client(object):
verify = int(config.get('verify', 1))
if end_point is None or api_key is None or api_key is None:
- raise Exception("Missing/incomplete configuration file: %s" % (dotrc))
+ raise Exception('Missing/incomplete configuration file: %s' % (dotrc))
self.end_point = end_point
self.api_key = api_key
@@ -63,25 +58,28 @@ class Client(object):
self.timeout = timeout
self.sleep_max = 120
self.full_stack = full_stack
+ self.delete = False
- self._trace(dict(end_point=self.end_point,
- api_key=self.api_key,
- verbose=self.verbose,
- verify=self.verify,
- timeout=self.timeout,
- sleep_max=self.sleep_max,
- full_stack=self.full_stack,
- ))
+ self.logger.debug("CDSAPI %s" % dict(end_point=self.end_point,
+ api_key=self.api_key,
+ verbose=self.verbose,
+ verify=self.verify,
+ timeout=self.timeout,
+ sleep_max=self.sleep_max,
+ full_stack=self.full_stack,
+ ))
def get_resource(self, name, request, target=None):
- self._api("%s/resources/%s" % (self.end_point, name), request, target)
+ self._api('%s/resources/%s' % (self.end_point, name), request, target)
def _download(self, url, size, local_filename=None):
if local_filename is None:
local_filename = url.split('/')[-1]
- r = robust(requests.get)(url, stream=True, verify=self.verify)
+ self.logger.debug('Downloading %s to %s (%s)' % (url, local_filename, bytes_to_string(size)))
+ start = time.time()
+ r = self.robust(requests.get)(url, stream=True, verify=self.verify)
total = 0
with open(local_filename, 'wb') as f:
for chunk in r.iter_content(chunk_size=1024):
@@ -90,6 +88,10 @@ class Client(object):
total += len(chunk)
assert total == size
+
+ elapsed = time.time() - start
+ if elapsed:
+ self.logger.debug('Download rate %s/s' % (bytes_to_string(size / elapsed)))
return local_filename
def _api(self, url, request, target):
@@ -97,16 +99,23 @@ class Client(object):
session = requests.Session()
session.auth = tuple(self.api_key.split(':', 2))
- self._trace("POST %s %s" % (url, json.dumps(request)))
- result = robust(session.post)(url, json=request, verify=self.verify)
- reply = {}
- print(result)
+ self.logger.debug('POST %s %s' % (url, json.dumps(request)))
+ result = self.robust(session.post)(url, json=request, verify=self.verify)
+ reply = None
try:
- reply = result.json()
result.raise_for_status()
reply = result.json()
except Exception:
+
+ if reply is None:
+ try:
+ reply = result.json()
+ except Exception:
+ reply = dict(message=result.text)
+
+ self.logger.debug(json.dumps(reply))
+
if 'message' in reply:
error = reply['message']
@@ -125,67 +134,69 @@ class Client(object):
while True:
- self._trace(reply)
+ self.logger.debug("REPLY %s", reply)
if reply['state'] == 'completed':
if target:
self._download(reply['location'], int(reply['content_length']), target)
else:
- self._trace("HEAD %s" % (reply['location'],))
- metadata = robust(session.head)(reply['location'], verify=self.verify)
+ self.logger.debug('HEAD %s' % (reply['location'],))
+ metadata = self.robust(session.head)(reply['location'], verify=self.verify)
metadata.raise_for_status()
- self._trace(metadata.headers)
+ self.logger.debug(metadata.headers)
if 'request_id' in reply:
rid = reply['request_id']
- task_url = "%s/tasks/%s" % (self.end_point, rid)
- self._trace("DELETE %s" % (task_url,))
- delete = session.delete(task_url, verify=self.verify)
- self._trace("DELETE returns %s %s" % (delete.status_code, delete.reason))
- try:
- delete.raise_for_status()
- except Exception:
- self._warning("DELETE %s returns %s %s" % (task_url, delete.status_code, delete.reason))
-
- self._trace("Done")
+ if self.delete:
+ task_url = '%s/tasks/%s' % (self.end_point, rid)
+ self.logger.debug('DELETE %s' % (task_url,))
+ delete = session.delete(task_url, verify=self.verify)
+ self.logger.debug('DELETE returns %s %s' % (delete.status_code, delete.reason))
+ try:
+ delete.raise_for_status()
+ except Exception:
+ self._warning('DELETE %s returns %s %s' % (task_url, delete.status_code, delete.reason))
+
+ self.logger.debug('Done')
return
if reply['state'] in ('queued', 'running'):
rid = reply['request_id']
if self.timeout and (time.time() - start > self.timeout):
- raise Exception("TIMEOUT")
+ raise Exception('TIMEOUT')
- self._trace("Request ID is %s, sleep %s" % (rid, sleep))
+ self.logger.debug('Request ID is %s, sleep %s' % (rid, sleep))
time.sleep(sleep)
sleep *= 1.5
if sleep > self.sleep_max:
sleep = self.sleep_max
- task_url = "%s/tasks/%s" % (self.end_point, rid)
- self._trace("GET %s" % (task_url,))
+ task_url = '%s/tasks/%s' % (self.end_point, rid)
+ self.logger.debug('GET %s' % (task_url,))
- result = robust(session.get)(task_url, verify=self.verify)
+ result = self.robust(session.get)(task_url, verify=self.verify)
result.raise_for_status()
reply = result.json()
continue
if reply['state'] in ('failed',):
- print("Message: %s" % (reply['error'].get("message"),))
- print("Reason: %s" % (reply['error'].get("reason"),))
+ print('Message: %s' % (reply['error'].get('message'),))
+ print('Reason: %s' % (reply['error'].get('reason'),))
for n in reply.get('error', {}).get('context', {}).get('traceback', '').split('\n'):
if n.strip() == '' and not self.full_stack:
break
- print(" %s" % (n,))
- raise Exception(reply['error'].get("reason"),)
+ print(' %s' % (n,))
+ raise Exception(reply['error'].get('reason'),)
+
+ raise Exception('Unknown API state [%s]' % (reply['state'],))
- raise Exception("Unknown API state [%s]" % (reply['state'],))
+ def robust(self, call):
- def _trace(self, what):
- if isinstance(what, (dict, list)):
- what = json.dumps(what, indent=4, sort_keys=True)
+ def wrapped(*args, **kwargs):
+ r = call(*args, **kwargs)
+ return r
- ts = "{:%Y-%m-%d %H:%M:%S}".format(datetime.datetime.now())
- print('CDS-API %s %s' % (ts, what))
+ return wrapped