@@ -1582,30 +1582,6 @@ def test_empty_to_arrow_table(df):
15821582 assert set (pyarrow_table .column_names ) == {"a" , "b" , "c" }
15831583
15841584
def test_arrow_c_stream_to_table(fail_collect):
    """Build a ``pa.Table`` directly from a DataFrame made of two batches.

    The DataFrame is handed to ``pa.Table.from_batches`` as-is and the result
    is compared with a table assembled from the same two record batches.

    The ``fail_collect`` fixture is requested but never referenced in the
    body; it is defined elsewhere and is presumably wanted for its side
    effects (TODO confirm against the fixture definition).
    """
    first = pa.record_batch([pa.array([1])], names=["a"])
    second = pa.record_batch([pa.array([2])], names=["a"])

    # Each batch goes into its own inner list of the create_dataframe input.
    ctx = SessionContext()
    df = ctx.create_dataframe([[first], [second]])

    streamed = pa.Table.from_batches(df)
    reference = pa.Table.from_batches([first, second])

    assert streamed.equals(reference)
    assert streamed.schema == df.schema()
    # Column "a" must still be split into two chunks, one per input batch.
    assert streamed.column("a").num_chunks == 2
1599-
1600-
def test_arrow_c_stream_reader(df):
    """Import the DataFrame's Arrow C stream capsule as a RecordBatchReader.

    The table read through the reader must equal the table built from
    ``df.collect()``.
    """
    capsule = df.__arrow_c_stream__()
    reader = pa.RecordBatchReader._import_from_c_capsule(capsule)
    assert isinstance(reader, pa.RecordBatchReader)

    # Drain the stream first, then collect, matching the original ordering.
    streamed = pa.Table.from_batches(reader)
    collected = pa.Table.from_batches(df.collect())
    assert streamed.equals(collected)
1607-
1608-
16091585def test_to_pylist (df ):
16101586 # Convert datafusion dataframe to Python list
16111587 pylist = df .to_pylist ()
@@ -2690,110 +2666,6 @@ def trigger_interrupt():
26902666 interrupt_thread .join (timeout = 1.0 )
26912667
26922668
def test_arrow_c_stream_interrupted():
    """Check that reading via ``__arrow_c_stream__`` honours ``KeyboardInterrupt``.

    Like ``test_collect_interrupted``, this plans a long running query, but
    the results are consumed through ``__arrow_c_stream__``. A helper thread
    raises ``KeyboardInterrupt`` in the main thread while the reader is being
    drained, and the test fails unless that exception surfaces from the read.
    """

    ctx = SessionContext()

    def make_batch(index):
        # Batch `index` covers the integer range [index * 1000, (index + 1) * 1000).
        lo, hi = index * 1000, (index + 1) * 1000
        return pa.RecordBatch.from_arrays(
            [
                pa.array(list(range(lo, hi))),
                pa.array([f"value_{j} " for j in range(lo, hi)]),
            ],
            names=["a", "b"],
        )

    batches = [make_batch(i) for i in range(10)]

    # The same ten batches back both tables used by the join below.
    for table_name in ("t1", "t2"):
        ctx.register_record_batches(table_name, [batches])

    query = """
        WITH t1_expanded AS (
            SELECT
                a,
                b,
                CAST(a AS DOUBLE) / 1.5 AS c,
                CAST(a AS DOUBLE) * CAST(a AS DOUBLE) AS d
            FROM t1
            CROSS JOIN (SELECT 1 AS dummy FROM t1 LIMIT 5)
        ),
        t2_expanded AS (
            SELECT
                a,
                b,
                CAST(a AS DOUBLE) * 2.5 AS e,
                CAST(a AS DOUBLE) * CAST(a AS DOUBLE) * CAST(a AS DOUBLE) AS f
            FROM t2
            CROSS JOIN (SELECT 1 AS dummy FROM t2 LIMIT 5)
        )
        SELECT
            t1.a, t1.b, t1.c, t1.d,
            t2.a AS a2, t2.b AS b2, t2.e, t2.f
        FROM t1_expanded t1
        JOIN t2_expanded t2 ON t1.a % 100 = t2.a % 100
        WHERE t1.a > 100 AND t2.a > 100
        """
    df = ctx.sql(query)

    reader = pa.RecordBatchReader._import_from_c_capsule(df.__arrow_c_stream__())

    query_started = threading.Event()
    max_wait_time = 5.0

    def trigger_interrupt():
        # Poll in 0.1 s steps until the main thread signals that it is about
        # to read; give up once max_wait_time has elapsed.
        began = time.time()
        while True:
            if query_started.is_set():
                break
            time.sleep(0.1)
            if time.time() - began > max_wait_time:
                raise RuntimeError(
                    f"Query did not start within {max_wait_time} seconds"
                )

        main_ident = threading.main_thread().ident
        if main_ident is None:
            raise RuntimeError("Cannot get main thread ID")

        set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc
        pending = ctypes.py_object(KeyboardInterrupt)
        affected = set_async_exc(ctypes.c_long(main_ident), pending)
        if affected == 1:
            return

        # Anything other than exactly one affected thread is treated as a
        # failure; issue the reset call and report it.
        # NOTE(review): py_object(0) wraps the int 0 rather than a NULL
        # pointer - confirm this is the intended way to clear the request.
        set_async_exc(ctypes.c_long(main_ident), ctypes.py_object(0))
        raise RuntimeError("Failed to raise KeyboardInterrupt in main thread")

    interrupt_thread = threading.Thread(target=trigger_interrupt, daemon=True)
    interrupt_thread.start()

    interrupted = False
    interrupt_error = None
    try:
        query_started.set()
        # Draining the reader should block until the interrupt arrives.
        reader.read_all()
    except KeyboardInterrupt:
        interrupted = True
    except Exception as exc:  # pragma: no cover - unexpected errors
        interrupt_error = exc

    if not interrupted:
        pytest.fail(f"Stream was not interrupted; got error: {interrupt_error} ")

    interrupt_thread.join(timeout=1.0)
2795-
2796-
27972669def test_show_select_where_no_rows (capsys ) -> None :
27982670 ctx = SessionContext ()
27992671 df = ctx .sql ("SELECT 1 WHERE 1=0" )
0 commit comments