From f704e0106f2a4fb16cefa3299832cad7c7be83cd Mon Sep 17 00:00:00 2001
From: batmanknows <122247678+batmnnn@users.noreply.github.com>
Date: Fri, 19 Dec 2025 10:49:08 +0530
Subject: [PATCH 1/5] fix: implement custom nullability for spark abs function

Fixes apache/datafusion#19162

The SparkAbs UDF was using the default is_nullable=true for all outputs,
even when inputs were non-nullable. This commit implements
return_field_from_args to properly propagate nullability from the input
argument.

Changes:
- Add return_field_from_args implementation to SparkAbs
- Output nullability now matches input nullability
- Handle edge case where a scalar argument is explicitly null
- Add tests for nullability behavior
---
 datafusion/spark/src/function/math/abs.rs | 121 +++++++++++++++++++++-
 1 file changed, 117 insertions(+), 4 deletions(-)

diff --git a/datafusion/spark/src/function/math/abs.rs b/datafusion/spark/src/function/math/abs.rs
index 97703937f39f2..d17340d0c74ee 100644
--- a/datafusion/spark/src/function/math/abs.rs
+++ b/datafusion/spark/src/function/math/abs.rs
@@ -16,10 +16,11 @@
 // under the License.
 
 use arrow::array::*;
-use arrow::datatypes::DataType;
+use arrow::datatypes::{DataType, Field, FieldRef};
 use datafusion_common::{DataFusionError, Result, ScalarValue, internal_err};
 use datafusion_expr::{
-    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility,
+    ColumnarValue, ReturnFieldArgs, ScalarFunctionArgs, ScalarUDFImpl, Signature,
+    Volatility,
 };
 use datafusion_functions::{
     downcast_named_arg, make_abs_function, make_wrapping_abs_function,
@@ -69,8 +70,32 @@ impl ScalarUDFImpl for SparkAbs {
         &self.signature
     }
 
-    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
-        Ok(arg_types[0].clone())
+    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
+        internal_err!(
+            "SparkAbs: return_type() is not used; return_field_from_args() is implemented"
+        )
+    }
+
+    fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
+        if args.arg_fields.is_empty() {
+            return internal_err!("abs expects at least 1 argument");
+        }
+
+        let input_field = &args.arg_fields[0];
+        let out_dt = input_field.data_type().clone();
+        let mut out_nullable = input_field.is_nullable();
+
+        // If any scalar argument is explicitly null, output must be nullable
+        let scalar_null_present = args
+            .scalar_arguments
+            .iter()
+            .any(|opt_s| opt_s.is_some_and(|sv| sv.is_null()));
+
+        if scalar_null_present {
+            out_nullable = true;
+        }
+
+        Ok(Arc::new(Field::new(self.name(), out_dt, out_nullable)))
     }
 
     fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
@@ -375,4 +400,92 @@ mod tests {
             as_decimal256_array
         );
     }
+
+    #[test]
+    fn test_abs_nullability() {
+        use arrow::datatypes::{DataType, Field};
+        use datafusion_expr::ReturnFieldArgs;
+        use std::sync::Arc;
+
+        let abs = SparkAbs::new();
+
+        // --- non-nullable Int32 input ---
+        let non_nullable_i32 = Arc::new(Field::new("c", DataType::Int32, false));
+        let out_non_null = abs
+            .return_field_from_args(ReturnFieldArgs {
+                arg_fields: &[Arc::clone(&non_nullable_i32)],
+                scalar_arguments: &[None],
+            })
+            .unwrap();
+
+        // result should be non-nullable and the same DataType as input
+        assert!(!out_non_null.is_nullable());
+        assert_eq!(out_non_null.data_type(), &DataType::Int32);
+
+        // --- nullable Int32 input ---
+        let nullable_i32 = Arc::new(Field::new("c", DataType::Int32, true));
+        let out_nullable = abs
+            .return_field_from_args(ReturnFieldArgs {
+                arg_fields: &[Arc::clone(&nullable_i32)],
+                scalar_arguments: &[None],
+            })
+            .unwrap();
+
+        // result should be nullable and the same DataType as input
+        assert!(out_nullable.is_nullable());
+        assert_eq!(out_nullable.data_type(), &DataType::Int32);
+
+        // --- non-nullable Float64 input ---
+        let non_nullable_f64 = Arc::new(Field::new("c", DataType::Float64, false));
+        let out_f64 = abs
+            .return_field_from_args(ReturnFieldArgs {
+                arg_fields: &[Arc::clone(&non_nullable_f64)],
+                scalar_arguments: &[None],
+            })
+            .unwrap();
+
+        assert!(!out_f64.is_nullable());
+        assert_eq!(out_f64.data_type(), &DataType::Float64);
+
+        // --- nullable Float64 input ---
+        let nullable_f64 = Arc::new(Field::new("c", DataType::Float64, true));
+        let out_f64_null = abs
+            .return_field_from_args(ReturnFieldArgs {
+                arg_fields: &[Arc::clone(&nullable_f64)],
+                scalar_arguments: &[None],
+            })
+            .unwrap();
+
+        assert!(out_f64_null.is_nullable());
+        assert_eq!(out_f64_null.data_type(), &DataType::Float64);
+    }
+
+    #[test]
+    fn test_abs_nullability_with_null_scalar() -> Result<()> {
+        use arrow::datatypes::{DataType, Field};
+        use datafusion_expr::ReturnFieldArgs;
+        use std::sync::Arc;
+
+        let func = SparkAbs::new();
+
+        // Non-nullable field with non-null scalar argument -> non-nullable result
+        let non_nullable: FieldRef = Arc::new(Field::new("col", DataType::Int32, false));
+        let out = func.return_field_from_args(ReturnFieldArgs {
+            arg_fields: &[Arc::clone(&non_nullable)],
+            scalar_arguments: &[None],
+        })?;
+        assert!(!out.is_nullable());
+        assert_eq!(out.data_type(), &DataType::Int32);
+
+        // Non-nullable field with null scalar argument -> nullable result
+        let null_scalar = ScalarValue::Int32(None);
+        let out_with_null_scalar = func.return_field_from_args(ReturnFieldArgs {
+            arg_fields: &[Arc::clone(&non_nullable)],
+            scalar_arguments: &[Some(&null_scalar)],
+        })?;
+        assert!(out_with_null_scalar.is_nullable());
+        assert_eq!(out_with_null_scalar.data_type(), &DataType::Int32);
+
+        Ok(())
+    }
 }

From 85903a2305146b45305e6b89aebd38d8ba81613c Mon Sep 17 00:00:00 2001
From: Shubham Yadav <122247678+batmnnn@users.noreply.github.com>
Date: Fri, 19 Dec 2025 21:01:22 +0530
Subject: [PATCH 2/5] Apply suggestions from code review

Co-authored-by: Martin Grigorov
---
 datafusion/spark/src/function/math/abs.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/datafusion/spark/src/function/math/abs.rs b/datafusion/spark/src/function/math/abs.rs
index d17340d0c74ee..df4ab4344c70c 100644
--- a/datafusion/spark/src/function/math/abs.rs
+++ b/datafusion/spark/src/function/math/abs.rs
@@ -78,7 +78,7 @@ impl ScalarUDFImpl for SparkAbs {
 
     fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
         if args.arg_fields.is_empty() {
-            return internal_err!("abs expects at least 1 argument");
+            return internal_err!("abs takes exactly 1 argument, but got: {}", args.arg_fields.len());
         }
 
         let input_field = &args.arg_fields[0];
@@ -472,7 +472,7 @@ mod tests {
         let non_nullable: FieldRef = Arc::new(Field::new("col", DataType::Int32, false));
         let out = func.return_field_from_args(ReturnFieldArgs {
             arg_fields: &[Arc::clone(&non_nullable)],
-            scalar_arguments: &[None],
+            scalar_arguments: &[Some(ScalarValue::Int32(Some(1)))],
         })?;
         assert!(!out.is_nullable());
         assert_eq!(out.data_type(), &DataType::Int32);

From b149aab6d69569f2662406d5d8558f891fe6e7ce Mon Sep 17 00:00:00 2001
From: batmanknows <122247678+batmnnn@users.noreply.github.com>
Date: Fri, 19 Dec 2025 21:12:17 +0530
Subject: [PATCH 3/5] fix: format internal_err and fix ScalarValue reference

---
 datafusion/spark/src/function/math/abs.rs | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/datafusion/spark/src/function/math/abs.rs b/datafusion/spark/src/function/math/abs.rs
index df4ab4344c70c..86610493a4ab1 100644
--- a/datafusion/spark/src/function/math/abs.rs
+++ b/datafusion/spark/src/function/math/abs.rs
@@ -78,7 +78,10 @@ impl ScalarUDFImpl for SparkAbs {
 
     fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
         if args.arg_fields.is_empty() {
-            return internal_err!("abs takes exactly 1 argument, but got: {}", args.arg_fields.len());
+            return internal_err!(
+                "abs takes exactly 1 argument, but got: {}",
+                args.arg_fields.len()
+            );
         }
 
         let input_field = &args.arg_fields[0];
@@ -470,9 +473,10 @@ mod tests {
 
         // Non-nullable field with non-null scalar argument -> non-nullable result
         let non_nullable: FieldRef = Arc::new(Field::new("col", DataType::Int32, false));
+        let non_null_scalar = ScalarValue::Int32(Some(1));
         let out = func.return_field_from_args(ReturnFieldArgs {
             arg_fields: &[Arc::clone(&non_nullable)],
-            scalar_arguments: &[Some(ScalarValue::Int32(Some(1)))],
+            scalar_arguments: &[Some(&non_null_scalar)],
         })?;
         assert!(!out.is_nullable());
         assert_eq!(out.data_type(), &DataType::Int32);

From 653771c8bb04bb5cbe8e3b3cc6a8c35e14247116 Mon Sep 17 00:00:00 2001
From: batmanknows <122247678+batmnnn@users.noreply.github.com>
Date: Wed, 24 Dec 2025 23:14:22 +0530
Subject: [PATCH 4/5] refactor(spark-abs): remove redundant arg count check

---
 datafusion/spark/src/function/math/abs.rs | 7 -------
 1 file changed, 7 deletions(-)

diff --git a/datafusion/spark/src/function/math/abs.rs b/datafusion/spark/src/function/math/abs.rs
index 86610493a4ab1..80d868b3bd7f4 100644
--- a/datafusion/spark/src/function/math/abs.rs
+++ b/datafusion/spark/src/function/math/abs.rs
@@ -77,13 +77,6 @@ impl ScalarUDFImpl for SparkAbs {
     }
 
     fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
-        if args.arg_fields.is_empty() {
-            return internal_err!(
-                "abs takes exactly 1 argument, but got: {}",
-                args.arg_fields.len()
-            );
-        }
-
         let input_field = &args.arg_fields[0];
         let out_dt = input_field.data_type().clone();
         let mut out_nullable = input_field.is_nullable();

From 3b7e10ff3aca6143fa3807cb5da653c7a4bc53ec Mon Sep 17 00:00:00 2001
From: batmanknows <122247678+batmnnn@users.noreply.github.com>
Date: Thu, 25 Dec 2025 18:54:45 +0530
Subject: [PATCH 5/5] remove unnecessary scalar null check

---
 datafusion/spark/src/function/math/abs.rs | 42 +----------------------
 1 file changed, 1 insertion(+), 41 deletions(-)

diff --git a/datafusion/spark/src/function/math/abs.rs b/datafusion/spark/src/function/math/abs.rs
index 80d868b3bd7f4..101291ac5f66e 100644
--- a/datafusion/spark/src/function/math/abs.rs
+++ b/datafusion/spark/src/function/math/abs.rs
@@ -79,17 +79,7 @@ impl ScalarUDFImpl for SparkAbs {
     fn return_field_from_args(&self, args: ReturnFieldArgs) -> Result<FieldRef> {
         let input_field = &args.arg_fields[0];
         let out_dt = input_field.data_type().clone();
-        let mut out_nullable = input_field.is_nullable();
-
-        // If any scalar argument is explicitly null, output must be nullable
-        let scalar_null_present = args
-            .scalar_arguments
-            .iter()
-            .any(|opt_s| opt_s.is_some_and(|sv| sv.is_null()));
-
-        if scalar_null_present {
-            out_nullable = true;
-        }
+        let out_nullable = input_field.is_nullable();
 
         Ok(Arc::new(Field::new(self.name(), out_dt, out_nullable)))
     }
@@ -455,34 +445,4 @@ mod tests {
         assert!(out_f64_null.is_nullable());
         assert_eq!(out_f64_null.data_type(), &DataType::Float64);
     }
-
-    #[test]
-    fn test_abs_nullability_with_null_scalar() -> Result<()> {
-        use arrow::datatypes::{DataType, Field};
-        use datafusion_expr::ReturnFieldArgs;
-        use std::sync::Arc;
-
-        let func = SparkAbs::new();
-
-        // Non-nullable field with non-null scalar argument -> non-nullable result
-        let non_nullable: FieldRef = Arc::new(Field::new("col", DataType::Int32, false));
-        let non_null_scalar = ScalarValue::Int32(Some(1));
-        let out = func.return_field_from_args(ReturnFieldArgs {
-            arg_fields: &[Arc::clone(&non_nullable)],
-            scalar_arguments: &[Some(&non_null_scalar)],
-        })?;
-        assert!(!out.is_nullable());
-        assert_eq!(out.data_type(), &DataType::Int32);
-
-        // Non-nullable field with null scalar argument -> nullable result
-        let null_scalar = ScalarValue::Int32(None);
-        let out_with_null_scalar = func.return_field_from_args(ReturnFieldArgs {
-            arg_fields: &[Arc::clone(&non_nullable)],
-            scalar_arguments: &[Some(&null_scalar)],
-        })?;
-        assert!(out_with_null_scalar.is_nullable());
-        assert_eq!(out_with_null_scalar.data_type(), &DataType::Int32);
-
-        Ok(())
-    }
 }
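
Usage sketch (not part of the patch series): a minimal, self-contained illustration of how a caller observes the propagated nullability after PATCH 5/5, mirroring the test_abs_nullability test above. The import path datafusion_spark::function::math::abs::SparkAbs and the column name "price" are assumptions made for this example; ReturnFieldArgs is constructed exactly as in the tests.

use std::sync::Arc;

use arrow::datatypes::{DataType, Field, FieldRef};
use datafusion_common::Result;
use datafusion_expr::{ReturnFieldArgs, ScalarUDFImpl};
// Assumed public path to the UDF implementation; adjust to the actual module layout.
use datafusion_spark::function::math::abs::SparkAbs;

fn main() -> Result<()> {
    let abs = SparkAbs::new();

    // A non-nullable input column ("price" is a made-up name for the example).
    let col: FieldRef = Arc::new(Field::new("price", DataType::Int64, false));

    // After this series, the planned output field of abs(price) keeps the input's
    // nullability instead of the previous always-nullable default.
    let out = abs.return_field_from_args(ReturnFieldArgs {
        arg_fields: &[Arc::clone(&col)],
        scalar_arguments: &[None],
    })?;

    assert!(!out.is_nullable());
    assert_eq!(out.data_type(), &DataType::Int64);
    Ok(())
}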