Commit 8469db5

Merge branch 'main' into udf-type

2 parents: 5163128 + 51057fc

File tree

6 files changed (+89, -40 lines)

bigframes/pandas/io/api.py

Lines changed: 7 additions & 0 deletions

@@ -16,6 +16,7 @@
 
 import functools
 import inspect
+import os
 import threading
 import typing
 from typing import (
@@ -56,6 +57,7 @@
 from bigframes.session import dry_runs
 import bigframes.session._io.bigquery
 import bigframes.session.clients
+import bigframes.session.metrics
 
 # Note: the following methods are duplicated from Session. This duplication
 # enables the following:
@@ -625,6 +627,11 @@ def _get_bqclient() -> bigquery.Client:
 
 def _dry_run(query, bqclient) -> bigquery.QueryJob:
     job = bqclient.query(query, bigquery.QueryJobConfig(dry_run=True))
+
+    # Fix for b/435183833. Log metrics even if a Session isn't available.
+    if bigframes.session.metrics.LOGGING_NAME_ENV_VAR in os.environ:
+        metrics = bigframes.session.metrics.ExecutionMetrics()
+        metrics.count_job_stats(job)
     return job
 
 
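This guard is the fix for b/435183833: when the performance-logging environment variable is set (the pytest benchmarks set it), a throwaway ExecutionMetrics instance records the dry-run job even though no Session exists. A minimal sketch of exercising the same path by hand, assuming only what the diff shows (the query is a placeholder, and LOGGING_NAME_ENV_VAR is referenced symbolically rather than hard-coded):

import os

from google.cloud import bigquery

import bigframes.session.metrics as metrics

# Point the stats logger at a file-name prefix, as the pytest benchmarks do.
os.environ[metrics.LOGGING_NAME_ENV_VAR] = "adhoc_dry_run"

client = bigquery.Client()
job = client.query("SELECT 1", bigquery.QueryJobConfig(dry_run=True))

# Same code path as the patched _dry_run: stats reach disk, no Session needed.
if metrics.LOGGING_NAME_ENV_VAR in os.environ:
    metrics.ExecutionMetrics().count_job_stats(job)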
bigframes/session/metrics.py

Lines changed: 50 additions & 28 deletions

@@ -40,32 +40,54 @@ def count_job_stats(
     ):
         if query_job is None:
             assert row_iterator is not None
-            total_bytes_processed = getattr(row_iterator, "total_bytes_processed", None)
-            query = getattr(row_iterator, "query", None)
-            if total_bytes_processed is None or query is None:
-                return
+
+            # TODO(tswast): Pass None after making benchmark publishing robust to missing data.
+            bytes_processed = getattr(row_iterator, "total_bytes_processed", 0)
+            query_char_count = len(getattr(row_iterator, "query", ""))
+            slot_millis = getattr(row_iterator, "slot_millis", 0)
+            exec_seconds = 0.0
 
             self.execution_count += 1
-            self.query_char_count += len(query)
-            self.bytes_processed += total_bytes_processed
-            write_stats_to_disk(len(query), total_bytes_processed)
-            return
+            self.query_char_count += query_char_count
+            self.bytes_processed += bytes_processed
+            self.slot_millis += slot_millis
+
+        elif query_job.configuration.dry_run:
+            query_char_count = len(query_job.query)
 
-        if query_job.configuration.dry_run:
-            write_stats_to_disk(len(query_job.query), 0, 0, 0)
+            # TODO(tswast): Pass None after making benchmark publishing robust to missing data.
+            bytes_processed = 0
+            slot_millis = 0
+            exec_seconds = 0.0
 
-        stats = get_performance_stats(query_job)
-        if stats is not None:
-            query_char_count, bytes_processed, slot_millis, execution_secs = stats
+        elif (stats := get_performance_stats(query_job)) is not None:
+            query_char_count, bytes_processed, slot_millis, exec_seconds = stats
             self.execution_count += 1
             self.query_char_count += query_char_count
             self.bytes_processed += bytes_processed
             self.slot_millis += slot_millis
-            self.execution_secs += execution_secs
+            self.execution_secs += exec_seconds
             write_stats_to_disk(
-                query_char_count, bytes_processed, slot_millis, execution_secs
+                query_char_count=query_char_count,
+                bytes_processed=bytes_processed,
+                slot_millis=slot_millis,
+                exec_seconds=exec_seconds,
             )
 
+        else:
+            # TODO(tswast): Pass None after making benchmark publishing robust to missing data.
+            bytes_processed = 0
+            query_char_count = 0
+            slot_millis = 0
+            exec_seconds = 0
+
+            write_stats_to_disk(
+                query_char_count=query_char_count,
+                bytes_processed=bytes_processed,
+                slot_millis=slot_millis,
+                exec_seconds=exec_seconds,
+            )
+
 
 def get_performance_stats(
     query_job: bigquery.QueryJob,
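Two idioms carry this rewrite: the walrus operator folds the old assign-then-test pair into an elif, and every branch now leaves the same stat variables bound instead of returning early. A standalone sketch of the pattern, with illustrative names rather than bigframes code:

from typing import Optional, Tuple


def get_stats(job_id: str) -> Optional[Tuple[int, float]]:
    # Stand-in for get_performance_stats: returns None when stats are missing.
    return (1024, 0.5) if job_id else None


# Before: stats = get_stats(...); if stats is not None: ...
# After: bind and test in one expression, so the check fits an if/elif chain.
if (stats := get_stats("job-123")) is not None:
    bytes_processed, exec_seconds = stats
else:
    # Every branch binds the same names, so later code can rely on them
    # unconditionally instead of returning early.
    bytes_processed, exec_seconds = 0, 0.0

print(bytes_processed, exec_seconds)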
@@ -103,10 +125,11 @@ def get_performance_stats(
 
 
 def write_stats_to_disk(
+    *,
     query_char_count: int,
     bytes_processed: int,
-    slot_millis: Optional[int] = None,
-    exec_seconds: Optional[float] = None,
+    slot_millis: int,
+    exec_seconds: float,
 ):
     """For pytest runs only, log information about the query job
     to a file in order to create a performance report.
@@ -118,18 +141,17 @@ def write_stats_to_disk(
     test_name = os.environ[LOGGING_NAME_ENV_VAR]
     current_directory = os.getcwd()
 
-    if (slot_millis is not None) and (exec_seconds is not None):
-        # store slot milliseconds
-        slot_file = os.path.join(current_directory, test_name + ".slotmillis")
-        with open(slot_file, "a") as f:
-            f.write(str(slot_millis) + "\n")
+    # store slot milliseconds
+    slot_file = os.path.join(current_directory, test_name + ".slotmillis")
+    with open(slot_file, "a") as f:
+        f.write(str(slot_millis) + "\n")
 
-        # store execution time seconds
-        exec_time_file = os.path.join(
-            current_directory, test_name + ".bq_exec_time_seconds"
-        )
-        with open(exec_time_file, "a") as f:
-            f.write(str(exec_seconds) + "\n")
+    # store execution time seconds
+    exec_time_file = os.path.join(
+        current_directory, test_name + ".bq_exec_time_seconds"
+    )
+    with open(exec_time_file, "a") as f:
+        f.write(str(exec_seconds) + "\n")
 
     # store length of query
     query_char_count_file = os.path.join(
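With the bare * making write_stats_to_disk keyword-only, positional calls like the removed write_stats_to_disk(len(query_job.query), 0, 0, 0) can no longer compile, and every stat is appended, one value per line, to its own file. A hedged sketch of aggregating those files after a run; the file-name suffixes come from the diff, while the summing logic is illustrative:

import pathlib


def sum_metric(test_name: str, suffix: str, directory: str = ".") -> float:
    # write_stats_to_disk appends one value per line to files such as
    # <test_name>.slotmillis and <test_name>.bq_exec_time_seconds.
    path = pathlib.Path(directory) / f"{test_name}{suffix}"
    if not path.exists():
        return 0.0
    return sum(float(line) for line in path.read_text().splitlines() if line.strip())


print(sum_metric("my_benchmark", ".slotmillis"))
print(sum_metric("my_benchmark", ".bq_exec_time_seconds"))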

samples/snippets/conftest.py

Lines changed: 18 additions & 7 deletions

@@ -12,9 +12,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from typing import Iterator
+from typing import Generator, Iterator
 
-from google.cloud import bigquery
+from google.cloud import bigquery, storage
 import pytest
 import test_utils.prefixer
 
@@ -42,11 +42,27 @@ def bigquery_client() -> bigquery.Client:
     return bigquery_client
 
 
+@pytest.fixture(scope="session")
+def storage_client(project_id: str) -> storage.Client:
+    return storage.Client(project=project_id)
+
+
 @pytest.fixture(scope="session")
 def project_id(bigquery_client: bigquery.Client) -> str:
     return bigquery_client.project
 
 
+@pytest.fixture(scope="session")
+def gcs_bucket(storage_client: storage.Client) -> Generator[str, None, None]:
+    bucket_name = "bigframes_blob_test_with_data_wipeout"
+
+    yield bucket_name
+
+    bucket = storage_client.get_bucket(bucket_name)
+    for blob in bucket.list_blobs():
+        blob.delete()
+
+
 @pytest.fixture(autouse=True)
 def reset_session() -> None:
     """An autouse fixture ensuring each sample runs in a fresh session.
@@ -78,11 +94,6 @@ def dataset_id_eu(bigquery_client: bigquery.Client, project_id: str) -> Iterator
     bigquery_client.delete_dataset(dataset, delete_contents=True, not_found_ok=True)
 
 
-@pytest.fixture(scope="session")
-def gcs_dst_bucket() -> str:
-    return "gs://bigframes_blob_test"
-
-
 @pytest.fixture
 def random_model_id(
     bigquery_client: bigquery.Client, project_id: str, dataset_id: str
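The new gcs_bucket fixture leans on pytest's yield-fixture teardown: code after the yield runs when the session ends, which is what wipes any blobs the samples wrote. A minimal standalone sketch of the same pattern, kept GCS-free so it runs anywhere:

from typing import Generator

import pytest


@pytest.fixture(scope="session")
def scratch_names() -> Generator[list, None, None]:
    created: list = []
    # Setup happens before the yield; the yielded value is what tests
    # receive as the fixture argument.
    yield created
    # Teardown happens after the yield, once per session, mirroring how
    # gcs_bucket deletes leftover blobs.
    for name in created:
        print(f"cleaning up {name}")


def test_uses_fixture(scratch_names: list) -> None:
    scratch_names.append("example")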

samples/snippets/multimodal_test.py

Lines changed: 2 additions & 2 deletions

@@ -13,9 +13,9 @@
 # limitations under the License.
 
 
-def test_multimodal_dataframe(gcs_dst_bucket: str) -> None:
+def test_multimodal_dataframe(gcs_bucket: str) -> None:
     # destination folder must be in a GCS bucket that the BQ connection service account (default or user provided) has write access to.
-    dst_bucket = gcs_dst_bucket
+    dst_bucket = f"gs://{gcs_bucket}"
     # [START bigquery_dataframes_multimodal_dataframe_create]
     import bigframes
 
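The fixture now yields a bare bucket name instead of a gs:// URI, so each consumer builds the form it needs. A small sketch of the two forms, assuming the google-cloud-storage client already imported in conftest.py:

from google.cloud import storage

bucket_name = "bigframes_blob_test_with_data_wipeout"

# String form, for BigFrames I/O paths:
dst_bucket = f"gs://{bucket_name}"

# Client form, for direct blob management such as the fixture's teardown:
client = storage.Client()
bucket = client.bucket(bucket_name)  # note: no gs:// prefix here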
samples/snippets/sessions_and_io_test.py

Lines changed: 11 additions & 1 deletion

@@ -13,10 +13,11 @@
 # limitations under the License.
 
 
-def test_sessions_and_io(project_id: str, dataset_id: str) -> None:
+def test_sessions_and_io(project_id: str, dataset_id: str, gcs_bucket: str) -> None:
     YOUR_PROJECT_ID = project_id
     YOUR_DATASET_ID = dataset_id
     YOUR_LOCATION = "us"
+    YOUR_BUCKET = gcs_bucket
 
     # [START bigquery_dataframes_create_and_use_session_instance]
     import bigframes
@@ -139,6 +140,15 @@ def test_sessions_and_io(project_id: str, dataset_id: str) -> None:
     # [END bigquery_dataframes_read_data_from_csv]
     assert df is not None
 
+    # [START bigquery_dataframes_write_data_to_csv]
+    import bigframes.pandas as bpd
+
+    df = bpd.DataFrame({"my_col": [1, 2, 3]})
+    # Write a dataframe to a CSV file in GCS
+    df.to_csv(f"gs://{YOUR_BUCKET}/myfile*.csv")
+    # [END bigquery_dataframes_write_data_to_csv]
+    assert df is not None
+
     # [START bigquery_dataframes_read_data_from_bigquery_table]
     import bigframes.pandas as bpd
 
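The * in the destination object name lets BigQuery shard the export into numbered files (myfile000000000000.csv and so on) when the frame is large. A hedged sketch of reading the shards back, assuming the same wildcard works on the read side with the BigQuery engine:

import bigframes.pandas as bpd

# Assumption: sharded CSVs written by to_csv("gs://.../myfile*.csv") can be
# loaded back with the same wildcard via a BigQuery load job.
df = bpd.read_csv("gs://your-bucket/myfile*.csv", engine="bigquery")
print(df.shape)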
tests/benchmark/read_gbq_colab/aggregate_output.py

Lines changed: 1 addition & 2 deletions

@@ -48,7 +48,7 @@ def aggregate_output(*, project_id, dataset_id, table_id):
 
 
 if __name__ == "__main__":
-    config = utils.get_configuration(include_table_id=True)
+    config = utils.get_configuration(include_table_id=True, start_session=False)
     current_path = pathlib.Path(__file__).absolute()
 
     utils.get_execution_time(
@@ -58,5 +58,4 @@ def aggregate_output(*, project_id, dataset_id, table_id):
         project_id=config.project_id,
         dataset_id=config.dataset_id,
         table_id=config.table_id,
-        session=config.session,
     )
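Dropping session=config.session works because of the api.py change above: the benchmark's queries log their metrics through the session-less dry-run path. As a hypothetical sketch of what a start_session flag might look like inside the helper (utils.get_configuration is the repo's own; this body is assumed, not its real implementation):

import dataclasses
from typing import Optional

import bigframes


@dataclasses.dataclass
class Config:
    project_id: str
    dataset_id: str
    table_id: Optional[str] = None
    session: Optional[bigframes.Session] = None


def get_configuration(*, include_table_id: bool = False, start_session: bool = True) -> Config:
    # Hypothetical: only pay Session startup cost when a benchmark needs it;
    # session-less runs still produce metrics via the dry-run code path.
    session = bigframes.connect() if start_session else None
    return Config(
        project_id="my-project",  # placeholders; the real helper reads benchmark config
        dataset_id="my_dataset",
        table_id="my_table" if include_table_id else None,
        session=session,
    )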
