diff --git a/pyproject.toml b/pyproject.toml index 61b1944..56f53ad 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,7 +31,7 @@ maintainers = [{name = "Vadim Kozyrevskiy", email = "vadikko2@mail.ru"}] name = "python-cqrs" readme = "README.md" requires-python = ">=3.10" -version = "4.7.0" +version = "4.7.1" [project.optional-dependencies] aiobreaker = ["aiobreaker>=0.3.0"] diff --git a/src/cqrs/saga/compensation.py b/src/cqrs/saga/compensation.py index c058cc4..8c7f194 100644 --- a/src/cqrs/saga/compensation.py +++ b/src/cqrs/saga/compensation.py @@ -54,6 +54,14 @@ async def compensate_steps( """ await self._storage.update_status(self._saga_id, SagaStatus.COMPENSATING) + if not completed_steps: + logger.info( + f"Saga {self._saga_id}: completed_steps is empty, " + "skipping compensation (no step.compensate() will be called).", + ) + await self._storage.update_status(self._saga_id, SagaStatus.FAILED) + return + # Load history to skip already compensated steps history = await self._storage.get_step_history(self._saga_id) compensated_steps = { diff --git a/src/cqrs/saga/saga.py b/src/cqrs/saga/saga.py index acf0401..9650439 100644 --- a/src/cqrs/saga/saga.py +++ b/src/cqrs/saga/saga.py @@ -234,6 +234,14 @@ async def __aiter__( for step in reconstructed_steps ] + if not self._completed_steps: + logger.warning( + f"Saga {self._saga_id}: no completed steps to compensate " + "(saga failed before any step finished 'act', or step names in " + "storage do not match saga step class names). " + "Marking as FAILED without calling compensate().", + ) + # Immediately proceed to compensation - no forward execution await self._compensate() diff --git a/src/cqrs/saga/storage/memory.py b/src/cqrs/saga/storage/memory.py index adc0c40..d89ba6f 100644 --- a/src/cqrs/saga/storage/memory.py +++ b/src/cqrs/saga/storage/memory.py @@ -123,7 +123,7 @@ async def get_sagas_for_recovery( max_recovery_attempts: int = 5, stale_after_seconds: int | None = None, ) -> list[uuid.UUID]: - recoverable = (SagaStatus.RUNNING, SagaStatus.COMPENSATING, SagaStatus.FAILED) + recoverable = (SagaStatus.RUNNING, SagaStatus.COMPENSATING) now = datetime.datetime.now(datetime.timezone.utc) threshold = ( (now - datetime.timedelta(seconds=stale_after_seconds)) diff --git a/src/cqrs/saga/storage/protocol.py b/src/cqrs/saga/storage/protocol.py index 74c6157..1ebf80c 100644 --- a/src/cqrs/saga/storage/protocol.py +++ b/src/cqrs/saga/storage/protocol.py @@ -90,9 +90,10 @@ async def get_sagas_for_recovery( updated). None means no staleness filter (backward compatible). Returns: - List of saga IDs (RUNNING, COMPENSATING, or FAILED), ordered by - updated_at ascending, with recovery_attempts < max_recovery_attempts, - and optionally updated_at older than the staleness threshold. + List of saga IDs (RUNNING or COMPENSATING only; FAILED sagas are + not included), ordered by updated_at ascending, with + recovery_attempts < max_recovery_attempts, and optionally + updated_at older than the staleness threshold. """ @abc.abstractmethod diff --git a/src/cqrs/saga/storage/sqlalchemy.py b/src/cqrs/saga/storage/sqlalchemy.py index a4da774..cdc6104 100644 --- a/src/cqrs/saga/storage/sqlalchemy.py +++ b/src/cqrs/saga/storage/sqlalchemy.py @@ -321,7 +321,6 @@ async def get_sagas_for_recovery( recoverable = ( SagaStatus.RUNNING, SagaStatus.COMPENSATING, - SagaStatus.FAILED, ) async with self.session_factory() as session: stmt = ( diff --git a/tests/integration/test_saga_storage_memory.py b/tests/integration/test_saga_storage_memory.py index 2aadc44..1c5115c 100644 --- a/tests/integration/test_saga_storage_memory.py +++ b/tests/integration/test_saga_storage_memory.py @@ -172,7 +172,7 @@ async def test_get_sagas_for_recovery_returns_recoverable_sagas( storage: MemorySagaStorage, test_context: dict[str, str], ) -> None: - """Positive: returns RUNNING, COMPENSATING, FAILED sagas only.""" + """Positive: returns RUNNING and COMPENSATING sagas only; FAILED excluded.""" id1, id2, id3 = uuid.uuid4(), uuid.uuid4(), uuid.uuid4() for sid in (id1, id2, id3): await storage.create_saga(saga_id=sid, name="saga", context=test_context) @@ -181,8 +181,9 @@ async def test_get_sagas_for_recovery_returns_recoverable_sagas( await storage.update_status(id3, SagaStatus.FAILED) ids = await storage.get_sagas_for_recovery(limit=10) - assert set(ids) == {id1, id2, id3} - assert len(ids) == 3 + assert set(ids) == {id1, id2} + assert id3 not in ids + assert len(ids) == 2 async def test_get_sagas_for_recovery_respects_limit( self, diff --git a/tests/integration/test_saga_storage_sqlalchemy.py b/tests/integration/test_saga_storage_sqlalchemy.py index 124a6ec..0c8d3a4 100644 --- a/tests/integration/test_saga_storage_sqlalchemy.py +++ b/tests/integration/test_saga_storage_sqlalchemy.py @@ -297,7 +297,7 @@ async def test_get_sagas_for_recovery_returns_recoverable_sagas( storage: SqlAlchemySagaStorage, test_context: dict[str, str], ) -> None: - """Positive: returns RUNNING, COMPENSATING, FAILED sagas only.""" + """Positive: returns RUNNING and COMPENSATING sagas only; FAILED excluded.""" id1, id2, id3 = uuid.uuid4(), uuid.uuid4(), uuid.uuid4() for sid in (id1, id2, id3): await storage.create_saga(saga_id=sid, name="saga", context=test_context) @@ -306,8 +306,9 @@ async def test_get_sagas_for_recovery_returns_recoverable_sagas( await storage.update_status(id3, SagaStatus.FAILED) ids = await storage.get_sagas_for_recovery(limit=10) - assert set(ids) == {id1, id2, id3} - assert len(ids) == 3 + assert set(ids) == {id1, id2} + assert id3 not in ids + assert len(ids) == 2 async def test_get_sagas_for_recovery_respects_limit( self,