diff --git a/google/cloud/storage/async/object_descriptor.cc b/google/cloud/storage/async/object_descriptor.cc index dd197e0109419..90877d414f9f6 100644 --- a/google/cloud/storage/async/object_descriptor.cc +++ b/google/cloud/storage/async/object_descriptor.cc @@ -27,14 +27,11 @@ absl::optional ObjectDescriptor::metadata() const { std::pair ObjectDescriptor::Read(std::int64_t offset, std::int64_t limit) { - // TODO(15340): This change is causing performance regression. We need to - // revisit it after benchmarking our code. - - // std::int64_t max_range = - // impl_->options().get(); - // if (limit > max_range) { - // impl_->MakeSubsequentStream(); - // } + std::int64_t max_range = + impl_->options().get(); + if (limit > max_range) { + impl_->MakeSubsequentStream(); + } auto reader = impl_->Read({offset, limit}); auto token = storage_internal::MakeAsyncToken(reader.get()); return {AsyncReader(std::move(reader)), std::move(token)}; diff --git a/google/cloud/storage/async/object_descriptor_test.cc b/google/cloud/storage/async/object_descriptor_test.cc index f1541fab05363..0851439821e7c 100644 --- a/google/cloud/storage/async/object_descriptor_test.cc +++ b/google/cloud/storage/async/object_descriptor_test.cc @@ -149,7 +149,6 @@ TEST(ObjectDescriptor, ReadLast) { } TEST(ObjectDescriptor, ReadExceedsMaxRange) { - GTEST_SKIP(); auto mock = std::make_shared(); auto constexpr kMaxRange = 1024; EXPECT_CALL(*mock, options) diff --git a/google/cloud/storage/google_cloud_cpp_storage_grpc.bzl b/google/cloud/storage/google_cloud_cpp_storage_grpc.bzl index 7644ce88dea77..4113b1e4cab04 100644 --- a/google/cloud/storage/google_cloud_cpp_storage_grpc.bzl +++ b/google/cloud/storage/google_cloud_cpp_storage_grpc.bzl @@ -43,6 +43,7 @@ google_cloud_cpp_storage_grpc_hdrs = [ "internal/async/default_options.h", "internal/async/handle_redirect_error.h", "internal/async/insert_object.h", + "internal/async/multi_stream_manager.h", "internal/async/object_descriptor_connection_tracing.h", "internal/async/object_descriptor_impl.h", "internal/async/object_descriptor_reader.h", @@ -120,6 +121,7 @@ google_cloud_cpp_storage_grpc_srcs = [ "internal/async/default_options.cc", "internal/async/handle_redirect_error.cc", "internal/async/insert_object.cc", + "internal/async/multi_stream_manager.cc", "internal/async/object_descriptor_connection_tracing.cc", "internal/async/object_descriptor_impl.cc", "internal/async/object_descriptor_reader.cc", diff --git a/google/cloud/storage/google_cloud_cpp_storage_grpc.cmake b/google/cloud/storage/google_cloud_cpp_storage_grpc.cmake index dfac5b7a8ab75..8349ed1889b8f 100644 --- a/google/cloud/storage/google_cloud_cpp_storage_grpc.cmake +++ b/google/cloud/storage/google_cloud_cpp_storage_grpc.cmake @@ -110,6 +110,8 @@ add_library( internal/async/handle_redirect_error.h internal/async/insert_object.cc internal/async/insert_object.h + internal/async/multi_stream_manager.cc + internal/async/multi_stream_manager.h internal/async/object_descriptor_connection_tracing.cc internal/async/object_descriptor_connection_tracing.h internal/async/object_descriptor_impl.cc @@ -442,6 +444,7 @@ set(storage_client_grpc_unit_tests internal/async/default_options_test.cc internal/async/handle_redirect_error_test.cc internal/async/insert_object_test.cc + internal/async/multi_stream_manager_test.cc internal/async/object_descriptor_connection_tracing_test.cc internal/async/object_descriptor_impl_test.cc internal/async/object_descriptor_reader_test.cc diff --git a/google/cloud/storage/internal/async/connection_impl_open_test.cc b/google/cloud/storage/internal/async/connection_impl_open_test.cc index 6d72d6f40a6ea..c1902d8ba92e7 100644 --- a/google/cloud/storage/internal/async/connection_impl_open_test.cc +++ b/google/cloud/storage/internal/async/connection_impl_open_test.cc @@ -44,6 +44,8 @@ using ::google::cloud::testing_util::IsProtoEqual; using ::google::cloud::testing_util::MockCompletionQueueImpl; using ::google::cloud::testing_util::StatusIs; using ::google::protobuf::TextFormat; +using ::testing::InvokeWithoutArgs; +using ::testing::NiceMock; using ::testing::NotNull; using ::testing::Optional; @@ -183,10 +185,29 @@ TEST(AsyncConnectionImplTest, OpenSimple) { [](auto) { return Status{}; }); }); + return std::unique_ptr(std::move(stream)); + }) + .WillRepeatedly([](CompletionQueue const&, + std::shared_ptr const&, + google::cloud::internal::ImmutableOptions const&) { + auto stream = std::make_unique>(); + ON_CALL(*stream, Start).WillByDefault(InvokeWithoutArgs([] { + return make_ready_future(false); + })); + ON_CALL(*stream, Finish).WillByDefault(InvokeWithoutArgs([] { + return make_ready_future(Status{}); + })); + ON_CALL(*stream, Cancel).WillByDefault([] {}); return std::unique_ptr(std::move(stream)); }); auto mock_cq = std::make_shared(); + EXPECT_CALL(*mock_cq, MakeRelativeTimer) + .WillRepeatedly([](std::chrono::nanoseconds) { + return make_ready_future( + StatusOr( + std::chrono::system_clock::now())); + }); auto connection = std::make_shared( CompletionQueue(mock_cq), std::shared_ptr(), mock, TestOptions()); diff --git a/google/cloud/storage/internal/async/multi_stream_manager.cc b/google/cloud/storage/internal/async/multi_stream_manager.cc new file mode 100644 index 0000000000000..2425c990a7712 --- /dev/null +++ b/google/cloud/storage/internal/async/multi_stream_manager.cc @@ -0,0 +1,29 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "google/cloud/storage/internal/async/multi_stream_manager.h" +#include "google/cloud/storage/internal/async/object_descriptor_impl.h" + +namespace google { +namespace cloud { +namespace storage_internal { +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN + +// Explicit instantiation for ObjectDescriptorImpl usage. +template class MultiStreamManager; + +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END +} // namespace storage_internal +} // namespace cloud +} // namespace google diff --git a/google/cloud/storage/internal/async/multi_stream_manager.h b/google/cloud/storage/internal/async/multi_stream_manager.h new file mode 100644 index 0000000000000..c8b213611963d --- /dev/null +++ b/google/cloud/storage/internal/async/multi_stream_manager.h @@ -0,0 +1,175 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_INTERNAL_ASYNC_MULTI_STREAM_MANAGER_H +#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_INTERNAL_ASYNC_MULTI_STREAM_MANAGER_H + +#include "google/cloud/status.h" +#include "google/cloud/version.h" +#include +#include +#include +#include +#include + +namespace google { +namespace cloud { +namespace storage_internal { +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN + +// Defines the interface contract that any stream type (e.g., ReadStream, +// WriteStream) managed by MultiStreamManager must implement. This explicit base +// class ensures we have a clear, enforceable interface for operations like +// CancelAll(). +class StreamBase { + public: + virtual ~StreamBase() = default; + virtual void Cancel() = 0; +}; + +// Manages a collection of streams. +// +// This class implements the "Subsequent Stream" logic where idle streams +// are moved to the front of the queue for reuse. +// +// THREAD SAFETY: +// This class is NOT thread-safe. The owner (e.g. ObjectDescriptorImpl +// or AsyncWriterImpl etc) must serialize access, typically by holding +// an external mutex while calling these methods. +// +// EXAMPLE USAGE: +// class MyOwner { +// std::mutex mu_; +// MultiStreamManager manager_; +// +// void StartRead() { +// std::unique_lock lk(mu_); +// auto it = manager_.GetLeastBusyStream(); +// } +// }; +template +class MultiStreamManager { + public: + struct Stream { + std::shared_ptr stream; + std::unordered_map> active_ranges; + }; + + using StreamIterator = typename std::list::iterator; + using StreamFactory = std::function()>; + + // Constructor creates the first stream using the factory immediately. + explicit MultiStreamManager(StreamFactory stream_factory) + : stream_factory_(std::move(stream_factory)) { + streams_.push_back(Stream{stream_factory_(), {}}); + } + + // Constructor accepts an already-created initial stream. + // This is required by ObjectDescriptorImpl which receives an OpenStream. + MultiStreamManager(StreamFactory stream_factory, + std::shared_ptr initial_stream) + : stream_factory_(std::move(stream_factory)) { + streams_.push_back(Stream{std::move(initial_stream), {}}); + } + + StreamIterator GetFirstStream() { + if (streams_.empty()) return streams_.end(); + return streams_.begin(); + } + + StreamIterator GetLeastBusyStream() { + if (streams_.empty()) return streams_.end(); + auto least_busy_it = streams_.begin(); + // Track min_ranges to avoid calling .size() repeatedly if possible, + // though for std::unordered_map .size() is O(1). + std::size_t min_ranges = least_busy_it->active_ranges.size(); + if (min_ranges == 0) return least_busy_it; + + // Start checking from the second element + for (auto it = std::next(streams_.begin()); it != streams_.end(); ++it) { + // Strict less-than ensures stability (preferring older streams if tied) + auto size = it->active_ranges.size(); + if (size < min_ranges) { + least_busy_it = it; + min_ranges = size; + if (min_ranges == 0) return least_busy_it; + } + } + return least_busy_it; + } + + StreamIterator AddStream(std::shared_ptr stream) { + streams_.push_front(Stream{std::move(stream), {}}); + return streams_.begin(); + } + + void CancelAll() { + for (auto& s : streams_) { + if (s.stream) s.stream->Cancel(); + } + } + + void RemoveStreamAndNotifyRanges(StreamIterator it, Status const& status) { + auto ranges = std::move(it->active_ranges); + streams_.erase(it); + for (auto const& kv : ranges) { + kv.second->OnFinish(status); + } + } + + void MoveActiveRanges(StreamIterator from, StreamIterator to) { + to->active_ranges = std::move(from->active_ranges); + } + + void CleanupDoneRanges(StreamIterator it) { + auto& active_ranges = it->active_ranges; + for (auto i = active_ranges.begin(); i != active_ranges.end();) { + if (i->second->IsDone()) { + i = active_ranges.erase(i); + } else { + ++i; + } + } + } + + template + bool ReuseIdleStreamToFront(Pred pred) { + for (auto it = streams_.begin(); it != streams_.end(); ++it) { + if (!pred(*it)) continue; + + // If the idle stream is already at the front, we don't + // need to move it. Otherwise splice to the front in O(1). + if (it != streams_.begin()) { + streams_.splice(streams_.begin(), streams_, it); + } + return true; + } + return false; + } + + bool Empty() const { return streams_.empty(); } + StreamIterator End() { return streams_.end(); } + std::size_t Size() const { return streams_.size(); } + + private: + std::list streams_; + StreamFactory stream_factory_; +}; + +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END +} // namespace storage_internal +} // namespace cloud +} // namespace google + +#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_INTERNAL_ASYNC_MULTI_STREAM_MANAGER_H diff --git a/google/cloud/storage/internal/async/multi_stream_manager_test.cc b/google/cloud/storage/internal/async/multi_stream_manager_test.cc new file mode 100644 index 0000000000000..482390f97fd0e --- /dev/null +++ b/google/cloud/storage/internal/async/multi_stream_manager_test.cc @@ -0,0 +1,239 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "google/cloud/storage/internal/async/multi_stream_manager.h" +#include +#include +#include + +namespace google { +namespace cloud { +namespace storage_internal { +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN +namespace { + +struct FakeRange { + bool done = false; + int finished = 0; + bool IsDone() const { return done; } + void OnFinish(Status const&) { ++finished; } +}; + +struct FakeStream : public StreamBase { + void Cancel() override { ++cancelled; } + int cancelled = 0; + bool write_pending = false; +}; + +using Manager = MultiStreamManager; + +struct MultiStreamManagerTest : public ::testing::Test { + static Manager MakeManager() { + return Manager([] { return std::make_shared(); }); + } +}; + +} // namespace + +TEST(MultiStreamManagerTest, ConstructsWithFactoryAndHasOneStream) { + auto mgr = MultiStreamManagerTest::MakeManager(); + EXPECT_FALSE(mgr.Empty()); + EXPECT_EQ(mgr.Size(), 1U); + auto it = mgr.GetFirstStream(); + ASSERT_TRUE(it->stream); +} + +TEST(MultiStreamManagerTest, ConstructsWithInitialStream) { + auto initial = std::make_shared(); + Manager mgr([] { return nullptr; }, initial); + EXPECT_EQ(mgr.Size(), 1U); + auto it = mgr.GetFirstStream(); + EXPECT_EQ(it->stream, initial); +} + +TEST(MultiStreamManagerTest, AddStreamAppendsAndGetFirstReturnsNew) { + auto mgr = MultiStreamManagerTest::MakeManager(); + auto s1 = std::make_shared(); + auto it1 = mgr.AddStream(s1); + EXPECT_EQ(mgr.Size(), 2U); + EXPECT_EQ(it1->stream.get(), s1.get()); + auto it_first = mgr.GetFirstStream(); + EXPECT_EQ(it_first->stream.get(), s1.get()); +} + +TEST(MultiStreamManagerTest, GetLeastBusyPrefersFewestActiveRanges) { + auto mgr = MultiStreamManagerTest::MakeManager(); + + // The manager starts with an initial stream (size 0). + // We must make it "busy" so it doesn't win the comparison against our test + // streams. + auto it_init = mgr.GetFirstStream(); + it_init->active_ranges.emplace(999, std::make_shared()); + it_init->active_ranges.emplace(998, std::make_shared()); + + auto s1 = std::make_shared(); + auto s2 = std::make_shared(); + auto it1 = mgr.AddStream(s1); + auto it2 = mgr.AddStream(s2); + + // s1 has 2 ranges. + it1->active_ranges.emplace(1, std::make_shared()); + it1->active_ranges.emplace(2, std::make_shared()); + + // s2 has 1 range. + it2->active_ranges.emplace(3, std::make_shared()); + + auto it_least = mgr.GetLeastBusyStream(); + + // Expect it2 (1 range) over it1 (2 ranges) and it_init (2 ranges). + EXPECT_EQ(it_least, it2); + EXPECT_EQ(it_least->active_ranges.size(), 1U); +} + +TEST(MultiStreamManagerTest, CleanupDoneRangesRemovesFinished) { + auto mgr = MultiStreamManagerTest::MakeManager(); + auto it = mgr.GetFirstStream(); + auto r1 = std::make_shared(); + r1->done = false; + auto r2 = std::make_shared(); + r2->done = true; + auto r3 = std::make_shared(); + r3->done = true; + it->active_ranges.emplace(1, r1); + it->active_ranges.emplace(2, r2); + it->active_ranges.emplace(3, r3); + mgr.CleanupDoneRanges(it); + EXPECT_EQ(it->active_ranges.size(), 1U); + EXPECT_TRUE(it->active_ranges.count(1)); +} + +TEST(MultiStreamManagerTest, RemoveStreamAndNotifyRangesCallsOnFinish) { + auto mgr = MultiStreamManagerTest::MakeManager(); + auto it = mgr.GetFirstStream(); + auto r1 = std::make_shared(); + auto r2 = std::make_shared(); + it->active_ranges.emplace(11, r1); + it->active_ranges.emplace(22, r2); + mgr.RemoveStreamAndNotifyRanges(it, Status()); // OK status + EXPECT_EQ(mgr.Size(), 0U); + EXPECT_EQ(r1->finished, 1); + EXPECT_EQ(r2->finished, 1); +} + +TEST(MultiStreamManagerTest, CancelAllInvokesCancel) { + auto mgr = MultiStreamManagerTest::MakeManager(); + auto s1 = std::make_shared(); + auto s2 = std::make_shared(); + mgr.AddStream(s1); + mgr.AddStream(s2); + mgr.CancelAll(); + EXPECT_EQ(s1->cancelled, 1); + EXPECT_EQ(s2->cancelled, 1); +} + +TEST(MultiStreamManagerTest, ReuseIdleStreamToFrontMovesElement) { + auto mgr = MultiStreamManagerTest::MakeManager(); + // Capture the factory-created stream pointer (initial element) + auto* factory_ptr = mgr.GetFirstStream()->stream.get(); + auto s1 = std::make_shared(); + mgr.AddStream(s1); + bool moved = mgr.ReuseIdleStreamToFront([](Manager::Stream const& s) { + auto* fs = s.stream.get(); + return fs != nullptr && s.active_ranges.empty() && !fs->write_pending; + }); + EXPECT_TRUE(moved); + auto it_first = mgr.GetFirstStream(); + // After move, the s1 stream should be first + EXPECT_EQ(it_first->stream.get(), s1.get()); + EXPECT_NE(it_first->stream.get(), factory_ptr); +} + +TEST(MultiStreamManagerTest, + ReuseIdleStreamAlreadyAtFrontReturnsTrueWithoutMove) { + auto mgr = MultiStreamManagerTest::MakeManager(); + // The manager starts with one stream. It is the first stream, and it is idle. + auto initial_first = mgr.GetFirstStream(); + bool reused = mgr.ReuseIdleStreamToFront( + [](Manager::Stream const& s) { return s.active_ranges.empty(); }); + EXPECT_TRUE(reused); + // Pointer should remain the same (it was already at the front) + EXPECT_EQ(mgr.GetFirstStream(), initial_first); +} + +TEST(MultiStreamManagerTest, ReuseIdleStreamDoesNotMoveWhenWritePending) { + auto mgr = MultiStreamManagerTest::MakeManager(); + auto* factory_ptr = mgr.GetFirstStream()->stream.get(); + // Mark factory stream as not reusable + mgr.GetFirstStream()->stream->write_pending = true; + auto s1 = std::make_shared(); + s1->write_pending = true; // also mark appended stream as not reusable + mgr.AddStream(s1); + bool moved = mgr.ReuseIdleStreamToFront([](Manager::Stream const& s) { + auto* fs = s.stream.get(); + return fs != nullptr && s.active_ranges.empty() && !fs->write_pending; + }); + EXPECT_FALSE(moved); + auto it_first = mgr.GetFirstStream(); + EXPECT_EQ(it_first->stream.get(), s1.get()); + EXPECT_NE(it_first->stream.get(), factory_ptr); +} + +TEST(MultiStreamManagerTest, MoveActiveRangesTransfersAllEntries) { + auto mgr = MultiStreamManagerTest::MakeManager(); + auto s1 = std::make_shared(); + auto s2 = std::make_shared(); + auto it1 = mgr.AddStream(s1); + auto it2 = mgr.AddStream(s2); + it1->active_ranges.emplace(101, std::make_shared()); + it1->active_ranges.emplace(202, std::make_shared()); + ASSERT_EQ(it1->active_ranges.size(), 2U); + ASSERT_TRUE(it2->active_ranges.empty()); + mgr.MoveActiveRanges(it1, it2); + EXPECT_TRUE(it1->active_ranges.empty()); + EXPECT_EQ(it2->active_ranges.size(), 2U); + EXPECT_TRUE(it2->active_ranges.count(101)); + EXPECT_TRUE(it2->active_ranges.count(202)); +} + +TEST(MultiStreamManagerTest, GetFirstStreamReflectsFrontReuse) { + auto mgr = MultiStreamManagerTest::MakeManager(); + auto s1 = std::make_shared(); + mgr.AddStream(s1); + EXPECT_EQ(mgr.GetFirstStream()->stream.get(), s1.get()); + bool moved = mgr.ReuseIdleStreamToFront([](Manager::Stream const& s) { + return s.stream != nullptr && s.active_ranges.empty(); + }); + EXPECT_TRUE(moved); + auto it_first = mgr.GetFirstStream(); + EXPECT_EQ(it_first->stream.get(), s1.get()); +} + +TEST(MultiStreamManagerTest, EmptyAndSizeTransitions) { + auto mgr = MultiStreamManagerTest::MakeManager(); + EXPECT_FALSE(mgr.Empty()); + EXPECT_EQ(mgr.Size(), 1U); + auto it = mgr.GetFirstStream(); + mgr.RemoveStreamAndNotifyRanges(it, Status()); + EXPECT_TRUE(mgr.Empty()); + EXPECT_EQ(mgr.Size(), 0U); + auto s = std::make_shared(); + mgr.AddStream(s); + EXPECT_FALSE(mgr.Empty()); + EXPECT_EQ(mgr.Size(), 1U); +} + +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END +} // namespace storage_internal +} // namespace cloud +} // namespace google diff --git a/google/cloud/storage/internal/async/object_descriptor_impl.cc b/google/cloud/storage/internal/async/object_descriptor_impl.cc index cebca0a2bbfc5..f3651cc390f7c 100644 --- a/google/cloud/storage/internal/async/object_descriptor_impl.cc +++ b/google/cloud/storage/internal/async/object_descriptor_impl.cc @@ -15,6 +15,7 @@ #include "google/cloud/storage/internal/async/object_descriptor_impl.h" #include "google/cloud/storage/async/options.h" #include "google/cloud/storage/internal/async/handle_redirect_error.h" +#include "google/cloud/storage/internal/async/multi_stream_manager.h" #include "google/cloud/storage/internal/async/object_descriptor_reader_tracing.h" #include "google/cloud/storage/internal/hash_function.h" #include "google/cloud/storage/internal/hash_function_impl.h" @@ -38,25 +39,31 @@ ObjectDescriptorImpl::ObjectDescriptorImpl( make_stream_(std::move(make_stream)), read_object_spec_(std::move(read_object_spec)), options_(std::move(options)) { - streams_.push_back( - Stream{std::move(stream), {}, resume_policy_prototype_->clone()}); + stream_manager_ = std::make_unique( + []() -> std::shared_ptr { return nullptr; }, // NOLINT + std::make_shared(std::move(stream), + resume_policy_prototype_->clone())); } -ObjectDescriptorImpl::~ObjectDescriptorImpl() { - for (auto const& stream : streams_) { - stream.stream->Cancel(); - } -} +ObjectDescriptorImpl::~ObjectDescriptorImpl() { Cancel(); } void ObjectDescriptorImpl::Start( google::storage::v2::BidiReadObjectResponse first_response) { - OnRead(std::move(first_response)); + std::unique_lock lk(mu_); + auto it = stream_manager_->GetFirstStream(); + if (it == stream_manager_->End()) return; + lk.unlock(); + OnRead(it, std::move(first_response)); + // Acquire lock and queue the background stream. + lk.lock(); + AssurePendingStreamQueued(lk); } void ObjectDescriptorImpl::Cancel() { - for (auto const& stream : streams_) { - stream.stream->Cancel(); - } + std::unique_lock lk(mu_); + cancelled_ = true; + if (stream_manager_) stream_manager_->CancelAll(); + if (pending_stream_.valid()) pending_stream_.cancel(); } absl::optional ObjectDescriptorImpl::metadata() @@ -65,17 +72,59 @@ absl::optional ObjectDescriptorImpl::metadata() return metadata_; } -void ObjectDescriptorImpl::MakeSubsequentStream() { +void ObjectDescriptorImpl::AssurePendingStreamQueued( + std::unique_lock const&) { + if (pending_stream_.valid()) return; auto request = google::storage::v2::BidiReadObjectRequest{}; *request.mutable_read_object_spec() = read_object_spec_; - auto stream_result = make_stream_(std::move(request)).get(); + pending_stream_ = make_stream_(std::move(request)); +} +void ObjectDescriptorImpl::MakeSubsequentStream() { std::unique_lock lk(mu_); - streams_.push_back(Stream{ - std::move(stream_result->stream), {}, resume_policy_prototype_->clone()}); + // Reuse an idle stream if possible. + if (stream_manager_->ReuseIdleStreamToFront( + [](StreamManager::Stream const& s) { + auto const* rs = s.stream.get(); + return rs != nullptr && s.active_ranges.empty() && + !rs->write_pending; + })) { + return; + } + // Proactively create a new stream if needed. + AssurePendingStreamQueued(lk); + if (!pending_stream_.valid()) return; + auto stream_future = std::move(pending_stream_); lk.unlock(); - OnRead(std::move(stream_result->first_response)); + + // Use .then() to retrieves the result without blocking. + stream_future.then([w = WeakFromThis()](auto f) { + auto self = w.lock(); + if (!self) return; + + auto stream_result = f.get(); + if (!stream_result) { + // Stream creation failed. + // The next call to AssurePendingStreamQueued will retry creation. + return; + } + + std::unique_lock lk(self->mu_); + if (self->cancelled_) return; + + auto read_stream = + std::make_shared(std::move(stream_result->stream), + self->resume_policy_prototype_->clone()); + + auto new_it = self->stream_manager_->AddStream(std::move(read_stream)); + + // Now that we consumed pending_stream_, queue the next one immediately. + self->AssurePendingStreamQueued(lk); + + lk.unlock(); + self->OnRead(new_it, std::move(stream_result->first_response)); + }); } std::unique_ptr @@ -91,13 +140,25 @@ ObjectDescriptorImpl::Read(ReadParams p) { auto range = std::make_shared(p.start, p.length, hash_function); std::unique_lock lk(mu_); + if (stream_manager_->Empty()) { + lk.unlock(); + range->OnFinish(Status(StatusCode::kFailedPrecondition, + "Cannot read object, all streams failed")); + if (!internal::TracingEnabled(options_)) { + return std::unique_ptr( + std::make_unique(std::move(range))); + } + return MakeTracingObjectDescriptorReader(std::move(range)); + } + + auto it = stream_manager_->GetLeastBusyStream(); auto const id = ++read_id_generator_; - streams_.back().active_ranges.emplace(id, range); - auto& read_range = *next_request_.add_read_ranges(); + it->active_ranges.emplace(id, range); + auto& read_range = *it->stream->next_request.add_read_ranges(); read_range.set_read_id(id); read_range.set_read_offset(p.start); read_range.set_read_length(p.length); - Flush(std::move(lk)); + Flush(std::move(lk), it); if (!internal::TracingEnabled(options_)) { return std::unique_ptr( @@ -107,44 +168,56 @@ ObjectDescriptorImpl::Read(ReadParams p) { return MakeTracingObjectDescriptorReader(std::move(range)); } -void ObjectDescriptorImpl::Flush(std::unique_lock lk) { - if (streams_.back().write_pending || next_request_.read_ranges().empty()) { +void ObjectDescriptorImpl::Flush(std::unique_lock lk, + StreamIterator it) { + if (it->stream->write_pending || + it->stream->next_request.read_ranges().empty()) { return; } - streams_.back().write_pending = true; + it->stream->write_pending = true; google::storage::v2::BidiReadObjectRequest request; - request.Swap(&next_request_); + request.Swap(&it->stream->next_request); // Assign CurrentStream to a temporary variable to prevent // lifetime extension which can cause the lock to be held until the // end of the block. - auto current_stream = CurrentStream(std::move(lk)); - current_stream->Write(std::move(request)).then([w = WeakFromThis()](auto f) { - if (auto self = w.lock()) self->OnWrite(f.get()); - }); + auto current_stream = it->stream->stream; + lk.unlock(); + current_stream->Write(std::move(request)) + .then([w = WeakFromThis(), it](auto f) { + if (auto self = w.lock()) self->OnWrite(it, f.get()); + }); } -void ObjectDescriptorImpl::OnWrite(bool ok) { +void ObjectDescriptorImpl::OnWrite(StreamIterator it, bool ok) { std::unique_lock lk(mu_); - if (!ok) return DoFinish(std::move(lk)); - streams_.back().write_pending = false; - Flush(std::move(lk)); + if (!ok) return DoFinish(std::move(lk), it); + it->stream->write_pending = false; + Flush(std::move(lk), it); } -void ObjectDescriptorImpl::DoRead(std::unique_lock lk) { +void ObjectDescriptorImpl::DoRead(std::unique_lock lk, + StreamIterator it) { + if (it->stream->read_pending) return; + it->stream->read_pending = true; + // Assign CurrentStream to a temporary variable to prevent // lifetime extension which can cause the lock to be held until the // end of the block. - auto current_stream = CurrentStream(std::move(lk)); - current_stream->Read().then([w = WeakFromThis()](auto f) { - if (auto self = w.lock()) self->OnRead(f.get()); + auto current_stream = it->stream->stream; + lk.unlock(); + current_stream->Read().then([w = WeakFromThis(), it](auto f) { + if (auto self = w.lock()) self->OnRead(it, f.get()); }); } void ObjectDescriptorImpl::OnRead( + StreamIterator it, absl::optional response) { std::unique_lock lk(mu_); - if (!response) return DoFinish(std::move(lk)); + it->stream->read_pending = false; + + if (!response) return DoFinish(std::move(lk), it); if (response->has_metadata()) { metadata_ = std::move(*response->mutable_metadata()); } @@ -152,7 +225,7 @@ void ObjectDescriptorImpl::OnRead( *read_object_spec_.mutable_read_handle() = std::move(*response->mutable_read_handle()); } - auto copy = CopyActiveRanges(lk); + auto copy = it->active_ranges; // Release the lock while notifying the ranges. The notifications may trigger // application code, and that code may callback on this class. lk.unlock(); @@ -165,91 +238,97 @@ void ObjectDescriptorImpl::OnRead( l->second->OnRead(std::move(range_data)); } lk.lock(); - CleanupDoneRanges(lk); - DoRead(std::move(lk)); -} - -void ObjectDescriptorImpl::CleanupDoneRanges( - std::unique_lock const&) { - if (streams_.empty()) return; - auto& active_ranges = streams_.back().active_ranges; - for (auto i = active_ranges.begin(); i != active_ranges.end();) { - if (i->second->IsDone()) { - i = active_ranges.erase(i); - } else { - ++i; - } - } + stream_manager_->CleanupDoneRanges(it); + DoRead(std::move(lk), it); } -void ObjectDescriptorImpl::DoFinish(std::unique_lock lk) { +void ObjectDescriptorImpl::DoFinish(std::unique_lock lk, + StreamIterator it) { + it->stream->read_pending = false; // Assign CurrentStream to a temporary variable to prevent // lifetime extension which can cause the lock to be held until the // end of the block. - auto current_stream = CurrentStream(std::move(lk)); + auto current_stream = it->stream->stream; + lk.unlock(); auto pending = current_stream->Finish(); if (!pending.valid()) return; - pending.then([w = WeakFromThis()](auto f) { - if (auto self = w.lock()) self->OnFinish(f.get()); + pending.then([w = WeakFromThis(), it](auto f) { + if (auto self = w.lock()) self->OnFinish(it, f.get()); }); } -void ObjectDescriptorImpl::OnFinish(Status const& status) { +void ObjectDescriptorImpl::OnFinish(StreamIterator it, Status const& status) { auto proto_status = ExtractGrpcStatus(status); - if (IsResumable(status, proto_status)) return Resume(proto_status); + if (IsResumable(it, status, proto_status)) return Resume(it, proto_status); std::unique_lock lk(mu_); - auto copy = CopyActiveRanges(std::move(lk)); - for (auto const& kv : copy) { - kv.second->OnFinish(status); - } + stream_manager_->RemoveStreamAndNotifyRanges(it, status); + // Since a stream died, we might want to ensure a replacement is queued. + AssurePendingStreamQueued(lk); } -void ObjectDescriptorImpl::Resume(google::rpc::Status const& proto_status) { +void ObjectDescriptorImpl::Resume(StreamIterator it, + google::rpc::Status const& proto_status) { std::unique_lock lk(mu_); // This call needs to happen inside the lock, as it may modify // `read_object_spec_`. ApplyRedirectErrors(read_object_spec_, proto_status); auto request = google::storage::v2::BidiReadObjectRequest{}; *request.mutable_read_object_spec() = read_object_spec_; - for (auto const& kv : streams_.back().active_ranges) { + for (auto const& kv : it->active_ranges) { auto range = kv.second->RangeForResume(kv.first); if (!range) continue; *request.add_read_ranges() = *std::move(range); } - streams_.back().write_pending = true; lk.unlock(); - make_stream_(std::move(request)).then([w = WeakFromThis()](auto f) { - if (auto self = w.lock()) self->OnResume(f.get()); + make_stream_(std::move(request)).then([w = WeakFromThis(), it](auto f) { + if (auto self = w.lock()) self->OnResume(it, f.get()); }); } -void ObjectDescriptorImpl::OnResume(StatusOr result) { - if (!result) return OnFinish(std::move(result).status()); +void ObjectDescriptorImpl::OnResume(StreamIterator it, + StatusOr result) { + if (!result) return OnFinish(it, std::move(result).status()); std::unique_lock lk(mu_); - streams_.push_back( - Stream{std::move(result->stream), {}, resume_policy_prototype_->clone()}); + if (cancelled_) return; + + it->stream = std::make_shared(std::move(result->stream), + resume_policy_prototype_->clone()); + it->stream->write_pending = false; + it->stream->read_pending = false; + // TODO(#15105) - this should be done without release the lock. - Flush(std::move(lk)); - OnRead(std::move(result->first_response)); + Flush(std::move(lk), it); + OnRead(it, std::move(result->first_response)); } bool ObjectDescriptorImpl::IsResumable( - Status const& status, google::rpc::Status const& proto_status) { + StreamIterator it, Status const& status, + google::rpc::Status const& proto_status) { + std::unique_lock lk(mu_); for (auto const& any : proto_status.details()) { auto error = google::storage::v2::BidiReadObjectError{}; if (!any.UnpackTo(&error)) continue; - for (auto const& range : CopyActiveRanges()) { - for (auto const& range_error : error.read_range_errors()) { - if (range.first != range_error.read_id()) continue; - range.second->OnFinish(MakeStatusFromRpcError(range_error.status())); + + std::vector> notify; + for (auto const& re : error.read_range_errors()) { + if (it->active_ranges.count(re.read_id()) != 0) { + notify.emplace_back(re.read_id(), MakeStatusFromRpcError(re.status())); } } - CleanupDoneRanges(std::unique_lock(mu_)); + if (notify.empty()) continue; + + auto copy = it->active_ranges; + lk.unlock(); + for (auto const& p : notify) { + auto l = copy.find(p.first); + if (l != copy.end()) l->second->OnFinish(p.second); + } + lk.lock(); + stream_manager_->CleanupDoneRanges(it); return true; } - std::unique_lock lk(mu_); - return streams_.back().resume_policy->OnFinish(status) == + return it->stream->resume_policy->OnFinish(status) == storage_experimental::ResumePolicy::kContinue; } diff --git a/google/cloud/storage/internal/async/object_descriptor_impl.h b/google/cloud/storage/internal/async/object_descriptor_impl.h index f4c467de5c42e..ff91e4c90f048 100644 --- a/google/cloud/storage/internal/async/object_descriptor_impl.h +++ b/google/cloud/storage/internal/async/object_descriptor_impl.h @@ -17,6 +17,7 @@ #include "google/cloud/storage/async/object_descriptor_connection.h" #include "google/cloud/storage/async/resume_policy.h" +#include "google/cloud/storage/internal/async/multi_stream_manager.h" #include "google/cloud/storage/internal/async/object_descriptor_reader.h" #include "google/cloud/storage/internal/async/open_stream.h" #include "google/cloud/storage/internal/async/read_range.h" @@ -35,17 +36,25 @@ namespace cloud { namespace storage_internal { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN +struct ReadStream : public storage_internal::StreamBase { + ReadStream(std::shared_ptr stream, + std::unique_ptr resume_policy) + : stream(std::move(stream)), resume_policy(std::move(resume_policy)) {} + + void Cancel() override { + if (stream) stream->Cancel(); + } + + std::shared_ptr stream; + std::unique_ptr resume_policy; + google::storage::v2::BidiReadObjectRequest next_request; + bool write_pending = false; + bool read_pending = false; +}; + class ObjectDescriptorImpl : public storage_experimental::ObjectDescriptorConnection, public std::enable_shared_from_this { - private: - struct Stream { - std::shared_ptr stream; - std::unordered_map> active_ranges; - std::unique_ptr resume_policy; - bool write_pending = false; - }; - public: ObjectDescriptorImpl( std::unique_ptr resume_policy, @@ -73,35 +82,29 @@ class ObjectDescriptorImpl void MakeSubsequentStream() override; private: + using StreamManager = MultiStreamManager; + using StreamIterator = + MultiStreamManager::StreamIterator; + std::weak_ptr WeakFromThis() { return shared_from_this(); } - // This may seem expensive, but it is less bug-prone than iterating over - // the map with the lock held. - auto CopyActiveRanges(std::unique_lock const&) const { - return streams_.back().active_ranges; - } - - auto CopyActiveRanges() const { - return CopyActiveRanges(std::unique_lock(mu_)); - } - - auto CurrentStream(std::unique_lock) const { - return streams_.back().stream; - } + // Logic to ensure a background stream is always connecting which must be + // invoked while holding `mu_`. + void AssurePendingStreamQueued(std::unique_lock const&); - void Flush(std::unique_lock lk); - void OnWrite(bool ok); - void DoRead(std::unique_lock); + void Flush(std::unique_lock lk, StreamIterator it); + void OnWrite(StreamIterator it, bool ok); + void DoRead(std::unique_lock lk, StreamIterator it); void OnRead( + StreamIterator it, absl::optional response); - void CleanupDoneRanges(std::unique_lock const&); - void DoFinish(std::unique_lock); - void OnFinish(Status const& status); - void Resume(google::rpc::Status const& proto_status); - void OnResume(StatusOr result); - bool IsResumable(Status const& status, + void DoFinish(std::unique_lock lk, StreamIterator it); + void OnFinish(StreamIterator it, Status const& status); + void Resume(StreamIterator it, google::rpc::Status const& proto_status); + void OnResume(StreamIterator it, StatusOr result); + bool IsResumable(StreamIterator it, Status const& status, google::rpc::Status const& proto_status); std::unique_ptr resume_policy_prototype_; @@ -111,10 +114,14 @@ class ObjectDescriptorImpl google::storage::v2::BidiReadObjectSpec read_object_spec_; absl::optional metadata_; std::int64_t read_id_generator_ = 0; - google::storage::v2::BidiReadObjectRequest next_request_; Options options_; - std::vector streams_; + std::unique_ptr stream_manager_; + // The future for the proactive background stream. + google::cloud::future< + google::cloud::StatusOr> + pending_stream_; + bool cancelled_ = false; }; GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END diff --git a/google/cloud/storage/internal/async/object_descriptor_impl_test.cc b/google/cloud/storage/internal/async/object_descriptor_impl_test.cc index 595dd34231b6d..49eca7630a9ae 100644 --- a/google/cloud/storage/internal/async/object_descriptor_impl_test.cc +++ b/google/cloud/storage/internal/async/object_descriptor_impl_test.cc @@ -27,6 +27,7 @@ #include #include #include +#include namespace google { namespace cloud { @@ -41,10 +42,13 @@ using ::google::cloud::testing_util::IsOk; using ::google::cloud::testing_util::IsProtoEqual; using ::google::cloud::testing_util::StatusIs; using ::google::protobuf::TextFormat; +using ::testing::_; +using ::testing::AtMost; using ::testing::ElementsAre; using ::testing::NotNull; using ::testing::Optional; using ::testing::ResultOf; +using ::testing::Return; using ::testing::VariantWith; using Request = google::storage::v2::BidiReadObjectRequest; @@ -88,12 +92,12 @@ TEST(ObjectDescriptorImpl, LifecycleNoRead) { return sequencer.PushBack("Finish").then( [](auto) { return PermanentError(); }); }); - EXPECT_CALL(*stream, Cancel).WillOnce([&sequencer]() { - sequencer.PushBack("Cancel"); - }); + EXPECT_CALL(*stream, Cancel).Times(AtMost(1)); MockFactory factory; - EXPECT_CALL(factory, Call).Times(0); + EXPECT_CALL(factory, Call).WillOnce([](Request const&) { + return make_ready_future(StatusOr(PermanentError())); + }); auto tested = std::make_shared( NoResume(), factory.AsStdFunction(), google::storage::v2::BidiReadObjectSpec{}, @@ -117,9 +121,56 @@ TEST(ObjectDescriptorImpl, LifecycleNoRead) { next.first.set_value(true); tested.reset(); - next = sequencer.PopFrontWithName(); - EXPECT_EQ(next.second, "Cancel"); - next.first.set_value(true); +} + +/// @test Verify that Cancel() is called if OnFinish() is delayed. +TEST(ObjectDescriptorImpl, LifecycleCancelRacesWithFinish) { + AsyncSequencer sequencer; + auto stream = std::make_unique(); + EXPECT_CALL(*stream, Read).WillOnce([&sequencer]() { + return sequencer.PushBack("Read[1]").then( + [](auto) { return absl::optional{}; }); + }); + EXPECT_CALL(*stream, Finish).WillOnce([&sequencer]() { + return sequencer.PushBack("Finish").then( + [](auto) { return PermanentError(); }); + }); + // This time we expect Cancel() because we will delay OnFinish(). + EXPECT_CALL(*stream, Cancel).Times(1); + + MockFactory factory; + EXPECT_CALL(factory, Call).WillOnce([](Request const&) { + return make_ready_future(StatusOr(PermanentError())); + }); + auto tested = std::make_shared( + NoResume(), factory.AsStdFunction(), + google::storage::v2::BidiReadObjectSpec{}, + std::make_shared(std::move(stream))); + auto response = Response{}; + EXPECT_TRUE( + TextFormat::ParseFromString(kMetadataText, response.mutable_metadata())); + tested->Start(std::move(response)); + EXPECT_TRUE(tested->metadata().has_value()); + + auto expected_metadata = google::storage::v2::Object{}; + EXPECT_TRUE(TextFormat::ParseFromString(kMetadataText, &expected_metadata)); + EXPECT_THAT(tested->metadata(), Optional(IsProtoEqual(expected_metadata))); + + auto read1 = sequencer.PopFrontWithName(); + EXPECT_EQ(read1.second, "Read[1]"); + read1.first.set_value(true); + + // This pops the future that OnFinish() waits for. + auto finish = sequencer.PopFrontWithName(); + EXPECT_EQ(finish.second, "Finish"); + + // Reset the descriptor *before* OnFinish() gets to run. This invokes the + // destructor, which calls Cancel() on any streams that have not yet been + // removed by OnFinish(). + tested.reset(); + + // Now allow OnFinish() to run. + finish.first.set_value(true); } /// @test Read a single stream and then close. @@ -174,10 +225,12 @@ TEST(ObjectDescriptorImpl, ReadSingleRange) { return sequencer.PushBack("Finish").then( [](auto) { return PermanentError(); }); }); - EXPECT_CALL(*stream, Cancel).Times(1); + EXPECT_CALL(*stream, Cancel).Times(AtMost(1)); MockFactory factory; - EXPECT_CALL(factory, Call).Times(0); + EXPECT_CALL(factory, Call).WillOnce([](Request const&) { + return make_ready_future(StatusOr(PermanentError())); + }); auto tested = std::make_shared( NoResume(), factory.AsStdFunction(), google::storage::v2::BidiReadObjectSpec{}, @@ -292,10 +345,12 @@ TEST(ObjectDescriptorImpl, ReadMultipleRanges) { return sequencer.PushBack("Finish").then( [](auto) { return PermanentError(); }); }); - EXPECT_CALL(*stream, Cancel).Times(1); + EXPECT_CALL(*stream, Cancel).Times(AtMost(1)); MockFactory factory; - EXPECT_CALL(factory, Call).Times(0); + EXPECT_CALL(factory, Call).WillOnce([](Request const&) { + return make_ready_future(StatusOr(PermanentError())); + }); auto tested = std::make_shared( NoResume(), factory.AsStdFunction(), google::storage::v2::BidiReadObjectSpec{}, @@ -440,10 +495,12 @@ TEST(ObjectDescriptorImpl, ReadSingleRangeManyMessages) { return sequencer.PushBack("Finish").then( [](auto) { return PermanentError(); }); }); - EXPECT_CALL(*stream, Cancel).Times(1); + EXPECT_CALL(*stream, Cancel).Times(AtMost(1)); MockFactory factory; - EXPECT_CALL(factory, Call).Times(0); + EXPECT_CALL(factory, Call).WillOnce([](Request const&) { + return make_ready_future(StatusOr(PermanentError())); + }); auto tested = std::make_shared( NoResume(), factory.AsStdFunction(), google::storage::v2::BidiReadObjectSpec{}, @@ -554,10 +611,12 @@ TEST(ObjectDescriptorImpl, AllRangesFailOnUnrecoverableError) { return sequencer.PushBack("Finish").then( [](auto) { return PermanentError(); }); }); - EXPECT_CALL(*stream, Cancel).Times(1); + EXPECT_CALL(*stream, Cancel).Times(AtMost(1)); MockFactory factory; - EXPECT_CALL(factory, Call).Times(0); + EXPECT_CALL(factory, Call).WillOnce([](Request const&) { + return make_ready_future(StatusOr(PermanentError())); + }); auto tested = std::make_shared( NoResume(), factory.AsStdFunction(), google::storage::v2::BidiReadObjectSpec{}, @@ -637,7 +696,7 @@ auto InitialStream(AsyncSequencer& sequencer) { )pb"; auto stream = std::make_unique(); - EXPECT_CALL(*stream, Cancel).Times(1); // Always called by OpenStream + EXPECT_CALL(*stream, Cancel).Times(AtMost(1)); // Always called by OpenStream EXPECT_CALL(*stream, Write) .WillOnce([=, &sequencer](Request const& request, grpc::WriteOptions) { auto expected = Request{}; @@ -716,16 +775,23 @@ TEST(ObjectDescriptorImpl, ResumeRangesOnRecoverableError) { )pb"; AsyncSequencer sequencer; + Request expected_resume_request; + ASSERT_TRUE( + TextFormat::ParseFromString(kResumeRequest, &expected_resume_request)); MockFactory factory; - EXPECT_CALL(factory, Call).WillOnce([=, &sequencer](Request const& request) { - auto expected = Request{}; - EXPECT_TRUE(TextFormat::ParseFromString(kResumeRequest, &expected)); - EXPECT_THAT(request, IsProtoEqualModuloRepeatedFieldOrdering(expected)); - // Resume with an unrecoverable failure to simplify the test. - return sequencer.PushBack("Factory").then( - [&](auto) { return StatusOr(PermanentError()); }); - }); + EXPECT_CALL(factory, Call) + .WillOnce([](Request const& request) { + EXPECT_TRUE(request.read_ranges().empty()); + return make_ready_future(StatusOr(TransientError())); + }) + .WillOnce([&](Request const& request) { + EXPECT_THAT(request, IsProtoEqualModuloRepeatedFieldOrdering( + expected_resume_request)); + // Resume with an unrecoverable failure to simplify the test. + return sequencer.PushBack("Factory").then( + [](auto) { return StatusOr(PermanentError()); }); + }); auto spec = google::storage::v2::BidiReadObjectSpec{}; ASSERT_TRUE(TextFormat::ParseFromString(kReadSpecText, &spec)); @@ -852,7 +918,8 @@ TEST(ObjectDescriptorImpl, PendingFinish) { )pb"; auto stream = std::make_unique(); - EXPECT_CALL(*stream, Cancel).Times(1); // Always called by OpenStream + EXPECT_CALL(*stream, Cancel) + .Times(AtMost(1)); // Always called by OpenStream EXPECT_CALL(*stream, Write) .WillOnce([=, &sequencer](Request const& request, grpc::WriteOptions) { auto expected = Request{}; @@ -886,7 +953,9 @@ TEST(ObjectDescriptorImpl, PendingFinish) { )pb"; MockFactory factory; - EXPECT_CALL(factory, Call).Times(0); + EXPECT_CALL(factory, Call).WillOnce([](Request const&) { + return make_ready_future(StatusOr(PermanentError())); + }); auto spec = google::storage::v2::BidiReadObjectSpec{}; ASSERT_TRUE(TextFormat::ParseFromString(kReadSpecText, &spec)); @@ -939,7 +1008,7 @@ TEST(ObjectDescriptorImpl, ResumeUsesRouting) { )pb"; auto stream = std::make_unique(); - EXPECT_CALL(*stream, Cancel).Times(1); // Always called by OpenStream + EXPECT_CALL(*stream, Cancel).Times(AtMost(1)); EXPECT_CALL(*stream, Write) .WillOnce([=, &sequencer](Request const& request, grpc::WriteOptions) { auto expected = Request{}; @@ -986,15 +1055,23 @@ TEST(ObjectDescriptorImpl, ResumeUsesRouting) { read_ranges { read_id: 1 read_offset: 20000 read_length: 100 } )pb"; + Request expected_resume_request; + ASSERT_TRUE( + TextFormat::ParseFromString(kResumeRequest, &expected_resume_request)); + MockFactory factory; - EXPECT_CALL(factory, Call).WillOnce([=, &sequencer](Request const& request) { - auto expected = Request{}; - EXPECT_TRUE(TextFormat::ParseFromString(kResumeRequest, &expected)); - EXPECT_THAT(request, IsProtoEqualModuloRepeatedFieldOrdering(expected)); - // Resume with an unrecoverable failure to simplify the test. - return sequencer.PushBack("Factory").then( - [&](auto) { return StatusOr(PermanentError()); }); - }); + EXPECT_CALL(factory, Call) + .WillOnce([](Request const& request) { + EXPECT_TRUE(request.read_ranges().empty()); + return make_ready_future(StatusOr(TransientError())); + }) + .WillOnce([&](Request const& request) { + EXPECT_THAT(request, IsProtoEqualModuloRepeatedFieldOrdering( + expected_resume_request)); + // Resume with an unrecoverable failure to simplify the test. + return sequencer.PushBack("Factory").then( + [](auto) { return StatusOr(PermanentError()); }); + }); auto spec = google::storage::v2::BidiReadObjectSpec{}; ASSERT_TRUE(TextFormat::ParseFromString(kReadSpecText, &spec)); @@ -1117,17 +1194,24 @@ TEST(ObjectDescriptorImpl, RecoverFromPartialFailure) { return PartialFailure(2); }); }); - EXPECT_CALL(*stream, Cancel).Times(1); + EXPECT_CALL(*stream, Cancel).Times(AtMost(1)); + Request expected_resume_request; + ASSERT_TRUE( + TextFormat::ParseFromString(kResumeRequest, &expected_resume_request)); MockFactory factory; - EXPECT_CALL(factory, Call).WillOnce([=, &sequencer](Request const& request) { - auto expected = Request{}; - EXPECT_TRUE(TextFormat::ParseFromString(kResumeRequest, &expected)); - EXPECT_THAT(request, IsProtoEqualModuloRepeatedFieldOrdering(expected)); - // Resume with an unrecoverable failure to simplify the test. - return sequencer.PushBack("Factory").then( - [&](auto) { return StatusOr(PermanentError()); }); - }); + EXPECT_CALL(factory, Call) + .WillOnce([](Request const& request) { + EXPECT_TRUE(request.read_ranges().empty()); + return make_ready_future(StatusOr(TransientError())); + }) + .WillOnce([&](Request const& request) { + EXPECT_THAT(request, IsProtoEqualModuloRepeatedFieldOrdering( + expected_resume_request)); + // Resume with an unrecoverable failure to simplify the test. + return sequencer.PushBack("Factory").then( + [](auto) { return StatusOr(PermanentError()); }); + }); auto spec = google::storage::v2::BidiReadObjectSpec{}; EXPECT_TRUE(TextFormat::ParseFromString(kReadSpecText, &spec)); @@ -1190,188 +1274,503 @@ TEST(ObjectDescriptorImpl, RecoverFromPartialFailure) { EXPECT_THAT(s3r1.get(), VariantWith(PermanentError())); } -/// @test Verify that we can create a subsequent stream and read from it. -TEST(ObjectDescriptorImpl, ReadWithSubsequentStream) { - // Setup - auto constexpr kResponse0 = R"pb( - metadata { - bucket: "projects/_/buckets/test-bucket" - name: "test-object" - generation: 42 - } - read_handle { handle: "handle-12345" } - )pb"; - auto constexpr kRequest1 = R"pb( - read_ranges { read_id: 1 read_offset: 100 read_length: 100 } - )pb"; - auto constexpr kResponse1 = R"pb( - object_data_ranges { - range_end: true - read_range { read_id: 1 read_offset: 100 } - checksummed_data { content: "payload-for-stream-1" } - } - )pb"; - auto constexpr kRequest2 = R"pb( - read_ranges { read_id: 2 read_offset: 200 read_length: 200 } - )pb"; - auto constexpr kResponse2 = R"pb( - object_data_ranges { - range_end: true - read_range { read_id: 2 read_offset: 200 } - checksummed_data { content: "payload-for-stream-2" } - } - )pb"; +/// @test Verify that a background stream is created proactively. +TEST(ObjectDescriptorImpl, ProactiveStreamCreation) { + AsyncSequencer sequencer; + auto stream = std::make_unique(); + EXPECT_CALL(*stream, Read).WillOnce([&] { + return sequencer.PushBack("Read").then( + [](auto) { return absl::optional(); }); + }); + EXPECT_CALL(*stream, Finish).WillOnce(Return(make_ready_future(Status{}))); + EXPECT_CALL(*stream, Cancel).Times(AtMost(1)); // Always called by OpenStream + + MockFactory factory; + // This is the proactive stream creation. The implementation is designed to + // always keep a pending stream in flight, so it may call the factory more + // than once if a pending stream is consumed or fails. We use WillRepeatedly + // to make the test robust to this behavior. + EXPECT_CALL(factory, Call).WillRepeatedly([&](Request const& request) { + EXPECT_TRUE(request.read_ranges().empty()); + return sequencer.PushBack("Factory").then( + [](auto) { return StatusOr(PermanentError()); }); + }); + + auto tested = std::make_shared( + NoResume(), factory.AsStdFunction(), + google::storage::v2::BidiReadObjectSpec{}, + std::make_shared(std::move(stream))); + + tested->Start(Response{}); + + // In the implementation, the Read() is started before the factory call. + auto read_called = sequencer.PopFrontWithName(); + EXPECT_EQ(read_called.second, "Read"); + + auto factory_called = sequencer.PopFrontWithName(); + EXPECT_EQ(factory_called.second, "Factory"); + + // Allow the events to complete. + read_called.first.set_value(true); + factory_called.first.set_value(true); +} +/// @test Verify a new stream is used if all existing streams are busy. +TEST(ObjectDescriptorImpl, MakeSubsequentStreamCreatesNewWhenAllBusy) { AsyncSequencer sequencer; - // First stream setup + // Setup for the first stream, which will remain busy. auto stream1 = std::make_unique(); - EXPECT_CALL(*stream1, Write) - .WillOnce([&](Request const& request, grpc::WriteOptions) { - auto expected = Request{}; - EXPECT_TRUE(TextFormat::ParseFromString(kRequest1, &expected)); - EXPECT_THAT(request, IsProtoEqual(expected)); - return sequencer.PushBack("Write[1]").then([](auto f) { - return f.get(); - }); + EXPECT_CALL(*stream1, Write(_, _)) + .Times(::testing::AtMost(1)) + .WillRepeatedly( + [](Request const&, auto) { return make_ready_future(true); }); + // Keep stream1 busy but return ready futures. + EXPECT_CALL(*stream1, Read).WillRepeatedly([] { + return make_ready_future(absl::optional{}); + }); + EXPECT_CALL(*stream1, Finish).WillOnce([] { + return make_ready_future(Status{}); + }); + EXPECT_CALL(*stream1, Cancel).Times(AtMost(1)); + + // Setup for the second stream, which will be created proactively. + auto stream2 = std::make_unique(); + EXPECT_CALL(*stream2, Write(_, _)) + .Times(::testing::AtMost(1)) + .WillRepeatedly([](...) { return make_ready_future(true); }); + EXPECT_CALL(*stream2, Read).WillRepeatedly([] { + return make_ready_future(absl::optional{}); + }); + EXPECT_CALL(*stream2, Finish).WillRepeatedly([] { + return make_ready_future(Status{}); + }); + EXPECT_CALL(*stream2, Cancel).Times(AtMost(1)); + + // First call is proactive for stream2. It may be called more than once. + MockFactory factory; + EXPECT_CALL(factory, Call) + .WillOnce([&](Request const&) { + return make_ready_future(StatusOr(OpenStreamResult{ + std::make_shared(std::move(stream2)), Response{}})); + }) + .WillRepeatedly([](Request const&) { + return make_ready_future(StatusOr(PermanentError())); }); + + auto tested = std::make_shared( + NoResume(), factory.AsStdFunction(), + google::storage::v2::BidiReadObjectSpec{}, + std::make_shared(std::move(stream1))); + tested->Start(Response{}); + + // Start a read on stream1 to make it busy. + auto reader1 = tested->Read({0, 100}); + + // Call MakeSubsequentStream. Since stream1 is busy, this should activate the + // pending stream (stream2). + tested->MakeSubsequentStream(); + + // A new read should now be routed to stream2. + auto reader2 = tested->Read({100, 200}); + + // stream2 returns ready futures; no sequencer pop needed here. + tested->Cancel(); + tested.reset(); +} + +/// @test Verify reusing an idle stream that is already last is a no-op. +TEST(ObjectDescriptorImpl, MakeSubsequentStreamReusesIdleStreamAlreadyLast) { + AsyncSequencer sequencer; + + // Setup for the first and only stream. + auto stream1 = std::make_unique(); + EXPECT_CALL(*stream1, Write(_, _)) + .Times(AtMost(1)) + .WillRepeatedly( + [](auto const&, auto) { return make_ready_future(true); }); + EXPECT_CALL(*stream1, Read) - .WillOnce([&]() { - return sequencer.PushBack("Read[1]").then([&](auto) { - auto response = Response{}; - EXPECT_TRUE(TextFormat::ParseFromString(kResponse1, &response)); - return absl::make_optional(response); + .WillOnce([&] { // From Start() + return sequencer.PushBack("Read[1.1]").then([](auto) { + auto resp = Response{}; + auto* r = resp.add_object_data_ranges(); + r->set_range_end(true); + r->mutable_read_range()->set_read_id(0); + r->mutable_read_range()->set_read_offset(0); + return absl::make_optional(std::move(resp)); }); }) - .WillOnce([&]() { - return sequencer.PushBack("Read[1.eos]").then([&](auto) { - return absl::optional{}; + .WillOnce([&] { // From OnRead() loop + return sequencer.PushBack("Read[1.2]").then([](auto) { + return absl::make_optional(Response{}); }); + }) + .WillOnce([] { // Complete read_id=1 + auto resp = Response{}; + auto* r = resp.add_object_data_ranges(); + r->set_range_end(true); + r->mutable_read_range()->set_read_id(1); + r->mutable_read_range()->set_read_offset(0); + return make_ready_future(absl::make_optional(std::move(resp))); + }) + .WillRepeatedly( + [] { return make_ready_future(absl::optional{}); }); + + // Finish() will be called by the OpenStream destructor. + EXPECT_CALL(*stream1, Finish).WillOnce(Return(make_ready_future(Status{}))); + EXPECT_CALL(*stream1, Cancel) + .Times(AtMost(1)); // Always called by OpenStream + + MockFactory factory; + EXPECT_CALL(factory, Call) + .WillOnce([](Request const&) { + return make_ready_future(StatusOr(PermanentError())); + }) + .WillRepeatedly([](Request const&) { + return make_ready_future(StatusOr(PermanentError())); }); - EXPECT_CALL(*stream1, Finish).WillOnce([&]() { - return sequencer.PushBack("Finish[1]").then([](auto) { - return PermanentError(); - }); + + auto tested = std::make_shared( + NoResume(), factory.AsStdFunction(), + google::storage::v2::BidiReadObjectSpec{}, + std::make_shared(std::move(stream1))); + auto start_response = Response{}; + EXPECT_TRUE(TextFormat::ParseFromString( + R"pb(object_data_ranges { read_range { read_id: 0 read_offset: 0 } })pb", + &start_response)); + tested->Start(std::move(start_response)); + + auto read1 = sequencer.PopFrontWithName(); + EXPECT_EQ(read1.second, "Read[1.1]"); + read1.first.set_value(true); + + auto read2 = sequencer.PopFrontWithName(); + EXPECT_EQ(read2.second, "Read[1.2]"); + read2.first.set_value(true); + + // Call MakeSubsequentStream. It should find stream1 and return. + tested->MakeSubsequentStream(); + + // Ensure background activity stops cleanly. + tested->Cancel(); + tested.reset(); +} + +/// @test Verify an idle stream at the front is moved to the back and reused. +TEST(ObjectDescriptorImpl, MakeSubsequentStreamReusesAndMovesIdleStream) { + AsyncSequencer sequencer; + + // First stream setup + auto stream1 = std::make_unique(); + EXPECT_CALL(*stream1, Write(_, _)).WillRepeatedly([](auto const&, auto) { + return make_ready_future(true); }); - EXPECT_CALL(*stream1, Cancel).Times(1); + + EXPECT_CALL(*stream1, Read) + .WillOnce([&] { // From Start() + return sequencer.PushBack("Read[1.1]").then([](auto) { + return absl::make_optional(Response{}); + }); + }) + .WillOnce([] { // From OnRead() loop + auto response = Response{}; + auto* r = response.add_object_data_ranges(); + r->set_range_end(true); + r->mutable_read_range()->set_read_id(1); + r->mutable_read_range()->set_read_offset(0); + return make_ready_future(absl::make_optional(std::move(response))); + }) + .WillOnce([] { // From OnRead() loop after reader1 done + return make_ready_future(absl::make_optional(Response{})); + }) + .WillRepeatedly( + [] { return make_ready_future(absl::optional{}); }); + EXPECT_CALL(*stream1, Finish).WillOnce(Return(make_ready_future(Status{}))); + EXPECT_CALL(*stream1, Cancel).Times(AtMost(1)); // Second stream setup auto stream2 = std::make_unique(); - EXPECT_CALL(*stream2, Write) - .WillOnce([&](Request const& request, grpc::WriteOptions) { - auto expected = Request{}; - EXPECT_TRUE(TextFormat::ParseFromString(kRequest2, &expected)); - EXPECT_THAT(request, IsProtoEqual(expected)); - return sequencer.PushBack("Write[2]").then([](auto f) { + EXPECT_CALL(*stream2, Write(_, _)) + .WillOnce([&](Request const&, auto) { + return sequencer.PushBack("Write[2.1]").then([](auto f) { return f.get(); }); - }); + }) + .WillRepeatedly( + [](auto const&, auto) { return make_ready_future(true); }); EXPECT_CALL(*stream2, Read) - .WillOnce([&]() { - return sequencer.PushBack("Read[2]").then([&](auto) { - auto response = Response{}; - EXPECT_TRUE(TextFormat::ParseFromString(kResponse2, &response)); - return absl::make_optional(response); + .WillOnce([&] { + return sequencer.PushBack("Read[2.1]").then([](auto) { + auto resp = Response{}; + auto* r = resp.add_object_data_ranges(); + r->set_range_end(true); + r->mutable_read_range()->set_read_id(2); + r->mutable_read_range()->set_read_offset(100); + return absl::make_optional(std::move(resp)); }); }) - .WillOnce([&]() { - return sequencer.PushBack("Read[2.eos]").then([](auto) { - return absl::optional{}; - }); - }); - EXPECT_CALL(*stream2, Finish).WillOnce([&]() { - return sequencer.PushBack("Finish[2]").then([](auto) { return Status{}; }); + .WillRepeatedly( + [] { return make_ready_future(absl::optional{}); }); + EXPECT_CALL(*stream2, Finish).WillOnce(Return(make_ready_future(Status{}))); + EXPECT_CALL(*stream2, Cancel).Times(AtMost(1)); + + // Third stream setup + auto stream3 = std::make_unique(); + // stream3 sits in pending_stream_ and may become active; provide ready + // defaults. + EXPECT_CALL(*stream3, Write(_, _)).WillRepeatedly([](auto const&, auto) { + return make_ready_future(true); + }); + EXPECT_CALL(*stream3, Read).WillRepeatedly([] { + return make_ready_future(absl::optional{}); }); - EXPECT_CALL(*stream2, Cancel).Times(1); + // stream3 sits in pending_stream_ and is destroyed when the test ends. + // It is never "started", so Finish() is never called. + EXPECT_CALL(*stream3, Finish).Times(AtMost(1)).WillRepeatedly([] { + return make_ready_future(Status{}); + }); + EXPECT_CALL(*stream3, Cancel).Times(AtMost(1)); // Mock factory for subsequent streams MockFactory factory; - EXPECT_CALL(factory, Call).WillOnce([&](Request const& request) { - EXPECT_TRUE(request.read_object_spec().has_read_handle()); - EXPECT_EQ(request.read_object_spec().read_handle().handle(), - "handle-12345"); - auto stream_result = OpenStreamResult{ - std::make_shared(std::move(stream2)), Response{}}; - return make_ready_future(make_status_or(std::move(stream_result))); - }); + EXPECT_CALL(factory, Call) + .WillOnce([&] { // For stream2 (Triggered by Start) + auto resp = Response{}; + EXPECT_TRUE( + TextFormat::ParseFromString( + R"pb(object_data_ranges { + read_range { read_id: 0 read_offset: 0 } + })pb", + &resp)); + return make_ready_future(StatusOr( + OpenStreamResult{std::make_shared(std::move(stream2)), + std::move(resp)})); + }) + .WillOnce([&] { // For stream3 (Triggered by MakeSubsequentStream) + return make_ready_future(StatusOr(OpenStreamResult{ + std::make_shared(std::move(stream3)), Response{}})); + }) + .WillRepeatedly([&](Request const&) { + return make_ready_future(StatusOr(PermanentError())); + }); - // Create the ObjectDescriptorImpl auto tested = std::make_shared( NoResume(), factory.AsStdFunction(), google::storage::v2::BidiReadObjectSpec{}, std::make_shared(std::move(stream1))); - auto response0 = Response{}; - EXPECT_TRUE(TextFormat::ParseFromString(kResponse0, &response0)); - tested->Start(std::move(response0)); + auto start_response = Response{}; + ASSERT_TRUE(TextFormat::ParseFromString( + R"pb(object_data_ranges { read_range { read_id: 0 read_offset: 0 } })pb", + &start_response)); + tested->Start(std::move(start_response)); - auto read1 = sequencer.PopFrontWithName(); - EXPECT_EQ(read1.second, "Read[1]"); - // Start a read on the first stream - auto reader1 = tested->Read({100, 100}); - auto future1 = reader1->Read(); - // The implementation starts a read loop eagerly after Start(), and then - // the call to tested->Read() schedules a write. - auto write1 = sequencer.PopFrontWithName(); - EXPECT_EQ(write1.second, "Write[1]"); - write1.first.set_value(true); + auto read1_1 = sequencer.PopFrontWithName(); + EXPECT_EQ(read1_1.second, "Read[1.1]"); - // Now we can satisfy the read. This will deliver the data to the reader. - read1.first.set_value(true); + // Make stream1 busy + auto reader1 = tested->Read({0, 100}); - EXPECT_THAT(future1.get(), - VariantWith(ResultOf( - "contents are", - [](storage_experimental::ReadPayload const& p) { - return p.contents(); - }, - ElementsAre(absl::string_view{"payload-for-stream-1"})))); + // Create and switch to a new stream. This happens before the first + // stream is finished. + tested->MakeSubsequentStream(); - EXPECT_THAT(reader1->Read().get(), VariantWith(IsOk())); + auto read2_1 = sequencer.PopFrontWithName(); + EXPECT_EQ(read2_1.second, "Read[2.1]"); - auto next = sequencer.PopFrontWithName(); - EXPECT_EQ(next.second, "Read[1.eos]"); - next.first.set_value(true); + // Make stream2 busy + auto reader2 = tested->Read({100, 200}); + auto write2_1 = sequencer.PopFrontWithName(); + EXPECT_EQ(write2_1.second, "Write[2.1]"); + write2_1.first.set_value(true); // Complete write immediately - // Create and switch to a new stream. This happens before the first - // stream is finished. + // Make stream1 IDLE. + auto r1f1 = reader1->Read(); + read1_1.first.set_value(true); + // read1_2/read1_3 completed via ready futures; no sequencer pops needed. + ASSERT_THAT(r1f1.get(), VariantWith(_)); + auto r1f2 = reader1->Read(); + ASSERT_THAT(r1f2.get(), VariantWith(IsOk())); + + // Call MakeSubsequentStream. It finds stream1, moves it, and returns. tested->MakeSubsequentStream(); - // The events are interleaved. Based on the log, Finish[1] comes first. - auto finish1 = sequencer.PopFrontWithName(); - EXPECT_EQ(finish1.second, "Finish[1]"); + auto reader3 = tested->Read({200, 300}); + read2_1.first.set_value(true); + tested.reset(); +} - auto read2 = sequencer.PopFrontWithName(); - EXPECT_EQ(read2.second, "Read[2]"); - finish1.first.set_value(true); +/// @test Verify that a successful resume executes the OnResume logic correctly. +TEST(ObjectDescriptorImpl, OnResumeSuccessful) { + AsyncSequencer sequencer; - // Start a read on the second stream - auto reader2 = tested->Read({200, 200}); - auto future2 = reader2->Read(); + auto expect_startup_events = [&](AsyncSequencer& seq) { + auto e1 = seq.PopFrontWithName(); + auto e2 = seq.PopFrontWithName(); + std::set names = {e1.second, e2.second}; + if (names.count("Read[1]") != 0 && names.count("ProactiveFactory") != 0) { + e1.first.set_value(true); // Allow read to proceed + e2.first.set_value(true); // Allow factory to proceed + } else { + ADD_FAILURE() << "Got unexpected events: " << e1.second << ", " + << e2.second; + } + }; - auto write2 = sequencer.PopFrontWithName(); - EXPECT_EQ(write2.second, "Write[2]"); - write2.first.set_value(true); + auto stream1 = std::make_unique(); + EXPECT_CALL(*stream1, Write).WillOnce([&](auto, auto) { + return sequencer.PushBack("Write[1]").then([](auto f) { return f.get(); }); + }); - read2.first.set_value(true); + // To keep Stream 1 alive during startup, the first Read returns a valid + // (empty) response. Subsequent reads return nullopt to trigger the + // Finish/Resume logic. + EXPECT_CALL(*stream1, Read) + .WillOnce([&] { + return sequencer.PushBack("Read[1]").then( + [](auto) { return absl::make_optional(Response{}); }); + }) + .WillRepeatedly([&] { + return sequencer.PushBack("Read[Loop]").then([](auto) { + return absl::optional{}; + }); + }); - EXPECT_THAT(future2.get(), - VariantWith(ResultOf( - "contents are", - [](storage_experimental::ReadPayload const& p) { - return p.contents(); - }, - ElementsAre(absl::string_view{"payload-for-stream-2"})))); + EXPECT_CALL(*stream1, Finish).WillOnce([&] { + return sequencer.PushBack("Finish[1]").then([](auto) { + return TransientError(); + }); + }); + EXPECT_CALL(*stream1, Cancel).Times(AtMost(1)); + + auto stream2 = std::make_unique(); + // The resumed stream to starts reading immediately. + EXPECT_CALL(*stream2, Read).WillRepeatedly([&] { + return sequencer.PushBack("Read[2]").then( + [](auto) { return absl::make_optional(Response{}); }); + }); + EXPECT_CALL(*stream2, Finish).WillOnce(Return(make_ready_future(Status{}))); + EXPECT_CALL(*stream2, Cancel).Times(AtMost(1)); + + MockFactory factory; + EXPECT_CALL(factory, Call) + .WillOnce([&](auto) { + return sequencer.PushBack("ProactiveFactory").then([](auto) { + return StatusOr(TransientError()); + }); + }) + .WillOnce([&](auto) { + return sequencer.PushBack("ResumeFactory").then([&](auto) { + return StatusOr(OpenStreamResult{ + std::make_shared(std::move(stream2)), Response{}}); + }); + }); + + auto tested = std::make_shared( + storage_experimental::LimitedErrorCountResumePolicy(1)(), + factory.AsStdFunction(), google::storage::v2::BidiReadObjectSpec{}, + std::make_shared(std::move(stream1))); + + tested->Start(Response{}); + expect_startup_events(sequencer); + + // Register the read range. + auto reader = tested->Read({0, 100}); + + auto next_event = sequencer.PopFrontWithName(); + promise fail_stream_promise; + + if (next_event.second == "Read[Loop]") { + fail_stream_promise = std::move(next_event.first); + // Now expect Write[1] + auto w1 = sequencer.PopFrontWithName(); + EXPECT_EQ(w1.second, "Write[1]"); + w1.first.set_value(true); + } else { + // It was Write[1] immediately + EXPECT_EQ(next_event.second, "Write[1]"); + next_event.first.set_value(true); + + // Now wait for Read[Loop] + auto read_loop = sequencer.PopFrontWithName(); + EXPECT_EQ(read_loop.second, "Read[Loop]"); + fail_stream_promise = std::move(read_loop.first); + } + + // Trigger Failure on Stream 1. + fail_stream_promise.set_value(true); + + auto f1 = sequencer.PopFrontWithName(); + EXPECT_EQ(f1.second, "Finish[1]"); + f1.first.set_value(true); + + auto resume = sequencer.PopFrontWithName(); + EXPECT_EQ(resume.second, "ResumeFactory"); + resume.first.set_value(true); + + // The OnResume block calls OnRead, which triggers Read() on Stream 2. + auto r2 = sequencer.PopFrontWithName(); + EXPECT_EQ(r2.second, "Read[2]"); + r2.first.set_value(true); + + tested.reset(); +} + +/// @test Verify Read() behavior when all streams have failed permanently. +TEST(ObjectDescriptorImpl, ReadFailsWhenAllStreamsAreDead) { + AsyncSequencer sequencer; + auto stream = std::make_unique(); + + // Initial Read returns empty (EOF) to trigger Finish + EXPECT_CALL(*stream, Read).WillOnce([&] { + return sequencer.PushBack("Read[1]").then( + [](auto) { return absl::optional{}; }); + }); + + EXPECT_CALL(*stream, Finish).WillOnce([&] { + return sequencer.PushBack("Finish").then( + [](auto) { return PermanentError(); }); + }); + EXPECT_CALL(*stream, Cancel).Times(AtMost(1)); + + MockFactory factory; + EXPECT_CALL(factory, Call).WillOnce([&](auto) { + return sequencer.PushBack("ProactiveFactory").then([](auto) { + return StatusOr(PermanentError()); + }); + }); + + auto tested = std::make_shared( + NoResume(), factory.AsStdFunction(), + google::storage::v2::BidiReadObjectSpec{}, + std::make_shared(std::move(stream))); + + tested->Start(Response{}); + + auto e1 = sequencer.PopFrontWithName(); + auto e2 = sequencer.PopFrontWithName(); + + std::set names = {e1.second, e2.second}; + ASSERT_EQ(names.count("Read[1]"), 1); + ASSERT_EQ(names.count("ProactiveFactory"), 1); + + e1.first.set_value(true); + e2.first.set_value(true); + + auto finish = sequencer.PopFrontWithName(); + EXPECT_EQ(finish.second, "Finish"); + finish.first.set_value(true); - EXPECT_THAT(reader2->Read().get(), VariantWith(IsOk())); + // At this point, the only active stream failed permanently and was removed. + // The proactive stream creation also failed. The manager is now EMPTY. - auto read2_eos = sequencer.PopFrontWithName(); - EXPECT_EQ(read2_eos.second, "Read[2.eos]"); - read2_eos.first.set_value(true); + auto reader = tested->Read({0, 100}); - auto finish2 = sequencer.PopFrontWithName(); - EXPECT_EQ(finish2.second, "Finish[2]"); - finish2.first.set_value(true); + // The Read() should immediately fail with FailedPrecondition. + auto result = reader->Read().get(); + EXPECT_THAT(result, + VariantWith(StatusIs(StatusCode::kFailedPrecondition))); } } // namespace diff --git a/google/cloud/storage/storage_client_grpc_unit_tests.bzl b/google/cloud/storage/storage_client_grpc_unit_tests.bzl index 9387e768810aa..d009c79025598 100644 --- a/google/cloud/storage/storage_client_grpc_unit_tests.bzl +++ b/google/cloud/storage/storage_client_grpc_unit_tests.bzl @@ -41,6 +41,7 @@ storage_client_grpc_unit_tests = [ "internal/async/default_options_test.cc", "internal/async/handle_redirect_error_test.cc", "internal/async/insert_object_test.cc", + "internal/async/multi_stream_manager_test.cc", "internal/async/object_descriptor_connection_tracing_test.cc", "internal/async/object_descriptor_impl_test.cc", "internal/async/object_descriptor_reader_test.cc", diff --git a/google/cloud/storage/tests/async_client_integration_test.cc b/google/cloud/storage/tests/async_client_integration_test.cc index fed96694e768d..7424a2de485d4 100644 --- a/google/cloud/storage/tests/async_client_integration_test.cc +++ b/google/cloud/storage/tests/async_client_integration_test.cc @@ -1005,7 +1005,7 @@ TEST_F(AsyncClientIntegrationTest, Open) { } TEST_F(AsyncClientIntegrationTest, OpenExceedMaximumRange) { - GTEST_SKIP(); + if (!UsingEmulator()) GTEST_SKIP(); auto async = AsyncClient( TestOptions().set(1024)); auto client = MakeIntegrationTestClient(true, TestOptions());