Source code for finam.modules.control

"""Components for controlling data flow"""
import datetime as dt

from ..data.grid_spec import NoGrid
from ..errors import FinamMetaDataError
from ..sdk import TimeComponent
from ..tools.connect_helper import FromInput, FromOutput
from ..tools.log_helper import ErrorLogger


class TimeTrigger(TimeComponent):
    """Component to forward data in regular time intervals.

    Can be used to enable coupling of pull-based source and push-based target
    components, that would not work due to the dead link otherwise.

    .. code-block:: text

                     +-------------+
                     |             |
        --> [In]     | TimeTrigger | [Out] -->
                     |             |
                     +-------------+

    The typical usage is shown in the example below.
    Here, the component will retrieve metadata from its input,
    for all metadata fields that are ``None``.
    Because ``out_info`` is ``None``, the received metadata is then forwarded
    to the output.

    The same mechanism can be used in the other direction, by specifying
    ``out_info`` but not ``in_info`` in the constructor call.

    Examples
    --------

    .. testcode:: constructor

        import datetime as dt
        import finam as fm

        component = fm.modules.TimeTrigger(
            in_info=fm.Info(time=None, grid=None, units=None),
            start=dt.datetime(2000, 1, 1),
            step=dt.timedelta(days=1),
        )

    .. testcode:: constructor
        :hide:

        component.initialize()

    Parameters
    ----------
    start : :class:`datetime <datetime.datetime>`
        Starting time. Can be ``None`` to retrieve it from linked components.
        See parameter ``start_from_input`` for details.
    step : :class:`timedelta <datetime.timedelta>` or :class:`relativedelta <dateutil.relativedelta.relativedelta>`
        The component's time step
    in_info : :class:`.Info`, optional
        Input info, optional. However, one of ``in_info`` or ``out_info``
        must be given.
        ``time`` is ignored and can be set to ``None``.
    out_info : :class:`.Info`, optional
        Output info, optional. However, one of ``in_info`` or ``out_info``
        must be given.
        ``time`` is ignored and can be set to ``None``.
    start_from_input : bool, optional
        Whether to get the starting time from the input, instead of the output.
        Default ``True``.

        If ``start`` is ``None``, the component can try to retrieve the
        starting time either from the input or from the output.
        The respective linked component should have an internal time step.
        If both linked components have no time step, ``start`` must be given.
    """

    def __init__(self, start, step, in_info=None, out_info=None, start_from_input=True):
        super().__init__()
        self._ini_in_info = in_info
        self._ini_out_info = out_info

        self._start = start
        # Without an explicit start, the time is only assigned in ``_connect``.
        if self._start is not None:
            self.time = self._start
        self._step = step

        self._start_from_input = start_from_input

    @property
    def next_time(self):
        """The current time advanced by one ``step``."""
        return self.time + self._step

    def _initialize(self):
        """Validate the constructor arguments and set up slots and connector.

        Raises
        ------
        FinamMetaDataError
            If neither ``in_info`` nor ``out_info`` was given, or if ``start``
            is ``None`` and the info of the side selected by
            ``start_from_input`` is missing.
        """
        # NOTE(review): only the validation checks are placed inside the
        # ErrorLogger context; the extent of the ``with`` body should be
        # confirmed against the upstream repository.
        with ErrorLogger(self.logger):
            if self._ini_in_info is None and self._ini_out_info is None:
                raise FinamMetaDataError(
                    "At least one of input or output info must be given."
                )
            if self._start is None:
                if self._start_from_input and self._ini_in_info is None:
                    raise FinamMetaDataError(
                        "Can't get starting time from the input without an input info."
                    )
                if not self._start_from_input and self._ini_out_info is None:
                    raise FinamMetaDataError(
                        "Can't get starting time from the output without an output info."
                    )

        # The ``time`` of the given infos is overwritten with ``start``
        # (which may be ``None``); note that this mutates the passed objects.
        if self._ini_in_info is not None:
            self._ini_in_info.time = self._start
        if self._ini_out_info is not None:
            self._ini_out_info.time = self._start

        in_info_rules = {}
        out_info_rules = {}
        if self._start is None:
            # Only the side that provides the starting time gets its info
            # here; the other slot is created without info and gets a rule
            # referring to the first one.
            if self._start_from_input:
                self.inputs.add(name="In", info=self._ini_in_info)
                self.outputs.add(name="Out")
                out_info_rules["Out"] = [FromInput("In")]
            else:
                self.inputs.add(name="In")
                self.outputs.add(name="Out", info=self._ini_out_info)
                in_info_rules["In"] = [FromOutput("Out")]
        else:
            # Start time is known: pass on whatever infos were given and add
            # a rule for each side whose info is missing.
            self.inputs.add(name="In", info=self._ini_in_info)
            self.outputs.add(name="Out", info=self._ini_out_info)
            if self._ini_out_info is None:
                out_info_rules["Out"] = [FromInput("In")]
            if self._ini_in_info is None:
                in_info_rules["In"] = [FromOutput("Out")]

        self.create_connector(
            pull_data=["In"],
            in_info_rules=in_info_rules,
            out_info_rules=out_info_rules,
        )

    def _connect(self, start_time):
        """Forward already received input data and adopt the input info's time.

        Parameters
        ----------
        start_time : :class:`datetime <datetime.datetime>`
            Passed through to ``try_connect``.
        """
        out_data = {}
        # Forward the input data once it is available, unless the output
        # was already pushed.
        if (
            not self.connector.data_pushed["Out"]
            and self.connector.in_data["In"] is not None
        ):
            out_data["Out"] = self.connector.in_data["In"]

        self.try_connect(start_time, push_data=out_data)

        # As soon as the input info is available, take over its time.
        in_info = self.connector.in_infos["In"]
        if in_info is not None:
            self.time = in_info.time

    def _validate(self):
        """Nothing to validate."""

    def _update(self):
        """Advance by one step, then pull from ``In`` and push to ``Out``."""
        self.time += self._step

        data = self.inputs["In"].pull_data(self.time)
        self.outputs["Out"].push_data(data, self.time)

    def _finalize(self):
        """Nothing to finalize."""
class UserControl(TimeComponent):
    """Component to allow users to step a simulation.

    Prompts for input on the console.
    Users can just press ENTER to proceed by one step,
    or enter a target time in ISO format.

    .. code-block:: text

        +-------------+
        |             |
        | UserControl | [Counter] -->
        |             |
        +-------------+

    Examples
    --------

    .. testcode:: constructor

        import datetime as dt
        import finam as fm

        component = fm.modules.UserControl(
            start=dt.datetime(2000, 1, 1),
            step=dt.timedelta(days=1),
        )

    .. testcode:: constructor
        :hide:

        component.initialize()

    .. |relativedelta| replace:: :class:`relativedelta <dateutil.relativedelta.relativedelta>`

    Parameters
    ----------
    start : :class:`datetime <datetime.datetime>`
        Starting time.
    step : :class:`timedelta <datetime.timedelta>` or |relativedelta|, optional
        The component's time step. Default 1 day.
    """

    def __init__(self, start, step=dt.timedelta(days=1)):
        super().__init__()
        # NOTE(review): ``start`` is used as-is. This class has no inputs and
        # no code path that retrieves a starting time from linked components,
        # so ``None`` is presumably not supported here - confirm against
        # ``TimeComponent.time``.
        self.time = start
        self.step = step
        # Number of updates performed so far; pushed to the "Counter" output.
        self._counter = 0

    @property
    def next_time(self):
        """Always ``None``; the time reached by an update is chosen by the user."""
        return None

    def _initialize(self):
        """Create the ``Counter`` output (without grid) and the connector."""
        self.outputs.add(name="Counter", time=self.time, grid=NoGrid())
        self.create_connector()

    def _connect(self, start_time):
        """Push the initial counter value if it is required.

        Parameters
        ----------
        start_time : :class:`datetime <datetime.datetime>`
            Passed through to ``try_connect``.
        """
        push_data = {}
        if self.connector.data_required["Counter"]:
            push_data["Counter"] = self._counter

        self.try_connect(start_time, push_data=push_data)

    def _validate(self):
        """Nothing to validate."""

    def _update(self):
        """Increment the counter and ask the user how far to proceed."""
        self._counter += 1
        self._prompt()

    def _prompt(self):
        """Ask for a target time on the console and push the counter.

        An empty input advances the time by one ``step``.
        Any other input is parsed with
        :meth:`datetime.datetime.fromisoformat`.
        If parsing fails, a message is printed and neither the time
        nor the output is updated.
        """
        run_until = None
        inp = input(f"Time: {self.time} - Run until (ENTER to step): ")
        if inp == "":
            run_until = self.time + self.step
        else:
            try:
                run_until = dt.datetime.fromisoformat(inp)
            except ValueError:
                # Invalid input: report it and leave ``run_until`` unset.
                print(f"Not a time: '{inp}'.")

        if run_until is not None:
            self.time = run_until
            self.outputs["Counter"].push_data(self._counter, self.time)

    def _finalize(self):
        """Nothing to finalize."""