@@ -3363,76 +3363,7 @@ def read_stream():
33633363 interrupt_thread .join (timeout = 1.0 )
33643364
33653365
3366- def test_record_batch_reader_interrupt_exits_quickly (ctx ):
3367- df = ctx .sql (
3368- """
3369- SELECT t1.value AS a, t2.value AS a2
3370- FROM range(0, 1000000, 1) AS t1
3371- JOIN range(0, 1000000, 1) AS t2 ON t1.value = t2.value
3372- """
3373- )
3374-
3375- reader = pa .RecordBatchReader .from_stream (df )
3376-
3377- query_started = threading .Event ()
3378- read_thread_id = None
3379- interrupt_time = None
3380- completion_time = None
3381- read_exception = []
3382-
3383- def trigger_interrupt ():
3384- nonlocal interrupt_time
3385- if not query_started .wait (timeout = 5.0 ):
3386- pytest .fail ("Query did not start in time" )
3387-
3388- time .sleep (0.1 )
3389- interrupt_time = time .time ()
3390-
3391- if read_thread_id is None :
3392- pytest .fail ("Read thread did not record an identifier" )
3393-
3394- res = ctypes .pythonapi .PyThreadState_SetAsyncExc (
3395- ctypes .c_long (read_thread_id ), ctypes .py_object (KeyboardInterrupt )
3396- )
3397- if res != 1 :
3398- ctypes .pythonapi .PyThreadState_SetAsyncExc (
3399- ctypes .c_long (read_thread_id ), ctypes .py_object (0 )
3400- )
3401- pytest .fail ("Failed to raise KeyboardInterrupt in read thread" )
3402-
3403- def read_stream ():
3404- nonlocal read_thread_id , completion_time
3405- read_thread_id = threading .get_ident ()
3406- try :
3407- query_started .set ()
3408- reader .read_all ()
3409- except KeyboardInterrupt :
3410- completion_time = time .time ()
3411- except Exception as exc : # pragma: no cover - unexpected failure path
3412- completion_time = time .time ()
3413- read_exception .append (exc )
3414-
3415- read_thread = threading .Thread (target = read_stream , daemon = True )
3416- interrupt_thread = threading .Thread (target = trigger_interrupt , daemon = True )
3417-
3418- read_thread .start ()
3419- interrupt_thread .start ()
3420-
3421- read_thread .join (timeout = 10.0 )
3422- if read_thread .is_alive ():
3423- pytest .fail ("Stream read operation timed out after 10 seconds" )
3424-
3425- interrupt_thread .join (timeout = 1.0 )
3426-
3427- if read_exception and "KeyboardInterrupt" not in str (read_exception [0 ]):
3428- pytest .fail (f"Read thread raised unexpected exception: { read_exception [0 ]} " )
3429-
3430- assert completion_time is not None , "Read thread did not finish"
3431- assert interrupt_time is not None , "Interrupt was not sent"
34323366
3433- elapsed = completion_time - interrupt_time
3434- assert elapsed >= 0 , "Completion recorded before interrupt was sent"
3435- assert elapsed < 1.5 , f"Cancellation took too long: { elapsed } s"
34363367
34373368
34383369def test_show_select_where_no_rows (capsys ) -> None :
0 commit comments