Skip to content

Commit 795a2d4

Browse files
authored
Merge branch 'main' into Address-final-comments-on-single-row-mutate-PR
2 parents 43f21c2 + ce1e796 commit 795a2d4

File tree

9 files changed

+451
-141
lines changed

9 files changed

+451
-141
lines changed

src/client-side-metrics/gcp-metrics-handler.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ const {
3131
} = require('@opentelemetry/sdk-metrics');
3232
import * as os from 'os';
3333
import * as crypto from 'crypto';
34+
import {MethodName} from './client-side-metrics-attributes';
3435

3536
/**
3637
* Generates a unique client identifier string.
@@ -251,10 +252,16 @@ export class GCPMetricsHandler implements IMetricsHandler {
251252
status: data.status,
252253
...commonAttributes,
253254
});
254-
otelInstruments.firstResponseLatencies.record(data.firstResponseLatency, {
255-
status: data.status,
256-
...commonAttributes,
257-
});
255+
if (
256+
data.metricsCollectorData.method === MethodName.READ_ROWS ||
257+
data.metricsCollectorData.method === MethodName.READ_ROW
258+
) {
259+
otelInstruments.firstResponseLatencies.record(data.firstResponseLatency, {
260+
status: data.status,
261+
...commonAttributes,
262+
});
263+
}
264+
258265
if (data.applicationLatency) {
259266
otelInstruments.applicationBlockingLatencies.record(
260267
data.applicationLatency,

src/interceptor.ts renamed to src/client-side-metrics/metric-interceptor.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
// limitations under the License.
1414

1515
import {CallOptions} from 'google-gax';
16-
import {OperationMetricsCollector} from './client-side-metrics/operation-metrics-collector';
16+
import {OperationMetricsCollector} from './operation-metrics-collector';
1717

1818
// Mock Server Implementation
1919
import * as grpcJs from '@grpc/grpc-js';
@@ -49,16 +49,21 @@ function createMetricsInterceptorProvider(
4949
collector.onStatusMetadataReceived(
5050
status as unknown as ServerStatus,
5151
);
52+
collector.onAttemptComplete(status.code);
5253
nextStat(status);
5354
},
5455
};
5556
next(metadata, newListener);
5657
},
58+
sendMessage: function (message, next) {
59+
collector.onAttemptStart();
60+
next(message);
61+
},
5762
});
5863
};
5964
}
6065

61-
export function withInterceptors(
66+
export function withMetricInterceptors(
6267
gaxOptions: CallOptions,
6368
metricsCollector?: OperationMetricsCollector,
6469
) {

src/client-side-metrics/operation-metrics-collector.ts

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -181,10 +181,7 @@ export class OperationMetricsCollector {
181181
}) => {
182182
this.onStatusMetadataReceived(status);
183183
},
184-
)
185-
.on('data', () => {
186-
this.onResponse();
187-
});
184+
);
188185
}
189186

190187
/**
@@ -289,8 +286,15 @@ export class OperationMetricsCollector {
289286
finalOperationStatus: grpc.status,
290287
applicationLatency?: number,
291288
) {
292-
this.onAttemptComplete(finalOperationStatus);
293289
withMetricsDebug(() => {
290+
if (
291+
this.state ===
292+
MetricsCollectorState.OPERATION_STARTED_ATTEMPT_IN_PROGRESS_NO_ROWS_YET ||
293+
this.state ===
294+
MetricsCollectorState.OPERATION_STARTED_ATTEMPT_IN_PROGRESS_SOME_ROWS_RECEIVED
295+
) {
296+
this.onAttemptComplete(finalOperationStatus);
297+
}
294298
checkState(this.state, [
295299
MetricsCollectorState.OPERATION_STARTED_ATTEMPT_NOT_IN_PROGRESS,
296300
]);
@@ -310,7 +314,7 @@ export class OperationMetricsCollector {
310314
client_name: `nodejs-bigtable/${version}`,
311315
operationLatency: totalMilliseconds,
312316
retryCount: this.attemptCount - 1,
313-
firstResponseLatency: this.firstResponseLatency ?? undefined,
317+
firstResponseLatency: this.firstResponseLatency ?? 0,
314318
applicationLatency: applicationLatency ?? 0,
315319
});
316320
}

src/row-data-utils.ts

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
import {OperationMetricsCollector} from './client-side-metrics/operation-metrics-collector';
16+
1517
const dotProp = require('dot-prop');
1618
import {Filter, RawFilter} from './filter';
1719
import {
@@ -30,6 +32,11 @@ import {TabularApiSurface} from './tabular-api-surface';
3032
import arrify = require('arrify');
3133
import {Bigtable} from './index';
3234
import {CallOptions} from 'google-gax';
35+
import {
36+
MethodName,
37+
StreamingState,
38+
} from './client-side-metrics/client-side-metrics-attributes';
39+
import {withMetricInterceptors} from './client-side-metrics/metric-interceptor';
3340

3441
interface TabularApiSurfaceRequest {
3542
tableName?: string;
@@ -193,14 +200,31 @@ class RowDataUtils {
193200
properties.reqOpts,
194201
);
195202
properties.requestData.data = {};
203+
// 1. Create a metrics collector.
204+
const metricsCollector = new OperationMetricsCollector(
205+
properties.requestData.table,
206+
MethodName.READ_MODIFY_WRITE_ROW,
207+
StreamingState.UNARY,
208+
(
209+
properties.requestData.table as any
210+
).bigtable._metricsConfigManager!.metricsHandlers,
211+
);
212+
// 2. Tell the metrics collector an attempt has been started.
213+
metricsCollector.onOperationStart();
214+
// 3. Make a unary call with gax options that include interceptors. The
215+
// interceptors are built from a method that hooks them up to the
216+
// metrics collector
196217
properties.requestData.bigtable.request<google.bigtable.v2.IReadModifyWriteRowResponse>(
197218
{
198219
client: 'BigtableClient',
199220
method: 'readModifyWriteRow',
200221
reqOpts,
201-
gaxOpts: gaxOptions,
222+
gaxOpts: withMetricInterceptors(gaxOptions, metricsCollector),
223+
},
224+
(err, ...args) => {
225+
metricsCollector.onOperationComplete(err ? err.code : 0);
226+
callback(err, ...args);
202227
},
203-
callback,
204228
);
205229
}
206230

src/utils/createReadStreamInternal.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,10 @@ export function createReadStreamInternal(
320320
gaxOpts,
321321
retryOpts,
322322
});
323+
requestStream.on('data', () => {
324+
// This handler is necessary for recording firstResponseLatencies.
325+
metricsCollector.onResponse();
326+
});
323327

324328
activeRequestStream = requestStream!;
325329

@@ -416,7 +420,6 @@ export function createReadStreamInternal(
416420
})
417421
.on('end', () => {
418422
activeRequestStream = null;
419-
const applicationLatency = userStream.getTotalDurationMs();
420423
metricsCollector.onOperationComplete(
421424
grpc.status.OK,
422425
userStream.getTotalDurationMs(),

0 commit comments

Comments
 (0)