Source code for climada_petals.hazard.rf_glofas.river_flood_computation

This file is part of CLIMADA.

Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.

CLIMADA is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free
Software Foundation, version 3.

CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE.  See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.


Top-level computation class for river flood inundation
from pathlib import Path
from tempfile import TemporaryDirectory
import logging
from typing import Iterable, Union, Optional, Mapping, Any, Callable, List
from contextlib import contextmanager
from datetime import datetime
from collections import namedtuple

import xarray as xr
import geopandas as gpd
import numpy as np
import pandas as pd

from .rf_glofas import DEFAULT_DATA_DIR, dask_client
from .transform_ops import (

LOGGER = logging.getLogger(__name__)

@contextmanager
def _maybe_open_dataarray(
    arr: Optional[xr.DataArray],
    filename: Union[str, Path],
    **open_dataarray_kwargs,
):
    """Create a context for opening an xarray file or yielding the input array

    If ``arr`` is ``None``, this will open the file with ``xr.open_dataarray`` and
    close it again when the context exits. Otherwise, ``arr`` is handed through
    unchanged and nothing is opened or closed.

    Parameters
    ----------
    arr : xr.DataArray or None
        The input array. If this is *not* ``None`` it is simply returned.
    filename : Path or str
        The file to open as data array if ``arr`` is ``None``.
    open_dataarray_kwargs
        Keyword arguments passed to ``xr.open_dataarray``.

    Yields
    ------
    xr.DataArray
        Either ``arr`` or the array at ``filename``. If a file was opened, it will be
        closed when this context manager closes.
    """
    if arr is not None:
        # The caller owns the array, so it must not be closed here.
        yield arr
        return

    LOGGER.debug("Opening file: %s", filename)
    opened = xr.open_dataarray(filename, **open_dataarray_kwargs)
    try:
        yield opened
    finally:
        # Close the file even if the body of the ``with`` statement raised.
        LOGGER.debug("Closing file: %s", filename)
        opened.close()

# Plain tuple type backing RiverFloodCachePaths. The field names mirror the
# keyword arguments used in RiverFloodCachePaths.from_dir.
_RiverFloodCachePaths = namedtuple(
    "RiverFloodCachePaths",
    [
        "discharge",
        "return_period",
        "return_period_regrid",
        "return_period_regrid_protect",
    ],
)


class RiverFloodCachePaths(_RiverFloodCachePaths):
    """Container for storing paths to caches for :py:class:`RiverFloodInundation`

    Depending on the state of the corresponding :py:class:`RiverFloodInundation`
    instance, files might be present or not. Please check this explicitly before
    accessing them.

    Attributes
    ----------
    discharge : pathlib.Path
        Path to the discharge data cache.
    return_period : pathlib.Path
        Path to the return period data cache.
    return_period_regrid : pathlib.Path
        Path to the regridded return period data cache.
    return_period_regrid_protect : pathlib.Path
        Path to the regridded return period data cache, where the return period was
        restricted by the protection limits.
    """

    @classmethod
    def from_dir(cls, cache_dir: Path):
        """Set default paths from a cache directory

        Parameters
        ----------
        cache_dir : pathlib.Path
            Directory in which the cache files are (or will be) located.

        Returns
        -------
        RiverFloodCachePaths
            One distinct file path inside ``cache_dir`` for each cache entry.
        """
        # Every entry needs its own file name. Joining an empty string would make
        # all four entries point at ``cache_dir`` itself.
        return cls(
            discharge=cache_dir / "discharge.nc",
            return_period=cache_dir / "return-period.nc",
            return_period_regrid=cache_dir / "return-period-regrid.nc",
            return_period_regrid_protect=cache_dir / "return-period-regrid-protect.nc",
        )
class RiverFloodInundation:
    """Class for computing river flood inundations

    Attributes
    ----------
    store_intermediates : bool
        If intermediate results are stored in the respective :py:attr:`cache_paths`
    cache_paths : RiverFloodCachePaths
        Paths pointing to potential intermediate results stored in a cache directory.
    flood_maps : xarray.DataArray
        Flood inundation on lat/lon grid for specific return periods.
    gumbel_fits : xarray.Dataset
        Gumbel parameters resulting from extreme value analysis of historical
        discharge data.
    flopros : geopandas.GeoDataFrame
        Spatially explicit information on flood protection levels.
    regridder : xesmf.Regridder
        Storage for re-using the XESMF regridder in case the computation is repeated
        on the same grid. This reduces the runtime of subsequent computations.
    """

    def __init__(
        self,
        store_intermediates: bool = True,
        data_dir: Union[Path, str] = DEFAULT_DATA_DIR,
        cache_dir: Union[Path, str] = DEFAULT_DATA_DIR / ".cache",
    ):
        """Initialize the instance

        Parameters
        ----------
        store_intermediates : bool, optional
            Whether the data of each computation step should be stored in the cache
            directory. This is recommended especially for larger data. Only set this
            to ``False`` if the data operated on is very small (e.g., for a small
            country or region). Defaults to ``True``.
        data_dir : Path or str, optional
            The directory where flood maps, Gumbel distribution parameters and the
            FLOPROS database are located. Defaults to
            ``<climada>/data/river-flood-computation``, where ``<climada>`` is the
            Climada data directory indicated by ``local_data : system`` in the
            ``climada.conf``. This directory must exist.
        cache_dir : Path or str, optional
            The top-level cache directory where computation caches of this instance
            will be placed. Defaults to
            ``<climada>/data/river-flood-computation/.cache`` (see above for
            configuration). This directory (and all its parents) will be created.

        Raises
        ------
        FileNotFoundError
            If ``data_dir`` is not an existing directory.
        """
        # Set this first so that __del__ works even if anything below raises.
        self._tempdir = None

        data_dir = Path(data_dir)
        if not data_dir.is_dir():
            raise FileNotFoundError(f"'data_dir' does not exist: {data_dir}")

        self.store_intermediates = store_intermediates

        # NOTE(review): the file names of the flood maps and the Gumbel fits were
        # empty, which would try to open ``data_dir`` itself. The names below are
        # assumed; confirm them against the data setup of this module.
        self.flood_maps = xr.open_dataarray(
            data_dir / "flood-maps.nc",
            chunks=dict(return_period=-1, latitude="auto", longitude="auto"),
        )
        self.gumbel_fits = xr.open_dataset(data_dir / "gumbel-fit.nc", chunks="auto")
        self.flopros = gpd.read_file(data_dir / "FLOPROS_shp_V1/FLOPROS_shp_V1.shp")
        self.regridder = None

        self._create_tempdir(cache_dir=cache_dir)

    def __del__(self):
        """Upon deletion, make sure the temporary directory is cleaned up"""
        # NOTE: Deletion might also happen when __init__ did not succeed/conclude!
        # ``getattr`` additionally covers instances on which __init__ never ran.
        tempdir = getattr(self, "_tempdir", None)
        if tempdir is not None:
            tempdir.cleanup()

    def _create_tempdir(self, cache_dir: Union[Path, str]):
        """Create a temporary directory inside the top-level cache dir

        Parameters
        ----------
        cache_dir : Path or str
            The directory where caches are placed. Each cache is a temporary
            subdirectory of ``cache_dir``. If the path does not exist, it will be
            created, including all parent directories.
        """
        # Create cache directory
        cache_dir = Path(cache_dir)
        cache_dir.mkdir(parents=True, exist_ok=True)

        # Create temporary directory for cache. The time stamp prefix makes it
        # easier to tell the caches of different instances apart.
        self._tempdir = TemporaryDirectory(
            dir=cache_dir, prefix=datetime.today().strftime("%y%m%d-%H%M%S-")
        )

        # Define cache paths
        self.cache_paths = RiverFloodCachePaths.from_dir(Path(self._tempdir.name))

    def clear_cache(self):
        """Clear the cache of this instance

        This will delete the temporary cache directory and create a new one by
        calling :py:meth:`_create_tempdir`.
        """
        # The temporary directory lives directly inside the top-level cache dir.
        cache_dir = Path(self._tempdir.name).parent
        self._tempdir.cleanup()
        self._create_tempdir(cache_dir=cache_dir)

    def compute(
        self,
        discharge: Optional[xr.DataArray] = None,
        apply_protection: Union[bool, str] = "both",
        resample_kws: Optional[Mapping[str, Any]] = None,
        regrid_kws: Optional[Mapping[str, Any]] = None,
    ):
        """Compute river flood inundation from downloaded river discharge

        After downloading discharge data, this will execute the pipeline for
        computing river flood inundation. This pipeline has the following steps:

        - Compute the equivalent return period, either with :py:meth:`return_period`,
          or :py:meth:`return_period_resample`.
        - Regrid the return period data onto the grid of the flood hazard maps with
          :py:meth:`regrid`.
        - *Optional*: Apply the protection layer with :py:meth:`apply_protection`.
        - Compute the flood depth by interpolating flood hazard maps with
          :py:meth:`flood_depth`.

        Resampling, regridding, and the application of protection information are
        controlled via the parameters of this method.

        Parameters
        ----------
        discharge : xr.DataArray or None (optional)
            The discharge data to compute flood depths for. If ``None``, the cached
            discharge will be used. Defaults to ``None``.
        apply_protection : bool or "both" (optional)
            If the stored protection layer should be considered when computing the
            flood depth. If ``"both"``, this method will return a dataset with two
            flood depth arrays. Defaults to ``both``.
        resample_kws : Mapping (str, Any) or None (optional)
            Keyword arguments for :py:meth:`return_period_resample`. If ``None``,
            this method will call :py:meth:`return_period`. Otherwise, it will call
            :py:meth:`return_period_resample` and pass this parameter as keyword
            arguments. Defaults to ``None``.
        regrid_kws : Mapping (str, Any) or None (optional)
            Keyword arguments for :py:meth:`regrid`. Defaults to ``None``.

        Returns
        -------
        xr.Dataset
            Dataset containing the flood data with the same dimensions as the input
            discharge data. Depending on the choice of ``apply_protection``, this
            will contain one or two DataArrays: ``flood_depth`` (no protection)
            and/or ``flood_depth_flopros`` (with protection).

        Raises
        ------
        RuntimeError
            If ``discharge`` is ``None``, but no discharge data is cached.
        """
        if discharge is None and not self.cache_paths.discharge.is_file():
            raise RuntimeError(
                "No discharge data. Please download a discharge with this object "
                "first or supply the data as argument to this function"
            )

        # Compute return period
        if resample_kws is None:
            self.return_period(discharge=discharge)
        else:
            self.return_period_resample(discharge=discharge, **resample_kws)

        # Regrid
        regrid_kws = regrid_kws if regrid_kws is not None else {}
        self.regrid(**regrid_kws)

        # Compute flood depth without protection
        ds_flood = xr.Dataset()
        if not apply_protection or apply_protection == "both":
            # Hand over the unprotected data explicitly: without an argument,
            # flood_depth() prefers the protected cache file whenever one exists,
            # e.g., as a leftover of an earlier call.
            with _maybe_open_dataarray(
                None, self.cache_paths.return_period_regrid, chunks="auto"
            ) as return_period_regrid:
                ds_flood["flood_depth"] = self.flood_depth(return_period_regrid)

        # Compute flood depth with protection (only if requested)
        if apply_protection:
            self.apply_protection()
            ds_flood["flood_depth_flopros"] = self.flood_depth()

        # Return data
        return ds_flood

    def download_forecast(
        self,
        countries: Union[str, List[str]],
        forecast_date: Union[str, np.datetime64, datetime, pd.Timestamp],
        lead_time_days: int = 10,
        preprocess: Optional[Callable] = None,
        **download_glofas_discharge_kwargs,
    ) -> xr.DataArray:
        """Download GloFAS discharge ensemble forecasts

        If :py:attr:`store_intermediates` is true, the returned data is also stored
        in :py:attr:`cache_paths`.

        Parameters
        ----------
        countries : str or list of str
            Names or codes of countries to download data for. The downloaded data
            will be a lat/lon grid covering all specified countries.
        forecast_date
            The date at which the forecast was issued. Can be defined any way that
            is compatible with ``pandas.Timestamp``, see the pandas documentation.
        lead_time_days : int, optional
            How many days of lead time to include in the downloaded forecast.
            Maximum is 30. Defaults to 10, in which case the 10 days following the
            ``forecast_date`` are included in the download.
        preprocess
            Callable for preprocessing data while loading it. It is passed on to
            :py:func:`climada_petals.hazard.rf_glofas.transform_ops.download_glofas_discharge`.
        download_glofas_discharge_kwargs
            Additional arguments to
            :py:func:`climada_petals.hazard.rf_glofas.transform_ops.download_glofas_discharge`

        Returns
        -------
        forecast : xr.DataArray
            Downloaded forecast as DataArray after preprocessing

        See Also
        --------
        :py:func:`climada_petals.hazard.rf_glofas.transform_ops.download_glofas_discharge`
        """
        # One lead time per day, expressed in hours and as strings: "24", "48", ...
        leadtime_hour = list(
            map(str, (np.arange(1, lead_time_days + 1, dtype=np.int_) * 24).flat)
        )
        forecast = download_glofas_discharge(
            product="forecast",
            date_from=pd.Timestamp(forecast_date).date().isoformat(),
            date_to=None,
            countries=countries,
            preprocess=preprocess,
            leadtime_hour=leadtime_hour,
            **download_glofas_discharge_kwargs,
        )
        if self.store_intermediates:
            save_file(forecast, self.cache_paths.discharge, zlib=False)
        return forecast

    def download_reanalysis(
        self,
        countries: Union[str, Iterable[str]],
        year: int,
        preprocess: Optional[Callable] = None,
        **download_glofas_discharge_kwargs,
    ):
        """Download GloFAS discharge historical data

        If :py:attr:`store_intermediates` is true, the returned data is also stored
        in :py:attr:`cache_paths`.

        Parameters
        ----------
        countries : str or list of str
            Names or codes of countries to download data for. The downloaded data
            will be a lat/lon grid covering all specified countries.
        year : int
            The year to download data for.
        preprocess
            Callable for preprocessing data while loading it. It is passed on to
            :py:func:`climada_petals.hazard.rf_glofas.transform_ops.download_glofas_discharge`.
        download_glofas_discharge_kwargs
            Additional arguments to
            :py:func:`climada_petals.hazard.rf_glofas.transform_ops.download_glofas_discharge`

        Returns
        -------
        reanalysis : xr.DataArray
            Downloaded reanalysis as DataArray after preprocessing

        See Also
        --------
        :py:func:`climada_petals.hazard.rf_glofas.transform_ops.download_glofas_discharge`
        """
        reanalysis = download_glofas_discharge(
            product="historical",
            date_from=str(year),
            date_to=None,
            countries=countries,
            preprocess=preprocess,
            **download_glofas_discharge_kwargs,
        )
        if self.store_intermediates:
            save_file(reanalysis, self.cache_paths.discharge, zlib=False)
        return reanalysis

    def return_period(
        self,
        discharge: Optional[xr.DataArray] = None,
    ) -> xr.DataArray:
        """Compute the return period for a given discharge

        If no discharge data is given as parameter, the discharge cache will be
        accessed.

        If :py:attr:`store_intermediates` is true, the returned data is also stored
        in :py:attr:`cache_paths`.

        Parameters
        ----------
        discharge : xr.DataArray, optional
            The discharge data to operate on. Defaults to ``None``, which indicates
            that data should be loaded from the cache

        Returns
        -------
        r_period : xr.DataArray
            Return period for each location of the input discharge.

        See Also
        --------
        :py:func:`climada_petals.hazard.rf_glofas.transform_ops.return_period`
        """
        with _maybe_open_dataarray(
            discharge, self.cache_paths.discharge, chunks="auto"
        ) as discharge_data:
            r_period = return_period(
                discharge_data, self.gumbel_fits["loc"], self.gumbel_fits["scale"]
            )
            if self.store_intermediates:
                save_file(r_period, self.cache_paths.return_period)
            return r_period

    def return_period_resample(
        self,
        num_bootstrap_samples: int,
        discharge: Optional[xr.DataArray] = None,
        fit_method: str = "MM",
        num_workers: int = 1,
        memory_per_worker: str = "2G",
    ):
        """Compute the return period for a given discharge using bootstrap sampling.

        For each input discharge value, this creates an ensemble of return periods
        by employing bootstrap sampling. The ensemble size is controlled with
        ``num_bootstrap_samples``.

        If :py:attr:`store_intermediates` is true, the returned data is also stored
        in :py:attr:`cache_paths`.

        Parameters
        ----------
        num_bootstrap_samples : int
            Number of bootstrap samples to compute for each discharge value.
        discharge : xr.DataArray, optional
            The discharge data to operate on. Defaults to ``None``, which indicates
            that data should be loaded from the cache.
        fit_method : str, optional
            Method for fitting data to bootstrapped samples.

            * ``"MM"``: Method of Moments
            * ``"MLE"``: Maximum Likelihood Estimation
        num_workers : int, optional
            Number of parallel processes to use when computing the samples.
        memory_per_worker : str, optional
            Memory to allocate for each process.

        Returns
        -------
        r_period : xr.DataArray
            Return period samples for each location of the input discharge.

        See Also
        --------
        :py:func:`climada_petals.hazard.rf_glofas.transform_ops.return_period_resample`
        """
        # Use smaller chunks so function does not suffocate
        with _maybe_open_dataarray(
            discharge,
            self.cache_paths.discharge,
            chunks=dict(longitude=50, latitude=50),
        ) as discharge_data:
            kwargs = dict(
                discharge=discharge_data,
                gev_loc=self.gumbel_fits["loc"],
                gev_scale=self.gumbel_fits["scale"],
                gev_samples=self.gumbel_fits["samples"],
                bootstrap_samples=num_bootstrap_samples,
                fit_method=fit_method,
            )

            def work():
                """Compute the samples and store them if requested"""
                r_period = return_period_resample(**kwargs)
                if self.store_intermediates:
                    save_file(r_period, self.cache_paths.return_period, zlib=False)
                return r_period

            # Only spin up a dask client if parallel processing was requested
            if num_workers > 1:
                with dask_client(num_workers, 1, memory_per_worker):
                    return work()
            else:
                return work()

    def regrid(
        self,
        r_period: Optional[xr.DataArray] = None,
        method: str = "bilinear",
        reuse_regridder: bool = False,
    ):
        """Regrid the return period data onto the flood hazard map grid.

        This computes the regridding matrix for the given coordinates and then
        performs the actual regridding. The matrix is stored in
        :py:attr:`regridder`. If another regridding is performed on the same grid
        (but possibly different data), the regridder can be reused to save time. To
        control that, set ``reuse_regridder=True``.

        If :py:attr:`store_intermediates` is true, the returned data is also stored
        in :py:attr:`cache_paths`.

        Parameters
        ----------
        r_period : xr.DataArray, optional
            The return period data to regrid. Defaults to ``None``, which indicates
            that data should be loaded from the cache.
        method : str, optional
            Interpolation method of the return period data. Defaults to
            ``"bilinear"``. See the documentation of the regridding library for
            the available methods.
        reuse_regridder : bool, optional
            Reuse the regridder stored if one is stored. Defaults to ``False``,
            which means that a new regridder is always built when calling this
            function. If ``True``, and no regridder is stored, it will be built
            nonetheless.

        Returns
        -------
        return_period_regrid : xr.DataArray
            The regridded return period data.

        See Also
        --------
        :py:func:`climada_petals.hazard.rf_glofas.transform_ops.regrid`
        """
        # NOTE: Chunks must be small because resulting array is huge!
        with _maybe_open_dataarray(
            r_period,
            self.cache_paths.return_period,
            chunks=dict(longitude=-1, latitude=-1, time=1, sample=1, number=1, step=1),
        ) as return_period_data:
            # Dropping the stored regridder forces a new one to be built
            if not reuse_regridder:
                self.regridder = None

            return_period_regrid, self.regridder = regrid(
                return_period_data,
                self.flood_maps,
                method=method,
                regridder=self.regridder,
                return_regridder=True,
            )

            if self.store_intermediates:
                save_file(
                    return_period_regrid,
                    self.cache_paths.return_period_regrid,
                    zlib=False,
                )
            return return_period_regrid

    def apply_protection(self, return_period_regrid: Optional[xr.DataArray] = None):
        """Limit the return period data by applying FLOPROS protection levels.

        This sets each return period value where the local FLOPROS protection level
        is not exceeded to NaN and returns the result. Protection levels are read
        from :py:attr:`flopros`.

        If :py:attr:`store_intermediates` is true, the returned data is also stored
        in :py:attr:`cache_paths`.

        Parameters
        ----------
        return_period_regrid : xr.DataArray, optional
            The regridded return period data to apply the protection to. Defaults
            to ``None``, which indicates that data should be loaded from the cache.

        Returns
        -------
        return_period_regrid_protect : xr.DataArray
            The regridded return period where each value that does not reach the
            protection limit is set to NaN.

        See Also
        --------
        :py:func:`climada_petals.hazard.rf_glofas.transform_ops.apply_flopros`
        """
        with _maybe_open_dataarray(
            return_period_regrid, self.cache_paths.return_period_regrid, chunks="auto"
        ) as return_period_regrid_data:
            return_period_regrid_protect = apply_flopros(
                self.flopros, return_period_regrid_data
            )

            if self.store_intermediates:
                save_file(
                    return_period_regrid_protect,
                    self.cache_paths.return_period_regrid_protect,
                    zlib=False,
                )
            return return_period_regrid_protect

    def flood_depth(self, return_period_regrid: Optional[xr.DataArray] = None):
        """Compute the flood depth from regridded return period data.

        Interpolate the flood hazard maps stored in :py:attr:`flood_maps` in the
        return period dimension at every location to compute the flood footprint.

        Note
        ----
        Even if :py:attr:`store_intermediates` is true, the returned data is **not**
        stored automatically! Use
        :py:func:`climada_petals.hazard.rf_glofas.transform_ops.save_file` to store
        the data yourself.

        Parameters
        ----------
        return_period_regrid : xr.DataArray, optional
            The regridded return period data to use for computing the flood
            footprint. Defaults to ``None`` which indicates that data should be
            loaded from the cache. If
            :py:attr:`RiverFloodCachePaths.return_period_regrid_protect` exists,
            that data is used. Otherwise, the "unprotected" data
            :py:attr:`RiverFloodCachePaths.return_period_regrid` is loaded.

        Returns
        -------
        inundation : xr.DataArray
            The flood inundation at every location of the flood hazard maps grid.
        """
        # When loading from the cache, prefer the protected data if it is available
        file_path = self.cache_paths.return_period_regrid
        if (
            return_period_regrid is None
            and self.cache_paths.return_period_regrid_protect.is_file()
        ):
            file_path = self.cache_paths.return_period_regrid_protect

        with _maybe_open_dataarray(
            return_period_regrid, file_path, chunks="auto"
        ) as return_period_regrid_data:
            inundation = flood_depth(return_period_regrid_data, self.flood_maps)
            return inundation