Skip to content

Commit 4cd0d63

Browse files
committed
fix: add a timeout to RedisSteamBroker xautoclaim lock to prevent infinite locking
1 parent 8bc1ed5 commit 4cd0d63

File tree

1 file changed

+6
-1
lines changed

1 file changed

+6
-1
lines changed

taskiq_redis/redis_broker.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ def __init__(
161161
approximate: bool = True,
162162
idle_timeout: int = 600000, # 10 minutes
163163
unacknowledged_batch_size: int = 100,
164+
unacknowledged_lock_timeout: int = 30,
164165
xread_count: int | None = 100,
165166
additional_streams: dict[str, str | int] | None = None,
166167
**connection_kwargs: Any,
@@ -188,8 +189,10 @@ def __init__(
188189
:param xread_count: number of messages to fetch from the stream at once.
189190
:param additional_streams: additional streams to read from.
190191
Each key is a stream name, value is a consumer id.
191-
:param redeliver_timeout: time in ms to wait before redelivering a message.
192192
:param unacknowledged_batch_size: number of unacknowledged messages to fetch.
193+
:param unacknowledged_lock_timeout: time in seconds before auto-releasing
194+
the lock. Useful when the worker crashes or gets killed.
195+
Set to a bigger value if your tasks take a long time to complete.
193196
"""
194197
super().__init__(
195198
url,
@@ -209,6 +212,7 @@ def __init__(
209212
self.additional_streams = additional_streams or {}
210213
self.idle_timeout = idle_timeout
211214
self.unacknowledged_batch_size = unacknowledged_batch_size
215+
self.unacknowledged_lock_timeout = unacknowledged_lock_timeout
212216
self.count = xread_count
213217

214218
async def _declare_consumer_group(self) -> None:
@@ -290,6 +294,7 @@ async def listen(self) -> AsyncGenerator[AckableMessage, None]:
290294
for stream in [self.queue_name, *self.additional_streams.keys()]:
291295
lock = redis_conn.lock(
292296
f"autoclaim:{self.consumer_group_name}:{stream}",
297+
timeout=self.unacknowledged_lock_timeout,
293298
)
294299
if await lock.locked():
295300
continue

0 commit comments

Comments
 (0)