Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,10 @@ flow_system.topology.set_component_colors({'Oranges': ['Solar1', 'Solar2']}) #
flow_system.topology.set_component_colors('turbo', overwrite=False) # Only unset colors
```

#### FlowContainer for Component Flows (#587)

`Component.inputs`, `Component.outputs`, and `Component.flows` now use `FlowContainer` (dict-like) with dual access by index or label: `inputs[0]` or `inputs['Q_th']`.

### 💥 Breaking Changes

#### tsam v3 Migration
Expand Down Expand Up @@ -296,6 +300,7 @@ fs.transform.cluster(
#### Other Breaking Changes

- `FlowSystem.scenario_weights` are now always normalized to sum to 1 when set (including after `.sel()` subsetting)
- `Component.inputs`/`outputs` and `Bus.inputs`/`outputs` are now `FlowContainer` (dict-like). Use `.values()` to iterate flows.

### ♻️ Changed

Expand Down
6 changes: 3 additions & 3 deletions flixopt/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -811,7 +811,7 @@ class TransmissionModel(ComponentModel):

def __init__(self, model: FlowSystemModel, element: Transmission):
if (element.absolute_losses is not None) and np.any(element.absolute_losses != 0):
for flow in element.inputs + element.outputs:
for flow in element.flows.values():
if flow.status_parameters is None:
flow.status_parameters = StatusParameters()
flow.status_parameters.link_to_flow_system(
Expand Down Expand Up @@ -877,8 +877,8 @@ def _do_modeling(self):

# Create conversion factor constraints if specified
if self.element.conversion_factors:
all_input_flows = set(self.element.inputs)
all_output_flows = set(self.element.outputs)
all_input_flows = set(self.element.inputs.values())
all_output_flows = set(self.element.outputs.values())

# für alle linearen Gleichungen:
for i, conv_factors in enumerate(self.element.conversion_factors):
Expand Down
109 changes: 79 additions & 30 deletions flixopt/elements.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from .structure import (
Element,
ElementModel,
FlowContainer,
FlowSystemModel,
VariableCategory,
register_class_for_io,
Expand Down Expand Up @@ -89,23 +90,44 @@ class Component(Element):
def __init__(
self,
label: str,
inputs: list[Flow] | None = None,
outputs: list[Flow] | None = None,
inputs: list[Flow] | dict[str, Flow] | None = None,
outputs: list[Flow] | dict[str, Flow] | None = None,
status_parameters: StatusParameters | None = None,
prevent_simultaneous_flows: list[Flow] | None = None,
meta_data: dict | None = None,
color: str | None = None,
):
super().__init__(label, meta_data=meta_data, color=color)
self.inputs: list[Flow] = inputs or []
self.outputs: list[Flow] = outputs or []
self.status_parameters = status_parameters
self.prevent_simultaneous_flows: list[Flow] = prevent_simultaneous_flows or []

self._check_unique_flow_labels()
self._connect_flows()
# Convert dict to list (for deserialization compatibility)
# FlowContainers serialize as dicts, but constructor expects lists
if isinstance(inputs, dict):
inputs = list(inputs.values())
if isinstance(outputs, dict):
outputs = list(outputs.values())

# Use temporary lists, connect flows first (sets component name on flows),
# then create FlowContainers (which use label_full as key)
_inputs = inputs or []
_outputs = outputs or []
self._check_unique_flow_labels(_inputs, _outputs)
self._connect_flows(_inputs, _outputs)

self.flows: dict[str, Flow] = {flow.label: flow for flow in self.inputs + self.outputs}
# Create FlowContainers after connecting (so label_full is correct)
self.inputs: FlowContainer = FlowContainer(_inputs, element_type_name='inputs')
self.outputs: FlowContainer = FlowContainer(_outputs, element_type_name='outputs')

@functools.cached_property
def flows(self) -> FlowContainer:
"""All flows (inputs and outputs) as a FlowContainer.

Supports access by label_full or short label:
component.flows['Boiler(Q_th)'] # Full label
component.flows['Q_th'] # Short label
"""
return self.inputs + self.outputs

def create_model(self, model: FlowSystemModel) -> ComponentModel:
self._plausibility_checks()
Expand All @@ -120,18 +142,29 @@ def link_to_flow_system(self, flow_system, prefix: str = '') -> None:
super().link_to_flow_system(flow_system, self.label_full)
if self.status_parameters is not None:
self.status_parameters.link_to_flow_system(flow_system, self._sub_prefix('status_parameters'))
for flow in self.inputs + self.outputs:
for flow in self.flows.values():
flow.link_to_flow_system(flow_system)

def transform_data(self) -> None:
if self.status_parameters is not None:
self.status_parameters.transform_data()

for flow in self.inputs + self.outputs:
for flow in self.flows.values():
flow.transform_data()

def _check_unique_flow_labels(self):
all_flow_labels = [flow.label for flow in self.inputs + self.outputs]
def _check_unique_flow_labels(self, inputs: list[Flow] = None, outputs: list[Flow] = None):
"""Check that all flow labels within a component are unique.

Args:
inputs: List of input flows (optional, defaults to self.inputs)
outputs: List of output flows (optional, defaults to self.outputs)
"""
if inputs is None:
inputs = list(self.inputs.values())
if outputs is None:
outputs = list(self.outputs.values())

all_flow_labels = [flow.label for flow in inputs + outputs]

if len(set(all_flow_labels)) != len(all_flow_labels):
duplicates = {label for label in all_flow_labels if all_flow_labels.count(label) > 1}
Expand All @@ -143,17 +176,28 @@ def _plausibility_checks(self) -> None:
# Component with status_parameters requires all flows to have sizes set
# (status_parameters are propagated to flows in _do_modeling, which need sizes for big-M constraints)
if self.status_parameters is not None:
flows_without_size = [flow.label for flow in self.inputs + self.outputs if flow.size is None]
flows_without_size = [flow.label for flow in self.flows.values() if flow.size is None]
if flows_without_size:
raise PlausibilityError(
f'Component "{self.label_full}" has status_parameters, but the following flows have no size: '
f'{flows_without_size}. All flows need explicit sizes when the component uses status_parameters '
f'(required for big-M constraints).'
)

def _connect_flows(self):
def _connect_flows(self, inputs: list[Flow] = None, outputs: list[Flow] = None):
"""Connect flows to this component by setting component name and direction.

Args:
inputs: List of input flows (optional, defaults to self.inputs)
outputs: List of output flows (optional, defaults to self.outputs)
"""
if inputs is None:
inputs = list(self.inputs.values())
if outputs is None:
outputs = list(self.outputs.values())

# Inputs
for flow in self.inputs:
for flow in inputs:
if flow.component not in ('UnknownComponent', self.label_full):
raise ValueError(
f'Flow "{flow.label}" already assigned to component "{flow.component}". '
Expand All @@ -162,7 +206,7 @@ def _connect_flows(self):
flow.component = self.label_full
flow.is_input_in_component = True
# Outputs
for flow in self.outputs:
for flow in outputs:
if flow.component not in ('UnknownComponent', self.label_full):
raise ValueError(
f'Flow "{flow.label}" already assigned to component "{flow.component}". '
Expand All @@ -178,7 +222,7 @@ def _connect_flows(self):
self.prevent_simultaneous_flows = [
f for f in self.prevent_simultaneous_flows if id(f) not in seen and not seen.add(id(f))
]
local = set(self.inputs + self.outputs)
local = set(inputs + outputs)
foreign = [f for f in self.prevent_simultaneous_flows if f not in local]
if foreign:
names = ', '.join(f.label_full for f in foreign)
Expand Down Expand Up @@ -275,8 +319,13 @@ def __init__(
self._validate_kwargs(kwargs)
self.carrier = carrier.lower() if carrier else None # Store as lowercase string
self.imbalance_penalty_per_flow_hour = imbalance_penalty_per_flow_hour
self.inputs: list[Flow] = []
self.outputs: list[Flow] = []
self.inputs: FlowContainer = FlowContainer(element_type_name='inputs')
self.outputs: FlowContainer = FlowContainer(element_type_name='outputs')

@property
def flows(self) -> FlowContainer:
"""All flows (inputs and outputs) as a FlowContainer."""
return self.inputs + self.outputs

def create_model(self, model: FlowSystemModel) -> BusModel:
self._plausibility_checks()
Expand All @@ -289,7 +338,7 @@ def link_to_flow_system(self, flow_system, prefix: str = '') -> None:
Elements use their label_full as prefix by default, ignoring the passed prefix.
"""
super().link_to_flow_system(flow_system, self.label_full)
for flow in self.inputs + self.outputs:
for flow in self.flows.values():
flow.link_to_flow_system(flow_system)

def transform_data(self) -> None:
Expand Down Expand Up @@ -959,10 +1008,10 @@ def _do_modeling(self):
"""Create variables, constraints, and nested submodels"""
super()._do_modeling()
# inputs == outputs
for flow in self.element.inputs + self.element.outputs:
for flow in self.element.flows.values():
self.register_variable(flow.submodel.flow_rate, flow.label_full)
inputs = sum([flow.submodel.flow_rate for flow in self.element.inputs])
outputs = sum([flow.submodel.flow_rate for flow in self.element.outputs])
inputs = sum([flow.submodel.flow_rate for flow in self.element.inputs.values()])
outputs = sum([flow.submodel.flow_rate for flow in self.element.outputs.values()])
eq_bus_balance = self.add_constraints(inputs == outputs, short_name='balance')

# Add virtual supply/demand to balance and penalty if needed
Expand Down Expand Up @@ -997,8 +1046,8 @@ def _do_modeling(self):
)

def results_structure(self):
inputs = [flow.submodel.flow_rate.name for flow in self.element.inputs]
outputs = [flow.submodel.flow_rate.name for flow in self.element.outputs]
inputs = [flow.submodel.flow_rate.name for flow in self.element.inputs.values()]
outputs = [flow.submodel.flow_rate.name for flow in self.element.outputs.values()]
if self.virtual_supply is not None:
inputs.append(self.virtual_supply.name)
if self.virtual_demand is not None:
Expand All @@ -1007,7 +1056,7 @@ def results_structure(self):
**super().results_structure(),
'inputs': inputs,
'outputs': outputs,
'flows': [flow.label_full for flow in self.element.inputs + self.element.outputs],
'flows': [flow.label_full for flow in self.element.flows.values()],
}


Expand All @@ -1022,7 +1071,7 @@ def _do_modeling(self):
"""Create variables, constraints, and nested submodels"""
super()._do_modeling()

all_flows = self.element.inputs + self.element.outputs
all_flows = list(self.element.flows.values())

# Set status_parameters on flows if needed
if self.element.status_parameters:
Expand Down Expand Up @@ -1087,9 +1136,9 @@ def _do_modeling(self):
def results_structure(self):
return {
**super().results_structure(),
'inputs': [flow.submodel.flow_rate.name for flow in self.element.inputs],
'outputs': [flow.submodel.flow_rate.name for flow in self.element.outputs],
'flows': [flow.label_full for flow in self.element.inputs + self.element.outputs],
'inputs': [flow.submodel.flow_rate.name for flow in self.element.inputs.values()],
'outputs': [flow.submodel.flow_rate.name for flow in self.element.outputs.values()],
'flows': [flow.label_full for flow in self.element.flows.values()],
}

@property
Expand All @@ -1098,7 +1147,7 @@ def previous_status(self) -> xr.DataArray | None:
if self.element.status_parameters is None:
raise ValueError(f'StatusModel not present in \n{self}\nCant access previous_status')

previous_status = [flow.submodel.status._previous_status for flow in self.element.inputs + self.element.outputs]
previous_status = [flow.submodel.status._previous_status for flow in self.element.flows.values()]
previous_status = [da for da in previous_status if da is not None]

if not previous_status: # Empty list
Expand Down
16 changes: 8 additions & 8 deletions flixopt/flow_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -968,7 +968,7 @@ def from_old_dataset(cls, path: str | pathlib.Path) -> FlowSystem:
# Now previous_flow_rate=None means relaxed (no constraint at t=0)
for comp in flow_system.components.values():
if getattr(comp, 'status_parameters', None) is not None:
for flow in comp.inputs + comp.outputs:
for flow in comp.flows.values():
if flow.previous_flow_rate is None:
flow.previous_flow_rate = 0

Expand Down Expand Up @@ -1912,9 +1912,9 @@ def _add_buses(self, *buses: Bus):
def _connect_network(self):
"""Connects the network of components and buses. Can be rerun without changes if no elements were added"""
for component in self.components.values():
for flow in component.inputs + component.outputs:
for flow in component.flows.values():
flow.component = component.label_full
flow.is_input_in_component = True if flow in component.inputs else False
flow.is_input_in_component = flow.label_full in component.inputs

# Connect Buses
bus = self.buses.get(flow.bus)
Expand All @@ -1923,10 +1923,10 @@ def _connect_network(self):
f'Bus {flow.bus} not found in the FlowSystem, but used by "{flow.label_full}". '
f'Please add it first.'
)
if flow.is_input_in_component and flow not in bus.outputs:
bus.outputs.append(flow)
elif not flow.is_input_in_component and flow not in bus.inputs:
bus.inputs.append(flow)
if flow.is_input_in_component and flow.label_full not in bus.outputs:
bus.outputs.add(flow)
elif not flow.is_input_in_component and flow.label_full not in bus.inputs:
bus.inputs.add(flow)

# Count flows manually to avoid triggering cache rebuild
flow_count = sum(len(c.inputs) + len(c.outputs) for c in self.components.values())
Expand Down Expand Up @@ -2010,7 +2010,7 @@ def _get_container_groups(self) -> dict[str, ElementContainer]:
@property
def flows(self) -> ElementContainer[Flow]:
if self._flows_cache is None:
flows = [f for c in self.components.values() for f in c.inputs + c.outputs]
flows = [f for c in self.components.values() for f in c.flows.values()]
# Deduplicate by id and sort for reproducibility
flows = sorted({id(f): f for f in flows}.values(), key=lambda f: f.label_full.lower())
self._flows_cache = ElementContainer(flows, element_type_name='flows', truncate_repr=10)
Expand Down
4 changes: 2 additions & 2 deletions flixopt/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -1392,12 +1392,12 @@ def format_flow_details(obj: Any, has_inputs: bool = True, has_outputs: bool = T

if has_inputs and hasattr(obj, 'inputs') and obj.inputs:
flow_lines.append(' inputs:')
for flow in obj.inputs:
for flow in obj.inputs.values():
flow_lines.append(f' * {repr(flow)}')

if has_outputs and hasattr(obj, 'outputs') and obj.outputs:
flow_lines.append(' outputs:')
for flow in obj.outputs:
for flow in obj.outputs.values():
flow_lines.append(f' * {repr(flow)}')

return '\n' + '\n'.join(flow_lines) if flow_lines else ''
Expand Down
14 changes: 7 additions & 7 deletions flixopt/statistics_accessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -779,7 +779,7 @@ def get_effect_shares(
if element not in self._fs.components:
raise ValueError(f'Only use Components when retrieving Effects including flows. Got {element}')
comp = self._fs.components[element]
flows = [f.label_full.split('|')[0] for f in comp.inputs + comp.outputs]
flows = [flow.split('|')[0] for flow in comp.flows]
return xr.merge(
[ds]
+ [
Expand Down Expand Up @@ -1509,8 +1509,8 @@ def balance(
else:
raise KeyError(f"'{node}' not found in buses or components")

input_labels = [f.label_full for f in element.inputs]
output_labels = [f.label_full for f in element.outputs]
input_labels = [f.label_full for f in element.inputs.values()]
output_labels = [f.label_full for f in element.outputs.values()]
all_labels = input_labels + output_labels

filtered_labels = _filter_by_pattern(all_labels, include, exclude)
Expand Down Expand Up @@ -1617,9 +1617,9 @@ def carrier_balance(
output_labels: list[str] = [] # Outputs from buses = consumption

for bus in carrier_buses:
for flow in bus.inputs:
for flow in bus.inputs.values():
input_labels.append(flow.label_full)
for flow in bus.outputs:
for flow in bus.outputs.values():
output_labels.append(flow.label_full)

all_labels = input_labels + output_labels
Expand Down Expand Up @@ -2230,8 +2230,8 @@ def storage(
raise ValueError(f"'{storage}' is not a storage (no charge_state variable found)")

# Get flow data
input_labels = [f.label_full for f in component.inputs]
output_labels = [f.label_full for f in component.outputs]
input_labels = [f.label_full for f in component.inputs.values()]
output_labels = [f.label_full for f in component.outputs.values()]
all_labels = input_labels + output_labels

if unit == 'flow_rate':
Expand Down
Loading