From 4aa1c2d58d9da33863c50033b1e6d6f6d08de7b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D0=B0=D0=B4=D0=B8=D0=BC=20=D0=9A=D0=BE=D0=B7=D1=8B?= =?UTF-8?q?=D1=80=D0=B5=D0=B2=D1=81=D0=BA=D0=B8=D0=B9?= Date: Wed, 28 Jan 2026 18:10:51 +0300 Subject: [PATCH 1/3] Extend ISagaStorage interface --- pyproject.toml | 2 +- src/cqrs/saga/storage/memory.py | 14 ++ src/cqrs/saga/storage/protocol.py | 143 +++++++++++++++--- src/cqrs/saga/storage/sqlalchemy.py | 25 +++ tests/integration/test_saga_storage_memory.py | 89 +++++++++++ .../test_saga_storage_sqlalchemy.py | 80 ++++++++++ 6 files changed, 332 insertions(+), 21 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 56f53ad..b8a7774 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,7 +31,7 @@ maintainers = [{name = "Vadim Kozyrevskiy", email = "vadikko2@mail.ru"}] name = "python-cqrs" readme = "README.md" requires-python = ">=3.10" -version = "4.7.1" +version = "4.7.2" [project.optional-dependencies] aiobreaker = ["aiobreaker>=0.3.0"] diff --git a/src/cqrs/saga/storage/memory.py b/src/cqrs/saga/storage/memory.py index d89ba6f..dfe851a 100644 --- a/src/cqrs/saga/storage/memory.py +++ b/src/cqrs/saga/storage/memory.py @@ -122,6 +122,7 @@ async def get_sagas_for_recovery( limit: int, max_recovery_attempts: int = 5, stale_after_seconds: int | None = None, + saga_name: str | None = None, ) -> list[uuid.UUID]: recoverable = (SagaStatus.RUNNING, SagaStatus.COMPENSATING) now = datetime.datetime.now(datetime.timezone.utc) @@ -136,6 +137,7 @@ async def get_sagas_for_recovery( if data["status"] in recoverable and data.get("recovery_attempts", 0) < max_recovery_attempts and (threshold is None or data["updated_at"] < threshold) + and (saga_name is None or data["name"] == saga_name) ] candidates.sort(key=lambda sid: self._sagas[sid]["updated_at"]) return candidates[:limit] @@ -153,3 +155,15 @@ async def increment_recovery_attempts( data["version"] += 1 if new_status is not None: data["status"] = new_status + + async def set_recovery_attempts( + self, + saga_id: uuid.UUID, + attempts: int, + ) -> None: + if saga_id not in self._sagas: + raise ValueError(f"Saga {saga_id} not found") + data = self._sagas[saga_id] + data["recovery_attempts"] = attempts + data["updated_at"] = datetime.datetime.now(datetime.timezone.utc) + data["version"] += 1 diff --git a/src/cqrs/saga/storage/protocol.py b/src/cqrs/saga/storage/protocol.py index 1ebf80c..789ddde 100644 --- a/src/cqrs/saga/storage/protocol.py +++ b/src/cqrs/saga/storage/protocol.py @@ -7,7 +7,14 @@ class ISagaStorage(abc.ABC): - """Interface for saga persistence storage.""" + """Interface for saga persistence storage. + + Storage is responsible for persisting saga execution state so that: + - Saga progress (status, context, step history) survives process restarts. + - Recovery jobs can find interrupted sagas (RUNNING/COMPENSATING) and retry them. + - Optimistic locking (version) prevents lost updates when multiple workers + touch the same saga. + """ @abc.abstractmethod async def create_saga( @@ -16,7 +23,16 @@ async def create_saga( name: str, context: dict[str, typing.Any], ) -> None: - """Initialize a new saga in storage.""" + """Create a new saga record in storage (initial state). + + Called when a saga is started for the first time. Creates the execution + record with PENDING status, initial context, and version 1. + + Args: + saga_id: Unique identifier of the saga (used as primary key). + name: Saga name (e.g. handler/type name) for diagnostics and filtering. + context: Initial context as a JSON-serializable dict (step inputs/outputs). + """ @abc.abstractmethod async def update_context( @@ -25,15 +41,22 @@ async def update_context( context: dict[str, typing.Any], current_version: int | None = None, ) -> None: - """Save saga context snapshot. + """Save saga context snapshot (e.g. after a step completes). + + Persists the current context so recovery can resume with up-to-date data. + When current_version is provided, implements optimistic locking: update + succeeds only if the stored version equals current_version (and version + is incremented), otherwise a concurrent update is detected. Args: saga_id: The ID of the saga to update. - context: The new context data. + context: The new context data (full snapshot, JSON-serializable). current_version: The expected current version of the saga execution. - If provided, optimistic locking will be used. + If provided, optimistic locking is used; if the stored version + differs, the update is rejected. + Raises: - SagaConcurrencyError: If optimistic locking fails. + SagaConcurrencyError: If optimistic locking fails (version mismatch). """ @abc.abstractmethod @@ -42,7 +65,17 @@ async def update_status( saga_id: uuid.UUID, status: SagaStatus, ) -> None: - """Update saga global status.""" + """Update the saga's global status. + + Status drives lifecycle: PENDING → RUNNING → COMPLETED, or RUNNING → + COMPENSATING → FAILED. Used by execution and recovery to know whether + to run steps, compensate, or consider the saga finished. + + Args: + saga_id: The ID of the saga to update. + status: New status (e.g. SagaStatus.RUNNING, SagaStatus.COMPLETED, + SagaStatus.COMPENSATING, SagaStatus.FAILED). + """ @abc.abstractmethod async def log_step( @@ -53,7 +86,19 @@ async def log_step( status: SagaStepStatus, details: str | None = None, ) -> None: - """Log a step transition.""" + """Append a step transition to the saga log. + + Used to record each step's outcome (started/completed/failed/compensated) + so that recovery can determine which steps have already been executed + and which need to be run or compensated. + + Args: + saga_id: The ID of the saga this step belongs to. + step_name: Name of the step (must match the step handler name). + action: "act" for forward execution, "compensate" for compensation. + status: Step outcome: STARTED, COMPLETED, FAILED, or COMPENSATED. + details: Optional message (e.g. error text when status is FAILED). + """ @abc.abstractmethod async def load_saga_state( @@ -62,14 +107,39 @@ async def load_saga_state( *, read_for_update: bool = False, ) -> tuple[SagaStatus, dict[str, typing.Any], int]: - """Load current saga status, context, and version.""" + """Load current saga status, context, and version. + + Used by execution and recovery to restore in-memory state. When + read_for_update is True, the implementation may lock the row (e.g. + SELECT FOR UPDATE) to avoid concurrent updates. + + Args: + saga_id: The ID of the saga to load. + read_for_update: If True, lock the row for update (e.g. for + subsequent update_context with optimistic locking). + + Returns: + Tuple of (status, context_dict, version). context_dict is the + last persisted context; version is used for optimistic locking. + """ @abc.abstractmethod async def get_step_history( self, saga_id: uuid.UUID, ) -> list[SagaLogEntry]: - """Get step execution history.""" + """Return the ordered list of step log entries for the saga. + + Used by recovery to determine which steps completed successfully + (and must be compensated in reverse order if compensating) and + which steps still need to be executed. + + Args: + saga_id: The ID of the saga whose step history to load. + + Returns: + List of SagaLogEntry in chronological order (oldest first). + """ @abc.abstractmethod async def get_sagas_for_recovery( @@ -77,23 +147,35 @@ async def get_sagas_for_recovery( limit: int, max_recovery_attempts: int = 5, stale_after_seconds: int | None = None, + saga_name: str | None = None, ) -> list[uuid.UUID]: - """Return saga IDs that need recovery. + """Return saga IDs that are candidates for recovery. + + Used by a recovery job/scheduler to find sagas that were left in + RUNNING or COMPENSATING (e.g. process crash) and should be retried. + Excludes COMPLETED and optionally limits by recovery attempts, + staleness, and saga name to avoid re-processing fresh or repeatedly + failing sagas. Args: - limit: Maximum number of saga IDs to return. + limit: Maximum number of saga IDs to return per call. max_recovery_attempts: Only include sagas with recovery_attempts - strictly less than this value. Default 5. + strictly less than this value. Sagas that have failed + recovery this many times can be excluded (e.g. marked FAILED). + Default 5. stale_after_seconds: If set, only include sagas whose updated_at is older than (now_utc - stale_after_seconds). Use this to avoid picking sagas that are currently being executed (recently updated). None means no staleness filter (backward compatible). + saga_name: If set, only include sagas with this name (e.g. handler + or type name). None means return all saga types (default). Returns: - List of saga IDs (RUNNING or COMPENSATING only; FAILED sagas are - not included), ordered by updated_at ascending, with + List of saga IDs (RUNNING or COMPENSATING only; FAILED/COMPLETED + are not included), ordered by updated_at ascending, with recovery_attempts < max_recovery_attempts, and optionally - updated_at older than the staleness threshold. + updated_at older than the staleness threshold and name equal to + saga_name when saga_name is provided. """ @abc.abstractmethod @@ -102,12 +184,33 @@ async def increment_recovery_attempts( saga_id: uuid.UUID, new_status: SagaStatus | None = None, ) -> None: - """Atomically increment recovery attempts after a failed recovery. + """Increment recovery attempt counter after a failed recovery run. + + Called when recovery of a saga fails (e.g. exception). Increments + recovery_attempts and optionally sets status (e.g. to FAILED) so that + get_sagas_for_recovery can exclude this saga or limit retries. + + Args: + saga_id: The saga that failed recovery. + new_status: If provided, set saga status to this value (e.g. + SagaStatus.FAILED) in the same atomic update. + """ + + @abc.abstractmethod + async def set_recovery_attempts( + self, + saga_id: uuid.UUID, + attempts: int, + ) -> None: + """Set recovery attempt counter to an explicit value. - Updates recovery_attempts += 1, updated_at = now(), and optionally - status. Also increments version for optimistic locking. + Used to reset the counter after successfully recovering one of the + steps (e.g. set to 0), or to set it to the maximum value so that + get_sagas_for_recovery excludes this saga from further recovery + (e.g. mark as permanently failed without changing status). Args: saga_id: The saga to update. - new_status: If provided, set saga status to this value (e.g. FAILED). + attempts: The value to set recovery_attempts to (e.g. 0 to reset, + or max_recovery_attempts to exclude from recovery). """ diff --git a/src/cqrs/saga/storage/sqlalchemy.py b/src/cqrs/saga/storage/sqlalchemy.py index cdc6104..83e8cd3 100644 --- a/src/cqrs/saga/storage/sqlalchemy.py +++ b/src/cqrs/saga/storage/sqlalchemy.py @@ -317,6 +317,7 @@ async def get_sagas_for_recovery( limit: int, max_recovery_attempts: int = 5, stale_after_seconds: int | None = None, + saga_name: str | None = None, ) -> list[uuid.UUID]: recoverable = ( SagaStatus.RUNNING, @@ -328,6 +329,8 @@ async def get_sagas_for_recovery( .where(SagaExecutionModel.status.in_(recoverable)) .where(SagaExecutionModel.recovery_attempts < max_recovery_attempts) ) + if saga_name is not None: + stmt = stmt.where(SagaExecutionModel.name == saga_name) if stale_after_seconds is not None: threshold = datetime.datetime.now( datetime.timezone.utc, @@ -364,3 +367,25 @@ async def increment_recovery_attempts( except SQLAlchemyError: await session.rollback() raise + + async def set_recovery_attempts( + self, + saga_id: uuid.UUID, + attempts: int, + ) -> None: + async with self.session_factory() as session: + try: + result = await session.execute( + sqlalchemy.update(SagaExecutionModel) + .where(SagaExecutionModel.id == saga_id) + .values( + recovery_attempts=attempts, + version=SagaExecutionModel.version + 1, + ), + ) + if result.rowcount == 0: # type: ignore[attr-defined] + raise ValueError(f"Saga {saga_id} not found") + await session.commit() + except SQLAlchemyError: + await session.rollback() + raise diff --git a/tests/integration/test_saga_storage_memory.py b/tests/integration/test_saga_storage_memory.py index 1c5115c..5e4279b 100644 --- a/tests/integration/test_saga_storage_memory.py +++ b/tests/integration/test_saga_storage_memory.py @@ -283,6 +283,49 @@ async def test_get_sagas_for_recovery_without_stale_after_unchanged_behavior( ids = await storage.get_sagas_for_recovery(limit=10) assert sid in ids + async def test_get_sagas_for_recovery_filters_by_saga_name_when_provided( + self, + storage: MemorySagaStorage, + test_context: dict[str, str], + ) -> None: + """Positive: when saga_name is set, only sagas with that name are returned.""" + id_foo1 = uuid.uuid4() + id_foo2 = uuid.uuid4() + id_bar = uuid.uuid4() + await storage.create_saga(saga_id=id_foo1, name="OrderSaga", context=test_context) + await storage.create_saga(saga_id=id_foo2, name="OrderSaga", context=test_context) + await storage.create_saga(saga_id=id_bar, name="PaymentSaga", context=test_context) + await storage.update_status(id_foo1, SagaStatus.RUNNING) + await storage.update_status(id_foo2, SagaStatus.RUNNING) + await storage.update_status(id_bar, SagaStatus.RUNNING) + + ids_all = await storage.get_sagas_for_recovery(limit=10) + assert len(ids_all) == 3 + ids_order = await storage.get_sagas_for_recovery(limit=10, saga_name="OrderSaga") + assert set(ids_order) == {id_foo1, id_foo2} + ids_payment = await storage.get_sagas_for_recovery(limit=10, saga_name="PaymentSaga") + assert ids_payment == [id_bar] + ids_nonexistent = await storage.get_sagas_for_recovery( + limit=10, + saga_name="NonExistentSaga", + ) + assert ids_nonexistent == [] + + async def test_get_sagas_for_recovery_saga_name_none_returns_all_types( + self, + storage: MemorySagaStorage, + test_context: dict[str, str], + ) -> None: + """Backward compat: when saga_name is None, all saga types are returned.""" + id1 = uuid.uuid4() + id2 = uuid.uuid4() + await storage.create_saga(saga_id=id1, name="SagaA", context=test_context) + await storage.create_saga(saga_id=id2, name="SagaB", context=test_context) + await storage.update_status(id1, SagaStatus.RUNNING) + await storage.update_status(id2, SagaStatus.RUNNING) + ids = await storage.get_sagas_for_recovery(limit=10, saga_name=None) + assert set(ids) == {id1, id2} + # --- get_sagas_for_recovery: negative --- async def test_get_sagas_for_recovery_empty_when_none_recoverable( @@ -377,3 +420,49 @@ async def test_increment_recovery_attempts_raises_when_saga_not_found( unknown_id = uuid.uuid4() with pytest.raises(ValueError, match="not found"): await storage.increment_recovery_attempts(unknown_id) + + # --- set_recovery_attempts: positive --- + + async def test_set_recovery_attempts_sets_value( + self, + storage: MemorySagaStorage, + saga_id: uuid.UUID, + test_context: dict[str, str], + ) -> None: + """Positive: recovery_attempts is set to the given value.""" + await storage.create_saga(saga_id=saga_id, name="saga", context=test_context) + await storage.update_status(saga_id, SagaStatus.RUNNING) + await storage.increment_recovery_attempts(saga_id) + await storage.increment_recovery_attempts(saga_id) + assert storage._sagas[saga_id]["recovery_attempts"] == 2 + + await storage.set_recovery_attempts(saga_id, 0) + assert storage._sagas[saga_id]["recovery_attempts"] == 0 + + await storage.set_recovery_attempts(saga_id, 5) + assert storage._sagas[saga_id]["recovery_attempts"] == 5 + + async def test_set_recovery_attempts_excludes_from_recovery_when_set_to_max( + self, + storage: MemorySagaStorage, + saga_id: uuid.UUID, + test_context: dict[str, str], + ) -> None: + """Positive: setting to max_recovery_attempts excludes saga from get_sagas_for_recovery.""" + await storage.create_saga(saga_id=saga_id, name="saga", context=test_context) + await storage.update_status(saga_id, SagaStatus.RUNNING) + await storage.set_recovery_attempts(saga_id, 5) + + ids = await storage.get_sagas_for_recovery(limit=10, max_recovery_attempts=5) + assert saga_id not in ids + + # --- set_recovery_attempts: negative --- + + async def test_set_recovery_attempts_raises_when_saga_not_found( + self, + storage: MemorySagaStorage, + ) -> None: + """Negative: raises ValueError when saga_id does not exist.""" + unknown_id = uuid.uuid4() + with pytest.raises(ValueError, match="not found"): + await storage.set_recovery_attempts(unknown_id, 0) diff --git a/tests/integration/test_saga_storage_sqlalchemy.py b/tests/integration/test_saga_storage_sqlalchemy.py index 0c8d3a4..ac9fadc 100644 --- a/tests/integration/test_saga_storage_sqlalchemy.py +++ b/tests/integration/test_saga_storage_sqlalchemy.py @@ -388,6 +388,49 @@ async def test_get_sagas_for_recovery_without_stale_after_unchanged_behavior( ids = await storage.get_sagas_for_recovery(limit=10) assert sid in ids + async def test_get_sagas_for_recovery_filters_by_saga_name_when_provided( + self, + storage: SqlAlchemySagaStorage, + test_context: dict[str, str], + ) -> None: + """Positive: when saga_name is set, only sagas with that name are returned.""" + id_foo1 = uuid.uuid4() + id_foo2 = uuid.uuid4() + id_bar = uuid.uuid4() + await storage.create_saga(saga_id=id_foo1, name="OrderSaga", context=test_context) + await storage.create_saga(saga_id=id_foo2, name="OrderSaga", context=test_context) + await storage.create_saga(saga_id=id_bar, name="PaymentSaga", context=test_context) + await storage.update_status(id_foo1, SagaStatus.RUNNING) + await storage.update_status(id_foo2, SagaStatus.RUNNING) + await storage.update_status(id_bar, SagaStatus.RUNNING) + + ids_all = await storage.get_sagas_for_recovery(limit=10) + assert len(ids_all) == 3 + ids_order = await storage.get_sagas_for_recovery(limit=10, saga_name="OrderSaga") + assert set(ids_order) == {id_foo1, id_foo2} + ids_payment = await storage.get_sagas_for_recovery(limit=10, saga_name="PaymentSaga") + assert ids_payment == [id_bar] + ids_nonexistent = await storage.get_sagas_for_recovery( + limit=10, + saga_name="NonExistentSaga", + ) + assert ids_nonexistent == [] + + async def test_get_sagas_for_recovery_saga_name_none_returns_all_types( + self, + storage: SqlAlchemySagaStorage, + test_context: dict[str, str], + ) -> None: + """Backward compat: when saga_name is None, all saga types are returned.""" + id1 = uuid.uuid4() + id2 = uuid.uuid4() + await storage.create_saga(saga_id=id1, name="SagaA", context=test_context) + await storage.create_saga(saga_id=id2, name="SagaB", context=test_context) + await storage.update_status(id1, SagaStatus.RUNNING) + await storage.update_status(id2, SagaStatus.RUNNING) + ids = await storage.get_sagas_for_recovery(limit=10, saga_name=None) + assert set(ids) == {id1, id2} + # --- get_sagas_for_recovery: negative --- async def test_get_sagas_for_recovery_empty_when_none_recoverable( @@ -474,3 +517,40 @@ async def test_increment_recovery_attempts_raises_when_saga_not_found( unknown_id = uuid.uuid4() with pytest.raises(ValueError, match="not found"): await storage.increment_recovery_attempts(unknown_id) + + # --- set_recovery_attempts: positive --- + + async def test_set_recovery_attempts_sets_value( + self, + storage: SqlAlchemySagaStorage, + saga_id: uuid.UUID, + test_context: dict[str, str], + ) -> None: + """Positive: recovery_attempts is set to the given value.""" + await storage.create_saga(saga_id=saga_id, name="saga", context=test_context) + await storage.update_status(saga_id, SagaStatus.RUNNING) + await storage.increment_recovery_attempts(saga_id) + await storage.increment_recovery_attempts(saga_id) + + await storage.set_recovery_attempts(saga_id, 0) + ids_after_reset = await storage.get_sagas_for_recovery( + limit=10, + max_recovery_attempts=5, + ) + assert saga_id in ids_after_reset + + await storage.set_recovery_attempts(saga_id, 5) + ids_after_max = await storage.get_sagas_for_recovery( + limit=10, + max_recovery_attempts=5, + ) + assert saga_id not in ids_after_max + + async def test_set_recovery_attempts_raises_when_saga_not_found( + self, + storage: SqlAlchemySagaStorage, + ) -> None: + """Negative: raises ValueError when saga_id does not exist.""" + unknown_id = uuid.uuid4() + with pytest.raises(ValueError, match="not found"): + await storage.set_recovery_attempts(unknown_id, 0) From fcfa69fb1fbf1dcf0cdebb70e0103736376a2892 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D0=B0=D0=B4=D0=B8=D0=BC=20=D0=9A=D0=BE=D0=B7=D1=8B?= =?UTF-8?q?=D1=80=D0=B5=D0=B2=D1=81=D0=BA=D0=B8=D0=B9?= Date: Wed, 28 Jan 2026 18:12:28 +0300 Subject: [PATCH 2/3] Extend ISagaStorage interface --- tests/integration/test_saga_storage_memory.py | 20 ++++++++++++++----- .../test_saga_storage_sqlalchemy.py | 20 ++++++++++++++----- 2 files changed, 30 insertions(+), 10 deletions(-) diff --git a/tests/integration/test_saga_storage_memory.py b/tests/integration/test_saga_storage_memory.py index 5e4279b..52a1900 100644 --- a/tests/integration/test_saga_storage_memory.py +++ b/tests/integration/test_saga_storage_memory.py @@ -292,18 +292,28 @@ async def test_get_sagas_for_recovery_filters_by_saga_name_when_provided( id_foo1 = uuid.uuid4() id_foo2 = uuid.uuid4() id_bar = uuid.uuid4() - await storage.create_saga(saga_id=id_foo1, name="OrderSaga", context=test_context) - await storage.create_saga(saga_id=id_foo2, name="OrderSaga", context=test_context) - await storage.create_saga(saga_id=id_bar, name="PaymentSaga", context=test_context) + await storage.create_saga( + saga_id=id_foo1, name="OrderSaga", context=test_context + ) + await storage.create_saga( + saga_id=id_foo2, name="OrderSaga", context=test_context + ) + await storage.create_saga( + saga_id=id_bar, name="PaymentSaga", context=test_context + ) await storage.update_status(id_foo1, SagaStatus.RUNNING) await storage.update_status(id_foo2, SagaStatus.RUNNING) await storage.update_status(id_bar, SagaStatus.RUNNING) ids_all = await storage.get_sagas_for_recovery(limit=10) assert len(ids_all) == 3 - ids_order = await storage.get_sagas_for_recovery(limit=10, saga_name="OrderSaga") + ids_order = await storage.get_sagas_for_recovery( + limit=10, saga_name="OrderSaga" + ) assert set(ids_order) == {id_foo1, id_foo2} - ids_payment = await storage.get_sagas_for_recovery(limit=10, saga_name="PaymentSaga") + ids_payment = await storage.get_sagas_for_recovery( + limit=10, saga_name="PaymentSaga" + ) assert ids_payment == [id_bar] ids_nonexistent = await storage.get_sagas_for_recovery( limit=10, diff --git a/tests/integration/test_saga_storage_sqlalchemy.py b/tests/integration/test_saga_storage_sqlalchemy.py index ac9fadc..8fa4fd8 100644 --- a/tests/integration/test_saga_storage_sqlalchemy.py +++ b/tests/integration/test_saga_storage_sqlalchemy.py @@ -397,18 +397,28 @@ async def test_get_sagas_for_recovery_filters_by_saga_name_when_provided( id_foo1 = uuid.uuid4() id_foo2 = uuid.uuid4() id_bar = uuid.uuid4() - await storage.create_saga(saga_id=id_foo1, name="OrderSaga", context=test_context) - await storage.create_saga(saga_id=id_foo2, name="OrderSaga", context=test_context) - await storage.create_saga(saga_id=id_bar, name="PaymentSaga", context=test_context) + await storage.create_saga( + saga_id=id_foo1, name="OrderSaga", context=test_context + ) + await storage.create_saga( + saga_id=id_foo2, name="OrderSaga", context=test_context + ) + await storage.create_saga( + saga_id=id_bar, name="PaymentSaga", context=test_context + ) await storage.update_status(id_foo1, SagaStatus.RUNNING) await storage.update_status(id_foo2, SagaStatus.RUNNING) await storage.update_status(id_bar, SagaStatus.RUNNING) ids_all = await storage.get_sagas_for_recovery(limit=10) assert len(ids_all) == 3 - ids_order = await storage.get_sagas_for_recovery(limit=10, saga_name="OrderSaga") + ids_order = await storage.get_sagas_for_recovery( + limit=10, saga_name="OrderSaga" + ) assert set(ids_order) == {id_foo1, id_foo2} - ids_payment = await storage.get_sagas_for_recovery(limit=10, saga_name="PaymentSaga") + ids_payment = await storage.get_sagas_for_recovery( + limit=10, saga_name="PaymentSaga" + ) assert ids_payment == [id_bar] ids_nonexistent = await storage.get_sagas_for_recovery( limit=10, From eef2524f629d2a9c5a20ed9caafc5e092f219667 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=92=D0=B0=D0=B4=D0=B8=D0=BC=20=D0=9A=D0=BE=D0=B7=D1=8B?= =?UTF-8?q?=D1=80=D0=B5=D0=B2=D1=81=D0=BA=D0=B8=D0=B9?= Date: Wed, 28 Jan 2026 18:13:28 +0300 Subject: [PATCH 3/3] Fixes after pre-commit --- tests/integration/test_saga_storage_memory.py | 18 +++++++++++++----- .../test_saga_storage_sqlalchemy.py | 18 +++++++++++++----- 2 files changed, 26 insertions(+), 10 deletions(-) diff --git a/tests/integration/test_saga_storage_memory.py b/tests/integration/test_saga_storage_memory.py index 52a1900..d38277c 100644 --- a/tests/integration/test_saga_storage_memory.py +++ b/tests/integration/test_saga_storage_memory.py @@ -293,13 +293,19 @@ async def test_get_sagas_for_recovery_filters_by_saga_name_when_provided( id_foo2 = uuid.uuid4() id_bar = uuid.uuid4() await storage.create_saga( - saga_id=id_foo1, name="OrderSaga", context=test_context + saga_id=id_foo1, + name="OrderSaga", + context=test_context, ) await storage.create_saga( - saga_id=id_foo2, name="OrderSaga", context=test_context + saga_id=id_foo2, + name="OrderSaga", + context=test_context, ) await storage.create_saga( - saga_id=id_bar, name="PaymentSaga", context=test_context + saga_id=id_bar, + name="PaymentSaga", + context=test_context, ) await storage.update_status(id_foo1, SagaStatus.RUNNING) await storage.update_status(id_foo2, SagaStatus.RUNNING) @@ -308,11 +314,13 @@ async def test_get_sagas_for_recovery_filters_by_saga_name_when_provided( ids_all = await storage.get_sagas_for_recovery(limit=10) assert len(ids_all) == 3 ids_order = await storage.get_sagas_for_recovery( - limit=10, saga_name="OrderSaga" + limit=10, + saga_name="OrderSaga", ) assert set(ids_order) == {id_foo1, id_foo2} ids_payment = await storage.get_sagas_for_recovery( - limit=10, saga_name="PaymentSaga" + limit=10, + saga_name="PaymentSaga", ) assert ids_payment == [id_bar] ids_nonexistent = await storage.get_sagas_for_recovery( diff --git a/tests/integration/test_saga_storage_sqlalchemy.py b/tests/integration/test_saga_storage_sqlalchemy.py index 8fa4fd8..3d7e5fd 100644 --- a/tests/integration/test_saga_storage_sqlalchemy.py +++ b/tests/integration/test_saga_storage_sqlalchemy.py @@ -398,13 +398,19 @@ async def test_get_sagas_for_recovery_filters_by_saga_name_when_provided( id_foo2 = uuid.uuid4() id_bar = uuid.uuid4() await storage.create_saga( - saga_id=id_foo1, name="OrderSaga", context=test_context + saga_id=id_foo1, + name="OrderSaga", + context=test_context, ) await storage.create_saga( - saga_id=id_foo2, name="OrderSaga", context=test_context + saga_id=id_foo2, + name="OrderSaga", + context=test_context, ) await storage.create_saga( - saga_id=id_bar, name="PaymentSaga", context=test_context + saga_id=id_bar, + name="PaymentSaga", + context=test_context, ) await storage.update_status(id_foo1, SagaStatus.RUNNING) await storage.update_status(id_foo2, SagaStatus.RUNNING) @@ -413,11 +419,13 @@ async def test_get_sagas_for_recovery_filters_by_saga_name_when_provided( ids_all = await storage.get_sagas_for_recovery(limit=10) assert len(ids_all) == 3 ids_order = await storage.get_sagas_for_recovery( - limit=10, saga_name="OrderSaga" + limit=10, + saga_name="OrderSaga", ) assert set(ids_order) == {id_foo1, id_foo2} ids_payment = await storage.get_sagas_for_recovery( - limit=10, saga_name="PaymentSaga" + limit=10, + saga_name="PaymentSaga", ) assert ids_payment == [id_bar] ids_nonexistent = await storage.get_sagas_for_recovery(