"""
Implementations of IOutput
"""
import logging
import os
from datetime import datetime
import numpy as np
from ..data import tools
from ..data.tools import Info
from ..errors import (
FinamDataError,
FinamMetaDataError,
FinamNoDataError,
FinamStaticDataError,
FinamTimeError,
)
from ..interfaces import IAdapter, IInput, IOutput, Loggable
from ..tools.log_helper import ErrorLogger
# pylint: disable=too-many-public-methods
[docs]
class Output(IOutput, Loggable):
"""Default output implementation.

Pushed data is kept in ``self.data`` as a list of ``(time, data)`` entries and
all registered targets are notified on every push. When a memory limit is set,
data sets exceeding it are written to ``.npy`` files and the cache entry holds
the file name instead of the data.

Parameters
----------
name : str
Name of the output. Required; ``None`` raises a ``ValueError``.
info : :class:`.Info`, optional
Data info of the output. Can't be combined with ``**info_kwargs``.
static : bool, optional
Whether the output is static. Default: False.
**info_kwargs
Keyword arguments used to construct an :class:`.Info` when ``info`` is not given.
"""
def __init__(self, name=None, info=None, static=False, **info_kwargs):
    """Set up an empty output and optionally register its data info.

    Parameters
    ----------
    name : str
        Name of the output. Passing ``None`` raises a ``ValueError``.
    info : :class:`.Info`, optional
        Data info to push right away.
    static : bool, optional
        Whether the output is static. Default: False.
    **info_kwargs
        Used to build an :class:`.Info` when ``info`` is not given.

    Raises
    ------
    ValueError
        If no name is given, or if both ``info`` and ``**info_kwargs`` are used.
    """
    Loggable.__init__(self)

    # State that must exist before `push_info` may be called below.
    self._targets = []
    self.data = []
    self._output_info = None
    self.base_logger_name = None

    if name is None:
        raise ValueError("Output: needs a name.")
    self._name = name
    self._static = static

    if info_kwargs and info is not None:
        raise ValueError("Output: can't use **kwargs in combination with info")
    if info_kwargs:
        info = Info(**info_kwargs)
    if info is not None:
        self.push_info(info)

    # Maps each connected input to the time of its latest pull (None = no pull yet).
    self._connected_inputs = {}
    self._out_infos_exchanged = 0

    self._time = None

    # Bookkeeping for the optional dumping of data to files.
    self._mem_limit = None
    self._mem_location = None
    self._total_mem = 0
    self._mem_counter = 0
@property
def name(self):
    """Name of this output, as given to the constructor."""
    return self._name
@property
def time(self):
    """Time stamp stored by the most recent data push (``None`` initially)."""
    return self._time
@property
def is_static(self):
    """The ``static`` flag this output was constructed with."""
    return self._static
@property
def info(self):
    """Info: The output's data info.

    Raises
    ------
    FinamNoDataError
        If no info was pushed yet, or if the output has targets and fewer
        infos were exchanged than inputs are connected.
    """
    current = self._output_info
    if current is None:
        raise FinamNoDataError("No data info available")

    if self.has_targets:
        exchange_pending = self._out_infos_exchanged < len(self._connected_inputs)
        if exchange_pending:
            raise FinamNoDataError("Data info was not completely exchanged yet")

    return current
@property
def needs_pull(self):
    """bool: Whether the output needs pull. Always ``False`` here."""
    return False
@property
def needs_push(self):
    """bool: Whether the output needs push. Always ``True`` here."""
    return True
@property
def memory_limit(self):
    """Memory limit of this slot; ``None`` means no limit is applied."""
    return self._mem_limit


@memory_limit.setter
def memory_limit(self, limit):
    """Set the memory limit of this slot."""
    self._mem_limit = limit
@property
def memory_location(self):
    """Directory used for data files written by this slot."""
    return self._mem_location


@memory_location.setter
def memory_location(self, directory):
    """Set the directory used for data files written by this slot."""
    self._mem_location = directory
[docs]
def has_info(self):
    """Tell whether a data info was pushed to this output.

    Unlike the :attr:`.info` property, this does not require the info to be
    exchanged with the connected inputs.
    """
    info_missing = self._output_info is None
    return not info_missing
[docs]
def add_target(self, target):
    """Register a target input or adapter for this output.

    Parameters
    ----------
    target : :class:`.IInput`
        The target to add.

    Raises
    ------
    ValueError
        If ``target`` is not an :class:`.IInput`.
    """
    self.logger.trace("add target")

    is_input = isinstance(target, IInput)
    if not is_input:
        # Raised inside ErrorLogger so the logger sees the error as well.
        with ErrorLogger(self.logger):
            raise ValueError("Only IInput can added as target for IOutput")

    self._targets.append(target)
@property
def targets(self):
    """Target inputs and adapters registered on this output.

    Returns
    -------
    list
        The internal list of targets (not a copy).
    """
    return self._targets
[docs]
def pinged(self, source):
    """Handle a ping from a downstream input.

    The source is recorded as connected, with ``None`` as its last pull time.

    Raises
    ------
    ValueError
        If ``source`` is not an adapter and is already connected.
    """
    if not isinstance(source, IAdapter):
        if source in self._connected_inputs:
            with ErrorLogger(self.logger):
                raise ValueError(
                    f"Input '{source.name}' is already connected to this output"
                )

    self._connected_inputs[source] = None
[docs]
def push_data(self, data, time):
"""Push data into the output.

Validates the time, prepares the data against the output's info with
``tools.prepare``, appends it to the data cache and notifies all targets.
When the output has no targets, the push is only logged and skipped.

Parameters
----------
data : array_like
Data set to push.
time : :class:`datetime <datetime.datetime>`
Simulation time of the data set.

Raises
------
ValueError
If ``time`` does not pass ``_check_time``.
FinamNoDataError
If fewer infos were exchanged than inputs are connected.
FinamStaticDataError
If data is pushed to a static output that already holds data.
FinamDataError
If the prepared data may share memory with the latest in-memory cache entry.
"""
if not self.has_targets:
self.logger.trace("skipping push to unconnected output")
return
self.logger.trace("push data")
with ErrorLogger(self.logger):
_check_time(time, self.is_static)
if self.has_targets and self._out_infos_exchanged < len(self._connected_inputs):
raise FinamNoDataError("Can't push data before output info was exchanged.")
if self.is_static:
if len(self.data) > 0:
raise FinamStaticDataError(
"Can't push data repeatedly to a static output."
)
# static data is cached without a time stamp
time = None
with ErrorLogger(self.logger):
xdata, conv = tools.prepare(data, self.info, report_conversion=True)
# cache entries that are strings are file names (see _pack) and are skipped here
if len(self.data) > 0 and not isinstance(self.data[-1][1], str):
d = self.data[-1][1]
if np.may_share_memory(d.data, xdata.data):
raise FinamDataError(
"Received data that shares memory with previously received data."
)
if conv is not None:
self.logger.profile(
"converted units from %s to %s (%d entries)", *conv, xdata.size
)
# _pack returns either the data itself or the name of the file it was dumped to
xdata = self._pack(xdata)
self.data.append((time, xdata))
self._time = time
self.logger.trace("data cache: %d", len(self.data))
self.notify_targets(time)
[docs]
def push_info(self, info):
    """Store the data info of this output.

    Parameters
    ----------
    info : :class:`.Info`
        Delivered data info

    Raises
    ------
    FinamMetaDataError
        If ``info`` is not an :class:`.Info` instance.
    """
    self.logger.trace("push info")

    is_valid = isinstance(info, Info)
    if not is_valid:
        with ErrorLogger(self.logger):
            raise FinamMetaDataError("Metadata must be of type Info")

    self._output_info = info
[docs]
def notify_targets(self, time):
"""Notify all targets by calling their ``source_updated(time)`` method.

Parameters
----------
time : :class:`datetime <datetime.datetime>`
Simulation time that is passed on to every target.

Raises
------
ValueError
If ``time`` does not pass ``_check_time``.
"""
self.logger.trace("notify targets")
with ErrorLogger(self.logger):
_check_time(time, self.is_static)
# targets are notified in the order they were added
for target in self._targets:
target.source_updated(time)
[docs]
def get_data(self, time, target):
"""Get the output's data-set for the given time.

Static outputs return their single cached data set. Other outputs return
the cached data set selected by ``_interpolate`` and afterwards drop cache
entries that are no longer needed (see ``_clear_data``).

Parameters
----------
time : :class:`datetime <datetime.datetime>`
simulation time to get the data for.
target : :class:`.IInput` or None
Requesting end point of this pull.

Returns
-------
:class:`pint.Quantity`
data-set for the requested time.

Raises
------
ValueError
If ``time`` does not pass ``_check_time``.
FinamNoDataError
If no info is available, the info exchange is incomplete,
or no data was pushed yet.
FinamTimeError
If the requested time is outside the range of cached times (non-static only).
"""
self.logger.trace("get data")
with ErrorLogger(self.logger):
_check_time(time, self.is_static)
if self._output_info is None:
raise FinamNoDataError(f"No data info available in {self.name}")
if self._out_infos_exchanged < len(self._connected_inputs):
raise FinamNoDataError(f"Data info was not yet exchanged in {self.name}")
if len(self.data) == 0:
raise FinamNoDataError(f"No data available in {self.name}")
with ErrorLogger(self.logger):
data = (
self._unpack(self.data[0][1])
if self.is_static
else self._interpolate(time)
)
if not self.is_static:
# remember the cache size to log how many entries were dropped
data_count = len(self.data)
self._clear_data(time, target)
if len(self.data) < data_count:
self.logger.trace(
"reduced data cache: %d -> %d", data_count, len(self.data)
)
return data
def _pack(self, data):
    """Decide whether a data set stays in RAM or is dumped to a ``.npy`` file.

    Returns the data itself when it is kept in memory, otherwise the name of
    the file its magnitude was saved to. Only data kept in memory is added
    to the ``_total_mem`` counter.
    """
    data_size = data.nbytes
    limit = self.memory_limit
    over_limit = limit is not None and 0 <= limit < (self._total_mem + data_size)

    if not over_limit:
        self._total_mem += data_size
        self.logger.trace(
            "keeping data in RAM (total RAM %0.2f MB)", self._total_mem / 1048576
        )
        return data

    # File name is built from this object's id and a running counter.
    directory = self.memory_location or ""
    fn = os.path.join(directory, f"{id(self)}-{self._mem_counter}.npy")
    self.logger.profile(
        "dumping data to file %s (total RAM %0.2f MB)",
        fn,
        self._total_mem / 1048576,
    )
    self._mem_counter += 1
    np.save(fn, data.magnitude)
    return fn
def _unpack(self, where):
    """Return the data behind a cache entry.

    A string is treated as the name of a file to load with ``np.load``; the
    loaded array is wrapped in a quantity with the units of the output's info.
    Anything else is returned unchanged.
    """
    if not isinstance(where, str):
        return where

    self.logger.profile("reading data from file %s", where)
    # NOTE(review): allow_pickle=True lets np.load unpickle object arrays;
    # presumably only files written by _pack are read here - confirm.
    magnitude = np.load(where, allow_pickle=True)
    return tools.UNITS.Quantity(magnitude, self.info.units)
def _clear_data(self, time, target):
    """Record a pull and drop cache entries no connected input needs anymore.

    Nothing is dropped while any connected input has not pulled yet. Otherwise
    leading entries are removed as long as the entry after them is not newer
    than the oldest pull time. Removed file entries are deleted from disk;
    removed in-memory entries are subtracted from ``_total_mem``.
    """
    self._connected_inputs[target] = time

    for pulled in self._connected_inputs.values():
        if pulled is None:
            return

    t_min = min(self._connected_inputs.values())
    while len(self.data) > 1:
        if not self.data[1][0] <= t_min:
            break
        dropped = self.data.pop(0)
        payload = dropped[1]
        if isinstance(payload, str):
            os.remove(payload)
        else:
            self._total_mem -= payload.nbytes
[docs]
def finalize(self):
    """Finalize the output.

    Deletes every file that cached data was dumped to, empties the data
    cache and resets the in-memory size counter, so that the memory
    accounting matches the now empty cache.
    """
    for _t, d in self.data:
        # string entries are names of files written by _pack
        if isinstance(d, str):
            os.remove(d)
    self.data.clear()
    # No in-memory entries are left; keep the counter used by _pack in sync.
    self._total_mem = 0
def _interpolate(self, time):
    """Return the cached data set closest in time to ``time``.

    An exact time match returns that entry. Otherwise the two entries that
    enclose ``time`` are compared: before the midpoint between them the
    earlier entry is returned, from the midpoint on the later one.

    Raises
    ------
    FinamTimeError
        If ``time`` is outside the range of cached times.
    """
    if time < self.data[0][0] or time > self.data[-1][0]:
        raise FinamTimeError(
            f"Requested time {time} out of range [{self.data[0][0]}, {self.data[-1][0]}]"
        )

    for index, entry in enumerate(self.data):
        stamp, payload = entry
        if time > stamp:
            continue
        if time == stamp:
            return self._unpack(payload)

        # `stamp` is the first cached time after `time`; compare with its predecessor.
        stamp_before, payload_before = self.data[index - 1]
        midpoint = stamp_before + (stamp - stamp_before) / 2
        nearest = payload_before if time < midpoint else payload
        return self._unpack(nearest)

    raise FinamTimeError(
        f"Time interpolation failed. This should not happen and is probably a bug. "
        f"Time is {time}."
    )
[docs]
def get_info(self, info):
"""Exchange and get the output's data info.

For internal use. To get the info in a component, use property :attr:`.info`.

The requested info must be accepted by the output's info. Entries of the
output's info that are ``None`` (``grid``, ``time`` and ``meta`` values) are
filled in from the requested info, and the exchange counter is incremented.

Parameters
----------
info : :class:`Info`
Requested data info

Returns
-------
:class:`Info`
Delivered data info; the output's own info object.

Raises
------
FinamNoDataError
Raises the error if no info is available
FinamMetaDataError
If the requested info conflicts with the output's info, or if an entry
missing in the output's info is not provided by the requested info.
"""
self.logger.trace("get info")
if self._output_info is None:
raise FinamNoDataError("No data info available")
# `accepts` fills fail_info with the conflicting entries as name -> (got, expected)
fail_info = {}
if not self._output_info.accepts(info, fail_info, ignore_none=True):
fail_info = "\n".join(
[
f"{name} - got {got}, expected {exp}"
for name, (got, exp) in fail_info.items()
]
)
with ErrorLogger(self.logger):
raise FinamMetaDataError(
f"Can't accept conflicting data infos. Failed entries:\n{fail_info}"
)
with ErrorLogger(self.logger):
if self._output_info.grid is None:
if info.grid is None:
raise FinamMetaDataError(
"Can't set property `grid` from target info, as it is not provided"
)
self._output_info.grid = info.grid
if self._output_info.time is None:
# a missing time is only an error for non-static outputs
if not self.is_static and info.time is None:
raise FinamMetaDataError(
"Can't set property `time` from target info, as it is not provided"
)
self._output_info.time = info.time
for k, v in self._output_info.meta.items():
if v is None:
if k not in info.meta or info.meta[k] is None:
raise FinamMetaDataError(
f"Can't set property `meta.{k}` from target info, as it is not provided"
)
self._output_info.meta[k] = info.meta[k]
self._out_infos_exchanged += 1
return self._output_info
[docs]
def chain(self, other):
    """Chain outputs and adapters.

    Adds ``other`` as a target of this output and sets this output as the
    ``source`` of ``other``.

    Parameters
    ----------
    other : :class:`.IInput` or :class:`.IAdapter`
        The adapter or input to add as target to this output.

    Returns
    -------
    :class:`.IInput` or :class:`.IAdapter`
        ``other``, i.e. the last element of the chain.
    """
    self.logger.trace("chain")

    downstream = other
    self.add_target(downstream)
    downstream.source = self
    return downstream
@property
def has_targets(self):
    """bool: Whether at least one target was added to this output."""
    return len(self._targets) > 0
@property
def logger_name(self):
    """Logger name built from the base logger's name, ``"->"`` and the output name."""
    parent = logging.getLogger(self.base_logger_name)
    # "." separates the levels of the logger hierarchy
    parts = (parent.name, "->", self.name)
    return ".".join(parts)
@property
def uses_base_logger_name(self):
    """bool: Whether this class has a ``base_logger_name`` attribute. Always ``True``."""
    return True
[docs]
class CallbackOutput(Output):
"""Output implementation calling a callback when pulled.

Use for components without time step.

Parameters
----------
callback : callable
A callback ``callback(caller, time)``. It is called on every pull with this
output and the requested time, and returns the data to deliver.
Returning ``None`` signals that no data is available.
name : str
Name of the output.
info : :class:`.Info`, optional
Data info of the output. Can't be combined with ``**info_kwargs``.
**info_kwargs
Keyword arguments used to construct an :class:`.Info` when ``info`` is not given.
"""
def __init__(self, callback, name, info=None, **info_kwargs):
    """Create a non-static output that gets its data from ``callback``."""
    super().__init__(name=name, info=info, static=False, **info_kwargs)
    # Data returned by the previous pull, kept for the shared-memory check.
    self.last_data = None
    self.callback = callback
@property
def needs_push(self):
    """bool: Whether the output needs push. Always ``False`` here."""
    return False
@property
def needs_pull(self):
    """bool: Whether the output needs pull. Always ``True`` here."""
    return True
[docs]
def push_data(self, data, time):
    """Reject pushed data.

    This output gets its data from the callback when it is pulled, so
    pushing is not supported.

    Raises
    ------
    NotImplementedError
        Always.
    """
    # The message used to name "CallbackInput", which is not this class.
    raise NotImplementedError("CallbackOutput does not support push of data")
[docs]
def get_data(self, time, target):
"""Get the output's data-set for the given time.

The data is obtained by calling ``self.callback(self, time)`` and prepared
against the output's info with ``tools.prepare``. The result is remembered
in ``last_data`` for the shared-memory check of the next pull.

Parameters
----------
time : :class:`datetime <datetime.datetime>`
Simulation time to get the data for.
target : :class:`.IInput`
Requesting end point of this pull

Returns
-------
:class:`pint.Quantity`
Data-set for the requested time.

Raises
------
ValueError
If ``time`` is not a datetime.
FinamNoDataError
Raises the error if no data is available
FinamDataError
If the new data may share memory with the data of the previous pull.
"""
self.logger.trace("get data")
with ErrorLogger(self.logger):
_check_time(time, False)
if self._output_info is None:
raise FinamNoDataError(f"No data info available in {self.name}")
if self._out_infos_exchanged < len(self._connected_inputs):
raise FinamNoDataError(f"Data info was not yet exchanged in {self.name}")
# the callback receives this output itself as first argument
data = self.callback(self, time)
if data is None:
raise FinamNoDataError(f"No data available in {self.name}")
with ErrorLogger(self.logger):
xdata, conv = tools.prepare(data, self.info, report_conversion=True)
if self.last_data is not None and np.may_share_memory(
tools.get_magnitude(self.last_data), tools.get_magnitude(xdata)
):
raise FinamDataError(
"Received data that shares memory with previously received data."
)
if conv is not None:
self.logger.profile(
"converted units from %s to %s (%d entries)", *conv, xdata.size
)
self.last_data = xdata
return xdata
[docs]
def finalize(self):
    """Finalize the output.

    This override does nothing.
    """
def _check_time(time, is_static):
    """Validate a time argument.

    A ``datetime`` is always accepted. ``None`` is accepted only for static
    outputs.

    Raises
    ------
    ValueError
        If ``time`` has an unsupported type for the given kind of output.
    """
    if isinstance(time, datetime):
        return
    if not is_static:
        raise ValueError("Time must be of type datetime")
    if time is not None:
        raise ValueError("Time must be of type datetime or None in static outputs")