From 0c28fb7da83c7afba74416a88682eb931d874ce4 Mon Sep 17 00:00:00 2001 From: Gard Spreemann Date: Mon, 6 Jan 2020 15:05:46 +0100 Subject: Upstream 0.2.5 tarball as released on PyPI. --- PKG-INFO | 2 +- cds-test.py | 32 ++++++ cdsapi.egg-info/PKG-INFO | 2 +- cdsapi.egg-info/SOURCES.txt | 1 + cdsapi.egg-info/requires.txt | 1 + cdsapi/api.py | 235 +++++++++++++++++++++++++++++++++++++------ example-era5.py | 2 +- setup.py | 3 +- 8 files changed, 246 insertions(+), 32 deletions(-) create mode 100644 cds-test.py diff --git a/PKG-INFO b/PKG-INFO index bb3309d..5c327c1 100644 --- a/PKG-INFO +++ b/PKG-INFO @@ -1,6 +1,6 @@ Metadata-Version: 1.1 Name: cdsapi -Version: 0.1.4 +Version: 0.2.5 Summary: Climate Data Store API Home-page: https://software.ecmwf.int/stash/projects/CDS/repos/cdsapi Author: ECMWF diff --git a/cds-test.py b/cds-test.py new file mode 100644 index 0000000..30d3fa0 --- /dev/null +++ b/cds-test.py @@ -0,0 +1,32 @@ +#!/usr/bin/env python + +# (C) Copyright 2018 ECMWF. +# +# This software is licensed under the terms of the Apache Licence Version 2.0 +# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0. +# In applying this licence, ECMWF does not waive the privileges and immunities +# granted to it by virtue of its status as an intergovernmental organisation nor +# does it submit to any jurisdiction. + +import cdsapi + + +c = cdsapi.Client(full_stack=True, + #url='https://cds-test.climate.copernicus.eu/api/v2', + #key='1:1c2ab50b-2208-4d84-b59d-87154cae4441', + debug=True, quiet=False) + +# print(c.status()) + +r = c.retrieve( + "reanalysis-era5-pressure-levels", + { + "variable": "temperature", + "pressure_level": "all", + "product_type": "reanalysis", + "date": "2017-12-01/2017-12-30", + "time": "19:00", + }, +) + +# r.download("x.grib") diff --git a/cdsapi.egg-info/PKG-INFO b/cdsapi.egg-info/PKG-INFO index bb3309d..5c327c1 100644 --- a/cdsapi.egg-info/PKG-INFO +++ b/cdsapi.egg-info/PKG-INFO @@ -1,6 +1,6 @@ Metadata-Version: 1.1 Name: cdsapi -Version: 0.1.4 +Version: 0.2.5 Summary: Climate Data Store API Home-page: https://software.ecmwf.int/stash/projects/CDS/repos/cdsapi Author: ECMWF diff --git a/cdsapi.egg-info/SOURCES.txt b/cdsapi.egg-info/SOURCES.txt index cbd1fce..9196f00 100644 --- a/cdsapi.egg-info/SOURCES.txt +++ b/cdsapi.egg-info/SOURCES.txt @@ -2,6 +2,7 @@ CONTRIBUTING.rst LICENSE.txt MANIFEST.in README.rst +cds-test.py example-era5.py example-glaciers.py setup.cfg diff --git a/cdsapi.egg-info/requires.txt b/cdsapi.egg-info/requires.txt index 8f94722..58ac32b 100644 --- a/cdsapi.egg-info/requires.txt +++ b/cdsapi.egg-info/requires.txt @@ -1 +1,2 @@ requests>=2.5.0 +tqdm diff --git a/cdsapi/api.py b/cdsapi/api.py index 88a3a8c..80eaa59 100644 --- a/cdsapi/api.py +++ b/cdsapi/api.py @@ -12,9 +12,16 @@ import json import time import os import logging - +import uuid import requests +try: + from urllib.parse import urljoin +except ImportError: + from urlparse import urljoin + +from tqdm import tqdm + def bytes_to_string(n): u = ['', 'K', 'M', 'G', 'T', 'P'] @@ -36,6 +43,24 @@ def read_config(path): return config +def toJSON(obj): + + to_json = getattr(obj, "toJSON", None) + if callable(to_json): + return to_json() + + if isinstance(obj, (list, tuple)): + return [toJSON(x) for x in obj] + + if isinstance(obj, dict): + r = {} + for k, v in obj.items(): + r[k] = toJSON(v) + return r + + return obj + + class Result(object): def __init__(self, client, reply): @@ -53,9 +78,21 @@ class Result(object): self.info = client.info self.warning = client.warning self.error = client.error + self.sleep_max = client.sleep_max + self.retry_max = client.retry_max + + self.timeout = client.timeout + self.progress = client.progress self._deleted = False + def toJSON(self): + r = dict(resultType='url', + contentType=self.content_type, + contentLength=self.content_length, + location=self.location) + return r + def _download(self, url, size, target): if target is None: @@ -64,18 +101,56 @@ class Result(object): self.info("Downloading %s to %s (%s)", url, target, bytes_to_string(size)) start = time.time() - r = self.robust(requests.get)(url, stream=True, verify=self.verify) - try: - r.raise_for_status() + mode = 'wb' + total = 0 + sleep = 10 + tries = 0 + headers = None + + while tries < self.retry_max: - total = 0 - with open(target, 'wb') as f: - for chunk in r.iter_content(chunk_size=1024): - if chunk: - f.write(chunk) - total += len(chunk) - finally: - r.close() + r = self.robust(requests.get)(url, + stream=True, + verify=self.verify, + headers=headers, + timeout=self.timeout) + try: + r.raise_for_status() + + with tqdm(total=size, + unit_scale=True, + unit_divisor=1024, + unit='B', + disable=not self.progress, + leave=False, + ) as pbar: + pbar.update(total) + with open(target, mode) as f: + for chunk in r.iter_content(chunk_size=1024): + if chunk: + f.write(chunk) + total += len(chunk) + pbar.update(len(chunk)) + + except requests.exceptions.ConnectionError as e: + self.error("Download interupted: %s" % (e,)) + finally: + r.close() + + if total >= size: + break + + self.error("Download incomplete, downloaded %s byte(s) out of %s" % (total, size)) + self.warning("Sleeping %s seconds" % (sleep,)) + time.sleep(sleep) + mode = 'ab' + total = os.path.getsize(target) + sleep *= 1.5 + if sleep > self.sleep_max: + sleep = self.sleep_max + headers = {'Range': 'bytes=%d-' % total} + tries += 1 + self.warning("Resuming download at byte %s" % (total, )) if total != size: raise Exception("Download failed: downloaded %s byte(s) out of %s" % (total, size)) @@ -97,7 +172,7 @@ class Result(object): @property def location(self): - return self.reply['location'] + return urljoin(self._url, self.reply['location']) @property def content_type(self): @@ -109,9 +184,10 @@ class Result(object): self.location) def check(self): - self.debug("HEAD %s", self.reply['location']) - metadata = self.robust(self.session.head)(self.reply['location'], - verify=self.verify) + self.debug("HEAD %s", self.location) + metadata = self.robust(self.session.head)(self.location, + verify=self.verify, + timeout=self.timeout) metadata.raise_for_status() self.debug(metadata.headers) return metadata @@ -156,7 +232,8 @@ class Client(object): quiet=False, debug=False, verify=None, - timeout=None, + timeout=60, + progress=True, full_stack=False, delete=True, retry_max=500, @@ -199,6 +276,8 @@ class Client(object): self.key = key self.quiet = quiet + self.progress = progress and not quiet + self.verify = True if verify else False self.timeout = timeout self.sleep_max = sleep_max @@ -220,6 +299,7 @@ class Client(object): quiet=self.quiet, verify=self.verify, timeout=self.timeout, + progress=self.progress, sleep_max=self.sleep_max, retry_max=self.retry_max, full_stack=self.full_stack, @@ -227,22 +307,68 @@ class Client(object): )) def retrieve(self, name, request, target=None): - result = self._api('%s/resources/%s' % (self.url, name), request) + result = self._api('%s/resources/%s' % (self.url, name), request, 'POST') if target is not None: result.download(target) return result - def identity(self): - return self._api('%s/resources' % (self.url,), {}) + def service(self, name, *args, **kwargs): + self.delete = False # Don't delete results + name = '/'.join(name.split('.')) + request = toJSON(dict(args=args, kwargs=kwargs)) + result = self._api('%s/tasks/services/%s/clientid-%s' % (self.url, name, uuid.uuid4().hex), request, 'PUT') + return result + + def workflow(self, code, *args, **kwargs): + params = dict(code=code, + args=args, + kwargs=kwargs, + workflow_name='application') + return self.service("tool.toolbox.orchestrator.run_workflow", params) + + def status(self, context=None): + url = '%s/status.json' % (self.url,) + r = requests.get(url, verify=self.verify) + r.raise_for_status() + return r.json() + + def _status(self, url): + try: + status = self.status(url) + + info = status.get('info', []) + if not isinstance(info, list): + info = [info] + for i in info: + self.info("%s", i) + + warning = status.get('warning', []) + if not isinstance(warning, list): + warning = [warning] + for w in warning: + self.warning("%s", w) - def _api(self, url, request): + except Exception: + pass + + def _api(self, url, request, method): + + self._status(url) session = self.session self.info("Sending request to %s", url) - self.debug("POST %s %s", url, json.dumps(request)) + self.debug("%s %s %s", method, url, json.dumps(request)) - result = self.robust(session.post)(url, json=request, verify=self.verify) + if method == 'PUT': + action = session.put + else: + action = session.post + + result = self.robust(action)(url, + json=request, + verify=self.verify, + timeout=self.timeout) reply = None try: @@ -272,7 +398,6 @@ class Client(object): raise sleep = 1 - start = time.time() while True: @@ -284,14 +409,15 @@ class Client(object): if reply['state'] == 'completed': self.debug("Done") + + if 'result' in reply: + return reply['result'] + return Result(self, reply) if reply['state'] in ('queued', 'running'): rid = reply['request_id'] - if self.timeout and (time.time() - start > self.timeout): - raise Exception('TIMEOUT') - self.debug("Request ID is %s, sleep %s", rid, sleep) time.sleep(sleep) sleep *= 1.5 @@ -301,7 +427,9 @@ class Client(object): task_url = '%s/tasks/%s' % (self.url, rid) self.debug("GET %s", task_url) - result = self.robust(session.get)(task_url, verify=self.verify) + result = self.robust(session.get)(task_url, + verify=self.verify, + timeout=self.timeout) result.raise_for_status() reply = result.json() continue @@ -341,6 +469,52 @@ class Client(object): else: self.logger.debug(*args, **kwargs) + def _download(self, results, targets=None): + + if isinstance(results, Result): + if targets: + path = targets.pop(0) + else: + path = None + return results.download(path) + + if isinstance(results, (list, tuple)): + return [self._download(x, targets) for x in results] + + if isinstance(results, dict): + + if 'location' in results and 'contentLength' in results: + reply = dict(location=results['location'], + content_length=results['contentLength'], + content_type=results.get('contentType')) + + if targets: + path = targets.pop(0) + else: + path = None + + return Result(self, reply).download(path) + + r = {} + for k, v in results.items(): + r[v] = self._download(v, targets) + return r + + return results + + def download(self, results, targets=None): + if targets: + # Make a copy + targets = [t for t in targets] + return self._download(results, targets) + + def remote(self, url): + r = requests.head(url) + reply = dict(location=url, + content_length=r.headers['Content-Length'], + content_type=r.headers['Content-Type']) + return Result(self, reply) + def robust(self, call): def retriable(code, reason): @@ -368,6 +542,10 @@ class Client(object): if r is not None: if not retriable(r.status_code, r.reason): return r + try: + self.warning(r.json()['reason']) + except Exception: + pass self.warning("Recovering from HTTP error [%s %s], attemps %s of %s", r.status_code, r.reason, tries, self.retry_max) @@ -375,5 +553,6 @@ class Client(object): self.warning("Retrying in %s seconds", self.sleep_max) time.sleep(self.sleep_max) + self.info("Retrying now...") return wrapped diff --git a/example-era5.py b/example-era5.py index c0ad58b..979c352 100755 --- a/example-era5.py +++ b/example-era5.py @@ -10,7 +10,7 @@ import cdsapi -c = cdsapi.Client(debug=True) +c = cdsapi.Client() r = c.retrieve( "reanalysis-era5-single-levels", diff --git a/setup.py b/setup.py index 041cdcc..d1d74bd 100644 --- a/setup.py +++ b/setup.py @@ -30,7 +30,7 @@ def read(fname): return io.open(file_path, encoding='utf-8').read() -version = '0.1.4' +version = '0.2.5' setuptools.setup( @@ -46,6 +46,7 @@ setuptools.setup( include_package_data=True, install_requires=[ 'requests>=2.5.0', + 'tqdm', ], zip_safe=True, classifiers=[ -- cgit v1.2.3