"""Wrapping the tessif-fine post-processing."""
from collections import abc, defaultdict
import numpy as np
import pandas as pd
import tessif.post_process as base
from tessif.frused import namedtuples as nts
from tessif.frused import spellings as spl
from tessif.frused.defaults import energy_system_nodes as esn_defaults
[docs]class FINEResultier(base.Resultier):
"""Transform nodes and edges into their name representation. Child of
:class:`~tessif.transform.es2mapping.base.Resultier` and mother of all
fine Resultiers.
Parameters
----------
optimized_es: fine energy system model
An optimized fine energy system containing its results
"""
def __init__(self, optimized_es, **kwargs):
super().__init__(optimized_es=optimized_es, **kwargs)
def _map_nodes(self, optimized_es):
"""Return a list of node uids."""
# node uids are directly produced while creating the esM with fine
nodes = list()
for node in optimized_es.uid_dict:
nodes.append(node)
return nodes
def _map_node_uids(self, optimized_es):
"""Return a list of node uids."""
cmp_input = self._cmp_input(optimized_es)
_uid_nodes = dict()
pre_uids = dict(optimized_es.uid_dict)
for node in self.nodes:
prelim_uid = pre_uids[node]
uid_dict = prelim_uid._asdict()
if pre_uids[node].component is None:
if cmp_input[node].dimension == "2dim":
uid_dict["component"] = "bus"
if hasattr(cmp_input[node], "sign"):
if cmp_input[node].sign == 1:
uid_dict["component"] = "source"
elif cmp_input[node].sign == -1:
uid_dict["component"] = "sink"
if hasattr(cmp_input[node], "commodityConversionFactors"):
# If str unlimitted is in cmp, it is an emitting source
if node + ".unlimitted" in cmp_input or ".unlimitted" in node:
uid_dict["component"] = "source"
# If str reverse is in cmp, it is a connector
elif node + ".reverse" in cmp_input or ".reverse" in node:
uid_dict["component"] = "connector"
else:
uid_dict["component"] = "transformer"
if "StorageModel" in optimized_es.componentModelingDict:
if (
node
in optimized_es.componentModelingDict[
"StorageModel"
].componentsDict
):
uid_dict["component"] = "storage"
uid = nts.Uid(**uid_dict)
else:
uid = prelim_uid
_uid_nodes[str(node)] = uid
return _uid_nodes
def _map_edges(self, optimized_es):
es = self._re_indexing(optimized_es)
cmp_input = self._cmp_input(es)
edges = list()
for node in cmp_input:
# Start: determine the cmp source specific variables: dimension, commodity and sign
dim = getattr(cmp_input[node], "dimension")
# 2d -> indicator for transmission component. 1d indicator for edge component
if hasattr(cmp_input[node], "commodity"):
# commodity is not part of e.g conversion (multiple commodities)
com = getattr(cmp_input[node], "commodity")
else:
com = "none" # If the cmp has no commodity its defined as none for error handling
if hasattr(cmp_input[node], "sign"):
# sign is not part of e.g storage (two way)
sign = getattr(cmp_input[node], "sign")
else:
sign = 0 # If the cmp has no sign +1/-1 its defined as 0 for error handling
# indicator for link of the specific commodity (grid cmp)
if dim == "2dim":
source = node
for target in cmp_input:
# Determine target specific variables (DIM, SIGN and COMMODITY)
target_dim = getattr(cmp_input[target], "dimension")
if hasattr(cmp_input[target], "commodity"):
# commodity is not part of e.g conversion (two commodities)
target_com = getattr(cmp_input[target], "commodity")
else:
target_com = "none" # If the cmp has no commodity its defined as none for error handling
if hasattr(cmp_input[target], "sign"):
# sign is +1 for Source and -1 for Sink (one way)
target_sign = getattr(cmp_input[target], "sign")
else:
target_sign = 0 # If the cmp has no sign +1/-1 its defined as 0 for error handling
# Source adding to edges for each commodity
if target_com == com and target_sign == 1 and target_dim == "1dim":
edges.append(nts.Edge(str(target), str(source)))
# Sink adding to edges for each commodity
if target_com == com and target_sign == -1:
edges.append(nts.Edge(str(source), str(target)))
# Storage adding to edges (indicator: no sign cause two way use)
if target_com == com and target_dim == "1dim" and target_sign == 0:
# Grid loading the storage
edges.append(nts.Edge(str(source), str(target)))
# Storage feed in grid
edges.append(nts.Edge(str(target), str(source)))
# Conversion needs to be added in both ways and beforehand determined what is inflow commodity
if dim == "1dim" and com == "none" and sign == 0:
source = node
if ".reverse" in source:
source = source.partition(".")[0]
# Conversion specific variables (whats goes in and what comes out)
commodity_factors = getattr(
cmp_input[node], "commodityConversionFactors"
)
# Checking which commodity is set negative -> representing inflow for conversion
inflows, outflows = _parse_commodity_factors(commodity_factors)
# inflows = dict((k, v)
# for k, v in commodity_factors.items() if v < 0)
# outflows = dict((k, v)
# for k, v in commodity_factors.items() if v > 0)
for target in cmp_input:
if ".reverse" in target:
target = target.partition(".")[0]
# Determine target specific variables (DIM, SIGN and COMMODITY)
target_dim = getattr(cmp_input[target], "dimension")
if hasattr(cmp_input[target], "sign"):
target_sign = getattr(cmp_input[target], "sign")
else:
target_sign = 0 # If the cmp has no sign +1/-1 its defined as 0 for error handling
if hasattr(cmp_input[target], "commodity"):
target_com = getattr(cmp_input[target], "commodity")
else:
# commodity is not part of conversion (multiple commodities)
target_com = "none" # If the cmp has no commodity its defined as none for error handling
# Conversion of commodity producing energy unit and feed it into grid with same commodity
for outflow in outflows:
if (
target_dim == "2dim"
and target_sign == 0
and target_com == outflow
):
edges.append(nts.Edge(str(source), str(target)))
# Conversion needs to be feed by specific commodity from a 2dim grid which is then transformed
for inflow in inflows:
if (
target_dim == "2dim"
and target_sign == 0
and target_com == inflow
):
edges.append(nts.Edge(str(target), str(source)))
return edges
def _map_edge_uids(self, optimized_es):
"""
Return string representation of (inflow, node) labels as :class:`list`
"""
es = self._re_indexing(optimized_es)
cmp_input = self._cmp_input(es)
edges_uid = list()
for node in cmp_input:
# Start: determine the cmp source specific variables: dimension, commodity and sign
dim = getattr(cmp_input[node], "dimension")
# 2d -> indicator for transmission component. 1d indicator for edge component
if hasattr(cmp_input[node], "commodity"):
# commodity is not part of e.g conversion (multiple commodities)
com = getattr(cmp_input[node], "commodity")
else:
com = "none" # If the cmp has no commodity its defined as none for error handling
if hasattr(cmp_input[node], "sign"):
# sign is not part of e.g storage (two way)
sign = getattr(cmp_input[node], "sign")
else:
sign = 0 # If the cmp has no sign +1/-1 its defined as 0 for error handling
# indicator for link of the specific commodity (grid cmp)
if dim == "2dim":
source = node
for target in cmp_input:
# Determine target specific variables (DIM, SIGN and COMMODITY)
target_dim = getattr(cmp_input[target], "dimension")
if hasattr(cmp_input[target], "commodity"):
# commodity is not part of e.g conversion (two commodities)
target_com = getattr(cmp_input[target], "commodity")
else:
target_com = "none" # If the cmp has no commodity its defined as none for error handling
if hasattr(cmp_input[target], "sign"):
# sign is +1 for Source and -1 for Sink (one way)
target_sign = getattr(cmp_input[target], "sign")
else:
target_sign = 0 # If the cmp has no sign +1/-1 its defined as 0 for error handling
# Source adding to edges_uid for each commodity
if target_com == com and target_sign == 1 and target_dim == "1dim":
edges_uid.append(
nts.Edge(
optimized_es.uid_dict[target],
optimized_es.uid_dict[source],
)
)
# Sink adding to edges_uid for each commodity
if target_com == com and target_sign == -1:
edges_uid.append(
nts.Edge(
optimized_es.uid_dict[source],
optimized_es.uid_dict[target],
)
)
# Storage adding to edges_uid (indicator: no sign cause two way use in and out of the grid)
if target_com == com and target_dim == "1dim" and target_sign == 0:
# Grid loading the storage
edges_uid.append(
nts.Edge(
optimized_es.uid_dict[source],
optimized_es.uid_dict[target],
)
)
# Storage feed in grid
edges_uid.append(
nts.Edge(
optimized_es.uid_dict[target],
optimized_es.uid_dict[source],
)
)
# Conversion needs to be added in both ways and beforehand determined what is inflow commodity
if dim == "1dim" and com == "none" and sign == 0:
source = node
# Conversion specific variables (whats goes in and what come out)
commodity_factors = getattr(
cmp_input[node], "commodityConversionFactors"
)
# Checking which commodity is set negative -> representing inflow for conversion
inflows = {k: v for k, v in commodity_factors.items() if v < 0}
outflows = {k: v for k, v in commodity_factors.items() if v > 0}
for target in cmp_input:
# Determine target specific variables (DIM, SIGN and COMMODITY)
target_dim = getattr(cmp_input[target], "dimension")
if hasattr(cmp_input[target], "sign"):
target_sign = getattr(cmp_input[target], "sign")
else:
target_sign = 0 # If the cmp has no sign +1/-1 its defined as 0 for error handling
if hasattr(cmp_input[target], "commodity"):
target_com = getattr(cmp_input[target], "commodity")
else:
# commodity is not part of conversion (multiple commodities)
target_com = "none" # If the cmp has no commodity its defined as none for error handling
# Conversion of commodity producing energy unit and feed it into grid with same commodity
for outflow in outflows:
if (
target_dim == "2dim"
and target_sign == 0
and target_com == outflow
):
edges_uid.append(
nts.Edge(
optimized_es.uid_dict[source],
optimized_es.uid_dict[target],
)
)
# Conversion needs to be feed by specific commodity which is then transformed
for inflow in inflows:
if (
target_dim == "2dim"
and target_sign == 0
and target_com == inflow
):
edges_uid.append(
nts.Edge(
optimized_es.uid_dict[target],
optimized_es.uid_dict[source],
)
)
edges = edges_uid
return edges
@staticmethod
def _cmp_input(optimized_es):
"""
Returns the component specific values, defined by fine as DataFrame containing all relevant informations. The
component input dataframe is a usefull gathering of all relevant informations of a single node and all existing
nodes of an energy system.
"""
es = optimized_es
cmp_input = dict()
for mtype in es.componentModelingDict:
for node in es.componentNames:
Node = getattr(es.componentModelingDict[mtype], "componentsDict").get(
node
)
if Node is not None:
cmp_input.update({node: Node})
return cmp_input
def _re_indexing(self, optimized_es):
"""
Return the optimized energy system with concrete timeframe as index of in all later on used results.
"""
es = optimized_es
for mtype in es.componentModelingDict:
if hasattr(es.componentModelingDict[mtype], "operationVariablesOptimum"):
if (
getattr(
es.componentModelingDict[mtype], "operationVariablesOptimum"
)
is not None
):
indexing = getattr(
es.componentModelingDict[mtype], "operationVariablesOptimum"
)
indexing.columns = es.timesteps
if hasattr(
es.componentModelingDict[mtype], "chargeOperationVariablesOptimum"
):
if (
getattr(
es.componentModelingDict[mtype],
"chargeOperationVariablesOptimum",
)
is not None
):
indexing = getattr(
es.componentModelingDict[mtype],
"chargeOperationVariablesOptimum",
)
indexing.columns = es.timesteps
if hasattr(
es.componentModelingDict[mtype], "dischargeOperationVariablesOptimum"
):
if (
getattr(
es.componentModelingDict[mtype],
"dischargeOperationVariablesOptimum",
)
is not None
):
indexing = getattr(
es.componentModelingDict[mtype],
"dischargeOperationVariablesOptimum",
)
indexing.columns = es.timesteps
if hasattr(
es.componentModelingDict[mtype],
"stateOfChargeOperationVariablesOptimum",
):
if (
getattr(
es.componentModelingDict[mtype],
"stateOfChargeOperationVariablesOptimum",
)
is not None
):
indexing = getattr(
es.componentModelingDict[mtype],
"stateOfChargeOperationVariablesOptimum",
)
indexing.columns = es.timesteps
# This fragment is crucial if the actual used storages do have any emissions
# -> if conversion components added to the es which charge/discharge the storage.
# Storage commodity is renamed due to precise formulation for the optimization process -> name it back
# conversion input/output need to be deleted for post processing capabillities
for node in es.componentNames:
if ".input" in node or ".output" in node:
storage = node.partition(".")[0]
com = (
es.componentModelingDict["StorageModel"]
.componentsDict[storage]
.commodity.partition("+")[0]
)
es.componentModelingDict["StorageModel"].componentsDict[
storage
].commodity = com
if node in es.componentModelingDict["ConversionModel"].componentsDict:
del [
es.componentModelingDict["ConversionModel"].componentsDict[node]
]
return es
def _ts_results(self, optimized_es):
"""
Time series results is a gatherer for all relevant results within the fine energy system. It is iterating
through the result series of the ES, bringing them all in one and start reforming the values to tessifs shape.
"""
es = self._re_indexing(optimized_es)
cmp_input = self._cmp_input(optimized_es)
time_series_results = pd.DataFrame()
for mtype in es.componentModelingDict:
index_list = list()
model_series_results = pd.DataFrame()
if hasattr(es.componentModelingDict[mtype], "operationVariablesOptimum"):
# Grabbing SourceSink + Conversion + Storage Dataframe from results representing the loads + CO2
model_series_results = getattr(
es.componentModelingDict[mtype],
"operationVariablesOptimum",
pd.DataFrame(),
)
if model_series_results is not None:
# Renaming the indexes of the modelresults to delete the location within the header
for index_double in model_series_results.index:
# Cut header to one level (avoid cutting to one letter with if statement)
if len(index_double) == 2:
index_list.append(str(index_double[0]))
else:
index_list.append(str(index_double))
model_series_results.index = index_list
time_series_results = time_series_results.append(
model_series_results
)
if hasattr(
es.componentModelingDict[mtype], "chargeOperationVariablesOptimum"
):
# Grabbing Storage Dataframe from results
if (
getattr(
es.componentModelingDict[mtype],
"chargeOperationVariablesOptimum",
)
is not None
):
charge = (
getattr(
es.componentModelingDict[mtype],
"chargeOperationVariablesOptimum",
pd.DataFrame(),
)
).abs()
discharge = (
getattr(
es.componentModelingDict[mtype],
"dischargeOperationVariablesOptimum",
pd.DataFrame(),
)
).abs()
# Adding Suffix Charge/Discharge to Storage Inflow/Outflow
[
charge.rename(
index={row: row + " Charge"}, level=0, inplace=True
)
for row in charge.index.levels[0]
]
[
discharge.rename(
index={row: row + " Discharge"}, level=0, inplace=True
)
for row in discharge.index.levels[0]
]
model_series_results = pd.concat([charge, discharge], axis="index")
if model_series_results is not None:
for index_double in model_series_results.index:
# Cut header to one level (avoid cutting to one letter with if statement)
if len(index_double) == 2:
index_list.append(str(index_double[0]))
else:
index_list.append(str(index_double))
model_series_results.index = index_list
time_series_results = time_series_results.append(
model_series_results
)
# when there are components which have no true design variable the processed timeseries has to be the result
# df = pd.DataFrame()
for cmp in cmp_input:
if cmp not in time_series_results.index:
if (
cmp
not in es.componentModelingDict["TransmissionModel"].componentsDict
):
if hasattr(cmp_input[cmp], "processedOperationRateFix"):
if cmp_input[cmp].processedOperationRateFix is not None:
df = cmp_input[cmp].processedOperationRateFix
df.rename(columns={"Default Region": cmp}, inplace=True)
df = df.transpose()
# reconstruct non multiindex data frame
df = pd.DataFrame(
df.values.tolist(),
index=(cmp,),
columns=time_series_results.columns,
)
time_series_results = time_series_results.append(df)
if hasattr(cmp_input[cmp], "processedOperationRateMax"):
if cmp_input[cmp].processedOperationRateMax is not None:
cap = cmp_input[cmp].capacityMax
df = cmp_input[cmp].processedOperationRateMax * cap
df.rename(columns={"Default Region": cmp}, inplace=True)
df = df.transpose()
# reconstruct non multiindex data frame
df = pd.DataFrame(
df.values.tolist(),
index=(cmp,),
columns=time_series_results.columns,
)
time_series_results = time_series_results.append(df)
# Adding Zero Lines for Components that are not calculated in the ESM but taking place
for cmp in cmp_input:
if cmp not in time_series_results.index:
if hasattr(es.componentModelingDict, "StorageModel"):
if cmp in es.componentModelingDict["StorageModel"].componentsDict:
if cmp + " Charge" not in time_series_results.index:
time_series_results.loc[cmp + " Charge"] = 0
if cmp + " Discharge" not in time_series_results.index:
time_series_results.loc[cmp + " Discharge"] = 0
else:
if (
cmp
not in es.componentModelingDict[
"TransmissionModel"
].componentsDict
):
time_series_results.loc[cmp] = 0
# fine simulating 0 values sometime to negative very small float, which have to be set as 0
time_series_results[time_series_results < 0] = 0
# Transposing Time Series Results to get tessifs shape
time_series_results = time_series_results.transpose()
time_series_results.round(decimals=3)
# Adding the given ESM timeseries as index to work with
time_series_results.index = es.timesteps
for node in time_series_results.columns:
if ".input" in node or ".output" in node:
time_series_results.drop(node, axis="columns", inplace=True)
return time_series_results
[docs]class IntegratedGlobalResultier(FINEResultier, base.IntegratedGlobalResultier):
"""
Global Results
Extracting the integrated global results out of the energy system and
conveniently aggregating them (rounded to unit place) inside a dictionairy
keyed by result name.
Integrated global results (IGR) mapped by result name.
Integrated global results currently consist of meta and non-meta
results. the **meta** results are handled by the :mod:`~tessif.analyze`
module (see :attr:`tessif.analyze.Comparatier.integrated_global_results`)
and consist of:
- ``time``
- ``memory``
results. whereas the **non-meta** results usually consist of:
- ``emissions``
- ``costs``
results, which are handled here. Tessif's energy system, however, allow to
formulate a number of
:attr:`~tessif.model.energy_system.AbstractEnergySystem.global_constraints`
which then would automatically be post processed here.
The befornamed strings serve as key inside the mapping.
Note
----
Capacity costs calculated within the optimizing process in fine are related to the overall capacities
which are installed in the energy system and not only to the new build capacity of the component.
This results in differences in simulated costs compared to the post processed costs.
Parameters
----------
optimized_es: fine energy system model
An optimized fine energy system containing its results
See also
--------
For functionality documentation see the respective :class:`base class
<tessif.transform.es2mapping.base.IntegratedGlobalResultier>`.
Examples
--------
1. Accessing the global results, containing simulated costs and emissions,
as well as post processed flow and capacity costs:
>>> import tessif.examples.data.fine.py_hard as coded_fine_examples
>>> import tessif.transform.es2mapping.fine as post_process_fine
>>> resultier = post_process_fine.IntegratedGlobalResultier(
... coded_fine_examples.create_expansion_example())
>>> print(resultier.global_results)
{'emissions (sim)': 20.0, 'costs (sim)': 41.0, 'opex (ppcd)': 40.0, 'capex (ppcd)': 1.0}
"""
def __init__(self, optimized_es, **kwargs):
super().__init__(optimized_es=optimized_es, **kwargs)
def _map_global_results(self, optimized_es):
es = self._re_indexing(optimized_es)
cmp_input = self._cmp_input(es)
load_res = LoadResultier(es)
cap_res = CapacityResultier(es)
flow_res = FlowResultier(es)
ts_results = self._ts_results(es)
# Simulated overall costs
total_costs = float("+inf")
if hasattr(es, "objectiveValue"):
if getattr(es, "objectiveValue") is not None:
total_costs = es.objectiveValue * es.numberOfYears
# ---------- CAPEX - Recalculation -------------
# If the value investPerCapacity is available, the already installed capacities of the component are
# also included in the total costs of the simulation. these must be subsequently calculated out in order
# to ensure comparability.
for node in self.nodes:
if node in ts_results:
initial_capacity = cap_res.node_original_capacity[node]
final_capacity = cap_res.node_installed_capacity[node]
expansion_cost = float(
getattr(cmp_input[node], "investPerCapacity").values
)
# expansion_cost = cap_res.node_expansion_costs[node]
# The so far installed capacity costs need to be deducted if they are used in the fine esm
if getattr(cmp_input[node], "capacityMin") is not None:
if getattr(cmp_input[node], "capacityMin").values != 0.0:
capacityMin = float(
getattr(cmp_input[node], "capacityMin").values
)
initial_cost = capacityMin * expansion_cost
total_costs -= initial_cost
total_emissions = float("+inf")
if hasattr(es, "emission_default"):
emission_default = es.emission_default
if emission_default in ts_results:
total_emissions = sum(ts_results[emission_default])
else:
for limit in cmp_input:
if hasattr(cmp_input[limit], "yearlyLimit"):
if getattr(cmp_input[limit], "yearlyLimit") is not None:
simLimit = cmp_input[limit].yearlyLimit * es.numberOfYears
total_emissions = sum(ts_results[limit])
if simLimit < total_emissions:
print("constrained wrong")
emissions_ppcd = 0.0
for edge in flow_res.edges:
edge_emission_ppcd = (
flow_res.edge_specific_emissions[edge]
* flow_res.edge_net_energy_flow[edge]
)
emissions_ppcd += edge_emission_ppcd
# Outflow based post-processed OPEX results
opex_ppcd = dict()
for edge in self.edges:
opex_ppcd[edge] = float(
flow_res.edge_specific_flow_costs[edge]
* flow_res.edge_net_energy_flow[edge]
)
flow_costs = sum(opex_ppcd.values())
# Capex post-processed
capital_costs = 0.0
for node in self.nodes:
if getattr(cmp_input[node], "investPerCapacity").values != 0:
initial_capacity = pd.Series(cap_res.node_original_capacity[node])
final_capacity = pd.Series(cap_res.node_installed_capacity[node])
# expansion_cost = float(getattr(cmp_input[node], 'investPerCapacity').values)
expansion_cost = cap_res.node_expansion_costs[node]
if not any([cap is None for cap in (final_capacity, initial_capacity)]):
node_expansion_costs = (
final_capacity - initial_capacity
) * expansion_cost
else:
node_expansion_costs = 0.0
if isinstance(initial_capacity, pd.Series):
node_expansion_costs = sum(node_expansion_costs)
# If the actual used capacity in the esM is smaller than the initially one negative expansion costs occur
# to avoid negative capital costs, smaller than zero costs are capped to zero
if node_expansion_costs < 0:
node_expansion_costs = 0.0
capital_costs += node_expansion_costs
return {
"emissions (sim)": round(total_emissions, 0),
"costs (sim)": round(total_costs, 0),
"opex (ppcd)": round(flow_costs, 0),
"capex (ppcd)": round(capital_costs, 0),
}
[docs]class ScaleResultier(FINEResultier, base.ScaleResultier):
"""
Extract number of constraints and store them as int.
Parameters
----------
optimized_es:
:ref:`Model <SupportedModels>` specific, optimized energy system
containing its results.
See also
--------
For functionality documentation see the respective :class:`base class
<tessif.transform.es2mapping.base.ScaleResultier>`.
Examples
--------
1. Call and optimize a FINE energy system model.
>>> # import post fine processing and example modules:
>>> import tessif.examples.data.fine.py_hard as coded_fine_examples
>>> import tessif.transform.es2mapping.fine as post_process_fine
>>> # optimize the energy system:
>>> fine_es = coded_fine_examples.create_mwe()
>>> # post process the capacity results:
>>> resultier = post_process_fine.ScaleResultier(fine_es)
2. Access the number of constraints.
>>> print(resultier.number_of_constraints)
27
"""
def __init__(self, optimized_es, **kwargs):
super().__init__(optimized_es=optimized_es, **kwargs)
def _map_number_of_constraints(self, optimized_es):
"""Interface to extract the number of constraints out of the
:ref:`model <SupportedModels>` specific, optimized energy system.
"""
optimized_es.pyM.compute_statistics()
return optimized_es.pyM.statistics.number_of_constraints
[docs]class LoadResultier(FINEResultier, base.LoadResultier):
"""
Loads2mapping
Transforming flow results into dictionairies keyed by node uid string
representation.
Parameters
----------
optimized_es: fine energy system model
An optimized fine energy system containing its results
See also
--------
For functionality documentation see the respective :class:`base class
<tessif.transform.es2mapping.base.LoadResultier>`.
Examples
--------
1. Accessing a node's outflows as positive numbers and a node's inflows as
negative numbers:
(See :attr:`tessif.transform.es2mapping.base.LoadResultier.node_load`
for more documentation):
>>> import tessif.examples.data.fine.py_hard as coded_fine_examples
>>> import tessif.transform.es2mapping.fine as post_process_fine
>>> resultier = post_process_fine.LoadResultier(
... coded_fine_examples.chp_example())
>>> print(resultier.node_load['CHP'])
CHP Gas Grid Heat Grid Power Line
1990-07-13 00:00:00 -33.333333 6.666667 10.0
1990-07-13 01:00:00 -33.333333 6.666667 10.0
1990-07-13 02:00:00 -33.333333 6.666667 10.0
1990-07-13 03:00:00 -33.333333 6.666667 10.0
2. Accessing a node's inflows as positive numbers
(See :attr:`tessif.transform.es2mapping.base.LoadResultier.node_inflows`
for more documentation):
>>> import tessif.examples.data.fine.py_hard as coded_fine_examples
>>> import tessif.transform.es2mapping.fine as post_process_fine
>>> resultier = post_process_fine.LoadResultier(
... coded_fine_examples.chp_example())
>>> print(resultier.node_inflows['CHP'])
CHP Gas Grid
1990-07-13 00:00:00 33.333333
1990-07-13 01:00:00 33.333333
1990-07-13 02:00:00 33.333333
1990-07-13 03:00:00 33.333333
3. Accessing a node's outflows as positive numbers (See
:attr:`tessif.transform.es2mapping.base.LoadResultier.node_outflows`
for more documentation):
>>> import tessif.examples.data.fine.py_hard as coded_fine_examples
>>> import tessif.transform.es2mapping.fine as post_process_fine
>>> resultier = post_process_fine.LoadResultier(
... coded_fine_examples.chp_example())
>>> print(resultier.node_outflows['CHP'])
CHP Heat Grid Power Line
1990-07-13 00:00:00 6.666667 10.0
1990-07-13 01:00:00 6.666667 10.0
1990-07-13 02:00:00 6.666667 10.0
1990-07-13 03:00:00 6.666667 10.0
4. Accessing a node's summed inflows as positive numbers (in case it is
of component
:attr:`sink <tessif.frused.defaults.registered_component_types>`)
or a node's summed outflows (in case it is not a sink).
(See
:attr:`tessif.transform.es2mapping.base.LoadResultier.node_summed_loads`
for more documentation):
>>> import tessif.examples.data.fine.py_hard as coded_fine_examples
>>> import tessif.transform.es2mapping.fine as post_process_fine
>>> resultier = post_process_fine.LoadResultier(coded_fine_examples.chp_example())
>>> print(resultier.node_summed_loads['CHP'])
1990-07-13 00:00:00 16.666667
1990-07-13 01:00:00 16.666667
1990-07-13 02:00:00 16.666667
1990-07-13 03:00:00 16.666667
Freq: H, dtype: float64
"""
def __init__(self, optimized_es, **kwargs):
super().__init__(optimized_es=optimized_es, **kwargs)
def _map_loads(self, optimized_es):
"""Map loads to node labels"""
time_series_results = self._ts_results(optimized_es)
es = self._re_indexing(optimized_es)
cmp_input = self._cmp_input(es)
# Use defaultdict of empty DataFrame as loads container:
_loads = defaultdict(lambda: pd.DataFrame())
for node in self.nodes:
inflows = pd.DataFrame()
outflows = pd.DataFrame()
for ntype in self.nodes:
if ntype != node:
# Declaring node specific variables
node_dim = getattr(cmp_input[node], "dimension")
if hasattr(cmp_input[node], "commodity"):
# commodity is not part of e.g conversion (multiple commodities)
node_com = getattr(cmp_input[node], "commodity")
else:
node_com = "none"
if hasattr(cmp_input[node], "sign"):
# sign is +1 for Source and -1 for Sink (one way)
node_sign = getattr(cmp_input[node], "sign")
else:
node_sign = 0 # If the cmp has no sign +1/-1 its defined as 0 for error handling
# Declaring ntype specific variables
type_dim = getattr(cmp_input[ntype], "dimension")
if hasattr(cmp_input[ntype], "commodity"):
# commodity is not part of e.g conversion (two commodities)
type_com = getattr(cmp_input[ntype], "commodity")
else:
type_com = "none" # If the cmp has no commodity its defined as none for error handling
if hasattr(cmp_input[ntype], "sign"):
type_sign = getattr(cmp_input[ntype], "sign")
else:
type_sign = 0 # If the cmp has no sign +1/-1 its defined as 0 for error handling
# Actual Mapping starts here:
# Adding Grid Component with all loads included which show in/out
if node_dim == "2dim":
# Defining Node/Type and Target/Load to assign dim, sign and com for both
target, load = node, ntype
target_dim, target_com, target_sign = (
node_dim,
node_com,
node_sign,
)
load_dim, load_com, load_sign = type_dim, type_com, type_sign
# Adding Source and Sinks as loads of the Grid
# Source is inflow
if (
load_com == target_com
and load_dim == "1dim"
and load_sign == 1
):
col_name = [
col
for col in time_series_results.columns
if load in col and len(col) == len(load)
]
inflows = pd.concat(
[inflows, time_series_results[col_name].multiply(-1)],
axis="columns",
)
# Sink is outflow
if (
load_com == target_com
and load_dim == "1dim"
and load_sign == -1
):
col_name = [
col
for col in time_series_results.columns
if load in col and len(col) == len(load)
]
outflows = pd.concat(
[outflows, time_series_results[col_name]],
axis="columns",
)
# Conversion is tricky: Results represent the outflow -> recalculate the inflow from that
if load_com == "none":
col_name = [
col
for col in time_series_results.columns
if load in col and len(col) == len(load)
]
# Feed stands for the results feed from the Conversion-ModelingDict
conv_feed = pd.DataFrame(time_series_results[col_name])
# Next step is determining the inflow and outflow factors for recalculating
commodity_factors = getattr(
cmp_input[load], "commodityConversionFactors"
)
conv_in_com, conv_out_com = _parse_commodity_factors(
commodity_factors
)
# conv_in_com = dict(
# (k, v) for k, v in commodity_factors.items() if v < 0)
# conv_out_com = dict(
# (k, v) for k, v in commodity_factors.items() if v > 0)
# If the target commodity equals the factor name which is identical to the commodity names
# the for-loop declares the inflow for the grid and multiply it with the
# conversion factor (=efficiency)
for factor_out in conv_out_com:
if factor_out == target_com:
conv_factor = conv_out_com[factor_out]
if isinstance(conv_factor, pd.Series):
conv_outflow = inflows.apply(
lambda x: np.asarray(x)
* np.asarray(conv_factor)
)
else:
conv_outflow = conv_feed.multiply(conv_factor)
# conv_outflow = conv_feed.multiply(
# conv_factor)
# From sight of Grid the Transformer is signed as inflow
inflows = pd.concat(
[inflows, conv_outflow.multiply(-1)],
axis="columns",
)
# Adding Connector Outflows to Grid results
if load + ".reverse" in cmp_input:
outflows = pd.concat(
[
outflows,
time_series_results[load + ".reverse"],
],
axis="columns",
)
outflows.rename(
columns={
load
+ ".reverse": load.partition(".")[0]
},
inplace=True,
)
# Same procedure for the inflows of the trfo representing the outflows of the grid component
# no conversion factor is needed due to the efficiency is related to elec. production
for factor_in in conv_in_com:
if factor_in == target_com:
# From sight of grid the Transformer is signed as outflow
outflows = pd.concat(
[outflows, conv_feed], axis="columns"
)
# Adding Connector Inflows to Grid results
if load + ".reverse" in cmp_input:
rev_conv = getattr(
cmp_input[load + ".reverse"],
"commodityConversionFactors",
)
inflows = pd.concat(
[
inflows,
time_series_results[
load + ".reverse"
].multiply(-rev_conv[factor_in]),
],
axis="columns",
)
inflows.rename(
columns={
load
+ ".reverse": load.partition(".")[0]
},
inplace=True,
)
# Adding Storage for both ways in and out grid
if (
target_com == load_com
and target_sign == 0
and load_sign == 0
):
# Charge from Grid to Sto -> Outflow
col_name = [
col
for col in time_series_results.columns
if str(load + " Charge") in col
and len(col) == len(load + " Charge")
]
outflows = pd.concat(
[outflows, time_series_results[col_name]],
axis="columns",
)
outflows.rename(
columns={load + " Charge": load}, inplace=True
)
# Discharge from Sto to Grid -> Inflow
col_name = [
col
for col in time_series_results.columns
if str(load + " Discharge") in col
and len(col) == len(load + " Discharge")
]
inflows = pd.concat(
[inflows, time_series_results[col_name].multiply(-1)],
axis="columns",
)
inflows.rename(
columns={load + " Discharge": load}, inplace=True
)
# Handling generated data and storing it in loads
if not outflows.empty or not inflows.empty:
inflows = inflows.replace(
{float(0): -float(0), float(0): -float(0)}
)
outflows = outflows.replace({-float(0): float(0)})
temp_df = pd.concat([inflows, outflows], axis="columns")
temp_df.columns.name = str(node)
_loads[str(node)] = temp_df
# Adding Source, Sink and Conversion as Target and generating load values
if (
node_dim == "1dim"
): # Indicator for single attached nodes by loads
# Defining Node/Type and Target/Load to assign dim, sign and com for both
target, load = ntype, node
target_dim, target_com, target_sign = (
type_dim,
type_com,
type_sign,
)
load_dim, load_com, load_sign = node_dim, node_com, node_sign
# Target to which the load flows
# Adding Sources and its loads -> inflows
if (
load_com == target_com
and load_sign == 1
and target_dim == "2dim"
):
col_name = [
col
for col in time_series_results.columns
if load in col and len(col) == len(load)
]
outflows = pd.concat(
[outflows, time_series_results[col_name]],
axis="columns",
)
outflows.rename(columns={load: target}, inplace=True)
# Adding Sinks and its loads -> outflow
if (
load_com == target_com
and load_sign == -1
and target_dim == "2dim"
):
col_name = [
col
for col in time_series_results.columns
if load in col and len(col) == len(load)
]
inflows = pd.concat(
[inflows, time_series_results[col_name]], axis="columns"
).multiply(-1)
inflows.rename(columns={load: target}, inplace=True)
# Adding Storages and its loads
if (
load_com == target_com
and load_sign == 0
and target_dim == "2dim"
):
# Charge from Grid to Sto -> Outflow
col_name = [
col
for col in time_series_results.columns
if str(load + " Discharge") in col
and len(col) == len(load + " Discharge")
]
outflows = pd.concat(
[outflows, time_series_results[col_name]],
axis="columns",
)
outflows.rename(
columns={load + " Discharge": target}, inplace=True
)
# Discharge from Sto to Grid -> Inflow
col_name = [
col
for col in time_series_results.columns
if str(load + " Charge") in col
and len(col) == len(load + " Charge")
]
inflows = pd.concat(
[inflows, time_series_results[col_name].multiply(-1)],
axis="columns",
)
inflows.rename(
columns={load + " Charge": target}, inplace=True
)
# Handling generated data and storing it in loads
if not outflows.empty or not inflows.empty:
# inflows = inflows.multiply(-1)
inflows = inflows.replace(
{0: -float(0), float(0): -float(0)}
)
outflows = outflows.replace({-float(0): float(0)})
temp_df = pd.concat([inflows, outflows], axis="columns")
temp_df.columns.name = str(node)
_loads[str(node)] = temp_df
break
# Adding Conversion as target and its loads to/from grid
if node_com == "none":
target, load = node, ntype
target_dim, target_com, target_sign = (
node_dim,
node_com,
node_sign,
)
load_dim, load_com, load_sign = type_dim, type_com, type_sign
if load_dim == "2dim":
# Finding inflow of Transformer representing the outflow from the grid
col_name = [
col
for col in time_series_results.columns
if target in col and len(col) == len(target)
]
inflows = pd.concat(
[inflows, time_series_results[col_name]], axis="columns"
)
# Finding outflow of Transformer and the correct grid commodity
commodity_factors = getattr(
cmp_input[node], "commodityConversionFactors"
)
conv_in, conv_out = _parse_commodity_factors(
commodity_factors
)
# conv_in = dict(
# (k, v) for k, v in commodity_factors.items() if v < 0)
# conv_out = dict(
# (k, v) for k, v in commodity_factors.items() if v > 0)
for factor_out in conv_out:
# if factor_out == load_com:
conv_factor = conv_out[factor_out]
if isinstance(conv_factor, pd.Series):
outflow = inflows.apply(
lambda x: np.asarray(x)
* np.asarray(conv_factor)
)
else:
outflow = inflows.multiply(conv_factor)
# Next step is to define which product flows into which grid and create the outflows
for grid in es.componentModelingDict[
"TransmissionModel"
].componentsDict:
grid_com = getattr(cmp_input[grid], "commodity")
if grid_com == factor_out:
outflow.rename(
columns={target: grid}, inplace=True
)
outflows = pd.concat(
[outflows, outflow], axis="columns"
)
# Renaming: Grid Finder -> search for the correct transmission component
for key in conv_in.keys():
for grid in self.nodes:
grid_dim = getattr(cmp_input[grid], "dimension")
if grid_dim == "2dim":
grid_com = getattr(cmp_input[grid], "commodity")
if grid_com == key:
inflows.rename(
columns={target: grid}, inplace=True
)
# Handling generated data and storing it in loads
if not outflows.empty or not inflows.empty:
inflows = inflows.multiply(-1)
inflows = inflows.replace(
{0: -float(0), float(0): -float(0)}
)
outflows = outflows.replace({-float(0): float(0)})
temp_df = pd.concat([inflows, outflows], axis="columns")
# If emitting Source is added as transformer -> delete "inflow" from unlimitted source
if node in temp_df.columns:
temp_df.drop([node], axis=1, inplace=True)
temp_df.columns.name = str(node)
_loads[str(node)] = temp_df
break
# Connector component is present as two transformers -> combine them:
_loads = _loads
for node in cmp_input:
if ".reverse" in node:
connector = node.partition(".")[0]
commodity_factors = getattr(
cmp_input[connector], "commodityConversionFactors"
)
conv_out = {k: v for k, v in commodity_factors.items() if v > 0}
for conv in conv_out:
# Conversion Factor is related to the reverse grid
grid_rev = conv.partition(".")[0]
value = float(conv_out[conv])
connector_rev = node
commodity_factors_rev = getattr(
cmp_input[connector_rev], "commodityConversionFactors"
)
conv_out_rev = {k: v for k, v in commodity_factors_rev.items() if v > 0}
for conv_rev in conv_out_rev:
# related to the original grid
grid = conv_rev.partition(".")[0]
value_rev = float(conv_out_rev[conv_rev])
# Inflows representing the first connector way 1 -> 2
inflows = pd.concat(
[
time_series_results[connector],
time_series_results[connector_rev],
],
axis="columns",
)
inflows.rename(columns={connector: grid}, inplace=True)
inflows.rename(columns={connector_rev: grid_rev}, inplace=True)
inflows = inflows.multiply(-1)
inflows = inflows.replace({0: -float(0), float(0): -float(0)})
# Outflows representing the second connector way 2 -> 1
outflows = pd.concat(
[
time_series_results[connector_rev] * value_rev,
time_series_results[connector] * value,
],
axis="columns",
)
outflows.rename(columns={connector: grid_rev}, inplace=True)
outflows.rename(columns={connector_rev: grid}, inplace=True)
temp_df = pd.concat([inflows, outflows], axis="columns")
_loads.pop(connector)
# _loads.pop(connector_rev)
_loads[str(connector)] = temp_df
return dict(_loads)
[docs]class CapacityResultier(base.CapacityResultier, LoadResultier):
"""
Capacities2mapping
Transforming installed capacity results dictionairies keyed by node.
Parameters
----------
optimized_es: fine energy system model
An optimized fine energy system containing its results
Examples
--------
1. Accessing the installed capacities and the characteristic values of the
:attr:`minimum working example
<tessif.examples.data.fine.create_mwe>`
(See
:attr:`tessif.transform.es2mapping.base.CapacityResultier.node_installed_capacity`
for more documentation):
>>> # import post fine processing and example modules:
>>> import tessif.examples.data.fine.py_hard as coded_fine_examples
>>> import tessif.transform.es2mapping.fine as post_process_fine
>>> # optimize the energy system:
>>> fine_es = coded_fine_examples.create_mwe()
>>> # post process the capacity results:
>>> resultier = post_process_fine.CapacityResultier(fine_es)
>>> # access the capacity results:
>>> for node, capacity in resultier.node_installed_capacity.items():
... print(f'{node}: {capacity}')
PowerLine: None
CBET: None
Demand: 10.0
Renewable: 10.0
Gas Station: 23.81
Transformer: 10.0
>>> # acces the characteristic value results:
>>> for node, cv in resultier.node_characteristic_value.items():
... if cv is not None:
... cv = round(cv, 2)
... print(f'{node}: {cv}')
PowerLine: None
CBET: None
Demand: 1.0
Renewable: 0.75
Gas Station: 0.25
Transformer: 0.25
2. Accessing the installed capacities and the characteristic value of the
:attr:`Storage Example
<tessif.examples.data.fine.storage_example>`
>>> # import fine postprocessing and example modules:
>>> import tessif.examples.data.fine.py_hard as coded_fine_examples
>>> import tessif.transform.es2mapping.fine as post_process_fine
>>> # optimize the energy system:
>>> fine_es = coded_fine_examples.storage_example()
>>> # post process the capacity results:
>>> resultier = post_process_fine.CapacityResultier(fine_es)
>>> # access the capacity results:
>>> for node, capacity in resultier.node_installed_capacity.items():
... print(f'{node}: {capacity}')
Power Line: None
Demand: 10.0
Generator: 20.0
Storage: 10.0
>>> # acces the characteristic value results:
>>> for node, cv in resultier.node_characteristic_value.items():
... if cv is not None:
... cv = round(cv, 2)
... print(f'{node}: {cv}')
Power Line: None
Demand: 0.94
Generator: 0.47
Storage: 0.34
"""
def __init__(self, optimized_es, **kwargs):
super().__init__(optimized_es=optimized_es, **kwargs)
@property
def node_characteristic_value(self):
return self._characteristic_values
def _map_installed_capacities(self, optimized_es):
"""
Values are continuously stored over. e.g. Renewable: Have a variable operation rate, but a maximum capacity
has been set.
Therefore the op-rate is overstored. If a variable capacity was calculated by fine in order to find an optimum,
the initially defined op-rate is also stored over here. The logical order is:
OperationRateFix (e.g Demand) -> capacityVariableOptimum (e.g Gas Import restricted by Transformer)
-> capacityMax (e.g Renewable set by User and based on fluctuating data).
"""
es = self._re_indexing(optimized_es)
cmp_input = self._cmp_input(es)
_installed_capacities = defaultdict(float)
for mtype in es.componentModelingDict:
for node in es.componentModelingDict[mtype].componentsDict:
if node in self.nodes:
# Make sure all busses have installed capacity of 'variable_capacity'
if getattr(cmp_input[node], "dimension") == "2dim":
value = esn_defaults["variable_capacity"]
else:
# Start with default for every node
value = esn_defaults["installed_capacity"]
# Getting variable capacities from component Modeling dicts as dataframe
if (
getattr(
es.componentModelingDict[mtype], "capacityVariablesOptimum"
)
is not None
):
cap = getattr(
es.componentModelingDict[mtype], "capacityVariablesOptimum"
)
value = esn_defaults["installed_capacity"]
for cmp in cap.index:
if node == cmp:
value = round(float(cap.loc[node]), 3)
# Transformer capacity is yet related to input
if (
mtype == "ConversionDynamicModel"
or mtype == "ConversionModel"
):
cap = value
commodity_factors = getattr(
cmp_input[node], "commodityConversionFactors"
)
# conv_out = dict(
# (k, v) for k, v in commodity_factors.items() if v > 0 and k not in spl.emissions)
conv_in, conv_out = _parse_commodity_factors(
commodity_factors
)
conv_out = {
key: value
for key, value in conv_out.copy().items()
if key not in spl.emissions
}
# distinguish between multiple or single outflows
if len(conv_out) > 1:
value = dict()
for outflow in conv_out:
temp_cap = round(cap * conv_out[outflow], 3)
# Finding Grid:
for grid in cmp_input:
if (
cmp_input[grid].dimension == "2dim"
and cmp_input[grid].commodity == outflow
):
value.update({grid: temp_cap})
value = pd.Series(value)
else:
grid = self.outbounds[node][0]
grid_com = cmp_input[grid].commodity
# check for time varying efficiency transformers:
if isinstance(conv_out[grid_com], pd.Series):
value = round(cap * min(conv_out[grid_com]))
else:
value = round(cap * conv_out[grid_com], 3)
# Sinks do not have a capacity bound, so the installed capacity is taken from the operation rate
if hasattr(cmp_input[node], "operationRateFix"):
if getattr(cmp_input[node], "operationRateFix") is not None:
cap = getattr(cmp_input[node], "operationRateFix")
value = round(float(cap.values.max()), 3)
# Storing capacity
_installed_capacities[node] = value
# Connector support
# Connectors do not have a fixed installed capacity why they are always in the variables
for node in self.nodes:
if ".reverse" in node:
temp_df = dict()
connector_reverse = node
connector = node.partition(".")[0]
for grid in self.outbounds:
if connector in self.outbounds[grid]:
temp_df.update({grid: _installed_capacities[connector]})
if connector_reverse in self.outbounds[grid]:
temp_df.update({grid: _installed_capacities[connector_reverse]})
_installed_capacities.pop(connector)
_installed_capacities.pop(connector_reverse)
_installed_capacities[connector] = temp_df
# Check if the processed capacity is zero and an initial capacity is
# given- set initital as installed
for node in _installed_capacities:
if isinstance(_installed_capacities[node], abc.Iterable):
if all(_installed_capacities[node]) == 0.0:
_installed_capacities[node] = pd.Series(
self._map_original_capacities(optimized_es)[node]
)
else:
if _installed_capacities[node] == 0.0:
_installed_capacities[node] = self._map_original_capacities(
optimized_es
)[node]
return dict(_installed_capacities)
def _map_original_capacities(self, optimized_es):
"""
Mapping of the previously set capacities before the actual optimisation.
Similar to capacity mapping,
but without the fine param: capacityVariablesOptimum.
"""
es = self._re_indexing(optimized_es)
cmp_input = self._cmp_input(es)
ts_results = self._ts_results(es)
_installed_capacities = defaultdict(float)
for node in self.nodes:
cap_min = 0.0
if cmp_input[node].capacityMin is not None:
cap_min = round(
float((getattr(cmp_input[node], "capacityMin")).values), 3
)
elif cmp_input[node].capacityMax is not None:
cap_min = round(
float((getattr(cmp_input[node], "capacityMax")).values), 3
)
if cap_min == 0:
if cmp_input[node].capacityMax is not None:
cap_min = float((getattr(cmp_input[node], "capacityMax")).values)
if hasattr(cmp_input[node], "commodityConversionFactors"):
value = cap_min
commodity_factors = getattr(
cmp_input[node], "commodityConversionFactors"
)
# conv_out = dict((k, v) for k, v in commodity_factors.items(
# ) if v > 0 and k not in spl.emissions)
conv_in, conv_out = _parse_commodity_factors(commodity_factors)
conv_out = {
key: value
for key, value in conv_out.copy().items()
if key not in spl.emissions
}
if len(conv_out) > 1:
value = dict()
for conv in conv_out:
for grid in cmp_input:
if (
cmp_input[grid].dimension == "2dim"
and cmp_input[grid].commodity == conv
):
value.update({grid: (cap_min * conv_out[conv])})
value = pd.Series(value)
else:
grid_com = cmp_input[self.outbounds[node][0]].commodity
# check for time varying efficiency transformers:
if isinstance(conv_out[grid_com], pd.Series):
value = cap_min * min(conv_out[grid_com])
else:
value = cap_min * conv_out[grid_com]
cap_min = value
_installed_capacities[node] = cap_min
# Add the maximum used energy by any sink as installed capacity
if hasattr(cmp_input[node], "sign"):
if getattr(cmp_input[node], "sign") == -1:
_installed_capacities[node] = round(max(ts_results[node]), 3)
# Add maximum from operation rate as installed capacity if the component is fixed
if cmp_input[node].hasCapacityVariable is False:
if hasattr(cmp_input[node], "operationRateFix"):
value = max(ts_results[node])
_installed_capacities[node] = round(value, 3)
# Check if all nodes are existend and set if missing original cap to zero
for node in self.nodes:
if node not in _installed_capacities:
_installed_capacities[node] = esn_defaults["installed_capacity"]
return dict(_installed_capacities)
def _map_expansion_costs(self, optimized_es):
es = self._re_indexing(optimized_es)
cmp_input = self._cmp_input(es)
expansion_costs = dict()
if hasattr(es, "expansion_costs"):
expansion_costs = es.expansion_costs
else:
for node in self.nodes:
if getattr(cmp_input[node], "investPerCapacity").values != 0:
costs = float(getattr(cmp_input[node], "investPerCapacity").values)
else:
costs = esn_defaults["expansion_costs"]
expansion_costs[node] = costs
return expansion_costs
def _map_characteristic_values(self, optimized_es):
"""Map node uid string representation to characteristic value."""
es = self._re_indexing(optimized_es)
cmp_input = self._cmp_input(es)
_characteristic_values = defaultdict(float)
for node in es.uid_dict:
node_dim = getattr(cmp_input[node], "dimension")
if node_dim == "2dim": # Indicator for Grid which is always variable
_characteristic_values[node] = esn_defaults["characteristic_value"]
else:
node_installed_capacities = self.node_installed_capacity[node]
if isinstance(node_installed_capacities, abc.Iterable):
multi_char_val = dict()
for cap in node_installed_capacities.index:
if node_installed_capacities[cap] > 0:
# print(self.node_outflows)
# print(node_installed_capacities[cap])
# print("node:", node)
# print("cap:", cap)
temp_value = (
self.node_outflows[node][cap].mean(axis="index")
/ node_installed_capacities[cap]
)
multi_char_val.update({cap: round(temp_value, 2)})
else:
multi_char_val.update({cap: 0.0})
_characteristic_values[node] = pd.Series(multi_char_val)
else:
if node_installed_capacities != 0:
char_mean = abs(self.node_summed_loads[node].mean(axis="index"))
_characteristic_values[node] = round(
char_mean / node_installed_capacities, 2
)
if _characteristic_values[node] > 1.0:
_characteristic_values[node] = 1.0
else:
_characteristic_values[node] = 0.0
if "StorageModel" in es.componentModelingDict:
if node in es.componentModelingDict["StorageModel"].componentsDict:
if node_installed_capacities != 0:
char_mean = abs(
StorageResultier(es).node_soc[node].mean(axis="index")
)
_characteristic_values[node] = round(
char_mean / node_installed_capacities, 2
)
else:
_characteristic_values[node] = 0.0
return dict(_characteristic_values)
[docs]class StorageResultier(FINEResultier, base.StorageResultier):
r"""Transforming storage results into dictionairies keyed by node.
Parameters
----------
optimized_es: fine energy system
An optimized fine energy system containing its results
See also
--------
For functionality documentation see the respective :class:`base class
<tessif.transform.es2mapping.base.StorageResultier>`.
Examples
--------
1. Display a storage-node's capacity:
Setting :attr:`spellings.get_from's <tessif.frused.spellings.get_from>`
logging level to debug for decluttering doctest output:
>>> from tessif.frused import configurations
>>> configurations.spellings_logging_level = 'debug'
Actual Example:
>>> # import post fine processing and example modules:
>>> import tessif.examples.data.fine.py_hard as coded_fine_examples
>>> import tessif.transform.es2mapping.fine as post_process_fine
>>> # optimize the energy system:
>>> fine_es = coded_fine_examples.storage_example()
>>> # post process the capacity results:
>>> resultier = post_process_fine.StorageResultier(fine_es)
>>> print(post_process_fine.StorageResultier(fine_es).node_soc['Storage'])
1990-07-13 00:00:00 0.0
1990-07-13 01:00:00 0.0
1990-07-13 02:00:00 7.0
1990-07-13 03:00:00 0.0
1990-07-13 04:00:00 10.0
Freq: H, Name: Storage, dtype: float64
"""
def __init__(self, optimized_es, **kwargs):
super().__init__(optimized_es=optimized_es, **kwargs)
def _map_states_of_charge(self, optimized_es):
"""Map storage labels to their states of charge. The present value for the SOC calculated by fine,
describes the pure time sequence of charging and discharging and can therefore not be determined from
the operation variable optimum values. Example can be seen if the Storage example is simulated
--------
Examples
>>> # import post fine processing and example modules:
>>> import tessif.examples.data.fine.py_hard as coded_fine_examples
>>> import tessif.transform.es2mapping.fine as post_process_fine
>>> # optimize the energy system:
>>> fine_es = coded_fine_examples.storage_example()
>>> print(post_process_fine.StorageResultier(fine_es).node_soc['Storage'])
1990-07-13 00:00:00 0.0
1990-07-13 01:00:00 0.0
1990-07-13 02:00:00 7.0
1990-07-13 03:00:00 0.0
1990-07-13 04:00:00 10.0
Freq: H, Name: Storage, dtype: float64
>>> print(post_process_fine.LoadResultier(fine_es).node_load['Storage'])
Storage Power Line Power Line
1990-07-13 00:00:00 -0.0 0.0
1990-07-13 01:00:00 -7.0 0.0
1990-07-13 02:00:00 -0.0 7.0
1990-07-13 03:00:00 -10.0 0.0
1990-07-13 04:00:00 -0.0 10.0
"""
es = self._re_indexing(optimized_es)
_socs = dict()
es_soc = dict()
index_list = list()
for mtype in es.componentModelingDict:
if mtype == "StorageModel":
if hasattr(
es.componentModelingDict[mtype],
"stateOfChargeOperationVariablesOptimum",
):
if (
getattr(
es.componentModelingDict[mtype],
"stateOfChargeOperationVariablesOptimum",
)
is not None
):
es_soc = getattr(
es.componentModelingDict[mtype],
"stateOfChargeOperationVariablesOptimum",
)
# Region Deleter:
for index_double in es_soc.index:
# Cut header to one level (avoid cutting to one letter with if statement)
if len(index_double) == 2:
index_list.append(str(index_double[0]))
else:
index_list.append(str(index_double))
es_soc.index = index_list
for sto_type in es_soc.index:
df = es_soc.loc[sto_type]
_socs[sto_type] = df
for k, v in _socs.items():
_socs[k] = round(v, 4)
return dict(_socs)
[docs]class NodeCategorizer(FINEResultier, base.NodeCategorizer):
"""
Categorizing the nodes of an optimized fine energy system.
Categorization utilizes :attr:`~tessif.frused.namedtuples.Uid`.
Nodes are categorized by:
- Energy :paramref:`component
<tessif.frused.namedtuples.Uid.component>`
(One of the 'Bus', 'Sink', etc..)
- Energy :paramref:`sector <tessif.frused.namedtuples.Uid.sector>`
('power', 'heat', 'mobility', 'coupled')
- :paramref:`Region <tessif.frused.namedtuples.Uid.region>`
('arbitrary label')
- :paramref:`Coordinates <tessif.frused.namedtuples.Uid.latitude>`
(latitude, longitude in degree)
- Energy :paramref:`carrier <tessif.frused.namedtuples.Uid.carrier>`
('solar', 'wind', 'electricity', 'steam' ...)
- :paramref:`Node type <tessif.frused.namedtuples.Uid.node_type>`
('arbitrary label')
Parameters
----------
optimized_es: fine energy system model
An optimized fine energy system containing its results
See also
--------
For functionality documentation see the respective :class:`base class
<tessif.transform.es2mapping.base.NodeCategorizer>`.
Examples
--------
1. Display the energy system component's
:paramref:`Coordinates <tessif.frused.namedtuples.Uid.latitude>`:
>>> import tessif.examples.data.fine.py_hard as fine_examples
>>> from tessif.transform.es2mapping import fine
>>> import pprint
>>> resultier = fine.NodeCategorizer(fine_examples.create_mwe())
>>> pprint.pprint(resultier.node_coordinates)
{'CBET': Coordinates(latitude=53, longitude=10),
'Demand': Coordinates(latitude=53, longitude=10),
'Gas Station': Coordinates(latitude=53, longitude=10),
'PowerLine': Coordinates(latitude=53, longitude=10),
'Renewable': Coordinates(latitude=53, longitude=10),
'Transformer': Coordinates(latitude=53, longitude=10)}
2. Group energy system components by their
:paramref:`~tessif.frused.namedtuples.Uid.region`:
>>> import tessif.examples.data.fine.py_hard as fine_examples
>>> from tessif.transform.es2mapping import fine
>>> import pprint
>>> resultier = fine.NodeCategorizer(fine_examples.create_mwe())
>>> pprint.pprint(resultier.node_region_grouped)
{'Germany': ['PowerLine',
'CBET',
'Demand',
'Renewable',
'Gas Station',
'Transformer']}
3. Group energy system components by their
:paramref:`~tessif.frused.namedtuples.Uid.sector`
>>> import tessif.examples.data.fine.py_hard as fine_examples
>>> from tessif.transform.es2mapping import fine
>>> import pprint
>>> resultier = fine.NodeCategorizer(fine_examples.create_mwe())
>>> pprint.pprint(resultier.node_sector_grouped)
{'Power': ['PowerLine',
'CBET',
'Demand',
'Renewable',
'Gas Station',
'Transformer']}
4. Group energy system components by their
:paramref:`~tessif.frused.namedtuples.Uid.node_type`:
>>> import tessif.examples.data.fine.py_hard as fine_examples
>>> from tessif.transform.es2mapping import fine
>>> import pprint
>>> resultier = fine.NodeCategorizer(fine_examples.create_mwe())
>>> pprint.pprint(resultier.node_type_grouped)
{'AC-bus': ['PowerLine'],
'AC-source': ['Renewable'],
'Gas import': ['Gas Station'],
'Gas-bus': ['CBET'],
'Gas-powerplant': ['Transformer'],
'Sink': ['Demand']}
5. Group energy system components by their energy
:paramref:`~tessif.frused.namedtuples.Uid.carrier`:
>>> import tessif.examples.data.fine.py_hard as fine_examples
>>> from tessif.transform.es2mapping import fine
>>> import pprint
>>> resultier = fine.NodeCategorizer(fine_examples.create_mwe())
>>> pprint.pprint(resultier.node_carrier_grouped)
{'Electricity': ['PowerLine', 'Demand', 'Renewable'],
'Gas': ['CBET', 'Gas Station', 'Transformer']}
6. Map the `node uid representation <Labeling_Concept>` of each component
of the energy system to their energy
:paramref:`~tessif.frused.namedtuples.Uid.carrier` :
>>> import tessif.examples.data.fine.py_hard as fine_examples
>>> from tessif.transform.es2mapping import fine
>>> import pprint
>>> resultier = fine.NodeCategorizer(fine_examples.create_mwe())
>>> pprint.pprint(resultier.node_energy_carriers)
{'CBET': 'gas',
'Demand': 'Electricity',
'Gas Station': 'gas',
'PowerLine': 'Electricity',
'Renewable': 'Electricity',
'Transformer': 'gas'}
7. Map the `node uid representation <Labeling_Concept>` of each component
of the energy system to their
:paramref:`~tessif.frused.namedtuples.Uid.component`:
>>> import tessif.examples.data.fine.py_hard as fine_examples
>>> from tessif.transform.es2mapping import fine
>>> import pprint
>>> resultier = fine.NodeCategorizer(fine_examples.emission_objective())
>>> pprint.pprint(resultier.node_components)
{'Bus': ['Power Line', 'CBET'],
'Sink': ['Demand'],
'Source': ['CBE', 'Renewable'],
'Transformer': ['Transformer']}
"""
def __init__(self, optimized_es, **kwargs):
super().__init__(optimized_es=optimized_es, **kwargs)
def _map_node_components(self, optimized_es):
es = self._re_indexing(optimized_es)
_component_nodes = defaultdict(list)
# Map the respective sectors:
for node in es.uid_dict:
if hasattr(optimized_es.uid_dict[node], "component"):
cmp = getattr(optimized_es.uid_dict[node], "component")
_component_nodes[cmp.lower().capitalize()].append(str(node))
# Node has no component attributed in node
else:
_component_nodes["Unspecified"].append(str(node))
return dict(_component_nodes)
def _map_node_sectors(self, optimized_es):
es = self._re_indexing(optimized_es)
_sectored_nodes = defaultdict(list)
# Map the respective sectors:
for node in es.uid_dict:
if hasattr(optimized_es.uid_dict[node], "sector"):
sec = getattr(optimized_es.uid_dict[node], "sector")
_sectored_nodes[sec].append(str(node))
# Node has no sector attributed in node
else:
_sectored_nodes["Unspecified"].append(str(node))
return dict(_sectored_nodes)
def _map_node_regions(self, optimized_es):
es = self._re_indexing(optimized_es)
_regionalized_nodes = defaultdict(list)
# Map the respective regions:
for node in es.uid_dict:
if hasattr(optimized_es.uid_dict[node], "region"):
cmp = getattr(optimized_es.uid_dict[node], "region")
_regionalized_nodes[cmp.lower().capitalize()].append(str(node))
# Node has no region attributed in node
else:
_regionalized_nodes["Unspecified"].append(str(node))
return dict(_regionalized_nodes)
# noinspection PyTypeChecker
def _map_node_coordinates(self, optimized_es):
es = self._re_indexing(optimized_es)
# Use default dict as lat/lon strings container
_coordinates = defaultdict(list)
# Map the respective latitudes:
for node in es.uid_dict:
if hasattr(optimized_es.uid_dict[node], "latitude") and hasattr(
optimized_es.uid_dict[node], "longitude"
):
lat = getattr(optimized_es.uid_dict[node], "latitude")
lon = getattr(optimized_es.uid_dict[node], "longitude")
_coordinates[str(node)] = nts.Coordinates(lat, lon)
# Node has no lat/lon attributed in node.label
else:
_coordinates["Unspecified"].append(str(node))
return dict(_coordinates)
def _map_node_energy_carriers(self, optimized_es):
es = self._re_indexing(optimized_es)
# Use default dict as carrier strings container
_carrier_grouped_nodes = defaultdict(list)
_node_energy_carriers = defaultdict(str)
# Map the respective carriers:
for node in es.uid_dict:
if hasattr(optimized_es.uid_dict[node], "carrier"):
car = getattr(optimized_es.uid_dict[node], "carrier")
_carrier_grouped_nodes[car.lower().capitalize()].append(str(node))
# Node has no carrier attributed in node
else:
_carrier_grouped_nodes["Unspecified"].append(str(node))
_node_energy_carriers[str(node.label)] = "Unspecified"
return dict(_carrier_grouped_nodes), dict(_node_energy_carriers)
def _map_node_types(self, optimized_es):
es = self._re_indexing(optimized_es)
_typed_nodes = defaultdict(list)
# Map the respective types:
for node in es.uid_dict:
if hasattr(optimized_es.uid_dict[node], "node_type"):
ntype = getattr(optimized_es.uid_dict[node], "node_type")
_typed_nodes[ntype].append(str(node))
# Node has no type attributed in node
else:
_typed_nodes["Unspecified"].append(str(node))
return dict(_typed_nodes)
[docs]class FlowResultier(base.FlowResultier, LoadResultier):
"""
Flows2mapping
Transforming flow results into dictionairies keyed by edges.
Parameters
----------
optimized_es: fine energy system model
An optimized fine energy system containing its results
See also
--------
For functionality documentation see the respective :class:`base class
<tessif.transform.es2mapping.base.FlowResultier>`.
Examples
--------
1. Display the net energy flows of a small energy system:
>>> import tessif.examples.data.fine.py_hard as fine_examples
>>> from tessif.transform.es2mapping import fine
>>> import pprint
>>> resultier = fine.FlowResultier(fine_examples.emission_objective())
>>> pprint.pprint(resultier.edge_net_energy_flow)
{Edge(source='CBE', target='CBET'): 30.0,
Edge(source='CBET', target='Transformer'): 30.0,
Edge(source='Power Line', target='Demand'): 40.0,
Edge(source='Renewable', target='Power Line'): 27.4,
Edge(source='Transformer', target='Power Line'): 12.6}
2. Display the total costs incurred sorted by edge/flow:
>>> import tessif.examples.data.fine.py_hard as fine_examples
>>> from tessif.transform.es2mapping import fine
>>> import pprint
>>> resultier = fine.FlowResultier(fine_examples.emission_objective())
>>> pprint.pprint(resultier.edge_total_costs_incurred)
{Edge(source='CBE', target='CBET'): 0.0,
Edge(source='CBET', target='Transformer'): 0.0,
Edge(source='Power Line', target='Demand'): 0.0,
Edge(source='Renewable', target='Power Line'): 137.0,
Edge(source='Transformer', target='Power Line'): 60.0}
3. Display the total emissions caused sorted by edge/flow:
>>> import tessif.examples.data.fine.py_hard as fine_examples
>>> from tessif.transform.es2mapping import fine
>>> import pprint
>>> resultier = fine.FlowResultier(fine_examples.emission_objective())
>>> pprint.pprint(resultier.edge_total_emissions_caused)
{Edge(source='CBE', target='CBET'): 0.0,
Edge(source='CBET', target='Transformer'): 0.0,
Edge(source='Power Line', target='Demand'): 0.0,
Edge(source='Renewable', target='Power Line'): 0.0,
Edge(source='Transformer', target='Power Line'): 5.292}
4. Display the specific flow costs of this energy system:
>>> import tessif.examples.data.fine.py_hard as fine_examples
>>> from tessif.transform.es2mapping import fine
>>> import pprint
>>> resultier = fine.FlowResultier(fine_examples.emission_objective())
>>> pprint.pprint(resultier.edge_specific_flow_costs)
{Edge(source='CBE', target='CBET'): 0.0,
Edge(source='CBET', target='Transformer'): 0.0,
Edge(source='Power Line', target='Demand'): 0.0,
Edge(source='Renewable', target='Power Line'): 5.0,
Edge(source='Transformer', target='Power Line'): 4.761904761904762}
5. Display the specific emission of this energy system:
>>> import tessif.examples.data.fine.py_hard as fine_examples
>>> from tessif.transform.es2mapping import fine
>>> import pprint
>>> resultier = fine.FlowResultier(fine_examples.emission_objective())
>>> pprint.pprint(resultier.edge_specific_emissions)
{Edge(source='CBE', target='CBET'): 0.0,
Edge(source='CBET', target='Transformer'): 0.0,
Edge(source='Power Line', target='Demand'): 0.0,
Edge(source='Renewable', target='Power Line'): 0.0,
Edge(source='Transformer', target='Power Line'): 0.42}
6. Show the caluclated edge weights of this energy system:
>>> import tessif.examples.data.fine.py_hard as fine_examples
>>> from tessif.transform.es2mapping import fine
>>> import pprint
>>> resultier = fine.FlowResultier(fine_examples.emission_objective())
>>> pprint.pprint(resultier.edge_weight)
{Edge(source='CBE', target='CBET'): 0.1,
Edge(source='CBET', target='Transformer'): 0.1,
Edge(source='Power Line', target='Demand'): 0.1,
Edge(source='Renewable', target='Power Line'): 1.0,
Edge(source='Transformer', target='Power Line'): 0.9523809523809523}
7. Access the reference emissions and net energy flow:
>>> import tessif.examples.data.fine.py_hard as fine_examples
>>> from tessif.transform.es2mapping import fine
>>> resultier = fine.FlowResultier(fine_examples.emission_objective())
>>> print(resultier.edge_reference_emissions)
0.42
>>> print(resultier.edge_reference_net_energy_flow)
40.0
"""
def __init__(self, optimized_es, **kwargs):
super().__init__(optimized_es=optimized_es, **kwargs)
def _map_specific_flow_costs(self, optimized_es):
r"""Energy specific flow costs mapped to edges."""
es = self._re_indexing(optimized_es)
cmp_input = self._cmp_input(es)
_specific_flow_costs = defaultdict(float)
if hasattr(es, "flow_costs"):
# Reshape flow costs and relate them to edges
for flow in es.flow_costs:
for edge in self.edges:
if flow == edge.source:
if isinstance(es.flow_costs[flow], abc.Iterable):
_specific_flow_costs[edge] = es.flow_costs[flow][
edge.target
]
else:
_specific_flow_costs[edge] = es.flow_costs[flow]
if flow == edge.target:
if edge.target in es.sinks:
_specific_flow_costs[edge] = es.flow_costs[flow]
else:
for mtype in es.componentModelingDict:
for node in es.componentModelingDict[mtype].componentsDict:
if getattr(cmp_input[node], "dimension") != "2dim":
if hasattr(cmp_input[node], "opexPerOperation"):
factor = float(
getattr(cmp_input[node], "opexPerOperation").values
)
if hasattr(
es.componentModelingDict[mtype].componentsDict[node],
"commodityConversionFactors",
):
commodity_factors = getattr(
cmp_input[node], "commodityConversionFactors"
)
conv_out = {
k: v
for k, v in commodity_factors.items()
if v > 0 and k not in spl.emissions
}
for grid in self.outbounds[node]:
grid_com = cmp_input[grid].commodity
if grid_com in conv_out:
if len(conv_out) > 1:
_specific_flow_costs[
nts.Edge(node, grid)
] = (
float(factor) / (conv_out[grid_com])
) / len(
conv_out
)
else:
_specific_flow_costs[
nts.Edge(node, grid)
] = (float(factor) / conv_out[grid_com])
else:
for outflow in self.outbounds[node]:
_specific_flow_costs[
nts.Edge(node, outflow)
] = float(factor)
elif hasattr(cmp_input[node], "opexPerDischargeOperation"):
factor = getattr(
cmp_input[node], "opexPerDischargeOperation"
).values
for outflow in self.outbounds[node]:
_specific_flow_costs[nts.Edge(node, outflow)] = float(
factor
)
else:
for edge in self.edges:
_specific_flow_costs[edge] = esn_defaults["flow_costs"]
return dict(_specific_flow_costs)
def _map_specific_emissions(self, optimized_es):
r"""Energy specific emissions mapped to edges."""
es = self._re_indexing(optimized_es)
cmp_input = self._cmp_input(es)
_specific_emissions = dict()
limit_com = None
if hasattr(es, "flow_emissions"):
# Reshape flow emissions and relate them to edges
for emissions in es.flow_emissions:
for edge in self.edges:
if emissions == edge.source:
if isinstance(es.flow_emissions[emissions], abc.Iterable):
_specific_emissions[edge] = es.flow_emissions[emissions][
edge.target
]
else:
_specific_emissions[edge] = es.flow_emissions[emissions]
else:
# Declare limiting commodity (string)
if hasattr(es, "emission_default"):
limit_com = es.emission_default
else:
for limit in cmp_input:
if hasattr(cmp_input[limit], "yearlyLimit"):
if getattr(cmp_input[limit], "yearlyLimit") is not None:
limit_com = cmp_input[limit].commodity
if limit_com:
# Emissions are only stored in conversion components which can be an emitting source as well
for node in self.nodes:
if hasattr(cmp_input[node], "commodityConversionFactors"):
commodity_factors = getattr(
cmp_input[node], "commodityConversionFactors"
)
for outflow in self.node_outflows[node]:
conv = cmp_input[outflow].commodity
_specific_emissions[nts.Edge(node, outflow)] = (
commodity_factors[limit_com] * commodity_factors[conv]
)
for edge in self.edges:
if edge not in _specific_emissions:
_specific_emissions[edge] = esn_defaults["emissions"]
return dict(_specific_emissions)
[docs]class AllResultier(CapacityResultier, FlowResultier, StorageResultier, ScaleResultier):
r"""
Transform energy system results into a dictionary keyed by attribute.
Incorporates all the functionalities from its bases.
Parameters
----------
optimized_es: fine energy system model
An optimized fine energy system containing its results
Note
----
This class allows interfacing with **ALL** framework processing utilities.
It extracts every bit of info the author ever needed in his postprocessing.
It is meant to be a "one fits all" solution for small energy systems.
Perfectly fit for showing "proof of concepts" or debugging energy system
components.
**Not** meant to be used with **large energy systems**.
"""
def __init__(self, optimized_es, **kwargs):
super().__init__(optimized_es=optimized_es, **kwargs)
[docs]class ICRHybridier(FINEResultier, base.ICRHybridier):
"""
Aggregate numerical and visual information for visualizing
the :ref:`Integrated_Component_Results` (ICR).
Parameters
----------
optimized_es: fine energy system model
An optimized fine energy system containing its results
See also
--------
For non :ref:`model <SupportedModels>` specific attributes see
the respective :class:`base class
<tessif.transform.es2mapping.base.ICRHybridier>`.
"""
def __init__(self, optimized_es, colored_by="name", **kwargs):
base.ICRHybridier.__init__(
self,
optimized_es=optimized_es,
node_formatier=NodeFormatier(optimized_es, cgrp=colored_by),
edge_formatier=EdgeFormatier(optimized_es),
mpl_legend_formatier=MplLegendFormatier(optimized_es),
**kwargs
)
@property
def node_characteristic_value(self):
r"""Map node label to characteristic value.
Components of variable size have a characteristic value of ``None``.
The **node fillsize** of :ref:`integrated compnent results graphs
<Integrated_Component_Results>` scales with the
**characteristic value**.
If no capacity is defined (i.e for nodes of variable size, like busses
or excess sources and sinks, node size is set to it's default (
:attr:`nxgrph_visualize_defaults[node_fill_size]
<tessif.frused.defaults.nxgrph_visualize_defaults>`).
"""
return self._caps.node_characteristic_value
def _parse_commodity_factors(commodity_factors):
inflows, outflows = {}, {}
for key, value in commodity_factors.items():
if isinstance(value, abc.Iterable):
if any(entry < 0 for entry in value):
inflows[key] = value
else:
outflows[key] = value
else:
if value < 0:
inflows[key] = value
else:
outflows[key] = value
return inflows, outflows