Source code for finam.modules.writers

"""
Modules for writing data.
"""
from datetime import datetime

import numpy as np

from ..data import tools as dtools
from ..data.grid_spec import NoGrid
from ..interfaces import ComponentStatus
from ..sdk import TimeComponent
from ..tools.date_helper import is_timedelta
from ..tools.log_helper import ErrorLogger


class CsvWriter(TimeComponent):
    """Writes CSV time series with one row per time step, from multiple inputs.

    Expects all inputs to be scalar values.

    .. code-block:: text

                     +-----------+
        --> [custom] |           |
        --> [custom] | CsvWriter |
        --> [......] |           |
                     +-----------+

    Examples
    --------

    .. testcode:: constructor

        import datetime as dt
        import finam as fm

        writer = fm.modules.CsvWriter(
            path="test.csv",
            inputs=["A", "B", "C"],
            time_column="T",
            separator=",",
            start=dt.datetime(2000, 1, 1),
            step=dt.timedelta(days=1),
        )

    .. testcode:: constructor
        :hide:

        writer.initialize()

    Parameters
    ----------
    path : PathLike
        Path to the output file.
    inputs : list of str
        List of input names that will be written to file.
    start : :class:`datetime <datetime.datetime>`
        Starting time.
    step : :class:`timedelta <datetime.timedelta>` or :class:`relativedelta <dateutil.relativedelta.relativedelta>`
        Time step.
    time_column : str
        Time column name. Default "time"
    separator : str
        Column separator. Default ";"

    Raises
    ------
    ValueError
        If ``start`` is not a datetime, or ``step`` is not a
        timedelta/relativedelta.
    """

    def __init__(self, path, start, step, inputs, time_column="time", separator=";"):
        super().__init__()

        with ErrorLogger(self.logger):
            if not isinstance(start, datetime):
                raise ValueError("Start must be of type datetime")
            if not is_timedelta(step):
                raise ValueError("Step must be of type timedelta or relativedelta")

        self._path = path
        self._step = step
        self._time = start

        self._time_column = time_column
        self._separator = separator

        # Keep a private list copy: the header is built by list concatenation
        # in _finalize, which would otherwise fail for tuples (or other
        # sequences) only at the very end of a run, after all data was
        # collected. The copy also shields us from later caller-side mutation.
        self._input_names = list(inputs)

        # One entry per written time step: [iso timestamp, value, value, ...]
        self._rows = []

    @property
    def next_time(self):
        """The current time advanced by one time step."""
        return self.time + self._step

    def _initialize(self):
        """Initialize the component.

        Registers one grid-less input per configured input name, without
        time or units, and creates the connector that pulls all of them.

        After the method call, the component's inputs and outputs must be available,
        and the component should have status INITIALIZED.
        """
        for inp in self._input_names:
            self.inputs.add(name=inp, time=None, grid=NoGrid(), units=None)
        self.create_connector(pull_data=self._input_names)

    def _connect(self, start_time):
        """Try to connect and, once connected, record the initial row.

        After the method call, the component should have status CONNECTED.
        """
        self.try_connect(start_time)

        if self.status == ComponentStatus.CONNECTED:
            # Look the pulled data up by input name rather than relying on the
            # iteration order of the connector's dict, so the columns always
            # line up with the header and with the rows written by _update.
            # NOTE(review): assumes in_data is keyed by input name - confirm.
            pulled = self.connector.in_data
            values = [self._to_scalar(pulled[inp]) for inp in self._input_names]
            self._update_rows(values)

    def _validate(self):
        """Validate the correctness of the component's settings and coupling.

        Nothing to validate for this component.

        After the method call, the component should have status VALIDATED.
        """

    def _update(self):
        """Update the component by one time step.

        Advances the internal time by one step, pulls every input at the new
        time and records the values as a new row.

        After the method call, the component should have status UPDATED or FINISHED.
        """
        self._time += self._step

        values = [
            self._to_scalar(self.inputs[inp].pull_data(self.time))
            for inp in self._input_names
        ]
        self._update_rows(values)

    @staticmethod
    def _to_scalar(data):
        """Strip the time axis from grid-less ``data`` and return its magnitude."""
        return dtools.get_magnitude(dtools.strip_time(data, NoGrid()))

    def _update_rows(self, values):
        """Type-check ``values`` and append them as a row for the current time.

        Parameters
        ----------
        values : list
            One value per input, in the order of the configured input names.
            Each value must provide ``.item()`` yielding an int or a float.
        """
        with ErrorLogger(self.logger):
            for value, name in zip(values, self._input_names):
                dtools.assert_type(self, name, value.item(), [int, float])

        self._rows.append([self.time.isoformat()] + values)

    def _finalize(self):
        """Finalize and clean up the component.

        Writes the header line and all collected rows to the output file.

        After the method call, the component should have status FINALIZED.
        """
        header = self._separator.join([self._time_column] + self._input_names)
        np.savetxt(
            self._path,
            self._rows,
            fmt="%s",
            delimiter=self._separator,
            header=header,
            # Empty comment prefix, so the header is written as a plain CSV
            # line instead of being prefixed with numpy's default "# ".
            comments="",
        )