
Conversation

@Yicong-Huang
Contributor

To be filled

Extract the struct flattening/wrapping logic from ArrowStreamUDFSerializer into reusable transformers in a new transformers.py module.
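
The new module itself isn't shown in this thread; as a rough illustration, struct wrapping and flattening with pyarrow could look like the sketch below (the function names mirror the commit message, but the bodies and signatures are assumptions):

```python
import pyarrow as pa

def wrap_struct(batch: pa.RecordBatch) -> pa.RecordBatch:
    # Pack all top-level columns into one struct column, the layout the
    # JVM side expects for a single UDF result.
    struct = pa.StructArray.from_arrays(batch.columns, names=batch.schema.names)
    return pa.RecordBatch.from_arrays([struct], names=["_0"])

def flatten_struct(batch: pa.RecordBatch) -> pa.RecordBatch:
    # Inverse of wrap_struct: explode the single struct column back into
    # top-level columns. (A real implementation would also handle the
    # zero-field struct case mentioned later in this thread.)
    struct = batch.column(0)
    names = [struct.type.field(i).name for i in range(struct.type.num_fields)]
    return pa.RecordBatch.from_arrays(struct.flatten(), names=names)
```
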
…use-flatten-struct' into SPARK-55176/refactor/extract-arrow-to-pandas-converter
Resolved conflicts by:
- Adopting create_converter architecture while keeping arrow_to_pandas compatibility
- Using _load_group_dataframes helper from upstream
- Removing GroupArrowUDFSerializer (not in upstream)
- Keeping input_type (singular) parameter name
…er/mapper layers

This refactoring separates concerns between serialization and data transformation:
- Serializers now only handle data serialization/deserialization
- Wrappers handle UDF wrapping and data format conversion
- Mappers handle UDF result aggregation using transformer utilities

Key changes:
1. Simplified serializer hierarchy by removing redundant classes:
   - Removed ArrowStreamPandasUDFSerializer, ArrowStreamGroupUDFSerializer,
     ArrowStreamArrowUDFSerializer, ArrowStreamUDTFSerializer
   - Renamed ArrowStreamMapIterSerializer to ArrowStreamGroupSerializer
   - Unified grouped/non-grouped UDF handling in ArrowStreamGroupSerializer

2. Introduced transformer utility classes:
   - ArrowBatchTransformer: Arrow batch operations (wrap_struct, flatten_struct,
     partial_batch, partial_table, concat_batches, merge_batches, reorder_columns)
   - PandasBatchTransformer: Pandas/Arrow conversions (to_arrow, concat_series_batches)

3. Moved data transformations from serializers to wrappers/mappers:
   - Moved to_arrow conversion from mappers to wrappers for Pandas agg UDFs
   - Wrappers now return RecordBatch directly instead of (result, arrow_type) tuples (sketched below)
   - Mappers simplified to use transformer methods for common operations
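
As an illustration of the contract change in item 3 above, a minimal sketch of a wrapper that converts the pandas result itself instead of returning a (result, arrow_type) tuple (wrap_pandas_udf and the single "_0" column are assumptions, not the actual worker code):

```python
import pandas as pd
import pyarrow as pa

def wrap_pandas_udf(f, arrow_return_type: pa.DataType):
    # Old contract: return (result, arrow_return_type) and let the
    # serializer convert. New contract: convert here, so the mapper
    # receives a ready-to-serialize RecordBatch.
    def wrapped(*series: pd.Series) -> pa.RecordBatch:
        result = f(*series)
        arr = pa.Array.from_pandas(result, type=arrow_return_type)
        return pa.RecordBatch.from_arrays([arr], names=["_0"])
    return wrapped
```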

Benefits:
- Clearer separation of concerns
- Reduced code duplication through transformer utilities
- Easier to maintain and extend
- Consistent data format handling across UDF types

- Use ArrowBatchTransformer.zip_batches for type coercion instead of a manual loop (see the sketch after this list)
- Simplify error handling logic
- Handle empty struct case properly
- Unwrap wrapped batches from worker before type coercion, then wrap back for JVM
- 46 of 47 UDTF tests pass (1 known failure unrelated to this change)
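
zip_batches itself isn't shown in the thread; one plausible reading, sketched under the assumption that it lines up the columns of several same-length result batches into a single batch:

```python
import pyarrow as pa

def zip_batches(batches, target_types=None):
    # Hypothetical sketch: place the columns of several same-length result
    # batches side by side, optionally casting each column to its expected
    # Arrow type (replacing the manual coercion loop).
    arrays = [col for batch in batches for col in batch.columns]
    names = [name for batch in batches for name in batch.schema.names]
    if target_types is not None:
        arrays = [arr.cast(t) for arr, t in zip(arrays, target_types)]
    # pa.RecordBatch.from_arrays needs a concrete list, not a lazy zip --
    # the "convert items to list" fix noted in a later commit below.
    return pa.RecordBatch.from_arrays(list(arrays), names=names)
```
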
@Yicong-Huang marked this pull request as draft on January 30, 2026 at 21:47
@github-actions

⚠️ Pull Request Title Validation

This pull request title does not contain a JIRA issue ID.

Please update the title to either:

  • Include a JIRA ID: [SPARK-12345] Your description
  • Mark as minor change: [MINOR] Your description

For minor changes that don't require a JIRA ticket (e.g., typo fixes), please prefix the title with [MINOR].


This comment was automatically generated by GitHub Actions

This commit fixes several issues with pandas UDF handling after the
serializer refactoring:

1. Fix parameter initialization order in read_udfs():
   - Move pandas_udf_* parameter defaults BEFORE the if-elif chain
   - Previously they were reset AFTER being set, causing df_for_struct
     to always be False for scalar pandas UDFs

2. Add struct_in_pandas="dict" to scalar UDF wrappers:
   - wrap_scalar_pandas_udf: enables DataFrame→struct array conversion
   - wrap_pandas_batch_iter_udf: same fix for iter variant

3. Fix grouped map UDF column matching:
   - Use assign_cols_by_name to match DataFrame columns by name when
     available, otherwise by position
   - Handle empty DataFrame (0 columns) by creating empty struct array

4. Fix Arrow batch handling:
   - zip_batches: convert items to list for pa.RecordBatch.from_arrays
   - mapper: special handling for SQL_ARROW_BATCHED_UDF to return raw
     result instead of calling zip_batches

5. Fix error handling in create_array (see the sketch below):
   - Only catch ArrowInvalid for arrow_cast retry (not ArrowTypeError)
   - Add ArrowTypeError to TypeError handler for proper error messages
   - Update error message format to match expected test output

All 264+ pandas UDF tests pass including struct type tests.
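
A hedged sketch of the error-handling shape described in item 5 (the real create_array helper does more, and the exact message text is illustrative):

```python
import pyarrow as pa

def create_array(series, arrow_type, arrow_cast=True):
    try:
        return pa.Array.from_pandas(series, type=arrow_type)
    except pa.ArrowInvalid:
        if arrow_cast:
            # Values that don't fit the target type: retry with an
            # explicit, permissive cast. ArrowTypeError deliberately does
            # NOT take this retry path anymore.
            return pa.Array.from_pandas(series).cast(arrow_type, safe=False)
        raise
    except (TypeError, pa.ArrowTypeError) as e:
        # Genuine type mismatches surface as a readable TypeError rather
        # than a raw Arrow error.
        raise TypeError(f"Unable to convert to Arrow type {arrow_type}: {e}") from e
```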

Remove timezone, int_to_decimal_coercion_enabled, and assign_cols_by_name
parameters that were stored but never used by the serializer or subclasses.

…to-pandas conversion

- Extract common verification functions: verify_result_length,
  verify_result_type, verify_is_iterable, verify_element_type (see the
  sketch after this list)
- Simplify wrapper functions using common verification utilities
- Centralize Arrow-to-pandas conversion in read_udfs mapper/func
- Remove unused pandas_udf_* variables from read_udfs
- Fix is_scalar_pandas_iter to not convert Arrow iter UDFs to pandas
- Separate iter UDF branches by type for clarity (scalar pandas/arrow,
  map pandas, map arrow)
- Wrapper functions now return (result, arrow_return_type) tuples,
  with output conversion centralized in mapper/func
- Grouped map pandas UDFs now receive Iterator[DataFrame] directly
  instead of Iterator[List[Series]]
- Inline concat_series_batches and series_batches_to_dataframe methods
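
For two of the shared verification helpers, a minimal sketch of what they might look like (names come from the list above; signatures and messages are assumptions):

```python
import pandas as pd

def verify_result_type(result, expected=pd.Series):
    # The UDF must return the declared container type.
    if not isinstance(result, expected):
        raise TypeError(
            f"Return type of the user-defined function should be "
            f"{expected.__name__}, but is {type(result).__name__}")
    return result

def verify_result_length(result, length):
    # The UDF must not change the number of rows in the batch.
    if len(result) != length:
        raise RuntimeError(
            f"Result vector from pandas_udf was not the required length: "
            f"expected {length}, got {len(result)}")
    return result
```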

Simplify dump_stream to fix a CI failure in the Arrow UDTF lateral join test.

The previous implementation had complex unwrapping logic that assumed
batches might arrive wrapped from the worker. However, the worker always
yields unwrapped batches, so the detection and unwrapping code only
disrupted the data flow.

Changes:
- Remove batch unwrapping logic (batches are never wrapped at this point)
- Use direct array casting instead of zip_batches for type coercion
- Maintain the proper flow: receive unwrapped batch → coerce types → wrap for JVM (sketched below)
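
A minimal sketch of that flow, assuming dump_stream casts each column directly before wrapping (the helper name is hypothetical):

```python
import pyarrow as pa

def _coerce_and_wrap(batch: pa.RecordBatch, expected_types) -> pa.RecordBatch:
    # The worker yields plain (unwrapped) batches, so there is nothing to
    # unwrap here: cast columns to the declared types, then pack them into
    # the single struct column the JVM expects.
    arrays = [col.cast(t) for col, t in zip(batch.columns, expected_types)]
    struct = pa.StructArray.from_arrays(arrays, names=batch.schema.names)
    return pa.RecordBatch.from_arrays([struct], names=["_0"])
```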

Move the load_stream and dump_stream logic from
TransformWithStateInPySparkRowSerializer and
TransformWithStateInPySparkRowInitStateSerializer into worker.py.

Both eval types now use ArrowStreamGroupSerializer directly,
simplifying the serializer hierarchy.