Bench #1697

Commits (19)
5d58213
local files for benchmarking
chandra-siri Dec 3, 2025
c797586
Merge branch 'main' of github.com:googleapis/python-storage into bench
chandra-siri Dec 23, 2025
20d2d2d
add test_reads.py for microbenchmarking reads
chandra-siri Dec 27, 2025
f493bd8
push local files
chandra-siri Dec 27, 2025
68c8ba0
1p 1c working copy
chandra-siri Dec 28, 2025
9e2afa8
Add microbenchmarking tests and utility functions for performance ana…
chandra-siri Dec 28, 2025
3ffc98d
Update microbenchmark configuration and tests for improved performanc…
chandra-siri Dec 28, 2025
bef9dcb
upload local changes
chandra-siri Dec 29, 2025
75007a7
just upload one
chandra-siri Dec 30, 2025
a85fff1
Refactor get_persisted_size_async to improve logging and update get_p…
chandra-siri Dec 31, 2025
4c24f66
working copy
chandra-siri Jan 2, 2026
e216644
add regional tests
chandra-siri Jan 3, 2026
80120a1
Add JSON to CSV conversion script and update benchmark tests for mult…
chandra-siri Jan 3, 2026
99bc3eb
Refactor benchmark configuration and cleanup unused code in test_read…
chandra-siri Jan 3, 2026
f4a622b
Merge branch 'main' of github.com:googleapis/python-storage into bench
chandra-siri Jan 3, 2026
af98e0e
Implement write benchmarks
chandra-siri Jan 3, 2026
1405e92
Merge branch 'main' of github.com:googleapis/python-storage into bench
chandra-siri Jan 4, 2026
3c7e7af
Merge branch 'bench' of github.com:googleapis/python-storage into bench
chandra-siri Jan 4, 2026
970b162
working copy
chandra-siri Jan 8, 2026
88 changes: 88 additions & 0 deletions benchmarks/async_tasks_downloade_mp.py
@@ -0,0 +1,88 @@
import asyncio
import time
from io import BytesIO
from multiprocessing import Pool


from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import (
AsyncMultiRangeDownloader,
)

BUCKET_NAME = "chandrasiri-rs"
OBJECT_SIZE = 100 * 1024 * 1024


async def download_object_async(bucket_name, object_name, client=None):
"""Downloads a single object."""
if client is None:
client = AsyncGrpcClient().grpc_client

mrd = AsyncMultiRangeDownloader(client, bucket_name, object_name)
await mrd.open()
buffer = BytesIO()
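    # A read range of (offset=0, length=0) appears to request the entire
    # object; the size assertion below relies on that behavior.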
await mrd.download_ranges(read_ranges=[(0, 0, buffer)])
await mrd.close()

assert buffer.getbuffer().nbytes == OBJECT_SIZE

# Save the downloaded object to a local file
# with open(object_name, "wb") as f:
# f.write(buffer.getvalue())

print(f"Finished downloading {object_name}")


async def download_objects_pool(start_obj_num, end_obj_num):
""" """
print(f"starting for {start_obj_num}, {end_obj_num}")

client = AsyncGrpcClient().grpc_client
tasks = []
pool_start_time = time.monotonic_ns()
for obj_num in range(start_obj_num, end_obj_num):
tasks.append(
asyncio.create_task(
download_object_async(BUCKET_NAME, f"para_64-{obj_num}", client=client)
)
)

await asyncio.gather(*tasks)
pool_end_time = time.monotonic_ns()
    print(
        f"for {start_obj_num}, {end_obj_num}: {end_obj_num - start_obj_num} tasks done in {(pool_end_time - pool_start_time) / (10**9)}s"
    )


def async_runner(start_obj_num, end_obj_num):
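    # Each worker process runs its own event loop and gRPC client, so the
    # multiprocessing variant spreads protobuf parsing across interpreters
    # rather than serializing it behind a single process's GIL.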
asyncio.run(download_objects_pool(start_obj_num, end_obj_num))


def main():
    num_objects = 3000
    process_count = 60
    objects_per_process = num_objects // process_count  # 50
args = []
start_obj_num = 0
for _ in range(process_count):
args.append((start_obj_num, start_obj_num + objects_per_process))
start_obj_num += objects_per_process
# print(f"start {process_count} proc")
start_time_proc = time.monotonic_ns()
print(args, len(args))

with Pool(processes=process_count) as pool:
results = pool.starmap(async_runner, args)
end_time_proc = time.monotonic_ns()

    print(
        f"TOTAL: bytes - {num_objects*OBJECT_SIZE}, time: {(end_time_proc - start_time_proc) / (10**9)}s"
    )
    print(
        f"Throughput: {num_objects*OBJECT_SIZE / ((end_time_proc - start_time_proc) / (10**9)) * 10**-6} MBps"
    )


if __name__ == "__main__":
main()
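Usage sketch, assuming the bucket and the 3000 para_64-* objects (100 MiB each) already exist:

    python benchmarks/async_tasks_downloade_mp.py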
70 changes: 70 additions & 0 deletions benchmarks/async_tasks_downloader.py
@@ -0,0 +1,70 @@
import asyncio
import time
from io import BytesIO

from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import (
AsyncMultiRangeDownloader,
)

BUCKET_NAME = "chandrasiri-rs"
OBJECT_SIZE = 100 * 1024 * 1024


async def download_object_async(bucket_name, object_name, client=None):
"""Downloads a single object."""
if client is None:
client = AsyncGrpcClient().grpc_client

mrd = AsyncMultiRangeDownloader(client, bucket_name, object_name)
await mrd.open()
buffer = BytesIO()
await mrd.download_ranges(read_ranges=[(0, 0, buffer)])
await mrd.close()

assert buffer.getbuffer().nbytes == OBJECT_SIZE

# Save the downloaded object to a local file
# with open(object_name, "wb") as f:
# f.write(buffer.getvalue())

# print(f"Finished downloading {object_name}")


async def download_objects_pool(start_obj_num, end_obj_num):
""" """

client = AsyncGrpcClient().grpc_client
tasks = []
pool_start_time = time.monotonic_ns()
for obj_num in range(start_obj_num, end_obj_num):
tasks.append(
asyncio.create_task(
download_object_async(BUCKET_NAME, f"para_64-{obj_num}", client=client)
)
)

await asyncio.gather(*tasks)
pool_end_time = time.monotonic_ns()
    print(
        f"{end_obj_num - start_obj_num} tasks done in {(pool_end_time - pool_start_time) / (10**9)}s"
    )


async def main():
"""Main function to orchestrate parallel downloads using threads."""
num_objects = 1000
pool_size = 100
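    # Batches run back-to-back: each pool of `pool_size` tasks must finish
    # before the next begins, so concurrency is capped at 100 streams.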
start_time = time.monotonic_ns()

for i in range(0, num_objects, pool_size):
await download_objects_pool(i, i + pool_size)
end_time = time.monotonic_ns()
    print(
        f"FINISHED: total bytes downloaded - {num_objects*OBJECT_SIZE} in time {(end_time - start_time) / (10**9)}s"
    )


if __name__ == "__main__":
asyncio.run(main())
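Usage sketch, assuming the same para_64-* objects are present:

    python benchmarks/async_tasks_downloader.py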
181 changes: 181 additions & 0 deletions benchmarks/download_one_object_using_n_streams.py
@@ -0,0 +1,181 @@
import argparse
import asyncio
from io import BytesIO
import os
import time
import threading

from google.cloud.storage._experimental.asyncio.async_grpc_client import AsyncGrpcClient
from google.cloud.storage._experimental.asyncio.async_multi_range_downloader import (
AsyncMultiRangeDownloader,
)


async def download_range_async(
client, bucket_name, object_name, start_byte, end_byte, chunk_size
):
"""
Downloads a specific byte range of an object.
This is a modified version of the original download_one_async, adapted to
download a portion of an object.
"""
download_size = end_byte - start_byte
print(
f"Downloading {object_name} from byte {start_byte} to {end_byte} (size {download_size}) in chunks of {chunk_size} from {bucket_name} from process {os.getpid()} and thread {threading.get_ident()}"
)

mrd = AsyncMultiRangeDownloader(client, bucket_name, object_name)
await mrd.open()

offset = 0
output_buffer = BytesIO()

start_time = time.perf_counter()
while offset < download_size:
bytes_to_download = min(chunk_size, download_size - offset)
await mrd.download_ranges(
[(start_byte + offset, bytes_to_download, output_buffer)]
)
offset += bytes_to_download
end_time = time.perf_counter()

elapsed_time = end_time - start_time
throughput_mbs = (
(download_size / elapsed_time) / (1000 * 1000) if elapsed_time > 0 else 0
)
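    # Throughput is reported in decimal MB/s (10**6 bytes); aggregate sizes
    # later in this script are shown in binary GiB (1024**3 bytes).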

print(f"Time taken for download loop: {elapsed_time:.4f} seconds")
print(f"Throughput for this range: {throughput_mbs:.2f} MB/s")

assert (
output_buffer.getbuffer().nbytes == download_size
), f"downloaded size incorrect for portion of {object_name}"

await mrd.close()
return output_buffer


async def download_one_object_with_n_streams_async(
bucket_name, object_name, download_size, chunk_size, num_streams
):
"""
Downloads a single object using 'n' concurrent streams.
It divides the object into 'n' parts and creates an async task to download each part.
"""
print(
f"Downloading {object_name} of size {download_size} from {bucket_name} using {num_streams} streams."
)

# Create one client to be shared by all download tasks.
client = AsyncGrpcClient().grpc_client

tasks = []

# Calculate the byte range for each stream.
portion_size = download_size // num_streams

for i in range(num_streams):
start = i * portion_size
end = start + portion_size
if i == num_streams - 1:
# The last stream downloads any remaining bytes.
end = download_size

task = asyncio.create_task(
download_range_async(
client, bucket_name, object_name, start, end, chunk_size
)
)
tasks.append(task)

# Wait for all download tasks to complete.
downloaded_parts = await asyncio.gather(*tasks)
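    # asyncio.gather returns results in the order the tasks were passed in,
    # so downloaded_parts[i] holds the bytes for stream i's range.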

# Stitch the downloaded parts together in the correct order.
final_buffer = BytesIO()
for part in downloaded_parts:
final_buffer.write(part.getbuffer())

# Verify the final size.
final_size = final_buffer.getbuffer().nbytes
assert (
final_size == download_size
), f"Downloaded size incorrect for {object_name}. Expected {download_size}, got {final_size}"
print(f"Successfully downloaded {object_name} with size {final_size}")


def main():
parser = argparse.ArgumentParser(
description="Download a single GCS object using multiple concurrent streams."
)
parser.add_argument("--bucket_name", type=str, default="chandrasiri-rs")
parser.add_argument(
"--download_size", type=int, default=1024 * 1024 * 1024
) # 1 GiB
parser.add_argument(
"--chunk_size", type=int, default=64 * 1024 * 1024
) # 64 MiB
parser.add_argument(
"--count",
type=int,
default=1,
help="Number of times to run the download (for benchmarking).",
)
parser.add_argument(
"--start_object_num",
type=int,
default=0,
help="The number of the object to download (e.g., py-sdk-mb-mt-{start_object_num}).",
)
parser.add_argument(
"-n",
"--num_workers",
type=int,
default=10,
help="Number of streams to use for downloading.",
)
args = parser.parse_args()

total_start_time = time.perf_counter()

object_name = f"py-sdk-mb-mt-{args.start_object_num}"

for i in range(args.count):
print(f"\n--- Starting download run {i+1}/{args.count} ---")
run_start_time = time.perf_counter()

asyncio.run(
download_one_object_with_n_streams_async(
args.bucket_name,
object_name,
args.download_size,
args.chunk_size,
args.num_workers,
)
)

run_end_time = time.perf_counter()
run_latency = run_end_time - run_start_time
run_throughput = (args.download_size / run_latency) / (1000 * 1000)
print(f"Run {i+1} throughput: {run_throughput:.2f} MB/s")

total_end_time = time.perf_counter()
total_latency = total_end_time - total_start_time
total_downloaded_bytes = args.download_size * args.count
aggregate_throughput = (total_downloaded_bytes / total_latency) / (
1000 * 1000
) # MB/s

print("\n--- Aggregate Results ---")
print(f"Total download runs: {args.count}")
print(f"Object name: {object_name}")
print(
f"Total data downloaded: {total_downloaded_bytes / (1024*1024*1024):.2f} GiB"
)
print(f"Total time taken: {total_latency:.2f} seconds")
print(f"Aggregate throughput: {aggregate_throughput:.2f} MB/s")
print(f"Number of streams used per download: {args.num_workers}")


if __name__ == "__main__":
main()
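Usage sketch, assuming an object named py-sdk-mb-mt-0 of at least --download_size bytes exists in the bucket:

    python benchmarks/download_one_object_using_n_streams.py --bucket_name chandrasiri-rs --download_size 1073741824 --chunk_size 67108864 -n 16 --count 3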