Skip to content

Commit 9c0df54

Browse files
authored
feat: new queryWithTimeout method for customer-side wait (#3995)
* feat: new queryNoWait method * Rename method to queryWithTimeout * rename to queryWithTimeout() * lint * lint * add tests * Update clirr ignore
1 parent 1d8977d commit 9c0df54

File tree

5 files changed

+137
-46
lines changed

5 files changed

+137
-46
lines changed

google-cloud-bigquery/clirr-ignored-differences.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,11 @@
117117
<className>com/google/cloud/bigquery/TableInfo*</className>
118118
<method>*ResourceTags(*)</method>
119119
</difference>
120+
<difference>
121+
<differenceType>7012</differenceType>
122+
<className>com/google/cloud/bigquery/BigQuery</className>
123+
<method>java.lang.Object queryWithTimeout(com.google.cloud.bigquery.QueryJobConfiguration, com.google.cloud.bigquery.JobId, java.lang.Long, com.google.cloud.bigquery.BigQuery$JobOption[])</method>
124+
</difference>
120125
<difference>
121126
<differenceType>7012</differenceType>
122127
<className>com/google/cloud/bigquery/Connection</className>

google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQuery.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1609,6 +1609,28 @@ TableResult query(QueryJobConfiguration configuration, JobOption... options)
16091609
TableResult query(QueryJobConfiguration configuration, JobId jobId, JobOption... options)
16101610
throws InterruptedException, JobException;
16111611

1612+
/**
1613+
* Starts the query associated with the request, using the given JobId. It returns either
1614+
* TableResult for quick queries or Job object for long-running queries.
1615+
*
1616+
* <p>If the location of the job is not "US" or "EU", the {@code jobId} must specify the job
1617+
* location.
1618+
*
1619+
* <p>This method cannot be used in conjunction with {@link QueryJobConfiguration#dryRun()}
1620+
* queries. Since dry-run queries are not actually executed, there's no way to retrieve results.
1621+
*
1622+
* <p>See {@link #query(QueryJobConfiguration, JobOption...)} for examples on populating a {@link
1623+
* QueryJobConfiguration}.
1624+
*
1625+
* @throws BigQueryException upon failure
1626+
* @throws InterruptedException if the current thread gets interrupted while waiting for the query
1627+
* to complete
1628+
* @throws JobException if the job completes unsuccessfully
1629+
*/
1630+
Object queryWithTimeout(
1631+
QueryJobConfiguration configuration, JobId jobId, Long timeoutMs, JobOption... options)
1632+
throws InterruptedException, JobException;
1633+
16121634
/**
16131635
* Returns results of the query associated with the provided job.
16141636
*

google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryImpl.java

Lines changed: 31 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1925,48 +1925,10 @@ public Boolean call() throws IOException {
19251925
@Override
19261926
public TableResult query(QueryJobConfiguration configuration, JobOption... options)
19271927
throws InterruptedException, JobException {
1928-
Job.checkNotDryRun(configuration, "query");
1929-
1930-
configuration =
1931-
configuration.toBuilder()
1932-
.setJobCreationMode(getOptions().getDefaultJobCreationMode())
1933-
.build();
1934-
1935-
Span querySpan = null;
1936-
if (getOptions().isOpenTelemetryTracingEnabled()
1937-
&& getOptions().getOpenTelemetryTracer() != null) {
1938-
querySpan =
1939-
getOptions()
1940-
.getOpenTelemetryTracer()
1941-
.spanBuilder("com.google.cloud.bigquery.BigQuery.query")
1942-
.setAllAttributes(otelAttributesFromOptions(options))
1943-
.startSpan();
1944-
}
1945-
try (Scope queryScope = querySpan != null ? querySpan.makeCurrent() : null) {
1946-
// If all parameters passed in configuration are supported by the query() method on the
1947-
// backend,
1948-
// put on fast path
1949-
QueryRequestInfo requestInfo =
1950-
new QueryRequestInfo(configuration, getOptions().getUseInt64Timestamps());
1951-
if (requestInfo.isFastQuerySupported(null)) {
1952-
String projectId = getOptions().getProjectId();
1953-
QueryRequest content = requestInfo.toPb();
1954-
if (getOptions().getLocation() != null) {
1955-
content.setLocation(getOptions().getLocation());
1956-
}
1957-
return queryRpc(projectId, content, options);
1958-
}
1959-
// Otherwise, fall back to the existing create query job logic
1960-
return create(JobInfo.of(configuration), options).getQueryResults();
1961-
} finally {
1962-
if (querySpan != null) {
1963-
querySpan.end();
1964-
}
1965-
}
1928+
return query(configuration, null, options);
19661929
}
19671930

1968-
private TableResult queryRpc(
1969-
final String projectId, final QueryRequest content, JobOption... options)
1931+
private Object queryRpc(final String projectId, final QueryRequest content, JobOption... options)
19701932
throws InterruptedException {
19711933
com.google.api.services.bigquery.model.QueryResponse results;
19721934
Span queryRpc = null;
@@ -2030,7 +1992,7 @@ public com.google.api.services.bigquery.model.QueryResponse call()
20301992
// here, but this is left as future work.
20311993
JobId jobId = JobId.fromPb(results.getJobReference());
20321994
Job job = getJob(jobId, options);
2033-
return job.getQueryResults();
1995+
return job;
20341996
}
20351997

20361998
if (results.getPageToken() != null) {
@@ -2070,16 +2032,35 @@ public com.google.api.services.bigquery.model.QueryResponse call()
20702032
@Override
20712033
public TableResult query(QueryJobConfiguration configuration, JobId jobId, JobOption... options)
20722034
throws InterruptedException, JobException {
2035+
Object result = queryWithTimeout(configuration, jobId, null, options);
2036+
if (result instanceof Job) {
2037+
return ((Job) result).getQueryResults();
2038+
}
2039+
return (TableResult) result;
2040+
}
2041+
2042+
@Override
2043+
public Object queryWithTimeout(
2044+
QueryJobConfiguration configuration, JobId jobId, Long timeoutMs, JobOption... options)
2045+
throws InterruptedException, JobException {
20732046
Job.checkNotDryRun(configuration, "query");
20742047

2048+
// If JobCreationMode is not explicitly set, update it with default value;
2049+
if (configuration.getJobCreationMode() == null) {
2050+
configuration =
2051+
configuration.toBuilder()
2052+
.setJobCreationMode(getOptions().getDefaultJobCreationMode())
2053+
.build();
2054+
}
2055+
20752056
Span querySpan = null;
20762057
if (getOptions().isOpenTelemetryTracingEnabled()
20772058
&& getOptions().getOpenTelemetryTracer() != null) {
20782059
querySpan =
20792060
getOptions()
20802061
.getOpenTelemetryTracer()
2081-
.spanBuilder("com.google.cloud.bigquery.BigQuery.query")
2082-
.setAllAttributes(jobId.getOtelAttributes())
2062+
.spanBuilder("com.google.cloud.bigquery.BigQuery.queryWithTimeout")
2063+
.setAllAttributes(jobId != null ? jobId.getOtelAttributes() : null)
20832064
.setAllAttributes(otelAttributesFromOptions(options))
20842065
.startSpan();
20852066
}
@@ -2095,18 +2076,23 @@ && getOptions().getOpenTelemetryTracer() != null) {
20952076
// fail with "Access denied" if the project do not have enough permissions to run the job.
20962077

20972078
String projectId =
2098-
jobId.getProject() != null ? jobId.getProject() : getOptions().getProjectId();
2079+
jobId != null && jobId.getProject() != null
2080+
? jobId.getProject()
2081+
: getOptions().getProjectId();
20992082
QueryRequest content = requestInfo.toPb();
21002083
// Be careful when setting the location, if a location is specified in the BigQueryOption or
21012084
// JobId the job created by the query method will be in that location, even if the table to
21022085
// be
21032086
// queried is in a different location. This may cause the query to fail with
21042087
// "BigQueryException: Not found"
2105-
if (jobId.getLocation() != null) {
2088+
if (jobId != null && jobId.getLocation() != null) {
21062089
content.setLocation(jobId.getLocation());
21072090
} else if (getOptions().getLocation() != null) {
21082091
content.setLocation(getOptions().getLocation());
21092092
}
2093+
if (timeoutMs != null) {
2094+
content.setTimeoutMs(timeoutMs);
2095+
}
21102096

21112097
return queryRpc(projectId, content, options);
21122098
}

google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/BigQueryImplTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2610,6 +2610,29 @@ PROJECT, JOB, null, optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS)))
26102610
PROJECT, DATASET, TABLE, Collections.<BigQueryRpc.Option, Object>emptyMap());
26112611
}
26122612

2613+
@Test
2614+
public void testQueryWithTimeoutSetsTimeout() throws InterruptedException, IOException {
2615+
com.google.api.services.bigquery.model.QueryResponse queryResponsePb =
2616+
new com.google.api.services.bigquery.model.QueryResponse()
2617+
.setCacheHit(false)
2618+
.setJobComplete(true)
2619+
.setKind("bigquery#queryResponse")
2620+
.setPageToken(null)
2621+
.setRows(ImmutableList.of(TABLE_ROW))
2622+
.setSchema(TABLE_SCHEMA.toPb())
2623+
.setTotalBytesProcessed(42L)
2624+
.setTotalRows(BigInteger.valueOf(1L));
2625+
2626+
when(bigqueryRpcMock.queryRpcSkipExceptionTranslation(eq(PROJECT), requestPbCapture.capture()))
2627+
.thenReturn(queryResponsePb);
2628+
2629+
bigquery = options.getService();
2630+
Object result = bigquery.queryWithTimeout(QUERY_JOB_CONFIGURATION_FOR_QUERY, null, 1000L);
2631+
assertTrue(result instanceof TableResult);
2632+
QueryRequest requestPb = requestPbCapture.getValue();
2633+
assertEquals((Long) 1000L, requestPb.getTimeoutMs());
2634+
}
2635+
26132636
@Test
26142637
public void testGetQueryResults() throws IOException {
26152638
JobId queryJob = JobId.of(JOB);

google-cloud-bigquery/src/test/java/com/google/cloud/bigquery/it/ITBigQueryTest.java

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7215,6 +7215,7 @@ public void testTableResultJobIdAndQueryId() throws InterruptedException {
72157215
// 2. For queries that fails the requirements to be stateless, then jobId is populated and
72167216
// queryId is not.
72177217
// 3. For explicitly created jobs, then jobId is populated and queryId is not populated.
7218+
// 4. If QueryJobConfiguration explicitly sets Job Creation Mode to Required.
72187219

72197220
// Test scenario 1.
72207221
// Create local BigQuery for test scenario 1 to not contaminate global test parameters.
@@ -7241,6 +7242,16 @@ public void testTableResultJobIdAndQueryId() throws InterruptedException {
72417242
result = job.getQueryResults();
72427243
assertNotNull(result.getJobId());
72437244
assertNull(result.getQueryId());
7245+
7246+
// Test scenario 4.
7247+
configWithJob =
7248+
QueryJobConfiguration.newBuilder(query)
7249+
.setJobCreationMode(JobCreationMode.JOB_CREATION_REQUIRED)
7250+
.build();
7251+
result = bigQuery.query(configWithJob);
7252+
result = job.getQueryResults();
7253+
assertNotNull(result.getJobId());
7254+
assertNull(result.getQueryId());
72447255
}
72457256

72467257
@Test
@@ -7294,6 +7305,50 @@ public void testStatelessQueriesWithLocation() throws Exception {
72947305
}
72957306
}
72967307

7308+
@Test
7309+
public void testQueryWithTimeout() throws InterruptedException {
7310+
// Validate that queryWithTimeout returns either TableResult or Job object
7311+
7312+
RemoteBigQueryHelper bigqueryHelper = RemoteBigQueryHelper.create();
7313+
BigQuery bigQuery = bigqueryHelper.getOptions().getService();
7314+
bigQuery.getOptions().setDefaultJobCreationMode(JobCreationMode.JOB_CREATION_OPTIONAL);
7315+
String largeQuery =
7316+
"SELECT * FROM UNNEST(GENERATE_ARRAY(1, 20000)) CROSS JOIN UNNEST(GENERATE_ARRAY(1, 20000))";
7317+
String query = "SELECT 1 as one";
7318+
// Test scenario 1.
7319+
// Stateless query returns TableResult
7320+
QueryJobConfiguration config = QueryJobConfiguration.newBuilder(query).build();
7321+
Object result = bigQuery.queryWithTimeout(config, null, null);
7322+
assertTrue(result instanceof TableResult);
7323+
assertNull(((TableResult) result).getJobId());
7324+
assertNotNull(((TableResult) result).getQueryId());
7325+
7326+
// Stateful query returns Job
7327+
// Test scenario 2 to ensure job is created if JobCreationMode is set, but for a small query
7328+
// it still returns results.
7329+
config =
7330+
QueryJobConfiguration.newBuilder(query)
7331+
.setJobCreationMode(JobCreationMode.JOB_CREATION_REQUIRED)
7332+
.build();
7333+
result = bigQuery.queryWithTimeout(config, null, null);
7334+
assertTrue(result instanceof TableResult);
7335+
assertNotNull(((TableResult) result).getJobId());
7336+
assertNull(((TableResult) result).getQueryId());
7337+
7338+
// Stateful query returns Job
7339+
// Test scenario 3 to ensure job is created if Query is long running.
7340+
// Explicitly disable cache to ensure it is long-running query;
7341+
config = QueryJobConfiguration.newBuilder(largeQuery).setUseQueryCache(false).build();
7342+
long millis = System.currentTimeMillis();
7343+
result = bigQuery.queryWithTimeout(config, null, 1000L);
7344+
millis = System.currentTimeMillis() - millis;
7345+
assertTrue(result instanceof Job);
7346+
// Cancel the job as we don't need results.
7347+
((Job) result).cancel();
7348+
// Allow 2 seconds of timeout value to account for random delays
7349+
assertTrue(millis < 1_000_000 * 2);
7350+
}
7351+
72977352
@Test
72987353
public void testUniverseDomainWithInvalidUniverseDomain() {
72997354
RemoteBigQueryHelper bigqueryHelper = RemoteBigQueryHelper.create();
@@ -7743,7 +7798,7 @@ public void testOpenTelemetryTracingQuery() throws InterruptedException {
77437798
assertNotNull(
77447799
OTEL_ATTRIBUTES.get("com.google.cloud.bigquery.BigQueryRetryHelper.runWithRetries"));
77457800
assertNotNull(OTEL_ATTRIBUTES.get("com.google.cloud.bigquery.BigQueryRpc.queryRpc"));
7746-
assertTrue(OTEL_ATTRIBUTES.containsKey("com.google.cloud.bigquery.BigQuery.query"));
7801+
assertTrue(OTEL_ATTRIBUTES.containsKey("com.google.cloud.bigquery.BigQuery.queryWithTimeout"));
77477802

77487803
// Query job
77497804
String query = "SELECT TimestampField, StringField, BooleanField FROM " + TABLE_ID.getTable();

0 commit comments

Comments
 (0)