Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion lib/explorer/backend/data_frame.ex
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,8 @@ defmodule Explorer.Backend.DataFrame do
[df()],
out_df :: df(),
on :: list({column_name(), column_name()}),
how :: :left | :inner | :outer | :right | :cross
how :: :left | :inner | :outer | :right | :cross,
nulls_equal :: boolean()
) :: df

@callback join_asof(
Expand Down
19 changes: 17 additions & 2 deletions lib/explorer/data_frame.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5151,6 +5151,8 @@ defmodule Explorer.DataFrame do

* `:on` - The column(s) to join on. Defaults to overlapping columns. Does not apply to cross join.
* `:how` - One of the join types (as an atom) described above. Defaults to `:inner`.
* `:nulls_equal` - If `true`, `nil` values are considered equal for matching.
Defaults to `false` (standard SQL semantics where `nil != nil`).

## Examples

Expand Down Expand Up @@ -5217,6 +5219,18 @@ defmodule Explorer.DataFrame do
c string ["d", "e", "f", "d", "e", ...]
>

Join with nulls equal:

iex> left = Explorer.DataFrame.new(a: [1, nil], b: ["a", "b"])
iex> right = Explorer.DataFrame.new(a: [1, nil], c: ["d", "e"])
iex> Explorer.DataFrame.join(left, right, nulls_equal: true)
#Explorer.DataFrame<
Polars[2 x 3]
a s64 [1, nil]
b string ["a", "b"]
c string ["d", "e"]
>

Inner join with different names:

iex> left = Explorer.DataFrame.new(a: [1, 2, 3], b: ["a", "b", "c"])
Expand Down Expand Up @@ -5323,7 +5337,8 @@ defmodule Explorer.DataFrame do
opts =
Keyword.validate!(opts,
on: find_overlapping_columns(left_columns, right_columns),
how: :inner
how: :inner,
nulls_equal: false
)

unless opts[:how] in @valid_join_types do
Expand Down Expand Up @@ -5357,7 +5372,7 @@ defmodule Explorer.DataFrame do

out_df = out_df_for_join(how, left, right, on)

Shared.apply_dataframe([left, right], :join, [out_df, on, how])
Shared.apply_dataframe([left, right], :join, [out_df, on, how, opts[:nulls_equal]])
end

defp find_overlapping_columns(left_columns, right_columns) do
Expand Down
4 changes: 2 additions & 2 deletions lib/explorer/polars_backend/data_frame.ex
Original file line number Diff line number Diff line change
Expand Up @@ -916,11 +916,11 @@ defmodule Explorer.PolarsBackend.DataFrame do
# Two or more table verbs

@impl true
def join([left, right], out_df, on, how) do
def join([left, right], out_df, on, how, nulls_equal) do
left = lazy(left)
right = lazy(right)

ldf = LazyFrame.join([left, right], out_df, on, how)
ldf = LazyFrame.join([left, right], out_df, on, how, nulls_equal)
LazyFrame.collect(ldf)
end

Expand Down
18 changes: 8 additions & 10 deletions lib/explorer/polars_backend/lazy_frame.ex
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ defmodule Explorer.PolarsBackend.LazyFrame do
# Two or more tables

@impl true
def join([%DF{} = left, %DF{} = right], %DF{} = out_df, on, how)
def join([%DF{} = left, %DF{} = right], %DF{} = out_df, on, how, nulls_equal)
when is_list(on) and how in [:left, :inner, :cross, :outer] do
how = Atom.to_string(how)

Expand All @@ -587,16 +587,18 @@ defmodule Explorer.PolarsBackend.LazyFrame do
|> Enum.map(fn {left, right} -> {Native.expr_column(left), Native.expr_column(right)} end)
|> Enum.unzip()

opts = %{suffix: "_right", nulls_equal: nulls_equal}

Shared.apply_dataframe(
left,
out_df,
:lf_join,
[right.data, left_on, right_on, how, "_right"]
[right.data, left_on, right_on, how, opts]
)
end

@impl true
def join([%DF{} = left, %DF{} = right], %DF{} = out_df, on, :right)
def join([%DF{} = left, %DF{} = right], %DF{} = out_df, on, :right, nulls_equal)
when is_list(on) do
# Right join is the opposite of left join. So we swap the "on" keys, and swap the DFs
# in the join.
Expand All @@ -605,17 +607,13 @@ defmodule Explorer.PolarsBackend.LazyFrame do
|> Enum.map(fn {left, right} -> {Native.expr_column(right), Native.expr_column(left)} end)
|> Enum.unzip()

opts = %{suffix: "_left", nulls_equal: nulls_equal}

Shared.apply_dataframe(
right,
out_df,
:lf_join,
[
left.data,
left_on,
right_on,
"left",
"_left"
]
[left.data, left_on, right_on, "left", opts]
)
end

Expand Down
2 changes: 1 addition & 1 deletion lib/explorer/polars_backend/native.ex
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ defmodule Explorer.PolarsBackend.Native do
def lf_rename_columns(_df, _column_pairs), do: err()
def lf_drop_nils(_df, _column_pairs), do: err()
def lf_pivot_longer(_df, _id_vars, _value_vars, _names_to, _values_to), do: err()
def lf_join(_df, _other, _left_on, _right_on, _how, _suffix), do: err()
def lf_join(_df, _other, _left_on, _right_on, _how, _opts), do: err()

def lf_join_asof(
_df,
Expand Down
15 changes: 12 additions & 3 deletions native/explorer/src/lazyframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,13 @@ use crate::{
ExplorerError,
};
use polars::{lazy::dsl::Selector, prelude::*};
use rustler::NifMap;

#[derive(NifMap)]
pub struct ExJoinOptions {
pub suffix: String,
pub nulls_equal: bool,
}

// Loads the IO functions for read/writing CSV, NDJSON, Parquet, etc.
pub mod io;
Expand Down Expand Up @@ -320,7 +327,7 @@ pub fn lf_join(
left_on: Vec<ExExpr>,
right_on: Vec<ExExpr>,
how: &str,
suffix: &str,
opts: ExJoinOptions,
) -> Result<ExLazyFrame, ExplorerError> {
let how = match how {
"left" => JoinType::Left,
Expand All @@ -344,15 +351,17 @@ pub fn lf_join(
.join_builder()
.with(ldf1)
.how(JoinType::Cross)
.suffix(suffix)
.suffix(&opts.suffix)
.join_nulls(opts.nulls_equal)
.finish(),
_ => ldf
.join_builder()
.with(ldf1)
.how(how)
.left_on(ex_expr_to_exprs(left_on))
.right_on(ex_expr_to_exprs(right_on))
.suffix(suffix)
.suffix(&opts.suffix)
.join_nulls(opts.nulls_equal)
.finish(),
};

Expand Down
37 changes: 37 additions & 0 deletions test/explorer/data_frame/lazy_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -1429,6 +1429,43 @@ defmodule Explorer.DataFrame.LazyTest do
d_left: [5, 6, 6]
}
end

test "nulls_equal: false is the default (SQL semantics)" do
left = DF.new([a: [1, nil], b: ["x", "y"]], lazy: true)
right = DF.new([a: [1, nil], c: ["p", "q"]], lazy: true)

# Default behavior: nil != nil
ldf = DF.join(left, right)
df = DF.collect(ldf)
assert DF.n_rows(df) == 1
assert DF.to_columns(df, atom_keys: true) == %{a: [1], b: ["x"], c: ["p"]}
end

test "nulls_equal: true matches nil values in inner join" do
left = DF.new([a: [1, 2, nil], b: ["a", "b", "c"]], lazy: true)
right = DF.new([a: [1, nil, 4], c: ["d", "e", "f"]], lazy: true)

ldf = DF.join(left, right, nulls_equal: true)
df = DF.collect(ldf)
assert DF.n_rows(df) == 2

assert DF.to_columns(df, atom_keys: true) == %{
a: [1, nil],
b: ["a", "c"],
c: ["d", "e"]
}
end

test "nulls_equal: true with right join" do
left = DF.new([a: [1, nil], b: ["a", "b"]], lazy: true)
right = DF.new([a: [nil, 2, 3], c: ["d", "e", "f"]], lazy: true)

ldf = DF.join(left, right, how: :right, nulls_equal: true)
df = DF.collect(ldf)
assert DF.n_rows(df) == 3
# Right join keeps right table order
assert DF.to_columns(df, atom_keys: true).c == ["d", "e", "f"]
end
end

describe "join_asof/3" do
Expand Down
34 changes: 34 additions & 0 deletions test/explorer/data_frame_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2574,6 +2574,40 @@ defmodule Explorer.DataFrameTest do

assert_raise ArgumentError, msg, fn -> DF.join(left, right, on: [0]) end
end

test "nulls_equal: false is the default (SQL semantics)" do
left = DF.new(a: [1, nil], b: ["x", "y"])
right = DF.new(a: [1, nil], c: ["p", "q"])

# Default behavior: nil != nil
df = DF.join(left, right)
assert DF.n_rows(df) == 1
assert DF.to_columns(df, atom_keys: true) == %{a: [1], b: ["x"], c: ["p"]}
end

test "nulls_equal: true matches nil values in inner join" do
left = DF.new(a: [1, 2, nil], b: ["a", "b", "c"])
right = DF.new(a: [1, nil, 4], c: ["d", "e", "f"])

df = DF.join(left, right, nulls_equal: true)
assert DF.n_rows(df) == 2

assert DF.to_columns(df, atom_keys: true) == %{
a: [1, nil],
b: ["a", "c"],
c: ["d", "e"]
}
end

test "nulls_equal: true with right join" do
left = DF.new(a: [1, nil], b: ["a", "b"])
right = DF.new(a: [nil, 2, 3], c: ["d", "e", "f"])

df = DF.join(left, right, how: :right, nulls_equal: true)
assert DF.n_rows(df) == 3
# Right join keeps right table order
assert DF.to_columns(df, atom_keys: true).c == ["d", "e", "f"]
end
end

describe "table/1" do
Expand Down
Loading