diff options
Diffstat (limited to 'cdsapi')
-rw-r--r-- | cdsapi/__init__.py | 23 | ||||
-rw-r--r-- | cdsapi/api.py | 639 |
2 files changed, 0 insertions, 662 deletions
diff --git a/cdsapi/__init__.py b/cdsapi/__init__.py deleted file mode 100644 index c9e9d4e..0000000 --- a/cdsapi/__init__.py +++ /dev/null @@ -1,23 +0,0 @@ -# Copyright 2018 European Centre for Medium-Range Weather Forecasts (ECMWF) -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -# In applying this licence, ECMWF does not waive the privileges and immunities -# granted to it by virtue of its status as an intergovernmental organisation nor -# does it submit to any jurisdiction. - -from __future__ import absolute_import, division, print_function, unicode_literals - -from . import api - -Client = api.Client diff --git a/cdsapi/api.py b/cdsapi/api.py deleted file mode 100644 index eb82e34..0000000 --- a/cdsapi/api.py +++ /dev/null @@ -1,639 +0,0 @@ -# (C) Copyright 2018 ECMWF. -# -# This software is licensed under the terms of the Apache Licence Version 2.0 -# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. -# In applying this licence, ECMWF does not waive the privileges and immunities -# granted to it by virtue of its status as an intergovernmental organisation nor -# does it submit to any jurisdiction. - -from __future__ import absolute_import, division, print_function, unicode_literals - -import json -import time -import os -import logging -import uuid -import requests - -try: - from urllib.parse import urljoin -except ImportError: - from urlparse import urljoin - -from tqdm import tqdm - - -def bytes_to_string(n): - u = ["", "K", "M", "G", "T", "P"] - i = 0 - while n >= 1024: - n /= 1024.0 - i += 1 - return "%g%s" % (int(n * 10 + 0.5) / 10.0, u[i]) - - -def read_config(path): - config = {} - with open(path) as f: - for l in f.readlines(): - if ":" in l: - k, v = l.strip().split(":", 1) - if k in ("url", "key", "verify"): - config[k] = v.strip() - return config - - -def toJSON(obj): - - to_json = getattr(obj, "toJSON", None) - if callable(to_json): - return to_json() - - if isinstance(obj, (list, tuple)): - return [toJSON(x) for x in obj] - - if isinstance(obj, dict): - r = {} - for k, v in obj.items(): - r[k] = toJSON(v) - return r - - return obj - - -class Result(object): - def __init__(self, client, reply): - - self.reply = reply - - self._url = client.url - - self.session = client.session - self.robust = client.robust - self.verify = client.verify - self.cleanup = client.delete - - self.debug = client.debug - self.info = client.info - self.warning = client.warning - self.error = client.error - self.sleep_max = client.sleep_max - self.retry_max = client.retry_max - - self.timeout = client.timeout - self.progress = client.progress - - self._deleted = False - - def toJSON(self): - r = dict( - resultType="url", - contentType=self.content_type, - contentLength=self.content_length, - location=self.location, - ) - return r - - def _download(self, url, size, target): - - if target is None: - target = url.split("/")[-1] - - self.info("Downloading %s to %s (%s)", url, target, bytes_to_string(size)) - start = time.time() - - mode = "wb" - total = 0 - sleep = 10 - tries = 0 - headers = None - - while tries < self.retry_max: - - r = self.robust(self.session.get)( - url, - stream=True, - verify=self.verify, - headers=headers, - timeout=self.timeout, - ) - try: - r.raise_for_status() - - with tqdm( - total=size, - unit_scale=True, - unit_divisor=1024, - unit="B", - disable=not self.progress, - leave=False, - ) as pbar: - pbar.update(total) - with open(target, mode) as f: - for chunk in r.iter_content(chunk_size=1024): - if chunk: - f.write(chunk) - total += len(chunk) - pbar.update(len(chunk)) - - except requests.exceptions.ConnectionError as e: - self.error("Download interupted: %s" % (e,)) - finally: - r.close() - - if total >= size: - break - - self.error( - "Download incomplete, downloaded %s byte(s) out of %s" % (total, size) - ) - self.warning("Sleeping %s seconds" % (sleep,)) - time.sleep(sleep) - mode = "ab" - total = os.path.getsize(target) - sleep *= 1.5 - if sleep > self.sleep_max: - sleep = self.sleep_max - headers = {"Range": "bytes=%d-" % total} - tries += 1 - self.warning("Resuming download at byte %s" % (total,)) - - if total != size: - raise Exception( - "Download failed: downloaded %s byte(s) out of %s" % (total, size) - ) - - elapsed = time.time() - start - if elapsed: - self.info("Download rate %s/s", bytes_to_string(size / elapsed)) - - return target - - def download(self, target=None): - return self._download(self.location, self.content_length, target) - - @property - def content_length(self): - return int(self.reply["content_length"]) - - @property - def location(self): - return urljoin(self._url, self.reply["location"]) - - @property - def content_type(self): - return self.reply["content_type"] - - def __repr__(self): - return "Result(content_length=%s,content_type=%s,location=%s)" % ( - self.content_length, - self.content_type, - self.location, - ) - - def check(self): - self.debug("HEAD %s", self.location) - metadata = self.robust(self.session.head)( - self.location, verify=self.verify, timeout=self.timeout - ) - metadata.raise_for_status() - self.debug(metadata.headers) - return metadata - - def update(self, request_id=None): - if request_id is None: - request_id = self.reply["request_id"] - task_url = "%s/tasks/%s" % (self._url, request_id) - self.debug("GET %s", task_url) - - result = self.robust(self.session.get)(task_url, verify=self.verify) - result.raise_for_status() - self.reply = result.json() - - def delete(self): - - if self._deleted: - return - - if "request_id" in self.reply: - rid = self.reply["request_id"] - - task_url = "%s/tasks/%s" % (self._url, rid) - self.debug("DELETE %s", task_url) - - delete = self.session.delete(task_url, verify=self.verify) - self.debug("DELETE returns %s %s", delete.status_code, delete.reason) - - try: - delete.raise_for_status() - except Exception: - self.warning( - "DELETE %s returns %s %s", - task_url, - delete.status_code, - delete.reason, - ) - - self._deleted = True - - def __del__(self): - try: - if self.cleanup: - self.delete() - except Exception as e: - print(e) - - -class Client(object): - - logger = logging.getLogger("cdsapi") - - def __init__( - self, - url=os.environ.get("CDSAPI_URL"), - key=os.environ.get("CDSAPI_KEY"), - quiet=False, - debug=False, - verify=None, - timeout=60, - progress=True, - full_stack=False, - delete=True, - retry_max=500, - sleep_max=120, - wait_until_complete=True, - info_callback=None, - warning_callback=None, - error_callback=None, - debug_callback=None, - metadata=None, - forget=False, - session=requests.Session(), - ): - - if not quiet: - - if debug: - level = logging.DEBUG - else: - level = logging.INFO - - logging.basicConfig( - level=level, format="%(asctime)s %(levelname)s %(message)s" - ) - - dotrc = os.environ.get("CDSAPI_RC", os.path.expanduser("~/.cdsapirc")) - - if url is None or key is None: - if os.path.exists(dotrc): - config = read_config(dotrc) - - if key is None: - key = config.get("key") - - if url is None: - url = config.get("url") - - if verify is None: - verify = int(config.get("verify", 1)) - - if url is None or key is None or key is None: - raise Exception("Missing/incomplete configuration file: %s" % (dotrc)) - - self.url = url - self.key = key - - self.quiet = quiet - self.progress = progress and not quiet - - self.verify = True if verify else False - self.timeout = timeout - self.sleep_max = sleep_max - self.retry_max = retry_max - self.full_stack = full_stack - self.delete = delete - self.last_state = None - self.wait_until_complete = wait_until_complete - - self.debug_callback = debug_callback - self.warning_callback = warning_callback - self.info_callback = info_callback - self.error_callback = error_callback - - self.session = session - self.session.auth = tuple(self.key.split(":", 2)) - - self.metadata = metadata - self.forget = forget - - self.debug( - "CDSAPI %s", - dict( - url=self.url, - key=self.key, - quiet=self.quiet, - verify=self.verify, - timeout=self.timeout, - progress=self.progress, - sleep_max=self.sleep_max, - retry_max=self.retry_max, - full_stack=self.full_stack, - delete=self.delete, - metadata=self.metadata, - forget=self.forget, - ), - ) - - def retrieve(self, name, request, target=None): - result = self._api("%s/resources/%s" % (self.url, name), request, "POST") - if target is not None: - result.download(target) - return result - - def service(self, name, mimic_ui=False, *args, **kwargs): - self.delete = False # Don't delete results - name = "/".join(name.split(".")) - # To mimic the CDS ui the request should be populated directly with the kwargs - if mimic_ui: - request = kwargs - else: - request = dict(args=args, kwargs=kwargs) - - if self.metadata: - request["_cds_metadata"] = self.metadata - request = toJSON(request) - result = self._api( - "%s/tasks/services/%s/clientid-%s" % (self.url, name, uuid.uuid4().hex), - request, - "PUT", - ) - return result - - def workflow(self, code, *args, **kwargs): - workflow_name = kwargs.pop("workflow_name", "application") - params = dict(code=code, args=args, kwargs=kwargs, workflow_name=workflow_name) - return self.service("tool.toolbox.orchestrator.run_workflow", params) - - def status(self, context=None): - url = "%s/status.json" % (self.url,) - r = self.session.get(url, verify=self.verify) - r.raise_for_status() - return r.json() - - def _status(self, url): - try: - status = self.status(url) - - info = status.get("info", []) - if not isinstance(info, list): - info = [info] - for i in info: - self.info("%s", i) - - warning = status.get("warning", []) - if not isinstance(warning, list): - warning = [warning] - for w in warning: - self.warning("%s", w) - - except Exception: - pass - - def _api(self, url, request, method): - - self._status(url) - - session = self.session - - self.info("Sending request to %s", url) - self.debug("%s %s %s", method, url, json.dumps(request)) - - if method == "PUT": - action = session.put - else: - action = session.post - - result = self.robust(action)( - url, json=request, verify=self.verify, timeout=self.timeout - ) - - if self.forget: - return result - - reply = None - - try: - result.raise_for_status() - reply = result.json() - except Exception: - - if reply is None: - try: - reply = result.json() - except Exception: - reply = dict(message=result.text) - - self.debug(json.dumps(reply)) - - if "message" in reply: - error = reply["message"] - - if "context" in reply and "required_terms" in reply["context"]: - e = [error] - for t in reply["context"]["required_terms"]: - e.append( - "To access this resource, you first need to accept the terms" - "of '%s' at %s" % (t["title"], t["url"]) - ) - error = ". ".join(e) - raise Exception(error) - else: - raise - - if not self.wait_until_complete: - return Result(self, reply) - - sleep = 1 - - while True: - - self.debug("REPLY %s", reply) - - if reply["state"] != self.last_state: - self.info("Request is %s" % (reply["state"],)) - self.last_state = reply["state"] - - if reply["state"] == "completed": - self.debug("Done") - - if "result" in reply: - return reply["result"] - - return Result(self, reply) - - if reply["state"] in ("queued", "running"): - rid = reply["request_id"] - - self.debug("Request ID is %s, sleep %s", rid, sleep) - time.sleep(sleep) - sleep *= 1.5 - if sleep > self.sleep_max: - sleep = self.sleep_max - - task_url = "%s/tasks/%s" % (self.url, rid) - self.debug("GET %s", task_url) - - result = self.robust(session.get)( - task_url, verify=self.verify, timeout=self.timeout - ) - result.raise_for_status() - reply = result.json() - continue - - if reply["state"] in ("failed",): - self.error("Message: %s", reply["error"].get("message")) - self.error("Reason: %s", reply["error"].get("reason")) - for n in ( - reply.get("error", {}) - .get("context", {}) - .get("traceback", "") - .split("\n") - ): - if n.strip() == "" and not self.full_stack: - break - self.error(" %s", n) - raise Exception( - "%s. %s." - % (reply["error"].get("message"), reply["error"].get("reason")) - ) - - raise Exception("Unknown API state [%s]" % (reply["state"],)) - - def info(self, *args, **kwargs): - if self.info_callback: - self.info_callback(*args, **kwargs) - else: - self.logger.info(*args, **kwargs) - - def warning(self, *args, **kwargs): - if self.warning_callback: - self.warning_callback(*args, **kwargs) - else: - self.logger.warning(*args, **kwargs) - - def error(self, *args, **kwargs): - if self.error_callback: - self.error_callback(*args, **kwargs) - else: - self.logger.error(*args, **kwargs) - - def debug(self, *args, **kwargs): - if self.debug_callback: - self.debug_callback(*args, **kwargs) - else: - self.logger.debug(*args, **kwargs) - - def _download(self, results, targets=None): - - if isinstance(results, Result): - if targets: - path = targets.pop(0) - else: - path = None - return results.download(path) - - if isinstance(results, (list, tuple)): - return [self._download(x, targets) for x in results] - - if isinstance(results, dict): - - if "location" in results and "contentLength" in results: - reply = dict( - location=results["location"], - content_length=results["contentLength"], - content_type=results.get("contentType"), - ) - - if targets: - path = targets.pop(0) - else: - path = None - - return Result(self, reply).download(path) - - r = {} - for k, v in results.items(): - r[v] = self._download(v, targets) - return r - - return results - - def download(self, results, targets=None): - if targets: - # Make a copy - targets = [t for t in targets] - return self._download(results, targets) - - def remote(self, url): - r = requests.head(url) - reply = dict( - location=url, - content_length=r.headers["Content-Length"], - content_type=r.headers["Content-Type"], - ) - return Result(self, reply) - - def robust(self, call): - def retriable(code, reason): - - if code in [ - requests.codes.internal_server_error, - requests.codes.bad_gateway, - requests.codes.service_unavailable, - requests.codes.gateway_timeout, - requests.codes.too_many_requests, - requests.codes.request_timeout, - ]: - return True - - return False - - def wrapped(*args, **kwargs): - tries = 0 - while tries < self.retry_max: - try: - r = call(*args, **kwargs) - except ( - requests.exceptions.ConnectionError, - requests.exceptions.ReadTimeout, - ) as e: - r = None - self.warning( - "Recovering from connection error [%s], attemps %s of %s", - e, - tries, - self.retry_max, - ) - - if r is not None: - if not retriable(r.status_code, r.reason): - return r - try: - self.warning(r.json()["reason"]) - except Exception: - pass - self.warning( - "Recovering from HTTP error [%s %s], attemps %s of %s", - r.status_code, - r.reason, - tries, - self.retry_max, - ) - - tries += 1 - - self.warning("Retrying in %s seconds", self.sleep_max) - time.sleep(self.sleep_max) - self.info("Retrying now...") - - return wrapped |