|
2 | 2 | import base64 |
3 | 3 | import binascii |
4 | 4 | import collections |
| 5 | +import functools |
5 | 6 | import hashlib |
6 | 7 | import itertools |
7 | 8 | import logging |
@@ -179,6 +180,19 @@ class UnsupportedRedis(Exception): |
179 | 180 | pass |
180 | 181 |
|
181 | 182 |
|
| 183 | +class BoundedQueue(asyncio.Queue): |
| 184 | + def put_nowait(self, item): |
| 185 | + if self.full(): |
| 186 | + # see: https://github.com/django/channels_redis/issues/212 |
| 187 | + # if we actually get into this code block, it likely means that |
| 188 | + # this specific consumer has stopped reading |
| 189 | + # if we get into this code block, it's better to drop messages |
| 190 | + # that exceed the channel layer capacity than to continue to |
| 191 | + # malloc() forever |
| 192 | + self.get_nowait() |
| 193 | + return super(BoundedQueue, self).put_nowait(item) |
| 194 | + |
| 195 | + |
182 | 196 | class RedisChannelLayer(BaseChannelLayer): |
183 | 197 | """ |
184 | 198 | Redis channel layer. |
@@ -226,7 +240,9 @@ def __init__( |
226 | 240 | # Event loop they are trying to receive on |
227 | 241 | self.receive_event_loop = None |
228 | 242 | # Buffered messages by process-local channel name |
229 | | - self.receive_buffer = collections.defaultdict(asyncio.Queue) |
| 243 | + self.receive_buffer = collections.defaultdict( |
| 244 | + functools.partial(BoundedQueue, self.capacity) |
| 245 | + ) |
230 | 246 | # Detached channel cleanup tasks |
231 | 247 | self.receive_cleaners = [] |
232 | 248 | # Per-channel cleanup locks to prevent a receive starting and moving |
@@ -544,7 +560,11 @@ async def new_channel(self, prefix="specific"): |
544 | 560 | Returns a new channel name that can be used by something in our |
545 | 561 | process as a specific channel. |
546 | 562 | """ |
547 | | - return "%s.%s!%s" % (prefix, self.client_prefix, uuid.uuid4().hex,) |
| 563 | + return "%s.%s!%s" % ( |
| 564 | + prefix, |
| 565 | + self.client_prefix, |
| 566 | + uuid.uuid4().hex, |
| 567 | + ) |
548 | 568 |
|
549 | 569 | ### Flush extension ### |
550 | 570 |
|
|
0 commit comments