Source code for climada_petals.entity.exposures.black_marble

"""
This file is part of CLIMADA.

Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.

CLIMADA is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free
Software Foundation, version 3.

CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE.  See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.

---

Define BlackMarble class.
"""

__all__ = ['BlackMarble']

import logging
import math
from pathlib import Path

import numpy as np
from numpy.polynomial.polynomial import polyval
from scipy import ndimage
import shapely.vectorized
from cartopy.io import shapereader

from climada.entity.exposures.base import Exposures, INDICATOR_IMPF
from climada.entity.exposures.litpop import nightlight as nl_utils
from climada.util.constants import SYSTEM_DIR, DEF_CRS
from climada.util.finance import gdp, income_group
import climada.util.coordinates as u_coord

LOGGER = logging.getLogger(__name__)

DEF_RES_NOAA_KM = 1
"""Default approximate resolution for NOAA NGDC nightlights in km."""

DEF_RES_NASA_KM = 0.5
"""Default approximate resolution for NASA's nightlights in km."""

DEF_HAZ_TYPE = 'TC'
"""Default hazard type used in impact functions id."""

DEF_POLY_VAL = [0, 0, 1]
"""Default polynomial transformation used."""

[docs] class BlackMarble(Exposures): """Defines exposures from night light intensity, GDP and income group. Attribute region_id is defined as: - United Nations Statistics Division (UNSD) 3-digit equivalent numeric code - 0 if country not found in UNSD. - -1 for water """ _metadata = Exposures._metadata + ['nightlight_file']
[docs] def set_countries(self, countries, ref_year=2016, res_km=None, from_hr=None, admin_file='admin_0_countries', **kwargs): """ Model countries using values at reference year. If GDP or income group not available for that year, consider the value of the closest available year. Parameters ---------- countries : list or dict list of country names (admin0 or subunits) or dict with key = admin0 name and value = [admin1 names] ref_year : int, optional reference year. Default: 2016 res_km : float, optional approx resolution in km. Default: nightlights resolution. from_hr : bool, optional force to use higher resolution image, independently of its year of acquisition. admin_file : str file name, admin_0_countries or admin_0_map_subunits kwargs : optional 'gdp' and 'inc_grp' dictionaries with keys the country ISO_alpha3 code. 'poly_val' list of polynomial coefficients [1,x,x^2,...] to apply to nightlight (DEF_POLY_VAL used if not provided). If provided, these are used. """ admin_key_dict = {'admin_0_countries': ['ADMIN', 'ADM0_A3'], 'admin_0_map_subunits': ['SUBUNIT', 'SU_A3']} shp_file = shapereader.natural_earth(resolution='10m', category='cultural', name=admin_file) shp_file = shapereader.Reader(shp_file) cntry_info, cntry_admin1 = country_iso_geom(countries, shp_file, admin_key_dict[admin_file]) fill_econ_indicators(ref_year, cntry_info, shp_file, **kwargs) nightlight, coord_nl, fn_nl, res_fact, res_km = get_nightlight( ref_year, cntry_info, res_km, from_hr) bkmrbl_list = [] descr_lines = [] for cntry_iso, cntry_val in cntry_info.items(): bkmrbl_list.append( self._set_one_country(cntry_val, nightlight, coord_nl, res_fact, res_km, cntry_admin1[cntry_iso], **kwargs).gdf) descr_lines.append(f"{cntry_val[1]} {cntry_val[3]:d}" f" GDP: {cntry_val[4]:.3e} income group: {cntry_val[5]:d}") BlackMarble.__init__( self, data=Exposures.concat(bkmrbl_list).gdf, crs=DEF_CRS, ref_year=ref_year, value_unit='USD', description="\n".join(descr_lines), nightlight_file = Path(fn_nl).name, ) rows, cols, ras_trans = u_coord.pts_to_raster_meta( (self.gdf.longitude.min(), self.gdf.latitude.min(), self.gdf.longitude.max(), self.gdf.latitude.max()), (coord_nl[0, 1], -coord_nl[0, 1]) ) self.meta = {'width': cols, 'height': rows, 'crs': self.crs, 'transform': ras_trans}
@staticmethod def _set_one_country(cntry_info, nightlight, coord_nl, res_fact, res_km, admin1_geom, **kwargs): """Model one country. Parameters ---------- cntry_info : lsit [cntry_id, cnytry_name, cntry_geometry, ref_year, gdp, income_group] nightlight : np.array nightlight in 30arcsec ~ 1km resolution. Row latitudes, col longitudes coord_nl : np.array nightlight coordinates: [[min_lat, lat_step], [min_lon, lon_step]] res_fact : float resampling factor res_km : float wished resolution in km admin1_geom : list list of admin1 geometries to filter poly_val : list list of polynomial coefficients to apply to nightlight """ LOGGER.info('Processing country %s.', cntry_info[1]) if 'poly_val' in kwargs: poly_val = kwargs['poly_val'] else: poly_val = DEF_POLY_VAL geom = cntry_info[2] nightlight_reg, lat_reg, lon_reg, on_land = _cut_country(geom, nightlight, coord_nl) nightlight_reg = _set_econ_indicators(nightlight_reg, cntry_info[4], cntry_info[5], poly_val) if admin1_geom: nightlight_reg, lat_reg, lon_reg, geom, on_land = _cut_admin1( nightlight_reg, lat_reg, lon_reg, admin1_geom, coord_nl, on_land) LOGGER.info('Generating resolution of approx %s km.', res_km) nightlight_reg, lat_reg, lon_reg = _resample_land(geom, nightlight_reg, lat_reg, lon_reg, res_fact, on_land) exp_bkmrb = BlackMarble(data={ 'value': np.asarray(nightlight_reg).reshape(-1,), 'latitude': lat_reg, 'longitude': lon_reg, }) exp_bkmrb.gdf['region_id'] = cntry_info[0] exp_bkmrb.gdf[INDICATOR_IMPF] = 1 return exp_bkmrb
def country_iso_geom(countries, shp_file, admin_key=['ADMIN', 'ADM0_A3']): """ Get country ISO alpha_3, country id (defined as the United Nations Statistics Division (UNSD) 3-digit equivalent numeric codes and 0 if country not found) and country's geometry shape. Parameters ---------- countries : list or dict list of country names (admin0) or dict with key = admin0 name and value = [admin1 names] shp_file : cartopy.io.shapereader.Reader shape file admin_key: str key to find admin0 or subunit name Returns ------- cntry_info : dict key = ISO alpha_3 country, value = [country id, country name, country geometry], cntry_admin1 : dict key = ISO alpha_3 country, value = [admin1 geometries] """ countries_shp = {} list_records = list(shp_file.records()) for info_idx, info in enumerate(list_records): countries_shp[info.attributes[admin_key[0]].title()] = info_idx cntry_info = dict() cntry_admin1 = dict() if isinstance(countries, list): countries = {cntry: [] for cntry in countries} admin1_rec = list() else: admin1_rec = shapereader.natural_earth(resolution='10m', category='cultural', name='admin_1_states_provinces') admin1_rec = shapereader.Reader(admin1_rec) admin1_rec = list(admin1_rec.records()) for country_name, prov_list in countries.items(): country_idx = countries_shp.get(country_name.title()) if country_idx is None: options = [country_opt for country_opt in countries_shp if country_name.title() in country_opt] if not options: options = list(countries_shp.keys()) raise ValueError('Country %s not found. Possible options: %s' % (country_name, options)) iso3 = list_records[country_idx].attributes[admin_key[1]] try: cntry_id = u_coord.country_to_iso(iso3, "numeric") except LookupError: cntry_id = 0 cntry_info[iso3] = [cntry_id, country_name.title(), list_records[country_idx].geometry] cntry_admin1[iso3] = _fill_admin1_geom(iso3, admin1_rec, prov_list) return cntry_info, cntry_admin1 def fill_econ_indicators(ref_year, cntry_info, shp_file, **kwargs): """Get GDP and income group per country in reference year, or it closest one. Source: world bank. Natural earth repository used when missing data. Modifies country info with values [country id, country name, country geometry, ref_year, gdp, income_group]. Parameters ---------- ref_year : int reference year cntry_info : dict key = ISO alpha_3 country, value = [country id, country name, country geometry] kwargs : optional 'gdp' and 'inc_grp' dictionaries with keys the country ISO_alpha3 code. If provided, these are used """ for cntry_iso, cntry_val in cntry_info.items(): cntry_val.append(ref_year) if 'gdp' in kwargs and kwargs['gdp'][cntry_iso] != '': gdp_val = kwargs['gdp'][cntry_iso] else: _, gdp_val = gdp(cntry_iso, ref_year, shp_file) cntry_val.append(gdp_val) if 'inc_grp' in kwargs and kwargs['inc_grp'][cntry_iso] != '': inc_grp = kwargs['inc_grp'][cntry_iso] else: _, inc_grp = income_group(cntry_iso, ref_year, shp_file) cntry_val.append(inc_grp) def get_nightlight(ref_year, cntry_info, res_km=None, from_hr=None): """Obtain nightlight from different sources depending on reference year. Compute resolution factor used at resampling depending on source. Parameters ---------- ref_year : int reference year cntry_info : dict key = ISO alpha_3 country, value = [country id, country name, country geometry] res_km : float approx resolution in km. from_hr : bool, optional Returns ------- nightlight (sparse.csr_matrix), coord_nl (np.array), fn_nl (str), res_fact (float) """ if from_hr is None and ref_year > 2013: from_hr = True elif from_hr is None and ref_year <= 2013: from_hr = False if from_hr: if not res_km: res_km = 0.5 nl_year = ref_year if ref_year > 2013: nl_year = 2016 else: nl_year = 2012 LOGGER.info("Nightlights from NASA's earth observatory for year %s.", str(nl_year)) res_fact = DEF_RES_NASA_KM / res_km geom = [info[2] for info in cntry_info.values()] geom = shapely.ops.cascaded_union(geom) req_files = nl_utils.get_required_nl_files(geom.bounds) files_exist = nl_utils.check_nl_local_file_exists(req_files, SYSTEM_DIR, nl_year) nl_utils.download_nl_files(req_files, files_exist, SYSTEM_DIR, nl_year) # nightlight intensity with 15 arcsec resolution nightlight, coord_nl = nl_utils.load_nightlight_nasa(geom.bounds, req_files, nl_year) fn_nl = [file.replace('*', str(nl_year)) for idx, file in enumerate(nl_utils.BM_FILENAMES) if req_files[idx]] fn_nl = ' + '.join(fn_nl) else: if not res_km: res_km = 1.0 nl_year = ref_year if ref_year < 1992: nl_year = 1992 elif ref_year > 2013: nl_year = 2013 LOGGER.info("Nightlights from NOAA's earth observation group for year %s.", str(nl_year)) res_fact = DEF_RES_NOAA_KM / res_km # nightlight intensity with 30 arcsec resolution nightlight, coord_nl, fn_nl = nl_utils.load_nightlight_noaa(nl_year) return nightlight, coord_nl, fn_nl, res_fact, res_km def _fill_admin1_geom(iso3, admin1_rec, prov_list): """Get admin1 polygons for each input province of country iso3. Parameters ---------- iso3 : str admin0 country name in alpha3 admin1_rec : list list of admin1 records prov_list : list province names Returns ------- list(geometry) """ prov_geom = list() for prov in prov_list: found = False for rec in admin1_rec: if prov == rec.attributes['name'] and \ rec.attributes['adm0_a3'] == iso3: found = True prov_geom.append(rec.geometry) break if not found: options = [rec.attributes['name'] for rec in admin1_rec if rec.attributes['adm0_a3'] == iso3] raise ValueError('%s not found. Possible provinces of %s are: %s' % (prov, iso3, options)) return prov_geom def _cut_admin1(nightlight, lat, lon, admin1_geom, coord_nl, on_land): """Cut nightlight image on box containing all the admin1 territories. Parameters ---------- nightlight : np.array nightlight values lat : np.array latitude values in meshgrid lon : np.array longitude values in meshgrid admin1_geom : list(shapely.geometry) all admin1 geometries coord_nl : np.array nightlight coordinates: [[min_lat, lat_step], [min_lon, lon_step]] on_land : np.array array with true values in land points. same size as nightlight, lat, lon Returns ------- nightlight_reg, lat_reg, lon_reg (2d arrays with nightlight values, and coordinates in a square containing the admin1) on_land_reg (2d array of same size as previous with True values on land points) """ all_geom = shapely.ops.cascaded_union(admin1_geom) in_lat = (math.floor((all_geom.bounds[1] - lat[0, 0]) / coord_nl[0, 1]), math.ceil((all_geom.bounds[3] - lat[0, 0]) / coord_nl[0, 1])) in_lon = (math.floor((all_geom.bounds[0] - lon[0, 0]) / coord_nl[1, 1]), math.ceil((all_geom.bounds[2] - lon[0, 0]) / coord_nl[1, 1])) nightlight_reg = nightlight[in_lat[0]:in_lat[-1] + 1, :][:, in_lon[0]:in_lon[-1] + 1] nightlight_reg[nightlight_reg < 0.0] = 0.0 lat_reg, lon_reg = np.mgrid[lat[0, 0] + in_lat[0] * coord_nl[0, 1]: lat[0, 0] + in_lat[1] * coord_nl[0, 1]: complex(0, nightlight_reg.shape[0]), lon[0, 0] + in_lon[0] * coord_nl[1, 1]: lon[0, 0] + in_lon[1] * coord_nl[1, 1]: complex(0, nightlight_reg.shape[1])] on_land_reg = on_land[in_lat[0]:in_lat[-1] + 1, :][:, in_lon[0]:in_lon[-1] + 1] return nightlight_reg, lat_reg, lon_reg, all_geom, on_land_reg def _cut_country(geom, nightlight, coord_nl): """Cut nightlight image on box containing all the land. Parameters ---------- geom : shapely.geometry geometry of the region to consider nightlight : sparse.csr_matrix nightlight values coord_nl : np.array nightlight coordinates: [[min_lat, lat_step], [min_lon, lon_step]] Returns ------- nightlight_reg, lat_reg, lon_reg (2d arrays with nightlight values, and coordinates in a square containing the country) on_land_reg (2d array of same size as previous with True values on land points) """ in_lat = (math.floor((geom.bounds[1] - coord_nl[0, 0]) / coord_nl[0, 1]), math.ceil((geom.bounds[3] - coord_nl[0, 0]) / coord_nl[0, 1])) in_lon = (math.floor((geom.bounds[0] - coord_nl[1, 0]) / coord_nl[1, 1]), math.ceil((geom.bounds[2] - coord_nl[1, 0]) / coord_nl[1, 1])) nightlight_reg = nightlight[in_lat[0]:in_lat[1] + 1, in_lon[0]:in_lon[-1] + 1] \ .toarray() lat_reg, lon_reg = np.mgrid[coord_nl[0, 0] + in_lat[0] * coord_nl[0, 1]: coord_nl[0, 0] + in_lat[1] * coord_nl[0, 1]: complex(0, nightlight_reg.shape[0]), coord_nl[1, 0] + in_lon[0] * coord_nl[1, 1]: coord_nl[1, 0] + in_lon[1] * coord_nl[1, 1]: complex(0, nightlight_reg.shape[1])] on_land_reg = np.zeros(lat_reg.shape, bool) try: iter(geom) except TypeError: geom = [geom] for poly in geom: in_lat = (math.floor((poly.bounds[1] - lat_reg[0, 0]) / coord_nl[0, 1]), math.ceil((poly.bounds[3] - lat_reg[0, 0]) / coord_nl[0, 1])) in_lon = (math.floor((poly.bounds[0] - lon_reg[0, 0]) / coord_nl[1, 1]), math.ceil((poly.bounds[2] - lon_reg[0, 0]) / coord_nl[1, 1])) on_land_reg[in_lat[0]:in_lat[1] + 1, in_lon[0]:in_lon[1] + 1] = ( on_land_reg[in_lat[0]:in_lat[1] + 1, in_lon[0]:in_lon[1] + 1] | shapely.vectorized.contains( poly, lon_reg[in_lat[0]:in_lat[1] + 1, in_lon[0]:in_lon[1] + 1], lat_reg[in_lat[0]:in_lat[1] + 1, in_lon[0]:in_lon[1] + 1])) # put zero values outside country nightlight_reg[~on_land_reg] = 0.0 return nightlight_reg, lat_reg, lon_reg, on_land_reg def _resample_land(geom, nightlight, lat, lon, res_fact, on_land): """Model land exposures from nightlight intensities and normalized to GDP * (income_group + 1). Parameters ---------- geom : shapely.geometry geometry of the region to consider nightlight : np.array nightlight values lat : np.array latitude values in meshgrid lon : np.array longitude values in meshgrid res_fact : float resampling factor on_land : np.array array with true values in land points. same size as nightlight, lat, lon Returns ------- nightlight_res, lat_res, lon_res (1d arrays with nightlight on land values and coordinates) """ nightlight_res, lat_res, lon_res = nightlight, lat, lon if res_fact != 1.0: sum_val = nightlight.sum() nightlight_res = ndimage.zoom(nightlight, res_fact, mode='nearest') nightlight_res[nightlight_res < 0.0] = 0.0 lat_res, lon_res = np.mgrid[ lat[0, 0]: lat[-1, 0]: complex(0, nightlight_res.shape[0]), lon[0, 0]: lon[0, -1]: complex(0, nightlight_res.shape[1])] on_land = shapely.vectorized.contains(geom, lon_res, lat_res) nightlight_res[~on_land] = 0.0 nightlight_res = nightlight_res / nightlight_res.sum() * sum_val return nightlight_res[on_land].ravel(), lat_res[on_land], lon_res[on_land] def _set_econ_indicators(nightlight, gdp_val, inc_grp, poly_val): """Model land exposures from nightlight intensities and normalized to GDP * (income_group + 1). Parameters ---------- nightlight : np.matrix nightlight values gdp : float GDP to interpolate in the region inc_grp : float index to weight exposures in the region poly_val : list list of polynomial coefficients to apply to nightlight Returns ------- np.array """ if nightlight.sum() > 0: nightlight = polyval(np.asarray(nightlight), poly_val) nightlight = nightlight / nightlight.sum() * gdp_val * (inc_grp + 1) return nightlight