#!/usr/bin/env python
u"""
spatial.py
Written by Tyler Sutterley (10/2024)
Data class for reading, writing and processing spatial data
PYTHON DEPENDENCIES:
numpy: Scientific Computing Tools For Python
https://numpy.org
https://numpy.org/doc/stable/user/numpy-for-matlab-users.html
dateutil: powerful extensions to datetime
https://dateutil.readthedocs.io/en/stable/
netCDF4: Python interface to the netCDF C library
https://unidata.github.io/netcdf4-python/netCDF4/index.html
h5py: Pythonic interface to the HDF5 binary data format.
https://www.h5py.org/
PROGRAM DEPENDENCIES:
time.py: utilities for calculating time operations
UPDATE HISTORY:
Updated 10/2024: allow 2D and 3D arrays in output netCDF4 files
Updated 06/2024: use wrapper to importlib for optional dependencies
Updated 05/2024: make subscriptable and allow item assignment
Updated 10/2023: place time and month variables in try/except block
Updated 09/2023: prevent double printing of filenames when using debug
Updated 08/2023: add string representation of the spatial object
Updated 05/2023: use pathlib to define and operate on paths
more operatations on spatial error if in possible data keys
rename reverse function to flip to match numpy nomenclature
Updated 03/2023: customizable file-level attributes to netCDF4 and HDF5
add attributes fetching to from_dict function
retrieve all root attributes from HDF5 and netCDF4 datasets
fix indexing of filenames in single string case
add indexing of filenames to spatial object iterator
use copy.copy and not numpy.copy in copy spatial object function
fix mask and shape of subsetted spatial grid objects
add extend_matrix function and add error output to from_list
convert spacing, extent, shape and ndim to spatial class properties
improve typing for variables in docstrings
set case insensitive filename to None if filename is empty
Updated 02/2023: use monospaced text to note spatial objects in docstrings
Updated 12/2022: add software information to output HDF5 and netCDF4
make spatial objects iterable and with length
Updated 11/2022: use f-strings for formatting verbose or ascii output
Updated 08/2022: fix output latitude HDF5 and netCDF4 attributes
place index filename within try/except statement
Updated 04/2022: updated docstrings to numpy documentation format
using internal netCDF4 and HDF5 readers and writers
include utf-8 encoding in reads to be windows compliant
include filename attribute when copying spatial objects
Updated 12/2021: logging case_insensitive_filename output for debugging
Updated 11/2021: fix kwargs to index and hdf5 read functions
Updated 10/2021: using python logging for handling verbose output
Updated 09/2021: use functions for converting to and from GRACE months
Updated 05/2021: define int/float precision to prevent deprecation warning
Updated 04/2021: add parser object for removing commented or empty lines
use file not found exceptions in case insensitive filename
Updated 02/2021: added replace_masked to replace masked values in data
use adjust_months function to fix special cases of GRACE/GRACE-FO months
added generic reader, generic writer and write to list functions
generalize ascii, netCDF4 and HDF5 readers and writers
replaced numpy bool to prevent deprecation warning
Updated 01/2021: added scaling factor and scaling factor error function
from Lander and Swenson (2012) https://doi.org/10.1029/2011WR011453
Updated 12/2020: added transpose function, can calculate mean over indices
output attributes dictionary from netCDF4 and HDF5 files
can create a spatial object from an open file-like object
Updated 09/2020: added header option to skip rows in ascii files
Updated 08/2020: added compression options for ascii, netCDF4 and HDF5 files
Updated 07/2020: added class docstring and using kwargs for output to file
added case_insensitive_filename function to search directories
Updated 06/2020: added zeros_like() for creating an empty spatial object
Written 06/2020
"""
import re
import io
import copy
import gzip
import time
import uuid
import logging
import pathlib
import zipfile
import numpy as np
import gravity_toolkit.version
from gravity_toolkit.time import adjust_months, calendar_to_grace
from gravity_toolkit.utilities import import_dependency
# attempt imports
h5py = import_dependency('h5py')
netCDF4 = import_dependency('netCDF4')
[docs]
class spatial(object):
"""
Data class for reading, writing and processing spatial data
Attributes
----------
data: np.ndarray
spatial grid data
mask: np.ndarray
spatial grid mask
lon: np.ndarray
grid longitudes
lat: np.ndarray
grid latitudes
time: np.ndarray
time variable of the spatial data
month: np.ndarray
GRACE/GRACE-FO months variable of the spatial data
fill_value: float or NoneType, default None
invalid value for spatial grid data
attributes: dict
attributes of ``spatial`` variables
filename: str
input or output filename
"""
np.seterr(invalid='ignore')
def __init__(self, **kwargs):
# set default keyword arguments
kwargs.setdefault('fill_value',None)
# set default class attributes
self.data=None
self.mask=None
self.lon=None
self.lat=None
self.time=None
self.month=None
self.fill_value=kwargs['fill_value']
self.attributes=dict()
self.filename=None
# iterator
self.__index__ = 0
[docs]
def case_insensitive_filename(self, filename):
"""
Searches a directory for a filename without case dependence
Parameters
----------
filename: str, io.IOBase, pathlib.Path or None
input filename
"""
# check if filename is open file object
if isinstance(filename, io.IOBase):
self.filename = copy.copy(filename)
elif isinstance(filename, type(None)) or not bool(filename):
self.filename = None
else:
# tilde-expand input filename
self.filename = pathlib.Path(filename).expanduser().absolute()
# check if file presently exists with input case
if not self.filename.exists():
# search for filename without case dependence
f = [f.name for f in self.filename.parent.iterdir() if
re.match(self.filename.name, f.name, re.I)]
if not f:
msg = f'{filename} not found in file system'
raise FileNotFoundError(msg)
self.filename = self.filename.with_name(f.pop())
# return the filename
return self
[docs]
def compressuser(self, filename=None):
"""
Tilde-compresses a file to be relative to the home directory
Parameters
----------
filename: str or None, default None
output filename
"""
if filename is None:
filename = self.filename
else:
filename = pathlib.Path(filename).expanduser().absolute()
# attempt to compress filename relative to home directory
try:
relative_to = filename.relative_to(pathlib.Path().home())
except (ValueError, AttributeError) as exc:
return filename
else:
return pathlib.Path('~').joinpath(relative_to)
[docs]
def from_ascii(self, filename, date=True, **kwargs):
"""
Read a ``spatial`` object from an ascii file
Parameters
----------
filename: str
full path of input ascii file
date: bool, default True
ascii file has date information
compression: str or NoneType, default None
file compression type
- ``'gzip'``
- ``'zip'``
- ``'bytes'``
spacing: list, default [None,None]
grid step size ``[longitude,latitude]``
extent: list, default [None,None,None,None]
spatial grid bounds
``[minimum longitude, maximum longitude,
minimum latitude, maximum latitude]``
nlat: int or NoneType, default None
length of latitude dimension
nlon: int or NoneType, default None
length of longitude dimension
columns: list, default ['lon','lat','data','time']
variable names for each column
header: int, default 0
Number of rows of header lines to skip
verbose: bool, default False
print file and variable information
"""
# set filename
self.case_insensitive_filename(filename)
# set default parameters
kwargs.setdefault('verbose',False)
kwargs.setdefault('compression',None)
kwargs.setdefault('spacing',[None,None])
kwargs.setdefault('nlat',None)
kwargs.setdefault('nlon',None)
kwargs.setdefault('extent',[None]*4)
kwargs.setdefault('columns',['lon','lat','data','time'])
kwargs.setdefault('header',0)
# open the ascii file and extract contents
logging.info(str(self.filename))
if (kwargs['compression'] == 'gzip'):
# read input ascii data from gzip compressed file and split lines
with gzip.open(self.filename, mode='r') as f:
file_contents = f.read().decode('ISO-8859-1').splitlines()
elif (kwargs['compression'] == 'zip'):
# read input ascii data from zipped file and split lines
stem = self.filename.stem
with zipfile.ZipFile(self.filename) as z:
file_contents = z.read(stem).decode('ISO-8859-1').splitlines()
elif (kwargs['compression'] == 'bytes'):
# read input file object and split lines
file_contents = self.filename.read().splitlines()
else:
# read input ascii file (.txt, .asc) and split lines
with self.filename.open(mode='r', encoding='utf8') as f:
file_contents = f.read().splitlines()
# compile regular expression operator for extracting numerical values
# from input ascii files of spatial data
regex_pattern = r'[-+]?(?:(?:\d*\.\d+)|(?:\d+\.?))(?:[EeD][+-]?\d+)?'
rx = re.compile(regex_pattern, re.VERBOSE)
# output spatial dimensions
if (None not in kwargs['extent']) and kwargs['nlat'] and kwargs['nlon']:
extent = kwargs.get('extent')
self.lat = np.linspace(extent[3], extent[2], kwargs['nlat'])
self.lon = np.linspace(extent[0], extent[1], kwargs['nlon'])
dlon = np.abs(self.lon[1] - self.lon[0])
dlat = np.abs(self.lat[1] - self.lat[0])
elif (None not in kwargs['extent']) and (None not in kwargs['spacing']):
extent = kwargs.get('extent')
dlon, dlat = kwargs.get('spacing')
self.lat = np.arange(extent[3], extent[2] - dlat, dlat)
self.lon = np.arange(extent[0], extent[1] + dlon, dlon)
elif kwargs['nlat'] and kwargs['nlon'] and (None not in kwargs['spacing']):
dlon, dlat = kwargs.get('spacing')
self.lat = np.zeros((kwargs['nlat']))
self.lon = np.zeros((kwargs['nlon']))
else:
raise ValueError('Unknown dimensions for input ``spatial`` object')
# get spatial dimensions
nlat = len(self.lat)
nlon = len(self.lon)
# output spatial data
self.data = np.zeros((nlat, nlon))
self.mask = np.zeros((nlat, nlon), dtype=bool)
# remove time from list of column names if not date
columns = [c for c in kwargs['columns'] if (c != 'time')]
# extract spatial data array and convert to matrix
# for each line in the file
header = kwargs['header']
for line in file_contents[header:]:
# extract columns of interest and assign to dict
# convert fortran exponentials if applicable
d = {c:r.replace('D','E') for c,r in zip(columns,rx.findall(line))}
# convert line coordinates to integers
ilon = np.int64(np.float64(d['lon'])/dlon)
ilat = np.int64((90.0 - np.float64(d['lat']))//dlat)
self.data[ilat, ilon] = np.float64(d['data'])
self.mask[ilat, ilon] = False
self.lon[ilon] = np.float64(d['lon'])
self.lat[ilat] = np.float64(d['lat'])
# if the ascii file contains date variables
if date:
self.time = np.array(d['time'],dtype='f')
self.month = calendar_to_grace(self.time)
# if the ascii file contains date variables
if date:
# adjust months to fix special cases if necessary
self.month = adjust_months(self.month)
# update mask
self.update_mask()
return self
[docs]
def from_netCDF4(self, filename, **kwargs):
"""
Read a ``spatial`` object from a netCDF4 file
Parameters
----------
filename: str
full path of input netCDF4 file
date: bool, default True
netCDF4 file has date information
compression: str or NoneType, default None
file compression type
- ``'gzip'``
- ``'zip'``
- ``'bytes'``
varname: str, default 'data'
name for data variable
lonname: str, default 'lon'
name for longitude variable
latname: str, default 'lat'
name for latitude variable
timename: str, default 'time'
name for time-dimension variable
field_mapping: dict, default {}
mapping between output variables and input netCDF4
verbose: bool, default False
print file and variable information
"""
# set filename
self.case_insensitive_filename(filename)
# set default parameters
kwargs.setdefault('date',True)
kwargs.setdefault('compression',None)
kwargs.setdefault('varname','z')
kwargs.setdefault('lonname','lon')
kwargs.setdefault('latname','lat')
kwargs.setdefault('timename','time')
kwargs.setdefault('field_mapping',{})
kwargs.setdefault('verbose',False)
# Open the NetCDF4 file for reading
if (kwargs['compression'] == 'gzip'):
# read as in-memory (diskless) netCDF4 dataset
with gzip.open(self.filename, mode='r') as f:
fileID = netCDF4.Dataset(uuid.uuid4().hex, memory=f.read())
elif (kwargs['compression'] == 'zip'):
# read zipped file and extract file into in-memory file object
stem = self.filename.stem
with zipfile.ZipFile(self.filename) as z:
# first try finding a netCDF4 file with same base filename
# if none found simply try searching for a netCDF4 file
try:
f,=[f for f in z.namelist() if re.match(stem,f,re.I)]
except:
f,=[f for f in z.namelist() if re.search(r'\.nc(4)?$',f)]
# read bytes from zipfile as in-memory (diskless) netCDF4 dataset
fileID = netCDF4.Dataset(uuid.uuid4().hex, memory=z.read(f))
elif (kwargs['compression'] == 'bytes'):
# read as in-memory (diskless) netCDF4 dataset
fileID = netCDF4.Dataset(uuid.uuid4().hex, memory=filename.read())
else:
# read netCDF4 dataset
fileID = netCDF4.Dataset(self.filename, 'r')
# Output NetCDF file information
logging.info(fileID.filepath())
logging.info(list(fileID.variables.keys()))
# set automasking
fileID.set_auto_mask(False)
# list of variable attributes
attributes_list = ['description','units','long_name','calendar',
'standard_name','_FillValue','missing_value']
# mapping between output keys and netCDF4 variable names
if not kwargs['field_mapping']:
fields = [kwargs['lonname'],kwargs['latname'],kwargs['varname']]
if kwargs['date']:
fields.append(kwargs['timename'])
kwargs['field_mapping'] = self.default_field_mapping(fields)
# for each variable
for field,key in kwargs['field_mapping'].items():
# Getting the data from each NetCDF variable
# remove singleton dimensions
setattr(self, field, np.squeeze(fileID.variables[key][:]))
# Getting attributes of included variables
self.attributes[field] = {}
for attr in attributes_list:
# try getting the attribute
try:
self.attributes[field][attr] = \
fileID.variables[key].getncattr(attr)
except (KeyError,ValueError,AttributeError):
pass
# get global netCDF4 attributes
self.attributes['ROOT'] = {}
for att_name in fileID.ncattrs():
self.attributes['ROOT'][att_name] = fileID.getncattr(att_name)
# Closing the NetCDF file
fileID.close()
# switching data array to lat/lon if lon/lat
sz = self.data.shape
if (self.data.ndim == 2) and (len(self.lon) == sz[0]):
self.data = self.data.T
# set fill value and mask
if '_FillValue' in self.attributes['data'].keys():
self.fill_value = self.attributes['data']['_FillValue']
self.mask = (self.data == self.fill_value)
else:
self.mask = np.zeros(self.data.shape, dtype=bool)
# set GRACE/GRACE-FO month if file has date variables
if kwargs['date']:
self.month = calendar_to_grace(self.time)
# adjust months to fix special cases if necessary
self.month = adjust_months(self.month)
# update mask
self.update_mask()
return self
[docs]
def from_HDF5(self, filename, **kwargs):
"""
Read a ``spatial`` object from a HDF5 file
Parameters
----------
filename: str
full path of input HDF5 file
date: bool, default True
HDF5 file has date information
compression: str or NoneType, default None
file compression type
- ``'gzip'``
- ``'zip'``
- ``'bytes'``
varname: str, default 'data'
name for data variable
lonname: str, default 'lon'
name for longitude variable
latname: str, default 'lat'
name for latitude variable
timename: str, default 'time'
name for time-dimension variable
field_mapping: dict, default {}
mapping between output variables and input HDF5
verbose: bool, default False
print file and variable information
"""
# set filename
self.case_insensitive_filename(filename)
# set default parameters
kwargs.setdefault('date',True)
kwargs.setdefault('compression',None)
kwargs.setdefault('varname','z')
kwargs.setdefault('lonname','lon')
kwargs.setdefault('latname','lat')
kwargs.setdefault('timename','time')
kwargs.setdefault('field_mapping',{})
kwargs.setdefault('verbose',False)
# Open the HDF5 file for reading
if (kwargs['compression'] == 'gzip'):
# read gzip compressed file and extract into in-memory file object
with gzip.open(self.filename, mode='r') as f:
fid = io.BytesIO(f.read())
# set filename of BytesIO object
fid.filename = self.filename.name
# rewind to start of file
fid.seek(0)
# read as in-memory (diskless) HDF5 dataset from BytesIO object
fileID = h5py.File(fid, 'r')
elif (kwargs['compression'] == 'zip'):
# read zipped file and extract file into in-memory file object
stem = self.filename.stem
with zipfile.ZipFile(self.filename) as z:
# first try finding a HDF5 file with same base filename
# if none found simply try searching for a HDF5 file
try:
f,=[f for f in z.namelist() if re.match(stem,f,re.I)]
except:
f,=[f for f in z.namelist() if re.search(r'\.H(DF)?5$',f,re.I)]
# read bytes from zipfile into in-memory BytesIO object
fid = io.BytesIO(z.read(f))
# set filename of BytesIO object
fid.filename = self.filename.name
# rewind to start of file
fid.seek(0)
# read as in-memory (diskless) HDF5 dataset from BytesIO object
fileID = h5py.File(fid, mode='r')
elif (kwargs['compression'] == 'bytes'):
# read as in-memory (diskless) HDF5 dataset
fileID = h5py.File(filename, mode='r')
else:
# read HDF5 dataset
fileID = h5py.File(self.filename, 'r')
# Output HDF5 file information
logging.info(fileID.filename)
logging.info(list(fileID.keys()))
# list of variable attributes
attributes_list = ['description','units','long_name','calendar',
'standard_name','_FillValue','missing_value']
# mapping between output keys and HDF5 variable names
if not kwargs['field_mapping']:
fields = [kwargs['lonname'],kwargs['latname'],kwargs['varname']]
if kwargs['date']:
fields.append(kwargs['timename'])
kwargs['field_mapping'] = self.default_field_mapping(fields)
# for each variable
for field,key in kwargs['field_mapping'].items():
# Getting the data from each HDF5 variable
# remove singleton dimensions
setattr(self, field, np.squeeze(fileID[key][:]))
# Getting attributes of included variables
self.attributes[field] = {}
for attr in attributes_list:
try:
self.attributes[field][attr] = fileID[key].attrs[attr]
except (KeyError, AttributeError):
pass
# get global HDF5 attributes
self.attributes['ROOT'] = {}
for att_name,att_val in fileID.attrs.items():
self.attributes['ROOT'][att_name] = att_val
# Closing the HDF5 file
fileID.close()
# switching data array to lat/lon if lon/lat
sz = self.data.shape
if (self.data.ndim == 2) and (len(self.lon) == sz[0]):
self.data = self.data.T
# set fill value and mask
if '_FillValue' in self.attributes['data'].keys():
self.fill_value = self.attributes['data']['_FillValue']
self.mask = (self.data == self.fill_value)
else:
self.mask = np.zeros(self.data.shape, dtype=bool)
# set GRACE/GRACE-FO month if file has date variables
if kwargs['date']:
self.month = calendar_to_grace(self.time)
# adjust months to fix special cases if necessary
self.month = adjust_months(self.month)
# update mask
self.update_mask()
return self
[docs]
def from_index(self, filename, **kwargs):
"""
Read a ``spatial`` object from an index of
ascii, netCDF4 or HDF5 files
Parameters
----------
filename: str
full path of index file
format: str or NoneType, default None
format of individual files within index
- ``'ascii'``
- ``'netCDF4'``
- ``'HDF5'``
date: bool, default True
files contains date information
sort: bool, default True
sort ``spatial`` objects by date information
**kwargs: dict
keyword arguments for input readers
"""
# set default keyword arguments
kwargs.setdefault('format',None)
kwargs.setdefault('date',True)
kwargs.setdefault('sort',True)
# set filename
self.case_insensitive_filename(filename)
# file parser for reading index files
# removes commented lines (can comment out files in the index)
# removes empty lines (if there are extra empty lines)
parser = re.compile(r'^(?!\#|\%|$)', re.VERBOSE)
# Read index file of input spatial data
with self.filename.open(mode='r', encoding='utf8') as f:
file_list = [l for l in f.read().splitlines() if parser.match(l)]
# create a list of spatial objects
s = []
# for each file in the index
for i,f in enumerate(file_list):
if (kwargs['format'] == 'ascii'):
# netcdf (.nc)
s.append(spatial().from_ascii(f, **kwargs))
elif (kwargs['format'] == 'netCDF4'):
# netcdf (.nc)
s.append(spatial().from_netCDF4(f, **kwargs))
elif (kwargs['format'] == 'HDF5'):
# HDF5 (.H5)
s.append(spatial().from_HDF5(f, **kwargs))
# create a single spatial object from the list
return self.from_list(s,date=kwargs['date'],sort=kwargs['sort'])
[docs]
def from_list(self, object_list, **kwargs):
"""
Build a sorted ``spatial`` object from a list of
other ``spatial`` objects
Parameters
----------
object_list: list
list of ``spatial`` objects to be merged
date: bool, default True
files contains date information
sort: bool, default True
sort ``spatial`` objects by date information
clear: bool, default True
clear the list of ``spatial`` objects from memory
"""
# set default keyword arguments
kwargs.setdefault('date',True)
kwargs.setdefault('sort',True)
kwargs.setdefault('clear',False)
# number of spatial objects in list
n = len(object_list)
# indices to sort data objects if spatial list contain dates
if kwargs['date'] and kwargs['sort']:
list_sort = np.argsort([d.time for d in object_list],axis=None)
else:
list_sort = np.arange(n)
# extract grid spacing
shape = object_list[0].shape
# create output spatial grid and mask
self.data = np.zeros((shape[0], shape[1], n))
self.mask = np.zeros((shape[0], shape[1], n),dtype=bool)
# add error if in original list attributes
if hasattr(object_list[0], 'error'):
self.error = np.zeros((shape[0], shape[1], n))
self.fill_value = object_list[0].fill_value
self.lon = object_list[0].lon.copy()
self.lat = object_list[0].lat.copy()
# create list of files and attributes
self.filename = []
self.attributes = []
# output dates
if kwargs['date']:
self.time = np.zeros((n))
self.month = np.zeros((n),dtype=np.int64)
# for each indice
for t,i in enumerate(list_sort):
self.data[:,:,t] = object_list[i].data[:,:].copy()
self.mask[:,:,t] |= object_list[i].mask[:,:]
if hasattr(object_list[i], 'error'):
self.error[:,:,t] = object_list[i].error[:,:].copy()
if kwargs['date']:
self.time[t] = np.atleast_1d(object_list[i].time)
self.month[t] = np.atleast_1d(object_list[i].month)
# append filename to list
if getattr(object_list[i], 'filename'):
self.filename.append(object_list[i].filename)
# append attributes to list
if getattr(object_list[i], 'attributes'):
self.attributes.append(object_list[i].attributes)
# adjust months to fix special cases if necessary
if kwargs['date']:
self.month = adjust_months(self.month)
# update mask
self.update_mask()
# clear the input list to free memory
if kwargs['clear']:
object_list = None
# return the single spatial object
return self
[docs]
def from_file(self, filename, format=None, date=True, **kwargs):
"""
Read a ``spatial`` object from a specified format
Parameters
----------
filename: str
full path of input file
format: str or NoneType, default None
file format
- ``'ascii'``
- ``'netCDF4'``
- ``'HDF5'``
date: bool, default True
file contains date information
verbose: bool, default False
print file and variable information
**kwargs: dict
keyword arguments for input readers
"""
# set filename
self.case_insensitive_filename(filename)
# set default verbosity
kwargs.setdefault('verbose',False)
# read from file
if (format == 'ascii'):
# ascii (.txt)
return spatial().from_ascii(filename, date=date, **kwargs)
elif (format == 'netCDF4'):
# netcdf (.nc)
return spatial().from_netCDF4(filename, date=date, **kwargs)
elif (format == 'HDF5'):
# HDF5 (.H5)
return spatial().from_HDF5(filename, date=date, **kwargs)
[docs]
def from_dict(self, d, **kwargs):
"""
Convert a ``dict`` object to a ``spatial`` object
Parameters
----------
d: dict
dictionary object to be converted
"""
# assign variables to self
for key in ['lon','lat','data','error','time','month','directory']:
try:
setattr(self, key, d[key].copy())
except (AttributeError, KeyError):
pass
# create output mask for data
self.mask = np.zeros_like(self.data, dtype=bool)
# add attributes to root if in dictionary
self.attributes['ROOT'] = d.get('attributes')
# update mask
self.update_mask()
return self
[docs]
def to_ascii(self, filename, **kwargs):
"""
Write a ``spatial`` object to ascii file
Parameters
----------
filename: str
full path of output ascii file
date: bool, default True
``spatial`` objects contain date information
verbose: bool, default False
Output file and variable information
"""
self.filename = pathlib.Path(filename).expanduser().absolute()
# set default verbosity and parameters
kwargs.setdefault('date',True)
kwargs.setdefault('verbose',False)
logging.info(str(self.filename))
# open the output file
fid = self.filename.open(mode='w', encoding='utf8')
if hasattr(self, 'error') and kwargs['date']:
file_format = '{0:10.4f} {1:10.4f} {2:12.4f} {3:12.4f} {4:10.4f}'
elif hasattr(self, 'error'):
file_format = '{0:10.4f} {1:10.4f} {2:12.4f} {3:12.4f}'
elif kwargs['date']:
file_format = '{0:10.4f} {1:10.4f} {2:12.4f} {4:10.4f}'
else:
file_format = '{0:10.4f} {1:10.4f} {2:12.4f}'
# write to file for each valid latitude and longitude
ii,jj = np.nonzero((self.data != self.fill_value) & (~self.mask))
for i,j in zip(ii,jj):
ln = self.lon[j]
lt = self.lat[i]
data = self.data[i,j]
error = self.error[i,j] if hasattr(self, 'error') else 0.0
print(file_format.format(ln,lt,data,error,self.time), file=fid)
# close the output file
fid.close()
[docs]
def to_netCDF4(self, filename, **kwargs):
"""
Write a ``spatial`` object to netCDF4 file
Parameters
----------
filename: str
full path of output netCDF4 file
varname: str, default 'z'
data variable name in netCDF4 file
lonname: str
longitude variable name in netCDF4 file
latname: str
latitude variable name in netCDF4 file
field_mapping: dict, default {}
mapping between input variables and output netCDF4
attributes: dict, default {}
output netCDF4 variable and file-level attributes
units: str or NoneType, default: None
data variable units
longname: str or NoneType, default: None
data variable unit description
time_units: str, default 'years'
time variable units
time_longname: str, default 'Date_in_Decimal_Years'
time variable unit description
title: str or NoneType, default None
title attribute of dataset
source: str or NoneType, default None
source attribute of dataset
reference: str or NoneType, default None
reference attribute of dataset
date: bool, default True
``spatial`` objects contain date information
clobber: bool, default True
Overwrite an existing netCDF4 file
verbose: bool, default False
Output file and variable information
"""
# set default verbosity and parameters
kwargs.setdefault('verbose',False)
kwargs.setdefault('varname','z')
kwargs.setdefault('lonname','lon')
kwargs.setdefault('latname','lat')
kwargs.setdefault('timename','time')
kwargs.setdefault('field_mapping',{})
attributes = self.attributes.get('ROOT') or {}
kwargs.setdefault('attributes',dict(ROOT=attributes))
kwargs.setdefault('units',None)
kwargs.setdefault('longname',None)
kwargs.setdefault('time_units','years')
kwargs.setdefault('time_longname','Date_in_Decimal_Years')
kwargs.setdefault('title',None)
kwargs.setdefault('source',None)
kwargs.setdefault('reference',None)
kwargs.setdefault('date',True)
kwargs.setdefault('clobber',True)
kwargs.setdefault('verbose',False)
# setting NetCDF clobber attribute
clobber = 'w' if kwargs['clobber'] else 'a'
# opening NetCDF file for writing
self.filename = pathlib.Path(filename).expanduser().absolute()
fileID = netCDF4.Dataset(self.filename, clobber, format="NETCDF4")
# mapping between output keys and netCDF4 variable names
if not kwargs['field_mapping']:
fields = [kwargs['lonname'],kwargs['latname'],kwargs['varname']]
if kwargs['date']:
fields.append(kwargs['timename'])
kwargs['field_mapping'] = self.default_field_mapping(fields)
# create attributes dictionary for output variables
if not all(key in kwargs['attributes'] for key in kwargs['field_mapping'].values()):
# Defining attributes for longitude and latitude
kwargs['attributes'][kwargs['field_mapping']['lon']] = {}
kwargs['attributes'][kwargs['field_mapping']['lon']]['long_name'] = 'longitude'
kwargs['attributes'][kwargs['field_mapping']['lon']]['units'] = 'degrees_east'
kwargs['attributes'][kwargs['field_mapping']['lat']] = {}
kwargs['attributes'][kwargs['field_mapping']['lat']]['long_name'] = 'latitude'
kwargs['attributes'][kwargs['field_mapping']['lat']]['units'] = 'degrees_north'
# Defining attributes for dataset
kwargs['attributes'][kwargs['field_mapping']['data']] = {}
kwargs['attributes'][kwargs['field_mapping']['data']]['long_name'] = kwargs['longname']
kwargs['attributes'][kwargs['field_mapping']['data']]['units'] = kwargs['units']
# Defining attributes for date if applicable
if kwargs['date']:
kwargs['attributes'][kwargs['field_mapping']['time']] = {}
kwargs['attributes'][kwargs['field_mapping']['time']]['long_name'] = kwargs['time_longname']
kwargs['attributes'][kwargs['field_mapping']['time']]['units'] = kwargs['time_units']
# add default global (file-level) attributes
if kwargs['title']:
kwargs['attributes']['ROOT']['title'] = kwargs['title']
if kwargs['source']:
kwargs['attributes']['ROOT']['source'] = kwargs['source']
if kwargs['reference']:
kwargs['attributes']['ROOT']['reference'] = kwargs['reference']
# netCDF4 dimension variables
dimensions = []
dimensions.append('lat')
dimensions.append('lon')
# expand dimensions if containing date variables
if kwargs['date']:
self.expand_dims()
dimensions.append('time')
dims = tuple(kwargs['field_mapping'][key] for key in dimensions)
# defining the NetCDF dimensions and variables
nc = {}
# NetCDF dimensions
for i,field in enumerate(dimensions):
temp = getattr(self,field)
key = kwargs['field_mapping'][field]
fileID.createDimension(key, len(temp))
nc[key] = fileID.createVariable(key, temp.dtype, (key,))
# NetCDF spatial data
variables = set(kwargs['field_mapping'].keys()) - set(dimensions)
for field in sorted(variables):
temp = getattr(self,field)
ndim = temp.ndim
key = kwargs['field_mapping'][field]
nc[key] = fileID.createVariable(key, temp.dtype, dims[:ndim],
fill_value=self.fill_value, zlib=True)
# filling NetCDF variables
for field,key in kwargs['field_mapping'].items():
nc[key][:] = getattr(self,field)
# filling netCDF dataset attributes
for att_name,att_val in kwargs['attributes'][key].items():
# skip variable attribute if None
if not att_val:
continue
# skip variable attributes if in list
if att_name not in ('DIMENSION_LIST','CLASS','NAME','_FillValue'):
nc[key].setncattr(att_name, att_val)
# global attributes of NetCDF4 file
for att_name,att_val in kwargs['attributes']['ROOT'].items():
fileID.setncattr(att_name, att_val)
# add software information
fileID.software_reference = gravity_toolkit.version.project_name
fileID.software_version = gravity_toolkit.version.full_version
# date created
fileID.date_created = time.strftime('%Y-%m-%d',time.localtime())
# Output NetCDF structure information
logging.info(str(self.filename))
logging.info(list(fileID.variables.keys()))
# Closing the NetCDF file
fileID.close()
[docs]
def to_HDF5(self, filename, **kwargs):
"""
Write a ``spatial`` object to HDF5 file
Parameters
----------
filename: str
full path of output HDF5 file
varname: str, default 'z'
data variable name in HDF5 file
lonname: str
longitude variable name in HDF5 file
latname: str
latitude variable name in HDF5 file
field_mapping: dict, default {}
mapping between input variables and output HDF5
attributes: dict, default {}
output HDF5 variable and file-level attributes
units: str or NoneType, default: None
data variable units
longname: str or NoneType, default: None
data variable unit description
time_units: str, default 'years'
time variable units
time_longname: str, default 'Date_in_Decimal_Years'
time variable unit description
title: str or NoneType, default None
description attribute of dataset
source: str or NoneType, default None
source attribute of dataset
reference: str or NoneType, default None
reference attribute of dataset
date: bool, default True
``spatial`` objects contain date information
clobber: bool, default True
Overwrite an existing HDF5 file
verbose: bool, default False
Output file and variable information
"""
# set default verbosity and parameters
kwargs.setdefault('verbose',False)
kwargs.setdefault('varname','z')
kwargs.setdefault('lonname','lon')
kwargs.setdefault('latname','lat')
kwargs.setdefault('timename','time')
kwargs.setdefault('field_mapping',{})
attributes = self.attributes.get('ROOT') or {}
kwargs.setdefault('attributes',dict(ROOT=attributes))
kwargs.setdefault('units',None)
kwargs.setdefault('longname',None)
kwargs.setdefault('time_units','years')
kwargs.setdefault('time_longname','Date_in_Decimal_Years')
kwargs.setdefault('title',None)
kwargs.setdefault('source',None)
kwargs.setdefault('reference',None)
kwargs.setdefault('date',True)
kwargs.setdefault('clobber',True)
kwargs.setdefault('verbose',False)
# setting NetCDF clobber attribute
clobber = 'w' if kwargs['clobber'] else 'w-'
# opening NetCDF file for writing
self.filename = pathlib.Path(filename).expanduser().absolute()
fileID = h5py.File(self.filename, clobber)
# mapping between output keys and HDF5 variable names
if not kwargs['field_mapping']:
fields = [kwargs['lonname'],kwargs['latname'],kwargs['varname']]
if kwargs['date']:
fields.append(kwargs['timename'])
kwargs['field_mapping'] = self.default_field_mapping(fields)
# create attributes dictionary for output variables
if not all(key in kwargs['attributes'] for key in kwargs['field_mapping'].values()):
# Defining attributes for longitude and latitude
kwargs['attributes'][kwargs['field_mapping']['lon']] = {}
kwargs['attributes'][kwargs['field_mapping']['lon']]['long_name'] = 'longitude'
kwargs['attributes'][kwargs['field_mapping']['lon']]['units'] = 'degrees_east'
kwargs['attributes'][kwargs['field_mapping']['lat']] = {}
kwargs['attributes'][kwargs['field_mapping']['lat']]['long_name'] = 'latitude'
kwargs['attributes'][kwargs['field_mapping']['lat']]['units'] = 'degrees_north'
# Defining attributes for dataset
kwargs['attributes'][kwargs['field_mapping']['data']] = {}
kwargs['attributes'][kwargs['field_mapping']['data']]['long_name'] = kwargs['longname']
kwargs['attributes'][kwargs['field_mapping']['data']]['units'] = kwargs['units']
# Defining attributes for date if applicable
if kwargs['date']:
kwargs['attributes'][kwargs['field_mapping']['time']] = {}
kwargs['attributes'][kwargs['field_mapping']['time']]['long_name'] = kwargs['time_longname']
kwargs['attributes'][kwargs['field_mapping']['time']]['units'] = kwargs['time_units']
# add default global (file-level) attributes
if kwargs['title']:
kwargs['attributes']['ROOT']['title'] = kwargs['title']
if kwargs['source']:
kwargs['attributes']['ROOT']['source'] = kwargs['source']
if kwargs['reference']:
kwargs['attributes']['ROOT']['reference'] = kwargs['reference']
# HDF5 dimension variables
dimensions = []
dimensions.append('lat')
dimensions.append('lon')
# expand dimensions if containing date variables
if kwargs['date']:
self.expand_dims()
dimensions.append('time')
dims = tuple(kwargs['field_mapping'][key] for key in dimensions)
# Defining the HDF5 dataset variables
h5 = {}
for field,key in kwargs['field_mapping'].items():
temp = getattr(self,field)
key = kwargs['field_mapping'][field]
h5[key] = fileID.create_dataset(key, temp.shape,
data=temp, dtype=temp.dtype, compression='gzip')
# filling HDF5 dataset attributes
for att_name,att_val in kwargs['attributes'][key].items():
# skip variable attribute if None
if not att_val:
continue
# skip variable attributes if in list
if att_name not in ('DIMENSION_LIST','CLASS','NAME'):
h5[key].attrs[att_name] = att_val
# add dimensions
variables = set(kwargs['field_mapping'].keys()) - set(dimensions)
for field in sorted(variables):
key = kwargs['field_mapping'][field]
for i,dim in enumerate(dims):
h5[key].dims[i].label = dim
h5[key].dims[i].attach_scale(h5[dim])
# Dataset contains missing values
if (self.fill_value is not None):
h5[key].attrs['_FillValue'] = self.fill_value
# global attributes of HDF5 file
for att_name,att_val in kwargs['attributes']['ROOT'].items():
fileID.attrs[att_name] = att_val
# add software information
fileID.attrs['software_reference'] = gravity_toolkit.version.project_name
fileID.attrs['software_version'] = gravity_toolkit.version.full_version
# date created
fileID.attrs['date_created'] = time.strftime('%Y-%m-%d',time.localtime())
# Output HDF5 structure information
logging.info(str(self.filename))
logging.info(list(fileID.keys()))
# Closing the NetCDF file
fileID.close()
[docs]
def to_index(self, filename, file_list, format=None, date=True, **kwargs):
"""
Write a ``spatial`` object to index of ascii, netCDF4 or HDF5 files
Parameters
----------
filename: str
full path of index file to be written
file_list: list
list of filenames for each output file
format: str or NoneType, default None
format of files in index
- ``'ascii'``
- ``'netCDF4'``
- ``'HDF5'``
date: bool, default True
``spatial`` object contains date information
verbose: bool, default False
print file and variable information
kwargs: dict
keyword arguments for output writers
"""
# Write index file of output spatial files
self.filename = pathlib.Path(filename).expanduser().absolute()
fid = self.filename.open(mode='w', encoding='utf8')
# set default verbosity
kwargs.setdefault('verbose',False)
# for each file to be in the index
for i,f in enumerate(file_list):
# print filename to index
print(self.compressuser(f), file=fid)
# index spatial object at i
s = self.index(i, date=date)
# write to file
if (format == 'ascii'):
# ascii (.txt)
s.to_ascii(f, date=date, **kwargs)
elif (format == 'netCDF4'):
# netcdf (.nc)
s.to_netCDF4(f, date=date, **kwargs)
elif (format == 'HDF5'):
# HDF5 (.H5)
s.to_HDF5(f, date=date, **kwargs)
# close the index file
fid.close()
[docs]
def to_file(self, filename, format=None, date=True, **kwargs):
"""
Write a ``spatial`` object to a specified format
Parameters
----------
filename: str
full path of output file
format: str or NoneType, default None
file format
- ``'ascii'``
- ``'netCDF4'``
- ``'HDF5'``
date: bool, default True
``spatial`` object contains date information
verbose: bool, default False
print file and variable information
kwargs: dict
keyword arguments for output writers
"""
# set default verbosity
kwargs.setdefault('verbose',False)
# write to file
if (format == 'ascii'):
# ascii (.txt)
self.to_ascii(filename, date=date, **kwargs)
elif (format == 'netCDF4'):
# netcdf (.nc)
self.to_netCDF4(filename, date=date, **kwargs)
elif (format == 'HDF5'):
# HDF5 (.H5)
self.to_HDF5(filename, date=date, **kwargs)
[docs]
def default_field_mapping(self, variables):
"""
Builds field mappings from a variable list
Parameters
----------
variables: list
netCDF4/HDF5 variables names to be mapped
- ``lonname``
- ``latname``
- ``varname``
- ``timename``
Returns
-------
field_mapping: dict
Field mappings for netCDF4/HDF5 read and write functions
"""
# get each variable name and add to field mapping dictionary
field_mapping = {}
for i, var in enumerate(['lon', 'lat', 'data', 'time']):
try:
field_mapping[var] = copy.copy(variables[i])
except IndexError as exc:
pass
# return the field mapping
return field_mapping
[docs]
def to_masked_array(self):
"""
Convert a ``spatial`` object to a masked numpy array
"""
return np.ma.array(self.data, mask=self.mask,
fill_value=self.fill_value)
[docs]
def update_mask(self):
"""
Update the mask of the ``spatial`` object
"""
if self.fill_value is not None:
self.mask |= (self.data == self.fill_value)
self.mask |= np.isnan(self.data)
self.data[self.mask] = self.fill_value
if hasattr(self, 'error'):
self.error[self.mask] = self.fill_value
return self
[docs]
def copy(self):
"""
Copy a ``spatial`` object to a new ``spatial`` object
"""
temp = spatial(fill_value=self.fill_value)
# copy attributes or update attributes dictionary
if isinstance(self.attributes, list):
setattr(temp,'attributes',self.attributes)
elif isinstance(self.attributes, dict):
temp.attributes.update(self.attributes)
# assign variables to self
var = ['lon','lat','data','mask','error','time','month','filename']
for key in var:
try:
val = getattr(self, key)
setattr(temp, key, copy.copy(val))
except AttributeError:
pass
# update mask
temp.replace_masked()
return temp
[docs]
def zeros_like(self):
"""
Create a ``spatial`` object using the dimensions of another
"""
temp = spatial(fill_value=self.fill_value)
# assign variables to self
temp.lon = self.lon.copy()
temp.lat = self.lat.copy()
var = ['data','mask','error','time','month']
for key in var:
try:
val = getattr(self, key)
setattr(temp, key, np.zeros_like(val))
except AttributeError:
pass
# update mask
temp.replace_masked()
return temp
[docs]
def expand_dims(self):
"""
Add a singleton dimension to a ``spatial`` object if non-existent
"""
# change time dimensions to be iterable
self.time = np.atleast_1d(self.time)
self.month = np.atleast_1d(self.month)
# output spatial with a third dimension
if (np.ndim(self.data) == 2):
self.data = self.data[:,:,None]
# try expanding mask variable
try:
self.mask = self.mask[:,:,None]
except Exception as exc:
pass
# try expanding spatial error
try:
self.error = self.error[:,:,None]
except AttributeError as exc:
pass
# update mask
self.update_mask()
return self
# PURPOSE: Extend a global matrix
[docs]
def extend_matrix(self):
"""
Extends a global matrix to wrap along longitudes
Returns
-------
temp: float
extended matrix
"""
temp = self.copy()
# shape of the original data object
ny, nx, *nt = self.shape
# extended longitude array [x-1,x0,...,xN,xN+1]
temp.lon = np.zeros((nx+2), dtype=self.lon.dtype)
temp.lon[0] = self.lon[0] - self.spacing[0]
temp.lon[1:-1] = self.lon[:]
temp.lon[-1] = self.lon[-1] + self.spacing[1]
# attempt to extend possible data variables
for key in ['data','mask','error']:
try:
# get the original data variable
var = getattr(self, key)
# extended data matrices along longitude axis
if (self.ndim == 2):
tmp = np.zeros((ny, nx+2), dtype=var.dtype)
tmp[:,0] = var[:,-1]
tmp[:,1:-1] = var[:,:]
tmp[:,-1] = var[:,0]
elif (self.ndim == 3):
var = getattr(self, key)
tmp = np.zeros((ny, nx+2, nt[0]), dtype=var.dtype)
tmp[:,0,:] = var[:,-1,:]
tmp[:,1:-1,:] = var[:,:,:]
tmp[:,-1,:] = var[:,0,:]
# set the output extended data variable
setattr(temp, key, tmp)
except Exception as exc:
pass
# update mask
temp.update_mask()
# return the extended spatial object
return temp
[docs]
def squeeze(self):
"""
Remove singleton dimensions from a ``spatial`` object
"""
# squeeze singleton dimensions
self.time = np.squeeze(self.time)
self.month = np.squeeze(self.month)
# attempt to squeeze possible data variables
for key in ['data','mask','error']:
try:
setattr(self, key, np.squeeze(getattr(self, key)))
except Exception as exc:
pass
# update mask
self.update_mask()
return self
[docs]
def index(self, indice, date=True):
"""
Subset a ``spatial`` object to specific index
Parameters
----------
indice: int
index in matrix for subsetting
date: bool, default True
``spatial`` objects contain date information
"""
# output spatial object
temp = spatial(fill_value=self.fill_value)
# attempt to subset possible data variables
for key in ['data','mask','error']:
try:
tmp = getattr(self, key)
setattr(temp, key, tmp[:,:,indice].copy())
except Exception as exc:
pass
# copy dimensions
temp.lon = self.lon.copy()
temp.lat = self.lat.copy()
# subset output dates
if date:
temp.time = self.time[indice].copy()
temp.month = self.month[indice].copy()
# subset filenames if applicable
if getattr(self, 'filename'):
if isinstance(self.filename, (list, tuple, np.ndarray)):
temp.filename = str(self.filename[indice])
elif isinstance(self.filename, str):
temp.filename = copy.copy(self.filename)
# remove singleton dimensions if importing a single value
return temp.squeeze()
[docs]
def subset(self, months):
"""
Subset a ``spatial`` object to specific GRACE/GRACE-FO months
Parameters
----------
months: int
GRACE/GRACE-FO to subset
"""
# check if months is an array or a single value
months = np.atleast_1d(months)
# number of months
n = len(months)
# check that all months are available
months_check = list(set(months) - set(self.month))
if months_check:
m = ','.join([f'{m:03d}' for m in months_check])
raise IOError(f'GRACE/GRACE-FO months {m} not Found')
# indices to sort data objects
months_list = [i for i,m in enumerate(self.month) if m in months]
# output spatial object
temp = self.zeros_like()
# create output spatial object
temp.data = np.zeros((self.shape[0],self.shape[1],n))
temp.mask = np.zeros((self.shape[0],self.shape[1],n), dtype=bool)
# create output spatial error
try:
getattr(self, 'error')
temp.error = np.zeros((self.shape[0],self.shape[1],n))
except AttributeError:
pass
# allocate for output dates
temp.time = np.zeros((n))
temp.month = np.zeros((n),dtype=np.int64)
temp.filename = []
# for each indice
for t,i in enumerate(months_list):
temp.data[:,:,t] = self.data[:,:,i].copy()
temp.mask[:,:,t] = self.mask[:,:,i].copy()
try:
temp.error[:,:,t] = self.error[:,:,i].copy()
except AttributeError:
pass
# copy time dimensions
temp.time[t] = self.time[i].copy()
temp.month[t] = self.month[i].copy()
# subset filename
if getattr(self, 'filename'):
if isinstance(self.filename, (list, tuple, np.ndarray)):
temp.filename.append(str(self.filename[i]))
elif isinstance(self.filename, str):
temp.filename.append(self.filename)
# remove singleton dimensions if importing a single value
return temp.squeeze()
[docs]
def offset(self, var):
"""
Offset a ``spatial`` object by a constant
Parameters
----------
var: float
scalar value to which the ``spatial`` object will be offset
"""
temp = self.copy()
# offset by a single constant or a time-variable scalar
if (np.ndim(var) == 0):
temp.data = self.data + var
elif (np.ndim(var) == 1) and (self.ndim == 2):
n = len(var)
temp.data = np.zeros((temp.shape[0],temp.shape[1],n))
temp.mask = np.zeros((temp.shape[0],temp.shape[1],n),dtype=bool)
for i,v in enumerate(var):
temp.data[:,:,i] = self.data[:,:] + v
temp.mask[:,:,i] = np.copy(self.mask[:,:])
elif (np.ndim(var) == 1) and (self.ndim == 3):
for i,v in enumerate(var):
temp.data[:,:,i] = self.data[:,:,i] + v
elif (np.ndim(var) == 2) and (self.ndim == 2):
temp.data = self.data + var
elif (np.ndim(var) == 2) and (self.ndim == 3):
for i,t in enumerate(self.time):
temp.data[:,:,i] = self.data[:,:,i] + var
elif (np.ndim(var) == 3) and (self.ndim == 3):
for i,t in enumerate(self.time):
temp.data[:,:,i] = self.data[:,:,i] + var[:,:,i]
# update mask
temp.update_mask()
return temp
[docs]
def scale(self, var):
"""
Multiply a ``spatial`` object by a constant
Parameters
----------
var: float
scalar value to which the ``spatial`` object will be multiplied
"""
temp = self.copy()
# multiply by a single constant or a time-variable scalar
if (np.ndim(var) == 0):
temp.data = var*self.data
elif (np.ndim(var) == 1) and (self.ndim == 2):
n = len(var)
temp.data = np.zeros((temp.shape[0],temp.shape[1],n))
temp.mask = np.zeros((temp.shape[0],temp.shape[1],n),dtype=bool)
for i,v in enumerate(var):
temp.data[:,:,i] = v*self.data[:,:]
temp.mask[:,:,i] = np.copy(self.mask[:,:])
elif (np.ndim(var) == 1) and (self.ndim == 3):
for i,v in enumerate(var):
temp.data[:,:,i] = v*self.data[:,:,i]
elif (np.ndim(var) == 2) and (self.ndim == 2):
temp.data = var*self.data
elif (np.ndim(var) == 2) and (self.ndim == 3):
for i,t in enumerate(self.time):
temp.data[:,:,i] = var*self.data[:,:,i]
elif (np.ndim(var) == 3) and (self.ndim == 3):
for i,t in enumerate(self.time):
temp.data[:,:,i] = var[:,:,i]*self.data[:,:,i]
# update mask
temp.update_mask()
return temp
[docs]
def mean(self, apply=False, indices=Ellipsis):
"""
Compute mean spatial field and remove from data if specified
Parameters
----------
apply: bool, default False
remove the mean field from the input ``spatial`` object
indices: int, default Ellipsis
indices of input ``spatial`` object to compute mean
"""
# output spatial object
temp = spatial(nlon=self.shape[0],nlat=self.shape[1],
fill_value=self.fill_value)
# copy dimensions
temp.lon = self.lon.copy()
temp.lat = self.lat.copy()
# create output mean spatial object
temp.data = np.mean(self.data[:,:,indices],axis=2)
temp.mask = np.any(self.mask[:,:,indices],axis=2)
# calculate the mean time
try:
val = getattr(self, 'time')
temp.time = np.mean(val[indices])
except (AttributeError,TypeError):
pass
# calculate the spatial anomalies by removing the mean field
if apply:
for i,t in enumerate(self.time):
self.data[:,:,i] -= temp.data[:,:]
# update mask
temp.update_mask()
return temp
[docs]
def flip(self, axis=0):
"""
Reverse the order of data and dimensions along an axis
Parameters
----------
axis: int, default 0
axis to reorder
"""
# output spatial object
temp = self.copy()
# copy dimensions and reverse order
if (axis == 0):
temp.lat = temp.lat[::-1].copy()
elif (axis == 1):
temp.lon = temp.lon[::-1].copy()
elif (axis == 2):
temp.time = temp.time[::-1].copy()
# attempt to reverse possible data variables
for key in ['data','mask','error']:
try:
setattr(temp, key, np.flip(getattr(self, key), axis=axis))
except Exception as exc:
pass
# update mask
temp.update_mask()
return temp
[docs]
def transpose(self, axes=None):
"""
Transpose or permute the axes of a ``spatial`` object
Parameters
----------
axis: int or NoneType, default None
order of the output axes
"""
# output spatial object
temp = self.copy()
# attempt to transpose possible data variables
for key in ['data','mask','error']:
try:
setattr(temp, key, np.transpose(getattr(self, key), axes=axes))
except Exception as exc:
pass
# update mask
temp.update_mask()
return temp
[docs]
def sum(self, power=1):
"""
Compute summation of a ``spatial`` object
Parameters
----------
power: int, default 1
apply a power before calculating summation
"""
# output spatial object
temp = spatial(nlon=self.shape[0],nlat=self.shape[1],
fill_value=self.fill_value)
# copy dimensions
temp.lon = self.lon.copy()
temp.lat = self.lat.copy()
# create output summation spatial object
temp.data = np.sum(np.power(self.data,power),axis=2)
temp.mask = np.any(self.mask,axis=2)
# update mask
temp.update_mask()
return temp
[docs]
def power(self, power):
"""
Raise a ``spatial`` object to a power
Parameters
----------
power: int
power to which the ``spatial`` object will be raised
"""
temp = self.copy()
temp.data = np.power(self.data,power)
return temp
[docs]
def max(self):
"""
Compute maximum value of a ``spatial`` object
"""
# output spatial object
temp = spatial(nlon=self.shape[0],nlat=self.shape[1],
fill_value=self.fill_value)
# copy dimensions
temp.lon = self.lon.copy()
temp.lat = self.lat.copy()
# create output maximum spatial object
temp.data = np.max(self.data,axis=2)
temp.mask = np.any(self.mask,axis=2)
# update mask
temp.update_mask()
return temp
[docs]
def min(self):
"""
Compute minimum value of a ``spatial`` object
"""
# output spatial object
temp = spatial(nlon=self.shape[0],nlat=self.shape[1],
fill_value=self.fill_value)
# copy dimensions
temp.lon = self.lon.copy()
temp.lat = self.lat.copy()
# create output minimum spatial object
temp.data = np.min(self.data,axis=2)
temp.mask = np.any(self.mask,axis=2)
# update mask
temp.update_mask()
return temp
[docs]
def replace_invalid(self, fill_value, mask=None):
"""
Replace the masked values with a new ``fill_value``
Parameters
----------
fill_value: float
Replacement invalid value
mask: bool or NoneType, default None
Update the current mask
"""
# validate current mask
self.update_mask()
# update the mask if specified
if mask is not None:
if (np.shape(mask) == self.shape):
self.mask |= mask
elif (np.ndim(mask) == 2) & (self.ndim == 3):
# broadcast mask over third dimension
temp = np.repeat(mask[:,:,np.newaxis],self.shape[2],axis=2)
self.mask |= temp
# update the fill value
self.fill_value = fill_value
# replace invalid values with new fill value
self.data[self.mask] = self.fill_value
if hasattr(self, 'error'):
self.error[self.mask] = self.fill_value
return self
[docs]
def replace_masked(self):
"""
Replace the masked values with ``fill_value``
"""
if (self.fill_value is not None):
self.data[self.mask] = self.fill_value
if (self.fill_value is not None) and hasattr(self, 'error'):
self.error[self.mask] = self.fill_value
return self
@property
def dtype(self):
"""Main data type of ``spatial`` object"""
return self.data.dtype
@property
def spacing(self):
"""Step size of ``spatial`` object ``[longitude,latitude]``
"""
dlat = np.abs(self.lat[1] - self.lat[0])
dlon = np.abs(self.lon[1] - self.lon[0])
return (dlon,dlat)
@property
def extent(self):
"""Bounds of ``spatial`` object
``[minimum longitude, maximum longitude,
minimum latitude, maximum latitude]``
"""
lonmin = np.min(self.lon)
lonmax = np.max(self.lon)
latmin = np.min(self.lat)
latmax = np.max(self.lat)
return [lonmin, lonmax, latmin, latmax]
@property
def shape(self):
"""Dimensions of ``spatial`` object
"""
return np.shape(self.data)
@property
def ndim(self):
"""Number of dimensions in ``spatial`` object
"""
return np.ndim(self.data)
def __str__(self):
"""String representation of the ``spatial`` object
"""
properties = ['gravity_toolkit.spatial']
extent = ', '.join(map(str, self.extent))
properties.append(f" extent: {extent}")
shape = ', '.join(map(str, self.shape))
properties.append(f" shape: {shape}")
if self.month:
properties.append(f" start_month: {min(self.month)}")
properties.append(f" end_month: {max(self.month)}")
return '\n'.join(properties)
def __len__(self):
"""Number of months
"""
return len(self.month) if np.any(self.month) else 0
def __iter__(self):
"""Iterate over GRACE/GRACE-FO months
"""
self.__index__ = 0
return self
def __next__(self):
"""Get the next month of data
"""
# output spatial object
temp = spatial(fill_value=self.fill_value)
# subset output spatial field and dates
try:
temp.data = self.data[:,:,self.__index__].copy()
temp.mask = self.mask[:,:,self.__index__].copy()
except IndexError as exc:
raise StopIteration from exc
# subset output spatial time and month
try:
temp.time = self.time[self.__index__].copy()
temp.month = self.month[self.__index__].copy()
except AttributeError as exc:
pass
# subset output spatial error
try:
temp.error = self.error[:,:,self.__index__].copy()
except AttributeError as exc:
pass
# subset filename
if getattr(self, 'filename'):
if isinstance(self.filename, (list, tuple, np.ndarray)):
temp.filename = str(self.filename[self.__index__])
elif isinstance(self.filename, str):
temp.filename = copy.copy(self.filename)
# copy dimensions
temp.lon = self.lon.copy()
temp.lat = self.lat.copy()
# add to index
self.__index__ += 1
return temp
# PURPOSE: additional routines for the spatial module
# for outputting scaling factor data
[docs]
class scaling_factors(spatial):
"""
Inheritance of ``spatial`` class for outputting scaling factors
Attributes
----------
data: float
spatial scaling factor data
error: float
spatial scaling factor errors
magnitude: float
spatial magnitude of the original data
mask: bool
spatial grid mask
x: float
x-coordinate array
y: float
y-coordinate array
lon: float
grid longitudes
lat: float
grid latitudes
fill_value: float or NoneType, default None
invalid value for spatial grid data
attributes: dict
attributes of spatial variables
extent: list, default [None,None,None,None]
spatial grid bounds
``[minimum x, maximum x, minimum y, maximum y]``
spacing: list, default [None,None]
grid step size ``[x, y]``
shape: tuple
dimensions of spatial object
ndim: int
number of dimensions of spatial object
filename: str
input or output filename
"""
np.seterr(invalid='ignore')
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.error = None
self.magnitude = None
[docs]
def from_ascii(self, filename, **kwargs):
"""
Read a ``scaling_factors`` object from an ascii file
Parameters
----------
filename: str
full path of input ascii file
compression: str or NoneType, default None
file compression type
- ``'gzip'``
- ``'zip'``
- ``'bytes'``
spacing: list, default [None,None]
grid step size ``[longitude,latitude]``
extent: list, default [None,None,None,None]
spatial grid bounds
``[minimum longitude, maximum longitude,
minimum latitude, maximum latitude]``
nlat: int or NoneType, default None
length of latitude dimension
nlon: int or NoneType, default None
length of longitude dimension
columns: list, default ['lon','lat','kfactor','error','magnitude']
variable names for each column
header: int, default 0
Number of rows of header lines to skip
verbose: bool, default False
print file and variable information
"""
# set filename
self.case_insensitive_filename(filename)
# set default parameters
kwargs.setdefault('verbose',False)
kwargs.setdefault('compression',None)
kwargs.setdefault('spacing',[None,None])
kwargs.setdefault('nlat',None)
kwargs.setdefault('nlon',None)
kwargs.setdefault('extent',[None]*4)
default_columns = ['lon','lat','kfactor','error','magnitude']
kwargs.setdefault('columns',default_columns)
kwargs.setdefault('header',0)
# open the ascii file and extract contents
logging.info(str(self.filename))
if (kwargs['compression'] == 'gzip'):
# read input ascii data from gzip compressed file and split lines
with gzip.open(self.filename, mode='r') as f:
file_contents = f.read().decode('ISO-8859-1').splitlines()
elif (kwargs['compression'] == 'zip'):
# read input ascii data from zipped file and split lines
stem = self.filename.stem
with zipfile.ZipFile(self.filename) as z:
file_contents = z.read(stem).decode('ISO-8859-1').splitlines()
elif (kwargs['compression'] == 'bytes'):
# read input file object and split lines
file_contents = self.filename.read().splitlines()
else:
# read input ascii file (.txt, .asc) and split lines
with self.filename.open(mode='r', encoding='utf8') as f:
file_contents = f.read().splitlines()
# compile regular expression operator for extracting numerical values
# from input ascii files of spatial data
regex_pattern = r'[-+]?(?:(?:\d*\.\d+)|(?:\d+\.?))(?:[EeD][+-]?\d+)?'
rx = re.compile(regex_pattern, re.VERBOSE)
# output spatial dimensions
if (None not in kwargs['extent']) and kwargs['nlat'] and kwargs['nlon']:
extent = kwargs.get('extent')
self.lat = np.linspace(extent[3], extent[2], kwargs['nlat'])
self.lon = np.linspace(extent[0], extent[1], kwargs['nlon'])
dlon = np.abs(self.lon[1] - self.lon[0])
dlat = np.abs(self.lat[1] - self.lat[0])
elif (None not in kwargs['extent']) and (None not in kwargs['spacing']):
extent = kwargs.get('extent')
dlon, dlat = kwargs.get('spacing')
self.lat = np.arange(extent[3], extent[2] - dlat, dlat)
self.lon = np.arange(extent[0], extent[1] + dlon, dlon)
elif kwargs['nlat'] and kwargs['nlon'] and (None not in kwargs['spacing']):
dlon, dlat = kwargs.get('spacing')
self.lat = np.zeros((kwargs['nlat']))
self.lon = np.zeros((kwargs['nlon']))
else:
raise ValueError('Unknown dimensions for input ``spatial`` object')
# get spatial dimensions
nlat = len(self.lat)
nlon = len(self.lon)
# output spatial data
self.data = np.zeros((nlat, nlon))
self.mask = np.zeros((nlat, nlon), dtype=bool)
# remove time from list of column names if not date
columns = [c for c in kwargs['columns'] if (c != 'time')]
# extract spatial data array and convert to matrix
# for each line in the file
header = kwargs['header']
for line in file_contents[header:]:
# extract columns of interest and assign to dict
# convert fortran exponentials if applicable
d = {c:r.replace('D','E') for c,r in zip(columns,rx.findall(line))}
# convert line coordinates to integers
ilon = np.int64(np.float64(d['lon'])/dlon)
ilat = np.int64((90.0-np.float64(d['lat']))//dlat)
# get scaling factor, error and magnitude
self.data[ilat,ilon] = np.float64(d['data'])
self.error[ilat,ilon] = np.float64(d['error'])
self.magnitude[ilat,ilon] = np.float64(d['magnitude'])
# set mask
self.mask[ilat,ilon] = False
# set latitude and longitude
self.lon[ilon] = np.float64(d['lon'])
self.lat[ilat] = np.float64(d['lat'])
# update mask
self.update_mask()
return self
[docs]
def to_ascii(self, filename, **kwargs):
"""
Write a ``scaling_factors`` object to ascii file
Parameters
----------
filename: str
full path of output ascii file
verbose: bool, default False
Output file and variable information
"""
self.filename = pathlib.Path(filename).expanduser().absolute()
# set default verbosity and parameters
kwargs.setdefault('verbose',False)
logging.info(str(self.filename))
# open the output file
fid = self.filename.open(mode='w', encoding='utf8')
# write to file for each valid latitude and longitude
ii,jj = np.nonzero((self.data != self.fill_value) & (~self.mask))
for i,j in zip(ii,jj):
print((f'{self.lon[j]:10.4f} {self.lat[i]:10.4f} '
f'{self.data[i,j]:12.4f} {self.error[i,j]:12.4f} '
f'{self.magnitude[i,j]:12.4f}'), file=fid)
# close the output file
fid.close()
[docs]
def kfactor(self, var):
"""
Calculate the scaling factor and scaling factor errors
from two ``spatial`` or ``scaling_factors`` objects
following :cite:t:`Landerer:2012kf` and :cite:p:`Hsu:2017hd`
Parameters
----------
var: obj
``spatial`` object to used for scaling
Returns
-------
temp: obj
scaling factor, scaling error and magnitude
"""
# copy to not modify original inputs
temp1 = self.copy()
temp2 = var.copy()
# expand dimensions and replace invalid values with 0
temp1.expand_dims().replace_invalid(0.0)
temp2.expand_dims().replace_invalid(0.0)
# dimensions of input spatial object
nlat, nlon, nt = temp1.shape
# allocate for scaling factor and scaling factor error
temp = scaling_factors(nlat=nlat, nlon=nlon, fill_value=0.0)
temp.data = np.zeros((nlat, nlon))
temp.error = np.zeros((nlat, nlon))
# copy latitude and longitude variables
temp.lon = np.copy(temp1.lon)
temp.lat = np.copy(temp1.lat)
# find valid data points and set mask
temp.mask = np.any(temp1.mask | temp2.mask, axis=2)
indy,indx = np.nonzero(np.logical_not(temp.mask))
# calculate point-based scaling factors as centroids
val1 = np.sum(temp1.data[indy,indx,:]*temp2.data[indy,indx,:],axis=1)
val2 = np.sum(temp1.data[indy,indx,:]**2,axis=1)
temp.data[indy,indx] = val1/val2
# calculate difference between scaled and original
variance = temp1.scale(temp.data).offset(-temp2.data)
# calculate scaling factor errors as RMS of variance
temp.error = np.sqrt((variance.sum(power=2).data)/nt)
# calculate magnitude of original data
temp.magnitude = temp2.sum(power=2.0).power(0.5).data[:]
# update mask
temp.update_mask()
# return the scaling factors and scaling factor errors
return temp
[docs]
def update_mask(self):
"""
Update the mask of the ``scaling_factors`` object
"""
if self.fill_value is not None:
self.mask |= (self.data == self.fill_value)
self.mask |= np.isnan(self.data)
self.data[self.mask] = self.fill_value
# replace fill values within scaling factor errors
if getattr(self, 'error') is not None:
self.error[self.mask] = self.fill_value
# replace fill values within scaling factor magnitudes
if getattr(self, 'magnitude') is not None:
self.magnitude[self.mask] = self.fill_value
return self
def __getitem__(self, key):
return getattr(self, key)
def __setitem__(self, key, value):
setattr(self, key, value)