
Commit 022e4b3

Add user defined window function support (#880)
* Adding PyWindowUDF and implementing PartitionEvaluator for it. Still requires python side work.
* Add python wrappers for UDWF
* adding unit tests for user defined window functions
* Change udwf() to take an instance rather than a class so we can parameterize it
* Pass multiple arrays for udwf evaluate so we can capture the order_by and also multiple columns
* Update udwf to take multiple input columns
* Add user example for UDWF
* Update template for how values are passed to update
* Add user documentation for UDWF
* Updating documentation per PR review
1 parent f822495 commit 022e4b3

9 files changed: +1306 −31 lines
docs/source/user-guide/common-operations/udf-and-udfa.rst

Lines changed: 176 additions & 18 deletions
User-Defined Functions
======================

DataFusion provides powerful expressions and functions, reducing the need for custom Python
functions. However, you can still incorporate your own functions, i.e. User-Defined Functions (UDFs).
Scalar Functions
----------------

User-defined functions that operate on a row-by-row basis are called scalar functions. You can
define your own scalar function by calling :py:func:`~datafusion.udf.ScalarUDF.udf`.

The basic definition of a scalar UDF is a Python function that takes one or more
`pyarrow <https://arrow.apache.org/docs/python/index.html>`_ arrays and returns a single array as
output. DataFusion scalar UDFs operate on an entire batch of records at a time, though the
evaluation of those records should be on a row-by-row basis. In the following example, we compute
whether the input array contains null values.

.. ipython:: python

    import pyarrow
    import datafusion
    from datafusion import udf, col

    def is_null(array: pyarrow.Array) -> pyarrow.Array:
        return array.is_null()

    is_null_arr = udf(is_null, [pyarrow.int64()], pyarrow.bool_(), 'stable')

    ctx = datafusion.SessionContext()

    batch = pyarrow.RecordBatch.from_arrays(
        [pyarrow.array([1, None, 3]), pyarrow.array([4, 5, 6])],
        names=["a", "b"],
    )
    df = ctx.create_dataframe([[batch]], name="batch_array")

    df.select(col("a"), is_null_arr(col("a")).alias("is_null")).show()
In the previous example, we used the fact that pyarrow provides a variety of built-in array
functions such as ``is_null()``. Many more pyarrow
`compute functions <https://arrow.apache.org/docs/python/compute.html>`_ are available. When
possible, it is highly recommended to use these functions because they can perform computations
without copying data out of the original arrays, which leads to greatly improved performance.
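For instance, a scalar UDF can stay entirely inside pyarrow's compute kernels. The following is a
minimal sketch of that pattern; the ``clamp`` function and its bounds are our own illustration, not
part of the DataFusion API.

.. code-block:: python

    import pyarrow
    import pyarrow.compute as pc
    from datafusion import udf

    def clamp(array: pyarrow.Array) -> pyarrow.Array:
        # Clamp values into [0, 100] using only pyarrow compute kernels,
        # so no per-row conversion to Python objects is required.
        return pc.min_element_wise(pc.max_element_wise(array, 0), 100)

    clamp_arr = udf(clamp, [pyarrow.int64()], pyarrow.int64(), 'stable')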
If you need to perform an operation in Python that is not available in the pyarrow compute
functions, you will need to convert the record batch into Python values, perform your operation,
and construct an array from the results. Converting the built-in data type of the array into
Python objects can be one of the slowest operations in DataFusion, so it should be done sparingly.

The following example performs the same operation as before with ``is_null`` but demonstrates
converting to Python objects to do the evaluation.
.. ipython:: python

    import pyarrow
    import datafusion
    from datafusion import udf, col

    def is_null(array: pyarrow.Array) -> pyarrow.Array:
        return pyarrow.array([value.as_py() is None for value in array])

    is_null_arr = udf(is_null, [pyarrow.int64()], pyarrow.bool_(), 'stable')

    ctx = datafusion.SessionContext()

    batch = pyarrow.RecordBatch.from_arrays(
        [pyarrow.array([1, None, 3]), pyarrow.array([4, 5, 6])],
        names=["a", "b"],
    )
    df = ctx.create_dataframe([[batch]], name="batch_array")

    df.select(col("a"), is_null_arr(col("a")).alias("is_null")).show()

In both cases, :py:func:`~datafusion.udf.ScalarUDF.udf` is called with the Python function, the
list of input types, the return type, and a volatility setting.
Aggregate Functions
-------------------

The :py:func:`~datafusion.udf.AggregateUDF.udaf` function allows you to define User-Defined
Aggregate Functions (UDAFs). To use this you must implement an
:py:class:`~datafusion.udf.Accumulator` that determines how the aggregation is performed.

When defining a UDAF there are four methods you need to implement. ``update`` takes the array(s)
of input and updates the internal state of the accumulator; define it with as many input arguments
as you will pass when calling the UDAF. Since aggregation may be split into multiple batches, we
need a way to combine them. For this there are two functions: ``state`` returns an array of scalar
values capturing the current state of a single batch accumulation, and ``merge`` combines the
results of those states. Finally, ``evaluate`` returns the final result after the ``merge`` is
complete.

In the following example we define a custom aggregate function that returns the difference between
the sums of two columns. The state can be represented by a single value, and we can also see how
the inputs to ``update`` and ``merge`` differ.
.. code-block:: python

    import pyarrow
    import pyarrow.compute
    import datafusion
    from datafusion import col, udaf, Accumulator
    from typing import List

    class MyAccumulator(Accumulator):
        """
        Interface of a user-defined accumulation.
        """
        def __init__(self):
            self._sum = 0.0

        def update(self, values_a: pyarrow.Array, values_b: pyarrow.Array) -> None:
            self._sum = self._sum + pyarrow.compute.sum(values_a).as_py() - pyarrow.compute.sum(values_b).as_py()

        def merge(self, states: List[pyarrow.Array]) -> None:
            self._sum = self._sum + pyarrow.compute.sum(states[0]).as_py()

        def state(self) -> pyarrow.Array:
            return pyarrow.array([self._sum])

        def evaluate(self) -> pyarrow.Scalar:
            return pyarrow.scalar(self._sum)

    ctx = datafusion.SessionContext()
    df = ctx.from_pydict(
        {
            "a": [4, 5, 6],
            "b": [1, 2, 3],
        }
    )

    my_udaf = udaf(MyAccumulator, [pyarrow.float64(), pyarrow.float64()], pyarrow.float64(), [pyarrow.float64()], 'stable')

    df.aggregate([], [my_udaf(col("a"), col("b")).alias("col_diff")])
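Here ``udaf`` is called with the accumulator class, the list of input types, the return type, the
list of state types, and a volatility setting. To make the four-method protocol concrete, the
following sketch (our own illustration, not part of the documented example) drives two accumulators
by hand the way a split aggregation would:

.. code-block:: python

    # Uses MyAccumulator and pyarrow from the example above.
    # Two accumulators, as if DataFusion had split the input into two batches.
    acc_a = MyAccumulator()
    acc_b = MyAccumulator()
    acc_a.update(pyarrow.array([4.0, 5.0]), pyarrow.array([1.0, 2.0]))  # partial result 6.0
    acc_b.update(pyarrow.array([6.0]), pyarrow.array([3.0]))            # partial result 3.0

    # Fold acc_b's serialized state into acc_a, then produce the final answer.
    acc_a.merge([acc_b.state()])
    assert acc_a.evaluate().as_py() == 9.0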
Window Functions
----------------

To implement a User-Defined Window Function (UDWF) you must call the
:py:func:`~datafusion.udf.WindowUDF.udwf` function with a class that implements the abstract
class :py:class:`~datafusion.udf.WindowEvaluator`.

There are three methods of evaluation for UDWFs:

- ``evaluate`` computes the value for a single row of the input array at a time. This is the
  simplest method to implement, but also the least performant.
- ``evaluate_all`` computes the values for all rows of an input array in a single call.
- ``evaluate_all_with_rank`` computes the values for all rows, but you only have the rank
  information for the rows (a sketch of this style appears at the end of this section).

Which method you must implement depends on which of the following options are set:
.. list-table::
   :header-rows: 1

   * - ``uses_window_frame``
     - ``supports_bounded_execution``
     - ``include_rank``
     - Function to implement
   * - False (default)
     - False (default)
     - False (default)
     - ``evaluate_all``
   * - False
     - True
     - False
     - ``evaluate``
   * - False
     - True/False
     - True
     - ``evaluate_all_with_rank``
   * - True
     - True/False
     - True/False
     - ``evaluate``
UDWF options
^^^^^^^^^^^^

When you define your UDWF you can override the methods that return these values; they determine
which of the evaluate functions is called.

- ``uses_window_frame`` is set for functions that compute based on the specified window frame. If
  your function depends upon the specified frame, set this to ``True``.
- ``supports_bounded_execution`` specifies whether your function can be computed incrementally.
- ``include_rank`` is set to ``True`` for window functions that can be computed using only the rank
  information.
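As a sketch of how such an override might look, the following frame-aware evaluator (our own
illustration) sums the values inside the current window frame. It assumes a frame-based
``evaluate`` receives the input arrays plus the frame's ``(start, stop)`` range:

.. code-block:: python

    import pyarrow as pa
    import pyarrow.compute as pc
    from datafusion.udf import WindowEvaluator

    class FrameSum(WindowEvaluator):
        """Sum the values inside the current window frame."""

        def uses_window_frame(self) -> bool:
            # Declare that this evaluator depends on the frame bounds, so
            # evaluate() is called once per row with the frame's range.
            return True

        def evaluate(self, values: list[pa.Array], eval_range: tuple[int, int]) -> pa.Scalar:
            start, stop = eval_range
            return pc.sum(values[0].slice(start, stop - start))

The full example below puts the pieces together using ``evaluate_all``: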
.. code-block:: python

    import pyarrow as pa
    from datafusion import udwf, col, SessionContext
    from datafusion.udf import WindowEvaluator

    class ExponentialSmooth(WindowEvaluator):
        def __init__(self, alpha: float) -> None:
            self.alpha = alpha

        def evaluate_all(self, values: list[pa.Array], num_rows: int) -> pa.Array:
            results = []
            curr_value = 0.0
            # Only the first input column is used; order_by expressions would
            # arrive as additional arrays in `values`.
            values = values[0]
            for idx in range(num_rows):
                if idx == 0:
                    curr_value = values[idx].as_py()
                else:
                    curr_value = values[idx].as_py() * self.alpha + curr_value * (
                        1.0 - self.alpha
                    )
                results.append(curr_value)

            return pa.array(results)

    # udwf takes an instance (so it can be parameterized), the input type,
    # the return type, and the volatility.
    exp_smooth = udwf(
        ExponentialSmooth(0.9),
        pa.float64(),
        pa.float64(),
        volatility="immutable",
    )

    ctx = SessionContext()

    df = ctx.from_pydict({
        "a": [1.0, 2.1, 2.9, 4.0, 5.1, 6.0, 6.9, 8.0]
    })

    df.select("a", exp_smooth(col("a")).alias("smooth_a")).show()
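For the rank-based mode mentioned above, a minimal sketch (our own illustration) might look like
the following. It assumes ``evaluate_all_with_rank`` receives the row count plus the
``(start, stop)`` ranges of equal-valued rows in the sorted partition:

.. code-block:: python

    import pyarrow as pa
    from datafusion.udf import WindowEvaluator

    class RankFraction(WindowEvaluator):
        """Emit each row's rank divided by the number of rows in the partition."""

        def include_rank(self) -> bool:
            return True

        def evaluate_all_with_rank(
            self, num_rows: int, ranks_in_partition: list[tuple[int, int]]
        ) -> pa.Array:
            results = []
            for rank, (start, stop) in enumerate(ranks_in_partition, start=1):
                # Every row within one (start, stop) range shares the same rank.
                results.extend([rank / num_rows] * (stop - start))
            return pa.array(results)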
