"""
This file is part of CLIMADA.
Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
CLIMADA is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free
Software Foundation, version 3.
CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.
---
Define the SupplyChain class.
"""
__all__ = ["SupplyChain"]
import logging
from pathlib import Path
import zipfile
import warnings
import pandas as pd
import numpy as np
import pymrio
from climada import CONFIG
from climada.util import files_handler as u_fh
import climada.util.coordinates as u_coord
LOGGER = logging.getLogger(__name__)
WIOD_FILE_LINK = CONFIG.engine.supplychain.resources.wiod16.str()
"""Link to the 2016 release of the WIOD tables."""
MRIOT_DIRECTORY = CONFIG.engine.supplychain.local_data.mriot.dir()
"""Directory where Multi-Regional Input-Output Tables (MRIOT) are downloaded."""
calc_G = pymrio.calc_L
def calc_v(Z, x):
"""Calculate value added (v) from Z and x
value added = industry output (x) - inter-industry inputs (sum_rows(Z))
Parameters
----------
Z : pandas.DataFrame or numpy.array
Symmetric input output table (flows)
x : pandas.DataFrame or numpy.array
industry output
Returns
-------
pandas.DataFrame or numpy.array
Value added v as row vector
Notes
-----
This function adapts pymrio.tools.iomath.calc_x to compute
value added (v).
"""
value_added = np.diff(np.vstack((Z.sum(0), x.T)), axis=0)
if isinstance(Z, pd.DataFrame):
value_added = pd.DataFrame(value_added, columns=Z.index, index=["indout"])
if isinstance(value_added, pd.Series):
value_added = pd.DataFrame(value_added)
if isinstance(value_added, pd.DataFrame):
value_added.index = ["indout"]
return value_added
def calc_B(Z, x):
"""Calculate the B matrix (allocation coefficients matrix)
from Z matrix and x vector
Parameters
----------
Z : pandas.DataFrame or numpy.array
Symmetric input output table (flows)
x : pandas.DataFrame or numpy.array
Industry output column vector
Returns
-------
pandas.DataFrame or numpy.array
Allocation coefficients matrix B.
Same type as input parameter ``Z``.
Notes
-----
This function adapts pymrio.tools.iomath.calc_A to compute
the allocation coefficients matrix B.
"""
if isinstance(x, (pd.DataFrame, pd.Series)):
x = x.to_numpy()
if not isinstance(x, np.ndarray) and (x == 0):
recix = 0
else:
with warnings.catch_warnings():
# Ignore devide by zero warning, we set to 0 afterwards
warnings.filterwarnings("ignore", message="divide by zero")
recix = 1 / x
recix[np.isinf(recix)] = 0
if isinstance(Z, pd.DataFrame):
return pd.DataFrame(Z.to_numpy() * recix, index=Z.index, columns=Z.columns)
return Z * recix
def calc_x_from_G(G, v):
"""Calculate the industry output x from a v vector and G matrix
x = vG
The industry output x is computed from a value-added vector v
Parameters
----------
v : pandas.DataFrame or numpy.array
a row vector of the total final added-value
G : pandas.DataFrame or numpy.array
Symmetric input output Ghosh table
Returns
-------
pandas.DataFrame or numpy.array
Industry output x as column vector.
Same type as input parameter ``G``.
Notes
-----
This function adapts the function pymrio.tools.iomath.calc_x_from_L to
compute total output (x) from the Ghosh inverse.
"""
x = v.dot(G)
if isinstance(x, pd.Series):
x = pd.DataFrame(x)
if isinstance(x, pd.DataFrame):
x.columns = ["indout"]
return x
def parse_mriot_from_df(
mriot_df=None, col_iso3=None, col_sectors=None, rows_data=None, cols_data=None
):
"""Build multi-index dataframes of the transaction matrix, final demand and total
production from a Multi-Regional Input-Output Table dataframe.
Parameters
----------
mriot_df : pandas.DataFrame
The Multi-Regional Input-Output Table
col_iso3 : int
Column's position of regions' iso names
col_sectors : int
Column's position of sectors' names
rows_data : (int, int)
Tuple of integers with positions of rows
containing the MRIOT data
cols_data : (int, int)
Tuple of integers with positions of columns
containing the MRIOT data
"""
start_row, end_row = rows_data
start_col, end_col = cols_data
sectors = mriot_df.iloc[start_row:end_row, col_sectors].unique()
regions = mriot_df.iloc[start_row:end_row, col_iso3].unique()
multiindex = pd.MultiIndex.from_product(
[regions, sectors], names=["region", "sector"]
)
Z = mriot_df.iloc[start_row:end_row, start_col:end_col].values.astype(float)
Z = pd.DataFrame(data=Z, index=multiindex, columns=multiindex)
Y = mriot_df.iloc[start_row:end_row, end_col:-1].sum(1).values.astype(float)
Y = pd.DataFrame(data=Y, index=multiindex, columns=["final demand"])
x = mriot_df.iloc[start_row:end_row, -1].values.astype(float)
x = pd.DataFrame(data=x, index=multiindex, columns=["total production"])
return Z, Y, x
def mriot_file_name(mriot_type, mriot_year):
"""Retrieve the original EXIOBASE3, WIOD16 or OECD21 MRIOT file name
Parameters
----------
mriot_type : str
mriot_year : int
Returns
-------
str
name of MRIOT file
"""
if mriot_type == "EXIOBASE3":
return f"IOT_{mriot_year}_ixi.zip"
if mriot_type == "WIOD16":
return f"WIOT{mriot_year}_Nov16_ROW.xlsb"
if mriot_type == "OECD21":
return f"ICIO2021_{mriot_year}.csv"
raise ValueError("Unknown MRIOT type")
def download_mriot(mriot_type, mriot_year, download_dir):
"""Download EXIOBASE3, WIOD16 or OECD21 Multi-Regional Input Output Tables
for specific years.
Parameters
----------
mriot_type : str
mriot_year : int
download_dir : pathlib.PosixPath
Notes
-----
The download of EXIOBASE3 and OECD21 tables makes use of pymrio functions.
The download of WIOD16 tables requires ad-hoc functions, since the
related pymrio functions were broken at the time of implementation
of this function.
"""
if mriot_type == "EXIOBASE3":
pymrio.download_exiobase3(
storage_folder=download_dir, system="ixi", years=[mriot_year]
)
elif mriot_type == "WIOD16":
download_dir.mkdir(parents=True, exist_ok=True)
downloaded_file_name = u_fh.download_file(
WIOD_FILE_LINK,
download_dir=download_dir,
)
downloaded_file_zip_path = Path(downloaded_file_name + ".zip")
Path(downloaded_file_name).rename(downloaded_file_zip_path)
with zipfile.ZipFile(downloaded_file_zip_path, "r") as zip_ref:
zip_ref.extractall(download_dir)
elif mriot_type == "OECD21":
years_groups = ["1995-1999", "2000-2004", "2005-2009", "2010-2014", "2015-2018"]
year_group = years_groups[int(np.floor((mriot_year - 1995) / 5))]
pymrio.download_oecd(storage_folder=download_dir, years=year_group)
def parse_mriot(mriot_type, downloaded_file):
"""Parse EXIOBASE3, WIOD16 or OECD21 MRIOT for specific years
Parameters
----------
mriot_type : str
downloaded_file : pathlib.PosixPath
Notes
-----
The parsing of EXIOBASE3 and OECD21 tables makes use of pymrio functions.
The parsing of WIOD16 tables requires ad-hoc functions, since the
related pymrio functions were broken at the time of implementation
of this function.
"""
if mriot_type == "EXIOBASE3":
mriot = pymrio.parse_exiobase3(path=downloaded_file)
# no need to store A
mriot.A = None
elif mriot_type == "WIOD16":
mriot_df = pd.read_excel(downloaded_file, engine="pyxlsb")
Z, Y, x = parse_mriot_from_df(
mriot_df,
col_iso3=2,
col_sectors=1,
rows_data=(5, 2469),
cols_data=(4, 2468),
)
mriot = pymrio.IOSystem(Z=Z, Y=Y, x=x)
multiindex_unit = pd.MultiIndex.from_product(
[mriot.get_regions(), mriot.get_sectors()],
names = ['region', 'sector']
)
mriot.unit = pd.DataFrame(
data = np.repeat(["M.EUR"], len(multiindex_unit)),
index = multiindex_unit,
columns = ["unit"]
)
elif mriot_type == "OECD21":
mriot = pymrio.parse_oecd(path=downloaded_file)
mriot.x = pymrio.calc_x(mriot.Z, mriot.Y)
else:
raise RuntimeError(f"Unknown mriot_type: {mriot_type}")
return mriot
[docs]
class SupplyChain:
"""SupplyChain class.
The SupplyChain class provides methods for loading Multi-Regional Input-Output
Tables (MRIOT) and computing direct, indirect and total impacts.
Attributes
----------
mriot : pymrio.IOSystem
An object containing all MRIOT related info (see also pymrio package):
mriot.Z : transaction matrix, or interindustry flows matrix
mriot.Y : final demand
mriot.x : industry or total output
mriot.meta : metadata
secs_exp : pd.DataFrame
Exposure dataframe of each country/sector in the MRIOT. Columns are the
same as the chosen MRIOT.
secs_imp : pd.DataFrame
Impact dataframe for the directly affected countries/sectors for each event with
impacts. Columns are the same as the chosen MRIOT and rows are the hazard events ids.
secs_shock : pd.DataFrame
Shocks (i.e. impact / exposure) dataframe for the directly affected countries/sectors
for each event with impacts. Columns are the same as the chosen MRIOT and rows are the
hazard events ids.
inverse : dict
Dictionary with keys being the chosen approach (ghosh, leontief or eeioa)
and values the Leontief (L, if approach is leontief or eeioa) or Ghosh (G, if
approach is ghosh) inverse matrix.
coeffs : dict
Dictionary with keys being the chosen approach (ghosh, leontief or eeioa)
and values the Technical (A, if approach is leontief or eeioa) or allocation
(B, if approach is ghosh) coefficients matrix.
supchain_imp : dict
Dictionary with keys the chosen approach (ghosh, leontief or eeioa)
and values dataframes of indirect impacts to countries/sectors for each event
with direct impacts. For each dataframe, columns are the same as the chosen
MRIOT and rows are the hazard events ids.
"""
[docs]
def __init__(self, mriot):
"""Initialize SupplyChain.
Parameters
----------
mriot : pymrio.IOSystem
An object containing all MRIOT related info (see also pymrio package):
mriot.Z : transaction matrix, or interindustry flows matrix
mriot.Y : final demand
mriot.x : industry or total output
mriot.meta : metadata
"""
self.mriot = mriot
self.secs_exp = None
self.secs_imp = None
self.secs_shock = None
self.inverse = dict()
self.coeffs = dict()
self.supchain_imp = dict()
[docs]
@classmethod
def from_mriot(
cls, mriot_type, mriot_year, mriot_dir=MRIOT_DIRECTORY, del_downloads=True
):
"""Download, parse and read WIOD16, EXIOBASE3, or OECD21 Multi-Regional
Input-Output Tables.
Parameters
----------
mriot_type : str
Type of mriot table to use.
The three possible types are: 'EXIOBASE3', 'WIOD16', 'OECD21'
mriot_year : int
Year of MRIOT
mriot_dir : pathlib.PosixPath
Path to the MRIOT folder. Default is CLIMADA storing directory.
del_downloads : bool
If the downloaded files are deleted after saving the parsed data. Default is
True. WIOD16 and OECD21 data are downloaded as group of years.
Returns
-------
mriot : pymrio.IOSystem
An object containing all MRIOT related info (see also pymrio package):
mriot.Z : transaction matrix, or interindustry flows matrix
mriot.Y : final demand
mriot.x : total output
mriot.meta : metadata
"""
# download directory and file of interest
downloads_dir = mriot_dir / mriot_type / "downloads"
downloaded_file = downloads_dir / mriot_file_name(mriot_type, mriot_year)
# parsed data directory
parsed_data_dir = mriot_dir / mriot_type / str(mriot_year)
# if data were not downloaded nor parsed: download, parse and save parsed
if not downloaded_file.exists() and not parsed_data_dir.exists():
download_mriot(mriot_type, mriot_year, downloads_dir)
mriot = parse_mriot(mriot_type, downloaded_file)
mriot.save(parsed_data_dir)
if del_downloads:
for dwn in downloads_dir.iterdir():
dwn.unlink()
downloads_dir.rmdir()
# if data were downloaded but not parsed: parse and save parsed
elif downloaded_file.exists() and not parsed_data_dir.exists():
mriot = parse_mriot(mriot_type, downloaded_file)
mriot.save(parsed_data_dir)
# if data were parsed and saved: load them
else:
mriot = pymrio.load(path=parsed_data_dir)
# aggregate ROWs for EXIOBASE:
if mriot_type == 'EXIOBASE3':
agg_regions = mriot.get_regions().tolist()[:-5] + ['ROW']*5
mriot = mriot.aggregate(region_agg = agg_regions)
mriot.meta.change_meta(
"description", "Metadata for pymrio Multi Regional Input-Output Table"
)
mriot.meta.change_meta("name", f"{mriot_type}-{mriot_year}")
return cls(mriot=mriot)
[docs]
def calc_shock_to_sectors(self,
exposure,
impact,
impacted_secs=None,
shock_factor=None
):
"""Calculate exposure, impact and shock at the sectorial level.
This function translate spatially-distrubted exposure and impact
information into exposure and impact of MRIOT's country/sectors and
for each hazard event.
Parameters
----------
exposure : climada.entity.Exposure
CLIMADA Exposure object of direct impact calculation
impact : climada.engine.Impact
CLIMADA Impact object of direct impact calculation
impacted_secs : (range, np.ndarray, str, list)
Information regarding the impacted sectors. This can be provided
as positions of the impacted sectors in the MRIOT (as range or np.ndarray)
or as sector names (as string or list).
shock_factor : np.array
It has lenght equal to the number of sectors. For each sector, it defines to
what extent the fraction of indirect losses differs from the one of direct
losses (i.e., impact / exposure). Deafult value is None, which means that shock
factors for all sectors are equal to 1, i.e., that production and stock losses
fractions are the same.
"""
if impacted_secs is None:
warnings.warn(
"No impacted sectors were specified. It is assumed that the exposure is "
"representative of all sectors in the IO table"
)
impacted_secs = self.mriot.get_sectors().tolist()
elif isinstance(impacted_secs, (range, np.ndarray)):
impacted_secs = self.mriot.get_sectors()[impacted_secs].tolist()
elif isinstance(impacted_secs, str):
impacted_secs = [impacted_secs]
if shock_factor is None:
shock_factor = np.repeat(1, self.mriot.x.shape[0])
events_w_imp_bool = np.asarray(impact.imp_mat.sum(1)!=0).flatten()
self.secs_exp = pd.DataFrame(
0,
index=["total_value"],
columns=self.mriot.Z.columns
)
self.secs_imp = pd.DataFrame(
0,
index=impact.event_id[events_w_imp_bool],
columns=self.mriot.Z.columns
)
self.secs_imp.index = self.secs_imp.index.set_names('event_id')
mriot_type = self.mriot.meta.name.split("-")[0]
for exp_regid in exposure.gdf.region_id.unique():
exp_bool = exposure.gdf.region_id == exp_regid
tot_value_reg_id = exposure.gdf[exp_bool].value.sum()
tot_imp_reg_id = impact.imp_mat[events_w_imp_bool][:,exp_bool].sum(1)
mriot_reg_name = self.map_exp_to_mriot(exp_regid, mriot_type)
secs_prod = self.mriot.x.loc[(mriot_reg_name, impacted_secs), :]
secs_prod_ratio = (secs_prod / secs_prod.sum()).values.flatten()
# Overall sectorial stock exposure and impact are distributed among
# subsectors proportionally to their their own contribution to overall
# sectorial production: Sum needed below in case of many ROWs, which are
# aggregated into one country as per WIOD table.
self.secs_exp.loc[:, (mriot_reg_name, impacted_secs)] += (
tot_value_reg_id * secs_prod_ratio
)
self.secs_imp.loc[:, (mriot_reg_name, impacted_secs)] += (
tot_imp_reg_id * secs_prod_ratio
)
self.secs_shock = self.secs_imp.divide(
self.secs_exp.values
).fillna(0) * shock_factor
if not np.all(self.secs_shock <= 1):
warnings.warn(
"Consider changing the provided provided stock-to-production losses "
"ratios, as some of them lead to some sectors' production losses to "
"exceed the maximum sectorial production. For these sectors, total "
"production loss is assumed."
)
self.secs_shock[self.secs_shock > 1] = 1
[docs]
def calc_matrices(self, io_approach):
"""Build technical coefficient and Leontief inverse matrixes
(if leontief or eeioa approach) or allocation coefficients and
Ghosh matrixes (if ghosh approach).
Parameters
----------
io_approach : str
The adopted input-output modeling approach.
Possible choices are 'leontief', 'ghosh' and 'eeioa'.
"""
io_model = {
"leontief": (pymrio.calc_A, pymrio.calc_L),
"eeioa": (pymrio.calc_A, pymrio.calc_L),
"ghosh": (calc_B, calc_G),
}
coeff_func, inv_func = io_model[io_approach]
self.coeffs.update({io_approach: coeff_func(self.mriot.Z, self.mriot.x)})
self.inverse.update({io_approach: inv_func(self.coeffs[io_approach])})
[docs]
def calc_impacts(self,
io_approach,
exposure=None,
impact=None,
impacted_secs=None):
"""Calculate indirect production impacts based on to the
chosen input-output approach.
Parameters
----------
io_approach : str
The adopted input-output modeling approach.
Possible choices are 'leontief', 'ghosh' and 'eeioa'.
exposure : climada.entity.Exposure
CLIMADA Exposure object of direct impact calculation. Default is None.
impact : climada.engine.Impact
CLIMADA Impact object of direct impact calculation. Default is None.
impacted_secs : (range, np.ndarray, str, list)
Information regarding the impacted sectors. This can be provided
as positions of the impacted sectors in the MRIOT (as range or np.ndarray)
or as sector names (as string or list). Default is None.
shock_factor : np.array
It has lenght equal to the number of sectors. For each sector, it defines to
what extent the fraction of indirect losses differs from the one of direct
losses (i.e., impact / exposure). Deafult value is None, which means that shock
factors for all sectors are equal to 1, i.e., that production and stock losses
fractions are the same.
References
----------
[1] W. W. Leontief, Output, employment, consumption, and investment,
The Quarterly Journal of Economics 58, 1944.
[2] Ghosh, A., Input-Output Approach in an Allocation System,
Economica, New Series, 25, no. 97: 58-64. doi:10.2307/2550694, 1958.
[3] Kitzes, J., An Introduction to Environmentally-Extended Input-Output
Analysis, Resources, 2, 489-503; doi:10.3390/resources2040489, 2013.
"""
self.calc_matrices(io_approach=io_approach)
if self.secs_shock is None:
self.calc_shock_to_sectors(exposure, impact, impacted_secs)
n_events = self.secs_shock.shape[0]
if io_approach == "leontief":
degr_demand = (
self.secs_shock * self.mriot.Y.sum(1)
)
self.supchain_imp.update({io_approach : pd.concat(
[
pymrio.calc_x_from_L(self.inverse[io_approach], degr_demand.iloc[i])
for i in range(n_events)
],
axis=1,
).T.set_index(self.secs_shock.index)})
elif io_approach == "ghosh":
value_added = calc_v(self.mriot.Z, self.mriot.x)
degr_value_added = (
self.secs_shock * value_added.values
)
self.supchain_imp.update({io_approach : pd.concat(
[
calc_x_from_G(self.inverse[io_approach], degr_value_added.iloc[i])
for i in range(n_events)
],
axis=1,
).T.set_index(self.secs_shock.index)})
elif io_approach == "eeioa":
self.supchain_imp.update({io_approach : (
pd.DataFrame(
self.secs_shock.dot(self.inverse[io_approach])
* self.mriot.x.values.flatten()
)
)})
else:
raise RuntimeError(f"Unknown io_approach: {io_approach}")
[docs]
def map_exp_to_mriot(self, exp_regid, mriot_type):
"""
Map regions names in exposure into Input-Output regions names.
exp_regid must follow ISO 3166 numeric country codes.
"""
if mriot_type == "EXIOBASE3":
mriot_reg_name = u_coord.country_to_iso(exp_regid, "alpha2")
elif mriot_type in ["WIOD16", "OECD21"]:
mriot_reg_name = u_coord.country_to_iso(exp_regid, "alpha3")
else:
warnings.warn(
"For a correct calculation the format of regions' names in exposure and "
"the IO table must match."
)
return exp_regid
idx_country = np.where(self.mriot.get_regions() == mriot_reg_name)[0]
if not idx_country.size > 0.0:
mriot_reg_name = "ROW"
return mriot_reg_name