diff --git a/ci/cloudbuild/builds/integration-production.sh b/ci/cloudbuild/builds/integration-production.sh index 66ca4496d87fa..e0783aa8559bc 100755 --- a/ci/cloudbuild/builds/integration-production.sh +++ b/ci/cloudbuild/builds/integration-production.sh @@ -27,7 +27,7 @@ export CC=clang export CXX=clang++ mapfile -t args < <(bazel::common_args) -io::run bazel test "${args[@]}" --test_tag_filters=-integration-test "${BAZEL_TARGETS[@]}" +#io::run bazel test "${args[@]}" --test_tag_filters=-integration-test "${BAZEL_TARGETS[@]}" excluded_rules=( "-//examples:grpc_credential_types" @@ -39,8 +39,8 @@ excluded_rules=( "-//google/cloud/storagecontrol:v2_samples_storage_control_anywhere_cache_samples" ) +# --test_filter="*ReadRowsAllRows*" --test_timeout=30 \ io::log_h2 "Running the integration tests against prod" mapfile -t integration_args < <(integration::bazel_args) -io::run bazel test "${args[@]}" "${integration_args[@]}" \ - --cache_test_results="auto" --test_tag_filters="integration-test,-ud-only" \ - -- "${BAZEL_TARGETS[@]}" "${excluded_rules[@]}" +io::run bazel test "${args[@]}" "${integration_args[@]}" --test_output=all \ + //google/cloud/bigtable/tests:data_integration_test diff --git a/ci/cloudbuild/builds/lib/integration.sh b/ci/cloudbuild/builds/lib/integration.sh index 4ab702369f232..ebb8f71b240c0 100644 --- a/ci/cloudbuild/builds/lib/integration.sh +++ b/ci/cloudbuild/builds/lib/integration.sh @@ -51,7 +51,7 @@ function integration::bazel_args() { # Integration tests are inherently flaky. Make up to three attempts to get the # test passing. 
- args+=(--flaky_test_attempts=3) + #args+=(--flaky_test_attempts=3) args+=( # Common settings diff --git a/google/cloud/bigtable/CMakeLists.txt b/google/cloud/bigtable/CMakeLists.txt index 48436804617f3..b8c0781f75b7a 100644 --- a/google/cloud/bigtable/CMakeLists.txt +++ b/google/cloud/bigtable/CMakeLists.txt @@ -166,6 +166,8 @@ add_library( internal/bigtable_logging_decorator.h internal/bigtable_metadata_decorator.cc internal/bigtable_metadata_decorator.h + internal/bigtable_random_two_least_used_decorator.cc + internal/bigtable_random_two_least_used_decorator.h internal/bigtable_round_robin_decorator.cc internal/bigtable_round_robin_decorator.h internal/bigtable_stub.cc @@ -194,6 +196,8 @@ add_library( internal/default_row_reader.h internal/defaults.cc internal/defaults.h + internal/dynamic_channel_pool.cc + internal/dynamic_channel_pool.h internal/google_bytes_traits.cc internal/google_bytes_traits.h internal/legacy_async_bulk_apply.cc @@ -491,6 +495,7 @@ if (BUILD_TESTING) internal/data_tracing_connection_test.cc internal/default_row_reader_test.cc internal/defaults_test.cc + internal/dynamic_channel_pool_test.cc internal/google_bytes_traits_test.cc internal/legacy_async_bulk_apply_test.cc internal/legacy_async_row_reader_test.cc diff --git a/google/cloud/bigtable/bigtable_client_unit_tests.bzl b/google/cloud/bigtable/bigtable_client_unit_tests.bzl index c5dda043da0a4..7c6ec80782a75 100644 --- a/google/cloud/bigtable/bigtable_client_unit_tests.bzl +++ b/google/cloud/bigtable/bigtable_client_unit_tests.bzl @@ -56,6 +56,7 @@ bigtable_client_unit_tests = [ "internal/data_tracing_connection_test.cc", "internal/default_row_reader_test.cc", "internal/defaults_test.cc", + "internal/dynamic_channel_pool_test.cc", "internal/google_bytes_traits_test.cc", "internal/legacy_async_bulk_apply_test.cc", "internal/legacy_async_row_reader_test.cc", diff --git a/google/cloud/bigtable/google_cloud_cpp_bigtable.bzl b/google/cloud/bigtable/google_cloud_cpp_bigtable.bzl index 
c5d4f47beff0b..eccb8f17ba000 100644 --- a/google/cloud/bigtable/google_cloud_cpp_bigtable.bzl +++ b/google/cloud/bigtable/google_cloud_cpp_bigtable.bzl @@ -80,6 +80,7 @@ google_cloud_cpp_bigtable_hdrs = [ "internal/bigtable_channel_refresh.h", "internal/bigtable_logging_decorator.h", "internal/bigtable_metadata_decorator.h", + "internal/bigtable_random_two_least_used_decorator.h", "internal/bigtable_round_robin_decorator.h", "internal/bigtable_stub.h", "internal/bigtable_stub_factory.h", @@ -95,6 +96,7 @@ google_cloud_cpp_bigtable_hdrs = [ "internal/data_tracing_connection.h", "internal/default_row_reader.h", "internal/defaults.h", + "internal/dynamic_channel_pool.h", "internal/google_bytes_traits.h", "internal/legacy_async_bulk_apply.h", "internal/legacy_async_row_reader.h", @@ -204,6 +206,7 @@ google_cloud_cpp_bigtable_srcs = [ "internal/bigtable_channel_refresh.cc", "internal/bigtable_logging_decorator.cc", "internal/bigtable_metadata_decorator.cc", + "internal/bigtable_random_two_least_used_decorator.cc", "internal/bigtable_round_robin_decorator.cc", "internal/bigtable_stub.cc", "internal/bigtable_stub_factory.cc", @@ -217,6 +220,7 @@ google_cloud_cpp_bigtable_srcs = [ "internal/data_tracing_connection.cc", "internal/default_row_reader.cc", "internal/defaults.cc", + "internal/dynamic_channel_pool.cc", "internal/google_bytes_traits.cc", "internal/legacy_async_bulk_apply.cc", "internal/legacy_async_row_reader.cc", diff --git a/google/cloud/bigtable/internal/bigtable_random_two_least_used_decorator.cc b/google/cloud/bigtable/internal/bigtable_random_two_least_used_decorator.cc new file mode 100644 index 0000000000000..8203234f6325d --- /dev/null +++ b/google/cloud/bigtable/internal/bigtable_random_two_least_used_decorator.cc @@ -0,0 +1,341 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "google/cloud/bigtable/internal/bigtable_random_two_least_used_decorator.h" +#include +#include +#include + +namespace google { +namespace cloud { +namespace bigtable_internal { +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN +namespace { + +template +class StreamingReadRpcTracking + : public google::cloud::internal::StreamingReadRpc { + public: + StreamingReadRpcTracking( + std::unique_ptr> child, + std::function on_destruction) + : child_(std::move(child)), on_destruction_(std::move(on_destruction)) {} + + ~StreamingReadRpcTracking() override { on_destruction_(); } + + void Cancel() override { child_->Cancel(); } + absl::optional Read(T* response) override { + return child_->Read(response); + } + RpcMetadata GetRequestMetadata() const override { + return child_->GetRequestMetadata(); + } + + private: + std::unique_ptr> child_; + std::function on_destruction_; +}; + +template +class AsyncStreamingReadRpcTracking + : public google::cloud::internal::AsyncStreamingReadRpc { + public: + AsyncStreamingReadRpcTracking( + std::unique_ptr> child, + std::function on_destruction) + : child_(std::move(child)), on_destruction_(std::move(on_destruction)) {} + + ~AsyncStreamingReadRpcTracking() override { on_destruction_(); } + + void Cancel() override { child_->Cancel(); } + future Start() override { return child_->Start(); } + future> Read() override { return child_->Read(); } + future Finish() override { return child_->Finish(); } + RpcMetadata GetRequestMetadata() const override { + return child_->GetRequestMetadata(); + } + + private: + 
std::unique_ptr> child_; + std::function on_destruction_; +}; + +} // namespace + +std::unique_ptr> +BigtableRandomTwoLeastUsed::ReadRows( + std::shared_ptr context, Options const& options, + google::bigtable::v2::ReadRowsRequest const& request) { + std::cout << __PRETTY_FUNCTION__ << std::endl; + auto child = Child(); + auto stub = child->AcquireStub(); + auto result = stub->ReadRows(std::move(context), options, request); + auto release_fn = [weak = child->MakeWeak()] { + auto child = weak.lock(); + if (child) child->ReleaseStub(); + }; + return std::make_unique< + StreamingReadRpcTracking>( + std::move(result), std::move(release_fn)); +} + +std::unique_ptr> +BigtableRandomTwoLeastUsed::SampleRowKeys( + std::shared_ptr context, Options const& options, + google::bigtable::v2::SampleRowKeysRequest const& request) { + std::cout << __PRETTY_FUNCTION__ << std::endl; + auto child = Child(); + auto stub = child->AcquireStub(); + auto result = stub->SampleRowKeys(std::move(context), options, request); + auto release_fn = [weak = child->MakeWeak()] { + auto child = weak.lock(); + if (child) child->ReleaseStub(); + }; + return std::make_unique< + StreamingReadRpcTracking>( + std::move(result), std::move(release_fn)); +} + +StatusOr +BigtableRandomTwoLeastUsed::MutateRow( + grpc::ClientContext& context, Options const& options, + google::bigtable::v2::MutateRowRequest const& request) { + std::cout << __PRETTY_FUNCTION__ << std::endl; + auto child = Child(); + auto stub = child->AcquireStub(); + auto result = stub->MutateRow(context, options, request); + child->ReleaseStub(); + return result; +} + +std::unique_ptr> +BigtableRandomTwoLeastUsed::MutateRows( + std::shared_ptr context, Options const& options, + google::bigtable::v2::MutateRowsRequest const& request) { + std::cout << __PRETTY_FUNCTION__ << std::endl; + auto child = Child(); + auto stub = child->AcquireStub(); + auto result = stub->MutateRows(std::move(context), options, request); + auto release_fn = [weak = 
child->MakeWeak()] { + auto child = weak.lock(); + if (child) child->ReleaseStub(); + }; + return std::make_unique< + StreamingReadRpcTracking>( + std::move(result), std::move(release_fn)); +} + +StatusOr +BigtableRandomTwoLeastUsed::CheckAndMutateRow( + grpc::ClientContext& context, Options const& options, + google::bigtable::v2::CheckAndMutateRowRequest const& request) { + std::cout << __PRETTY_FUNCTION__ << std::endl; + auto child = Child(); + auto stub = child->AcquireStub(); + auto result = stub->CheckAndMutateRow(context, options, request); + child->ReleaseStub(); + return result; +} + +StatusOr +BigtableRandomTwoLeastUsed::PingAndWarm( + grpc::ClientContext& context, Options const& options, + google::bigtable::v2::PingAndWarmRequest const& request) { + std::cout << __PRETTY_FUNCTION__ << std::endl; + auto child = Child(); + auto stub = child->AcquireStub(); + auto result = stub->PingAndWarm(context, options, request); + child->ReleaseStub(); + return result; +} + +StatusOr +BigtableRandomTwoLeastUsed::ReadModifyWriteRow( + grpc::ClientContext& context, Options const& options, + google::bigtable::v2::ReadModifyWriteRowRequest const& request) { + std::cout << __PRETTY_FUNCTION__ << std::endl; + auto child = Child(); + auto stub = child->AcquireStub(); + auto result = stub->ReadModifyWriteRow(context, options, request); + child->ReleaseStub(); + return result; +} + +StatusOr +BigtableRandomTwoLeastUsed::PrepareQuery( + grpc::ClientContext& context, Options const& options, + google::bigtable::v2::PrepareQueryRequest const& request) { + std::cout << __PRETTY_FUNCTION__ << std::endl; + auto child = Child(); + auto stub = child->AcquireStub(); + auto result = stub->PrepareQuery(context, options, request); + child->ReleaseStub(); + return result; +} + +std::unique_ptr> +BigtableRandomTwoLeastUsed::ExecuteQuery( + std::shared_ptr context, Options const& options, + google::bigtable::v2::ExecuteQueryRequest const& request) { + std::cout << __PRETTY_FUNCTION__ << 
std::endl; + auto child = Child(); + auto stub = child->AcquireStub(); + auto result = stub->ExecuteQuery(std::move(context), options, request); + auto release_fn = [weak = child->MakeWeak()] { + auto child = weak.lock(); + if (child) child->ReleaseStub(); + }; + return std::make_unique< + StreamingReadRpcTracking>( + std::move(result), std::move(release_fn)); +} + +std::unique_ptr> +BigtableRandomTwoLeastUsed::AsyncReadRows( + google::cloud::CompletionQueue const& cq, + std::shared_ptr context, + google::cloud::internal::ImmutableOptions options, + google::bigtable::v2::ReadRowsRequest const& request) { + std::cout << __PRETTY_FUNCTION__ << std::endl; + auto child = Child(); + auto stub = child->AcquireStub(); + auto result = + stub->AsyncReadRows(cq, std::move(context), std::move(options), request); + auto release_fn = [weak = child->MakeWeak()] { + auto child = weak.lock(); + if (child) child->ReleaseStub(); + }; + return std::make_unique< + AsyncStreamingReadRpcTracking>( + std::move(result), std::move(release_fn)); +} + +std::unique_ptr> +BigtableRandomTwoLeastUsed::AsyncSampleRowKeys( + google::cloud::CompletionQueue const& cq, + std::shared_ptr context, + google::cloud::internal::ImmutableOptions options, + google::bigtable::v2::SampleRowKeysRequest const& request) { + std::cout << __PRETTY_FUNCTION__ << std::endl; + auto child = Child(); + auto stub = child->AcquireStub(); + auto result = stub->AsyncSampleRowKeys(cq, std::move(context), + std::move(options), request); + auto release_fn = [weak = child->MakeWeak()] { + auto child = weak.lock(); + if (child) child->ReleaseStub(); + }; + return std::make_unique>(std::move(result), + std::move(release_fn)); +} + +future> +BigtableRandomTwoLeastUsed::AsyncMutateRow( + google::cloud::CompletionQueue& cq, + std::shared_ptr context, + google::cloud::internal::ImmutableOptions options, + google::bigtable::v2::MutateRowRequest const& request) { + std::cout << __PRETTY_FUNCTION__ << std::endl; + auto child = Child(); 
+ auto stub = child->AcquireStub(); + auto result = + stub->AsyncMutateRow(cq, std::move(context), std::move(options), request); + child->ReleaseStub(); + return result; +} + +std::unique_ptr> +BigtableRandomTwoLeastUsed::AsyncMutateRows( + google::cloud::CompletionQueue const& cq, + std::shared_ptr context, + google::cloud::internal::ImmutableOptions options, + google::bigtable::v2::MutateRowsRequest const& request) { + std::cout << __PRETTY_FUNCTION__ << std::endl; + auto child = Child(); + auto stub = child->AcquireStub(); + auto result = stub->AsyncMutateRows(cq, std::move(context), + std::move(options), request); + auto release_fn = [weak = child->MakeWeak()] { + auto child = weak.lock(); + if (child) child->ReleaseStub(); + }; + + return std::make_unique< + AsyncStreamingReadRpcTracking>( + std::move(result), std::move(release_fn)); +} + +future> +BigtableRandomTwoLeastUsed::AsyncCheckAndMutateRow( + google::cloud::CompletionQueue& cq, + std::shared_ptr context, + google::cloud::internal::ImmutableOptions options, + google::bigtable::v2::CheckAndMutateRowRequest const& request) { + std::cout << __PRETTY_FUNCTION__ << std::endl; + auto child = Child(); + auto stub = child->AcquireStub(); + auto result = stub->AsyncCheckAndMutateRow(cq, std::move(context), + std::move(options), request); + child->ReleaseStub(); + return result; +} + +future> +BigtableRandomTwoLeastUsed::AsyncReadModifyWriteRow( + google::cloud::CompletionQueue& cq, + std::shared_ptr context, + google::cloud::internal::ImmutableOptions options, + google::bigtable::v2::ReadModifyWriteRowRequest const& request) { + std::cout << __PRETTY_FUNCTION__ << std::endl; + auto child = Child(); + auto stub = child->AcquireStub(); + auto result = stub->AsyncReadModifyWriteRow(cq, std::move(context), + std::move(options), request); + child->ReleaseStub(); + return result; +} + +future> +BigtableRandomTwoLeastUsed::AsyncPrepareQuery( + google::cloud::CompletionQueue& cq, + std::shared_ptr context, + 
google::cloud::internal::ImmutableOptions options, + google::bigtable::v2::PrepareQueryRequest const& request) { + std::cout << __PRETTY_FUNCTION__ << std::endl; + auto child = Child(); + auto stub = child->AcquireStub(); + auto result = stub->AsyncPrepareQuery(cq, std::move(context), + std::move(options), request); + child->ReleaseStub(); + return result; +} + +std::shared_ptr> +BigtableRandomTwoLeastUsed::Child() { + std::cout << __PRETTY_FUNCTION__ << std::endl; + return pool_->GetChannelRandomTwoLeastUsed(); +} + +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END +} // namespace bigtable_internal +} // namespace cloud +} // namespace google diff --git a/google/cloud/bigtable/internal/bigtable_random_two_least_used_decorator.h b/google/cloud/bigtable/internal/bigtable_random_two_least_used_decorator.h new file mode 100644 index 0000000000000..8ee6a7504fb90 --- /dev/null +++ b/google/cloud/bigtable/internal/bigtable_random_two_least_used_decorator.h @@ -0,0 +1,141 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_BIGTABLE_RANDOM_TWO_LEAST_USED_DECORATOR_H +#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_BIGTABLE_RANDOM_TWO_LEAST_USED_DECORATOR_H + +#include "google/cloud/bigtable/internal/bigtable_stub.h" +#include "google/cloud/bigtable/internal/dynamic_channel_pool.h" +#include "google/cloud/version.h" +#include +#include +#include + +namespace google { +namespace cloud { +namespace bigtable_internal { +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN + +class BigtableRandomTwoLeastUsed : public BigtableStub { + public: + explicit BigtableRandomTwoLeastUsed( + std::shared_ptr> pool) + : pool_(std::move(pool)) {} + + ~BigtableRandomTwoLeastUsed() override = default; + + std::unique_ptr> + ReadRows(std::shared_ptr context, Options const& options, + google::bigtable::v2::ReadRowsRequest const& request) override; + + std::unique_ptr> + SampleRowKeys( + std::shared_ptr context, Options const& options, + google::bigtable::v2::SampleRowKeysRequest const& request) override; + + StatusOr MutateRow( + grpc::ClientContext& context, Options const& options, + google::bigtable::v2::MutateRowRequest const& request) override; + + std::unique_ptr> + MutateRows(std::shared_ptr context, + Options const& options, + google::bigtable::v2::MutateRowsRequest const& request) override; + + StatusOr CheckAndMutateRow( + grpc::ClientContext& context, Options const& options, + google::bigtable::v2::CheckAndMutateRowRequest const& request) override; + + StatusOr PingAndWarm( + grpc::ClientContext& context, Options const& options, + google::bigtable::v2::PingAndWarmRequest const& request) override; + + StatusOr ReadModifyWriteRow( + grpc::ClientContext& context, Options const& options, + google::bigtable::v2::ReadModifyWriteRowRequest const& request) override; + + StatusOr PrepareQuery( + grpc::ClientContext& context, Options const& options, + google::bigtable::v2::PrepareQueryRequest const& request) override; + + std::unique_ptr> + 
ExecuteQuery( + std::shared_ptr context, Options const& options, + google::bigtable::v2::ExecuteQueryRequest const& request) override; + + std::unique_ptr<::google::cloud::internal::AsyncStreamingReadRpc< + google::bigtable::v2::ReadRowsResponse>> + AsyncReadRows(google::cloud::CompletionQueue const& cq, + std::shared_ptr context, + google::cloud::internal::ImmutableOptions options, + google::bigtable::v2::ReadRowsRequest const& request) override; + + std::unique_ptr<::google::cloud::internal::AsyncStreamingReadRpc< + google::bigtable::v2::SampleRowKeysResponse>> + AsyncSampleRowKeys( + google::cloud::CompletionQueue const& cq, + std::shared_ptr context, + google::cloud::internal::ImmutableOptions options, + google::bigtable::v2::SampleRowKeysRequest const& request) override; + + future> AsyncMutateRow( + google::cloud::CompletionQueue& cq, + std::shared_ptr context, + google::cloud::internal::ImmutableOptions options, + google::bigtable::v2::MutateRowRequest const& request) override; + + std::unique_ptr<::google::cloud::internal::AsyncStreamingReadRpc< + google::bigtable::v2::MutateRowsResponse>> + AsyncMutateRows( + google::cloud::CompletionQueue const& cq, + std::shared_ptr context, + google::cloud::internal::ImmutableOptions options, + google::bigtable::v2::MutateRowsRequest const& request) override; + + future> + AsyncCheckAndMutateRow( + google::cloud::CompletionQueue& cq, + std::shared_ptr context, + google::cloud::internal::ImmutableOptions options, + google::bigtable::v2::CheckAndMutateRowRequest const& request) override; + + future> + AsyncReadModifyWriteRow( + google::cloud::CompletionQueue& cq, + std::shared_ptr context, + google::cloud::internal::ImmutableOptions options, + google::bigtable::v2::ReadModifyWriteRowRequest const& request) override; + + future> + AsyncPrepareQuery( + google::cloud::CompletionQueue& cq, + std::shared_ptr context, + google::cloud::internal::ImmutableOptions options, + google::bigtable::v2::PrepareQueryRequest const& 
request) override; + + private: + std::shared_ptr> Child(); + std::shared_ptr> pool_; +}; + +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END +} // namespace bigtable_internal +} // namespace cloud +} // namespace google + +#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_BIGTABLE_RANDOM_TWO_LEAST_USED_DECORATOR_H diff --git a/google/cloud/bigtable/internal/bigtable_stub_factory.cc b/google/cloud/bigtable/internal/bigtable_stub_factory.cc index 9d5add4a61c44..e0f0537c5c8a8 100644 --- a/google/cloud/bigtable/internal/bigtable_stub_factory.cc +++ b/google/cloud/bigtable/internal/bigtable_stub_factory.cc @@ -17,6 +17,7 @@ #include "google/cloud/bigtable/internal/bigtable_channel_refresh.h" #include "google/cloud/bigtable/internal/bigtable_logging_decorator.h" #include "google/cloud/bigtable/internal/bigtable_metadata_decorator.h" +#include "google/cloud/bigtable/internal/bigtable_random_two_least_used_decorator.h" #include "google/cloud/bigtable/internal/bigtable_round_robin_decorator.h" #include "google/cloud/bigtable/internal/bigtable_tracing_stub.h" #include "google/cloud/bigtable/internal/connection_refresh_state.h" @@ -63,35 +64,98 @@ std::string FeaturesMetadata() { } // namespace std::shared_ptr CreateBigtableStubRoundRobin( - Options const& options, - std::function(int)> child_factory) { + Options const& options, std::function(int)> + refreshing_channel_stub_factory) { std::vector> children( (std::max)(1, options.get())); int id = 0; std::generate(children.begin(), children.end(), - [&id, &child_factory] { return child_factory(id++); }); + [&id, &refreshing_channel_stub_factory] { + return refreshing_channel_stub_factory(id++); + }); return std::make_shared(std::move(children)); } +std::shared_ptr CreateBigtableStubRandomTwoLeastUsed( + std::shared_ptr auth, + std::shared_ptr cq_impl, + Options const& options, BaseBigtableStubFactory stub_factory, + std::shared_ptr refresh_state) { + std::cout << __PRETTY_FUNCTION__ << std::endl; + + auto 
refreshing_channel_stub_factory = + [stub_factory = std::move(stub_factory), cq_impl, refresh_state, + auth = std::move(auth), options]( + std::uint32_t id, + bool prime_channel) -> std::shared_ptr> { + auto wrapper = std::make_shared>(); + auto connection_status_fn = [weak = wrapper->MakeWeak()](Status const& s) { + if (auto self = weak.lock()) { + self->set_last_refresh_status(s); + } + if (!s.ok()) { + GCP_LOG(WARNING) << "Failed to refresh connection. Error: " << s; + } + }; + auto channel = CreateGrpcChannel(*auth, options, id); + if (prime_channel) { + (void)channel->GetState(true); + } + ScheduleChannelRefresh(cq_impl, refresh_state, channel, + std::move(connection_status_fn)); + wrapper->set_channel(stub_factory(std::move(channel))); + return wrapper; + }; + + std::vector>> children( + std::max(1, options.get())); + std::uint32_t id = 0; + std::generate(children.begin(), children.end(), + [&id, &refreshing_channel_stub_factory] { + return refreshing_channel_stub_factory(id++, false); + }); + + return std::make_shared( + DynamicChannelPool::Create( + CompletionQueue(std::move(cq_impl)), std::move(children), + std::move(refresh_state), + std::move(refreshing_channel_stub_factory))); + // CompletionQueue(cq_impl), std::move(refresh_state), + // std::move(refreshing_channel_stub_factory), std::move(children)); +} + std::shared_ptr CreateDecoratedStubs( std::shared_ptr auth, CompletionQueue const& cq, Options const& options, - BaseBigtableStubFactory const& base_factory) { + BaseBigtableStubFactory const& stub_factory) { auto cq_impl = internal::GetCompletionQueueImpl(cq); auto refresh = std::make_shared( cq_impl, options.get(), options.get()); - auto child_factory = [base_factory, cq_impl, refresh, &auth, - options](int id) { - auto channel = CreateGrpcChannel(*auth, options, id); - if (refresh->enabled()) ScheduleChannelRefresh(cq_impl, refresh, channel); - return base_factory(std::move(channel)); - }; - auto stub = CreateBigtableStubRoundRobin(options, 
std::move(child_factory)); - if (refresh->enabled()) { - stub = std::make_shared(std::move(stub), - std::move(refresh)); + + std::shared_ptr stub; + if (options.has() && + options.get() == + bigtable::experimental::ChannelPoolType::kDynamic) { + stub = CreateBigtableStubRandomTwoLeastUsed( + auth, std::move(cq_impl), options, stub_factory, + // std::move(refreshing_channel_stub_factory), + std::move(refresh)); + } else { + auto refreshing_channel_stub_factory = [stub_factory, cq_impl, refresh, + &auth, options](int id) { + auto channel = CreateGrpcChannel(*auth, options, id); + if (refresh->enabled()) ScheduleChannelRefresh(cq_impl, refresh, channel); + return stub_factory(std::move(channel)); + }; + stub = CreateBigtableStubRoundRobin( + options, std::move(refreshing_channel_stub_factory)); + if (refresh->enabled()) { + stub = std::make_shared(std::move(stub), + std::move(refresh)); + } } + if (auth->RequiresConfigureContext()) { stub = std::make_shared(std::move(auth), std::move(stub)); } diff --git a/google/cloud/bigtable/internal/bigtable_stub_factory.h b/google/cloud/bigtable/internal/bigtable_stub_factory.h index cdf7a633b0f42..346014fb85b97 100644 --- a/google/cloud/bigtable/internal/bigtable_stub_factory.h +++ b/google/cloud/bigtable/internal/bigtable_stub_factory.h @@ -16,6 +16,7 @@ #define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_BIGTABLE_STUB_FACTORY_H #include "google/cloud/bigtable/internal/bigtable_stub.h" +#include "google/cloud/bigtable/internal/connection_refresh_state.h" #include "google/cloud/completion_queue.h" #include "google/cloud/internal/unified_grpc_credentials.h" #include "google/cloud/options.h" @@ -28,18 +29,32 @@ namespace cloud { namespace bigtable_internal { GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN +enum class ChannelSelectionStrategy { kRoundRobin, kRandomTwoLeastUsed }; + +struct ChannelSelectionStrategyOption { + using Type = ChannelSelectionStrategy; +}; + using BaseBigtableStubFactory = std::function( std::shared_ptr)>; 
std::shared_ptr CreateBigtableStubRoundRobin( - Options const& options, - std::function(int)> child_factory); + Options const& options, std::function(int)> + refreshing_channel_stub_factory); + +std::shared_ptr CreateBigtableStubRandomTwoLeastUsed( + std::shared_ptr auth, + std::shared_ptr cq_impl, + Options const& options, BaseBigtableStubFactory stub_factory, + // std::function(int)> + // refreshing_channel_stub_factory, + std::shared_ptr refresh_state); /// Used in testing to create decorated mocks. std::shared_ptr CreateDecoratedStubs( std::shared_ptr auth, CompletionQueue const& cq, Options const& options, - BaseBigtableStubFactory const& base_factory); + BaseBigtableStubFactory const& stub_factory); /// Default function used by `DataConnectionImpl`. std::shared_ptr CreateBigtableStub( diff --git a/google/cloud/bigtable/internal/bulk_mutator.cc b/google/cloud/bigtable/internal/bulk_mutator.cc index 7203d23fbd14d..481cab7fe3ce2 100644 --- a/google/cloud/bigtable/internal/bulk_mutator.cc +++ b/google/cloud/bigtable/internal/bulk_mutator.cc @@ -214,6 +214,7 @@ grpc::Status BulkMutator::MakeOneRequest(bigtable::DataClient& client, Status BulkMutator::MakeOneRequest(BigtableStub& stub, MutateRowsLimiter& limiter, Options const& options) { + std::cout << __PRETTY_FUNCTION__ << std::endl; // Send the request to the server. auto const& mutations = state_.BeforeStart(); @@ -226,8 +227,10 @@ Status BulkMutator::MakeOneRequest(BigtableStub& stub, // Potentially throttle the request limiter.Acquire(); + std::cout << __PRETTY_FUNCTION__ << ": pre-stub" << std::endl; // Read the stream of responses. 
auto stream = stub.MutateRows(client_context, options, mutations); + std::cout << __PRETTY_FUNCTION__ << ": post-stub" << std::endl; absl::optional status; while (true) { btproto::MutateRowsResponse response; diff --git a/google/cloud/bigtable/internal/connection_refresh_state.cc b/google/cloud/bigtable/internal/connection_refresh_state.cc index 2a6cdd052b6bc..04ed8afe1650e 100644 --- a/google/cloud/bigtable/internal/connection_refresh_state.cc +++ b/google/cloud/bigtable/internal/connection_refresh_state.cc @@ -30,6 +30,12 @@ namespace { */ auto constexpr kConnectionReadyTimeout = std::chrono::seconds(10); +void LogFailedConnectionRefresh(Status const& conn_status) { + if (!conn_status.ok()) { + GCP_LOG(WARNING) << "Failed to refresh connection. Error: " << conn_status; + } +} + } // namespace ConnectionRefreshState::ConnectionRefreshState( @@ -56,7 +62,11 @@ bool ConnectionRefreshState::enabled() const { void ScheduleChannelRefresh( std::shared_ptr const& cq_impl, std::shared_ptr const& state, - std::shared_ptr const& channel) { + std::shared_ptr const& channel, + std::function connection_status_fn) { + if (!connection_status_fn) { + connection_status_fn = LogFailedConnectionRefresh; + } // The timers will only hold weak pointers to the channel or to the // completion queue, so if either of them are destroyed, the timer chain // will simply not continue. @@ -66,7 +76,9 @@ void ScheduleChannelRefresh( using TimerFuture = future>; auto timer_future = cq.MakeRelativeTimer(state->RandomizedRefreshDelay()) - .then([weak_channel, weak_cq_impl, state](TimerFuture fut) { + .then([weak_channel, weak_cq_impl, state, + connection_status_fn = + std::move(connection_status_fn)](TimerFuture fut) { if (!fut.get()) { // Timer cancelled. 
return; @@ -79,17 +91,17 @@ void ScheduleChannelRefresh( cq.AsyncWaitConnectionReady( channel, std::chrono::system_clock::now() + kConnectionReadyTimeout) - .then([weak_channel, weak_cq_impl, state](future fut) { + .then([weak_channel, weak_cq_impl, state, + connection_status_fn = std::move(connection_status_fn)]( + future fut) { auto conn_status = fut.get(); - if (!conn_status.ok()) { - GCP_LOG(WARNING) << "Failed to refresh connection. Error: " - << conn_status; - } + connection_status_fn(conn_status); auto channel = weak_channel.lock(); if (!channel) return; auto cq_impl = weak_cq_impl.lock(); if (!cq_impl) return; - ScheduleChannelRefresh(cq_impl, state, channel); + ScheduleChannelRefresh(cq_impl, state, channel, + std::move(connection_status_fn)); }); }); state->timers().RegisterTimer(std::move(timer_future)); diff --git a/google/cloud/bigtable/internal/connection_refresh_state.h b/google/cloud/bigtable/internal/connection_refresh_state.h index 4e4f9f3dcfdd5..7bfc1e807bb27 100644 --- a/google/cloud/bigtable/internal/connection_refresh_state.h +++ b/google/cloud/bigtable/internal/connection_refresh_state.h @@ -86,7 +86,8 @@ class ConnectionRefreshState { void ScheduleChannelRefresh( std::shared_ptr const& cq, std::shared_ptr const& state, - std::shared_ptr const& channel); + std::shared_ptr const& channel, + std::function connection_status_fn = {}); GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace bigtable_internal diff --git a/google/cloud/bigtable/internal/data_connection_impl.cc b/google/cloud/bigtable/internal/data_connection_impl.cc index 87aeeadd0f7c4..d7dc6b48b262a 100644 --- a/google/cloud/bigtable/internal/data_connection_impl.cc +++ b/google/cloud/bigtable/internal/data_connection_impl.cc @@ -102,6 +102,7 @@ bigtable::RowReader ReadRowsHelper( params, // NOLINT(performance-unnecessary-value-param) std::shared_ptr operation_context) { // NOLINT(performance-unnecessary-value-param) + std::cout << __PRETTY_FUNCTION__ << std::endl; auto impl = 
std::make_shared( stub, std::move(params.app_profile_id), std::move(params.table_name), std::move(params.row_set), params.rows_limit, std::move(params.filter), @@ -338,6 +339,7 @@ future DataConnectionImpl::AsyncApply(std::string const& table_name, std::vector DataConnectionImpl::BulkApply( std::string const& table_name, bigtable::BulkMutation mut) { + std::cout << __PRETTY_FUNCTION__ << std::endl; auto current = google::cloud::internal::SaveCurrentOptions(); if (mut.empty()) return {}; auto operation_context = operation_context_factory_->MutateRows( @@ -350,6 +352,7 @@ std::vector DataConnectionImpl::BulkApply( std::unique_ptr retry; std::unique_ptr backoff; Status status; + std::cout << __PRETTY_FUNCTION__ << ": pre-loop" << std::endl; while (true) { status = mutator.MakeOneRequest(*stub_, *limiter_, *current); if (!mutator.HasPendingMutations()) break; @@ -361,6 +364,7 @@ std::vector DataConnectionImpl::BulkApply( if (!delay) break; std::this_thread::sleep_for(*delay); } + std::cout << __PRETTY_FUNCTION__ << ": post-loop" << std::endl; operation_context->OnDone(status); return std::move(mutator).OnRetryDone(); } @@ -380,6 +384,7 @@ DataConnectionImpl::AsyncBulkApply(std::string const& table_name, bigtable::RowReader DataConnectionImpl::ReadRowsFull( bigtable::ReadRowsParams params) { + std::cout << __PRETTY_FUNCTION__ << std::endl; auto current = google::cloud::internal::SaveCurrentOptions(); auto operation_context = operation_context_factory_->ReadRows( params.table_name, params.app_profile_id); @@ -660,6 +665,7 @@ void DataConnectionImpl::AsyncReadRows( std::function(bigtable::Row)> on_row, std::function on_finish, bigtable::RowSet row_set, std::int64_t rows_limit, bigtable::Filter filter) { + std::cout << __PRETTY_FUNCTION__ << std::endl; auto current = google::cloud::internal::SaveCurrentOptions(); auto operation_context = operation_context_factory_->ReadRows( table_name, app_profile_id(*current)); diff --git 
a/google/cloud/bigtable/internal/dynamic_channel_pool.cc b/google/cloud/bigtable/internal/dynamic_channel_pool.cc new file mode 100644 index 0000000000000..d47303482257a --- /dev/null +++ b/google/cloud/bigtable/internal/dynamic_channel_pool.cc @@ -0,0 +1,25 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "google/cloud/bigtable/internal/dynamic_channel_pool.h" +#include "google/cloud/version.h" + +namespace google { +namespace cloud { +namespace bigtable_internal { +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END +} // namespace bigtable_internal +} // namespace cloud +} // namespace google diff --git a/google/cloud/bigtable/internal/dynamic_channel_pool.h b/google/cloud/bigtable/internal/dynamic_channel_pool.h new file mode 100644 index 0000000000000..f36d97ec6148d --- /dev/null +++ b/google/cloud/bigtable/internal/dynamic_channel_pool.h @@ -0,0 +1,499 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_DYNAMIC_CHANNEL_POOL_H +#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_DYNAMIC_CHANNEL_POOL_H + +#include "google/cloud/bigtable/internal/connection_refresh_state.h" +#include "google/cloud/bigtable/options.h" +#include "google/cloud/completion_queue.h" +#include "google/cloud/internal/clock.h" +#include "google/cloud/internal/random.h" +#include "google/cloud/version.h" +#include +#include +#include +#include +#include + +namespace google { +namespace cloud { +namespace bigtable_internal { +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN + +template +class ChannelUsage : public std::enable_shared_from_this> { + public: + using Clock = ::google::cloud::internal::SteadyClock; + ChannelUsage() : clock_(std::make_shared()) {} + explicit ChannelUsage(std::shared_ptr stub, std::shared_ptr clock = + std::make_shared()) + : stub_(std::move(stub)), clock_(std::move(clock)) {} + + StatusOr average_outstanding_rpcs() { + std::unique_lock lk(mu_); + if (!last_refresh_status_.ok()) return last_refresh_status_; + if (measurements_.empty()) return 0; + auto time_point = clock_->Now() - std::chrono::seconds(60); + auto iter = std::find_if( + measurements_.begin(), measurements_.end(), + [&](Measurement const& m) { return m.timestamp >= time_point; }); + int num_measurements = 0; + int last_minute_sum = std::accumulate( + iter, measurements_.end(), 0, [&](int a, Measurement const& b) mutable { + ++num_measurements; + return a + b.outstanding_rpcs; + }); + measurements_.erase(measurements_.begin(), iter); + return num_measurements > 0 ? 
last_minute_sum / num_measurements : 0; + } + + StatusOr instant_outstanding_rpcs() { + std::unique_lock lk(mu_); + if (!last_refresh_status_.ok()) return last_refresh_status_; + return outstanding_rpcs_; + } + + ChannelUsage& set_last_refresh_status(Status s) { + std::unique_lock lk(mu_); + last_refresh_status_ = std::move(s); + return *this; + } + + // A channel can only be set if the current value is nullptr. This mutator + // exists only so that we can obtain a std::weak_ptr to the ChannelUsage + // object that will eventually hold the channel. + ChannelUsage& set_channel(std::shared_ptr channel) { + std::unique_lock lk(mu_); + if (!stub_) stub_ = std::move(channel); + return *this; + } + + std::weak_ptr> MakeWeak() { return this->shared_from_this(); } + + std::shared_ptr AcquireStub() { + std::unique_lock lk(mu_); + ++outstanding_rpcs_; + auto time = clock_->Now(); + measurements_.emplace_back(outstanding_rpcs_, time); + return stub_; + } + + void ReleaseStub() { + std::unique_lock lk(mu_); + --outstanding_rpcs_; + measurements_.emplace_back(outstanding_rpcs_, clock_->Now()); + } + + private: + mutable std::mutex mu_; + std::shared_ptr stub_; + std::shared_ptr clock_; + int outstanding_rpcs_ = 0; + Status last_refresh_status_; + struct Measurement { + Measurement(int outstanding_rpcs, std::chrono::steady_clock::time_point p) + : outstanding_rpcs(outstanding_rpcs), timestamp(p) {} + int outstanding_rpcs; + std::chrono::steady_clock::time_point timestamp; + }; + std::deque measurements_; +}; + +template +class DynamicChannelPool + : public std::enable_shared_from_this> { + public: + using StubFactoryFn = std::function>( + std::uint32_t id, bool prime_channel)>; + + static std::shared_ptr Create( + CompletionQueue cq, + std::vector>> initial_channels, + std::shared_ptr refresh_state, + StubFactoryFn stub_factory_fn, + bigtable::experimental::DynamicChannelPoolSizingPolicy sizing_policy = + {}) { + std::cout << __PRETTY_FUNCTION__ << ": enter" << std::endl; + auto 
pool = std::shared_ptr(new DynamicChannelPool( + std::move(cq), std::move(initial_channels), std::move(refresh_state), + std::move(stub_factory_fn), std::move(sizing_policy))); + std::cout << __PRETTY_FUNCTION__ << ": return pool" << std::endl; + return pool; + } + + ~DynamicChannelPool() { + std::unique_lock lk(mu_); + // Eventually the channel refresh chain will terminate after this class is + // destroyed. But only after the timer futures expire on the CompletionQueue + // performing this work. We might as well cancel those timer futures now. + refresh_state_->timers().CancelAll(); + if (remove_channel_poll_timer_.valid()) remove_channel_poll_timer_.cancel(); + if (remove_channel_poll_timer_.valid()) { + pool_resize_cooldown_timer_.cancel(); + } + } + + // This is a snapshot aka dirty read as the size could immediately change + // after this function returns. + std::size_t size() const { + std::unique_lock lk(mu_); + return channels_.size(); + } + + // If the pool is not under a pool_resize_cooldown_timer_, call + // CheckPoolChannelHealth. + // Pick two random channels from channels_ and return the channel with the + // lower number of outstanding_rpcs. This is the "quick" path. + // If one or both of the random channels have been marked unhealthy after a + // refresh, continue choosing random channels to find a pair of healthy + // channels to compare. Any channels found to be unhealthy are moved from + // channels_ to draining_channels_ and call ScheduleRemoveChannel. + // If there is only one healthy channel in the pool, use it. + // If there are no healthy channels in channels_, create a new channel and + // use that one. Also call ScheduleAddChannel to replenish channels_. 
+ std::shared_ptr> GetChannelRandomTwoLeastUsed() { + std::unique_lock lk(mu_); + std::cout << __PRETTY_FUNCTION__ << ": channels_size()=" << channels_.size() + << std::endl; + // for (auto const& c : channels_) { + // std::cout << __PRETTY_FUNCTION__ << ": channel=" << c.get() << + // std::endl; + // } + + if (!pool_resize_cooldown_timer_.valid()) { + CheckPoolChannelHealth(lk); + } else if (pool_resize_cooldown_timer_.is_ready()) { + (void)pool_resize_cooldown_timer_.get(); + CheckPoolChannelHealth(lk); + } + + // std::cout << __PRETTY_FUNCTION__ << ": finished + // CheckPoolChannelHealth" + // << std::endl; + std::vector< + typename std::vector>>::iterator> + iterators(channels_.size()); + + std::iota(iterators.begin(), iterators.end(), channels_.begin()); + std::shuffle(iterators.begin(), iterators.end(), rng_); + + // typename std::vector>>::iterator>::iterator + auto shuffle_iter = iterators.begin(); + // typename + // std::vector>>::iterator + auto channel_1_iter = *shuffle_iter; + std::shared_ptr> c = *channel_1_iter; + // std::cout << __PRETTY_FUNCTION__ + // << ": check channel 1=" << c.get() << std::endl; + auto channel_1_rpcs = shuffle_iter != iterators.end() + ? (*channel_1_iter)->average_outstanding_rpcs() + : Status{StatusCode::kNotFound, ""}; + ++shuffle_iter; + // typename + // std::vector>>::iterator + auto channel_2_iter = *shuffle_iter; + // We want to snapshot these outstanding_rpcs values. + // std::cout << __PRETTY_FUNCTION__ + // << ": check channel 2=" << (channel_2)->get() << std::endl; + auto channel_2_rpcs = shuffle_iter != iterators.end() + ? (*channel_2_iter)->average_outstanding_rpcs() + : Status{StatusCode::kNotFound, ""}; + // This is the ideal (and most common ) case so we try it first. 
+ // std::cout << __PRETTY_FUNCTION__ << ": compare channel rpcs" << + // std::endl; + if (channel_1_rpcs.ok() && channel_2_rpcs.ok()) { + std::cout << __PRETTY_FUNCTION__ << ": 2 ok channels, returning smaller" + << std::endl; + return *channel_1_rpcs < *channel_2_rpcs ? *channel_1_iter + : *channel_2_iter; + } + + // We have one or more bad channels. Spending time finding a good channel + // will be cheaper than trying to use a bad channel in the long run. + std::vector< + typename std::vector>>::iterator> + bad_channel_iters; + + while (!channel_1_rpcs.ok() && shuffle_iter != iterators.end()) { + bad_channel_iters.push_back(channel_1_iter); + ++shuffle_iter; + channel_1_iter = *shuffle_iter; + channel_1_rpcs = shuffle_iter != iterators.end() + ? (*channel_1_iter)->average_outstanding_rpcs() + : Status{StatusCode::kNotFound, ""}; + } + + while (!channel_2_rpcs.ok() && shuffle_iter != iterators.end()) { + bad_channel_iters.push_back(channel_2_iter); + ++shuffle_iter; + channel_2_iter = *shuffle_iter; + channel_2_rpcs = shuffle_iter != iterators.end() + ? (*channel_2_iter)->average_outstanding_rpcs() + : Status{StatusCode::kNotFound, ""}; + } + + EvictBadChannels(lk, bad_channel_iters); + ScheduleRemoveChannel(lk); + + if (channel_1_rpcs.ok() && channel_2_rpcs.ok()) { + std::cout << __PRETTY_FUNCTION__ << ": 2 ok channels" << std::endl; + return *channel_1_rpcs < *channel_2_rpcs ? *channel_1_iter + : *channel_2_iter; + } + if (channel_1_rpcs.ok()) { + std::cout << __PRETTY_FUNCTION__ << ": ONLY channel_1 ok" << std::endl; + // Schedule repopulating the pool. + ScheduleAddChannel(lk); + return *channel_1_iter; + } + if (channel_2_rpcs.ok()) { + std::cout << __PRETTY_FUNCTION__ << ": ONLY channel_2 ok" << std::endl; + // Schedule repopulating the pool. + ScheduleAddChannel(lk); + return *channel_2_iter; + } + + // We have no usable channels in the entire pool; this is bad. + // Create a channel immediately to unblock application. 
+ std::cout << __PRETTY_FUNCTION__ << ": NO USABLE CHANNELS" << std::endl; + channels_.push_back(stub_factory_fn_(next_channel_id_++, true)); + std::swap(channels_.front(), channels_.back()); + // Schedule repopulating the pool. + ScheduleAddChannel(lk); + return channels_.front(); + } + + private: + DynamicChannelPool( + CompletionQueue cq, + std::vector>> initial_wrapped_channels, + std::shared_ptr refresh_state, + StubFactoryFn stub_factory_fn, + bigtable::experimental::DynamicChannelPoolSizingPolicy sizing_policy) + : cq_(std::move(cq)), + refresh_state_(std::move(refresh_state)), + stub_factory_fn_(std::move(stub_factory_fn)), + channels_(std::move(initial_wrapped_channels)), + sizing_policy_(std::move(sizing_policy)), + next_channel_id_(static_cast(channels_.size())) { + sizing_policy_.minimum_channel_pool_size = channels_.size(); + } + + struct ChannelAddVisitor { + std::size_t pool_size; + explicit ChannelAddVisitor(std::size_t pool_size) : pool_size(pool_size) {} + std::size_t operator()( + typename bigtable::experimental::DynamicChannelPoolSizingPolicy:: + DiscreteChannels const& c) { + return c.number; + } + + std::size_t operator()( + typename bigtable::experimental::DynamicChannelPoolSizingPolicy:: + PercentageOfPoolSize const& c) { + return static_cast( + std::floor(static_cast(pool_size) * c.percentage)); + } + }; + + // Determines the number of channels to add and reserves the channel ids to + // be used. Lastly, it calls CompletionQueue::RunAsync with a callback that + // executes AddChannel with the reserved ids. + void ScheduleAddChannel(std::unique_lock const&) { + std::size_t num_channels_to_add; + // If we're undersized due to bad channels, get us back to the minimum size. 
+ if (channels_.size() < sizing_policy_.minimum_channel_pool_size) { + num_channels_to_add = + sizing_policy_.minimum_channel_pool_size - channels_.size(); + } else { + num_channels_to_add = + std::min(sizing_policy_.maximum_channel_pool_size - channels_.size(), + absl::visit(ChannelAddVisitor(channels_.size()), + sizing_policy_.channels_to_add_per_resize)); + } + std::vector new_channel_ids; + new_channel_ids.reserve(num_channels_to_add); + for (std::size_t i = 0; i < num_channels_to_add; ++i) { + new_channel_ids.push_back(next_channel_id_++); + } + + std::weak_ptr> foo = this->shared_from_this(); + cq_.RunAsync([new_channel_ids = std::move(new_channel_ids), + weak = std::move(foo)]() { + if (auto self = weak.lock()) { + self->AddChannel(new_channel_ids); + } + }); + } + + // Creates the new channels using the stub_factory_fn and only after that + // locks the mutex to add the new channels. + void AddChannel(std::vector const& new_channel_ids) { + std::vector>> new_stubs; + new_stubs.reserve(new_channel_ids.size()); + for (auto const& id : new_channel_ids) { + new_stubs.push_back(stub_factory_fn_(id, true)); + } + std::unique_lock lk(mu_); + channels_.insert(channels_.end(), + std::make_move_iterator(new_stubs.begin()), + std::make_move_iterator(new_stubs.end())); + } + + // Calls CompletionQueue::MakeRelativeTimer using + // remove_channel_polling_interval with a callback that executes + // RemoveChannel. 
+ void ScheduleRemoveChannel(std::unique_lock const&) { + if (remove_channel_poll_timer_.valid()) return; + std::cout << __PRETTY_FUNCTION__ << ": set remove_channel_poll_timer" + << std::endl; + std::weak_ptr> foo = this->shared_from_this(); + remove_channel_poll_timer_ = + cq_.MakeRelativeTimer(sizing_policy_.remove_channel_polling_interval) + .then( + [weak = std::move(foo)]( + future> f) { + if (f.get().ok()) { + if (auto self = weak.lock()) { + self->RemoveChannel(); + } + } + }); + } + + // Locks the mutex, reverse sorts draining_channels_, calling pop_back until + // either draining_channels_ is empty or a channel with outstanding_rpcs is + // encountered. Calls ScheduleRemoveChannel if draining_channels_ is + // non-empty. + void RemoveChannel() { + std::unique_lock lk(mu_); + std::sort(draining_channels_.begin(), draining_channels_.end(), + [](std::shared_ptr> const& a, + std::shared_ptr> b) { + auto rpcs_a = a->instant_outstanding_rpcs(); + auto rpcs_b = b->instant_outstanding_rpcs(); + if (!rpcs_a.ok()) return false; + if (!rpcs_b.ok()) return true; + return *rpcs_a > *rpcs_b; + }); + while (!draining_channels_.empty()) { + auto outstanding_rpcs = + draining_channels_.back()->instant_outstanding_rpcs(); + if (outstanding_rpcs.ok() && *outstanding_rpcs > 0) { + ScheduleRemoveChannel(lk); + return; + } + draining_channels_.pop_back(); + } + // TODO(sdhart): If iterators becomes a member variable perhaps add logic to + // call + // shrink_to_fit on iterators_ if there's a large + // difference between iterators_.capacity and channels_.size + } + + void EvictBadChannels( + std::unique_lock const&, + std::vector< + typename std::vector>>::iterator>& + bad_channel_iters) { + auto back_iter = channels_.rbegin(); + for (auto& bad_channel_iter : bad_channel_iters) { + bool swapped = false; + while (!swapped) { + auto b = (*back_iter)->instant_outstanding_rpcs(); + if (b.ok()) { + std::swap(*back_iter, *bad_channel_iter); + 
draining_channels_.push_back(std::move(*back_iter)); + swapped = true; + } + ++back_iter; + } + } + for (std::size_t i = 0; i < bad_channel_iters.size(); ++i) { + channels_.pop_back(); + } + } + + void SetResizeCooldownTimer(std::unique_lock const&) { + pool_resize_cooldown_timer_ = + cq_.MakeRelativeTimer(sizing_policy_.pool_resize_cooldown_interval); + } + + // Computes the average_rpc_per_channel across all channels in the pool, + // excluding any channels that are awaiting removal in draining_channels_. + // The computed average is compared to the thresholds in the sizing policy + // and calls either ScheduleRemoveChannel or ScheduleAddChannel as + // appropriate. If either is called the resize_cooldown_timer is also set. + void CheckPoolChannelHealth(std::unique_lock const& lk) { + int average_rpc_per_channel = + std::accumulate(channels_.begin(), channels_.end(), 0, + [](int a, auto const& b) { + auto rpcs_b = b->average_outstanding_rpcs(); + return a + (rpcs_b.ok() ? *rpcs_b : 0); + }) / + static_cast(channels_.size()); + std::cout << __PRETTY_FUNCTION__ + << ": channels_.size()=" << channels_.size() + << "; sizing_policy_.minimum_channel_pool_size=" + << sizing_policy_.minimum_channel_pool_size << std::endl; + // TODO(sdhart): do we need to check if we're over max pool size here? + if (average_rpc_per_channel < + sizing_policy_.minimum_average_outstanding_rpcs_per_channel && + channels_.size() > sizing_policy_.minimum_channel_pool_size) { + auto random_channel = std::uniform_int_distribution( + 0, channels_.size() - 1)(rng_); + std::swap(channels_[random_channel], channels_.back()); + draining_channels_.push_back(std::move(channels_.back())); + channels_.pop_back(); + ScheduleRemoveChannel(lk); + SetResizeCooldownTimer(lk); + } + // TODO(sdhart): do we need to check if we're under min pool size here? 
+ if (average_rpc_per_channel > + sizing_policy_.maximum_average_outstanding_rpcs_per_channel && + channels_.size() < sizing_policy_.maximum_channel_pool_size) { + // Channel/stub creation is expensive, instead of making the current RPC + // wait on this, use an existing channel right now, and schedule a channel + // to be added. + ScheduleAddChannel(lk); + SetResizeCooldownTimer(lk); + } + } + + mutable std::mutex mu_; + CompletionQueue cq_; + google::cloud::internal::DefaultPRNG rng_; + std::shared_ptr refresh_state_; + StubFactoryFn stub_factory_fn_; + std::vector>> channels_; + bigtable::experimental::DynamicChannelPoolSizingPolicy sizing_policy_; + std::vector>> draining_channels_; + future remove_channel_poll_timer_; + future> + pool_resize_cooldown_timer_; + std::uint32_t next_channel_id_; + // std::vector< + // typename + // std::vector>>::iterator> + // iterators_; +}; + +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END +} // namespace bigtable_internal +} // namespace cloud +} // namespace google + +#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_INTERNAL_DYNAMIC_CHANNEL_POOL_H diff --git a/google/cloud/bigtable/internal/dynamic_channel_pool_test.cc b/google/cloud/bigtable/internal/dynamic_channel_pool_test.cc new file mode 100644 index 0000000000000..176c40ef78aaa --- /dev/null +++ b/google/cloud/bigtable/internal/dynamic_channel_pool_test.cc @@ -0,0 +1,152 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include "google/cloud/bigtable/internal/dynamic_channel_pool.h" +#include "google/cloud/bigtable/testing/mock_bigtable_stub.h" +#include "google/cloud/internal/make_status.h" +#include "google/cloud/testing_util/fake_clock.h" +#include "google/cloud/testing_util/fake_completion_queue_impl.h" +#include "google/cloud/testing_util/status_matchers.h" +#include + +namespace google { +namespace cloud { +namespace bigtable_internal { +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN +namespace { + +using ::google::cloud::bigtable::testing::MockBigtableStub; +using ::google::cloud::testing_util::FakeCompletionQueueImpl; +using ::google::cloud::testing_util::IsOkAndHolds; +using ::google::cloud::testing_util::StatusIs; +using ::testing::Eq; + +TEST(ChannelUsageTest, SetChannel) { + auto mock = std::make_shared(); + auto channel = std::make_shared>(); + EXPECT_THAT(channel->AcquireStub(), Eq(nullptr)); + channel->set_channel(mock); + EXPECT_THAT(channel->AcquireStub(), Eq(mock)); + auto mock2 = std::make_shared(); + channel->set_channel(mock2); + EXPECT_THAT(channel->AcquireStub(), Eq(mock)); +} + +TEST(ChannelUsageTest, InstantOutstandingRpcs) { + // auto clock = std::make_shared(); + auto mock = std::make_shared(); + auto channel = std::make_shared>(mock); + + auto stub = channel->AcquireStub(); + EXPECT_THAT(stub, Eq(mock)); + EXPECT_THAT(channel->instant_outstanding_rpcs(), IsOkAndHolds(1)); + stub = channel->AcquireStub(); + EXPECT_THAT(stub, Eq(mock)); + stub = channel->AcquireStub(); + EXPECT_THAT(stub, Eq(mock)); + EXPECT_THAT(channel->instant_outstanding_rpcs(), IsOkAndHolds(3)); + channel->ReleaseStub(); + EXPECT_THAT(channel->instant_outstanding_rpcs(), IsOkAndHolds(2)); + channel->ReleaseStub(); + channel->ReleaseStub(); + EXPECT_THAT(channel->instant_outstanding_rpcs(), IsOkAndHolds(0)); +} + +TEST(ChannelUsageTest, SetLastRefreshStatus) { + auto mock = std::make_shared(); + auto channel = std::make_shared>(mock); + Status expected_status = 
internal::InternalError("uh oh"); + (void)channel->AcquireStub(); + EXPECT_THAT(channel->instant_outstanding_rpcs(), IsOkAndHolds(1)); + channel->set_last_refresh_status(expected_status); + EXPECT_THAT(channel->instant_outstanding_rpcs(), + StatusIs(expected_status.code())); + EXPECT_THAT(channel->average_outstanding_rpcs(), + StatusIs(expected_status.code())); +} + +TEST(ChannelUsageTest, AverageOutstandingRpcs) { + auto clock = std::make_shared(); + auto mock = std::make_shared(); + auto channel = std::make_shared>(mock, clock); + EXPECT_THAT(channel->instant_outstanding_rpcs(), IsOkAndHolds(0)); + + auto start = std::chrono::steady_clock::now(); + clock->SetTime(start); + // measurements: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 + for (int i = 0; i < 10; ++i) (void)channel->AcquireStub(); + + clock->AdvanceTime(std::chrono::seconds(20)); + // measurements: 11, 12, 13, 14, 15, 16, 17, 18, 19, 20 + for (int i = 0; i < 10; ++i) (void)channel->AcquireStub(); + + clock->AdvanceTime(std::chrono::seconds(30)); + // measurements: 21, 22, 23, 24, 25, 26, 27, 28, 29, 30 + for (int i = 0; i < 10; ++i) (void)channel->AcquireStub(); + + clock->AdvanceTime(std::chrono::seconds(20)); + // FLOOR(SUM(11...30) / 20) = 20 + EXPECT_THAT(channel->average_outstanding_rpcs(), IsOkAndHolds(20)); + + clock->AdvanceTime(std::chrono::seconds(20)); + // FLOOR(SUM(21...30) / 10) = 25 + EXPECT_THAT(channel->average_outstanding_rpcs(), IsOkAndHolds(25)); + // measurements: 29, 28, 27, 26, 25, 24, 23, 22, 21, 20 + for (int i = 0; i < 10; ++i) channel->ReleaseStub(); + // FLOOR((SUM(21...30) + SUM(20...29)) / 20) = 25 + EXPECT_THAT(channel->average_outstanding_rpcs(), IsOkAndHolds(25)); + + clock->AdvanceTime(std::chrono::seconds(61)); + // All measurements have aged out. 
+ EXPECT_THAT(channel->average_outstanding_rpcs(), IsOkAndHolds(0)); +} + +TEST(ChannelUsageTest, MakeWeak) { + auto channel = std::make_shared>(); + auto weak = channel->MakeWeak(); + EXPECT_THAT(weak.lock(), Eq(channel)); +} + +TEST(DynamicChannelPoolTest, GetChannelRandomTwoLeastUsed) { + auto fake_cq_impl = std::make_shared(); + + auto refresh_state = std::make_shared( + fake_cq_impl, std::chrono::milliseconds(1), + std::chrono::milliseconds(10)); + + auto stub_factory_fn = + [](int, bool) -> std::shared_ptr> { + auto mock = std::make_shared(); + return std::make_shared>(mock); + }; + + bigtable::experimental::DynamicChannelPoolSizingPolicy sizing_policy; + + std::vector>> channels(10); + int id = 0; + std::generate(channels.begin(), channels.end(), + [&]() { return stub_factory_fn(id++, false); }); + + auto pool = DynamicChannelPool::Create( + CompletionQueue(fake_cq_impl), channels, refresh_state, stub_factory_fn, + sizing_policy); + + auto selected_stub = pool->GetChannelRandomTwoLeastUsed(); +} + +} // namespace +GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END +} // namespace bigtable_internal +} // namespace cloud +} // namespace google diff --git a/google/cloud/bigtable/options.h b/google/cloud/bigtable/options.h index 865d5c009547e..11e03700a2a31 100644 --- a/google/cloud/bigtable/options.h +++ b/google/cloud/bigtable/options.h @@ -203,6 +203,53 @@ struct QueryPlanRefreshFunctionRetryPolicyOption { using Type = std::shared_ptr; }; +/** + * Option to select between a static sized or dynamically resizing channel pool. + * The default is the static sized pool. + */ +enum class ChannelPoolType { kStatic, kDynamic }; +struct ChannelPoolTypeOption { + using Type = ChannelPoolType; +}; + +struct DynamicChannelPoolSizingPolicy { + // To avoid channel churn, the pool will not add or remove channels more + // frequently than this period. 
+ std::chrono::milliseconds pool_resize_cooldown_interval = + std::chrono::seconds(60); + + struct DiscreteChannels { + int number; + }; + struct PercentageOfPoolSize { + double percentage; + }; + absl::variant + channels_to_add_per_resize = DiscreteChannels{1}; + + // If the average number of outstanding RPCs is below this threshold, + // the pool size will be decreased. + int minimum_average_outstanding_rpcs_per_channel = 1; + // If the average number of outstanding RPCs is above this threshold, + // the pool size will be increased. + int maximum_average_outstanding_rpcs_per_channel = 25; + + // When channels are removed from the pool, we have to wait until all + // outstanding RPCs on that channel are completed before destroying it. + std::chrono::milliseconds remove_channel_polling_interval = + std::chrono::seconds(30); + + // Limits how large the pool can grow. Default is twice the minimum_pool_size. + std::size_t maximum_channel_pool_size; + + // This is set to the value of GrpcNumChannelsOption. 
+ std::size_t minimum_channel_pool_size; +}; + +struct DynamicChannelPoolSizingPolicyOption { + using Type = DynamicChannelPoolSizingPolicy; +}; + } // namespace experimental /// The complete list of options accepted by `bigtable::*Client` diff --git a/google/cloud/bigtable/table.cc b/google/cloud/bigtable/table.cc index 14b96ef6715d3..0bba7b934d7a5 100644 --- a/google/cloud/bigtable/table.cc +++ b/google/cloud/bigtable/table.cc @@ -222,6 +222,7 @@ RowReader Table::ReadRows(RowSet row_set, Filter filter, Options opts) { RowReader Table::ReadRows(RowSet row_set, std::int64_t rows_limit, Filter filter, Options opts) { + std::cout << __PRETTY_FUNCTION__ << std::endl; if (connection_) { OptionsSpan span(MergeOptions(std::move(opts), options_)); return connection_->ReadRows(table_name_, std::move(row_set), rows_limit, diff --git a/google/cloud/bigtable/test_proxy/cbt_test_proxy.cc b/google/cloud/bigtable/test_proxy/cbt_test_proxy.cc index 0a390eaee1528..6df93aa3c2c6f 100644 --- a/google/cloud/bigtable/test_proxy/cbt_test_proxy.cc +++ b/google/cloud/bigtable/test_proxy/cbt_test_proxy.cc @@ -93,6 +93,8 @@ grpc::Status CbtTestProxy::CreateClient( auto options = Options{} + .set( + bigtable::experimental::ChannelPoolType::kDynamic) .set(request->data_target()) .set(grpc::InsecureChannelCredentials()) .set(std::chrono::milliseconds(0)) diff --git a/google/cloud/bigtable/testing/table_integration_test.cc b/google/cloud/bigtable/testing/table_integration_test.cc index d9f8156437f7b..e41e1369e385b 100644 --- a/google/cloud/bigtable/testing/table_integration_test.cc +++ b/google/cloud/bigtable/testing/table_integration_test.cc @@ -122,7 +122,12 @@ void TableAdminTestEnvironment::TearDown() { } void TableIntegrationTest::SetUp() { - data_connection_ = MakeDataConnection(); + std::cout << __PRETTY_FUNCTION__ << std::endl; + auto options = Options{} + .set( + experimental::ChannelPoolType::kDynamic) + .set(10); + data_connection_ = MakeDataConnection(options); data_client_ = 
bigtable::MakeDataClient(TableTestEnvironment::project_id(), TableTestEnvironment::instance_id()); diff --git a/google/cloud/bigtable/tests/data_integration_test.cc b/google/cloud/bigtable/tests/data_integration_test.cc index 9a835f61176e3..f373fd5bf57e3 100644 --- a/google/cloud/bigtable/tests/data_integration_test.cc +++ b/google/cloud/bigtable/tests/data_integration_test.cc @@ -206,6 +206,7 @@ TEST_P(DataIntegrationTest, TableReadRowNotExistTest) { } TEST_P(DataIntegrationTest, TableReadRowsAllRows) { + std::cout << __PRETTY_FUNCTION__ << ": ENTER TEST CASE" << std::endl; auto table = GetTable(GetParam()); std::string const row_key1 = "row-key-1"; std::string const row_key2 = "row-key-2"; @@ -219,6 +220,7 @@ TEST_P(DataIntegrationTest, TableReadRowsAllRows) { CreateCells(table, created); + std::cout << __PRETTY_FUNCTION__ << ": START READING ROWS" << std::endl; // Some equivalent ways to read the three rows auto read1 = table.ReadRows(RowSet(RowRange::InfiniteRange()), Filter::PassAllFilter());