Source code for finam.data.tools.units

"""Units tools for FINAM."""

import numpy as np
import pint

from ...errors import FinamDataError

# pylint: disable-next=unused-import
from . import cf_units

# Use the "cf" format as pint's default unit format so that units are
# rendered following the CF conventions (relevant for pint.dequantify).
# NOTE(review): the "cf" format is presumably provided by the ``cf_units``
# import above -- confirm.
# Known limitation: there are some problems with degree_Celsius and similar
# units in this format.
pint.application_registry.default_format = "cf"
# Shared unit registry used throughout this module.
UNITS = pint.application_registry

# Cache for unit pair checks: maps ``(unit1, unit2)`` to a tuple
# ``(compatible, equivalent)``.
_UNIT_PAIRS_CACHE = {}


def get_magnitude(xdata):
    """
    Return the bare magnitude of quantified data.

    Parameters
    ----------
    xdata : pint.Quantity
        The quantified data.

    Returns
    -------
    numpy.ndarray
        The magnitude of ``xdata``, i.e. the data without its units.

    Raises
    ------
    FinamDataError
        If ``xdata`` is not quantified.
    """
    # Fail early with a routine-specific message for plain (unit-less) input.
    check_quantified(xdata, routine="get_magnitude")
    magnitude = xdata.magnitude
    return magnitude
def get_units(xdata):
    """
    Return the units of quantified data.

    Parameters
    ----------
    xdata : pint.Quantity
        The quantified data.

    Returns
    -------
    pint.Unit
        The units attached to ``xdata``.

    Raises
    ------
    FinamDataError
        If ``xdata`` is not quantified.
    """
    # Fail early with a routine-specific message for plain (unit-less) input.
    check_quantified(xdata, routine="get_units")
    data_units = xdata.units
    return data_units
def get_dimensionality(xdata):
    """
    Return the dimensionality of quantified data.

    Parameters
    ----------
    xdata : pint.Quantity
        The quantified data.

    Returns
    -------
    pint.UnitsContainer
        The dimensionality of ``xdata``.

    Raises
    ------
    FinamDataError
        If ``xdata`` is not quantified.
    """
    # Fail early with a routine-specific message for plain (unit-less) input.
    check_quantified(xdata, routine="get_dimensionality")
    dimensionality = xdata.dimensionality
    return dimensionality
def to_units(xdata, units, check_equivalent=False, report_conversion=False):
    """
    Convert quantified data to the requested units.

    Parameters
    ----------
    xdata : pint.Quantity
        The quantified data.
    units : str or pint.Unit
        Desired units.
    check_equivalent : bool, optional
        If true, equivalent units are detected and simply re-assigned
        to the unchanged magnitude instead of running a conversion.
    report_conversion : bool, optional
        If true, a tuple is returned whose second element describes the
        conversion that was carried out (if any).

    Returns
    -------
    pint.Quantity or tuple(pint.Quantity, tuple(pint.Unit, pint.Unit) or None)
        The data in the requested units. With ``report_conversion=True``
        a tuple is returned instead; its second element is ``None`` if no
        conversion was required, and a ``(source units, target units)``
        tuple of :class:`pint.Unit` objects otherwise.

    Raises
    ------
    FinamDataError
        If ``xdata`` is not quantified.
    """
    check_quantified(xdata, routine="to_units")
    target = _get_pint_units(units)
    source = xdata.units

    conversion = None
    if target != source:
        # Equivalent units only need a new label on the same magnitude.
        relabel_only = check_equivalent and equivalent_units(target, source)
        if relabel_only:
            xdata = UNITS.Quantity(xdata.magnitude, target)
        else:
            xdata = xdata.to(target)
            # Only a real conversion is reported; a relabel is not.
            conversion = (source, target)

    return (xdata, conversion) if report_conversion else xdata
def is_quantified(xdata):
    """
    Tell whether the given data carries units.

    Parameters
    ----------
    xdata : Any
        The object to inspect.

    Returns
    -------
    bool
        ``True`` if ``xdata`` is a :class:`pint.Quantity`, else ``False``.
    """
    quantity_type = pint.Quantity
    return isinstance(xdata, quantity_type)
def quantify(xdata, units=None):
    """
    Quantifies data.

    Parameters
    ----------
    xdata : Any
        The given data array. Must not be quantified already.
    units : UnitLike or Quantified or None, optional
        Units to use, dimensionless by default. If a quantity is given,
        only its units are used.

    Returns
    -------
    pint.Quantity
        The quantified array.

    Raises
    ------
    FinamDataError
        If ``xdata`` is already quantified.
    """
    if is_quantified(xdata):
        raise FinamDataError(f"Data is already quantified with units '{xdata.units}'")
    # The "falsy means dimensionless" default must not be applied to
    # quantities: truth-testing a quantity looks at its magnitude, so a
    # zero-valued quantity would silently lose its units, and array-valued
    # or offset-unit quantities would raise instead of providing units.
    if not isinstance(units, pint.Quantity) and not units:
        units = UNITS.dimensionless
    return UNITS.Quantity(xdata, _get_pint_units(units))
def check_quantified(xdata, routine="check_quantified"):
    """
    Ensure that the given data carries units.

    Parameters
    ----------
    xdata : Any
        The object to check.
    routine : str, optional
        Name of the calling routine, used as prefix of the error message,
        by default "check_quantified"

    Raises
    ------
    FinamDataError
        If ``xdata`` is not quantified.
    """
    if is_quantified(xdata):
        return
    raise FinamDataError(f"{routine}: given data is not quantified.")
def _get_pint_units(var):
    """
    Extract :class:`pint.Unit` from a unit-like object.

    Parameters
    ----------
    var : UnitLike or Quantified
        Units, a quantity, or anything accepted by ``UNITS.Unit``.

    Returns
    -------
    pint.Unit
        ``var`` itself if it is a unit, the units of ``var`` if it is a
        quantity, and ``UNITS.Unit(var)`` otherwise.

    Raises
    ------
    FinamDataError
        If ``var`` is ``None``.
    """
    if var is None:
        raise FinamDataError("Can't extract units from 'None'.")

    if isinstance(var, pint.Unit):
        return var

    if isinstance(var, pint.Quantity):
        return var.units or UNITS.dimensionless

    return UNITS.Unit(var)


def _lookup_unit_pair(unit1, unit2):
    """Return the ``(compatible, equivalent)`` flags of a unit pair, using the cache."""
    pair = _get_pint_units(unit1), _get_pint_units(unit2)
    flags = _UNIT_PAIRS_CACHE.get(pair)
    if flags is None:
        # Not seen before: compute the flags and store them for later calls.
        flags = _cache_units(*pair)
    return flags


def compatible_units(unit1, unit2):
    """
    Checks if two units are compatible/convertible.

    Parameters
    ----------
    unit1 : UnitLike or Quantified
        First unit to compare.
    unit2 : UnitLike or Quantified
        Second unit to compare.

    Returns
    -------
    bool
        Unit compatibility.

    Raises
    ------
    FinamDataError
        If one of the given units is ``None``.
    """
    return _lookup_unit_pair(unit1, unit2)[0]


def equivalent_units(unit1, unit2):
    """
    Check if two given units are equivalent.

    Units count as equivalent if converting a value of 1.0 from the first
    to the second unit yields a value close to 1.0.

    Parameters
    ----------
    unit1 : UnitLike or Quantified
        First unit to compare.
    unit2 : UnitLike or Quantified
        Second unit to compare.

    Returns
    -------
    bool
        Unit equivalence.

    Raises
    ------
    FinamDataError
        If one of the given units is ``None``.
    """
    return _lookup_unit_pair(unit1, unit2)[1]


def _cache_units(unit1, unit2):
    """
    Compute and cache compatibility and equivalence of two units.

    Parameters
    ----------
    unit1 : pint.Unit
        First unit.
    unit2 : pint.Unit
        Second unit.

    Returns
    -------
    tuple(bool, bool)
        ``(compatible, equivalent)`` flags, also stored in the cache under
        the key ``(unit1, unit2)``.
    """
    try:
        converted = (1.0 * unit1).to(unit2).magnitude
    except pint.errors.DimensionalityError:
        # Not convertible at all, hence neither compatible nor equivalent.
        flags = (False, False)
    else:
        # np.isclose yields a numpy boolean; cast it so callers get the
        # plain ``bool`` that the public functions document.
        flags = (True, bool(np.isclose(converted, 1.0)))

    _UNIT_PAIRS_CACHE[(unit1, unit2)] = flags
    return flags


def clear_units_cache():
    """Clears the units cache"""
    _UNIT_PAIRS_CACHE.clear()