diff --git a/CHANGELOG.md b/CHANGELOG.md index 4f8f8e128..916aa812b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -258,6 +258,10 @@ flow_system.topology.set_component_colors({'Oranges': ['Solar1', 'Solar2']}) # flow_system.topology.set_component_colors('turbo', overwrite=False) # Only unset colors ``` +#### FlowContainer for Component Flows (#587) + +`Component.inputs`, `Component.outputs`, and `Component.flows` now use `FlowContainer` (dict-like) with dual access by index or label: `inputs[0]` or `inputs['Q_th']`. + ### 💥 Breaking Changes #### tsam v3 Migration @@ -296,6 +300,7 @@ fs.transform.cluster( #### Other Breaking Changes - `FlowSystem.scenario_weights` are now always normalized to sum to 1 when set (including after `.sel()` subsetting) +- `Component.inputs`/`outputs` and `Bus.inputs`/`outputs` are now `FlowContainer` (dict-like). Use `.values()` to iterate flows. ### ♻️ Changed diff --git a/flixopt/components.py b/flixopt/components.py index 20b4b6ae5..bff070d0d 100644 --- a/flixopt/components.py +++ b/flixopt/components.py @@ -811,7 +811,7 @@ class TransmissionModel(ComponentModel): def __init__(self, model: FlowSystemModel, element: Transmission): if (element.absolute_losses is not None) and np.any(element.absolute_losses != 0): - for flow in element.inputs + element.outputs: + for flow in element.flows.values(): if flow.status_parameters is None: flow.status_parameters = StatusParameters() flow.status_parameters.link_to_flow_system( @@ -877,8 +877,8 @@ def _do_modeling(self): # Create conversion factor constraints if specified if self.element.conversion_factors: - all_input_flows = set(self.element.inputs) - all_output_flows = set(self.element.outputs) + all_input_flows = set(self.element.inputs.values()) + all_output_flows = set(self.element.outputs.values()) # für alle linearen Gleichungen: for i, conv_factors in enumerate(self.element.conversion_factors): diff --git a/flixopt/elements.py b/flixopt/elements.py index 791596b28..446ef4bd7 100644 --- a/flixopt/elements.py +++ b/flixopt/elements.py @@ -20,6 +20,7 @@ from .structure import ( Element, ElementModel, + FlowContainer, FlowSystemModel, VariableCategory, register_class_for_io, @@ -89,23 +90,44 @@ class Component(Element): def __init__( self, label: str, - inputs: list[Flow] | None = None, - outputs: list[Flow] | None = None, + inputs: list[Flow] | dict[str, Flow] | None = None, + outputs: list[Flow] | dict[str, Flow] | None = None, status_parameters: StatusParameters | None = None, prevent_simultaneous_flows: list[Flow] | None = None, meta_data: dict | None = None, color: str | None = None, ): super().__init__(label, meta_data=meta_data, color=color) - self.inputs: list[Flow] = inputs or [] - self.outputs: list[Flow] = outputs or [] self.status_parameters = status_parameters self.prevent_simultaneous_flows: list[Flow] = prevent_simultaneous_flows or [] - self._check_unique_flow_labels() - self._connect_flows() + # Convert dict to list (for deserialization compatibility) + # FlowContainers serialize as dicts, but constructor expects lists + if isinstance(inputs, dict): + inputs = list(inputs.values()) + if isinstance(outputs, dict): + outputs = list(outputs.values()) + + # Use temporary lists, connect flows first (sets component name on flows), + # then create FlowContainers (which use label_full as key) + _inputs = inputs or [] + _outputs = outputs or [] + self._check_unique_flow_labels(_inputs, _outputs) + self._connect_flows(_inputs, _outputs) - self.flows: dict[str, Flow] = {flow.label: flow for flow in self.inputs + self.outputs} + # Create FlowContainers after connecting (so label_full is correct) + self.inputs: FlowContainer = FlowContainer(_inputs, element_type_name='inputs') + self.outputs: FlowContainer = FlowContainer(_outputs, element_type_name='outputs') + + @functools.cached_property + def flows(self) -> FlowContainer: + """All flows (inputs and outputs) as a FlowContainer. + + Supports access by label_full or short label: + component.flows['Boiler(Q_th)'] # Full label + component.flows['Q_th'] # Short label + """ + return self.inputs + self.outputs def create_model(self, model: FlowSystemModel) -> ComponentModel: self._plausibility_checks() @@ -120,18 +142,29 @@ def link_to_flow_system(self, flow_system, prefix: str = '') -> None: super().link_to_flow_system(flow_system, self.label_full) if self.status_parameters is not None: self.status_parameters.link_to_flow_system(flow_system, self._sub_prefix('status_parameters')) - for flow in self.inputs + self.outputs: + for flow in self.flows.values(): flow.link_to_flow_system(flow_system) def transform_data(self) -> None: if self.status_parameters is not None: self.status_parameters.transform_data() - for flow in self.inputs + self.outputs: + for flow in self.flows.values(): flow.transform_data() - def _check_unique_flow_labels(self): - all_flow_labels = [flow.label for flow in self.inputs + self.outputs] + def _check_unique_flow_labels(self, inputs: list[Flow] = None, outputs: list[Flow] = None): + """Check that all flow labels within a component are unique. + + Args: + inputs: List of input flows (optional, defaults to self.inputs) + outputs: List of output flows (optional, defaults to self.outputs) + """ + if inputs is None: + inputs = list(self.inputs.values()) + if outputs is None: + outputs = list(self.outputs.values()) + + all_flow_labels = [flow.label for flow in inputs + outputs] if len(set(all_flow_labels)) != len(all_flow_labels): duplicates = {label for label in all_flow_labels if all_flow_labels.count(label) > 1} @@ -143,7 +176,7 @@ def _plausibility_checks(self) -> None: # Component with status_parameters requires all flows to have sizes set # (status_parameters are propagated to flows in _do_modeling, which need sizes for big-M constraints) if self.status_parameters is not None: - flows_without_size = [flow.label for flow in self.inputs + self.outputs if flow.size is None] + flows_without_size = [flow.label for flow in self.flows.values() if flow.size is None] if flows_without_size: raise PlausibilityError( f'Component "{self.label_full}" has status_parameters, but the following flows have no size: ' @@ -151,9 +184,20 @@ def _plausibility_checks(self) -> None: f'(required for big-M constraints).' ) - def _connect_flows(self): + def _connect_flows(self, inputs: list[Flow] = None, outputs: list[Flow] = None): + """Connect flows to this component by setting component name and direction. + + Args: + inputs: List of input flows (optional, defaults to self.inputs) + outputs: List of output flows (optional, defaults to self.outputs) + """ + if inputs is None: + inputs = list(self.inputs.values()) + if outputs is None: + outputs = list(self.outputs.values()) + # Inputs - for flow in self.inputs: + for flow in inputs: if flow.component not in ('UnknownComponent', self.label_full): raise ValueError( f'Flow "{flow.label}" already assigned to component "{flow.component}". ' @@ -162,7 +206,7 @@ def _connect_flows(self): flow.component = self.label_full flow.is_input_in_component = True # Outputs - for flow in self.outputs: + for flow in outputs: if flow.component not in ('UnknownComponent', self.label_full): raise ValueError( f'Flow "{flow.label}" already assigned to component "{flow.component}". ' @@ -178,7 +222,7 @@ def _connect_flows(self): self.prevent_simultaneous_flows = [ f for f in self.prevent_simultaneous_flows if id(f) not in seen and not seen.add(id(f)) ] - local = set(self.inputs + self.outputs) + local = set(inputs + outputs) foreign = [f for f in self.prevent_simultaneous_flows if f not in local] if foreign: names = ', '.join(f.label_full for f in foreign) @@ -275,8 +319,13 @@ def __init__( self._validate_kwargs(kwargs) self.carrier = carrier.lower() if carrier else None # Store as lowercase string self.imbalance_penalty_per_flow_hour = imbalance_penalty_per_flow_hour - self.inputs: list[Flow] = [] - self.outputs: list[Flow] = [] + self.inputs: FlowContainer = FlowContainer(element_type_name='inputs') + self.outputs: FlowContainer = FlowContainer(element_type_name='outputs') + + @property + def flows(self) -> FlowContainer: + """All flows (inputs and outputs) as a FlowContainer.""" + return self.inputs + self.outputs def create_model(self, model: FlowSystemModel) -> BusModel: self._plausibility_checks() @@ -289,7 +338,7 @@ def link_to_flow_system(self, flow_system, prefix: str = '') -> None: Elements use their label_full as prefix by default, ignoring the passed prefix. """ super().link_to_flow_system(flow_system, self.label_full) - for flow in self.inputs + self.outputs: + for flow in self.flows.values(): flow.link_to_flow_system(flow_system) def transform_data(self) -> None: @@ -959,10 +1008,10 @@ def _do_modeling(self): """Create variables, constraints, and nested submodels""" super()._do_modeling() # inputs == outputs - for flow in self.element.inputs + self.element.outputs: + for flow in self.element.flows.values(): self.register_variable(flow.submodel.flow_rate, flow.label_full) - inputs = sum([flow.submodel.flow_rate for flow in self.element.inputs]) - outputs = sum([flow.submodel.flow_rate for flow in self.element.outputs]) + inputs = sum([flow.submodel.flow_rate for flow in self.element.inputs.values()]) + outputs = sum([flow.submodel.flow_rate for flow in self.element.outputs.values()]) eq_bus_balance = self.add_constraints(inputs == outputs, short_name='balance') # Add virtual supply/demand to balance and penalty if needed @@ -997,8 +1046,8 @@ def _do_modeling(self): ) def results_structure(self): - inputs = [flow.submodel.flow_rate.name for flow in self.element.inputs] - outputs = [flow.submodel.flow_rate.name for flow in self.element.outputs] + inputs = [flow.submodel.flow_rate.name for flow in self.element.inputs.values()] + outputs = [flow.submodel.flow_rate.name for flow in self.element.outputs.values()] if self.virtual_supply is not None: inputs.append(self.virtual_supply.name) if self.virtual_demand is not None: @@ -1007,7 +1056,7 @@ def results_structure(self): **super().results_structure(), 'inputs': inputs, 'outputs': outputs, - 'flows': [flow.label_full for flow in self.element.inputs + self.element.outputs], + 'flows': [flow.label_full for flow in self.element.flows.values()], } @@ -1022,7 +1071,7 @@ def _do_modeling(self): """Create variables, constraints, and nested submodels""" super()._do_modeling() - all_flows = self.element.inputs + self.element.outputs + all_flows = list(self.element.flows.values()) # Set status_parameters on flows if needed if self.element.status_parameters: @@ -1087,9 +1136,9 @@ def _do_modeling(self): def results_structure(self): return { **super().results_structure(), - 'inputs': [flow.submodel.flow_rate.name for flow in self.element.inputs], - 'outputs': [flow.submodel.flow_rate.name for flow in self.element.outputs], - 'flows': [flow.label_full for flow in self.element.inputs + self.element.outputs], + 'inputs': [flow.submodel.flow_rate.name for flow in self.element.inputs.values()], + 'outputs': [flow.submodel.flow_rate.name for flow in self.element.outputs.values()], + 'flows': [flow.label_full for flow in self.element.flows.values()], } @property @@ -1098,7 +1147,7 @@ def previous_status(self) -> xr.DataArray | None: if self.element.status_parameters is None: raise ValueError(f'StatusModel not present in \n{self}\nCant access previous_status') - previous_status = [flow.submodel.status._previous_status for flow in self.element.inputs + self.element.outputs] + previous_status = [flow.submodel.status._previous_status for flow in self.element.flows.values()] previous_status = [da for da in previous_status if da is not None] if not previous_status: # Empty list diff --git a/flixopt/flow_system.py b/flixopt/flow_system.py index 5f8ddf1a2..13e97401b 100644 --- a/flixopt/flow_system.py +++ b/flixopt/flow_system.py @@ -968,7 +968,7 @@ def from_old_dataset(cls, path: str | pathlib.Path) -> FlowSystem: # Now previous_flow_rate=None means relaxed (no constraint at t=0) for comp in flow_system.components.values(): if getattr(comp, 'status_parameters', None) is not None: - for flow in comp.inputs + comp.outputs: + for flow in comp.flows.values(): if flow.previous_flow_rate is None: flow.previous_flow_rate = 0 @@ -1912,9 +1912,9 @@ def _add_buses(self, *buses: Bus): def _connect_network(self): """Connects the network of components and buses. Can be rerun without changes if no elements were added""" for component in self.components.values(): - for flow in component.inputs + component.outputs: + for flow in component.flows.values(): flow.component = component.label_full - flow.is_input_in_component = True if flow in component.inputs else False + flow.is_input_in_component = flow.label_full in component.inputs # Connect Buses bus = self.buses.get(flow.bus) @@ -1923,10 +1923,10 @@ def _connect_network(self): f'Bus {flow.bus} not found in the FlowSystem, but used by "{flow.label_full}". ' f'Please add it first.' ) - if flow.is_input_in_component and flow not in bus.outputs: - bus.outputs.append(flow) - elif not flow.is_input_in_component and flow not in bus.inputs: - bus.inputs.append(flow) + if flow.is_input_in_component and flow.label_full not in bus.outputs: + bus.outputs.add(flow) + elif not flow.is_input_in_component and flow.label_full not in bus.inputs: + bus.inputs.add(flow) # Count flows manually to avoid triggering cache rebuild flow_count = sum(len(c.inputs) + len(c.outputs) for c in self.components.values()) @@ -2010,7 +2010,7 @@ def _get_container_groups(self) -> dict[str, ElementContainer]: @property def flows(self) -> ElementContainer[Flow]: if self._flows_cache is None: - flows = [f for c in self.components.values() for f in c.inputs + c.outputs] + flows = [f for c in self.components.values() for f in c.flows.values()] # Deduplicate by id and sort for reproducibility flows = sorted({id(f): f for f in flows}.values(), key=lambda f: f.label_full.lower()) self._flows_cache = ElementContainer(flows, element_type_name='flows', truncate_repr=10) diff --git a/flixopt/io.py b/flixopt/io.py index caa063b83..33599f1c4 100644 --- a/flixopt/io.py +++ b/flixopt/io.py @@ -1392,12 +1392,12 @@ def format_flow_details(obj: Any, has_inputs: bool = True, has_outputs: bool = T if has_inputs and hasattr(obj, 'inputs') and obj.inputs: flow_lines.append(' inputs:') - for flow in obj.inputs: + for flow in obj.inputs.values(): flow_lines.append(f' * {repr(flow)}') if has_outputs and hasattr(obj, 'outputs') and obj.outputs: flow_lines.append(' outputs:') - for flow in obj.outputs: + for flow in obj.outputs.values(): flow_lines.append(f' * {repr(flow)}') return '\n' + '\n'.join(flow_lines) if flow_lines else '' diff --git a/flixopt/statistics_accessor.py b/flixopt/statistics_accessor.py index 55259b0ba..ac527bc6a 100644 --- a/flixopt/statistics_accessor.py +++ b/flixopt/statistics_accessor.py @@ -779,7 +779,7 @@ def get_effect_shares( if element not in self._fs.components: raise ValueError(f'Only use Components when retrieving Effects including flows. Got {element}') comp = self._fs.components[element] - flows = [f.label_full.split('|')[0] for f in comp.inputs + comp.outputs] + flows = [flow.split('|')[0] for flow in comp.flows] return xr.merge( [ds] + [ @@ -1509,8 +1509,8 @@ def balance( else: raise KeyError(f"'{node}' not found in buses or components") - input_labels = [f.label_full for f in element.inputs] - output_labels = [f.label_full for f in element.outputs] + input_labels = [f.label_full for f in element.inputs.values()] + output_labels = [f.label_full for f in element.outputs.values()] all_labels = input_labels + output_labels filtered_labels = _filter_by_pattern(all_labels, include, exclude) @@ -1617,9 +1617,9 @@ def carrier_balance( output_labels: list[str] = [] # Outputs from buses = consumption for bus in carrier_buses: - for flow in bus.inputs: + for flow in bus.inputs.values(): input_labels.append(flow.label_full) - for flow in bus.outputs: + for flow in bus.outputs.values(): output_labels.append(flow.label_full) all_labels = input_labels + output_labels @@ -2230,8 +2230,8 @@ def storage( raise ValueError(f"'{storage}' is not a storage (no charge_state variable found)") # Get flow data - input_labels = [f.label_full for f in component.inputs] - output_labels = [f.label_full for f in component.outputs] + input_labels = [f.label_full for f in component.inputs.values()] + output_labels = [f.label_full for f in component.outputs.values()] all_labels = input_labels + output_labels if unit == 'flow_rate': diff --git a/flixopt/structure.py b/flixopt/structure.py index 6d4040419..cc7a5fa72 100644 --- a/flixopt/structure.py +++ b/flixopt/structure.py @@ -1508,6 +1508,25 @@ def __repr__(self) -> str: """Return a string representation using the instance's truncate_repr setting.""" return self._get_repr() + def __add__(self, other: ContainerMixin[T]) -> ContainerMixin[T]: + """Concatenate two containers. + + Returns a new container of the same type containing elements from both containers. + Does not modify the original containers. + + Args: + other: Another container to concatenate + + Returns: + New container with elements from both containers + """ + result = self.__class__(element_type_name=self._element_type_name) + for element in self.values(): + result.add(element) + for element in other.values(): + result.add(element) + return result + class ElementContainer(ContainerMixin[T]): """ @@ -1533,6 +1552,95 @@ def _get_label(self, element: T) -> str: return element.label +class FlowContainer(ContainerMixin[T]): + """Container for Flow objects with dual access: by index or by label_full. + + Supports: + - container['Boiler(Q_th)'] # label_full-based access + - container['Q_th'] # short-label access (when all flows share same component) + - container[0] # index-based access + - container.add(flow) + - for flow in container.values() + - container1 + container2 # concatenation + + Examples: + >>> boiler = Boiler(label='Boiler', inputs=[Flow('Q_th', bus=heat_bus)]) + >>> boiler.inputs[0] # Index access + >>> boiler.inputs['Boiler(Q_th)'] # Full label access + >>> boiler.inputs['Q_th'] # Short label access (same component) + >>> for flow in boiler.inputs.values(): + ... print(flow.label_full) + """ + + def _get_label(self, flow: T) -> str: + """Extract label_full from Flow.""" + return flow.label_full + + def __getitem__(self, key: str | int) -> T: + """Get flow by label_full, short label, or index. + + Args: + key: Flow's label_full (string), short label (string), or index (int). + Short label access (e.g., 'Q_th' instead of 'Boiler(Q_th)') is only + supported when all flows in the container belong to the same component. + + Returns: + The Flow at the given key/index + + Raises: + KeyError: If string key not found + IndexError: If integer index out of range + """ + if isinstance(key, int): + # Index-based access: convert to list and index + try: + return list(self.values())[key] + except IndexError: + raise IndexError(f'Flow index {key} out of range (container has {len(self)} flows)') from None + + # Try exact label_full match first + if dict.__contains__(self, key): + return super().__getitem__(key) + + # Try short-label match if all flows share the same component + if len(self) > 0: + components = {flow.component for flow in self.values()} + if len(components) == 1: + component = next(iter(components)) + full_key = f'{component}({key})' + if dict.__contains__(self, full_key): + return super().__getitem__(full_key) + + # Key not found - raise with helpful message + raise KeyError(f"'{key}' not found in {self._element_type_name}") + + def __contains__(self, key: object) -> bool: + """Check if key exists (supports label_full or short label). + + Args: + key: Flow's label_full or short label + + Returns: + True if the key matches a flow in the container + """ + if not isinstance(key, str): + return False + + # Try exact label_full match first + if dict.__contains__(self, key): + return True + + # Try short-label match if all flows share the same component + if len(self) > 0: + components = {flow.component for flow in self.values()} + if len(components) == 1: + component = next(iter(components)) + full_key = f'{component}({key})' + return dict.__contains__(self, full_key) + + return False + + T_element = TypeVar('T_element') diff --git a/tests/deprecated/test_integration.py b/tests/deprecated/test_integration.py index 8ec23265e..e49c977bc 100644 --- a/tests/deprecated/test_integration.py +++ b/tests/deprecated/test_integration.py @@ -231,7 +231,7 @@ def test_piecewise_conversion(self, flow_system_piecewise_conversion, highs_solv [0, 0, 0, 45, 0, 0, 0, 0, 0], 'Kessel doesnt match expected value', ) - kwk_flows = {flow.label: flow for flow in comps['KWK'].inputs + comps['KWK'].outputs} + kwk_flows = {flow.label: flow for flow in (comps['KWK'].inputs + comps['KWK'].outputs).values()} assert_almost_equal_numeric( kwk_flows['Q_th'].submodel.flow_rate.solution.values, [45.0, 45.0, 64.5962087, 100.0, 61.3136, 45.0, 45.0, 12.86469565, 0.0],