Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ maintainers = [{name = "Vadim Kozyrevskiy", email = "vadikko2@mail.ru"}]
name = "python-cqrs"
readme = "README.md"
requires-python = ">=3.10"
version = "4.7.0"
version = "4.7.1"

[project.optional-dependencies]
aiobreaker = ["aiobreaker>=0.3.0"]
Expand Down
8 changes: 8 additions & 0 deletions src/cqrs/saga/compensation.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ async def compensate_steps(
"""
await self._storage.update_status(self._saga_id, SagaStatus.COMPENSATING)

if not completed_steps:
logger.info(
f"Saga {self._saga_id}: completed_steps is empty, "
"skipping compensation (no step.compensate() will be called).",
)
await self._storage.update_status(self._saga_id, SagaStatus.FAILED)
return

# Load history to skip already compensated steps
history = await self._storage.get_step_history(self._saga_id)
compensated_steps = {
Expand Down
8 changes: 8 additions & 0 deletions src/cqrs/saga/saga.py
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,14 @@ async def __aiter__(
for step in reconstructed_steps
]

if not self._completed_steps:
logger.warning(
f"Saga {self._saga_id}: no completed steps to compensate "
"(saga failed before any step finished 'act', or step names in "
"storage do not match saga step class names). "
"Marking as FAILED without calling compensate().",
)

# Immediately proceed to compensation - no forward execution
await self._compensate()

Expand Down
2 changes: 1 addition & 1 deletion src/cqrs/saga/storage/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ async def get_sagas_for_recovery(
max_recovery_attempts: int = 5,
stale_after_seconds: int | None = None,
) -> list[uuid.UUID]:
recoverable = (SagaStatus.RUNNING, SagaStatus.COMPENSATING, SagaStatus.FAILED)
recoverable = (SagaStatus.RUNNING, SagaStatus.COMPENSATING)
now = datetime.datetime.now(datetime.timezone.utc)
threshold = (
(now - datetime.timedelta(seconds=stale_after_seconds))
Expand Down
7 changes: 4 additions & 3 deletions src/cqrs/saga/storage/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,10 @@ async def get_sagas_for_recovery(
updated). None means no staleness filter (backward compatible).

Returns:
List of saga IDs (RUNNING, COMPENSATING, or FAILED), ordered by
updated_at ascending, with recovery_attempts < max_recovery_attempts,
and optionally updated_at older than the staleness threshold.
List of saga IDs (RUNNING or COMPENSATING only; FAILED sagas are
not included), ordered by updated_at ascending, with
recovery_attempts < max_recovery_attempts, and optionally
updated_at older than the staleness threshold.
"""

@abc.abstractmethod
Expand Down
1 change: 0 additions & 1 deletion src/cqrs/saga/storage/sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,6 @@ async def get_sagas_for_recovery(
recoverable = (
SagaStatus.RUNNING,
SagaStatus.COMPENSATING,
SagaStatus.FAILED,
)
async with self.session_factory() as session:
stmt = (
Expand Down
7 changes: 4 additions & 3 deletions tests/integration/test_saga_storage_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ async def test_get_sagas_for_recovery_returns_recoverable_sagas(
storage: MemorySagaStorage,
test_context: dict[str, str],
) -> None:
"""Positive: returns RUNNING, COMPENSATING, FAILED sagas only."""
"""Positive: returns RUNNING and COMPENSATING sagas only; FAILED excluded."""
id1, id2, id3 = uuid.uuid4(), uuid.uuid4(), uuid.uuid4()
for sid in (id1, id2, id3):
await storage.create_saga(saga_id=sid, name="saga", context=test_context)
Expand All @@ -181,8 +181,9 @@ async def test_get_sagas_for_recovery_returns_recoverable_sagas(
await storage.update_status(id3, SagaStatus.FAILED)

ids = await storage.get_sagas_for_recovery(limit=10)
assert set(ids) == {id1, id2, id3}
assert len(ids) == 3
assert set(ids) == {id1, id2}
assert id3 not in ids
assert len(ids) == 2

async def test_get_sagas_for_recovery_respects_limit(
self,
Expand Down
7 changes: 4 additions & 3 deletions tests/integration/test_saga_storage_sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ async def test_get_sagas_for_recovery_returns_recoverable_sagas(
storage: SqlAlchemySagaStorage,
test_context: dict[str, str],
) -> None:
"""Positive: returns RUNNING, COMPENSATING, FAILED sagas only."""
"""Positive: returns RUNNING and COMPENSATING sagas only; FAILED excluded."""
id1, id2, id3 = uuid.uuid4(), uuid.uuid4(), uuid.uuid4()
for sid in (id1, id2, id3):
await storage.create_saga(saga_id=sid, name="saga", context=test_context)
Expand All @@ -306,8 +306,9 @@ async def test_get_sagas_for_recovery_returns_recoverable_sagas(
await storage.update_status(id3, SagaStatus.FAILED)

ids = await storage.get_sagas_for_recovery(limit=10)
assert set(ids) == {id1, id2, id3}
assert len(ids) == 3
assert set(ids) == {id1, id2}
assert id3 not in ids
assert len(ids) == 2

async def test_get_sagas_for_recovery_respects_limit(
self,
Expand Down