Source code for finam.adapters.time_integration

"""Adapters for time integration"""
from abc import ABC
from datetime import timedelta

from ..data import tools
from ..errors import FinamNoDataError, FinamTimeError
from ..tools.log_helper import ErrorLogger
from .time import TimeCachingAdapter, check_time, interpolate


class TimeIntegrationAdapter(TimeCachingAdapter, ABC):
    """Shared base for adapters that aggregate pushed data over time.

    Every source notification is cached together with its time stamp.
    On a pull, the ``_interpolate`` hook (implemented by the concrete
    adapters) computes the aggregate between the previous pull time and
    the requested time.
    """

    def __init__(self):
        super().__init__()
        # Time of the most recent pull; seeded with the first push time
        # until the first pull has happened.
        self._prev_time = None
        self._info = None

    def _source_updated(self, time):
        """Cache the upstream data that belongs to a new notification.

        Parameters
        ----------
        time : datetime
            Simulation time of the notification.
        """
        check_time(self.logger, time)

        pulled = self.pull_data(time, self)
        stripped = tools.strip_time(pulled, self._input_info.grid)
        self.data.append((time, self._pack(stripped)))

        # The very first push marks the start of the first integration window.
        if self._prev_time is None:
            self._prev_time = time

    def _get_data(self, time, _target):
        """Return the aggregated data for the window ending at ``time``.

        Parameters
        ----------
        time : datetime
            Simulation time to get the data for.

        Returns
        -------
        array_like
            Aggregate computed by ``_interpolate`` for the requested time.

        Raises
        ------
        FinamNoDataError
            If nothing has been cached yet.
        """
        if not self.data:
            raise FinamNoDataError(f"No data available in {self.name}")

        # The requested time is checked against the span of cached time stamps.
        first_time = self.data[0][0]
        last_time = self.data[-1][0]
        check_time(self.logger, time, (first_time, last_time))

        result = self._interpolate(time)

        # Cache cleanup is driven by the *previous* pull time; only afterwards
        # does the current request become the new window start.
        self._clear_cached_data(self._prev_time)
        self._prev_time = time
        return result


# pylint: disable=too-many-ancestors
class AvgOverTime(TimeIntegrationAdapter):
    """Aggregates data over time to form the temporal average over the last pull time step.

    Output is the average height of the Area under Curve (AuC)
    between the last and the current pull (vertical lines):

    .. plot:: api/plots/integration-methods.py

        Illustration of time integration.

    Can use step-wise or linear interpolation between push time steps.

    .. plot:: api/plots/interpolation-methods.py

        Illustration of interpolation methods.

    Parameters
    ----------
    step : float, optional
        Value in range [0, 1] that determines the relative step position.
        Linear interpolation is used if set to ``None`` (the default).

        * For a value of 0.0, the new value is returned for any dt > 0.0.
        * For a value of 1.0, the old value is returned for any dt <= 1.0.
        * Values between 0.0 and 1.0 shift the step between the first and the second time.
        * A value of 0.5 results in nearest interpolation.

    See also
    --------
    .adapters.LinearTime : Linear time interpolation.
    .adapters.StepTime : Step-wise time interpolation.
    .adapters.SumOverTime : Sum aggregation over time.

    Examples
    --------

    .. testcode:: constructor

        import finam as fm

        adapter = fm.adapters.AvgOverTime()
    """

    def __init__(self, step=None):
        super().__init__()
        self._step = step
        self._prev_time = None
        self._info = None

    def _interpolate(self, time):
        """Average the cached data between the previous pull and ``time``.

        Parameters
        ----------
        time : datetime
            End of the averaging window; the window starts at ``self._prev_time``.

        Returns
        -------
        array_like
            Time-weighted average over the window, or the first cached value
            when only one entry is cached or ``time`` does not lie after it.

        Raises
        ------
        FinamTimeError
            If the window between the previous pull and ``time`` has no
            positive duration.
        """
        # Nothing to integrate over: hand back the first cached value as is.
        if len(self.data) == 1 or time <= self.data[0][0]:
            return self._unpack(self.data[0][1])

        total = None

        seg_start, start_val = self.data[0]
        start_val = self._unpack(start_val)

        # Walk over consecutive pairs of cached entries (segments).
        for idx in range(1, len(self.data)):
            seg_end, end_val = self.data[idx]
            end_val = self._unpack(end_val)

            # Segment ends before the window starts: skip it.
            if self._prev_time >= seg_end:
                seg_start, start_val = seg_end, end_val
                continue
            # Segment starts at or after the window end: done.
            if time <= seg_start:
                break

            span = seg_end - seg_start
            # Window bounds as fractions of the segment, clipped to [0, 1].
            lo = max((self._prev_time - seg_start) / span, 0.0)
            hi = min((time - seg_start) / span, 1.0)

            if self._step is None:
                # Trapezoid between the values at both window bounds
                # (``interpolate`` is presumably linear -- see ``.time``).
                val_lo = interpolate(start_val, end_val, lo)
                val_hi = interpolate(start_val, end_val, hi)
                area = (hi - lo) * 0.5 * (val_lo + val_hi)
            else:
                # Step-wise: the part of [lo, hi] before the step position
                # counts with the old value, the part after it with the new.
                weight_old = min(self._step, hi) - min(lo, self._step)
                weight_new = max(self._step, hi) - max(self._step, lo)
                area = weight_old * start_val + weight_new * end_val

            # Convert the relative fraction into an absolute duration.
            area *= span.total_seconds() * tools.UNITS.Unit("s")

            total = area if total is None else total + area

            seg_start, start_val = seg_end, end_val

        elapsed = time - self._prev_time
        if elapsed.total_seconds() > 0:
            # Area divided by window length gives the average height.
            total /= elapsed.total_seconds() * tools.UNITS.Unit("s")
        else:
            with ErrorLogger(self.logger):
                raise FinamTimeError(
                    "Can't calculate average over zero-length time duration."
                )

        return total
# pylint: disable=too-many-ancestors
class SumOverTime(TimeIntegrationAdapter):
    """Aggregates data over time to form the temporal sum (area under curve) over the last pull time step.

    Output is the Area under Curve (AuC)
    between the last and the current pull (vertical lines):

    .. plot:: api/plots/integration-methods.py

        Illustration of time integration.

    Can use step-wise or linear interpolation between push time steps.

    .. plot:: api/plots/interpolation-methods.py

        Illustration of interpolation methods.

    Parameters
    ----------
    step : float, optional
        Value in range [0, 1] that determines the relative step position.
        Defaults to 0.0, which means values are interpreted as constant
        over each time step.
        Linear interpolation is used if set to ``None``.

        * For a value of 0.0, the new value is returned for any dt > 0.0.
        * For a value of 1.0, the old value is returned for any dt <= 1.0.
        * Values between 0.0 and 1.0 shift the step between the first and the second time.
        * A value of 0.5 results in nearest interpolation.

    per_time : bool, optional
        Whether the input data is time-normalized.

        Use ``True`` for input units like mm/d, e.g. for precipitation.
        Data per step is multiplied with step length, with time cancelling out:
        mm/d becomes mm.

        Use ``False`` for absolute amount, e.g. mm for the precipitation of
        the last time step. Output units will be the same as input units.
        Duration of time steps is not explicitly taken into account.

        Examples:

        * ``per_time=True``, value=1mm/d, step=2x5d --> 10 mm
        * ``per_time=True``, value=1mm, step=2x5d --> 10mm*d
        * ``per_time=False``, value=1mm/d, step=2x5d --> 2mm/d
        * ``per_time=False``, value=1mm, step=2x5d --> 2mm

    initial_interval : :class:`timedelta <datetime.timedelta>`, optional
        Time scaling duration for the initial data, if ``per_time=True``.
        Defaults to 0 days.

    See also
    --------
    .adapters.LinearTime : Linear time interpolation.
    .adapters.StepTime : Step-wise time interpolation.
    .adapters.AvgOverTime : Average aggregation over time.

    Examples
    --------

    .. testcode:: constructor

        import finam as fm

        adapter = fm.adapters.SumOverTime()
    """

    def __init__(self, step=0.0, per_time=True, initial_interval=timedelta(0)):
        super().__init__()
        self._step = step
        self._per_time = per_time
        self._initial_interval = initial_interval
        self._prev_time = None
        self._info = None

    def _interpolate(self, time):
        """Sum the cached data between the previous pull and ``time``.

        Parameters
        ----------
        time : datetime
            End of the summation window; the window starts at ``self._prev_time``.

        Returns
        -------
        array_like
            Sum over the window. With ``per_time=True`` each contribution is
            scaled by its duration and the result is converted to reduced units.
        """
        # Nothing to integrate over: use the first cached value, scaled by
        # the configured initial interval for time-normalized input.
        if len(self.data) == 1 or time <= self.data[0][0]:
            first = self._unpack(self.data[0][1])
            if not self._per_time:
                return first
            scale = self._initial_interval.total_seconds() * tools.UNITS.Unit("s")
            return (first * scale).to_reduced_units()

        total = None

        seg_start, start_val = self.data[0]
        start_val = self._unpack(start_val)

        # Walk over consecutive pairs of cached entries (segments).
        for idx in range(1, len(self.data)):
            seg_end, end_val = self.data[idx]
            end_val = self._unpack(end_val)

            # Segment ends before the window starts: skip it.
            if self._prev_time >= seg_end:
                seg_start, start_val = seg_end, end_val
                continue
            # Segment starts at or after the window end: done.
            if time <= seg_start:
                break

            span = seg_end - seg_start
            # Window bounds as fractions of the segment, clipped to [0, 1].
            lo = max((self._prev_time - seg_start) / span, 0.0)
            hi = min((time - seg_start) / span, 1.0)

            if self._step is None:
                # Trapezoid between the values at both window bounds
                # (``interpolate`` is presumably linear -- see ``.time``).
                val_lo = interpolate(start_val, end_val, lo)
                val_hi = interpolate(start_val, end_val, hi)
                part = (hi - lo) * 0.5 * (val_lo + val_hi)
            else:
                # Step-wise: the part of [lo, hi] before the step position
                # counts with the old value, the part after it with the new.
                weight_old = min(self._step, hi) - min(lo, self._step)
                weight_new = max(self._step, hi) - max(self._step, lo)
                part = weight_old * start_val + weight_new * end_val

            if self._per_time:
                # Time-normalized input: scale by the segment duration.
                part *= span.total_seconds() * tools.UNITS.Unit("s")

            total = part if total is None else total + part

            seg_start, start_val = seg_end, end_val

        # NOTE(review): if every segment was skipped above (e.g. a repeated
        # pull at the previous pull time), ``total`` is still ``None`` here;
        # the ``per_time`` path then fails on ``to_reduced_units`` and the
        # other path returns ``None``. Intended behaviour to be confirmed.
        return total.to_reduced_units() if self._per_time else total

    def _get_info(self, info):
        """Exchange metadata with the source and derive the output info.

        Parameters
        ----------
        info : Info
            Metadata requested by the downstream side.

        Returns
        -------
        Info
            Metadata of the adapter output. With ``per_time=True`` the
            upstream units are multiplied by seconds and reduced; otherwise
            the upstream info is copied unchanged.
        """
        # For time-normalized input the request is forwarded with
        # ``units=None``, since output and input units differ by a time factor.
        request = info.copy_with(units=None) if self._per_time else info.copy_with()
        in_info = self.exchange_info(request)

        units = in_info.units
        if self._per_time:
            units *= tools.UNITS.Unit("s")
            reduced = (1.0 * units).to_reduced_units().units
            out_info = in_info.copy_with(units=reduced)
        else:
            out_info = in_info.copy_with()

        self._info = out_info
        return out_info