Skip to content

Commit 855e8cc

Browse files
committed
Add examples and tests for registering and using Python UDTFs
1 parent 981bc99 commit 855e8cc

File tree

3 files changed

+40
-4
lines changed

3 files changed

+40
-4
lines changed

docs/source/user-guide/common-operations/udf-and-udfa.rst

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,26 @@ Provider as described in the ref:`_io_custom_table_provider` page.
254254
Once you have a table function, you can register it with the session context
255255
by using :py:func:`datafusion.context.SessionContext.register_udtf`.
256256

257+
.. code-block:: python
258+
259+
from datafusion import SessionContext, Table, udtf
260+
261+
ctx = SessionContext()
262+
263+
@udtf("table_from_sql")
264+
def table_from_sql_udtf() -> Table:
265+
return Table.from_dataframe(ctx.sql("SELECT 1 AS value"))
266+
267+
ctx.register_udtf(table_from_sql_udtf)
268+
269+
ctx.sql("SELECT * FROM table_from_sql()").show()
270+
271+
.. note::
272+
273+
The ``TABLE(...)`` wrapper syntax is not currently implemented for
274+
invoking table functions from SQL. Call the function directly instead,
275+
such as ``SELECT * FROM my_table_function()``.
276+
257277
There are examples of both rust backed and python based table functions in the
258278
examples folder of the repository. If you have a rust backed table function
259279
that you wish to expose via PyO3, you need to expose it as a ``PyCapsule``.

examples/python-udtf-table-capsule-regression.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@ def table_from_sql_udtf() -> Table:
1919

2020
ctx.register_udtf(table_from_sql_udtf)
2121

22-
try:
23-
ctx.sql("SELECT * FROM table(table_from_sql())").collect()
24-
except NotImplementedError as err:
25-
print("Collecting from table_from_sql() failed:", err)
22+
result = ctx.sql("SELECT * FROM table_from_sql()").collect()
23+
as_pydict = [batch.to_pydict() for batch in result]
24+
print("table_from_sql() returned:", as_pydict)
25+
assert as_pydict == [{"value": [1]}]
2626

2727
ctx.register_table("numbers", Table(ctx.sql("SELECT 1 AS value")))
2828

python/tests/test_context.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
Table,
3232
column,
3333
literal,
34+
udtf,
3435
)
3536

3637

@@ -115,6 +116,21 @@ def test_register_record_batches(ctx):
115116
assert result[0].column(1) == pa.array([-3, -3, -3])
116117

117118

119+
def test_register_python_table_function(ctx):
120+
@udtf("table_from_sql")
121+
def table_from_sql_udtf() -> Table:
122+
return Table.from_dataframe(
123+
ctx.sql("SELECT 1 AS value UNION ALL SELECT 2 AS value")
124+
)
125+
126+
ctx.register_udtf(table_from_sql_udtf)
127+
128+
result = ctx.sql("SELECT * FROM table_from_sql() ORDER BY value").collect()
129+
table = pa.Table.from_batches(result)
130+
131+
assert table.to_pydict() == {"value": [1, 2]}
132+
133+
118134
def test_create_dataframe_registers_unique_table_name(ctx):
119135
# create a RecordBatch and register it as memtable
120136
batch = pa.RecordBatch.from_arrays(

0 commit comments

Comments
 (0)