[Feature] Add saga attemptes supporintg #46
Merged
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Saga recovery attempts
ISagaStorage.get_sagas_for_recovery()— returns saga IDs that need recovery (statusRUNNING,COMPENSATING, orFAILED) with optional filters:limit— maximum number of IDs to returnmax_recovery_attempts(default:5) — only sagas withrecovery_attemptsstrictly less than this value; excludes repeatedly failing sagas from retrystale_after_seconds(optional) — only sagas whoseupdated_atis older thannow - stale_after_seconds; avoids picking sagas currently being executed by another workerISagaStorage.increment_recovery_attempts()— atomically incrementsrecovery_attemptsand optionally updates saga status (e.g. toFAILED). Intended for use after a failed recovery;recover_saga()calls it automatically on exception, so callers do not need to call it manually.recovery_attemptsfield in saga storage — each saga execution now has a counter of failed recovery attempts. Used byget_sagas_for_recovery()to limit retries and byincrement_recovery_attempts()on recovery failure.Implemented in both MemorySagaStorage and SqlAlchemySagaStorage.
Changed
recover_saga()— on recovery failure (any exception during resume), the storage'sincrement_recovery_attempts(saga_id, new_status=SagaStatus.FAILED)is invoked automatically. Sagas can then be retried untilmax_recovery_attemptsor excluded from future recovery runs viaget_sagas_for_recovery(max_recovery_attempts=...).Documentation
get_sagas_for_recovery(), andincrement_recovery_attempts().saga_recovery_scheduler.pydemonstrates a recovery loop usingget_sagas_for_recovery(limit, max_recovery_attempts, stale_after_seconds)andrecover_saga()without manualincrement_recovery_attemptscalls.Upgrade notes
ISagaStorage, you must add:get_sagas_for_recovery(limit, max_recovery_attempts=5, stale_after_seconds=None) -> list[uuid.UUID]increment_recovery_attempts(saga_id, new_status=None) -> Nonesaga_executionstable gains a new columnrecovery_attempts(INTEGER, default 0). For existing databases, add the column and backfill if needed, for example:storage.get_sagas_for_recovery(limit=..., max_recovery_attempts=..., stale_after_seconds=...)instead of custom queries to select sagas for recovery.