Source code for finam_netcdf.writer

"""
NetCDF writer components.
"""
from datetime import datetime, timedelta
from functools import partial

import finam as fm
from netCDF4 import Dataset, date2num

from .tools import create_nc_framework, create_variable_list


[docs] class NetCdfTimedWriter(fm.TimeComponent): """ NetCDF writer component that writes in predefined time intervals. Usage: .. testcode:: constructor from datetime import timedelta from finam_netcdf import NetCdfTimedWriter file = "tests/data/out.nc" writer = NetCdfTimedWriter(file, ["lai", "soil_moist"], step=timedelta(days=1)) .. testcode:: constructor :hide: writer.initialize() Parameters ---------- path : str Path to the NetCDF file to read. inputs : list of str or Variable. List of inputs. Input is either defined by name or a :class:`Variable` instance. step : datetime.timedelta Time step time_var : str or None, optional Name of the time coordinate. To create a static output file, set this to None. By default: "time" global_attrs : dict, optional global attributes for the NetCDF file inputed by the user. """ def __init__( self, path, inputs, step, time_var="time", global_attrs=None, ): super().__init__() if step is not None and not isinstance(step, timedelta): raise ValueError("Step must be None or of type timedelta") self._path = path self.variables = create_variable_list(inputs) for var in self.variables: if var.static is None: var.static = not bool(time_var) if var.slices: msg = f"NetCDF: writer got slices information for variable: f{var.name}" raise ValueError(msg) self._step = step self.time_var = time_var self.global_attrs = global_attrs or {} self.dataset = None self.timestamp_counter = 0 self.status = fm.ComponentStatus.CREATED if not isinstance(self.global_attrs, dict): raise ValueError("inputed global attributes must be of type dict") @property def next_time(self): return self.time + self._step def _initialize(self): for var in self.variables: grid = var.info_kwargs.get("grid", None) units = var.info_kwargs.get("units", None) self.inputs.add( name=var.io_name, time=self.time, grid=grid, units=units, static=var.static, **var.get_meta(), ) self.dataset = Dataset(self._path, "w") self.create_connector(pull_data=[var.io_name for var in self.variables]) def _connect(self, start_time): self.try_connect(start_time=start_time) if self.status != fm.ComponentStatus.CONNECTED: return self._time = start_time create_nc_framework( self.dataset, self.time_var, self._time, self._step, self.connector.in_infos, self.connector.in_data, self.variables, self.global_attrs, ) # adding time and var data to the first timestamp for var in self.variables: data = self.connector.in_data[var.io_name].magnitude if var.static: self.dataset[var.name][...] = data else: self.dataset[var.name][self.timestamp_counter, ...] = data if self.time_var: current_date = date2num(self._time, self.dataset[self.time_var].units) self.dataset[self.time_var][self.timestamp_counter] = current_date def _validate(self): pass def _update(self): self._time += self._step self.timestamp_counter += 1 if not self.time_var: return for var in self.variables: if var.static: continue data = self.inputs[var.io_name].pull_data(self._time).magnitude self.dataset[var.name][self.timestamp_counter, ...] = data current_date = date2num(self._time, self.dataset[self.time_var].units) self.dataset[self.time_var][self.timestamp_counter] = current_date def _finalize(self): self.dataset.close()
[docs] class NetCdfPushWriter(fm.Component): """ NetCDF writer component that writes on push to its inputs. Usage: .. testcode:: constructor from finam_netcdf import NetCdfPushWriter file = "tests/data/out.nc" writer = NetCdfPushWriter(file, ["lai", "soil_moisture"]) .. testcode:: constructor :hide: writer.initialize() Note that all data sources must have the same time step! Parameters ---------- path : str Path to the NetCDF file to read. inputs : list of str or Variable. List of inputs. Input is either defined by name or a :class:`Variable` instance. time_var : str Name of the time coordinate. time_unit : str, optional time unit given as a string: days, hours, minutes or seconds. global_attrs : dict, optional global attributes for the NetCDF file inputed by the user. """ def __init__( self, path, inputs, time_var="time", time_unit="seconds", global_attrs=None, ): super().__init__() self._path = path self.variables = create_variable_list(inputs) for var in self.variables: if var.static is None: var.static = False if var.static: msg = f"NetCDF: push writer got a static input: f{var.name}" raise ValueError(msg) if var.slices: msg = f"NetCDF: writer got slices information for variable: f{var.name}" raise ValueError(msg) self.time_var = time_var self.dataset = None self.timestamp_counter = 0 self.time_unit = time_unit self.global_attrs = global_attrs or {} self.last_update = None if not isinstance(self.global_attrs, dict): raise ValueError("Given global attributes must be of type dict.") self.all_inputs = set(var.io_name for var in self.variables) self.pushed_inputs = set() self._status = fm.ComponentStatus.CREATED def _initialize(self): for var in self.variables: grid = var.info_kwargs.get("grid", None) units = var.info_kwargs.get("units", None) self.inputs.add( io=fm.CallbackInput( name=var.io_name, callback=partial(self._data_changed, var.io_name), time=None, grid=grid, units=units, **var.get_meta(), ) ) self.dataset = Dataset(self._path, "w") self.create_connector(pull_data=[var.io_name for var in self.variables]) def _connect(self, start_time): self.try_connect(start_time) if self.status != fm.ComponentStatus.CONNECTED: return create_nc_framework( self.dataset, self.time_var, start_time, self.time_unit, self.connector.in_infos, self.connector.in_data, self.variables, self.global_attrs, ) current_date = date2num(start_time, self.dataset[self.time_var].units) self.dataset[self.time_var][self.timestamp_counter] = current_date # adding time and var data to the first timestamp for var in self.variables: data = self.connector.in_data[var.io_name].magnitude self.dataset[var.name][self.timestamp_counter, ...] = data self.timestamp_counter += 1 def _validate(self): pass def _update(self): pass def _finalize(self): self.dataset.close() # pylint: disable-next=unused-argument def _data_changed(self, name, caller, time): if self.status in ( fm.ComponentStatus.CONNECTED, fm.ComponentStatus.CONNECTING, fm.ComponentStatus.CONNECTING_IDLE, ): self.last_update = time return if not isinstance(time, datetime): raise ValueError("Time must be of type datetime") if self.status == fm.ComponentStatus.INITIALIZED: self.last_update = time return if time != self.last_update and self.pushed_inputs: raise ValueError("Data not pushed for all inputs") self.last_update = time self.pushed_inputs.add(name) if self.pushed_inputs != self.all_inputs: return current_time = date2num(time, self.dataset[self.time_var].units) self.dataset[self.time_var][self.timestamp_counter] = current_time for var in self.variables: data = self.inputs[var.io_name].pull_data(time).magnitude self.dataset[var.name][self.timestamp_counter, ...] = data self.timestamp_counter += 1 self.pushed_inputs.clear() self.update()