From 2f370160b58698a28de11f556e6d1552258f80e3 Mon Sep 17 00:00:00 2001 From: v-pratap Date: Wed, 29 Oct 2025 05:24:17 +0000 Subject: [PATCH] ListObject benchmarks --- .bazelignore | 1 + benchmark_results.csv | 10 + .../storage_throughput_vs_cpu_benchmark.cc | 725 +++++++++++------- .../storage/examples/storage_quickstart.cc | 9 +- run_all_benchmarks.sh | 119 +++ 5 files changed, 597 insertions(+), 267 deletions(-) create mode 100644 benchmark_results.csv create mode 100755 run_all_benchmarks.sh diff --git a/.bazelignore b/.bazelignore index 98112232070b2..306efe5418391 100644 --- a/.bazelignore +++ b/.bazelignore @@ -5,3 +5,4 @@ cmake-build-debug/ cmake-build-coverage/ cmake-build-release/ .build/ +_build/ \ No newline at end of file diff --git a/benchmark_results.csv b/benchmark_results.csv new file mode 100644 index 0000000000000..7bb9249283963 --- /dev/null +++ b/benchmark_results.csv @@ -0,0 +1,10 @@ +Transport,Strategy,TotalObjects,PageSize,LatencySeconds +Grpc,start-offset,500000000,5000,800.217 +Grpc,start-offset,500000000,5000,687.531 +Grpc,start-offset,500000000,5000,692.278 +DirectPath,start-offset,500000000,5000,799.6 +DirectPath,start-offset,500000000,5000,687.638 +DirectPath,start-offset,500000000,5000,689.509 +Json,start-offset,500000000,5000,1992.25 +Json,start-offset,500000000,5000,1998.25 +Json,start-offset,500000000,5000,1961.23 diff --git a/google/cloud/storage/benchmarks/storage_throughput_vs_cpu_benchmark.cc b/google/cloud/storage/benchmarks/storage_throughput_vs_cpu_benchmark.cc index dda095d1f1c05..beab664936128 100644 --- a/google/cloud/storage/benchmarks/storage_throughput_vs_cpu_benchmark.cc +++ b/google/cloud/storage/benchmarks/storage_throughput_vs_cpu_benchmark.cc @@ -12,9 +12,13 @@ // See the License for the specific language governing permissions and // limitations under the License. +// MODIFIED: This benchmark has been refactored to test ListObjects pagination +// strategies (pageToken vs. startOffset) instead of upload/download throughput. + #include "google/cloud/storage/benchmarks/benchmark_utils.h" #include "google/cloud/storage/benchmarks/throughput_experiment.h" #include "google/cloud/storage/benchmarks/throughput_options.h" +#include "google/cloud/storage/object_metadata.h" #include "google/cloud/storage/benchmarks/throughput_result.h" #include "google/cloud/storage/client.h" #include "google/cloud/storage/grpc_plugin.h" @@ -46,71 +50,34 @@ using gcs_bm::ExperimentLibrary; using gcs_bm::ExperimentTransport; using gcs_bm::ThroughputOptions; +// MODIFIED: This description is updated for the new benchmark char const kDescription[] = R"""( -A throughput vs. CPU benchmark for the Google Cloud Storage C++ client library. - -This program measures the throughput and CPU utilization when uploading -and downloading objects using the Google Cloud Storage (GCS) C++ client library. -The program repeats the "experiment" of uploading, then downloading, and then -removing an object many times, using a randomly selected size in each iteration. -An external script presents these results as a series of plots. - -The program first creates a GCS bucket that will contain all the objects used -by that run of the program. The name of this bucket is selected at random, so -multiple copies of the program can run simultaneously. The bucket is deleted at -the end of the run of this program. The bucket uses the `STANDARD` storage -class, in a region set via the command line. 
Choosing regions close to where the -program is running can be used to estimate the latency without any wide-area -network effects. Choosing regions far from where the program is running can be -used to evaluate the performance of the library when the wide-area network is -taken into account. - -After creating this bucket the program creates a number of threads, configurable -via the command line, to obtain more samples in parallel. Configure this value -with a small enough number of threads such that you do not saturate the CPU. - -Each thread creates C++ objects to perform the "upload experiments". Each one -of these objects represents the "api" used to perform the upload, that is JSON -and/or gRPC (though technically gRPC is just another protocol for the JSON -API). Likewise, the thread creates a number of "download experiments", also -based on the APIs configured via the command-line. - -Then the thread repeats the following steps (see below for the conditions to -stop the loop): - -- Select a random size, between two values configured in the command line of the - object to upload. -- The application buffer sizes for `read()` and `write()` calls are also - selected at random. These sizes are quantized, and the quantum can be - configured in the command-line. -- Select a random uploader, that is, which API will be used to upload the - object. -- Select a random downloader, that is, which API will be used to download the - object. -- Select, at random, if the client library will perform CRC32C and/or MD5 hashes - on the uploaded and downloaded data. -- Upload an object of the selected size, choosing the name of the object at - random. -- Once the object is fully uploaded, the program captures the object size, the - write buffer size, the elapsed time (in microseconds), the CPU time - (in microseconds) used during the upload, and the status code for the upload. -- Then the program downloads the same object (3 times), and captures the object - size, the read buffer size, the elapsed time (in microseconds), the CPU time - (in microseconds) used during the download, and the status code for the - download. -- The program then deletes this object and starts another iteration. - -The loop stops when any of the following conditions are met: - -- The test has obtained more than a prescribed "maximum number of samples" -- The test has obtained at least a prescribed "minimum number of samples" *and* - the test has been running for more than a prescribed "duration". - -Once the threads finish running their loops the program prints the captured -performance data. The bucket is deleted after the program terminates. - -A helper script in this directory can generate pretty graphs from the output of -this program. +A benchmark for GCS ListObjects pagination strategies. + +This program measures the performance (wall time) of listing objects in a +bucket prefix using two different pagination strategies: + +1. `pageToken`: The efficient, stateful pagination method using the + `nextPageToken` provided by the API. +2. `startOffset`: The inefficient, stateless method that simulates pagination + by restarting the list from a given object name (`startOffset`). + +The program runs one "experiment" (a full scan of a prefix up to a +maximum number of pages) multiple times. Each *individual page fetch* +is timed and reported as a separate sample in the output CSV. + +This allows you to plot "page number" vs. 
"latency" to +compare the performance degradation of `startOffset` against the +constant-time performance of `pageToken`. + +Command-line arguments have been re-purposed: + --minimum-object-size = The number of items to fetch per page (e.g., 1000). + --maximum-object-size = The maximum number of pages to fetch in one run. + +New arguments have been added: + --prefix=... (Required) The object prefix to list (e.g., "my-prefix/"). + --strategy=... (Required) The pagination strategy: + "page-token" or "start-offset". )"""; using ResultHandler = @@ -118,22 +85,186 @@ using ResultHandler = gcs_bm::ClientProvider MakeProvider(ThroughputOptions const& options); +// MODIFIED: Added prefix, strategy, and print_mutex void RunThread(ThroughputOptions const& ThroughputOptions, int thread_id, ResultHandler const& handler, - gcs_bm::ClientProvider const& provider); + gcs_bm::ClientProvider const& provider, + std::string const& prefix, std::string const& strategy, + std::mutex& print_mutex); + +// MODIFIED: Simple struct to hold our new list options +struct ListOptions { + std::string prefix; + std::string strategy; +}; + +// MODIFIED: This function extracts our new custom arguments +ListOptions ParseListOptions(int& argc, char* argv[]) { + ListOptions list_options; + std::vector remaining_args; + remaining_args.push_back(argv[0]); + for (int i = 1; i < argc; ++i) { + std::string arg = argv[i]; + if (arg.rfind("--prefix=", 0) == 0) { + list_options.prefix = arg.substr(sizeof("--prefix=") - 1); + } else if (arg.rfind("--strategy=", 0) == 0) { + list_options.strategy = arg.substr(sizeof("--strategy=") - 1); + } else { + remaining_args.push_back(argv[i]); + } + } + // Update argc and argv to remove the arguments we just parsed + argc = static_cast(remaining_args.size()); + for (std::size_t i = 0; i < remaining_args.size(); ++i) { + argv[i] = remaining_args[i]; + } + return list_options; +} -google::cloud::StatusOr ParseArgs(int argc, char* argv[]); +// Parses a new --create-objects=N argument +std::pair ParseCreateOptions(int& argc, char* argv[]) { + // int count = 500000; + // std::string prefix ="gcs-cpp-benchmark-prefix-1/"; + int count = 0; + std::string prefix =""; + return {count, prefix}; +} + +// The worker function for a single thread +void UploadWorker(gcs::Client client, std::string bucket_name, + std::string prefix, int start_index, int end_index, + int thread_id) { + for (int i = start_index; i < end_index; ++i) { + std::ostringstream name_stream; + name_stream << prefix << "object-" << std::setw(10) << std::setfill('0') + << i; + std::string object_name = name_stream.str(); + std::string content = "This is test object " + object_name; + + auto status = + client.InsertObject(bucket_name, object_name, std::move(content), + gcs::IfGenerationMatch(0)); + + if (!status && status.status().code() != google::cloud::StatusCode::kFailedPrecondition) { + std::cerr << "[Thread " << thread_id << "] Failed to upload " + << object_name << ": " << status.status() << "\n"; + } + + if (i % 1000 == 0 && i != start_index) { + std::cout << "[Thread " << thread_id << "] ... 
uploaded " << object_name + << "\n"; + } + } + std::cout << "[Thread " << thread_id << "] finished batch " << start_index + << " - " << end_index << "\n"; +} + +// Manages parallel upload of N objects +void CreateObjects(gcs_bm::ClientProvider const& provider, + std::string bucket_name, std::string prefix, + int object_count, int thread_count) { + auto transport = gcs_bm::ExperimentTransport::kGrpc; + std::vector> tasks; + int batch_size = (object_count + thread_count - 1) / thread_count; + for (int i = 0; i < thread_count; ++i) { + int start_index = i * batch_size; + int end_index = (std::min)((i + 1) * batch_size, object_count); + + if (start_index >= end_index) continue; + + tasks.emplace_back(std::async(std::launch::async, UploadWorker, + provider(transport), bucket_name, prefix, + start_index, end_index, i)); + } + for (auto& f : tasks) { + f.get(); + } +} + +// THIS IS THE NEW, CORRECT VERSION +google::cloud::StatusOr SelfTest( + char const* argv0, ListOptions& list_options); + +google::cloud::StatusOr ParseArgs(int argc, char* argv[], + ListOptions& list_options) { + list_options = ParseListOptions(argc, argv); + + bool auto_run = + google::cloud::internal::GetEnv("GOOGLE_CLOUD_CPP_AUTO_RUN_EXAMPLES") + .value_or("") == "yes"; + + // FIX: If auto_run is set OR no arguments are provided (argc == 1), + // run the SelfTest to get default values. + if (auto_run || argc == 1) return SelfTest(argv[0], list_options); + + auto options = + gcs_bm::ParseThroughputOptions({argv, argv + argc}, kDescription); + if (!options) return options; + + // MODIFIED: Validate our custom arguments + if (list_options.prefix.empty()) { + return google::cloud::Status( + google::cloud::StatusCode::kInvalidArgument, + "Missing required argument: --prefix="); + } + if (list_options.strategy != "page-token" && + list_options.strategy != "start-offset") { + return google::cloud::Status( + google::cloud::StatusCode::kInvalidArgument, + "Invalid argument: --strategy must be 'page-token' or 'start-offset'"); + } + + options->labels = gcs_bm::AddDefaultLabels(std::move(options->labels)); + options->bucket = "vaibhav-test-001"; + return options; +} } // namespace int main(int argc, char* argv[]) { - google::cloud::StatusOr options = ParseArgs(argc, argv); + // FIX: This line calls ParseCreateOptions, which fixes the warning + int create_count = 0; + std::string create_prefix; + std::tie(create_count, create_prefix) = ParseCreateOptions(argc, argv); + + // Parse the rest of the arguments + ListOptions list_options; + google::cloud::StatusOr options = + ParseArgs(argc, argv, list_options); if (!options) { std::cerr << options.status() << "\n"; return 1; } if (options->exit_after_parse) return 0; + options->bucket = "vaibhav-test-001"; + + // --- THIS IS THE "EITHER/OR" LOGIC --- + if (create_count > 0) { + // MODE 1: Create objects and exit + if (create_prefix.empty()) { + create_prefix = list_options.prefix; + } + if (create_prefix.empty()) { + std::cerr << "Error: --prefix is required when using --create-objects\n"; + return 1; + } + std::cout << "Starting object creation: " << create_count + << " objects with prefix " << create_prefix << " in bucket " + << options->bucket << " using " << options->thread_count + << " threads...\n"; + auto provider = MakeProvider(*options); + + // FIX: This line calls CreateObjects, which fixes the warning + CreateObjects(provider, options->bucket, create_prefix, create_count, + options->thread_count); + std::cout << "Object creation complete.\n"; + + // Exit successfully after creating 
+// Forward declaration; SelfTest also fills in default ListOptions.
+google::cloud::StatusOr<ThroughputOptions> SelfTest(
+    char const* argv0, ListOptions& list_options);
+
+google::cloud::StatusOr<ThroughputOptions> ParseArgs(
+    int argc, char* argv[], ListOptions& list_options) {
+  list_options = ParseListOptions(argc, argv);
+
+  bool auto_run =
+      google::cloud::internal::GetEnv("GOOGLE_CLOUD_CPP_AUTO_RUN_EXAMPLES")
+          .value_or("") == "yes";
+
+  // FIX: If auto_run is set OR no arguments are provided (argc == 1),
+  // run the SelfTest to get default values.
+  if (auto_run || argc == 1) return SelfTest(argv[0], list_options);
+
+  auto options =
+      gcs_bm::ParseThroughputOptions({argv, argv + argc}, kDescription);
+  if (!options) return options;
+
+  // MODIFIED: Validate our custom arguments
+  if (list_options.prefix.empty()) {
+    return google::cloud::Status(
+        google::cloud::StatusCode::kInvalidArgument,
+        "Missing required argument: --prefix=");
+  }
+  if (list_options.strategy != "page-token" &&
+      list_options.strategy != "start-offset") {
+    return google::cloud::Status(
+        google::cloud::StatusCode::kInvalidArgument,
+        "Invalid argument: --strategy must be 'page-token' or 'start-offset'");
+  }
+
+  options->labels = gcs_bm::AddDefaultLabels(std::move(options->labels));
+  options->bucket = "vaibhav-test-001";
+  return options;
+}
 
 }  // namespace
 
 int main(int argc, char* argv[]) {
-  google::cloud::StatusOr<ThroughputOptions> options = ParseArgs(argc, argv);
+  // FIX: Check for the object-creation mode first (this also keeps
+  // ParseCreateOptions referenced, silencing the unused-function warning).
+  int create_count = 0;
+  std::string create_prefix;
+  std::tie(create_count, create_prefix) = ParseCreateOptions(argc, argv);
+
+  // Parse the rest of the arguments
+  ListOptions list_options;
+  google::cloud::StatusOr<ThroughputOptions> options =
+      ParseArgs(argc, argv, list_options);
   if (!options) {
     std::cerr << options.status() << "\n";
     return 1;
   }
   if (options->exit_after_parse) return 0;
 
+  options->bucket = "vaibhav-test-001";
+
+  // --- THIS IS THE "EITHER/OR" LOGIC ---
+  if (create_count > 0) {
+    // MODE 1: Create objects and exit
+    if (create_prefix.empty()) {
+      create_prefix = list_options.prefix;
+    }
+    if (create_prefix.empty()) {
+      std::cerr << "Error: --prefix is required when using --create-objects\n";
+      return 1;
+    }
+    std::cout << "Starting object creation: " << create_count
+              << " objects with prefix " << create_prefix << " in bucket "
+              << options->bucket << " using " << options->thread_count
+              << " threads...\n";
+    auto provider = MakeProvider(*options);
+
+    CreateObjects(provider, options->bucket, create_prefix, create_count,
+                  options->thread_count);
+    std::cout << "Object creation complete.\n";
+
+    // Exit successfully after creating objects
+    return 0;
+  }
+  // --- END OF "EITHER/OR" LOGIC ---
 
+  // MODE 2: Run the benchmark (this code is skipped if create_count > 0)
   std::string notes = google::cloud::storage::version_string() + ";" +
                       google::cloud::internal::compiler() + ";" +
                       google::cloud::internal::compiler_flags();
@@ -152,98 +283,82 @@ int main(int argc, char* argv[]) {
     }
   };
 
-  auto output_size_range = [](std::string const& name, auto minimum,
-                              auto maximum) {
-    std::cout << "\n# " << name << " Range: [" << minimum << ',' << maximum
-              << ']';
-  };
-
-  auto output_quantized_range = [](std::string const& name, auto minimum,
-                                   auto maximum, auto quantum) {
-    std::cout << "\n# " << name << " Range: [" << minimum << ',' << maximum
-              << "]\n# " << name << " Quantum: " << quantum;
-  };
-  auto output_optional_quantized_range =
-      [](std::string const& name, auto minimum, auto maximum, auto quantum) {
-        if (!minimum.has_value() || !maximum.has_value()) {
-          std::cout << "\n# " << name << " Range: [not set]";
-        } else {
-          std::cout << "\n# " << name << " Range: [" << *minimum << ','
-                    << *maximum << "]";
-        }
-        std::cout << "\n# " << name << " Quantum: " << quantum;
-      };
-
   std::cout << "# Start time: " << gcs_bm::CurrentTime()          //
             << "\n# Labels: " << options->labels                  //
             << "\n# Running test on bucket: " << options->bucket  //
+            << "\n# Strategy: " << list_options.strategy          //
+            << "\n# Prefix: " << list_options.prefix              //
+            << "\n# Page Size (from --minimum-object-size): "     //
+            << options->minimum_object_size
+            << "\n# Max Pages (from --maximum-object-size): "     //
+            << options->maximum_object_size
             << "\n# Duration: "
            << absl::FormatDuration(absl::FromChrono(options->duration))
             << "\n# Thread Count: " << options->thread_count
             << "\n# Client Per Thread: " << options->client_per_thread;
-  output_size_range("Object Size", options->minimum_object_size,
-                    options->maximum_object_size);
-  output_quantized_range(
-      "Write Buffer Size", options->minimum_write_buffer_size,
-      options->maximum_write_buffer_size, options->write_buffer_quantum);
-  output_quantized_range("Read Buffer Size", options->minimum_read_buffer_size,
-                         options->maximum_read_buffer_size,
-                         options->read_buffer_quantum);
-
-  std::cout << "\n# Minimum Sample Count: " << options->minimum_sample_count
-            << "\n# Maximum Sample Count: " << options->maximum_sample_count
-            << "\n# Enabled Libs: "
+  std::cout << "\n# Enabled Libs: "
             << absl::StrJoin(options->libs, ",", Formatter{})
             << "\n# Enabled Transports: "
             << absl::StrJoin(options->transports, ",", Formatter{})
-            << "\n# Enabled CRC32C: "
-            << absl::StrJoin(options->enabled_crc32c, ",", Formatter{})
-            << "\n# Enabled MD5: "
-            << absl::StrJoin(options->enabled_md5, ",", Formatter{})
-            << "\n# Minimum Sample Delay: "
-            << absl::FormatDuration(
-                   absl::FromChrono(options->minimum_sample_delay));
+            << "\n# Minimum Sample Count: " << options->minimum_sample_count
+            << "\n# Maximum Sample Count: " << options->maximum_sample_count;
 
   gcs_bm::PrintOptions(std::cout, "Common", options->client_options);
-  gcs_bm::PrintOptions(std::cout, "Rest", options->rest_options);
+  gcs_bm::PrintOptions(std::cout, "Json", options->rest_options);
   gcs_bm::PrintOptions(std::cout, "Grpc", options->grpc_options);
   gcs_bm::PrintOptions(std::cout, "Direct Path", options->direct_path_options);
-  output_optional_quantized_range("Read Offset", options->minimum_read_offset,
-                                  options->maximum_read_offset,
-                                  options->read_offset_quantum);
-  output_optional_quantized_range("Read Size", options->minimum_read_size,
-                                  options->maximum_read_size,
-                                  options->read_size_quantum);
   std::cout << "\n# Build info: " << notes << "\n";
-  // Make the output generated so far immediately visible, helps with debugging.
+
+  // --- MODIFICATION ---
+  // Print a header for our clean, machine-readable data
+  std::cout << "# New CSV Data Header:\n"
+            << "DATA_ROW_HEADER,Transport,Strategy,TotalObjects,PageSize,"
+               "LatencySeconds\n";
+  // --- END MODIFICATION ---
+
   std::cout << std::flush;
 
-  // Serialize output to `std::cout`.
+  std::chrono::microseconds total_latency{0};
+
   std::mutex mu;
-  auto handler = [&mu](ThroughputOptions const& options,
+  auto handler = [&mu, &total_latency](ThroughputOptions const& options,
                        gcs_bm::ThroughputResult const& result) {
     std::lock_guard<std::mutex> lk(mu);
-    gcs_bm::PrintAsCsv(std::cout, options, result);
+
+    // Accumulate the latency of every page fetch
+    total_latency += result.elapsed_time;
+
+    // Per-page CSV rows are intentionally suppressed; only the DATA_ROW
+    // summaries printed by RunThread are captured.
+    // gcs_bm::PrintAsCsv(std::cout, options, result);
+
    if (!result.status.ok()) {
       google::cloud::LogSink::Instance().Flush();
     }
   };
 
   auto provider = MakeProvider(*options);
+  // Add a new mutex for printing sample summaries
+  std::mutex print_mu;
+
   gcs_bm::PrintThroughputResultHeader(std::cout);
   std::vector<std::future<void>> tasks;
   for (int i = 0; i != options->thread_count; ++i) {
+    // Pass the new print_mu by reference to each thread
     tasks.emplace_back(std::async(std::launch::async, RunThread, *options, i,
-                                  handler, provider));
+                                  handler, provider, list_options.prefix,
+                                  list_options.strategy, std::ref(print_mu)));
   }
   for (auto& f : tasks) f.get();
 
+  auto const total_seconds =
+      std::chrono::duration_cast<std::chrono::duration<double>>(total_latency);
+  std::cout << "#\n# Total Latency (sum of all page fetches): "
+            << total_seconds.count() << " s\n";
   std::cout << "# DONE\n" << std::flush;
   return 0;
 }
 
 namespace {
@@ -270,16 +385,19 @@ gcs_bm::ClientProvider BaseProvider(ThroughputOptions const& options) {
   if (t == ExperimentTransport::kDirectPath) {
     opts = google::cloud::internal::MergeOptions(options.direct_path_options,
                                                  std::move(opts));
+    // std::cout << "direct path\n";  // Quieting this for clean output
     return gcs::MakeGrpcClient(std::move(opts));
   }
   if (t == ExperimentTransport::kGrpc) {
     opts = google::cloud::internal::MergeOptions(options.grpc_options,
                                                  std::move(opts));
+    // std::cout << "gRPC path\n";  // Quieting this for clean output
     return gcs::MakeGrpcClient(std::move(opts));
   }
 #else
   (void)t;  // disable unused parameter warning
 #endif  // GOOGLE_CLOUD_CPP_STORAGE_HAVE_GRPC
+  // std::cout << "json path\n";  // Quieting this for clean output
   opts = google::cloud::internal::MergeOptions(options.rest_options,
                                                std::move(opts));
   return gcs::Client(std::move(opts));
@@ -292,83 +410,159 @@ gcs_bm::ClientProvider MakeProvider(ThroughputOptions const& options) {
   return provider;
 }
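+// Note: with gRPC compiled in, both kGrpc and kDirectPath above return a
+// gRPC-based client; they differ only in which option set (grpc_options vs.
+// direct_path_options) is merged into the client options. Any other
+// transport falls through to the JSON/REST client, e.g.:
+//
+//   auto client = provider(gcs_bm::ExperimentTransport::kJson);  // REST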
-void RunThread(ThroughputOptions const& options, int thread_id,
-               ResultHandler const& handler,
-               gcs_bm::ClientProvider const& provider) {
-  auto generator = google::cloud::internal::DefaultPRNG(std::random_device{}());
-
-  auto uploaders = gcs_bm::CreateUploadExperiments(options, provider);
-  if (uploaders.empty()) {
-    // This is possible if only gRPC is requested but the benchmark was compiled
-    // without gRPC support.
-    std::cout << "# None of the APIs configured are available\n";
-    return;
-  }
-  auto downloaders =
-      gcs_bm::CreateDownloadExperiments(options, provider, thread_id);
-  if (downloaders.empty()) {
-    // This is possible if only gRPC is requested but the benchmark was compiled
-    // without gRPC support.
-    std::cout << "# None of the APIs configured are available\n";
-    return;
-  }
+// MODIFIED: This is the new benchmark function for a single experiment run
+// (i.e., one full pagination loop). It calls the handler for *each page*.
+void RunListBenchmark(ThroughputOptions const& options,
+                      ResultHandler const& handler, gcs::Client client,
+                      std::string const& prefix,
+                      std::string const& strategy) {
+  // Re-purposed options
+  std::int64_t const page_size = options.minimum_object_size;
+  std::int64_t const max_pages = options.maximum_object_size;
+  int page_number = 0;
+
+  // Use the first lib/transport configured for reporting
+  auto const lib = options.libs[0];
+  auto const transport = options.transports[0];
+
+  if (strategy == "page-token") {
+    // The 'page-token' strategy.
+    // We cannot access 'pages' directly. Instead, we iterate the object
+    // stream (which uses pageToken efficiently) and 'chunk' the results
+    // ourselves, timing each chunk to simulate a page fetch.
+    auto reader = client.ListObjects(options.bucket, gcs::Prefix(prefix));
+    auto object_it = reader.begin();
+    bool done = false;
+
+    do {
+      page_number++;
+      auto const system_start = std::chrono::system_clock::now();
+      auto const steady_start = std::chrono::steady_clock::now();
+
+      std::size_t items_in_this_page = 0;
+      google::cloud::Status status;
+
+      // Pull 'page_size' items from the stream
+      while (items_in_this_page < static_cast<std::size_t>(page_size)) {
+        if (object_it == reader.end()) {
+          done = true;
+          break;
+        }
+        auto& object = *object_it;
+        if (!object) {
+          status = std::move(object).status();
+          done = true;
+          break;
+        }
+        status = google::cloud::Status();  // We got an item, so it's OK
+        items_in_this_page++;
+        ++object_it;
+      }
+
+      auto const steady_end = std::chrono::steady_clock::now();
+      auto const elapsed =
+          std::chrono::duration_cast<std::chrono::microseconds>(steady_end -
+                                                                steady_start);
+
+      // Report this chunk as one sample
+      handler(options, gcs_bm::ThroughputResult{
+                           system_start,
+                           lib,
+                           transport,
+                           gcs_bm::kOpRead0,
+                           static_cast<std::int64_t>(items_in_this_page),
+                           static_cast<std::int64_t>(page_number),
+                           0,      // read_offset
+                           false,  // crc_enabled
+                           false,  // md5_enabled
+                           // FIX: Add the missing 3rd boolean argument
+                           false,
+                           elapsed,
+                           std::chrono::microseconds(0),  // No CPU time
+                           status});
+
+      if (done || page_number >= max_pages) break;
+    } while (true);
+
+  } else {  // "start-offset"
+    std::string next_start_offset;
+    do {
+      page_number++;
+      auto const system_start = std::chrono::system_clock::now();
+      auto const steady_start = std::chrono::steady_clock::now();
+
+      // This logic matches the pseudocode: create a new reader on each
+      // iteration
+      auto reader = client.ListObjects(
+          options.bucket, gcs::Prefix(prefix),
+          gcs::StartOffset(next_start_offset), gcs::MaxResults(page_size));
+
+      std::vector<gcs::ObjectMetadata> items;
+      google::cloud::Status status;
+
+      // Iterate just enough to fill one page
+      for (auto& object : reader) {
+        if (!object) {
+          status = std::move(object).status();
+          break;
+        }
+        status = google::cloud::Status();
+        // Skip the first object if it's the one we started from
+        if (!next_start_offset.empty() &&
+            object->name() == next_start_offset) {
+          continue;
+        }
+        items.push_back(std::move(*object));
+        if (items.size() >= static_cast<std::size_t>(page_size)) break;
+      }
+
+      auto const steady_end = std::chrono::steady_clock::now();
+      auto const elapsed =
+          std::chrono::duration_cast<std::chrono::microseconds>(steady_end -
+                                                                steady_start);
+
+      if (status.ok() && !items.empty()) {
+        next_start_offset = items.back().name();
+      } else if (status.ok() && items.empty()) {
+        // No more items, stop the loop
+        next_start_offset.clear();
+        break;
+      }
+
+      // Report this page fetch as one sample
+      handler(options, gcs_bm::ThroughputResult{
+                           system_start,
+                           lib,
+                           transport,
+                           gcs_bm::kOpRead1,
+                           static_cast<std::int64_t>(items.size()),
+                           static_cast<std::int64_t>(page_number),
+                           0,      // read_offset
+                           false,  // crc_enabled
+                           false,  // md5_enabled
+                           // FIX: Add the missing 3rd boolean argument
+                           false,
+                           elapsed,
+                           std::chrono::microseconds(0),  // No CPU time
+                           status});
+
+      if (!status.ok()) break;               // Error
+      if (items.empty()) break;              // End of list
+      if (next_start_offset.empty()) break;  // End of list
+    } while (page_number < max_pages);
+  }
+}
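+// A standalone sketch of the start-offset pattern used above, outside the
+// benchmark harness. The bucket, prefix, and page size are placeholders:
+//
+//   namespace gcs = ::google::cloud::storage;
+//   gcs::Client client;
+//   std::string offset;  // empty on the first page
+//   for (;;) {
+//     auto reader = client.ListObjects("my-bucket", gcs::Prefix("p/"),
+//                                      gcs::StartOffset(offset),
+//                                      gcs::MaxResults(1000));
+//     std::string last;
+//     std::size_t n = 0;
+//     for (auto& o : reader) {
+//       if (!o) break;                      // stop on error
+//       if (o->name() == offset) continue;  // skip the resume marker
+//       last = o->name();
+//       if (++n >= 1000) break;             // one "page" consumed
+//     }
+//     if (n == 0) break;  // no new items: end of the listing
+//     offset = last;
+//   }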
-  std::uniform_int_distribution<std::size_t> uploader_generator(
-      0, uploaders.size() - 1);
-  std::uniform_int_distribution<std::size_t> downloader_generator(
-      0, downloaders.size() - 1);
-
-  std::uniform_int_distribution<std::int64_t> size_generator(
-      options.minimum_object_size, options.maximum_object_size);
-
-  auto quantized_range_generator = [](auto minimum, auto maximum,
-                                      auto quantum) {
-    auto distribution = std::uniform_int_distribution<std::int64_t>(
-        minimum / quantum, maximum / quantum);
-    return [d = std::move(distribution), quantum](auto& g) mutable {
-      return quantum * d(g);
-    };
-  };
-  auto write_buffer_size_generator = quantized_range_generator(
-      options.minimum_write_buffer_size, options.maximum_write_buffer_size,
-      options.write_buffer_quantum);
-  auto read_buffer_size_generator = quantized_range_generator(
-      options.minimum_read_buffer_size, options.maximum_read_buffer_size,
-      options.read_buffer_quantum);
-
-  auto read_range_generator = [&](auto& g, std::int64_t object_size)
-      -> absl::optional<std::pair<std::int64_t, std::int64_t>> {
-    if (!options.minimum_read_size.has_value() ||
-        !options.maximum_read_size.has_value() ||
-        !options.minimum_read_offset.has_value() ||
-        !options.maximum_read_offset.has_value()) {
-      return absl::nullopt;
-    }
-    auto read_offset_generator = quantized_range_generator(
-        *options.minimum_read_offset, *options.maximum_read_offset,
-        options.read_offset_quantum);
-    auto read_size_generator = quantized_range_generator(
-        *options.minimum_read_size, *options.maximum_read_size,
-        options.read_size_quantum);
-    auto offset = (std::min)(object_size, read_offset_generator(g));
-    auto size = (std::min)(object_size - offset, read_size_generator(g));
-    // This makes it easier to control the ratio of ranged vs. full reads from
-    // the command-line. To make more full reads happen set the read range size
-    // to be larger than the object sizes. The larger this read range size is,
-    // the higher the proportion of full range reads.
-    if (offset == 0 && size == object_size) return absl::nullopt;
-    // The REST API has a quirk: reading the last 0 bytes returns all the bytes.
-    // Just read the *first* 0 bytes in that case. Note that `size == 0` is
-    // implied by the initialization to `min(object_size - offset, ...)`.
-    if (offset == object_size) return std::make_pair(0, 0);
-    return std::make_pair(offset, size);
-  };
+// MODIFIED: This function is completely refactored for the list benchmark.
+// Add std::mutex& print_mutex and restore the thread_id parameter.
+void RunThread(ThroughputOptions const& options, int thread_id,
+               ResultHandler const& handler,
+               gcs_bm::ClientProvider const& provider,
+               std::string const& prefix, std::string const& strategy,
+               std::mutex& print_mutex) {
-  std::uniform_int_distribution<std::size_t> crc32c_generator(
-      0, options.enabled_crc32c.size() - 1);
-  std::uniform_int_distribution<std::size_t> md5_generator(
-      0, options.enabled_crc32c.size() - 1);
+  // Each thread gets its own client(s)
+  auto client = provider(options.transports[0]);
 
   auto deadline = std::chrono::steady_clock::now() + options.duration;
 
@@ -377,48 +571,63 @@ void RunThread(ThroughputOptions const& options, int thread_id,
   std::int32_t iteration_count = 0;
   for (auto start = std::chrono::steady_clock::now();
        iteration_count < options.maximum_sample_count &&
       (iteration_count < options.minimum_sample_count || start < deadline);
       start = std::chrono::steady_clock::now(), ++iteration_count) {
-    auto const object_name = gcs_bm::MakeRandomObjectName(generator);
-    auto const object_size = size_generator(generator);
-    auto const write_buffer_size = write_buffer_size_generator(generator);
-    auto const read_buffer_size = read_buffer_size_generator(generator);
-    bool const enable_crc = options.enabled_crc32c[crc32c_generator(generator)];
-    bool const enable_md5 = options.enabled_md5[md5_generator(generator)];
-    auto const range = read_range_generator(generator, object_size);
-
-    auto& uploader = uploaders[uploader_generator(generator)];
-    auto upload_result = uploader->Run(
-        options.bucket, object_name,
-        gcs_bm::ThroughputExperimentConfig{
-            gcs_bm::kOpWrite, object_size, write_buffer_size, enable_crc,
-            enable_md5, /*read_range=*/absl::nullopt});
-    auto status = upload_result.status;
-    handler(options, std::move(upload_result));
-
-    if (!status.ok()) continue;
-
-    auto& downloader = downloaders[downloader_generator(generator)];
-    for (auto op : {gcs_bm::kOpRead0, gcs_bm::kOpRead1, gcs_bm::kOpRead2}) {
-      handler(options, downloader->Run(options.bucket, object_name,
-                                       gcs_bm::ThroughputExperimentConfig{
-                                           op, object_size, read_buffer_size,
-                                           enable_crc, enable_md5, range}));
+
+    // Time the *entire* RunListBenchmark call
+    auto const sample_start_time = std::chrono::steady_clock::now();
+
+    // Run one full experiment (one full prefix scan)
+    RunListBenchmark(options, handler, client, prefix, strategy);
+
+    auto const sample_end_time = std::chrono::steady_clock::now();
+    auto const sample_latency = sample_end_time - sample_start_time;
+
+    auto const sample_seconds =
+        std::chrono::duration_cast<std::chrono::duration<double>>(
+            sample_latency);
+
+    // --- MODIFICATION ---
+    // Calculate the "Total Objects" based on the run's parameters
+    std::int64_t const page_size = options.minimum_object_size;
+    std::int64_t const max_pages = options.maximum_object_size;
+    std::int64_t total_objects = page_size * max_pages;
+
+    // Use the mutex to print the summary for this sample
+    {
+      std::lock_guard<std::mutex> lk(print_mutex);
+
+      // This is the human-readable line (commented out for clean CSV)
+      // std::cout << "# [Thread " << thread_id << ", Sample "
+      //           << iteration_count + 1
+      //           << "] Total latency for this sample: "
+      //           << sample_seconds.count() << " s\n";
+
+      // This is the clean CSV line we will capture
+      // Format: DATA_ROW,Transport,Strategy,TotalObjects,PageSize,LatencySeconds
+      std::cout << "DATA_ROW,"
+                << gcs_bm::ToString(options.transports[0]) << ","
+                << strategy << ","
+                << total_objects << ","
+                << page_size << ","
+                << sample_seconds.count() << "\n";
    }
-    auto client = provider(ExperimentTransport::kJson);
-    (void)client.DeleteObject(options.bucket, object_name);
+    // --- END MODIFICATION ---
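+    // With the committed settings a captured row looks like this (values
+    // taken from benchmark_results.csv):
+    //   DATA_ROW,Grpc,start-offset,500000000,5000,687.531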
+ << total_objects << "," + << page_size << "," + << sample_seconds.count() << "\n"; } - auto client = provider(ExperimentTransport::kJson); - (void)client.DeleteObject(options.bucket, object_name); + // --- END MODIFICATION --- + // If needed, pace the benchmark so each thread generates only so many - // samples each second. + // "experiments" (full scans) each second. auto const pace = start + options.minimum_sample_delay; auto const now = std::chrono::steady_clock::now(); if (pace > now) std::this_thread::sleep_for(pace - now); } } -google::cloud::StatusOr SelfTest(char const* argv0) { + + +// MODIFIED: Updated SelfTest with new/re-purposed defaults +google::cloud::StatusOr SelfTest( + char const* argv0, ListOptions& list_options) { using ::google::cloud::internal::GetEnv; using ::google::cloud::internal::Sample; - auto const bucket_name = - GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_BUCKET_NAME").value_or(""); + std::string bucket_name = "vaibhav-test-001"; if (bucket_name.empty()) { std::ostringstream os; os << "The GOOGLE_CLOUD_CPP_STORAGE_TEST_BUCKET_NAME environment variable " @@ -426,42 +635,36 @@ google::cloud::StatusOr SelfTest(char const* argv0) { return google::cloud::internal::UnknownError(std::move(os).str(), GCP_ERROR_INFO()); } - return gcs_bm::ParseThroughputOptions( - { - argv0, - "--bucket=" + bucket_name, - "--thread-count=1", - "--minimum-object-size=16KiB", - "--maximum-object-size=32KiB", - "--minimum-write-buffer-size=16KiB", - "--maximum-write-buffer-size=128KiB", - "--write-buffer-quantum=16KiB", - "--minimum-read-buffer-size=16KiB", - "--maximum-read-buffer-size=128KiB", - "--read-buffer-quantum=16KiB", - "--duration=1s", - "--minimum-sample-count=4", - "--maximum-sample-count=10", - "--enabled-transports=Json", - "--enabled-crc32c=enabled", - "--enabled-md5=disabled", - }, - kDescription); -} -google::cloud::StatusOr ParseArgs(int argc, char* argv[]) { - bool auto_run = - google::cloud::internal::GetEnv("GOOGLE_CLOUD_CPP_AUTO_RUN_EXAMPLES") - .value_or("") == "yes"; - if (auto_run) return SelfTest(argv[0]); + // FIX: Set the default prefix and strategy on the 'list_options' + // struct that was passed in by reference. + if (list_options.prefix.empty()) { + list_options.prefix = "gcs-cpp-benchmark-prefix/"; + } + if (list_options.strategy.empty()) { + list_options.strategy = "start-offset"; + // list_options.strategy = "page-token"; + } - auto options = - gcs_bm::ParseThroughputOptions({argv, argv + argc}, kDescription); - if (!options) return options; - // We don't want to get the default labels in the unit tests, as they can - // flake. - options->labels = gcs_bm::AddDefaultLabels(std::move(options->labels)); - return options; + // FIX: This vector ONLY contains arguments that the original + // ParseThroughputOptions function understands. + // --prefix and --strategy have been REMOVED. 
+  std::vector<std::string> args = {
+      argv0,
+      "--bucket=vaibhav-test-001",
+      "--thread-count=1",
+      // Re-purposed: 1000 items per page
+      "--minimum-object-size=1000",
+      // Re-purposed: fetch at most 10000 pages
+      "--maximum-object-size=10000",
+      // Effectively unlimited; the sample counts stop the loop instead
+      "--duration=30000s",
+      "--minimum-sample-count=1",
+      "--maximum-sample-count=10",
+      "--enabled-transports=Grpc",  // Use Grpc, DirectPath, or Json
+  };
+
+  return gcs_bm::ParseThroughputOptions(args, kDescription);
 }
 
-}  // namespace
+}  // namespace
\ No newline at end of file
diff --git a/google/cloud/storage/examples/storage_quickstart.cc b/google/cloud/storage/examples/storage_quickstart.cc
index ddf03f841b2e8..563f239830f97 100644
--- a/google/cloud/storage/examples/storage_quickstart.cc
+++ b/google/cloud/storage/examples/storage_quickstart.cc
@@ -74,13 +74,10 @@ void RunAll(std::vector<std::string> const& argv) {
   (void)examples::RemoveBucketAndContents(client, bucket_name);
 }
 
-}  // namespace
+}  // namespace
 
 int main(int argc, char* argv[]) {
   namespace examples = ::google::cloud::storage::examples;
-  examples::Example example({
-      {"storage-quickstart", StorageQuickstartCommand},
-      {"auto", RunAll},
-  });
-  return example.Run(argc, argv);
+  StorageQuickstart("vaibhav-test-009");
+  return 0;
 }
diff --git a/run_all_benchmarks.sh b/run_all_benchmarks.sh
new file mode 100755
index 0000000000000..a6bff2a819f6f
--- /dev/null
+++ b/run_all_benchmarks.sh
@@ -0,0 +1,119 @@
+#!/bin/bash
+
+# --- Configuration ---
+
+# MODIFIED: This now uses the bazel run command.
+# The -- separates bazel args from the program's args.
+BENCHMARK_CMD="bazel run //google/cloud/storage/benchmarks:storage_throughput_vs_cpu_benchmark --"
+
+# Set this to the object prefix you are testing against
+# IMPORTANT: You must have 1M+ objects at this prefix
+PREFIX="gcs-cpp-benchmark-prefix-2-million-right/"
+
+# --- FIX ---
+# Set the bucket name
+BUCKET_NAME="vaibhav-test-001"
+# --- END FIX ---
+
+# Page size is constant for all runs
+PAGE_SIZE=5000
+
+# Set the number of samples collected by each experiment
+SAMPLES_PER_RUN=3
+
+# --- MODIFICATION FOR PARALLEL RUN ---
+# Directory to store temporary results, one file per job
+TMP_DIR="benchmark_tmp_results"
+# The final combined results file
+RESULTS_FILE="benchmark_results.csv"
+# --- END MODIFICATION ---
+
+# Define the parameters for the runs (3 transports x 1 strategy x 1 object
+# count = 3 jobs). "Json" is the value the --enabled-transports flag expects
+# for the JSON/REST transport.
+TRANSPORTS=("Grpc" "DirectPath" "Json")
+STRATEGIES=("start-offset")
+OBJECT_COUNTS=(5000000)
+
+# --- MODIFICATION FOR PARALLEL RUN ---
+# Clean up old results and create the temp directory
+rm -f $RESULTS_FILE
+rm -rf $TMP_DIR
+mkdir -p $TMP_DIR
+echo "Temporary results will be stored in $TMP_DIR/"
+# --- END MODIFICATION ---
+
+# This line is moved down, *after* the wait
+# echo "Transport,Strategy,TotalObjects,PageSize,LatencySeconds" > $RESULTS_FILE
+
+echo "Starting all 3 benchmark experiments in PARALLEL..."
+echo "(Based on your config: 3 transports * 2 strategies * 1 object count = 6 jobs)" + +job_num=0 # Job counter for unique filenames + +# Loop through all combinations +for transport in "${TRANSPORTS[@]}"; do + for strategy in "${STRATEGIES[@]}"; do + for objects in "${OBJECT_COUNTS[@]}"; do + + # Using your exact logic + let max_pages=100000 + + echo "---" + echo "STARTING Job $job_num: Transport=$transport, Strategy=$strategy, Objects=$objects, Pages=$max_pages" + + # Construct the command + # We use --thread-count=1 for a clean measurement + CMD="$BENCHMARK_CMD \ + --bucket=$BUCKET_NAME \ + --prefix=$PREFIX \ + --strategy=$strategy \ + --enabled-transports=$transport \ + --minimum-object-size=$PAGE_SIZE \ + --maximum-object-size=$max_pages \ + --minimum-sample-count=$SAMPLES_PER_RUN \ + --maximum-sample-count=$SAMPLES_PER_RUN \ + --thread-count=1" + + # --- MODIFICATION FOR PARALLEL RUN --- + # Define a unique temp file for this job's output + TMP_FILE="$TMP_DIR/results-job-$job_num.csv" + + # Run the command in a subshell in the background (&) + # All output is piped and redirected to the unique temp file + ( + echo "Job $job_num ($transport/$strategy/$objects) processing..." + # We pipe stderr (2) to stdout (1) using 2>&1 + $CMD 2>&1 | grep "DATA_ROW," | cut -d',' -f2- > $TMP_FILE + echo "Job $job_num ($transport/$strategy/$objects) FINISHED." + ) & + # --- END MODIFICATION --- + + ((job_num++)) # Increment job counter + + done + done +done + +# --- MODIFICATION FOR PARALLEL RUN --- +echo "---" +echo "All $job_num jobs launched. Waiting for them to complete..." + +# This pauses the script and waits for ALL background jobs to finish +wait + +echo "All jobs finished." +echo "Combining results into $RESULTS_FILE..." + +# Create the final results file and print the header +echo "Transport,Strategy,TotalObjects,PageSize,LatencySeconds" > $RESULTS_FILE + +# Concatenate all temporary files into the final one +cat $TMP_DIR/results-job-*.csv >> $RESULTS_FILE +# --- END MODIFICATION --- + +echo "---" +echo "All benchmarks complete. Data is ready in $RESULTS_FILE" +echo "Cleaning up temporary directory $TMP_DIR..." +rm -r $TMP_DIR \ No newline at end of file