diff --git a/flixopt/__init__.py b/flixopt/__init__.py index 34306ae32..aa2bd7129 100644 --- a/flixopt/__init__.py +++ b/flixopt/__init__.py @@ -16,6 +16,7 @@ FlowSystem, FullCalculation, InvestParameters, + InvestTimingParameters, LinearConverter, OnOffParameters, Piece, diff --git a/flixopt/calculation.py b/flixopt/calculation.py index 2f8b15073..e1d95ea06 100644 --- a/flixopt/calculation.py +++ b/flixopt/calculation.py @@ -28,7 +28,7 @@ from .config import CONFIG from .core import DataConverter, Scalar, TimeSeriesData, drop_constant_arrays from .elements import Component -from .features import InvestmentModel +from .features import InvestmentModel, InvestmentTimingModel from .flow_system import FlowSystem from .results import CalculationResults, SegmentedCalculationResults from .solvers import _Solver @@ -113,13 +113,15 @@ def main_results(self) -> Dict[str, Union[Scalar, Dict]]: model.label_of_element: model.size.solution for component in self.flow_system.components.values() for model in component.submodel.all_submodels - if isinstance(model, InvestmentModel) and model.size.solution.max() >= CONFIG.modeling.EPSILON + if isinstance(model, (InvestmentModel, InvestmentTimingModel)) + and model.size.solution.max() >= CONFIG.modeling.EPSILON }, 'Not invested': { model.label_of_element: model.size.solution for component in self.flow_system.components.values() for model in component.submodel.all_submodels - if isinstance(model, InvestmentModel) and model.size.solution.max() < CONFIG.modeling.EPSILON + if isinstance(model, (InvestmentModel, InvestmentTimingModel)) + and model.size.solution.max() < CONFIG.modeling.EPSILON }, }, 'Buses with excess': [ diff --git a/flixopt/commons.py b/flixopt/commons.py index 68412d6fe..68461f10e 100644 --- a/flixopt/commons.py +++ b/flixopt/commons.py @@ -18,7 +18,15 @@ from .effects import Effect from .elements import Bus, Flow from .flow_system import FlowSystem -from .interface import InvestParameters, OnOffParameters, Piece, Piecewise, PiecewiseConversion, PiecewiseEffects +from .interface import ( + InvestParameters, + InvestTimingParameters, + OnOffParameters, + Piece, + Piecewise, + PiecewiseConversion, + PiecewiseEffects, +) __all__ = [ 'TimeSeriesData', @@ -48,4 +56,5 @@ 'results', 'linear_converters', 'solvers', + 'InvestTimingParameters', ] diff --git a/flixopt/components.py b/flixopt/components.py index 2a9010a6d..04328b29d 100644 --- a/flixopt/components.py +++ b/flixopt/components.py @@ -94,7 +94,7 @@ def _plausibility_checks(self) -> None: f'(in {self.label_full}) and variable size is uncommon. Please check if this is intended!' ) - def transform_data(self, flow_system: 'FlowSystem'): + def transform_data(self, flow_system: 'FlowSystem', name_prefix: str = '') -> None: super().transform_data(flow_system) if self.conversion_factors: self.conversion_factors = self._transform_conversion_factors(flow_system) @@ -207,7 +207,7 @@ def create_model(self, model: FlowSystemModel) -> 'StorageModel': self.submodel = StorageModel(model, self) return self.submodel - def transform_data(self, flow_system: 'FlowSystem') -> None: + def transform_data(self, flow_system: 'FlowSystem', name_prefix: str = '') -> None: super().transform_data(flow_system) self.relative_minimum_charge_state = flow_system.fit_to_model_coords( f'{self.label_full}|relative_minimum_charge_state', @@ -393,7 +393,7 @@ def create_model(self, model) -> 'TransmissionModel': self.submodel = TransmissionModel(model, self) return self.submodel - def transform_data(self, flow_system: 'FlowSystem') -> None: + def transform_data(self, flow_system: 'FlowSystem', name_prefix: str = '') -> None: super().transform_data(flow_system) self.relative_losses = flow_system.fit_to_model_coords( f'{self.label_full}|relative_losses', self.relative_losses diff --git a/flixopt/effects.py b/flixopt/effects.py index d0b552bf8..7ac25c289 100644 --- a/flixopt/effects.py +++ b/flixopt/effects.py @@ -13,7 +13,7 @@ import numpy as np import xarray as xr -from .core import Scalar, TemporalData, TemporalDataUser +from .core import NonTemporalDataUser, Scalar, TemporalData, TemporalDataUser from .features import ShareAllocationModel from .structure import Element, ElementModel, FlowSystemModel, Interface, Submodel, register_class_for_io @@ -192,7 +192,7 @@ def _do_modeling(self): TemporalEffectsUser = Union[TemporalDataUser, Dict[str, TemporalDataUser]] # User-specified Shares to Effects """ This datatype is used to define a temporal share to an effect by a certain attribute. """ -NonTemporalEffectsUser = Union[Scalar, Dict[str, Scalar]] # User-specified Shares to Effects +NonTemporalEffectsUser = Union[NonTemporalDataUser, Dict[str, NonTemporalDataUser]] # User-specified Shares to Effects """ This datatype is used to define a scalar share to an effect by a certain attribute. """ TemporalEffects = Dict[str, TemporalData] # User-specified Shares to Effects diff --git a/flixopt/elements.py b/flixopt/elements.py index 160dac660..6791bf6ae 100644 --- a/flixopt/elements.py +++ b/flixopt/elements.py @@ -13,8 +13,8 @@ from .config import CONFIG from .core import PlausibilityError, Scalar, TemporalData, TemporalDataUser from .effects import TemporalEffectsUser -from .features import InvestmentModel, ModelingPrimitives, OnOffModel -from .interface import InvestParameters, OnOffParameters +from .features import InvestmentModel, InvestmentTimingModel, ModelingPrimitives, OnOffModel +from .interface import InvestParameters, InvestTimingParameters, OnOffParameters from .modeling import BoundingPatterns, ModelingUtilitiesAbstract from .structure import Element, ElementModel, FlowSystemModel, register_class_for_io @@ -70,7 +70,7 @@ def create_model(self, model: FlowSystemModel) -> 'ComponentModel': self.submodel = ComponentModel(model, self) return self.submodel - def transform_data(self, flow_system: 'FlowSystem') -> None: + def transform_data(self, flow_system: 'FlowSystem', name_prefix: str = '') -> None: if self.on_off_parameters is not None: self.on_off_parameters.transform_data(flow_system, self.label_full) @@ -118,7 +118,7 @@ def create_model(self, model: FlowSystemModel) -> 'BusModel': self.submodel = BusModel(model, self) return self.submodel - def transform_data(self, flow_system: 'FlowSystem'): + def transform_data(self, flow_system: 'FlowSystem', name_prefix: str = '') -> None: self.excess_penalty_per_flow_hour = flow_system.fit_to_model_coords( f'{self.label_full}|excess_penalty_per_flow_hour', self.excess_penalty_per_flow_hour ) @@ -158,7 +158,7 @@ def __init__( self, label: str, bus: str, - size: Union[Scalar, InvestParameters] = None, + size: Union[Scalar, InvestParameters, InvestTimingParameters] = None, fixed_relative_profile: Optional[TemporalDataUser] = None, relative_minimum: TemporalDataUser = 0, relative_maximum: TemporalDataUser = 1, @@ -238,7 +238,7 @@ def create_model(self, model: FlowSystemModel) -> 'FlowModel': self.submodel = FlowModel(model, self) return self.submodel - def transform_data(self, flow_system: 'FlowSystem'): + def transform_data(self, flow_system: 'FlowSystem', name_prefix: str = '') -> None: self.relative_minimum = flow_system.fit_to_model_coords( f'{self.label_full}|relative_minimum', self.relative_minimum ) @@ -266,7 +266,7 @@ def transform_data(self, flow_system: 'FlowSystem'): if self.on_off_parameters is not None: self.on_off_parameters.transform_data(flow_system, self.label_full) - if isinstance(self.size, InvestParameters): + if isinstance(self.size, (InvestParameters, InvestTimingParameters)): self.size.transform_data(flow_system, self.label_full) else: self.size = flow_system.fit_to_model_coords(f'{self.label_full}|size', self.size, dims=['year', 'scenario']) @@ -276,7 +276,7 @@ def _plausibility_checks(self) -> None: if np.any(self.relative_minimum > self.relative_maximum): raise PlausibilityError(self.label_full + ': Take care, that relative_minimum <= relative_maximum!') - if not isinstance(self.size, InvestParameters) and ( + if not isinstance(self.size, (InvestParameters, InvestTimingParameters)) and ( np.any(self.size == CONFIG.modeling.BIG) and self.fixed_relative_profile is not None ): # Default Size --> Most likely by accident logger.warning( @@ -377,15 +377,28 @@ def _create_on_off_model(self): ) def _create_investment_model(self): - self.add_submodels( - InvestmentModel( - model=self._model, - label_of_element=self.label_of_element, - parameters=self.element.size, - label_of_model=self.label_of_element, - ), - 'investment', - ) + if isinstance(self.element.size, InvestParameters): + self.add_submodels( + InvestmentModel( + model=self._model, + label_of_element=self.label_of_element, + parameters=self.element.size, + label_of_model=self.label_of_element, + ), + 'investment', + ) + elif isinstance(self.element.size, InvestTimingParameters): + self.add_submodels( + InvestmentTimingModel( + model=self._model, + label_of_element=self.label_of_element, + parameters=self.element.size, + label_of_model=self.label_of_element, + ), + 'investment', + ) + else: + raise ValueError(f'Invalid InvestParameters type: {type(self.element.size)}') def _constraint_flow_rate(self): if not self.with_investment and not self.with_on_off: @@ -435,7 +448,7 @@ def with_on_off(self) -> bool: @property def with_investment(self) -> bool: - return isinstance(self.element.size, InvestParameters) + return isinstance(self.element.size, (InvestParameters, InvestTimingParameters)) # Properties for clean access to variables @property @@ -508,7 +521,7 @@ def absolute_flow_rate_bounds(self) -> Tuple[TemporalData, TemporalData]: if not self.with_investment: # Basic case without investment and without OnOff lb = lb_relative * self.element.size - elif not self.element.size.optional: + elif isinstance(self.element.size, InvestParameters) and not self.element.size.optional: # With non-optional Investment lb = lb_relative * self.element.size.minimum_or_fixed_size diff --git a/flixopt/features.py b/flixopt/features.py index d07844c8d..09b37f23f 100644 --- a/flixopt/features.py +++ b/flixopt/features.py @@ -9,9 +9,13 @@ import linopy import numpy as np -from .config import CONFIG from .core import FlowSystemDimensions, NonTemporalData, Scalar, TemporalData -from .interface import InvestParameters, OnOffParameters, Piecewise, PiecewiseEffects +from .interface import ( + InvestParameters, + InvestTimingParameters, + OnOffParameters, + Piecewise, +) from .modeling import BoundingPatterns, ModelingPrimitives, ModelingUtilities from .structure import FlowSystemModel, Submodel @@ -71,6 +75,35 @@ def _create_variables_and_constraints(self): bounds=(self.parameters.minimum_or_fixed_size, self.parameters.maximum_or_fixed_size), ) + def _add_effects(self): + """Add investment effects""" + if self.parameters.fix_effects: + self._model.effects.add_share_to_effects( + name=self.label_of_element, + expressions={ + effect: self.is_invested * factor if self.is_invested is not None else factor + for effect, factor in self.parameters.fix_effects.items() + }, + target='invest', + ) + + if self.parameters.divest_effects and self.parameters.optional: + self._model.effects.add_share_to_effects( + name=self.label_of_element, + expressions={ + effect: -self.is_invested * factor + factor + for effect, factor in self.parameters.divest_effects.items() + }, + target='invest', + ) + + if self.parameters.specific_effects: + self._model.effects.add_share_to_effects( + name=self.label_of_element, + expressions={effect: self.size * factor for effect, factor in self.parameters.specific_effects.items()}, + target='invest', + ) + if self.parameters.piecewise_effects: self.piecewise_effects = self.add_submodels( PiecewiseEffectsModel( @@ -84,8 +117,150 @@ def _create_variables_and_constraints(self): short_name='segments', ) + @property + def size(self) -> linopy.Variable: + """Investment size variable""" + return self._variables['size'] + + @property + def is_invested(self) -> Optional[linopy.Variable]: + """Binary investment decision variable""" + if 'is_invested' not in self._variables: + return None + return self._variables['is_invested'] + + +class InvestmentTimingModel(Submodel): + """ + This feature model is used to model the timing of investments. + + Such an Investment is defined by a size, a year_of_investment, and a year_of_decommissioning. + In between these years, the size of the investment cannot vary. Outside, its 0. + """ + + parameters: InvestTimingParameters + + def __init__( + self, + model: FlowSystemModel, + label_of_element: str, + parameters: InvestTimingParameters, + label_of_model: Optional[str] = None, + ): + self.parameters = parameters + super().__init__(model, label_of_element, label_of_model) + + def _do_modeling(self): + super()._do_modeling() + self._basic_modeling() + self._add_effects() + + self._constraint_investment() + self._constraint_decommissioning() + + def _basic_modeling(self): + size_min, size_max = self.parameters.minimum_or_fixed_size, self.parameters.maximum_or_fixed_size + + ######################################################################## + self.add_variables( + short_name='size', + lower=0, + upper=size_max, + coords=self._model.get_coords(['year', 'scenario']), + ) + self.add_variables( + binary=True, + coords=self._model.get_coords(['year', 'scenario']), + short_name='is_invested', + ) + BoundingPatterns.bounds_with_state( + self, + variable=self.size, + variable_state=self.is_invested, + bounds=(size_min, size_max), + ) + + ######################################################################## + self.add_variables( + binary=True, + coords=self._model.get_coords(['year', 'scenario']), + short_name='size|investment_occurs', + ) + self.add_variables( + binary=True, + coords=self._model.get_coords(['year', 'scenario']), + short_name='size|decommissioning_occurs', + ) + BoundingPatterns.state_transition_bounds( + self, + state_variable=self.is_invested, + switch_on=self.investment_occurs, + switch_off=self.decommissioning_occurs, + name=self.is_invested.name, + previous_state=0, + coord='year', + ) + self.add_constraints( + self.investment_occurs.sum('year') <= 1, + short_name='investment_occurs|once', + ) + self.add_constraints( + self.decommissioning_occurs.sum('year') <= 1, + short_name='decommissioning_occurs|once', + ) + + ######################################################################## + previous_lifetime = self.parameters.previous_lifetime if self.parameters.previous_lifetime is not None else 0 + self.add_variables( + lower=0, + upper=self.parameters.maximum_or_fixed_lifetime + if self.parameters.maximum_or_fixed_lifetime is not None + else self._model.flow_system.years_per_year.sum('year') + previous_lifetime, + coords=self._model.get_coords(['scenario']), + short_name='size|lifetime', + ) + self.add_constraints( + self.lifetime + == (self.is_invested * self._model.flow_system.years_per_year).sum('year') + + self.is_invested.isel(year=0) * previous_lifetime, + short_name='size|lifetime', + ) + if self.parameters.minimum_or_fixed_lifetime is not None: + self.add_constraints( + self.lifetime + self.is_invested.isel(year=-1) * self.parameters.minimum_or_fixed_lifetime + >= self.investment_occurs * self.parameters.minimum_or_fixed_lifetime, + short_name='size|lifetime|lb', + ) + + ######################################################################## + self.add_variables( + coords=self._model.get_coords(['year', 'scenario']), + short_name='size|increase', + lower=0, + upper=size_max, + ) + self.add_variables( + coords=self._model.get_coords(['year', 'scenario']), + short_name='size|decrease', + lower=0, + upper=size_max, + ) + BoundingPatterns.link_changes_to_level_with_binaries( + self, + level_variable=self.size, + increase_variable=self.size_increase, + decrease_variable=self.size_decrease, + increase_binary=self.investment_occurs, + decrease_binary=self.decommissioning_occurs, + name=f'{self.label_of_element}|size|changes', + max_change=size_max, + initial_level=0, + coord='year', + ) + def _add_effects(self): - """Add investment effects""" + """Add investment effects to the model.""" + if self.parameters.fix_effects: self._model.effects.add_share_to_effects( name=self.label_of_element, @@ -96,23 +271,63 @@ def _add_effects(self): target='invest', ) - if self.parameters.divest_effects and self.parameters.optional: + if self.parameters.specific_effects: + self._model.effects.add_share_to_effects( + name=self.label_of_element, + expressions={effect: self.size * factor for effect, factor in self.parameters.specific_effects.items()}, + target='invest', + ) + + if self.parameters.fixed_effects_by_investment_year: + # Effects depending on when the investment is made + remapped_variable = self.investment_occurs.rename({'year': 'year_of_investment'}) + self._model.effects.add_share_to_effects( name=self.label_of_element, expressions={ - effect: -self.is_invested * factor + factor - for effect, factor in self.parameters.divest_effects.items() + effect: (remapped_variable * factor).sum('year_of_investment') + for effect, factor in self.parameters.fixed_effects_by_investment_year.items() }, target='invest', ) - if self.parameters.specific_effects: + if self.parameters.specific_effects_by_investment_year: + # Annual effects proportional to investment size + remapped_variable = self.size_increase.rename({'year': 'year_of_investment'}) + self._model.effects.add_share_to_effects( name=self.label_of_element, - expressions={effect: self.size * factor for effect, factor in self.parameters.specific_effects.items()}, + expressions={ + effect: (remapped_variable * factor).sum('year_of_investment') + for effect, factor in self.parameters.specific_effects_by_investment_year.items() + }, target='invest', ) + def _constraint_investment(self): + if self.parameters.force_investment.sum() > 0: + self.add_constraints( + self.investment_occurs == self.parameters.force_investment, + short_name='size|changes|fixed_start', + ) + else: + self.add_constraints( + self.investment_occurs <= self.parameters.allow_investment, + short_name='size|changes|restricted_start', + ) + + def _constraint_decommissioning(self): + if self.parameters.force_decommissioning.sum() > 0: + self.add_constraints( + self.decommissioning_occurs == self.parameters.force_decommissioning, + short_name='size|changes|fixed_end', + ) + else: + self.add_constraints( + self.decommissioning_occurs <= self.parameters.allow_decommissioning, + short_name='size|changes|restricted_end', + ) + @property def size(self) -> linopy.Variable: """Investment size variable""" @@ -125,6 +340,46 @@ def is_invested(self) -> Optional[linopy.Variable]: return None return self._variables['is_invested'] + @property + def investment_occurs(self) -> linopy.Variable: + """Binary increase decision variable""" + return self._variables['size|investment_occurs'] + + @property + def decommissioning_occurs(self) -> linopy.Variable: + """Binary decrease decision variable""" + return self._variables['size|decommissioning_occurs'] + + @property + def size_decrease(self) -> linopy.Variable: + """Binary decrease decision variable""" + return self._variables['size|decrease'] + + @property + def size_increase(self) -> linopy.Variable: + """Binary increase decision variable""" + return self._variables['size|increase'] + + @property + def investment_used(self) -> linopy.LinearExpression: + """Binary investment decision variable""" + return self.investment_occurs.sum('year') + + @property + def divestment_used(self) -> linopy.LinearExpression: + """Binary investment decision variable""" + return self.decommissioning_occurs.sum('year') + + @property + def lifetime(self) -> linopy.Variable: + """Lifetime variable""" + return self._variables['size|lifetime'] + + @property + def duration(self) -> linopy.Variable: + """Investment duration variable""" + return self._variables['duration'] + class OnOffModel(Submodel): """OnOff model using factory patterns""" @@ -206,6 +461,8 @@ def _do_modeling(self): short_name='consecutive_on_hours', minimum_duration=self.parameters.consecutive_on_hours_min, maximum_duration=self.parameters.consecutive_on_hours_max, + duration_per_step=self.hours_per_step, + duration_dim='time', previous_duration=self._get_previous_on_duration(), ) @@ -217,6 +474,8 @@ def _do_modeling(self): short_name='consecutive_off_hours', minimum_duration=self.parameters.consecutive_off_hours_min, maximum_duration=self.parameters.consecutive_off_hours_max, + duration_per_step=self.hours_per_step, + duration_dim='time', previous_duration=self._get_previous_off_duration(), ) # TODO: diff --git a/flixopt/flow_system.py b/flixopt/flow_system.py index c4b260dec..b852a39b0 100644 --- a/flixopt/flow_system.py +++ b/flixopt/flow_system.py @@ -63,6 +63,7 @@ def __init__( scenarios: Optional[pd.Index] = None, hours_of_last_timestep: Optional[float] = None, hours_of_previous_timesteps: Optional[Union[int, float, np.ndarray]] = None, + years_of_last_year: Optional[int] = None, weights: Optional[NonTemporalDataUser] = None, ): """ @@ -75,16 +76,21 @@ def __init__( If None, the first time increment of time_series is used. This is needed to calculate previous durations (for example consecutive_on_hours). If you use an array, take care that its long enough to cover all previous values! - weights: The weights of each year and scenario. If None, all have the same weight (normalized to 1). Its recommended to scale the weights to sum up to 1. + years_of_last_year: The duration of the last year. Defaults to the duration of the last year interval. + weights: The weights of each year and scenario. If None, all scenarios have the same weight, while the years have the weight of their represented year (all normalized to 1). Its recommended to scale the weights to sum up to 1. """ self.timesteps = self._validate_timesteps(timesteps) - self.timesteps_extra = self._create_timesteps_with_extra(timesteps, hours_of_last_timestep) self.hours_of_previous_timesteps = self._calculate_hours_of_previous_timesteps( timesteps, hours_of_previous_timesteps ) - self.years = None if years is None else self._validate_years(years) + if years is None: + self.years, self.years_per_year, self.years_of_investment = None, None, None + else: + self.years = self._validate_years(years) + self.years_per_year = self.calculate_years_per_year(self.years, years_of_last_year) + self.years_of_investment = self.years.rename('year_of_investment') self.scenarios = None if scenarios is None else self._validate_scenarios(scenarios) @@ -178,6 +184,17 @@ def calculate_hours_per_timestep(timesteps_extra: pd.DatetimeIndex) -> xr.DataAr hours_per_step, coords={'time': timesteps_extra[:-1]}, dims='time', name='hours_per_timestep' ) + @staticmethod + def calculate_years_per_year(years: pd.Index, years_of_last_year: Optional[int] = None) -> xr.DataArray: + """Calculate duration of each timestep as a 1D DataArray.""" + years_per_year = np.diff(years) + return xr.DataArray( + np.append(years_per_year, years_of_last_year or years_per_year[-1]), + coords={'year': years}, + dims='year', + name='years_per_year', + ) + @staticmethod def _calculate_hours_of_previous_timesteps( timesteps: pd.DatetimeIndex, hours_of_previous_timesteps: Optional[Union[float, np.ndarray]] @@ -350,8 +367,8 @@ def fit_to_model_coords( self, name: str, data: Optional[Union[TemporalDataUser, NonTemporalDataUser]], - has_time_dim: bool = True, dims: Optional[Collection[FlowSystemDimensions]] = None, + with_year_of_investment: bool = False, ) -> Optional[Union[TemporalData, NonTemporalData]]: """ Fit data to model coordinate system (currently time, but extensible). @@ -359,8 +376,8 @@ def fit_to_model_coords( Args: name: Name of the data data: Data to fit to model coordinates - has_time_dim: Wether to use the time dimension or not dims: Collection of dimension names to use for fitting. If None, all dimensions are used. + with_year_of_investment: Wether to use the year_of_investment dimension or not. Only if "year" is in dims. Returns: xr.DataArray aligned to model coordinate system. If data is None, returns None. @@ -368,20 +385,14 @@ def fit_to_model_coords( if data is None: return None - if dims is None: - coords = self.coords + coords = self.coords - if not has_time_dim: - warnings.warn( - 'has_time_dim is deprecated. Please pass dims to fit_to_model_coords instead.', - DeprecationWarning, - stacklevel=2, - ) - coords.pop('time') - else: - coords = self.coords + if dims is not None: coords = {k: coords[k] for k in dims if k in coords} + if with_year_of_investment and 'year' in coords: + coords['year_of_investment'] = coords['year'].rename('year_of_investment') + # Rest of your method stays the same, just pass coords if isinstance(data, TimeSeriesData): try: @@ -402,8 +413,8 @@ def fit_effects_to_model_coords( label_prefix: Optional[str], effect_values: Optional[Union[TemporalEffectsUser, NonTemporalEffectsUser]], label_suffix: Optional[str] = None, - has_time_dim: bool = True, dims: Optional[Collection[FlowSystemDimensions]] = None, + with_year_of_investment: bool = False, ) -> Optional[Union[TemporalEffects, NonTemporalEffects]]: """ Transform EffectValues from the user to Internal Datatypes aligned with model coordinates. @@ -417,8 +428,8 @@ def fit_effects_to_model_coords( effect: self.fit_to_model_coords( '|'.join(filter(None, [label_prefix, effect, label_suffix])), value, - has_time_dim=has_time_dim, dims=dims, + with_year_of_investment=with_year_of_investment, ) for effect, value in effect_values_dict.items() } @@ -753,19 +764,29 @@ def sel( if not self.connected_and_transformed: self.connect_and_transform() + ds = self.to_dataset() + # Build indexers dict from non-None parameters indexers = {} if time is not None: indexers['time'] = time if year is not None: indexers['year'] = year + if 'year_of_investment' in ds.dims: + indexers['year_of_investment'] = year if scenario is not None: indexers['scenario'] = scenario if not indexers: return self.copy() # Return a copy when no selection - selected_dataset = self.to_dataset().sel(**indexers) + selected_dataset = ds.sel(**indexers) + if 'year_of_investment' in selected_dataset.coords and selected_dataset.coords['year_of_investment'].size == 1: + logger.critical( + 'Selected a single year while using InvestmentTiming. This is not supported and will lead to Errors ' + 'when trying to create a Calculation from this FlowSystem. Please select multiple years instead, ' + 'or remove the InvestmentTimingParameters.' + ) return self.__class__.from_dataset(selected_dataset) def isel( @@ -788,19 +809,29 @@ def isel( if not self.connected_and_transformed: self.connect_and_transform() + ds = self.to_dataset() + # Build indexers dict from non-None parameters indexers = {} if time is not None: indexers['time'] = time if year is not None: indexers['year'] = year + if 'year_of_investment' in ds.dims: + indexers['year_of_investment'] = year if scenario is not None: indexers['scenario'] = scenario if not indexers: return self.copy() # Return a copy when no selection - selected_dataset = self.to_dataset().isel(**indexers) + selected_dataset = ds.isel(**indexers) + if 'year_of_investment' in selected_dataset.coords and selected_dataset.coords['year_of_investment'].size == 1: + logger.critical( + 'Selected a single year while using InvestmentTiming. This is not supported and will lead to Errors ' + 'when trying to create a Calculation from this FlowSystem. Please select multiple years instead, ' + 'or remove the InvestmentTimingParameters.' + ) return self.__class__.from_dataset(selected_dataset) def resample( diff --git a/flixopt/interface.py b/flixopt/interface.py index 7cb9604ac..736e82f08 100644 --- a/flixopt/interface.py +++ b/flixopt/interface.py @@ -6,6 +6,8 @@ import logging from typing import TYPE_CHECKING, Dict, Iterator, List, Literal, Optional, Union +import xarray as xr + from .config import CONFIG from .core import NonTemporalData, NonTemporalDataUser, Scalar, TemporalDataUser from .structure import Interface, register_class_for_io @@ -32,7 +34,7 @@ def __init__(self, start: TemporalDataUser, end: TemporalDataUser): self.end = end self.has_time_dim = False - def transform_data(self, flow_system: 'FlowSystem', name_prefix: str): + def transform_data(self, flow_system: 'FlowSystem', name_prefix: str = '') -> None: dims = None if self.has_time_dim else ['year', 'scenario'] self.start = flow_system.fit_to_model_coords(f'{name_prefix}|start', self.start, dims=dims) self.end = flow_system.fit_to_model_coords(f'{name_prefix}|end', self.end, dims=dims) @@ -69,7 +71,7 @@ def __getitem__(self, index) -> Piece: def __iter__(self) -> Iterator[Piece]: return iter(self.pieces) # Enables iteration like for piece in piecewise: ... - def transform_data(self, flow_system: 'FlowSystem', name_prefix: str): + def transform_data(self, flow_system: 'FlowSystem', name_prefix: str = '') -> None: for i, piece in enumerate(self.pieces): piece.transform_data(flow_system, f'{name_prefix}|Piece{i}') @@ -102,7 +104,7 @@ def has_time_dim(self, value): def items(self): return self.piecewises.items() - def transform_data(self, flow_system: 'FlowSystem', name_prefix: str): + def transform_data(self, flow_system: 'FlowSystem', name_prefix: str = '') -> None: for name, piecewise in self.piecewises.items(): piecewise.transform_data(flow_system, f'{name_prefix}|{name}') @@ -133,7 +135,7 @@ def has_time_dim(self, value): for piecewise in self.piecewise_shares.values(): piecewise.has_time_dim = value - def transform_data(self, flow_system: 'FlowSystem', name_prefix: str): + def transform_data(self, flow_system: 'FlowSystem', name_prefix: str = '') -> None: self.piecewise_origin.transform_data(flow_system, f'{name_prefix}|PiecewiseEffects|origin') for effect, piecewise in self.piecewise_shares.items(): piecewise.transform_data(flow_system, f'{name_prefix}|PiecewiseEffects|{effect}') @@ -184,7 +186,7 @@ def __init__( self.maximum_size = maximum_size if maximum_size is not None else CONFIG.modeling.BIG # default maximum self.investment_scenarios = investment_scenarios - def transform_data(self, flow_system: 'FlowSystem', name_prefix: str): + def transform_data(self, flow_system: 'FlowSystem', name_prefix: str = '') -> None: self._plausibility_checks(flow_system) self.fix_effects = flow_system.fit_effects_to_model_coords( label_prefix=name_prefix, @@ -243,6 +245,276 @@ def maximum_or_fixed_size(self) -> NonTemporalData: return self.fixed_size if self.fixed_size is not None else self.maximum_size +YearOfInvestmentData = NonTemporalDataUser +"""This datatype is used to define things related to the year of investment.""" +YearOfInvestmentDataBool = Union[bool, YearOfInvestmentData] +"""This datatype is used to define things with boolean data related to the year of investment.""" + + +@register_class_for_io +class InvestTimingParameters(Interface): + """ + Investment with fixed start and end years. + + This is the simplest variant - investment is completely scheduled. + No optimization variables needed for timing, just size optimization. + """ + + def __init__( + self, + allow_investment: YearOfInvestmentDataBool = True, + allow_decommissioning: YearOfInvestmentDataBool = True, + force_investment: YearOfInvestmentDataBool = False, # TODO: Allow to simply pass the year + force_decommissioning: YearOfInvestmentDataBool = False, # TODO: Allow to simply pass the year + fixed_lifetime: Optional[Scalar] = None, + minimum_lifetime: Optional[Scalar] = None, + maximum_lifetime: Optional[Scalar] = None, + minimum_size: Optional[YearOfInvestmentData] = None, + maximum_size: Optional[YearOfInvestmentData] = None, + fixed_size: Optional[YearOfInvestmentData] = None, + fix_effects: Optional['NonTemporalEffectsUser'] = None, + specific_effects: Optional['NonTemporalEffectsUser'] = None, # costs per Flow-Unit/Storage-Size/... + fixed_effects_by_investment_year: Optional[xr.DataArray] = None, + specific_effects_by_investment_year: Optional[xr.DataArray] = None, + previous_lifetime: Optional[Scalar] = None, + ): + """ + These parameters are used to include the timing of investments in the model. + Two out of three parameters (year_of_investment, year_of_decommissioning, duration_in_years) can be fixed. + This has a 'year_of_investment' dimension in some parameters: + allow_investment: Whether investment is allowed in a certain year + allow_decommissioning: Whether divestment is allowed in a certain year + duration_between_investment_and_decommissioning: Duration between investment and decommissioning + + Args: + allow_investment: Allow investment in a certain year. By default, allow it in all years. + allow_decommissioning: Allow decommissioning in a certain year. By default, allow it in all years. + force_investment: Force the investment to occur in a certain year. + force_decommissioning: Force the decommissioning to occur in a certain year. + fixed_lifetime: Fix the lifetime of an investment (duration between investment and decommissioning). + minimum_size: Minimum possible size of the investment. Can depend on the year of investment. + maximum_size: Maximum possible size of the investment. Can depend on the year of investment. + fixed_size: Fix the size of the investment. Can depend on the year of investment. Can still be 0 if not forced. + specific_effects: Effects dependent on the size. + These will occur in each year, depending on the size in that year. + fix_effects: Effects of the Investment, independent of the size. + These will occur in each year, depending on wether the size is greater zero in that year. + + fixed_effects_by_investment_year: Effects dependent on the year of investment. + These effects will depend on the year of the investment. The actual effects can occur in other years, + letting you model things like annuities, which depend on when an investment was taken. + The passed xr.DataArray needs to match the FlowSystem dimensions (except time, but including "year_of_investment"). No internal Broadcasting! + "year_of_investment" has the same values as the year dimension. Access it through `flow_system.year_of_investment`. + specific_effects_by_investment_year: Effects dependent on the year of investment and the chosen size. + These effects will depend on the year of the investment. The actual effects can occur in other years, + letting you model things like annuities, which depend on when an investment was taken. + The passed xr.DataArray needs to match the FlowSystem dimensions (except time, but including "year_of_investment"). No internal Broadcasting! + "year_of_investment" has the same values as the year dimension. Access it through `flow_system.year_of_investment`. + + """ + self.minimum_size = minimum_size if minimum_size is not None else CONFIG.modeling.EPSILON + self.maximum_size = maximum_size if maximum_size is not None else CONFIG.modeling.BIG + self.fixed_size = fixed_size + + self.allow_investment = allow_investment + self.allow_decommissioning = allow_decommissioning + self.force_investment = force_investment + self.force_decommissioning = force_decommissioning + + self.maximum_lifetime = maximum_lifetime + self.minimum_lifetime = minimum_lifetime + self.fixed_lifetime = fixed_lifetime + self.previous_lifetime = previous_lifetime + + self.fix_effects: 'NonTemporalEffectsUser' = fix_effects if fix_effects is not None else {} + self.specific_effects: 'NonTemporalEffectsUser' = specific_effects if specific_effects is not None else {} + self.fixed_effects_by_investment_year = ( + fixed_effects_by_investment_year if fixed_effects_by_investment_year is not None else {} + ) + self.specific_effects_by_investment_year = ( + specific_effects_by_investment_year if specific_effects_by_investment_year is not None else {} + ) + + def _plausibility_checks(self, flow_system): + """Validate parameter consistency.""" + if flow_system.years is None: + raise ValueError("InvestTimingParameters requires the flow_system to have a 'years' dimension.") + + if (self.force_investment.sum('year') > 1).any(): + raise ValueError('force_investment can only be True for a single year.') + if (self.force_decommissioning.sum('year') > 1).any(): + raise ValueError('force_decommissioning can only be True for a single year.') + + if (self.force_investment.sum('year') == 1).any() and (self.force_decommissioning.sum('year') == 1).any(): + year_of_forced_investment = ( + self.force_investment.where(self.force_investment) * self.force_investment.year + ).sum('year') + year_of_forced_decommissioning = ( + self.force_decommissioning.where(self.force_decommissioning) * self.force_decommissioning.year + ).sum('year') + if not (year_of_forced_investment < year_of_forced_decommissioning).all(): + raise ValueError( + f'force_investment needs to be before force_decommissioning. Got:\n' + f'{self.force_investment}\nand\n{self.force_decommissioning}' + ) + + if self.previous_lifetime is not None: + if self.fixed_size is None: + # TODO: Might be only a warning + raise ValueError('previous_lifetime can only be used if fixed_size is defined.') + if self.force_investment is False: + # TODO: Might be only a warning + raise ValueError('previous_lifetime can only be used if force_investment is True.') + + if self.minimum_or_fixed_lifetime is not None and self.maximum_or_fixed_lifetime is not None: + years = flow_system.years.values + + infeasible_years = [] + for i, inv_year in enumerate(years[:-1]): # Exclude last year + future_years = years[i + 1 :] # All years after investment + min_decomm = self.minimum_or_fixed_lifetime + inv_year + max_decomm = self.maximum_or_fixed_lifetime + inv_year + if max_decomm >= years[-1]: + continue + + # Check if any future year falls in decommissioning window + future_years_da = xr.DataArray(future_years, dims=['year']) + valid_decomm = ((min_decomm <= future_years_da) & (future_years_da <= max_decomm)).any('year') + if not valid_decomm.all(): + infeasible_years.append(inv_year) + + if infeasible_years: + logger.warning( + f'Plausibility Check in {self.__class__.__name__}:\n' + f' Investment years with no feasible decommissioning: {[int(year) for year in infeasible_years]}\n' + f' Consider relaxing the lifetime constraints or including more years into your model.\n' + f' Lifetime:\n' + f' min={self.minimum_or_fixed_lifetime}\n' + f' max={self.maximum_or_fixed_lifetime}\n' + f' Model years: {list(flow_system.years)}\n' + ) + + specify_timing = ( + (self.fixed_lifetime is not None) + + bool((self.force_investment.sum('year') > 1).any()) + + bool((self.force_decommissioning.sum('year') > 1).any()) + ) + + if specify_timing in (0, 3): + # TODO: Is there a valid use case for this? Should this be checked at all? + logger.warning( + 'Either the the lifetime of an investment should be fixed, or the investment or decommissioning ' + 'needs to be forced in a certain year.' + ) + + def transform_data(self, flow_system: 'FlowSystem', name_prefix: str = '') -> None: + """Transform all parameter data to match the flow system's coordinate structure.""" + self.fix_effects = flow_system.fit_effects_to_model_coords( + label_prefix=name_prefix, + effect_values=self.fix_effects, + label_suffix='fix_effects', + dims=['year', 'scenario'], + ) + self.specific_effects = flow_system.fit_effects_to_model_coords( + label_prefix=name_prefix, + effect_values=self.specific_effects, + label_suffix='specific_effects', + dims=['year', 'scenario'], + ) + self.maximum_lifetime = flow_system.fit_to_model_coords( + f'{name_prefix}|maximum_lifetime', self.maximum_lifetime, dims=['scenario'] + ) + self.minimum_lifetime = flow_system.fit_to_model_coords( + f'{name_prefix}|minimum_lifetime', self.minimum_lifetime, dims=['scenario'] + ) + self.fixed_lifetime = flow_system.fit_to_model_coords( + f'{name_prefix}|fixed_lifetime', self.fixed_lifetime, dims=['scenario'] + ) + + self.force_investment = flow_system.fit_to_model_coords( + f'{name_prefix}|force_investment', self.force_investment, dims=['year', 'scenario'] + ) + self.force_decommissioning = flow_system.fit_to_model_coords( + f'{name_prefix}|force_decommissioning', self.force_decommissioning, dims=['year', 'scenario'] + ) + + self.minimum_size = flow_system.fit_to_model_coords( + f'{name_prefix}|minimum_size', self.minimum_size, dims=['year', 'scenario'] + ) + self.maximum_size = flow_system.fit_to_model_coords( + f'{name_prefix}|maximum_size', self.maximum_size, dims=['year', 'scenario'] + ) + if self.fixed_size is not None: + self.fixed_size = flow_system.fit_to_model_coords( + f'{name_prefix}|fixed_size', self.fixed_size, dims=['year', 'scenario'] + ) + + # TODO: self.previous_size to only scenarios + + # No Broadcasting! Until a safe way is established, we need to do check for this! + self.fixed_effects_by_investment_year = flow_system.effects.create_effect_values_dict( + self.fixed_effects_by_investment_year + ) + for effect, da in self.fixed_effects_by_investment_year.items(): + dims = set(da.coords) + if not {'year_of_investment', 'year'}.issubset(dims): + raise ValueError( + f'fixed_effects_by_investment_year need to have a "year_of_investment" dimension and a ' + f'"year" dimension. Got {dims} for effect {effect}' + ) + self.specific_effects_by_investment_year = flow_system.effects.create_effect_values_dict( + self.specific_effects_by_investment_year + ) + for effect, da in self.specific_effects_by_investment_year.items(): + dims = set(da.coords) + if not {'year_of_investment', 'year'}.issubset(dims): + raise ValueError( + f'specific_effects_by_investment_year need to have a "year_of_investment" dimension and a ' + f'"year" dimension. Got {dims} for effect {effect}' + ) + self.fixed_effects_by_investment_year = flow_system.fit_effects_to_model_coords( + label_prefix=name_prefix, + effect_values=self.fixed_effects_by_investment_year, + label_suffix='fixed_effects_by_investment_year', + dims=['year', 'scenario'], + with_year_of_investment=True, + ) + self.specific_effects_by_investment_year = flow_system.fit_effects_to_model_coords( + label_prefix=name_prefix, + effect_values=self.specific_effects_by_investment_year, + label_suffix='specific_effects_by_investment_year', + dims=['year', 'scenario'], + with_year_of_investment=True, + ) + + self._plausibility_checks(flow_system) + + @property + def minimum_or_fixed_size(self) -> NonTemporalData: + """Get the effective minimum size (fixed size takes precedence).""" + return self.fixed_size if self.fixed_size is not None else self.minimum_size + + @property + def maximum_or_fixed_size(self) -> NonTemporalData: + """Get the effective maximum size (fixed size takes precedence).""" + return self.fixed_size if self.fixed_size is not None else self.maximum_size + + @property + def is_fixed_size(self) -> bool: + """Check if investment size is fixed.""" + return self.fixed_size is not None + + @property + def minimum_or_fixed_lifetime(self) -> NonTemporalData: + """Get the effective minimum lifetime (fixed lifetime takes precedence).""" + return self.fixed_lifetime if self.fixed_lifetime is not None else self.minimum_lifetime + + @property + def maximum_or_fixed_lifetime(self) -> NonTemporalData: + """Get the effective maximum lifetime (fixed lifetime takes precedence).""" + return self.fixed_lifetime if self.fixed_lifetime is not None else self.maximum_lifetime + + @register_class_for_io class OnOffParameters(Interface): def __init__( @@ -293,7 +565,7 @@ def __init__( self.switch_on_total_max: Scalar = switch_on_total_max self.force_switch_on: bool = force_switch_on - def transform_data(self, flow_system: 'FlowSystem', name_prefix: str): + def transform_data(self, flow_system: 'FlowSystem', name_prefix: str = '') -> None: self.effects_per_switch_on = flow_system.fit_effects_to_model_coords( name_prefix, self.effects_per_switch_on, 'per_switch_on' ) diff --git a/flixopt/modeling.py b/flixopt/modeling.py index 11880c5e8..c5205b07f 100644 --- a/flixopt/modeling.py +++ b/flixopt/modeling.py @@ -229,6 +229,8 @@ def consecutive_duration_tracking( short_name: str = None, minimum_duration: Optional[TemporalData] = None, maximum_duration: Optional[TemporalData] = None, + duration_dim: str = 'time', + duration_per_step: Union[Scalar, xr.DataArray] = None, previous_duration: TemporalData = 0, ) -> Tuple[linopy.Variable, Tuple[linopy.Constraint, linopy.Constraint, linopy.Constraint]]: """ @@ -236,9 +238,9 @@ def consecutive_duration_tracking( Mathematical formulation: duration[t] ≤ state[t] * M ∀t - duration[t+1] ≤ duration[t] + hours_per_step[t] ∀t - duration[t+1] ≥ duration[t] + hours_per_step[t] + (state[t+1] - 1) * M ∀t - duration[0] = (hours_per_step[0] + previous_duration) * state[0] + duration[t+1] ≤ duration[t] + duration_per_step[t] ∀t + duration[t+1] ≥ duration[t] + duration_per_step[t] + (state[t+1] - 1) * M ∀t + duration[0] = (duration_per_step[0] + previous_duration) * state[0] If minimum_duration provided: duration[t] ≥ (state[t-1] - state[t]) * minimum_duration[t-1] ∀t > 0 @@ -257,14 +259,13 @@ def consecutive_duration_tracking( if not isinstance(model, Submodel): raise ValueError('ModelingPrimitives.sum_up_variable() can only be used with a Submodel') - hours_per_step = model.hours_per_step - mega = hours_per_step.sum('time') + previous_duration # Big-M value + mega = duration_per_step.sum(duration_dim) + previous_duration # Big-M value # Duration variable duration = model.add_variables( lower=0, upper=maximum_duration if maximum_duration is not None else mega, - coords=model.get_coords(), + coords=state_variable.coords, name=name, short_name=short_name, ) @@ -274,25 +275,26 @@ def consecutive_duration_tracking( # Upper bound: duration[t] ≤ state[t] * M constraints['ub'] = model.add_constraints(duration <= state_variable * mega, name=f'{duration.name}|ub') - # Forward constraint: duration[t+1] ≤ duration[t] + hours_per_step[t] + # Forward constraint: duration[t+1] ≤ duration[t] + duration_per_step[t] constraints['forward'] = model.add_constraints( - duration.isel(time=slice(1, None)) - <= duration.isel(time=slice(None, -1)) + hours_per_step.isel(time=slice(None, -1)), + duration.isel({duration_dim: slice(1, None)}) + <= duration.isel({duration_dim: slice(None, -1)}) + duration_per_step.isel({duration_dim: slice(None, -1)}), name=f'{duration.name}|forward', ) - # Backward constraint: duration[t+1] ≥ duration[t] + hours_per_step[t] + (state[t+1] - 1) * M + # Backward constraint: duration[t+1] ≥ duration[t] + duration_per_step[t] + (state[t+1] - 1) * M constraints['backward'] = model.add_constraints( - duration.isel(time=slice(1, None)) - >= duration.isel(time=slice(None, -1)) - + hours_per_step.isel(time=slice(None, -1)) - + (state_variable.isel(time=slice(1, None)) - 1) * mega, + duration.isel({duration_dim: slice(1, None)}) + >= duration.isel({duration_dim: slice(None, -1)}) + + duration_per_step.isel({duration_dim: slice(None, -1)}) + + (state_variable.isel({duration_dim: slice(1, None)}) - 1) * mega, name=f'{duration.name}|backward', ) - # Initial condition: duration[0] = (hours_per_step[0] + previous_duration) * state[0] + # Initial condition: duration[0] = (duration_per_step[0] + previous_duration) * state[0] constraints['initial'] = model.add_constraints( - duration.isel(time=0) == (hours_per_step.isel(time=0) + previous_duration) * state_variable.isel(time=0), + duration.isel({duration_dim: 0}) + == (duration_per_step.isel({duration_dim: 0}) + previous_duration) * state_variable.isel({duration_dim: 0}), name=f'{duration.name}|initial', ) @@ -300,15 +302,18 @@ def consecutive_duration_tracking( if minimum_duration is not None: constraints['lb'] = model.add_constraints( duration - >= (state_variable.isel(time=slice(None, -1)) - state_variable.isel(time=slice(1, None))) - * minimum_duration.isel(time=slice(None, -1)), + >= ( + state_variable.isel({duration_dim: slice(None, -1)}) + - state_variable.isel({duration_dim: slice(1, None)}) + ) + * minimum_duration.isel({duration_dim: slice(None, -1)}), name=f'{duration.name}|lb', ) # Handle initial condition for minimum duration - if previous_duration > 0 and previous_duration < minimum_duration.isel(time=0).max(): + if previous_duration > 0 and previous_duration < minimum_duration.isel({duration_dim: 0}).max(): constraints['initial_lb'] = model.add_constraints( - state_variable.isel(time=0) == 1, name=f'{duration.name}|initial_lb' + state_variable.isel({duration_dim: 0}) == 1, name=f'{duration.name}|initial_lb' ) variables = {'duration': duration} @@ -588,3 +593,146 @@ def state_transition_bounds( mutex = model.add_constraints(switch_on + switch_off <= 1, name=f'{name}|mutex') return transition, initial, mutex + + @staticmethod + def continuous_transition_bounds( + model: Submodel, + continuous_variable: linopy.Variable, + switch_on: linopy.Variable, + switch_off: linopy.Variable, + name: str, + max_change: Union[float, xr.DataArray], + previous_value: Union[float, xr.DataArray] = 0, + coord: str = 'time', + ) -> Tuple[linopy.Constraint, linopy.Constraint, linopy.Constraint, linopy.Constraint]: + """ + Constrains a continuous variable to only change when switch variables are active. + + Mathematical formulation: + -max_change * (switch_on[t] + switch_off[t]) <= continuous[t] - continuous[t-1] <= max_change * (switch_on[t] + switch_off[t]) ∀t > 0 + -max_change * (switch_on[0] + switch_off[0]) <= continuous[0] - previous_value <= max_change * (switch_on[0] + switch_off[0]) + switch_on[t], switch_off[t] ∈ {0, 1} + + This ensures the continuous variable can only change when switch_on or switch_off is 1. + When both switches are 0, the variable must stay exactly constant. + + Args: + model: The submodel to add constraints to + continuous_variable: The continuous variable to constrain + switch_on: Binary variable indicating when changes are allowed (typically transitions to active state) + switch_off: Binary variable indicating when changes are allowed (typically transitions to inactive state) + name: Base name for the constraints + max_change: Maximum possible change in the continuous variable (Big-M value) + previous_value: Initial value of the continuous variable before first period + coord: Coordinate name for time dimension + + Returns: + Tuple of constraints: (transition_upper, transition_lower, initial_upper, initial_lower) + """ + if not isinstance(model, Submodel): + raise ValueError('BoundingPatterns.continuous_transition_bounds() can only be used with a Submodel') + + # Transition constraints for t > 0: continuous variable can only change when switches are active + transition_upper = model.add_constraints( + continuous_variable.isel({coord: slice(1, None)}) - continuous_variable.isel({coord: slice(None, -1)}) + <= max_change * (switch_on.isel({coord: slice(1, None)}) + switch_off.isel({coord: slice(1, None)})), + name=f'{name}|transition_ub', + ) + + transition_lower = model.add_constraints( + continuous_variable.isel({coord: slice(None, -1)}) - continuous_variable.isel({coord: slice(1, None)}) + <= max_change * (switch_on.isel({coord: slice(1, None)}) + switch_off.isel({coord: slice(1, None)})), + name=f'{name}|transition_lb', + ) + + # Initial constraints for t = 0 + initial_upper = model.add_constraints( + continuous_variable.isel({coord: 0}) - previous_value + <= max_change * (switch_on.isel({coord: 0}) + switch_off.isel({coord: 0})), + name=f'{name}|initial_ub', + ) + + initial_lower = model.add_constraints( + -continuous_variable.isel({coord: 0}) + previous_value + <= max_change * (switch_on.isel({coord: 0}) + switch_off.isel({coord: 0})), + name=f'{name}|initial_lb', + ) + + return transition_upper, transition_lower, initial_upper, initial_lower + + @staticmethod + def link_changes_to_level_with_binaries( + model: Submodel, + level_variable: linopy.Variable, + increase_variable: linopy.Variable, + decrease_variable: linopy.Variable, + increase_binary: linopy.Variable, + decrease_binary: linopy.Variable, + name: str, + max_change: Union[float, xr.DataArray], + initial_level: Union[float, xr.DataArray] = 0, + coord: str = 'year', + ) -> Tuple[linopy.Constraint, linopy.Constraint, linopy.Constraint, linopy.Constraint, linopy.Constraint]: + """ + Link changes to level evolution with binary control and mutual exclusivity. + + Creates the complete constraint system for ALL time periods: + 1. level[0] = initial_level + increase[0] - decrease[0] + 2. level[t] = level[t-1] + increase[t] - decrease[t] ∀t > 0 + 3. increase[t] <= max_change * increase_binary[t] ∀t + 4. decrease[t] <= max_change * decrease_binary[t] ∀t + 5. increase_binary[t] + decrease_binary[t] <= 1 ∀t + + Args: + model: The submodel to add constraints to + increase_variable: Incremental additions for ALL periods (>= 0) + decrease_variable: Incremental reductions for ALL periods (>= 0) + increase_binary: Binary indicators for increases for ALL periods + decrease_binary: Binary indicators for decreases for ALL periods + level_variable: Level variable for ALL periods + name: Base name for constraints + max_change: Maximum change per period + initial_level: Starting level before first period + coord: Time coordinate name + + Returns: + Tuple of (initial_constraint, transition_constraints, increase_bounds, decrease_bounds, mutual_exclusion) + """ + if not isinstance(model, Submodel): + raise ValueError('BoundingPatterns.link_changes_to_level_with_binaries() can only be used with a Submodel') + + # 1. Initial period: level[0] - initial_level = increase[0] - decrease[0] + initial_constraint = model.add_constraints( + level_variable.isel({coord: 0}) - initial_level + == increase_variable.isel({coord: 0}) - decrease_variable.isel({coord: 0}), + name=f'{name}|initial_level', + ) + + # 2. Transition periods: level[t] = level[t-1] + increase[t] - decrease[t] for t > 0 + transition_constraints = model.add_constraints( + level_variable.isel({coord: slice(1, None)}) + == level_variable.isel({coord: slice(None, -1)}) + + increase_variable.isel({coord: slice(1, None)}) + - decrease_variable.isel({coord: slice(1, None)}), + name=f'{name}|transitions', + ) + + # 3. Increase bounds: increase[t] <= max_change * increase_binary[t] for all t + increase_bounds = model.add_constraints( + increase_variable <= increase_binary * max_change, + name=f'{name}|increase_bounds', + ) + + # 4. Decrease bounds: decrease[t] <= max_change * decrease_binary[t] for all t + decrease_bounds = model.add_constraints( + decrease_variable <= decrease_binary * max_change, + name=f'{name}|decrease_bounds', + ) + + # 5. Mutual exclusivity: increase_binary[t] + decrease_binary[t] <= 1 for all t + mutual_exclusion = model.add_constraints( + increase_binary + decrease_binary <= 1, + name=f'{name}|mutual_exclusion', + ) + + return initial_constraint, transition_constraints, increase_bounds, decrease_bounds, mutual_exclusion diff --git a/flixopt/structure.py b/flixopt/structure.py index 24934547b..9fdc9206f 100644 --- a/flixopt/structure.py +++ b/flixopt/structure.py @@ -178,7 +178,11 @@ def get_coords( def weights(self) -> Union[int, xr.DataArray]: """Returns the scenario weights of the FlowSystem. If None, return weights that are normalized to 1 (one)""" if self.flow_system.weights is None: - weights = self.flow_system.fit_to_model_coords('weights', 1, dims=['year', 'scenario']) + weights = self.flow_system.fit_to_model_coords( + 'weights', + 1 if self.flow_system.years is None else self.flow_system.years_per_year, + dims=['year', 'scenario'], + ) return weights / weights.sum() @@ -225,11 +229,12 @@ class Interface: transform_data(flow_system): Transform data to match FlowSystem dimensions """ - def transform_data(self, flow_system: 'FlowSystem'): + def transform_data(self, flow_system: 'FlowSystem', name_prefix: str = '') -> None: """Transform the data of the interface to match the FlowSystem's dimensions. Args: flow_system: The FlowSystem containing timing and dimensional information + name_prefix: The prefix to use for the names of the variables. Defaults to '', which results in no prefix. Raises: NotImplementedError: Must be implemented by subclasses diff --git a/tests/test_models.py b/tests/test_models.py new file mode 100644 index 000000000..7545371b5 --- /dev/null +++ b/tests/test_models.py @@ -0,0 +1,231 @@ +from typing import Union + +import linopy +import numpy as np +import pandas as pd +import pytest +import xarray as xr + +import flixopt as fx + +from .conftest import ( + Buses, + Effects, + LoadProfiles, + Sinks, + Sources, + assert_conequal, + assert_sets_equal, + assert_var_equal, + create_linopy_model, +) + + +def calculate_annual_payment(total_cost: float, remaining_years: int, discount_rate: float) -> float: + """Calculate annualized payment for given remaining years. + + Args: + total_cost: Total cost to be annualized. + remaining_years: Number of remaining years. + discount_rate: Discount rate for annualization. + + Returns: + Annual payment amount. + """ + if remaining_years == 1: + return total_cost + + return ( + total_cost + * (discount_rate * (1 + discount_rate) ** remaining_years) + / ((1 + discount_rate) ** remaining_years - 1) + ) + + +def create_annualized_effects( + year_of_investments: Union[range, list, pd.Index], + all_years: Union[range, list, pd.Index], + total_cost: float, + discount_rate: float, + horizon_end: int, + extra_dim: str = 'year_of_investment', +) -> xr.DataArray: + """Create a 2D effects array for annualized costs. + + Creates an array where investing in year Y results in annualized costs + applied to years Y through horizon_end. + + Args: + year_of_investments: Years when investment decisions can be made. + all_years: All years in the model (for the 'year' dimension). + total_cost: Total upfront cost to be annualized. + discount_rate: Discount rate for annualization calculation. + horizon_end: Last year when effects apply. + extra_dim: Name for the investment year dimension. + + Returns: + xr.DataArray with dimensions [extra_dim, 'year'] containing annualized costs. + """ + + # Convert to lists for easier iteration + year_of_investments_list = list(year_of_investments) + all_years_list = list(all_years) + + # Initialize cost matrix + cost_matrix = np.zeros((len(year_of_investments_list), len(all_years_list))) + + # Fill matrix with annualized costs + for i, year_of_investment in enumerate(year_of_investments_list): + remaining_years = horizon_end - year_of_investment + 1 + if remaining_years > 0: + annual_cost = calculate_annual_payment(total_cost, remaining_years, discount_rate) + + # Apply cost to years from year_of_investment through horizon_end + for j, cost_year in enumerate(all_years_list): + if year_of_investment <= cost_year <= horizon_end: + cost_matrix[i, j] = annual_cost + + return xr.DataArray( + cost_matrix, coords={extra_dim: year_of_investments_list, 'year': all_years_list}, dims=[extra_dim, 'year'] + ) + + +@pytest.fixture +def flow_system() -> fx.FlowSystem: + """Create basic elements for component testing with coordinate parametrization.""" + years = pd.Index([2020, 2021, 2022, 2023, 2024, 2026, 2028, 2030], name='year') + timesteps = pd.date_range('2020-01-01', periods=24, freq='h', name='time') + flow_system = fx.FlowSystem(timesteps=timesteps, years=years) + + thermal_load = LoadProfiles.random_thermal(len(timesteps)) + p_el = LoadProfiles.random_electrical(len(timesteps)) + + costs = Effects.costs() + heat_load = Sinks.heat_load(thermal_load) + gas_source = Sources.gas_with_costs() + electricity_sink = Sinks.electricity_feed_in(p_el) + + flow_system.add_elements(*Buses.defaults()) + flow_system.buses['Fernwärme'].excess_penalty_per_flow_hour = 0 + flow_system.add_elements(costs, heat_load, gas_source, electricity_sink) + + return flow_system + + +class TestYearAwareInvestParameters: + """Test the YearAwareInvestParameters interface.""" + + def test_basic_initialization(self): + """Test basic parameter initialization.""" + params = fx.YearAwareInvestParameters( + minimum_size=10, + maximum_size=100, + ) + + assert params.minimum_size == 10 + assert params.maximum_size == 100 + assert params.fixed_size is None + assert not params.allow_divestment + assert params.fixed_year_of_investment is None + assert params.fixed_year_of_decommissioning is None + assert params.fixed_duration is None + + def test_fixed_size_initialization(self): + """Test initialization with fixed size.""" + params = fx.YearAwareInvestParameters(fixed_size=50) + + assert params.minimum_or_fixed_size == 50 + assert params.maximum_or_fixed_size == 50 + assert params.is_fixed_size + + def test_timing_constraints_initialization(self): + """Test initialization with various timing constraints.""" + params = fx.YearAwareInvestParameters( + fixed_year_of_investment=2, + minimum_duration=3, + maximum_duration=5, + earliest_year_of_decommissioning=4, + ) + + assert params.fixed_year_of_investment == 2 + assert params.minimum_duration == 3 + assert params.maximum_duration == 5 + assert params.earliest_year_of_decommissioning == 4 + + def test_effects_initialization(self): + """Test initialization with effects.""" + params = fx.YearAwareInvestParameters( + effects_of_investment={'costs': 1000}, + effects_of_investment_per_size={'costs': 100}, + allow_divestment=True, + effects_of_divestment={'costs': 500}, + effects_of_divestment_per_size={'costs': 50}, + ) + + assert params.effects_of_investment == {'costs': 1000} + assert params.effects_of_investment_per_size == {'costs': 100} + assert params.allow_divestment + assert params.effects_of_divestment == {'costs': 500} + assert params.effects_of_divestment_per_size == {'costs': 50} + + def test_property_methods(self): + """Test property methods.""" + # Test with fixed size + params_fixed = fx.YearAwareInvestParameters(fixed_size=50) + assert params_fixed.minimum_or_fixed_size == 50 + assert params_fixed.maximum_or_fixed_size == 50 + assert params_fixed.is_fixed_size + + # Test with min/max size + params_range = fx.YearAwareInvestParameters(minimum_size=10, maximum_size=100) + assert params_range.minimum_or_fixed_size == 10 + assert params_range.maximum_or_fixed_size == 100 + assert not params_range.is_fixed_size + + +class TestYearAwareInvestmentModelDirect: + """Test the YearAwareInvestmentModel class directly with linopy.""" + + def test_flow_invest_new(self, flow_system): + da = xr.DataArray( + [10] * 8, + coords=(flow_system.years_of_investment,), + ).expand_dims(year=flow_system.years) + da = da.where(da.year == da.year_of_investment).fillna(0) + + flow = fx.Flow( + 'Wärme', + bus='Fernwärme', + size=fx.InvestTimingParameters( + # year_of_decommissioning=2030, + minimum_lifetime=2, + maximum_lifetime=3, + minimum_size=9, + maximum_size=10, + specific_effects=xr.DataArray( + [25, 30, 35, 40, 45, 50, 55, 60], + coords=(flow_system.years,), + ) + * -0, + # fix_effects=-2e3, + specific_effects_by_investment_year=-1 * da, + ), + relative_maximum=np.linspace(0.5, 1, flow_system.timesteps.size), + ) + + flow_system.add_elements(fx.Source('Source', source=flow)) + calculation = fx.FullCalculation('GenericName', flow_system) + calculation.do_modeling() + # calculation.model.add_constraints(calculation.model['Source(Wärme)|is_invested'].sel(year=2022) == 1) + calculation.solve(fx.solvers.GurobiSolver(0, 60)) + + ds = calculation.results['Source'].solution + filtered_ds_year = ds[[v for v in ds.data_vars if ds[v].dims == ('year',)]] + print(filtered_ds_year.round(0).to_pandas().T) + + filtered_ds_scalar = ds[[v for v in ds.data_vars if ds[v].dims == tuple()]] + print(filtered_ds_scalar.round(0).to_pandas().T) + + print(calculation.results.solution['costs(invest)|total'].to_pandas()) + + print('##')