diff --git a/google/cloud/storage/internal/async/writer_connection_resumed.cc b/google/cloud/storage/internal/async/writer_connection_resumed.cc index 3f467d0ed7eb3..8b9a3104fd0db 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed.cc @@ -184,6 +184,7 @@ class AsyncWriterConnectionResumedState } void WriteLoop(std::unique_lock lk) { + if (writing_) return; // Determine if there's data left to write *before* potentially finalizing. writing_ = write_offset_ < resend_buffer_.size(); @@ -205,7 +206,8 @@ class AsyncWriterConnectionResumedState } // If not finalizing, check if an empty flush is needed. if (flush_) { - // Pass empty payload to FlushStep + writing_ = true; + // Pass empty payload to FlushStep return FlushStep(std::move(lk), absl::Cord{}); } @@ -256,8 +258,10 @@ class AsyncWriterConnectionResumedState auto impl = Impl(lk); lk.unlock(); impl->Query().then([this, result, w = WeakFromThis()](auto f) { - SetFlushed(std::unique_lock(mu_), std::move(result)); - if (auto self = w.lock()) return self->OnQuery(f.get()); + auto self = w.lock(); + if (!self) return; + self->OnQuery(f.get()); + self->SetFlushed(std::unique_lock(self->mu_), std::move(result)); }); } @@ -295,8 +299,9 @@ class AsyncWriterConnectionResumedState buffer_offset_ = persisted_size; write_offset_ -= static_cast(n); // If the buffer is small enough, collect all the handlers to notify them. - auto const handlers = ClearHandlersIfEmpty(lk); - WriteLoop(std::move(lk)); + auto const handlers = ClearHandlersIfEmpty(lk); + writing_ = false; + StartWriting(std::move(lk)); // The notifications are deferred until the lock is released, as they might // call back and try to acquire the lock. for (auto const& h : handlers) { @@ -318,7 +323,8 @@ class AsyncWriterConnectionResumedState if (!result.ok()) return Resume(std::move(result)); std::unique_lock lk(mu_); write_offset_ += write_size; - return WriteLoop(std::move(lk)); + writing_ = false; + return StartWriting(std::move(lk)); } void Resume(Status const& s) { @@ -347,7 +353,8 @@ class AsyncWriterConnectionResumedState bool was_finalizing; { std::unique_lock lk(mu_); - was_finalizing = finalizing_; + was_finalizing = finalizing_; + writing_ = false; if (!s.ok() && cancelled_) { return SetError(std::move(lk), std::move(s)); } @@ -461,10 +468,6 @@ class AsyncWriterConnectionResumedState // lock. for (auto& h : handlers) h->Execute(Status{}); flushed.set_value(result); - // Restart the write loop ONLY if we are not already finalizing. - // If finalizing_ is true, the completion will be handled by OnFinalize. - std::unique_lock loop_lk(mu_); - if (!finalizing_) WriteLoop(std::move(loop_lk)); } void SetError(std::unique_lock lk, Status const& status) { @@ -590,7 +593,7 @@ class AsyncWriterConnectionResumedState // - A Flush() call that returns an unsatisified future until the buffer is // small enough. std::vector> flush_handlers_; - + // True if the writing loop is activate. bool writing_ = false; diff --git a/google/cloud/storage/internal/async/writer_connection_resumed_test.cc b/google/cloud/storage/internal/async/writer_connection_resumed_test.cc index a046eb8588ed5..8d4852cecfaec 100644 --- a/google/cloud/storage/internal/async/writer_connection_resumed_test.cc +++ b/google/cloud/storage/internal/async/writer_connection_resumed_test.cc @@ -22,6 +22,8 @@ #include "google/cloud/testing_util/status_matchers.h" #include #include +#include +#include namespace google { namespace cloud { @@ -170,7 +172,7 @@ TEST(WriterConnectionResumed, FlushEmpty) { auto mock = std::make_unique(); EXPECT_CALL(*mock, PersistedState) .WillRepeatedly(Return(MakePersistedState(0))); - EXPECT_CALL(*mock, Flush).WillOnce([&](auto const& p) { + EXPECT_CALL(*mock, Flush).WillRepeatedly([&](auto const& p) { EXPECT_TRUE(p.payload().empty()); return sequencer.PushBack("Flush").then([](auto f) { if (!f.get()) return TransientError(); @@ -214,13 +216,21 @@ TEST(WriteConnectionResumed, FlushNonEmpty) { EXPECT_CALL(*mock, PersistedState) .WillRepeatedly(Return(MakePersistedState(0))); - EXPECT_CALL(*mock, Flush).WillOnce([&](auto const& p) { - EXPECT_EQ(p.payload(), payload.payload()); - return sequencer.PushBack("Flush").then([](auto f) { - if (!f.get()) return TransientError(); - return Status{}; - }); - }); + EXPECT_CALL(*mock, Flush) + .WillOnce([&](auto const& p) { + EXPECT_EQ(p.payload(), payload.payload()); + return sequencer.PushBack("Flush").then([](auto f) { + if (!f.get()) return TransientError(); + return Status{}; + }); + }) + .WillOnce([&](auto const& p) { + EXPECT_TRUE(p.payload().empty()); + return sequencer.PushBack("Flush").then([](auto f) { + if (!f.get()) return TransientError(); + return Status{}; + }); + }); EXPECT_CALL(*mock, Query).WillOnce([&]() { return sequencer.PushBack("Query").then( [](auto f) -> StatusOr { @@ -392,8 +402,85 @@ TEST(WriteConnectionResumed, ResumeUsesAppendObjectSpecFromInitialRequest) { "projects/_/buckets/test-bucket"); } +TEST(WriteConnectionResumed, NoConcurrentWritesWhenFlushAndWriteRace) { + AsyncSequencer sequencer; + auto mock = std::make_unique(); + auto initial_request = google::storage::v2::BidiWriteObjectRequest{}; + auto first_response = google::storage::v2::BidiWriteObjectResponse{}; + + EXPECT_CALL(*mock, PersistedState) + .WillRepeatedly(Return(MakePersistedState(0))); + EXPECT_CALL(*mock, Flush(_)).WillRepeatedly([&](auto) { + return sequencer.PushBack("Flush").then([](auto f) { + if (!f.get()) return TransientError(); + return Status{}; + }); + }); + EXPECT_CALL(*mock, Query).WillOnce([&]() { + return sequencer.PushBack("Query").then([](auto f) -> StatusOr { + if (!f.get()) return TransientError(); + return 0; + }); + }); + + // Make Write detect concurrent invocations. If two writes run concurrently + // the compare_exchange will fail and the test will fail. + std::atomic in_write{false}; + EXPECT_CALL(*mock, Write(_)) + .WillRepeatedly([&](auto) { + bool expected = false; + EXPECT_TRUE(in_write.compare_exchange_strong(expected, true)); + // Simulate some work that allows a concurrent write to attempt to run. + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + in_write.store(false); + return make_ready_future(Status{}); + }); + + MockFactory mock_factory; + EXPECT_CALL(mock_factory, Call).Times(0); + + auto connection = MakeWriterConnectionResumed( + mock_factory.AsStdFunction(), std::move(mock), initial_request, nullptr, + first_response, Options{}); + + // Start a flush which will call impl->Flush() and block. + auto flush_future = connection->Flush({}); + // Allow the Flush to complete, this will schedule a Query (but Query will + // remain blocked until we pop it). + auto next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Flush"); + next.first.set_value(true); + + // Immediately perform a user Write after the flush completed but before + // Query completes. This can race with the OnQuery-driven write. + auto write_future = connection->Write(TestPayload(1024)); + + // Now allow the Query to complete; OnQuery may schedule a write. + next = sequencer.PopFrontWithName(); + EXPECT_EQ(next.second, "Query"); + next.first.set_value(true); + + // Wait for both futures to complete with a timeout to avoid indefinite hang. + auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(2); + while (!write_future.is_ready() && std::chrono::steady_clock::now() < deadline) { + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + } + deadline = std::chrono::steady_clock::now() + std::chrono::seconds(2); + while (!flush_future.is_ready() && std::chrono::steady_clock::now() < deadline) { + std::this_thread::sleep_for(std::chrono::milliseconds(5)); + } + + ASSERT_TRUE(write_future.is_ready()); + ASSERT_TRUE(flush_future.is_ready()); + + // Both futures should complete successfully. + EXPECT_THAT(write_future.get(), StatusIs(StatusCode::kOk)); + EXPECT_THAT(flush_future.get(), StatusIs(StatusCode::kOk)); +} + + } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace storage_internal } // namespace cloud -} // namespace google +} // namespace google \ No newline at end of file