Skip to content

Commit 709f2b2

Browse files
committed
Release concurrency bucket metrics
1 parent 5699497 commit 709f2b2

File tree

2 files changed

+43
-1
lines changed

2 files changed

+43
-1
lines changed

internal-packages/run-engine/src/engine/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,7 @@ export class RunEngine {
242242
pollInterval: options.releaseConcurrency?.pollInterval ?? 1000,
243243
batchSize: options.releaseConcurrency?.batchSize ?? 10,
244244
tracer: this.tracer,
245+
meter: this.meter,
245246
},
246247
});
247248

internal-packages/run-engine/src/engine/releaseConcurrencyTokenBucketQueue.ts

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { Callback, createRedisClient, Redis, Result, type RedisOptions } from "@internal/redis";
2-
import { startSpan, Tracer } from "@internal/tracing";
2+
import { startSpan, Tracer, Meter, getMeter, ValueType, ObservableResult, Attributes } from "@internal/tracing";
33
import { Logger } from "@trigger.dev/core/logger";
44
import { z } from "zod";
55
import { setInterval } from "node:timers/promises";
@@ -39,6 +39,7 @@ export type ReleaseConcurrencyQueueOptions<T> = {
3939
consumersCount?: number;
4040
masterQueuesKey?: string;
4141
tracer?: Tracer;
42+
meter?: Meter;
4243
logger?: Logger;
4344
pollInterval?: number;
4445
batchSize?: number;
@@ -56,6 +57,7 @@ type QueueItemMetadata = z.infer<typeof QueueItemMetadata>;
5657
export class ReleaseConcurrencyTokenBucketQueue<T> {
5758
private redis: Redis;
5859
private logger: Logger;
60+
private meter: Meter;
5961
private abortController: AbortController;
6062
private consumers: ReleaseConcurrencyQueueConsumer<T>[];
6163
private sweeper?: ReleaseConcurrencyReleasingsSweeper;
@@ -69,11 +71,14 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
6971
private batchSize: number;
7072
private maxRetries: number;
7173
private backoff: NonNullable<Required<ReleaseConcurrencyQueueRetryOptions["backoff"]>>;
74+
private _lastReleasingsLength: number = 0;
75+
private _lastMasterQueueLength: number = 0;
7276

7377
constructor(private readonly options: ReleaseConcurrencyQueueOptions<T>) {
7478
this.redis = createRedisClient(options.redis);
7579
this.keyPrefix = options.redis.keyPrefix ?? "re2:release-concurrency-queue:";
7680
this.logger = options.logger ?? new Logger("ReleaseConcurrencyQueue", "debug");
81+
this.meter = options.meter ?? getMeter("release-concurrency");
7782
this.abortController = new AbortController();
7883
this.consumers = [];
7984

@@ -90,6 +95,28 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
9095
factor: options.retry?.backoff?.factor ?? 2,
9196
};
9297

98+
// Set up OpenTelemetry metrics
99+
const releasingsLengthGauge = this.meter.createObservableGauge(
100+
"release_concurrency.releasings.length",
101+
{
102+
description: "Number of items in the releasings sorted set",
103+
unit: "1",
104+
valueType: ValueType.INT,
105+
}
106+
);
107+
108+
const masterQueueLengthGauge = this.meter.createObservableGauge(
109+
"release_concurrency.master_queue.length",
110+
{
111+
description: "Number of queues in the master queue sorted set",
112+
unit: "1",
113+
valueType: ValueType.INT,
114+
}
115+
);
116+
117+
releasingsLengthGauge.addCallback(this.#updateReleasingsLength.bind(this));
118+
masterQueueLengthGauge.addCallback(this.#updateMasterQueueLength.bind(this));
119+
93120
this.#registerCommands();
94121

95122
if (!options.disableConsumers) {
@@ -104,6 +131,14 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
104131
await this.redis.quit();
105132
}
106133

134+
async #updateReleasingsLength(observableResult: ObservableResult<Attributes>) {
135+
observableResult.observe(this._lastReleasingsLength);
136+
}
137+
138+
async #updateMasterQueueLength(observableResult: ObservableResult<Attributes>) {
139+
observableResult.observe(this._lastMasterQueueLength);
140+
}
141+
107142
/**
108143
* Attempt to release concurrency for a run.
109144
*
@@ -496,13 +531,19 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
496531
const metrics = await this.getQueueMetrics();
497532
this.logger.info("Queue metrics:", { metrics });
498533

534+
// Update cached values for OpenTelemetry observable gauges
535+
this._lastReleasingsLength = await this.redis.zcard(this.#releasingsKey());
536+
this._lastMasterQueueLength = await this.redis.zcard(this.masterQueuesKey);
537+
499538
await startSpan(
500539
this.options.tracer,
501540
"ReleaseConcurrencyTokenBucketQueue.metrics",
502541
async (span) => {},
503542
{
504543
attributes: {
505544
...flattenAttributes(metrics, "queues"),
545+
releasingsLength: this._lastReleasingsLength,
546+
masterQueueLength: this._lastMasterQueueLength,
506547
forceRecording: true,
507548
},
508549
}

0 commit comments

Comments
 (0)