Skip to content

Commit b991f77

Browse files
committed
Refactor test_arrow_c_stream_interrupted to handle exceptions in a separate thread
1 parent 8f65f42 commit b991f77

File tree

1 file changed

+39
-13
lines changed

1 file changed

+39
-13
lines changed

python/tests/test_dataframe.py

Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3289,6 +3289,7 @@ def test_arrow_c_stream_interrupted():
32893289
interrupted = False
32903290
interrupt_error = None
32913291
query_started = threading.Event()
3292+
read_thread_id = None
32923293
max_wait_time = 5.0
32933294

32943295
def trigger_interrupt():
@@ -3299,34 +3300,59 @@ def trigger_interrupt():
32993300
msg = f"Query did not start within {max_wait_time} seconds"
33003301
raise RuntimeError(msg)
33013302

3302-
thread_id = threading.main_thread().ident
3303-
if thread_id is None:
3304-
msg = "Cannot get main thread ID"
3303+
# Wait a bit to ensure read operation has started
3304+
time.sleep(0.1)
3305+
3306+
if read_thread_id is None:
3307+
msg = "Cannot get read thread ID"
33053308
raise RuntimeError(msg)
33063309

33073310
exception = ctypes.py_object(KeyboardInterrupt)
33083311
res = ctypes.pythonapi.PyThreadState_SetAsyncExc(
3309-
ctypes.c_long(thread_id), exception
3312+
ctypes.c_long(read_thread_id), exception
33103313
)
33113314
if res != 1:
33123315
ctypes.pythonapi.PyThreadState_SetAsyncExc(
3313-
ctypes.c_long(thread_id), ctypes.py_object(0)
3316+
ctypes.c_long(read_thread_id), ctypes.py_object(0)
33143317
)
3315-
msg = "Failed to raise KeyboardInterrupt in main thread"
3318+
msg = "Failed to raise KeyboardInterrupt in read thread"
33163319
raise RuntimeError(msg)
33173320

3321+
read_result = []
3322+
read_exception = []
3323+
3324+
def read_stream():
3325+
nonlocal read_thread_id
3326+
read_thread_id = threading.get_ident()
3327+
try:
3328+
query_started.set()
3329+
# consume the reader which should block and be interrupted
3330+
result = reader.read_all()
3331+
read_result.append(result)
3332+
except KeyboardInterrupt:
3333+
read_exception.append(KeyboardInterrupt)
3334+
except Exception as e:
3335+
read_exception.append(e)
3336+
3337+
read_thread = threading.Thread(target=read_stream)
3338+
read_thread.daemon = True
3339+
read_thread.start()
3340+
33183341
interrupt_thread = threading.Thread(target=trigger_interrupt)
33193342
interrupt_thread.daemon = True
33203343
interrupt_thread.start()
33213344

3322-
try:
3323-
query_started.set()
3324-
# consume the reader which should block and be interrupted
3325-
reader.read_all()
3326-
except KeyboardInterrupt:
3345+
# Wait for the read operation with a timeout
3346+
read_thread.join(timeout=10.0)
3347+
3348+
if read_thread.is_alive():
3349+
pytest.fail("Stream read operation timed out after 10 seconds")
3350+
3351+
# Check if we got the expected KeyboardInterrupt
3352+
if read_exception and isinstance(read_exception[0], type) and read_exception[0] == KeyboardInterrupt:
33273353
interrupted = True
3328-
except Exception as e: # pragma: no cover - unexpected errors
3329-
interrupt_error = e
3354+
elif read_exception:
3355+
interrupt_error = read_exception[0]
33303356

33313357
if not interrupted:
33323358
pytest.fail(f"Stream was not interrupted; got error: {interrupt_error}")

0 commit comments

Comments
 (0)