Source code for climada_petals.hazard.rf_glofas.cds_glofas_downloader

"""
This file is part of CLIMADA.

Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.

CLIMADA is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free
Software Foundation, version 3.

CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE.  See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.

---

Functions for downloading GloFAS river discharge data from the Copernicus Climate Data
Store (CDS).
"""

from pathlib import Path
import multiprocessing as mp
from copy import deepcopy
from typing import Iterable, Mapping, Any, Optional, List, Union
from itertools import repeat
from datetime import date, datetime
import logging
import hashlib

from cdsapi import Client
from ruamel.yaml import YAML
from ruamel.yaml.compat import StringIO
import pandas as pd
import numpy as np

from climada.util.constants import SYSTEM_DIR

LOGGER = logging.getLogger(__name__)

CDS_DOWNLOAD_DIR = Path(SYSTEM_DIR, "cds-download")

DEFAULT_REQUESTS = {
    "historical": {
        "variable": "river_discharge_in_the_last_24_hours",
        "product_type": "consolidated",
        "system_version": "version_3_1",
        "hydrological_model": "lisflood",
        "format": "grib",
        "hyear": "1979",
        "hmonth": [
            "january",
            "february",
            "march",
            "april",
            "may",
            "june",
            "july",
            "august",
            "september",
            "october",
            "november",
            "december",
        ],
        "hday": [f"{day:02}" for day in range(1, 32)],
    },
    "forecast": {
        "variable": "river_discharge_in_the_last_24_hours",
        "product_type": "ensemble_perturbed_forecasts",
        "system_version": "version_3_1",
        "hydrological_model": "lisflood",
        "format": "grib",
        "year": "2022",
        "month": "08",
        "day": "01",
        "leadtime_hour": (np.arange(1, 31) * 24).astype(str).tolist(),
    },
}
"""Default request keyword arguments to be updated by the user requests"""


def request_to_md5(request: Mapping[Any, Any]) -> str:
    """Hash a string with the MD5 algorithm"""
    yaml = YAML()
    stream = StringIO()
    yaml.dump(request, stream)
    return hashlib.md5(stream.getvalue().encode("utf-8")).hexdigest()


def cleanup_download_dir(
    download_dir: Union[Path, str] = CDS_DOWNLOAD_DIR, dry_run: bool = False
):
    """Delete the contents of the download directory"""
    for filename in Path(download_dir).glob("*"):
        LOGGER.debug("Removing file: %s", filename)
        if not dry_run:
            filename.unlink()
    if dry_run:
        LOGGER.debug("Dry run. No files removed")


def glofas_request_single(
    product: str,
    request: Mapping[str, Any],
    outpath: Union[Path, str],
    use_cache: bool = True,
    client_kw: Optional[Mapping[str, Any]] = None,
) -> Path:
    """Perform a single request for data from the Copernicus data store

    This will skip the download if a file was found at the target location with the same
    request. The request will be stored as YAML file alongside the target file and used
    for comparison. This behavior can be adjusted with the ``use_cache`` parameter.

    Parameters
    ----------
    product : str
        The string identifier of the product in the Copernicus data store
    request : dict
        The download request as dictionary
    outpath : str or Path
        The file path to store the download into (including extension)
    use_cache : bool (optional)
        Skip downloading if the target file exists and the accompanying request file
        contains the same request
    client_kw : dict (optional)
        Dictionary with keyword arguments for the ``cdsapi.Client`` used for downloading
    """
    # Define output file
    outpath = Path(outpath)
    request_hash = request_to_md5(request)
    outfile = outpath / (
        datetime.today().strftime("%y%m%d-%H%M%S") + f"-{request_hash}"
    )
    extension = ".grib" if request["format"] == "grib" else ".nc"
    outfile = outfile.with_suffix(extension)

    # Check if request was issued before
    if use_cache:
        for filename in outpath.glob(f"*{extension}"):
            if request_hash == filename.stem.split("-")[-1]:
                LOGGER.info(
                    "Skipping request for file '%s' because it already exists", outfile
                )
                return filename.resolve()

    # Set up client and retrieve data
    LOGGER.info("Downloading file: %s", outfile)
    client_kw_default = dict(quiet=False, debug=False)
    if client_kw is not None:
        client_kw_default.update(client_kw)
    client = Client(**client_kw_default)
    client.retrieve(product, request, outfile)

    # Dump request
    yaml = YAML()
    yaml.dump(request, outfile.with_suffix(".yml"))

    # Return file path
    return outfile.resolve()


def glofas_request_multiple(
    product: str,
    requests: Iterable[Mapping[str, str]],
    outdir: Union[Path, str],
    num_proc: int,
    use_cache: bool,
    client_kw: Optional[Mapping[str, Any]] = None,
) -> List[Path]:
    """Execute multiple requests to the Copernicus data store in parallel"""
    with mp.Pool(num_proc) as pool:
        return pool.starmap(
            glofas_request_single,
            zip(
                repeat(product),
                requests,
                repeat(outdir),
                repeat(use_cache),
                repeat(client_kw),
            ),
        )


[docs] def glofas_request( product: str, date_from: str, date_to: Optional[str], output_dir: Union[Path, str], num_proc: int = 1, use_cache: bool = True, request_kw: Optional[Mapping[str, str]] = None, client_kw: Optional[Mapping[str, Any]] = None, ) -> List[Path]: """Request download of GloFAS data products from the Copernicus Data Store (CDS) Uses the Copernicus Data Store API (cdsapi) Python module. The interpretation of the ``date`` parameters and the grouping of the downloaded data depends on the type of ``product`` requested. Available ``products``: - ``historical``: Historical reanalysis discharge data. ``date_from`` and ``date_to`` are interpreted as integer years. Data for each year is placed into a single file. - ``forecast``: Forecast discharge data. ``date_from`` and ``date_to`` are interpreted as ISO date format strings. Data for each day is placed into a single file. Notes ----- Downloading data from the CDS requires authentication via a user key which is granted to each user upon registration. Do the following **before calling this function**: - Create an account at the Copernicus Data Store website: https://cds.climate.copernicus.eu/ - Follow the instructions to install the CDS API key: https://cds.climate.copernicus.eu/api-how-to#install-the-cds-api-key Parameters ---------- product : str The indentifier for the CMS product to download. See below for available options. date_from : str First date to download data for. Interpretation varies based on ``product``. date_to : str or None Last date to download data for. Interpretation varies based on ``product``. If ``None``, or the same date as ``date_from``, only download data for ``date_from`` output_dir : Path Output directory for the downloaded data num_proc : int Number of processes used for parallel requests use_cache : bool (optional) Skip downloading if the target file exists and the accompanying request file contains the same request request_kw : dict(str: str) Dictionary to update the default request for the given product client_kw : dict (optional) Dictionary with keyword arguments for the ``cdsapi.Client`` used for downloading Returns ------- list of Path Paths of the downloaded files """ # Check if product exists try: default_request = deepcopy(DEFAULT_REQUESTS[product]) except KeyError as err: raise NotImplementedError( f"product = {product}. Choose from {list(DEFAULT_REQUESTS.keys())}" ) from err # Update with request_kw if request_kw is not None: default_request.update(**request_kw) if product == "historical": # Interpret dates as years only year_from = int(date_from) year_to = int(date_to) if date_to is not None else year_from # List up all requests requests = [ {"hyear": str(year)} for year in list(range(year_from, year_to + 1)) ] elif product == "forecast": # Download single date if 'date_to' is 'None' date_from: date = date.fromisoformat(date_from) date_to: date = ( date.fromisoformat(date_to) if date_to is not None else date_from ) # List up all requests dates = pd.date_range(date_from, date_to, freq="D", inclusive="both").date requests = [ {"year": str(d.year), "month": f"{d.month:02d}", "day": f"{d.day:02d}"} for d in dates ] else: NotImplementedError("Unknown product: %s" % product) requests = [{**default_request, **req} for req in requests] glofas_product = f"cems-glofas-{product}" # Execute request return glofas_request_multiple( glofas_product, requests, output_dir, num_proc, use_cache, client_kw )