Source code for finam.data.tools.mask

"""Mask tools for FINAM."""

from enum import Enum

import numpy as np

from ...errors import FinamDataError
from .units import UNITS, is_quantified, quantify

# Names treated as indicators for masked data.
# NOTE(review): presumably metadata attribute keys (e.g. NetCDF/CF fill-value
# attributes); the constant is not used within this module -- confirm at callers.
MASK_INDICATORS = ["_FillValue", "missing_value"]


class Mask(Enum):
    """Available mask settings for Info objects."""

    FLEX = 0
    """Masked as well as unmasked data is accepted."""

    NONE = 1
    """Only unmasked data, given as plain numpy arrays, is expected."""
def is_masked_array(data):
    """
    Tell whether the given data is stored as a numpy masked array.

    For quantified data, the wrapped magnitude is inspected instead of the
    quantity object itself.

    Parameters
    ----------
    data : Any
        The given data array.

    Returns
    -------
    bool
        Whether the data is a MaskedArray.
    """
    # quantities wrap the actual array, so look at the magnitude in that case
    candidate = data.magnitude if is_quantified(data) else data
    return np.ma.isMaskedArray(candidate)
def has_masked_values(data):
    """
    Check whether at least one value of the data is masked.

    Parameters
    ----------
    data : Any
        The given data array.

    Returns
    -------
    bool
        Whether the data is a MaskedArray and has any masked values.
    """
    any_masked = np.ma.is_masked(data)
    return any_masked
def filled(data, fill_value=None):
    """
    Replace masked entries of the input by a fill value.

    Works for quantified and un-quantified data. Input that is not a masked
    array is handed back unchanged.

    Parameters
    ----------
    data : :class:`pint.Quantity` or :class:`numpy.ndarray` or :class:`numpy.ma.MaskedArray`
        The reference object input.
    fill_value : array_like, optional
        The value to use for invalid entries. Can be scalar or non-scalar.
        If non-scalar, the resulting ndarray must be broadcastable over
        input array. Default is None, in which case, the `fill_value`
        attribute of the array is used instead.

    Returns
    -------
    pint.Quantity or numpy.ndarray
        New object with the same shape and type as the input, with masked
        data replaced by the fill value.
        Units will be taken from the input if present.

    See also
    --------
    :func:`numpy.ma.filled`:
        Numpy routine doing the same.
    """
    if is_masked_array(data):
        if is_quantified(data):
            # fill the wrapped masked array and re-attach the units
            plain = data.magnitude.filled(fill_value)
            return UNITS.Quantity(plain, data.units)
        return data.filled(fill_value)
    # nothing to fill for unmasked data
    return data
def to_masked(data, **kwargs):
    """
    Convert the data to a masked array.

    Data that is already masked is returned as is, unless keyword arguments
    are given; in that case a new masked array is created from it.

    Parameters
    ----------
    data : :class:`pint.Quantity` or :class:`numpy.ndarray` or :class:`numpy.ma.MaskedArray`
        The reference object input.
    **kwargs
        keyword arguments forwarded to :any:`numpy.ma.array`

    Returns
    -------
    pint.Quantity or numpy.ma.MaskedArray
        New object with the same shape and type but as a masked array.
        Units will be taken from the input if present.
    """
    if is_masked_array(data) and not kwargs:
        return data
    if not is_quantified(data):
        return np.ma.array(data, **kwargs)
    # mask the magnitude and wrap it with the original units again
    masked_magnitude = np.ma.array(data.magnitude, **kwargs)
    return UNITS.Quantity(masked_magnitude, data.units)
def to_compressed(xdata, order="C", mask=None):
    """
    Flatten the data in the given array order and drop all masked entries.

    Parameters
    ----------
    xdata : :class:`pint.Quantity` or :class:`numpy.ndarray` or :class:`numpy.ma.MaskedArray`
        The reference object input.
    order : str
        order argument for :any:`numpy.ravel`
    mask : :any:`Mask` value or valid boolean mask for :any:`MaskedArray`, optional
        mask to use when data is not masked already

    Returns
    -------
    :class:`pint.Quantity` or :class:`numpy.ndarray` or :class:`numpy.ma.MaskedArray`
        New object with a flat shape holding only the unmasked data.
        Units will be taken from the input if present.

    See also
    --------
    :func:`numpy.ma.compressed`:
        Numpy routine doing the same but only for C-order.
    """
    already_masked = is_masked_array(xdata)
    if not already_masked and (mask is None or not mask_specified(mask)):
        # neither the data nor the arguments provide a mask: just flatten
        return np.reshape(xdata, -1, order=order)

    if already_masked:
        # the mask of the data takes precedence over the given one
        values, mask = xdata.data, xdata.mask
    else:
        values = xdata
    flat = np.ravel(values, order)
    if mask is not np.ma.nomask:
        keep = np.logical_not(np.ravel(mask, order))
        flat = flat.compress(keep)
    if is_quantified(xdata):
        return quantify(flat, xdata.units)
    return flat
def from_compressed(xdata, shape, order="C", mask=None, **kwargs):
    """
    Restore a (masked) array of the given shape from compressed data.

    Without a boolean mask this is a plain reshape; a masked array is then
    only created if kwargs are given or ``mask`` is ``numpy.ma.nomask``.
    With a boolean mask, the data is written to the unmasked positions
    following the given array order and the result is a masked array.

    Parameters
    ----------
    xdata : :class:`pint.Quantity` or :class:`numpy.ndarray` or :class:`numpy.ma.MaskedArray`
        The reference object input.
    shape : int or tuple of int
        shape argument for :any:`numpy.reshape`
    order : str
        order argument for :any:`numpy.reshape`
    mask : :any:`Mask` value or valid boolean mask for :any:`MaskedArray`
        mask to use
    **kwargs
        keyword arguments forwarded to :any:`numpy.ma.array`

    Returns
    -------
    :class:`pint.Quantity` or :class:`numpy.ndarray` or :class:`numpy.ma.MaskedArray`
        New object with the desired shape.
        Units will be taken from the input if present.

    Raises
    ------
    FinamDataError
        If kwargs are given together with ``mask=Mask.NONE``.

    See also
    --------
    to_compressed:
        Inverse operation.
    :any:`numpy.ma.array`:
        Routine consuming kwargs to create a masked array.
    :any:`numpy.reshape`:
        Equivalent routine if no mask is provided.

    Notes
    -----
    If both `mask` and `shape` are given, they need to match in size.
    Entries at masked positions are taken from an uninitialized buffer.
    """
    without_mask = mask is None or mask is np.ma.nomask or not mask_specified(mask)
    if without_mask:
        if kwargs and mask is Mask.NONE:
            msg = "from_compressed: Can't create masked array with mask=Mask.NONE"
            raise FinamDataError(msg)
        reshaped = np.reshape(xdata, shape, order=order)
        if kwargs or mask is np.ma.nomask:
            return to_masked(reshaped, **kwargs)
        return reshaped

    # flat buffer covering the full domain, filled at unmasked positions only
    flat_size = np.prod(shape)
    # pylint: disable-next=unexpected-keyword-arg
    buffer = np.empty_like(xdata, shape=flat_size)
    if is_quantified(xdata):
        buffer = quantify(buffer, xdata.units)
    unmasked = np.logical_not(np.ravel(mask, order=order))
    buffer[unmasked] = xdata
    return to_masked(np.reshape(buffer, shape, order=order), mask=mask, **kwargs)
def check_data_covers_domain(data, mask=None):
    """
    Check whether data covers the domain that a mask defines on the same grid.

    Data without masked values always covers the domain. Otherwise every
    masked data point has to be masked in the domain mask as well.

    Parameters
    ----------
    data : Any
        The given data array for a single time-step.
    mask : None or bool or array of bool, optional
        Mask defining the target domain on the same grid as the data,
        by default None

    Returns
    -------
    bool
        Whether the data covers the desired domain.

    Raises
    ------
    ValueError
        When mask is given and mask and data don't share the same shape.
    """
    single_value = _is_single_mask_value(mask)
    if not single_value and np.shape(mask) != np.shape(data):
        raise ValueError("check_data_covers_domain: mask and data shape differ.")
    if not has_masked_values(data):
        return True
    if single_value:
        # a scalar mask applies to the whole domain at once
        return bool(mask)
    # all masked data points need to be masked in the domain too
    return np.all(mask[data.mask])
def _is_single_mask_value(mask): return mask is None or mask is np.ma.nomask or mask is False or mask is True
def masks_compatible(
    this, incoming, incoming_donwstream, this_grid=None, incoming_grid=None
):
    """
    Check whether an incoming mask is compatible with a given mask.

    Parameters
    ----------
    this : :any:`Mask` value or valid boolean mask for :any:`MaskedArray` or None
        mask specification to check against
    incoming : :any:`Mask` value or valid boolean mask for :any:`MaskedArray` or None
        incoming mask to check for compatibility
    incoming_donwstream : bool
        Whether the incoming mask is from downstream data
    this_grid : Grid or NoGrid or None, optional
        grid for first mask (to check shape and value equality)
    incoming_grid : Grid or NoGrid or None, optional
        grid for second mask (to check shape and value equality)

    Returns
    -------
    bool
        mask compatibility
    """
    # sort the two sides into upstream and downstream
    if incoming_donwstream:
        upstream, up_grid = this, this_grid
        downstream, down_grid = incoming, incoming_grid
    else:
        upstream, up_grid = incoming, incoming_grid
        downstream, down_grid = this, this_grid

    # an upstream of None is never compatible
    if upstream is None:
        return False

    down_given = mask_specified(downstream)
    up_given = mask_specified(upstream)

    if down_given:
        # an explicit downstream mask requires an equal explicit upstream mask
        if not up_given:
            return False
        return masks_equal(downstream, upstream, down_grid, up_grid)

    # Mask.FLEX accepts anything, Mask.NONE only Mask.NONE
    if downstream == Mask.FLEX:
        return True
    return (not up_given) and upstream == Mask.NONE
def masks_equal(this, other, this_grid=None, other_grid=None):
    """
    Compare two masks for equality.

    Parameters
    ----------
    this : :any:`Mask` value or valid boolean mask for :any:`MaskedArray` or None
        first mask
    other : :any:`Mask` value or valid boolean mask for :any:`MaskedArray` or None
        second mask
    this_grid : Grid or NoGrid or None, optional
        grid for first mask (to check shape and value equality)
    other_grid : Grid or NoGrid or None, optional
        grid for second mask (to check shape and value equality)

    Returns
    -------
    bool
        mask equality

    Notes
    -----
    If one of the grids is missing, masks with the same number of dimensions
    are treated as equal since shape and values can't be compared then.
    """
    if this is None and other is None:
        return True
    if not (mask_specified(this) or mask_specified(other)):
        # both are plain Mask settings
        return this == other
    # from here on both sides need to be valid masks
    if not (np.ma.is_mask(this) and np.ma.is_mask(other)):
        return False

    # "nomask" equals every mask without any masked entry
    nomask = np.ma.nomask
    if this is nomask:
        return True if other is nomask else not np.any(other)
    if other is nomask:
        return not np.any(this)

    if np.ndim(this) != np.ndim(other):
        return False
    # mask shape is grid specific (reversed axes, decreasing axis)
    if this_grid is None or other_grid is None:
        return True
    canonical_this = this_grid.to_canonical(this)
    canonical_other = other_grid.to_canonical(other)
    if np.shape(canonical_this) != np.shape(canonical_other):
        return False
    return np.all(canonical_this == canonical_other)
def is_sub_mask(mask, submask):
    """
    Check whether one mask is a sub-mask of another.

    This holds if every entry masked in 'mask' is masked in 'submask' too.

    Parameters
    ----------
    mask : arraylike
        The original mask.
    submask : arraylike
        The potential submask.

    Returns
    -------
    bool
        Whether 'submask' is a sub-mask of 'mask'.
    """
    # both arguments need to be valid masks
    if not (np.ma.is_mask(mask) and np.ma.is_mask(submask)):
        return False
    nomask = np.ma.nomask
    if mask is nomask:
        return True
    if submask is nomask:
        # only fine if the original mask doesn't mask anything either
        return not np.any(mask)
    if np.ndim(mask) != np.ndim(submask):
        return False
    if np.shape(mask) != np.shape(submask):
        return False
    covered = submask[mask]
    return np.all(covered)
def mask_specified(mask):
    """
    Tell whether the given mask selection stands for a masked array.

    Parameters
    ----------
    mask : :any:`Mask` value or valid boolean mask for :any:`MaskedArray`
        mask to check

    Returns
    -------
    bool
        False if mask is Mask.FLEX or Mask.NONE, True otherwise
    """
    # identity check against every Mask setting
    for setting in Mask:
        if mask is setting:
            return False
    return True