Writing adapters#
This chapter provides a step-by-step guide to implement adapters in pure Python. For writing Python bindings for other languages, see Python bindings.
Completing the chapter will result in two adapters called Scale and TimeInterpolation.
We will build up the adapters step by step, accompanied by some test code.
It is assumed that you have FINAM installed, as well as pytest.
For adapter implementation examples, browse the source code of the included adapters in module adapters.
The source code of each API entry is linked in its upper right corner under [source].
Set up a Python project#
Create the following project structure:
- dummy_adapters/
   +- src/
Simple Scale adapter#
This is a simple, purely pull-based adapter. When output is requested, it should simply pull from its input, transform and forward it.
We implement IAdapter by extending Adapter. We only need to override its method Adapter._get_data(), which is called from downstream to request data.
File src/scale.py:
import finam as fm


class Scale(fm.Adapter):
    def __init__(self, scale):
        super().__init__()
        self.scale = scale

    def _get_data(self, time, target):
        d = self.pull_data(time, target)
        return d * self.scale
In Adapter._get_data(), we:
- Pull the input for the requested time
- Multiply the input by scale and return the result
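The pull-based flow described above can be sketched without FINAM at all. The classes below are hypothetical stand-ins (ConstantSource and ScaleSketch are not part of the FINAM API) and only illustrate the order of events: nothing happens until downstream asks, then the adapter pulls, transforms and forwards.

```python
# Stand-in classes for illustration only; these are NOT FINAM classes.
class ConstantSource:
    """Hypothetical upstream source that returns the same value for any time."""

    def __init__(self, value):
        self.value = value

    def get_data(self, time):
        return self.value


class ScaleSketch:
    """Hypothetical pull-based adapter: pull on request, transform, forward."""

    def __init__(self, source, scale):
        self.source = source
        self.scale = scale

    def get_data(self, time):
        d = self.source.get_data(time)  # pull from upstream only when asked
        return d * self.scale  # transform and hand the result downstream


adapter = ScaleSketch(ConstantSource(21.0), scale=2.0)
result = adapter.get_data(time=0)  # the request from downstream triggers the pull
```

In the real adapter, Adapter.pull_data() takes the place of the direct call to the source, and the framework handles the connection between components.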
Time-dependent TimeInterpolation adapter#
The purpose of this adapter is to do temporal interpolation between upstream time steps. As an example, there could be a model with a weekly time step that passes data to another model with a daily time step. Assuming continuous transitions of the modelled data, temporal interpolation between the weekly time steps is required.
  ^                          V
  |                        _.o----
  |                    _.-´
  |                _.-´|
  |            _.-´    |
  |      V _.-´        |
  |  ----o´            |
  +-------------------------------------> t
                       ^
Here, a simple pull-based mechanism is not sufficient. The adapter needs to store each new data entry that becomes available, and calculate the interpolated data when requested.
Due to FINAM’s scheduling algorithm, it is guaranteed that the time stamp of any request lies in the interval of the previous two time steps of any other component (see Coupling and scheduling for details). Thus, it is not required to store data for more than two time stamps.
Accordingly, this is the constructor (file src/time_interpolation.py):
import finam as fm


class TimeInterpolation(fm.Adapter):
    def __init__(self):
        super().__init__()
        self.old_data = None
        self.new_data = None
The adapter needs to react to downstream requests as well as to new data available upstream.
This functionality is provided by Adapter’s methods Adapter._get_data() and Adapter._source_updated(), respectively.
import finam as fm


class TimeInterpolation(fm.Adapter):
    def __init__(self):
        super().__init__()
        self.old_data = None
        self.new_data = None

    @property
    def needs_push(self):
        return True

    def _source_updated(self, time):
        pass

    def _get_data(self, time, target):
        pass
Note: We need to override Adapter.needs_push here, as the scheduler needs to know that the adapter won’t work in a purely pull-based setup.
In Adapter._source_updated(), we need to store incoming data:
import finam as fm


class TimeInterpolation(fm.Adapter):
    def __init__(self):
        super().__init__()
        self.old_data = None
        self.new_data = None

    @property
    def needs_push(self):
        return True

    def _source_updated(self, time):
        data = self.pull_data(time, self)
        self.old_data = self.new_data
        self.new_data = (time, data)

    def _get_data(self, time, target):
        pass
We “move” the previous new_data to old_data, and replace new_data by the incoming data, as a (time, data) tuple.
As the output time will differ from the input time, we need to strip the time off the data by calling data.strip_data().
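The rotation of old_data and new_data can be tried out in isolation. The sketch below uses a hypothetical TwoSlotBuffer class (not part of FINAM) with plain floats standing in for the data, and shows that only the two most recent entries survive a series of upstream updates.

```python
from datetime import datetime


class TwoSlotBuffer:
    """Hypothetical helper mirroring the old_data/new_data rotation."""

    def __init__(self):
        self.old_data = None
        self.new_data = None

    def push(self, time, data):
        # The previous "new" entry becomes "old"; the incoming entry becomes "new".
        self.old_data = self.new_data
        self.new_data = (time, data)


buf = TwoSlotBuffer()
buf.push(datetime(2000, 1, 1), 1.0)
buf.push(datetime(2000, 1, 8), 2.0)
buf.push(datetime(2000, 1, 15), 3.0)  # the entry for 2000-01-01 is dropped here
```

After the third push, old_data holds the entry for 2000-01-08 and new_data the entry for 2000-01-15.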
In Adapter._get_data(), we can now do the interpolation whenever data is requested from downstream.
import finam as fm


class TimeInterpolation(fm.Adapter):
    def __init__(self):
        super().__init__()
        self.old_data = None
        self.new_data = None

    @property
    def needs_push(self):
        return True

    def _source_updated(self, time):
        data = self.pull_data(time, self)
        self.old_data = self.new_data
        self.new_data = (time, data)

    def _get_data(self, time, _target):
        if self.old_data is None:
            if self.new_data is None:
                raise fm.FinamNoDataError("No data available.")
            else:
                # Only one entry so far: nothing to interpolate between
                return self.new_data[1]

        # Relative position of time between the two stored time stamps
        dt = (time - self.old_data[0]) / (self.new_data[0] - self.old_data[0])

        o = self.old_data[1]
        n = self.new_data[1]

        return o + dt * (n - o)
In Adapter._get_data(), the following happens:
- If no data was received so far, a FinamNoDataError is raised. If only one data entry was received so far, we can’t interpolate and simply return the available data. Otherwise…
- Calculate dt as the relative position of time in the available data interval (in range [0, 1])
- Interpolate and return the data
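The steps above can be checked with a FINAM-free sketch. The function interpolate() below is a hypothetical helper that copies the logic of Adapter._get_data() into a pure function. It raises ValueError where the adapter raises fm.FinamNoDataError, and it uses plain floats for the data. The weekly time stamps follow the weekly-to-daily example from the start of this section.

```python
from datetime import datetime


def interpolate(old_data, new_data, time):
    """Hypothetical helper; old_data and new_data are (time, value) tuples or None."""
    if old_data is None:
        if new_data is None:
            # The adapter raises fm.FinamNoDataError at this point.
            raise ValueError("No data available.")
        # Only one entry received so far: return it unchanged.
        return new_data[1]

    # Relative position of time within [old time, new time].
    dt = (time - old_data[0]) / (new_data[0] - old_data[0])
    return old_data[1] + dt * (new_data[1] - old_data[1])


week_1 = (datetime(2000, 1, 1), 10.0)
week_2 = (datetime(2000, 1, 8), 24.0)

# Before the second weekly step arrives, the single entry is returned as is.
first = interpolate(None, week_1, datetime(2000, 1, 1))

# A daily request two days into the week lies 2/7 of the way from 10.0 to 24.0.
value = interpolate(week_1, week_2, datetime(2000, 1, 3))
```

Here value comes out at approximately 14.0, up to floating-point rounding.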
Note that, although we use datetime when calculating dt, we get a scalar output.
Due to dt being relative, time units cancel out here.
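This behaviour comes from the Python standard library: subtracting two datetime objects gives a timedelta, and dividing one timedelta by another gives a plain float. A short illustration with made-up dates:

```python
from datetime import datetime

t_old = datetime(2000, 1, 1)
t_new = datetime(2000, 1, 8)
t = datetime(2000, 1, 3)

elapsed = t - t_old    # timedelta of 2 days
span = t_new - t_old   # timedelta of 7 days
dt = elapsed / span    # timedelta / timedelta is a float, here 2/7
```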