"""
Abstract base implementations for components with and without time step.
"""
import collections
import logging
from abc import ABC
from datetime import datetime
from enum import IntEnum
from typing import final
from ..errors import FinamLogError, FinamStatusError, FinamTimeError
from ..interfaces import (
ComponentStatus,
IComponent,
IInput,
IOutput,
ITimeComponent,
Loggable,
)
from ..tools.connect_helper import ConnectHelper
from ..tools.enum_helper import get_enum_value
from ..tools.log_helper import ErrorLogger, is_loggable
from .input import Input
from .output import Output
class Component(IComponent, Loggable, ABC):
    """Abstract component implementation.

    Extend this class for components without time step.
    See :doc:`/finam-book/development/special_components`.

    For components with a time step, use :class:`.TimeComponent`.

    Derived classes overwrite these methods:

    * :meth:`._initialize`
    * :meth:`._connect`
    * :meth:`._validate`
    * :meth:`._update`
    * :meth:`._finalize`

    The public life-cycle methods (:meth:`.initialize`, :meth:`.connect`,
    :meth:`.validate`, :meth:`.update`, :meth:`.finalize`) are marked ``@final``.
    They wrap the underscore-prefixed hooks with logging and status handling.
    """

    def __init__(self):
        Loggable.__init__(self)
        # The default name is the class name; it can be changed via `with_name`.
        self._name = self.__class__.__name__
        self._status = ComponentStatus.CREATED
        self._inputs = IOList(self, "INPUT")
        self._outputs = IOList(self, "OUTPUT")
        self.base_logger_name = None
        # Stays None until `create_connector` is called.
        self._connector: ConnectHelper = None

    def with_name(self, name):
        """Renames the component and returns self."""
        self._name = name
        return self

    @final
    def initialize(self):
        """Initialize the component.

        After the method call, the component's inputs and outputs must be available,
        and the component should have :attr:`.status` :attr:`.ComponentStatus.INITIALIZED`.
        """
        self.logger.debug("init")
        self._initialize()

        # No slots may be added once initialization is done.
        self.inputs.frozen = True
        self.outputs.frozen = True

        # A FAILED status set by `_initialize` must survive.
        if self.status != ComponentStatus.FAILED:
            self.status = ComponentStatus.INITIALIZED

    def _initialize(self):
        """Initialize the component.

        Components must overwrite this method.
        After the method call, the component's inputs and outputs must be available.
        """
        raise NotImplementedError(
            f"Method `_initialize` must be implemented by all components, but implementation is missing in {self.name}."
        )

    @final
    def connect(self, start_time):
        """Connect exchange data and metadata with linked components.

        The method can be called multiple times if there are failed pull attempts.

        The first call after initialization only pings all inputs and sets the
        status to :attr:`.ComponentStatus.CONNECTING`. All later calls are
        forwarded to :meth:`._connect`.

        After each method call, the component should have :attr:`.status` :attr:`.ComponentStatus.CONNECTED` if
        connecting was completed, :attr:`.ComponentStatus.CONNECTING` if some but not all required initial input(s)
        could be pulled, and :attr:`.ComponentStatus.CONNECTING_IDLE` if nothing could be pulled.

        Parameters
        ----------
        start_time : :class:`datetime <datetime.datetime>` or None
            The composition's starting time.
            Can be before the component's actual time.

        Raises
        ------
        FinamTimeError
            If ``start_time`` is neither ``None`` nor a ``datetime``.
        """
        if start_time is not None and not isinstance(start_time, datetime):
            raise FinamTimeError("Time in connect must be either None or a datetime")

        if self.status == ComponentStatus.INITIALIZED:
            # Ping phase: runs exactly once, before any `_connect` call.
            self.logger.debug("connect: ping phase")
            for _, inp in self.inputs.items():
                inp.ping()
            self.status = ComponentStatus.CONNECTING
        else:
            self.logger.debug("connect")
            self._connect(start_time)

    def _connect(self, start_time):
        """Connect exchange data and metadata with linked components.

        Components must overwrite this method.

        Parameters
        ----------
        start_time : :class:`datetime <datetime.datetime>`
            The composition's starting time.
            Can be before the component's actual time.
            Should be passed to :meth:`.try_connect` calls.
        """
        raise NotImplementedError(
            f"Method `_connect` must be implemented by all components, but implementation is missing in {self.name}."
        )

    @final
    def validate(self):
        """Validate the correctness of the component's settings and coupling.

        After the method call, the component should have :attr:`.status` :attr:`.ComponentStatus.VALIDATED`.
        """
        self.logger.debug("validate")
        self._validate()

        if self.status != ComponentStatus.FAILED:
            self.status = ComponentStatus.VALIDATED

    def _validate(self):
        """Validate the correctness of the component's settings and coupling.

        Components should overwrite this method.
        """
        self.logger.debug("Method `_validate` not implemented by user.")

    @final
    def update(self):
        """Update the component by one time step.
        Push new values to outputs.

        After the method call, the component should have :attr:`.status`
        :attr:`.ComponentStatus.UPDATED` or :attr:`.ComponentStatus.FINISHED`.

        A status of ``FAILED``, ``FINISHED`` or ``FINALIZED`` set by
        :meth:`._update` is kept. Any other status is replaced by ``UPDATED``.
        """
        if isinstance(self, ITimeComponent):
            self.logger.debug("update - current time: %s", self.time)
        else:
            self.logger.debug("update")

        self._update()

        # FINISHED is listed so that a component that declared itself finished
        # in `_update` is not silently reset to UPDATED.
        if self.status not in (
            ComponentStatus.FAILED,
            ComponentStatus.FINISHED,
            ComponentStatus.FINALIZED,
        ):
            self.status = ComponentStatus.UPDATED

    def _update(self):
        """Update the component by one time step.
        Push new values to outputs.

        Components must overwrite this method.
        """
        raise NotImplementedError(
            f"Method `_update` must be implemented by all components, but implementation is missing in {self.name}."
        )

    @final
    def finalize(self):
        """Finalize and clean up the component.

        After the method call, the component should have :attr:`.status` :attr:`.ComponentStatus.FINALIZED`.
        """
        self.logger.debug("finalize")
        self._finalize()

        # Outputs are finalized after the user hook has run.
        for _n, out in self.outputs.items():
            out.finalize()

        if self.status != ComponentStatus.FAILED:
            self.status = ComponentStatus.FINALIZED

    def _finalize(self):
        """Finalize and clean up the component.

        Components should overwrite this method.
        """
        self.logger.debug("Method `_finalize` not implemented by user.")

    @property
    def inputs(self):
        """dict: The component's inputs."""
        return self._inputs

    @property
    def outputs(self):
        """dict: The component's outputs."""
        return self._outputs

    @property
    def status(self):
        """The component's current status."""
        return self._status

    @status.setter
    def status(self, status):
        """The component's current status."""
        self._status = status

    @property
    def name(self):
        """Component name."""
        return self._name

    @property
    def metadata(self):
        """
        The component's metadata.

        Will only be called after the connect phase from :attr:`Composition.metadata`.

        Components can overwrite this property to add their own specific metadata:

        .. testcode:: metadata

            import finam as fm

            class MyComponent(fm.Component):

                @property
                def metadata(self):
                    # Get the default metadata
                    md = super().metadata

                    # Add your own metadata
                    md["my_field"] = "some value"

                    # Return the dictionary
                    return md

        .. testcode:: metadata
            :hide:

            comp = MyComponent()
            md = comp.metadata

        Returns
        -------
        dict
            A ``dict`` with the following default metadata:

            - ``name`` - the component's name
            - ``class`` - the component's class
            - ``inputs`` - ``dict`` of metadata for all inputs
            - ``outputs`` - ``dict`` of metadata for all outputs
        """
        inputs = {}
        outputs = {}

        for name, inp in self.inputs.items():
            inputs[name] = {
                "name": name,
                "class": inp.__class__.__module__ + "." + inp.__class__.__qualname__,
                "is_static": inp.is_static,
                "info": inp.info.as_dict(),
            }

        for name, out in self.outputs.items():
            outputs[name] = {
                "name": name,
                "class": out.__class__.__module__ + "." + out.__class__.__qualname__,
                "is_static": out.is_static,
                "has_targets": out.has_targets,
                "info": out.info.as_dict(),
            }

        return {
            "name": self.name,
            "class": self.__class__.__module__ + "." + self.__class__.__qualname__,
            "inputs": inputs,
            "outputs": outputs,
        }

    @property
    def logger_name(self):
        """Logger name derived from base logger name and class name."""
        base_logger = logging.getLogger(self.base_logger_name)
        # logger hierarchy indicated by "." in name
        return ".".join([base_logger.name, self.name])

    @property
    def uses_base_logger_name(self):
        """Whether this class has a ``base_logger_name`` attribute. True."""
        return True

    @property
    def connector(self):
        """The component's :class:`.tools.ConnectHelper`.

        ``None`` until :meth:`.create_connector` was called.

        See also :meth:`.create_connector` and :meth:`.try_connect`.
        """
        return self._connector

    def create_connector(
        self, pull_data=None, in_info_rules=None, out_info_rules=None, cache=True
    ):
        """Initialize the component's :class:`.tools.ConnectHelper`.

        Also freezes the component's inputs and outputs, so all slots must be
        added before this method is called.

        See also :meth:`.try_connect`, :attr:`.connector` and :class:`.ConnectHelper` for details.

        Parameters
        ----------
        pull_data : arraylike
            Names of the inputs that are to be pulled.
        in_info_rules : dict
            Info transfer rules for inputs. See the examples for details.
            See also :class:`.tools.FromInput`, :class:`.tools.FromOutput` and :class:`.tools.FromValue`.
        out_info_rules : dict
            Info transfer rules for outputs. See the examples for details.
            See also :class:`.tools.FromInput`, :class:`.tools.FromOutput` and :class:`.tools.FromValue`.
        cache : bool
            Whether data and :class:`.Info` objects passed via :meth:`try_connect() <.Component.try_connect>`
            are cached for later calls. Default ``True``.

        Examples
        --------

        The following examples show the usage of this method in :meth:`._initialize`.

        .. testsetup:: *

            import finam as fm
            import datetime as dt

            self = fm.modules.CallbackComponent(
                inputs={},
                outputs={},
                callback=lambda inp, _t: {},
                start=dt.datetime(2000, 1, 1),
                step=dt.timedelta(days=1),
            )

        Simple usage if no input data or any metadata from connected components is required:

        .. testcode:: create-connector-simple

            self.inputs.add(name="In", time=self.time, grid=fm.NoGrid())
            self.outputs.add(name="Out", time=self.time, grid=fm.NoGrid())

            self.create_connector()

        To pull specific inputs, use ``pull_data`` like this:

        .. testcode:: create-connector-pull

            self.inputs.add(name="In1", time=self.time, grid=fm.NoGrid())
            self.inputs.add(name="In2", time=self.time, grid=fm.NoGrid())

            self.create_connector(pull_data=["In1", "In2"])

        With the ``in_info_rules`` and ``out_info_rules``, metadata can be transferred between coupling slots.
        Here, the metadata for an output is taken from an input:

        .. testcode:: create-connector-in-to-out

            self.inputs.add(name="In", time=self.time, grid=None, units=None)
            self.outputs.add(name="Out")

            self.create_connector(
                out_info_rules={
                    "Out": [
                        fm.tools.FromInput("In")
                    ]
                }
            )

        The :class:`.Info` object for output ``Out`` will be created and pushed automatically in :meth:`.try_connect`
        as soon as the metadata for ``In`` becomes available.

        Here, the metadata of an output is composed from the metadata of two inputs and a user-defined value:

        .. testcode:: create-connector-in-to-out-multi

            self.inputs.add(name="In1", time=self.time, grid=None, units=None)
            self.inputs.add(name="In2", time=self.time, grid=None, units=None)
            self.outputs.add(name="Out")

            self.create_connector(
                out_info_rules={
                    "Out": [
                        fm.tools.FromInput("In1", ["time", "grid"]),
                        fm.tools.FromInput("In2", ["units"]),
                        fm.tools.FromValue("source", "FINAM"),
                    ]
                }
            )

        The :class:`.Info` object for output ``Out`` would be automatically composed in :meth:`.try_connect`
        as soon as the infos of both inputs become available.

        ``time`` and ``grid`` would be taken from ``In1``, ``units`` from ``In2``,
        and ``source`` would be set to ``"FINAM"``.

        Rules are evaluated in the given order. Later rules can overwrite attributes set by earlier rules.
        """
        self.logger.trace("create connector")
        self._connector = ConnectHelper(
            self.logger_name,
            self.inputs,
            self.outputs,
            pull_data=pull_data,
            in_info_rules=in_info_rules,
            out_info_rules=out_info_rules,
            cache=cache,
        )
        # The connector was built from the current slots; forbid later additions.
        self.inputs.frozen = True
        self.outputs.frozen = True

    def try_connect(
        self, start_time, exchange_infos=None, push_infos=None, push_data=None
    ):
        """Exchange the info and data with linked components.

        Values passed by the arguments are cached internally for later calls to the method
        if the connector was created with ``cache=True`` (the default).
        Thus, it is sufficient to provide only data and infos that became newly available.
        Giving the same data or infos repeatedly overwrites the cache.

        Sets the component's :attr:`.status` according to success of exchange.

        See also :meth:`.create_connector`, :attr:`.connector` and :class:`.ConnectHelper` for details.

        Parameters
        ----------
        start_time : :class:`datetime <datetime.datetime>`
            the composition's starting time as passed to :meth:`.Component._connect`
        exchange_infos : dict of [str, Info]
            currently or newly available input data infos by input name
        push_infos : dict of [str, Info]
            currently or newly available output data infos by output name
        push_data : dict of [str, array-like]
            currently or newly available output data by output name

        Raises
        ------
        FinamStatusError
            If no connector was created via :meth:`.create_connector`.
        """
        self.logger.trace("try connect")

        if self._connector is None:
            raise FinamStatusError(
                f"No connector in component {self.name}. Call `create_connector()` in `_initialize()`."
            )

        # The connector reports the resulting status of the exchange.
        self.status = self._connector.connect(
            start_time,
            exchange_infos=exchange_infos,
            push_infos=push_infos,
            push_data=push_data,
        )
        self.logger.trace("try_connect status is %s", self.status)

    def __getitem__(self, name):
        """Get an input or output by name. Implements access through square brackets.

        Allows for the use of ``comp["Name"]`` as shortcut for ``comp.inputs["Name"]`` and ``comp.outputs["Name"]``.
        Requires that the name does not appear in inputs as well as outputs.

        Returns
        -------
        :class:`.IInput` or :class:`.IOutput`
            The slot with the given name

        Raises
        ------
        KeyError
            If the name occurs in the inputs as well as the outputs,
            or neither in the inputs nor the outputs.
        """
        if name in self.inputs:
            if name in self.outputs:
                msg = f"Name `{name}` exists in inputs as well as outputs of component {self.name}"
                # While the status is still CREATED, raise without logging.
                if self.status == ComponentStatus.CREATED:
                    raise KeyError(msg)
                with ErrorLogger(self.logger):
                    raise KeyError(msg)

            return self.inputs[name]

        if name in self.outputs:
            return self.outputs[name]

        msg = f"Name `{name}` does not exist in inputs or outputs of component `{self.name}`"
        if self.status == ComponentStatus.CREATED:
            msg += " The component is not initialized. Did you miss to add it to the composition?"
            raise KeyError(msg)
        with ErrorLogger(self.logger):
            raise KeyError(msg)

    def __repr__(self):
        return self.name
class TimeComponent(ITimeComponent, Component, ABC):
    """Abstract base for components that advance in time steps.

    Extend this class for components with time step.
    See :doc:`/finam-book/development/components`.

    For components without a time step, use :class:`.Component`.

    Derived classes overwrite these methods

    * :meth:`._initialize`
    * :meth:`._connect`
    * :meth:`._validate`
    * :meth:`._update`
    * :meth:`._finalize`
    * :meth:`._next_time`
    """

    def __init__(self):
        Component.__init__(self)
        # No time is known until the `time` setter is used.
        self._time = None

    @property
    def time(self):
        """The component's current simulation time.

        ``None`` is returned only while no time was set and the component is
        still in status ``CREATED`` or ``INITIALIZED``. In every other case a
        value that is not a ``datetime`` raises ``ValueError``.
        """
        current = self._time
        may_be_unset = (ComponentStatus.CREATED, ComponentStatus.INITIALIZED)

        if current is None and self.status in may_be_unset:
            return None

        if not isinstance(current, datetime):
            with ErrorLogger(self.logger):
                raise ValueError("Time must be of type datetime")

        return current

    @time.setter
    def time(self, time):
        valid = isinstance(time, datetime)
        if not valid:
            with ErrorLogger(self.logger):
                raise ValueError("Time must be of type datetime")
        self._time = time

    @property
    def next_time(self):
        """The component's predicted simulation time of its next pulls.

        Can be ``None`` if the component has no inputs.
        """
        return self._next_time()

    def _next_time(self):
        # Hook behind `next_time`; concrete time components must provide it.
        message = (
            "Method `_next_time` must be implemented by all time components, "
            f"but implementation is missing in {self.name}."
        )
        raise NotImplementedError(message)
class IOType(IntEnum):
    """Kind of slots an :class:`IOList` holds: inputs or outputs."""

    # The integer values double as list indices in `IOList.__init__`.
    INPUT = 0
    OUTPUT = 1
class IOList(collections.abc.Mapping):
    """
    Read-mostly mapping from slot names to the inputs or outputs of a component.

    Parameters
    ----------
    owner : :class:`.IComponent`
        The owning component of this IOList
    io_type : int, str, IOType
        IO type. Either "INPUT" or "OUTPUT".
    """

    def __init__(self, owner, io_type):
        """
        Set up an empty, unfrozen list for the given owner and IO type.

        Parameters
        ----------
        owner : :class:`.IComponent`
            The owning component of this IOList
        io_type : int, str, IOType
            IO type, resolved to an :class:`IOType` member.
        """
        self.owner = owner
        self.type = get_enum_value(io_type, IOType)

        # Concrete class used to build new slots, and the interface that
        # every stored slot has to implement. Both are selected by IO type.
        concrete, interface = ((Input, IInput), (Output, IOutput))[self.type]
        self.cls = concrete
        self.icls = interface
        self.name = concrete.__name__
        self.iname = interface.__name__

        self._dict = {}
        self.frozen = False

    def add(self, io=None, *, name=None, info=None, static=False, **info_kwargs):
        """
        Add a new IO object, either directly or by its attributes.

        Parameters
        ----------
        io : :class:`.IInput` or :class:`.IOutput`, optional
            IO object to add, by default None
        name : str, optional
            Name of the new IO object to add, by default None
        info : :class:`.Info`, optional
            Info of the new IO object to add, by default None
        static : bool, optional
            Whether the new IO object is static, by default False
        **info_kwargs
            Optional keyword arguments to instantiate an Info object

        Raises
        ------
        ValueError
            If the list is frozen, if io is not of the correct type,
            or if an IO object with the same name already exists.
        """
        if self.frozen:
            raise ValueError("IO.add: list is frozen.")

        # Without a ready-made object, build one from the given attributes.
        if io is None:
            io = self.cls(name=name, info=info, static=static, **info_kwargs)

        if not isinstance(io, self.icls):
            raise ValueError(f"IO.add: {self.name} is not of type {self.iname}")

        slot_name = io.name
        if slot_name in self._dict:
            raise ValueError(f"IO.add: {self.name} '{slot_name}' already exists.")

        self._dict[slot_name] = io

    @property
    def names(self):
        """list: all IO names in this list."""
        return [slot_name for slot_name in self]

    def set_logger(self, module):
        """
        Set the logger in the items of the IOList.

        Parameters
        ----------
        module : :class:`.IComponent`
            Module holding the IOList.

        Raises
        ------
        FinamLogError
            When item is loggable but not the base module.
        """
        for name, item in self.items():
            # Only loggable items that rely on a base logger name are touched.
            if not (is_loggable(item) and item.uses_base_logger_name):
                continue

            if not is_loggable(module):
                mname = getattr(module, "name", None)
                raise FinamLogError(
                    f"IO: {self.name} '{name}' can't get logger from '{mname}'."
                )

            item.base_logger_name = module.logger_name

    def __iter__(self):
        return iter(self._dict)

    def __len__(self):
        return len(self._dict)

    def __contains__(self, item):
        return item in self._dict

    def __getitem__(self, key):
        """Access an item by name."""
        try:
            return self._dict[key]
        except KeyError:
            # Fall through to build a more helpful error message below.
            pass

        kind = self.cls.__name__
        owner = self.owner
        if owner is None:
            raise KeyError(f"No {kind} `{key}` in unknown component.")

        msg = f"No {kind} `{key}` in component `{owner.name}`."
        if owner.status == ComponentStatus.CREATED:
            # While the owner status is CREATED, raise without logging.
            msg += " The component is not initialized. Did you miss to add it to the composition?"
            raise KeyError(msg)

        with ErrorLogger(owner.logger):
            raise KeyError(msg)

    def __setitem__(self, key, value):
        # Checks run in this order: frozen, duplicate, type, name/key mismatch.
        if self.frozen:
            raise ValueError("IO: list is frozen.")

        if key in self._dict:
            raise ValueError(f"IO: {self.name} '{key}' already exists.")

        if not isinstance(value, self.icls):
            raise ValueError(f"IO: {self.name} is not of type {self.iname}")

        if value.name != key:
            raise ValueError(
                f"IO: {self.name} name '{value.name}' differs from key '{key}'"
            )

        self._dict[key] = value

    def __str__(self):
        return str(self._dict)

    def __repr__(self):
        return f"{self._dict!r}"