99
1010from redis import asyncio as aioredis
1111
12- from channels .exceptions import ChannelFull
13- from channels .layers import BaseChannelLayer
12+ from channels .exceptions import ChannelFull # type: ignore[import-untyped]
13+ from channels .layers import BaseChannelLayer # type: ignore[import-untyped]
1414
1515from .serializers import registry
1616from .utils import (
@@ -38,12 +38,10 @@ class ChannelLock:
3838 """
3939
4040 def __init__ (self ) -> None :
41- self .locks : collections .defaultdict [str , asyncio .Lock ] = (
42- collections .defaultdict (asyncio .Lock )
43- )
44- self .wait_counts : collections .defaultdict [str , int ] = collections .defaultdict (
45- int
41+ self .locks : typing .DefaultDict [str , asyncio .Lock ] = collections .defaultdict (
42+ asyncio .Lock
4643 )
44+ self .wait_counts : typing .DefaultDict [str , int ] = collections .defaultdict (int )
4745
4846 async def acquire (self , channel : str ) -> bool :
4947 """
@@ -69,7 +67,7 @@ def release(self, channel: str) -> None:
6967 del self .wait_counts [channel ]
7068
7169
72- class BoundedQueue (asyncio .Queue [ typing . Any ] ):
70+ class BoundedQueue (asyncio .Queue ):
7371 def put_nowait (self , item : typing .Any ) -> None :
7472 if self .full ():
7573 # see: https://github.com/django/channels_redis/issues/212
@@ -157,11 +155,11 @@ def __init__(
157155 # Event loop they are trying to receive on
158156 self .receive_event_loop : typing .Optional [asyncio .AbstractEventLoop ] = None
159157 # Buffered messages by process-local channel name
160- self .receive_buffer : collections . defaultdict [str , BoundedQueue ] = (
158+ self .receive_buffer : typing . DefaultDict [str , BoundedQueue ] = (
161159 collections .defaultdict (functools .partial (BoundedQueue , self .capacity ))
162160 )
163161 # Detached channel cleanup tasks
164- self .receive_cleaners : typing .List [asyncio .Task [typing .Any ]] = []
162+ self .receive_cleaners : typing .List [" asyncio.Task[typing.Any]" ] = []
165163 # Per-channel cleanup locks to prevent a receive starting and moving
166164 # a message back into the main queue before its cleanup has completed
167165 self .receive_clean_locks = ChannelLock ()
@@ -240,7 +238,7 @@ async def _brpop_with_clean(
240238 connection = self .connection (index )
241239 # Cancellation here doesn't matter, we're not doing anything destructive
242240 # and the script executes atomically...
243- await connection .eval (cleanup_script , 0 , channel , backup_queue )
241+ await connection .eval (cleanup_script , 0 , channel , backup_queue ) # type: ignore[misc]
244242 # ...and it doesn't matter here either, the message will be safe in the backup.
245243 result = await connection .bzpopmin (channel , timeout = timeout )
246244
@@ -271,9 +269,9 @@ async def receive(self, channel: str) -> typing.Any:
271269 assert self .valid_channel_name (channel )
272270 if "!" in channel :
273271 real_channel = self .non_local_name (channel )
274- assert real_channel .endswith (self . client_prefix + "!" ), (
275- "Wrong client prefix "
276- )
272+ assert real_channel .endswith (
273+ self . client_prefix + "! "
274+ ), "Wrong client prefix"
277275 # Enter receiving section
278276 loop = asyncio .get_running_loop ()
279277 self .receive_count += 1
@@ -293,7 +291,7 @@ async def receive(self, channel: str) -> typing.Any:
293291 message = None
294292 while self .receive_buffer [channel ].empty ():
295293 _tasks = [
296- self .receive_lock .acquire (),
294+ self .receive_lock .acquire (), # type: ignore[union-attr]
297295 self .receive_buffer [channel ].get (),
298296 ]
299297 tasks = [asyncio .ensure_future (task ) for task in _tasks ]
@@ -312,7 +310,7 @@ async def receive(self, channel: str) -> typing.Any:
312310 if not task .cancel ():
313311 assert task .done ()
314312 if task .result () is True :
315- self .receive_lock .release ()
313+ self .receive_lock .release () # type: ignore[union-attr]
316314
317315 raise
318316
@@ -335,7 +333,7 @@ async def receive(self, channel: str) -> typing.Any:
335333 if message or exception :
336334 if token :
337335 # We will not be receving as we already have the message.
338- self .receive_lock .release ()
336+ self .receive_lock .release () # type: ignore[union-attr]
339337
340338 if exception :
341339 raise exception
@@ -362,7 +360,7 @@ async def receive(self, channel: str) -> typing.Any:
362360 del self .receive_buffer [channel ]
363361 raise
364362 finally :
365- self .receive_lock .release ()
363+ self .receive_lock .release () # type: ignore[union-attr]
366364
367365 # We know there's a message available, because there
368366 # couldn't have been any interruption between empty() and here
@@ -377,7 +375,7 @@ async def receive(self, channel: str) -> typing.Any:
377375 self .receive_count -= 1
378376 # If we were the last out, drop the receive lock
379377 if self .receive_count == 0 :
380- assert not self .receive_lock .locked ()
378+ assert not self .receive_lock .locked () # type: ignore[union-attr]
381379 self .receive_lock = None
382380 self .receive_event_loop = None
383381 else :
@@ -422,7 +420,7 @@ async def receive_single(
422420 )
423421 self .receive_cleaners .append (cleaner )
424422
425- def _cleanup_done (cleaner : asyncio .Task [ typing . Any ] ) -> None :
423+ def _cleanup_done (cleaner : " asyncio.Task" ) -> None :
426424 self .receive_cleaners .remove (cleaner )
427425 self .receive_clean_locks .release (channel_key )
428426
@@ -468,7 +466,7 @@ async def flush(self) -> None:
468466 # Go through each connection and remove all with prefix
469467 for i in range (self .ring_size ):
470468 connection = self .connection (i )
471- await connection .eval (delete_prefix , 0 , self .prefix + "*" )
469+ await connection .eval (delete_prefix , 0 , self .prefix + "*" ) # type: ignore[union-attr,misc]
472470 # Now clear the pools as well
473471 await self .close_pools ()
474472
@@ -584,7 +582,7 @@ async def group_send(self, group: str, message: typing.Any) -> None:
584582
585583 # channel_keys does not contain a single redis key more than once
586584 connection = self .connection (connection_index )
587- channels_over_capacity = await connection .eval (
585+ channels_over_capacity = await connection .eval ( # type: ignore[misc]
588586 group_send_lua , len (channel_redis_keys ), * channel_redis_keys , * args
589587 )
590588 _channels_over_capacity = - 1.0
0 commit comments