Skip to content

Commit 400673d

Browse files
committed
feat(run-engine): add RunDataProvider for V3 format ack/nack operations
Instead of creating message keys when dequeuing V3 messages, we now read run data from PostgreSQL via a RunDataProvider when needed for ack/nack operations. This approach: - Eliminates ALL message keys for V3 format (not just pending runs) - Uses PostgreSQL as the source of truth for run data - Falls back to Redis message key for legacy V2 messages - Adds RunDataProvider interface and RunData type The RunEngine creates a runDataProvider that queries the TaskRun table for queue, orgId, environmentId, etc. when readMessage is called and no Redis message key exists. https://claude.ai/code/session_01AyzQp6tbj7th5QRTCYjJR5
1 parent 5ff7951 commit 400673d

File tree

3 files changed

+111
-8
lines changed

3 files changed

+111
-8
lines changed

internal-packages/run-engine/src/engine/index.ts

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,43 @@ export class RunEngine {
182182
processWorkerQueueDebounceMs: options.queue?.processWorkerQueueDebounceMs,
183183
dequeueBlockingTimeoutSeconds: options.queue?.dequeueBlockingTimeoutSeconds,
184184
meter: options.meter,
185+
// Run data provider for V3 optimized format - reads from PostgreSQL when no Redis message key exists
186+
runDataProvider: {
187+
getRunData: async (runId: string) => {
188+
const run = await this.prisma.taskRun.findUnique({
189+
where: { id: runId },
190+
select: {
191+
queue: true,
192+
organizationId: true,
193+
projectId: true,
194+
runtimeEnvironmentId: true,
195+
environmentType: true,
196+
concurrencyKey: true,
197+
attemptNumber: true,
198+
queueTimestamp: true,
199+
workerQueue: true,
200+
taskIdentifier: true,
201+
},
202+
});
203+
204+
if (!run || !run.organizationId || !run.environmentType) {
205+
return undefined;
206+
}
207+
208+
return {
209+
queue: run.queue,
210+
orgId: run.organizationId,
211+
projectId: run.projectId,
212+
environmentId: run.runtimeEnvironmentId,
213+
environmentType: run.environmentType,
214+
concurrencyKey: run.concurrencyKey ?? undefined,
215+
attempt: run.attemptNumber ?? 0,
216+
timestamp: run.queueTimestamp?.getTime() ?? Date.now(),
217+
workerQueue: run.workerQueue,
218+
taskIdentifier: run.taskIdentifier,
219+
};
220+
},
221+
},
185222
});
186223

187224
this.worker = new Worker({

internal-packages/run-engine/src/run-queue/index.ts

Lines changed: 46 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import {
3939
InputPayload,
4040
OutputPayload,
4141
OutputPayloadV2,
42+
RunDataProvider,
4243
RunQueueKeyProducer,
4344
RunQueueKeyProducerEnvironment,
4445
RunQueueSelectionStrategy,
@@ -110,6 +111,12 @@ export type RunQueueOptions = {
110111
* 3. Old messages drain naturally as they're processed
111112
*/
112113
useOptimizedMessageFormat?: boolean;
114+
/**
115+
* Provider for fetching run data from PostgreSQL.
116+
* Required when using V3 optimized format for ack/nack operations.
117+
* Falls back to Redis message key if not provided (legacy behavior).
118+
*/
119+
runDataProvider?: RunDataProvider;
113120
};
114121

115122
export interface ConcurrencySweeperCallback {
@@ -194,9 +201,11 @@ export class RunQueue {
194201
private _meter: Meter;
195202
private _queueCooloffStates: Map<string, QueueCooloffState> = new Map();
196203
private _useOptimizedMessageFormat: boolean;
204+
private _runDataProvider?: RunDataProvider;
197205

198206
constructor(public readonly options: RunQueueOptions) {
199207
this._useOptimizedMessageFormat = options.useOptimizedMessageFormat ?? false;
208+
this._runDataProvider = options.runDataProvider;
200209
this.shardCount = options.shardCount ?? 2;
201210
this.retryOptions = options.retryOptions ?? defaultRetrySettings;
202211
this.redis = createRedisClient(options.redis, {
@@ -575,8 +584,43 @@ export class RunQueue {
575584
return this.redis.exists(this.keys.messageKey(orgId, messageId));
576585
}
577586

578-
public async readMessage(orgId: string, messageId: string) {
579-
return this.readMessageFromKey(this.keys.messageKey(orgId, messageId));
587+
public async readMessage(orgId: string, messageId: string): Promise<OutputPayload | undefined> {
588+
// First try to read from Redis (legacy V2 format)
589+
const redisMessage = await this.readMessageFromKey(this.keys.messageKey(orgId, messageId));
590+
if (redisMessage) {
591+
return redisMessage;
592+
}
593+
594+
// Fall back to runDataProvider (for V3 format where there's no message key)
595+
if (this._runDataProvider) {
596+
const runData = await this._runDataProvider.getRunData(messageId);
597+
if (runData) {
598+
// Convert RunData to OutputPayloadV2
599+
const queueKey = this.keys.queueKey(
600+
runData.orgId,
601+
runData.projectId,
602+
runData.environmentId,
603+
runData.taskIdentifier,
604+
runData.concurrencyKey
605+
);
606+
return {
607+
version: "2" as const,
608+
runId: messageId,
609+
taskIdentifier: runData.taskIdentifier,
610+
orgId: runData.orgId,
611+
projectId: runData.projectId,
612+
environmentId: runData.environmentId,
613+
environmentType: runData.environmentType,
614+
queue: queueKey,
615+
concurrencyKey: runData.concurrencyKey,
616+
timestamp: runData.timestamp,
617+
attempt: runData.attempt,
618+
workerQueue: runData.workerQueue,
619+
};
620+
}
621+
}
622+
623+
return undefined;
580624
}
581625

582626
public async readMessageFromKey(messageKey: string) {
@@ -2373,12 +2417,6 @@ export class RunQueue {
23732417
const descriptor = this.keys.descriptorFromQueue(decoded.queueKey);
23742418
const message = reconstructMessageFromWorkerEntry(decoded, descriptor);
23752419

2376-
// For V3 format: create the message key now that the run is executing.
2377-
// This allows ack/nack to work (they read from message key).
2378-
// Storage savings come from not having message keys for PENDING runs (the backlog).
2379-
const messageKey = this.keys.messageKey(descriptor.orgId, message.runId);
2380-
await this.redis.set(messageKey, JSON.stringify(message));
2381-
23822420
// Update the currentDequeued sets (this is done in the Lua script for legacy)
23832421
const queueCurrentDequeuedKey = this.keys.queueCurrentDequeuedKeyFromQueue(decoded.queueKey);
23842422
const envCurrentDequeuedKey = this.keys.envCurrentDequeuedKeyFromQueue(decoded.queueKey);

internal-packages/run-engine/src/run-queue/types.ts

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,3 +133,31 @@ export interface RunQueueSelectionStrategy {
133133
consumerId: string
134134
): Promise<Array<EnvQueues>>;
135135
}
136+
137+
/**
138+
* Provider for fetching run data from a persistent store (e.g., PostgreSQL).
139+
* Used for V3 optimized format where message data is not stored in Redis.
140+
*/
141+
export interface RunDataProvider {
142+
/**
143+
* Fetch run data for ack/nack operations.
144+
* Returns undefined if the run is not found.
145+
*/
146+
getRunData(runId: string): Promise<RunData | undefined>;
147+
}
148+
149+
/**
150+
* Run data needed for queue operations (ack, nack, release concurrency).
151+
*/
152+
export type RunData = {
153+
queue: string;
154+
orgId: string;
155+
projectId: string;
156+
environmentId: string;
157+
environmentType: RuntimeEnvironmentType;
158+
concurrencyKey?: string;
159+
attempt: number;
160+
timestamp: number;
161+
workerQueue: string;
162+
taskIdentifier: string;
163+
};

0 commit comments

Comments
 (0)