Source code for climada_petals.hazard.low_flow

"""
This file is part of CLIMADA.

Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.

CLIMADA is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free
Software Foundation, version 3.

CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE.  See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.

---

Define LowFlow (LF) class.
WORK IN PROGRESS
"""

__all__ = ['LowFlow']

import logging
import copy
import datetime as dt
from pathlib import Path
import cftime
import geopandas as gpd
import numpy as np
import numba
import pandas as pd
import xarray as xr


from sklearn.cluster import DBSCAN
from sklearn.neighbors import BallTree
from shapely.geometry import Point
from scipy import sparse

from climada.hazard.base import Hazard
from climada.hazard.centroids import Centroids
import climada.util.coordinates as u_coord

LOGGER = logging.getLogger(__name__)

HAZ_TYPE = 'LF'
"""Hazard type acronym for Low Flow / Water Scarcity"""

FN_STR_VAR = 'co2_dis_global_daily'  # FileName STRing depending on VARiable
"""constant part of discharge output file (according to ISIMIP filenaming)"""

YEARCHUNKS = dict()
"""list of year chunks: multiple files are combined"""

YEARCHUNKS['historical'] = list()
"""historical year chunks ISIMIP 2b"""
for i in np.arange(1860, 2000, 10):
    YEARCHUNKS['historical'].append(f'{i+1}_{i+10}')
YEARCHUNKS['historical'].append('2001_2005')

YEARCHUNKS['hist'] = list()
"""historical year chunks ISIMIP 2a"""
for i in np.arange(1970, 2010, 10):
    YEARCHUNKS['hist'].append(f'{i+1}_{i+10}')

YEARCHUNKS['rcp26'] = ['2006_2010']
for i in np.arange(2010, 2090, 10):
    YEARCHUNKS['rcp26'].append(f'{i+1}_{i+10}')
YEARCHUNKS['rcp26'].append('2091_2099')
YEARCHUNKS['rcp60'] = YEARCHUNKS['rcp26']
"""future year chunks"""

REFERENCE_YEARRANGE = (1971, 2005)
"""default year range used to compute threshold (base line reference)"""

TARGET_YEARRANGE = (2001, 2005)
"""arbitrary default, i.e. default year range of historical low flow hazard 2001-2005"""

BBOX = (-180, -85, 180, 85)
"""default quasi-global geographical bounding box: [lon_min, lat_min, lon_max, lat_max]"""

# reducing these two parameters decreases memory load but increases computation time:
BBOX_WIDTH = 75
"""default width and height of geographical bounding boxes for loop in degree lat/lon.
i.e., the bounding box is split into square boxes with maximum size BBOX_WIDTH*BBOX_WIDTH
(avoid memory usage spike)"""
INTENSITY_STEP = 300
"""max. number of events to be written to hazard.intensity matrix at once
(avoid memory usage spike)"""
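
# Example of an ISIMIP-style input file name as assembled in _read_and_combine_nc
# below from gh_model, cl_model, bias correction, scenario, soc, FN_STR_VAR and
# year chunk (illustrative placeholder values, not shipped with CLIMADA):
#     h08_gfdl-esm2m_ewembi_historical_histsoc_co2_dis_global_daily_2001_2005.nc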


class LowFlow(Hazard):
    """Contains river low flow events (surface water scarcity).

    The intensity of the hazard is the number of days below a threshold
    (defined as a percentile in the reference data). The method from_netcdf
    can be used to create a LowFlow hazard set populated with data based on
    gridded hydrological model runs as provided by the ISIMIP project
    (https://www.isimip.org/), e.g. ISIMIP2a/b.
    Grid cells with a minimum number of days below threshold per month are
    clustered in space (lat/lon) and time (monthly) to identify and set
    connected events.

    Attributes
    ----------
    clus_thresh_t : int
        maximum time difference in months to be counted as connected points
        during clustering, default = 1
    clus_thresh_xy : int
        maximum spatial grid cell distance in number of cells to be counted
        as connected points during clustering, default = 2
    min_samples : int
        minimum amount of data points in one cluster to consider as event,
        default = 1
    date_start : np.array(int)
        for each event, the date of the first month of the event (ordinal)
        Note: Hazard attribute 'date' contains the date of maximum event intensity.
    date_end : np.array(int)
        for each event, the date of the last month of the event (ordinal)
    resolution : float
        spatial resolution of gridded discharge input data in degree lat/lon,
        default = 0.5°
    """

    clus_thresh_t = 1  # Default = 1: months with intensity < min_intensity interrupt event
    clus_thresh_xy = 2  # Default = 2: allows 1 cell gap and diagonal connection
    min_samples = 1  # Default = 1: no filtering of small events with this default
    resolution = .5  # Default = .5: in agreement with resolution of data from ISIMIP 1-3

    def __init__(self, pool=None):
        """Empty constructor."""
        Hazard.__init__(self, HAZ_TYPE)
        if pool:
            self.pool = pool
            LOGGER.info('Using %s CPUs.', self.pool.ncpus)
        else:
            self.pool = None

    def set_from_nc(self, *args, **kwargs):
        """This function is deprecated, use LowFlow.from_netcdf instead."""
        LOGGER.warning("The use of LowFlow.set_from_nc is deprecated."
                       " Use LowFlow.from_netcdf instead.")
        self.__dict__ = LowFlow.from_netcdf(*args, **kwargs).__dict__

    @classmethod
    def from_netcdf(cls, input_dir=None, centroids=None, countries=None, reg=None,
                    bbox=None, percentile=2.5, min_intensity=1, min_number_cells=1,
                    min_days_per_month=1, yearrange=TARGET_YEARRANGE,
                    yearrange_ref=REFERENCE_YEARRANGE, gh_model=None, cl_model=None,
                    scenario='historical', scenario_ref='historical', soc='histsoc',
                    soc_ref='histsoc', fn_str_var=FN_STR_VAR, keep_dis_data=False,
                    yearchunks='default', mask_threshold=('mean', 1)):
        """Wrapper to fill hazard from NetCDF files containing the variable dis
        (daily discharge), e.g. as provided by the ISIMIP Water Sector (Global):
        https://esg.pik-potsdam.de/search/isimip/

        Parameters
        ----------
        input_dir : string
            path to input data directory. In this folder, netCDF files with
            gridded hydrological model output are required, containing the
            variable dis (discharge) at daily temporal resolution, as f.i.
            provided by the ISIMIP project (https://www.isimip.org/)
        centroids : Centroids
            centroids (area that is considered, reg and country must be None)
        countries : list of countries ISO3
            selection of countries (reg must be None!) [not yet implemented]
        reg : list of regions
            can be set with region code if whole areas are considered
            (if not None, countries and centroids are ignored) [not yet implemented]
        bbox : tuple of four floats
            bounding box: (lon min, lat min, lon max, lat max)
        percentile : float
            percentile used to compute threshold, 0.0 < percentile < 100.0
        min_intensity : int
            minimum intensity (nr of days) in an event;
            events with lower max. intensity are dropped
        min_number_cells : int
            minimum spatial extent (nr of grid cells) in an event;
            events with lower geographical extent are dropped
        min_days_per_month : int
            minimum nr of days below threshold in a month; months with fewer
            days below threshold are not considered for the event creation (clustering)
        yearrange : int tuple
            year range for hazard set, f.i. (2001, 2005)
        yearrange_ref : int tuple
            year range for reference (threshold), f.i. (1971, 2000)
        gh_model : str
            abbrev. hydrological model (only when input_dir is selected)
            f.i. 'H08', 'CLM45', 'ORCHIDEE', 'LPJmL', 'WaterGAP2', 'JULES-W1', 'MATSIRO'
        cl_model : str
            abbrev. climate model (only when input_dir is selected)
            f.i. 'gfdl-esm2m', 'hadgem2-es', 'ipsl-cm5a-lr', 'miroc5',
            'gswp3', 'wfdei', 'princeton', 'watch'
        scenario : str
            climate change scenario (only when input_dir is selected)
            f.i. 'historical', 'rcp26', 'rcp60', 'hist'
        scenario_ref : str
            climate change scenario for reference (only when input_dir is selected)
        soc : str
            socio-economic trajectory (only when input_dir is selected)
            f.i. 'histsoc',  # historical trajectory
            '2005soc',  # constant at 2005 level
            'rcp26soc',  # RCP2.6 trajectory
            'rcp60soc',  # RCP6.0 trajectory
            'pressoc'  # constant at pre-industrial socio-economic level
        soc_ref : str
            socio-economic trajectory for reference, like soc
            (only when input_dir is selected)
        fn_str_var : str
            FileName STRing depending on VARiable and ISIMIP simulation round
        keep_dis_data : boolean
            keep monthly data (variable ndays = days below threshold) as
            dataframe (attribute "data") and save additional field 'relative_dis'
            (relative discharge compared to the long term)
        yearchunks
            list of year chunks corresponding to each nc flow file. If set to
            'default', uses the chunking corresponding to the scenario.
        mask_threshold
            tuple with threshold value [1] for criterion [0] for mask:
            Threshold below which the grid is masked out, e.g.:
            ('mean', 1.) --> grid cells with a mean discharge below 1 are ignored
            ('percentile', .3) --> grid cells with a value of the computed
            percentile discharge below 0.3 are ignored.
            default: ('mean', 1). Set to None for no threshold.
            Provide a list of tuples for multiple thresholds.

        Raises
        ------
        NameError

        Returns
        -------
        LowFlow
            hazard set with low flow intensity calculated from netcdf files
            containing discharge data
        """
        if input_dir:
            if not Path(input_dir).is_dir():
                LOGGER.warning('Input directory %s does not exist', input_dir)
                raise NameError
        else:
            LOGGER.warning('Input directory %s not set', input_dir)
            raise NameError

        if centroids:
            centr_handling = 'align'
        elif countries or reg:
            LOGGER.warning('country or reg ignored: not yet implemented')
            centr_handling = 'full_hazard'
        else:
            centr_handling = 'full_hazard'

        # read data and call preprocessing routine:
        haz = cls()
        haz.lowflow_df, centroids_import = data_preprocessing_percentile(
            percentile, yearrange, yearrange_ref, input_dir, gh_model, cl_model,
            scenario, scenario_ref, soc, soc_ref, fn_str_var, bbox,
            min_days_per_month, keep_dis_data, yearchunks, mask_threshold)

        if centr_handling == 'full_hazard':
            centroids = centroids_import
        haz.identify_clusters()
        haz.set_intensity_from_clusters(centroids, min_intensity, min_number_cells,
                                        yearrange, keep_dis_data)
        return haz

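    # Minimal usage sketch (added comment; directory, model and scenario names
    # are placeholders, and daily ISIMIP discharge files must be present in
    # data_dir for this to run):
    #     haz = LowFlow.from_netcdf(input_dir=data_dir, gh_model='H08',
    #                               cl_model='gfdl-esm2m', scenario='historical',
    #                               soc='histsoc', yearrange=(2001, 2005),
    #                               yearrange_ref=(1971, 2000), percentile=2.5)
    #     haz.check()
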
    def set_intensity_from_clusters(self, centroids=None, min_intensity=1,
                                    min_number_cells=1, yearrange=TARGET_YEARRANGE,
                                    keep_dis_data=False):
        """Build low flow hazards with events from clustering and centroids and
        (re)set attributes."""
        # sum "dis" (days per month below threshold) per pixel and
        # cluster_id and write to hazard.intensity
        self.events_from_clusters(centroids)

        if min_intensity > 1 or min_number_cells > 1:
            haz_tmp = self.filter_events(min_intensity=min_intensity,
                                         min_number_cells=min_number_cells)
            LOGGER.info('Filtering events: %i events remaining', haz_tmp.size)
            self.event_id = haz_tmp.event_id
            self.event_name = list(map(str, self.event_id))
            self.date = haz_tmp.date
            self.date_start = haz_tmp.date_start
            self.date_end = haz_tmp.date_end
            self.orig = haz_tmp.orig
            self.frequency = haz_tmp.frequency
            self.intensity = haz_tmp.intensity
            self.fraction = haz_tmp.fraction
            del haz_tmp

        if not keep_dis_data:
            self.lowflow_df = None
        self.set_frequency(yearrange=yearrange)

    def _intensity_loop(self, uniq_ev, coord, res_centr, num_centr):
        """Compute intensity and populate intensity matrix.
        For each event, if more than one data point has the same coordinates,
        take the sum of days below threshold of these points
        (duration as accumulated intensity).

        Parameters
        ----------
        uniq_ev : list of str
            list of unique cluster IDs
        coord : list
            Coordinates as in Centroids.coord
        res_centr : float
            Geographical resolution of centroids
        num_centr : int
            Number of centroids

        Returns
        -------
        intensity_mat : sparse.lil_matrix
            intensity values as sparse matrix
        """
        tree_centr = BallTree(coord, metric='chebyshev')
        # steps: list of steps to be written to intensity matrix at once:
        steps = list(np.arange(0, len(uniq_ev) - 1, INTENSITY_STEP)) + [len(uniq_ev)]
        if len(steps) == 1:
            intensity_list = [self._intensity_one_cluster(tree_centr, cl_id,
                                                          res_centr, num_centr)
                              for cl_id in uniq_ev]
            return sparse.csr_matrix(intensity_list)
        # step_range: list of tuples containing the unique IDs to be written to
        # the intensity matrix in one step
        step_range = [tuple(uniq_ev[stp:steps[idx + 1]])
                      for idx, stp in enumerate(steps[0:-1])]
        for idx, stp in enumerate(step_range):
            intensity_list = []
            for cl_id in stp:
                intensity_list.append(
                    self._intensity_one_cluster(tree_centr, cl_id, res_centr, num_centr))
            if not idx:
                intensity_mat = sparse.lil_matrix(intensity_list)
            else:
                intensity_mat = sparse.vstack((intensity_mat,
                                               sparse.csr_matrix(intensity_list)))
        return intensity_mat

    def _set_dates(self, uniq_ev):
        """Set dates of maximum intensity (date) as well as start and end dates
        per event

        Parameters
        ----------
        uniq_ev : list
            list of unique cluster IDs
        """
        self.date = np.zeros(uniq_ev.size, int)
        self.date_start = np.zeros(uniq_ev.size, int)
        self.date_end = np.zeros(uniq_ev.size, int)
        for ev_idx, ev_id in enumerate(uniq_ev):
            # set event date to date of maximum intensity (ndays)
            self.date[ev_idx] = self.lowflow_df[self.lowflow_df.cluster_id == ev_id]\
                .groupby('dtime')['ndays'].sum().idxmax()
            self.date_start[ev_idx] = self.lowflow_df[self.lowflow_df.cluster_id == ev_id]\
                .dtime.min()
            self.date_end[ev_idx] = self.lowflow_df[self.lowflow_df.cluster_id == ev_id]\
                .dtime.max()

    def events_from_clusters(self, centroids):
        """Initiate hazard events from connected clusters found in self.lowflow_df

        Parameters
        ----------
        centroids : Centroids
        """
        # intensity = list()
        uniq_ev = np.unique(self.lowflow_df['cluster_id'].values)
        num_centr = centroids.size
        res_centr = self._centroids_resolution(centroids)

        self.units = 'days'  # days below threshold
        self.centroids = centroids

        # Following values are defined for each event
        self.event_id = np.sort(uniq_ev)
        self.event_id = self.event_id[self.event_id > 0]
        self.event_name = list(map(str, self.event_id))

        self._set_dates(uniq_ev)

        self.orig = np.ones(uniq_ev.size)
        self.set_frequency()

        self.intensity = self._intensity_loop(uniq_ev, centroids.coord,
                                              res_centr, num_centr)

        # Following values are defined for each event and centroid
        self.intensity = self.intensity.tocsr()
        self.fraction = self.intensity.copy()
        self.fraction.data.fill(1.0)

    def identify_clusters(self, clus_thresh_xy=None, clus_thresh_t=None, min_samples=None):
        """Call clustering functions to identify the clusters inside the dataframe

        Parameters
        ----------
        clus_thresh_xy : int
            new value of maximum grid cell distance (number of grid cells) to be
            counted as connected points during clustering
        clus_thresh_t : int
            new value of maximum time step difference (months) to be counted as
            connected points during clustering
        min_samples : int
            new value of minimum amount of data points in one cluster to retain
            the cluster as an event, smaller clusters will be ignored

        Returns
        -------
        pandas.DataFrame
        """
        if min_samples:
            self.min_samples = min_samples
        if clus_thresh_xy:
            self.clus_thresh_xy = clus_thresh_xy
        if clus_thresh_t:
            self.clus_thresh_t = clus_thresh_t

        self.lowflow_df['cluster_id'] = np.zeros(len(self.lowflow_df), dtype=int)
        LOGGER.debug('Computing 3D clusters.')
        # Compute clus_id: cluster identifier inside cons_id
        for cluster_vars in [('lat', 'lon'), ('lat', 'dt_month'), ('lon', 'dt_month')]:
            self.lowflow_df = self._df_clustering(self.lowflow_df, cluster_vars,
                                                  self.resolution, self.clus_thresh_xy,
                                                  self.clus_thresh_t, self.min_samples)

        self.lowflow_df = unique_clusters(self.lowflow_df)
        return self.lowflow_df

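    # Sketch (added comment; assumes lowflow_df is still available, e.g. after
    # from_netcdf(..., keep_dis_data=True)): clustering can be re-run with
    # different parameters without re-reading the netCDF input:
    #     haz.identify_clusters(clus_thresh_xy=1, clus_thresh_t=1, min_samples=2)
    #     haz.set_intensity_from_clusters(haz.centroids)
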
    @staticmethod
    def _df_clustering(lowflow_df, cluster_vars, res_data, clus_thresh_xy,
                       clus_thres_t, min_samples):
        """Compute 2D clusters and sort lowflow_df with ascending clus_id
        for each combination of the 3 dimensions (lat, lon, dt_month).

        Parameters
        ----------
        lowflow_df : dataframe
            dataset obtained from ISIMIP data
        cluster_vars : tuple of str
            pair of dimensions for 2D clustering, e.g. ('lat', 'dt_month')
        res_data : float
            input data grid resolution in degrees
        clus_thresh_xy : int
            clustering distance threshold in space
        clus_thres_t : int
            clustering distance threshold in time
        min_samples : int
            clustering min. number

        Returns
        -------
        pandas.DataFrame
        """
        # set iter_var (dimension not used for clustering)
        if 'lat' not in cluster_vars:
            iter_var = 'lat'
        elif 'lon' not in cluster_vars:
            iter_var = 'lon'
        else:
            iter_var = 'dt_month'

        clus_id_var = 'c_%s_%s' % (cluster_vars[0], cluster_vars[1])
        lowflow_df[clus_id_var] = np.zeros(len(lowflow_df), dtype=int) - 1

        data_iter = lowflow_df[lowflow_df['iter_ev']][[iter_var, cluster_vars[0],
                                                       cluster_vars[1], 'cons_id',
                                                       clus_id_var]]
        if 'dt_month' in clus_id_var:
            # transform month count in accordance with spatial resolution
            # to achieve same distance between consecutive and geographically
            # neighboring points:
            data_iter.dt_month = data_iter.dt_month * res_data * clus_thresh_xy / clus_thres_t

        # Loop over slices: For each slice, perform 2D clustering with DBSCAN
        for i_var in data_iter[iter_var].unique():
            temp = np.argwhere(np.array(data_iter[iter_var] == i_var)).reshape(-1, )  # slice
            x_y = data_iter.iloc[temp][[cluster_vars[0], cluster_vars[1]]].values
            x_y_uni, x_y_cpy = np.unique(x_y, return_inverse=True, axis=0)
            cluster_id = DBSCAN(eps=res_data * clus_thresh_xy,
                                min_samples=min_samples).fit(x_y_uni).labels_
            cluster_id = cluster_id[x_y_cpy]
            data_iter[clus_id_var].values[temp] = cluster_id + data_iter[clus_id_var].max() + 1

        lowflow_df[clus_id_var].values[lowflow_df['iter_ev'].values] = \
            data_iter[clus_id_var].values
        return lowflow_df

    def filter_events(self, min_intensity=1, min_number_cells=1):
        """Remove events with max intensity below min_intensity or spatial extent
        below min_number_cells

        Parameters
        ----------
        min_intensity : int or float
            Minimum criterion for intensity
        min_number_cells : int or float
            Minimum criterion for number of grid cells

        Returns
        -------
        Hazard
        """
        haz_tmp = copy.deepcopy(self)
        haz_tmp.orig_tmp = copy.deepcopy(self.orig)
        haz_tmp.orig = np.array(np.ones(self.orig.size))
        # identify events to be filtered out and set haz_tmp.orig to 0:
        for i_event, _ in enumerate(haz_tmp.event_id):
            if np.sum(haz_tmp.intensity[i_event] > 0) < min_number_cells or \
                    np.max(haz_tmp.intensity[i_event]) < min_intensity:
                haz_tmp.orig[i_event] = 0.
        haz_tmp = haz_tmp.select(orig=True)  # select events with orig == 1
        haz_tmp.orig = haz_tmp.orig_tmp  # reset orig
        del haz_tmp.orig_tmp
        haz_tmp.event_id = np.arange(1, len(haz_tmp.event_id) + 1).astype(int)
        return haz_tmp

    @staticmethod
    def _centroids_resolution(centroids):
        """Return resolution of the centroids in their units

        Parameters
        ----------
        centroids : Centroids
            centroids instance

        Returns
        -------
        float
        """
        if centroids.meta:
            res_centr = abs(centroids.meta['transform'][4]), \
                centroids.meta['transform'][0]
        else:
            res_centr = np.abs(u_coord.get_resolution(centroids.lat, centroids.lon))
        if np.abs(res_centr[0] - res_centr[1]) > 1.0e-6:
            LOGGER.warning('Centroids do not represent regular pixels %s.', str(res_centr))
            return (res_centr[0] + res_centr[1]) / 2
        return res_centr[0]

    def _intensity_one_cluster(self, tree_centr, cluster_id, res_centr, num_centr):
        """For a given cluster, fill in an intensity np.array with the summed
        intensity at each centroid.

        Parameters
        ----------
        tree_centr : object
            BallTree instance created from centroids' coordinates
        cluster_id : int
            id of the selected cluster
        res_centr : float
            resolution of centroids in degree
        num_centr : int
            number of centroids

        Returns
        -------
        intensity_cl : np.array
            summed intensity of cluster at each centroid
        """
        LOGGER.debug('Number of days below threshold corresponding to event %s.',
                     str(cluster_id))
        temp_data = self.lowflow_df.reindex(
            index=np.argwhere(np.array(self.lowflow_df['cluster_id'] == cluster_id)).reshape(-1),
            columns=['lat', 'lon', 'ndays'])
        # Identify the unique (lat, lon) points of the lowflow_df dataframe -> lat_lon_uni
        # Set the same index value for each duplicate (lat, lon) point -> lat_lon_cpy
        lat_lon_uni, lat_lon_cpy = np.unique(temp_data[['lat', 'lon']].values,
                                             return_inverse=True, axis=0)
        index_uni = np.unique(lat_lon_cpy, axis=0)
        # Search closest centroid for each point
        ind, _ = tree_centr.query_radius(lat_lon_uni, r=res_centr / 2, count_only=False,
                                         return_distance=True, sort_results=True)
        ind = np.array([ind_i[0] if ind_i.size else -1 for ind_i in ind])
        intensity_cl = _fill_intensity(num_centr, ind, index_uni, lat_lon_cpy,
                                       temp_data['ndays'].values)
        return intensity_cl

    @staticmethod
    def _intensity_one_cluster_pool(lowflow_df, tree_centr, cluster_id, res_centr, num_centr):
        """For a given cluster, fill in an intensity np.array with the summed
        intensity at each centroid. Version for self.pool = True

        Parameters
        ----------
        lowflow_df : DataFrame
        tree_centr : object
            BallTree instance created from centroids' coordinates
        cluster_id : int
            id of the selected cluster
        res_centr : float
            resolution of centroids in degree
        num_centr : int
            number of centroids

        Returns
        -------
        intensity_cl : np.array
            summed intensity of cluster at each centroid
        """
        LOGGER.debug('Number of days below threshold corresponding to event %s.',
                     str(cluster_id))
        temp_data = lowflow_df.reindex(
            index=np.argwhere(np.array(lowflow_df['cluster_id'] == cluster_id)).reshape(-1),
            columns=['lat', 'lon', 'ndays'])
        lat_lon_uni, lat_lon_cpy = np.unique(temp_data[['lat', 'lon']].values,
                                             return_inverse=True, axis=0)
        index_uni = np.unique(lat_lon_cpy, axis=0)
        ind, _ = tree_centr.query_radius(lat_lon_uni, r=res_centr / 2, count_only=False,
                                         return_distance=True, sort_results=True)
        ind = np.array([ind_i[0] if ind_i.size else -1 for ind_i in ind])
        intensity_cl = _fill_intensity(num_centr, ind, index_uni, lat_lon_cpy,
                                       temp_data['ndays'].values)
        return intensity_cl

def _init_centroids(dis_xarray, centr_res_factor=1):
    """Get centroids from the discharge dataset and refactor them.

    Parameters
    ----------
    dis_xarray : xarray
        dataset obtained from ISIMIP netcdf
    centr_res_factor : float
        the factor applied to voluntarily decrease/increase the centroids resolution

    Returns
    -------
    centroids (Centroids)
    """
    res_data = np.min(np.abs([np.diff(dis_xarray.lon.values).min(),
                              np.diff(dis_xarray.lat.values).min()]))
    centroids = Centroids.from_pnt_bounds((dis_xarray.lon.values.min(),
                                           dis_xarray.lat.values.min(),
                                           dis_xarray.lon.values.max(),
                                           dis_xarray.lat.values.max()),
                                          res=res_data / centr_res_factor)
    centroids.set_meta_to_lat_lon()
    centroids.set_area_approx()
    centroids.set_on_land()
    centroids.empty_geometry_points()
    return centroids


def unique_clusters(lowflow_df):
    """Identify unique clusters based on clusters in 3 dimensions and set unique cluster_id

    Parameters
    ----------
    lowflow_df : pandas.DataFrame
        contains monthly gridded data of days below threshold

    Returns
    -------
    lowflow_df : pandas.DataFrame
        as input with new values in column cluster_id
    """
    lowflow_df.cluster_id = np.zeros(len(lowflow_df.c_lat_lon)) - 1

    lowflow_df.loc[lowflow_df.c_lat_lon == -1, 'c_lat_lon'] = np.nan
    lowflow_df.loc[lowflow_df.c_lat_dt_month == -1, 'c_lat_dt_month'] = np.nan
    lowflow_df.loc[lowflow_df.c_lon_dt_month == -1, 'c_lon_dt_month'] = np.nan

    idc = 0  # event id counter
    current = 0
    for c_lat_lon in lowflow_df.c_lat_lon.unique():
        if np.isnan(c_lat_lon):
            lowflow_df.loc[lowflow_df.c_lat_lon == c_lat_lon, 'cluster_id'] = -1
        else:
            if len(lowflow_df.loc[lowflow_df.c_lat_lon == c_lat_lon, 'cluster_id'].unique()) == 1 \
                    and -1 in lowflow_df.loc[lowflow_df.c_lat_lon == c_lat_lon,
                                             'cluster_id'].unique():
                idc += 1
                current = idc
            else:
                current = max(lowflow_df.loc[lowflow_df.c_lat_lon == c_lat_lon,
                                             'cluster_id'].unique())
            lowflow_df.loc[lowflow_df.c_lat_lon == c_lat_lon, 'cluster_id'] = current

            for c_lat_dt_month in lowflow_df.loc[lowflow_df.c_lat_lon == c_lat_lon,
                                                 'c_lat_dt_month'].unique():
                if not np.isnan(c_lat_dt_month):
                    lowflow_df.loc[lowflow_df.c_lat_dt_month == c_lat_dt_month,
                                   'cluster_id'] = current
            for c_lon_dt_month in lowflow_df.loc[lowflow_df.c_lat_lon == c_lat_lon,
                                                 'c_lon_dt_month'].unique():
                if not np.isnan(c_lon_dt_month):
                    lowflow_df.loc[lowflow_df.c_lon_dt_month == c_lon_dt_month,
                                   'cluster_id'] = current

    lowflow_df.loc[np.isnan(lowflow_df.c_lon_dt_month), 'cluster_id'] = -1
    lowflow_df.loc[np.isnan(lowflow_df.c_lat_dt_month), 'cluster_id'] = -1
    lowflow_df.cluster_id = lowflow_df.cluster_id.astype(int)
    return lowflow_df


def data_preprocessing_percentile(percentile, yearrange, yearrange_ref, input_dir,
                                  gh_model, cl_model, scenario, scenario_ref, soc,
                                  soc_ref, fn_str_var, bbox, min_days_per_month,
                                  keep_dis_data, yearchunks, mask_threshold):
    """Load data and reference data, calculate monthly percentiles, then extract
    intensity based on days below threshold; returns a geopandas dataframe.

    Parameters
    ----------
    c.f. parameters in LowFlow.from_netcdf()

    Returns
    -------
    lowflow_df : pandas.DataFrame
        preprocessed data with days below threshold per grid cell and month
    centroids : Centroids
        regular grid centroid with same resolution as input data
    """
    threshold_grid, mean_ref = _compute_threshold_grid(percentile, yearrange_ref,
                                                       input_dir, gh_model, cl_model,
                                                       scenario_ref, soc_ref,
                                                       fn_str_var, bbox, yearchunks,
                                                       mask_threshold=mask_threshold,
                                                       keep_dis_data=keep_dis_data)
    first_file = True
    if yearchunks == 'default':
        yearchunks = YEARCHUNKS[scenario]
    # loop over yearchunks
    # (for memory reasons: only loading one file with daily data per step,
    # combining data after conversion to monthly data)
    for yearchunk in yearchunks:
        # skip if file is not required, i.e., not in yearrange:
        if int(yearchunk[0:4]) <= yearrange[1] and int(yearchunk[-4:]) >= yearrange[0]:
            data_chunk = _read_and_combine_nc(
                (max(yearrange[0], int(yearchunk[0:4])),
                 min(yearrange[-1], int(yearchunk[-4:]))),
                input_dir, gh_model, cl_model, scenario, soc, fn_str_var,
                bbox, [yearchunk])
            data_chunk = _days_below_threshold_per_month(data_chunk, threshold_grid,
                                                         mean_ref, min_days_per_month,
                                                         keep_dis_data)
            if first_file:
                centroids = _init_centroids(data_chunk, centr_res_factor=1)
                dataf = _xarray_to_geopandas(data_chunk)
                first_file = False
            else:
                dataf = pd.concat([dataf, _xarray_to_geopandas(data_chunk)])
    del data_chunk
    dataf = dataf.sort_values(['lat', 'lon', 'dtime'], ascending=[True, True, True])
    return dataf.reset_index(drop=True), centroids


def _read_and_combine_nc(yearrange, input_dir, gh_model, cl_model, scenario,
                         soc, fn_str_var, bbox, yearchunks):
    """Import and combine data from nc files

    Parameters
    ----------
    c.f. parameters in LowFlow.from_netcdf()

    Returns
    -------
    dis_xarray : xarray
    """
    first_file = True
    if yearchunks == 'default':
        yearchunks = YEARCHUNKS[scenario]
    for yearchunk in yearchunks:
        # skip if file is not required, i.e., not in yearrange:
        if int(yearchunk[0:4]) > yearrange[1] or int(yearchunk[-4:]) < yearrange[0]:
            continue
        if scenario == 'hist':
            bias_corr = 'nobc'
        else:
            bias_corr = 'ewembi'
        filepath = Path(input_dir,
                        f'{gh_model}_{cl_model}_{bias_corr}_{scenario}_{soc}_'
                        f'{fn_str_var}_{yearchunk}.nc')
        if not filepath.is_file():
            raise FileNotFoundError(f'Netcdf file not found: {filepath}')
        if first_file:
            dis_xarray = _read_single_nc(filepath, yearrange, bbox)
            first_file = False
        else:
            dis_xarray = dis_xarray.combine_first(_read_single_nc(filepath, yearrange, bbox))

    # set negative discharge values to zero (debugging of input data):
    dis_xarray.dis.values[dis_xarray.dis.values < 0] = 0
    return dis_xarray

def _read_single_nc(filename, yearrange, bbox):
    """Import data from single nc file, return as xarray

    Parameters
    ----------
    filename : str or pathlib.Path
        full path of input netcdf file
    yearrange : tuple
        year range to be extracted from file
    bbox : tuple of float
        geographical bounding box in the form:
        (lon_min, lat_min, lon_max, lat_max)

    Returns
    -------
    dis_xarray : xarray
    """
    dis_xarray = xr.open_dataset(filename)
    try:
        if not bbox:
            return dis_xarray.sel(time=slice(dt.datetime(yearrange[0], 1, 1),
                                             dt.datetime(yearrange[-1], 12, 31)))
        lon_min, lat_min, lon_max, lat_max = bbox
        return dis_xarray.sel(lon=slice(lon_min, lon_max), lat=slice(lat_max, lat_min),
                              time=slice(dt.datetime(yearrange[0], 1, 1),
                                         dt.datetime(yearrange[-1], 12, 31)))
    except TypeError:
        # fix date format if not datetime
        if not bbox:
            dis_xarray = dis_xarray.sel(
                time=slice(cftime.DatetimeNoLeap(yearrange[0], 1, 1),
                           cftime.DatetimeNoLeap(yearrange[-1], 12, 31)))
        else:
            lon_min, lat_min, lon_max, lat_max = bbox
            dis_xarray = dis_xarray.sel(
                lon=slice(lon_min, lon_max), lat=slice(lat_max, lat_min),
                time=slice(cftime.DatetimeNoLeap(yearrange[0], 1, 1),
                           cftime.DatetimeNoLeap(yearrange[-1], 12, 31)))
        datetimeindex = dis_xarray.indexes['time'].to_datetimeindex()
        dis_xarray['time'] = datetimeindex
    return dis_xarray


def _xarray_reduce(dis_xarray, fun=None, percentile=None):
    """Wrapper function to reduce xarray along time axis

    Parameters
    ----------
    dis_xarray : xarray
    fun : str
        function to be applied, either "mean" or "percentile"
    percentile : num
        percentile to be extracted, e.g. 5 for 5th percentile
        (only if fun=='percentile')

    Returns
    -------
    xarray
    """
    if fun == 'mean':
        return dis_xarray.mean(dim='time')
    if fun[0] == 'p':
        return dis_xarray.reduce(np.nanpercentile, dim='time', q=percentile)
    return None


def _split_bbox(bbox, width=BBOX_WIDTH):
    """Split bounding box into squares, return new set of bounding boxes

    Note: Could this function be a candidate for climada.util in the future?

    Parameters
    ----------
    bbox : tuple of float
        geographical bounding box in the form:
        (lon_min, lat_min, lon_max, lat_max)
    width : float
        width and height of geographical bounding boxes for loop in degree lat/lon,
        i.e., the bounding box is split into square boxes with maximum size
        BBOX_WIDTH*BBOX_WIDTH

    Returns
    -------
    bbox_list : list
        list of bounding boxes of the same format as bbox
    """
    if not bbox:
        lon_min, lat_min, lon_max, lat_max = (-180, -85, 180, 85)
    else:
        lon_min, lat_min, lon_max, lat_max = bbox
    lons = [lon_min] + \
           [int(idc) for idc in np.arange(np.ceil(lon_min + width - 1),
                                          np.floor(lon_max - width + 1), width)] + [lon_max]
    lats = [lat_min] + \
           [int(idc) for idc in np.arange(np.ceil(lat_min + width - 1),
                                          np.floor(lat_max - width + 1), width)] + [lat_max]

    bbox_list = list()
    for ilon, _ in enumerate(lons[:-1]):
        for ilat, _ in enumerate(lats[:-1]):
            bbox_list.append([lons[ilon], lats[ilat], lons[ilon + 1], lats[ilat + 1]])
    return bbox_list


def _compute_threshold_grid(percentile, yearrange_ref, input_dir, gh_model, cl_model,
                            scenario, soc, fn_str_var, bbox, yearchunks,
                            mask_threshold=None, keep_dis_data=False):
    """Given the model run and year range specification, return the x-th percentile
    for every pixel over a given time horizon (based on daily data)
    [all-year-round percentiles!], as well as the mean at each grid cell.

    Parameters
    ----------
    c.f. parameters in LowFlow.from_netcdf()
    mask_threshold : tuple or list
        Threshold(s) below which the grid is masked out, e.g. ('mean', 1.)

    Returns
    -------
    p_grid : xarray.Dataset
        grid with dis of given percentile (1 timestep)
    mean_grid : xarray.Dataset
        grid with mean(dis)
    """
    LOGGER.info('Computing threshold value per grid cell for Q%i, %i-%i',
                percentile, yearrange_ref[0], yearrange_ref[1])
    if isinstance(mask_threshold, tuple):
        mask_threshold = [mask_threshold]
    bbox = _split_bbox(bbox)
    p_grid_list = []
    mean_grid_list = []
    # loop over coordinate bounding boxes to save memory:
    for box in bbox:
        dis_xarray = _read_and_combine_nc(yearrange_ref, input_dir, gh_model, cl_model,
                                          scenario, soc, fn_str_var, box, yearchunks)
        if dis_xarray.dis.data.size:  # only if data is not empty
            p_grid_list += [_xarray_reduce(dis_xarray, fun='p', percentile=percentile)]
            # only compute mean_grid if required by user or mask_threshold:
            if keep_dis_data or (mask_threshold and
                                 True in ['mean' in x for x in mask_threshold]):
                mean_grid_list += [_xarray_reduce(dis_xarray, fun='mean')]
        del dis_xarray

    p_grid = xr.combine_by_coords(p_grid_list)
    del p_grid_list
    if mean_grid_list:
        mean_grid = xr.combine_by_coords(mean_grid_list)
        del mean_grid_list

    if isinstance(mask_threshold, list):
        for crit in mask_threshold:
            if 'mean' in crit[0]:
                p_grid.dis.values[mean_grid.dis.values < crit[1]] = 0
                mean_grid.dis.values[mean_grid.dis.values < crit[1]] = 0
            if 'percentile' in crit[0]:
                p_grid.dis.values[p_grid.dis.values < crit[1]] = 0
                mean_grid.dis.values[p_grid.dis.values < crit[1]] = 0
    if keep_dis_data:
        return p_grid, mean_grid
    return p_grid, None


def _days_below_threshold_per_month(dis_xarray, threshold_grid, mean_ref,
                                    min_days_per_month, keep_dis_data):
    """Return the sum of days below threshold per month (as xarray with monthly data)

    If keep_dis_data is True, a DataFrame called 'lowflow_df' with additional data
    is saved within the hazard object. It provides data per event, grid cell, and
    month. lowflow_df comes with the following columns:
    ['lat', 'lon', 'time', 'ndays', 'relative_dis', 'iter_ev', 'cons_id',
    'dtime', 'dt_month', 'geometry', 'cluster_id', 'c_lat_lon',
    'c_lat_dt_month', 'c_lon_dt_month']
    Note: cluster_id corresponds 1:1 with the associated event_id.

    Parameters
    ----------
    c.f. parameters in LowFlow.from_netcdf()

    Returns
    -------
    xarray
    """
    # data = data.groupby('time.month')-threshold_grid  # outdated
    data_threshold = dis_xarray - threshold_grid
    if keep_dis_data:
        data_low = dis_xarray.where(data_threshold < 0) / mean_ref
        data_low = data_low.resample(time='1M').mean()
    data_threshold.dis.values[data_threshold.dis.values >= 0] = 0
    data_threshold.dis.values[data_threshold.dis.values < 0] = 1
    data_threshold = data_threshold.resample(time='1M').sum()
    data_threshold.dis.values[data_threshold.dis.values < min_days_per_month] = 0
    data_threshold = data_threshold.rename({'dis': 'ndays'})
    if keep_dis_data:
        data_threshold['relative_dis'] = data_low['dis']
    return data_threshold.where(data_threshold['ndays'] > 0)

def _xarray_to_geopandas(dis_xarray):
    """Create GeoDataFrame from xarray with NaN values dropped

    Note: Could this function be a candidate for climada.util in the future?

    Parameters
    ----------
    dis_xarray : xarray
        data as xarray object

    Returns
    -------
    lowflow_df : GeoDataFrame
    """
    dataf = dis_xarray.to_dataframe()
    dataf.reset_index(inplace=True)
    dataf = dataf.dropna()
    dataf['iter_ev'] = np.ones(len(dataf), bool)
    dataf['cons_id'] = np.zeros(len(dataf), int) - 1
    dataf['dtime'] = dataf['time'].apply(lambda x: x.toordinal())
    dataf['dt_month'] = dataf['time'].apply(lambda x: x.year * 12 + x.month)
    return gpd.GeoDataFrame(dataf, geometry=[Point(x, y) for x, y in
                                             zip(dataf['lon'], dataf['lat'])])


@numba.njit
def _fill_intensity(num_centr, ind, index_uni, lat_lon_cpy, intensity_raw):
    """Fill intensity list for a single cluster

    Parameters
    ----------
    num_centr : int
        total number of centroids
    ind : list
        list of centroid indices in cluster
    index_uni : array of int
        unique indices of the (lat, lon) points in the cluster
    lat_lon_cpy : array of int
        index according to (lat, lon) in intensity_raw
    intensity_raw : array of int
        array of ndays values at each data point in cluster

    Returns
    -------
    list with summed ndays (=intensity) per geographical point (lat, lon)
    """
    intensity_cl = np.zeros((1, num_centr), dtype=numba.float64)
    for idx in range(index_uni.size):
        if ind[idx] != -1:
            intensity_cl[0, ind[idx]] = \
                np.sum(intensity_raw[lat_lon_cpy == index_uni[idx]])
    return intensity_cl[0]
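

# Worked illustration (added comment, not executed at import time): with the
# default quasi-global BBOX and BBOX_WIDTH = 75, _split_bbox() yields
# 4 x 2 = 8 sub-boxes of at most 75 x 75 degrees, e.g.
#     _split_bbox(None)[0] == [-180, -85, -106, -11]
# which _compute_threshold_grid() loops over to keep memory usage low.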