@@ -3286,22 +3286,16 @@ def test_arrow_c_stream_interrupted():
32863286
32873287 reader = pa .RecordBatchReader .from_stream (df )
32883288
3289- interrupted = False
3290- interrupt_error = None
3291- query_started = threading .Event ()
3289+ read_started = threading .Event ()
3290+ read_exception = []
32923291 read_thread_id = None
32933292 max_wait_time = 5.0
32943293
32953294 def trigger_interrupt ():
3296- start_time = time .time ()
3297- while not query_started .is_set ():
3298- time .sleep (0.1 )
3299- if time .time () - start_time > max_wait_time :
3300- msg = f"Query did not start within { max_wait_time } seconds"
3301- raise RuntimeError (msg )
3302-
3303- # Wait a bit to ensure read operation has started
3304- time .sleep (0.1 )
3295+ """Wait for read to start, then raise KeyboardInterrupt in read thread."""
3296+ if not read_started .wait (timeout = max_wait_time ):
3297+ msg = f"Read operation did not start within { max_wait_time } seconds"
3298+ raise RuntimeError (msg )
33053299
33063300 if read_thread_id is None :
33073301 msg = "Cannot get read thread ID"
@@ -3318,17 +3312,15 @@ def trigger_interrupt():
33183312 msg = "Failed to raise KeyboardInterrupt in read thread"
33193313 raise RuntimeError (msg )
33203314
3321- read_result = []
3322- read_exception = []
3323-
33243315 def read_stream ():
3316+ """Consume the reader, which should be interrupted."""
33253317 nonlocal read_thread_id
33263318 read_thread_id = threading .get_ident ()
33273319 try :
3328- query_started .set ()
3329- # consume the reader which should block and be interrupted
3320+ read_started .set ()
33303321 result = reader .read_all ()
3331- read_result .append (result )
3322+ # If we get here, the read completed without interruption
3323+ read_exception .append (RuntimeError ("Read completed without interruption" ))
33323324 except KeyboardInterrupt :
33333325 read_exception .append (KeyboardInterrupt )
33343326 except Exception as e :
@@ -3348,17 +3340,12 @@ def read_stream():
33483340 if read_thread .is_alive ():
33493341 pytest .fail ("Stream read operation timed out after 10 seconds" )
33503342
3351- # Check if we got the expected KeyboardInterrupt
3352- if read_exception :
3353- if isinstance (read_exception [0 ], KeyboardInterrupt ):
3354- interrupted = True
3355- elif "KeyboardInterrupt" in str (read_exception [0 ]):
3356- interrupted = True
3357- else :
3358- interrupt_error = read_exception [0 ]
3343+ # Verify we got the expected KeyboardInterrupt
3344+ if not read_exception :
3345+ pytest .fail ("No exception was raised during stream read" )
33593346
3360- if not interrupted :
3361- pytest .fail (f"Stream was not interrupted; got error : { interrupt_error } " )
3347+ if not isinstance ( read_exception [ 0 ], type ( KeyboardInterrupt )) :
3348+ pytest .fail (f"Expected KeyboardInterrupt, got: { read_exception [ 0 ] } " )
33623349
33633350 interrupt_thread .join (timeout = 1.0 )
33643351
0 commit comments