Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ maintainers = [{name = "Vadim Kozyrevskiy", email = "vadikko2@mail.ru"}]
name = "python-cqrs"
readme = "README.md"
requires-python = ">=3.10"
version = "4.7.1"
version = "4.7.2"

[project.optional-dependencies]
aiobreaker = ["aiobreaker>=0.3.0"]
Expand Down
14 changes: 14 additions & 0 deletions src/cqrs/saga/storage/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ async def get_sagas_for_recovery(
limit: int,
max_recovery_attempts: int = 5,
stale_after_seconds: int | None = None,
saga_name: str | None = None,
) -> list[uuid.UUID]:
recoverable = (SagaStatus.RUNNING, SagaStatus.COMPENSATING)
now = datetime.datetime.now(datetime.timezone.utc)
Expand All @@ -136,6 +137,7 @@ async def get_sagas_for_recovery(
if data["status"] in recoverable
and data.get("recovery_attempts", 0) < max_recovery_attempts
and (threshold is None or data["updated_at"] < threshold)
and (saga_name is None or data["name"] == saga_name)
]
candidates.sort(key=lambda sid: self._sagas[sid]["updated_at"])
return candidates[:limit]
Expand All @@ -153,3 +155,15 @@ async def increment_recovery_attempts(
data["version"] += 1
if new_status is not None:
data["status"] = new_status

async def set_recovery_attempts(
    self,
    saga_id: uuid.UUID,
    attempts: int,
) -> None:
    """Overwrite the recovery attempt counter of a stored saga.

    Besides replacing ``recovery_attempts``, the record's ``updated_at`` is
    refreshed to the current UTC time and ``version`` is incremented by one.

    Args:
        saga_id: Key of the saga record in the in-memory store.
        attempts: Value to store as ``recovery_attempts``.

    Raises:
        ValueError: If no saga with ``saga_id`` is stored.
    """
    # Single lookup instead of a membership test followed by indexing.
    data = self._sagas.get(saga_id)
    if data is None:
        raise ValueError(f"Saga {saga_id} not found")
    data["recovery_attempts"] = attempts
    data["updated_at"] = datetime.datetime.now(datetime.timezone.utc)
    data["version"] += 1
143 changes: 123 additions & 20 deletions src/cqrs/saga/storage/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,14 @@


class ISagaStorage(abc.ABC):
"""Interface for saga persistence storage."""
"""Interface for saga persistence storage.

Storage is responsible for persisting saga execution state so that:
- Saga progress (status, context, step history) survives process restarts.
- Recovery jobs can find interrupted sagas (RUNNING/COMPENSATING) and retry them.
- Optimistic locking (version) prevents lost updates when multiple workers
touch the same saga.
"""

@abc.abstractmethod
async def create_saga(
Expand All @@ -16,7 +23,16 @@ async def create_saga(
name: str,
context: dict[str, typing.Any],
) -> None:
"""Initialize a new saga in storage."""
"""Create a new saga record in storage (initial state).

Called when a saga is started for the first time. Creates the execution
record with PENDING status, initial context, and version 1.

Args:
saga_id: Unique identifier of the saga (used as primary key).
name: Saga name (e.g. handler/type name) for diagnostics and filtering.
context: Initial context as a JSON-serializable dict (step inputs/outputs).
"""

@abc.abstractmethod
async def update_context(
Expand All @@ -25,15 +41,22 @@ async def update_context(
context: dict[str, typing.Any],
current_version: int | None = None,
) -> None:
"""Save saga context snapshot.
"""Save saga context snapshot (e.g. after a step completes).

Persists the current context so recovery can resume with up-to-date data.
When current_version is provided, implements optimistic locking: update
succeeds only if the stored version equals current_version (and version
is incremented), otherwise a concurrent update is detected.

Args:
saga_id: The ID of the saga to update.
context: The new context data.
context: The new context data (full snapshot, JSON-serializable).
current_version: The expected current version of the saga execution.
If provided, optimistic locking will be used.
If provided, optimistic locking is used; if the stored version
differs, the update is rejected.

Raises:
SagaConcurrencyError: If optimistic locking fails.
SagaConcurrencyError: If optimistic locking fails (version mismatch).
"""

@abc.abstractmethod
Expand All @@ -42,7 +65,17 @@ async def update_status(
saga_id: uuid.UUID,
status: SagaStatus,
) -> None:
"""Update saga global status."""
"""Update the saga's global status.

Status drives lifecycle: PENDING → RUNNING → COMPLETED, or RUNNING →
COMPENSATING → FAILED. Used by execution and recovery to know whether
to run steps, compensate, or consider the saga finished.

Args:
saga_id: The ID of the saga to update.
status: New status (e.g. SagaStatus.RUNNING, SagaStatus.COMPLETED,
SagaStatus.COMPENSATING, SagaStatus.FAILED).
"""

@abc.abstractmethod
async def log_step(
Expand All @@ -53,7 +86,19 @@ async def log_step(
status: SagaStepStatus,
details: str | None = None,
) -> None:
"""Log a step transition."""
"""Append a step transition to the saga log.

Used to record each step's outcome (started/completed/failed/compensated)
so that recovery can determine which steps have already been executed
and which need to be run or compensated.

Args:
saga_id: The ID of the saga this step belongs to.
step_name: Name of the step (must match the step handler name).
action: "act" for forward execution, "compensate" for compensation.
status: Step outcome: STARTED, COMPLETED, FAILED, or COMPENSATED.
details: Optional message (e.g. error text when status is FAILED).
"""

@abc.abstractmethod
async def load_saga_state(
Expand All @@ -62,38 +107,75 @@ async def load_saga_state(
*,
read_for_update: bool = False,
) -> tuple[SagaStatus, dict[str, typing.Any], int]:
"""Load current saga status, context, and version."""
"""Load current saga status, context, and version.

Used by execution and recovery to restore in-memory state. When
read_for_update is True, the implementation may lock the row (e.g.
SELECT FOR UPDATE) to avoid concurrent updates.

Args:
saga_id: The ID of the saga to load.
read_for_update: If True, lock the row for update (e.g. for
subsequent update_context with optimistic locking).

Returns:
Tuple of (status, context_dict, version). context_dict is the
last persisted context; version is used for optimistic locking.
"""

@abc.abstractmethod
async def get_step_history(
self,
saga_id: uuid.UUID,
) -> list[SagaLogEntry]:
"""Get step execution history."""
"""Return the ordered list of step log entries for the saga.

Used by recovery to determine which steps completed successfully
(and must be compensated in reverse order if compensating) and
which steps still need to be executed.

Args:
saga_id: The ID of the saga whose step history to load.

Returns:
List of SagaLogEntry in chronological order (oldest first).
"""

@abc.abstractmethod
async def get_sagas_for_recovery(
self,
limit: int,
max_recovery_attempts: int = 5,
stale_after_seconds: int | None = None,
saga_name: str | None = None,
) -> list[uuid.UUID]:
"""Return saga IDs that need recovery.
"""Return saga IDs that are candidates for recovery.

Used by a recovery job/scheduler to find sagas that were left in
RUNNING or COMPENSATING (e.g. process crash) and should be retried.
Excludes COMPLETED and optionally limits by recovery attempts,
staleness, and saga name to avoid re-processing fresh or repeatedly
failing sagas.

Args:
limit: Maximum number of saga IDs to return.
limit: Maximum number of saga IDs to return per call.
max_recovery_attempts: Only include sagas with recovery_attempts
strictly less than this value. Default 5.
strictly less than this value. Sagas that have failed
recovery this many times can be excluded (e.g. marked FAILED).
Default 5.
stale_after_seconds: If set, only include sagas whose updated_at
is older than (now_utc - stale_after_seconds). Use this to
avoid picking sagas that are currently being executed (recently
updated). None means no staleness filter (backward compatible).
saga_name: If set, only include sagas with this name (e.g. handler
or type name). None means return all saga types (default).

Returns:
List of saga IDs (RUNNING or COMPENSATING only; FAILED sagas are
not included), ordered by updated_at ascending, with
List of saga IDs (RUNNING or COMPENSATING only; FAILED/COMPLETED
are not included), ordered by updated_at ascending, with
recovery_attempts < max_recovery_attempts, and optionally
updated_at older than the staleness threshold.
updated_at older than the staleness threshold and name equal to
saga_name when saga_name is provided.
"""

@abc.abstractmethod
Expand All @@ -102,12 +184,33 @@ async def increment_recovery_attempts(
saga_id: uuid.UUID,
new_status: SagaStatus | None = None,
) -> None:
"""Atomically increment recovery attempts after a failed recovery.
"""Increment recovery attempt counter after a failed recovery run.

Called when recovery of a saga fails (e.g. exception). Increments
recovery_attempts and optionally sets status (e.g. to FAILED) so that
get_sagas_for_recovery can exclude this saga or limit retries.

Args:
saga_id: The saga that failed recovery.
new_status: If provided, set saga status to this value (e.g.
SagaStatus.FAILED) in the same atomic update.
"""

@abc.abstractmethod
async def set_recovery_attempts(
    self,
    saga_id: uuid.UUID,
    attempts: int,
) -> None:
    """Set recovery attempt counter to an explicit value.

    Overwrites recovery_attempts with ``attempts`` (it does not increment
    the existing value) and does not take a status argument.
    Used to reset the counter after successfully recovering one of the
    steps (e.g. set to 0), or to set it to the maximum value so that
    get_sagas_for_recovery excludes this saga from further recovery
    (e.g. mark as permanently failed without changing status).

    Args:
        saga_id: The saga to update.
        attempts: The value to set recovery_attempts to (e.g. 0 to reset,
            or max_recovery_attempts to exclude from recovery).

    Raises:
        ValueError: The in-memory and SQLAlchemy implementations visible
            in this change raise it when the saga does not exist.
    """
25 changes: 25 additions & 0 deletions src/cqrs/saga/storage/sqlalchemy.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,7 @@ async def get_sagas_for_recovery(
limit: int,
max_recovery_attempts: int = 5,
stale_after_seconds: int | None = None,
saga_name: str | None = None,
) -> list[uuid.UUID]:
recoverable = (
SagaStatus.RUNNING,
Expand All @@ -328,6 +329,8 @@ async def get_sagas_for_recovery(
.where(SagaExecutionModel.status.in_(recoverable))
.where(SagaExecutionModel.recovery_attempts < max_recovery_attempts)
)
if saga_name is not None:
stmt = stmt.where(SagaExecutionModel.name == saga_name)
if stale_after_seconds is not None:
threshold = datetime.datetime.now(
datetime.timezone.utc,
Expand Down Expand Up @@ -364,3 +367,25 @@ async def increment_recovery_attempts(
except SQLAlchemyError:
await session.rollback()
raise

async def set_recovery_attempts(
    self,
    saga_id: uuid.UUID,
    attempts: int,
) -> None:
    """Persist an explicit recovery attempt count for one saga row.

    Issues a single UPDATE that stores ``attempts`` in ``recovery_attempts``
    and increments ``version`` by one, then commits.

    Args:
        saga_id: Primary key of the saga execution row to update.
        attempts: Value to store as ``recovery_attempts``.

    Raises:
        ValueError: If the UPDATE matched no row (saga not found).
        SQLAlchemyError: Re-raised after rolling the session back.
    """
    # NOTE(review): updated_at is not set explicitly here, unlike the
    # in-memory storage; presumably the model column refreshes it on
    # update. Confirm against SagaExecutionModel.
    update_stmt = (
        sqlalchemy.update(SagaExecutionModel)
        .where(SagaExecutionModel.id == saga_id)
        .values(
            recovery_attempts=attempts,
            version=SagaExecutionModel.version + 1,
        )
    )
    async with self.session_factory() as session:
        try:
            outcome = await session.execute(update_stmt)
            if outcome.rowcount == 0:  # type: ignore[attr-defined]
                raise ValueError(f"Saga {saga_id} not found")
            await session.commit()
        except SQLAlchemyError:
            await session.rollback()
            raise
Loading