#!/usr/bin/env python
u"""
harmonics.py
Written by Tyler Sutterley (06/2024)
Contributions by Hugo Lecomte
Spherical harmonic data class for processing GRACE/GRACE-FO Level-2 data
PYTHON DEPENDENCIES:
numpy: Scientific Computing Tools For Python
https://numpy.org
https://numpy.org/doc/stable/user/numpy-for-matlab-users.html
dateutil: powerful extensions to datetime
https://dateutil.readthedocs.io/en/stable/
netCDF4: Python interface to the netCDF C library
https://unidata.github.io/netcdf4-python/netCDF4/index.html
h5py: Pythonic interface to the HDF5 binary data format.
https://www.h5py.org/
PROGRAM DEPENDENCIES:
time.py: utilities for calculating time operations
read_GRACE_harmonics.py: read spherical harmonic data from SHM files
read_gfc_harmonics.py: reads spherical harmonic data from gfc files
read_ICGEM_harmonics.py: reads gravity model coefficients from GFZ ICGEM
destripe_harmonics.py: filters spherical harmonics for correlated errors
UPDATE HISTORY:
Updated 06/2024: use wrapper to importlib for optional dependencies
Updated 05/2024: make subscriptable and allow item assignment
Updated 10/2023: place time and month variables in try/except block
Updated 09/2023: prevent double printing of filenames when using debug
Updated 08/2023: add string representation of the harmonics object
Updated 06/2023: fix GRACE/GRACE-FO months in drift function
Updated 05/2023: use reify decorators for complex form and amplitude
use pathlib to define and operate on paths
Updated 03/2023: customizable file-level attributes to netCDF4 and HDF5
add attributes fetching to the from_dict and to_dict functions
retrieve all root attributes from HDF5 and netCDF4 datasets
only attempt to squeeze from final dimension in harmonics objects
add indexing of filenames to harmonics object iterator
use copy.copy and not numpy.copy in copy harmonics object function
convert shape and ndim to harmonic class properties
improve typing for variables in docstrings
set case insensitive filename to None if filename is empty
Updated 02/2023: fix expand case where data is a single degree
fixed case where maximum spherical harmonic degree is 0
use monospaced text for harmonics objects in docstrings
Updated 01/2023: made amplitude a property of the harmonics class
added property for the complex form of the spherical harmonics
Updated 12/2022: add software information to output HDF5 and netCDF4
moved GIA model reader to be an inherited class of harmonics
make harmonics objects iterable and with length
added function for creating sized empty harmonic objects
Updated 11/2022: use f-strings for formatting verbose or ascii output
Updated 04/2022: updated docstrings to numpy documentation format
using internal netCDF4 and HDF5 readers and writers
added function for converting to a python dictionary
include utf-8 encoding in reads to be windows compliant
added GIA model reader and drift functions
include filename attribute when modifying harmonic objects
allow input ascii files to have additional columns
Updated 12/2021: logging case_insensitive_filename output for debugging
Updated 11/2021: kwargs to index, netCDF4 and HDF5 read functions
Updated 10/2021: using python logging for handling verbose output
Updated 09/2021: added time-variable gravity data from gfc format
use functions for converting to and from GRACE months
Updated 08/2021: added from spherical harmonic model (SHM) format
Updated 07/2021: fixed gfc format in from file wrapper
Updated 05/2021: define int/float precision to prevent deprecation warning
Updated 04/2021: add parser object for removing commented or empty lines
use file not found exceptions in case insensitive filename
Updated 02/2021: added degree amplitude function
use adjust_months function to fix special cases of GRACE/GRACE-FO months
added generic reader, generic writer and write to list functions
generalize ascii, netCDF4 and HDF5 readers and writers
replaced numpy bool to prevent deprecation warning
Updated 12/2020: added verbose option for gfc files
can calculate spherical harmonic mean over a range of time indices
will also calculate the mean time and month of a harmonics object
can create a harmonics object from an open file-like object
Updated 08/2020: added compression options for ascii, netCDF4 and HDF5 files
Updated 07/2020: added class docstring and using kwargs for output to file
added case_insensitive_filename function to search directories
Updated 06/2020: output list of filenames with from_list()
zeros_like() creates a new harmonics object with dimensions of another
add ndim and shape attributes of harmonics objects
Updated 04/2020: added from_gfc to read static gravity model coefficients
add to_ascii and iterate over temporal fields in convolve and destripe
make date optional for harmonic read functions. add more math functions
add option to sort if reading from an index or merging a list
add options to flatten and expand harmonics matrices or arrays
Written 03/2020
"""
from __future__ import print_function, division
import re
import io
import copy
import gzip
import time
import uuid
import logging
import pathlib
import zipfile
import numpy as np
import gravity_toolkit.version
from gravity_toolkit.time import adjust_months,calendar_to_grace
from gravity_toolkit.destripe_harmonics import destripe_harmonics
from gravity_toolkit.read_gfc_harmonics import read_gfc_harmonics
from gravity_toolkit.read_GRACE_harmonics import read_GRACE_harmonics
from gravity_toolkit.utilities import import_dependency, reify
# attempt imports
geoidtk = import_dependency('geoid_toolkit')
h5py = import_dependency('h5py')
netCDF4 = import_dependency('netCDF4')
sparse = import_dependency('sparse')
[docs]
class harmonics(object):
"""
Data class for reading, writing and processing spherical harmonic data
Attributes
----------
lmax: int
maximum degree of the spherical harmonic field
mmax: int
maximum order of the spherical harmonic field
clm: np.ndarray
cosine spherical harmonics
slm: np.ndarray
sine spherical harmonics
time: np.ndarray
time variable of the spherical harmonics
month: np.ndarray
GRACE/GRACE-FO months variable of the spherical harmonics
attributes: dict
attributes of ``harmonics`` variables
filename: str
input or output filename
flattened: bool
``harmonics`` object is compressed into arrays
"""
np.seterr(invalid='ignore')
def __init__(self, **kwargs):
# set default keyword arguments
kwargs.setdefault('lmax',None)
kwargs.setdefault('mmax',None)
# set default class attributes
self.clm=None
self.slm=None
self.time=None
self.month=None
self.lmax=kwargs['lmax']
self.mmax=kwargs['mmax']
# calculate spherical harmonic degree and order (0 is falsy)
self.l=np.arange(self.lmax+1) if (self.lmax is not None) else None
self.m=np.arange(self.mmax+1) if (self.mmax is not None) else None
self.attributes=dict()
self.filename=None
self.flattened=False
# iterator
self.__index__ = 0
[docs]
def case_insensitive_filename(self, filename):
"""
Searches a directory for a filename without case dependence
Parameters
----------
filename: str, io.IOBase, pathlib.Path or None
input filename
"""
# check if filename is open file object
if isinstance(filename, io.IOBase):
self.filename = copy.copy(filename)
elif isinstance(filename, type(None)) or not bool(filename):
self.filename = None
else:
# tilde-expand input filename
self.filename = pathlib.Path(filename).expanduser().absolute()
# check if file presently exists with input case
if not self.filename.exists():
# search for filename without case dependence
f = [f.name for f in self.filename.parent.iterdir() if
re.match(self.filename.name, f.name, re.I)]
if not f:
msg = f'{filename} not found in file system'
raise FileNotFoundError(msg)
self.filename = self.filename.with_name(f.pop())
# return the filename
return self
[docs]
def compressuser(self, filename=None):
"""
Tilde-compresses a file to be relative to the home directory
Parameters
----------
filename: str or None, default None
output filename
"""
if filename is None:
filename = self.filename
else:
filename = pathlib.Path(filename).expanduser().absolute()
# attempt to compress filename relative to home directory
try:
relative_to = filename.relative_to(pathlib.Path().home())
except (ValueError, AttributeError) as exc:
return filename
else:
return pathlib.Path('~').joinpath(relative_to)
[docs]
def from_ascii(self, filename, **kwargs):
"""
Read a ``harmonics`` object from an ascii file
Parameters
----------
filename: str
full path of input ascii file
date: bool, default True
ascii file has date information
compression: str or NoneType, default None
file compression type
- ``'gzip'``
- ``'zip'``
- ``'bytes'``
verbose: bool, default False
print file and variable information
"""
# set filename
self.case_insensitive_filename(filename)
# set default parameters
kwargs.setdefault('date',True)
kwargs.setdefault('verbose',False)
kwargs.setdefault('compression',None)
# open the ascii file and extract contents
logging.info(self.filename)
if (kwargs['compression'] == 'gzip'):
# read input ascii data from gzip compressed file and split lines
with gzip.open(self.filename, mode='r') as f:
file_contents = f.read().decode('ISO-8859-1').splitlines()
elif (kwargs['compression'] == 'zip'):
# read input ascii data from zipped file and split lines
stem = self.filename.stem
with zipfile.ZipFile(self.filename) as z:
file_contents = z.read(stem).decode('ISO-8859-1').splitlines()
elif (kwargs['compression'] == 'bytes'):
# read input file object and split lines
file_contents = self.filename.read().splitlines()
else:
# read input ascii file (.txt, .asc) and split lines
with open(self.filename, mode='r', encoding='utf8') as f:
file_contents = f.read().splitlines()
# compile regular expression operator for extracting numerical values
# from input ascii files of spherical harmonics
regex_pattern = r'[-+]?(?:(?:\d*\.\d+)|(?:\d+\.?))(?:[EeD][+-]?\d+)?'
rx = re.compile(regex_pattern, re.VERBOSE)
# find maximum degree and order of harmonics
self.lmax = 0
self.mmax = 0
# for each line in the file
for line in file_contents:
l1,m1,clm1,slm1,*aux = rx.findall(line)
# convert line degree and order to integers
l1,m1 = np.array([l1,m1],dtype=np.int64)
self.lmax = np.copy(l1) if (l1 > self.lmax) else self.lmax
self.mmax = np.copy(m1) if (m1 > self.mmax) else self.mmax
# output spherical harmonics data
self.clm = np.zeros((self.lmax+1,self.mmax+1))
self.slm = np.zeros((self.lmax+1,self.mmax+1))
# if the ascii file contains date variables
if kwargs['date']:
self.time = np.float64(aux[0])
self.month = np.int64(calendar_to_grace(self.time))
# adjust months to fix special cases if necessary
self.month = adjust_months(self.month)
# extract harmonics and convert to matrix
# for each line in the file
for line in file_contents:
l1,m1,clm1,slm1,*aux = rx.findall(line)
# convert line degree and order to integers
ll,mm = np.array([l1,m1],dtype=np.int64)
# convert fortran exponentials if applicable
self.clm[ll,mm] = np.float64(clm1.replace('D','E'))
self.slm[ll,mm] = np.float64(slm1.replace('D','E'))
# assign degree and order fields
self.update_dimensions()
return self
[docs]
def from_netCDF4(self, filename, **kwargs):
"""
Read a ``harmonics`` object from a netCDF4 file
Parameters
----------
filename: str
full path of input netCDF4 file
date: bool, default True
netCDF4 file has date information
compression: str or NoneType, default None
file compression type
- ``'gzip'``
- ``'zip'``
- ``'bytes'``
verbose: bool, default False
print file and variable information
"""
# set filename
self.case_insensitive_filename(filename)
# set default parameters
kwargs.setdefault('date',True)
kwargs.setdefault('verbose',False)
kwargs.setdefault('compression',None)
# Open the NetCDF4 file for reading
if (kwargs['compression'] == 'gzip'):
# read as in-memory (diskless) netCDF4 dataset
with gzip.open(self.filename, mode='r') as f:
fileID = netCDF4.Dataset(uuid.uuid4().hex, memory=f.read())
elif (kwargs['compression'] == 'zip'):
# read zipped file and extract file into in-memory file object
stem = self.filename.stem
with zipfile.ZipFile(self.filename) as z:
# first try finding a netCDF4 file with same base filename
# if none found simply try searching for a netCDF4 file
try:
f,=[f for f in z.namelist() if re.match(stem,f,re.I)]
except:
f,=[f for f in z.namelist() if re.search(r'\.nc(4)?$',f)]
# read bytes from zipfile as in-memory (diskless) netCDF4 dataset
fileID = netCDF4.Dataset(uuid.uuid4().hex, memory=z.read(f))
elif (kwargs['compression'] == 'bytes'):
# read as in-memory (diskless) netCDF4 dataset
fileID = netCDF4.Dataset(uuid.uuid4().hex, memory=filename.read())
else:
# read netCDF4 dataset
fileID = netCDF4.Dataset(self.filename, mode='r')
# Output NetCDF file information
logging.info(fileID.filepath())
logging.info(list(fileID.variables.keys()))
# read flattened spherical harmonics
temp = harmonics()
temp.filename = copy.copy(self.filename)
# create list of variables to retrieve
fields = ['l','m','clm','slm']
# retrieve date variables if specified
if kwargs['date']:
fields.extend(['time','month'])
# Getting the data from each NetCDF variable
for field in fields:
setattr(temp, field, fileID.variables[field][:].copy())
# calculate maximum degree and order
temp.lmax = np.max(temp.l)
temp.mmax = np.max(temp.m)
# expand the spherical harmonics to dimensions
self = temp.expand(date=kwargs['date'])
# adjust months to fix special cases if necessary
self.month = adjust_months(self.month)
# attributes of clm/slm and included variables
for key in fields:
# attempt to get attribute for variable
try:
self.attributes[key] = [
fileID.variables[key].units,
fileID.variables[key].long_name
]
except (KeyError,ValueError,AttributeError):
pass
# get global netCDF4 attributes
self.attributes['ROOT'] = {}
for att_name in fileID.ncattrs():
self.attributes['ROOT'][att_name] = fileID.getncattr(att_name)
# Closing the NetCDF file
fileID.close()
# remove singleton dimensions and
# assign degree and order fields
self.squeeze(update_dimensions=True)
return self
[docs]
def from_HDF5(self, filename, **kwargs):
"""
Read a ``harmonics`` object from a HDF5 file
Parameters
----------
filename: str
full path of input HDF5 file
date: bool, default True
HDF5 file has date information
compression: str or NoneType, default None
file compression type
- ``'gzip'``
- ``'zip'``
- ``'bytes'``
verbose: bool, default False
print file and variable information
"""
# set filename
self.case_insensitive_filename(filename)
# set default parameters
kwargs.setdefault('date',True)
kwargs.setdefault('verbose',False)
kwargs.setdefault('compression',None)
# Open the HDF5 file for reading
if (kwargs['compression'] == 'gzip'):
# read gzip compressed file and extract into in-memory file object
with gzip.open(self.filename, mode='r') as f:
fid = io.BytesIO(f.read())
# set filename of BytesIO object
fid.filename = self.filename.name
# rewind to start of file
fid.seek(0)
# read as in-memory (diskless) HDF5 dataset from BytesIO object
fileID = h5py.File(fid, mode='r')
elif (kwargs['compression'] == 'zip'):
# read zipped file and extract file into in-memory file object
stem = self.filename.stem
with zipfile.ZipFile(self.filename) as z:
# first try finding a HDF5 file with same base filename
# if none found simply try searching for a HDF5 file
try:
f,=[f for f in z.namelist() if re.match(stem,f,re.I)]
except:
f,=[f for f in z.namelist() if re.search(r'\.H(DF)?5$',f,re.I)]
# read bytes from zipfile into in-memory BytesIO object
fid = io.BytesIO(z.read(f))
# set filename of BytesIO object
fid.filename = self.filename.name
# rewind to start of file
fid.seek(0)
# read as in-memory (diskless) HDF5 dataset from BytesIO object
fileID = h5py.File(fid, mode='r')
elif (kwargs['compression'] == 'bytes'):
# read as in-memory (diskless) HDF5 dataset
fileID = h5py.File(self.filename, mode='r')
else:
# read HDF5 dataset
fileID = h5py.File(self.filename, mode='r')
# Output HDF5 file information
logging.info(fileID.filename)
logging.info(list(fileID.keys()))
# read flattened spherical harmonics
temp = harmonics()
temp.filename = copy.copy(self.filename)
# create list of variables to retrieve
fields = ['l','m','clm','slm']
# retrieve date variables if specified
if kwargs['date']:
fields.extend(['time','month'])
# Getting the data from each HDF5 variable
for field in fields:
setattr(temp, field, fileID[field][:].copy())
# calculate maximum degree and order
temp.lmax = np.max(temp.l)
temp.mmax = np.max(temp.m)
# expand the spherical harmonics to dimensions
self = temp.expand(date=kwargs['date'])
# adjust months to fix special cases if necessary
self.month = adjust_months(self.month)
# attributes of clm/slm and included variables
for key in fields:
# attempt to get attribute for variable
try:
self.attributes[key] = [
fileID[key].attrs['units'],
fileID[key].attrs['long_name']
]
except (KeyError, AttributeError):
pass
# get global HDF5 attributes
self.attributes['ROOT'] = {}
for att_name,att_val in fileID.attrs.items():
self.attributes['ROOT'][att_name] = att_val
# Closing the HDF5 file
fileID.close()
# remove singleton dimensions and
# assign degree and order fields
self.squeeze(update_dimensions=True)
return self
[docs]
def from_gfc(self, filename, **kwargs):
"""
Read a ``harmonics`` object from a gfc gravity model file
Parameters
----------
filename: str
full path of input gfc file
date: bool, default True
gfc file has date information
tide: str or NoneType, default None
permanent tide system of output gravity fields
- ``'tide_free'``: no permanent direct and indirect tidal potentials
- ``'mean_tide'``: permanent tidal potentials (direct and indirect)
- ``'zero_tide'``: permanent direct tidal potential removed
verbose: bool, default False
print file and variable information
"""
# set filename
self.case_insensitive_filename(filename)
# set default parameters
kwargs.setdefault('date',False)
kwargs.setdefault('tide',None)
kwargs.setdefault('verbose',False)
# read data from gfc file
if kwargs['date']:
Ylms = read_gfc_harmonics(self.filename,
TIDE=kwargs['tide'])
else:
Ylms = geoidtk.read_ICGEM_harmonics(self.filename,
TIDE=kwargs['tide'])
# Output file information
logging.info(self.filename)
logging.info(list(Ylms.keys()))
# copy variables for gravity model
self.clm = Ylms['clm'].copy()
self.slm = Ylms['slm'].copy()
self.lmax = np.int64(Ylms['max_degree'])
self.mmax = np.int64(Ylms['max_degree'])
# copy date variables
if kwargs['date']:
self.time = Ylms['time'].copy()
# adjust months to fix special cases if necessary
self.month = adjust_months(Ylms['month'])
# geophysical parameters of gravity model
self.GM = np.float64(Ylms['earth_gravity_constant'])
self.R = np.float64(Ylms['radius'])
self.tide = Ylms['tide_system']
# assign degree and order fields
self.update_dimensions()
return self
[docs]
def from_SHM(self, filename, **kwargs):
"""
Read a ``harmonics`` object from a spherical harmonic model file
Parameters
----------
filename: str
full path of input spherical harmonic model file
MMAX: int or NoneType, default None
Maximum order of spherical harmonics
POLE_TIDE: bool, default False
correct GSM data for pole tide drift
verbose: bool, default False
print file and variable information
"""
# set filename
self.case_insensitive_filename(filename)
# set default keyword arguments
kwargs.setdefault('verbose',False)
# read data from SHM file
Ylms = read_GRACE_harmonics(self.filename, self.lmax, **kwargs)
# Output file information
logging.info(self.filename)
logging.info(list(Ylms.keys()))
# copy variables for gravity model
self.clm = Ylms['clm'].copy()
self.slm = Ylms['slm'].copy()
self.time = Ylms['time'].copy()
self.month = np.int64(calendar_to_grace(self.time))
# copy header information for gravity model
self.header = Ylms['header']
# assign degree and order fields
self.update_dimensions()
return self
[docs]
def from_index(self, filename, **kwargs):
"""
Read a ``harmonics`` object from an index of ascii, netCDF4 or HDF5 files
Parameters
----------
filename: str
full path of index file
format: str or NoneType, default None
format of individual files within index
- ``'ascii'``
- ``'netCDF4'``
- ``'HDF5'``
date: bool, default True
files contains date information
sort: bool, default True
sort ``harmonics`` objects by date information
"""
# set default keyword arguments
kwargs.setdefault('format',None)
kwargs.setdefault('date',True)
kwargs.setdefault('sort',True)
# set filename
self.case_insensitive_filename(filename)
# file parser for reading index files
# removes commented lines (can comment out files in the index)
# removes empty lines (if there are extra empty lines)
parser = re.compile(r'^(?!\#|\%|$)', re.VERBOSE)
# Read index file of input spherical harmonics
with open(self.filename, mode='r', encoding='utf8') as f:
file_list = [l for l in f.read().splitlines() if parser.match(l)]
# create a list of harmonic objects
h = []
# for each file in the index
for i,f in enumerate(file_list):
if (kwargs['format'] == 'ascii'):
# ascii (.txt)
h.append(harmonics().from_ascii(f, date=kwargs['date']))
elif (kwargs['format'] == 'netCDF4'):
# netcdf (.nc)
h.append(harmonics().from_netCDF4(f, date=kwargs['date']))
elif (kwargs['format'] == 'HDF5'):
# HDF5 (.H5)
h.append(harmonics().from_HDF5(f, date=kwargs['date']))
# create a single harmonic object from the list
return self.from_list(h,date=kwargs['date'],sort=kwargs['sort'])
[docs]
def from_list(self, object_list, **kwargs):
"""
Build a sorted ``harmonics`` object from a list of
other ``harmonics`` objects
Parameters
----------
object_list: list
list of ``harmonics`` objects to be merged
date: bool, default True
files contains date information
sort: bool, default True
sort ``harmonics`` objects by date information
clear: bool, default True
clear the list of ``harmonics`` objects from memory
"""
# set default keyword arguments
kwargs.setdefault('date',True)
kwargs.setdefault('sort',True)
kwargs.setdefault('clear',False)
# number of harmonic objects in list
n = len(object_list)
# indices to sort data objects if harmonics list contain dates
if kwargs['date'] and kwargs['sort']:
list_sort = np.argsort([d.time for d in object_list],axis=None)
else:
list_sort = np.arange(n)
# truncate to maximum degree and order
self.lmax = np.min([d.lmax for d in object_list])
self.mmax = np.min([d.mmax for d in object_list])
# create output harmonics
self.clm = np.zeros((self.lmax+1,self.mmax+1,n))
self.slm = np.zeros((self.lmax+1,self.mmax+1,n))
# create list of files
self.filename = []
# output dates
if kwargs['date']:
self.time = np.zeros((n))
self.month = np.zeros((n),dtype=np.int64)
# for each indice
for t,i in enumerate(list_sort):
self.clm[:,:,t] = object_list[i].clm[:self.lmax+1,:self.mmax+1]
self.slm[:,:,t] = object_list[i].slm[:self.lmax+1,:self.mmax+1]
if kwargs['date']:
self.time[t] = np.atleast_1d(object_list[i].time)
self.month[t] = np.atleast_1d(object_list[i].month)
# append filename to list
if getattr(object_list[i], 'filename'):
self.filename.append(object_list[i].filename)
# adjust months to fix special cases if necessary
if kwargs['date']:
self.month = adjust_months(self.month)
# assign degree and order fields
self.update_dimensions()
# clear the input list to free memory
if kwargs['clear']:
object_list = None
# return the single harmonic object
return self
[docs]
def from_file(self, filename, format=None, date=True, **kwargs):
"""
Read a ``harmonics`` object from a specified format
Parameters
----------
filename: str
full path of input file
format: str or NoneType, default None
file format
- ``'ascii'``
- ``'netCDF4'``
- ``'HDF5'``
- ``'gfc'``
- ``'SHM'``
date: bool, default True
file contains date information
verbose: bool, default False
print file and variable information
**kwargs: dict
keyword arguments for input readers
"""
# set filename
self.case_insensitive_filename(filename)
# set default verbosity
kwargs.setdefault('verbose',False)
# read from file
if (format == 'ascii'):
# ascii (.txt)
return harmonics().from_ascii(filename, date=date, **kwargs)
elif (format == 'netCDF4'):
# netcdf (.nc)
return harmonics().from_netCDF4(filename, date=date, **kwargs)
elif (format == 'HDF5'):
# HDF5 (.H5)
return harmonics().from_HDF5(filename, date=date, **kwargs)
elif (format == 'gfc'):
# ICGEM gravity model (.gfc)
return harmonics().from_gfc(filename, **kwargs)
elif (format == 'SHM'):
# spherical harmonic model
return harmonics().from_SHM(filename, self.lmax, **kwargs)
[docs]
def from_dict(self, d, **kwargs):
"""
Convert a ``dict`` object to a ``harmonics`` object
Parameters
----------
d: dict
dictionary object to be converted
"""
# assign dictionary variables to self
for key in ['l','m','clm','slm','time','month']:
try:
setattr(self, key, d[key].copy())
except (AttributeError, KeyError):
pass
# maximum degree and order
self.lmax = np.max(d['l'])
self.mmax = np.max(d['m'])
# add attributes to root if in dictionary
self.attributes['ROOT'] = d.get('attributes')
# assign degree and order fields
self.update_dimensions()
return self
[docs]
def to_ascii(self, filename, date=True, **kwargs):
"""
Write a ``harmonics`` object to ascii file
Parameters
----------
filename: str
full path of output ascii file
date: bool, default True
``harmonics`` objects contain date information
verbose: bool, default False
Output file and variable information
"""
self.filename = pathlib.Path(filename).expanduser().absolute()
# set default verbosity
kwargs.setdefault('verbose',False)
logging.info(self.filename)
# open the output file
fid = open(self.filename, mode='w', encoding='utf8')
if date:
file_format = '{0:5d} {1:5d} {2:+21.12e} {3:+21.12e} {4:10.4f}'
else:
file_format = '{0:5d} {1:5d} {2:+21.12e} {3:+21.12e}'
# write to file for each spherical harmonic degree and order
for m in range(0, self.mmax+1):
for l in range(m, self.lmax+1):
args = (l, m, self.clm[l,m], self.slm[l,m], self.time)
print(file_format.format(*args), file=fid)
# close the output file
fid.close()
[docs]
def to_netCDF4(self, filename, **kwargs):
"""
Write a ``harmonics`` object to netCDF4 file
Parameters
----------
filename: str
full path of output netCDF4 file
units: str, default: 'Geodesy_Normalization'
spherical harmonic units
time_units: str, default 'years'
time variable units
time_longname: str, default 'Date_in_Decimal_Years'
time variable description
months_name: str, default 'month'
name of months variable
months_units: str, default 'number'
months variable units
months_longname: str, default 'GRACE_month'
months variable description
field_mapping: dict, default {}
mapping between input variables and output netCDF4
attributes: dict, default {}
output netCDF4 variable and file-level attributes
title: str or NoneType, default None
title attribute of dataset
source: str or NoneType, default None
source attribute of dataset
reference: str or NoneType, default None
reference attribute of dataset
date: bool, default True
``harmonics`` objects contain date information
clobber: bool, default True
Overwrite an existing netCDF4 file
verbose: bool, default False
Output file and variable information
"""
# set default keyword arguments
kwargs.setdefault('units','Geodesy_Normalization')
kwargs.setdefault('time_units','years')
kwargs.setdefault('time_longname','Date_in_Decimal_Years')
kwargs.setdefault('months_name','month')
kwargs.setdefault('months_units','number')
kwargs.setdefault('months_longname','GRACE_month')
kwargs.setdefault('field_mapping',{})
attributes = self.attributes.get('ROOT') or {}
kwargs.setdefault('attributes',dict(ROOT=attributes))
kwargs.setdefault('title',None)
kwargs.setdefault('source',None)
kwargs.setdefault('reference',None)
kwargs.setdefault('date',True)
kwargs.setdefault('clobber',True)
kwargs.setdefault('verbose',False)
# setting NetCDF clobber attribute
clobber = 'w' if kwargs['clobber'] else 'a'
# opening netCDF file for writing
self.filename = pathlib.Path(filename).expanduser().absolute()
fileID = netCDF4.Dataset(self.filename, clobber, format="NETCDF4")
# flatten harmonics
temp = self.flatten(date=kwargs['date'])
# mapping between output keys and netCDF4 variable names
if not kwargs['field_mapping']:
kwargs['field_mapping']['l'] = 'l'
kwargs['field_mapping']['m'] = 'm'
kwargs['field_mapping']['clm'] = 'clm'
kwargs['field_mapping']['slm'] = 'slm'
if kwargs['date']:
kwargs['field_mapping']['time'] = 'time'
kwargs['field_mapping']['month'] = kwargs['months_name']
# create attributes dictionary for output variables
if not all(key in kwargs['attributes'] for key in kwargs['field_mapping']):
# Defining attributes for degree and order
kwargs['attributes'][kwargs['field_mapping']['l']] = {}
kwargs['attributes'][kwargs['field_mapping']['l']]['long_name'] = 'spherical_harmonic_degree'
kwargs['attributes'][kwargs['field_mapping']['l']]['units'] = 'Wavenumber'
kwargs['attributes'][kwargs['field_mapping']['m']] = {}
kwargs['attributes'][kwargs['field_mapping']['m']]['long_name'] = 'spherical_harmonic_order'
kwargs['attributes'][kwargs['field_mapping']['m']]['units'] = 'Wavenumber'
# Defining attributes for dataset
kwargs['attributes'][kwargs['field_mapping']['clm']] = {}
kwargs['attributes'][kwargs['field_mapping']['clm']]['long_name'] = 'cosine_spherical_harmonics'
kwargs['attributes'][kwargs['field_mapping']['clm']]['units'] = kwargs['units']
kwargs['attributes'][kwargs['field_mapping']['slm']] = {}
kwargs['attributes'][kwargs['field_mapping']['slm']]['long_name'] = 'sine_spherical_harmonics'
kwargs['attributes'][kwargs['field_mapping']['slm']]['units'] = kwargs['units']
# Defining attributes for date if applicable
if kwargs['date']:
# attributes for date and month (or integer date)
kwargs['attributes'][kwargs['field_mapping']['time']] = {}
kwargs['attributes'][kwargs['field_mapping']['time']]['long_name'] = kwargs['time_longname']
kwargs['attributes'][kwargs['field_mapping']['time']]['units'] = kwargs['time_units']
kwargs['attributes'][kwargs['field_mapping']['month']] = {}
kwargs['attributes'][kwargs['field_mapping']['month']]['long_name'] = kwargs['months_longname']
kwargs['attributes'][kwargs['field_mapping']['month']]['units'] = kwargs['months_units']
# add default global (file-level) attributes
if kwargs['title']:
kwargs['attributes']['ROOT']['title'] = kwargs['title']
if kwargs['source']:
kwargs['attributes']['ROOT']['source'] = kwargs['source']
if kwargs['reference']:
kwargs['attributes']['ROOT']['reference'] = kwargs['reference']
# netCDF4 dimension variables
dimensions = []
dimensions.append('lm')
fileID.createDimension('lm', len(temp.l))
# defining netCDF4 temporal dimension
if kwargs['date']:
temp.expand_dims(update_dimensions=False)
dimensions.append('time')
fileID.createDimension('time', len(temp.time))
# defining and filling the netCDF variables
nc = {}
for field,key in kwargs['field_mapping'].items():
val = getattr(temp, field)
if field in ('l','m'):
dims = (dimensions[0],)
elif field in ('time','month'):
dims = (dimensions[1],)
else:
dims = tuple(dimensions)
# create netCDF4 variable
nc[key] = fileID.createVariable(key, val.dtype, dims)
nc[key][:] = val[:]
# filling netCDF dataset attributes
for att_name,att_val in kwargs['attributes'][key].items():
# skip variable attribute if None
if not att_val:
continue
# skip variable attributes if in list
if att_name not in ('DIMENSION_LIST','CLASS','NAME'):
nc[key].setncattr(att_name, att_val)
# global attributes of NetCDF4 file
for att_name,att_val in kwargs['attributes']['ROOT'].items():
fileID.setncattr(att_name, att_val)
# add software information
fileID.software_reference = gravity_toolkit.version.project_name
fileID.software_version = gravity_toolkit.version.full_version
# date created
fileID.date_created = time.strftime('%Y-%m-%d',time.localtime())
# Output netCDF structure information
logging.info(self.filename)
logging.info(list(fileID.variables.keys()))
# Closing the netCDF file
fileID.close()
[docs]
def to_HDF5(self, filename, **kwargs):
"""
Write a ``harmonics`` object to HDF5 file
Parameters
----------
filename: str
full path of output HDF5 file
units: str, default: 'Geodesy_Normalization'
spherical harmonic units
time_units: str, default 'years'
time variable units
time_longname: str, default 'Date_in_Decimal_Years'
time variable description
months_name: str, default 'month'
name of months variable
months_units: str, default 'number'
months variable units
months_longname: str, default 'GRACE_month'
months variable description
field_mapping: dict, default {}
mapping between input variables and output HDF5
attributes: dict, default {}
output HDF5 variable and file-level attributes
title: str or NoneType, default None
description attribute of dataset
source: str or NoneType, default None
source attribute of dataset
reference: str or NoneType, default None
reference attribute of dataset
date: bool, default True
``harmonics`` objects contain date information
clobber: bool, default True
Overwrite an existing HDF5 file
verbose: bool, default False
Output file and variable information
"""
# set default keyword arguments
kwargs.setdefault('units','Geodesy_Normalization')
kwargs.setdefault('time_units','years')
kwargs.setdefault('time_longname','Date_in_Decimal_Years')
kwargs.setdefault('months_name','month')
kwargs.setdefault('months_units','number')
kwargs.setdefault('months_longname','GRACE_month')
kwargs.setdefault('field_mapping',{})
attributes = self.attributes.get('ROOT') or {}
kwargs.setdefault('attributes',dict(ROOT=attributes))
kwargs.setdefault('title',None)
kwargs.setdefault('source',None)
kwargs.setdefault('reference',None)
kwargs.setdefault('date',True)
kwargs.setdefault('clobber',True)
kwargs.setdefault('verbose',False)
# setting HDF5 clobber attribute
clobber = 'w' if kwargs['clobber'] else 'w-'
# opening HDF5 file for writing
self.filename = pathlib.Path(filename).expanduser().absolute()
fileID = h5py.File(self.filename, clobber)
# flatten harmonics
temp = self.flatten(date=kwargs['date'])
if kwargs['date']:
temp.expand_dims(update_dimensions=False)
# mapping between output keys and HDF5 variable names
if not kwargs['field_mapping']:
kwargs['field_mapping']['l'] = 'l'
kwargs['field_mapping']['m'] = 'm'
kwargs['field_mapping']['clm'] = 'clm'
kwargs['field_mapping']['slm'] = 'slm'
if kwargs['date']:
kwargs['field_mapping']['time'] = 'time'
kwargs['field_mapping']['month'] = kwargs['months_name']
# create attributes dictionary for output variables
if not all(key in kwargs['attributes'] for key in kwargs['field_mapping']):
# Defining attributes for degree and order
kwargs['attributes'][kwargs['field_mapping']['l']] = {}
kwargs['attributes'][kwargs['field_mapping']['l']]['long_name'] = 'spherical_harmonic_degree'
kwargs['attributes'][kwargs['field_mapping']['l']]['units'] = 'Wavenumber'
kwargs['attributes'][kwargs['field_mapping']['m']] = {}
kwargs['attributes'][kwargs['field_mapping']['m']]['long_name'] = 'spherical_harmonic_order'
kwargs['attributes'][kwargs['field_mapping']['m']]['units'] = 'Wavenumber'
# Defining attributes for dataset
kwargs['attributes'][kwargs['field_mapping']['clm']] = {}
kwargs['attributes'][kwargs['field_mapping']['clm']]['long_name'] = 'cosine_spherical_harmonics'
kwargs['attributes'][kwargs['field_mapping']['clm']]['units'] = kwargs['units']
kwargs['attributes'][kwargs['field_mapping']['slm']] = {}
kwargs['attributes'][kwargs['field_mapping']['slm']]['long_name'] = 'sine_spherical_harmonics'
kwargs['attributes'][kwargs['field_mapping']['slm']]['units'] = kwargs['units']
# Defining attributes for date if applicable
if kwargs['date']:
# attributes for date and month (or integer date)
kwargs['attributes'][kwargs['field_mapping']['time']] = {}
kwargs['attributes'][kwargs['field_mapping']['time']]['long_name'] = kwargs['time_longname']
kwargs['attributes'][kwargs['field_mapping']['time']]['units'] = kwargs['time_units']
kwargs['attributes'][kwargs['field_mapping']['month']] = {}
kwargs['attributes'][kwargs['field_mapping']['month']]['long_name'] = kwargs['months_longname']
kwargs['attributes'][kwargs['field_mapping']['month']]['units'] = kwargs['months_units']
# add default global (file-level) attributes
if kwargs['title']:
kwargs['attributes']['ROOT']['title'] = kwargs['title']
if kwargs['source']:
kwargs['attributes']['ROOT']['source'] = kwargs['source']
if kwargs['reference']:
kwargs['attributes']['ROOT']['reference'] = kwargs['reference']
# Defining the HDF5 dataset variables
h5 = {}
for field,key in kwargs['field_mapping'].items():
val = getattr(temp, field)
h5[key] = fileID.create_dataset(key, val.shape,
data=val, dtype=val.dtype, compression='gzip')
# filling HDF5 dataset attributes
for att_name,att_val in kwargs['attributes'][key].items():
# skip variable attribute if None
if not att_val:
continue
# skip variable attributes if in list
if att_name not in ('DIMENSION_LIST','CLASS','NAME'):
h5[key].attrs[att_name] = att_val
# global attributes of HDF5 file
for att_name,att_val in kwargs['attributes']['ROOT'].items():
fileID.attrs[att_name] = att_val
# add software information
fileID.attrs['software_reference'] = gravity_toolkit.version.project_name
fileID.attrs['software_version'] = gravity_toolkit.version.full_version
# date created
fileID.attrs['date_created'] = time.strftime('%Y-%m-%d',time.localtime())
# Output HDF5 structure information
logging.info(self.filename)
logging.info(list(fileID.keys()))
# Closing the HDF5 file
fileID.close()
[docs]
def to_index(self, filename, file_list, format=None, date=True, **kwargs):
"""
Write a ``harmonics`` object to index of ascii, netCDF4 or HDF5 files
Parameters
----------
filename: str
full path of index file to be written
file_list: list
list of filenames for each output file
format: str or NoneType, default None
format of files in index
- ``'ascii'``
- ``'netCDF4'``
- ``'HDF5'``
date: bool, default True
``harmonics`` object contains date information
verbose: bool, default False
print file and variable information
kwargs: dict
keyword arguments for output writers
"""
# Write index file of output spherical harmonics
self.filename = pathlib.Path(filename).expanduser().absolute()
fid = open(self.filename, mode='w', encoding='utf8')
# set default verbosity
kwargs.setdefault('verbose',False)
# for each file to be in the index
for i,f in enumerate(file_list):
# print filename to index
print(self.compressuser(f), file=fid)
# index harmonics object at i
h = self.index(i, date=date)
# write to file
if (format == 'ascii'):
# ascii (.txt)
h.to_ascii(f, date=date, **kwargs)
elif (format == 'netCDF4'):
# netcdf (.nc)
h.to_netCDF4(f, date=date, **kwargs)
elif (format == 'HDF5'):
# HDF5 (.H5)
h.to_HDF5(f, date=date, **kwargs)
# close the index file
fid.close()
[docs]
def to_file(self, filename, format=None, date=True, **kwargs):
"""
Write a ``harmonics`` object to a specified format
Parameters
----------
filename: str
full path of output file
format: str or NoneType, default None
file format
- ``'ascii'``
- ``'netCDF4'``
- ``'HDF5'``
date: bool, default True
``harmonics`` object contains date information
verbose: bool, default False
print file and variable information
kwargs: dict
keyword arguments for output writers
"""
# set default verbosity
kwargs.setdefault('verbose',False)
# write to file
if (format == 'ascii'):
# ascii (.txt)
self.to_ascii(filename, date=date, **kwargs)
elif (format == 'netCDF4'):
# netcdf (.nc)
self.to_netCDF4(filename, date=date, **kwargs)
elif (format == 'HDF5'):
# HDF5 (.H5)
self.to_HDF5(filename, date=date, **kwargs)
[docs]
def to_dict(self):
"""
Convert a ``harmonics`` object to a ``dict`` object
Returns
-------
d: dict
converted dictionary object
"""
# assign dictionary variables from self
d = {}
for key in ['l','m','clm','slm','time','month','attributes']:
try:
d[key] = getattr(self, key)
except (AttributeError, KeyError):
pass
# return the dictionary object
return d
[docs]
def to_masked_array(self):
"""
Convert a ``harmonics`` object to a masked numpy array
"""
# assign degree and order fields
self.update_dimensions()
# verify dimensions and get shape
ndim_prev = np.copy(self.ndim)
self.expand_dims()
l1,m1,nt = self.shape
# create single triangular matrices with harmonics
Ylms = np.ma.zeros((self.lmax+1,2*self.lmax+1,nt))
Ylms.mask = np.ones((self.lmax+1,2*self.lmax+1,nt),dtype=bool)
for m in range(-self.mmax,self.mmax+1):
mm = np.abs(m)
for l in range(mm,self.lmax+1):
if (m < 0):
Ylms.data[l,self.lmax+m,:] = self.slm[l,mm,:]
Ylms.mask[l,self.lmax+m,:] = False
else:
Ylms.data[l,self.lmax+m,:] = self.clm[l,mm,:]
Ylms.mask[l,self.lmax+m,:] = False
# reshape to previous
if (self.ndim != ndim_prev):
self.squeeze()
# return the triangular matrix
return Ylms
[docs]
def to_coo_array(self):
"""
Convert data arrays to a COO sparse matrices
"""
# assign degree and order fields
self.update_dimensions()
# create COO sparse matrices
self.clm = sparse.COO(self.clm)
self.slm = sparse.COO(self.slm)
# return the harmonics object
return self
[docs]
def update_dimensions(self):
"""
Update the dimension variables of the ``harmonics`` object
"""
# calculate spherical harmonic degree and order (0 is falsy)
self.l=np.arange(self.lmax+1) if (self.lmax is not None) else None
self.m=np.arange(self.mmax+1) if (self.mmax is not None) else None
return self
[docs]
def add(self, temp):
"""
Add two ``harmonics`` objects
Parameters
----------
temp: obj
harmonic object to be added
"""
# assign degree and order fields
self.update_dimensions()
temp.update_dimensions()
l1 = self.lmax+1 if (temp.lmax > self.lmax) else temp.lmax+1
m1 = self.mmax+1 if (temp.mmax > self.mmax) else temp.mmax+1
if (self.ndim == 2):
self.clm[:l1,:m1] += temp.clm[:l1,:m1]
self.slm[:l1,:m1] += temp.slm[:l1,:m1]
elif (self.ndim == 3) and (temp.ndim == 2):
for i,t in enumerate(self.time):
self.clm[:l1,:m1,i] += temp.clm[:l1,:m1]
self.slm[:l1,:m1,i] += temp.slm[:l1,:m1]
else:
self.clm[:l1,:m1,:] += temp.clm[:l1,:m1,:]
self.slm[:l1,:m1,:] += temp.slm[:l1,:m1,:]
return self
[docs]
def subtract(self, temp):
"""
Subtract one ``harmonics`` object from another
Parameters
----------
temp: obj
harmonic object to be subtracted
"""
# assign degree and order fields
self.update_dimensions()
temp.update_dimensions()
l1 = self.lmax+1 if (temp.lmax > self.lmax) else temp.lmax+1
m1 = self.mmax+1 if (temp.mmax > self.mmax) else temp.mmax+1
if (self.ndim == 2):
self.clm[:l1,:m1] -= temp.clm[:l1,:m1]
self.slm[:l1,:m1] -= temp.slm[:l1,:m1]
elif (self.ndim == 3) and (temp.ndim == 2):
for i,t in enumerate(self.time):
self.clm[:l1,:m1,i] -= temp.clm[:l1,:m1]
self.slm[:l1,:m1,i] -= temp.slm[:l1,:m1]
else:
self.clm[:l1,:m1,:] -= temp.clm[:l1,:m1,:]
self.slm[:l1,:m1,:] -= temp.slm[:l1,:m1,:]
return self
[docs]
def multiply(self, temp):
"""
Multiply two ``harmonics`` objects
Parameters
----------
temp: obj
harmonic object to be multiplied
"""
# assign degree and order fields
self.update_dimensions()
temp.update_dimensions()
l1 = self.lmax+1 if (temp.lmax > self.lmax) else temp.lmax+1
m1 = self.mmax+1 if (temp.mmax > self.mmax) else temp.mmax+1
if (self.ndim == 2):
self.clm[:l1,:m1] *= temp.clm[:l1,:m1]
self.slm[:l1,:m1] *= temp.slm[:l1,:m1]
elif (self.ndim == 3) and (temp.ndim == 2):
for i,t in enumerate(self.time):
self.clm[:l1,:m1,i] *= temp.clm[:l1,:m1]
self.slm[:l1,:m1,i] *= temp.slm[:l1,:m1]
else:
self.clm[:l1,:m1,:] *= temp.clm[:l1,:m1,:]
self.slm[:l1,:m1,:] *= temp.slm[:l1,:m1,:]
return self
[docs]
def divide(self, temp):
"""
Divide one ``harmonics`` object from another
Parameters
----------
temp: obj
harmonic object to be divided
"""
# assign degree and order fields
self.update_dimensions()
temp.update_dimensions()
l1 = self.lmax+1 if (temp.lmax > self.lmax) else temp.lmax+1
m1 = self.mmax+1 if (temp.mmax > self.mmax) else temp.mmax+1
# indices for cosine spherical harmonics (including zonals)
lc,mc = np.tril_indices(l1, m=m1)
# indices for sine spherical harmonics (excluding zonals)
m0 = np.nonzero(mc != 0)
ls,ms = (lc[m0],mc[m0])
if (self.ndim == 2):
self.clm[lc,mc] /= temp.clm[lc,mc]
self.slm[ls,ms] /= temp.slm[ls,ms]
elif (self.ndim == 3) and (temp.ndim == 2):
for i,t in enumerate(self.time):
self.clm[lc,mc,i] /= temp.clm[lc,mc]
self.slm[ls,ms,i] /= temp.slm[ls,ms]
else:
self.clm[lc,mc,:] /= temp.clm[lc,mc,:]
self.slm[ls,ms,:] /= temp.slm[ls,ms,:]
return self
[docs]
def copy(self):
"""
Copy a ``harmonics`` object to a new ``harmonics`` object
"""
temp = harmonics(lmax=self.lmax, mmax=self.mmax)
# copy attributes or update attributes dictionary
if isinstance(self.attributes, list):
setattr(temp,'attributes',self.attributes)
elif isinstance(self.attributes, dict):
temp.attributes.update(self.attributes)
# try to assign variables to self
for key in ['clm','slm','time','month','filename']:
try:
val = getattr(self, key)
setattr(temp, key, copy.copy(val))
except AttributeError:
pass
# assign degree and order fields
temp.update_dimensions()
return temp
[docs]
def zeros(self, lmax=None, mmax=None, nt=None):
"""
Create an empty ``harmonics`` object
"""
# assign maximum degree and order
if lmax is not None:
self.lmax = np.int64(lmax)
if mmax is not None:
self.mmax = np.int64(mmax)
# use default maximum order
if self.mmax is None:
self.mmax = np.copy(self.lmax)
# assign variables to self
if nt is not None:
self.clm = np.zeros((self.lmax+1, self.mmax+1, nt))
self.slm = np.zeros((self.lmax+1, self.mmax+1, nt))
self.time = np.zeros((nt))
self.month = np.zeros((nt), dtype=int)
else:
self.clm = np.zeros((self.lmax+1, self.mmax+1))
self.slm = np.zeros((self.lmax+1, self.mmax+1))
# assign degree and order fields
self.update_dimensions()
return self
[docs]
def zeros_like(self):
"""
Create a ``harmonics`` object using the dimensions of another
"""
temp = harmonics(lmax=self.lmax, mmax=self.mmax)
# assign variables to temp
for key in ['clm','slm','time','month']:
try:
val = getattr(self, key)
setattr(temp, key, np.zeros_like(val))
except AttributeError:
pass
# assign degree and order fields
temp.update_dimensions()
return temp
[docs]
def expand_dims(self, update_dimensions=True):
"""
Add a singleton dimension to a ``harmonics`` object if non-existent
Parameters
----------
update_dimensions: bool, default True
Update the degree and order dimensions
"""
# change time dimensions to be iterable
self.time = np.atleast_1d(self.time)
self.month = np.atleast_1d(self.month)
# output harmonics with a third dimension
if (self.ndim == 2) and not self.flattened:
self.clm = self.clm[:,:,None]
self.slm = self.slm[:,:,None]
elif (self.ndim == 1) and self.flattened:
self.clm = self.clm[:,None]
self.slm = self.slm[:,None]
# assign degree and order fields
if update_dimensions:
self.update_dimensions()
# return the expanded harmonics object
return self
[docs]
def squeeze(self, update_dimensions=True):
"""
Remove singleton dimensions from a ``harmonics`` object
Parameters
----------
update_dimensions: bool, default True
Update the degree and order dimensions
"""
# squeeze singleton dimensions
try:
self.clm = np.squeeze(self.clm, axis=-1)
self.slm = np.squeeze(self.slm, axis=-1)
self.time = np.squeeze(self.time)
self.month = np.squeeze(self.month)
except ValueError as exc:
pass
# assign degree and order fields
if update_dimensions:
self.update_dimensions()
else:
self.ndim = self.clm.ndim
self.shape = self.clm.shape
return self
[docs]
def flatten(self, date=True):
"""
Flatten ``harmonics`` matrices into arrays
Parameters
----------
date: bool, default True
``harmonics`` objects contain date information
"""
n_harm = (self.lmax**2 + 3*self.lmax - (self.lmax-self.mmax)**2 -
(self.lmax-self.mmax))//2 + 1
# restructured degree and order
temp = harmonics(lmax=self.lmax, mmax=self.mmax)
temp.l = np.zeros((n_harm,), dtype=np.int64)
temp.m = np.zeros((n_harm,), dtype=np.int64)
# get filenames if applicable
if getattr(self, 'filename'):
temp.filename = copy.copy(self.filename)
# copy date variables if applicable
if date:
temp.time = np.copy(self.time)
temp.month = np.copy(self.month)
# restructured spherical harmonic arrays
if (self.clm.ndim == 2):
temp.clm = np.zeros((n_harm))
temp.slm = np.zeros((n_harm))
else:
n = self.clm.shape[-1]
temp.clm = np.zeros((n_harm,n))
temp.slm = np.zeros((n_harm,n))
# create counter variable lm
lm = 0
for m in range(0,self.mmax+1):# MMAX+1 to include MMAX
for l in range(m,self.lmax+1):# LMAX+1 to include LMAX
temp.l[lm] = np.int64(l)
temp.m[lm] = np.int64(m)
if (self.clm.ndim == 2):
temp.clm[lm] = self.clm[l,m]
temp.slm[lm] = self.slm[l,m]
else:
temp.clm[lm,:] = self.clm[l,m,:]
temp.slm[lm,:] = self.slm[l,m,:]
# add 1 to lm counter variable
lm += 1
# update flattened attribute
temp.flattened = True
# return the flattened arrays
return temp
[docs]
def expand(self, date=True):
"""
Expand flattened ``harmonics`` into matrices
Parameters
----------
date: bool, default True
``harmonics`` objects contain date information
"""
# number of harmonics
n_harm = len(self.l)
# restructured degree and order
temp = harmonics(lmax=self.lmax, mmax=self.mmax)
# get filenames if applicable
if getattr(self, 'filename'):
temp.filename = copy.copy(self.filename)
# copy date variables if applicable
if date:
temp.time = np.copy(self.time)
temp.month = np.copy(self.month)
# restructured spherical harmonic matrices
if (self.clm.ndim == 1):
temp.clm = np.zeros((self.lmax+1,self.mmax+1))
temp.slm = np.zeros((self.lmax+1,self.mmax+1))
else:
n = self.clm.shape[-1]
temp.clm = np.zeros((self.lmax+1,self.mmax+1,n))
temp.slm = np.zeros((self.lmax+1,self.mmax+1,n))
# create counter variable lm
for lm in range(n_harm):
l = self.l[lm]
m = self.m[lm]
if (self.clm.ndim == 1):
temp.clm[l,m] = self.clm[lm]
temp.slm[l,m] = self.slm[lm]
else:
temp.clm[l,m,:] = self.clm[lm,:]
temp.slm[l,m,:] = self.slm[lm,:]
# update flattened attribute
temp.flattened = False
# assign degree and order fields
temp.update_dimensions()
# return the expanded harmonics object
return temp
[docs]
def index(self, indice, date=True):
"""
Subset a ``harmonics`` object to specific index
Parameters
----------
indice: int
index in matrix for subsetting
date: bool, default True
``harmonics`` objects contain date information
"""
# output harmonics object
temp = harmonics(lmax=np.copy(self.lmax), mmax=np.copy(self.mmax))
# subset output harmonics
temp.clm = self.clm[:,:,indice].copy()
temp.slm = self.slm[:,:,indice].copy()
# subset output dates
if date:
temp.time = self.time[indice].copy()
temp.month = self.month[indice].copy()
# subset filenames if applicable
if getattr(self, 'filename'):
if isinstance(self.filename, (list, tuple, np.ndarray)):
temp.filename = str(self.filename[indice])
elif isinstance(self.filename, str):
temp.filename = copy.copy(self.filename)
# assign degree and order fields
temp.update_dimensions()
# return the subsetted object
return temp
[docs]
def subset(self, months):
"""
Subset a ``harmonics`` object to specific GRACE/GRACE-FO months
Parameters
----------
months: int
GRACE/GRACE-FO to subset
"""
# check if months is an array or a single value
months = np.atleast_1d(months)
# number of months
n = len(months)
# check that all months are available
months_check = list(set(months) - set(self.month))
if months_check:
m = ','.join([f'{m:03d}' for m in months_check])
raise IOError(f'GRACE/GRACE-FO months {m} not Found')
# indices to sort data objects
months_list = [i for i,m in enumerate(self.month) if m in months]
# output harmonics object
temp = harmonics(lmax=np.copy(self.lmax), mmax=np.copy(self.mmax))
# create output harmonics
temp.clm = np.zeros((temp.lmax+1,temp.mmax+1,n))
temp.slm = np.zeros((temp.lmax+1,temp.mmax+1,n))
temp.time = np.zeros((n))
temp.month = np.zeros((n),dtype=np.int64)
temp.filename = []
# for each indice
for t,i in enumerate(months_list):
temp.clm[:,:,t] = self.clm[:,:,i].copy()
temp.slm[:,:,t] = self.slm[:,:,i].copy()
temp.time[t] = self.time[i].copy()
temp.month[t] = self.month[i].copy()
# subset filenames if applicable
if getattr(self, 'filename'):
if isinstance(self.filename, list):
temp.filename.append(str(self.filename[i]))
elif isinstance(self.filename, str):
temp.filename.append(self.filename)
# assign degree and order fields
temp.update_dimensions()
# remove singleton dimensions if importing a single value
return temp.squeeze()
[docs]
def truncate(self, lmax, lmin=0, mmax=None):
"""
Truncate or expand a ``harmonics`` object to a new degree and order
Parameters
----------
lmax: int
maximum degree of spherical harmonics
lmin: int, default 0
minimum degree of spherical harmonics
mmax: int or NoneType, default None
maximum order of spherical harmonics
"""
# output harmonics dimensions
lmax = np.copy(self.lmax) if (lmax is None) else lmax
mmax = np.copy(lmax) if (mmax is None) else mmax
# copy prior harmonics object
temp = self.copy()
# set new degree and order
self.lmax = np.copy(lmax)
self.mmax = np.copy(mmax) if mmax else np.copy(lmax)
# truncation levels
l1 = self.lmax+1 if (temp.lmax > self.lmax) else temp.lmax+1
m1 = self.mmax+1 if (temp.mmax > self.mmax) else temp.mmax+1
# create output harmonics
if (temp.ndim == 3):
# number of months
n = temp.clm.shape[-1]
self.clm = np.zeros((self.lmax+1,self.mmax+1,n))
self.slm = np.zeros((self.lmax+1,self.mmax+1,n))
self.clm[lmin:l1,:m1,:] = temp.clm[lmin:l1,:m1,:].copy()
self.slm[lmin:l1,:m1,:] = temp.slm[lmin:l1,:m1,:].copy()
else:
self.clm = np.zeros((self.lmax+1,self.mmax+1))
self.slm = np.zeros((self.lmax+1,self.mmax+1))
self.clm[lmin:l1,:m1] = temp.clm[lmin:l1,:m1].copy()
self.slm[lmin:l1,:m1] = temp.slm[lmin:l1,:m1].copy()
# assign degree and order fields
self.update_dimensions()
# return the truncated or expanded harmonics object
return self
[docs]
def mean(self, apply=False, indices=Ellipsis):
"""
Compute mean gravitational field and remove from data if specified
Parameters
----------
apply: bool, default False
remove the mean field from the input ``harmonics`` object
indices: int, default Ellipsis
indices of input ``harmonics`` object to compute mean
"""
temp = harmonics(lmax=np.copy(self.lmax), mmax=np.copy(self.mmax))
# allocate for mean field
temp.clm = np.zeros((temp.lmax+1,temp.mmax+1))
temp.slm = np.zeros((temp.lmax+1,temp.mmax+1))
# Computes the mean for each spherical harmonic degree and order
for m in range(0,temp.mmax+1):# MMAX+1 to include l
for l in range(m,temp.lmax+1):# LMAX+1 to include LMAX
# calculate mean static field
temp.clm[l,m] = np.mean(self.clm[l,m,indices])
temp.slm[l,m] = np.mean(self.slm[l,m,indices])
# calculating the time-variable gravity field by removing
# the static component of the gravitational field
if apply:
self.clm[l,m,:] -= temp.clm[l,m]
self.slm[l,m,:] -= temp.slm[l,m]
# calculate mean of temporal variables
for key in ['time','month']:
try:
val = getattr(self, key)
setattr(temp, key, np.mean(val[indices]))
except:
continue
# assign degree and order fields
temp.update_dimensions()
# return the mean field
return temp
[docs]
def scale(self, var):
"""
Multiply a ``harmonics`` object by a constant
Parameters
----------
var: float or np.ndarray
scalar value to which the ``harmonics`` object will be multiplied
"""
# assign degree and order fields
self.update_dimensions()
temp = harmonics(lmax=self.lmax, mmax=self.mmax)
temp.time = np.copy(self.time)
temp.month = np.copy(self.month)
# get filenames if applicable
if getattr(self, 'filename'):
temp.filename = copy.copy(self.filename)
# multiply by a single constant or a time-variable scalar
if (np.ndim(var) == 0):
temp.clm = var*self.clm
temp.slm = var*self.slm
elif (np.ndim(var) == 1) and (self.ndim == 2):
temp.clm = np.zeros((temp.lmax+1,temp.mmax+1,len(var)))
temp.slm = np.zeros((temp.lmax+1,temp.mmax+1,len(var)))
for i,v in enumerate(var):
temp.clm[:,:,i] = v*self.clm
temp.slm[:,:,i] = v*self.slm
elif (np.ndim(var) == 1) and (self.ndim == 3):
for i,v in enumerate(var):
temp.clm[:,:,i] = v*self.clm[:,:,i]
temp.slm[:,:,i] = v*self.slm[:,:,i]
# assign degree and order fields
temp.update_dimensions()
return temp
[docs]
def power(self, power):
"""
Raise a ``harmonics`` object to a power
Parameters
----------
var: float
power to which the ``harmonics`` object will be raised
"""
# assign degree and order fields
self.update_dimensions()
temp = harmonics(lmax=self.lmax, mmax=self.mmax)
temp.time = np.copy(self.time)
temp.month = np.copy(self.month)
# get filenames if applicable
if getattr(self, 'filename'):
temp.filename = copy.copy(self.filename)
for key in ['clm','slm']:
val = getattr(self, key)
setattr(temp, key, np.power(val,power))
# assign degree and order fields
temp.update_dimensions()
return temp
[docs]
def drift(self, t, epoch=2003.3):
"""
Integrate a ``harmonics`` rate field over time to calculate drift
Parameters
----------
t: np.ndarray
times for calculating drift
epoch: float
reference epoch for times
"""
# assign degree and order fields
self.update_dimensions()
temp = harmonics(lmax=self.lmax, mmax=self.mmax)
# allocate for drift field
temp.clm = np.zeros((temp.lmax+1,temp.mmax+1,len(t)))
temp.slm = np.zeros((temp.lmax+1,temp.mmax+1,len(t)))
# copy time variables and calculate GRACE/GRACE-FO months
temp.time = np.copy(t)
temp.month = calendar_to_grace(temp.time)
# adjust months to fix special cases if necessary
temp.month = adjust_months(temp.month)
# get filenames if applicable
if getattr(self, 'filename'):
temp.filename = copy.copy(self.filename)
# calculate drift
for i,ti in enumerate(t):
temp.clm[:,:,i] = self.clm*(ti - epoch)
temp.slm[:,:,i] = self.slm*(ti - epoch)
# assign degree and order fields
temp.update_dimensions()
return temp
[docs]
def convolve(self, var):
"""
Convolve ``harmonics`` with a degree-dependent array
Parameters
----------
var: np.ndarray
degree dependent array for convolution
"""
# assign degree and order fields
self.update_dimensions()
# check if a single field or a temporal field
if (self.ndim == 2):
for l in range(0,self.lmax+1):# LMAX+1 to include LMAX
self.clm[l,:] *= var[l]
self.slm[l,:] *= var[l]
else:
for i,t in enumerate(self.time):
for l in range(0,self.lmax+1):# LMAX+1 to include LMAX
self.clm[l,:,i] *= var[l]
self.slm[l,:,i] *= var[l]
# return the convolved field
return self
[docs]
def destripe(self, **kwargs):
"""
Filters spherical harmonic coefficients for correlated "striping"
errors following :cite:t:`Swenson:2006hu`
Parameters
----------
kwargs: dict
keyword arguments for ``destripe_harmonics``
"""
# assign degree and order fields
self.update_dimensions()
temp = harmonics(lmax=np.copy(self.lmax), mmax=np.copy(self.mmax))
temp.time = np.copy(self.time)
temp.month = np.copy(self.month)
# get filenames if applicable
if getattr(self, 'filename'):
temp.filename = copy.copy(self.filename)
# check if a single field or a temporal field
if (self.ndim == 2):
Ylms = destripe_harmonics(self.clm, self.slm,
LMIN=1, LMAX=self.lmax, MMAX=self.mmax, **kwargs)
temp.clm = Ylms['clm'].copy()
temp.slm = Ylms['slm'].copy()
else:
n = self.shape[-1]
temp.clm = np.zeros((self.lmax+1,self.mmax+1,n))
temp.slm = np.zeros((self.lmax+1,self.mmax+1,n))
for i in range(n):
Ylms = destripe_harmonics(self.clm[:,:,i], self.slm[:,:,i],
LMIN=1, LMAX=self.lmax, MMAX=self.mmax, **kwargs)
temp.clm[:,:,i] = Ylms['clm'].copy()
temp.slm[:,:,i] = Ylms['slm'].copy()
# assign degree and order fields
temp.update_dimensions()
# return the destriped field
return temp
@reify
def amplitude(self):
"""
Degree amplitude of the spherical harmonics
"""
# temporary matrix for squared harmonics
temp = self.power(2)
# check if a single field or a temporal field
if (self.ndim == 2):
# allocate for degree amplitudes
amp = np.zeros((self.lmax+1))
for l in range(self.lmax+1):
# truncate at mmax
m = np.arange(0,temp.mmax+1)
# degree amplitude of spherical harmonic degree
amp[l] = np.sqrt(np.sum(temp.clm[l,m] + temp.slm[l,m]))
else:
# allocate for degree amplitudes
n = self.shape[-1]
amp = np.zeros((self.lmax+1,n))
for l in range(self.lmax+1):
# truncate at mmax
m = np.arange(0,temp.mmax+1)
# degree amplitude of spherical harmonic degree
var = temp.clm[l,m,:] + temp.slm[l,m,:]
amp[l,:] = np.sqrt(np.sum(var, axis=0))
# return the degree amplitudes
return amp
@property
def dtype(self):
"""Main data type of ``harmonics`` object"""
return self.clm.dtype
@property
def shape(self):
"""Dimensions of ``harmonics`` object
"""
return np.shape(self.clm)
@property
def ndim(self):
"""Number of dimensions in ``harmonics`` object
"""
return np.ndim(self.clm)
@reify
def ilm(self):
"""
Complex form of the spherical harmonics
"""
return self.clm - self.slm*1j
def __str__(self):
"""String representation of the ``harmonics`` object
"""
properties = ['gravity_toolkit.harmonics']
properties.append(f" max_degree: {self.lmax}")
if self.mmax and (self.mmax != self.lmax):
properties.append(f" max_order: {self.mmax}")
if self.month:
properties.append(f" start_month: {min(self.month)}")
properties.append(f" end_month: {max(self.month)}")
return '\n'.join(properties)
def __len__(self):
"""Number of months
"""
return len(self.month) if np.any(self.month) else 0
def __iter__(self):
"""Iterate over GRACE/GRACE-FO months
"""
self.__index__ = 0
return self
def __next__(self):
"""Get the next month of data
"""
temp = harmonics(lmax=np.copy(self.lmax), mmax=np.copy(self.mmax))
try:
temp.clm = self.clm[:,:,self.__index__].copy()
temp.slm = self.slm[:,:,self.__index__].copy()
except IndexError as exc:
raise StopIteration from exc
# subset output spatial time and month
try:
temp.time = self.time[self.__index__].copy()
temp.month = self.month[self.__index__].copy()
except AttributeError as exc:
pass
# subset filename if applicable
if getattr(self, 'filename'):
if isinstance(self.filename, (list, tuple, np.ndarray)):
temp.filename = str(self.filename[self.__index__])
elif isinstance(self.filename, str):
temp.filename = copy.copy(self.filename)
# add to index
self.__index__ += 1
return temp
def __getitem__(self, key):
return getattr(self, key)
def __setitem__(self, key, value):
setattr(self, key, value)