Commit 6b9587a

fix: Fix issue with stream upload batch size upload limit
1 parent 0cb5217 commit 6b9587a

File tree

3 files changed: +82 -12 lines changed

bigframes/core/local_data.py
bigframes/session/loader.py
tests/system/large/test_session.py

bigframes/core/local_data.py

Lines changed: 2 additions & 1 deletion
@@ -124,12 +124,13 @@ def to_arrow(
         geo_format: Literal["wkb", "wkt"] = "wkt",
         duration_type: Literal["int", "duration"] = "duration",
         json_type: Literal["string"] = "string",
+        max_chunksize: Optional[int] = None,
     ) -> tuple[pa.Schema, Iterable[pa.RecordBatch]]:
         if geo_format != "wkt":
             raise NotImplementedError(f"geo format {geo_format} not yet implemented")
         assert json_type == "string"

-        batches = self.data.to_batches()
+        batches = self.data.to_batches(max_chunksize=max_chunksize)
         schema = self.data.schema
         if duration_type == "int":
             schema = _schema_durations_to_ints(schema)
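The new max_chunksize argument is passed through to to_batches on self.data (presumably a pyarrow Table), where it caps the number of rows per emitted RecordBatch. A standalone sketch of that pyarrow behavior, separate from the bigframes code:

import pyarrow as pa

# Illustration of pyarrow's max_chunksize (not the bigframes code): each
# RecordBatch returned by to_batches holds at most max_chunksize rows.
table = pa.table({"x": list(range(10))})
batches = table.to_batches(max_chunksize=3)
print([b.num_rows for b in batches])  # [3, 3, 3, 1]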

bigframes/session/loader.py

Lines changed: 43 additions & 11 deletions
@@ -19,6 +19,7 @@
 import datetime
 import io
 import itertools
+import math
 import os
 import typing
 from typing import (
@@ -397,6 +398,13 @@ def stream_data(
         offsets_col: str,
     ) -> bq_data.BigqueryDataSource:
         """Load managed data into bigquery"""
+        MAX_BYTES = 10000000  # streaming api has 10MB limit
+        SAFETY_MARGIN = (
+            40  # Perf seems bad for large chunks, so do 40x smaller than max
+        )
+        batch_count = data.metadata.total_bytes // (MAX_BYTES // SAFETY_MARGIN)
+        rows_per_batch = math.ceil(data.metadata.row_count / batch_count)
+
         schema_w_offsets = data.schema.append(
             schemata.SchemaItem(offsets_col, bigframes.dtypes.INT_DTYPE)
         )
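The batch size is derived from the source metadata: target roughly MAX_BYTES / SAFETY_MARGIN bytes per request, turn that into a batch count, then spread the rows evenly. A rough sketch of the arithmetic, where total_bytes and row_count stand in for data.metadata; the max(1, ...) guard against a zero batch count on tiny inputs is my added assumption, not part of the commit:

import math

# Sketch of the batch-sizing arithmetic above (not the bigframes code).
def rows_per_batch(total_bytes: int, row_count: int,
                   max_bytes: int = 10_000_000, safety_margin: int = 40) -> int:
    # max(1, ...) is an added guard for very small inputs.
    batch_count = max(1, total_bytes // (max_bytes // safety_margin))
    return math.ceil(row_count / batch_count)

print(rows_per_batch(40_000_000, 1_000_000))  # 160 batches -> 6250 rows each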
@@ -410,16 +418,24 @@ def stream_data(
         )
         rows_w_offsets = ((*row, offset) for offset, row in enumerate(rows))

-        for errors in self._bqclient.insert_rows(
-            load_table_destination,
-            rows_w_offsets,
-            selected_fields=bq_schema,
-            row_ids=map(str, itertools.count()),  # used to ensure only-once insertion
-        ):
-            if errors:
-                raise ValueError(
-                    f"Problem loading at least one row from DataFrame: {errors}. {constants.FEEDBACK_LINK}"
-                )
+        # TODO: don't use batched
+        batches = _batched(rows_w_offsets, rows_per_batch)
+        ids_iter = map(str, itertools.count())
+
+        for batch in batches:
+            batch_rows = list(batch)
+            row_ids = itertools.islice(ids_iter, len(batch_rows))
+
+            for errors in self._bqclient.insert_rows(
+                load_table_destination,
+                batch_rows,
+                selected_fields=bq_schema,
+                row_ids=row_ids,  # used to ensure only-once insertion
+            ):
+                if errors:
+                    raise ValueError(
+                        f"Problem loading at least one row from DataFrame: {errors}. {constants.FEEDBACK_LINK}"
+                    )
         destination_table = self._bqclient.get_table(load_table_destination)
         return bq_data.BigqueryDataSource(
             bq_data.GbqTable.from_table(destination_table),
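Because the insert-id counter is shared across batches, each iteration slices exactly len(batch_rows) ids off it, so the ids keep advancing from one insert_rows call to the next and every row still gets a unique id. A toy illustration of that islice pattern, separate from the bigframes code:

import itertools

# Toy version of the shared-counter pattern: ids continue across batches.
rows = [("a",), ("b",), ("c",), ("d",), ("e",)]
ids_iter = map(str, itertools.count())
for batch in (rows[i:i + 2] for i in range(0, len(rows), 2)):
    row_ids = list(itertools.islice(ids_iter, len(batch)))
    print(batch, row_ids)  # ['0', '1'], then ['2', '3'], then ['4']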
@@ -434,6 +450,13 @@ def write_data(
         offsets_col: str,
     ) -> bq_data.BigqueryDataSource:
         """Load managed data into bigquery"""
+        MAX_BYTES = 10000000  # streaming api has 10MB limit
+        SAFETY_MARGIN = (
+            4  # aim for 2.5mb to account for row variance, format differences, etc.
+        )
+        batch_count = data.metadata.total_bytes // (MAX_BYTES // SAFETY_MARGIN)
+        rows_per_batch = math.ceil(data.metadata.row_count / batch_count)
+
         schema_w_offsets = data.schema.append(
             schemata.SchemaItem(offsets_col, bigframes.dtypes.INT_DTYPE)
         )
@@ -450,7 +473,9 @@ def write_data(

         def request_gen() -> Generator[bq_storage_types.AppendRowsRequest, None, None]:
             schema, batches = data.to_arrow(
-                offsets_col=offsets_col, duration_type="int"
+                offsets_col=offsets_col,
+                duration_type="int",
+                max_chunksize=rows_per_batch,
             )
             offset = 0
             for batch in batches:
@@ -1332,3 +1357,10 @@ def _validate_dtype_can_load(name: str, column_type: bigframes.dtypes.Dtype):
             f"Nested JSON types, found in column `{name}`: `{column_type}`', "
             f"are currently unsupported for upload. {constants.FEEDBACK_LINK}"
         )
+
+
+# itertools.batched not available in python <3.12, so we use this instead
+def _batched(iterator: Iterable, n: int) -> Iterable:
+    assert n > 0
+    while batch := tuple(itertools.islice(iterator, n)):
+        yield batch
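_batched is a small stand-in for itertools.batched (available only in Python 3.12+). Note that it slices the argument directly, so it expects a true iterator such as a generator; re-slicing a plain list would restart from the beginning on every loop. A quick standalone check of the helper as written:

import itertools

# Copy of the helper above for a quick check; note the iter(...) in the call.
def _batched(iterator, n):
    assert n > 0
    while batch := tuple(itertools.islice(iterator, n)):
        yield batch

print(list(_batched(iter(range(7)), 3)))  # [(0, 1, 2), (3, 4, 5), (6,)]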

tests/system/large/test_session.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,50 @@
1717

1818
import google.cloud.bigquery as bigquery
1919
import google.cloud.exceptions
20+
import numpy as np
21+
import pandas as pd
2022
import pytest
2123

2224
import bigframes
2325
import bigframes.pandas as bpd
2426
import bigframes.session._io.bigquery
2527

2628

29+
@pytest.fixture
30+
def large_pd_df():
31+
nrows = 1000000
32+
33+
np_int1 = np.random.randint(0, 1000, size=nrows, dtype=np.int32)
34+
np_int2 = np.random.randint(10000, 20000, size=nrows, dtype=np.int64)
35+
np_bool = np.random.choice([True, False], size=nrows)
36+
np_float1 = np.random.rand(nrows).astype(np.float32)
37+
np_float2 = np.random.normal(loc=50.0, scale=10.0, size=nrows).astype(np.float64)
38+
39+
return pd.DataFrame(
40+
{
41+
"int_col_1": np_int1,
42+
"int_col_2": np_int2,
43+
"bool_col": np_bool,
44+
"float_col_1": np_float1,
45+
"float_col_2": np_float2,
46+
}
47+
)
48+
49+
50+
@pytest.mark.parametrize(
51+
("write_engine"),
52+
[
53+
("bigquery_load"),
54+
("bigquery_streaming"),
55+
("bigquery_write"),
56+
],
57+
)
58+
def test_read_pandas_large_df(session, large_pd_df, write_engine: str):
59+
df = session.read_pandas(large_pd_df, write_engine=write_engine)
60+
assert len(df.peek(5)) == 5
61+
assert len(large_pd_df) == 1000000
62+
63+
2764
def test_close(session: bigframes.Session):
2865
# we will create two tables and confirm that they are deleted
2966
# when the session is closed
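A rough size estimate for the fixture (my arithmetic, not from the commit): the five columns are int32, int64, bool, float32, and float64, about 25 bytes per row, so one million rows is roughly 25 MB of raw data, comfortably above the 10 MB per-request limit the loader now batches around, which is why the test exercises all three write engines.

# Back-of-the-envelope check: int32 + int64 + bool + float32 + float64 per row.
bytes_per_row = 4 + 8 + 1 + 4 + 8
print(1_000_000 * bytes_per_row / 1e6, "MB")  # 25.0 MB, above the 10 MB limit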
