"""
NetCDF reader components.
"""
from __future__ import annotations
import finam as fm
from netCDF4 import Dataset
from .tools import (
create_time_dim,
extract_data,
extract_info,
extract_time,
extract_variables,
)
[docs]
class NetCdfStaticReader(fm.Component):
    """
    NetCDF reader component that reads a single 2D data array per output at startup.

    Usage:

    .. testcode:: constructor

       from finam_netcdf import Variable, NetCdfStaticReader

       path = "tests/data/lai.nc"

       # automatically determine data variables
       reader = NetCdfStaticReader(path)

       # explicit data variables
       reader = NetCdfStaticReader(path, [Variable("lai", slices={"time": 0})])

    .. testcode:: constructor
       :hide:

       reader.initialize()

    Parameters
    ----------
    path : str
        Path to the NetCDF file to read.
    outputs : list of Variable or str
        List of outputs. Output is either defined by name or a :class:`Variable` instance.
        By default all NetCDF variables found in the file.
    """

    def __init__(self, path, outputs=None):
        super().__init__()
        self.path = path
        # Holds the user-supplied outputs until `_initialize` replaces them
        # with the result of `extract_variables`.
        self.variables = outputs
        self.dataset = None
        # Filled lazily on the first `_connect` call.
        self._infos = None
        self._data = None
        self.status = fm.ComponentStatus.CREATED

    def _initialize(self):
        """Open the dataset, resolve the variables and add one static output per variable."""
        self.dataset = Dataset(self.path)
        self.variables = extract_variables(
            self.dataset, self.variables, only_static=True
        )
        for var in self.variables:
            self.outputs.add(name=var.io_name, static=True)
        self.create_connector()

    def _connect(self, start_time):
        """
        Extract infos and data once, then hand them to ``try_connect``.

        As soon as the status is ``CONNECTED`` the cached infos/data are
        dropped and the dataset is closed and removed from the instance.
        """
        if self._infos is None:
            # First call: read everything from the file exactly once.
            self._data = {}
            self._infos = {}
            for var in self.variables:
                self._infos[var.io_name] = extract_info(self.dataset, var)
                self._data[var.io_name] = extract_data(self.dataset, var)

        self.try_connect(start_time, push_infos=self._infos, push_data=self._data)

        if self.status == fm.ComponentStatus.CONNECTED:
            del self._data
            del self._infos
            self.dataset.close()
            del self.dataset

    def _validate(self):
        pass

    def _update(self):
        pass

    def _finalize(self):
        """Close the dataset if it is still open."""
        # `_connect` closes and deletes `self.dataset` once connected. If the
        # attribute still exists and is set here, connecting never completed
        # and the file handle would otherwise leak.
        dataset = getattr(self, "dataset", None)
        if dataset is not None:
            dataset.close()
            # Avoid closing the same handle twice on a repeated finalize.
            self.dataset = None
[docs]
class NetCdfReader(fm.TimeComponent):
    """
    NetCDF reader component that steps along a date/time coordinate dimension of a dataset.

    Usage:

    .. testcode:: constructor

       from finam_netcdf import Variable, NetCdfReader

       path = "tests/data/lai.nc"

       # automatically determine data variables
       reader = NetCdfReader(path)

       # explicit data variables
       reader = NetCdfReader(path, outputs=["lai"])

       # explicit data variables with additional information
       reader = NetCdfReader(path, outputs=[Variable("lai", slices={"time": 0})])

    .. testcode:: constructor
       :hide:

       reader.initialize()

    Parameters
    ----------
    path : str
        Path to the NetCDF file to read.
    outputs : list of str or Variable, optional
        List of outputs. Output is either defined by name or a :class:`Variable` instance.
        By default all NetCDF variables found in the file.
    time_limits : tuple (datetime.datetime, datetime.datetime), optional
        Tuple of start and end datetime (both inclusive).
        Either entry may be ``None`` to leave that side unbounded.
    time_callback : callable, optional
        An optional callback for time stepping and indexing:
        (step, last_time, last_index) -> (time, index)
    """

    def __init__(
        self,
        path,
        outputs=None,
        time_limits=None,
        time_callback=None,
    ):
        super().__init__()
        self.path = path
        # Holds the user-supplied outputs until `_initialize` replaces them
        # with the result of `extract_variables`.
        self.variables = outputs
        self.time_var = None
        self.time_callback = time_callback
        self.time_limits = time_limits
        self.dataset = None
        self._init_data = {}
        self.output_infos = {}
        self.times = None
        # Position within `time_indices`; `time_indices` holds positions
        # within `times`.
        self.time_index = None
        self.time_indices = None
        self.step = 0
        self.data_pushed = False
        self._status = fm.ComponentStatus.CREATED

    @property
    def next_time(self):
        return None

    def _initialize(self):
        """Open the dataset, resolve time and data variables, and add the outputs."""
        self.dataset = Dataset(self.path)
        self.time_var = extract_time(self.dataset)
        self.variables = extract_variables(self.dataset, self.variables)
        for var in self.variables:
            self.outputs.add(name=var.io_name, static=var.static)
        self._process_initial_data()
        self.create_connector()

    def _connect(self, start_time):
        """Pass initial data and infos to ``try_connect`` on the first call only."""
        if self.data_pushed:
            self.try_connect(start_time)
        else:
            self.data_pushed = True
            self.try_connect(
                start_time,
                push_data=self._init_data,
                push_infos=self.output_infos,
            )

        if self.status == fm.ComponentStatus.CONNECTED:
            del self._init_data

    def _process_initial_data(self):
        """
        Set up time stepping and read the initial data and info of every variable.

        Raises
        ------
        ValueError
            If the time dimension is not strictly ascending, or if no time
            step is available (e.g. ``time_limits`` exclude all of them) and
            no ``time_callback`` is given.
        """
        if self.time_var is not None:
            self.times = create_time_dim(self.dataset, self.time_var)

            if self.time_limits is None:
                self.time_indices = list(range(len(self.times)))
            else:
                lower, upper = self.time_limits
                # Both limits are inclusive; `None` leaves a side unbounded.
                self.time_indices = [
                    index
                    for index, stamp in enumerate(self.times)
                    if (lower is None or stamp >= lower)
                    and (upper is None or stamp <= upper)
                ]

            # Equal neighbours are rejected as well: strictly ascending only.
            if any(
                self.times[i] >= self.times[i + 1]
                for i in range(len(self.times) - 1)
            ):
                msg = f"NetCDF reader requires time dimension '{self.time_var}' to be in ascending order."
                raise ValueError(msg)

            if self.time_callback is None:
                if not self.time_indices:
                    # Without this check the lookup below would fail with a
                    # bare IndexError that hides the actual cause.
                    msg = (
                        f"NetCDF reader found no time steps to read along time "
                        f"dimension '{self.time_var}' "
                        f"(time limits: {self.time_limits})."
                    )
                    raise ValueError(msg)
                self.time_index = 0
                self._time = self.times[self.time_indices[self.time_index]]
            else:
                self._time, self.time_index = self.time_callback(self.step, None, None)
        else:
            # No time dimension: a single pseudo time step.
            self.time_indices, self.time_index = [0], 0

        for var in self.variables:
            info = extract_info(self.dataset, var, self._time)
            data = extract_data(
                self.dataset, var, self.time_var, self.time_indices[self.time_index]
            )
            self._init_data[var.io_name] = data
            self.output_infos[var.io_name] = info

    def _validate(self):
        pass

    def _update(self):
        """Advance one step and push the data of all non-static variables."""
        self.step += 1

        if self.time_callback is None:
            self.time_index += 1
        else:
            self._time, self.time_index = self.time_callback(
                self.step, self._time, self.time_index
            )

        # this also catches the case for no time dimension
        if self.time_index >= len(self.time_indices):
            # for a "static reader" don't set status to finished
            if self.time_var is not None:
                self._status = fm.ComponentStatus.FINISHED
            return

        # The position in the file is the same for every variable of this step.
        file_index = self.time_indices[self.time_index]

        if self.time_callback is None:
            self._time = self.times[file_index]

        for var in self.variables:
            if var.static:
                continue
            data = fm.UNITS.Quantity(
                extract_data(self.dataset, var, self.time_var, file_index),
                self.output_infos[var.io_name].units,
            )
            self._outputs[var.io_name].push_data(data, self._time)

    def _finalize(self):
        """Close the dataset if it was opened."""
        # `dataset` is still None when `_initialize` never opened the file.
        if self.dataset is not None:
            self.dataset.close()