Skip to content

Commit ed8047b

Browse files
committed
checkpoint: single-index dev complete. still facing errors when dealing with multi-index
1 parent 6ea3f63 commit ed8047b

File tree

2 files changed

+72
-22
lines changed

2 files changed

+72
-22
lines changed

bigframes/core/blocks.py

Lines changed: 56 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2345,30 +2345,48 @@ def merge(
23452345
left_post_join_ids = tuple(get_column_left[id] for id in left_join_ids)
23462346
right_post_join_ids = tuple(get_column_right[id] for id in right_join_ids)
23472347

2348-
joined_expr, coalesced_ids = coalesce_columns(
2349-
joined_expr, left_post_join_ids, right_post_join_ids, how=how, drop=False
2350-
)
2348+
if left_index or right_index:
2349+
# For some reason pandas coalesces two joining columns if one side is an index.
2350+
joined_expr, resolved_join_ids = coalesce_columns(
2351+
joined_expr, left_post_join_ids, right_post_join_ids
2352+
)
2353+
else:
2354+
joined_expr, resolved_join_ids = resolve_col_join_ids(
2355+
joined_expr,
2356+
left_post_join_ids,
2357+
right_post_join_ids,
2358+
how=how,
2359+
drop=False,
2360+
)
23512361

23522362
result_columns = []
23532363
matching_join_labels = []
2364+
23542365
# Select left value columns
23552366
for col_id in self.value_columns:
23562367
if col_id in left_join_ids:
23572368
key_part = left_join_ids.index(col_id)
23582369
matching_right_id = right_join_ids[key_part]
2359-
if self.col_id_to_label[col_id] == other.col_id_to_label.get(
2360-
matching_right_id, None
2370+
if (
2371+
right_index
2372+
or self.col_id_to_label[col_id]
2373+
== other.col_id_to_label[matching_right_id]
23612374
):
23622375
matching_join_labels.append(self.col_id_to_label[col_id])
2363-
result_columns.append(coalesced_ids[key_part])
2376+
result_columns.append(resolved_join_ids[key_part])
23642377
else:
23652378
result_columns.append(get_column_left[col_id])
23662379
else:
23672380
result_columns.append(get_column_left[col_id])
2381+
2382+
# Select right value columns
23682383
for col_id in other.value_columns:
23692384
if col_id in right_join_ids:
23702385
if other.col_id_to_label[col_id] in matching_join_labels:
23712386
pass
2387+
elif left_index:
2388+
key_part = right_join_ids.index(col_id)
2389+
result_columns.append(resolved_join_ids[key_part])
23722390
else:
23732391
result_columns.append(get_column_right[col_id])
23742392
else:
@@ -2379,14 +2397,19 @@ def merge(
23792397
joined_expr = joined_expr.order_by(
23802398
[
23812399
ordering.OrderingExpression(ex.deref(col_id))
2382-
for col_id in coalesced_ids
2400+
for col_id in resolved_join_ids
23832401
],
23842402
)
23852403

23862404
left_idx_id_post_join = [get_column_left[id] for id in self.index_columns]
23872405
right_idx_id_post_join = [get_column_right[id] for id in other.index_columns]
23882406
index_cols = _resolve_index_col(
2389-
left_idx_id_post_join, right_idx_id_post_join, left_index, right_index, how
2407+
left_idx_id_post_join,
2408+
right_idx_id_post_join,
2409+
resolved_join_ids,
2410+
left_index,
2411+
right_index,
2412+
how,
23902413
)
23912414

23922415
joined_expr = joined_expr.select_columns(result_columns + index_cols)
@@ -3123,7 +3146,7 @@ def join_mono_indexed(
31233146
left_index = get_column_left[left.index_columns[0]]
31243147
right_index = get_column_right[right.index_columns[0]]
31253148
# Drop original indices from each side. and used the coalesced combination generated by the join.
3126-
combined_expr, coalesced_join_cols = coalesce_columns(
3149+
combined_expr, coalesced_join_cols = resolve_col_join_ids(
31273150
combined_expr, [left_index], [right_index], how=how
31283151
)
31293152
if sort:
@@ -3188,7 +3211,7 @@ def join_multi_indexed(
31883211
left_ids_post_join = [get_column_left[id] for id in left_join_ids]
31893212
right_ids_post_join = [get_column_right[id] for id in right_join_ids]
31903213
# Drop original indices from each side. and used the coalesced combination generated by the join.
3191-
combined_expr, coalesced_join_cols = coalesce_columns(
3214+
combined_expr, coalesced_join_cols = resolve_col_join_ids(
31923215
combined_expr, left_ids_post_join, right_ids_post_join, how=how
31933216
)
31943217
if sort:
@@ -3231,13 +3254,17 @@ def resolve_label_id(label: Label) -> str:
32313254

32323255

32333256
# TODO: Rewrite just to return expressions
3234-
def coalesce_columns(
3257+
def resolve_col_join_ids(
32353258
expr: core.ArrayValue,
32363259
left_ids: typing.Sequence[str],
32373260
right_ids: typing.Sequence[str],
32383261
how: str,
32393262
drop: bool = True,
32403263
) -> Tuple[core.ArrayValue, Sequence[str]]:
3264+
"""
3265+
Collapses and selects the joining column IDs, with the assumption that
3266+
the ids are all belong to value columns.
3267+
"""
32413268
result_ids = []
32423269
for left_id, right_id in zip(left_ids, right_ids):
32433270
if how == "left" or how == "inner" or how == "cross":
@@ -3249,7 +3276,6 @@ def coalesce_columns(
32493276
if drop:
32503277
expr = expr.drop_columns([left_id])
32513278
elif how == "outer":
3252-
coalesced_id = guid.generate_guid()
32533279
expr, coalesced_id = expr.project_to_id(
32543280
ops.coalesce_op.as_expr(left_id, right_id)
32553281
)
@@ -3261,6 +3287,21 @@ def coalesce_columns(
32613287
return expr, result_ids
32623288

32633289

3290+
def coalesce_columns(
3291+
expr: core.ArrayValue,
3292+
left_ids: typing.Sequence[str],
3293+
right_ids: typing.Sequence[str],
3294+
) -> tuple[core.ArrayValue, list[str]]:
3295+
result_ids = []
3296+
for left_id, right_id in zip(left_ids, right_ids):
3297+
expr, coalesced_id = expr.project_to_id(
3298+
ops.coalesce_op.as_expr(left_id, right_id)
3299+
)
3300+
result_ids.append(coalesced_id)
3301+
3302+
return expr, result_ids
3303+
3304+
32643305
def _cast_index(block: Block, dtypes: typing.Sequence[bigframes.dtypes.Dtype]):
32653306
original_block = block
32663307
result_ids = []
@@ -3481,6 +3522,7 @@ def _pd_index_to_array_value(
34813522
def _resolve_index_col(
34823523
left_index_cols: list[str],
34833524
right_index_cols: list[str],
3525+
resolved_join_ids: list[str],
34843526
left_index: bool,
34853527
right_index: bool,
34863528
how: typing.Literal[
@@ -3497,12 +3539,13 @@ def _resolve_index_col(
34973539
if how == "right":
34983540
return right_index_cols
34993541
if how == "outer":
3500-
return []
3542+
return resolved_join_ids
35013543
else:
35023544
return []
35033545
elif left_index and not right_index:
35043546
return right_index_cols
35053547
elif right_index and not left_index:
35063548
return left_index_cols
35073549
else:
3550+
# Joining with value columns only. Existing indices will be discarded.
35083551
return []

tests/system/small/core/test_reshape.py

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,17 @@
2424
("left_on", "right_on", "left_index", "right_index"),
2525
[
2626
("col_a", None, False, True),
27-
(None, "col_c", True, False),
27+
(None, "col_d", True, False),
2828
(None, None, True, True),
2929
],
3030
)
31+
@pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
3132
def test_join_with_index(
32-
session: session.Session, left_on, right_on, left_index, right_index
33+
session: session.Session, left_on, right_on, left_index, right_index, how
3334
):
34-
df1 = pd.DataFrame({"col_a": [1, 2, 3], "col_b": [2, 3, 4]})
35+
df1 = pd.DataFrame({"col_a": [1, 2, 3], "col_b": [2, 3, 4]}, index=[1, 2, 3])
3536
bf1 = session.read_pandas(df1)
36-
df2 = pd.DataFrame({"col_c": [1, 2, 3], "col_d": [2, 3, 4]})
37+
df2 = pd.DataFrame({"col_c": [1, 2, 3], "col_d": [2, 3, 4]}, index=[2, 3, 4])
3738
bf2 = session.read_pandas(df2)
3839

3940
bf_result = merge.merge(
@@ -43,6 +44,7 @@ def test_join_with_index(
4344
right_on=right_on,
4445
left_index=left_index,
4546
right_index=right_index,
47+
how=how
4648
).to_pandas()
4749
pd_result = pd.merge(
4850
df1,
@@ -51,6 +53,7 @@ def test_join_with_index(
5153
right_on=right_on,
5254
left_index=left_index,
5355
right_index=right_index,
56+
how=how
5457
)
5558

5659
pandas.testing.assert_frame_equal(
@@ -66,13 +69,15 @@ def test_join_with_index(
6669
(None, None, True, True),
6770
],
6871
)
72+
@pytest.mark.parametrize("how", ["inner", "left", "right", "outer"])
6973
def test_join_with_multiindex(
70-
session: session.Session, left_on, right_on, left_index, right_index
74+
session: session.Session, left_on, right_on, left_index, right_index, how
7175
):
72-
multi_idx = pd.MultiIndex.from_tuples([(1, 2), (2, 3), (3, 4)])
73-
df1 = pd.DataFrame({"col_a": [1, 2, 3], "col_b": [2, 3, 4]}, index=multi_idx)
76+
multi_idx1 = pd.MultiIndex.from_tuples([(1, 2), (2, 3), (3, 5)])
77+
df1 = pd.DataFrame({"col_a": [1, 2, 3], "col_b": [2, 3, 4]}, index=multi_idx1)
7478
bf1 = session.read_pandas(df1)
75-
df2 = pd.DataFrame({"col_c": [1, 2, 3], "col_d": [2, 3, 4]}, index=multi_idx)
79+
multi_idx2 = pd.MultiIndex.from_tuples([(1, 2), (2, 3), (3, 2)])
80+
df2 = pd.DataFrame({"col_c": [1, 2, 3], "col_d": [2, 3, 4]}, index=multi_idx2)
7681
bf2 = session.read_pandas(df2)
7782

7883
bf_result = merge.merge(
@@ -82,6 +87,7 @@ def test_join_with_multiindex(
8287
right_on=right_on,
8388
left_index=left_index,
8489
right_index=right_index,
90+
how=how
8591
).to_pandas()
8692
pd_result = pd.merge(
8793
df1,
@@ -90,8 +96,9 @@ def test_join_with_multiindex(
9096
right_on=right_on,
9197
left_index=left_index,
9298
right_index=right_index,
99+
how=how
93100
)
94101

95102
pandas.testing.assert_frame_equal(
96-
bf_result, pd_result, check_dtype=False, check_index_type=False
103+
bf_result.sort_index(), pd_result.sort_index(), check_dtype=False, check_index_type=False,
97104
)

0 commit comments

Comments
 (0)