diff --git a/src/index.ts b/src/index.ts index f86616dc..f92513e4 100644 --- a/src/index.ts +++ b/src/index.ts @@ -46,3 +46,4 @@ export { }; import * as protos from '../protos/protos'; export {protos}; +export * as query from './query'; diff --git a/src/query/helper.ts b/src/query/helper.ts new file mode 100644 index 00000000..b69d76c7 --- /dev/null +++ b/src/query/helper.ts @@ -0,0 +1,162 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {BigQueryClient, BigQueryClientOptions} from '../bigquery'; +import {Query} from './query'; +import {CallOptions} from './options'; +import {protos} from '..'; +import {randomUUID} from 'crypto'; + +/** + * The QueryHelper provides a simplified interface for interacting with BigQuery + * queries. It handles the lifecycle of query jobs, from creation to result + * retrieval. + */ +export class QueryHelper { + private client: BigQueryClient; + private projectId?: string; + + /** + * @param {object} opts - The configuration object. + * @param {BigQueryClient} opts.client - A BigQueryClient instance. + * @param {string} [opts.projectId] - The project ID to use. Optional. + */ + constructor(opts: {client: BigQueryClient; projectId?: string}) { + this.client = opts.client; + this.projectId = opts.projectId; + void this.initialize(); + } + + /** + * Retrieves the project ID from the client. + * + * @returns {Promise} A promise that resolves with the project ID. + */ + async getProjectId(): Promise { + if (this.projectId) { + return this.projectId; + } + const {jobClient} = this.getBigQueryClient(); + const projectId = await jobClient.getProjectId(); + this.projectId = projectId; + return projectId; + } + + /** + * Initialize the client. + * Performs asynchronous operations (such as authentication) and prepares the client. + * This function will be called automatically when any class method is called for the + * first time, but if you need to initialize it before calling an actual method, + * feel free to call initialize() directly. + * + * You can await on this method if you want to make sure the client is initialized. + * + * @returns {Promise} A promise that resolves when auth is complete. + */ + async initialize(): Promise { + if (this.projectId) { + return; + } + const {jobClient} = this.getBigQueryClient(); + await jobClient.initialize(); + const projectId = await this.getProjectId(); + this.projectId = projectId; + } + + /** + * Starts a query job using the `jobs.query` API. + * + * @param {protos.google.cloud.bigquery.v2.IPostQueryRequest} request + * The request object that will be sent. + * @param {CallOptions} [options] + * Call options. See {@link https://googleapis.dev/nodejs/google-gax/latest/interfaces/CallOptions.html|CallOptions} for more details. + * @returns {Promise} A promise that resolves with a Query instance. + */ + async startQuery( + request: protos.google.cloud.bigquery.v2.IPostQueryRequest, + options?: CallOptions, + ): Promise { + if (!request.projectId) { + request.projectId = this.projectId; + } + if (!request.queryRequest) { + throw new Error('queryRequest is required'); + } + if (!request.queryRequest.requestId) { + request.queryRequest.requestId = randomUUID(); + } + return Query.fromQueryRequest_(this, request, options); + } + + /** + * Starts a new asynchronous query job using the `jobs.insert` API. + * + * @param {protos.google.cloud.bigquery.v2.IJob} job + * A job resource to insert. + * @param {CallOptions} [options] + * Call options. See {@link https://googleapis.dev/nodejs/google-gax/latest/interfaces/CallOptions.html|CallOptions} for more details. + * @returns {Promise} A promise that resolves with a Query instance. + */ + async startQueryJob( + job: protos.google.cloud.bigquery.v2.IJob, + options?: CallOptions, + ): Promise { + const config = job.configuration; + if (!config) { + throw new Error('job is missing configuration'); + } + const queryConfig = config.query; + if (!queryConfig) { + throw new Error('job is not a query'); + } + job.jobReference ||= {}; + if (!job.jobReference.jobId) { + job.jobReference.jobId = randomUUID(); + } + + return Query.fromJobRequest_(this, job, this.projectId, options); + } + + /** + * Attaches to an existing query job. + * + * @param {protos.google.cloud.bigquery.v2.IJobReference} jobReference + * A job reference object. + * @param {CallOptions} [options] + * Call options. See {@link https://googleapis.dev/nodejs/google-gax/latest/interfaces/CallOptions.html|CallOptions} for more details. + * @returns {Promise} A promise that resolves with a Query instance. + */ + async attachJob( + jobReference: protos.google.cloud.bigquery.v2.IJobReference, + options?: CallOptions, + ): Promise { + if (!jobReference.jobId) { + throw new Error('attachJob requires a non-empty JobReference.JobId'); + } + if (!jobReference.projectId) { + jobReference.projectId = this.projectId; + } + + return Query.fromJobRef_(this, jobReference, options); + } + + /** + * Returns the BigQueryClient instance. + * + * @returns {BigQueryClient} The BigQueryClient instance. + */ + getBigQueryClient(): BigQueryClient { + return this.client; + } +} diff --git a/src/query/index.ts b/src/query/index.ts new file mode 100644 index 00000000..2c644a2a --- /dev/null +++ b/src/query/index.ts @@ -0,0 +1,18 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +export {QueryHelper} from './helper'; +export {Query} from './query'; +export {Row} from './row'; +export {RowIterator} from './iterator'; diff --git a/src/query/iterator.ts b/src/query/iterator.ts new file mode 100644 index 00000000..b608cffe --- /dev/null +++ b/src/query/iterator.ts @@ -0,0 +1,41 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {Query} from './query'; +import {Row} from './row'; + +/** + * The RowIterator provides a way to iterate over the rows of a query result. + * It can be used with `for await...of` loops. + */ +export class RowIterator { + private query: Query; + + /** + * @param {Query} query - The Query instance to iterate over. + * @internal + */ + constructor(query: Query) { + this.query = query; + } + + /** + * Asynchronously iterates over the rows in the query result. + * + * @yields {Row} A row from the query result. + */ + async *[Symbol.asyncIterator](): AsyncGenerator { + // TODO(#1541): implement iterator + } +} diff --git a/src/query/options.ts b/src/query/options.ts new file mode 100644 index 00000000..1d0c56a7 --- /dev/null +++ b/src/query/options.ts @@ -0,0 +1,25 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {CallOptions as GaxCallOptions} from 'google-gax'; + +/** + * Extended call options that include an AbortSignal. + */ +export interface CallOptions extends GaxCallOptions { + /** + * An AbortSignal to cancel the operation. + */ + signal?: AbortSignal; +} diff --git a/src/query/query.ts b/src/query/query.ts new file mode 100644 index 00000000..ad819cf1 --- /dev/null +++ b/src/query/query.ts @@ -0,0 +1,284 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {QueryHelper} from './helper'; +import {protos} from '../'; +import {RowIterator} from './iterator'; +import {CallOptions} from './options'; +import {setInterval} from 'timers/promises'; +import {EventEmitter} from 'stream'; + +/** + * The Query object provides a handle to a BigQuery query job. It allows you to + * wait for the job to complete, retrieve results, and access job metadata. + */ +export class Query { + private helper: QueryHelper; + private jobComplete: boolean; + + private _queryId: string | null; + private projectId?: string; + private jobId: string | null; + private location?: string | null; + + private emitter: EventEmitter; + + private constructor(helper: QueryHelper, projectId?: string) { + this.helper = helper; + this.jobComplete = false; + this.emitter = new EventEmitter(); + + this.projectId = projectId; + this._queryId = null; + this.jobId = null; + } + + /** + * Creates a Query instance from a query request. + * + * @internal + * + * @param {QueryHelper} helper - The QueryHelper instance. + * @param {protos.google.cloud.bigquery.v2.IPostQueryRequest} request - The query request. + * @param {CallOptions} [options] - Call options. + * @returns {Promise} A promise that resolves with a Query instance. + */ + static fromQueryRequest_( + helper: QueryHelper, + request: protos.google.cloud.bigquery.v2.IPostQueryRequest, + options?: CallOptions, + ): Promise { + const q = new Query(helper, request.projectId ?? undefined); + void q.runQuery(request, options); + + return Promise.resolve(q); + } + + /** + * Creates a Query instance from a job request. + * + * @internal + * + * @param {QueryHelper} helper - The QueryHelper instance. + * @param {protos.google.cloud.bigquery.v2.IJob} job - The job object. + * @param {string} [projectId] - The project ID. + * @param {CallOptions} [options] - Call options. + * @returns {Promise} A promise that resolves with a Query instance. + */ + static fromJobRequest_( + helper: QueryHelper, + job: protos.google.cloud.bigquery.v2.IJob, + projectId?: string, + options?: CallOptions, + ): Promise { + const q = new Query(helper, projectId); + void q.insertQuery(job, options); + + return Promise.resolve(q); + } + + /** + * Creates a Query instance from a job reference. + * + * @internal + * + * @param {QueryHelper} helper - The QueryHelper instance. + * @param {protos.google.cloud.bigquery.v2.IJobReference} jobReference - The job reference. + * @param {CallOptions} [options] - Call options. + * @returns {Promise} A promise that resolves with a Query instance. + */ + static fromJobRef_( + helper: QueryHelper, + jobReference: protos.google.cloud.bigquery.v2.IJobReference, + options?: CallOptions, + ): Promise { + const q = new Query(helper, jobReference.projectId || undefined); + + q.consumeQueryResponse({jobReference}); + void q.waitQueryBackground(options); + return Promise.resolve(q); + } + + /** + * Returns a job reference for the query job. + * This will be null until the query job has been successfully submitted. + * + * @returns {protos.google.cloud.bigquery.v2.IJobReference | null} The job reference, or null if not available. + */ + get jobReference(): protos.google.cloud.bigquery.v2.IJobReference | null { + if (!this.jobId) { + return null; + } + return { + jobId: this.jobId, + projectId: this.projectId, + location: {value: this.location}, + }; + } + + /** + * Returns the schema of the query results. + * This will be null until the query has completed and the schema is available. + * + * @returns {protos.google.cloud.bigquery.v2.ITableSchema | null} The schema, or null if not available. + */ + get schema(): protos.google.cloud.bigquery.v2.ITableSchema | null { + return null; + } + + /** + * Whether the query job is complete. + * + * @returns {boolean} True if the job is complete, false otherwise. + */ + get complete(): boolean { + return this.jobComplete; + } + + /** + * Returns the auto-generated ID for the query. + * This is only populated for stateless queries (i.e. those started via jobs.query) + * after the query has been submitted. + * + * @returns {string | null} The query ID, or null if not available. + */ + get queryId(): string | null { + return this._queryId; + } + + private async runQuery( + request: protos.google.cloud.bigquery.v2.IPostQueryRequest, + options?: CallOptions, + ) { + const {jobClient} = this.helper.getBigQueryClient(); + try { + const [response] = await jobClient.query(request, options); + this.location = response.location; + if (response.queryId) { + this._queryId = response.queryId; + } + this.consumeQueryResponse(response); + void this.waitQueryBackground(options); + } catch (err) { + this.markDone(err as Error); + } + } + + private async insertQuery( + job: protos.google.cloud.bigquery.v2.IJob, + options?: CallOptions, + ) { + const {jobClient} = this.helper.getBigQueryClient(); + try { + const [response] = await jobClient.insertJob( + { + job: job, + projectId: this.projectId, + }, + options, + ); + this.emitter.emit('query:created', response); + this.consumeQueryResponse(response); + void this.waitQueryBackground(options); + } catch (err) { + this.markDone(err as Error); + } + } + + private async waitQueryBackground(options?: CallOptions) { + if (this.complete) { + this.markDone(); + return; + } + const signal = options?.signal; + let waitTime = 1; + for await (const _ of setInterval(waitTime, undefined, {signal})) { + await this.checkStatus(options); + if (this.complete) { + this.markDone(); + break; + } + waitTime = 1000; + } + } + + private markDone(err?: Error) { + this.emitter.emit('done', err); + } + + /** + * Waits for the query to complete. + * + * @param {CallOptions} [options] + * Call options. See {@link https://googleapis.dev/nodejs/google-gax/latest/interfaces/CallOptions.html|CallOptions} for more details. + * @returns {Promise} + */ + async wait(options?: CallOptions): Promise { + if (this.complete) { + return; + } + const signal = options?.signal; + return new Promise((resolve, reject) => { + const callback = (err: Error) => { + this.emitter.removeListener('done', callback); + if (err) { + reject(err); + return; + } + resolve(); + }; + this.emitter.addListener('done', callback); + signal?.addEventListener('abort', () => { + reject(new Error('The operation was aborted.')); + this.emitter.removeListener('done', callback); + }); + }); + } + + /** + * Returns a RowIterator for the query results. + * + * @returns {Promise} A promise that resolves with a RowIterator. + */ + async read(): Promise { + const it = new RowIterator(this); + return it; + } + + private consumeQueryResponse( + response: protos.google.cloud.bigquery.v2.IGetQueryResultsResponse, + ) { + if (response.jobReference) { + this.projectId = response.jobReference.projectId!; + this.jobId = response.jobReference.jobId!; + this.location = response.jobReference.location?.value || ''; + } + this.jobComplete = response.jobComplete?.value ?? false; + } + + private async checkStatus(options?: CallOptions): Promise { + const {jobClient} = this.helper.getBigQueryClient(); + const req: protos.google.cloud.bigquery.v2.IGetQueryResultsRequest = { + projectId: this.projectId, + jobId: this.jobId, + location: this.location, + maxResults: {value: 0}, + formatOptions: { + useInt64Timestamp: true, + }, + }; + const [response] = await jobClient.getQueryResults(req, options); + this.consumeQueryResponse(response); + } +} diff --git a/src/query/row.ts b/src/query/row.ts new file mode 100644 index 00000000..6a3a67cb --- /dev/null +++ b/src/query/row.ts @@ -0,0 +1,18 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * Represents a row in a BigQuery query result. + */ +export class Row {} diff --git a/system-test/query/query.ts b/system-test/query/query.ts new file mode 100644 index 00000000..6e75af53 --- /dev/null +++ b/system-test/query/query.ts @@ -0,0 +1,160 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import * as assert from 'assert'; +import * as sinon from 'sinon'; +import {it} from 'mocha'; +import {QueryHelper} from '../../src/query'; +import {BigQueryClient, protos} from '../../src'; + +const sleep = (ms: number) => + new Promise(resolve => { + setTimeout(resolve, ms); + }); + +// the GCLOUD_PROJECT environment variable is set as part of test harness setup +const projectId = process.env.GCLOUD_PROJECT; +if (!projectId) { + throw new Error('GCLOUD_PROJECT environment variable is not set'); +} + +const transports = ['grpc', 'rest']; + +// run tests with the gRPC client and the REST fallback client +transports.forEach(transport => { + let client: BigQueryClient; + if (transport === 'grpc') { + client = new BigQueryClient({}); + } else { + client = new BigQueryClient({fallback: true}); + } + const helper = new QueryHelper({client, projectId}); + + describe('Run Query', () => { + describe(transport, () => { + let getQueryResultsSpy: sinon.SinonSpy; + + beforeEach(async () => { + await helper.initialize(); + const {jobClient} = helper.getBigQueryClient(); + + getQueryResultsSpy = sinon.spy(jobClient, 'getQueryResults'); + }); + + afterEach(() => { + getQueryResultsSpy.restore(); + }); + + it('should run a stateless query', async () => { + const req: protos.google.cloud.bigquery.v2.IPostQueryRequest = { + queryRequest: { + query: 'SELECT CURRENT_TIMESTAMP() as foo, SESSION_USER() as bar', + useLegacySql: {value: false}, + formatOptions: { + useInt64Timestamp: true, + }, + jobCreationMode: + protos.google.cloud.bigquery.v2.QueryRequest.JobCreationMode + .JOB_CREATION_OPTIONAL, + }, + projectId, + }; + + const q = await helper.startQuery(req); + await q.wait(); + + assert(q.complete); + + // TODO(#1541): read rows and assert row count + }); + + it('should stop waiting for query to complete', async () => { + const req: protos.google.cloud.bigquery.v2.IPostQueryRequest = { + queryRequest: { + query: 'SELECT num FROM UNNEST(GENERATE_ARRAY(1,1000000)) as num', + useLegacySql: {value: false}, + formatOptions: { + useInt64Timestamp: true, + }, + useQueryCache: {value: false}, + jobCreationMode: + protos.google.cloud.bigquery.v2.QueryRequest.JobCreationMode + .JOB_CREATION_REQUIRED, + timeoutMs: {value: 500}, + }, + projectId, + }; + + const q = await helper.startQuery(req); + const abortCtrl = new AbortController(); + q.wait({ + signal: abortCtrl.signal, + }).catch(err => { + assert(err, 'aborted'); + }); + await sleep(2000); + abortCtrl.abort(); + + assert(getQueryResultsSpy.callCount >= 1); + assert(getQueryResultsSpy.callCount <= 2); + }).timeout(5000); + + it('should allow attach with job reference to a query handler', async () => { + const req: protos.google.cloud.bigquery.v2.IPostQueryRequest = { + queryRequest: { + query: 'SELECT CURRENT_TIMESTAMP() as foo, SESSION_USER() as bar', + useLegacySql: {value: false}, + formatOptions: { + useInt64Timestamp: true, + }, + jobCreationMode: + protos.google.cloud.bigquery.v2.QueryRequest.JobCreationMode + .JOB_CREATION_REQUIRED, + }, + projectId, + }; + + let q = await helper.startQuery(req); + await q.wait(); + + const jobRef = q.jobReference; + if (!jobRef) { + throw new Error('jobRef is null'); + } + q = await helper.attachJob(jobRef); + await q.wait(); + + assert(q.complete); + + // TODO(#1541): read rows and assert row count + }); + + it('should insert a query job', async () => { + const q = await helper.startQueryJob({ + configuration: { + query: { + query: 'SELECT CURRENT_DATETIME() as foo, SESSION_USER() as bar', + useLegacySql: {value: false}, + }, + }, + }); + await q.wait(); + + assert(q.complete); + + // TODO(#1541): read rows and assert row count + }); + }); + }); +}); diff --git a/tsconfig.json b/tsconfig.json index 7ecdf9fc..ba1dec36 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -16,11 +16,14 @@ "test/*.ts", "test/**/*.ts", "system-test/*.ts", + "system-test/**/*.ts", "src/**/*.json", "protos/protos.json", "benchmark/*.ts", "scripts/*.ts", "samples/**/*.json" - + ], + "exclude": [ + "system-test/fixtures/sample/**/*.ts" ] } \ No newline at end of file