diff --git a/google/cloud/grpc_options.cc b/google/cloud/grpc_options.cc index c5f42ed01c0f9..c6c80f884905f 100644 --- a/google/cloud/grpc_options.cc +++ b/google/cloud/grpc_options.cc @@ -112,6 +112,14 @@ grpc::ChannelArguments MakeChannelArguments(Options const& opts) { static_cast(kKeepaliveTimeout.count())); } + if (opts.has()) { + std::cout<<"Setting GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES to " + << opts.get() << "\n"; + channel_arguments.SetInt(GRPC_ARG_HTTP2_STREAM_LOOKAHEAD_BYTES, + opts.get()); + channel_arguments.SetInt(GRPC_ARG_HTTP2_BDP_PROBE, 0); + } + auto const proxy = MakeGrpcHttpProxy(opts.get()); if (!proxy.empty()) channel_arguments.SetString(GRPC_ARG_HTTP_PROXY, proxy); diff --git a/google/cloud/grpc_options.h b/google/cloud/grpc_options.h index 92eee8969f044..5e48ae17189c8 100644 --- a/google/cloud/grpc_options.h +++ b/google/cloud/grpc_options.h @@ -181,6 +181,19 @@ struct GrpcBackgroundThreadsFactoryOption { using Type = BackgroundThreadsFactory; }; +/** + * The gRPC HTTP/2 stream lookahead bytes. + * + * If set to a value > 0, this will also set `GRPC_ARG_HTTP2_BDP_PROBE` to 0. + * See go/gcs-grpc-stream-lookahead-proposal for details. + * + * @ingroup options + */ +struct GrpcStreamLookaheadBytesOption { + using Type = int; +}; + + /** * A list of all the gRPC options. */ diff --git a/google/cloud/storage/tests/async_client_integration_test.cc b/google/cloud/storage/tests/async_client_integration_test.cc index fed96694e768d..4ef53c43824c9 100644 --- a/google/cloud/storage/tests/async_client_integration_test.cc +++ b/google/cloud/storage/tests/async_client_integration_test.cc @@ -17,8 +17,8 @@ #include "google/cloud/storage/async/bucket_name.h" #include "google/cloud/storage/async/client.h" #include "google/cloud/storage/async/idempotency_policy.h" -#include "google/cloud/storage/async/options.h" #include "google/cloud/storage/async/read_all.h" +#include "google/cloud/opentelemetry_options.h" #include "google/cloud/storage/grpc_plugin.h" #include "google/cloud/storage/testing/storage_integration_test.h" #include "google/cloud/grpc_options.h" @@ -32,6 +32,8 @@ #include #include #include +#include +#include #include namespace google { @@ -52,15 +54,7 @@ using ::testing::VariantWith; class AsyncClientIntegrationTest : public google::cloud::storage::testing::StorageIntegrationTest { - protected: - void SetUp() override { - bucket_name_ = - GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_BUCKET_NAME").value_or(""); - ASSERT_THAT(bucket_name_, Not(IsEmpty())) - << "GOOGLE_CLOUD_CPP_STORAGE_TEST_BUCKET_NAME is not set"; - } - - std::string const& bucket_name() const { return bucket_name_; } +protected: using google::cloud::storage::testing::StorageIntegrationTest:: ScheduleForDelete; @@ -71,998 +65,325 @@ class AsyncClientIntegrationTest .set_generation(object.generation())); } - private: +private: std::string bucket_name_; }; -auto TestOptions() { - // Disable metrics in the test, they just make the logs harder to grok. - return Options{} - .set(false) - .set(1) - .set(TracingOptions().SetOptions( - "truncate_string_field_longer_than=2048")); -} - -auto AlwaysRetry() { - return TestOptions().set( - MakeAlwaysRetryIdempotencyPolicy); -} - -TEST_F(AsyncClientIntegrationTest, ObjectCRUD) { - auto async = AsyncClient(TestOptions()); - auto object_name = MakeRandomObjectName(); +namespace gcs = ::google::cloud::storage; - auto insert = async - .InsertObject(BucketName(bucket_name()), object_name, - LoremIpsum(), AlwaysRetry()) - .get(); - ASSERT_STATUS_OK(insert); - ScheduleForDelete(*insert); +// auto AlwaysRetry() { +// return google::cloud::Options{}.set( +// MakeAlwaysRetryIdempotencyPolicy); +// } - auto full0 = async.ReadObjectRange(BucketName(bucket_name()), object_name, 0, - LoremIpsum().size()); - auto full1 = async.ReadObjectRange(BucketName(bucket_name()), object_name, 0, - LoremIpsum().size()); - auto partial0 = async.ReadObjectRange(BucketName(bucket_name()), object_name, - 2, LoremIpsum().size()); - auto partial1 = async.ReadObjectRange(BucketName(bucket_name()), object_name, - 2, LoremIpsum().size()); - - for (auto* p : {&full1, &full0}) { - auto response = p->get(); - ASSERT_STATUS_OK(response); - auto contents = response->contents(); - auto const full = std::accumulate(contents.begin(), contents.end(), - std::string{}, [](auto a, auto b) { - a += std::string(b); - return a; - }); - EXPECT_EQ(full, LoremIpsum()); +google::cloud::Options MakeOptions(google::cloud::Options opts) { + auto fallback = google::cloud::Options{}; + if (auto v = google::cloud::internal::GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_GRPC_ENDPOINT")) { + fallback.set(*v); } - for (auto* p : {&partial1, &partial0}) { - auto response = p->get(); - ASSERT_STATUS_OK(response); - auto contents = response->contents(); - auto const partial = std::accumulate(contents.begin(), contents.end(), - std::string{}, [](auto a, auto b) { - a += std::string(b); - return a; - }); - EXPECT_EQ(partial, LoremIpsum().substr(2)); + if (auto v = google::cloud::internal::GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_JSON_ENDPOINT")) { + fallback.set(*v); } - - auto status = async - .DeleteObject(BucketName(bucket_name()), object_name, - insert->generation()) - .get(); - EXPECT_STATUS_OK(status); - - auto request = google::storage::v2::ReadObjectRequest{}; - request.set_bucket(insert->bucket()); - request.set_object(insert->name()); - request.set_generation(insert->generation()); - auto head = async.ReadObjectRange(request, /*offset=*/0, /*limit=*/1).get(); - EXPECT_THAT(head, StatusIs(StatusCode::kNotFound)); -} - -TEST_F(AsyncClientIntegrationTest, ComposeObject) { - auto async = AsyncClient(TestOptions()); - auto o1 = MakeRandomObjectName(); - auto o2 = MakeRandomObjectName(); - auto destination = MakeRandomObjectName(); - - auto insert1 = async.InsertObject(BucketName(bucket_name()), o1, LoremIpsum(), - AlwaysRetry()); - auto insert2 = async.InsertObject(BucketName(bucket_name()), o2, LoremIpsum(), - AlwaysRetry()); - std::vector> inserted{insert1.get(), - insert2.get()}; - for (auto const& insert : inserted) { - ASSERT_STATUS_OK(insert); - ScheduleForDelete(*insert); + if (auto v = google::cloud::internal::GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_AUTHORITY")) { + fallback.set(*v); } - std::vector sources; - std::transform(inserted.begin(), inserted.end(), std::back_inserter(sources), - [](auto const& o) { - google::storage::v2::ComposeObjectRequest::SourceObject r; - r.set_name(o->name()); - r.set_generation(o->generation()); - return r; - }); - auto pending = async.ComposeObject(BucketName(bucket_name()), destination, - std::move(sources)); - auto const composed = pending.get(); - EXPECT_STATUS_OK(composed); - ScheduleForDelete(*composed); - - auto read = async - .ReadObjectRange(BucketName(bucket_name()), destination, 0, - 2 * LoremIpsum().size()) - .get(); - ASSERT_STATUS_OK(read); - auto contents = read->contents(); - auto const full_contents = std::accumulate(contents.begin(), contents.end(), - std::string{}, [](auto a, auto b) { - a += std::string(b); - return a; - }); - EXPECT_EQ(full_contents, LoremIpsum() + LoremIpsum()); - EXPECT_THAT(read->metadata(), Optional(IsProtoEqual(*composed))); -} - -TEST_F(AsyncClientIntegrationTest, StreamingRead) { - auto async = AsyncClient(TestOptions()); - auto object_name = MakeRandomObjectName(); - // Create a relatively large object so the streaming read makes sense. We - // aim for something around 5MiB, enough for 3 `Read()` calls. - auto constexpr kLineSize = 64; - auto constexpr kLineCount = 5 * 1024 * 1024 / kLineSize; - auto const block = MakeRandomData(kLineSize); - std::vector insert_data(kLineCount); - std::generate(insert_data.begin(), insert_data.end(), [&, n = 0]() mutable { - return std::to_string(++n) + ": " + block; - }); - auto const expected_size = std::accumulate( - insert_data.begin(), insert_data.end(), static_cast(0), - [](auto a, auto const& b) { return a + b.size(); }); - - auto insert = async - .InsertObject(BucketName(bucket_name()), object_name, - insert_data, AlwaysRetry()) - .get(); - ASSERT_STATUS_OK(insert); - ScheduleForDelete(*insert); - - ASSERT_EQ(insert->size(), expected_size); - - auto r = async.ReadObject(BucketName(bucket_name()), object_name).get(); - ASSERT_STATUS_OK(r); - AsyncReader reader; - AsyncToken token; - std::tie(reader, token) = *std::move(r); - - std::string actual; - while (token.valid()) { - auto p = reader.Read(std::move(token)).get(); - ASSERT_STATUS_OK(p); - ReadPayload payload; - std::tie(payload, token) = *std::move(p); - for (auto v : payload.contents()) actual += std::string(v); - } - EXPECT_EQ(actual.size(), expected_size); - auto view = absl::string_view(actual); - for (auto const& expected : insert_data) { - ASSERT_GE(view.size(), expected.size()); - ASSERT_EQ(expected, view.substr(0, expected.size())); - view.remove_prefix(expected.size()); + if (auto v = google::cloud::internal::GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_TARGET_API_VERSION")) { + fallback.set(*v); } - EXPECT_EQ(view, absl::string_view{}); -} - -TEST_F(AsyncClientIntegrationTest, StreamingReadRange) { - auto async = AsyncClient(TestOptions()); - auto object_name = MakeRandomObjectName(); - // Create a relatively large object so the streaming read makes sense. We - // aim for something around 5MiB, enough for 3 `Read()` calls. - auto constexpr kLineSize = 64; - auto constexpr kLineCount = 5 * 1024 * 1024 / kLineSize; - auto constexpr kReadOffset = kLineCount * kLineSize / 2; - auto const block = MakeRandomData(kLineSize - 1) + "\n"; - std::string contents; - for (int i = 0; i != kLineCount; ++i) contents += block; - auto const expected_insert_size = contents.size(); - - auto insert = async - .InsertObject(BucketName(bucket_name()), object_name, - contents, AlwaysRetry()) - .get(); - ASSERT_STATUS_OK(insert); - ScheduleForDelete(*insert); - - ASSERT_EQ(insert->size(), expected_insert_size); - - auto request = google::storage::v2::ReadObjectRequest{}; - request.set_bucket(insert->bucket()); - request.set_object(insert->name()); - request.set_generation(insert->generation()); - request.set_read_offset(kReadOffset); - auto r = async.ReadObject(request).get(); - ASSERT_STATUS_OK(r); - AsyncReader reader; - AsyncToken token; - std::tie(reader, token) = *std::move(r); - - std::string actual; - while (token.valid()) { - auto p = reader.Read(std::move(token)).get(); - ASSERT_STATUS_OK(p); - ReadPayload payload; - std::tie(payload, token) = *std::move(p); - for (auto v : payload.contents()) actual += std::string(v); - } - - EXPECT_EQ(absl::string_view(actual), - absl::string_view(contents).substr(kReadOffset)); -} - -TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadEmpty) { - auto client = AsyncClient(TestOptions()); - auto object_name = MakeRandomObjectName(); - - auto w = client.StartUnbufferedUpload(BucketName(bucket_name()), object_name) - .get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - - auto metadata = writer.Finalize(std::move(token)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); - - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), 0); -} - -TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadMultiple) { - auto client = AsyncClient(TestOptions()); - auto object_name = MakeRandomObjectName(); - // Create a small block to send over and over. - auto constexpr kBlockSize = 256 * 1024; - auto constexpr kBlockCount = 16; - auto const block = MakeRandomData(kBlockSize); - - auto w = client.StartUnbufferedUpload(BucketName(bucket_name()), object_name) - .get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - for (int i = 0; i != kBlockCount; ++i) { - auto p = writer.Write(std::move(token), WritePayload(block)).get(); - ASSERT_STATUS_OK(p); - token = *std::move(p); - } - - auto metadata = writer.Finalize(std::move(token)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); - - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), kBlockCount * kBlockSize); -} - -TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadResume) { - auto client = AsyncClient(TestOptions()); - auto object_name = MakeRandomObjectName(); - // Create a small block to send over and over. - auto constexpr kBlockSize = 256 * 1024; - auto constexpr kInitialBlockCount = 4; - auto constexpr kTotalBlockCount = 4 + kInitialBlockCount; - auto constexpr kDesiredSize = kBlockSize * kTotalBlockCount; - auto const block = MakeRandomData(kBlockSize); - - auto w = client.StartUnbufferedUpload(BucketName(bucket_name()), object_name) - .get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - - auto const upload_id = writer.UploadId(); - for (int i = 0; i != kInitialBlockCount - 1; ++i) { - auto p = writer.Write(std::move(token), WritePayload(block)).get(); - ASSERT_STATUS_OK(p); - token = *std::move(p); - } - - // Reset the existing writer and resume the upload. - writer = AsyncWriter(); - w = client.ResumeUnbufferedUpload(upload_id).get(); - ASSERT_STATUS_OK(w); - std::tie(writer, token) = *std::move(w); - ASSERT_EQ(writer.UploadId(), upload_id); - auto const persisted = writer.PersistedState(); - // We don't expect this to be larger that the total size of the object. - // Incidentally, this shows the value fits into an `int`. - ASSERT_THAT(persisted, VariantWith(Le(kDesiredSize))); - // Cast to `int` because otherwise we need to write multiple casts below. - auto offset = static_cast(absl::get(persisted)); - if (offset % kBlockSize != 0) { - auto s = block.substr(offset % kBlockSize); - auto const size = s.size(); - auto p = writer.Write(std::move(token), WritePayload(std::move(s))).get(); - ASSERT_STATUS_OK(p); - offset += static_cast(size); - token = *std::move(p); - } - while (offset < kDesiredSize) { - auto const n = std::min(kBlockSize, kDesiredSize - offset); - auto p = - writer.Write(std::move(token), WritePayload(block.substr(0, n))).get(); - ASSERT_STATUS_OK(p); - offset += n; - token = *std::move(p); - } - - auto metadata = writer.Finalize(std::move(token)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); - - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), kDesiredSize); -} - -TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadResumeFinalized) { - auto client = AsyncClient(TestOptions()); - auto object_name = MakeRandomObjectName(); - // Create a small block to send over and over. - auto constexpr kBlockSize = static_cast(256 * 1024); - auto const block = MakeRandomData(kBlockSize); - - auto w = client.StartUnbufferedUpload(BucketName(bucket_name()), object_name) - .get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - - auto const upload_id = writer.UploadId(); - auto metadata = writer.Finalize(std::move(token), WritePayload(block)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); - - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), kBlockSize); - - w = client.ResumeUnbufferedUpload(upload_id).get(); - ASSERT_STATUS_OK(w); - std::tie(writer, token) = *std::move(w); - EXPECT_FALSE(token.valid()); - EXPECT_THAT(writer.PersistedState(), VariantWith( - IsProtoEqual(*metadata))); -} - -TEST_F(AsyncClientIntegrationTest, StartBufferedUploadEmpty) { - auto client = AsyncClient(TestOptions()); - auto object_name = MakeRandomObjectName(); - - auto w = - client.StartBufferedUpload(BucketName(bucket_name()), object_name).get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - - auto metadata = writer.Finalize(std::move(token)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); - - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), 0); -} - -TEST_F(AsyncClientIntegrationTest, StartBufferedUploadMultiple) { - auto client = AsyncClient(TestOptions()); - auto object_name = MakeRandomObjectName(); - // Create a small block to send over and over. - auto constexpr kBlockSize = 256 * 1024; - auto constexpr kBlockCount = 16; - auto const block = MakeRandomData(kBlockSize); - - auto w = - client.StartBufferedUpload(BucketName(bucket_name()), object_name).get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - for (int i = 0; i != kBlockCount; ++i) { - auto p = writer.Write(std::move(token), WritePayload(block)).get(); - ASSERT_STATUS_OK(p); - token = *std::move(p); - } - - auto metadata = writer.Finalize(std::move(token)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); - - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), kBlockCount * kBlockSize); -} - -TEST_F(AsyncClientIntegrationTest, RewriteObject) { - auto async = AsyncClient(TestOptions()); - auto o1 = MakeRandomObjectName(); - auto o2 = MakeRandomObjectName(); - - auto constexpr kBlockSize = 4 * 1024 * 1024; - auto insert = async - .InsertObject(BucketName(bucket_name()), o1, - MakeRandomData(kBlockSize), AlwaysRetry()) - .get(); - ASSERT_STATUS_OK(insert); - ScheduleForDelete(*insert); - - // Start a rewrite, but limit each iteration to a small number of bytes, to - // force multiple calls. - google::storage::v2::Object metadata; - AsyncRewriter rewriter; - AsyncToken token; - google::storage::v2::RewriteObjectRequest request; - request.set_destination_name(o2); - request.set_destination_bucket(BucketName(bucket_name()).FullName()); - request.set_source_object(o1); - request.set_source_bucket(BucketName(bucket_name()).FullName()); - request.set_max_bytes_rewritten_per_call(1024 * 1024); - std::tie(rewriter, token) = async.StartRewrite(std::move(request)); - while (token.valid()) { - auto rt = rewriter.Iterate(std::move(token)).get(); - ASSERT_STATUS_OK(rt); - google::storage::v2::RewriteResponse response; - AsyncToken t; - std::tie(response, t) = *std::move(rt); - token = std::move(t); - if (!response.has_resource()) continue; - metadata = response.resource(); - ScheduleForDelete(metadata); - EXPECT_FALSE(token.valid()); - } - EXPECT_EQ(metadata.name(), o2); - EXPECT_EQ(metadata.size(), insert->size()); -} - -TEST_F(AsyncClientIntegrationTest, RewriteObjectResume) { - auto async = AsyncClient(TestOptions()); - auto destination = - GetEnv("GOOGLE_CLOUD_CPP_STORAGE_TEST_DESTINATION_BUCKET_NAME"); - if (!destination || destination->empty()) GTEST_SKIP(); - - auto constexpr kBlockSize = 4 * 1024 * 1024; - auto source = - async - .InsertObject(BucketName(bucket_name()), MakeRandomObjectName(), - MakeRandomData(kBlockSize), AlwaysRetry()) - .get(); - ASSERT_STATUS_OK(source); - ScheduleForDelete(*source); - - // Start a rewrite, but limit each iteration to a small number of bytes, to - // force multiple calls. - AsyncRewriter rewriter; - AsyncToken token; - auto const expected_name = MakeRandomObjectName(); - google::storage::v2::RewriteObjectRequest start_request; - start_request.set_destination_name(expected_name); - start_request.set_destination_bucket(BucketName(*destination).FullName()); - start_request.set_source_object(source->name()); - start_request.set_source_bucket(source->bucket()); - start_request.set_max_bytes_rewritten_per_call(1024 * 1024); - std::tie(rewriter, token) = async.StartRewrite(start_request); - - auto rt = rewriter.Iterate(std::move(token)).get(); - ASSERT_STATUS_OK(rt); - google::storage::v2::RewriteResponse response; - AsyncToken t; - std::tie(response, t) = *std::move(rt); - - // We want to resume a partially completed resume. Verify the first rewrite - // did not complete things. - ASSERT_THAT(response.rewrite_token(), Not(IsEmpty())); + fallback.set(false); + return google::cloud::internal::MergeOptions(std::move(opts), fallback); +} + + +google::cloud::storage::Client MakeGrpcClient(std::string project_id) { + auto options = MakeOptions(google::cloud::Options{} + .set(project_id)); + return google::cloud::storage::MakeGrpcClient(std::move(options)); +} + +google::cloud::storage_experimental::AsyncClient MakeAsyncClient(std::string project_id) { + auto options = MakeOptions(google::cloud::Options{} + .set(project_id) + .set({"rpc"}) + .set(131072) + .set(true)); + return google::cloud::storage_experimental::AsyncClient(options); +} + +class ThreadPool { +public: + // Constructor initializes the thread pool with a given number of worker threads. + ThreadPool(size_t threads) : stop_(false) { + if (threads == 0) { + throw std::invalid_argument("Thread count cannot be zero."); + } + for (size_t i = 0; i < threads; ++i) { + workers_.emplace_back([this] { + while (true) { + std::function task; + { + // Acquire a lock on the task queue. + std::unique_lock lock(this->queue_mutex_); + + // Wait for a task to be available or for the pool to stop. + this->condition_.wait(lock, [this] { + return this->stop_ || !this->tasks_.empty(); + }); + + // If the pool is stopping and no tasks are left, exit the thread. + if (this->stop_ && this->tasks_.empty()) { + return; + } + + // Get the next task from the queue. + task = std::move(this->tasks_.front()); + this->tasks_.pop(); + } + + // Execute the task. + task(); + } + }); + } + } + + // Adds a new task to the thread pool. + template + auto enqueue(F&& f, Args&&... args) -> std::future::type> { + using return_type = typename std::result_of::type; + + // Create a packaged_task to wrap the function and its arguments. + auto task = std::make_shared>( + std::bind(std::forward(f), std::forward(args)...) + ); + + std::future res = task->get_future(); + { + // Acquire a lock on the queue and push the task. + std::unique_lock lock(queue_mutex_); + + // Don't allow enqueueing after stopping the pool. + if (stop_) { + throw std::runtime_error("enqueue on stopped ThreadPool"); + } + + tasks_.emplace([task]() { (*task)(); }); + } + + // Notify one waiting thread that a new task is available. + condition_.notify_one(); + return res; + } + + // Destructor stops all worker threads and joins them. + ~ThreadPool() { + { + std::unique_lock lock(queue_mutex_); + stop_ = true; + } + + // Notify all threads so they can wake up and exit their loops. + condition_.notify_all(); + + for (std::thread& worker : workers_) { + worker.join(); + } + } + +private: + std::vector workers_; + std::queue> tasks_; + + std::mutex queue_mutex_; + std::condition_variable condition_; + + bool stop_; +}; - google::storage::v2::RewriteObjectRequest resume_request; - resume_request.set_source_bucket(source->bucket()); - resume_request.set_source_object(source->name()); - resume_request.set_destination_bucket(BucketName(*destination).FullName()); - resume_request.set_destination_name(expected_name); - resume_request.set_max_bytes_rewritten_per_call(1024 * 1024); - std::tie(rewriter, token) = async.ResumeRewrite(std::move(resume_request)); - google::storage::v2::Object metadata; - while (token.valid()) { - auto rt = rewriter.Iterate(std::move(token)).get(); - ASSERT_STATUS_OK(rt); +void ReadRangeTask(std::shared_ptr descriptor, + std::int64_t& offset, + std::int64_t& limit) { + for(int i =0;i<10000;i++){ + AsyncReader r; AsyncToken t; - std::tie(response, t) = *std::move(rt); - token = std::move(t); - if (!response.has_resource()) continue; - metadata = response.resource(); - ScheduleForDelete(metadata); - EXPECT_EQ(metadata.bucket(), BucketName(*destination).FullName()); - EXPECT_EQ(metadata.name(), expected_name); - EXPECT_EQ(metadata.size(), source->size()); - EXPECT_FALSE(token.valid()); - } -} - -TEST_F(AsyncClientIntegrationTest, InsertFailure) { - auto async = AsyncClient(TestOptions()); - - auto insert = async - .InsertObject(BucketName(MakeRandomBucketName()), - MakeRandomObjectName(), LoremIpsum()) - .get(); - ASSERT_THAT(insert, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, ReadFailure) { - auto async = AsyncClient(TestOptions()); - - auto read = async - .ReadObject(BucketName(MakeRandomBucketName()), - MakeRandomObjectName()) - .get(); - // At the moment, only connectivity errors are detected before the first - // `Read()` call. Accept such failures too: - if (!read) return; - AsyncReader reader; - AsyncToken token; - std::tie(reader, token) = *std::move(read); - auto payload = ReadAll(std::move(reader), std::move(token)).get(); - ASSERT_THAT(payload, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, ReadRangeFailure) { - auto async = AsyncClient(TestOptions()); - - auto payload = - async - .ReadObjectRange(BucketName(MakeRandomBucketName()), - MakeRandomObjectName(), /*offset=*/0, /*limit=*/1) - .get(); - ASSERT_THAT(payload, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, StartBufferedUploadFailure) { - auto async = AsyncClient(TestOptions()); - - auto writer = async - .StartBufferedUpload(BucketName(MakeRandomBucketName()), - MakeRandomObjectName()) - .get(); - ASSERT_THAT(writer, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, ResumeBufferedUploadFailure) { - auto async = AsyncClient(TestOptions()); - - auto writer = async.ResumeBufferedUpload("test-only-invalid-upload-id").get(); - ASSERT_THAT(writer, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, StartUnbufferedUploadFailure) { - auto async = AsyncClient(TestOptions()); - - auto writer = async - .StartUnbufferedUpload(BucketName(MakeRandomBucketName()), - MakeRandomObjectName()) - .get(); - ASSERT_THAT(writer, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, ResumeUnbufferedUploadFailure) { - auto async = AsyncClient(TestOptions()); - - auto writer = - async.ResumeUnbufferedUpload("test-only-invalid-upload-id").get(); - ASSERT_THAT(writer, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, ComposeObjectFailure) { - auto async = AsyncClient(TestOptions()); - - auto make_source = [](std::string name) { - auto source = google::storage::v2::ComposeObjectRequest::SourceObject{}; - source.set_name(std::move(name)); - return source; - }; - auto composed = - async - .ComposeObject(BucketName(bucket_name()), MakeRandomObjectName(), - {make_source(MakeRandomObjectName()), - make_source(MakeRandomObjectName())}) - .get(); - ASSERT_THAT(composed, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, DeleteObjectFailure) { - auto async = AsyncClient(TestOptions()); - - auto deleted = - async.DeleteObject(BucketName(bucket_name()), MakeRandomObjectName()) - .get(); - ASSERT_THAT(deleted, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, StartRewriteFailure) { - auto async = AsyncClient(TestOptions()); - - AsyncRewriter rewriter; - AsyncToken token; - std::tie(rewriter, token) = - async.StartRewrite(BucketName(bucket_name()), MakeRandomObjectName(), - BucketName(bucket_name()), MakeRandomObjectName()); - ASSERT_TRUE(token.valid()); - auto iteration = rewriter.Iterate(std::move(token)).get(); - ASSERT_THAT(iteration, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, ResumeRewriteFailure) { - auto async = AsyncClient(TestOptions()); - - AsyncRewriter rewriter; - AsyncToken token; - std::tie(rewriter, token) = - async.ResumeRewrite(BucketName(bucket_name()), MakeRandomObjectName(), - BucketName(bucket_name()), MakeRandomObjectName(), - "test-only-invalid-rewrite-token"); - ASSERT_TRUE(token.valid()); - auto iteration = rewriter.Iterate(std::move(token)).get(); - ASSERT_THAT(iteration, Not(IsOk())); -} - -TEST_F(AsyncClientIntegrationTest, StartAppendableObjectUploadEmpty) { - if (!UsingEmulator()) GTEST_SKIP(); - auto async = AsyncClient(TestOptions()); - auto client = MakeIntegrationTestClient(true, TestOptions()); - auto object_name = MakeRandomObjectName(); - - auto create = - client.CreateBucket(bucket_name(), storage::BucketMetadata{} - .set_location("us-west4") - .set_storage_class("RAPID")); - if (!create && create.status().code() != StatusCode::kAlreadyExists) { - GTEST_FAIL() << "cannot create bucket: " << create.status(); + + // 1. Get the reader and token for the specified range + // MODIFIED: Call Read() directly on the descriptor object + std::tie(r, t) = descriptor->Read(offset, limit); + + // 2. Consume the entire stream for this range + while (t.valid()) { + auto read = r.Read(std::move(t)).get(); + + // ASSERT_STATUS_OK will flag the test as failed and abort this + // thread if the status is not OK. + ASSERT_STATUS_OK(read); + + ReadPayload p; + AsyncToken t_new; + std::tie(p, t_new) = *std::move(read); + t = std::move(t_new); + + // In this test, we are discarding the payload `p`, just as + // the original single-threaded loop did. + } } - auto w = - async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) - .get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - - auto metadata = writer.Finalize(std::move(token)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); - - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), 0); } -TEST_F(AsyncClientIntegrationTest, StartAppendableObjectUploadMultiple) { - if (!UsingEmulator()) GTEST_SKIP(); - auto async = AsyncClient(TestOptions()); - auto client = MakeIntegrationTestClient(true, TestOptions()); - auto object_name = MakeRandomObjectName(); - // Create a small block to send over and over. - auto constexpr kBlockSize = 256 * 1024; - auto constexpr kBlockCount = 16; - auto const block = MakeRandomData(kBlockSize); - auto create = - client.CreateBucket(bucket_name(), storage::BucketMetadata{} - .set_location("us-west4") - .set_storage_class("RAPID")); - if (!create && create.status().code() != StatusCode::kAlreadyExists) { - GTEST_FAIL() << "cannot create bucket: " << create.status(); - } - auto w = - async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) - .get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - for (int i = 0; i != kBlockCount; ++i) { - auto p = writer.Write(std::move(token), WritePayload(block)).get(); - ASSERT_STATUS_OK(p); - token = *std::move(p); - } +TEST_F(AsyncClientIntegrationTest, StartAppendableUploadEmpty) { + auto project_id = "bajajnehaa-devrel-test"; + // auto const kproject = google::cloud::Project(project_id); - auto metadata = writer.Finalize(std::move(token)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); + auto client = MakeGrpcClient(project_id); - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), kBlockCount * kBlockSize); -} + auto bucket_name = std::string{"gcs-grpc-team-fastbyte-bajajnehaa-test-us-west4"}; + auto object_name = "vaibhav-test-file-5"; + auto placement = gcs::BucketCustomPlacementConfig{{"us-west4-a"}}; + // auto hns = gcs::BucketHierarchicalNamespace{true}; + auto ubla = gcs::BucketIamConfiguration{gcs::UniformBucketLevelAccess{true, {}}, absl::nullopt}; -TEST_F(AsyncClientIntegrationTest, ResumeAppendableObjectUpload) { - // Skipping the test till we get the takeover feature on testbench. - GTEST_SKIP(); - auto async = AsyncClient(TestOptions()); - auto client = MakeIntegrationTestClient(true, TestOptions()); - auto object_name = MakeRandomObjectName(); - // Create a small block to send over and over. - auto constexpr kBlockSize = 256 * 1024; - auto constexpr kInitialBlockCount = 4; - auto constexpr kTotalBlockCount = 4 + kInitialBlockCount; - auto constexpr kDesiredSize = kBlockSize * kTotalBlockCount; + auto constexpr kBlockSize = 1024*1024 * 10; + auto constexpr kBlockCount = 1000; auto const block = MakeRandomData(kBlockSize); - - auto create = - client.CreateBucket(bucket_name(), storage::BucketMetadata{} - .set_location("us-west4") - .set_storage_class("RAPID")); - if (!create && create.status().code() != StatusCode::kAlreadyExists) { - GTEST_FAIL() << "cannot create bucket: " << create.status(); - } - auto w = - async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) - .get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - - for (int i = 0; i != kInitialBlockCount - 1; ++i) { - auto p = writer.Write(std::move(token), WritePayload(block)).get(); - ASSERT_STATUS_OK(p); - token = *std::move(p); - } - - writer.Close(); - - // Reset the existing writer and resume the upload. - writer = AsyncWriter(); - - auto object_metadata = client.GetObjectMetadata(bucket_name(), object_name); - ASSERT_STATUS_OK(object_metadata); - auto m = *object_metadata; - auto generation = m.generation(); - - w = async - .ResumeAppendableObjectUpload(BucketName(bucket_name()), object_name, - generation) - .get(); - ASSERT_STATUS_OK(w); - std::tie(writer, token) = *std::move(w); - auto const persisted = writer.PersistedState(); - ASSERT_THAT(persisted, VariantWith(Le(kDesiredSize))); - // Cast to `int` because otherwise we need to write multiple casts below. - auto offset = static_cast(absl::get(persisted)); - if (offset % kBlockSize != 0) { - auto s = block.substr(offset % kBlockSize); - auto const size = s.size(); - auto p = writer.Write(std::move(token), WritePayload(std::move(s))).get(); - ASSERT_STATUS_OK(p); - offset += static_cast(size); - token = *std::move(p); - } - while (offset < kDesiredSize) { - auto const n = std::min(kBlockSize, kDesiredSize - offset); - auto p = - writer.Write(std::move(token), WritePayload(block.substr(0, n))).get(); - ASSERT_STATUS_OK(p); - offset += n; - token = *std::move(p); - } - - auto metadata = writer.Finalize(std::move(token)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); - - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), kDesiredSize); -} - -TEST_F(AsyncClientIntegrationTest, ResumeFinalizedAppendableObjectUpload) { - // Skipping the test till we get the takeover feature on testbench. - GTEST_SKIP(); - auto async = AsyncClient(TestOptions()); - auto client = MakeIntegrationTestClient(true, TestOptions()); - auto object_name = MakeRandomObjectName(); - // Create a small block to send over and over. - auto constexpr kBlockSize = static_cast(256 * 1024); - auto const block = MakeRandomData(kBlockSize); - - auto create = - client.CreateBucket(bucket_name(), storage::BucketMetadata{} - .set_location("us-west4") - .set_storage_class("RAPID")); - if (!create && create.status().code() != StatusCode::kAlreadyExists) { - GTEST_FAIL() << "cannot create bucket: " << create.status(); - } - auto w = - async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) - .get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - - auto metadata = writer.Finalize(std::move(token), WritePayload(block)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); - - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), kBlockSize); - - auto object_metadata = client.GetObjectMetadata(bucket_name(), object_name); - ASSERT_STATUS_OK(object_metadata); - auto m = *object_metadata; - auto generation = m.generation(); - - w = async - .ResumeAppendableObjectUpload(BucketName(bucket_name()), object_name, - generation) - .get(); - ASSERT_STATUS_OK(w); - std::tie(writer, token) = *std::move(w); - EXPECT_FALSE(token.valid()); - EXPECT_THAT(writer.PersistedState(), VariantWith( - IsProtoEqual(*metadata))); -} - -TEST_F(AsyncClientIntegrationTest, ExplicitFlushAppendableObjectUpload) { - if (!UsingEmulator()) GTEST_SKIP(); - auto async = AsyncClient(TestOptions()); - auto client = MakeIntegrationTestClient(true, TestOptions()); - auto object_name = MakeRandomObjectName(); - // Create a small block to send over and over. - auto constexpr kBlockSize = static_cast(256 * 1024); - auto const block = MakeRandomData(kBlockSize); - - auto create = - client.CreateBucket(bucket_name(), storage::BucketMetadata{} - .set_location("us-west4") - .set_storage_class("RAPID")); - if (!create && create.status().code() != StatusCode::kAlreadyExists) { - GTEST_FAIL() << "cannot create bucket: " << create.status(); - } - auto w = - async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) - .get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - - auto p = writer.Write(std::move(token), WritePayload(block)).get(); - ASSERT_STATUS_OK(p); - token = *std::move(p); - - // Explicitly flush the data. - auto flush_status = writer.Flush().get(); - EXPECT_STATUS_OK(flush_status); - - auto metadata = writer.Finalize(std::move(token)).get(); - ASSERT_STATUS_OK(metadata); - ScheduleForDelete(*metadata); - - EXPECT_EQ(metadata->bucket(), BucketName(bucket_name()).FullName()); - EXPECT_EQ(metadata->name(), object_name); - EXPECT_EQ(metadata->size(), kBlockSize); -} - -TEST_F(AsyncClientIntegrationTest, Open) { - if (!UsingEmulator()) GTEST_SKIP(); - auto async = AsyncClient(TestOptions()); - auto client = MakeIntegrationTestClient(true, TestOptions()); - auto object_name = MakeRandomObjectName(); - - auto create = client.CreateBucket( - bucket_name(), storage::BucketMetadata{}.set_location("us-west4")); - if (!create && create.status().code() != StatusCode::kAlreadyExists) { - GTEST_FAIL() << "cannot create bucket: " << create.status(); - } - - auto constexpr kSize = 8 * 1024; - auto constexpr kStride = 2 * kSize; - auto constexpr kBlockCount = 4; - auto const block = MakeRandomData(kSize); - - auto w = - async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) - .get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - for (int i = 0; i != kBlockCount; ++i) { - auto p = writer.Write(std::move(token), WritePayload(block)).get(); - ASSERT_STATUS_OK(p); - token = *std::move(p); - } - - auto metadata = writer.Finalize(std::move(token)).get(); - ASSERT_STATUS_OK(metadata); + auto const block2 = MakeRandomData(kBlockSize); + + auto async = MakeAsyncClient(project_id); + // auto w = async.StartBufferedUpload(BucketName(bucket_name), object_name) + // .get(); + // ASSERT_STATUS_OK(w); + // auto w = async.StartAppendableObjectUpload(BucketName(bucket_name), object_name) + // .get(); + // ASSERT_STATUS_OK(w); + + // AsyncWriter writer; + // AsyncToken token; + // std::tie(writer, token) = *std::move(w); + // for (int i = 0; i < kBlockCount; ++i) { + // std::cout << "Writing data iteration #" << i << std::endl; + // auto p = writer.Write(std::move(token), WritePayload(block)).get(); + // ASSERT_STATUS_OK(p); + // token = *std::move(p); + // } + + // auto metadata1 = writer.Finalize(std::move(token)).get(); + // ASSERT_STATUS_OK(metadata1); + // std::cout << "Request metadata: " << metadata1->generation() << std::endl; + EXPECT_EQ(1,2); + + // auto close = writer.Close(); + + // auto object_metadata = client.GetObjectMetadata(bucket_name, object_name); + // auto m = *object_metadata; + // auto generation = m.generation(); + + // auto w1 = async.ResumeAppendableObjectUpload(BucketName(bucket_name), object_name, generation) + // .get(); + + // ASSERT_STATUS_OK(w1); + + // AsyncWriter writer1; + // AsyncToken token1; + // std::tie(writer1, token1) = *std::move(w1); + + // for (int i = 0; i < kBlockCount; ++i) { + // // std::cout << "Writing data iteration #" << i << std::endl; + // auto p = writer1.Write(std::move(token1), WritePayload(block)).get(); + // ASSERT_STATUS_OK(p); + // token1 = *std::move(p); + // } + + // auto object_metadata1 = client.GetObjectMetadata(bucket_name, object_name); + // auto m1 = *object_metadata1; + // // auto generation1 = m1.generation(); + // std::cout << "Object metadata1: " << m << std::endl; + + // auto metadata = writer1.Finalize(std::move(token1))..get(); + // ASSERT_STATUS_OK(metadata); + // // // ScheduleForDelete(*metadata); + + // EXPECT_EQ(metadata->bucket(), BucketName(bucket_name).FullName()); + // EXPECT_EQ(metadata->name()," object_name"); + // EXPECT_EQ(metadata->size(), kBlockCount * kBlockSize); + // EXPECT_EQ("dddd", "Sdfs"); + // std::cout << "Test completed successfully" << std::endl; + // client.DeleteObject(bucket_name, object_name); auto spec = google::storage::v2::BidiReadObjectSpec{}; - spec.set_bucket(BucketName(bucket_name()).FullName()); - spec.set_object(object_name); - auto descriptor = async.Open(spec).get(); - ASSERT_STATUS_OK(descriptor); + // std::cout << object_metadata->bucket() << "\n"; - AsyncReader r0; - AsyncToken t0; - auto actual0 = std::string{}; - std::tie(r0, t0) = descriptor->Read(0 * kStride, kSize); - while (t0.valid()) { - auto read = r0.Read(std::move(t0)).get(); - ASSERT_STATUS_OK(read); - ReadPayload p; - AsyncToken t; - std::tie(p, t) = *std::move(read); - for (auto sv : p.contents()) actual0 += std::string(sv); - t0 = std::move(t); + spec.set_bucket("projects/_/buckets/gcs-grpc-team-fastbyte-bajajnehaa-test-us-west4"); + spec.set_object(object_name); + auto descriptor_status = async.Open(spec).get(); + ASSERT_STATUS_OK(descriptor_status); + ObjectDescriptor descriptor = *std::move(descriptor_status); + auto descriptor_ptr = + std::make_shared(std::move(descriptor)); + // std::shared_ptr descriptor_ptr = std::make_shared(*descriptor); + + // --- Start of ThreadPool implementation --- + + // 1. Initialize the ThreadPool + // Use hardware_concurrency to get a reasonable number of threads + size_t num_threads = std::thread::hardware_concurrency(); + std::cout << "Starting ThreadPool with " << num_threads << " threads." << std::endl; + ThreadPool pool(num_threads); + + // 2. Define read parameters and storage for futures + std::vector> futures; + int num_reads = 1; + std::int64_t read_offset = 0; + std::int64_t read_limit = 1024 * 1024 * 200; // 1 GiB + + std::cout << "Enqueuing " << num_reads << " read tasks..." << std::endl; + + // 3. Enqueue all the read tasks + // The original loop is replaced with this loop. + for (int i = 0; i < num_reads; ++i) { + // Pass *descriptor (the ObjectDescriptor) by value. + // This is safe because it's a copyable wrapper. + futures.push_back( + pool.enqueue(ReadRangeTask, descriptor_ptr, read_offset, read_limit) + ); } - EXPECT_EQ(actual0.size(), kSize); - client.DeleteObject(bucket_name(), object_name, - storage::Generation(metadata->generation())); -} - -TEST_F(AsyncClientIntegrationTest, OpenExceedMaximumRange) { - GTEST_SKIP(); - auto async = AsyncClient( - TestOptions().set(1024)); - auto client = MakeIntegrationTestClient(true, TestOptions()); - auto object_name = MakeRandomObjectName(); - - auto create = client.CreateBucket( - bucket_name(), storage::BucketMetadata{}.set_location("us-west4")); - if (!create && create.status().code() != StatusCode::kAlreadyExists) { - GTEST_FAIL() << "cannot create bucket: " << create.status(); + // 4. Wait for all enqueued tasks to complete + std::cout << "Waiting for all " << futures.size() << " read tasks to complete..." << std::endl; + for (auto& f : futures) { + f.get(); // This blocks until the future is ready. + // If a task failed (e.g., via ASSERT_STATUS_OK), + // gtest will have already flagged the failure. + // If the task threw an exception, get() will re-throw it. } - auto constexpr kSize = 2048; - auto const block = MakeRandomData(kSize); + std::cout << "All " << num_reads << " parallel read tasks completed." << std::endl; + + // --- End of ThreadPool implementation --- - auto w = - async.StartAppendableObjectUpload(BucketName(bucket_name()), object_name) - .get(); - ASSERT_STATUS_OK(w); - AsyncWriter writer; - AsyncToken token; - std::tie(writer, token) = *std::move(w); - auto p = writer.Write(std::move(token), WritePayload(block)).get(); - ASSERT_STATUS_OK(p); - token = *std::move(p); + // auto actual0 = std::string{}; + // for(int i =0 ; i< 1000 ; i++){ + // std::tie(r0, t0) = descriptor->Read(0 , 1024* 1024* 1024); + // actual0 = std::string{}; + // while (t0.valid()) { + // auto read = r0.Read(std::move(t0)).get(); + // ASSERT_STATUS_OK(read); + // ReadPayload p; + // AsyncToken t; + // std::tie(p, t) = *std::move(read); + // t0 = std::move(t); + // } + // } - auto metadata = writer.Finalize(std::move(token)).get(); - ASSERT_STATUS_OK(metadata); - - auto spec = google::storage::v2::BidiReadObjectSpec{}; - spec.set_bucket(BucketName(bucket_name()).FullName()); - spec.set_object(object_name); - auto descriptor = async.Open(spec).get(); - ASSERT_STATUS_OK(descriptor); - - AsyncReader r0; - AsyncToken t0; - auto actual0 = std::string{}; - std::tie(r0, t0) = descriptor->Read(0, kSize); - while (t0.valid()) { - auto read = r0.Read(std::move(t0)).get(); - ASSERT_STATUS_OK(read); - ReadPayload p; - AsyncToken t; - std::tie(p, t) = *std::move(read); - for (auto sv : p.contents()) actual0 += std::string(sv); - t0 = std::move(t); - } - - EXPECT_EQ(actual0.size(), kSize); - client.DeleteObject(bucket_name(), object_name, - storage::Generation(metadata->generation())); + auto ans = block + block + block; + // EXPECT_EQ(1,2); } + } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_experimental } // namespace cloud } // namespace google -#endif // GOOGLE_CLOUD_CPP_STORAGE_HAVE_GRPC +#endif // GOOGLE_CLOUD_CPP_STORAGE_HAVE_GRPC \ No newline at end of file