"""
This file is part of CLIMADA.
Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
CLIMADA is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free
Software Foundation, version 3.
CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.
---
Define Drought class.
"""
__all__ = ['Drought']
import logging
from pathlib import Path
import numpy as np
import xarray as xr
import pandas as pd
from scipy import sparse
import matplotlib as mpl
from matplotlib import pyplot as plt
from climada import CONFIG
from climada.hazard import Hazard, Centroids
from climada.util.files_handler import download_file
from climada.util.dates_times import datetime64_to_ordinal
from climada.util.constants import SYSTEM_DIR
from climada.util.dates_times import str_to_date, date_to_str
logging.root.setLevel(logging.DEBUG)
LOGGER = logging.getLogger(__name__)
DFL_THRESHOLD = -1
DFL_INTENSITY_DEF = 1
SPEI_FILE_URL = CONFIG.hazard.drought.resources.spei_file_url.str()
SPEI_FILE_NAME = SPEI_FILE_URL.split("/")[-1]
SPEI_FILE_DIR = SYSTEM_DIR
LATMIN = 44.5
LATMAX = 50
LONMIN = 5
LONMAX = 12
HAZ_TYPE = 'DR'
"""Hazard type acronym Drought"""
[docs]
class Drought(Hazard):
"""Contains drought events.
Attributes
----------
SPEI : float
Standardize Precipitation Evapotraspiration Index
"""
vars_opt = Hazard.vars_opt.union({'spei'})
"""Name of the variables that aren't need to compute the impact."""
[docs]
def __init__(self):
"""Empty constructor."""
Hazard.__init__(self, HAZ_TYPE)
self.file_path = Path(SPEI_FILE_DIR, SPEI_FILE_NAME)
self.threshold = DFL_THRESHOLD
self.intensity_definition = DFL_INTENSITY_DEF
self.latmin = LATMIN
self.latmax = LATMAX
self.lonmin = LONMIN
self.lonmax = LONMAX
[docs]
def set_area(self, latmin, lonmin, latmax, lonmax):
"""Set the area to analyse"""
self.latmin = latmin
self.latmax = latmax
self.lonmin = lonmin
self.lonmax = lonmax
[docs]
def set_file_path(self, path):
"""Set path of the SPEI data"""
self.file_path = Path(path)
[docs]
def set_threshold(self, threshold):
"""Set threshold"""
self.threshold = threshold
[docs]
def set_intensity_def(self, intensity_definition):
"""Set intensity definition"""
self.intensity_definition = intensity_definition
def __read_indices_spei(self, dataset):
"""Read the NETCDF file containing SPEI"""
lat_total = dataset.lat.data
lon_total = dataset.lon.data
index_lon = np.where((lon_total >= self.lonmin) & (lon_total <= self.lonmax))[0]
index_lat = np.where((lat_total >= self.latmin) & (lat_total <= self.latmax))[0]
lat_vector = dataset.lat[index_lat[0]:index_lat[len(index_lat) - 1]].data
lon_vector = dataset.lon[index_lon[0]:index_lon[len(index_lon) - 1]].data
self.time_vector = dataset.time.data
self.lat_vector = lat_vector
self.lon_vector = lon_vector
self.timeforname = self.time_vector
spei_matrix = dataset.spei[:, index_lat[0]:index_lat[len(index_lat) - 1],
index_lon[0]:index_lon[len(index_lon) - 1]].data
return spei_matrix
[docs]
def setup(self):
"""Set up the hazard drought"""
try:
if not self.file_path.is_file():
download_file(SPEI_FILE_URL, download_dir=SPEI_FILE_DIR)
LOGGER.debug('Importing %s', str(SPEI_FILE_NAME))
dataset = xr.open_dataset(self.file_path)
except Exception as err:
raise type(err)('Importing the SPEI data file failed: ' + str(err)) from err
spei_3d = self.__read_indices_spei(dataset)
spei_2d = self.__traslate_matrix(spei_3d)
intensity_matrix_min = self.__get_intensity_from_2d(spei_2d, self.intensity_definition)
self.hazard_def(intensity_matrix_min)
return self
def __traslate_matrix(self, spei_3d):
"""return hazard intensity as a simple threshold on the SPEI values
Parameters
----------
see read_indices_spei, just call before
Returns
-------
matrix, sparse.csr_matrix
"""
intensity_thres = self.threshold
n_centroids = spei_3d.shape[1] * spei_3d.shape[2]
n_timesteps = spei_3d.shape[0]
spei_2d = np.zeros((n_timesteps, n_centroids))
for i in range(n_timesteps):
one_event_1d = spei_3d[i, :, :]
# get rid of nan's
nan_pos = np.isnan(one_event_1d)
one_event_1d[nan_pos] = 0
# apply threshold
non_drought_pos = np.where(one_event_1d > intensity_thres)
one_event_1d[non_drought_pos] = 0
one_event_array = one_event_1d.reshape(n_centroids)
spei_2d[i, :] = one_event_array
return spei_2d
[docs]
def hazard_def(self, intensity_matrix):
"""return hazard set
Parameters
----------
see intensity_from_spei
Returns
-------
Drought, full hazard set
check using new_haz.check()
"""
if self.intensity_definition == 2:
# TODO: what is the purpose of re-assigning the module constant HAZ_TYPE?
HAZ_TYPE = 'DR_sumthr'
self.haz_type = HAZ_TYPE
elif self.intensity_definition == 3:
# TODO: HAZ_TYPE, s.a.
HAZ_TYPE = 'DR_sum'
self.haz_type = HAZ_TYPE
self.intensity = sparse.csr_matrix(intensity_matrix)
self.units = 'SPEI'
# fill centroids th bad way (there must be a code like grid...)
lat_2d = np.zeros([self.lat_vector.shape[0], self.lon_vector.shape[0]])
lon_2d = np.zeros([self.lat_vector.shape[0], self.lon_vector.shape[0]])
n_centroids = self.lat_vector.shape[0] * self.lon_vector.shape[0]
for lat_i in range(0, self.lat_vector.shape[0]):
for lon_i in range(0, self.lon_vector.shape[0]):
lat_2d[lat_i, lon_i] = self.lat_vector[lat_i]
lon_2d[lat_i, lon_i] = self.lon_vector[lon_i]
lon_1d = lon_2d.reshape(n_centroids,)
lat_1d = lat_2d.reshape(n_centroids,)
self.centroids = Centroids.from_lat_lon(lat_1d, lon_1d)
self.event_id = np.arange(1, self.n_years + 1, 1)
# frequency set when all eventsavailable
#self.frequency = np.array([1])
# per default equal to event_id
name_list = []
time = pd.to_datetime(self.timeforname)
for i in range(13, len(time), 12):
name_list.append(str(time[i].year))
self.event_name = name_list
self.frequency = np.ones(self.n_years) / self.n_years
self.fraction = self.intensity.copy()
self.fraction = self.intensity.copy().tocsr()
self.fraction.data.fill(1)
self.date = np.arange(1, self.n_years + 1, 1)
# new_haz.orig =
self.check()
return self
def __get_intensity_from_2d(self, spei_2d, intensity_definition=1):
"""Parameters: the 2D matrix called 'spei_2D' defined in
intensity_from_spei, which containes every time and spacial resolution
pixel with either the SPEI value or zero if the pixel value doesn't
reach the threshold value.
Returns: matrix
The matrix with the intensity of every event (maximum one per year).
The intensity is simply the maximum value for
the event."""
#n_centroids = spei_2d.shape[1]
time = pd.to_datetime(self.time_vector)
first_year = time[0].year + 1
#first_month = time[0].month
# index_offset to get index of january of first year considered
index_offset = 12 - time[0].month + 1
if time[0].month > 10:
first_year += 1
index_offset += 12
last_year = time[len(time) - 1].year
if time[len(time) - 1].month < 9:
last_year -= 1
n_years = last_year - first_year + 1 # the first year not counted
#years_vector = np.arange(first_year, last_year)
self.date = np.arange(first_year, last_year)
self.n_years = n_years
intensity_min_matrix = np.zeros((n_years, spei_2d.shape[1]))
intensity_sum_matrix = np.zeros((n_years, spei_2d.shape[1]))
intensity_sum_without_th_matrix = np.zeros((n_years, spei_2d.shape[1]))
date_start_matrix = np.zeros((n_years, spei_2d.shape[1]))
date_end_matrix = np.zeros((n_years, spei_2d.shape[1]))
time = time[index_offset - 3: index_offset + 12 * n_years - 3]
self.time_vector = self.time_vector[index_offset - 3: index_offset +
12 * n_years - 3]
for pixel in range(spei_2d.shape[1]):
array_time_centroid = spei_2d[index_offset - 3: index_offset +
12 * n_years - 3, pixel]
list_events = self.__create_list(array_time_centroid)
[intmin, intsum, intsumthr, start, end] = self.__read_list(
list_events, np.arange(first_year, last_year), first_year)
intensity_min_matrix[:, pixel] = intmin
intensity_sum_matrix[:, pixel] = intsum
intensity_sum_without_th_matrix[:, pixel] = intsumthr
date_start_matrix[:, pixel] = start
date_end_matrix[:, pixel] = end
#self.date_start = date_start_matrix
self.date_end = date_end_matrix
self.date_start = sparse.csr_matrix(date_start_matrix)
if intensity_definition == 1:
return intensity_min_matrix
if intensity_definition == 2:
return intensity_sum_without_th_matrix
return intensity_sum_matrix
def __create_list(self, array_time_in_centroid):
"""Return a list of all the events exceeding the threshold.
The list contains start end date, the minimum value, the sum (integral)
and the sum minus threshold"""
event = 0
min_spei = 0
sum_spei = 0
sum_spei_thr = []
start_time = []
end_time = []
list_events_info = list()
# create a list with every event exeeding the threshold
for time_idx in range(len(array_time_in_centroid)):
# for time_idx in enumerate(array_time_in_centroid):
if array_time_in_centroid[time_idx] == 0:
if event:
event = 0
list_events_info.append([start_time, end_time, min_spei,
sum_spei, sum_spei_thr])
min_spei = 0
sum_spei = 0
sum_spei_thr = 0
else:
if event:
end_time = self.time_vector[time_idx]
sum_spei += array_time_in_centroid[time_idx]
sum_spei_thr += (array_time_in_centroid[time_idx] - self.threshold)
if array_time_in_centroid[time_idx] < min_spei:
min_spei = array_time_in_centroid[time_idx]
else:
start_time = self.time_vector[time_idx]
end_time = self.time_vector[time_idx]
min_spei = array_time_in_centroid[time_idx]
sum_spei = array_time_in_centroid[time_idx]
sum_spei_thr = sum_spei - self.threshold
event = 1
return list_events_info
def __read_list(self, list_events_info, years_vector, first_year):
"""read the list created in method __create_list and choose the
events to include in the hazard set. For every year maximum one
event is taken, the one with the lowest spei value on it"""
intensity_min_array = np.zeros((self.n_years))
intensity_sum_array = np.zeros((self.n_years))
intensity_sum_thr_array = np.zeros((self.n_years))
date_start_array = np.zeros((self.n_years))
date_end_array = np.zeros((self.n_years))
year_offset = first_year
min_spei_offset = 0
for idx_event in range(0, len(list_events_info)):
start = list_events_info[idx_event][0]
end = list_events_info[idx_event][1]
min_spei = list_events_info[idx_event][2]
sum_spei = list_events_info[idx_event][3]
sum_spei_thr = list_events_info[idx_event][4]
year_start = pd.to_datetime(list_events_info[idx_event][0]).year
month_start = pd.to_datetime(list_events_info[idx_event][0]).month
if month_start > 10:
year_start += 1
idx_year = np.where(years_vector == year_start)
if year_offset == year_start:
if min_spei < min_spei_offset:
intensity_min_array[idx_year] = min_spei
intensity_sum_array[idx_year] = sum_spei
intensity_sum_thr_array[idx_year] = sum_spei_thr
date_start_array[idx_year] = datetime64_to_ordinal(start)
date_end_array[idx_year] = datetime64_to_ordinal(end)
min_spei_offset = min_spei
else:
intensity_min_array[idx_year] = min_spei
intensity_sum_array[idx_year] = sum_spei
intensity_sum_thr_array[idx_year] = sum_spei_thr
date_start_array[idx_year] = datetime64_to_ordinal(start)
date_end_array[idx_year] = datetime64_to_ordinal(end)
min_spei_offset = min_spei
year_offset = year_start
return intensity_min_array, intensity_sum_array, \
intensity_sum_thr_array, date_start_array, date_end_array
[docs]
def plot_intensity_drought(self, event=None):
"""plot drought intensity"""
# limits of the plots and colormap settings
if self.intensity_definition == 1:
minimum = -4
colourmap = 'afmhot'
elif self.intensity_definition == 2:
minimum = -8
colourmap = 'hot'
else:
minimum = -15
colourmap = 'gist_heat'
# create boundaries between minimum and maximum
boundaries = np.arange(minimum, -1 + 0.02, 0.01)
# create list of colors from colormap
list_colours = mpl.cm.get_cmap(colourmap, len(boundaries))
index_thr = np.where(boundaries > self.threshold)[0]
if self.intensity_definition == 2:
colors = list(list_colours(np.arange(len(boundaries))))
else:
colors = list(list_colours(np.arange(len(boundaries) - len(index_thr))))
# set all values above the threshold to white
colors.extend(["white" for x in range(len(index_thr))])
# define colourmap
cmap = mpl.colors.ListedColormap(colors[1:], "")
# set over-shoot to last color of list
cmap.set_over(colors[len(colors) - 1])
if self.intensity_definition == 2:
self.plot_intensity(event=event, cmap=cmap, vmin=minimum, vmax=0.01)
else:
self.plot_intensity(event=event, cmap=cmap, vmin=minimum, vmax=-1 + 0.01)
[docs]
def post_processing(self, date):
"""Date in format '2003-08-01'
Sets intensity of events starting after that date to zero"""
year = date[:4]
index_event = self.event_name.index(year)
shape_haz = self.intensity.shape
month = str_to_date(date)
for i in range(shape_haz[1]):
if self.date_start[index_event, i] >= month:
self.intensity[index_event, i] = 0
self.date_start[index_event, i] = 0
self.intensity = sparse.csr_matrix(self.intensity)
self.date_start = sparse.csr_matrix(self.date_start)
[docs]
def plot_start_end_date(self, event=None):
"""plot start and end date of the chosen event"""
startyear = str_to_date(str(int(event) - 1) + '-09-15')
startdate = str_to_date(str(int(event) - 1) + '-10-01')
enddate = str_to_date(str(int(event) + 1) + '-01-01')
dates = np.arange(np.ceil(startdate / 100) * 100,
np.ceil(startdate / 100) * 100 + 400,
100)
list_dates = list()
for i in range(len(dates)):
list_dates.append(date_to_str(dates.astype(np.int64)[i]))
colourmap = 'plasma'
boundaries = np.arange(startyear, enddate, 15)
# create list of colors from colormap
cmap_reds = mpl.cm.get_cmap(colourmap, len(boundaries))
index_thr = np.where(boundaries < startdate)[0]
colors = ["white" for x in range(len(index_thr))]
colors.extend(list(cmap_reds(np.arange(len(boundaries) - len(index_thr)))))
# define colourmap
cmap = mpl.colors.ListedColormap(colors[1:], "")
# set over-color to last color of list
cmap.set_over(colors[len(colors) - 1])
# Plot Start
self.intensity = sparse.csr_matrix(self.date_start)
self.plot_intensity(event=event, cmap=cmap, vmin=startdate,
vmax=enddate, snap="true")
plt.ylabel('Date')
plt.yticks(dates, list_dates)
# Plot End
self.intensity = sparse.csr_matrix(self.date_end)
self.plot_intensity(event=event, cmap=cmap, vmin=startdate,
vmax=enddate, snap="true")
plt.ylabel('Date')
plt.yticks(dates, list_dates)