Source code for finam.data.tools.core

"""Core data tools for FINAM."""

import copy
import datetime

import numpy as np
import pandas as pd

from ...errors import FinamDataError
from .. import grid_spec
from ..grid_base import Grid
from .units import (
    UNITS,
    check_quantified,
    compatible_units,
    equivalent_units,
    get_units,
    is_quantified,
)

# Unix epoch as a naive Python datetime; to_datetime adds negative offsets to it.
_BASE_DATETIME = datetime.datetime(1970, 1, 1)
# Unix epoch as a numpy datetime64; to_datetime subtracts it from its input.
_BASE_TIME = np.datetime64("1970-01-01T00:00:00")
# One second as a numpy timedelta64; to_datetime divides by it to get seconds.
_BASE_DELTA = np.timedelta64(1, "s")


def prepare(data, info, time_entries=1, force_copy=False, report_conversion=False):
    """
    Bring data into FINAM's internal transmission format.

    Wraps the data into a :class:`pint.Quantity` with the units of ``info``
    (converting units if the data already carries different but compatible
    ones), applies the mask of ``info`` if the info is masked and the data
    is not a masked array yet, and validates / normalizes the shape so that
    the result has a leading time axis.

    Parameters
    ----------
    data : arraylike
        The input data.
    info : Info
        Info associated with the data.
    time_entries : int, optional
        Number of time slices in the data. Default 1.
    force_copy : bool, optional
        Forces the result to be a copy of the passed data. Default ``False``.

        If not used, the result is a view of the data if no units conversion
        needs to be done.
    report_conversion : bool, optional
        If true, returns a tuple with the second element indicating the unit
        conversion if it was required.

    Returns
    -------
    pint.Quantity or tuple(pint.Quantity, tuple(pint.Unit, pint.Unit) or None)
        The prepared data as a numpy array, wrapped into a
        :class:`pint.Quantity`.

        If ``report_conversion`` is ``True``, a tuple is returned with the
        second element indicating the unit conversion if it was required.

        The second element is ``None`` if no conversion was required,
        and a tuple of two :class:`pint.Unit` objects otherwise.

    Raises
    ------
    FinamDataError
        If the data doesn't match its info.
    """
    target_units = info.units
    conversion = None

    if is_quantified(data):
        if not compatible_units(data.units, target_units):
            raise FinamDataError(
                f"Given data has incompatible units. "
                f"Got {data.units}, expected {target_units}."
            )
        if info.is_masked and not np.ma.isarray(data.magnitude):
            # attach the mask of the info while keeping the original units
            masked = np.ma.array(
                data=data.magnitude,
                mask=info.mask,
                shrink=False,
                fill_value=info.fill_value,
            )
            data = UNITS.Quantity(masked, data.units)
        if not equivalent_units(data.units, target_units):
            # remember (from, to) so the caller can be told about it
            conversion = data.units, target_units
            data = data.to(target_units)
        elif force_copy:
            data = data.copy()
    elif info.is_masked and not np.ma.isarray(data):
        masked = np.ma.array(
            data=data,
            mask=info.mask,
            shrink=False,
            fill_value=info.fill_value,
            copy=force_copy,
        )
        data = UNITS.Quantity(masked, target_units)
    elif isinstance(data, np.ndarray):
        # this covers masked arrays as well
        raw = data.copy() if force_copy else data
        data = UNITS.Quantity(raw, target_units)
    else:
        raw = copy.copy(data) if force_copy else data
        data = UNITS.Quantity(np.asarray(raw), target_units)

    data = _check_input_shape(data, info, time_entries)

    return (data, conversion) if report_conversion else data
def _check_input_shape(data, info, time_entries):
    """
    Validate the data shape against the grid of ``info`` and add a time axis.

    For a :class:`Grid`, flat data is reshaped to ``(time, *grid.data_shape)``
    using the grid's memory order, and data that exactly matches the grid
    shape gets a leading time axis of length 1. For a ``NoGrid``, the work
    is delegated to :func:`_check_input_shape_no_grid`. Any other grid type
    leaves the data untouched.

    Raises
    ------
    FinamDataError
        If size or shape of the data do not match the grid.
    """
    grid = info.grid

    if not isinstance(grid, Grid):
        if isinstance(grid, grid_spec.NoGrid):
            return _check_input_shape_no_grid(data, info, time_entries)
        return data

    # data with exactly one extra axis carries its own number of time steps
    if len(data.shape) == len(grid.data_shape) + 1:
        time_entries = data.shape[0]

    # check correct data size
    data_size = data.size / time_entries
    if data_size != grid.data_size:
        raise FinamDataError(
            f"quantify: data size doesn't match grid size. "
            f"Got {data_size}, expected {grid.data_size}"
        )

    if len(data.shape) == 1:
        # flat arrays are reshaped; at least one time entry is always used
        steps = max(1, time_entries)
        return data.reshape([steps] + list(grid.data_shape), order=grid.order)

    # check shape of non-flat arrays
    if data.shape[1:] != grid.data_shape:
        if data.shape != grid.data_shape:
            raise FinamDataError(
                f"quantify: data shape doesn't match grid shape. "
                f"Got {data.shape}, expected {grid.data_shape}"
            )
        data = np.expand_dims(data, 0)

    return data


def _check_input_shape_no_grid(data, info, time_entries):
    """
    Validate the data shape against a ``NoGrid`` and add a time axis.

    Data with ``grid.dim`` axes gets a leading time axis of length 1; data
    with ``grid.dim + 1`` axes is expected to already carry the time axis.
    The resulting number of time steps must equal ``time_entries``.

    Raises
    ------
    FinamDataError
        If the shape is not valid for the grid or the number of time
        entries does not match.
    """
    grid = info.grid

    if len(data.shape) == grid.dim + 1:
        if not _no_grid_shape_valid(data.shape[1:], grid):
            raise FinamDataError(
                f"Data shape not valid. "
                f"Got {data.shape[1:]}, expected {grid.data_shape}"
            )
    elif _no_grid_shape_valid(data.shape, grid):
        data = np.expand_dims(data, 0)
    else:
        raise FinamDataError(
            f"Data shape not valid. "
            f"Got {data.shape}, expected {grid.data_shape}"
        )

    if data.shape[0] != time_entries:
        raise FinamDataError(
            f"Number of time entries in data doesn't match expected number. "
            f"Got {data.shape[0]}, expected {time_entries}"
        )
    return data


def _no_grid_shape_valid(data_shape, grid):
    """
    Check a shape (without time axis) against the shape of a ``NoGrid``.

    Entries of ``grid.data_shape`` equal to ``-1`` are skipped in the
    comparison; all other entries must match ``data_shape``.
    """
    if len(data_shape) != grid.dim:
        return False
    actual = np.array(data_shape)
    expected = np.array(grid.data_shape)
    fixed = expected != -1
    return np.all(actual[fixed] == expected[fixed])
def has_time_axis(xdata, grid):
    """
    Tell whether a data array carries a leading time axis.

    The number of axes of the data is compared to the number of axes
    expected for the grid: ``len(grid.data_shape)`` for a :class:`Grid`,
    ``grid.dim`` for a ``NoGrid``.

    Parameters
    ----------
    xdata : numpy.ndarray
        The given data array.
    grid : GridBase
        The associated grid specification

    Returns
    -------
    bool
        Whether the data has a time axis.

    Raises
    ------
    ValueError
        If ``grid`` is neither a ``Grid`` nor a ``NoGrid``.
    FinamDataError
        If the data has neither the grid dimension nor one more.
    """
    if isinstance(grid, Grid):
        spatial_ndim = len(grid.data_shape)
    elif isinstance(grid, grid_spec.NoGrid):
        spatial_ndim = grid.dim
    else:
        raise ValueError(
            f"Expected type Grid or NoGrid, got {grid.__class__.__name__}."
        )

    ndim = xdata.ndim
    if ndim == spatial_ndim + 1:
        return True
    if ndim == spatial_ndim:
        return False

    raise FinamDataError("Data dimension must be grid dimension or grid dimension + 1.")
def to_datetime(date):
    """
    Convert a numpy datetime64 object to a naive python datetime object.

    The input is expressed as (possibly fractional) seconds since
    1970-01-01T00:00:00. Negative offsets are added to the epoch as a
    timedelta; non-negative ones go through ``datetime.fromtimestamp`` in
    UTC, and the time zone is dropped afterwards.

    Parameters
    ----------
    date : numpy.datetime64
        The date to convert.

    Returns
    -------
    datetime.datetime or pandas.NaT
        The converted date, or ``pandas.NaT`` if ``np.isnan(date)`` is true.
    """
    if np.isnan(date):
        return pd.NaT

    seconds = (date - _BASE_TIME) / _BASE_DELTA

    if seconds < 0:
        offset = datetime.timedelta(seconds=seconds)
        return _BASE_DATETIME + offset

    utc = datetime.timezone.utc
    aware = datetime.datetime.fromtimestamp(seconds, utc)
    return aware.replace(tzinfo=None)
def strip_time(xdata, grid):
    """
    Drop the time axis of data that holds a single time entry.

    Data without a time axis is returned unchanged; otherwise the first
    (and only) time slice is returned.

    Parameters
    ----------
    xdata : arraylike
        Data to strip time dimension from
    grid : GridBase
        The associated grid specification

    Returns
    -------
    arraylike
        Stripped data

    Raises
    ------
    FinamDataError
        If the data has multiple time entries.
    """
    if not has_time_axis(xdata, grid):
        return xdata

    if xdata.shape[0] > 1:
        raise FinamDataError(
            "Can't strip time of a data array with multiple time entries"
        )

    return xdata[0, ...]
def full_like(xdata, value):
    """
    Create a constant-valued array modelled after a reference object.

    Parameters
    ----------
    xdata : :class:`pint.Quantity` or :class:`numpy.ndarray`
        The reference object input.
    value : scalar
        Value to fill the new object with before returning it.

    Returns
    -------
    pint.Quantity or numpy.ndarray
        Result of ``numpy.full_like(xdata, value)``. If the reference is
        quantified, the result is wrapped into a quantity with the units
        of the reference.
    """
    filled = np.full_like(xdata, value)
    return UNITS.Quantity(filled, xdata.units) if is_quantified(xdata) else filled
def full(value, info):
    """
    Create a constant-valued data array matching the given info.

    The array has a single time entry followed by the data shape of the
    grid (no further axes if the grid of ``info`` is not a :class:`Grid`)
    and is passed through :func:`prepare` to obtain the final quantity.

    Parameters
    ----------
    value : scalar
        Value to fill the new object with before returning it.
    info : Info
        Info associated with the data.

    Returns
    -------
    pint.Quantity
        The prepared data.
    """
    if isinstance(info.grid, Grid):
        grid_shape = info.grid.data_shape
    else:
        grid_shape = ()

    # leading axis of length 1 is the time axis
    return prepare(np.full([1, *grid_shape], value), info)
def check(xdata, info):
    """
    Verify that data is consistent with the given info.

    The data must be quantified, must have a time axis, must match the
    grid in shape (ignoring the time axis) and must have units compatible
    with the units of the info.

    Parameters
    ----------
    xdata : numpy.ndarray
        The given data array.
    info : Info
        Info associated with the data.

    Raises
    ------
    FinamDataError
        If data doesn't match given info.
    """
    check_quantified(xdata, "check")

    grid = info.grid
    if not has_time_axis(xdata, grid):
        raise FinamDataError("check: given data should have a time dimension.")

    # the first axis is time, the remainder has to match the grid
    spatial_shape = xdata.shape[1:]
    _check_shape(spatial_shape, grid)

    # check units
    if compatible_units(info.units, xdata):
        return
    raise FinamDataError(
        f"check: given data has incompatible units. "
        f"Got {get_units(xdata)}, expected {info.units}."
    )
def _check_shape(shape, grid):
    """
    Validate a data shape (without time axis) against a grid.

    For a :class:`Grid` the shape must equal ``grid.data_shape``; for a
    ``NoGrid`` only the number of dimensions is compared to ``grid.dim``.

    Raises
    ------
    FinamDataError
        If the shape does not fit the grid.
    """
    if isinstance(grid, Grid):
        expected_shape = grid.data_shape
        if shape != expected_shape:
            raise FinamDataError(
                f"check: given data has wrong shape. "
                f"Got {shape}, expected {grid.data_shape}"
            )

    if isinstance(grid, grid_spec.NoGrid):
        ndim = len(shape)
        if ndim != grid.dim:
            raise FinamDataError(
                f"check: given data has wrong number of dimensions. "
                f"Got {len(shape)}, expected {grid.dim}"
            )
def assert_type(cls, slot, obj, types):
    """
    Assert that an object is an instance of one of the given types.

    Parameters
    ----------
    cls : object
        Object whose class name is reported in the error message.
    slot : str
        Name of the slot the object was passed for; used in the message.
    obj : object
        The object to check.
    types : iterable of type
        The accepted types.

    Raises
    ------
    TypeError
        If ``obj`` is not an instance of any of ``types``.
    """
    if any(isinstance(obj, accepted) for accepted in types):
        return

    expected = ", ".join([tp.__name__ for tp in types])
    raise TypeError(
        f"Unsupported data type for {slot} in "
        f"{cls.__class__.__name__}: {obj.__class__.__name__}. "
        f"Expected one of [{expected}]"
    )