#!/usr/bin/env python
u"""
utilities.py
Written by Tyler Sutterley (10/2025)
Download and management utilities for syncing time and auxiliary files
PYTHON DEPENDENCIES:
lxml: processing XML and HTML in Python
https://pypi.python.org/pypi/lxml
UPDATE HISTORY:
Updated 10/2025: switch from_gfz to https as ftp server is being retired
Updated 11/2024: simplify unique file name function
add function to scrape GSFC website for GRACE mascon urls
Updated 10/2024: update CMR search utility to replace deprecated scrolling
https://cmr.earthdata.nasa.gov/search/site/docs/search/api.html
Updated 08/2024: generalize hash function to use any available algorithm
Updated 06/2024: added wrapper to importlib for optional dependencies
make default case for an import exception be a class
Updated 04/2024: added argument for products in CMR shortname query
Updated 11/2023: updated ssl context to fix deprecation error
Updated 10/2023: add capability to download CSR LRI solutions
Updated 06/2023: add functions to retrieve and revoke Earthdata tokens
add TN11e.txt file to list of CSR SLR downloads
Updated 05/2023: add reify decorator for evaluation of properties
use pathlib to define and operate on paths
Updated 04/2023: use release-03 GFZ GravIS SLR and geocenter files
Updated 03/2023: place boto3 import within try/except statement
Updated 01/2023: add default ssl context attribute with protocol
Updated 12/2022: add variables for NASA DAAC and s3 providers
add functions for managing and maintaining git repositories
Updated 11/2022: add CMR queries for collection metadata
exposed GSFC SLR url for weekly 5x5 harmonics as an option
Updated 08/2022: add regular expression function for finding files
Updated 07/2022: add s3 endpoints and buckets for Earthdata Cumulus
Updated 05/2022: function for extracting bucket name from presigned url
Updated 04/2022: updated docstrings to numpy documentation format
update CMR queries to prepare for version 1 of RL06
Updated 03/2022: add NASA Common Metadata Repository (CMR) queries
added attempt login function to recursively check credentials
Updated 11/2021: add CSR satellite laser ranging oblateness file
Updated 10/2021: using python logging for handling verbose output
Updated 09/2021: added generic list from Apache http server
Updated 07/2021: added unique filename opener for log files
Updated 06/2021: add parser for converting file lines to arguments
Updated 05/2021: download GFZ satellite laser ranging and GravIS files
Updated 04/2021: download CSR SLR figure axis and azimuthal dependence files
Updated 03/2021: added sha1 option for retrieving file hashes
Updated 12/2020: added ICGEM list for static models
added figshare geocenter download for Sutterley and Velicogna files
added download for satellite laser ranging (SLR) files from UTCSR
added file object keyword for downloads if verbose printing to file
renamed podaac_list() and from_podaac() to drive_list() and from_drive()
added username and password to ftp functions. added ftp connection check
Updated 09/2020: copy from http and https to bytesIO object in chunks
use netrc credentials if not entered from PO.DAAC functions
generalize build opener function for different Earthdata instances
Updated 08/2020: add PO.DAAC Drive opener, login and download functions
Written 08/2020
"""
from __future__ import print_function, division, annotations
import sys
import os
import re
import io
import ssl
import json
import netrc
import ftplib
import shutil
import base64
import socket
import getpass
import inspect
import hashlib
import logging
import pathlib
import builtins
import dateutil
import warnings
import importlib
import posixpath
import lxml.etree
import subprocess
import calendar,time
if sys.version_info[0] == 2:
from cookielib import CookieJar
from urllib import urlencode
import urllib2
else:
from http.cookiejar import CookieJar
from urllib.parse import urlencode
import urllib.request as urllib2
# PURPOSE: get absolute path within a package from a relative path
[docs]
def get_data_path(relpath: list | str | pathlib.Path):
    """
    Get the absolute path within a package from a relative path

    Parameters
    ----------
    relpath: list, str or pathlib.Path
        relative path

    Returns
    -------
    abspath: pathlib.Path or NoneType
        path joined onto the directory containing this file.
        ``None`` is returned for unsupported input types
    """
    # directory containing the current file
    filename = inspect.getframeinfo(inspect.currentframe()).filename
    filepath = pathlib.Path(filename).absolute().parent
    if isinstance(relpath, (list, tuple)):
        # use *splat operator to extract the path components
        return filepath.joinpath(*relpath)
    elif isinstance(relpath, (str, pathlib.PurePath)):
        # pathlib objects are advertised in the signature and are
        # joined the same way as strings
        return filepath.joinpath(relpath)
    # unsupported type: keep the historical behavior of returning None
    return None
def import_dependency(
        name: str,
        extra: str = "",
        raise_exception: bool = False
    ):
    """
    Import an optional dependency

    Adapted from ``pandas.compat._optional::import_optional_dependency``

    Parameters
    ----------
    name: str
        Module name
    extra: str, default ""
        Additional text to include in the ``ImportError`` message
    raise_exception: bool, default False
        Raise an ``ImportError`` if the module is not found

    Returns
    -------
    module: obj
        Imported module, or an empty placeholder class named ``module``
        if the import failed and ``raise_exception`` is False
    """
    # only string module names are accepted
    assert isinstance(name, str), \
        f"Invalid module name: '{name}'; must be a string"
    try:
        # successful imports are returned directly
        return importlib.import_module(name)
    except (ImportError, ModuleNotFoundError) as exc:
        message = f"Missing optional dependency '{name}'. {extra}"
        if raise_exception:
            raise ImportError(message) from exc
        logging.debug(message)
    # default case for a failed import is an empty class
    return type('module', (), {})
[docs]
class reify(object):
    """Descriptor decorator that evaluates the wrapped method once and
    stores the result on the instance under the method's name, so that
    later lookups find the stored value instead of calling the method"""

    def __init__(self, wrapped):
        # keep the wrapped callable and mirror its name and docstring
        self.wrapped = wrapped
        self.__name__, self.__doc__ = wrapped.__name__, wrapped.__doc__

    def __get__(self, inst, objtype=None):
        if inst is not None:
            # evaluate and store the value on the instance
            result = self.wrapped(inst)
            setattr(inst, self.wrapped.__name__, result)
            return result
        # accessed through the class: return the descriptor itself
        return self
# PURPOSE: get the hash value of a file
[docs]
def get_hash(
        local: str | io.IOBase | pathlib.Path,
        algorithm: str = 'md5'
    ):
    """
    Get the hash value from a local file or ``BytesIO`` object

    Parameters
    ----------
    local: obj, str or pathlib.Path
        BytesIO object or path to file
    algorithm: str, default 'md5'
        hashing algorithm for checksum validation

    Returns
    -------
    checksum: str
        hexadecimal digest of the contents.
        Empty string if the file does not exist or if ``local``
        is not a supported type

    Raises
    ------
    ValueError
        if ``algorithm`` is not in ``hashlib.algorithms_available``
    """
    # check if open file object or if local file exists
    if isinstance(local, io.IOBase):
        # generate checksum hash for a given type
        if algorithm not in hashlib.algorithms_available:
            raise ValueError(f'Invalid hashing algorithm: {algorithm}')
        return hashlib.new(algorithm, local.getvalue()).hexdigest()
    elif isinstance(local, (str, pathlib.Path)):
        # generate checksum hash for local file
        local = pathlib.Path(local).expanduser()
        # if file currently doesn't exist, return empty string
        if not local.exists():
            return ''
        # open the local_file in binary read mode
        with local.open(mode='rb') as local_buffer:
            if algorithm not in hashlib.algorithms_available:
                raise ValueError(f'Invalid hashing algorithm: {algorithm}')
            # update the hash in fixed-size chunks so that large files
            # are never held in memory all at once
            checksum = hashlib.new(algorithm)
            for block in iter(lambda: local_buffer.read(65536), b''):
                checksum.update(block)
        return checksum.hexdigest()
    else:
        return ''
# PURPOSE: get the git hash value
[docs]
def get_git_revision_hash(
        refname: str = 'HEAD',
        short: bool = False
    ):
    """
    Get the ``git`` hash value for a particular reference

    Parameters
    ----------
    refname: str, default HEAD
        Symbolic reference name
    short: bool, default False
        Return the shortened hash value

    Returns
    -------
    revision: str
        stripped output of ``git rev-parse`` decoded as UTF-8
    """
    # the .git directory is expected one level above this file's directory
    this_file = inspect.getframeinfo(inspect.currentframe()).filename
    repository = pathlib.Path(this_file).absolute().parent.parent
    git_dir = repository.joinpath('.git')
    # assemble the rev-parse command with the optional short flag
    flags = ['--short'] if short else []
    command = ['git', f'--git-dir={git_dir}', 'rev-parse'] + flags + [refname]
    # run the command and decode the output
    with warnings.catch_warnings():
        raw = subprocess.check_output(command)
    return str(raw, encoding='utf8').strip()
# PURPOSE: get the current git status
[docs]
def get_git_status():
    """Get the status of a ``git`` repository as a boolean value

    Returns
    -------
    status: bool
        True if ``git status --porcelain`` printed any output
    """
    # the .git directory is expected one level above this file's directory
    this_file = inspect.getframeinfo(inspect.currentframe()).filename
    repository = pathlib.Path(this_file).absolute().parent.parent
    git_dir = repository.joinpath('.git')
    # porcelain output is empty when there is nothing to report
    command = ['git', f'--git-dir={git_dir}', 'status', '--porcelain']
    with warnings.catch_warnings():
        porcelain = subprocess.check_output(command)
    return bool(porcelain)
# PURPOSE: recursively split a url path
[docs]
def url_split(s: str):
    """
    Split a url path into a tuple of its components

    Parameters
    ----------
    s: str
        url string

    Returns
    -------
    parts: tuple
        path components, with the protocol kept attached to the host
    """
    schemes = ('http:', 'https:', 'ftp:', 's3:')
    # components are collected from the right and reversed at the end
    collected = []
    remainder = s
    while True:
        head, tail = posixpath.split(remainder)
        if head in schemes:
            # keep the protocol together with the host
            collected.append(remainder)
            break
        collected.append(tail)
        if head in ('', posixpath.sep):
            # reached the start of the path
            break
        remainder = head
    return tuple(reversed(collected))
# PURPOSE: convert file lines to arguments
[docs]
def convert_arg_line_to_args(arg_line):
    """
    Convert file lines to arguments

    Parameters
    ----------
    arg_line: str
        line string containing a single argument and/or comments

    Yields
    ------
    arg: str
        whitespace-separated argument with comments removed
    """
    # strip everything from the comment character to the end of the line
    uncommented = re.sub(r'\#(.*?)$', r'', arg_line)
    # yield each remaining non-empty argument
    yield from (arg for arg in uncommented.split() if arg.strip())
# PURPOSE: returns the Unix timestamp value for a formatted date string
[docs]
def get_unix_time(
        time_string: str,
        format: str = '%Y-%m-%d %H:%M:%S'
    ):
    """
    Get the Unix timestamp value for a formatted date string

    Parameters
    ----------
    time_string: str
        formatted time string to parse
    format: str, default '%Y-%m-%d %H:%M:%S'
        format for input time string

    Returns
    -------
    timestamp: int, float or NoneType
        seconds since the Unix epoch, or ``None`` if the string
        could not be parsed. Strings without timezone information
        are interpreted as UTC
    """
    try:
        parsed_time = time.strptime(time_string.rstrip(), format)
    except (AttributeError, TypeError, ValueError):
        # AttributeError covers inputs without rstrip (such as None)
        pass
    else:
        # struct_time is interpreted as UTC
        return calendar.timegm(parsed_time)
    # try parsing with dateutil
    # import the submodule explicitly: a bare ``import dateutil`` does not
    # bind ``dateutil.parser`` in every dateutil release
    import datetime
    import dateutil.parser
    try:
        parsed_datetime = dateutil.parser.parse(time_string.rstrip())
    except (AttributeError, TypeError, ValueError, OverflowError):
        return None
    # treat naive datetimes as UTC to agree with the strptime branch
    # (timestamp() would otherwise assume the local timezone)
    if parsed_datetime.tzinfo is None:
        parsed_datetime = parsed_datetime.replace(
            tzinfo=datetime.timezone.utc)
    return parsed_datetime.timestamp()
# PURPOSE: output a time string in isoformat
# PURPOSE: rounds a number to an even number less than or equal to original
[docs]
def even(value: float):
    """
    Rounds a number to an even number less than or equal to original

    Parameters
    ----------
    value: float
        number to be rounded

    Returns
    -------
    rounded: int
        largest even integer not greater than ``value``
    """
    # number of whole pairs contained in the value (floored)
    pairs = int(value // 2)
    return pairs * 2
# PURPOSE: rounds a number upward to its nearest integer
[docs]
def ceil(value: float):
    """
    Rounds a number upward to its nearest integer

    Parameters
    ----------
    value: float
        number to be rounded upward

    Returns
    -------
    rounded: int
        smallest integer not less than ``value``
    """
    # flooring the negated value and negating again rounds upward
    floored_negative = (-value) // 1
    return -int(floored_negative)
# PURPOSE: make a copy of a file with all system information
[docs]
def copy(
        source: str | pathlib.Path,
        destination: str | pathlib.Path,
        move: bool = False,
        **kwargs
    ):
    """
    Copy or move a file with all system information

    Parameters
    ----------
    source: str or pathlib.Path
        source file
    destination: str or pathlib.Path
        copied destination file
    move: bool, default False
        remove the source file
    **kwargs: dict
        additional keyword arguments (accepted but not used)
    """
    # resolve both endpoints to absolute paths
    src, dst = (pathlib.Path(p).expanduser().absolute()
        for p in (source, destination))
    # log source and destination
    logging.info(f'{str(src)} -->\n\t{str(dst)}')
    # copy the contents and then the file metadata
    shutil.copyfile(src, dst)
    shutil.copystat(src, dst)
    if not move:
        return
    # moving: remove the original file
    src.unlink()
# PURPOSE: open a unique file adding a numerical instance if existing
[docs]
def create_unique_file(filename: str | pathlib.Path):
    """
    Open a unique file adding a numerical instance if existing

    Parameters
    ----------
    filename: str or pathlib.Path
        full path to output file

    Returns
    -------
    fd: obj
        file object opened in exclusive binary write mode

    Raises
    ------
    OSError
        if the file cannot be created for a reason other than already
        existing (missing parent directory, insufficient permissions)
    """
    # validate input filename
    filename = pathlib.Path(filename).expanduser().absolute()
    stem, suffix = filename.stem, filename.suffix
    # counter to add to the end of the filename if existing
    counter = 1
    while True:
        try:
            # open file descriptor only if the file doesn't exist
            return filename.open(mode='xb')
        except FileExistsError:
            # only an existing file warrants trying the next name:
            # any other OSError would fail for every candidate and
            # previously made this loop spin forever
            filename = filename.with_name(f'{stem}_{counter:d}{suffix}')
            counter += 1
# PURPOSE: check ftp connection
[docs]
def check_ftp_connection(
        HOST: str,
        username: str | None = None,
        password: str | None = None
    ):
    """
    Check internet connection with ftp host

    Parameters
    ----------
    HOST: str
        remote ftp host
    username: str or NoneType
        ftp username
    password: str or NoneType
        ftp password

    Returns
    -------
    connected: bool
        True if the login and a ``NOOP`` command succeeded

    Raises
    ------
    RuntimeError
        if the host cannot be reached or the login is rejected
    """
    # attempt to connect to ftp host
    try:
        f = ftplib.FTP(HOST)
        try:
            f.login(username, password)
            f.voidcmd("NOOP")
        finally:
            # release the connection whether or not the check succeeded
            f.close()
    except IOError as exc:
        raise RuntimeError('Check internet connection') from exc
    except ftplib.error_perm as exc:
        raise RuntimeError('Check login credentials') from exc
    else:
        return True
# PURPOSE: list a directory on a ftp host
[docs]
def ftp_list(
        HOST: str | list,
        username: str | None = None,
        password: str | None = None,
        timeout: int | None = None,
        basename: bool = False,
        pattern: str | None = None,
        sort: bool = False
    ):
    """
    List a directory on a ftp host

    Parameters
    ----------
    HOST: str or list
        remote ftp host path split as list
    username: str or NoneType
        ftp username
    password: str or NoneType
        ftp password
    timeout: int or NoneType, default None
        timeout in seconds for blocking operations
    basename: bool, default False
        return the file or directory basename instead of the full path
    pattern: str or NoneType, default None
        regular expression pattern for reducing list
    sort: bool, default False
        sort output list

    Returns
    -------
    output: list
        items in a directory
    mtimes: list
        last modification times for items in the directory
        (``None`` for items that reject the ``MDTM`` command)

    Raises
    ------
    RuntimeError
        if a connection to the host cannot be made
    """
    # verify inputs for remote ftp host
    if isinstance(HOST, str):
        HOST = url_split(HOST)
    # try to connect to ftp host
    try:
        ftp = ftplib.FTP(HOST[0], timeout=timeout)
    except (socket.gaierror, IOError) as exc:
        raise RuntimeError(f'Unable to connect to {HOST[0]}') from exc
    # always close the connection, even if the listing fails
    try:
        ftp.login(username, password)
        # list remote path (the login directory if only a host was given)
        remote = HOST[1:]
        output = ftp.nlst(posixpath.join(*remote)) if remote else ftp.nlst()
        # get last modified date of ftp files and convert into unix time
        mtimes = [None]*len(output)
        # iterate over each file in the list and get the modification time
        for i, f in enumerate(output):
            try:
                # try sending modification time command
                mdtm = ftp.sendcmd(f'MDTM {f}')
            except ftplib.error_perm:
                # directories will return with an error
                pass
            else:
                # convert the modification time into unix time
                mtimes[i] = get_unix_time(mdtm[4:], format="%Y%m%d%H%M%S")
    finally:
        # close the ftp connection
        ftp.close()
    # reduce to basenames
    if basename:
        output = [posixpath.basename(f) for f in output]
    # keep each item paired with its modification time while reducing
    items = list(zip(output, mtimes))
    # reduce using regular expression pattern
    if pattern:
        items = [item for item in items if re.search(pattern, item[0])]
    # sort the list by item name
    if sort:
        items = sorted(items, key=lambda item: item[0])
    # return the list of items and last modified times
    output = [f for f, _ in items]
    mtimes = [t for _, t in items]
    return (output, mtimes)
# PURPOSE: download a file from a ftp host
[docs]
def from_ftp(
        HOST: str | list,
        username: str | None = None,
        password: str | None = None,
        timeout: int | None = None,
        local: str | pathlib.Path | None = None,
        hash: str = '',
        chunk: int = 8192,
        verbose: bool = False,
        fid=sys.stdout,
        mode: int = 0o775
    ):
    """
    Download a file from a ftp host

    Parameters
    ----------
    HOST: str or list
        remote ftp host path
    username: str or NoneType
        ftp username
    password: str or NoneType
        ftp password
    timeout: int or NoneType, default None
        timeout in seconds for blocking operations
    local: str, pathlib.Path or NoneType, default None
        path to local file
    hash: str, default ''
        MD5 hash of local file
    chunk: int, default 8192
        chunk size for transfer encoding
    verbose: bool, default False
        print file transfer information
    fid: obj, default sys.stdout
        open file object to print if verbose
    mode: int, default 0o775
        permissions mode of output local file

    Returns
    -------
    remote_buffer: obj
        BytesIO representation of file

    Raises
    ------
    RuntimeError
        if a connection to the host cannot be made
    """
    # create logger
    loglevel = logging.INFO if verbose else logging.CRITICAL
    logging.basicConfig(stream=fid, level=loglevel)
    # verify inputs for remote ftp host
    if isinstance(HOST, str):
        HOST = url_split(HOST)
    # try to connect to ftp host
    try:
        ftp = ftplib.FTP(HOST[0], timeout=timeout)
    except (socket.gaierror, IOError) as exc:
        raise RuntimeError(f'Unable to connect to {HOST[0]}') from exc
    # always close the connection, even if the transfer fails
    try:
        ftp.login(username, password)
        # remote path
        ftp_remote_path = posixpath.join(*HOST[1:])
        # copy remote file contents to bytesIO object
        remote_buffer = io.BytesIO()
        ftp.retrbinary(f'RETR {ftp_remote_path}',
            remote_buffer.write, blocksize=chunk)
        remote_buffer.seek(0)
        # save file basename with bytesIO object
        remote_buffer.filename = HOST[-1]
        # generate checksum hash for remote file
        remote_hash = hashlib.md5(remote_buffer.getvalue()).hexdigest()
        # get last modified date of remote file and convert into unix time
        mdtm = ftp.sendcmd(f'MDTM {ftp_remote_path}')
        remote_mtime = get_unix_time(mdtm[4:], format="%Y%m%d%H%M%S")
    finally:
        # close the ftp connection
        ftp.close()
    # compare checksums
    if local and (hash != remote_hash):
        # convert to absolute path
        local = pathlib.Path(local).expanduser().absolute()
        # create directory if non-existent
        local.parent.mkdir(mode=mode, parents=True, exist_ok=True)
        # print file information
        args = (posixpath.join(*HOST), str(local))
        logging.info('{0} -->\n\t{1}'.format(*args))
        # store bytes to file using chunked transfer encoding
        remote_buffer.seek(0)
        with local.open(mode='wb') as f:
            shutil.copyfileobj(remote_buffer, f, chunk)
        # change the permissions mode
        local.chmod(mode)
        # keep remote modification time of file and local access time
        # (skipped if the server reply could not be parsed)
        if remote_mtime is not None:
            os.utime(local, (local.stat().st_atime, remote_mtime))
    # return the bytesIO object
    remote_buffer.seek(0)
    return remote_buffer
[docs]
def _create_default_ssl_context() -> ssl.SSLContext:
    """Creates the default SSL context

    Returns
    -------
    context: ssl.SSLContext
        client-side TLS context with the module's default options,
        compression disabled and the system CA certificates loaded
    """
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    _set_ssl_context_options(context)
    context.options |= ssl.OP_NO_COMPRESSION
    # PROTOCOL_TLS_CLIENT requires certificate verification but a bare
    # SSLContext starts with no trusted certificates: load the system
    # defaults so that verified connections can succeed
    context.load_default_certs()
    return context
[docs]
def _create_ssl_context_no_verify() -> ssl.SSLContext:
    """Creates an SSL context for unverified connections

    Returns
    -------
    unverified: ssl.SSLContext
        default context with hostname checking and certificate
        verification both switched off
    """
    unverified = _create_default_ssl_context()
    # hostname checking has to be disabled before verify_mode
    # can be set to CERT_NONE
    unverified.check_hostname = False
    unverified.verify_mode = ssl.CERT_NONE
    return unverified
[docs]
def _set_ssl_context_options(context: ssl.SSLContext) -> None:
    """Sets the default options for the SSL context

    Parameters
    ----------
    context: ssl.SSLContext
        context to modify in place so that protocol versions
        older than TLS 1.2 are not used
    """
    has_minimum_version = (sys.version_info >= (3, 10) or
        ssl.OPENSSL_VERSION_INFO >= (1, 1, 0, 7))
    if not has_minimum_version:
        # older interpreters: disable each legacy protocol explicitly
        legacy_protocols = (ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3 |
            ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1)
        context.options |= legacy_protocols
        return
    context.minimum_version = ssl.TLSVersion.TLSv1_2
# default ssl context: used as the default ``context`` argument of the
# http functions in this module
# NOTE: built with _create_ssl_context_no_verify, so hostname checking and
# certificate verification are disabled unless a caller passes its own context
_default_ssl_context = _create_ssl_context_no_verify()
# PURPOSE: check internet connection
[docs]
def check_connection(
        HOST: str,
        context: ssl.SSLContext = _default_ssl_context,
    ):
    """
    Check internet connection with http host

    Parameters
    ----------
    HOST: str
        remote http host
    context: obj, default gravity_toolkit.utilities._default_ssl_context
        SSL context for ``urllib`` opener object

    Returns
    -------
    connected: bool
        True if the host could be opened

    Raises
    ------
    RuntimeError
        if the host cannot be reached
    """
    # attempt to connect to http host
    try:
        response = urllib2.urlopen(HOST, timeout=20, context=context)
    except urllib2.URLError as exc:
        raise RuntimeError('Check internet connection') from exc
    else:
        # only reachability matters: release the connection right away
        response.close()
        return True
# PURPOSE: list a directory on an Apache http Server
[docs]
def http_list(
        HOST: str | list,
        timeout: int | None = None,
        context: ssl.SSLContext = _default_ssl_context,
        parser = lxml.etree.HTMLParser(),
        format: str = '%Y-%m-%d %H:%M',
        pattern: str = '',
        sort: bool = False
    ):
    """
    List a directory on an Apache http Server

    Parameters
    ----------
    HOST: str or list
        remote http host path
    timeout: int or NoneType, default None
        timeout in seconds for blocking operations
    context: obj, default gravity_toolkit.utilities._default_ssl_context
        SSL context for ``urllib`` opener object
    parser: obj, default lxml.etree.HTMLParser()
        HTML parser for ``lxml``
    format: str, default '%Y-%m-%d %H:%M'
        format for input time string
    pattern: str, default ''
        regular expression pattern for reducing list
    sort: bool, default False
        sort output list

    Returns
    -------
    colnames: list
        column names in a directory
    collastmod: list
        last modification times for items in the directory

    Raises
    ------
    Exception
        if the request to the host fails
    """
    # verify inputs for remote http host
    if isinstance(HOST, str):
        HOST = url_split(HOST)
    # try listing from http
    try:
        # Create and submit request.
        request = urllib2.Request(posixpath.join(*HOST))
        response = urllib2.urlopen(request, timeout=timeout, context=context)
    except (urllib2.HTTPError, urllib2.URLError) as exc:
        msg = 'List error from {0}'.format(posixpath.join(*HOST))
        raise Exception(msg) from exc
    # read and parse request for files (column names and modified times)
    # closing the response once the document has been parsed
    with response:
        tree = lxml.etree.parse(response, parser)
    colnames = tree.xpath('//tr/td[not(@*)]//a/@href')
    # get the Unix timestamp value for a modification time
    collastmod = [get_unix_time(i, format=format)
        for i in tree.xpath('//tr/td[@align="right"][1]/text()')]
    # reduce using regular expression pattern
    if pattern:
        keep = [i for i, f in enumerate(colnames) if re.search(pattern, f)]
        # reduce list of column names and last modified times
        colnames = [colnames[indice] for indice in keep]
        collastmod = [collastmod[indice] for indice in keep]
    # sort the list
    if sort:
        order = [i for i, _ in sorted(enumerate(colnames),
            key=lambda item: item[1])]
        # sort list of column names and last modified times
        colnames = [colnames[indice] for indice in order]
        collastmod = [collastmod[indice] for indice in order]
    # return the list of column names and last modified times
    return (colnames, collastmod)
# PURPOSE: download a file from a http host
[docs]
def from_http(
        HOST: str | list,
        timeout: int | None = None,
        context: ssl.SSLContext = _default_ssl_context,
        local: str | pathlib.Path | None = None,
        hash: str = '',
        chunk: int = 16384,
        verbose: bool = False,
        fid = sys.stdout,
        mode: int = 0o775
    ):
    """
    Download a file from a http host

    Parameters
    ----------
    HOST: str or list
        remote http host path split as list
    timeout: int or NoneType, default None
        timeout in seconds for blocking operations
    context: obj, default gravity_toolkit.utilities._default_ssl_context
        SSL context for ``urllib`` opener object
    local: str, pathlib.Path or NoneType, default None
        path to local file
    hash: str, default ''
        MD5 hash of local file
    chunk: int, default 16384
        chunk size for transfer encoding
    verbose: bool, default False
        print file transfer information
    fid: obj, default sys.stdout
        open file object to print if verbose
    mode: int, default 0o775
        permissions mode of output local file

    Returns
    -------
    remote_buffer: obj
        BytesIO representation of file

    Raises
    ------
    Exception
        if the request to the host fails
    """
    # create logger
    loglevel = logging.INFO if verbose else logging.CRITICAL
    logging.basicConfig(stream=fid, level=loglevel)
    # verify inputs for remote http host
    if isinstance(HOST, str):
        HOST = url_split(HOST)
    # try downloading from http
    try:
        # Create and submit request.
        request = urllib2.Request(posixpath.join(*HOST))
        response = urllib2.urlopen(request, timeout=timeout, context=context)
    except Exception as exc:
        # catch Exception rather than everything so that keyboard
        # interrupts and interpreter exits are not converted
        msg = 'Download error from {0}'.format(posixpath.join(*HOST))
        raise Exception(msg) from exc
    # copy remote file contents to bytesIO object
    # closing the response once the transfer is complete
    remote_buffer = io.BytesIO()
    with response:
        shutil.copyfileobj(response, remote_buffer, chunk)
    remote_buffer.seek(0)
    # save file basename with bytesIO object
    remote_buffer.filename = HOST[-1]
    # generate checksum hash for remote file
    remote_hash = hashlib.md5(remote_buffer.getvalue()).hexdigest()
    # compare checksums
    if local and (hash != remote_hash):
        # convert to absolute path
        local = pathlib.Path(local).expanduser().absolute()
        # create directory if non-existent
        local.parent.mkdir(mode=mode, parents=True, exist_ok=True)
        # print file information
        args = (posixpath.join(*HOST), str(local))
        logging.info('{0} -->\n\t{1}'.format(*args))
        # store bytes to file using chunked transfer encoding
        remote_buffer.seek(0)
        with local.open(mode='wb') as f:
            shutil.copyfileobj(remote_buffer, f, chunk)
        # change the permissions mode
        local.chmod(mode)
    # return the bytesIO object
    remote_buffer.seek(0)
    return remote_buffer
# PURPOSE: load a JSON response from a http host
[docs]
def from_json(
        HOST: str | list,
        timeout: int | None = None,
        context: ssl.SSLContext = _default_ssl_context
    ) -> dict:
    """
    Load a JSON response from a http host

    Parameters
    ----------
    HOST: str or list
        remote http host path split as list
    timeout: int or NoneType, default None
        timeout in seconds for blocking operations
    context: obj, default gravity_toolkit.utilities._default_ssl_context
        SSL context for ``urllib`` opener object

    Returns
    -------
    data: dict
        decoded JSON response

    Raises
    ------
    RuntimeError
        if the host returns an HTTP error status
    Exception
        if the host cannot be reached
    """
    # verify inputs for remote http host
    if isinstance(HOST, str):
        HOST = url_split(HOST)
    # try loading JSON from http
    try:
        # Create and submit request for JSON response
        request = urllib2.Request(posixpath.join(*HOST))
        request.add_header('Accept', 'application/json')
        response = urllib2.urlopen(request, timeout=timeout, context=context)
    except urllib2.HTTPError as exc:
        logging.debug(exc.code)
        raise RuntimeError(exc.reason) from exc
    except urllib2.URLError as exc:
        logging.debug(exc.reason)
        msg = 'Load error from {0}'.format(posixpath.join(*HOST))
        raise Exception(msg) from exc
    # load JSON response, closing the connection afterwards
    with response:
        return json.loads(response.read())
# PURPOSE: attempt to build an opener with netrc
[docs]
def attempt_login(
        urs: str,
        context: ssl.SSLContext = _default_ssl_context,
        password_manager: bool = True,
        get_ca_certs: bool = False,
        redirect: bool = False,
        authorization_header: bool = True,
        **kwargs
    ):
    """
    Attempt to build a ``urllib`` opener for NASA Earthdata

    Parameters
    ----------
    urs: str
        Earthdata login URS 3 host
    context: obj, default gravity_toolkit.utilities._default_ssl_context
        SSL context for ``urllib`` opener object
    password_manager: bool, default True
        Create password manager context using default realm
    get_ca_certs: bool, default False
        Get list of loaded "certification authority" certificates
    redirect: bool, default False
        Create redirect handler object
    authorization_header: bool, default True
        Add base64 encoded authorization header to opener
    username: str, default from environmental variable
        NASA Earthdata username
    password: str, default from environmental variable
        NASA Earthdata password
    retries: int, default 5
        number of retry attempts
    netrc: str, default ~/.netrc
        path to .netrc file for authentication

    Returns
    -------
    opener: obj
        OpenerDirector instance

    Raises
    ------
    RuntimeError
        if no attempt produced valid credentials
    """
    # set default keyword arguments
    kwargs.setdefault('username', os.environ.get('EARTHDATA_USERNAME'))
    kwargs.setdefault('password', os.environ.get('EARTHDATA_PASSWORD'))
    kwargs.setdefault('retries', 5)
    kwargs.setdefault('netrc', pathlib.Path.home().joinpath('.netrc'))
    try:
        # verify permissions level of netrc file
        # only necessary on jupyterhub
        nc = pathlib.Path(kwargs['netrc']).expanduser().absolute()
        nc.chmod(mode=0o600)
        # try retrieving credentials from netrc
        username, _, password = netrc.netrc(nc).authenticators(urs)
    except Exception:
        # netrc is best-effort (missing file, unparsable file or no entry
        # for the host): fall back to keyword arguments, which default
        # to the environmental variables
        username, password = (kwargs['username'], kwargs['password'])
    # if username or password are not available
    if not username:
        username = builtins.input(f'Username for {urs}: ')
    if not password:
        prompt = f'Password for {username}@{urs}: '
        password = getpass.getpass(prompt=prompt)
    # host used for checking the credentials
    HOST = 'https://archive.podaac.earthdata.nasa.gov/s3credentials'
    # for each retry
    for retry in range(kwargs['retries']):
        # build an opener for urs with credentials
        opener = build_opener(username, password,
            context=context,
            password_manager=password_manager,
            get_ca_certs=get_ca_certs,
            redirect=redirect,
            authorization_header=authorization_header,
            urs=urs)
        # try logging in by check credentials
        try:
            check_credentials(HOST)
        except Exception:
            pass
        else:
            return opener
        # reattempt login only if another attempt remains, so that
        # credentials are never requested and then discarded
        if (retry + 1) != kwargs['retries']:
            username = builtins.input(f'Username for {urs}: ')
            # rebuild the prompt: it may not exist yet if the first
            # password was supplied, and the username may have changed
            prompt = f'Password for {username}@{urs}: '
            password = getpass.getpass(prompt=prompt)
    # reached end of available retries
    raise RuntimeError('End of Retries: Check NASA Earthdata credentials')
# PURPOSE: "login" to NASA Earthdata with supplied credentials
[docs]
def build_opener(
        username: str,
        password: str,
        context: ssl.SSLContext = _default_ssl_context,
        password_manager: bool = False,
        get_ca_certs: bool = False,
        redirect: bool = False,
        authorization_header: bool = True,
        urs: str = 'https://urs.earthdata.nasa.gov'
    ):
    """
    Build ``urllib`` opener for NASA Earthdata with supplied credentials

    The opener is also installed globally, so later calls to
    ``urllib2.urlopen`` go through it.

    Parameters
    ----------
    username: str
        NASA Earthdata username
    password: str
        NASA Earthdata password
    context: obj, default gravity_toolkit.utilities._default_ssl_context
        SSL context for ``urllib`` opener object
    password_manager: bool, default False
        Create password manager context using default realm
    get_ca_certs: bool, default False
        Get list of loaded "certification authority" certificates
    redirect: bool, default False
        Create redirect handler object
    authorization_header: bool, default True
        Add base64 encoded authorization header to opener
    urs: str, default 'https://urs.earthdata.nasa.gov'
        Earthdata login URS 3 host

    Returns
    -------
    opener: obj
        OpenerDirector instance
    """
    # https://docs.python.org/3/howto/urllib2.html#id5
    handlers = []
    if password_manager:
        # password manager holding the Earthdata Login credentials
        manager = urllib2.HTTPPasswordMgrWithDefaultRealm()
        manager.add_password(None, urs, username, password)
        handlers.append(urllib2.HTTPBasicAuthHandler(manager))
    # cookie jar for the session cookie given by the data server
    # (otherwise requests keep being sent back to Earthdata Login)
    handlers.append(urllib2.HTTPCookieProcessor(CookieJar()))
    # SSL context handler
    if get_ca_certs:
        context.get_ca_certs()
    handlers.append(urllib2.HTTPSHandler(context=context))
    # redirect handler
    if redirect:
        handlers.append(urllib2.HTTPRedirectHandler())
    # create "opener" (OpenerDirector instance)
    opener = urllib2.build_opener(*handlers)
    if authorization_header:
        # base64 encode username/password for the Authorization header
        credentials = f'{username}:{password}'.encode()
        encoded = base64.b64encode(credentials)
        opener.addheaders = [("Authorization", f"Basic {encoded.decode()}")]
    # all calls to urllib2.urlopen will now use this opener
    urllib2.install_opener(opener)
    return opener
# PURPOSE: generate a NASA Earthdata user token
[docs]
def get_token(
        HOST: str = 'https://urs.earthdata.nasa.gov/api/users/token',
        username: str | None = None,
        password: str | None = None,
        build: bool = True,
        context: ssl.SSLContext = _default_ssl_context,
        urs: str = 'urs.earthdata.nasa.gov',
    ):
    """
    Generate a NASA Earthdata User Token

    Parameters
    ----------
    HOST: str
        NASA Earthdata token API host
    username: str or NoneType, default None
        NASA Earthdata username
    password: str or NoneType, default None
        NASA Earthdata password
    build: bool, default True
        Build opener and check credentials
    context: obj, default gravity_toolkit.utilities._default_ssl_context
        SSL context for ``urllib`` opener object
    urs: str, default 'urs.earthdata.nasa.gov'
        NASA Earthdata URS 3 host

    Returns
    -------
    token: dict
        JSON response with NASA Earthdata User Token

    Raises
    ------
    RuntimeError
        if the API returns an HTTP error or the host cannot be reached
    """
    # attempt to build urllib2 opener and check credentials
    if build:
        attempt_login(urs,
            username=username,
            password=password,
            context=context,
            password_manager=False,
            get_ca_certs=False,
            redirect=False,
            authorization_header=True)
    # create post response with Earthdata token API
    try:
        request = urllib2.Request(HOST, method='POST')
        response = urllib2.urlopen(request)
    except urllib2.HTTPError as exc:
        logging.debug(exc.code)
        raise RuntimeError(exc.reason) from exc
    except urllib2.URLError as exc:
        logging.debug(exc.reason)
        raise RuntimeError('Check internet connection') from exc
    # read and return JSON response, closing the connection afterwards
    with response:
        return json.loads(response.read())
# PURPOSE: list the current NASA Earthdata user tokens
[docs]
def list_tokens(
        HOST: str = 'https://urs.earthdata.nasa.gov/api/users/tokens',
        username: str | None = None,
        password: str | None = None,
        build: bool = True,
        context: ssl.SSLContext = _default_ssl_context,
        urs: str = 'urs.earthdata.nasa.gov',
    ):
    """
    List the current associated NASA Earthdata User Tokens

    Parameters
    ----------
    HOST: str
        NASA Earthdata list token API host
    username: str or NoneType, default None
        NASA Earthdata username
    password: str or NoneType, default None
        NASA Earthdata password
    build: bool, default True
        Build opener and check credentials
    context: obj, default gravity_toolkit.utilities._default_ssl_context
        SSL context for ``urllib`` opener object
    urs: str, default 'urs.earthdata.nasa.gov'
        NASA Earthdata URS 3 host

    Returns
    -------
    tokens: list
        JSON response with NASA Earthdata User Tokens

    Raises
    ------
    RuntimeError
        If the list tokens API responds with an HTTP error or
        if the host cannot be reached
    """
    # attempt to build urllib2 opener and check credentials
    # (if not building: the request below is sent with whichever
    # opener is currently installed for urllib)
    if build:
        attempt_login(urs,
            username=username,
            password=password,
            context=context,
            password_manager=False,
            get_ca_certs=False,
            redirect=False,
            authorization_header=True)
    # create get response with Earthdata list tokens API
    try:
        request = urllib2.Request(HOST)
        response = urllib2.urlopen(request)
    except urllib2.HTTPError as exc:
        logging.debug(exc.code)
        raise RuntimeError(exc.reason) from exc
    except urllib2.URLError as exc:
        logging.debug(exc.reason)
        raise RuntimeError('Check internet connection') from exc
    # read and return JSON response
    # closing the connection once the payload has been read
    with response:
        return json.loads(response.read())
# PURPOSE: revoke a NASA Earthdata user token
[docs]
def revoke_token(
        token: str,
        HOST: str = 'https://urs.earthdata.nasa.gov/api/users/revoke_token',
        username: str | None = None,
        password: str | None = None,
        build: bool = True,
        context: ssl.SSLContext = _default_ssl_context,
        urs: str = 'urs.earthdata.nasa.gov',
    ):
    """
    Revoke a NASA Earthdata User Token

    Parameters
    ----------
    token: str
        NASA Earthdata token to be revoked
    HOST: str
        NASA Earthdata revoke token API host
    username: str or NoneType, default None
        NASA Earthdata username
    password: str or NoneType, default None
        NASA Earthdata password
    build: bool, default True
        Build opener and check credentials
    context: obj, default gravity_toolkit.utilities._default_ssl_context
        SSL context for ``urllib`` opener object
    urs: str, default 'urs.earthdata.nasa.gov'
        NASA Earthdata URS 3 host

    Raises
    ------
    RuntimeError
        If the revoke token API responds with an HTTP error or
        if the host cannot be reached
    """
    # attempt to build urllib2 opener and check credentials
    # (if not building: the request below is sent with whichever
    # opener is currently installed for urllib)
    if build:
        attempt_login(urs,
            username=username,
            password=password,
            context=context,
            password_manager=False,
            get_ca_certs=False,
            redirect=False,
            authorization_header=True)
    # full path for NASA Earthdata revoke token API
    url = f'{HOST}?token={token}'
    # create post response with Earthdata revoke tokens API
    try:
        request = urllib2.Request(url, method='POST')
        response = urllib2.urlopen(request)
    except urllib2.HTTPError as exc:
        logging.debug(exc.code)
        raise RuntimeError(exc.reason) from exc
    except urllib2.URLError as exc:
        logging.debug(exc.reason)
        raise RuntimeError('Check internet connection') from exc
    # the body of the response is not used: release the connection
    response.close()
    # verbose response
    logging.debug(f'Token Revoked: {token}')
# NASA on-prem DAAC providers
# keys: lowercase DAAC names (shared with the s3 tables below)
# values: provider names (presumably CMR provider ids -- TODO confirm)
_daac_providers = {
    'gesdisc': 'GES_DISC',
    'ghrcdaac': 'GHRC_DAAC',
    'lpdaac': 'LPDAAC_ECS',
    'nsidc': 'NSIDC_ECS',
    'ornldaac': 'ORNL_DAAC',
    'podaac': 'PODAAC',
}
# NASA Cumulus AWS providers
# same keys as the on-prem table with the cloud provider names
# ('POCLOUD' is the default provider of the cmr query function)
_s3_providers = {
    'gesdisc': 'GES_DISC',
    'ghrcdaac': 'GHRC_DAAC',
    'lpdaac': 'LPCLOUD',
    'nsidc': 'NSIDC_CPRD',
    'ornldaac': 'ORNL_CLOUD',
    'podaac': 'POCLOUD',
}
# NASA Cumulus AWS S3 credential endpoints
# full https urls of the s3credentials service for each DAAC
# the 'podaac' entry is the default HOST of s3_client and check_credentials
_s3_endpoints = {
    'gesdisc': 'https://data.gesdisc.earthdata.nasa.gov/s3credentials',
    'ghrcdaac': 'https://data.ghrc.earthdata.nasa.gov/s3credentials',
    'lpdaac': 'https://data.lpdaac.earthdatacloud.nasa.gov/s3credentials',
    'nsidc': 'https://data.nsidc.earthdatacloud.nasa.gov/s3credentials',
    'ornldaac': 'https://data.ornldaac.earthdata.nasa.gov/s3credentials',
    'podaac': 'https://archive.podaac.earthdata.nasa.gov/s3credentials'
}
# NASA Cumulus AWS S3 buckets
# bucket names for each DAAC
# NOTE: has an additional 'podaac-doc' key that is not in the other tables
_s3_buckets = {
    'gesdisc': 'gesdisc-cumulus-prod-protected',
    'ghrcdaac': 'ghrc-cumulus-dev',
    'lpdaac': 'lp-prod-protected',
    'nsidc': 'nsidc-cumulus-prod-protected',
    'ornldaac': 'ornl-cumulus-prod-protected',
    'podaac': 'podaac-ops-cumulus-protected',
    'podaac-doc': 'podaac-ops-cumulus-docs'
}
def s3_region():
    """
    Get the AWS region name of the default ``boto3`` session

    Returns
    -------
    region_name: str
        AWS region name
    """
    # boto3 is an optional dependency: import only when needed
    session = import_dependency('boto3').session.Session()
    return session.region_name
# PURPOSE: get AWS s3 client for PO.DAAC Cumulus
[docs]
def s3_client(
        HOST: str = _s3_endpoints['podaac'],
        timeout: int | None = None,
        region_name: str = 'us-west-2'
    ):
    """
    Get AWS s3 client for PO.DAAC Cumulus

    Parameters
    ----------
    HOST: str
        PO.DAAC or ECCO AWS S3 credential host
    timeout: int or NoneType, default None
        timeout in seconds for blocking operations
    region_name: str, default 'us-west-2'
        AWS region name

    Returns
    -------
    client: obj
        AWS s3 client for PO.DAAC Cumulus
    """
    # request temporary s3 credentials from the credential host
    # closing the connection once the JSON payload has been read
    request = urllib2.Request(HOST)
    with urllib2.urlopen(request, timeout=timeout) as response:
        cumulus = json.loads(response.read())
    # get AWS client object
    boto3 = import_dependency('boto3')
    client = boto3.client('s3',
        aws_access_key_id=cumulus['accessKeyId'],
        aws_secret_access_key=cumulus['secretAccessKey'],
        aws_session_token=cumulus['sessionToken'],
        region_name=region_name)
    # return the AWS client for region
    return client
# PURPOSE: get a s3 bucket name from a presigned url
[docs]
def s3_bucket(presigned_url: str) -> str:
    """
    Get a s3 bucket name from a presigned url

    Parameters
    ----------
    presigned_url: str
        s3 presigned url

    Returns
    -------
    bucket: str
        s3 bucket name
    """
    # first component of the split url contains the protocol and bucket
    host = url_split(presigned_url)
    # strip the s3 protocol from the bucket name
    # NOTE: flags must be passed by keyword as the fourth positional
    # argument of re.sub is the maximum replacement count
    bucket = re.sub(r's3:\/\/', r'', host[0], flags=re.IGNORECASE)
    return bucket
# PURPOSE: get a s3 bucket key from a presigned url
[docs]
def s3_key(presigned_url: str) -> str:
    """
    Get a s3 bucket key from a presigned url

    Parameters
    ----------
    presigned_url: str
        s3 presigned url

    Returns
    -------
    key: str
        s3 bucket key for object
    """
    # drop the leading (bucket) component of the split url
    # and rejoin the remaining components into the object key
    _, *components = url_split(presigned_url)
    return posixpath.join(*components)
# PURPOSE: check that entered NASA Earthdata credentials are valid
[docs]
def check_credentials(HOST: str = _s3_endpoints['podaac']):
    """
    Check that entered NASA Earthdata credentials are valid

    Parameters
    ----------
    HOST: str
        full url to protected credential website

    Returns
    -------
    valid: bool
        ``True`` if the protected url could be opened

    Raises
    ------
    RuntimeError
        If the host responds with an HTTP error (invalid credentials)
        or if the host cannot be reached
    """
    try:
        request = urllib2.Request(HOST)
        response = urllib2.urlopen(request, timeout=20)
    except urllib2.HTTPError as exc:
        raise RuntimeError('Check your NASA Earthdata credentials') from exc
    except urllib2.URLError as exc:
        raise RuntimeError('Check internet connection') from exc
    else:
        # only the success of the request matters: release the connection
        response.close()
        return True
# PURPOSE: list a directory on JPL PO.DAAC/ECCO Drive https server
[docs]
def drive_list(
        HOST: str | list,
        username: str | None = None,
        password: str | None = None,
        build: bool = True,
        timeout: int | None = None,
        urs: str = 'podaac-tools.jpl.nasa.gov',
        parser = lxml.etree.HTMLParser(),
        pattern: str = '',
        sort: bool = False
    ):
    """
    List a directory on
    `JPL PO.DAAC <https://podaac-tools.jpl.nasa.gov/drive>`_ or
    `ECCO Drive <https://ecco.jpl.nasa.gov/drive/>`_

    Parameters
    ----------
    HOST: str or list
        remote https host
    username: str or NoneType, default None
        NASA Earthdata username
    password: str or NoneType, default None
        JPL PO.DAAC Drive WebDAV password
    build: bool, default True
        Build opener and check WebDAV credentials
    timeout: int or NoneType, default None
        timeout in seconds for blocking operations
    urs: str, default 'podaac-tools.jpl.nasa.gov'
        JPL PO.DAAC or ECCO login URS 3 host
    parser: obj, default lxml.etree.HTMLParser()
        HTML parser for ``lxml``
    pattern: str, default ''
        regular expression pattern for reducing list
    sort: bool, default False
        sort output list

    Returns
    -------
    colnames: list
        column names in a directory
    collastmod: list
        last modification times for items in the directory

    Raises
    ------
    Exception
        If the remote directory cannot be listed
    """
    # use netrc credentials
    if build and not (username or password):
        username,_,password = netrc.netrc().authenticators(urs)
    # build urllib2 opener and check credentials
    if build:
        # build urllib2 opener with credentials
        build_opener(username, password)
        # check credentials
        check_credentials()
    # verify inputs for remote https host
    if isinstance(HOST, str):
        HOST = url_split(HOST)
    # full url of the remote directory
    url = posixpath.join(*HOST)
    # try listing from https
    try:
        # Create and submit request.
        request = urllib2.Request(url)
        # parse the listing and close the connection afterwards
        with urllib2.urlopen(request, timeout=timeout) as response:
            tree = lxml.etree.parse(response, parser)
    except (urllib2.HTTPError, urllib2.URLError) as exc:
        raise Exception('List error from {0}'.format(url)) from exc
    # read and parse request for files (column names and modified times)
    colnames = tree.xpath('//tr/td//a[@class="text-left"]/text()')
    # get the Unix timestamp value for a modification time
    collastmod = [get_unix_time(i) for i in tree.xpath('//tr/td[3]/text()')]
    # reduce using regular expression pattern
    if pattern:
        keep = [k for k,f in enumerate(colnames) if re.search(pattern,f)]
        # reduce list of column names and last modified times
        colnames = [colnames[k] for k in keep]
        collastmod = [collastmod[k] for k in keep]
    # sort the list
    if sort:
        # indices that order the column names alphabetically
        order = [k for k,_ in sorted(enumerate(colnames),
            key=lambda item: item[1])]
        # sort list of column names and last modified times
        colnames = [colnames[k] for k in order]
        collastmod = [collastmod[k] for k in order]
    # return the list of column names and last modified times
    return (colnames,collastmod)
# PURPOSE: download a file from a PO.DAAC/ECCO Drive https server
[docs]
def from_drive(
        HOST: str | list,
        username: str | None = None,
        password: str | None = None,
        build: bool = True,
        timeout: int | None = None,
        urs: str = 'podaac-tools.jpl.nasa.gov',
        local: str | pathlib.Path | None = None,
        hash: str = '',
        chunk: int = 16384,
        verbose: bool = False,
        fid = sys.stdout,
        mode: oct = 0o775
    ):
    """
    Download a file from a
    `JPL PO.DAAC <https://podaac-tools.jpl.nasa.gov/drive>`_ or
    `ECCO Drive <https://ecco.jpl.nasa.gov/drive/>`_ https server

    Parameters
    ----------
    HOST: str or list
        remote https host
    username: str or NoneType, default None
        NASA Earthdata username
    password: str or NoneType, default None
        JPL PO.DAAC Drive WebDAV password
    build: bool, default True
        Build opener and check WebDAV credentials
    timeout: int or NoneType, default None
        timeout in seconds for blocking operations
    urs: str, default 'podaac-tools.jpl.nasa.gov'
        JPL PO.DAAC or ECCO login URS 3 host
    local: str or NoneType, default None
        path to local file
    hash: str, default ''
        MD5 hash of local file
    chunk: int, default 16384
        chunk size for transfer encoding
    verbose: bool, default False
        print file transfer information
    fid: obj, default sys.stdout
        open file object to print if verbose
    mode: oct, default 0o775
        permissions mode of output local file

    Returns
    -------
    remote_buffer: obj
        BytesIO representation of file

    Raises
    ------
    Exception
        If the remote file cannot be downloaded
    """
    # create logger
    loglevel = logging.INFO if verbose else logging.CRITICAL
    logging.basicConfig(stream=fid, level=loglevel)
    # use netrc credentials
    if build and not (username or password):
        username,_,password = netrc.netrc().authenticators(urs)
    # build urllib2 opener and check credentials
    if build:
        # build urllib2 opener with credentials
        build_opener(username, password)
        # check credentials
        check_credentials()
    # verify inputs for remote https host
    if isinstance(HOST, str):
        HOST = url_split(HOST)
    # full url of the remote file
    url = posixpath.join(*HOST)
    # try downloading from https
    try:
        # Create and submit request.
        request = urllib2.Request(url)
        response = urllib2.urlopen(request, timeout=timeout)
    except (urllib2.HTTPError, urllib2.URLError) as exc:
        raise Exception('Download error from {0}'.format(url)) from exc
    # copy remote file contents to bytesIO object
    # closing the connection once the transfer is complete
    remote_buffer = io.BytesIO()
    with response:
        shutil.copyfileobj(response, remote_buffer, chunk)
    remote_buffer.seek(0)
    # save file basename with bytesIO object
    remote_buffer.filename = HOST[-1]
    # generate checksum hash for remote file
    remote_hash = hashlib.md5(remote_buffer.getvalue()).hexdigest()
    # compare checksums
    # only write to file if the remote differs from the local copy
    if local and (hash != remote_hash):
        # convert to absolute path
        local = pathlib.Path(local).expanduser().absolute()
        # create directory if non-existent
        local.parent.mkdir(mode=mode, parents=True, exist_ok=True)
        # print file information
        args = (url, str(local))
        logging.info('{0} -->\n\t{1}'.format(*args))
        # store bytes to file using chunked transfer encoding
        remote_buffer.seek(0)
        with local.open(mode='wb') as f:
            shutil.copyfileobj(remote_buffer, f, chunk)
        # change the permissions mode
        local.chmod(mode=mode)
    # return the bytesIO object
    remote_buffer.seek(0)
    return remote_buffer
# PURPOSE: retrieve shortnames for GRACE/GRACE-FO products
[docs]
def cmr_product_shortname(
        mission: str,
        center: str,
        release: str,
        level: str = 'L2',
        version: str = '0',
        product: list = ['GAA','GAB','GAC','GAD','GSM']
    ):
    """
    Create a list of product shortnames for NASA Common Metadata
    Repository (CMR) queries

    Parameters
    ----------
    mission: str
        GRACE (grace) or GRACE Follow-On (grace-fo)
    center: str
        GRACE/GRACE-FO processing center
    release: str
        GRACE/GRACE-FO data release
    level: str, default 'L2'
        GRACE/GRACE-FO product level

            - ``'L1A'``
            - ``'L1B'``
            - ``'L2'``
    version: str, default '0'
        GRACE/GRACE-FO Level-2 data version
    product: str or list, default ['GAA','GAB','GAC','GAD','GSM']
        GRACE/GRACE-FO Level-2 data products

        Only used for the GRACE Level-2 shortnames

    Returns
    -------
    cmr_shortnames: list
        shortnames for CMR queries

    Raises
    ------
    Exception
        If there is no shortname for the combination of
        mission, level, center and release
    """
    # build dictionary for GRACE/GRACE-FO shortnames
    cmr_shortname = {}
    cmr_shortname['grace'] = {}
    cmr_shortname['grace-fo'] = {}
    # format of GRACE/GRACE-FO shortnames
    grace_l1_format = 'GRACE_{0}_GRAV_{1}_{2}'
    grace_l2_format = 'GRACE_{0}_{1}_GRAV_{2}_{3}'
    gracefo_l1_format = 'GRACEFO_{0}_{1}_GRAV_{2}_{3}'
    gracefo_l2_format = 'GRACEFO_{0}_{1}_MONTHLY_{2}{3}'
    # dictionary entries for each product level
    cmr_shortname['grace']['L1B'] = dict(GFZ={},JPL={})
    cmr_shortname['grace']['L2'] = dict(CSR={},GFZ={},JPL={})
    cmr_shortname['grace-fo']['L1A'] = dict(JPL={})
    cmr_shortname['grace-fo']['L1B'] = dict(JPL={})
    cmr_shortname['grace-fo']['L2'] = dict(CSR={},GFZ={},JPL={})
    # dictionary entry for GRACE Level-1B dealiasing products
    # for each data release
    for rl in ['RL06']:
        shortname = grace_l1_format.format('AOD1B','GFZ',rl)
        cmr_shortname['grace']['L1B']['GFZ'][rl] = [shortname]
    # dictionary entries for GRACE Level-1B ranging data products
    # for each data release
    for rl in ['RL02','RL03']:
        shortname = grace_l1_format.format('L1B','JPL',rl)
        cmr_shortname['grace']['L1B']['JPL'][rl] = [shortname]
    # check that product is iterable
    # (the default list is never modified in place)
    products = [product] if isinstance(product, str) else product
    # dictionary entries for GRACE Level-2 products
    # for each data release
    for rl in ['RL06']:
        # create list of product shortnames for GRACE level-2 products
        # for each L2 data processing center
        for c in ['CSR','GFZ','JPL']:
            cmr_shortname['grace']['L2'][c][rl] = []
            # for each level-2 product
            for p in products:
                # skip atmospheric and oceanic dealiasing products for CSR
                if (c == 'CSR') and p in ('GAA', 'GAB'):
                    continue
                # shortname for center and product
                shortname = grace_l2_format.format(p,'L2',c,rl)
                cmr_shortname['grace']['L2'][c][rl].append(shortname)
    # dictionary entries for GRACE-FO Level-1 ranging data products
    # for each data release
    for rl in ['RL04']:
        for l in ['L1A','L1B']:
            shortname = gracefo_l1_format.format(l,'ASCII','JPL',rl)
            cmr_shortname['grace-fo'][l]['JPL'][rl] = [shortname]
    # dictionary entries for GRACE-FO Level-2 products
    # for each data release
    for rl in ['RL06']:
        # release number zero-padded to 3 digits (e.g. RL06 -> 006)
        rs = re.findall(r'\d+',rl).pop().zfill(3)
        for c in ['CSR','GFZ','JPL']:
            shortname = gracefo_l2_format.format('L2',c,rs,version)
            cmr_shortname['grace-fo']['L2'][c][rl] = [shortname]
    # try to retrieve the shortname for a given mission
    try:
        cmr_shortnames = cmr_shortname[mission][level][center][release]
    except (KeyError, TypeError) as exc:
        # keep the failed lookup as the cause of the exception
        raise Exception('NASA CMR shortname not found') from exc
    else:
        return cmr_shortnames
[docs]
def cmr_readable_granules(
        product: str,
        level: str = 'L2',
        solution: str = 'BA01',
        version: str = '0'
    ):
    """
    Build the wildcard pattern of readable granule names used in
    NASA Common Metadata Repository (CMR) queries

    Parameters
    ----------
    product: str
        GRACE/GRACE-FO data product
    level: str, default 'L2'
        GRACE/GRACE-FO product level

            - ``'L1A'``
            - ``'L1B'``
            - ``'L2'``
    solution: str, default 'BA01'
        monthly gravity field solution for Release-06

            - ``'BA01'``: unconstrained monthly gravity field solution to d/o 60
            - ``'BB01'``: unconstrained monthly gravity field solution to d/o 96
            - ``'BC01'``: computed monthly dealiasing solution to d/o 180
    version: str, default '0'
        GRACE/GRACE-FO Level-2 data version

    Returns
    -------
    pattern: str
        readable granule names pattern for CMR queries
    """
    # Level-1 products: dealiasing product or ranging data
    if level in ('L1A', 'L1B'):
        is_dealiasing = (level == 'L1B') and (product == 'AOD1B')
        return 'AOD1B_*' if is_dealiasing else 'grace*'
    # any other non Level-2 product: match all granules
    if (level != 'L2'):
        return '*'
    # Level-2 products: the requested solution only applies to the GSM
    # fields, all other products use the dealiasing solution
    field = solution if (product == 'GSM') else 'BC01'
    template = '{0}-2_???????-???????_????_?????_{1}_???{2}*'
    # return readable granules pattern
    return template.format(product, field, version)
# PURPOSE: filter the CMR json response for desired data files
[docs]
def cmr_filter_json(
        search_results: dict,
        endpoint: str = 'data'
    ):
    """
    Filter the NASA Common Metadata Repository (CMR) json
    response for desired data files

    Parameters
    ----------
    search_results: dict
        json response from CMR query
    endpoint: str, default 'data'
        url endpoint type

            - ``'data'``: PO.DAAC https archive
            - ``'s3'``: PO.DAAC Cumulus AWS S3 bucket

    Returns
    -------
    granule_names: list
        GRACE/GRACE-FO granule names
    granule_urls: list
        GRACE/GRACE-FO granule urls
    granule_mtimes: list
        GRACE/GRACE-FO granule modification times
    """
    # output list of granule ids, urls and modified times
    granule_names = []
    granule_urls = []
    granule_mtimes = []
    # check that there are urls for request
    # always returning three lists so that callers can unpack the output
    if ('feed' not in search_results) or ('entry' not in search_results['feed']):
        return (granule_names,granule_urls,granule_mtimes)
    # descriptor links for each endpoint
    rel = {}
    rel['data'] = "http://esipfed.org/ns/fedsearch/1.1/data#"
    rel['s3'] = "http://esipfed.org/ns/fedsearch/1.1/s3#"
    # iterate over references and get cmr location
    for entry in search_results['feed']['entry']:
        # first link in the entry for the requested endpoint type
        url = next((link['href'] for link in entry['links']
            if (link['rel'] == rel[endpoint])), None)
        # skip granules without a link for the endpoint so that
        # the three output lists stay aligned with each other
        if url is None:
            continue
        granule_names.append(entry['title'])
        granule_urls.append(url)
        granule_mtimes.append(get_unix_time(entry['updated'],
            format='%Y-%m-%dT%H:%M:%S.%f%z'))
    # return the list of urls, granule ids and modified times
    return (granule_names,granule_urls,granule_mtimes)
# PURPOSE: filter the CMR json response for desired metadata files
# PURPOSE: cmr queries for GRACE/GRACE-FO products
[docs]
def cmr(
        mission: str | None = None,
        center: str | None = None,
        release: str | None = None,
        level: str | None = 'L2',
        product: str | None = None,
        solution: str | None = 'BA01',
        version: str | None = '0',
        start_date: str | None = None,
        end_date: str | None = None,
        provider: str | None = 'POCLOUD',
        endpoint: str | None = 'data',
        context: ssl.SSLContext = _default_ssl_context,
        verbose: bool = False,
        fid = sys.stdout
    ):
    """
    Query the NASA Common Metadata Repository (CMR) for GRACE/GRACE-FO data

    Parameters
    ----------
    mission: str or NoneType, default None
        GRACE (``'grace'``) or GRACE Follow-On (``'grace-fo'``)
    center: str or NoneType, default None
        GRACE/GRACE-FO processing center
    release: str or NoneType, default None
        GRACE/GRACE-FO data release
    level: str or NoneType, default 'L2'
        GRACE/GRACE-FO product level
    product: str or NoneType, default None
        GRACE/GRACE-FO data product
    solution: str or NoneType, default 'BA01'
        monthly gravity field solution for Release-06
    version: str or NoneType, default '0'
        GRACE/GRACE-FO Level-2 data version
    start_date: str or NoneType, default None
        starting date for CMR product query
    end_date: str or NoneType, default None
        ending date for CMR product query
    provider: str or NoneType, default 'POCLOUD'
        CMR data provider

            - ``'PODAAC'``: PO.DAAC Drive
            - ``'POCLOUD'``: PO.DAAC Cumulus
    endpoint: str or NoneType, default 'data'
        url endpoint type

            - ``'data'``: PO.DAAC https archive
            - ``'s3'``: PO.DAAC Cumulus AWS S3 bucket
    context: obj, default gravity_toolkit.utilities._default_ssl_context
        SSL context for ``urllib`` opener object
    verbose: bool, default False
        print CMR query information
    fid: obj, default sys.stdout
        open file object to print if verbose

    Returns
    -------
    granule_names: list
        GRACE/GRACE-FO granule names
    granule_urls: list
        GRACE/GRACE-FO granule urls
    granule_mtimes: list
        GRACE/GRACE-FO granule modification times
    """
    # create logger
    loglevel = logging.INFO if verbose else logging.CRITICAL
    logging.basicConfig(stream=fid, level=loglevel)
    # build urllib2 opener with SSL context
    # https://docs.python.org/3/howto/urllib2.html#id5
    handler = []
    # Create cookie jar for storing cookies
    cookie_jar = CookieJar()
    handler.append(urllib2.HTTPCookieProcessor(cookie_jar))
    handler.append(urllib2.HTTPSHandler(context=context))
    # create "opener" (OpenerDirector instance)
    opener = urllib2.build_opener(*handler)
    # build CMR query
    cmr_query_type = 'granules'
    cmr_format = 'json'
    cmr_page_size = 2000
    CMR_HOST = ['https://cmr.earthdata.nasa.gov','search',
        f'{cmr_query_type}.{cmr_format}']
    # build list of CMR query parameters
    CMR_KEYS = []
    CMR_KEYS.append(f'?provider={provider}')
    CMR_KEYS.append('&sort_key[]=start_date')
    CMR_KEYS.append('&sort_key[]=producer_granule_id')
    CMR_KEYS.append(f'&page_size={cmr_page_size}')
    # dictionary of product shortnames
    short_names = cmr_product_shortname(mission, center, release,
        level=level, version=version)
    for short_name in short_names:
        CMR_KEYS.append(f'&short_name={short_name}')
    # append keys for start and end time
    # verify that start and end times are in ISO format
    start_date = isoformat(start_date) if start_date else ''
    end_date = isoformat(end_date) if end_date else ''
    CMR_KEYS.append(f'&temporal={start_date},{end_date}')
    # append keys for querying specific products
    CMR_KEYS.append("&options[readable_granule_name][pattern]=true")
    CMR_KEYS.append("&options[spatial][or]=true")
    readable_granule = cmr_readable_granules(product,
        level=level, solution=solution, version=version)
    CMR_KEYS.append(f"&readable_granule_name[]={readable_granule}")
    # full CMR query url
    cmr_query_url = "".join([posixpath.join(*CMR_HOST),*CMR_KEYS])
    logging.info(f'CMR request={cmr_query_url}')
    # output list of granule names and urls
    granule_names = []
    granule_urls = []
    granule_mtimes = []
    # page through the results using the search-after header
    cmr_search_after = None
    while True:
        req = urllib2.Request(cmr_query_url)
        # add CMR search after header
        if cmr_search_after:
            req.add_header('CMR-Search-After', cmr_search_after)
            logging.debug(f'CMR-Search-After: {cmr_search_after}')
        # submit request and close the connection once the page is read
        with opener.open(req) as response:
            # get search after index for next iteration
            headers = {k.lower():v for k,v in dict(response.info()).items()}
            cmr_search_after = headers.get('cmr-search-after')
            # read the CMR search as JSON
            search_page = json.loads(response.read().decode('utf8'))
        ids,urls,mtimes = cmr_filter_json(search_page, endpoint=endpoint)
        # extend lists before deciding whether to stop so that a final
        # page received without a search-after header is not discarded
        if urls:
            granule_names.extend(ids)
            granule_urls.extend(urls)
            granule_mtimes.extend(mtimes)
        # stop when the page is empty or there is no index for a next
        # page (repeating the request would restart from the first page)
        if not urls or cmr_search_after is None:
            break
    # return the list of granule ids, urls and modification times
    return (granule_names, granule_urls, granule_mtimes)
# PURPOSE: cmr queries for GRACE/GRACE-FO auxiliary data and documentation
# PURPOSE: create and compile regular expression operator to find GRACE files
[docs]
def compile_regex_pattern(
PROC: str,
DREL: str,
DSET: str,
mission: str | None = None,
solution: str | None = r'BA01',
version: str | None = r'\d+'
):
"""
Compile regular expressor operators for finding a specified
subset of GRACE/GRACE-FO Level-2 spherical harmonic files
Parameters
----------
PROC: str
GRACE/GRACE-FO data processing center
- ``'CNES'``: French Centre National D'Etudes Spatiales
- ``'CSR'``: University of Texas Center for Space Research
- ``'GFZ'``: German Research Centre for Geosciences (GeoForschungsZentrum)
- ``'JPL'``: Jet Propulsion Laboratory
DREL: str
GRACE/GRACE-FO data release
DSET: str
GRACE/GRACE-FO data product
- ``'GAA'``: non-tidal atmospheric correction
- ``'GAB'``: non-tidal oceanic correction
- ``'GAC'``: combined non-tidal atmospheric and oceanic correction
- ``'GAD'``: ocean bottom pressure product
- ``'GSM'``: corrected monthly static gravity field product
mission: str or NoneType, default None
GRACE/GRACE-FO mission shortname
- ``'GRAC'``: GRACE
- ``'GRFO'``: GRACE-FO
solution: str, default 'BA01'
monthly gravity field solution for Release-06
- ``'BA01'``: unconstrained monthly gravity field solution to d/o 60
- ``'BB01'``: unconstrained monthly gravity field solution to d/o 96
- ``'BC01'``: computed monthly dealiasing solution to d/o 180
version: str, default '0'
GRACE/GRACE-FO Level-2 data version
"""
# verify inputs
if mission and mission not in ('GRAC','GRFO'):
raise ValueError(f'Unknown mission {mission}')
if PROC not in ('CNES','CSR','GFZ','JPL'):
raise ValueError(f'Unknown processing center {PROC}')
if DSET not in ('GAA','GAB','GAC','GAD','GSM'):
raise ValueError(f'Unknown Level-2 product {DSET}')
if isinstance(version, int):
version = str(version).zfill(2)
# compile regular expression operator for inputs
if ((DSET == 'GSM') and (PROC == 'CSR') and (DREL in ('RL04','RL05'))):
# CSR GSM: only monthly degree 60 products
# not the longterm degree 180, degree 96 dataset or the
# special order 30 datasets for the high-resonance months
release, = re.findall(r'\d+', DREL)
args = (DSET, int(release))
pattern = r'{0}-2_\d+-\d+_\d+_UTCSR_0060_000{1:d}(\.gz)?$'
elif ((DSET == 'GSM') and (PROC == 'CSR') and (DREL == 'RL06')):
# CSR GSM RL06: monthly products for mission and solution
release, = re.findall(r'\d+', DREL)
args = (DSET, mission, solution, release.zfill(2), version.zfill(2))
pattern = r'{0}-2_\d+-\d+_{1}_UTCSR_{2}_{3}{4}(\.gz)?$'
elif ((DSET == 'GSM') and (PROC == 'CSR') and (DREL.endswith('LRI'))):
# CSR GSM LRI solutions: monthly products for mission and solution
release, version = re.findall(r'(\d+)\.(\d+)', DREL).pop()
args = (DSET, mission, r'EA01', release.zfill(2), version.zfill(2))
pattern = r'{0}-2_\d+-\d+_{1}_UTCSR_{2}_{3}{4}(\.gz)?$'
elif ((DSET == 'GSM') and (PROC == 'GFZ') and (DREL == 'RL04')):
# GFZ RL04: only unconstrained solutions (not GK2 products)
args = (DSET,)
pattern = r'{0}-2_\d+-\d+_\d+_EIGEN_G---_0004(\.gz)?$'
elif ((DSET == 'GSM') and (PROC == 'GFZ') and (DREL == 'RL05')):
# GFZ RL05: updated RL05a products which are less constrained to
# the background model. Allow regularized fields
args = (DSET, r'(G---|GK2-)')
pattern = r'{0}-2_\d+-\d+_\d+_EIGEN_{1}_005a(\.gz)?$'
elif ((DSET == 'GSM') and (PROC == 'GFZ') and (DREL == 'RL06')):
# GFZ GSM RL06: monthly products for mission and solution
release, = re.findall(r'\d+', DREL)
args = (DSET, mission, solution, release.zfill(2), version.zfill(2))
pattern = r'{0}-2_\d+-\d+_{1}_GFZOP_{2}_{3}{4}(\.gz)?$'
elif (PROC == 'JPL') and DREL in ('RL04','RL05'):
# JPL: RL04a and RL05a products (denoted by 0001)
release, = re.findall(r'\d+', DREL)
args = (DSET, int(release))
pattern = r'{0}-2_\d+-\d+_\d+_JPLEM_0001_000{1:d}(\.gz)?$'
elif ((DSET == 'GSM') and (PROC == 'JPL') and (DREL == 'RL06')):
# JPL GSM RL06: monthly products for mission and solution
release, = re.findall(r'\d+', DREL)
args = (DSET, mission, solution, release.zfill(2), version.zfill(2))
pattern = r'{0}-2_\d+-\d+_{1}_JPLEM_{2}_{3}{4}(\.gz)?$'
elif (PROC == 'CNES'):
# CNES: use products in standard format
args = (DSET,)
pattern = r'{0}-2_\d+-\d+_\d+_GRGS_([a-zA-Z0-9_\-]+)(\.txt)?(\.gz)?$'
elif mission is not None:
# dealiasing products with mission listed
args = (DSET, mission)
pattern = r'{0}-2_([a-zA-Z0-9_\-]+)_{1}_([a-zA-Z0-9_\-]+)(\.gz)?$'
else:
# dealiasing products: use products in standard format
args = (DSET,)
pattern = r'{0}-2_([a-zA-Z0-9_\-]+)(\.gz)?$'
# return the compiled regular expression operator
return re.compile(pattern.format(*args), re.VERBOSE)
# PURPOSE: download geocenter files from Sutterley and Velicogna (2019)
# https://doi.org/10.3390/rs11182108
# https://doi.org/10.6084/m9.figshare.7388540
[docs]
def from_figshare(
        directory: str | pathlib.Path,
        article: str = '7388540',
        timeout: int | None = None,
        context: ssl.SSLContext = _default_ssl_context,
        chunk: int | None = 16384,
        verbose: bool = False,
        fid = sys.stdout,
        pattern: str = r'(CSR|GFZ|JPL)_(RL\d+)_(.*?)_SLF_iter.txt$',
        mode: oct = 0o775
    ):
    """
    Download :cite:p:`Sutterley:2019bx` geocenter files from
    `figshare <https://doi.org/10.6084/m9.figshare.7388540>`_

    Parameters
    ----------
    directory: str
        download directory
    article: str
        figshare article number
    timeout: int or NoneType, default None
        timeout in seconds for blocking operations
    context: obj, default gravity_toolkit.utilities._default_ssl_context
        SSL context for ``urllib`` opener object
    chunk: int, default 16384
        chunk size for transfer encoding
    verbose: bool, default False
        print file transfer information
    fid: obj, default sys.stdout
        open file object to print if verbose
    pattern: str, default '(CSR|GFZ|JPL)_(RL\\\\d+)_(.*?)_SLF_iter.txt$'
        regular expression pattern for reducing list
    mode: oct, default 0o775
        permissions mode of output local file

    Raises
    ------
    Exception
        If the MD5 checksum of a downloaded file does not match
        the checksum supplied by figshare
    """
    # figshare host
    HOST=['https://api.figshare.com','v2','articles',article]
    # recursively create directory if non-existent
    # files are saved within a geocenter subdirectory
    directory = pathlib.Path(directory).expanduser().absolute()
    local_dir = directory.joinpath('geocenter')
    local_dir.mkdir(mode=mode, parents=True, exist_ok=True)
    # Create and submit request.
    # closing the connection once the article metadata is read
    request = urllib2.Request(posixpath.join(*HOST))
    with urllib2.urlopen(request, timeout=timeout, context=context) as response:
        resp = json.loads(response.read())
    # reduce list of geocenter files
    geocenter_files = [f for f in resp['files'] if re.match(pattern,f['name'])]
    for f in geocenter_files:
        # download geocenter file
        # passing the hash of any existing local copy for comparison
        local_file = local_dir.joinpath(f['name'])
        original_md5 = get_hash(local_file)
        from_http(f['download_url'],
            timeout=timeout,
            context=context,
            local=local_file,
            hash=original_md5,
            chunk=chunk,
            verbose=verbose,
            fid=fid,
            mode=mode)
        # verify MD5 checksums
        computed_md5 = get_hash(local_file)
        if (computed_md5 != f['supplied_md5']):
            raise Exception(f'Checksum mismatch: {f["download_url"]}')
# PURPOSE: send files to figshare using secure FTP uploader
[docs]
def to_figshare(
        files: list,
        username: str | None = None,
        password: str | None = None,
        directory: str | None | pathlib.Path = None,
        timeout: int | None = None,
        context: ssl.SSLContext = _default_ssl_context,
        get_ca_certs: bool = False,
        verbose: bool = False,
        chunk: int = 8192
    ):
    """
    Send files to figshare using secure `FTP uploader
    <https://help.figshare.com/article/upload-large-datasets-and-
    bulk-upload-using-the-ftp-uploader-desktop-uploader-or-api>`_

    Parameters
    ----------
    files: list
        files to upload
    username: str or NoneType, default None
        ftp username
    password: str or NoneType, default None
        ftp password
    directory: str or NoneType, default None
        figshare subdirectory for sending data

        Files are sent to the top-level ``data`` directory if ``None``
    timeout: int or NoneType, default None
        timeout in seconds for blocking operations
    context: obj, default gravity_toolkit.utilities._default_ssl_context
        SSL context for ``urllib`` opener object
    get_ca_certs: bool, default False
        get list of loaded "certification authority" certificates
    verbose: bool, default False
        print ftp transfer information
    chunk: int, default 8192
        chunk size for transfer encoding
    """
    # SSL context handler
    if get_ca_certs:
        context.get_ca_certs()
    # remote directory for the uploaded files
    if directory is None:
        remote_dir = 'data'
    else:
        remote_dir = posixpath.join('data',directory)
    # connect to figshare secure ftp host
    # closing the connection when complete (or if an upload fails)
    with ftplib.FTP_TLS(host='ftps.figshare.com',
            user=username,
            passwd=password,
            context=context,
            timeout=timeout) as ftps:
        # set the verbosity level
        if verbose:
            ftps.set_debuglevel(1)
        # encrypt data connections
        ftps.prot_p()
        # try to create project directory
        if directory is not None:
            try:
                # will only create the directory if non-existent
                ftps.mkd(remote_dir)
            except ftplib.all_errors:
                # best-effort: the server rejects the command
                # if the directory already exists
                pass
        # upload each file
        for local_file in files:
            # local file
            local_file = pathlib.Path(local_file).expanduser().absolute()
            # remote ftp file
            ftp_remote_path = posixpath.join(remote_dir, local_file.name)
            # open local file and send bytes
            with local_file.open(mode='rb') as fp:
                ftps.storbinary(f'STOR {ftp_remote_path}', fp,
                    blocksize=chunk, callback=None, rest=None)
# PURPOSE: download files from CSR
# http://download.csr.utexas.edu/pub/slr/geocenter/GCN_L1_L2_30d_CF-CM.txt
# http://download.csr.utexas.edu/outgoing/cheng/gct2est.220_5s
[docs]
def from_csr(
    directory: str | pathlib.Path,
    variable: str | list | tuple | None = None,
    version: str = 'RL06.1LRI',
    timeout: int | None = None,
    context: ssl.SSLContext = _default_ssl_context,
    chunk: int | None = 16384,
    verbose: bool = False,
    fid = sys.stdout,
    mode: oct = 0o775
):
    """
    Download files from the University of Texas Center for
    Space Research (UTCSR)

    Parameters
    ----------
    directory: str
        download directory
    variable: str, list, tuple or NoneType, default None
        CSR variable to download

            - ``'SLR'``: low degree SLR solutions
            - ``'geocenter'``: SLR geocenter solutions
            - ``'LRI'``: level-2 solutions from LRI

        If ``None``, no variables are selected and only the
        download directory is created
    version: str, default 'RL06.1LRI'
        Version of the LRI dataset to download
    timeout: int or NoneType, default None
        timeout in seconds for blocking operations
    context: obj, default gravity_toolkit.utilities._default_ssl_context
        SSL context for ``urllib`` opener object
    chunk: int, default 16384
        chunk size for transfer encoding
    verbose: bool, default False
        print file transfer information
    fid: obj, default sys.stdout
        open file object to print if verbose
    mode: oct, default 0o775
        permissions mode of output directories and local files
    """
    # CSR download http server
    HOST = 'http://download.csr.utexas.edu'
    # recursively create directory if non-existent
    directory = pathlib.Path(directory).expanduser().absolute()
    directory.mkdir(mode=mode, parents=True, exist_ok=True)
    # verify inputs for variable to be iterable
    # (the default of None would otherwise fail the membership tests below)
    if variable is None:
        variable = []
    elif isinstance(variable, str):
        variable = [variable]

    # PURPOSE: retrieve a single remote file with the shared transfer options
    # passes the hash of the current local file (from get_hash) to from_http
    def _retrieve(remote, local_file):
        from_http(remote,
            timeout=timeout,
            context=context,
            local=local_file,
            hash=get_hash(local_file),
            chunk=chunk,
            verbose=verbose,
            fid=fid,
            mode=mode)

    # download SLR files from CSR
    if 'SLR' in variable:
        # download SLR 5x5, figure axis and azimuthal dependence files
        FILES = [
            [HOST,'pub','slr','degree_5',
                'CSR_Monthly_5x5_Gravity_Harmonics.txt'],
            [HOST,'pub','slr','degree_2','C20_RL06.txt'],
            [HOST,'pub','slr','degree_2','C21_S21_RL06.txt'],
            [HOST,'pub','slr','degree_2','C22_S22_RL06.txt'],
            [HOST,'pub','slr','TN11E','TN11E.txt'],
        ]
        # for each SLR file
        for FILE in FILES:
            _retrieve(FILE, directory.joinpath(FILE[-1]))

    # download geocenter files from CSR
    if 'geocenter' in variable:
        # recursively create geocenter directory if non-existent
        local_dir = directory.joinpath('geocenter')
        local_dir.mkdir(mode=mode, parents=True, exist_ok=True)
        # download CF-CM SLR and updated SLR geocenter files from Minkang Cheng
        FILES = [
            [HOST,'pub','slr','geocenter','GCN_L1_L2_30d_CF-CM.txt'],
            [HOST,'outgoing','cheng','gct2est.220_5s'],
        ]
        # for each SLR geocenter file
        for FILE in FILES:
            _retrieve(FILE, local_dir.joinpath(FILE[-1]))

    # download LRI-only solutions
    if 'LRI' in variable:
        remote_path = [HOST, 'outgoing', 'gracefo', version]
        # find years of available LRI data
        # NOTE(review): http_list is called without the timeout and context
        # given to this function -- confirm whether it should receive them
        years, _ = http_list(remote_path, pattern=r'\d{4}')
        # download each available CSR product
        for PROD in ['GAC','GAD','GSM']:
            # recursively create local directory if non-existent
            local_dir = directory.joinpath('CSR', version, PROD)
            local_dir.mkdir(mode=mode, parents=True, exist_ok=True)
            # for each year
            for year in years:
                # find LRI files (second output of http_list is not needed)
                files, _ = http_list([*remote_path, year], pattern=PROD)
                # download each file
                for fi in files:
                    _retrieve([*remote_path, year, fi],
                        local_dir.joinpath(fi))
# PURPOSE: download GravIS and satellite laser ranging files from GFZ
# https://isdc-data.gfz.de/grace/Level-2/GFZ/RL06_SLR_C20/
# https://isdc-data.gfz.de/grace/GravIS/GFZ/Level-2B/aux_data/
def from_gfz(
    directory: str | pathlib.Path,
    version: str = '0004',
    timeout: int | None = None,
    chunk: int | None = 8192,
    verbose: bool = False,
    fid = sys.stdout,
    mode: oct = 0o775,
    context: ssl.SSLContext = _default_ssl_context
):
    """
    Download GravIS and satellite laser ranging (SLR) files from the
    German Research Centre for Geosciences (GeoForschungsZentrum, GFZ)

    Parameters
    ----------
    directory: str
        download directory
    version: str, default '0004'
        version of the GravIS Level-2B data products to download
    timeout: int or NoneType, default None
        timeout in seconds for blocking operations
    chunk: int, default 8192
        chunk size for transfer encoding
    verbose: bool, default False
        print file transfer information
    fid: obj, default sys.stdout
        open file object to print if verbose
    mode: oct, default 0o775
        permissions mode of output directories and local files
    context: obj, default gravity_toolkit.utilities._default_ssl_context
        SSL context for ``urllib`` opener object

        Placed last in the signature so that existing positional
        calls keep their meaning
    """
    # recursively create directories if non-existent
    # (creating the geocenter subdirectory also creates its parent)
    directory = pathlib.Path(directory).expanduser().absolute()
    local_dir = directory.joinpath('geocenter')
    local_dir.mkdir(mode=mode, parents=True, exist_ok=True)
    # GFZ Information System and Data Center https server
    HOST = 'https://isdc-data.gfz.de'
    # remote path to the GravIS Level-2B auxiliary data
    AUX = [HOST,'grace','GravIS','GFZ','Level-2B','aux_data']
    # pairs of remote file paths and the local directories to place them in
    DOWNLOADS = [
        # SLR oblateness file
        ([HOST,'grace','Level-2','GFZ','RL06_SLR_C20',
            'GFZ_RL06_C20_SLR.dat'], directory),
        # combined low-degree harmonic file
        # GRAVIS-2B_GFZOP_GRACE+SLR_LOW_DEGREES_0004.dat
        ([*AUX, f'GRAVIS-2B_GFZOP_GRACE+SLR_LOW_DEGREES_{version}.dat'],
            directory),
        # GravIS geocenter file
        ([*AUX, f'GRAVIS-2B_GFZOP_GEOCENTER_{version}.dat'], local_dir),
    ]
    # get each file
    for FILE, output_dir in DOWNLOADS:
        local_file = output_dir.joinpath(FILE[-1])
        from_http(FILE,
            timeout=timeout,
            context=context,
            local=local_file,
            hash=get_hash(local_file),
            chunk=chunk,
            verbose=verbose,
            fid=fid,
            mode=mode)
# PURPOSE: lists files by scraping the GSFC grace-mascons website
def gsfc_list(
    HOST: str | list = 'https://earth.gsfc.nasa.gov/geo/data/grace-mascons',
    timeout: int | None = None,
    parser = lxml.etree.HTMLParser(),
    pattern: str = r'',
    sort: bool = False
):
    """
    Lists files by scraping the GSFC website for GRACE mascons

    Parameters
    ----------
    HOST: str or list
        remote https host
    timeout: int or NoneType, default None
        timeout in seconds for blocking operations
    parser: obj, default lxml.etree.HTMLParser()
        HTML parser for ``lxml``
    pattern: str, default ''
        regular expression pattern for reducing list
    sort: bool, default False
        sort output list

    Returns
    -------
    colnames: list
        urls built from the links found in the table cells of the page

    Raises
    ------
    Exception
        if the request fails with an ``HTTPError`` or ``URLError``
    """
    # verify inputs for remote https host
    if isinstance(HOST, str):
        HOST = url_split(HOST)
    # complete url of the page to scrape
    url = posixpath.join(*HOST)
    # try listing from https
    try:
        # Create and submit request.
        request = urllib2.Request(url)
        # close the connection once the page has been parsed
        with urllib2.urlopen(request, timeout=timeout) as response:
            tree = lxml.etree.parse(response, parser)
    except (urllib2.HTTPError, urllib2.URLError) as exc:
        # keep the underlying error as the cause for debugging
        raise Exception('List error from {0}'.format(url)) from exc
    # read and parse request for relative links to files
    rellinks = tree.xpath('//tr/td//a/@href')
    # form complete column names
    colnames = [posixpath.join(HOST[0], *url_split(l)) for l in rellinks]
    # reduce using regular expression pattern
    if pattern:
        colnames = [f for f in colnames if re.search(pattern, f)]
    # sort list of column names
    if sort:
        colnames = sorted(colnames)
    # return the list of column names
    return colnames
# PURPOSE: download satellite laser ranging files from GSFC
# https://earth.gsfc.nasa.gov/geo/data/slr
def from_gsfc(
    directory: str | pathlib.Path,
    host: str = 'https://earth.gsfc.nasa.gov/sites/default/files/geo/slr-weekly',
    timeout: int | None = None,
    context: ssl.SSLContext = _default_ssl_context,
    chunk: int | None = 16384,
    verbose: bool = False,
    fid = sys.stdout,
    copy: bool = False,
    mode: oct = 0o775
):
    """
    Download `satellite laser ranging (SLR) <https://earth.gsfc.nasa.gov/geo/data/slr/>`_
    files from NASA Goddard Space Flight Center (GSFC)

    Parameters
    ----------
    directory: str
        download directory
    host: str, default 'https://earth.gsfc.nasa.gov/sites/default/files/geo/slr-weekly'
        url for the GSFC SLR weekly fields
    timeout: int or NoneType, default None
        timeout in seconds for blocking operations
    context: obj, default gravity_toolkit.utilities._default_ssl_context
        SSL context for ``urllib`` opener object
    chunk: int, default 16384
        chunk size for transfer encoding
    verbose: bool, default False
        print file transfer information
    fid: obj, default sys.stdout
        open file object to print if verbose
    copy: bool, default False
        create a copy of file for archival purposes
    mode: oct, default 0o775
        permissions mode of output directory and local file

    Raises
    ------
    ValueError
        if ``copy`` is set and the file does not contain exactly one
        ``Data span:`` line with exactly two parseable dates
    """
    # recursively create directory if non-existent
    directory = pathlib.Path(directory).expanduser().absolute()
    directory.mkdir(mode=mode, parents=True, exist_ok=True)
    # download GSFC SLR 5x5 file
    FILE = 'gsfc_slr_5x5c61s61.txt'
    local_file = directory.joinpath(FILE)
    original_md5 = get_hash(local_file)
    # the object returned by from_http is read below for the file contents
    fileID = from_http(posixpath.join(host,FILE),
        timeout=timeout,
        context=context,
        local=local_file,
        hash=original_md5,
        chunk=chunk,
        verbose=verbose,
        fid=fid,
        mode=mode)
    # create a dated copy for archival purposes
    if copy:
        # read file and extract data date span
        file_contents = fileID.read().decode('utf-8').splitlines()
        # strict unpacking: exactly one header line is expected
        data_span, = [l for l in file_contents if l.startswith('Data span:')]
        # extract start and end of data date span (day month-name year)
        # fields are separated by one or more whitespace characters
        span_start,span_end = re.findall(r'\d+\s+\w{3}\s+\d{4}', data_span)
        # create copy of file with date span in filename
        YM1 = time.strftime('%Y%m', time.strptime(span_start, '%d %b %Y'))
        YM2 = time.strftime('%Y%m', time.strptime(span_end, '%d %b %Y'))
        archive_file = directory.joinpath(f'GSFC_SLR_5x5c61s61_{YM1}_{YM2}.txt')
        shutil.copyfile(local_file, archive_file)
        # copy modification times and permissions for archive file
        shutil.copystat(local_file, archive_file)
# PURPOSE: list a directory on the GFZ ICGEM https server
# http://icgem.gfz-potsdam.de
def icgem_list(
    host: str = 'http://icgem.gfz-potsdam.de/tom_longtime',
    timeout: int | None = None,
    parser=lxml.etree.HTMLParser()
):
    """
    Parse the table of static gravity field models on the GFZ
    `International Centre for Global Earth Models (ICGEM) <http://icgem.gfz-potsdam.de/>`_
    server

    Parameters
    ----------
    host: str
        url for the GFZ ICGEM gravity field table
    timeout: int or NoneType
        timeout in seconds for blocking operations
    parser: obj, default lxml.etree.HTMLParser()
        HTML parser for ``lxml``

    Returns
    -------
    colfiles: dict
        Static gravity field file urls mapped by field name

    Raises
    ------
    Exception
        if the table cannot be retrieved or parsed
    """
    # try listing from the ICGEM server
    try:
        # Create and submit request.
        request = urllib2.Request(host)
        # close the connection once the page has been parsed
        with urllib2.urlopen(request, timeout=timeout) as response:
            tree = lxml.etree.parse(response, parser)
    except Exception as exc:
        # wrap ordinary errors but let KeyboardInterrupt and SystemExit
        # pass through; keep the underlying error as the cause
        raise Exception(f'List error from {host}') from exc
    # read and parse request for files
    links = tree.xpath('//td[@class="tom-cell-modelfile"]//a/@href')
    # reduce list of files to find gfc files
    colfiles = {}
    for f in links:
        # model name is the file name without the .gfc extension
        name, extension = posixpath.splitext(posixpath.basename(f))
        if extension == '.gfc':
            colfiles[name] = url_split(f)
    # return the dict of model files mapped by name
    return colfiles