19 | 19 | import pathlib |
20 | 20 | import subprocess |
21 | 21 | import sys |
| 22 | +import tempfile |
22 | 23 | from typing import Dict, List, Union |
23 | 24 |
24 | 25 | import numpy as np |
@@ -50,7 +51,7 @@ def run_benchmark_subprocess(args, log_env_name_var, filename=None, region=None) |
50 | 51 | subprocess.run(args, env=env, check=True) |
51 | 52 |
52 | 53 |
53 | | -def collect_benchmark_result(benchmark_path: str) -> pd.DataFrame: |
| 54 | +def collect_benchmark_result(benchmark_path: str, iterations: int) -> pd.DataFrame: |
54 | 55 | """Generate a DataFrame report on HTTP queries, bytes processed, slot time and execution time from log files.""" |
55 | 56 | path = pathlib.Path(benchmark_path) |
56 | 57 | try: |
@@ -100,28 +101,23 @@ def collect_benchmark_result(benchmark_path: str) -> pd.DataFrame: |
100 | 101 |
101 | 102 | with open(bytes_file, "r") as file: |
102 | 103 | lines = file.read().splitlines() |
103 | | - query_count = len(lines) |
104 | | - total_bytes = sum(int(line) for line in lines) |
| 104 | + query_count = len(lines) / iterations |
| 105 | + total_bytes = sum(int(line) for line in lines) / iterations |
105 | 106 |
106 | 107 | with open(millis_file, "r") as file: |
107 | 108 | lines = file.read().splitlines() |
108 | | - total_slot_millis = sum(int(line) for line in lines) |
| 109 | + total_slot_millis = sum(int(line) for line in lines) / iterations |
109 | 110 |
110 | 111 | if has_local_seconds: |
111 | | - # 'local_seconds' captures the total execution time for a benchmark as it |
112 | | - # starts timing immediately before the benchmark code begins and stops |
113 | | - # immediately after it ends. Unlike other metrics that might accumulate |
114 | | - # values proportional to the number of queries executed, 'local_seconds' is |
115 | | - # a singular measure of the time taken for the complete execution of the |
116 | | - # benchmark, from start to finish. |
117 | 112 | with open(local_seconds_file, "r") as file: |
118 | | - local_seconds = float(file.readline().strip()) |
| 113 | + lines = file.read().splitlines() |
| 114 | + local_seconds = sum(float(line) for line in lines) / iterations |
119 | 115 | else: |
120 | 116 | local_seconds = None |
121 | 117 |
122 | 118 | with open(bq_seconds_file, "r") as file: |
123 | 119 | lines = file.read().splitlines() |
124 | | - bq_seconds = sum(float(line) for line in lines) |
| 120 | + bq_seconds = sum(float(line) for line in lines) / iterations |
125 | 121 |
126 | 122 | results_dict[str(filename)] = [ |
127 | 123 | query_count, |
@@ -154,7 +150,12 @@ def collect_benchmark_result(benchmark_path: str) -> pd.DataFrame: |
154 | 150 | columns=columns, |
155 | 151 | ) |
156 | 152 |
157 | | - print("---BIGQUERY USAGE REPORT---") |
| 153 | + report_title = ( |
| 154 | + "---BIGQUERY USAGE REPORT---" |
| 155 | + if iterations == 1 |
| 156 | + else f"---BIGQUERY USAGE REPORT (Averages over {iterations} Iterations)---" |
| 157 | + ) |
| 158 | + print(report_title) |
158 | 159 | for index, row in benchmark_metrics.iterrows(): |
159 | 160 | formatted_local_exec_time = ( |
160 | 161 | f"{round(row['Local_Execution_Time_Sec'], 1)} seconds" |
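A minimal sketch of the per-iteration averaging that the new `iterations` parameter assumes: every iteration appends one line per query to the same log file, so dividing the accumulated totals by the iteration count yields per-iteration averages. The log values below are made up for illustration.

```python
# Illustration of the averaging performed in collect_benchmark_result.
# Two iterations of a benchmark that issues two queries leave four lines
# in the bytes-processed log; dividing by the iteration count recovers
# the per-iteration figures. (Values are hypothetical.)
lines = ["1000", "2500", "1000", "2500"]
iterations = 2

query_count = len(lines) / iterations                        # 2.0 queries
total_bytes = sum(int(line) for line in lines) / iterations  # 3500.0 bytes
print(query_count, total_bytes)
```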
@@ -259,32 +260,53 @@ def find_config(start_path): |
259 | 260 | return None |
260 | 261 |
261 | 262 |
262 | | -def run_benchmark_from_config(benchmark: str): |
| 263 | +def publish_to_bigquery(dataframe, notebook, project_name="bigframes-metrics"): |
| 264 | + bigquery_table = ( |
| 265 | + f"{project_name}.benchmark_report.notebook_benchmark" |
| 266 | + if notebook |
| 267 | + else f"{project_name}.benchmark_report.benchmark" |
| 268 | + ) |
| 269 | + |
| 270 | + repo_status = get_repository_status() |
| 271 | + for idx, col in enumerate(repo_status.keys()): |
| 272 | + dataframe.insert(idx, col, repo_status[col]) |
| 273 | + |
| 274 | + pandas_gbq.to_gbq( |
| 275 | + dataframe=dataframe, |
| 276 | + destination_table=bigquery_table, |
| 277 | + if_exists="append", |
| 278 | + ) |
| 279 | + print(f"Results have been successfully uploaded to {bigquery_table}.") |
| 280 | + |
| 281 | + |
| 282 | +def run_benchmark_from_config(benchmark: str, iterations: int): |
263 | 283 | print(benchmark) |
264 | 284 | config_path = find_config(benchmark) |
265 | 285 |
266 | 286 | if config_path: |
267 | 287 | benchmark_configs = [] |
268 | 288 | with open(config_path, "r") as f: |
269 | 289 | for line in f: |
270 | | - config = json.loads(line) |
271 | | - python_args = [f"--{key}={value}" for key, value in config.items()] |
272 | | - suffix = ( |
273 | | - config["benchmark_suffix"] |
274 | | - if "benchmark_suffix" in config |
275 | | - else "_".join(f"{key}_{value}" for key, value in config.items()) |
276 | | - ) |
277 | | - benchmark_configs.append((suffix, python_args)) |
| 290 | + if line.strip(): |
| 291 | + config = json.loads(line) |
| 292 | + python_args = [f"--{key}={value}" for key, value in config.items()] |
| 293 | + suffix = ( |
| 294 | + config["benchmark_suffix"] |
| 295 | + if "benchmark_suffix" in config |
| 296 | + else "_".join(f"{key}_{value}" for key, value in config.items()) |
| 297 | + ) |
| 298 | + benchmark_configs.append((suffix, python_args)) |
278 | 299 | else: |
279 | 300 | benchmark_configs = [(None, [])] |
280 | 301 |
281 | | - for benchmark_config in benchmark_configs: |
282 | | - args = ["python", str(benchmark)] |
283 | | - args.extend(benchmark_config[1]) |
284 | | - log_env_name_var = str(benchmark) |
285 | | - if benchmark_config[0] is not None: |
286 | | - log_env_name_var += f"_{benchmark_config[0]}" |
287 | | - run_benchmark_subprocess(args=args, log_env_name_var=log_env_name_var) |
| 302 | + for _ in range(iterations): |
| 303 | + for benchmark_config in benchmark_configs: |
| 304 | + args = ["python", str(benchmark)] |
| 305 | + args.extend(benchmark_config[1]) |
| 306 | + log_env_name_var = str(benchmark) |
| 307 | + if benchmark_config[0] is not None: |
| 308 | + log_env_name_var += f"_{benchmark_config[0]}" |
| 309 | + run_benchmark_subprocess(args=args, log_env_name_var=log_env_name_var) |
288 | 310 |
289 | 311 |
290 | 312 | def run_notebook_benchmark(benchmark_file: str, region: str): |
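For illustration, here is what the config parsing above derives from a single JSONL line. The keys and values are hypothetical, not taken from a real benchmark config.

```python
import json

# Hypothetical JSONL config line (one JSON object per non-blank line).
line = '{"table_size": "10GB", "ordered": true}'

config = json.loads(line)
python_args = [f"--{key}={value}" for key, value in config.items()]
suffix = (
    config["benchmark_suffix"]
    if "benchmark_suffix" in config
    else "_".join(f"{key}_{value}" for key, value in config.items())
)

print(python_args)  # ['--table_size=10GB', '--ordered=True']
print(suffix)       # table_size_10GB_ordered_True
```

Because the driver now repeats each configured benchmark `iterations` times, the per-run log files accumulate results from every pass, which is exactly what the averaging in `collect_benchmark_result` divides out.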
@@ -341,35 +363,59 @@ def parse_arguments(): |
341 | 363 | help="Set the benchmarks to be published to BigQuery.", |
342 | 364 | ) |
343 | 365 |
| 366 | + parser.add_argument( |
| 367 | + "--iterations", |
| 368 | + type=int, |
| 369 | + default=1, |
| 370 | + help="Number of iterations to run each benchmark.", |
| 371 | + ) |
| 372 | + parser.add_argument( |
| 373 | + "--output-csv", |
| 374 | + type=str, |
| 375 | + default=None, |
| 376 | +        help="Controls CSV output: pass 'True' to write results to an auto-generated temporary CSV file, pass a file path to write them to that location, or pass 'False' (or omit the flag) to skip CSV output.", |
| 377 | + ) |
| 378 | + |
344 | 379 | return parser.parse_args() |
345 | 380 |
346 | 381 |
347 | 382 | def main(): |
348 | 383 | args = parse_arguments() |
349 | 384 |
350 | 385 | if args.publish_benchmarks: |
351 | | - bigquery_table = ( |
352 | | - "bigframes-metrics.benchmark_report.notebook_benchmark" |
353 | | - if args.notebook |
354 | | - else "bigframes-metrics.benchmark_report.benchmark" |
| 386 | + benchmark_metrics = collect_benchmark_result( |
| 387 | + args.publish_benchmarks, args.iterations |
355 | 388 | ) |
356 | | - benchmark_metrics = collect_benchmark_result(args.publish_benchmarks) |
357 | | - |
358 | | - if os.getenv("BENCHMARK_AND_PUBLISH", "false") == "true": |
359 | | - repo_status = get_repository_status() |
360 | | - for idx, col in enumerate(repo_status.keys()): |
361 | | - benchmark_metrics.insert(idx, col, repo_status[col]) |
362 | | - |
363 | | - pandas_gbq.to_gbq( |
364 | | - dataframe=benchmark_metrics, |
365 | | - destination_table=bigquery_table, |
366 | | - if_exists="append", |
| 389 | + # Output results to CSV without specifying a location |
| 390 | + if args.output_csv == "True": |
| 391 | + current_time = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") |
| 392 | + temp_file = tempfile.NamedTemporaryFile( |
| 393 | + prefix=f"benchmark_{current_time}_", delete=False, suffix=".csv" |
367 | 394 | ) |
368 | | - print("Results have been successfully uploaded to BigQuery.") |
| 395 | + benchmark_metrics.to_csv(temp_file.name, index=False) |
| 396 | + print( |
| 397 | + f"Benchmark result is saved to a temporary location: {temp_file.name}" |
| 398 | + ) |
| 399 | + temp_file.close() |
| 400 | +        # Output results to CSV at a specified custom location |
| 401 | +        elif args.output_csv is not None and args.output_csv != "False": |
| 402 | + benchmark_metrics.to_csv(args.output_csv, index=False) |
| 403 | + print(f"Benchmark result is saved to: {args.output_csv}") |
| 404 | + |
| 405 | + # Publish the benchmark metrics to BigQuery under the 'bigframes-metrics' project. |
| 406 | + # The 'BENCHMARK_AND_PUBLISH' environment variable should be set to 'true' only |
| 407 | + # in specific Kokoro sessions. |
| 408 | + if os.getenv("BENCHMARK_AND_PUBLISH", "false") == "true": |
| 409 | + publish_to_bigquery(benchmark_metrics, args.notebook) |
| 410 | + # If the 'GCLOUD_BENCH_PUBLISH_PROJECT' environment variable is set, publish the |
| 411 | + # benchmark metrics to a specified BigQuery table in the provided project. This is |
| 412 | + # intended for local testing where the default behavior is not to publish results. |
| 413 | + elif project := os.getenv("GCLOUD_BENCH_PUBLISH_PROJECT", ""): |
| 414 | + publish_to_bigquery(benchmark_metrics, args.notebook, project) |
369 | 415 | elif args.notebook: |
370 | 416 | run_notebook_benchmark(args.benchmark_path, args.region) |
371 | 417 | else: |
372 | | - run_benchmark_from_config(args.benchmark_path) |
| 418 | + run_benchmark_from_config(args.benchmark_path, args.iterations) |
373 | 419 |
374 | 420 |
375 | 421 | if __name__ == "__main__": |
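To make the `--output-csv` handling in `main()` above easier to follow, here is a small sketch of how the three kinds of values ("True", "False" or unset, and a file path) are routed. The sample values are illustrative only.

```python
# Routing of --output-csv values as handled in main() (illustrative values only).
def describe_output_csv(value):
    if value == "True":
        return "write results to an auto-generated temporary CSV file"
    if value is None or value == "False":
        return "skip CSV output"
    return f"write results to the user-supplied path {value!r}"

for sample in (None, "False", "True", "benchmark_results.csv"):
    print(sample, "->", describe_output_csv(sample))
```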