"""
This file is part of CLIMADA.
Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
CLIMADA is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free
Software Foundation, version 3.
CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.
---
Define GDP2Asset class.
"""
__all__ = ['GDP2Asset']
import logging
from pathlib import Path
import numpy as np
import pandas as pd
import xarray as xr
import scipy as sp
import climada.util.coordinates as u_coord
from climada.util.constants import RIVER_FLOOD_REGIONS_CSV, SYSTEM_DIR
from climada.entity import Exposures, INDICATOR_IMPF
# Module-level logger named after this module.
LOGGER = logging.getLogger(__name__)
# Hazard-type suffix appended to INDICATOR_IMPF to name the impact function
# column of the exposures built in this module.
DEF_HAZ_TYPE = 'RF'
# File in SYSTEM_DIR providing the `conversion_factor` grid that _read_GDP
# uses to turn GDP values into asset values.
CONVERTER = SYSTEM_DIR.joinpath('GDP2Asset_converter_2.5arcmin.nc')
# Exposures class holding asset values derived from gridded GDP data.
class GDP2Asset(Exposures):
    """Exposures whose values are assets derived from gridded GDP data.

    Every exposure point carries a river flood ('RF') impact function id and
    a region id taken from the RIVER_FLOOD_REGIONS_CSV lookup table.
    """

    def set_countries(self, countries=None, reg=None, ref_year=2000,
                      path=None):
        """Model countries using the GDP values of the reference year.

        The instance is re-initialised with the concatenated exposures of
        all requested countries and its raster meta data is set.

        Parameters
        ----------
        countries : list of str or str, optional
            ISO3 country code(s). Takes precedence over ``reg``.
        reg : list, optional
            region identifier(s), resolved to ISO3 codes with
            ``u_coord.region2isos``; only used if ``countries`` is empty.
        ref_year : int, optional
            reference year. Default: 2000
        path : str or Path
            path to exposure dataset (ISIMIP)

        Raises
        ------
        NameError
            if ``path`` is None or does not point to an existing file
        ValueError
            if neither ``countries`` nor ``reg`` is given
        KeyError
            if a country could not be set, e.g. unknown ISO3 code or no
            data available for ``ref_year``
        """
        if path is None:
            raise NameError('No path for exposure data set')
        if not Path(path).is_file():
            raise NameError('Invalid path %s' % path)

        # a bare ISO3 string would otherwise be iterated character by character
        if isinstance(countries, str):
            countries = [countries]

        try:
            # explicit emptiness tests: `not countries` is ambiguous for
            # numpy arrays holding more than one element
            if countries is None or len(countries) == 0:
                if reg is None or len(reg) == 0:
                    raise ValueError('set_countries requires countries or reg')
                countries = np.array(u_coord.region2isos(reg))

            gdp2a_list = [
                self._set_one_country(country, ref_year, path)
                for country in countries
            ]
        except KeyError as err:
            raise KeyError(f'Exposure countries: {countries} or reg {reg} could not be set, '
                           f'check ISO3 or reference year {ref_year}') from err

        Exposures.__init__(
            self,
            data=Exposures.concat(gdp2a_list).gdf,
            ref_year=ref_year,
            value_unit='USD'
        )

        # set meta
        res = 0.0416666  # 2.5 arcmin expressed in degrees (2.5 / 60)
        rows, cols, ras_trans = u_coord.pts_to_raster_meta(
            (self.longitude.min(), self.latitude.min(),
             self.longitude.max(), self.latitude.max()), res)
        self.meta = {'width': cols, 'height': rows, 'crs': self.crs,
                     'transform': ras_trans}

    @staticmethod
    def _set_one_country(countryISO, ref_year, path=None):
        """Build the exposure of a single country.

        Grid points of the country are taken from the "isimip" basemap, the
        asset values at these points are read with ``_read_GDP`` and the
        region id and impact function id are looked up in
        RIVER_FLOOD_REGIONS_CSV.

        Parameters
        ----------
        countryISO : str
            ISO3 of country
        ref_year : int
            year under consideration
        path : str
            path for gdp-files

        Raises
        ------
        KeyError, OSError

        Returns
        -------
        GDP2Asset
        """
        natID = u_coord.country_iso2natid(countryISO)
        natID_info = pd.read_csv(RIVER_FLOOD_REGIONS_CSV)
        reg_id, impf_rf = _fast_impf_mapping(natID, natID_info)

        lat, lon = u_coord.get_region_gridpoints(
            countries=[natID], iso=False, basemap="isimip")
        coord = np.stack([lat, lon], axis=1)
        assets = _read_GDP(coord, ref_year, path)

        # the same region id / impact function id applies to every point
        n_points = len(assets)
        return GDP2Asset(
            ref_year=ref_year,
            value=assets,
            lat=coord[:, 0],
            lon=coord[:, 1],
            data={
                INDICATOR_IMPF + DEF_HAZ_TYPE: np.full(n_points, impf_rf),
                'region_id': np.full(n_points, reg_id),
            }
        )
def _read_GDP(shp_exposures, ref_year, path=None):
    """Read GDP-values for the selected area and convert it to asset.

    Parameters
    ----------
    shp_exposures : 2d-array float
        coordinates of area, one row per point with latitude in column 0
        and longitude in column 1
    ref_year : int
        year under consideration
    path : str
        path for gdp-files

    Raises
    ------
    KeyError
        if the GDP file holds no data for ``ref_year``
    OSError
        if a dataset cannot be read or the GDP grid does not cover the
        requested points

    Returns
    -------
    np.array
        asset value at each row of ``shp_exposures``
    """
    try:
        # context managers guarantee that both files are closed again; all
        # arrays needed later are pulled into memory while the files are open
        with xr.open_dataset(path) as gdp_file:
            with xr.open_dataset(CONVERTER) as asset_converter:
                gdp_lon = gdp_file.lon.values
                gdp_lat = gdp_file.lat.values
                years = gdp_file.time.dt.year.values

                year_matches = np.flatnonzero(years == ref_year)
                if year_matches.size == 0:
                    raise KeyError(f'No data available for year {ref_year}')
                gdp = gdp_file.gdp_grid[year_matches[0], :, :].values

                conv_lon = asset_converter.lon.values
                conv_lat = asset_converter.lat.values
                conv_factors = asset_converter.conversion_factor.values
    except OSError as err:
        raise OSError(f'Problems while reading {path} check exposure_file specifications') from err

    _test_gdp_centr_match(gdp_lat, gdp_lon, shp_exposures)

    points = (shp_exposures[:, 0], shp_exposures[:, 1])

    def _nearest(grid_lat, grid_lon, grid_values):
        """Nearest-neighbour value of a lat/lon grid at `points` (NaN -> 0)."""
        return sp.interpolate.interpn((grid_lat, grid_lon),
                                      np.nan_to_num(grid_values),
                                      points,
                                      method='nearest',
                                      bounds_error=False,
                                      fill_value=None)

    if gdp.shape == conv_factors.shape:
        # equally shaped grids: convert cell by cell, then sample once
        return _nearest(gdp_lat, gdp_lon, gdp * conv_factors)

    # differently shaped grids: sample each one at the points, then convert
    conv_at_points = _nearest(conv_lat, conv_lon, conv_factors)
    gdp_at_points = _nearest(gdp_lat, gdp_lon, gdp)
    return gdp_at_points * conv_at_points
def _test_gdp_centr_match(gdp_lat, gdp_lon, shp_exposures):
    """Check that the GDP grid covers the requested exposure points.

    The points may lie up to 0.5 (in coordinate units) outside the outermost
    grid coordinates in every direction.

    Parameters
    ----------
    gdp_lat : 1d-array float
        latitudes of the GDP grid
    gdp_lon : 1d-array float
        longitudes of the GDP grid
    shp_exposures : 2d-array float
        points to check, latitude in column 0 and longitude in column 1

    Raises
    ------
    IOError
        if any point lies outside the tolerated extent
    """
    tolerance = 0.5
    point_lat = shp_exposures[:, 0]
    point_lon = shp_exposures[:, 1]

    exceeds = (
        max(point_lat) > max(gdp_lat) + tolerance,
        max(point_lon) > max(gdp_lon) + tolerance,
        min(point_lat) < min(gdp_lat) - tolerance,
        min(point_lon) < min(gdp_lon) - tolerance,
    )
    if any(exceeds):
        raise IOError('Asset Data does not match selected country')
def _fast_impf_mapping(countryID, natID_info):
    """Assign region-ID and impact function id.

    Parameters
    ----------
    countryID : int
        numeric country id, matched against the 'ID' column
    natID_info : pandas.DataFrame
        lookup table with the columns 'ID', 'impf_RF' and 'Reg_ID'

    Raises
    ------
    KeyError
        if ``countryID`` is negative or larger than the largest 'ID'

    Returns
    -------
    float, float
        region id and impact function id; both are NaN if ``countryID``
        lies within the id range but is not listed in the table
    """
    ids = natID_info['ID'].to_numpy()
    table_size = int(ids.max()) + 1

    # dense lookup tables indexed by id; ids absent from the table stay NaN
    reg_lookup = np.full(table_size, np.nan)
    reg_lookup[ids] = natID_info['Reg_ID'].to_numpy()
    impf_lookup = np.full(table_size, np.nan)
    impf_lookup[ids] = natID_info['impf_RF'].to_numpy()

    # numpy raises IndexError (not KeyError) past the end and silently wraps
    # negative indices, so the range is validated explicitly
    if not 0 <= countryID < table_size:
        raise KeyError(f'Country ISO unknown: {countryID}')

    return reg_lookup[countryID], impf_lookup[countryID]