From 54736368e1f7e4377f36b5515496e5013333da30 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Sat, 14 Feb 2026 20:31:33 -0500 Subject: [PATCH 1/2] feat: add read attempt and operation timeout to ExportJob settings --- .../wrappers/veneer/DataClientVeneerApi.java | 8 +++++++- .../cloud/bigtable/beam/TemplateUtils.java | 16 ++++++++++++++++ .../bigtable/beam/sequencefiles/ExportJob.java | 12 ++++++++++++ 3 files changed, 35 insertions(+), 1 deletion(-) diff --git a/bigtable-client-core-parent/bigtable-hbase/src/main/java/com/google/cloud/bigtable/hbase/wrappers/veneer/DataClientVeneerApi.java b/bigtable-client-core-parent/bigtable-hbase/src/main/java/com/google/cloud/bigtable/hbase/wrappers/veneer/DataClientVeneerApi.java index bdcb462653..2119e723fe 100644 --- a/bigtable-client-core-parent/bigtable-hbase/src/main/java/com/google/cloud/bigtable/hbase/wrappers/veneer/DataClientVeneerApi.java +++ b/bigtable-client-core-parent/bigtable-hbase/src/main/java/com/google/cloud/bigtable/hbase/wrappers/veneer/DataClientVeneerApi.java @@ -33,6 +33,7 @@ import com.google.cloud.bigtable.data.v2.models.Row; import com.google.cloud.bigtable.data.v2.models.RowMutation; import com.google.cloud.bigtable.hbase.adapters.Adapters; +import com.google.cloud.bigtable.hbase.util.Logger; import com.google.cloud.bigtable.hbase.wrappers.BulkMutationWrapper; import com.google.cloud.bigtable.hbase.wrappers.BulkReadWrapper; import com.google.cloud.bigtable.hbase.wrappers.DataClientWrapper; @@ -59,11 +60,14 @@ import org.apache.hadoop.hbase.client.AbstractClientScanner; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; +import org.threeten.bp.Duration; /** For internal use only - public for technical reasons. */ @InternalApi("For internal usage only") public class DataClientVeneerApi implements DataClientWrapper { + private final Logger LOG = new Logger(DataClientVeneerApi.class); + private static final RowResultAdapter RESULT_ADAPTER = new RowResultAdapter(); private final BigtableDataClient delegate; @@ -203,7 +207,9 @@ private GrpcCallContext createScanCallContext() { callSettings.getOperationTimeout().get().toMillis(), TimeUnit.MILLISECONDS))); } if (callSettings.getAttemptTimeout().isPresent()) { - ctx = ctx.withTimeout(callSettings.getAttemptTimeout().get()); + Duration attemptTimeout = callSettings.getAttemptTimeout().get(); + LOG.info("effective attempt timeout for scan is %s", attemptTimeout); + ctx = ctx.withTimeout(attemptTimeout); } return ctx; diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/TemplateUtils.java b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/TemplateUtils.java index f9e1e0b660..241b238d69 100644 --- a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/TemplateUtils.java +++ b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/TemplateUtils.java @@ -90,12 +90,28 @@ public static CloudBigtableScanConfiguration buildExportConfig(ExportOptions opt .withConfiguration( CloudBigtableIO.Reader.RETRY_IDLE_TIMEOUT, String.valueOf(options.getRetryIdleTimeout())) + .withConfiguration( + BigtableOptionsFactory.BIGTABLE_READ_RPC_TIMEOUT_MS_KEY, + String.valueOf(options.getBigtableReadRpcTimeoutMs())) + .withConfiguration( + BigtableOptionsFactory.BIGTABLE_READ_RPC_ATTEMPT_TIMEOUT_MS_KEY, + String.valueOf(options.getBigtableReadRpcAttemptTimeoutMs())) .withScan( new ScanValueProvider( options.getBigtableStartRow(), options.getBigtableStopRow(), options.getBigtableMaxVersions(), options.getBigtableFilter())); + if (options.getBigtableReadRpcTimeoutMs() != null) { + configBuilder.withConfiguration( + BigtableOptionsFactory.BIGTABLE_READ_RPC_TIMEOUT_MS_KEY, + String.valueOf(options.getBigtableReadRpcTimeoutMs())); + } + if (options.getBigtableReadRpcAttemptTimeoutMs() != null) { + configBuilder.withConfiguration( + BigtableOptionsFactory.BIGTABLE_READ_RPC_ATTEMPT_TIMEOUT_MS_KEY, + String.valueOf(options.getBigtableReadRpcAttemptTimeoutMs())); + } return configBuilder.build(); } } diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/sequencefiles/ExportJob.java b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/sequencefiles/ExportJob.java index 740466b05d..ba7001703b 100644 --- a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/sequencefiles/ExportJob.java +++ b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/sequencefiles/ExportJob.java @@ -179,6 +179,18 @@ public interface ExportOptions extends GcpOptions, GcsOptions { @SuppressWarnings("unused") void setRetryIdleTimeout(boolean retryIdleTimeout); + + @Description("Read RPC timeout in milliseconds.") + ValueProvider getBigtableReadRpcTimeoutMs(); + + @SuppressWarnings("unused") + void setBigtableReadRpcTimeoutMs(ValueProvider readRpcTimeoutMs); + + @Description("Read RPC attempt timeout in milliseconds.") + ValueProvider getBigtableReadRpcAttemptTimeoutMs(); + + @SuppressWarnings("unused") + void setBigtableReadRpcAttemptTimeoutMs(ValueProvider readRpcAttemptTimeoutMs); } public static void main(String[] args) { From f73300bd13c3165d04a8f21d1a6e9a91eba9e102 Mon Sep 17 00:00:00 2001 From: Mattie Fu Date: Sat, 14 Feb 2026 21:35:03 -0500 Subject: [PATCH 2/2] use nested value provider --- .../cloud/bigtable/beam/TemplateUtils.java | 31 ++++++++++++------- 1 file changed, 19 insertions(+), 12 deletions(-) diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/TemplateUtils.java b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/TemplateUtils.java index 241b238d69..b1c4587f05 100644 --- a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/TemplateUtils.java +++ b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/TemplateUtils.java @@ -90,12 +90,6 @@ public static CloudBigtableScanConfiguration buildExportConfig(ExportOptions opt .withConfiguration( CloudBigtableIO.Reader.RETRY_IDLE_TIMEOUT, String.valueOf(options.getRetryIdleTimeout())) - .withConfiguration( - BigtableOptionsFactory.BIGTABLE_READ_RPC_TIMEOUT_MS_KEY, - String.valueOf(options.getBigtableReadRpcTimeoutMs())) - .withConfiguration( - BigtableOptionsFactory.BIGTABLE_READ_RPC_ATTEMPT_TIMEOUT_MS_KEY, - String.valueOf(options.getBigtableReadRpcAttemptTimeoutMs())) .withScan( new ScanValueProvider( options.getBigtableStartRow(), @@ -103,14 +97,27 @@ public static CloudBigtableScanConfiguration buildExportConfig(ExportOptions opt options.getBigtableMaxVersions(), options.getBigtableFilter())); if (options.getBigtableReadRpcTimeoutMs() != null) { - configBuilder.withConfiguration( - BigtableOptionsFactory.BIGTABLE_READ_RPC_TIMEOUT_MS_KEY, - String.valueOf(options.getBigtableReadRpcTimeoutMs())); + ValueProvider.NestedValueProvider.of( + options.getBigtableReadRpcTimeoutMs(), + (Integer timeout) -> { + if (timeout != null) { + configBuilder.withConfiguration( + BigtableOptionsFactory.BIGTABLE_READ_RPC_TIMEOUT_MS_KEY, String.valueOf(timeout)); + } + return null; + }); } if (options.getBigtableReadRpcAttemptTimeoutMs() != null) { - configBuilder.withConfiguration( - BigtableOptionsFactory.BIGTABLE_READ_RPC_ATTEMPT_TIMEOUT_MS_KEY, - String.valueOf(options.getBigtableReadRpcAttemptTimeoutMs())); + ValueProvider.NestedValueProvider.of( + options.getBigtableReadRpcAttemptTimeoutMs(), + (Integer timeout) -> { + if (timeout != null) { + configBuilder.withConfiguration( + BigtableOptionsFactory.BIGTABLE_READ_RPC_ATTEMPT_TIMEOUT_MS_KEY, + String.valueOf(timeout)); + } + return null; + }); } return configBuilder.build(); }