@@ -33,7 +33,11 @@ namespace arrow {
3333
3434using internal::checked_cast;
3535
36+ using compute::and_;
37+ using compute::field_ref;
3638using compute::FilterOptions;
39+ using compute::is_null;
40+ using compute::not_;
3741
3842namespace acero {
3943namespace {
@@ -51,14 +55,34 @@ class FilterNode : public MapNode {
5155 auto schema = inputs[0 ]->output_schema ();
5256
5357 const auto & filter_options = checked_cast<const FilterNodeOptions&>(options);
58+ Expression filter_expression;
59+ if (!filter_options.filter_not_null .empty ()) {
60+ std::vector<Expression> operands = {filter_options.filter_expression };
61+ for (const auto & c : filter_options.filter_not_null ) {
62+ auto field_index = schema->GetFieldIndex (c);
63+ if (field_index == -1 ) {
64+ return Status::KeyError (" Column not found: " , c);
65+ }
66+ auto & field = schema->field (field_index);
67+ if (field->nullable ()) {
68+ ARROW_ASSIGN_OR_RAISE (
69+ schema, schema->SetField (field_index, field->WithNullable (false )));
70+ }
71+ operands.push_back (not_ (is_null (field_ref (c))));
72+ }
73+ filter_expression = and_ (operands);
74+ } else {
75+ filter_expression = filter_options.filter_expression ;
76+ }
5477
55- auto filter_expression = filter_options.filter_expression ;
5678 if (!filter_expression.IsBound ()) {
5779 ARROW_ASSIGN_OR_RAISE (
5880 filter_expression,
5981 filter_expression.Bind (*schema, plan->query_context ()->exec_context ()));
6082 }
6183
84+ std::vector<std::string> filter_not_null;
85+
6286 if (filter_expression.type ()->id () != Type::BOOL) {
6387 return Status::TypeError (" Filter expression must evaluate to bool, but " ,
6488 filter_expression.ToString (), " evaluates to " ,
0 commit comments