diff --git a/package.json b/package.json index c0c086a5..6b68129f 100644 --- a/package.json +++ b/package.json @@ -27,18 +27,21 @@ "precompile": "gts clean" }, "dependencies": { + "@google-cloud/paginator": "^5.0.0", + "apache-arrow": "^14.0.2", + "core-js": "^3.37.1", "extend": "^3.0.2", - "google-gax": "^4.3.1", - "google-auth-library": "^9.6.3" + "google-auth-library": "^9.6.3", + "google-gax": "^4.3.1" }, "peerDependencies": { "protobufjs": "^7.2.4" }, "devDependencies": { - "@google-cloud/bigquery": "^7.0.0", + "@google-cloud/bigquery": "^7.5.2", "@types/extend": "^3.0.4", "@types/mocha": "^9.0.0", - "@types/node": "^20.0.0", + "@types/node": "^20.16.5", "@types/sinon": "^17.0.0", "@types/uuid": "^9.0.1", "c8": "^9.0.0", @@ -55,7 +58,7 @@ "nise": "6.0.0", "path-to-regexp": "6.3.0", "ts-loader": "^9.0.0", - "typescript": "^5.1.6", + "typescript": "^5.5.3", "uuid": "^9.0.0", "webpack": "^5.0.0", "webpack-cli": "^5.0.0" diff --git a/src/index.ts b/src/index.ts index 7f6d42fd..c095dfe9 100644 --- a/src/index.ts +++ b/src/index.ts @@ -20,6 +20,7 @@ import * as v1 from './v1'; import * as v1beta1 from './v1beta1'; import * as v1alpha from './v1alpha'; import * as managedwriter from './managedwriter'; +import * as reader from './reader'; const BigQueryReadClient = v1.BigQueryReadClient; type BigQueryReadClient = v1.BigQueryReadClient; const BigQueryWriteClient = v1.BigQueryWriteClient; @@ -28,6 +29,8 @@ const BigQueryStorageClient = v1beta1.BigQueryStorageClient; type BigQueryStorageClient = v1beta1.BigQueryStorageClient; const WriterClient = managedwriter.WriterClient; type WriterClient = managedwriter.WriterClient; +const ReadClient = reader.ReadClient; +type ReadClient = reader.ReadClient; export { v1, BigQueryReadClient, @@ -37,6 +40,8 @@ export { BigQueryWriteClient, managedwriter, WriterClient, + reader, + ReadClient, }; // For compatibility with JavaScript libraries we need to provide this default export: // tslint:disable-next-line no-default-export @@ -46,6 +51,8 @@ export default { BigQueryWriteClient, managedwriter, WriterClient, + reader, + ReadClient, }; import * as protos from '../protos/protos'; export {protos}; diff --git a/src/reader/arrow_reader.ts b/src/reader/arrow_reader.ts new file mode 100644 index 00000000..01439769 --- /dev/null +++ b/src/reader/arrow_reader.ts @@ -0,0 +1,101 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
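+
+// A minimal usage sketch for the reader defined below. `my-project`,
+// `my_dataset` and `my_table` are placeholder identifiers, not part of
+// this change:
+//
+//   const client = new ReadClient();
+//   const reader = await client.createArrowTableReader({
+//     table: {projectId: 'my-project', datasetId: 'my_dataset', tableId: 'my_table'},
+//   });
+//   for await (const batch of await reader.getRecordBatchStream()) {
+//     console.log(batch.numRows);
+//   }
+//   reader.close();
+//   client.close();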
+
+import {ResourceStream} from '@google-cloud/paginator';
+import {RecordBatch} from 'apache-arrow';
+
+import * as protos from '../../protos/protos';
+import {TableReference, ReadClient} from './read_client';
+import {logger} from '../util/logger';
+import {
+  ArrowRawTransform,
+  ArrowRecordBatchTransform,
+  ArrowRecordReaderTransform,
+} from './arrow_transform';
+import {ReadSession, GetStreamOptions} from './read_session';
+import {ArrowFormat} from './data_format';
+
+type ReadSessionInfo = protos.google.cloud.bigquery.storage.v1.IReadSession;
+
+/**
+ * A BigQuery Storage API Reader that can be used to read data
+ * from BigQuery Tables using the Storage API in Arrow format.
+ *
+ * @class
+ * @memberof reader
+ */
+export class ArrowTableReader {
+  private _tableRef: TableReference;
+  private _session: ReadSession;
+
+  /**
+   * Creates a new ArrowTableReader instance. Usually created via
+   * ReadClient.createArrowTableReader().
+   *
+   * @param {ReadClient} readClient - Storage Read Client.
+   * @param {TableReference} tableRef - target table to read data from.
+   */
+  constructor(readClient: ReadClient, tableRef: TableReference) {
+    this._tableRef = tableRef;
+    this._session = new ReadSession(readClient, tableRef, ArrowFormat);
+  }
+
+  // eslint-disable-next-line @typescript-eslint/no-explicit-any
+  private trace(msg: string, ...otherArgs: any[]) {
+    logger(
+      'arrow_table_reader',
+      `[table: ${this._tableRef.tableId}]`,
+      msg,
+      ...otherArgs
+    );
+  }
+
+  getSessionInfo(): ReadSessionInfo | undefined | null {
+    return this._session.getSessionInfo();
+  }
+
+  /**
+   * Get a byte stream of serialized Arrow Record Batches.
+   *
+   * @param {GetStreamOptions} options
+   */
+  async getStream(
+    options?: GetStreamOptions
+  ): Promise<ResourceStream<Uint8Array>> {
+    this.trace('getStream', options);
+    const stream = await this._session.getStream(options);
+    return stream.pipe(new ArrowRawTransform()) as ResourceStream<Uint8Array>;
+  }
+
+  /**
+   * Get a stream of Arrow RecordBatch objects.
+   *
+   * @param {GetStreamOptions} options
+   */
+  async getRecordBatchStream(
+    options?: GetStreamOptions
+  ): Promise<ResourceStream<RecordBatch>> {
+    this.trace('getRecordBatchStream', options);
+    const stream = await this._session.getStream(options);
+    const info = this._session.getSessionInfo();
+    return stream
+      .pipe(new ArrowRawTransform())
+      .pipe(new ArrowRecordReaderTransform(info!))
+      .pipe(new ArrowRecordBatchTransform()) as ResourceStream<RecordBatch>;
+  }
+
+  close() {
+    this._session.close();
+  }
+}
diff --git a/src/reader/arrow_transform.ts b/src/reader/arrow_transform.ts
new file mode 100644
index 00000000..ca9e8b44
--- /dev/null
+++ b/src/reader/arrow_transform.ts
@@ -0,0 +1,176 @@
+// Copyright 2024 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
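+
+// The transforms in this file are meant to be composed into a pipeline. A
+// rough sketch of the intended composition (see
+// ArrowTableReader.getRecordBatchStream and TableReader.getRowStream for the
+// real wiring; `responses` stands for a Readable of ReadRowsResponse
+// messages):
+//
+//   responses
+//     .pipe(new ArrowRawTransform())                 // -> Uint8Array
+//     .pipe(new ArrowRecordReaderTransform(session)) // -> RecordBatchReader
+//     .pipe(new ArrowRecordBatchTransform())         // -> RecordBatch
+//     .pipe(new ArrowRecordBatchTableRowTransform()) // -> TableRow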
+
+import {Transform, TransformCallback} from 'stream';
+import {
+  RecordBatchReader,
+  RecordBatch,
+  RecordBatchStreamReader,
+  Vector,
+} from 'apache-arrow';
+import * as protos from '../../protos/protos';
+
+type ReadRowsResponse =
+  protos.google.cloud.bigquery.storage.v1.IReadRowsResponse;
+type ReadSession = protos.google.cloud.bigquery.storage.v1.IReadSession;
+
+interface TableCell {
+  v?: any;
+}
+interface TableRow {
+  f?: Array<TableCell>;
+}
+
+/**
+ * ArrowRawTransform implements a node stream Transform that reads a
+ * ReadRowsResponse from the BigQuery Storage Read API and converts it into
+ * a raw Arrow Record Batch byte stream.
+ */
+export class ArrowRawTransform extends Transform {
+  constructor() {
+    super({
+      readableObjectMode: false,
+      writableObjectMode: true,
+    });
+  }
+
+  _transform(
+    response: ReadRowsResponse,
+    _: BufferEncoding,
+    callback: TransformCallback
+  ): void {
+    if (
+      !(
+        response.arrowRecordBatch &&
+        response.arrowRecordBatch.serializedRecordBatch
+      )
+    ) {
+      callback(null);
+      return;
+    }
+    callback(null, response.arrowRecordBatch?.serializedRecordBatch);
+  }
+}
+
+/**
+ * ArrowRecordReaderTransform implements a node stream Transform that reads
+ * a byte stream of raw Arrow Record Batches and converts it into a stream
+ * of Arrow RecordBatchStreamReader.
+ */
+export class ArrowRecordReaderTransform extends Transform {
+  private session: ReadSession;
+
+  constructor(session: ReadSession) {
+    super({
+      objectMode: true,
+    });
+    this.session = session;
+  }
+
+  _transform(
+    serializedRecordBatch: Uint8Array,
+    _: BufferEncoding,
+    callback: TransformCallback
+  ): void {
+    const buf = Buffer.concat([
+      this.session.arrowSchema?.serializedSchema as Uint8Array,
+      serializedRecordBatch,
+    ]);
+    const reader = RecordBatchReader.from(buf);
+    callback(null, reader);
+  }
+}
+
+/**
+ * ArrowRecordBatchTransform implements a node stream Transform that reads
+ * a RecordBatchStreamReader and converts it into a stream of Arrow
+ * RecordBatch.
+ */
+export class ArrowRecordBatchTransform extends Transform {
+  constructor() {
+    super({
+      objectMode: true,
+    });
+  }
+
+  _transform(
+    reader: RecordBatchStreamReader,
+    _: BufferEncoding,
+    callback: TransformCallback
+  ): void {
+    const batches = reader.readAll();
+    for (const row of batches) {
+      this.push(row);
+    }
+    callback(null);
+  }
+}
+
+/**
+ * ArrowRecordBatchTableRowTransform implements a node stream Transform that
+ * reads an Arrow RecordBatch and converts it into a stream of BigQuery
+ * TableRow.
+ */ +export class ArrowRecordBatchTableRowTransform extends Transform { + constructor() { + super({ + objectMode: true, + }); + } + + _transform( + batch: RecordBatch, + _: BufferEncoding, + callback: TransformCallback + ): void { + const rows = new Array(batch.numRows); + for (let i = 0; i < batch.numRows; i++) { + rows[i] = { + f: new Array(batch.numCols), + }; + } + for (let j = 0; j < batch.numCols; j++) { + const column = batch.selectAt([j]); + const columnName = column.schema.fields[0].name; + for (let i = 0; i < batch.numRows; i++) { + const fieldData = column.get(i); + const fieldValue = fieldData?.toJSON()[columnName]; + rows[i].f[j] = { + v: convertArrowValue(fieldValue), + }; + } + } + for (let i = 0; i < batch.numRows; i++) { + this.push(rows[i]); + } + callback(null); + } +} + +function convertArrowValue(fieldValue: any): any { + if (typeof fieldValue === 'object') { + if (fieldValue instanceof Vector) { + const arr = fieldValue.toJSON(); + return arr.map((v: any) => { + return {v: convertArrowValue(v)}; + }); + } + const tableRow: TableRow = {f: []}; + Object.keys(fieldValue).forEach(key => { + tableRow.f?.push({ + v: convertArrowValue(fieldValue[key]), + }); + }); + return tableRow; + } + return fieldValue; +} diff --git a/src/reader/data_format.ts b/src/reader/data_format.ts new file mode 100644 index 00000000..d599c471 --- /dev/null +++ b/src/reader/data_format.ts @@ -0,0 +1,33 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import * as protos from '../../protos/protos'; + +export type DataFormat = + protos.google.cloud.bigquery.storage.v1.IReadSession['dataFormat']; +const DataFormat = protos.google.cloud.bigquery.storage.v1.DataFormat; + +/** + * Return data in Apache Arrow format. + * + * @memberof reader + */ +export const ArrowFormat: DataFormat = 'ARROW'; + +/** + * Return data in Apache Avro format. + * + * @memberof reader + */ +export const AvroFormat: DataFormat = 'AVRO'; diff --git a/src/reader/index.ts b/src/reader/index.ts new file mode 100644 index 00000000..280011fc --- /dev/null +++ b/src/reader/index.ts @@ -0,0 +1,35 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * Package reader provides an EXPERIMENTAL thick client around the + * BigQuery storage API's BigQueryReadClient. 
+ * More information about this new read client may also be found in
+ * the public documentation: https://cloud.google.com/bigquery/docs/read-api
+ *
+ * It is EXPERIMENTAL and subject to change or removal without notice.
+ * This signals that the package may still make breaking changes to
+ * existing methods and functionality.
+ *
+ * @namespace reader
+ */
+
+export {ReadClient} from './read_client';
+export {TableReader} from './table_reader';
+export {ArrowTableReader} from './arrow_reader';
+export {ReadStream} from './read_stream';
+export {DataFormat, ArrowFormat, AvroFormat} from './data_format';
+export {setLogFunction} from '../util/logger';
+
+// Polyfill Array.prototype.at() for Node < 16.6. Remove once Node 14
+// support is dropped.
+import 'core-js/full/array/at';
diff --git a/src/reader/read_client.ts b/src/reader/read_client.ts
new file mode 100644
index 00000000..5b89ae12
--- /dev/null
+++ b/src/reader/read_client.ts
@@ -0,0 +1,202 @@
+// Copyright 2024 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+import * as gax from 'google-gax';
+import type {CallOptions, ClientOptions} from 'google-gax';
+
+import * as protos from '../../protos/protos';
+import {BigQueryReadClient} from '../v1';
+import {ReadStream} from './read_stream';
+import {TableReader} from './table_reader';
+import {ArrowTableReader} from './arrow_reader';
+import {DataFormat} from './data_format';
+
+type CreateReadSessionRequest =
+  protos.google.cloud.bigquery.storage.v1.ICreateReadSessionRequest;
+type ReadSession = protos.google.cloud.bigquery.storage.v1.IReadSession;
+
+export type TableReference = {
+  /**
+   * Required. The ID of the dataset containing this table.
+   */
+  datasetId?: string;
+  /**
+   * Required. The ID of the project containing this table.
+   */
+  projectId?: string;
+  /**
+   * Required. The ID of the table. The ID can contain Unicode characters in
+   * category L (letter), M (mark), N (number), Pc (connector, including
+   * underscore), Pd (dash), and Zs (space). For more information, see
+   * [General Category](https://wikipedia.org/wiki/Unicode_character_property#General_Category).
+   * The maximum length is 1,024 characters. Certain operations allow suffixing
+   * of the table ID with a partition decorator, such as `sample_table$20190123`.
+   */
+  tableId?: string;
+};
+
+/**
+ * BigQuery Read API Client.
+ * The Read API can be used to read data from BigQuery.
+ *
+ * This class provides the ability to make remote calls to the backing service through method
+ * calls that map to API methods.
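+ *
+ * A minimal usage sketch (note: `my-project`, `my_dataset` and `my_table`
+ * are placeholder identifiers, not fixed names):
+ *
+ *     const client = new ReadClient();
+ *     const session = await client.createReadSession({
+ *       parent: 'projects/my-project',
+ *       table: 'projects/my-project/datasets/my_dataset/tables/my_table',
+ *       dataFormat: ArrowFormat,
+ *     });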
+ *
+ * For supplementary information about the Read API, see:
+ * https://cloud.google.com/bigquery/docs/read-api
+ *
+ * @class
+ * @memberof reader
+ */
+export class ReadClient {
+  private _client: BigQueryReadClient;
+
+  constructor(opts?: ClientOptions) {
+    const baseOptions = {
+      'grpc.keepalive_time_ms': 30 * 1000,
+      'grpc.keepalive_timeout_ms': 10 * 1000,
+      'grpc.use_local_subchannel_pool': 0,
+    };
+    this._client = new BigQueryReadClient({
+      ...baseOptions,
+      ...opts,
+    });
+  }
+
+  /**
+   * Initialize the client.
+   * Performs asynchronous operations (such as authentication) and prepares the client.
+   * This function will be called automatically when any class method is called for the
+   * first time, but if you need to initialize it before calling an actual method,
+   * feel free to call initialize() directly.
+   *
+   * You can await on this method if you want to make sure the client is initialized.
+   *
+   * @returns {Promise<void>} A promise that resolves when auth is complete.
+   */
+  initialize = async (): Promise<void> => {
+    await this._client.initialize();
+  };
+
+  getClient = (): BigQueryReadClient => {
+    return this._client;
+  };
+
+  setClient = (client: BigQueryReadClient): void => {
+    this._client = client;
+  };
+
+  /**
+   * Creates a new read session. A read session divides the contents of a
+   * BigQuery table into one or more streams, which can then be used to read
+   * data from the table. The read session also specifies properties of the
+   * data to be read, such as a list of columns or a push-down filter describing
+   * the rows to be returned.
+   *
+   * A particular row can be read by at most one stream. When the caller has
+   * reached the end of each stream in the session, then all the data in the
+   * table has been read.
+   *
+   * Data is assigned to each stream such that roughly the same number of
+   * rows can be read from each stream. Because the server-side unit for
+   * assigning data is collections of rows, the API does not guarantee that
+   * each stream will return the same number of rows. Additionally, the
+   * limits are enforced based on the number of pre-filtered rows, so some
+   * filters can lead to lopsided assignments.
+   *
+   * Read sessions automatically expire 6 hours after they are created and do
+   * not require manual clean-up by the caller.
+   *
+   * @param {Object} request
+   *   The request object that will be sent.
+   * @param {string} request.parent
+   *   Required. The project that owns the session, in the form
+   *   of `projects/{project}`.
+   * @param {string} request.table
+   *   Required. The table that all the streams should read from, in the form
+   *   of `projects/{project}/datasets/{dataset}/tables/{table}`.
+   * @returns {Promise<ReadSession>} - The promise which resolves to the created ReadSession.
+   */
+  async createReadSession(request: {
+    parent: string;
+    table: string;
+    dataFormat: DataFormat;
+    selectedFields?: string[];
+  }): Promise<ReadSession> {
+    await this.initialize();
+    const {table, parent, dataFormat, selectedFields} = request;
+    const maxWorkerCount = 1;
+    const maxStreamCount = 0;
+    const createReq: CreateReadSessionRequest = {
+      parent,
+      readSession: {
+        table,
+        dataFormat,
+        readOptions: {
+          selectedFields: selectedFields,
+        },
+      },
+      preferredMinStreamCount: maxWorkerCount,
+      maxStreamCount: maxStreamCount,
+    };
+    const [response] = await this._client.createReadSession(createReq);
+    if (!response) {
+      throw new gax.GoogleError('failed to create read session');
+    }
+    return response;
+  }
+
+  /**
+   * Creates a ReadStream for the given stream name and ReadSession.
+   *
+   * @param {Object} request
+   *   The request object that will be sent.
+   * @param {string} request.streamName
+   *   Required. The id/name of the read stream to read from.
+   * @param {ReadSession} request.session
+   *   Required. Reference to the ReadSession. See `createReadSession`.
+   * @returns {Promise<ReadStream>} - The promise which resolves to the `ReadStream`.
+   */
+  async createReadStream(
+    request: {
+      streamName: string;
+      session: ReadSession;
+    },
+    options?: CallOptions
+  ): Promise<ReadStream> {
+    await this.initialize();
+    const {streamName, session} = request;
+    try {
+      const stream = new ReadStream(streamName, session, this, options);
+      return stream;
+    } catch (err) {
+      throw new Error('read stream connection failed: ' + err);
+    }
+  }
+
+  async createTableReader(params: {
+    table: TableReference;
+  }): Promise<TableReader> {
+    await this.initialize();
+    const reader = new TableReader(this, params.table);
+    return reader;
+  }
+
+  async createArrowTableReader(params: {
+    table: TableReference;
+  }): Promise<ArrowTableReader> {
+    await this.initialize();
+    const reader = new ArrowTableReader(this, params.table);
+    return reader;
+  }
+
+  close() {
+    this._client.close();
+  }
+}
diff --git a/src/reader/read_session.ts b/src/reader/read_session.ts
new file mode 100644
index 00000000..c08ce9b5
--- /dev/null
+++ b/src/reader/read_session.ts
@@ -0,0 +1,149 @@
+// Copyright 2024 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+import {ResourceStream} from '@google-cloud/paginator';
+import {Readable} from 'stream';
+
+import {ReadStream} from './read_stream';
+import * as protos from '../../protos/protos';
+import {TableReference, ReadClient} from './read_client';
+import {DataFormat} from './data_format';
+import {logger} from '../util/logger';
+
+type ReadRowsResponse =
+  protos.google.cloud.bigquery.storage.v1.IReadRowsResponse;
+type ReadSessionInfo = protos.google.cloud.bigquery.storage.v1.IReadSession;
+const ReadSessionInfo = protos.google.cloud.bigquery.storage.v1.ReadSession;
+
+export type GetStreamOptions = {
+  /**
+   * Row limit of the table.
+   */
+  maxResults?: number;
+  /**
+   * Subset of fields to return; supports selecting into subfields.
+   * Example: selectedFields = "a,e.d.f";
+   */
+  selectedFields?: string;
+};
+
+/**
+ * A ReadSession represents a Read Session from the BigQuery
+ * Storage Read API.
+ *
+ * Read more on: https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1#readsession
+ *
+ * @class
+ * @memberof reader
+ */
+export class ReadSession {
+  private _info: ReadSessionInfo | null;
+  private _tableRef: TableReference;
+  private _format: DataFormat;
+  private _readStreams: ReadStream[];
+  private _readClient: ReadClient;
+
+  constructor(
+    readClient: ReadClient,
+    tableRef: TableReference,
+    format: DataFormat
+  ) {
+    this._info = null;
+    this._format = format;
+    this._tableRef = tableRef;
+    this._readClient = readClient;
+    this._readStreams = [];
+  }
+
+  getSessionInfo(): ReadSessionInfo | null {
+    return this._info;
+  }
+
+  // eslint-disable-next-line @typescript-eslint/no-explicit-any
+  private trace(msg: string, ...otherArgs: any[]) {
+    logger('session', `[session: ${this._info?.name}]`, msg, ...otherArgs);
+  }
+
+  private async getOrCreateSession(
+    options?: GetStreamOptions
+  ): Promise<ReadSessionInfo> {
+    if (this._info) {
+      return this._info;
+    }
+    const session = await this._readClient.createReadSession({
+      parent: `projects/${this._tableRef.projectId}`,
+      table: `projects/${this._tableRef.projectId}/datasets/${this._tableRef.datasetId}/tables/${this._tableRef.tableId}`,
+      dataFormat: this._format,
+      selectedFields: options?.selectedFields?.split(','),
+    });
+    this.trace(
+      'session created',
+      session.name,
+      session.streams,
+      session.estimatedRowCount
+    );
+    this._info = session;
+
+    this._readStreams = [];
+    for (const readStream of session.streams || []) {
+      const r = await this._readClient.createReadStream(
+        {
+          streamName: readStream.name!,
+          session,
+        },
+        options
+      );
+      this._readStreams.push(r);
+    }
+    return session;
+  }
+
+  /**
+   * Get a merged stream of ReadRowsResponse from all ReadStreams
+   * under this ReadSession.
+   *
+   * @param {GetStreamOptions} options
+   */
+  async getStream(
+    options?: GetStreamOptions
+  ): Promise<ResourceStream<ReadRowsResponse>> {
+    this.trace('getStream', options);
+
+    await this.getOrCreateSession(options);
+
+    const mergedStream = mergeStreams(
+      this._readStreams.map(r => {
+        const stream = r.getRowsStream();
+        return stream;
+      })
+    );
+    const joined = Readable.from(mergedStream);
+    this.trace('joined streams', joined);
+    const stream = joined as ResourceStream<ReadRowsResponse>;
+    return stream;
+  }
+
+  close() {
+    this._readStreams.forEach(rs => {
+      rs.close();
+    });
+  }
+}
+
+async function* mergeStreams(readables: Readable[]) {
+  for (const readable of readables) {
+    for await (const chunk of readable) {
+      yield chunk;
+    }
+  }
+}
diff --git a/src/reader/read_stream.ts b/src/reader/read_stream.ts
new file mode 100644
index 00000000..7e528e4e
--- /dev/null
+++ b/src/reader/read_stream.ts
@@ -0,0 +1,190 @@
+// Copyright 2024 Google LLC
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// https://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
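+
+// Reconnection model, in brief: ReadStream tracks the number of rows already
+// delivered (_offset) and, when a retryable gRPC error arrives, reopens the
+// readRows call at that offset so rows are neither duplicated nor dropped.
+// A compressed sketch of the idea (not the literal implementation below):
+//
+//   connection.on('error', err => {
+//     if (isRetryable(err)) {
+//       reopen({readStream: streamName, offset: rowsDeliveredSoFar});
+//     }
+//   });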
+
+import * as gax from 'google-gax';
+import * as protos from '../../protos/protos';
+
+import {ReadClient} from './read_client';
+import {logger} from '../util/logger';
+import {Readable, Transform} from 'stream';
+
+type ReadSession = protos.google.cloud.bigquery.storage.v1.IReadSession;
+type ReadRowsResponse =
+  protos.google.cloud.bigquery.storage.v1.IReadRowsResponse;
+
+export type RemoveListener = {
+  off: () => void;
+};
+
+/**
+ * ReadStream is responsible for reading data from a gRPC read stream
+ * connection against the Storage Read API readRows method.
+ *
+ * @class
+ * @memberof reader
+ */
+export class ReadStream {
+  private _streamName: string;
+  private _offset: number;
+  private _readClient: ReadClient;
+  private _session: ReadSession;
+  private _readStream: Readable | null;
+  private _connection: gax.CancellableStream | null;
+  private _callOptions?: gax.CallOptions;
+
+  constructor(
+    streamName: string,
+    session: ReadSession,
+    readClient: ReadClient,
+    options?: gax.CallOptions
+  ) {
+    this._streamName = streamName;
+    this._session = session;
+    this._offset = 0;
+    this._readClient = readClient;
+    this._connection = null;
+    this._readStream = null;
+    this._callOptions = options;
+    this.open();
+  }
+
+  open() {
+    if (this.isOpen()) {
+      this.close();
+    }
+    const client = this._readClient.getClient();
+    const connection = client.readRows(
+      {
+        readStream: this._streamName,
+        offset: this._offset,
+      },
+      this._callOptions
+    );
+    this._connection = connection;
+    const passthrough = new Transform({
+      objectMode: true,
+      transform: (response: ReadRowsResponse, _, callback) => {
+        this.processReadRowsResponse(response);
+        callback(null, response);
+      },
+    });
+    this._readStream = this._connection.pipe(passthrough);
+    this._connection.on('error', this.handleError);
+    this._connection.on('close', () => {
+      this.trace('connection closed');
+    });
+    this._connection.on('pause', () => {
+      this.trace('connection paused');
+    });
+    this._connection.on('resume', () => {
+      this.trace('connection resumed');
+    });
+    this._connection.on('end', () => {
+      this.trace('connection ended');
+    });
+  }
+
+  // eslint-disable-next-line @typescript-eslint/no-explicit-any
+  private trace(msg: string, ...otherArgs: any[]) {
+    logger(
+      'read_stream',
+      `[streamName: ${this._streamName}]`,
+      msg,
+      ...otherArgs
+    );
+  }
+
+  private handleError = (err: gax.GoogleError) => {
+    this.trace('on error', err, JSON.stringify(err));
+    if (this.isRetryableError(err)) {
+      this.reconnect();
+      return;
+    }
+    this._readStream?.destroy(err);
+    this._readStream = null;
+  };
+
+  private isRetryableError(err?: gax.GoogleError | null): boolean {
+    if (!err) {
+      return false;
+    }
+    const reconnectionErrorCodes = [
+      gax.Status.ABORTED,
+      gax.Status.CANCELLED,
+      gax.Status.DEADLINE_EXCEEDED,
+      gax.Status.INTERNAL,
+      gax.Status.UNAVAILABLE,
+    ];
+    return !!err.code && reconnectionErrorCodes.includes(err.code);
+  }
+
+  private processReadRowsResponse(response: ReadRowsResponse) {
+    if (!response.rowCount) {
+      return;
+    }
+    const rowCount = parseInt(response.rowCount as string, 10);
+    this._offset += rowCount;
+  }
+
+  /**
+   * Get the name of the read stream associated with this connection.
+   */
+  getStreamName = (): string => {
+    return this._streamName;
+  };
+
+  getReadSession(): ReadSession {
+    return this._session;
+  }
+
+  getRowsStream(): Readable {
+    return this._readStream!;
+  }
+
+  /**
+   * Check if the connection is open and ready to read data.
+ */ + isOpen(): boolean { + if (this._connection) { + return !(this._connection.destroyed || this._connection.closed); + } + return false; + } + + /** + * Reconnect and re-open readRows channel. + */ + reconnect() { + this.trace('reconnect called'); + this.close(); + this.open(); + } + + /** + * Close the read stream connection. + */ + close() { + if (this._connection) { + this._connection.end(); + this._connection.removeAllListeners(); + this._connection.destroy(); + this._connection = null; + } + if (this._readStream) { + this._readStream.destroy(); + this._readStream = null; + } + } +} diff --git a/src/reader/table_reader.ts b/src/reader/table_reader.ts new file mode 100644 index 00000000..8ccef34a --- /dev/null +++ b/src/reader/table_reader.ts @@ -0,0 +1,136 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import {ResourceStream} from '@google-cloud/paginator'; + +import * as protos from '../../protos/protos'; +import {TableReference, ReadClient} from './read_client'; +import {logger} from '../util/logger'; +import {ArrowRecordBatchTableRowTransform} from './arrow_transform'; +import {ArrowTableReader} from './arrow_reader'; + +type ReadSessionInfo = protos.google.cloud.bigquery.storage.v1.IReadSession; + +interface TableCell { + v?: any; +} +interface TableRow { + /** + * Represents a single row in the result set, consisting of one or more fields. + */ + f?: Array; +} +interface TableDataList { + /** + * Rows of results. + */ + rows?: Array; + /** + * Total rows of the entire table. In order to show default value 0 we have to present it as string. + */ + totalRows?: string; +} + +type GetRowsOptions = { + /** + * Row limit of the table. + */ + maxResults?: number; + /** + * Subset of fields to return, supports select into sub fields. Example: selected_fields = "a,e.d.f"; + */ + selectedFields?: string; +}; +type RowsResponse = [any[], ReadSessionInfo | null, TableDataList]; + +/** + * A BigQuery Storage API Reader that can be used to reader data into BigQuery Table + * using the Storage API. + * + * @class + * @memberof reader + */ +export class TableReader { + private _arrowReader: ArrowTableReader; + private _tableRef: TableReference; + + /** + * Creates a new Reader instance. + * + * @param {Object} params - The parameters for the JSONWriter. + * @param {TableReference} params.table - The stream connection + * to the BigQuery streaming insert operation. 
+ */ + constructor(readClient: ReadClient, tableRef: TableReference) { + this._tableRef = tableRef; + this._arrowReader = new ArrowTableReader(readClient, tableRef); + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + private trace(msg: string, ...otherArgs: any[]) { + logger( + 'table_reader', + `[table: ${this._tableRef.tableId}]`, + msg, + ...otherArgs + ); + } + + getSessionInfo(): ReadSessionInfo | undefined | null { + return this._arrowReader.getSessionInfo(); + } + + async getRowStream( + options?: GetRowsOptions + ): Promise> { + this.trace('getRowStream', options); + const stream = await this._arrowReader.getRecordBatchStream(options); + return stream.pipe( + new ArrowRecordBatchTableRowTransform() + ) as ResourceStream; + } + + /** + * Retrieves table data as rows in same format + * as tabledata.list BigQuery v2 API. + * Extra parameters returned contain Storage Read API specific information + * like ReadSession and totalRows count. + * + * @param {options} GetRowsOptions + */ + async getRows(options?: GetRowsOptions): Promise { + this.trace('getRows', options); + const stream = await this.getRowStream(options); + const session = this.getSessionInfo(); + return new Promise((resolve, reject) => { + const rows: TableRow[] = []; + stream.on('data', (data: TableRow) => { + rows.push(data); + }); + stream.on('error', err => { + this.trace('reject called on joined stream', err); + reject(err); + }); + stream.on('end', () => { + this.trace('resolve called on joined stream'); + const totalRows = `${session?.estimatedRowCount ?? 0}`; + resolve([rows, session ?? null, {rows, totalRows}]); + }); + }); + } + + close() { + this._arrowReader.close(); + } +} diff --git a/system-test/install.ts b/system-test/install.ts index 83b83f33..3785fd2f 100644 --- a/system-test/install.ts +++ b/system-test/install.ts @@ -27,6 +27,7 @@ describe('📦 pack-n-play test', () => { packageDir: process.cwd(), sample: { description: 'TypeScript user can use the type definitions', + devDependencies: ['@types/web'], ts: readFileSync( './system-test/fixtures/sample/src/index.ts' ).toString(), diff --git a/system-test/reader_client_test.ts b/system-test/reader_client_test.ts new file mode 100644 index 00000000..82fd91a7 --- /dev/null +++ b/system-test/reader_client_test.ts @@ -0,0 +1,512 @@ +// Copyright 2023 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +import * as assert from 'assert'; +import {describe, it} from 'mocha'; +import * as gax from 'google-gax'; +import * as uuid from 'uuid'; +import * as sinon from 'sinon'; +import {BigQuery, TableRow, TableSchema} from '@google-cloud/bigquery'; +import * as protos from '../protos/protos'; +import * as protobuf from 'protobufjs'; +import {ClientOptions} from 'google-gax'; +import * as customerRecordProtoJson from '../samples/customer_record.json'; +import * as bigquerystorage from '../src'; +import * as reader from '../src/reader'; +import {RecordBatch, Table, tableFromIPC} from 'apache-arrow'; + +type ReadRowsResponse = + protos.google.cloud.bigquery.storage.v1.IReadRowsResponse; +const {ReadClient, ArrowFormat, AvroFormat} = reader; + +const sandbox = sinon.createSandbox(); +afterEach(() => sandbox.restore()); + +if (process.env.NODE_ENV === 'DEBUG') { + reader.setLogFunction(console.log); +} + +const GCLOUD_TESTS_PREFIX = 'nodejs_bqstorage_system_test'; +const bigquery = new BigQuery(); +const generateUuid = () => + `${GCLOUD_TESTS_PREFIX}_${uuid.v4()}`.replace(/-/gi, '_'); +const datasetId = generateUuid(); + +const sleep = (ms: number) => + new Promise(resolve => { + setTimeout(resolve, ms); + }); + +const root = protobuf.Root.fromJSON(customerRecordProtoJson); +if (!root) { + throw Error('Proto must not be undefined'); +} + +describe('reader.ReaderClient', () => { + let projectId: string; + let parent: string; + let tableRef: string; + let tableId: string; + let bqReadClient: bigquerystorage.BigQueryReadClient; + let clientOptions: ClientOptions; + const schema: TableSchema = { + fields: [ + { + name: 'name', + type: 'STRING', + mode: 'REQUIRED', + }, + { + name: 'row_num', + type: 'INTEGER', + mode: 'REQUIRED', + }, + ], + }; + + before(async () => { + await deleteDatasets(); + + await bigquery.createDataset(datasetId); + }); + + beforeEach(async () => { + tableId = generateUuid(); + + const [table] = await bigquery + .dataset(datasetId) + .createTable(tableId, {schema}); + + projectId = table.metadata.tableReference.projectId; + + parent = `projects/${projectId}`; + tableRef = `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`; + + await bigquery + .dataset(datasetId) + .table(tableId) + .insert([ + {name: 'Ada Lovelace', row_num: 1}, + {name: 'Alan Turing', row_num: 2}, + {name: 'Bell', row_num: 3}, + ]); + }); + + after(async () => { + await bigquery.dataset(datasetId).delete({force: true}).catch(console.warn); + }); + + beforeEach(async () => { + clientOptions = { + projectId: projectId, + 'grpc.keepalive_time_ms': 30 * 1000, + 'grpc.keepalive_timeout_ms': 10 * 1000, + }; + bqReadClient = new bigquerystorage.BigQueryReadClient(clientOptions); + }); + + afterEach(async () => { + await bqReadClient.close(); + }); + + describe('Common methods', () => { + it('should create a client without arguments', () => { + const client = new ReadClient(); + assert(client.getClient()); + }); + + it('should create a client with arguments: parent, client, opts', async () => { + const client = new ReadClient(clientOptions); + assert(client.getClient()); + const clientId = await client.getClient().getProjectId(); + assert.strictEqual(clientId, clientOptions.projectId); + }); + }); + + describe('Read', () => { + it('should invoke createReadSession and createReadStream without errors', async () => { + bqReadClient.initialize(); + const client = new ReadClient(); + client.setClient(bqReadClient); + + try { + const session = await client.createReadSession({ + parent, + table: tableRef, + 
dataFormat: ArrowFormat, + }); + + assert.equal(session.dataFormat, ArrowFormat); + assert.notEqual(session.streams, null); + assert.equal(session.streams?.length, 1); + + const readStream = session.streams![0]; + const stream = await client.createReadStream({ + session, + streamName: readStream.name!, + }); + const rowStream = stream.getRowsStream(); + + const responses: ReadRowsResponse[] = []; + await new Promise((resolve, reject) => { + rowStream.on('data', (data: ReadRowsResponse) => { + responses.push(data); + }); + rowStream.on('error', reject); + rowStream.on('end', () => { + resolve(null); + }); + }); + + assert.equal(responses.length, 1); + + const res = responses[0]; + assert.equal(stream['_offset'], res.rowCount); + stream.close(); + } finally { + client.close(); + } + }); + }); + + describe('ArrowTableReader', () => { + it('should allow to read a table as an Arrow byte stream', async () => { + bqReadClient.initialize(); + const client = new ReadClient(); + client.setClient(bqReadClient); + + try { + const reader = await client.createArrowTableReader({ + table: { + projectId, + datasetId, + tableId, + }, + }); + + const rawStream = await reader.getStream(); + + const session = reader.getSessionInfo(); + assert.notEqual(session, null); + assert.equal(session?.dataFormat, ArrowFormat); + + const content: Buffer = await new Promise((resolve, reject) => { + let serializedSchema: string | Uint8Array = ''; + if (session?.arrowSchema?.serializedSchema) { + serializedSchema = session?.arrowSchema?.serializedSchema; + } + let buf = Buffer.from(serializedSchema); + rawStream.on('data', (data: Uint8Array) => { + buf = Buffer.concat([buf, data]); + }); + rawStream.on('error', reject); + rawStream.on('end', () => { + resolve(buf); + }); + }); + const table = await tableFromIPC(content); + + assert.equal(table.numRows, 3); + assert.equal(table.numCols, 2); + + reader.close(); + } finally { + client.close(); + } + }); + + it('should allow to read a table as a stream of Arrow Record Batches', async () => { + bqReadClient.initialize(); + const client = new ReadClient(); + client.setClient(bqReadClient); + + try { + const reader = await client.createArrowTableReader({ + table: { + projectId, + datasetId, + tableId, + }, + }); + + const recordBatchStream = await reader.getRecordBatchStream(); + + const session = reader.getSessionInfo(); + assert.notEqual(session, null); + assert.equal(session?.dataFormat, ArrowFormat); + + const batches: RecordBatch[] = []; + for await (const batch of recordBatchStream) { + batches.push(batch); + } + const table = new Table(batches); + + assert.equal(table.numRows, 3); + assert.equal(table.numCols, 2); + + reader.close(); + } finally { + client.close(); + } + }); + }); + + describe('TableReader', () => { + it('should allow to read a table as a stream', async () => { + bqReadClient.initialize(); + const client = new ReadClient(); + client.setClient(bqReadClient); + + try { + const reader = await client.createTableReader({ + table: { + projectId, + datasetId, + tableId, + }, + }); + + const rowStream = await reader.getRowStream(); + const rows: TableRow[] = []; + await new Promise((resolve, reject) => { + rowStream.on('data', (data: TableRow) => { + rows.push(data); + }); + rowStream.on('error', reject); + rowStream.on('end', () => { + resolve(null); + }); + }); + + const session = reader.getSessionInfo(); + assert.notEqual(session, null); + assert.equal(session?.dataFormat, ArrowFormat); + + assert.equal(rows.length, 3); + + reader.close(); + } finally { + 
client.close();
+      }
+    });
+
+    it('should allow to read a table as tabledata.list RowsResponse', async () => {
+      bqReadClient.initialize();
+      const client = new ReadClient();
+      client.setClient(bqReadClient);
+
+      try {
+        const reader = await client.createTableReader({
+          table: {
+            projectId,
+            datasetId,
+            tableId,
+          },
+        });
+
+        const [rows, session, response] = await reader.getRows();
+
+        assert.notEqual(session, null);
+        assert.equal(session?.dataFormat, ArrowFormat);
+
+        assert.notEqual(response.totalRows, null); // estimated row count
+        assert.equal(response.rows?.length, 3);
+
+        assert.equal(rows.length, 3);
+
+        reader.close();
+      } finally {
+        client.close();
+      }
+    });
+
+    it('should allow to read a table with a long-running query', async () => {
+      bqReadClient.initialize();
+      const client = new ReadClient();
+      client.setClient(bqReadClient);
+
+      try {
+        const genTableId = generateUuid();
+        await bigquery.query(
+          `CREATE TABLE ${projectId}.${datasetId}.${genTableId} AS SELECT num FROM UNNEST(GENERATE_ARRAY(1,1000000)) as num`
+        );
+        const reader = await client.createTableReader({
+          table: {
+            projectId,
+            datasetId,
+            tableId: genTableId,
+          },
+        });
+
+        const [rows, session, response] = await reader.getRows();
+
+        assert.notEqual(session, null);
+        assert.equal(session?.dataFormat, ArrowFormat);
+
+        assert.notEqual(response.totalRows, null); // estimated row count
+        assert.equal(response.rows?.length, 1000000);
+
+        assert.equal(rows.length, 1000000);
+
+        reader.close();
+      } finally {
+        client.close();
+      }
+    }).timeout(30 * 1000);
+  });
+
+  describe('Error Scenarios', () => {
+    it('send request with mismatched selected fields', async () => {
+      bqReadClient.initialize();
+      const client = new ReadClient();
+      client.setClient(bqReadClient);
+
+      try {
+        const reader = await client.createTableReader({
+          table: {
+            projectId,
+            datasetId,
+            tableId,
+          },
+        });
+
+        let foundError: gax.GoogleError | null = null;
+        try {
+          const rowStream = await reader.getRowStream({
+            selectedFields: 'wrong_field',
+          });
+          const rows: TableRow[] = [];
+          for await (const data of rowStream) {
+            rows.push(data);
+          }
+        } catch (err) {
+          assert.notEqual(err, null);
+          foundError = err as gax.GoogleError;
+        }
+
+        assert.notEqual(foundError, null);
+        assert.equal(foundError?.code, gax.Status.INVALID_ARGUMENT);
+        assert.equal(
+          foundError?.message.includes(
+            'request failed: The following selected fields do not exist in the table schema: wrong_field'
+          ),
+          true
+        );
+
+        reader.close();
+      } finally {
+        client.close();
+      }
+    });
+
+    it('should trigger reconnection when an intermittent error happens', async () => {
+      bqReadClient.initialize();
+      const client = new ReadClient();
+      client.setClient(bqReadClient);
+
+      try {
+        const reader = await client.createTableReader({
+          table: {
+            projectId,
+            datasetId,
+            tableId,
+          },
+        });
+        await reader.getRowStream();
+
+        // access private stream connection
+        const stream = reader['_arrowReader']['_session']['_readStreams'][0];
+        let reconnectedCalled = false;
+        sandbox.stub(stream, 'reconnect').callsFake(() => {
+          reconnectedCalled = true;
+        });
+        const conn = stream['_connection'] as gax.CancellableStream; // private member
+
+        const gerr = new gax.GoogleError('aborted');
+        gerr.code = gax.Status.ABORTED;
+        conn.emit('error', gerr);
+        conn.emit('close');
+
+        assert.equal(reconnectedCalled, true);
+      } finally {
+        client.close();
+      }
+    });
+  });
+
+  describe('close', () => {
+    it('should invoke close without errors', async () => {
+      bqReadClient.initialize();
+      const client = new
ReadClient(); + client.setClient(bqReadClient); + + try { + const session = await client.createReadSession({ + parent: `projects/${projectId}`, + table: `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`, + dataFormat: AvroFormat, + }); + + assert.equal(session.dataFormat, AvroFormat); + assert.notEqual(session.streams, null); + assert.notEqual(session.streams?.length, 0); + + const readStream = session.streams![0]; + const connection = await client.createReadStream({ + session, + streamName: readStream.name!, + }); + await sleep(100); + + const internalConn = connection['_connection']!; + + connection.close(); + assert.strictEqual(internalConn.destroyed, true); + + client.close(); + } finally { + client.close(); + } + }); + }); + + // Only delete a resource if it is older than 24 hours. That will prevent + // collisions with parallel CI test runs. + function isResourceStale(creationTime: number) { + const oneDayMs = 86400000; + const now = new Date(); + const created = new Date(creationTime); + return now.getTime() - created.getTime() >= oneDayMs; + } + + async function deleteDatasets() { + let [datasets] = await bigquery.getDatasets(); + datasets = datasets.filter(dataset => + dataset.id?.includes(GCLOUD_TESTS_PREFIX) + ); + + for (const dataset of datasets) { + const [metadata] = await dataset.getMetadata(); + const creationTime = Number(metadata.creationTime); + if (isResourceStale(creationTime)) { + try { + await dataset.delete({force: true}); + } catch (e) { + console.log(`dataset(${dataset.id}).delete() failed`); + console.log(e); + } + } + } + } +}); diff --git a/test/reader/arrow_transform.ts b/test/reader/arrow_transform.ts new file mode 100644 index 00000000..e8081b40 --- /dev/null +++ b/test/reader/arrow_transform.ts @@ -0,0 +1,129 @@ +// Copyright 2024 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+
+import * as assert from 'assert';
+import {describe, it} from 'mocha';
+import * as protos from '../../protos/protos';
+import {RecordBatchStreamWriter, tableFromArrays} from 'apache-arrow';
+import {Readable} from 'stream';
+import {
+  ArrowRawTransform,
+  ArrowRecordBatchTableRowTransform,
+  ArrowRecordBatchTransform,
+  ArrowRecordReaderTransform,
+} from '../../src/reader/arrow_transform';
+import {BigQuery} from '@google-cloud/bigquery';
+import bigquery from '@google-cloud/bigquery/build/src/types';
+
+type ReadRowsResponse =
+  protos.google.cloud.bigquery.storage.v1.IReadRowsResponse;
+const ReadRowsResponse =
+  protos.google.cloud.bigquery.storage.v1.ReadRowsResponse;
+
+describe('Arrow Transform', () => {
+  it('Pipeline with all transforms', async () => {
+    const schema: bigquery.ITableSchema = {
+      fields: [
+        {name: 'name', type: 'STRING'},
+        {name: 'row', type: 'INTEGER'},
+        {name: 'arr', type: 'INTEGER', mode: 'REPEATED'},
+        {
+          name: 'rec',
+          type: 'RECORD',
+          fields: [
+            {name: 'key', type: 'STRING'},
+            {name: 'value', type: 'STRING'},
+          ],
+        },
+        {
+          name: 'recs',
+          type: 'RECORD',
+          mode: 'REPEATED',
+          fields: [{name: 'num', type: 'INTEGER'}],
+        },
+      ],
+    };
+    const table = tableFromArrays({
+      name: ['Ada Lovelace', 'Alan Turing', 'Bell'],
+      row: [1, 2, 3],
+      arr: [
+        [10, 20],
+        [20, 30],
+        [30, 40],
+      ],
+      rec: [
+        {key: 'foo', value: 'bar'},
+        {key: 'test', value: 'baz'},
+        {key: 'a key', value: 'a value'},
+      ],
+      recs: [
+        [{num: 10}, {num: 20}],
+        [{num: 20}, {num: 30}],
+        [{num: 30}, {num: 40}],
+      ],
+    });
+    const writer = RecordBatchStreamWriter.writeAll(table);
+    const serializedRecordBatch = writer.toUint8Array(true);
+    const serializedSchema = Uint8Array.from([]);
+    const response: ReadRowsResponse = {
+      arrowSchema: {
+        serializedSchema,
+      },
+      arrowRecordBatch: {
+        serializedRecordBatch,
+        rowCount: table.numRows,
+      },
+    };
+
+    const pipeline = Readable.from([response])
+      .pipe(new ArrowRawTransform())
+      .pipe(new ArrowRecordReaderTransform({arrowSchema: {serializedSchema}}))
+      .pipe(new ArrowRecordBatchTransform())
+      .pipe(new ArrowRecordBatchTableRowTransform());
+
+    const consumeRows = new Promise<any[]>(resolve => {
+      const rows: any[] = [];
+      pipeline
+        .on('data', data => rows.push(data))
+        .on('end', () => resolve(rows));
+    });
+    const tableRows = await consumeRows;
+    const rows = BigQuery.mergeSchemaWithRows_(schema, tableRows, {
+      wrapIntegers: false,
+    });
+    assert.deepStrictEqual(rows, [
+      {
+        name: 'Ada Lovelace',
+        row: 1,
+        arr: [10, 20],
+        rec: {key: 'foo', value: 'bar'},
+        recs: [{num: 10}, {num: 20}],
+      },
+      {
+        name: 'Alan Turing',
+        row: 2,
+        arr: [20, 30],
+        rec: {key: 'test', value: 'baz'},
+        recs: [{num: 20}, {num: 30}],
+      },
+      {
+        name: 'Bell',
+        row: 3,
+        arr: [30, 40],
+        rec: {key: 'a key', value: 'a value'},
+        recs: [{num: 30}, {num: 40}],
+      },
+    ]);
+  });
+});