74 changes: 63 additions & 11 deletions bigframes/session/__init__.py
@@ -1106,15 +1106,6 @@ def _read_csv_w_bigquery_engine(
    if dtype is not None and not utils.is_dict_like(dtype):
        raise ValueError("dtype should be a dict-like object.")

    if names is not None:
        if len(names) != len(set(names)):
            raise ValueError("Duplicated names are not allowed.")
        if not (
            bigframes.core.utils.is_list_like(names, allow_sets=False)
            or isinstance(names, abc.KeysView)
        ):
            raise ValueError("Names should be an ordered collection.")

    if index_col is True:
        raise ValueError("The value of index_col couldn't be 'True'")

@@ -1143,16 +1134,31 @@ def _read_csv_w_bigquery_engine(

    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.autodetect = True
    job_config.field_delimiter = sep
    job_config.encoding = encoding
    job_config.labels = {"bigframes-api": "read_csv"}

    # b/409070192: When header > 0, pandas and BigFrames return different column names.
    if names is not None:
        if len(names) != len(set(names)):
            raise ValueError("Duplicated names are not allowed.")
        if not (
            bigframes.core.utils.is_list_like(names, allow_sets=False)
            or isinstance(names, abc.KeysView)
        ):
            raise ValueError("Names should be an ordered collection.")
    else:
        # names is None, so try to infer the column names via pandas.
        names = self._try_to_get_names_from_pandas_read_csv(
            filepath_or_buffer,
            header=header,
            index_col=index_col,
            usecols=usecols,
        )

    # We want to match pandas behavior (see the sketch after this function).
    # If header is 0, no rows should be skipped, so we do not need to set
    # `skip_leading_rows`. If header is None, there is no header; setting
    # skip_leading_rows to 0 expresses that. If header=N and N>0, we want to
    # skip N rows.
    job_config.autodetect = True
    if header is None:
        job_config.skip_leading_rows = 0
    elif header > 0:
@@ -1173,6 +1179,52 @@
        df[column] = df[column].astype(dtype)
    return df
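For reference, the header-to-`skip_leading_rows` mapping that the comment above describes can be written out on its own (a sketch of the documented semantics only; `skip_leading_rows_for` is a hypothetical helper, not part of BigFrames):

```python
from typing import Optional


def skip_leading_rows_for(header: Optional[int]) -> Optional[int]:
    """Return the `skip_leading_rows` value implied by a pandas-style `header`."""
    if header is None:
        return 0  # no header row at all: every row is data, skip nothing
    if header > 0:
        return header  # per the comment above: header=N (N>0) skips N rows
    return None  # header == 0: leave skip_leading_rows unset; autodetect handles it
```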

def _try_to_get_names_from_pandas_read_csv(
    self,
    filepath_or_buffer,
    *,
    header,
    index_col,
    usecols,
) -> Optional[Sequence[str]]:
    """Attempts to infer column names from a CSV file using pandas.

    This method uses `pandas.read_csv` to preview the column names. It is
    necessary because BigQuery's CSV loading behavior can differ from pandas:
    - b/409070192: When `header > 0`, BigQuery generates names based on data
      type (e.g., bool_field_0, string_field_1, etc.), while pandas uses the
      literal content of the specified header row for column names.
    - b/324483018: The BigQuery engine can throw an error when a column header
      in the CSV file is not a valid BigQuery identifier (e.g., contains
      spaces or special characters).
    """
    # Cannot infer names if index_col or usecols is specified.
    if not (index_col is None or index_col == ()) or usecols is not None:
        return None

    try:
        # Read only the header by setting nrows=0 for efficiency.
        pandas_df = pandas.read_csv(
            filepath_or_buffer,
            header=header,
            # Only get the header.
            nrows=0,
        )
        columns = pandas_df.columns.tolist()
        return columns if columns else None
    except Exception:
        msg = bfe.format_message(
            "Could not infer column names with pandas; "
            "the BigQuery backend will attempt to infer them instead."
        )
        warnings.warn(msg, UserWarning)
    finally:
        if not isinstance(filepath_or_buffer, str):
            # If the buffer is a BytesIO, we need to reset it to the
            # beginning so that it can be read again later.
            filepath_or_buffer.seek(0)

    return None
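Outside of a `Session`, the same peek-then-rewind pattern is easy to reproduce (a self-contained sketch; `infer_csv_names` is a hypothetical stand-in for the method above, not library API):

```python
import io
from typing import Optional, Sequence

import pandas


def infer_csv_names(buf, header: Optional[int] = 0) -> Optional[Sequence[str]]:
    try:
        # nrows=0 parses only the header row(s); no data rows are materialized.
        columns = pandas.read_csv(buf, header=header, nrows=0).columns.tolist()
        return columns or None
    finally:
        if not isinstance(buf, str):
            # Rewind so a later full read starts at the beginning again.
            buf.seek(0)


buf = io.BytesIO(b"skip me\na,b,c\n1,2,3\n")
print(infer_csv_names(buf, header=1))  # ['a', 'b', 'c']
print(buf.read(7))  # b'skip me' -- the finally block rewound the buffer
```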

def read_pickle(
    self,
    filepath_or_buffer: FilePath | ReadPickleBuffer,
26 changes: 9 additions & 17 deletions tests/system/small/test_session.py
@@ -1568,32 +1568,24 @@ def test_read_csv_for_gcs_file_w_header(session, df_and_gcs_csv, header):
    # Compares results for pandas and bigframes engines
    scalars_df, path = df_and_gcs_csv
    bf_df = session.read_csv(path, engine="bigquery", index_col=False, header=header)
    pd_df = session.read_csv(
        path, index_col=False, header=header, dtype=scalars_df.dtypes.to_dict()
    )

    # b/408461403: workaround the issue where the slice does not work for DataFrame.
    expected_df = session.read_pandas(scalars_df.to_pandas()[header:])
    dtypes = {col: dtype for col, dtype in zip(bf_df.columns, bf_df.dtypes.to_list())}
    pd_df = session.read_csv(path, index_col=False, header=header, dtype=dtypes)

    assert pd_df.shape[0] == expected_df.shape[0]
    expected_df = scalars_df[header:]
    assert pd_df.shape[0] == scalars_df[header:].shape[0]
    assert bf_df.shape[0] == pd_df.shape[0]

    # We use a default index because of index_col=False, so the previous index
    # column is just loaded as a column.
    assert len(pd_df.columns) == len(expected_df.columns) + 1
    assert len(bf_df.columns) == len(pd_df.columns)

    # When `header > 0`, pandas and BigFrames may handle column naming differently.
    # Pandas uses the literal content of the specified header row for column names,
    # regardless of what it is. BigQuery, however, might generate default names
    # based on data type (e.g., bool_field_0, string_field_1, etc.).
    if header == 0:
        # BigFrames requires `sort_index()` because BigQuery doesn't preserve row IDs
        # (b/280889935) or guarantee row ordering.
        bf_df = bf_df.set_index("rowindex").sort_index()
        pd_df = pd_df.set_index("rowindex")
        pd.testing.assert_frame_equal(bf_df.to_pandas(), scalars_df.to_pandas())
        pd.testing.assert_frame_equal(bf_df.to_pandas(), pd_df.to_pandas())
    # BigFrames requires `sort_index()` because BigQuery doesn't preserve row IDs
    # (b/280889935) or guarantee row ordering.
    bf_df = bf_df.set_index(pd_df.columns[0]).sort_index()
    pd_df = pd_df.set_index(pd_df.columns[0])
    pd.testing.assert_frame_equal(bf_df.to_pandas(), pd_df.to_pandas())
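The naming difference the comments above refer to is easy to see from the pandas side (the BigQuery names below are illustrative of autodetect's type-based naming, not captured output):

```python
import io

import pandas

csv = io.StringIO("col_a,col_b\nTrue,hello\n1,x\n2,y\n")
# With header=1, pandas uses the literal content of row 1 as column names:
print(pandas.read_csv(csv, header=1).columns.tolist())  # ['True', 'hello']
# The BigQuery engine may instead autodetect type-based names such as
# ['bool_field_0', 'string_field_1'] for the same file (b/409070192).
```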


def test_read_csv_w_usecols(session, df_and_local_csv):