"""Data tools for FINAM."""
import copy
import datetime
import numpy as np
import pandas as pd
import pint
from ..errors import FinamDataError, FinamMetaDataError
# pylint: disable-next=unused-import
from . import cf_units, grid_spec
from .grid_base import Grid, GridBase
# set default format to cf-convention for pint.dequantify
# some problems with degree_Celsius and similar here
pint.application_registry.default_format = "cf"
UNITS = pint.application_registry
_UNIT_PAIRS_CACHE = {}
_MASK_INDICATORS = ["_FillValue", "missing_value"]
# [docs]
def prepare(data, info, time_entries=1, force_copy=False, report_conversion=False):
    """
    Prepares data in FINAM's internal transmission format.

    Checks the shape of the data.
    Checks or adds units and time dimension.

    Parameters
    ----------
    data : arraylike
        The input data.
    info : Info
        Info associated with the data.
    time_entries : int, optional
        Number of time slices in the data. Default 1.
    force_copy : bool, optional
        Forces the result to be a copy of the passed data. Default ``False``.
        If not used, the result is a view of the data if no units conversion needs to be done.
    report_conversion : bool, optional
        If true, returns a tuple with the second element indicating the unit conversion if it was required.

    Returns
    -------
    pint.Quantity or tuple(pint.Quantity, tuple(pint.Unit, pint.Unit) or None)
        The prepared data as a numpy array, wrapped into a :class:`pint.Quantity`.
        If ``report_conversion`` is ``True``, a tuple is returned with the second element
        indicating the unit conversion if it was required.
        The second element is ``None`` if no conversion was required,
        and a tuple of two :class:`pint.Unit` objects otherwise.

    Raises
    ------
    FinamDataError
        If the data doesn't match its info.
    """
    units_converted = None
    units = info.units
    if is_quantified(data):
        # incoming data already carries units -- they must be convertible
        if not compatible_units(data.units, units):
            raise FinamDataError(
                f"Given data has incompatible units. "
                f"Got {data.units}, expected {units}."
            )
        # convert only if units differ by more than a pure relabeling;
        # .to() already yields a new array, so no extra copy is needed then
        if not equivalent_units(data.units, units):
            units_converted = data.units, units
            data = data.to(units)
        elif force_copy:
            data = data.copy()
    else:
        # this covers masked arrays as well
        if isinstance(data, np.ndarray):
            if force_copy:
                data = data.copy()
            data = UNITS.Quantity(data, units)
        else:
            # non-ndarray input (lists, scalars, ...): copy first, then wrap
            if force_copy:
                data = copy.copy(data)
            data = UNITS.Quantity(np.asarray(data), units)
    data = _check_input_shape(data, info, time_entries)
    if report_conversion:
        return data, units_converted
    return data
def _check_input_shape(data, info, time_entries):
    """Validate data size/shape against the grid and ensure a leading time axis."""
    # check correct data size
    if isinstance(info.grid, Grid):
        # if the array has one dimension more than the grid, treat axis 0 as time
        time_entries = (
            data.shape[0]
            if len(data.shape) == len(info.grid.data_shape) + 1
            else time_entries
        )
        data_size = data.size / time_entries
        if data_size != info.grid.data_size:
            raise FinamDataError(
                f"quantify: data size doesn't match grid size. "
                f"Got {data_size}, expected {info.grid.data_size}"
            )
        # check shape of non-flat arrays
        if len(data.shape) != 1:
            if data.shape[1:] != info.grid.data_shape:
                if data.shape == info.grid.data_shape:
                    # time axis missing -- prepend a singleton one
                    data = np.expand_dims(data, 0)
                else:
                    raise FinamDataError(
                        f"quantify: data shape doesn't match grid shape. "
                        f"Got {data.shape}, expected {info.grid.data_shape}"
                    )
        else:
            # reshape arrays
            # flat input: rebuild grid shape (plus time axis) in the grid's order
            if time_entries <= 1:
                data = data.reshape(
                    [1] + list(info.grid.data_shape), order=info.grid.order
                )
            else:
                data = data.reshape(
                    [time_entries] + list(info.grid.data_shape), order=info.grid.order
                )
    elif isinstance(info.grid, grid_spec.NoGrid):
        data = _check_input_shape_no_grid(data, info, time_entries)
    return data
def _check_input_shape_no_grid(data, info, time_entries):
if len(data.shape) != info.grid.dim + 1:
if len(data.shape) == info.grid.dim:
data = np.expand_dims(data, 0)
else:
raise FinamDataError(
f"quantify: number of dimensions in data doesn't match expected number. "
f"Got {len(data.shape)}, expected {info.grid.dim}"
)
else:
if data.shape[0] != time_entries:
raise FinamDataError(
f"quantify: number of time entries in data doesn't match expected number. "
f"Got {data.shape[0]}, expected {time_entries}"
)
return data
# [docs]
def has_time_axis(xdata, grid):
    """
    Check if the data array has a time axis.

    Parameters
    ----------
    xdata : numpy.ndarray
        The given data array.
    grid : GridBase
        The associated grid specification

    Returns
    -------
    bool
        Whether the data has a time axis.

    Raises
    ------
    ValueError
        If the grid is neither a Grid nor a NoGrid.
    FinamDataError
        If the data dimension fits neither with nor without a time axis.
    """
    if isinstance(grid, Grid):
        grid_dim = len(grid.data_shape)
    elif isinstance(grid, grid_spec.NoGrid):
        grid_dim = grid.dim
    else:
        raise ValueError(
            f"Expected type Grid or NoGrid, got {grid.__class__.__name__}."
        )
    # the only extra dimension allowed on top of the grid is a leading time axis
    extra_dims = xdata.ndim - grid_dim
    if extra_dims == 0:
        return False
    if extra_dims == 1:
        return True
    raise FinamDataError("Data dimension must be grid dimension or grid dimension + 1.")
_BASE_DATETIME = datetime.datetime(1970, 1, 1)
_BASE_TIME = np.datetime64("1970-01-01T00:00:00")
_BASE_DELTA = np.timedelta64(1, "s")


def to_datetime(date):
    """
    Converts a numpy datetime64 object to a python datetime object.

    Parameters
    ----------
    date : numpy.datetime64
        Date to convert. ``NaT`` is mapped to :data:`pandas.NaT`.

    Returns
    -------
    datetime.datetime or pandas.NaT
        The equivalent python datetime (naive, UTC-based epoch arithmetic).
    """
    if np.isnan(date):
        return pd.NaT
    # fractional seconds since the epoch; negative for pre-1970 dates
    timestamp = (date - _BASE_TIME) / _BASE_DELTA
    # timedelta arithmetic handles negative timestamps uniformly and avoids
    # datetime.utcfromtimestamp, which is deprecated since Python 3.12
    return _BASE_DATETIME + datetime.timedelta(seconds=float(timestamp))
# [docs]
def strip_time(xdata, grid):
    """Returns a view of the data with the time dimension squeezed if there is only a single entry

    Parameters
    ----------
    xdata : arraylike
        Data to strip time dimension from
    grid : GridBase
        The associated grid specification

    Returns
    -------
    arraylike
        Stripped data

    Raises
    ------
    FinamDataError
        If the data has multiple time entries.
    """
    if not has_time_axis(xdata, grid):
        # nothing to strip
        return xdata
    if xdata.shape[0] > 1:
        raise FinamDataError(
            "Can't strip time of a data array with multiple time entries"
        )
    # single time entry: drop the leading axis, returning a view
    return xdata[0, ...]
# [docs]
def get_magnitude(xdata):
    """
    Get magnitude of given data.

    Parameters
    ----------
    xdata : pint.Quantity
        The given data array.

    Returns
    -------
    numpy.ndarray
        Magnitude of given data.

    Raises
    ------
    FinamDataError
        If the data is not quantified.
    """
    # fail early with a routine-specific message if data carries no units
    check_quantified(xdata, "get_magnitude")
    return xdata.magnitude
# [docs]
def get_units(xdata):
    """
    Get units of the data.

    Parameters
    ----------
    xdata : pint.Quantity
        The given data array.

    Returns
    -------
    pint.Unit
        Units of the data.

    Raises
    ------
    FinamDataError
        If the data is not quantified.
    """
    # fail early with a routine-specific message if data carries no units
    check_quantified(xdata, "get_units")
    return xdata.units
# [docs]
def get_dimensionality(xdata):
    """
    Get dimensionality of the data.

    Parameters
    ----------
    xdata : pint.Quantity
        The given data array.

    Returns
    -------
    pint.UnitsContainer
        Dimensionality of the data.

    Raises
    ------
    FinamDataError
        If the data is not quantified.
    """
    # fail early with a routine-specific message if data carries no units
    check_quantified(xdata, "get_dimensionality")
    return xdata.dimensionality
# [docs]
def to_units(xdata, units, check_equivalent=False, report_conversion=False):
    """
    Convert data to given units.

    Parameters
    ----------
    xdata : pint.Quantity
        The given data array.
    units : str or pint.Unit
        Desired units.
    check_equivalent : bool, optional
        Checks for equivalent units and simply re-assigns if possible.
    report_conversion : bool, optional
        If true, returns a tuple with the second element indicating the unit conversion if it was required.

    Returns
    -------
    pint.Quantity or tuple(pint.Quantity, tuple(pint.Unit, pint.Unit) or None)
        The converted data.
        If ``report_conversion`` is ``True``, a tuple is returned with the second element
        indicating the unit conversion if it was required.
        The second element is ``None`` if no conversion was required,
        and a tuple of two :class:`pint.Unit` objects otherwise.
    """
    check_quantified(xdata, "to_units")
    target = _get_pint_units(units)
    source = xdata.units
    conversion = None
    if target != source:
        if check_equivalent and equivalent_units(target, source):
            # same physical scale -- relabel without numeric conversion
            xdata = UNITS.Quantity(xdata.magnitude, target)
        else:
            xdata = xdata.to(target)
            conversion = source, target
    if report_conversion:
        return xdata, conversion
    return xdata
# [docs]
def full_like(xdata, value):
    """
    Return a new data array with the same shape, type and units as a given object.

    Parameters
    ----------
    xdata : :class:`pint.Quantity` or :class:`numpy.ndarray`
        The reference object input.
    value : scalar
        Value to fill the new object with before returning it.

    Returns
    -------
    pint.Quantity or numpy.ndarray
        New object with the same shape and type as other,
        with the data filled with fill_value.
        Units will be taken from the input if present.
    """
    filled_data = np.full_like(xdata, value)
    if not is_quantified(xdata):
        return filled_data
    # re-attach the units of the reference object
    return UNITS.Quantity(filled_data, xdata.units)
# [docs]
def full(value, info):
    """
    Return a new data array with units according to the given info, filled with given value.

    Parameters
    ----------
    value : scalar
        Value to fill the new object with before returning it.
    info : Info
        Info associated with the data.

    Returns
    -------
    pint.Quantity
        The converted data.
    """
    if isinstance(info.grid, Grid):
        # single time entry in front of the grid's data shape
        shape = [1] + list(info.grid.data_shape)
    else:
        # no structured grid: a single time entry only
        shape = [1]
    return prepare(np.full(shape, value), info)
# [docs]
def check(xdata, info):
    """
    Check if data matches given info.

    Parameters
    ----------
    xdata : numpy.ndarray
        The given data array.
    info : Info
        Info associated with the data.

    Raises
    ------
    FinamDataError
        If data doesn't match given info.
    """
    check_quantified(xdata, "check")
    if not has_time_axis(xdata, info.grid):
        raise FinamDataError("check: given data should have a time dimension.")
    # axis 0 is time; validate the remaining axes against the grid
    _check_shape(xdata.shape[1:], info.grid)
    # check units
    if not compatible_units(info.units, xdata):
        raise FinamDataError(
            f"check: given data has incompatible units. "
            f"Got {get_units(xdata)}, expected {info.units}."
        )
def _check_shape(shape, grid):
    """Raise FinamDataError if a (time-stripped) data shape doesn't fit the grid."""
    if isinstance(grid, Grid):
        if shape != grid.data_shape:
            raise FinamDataError(
                f"check: given data has wrong shape. "
                f"Got {shape}, expected {grid.data_shape}"
            )
    if isinstance(grid, grid_spec.NoGrid):
        # only the number of dimensions is fixed for grid-less data
        if len(shape) != grid.dim:
            raise FinamDataError(
                f"check: given data has wrong number of dimensions. "
                f"Got {len(shape)}, expected {grid.dim}"
            )
# [docs]
def is_quantified(xdata):
    """
    Check if data is a quantified DataArray.

    Parameters
    ----------
    xdata : Any
        The given data array.

    Returns
    -------
    bool
        Whether the data is a quantified DataArray.
    """
    # anything wrapped in a pint Quantity counts as quantified
    return isinstance(xdata, pint.Quantity)
# [docs]
def is_masked_array(data):
    """
    Check if data is a masked array.

    Parameters
    ----------
    data : Any
        The given data array.

    Returns
    -------
    bool
        Whether the data is a MaskedArray.
    """
    # inspect the magnitude for quantified data, the object itself otherwise
    values = data.magnitude if is_quantified(data) else data
    return np.ma.isMaskedArray(values)
# [docs]
def has_masked_values(data):
    """
    Determine whether the data has masked values.

    Parameters
    ----------
    data : Any
        The given data array.

    Returns
    -------
    bool
        Whether the data is a MaskedArray and has any masked values.
    """
    # False for plain arrays and for masked arrays without any masked entry
    return np.ma.is_masked(data)
# [docs]
def filled(data, fill_value=None):
    """
    Return input as an array with masked data replaced by a fill value.

    This routine respects quantified and un-quantified data.

    Parameters
    ----------
    data : :class:`pint.Quantity` or :class:`numpy.ndarray` or :class:`numpy.ma.MaskedArray`
        The reference object input.
    fill_value : array_like, optional
        The value to use for invalid entries. Can be scalar or non-scalar.
        If non-scalar, the resulting ndarray must be broadcastable over
        input array. Default is None, in which case, the `fill_value`
        attribute of the array is used instead.

    Returns
    -------
    pint.Quantity or numpy.ndarray
        New object with the same shape and type as other,
        with the data filled with fill_value.
        Units will be taken from the input if present.

    See also
    --------
    :func:`numpy.ma.filled`:
        Numpy routine doing the same.
    """
    if not is_masked_array(data):
        # nothing to fill
        return data
    if not is_quantified(data):
        return data.filled(fill_value)
    # fill the magnitude and re-attach the units
    return UNITS.Quantity(data.magnitude.filled(fill_value), data.units)
# [docs]
def to_masked(data, **kwargs):
    """
    Return a masked version of the data.

    Parameters
    ----------
    data : :class:`pint.Quantity` or :class:`numpy.ndarray` or :class:`numpy.ma.MaskedArray`
        The reference object input.
    **kwargs
        keyword arguments forwarded to :any:`numpy.ma.array`

    Returns
    -------
    pint.Quantity or numpy.ma.MaskedArray
        New object with the same shape and type but as a masked array.
        Units will be taken from the input if present.
    """
    if is_masked_array(data) and not kwargs:
        # already masked and nothing to change
        return data
    if not is_quantified(data):
        return np.ma.array(data, **kwargs)
    # mask the magnitude and re-attach the units
    return UNITS.Quantity(np.ma.array(data.magnitude, **kwargs), data.units)
# [docs]
def to_compressed(xdata, order="C"):
    """
    Return all the non-masked data as a 1-D array respecting the given array order.

    Parameters
    ----------
    xdata : :class:`pint.Quantity` or :class:`numpy.ndarray` or :class:`numpy.ma.MaskedArray`
        The reference object input.
    order : str
        order argument for :any:`numpy.ravel`

    Returns
    -------
    :class:`pint.Quantity` or :class:`numpy.ndarray` or :class:`numpy.ma.MaskedArray`
        New object with the flat shape containing only unmasked data, same type as input.
        Units will be taken from the input if present.

    See also
    --------
    :func:`numpy.ma.compressed`:
        Numpy routine doing the same but only for C-order.
    """
    if is_masked_array(xdata):
        # NOTE(review): for quantified masked input this relies on the Quantity
        # forwarding ``.data``/``.mask`` to its masked magnitude -- confirm
        data = np.ravel(xdata.data, order)
        if xdata.mask is not np.ma.nomask:
            # keep only entries whose mask flag is False
            data = data.compress(np.logical_not(np.ravel(xdata.mask, order)))
        return quantify(data, xdata.units) if is_quantified(xdata) else data
    # no mask present: plain flattening in the requested order
    return np.reshape(xdata, -1, order=order)
# [docs]
def from_compressed(xdata, shape, order="C", **kwargs):
    """
    Fill a (masked) array following a given mask or shape with the provided data.

    This will only create a masked array if kwargs are given (especially a mask).
    Otherwise this is simply reshaping the given data.
    Filling is performed in the given array order.

    Parameters
    ----------
    xdata : :class:`pint.Quantity` or :class:`numpy.ndarray` or :class:`numpy.ma.MaskedArray`
        The reference object input.
    shape : str
        shape argument for :any:`numpy.reshape`
    order : str
        order argument for :any:`numpy.reshape`
    **kwargs
        keyword arguments forwarded to :any:`numpy.ma.array`

    Returns
    -------
    :class:`pint.Quantity` or :class:`numpy.ndarray` or :class:`numpy.ma.MaskedArray`
        New object with the desired shape and same type as input.
        Units will be taken from the input if present.
        Will only be a masked array if kwargs are given.

    See also
    --------
    to_compressed:
        Inverse operation.
    :any:`numpy.ma.array`:
        Routine consuming kwargs to create a masked array.
    :any:`numpy.reshape`:
        Equivalent routine if no mask is provided.

    Notes
    -----
    If both `mask` and `shape` are given, they need to match in size.
    """
    if kwargs:
        if "mask" in kwargs:
            mask = np.reshape(kwargs["mask"], -1, order=order)
            if is_quantified(xdata):
                # pylint: disable-next=unexpected-keyword-arg
                data = quantify(np.empty_like(xdata, shape=np.size(mask)), xdata.units)
            else:
                # pylint: disable-next=unexpected-keyword-arg
                data = np.empty_like(xdata, shape=np.size(mask))
            # scatter compressed values into unmasked positions; masked
            # positions keep the uninitialized values from empty_like
            data[~mask] = xdata
            data = np.reshape(data, shape, order=order)
        else:
            data = np.reshape(xdata, shape, order=order)
        return to_masked(data, **kwargs)
    # no kwargs: plain reshape, no masked array is created
    return np.reshape(xdata, shape, order=order)
# [docs]
def check_data_covers_domain(data, mask=None):
    """
    Check if the given data covers a domain defined by a mask on the same grid.

    Parameters
    ----------
    data : Any
        The given data array for a single time-step.
    mask : None or bool or array of bool, optional
        Mask defining the target domain on the same grid as the data,
        by default None

    Returns
    -------
    bool
        Whether the data covers the desired domain.

    Raises
    ------
    ValueError
        When mask is given and mask and data don't share the same shape.
    """
    scalar_mask = _is_single_mask_value(mask)
    if not scalar_mask and np.shape(mask) != np.shape(data):
        raise ValueError("check_data_covers_domain: mask and data shape differ.")
    # data without any masked value covers every domain
    if not has_masked_values(data):
        return True
    if scalar_mask:
        return bool(mask)
    # masked data cells are only acceptable where the domain mask is set as well
    return np.all(mask[data.mask])
def _is_single_mask_value(mask):
return mask is None or mask is np.ma.nomask or mask is False or mask is True
# [docs]
def quantify(xdata, units=None):
    """
    Quantifies data.

    Parameters
    ----------
    xdata : Any
        The given data array.
    units : UnitLike or Quantified, optional
        Units to assign to the data. Dimensionless if None or empty. Default None.

    Returns
    -------
    pint.Quantity
        The quantified array.

    Raises
    ------
    FinamDataError
        If the data is already quantified.
    """
    if is_quantified(xdata):
        raise FinamDataError(f"Data is already quantified with units '{xdata.units}'")
    return UNITS.Quantity(xdata, _get_pint_units(units or UNITS.dimensionless))
# [docs]
def check_quantified(xdata, routine="check_quantified"):
    """
    Check if data is a quantified DataArray.

    Parameters
    ----------
    xdata : numpy.ndarray
        The given data array.
    routine : str, optional
        Name of the routine to show in the Error, by default "check_quantified"

    Raises
    ------
    FinamDataError
        If the array is not a quantified DataArray.
    """
    if is_quantified(xdata):
        return
    raise FinamDataError(f"{routine}: given data is not quantified.")
def _get_pint_units(var):
    """Extract a :class:`pint.Unit` from a unit-like or quantity-like object.

    Raises FinamDataError if ``var`` is None.
    """
    if var is None:
        raise FinamDataError("Can't extract units from 'None'.")
    if isinstance(var, pint.Unit):
        return var
    if isinstance(var, pint.Quantity):
        # a dimensionless quantity may report falsy units
        return var.units or UNITS.dimensionless
    # anything else (e.g. a string) is parsed by the registry
    return UNITS.Unit(var)
def compatible_units(unit1, unit2):
    """
    Checks if two units are compatible/convertible.

    Parameters
    ----------
    unit1 : UnitLike or Quantified
        First unit to compare.
    unit2 : UnitLike or Quantified
        Second unit to compare.

    Returns
    -------
    bool
        Unit compatibility.
    """
    pair = (_get_pint_units(unit1), _get_pint_units(unit2))
    cached = _UNIT_PAIRS_CACHE.get(pair)
    if cached is None:
        # compute and memoize (compatible, equivalent) for this pair
        cached = _cache_units(*pair)
    return cached[0]
def equivalent_units(unit1, unit2):
    """
    Check if two given units are equivalent.

    Parameters
    ----------
    unit1 : UnitLike or Quantified
        First unit to compare.
    unit2 : UnitLike or Quantified
        Second unit to compare.

    Returns
    -------
    bool
        Unit equivalence.
    """
    pair = (_get_pint_units(unit1), _get_pint_units(unit2))
    cached = _UNIT_PAIRS_CACHE.get(pair)
    if cached is None:
        # compute and memoize (compatible, equivalent) for this pair
        cached = _cache_units(*pair)
    return cached[1]
def _cache_units(unit1, unit2):
    """Determine and cache compatibility/equivalence of a unit pair.

    Returns the cached tuple ``(compatible, equivalent)``.
    """
    equiv = False
    compat = False
    try:
        # units are equivalent if converting 1.0 between them is a no-op
        equiv = np.isclose((1.0 * unit1).to(unit2).magnitude, 1.0)
        compat = True
    except pint.errors.DimensionalityError:
        # conversion impossible -> incompatible (and therefore not equivalent)
        pass
    _UNIT_PAIRS_CACHE[(unit1, unit2)] = compat, equiv
    return compat, equiv
def clear_units_cache():
    """Clear the cached unit-pair compatibility/equivalence results."""
    _UNIT_PAIRS_CACHE.clear()
# [docs]
def assert_type(cls, slot, obj, types):
    """Type assertion.

    Raises TypeError if ``obj`` is not an instance of any of the given ``types``.
    """
    if isinstance(obj, tuple(types)):
        return
    raise TypeError(
        f"Unsupported data type for {slot} in "
        f"{cls.__class__.__name__}: {obj.__class__.__name__}. "
        f"Expected one of [{', '.join([tp.__name__ for tp in types])}]"
    )
# [docs]
class Info:
    """Data info containing grid specification and metadata

    Parameters
    ----------
    time : datetime.datetime or None
        time specification
    grid : Grid or NoGrid or None
        grid specification
    meta : dict, optional
        dictionary of metadata
    **meta_kwargs
        additional metadata by name, will overwrite entries in ``meta``

    Attributes
    ----------
    grid : Grid or NoGrid or None
        grid specification
    meta : dict
        dictionary of metadata

    Raises
    ------
    FinamMetaDataError
        If ``time`` or ``grid`` has an invalid type.
    """

    def __init__(self, time, grid, meta=None, **meta_kwargs):
        if time is not None and not isinstance(time, datetime.datetime):
            raise FinamMetaDataError("Time in Info must be either None or a datetime")
        if grid is not None and not isinstance(grid, GridBase):
            raise FinamMetaDataError(
                "Grid in Info must be either None or of a sub-class of GridBase"
            )
        self.time = time
        self.grid = grid
        self.meta = meta or {}
        self.meta.update(meta_kwargs)
        # normalize units: missing means dimensionless (""), None stays None
        units = self.meta.get("units", "")
        units = None if units is None else UNITS.Unit(units)
        self.meta["units"] = units

    @property
    def is_masked(self):
        """bool: whether info indicates masked data ("_FillValue" or "missing_value" in meta)."""
        return any(v in self.meta for v in _MASK_INDICATORS)

    def copy(self):
        """Copies the info object"""
        return copy.copy(self)

    def copy_with(self, use_none=True, **kwargs):
        """Copies the info object and sets variables and meta values according to the kwargs

        Parameters
        ----------
        use_none : bool
            whether properties with None value should also be transferred
        **kwargs
            key values pairs for properties to change

        Returns
        -------
        Info
            The modified copy.
        """
        other = Info(time=self.time, grid=self.grid, meta=copy.copy(self.meta))
        for k, v in kwargs.items():
            if v is None and not use_none:
                # skip None values unless their transfer is explicitly allowed
                continue
            if k == "time":
                other.time = v
            elif k == "grid":
                other.grid = v
            elif k == "units":
                other.meta[k] = v if v is None else UNITS.Unit(v)
            else:
                other.meta[k] = v
        return other

    def accepts(self, incoming, fail_info, ignore_none=False):
        """Tests whether this info can accept/is compatible with an incoming info

        Parameters
        ----------
        incoming : Info
            Incoming/source info to check. This is the info from upstream.
        fail_info : dict
            Dictionary that will be filled with failed properties; name: (source, target).
        ignore_none : bool
            Ignores ``None`` values in the incoming info.

        Returns
        -------
        bool
            Whether the incoming info is accepted
        """
        if not isinstance(incoming, Info):
            fail_info["type"] = (incoming.__class__, self.__class__)
            return False
        success = True
        if self.grid is not None and not self.grid.compatible_with(incoming.grid):
            if not (ignore_none and incoming.grid is None):
                fail_info["grid"] = (incoming.grid, self.grid)
                success = False
        for k, v in self.meta.items():
            if v is not None and k in incoming.meta:
                in_value = incoming.meta[k]
                if k == "units":
                    # units only need to be convertible, not identical
                    if not (ignore_none and in_value is None) and not compatible_units(
                        v, in_value
                    ):
                        fail_info["meta." + k] = (in_value, v)
                        success = False
                else:
                    # all other metadata entries must match exactly
                    if not (ignore_none and in_value is None) and in_value != v:
                        fail_info["meta." + k] = (in_value, v)
                        success = False
        return success

    def __copy__(self):
        """Shallow copy of the info"""
        return Info(time=self.time, grid=self.grid, meta=self.meta)

    def __eq__(self, other):
        """Equality check for two infos

        Ignores time.
        """
        if not isinstance(other, Info):
            return False
        return self.grid == other.grid and self.meta == other.meta

    def __getattr__(self, name):
        # only called if attribute is not present in class;
        # exposes metadata entries as attributes
        if "meta" in self.__dict__ and name in self.meta:
            return self.meta[name]
        raise AttributeError(f"'Info' object has no attribute '{name}'")

    def __setattr__(self, name, value):
        # first check if attribute present or meta not yet present (e.g. grid)
        if name in self.__dir__() or "meta" not in self.__dict__:
            super().__setattr__(name, value)
        else:
            # unknown attributes become metadata entries
            self.__dict__["meta"][name] = value

    def __repr__(self):
        grid = self.grid.name if self.grid is not None else "None"
        meta = ", " * bool(self.meta)
        meta += ", ".join(
            f"{k}=" + ("None" if v is None else f"'{v}'") for k, v in self.meta.items()
        )
        return f"Info(grid={grid}{meta})"

    def as_dict(self):
        """Returns a ``dict`` containing all metadata in this Info."""
        return {
            **self.meta,
            "grid": f"{self.grid}",
            # units may legitimately be None (see __init__); formatting None
            # with the ":~" pint spec would raise a TypeError
            "units": "None" if self.units is None else f"{self.units:~}",
        }