diff --git a/bigtable-client-core-parent/bigtable-hbase/src/main/java/com/google/cloud/bigtable/hbase/wrappers/veneer/DataClientVeneerApi.java b/bigtable-client-core-parent/bigtable-hbase/src/main/java/com/google/cloud/bigtable/hbase/wrappers/veneer/DataClientVeneerApi.java index bdcb462653..2119e723fe 100644 --- a/bigtable-client-core-parent/bigtable-hbase/src/main/java/com/google/cloud/bigtable/hbase/wrappers/veneer/DataClientVeneerApi.java +++ b/bigtable-client-core-parent/bigtable-hbase/src/main/java/com/google/cloud/bigtable/hbase/wrappers/veneer/DataClientVeneerApi.java @@ -33,6 +33,7 @@ import com.google.cloud.bigtable.data.v2.models.Row; import com.google.cloud.bigtable.data.v2.models.RowMutation; import com.google.cloud.bigtable.hbase.adapters.Adapters; +import com.google.cloud.bigtable.hbase.util.Logger; import com.google.cloud.bigtable.hbase.wrappers.BulkMutationWrapper; import com.google.cloud.bigtable.hbase.wrappers.BulkReadWrapper; import com.google.cloud.bigtable.hbase.wrappers.DataClientWrapper; @@ -59,11 +60,14 @@ import org.apache.hadoop.hbase.client.AbstractClientScanner; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; +import org.threeten.bp.Duration; /** For internal use only - public for technical reasons. */ @InternalApi("For internal usage only") public class DataClientVeneerApi implements DataClientWrapper { + private final Logger LOG = new Logger(DataClientVeneerApi.class); + private static final RowResultAdapter RESULT_ADAPTER = new RowResultAdapter(); private final BigtableDataClient delegate; @@ -203,7 +207,9 @@ private GrpcCallContext createScanCallContext() { callSettings.getOperationTimeout().get().toMillis(), TimeUnit.MILLISECONDS))); } if (callSettings.getAttemptTimeout().isPresent()) { - ctx = ctx.withTimeout(callSettings.getAttemptTimeout().get()); + Duration attemptTimeout = callSettings.getAttemptTimeout().get(); + LOG.info("effective attempt timeout for scan is %s", attemptTimeout); + ctx = ctx.withTimeout(attemptTimeout); } return ctx; diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/TemplateUtils.java b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/TemplateUtils.java index f9e1e0b660..b1c4587f05 100644 --- a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/TemplateUtils.java +++ b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/TemplateUtils.java @@ -96,6 +96,29 @@ public static CloudBigtableScanConfiguration buildExportConfig(ExportOptions opt options.getBigtableStopRow(), options.getBigtableMaxVersions(), options.getBigtableFilter())); + if (options.getBigtableReadRpcTimeoutMs() != null) { + ValueProvider.NestedValueProvider.of( + options.getBigtableReadRpcTimeoutMs(), + (Integer timeout) -> { + if (timeout != null) { + configBuilder.withConfiguration( + BigtableOptionsFactory.BIGTABLE_READ_RPC_TIMEOUT_MS_KEY, String.valueOf(timeout)); + } + return null; + }); + } + if (options.getBigtableReadRpcAttemptTimeoutMs() != null) { + ValueProvider.NestedValueProvider.of( + options.getBigtableReadRpcAttemptTimeoutMs(), + (Integer timeout) -> { + if (timeout != null) { + configBuilder.withConfiguration( + BigtableOptionsFactory.BIGTABLE_READ_RPC_ATTEMPT_TIMEOUT_MS_KEY, + String.valueOf(timeout)); + } + return null; + }); + } return configBuilder.build(); } } diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/sequencefiles/ExportJob.java b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/sequencefiles/ExportJob.java index 740466b05d..ba7001703b 100644 --- a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/sequencefiles/ExportJob.java +++ b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/sequencefiles/ExportJob.java @@ -179,6 +179,18 @@ public interface ExportOptions extends GcpOptions, GcsOptions { @SuppressWarnings("unused") void setRetryIdleTimeout(boolean retryIdleTimeout); + + @Description("Read RPC timeout in milliseconds.") + ValueProvider getBigtableReadRpcTimeoutMs(); + + @SuppressWarnings("unused") + void setBigtableReadRpcTimeoutMs(ValueProvider readRpcTimeoutMs); + + @Description("Read RPC attempt timeout in milliseconds.") + ValueProvider getBigtableReadRpcAttemptTimeoutMs(); + + @SuppressWarnings("unused") + void setBigtableReadRpcAttemptTimeoutMs(ValueProvider readRpcAttemptTimeoutMs); } public static void main(String[] args) {