Source code for finam.sdk.adapter

"""
Abstract base implementation for adapters.
"""
import logging
from abc import ABC
from datetime import datetime
from typing import final

from ..data import tools
from ..data.tools import Info
from ..errors import FinamLogError, FinamMetaDataError, FinamTimeError
from ..interfaces import IAdapter, IOutput, ITimeDelayAdapter
from ..tools.log_helper import ErrorLogger, is_loggable
from .input import Input
from .output import Output


class Adapter(IAdapter, Input, Output, ABC):
    """Abstract adapter implementation.

    Extend this class for adapters.
    See :doc:`/finam-book/development/adapters`.

    An adapter is both an input (it has a source) and an output (it has
    targets); it sits between the two and transforms the data it relays.

    Simple derived classes overwrite :meth:`._get_data`.
    Adapters that alter the metadata can intercept it in :meth:`._get_info`.

    For time-dependent adapters with push-functionality, also overwrite
    the following:

    * :meth:`._source_updated`
    * :attr:`.needs_push`
    """

    def __init__(self):
        # The class name is the default name for both base parts and for
        # the adapter itself; it can be changed later via ``with_name``.
        default_name = self.__class__.__name__
        Input.__init__(self, name=default_name)
        Output.__init__(self, name=default_name)
        self._name = default_name
        self._source = None
        self._targets = []

    def _require_datetime_or_none(self, time, error_type):
        """Reject time arguments that are neither ``None`` nor a datetime.

        Parameters
        ----------
        time : :class:`datetime <datetime.datetime>` or None
            Value to check.
        error_type : type
            Exception class to raise (inside an ``ErrorLogger`` context)
            with the message ``"Time must be of type datetime"``.
        """
        if time is None or isinstance(time, datetime):
            return
        with ErrorLogger(self.logger):
            raise error_type("Time must be of type datetime")

    def with_name(self, name):
        """Rename the adapter.

        Parameters
        ----------
        name : str
            New name of the adapter.

        Returns
        -------
        Adapter
            The adapter itself, to allow call chaining.
        """
        self._name = name
        return self

    @property
    def time(self):
        """The output's time of the latest available data.

        This base implementation always returns ``None``.
        """
        return None

    @property
    def is_static(self):
        """bool: whether the adapter is static; ``False`` in this base class."""
        return False

    @final
    @property
    def info(self):
        """The currently stored output info (``self._output_info``)."""
        return self._output_info

    @property
    def needs_pull(self):
        """bool: if the adapter needs pull."""
        return False

    @property
    def needs_push(self):
        """bool: if the adapter needs push."""
        return False

    @property
    def metadata(self):
        """
        The adapter's metadata.

        Will only be called after the connect phase from
        :attr:`Composition.metadata`.

        Adapters can overwrite this property to add their own specific
        metadata:

        .. testcode:: metadata

            import finam as fm

            class MyAdapter(fm.Adapter):

                @property
                def metadata(self):
                    # Get the default metadata
                    md = super().metadata

                    # Add your own metadata
                    md["my_field"] = "some value"

                    # Return the dictionary
                    return md

        .. testcode:: metadata
            :hide:

            ada = MyAdapter()

        Returns
        -------
        dict
            A ``dict`` with the following default metadata:

            - ``name`` - the adapter's name
            - ``class`` - the adapter's class (module and qualified name)
            - ``out_info`` - the output info, as a ``dict``
            - ``in_info`` - the input info, as a ``dict``
              (only present if an input info is stored)
        """
        result = {
            "name": self.name,
            "class": self.__class__.__module__ + "." + self.__class__.__qualname__,
            "out_info": self._output_info.as_dict(),
        }
        # The input info is only known once the info exchange has happened.
        if self._input_info is not None:
            result["in_info"] = self._input_info.as_dict()
        return result

    @final
    def push_data(self, data, time):
        """Push data into the output.

        The adapter does not store ``data`` here; it only validates the
        time and notifies its targets.

        Parameters
        ----------
        data : array_like
            Data set to push.
        time : :class:`datetime <datetime.datetime>`
            Simulation time of the data set.

        Raises
        ------
        ValueError
            If ``time`` is neither ``None`` nor a datetime.
        """
        self.logger.debug("push data")
        self._require_datetime_or_none(time, ValueError)
        self.notify_targets(time)

    @final
    def source_updated(self, time):
        """Informs the input that a new output is available.

        Calls :meth:`._source_updated` and then notifies all targets.

        Parameters
        ----------
        time : :class:`datetime <datetime.datetime>`
            Simulation time of the notification.

        Raises
        ------
        ValueError
            If ``time`` is neither ``None`` nor a datetime.
        """
        self.logger.debug("source updated")
        self._require_datetime_or_none(time, ValueError)
        self._source_updated(time)
        self.notify_targets(time)

    def notify_targets(self, time):
        """Notify all targets by calling their ``source_updated(time)`` method.

        Parameters
        ----------
        time : :class:`datetime <datetime.datetime>`
            Simulation time of the notification.

        Raises
        ------
        ValueError
            If ``time`` is neither ``None`` nor a datetime.
        """
        self.logger.trace("notify targets")
        self._require_datetime_or_none(time, ValueError)
        for receiver in self.targets:
            receiver.source_updated(time)

    def _source_updated(self, time):
        """Informs the input that a new output is available.

        Adapters can overwrite this method to handle incoming data.

        Adapters that make use of this method to accumulate data should
        also overwrite :attr:`.needs_push` to return ``True``.

        The base implementation does nothing.

        Parameters
        ----------
        time : :class:`datetime <datetime.datetime>`
            Simulation time of the notification.
        """

    def get_data(self, time, target):
        """Get the transformed data of this adapter.

        Internally calls :meth:`._get_data` and passes the result through
        ``tools.prepare`` together with the stored output info.

        Parameters
        ----------
        time : :class:`datetime <datetime.datetime>`
            Simulation time to get the data for.
        target : :class:`.IInput`
            Requesting end point of this pull.

        Returns
        -------
        :class:`pint.Quantity`
            Transformed data-set for the requested time.

        Raises
        ------
        FinamTimeError
            If ``time`` is neither ``None`` nor a datetime.
        """
        self.logger.debug("get data")
        self._require_datetime_or_none(time, FinamTimeError)

        raw = self._get_data(time, target)

        with ErrorLogger(self.logger):
            prepared, conversion = tools.prepare(
                raw, self._output_info, report_conversion=True
            )
            # A conversion is only reported when one was actually applied.
            if conversion is not None:
                self.logger.profile(
                    "converted units from %s to %s (%d entries)",
                    *conversion,
                    prepared.size,
                )
        return prepared

    def _get_data(self, time, target):
        """Get the transformed data of this adapter.

        Adapters must overwrite this method.

        Parameters
        ----------
        time : :class:`datetime <datetime.datetime>`
            Simulation time to get the data for.
        target : :class:`.IInput`
            Requesting end point of this pull.

        Returns
        -------
        :class:`pint.Quantity`
            Transformed data-set for the requested time.

        Raises
        ------
        NotImplementedError
            Always, in this base implementation.
        """
        raise NotImplementedError(
            f"Method `_get_data` must be implemented by all adapters, but implementation is missing in {self.name}."
        )

    def get_info(self, info):
        """Exchange and get the output's data info.

        Stores the result of :meth:`._get_info` as the output info and
        returns it.

        Parameters
        ----------
        info : :class:`.Info`
            Requested data info

        Returns
        -------
        Info
            Delivered data info

        Notes
        -----
        The original documentation lists ``FinamNoDataError`` as raised
        when no info is available; it is not raised in this method itself
        and would have to come from the source -- TODO confirm.
        """
        self.logger.trace("get info")
        self._output_info = self._get_info(info)
        return self._output_info

    def _get_info(self, info):
        """Exchange and get the output's data info.

        Adapters can overwrite this method to manipulate the metadata for
        the output. The base implementation forwards to
        :meth:`.exchange_info`.

        Parameters
        ----------
        info : :class:`.Info`
            Requested data info

        Returns
        -------
        Info
            Delivered data info
        """
        return self.exchange_info(info)

    def pinged(self, source):
        """Called when receiving a ping from a downstream input.

        The ping is forwarded to this adapter's source. If the adapter
        needs push, it passes itself on instead of the original sender.

        Parameters
        ----------
        source
            The object the ping was received from.
        """
        forward_ping = self._source.pinged
        if self.needs_push:
            forward_ping(self)
        else:
            forward_ping(source)

    @final
    def exchange_info(self, info=None):
        """Exchange the data info with the input's source.

        The info delivered by the source is stored as both the input and
        the output info of this adapter.

        Parameters
        ----------
        info : :class:`.Info`
            request parameters

        Returns
        -------
        Info
            delivered parameters

        Raises
        ------
        FinamMetaDataError
            If ``info`` is ``None`` or not an :class:`.Info` instance.
        """
        self.logger.trace("exchanging info")
        with ErrorLogger(self.logger):
            if info is None:
                raise FinamMetaDataError("No metadata provided")
            if not isinstance(info, Info):
                raise FinamMetaDataError("Metadata must be of type Info")

        delivered = self._source.get_info(info)
        self._input_info = delivered
        self._output_info = delivered
        return delivered

    @property
    def source(self):
        """Get the input's source output or adapter

        Returns
        -------
        :class:`.IOutput`
            The input's source.
        """
        return super().source

    @source.setter
    def source(self, source):
        """Set the adapter input's source output or adapter

        Parameters
        ----------
        source : source output or adapter

        Raises
        ------
        FinamLogError
            If the adapter uses a base logger name but the source is not
            loggable.
        ValueError
            If a source is already set, or ``source`` is not an
            :class:`.IOutput`.
        """
        self.logger.trace("set source")

        # The adapter takes its base logger name from its source, so this
        # has to be settled before the validation below logs any error.
        if not self.uses_base_logger_name or is_loggable(source):
            self.base_logger_name = source.logger_name
        else:
            with ErrorLogger(self.logger):
                raise FinamLogError(
                    f"Adapter '{self.name}' can't get base logger from its source."
                )

        with ErrorLogger(self.logger):
            if self._source is not None:
                raise ValueError(
                    "Source of input is already set! "
                    "(You probably tried to connect multiple outputs to a single input)"
                )
            if not isinstance(source, IOutput):
                raise ValueError("Only IOutput can be set as source for Input")

        self._source = source

    @property
    def logger_name(self):
        """Logger name derived from source logger name and class name."""
        parent = logging.getLogger(self.base_logger_name)
        parts = [parent.name, " >> ", self.name]
        return ".".join(parts)

    def finalize(self):
        """Called at the end of each run. Calls :meth:`._finalize`."""
        self.logger.debug("finalize")
        self._finalize()

    def _finalize(self):
        """Called at the end of each run. Overwrite this for cleanup.

        The base implementation does nothing.
        """
class TimeDelayAdapter(Adapter, ITimeDelayAdapter, ABC):
    """Base class for adapters that delay/offset time to resolve dependency cycles.

    The time offset itself is obtained from ``with_delay``, which is not
    defined in this class (presumably declared by ``ITimeDelayAdapter`` and
    implemented by concrete adapters -- confirm).
    """

    def __init__(self):
        super().__init__()
        # Filled in by ``get_info`` from the time of the output info.
        self.initial_time = None

    def get_data(self, time, target):
        """Get the transformed data of this adapter.

        Internally calls :meth:`._get_data` with the delayed time, then
        :meth:`._pulled` with the originally requested time.

        Parameters
        ----------
        time : :class:`datetime <datetime.datetime>`
            Simulation time to get the data for.
        target : :class:`.IInput`
            Requesting end point of this pull.

        Returns
        -------
        :class:`pint.Quantity`
            Transformed data-set for the requested time.

        Raises
        ------
        FinamTimeError
            If ``time`` is neither ``None`` nor a datetime.
        """
        self.logger.debug("get data")
        if not (time is None or isinstance(time, datetime)):
            with ErrorLogger(self.logger):
                raise FinamTimeError("Time must be of type datetime")

        # Pull with the shifted time, but report the requested time.
        delayed_time = self.with_delay(time)
        raw = self._get_data(delayed_time, target)
        self._pulled(time)

        with ErrorLogger(self.logger):
            prepared, conversion = tools.prepare(
                raw, self._output_info, report_conversion=True
            )
            # A conversion is only reported when one was actually applied.
            if conversion is not None:
                self.logger.profile(
                    "converted units from %s to %s (%d entries)",
                    *conversion,
                    prepared.size,
                )
        return prepared

    def _get_data(self, time, target):
        """Get the output's data-set for the given time.

        Simply forwards to ``pull_data``.

        Parameters
        ----------
        time : :class:`datetime <datetime.datetime>`
            simulation time to get the data for.
        target : :class:`.IInput`
            Requesting end point of this pull.

        Returns
        -------
        array_like
            data-set for the requested time.
        """
        return self.pull_data(time, target)

    def _pulled(self, time):
        """This method is called during pulls, with the original pull time.

        Can be overwritten to store the original pull time, as in
        :meth:`._get_data()` only the manipulated time is available.
        Called after :meth:`._get_data()` (i.e. after the actual pull).

        The base implementation does nothing.

        Parameters
        ----------
        time : :class:`datetime <datetime.datetime>`
            The original (requested) time of the current pull.
        """

    def get_info(self, info):
        """Exchange and get the output's data info.

        Besides storing the output info, this remembers its ``time`` as
        :attr:`initial_time`.

        Parameters
        ----------
        info : :class:`.Info`
            Requested data info

        Returns
        -------
        Info
            Delivered data info

        Notes
        -----
        The original documentation lists ``FinamNoDataError`` as raised
        when no info is available; it is not raised in this method itself
        and would have to come from the source -- TODO confirm.
        """
        self.logger.trace("get info")
        delivered = self._get_info(info)
        self._output_info = delivered
        self.initial_time = delivered.time
        return delivered