"""
Basic linear and nearest neighbour regridding adapters.
See package `finam-regrid <https://finam.pages.ufz.de/finam-regrid/>`_ for more advanced regridding.
"""
from abc import ABC, abstractmethod
import numpy as np
from pyproj import Transformer, crs
from scipy.interpolate import LinearNDInterpolator, RegularGridInterpolator
from scipy.spatial import KDTree
from ..data import tools as dtools
from ..data.grid_spec import StructuredGrid
from ..errors import FinamMetaDataError
from ..sdk import Adapter
from ..tools.log_helper import ErrorLogger
__all__ = [
"ARegridding",
"RegridNearest",
"RegridLinear",
]
class ARegridding(Adapter, ABC):
"""Abstract regridding class for handling data info"""
def __init__(self, in_grid=None, out_grid=None):
super().__init__()
self.input_grid = in_grid
self.output_grid = out_grid
self.input_meta = None
self.transformer = None
self._is_initialized = False
@abstractmethod
def _update_grid_specs(self):
"""set up interpolator"""
def _get_info(self, info):
request = info.copy_with(grid=self.input_grid)
in_info = self.exchange_info(request)
if self.output_grid is None and info.grid is None:
with ErrorLogger(self.logger):
raise FinamMetaDataError("Missing target grid specification")
if self.input_grid is None and in_info.grid is None:
with ErrorLogger(self.logger):
raise FinamMetaDataError("Missing source grid specification")
if (
self.output_grid is not None
and info.grid is not None
and self.output_grid != info.grid
):
with ErrorLogger(self.logger):
raise FinamMetaDataError(
"Target grid specification is already set, new specs differ"
)
self.input_grid = self.input_grid or in_info.grid
self.output_grid = self.output_grid or info.grid
if self.input_grid.crs is None and self.output_grid.crs is not None:
raise FinamMetaDataError("Input grid has a CRS, but output grid does not")
if self.output_grid.crs is None and self.input_grid.crs is not None:
raise FinamMetaDataError("output grid has a CRS, but input grid does not")
out_info = in_info.copy_with(grid=self.output_grid)
if not self._is_initialized:
self._update_grid_specs()
self._is_initialized = True
self.input_meta = in_info.meta
return out_info
def _do_transform(self, points):
if self.transformer is None:
return points
return np.asarray(list(self.transformer.itransform(points)))
[docs]
class RegridNearest(ARegridding):
"""Regrid data between two grid specifications with nearest neighbour interpolation.
See package `finam-regrid <https://finam.pages.ufz.de/finam-regrid/>`_ for more advanced regridding
using `ESMPy <https://earthsystemmodeling.org/esmpy/>`_.
.. warning::
Does currently not support masked input data. Raises a ``NotImplementedError`` in that case.
Examples
--------
.. testcode:: constructor
import finam as fm
adapter = fm.adapters.RegridNearest()
adapter = fm.adapters.RegridNearest(
in_grid=fm.UniformGrid(dims=(20, 10)),
out_grid=fm.UniformGrid(dims=(10, 5), spacing=(2.0, 2.0, 2.0)),
)
Parameters
----------
in_grid : Grid or None (optional)
Input grid specification. Will be taken from source component if not specified.
out_grid : Grid or None (optional)
Output grid specification. Will be taken from target component if not specified.
tree_options : dict
kwargs for :class:`scipy.spatial.KDTree`
"""
def __init__(self, in_grid=None, out_grid=None, tree_options=None):
super().__init__(in_grid, out_grid)
self.tree_options = tree_options
self.ids = None
def _update_grid_specs(self):
self.transformer = _create_transformer(self.output_grid, self.input_grid)
out_coords = self._do_transform(self.output_grid.data_points)
# generate IDs to select data
kw = self.tree_options or {}
tree = KDTree(self.input_grid.data_points, **kw)
# only store IDs, since they will be constant
self.ids = tree.query(out_coords)[1]
[docs]
def _get_data(self, time, target):
in_data = self.pull_data(time, target)
if dtools.is_masked_array(in_data):
with ErrorLogger(self.logger):
msg = "Regridding is currently not implemented for masked data"
raise NotImplementedError(msg)
res = in_data.reshape(-1, order=self.input_grid.order)[self.ids].reshape(
self.output_grid.data_shape, order=self.output_grid.order
)
return res
[docs]
class RegridLinear(ARegridding):
"""
Regrid data between two grid specifications with linear interpolation.
Uses :class:`scipy.interpolate.RegularGridInterpolator` for structured grids.
For unstructured grids, :class:`scipy.interpolate.LinearNDInterpolator` is used,
which performs triangulation internally.
So the actual topology of the grid is not taken into account.
See package `finam-regrid <https://finam.pages.ufz.de/finam-regrid/>`_ for more advanced regridding
using `ESMPy <https://earthsystemmodeling.org/esmpy/>`_.
.. warning::
Does currently not support masked input data. Raises a ``NotImplementedError`` in that case.
Examples
--------
.. testcode:: constructor
import finam as fm
adapter = fm.adapters.RegridLinear()
adapter = fm.adapters.RegridLinear(
in_grid=fm.UniformGrid(dims=(20, 10)),
out_grid=fm.UniformGrid(dims=(10, 5), spacing=(2.0, 2.0, 2.0)),
)
Parameters
----------
in_grid : Grid or None (optional)
Input grid specification. Will be taken from source component if not specified.
out_grid : Grid or None (optional)
Output grid specification. Will be taken from target component if not specified.
fill_with_nearest : bool
Whether out of bounds points should be filled with the nearest value. Default ``False``.
tree_options : dict
kwargs for :class:`scipy.spatial.KDTree`
"""
def __init__(
self, in_grid=None, out_grid=None, fill_with_nearest=False, tree_options=None
):
super().__init__(in_grid, out_grid)
self.tree_options = tree_options
self.fill_with_nearest = bool(fill_with_nearest)
self.ids = None
self.inter = None
self.out_ids = None
self.fill_ids = None
self.out_coords = None
def _update_grid_specs(self):
self.transformer = _create_transformer(self.output_grid, self.input_grid)
self.out_coords = self._do_transform(self.output_grid.data_points)
if isinstance(self.input_grid, StructuredGrid):
self.inter = RegularGridInterpolator(
points=self.input_grid.data_axes,
values=np.zeros(self.input_grid.data_shape, dtype=np.double),
bounds_error=False,
)
else:
self.inter = LinearNDInterpolator(
points=self.input_grid.data_points,
values=np.zeros(np.prod(self.input_grid.data_shape), dtype=np.double),
)
if self.fill_with_nearest:
# check for outliers once
points = self.out_coords
res = self.inter(points)
self.out_ids = np.isnan(res)
out_points = points[self.out_ids]
kw = self.tree_options or {}
tree = KDTree(self.input_grid.data_points, **kw)
self.fill_ids = tree.query(out_points)[1]
[docs]
def _get_data(self, time, target):
in_data = self.pull_data(time, target)
if dtools.is_masked_array(in_data):
with ErrorLogger(self.logger):
msg = "Regridding is currently not implemented for masked data"
raise NotImplementedError(msg)
if isinstance(self.input_grid, StructuredGrid):
self.inter.values = in_data[0, ...].magnitude
res = self.inter(self.out_coords)
if self.fill_with_nearest:
res[self.out_ids] = self.inter.values.flatten(
order=self.input_grid.order
)[self.fill_ids]
else:
self.inter.values = np.ascontiguousarray(
in_data[0, ...].magnitude.reshape((-1, 1), order=self.input_grid.order),
dtype=np.double,
)
res = self.inter(self.out_coords)
if self.fill_with_nearest:
res[self.out_ids] = self.inter.values[self.fill_ids, 0]
return dtools.quantify(res, dtools.get_units(in_data))
def _create_transformer(input_grid, output_grid):
in_crs = None if input_grid.crs is None else crs.CRS(input_grid.crs)
out_crs = None if output_grid.crs is None else crs.CRS(output_grid.crs)
transformer = (
None
if (in_crs is None and out_crs is None) or in_crs == out_crs
else Transformer.from_crs(in_crs, out_crs)
)
return transformer