diff --git a/CHANGELOG.md b/CHANGELOG.md
index 68aaa8a60..2f5414adb 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -59,12 +59,66 @@ If upgrading from v2.x, see the [v3.0.0 release notes](https://github.com/flixOp

 ### 💥 Breaking Changes

+**Class and module renaming:**
+- `FullCalculation` → `Optimization`
+- `AggregatedCalculation` → `ClusteredOptimization`
+- `SegmentedCalculation` → `SegmentedOptimization`
+- `CalculationResults` → `Results`
+- `SegmentedCalculationResults` → `SegmentedResults`
+- `Aggregation` → `Clustering`
+- `AggregationParameters` → `ClusteringParameters`
+- `AggregationModel` → `ClusteringModel`
+- Module: `calculation.py` → `optimization.py`
+- Module: `aggregation.py` → `clustering.py`
+
+Old names remain available with deprecation warnings and will be removed in v5.0.0.
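+
+A minimal migration sketch (illustrative; `flow_system` stands in for an existing `FlowSystem` instance):
+
+```python
+import flixopt as fx
+
+# Before (v4.x name, still importable but emits a DeprecationWarning):
+calc = fx.FullCalculation('my_calc', flow_system)
+
+# After (new name, same constructor arguments):
+opt = fx.Optimization('my_calc', flow_system)
+```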
+

 ### ♻️ Changed

 ### 🗑️ Deprecated

 ### 🔥 Removed

+**Deprecated parameters removed** (all were deprecated in v4.0.0 or earlier):
+
+**TimeSeriesData:**
+- Constructor parameters: `agg_group` → use `aggregation_group`, `agg_weight` → use `aggregation_weight`
+- Properties: `agg_group`, `agg_weight`
+
+**Effect:**
+- Constructor parameters: `minimum_operation` → use `minimum_temporal`, `maximum_operation` → use `maximum_temporal`, `minimum_invest` → use `minimum_periodic`, `maximum_invest` → use `maximum_periodic`, `minimum_operation_per_hour` → use `minimum_per_hour`, `maximum_operation_per_hour` → use `maximum_per_hour`
+- Properties: `minimum_operation`, `maximum_operation`, `minimum_invest`, `maximum_invest`, `minimum_operation_per_hour`, `maximum_operation_per_hour`, `minimum_total_per_period`, `maximum_total_per_period`
+
+**Flow:**
+- Constructor parameters: `flow_hours_per_period_max` → use `flow_hours_max`, `flow_hours_per_period_min` → use `flow_hours_min`, `flow_hours_total_max` → use `flow_hours_max`, `flow_hours_total_min` → use `flow_hours_min`, `total_flow_hours_max` → use `flow_hours_max_over_periods`, `total_flow_hours_min` → use `flow_hours_min_over_periods`
+- Properties: `flow_hours_total_max`, `flow_hours_total_min`
+
+**InvestParameters:**
+- Constructor parameters: `fix_effects` → use `effects_of_investment`, `specific_effects` → use `effects_of_investment_per_size`, `divest_effects` → use `effects_of_retirement`, `piecewise_effects` → use `piecewise_effects_of_investment`, `optional` → use `mandatory` (with inverted logic; see the example below)
+- Properties: `optional`, `fix_effects`, `specific_effects`, `divest_effects`, `piecewise_effects`
+
+**OnOffParameters:**
+- Constructor parameters: `on_hours_total_min` → use `on_hours_min`, `on_hours_total_max` → use `on_hours_max`, `switch_on_total_max` → use `switch_on_max`
+
+**Storage:**
+- `initial_charge_state="lastValueOfSim"` → use `initial_charge_state="equals_final"`
+
+**Source, Sink, SourceAndSink:**
+- Constructor parameters:
+  - Source: `source` → use `outputs`
+  - Sink: `sink` → use `inputs`
+  - SourceAndSink: `source` → use `outputs`, `sink` → use `inputs`, `prevent_simultaneous_sink_and_source` → use `prevent_simultaneous_flow_rates`
+- Properties:
+  - Source: `source` property
+  - Sink: `sink` property
+  - SourceAndSink: `source`, `sink`, `prevent_simultaneous_sink_and_source` properties
+
+**Linear Converters** (Boiler, CHP, HeatPump, etc.):
+- Flow parameters: `Q_fu` → use `fuel_flow`, `P_el` → use `electrical_flow`, `Q_th` → use `thermal_flow`, `Q_ab` → use `heat_source_flow`
+- Efficiency parameters: `eta` → use `thermal_efficiency`, `eta_th` → use `thermal_efficiency`, `eta_el` → use `electrical_efficiency`, `COP` → use `cop`
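+
+Of the removals above, the `optional` → `mandatory` rename on `InvestParameters` also inverts the meaning; a sketch with illustrative values:
+
+```python
+import flixopt as fx
+
+# v4.x keywords (now removed; this call would raise a TypeError):
+# invest = fx.InvestParameters(fixed_size=100, optional=True, fix_effects=500, specific_effects=10)
+
+# Replacement (optional=True becomes mandatory=False):
+invest = fx.InvestParameters(
+    fixed_size=100, mandatory=False, effects_of_investment=500, effects_of_investment_per_size=10
+)
+```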
+

 ### 🐛 Fixed

 ### 🔒 Security

diff --git a/flixopt/__init__.py b/flixopt/__init__.py
index a55a57b3f..e7d314017 100644
--- a/flixopt/__init__.py
+++ b/flixopt/__init__.py
@@ -14,8 +14,10 @@

 # Import commonly used classes and functions
 from . import linear_converters, plotting, results, solvers
-from .aggregation import AggregationParameters
+
+# Import old Calculation classes for backwards compatibility (deprecated)
 from .calculation import AggregatedCalculation, FullCalculation, SegmentedCalculation
+from .clustering import AggregationParameters, ClusteringParameters  # AggregationParameters is deprecated
 from .components import (
     LinearConverter,
     Sink,
@@ -31,6 +33,9 @@
 from .flow_system import FlowSystem
 from .interface import InvestParameters, OnOffParameters, Piece, Piecewise, PiecewiseConversion, PiecewiseEffects

+# Import new Optimization classes
+from .optimization import ClusteredOptimization, Optimization, SegmentedOptimization
+
 __all__ = [
     'TimeSeriesData',
     'CONFIG',
@@ -45,16 +50,22 @@
     'LinearConverter',
     'Transmission',
     'FlowSystem',
+    # New Optimization classes (preferred)
+    'Optimization',
+    'ClusteredOptimization',
+    'SegmentedOptimization',
+    # Old Calculation classes (deprecated, for backwards compatibility)
     'FullCalculation',
-    'SegmentedCalculation',
     'AggregatedCalculation',
+    'SegmentedCalculation',
     'InvestParameters',
     'OnOffParameters',
     'Piece',
     'Piecewise',
     'PiecewiseConversion',
     'PiecewiseEffects',
-    'AggregationParameters',
+    'ClusteringParameters',
+    'AggregationParameters',  # Deprecated, use ClusteringParameters
     'plotting',
     'results',
     'linear_converters',
diff --git a/flixopt/calculation.py b/flixopt/calculation.py
index ee6742c22..0552af825 100644
--- a/flixopt/calculation.py
+++ b/flixopt/calculation.py
@@ -1,49 +1,59 @@
 """
-This module contains the Calculation functionality for the flixopt framework.
-It is used to calculate a FlowSystemModel for a given FlowSystem through a solver.
-There are three different Calculation types:
-    1. FullCalculation: Calculates the FlowSystemModel for the full FlowSystem
-    2. AggregatedCalculation: Calculates the FlowSystemModel for the full FlowSystem, but aggregates the TimeSeriesData.
-       This simplifies the mathematical model and usually speeds up the solving process.
-    3. SegmentedCalculation: Solves a FlowSystemModel for each individual Segment of the FlowSystem.
+This module provides backwards-compatible aliases for the renamed Optimization classes.
+
+DEPRECATED: This module is deprecated. Use the optimization module instead.
+The following classes have been renamed:
+    - Calculation -> Optimization
+    - FullCalculation -> Optimization (now the standard, no "Full" prefix)
+    - AggregatedCalculation -> ClusteredOptimization
+    - SegmentedCalculation -> SegmentedOptimization
+
+Import from flixopt.optimization or use the new names from flixopt directly.
 """

 from __future__ import annotations

 import logging
-import math
-import pathlib
-import sys
-import timeit
 import warnings
-from collections import Counter
-from typing import TYPE_CHECKING, Annotated, Any
-
-import numpy as np
-from tqdm import tqdm
-
-from . import io as fx_io
-from .aggregation import Aggregation, AggregationModel, AggregationParameters
-from .components import Storage
-from .config import CONFIG
-from .core import DataConverter, TimeSeriesData, drop_constant_arrays
-from .features import InvestmentModel
-from .flow_system import FlowSystem
-from .results import CalculationResults, SegmentedCalculationResults
+from typing import TYPE_CHECKING
+
+from .core import DEPRECATION_REMOVAL_VERSION
+from .optimization import (
+    ClusteredOptimization as _ClusteredOptimization,
+)
+from .optimization import (
+    Optimization as _Optimization,
+)
+from .optimization import (
+    SegmentedOptimization as _SegmentedOptimization,
+)

 if TYPE_CHECKING:
+    import pathlib
+    from typing import Annotated
+
     import pandas as pd
-    import xarray as xr

+    from .clustering import AggregationParameters
     from .elements import Component
-    from .solvers import _Solver
-    from .structure import FlowSystemModel
+    from .flow_system import FlowSystem

 logger = logging.getLogger('flixopt')


-class Calculation:
+def _deprecation_warning(old_name: str, new_name: str):
+    """Issue a deprecation warning for renamed classes."""
+    warnings.warn(
+        f'{old_name} is deprecated and will be removed in v{DEPRECATION_REMOVAL_VERSION}. Use {new_name} instead.',
+        DeprecationWarning,
+        stacklevel=3,
+    )
+
+
+class Calculation(_Optimization):
     """
+    DEPRECATED: Use Optimization instead.
+
     class for defined way of solving a flow_system optimization

     Args:
@@ -54,8 +64,6 @@ class for defined way of solving a flow_system optimization
         active_timesteps: Deprecated. Use FlowSystem.sel(time=...) or FlowSystem.isel(time=...) instead.
     """

-    model: FlowSystemModel | None
-
     def __init__(
         self,
         name: str,
@@ -67,114 +75,14 @@ def __init__(
         folder: pathlib.Path | None = None,
         normalize_weights: bool = True,
     ):
-        self.name = name
-        if flow_system.used_in_calculation:
-            logger.warning(
-                f'This FlowSystem is already used in a calculation:\n{flow_system}\n'
-                f'Creating a copy of the FlowSystem for Calculation "{self.name}".'
-            )
-            flow_system = flow_system.copy()
-
-        if active_timesteps is not None:
-            warnings.warn(
-                "The 'active_timesteps' parameter is deprecated and will be removed in a future version. 
" - 'Use flow_system.sel(time=timesteps) or flow_system.isel(time=indices) before passing ' - 'the FlowSystem to the Calculation instead.', - DeprecationWarning, - stacklevel=2, - ) - flow_system = flow_system.sel(time=active_timesteps) - self._active_timesteps = active_timesteps # deprecated - self.normalize_weights = normalize_weights - - flow_system._used_in_calculation = True - - self.flow_system = flow_system - self.model = None - - self.durations = {'modeling': 0.0, 'solving': 0.0, 'saving': 0.0} - self.folder = pathlib.Path.cwd() / 'results' if folder is None else pathlib.Path(folder) - self.results: CalculationResults | None = None - - if self.folder.exists() and not self.folder.is_dir(): - raise NotADirectoryError(f'Path {self.folder} exists and is not a directory.') - self.folder.mkdir(parents=False, exist_ok=True) - - @property - def main_results(self) -> dict[str, int | float | dict]: - from flixopt.features import InvestmentModel - - main_results = { - 'Objective': self.model.objective.value, - 'Penalty': self.model.effects.penalty.total.solution.values, - 'Effects': { - f'{effect.label} [{effect.unit}]': { - 'temporal': effect.submodel.temporal.total.solution.values, - 'periodic': effect.submodel.periodic.total.solution.values, - 'total': effect.submodel.total.solution.values, - } - for effect in sorted(self.flow_system.effects.values(), key=lambda e: e.label_full.upper()) - }, - 'Invest-Decisions': { - 'Invested': { - model.label_of_element: model.size.solution - for component in self.flow_system.components.values() - for model in component.submodel.all_submodels - if isinstance(model, InvestmentModel) and model.size.solution.max() >= CONFIG.Modeling.epsilon - }, - 'Not invested': { - model.label_of_element: model.size.solution - for component in self.flow_system.components.values() - for model in component.submodel.all_submodels - if isinstance(model, InvestmentModel) and model.size.solution.max() < CONFIG.Modeling.epsilon - }, - }, - 'Buses with excess': [ - { - bus.label_full: { - 'input': bus.submodel.excess_input.solution.sum('time'), - 'output': bus.submodel.excess_output.solution.sum('time'), - } - } - for bus in self.flow_system.buses.values() - if bus.with_excess - and ( - bus.submodel.excess_input.solution.sum() > 1e-3 or bus.submodel.excess_output.solution.sum() > 1e-3 - ) - ], - } - - return fx_io.round_nested_floats(main_results) - - @property - def summary(self): - return { - 'Name': self.name, - 'Number of timesteps': len(self.flow_system.timesteps), - 'Calculation Type': self.__class__.__name__, - 'Constraints': self.model.constraints.ncons, - 'Variables': self.model.variables.nvars, - 'Main Results': self.main_results, - 'Durations': self.durations, - 'Config': CONFIG.to_dict(), - } - - @property - def active_timesteps(self) -> pd.DatetimeIndex: - warnings.warn( - 'active_timesteps is deprecated. Use flow_system.sel(time=...) or flow_system.isel(time=...) instead.', - DeprecationWarning, - stacklevel=2, - ) - return self._active_timesteps - - @property - def modeled(self) -> bool: - return True if self.model is not None else False - - -class FullCalculation(Calculation): + _deprecation_warning('Calculation', 'Optimization') + super().__init__(name, flow_system, active_timesteps, folder, normalize_weights) + + +class FullCalculation(_Optimization): """ + DEPRECATED: Use Optimization instead (the "Full" prefix has been removed). + FullCalculation solves the complete optimization problem using all time steps. 
This is the most comprehensive calculation type that considers every time step @@ -188,97 +96,30 @@ class FullCalculation(Calculation): active_timesteps: Deprecated. Use FlowSystem.sel(time=...) or FlowSystem.isel(time=...) instead. """ - def do_modeling(self) -> FullCalculation: - t_start = timeit.default_timer() - self.flow_system.connect_and_transform() - - self.model = self.flow_system.create_model(self.normalize_weights) - self.model.do_modeling() - - self.durations['modeling'] = round(timeit.default_timer() - t_start, 2) - return self - - def fix_sizes(self, ds: xr.Dataset, decimal_rounding: int | None = 5) -> FullCalculation: - """Fix the sizes of the calculations to specified values. - - Args: - ds: The dataset that contains the variable names mapped to their sizes. If None, the dataset is loaded from the results. - decimal_rounding: The number of decimal places to round the sizes to. If no rounding is applied, numerical errors might lead to infeasibility. - """ - if not self.modeled: - raise RuntimeError('Model was not created. Call do_modeling() first.') - if decimal_rounding is not None: - ds = ds.round(decimal_rounding) - - for name, da in ds.data_vars.items(): - if '|size' not in name: - continue - if name not in self.model.variables: - logger.debug(f'Variable {name} not found in calculation model. Skipping.') - continue - - con = self.model.add_constraints( - self.model[name] == da, - name=f'{name}-fixed', - ) - logger.debug(f'Fixed "{name}":\n{con}') - - return self - - def solve( - self, solver: _Solver, log_file: pathlib.Path | None = None, log_main_results: bool | None = None - ) -> FullCalculation: - # Auto-call do_modeling() if not already done - if not self.modeled: - logger.info('Model not yet created. Calling do_modeling() automatically.') - self.do_modeling() - - t_start = timeit.default_timer() - - self.model.solve( - log_fn=pathlib.Path(log_file) if log_file is not None else self.folder / f'{self.name}.log', - solver_name=solver.name, - **solver.options, - ) - self.durations['solving'] = round(timeit.default_timer() - t_start, 2) - logger.success(f'Model solved with {solver.name} in {self.durations["solving"]:.2f} seconds.') - logger.info(f'Model status after solve: {self.model.status}') - - if self.model.status == 'warning': - # Save the model and the flow_system to file in case of infeasibility - paths = fx_io.CalculationResultsPaths(self.folder, self.name) - from .io import document_linopy_model - - document_linopy_model(self.model, paths.model_documentation) - self.flow_system.to_netcdf(paths.flow_system) - raise RuntimeError( - f'Model was infeasible. Please check {paths.model_documentation=} and {paths.flow_system=} for more information.' - ) - - # Log the formatted output - should_log = log_main_results if log_main_results is not None else CONFIG.Solving.log_main_results - if should_log and logger.isEnabledFor(logging.INFO): - logger.info( - f'{" Main Results ":#^80}\n' + fx_io.format_yaml_string(self.main_results, compact_numeric_lists=True) - ) - - self.results = CalculationResults.from_calculation(self) - - return self - - -class AggregatedCalculation(FullCalculation): + def __init__( + self, + name: str, + flow_system: FlowSystem, + active_timesteps: Annotated[ + pd.DatetimeIndex | None, + 'DEPRECATED: Use flow_system.sel(time=...) or flow_system.isel(time=...) 
instead', + ] = None, + folder: pathlib.Path | None = None, + normalize_weights: bool = True, + ): + _deprecation_warning('FullCalculation', 'Optimization') + super().__init__(name, flow_system, active_timesteps, folder, normalize_weights) + + +class AggregatedCalculation(_ClusteredOptimization): """ + DEPRECATED: Use ClusteredOptimization instead. + AggregatedCalculation reduces computational complexity by clustering time series into typical periods. This calculation approach aggregates time series data using clustering techniques (tsam) to identify representative time periods, significantly reducing computation time while maintaining solution accuracy. - Note: - The quality of the solution depends on the choice of aggregation parameters. - The optimal parameters depend on the specific problem and the characteristics of the time series data. - For more information, refer to the [tsam documentation](https://tsam.readthedocs.io/en/latest/). - Args: name: Name of the calculation flow_system: FlowSystem to be optimized @@ -287,10 +128,6 @@ class AggregatedCalculation(FullCalculation): This equalizes variables in the components according to the typical periods computed in the aggregation active_timesteps: DatetimeIndex of timesteps to use for calculation. If None, all timesteps are used folder: Folder where results should be saved. If None, current working directory is used - - Attributes: - aggregation (Aggregation | None): Contains the clustered time series data - aggregation_model (AggregationModel | None): Contains Variables and Constraints that equalize clusters of the time series data """ def __init__( @@ -305,218 +142,23 @@ def __init__( ] = None, folder: pathlib.Path | None = None, ): - if flow_system.scenarios is not None: - raise ValueError('Aggregation is not supported for scenarios yet. Please use FullCalculation instead.') - super().__init__(name, flow_system, active_timesteps, folder=folder) - self.aggregation_parameters = aggregation_parameters - self.components_to_clusterize = components_to_clusterize - self.aggregation: Aggregation | None = None - self.aggregation_model: AggregationModel | None = None - - def do_modeling(self) -> AggregatedCalculation: - t_start = timeit.default_timer() - self.flow_system.connect_and_transform() - self._perform_aggregation() - - # Model the System - self.model = self.flow_system.create_model(self.normalize_weights) - self.model.do_modeling() - # Add Aggregation Submodel after modeling the rest - self.aggregation_model = AggregationModel( - self.model, self.aggregation_parameters, self.flow_system, self.aggregation, self.components_to_clusterize - ) - self.aggregation_model.do_modeling() - self.durations['modeling'] = round(timeit.default_timer() - t_start, 2) - return self - - def _perform_aggregation(self): - from .aggregation import Aggregation - - t_start_agg = timeit.default_timer() - - # Validation - dt_min = float(self.flow_system.hours_per_timestep.min().item()) - dt_max = float(self.flow_system.hours_per_timestep.max().item()) - if not dt_min == dt_max: - raise ValueError( - f'Aggregation failed due to inconsistent time step sizes:' - f'delta_t varies from {dt_min} to {dt_max} hours.' - ) - ratio = self.aggregation_parameters.hours_per_period / dt_max - if not np.isclose(ratio, round(ratio), atol=1e-9): - raise ValueError( - f'The selected {self.aggregation_parameters.hours_per_period=} does not match the time ' - f'step size of {dt_max} hours. It must be an integer multiple of {dt_max} hours.' 
- ) - - logger.info(f'{"":#^80}') - logger.info(f'{" Aggregating TimeSeries Data ":#^80}') - - ds = self.flow_system.to_dataset() - - temporaly_changing_ds = drop_constant_arrays(ds, dim='time') - - # Aggregation - creation of aggregated timeseries: - self.aggregation = Aggregation( - original_data=temporaly_changing_ds.to_dataframe(), - hours_per_time_step=float(dt_min), - hours_per_period=self.aggregation_parameters.hours_per_period, - nr_of_periods=self.aggregation_parameters.nr_of_periods, - weights=self.calculate_aggregation_weights(temporaly_changing_ds), - time_series_for_high_peaks=self.aggregation_parameters.labels_for_high_peaks, - time_series_for_low_peaks=self.aggregation_parameters.labels_for_low_peaks, - ) - - self.aggregation.cluster() - self.aggregation.plot(show=CONFIG.Plotting.default_show, save=self.folder / 'aggregation.html') - if self.aggregation_parameters.aggregate_data_and_fix_non_binary_vars: - ds = self.flow_system.to_dataset() - for name, series in self.aggregation.aggregated_data.items(): - da = ( - DataConverter.to_dataarray(series, self.flow_system.coords) - .rename(name) - .assign_attrs(ds[name].attrs) - ) - if TimeSeriesData.is_timeseries_data(da): - da = TimeSeriesData.from_dataarray(da) - - ds[name] = da - - self.flow_system = FlowSystem.from_dataset(ds) - self.flow_system.connect_and_transform() - self.durations['aggregation'] = round(timeit.default_timer() - t_start_agg, 2) - - @classmethod - def calculate_aggregation_weights(cls, ds: xr.Dataset) -> dict[str, float]: - """Calculate weights for all datavars in the dataset. Weights are pulled from the attrs of the datavars.""" - - groups = [da.attrs['aggregation_group'] for da in ds.data_vars.values() if 'aggregation_group' in da.attrs] - group_counts = Counter(groups) - - # Calculate weight for each group (1/count) - group_weights = {group: 1 / count for group, count in group_counts.items()} - - weights = {} - for name, da in ds.data_vars.items(): - group_weight = group_weights.get(da.attrs.get('aggregation_group')) - if group_weight is not None: - weights[name] = group_weight - else: - weights[name] = da.attrs.get('aggregation_weight', 1) - - if np.all(np.isclose(list(weights.values()), 1, atol=1e-6)): - logger.info('All Aggregation weights were set to 1') - - return weights - - -class SegmentedCalculation(Calculation): - """Solve large optimization problems by dividing time horizon into (overlapping) segments. - - This class addresses memory and computational limitations of large-scale optimization - problems by decomposing the time horizon into smaller overlapping segments that are - solved sequentially. Each segment uses final values from the previous segment as - initial conditions, ensuring dynamic continuity across the solution. - - Key Concepts: - **Temporal Decomposition**: Divides long time horizons into manageable segments - **Overlapping Windows**: Segments share timesteps to improve storage dynamics - **Value Transfer**: Final states of one segment become initial states of the next - **Sequential Solving**: Each segment solved independently but with coupling - - Limitations and Constraints: - **Investment Parameters**: InvestParameters are not supported in segmented calculations - as investment decisions must be made for the entire time horizon, not per segment. - - **Global Constraints**: Time-horizon-wide constraints (flow_hours_total_min/max, - load_factor_min/max) may produce suboptimal results as they cannot be enforced - globally across segments. 
- - **Storage Dynamics**: While overlap helps, storage optimization may be suboptimal - compared to full-horizon solutions due to limited foresight in each segment. + _deprecation_warning('AggregatedCalculation', 'ClusteredOptimization') + super().__init__(name, flow_system, aggregation_parameters, components_to_clusterize, active_timesteps, folder) + + +class SegmentedCalculation(_SegmentedOptimization): + """ + DEPRECATED: Use SegmentedOptimization instead. + + Solve large optimization problems by dividing time horizon into (overlapping) segments. Args: name: Unique identifier for the calculation, used in result files and logging. flow_system: The FlowSystem to optimize, containing all components, flows, and buses. timesteps_per_segment: Number of timesteps in each segment (excluding overlap). - Must be > 2 to avoid internal side effects. Larger values provide better - optimization at the cost of memory and computation time. overlap_timesteps: Number of additional timesteps added to each segment. - Improves storage optimization by providing lookahead. Higher values - improve solution quality but increase computational cost. - nr_of_previous_values: Number of previous timestep values to transfer between - segments for initialization. Typically 1 is sufficient. + nr_of_previous_values: Number of previous timestep values to transfer between segments for initialization. folder: Directory for saving results. Defaults to current working directory + 'results'. - - Examples: - Annual optimization with monthly segments: - - ```python - # 8760 hours annual data with monthly segments (730 hours) and 48-hour overlap - segmented_calc = SegmentedCalculation( - name='annual_energy_system', - flow_system=energy_system, - timesteps_per_segment=730, # ~1 month - overlap_timesteps=48, # 2 days overlap - folder=Path('results/segmented'), - ) - segmented_calc.do_modeling_and_solve(solver='gurobi') - ``` - - Weekly optimization with daily overlap: - - ```python - # Weekly segments for detailed operational planning - weekly_calc = SegmentedCalculation( - name='weekly_operations', - flow_system=industrial_system, - timesteps_per_segment=168, # 1 week (hourly data) - overlap_timesteps=24, # 1 day overlap - nr_of_previous_values=1, - ) - ``` - - Large-scale system with minimal overlap: - - ```python - # Large system with minimal overlap for computational efficiency - large_calc = SegmentedCalculation( - name='large_scale_grid', - flow_system=grid_system, - timesteps_per_segment=100, # Shorter segments - overlap_timesteps=5, # Minimal overlap - ) - ``` - - Design Considerations: - **Segment Size**: Balance between solution quality and computational efficiency. - Larger segments provide better optimization but require more memory and time. - - **Overlap Duration**: More overlap improves storage dynamics and reduces - end-effects but increases computational cost. Typically 5-10% of segment length. - - **Storage Systems**: Systems with large storage components benefit from longer - overlaps to capture charge/discharge cycles effectively. - - **Investment Decisions**: Use FullCalculation for problems requiring investment - optimization, as SegmentedCalculation cannot handle investment parameters. 
- - Common Use Cases: - - **Annual Planning**: Long-term planning with seasonal variations - - **Large Networks**: Spatially or temporally large energy systems - - **Memory-Limited Systems**: When full optimization exceeds available memory - - **Operational Planning**: Detailed short-term optimization with limited foresight - - **Sensitivity Analysis**: Quick approximate solutions for parameter studies - - Performance Tips: - - Start with FullCalculation and use this class if memory issues occur - - Use longer overlaps for systems with significant storage - - Monitor solution quality at segment boundaries for discontinuities - - Warning: - The evaluation of the solution is a bit more complex than FullCalculation or AggregatedCalculation - due to the overlapping individual solutions. - """ def __init__( @@ -528,209 +170,8 @@ def __init__( nr_of_previous_values: int = 1, folder: pathlib.Path | None = None, ): - super().__init__(name, flow_system, folder=folder) - self.timesteps_per_segment = timesteps_per_segment - self.overlap_timesteps = overlap_timesteps - self.nr_of_previous_values = nr_of_previous_values - self.sub_calculations: list[FullCalculation] = [] - - self.segment_names = [ - f'Segment_{i + 1}' for i in range(math.ceil(len(self.all_timesteps) / self.timesteps_per_segment)) - ] - self._timesteps_per_segment = self._calculate_timesteps_per_segment() - - assert timesteps_per_segment > 2, 'The Segment length must be greater 2, due to unwanted internal side effects' - assert self.timesteps_per_segment_with_overlap <= len(self.all_timesteps), ( - f'{self.timesteps_per_segment_with_overlap=} cant be greater than the total length {len(self.all_timesteps)}' - ) - - self.flow_system._connect_network() # Connect network to ensure that all Flows know their Component - # Storing all original start values - self._original_start_values = { - **{flow.label_full: flow.previous_flow_rate for flow in self.flow_system.flows.values()}, - **{ - comp.label_full: comp.initial_charge_state - for comp in self.flow_system.components.values() - if isinstance(comp, Storage) - }, - } - self._transfered_start_values: list[dict[str, Any]] = [] - - def _create_sub_calculations(self): - for i, (segment_name, timesteps_of_segment) in enumerate( - zip(self.segment_names, self._timesteps_per_segment, strict=True) - ): - calc = FullCalculation(f'{self.name}-{segment_name}', self.flow_system.sel(time=timesteps_of_segment)) - calc.flow_system._connect_network() # Connect to have Correct names of Flows! - - self.sub_calculations.append(calc) - logger.info( - f'{segment_name} [{i + 1:>2}/{len(self.segment_names):<2}] ' - f'({timesteps_of_segment[0]} -> {timesteps_of_segment[-1]}):' - ) - - def _solve_single_segment( - self, - i: int, - calculation: FullCalculation, - solver: _Solver, - log_file: pathlib.Path | None, - log_main_results: bool, - suppress_output: bool, - ) -> None: - """Solve a single segment calculation.""" - if i > 0 and self.nr_of_previous_values > 0: - self._transfer_start_values(i) - - calculation.do_modeling() - - # Warn about Investments, but only in first run - if i == 0: - invest_elements = [ - model.label_full - for component in calculation.flow_system.components.values() - for model in component.submodel.all_submodels - if isinstance(model, InvestmentModel) - ] - if invest_elements: - logger.critical( - f'Investments are not supported in Segmented Calculation! 
' - f'Following InvestmentModels were found: {invest_elements}' - ) - - log_path = pathlib.Path(log_file) if log_file is not None else self.folder / f'{self.name}.log' - - if suppress_output: - with fx_io.suppress_output(): - calculation.solve(solver, log_file=log_path, log_main_results=log_main_results) - else: - calculation.solve(solver, log_file=log_path, log_main_results=log_main_results) - - def do_modeling_and_solve( - self, - solver: _Solver, - log_file: pathlib.Path | None = None, - log_main_results: bool = False, - show_individual_solves: bool = False, - ) -> SegmentedCalculation: - """Model and solve all segments of the segmented calculation. - - This method creates sub-calculations for each time segment, then iteratively - models and solves each segment. It supports two output modes: a progress bar - for compact output, or detailed individual solve information. - - Args: - solver: The solver instance to use for optimization (e.g., Gurobi, HiGHS). - log_file: Optional path to the solver log file. If None, defaults to - folder/name.log. - log_main_results: Whether to log main results (objective, effects, etc.) - after each segment solve. Defaults to False. - show_individual_solves: If True, shows detailed output for each segment - solve with logger messages. If False (default), shows a compact progress - bar with suppressed solver output for cleaner display. - - Returns: - Self, for method chaining. - - Note: - The method automatically transfers all start values between segments to ensure - continuity of storage states and flow rates across segment boundaries. - """ - logger.info(f'{"":#^80}') - logger.info(f'{" Segmented Solving ":#^80}') - self._create_sub_calculations() - - if show_individual_solves: - # Path 1: Show individual solves with detailed output - for i, calculation in enumerate(self.sub_calculations): - logger.info( - f'Solving segment {i + 1}/{len(self.sub_calculations)}: ' - f'{calculation.flow_system.timesteps[0]} -> {calculation.flow_system.timesteps[-1]}' - ) - self._solve_single_segment(i, calculation, solver, log_file, log_main_results, suppress_output=False) - else: - # Path 2: Show only progress bar with suppressed output - progress_bar = tqdm( - enumerate(self.sub_calculations), - total=len(self.sub_calculations), - desc='Solving segments', - unit='segment', - file=sys.stdout, - disable=not CONFIG.Solving.log_to_console, - ) - - try: - for i, calculation in progress_bar: - progress_bar.set_description( - f'Solving ({calculation.flow_system.timesteps[0]} -> {calculation.flow_system.timesteps[-1]})' - ) - self._solve_single_segment(i, calculation, solver, log_file, log_main_results, suppress_output=True) - finally: - progress_bar.close() - - for calc in self.sub_calculations: - for key, value in calc.durations.items(): - self.durations[key] += value - - logger.success(f'Model solved with {solver.name} in {self.durations["solving"]:.2f} seconds.') - - self.results = SegmentedCalculationResults.from_calculation(self) - - return self - - def _transfer_start_values(self, i: int): - """ - This function gets the last values of the previous solved segment and - inserts them as start values for the next segment - """ - timesteps_of_prior_segment = self.sub_calculations[i - 1].flow_system.timesteps_extra - - start = self.sub_calculations[i].flow_system.timesteps[0] - start_previous_values = timesteps_of_prior_segment[self.timesteps_per_segment - self.nr_of_previous_values] - end_previous_values = timesteps_of_prior_segment[self.timesteps_per_segment - 1] - - 
logger.debug( - f'Start of next segment: {start}. Indices of previous values: {start_previous_values} -> {end_previous_values}' - ) - current_flow_system = self.sub_calculations[i - 1].flow_system - next_flow_system = self.sub_calculations[i].flow_system - - start_values_of_this_segment = {} - - for current_flow in current_flow_system.flows.values(): - next_flow = next_flow_system.flows[current_flow.label_full] - next_flow.previous_flow_rate = current_flow.submodel.flow_rate.solution.sel( - time=slice(start_previous_values, end_previous_values) - ).values - start_values_of_this_segment[current_flow.label_full] = next_flow.previous_flow_rate - - for current_comp in current_flow_system.components.values(): - next_comp = next_flow_system.components[current_comp.label_full] - if isinstance(next_comp, Storage): - next_comp.initial_charge_state = current_comp.submodel.charge_state.solution.sel(time=start).item() - start_values_of_this_segment[current_comp.label_full] = next_comp.initial_charge_state - - self._transfered_start_values.append(start_values_of_this_segment) - - def _calculate_timesteps_per_segment(self) -> list[pd.DatetimeIndex]: - timesteps_per_segment = [] - for i, _ in enumerate(self.segment_names): - start = self.timesteps_per_segment * i - end = min(start + self.timesteps_per_segment_with_overlap, len(self.all_timesteps)) - timesteps_per_segment.append(self.all_timesteps[start:end]) - return timesteps_per_segment - - @property - def timesteps_per_segment_with_overlap(self): - return self.timesteps_per_segment + self.overlap_timesteps - - @property - def start_values_of_segments(self) -> list[dict[str, Any]]: - """Gives an overview of the start values of all Segments""" - return [{name: value for name, value in self._original_start_values.items()}] + [ - start_values for start_values in self._transfered_start_values - ] - - @property - def all_timesteps(self) -> pd.DatetimeIndex: - return self.flow_system.timesteps + _deprecation_warning('SegmentedCalculation', 'SegmentedOptimization') + super().__init__(name, flow_system, timesteps_per_segment, overlap_timesteps, nr_of_previous_values, folder) + + +__all__ = ['Calculation', 'FullCalculation', 'AggregatedCalculation', 'SegmentedCalculation'] diff --git a/flixopt/aggregation.py b/flixopt/clustering.py similarity index 85% rename from flixopt/aggregation.py rename to flixopt/clustering.py index adaed3e42..fd9287a19 100644 --- a/flixopt/aggregation.py +++ b/flixopt/clustering.py @@ -1,6 +1,6 @@ """ -This module contains the Aggregation functionality for the flixopt framework. -Through this, aggregating TimeSeriesData is possible. +This module contains the Clustering functionality for the flixopt framework. +Through this, clustering TimeSeriesData is possible. 
""" from __future__ import annotations @@ -9,10 +9,13 @@ import logging import pathlib import timeit +import warnings as _warnings from typing import TYPE_CHECKING import numpy as np +from .core import DEPRECATION_REMOVAL_VERSION + try: import tsam.timeseriesaggregation as tsam @@ -40,9 +43,9 @@ logger = logging.getLogger('flixopt') -class Aggregation: +class Clustering: """ - aggregation organizing class + Clustering organizing class """ def __init__( @@ -239,7 +242,7 @@ def get_equation_indices(self, skip_first_index_of_period: bool = True) -> tuple return np.array(idx_var1), np.array(idx_var2) -class AggregationParameters: +class ClusteringParameters: def __init__( self, hours_per_period: float, @@ -252,7 +255,7 @@ def __init__( time_series_for_low_peaks: list[TimeSeriesData] | None = None, ): """ - Initializes aggregation parameters for time series data + Initializes clustering parameters for time series data Args: hours_per_period: Duration of each period in hours. @@ -295,26 +298,26 @@ def use_low_peaks(self) -> bool: return bool(self.time_series_for_low_peaks) -class AggregationModel(Submodel): - """The AggregationModel holds equations and variables related to the Aggregation of a FlowSystem. +class ClusteringModel(Submodel): + """The ClusteringModel holds equations and variables related to the Clustering of a FlowSystem. It creates Equations that equates indices of variables, and introduces penalties related to binary variables, that escape the equation to their related binaries in other periods""" def __init__( self, model: FlowSystemModel, - aggregation_parameters: AggregationParameters, + clustering_parameters: ClusteringParameters, flow_system: FlowSystem, - aggregation_data: Aggregation, + clustering_data: Clustering, components_to_clusterize: list[Component] | None, ): """ Modeling-Element for "index-equating"-equations """ - super().__init__(model, label_of_element='Aggregation', label_of_model='Aggregation') + super().__init__(model, label_of_element='Clustering', label_of_model='Clustering') self.flow_system = flow_system - self.aggregation_parameters = aggregation_parameters - self.aggregation_data = aggregation_data + self.clustering_parameters = clustering_parameters + self.clustering_data = clustering_data self.components_to_clusterize = components_to_clusterize def do_modeling(self): @@ -323,7 +326,7 @@ def do_modeling(self): else: components = [component for component in self.components_to_clusterize] - indices = self.aggregation_data.get_equation_indices(skip_first_index_of_period=True) + indices = self.clustering_data.get_equation_indices(skip_first_index_of_period=True) time_variables: set[str] = { name for name in self._model.variables if 'time' in self._model.variables[name].dims @@ -332,22 +335,22 @@ def do_modeling(self): binary_time_variables: set[str] = time_variables & binary_variables for component in components: - if isinstance(component, Storage) and not self.aggregation_parameters.fix_storage_flows: + if isinstance(component, Storage) and not self.clustering_parameters.fix_storage_flows: continue # Fix Nothing in The Storage all_variables_of_component = set(component.submodel.variables) - if self.aggregation_parameters.aggregate_data_and_fix_non_binary_vars: + if self.clustering_parameters.aggregate_data_and_fix_non_binary_vars: relevant_variables = component.submodel.variables[all_variables_of_component & time_variables] else: relevant_variables = component.submodel.variables[all_variables_of_component & binary_time_variables] for variable in 
relevant_variables:
                 self._equate_indices(component.submodel.variables[variable], indices)

-        penalty = self.aggregation_parameters.penalty_of_period_freedom
-        if (self.aggregation_parameters.percentage_of_period_freedom > 0) and penalty != 0:
+        penalty = self.clustering_parameters.penalty_of_period_freedom
+        if (self.clustering_parameters.percentage_of_period_freedom > 0) and penalty != 0:
             for variable in self.variables_direct.values():
-                self._model.effects.add_share_to_penalty('Aggregation', variable * penalty)
+                self._model.effects.add_share_to_penalty('Clustering', variable * penalty)

     def _equate_indices(self, variable: linopy.Variable, indices: tuple[np.ndarray, np.ndarray]) -> None:
         assert len(indices[0]) == len(indices[1]), 'The length of the indices must match!!'
@@ -363,7 +366,7 @@ def _equate_indices(self, variable: linopy.Variable, indices: tuple[np.ndarray,
         # Correction: (previously only for binary variables:)
         if (
             variable.name in self._model.variables.binaries
-            and self.aggregation_parameters.percentage_of_period_freedom > 0
+            and self.clustering_parameters.percentage_of_period_freedom > 0
         ):
             sel = variable.isel(time=indices[0])
             coords = {d: sel.indexes[d] for d in sel.dims}
@@ -385,8 +388,44 @@ def _equate_indices(self, variable: linopy.Variable, indices: tuple[np.ndarray,
         # Limit the number of corrections:
         # eq: sum(K) <= n_Corr_max
-        limit = int(np.floor(self.aggregation_parameters.percentage_of_period_freedom / 100 * length))
+        limit = int(np.floor(self.clustering_parameters.percentage_of_period_freedom / 100 * length))
         self.add_constraints(
             var_k0.sum(dim='time') + var_k1.sum(dim='time') <= limit,
             short_name=f'limit_corrections|{variable.name}',
         )
+
+
+# ===== Deprecated aliases for backward compatibility =====
+
+
+def _create_deprecation_warning(old_name: str, new_name: str):
+    """Helper to create a deprecation warning"""
+    _warnings.warn(
+        f"'{old_name}' is deprecated and will be removed in v{DEPRECATION_REMOVAL_VERSION}. Use '{new_name}' instead.",
+        DeprecationWarning,
+        stacklevel=3,
+    )
+
+
+class Aggregation(Clustering):
+    """Deprecated: Use Clustering instead."""
+
+    def __init__(self, *args, **kwargs):
+        _create_deprecation_warning('Aggregation', 'Clustering')
+        super().__init__(*args, **kwargs)
+
+
+class AggregationParameters(ClusteringParameters):
+    """Deprecated: Use ClusteringParameters instead."""
+
+    def __init__(self, *args, **kwargs):
+        _create_deprecation_warning('AggregationParameters', 'ClusteringParameters')
+        super().__init__(*args, **kwargs)
+
+
+class AggregationModel(ClusteringModel):
+    """Deprecated: Use ClusteringModel instead."""
+
+    def __init__(self, *args, **kwargs):
+        _create_deprecation_warning('AggregationModel', 'ClusteringModel')
+        super().__init__(*args, **kwargs)
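
The aliases above are thin subclasses of the new classes, so `isinstance`/`issubclass` checks against the new names keep working for old-name objects — a small sketch of what the shim guarantees:

```python
from flixopt import clustering

# Old names resolve to subclasses of the new classes and warn on instantiation:
assert issubclass(clustering.Aggregation, clustering.Clustering)
assert issubclass(clustering.AggregationParameters, clustering.ClusteringParameters)
assert issubclass(clustering.AggregationModel, clustering.ClusteringModel)
```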
diff --git a/flixopt/components.py b/flixopt/components.py
index a7f8b6314..07bc5f204 100644
--- a/flixopt/components.py
+++ b/flixopt/components.py
@@ -5,14 +5,12 @@
 from __future__ import annotations

 import logging
-import warnings
 from typing import TYPE_CHECKING, Literal

 import numpy as np
 import xarray as xr

 from . import io as fx_io
-from .config import DEPRECATION_REMOVAL_VERSION
 from .core import PlausibilityError
 from .elements import Component, ComponentModel, Flow
 from .features import InvestmentModel, PiecewiseModel
@@ -23,7 +21,6 @@
 if TYPE_CHECKING:
     import linopy

-    from .flow_system import FlowSystem
     from .types import Numeric_PS, Numeric_TPS

 logger = logging.getLogger('flixopt')
@@ -417,14 +414,6 @@ def __init__(
             prevent_simultaneous_flows=[charging, discharging] if prevent_simultaneous_charge_and_discharge else None,
             meta_data=meta_data,
         )
-        if isinstance(initial_charge_state, str) and initial_charge_state == 'lastValueOfSim':
-            warnings.warn(
-                f'{initial_charge_state=} is deprecated. Use "equals_final" instead. '
-                f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.',
-                DeprecationWarning,
-                stacklevel=2,
-            )
-            initial_charge_state = 'equals_final'

         self.charging = charging
         self.discharging = discharging
@@ -1098,22 +1087,7 @@ def __init__(
         outputs: list[Flow] | None = None,
         prevent_simultaneous_flow_rates: bool = True,
         meta_data: dict | None = None,
-        **kwargs,
     ):
-        # Handle deprecated parameters using centralized helper
-        outputs = self._handle_deprecated_kwarg(kwargs, 'source', 'outputs', outputs, transform=lambda x: [x])
-        inputs = self._handle_deprecated_kwarg(kwargs, 'sink', 'inputs', inputs, transform=lambda x: [x])
-        prevent_simultaneous_flow_rates = self._handle_deprecated_kwarg(
-            kwargs,
-            'prevent_simultaneous_sink_and_source',
-            'prevent_simultaneous_flow_rates',
-            prevent_simultaneous_flow_rates,
-            check_conflict=False,
-        )
-
-        # Validate any remaining unexpected kwargs
-        self._validate_kwargs(kwargs)
-
         super().__init__(
             label,
             inputs=inputs,
@@ -1123,36 +1097,6 @@ def __init__(
         )
         self.prevent_simultaneous_flow_rates = prevent_simultaneous_flow_rates

-    @property
-    def source(self) -> Flow:
-        warnings.warn(
-            'The source property is deprecated. Use the outputs property instead. 
' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - return self.outputs[0] - @register_class_for_io class Sink(Component): @@ -1346,29 +1273,16 @@ def __init__( inputs: list[Flow] | None = None, meta_data: dict | None = None, prevent_simultaneous_flow_rates: bool = False, - **kwargs, ): """Initialize a Sink (consumes flow from the system). - Supports legacy `sink=` keyword for backward compatibility (deprecated): if `sink` is provided - it is used as the single input flow and a DeprecationWarning is issued; specifying both - `inputs` and `sink` raises ValueError. - Args: label: Unique element label. inputs: Input flows for the sink. meta_data: Arbitrary metadata attached to the element. prevent_simultaneous_flow_rates: If True, prevents simultaneous nonzero flow rates across the element's inputs by wiring that restriction into the base Component setup. - - Note: - The deprecated `sink` kwarg is accepted for compatibility but will be removed in future releases. """ - # Handle deprecated parameter using centralized helper - inputs = self._handle_deprecated_kwarg(kwargs, 'sink', 'inputs', inputs, transform=lambda x: [x]) - - # Validate any remaining unexpected kwargs - self._validate_kwargs(kwargs) self.prevent_simultaneous_flow_rates = prevent_simultaneous_flow_rates super().__init__( @@ -1377,13 +1291,3 @@ def __init__( meta_data=meta_data, prevent_simultaneous_flows=inputs if prevent_simultaneous_flow_rates else None, ) - - @property - def sink(self) -> Flow: - warnings.warn( - 'The sink property is deprecated. Use the inputs property instead. ' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - return self.inputs[0] diff --git a/flixopt/core.py b/flixopt/core.py index 71e389315..0c6656349 100644 --- a/flixopt/core.py +++ b/flixopt/core.py @@ -4,15 +4,13 @@ """ import logging -import warnings from itertools import permutations -from typing import Any, Literal, Union +from typing import Any, Literal import numpy as np import pandas as pd import xarray as xr -from .config import DEPRECATION_REMOVAL_VERSION from .types import NumericOrBool logger = logging.getLogger('flixopt') @@ -43,8 +41,6 @@ def __init__( *args: Any, aggregation_group: str | None = None, aggregation_weight: float | None = None, - agg_group: str | None = None, - agg_weight: float | None = None, **kwargs: Any, ): """ @@ -52,26 +48,8 @@ def __init__( *args: Arguments passed to DataArray aggregation_group: Aggregation group name aggregation_weight: Aggregation weight (0-1) - agg_group: Deprecated, use aggregation_group instead - agg_weight: Deprecated, use aggregation_weight instead **kwargs: Additional arguments passed to DataArray """ - if agg_group is not None: - warnings.warn( - f'agg_group is deprecated, use aggregation_group instead. ' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - aggregation_group = agg_group - if agg_weight is not None: - warnings.warn( - f'agg_weight is deprecated, use aggregation_weight instead. 
' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - aggregation_weight = agg_weight if (aggregation_group is not None) and (aggregation_weight is not None): raise ValueError('Use either aggregation_group or aggregation_weight, not both') @@ -143,26 +121,6 @@ def __repr__(self): info_str = f'TimeSeriesData({", ".join(agg_info)})' if agg_info else 'TimeSeriesData' return f'{info_str}\n{super().__repr__()}' - @property - def agg_group(self): - warnings.warn( - f'agg_group is deprecated, use aggregation_group instead. ' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - return self.aggregation_group - - @property - def agg_weight(self): - warnings.warn( - f'agg_weight is deprecated, use aggregation_weight instead. ' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - return self.aggregation_weight - class DataConverter: """ diff --git a/flixopt/effects.py b/flixopt/effects.py index 43afcd0cf..6748211b1 100644 --- a/flixopt/effects.py +++ b/flixopt/effects.py @@ -23,7 +23,6 @@ if TYPE_CHECKING: from collections.abc import Iterator - from .flow_system import FlowSystem from .types import Effect_PS, Effect_TPS, Numeric_PS, Numeric_S, Numeric_TPS, Scalar logger = logging.getLogger('flixopt') @@ -204,7 +203,6 @@ def __init__( maximum_total: Numeric_PS | None = None, minimum_over_periods: Numeric_S | None = None, maximum_over_periods: Numeric_S | None = None, - **kwargs, ): super().__init__(label, meta_data=meta_data) self.unit = unit @@ -218,23 +216,6 @@ def __init__( self.share_from_temporal = share_from_temporal if share_from_temporal is not None else {} self.share_from_periodic = share_from_periodic if share_from_periodic is not None else {} - # Handle backwards compatibility for deprecated parameters using centralized helper - minimum_temporal = self._handle_deprecated_kwarg( - kwargs, 'minimum_operation', 'minimum_temporal', minimum_temporal - ) - maximum_temporal = self._handle_deprecated_kwarg( - kwargs, 'maximum_operation', 'maximum_temporal', maximum_temporal - ) - minimum_periodic = self._handle_deprecated_kwarg(kwargs, 'minimum_invest', 'minimum_periodic', minimum_periodic) - maximum_periodic = self._handle_deprecated_kwarg(kwargs, 'maximum_invest', 'maximum_periodic', maximum_periodic) - minimum_per_hour = self._handle_deprecated_kwarg( - kwargs, 'minimum_operation_per_hour', 'minimum_per_hour', minimum_per_hour - ) - maximum_per_hour = self._handle_deprecated_kwarg( - kwargs, 'maximum_operation_per_hour', 'maximum_per_hour', maximum_per_hour - ) - self._validate_kwargs(kwargs) - # Set attributes directly self.minimum_temporal = minimum_temporal self.maximum_temporal = maximum_temporal @@ -247,167 +228,6 @@ def __init__( self.minimum_over_periods = minimum_over_periods self.maximum_over_periods = maximum_over_periods - # Backwards compatible properties (deprecated) - @property - def minimum_operation(self): - """DEPRECATED: Use 'minimum_temporal' property instead.""" - warnings.warn( - "Property 'minimum_operation' is deprecated. Use 'minimum_temporal' instead.", - DeprecationWarning, - stacklevel=2, - ) - return self.minimum_temporal - - @minimum_operation.setter - def minimum_operation(self, value): - """DEPRECATED: Use 'minimum_temporal' property instead.""" - warnings.warn( - "Property 'minimum_operation' is deprecated. 
Use 'minimum_temporal' instead.", - DeprecationWarning, - stacklevel=2, - ) - self.minimum_temporal = value - - @property - def maximum_operation(self): - """DEPRECATED: Use 'maximum_temporal' property instead.""" - warnings.warn( - "Property 'maximum_operation' is deprecated. Use 'maximum_temporal' instead.", - DeprecationWarning, - stacklevel=2, - ) - return self.maximum_temporal - - @maximum_operation.setter - def maximum_operation(self, value): - """DEPRECATED: Use 'maximum_temporal' property instead.""" - warnings.warn( - "Property 'maximum_operation' is deprecated. Use 'maximum_temporal' instead.", - DeprecationWarning, - stacklevel=2, - ) - self.maximum_temporal = value - - @property - def minimum_invest(self): - """DEPRECATED: Use 'minimum_periodic' property instead.""" - warnings.warn( - "Property 'minimum_invest' is deprecated. Use 'minimum_periodic' instead.", - DeprecationWarning, - stacklevel=2, - ) - return self.minimum_periodic - - @minimum_invest.setter - def minimum_invest(self, value): - """DEPRECATED: Use 'minimum_periodic' property instead.""" - warnings.warn( - "Property 'minimum_invest' is deprecated. Use 'minimum_periodic' instead.", - DeprecationWarning, - stacklevel=2, - ) - self.minimum_periodic = value - - @property - def maximum_invest(self): - """DEPRECATED: Use 'maximum_periodic' property instead.""" - warnings.warn( - "Property 'maximum_invest' is deprecated. Use 'maximum_periodic' instead.", - DeprecationWarning, - stacklevel=2, - ) - return self.maximum_periodic - - @maximum_invest.setter - def maximum_invest(self, value): - """DEPRECATED: Use 'maximum_periodic' property instead.""" - warnings.warn( - "Property 'maximum_invest' is deprecated. Use 'maximum_periodic' instead.", - DeprecationWarning, - stacklevel=2, - ) - self.maximum_periodic = value - - @property - def minimum_operation_per_hour(self): - """DEPRECATED: Use 'minimum_per_hour' property instead.""" - warnings.warn( - "Property 'minimum_operation_per_hour' is deprecated. Use 'minimum_per_hour' instead.", - DeprecationWarning, - stacklevel=2, - ) - return self.minimum_per_hour - - @minimum_operation_per_hour.setter - def minimum_operation_per_hour(self, value): - """DEPRECATED: Use 'minimum_per_hour' property instead.""" - warnings.warn( - "Property 'minimum_operation_per_hour' is deprecated. Use 'minimum_per_hour' instead.", - DeprecationWarning, - stacklevel=2, - ) - self.minimum_per_hour = value - - @property - def maximum_operation_per_hour(self): - """DEPRECATED: Use 'maximum_per_hour' property instead.""" - warnings.warn( - "Property 'maximum_operation_per_hour' is deprecated. Use 'maximum_per_hour' instead.", - DeprecationWarning, - stacklevel=2, - ) - return self.maximum_per_hour - - @maximum_operation_per_hour.setter - def maximum_operation_per_hour(self, value): - """DEPRECATED: Use 'maximum_per_hour' property instead.""" - warnings.warn( - "Property 'maximum_operation_per_hour' is deprecated. Use 'maximum_per_hour' instead.", - DeprecationWarning, - stacklevel=2, - ) - self.maximum_per_hour = value - - @property - def minimum_total_per_period(self): - """DEPRECATED: Use 'minimum_total' property instead.""" - warnings.warn( - "Property 'minimum_total_per_period' is deprecated. Use 'minimum_total' instead.", - DeprecationWarning, - stacklevel=2, - ) - return self.minimum_total - - @minimum_total_per_period.setter - def minimum_total_per_period(self, value): - """DEPRECATED: Use 'minimum_total' property instead.""" - warnings.warn( - "Property 'minimum_total_per_period' is deprecated. 
Use 'minimum_total' instead.", - DeprecationWarning, - stacklevel=2, - ) - self.minimum_total = value - - @property - def maximum_total_per_period(self): - """DEPRECATED: Use 'maximum_total' property instead.""" - warnings.warn( - "Property 'maximum_total_per_period' is deprecated. Use 'maximum_total' instead.", - DeprecationWarning, - stacklevel=2, - ) - return self.maximum_total - - @maximum_total_per_period.setter - def maximum_total_per_period(self, value): - """DEPRECATED: Use 'maximum_total' property instead.""" - warnings.warn( - "Property 'maximum_total_per_period' is deprecated. Use 'maximum_total' instead.", - DeprecationWarning, - stacklevel=2, - ) - self.maximum_total = value - def transform_data(self, name_prefix: str = '') -> None: prefix = '|'.join(filter(None, [name_prefix, self.label_full])) self.minimum_per_hour = self._fit_coords(f'{prefix}|minimum_per_hour', self.minimum_per_hour) diff --git a/flixopt/elements.py b/flixopt/elements.py index 611b0bd9f..a4934f211 100644 --- a/flixopt/elements.py +++ b/flixopt/elements.py @@ -12,7 +12,7 @@ import xarray as xr from . import io as fx_io -from .config import CONFIG, DEPRECATION_REMOVAL_VERSION +from .config import CONFIG from .core import PlausibilityError from .features import InvestmentModel, OnOffModel from .interface import InvestParameters, OnOffParameters @@ -28,13 +28,7 @@ if TYPE_CHECKING: import linopy - from .flow_system import FlowSystem from .types import ( - Bool_PS, - Bool_S, - Bool_TPS, - Effect_PS, - Effect_S, Effect_TPS, Numeric_PS, Numeric_S, @@ -469,7 +463,6 @@ def __init__( load_factor_max: Numeric_PS | None = None, previous_flow_rate: Scalar | list[Scalar] | None = None, meta_data: dict | None = None, - **kwargs, ): super().__init__(label, meta_data=meta_data) self.size = CONFIG.Modeling.big if size is None else size @@ -480,26 +473,6 @@ def __init__( self.load_factor_min = load_factor_min self.load_factor_max = load_factor_max - # Handle deprecated parameters - flow_hours_max = self._handle_deprecated_kwarg( - kwargs, 'flow_hours_per_period_max', 'flow_hours_max', flow_hours_max - ) - flow_hours_min = self._handle_deprecated_kwarg( - kwargs, 'flow_hours_per_period_min', 'flow_hours_min', flow_hours_min - ) - # Also handle the older deprecated names - flow_hours_max = self._handle_deprecated_kwarg(kwargs, 'flow_hours_total_max', 'flow_hours_max', flow_hours_max) - flow_hours_min = self._handle_deprecated_kwarg(kwargs, 'flow_hours_total_min', 'flow_hours_min', flow_hours_min) - flow_hours_max_over_periods = self._handle_deprecated_kwarg( - kwargs, 'total_flow_hours_max', 'flow_hours_max_over_periods', flow_hours_max_over_periods - ) - flow_hours_min_over_periods = self._handle_deprecated_kwarg( - kwargs, 'total_flow_hours_min', 'flow_hours_min_over_periods', flow_hours_min_over_periods - ) - - # Validate any remaining unexpected kwargs - self._validate_kwargs(kwargs) - # self.positive_gradient = TimeSeries('positive_gradient', positive_gradient, self) self.effects_per_flow_hour = effects_per_flow_hour if effects_per_flow_hour is not None else {} self.flow_hours_max = flow_hours_max @@ -618,51 +591,6 @@ def size_is_fixed(self) -> bool: # Wenn kein InvestParameters existiert --> True; Wenn Investparameter, den Wert davon nehmen return False if (isinstance(self.size, InvestParameters) and self.size.fixed_size is None) else True - # Backwards compatible properties (deprecated) - @property - def flow_hours_total_max(self): - """DEPRECATED: Use 'flow_hours_max' property instead.""" - warnings.warn( - f"Property 
'flow_hours_total_max' is deprecated. Use 'flow_hours_max' instead. " - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - return self.flow_hours_max - - @flow_hours_total_max.setter - def flow_hours_total_max(self, value): - """DEPRECATED: Use 'flow_hours_max' property instead.""" - warnings.warn( - f"Property 'flow_hours_total_max' is deprecated. Use 'flow_hours_max' instead. " - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - self.flow_hours_max = value - - @property - def flow_hours_total_min(self): - """DEPRECATED: Use 'flow_hours_min' property instead.""" - warnings.warn( - f"Property 'flow_hours_total_min' is deprecated. Use 'flow_hours_min' instead. " - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - return self.flow_hours_min - - @flow_hours_total_min.setter - def flow_hours_total_min(self, value): - """DEPRECATED: Use 'flow_hours_min' property instead.""" - warnings.warn( - f"Property 'flow_hours_total_min' is deprecated. Use 'flow_hours_min' instead. " - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - self.flow_hours_min = value - def _format_invest_params(self, params: InvestParameters) -> str: """Format InvestParameters for display.""" return f'size: {params.format_for_repr()}' diff --git a/flixopt/flow_system.py b/flixopt/flow_system.py index 63bb7b16d..c9a319872 100644 --- a/flixopt/flow_system.py +++ b/flixopt/flow_system.py @@ -8,7 +8,7 @@ import warnings from collections import defaultdict from itertools import chain -from typing import TYPE_CHECKING, Any, Literal, Optional +from typing import TYPE_CHECKING, Any, Literal import numpy as np import pandas as pd @@ -32,7 +32,7 @@ import pyvis - from .types import Bool_TPS, Effect_TPS, Numeric_PS, Numeric_S, Numeric_TPS, NumericOrBool + from .types import Effect_TPS, Numeric_S, Numeric_TPS, NumericOrBool logger = logging.getLogger('flixopt') diff --git a/flixopt/interface.py b/flixopt/interface.py index 852c3e8f8..81bbe9e93 100644 --- a/flixopt/interface.py +++ b/flixopt/interface.py @@ -6,20 +6,18 @@ from __future__ import annotations import logging -import warnings -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING import numpy as np import pandas as pd import xarray as xr -from .config import CONFIG, DEPRECATION_REMOVAL_VERSION +from .config import CONFIG from .structure import Interface, register_class_for_io if TYPE_CHECKING: # for type checking and preventing circular imports from collections.abc import Iterator - from .flow_system import FlowSystem from .types import Effect_PS, Effect_TPS, Numeric_PS, Numeric_TPS logger = logging.getLogger('flixopt') @@ -732,18 +730,6 @@ class InvestParameters(Interface): linked_periods: Describes which periods are linked. 1 means linked, 0 means size=0. None means no linked periods. For convenience, pass a tuple containing the first and last period (2025, 2039), linking them and those in between - Deprecated Args: - fix_effects: **Deprecated**. Use `effects_of_investment` instead. - Will be removed in version 5.0.0. - specific_effects: **Deprecated**. Use `effects_of_investment_per_size` instead. - Will be removed in version 5.0.0. - divest_effects: **Deprecated**. Use `effects_of_retirement` instead. - Will be removed in version 5.0.0. - piecewise_effects: **Deprecated**. Use `piecewise_effects_of_investment` instead. - Will be removed in version 5.0.0. 
- optional: DEPRECATED. Use `mandatory` instead. Opposite of `mandatory`. - Will be removed in version 5.0.0. - Cost Annualization Requirements: All cost values must be properly weighted to match the optimization model's time horizon. For long-term investments, the cost values should be annualized to the corresponding operation time (annuity). @@ -900,36 +886,7 @@ def __init__( effects_of_retirement: Effect_PS | Numeric_PS | None = None, piecewise_effects_of_investment: PiecewiseEffects | None = None, linked_periods: Numeric_PS | tuple[int, int] | None = None, - **kwargs, ): - # Handle deprecated parameters using centralized helper - effects_of_investment = self._handle_deprecated_kwarg( - kwargs, 'fix_effects', 'effects_of_investment', effects_of_investment - ) - effects_of_investment_per_size = self._handle_deprecated_kwarg( - kwargs, 'specific_effects', 'effects_of_investment_per_size', effects_of_investment_per_size - ) - effects_of_retirement = self._handle_deprecated_kwarg( - kwargs, 'divest_effects', 'effects_of_retirement', effects_of_retirement - ) - piecewise_effects_of_investment = self._handle_deprecated_kwarg( - kwargs, 'piecewise_effects', 'piecewise_effects_of_investment', piecewise_effects_of_investment - ) - # For mandatory parameter with non-None default, disable conflict checking - if 'optional' in kwargs: - warnings.warn( - 'Deprecated parameter "optional" used. Check conflicts with new parameter "mandatory" manually! ' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - mandatory = self._handle_deprecated_kwarg( - kwargs, 'optional', 'mandatory', mandatory, transform=lambda x: not x, check_conflict=False - ) - - # Validate any remaining unexpected kwargs - self._validate_kwargs(kwargs) - self.effects_of_investment = effects_of_investment if effects_of_investment is not None else {} self.effects_of_retirement = effects_of_retirement if effects_of_retirement is not None else {} self.fixed_size = fixed_size @@ -1007,74 +964,6 @@ def transform_data(self, name_prefix: str = '') -> None: ) self.fixed_size = self._fit_coords(f'{name_prefix}|fixed_size', self.fixed_size, dims=['period', 'scenario']) - @property - def optional(self) -> bool: - """DEPRECATED: Use 'mandatory' property instead. Returns the opposite of 'mandatory'.""" - import warnings - - warnings.warn( - f"Property 'optional' is deprecated. Use 'mandatory' instead. " - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - return not self.mandatory - - @optional.setter - def optional(self, value: bool): - """DEPRECATED: Use 'mandatory' property instead. Sets the opposite of the given value to 'mandatory'.""" - warnings.warn( - f"Property 'optional' is deprecated. Use 'mandatory' instead. " - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - self.mandatory = not value - - @property - def fix_effects(self) -> Effect_PS | Numeric_PS: - """Deprecated property. Use effects_of_investment instead.""" - warnings.warn( - f'The fix_effects property is deprecated. Use effects_of_investment instead. ' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - return self.effects_of_investment - - @property - def specific_effects(self) -> Effect_PS | Numeric_PS: - """Deprecated property. Use effects_of_investment_per_size instead.""" - warnings.warn( - f'The specific_effects property is deprecated. Use effects_of_investment_per_size instead. 
' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - return self.effects_of_investment_per_size - - @property - def divest_effects(self) -> Effect_PS | Numeric_PS: - """Deprecated property. Use effects_of_retirement instead.""" - warnings.warn( - f'The divest_effects property is deprecated. Use effects_of_retirement instead. ' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - return self.effects_of_retirement - - @property - def piecewise_effects(self) -> PiecewiseEffects | None: - """Deprecated property. Use piecewise_effects_of_investment instead.""" - warnings.warn( - f'The piecewise_effects property is deprecated. Use piecewise_effects_of_investment instead. ' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - return self.piecewise_effects_of_investment - @property def minimum_or_fixed_size(self) -> Numeric_PS: return self.fixed_size if self.fixed_size is not None else self.minimum_size @@ -1310,14 +1199,7 @@ def __init__( consecutive_off_hours_max: Numeric_TPS | None = None, switch_on_max: Numeric_PS | None = None, force_switch_on: bool = False, - **kwargs, ): - # Handle deprecated parameters - on_hours_min = self._handle_deprecated_kwarg(kwargs, 'on_hours_total_min', 'on_hours_min', on_hours_min) - on_hours_max = self._handle_deprecated_kwarg(kwargs, 'on_hours_total_max', 'on_hours_max', on_hours_max) - switch_on_max = self._handle_deprecated_kwarg(kwargs, 'switch_on_total_max', 'switch_on_max', switch_on_max) - self._validate_kwargs(kwargs) - self.effects_per_switch_on = effects_per_switch_on if effects_per_switch_on is not None else {} self.effects_per_running_hour = effects_per_running_hour if effects_per_running_hour is not None else {} self.on_hours_min = on_hours_min @@ -1390,70 +1272,3 @@ def use_switch_on(self) -> bool: self.switch_on_max, ] ) - - # Backwards compatible properties (deprecated) - @property - def on_hours_total_min(self): - """DEPRECATED: Use 'on_hours_min' property instead.""" - warnings.warn( - f"Property 'on_hours_total_min' is deprecated. Use 'on_hours_min' instead. " - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - return self.on_hours_min - - @on_hours_total_min.setter - def on_hours_total_min(self, value): - """DEPRECATED: Use 'on_hours_min' property instead.""" - warnings.warn( - f"Property 'on_hours_total_min' is deprecated. Use 'on_hours_min' instead. " - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - self.on_hours_min = value - - @property - def on_hours_total_max(self): - """DEPRECATED: Use 'on_hours_max' property instead.""" - warnings.warn( - f"Property 'on_hours_total_max' is deprecated. Use 'on_hours_max' instead. " - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - return self.on_hours_max - - @on_hours_total_max.setter - def on_hours_total_max(self, value): - """DEPRECATED: Use 'on_hours_max' property instead.""" - warnings.warn( - f"Property 'on_hours_total_max' is deprecated. Use 'on_hours_max' instead. " - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - self.on_hours_max = value - - @property - def switch_on_total_max(self): - """DEPRECATED: Use 'switch_on_max' property instead.""" - warnings.warn( - f"Property 'switch_on_total_max' is deprecated. Use 'switch_on_max' instead. 
" - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - return self.switch_on_max - - @switch_on_total_max.setter - def switch_on_total_max(self, value): - """DEPRECATED: Use 'switch_on_max' property instead.""" - warnings.warn( - f"Property 'switch_on_total_max' is deprecated. Use 'switch_on_max' instead. " - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - self.switch_on_max = value diff --git a/flixopt/linear_converters.py b/flixopt/linear_converters.py index 2ac60e70d..9ca73519e 100644 --- a/flixopt/linear_converters.py +++ b/flixopt/linear_converters.py @@ -5,14 +5,11 @@ from __future__ import annotations import logging -import warnings from typing import TYPE_CHECKING import numpy as np from .components import LinearConverter -from .config import DEPRECATION_REMOVAL_VERSION -from .core import TimeSeriesData from .structure import register_class_for_io if TYPE_CHECKING: @@ -41,9 +38,6 @@ class Boiler(LinearConverter): on_off_parameters: Parameters defining binary operation constraints and costs. meta_data: Used to store additional information. Not used internally but saved in results. Only use Python native types. - eta: *Deprecated*. Use `thermal_efficiency` instead. - Q_fu: *Deprecated*. Use `fuel_flow` instead. - Q_th: *Deprecated*. Use `thermal_flow` instead. Examples: Natural gas boiler: @@ -87,14 +81,7 @@ def __init__( thermal_flow: Flow | None = None, on_off_parameters: OnOffParameters | None = None, meta_data: dict | None = None, - **kwargs, ): - # Handle deprecated parameters - fuel_flow = self._handle_deprecated_kwarg(kwargs, 'Q_fu', 'fuel_flow', fuel_flow) - thermal_flow = self._handle_deprecated_kwarg(kwargs, 'Q_th', 'thermal_flow', thermal_flow) - thermal_efficiency = self._handle_deprecated_kwarg(kwargs, 'eta', 'thermal_efficiency', thermal_efficiency) - self._validate_kwargs(kwargs) - # Validate required parameters if fuel_flow is None: raise ValueError(f"'{label}': fuel_flow is required and cannot be None") @@ -123,66 +110,6 @@ def thermal_efficiency(self, value): check_bounds(value, 'thermal_efficiency', self.label_full, 0, 1) self.conversion_factors = [{self.fuel_flow.label: value, self.thermal_flow.label: 1}] - @property - def eta(self) -> Numeric_TPS: - warnings.warn( - 'The "eta" property is deprecated. Use "thermal_efficiency" instead. ' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - return self.thermal_efficiency - - @eta.setter - def eta(self, value: Numeric_TPS) -> None: - warnings.warn( - 'The "eta" property is deprecated. Use "thermal_efficiency" instead. ' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - self.thermal_efficiency = value - - @property - def Q_fu(self) -> Flow: # noqa: N802 - warnings.warn( - 'The "Q_fu" property is deprecated. Use "fuel_flow" instead. ' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - return self.fuel_flow - - @Q_fu.setter - def Q_fu(self, value: Flow) -> None: # noqa: N802 - warnings.warn( - 'The "Q_fu" property is deprecated. Use "fuel_flow" instead. ' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - self.fuel_flow = value - - @property - def Q_th(self) -> Flow: # noqa: N802 - warnings.warn( - 'The "Q_th" property is deprecated. Use "thermal_flow" instead. 
' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - return self.thermal_flow - - @Q_th.setter - def Q_th(self, value: Flow) -> None: # noqa: N802 - warnings.warn( - 'The "Q_th" property is deprecated. Use "thermal_flow" instead. ' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - self.thermal_flow = value - @register_class_for_io class Power2Heat(LinearConverter): @@ -204,9 +131,6 @@ class Power2Heat(LinearConverter): on_off_parameters: Parameters defining binary operation constraints and costs. meta_data: Used to store additional information. Not used internally but saved in results. Only use Python native types. - eta: *Deprecated*. Use `thermal_efficiency` instead. - P_el: *Deprecated*. Use `electrical_flow` instead. - Q_th: *Deprecated*. Use `thermal_flow` instead. Examples: Electric resistance heater: @@ -252,14 +176,7 @@ def __init__( thermal_flow: Flow | None = None, on_off_parameters: OnOffParameters | None = None, meta_data: dict | None = None, - **kwargs, ): - # Handle deprecated parameters - electrical_flow = self._handle_deprecated_kwarg(kwargs, 'P_el', 'electrical_flow', electrical_flow) - thermal_flow = self._handle_deprecated_kwarg(kwargs, 'Q_th', 'thermal_flow', thermal_flow) - thermal_efficiency = self._handle_deprecated_kwarg(kwargs, 'eta', 'thermal_efficiency', thermal_efficiency) - self._validate_kwargs(kwargs) - # Validate required parameters if electrical_flow is None: raise ValueError(f"'{label}': electrical_flow is required and cannot be None") @@ -289,66 +206,6 @@ def thermal_efficiency(self, value): check_bounds(value, 'thermal_efficiency', self.label_full, 0, 1) self.conversion_factors = [{self.electrical_flow.label: value, self.thermal_flow.label: 1}] - @property - def eta(self) -> Numeric_TPS: - warnings.warn( - 'The "eta" property is deprecated. Use "thermal_efficiency" instead. ' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - return self.thermal_efficiency - - @eta.setter - def eta(self, value: Numeric_TPS) -> None: - warnings.warn( - 'The "eta" property is deprecated. Use "thermal_efficiency" instead. ' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - self.thermal_efficiency = value - - @property - def P_el(self) -> Flow: # noqa: N802 - warnings.warn( - 'The "P_el" property is deprecated. Use "electrical_flow" instead. ' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - return self.electrical_flow - - @P_el.setter - def P_el(self, value: Flow) -> None: # noqa: N802 - warnings.warn( - 'The "P_el" property is deprecated. Use "electrical_flow" instead. ' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - self.electrical_flow = value - - @property - def Q_th(self) -> Flow: # noqa: N802 - warnings.warn( - 'The "Q_th" property is deprecated. Use "thermal_flow" instead. ' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - return self.thermal_flow - - @Q_th.setter - def Q_th(self, value: Flow) -> None: # noqa: N802 - warnings.warn( - 'The "Q_th" property is deprecated. Use "thermal_flow" instead. 
' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - self.thermal_flow = value - @register_class_for_io class HeatPump(LinearConverter): @@ -370,9 +227,6 @@ class HeatPump(LinearConverter): on_off_parameters: Parameters defining binary operation constraints and costs. meta_data: Used to store additional information. Not used internally but saved in results. Only use Python native types. - COP: *Deprecated*. Use `cop` instead. - P_el: *Deprecated*. Use `electrical_flow` instead. - Q_th: *Deprecated*. Use `thermal_flow` instead. Examples: Air-source heat pump with constant COP: @@ -417,14 +271,7 @@ def __init__( thermal_flow: Flow | None = None, on_off_parameters: OnOffParameters | None = None, meta_data: dict | None = None, - **kwargs, ): - # Handle deprecated parameters - electrical_flow = self._handle_deprecated_kwarg(kwargs, 'P_el', 'electrical_flow', electrical_flow) - thermal_flow = self._handle_deprecated_kwarg(kwargs, 'Q_th', 'thermal_flow', thermal_flow) - cop = self._handle_deprecated_kwarg(kwargs, 'COP', 'cop', cop) - self._validate_kwargs(kwargs) - # Validate required parameters if electrical_flow is None: raise ValueError(f"'{label}': electrical_flow is required and cannot be None") @@ -454,64 +301,6 @@ def cop(self, value): check_bounds(value, 'cop', self.label_full, 1, 20) self.conversion_factors = [{self.electrical_flow.label: value, self.thermal_flow.label: 1}] - @property - def COP(self) -> Numeric_TPS: # noqa: N802 - warnings.warn( - f'The "COP" property is deprecated. Use "cop" instead. Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - return self.cop - - @COP.setter - def COP(self, value: Numeric_TPS) -> None: # noqa: N802 - warnings.warn( - f'The "COP" property is deprecated. Use "cop" instead. Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - self.cop = value - - @property - def P_el(self) -> Flow: # noqa: N802 - warnings.warn( - 'The "P_el" property is deprecated. Use "electrical_flow" instead. ' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - return self.electrical_flow - - @P_el.setter - def P_el(self, value: Flow) -> None: # noqa: N802 - warnings.warn( - 'The "P_el" property is deprecated. Use "electrical_flow" instead. ' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - self.electrical_flow = value - - @property - def Q_th(self) -> Flow: # noqa: N802 - warnings.warn( - 'The "Q_th" property is deprecated. Use "thermal_flow" instead. ' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - return self.thermal_flow - - @Q_th.setter - def Q_th(self, value: Flow) -> None: # noqa: N802 - warnings.warn( - 'The "Q_th" property is deprecated. Use "thermal_flow" instead. ' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - self.thermal_flow = value - @register_class_for_io class CoolingTower(LinearConverter): @@ -533,8 +322,6 @@ class CoolingTower(LinearConverter): on_off_parameters: Parameters defining binary operation constraints and costs. meta_data: Used to store additional information. Not used internally but saved in results. Only use Python native types. - P_el: *Deprecated*. Use `electrical_flow` instead. - Q_th: *Deprecated*. Use `thermal_flow` instead. 
Examples: Industrial cooling tower: @@ -581,13 +368,7 @@ def __init__( thermal_flow: Flow | None = None, on_off_parameters: OnOffParameters | None = None, meta_data: dict | None = None, - **kwargs, ): - # Handle deprecated parameters - electrical_flow = self._handle_deprecated_kwarg(kwargs, 'P_el', 'electrical_flow', electrical_flow) - thermal_flow = self._handle_deprecated_kwarg(kwargs, 'Q_th', 'thermal_flow', thermal_flow) - self._validate_kwargs(kwargs) - # Validate required parameters if electrical_flow is None: raise ValueError(f"'{label}': electrical_flow is required and cannot be None") @@ -615,46 +396,6 @@ def specific_electricity_demand(self, value): check_bounds(value, 'specific_electricity_demand', self.label_full, 0, 1) self.conversion_factors = [{self.electrical_flow.label: -1, self.thermal_flow.label: value}] - @property - def P_el(self) -> Flow: # noqa: N802 - warnings.warn( - 'The "P_el" property is deprecated. Use "electrical_flow" instead. ' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - return self.electrical_flow - - @P_el.setter - def P_el(self, value: Flow) -> None: # noqa: N802 - warnings.warn( - 'The "P_el" property is deprecated. Use "electrical_flow" instead. ' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - self.electrical_flow = value - - @property - def Q_th(self) -> Flow: # noqa: N802 - warnings.warn( - 'The "Q_th" property is deprecated. Use "thermal_flow" instead. ' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - return self.thermal_flow - - @Q_th.setter - def Q_th(self, value: Flow) -> None: # noqa: N802 - warnings.warn( - 'The "Q_th" property is deprecated. Use "thermal_flow" instead. ' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - self.thermal_flow = value - @register_class_for_io class CHP(LinearConverter): @@ -678,11 +419,6 @@ class CHP(LinearConverter): on_off_parameters: Parameters defining binary operation constraints and costs. meta_data: Used to store additional information. Not used internally but saved in results. Only use Python native types. - eta_th: *Deprecated*. Use `thermal_efficiency` instead. - eta_el: *Deprecated*. Use `electrical_efficiency` instead. - Q_fu: *Deprecated*. Use `fuel_flow` instead. - P_el: *Deprecated*. Use `electrical_flow` instead. - Q_th: *Deprecated*. Use `thermal_flow` instead. 
Examples: Natural gas CHP unit: @@ -736,18 +472,7 @@ def __init__( thermal_flow: Flow | None = None, on_off_parameters: OnOffParameters | None = None, meta_data: dict | None = None, - **kwargs, ): - # Handle deprecated parameters - fuel_flow = self._handle_deprecated_kwarg(kwargs, 'Q_fu', 'fuel_flow', fuel_flow) - electrical_flow = self._handle_deprecated_kwarg(kwargs, 'P_el', 'electrical_flow', electrical_flow) - thermal_flow = self._handle_deprecated_kwarg(kwargs, 'Q_th', 'thermal_flow', thermal_flow) - thermal_efficiency = self._handle_deprecated_kwarg(kwargs, 'eta_th', 'thermal_efficiency', thermal_efficiency) - electrical_efficiency = self._handle_deprecated_kwarg( - kwargs, 'eta_el', 'electrical_efficiency', electrical_efficiency - ) - self._validate_kwargs(kwargs) - # Validate required parameters if fuel_flow is None: raise ValueError(f"'{label}': fuel_flow is required and cannot be None") @@ -801,106 +526,6 @@ def electrical_efficiency(self, value): check_bounds(value, 'electrical_efficiency', self.label_full, 0, 1) self.conversion_factors[1] = {self.fuel_flow.label: value, self.electrical_flow.label: 1} - @property - def eta_th(self) -> Numeric_TPS: - warnings.warn( - 'The "eta_th" property is deprecated. Use "thermal_efficiency" instead. ' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - return self.thermal_efficiency - - @eta_th.setter - def eta_th(self, value: Numeric_TPS) -> None: - warnings.warn( - 'The "eta_th" property is deprecated. Use "thermal_efficiency" instead. ' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - self.thermal_efficiency = value - - @property - def eta_el(self) -> Numeric_TPS: - warnings.warn( - 'The "eta_el" property is deprecated. Use "electrical_efficiency" instead. ' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - return self.electrical_efficiency - - @eta_el.setter - def eta_el(self, value: Numeric_TPS) -> None: - warnings.warn( - 'The "eta_el" property is deprecated. Use "electrical_efficiency" instead. ' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - self.electrical_efficiency = value - - @property - def Q_fu(self) -> Flow: # noqa: N802 - warnings.warn( - 'The "Q_fu" property is deprecated. Use "fuel_flow" instead. ' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - return self.fuel_flow - - @Q_fu.setter - def Q_fu(self, value: Flow) -> None: # noqa: N802 - warnings.warn( - 'The "Q_fu" property is deprecated. Use "fuel_flow" instead. ' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - self.fuel_flow = value - - @property - def P_el(self) -> Flow: # noqa: N802 - warnings.warn( - 'The "P_el" property is deprecated. Use "electrical_flow" instead. ' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - return self.electrical_flow - - @P_el.setter - def P_el(self, value: Flow) -> None: # noqa: N802 - warnings.warn( - 'The "P_el" property is deprecated. Use "electrical_flow" instead. ' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - self.electrical_flow = value - - @property - def Q_th(self) -> Flow: # noqa: N802 - warnings.warn( - 'The "Q_th" property is deprecated. Use "thermal_flow" instead. 
' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - return self.thermal_flow - - @Q_th.setter - def Q_th(self, value: Flow) -> None: # noqa: N802 - warnings.warn( - 'The "Q_th" property is deprecated. Use "thermal_flow" instead. ' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - self.thermal_flow = value - @register_class_for_io class HeatPumpWithSource(LinearConverter): @@ -924,10 +549,6 @@ class HeatPumpWithSource(LinearConverter): on_off_parameters: Parameters defining binary operation constraints and costs. meta_data: Used to store additional information. Not used internally but saved in results. Only use Python native types. - COP: *Deprecated*. Use `cop` instead. - P_el: *Deprecated*. Use `electrical_flow` instead. - Q_ab: *Deprecated*. Use `heat_source_flow` instead. - Q_th: *Deprecated*. Use `thermal_flow` instead. Examples: Ground-source heat pump with explicit ground coupling: @@ -981,15 +602,7 @@ def __init__( thermal_flow: Flow | None = None, on_off_parameters: OnOffParameters | None = None, meta_data: dict | None = None, - **kwargs, ): - # Handle deprecated parameters - electrical_flow = self._handle_deprecated_kwarg(kwargs, 'P_el', 'electrical_flow', electrical_flow) - heat_source_flow = self._handle_deprecated_kwarg(kwargs, 'Q_ab', 'heat_source_flow', heat_source_flow) - thermal_flow = self._handle_deprecated_kwarg(kwargs, 'Q_th', 'thermal_flow', thermal_flow) - cop = self._handle_deprecated_kwarg(kwargs, 'COP', 'cop', cop) - self._validate_kwargs(kwargs) - # Validate required parameters if electrical_flow is None: raise ValueError(f"'{label}': electrical_flow is required and cannot be None") @@ -1026,84 +639,6 @@ def cop(self, value): {self.heat_source_flow.label: value / (value - 1), self.thermal_flow.label: 1}, ] - @property - def COP(self) -> Numeric_TPS: # noqa: N802 - warnings.warn( - f'The "COP" property is deprecated. Use "cop" instead. Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - return self.cop - - @COP.setter - def COP(self, value: Numeric_TPS) -> None: # noqa: N802 - warnings.warn( - f'The "COP" property is deprecated. Use "cop" instead. Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - self.cop = value - - @property - def P_el(self) -> Flow: # noqa: N802 - warnings.warn( - 'The "P_el" property is deprecated. Use "electrical_flow" instead. ' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - return self.electrical_flow - - @P_el.setter - def P_el(self, value: Flow) -> None: # noqa: N802 - warnings.warn( - 'The "P_el" property is deprecated. Use "electrical_flow" instead. ' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - self.electrical_flow = value - - @property - def Q_ab(self) -> Flow: # noqa: N802 - warnings.warn( - 'The "Q_ab" property is deprecated. Use "heat_source_flow" instead. ' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - return self.heat_source_flow - - @Q_ab.setter - def Q_ab(self, value: Flow) -> None: # noqa: N802 - warnings.warn( - 'The "Q_ab" property is deprecated. Use "heat_source_flow" instead. 
' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - self.heat_source_flow = value - - @property - def Q_th(self) -> Flow: # noqa: N802 - warnings.warn( - 'The "Q_th" property is deprecated. Use "thermal_flow" instead. ' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - return self.thermal_flow - - @Q_th.setter - def Q_th(self, value: Flow) -> None: # noqa: N802 - warnings.warn( - 'The "Q_th" property is deprecated. Use "thermal_flow" instead. ' - f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}.', - DeprecationWarning, - stacklevel=2, - ) - self.thermal_flow = value - def check_bounds( value: Numeric_TPS, diff --git a/flixopt/optimization.py b/flixopt/optimization.py new file mode 100644 index 000000000..566f775f4 --- /dev/null +++ b/flixopt/optimization.py @@ -0,0 +1,744 @@ +""" +This module contains the Optimization functionality for the flixopt framework. +It is used to optimize a FlowSystemModel for a given FlowSystem through a solver. +There are three different Optimization types: + 1. Optimization: Optimizes the FlowSystemModel for the full FlowSystem + 2. ClusteredOptimization: Optimizes the FlowSystemModel for the full FlowSystem, but clusters the TimeSeriesData. + This simplifies the mathematical model and usually speeds up the solving process. + 3. SegmentedOptimization: Solves a FlowSystemModel for each individual Segment of the FlowSystem. +""" + +from __future__ import annotations + +import logging +import math +import pathlib +import sys +import timeit +import warnings +from collections import Counter +from typing import TYPE_CHECKING, Annotated, Any + +import numpy as np +from tqdm import tqdm + +from . import io as fx_io +from .clustering import Clustering, ClusteringModel, ClusteringParameters +from .components import Storage +from .config import CONFIG +from .core import DEPRECATION_REMOVAL_VERSION, DataConverter, TimeSeriesData, drop_constant_arrays +from .features import InvestmentModel +from .flow_system import FlowSystem +from .results import Results, SegmentedResults + +if TYPE_CHECKING: + import pandas as pd + import xarray as xr + + from .elements import Component + from .solvers import _Solver + from .structure import FlowSystemModel + +logger = logging.getLogger('flixopt') + + +class _Optimization: + """ + Base class for optimization implementations. + + This is an internal base class that provides common functionality for all optimization types. + Users should use Optimization, ClusteredOptimization, or SegmentedOptimization instead. + """ + + model: FlowSystemModel | None + + def __init__( + self, + name: str, + flow_system: FlowSystem, + active_timesteps: Annotated[ + pd.DatetimeIndex | None, + 'DEPRECATED: Use flow_system.sel(time=...) or flow_system.isel(time=...) instead', + ] = None, + folder: pathlib.Path | None = None, + normalize_weights: bool = True, + ): + self.name = name + if flow_system.used_in_calculation: + logger.warning( + f'This FlowSystem is already used in an optimization:\n{flow_system}\n' + f'Creating a copy of the FlowSystem for Optimization "{self.name}".' + ) + flow_system = flow_system.copy() + + if active_timesteps is not None: + warnings.warn( + f"The 'active_timesteps' parameter is deprecated and will be removed in v{DEPRECATION_REMOVAL_VERSION}. 
" + 'Use flow_system.sel(time=timesteps) or flow_system.isel(time=indices) before passing ' + 'the FlowSystem to the Optimization instead.', + DeprecationWarning, + stacklevel=2, + ) + flow_system = flow_system.sel(time=active_timesteps) + self._active_timesteps = active_timesteps # deprecated + self.normalize_weights = normalize_weights + + flow_system._used_in_calculation = True + + self.flow_system = flow_system + self.model = None + + self.durations = {'modeling': 0.0, 'solving': 0.0, 'saving': 0.0} + self.folder = pathlib.Path.cwd() / 'results' if folder is None else pathlib.Path(folder) + self.results: Results | None = None + + if self.folder.exists() and not self.folder.is_dir(): + raise NotADirectoryError(f'Path {self.folder} exists and is not a directory.') + self.folder.mkdir(parents=False, exist_ok=True) + + @property + def main_results(self) -> dict[str, int | float | dict]: + main_results = { + 'Objective': self.model.objective.value, + 'Penalty': self.model.effects.penalty.total.solution.values, + 'Effects': { + f'{effect.label} [{effect.unit}]': { + 'temporal': effect.submodel.temporal.total.solution.values, + 'periodic': effect.submodel.periodic.total.solution.values, + 'total': effect.submodel.total.solution.values, + } + for effect in sorted(self.flow_system.effects.values(), key=lambda e: e.label_full.upper()) + }, + 'Invest-Decisions': { + 'Invested': { + model.label_of_element: model.size.solution + for component in self.flow_system.components.values() + for model in component.submodel.all_submodels + if isinstance(model, InvestmentModel) and model.size.solution.max() >= CONFIG.Modeling.epsilon + }, + 'Not invested': { + model.label_of_element: model.size.solution + for component in self.flow_system.components.values() + for model in component.submodel.all_submodels + if isinstance(model, InvestmentModel) and model.size.solution.max() < CONFIG.Modeling.epsilon + }, + }, + 'Buses with excess': [ + { + bus.label_full: { + 'input': bus.submodel.excess_input.solution.sum('time'), + 'output': bus.submodel.excess_output.solution.sum('time'), + } + } + for bus in self.flow_system.buses.values() + if bus.with_excess + and ( + bus.submodel.excess_input.solution.sum() > 1e-3 or bus.submodel.excess_output.solution.sum() > 1e-3 + ) + ], + } + + return fx_io.round_nested_floats(main_results) + + @property + def summary(self): + return { + 'Name': self.name, + 'Number of timesteps': len(self.flow_system.timesteps), + 'Calculation Type': self.__class__.__name__, + 'Constraints': self.model.constraints.ncons, + 'Variables': self.model.variables.nvars, + 'Main Results': self.main_results, + 'Durations': self.durations, + 'Config': CONFIG.to_dict(), + } + + @property + def active_timesteps(self) -> pd.DatetimeIndex: + warnings.warn( + 'active_timesteps is deprecated. Use flow_system.sel(time=...) or flow_system.isel(time=...) instead.', + DeprecationWarning, + stacklevel=2, + ) + return self._active_timesteps + + @property + def modeled(self) -> bool: + return True if self.model is not None else False + + +class Optimization(_Optimization): + """ + Standard optimization that solves the complete problem using all time steps. + + This is the default optimization approach that considers every time step, + providing the most accurate but computationally intensive solution. + + For large problems, consider using ClusteredOptimization (time aggregation) + or SegmentedOptimization (temporal decomposition) instead. 
+
+    Args:
+        name: Name of the optimization.
+        flow_system: The FlowSystem which should be optimized.
+        folder: Folder where results should be saved. If None, the current working directory is used.
+        normalize_weights: Whether to automatically normalize the weights of scenarios to sum up to 1 when solving.
+        active_timesteps: Deprecated. Use FlowSystem.sel(time=...) or FlowSystem.isel(time=...) instead.
+
+    Examples:
+        Basic usage:
+
+        ```python
+        from flixopt import Optimization
+
+        opt = Optimization(name='my_optimization', flow_system=energy_system, folder=Path('results'))
+        opt.do_modeling()
+        opt.solve(solver=gurobi)
+        results = opt.results
+        ```
+    """
+
+    def do_modeling(self) -> Optimization:
+        t_start = timeit.default_timer()
+        self.flow_system.connect_and_transform()
+
+        self.model = self.flow_system.create_model(self.normalize_weights)
+        self.model.do_modeling()
+
+        self.durations['modeling'] = round(timeit.default_timer() - t_start, 2)
+        return self
+
+    def fix_sizes(self, ds: xr.Dataset, decimal_rounding: int | None = 5) -> Optimization:
+        """Fix the size variables of the model to specified values.
+
+        Args:
+            ds: Dataset that maps variable names to the sizes they should be fixed to.
+            decimal_rounding: Number of decimal places to round the sizes to. If None, no rounding is
+                applied; numerical errors might then lead to infeasibility.
+        """
+        if not self.modeled:
+            raise RuntimeError('Model was not created. Call do_modeling() first.')
+        if decimal_rounding is not None:
+            ds = ds.round(decimal_rounding)
+
+        for name, da in ds.data_vars.items():
+            if '|size' not in name:
+                continue
+            if name not in self.model.variables:
+                logger.debug(f'Variable {name} not found in optimization model. Skipping.')
+                continue
+
+            con = self.model.add_constraints(
+                self.model[name] == da,
+                name=f'{name}-fixed',
+            )
+            logger.debug(f'Fixed "{name}":\n{con}')
+
+        return self
+
+    def solve(
+        self, solver: _Solver, log_file: pathlib.Path | None = None, log_main_results: bool | None = None
+    ) -> Optimization:
+        # Auto-call do_modeling() if not already done
+        if not self.modeled:
+            logger.info('Model not yet created. Calling do_modeling() automatically.')
+            self.do_modeling()
+
+        t_start = timeit.default_timer()
+
+        self.model.solve(
+            log_fn=pathlib.Path(log_file) if log_file is not None else self.folder / f'{self.name}.log',
+            solver_name=solver.name,
+            **solver.options,
+        )
+        self.durations['solving'] = round(timeit.default_timer() - t_start, 2)
+        logger.success(f'Model solved with {solver.name} in {self.durations["solving"]:.2f} seconds.')
+        logger.info(f'Model status after solve: {self.model.status}')
+
+        if self.model.status == 'warning':
+            # Save the model and the flow_system to file in case of infeasibility
+            paths = fx_io.CalculationResultsPaths(self.folder, self.name)
+            from .io import document_linopy_model
+
+            document_linopy_model(self.model, paths.model_documentation)
+            self.flow_system.to_netcdf(paths.flow_system)
+            raise RuntimeError(
+                f'Model was infeasible. Please check {paths.model_documentation=} and {paths.flow_system=} for more information.'
+            )
+
+        # Log the formatted output
+        should_log = log_main_results if log_main_results is not None else CONFIG.Solving.log_main_results
+        if should_log and logger.isEnabledFor(logging.INFO):
+            logger.info(
+                f'{" Main Results ":#^80}\n' + fx_io.format_yaml_string(self.main_results, compact_numeric_lists=True)
+            )
+
+        self.results = Results.from_optimization(self)
+
+        return self
+
+
+class ClusteredOptimization(_Optimization):
+    """
+    ClusteredOptimization reduces computational complexity by clustering time series into typical periods.
+
+    This optimization approach clusters time series data using techniques from the tsam library to identify
+    representative time periods, significantly reducing computation time while maintaining solution accuracy.
+
+    Note:
+        The quality of the solution depends on the choice of clustering parameters.
+        The optimal parameters depend on the specific problem and the characteristics of the time series data.
+        For more information, refer to the [tsam documentation](https://tsam.readthedocs.io/en/latest/).
+
+    Args:
+        name: Name of the optimization
+        flow_system: FlowSystem to be optimized
+        clustering_parameters: Parameters for clustering. See ClusteringParameters class documentation
+        components_to_clusterize: List of components to perform clustering on. If None, all components are clustered.
+            This equalizes variables in the components according to the typical periods computed in the clustering
+        active_timesteps: Deprecated. Use flow_system.sel(time=...) or flow_system.isel(time=...) instead
+        folder: Folder where results should be saved. If None, current working directory is used
+
+    Attributes:
+        clustering (Clustering | None): Contains the clustered time series data
+        clustering_model (ClusteringModel | None): Contains Variables and Constraints that equalize clusters of the time series data
+    """
+
+    def __init__(
+        self,
+        name: str,
+        flow_system: FlowSystem,
+        clustering_parameters: ClusteringParameters,
+        components_to_clusterize: list[Component] | None = None,
+        active_timesteps: Annotated[
+            pd.DatetimeIndex | None,
+            'DEPRECATED: Use flow_system.sel(time=...) or flow_system.isel(time=...) instead',
+        ] = None,
+        folder: pathlib.Path | None = None,
+    ):
+        if flow_system.scenarios is not None:
+            raise ValueError('Clustering is not supported for scenarios yet.
Please use Optimization instead.') + super().__init__(name, flow_system, active_timesteps, folder=folder) + self.clustering_parameters = clustering_parameters + self.components_to_clusterize = components_to_clusterize + self.clustering: Clustering | None = None + self.clustering_model: ClusteringModel | None = None + + def do_modeling(self) -> ClusteredOptimization: + t_start = timeit.default_timer() + self.flow_system.connect_and_transform() + self._perform_aggregation() + + # Model the System + self.model = self.flow_system.create_model(self.normalize_weights) + self.model.do_modeling() + # Add Clustering Submodel after modeling the rest + self.clustering_model = ClusteringModel( + self.model, self.clustering_parameters, self.flow_system, self.clustering, self.components_to_clusterize + ) + self.clustering_model.do_modeling() + self.durations['modeling'] = round(timeit.default_timer() - t_start, 2) + return self + + def _perform_aggregation(self): + from .clustering import Clustering + + t_start_agg = timeit.default_timer() + + # Validation + dt_min = float(self.flow_system.hours_per_timestep.min().item()) + dt_max = float(self.flow_system.hours_per_timestep.max().item()) + if not dt_min == dt_max: + raise ValueError( + f'Clustering failed due to inconsistent time step sizes: ' + f'delta_t varies from {dt_min} to {dt_max} hours.' + ) + ratio = self.clustering_parameters.hours_per_period / dt_max + if not np.isclose(ratio, round(ratio), atol=1e-9): + raise ValueError( + f'The selected {self.clustering_parameters.hours_per_period=} does not match the time ' + f'step size of {dt_max} hours. It must be an integer multiple of {dt_max} hours.' + ) + + logger.info(f'{"":#^80}') + logger.info(f'{" Clustering TimeSeries Data ":#^80}') + + ds = self.flow_system.to_dataset() + + temporaly_changing_ds = drop_constant_arrays(ds, dim='time') + + # Clustering - creation of clustered timeseries: + self.clustering = Clustering( + original_data=temporaly_changing_ds.to_dataframe(), + hours_per_time_step=float(dt_min), + hours_per_period=self.clustering_parameters.hours_per_period, + nr_of_periods=self.clustering_parameters.nr_of_periods, + weights=self.calculate_aggregation_weights(temporaly_changing_ds), + time_series_for_high_peaks=self.clustering_parameters.labels_for_high_peaks, + time_series_for_low_peaks=self.clustering_parameters.labels_for_low_peaks, + ) + + self.clustering.cluster() + self.clustering.plot(show=CONFIG.Plotting.default_show, save=self.folder / 'clustering.html') + if self.clustering_parameters.aggregate_data_and_fix_non_binary_vars: + ds = self.flow_system.to_dataset() + for name, series in self.clustering.aggregated_data.items(): + da = ( + DataConverter.to_dataarray(series, self.flow_system.coords) + .rename(name) + .assign_attrs(ds[name].attrs) + ) + if TimeSeriesData.is_timeseries_data(da): + da = TimeSeriesData.from_dataarray(da) + + ds[name] = da + + self.flow_system = FlowSystem.from_dataset(ds) + self.flow_system.connect_and_transform() + self.durations['clustering'] = round(timeit.default_timer() - t_start_agg, 2) + + @classmethod + def calculate_aggregation_weights(cls, ds: xr.Dataset) -> dict[str, float]: + """Calculate weights for all datavars in the dataset. 
Weights are pulled from the attrs of the datavars.""" + + groups = [da.attrs['aggregation_group'] for da in ds.data_vars.values() if 'aggregation_group' in da.attrs] + group_counts = Counter(groups) + + # Calculate weight for each group (1/count) + group_weights = {group: 1 / count for group, count in group_counts.items()} + + weights = {} + for name, da in ds.data_vars.items(): + group_weight = group_weights.get(da.attrs.get('aggregation_group')) + if group_weight is not None: + weights[name] = group_weight + else: + weights[name] = da.attrs.get('aggregation_weight', 1) + + if np.all(np.isclose(list(weights.values()), 1, atol=1e-6)): + logger.info('All Aggregation weights were set to 1') + + return weights + + +class SegmentedOptimization(_Optimization): + """Solve large optimization problems by dividing time horizon into (overlapping) segments. + + This class addresses memory and computational limitations of large-scale optimization + problems by decomposing the time horizon into smaller overlapping segments that are + solved sequentially. Each segment uses final values from the previous segment as + initial conditions, ensuring dynamic continuity across the solution. + + Key Concepts: + **Temporal Decomposition**: Divides long time horizons into manageable segments + **Overlapping Windows**: Segments share timesteps to improve storage dynamics + **Value Transfer**: Final states of one segment become initial states of the next + **Sequential Solving**: Each segment solved independently but with coupling + + Limitations and Constraints: + **Investment Parameters**: InvestParameters are not supported in segmented calculations + as investment decisions must be made for the entire time horizon, not per segment. + + **Global Constraints**: Time-horizon-wide constraints (flow_hours_total_min/max, + load_factor_min/max) may produce suboptimal results as they cannot be enforced + globally across segments. + + **Storage Dynamics**: While overlap helps, storage optimization may be suboptimal + compared to full-horizon solutions due to limited foresight in each segment. + + Args: + name: Unique identifier for the calculation, used in result files and logging. + flow_system: The FlowSystem to optimize, containing all components, flows, and buses. + timesteps_per_segment: Number of timesteps in each segment (excluding overlap). + Must be > 2 to avoid internal side effects. Larger values provide better + optimization at the cost of memory and computation time. + overlap_timesteps: Number of additional timesteps added to each segment. + Improves storage optimization by providing lookahead. Higher values + improve solution quality but increase computational cost. + nr_of_previous_values: Number of previous timestep values to transfer between + segments for initialization. Typically 1 is sufficient. + folder: Directory for saving results. Defaults to current working directory + 'results'. 
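+
+    The number of segments and the timesteps each one covers follow directly from
+    `timesteps_per_segment` and `overlap_timesteps`. The sketch below mirrors the
+    internal slicing logic (illustration only, not part of the API):
+
+    ```python
+    import math
+
+    n_timesteps = 8760  # total horizon, e.g. one year of hourly data
+    per_segment = 730   # timesteps_per_segment
+    overlap = 48        # overlap_timesteps
+
+    for i in range(math.ceil(n_timesteps / per_segment)):
+        start = i * per_segment
+        end = min(start + per_segment + overlap, n_timesteps)
+        # consecutive segments share `overlap` timesteps
+        print(f'Segment {i + 1}: timesteps {start} .. {end - 1}')
+    ```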
+ + Examples: + Annual optimization with monthly segments: + + ```python + # 8760 hours annual data with monthly segments (730 hours) and 48-hour overlap + segmented_calc = SegmentedOptimization( + name='annual_energy_system', + flow_system=energy_system, + timesteps_per_segment=730, # ~1 month + overlap_timesteps=48, # 2 days overlap + folder=Path('results/segmented'), + ) + segmented_calc.do_modeling_and_solve(solver='gurobi') + ``` + + Weekly optimization with daily overlap: + + ```python + # Weekly segments for detailed operational planning + weekly_calc = SegmentedOptimization( + name='weekly_operations', + flow_system=industrial_system, + timesteps_per_segment=168, # 1 week (hourly data) + overlap_timesteps=24, # 1 day overlap + nr_of_previous_values=1, + ) + ``` + + Large-scale system with minimal overlap: + + ```python + # Large system with minimal overlap for computational efficiency + large_calc = SegmentedOptimization( + name='large_scale_grid', + flow_system=grid_system, + timesteps_per_segment=100, # Shorter segments + overlap_timesteps=5, # Minimal overlap + ) + ``` + + Design Considerations: + **Segment Size**: Balance between solution quality and computational efficiency. + Larger segments provide better optimization but require more memory and time. + + **Overlap Duration**: More overlap improves storage dynamics and reduces + end-effects but increases computational cost. Typically 5-10% of segment length. + + **Storage Systems**: Systems with large storage components benefit from longer + overlaps to capture charge/discharge cycles effectively. + + **Investment Decisions**: Use Optimization for problems requiring investment + optimization, as SegmentedOptimization cannot handle investment parameters. + + Common Use Cases: + - **Annual Planning**: Long-term planning with seasonal variations + - **Large Networks**: Spatially or temporally large energy systems + - **Memory-Limited Systems**: When full optimization exceeds available memory + - **Operational Planning**: Detailed short-term optimization with limited foresight + - **Sensitivity Analysis**: Quick approximate solutions for parameter studies + + Performance Tips: + - Start with Optimization and use this class if memory issues occur + - Use longer overlaps for systems with significant storage + - Monitor solution quality at segment boundaries for discontinuities + + Warning: + The evaluation of the solution is a bit more complex than Optimization or ClusteredOptimization + due to the overlapping individual solutions. 
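+
+    After solving, the stitched results are available on the instance (a sketch;
+    `highs` stands for any configured solver object, `energy_system` for a FlowSystem):
+
+    ```python
+    seg = SegmentedOptimization(
+        name='annual_energy_system',
+        flow_system=energy_system,
+        timesteps_per_segment=730,
+        overlap_timesteps=48,
+    )
+    seg.do_modeling_and_solve(solver=highs)
+
+    results = seg.results  # SegmentedResults combining the overlapping segment solutions
+    handovers = seg.start_values_of_segments  # start values passed from one segment to the next
+    ```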
+
+    """
+
+    def __init__(
+        self,
+        name: str,
+        flow_system: FlowSystem,
+        timesteps_per_segment: int,
+        overlap_timesteps: int,
+        nr_of_previous_values: int = 1,
+        folder: pathlib.Path | None = None,
+    ):
+        super().__init__(name, flow_system, folder=folder)
+        self.timesteps_per_segment = timesteps_per_segment
+        self.overlap_timesteps = overlap_timesteps
+        self.nr_of_previous_values = nr_of_previous_values
+        self.sub_calculations: list[Optimization] = []
+
+        self.segment_names = [
+            f'Segment_{i + 1}' for i in range(math.ceil(len(self.all_timesteps) / self.timesteps_per_segment))
+        ]
+        self._timesteps_per_segment = self._calculate_timesteps_per_segment()
+
+        assert timesteps_per_segment > 2, 'The segment length must be greater than 2, due to unwanted internal side effects'
+        assert self.timesteps_per_segment_with_overlap <= len(self.all_timesteps), (
+            f'{self.timesteps_per_segment_with_overlap=} cannot be greater than the total length {len(self.all_timesteps)}'
+        )
+
+        self.flow_system._connect_network()  # Connect network to ensure that all Flows know their Component
+        # Storing all original start values
+        self._original_start_values = {
+            **{flow.label_full: flow.previous_flow_rate for flow in self.flow_system.flows.values()},
+            **{
+                comp.label_full: comp.initial_charge_state
+                for comp in self.flow_system.components.values()
+                if isinstance(comp, Storage)
+            },
+        }
+        self._transfered_start_values: list[dict[str, Any]] = []
+
+    def _create_sub_calculations(self):
+        for i, (segment_name, timesteps_of_segment) in enumerate(
+            zip(self.segment_names, self._timesteps_per_segment, strict=True)
+        ):
+            calc = Optimization(f'{self.name}-{segment_name}', self.flow_system.sel(time=timesteps_of_segment))
+            calc.flow_system._connect_network()  # Connect to ensure correct names of Flows
+
+            self.sub_calculations.append(calc)
+            logger.info(
+                f'{segment_name} [{i + 1:>2}/{len(self.segment_names):<2}] '
+                f'({timesteps_of_segment[0]} -> {timesteps_of_segment[-1]}):'
+            )
+
+    def _solve_single_segment(
+        self,
+        i: int,
+        calculation: Optimization,
+        solver: _Solver,
+        log_file: pathlib.Path | None,
+        log_main_results: bool,
+        suppress_output: bool,
+    ) -> None:
+        """Solve a single segment optimization."""
+        if i > 0 and self.nr_of_previous_values > 0:
+            self._transfer_start_values(i)
+
+        calculation.do_modeling()
+
+        # Warn about Investments, but only in first run
+        if i == 0:
+            invest_elements = [
+                model.label_full
+                for component in calculation.flow_system.components.values()
+                for model in component.submodel.all_submodels
+                if isinstance(model, InvestmentModel)
+            ]
+            if invest_elements:
+                logger.critical(
+                    f'Investments are not supported in SegmentedOptimization! '
+                    f'The following InvestmentModels were found: {invest_elements}'
+                )
+
+        log_path = pathlib.Path(log_file) if log_file is not None else self.folder / f'{self.name}.log'
+
+        if suppress_output:
+            with fx_io.suppress_output():
+                calculation.solve(solver, log_file=log_path, log_main_results=log_main_results)
+        else:
+            calculation.solve(solver, log_file=log_path, log_main_results=log_main_results)
+
+    def do_modeling_and_solve(
+        self,
+        solver: _Solver,
+        log_file: pathlib.Path | None = None,
+        log_main_results: bool = False,
+        show_individual_solves: bool = False,
+    ) -> SegmentedOptimization:
+        """Model and solve all segments of the segmented optimization.
+
+        This method creates sub-calculations for each time segment, then iteratively
+        models and solves each segment.
It supports two output modes: a progress bar + for compact output, or detailed individual solve information. + + Args: + solver: The solver instance to use for optimization (e.g., Gurobi, HiGHS). + log_file: Optional path to the solver log file. If None, defaults to + folder/name.log. + log_main_results: Whether to log main results (objective, effects, etc.) + after each segment solve. Defaults to False. + show_individual_solves: If True, shows detailed output for each segment + solve with logger messages. If False (default), shows a compact progress + bar with suppressed solver output for cleaner display. + + Returns: + Self, for method chaining. + + Note: + The method automatically transfers all start values between segments to ensure + continuity of storage states and flow rates across segment boundaries. + """ + logger.info(f'{"":#^80}') + logger.info(f'{" Segmented Solving ":#^80}') + self._create_sub_calculations() + + if show_individual_solves: + # Path 1: Show individual solves with detailed output + for i, calculation in enumerate(self.sub_calculations): + logger.info( + f'Solving segment {i + 1}/{len(self.sub_calculations)}: ' + f'{calculation.flow_system.timesteps[0]} -> {calculation.flow_system.timesteps[-1]}' + ) + self._solve_single_segment(i, calculation, solver, log_file, log_main_results, suppress_output=False) + else: + # Path 2: Show only progress bar with suppressed output + progress_bar = tqdm( + enumerate(self.sub_calculations), + total=len(self.sub_calculations), + desc='Solving segments', + unit='segment', + file=sys.stdout, + disable=not CONFIG.Solving.log_to_console, + ) + + try: + for i, calculation in progress_bar: + progress_bar.set_description( + f'Solving ({calculation.flow_system.timesteps[0]} -> {calculation.flow_system.timesteps[-1]})' + ) + self._solve_single_segment(i, calculation, solver, log_file, log_main_results, suppress_output=True) + finally: + progress_bar.close() + + for calc in self.sub_calculations: + for key, value in calc.durations.items(): + self.durations[key] += value + + logger.success(f'Model solved with {solver.name} in {self.durations["solving"]:.2f} seconds.') + + self.results = SegmentedResults.from_optimization(self) + + return self + + def _transfer_start_values(self, i: int): + """ + This function gets the last values of the previous solved segment and + inserts them as start values for the next segment + """ + timesteps_of_prior_segment = self.sub_calculations[i - 1].flow_system.timesteps_extra + + start = self.sub_calculations[i].flow_system.timesteps[0] + start_previous_values = timesteps_of_prior_segment[self.timesteps_per_segment - self.nr_of_previous_values] + end_previous_values = timesteps_of_prior_segment[self.timesteps_per_segment - 1] + + logger.debug( + f'Start of next segment: {start}. 
+        """
+        logger.info(f'{"":#^80}')
+        logger.info(f'{" Segmented Solving ":#^80}')
+        self._create_sub_calculations()
+
+        if show_individual_solves:
+            # Path 1: Show individual solves with detailed output
+            for i, calculation in enumerate(self.sub_calculations):
+                logger.info(
+                    f'Solving segment {i + 1}/{len(self.sub_calculations)}: '
+                    f'{calculation.flow_system.timesteps[0]} -> {calculation.flow_system.timesteps[-1]}'
+                )
+                self._solve_single_segment(i, calculation, solver, log_file, log_main_results, suppress_output=False)
+        else:
+            # Path 2: Show only a progress bar with suppressed output
+            progress_bar = tqdm(
+                enumerate(self.sub_calculations),
+                total=len(self.sub_calculations),
+                desc='Solving segments',
+                unit='segment',
+                file=sys.stdout,
+                disable=not CONFIG.Solving.log_to_console,
+            )
+
+            try:
+                for i, calculation in progress_bar:
+                    progress_bar.set_description(
+                        f'Solving ({calculation.flow_system.timesteps[0]} -> {calculation.flow_system.timesteps[-1]})'
+                    )
+                    self._solve_single_segment(i, calculation, solver, log_file, log_main_results, suppress_output=True)
+            finally:
+                progress_bar.close()
+
+        for calc in self.sub_calculations:
+            for key, value in calc.durations.items():
+                self.durations[key] += value
+
+        logger.success(f'Model solved with {solver.name} in {self.durations["solving"]:.2f} seconds.')
+
+        self.results = SegmentedResults.from_optimization(self)
+
+        return self
+
+    def _transfer_start_values(self, i: int):
+        """
+        Take the last values of the previously solved segment and insert them
+        as start values for the next segment.
+        """
+        timesteps_of_prior_segment = self.sub_calculations[i - 1].flow_system.timesteps_extra
+
+        start = self.sub_calculations[i].flow_system.timesteps[0]
+        start_previous_values = timesteps_of_prior_segment[self.timesteps_per_segment - self.nr_of_previous_values]
+        end_previous_values = timesteps_of_prior_segment[self.timesteps_per_segment - 1]
+
+        logger.debug(
+            f'Start of next segment: {start}. '
+            f'Timesteps of previous values: {start_previous_values} -> {end_previous_values}'
+        )
+        current_flow_system = self.sub_calculations[i - 1].flow_system
+        next_flow_system = self.sub_calculations[i].flow_system
+
+        start_values_of_this_segment = {}
+
+        for current_flow in current_flow_system.flows.values():
+            next_flow = next_flow_system.flows[current_flow.label_full]
+            next_flow.previous_flow_rate = current_flow.submodel.flow_rate.solution.sel(
+                time=slice(start_previous_values, end_previous_values)
+            ).values
+            start_values_of_this_segment[current_flow.label_full] = next_flow.previous_flow_rate
+
+        for current_comp in current_flow_system.components.values():
+            next_comp = next_flow_system.components[current_comp.label_full]
+            if isinstance(next_comp, Storage):
+                next_comp.initial_charge_state = current_comp.submodel.charge_state.solution.sel(time=start).item()
+                start_values_of_this_segment[current_comp.label_full] = next_comp.initial_charge_state
+
+        self._transferred_start_values.append(start_values_of_this_segment)
+
+    def _calculate_timesteps_per_segment(self) -> list[pd.DatetimeIndex]:
+        timesteps_per_segment = []
+        for i, _ in enumerate(self.segment_names):
+            start = self.timesteps_per_segment * i
+            end = min(start + self.timesteps_per_segment_with_overlap, len(self.all_timesteps))
+            timesteps_per_segment.append(self.all_timesteps[start:end])
+        return timesteps_per_segment
+
+    @property
+    def timesteps_per_segment_with_overlap(self):
+        return self.timesteps_per_segment + self.overlap_timesteps
+
+    @property
+    def start_values_of_segments(self) -> list[dict[str, Any]]:
+        """Overview of the start values of all segments."""
+        return [dict(self._original_start_values), *self._transferred_start_values]
+
+    @property
+    def all_timesteps(self) -> pd.DatetimeIndex:
+        return self.flow_system.timesteps
diff --git a/flixopt/plotting.py b/flixopt/plotting.py
index 94959ecb5..0a8dfbc9b 100644
--- a/flixopt/plotting.py
+++ b/flixopt/plotting.py
@@ -25,9 +25,7 @@
 from __future__ import annotations
 
-import itertools
 import logging
-import os
 import pathlib
 from typing import TYPE_CHECKING, Any, Literal
diff --git a/flixopt/results.py b/flixopt/results.py
index ccc36952f..7c595aa48 100644
--- a/flixopt/results.py
+++ b/flixopt/results.py
@@ -16,15 +16,15 @@
 from . import plotting
 from .color_processing import process_colors
 from .config import CONFIG
+from .core import DEPRECATION_REMOVAL_VERSION
 from .flow_system import FlowSystem
-from .structure import CompositeContainerMixin, ElementContainer, ResultsContainer
+from .structure import CompositeContainerMixin, ResultsContainer
 
 if TYPE_CHECKING:
     import matplotlib.pyplot as plt
     import plotly
     import pyvis
 
-    from .calculation import Calculation, SegmentedCalculation
     from .core import FlowSystemDimensions
 
 logger = logging.getLogger('flixopt')
@@ -53,8 +53,8 @@
 class _FlowSystemRestorationError(Exception):
     pass
 
-class CalculationResults(CompositeContainerMixin['ComponentResults | BusResults | EffectResults | FlowResults']):
-    """Comprehensive container for optimization calculation results and analysis tools.
+class Results(CompositeContainerMixin['ComponentResults | BusResults | EffectResults | FlowResults']):
+    """Comprehensive container for optimization results and analysis tools.
 
     This class provides unified access to all optimization results including flow rates,
     component states, bus balances, and system effects. It offers powerful analysis
@@ -93,7 +93,7 @@ class CalculationResults(CompositeContainerMixin['ComponentResults | BusResults
 
         ```python
         # Load results from file
-        results = CalculationResults.from_file('results', 'annual_optimization')
+        results = Results.from_file('results', 'annual_optimization')
 
         # Access specific component results
         boiler_results = results['Boiler_01']
@@ -150,15 +150,15 @@ class CalculationResults(CompositeContainerMixin['ComponentResults | BusResults
     model: linopy.Model | None
 
     @classmethod
-    def from_file(cls, folder: str | pathlib.Path, name: str) -> CalculationResults:
-        """Load CalculationResults from saved files.
+    def from_file(cls, folder: str | pathlib.Path, name: str) -> Results:
+        """Load Results from saved files.
 
         Args:
             folder: Directory containing saved files.
             name: Base name of saved files (without extensions).
 
         Returns:
-            CalculationResults: Loaded instance.
+            Results: Loaded instance.
         """
         folder = pathlib.Path(folder)
         paths = fx_io.CalculationResultsPaths(folder, name)
@@ -183,22 +183,22 @@ def from_file(cls, folder: str | pathlib.Path, name: str) -> CalculationResults:
         )
 
     @classmethod
-    def from_calculation(cls, calculation: Calculation) -> CalculationResults:
-        """Create CalculationResults from a Calculation object.
+    def from_optimization(cls, optimization) -> Results:
+        """Create Results from an Optimization instance.
 
         Args:
-            calculation: Calculation object with solved model.
+            optimization: The Optimization instance to extract results from.
 
         Returns:
-            CalculationResults: New instance with extracted results.
+            Results: New instance containing the optimization results.
         """
         return cls(
-            solution=calculation.model.solution,
-            flow_system_data=calculation.flow_system.to_dataset(),
-            summary=calculation.summary,
-            model=calculation.model,
-            name=calculation.name,
-            folder=calculation.folder,
+            solution=optimization.model.solution,
+            flow_system_data=optimization.flow_system.to_dataset(),
+            summary=optimization.summary,
+            model=optimization.model,
+            name=optimization.name,
+            folder=optimization.folder,
         )
 
     def __init__(
@@ -211,8 +211,9 @@ def __init__(
         model: linopy.Model | None = None,
        **kwargs,  # To accept old "flow_system" parameter
     ):
-        """Initialize CalculationResults with optimization data.
-        Usually, this class is instantiated by the Calculation class, or by loading from file.
+        """Initialize Results with optimization data.
+
+        Usually, this class is instantiated by an Optimization object via `Results.from_optimization()`
+        or by loading from file using `Results.from_file()`.
 
         Args:
             solution: Optimization solution dataset.
@@ -223,6 +224,9 @@ def __init__(
             model: Linopy optimization model.
         Deprecated:
             flow_system: Use flow_system_data instead.
+
+        Note:
+            The legacy alias `CalculationResults` is deprecated. Use `Results` instead.
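+
+        Example:
+            A minimal sketch of the preferred entry points (names are illustrative):
+
+            ```python
+            results = Results.from_file('results', 'annual_optimization')
+            # or, with a solved Optimization instance `opt`:
+            results = Results.from_optimization(opt)
+            ```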
         """
         # Handle potential old "flow_system" parameter for backward compatibility
         if 'flow_system' in kwargs and flow_system_data is None:
@@ -1084,23 +1088,54 @@ def to_file(
 
         if save_linopy_model:
             if self.model is None:
-                logger.critical('No model in the CalculationResults. Saving the model is not possible.')
+                logger.critical('No model in the Results. Saving the model is not possible.')
             else:
                 self.model.to_netcdf(paths.linopy_model, engine='netcdf4')
 
         if document_model:
             if self.model is None:
-                logger.critical('No model in the CalculationResults. Documenting the model is not possible.')
+                logger.critical('No model in the Results. Documenting the model is not possible.')
             else:
                 fx_io.document_linopy_model(self.model, path=paths.model_documentation)
 
         logger.success(f'Saved calculation results "{name}" to {paths.model_documentation.parent}')
 
 
+class CalculationResults(Results):
+    """DEPRECATED: Use Results instead.
+
+    Backwards-compatible alias for the Results class.
+    All functionality is inherited from Results.
+    """
+
+    def __init__(self, *args, **kwargs):
+        # Only warn when instantiating CalculationResults directly (not subclasses)
+        if self.__class__.__name__ == 'CalculationResults':
+            warnings.warn(
+                f'CalculationResults is deprecated and will be removed in v{DEPRECATION_REMOVAL_VERSION}. Use Results instead.',
+                DeprecationWarning,
+                stacklevel=2,
+            )
+        super().__init__(*args, **kwargs)
+
+    @classmethod
+    def from_calculation(cls, calculation):
+        """Create CalculationResults from a Calculation object.
+
+        DEPRECATED: Use Results.from_optimization() instead.
+        Backwards-compatible method that redirects to from_optimization().
+
+        Args:
+            calculation: Calculation object with solved model.
+
+        Returns:
+            CalculationResults: New instance with extracted results.
+        """
+        return cls.from_optimization(calculation)
+
+
 class _ElementResults:
-    def __init__(
-        self, calculation_results: CalculationResults, label: str, variables: list[str], constraints: list[str]
-    ):
+    def __init__(self, calculation_results: Results, label: str, variables: list[str], constraints: list[str]):
         self._calculation_results = calculation_results
         self.label = label
         self._variable_names = variables
@@ -1183,7 +1218,7 @@ def filter_solution(
 class _NodeResults(_ElementResults):
     def __init__(
         self,
-        calculation_results: CalculationResults,
+        calculation_results: Results,
         label: str,
         variables: list[str],
         constraints: list[str],
@@ -1925,7 +1960,7 @@ def get_shares_from(self, element: str) -> xr.Dataset:
 class FlowResults(_ElementResults):
     def __init__(
         self,
-        calculation_results: CalculationResults,
+        calculation_results: Results,
         label: str,
         variables: list[str],
         constraints: list[str],
@@ -1958,7 +1993,7 @@ def size(self) -> xr.DataArray:
         return xr.DataArray(np.nan).rename(name)
 
 
-class SegmentedCalculationResults:
+class SegmentedResults:
     """Results container for segmented optimization calculations with temporal decomposition.
 
     This class manages results from SegmentedCalculation runs where large optimization
@@ -1985,7 +2020,7 @@ class SegmentedCalculationResults:
 
         ```python
         # Load segmented calculation results
-        results = SegmentedCalculationResults.from_file('results', 'annual_segmented')
+        results = SegmentedResults.from_file('results', 'annual_segmented')
 
         # Access unified results across all segments
         full_timeline = results.all_timesteps
@@ -2014,7 +2049,7 @@ class SegmentedCalculationResults:
         segmented_calc.do_modeling_and_solve(solver='gurobi')
 
         # Extract unified results
-        results = SegmentedCalculationResults.from_calculation(segmented_calc)
+        results = SegmentedResults.from_optimization(segmented_calc)
 
         # Save combined results
         results.to_file(compression=5)
@@ -2055,33 +2090,41 @@ class SegmentedCalculationResults:
     """
 
     @classmethod
-    def from_calculation(cls, calculation: SegmentedCalculation):
+    def from_optimization(cls, optimization):
+        """Create SegmentedResults from a SegmentedOptimization instance.
+
+        Args:
+            optimization: The SegmentedOptimization instance to extract results from.
+
+        Returns:
+            SegmentedResults: New instance containing the optimization results.
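+
+        Example:
+            A minimal sketch (assumes `seg_opt` is a solved SegmentedOptimization):
+
+            ```python
+            results = SegmentedResults.from_optimization(seg_opt)
+            results.to_file()
+            ```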
+ """ return cls( - [calc.results for calc in calculation.sub_calculations], - all_timesteps=calculation.all_timesteps, - timesteps_per_segment=calculation.timesteps_per_segment, - overlap_timesteps=calculation.overlap_timesteps, - name=calculation.name, - folder=calculation.folder, + [calc.results for calc in optimization.sub_calculations], + all_timesteps=optimization.all_timesteps, + timesteps_per_segment=optimization.timesteps_per_segment, + overlap_timesteps=optimization.overlap_timesteps, + name=optimization.name, + folder=optimization.folder, ) @classmethod - def from_file(cls, folder: str | pathlib.Path, name: str) -> SegmentedCalculationResults: - """Load SegmentedCalculationResults from saved files. + def from_file(cls, folder: str | pathlib.Path, name: str) -> SegmentedResults: + """Load SegmentedResults from saved files. Args: folder: Directory containing saved files. name: Base name of saved files. Returns: - SegmentedCalculationResults: Loaded instance. + SegmentedResults: Loaded instance. """ folder = pathlib.Path(folder) path = folder / name logger.info(f'loading calculation "{name}" from file ("{path.with_suffix(".nc4")}")') meta_data = fx_io.load_json(path.with_suffix('.json')) return cls( - [CalculationResults.from_file(folder, sub_name) for sub_name in meta_data['sub_calculations']], + [Results.from_file(folder, sub_name) for sub_name in meta_data['sub_calculations']], all_timesteps=pd.DatetimeIndex( [datetime.datetime.fromisoformat(date) for date in meta_data['all_timesteps']], name='time' ), @@ -2093,7 +2136,7 @@ def from_file(cls, folder: str | pathlib.Path, name: str) -> SegmentedCalculatio def __init__( self, - segment_results: list[CalculationResults], + segment_results: list[Results], all_timesteps: pd.DatetimeIndex, timesteps_per_segment: int, overlap_timesteps: int, @@ -2327,6 +2370,40 @@ def to_file(self, folder: str | pathlib.Path | None = None, name: str | None = N logger.info(f'Saved calculation "{name}" to {path}') +class SegmentedCalculationResults(SegmentedResults): + """DEPRECATED: Use SegmentedResults instead. + + Backwards-compatible alias for SegmentedResults class. + All functionality is inherited from SegmentedResults. + """ + + def __init__(self, *args, **kwargs): + # Only warn if directly instantiating SegmentedCalculationResults (not subclasses) + if self.__class__.__name__ == 'SegmentedCalculationResults': + warnings.warn( + f'SegmentedCalculationResults is deprecated and will be removed in v{DEPRECATION_REMOVAL_VERSION}. ' + 'Use SegmentedResults instead.', + DeprecationWarning, + stacklevel=2, + ) + super().__init__(*args, **kwargs) + + @classmethod + def from_calculation(cls, calculation): + """Create SegmentedCalculationResults from a SegmentedCalculation object. + + DEPRECATED: Use SegmentedResults.from_optimization() instead. + Backwards-compatible method that redirects to from_optimization(). + + Args: + calculation: SegmentedCalculation object with solved model. + + Returns: + SegmentedCalculationResults: New instance with extracted results. + """ + return cls.from_optimization(calculation) + + def plot_heatmap( data: xr.DataArray | xr.Dataset, name: str | None = None, @@ -2353,7 +2430,7 @@ def plot_heatmap( """Plot heatmap visualization with support for multi-variable, faceting, and animation. This function provides a standalone interface to the heatmap plotting capabilities, - supporting the same modern features as CalculationResults.plot_heatmap(). + supporting the same modern features as Results.plot_heatmap(). 
Args: data: Data to plot. Can be a single DataArray or an xarray Dataset. diff --git a/pyproject.toml b/pyproject.toml index d7510b1ce..c410305f4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -162,7 +162,6 @@ select = [ "TCH", # flake8-type-checking (optimize imports for type checking) ] ignore = [ # Ignore specific rules - "F401", # Allow unused imports in some cases (use __all__) "UP038", "E501" # ignore long lines ] diff --git a/scripts/extract_changelog.py b/scripts/extract_changelog.py index d05229896..44790fec6 100644 --- a/scripts/extract_changelog.py +++ b/scripts/extract_changelog.py @@ -4,7 +4,6 @@ Simple script to create one file per release. """ -import os import re from pathlib import Path diff --git a/test_deprecations.py b/test_deprecations.py index d530841b2..e69de29bb 100644 --- a/test_deprecations.py +++ b/test_deprecations.py @@ -1,318 +0,0 @@ -"""Comprehensive pytest-based test for all deprecation warnings with v5.0.0 removal message.""" - -import warnings - -import pytest - -import flixopt as fx -from flixopt.config import DEPRECATION_REMOVAL_VERSION -from flixopt.linear_converters import CHP, Boiler, HeatPump, HeatPumpWithSource, Power2Heat - - -# === Parameter deprecations (via _handle_deprecated_kwarg) === -@pytest.mark.parametrize( - 'name,factory', - [ - ("Source 'source'", lambda: fx.Source('s1', source=fx.Flow('out1', bus='bus', size=10))), - ("Sink 'sink'", lambda: fx.Sink('sink1', sink=fx.Flow('in2', bus='bus', size=10))), - ("InvestParameters 'fix_effects'", lambda: fx.InvestParameters(minimum_size=10, fix_effects={'costs': 100})), - ( - "InvestParameters 'specific_effects'", - lambda: fx.InvestParameters(minimum_size=10, specific_effects={'costs': 10}), - ), - ( - "InvestParameters 'divest_effects'", - lambda: fx.InvestParameters(minimum_size=10, divest_effects={'costs': 50}), - ), - ( - "InvestParameters 'piecewise_effects'", - lambda: fx.InvestParameters(minimum_size=10, piecewise_effects=[]), - ), - ("InvestParameters 'optional'", lambda: fx.InvestParameters(minimum_size=10, optional=True)), - ("OnOffParameters 'on_hours_total_min'", lambda: fx.OnOffParameters(on_hours_total_min=10)), - ("OnOffParameters 'on_hours_total_max'", lambda: fx.OnOffParameters(on_hours_total_max=20)), - ("OnOffParameters 'switch_on_total_max'", lambda: fx.OnOffParameters(switch_on_total_max=5)), - ("Flow 'flow_hours_total_min'", lambda: fx.Flow('f1', bus='bus', size=10, flow_hours_total_min=5)), - ("Flow 'flow_hours_total_max'", lambda: fx.Flow('f2', bus='bus', size=10, flow_hours_total_max=20)), - ( - "Flow 'flow_hours_per_period_min'", - lambda: fx.Flow('f3', bus='bus', size=10, flow_hours_per_period_min=5), - ), - ( - "Flow 'flow_hours_per_period_max'", - lambda: fx.Flow('f4', bus='bus', size=10, flow_hours_per_period_max=20), - ), - ("Flow 'total_flow_hours_min'", lambda: fx.Flow('f5', bus='bus', size=10, total_flow_hours_min=5)), - ("Flow 'total_flow_hours_max'", lambda: fx.Flow('f6', bus='bus', size=10, total_flow_hours_max=20)), - ( - "Effect 'minimum_operation'", - lambda: fx.Effect('e1', unit='€', description='test', minimum_operation=100), - ), - ( - "Effect 'maximum_operation'", - lambda: fx.Effect('e2', unit='€', description='test', maximum_operation=200), - ), - ("Effect 'minimum_invest'", lambda: fx.Effect('e3', unit='€', description='test', minimum_invest=50)), - ("Effect 'maximum_invest'", lambda: fx.Effect('e4', unit='€', description='test', maximum_invest=150)), - ( - "Effect 'minimum_operation_per_hour'", - lambda: fx.Effect('e5', unit='€', 
description='test', minimum_operation_per_hour=10), - ), - ( - "Effect 'maximum_operation_per_hour'", - lambda: fx.Effect('e6', unit='€', description='test', maximum_operation_per_hour=30), - ), - # Linear converters - ( - "Boiler 'Q_fu'", - lambda: Boiler( - 'b1', Q_fu=fx.Flow('f1', 'bus', 10), thermal_flow=fx.Flow('h1', 'bus', 9), thermal_efficiency=0.9 - ), - ), - ( - "Boiler 'Q_th'", - lambda: Boiler( - 'b2', fuel_flow=fx.Flow('f2', 'bus', 10), Q_th=fx.Flow('h2', 'bus', 9), thermal_efficiency=0.9 - ), - ), - ( - "Boiler 'eta'", - lambda: Boiler('b3', fuel_flow=fx.Flow('f3', 'bus', 10), thermal_flow=fx.Flow('h3', 'bus', 9), eta=0.9), - ), - ( - "Power2Heat 'P_el'", - lambda: Power2Heat( - 'p1', P_el=fx.Flow('e1', 'bus', 10), thermal_flow=fx.Flow('h4', 'bus', 9), thermal_efficiency=0.9 - ), - ), - ( - "Power2Heat 'Q_th'", - lambda: Power2Heat( - 'p2', electrical_flow=fx.Flow('e2', 'bus', 10), Q_th=fx.Flow('h5', 'bus', 9), thermal_efficiency=0.9 - ), - ), - ( - "Power2Heat 'eta'", - lambda: Power2Heat( - 'p3', electrical_flow=fx.Flow('e3', 'bus', 10), thermal_flow=fx.Flow('h6', 'bus', 9), eta=0.9 - ), - ), - ( - "HeatPump 'P_el'", - lambda: HeatPump('hp1', P_el=fx.Flow('e4', 'bus', 10), thermal_flow=fx.Flow('h7', 'bus', 30), cop=3.0), - ), - ( - "HeatPump 'Q_th'", - lambda: HeatPump('hp2', electrical_flow=fx.Flow('e5', 'bus', 10), Q_th=fx.Flow('h8', 'bus', 30), cop=3.0), - ), - ( - "HeatPump 'COP'", - lambda: HeatPump( - 'hp3', electrical_flow=fx.Flow('e6', 'bus', 10), thermal_flow=fx.Flow('h9', 'bus', 30), COP=3.0 - ), - ), - ( - "CHP 'Q_fu'", - lambda: CHP( - 'chp1', - Q_fu=fx.Flow('f4', 'bus', 100), - electrical_flow=fx.Flow('e7', 'bus', 30), - thermal_flow=fx.Flow('h10', 'bus', 60), - thermal_efficiency=0.6, - electrical_efficiency=0.3, - ), - ), - ( - "CHP 'P_el'", - lambda: CHP( - 'chp2', - fuel_flow=fx.Flow('f5', 'bus', 100), - P_el=fx.Flow('e8', 'bus', 30), - thermal_flow=fx.Flow('h11', 'bus', 60), - thermal_efficiency=0.6, - electrical_efficiency=0.3, - ), - ), - ( - "CHP 'Q_th'", - lambda: CHP( - 'chp3', - fuel_flow=fx.Flow('f6', 'bus', 100), - electrical_flow=fx.Flow('e9', 'bus', 30), - Q_th=fx.Flow('h12', 'bus', 60), - thermal_efficiency=0.6, - electrical_efficiency=0.3, - ), - ), - ( - "CHP 'eta_th'", - lambda: CHP( - 'chp4', - fuel_flow=fx.Flow('f7', 'bus', 100), - electrical_flow=fx.Flow('e10', 'bus', 30), - thermal_flow=fx.Flow('h13', 'bus', 60), - eta_th=0.6, - electrical_efficiency=0.3, - ), - ), - ( - "CHP 'eta_el'", - lambda: CHP( - 'chp5', - fuel_flow=fx.Flow('f8', 'bus', 100), - electrical_flow=fx.Flow('e11', 'bus', 30), - thermal_flow=fx.Flow('h14', 'bus', 60), - thermal_efficiency=0.6, - eta_el=0.3, - ), - ), - ( - "HeatPumpWithSource 'COP'", - lambda: HeatPumpWithSource( - 'hps1', - electrical_flow=fx.Flow('e12', 'bus', 10), - heat_source_flow=fx.Flow('hs1', 'bus', 20), - thermal_flow=fx.Flow('h15', 'bus', 30), - COP=3.0, - ), - ), - ( - "HeatPumpWithSource 'P_el'", - lambda: HeatPumpWithSource( - 'hps2', - P_el=fx.Flow('e13', 'bus', 10), - heat_source_flow=fx.Flow('hs2', 'bus', 20), - thermal_flow=fx.Flow('h16', 'bus', 30), - cop=3.0, - ), - ), - ( - "HeatPumpWithSource 'Q_ab'", - lambda: HeatPumpWithSource( - 'hps3', - electrical_flow=fx.Flow('e14', 'bus', 10), - Q_ab=fx.Flow('hs3', 'bus', 20), - thermal_flow=fx.Flow('h17', 'bus', 30), - cop=3.0, - ), - ), - ( - "HeatPumpWithSource 'Q_th'", - lambda: HeatPumpWithSource( - 'hps4', - electrical_flow=fx.Flow('e15', 'bus', 10), - heat_source_flow=fx.Flow('hs4', 'bus', 20), - Q_th=fx.Flow('h18', 'bus', 30), - 
cop=3.0, - ), - ), - ], - ids=lambda x: x if isinstance(x, str) else '', -) -def test_parameter_deprecations(name, factory): - """Test all parameter deprecations include removal version message.""" - with warnings.catch_warnings(record=True) as w: - warnings.simplefilter('always', DeprecationWarning) - factory() - assert len(w) > 0, f'No warning raised for {name}' - assert f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}' in str(w[0].message), ( - f'Missing removal version in {name}' - ) - - -# === Property deprecations === -@pytest.fixture(scope='module') -def deprecated_instances(): - """Create instances for property testing.""" - return { - 'data': fx.TimeSeriesData([1, 2, 3], aggregation_group=1), - 'boiler': Boiler( - 'b_prop', fuel_flow=fx.Flow('f_p', 'bus', 10), thermal_flow=fx.Flow('h_p', 'bus', 9), thermal_efficiency=0.9 - ), - 'invest_with_effects': fx.InvestParameters( - minimum_size=10, - maximum_size=100, - mandatory=False, - effects_of_investment={'costs': 100}, - effects_of_investment_per_size={'costs': 10}, - effects_of_retirement={'costs': 50}, - piecewise_effects_of_investment=None, - ), - 'invest': fx.InvestParameters(minimum_size=10, maximum_size=100, mandatory=False), - 'onoff': fx.OnOffParameters( - on_hours_min=5, - on_hours_max=10, - switch_on_max=3, - ), - 'flow': fx.Flow('f_prop', bus='bus', size=10, flow_hours_min=5, flow_hours_max=20), - 'chp': CHP( - 'chp_prop', - fuel_flow=fx.Flow('f_chp', 'bus', 100), - electrical_flow=fx.Flow('e_chp', 'bus', 30), - thermal_flow=fx.Flow('h_chp', 'bus', 60), - thermal_efficiency=0.6, - electrical_efficiency=0.3, - ), - 'hp': HeatPump( - 'hp_prop', electrical_flow=fx.Flow('e_hp', 'bus', 10), thermal_flow=fx.Flow('h_hp', 'bus', 30), cop=3.0 - ), - 'hps': HeatPumpWithSource( - 'hps_prop', - electrical_flow=fx.Flow('e_hps', 'bus', 10), - heat_source_flow=fx.Flow('hs_hps', 'bus', 20), - thermal_flow=fx.Flow('h_hps', 'bus', 30), - cop=3.0, - ), - } - - -@pytest.mark.parametrize( - 'name,accessor', - [ - # TimeSeriesData properties - ('TimeSeriesData.agg_group', lambda objs: objs['data'].agg_group), - ('TimeSeriesData.agg_weight', lambda objs: objs['data'].agg_weight), - # InvestParameters properties - ('InvestParameters.optional', lambda objs: objs['invest'].optional), - ('InvestParameters.fix_effects', lambda objs: objs['invest_with_effects'].fix_effects), - ('InvestParameters.specific_effects', lambda objs: objs['invest_with_effects'].specific_effects), - ('InvestParameters.divest_effects', lambda objs: objs['invest_with_effects'].divest_effects), - ('InvestParameters.piecewise_effects', lambda objs: objs['invest_with_effects'].piecewise_effects), - # OnOffParameters properties - ('OnOffParameters.on_hours_total_min', lambda objs: objs['onoff'].on_hours_total_min), - ('OnOffParameters.on_hours_total_max', lambda objs: objs['onoff'].on_hours_total_max), - ('OnOffParameters.switch_on_total_max', lambda objs: objs['onoff'].switch_on_total_max), - # Flow properties - ('Flow.flow_hours_total_min', lambda objs: objs['flow'].flow_hours_total_min), - ('Flow.flow_hours_total_max', lambda objs: objs['flow'].flow_hours_total_max), - # Boiler properties - ('Boiler.eta', lambda objs: objs['boiler'].eta), - ('Boiler.Q_fu', lambda objs: objs['boiler'].Q_fu), - ('Boiler.Q_th', lambda objs: objs['boiler'].Q_th), - # CHP properties - ('CHP.eta_th', lambda objs: objs['chp'].eta_th), - ('CHP.eta_el', lambda objs: objs['chp'].eta_el), - ('CHP.Q_fu', lambda objs: objs['chp'].Q_fu), - ('CHP.P_el', lambda objs: objs['chp'].P_el), - ('CHP.Q_th', lambda 
objs: objs['chp'].Q_th), - # HeatPump properties - ('HeatPump.COP', lambda objs: objs['hp'].COP), - ('HeatPump.P_el', lambda objs: objs['hp'].P_el), - ('HeatPump.Q_th', lambda objs: objs['hp'].Q_th), - # HeatPumpWithSource properties - ('HeatPumpWithSource.COP', lambda objs: objs['hps'].COP), - ('HeatPumpWithSource.P_el', lambda objs: objs['hps'].P_el), - ('HeatPumpWithSource.Q_ab', lambda objs: objs['hps'].Q_ab), - ('HeatPumpWithSource.Q_th', lambda objs: objs['hps'].Q_th), - ], - ids=lambda x: x if isinstance(x, str) else '', -) -def test_property_deprecations(name, accessor, deprecated_instances): - """Test all property deprecations include removal version message.""" - with warnings.catch_warnings(record=True) as w: - warnings.simplefilter('always', DeprecationWarning) - accessor(deprecated_instances) - assert len(w) > 0, f'No warning raised for {name}' - assert f'Will be removed in v{DEPRECATION_REMOVAL_VERSION}' in str(w[0].message), ( - f'Missing removal version in {name}' - ) diff --git a/tests/test_flow_system_resample.py b/tests/test_flow_system_resample.py index 8946dd02f..551fcf483 100644 --- a/tests/test_flow_system_resample.py +++ b/tests/test_flow_system_resample.py @@ -172,7 +172,7 @@ def test_converter_resample(complex_fs): fs_r = complex_fs.resample('4h', method='mean') assert 'boiler' in fs_r.components boiler = fs_r.components['boiler'] - assert hasattr(boiler, 'eta') + assert hasattr(boiler, 'thermal_efficiency') def test_invest_resample(complex_fs): diff --git a/tests/test_integration.py b/tests/test_integration.py index 88e4a21af..04fdd0936 100644 --- a/tests/test_integration.py +++ b/tests/test_integration.py @@ -1,4 +1,3 @@ -import numpy as np import pytest import flixopt as fx diff --git a/tests/test_invest_parameters_deprecation.py b/tests/test_invest_parameters_deprecation.py deleted file mode 100644 index 438d7f4b8..000000000 --- a/tests/test_invest_parameters_deprecation.py +++ /dev/null @@ -1,344 +0,0 @@ -""" -Test backward compatibility and deprecation warnings for InvestParameters. - -This test verifies that: -1. Old parameter names (fix_effects, specific_effects, divest_effects, piecewise_effects) still work with warnings -2. New parameter names (effects_of_investment, effects_of_investment_per_size, effects_of_retirement, piecewise_effects_of_investment) work correctly -3. 
Both old and new approaches produce equivalent results -""" - -import warnings - -import pytest - -from flixopt.interface import InvestParameters - - -class TestInvestParametersDeprecation: - """Test suite for InvestParameters parameter deprecation.""" - - def test_new_parameters_no_warnings(self): - """Test that new parameter names don't trigger warnings.""" - with warnings.catch_warnings(): - warnings.simplefilter('error', DeprecationWarning) - # Should not raise DeprecationWarning - params = InvestParameters( - fixed_size=100, - effects_of_investment={'cost': 25000}, - effects_of_investment_per_size={'cost': 1200}, - effects_of_retirement={'cost': 5000}, - ) - assert params.effects_of_investment == {'cost': 25000} - assert params.effects_of_investment_per_size == {'cost': 1200} - assert params.effects_of_retirement == {'cost': 5000} - - def test_old_fix_effects_deprecation_warning(self): - """Test that fix_effects triggers deprecation warning.""" - with pytest.warns(DeprecationWarning, match='fix_effects.*deprecated.*effects_of_investment'): - params = InvestParameters(fix_effects={'cost': 25000}) - # Verify backward compatibility - assert params.effects_of_investment == {'cost': 25000} - - # Accessing the property also triggers warning - with pytest.warns(DeprecationWarning, match='fix_effects.*deprecated.*effects_of_investment'): - assert params.fix_effects == {'cost': 25000} - - def test_old_specific_effects_deprecation_warning(self): - """Test that specific_effects triggers deprecation warning.""" - with pytest.warns(DeprecationWarning, match='specific_effects.*deprecated.*effects_of_investment_per_size'): - params = InvestParameters(specific_effects={'cost': 1200}) - # Verify backward compatibility - assert params.effects_of_investment_per_size == {'cost': 1200} - - # Accessing the property also triggers warning - with pytest.warns(DeprecationWarning, match='specific_effects.*deprecated.*effects_of_investment_per_size'): - assert params.specific_effects == {'cost': 1200} - - def test_old_divest_effects_deprecation_warning(self): - """Test that divest_effects triggers deprecation warning.""" - with pytest.warns(DeprecationWarning, match='divest_effects.*deprecated.*effects_of_retirement'): - params = InvestParameters(divest_effects={'cost': 5000}) - # Verify backward compatibility - assert params.effects_of_retirement == {'cost': 5000} - - # Accessing the property also triggers warning - with pytest.warns(DeprecationWarning, match='divest_effects.*deprecated.*effects_of_retirement'): - assert params.divest_effects == {'cost': 5000} - - def test_old_piecewise_effects_deprecation_warning(self): - """Test that piecewise_effects triggers deprecation warning.""" - from flixopt.interface import Piece, Piecewise, PiecewiseEffects - - test_piecewise = PiecewiseEffects( - piecewise_origin=Piecewise([Piece(0, 100)]), - piecewise_shares={'cost': Piecewise([Piece(800, 600)])}, - ) - with pytest.warns(DeprecationWarning, match='piecewise_effects.*deprecated.*piecewise_effects_of_investment'): - params = InvestParameters(piecewise_effects=test_piecewise) - # Verify backward compatibility - assert params.piecewise_effects_of_investment is test_piecewise - - # Accessing the property also triggers warning - with pytest.warns(DeprecationWarning, match='piecewise_effects.*deprecated.*piecewise_effects_of_investment'): - assert params.piecewise_effects is test_piecewise - - def test_all_old_parameters_together(self): - """Test all old parameters work together with warnings.""" - from flixopt.interface 
import Piece, Piecewise, PiecewiseEffects - - test_piecewise = PiecewiseEffects( - piecewise_origin=Piecewise([Piece(0, 100)]), - piecewise_shares={'cost': Piecewise([Piece(800, 600)])}, - ) - with warnings.catch_warnings(record=True) as w: - warnings.simplefilter('always', DeprecationWarning) - params = InvestParameters( - fixed_size=100, - fix_effects={'cost': 25000}, - specific_effects={'cost': 1200}, - divest_effects={'cost': 5000}, - piecewise_effects=test_piecewise, - ) - - # Should trigger 4 deprecation warnings (from kwargs) - assert len([warning for warning in w if issubclass(warning.category, DeprecationWarning)]) == 4 - - # Verify all mappings work (accessing new properties - no warnings) - assert params.effects_of_investment == {'cost': 25000} - assert params.effects_of_investment_per_size == {'cost': 1200} - assert params.effects_of_retirement == {'cost': 5000} - assert params.piecewise_effects_of_investment is test_piecewise - - # Verify old attributes still work (accessing deprecated properties - triggers warnings) - with pytest.warns(DeprecationWarning): - assert params.fix_effects == {'cost': 25000} - with pytest.warns(DeprecationWarning): - assert params.specific_effects == {'cost': 1200} - with pytest.warns(DeprecationWarning): - assert params.divest_effects == {'cost': 5000} - with pytest.warns(DeprecationWarning): - assert params.piecewise_effects is test_piecewise - - def test_both_old_and_new_raises_error(self): - """Test that specifying both old and new parameter names raises ValueError.""" - # fix_effects + effects_of_investment - with pytest.raises( - ValueError, match='Either fix_effects or effects_of_investment can be specified, but not both' - ): - InvestParameters( - fix_effects={'cost': 10000}, - effects_of_investment={'cost': 25000}, - ) - - # specific_effects + effects_of_investment_per_size - with pytest.raises( - ValueError, - match='Either specific_effects or effects_of_investment_per_size can be specified, but not both', - ): - InvestParameters( - specific_effects={'cost': 1200}, - effects_of_investment_per_size={'cost': 1500}, - ) - - # divest_effects + effects_of_retirement - with pytest.raises( - ValueError, match='Either divest_effects or effects_of_retirement can be specified, but not both' - ): - InvestParameters( - divest_effects={'cost': 5000}, - effects_of_retirement={'cost': 6000}, - ) - - # piecewise_effects + piecewise_effects_of_investment - from flixopt.interface import Piece, Piecewise, PiecewiseEffects - - test_piecewise1 = PiecewiseEffects( - piecewise_origin=Piecewise([Piece(0, 100)]), - piecewise_shares={'cost': Piecewise([Piece(800, 600)])}, - ) - test_piecewise2 = PiecewiseEffects( - piecewise_origin=Piecewise([Piece(0, 200)]), - piecewise_shares={'cost': Piecewise([Piece(900, 700)])}, - ) - with pytest.raises( - ValueError, - match='Either piecewise_effects or piecewise_effects_of_investment can be specified, but not both', - ): - InvestParameters( - piecewise_effects=test_piecewise1, - piecewise_effects_of_investment=test_piecewise2, - ) - - def test_piecewise_effects_of_investment_new_parameter(self): - """Test that piecewise_effects_of_investment works correctly.""" - from flixopt.interface import Piece, Piecewise, PiecewiseEffects - - test_piecewise = PiecewiseEffects( - piecewise_origin=Piecewise([Piece(0, 100)]), - piecewise_shares={'cost': Piecewise([Piece(800, 600)])}, - ) - - with warnings.catch_warnings(): - warnings.simplefilter('error', DeprecationWarning) - # Should not raise DeprecationWarning when using new parameter 
- params = InvestParameters(piecewise_effects_of_investment=test_piecewise) - assert params.piecewise_effects_of_investment is test_piecewise - - # Accessing deprecated property triggers warning - with pytest.warns(DeprecationWarning): - assert params.piecewise_effects is test_piecewise - - def test_backward_compatibility_with_features(self): - """Test that old attribute names remain accessible for features.py compatibility.""" - from flixopt.interface import Piece, Piecewise, PiecewiseEffects - - test_piecewise = PiecewiseEffects( - piecewise_origin=Piecewise([Piece(0, 100)]), - piecewise_shares={'cost': Piecewise([Piece(800, 600)])}, - ) - - params = InvestParameters( - effects_of_investment={'cost': 25000}, - effects_of_investment_per_size={'cost': 1200}, - effects_of_retirement={'cost': 5000}, - piecewise_effects_of_investment=test_piecewise, - ) - - # Old properties should still be accessible (for features.py) but with warnings - with pytest.warns(DeprecationWarning): - assert params.fix_effects == {'cost': 25000} - with pytest.warns(DeprecationWarning): - assert params.specific_effects == {'cost': 1200} - with pytest.warns(DeprecationWarning): - assert params.divest_effects == {'cost': 5000} - with pytest.warns(DeprecationWarning): - assert params.piecewise_effects is test_piecewise - - # Properties should return the same objects as the new attributes - with pytest.warns(DeprecationWarning): - assert params.fix_effects is params.effects_of_investment - with pytest.warns(DeprecationWarning): - assert params.specific_effects is params.effects_of_investment_per_size - with pytest.warns(DeprecationWarning): - assert params.divest_effects is params.effects_of_retirement - with pytest.warns(DeprecationWarning): - assert params.piecewise_effects is params.piecewise_effects_of_investment - - def test_empty_parameters(self): - """Test that empty/None parameters work correctly.""" - params = InvestParameters() - - assert params.effects_of_investment == {} - assert params.effects_of_investment_per_size == {} - assert params.effects_of_retirement == {} - assert params.piecewise_effects_of_investment is None - - # Old properties should also be empty (but with warnings) - with pytest.warns(DeprecationWarning): - assert params.fix_effects == {} - with pytest.warns(DeprecationWarning): - assert params.specific_effects == {} - with pytest.warns(DeprecationWarning): - assert params.divest_effects == {} - with pytest.warns(DeprecationWarning): - assert params.piecewise_effects is None - - def test_mixed_old_and_new_parameters(self): - """Test mixing old and new parameter names (not recommended but should work).""" - with warnings.catch_warnings(record=True) as w: - warnings.simplefilter('always', DeprecationWarning) - params = InvestParameters( - effects_of_investment={'cost': 25000}, # New - specific_effects={'cost': 1200}, # Old - effects_of_retirement={'cost': 5000}, # New - ) - - # Should only warn about the old parameter - assert len([warning for warning in w if issubclass(warning.category, DeprecationWarning)]) == 1 - - # All should work correctly - assert params.effects_of_investment == {'cost': 25000} - assert params.effects_of_investment_per_size == {'cost': 1200} - assert params.effects_of_retirement == {'cost': 5000} - - def test_unexpected_keyword_arguments(self): - """Test that unexpected keyword arguments raise TypeError.""" - # Single unexpected argument - with pytest.raises( - TypeError, match="InvestParameters.__init__\\(\\) got unexpected keyword argument\\(s\\): 'invalid_param'" - ): 
- InvestParameters(invalid_param='value') - - # Multiple unexpected arguments - with pytest.raises( - TypeError, - match="InvestParameters.__init__\\(\\) got unexpected keyword argument\\(s\\): 'param1', 'param2'", - ): - InvestParameters(param1='value1', param2='value2') - - # Mix of valid and invalid arguments - with pytest.raises( - TypeError, match="InvestParameters.__init__\\(\\) got unexpected keyword argument\\(s\\): 'typo'" - ): - InvestParameters(effects_of_investment={'cost': 100}, typo='value') - - def test_optional_parameter_deprecation(self): - """Test that optional parameter triggers deprecation warning and maps to mandatory.""" - # Test optional=True (should map to mandatory=False) - with pytest.warns(DeprecationWarning, match='optional.*deprecated.*mandatory'): - params = InvestParameters(optional=True) - assert params.mandatory is False - - # Test optional=False (should map to mandatory=True) - with pytest.warns(DeprecationWarning, match='optional.*deprecated.*mandatory'): - params = InvestParameters(optional=False) - assert params.mandatory is True - - def test_mandatory_parameter_no_warning(self): - """Test that mandatory parameter doesn't trigger warnings.""" - with warnings.catch_warnings(): - warnings.simplefilter('error', DeprecationWarning) - # Test mandatory=True - params = InvestParameters(mandatory=True) - assert params.mandatory is True - - # Test mandatory=False (explicit) - params = InvestParameters(mandatory=False) - assert params.mandatory is False - - def test_mandatory_default_value(self): - """Test that default value of mandatory is False when neither optional nor mandatory is specified.""" - params = InvestParameters() - assert params.mandatory is False - - def test_both_optional_and_mandatory_no_error(self): - """Test that specifying both optional and mandatory doesn't raise error. - - Note: Conflict checking is disabled for mandatory/optional because mandatory has - a non-None default value (False), making it impossible to distinguish between - an explicit mandatory=False and the default value. The deprecated optional - parameter will take precedence when both are specified. 
- """ - # When both are specified, optional takes precedence (with deprecation warning) - with pytest.warns(DeprecationWarning, match='optional.*deprecated.*mandatory'): - params = InvestParameters(optional=True, mandatory=False) - # optional=True should result in mandatory=False - assert params.mandatory is False - - with pytest.warns(DeprecationWarning, match='optional.*deprecated.*mandatory'): - params = InvestParameters(optional=False, mandatory=True) - # optional=False should result in mandatory=True (optional takes precedence) - assert params.mandatory is True - - def test_optional_property_deprecation(self): - """Test that accessing optional property triggers deprecation warning.""" - params = InvestParameters(mandatory=True) - - # Reading the property triggers warning - with pytest.warns(DeprecationWarning, match="Property 'optional' is deprecated"): - assert params.optional is False - - # Setting the property triggers warning - with pytest.warns(DeprecationWarning, match="Property 'optional' is deprecated"): - params.optional = True - assert params.mandatory is False diff --git a/tests/test_io.py b/tests/test_io.py index 6d225734e..5b64a6f35 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -1,6 +1,5 @@ import uuid -import numpy as np import pytest import flixopt as fx @@ -83,7 +82,6 @@ def test_flow_system_io(flow_system): def test_suppress_output_file_descriptors(tmp_path): """Test that suppress_output() redirects file descriptors to /dev/null.""" import os - import sys from flixopt.io import suppress_output diff --git a/tests/test_scenarios.py b/tests/test_scenarios.py index 91c9513d6..cdc2ce994 100644 --- a/tests/test_scenarios.py +++ b/tests/test_scenarios.py @@ -594,8 +594,6 @@ def test_selective_scenario_independence(): def test_scenario_parameters_io_persistence(): """Test that scenario_independent_sizes and scenario_independent_flow_rates persist through IO operations.""" - import shutil - import tempfile timesteps = pd.date_range('2023-01-01', periods=24, freq='h') scenarios = pd.Index(['base', 'high'], name='scenario')