"""
This file is part of CLIMADA.
Copyright (C) 2017 ETH Zurich, CLIMADA contributors listed in AUTHORS.
CLIMADA is free software: you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free
Software Foundation, version 3.
CLIMADA is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.
You should have received a copy of the GNU General Public License along
with CLIMADA. If not, see <https://www.gnu.org/licenses/>.
---
Define the main supply chain classes: DirectShocksSet and models classes.
"""
from __future__ import (
annotations,
)
from abc import (
ABC,
)
from climada_petals.engine.supplychain.mriot_handling import lexico_reindex
__all__ = ["DirectShocksSet", "IndirectCostModel", "BoARIOModel"]
import logging
from typing import Any, Dict, Iterable, Literal, Sequence
import warnings
import pandas as pd
import numpy as np
import pymrio
from boario.extended_models import ARIOPsiModel
from boario.simulation import Simulation
from boario import event as boario_event_module
from climada import CONFIG
from climada.engine import Impact
from climada.entity import Exposures
from climada_petals.engine.supplychain.utils import (
distribute_reg_impact_to_sectors,
calc_va,
translate_exp_to_regions,
translate_exp_to_sectors,
translate_imp_to_regions,
_thin_to_wide,
)
LOGGER = logging.getLogger(__name__)
MRIOT_DIRECTORY = CONFIG.engine.supplychain.local_data.mriot.dir()
"""Directory where Multi-Regional Input-Output Tables (MRIOT) are downloaded."""
VA_NAME = "value added"
"""Index name for value added"""
SIMULATION_LENGTH_BUFFER = 365
"""Default buffer for BoARIO simulation length (in days)"""
[docs]
class DirectShocksSet:
"""DirectShocksSet class
The DirectShocksSet class provides methods for 'translating' Impact and Exposure objects into
economic shocks, corresponding to a specific MRIOT table, from which indirect economic
costs can be computed.
Attributes
----------
mriot_name: str
The name of the MRIOT defining the typology of assets.
mriot_sectors: pd.Index
The list of possible sectors.
mriot_regions: pd.Index
The list of possible regions.
name: str, default "unnamed"
A name to identify the object. For convenience only.
monetary_factor: int
The monetary factor of the correponding MRIOT.
exposure_assets: pd.Series
Exposure translated in the region/sector typology of the MRIOT. The index of the Series
are the possible (region, sector).
impacted_assets: pd.DataFrame
Impact translated in the region/sector typology of the MRIOT. The columns index are the possible (region, sector),
and the row index correspond to event_ids of the Impact.
event_dates: pd.Series
Series of the dates of the events, as ordinals, with event ids as index.
"""
[docs]
def __init__(
self,
mriot_name: str,
mriot_sectors: pd.Index,
mriot_regions: pd.Index,
exposure_assets: pd.Series,
impacted_assets: pd.DataFrame,
event_dates: np.ndarray,
monetary_factor: int,
shock_name: str = "unnamed",
) -> None:
# Sanity checks
if not isinstance(exposure_assets, pd.Series):
raise ValueError("Exposure assets must be a pandas Series")
self.mriot_sectors = mriot_sectors
self.mriot_regions = mriot_regions
self.mriot_industries = pd.MultiIndex.from_product(
[self.mriot_regions, self.mriot_sectors], names=["region", "sector"]
)
if not (
not_present := exposure_assets.index.difference(self.mriot_industries)
).empty:
raise ValueError(
f"Exposure assets indices do not match MRIOT industries.\nNot matching: {not_present}"
)
if not isinstance(impacted_assets, pd.DataFrame):
raise ValueError("Impacted assets must be a pandas DataFrame")
if not (
not_present := impacted_assets.columns.difference(self.mriot_industries)
).empty:
raise ValueError(
f"Impacted assets columns do not match MRIOT industries.\nNot matching: {not_present}"
)
self.event_ids = impacted_assets.index
if not event_dates.size == self.event_ids.size:
raise ValueError(
f"Number of events mismatch between dates ({event_dates.size} events) and impacted assets ({self.event_ids.size} events)"
)
if not (
not_present := impacted_assets.columns.difference(exposure_assets.index)
).empty:
warnings.warn(
f"Some impacted assets do not have a corresponding exposure value ({not_present}). The impact will not be considered."
)
self.mriot_name = mriot_name
self.monetary_factor = monetary_factor
self.name = shock_name
self.exposure_assets = _thin_to_wide(exposure_assets, self.mriot_industries)
self.exposure_assets.sort_index(inplace=True)
self.impacted_assets = _thin_to_wide(
impacted_assets, self.event_ids, self.mriot_industries
)
self.impacted_assets = self.impacted_assets.reindex(
sorted(self.impacted_assets.columns), axis=1
)
self.event_dates = pd.Series(event_dates, index=self.event_ids)
@classmethod
def _init_with_mriot(
cls,
mriot: pymrio.IOSystem,
exposure_assets: pd.Series,
impacted_assets: pd.DataFrame,
event_dates: np.ndarray,
shock_name: str,
):
mriot = lexico_reindex(mriot)
return cls(
mriot.name,
mriot.get_sectors(),
mriot.get_regions(),
exposure_assets,
impacted_assets,
event_dates,
mriot.monetary_factor,
shock_name,
)
[docs]
@classmethod
def from_exp_and_imp(
cls,
mriot: pymrio.IOSystem,
exposure: Exposures,
impact: Impact,
affected_sectors: Iterable[str] | dict[str, float] | pd.Series | Literal["all"],
impact_distribution: dict[str, float] | pd.Series | None,
shock_name: str | None = None,
exp_value_col: str = "value",
custom_mriot: bool = False,
):
"""Build a DirectShocksSet from MRIOT, Exposure and Impact objects.
This method translates both the given Exposure and Impact objects to the MRIOT typology.pd.Series
First, it aggregates exposure values by `region_id` and then maps them to the
regions of the given mriot. Assets are then distributed to the
different sectors specified by `affected_sectors`.
It proceeds with a similar process for the Impact object, using `impact_distribution` to
distribute the impact across sectors.
Parameters
----------
mriot : pymrio.IOSystem
The MRIOT to use for the typology of region and sectors.
exposure : Exposures
The Exposures object to use to derive total assets from.
impact : Impact
The Impact object to derive the impact on assets from.
shock_name : str | None
An optional name to identify the object, defaults to "unnamed".
affected_sectors : Iterable[str] | dict[str, float] | pd.Series | Literal["all"]
The sectors of the MRIOT that are impacted. If given as a
collection of string, or `"all"`, then the total assets of the
region are distributed proportionally to each sectors gross output.
A dictionary `sector:share` can also be passed to specify which
share of the total regional assets should be distributed to each
sector.
impact_distribution : dict[str, float] | pd.Series, optional
This argument specify how the impact per region should be
distributed to the impacted sectors. Using `None` will distribute
proportionally to each sectors gross output in the MRIOT. A
dictionary in the form `sector:share` or similarly a `Series` can
be used to specify a custom distribution.
exp_value_col : str
The name of the column of the Exposure data representing the value
of assets in each centroids.
custom_mriot : bool
Whether to consider the MRIOT as a custom one (skips name checking), defaults to False.
Note that its regions has to be ISO3 countries name for it to work.
"""
mriot = lexico_reindex(mriot)
exp = translate_exp_to_regions(
exposure, mriot_name=mriot.name, custom_mriot=custom_mriot
)
exposure_assets = translate_exp_to_sectors(
exp, affected_sectors=affected_sectors, mriot=mriot, value_col=exp_value_col
)
return cls.from_assets_and_imp(
mriot=mriot,
exposure_assets=exposure_assets,
impact=impact,
shock_name=shock_name,
affected_sectors=affected_sectors,
impact_distribution=impact_distribution,
custom_mriot=custom_mriot,
)
[docs]
@classmethod
def from_assets_and_imp(
cls,
mriot: pymrio.IOSystem,
exposure_assets: pd.Series,
impact: Impact,
shock_name: str | None,
affected_sectors: Iterable[str] | dict[str, float] | pd.Series | Literal["all"],
impact_distribution: dict[str, float] | pd.Series | None,
custom_mriot: bool = False,
):
"""Build a DirectShocksSetfrom an MRIOT, assets Series, and Impact objects.
This method translates the given Impact object to the MRIOT typology
(see :py:meth:`from_exp_and_imp`).
Parameters
----------
mriot : pymrio.IOSystem
The MRIOT to use for the typology of region and sectors.
exposure_assets : pd.Series
A pandas `Series` with (region,sector) as index and assets value.
impact : Impact
The Impact object to derive the impact on assets from.
shock_name : str or None
An optional name to identify the object.
affected_sectors : Iterable[str] | dict[str, float] | pd.Series | Literal["all"]
The sectors of the MRIOT that are impacted. If given as a
collection of string, or `"all"`, then the total assets of the
region are distributed proportionally to each sectors gross output.
A dictionary `sector:share` can also be passed to specify which
share of the total regional assets should be distributed to each
sector.
impact_distribution : dict[str, float] | pd.Series, optional
This argument specify how the impact per region should be
distributed to the impacted sectors. Using `None` will distribute
proportionally to each sectors gross output in the MRIOT. A
dictionary in the form `sector:share` or similarly a `Series` can
be used to specify a custom distribution.
custom_mriot : bool
Whether to consider the MRIOT as a custom one (skips name checking), defaults to False.
Note that its regions has to be ISO3 countries name for it to work.
"""
mriot = lexico_reindex(mriot)
event_dates = impact.date
# get regional impact in MRIOT format
impacted_assets = impact.impact_at_reg()
impacted_assets = impacted_assets.loc[:, (impacted_assets != 0).any()]
impacted_assets = translate_imp_to_regions(
impacted_assets, mriot, custom_mriot=custom_mriot
)
# Setup distribution toward sectors
if isinstance(affected_sectors, str) and affected_sectors == "all":
affected_sectors = list(mriot.get_sectors()) # type: ignore
if impact_distribution is None:
# Default uses production distribution across sectors, region.
impact_distribution = mriot.x.loc[
pd.IndexSlice[impacted_assets.columns, affected_sectors],
mriot.x.columns[0],
]
if isinstance(impact_distribution, dict):
impact_distribution = pd.Series(impact_distribution)
if not isinstance(impact_distribution, pd.Series):
raise ValueError(f"Impact_distribution could not be converted to a Series")
impact_distribution = impact_distribution.sort_index()
impacted_assets = distribute_reg_impact_to_sectors(
impacted_assets, impact_distribution
)
impacted_assets.rename_axis(index="event_id", inplace=True)
# Call constructor
return cls._init_with_mriot(
mriot=mriot,
exposure_assets=exposure_assets,
impacted_assets=impacted_assets,
event_dates=event_dates,
shock_name=shock_name,
)
@property
def event_ids_with_impact(self) -> pd.Index:
"""
Get the IDs of events that have a non-zero impact on assets.
This property filters the `impacted_assets` DataFrame to retrieve the indices
of rows where at least one asset has a non-zero impact.
Returns
-------
pandas.Index
Index of events with at least one impacted asset.
"""
return self.impacted_assets.loc[self.impacted_assets.ne(0).any(axis=1)].index
@property
def impacted_assets_not_null(self) -> pd.DataFrame:
"""
Get a subset of the `impacted_assets` DataFrame containing only non-null impacts.
This property filters the `impacted_assets` DataFrame to include only rows and
columns where at least one non-zero impact exists.
Returns
-------
pandas.DataFrame
A DataFrame containing the impacted assets with non-zero values.
"""
return self.impacted_assets.loc[
self.impacted_assets.ne(0).any(axis=1),
self.impacted_assets.ne(0).any(axis=0),
]
@property
def exposure_assets_not_null(self) -> pd.Series:
"""
Get a subset of the `exposure_assets` DataFrame containing only non-null exposure.
This property filters the `exposure_assets` DataFrame to include only rows with non-zero values.
Returns
-------
pandas.Series
A Series containing the impacted assets with non-zero values.
"""
return self.exposure_assets.loc[self.exposure_assets.ne(0)]
@property
def relative_impact(self) -> pd.DataFrame:
"""The ratio of impacted assets over total assets (0. if total assets are 0.)."""
return (self.impacted_assets / self.exposure_assets).fillna(0.0).replace(
[np.inf, -np.inf], 0
) * (self.exposure_assets > 0)
@property
def relative_impact_not_null(self) -> pd.DataFrame:
"""
Get a subset of the `relative_impact` DataFrame containing only non-null values.
This property filters the `relative_impact` DataFrame to include only rows and
columns where at least one non-zero impact exists.
Returns
-------
pandas.DataFrame
A DataFrame containing the impacted assets with non-zero values.
"""
return self.relative_impact.loc[
self.relative_impact.ne(0).any(axis=1),
self.relative_impact.ne(0).any(axis=0),
]
@staticmethod
def _check_mriot_name(mriot_names: Sequence[str]) -> bool:
"""Checks that a list of MRIOT names are all identical"""
return all(name == mriot_names[0] for name in mriot_names)
@staticmethod
def _combine_event_dates(event_dates: list[pd.Series]) -> pd.Series:
"""Combines multiple series of event dates into one"""
combined_event_dates = pd.concat(event_dates)
combined_event_dates = combined_event_dates.groupby(level=0).transform(
lambda x: x.iloc[0] if x.nunique() == 1 else "conflicting value"
)
if "conflicting value" in combined_event_dates.values:
raise ValueError(
f"Conflicting values for event dates for events: {combined_event_dates[combined_event_dates=='conflicting value'].index.unique()}"
)
combined_event_dates = combined_event_dates.groupby(level=0).first()
return combined_event_dates
@staticmethod
def _combine_exp_assets(assets: Sequence[pd.Series]) -> pd.Series:
"""Combines exposure assets together"""
if not all(asset.index.equals(assets[0].index) for asset in assets):
raise ValueError("Mismatching indexes amongst the different assets.")
assets_no0 = [asset.loc[asset > 0.0] for asset in assets]
concat_assets = pd.concat(assets_no0, join="outer")
duplicated_idx = concat_assets.index[concat_assets.index.duplicated()]
duplicated_values = concat_assets.loc[duplicated_idx]
# This checks that all exposure values related to one (region, sector) are the same.
# Thus multiple different exposure for different sub-sectors belonging to the same
# MRIOT sector is not possible.
if not duplicated_values.groupby(duplicated_values.index).nunique().eq(1).all():
raise ValueError("Conflicting values amongst the different assets.")
else:
return _thin_to_wide(
concat_assets.groupby(level=[0, 1]).first(), assets[0].index
)
@staticmethod
def _merge_imp_assets(imp_assets: Sequence[pd.DataFrame]):
"""Merges impacted assets together (summing impacts on same industries)."""
merged_imp_assets = pd.concat(
imp_assets,
join="outer",
sort=True,
axis=1,
).fillna(0)
merged_imp_assets = merged_imp_assets.groupby(merged_imp_assets.index).sum()
merged_imp_assets = (
merged_imp_assets.T.groupby(merged_imp_assets.columns).sum().T
)
return merged_imp_assets
[docs]
@classmethod
def combine(
cls,
direct_shocks: list[DirectShocksSet],
kind: Literal["merge", "concat"] = "merge",
combine_name: str = "unnamed_combine",
):
"""
Combine multiple `DirectShocksSet` instances into a single instance.
This method supports two modes of combination: merging and concatenating.
- In "merge" mode, the direct shocks are summed together for each matching event
and per region/sector.
- In "concat" mode, the direct shocks are assumed to originate from
independent events and are concatenated along the event axis.
Parameters
----------
direct_shocks : list of DirectShocksSet
A list of `DirectShocksSet` instances to be combined.
kind : {"merge", "concat"}, optional (Default: "merge")
The type of combination to apply.
- "merge": Sum the shocks across all events.
- "concat": Concatenate shocks assuming independent events.
combine_name : str, optional
Name for the combined `DirectShocksSet`. Default is "unnamed_combine".
Returns
-------
DirectShocksSet
A new `DirectShocksSet` instance representing the combination of the input shocks.
Raises
------
ValueError
If the `DirectShocksSet` instances do not share the same `mriot_name`.
Notes
-----
- All `DirectShocksSet` instances must share the same `mriot_name`.
- Merging sums the shocks and combines the corresponding impacted and exposed assets.
- Concatenating assumes independent events and stacks the shocks along the event axis.
Examples
--------
Assuming:
- `mriot` is an IOSystem object representing a MRIOT with (at least) Agriculture and Manufacture sectors.
- `exp_agri` contains exposure data for the Agriculture sector
- `exp_manu` contains exposure data for the for the Manufacture sector.
- `imp_TC_agri` is an Impact object with impacts on the agriculture sector from different tropical cyclones
- `imp_TC_manu` is an Impact object with impacts on the manufacture sector from different tropical cyclones
- `imp_FL_agri` is an Impact object with impacts on the manufacture sector from different floods on the agriculture sector
- `imp_TC_surges_manu` is an Impact object with impacts from surges that occurred during the same tropical cyclones as defined in `imp_TC_manu`
>>> shocks_TC_agri = DirectShocksSet.from_assets_and_imp(mriot=mriot, exposure=exp_agri, impact=imp_TC_agri, affected_sectors=["Agriculture"])
>>> shocks_TC_manu = DirectShocksSet.from_assets_and_imp(mriot=mriot, exposure=exp_manu, impact=imp_TC_manu, affected_sectors=["Manufacture"])
>>> shocks_FL_agri = DirectShocksSet.from_assets_and_imp(mriot=mriot, exposure=exp_agri, impact=imp_FL_agri, affected_sectors=["Agriculture"])
>>> shocks_TC_surges_manu = DirectShocksSet.from_assets_and_imp(mriot=mriot, exposure=exp_manu, impact=imp_TC_surges_agri, affected_sectors=["Manufacture"])
Assume you want to compute the impacts from TCs on both Agriculture and Manufacture sectors, you can "merge" `shocks_TC_agri` and `shocks_TC_manu` for that purpose:
>>> combined_set = DirectShocksSet.combine([shock_TC_agri, shock_TC_manu], kind="merge")
If you want to evaluate the perturbations from TCs and floods on the Agriculture sectors, you can "concat" `shocks_TC_agri` and `shocks_FL_agri` for that purpose:
>>> combined_set = DirectShocksSet.combine([shocks_TC_agri, shocks_FL_agri], kind="concat", combine_name="impact from floods and TCs")
Finally if you want to look at the shocks of "co-occurence" of surges and TCs for which the impacts were computed independently, you can merge `shocks_TC_manu` and `shocks_TC_surge_manu`.
Of course you can use more that one sectors at once per individual DirectShocksSet. The purpose of this `combine` method is to allow flexibility with employing different exposure layers and impact computation.
"""
if not len(direct_shocks) > 1:
return direct_shocks[0]
assets = [shock.exposure_assets for shock in direct_shocks]
combined_exp_assets = cls._combine_exp_assets(assets)
imp_assets = [shock.impacted_assets for shock in direct_shocks]
mriot_names = [shock.mriot_name for shock in direct_shocks]
if not DirectShocksSet._check_mriot_name(mriot_names):
raise ValueError("DirectShocksSetdo not all have the same mriot_name.")
if kind == "merge":
event_dates = cls._combine_event_dates(
[shock.event_dates for shock in direct_shocks]
)
LOGGER.info(
"Merging direct shocks together. The resulting direct shock per event and per each region,sector will be the sum of the different direct shocks."
)
merged_imp_assets = cls._merge_imp_assets(imp_assets)
return cls(
direct_shocks[0].mriot_name,
direct_shocks[0].mriot_sectors,
direct_shocks[0].mriot_regions,
combined_exp_assets,
merged_imp_assets,
event_dates,
direct_shocks[0].monetary_factor,
combine_name,
)
if kind == "concat":
LOGGER.info(
"Concatenating direct shocks. This assume the different direct shocks are from independent events."
)
event_dates = pd.concat([shock.event_dates for shock in direct_shocks])
concatenated_imp_assets = pd.concat(
imp_assets,
axis=0,
sort=True,
).fillna(0)
return cls(
direct_shocks[0].mriot_name,
direct_shocks[0].mriot_sectors,
direct_shocks[0].mriot_regions,
direct_shocks[0].exposure_assets,
concatenated_imp_assets,
event_dates,
direct_shocks[0].monetary_factor,
combine_name,
)
[docs]
class IndirectCostModel(ABC):
"""
Abstract base class for modeling indirect costs using a Multi-Regional Input-Output Table (MRIOT).
This class provides a structure for incorporating direct shocks into an indirect cost model.
Parameters
----------
mriot : pymrio.IOSystem
The Multi-Regional Input-Output System used in the model.
direct_shocks : DirectShocksSet or None, optional
The direct shocks to apply to the model. If provided, the model will
initialize with these shocks. Default is None.
Attributes
----------
mriot : pymrio.IOSystem
The Multi-Regional Input-Output System used in the model.
direct_shocks : DirectShocksSet or None
The direct shocks applied to the model. None if no shocks have been applied.
"""
[docs]
def __init__(
self,
mriot: pymrio.IOSystem,
direct_shocks: DirectShocksSet | None = None,
) -> None:
self.mriot = mriot
self.direct_shocks = None
if direct_shocks is not None:
self.shock_model_with(direct_shocks)
[docs]
def shock_model_with(
self,
direct_shocks: DirectShocksSet | list[DirectShocksSet],
combine_mode: Literal["concat"] | Literal["merge"] = "concat",
):
"""
Apply direct shocks to the model, combining them if multiple shocks are provided.
Parameters
----------
direct_shocks : DirectShocksSet or list of DirectShocksSet
The direct shocks to apply to the model. If a list is provided, the shocks
will be combined based on the `combine_mode`.
combine_mode : {"concat", "merge"}, optional
The method for combining multiple shocks. Default is "concat".
- "concat": Concatenate shocks assuming independent events.
- "merge": Merge shocks by summing their impacts for matching (event,region,sector).
Raises
------
ValueError
If the MRIOT name of the provided shocks does not match the model's MRIOT name.
Notes
-----
- The `combine_mode` is only applicable if `direct_shocks` is provided as a list.
- The shocks must correspond to the same MRIOT used in the model.
"""
if isinstance(direct_shocks, list):
direct_shocks = DirectShocksSet.combine(direct_shocks, combine_mode)
if self.mriot.name != direct_shocks.mriot_name:
raise ValueError(
f"""Cannot shock model with a shock from a different MRIOT:
Model {self.mriot.name} != Shock {direct_shocks.mriot_name}"""
)
self.direct_shocks = direct_shocks
[docs]
class StaticIOModel(IndirectCostModel):
"""
Static Input-Output Model for analyzing indirect economic impacts.
Extends and makes concrete the IndirectCostModel abstract class by providing methods to calculate
degraded economic metrics and indirect impacts using Leontief and Ghosh models.
Parameters
----------
mriot : pymrio.IOSystem
The Multi-Regional Input-Output System used in the model.
direct_shocks : DirectShocksSet or None, optional
The direct shocks to apply to the model. Default is None.
Attributes
----------
mriot : pymrio.IOSystem
The Multi-Regional Input-Output System used in the model.
direct_shocks : DirectShocksSet or None
The direct shocks applied to the model. None if no shocks have been applied.
"""
[docs]
def __init__(
self,
mriot: pymrio.IOSystem,
direct_shocks: DirectShocksSet | None = None,
) -> None:
super().__init__(mriot, direct_shocks)
@property
def value_added(self) -> pd.Series:
"""
Calculates and returns the value added from the MRIOT.
Returns
-------
pd.Series
A series representing the value added by sector.
"""
return calc_va(self.mriot.Z, self.mriot.x).loc[VA_NAME]
@property
def final_demand(self) -> pd.Series:
"""
Retrieves the (total) final demand addressed to
each sectors from the MRIOT.
Returns
-------
pd.Series
A series representing the final demand by sector.
"""
return self.mriot.Y.sum(1)
@property
def degraded_value_added(self) -> pd.DataFrame | pd.Series:
"""
Calculates and returns the degraded value added considering
the direct shocks.
Returns
-------
pd.Series
A series representing the degraded value added by sector.
"""
if self.direct_shocks:
return -self.direct_shocks.relative_impact * self.value_added
else:
LOGGER.warning("The model currently is not shocked. Returning value added.")
return self.value_added
@property
def degraded_final_demand(self) -> pd.DataFrame | pd.Series:
"""
Calculates and returns the degraded final demand considering
the direct shocks.
Returns
-------
pd.Series
A series representing the degraded final demand by sector.
"""
if self.direct_shocks:
return -self.direct_shocks.relative_impact * self.final_demand
else:
LOGGER.warning(
"The model currently is not shocked. Returning final demand."
)
return self.final_demand
[docs]
def calc_leontief(
self, event_ids: list[int] | pd.Index | None = None
) -> pd.DataFrame:
"""
Computes indirect impacts using the Leontief model.
Parameters
----------
event_ids : list[int] or pd.Index or None, optional
A list of event IDs to calculate impacts for. If None, all events are used.
Returns
-------
pd.DataFrame
A DataFrame of indirect outputs for the specified events.
"""
if self.direct_shocks:
if event_ids is None:
event_ids = self.direct_shocks.event_ids
res_leontief = [
(
pymrio.calc_x_from_L(
self.mriot.L, self.degraded_final_demand.loc[event_id]
)
)[self.mriot.x.columns[0]]
for event_id in event_ids
]
return pd.DataFrame(res_leontief, index=event_ids)
else:
LOGGER.warning(
"The model currently is not shocked. Returning empty DataFrame."
)
return pd.DataFrame()
[docs]
def calc_ghosh(self, event_ids: list[int] | pd.Index | None = None) -> pd.DataFrame:
"""
Computes indirect impacts using the Ghosh model.
Parameters
----------
event_ids : list[int] or pd.Index or None, optional
A list of event IDs to calculate impacts for. If None, all events are used.
Returns
-------
pd.DataFrame
A DataFrame of indirect outputs for the specified events.
"""
if self.direct_shocks:
if event_ids is None:
event_ids = self.direct_shocks.event_ids
res_ghosh = [
(self.degraded_value_added.loc[event_id].dot(self.mriot.G))
for event_id in event_ids
]
return pd.DataFrame(res_ghosh, index=event_ids)
else:
LOGGER.warning(
"The model currently is not shocked. Returning empty DataFrame."
)
return pd.DataFrame()
[docs]
def calc_indirect_impacts(
self,
event_ids: list[int] | pd.Index | Literal["with_impact"] | None = "with_impact",
) -> pd.DataFrame | None:
"""
Calculates detailed indirect impacts using both Leontief and Ghosh models.
Parameters
----------
event_ids : "with_impact" or list[int] or pd.Index or None, default "with_impact"
A list of event IDs to calculate impacts for. Only events with non null impacts by default. If None, all events are used.
Returns
-------
pd.DataFrame or None
A DataFrame containing indirect impacts with various metrics and methods,
or None if no shocks are defined for the model.
"""
def create_df_metrics(event_ids, method, indout, abs_shock):
if event_ids == "with_impact":
event_ids = self.direct_shocks.event_ids_with_impact
if method == "leontief":
df = self.calc_leontief(event_ids)
elif method == "ghosh":
df = self.calc_ghosh(event_ids)
else:
raise ValueError(f"Unrecognized methods: {method}")
if event_ids is not None:
abs_shock = abs_shock.loc[event_ids]
# Create absolute production change dataframe
df_abs = df.melt(ignore_index=False)
df_abs["metric"] = "absolute production change"
df_abs["method"] = method
# Create relative production change dataframe (filtering null production)
df_rel = ((df / indout) * (indout > 0.0)).melt(ignore_index=False)
df_rel["metric"] = "relative production change"
df_rel["method"] = method
# Create relative to total direct shock dataframe
df_rel_shock = (
(df.div(abs_shock.sum(axis=1), axis=0))
.melt(ignore_index=False)
.fillna(0.0)
.replace([np.inf, -np.inf], 0)
)
df_rel_shock["metric"] = "production lost to total shock ratio"
df_rel_shock["method"] = method
# Create relative to sector direct shock dataframe
df_rel_sec_shock = (
(df / abs_shock)
.melt(ignore_index=False)
.fillna(0.0)
.replace([np.inf, -np.inf], 0)
)
df_rel_sec_shock["metric"] = "production lost to sector shock ratio"
df_rel_sec_shock["method"] = method
return df_rel, df_abs, df_rel_shock, df_rel_sec_shock
if self.direct_shocks is None:
LOGGER.warning("The model has no shock defined, returning None.")
return None
df_leontief_rel, df_leontief_abs, df_leontief_shock, df_leontief_sec_shock = (
create_df_metrics(
event_ids,
"leontief",
self.mriot.x.iloc[:, 0],
self.direct_shocks.impacted_assets,
)
)
df_ghosh_rel, df_ghosh_abs, df_ghosh_shock, df_ghosh_sec_shock = (
create_df_metrics(
event_ids,
"ghosh",
self.mriot.x.iloc[:, 0],
self.direct_shocks.impacted_assets,
)
)
res = pd.concat(
[
df
for df in [
df_leontief_abs,
df_leontief_rel,
df_leontief_shock,
df_leontief_sec_shock,
df_ghosh_abs,
df_ghosh_rel,
df_ghosh_shock,
df_ghosh_sec_shock,
]
],
axis=0,
)
res = res.reset_index().sort_values(
["event_id", "region", "sector", "method", "metric"]
)[["event_id", "region", "sector", "method", "metric", "value"]]
res.index = res.index.sort_values()
return res
[docs]
class BoARIOModel(IndirectCostModel):
"""
BoARIO Model for simulating the economic impacts of shocks using the ARIO model.
Extends and makes concrete the IndirectCostModel abstract class
by integrating the ARIOPsiModel and Simulation classes
to simulate the effects of direct shocks over time, with customizable event parameters.
Parameters
----------
mriot : pymrio.IOSystem
The Multi-Regional Input-Output System used in the model.
direct_shocks : DirectShocksSet
The direct shocks to apply to the model.
model_kwargs : dict, optional
A dictionary of model parameters to be passed to the ARIOPsiModel.
simulation_kwargs : dict, optional
A dictionary of simulation parameters to be passed to the Simulation class.
event_kwargs : dict, optional
A dictionary of event-specific parameters (e.g., recovery time) to be passed
to the event creation logic.
Attributes
----------
mriot : pymrio.IOSystem
The Multi-Regional Input-Output System used in the model.
model : ARIOPsiModel
The ARIO model used to represent the economy.
sim : Simulation
The simulation object used to run the model.
direct_shocks : DirectShocksSet or None
The direct shocks applied to the model.
Notes
-----
We highly recommend users to go through `BoARIO's documentation <https://spjuhel.github.io/BoARIO>`
"""
[docs]
def __init__(
self,
mriot: pymrio.IOSystem,
direct_shocks: DirectShocksSet,
model_kwargs: Dict[str, Any] | None = None,
simulation_kwargs: Dict[str, Any] | None = None,
event_kwargs: Dict[str, Any] | None = None,
) -> None:
self.mriot = mriot
# Set the different parameters from defaults and user given.
default_event_params = {"recovery_tau": 180, "rebuild_tau": 180}
event_params = {**default_event_params,**(event_kwargs or {})}
default_model_params = {}
model_params = {**default_model_params,**(model_kwargs or {})}
default_sim_params = {}
sim_params = {**default_sim_params,**(simulation_kwargs or {})}
# Instantiate the BoARIO objects
model = ARIOPsiModel(
self.mriot,
productive_capital_vector=direct_shocks.exposure_assets,
**model_params,
)
self.sim = Simulation(
model,
n_temporal_units_to_sim=int(
direct_shocks.event_dates.max()
- direct_shocks.event_dates.min()
+ SIMULATION_LENGTH_BUFFER
),
**sim_params,
)
self.shock_model_with(direct_shocks, event_kwargs=event_params)
[docs]
def shock_model_with(
self,
direct_shocks: DirectShocksSet | list[DirectShocksSet],
combine_mode: Literal["concat"] | Literal["merge"] = "concat",
event_kwargs: Dict[str, Any] | None = None,
):
"""
Apply direct shocks to the model, adding events for recovery if necessary.
Parameters
----------
direct_shocks : DirectShocksSet or list of DirectShocksSet
The direct shocks to apply to the model. If a list is provided, the shocks
will be combined based on the `combine_mode`.
combine_mode : {"concat", "merge"}, optional
The method for combining multiple shocks. Default is "concat".
- "concat": Concatenate shocks assuming independent events.
- "merge": Merge shocks by summing their impacts.
event_kwargs : dict, optional
A dictionary of event-specific parameters to be passed to event creation.
See BoARIO documentation on `Events <https://spjuhel.github.io/BoARIO/tutorials/boario-events.html>`
"""
default_boario_event_params = {"recovery_tau": 180, "rebuild_tau": 180}
event_params = {**default_boario_event_params, **(event_kwargs or {})}
if direct_shocks is not None:
super().shock_model_with(direct_shocks, combine_mode)
if self.direct_shocks is not None:
skipped = []
for date, event_id in zip(
self.direct_shocks.event_dates, self.direct_shocks.event_ids
):
if (
self.direct_shocks.impacted_assets.loc[event_id]
> boario_event_module.LOW_DEMAND_THRESH
/ self.direct_shocks.monetary_factor
).any():
ev = boario_event_module.from_series(
event_type="recovery",
impact=self.direct_shocks.impacted_assets.loc[event_id],
occurrence=date - self.direct_shocks.event_dates.min() + 5,
event_monetary_factor=self.direct_shocks.monetary_factor,
name=f"Event {event_id}",
**event_params,
)
self.sim.add_event(ev)
else:
skipped.append(event_id)
LOGGER.warning(
f"Impact for following events was too small to have an effect, skipping them for efficiency: {skipped}"
)
else:
self.direct_shocks = None
[docs]
def run_sim(self):
"""
Run the simulation for the model.
Executes the simulation loop and returns the simulation object.
Returns
-------
Simulation
The simulation object after running the simulation loop.
"""
self.sim.loop()
return self.sim