1717import types
1818import weakref
1919import errno
20- import ctypes
2120
2221from queue import Empty , Full , ShutDown
2322
@@ -55,21 +54,21 @@ def __init__(self, maxsize=0, *, ctx):
5554 # For use by concurrent.futures
5655 self ._ignore_epipe = False
5756 self ._reset ()
58- self ._shutdown_state = context ._default_context .Value (
59- ctypes .c_uint8 , lock = self ._rlock
60- )
57+ self ._shutdown_state = ctx .Value ('i' , _queue_alive )
6158
6259 if sys .platform != 'win32' :
6360 register_after_fork (self , Queue ._after_fork )
6461
6562 def __getstate__ (self ):
6663 context .assert_spawning (self )
6764 return (self ._ignore_epipe , self ._maxsize , self ._reader , self ._writer ,
68- self ._rlock , self ._wlock , self ._sem , self ._opid )
65+ self ._rlock , self ._wlock , self ._sem , self ._opid ,
66+ self ._shutdown_state )
6967
7068 def __setstate__ (self , state ):
7169 (self ._ignore_epipe , self ._maxsize , self ._reader , self ._writer ,
72- self ._rlock , self ._wlock , self ._sem , self ._opid ) = state
70+ self ._rlock , self ._wlock , self ._sem , self ._opid ,
71+ self ._shutdown_state ) = state
7372 self ._reset ()
7473
7574 def _after_fork (self ):
@@ -91,55 +90,77 @@ def _reset(self, after_fork=False):
9190 self ._recv_bytes = self ._reader .recv_bytes
9291 self ._poll = self ._reader .poll
9392
93+ def _is_alive (self ):
94+ return self ._shutdown_state .value == _queue_alive
95+
96+ def _is_shutdown (self ):
97+ return self ._shutdown_state .value == _queue_shutdown
98+
99+ def _is_shutdown_immediate (self ):
100+ return self ._shutdown_state .value == _queue_shutdown_immediate
101+
102+ def _set_shutdown (self ):
103+ self ._shutdown_state .value = _queue_shutdown
104+
105+ def _set_shutdown_immediate (self ):
106+ self ._shutdown_state .value = _queue_shutdown_immediate
107+
94108 def put (self , obj , block = True , timeout = None ):
95109 if self ._closed :
96110 raise ValueError (f"Queue { self !r} is closed" )
97- if self ._shutdown_state . value != _queue_alive :
111+ if not self ._is_alive () :
98112 raise ShutDown
99113 if not self ._sem .acquire (block , timeout ):
114+ if not self ._is_alive ():
115+ raise ShutDown
100116 raise Full
101117
102118 with self ._notempty :
103- if self ._shutdown_state .value != _queue_alive :
104- raise ShutDown
105119 if self ._thread is None :
106120 self ._start_thread ()
107121 self ._buffer .append (obj )
108122 self ._notempty .notify ()
109123
110124 def get (self , block = True , timeout = None ):
111- if self ._shutdown_state .value == _queue_shutdown_immediate :
112- raise ShutDown
113125 if self ._closed :
114126 raise ValueError (f"Queue { self !r} is closed" )
115127 if block and timeout is None :
116128 with self ._rlock :
117- if self ._shutdown_state .value != _queue_alive :
129+ # checks shutdown state
130+ if (self ._is_shutdown_immediate ()
131+ or (self ._is_shutdown () and self .empty ())):
118132 raise ShutDown
119133 res = self ._recv_bytes ()
120134 self ._sem .release ()
121135 else :
122136 if block :
123137 deadline = time .monotonic () + timeout
124138 if not self ._rlock .acquire (block , timeout ):
139+ if (self ._is_shutdown_immediate ()
140+ or (self ._is_shutdown () and self .empty ())):
141+ raise ShutDown
125142 raise Empty
126143 try :
127144 if block :
128145 timeout = deadline - time .monotonic ()
129146 if not self ._poll (timeout ):
130- if self ._shutdown_state . value != _queue_alive :
147+ if not self ._is_alive () :
131148 raise ShutDown
132149 raise Empty
133- if self ._shutdown_state .value != _queue_alive :
134- raise ShutDown
135150 elif not self ._poll ():
151+ if not self ._is_alive ():
152+ raise ShutDown
136153 raise Empty
154+
155+ # here queue is not empty
156+ if self ._is_shutdown_immediate ():
157+ raise ShutDown
158+ # here shutdown state queue is alive or shutdown
137159 res = self ._recv_bytes ()
138160 self ._sem .release ()
139161 finally :
140162 self ._rlock .release ()
141- if self ._shutdown_state .value == _queue_shutdown :
142- raise ShutDown
163+
143164 # unserialize the data after having released the lock
144165 return _ForkingPickler .loads (res )
145166
@@ -159,6 +180,19 @@ def get_nowait(self):
159180 def put_nowait (self , obj ):
160181 return self .put (obj , False )
161182
183+ def shutdown (self , immediate = False ):
184+ if self ._closed :
185+ raise ValueError (f"Queue { self !r} is closed" )
186+ with self ._shutdown_state .get_lock ():
187+ if self ._is_shutdown_immediate ():
188+ return
189+ if immediate :
190+ self ._set_shutdown_immediate ()
191+ with self ._notempty :
192+ self ._notempty .notify_all ()
193+ else :
194+ self ._set_shutdown ()
195+
162196 def close (self ):
163197 self ._closed = True
164198 close = self ._close
@@ -332,7 +366,11 @@ def __setstate__(self, state):
332366 def put (self , obj , block = True , timeout = None ):
333367 if self ._closed :
334368 raise ValueError (f"Queue { self !r} is closed" )
369+ if not self ._is_alive ():
370+ raise ShutDown
335371 if not self ._sem .acquire (block , timeout ):
372+ if not self ._is_alive ():
373+ raise ShutDown
336374 raise Full
337375
338376 with self ._notempty , self ._cond :
@@ -344,17 +382,28 @@ def put(self, obj, block=True, timeout=None):
344382
345383 def task_done (self ):
346384 with self ._cond :
385+ if self ._is_shutdown_immediate ():
386+ raise ShutDown
347387 if not self ._unfinished_tasks .acquire (False ):
348388 raise ValueError ('task_done() called too many times' )
349389 if self ._unfinished_tasks ._semlock ._is_zero ():
350390 self ._cond .notify_all ()
351391
352392 def join (self ):
353393 with self ._cond :
354- if self ._shutdown_state . value == _queue_shutdown_immediate :
355- return
394+ if self ._is_shutdown_immediate () :
395+ raise ShutDown
356396 if not self ._unfinished_tasks ._semlock ._is_zero ():
357397 self ._cond .wait ()
398+ if self ._is_shutdown_immediate ():
399+ raise ShutDown
400+
401+ def shutdown (self , immediate = False ):
402+ with self ._cond :
403+ is_alive = self ._is_alive ()
404+ super ().shutdown (immediate )
405+ if is_alive :
406+ self ._cond .notify_all ()
358407
359408#
360409# Simplified Queue type -- really just a locked pipe
0 commit comments