Source code for finam.modules.debug

"""Generic component with arbitrary inputs and extensive debug logging."""
import logging
from datetime import datetime, timedelta

from ..data import tools
from ..interfaces import ComponentStatus, IInput
from ..sdk import CallbackInput, Component, TimeComponent
from ..tools.date_helper import is_timedelta


class DebugConsumer(TimeComponent):
    """Generic component with arbitrary inputs and extensive debug logging.

    .. code-block:: text

                     +---------------+
        --> [custom] |               |
        --> [custom] | DebugConsumer |
        --> [......] |               |
                     +---------------+

    Examples
    --------

    .. testcode:: constructor

        import datetime as dt
        import finam as fm

        component = fm.modules.DebugConsumer(
            inputs={
                "A": fm.Info(time=None, grid=fm.NoGrid()),
                "B": fm.Info(time=None, grid=fm.NoGrid()),
            },
            callbacks={
                "A": lambda n, d, t: print(t)
            },
            start=dt.datetime(2000, 1, 1),
            step=dt.timedelta(days=7),
            log_data="INFO",
            strip_data=False,
        )

    .. testcode:: constructor
        :hide:

        component.initialize()

    Parameters
    ----------
    inputs : dict[str, Info]
        Dictionary of input names and infos.
    callbacks : dict[str, callable]
        Dictionary of optional input callbacks: callable(name, data, time).
    start : :class:`datetime <datetime.datetime>`
        Starting time
    step : :class:`timedelta <datetime.timedelta>` or :class:`relativedelta <dateutil.relativedelta.relativedelta>`
        Time step
    log_data : int or str or bool, optional
        Log level for printing received data, like "DEBUG" or "INFO".
        Default ``False``, logs nothing.

        ``True`` uses "INFO".
    strip_data : bool, optional
        Strips data before logging. Default ``True``.

    Raises
    ------
    ValueError
        If ``start`` is not a datetime, ``step`` is not a timedelta or
        relativedelta, or ``log_data`` is not a known log level.
    """

    def __init__(
        self, inputs, start, step, callbacks=None, log_data=False, strip_data=True
    ):
        super().__init__()

        if not isinstance(start, datetime):
            raise ValueError("Start must be of type datetime")
        if not is_timedelta(step):
            raise ValueError("Step must be of type timedelta or relativedelta")

        self._strip_data = strip_data
        # Numeric log level used for received data; None disables data logging.
        self._log_data = self._resolve_log_level(log_data)

        self._input_infos = inputs
        self._callbacks = callbacks or {}
        self._step = step
        self._time = start
        self._data = {}

    @staticmethod
    def _resolve_log_level(level):
        """Convert a user-supplied log level to a numeric level or ``None``.

        ``logging.getLevelName`` maps names to numbers *and* numbers to names,
        so integers must bypass it: otherwise the stored level would be a
        string, which ``Logger.log`` does not accept.

        Parameters
        ----------
        level : int or str or bool
            ``False`` disables logging, ``True`` selects ``logging.INFO``,
            integers are used as given, strings are looked up by name.

        Returns
        -------
        int or None
            Numeric log level, or ``None`` if logging is disabled.

        Raises
        ------
        ValueError
            If ``level`` is not a known level name.
        """
        # bool is a subclass of int, so it has to be checked first.
        if isinstance(level, bool):
            return logging.INFO if level else None
        if isinstance(level, int):
            return level
        resolved = logging.getLevelName(level)
        if not isinstance(resolved, int):
            raise ValueError(f"Unknown log level: {level!r}")
        return resolved

    @property
    def data(self):
        """dict[str, data] : The component's input data from the last time step"""
        return self._data

    @property
    def next_time(self):
        """Current time plus one time step."""
        return self.time + self._step

    def _initialize(self):
        """Register all configured inputs and create the connector.

        Each input info gets the component's current time assigned, and data
        is requested to be pulled for every input during the connect phase.
        """
        for name, info in self._input_infos.items():
            info.time = self.time
            self.inputs.add(name=name, info=info)
        self.create_connector(pull_data=list(self._input_infos))

    def _connect(self, start_time):
        """Try to connect, then report exchanged infos and pulled data.

        Data that is available from the connector is logged, passed to the
        matching callback and stored in :attr:`data`.
        """
        self.try_connect(start_time)

        for name, info in self.connector.in_infos.items():
            if info is not None:
                self.logger.debug("Exchanged input info for %s", name)

        for name, data in self.connector.in_data.items():
            # Inputs without data yet are skipped entirely.
            if data is None:
                continue
            self.logger.debug("Pulled input data for %s", name)
            self._handle_data(name, data)
            self._data[name] = data

    def _validate(self):
        """Nothing to validate."""

    def _update(self):
        """Advance by one time step and pull, log and dispatch all inputs."""
        self._time += self._step

        self._data = {
            name: self.inputs[name].pull_data(self.time)
            for name in self._input_infos
        }

        for name, data in self._data.items():
            self._handle_data(name, data)

    def _finalize(self):
        """Nothing to finalize."""

    def _handle_data(self, name, data):
        """Log received data (if enabled) and invoke the input's callback.

        Parameters
        ----------
        name : str
            Name of the input the data was received from.
        data : arraylike
            The received data.
        """
        if self._log_data is not None:
            pdata = (
                tools.strip_time(data, self.inputs[name].info.grid)
                if self._strip_data
                else data
            )
            self.logger.log(
                self._log_data,
                'Received "%s" - %s: %s',
                name,
                self._time,
                pdata,
            )

        if name in self._callbacks:
            self._callbacks[name](name, data, self._time)
class DebugPushConsumer(Component):
    """Generic component with arbitrary inputs and extensive debug logging.

    Push-based.

    .. code-block:: text

                     +-------------------+
        --> [custom] |                   |
        --> [custom] | DebugPushConsumer |
        --> [......] |                   |
                     +-------------------+

    Examples
    --------

    .. testcode:: constructor

        import datetime as dt
        import finam as fm

        component = fm.modules.DebugPushConsumer(
            inputs={
                "A": fm.Info(time=None, grid=fm.NoGrid()),
                "B": fm.Info(time=None, grid=fm.NoGrid()),
            },
            callbacks={
                "A": lambda n, d, t: print(t)
            },
            log_data="INFO",
            strip_data=False,
        )

    .. testcode:: constructor
        :hide:

        component.initialize()

    Parameters
    ----------
    inputs : dict[str, Info]
        Dictionary of input names and infos.
    callbacks : dict[str, callable]
        Dictionary of optional input callbacks: callable(name, data, time).
    log_data : int or str or bool, optional
        Log level for printing received data, like "DEBUG" or "INFO".
        Default ``False``, logs nothing.

        ``True`` uses "INFO".
    strip_data : bool, optional
        Strips data before logging. Default ``True``.

    Raises
    ------
    ValueError
        If ``log_data`` is not a known log level.
    """

    def __init__(self, inputs, callbacks=None, log_data=False, strip_data=True):
        super().__init__()

        self._strip_data = strip_data
        # Numeric log level used for received data; None disables data logging.
        self._log_data = self._resolve_log_level(log_data)

        self._input_infos = inputs
        self._callbacks = callbacks or {}
        self._data = {}

    @staticmethod
    def _resolve_log_level(level):
        """Convert a user-supplied log level to a numeric level or ``None``.

        ``logging.getLevelName`` maps names to numbers *and* numbers to names,
        so integers must bypass it: otherwise the stored level would be a
        string, which ``Logger.log`` does not accept.

        Parameters
        ----------
        level : int or str or bool
            ``False`` disables logging, ``True`` selects ``logging.INFO``,
            integers are used as given, strings are looked up by name.

        Returns
        -------
        int or None
            Numeric log level, or ``None`` if logging is disabled.

        Raises
        ------
        ValueError
            If ``level`` is not a known level name.
        """
        # bool is a subclass of int, so it has to be checked first.
        if isinstance(level, bool):
            return logging.INFO if level else None
        if isinstance(level, int):
            return level
        resolved = logging.getLevelName(level)
        if not isinstance(resolved, int):
            raise ValueError(f"Unknown log level: {level!r}")
        return resolved

    @property
    def data(self):
        """dict[str, data] : The component's input data from the last time step"""
        return self._data

    def _initialize(self):
        """Register one callback input per configured info and create the connector."""
        for name, info in self._input_infos.items():
            self.inputs.add(
                CallbackInput(callback=self._data_pushed, name=name, info=info)
            )
        self.create_connector()

    def _connect(self, start_time):
        """Try to connect and report which input infos were exchanged."""
        self.try_connect(start_time)

        for name, info in self.connector.in_infos.items():
            if info is not None:
                self.logger.debug("Exchanged input info for %s", name)

    def _validate(self):
        """Nothing to validate."""

    def _update(self):
        """Nothing to do; data is handled in the input callback."""

    def _finalize(self):
        """Nothing to finalize."""

    def _data_pushed(self, caller, time):
        """Input callback: pull, store, log and dispatch newly pushed data.

        Parameters
        ----------
        caller : IInput
            The input that was notified; must provide ``name``, ``info`` and
            ``pull_data``.
        time : datetime
            Time passed along with the notification.
        """
        name = caller.name
        data = caller.pull_data(time)
        self._data[name] = data

        if self._log_data is not None:
            pdata = (
                tools.strip_time(data, caller.info.grid) if self._strip_data else data
            )
            self.logger.log(
                self._log_data,
                'Received "%s" - %s: %s',
                name,
                time,
                pdata,
            )

        if name in self._callbacks:
            self._callbacks[name](name, data, time)
class ScheduleLogger(Component):
    """Logging of module update schedule.

    Takes inputs of arbitrary types and simply logs the time of notifications
    of each input as an ASCII graph.

    .. code-block:: text

                     +----------------+
        --> [custom] |                |
        --> [custom] | ScheduleLogger |
        --> [......] |                |
                     +----------------+

    Note:
        This component is push-based without an internal time step.

    Examples
    --------

    .. testcode:: constructor

        from datetime import timedelta

        import finam as fm

        schedule = fm.modules.ScheduleLogger(
            inputs={"Grid1": True, "Grid2": True},
            time_step=timedelta(days=1),
            log_level="DEBUG",
            stdout=True,
        )

    .. testcode:: constructor
        :hide:

        schedule.initialize()

    .. |relativedelta| replace:: :class:`relativedelta <dateutil.relativedelta.relativedelta>`

    Parameters
    ----------
    inputs : dict of str, bool
        Input names and whether to pull data from them when notified.
        Pulling is useful for correct output behaviour when clearing the data cache.
    time_step : :class:`timedelta <datetime.timedelta>` or |relativedelta|, optional
        Time per character in the ASCII graph. Default 1 day.
    log_level : str or int, optional
        Log level for the ASCII graph. Default "INFO".
    stdout : bool
        Prints the ASCII graphs to stdout. Useful for piping to file and/or for documentation

    Raises
    ------
    ValueError
        If ``log_level`` is not a known log level.
    """

    def __init__(
        self, inputs, time_step=timedelta(days=1), log_level="INFO", stdout=False
    ):
        super().__init__()
        self._pull_inputs = inputs
        self._time_step = time_step
        self._log_level = self._resolve_log_level(log_level)
        self._stdout = stdout

        # Both are built lazily on the first notification.
        self._schedule = None  # input object -> list of notification times
        self._output_map = None  # input object -> end of its ``source`` chain

    @staticmethod
    def _resolve_log_level(level):
        """Convert a user-supplied log level to a numeric level.

        ``logging.getLevelName`` maps names to numbers *and* numbers to names,
        so integers must bypass it: otherwise the stored level would be a
        string, which ``Logger.log`` does not accept.

        Parameters
        ----------
        level : str or int
            Level name like "INFO", or a numeric level.

        Returns
        -------
        int
            Numeric log level.

        Raises
        ------
        ValueError
            If ``level`` is not a known level name.
        """
        # bool is a subclass of int but is not a meaningful level here.
        if isinstance(level, int) and not isinstance(level, bool):
            return level
        resolved = logging.getLevelName(level)
        if not isinstance(resolved, int):
            raise ValueError(f"Unknown log level: {level!r}")
        return resolved

    def _initialize(self):
        """Register one callback input per name and create the connector.

        Data is requested during the connect phase only for inputs flagged
        for pulling.
        """
        for name in self._pull_inputs:
            self.inputs.add(
                CallbackInput(self._data_changed, name=name, time=None, grid=None)
            )
        self.create_connector(
            pull_data=[name for name, pull in self._pull_inputs.items() if pull]
        )

    def _connect(self, start_time):
        """Try to connect the component."""
        self.try_connect(start_time)

    def _validate(self):
        """Nothing to validate."""

    def _data_changed(self, caller, time):
        """Input callback: record the notification in the schedule."""
        self._update_schedule(caller, time)

    def _update(self):
        """Nothing to do; the schedule is updated in the input callback."""

    def _update_schedule(self, caller, time):
        """Record a notification and print the schedule once validated.

        Parameters
        ----------
        caller : IInput
            The input that was notified.
        time : datetime
            Time of the notification.
        """
        if self._schedule is None:
            self._schedule = {inp: [] for _, inp in self.inputs.items()}
            self._output_map = {}
            for _, inp in self.inputs.items():
                # Follow the source chain until something that is not an input.
                out = inp
                while isinstance(out, IInput):
                    out = out.source
                self._output_map[inp] = out

        self._schedule[caller].append(time)

        if self._pull_inputs[caller.name]:
            # The result is not needed; the data is pulled only for its effect
            # on the source (see the ``inputs`` parameter documentation).
            caller.pull_data(time)

        if self.status == ComponentStatus.VALIDATED:
            self._print_schedule(caller)

    def _print_schedule(self, caller):
        """Log (and optionally print) the ASCII graph of all notifications.

        Parameters
        ----------
        caller : IInput
            The input that triggered the output; its row is marked with
            ``" <-"``.
        """
        recorded = [times for times in self._schedule.values() if times]
        # Assumes each list is in chronological order, so first/last entries
        # are the earliest/latest notification of that input.
        t_min = min(times[0] for times in recorded)
        t_max = max(times[-1] for times in recorded)
        num_char = int((t_max - t_min) / self._time_step) + 1

        self.logger.log(self._log_level, "input updated")
        if self._stdout:
            print("")

        max_name_len = max(len(inp.name) for inp in self._schedule)
        for inp, times in self._schedule.items():
            if not times:
                continue

            row = self._render_row(inp, times, t_min, num_char)
            if inp == caller:
                row += " <-"
            line = f"{inp.name.ljust(max_name_len)} {row}"

            self.logger.log(self._log_level, line)
            if self._stdout:
                print(line)

    def _render_row(self, inp, times, t_min, num_char):
        """Build the graph row for one input, ``num_char`` characters wide.

        Each notification is drawn at its time offset from ``t_min``, with
        ``"-"`` filling the gap since the previous one. The last
        ``len(source.data)`` notifications are drawn as ``"o"``, earlier ones
        as ``"x"`` (presumably: still held in the source's data cache vs.
        already dropped -- confirm against the output implementation).
        Sources without a ``data`` attribute count as holding nothing.
        """
        cells = [" "] * num_char

        cached = len(getattr(self._output_map[inp], "data", []))
        first_cached = len(times) - cached

        fill_from = 0
        for idx, t in enumerate(times):
            pos = int((t - t_min) / self._time_step)
            for gap in range(fill_from, pos):
                cells[gap] = "-"
            cells[pos] = "o" if idx >= first_cached else "x"
            fill_from = pos + 1

        return "".join(cells)