[Feature] extend saga storage interface #48
Merged
+368
−21
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
🎯 Overview
This release extends the Saga Storage interface with two improvements for recovery workflows: explicit control over the recovery attempt counter and optional filtering of recovery candidates by saga name. All changes are backward compatible.
✨ What's New
🔢
set_recovery_attempts— Explicit Recovery CounterA new method on
ISagaStorageand its implementations lets you set the recovery attempt counter to any value instead of only incrementing it.Use cases:
await storage.set_recovery_attempts(saga_id, 0)await storage.set_recovery_attempts(saga_id, max_recovery_attempts)— it will no longer appear inget_sagas_for_recovery()Signature:
Implemented in:
MemorySagaStorage,SqlAlchemySagaStorage.🏷️
get_sagas_for_recovery— Optional Filter by Saga Nameget_sagas_for_recovery()now accepts an optionalsaga_nameparameter. You can run separate recovery jobs per saga type and only fetch the sagas that job is responsible for.saga_nameNone(default)"OrderSaga"name == "OrderSaga"Example — one job per saga type:
Updated signature:
📋 Summary of Changes
ISagaStorage)set_recovery_attempts(saga_id, attempts);get_sagas_for_recoverygains optionalsaga_name=Noneset_recovery_attempts; addsWHERE name = :saga_namewhensaga_nameis setset_recovery_attempts; filters in-memory bydata["name"] == saga_namewhensaga_nameis setset_recovery_attempts(set value, exclude from recovery, not found) and forsaga_name(filter by name,Nonereturns all types) in both storage backends🔄 Migration & Compatibility
get_sagas_for_recovery(limit=..., max_recovery_attempts=..., stale_after_seconds=...)continues to work unchanged;saga_namedefaults toNone.ISagaStoragemust implement the new abstract methodset_recovery_attempts(saga_id, attempts)and add the optionalsaga_nameparameter toget_sagas_for_recoveryto satisfy the interface.📦 Full Changelog
Added
set_recovery_attempts(saga_id, attempts)to set recovery attempt counter explicitlysaga_nameinget_sagas_for_recovery()for filtering recovery candidates by saga nameset_recovery_attemptsandsaga_namefiltering (Memory and SqlAlchemy)Changed
get_sagas_for_recovery()signature extended withsaga_name: str | None = None(backward compatible)