summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGard Spreemann <gspr@nonempty.org>2020-01-06 15:05:46 +0100
committerGard Spreemann <gspr@nonempty.org>2020-01-06 15:05:46 +0100
commit0c28fb7da83c7afba74416a88682eb931d874ce4 (patch)
treed9b5b9c9f6c729ec471a29b6a91f2423a490d2a6
parent11e00a4a0a3771097ddddcdb8fa3c417407adf3b (diff)
Upstream 0.2.5 tarball as released on PyPI.upstream/0.2.5
-rw-r--r--PKG-INFO2
-rw-r--r--cds-test.py32
-rw-r--r--cdsapi.egg-info/PKG-INFO2
-rw-r--r--cdsapi.egg-info/SOURCES.txt1
-rw-r--r--cdsapi.egg-info/requires.txt1
-rw-r--r--cdsapi/api.py235
-rwxr-xr-xexample-era5.py2
-rw-r--r--setup.py3
8 files changed, 246 insertions, 32 deletions
diff --git a/PKG-INFO b/PKG-INFO
index bb3309d..5c327c1 100644
--- a/PKG-INFO
+++ b/PKG-INFO
@@ -1,6 +1,6 @@
Metadata-Version: 1.1
Name: cdsapi
-Version: 0.1.4
+Version: 0.2.5
Summary: Climate Data Store API
Home-page: https://software.ecmwf.int/stash/projects/CDS/repos/cdsapi
Author: ECMWF
diff --git a/cds-test.py b/cds-test.py
new file mode 100644
index 0000000..30d3fa0
--- /dev/null
+++ b/cds-test.py
@@ -0,0 +1,32 @@
+#!/usr/bin/env python
+
+# (C) Copyright 2018 ECMWF.
+#
+# This software is licensed under the terms of the Apache Licence Version 2.0
+# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
+# In applying this licence, ECMWF does not waive the privileges and immunities
+# granted to it by virtue of its status as an intergovernmental organisation nor
+# does it submit to any jurisdiction.
+
+import cdsapi
+
+
+c = cdsapi.Client(full_stack=True,
+ #url='https://cds-test.climate.copernicus.eu/api/v2',
+ #key='1:1c2ab50b-2208-4d84-b59d-87154cae4441',
+ debug=True, quiet=False)
+
+# print(c.status())
+
+r = c.retrieve(
+ "reanalysis-era5-pressure-levels",
+ {
+ "variable": "temperature",
+ "pressure_level": "all",
+ "product_type": "reanalysis",
+ "date": "2017-12-01/2017-12-30",
+ "time": "19:00",
+ },
+)
+
+# r.download("x.grib")
diff --git a/cdsapi.egg-info/PKG-INFO b/cdsapi.egg-info/PKG-INFO
index bb3309d..5c327c1 100644
--- a/cdsapi.egg-info/PKG-INFO
+++ b/cdsapi.egg-info/PKG-INFO
@@ -1,6 +1,6 @@
Metadata-Version: 1.1
Name: cdsapi
-Version: 0.1.4
+Version: 0.2.5
Summary: Climate Data Store API
Home-page: https://software.ecmwf.int/stash/projects/CDS/repos/cdsapi
Author: ECMWF
diff --git a/cdsapi.egg-info/SOURCES.txt b/cdsapi.egg-info/SOURCES.txt
index cbd1fce..9196f00 100644
--- a/cdsapi.egg-info/SOURCES.txt
+++ b/cdsapi.egg-info/SOURCES.txt
@@ -2,6 +2,7 @@ CONTRIBUTING.rst
LICENSE.txt
MANIFEST.in
README.rst
+cds-test.py
example-era5.py
example-glaciers.py
setup.cfg
diff --git a/cdsapi.egg-info/requires.txt b/cdsapi.egg-info/requires.txt
index 8f94722..58ac32b 100644
--- a/cdsapi.egg-info/requires.txt
+++ b/cdsapi.egg-info/requires.txt
@@ -1 +1,2 @@
requests>=2.5.0
+tqdm
diff --git a/cdsapi/api.py b/cdsapi/api.py
index 88a3a8c..80eaa59 100644
--- a/cdsapi/api.py
+++ b/cdsapi/api.py
@@ -12,9 +12,16 @@ import json
import time
import os
import logging
-
+import uuid
import requests
+try:
+ from urllib.parse import urljoin
+except ImportError:
+ from urlparse import urljoin
+
+from tqdm import tqdm
+
def bytes_to_string(n):
u = ['', 'K', 'M', 'G', 'T', 'P']
@@ -36,6 +43,24 @@ def read_config(path):
return config
+def toJSON(obj):
+
+ to_json = getattr(obj, "toJSON", None)
+ if callable(to_json):
+ return to_json()
+
+ if isinstance(obj, (list, tuple)):
+ return [toJSON(x) for x in obj]
+
+ if isinstance(obj, dict):
+ r = {}
+ for k, v in obj.items():
+ r[k] = toJSON(v)
+ return r
+
+ return obj
+
+
class Result(object):
def __init__(self, client, reply):
@@ -53,9 +78,21 @@ class Result(object):
self.info = client.info
self.warning = client.warning
self.error = client.error
+ self.sleep_max = client.sleep_max
+ self.retry_max = client.retry_max
+
+ self.timeout = client.timeout
+ self.progress = client.progress
self._deleted = False
+ def toJSON(self):
+ r = dict(resultType='url',
+ contentType=self.content_type,
+ contentLength=self.content_length,
+ location=self.location)
+ return r
+
def _download(self, url, size, target):
if target is None:
@@ -64,18 +101,56 @@ class Result(object):
self.info("Downloading %s to %s (%s)", url, target, bytes_to_string(size))
start = time.time()
- r = self.robust(requests.get)(url, stream=True, verify=self.verify)
- try:
- r.raise_for_status()
+ mode = 'wb'
+ total = 0
+ sleep = 10
+ tries = 0
+ headers = None
+
+ while tries < self.retry_max:
- total = 0
- with open(target, 'wb') as f:
- for chunk in r.iter_content(chunk_size=1024):
- if chunk:
- f.write(chunk)
- total += len(chunk)
- finally:
- r.close()
+ r = self.robust(requests.get)(url,
+ stream=True,
+ verify=self.verify,
+ headers=headers,
+ timeout=self.timeout)
+ try:
+ r.raise_for_status()
+
+ with tqdm(total=size,
+ unit_scale=True,
+ unit_divisor=1024,
+ unit='B',
+ disable=not self.progress,
+ leave=False,
+ ) as pbar:
+ pbar.update(total)
+ with open(target, mode) as f:
+ for chunk in r.iter_content(chunk_size=1024):
+ if chunk:
+ f.write(chunk)
+ total += len(chunk)
+ pbar.update(len(chunk))
+
+ except requests.exceptions.ConnectionError as e:
+ self.error("Download interupted: %s" % (e,))
+ finally:
+ r.close()
+
+ if total >= size:
+ break
+
+ self.error("Download incomplete, downloaded %s byte(s) out of %s" % (total, size))
+ self.warning("Sleeping %s seconds" % (sleep,))
+ time.sleep(sleep)
+ mode = 'ab'
+ total = os.path.getsize(target)
+ sleep *= 1.5
+ if sleep > self.sleep_max:
+ sleep = self.sleep_max
+ headers = {'Range': 'bytes=%d-' % total}
+ tries += 1
+ self.warning("Resuming download at byte %s" % (total, ))
if total != size:
raise Exception("Download failed: downloaded %s byte(s) out of %s" % (total, size))
@@ -97,7 +172,7 @@ class Result(object):
@property
def location(self):
- return self.reply['location']
+ return urljoin(self._url, self.reply['location'])
@property
def content_type(self):
@@ -109,9 +184,10 @@ class Result(object):
self.location)
def check(self):
- self.debug("HEAD %s", self.reply['location'])
- metadata = self.robust(self.session.head)(self.reply['location'],
- verify=self.verify)
+ self.debug("HEAD %s", self.location)
+ metadata = self.robust(self.session.head)(self.location,
+ verify=self.verify,
+ timeout=self.timeout)
metadata.raise_for_status()
self.debug(metadata.headers)
return metadata
@@ -156,7 +232,8 @@ class Client(object):
quiet=False,
debug=False,
verify=None,
- timeout=None,
+ timeout=60,
+ progress=True,
full_stack=False,
delete=True,
retry_max=500,
@@ -199,6 +276,8 @@ class Client(object):
self.key = key
self.quiet = quiet
+ self.progress = progress and not quiet
+
self.verify = True if verify else False
self.timeout = timeout
self.sleep_max = sleep_max
@@ -220,6 +299,7 @@ class Client(object):
quiet=self.quiet,
verify=self.verify,
timeout=self.timeout,
+ progress=self.progress,
sleep_max=self.sleep_max,
retry_max=self.retry_max,
full_stack=self.full_stack,
@@ -227,22 +307,68 @@ class Client(object):
))
def retrieve(self, name, request, target=None):
- result = self._api('%s/resources/%s' % (self.url, name), request)
+ result = self._api('%s/resources/%s' % (self.url, name), request, 'POST')
if target is not None:
result.download(target)
return result
- def identity(self):
- return self._api('%s/resources' % (self.url,), {})
+ def service(self, name, *args, **kwargs):
+ self.delete = False # Don't delete results
+ name = '/'.join(name.split('.'))
+ request = toJSON(dict(args=args, kwargs=kwargs))
+ result = self._api('%s/tasks/services/%s/clientid-%s' % (self.url, name, uuid.uuid4().hex), request, 'PUT')
+ return result
+
+ def workflow(self, code, *args, **kwargs):
+ params = dict(code=code,
+ args=args,
+ kwargs=kwargs,
+ workflow_name='application')
+ return self.service("tool.toolbox.orchestrator.run_workflow", params)
+
+ def status(self, context=None):
+ url = '%s/status.json' % (self.url,)
+ r = requests.get(url, verify=self.verify)
+ r.raise_for_status()
+ return r.json()
+
+ def _status(self, url):
+ try:
+ status = self.status(url)
+
+ info = status.get('info', [])
+ if not isinstance(info, list):
+ info = [info]
+ for i in info:
+ self.info("%s", i)
+
+ warning = status.get('warning', [])
+ if not isinstance(warning, list):
+ warning = [warning]
+ for w in warning:
+ self.warning("%s", w)
- def _api(self, url, request):
+ except Exception:
+ pass
+
+ def _api(self, url, request, method):
+
+ self._status(url)
session = self.session
self.info("Sending request to %s", url)
- self.debug("POST %s %s", url, json.dumps(request))
+ self.debug("%s %s %s", method, url, json.dumps(request))
- result = self.robust(session.post)(url, json=request, verify=self.verify)
+ if method == 'PUT':
+ action = session.put
+ else:
+ action = session.post
+
+ result = self.robust(action)(url,
+ json=request,
+ verify=self.verify,
+ timeout=self.timeout)
reply = None
try:
@@ -272,7 +398,6 @@ class Client(object):
raise
sleep = 1
- start = time.time()
while True:
@@ -284,14 +409,15 @@ class Client(object):
if reply['state'] == 'completed':
self.debug("Done")
+
+ if 'result' in reply:
+ return reply['result']
+
return Result(self, reply)
if reply['state'] in ('queued', 'running'):
rid = reply['request_id']
- if self.timeout and (time.time() - start > self.timeout):
- raise Exception('TIMEOUT')
-
self.debug("Request ID is %s, sleep %s", rid, sleep)
time.sleep(sleep)
sleep *= 1.5
@@ -301,7 +427,9 @@ class Client(object):
task_url = '%s/tasks/%s' % (self.url, rid)
self.debug("GET %s", task_url)
- result = self.robust(session.get)(task_url, verify=self.verify)
+ result = self.robust(session.get)(task_url,
+ verify=self.verify,
+ timeout=self.timeout)
result.raise_for_status()
reply = result.json()
continue
@@ -341,6 +469,52 @@ class Client(object):
else:
self.logger.debug(*args, **kwargs)
+ def _download(self, results, targets=None):
+
+ if isinstance(results, Result):
+ if targets:
+ path = targets.pop(0)
+ else:
+ path = None
+ return results.download(path)
+
+ if isinstance(results, (list, tuple)):
+ return [self._download(x, targets) for x in results]
+
+ if isinstance(results, dict):
+
+ if 'location' in results and 'contentLength' in results:
+ reply = dict(location=results['location'],
+ content_length=results['contentLength'],
+ content_type=results.get('contentType'))
+
+ if targets:
+ path = targets.pop(0)
+ else:
+ path = None
+
+ return Result(self, reply).download(path)
+
+ r = {}
+ for k, v in results.items():
+ r[v] = self._download(v, targets)
+ return r
+
+ return results
+
+ def download(self, results, targets=None):
+ if targets:
+ # Make a copy
+ targets = [t for t in targets]
+ return self._download(results, targets)
+
+ def remote(self, url):
+ r = requests.head(url)
+ reply = dict(location=url,
+ content_length=r.headers['Content-Length'],
+ content_type=r.headers['Content-Type'])
+ return Result(self, reply)
+
def robust(self, call):
def retriable(code, reason):
@@ -368,6 +542,10 @@ class Client(object):
if r is not None:
if not retriable(r.status_code, r.reason):
return r
+ try:
+ self.warning(r.json()['reason'])
+ except Exception:
+ pass
self.warning("Recovering from HTTP error [%s %s], attemps %s of %s",
r.status_code, r.reason, tries, self.retry_max)
@@ -375,5 +553,6 @@ class Client(object):
self.warning("Retrying in %s seconds", self.sleep_max)
time.sleep(self.sleep_max)
+ self.info("Retrying now...")
return wrapped
diff --git a/example-era5.py b/example-era5.py
index c0ad58b..979c352 100755
--- a/example-era5.py
+++ b/example-era5.py
@@ -10,7 +10,7 @@
import cdsapi
-c = cdsapi.Client(debug=True)
+c = cdsapi.Client()
r = c.retrieve(
"reanalysis-era5-single-levels",
diff --git a/setup.py b/setup.py
index 041cdcc..d1d74bd 100644
--- a/setup.py
+++ b/setup.py
@@ -30,7 +30,7 @@ def read(fname):
return io.open(file_path, encoding='utf-8').read()
-version = '0.1.4'
+version = '0.2.5'
setuptools.setup(
@@ -46,6 +46,7 @@ setuptools.setup(
include_package_data=True,
install_requires=[
'requests>=2.5.0',
+ 'tqdm',
],
zip_safe=True,
classifiers=[