"""
This file is part of CLIMADA.
Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
CLIMADA is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free
Software Foundation, version 3.
CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.
---
Agriculture exposures from ISIMIP and FAO.
"""
import logging
from pathlib import Path
import copy
import numpy as np
import xarray as xr
import pandas as pd
import h5py
from matplotlib import pyplot as plt
from climada import CONFIG
from climada.entity import Exposures, INDICATOR_IMPF
import climada.util.coordinates as u_coord
logging.root.setLevel(logging.DEBUG)
LOGGER = logging.getLogger(__name__)
DEF_HAZ_TYPE = 'RC'
"""Default hazard type used in impact functions id."""
BBOX = (-180, -85, 180, 85) # [Lon min, lat min, lon max, lat max]
""""Default geographical bounding box of the total global agricultural land extent"""
#ISIMIP input data specific global variables
YEARCHUNKS = dict()
"""start and end years per ISIMIP version and senario as in ISIMIP-filenames
of landuse data containing harvest area per crop"""
# two types of 1860soc (1661-2299 not implemented)
YEARCHUNKS['ISIMIP2'] = dict()
YEARCHUNKS['ISIMIP2']['1860soc'] = {'yearrange': (1800, 1860), 'startyear': 1661, 'endyear': 1860}
YEARCHUNKS['ISIMIP2']['histsoc'] = {'yearrange': (1976, 2005), 'startyear': 1861, 'endyear': 2005}
YEARCHUNKS['ISIMIP2']['2005soc'] = {'yearrange': (2006, 2099), 'startyear': 2006, 'endyear': 2299}
YEARCHUNKS['ISIMIP2']['rcp26soc'] = {'yearrange': (2006, 2099), 'startyear': 2006, 'endyear': 2099}
YEARCHUNKS['ISIMIP2']['rcp60soc'] = {'yearrange': (2006, 2099), 'startyear': 2006, 'endyear': 2099}
YEARCHUNKS['ISIMIP2']['2100rcp26soc'] = {'yearrange': (2100, 2299), 'startyear': 2100,
'endyear': 2299}
YEARCHUNKS['ISIMIP3'] = dict()
YEARCHUNKS['ISIMIP3']['histsoc'] = {'yearrange': (1983, 2013), 'startyear': 1850, 'endyear': 2014}
YEARCHUNKS['ISIMIP3']['2015soc'] = {'yearrange': (1983, 2013), 'startyear': 1850, 'endyear': 2014}
FN_STR_VAR = 'landuse-15crops_annual'
"""fix filename part in input data"""
CROP_NAME = dict()
"""mapping of crop names"""
CROP_NAME['mai'] = {'input': 'maize', 'fao': 'Maize', 'print': 'Maize'}
CROP_NAME['ric'] = {'input': 'rice', 'fao': 'Rice, paddy', 'print': 'Rice'}
CROP_NAME['whe'] = {'input': 'temperate_cereals', 'fao': 'Wheat', 'print': 'Wheat'}
CROP_NAME['soy'] = {'input': 'oil_crops_soybean', 'fao': 'Soybeans', 'print': 'Soybeans'}
CROP_NAME['ri1'] = {'input': 'rice', 'fao': 'Rice, paddy', 'print': 'Rice 1st season'}
CROP_NAME['ri2'] = {'input': 'rice', 'fao': 'Rice, paddy', 'print': 'Rice 2nd season'}
CROP_NAME['swh'] = {'input': 'temperate_cereals', 'fao': 'Wheat', 'print': 'Spring Wheat'}
CROP_NAME['wwh'] = {'input': 'temperate_cereals', 'fao': 'Wheat', 'print': 'Winter Wheat'}
"""mapping of irrigation parameter long names"""
IRR_NAME = {'combined': {'name': 'combined'},
'noirr': {'name': 'rainfed'},
'firr': {'name': 'irrigated'},
}
"""Conversion factor weight [tons] to nutritional value [kcal].
Based on Mueller et al. (2021), https://doi.org/10.1088/1748-9326/abd8fc :
"For the aggregation of different crops, we compute total calories, assuming
net water contents of 12% for maize, spring and winter wheat, 13% for rice and
9% for soybean, according to Wirsenius (2000) and caloric contents of the
“as purchased” biomass (i.e. including the water content) of 3.56kcal/g for maize,
2.8kcal/g for rice, 3.35kcal/g for soybean and of 3.34kcal/g for spring and
winter wheat, following FAO (2001).” (Müller et al., 2021)
Version 1: conversion factors for crop biomass "as purchased",
here applied as default for FAO-normalized production:
Production [kcal] = Production [t] * KCAL_PER_TON [kcal/t]
"""
KCAL_PER_TON = dict()
KCAL_PER_TON['biomass'] = {'mai': 3.56e6,
'ric': 2.80e6,
'soy': 3.35e6,
'whe': 3.34e6,
}
"""
Version 2: conversion factors for crop dry matter as simulated by most crop models,
here applied as default for raw ISIMIP model yields and derived production values:
Yield [kcal] = Yield [t] * KCAL_PER_TON [kcal/t] / (1-net_water_content_fraction)
"""
KCAL_PER_TON['drymatter'] = {'mai': 3.56e6 / (1-.12),
'ric': 2.80e6 / (1-.13),
'soy': 3.35e6 / (1-.09),
'whe': 3.34e6 / (1-.12),
}
# Default folder structure for ISIMIP data:
# deposit the landuse and FAO files in the directory:
# {CONFIG.exposures.crop_production.local_data}/Input/Exposure
# The FAO files need to be downloaded and renamed
# FAO_FILE: contains producer prices per crop, country and year
# (http://www.fao.org/faostat/en/#data/PP)
# FAO_FILE2: contains production quantity per crop, country and year
# (http://www.fao.org/faostat/en/#data/QC)
DATA_DIR = CONFIG.exposures.crop_production.local_data.dir()
INPUT_DIR = DATA_DIR.joinpath('Input', 'Exposure')
FAO_FILE = "FAOSTAT_data_producer_prices.csv"
FAO_FILE2 = "FAOSTAT_data_production_quantity.csv"
YEARS_FAO = (2008, 2018)
"""Default years from FAO used (data file contains values for 1991-2018)"""
# default output directory: climada_python/data/ISIMIP_crop/Output/Exposure
# by default the hist_mean files created by climada_python/hazard/crop_potential are saved in
# climada_python/data/ISIMIP_crop/Output/hist_mean/
HIST_MEAN_PATH = DATA_DIR.joinpath('Output', 'Hist_mean')
OUTPUT_DIR = DATA_DIR.joinpath('Output')
[docs]
class CropProduction(Exposures):
"""Defines agriculture exposures from ISIMIP input data and FAO crop data
geopandas GeoDataFrame with metadata and columns (pd.Series) defined in
Attributes and Exposures.
Attributes
----------
crop : str
crop typee.g., 'mai', 'ric', 'whe', 'soy'
"""
_metadata = Exposures._metadata + ['crop']
[docs]
def set_from_isimip_netcdf(self, *args, **kwargs):
"""This function is deprecated, use LitPop.from_isimip_netcdf instead."""
LOGGER.warning("The use of LitPop.set_from_isimip_netcdf is deprecated."
"Use LitPop.from_isimip_netcdf instead.")
self.__dict__ = CropProduction.from_isimip_netcdf(*args, **kwargs).__dict__
[docs]
@classmethod
def from_isimip_netcdf(cls, input_dir=None, filename=None, hist_mean=None,
bbox=None, yearrange=None, cl_model=None, scenario=None,
crop=None, irr=None, isimip_version=None,
unit=None, fn_str_var=None):
"""Wrapper to fill exposure from NetCDF file from ISIMIP. Requires historical
mean relative cropyield module as additional input.
Parameters
----------
input_dir : Path or str
path to input data directory,
default: {CONFIG.exposures.crop_production.local_data}/Input/Exposure
filename : string
name of the landuse data file to use,
e.g. "histsoc_landuse-15crops_annual_1861_2005.nc""
hist_mean : str or array
historic mean crop yield per centroid (or path)
bbox : list of four floats
bounding box:
[lon min, lat min, lon max, lat max]
yearrange : int tuple
year range for exposure set
e.g., (1990, 2010)
scenario : string
climate change and socio economic scenario
e.g., '1860soc', 'histsoc', '2005soc', 'rcp26soc','rcp60soc','2100rcp26soc'
cl_model : string
abbrev. climate model (only for future projections of lu data)
e.g., 'gfdl-esm2m', 'hadgem2-es', 'ipsl-cm5a-lr','miroc5'
crop : string
crop type
e.g., 'mai', 'ric', 'whe', 'soy'
irr : string
irrigation type, default: 'combined'
f.i 'firr' (full irrigation), 'noirr' (no irrigation) or 'combined'= firr+noirr
isimip_version : str
'ISIMIP2' (default) or 'ISIMIP3'
unit : string
unit of the exposure (per year)
f.i 't/y' (default), 'USD/y', or 'kcal/y'
fn_str_var : string
FileName STRing depending on VARiable and
ISIMIP simuation round
Returns
-------
Exposure
"""
# parameters not provided in method call are set to default values:
if irr is None:
irr = 'combined'
if not bbox:
bbox = BBOX
if not input_dir:
input_dir = INPUT_DIR
input_dir = Path(input_dir)
if hist_mean is None:
hist_mean = HIST_MEAN_PATH
if isinstance(hist_mean, str):
hist_mean = Path(hist_mean)
if not fn_str_var:
fn_str_var = FN_STR_VAR
if (not isimip_version) or (isimip_version in ('ISIMIP2a', 'ISIMIP2b')):
isimip_version = 'ISIMIP2'
elif isimip_version in ('ISIMIP3a', 'ISIMIP3b'):
isimip_version = 'ISIMIP3'
if (not scenario) or (scenario in ('historical', 'hist')):
scenario = 'histsoc'
if yearrange is None:
yearrange = YEARCHUNKS[isimip_version][scenario]['yearrange']
if not unit:
unit = 't/y'
if isinstance(filename, Path): # if Path, extract pure filename as string
if filename.is_file() and filename.parent.is_dir():
LOGGER.info('input_dir is reset from %s to %s', input_dir, filename.parent)
input_dir = filename.parent
filename = filename.parts[-1]
# The filename is set or other variables (cl_model, scenario) are extracted of the
# specified filename
if filename is None:
yearchunk = YEARCHUNKS[isimip_version][scenario]
# if scenario == 'histsoc' or scenario == '1860soc':
if scenario in ('histsoc', '1860soc'):
string = '{}_{}_{}_{}.nc'
filepath = Path(input_dir, string.format(scenario, fn_str_var,
yearchunk['startyear'],
yearchunk['endyear']))
else:
string = '{}_{}_{}_{}_{}.nc'
filepath = Path(input_dir, string.format(scenario, cl_model, fn_str_var,
yearchunk['startyear'],
yearchunk['endyear']))
elif scenario == 'flexible':
_, _, _, _, _, _, startyear, endyearnc = filename.split('_')
endyear = endyearnc.split('.')[0]
yearchunk = dict()
yearchunk = {'yearrange': (int(startyear), int(endyear)),
'startyear': int(startyear), 'endyear': int(endyear)}
filepath = Path(input_dir, filename)
else:
scenario, *_ = filename.split('_')
yearchunk = YEARCHUNKS[isimip_version][scenario]
filepath = Path(input_dir, filename)
# Dataset is opened and data within the bbox extends is extracted
data_set = xr.open_dataset(filepath, decode_times=False)
[lonmin, latmin, lonmax, latmax] = bbox
data = data_set.sel(lon=slice(lonmin, lonmax), lat=slice(latmax, latmin))
# The latitude and longitude are set; the region_id is determined
lon, lat = np.meshgrid(data.lon.values, data.lat.values)
exp = cls()
exp.gdf['latitude'] = lat.flatten()
exp.gdf['longitude'] = lon.flatten()
exp.gdf['region_id'] = u_coord.get_country_code(exp.gdf.latitude,
exp.gdf.longitude,
gridded=True)
# The indeces of the yearrange to be extracted are determined
time_idx = (int(yearrange[0] - yearchunk['startyear']),
int(yearrange[1] - yearchunk['startyear']))
# The area covered by a grid cell is calculated depending on the latitude
area = u_coord.get_gridcellarea(lat, resolution=0.5)
# The area covered by a crop is calculated as the product of the fraction and
# the grid cell size
if irr == 'combined':
irr_types = ['firr', 'noirr']
else:
irr_types = [irr]
area_crop = dict()
for irr_var in irr_types:
area_crop[irr_var] = (
getattr(
data, (CROP_NAME[crop])['input']+'_'+ (IRR_NAME[irr_var])['name']
)[time_idx[0]:time_idx[1], :, :].mean(dim='time')*area
).values
area_crop[irr_var] = np.nan_to_num(area_crop[irr_var]).flatten()
# set historic mean, its latitude, and longitude:
hist_mean_dict = dict()
# if hist_mean is given as np.ndarray or dict,
# code assumes it contains hist_mean as returned by relative_cropyield
# however structured in dictionary as hist_mean_dict, with same
# bbox extensions as the exposure:
if isinstance(hist_mean, dict):
if not ('firr' in hist_mean.keys() or 'noirr' in hist_mean.keys()):
# as a dict hist_mean, needs to contain key 'firr' or 'noirr';
# if irr=='combined', both 'firr' and 'noirr' are required.
raise ValueError(f'Invalid hist_mean provided: {hist_mean}')
hist_mean_dict = hist_mean
lat_mean = exp.gdf.latitude.values
elif isinstance(hist_mean, np.ndarray) or isinstance(hist_mean, list):
hist_mean_dict[irr_types[0]] = np.array(hist_mean)
lat_mean = exp.gdf.latitude.values
elif Path(hist_mean).is_dir(): # else if hist_mean is given as path to directory
# The adequate file from the directory (depending on crop and irrigation) is extracted
# and the variables hist_mean, lat_mean and lon_mean are set accordingly
for irr_var in irr_types:
filename = str(Path(hist_mean, 'hist_mean_%s-%s_%i-%i.hdf5' %(
crop, irr_var, yearrange[0], yearrange[1])))
hist_mean_dict[irr_var] = (h5py.File(filename, 'r'))['mean'][()]
lat_mean = (h5py.File(filename, 'r'))['lat'][()]
lon_mean = (h5py.File(filename, 'r'))['lon'][()]
elif Path(input_dir, hist_mean).is_file(): # file in input_dir
# Hist_mean, lat_mean and lon_mean are extracted from the given file
if len(irr_types) > 1:
raise ValueError("For irr=='combined', hist_mean cannot be a single file.")
hist_mean = h5py.File(str(Path(input_dir, hist_mean)), 'r')
hist_mean_dict[irr_types[0]] = hist_mean['mean'][()]
lat_mean = hist_mean['lat'][()]
lon_mean = hist_mean['lon'][()]
elif hist_mean.is_file(): # fall back: complete file path
# Hist_mean, lat_mean and lon_mean are extracted from the given file
if len(irr_types) > 1:
raise ValueError("For irr=='combined', hist_mean can not be single file.")
hist_mean = h5py.File(str(Path(input_dir, hist_mean)), 'r')
hist_mean_dict[irr_types[0]] = hist_mean['mean'][()]
lat_mean = hist_mean['lat'][()]
lon_mean = hist_mean['lon'][()]
else:
raise ValueError(f"Invalid hist_mean provided: {hist_mean}")
# The bbox is cut out of the hist_mean data file if needed
if len(lat_mean) != len(exp.gdf.latitude.values):
idx_mean = np.zeros(len(exp.gdf.latitude.values), dtype=int)
for i in range(len(exp.gdf.latitude.values)):
idx_mean[i] = np.where(
(lat_mean == exp.gdf.latitude.values[i])
& (lon_mean == exp.gdf.longitude.values[i])
)[0][0]
else:
idx_mean = np.arange(0, len(lat_mean))
# The exposure [t/y] is computed per grid cell as the product of the area covered
# by a crop [ha] and its yield [t/ha/y]
exp.gdf['value'] = np.squeeze(area_crop[irr_types[0]] * \
hist_mean_dict[irr_types[0]][idx_mean])
exp.gdf['value'] = np.nan_to_num(exp.gdf.value) # replace NaN by 0.0
for irr_val in irr_types[1:]: # add other irrigation types if irr=='combined'
value_tmp = np.squeeze(area_crop[irr_val]*hist_mean_dict[irr_val][idx_mean])
value_tmp = np.nan_to_num(value_tmp) # replace NaN by 0.0
exp.gdf['value'] += value_tmp
exp.description=("Crop production exposure from ISIMIP"
f" {CROP_NAME[crop]['print']} {irr}"
f" {yearrange[0]} {yearrange[-1]}")
exp.value_unit = 't/y' # input unit, will be reset below if required by user
exp.crop = crop
exp.ref_year = yearrange
try:
rows, cols, ras_trans = u_coord.pts_to_raster_meta(
(exp.gdf.longitude.min(), exp.gdf.latitude.min(),
exp.gdf.longitude.max(), exp.gdf.latitude.max()),
u_coord.get_resolution(exp.gdf.longitude, exp.gdf.latitude))
exp.meta = {
'width': cols,
'height': rows,
'crs': exp.crs,
'transform': ras_trans,
}
except ValueError:
LOGGER.warning('Could not write attribute meta, because exposure'
' has only 1 data point')
exp.meta = {}
if 'USD' in unit:
# set_value_to_usd() is called to compute the exposure in USD/y (country specific)
exp.set_value_to_usd(input_dir=input_dir)
elif 'kcal' in unit:
# set_value_to_kcal() is called to compute the exposure in kcal/y
# here, biomass=False because most crop models provide yield weight
# for dry matter, not biomass:
exp.set_value_to_kcal(biomass=False)
exp.check()
return exp
[docs]
def set_from_area_and_yield_nc4(self, *args, **kwargs):
"""This function is deprecated, use LitPop.from_area_and_yield_nc4 instead."""
LOGGER.warning("The use of LitPop.set_from_area_and_yield_nc4 is deprecated."
"Use LitPop.from_area_and_yield_nc4 instead.")
self.__dict__ = CropProduction.from_area_and_yield_nc4(*args, **kwargs).__dict__
[docs]
@classmethod
def from_area_and_yield_nc4(cls, crop_type, layer_yield, layer_area,
filename_yield, filename_area, var_yield,
var_area, bbox=BBOX, input_dir=INPUT_DIR):
"""
Set crop_production exposure from cultivated area [ha] and
yield [t/ha/year] provided in two netcdf files with the same grid.
Both input files need to be netcdf format and come with dimensions
'lon', 'lat' and 'crop'. The information which crop type is saved in which
crop layer in each input files needs to be provided manually via
the parameters 'layer_*'.
A convenience wrapper around this expert method is provided with
from_spam_ray_mirca().
Parameters
----------
crop_type : str
Crop type, e.g. 'mai' for maize, or 'ric', 'whe', 'soy', etc.
layer_yield : int
crop layer in yield input data set. Index typically starts with 1.
layer_area : int
crop layer in area input data set. Index typically starts with 1.
filename_yield : str
Name of netcdf-file containing gridded yield data.
Requires coordinates 'lon', 'lat', and 'crop'.
filename_area : str
Name of netcdf-file containing gridded cultivated area.
Requires coordinates 'lon', 'lat', and 'crop'.
var_yield : str
variable name to be extracted from yield file, e.g. 'yield.rf',
'yield.ir', 'yield.tot', or depending on netcdf structure.
var_area : str
variable name to be extracted from area file,
e.g. 'cultivated area rainfed', 'cultivated area irrigated',
'cultivated area all', or depending on netcdf structure.
bbox (tuple of four floats): bounding box:
bounding box to be extracted: (lon min, lat min, lon max, lat max).
The default is (-180, -85, 180, 85).
input_dir : Path, optional
directory where input data is found. The default is
{CONFIG.exposures.crop_production.local_data}/Input/Exposure.
Returns
-------
CropProduction
crop production exposure instance based on yield and area data
"""
if isinstance(input_dir, str):
input_dir = Path(input_dir)
[lonmin, latmin, lonmax, latmax] = bbox
# extract yield data to xarray.DataArray:
data_set_tmp = xr.open_dataset(input_dir / filename_yield, decode_times=False)
data_yield = data_set_tmp.sel(lon=slice(lonmin, lonmax),
lat=slice(latmax, latmin),
crop=layer_yield
)[var_yield]
# extract cultivated area data to xarray.DataArray:
data_set_tmp = xr.open_dataset(input_dir / filename_area, decode_times=False)
data_area = data_set_tmp.sel(lon=slice(lonmin, lonmax),
lat=slice(latmax, latmin),
crop=layer_area
)[var_area]
del data_set_tmp
# The latitude and longitude are set; region_id is determined
lon, lat = np.meshgrid(data_area.lon.values, data_area.lat.values)
exp = cls()
# initiate coordinates and values in GeoDatFrame:
exp.gdf['latitude'] = lat.flatten()
exp.gdf['longitude'] = lon.flatten()
exp.gdf['region_id'] = u_coord.get_country_code(exp.gdf.latitude,
exp.gdf.longitude, gridded=True)
exp.gdf[INDICATOR_IMPF + DEF_HAZ_TYPE] = 1
exp.gdf[INDICATOR_IMPF] = 1
# calc annual crop production, [t/y] = [ha] * [t/ha/y]:
exp.gdf['value'] = np.multiply(data_area.values, data_yield.values).flatten()
exp.crop = crop_type
exp.description=(f"Annual crop production from {var_area} and {var_yield} for"
f" {exp.crop} from files {filename_area} and {filename_yield}")
exp.value_unit = 't/y'
try:
rows, cols, ras_trans = u_coord.pts_to_raster_meta(
(exp.gdf.longitude.min(), exp.gdf.latitude.min(),
exp.gdf.longitude.max(), exp.gdf.latitude.max()),
u_coord.get_resolution(exp.gdf.longitude, exp.gdf.latitude))
exp.meta = {
'width': cols,
'height': rows,
'crs': exp.crs,
'transform': ras_trans,
}
except ValueError:
LOGGER.warning('Could not write attribute meta, because exposure'
' has only 1 data point')
exp.meta = {}
return exp
[docs]
def set_from_spam_ray_mirca(self, *args, **kwargs):
"""This function is deprecated, use CropPoduction.from_spam_ray_mirca instead."""
LOGGER.warning("The use of CropPoduction.set_from_spam_ray_mirca is deprecated."
"Use CropPoduction.from_spam_ray_mirca instead.")
self.__dict__ = CropProduction.from_spam_ray_mirca(*args, **kwargs).__dict__
[docs]
@classmethod
def from_spam_ray_mirca(cls, crop_type, irrigation_type='all',
bbox=BBOX, input_dir=INPUT_DIR):
"""
Wrapper method around from_area_and_yield_nc4().
Set crop_production exposure from cultivated area [ha] and
yield [t/ha/year] provided in default input files.
The default input files are based on the public yield data from
SPAM2005 with gaps filled based on Ray et.al (2012); and cultivated area
from MIRCA2000, both as post-processed by Jägermeyr et al. 2020; See
https://doi.org/10.1073/pnas.1919049117 for more information and cite
when using this data for publication.
Parameters
----------
crop_type : str
Crop type, e.g. 'mai' for maize, or 'ric', 'whe', 'soy', etc.
irrigation_type : str, optional
irrigation type to be extracted, the options are:
'all' : total crop production, i.e. irrigated + rainfed
'firr' : fully irrigated
'noirr' : not irrigated, i.e., rainfed
The default is 'all'
bbox (list of four floats): bounding box:
[lon min, lat min, lon max, lat max]
input_dir : Path, optional
directory where input data is found. The default is
{CONFIG.exposures.crop_production.local_data}/Input/Exposure.
Returns
------
Exposure
Crop production exposure based on SPAM and MIRCA data set
"""
filename_yield = 'spam_ray_yields.nc4'
filename_area = 'cultivated_area_MIRCA_GGCMI.nc4'
# crop layers and variable names in default input files:
layers_yield = {'mai': 1, 'whe': 2, 'soy': 4, 'ric': 3}
layers_area = {'mai': 1, 'whe': 2, 'soy': 3, 'ric': 4}
# Note: layer numbers fo rice and soybean differ between input files.
varnames_yield = {'noirr': 'yield.rf',
'firr': 'yield.ir',
'all': 'yield.tot'}
varnames_area = {'noirr': 'cultivated area rainfed',
'firr': 'cultivated area irrigated',
'all': 'cultivated area all'}
# return exposure from netcdf files:
return cls.from_area_and_yield_nc4(crop_type, layers_yield[crop_type],
layers_area[crop_type],
filename_yield, filename_area,
varnames_yield[irrigation_type],
varnames_area[irrigation_type],
bbox=bbox, input_dir=input_dir)
[docs]
def set_mean_of_several_isimip_models(self, *args, **kwargs):
"""This function is deprecated, use
CropPoduction.from_mean_of_several_isimip_models instead."""
LOGGER.warning("The use of CropPoduction.set_mean_of_several_isimip_models is deprecated."
"Use CropPoduction.from_mean_of_several_isimip_models instead.")
self.__dict__ = CropProduction.from_mean_of_several_isimip_models(*args, **kwargs).__dict__
[docs]
@classmethod
def from_mean_of_several_isimip_models(cls, input_dir=None, hist_mean=None, bbox=None,
yearrange=None, cl_model=None, scenario=None,
crop=None, irr=None, isimip_version=None,
unit=None, fn_str_var=None):
"""Wrapper to init exposure from several NetCDF files with crop yield data
from ISIMIP.
Parameters
----------
input_dir : string
path to input data directory
hist_mean : array
historic mean crop production per centroid
bbox : list of four floats
bounding box:
[lon min, lat min, lon max, lat max]
yearrange : int tuple
year range for exposure set,e.g., (1976, 2005)
scenario : string
climate change and socio economic scenario
e.g., 'histsoc' or 'rcp60soc'
cl_model : string
abbrev. climate model (only when landuse data
is future projection)
e.g., 'gfdl-esm2m' etc.
crop : string
crop type
e.g., 'mai', 'ric', 'whe', 'soy'
irr : string
irrigation type
f.i 'rainfed', 'irrigated' or 'combined'= rainfed+irrigated
isimip_version : str
'ISIMIP2' (default) or 'ISIMIP3'
unit : string
unit of the exposure (per year)
f.i 't/y' (default), 'USD/y', or 'kcal/y'
fn_str_var : string
FileName STRing depending on VARiable and
ISIMIP simuation round
Returns
-------
Exposure
"""
if not bbox:
bbox = BBOX
if (not isimip_version) or (isimip_version in ('ISIMIP2a', 'ISIMIP2b')):
isimip_version = 'ISIMIP2'
elif isimip_version in ('ISIMIP3a', 'ISIMIP3b'):
isimip_version = 'ISIMIP3'
if not input_dir:
input_dir = INPUT_DIR
input_dir = Path(input_dir)
if not hist_mean:
hist_mean = HIST_MEAN_PATH
if yearrange is None:
yearrange = YEARCHUNKS[isimip_version]['histsoc']['yearrange']
if not unit:
unit = 't/y'
if not fn_str_var:
fn_str_var = FN_STR_VAR
filenames = dict()
filenames['all'] = [f for f in Path(input_dir).iterdir()
if f.is_file() and 'nc' in f.name and not f.name.startswith('.')]
# If only files with a certain scenario and or cl_model shall be considered, they
# are extracted from the original list of files
filenames['subset'] = list()
for name in filenames['all']:
if cl_model is not None and scenario is not None:
if cl_model in name or scenario in name:
filenames['subset'].append(name)
elif cl_model is not None and scenario is None:
if cl_model in name:
filenames['subset'].append(name)
elif cl_model is None and scenario is not None:
if scenario in name:
filenames['subset'].append(name)
else:
filenames['subset'] = filenames['all']
# The first exposure is calculate to determine its size
# and initialize the combined exposure
exp = cls.from_isimip_netcdf(input_dir, filename=filenames['subset'][0],
hist_mean=hist_mean, bbox=bbox, yearrange=yearrange,
crop=crop, irr=irr, isimip_version=isimip_version,
unit=unit, fn_str_var=fn_str_var)
combined_exp = np.zeros([exp.gdf.value.size, len(filenames['subset'])])
combined_exp[:, 0] = exp.gdf.value
# The calculations are repeated for all remaining exposures (starting from index 1 as
# the first exposure has been saved in combined_exp[:, 0])
for j, fname in enumerate(filenames['subset'][1:]):
exp = cls.from_isimip_netcdf(input_dir, filename=fname, hist_mean=hist_mean,
bbox=bbox, yearrange=yearrange,
crop=crop, irr=irr, unit=unit,
isimip_version=isimip_version)
combined_exp[:, j+1] = exp.gdf.value
exp.gdf.value = np.mean(combined_exp, 1)
exp.gdf['crop'] = crop
exp.check()
return exp
[docs]
def set_value_to_kcal(self, *args, **kwargs):
"""This function is deprecated, use function value_to_kcal instead."""
LOGGER.warning("The use of CropProduction.set_value_to_kcal is deprecated."
"Use function value_to_kcal instead.")
self.__dict__ = value_to_kcal(self, *args, **kwargs).__dict__
[docs]
def set_value_to_usd(self, *args, **kwargs):
"""This function is deprecated, use functiom value_to_usd instead."""
LOGGER.warning("The use of CropProduction.set_value_to_usd is deprecated."
"Use function value_to_usd instead.")
self.__dict__ = value_to_usd(self, *args, **kwargs).__dict__
[docs]
def aggregate_countries(self):
"""Aggregate exposure data by country.
Returns
-------
list_countries : list
country codes (numerical ISO3)
country_values : array
aggregated exposure value
"""
list_countries = np.unique(self.gdf.region_id)
country_values = np.zeros(len(list_countries))
for i, iso_nr in enumerate(list_countries):
country_values[i] = self.gdf.loc[self.gdf.region_id == iso_nr].value.sum()
return list_countries, country_values
[docs]
def value_to_kcal(exp_cp, biomass=True):
"""Converts the exposure value from tonnes to kcal per year using
conversion factor per crop type.
Parameters
----------
exp_cp : CropProduction
CropProduction exposure object with units tonnes per year ('t/y')
biomass : bool, optional
if true, KCAL_PER_TON['biomass'] is used (default,
for FAO normalized crop production). If False, KCAL_PER_TON['drymatter']
is used (best for crop model output in dry matter, default for
raw crop model output).
Default: True
Returns
-------
new_exp : CropProduction
CropProduction exposure object with unit 'kcal/y'
"""
if exp_cp.value_unit != 't/y':
LOGGER.warning('self.unit is not t/y.')
# create deep copy of exposure
new_exp = copy.deepcopy(exp_cp)
new_exp.gdf['tonnes_per_year'] = exp_cp.gdf['value'].values
if biomass:
new_exp.gdf.value *= KCAL_PER_TON['biomass'][exp_cp.crop]
else:
new_exp.gdf.value *= KCAL_PER_TON['drymatter'][exp_cp.crop]
new_exp.value_unit = 'kcal/y'
return new_exp
[docs]
def value_to_usd(exp_cp, input_dir=None, yearrange=None):
# to do: check api availability?; default yearrange for single year (e.g. 5a)
"""Calculates the exposure in USD per year using country and year specific
data published by the FAO, requires crop production exposure with unit 't/y'
Parameters
----------
exp_cp : CropProduction
CropProduction exposure object with units tonnes per year ('t/y')
input_dir : Path or str, optional
directory containing the input (FAO pricing) data,
default: {CONFIG.exposures.crop_production.local_data}/Input/Exposure
yearrange : np.array, optional
year range for prices, can also be set to a single year
Default is set to the arbitrary time range (2000, 2018)
The data is available for the years 1991-2018
crop : str
crop type
e.g., 'mai', 'ric', 'whe', 'soy'
Returns
-------
new_exp : CropProduction
CropProduction exposure object with unit 'USD/y'
"""
if not input_dir:
input_dir = INPUT_DIR
input_dir = Path(input_dir)
if yearrange is None:
yearrange = YEARS_FAO
# create deep copy of exposure
new_exp = copy.deepcopy(exp_cp)
# account for the case of only specifying one year as yearrange
if len(yearrange) == 1:
yearrange = (yearrange[0], yearrange[0])
# open both FAO files and extract needed variables
# FAO_FILE: contains producer prices per crop, country and year
fao = dict()
fao['file'] = pd.read_csv(input_dir / FAO_FILE)
fao['crops'] = fao['file'].Item.values
fao['year'] = fao['file'].Year.values
fao['price'] = fao['file'].Value.values
fao_country = u_coord.country_faocode2iso(getattr(fao['file'], 'Area Code').values)
# create deep copy of exposure
new_exp = copy.deepcopy(exp_cp)
# create a list of the countries contained in the exposure
iso3num = list()
new_exp.gdf.region_id[exp_cp.gdf.region_id == -99] = 0
iso3num = np.asarray(u_coord.country_to_iso(
new_exp.gdf.region_id, representation="numeric", fillvalue=999), dtype=object)
list_countries = np.unique(iso3num)
# iterate over all countries that are covered in the exposure, extract the according price
# and calculate the crop production in USD/y
area_price = np.zeros(new_exp.gdf.value.size)
for country in list_countries:
[idx_country] = (iso3num == country).nonzero()
if country == 999:
price = 0
area_price[idx_country] = new_exp.gdf.value[idx_country] * price
# zero means no country, 999 other country
elif country != 0 and country != 999:
idx_price = np.where((np.asarray(fao_country) == country) &
(np.asarray(fao['crops']) == \
(CROP_NAME[new_exp.crop])['fao']) &
(fao['year'] >= yearrange[0]) &
(fao['year'] <= yearrange[1]))
# if no price can be determined for a specific yearrange and country, the world
# average for that crop (in the specified yearrange) is used
if idx_price[0].size != 0:
price = np.mean(fao['price'][idx_price])
else:
idx_price = np.where((np.asarray(fao['crops']) == \
(CROP_NAME[new_exp.crop])['fao']) &
(fao['year'] >= yearrange[0]) &
(fao['year'] <= yearrange[1]))
price = np.mean(fao['price'][idx_price])
area_price[idx_country] = new_exp.gdf.value[idx_country] * price
# the exposure values in t/y is saved as 'tonnes_per_year'
new_exp.gdf['tonnes_per_year'] = exp_cp.gdf['value'].values
new_exp.gdf['value'] = area_price
new_exp.value_unit = 'USD/y'
new_exp.check()
return new_exp
[docs]
def init_full_exp_set_isimip(input_dir=None, filename=None, hist_mean_dir=None,
output_dir=None, bbox=None, yearrange=None, unit=None,
isimip_version=None, return_data=False):
"""Generates CropProduction instances (exposure sets) for all files found in the
input directory and saves them as hdf5 files in the output directory.
Exposures are aggregated per crop and irrigation type.
Parameters
----------
input_dir : str or Path
path to input data directory,
default: {CONFIG.exposures.crop_production.local_data}/Input/Exposure
filename : string
if not specified differently, the file
'histsoc_landuse-15crops_annual_1861_2005.nc' will be used
output_dir : string
path to output data directory
bbox : list of four floats
bounding box:
[lon min, lat min, lon max, lat max]
yearrange : array
year range for hazard set, e.g., (1976, 2005)
isimip_version : str
'ISIMIP2' (default) or 'ISIMIP3'
unit : str
unit in which to return exposure (e.g., t/y or USD/y)
return_data : boolean
returned output
False: returns list of filenames only, True: returns also list of data
Returns
-------
filename_list : list
all filenames of saved initiated exposure files
output_list : list
list containing all inisiated Exposure instances
"""
if not bbox:
bbox = BBOX
if (not isimip_version) or (isimip_version in ('ISIMIP2a', 'ISIMIP2b')):
isimip_version = 'ISIMIP2'
elif isimip_version in ('ISIMIP3a', 'ISIMIP3b'):
isimip_version = 'ISIMIP3'
if not input_dir:
input_dir = INPUT_DIR
input_dir = Path(input_dir)
if not hist_mean_dir:
hist_mean_dir = HIST_MEAN_PATH
hist_mean_dir = Path(hist_mean_dir)
if not output_dir:
output_dir = OUTPUT_DIR
output_dir = Path(output_dir)
if yearrange is None:
yearrange = YEARCHUNKS[isimip_version]['histsoc']['yearrange']
if not unit:
unit = 't/y'
filenames = [f.name for f in hist_mean_dir.iterdir()
if f.is_file() and not f.name.startswith('.')]
# generate output directory if it does not exist yet
target_dir = output_dir / 'Exposure'
target_dir.mkdir(exist_ok=True)
# create exposures for all crop-irrigation combinations and save them
filename_list = list()
output_list = list()
for file in filenames:
_, _, crop_irr, *_ = file.split('_')
crop, irr = crop_irr.split('-')
crop_production = CropProduction.from_isimip_netcdf(input_dir=input_dir,
filename=filename,
hist_mean=hist_mean_dir, bbox=bbox,
isimip_version=isimip_version,
yearrange=yearrange, crop=crop, irr=irr, unit=unit)
filename_expo = ('crop_production_' + crop + '-'+ irr + '_'
+ str(yearrange[0]) + '-' + str(yearrange[1]) + '.hdf5')
filename_list.append(filename_expo)
crop_production.write_hdf5(str(Path(target_dir, filename_expo)))
if return_data:
output_list.append(crop_production)
return filename_list, output_list
[docs]
def normalize_with_fao_cp(exp_firr, exp_noirr, input_dir=None,
yearrange=None, unit=None, return_data=True):
"""Normalize (i.e., bias correct) the given exposures countrywise with the mean
crop production quantity documented by the FAO.
Refer to the beginning of the script for guidance on where to download the
required crop production data from FAO.Stat.
Parameters
----------
exp_firr : crop_production
exposure under full irrigation
exp_noirr : crop_production
exposure under no irrigation
input_dir : Path or str
directory containing exposure input data,
default: {CONFIG.exposures.crop_production.local_data}/Input/Exposure
yearrange : array
the mean crop production in this year range is used to normalize
the exposure data
Default is set to the arbitrary time range (2008, 2018)
The data is available for the years 1961-2018
unit : str
unit in which to return exposure (t/y or USD/y)
return_data : boolean
returned output
True: returns country list, ratio = FAO/ISIMIP, normalized exposures, crop production
per country as documented by the FAO and calculated by the ISIMIP dataset
False: country list, ratio = FAO/ISIMIP, normalized exposures
Returns
-------
country_list : list
List of country codes (numerical ISO3)
ratio : list
List of ratio of FAO crop production and aggregated exposure
for each country
exp_firr_norm : CropProduction
Normalized CropProduction (full irrigation)
exp_noirr_norm : CropProduction
Normalized CropProduction (no irrigation)
Returns : optional
fao_crop_production : list
FAO crop production value per country
exp_tot_production : list
Exposure crop production value per country
(before normalization)
"""
if not input_dir:
input_dir = INPUT_DIR
input_dir = Path(input_dir)
if yearrange is None:
yearrange = YEARS_FAO
if not unit:
unit = 't/y'
# if the exposure unit is USD/y or kcal/y, temporarily reset the exposure to t/y
# (stored in tonnes_per_year) in order to normalize with FAO crop production
# values and then apply set_to_XXX() for the normalized exposure to restore the
# initial exposure unit
if exp_firr.value_unit == 'USD/y' or 'kcal' in exp_firr.value_unit:
exp_firr.gdf.value = exp_firr.tonnes_per_year
if exp_noirr.value_unit == 'USD/y' or 'kcal' in exp_noirr.value_unit:
exp_noirr.gdf.value = exp_noirr.tonnes_per_year
country_list, countries_firr = exp_firr.aggregate_countries()
country_list, countries_noirr = exp_noirr.aggregate_countries()
exp_tot_production = countries_firr + countries_noirr
fao = pd.read_csv(input_dir / FAO_FILE2)
fao_crops = fao.Item.values
fao_year = fao.Year.values
fao_values = fao.Value.values
fao_code = getattr(fao, 'Area Code').values
fao_country = u_coord.country_iso2faocode(country_list)
fao_crop_production = np.zeros(len(country_list))
ratio = np.ones(len(country_list))
exp_firr_norm = exp_firr.copy(deep=True)
exp_noirr_norm = exp_noirr.copy(deep=True)
# loop over countries: compute ratio & apply normalization:
for country, iso_nr in enumerate(country_list):
idx = np.where((np.asarray(fao_code) == fao_country[country])
& (np.asarray(fao_crops) == (CROP_NAME[exp_firr.crop])['fao'])
& (fao_year >= yearrange[0]) & (fao_year <= yearrange[1]))
if len(idx) >= 1:
fao_crop_production[country] = np.mean(fao_values[idx])
# if a country has no values in the exposure (e.g. Cyprus) the exposure value
# is set to the FAO average value
# in this case the ratio is left being 1 (as initiated)
if exp_tot_production[country] == 0:
exp_tot_production[country] = fao_crop_production[country]
elif fao_crop_production[country] != np.nan and fao_crop_production[country] != 0:
ratio[country] = fao_crop_production[country] / exp_tot_production[country]
exp_firr_norm.gdf.value[exp_firr.gdf.region_id == iso_nr] = ratio[country] * \
exp_firr.gdf.value[exp_firr.gdf.region_id == iso_nr]
exp_noirr_norm.gdf.value[exp_firr.gdf.region_id == iso_nr] = ratio[country] * \
exp_noirr.gdf.value[exp_noirr.gdf.region_id == iso_nr]
if unit == 'USD/y' or exp_noirr.value_unit == 'USD/y':
exp_noirr.set_value_to_usd(input_dir=input_dir)
elif 'kcal' in unit or 'kcal' in exp_noirr.value_unit:
exp_noirr.set_value_to_kcal(biomass=True)
# FAO production is provided in biomass, not dry matter
if unit == 'USD/y' or exp_firr.value_unit == 'USD/y':
exp_firr.set_value_to_usd(input_dir=input_dir)
elif 'kcal' in unit or 'kcal' in exp_firr.value_unit:
exp_firr.set_value_to_kcal(biomass=True)
exp_firr_norm.description = " ".join([exp_firr_norm.description or "", "normalized"])
exp_noirr_norm.description = " ".join([exp_noirr_norm.description or "", "normalized"])
if return_data:
return country_list, ratio, exp_firr_norm, exp_noirr_norm, \
fao_crop_production, exp_tot_production
return country_list, ratio, exp_firr_norm, exp_noirr_norm
[docs]
def normalize_several_exp(input_dir=None, output_dir=None,
yearrange=None, unit=None, return_data=True):
"""
Multiple exposure sets saved as HDF5 files in input directory are normalized
(i.e. bias corrected) against FAO statistics of crop production.
Parameters
----------
input_dir : Path or str
directory containing exposure input data
output_dir : Path or str
directory containing exposure datasets (output of
exposure creation)
yearrange : array
the mean crop production in this year range is used to normalize
the exposure data (default 2008-2018)
unit : str
unit in which to return exposure (t/y or USD/y)
return_data : boolean
returned output
True: lists containing data for each exposure file. Lists: crops, country list,
ratio = FAO/ISIMIP, normalized exposures, crop production
per country as documented by the FAO and calculated by the ISIMIP dataset
False: lists containing data for each exposure file. Lists: crops, country list,
ratio = FAO/ISIMIP, normalized exposures
Returns
-------
crop_list : list
List of crops
country_list : list
List of country codes (numerical ISO3)
ratio : list
List of ratio of FAO crop production and aggregated exposure
for each country
exp_firr_norm : list
List of normalized CropProduction Exposures (full irrigation)
exp_noirr_norm : list
List of normalize CropProduction Exposures (no irrigation)
fao_crop_production : list, optional
FAO crop production value per country
exp_tot_production : list, optional
Exposure crop production value per country
(before normalization)
"""
if input_dir is None:
input_dir = INPUT_DIR
if output_dir is None:
output_dir = OUTPUT_DIR
output_dir = Path(output_dir)
if not unit:
unit = 't/y'
if yearrange is None:
yearrange = YEARS_FAO
filenames_firr = [f.parts[-1] for f in (output_dir / 'Exposure').iterdir() if
f.is_file() if not f.parts[-1].startswith('.') if
'firr' in f.parts[-1]]
crop_list = list()
countries_list = list()
ratio_list = list()
exp_firr_norm = list()
exp_noirr_norm = list()
fao_cp_list = list()
exp_tot_cp_list = list()
for file_firr in filenames_firr:
_, _, crop_irr, years = file_firr.split('_')
crop, _ = crop_irr.split('-')
exp_firr = CropProduction.from_hdf5(str(Path(output_dir, 'Exposure', file_firr)))
filename_noirr = 'crop_production_' + crop + '-' + 'noirr' + '_' + years
exp_noirr = CropProduction.from_hdf5(str(Path(output_dir, 'Exposure', filename_noirr)))
if return_data:
countries, ratio, exp_firr2, exp_noirr2, fao_cp, \
exp_tot_cp = normalize_with_fao_cp(exp_firr, exp_noirr, input_dir=input_dir,
yearrange=yearrange, unit=unit)
fao_cp_list.append(fao_cp)
exp_tot_cp_list.append(exp_tot_cp)
else:
countries, ratio, exp_firr2, exp_noirr2 = normalize_with_fao_cp(
exp_firr, exp_noirr, input_dir=input_dir,
yearrange=yearrange, unit=unit, return_data=False)
crop_list.append(crop)
countries_list.append(countries)
ratio_list.append(ratio)
exp_firr_norm.append(exp_firr2)
exp_noirr_norm.append(exp_noirr2)
if return_data:
return crop_list, countries_list, ratio_list, exp_firr_norm, exp_noirr_norm, \
fao_cp_list, exp_tot_cp_list
return crop_list, countries_list, ratio_list, exp_firr_norm, exp_noirr_norm
[docs]
def semilogplot_ratio(crop, countries, ratio, output_dir=None, save=True):
"""Plot ratio = FAO/ISIMIP against country codes.
Parameters
----------
crop : str
crop to plot
countries : list
country codes of countries to plot
ratio : array
ratio = FAO/ISIMIP crop production data of countries to plot
save : boolean
True saves figure, else figure is not saved.
output_dir : str
directory to save figure
Returns
-------
fig (plt figure handle)
axes (plot axes handle)
"""
if output_dir is None:
output_dir = OUTPUT_DIR
output_dir = Path(output_dir)
fig = plt.figure()
axes = plt.gca()
axes.scatter(countries[ratio != 1], ratio[ratio != 1])
axes.set_yscale('log')
axes.set_ylabel('Ratio= FAO / ISIMIP')
axes.set_xlabel('ISO3 country code')
axes.set_ylim(np.nanmin(ratio), np.nanmax(ratio))
plt.title(crop)
if save:
target_dir = output_dir / 'Exposure_norm_plots'
target_dir.mkdir(exist_ok=True)
plt.savefig(target_dir / 'fig_ratio_norm_' + crop)
return fig, axes