Source code for finam.modules.mergers

"""Pull-based components for merging multiple inputs into a single output"""
from finam.interfaces import ComponentStatus

from ..data.tools import compatible_units, strip_time
from ..errors import FinamMetaDataError
from ..sdk import CallbackOutput, Component
from ..tools.log_helper import ErrorLogger


[docs] class WeightedSum(Component): """Pull-based component to merge inputs by weighted sum. This component does not have an own time step. Execution is triggered by downstream pulls. .. code-block:: text +-------------------+ --> [<custom1> ] | | --> [<custom1>_weight] | | --> [<custom2> ] | CallbackComponent | [WeightedSum] --> --> [<custom2>_weight] | | --> [.... ] | | +-------------------+ Examples -------- .. testcode:: constructor import finam as fm component = fm.modules.WeightedSum(inputs=["A", "B", "C"]) # ... create and initialize composition # comp_1.outputs["Value"] >> component.inputs["A"] # comp_1.outputs["Weight"] >> component.inputs["A_weight"] # ... .. testcode:: constructor :hide: component.initialize() Parameters ---------- inputs : list(str) Base input names; will create two inputs for each entry: "<name>" and "<name>_weight" start : :class:`datetime <datetime.datetime>` Starting time, for initial data exchange grid : GridBase or None Expected input grid specification; tries to obtain grid specs from inputs if set to None """ def __init__(self, inputs, grid=None): super().__init__() self._input_names = inputs self._grid = grid self._units = None self._in_data = None self._out_data = None self._last_update = None
[docs] def _initialize(self): for name in self._input_names: self.inputs.add(name=name, time=None, grid=self._grid, units=None) self.inputs.add(name=name + "_weight", time=None, grid=self._grid, units="") self._grid = None self.outputs.add(CallbackOutput(callback=self._get_data, name="WeightedSum")) self.create_connector(pull_data=list(self.inputs))
[docs] def _connect(self, start_time): push_infos = self._check_infos() self.try_connect(start_time, push_infos=push_infos) if self.status == ComponentStatus.CONNECTED: # just to check for all inputs equal _push_infos = self._check_infos() if self.connector.all_data_pulled: self._in_data = self.connector.in_data
def _check_infos(self): push_infos = {} for name in self._input_names: info = self.connector.in_infos[name] if info is not None: if not self.connector.infos_pushed["WeightedSum"]: push_infos["WeightedSum"] = info.copy_with() self._check_grid(info) self._compatible_units(info) weight_info = self.connector.in_infos[name + "_weight"] if weight_info is not None: self._check_grid(weight_info) return push_infos def _check_grid(self, info): if self._grid is None: self._grid = info.grid else: if self._grid != info.grid: with ErrorLogger(self.logger): raise FinamMetaDataError("All inputs must have the same grid.") def _compatible_units(self, info): if self._units is None: self._units = info.units else: if not compatible_units(self._units, info.units): with ErrorLogger(self.logger): raise FinamMetaDataError( "All value inputs must have the same dimensions." )
[docs] def _validate(self): pass
[docs] def _update(self): pass
[docs] def _finalize(self): pass
[docs] def _get_data(self, _caller, time): if self._in_data is None: return None if time != self._last_update: if self.status == ComponentStatus.VALIDATED: self._in_data = { name: inp.pull_data(time) for name, inp in self.inputs.items() } result = None for name in self._input_names: value = strip_time(self._in_data[name], self._grid) weight = strip_time(self._in_data[name + "_weight"], self._grid) if result is None: result = value * weight else: result += value * weight self._out_data = result self._last_update = time return self._out_data