"""
This file is part of CLIMADA.
Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
CLIMADA is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free
Software Foundation, version 3.
CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.
---
Define WildFire class.
"""
__all__ = ['WildFire']
import logging
import itertools
from dataclasses import dataclass
from datetime import date
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from scipy import sparse
from sklearn.neighbors import BallTree
from sklearn.cluster import DBSCAN
import numba
from climada.hazard.centroids.centr import Centroids
from climada.hazard.base import Hazard
from climada.util.constants import ONE_LAT_KM
import climada.util.dates_times as u_dt
import climada.util.coordinates as u_coord
LOGGER = logging.getLogger(__name__)
HAZ_TYPE = 'WF'
""" Hazard type acronym for Wild Fire, might be changed to WFseason or WFsingle """
[docs]
class WildFire(Hazard):
"""Contains wild fire events.
Wildfires comprise the challenge that the definition of an event is unclear.
Reporting standards vary accross regions and over time. Hence, to have
consistency, we consider an event as a whole fire season. A fire season
is defined as a whole year (Jan-Dec in the NHS, Jul-Jun in SHS). This allows
consistent risk assessment across the globe and over time. Hazard for
which events refer to a fire season have the haz_type 'WFseason'.
In order to perform concrete case studies or calibrate impact functions,
events can be displayed as single fires. In that case they have the haz_type
'WFsingle'.
Attributes
----------
date_end : array
integer date corresponding to the proleptic Gregorian ordinal,
where January 1 of year 1 has ordinal 1 (ordinal format of
datetime library). Represents last day of a wild fire instance
where the fire was still active.
n_fires : array
number of single fires in a fire season
"""
[docs]
@dataclass
class FirmsParams():
""" DataClass as container for firms parameters.
Attributes
----------
clean_thresh : int, default = 30
Minimal confidence value for the data from MODIS instrument
to be use as input
days_thres_firms : int, default = 2
Minimum number of days to consider different fires
clus_thres_firms : int, default = 15
Clustering factor which multiplies instrument resolution
remove_minor_fires_firms : bool, default = True
removes FIRMS fires below defined theshold of entries
minor_fire_thres_firms : int, default = 3
number of FIRMS entries required to be considered a fire
"""
clean_thresh: int = 30
days_thres_firms: int = 2
clus_thres_firms: int = 15
remove_minor_fires_firms: bool = True
minor_fire_thres_firms: int = 3
[docs]
@dataclass
class ProbaParams():
""" Dataclass as container for parameters for generation of
probabilistic events.
PLEASE BE AWARE: Parameter values did not undergo any calibration.
Attributes
----------
blurr_steps : int, default = 4
steps with exponential decay for fire propagation matrix
prop_proba : float, default = 0.21
max_it_propa : int, default = 500000
"""
blurr_steps: int = 4
prop_proba: float = 0.21
max_it_propa: int = 500000
[docs]
def __init__(self):
"""Empty constructor. """
Hazard.__init__(self, HAZ_TYPE)
self.FirmsParams = self.FirmsParams()
self.ProbaParams = self.ProbaParams()
[docs]
@classmethod
def from_hist_fire_FIRMS(cls, df_firms, centr_res_factor=1.0, centroids=None):
""" Parse FIRMS data and generate historical fires by temporal and spatial
clustering. Single fire events are defined as a set of data points
that are geographically close and/or have consecutive dates. The
unique identification is made in two steps. First a temporal
clustering is applied to cleaned data obtained from FIRMS. Data points
with acquisition dates more than days_thres_firms days apart are
in different temporal clusters. Second, for each temporal cluster,
unique event are identified by performing a spatial clustering. This
is done iteratively until all firms data points are assigned to an event.
This method sets the attributes self.n_fires, self.date_end, in
addition to all attributes required by the hazard class.
This method creates a centroids raster if centroids=None with
resolution given by centr_res_factor. The centroids can be retrieved
from Wildfire.centroids()
Parameters
----------
df_firms : pd.DataFrame
FIRMS data as pd.Dataframe
(https://firms.modaps.eosdis.nasa.gov/download/)
centr_res_factor : float, optional, default=1.0
resolution factor with respect to the satellite data to use
for centroids creation. Hence, if MODIS data (1 km res) is
used and centr_res_factor is set to 0.2, the grid spacing of
the generated centroids will equal 5 km (=1/0.2). If centroids
are defined, this parameter has no effect.
centroids : Centroids, optional
centroids in degrees to map data, centroids need to be on a
regular raster grid in order for the clustrering to work.
Returns
----------
haz : WildFire instance
"""
haz = cls()
# read and initialize data
df_firms = haz._clean_firms_df(df_firms)
# compute centroids
res_data = haz._firms_resolution(df_firms)
if not centroids:
centroids = haz._firms_centroids_creation(df_firms, res_data, centr_res_factor)
else:
if not centroids.lat.any():
centroids.set_meta_to_lat_lon()
res_centr = haz._centroids_resolution(centroids)
# fire identification
while df_firms.iter_ev.any():
# Compute cons_id: consecutive fires in current iteration
haz._firms_cons_days(df_firms)
# Compute clus_id: cluster identifier inside cons_id
haz._firms_clustering(df_firms, res_data)
# compute event_id
haz._firms_fire(df_firms)
LOGGER.info('Remaining fires to identify: %s.', str(np.argwhere(\
df_firms.iter_ev.values).size))
# remove minor fires
if haz.FirmsParams.remove_minor_fires_firms:
df_firms = haz._firms_remove_minor_fires(df_firms,
haz.FirmsParams.minor_fire_thres_firms)
# compute brightness and fill class attributes
LOGGER.info('Computing intensity of %s fires.',
np.unique(df_firms.event_id).size)
haz._calc_brightness(df_firms, centroids, res_centr)
return haz
[docs]
def set_hist_fire_FIRMS(self, *args, **kwargs):
"""This function is deprecated, use WildFire.from_hist_fire_FIRMS instead."""
LOGGER.warning("The use of WildFire.set_hist_fire_FIRMS is deprecated."
"Use WildFire.from_hist_fire_FIRMS .")
self.__dict__ = WildFire.from_hist_fire_FIRMS(*args, **kwargs).__dict__
[docs]
@classmethod
def from_hist_fire_seasons_FIRMS(cls, df_firms, centr_res_factor=1.0,
centroids=None, hemisphere=None,
year_start=None, year_end=None,
keep_all_fires=False):
""" Parse FIRMS data and generate historical fire seasons.
Individual fires are created using temporal and spatial clustering
according to the 'set_hist_fire_FIRMS' method. single fires are then
summarized to seasons using max intensity at each centroid for each year.
This method sets the attributes self.n_fires, self.date_end, in
addition to all attributes required by the hazard class.
This method creates a centroids raster if centroids=None with
resolution given by centr_res_factor. The centroids can be retrieved
from Wildfire.centroids()
Parameters
----------
df_firms : pd.DataFrame
FIRMS data as pd.Dataframe
(https://firms.modaps.eosdis.nasa.gov/download/)
centr_res_factor : float, optional, default=1.0
resolution factor with respect to the satellite data to use
for centroids creation
centroids : Centroids, optional
centroids in degrees to map data, centroids need to be on a
regular grid in order for the clustrering to work.
hemisphere : str, optional
'SHS' or 'NHS' to define fire seasons. The hemisphere parameter
is only used for the definition of the start of the fire season
year_start : int, optional
start year; FIRMS fires before that are cut; no cut if not
specified
year_end : int, optional
end year; FIRMS fires after that are cut; no cut if not
specified
keep_all_fires : bool, optional
keep list of all individual fires; default is False to save
memory. If set to true, fires are stored in self.hist_fire_seasons
Returns
----------
haz : WildFire instance
"""
LOGGER.info('Setting up historical fires for year set.')
haz = cls()
# read and initialize data
df_firms = haz._clean_firms_df(df_firms)
# compute centroids
res_data = haz._firms_resolution(df_firms)
if not centroids:
centroids = haz._firms_centroids_creation(df_firms, res_data, centr_res_factor)
else:
if not centroids.coord.size:
centroids.set_meta_to_lat_lon()
# define hemisphere
if hemisphere is None:
if centroids.lat[0] > 0:
hemisphere = 'NHS'
elif centroids.lat[0] < 0:
hemisphere = 'SHS'
if not all(i >= 0 for i in centroids.lat) or \
all(i <= 0 for i in centroids.lat):
LOGGER.warning('Not all centroids are on the same hemisphere. \
Hemisphere is set to: %s.', hemisphere)
# define years
year_i = year_start if year_start is not None else \
date.fromordinal(df_firms.datenum.min()).year
year_e = year_end if year_end is not None else \
date.fromordinal(df_firms.datenum.max()).year
years = np.arange(year_i, year_e+1)
# make fire seasons
hist_fire_seasons = [] # list to save fire seasons
for year in years:
LOGGER.info('Setting up historical fire seasons %s.', str(year))
firms_temp = haz._select_fire_season(df_firms, year, hemisphere=hemisphere)
# calculate historic fire seasons
wf_year = WildFire()
wf_year.set_hist_fire_FIRMS(firms_temp, centroids=centroids)
hist_fire_seasons.append(wf_year)
# fires season (used to define distribution of n fire for the
# probabilistic fire seasons)
n_fires = np.zeros(len(years))
for idx, wf in enumerate(hist_fire_seasons):
n_fires[idx] = len(wf.event_id)
if keep_all_fires:
haz.hist_fire_seasons = hist_fire_seasons
# save
haz.haz_type = 'WFseason'
haz.centroids = centroids
haz.n_fires = n_fires
haz.units = 'K' # Kelvin brightness
# Following values are defined for each fire
haz.event_id = np.arange(1, len(years)+1).astype(int)
haz.event_name = list(map(str, years))
haz.date = np.zeros(len(years), int)
haz.date_end = np.zeros(len(years), int)
if hemisphere == 'NHS':
for y_idx, year in enumerate(years):
haz.date[y_idx] = date.toordinal(date(year, 1, 1))
haz.date_end[y_idx] = date.toordinal(date(year, 12, 31))
elif hemisphere == 'SHS':
for y_idx, year in enumerate(years):
haz.date[y_idx] = date.toordinal(date(year, 7, 1))
haz.date_end[y_idx] = date.toordinal(date(year+1, 6, 30))
haz.orig = np.ones(len(years), bool)
haz._set_frequency()
# Following values are defined for each fire and centroid
haz.intensity = sparse.lil_matrix(np.zeros((len(years), len(centroids.lat))))
for idx, wf in enumerate(hist_fire_seasons):
haz.intensity[idx] = wf.intensity.max(axis=0)
haz.intensity = haz.intensity.tocsr()
haz.fraction = haz.intensity.copy()
haz.fraction.data.fill(1.0)
return haz
[docs]
def set_hist_fire_seasons_FIRMS(self, *args, **kwargs):
"""This function is deprecated, use WildFire.from_hist_fire_seasons_FIRMS instead."""
LOGGER.warning("The use of WildFire.set_hist_fire_seasons_FIRMS is deprecated."
"Use WildFire.from_hist_fire_seasons_FIRMS .")
self.__dict__ = WildFire.from_hist_fire_seasons_FIRMS(*args, **kwargs).__dict__
[docs]
def set_proba_fire_seasons(self, n_fire_seasons=1, n_ignitions=None,
keep_all_fires=False):
""" Generate probabilistic fire seasons.
Fire seasons are created by running n probabilistic fires per year
which are then summarized into a probabilistic fire season by
calculating the max intensity at each centroid for each probabilistic
fire season.
Probabilistic fires are created using the logic described in the
method '_run_one_bushfire'.
The fire propagation matrix can be assigned separately, if that is not
done it will be generated on the available historic fire (seasons).
Intensities are drawn randomly from historic events. Thus, this method
requires at least one fire to draw from.
This method modifies self (climada.hazard.WildFire instance)
by adding probabilistic wildfire seasons.
Parameters
----------
self : climada.Hazard.WildFire
must have calculated historic fire seasons before
n_fire_seasons : int, optional
number of fire seasons to be generated
n_ignitions : array, optional
[min, max]: min/max of uniform distribution to sample from,
in order to determin n_fire per probabilistic year set.
If none, min/max is taken from hist.
keep_all_fires : bool, optional
keep detailed list of all fires; default is False to save
memory.
"""
# min/max for uniform distribtion to sample for n_fires per year
if n_ignitions is None:
ign_min = np.min(self.n_fires)
ign_max = np.max(self.n_fires)
else:
ign_min = n_ignitions[0]
ign_max = n_ignitions[1]
prob_fire_seasons = [] # list to save probabilistic fire seasons
# create probabilistic fire seasons
for i in range(n_fire_seasons):
n_ign = np.random.randint(ign_min, ign_max)
LOGGER.info('Setting up probabilistic fire season with %s fires.',\
str(n_ign))
prob_fire_seasons.append(self._set_one_proba_fire_season(n_ign, seed=i))
if keep_all_fires:
self.prob_fire_seasons = prob_fire_seasons
# save
# Following values are defined for each fire
new_event_id = np.arange(np.max(self.event_id)+1, np.max(self.event_id)+n_fire_seasons+1)
self.event_id = np.concatenate((self.event_id, new_event_id), axis=None)
new_event_name = list(map(str, new_event_id))
self.event_name = np.append(self.event_name, new_event_name)
new_orig = np.zeros(len(new_event_id), bool)
self.orig = np.concatenate((self.orig, new_orig))
self._set_frequency()
# Following values are defined for each event and centroid
new_intensity = sparse.lil_matrix((np.zeros([n_fire_seasons, len(self.centroids.lat)])))
for idx, wf in enumerate(prob_fire_seasons):
new_intensity[idx] = sparse.csr_matrix(wf).max(0)
new_intensity = new_intensity.tocsr()
self.intensity = sparse.vstack([self.intensity, new_intensity],
format='csr')
self.fraction = self.intensity.copy()
self.fraction.data.fill(1.0)
[docs]
def combine_fires(self, event_id_merge=None, remove_rest=False,
probabilistic=False):
""" Combine events that are identified as different fire to one event
Orig fires are removed and a new fire id created; max intensity at
overlapping centroids is assigned.
This method modifies self (climada.hazard.WildFire instance) by
combining single fires.
Parameters
----------
event_id_merge : array of int, optional
events to be merged
remove_rest : bool, optional
if set to true, only the merged event is returned.
probabilistic : bool, optional
differentiate, because probabilistic events have no date.
"""
if probabilistic is False:
if event_id_merge is not None:
# get index of events to merge
evt_idx_merge = []
for i in event_id_merge:
evt_idx_merge.append(np.argwhere(self.event_id == i).reshape(-1)[0])
# get dates
date_start = np.min(self.date[evt_idx_merge])
date_end = np.max(self.date_end[evt_idx_merge])
if remove_rest:
self.intensity = sparse.csr_matrix( \
np.amax(self.intensity[evt_idx_merge], 0))
self.event_id = np.array([np.max(self.event_id)+1])
self.event_name = list(map(str, self.event_id))
self.date = np.array([date_start])
self.date_end = np.array([date_end])
self.orig = np.ones(1, bool)
self._set_frequency()
self.fraction = self.intensity.copy()
self.fraction.data.fill(1.0)
else:
# merge event & append
self.intensity = sparse.vstack([self.intensity, \
np.amax(self.intensity[evt_idx_merge], 0)], format='csr')
self.event_id = np.append(self.event_id, np.max(self.event_id)+1)
self.event_name = list(map(str, self.event_id))
self.date = np.append(self.date, date_start)
self.date_end = np.append(self.date_end, date_end)
self.orig = np.append(self.orig, np.ones(1, bool))
# remove initial events
self.intensity = self.intensity[np.delete(np.arange(0, \
self.intensity.get_shape()[0]), evt_idx_merge)]
self.event_id = np.delete(self.event_id, evt_idx_merge)
self.event_name = list(map(str, self.event_id))
self.date = np.delete(self.date, evt_idx_merge)
self.date_end = np.delete(self.date_end, evt_idx_merge)
self.orig = np.delete(self.orig, evt_idx_merge)
self._set_frequency()
self.fraction = self.intensity.copy()
self.fraction.data.fill(1.0)
else:
self.intensity = sparse.csr_matrix(np.amax(self.intensity, 0))
self.event_id = np.array([np.max(self.event_id)+1])
self.event_name = list(map(str, self.event_id))
self.date = np.array([np.min(self.date)])
self.date_end = np.array([np.max(self.date_end)])
self.orig = np.ones(1, bool)
self._set_frequency()
self.fraction = self.intensity.copy()
self.fraction.data.fill(1.0)
LOGGER.info('The merged event has event_id %s', self.event_id[-1])
else:
self.intensity = sparse.csr_matrix(np.amax(self.intensity, 0))
self.event_id = np.array([np.max(self.event_id)+1])
self.orig = np.zeros(1, bool)
self._set_frequency()
self.fraction = self.intensity.copy()
self.fraction.data.fill(1.0)
[docs]
def summarize_fires_to_seasons(self, year_start=None, year_end=None,
hemisphere=None):
""" Summarize historic fires into fire seasons.
Fires are summarized by taking the max intensity at each grid point.
This method modifies self (climada.hazard.WildFire instance) by
summarizing individual fires into seasons.
Parameters
----------
year_start : int, optional
start year; fires before that are cut; no cut if not specified
year_end : int, optional
end year; fires after that are cut; no cut if not specified
hemisphere : str, optional
'SHS' or 'NHS' to define fire seasons
"""
# define hemisphere
if hemisphere is None:
if self.centroids.lat[0] > 0:
hemisphere = 'NHS'
elif self.centroids.lat[0] < 0:
hemisphere = 'SHS'
if not all(i >= 0 for i in self.centroids.lat) or \
all(i <= 0 for i in self.centroids.lat):
LOGGER.warning('Not all centroids are on the same hemisphere. \
Hemisphere is set to: %s.', hemisphere)
# define years
fire_years = np.array([date.fromordinal(i).year for i in self.date])
years = np.arange(np.min(fire_years), np.max(fire_years)+1)
if year_start is not None:
years = np.delete(years, np.argwhere(years < year_start))
if year_end is not None:
years = np.delete(years, np.argwhere(years > year_end))
# summarize to fire season
date_new = np.zeros(len(years), int)
date_end_new = np.zeros(len(years), int)
n_fires = np.zeros(len(years), int)
intensity_new = sparse.lil_matrix(np.zeros((len(years), len(self.centroids.lat))))
for i, year in enumerate(years):
if hemisphere == 'NHS':
start = date.toordinal(date(year, 1, 1))
end = date.toordinal(date(year+1, 1, 1))
elif hemisphere == 'SHS':
start = date.toordinal(date(year, 7, 1))
end = date.toordinal(date(year+1, 7, 1))
date_new[i] = start
date_end_new[i] = end
idx = np.where((self.date > start-1) & \
(self.date < end + 1))
n_fires[i] = len(idx)
intensity_new[i] = sparse.csr_matrix( \
np.amax(self.intensity[idx], 0))
# save
self.haz_type = 'WFseason'
self.units = 'K' # Kelvin brightness
# Following values are defined for each fire season
self.event_id = np.arange(1, len(years)+1).astype(int)
self.event_name = list(map(str, years))
self.date = date_new
self.date_end = date_end_new
self.n_fires = n_fires
self.orig = np.ones(len(years), bool)
self._set_frequency()
# Following values are defined for each fire and centroid
self.intensity = intensity_new.tocsr()
self.fraction = self.intensity.copy()
self.fraction.data.fill(1.0)
#@staticmethod
def _clean_firms_df(self, df_firms):
"""Read and remove low confidence data from firms:
- MODIS: remove data where confidence values are lower than clean_thresh
- VIIRS: remove data where confidence values are set to low (keep
nominal and high values)
Parameters
----------
df_firms : pd.DataFrame
FIRMS data as pd.Dataframe
Returns
-------
df_firms : pd.DataFrame
"""
# Check for the type of instrument (MODIS vs VIIRS)
# Remove data with low confidence interval
# Uniformize the name of the birghtness columns between VIIRS and MODIS
if 'instrument' in df_firms.columns:
if (df_firms.instrument == 'MODIS').any() or (df_firms.instrument == 'VIIRS').any():
df_firms_modis = df_firms.drop(df_firms[df_firms.instrument == 'VIIRS'].index)
df_firms_modis.confidence = np.array(
list(map(int, df_firms_modis.confidence.values.tolist())))
df_firms_modis = df_firms_modis.drop(df_firms_modis[ \
df_firms_modis.confidence < self.FirmsParams.clean_thresh].index)
temp = df_firms_modis
df_firms_viirs = df_firms.drop(df_firms[df_firms.instrument == 'MODIS'].index)
if df_firms_viirs.size:
df_firms_viirs = df_firms_viirs.drop(df_firms_viirs[ \
df_firms_viirs.confidence == 'l'].index)
df_firms_viirs = df_firms_viirs.rename(columns={'bright_ti4':'brightness'})
temp = pd.concat([temp, df_firms_viirs], sort=True)
temp = temp.drop(columns=['bright_ti4'])
df_firms = temp
df_firms = df_firms.reset_index()
df_firms = df_firms.drop(columns=['index'])
df_firms['iter_ev'] = np.ones(len(df_firms), bool)
df_firms['cons_id'] = np.zeros(len(df_firms), int) - 1
df_firms['event_id'] = np.zeros(len(df_firms), int)
df_firms['clus_id'] = np.zeros(len(df_firms), int) - 1
df_firms['datenum'] = np.array(u_dt.str_to_date(df_firms['acq_date'].values))
return df_firms
@staticmethod
def _firms_resolution(df_firms):
""" Returns resolution of satellite used in FIRMS in degrees
Parameters
----------
csv_firms : pd.DataFrame or str
path to csv file of FIRMS data or FIRMS data as pd.Dataframe
Returns
-------
res_data/ONE_LAT_KM : float
resolution in degrees
"""
# Resolution in km of the centroids depends on the data origin.
if 'instrument' in df_firms.columns:
if (df_firms['instrument'] == 'MODIS').any():
res_data = 1.0
else:
res_data = 0.375 # For VIIRS data
return res_data/ONE_LAT_KM
@staticmethod
def _firms_centroids_creation(df_firms, res_data, centr_res_factor):
""" Create centroids according to the extent of the firms data.
Parameters
----------
df_firms : pd.DataFrame
FIRMS data
res_data : float
FIRMS instrument resolution in degrees
centr_res_factor : float
the factor applied to voluntarly decrease/increase the
centroids resolution. Hence, in order to set hazard resolution
of MODIS data to 4 km, a centr_res_factor of 0.25 is applied.
Returns
-------
centroids : Centroids
"""
centroids = Centroids.from_pnt_bounds((df_firms['longitude'].min(), \
df_firms['latitude'].min(), df_firms['longitude'].max(), \
df_firms['latitude'].max()), res=res_data/centr_res_factor)
centroids.set_meta_to_lat_lon()
centroids.set_area_approx()
centroids.set_on_land()
centroids.empty_geometry_points()
return centroids
@staticmethod
def _centroids_resolution(centroids):
""" Return resolution of the scalar grid of the centroids
Parameters
----------
centroids (Centroids): centroids instance
Returns
-------
res_centr : float
grid resolution of centroids
"""
if centroids.meta:
res_centr = abs(centroids.meta['transform'][4]), \
centroids.meta['transform'][0]
else:
res_centr, _ = u_coord.get_resolution(centroids.lat, centroids.lon)
if abs(abs(res_centr[0]) - abs(res_centr[1])) > 1.0e-6:
raise ValueError('Centroids are not a regular raster')
return res_centr[0]
def _firms_cons_days(self, df_firms):
""" Compute clusters of consecutive days (temporal clusters).
An interruption of n days (as defined in FirmsParams) is
necessary to be set in two different temporal clusters.
Parameters
----------
df_firms : pd.DataFrame
FIRMS data
Returns
-------
df_firms : pd.DataFrame
FIRMS data including info on temporal cluster per point
"""
LOGGER.debug('Computing clusters of consecutive days.')
firms_iter = df_firms[df_firms['iter_ev']][['datenum', 'cons_id', 'event_id']]
max_cons_id = df_firms.cons_id.max() + 1
for event_id in np.unique(firms_iter.event_id.values):
firms_cons = firms_iter[firms_iter.event_id == event_id].reset_index()
# Order firms dataframe per ascending acq_date order
firms_cons = firms_cons.sort_values('datenum')
sort_idx = firms_cons.index
firms_cons = firms_cons.reset_index()
firms_cons = firms_cons.drop(columns=['index'])
# Check if there is more than 1 day interruption between data
firms_cons.at[0, 'cons_id'] = max_cons_id
max_cons_id += 1
for index in range(1, len(firms_cons)):
if abs((firms_cons.at[index, 'datenum'] - firms_cons.at[index-1, 'datenum'])) \
>= self.FirmsParams.days_thres_firms:
firms_cons.at[index, 'cons_id'] = max_cons_id
max_cons_id += 1
else:
firms_cons.at[index, 'cons_id'] = firms_cons.at[(index-1), 'cons_id']
re_order = np.zeros(len(firms_cons), int)
for data, order in zip(firms_cons.cons_id.values, sort_idx):
re_order[order] = data
firms_iter.cons_id.values[firms_iter.event_id == event_id] = re_order
df_firms.cons_id.values[df_firms['iter_ev'].values] = firms_iter.cons_id.values
return df_firms
def _firms_clustering(self, df_firms, res_data):
""" Compute geographic clusters and sort firms with ascending clus_id
for each cons_id. Geographic clusters are identified using sci-kit
learn's DBSCAN algorithm, which finds core samples of high density
and expands clusters from them.
https://scikit-learn.org/stable/modules/generated/sklearn.cluster.DBSCAN.html
Parameters
----------
df_firms : pd.DataFrame
FIRMS data
res_data : float
FIRMS instrument resolution in degrees
clus_thres : int
Clustering factor which multiplies instrument resolution
Returns
-------
df_firms : pd.DataFrame
FIRMS data including info on spacial cluster per point
"""
LOGGER.debug('Computing geographic clusters in consecutive fires.')
firms_iter = df_firms[df_firms['iter_ev']][['latitude', 'longitude', 'cons_id',
'clus_id', 'event_id']]
for event_id in np.unique(firms_iter.event_id.values):
firms_cons = firms_iter[firms_iter.event_id == event_id]
# Creation of an identifier for geographical clustering
# For each temporal cluster, perform geographical clustering with DBSCAN algo
for cons_id in np.unique(firms_cons['cons_id'].values):
temp = np.argwhere(firms_cons['cons_id'].values == cons_id).reshape(-1,)
lat_lon = firms_cons.iloc[temp][['latitude', 'longitude']].values
lat_lon_uni, lat_lon_cpy = np.unique(lat_lon, return_inverse=True, axis=0)
cluster_id = DBSCAN(eps=res_data * self.FirmsParams.clus_thres_firms,
min_samples=1).\
fit(lat_lon_uni).labels_
cluster_id = cluster_id[lat_lon_cpy]
firms_cons.clus_id.values[temp] = cluster_id
firms_iter.clus_id.values[firms_iter.event_id == event_id] = \
firms_cons.clus_id.values
df_firms.clus_id.values[df_firms['iter_ev'].values] = firms_iter.clus_id.values
return df_firms
def _firms_fire(self, df_firms):
""" Creation of event_id for each dataset point.
A fire is characterized by a unique combination of 'cons_id' and 'clus_id'.
Parameters
----------
df_firms : pd.DataFrame
FIRMS data including info on temporal and spatial cluster per point
Returns
-------
df_firms : pd.DataFrame
FIRMS data including info on final cluster (=event) per point
"""
ev_id = 0
for cons_id in np.unique(df_firms.cons_id.values):
firms_cons = df_firms.clus_id.values[df_firms.cons_id.values == cons_id]
for clus_id in np.unique(firms_cons):
df_firms.event_id.values[np.logical_and(df_firms.cons_id.values == cons_id, \
df_firms.clus_id.values == clus_id)] = ev_id
ev_id += 1
for ev_id in np.unique(df_firms.event_id.values):
date_ord = np.sort(df_firms.datenum.values[df_firms.event_id.values == ev_id])
if (np.diff(date_ord) < self.FirmsParams.days_thres_firms).all():
df_firms.iter_ev.values[df_firms.event_id.values == ev_id] = False
else:
df_firms.iter_ev.values[df_firms.event_id.values == ev_id] = True
@staticmethod
def _firms_remove_minor_fires(df_firms, minor_fires_thres):
""" Remove fires containg fewer FIRMS entries than threshold.
A fire is characterized by a unique combination of 'cons_id' and
'clus_id'. This function modifies the df_firms in place.
Parameters
----------
df_firms : pd.DataFrame
FIRMS data
minor_fires_thres : int
threshold of FIRMS data points for an event
Returns
-------
df_firms : pd.DataFrame
FIRMS data excluding minor fire events
"""
# drop minor fires
for i in range(np.unique(df_firms.event_id).size):
if (df_firms.event_id == i).sum() < minor_fires_thres:
df_firms = df_firms.drop(df_firms[df_firms.event_id == i].index)
# assign new event IDs
event_id_new = 1
for i in np.unique(df_firms.event_id):
df_firms.event_id[df_firms.event_id == i] = event_id_new
event_id_new = event_id_new + 1
df_firms = df_firms.reset_index()
return df_firms
def _calc_brightness(self, df_firms, centroids, res_centr):
""" Compute intensity matrix per fire with the maximum brightness at
each centroid and all other hazard attributes.
This method modifies self (climada.hazard.WildFire instance) by
assigning values to the intensity matrix.
Parameters
----------
df_firms : pd.DataFrame
FIRMS data
centroids : Centroids
res_centr : float
centroids resolution in centroids unit
"""
uni_ev = np.unique(df_firms['event_id'].values)
num_ev = uni_ev.size
num_centr = centroids.size
# For one fire, if more than one points of firms dataframe have the
# same coordinates, take the maximum brightness value
# of these points (maximal damages).
tree_centr = BallTree(centroids.coord, metric='chebyshev')
if self.pool:
chunksize = max(min(num_ev//self.pool.ncpus, 1000), 1)
bright_list = self.pool.map(self._brightness_one_fire,
itertools.repeat(df_firms, num_ev),
itertools.repeat(tree_centr, num_ev),
uni_ev, itertools.repeat(res_centr),
itertools.repeat(num_centr),
chunksize=chunksize)
else:
bright_list = []
for ev_id in uni_ev:
bright_list.append(self._brightness_one_fire(df_firms, tree_centr, \
ev_id, res_centr, num_centr))
bright_list, df_firms = self._remove_empty_fires(bright_list, df_firms)
uni_ev = np.unique(df_firms['event_id'].values)
num_ev = uni_ev.size
# save
self.haz_type = 'WFsingle'
self.centroids = centroids
self.units = 'K' # Kelvin brightness
# Following values are defined for each fire
self.event_id = np.arange(1, num_ev+1).astype(int)
self.event_name = list(map(str, self.event_id))
self.date = np.zeros(num_ev, int)
self.date_end = np.zeros(num_ev, int)
for ev_idx, ev_id in enumerate(uni_ev):
self.date[ev_idx] = df_firms[df_firms.event_id == ev_id].datenum.min()
self.date_end[ev_idx] = df_firms[df_firms.event_id == ev_id].datenum.max()
self.orig = np.ones(num_ev, bool)
self._set_frequency()
# Following values are defined for each fire and centroid
self.intensity = sparse.lil_matrix(np.zeros((num_ev, num_centr)))
for idx, ev_bright in enumerate(bright_list):
self.intensity[idx] = ev_bright
self.intensity = self.intensity.tocsr()
self.fraction = self.intensity.copy()
self.fraction.data.fill(1.0)
@staticmethod
def _brightness_one_fire(df_firms, tree_centr, ev_id, res_centr, num_centr):
""" For a given fire, fill in an intensity np.array with the maximum brightness
at each centroid.
Parameters
----------
df_firms : pd.DataFrame
FIRMS data
centroids : Centroids
ev_id : int
id of the selected event
Returns
-------
brightness_ev : lil_matrix
maximum brightness at each centroids
"""
LOGGER.debug('Brightness corresponding to FIRMS event %s.', str(ev_id))
temp_firms = df_firms.reindex(
index=(np.argwhere(df_firms['event_id'].values == ev_id).reshape(-1,)),
columns=['latitude', 'longitude', 'brightness', 'datenum'])
# Identifies the unique (lat,lon) points of the firms dataframe -> lat_lon_uni
# Set the same index value for each duplicate (lat,lon) points -> lat_lon_cpy
lat_lon_uni, lat_lon_cpy = np.unique(temp_firms[['latitude', 'longitude']].values,
return_inverse=True, axis=0)
index_uni = np.unique(lat_lon_cpy, axis=0)
# Search closest centroid for each firms point
ind, _ = tree_centr.query_radius(lat_lon_uni, r=res_centr/2, count_only=False,
return_distance=True, sort_results=True)
ind = np.array([ind_i[0] if ind_i.size else -1 for ind_i in ind])
brightness_ev = _fill_intensity_max(num_centr, ind, index_uni, lat_lon_cpy,
temp_firms['brightness'].values)
return sparse.lil_matrix(brightness_ev)
@staticmethod
def _remove_empty_fires(bright_list, df_firms):
""" Removes instances which were identified as an eventbut which
contain no intensity. This happens for events which occur
outside of the defined centroids.
Parameters
----------
bright_list : list
idnividual wild fires
firms : pd.DataFrame
FIRMS data
Returns
-------
bright_list_nonzero : list
list with events that occured on the defined centroids
firms : pd.DataFrame
FIRMS data (with data that occured on the defined centroids)
"""
bright_list_nonzero = []
event_id_new = 1
for i, br_list in enumerate(bright_list):
if br_list.count_nonzero() > 0:
bright_list_nonzero.append(br_list)
df_firms.event_id.values[df_firms.event_id == i+1] = event_id_new
event_id_new = event_id_new + 1
else:
df_firms = df_firms.drop(df_firms[df_firms.event_id == i].index)
df_firms = df_firms.reset_index()
LOGGER.info('Returning %s fires that impacted the defined centroids.',
len(bright_list_nonzero))
return bright_list_nonzero, df_firms
def _set_one_proba_fire_season(self, n_ignitions, seed=8):
""" Generate a probabilistic fire season.
Parameters
----------
n_ignitions : int
number of wild fires for the season
seed : int
Returns
-------
proba_fires : lil_matrix
probablistic hazard
"""
np.random.seed(seed)
proba_fires = sparse.lil_matrix(np.zeros((n_ignitions, self.centroids.size)))
for i in range(n_ignitions):
if np.mod(i, 10) == 0:
LOGGER.info('Created %s fires', str(i))
centr_burned = self._run_one_fire()
proba_fires[i, :] = self._set_proba_intensity(centr_burned)
return proba_fires
def _run_one_fire(self):
""" Run one bushfire on a fire propagation probability matrix.
If the matrix is not defined, it is constructed using past fire
experience -> a fire can only propagate on centroids that burned
in the past including a exponentially blurred range around the
historic fires.
The ignition point of a fire can be on any centroid, on which
the propagation probability equals 1. The fire is then propagated
with a cellular automat.
If the fire has not stopped burning after a defined number of
iterations (self.ProbaParams.max_it_propa, default=500'000),
the propagation is interrupted.
Propagation rules:
1. select a burning centroid (at the start there is only one,
afterwards select one randomly)
2. every adjunct centroid to the selected centroid can start
burning with a probability of the overall propagation
probability (self.ProbaParams.prop_proba) times the
centroid specific propagation probability (which is
defined on the fire_propa_matrix).
3. the selected burning centroid becomes an ember centroid
which can not start burning again and thus no longer
propagate any fire.
Properties from centr_burned:
0 = unburned centroid
1 = burning centroid
2 = ember centroid
Stop criteria: the propagation stops when no centroid is burning.
The initial version of this code was inspired by
https://scipython.com/blog/the-forest-fire-model/
Parameters
----------
self : climada.hazard.WildFire instance
needs to contain information of at least 1 historic wildfire
Returns
-------
centr_burned : np.array
array indicating which centroids burned
"""
# set fire propagation matrix if not already defined
if not hasattr(self.centroids, 'fire_propa_matrix'):
self._set_fire_propa_matrix()
# Ignation only at centroids that burned in the past
pos_centr = np.argwhere(self.centroids.fire_propa_matrix.reshape( \
len(self.centroids.lat)) == 1)[:, 1]
LOGGER.debug('Start ignition.')
# Random selection of ignition centroid
for _ in range(self.centroids.size):
centr = np.random.choice(pos_centr)
centr_ix = int(centr/self.centroids.shape[1])
centr_iy = centr%self.centroids.shape[1]
centr_ix = max(0, centr_ix)
centr_ix = min(self.centroids.shape[0]-1, centr_ix)
centr_iy = max(0, centr_iy)
centr_iy = min(self.centroids.shape[1]-1, centr_iy)
centr = centr_ix*self.centroids.shape[1] + centr_iy
if 1 <= centr_ix < self.centroids.shape[0] - 1 and \
1 <= centr_iy < self.centroids.shape[1] - 1:
break
LOGGER.debug('Propagate fire.')
centr_burned = np.zeros((self.centroids.shape), int)
centr_burned[centr_ix, centr_iy] = 1
# Iterate the fire according to the propagation rules
count_it = 0
while np.any(centr_burned == 1) and count_it < self.ProbaParams.max_it_propa:
count_it += 1
# Select randomly one of the burning centroids
# and propagate throught its neighborhood
burned = np.argwhere(centr_burned == 1)
if len(burned) > 1:
centr_ix, centr_iy = burned[np.random.randint(0, len(burned))]
elif len(burned) == 1:
centr_ix, centr_iy = burned[0]
if not count_it % (self.ProbaParams.max_it_propa-1):
LOGGER.warning('Fire propagation not converging at iteration %s.',
count_it)
if 1 <= centr_ix < self.centroids.shape[0]-1 and \
1 <= centr_iy < self.centroids.shape[1]-1 and \
self.centroids.on_land[(centr_ix*self.centroids.shape[1] + centr_iy)]:
centr_burned = self._fire_propagation_on_matrix(self.centroids.shape, \
self.centroids.fire_propa_matrix, self.ProbaParams.prop_proba, \
centr_ix, centr_iy, centr_burned, np.random.random(500))
return centr_burned
@staticmethod
@numba.njit
def _fire_propagation_on_matrix(centr_shape, fire_propa_matrix, prop_proba,
centr_ix, centr_iy, centr_burned, prob_array):
""" Propagation of the fire in the 8 neighbouring cells around
(centr_ix, centr_iy) according to propagation rules.
Parameters
----------
centr_shape : np.array
shape of centroids array
fire_propa_matrix : np.array
fire proagation matrix indicating centroid specific fire
spread probability
prop_proba : float
global propagation probability
centr_ix : int
x coordinates of the burning centroid in the centroids matrix
centr_iy : int
y coordinates of the burning centroid in the centroids matrix
centr_burned : np.array
array containing information on burned centroids
prob_array: np.array
array of random numbers to draw from for random fire propagation
Returns
-------
centr_burned : np.array
updated centr_burned matrix
"""
# Neighbourhood
hood = ((-1, -1), (-1, 0), (-1, 1), (0, -1), (0, 1), (1, -1), (1, 0), (1, 1))
# Creation of the temporary centroids grids
burned_one_step = np.zeros(centr_shape, dtype=numba.int32)
burned_one_step[centr_ix, centr_iy] = 1
# Displacements from a centroid to its eight neighbours
for i_neig, (delta_x, delta_y) in enumerate(hood):
if prob_array[i_neig] <= prop_proba * \
fire_propa_matrix[centr_ix+delta_x, centr_iy+delta_y] and \
centr_burned[centr_ix+delta_x, centr_iy+delta_y] == 0:
burned_one_step[centr_ix+delta_x, centr_iy+delta_y] = 1
# Calculate burned area
centr_burned += burned_one_step
return centr_burned
def _set_proba_intensity(self, centr_burned):
""" The intensity values are chosen randomly at every burned centroid
from the intensity values of the historical fire
Parameters
----------
self : climada.hazard.WildFire instance
centr_burned : np.array
array indicating which centroids burned
Returns
-------
proba_intensity : lil_matrix
hazard intensity matrix of generated probabilistic fire
"""
# The brightness values are chosen randomly at every burned centroids
# from the brightness values of the historical fire
ev_proba_uni = centr_burned.nonzero()[0] * self.centroids.shape[1] + \
centr_burned.nonzero()[1]
proba_intensity = sparse.lil_matrix(np.zeros((1, self.centroids.size)))
for ev_prob in ev_proba_uni:
proba_intensity[0, ev_prob] = np.random.choice( \
self.intensity.data)
return proba_intensity
def _set_fire_propa_matrix(self):
""" sets fire propagation matrix which is used to propagate
probabilistic fires. The matrix is set so that burn probability on
centroids which burned historically is set to 1. A blurr with
exponential decay of burn probabilities is set around these
centroids. The blurr width is defined within self.ProbaParams
Alternatively, the fire propagation probability matrix can be any
matrix that coresponds to the shape of the centroids and thus not have
to be set this way.
This method modifies self (climada.hazard.WildFire instance) by
populating self.centroids.fire_propa_matrix as np.array
Parameters
----------
self : climada.hazard.WildFire instance
"""
# historically burned centroids
hist_burned = np.zeros(self.centroids.lat.shape, dtype=bool)
hist_burned = self.intensity.sum(0) > 0.
self.centroids.hist_burned = hist_burned
fire_propa_matrix = hist_burned.reshape(self.centroids.shape).astype(float)
blurr_lvl = np.ones(self.ProbaParams.blurr_steps)
# exponential decay of fire propagation on cell level
for i in range(self.ProbaParams.blurr_steps):
blurr_lvl[i] = 2.**(-i)
# Neighbourhood
hood = ((-1, -1), (-1, 0), (-1, 1), (0, -1), (0, 1), (1, -1), (1, 0), (1, 1))
# loop over matrix to create blurr around historical fires
for blurr in range(self.ProbaParams.blurr_steps-1):
for i in range(fire_propa_matrix.shape[0]):
for j in range(fire_propa_matrix.shape[1]):
if fire_propa_matrix[i, j] == blurr_lvl[blurr]:
for _, (delta_x, delta_y) in enumerate(hood):
try:
if fire_propa_matrix[i+delta_x, j+delta_y] == 0:
fire_propa_matrix[i+delta_x, j+delta_y] = blurr_lvl[blurr+1]
except IndexError:
pass
self.centroids.fire_propa_matrix = fire_propa_matrix
[docs]
def plot_fire_prob_matrix(self):
""" Plots fire propagation probability matrix as contour plot.
At this point just to check the matrix but could easily be improved to
normal map.
Parameters
----------
self : climada.hazard.WildFire instance
Returns
-------
contour plot : plt
contour plot of fire_propa_matrix
"""
lon = np.reshape(self.centroids.lon, self.centroids.fire_propa_matrix.shape)
lat = np.reshape(self.centroids.lat, self.centroids.fire_propa_matrix.shape)
plt.contourf(lon, lat, self.centroids.fire_propa_matrix)
@staticmethod
def _select_fire_season(df_firms, year, hemisphere='SHS'):
""" Selects data to create historic fire season. Need to
differentiate between Northern & Souther hemisphere
Parameters
----------
firms : pd.DataFrame
FIRMS data
year : int
hemisphere : str, optional
'NHS' or 'SHS'
Returns
-------
firms : pd.DataFrame
FIRMS data for specified fire season
"""
df_firms['date'] = df_firms['acq_date'].apply(pd.to_datetime)
if hemisphere == 'NHS':
start = pd.Timestamp(year, 1, 1)
end = pd.Timestamp(year+1, 1, 1)
elif hemisphere == 'SHS':
start = pd.Timestamp(year, 7, 1)
end = pd.Timestamp(year+1, 7, 1)
df_firms = df_firms[(df_firms['date'] > start) & (df_firms['date'] < end)]
df_firms = df_firms.drop('date', axis=1)
return df_firms
def _set_frequency(self):
"""Set hazard frequency from intensity matrix.
Parameters
----------
self : climada.hazard.WildFire instance
Returns
-------
self.frequency : np.array
"""
delta_time = date.fromordinal(int(np.max(self.date))).year - \
date.fromordinal(int(np.min(self.date))).year + 1
num_orig = self.orig.nonzero()[0].size
if num_orig > 0:
ens_size = self.event_id.size / num_orig
else:
ens_size = 1
self.frequency = np.ones(self.event_id.size) / delta_time / ens_size
@numba.njit
def _fill_intensity_max(num_centr, ind, index_uni, lat_lon_cpy, fir_bright):
""" Assigns maximum intensity value for each centroid. This is required
as it can happen that several firms data points are mapped on to one
centroid.
Parameters
----------
num_centr : int
number of centroids
ind : np.array
index of closest centroid of each firms point
index_uni : np.array
unique index of each centroid
lat_lon_cpy : np.array
lat /lon information of each firms point
fir_bright : np.array
brightness of each firms data point
Returns
-------
brightness_ev : np.array
maximum brightness at each centroids
"""
brightness_ev = np.zeros((1, num_centr), dtype=numba.float64)
for idx in range(index_uni.size):
if ind[idx] != -1:
brightness_ev[0, ind[idx]] = max(brightness_ev[0, ind[idx]], \
np.max(fir_bright[lat_lon_cpy == index_uni[idx]]))
return brightness_ev