Source code for climada_petals.hazard.rf_glofas.rf_glofas

"""
This file is part of CLIMADA.

Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.

CLIMADA is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free
Software Foundation, version 3.

CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE.  See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.

---

User Interface for GloFAS River Flood Module
"""

import logging
from contextlib import contextmanager
from typing import Union

import xarray as xr
import pandas as pd
from dask.distributed import Client

from climada.util.constants import SYSTEM_DIR
from climada.hazard import Hazard

LOGGER = logging.getLogger(__name__)

DEFAULT_DATA_DIR = SYSTEM_DIR / "river-flood-computation"


@contextmanager
def dask_client(n_workers, threads_per_worker, memory_limit, *args, **kwargs):
    """Create a context with a ``dask.distributed.Client``.

    This is a lightweight wrapper and intended to expose only the most important
    parameters to end users.

    Parameters
    ----------
    n_workers : int
        Number of parallel processes to launch.
    threads_per_worker : int
        Compute threads launched by each worker.
    memory_limit : str
        Memory limit for each process. Example: 4 GB can be expressed as ``4000M``
        or ``4G``.
    args, kwargs
        Additional (keyword) arguments passed to the ``dask.distributed.Client``
        constructor.

    Example
    -------
    >>> with dask_client(n_workers=2, threads_per_worker=2, memory_limit="4G"):
    ...     xr.open_dataset("data.nc", chunks="auto").median()
    """
    # Yield the client created with the given arguments, and close it afterwards
    LOGGER.info("Creating dask.distributed.Client")
    client = Client(
        *args,
        n_workers=n_workers,
        threads_per_worker=threads_per_worker,
        memory_limit=memory_limit,
        **kwargs,
    )
    try:
        yield client
    finally:
        LOGGER.info("Closing dask.distributed.Client")
        client.close()
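
# The following helper is an illustrative sketch, not part of the module's
# public API: it shows how ``dask_client`` is typically wrapped around a
# chunked xarray computation. The file name "discharge.nc" and the variable
# and dimension names are assumptions made for the example.
def _example_dask_client():
    """Run an out-of-core ensemble median inside a temporary dask cluster."""
    with dask_client(n_workers=2, threads_per_worker=2, memory_limit="4G"):
        # ``chunks="auto"`` makes xarray return dask arrays, so the median is
        # evaluated lazily on the cluster; ``compute()`` gathers the result
        # into memory before the client is closed.
        with xr.open_dataset("discharge.nc", chunks="auto") as dset:
            return dset["discharge"].median("number").compute()
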
def hazard_series_from_dataset(
    data: xr.Dataset, intensity: str, event_dim: str
) -> Union[pd.Series, Hazard]:
    """Create a series of Hazard objects from a multi-dimensional dataset

    The input flood data is usually multi-dimensional. For example, you might have
    downloaded ensemble data over an extended period of time. Therefore, this
    function returns a ``pandas.Series``. Each entry of the series is a ``Hazard``
    object whose events have the same coordinates in this multi-dimensional space
    except the one given by ``event_dim``. For example, if your data space has the
    dimensions ``time``, ``lead_time`` and ``number``, and you choose
    ``event_dim="number"``, then the index of the series will be a ``MultiIndex``
    from ``time`` and ``lead_time``, and a single hazard object will contain all
    events along the ``number`` axis for a given MultiIndex.

    Parameters
    ----------
    data : xarray.Dataset
        Data to load a hazard series from.
    intensity : str
        Name of the dataset variable to read as hazard intensity.
    event_dim : str
        Name of the dimension to be used as event dimension in the hazards. All
        other dimension names except the dimensions for longitude and latitude
        will make up the hierarchy of the ``MultiIndex`` of the resulting series.

    Returns
    -------
    pandas.Series
        Series of ``Hazard`` objects with events along ``event_dim`` and with a
        ``MultiIndex`` of the remaining dimensions.

    Tip
    ---
    This function must transpose the underlying data in the dataset to conveniently
    build ``Hazard`` objects. To ensure that this is an efficient operation, avoid
    plugging the return value of
    :py:meth:`~climada_petals.hazard.rf_glofas.river_flood_computation.RiverFloodInundation.compute`
    directly into this function, especially for **large data**. Instead, save the
    data first using :py:func:`~climada_petals.hazard.rf_glofas.transform_ops.save_file`,
    then re-open the data with xarray and call this function on it.

    Examples
    --------
    Execute the default pipeline and retrieve the Hazard series

    >>> import xarray as xr
    >>> dset = xr.open_dataset("flood.nc")
    >>> sorted(list(dset.dims.keys()))
    ["date", "latitude", "longitude", "number", "sample"]

    >>> from climada_petals.hazard.rf_glofas import hazard_series_from_dataset
    >>> with xr.open_dataset("flood.nc") as dset:
    ...     hazard_series_from_dataset(dset, "flood_depth_flopros", "number")
    date        sample
    2022-08-10  0         <climada.hazard.base.Hazard ...
                1         <climada.hazard.base.Hazard ...
    2022-08-11  0         <climada.hazard.base.Hazard ...
                1         <climada.hazard.base.Hazard ...
    Length: 4, dtype: object
    """

    def create_hazard(dataset: xr.Dataset) -> Hazard:
        """Create hazard from a GloFASRiverFlood hazard dataset"""
        return Hazard.from_xarray_raster(
            dataset,
            hazard_type="RF",
            intensity=intensity,
            intensity_unit="m",
            coordinate_vars=dict(event=event_dim),
            data_vars=dict(date="time"),
            rechunk=True,
        )

    # Iterate over all dimensions that are not lon, lat, or 'event_dim'
    iter_dims = list(set(data.dims) - {"longitude", "latitude", event_dim})
    if iter_dims:
        index = pd.MultiIndex.from_product(
            [data[dim].values for dim in iter_dims], names=iter_dims
        )
        hazards = [
            create_hazard(data.sel(dict(zip(iter_dims, idx))))
            for idx in index.to_flat_index()
        ]
        return pd.Series(hazards, index=index)

    return create_hazard(data)
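
# A minimal workflow sketch for the "Tip" above (hypothetical helper, not part
# of the public API): save computed flood data to disk, re-open it with xarray,
# and only then build the hazard series. The ``save_file`` import follows the
# docstring's cross-reference; its exact call signature as well as the file,
# variable, and dimension names ("flood.nc", "flood_depth_flopros", "number")
# are assumptions made for the example.
def _example_hazard_series(flood_data: xr.Dataset) -> pd.Series:
    """Build a hazard series from ``flood_data`` via an on-disk round trip."""
    from climada_petals.hazard.rf_glofas.transform_ops import save_file

    # Writing first and re-opening with on-disk chunks keeps the transposition
    # performed by hazard_series_from_dataset cheap for large data.
    save_file(flood_data, "flood.nc")  # assumed (data, path) call signature
    with xr.open_dataset("flood.nc", chunks="auto") as dset:
        return hazard_series_from_dataset(dset, "flood_depth_flopros", "number")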