From 9b8667ad2b09eef2881ed872068a51e2518ceb37 Mon Sep 17 00:00:00 2001 From: Baudouin Raoult Date: Wed, 23 May 2018 12:34:36 +0200 Subject: work on logging --- cdsapi/api.py | 108 ++++++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 74 insertions(+), 34 deletions(-) (limited to 'cdsapi/api.py') diff --git a/cdsapi/api.py b/cdsapi/api.py index 5ea1a48..cacc84b 100644 --- a/cdsapi/api.py +++ b/cdsapi/api.py @@ -49,7 +49,11 @@ class Client(object): full_stack=False, delete=False, retry_max=500, - sleep_max=120 + sleep_max=120, + info_callback=None, + warning_callback=None, + error_callback=None, + debug_callback=None, ): dotrc = os.environ.get('CDSAPI_RC', os.path.expanduser('~/.cdsapirc')) @@ -80,17 +84,23 @@ class Client(object): self.retry_max = retry_max self.full_stack = full_stack self.delete = delete - - self.logger.debug("CDSAPI %s", dict(url=self.url, - key=self.key, - quiet=self.quiet, - verify=self.verify, - timeout=self.timeout, - sleep_max=self.sleep_max, - retry_max=self.retry_max, - full_stack=self.full_stack, - delete=self.delete - )) + self.last_state = None + + self.debug_callback = debug_callback + self.warning_callback = warning_callback + self.info_callback = info_callback + self.error_callback = error_callback + + self.debug("CDSAPI %s", dict(url=self.url, + key=self.key, + quiet=self.quiet, + verify=self.verify, + timeout=self.timeout, + sleep_max=self.sleep_max, + retry_max=self.retry_max, + full_stack=self.full_stack, + delete=self.delete + )) def retrieve(self, name, request, target=None): self._api('%s/resources/%s' % (self.url, name), request, target) @@ -100,7 +110,7 @@ class Client(object): if local_filename is None: local_filename = url.split('/')[-1] - self.logger.debug("Downloading %s to %s (%s)", url, local_filename, bytes_to_string(size)) + self.info("Downloading %s to %s (%s)", url, local_filename, bytes_to_string(size)) start = time.time() r = self.robust(requests.get)(url, stream=True, verify=self.verify) total = 0 @@ -114,7 +124,7 @@ class Client(object): elapsed = time.time() - start if elapsed: - self.logger.debug("Download rate %s/s", bytes_to_string(size / elapsed)) + self.info("Download rate %s/s", bytes_to_string(size / elapsed)) return local_filename def _api(self, url, request, target): @@ -122,7 +132,9 @@ class Client(object): session = requests.Session() session.auth = tuple(self.key.split(':', 2)) - self.logger.debug("POST %s, %s", url, json.dumps(request)) + self.info("Sending request to %s", url) + self.debug("POST %s %s", url, json.dumps(request)) + result = self.robust(session.post)(url, json=request, verify=self.verify) reply = None @@ -137,7 +149,7 @@ class Client(object): except Exception: reply = dict(message=result.text) - self.logger.debug(json.dumps(reply)) + self.debug(json.dumps(reply)) if 'message' in reply: error = reply['message'] @@ -157,33 +169,37 @@ class Client(object): while True: - self.logger.debug("REPLY %s", reply) + self.debug("REPLY %s", reply) + + if reply['state'] != self.last_state: + self.info("Request is %s" % (reply['state'],)) + self.last_state = reply['state'] if reply['state'] == 'completed': if target: self._download(reply['location'], int(reply['content_length']), target) else: - self.logger.debug("HEAD %s", reply['location']) + self.debug("HEAD %s", reply['location']) metadata = self.robust(session.head)(reply['location'], verify=self.verify) metadata.raise_for_status() - self.logger.debug(metadata.headers) + self.debug(metadata.headers) if 'request_id' in reply: rid = reply['request_id'] if self.delete: task_url = '%s/tasks/%s' % (self.url, rid) - self.logger.debug("DELETE %s", task_url) + self.debug("DELETE %s", task_url) delete = session.delete(task_url, verify=self.verify) - self.logger.debug("DELETE returns %s %s", delete.status_code, delete.reason) + self.debug("DELETE returns %s %s", delete.status_code, delete.reason) try: delete.raise_for_status() except Exception: - self.logger.warning("DELETE %s returns %s %s", - task_url, delete.status_code, delete.reason) + self.warning("DELETE %s returns %s %s", + task_url, delete.status_code, delete.reason) - self.logger.debug("Done") + self.debug("Done") return if reply['state'] in ('queued', 'running'): @@ -192,14 +208,14 @@ class Client(object): if self.timeout and (time.time() - start > self.timeout): raise Exception('TIMEOUT') - self.logger.debug("Request ID is %s, sleep %s", rid, sleep) + self.debug("Request ID is %s, sleep %s", rid, sleep) time.sleep(sleep) sleep *= 1.5 if sleep > self.sleep_max: sleep = self.sleep_max task_url = '%s/tasks/%s' % (self.url, rid) - self.logger.debug("GET %s", task_url) + self.debug("GET %s", task_url) result = self.robust(session.get)(task_url, verify=self.verify) result.raise_for_status() @@ -207,16 +223,40 @@ class Client(object): continue if reply['state'] in ('failed',): - self.logger.error("Message: %s", reply['error'].get('message')) - self.logger.error("Reason: %s", reply['error'].get('reason')) + self.error("Message: %s", reply['error'].get('message')) + self.error("Reason: %s", reply['error'].get('reason')) for n in reply.get('error', {}).get('context', {}).get('traceback', '').split('\n'): if n.strip() == '' and not self.full_stack: break - self.logger.error(" %s", n) + self.error(" %s", n) raise Exception("%s. %s." % (reply['error'].get('message'), reply['error'].get('reason'))) raise Exception('Unknown API state [%s]' % (reply['state'],)) + def info(self, *args, **kwargs): + if self.info_callback: + self.info_callback(*args, **kwargs) + else: + self.logger.info(*args, **kwargs) + + def warning(self, *args, **kwargs): + if self.warning_callback: + self.warning_callback(*args, **kwargs) + else: + self.logger.warning(*args, **kwargs) + + def error(self, *args, **kwargs): + if self.error_callback: + self.error_callback(*args, **kwargs) + else: + self.logger.error(*args, **kwargs) + + def debug(self, *args, **kwargs): + if self.debug_callback: + self.debug_callback(*args, **kwargs) + else: + self.logger.debug(*args, **kwargs) + def robust(self, call): def retriable(code, reason): @@ -238,18 +278,18 @@ class Client(object): r = call(*args, **kwargs) except requests.exceptions.ConnectionError as e: r = None - self.logger.warning("Recovering from connection error [%s], attemps %s of %s", - e, tries, self.retry_max) + self.warning("Recovering from connection error [%s], attemx ps %s of %s", + e, tries, self.retry_max) if r is not None: if not retriable(r.status_code, r.reason): return r - self.logger.warning("Recovering from HTTP error [%s %s], attemps %s of %s", - r.status_code, r.reason, tries, self.retry_max) + self.warning("Recovering from HTTP error [%s %s], attemps %s of %s", + r.status_code, r.reason, tries, self.retry_max) tries += 1 - self.logger.warning("Retrying in %s second (", self.sleep_max) + self.warning("Retrying in %s seconds", self.sleep_max) time.sleep(self.sleep_max) return wrapped -- cgit v1.2.3