Skip to content

Commit 5699497

Browse files
committed
Adding lock metrics
1 parent e601b71 commit 5699497

File tree

3 files changed

+113
-5
lines changed

3 files changed

+113
-5
lines changed

internal-packages/run-engine/src/engine/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ export class RunEngine {
9797
redis: this.runLockRedis,
9898
logger: this.logger,
9999
tracer: trace.getTracer("RunLocker"),
100+
meter: options.meter,
100101
});
101102

102103
const keys = new RunQueueFullKeyProducer();

internal-packages/run-engine/src/engine/locking.ts

Lines changed: 90 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,30 @@ import { Redis } from "@internal/redis";
55
import * as redlock from "redlock";
66
import { tryCatch } from "@trigger.dev/core";
77
import { Logger } from "@trigger.dev/core/logger";
8-
import { startSpan, Tracer } from "@internal/tracing";
8+
import { startSpan, Tracer, Meter, getMeter, ValueType, ObservableResult, Attributes, Histogram } from "@internal/tracing";
9+
10+
const SemanticAttributes = {
11+
LOCK_TYPE: "run_engine.lock.type",
12+
LOCK_RESOURCES: "run_engine.lock.resources",
13+
LOCK_SUCCESS: "run_engine.lock.success",
14+
};
915

1016
interface LockContext {
1117
resources: string;
1218
signal: redlock.RedlockAbortSignal;
19+
lockType: string;
1320
}
1421

1522
export class RunLocker {
1623
private redlock: InstanceType<typeof redlock.default>;
1724
private asyncLocalStorage: AsyncLocalStorage<LockContext>;
1825
private logger: Logger;
1926
private tracer: Tracer;
27+
private meter: Meter;
28+
private activeLocks: Map<string, { lockType: string; resources: string[] }> = new Map();
29+
private lockDurationHistogram: Histogram;
2030

21-
constructor(options: { redis: Redis; logger: Logger; tracer: Tracer }) {
31+
constructor(options: { redis: Redis; logger: Logger; tracer: Tracer; meter?: Meter }) {
2232
this.redlock = new Redlock([options.redis], {
2333
driftFactor: 0.01,
2434
retryCount: 10,
@@ -29,6 +39,45 @@ export class RunLocker {
2939
this.asyncLocalStorage = new AsyncLocalStorage<LockContext>();
3040
this.logger = options.logger;
3141
this.tracer = options.tracer;
42+
this.meter = options.meter ?? getMeter("run-engine");
43+
44+
const activeLocksObservableGauge = this.meter.createObservableGauge(
45+
"run_engine.locks.active",
46+
{
47+
description: "The number of active locks by type",
48+
unit: "1",
49+
valueType: ValueType.INT,
50+
}
51+
);
52+
53+
const lockDurationHistogram = this.meter.createHistogram(
54+
"run_engine.lock.duration",
55+
{
56+
description: "The duration of lock operations",
57+
unit: "ms",
58+
valueType: ValueType.DOUBLE,
59+
}
60+
);
61+
62+
activeLocksObservableGauge.addCallback(this.#updateActiveLocksCount.bind(this));
63+
this.lockDurationHistogram = lockDurationHistogram;
64+
}
65+
66+
async #updateActiveLocksCount(observableResult: ObservableResult<Attributes>) {
67+
// Group active locks by type
68+
const lockCountsByType = new Map<string, number>();
69+
70+
for (const [_, lockInfo] of this.activeLocks) {
71+
const count = lockCountsByType.get(lockInfo.lockType) || 0;
72+
lockCountsByType.set(lockInfo.lockType, count + 1);
73+
}
74+
75+
// Report metrics for each lock type
76+
for (const [lockType, count] of lockCountsByType) {
77+
observableResult.observe(count, {
78+
[SemanticAttributes.LOCK_TYPE]: lockType,
79+
});
80+
}
3281
}
3382

3483
/** Locks resources using RedLock. It won't lock again if we're already inside a lock with the same resources. */
@@ -54,17 +103,53 @@ export class RunLocker {
54103
span.setAttribute("nested", false);
55104

56105
// Different resources or not in a lock, proceed with new lock
106+
const lockId = `${name}:${joinedResources}:${Date.now()}`;
107+
const lockStartTime = performance.now();
108+
57109
const [error, result] = await tryCatch(
58110
this.redlock.using(resources, duration, async (signal) => {
59-
const newContext: LockContext = { resources: joinedResources, signal };
111+
const newContext: LockContext = {
112+
resources: joinedResources,
113+
signal,
114+
lockType: name
115+
};
60116

61-
return this.asyncLocalStorage.run(newContext, async () => {
62-
return routine(signal);
117+
// Track active lock
118+
this.activeLocks.set(lockId, {
119+
lockType: name,
120+
resources: resources,
63121
});
122+
123+
let lockSuccess = true;
124+
try {
125+
return this.asyncLocalStorage.run(newContext, async () => {
126+
return routine(signal);
127+
});
128+
} catch (lockError) {
129+
lockSuccess = false;
130+
throw lockError;
131+
} finally {
132+
// Record lock duration
133+
const lockDuration = performance.now() - lockStartTime;
134+
this.lockDurationHistogram.record(lockDuration, {
135+
[SemanticAttributes.LOCK_TYPE]: name,
136+
[SemanticAttributes.LOCK_SUCCESS]: lockSuccess.toString(),
137+
});
138+
139+
// Remove from active locks when done
140+
this.activeLocks.delete(lockId);
141+
}
64142
})
65143
);
66144

67145
if (error) {
146+
// Record failed lock acquisition
147+
const lockDuration = performance.now() - lockStartTime;
148+
this.lockDurationHistogram.record(lockDuration, {
149+
[SemanticAttributes.LOCK_TYPE]: name,
150+
[SemanticAttributes.LOCK_SUCCESS]: "false",
151+
});
152+
68153
this.logger.error("[RunLocker] Error locking resources", { error, resources, duration });
69154
throw error;
70155
}

internal-packages/run-engine/src/run-queue/index.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ import { z } from "zod";
4545
const SemanticAttributes = {
4646
QUEUE: "runqueue.queue",
4747
WORKER_QUEUE: "runqueue.workerQueue",
48+
MASTER_QUEUE_SHARD: "runqueue.masterQueueShard",
4849
CONSUMER_ID: "runqueue.consumerId",
4950
RUN_ID: "runqueue.runId",
5051
RESULT_COUNT: "runqueue.resultCount",
@@ -141,7 +142,17 @@ export class RunQueue {
141142
}
142143
);
143144

145+
const masterQueueObservableGauge = this._meter.createObservableGauge(
146+
"runqueue.masterQueue.length",
147+
{
148+
description: "The number of queues in the master queue shard",
149+
unit: "1",
150+
valueType: ValueType.INT,
151+
}
152+
);
153+
144154
workerQueueObservableGauge.addCallback(this.#updateWorkerQueueLength.bind(this));
155+
masterQueueObservableGauge.addCallback(this.#updateMasterQueueLength.bind(this));
145156

146157
this.abortController = new AbortController();
147158

@@ -206,6 +217,17 @@ export class RunQueue {
206217
}
207218
}
208219

220+
async #updateMasterQueueLength(observableResult: ObservableResult<Attributes>) {
221+
for (let shard = 0; shard < this.shardCount; shard++) {
222+
const masterQueueKey = this.keys.masterQueueKeyForShard(shard);
223+
const masterQueueLength = await this.redis.zcard(masterQueueKey);
224+
225+
observableResult.observe(masterQueueLength, {
226+
[SemanticAttributes.MASTER_QUEUE_SHARD]: shard.toString(),
227+
});
228+
}
229+
}
230+
209231
public async updateQueueConcurrencyLimits(
210232
env: MinimalAuthenticatedEnvironment,
211233
queue: string,

0 commit comments

Comments
 (0)