Skip to content

Commit 7061d3a

Browse files
committed
child tasks inherit the queue timestamp from their parent tasks to prioritize completing child tasks based on when their parent started
1 parent ac8fa75 commit 7061d3a

File tree

7 files changed

+138
-4
lines changed

7 files changed

+138
-4
lines changed

apps/webapp/app/v3/marqs/index.server.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ export class MarQS {
174174
messageId: string,
175175
messageData: Record<string, unknown>,
176176
concurrencyKey?: string,
177-
timestamp?: number
177+
timestamp?: number | Date
178178
) {
179179
return await this.#trace(
180180
"enqueueMessage",
@@ -190,7 +190,12 @@ export class MarQS {
190190
data: messageData,
191191
queue: messageQueue,
192192
concurrencyKey,
193-
timestamp: timestamp ?? Date.now(),
193+
timestamp:
194+
typeof timestamp === "undefined"
195+
? Date.now()
196+
: typeof timestamp === "number"
197+
? timestamp
198+
: timestamp.getTime(),
194199
messageId,
195200
parentQueue,
196201
};

apps/webapp/app/v3/services/enqueueDelayedRun.server.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,8 @@ export class EnqueueDelayedRunService extends BaseService {
9494
environmentId: run.runtimeEnvironment.id,
9595
environmentType: run.runtimeEnvironment.type,
9696
},
97-
run.concurrencyKey ?? undefined
97+
run.concurrencyKey ?? undefined,
98+
run.queueTimestamp ?? undefined
9899
);
99100
}
100101
}

apps/webapp/app/v3/services/resumeBatchRun.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ export class ResumeBatchRunService extends BaseService {
186186
dependentTaskAttemptId: dependentTaskAttempt.id,
187187
});
188188

189+
// TODO: use the new priority queue thingie
189190
await marqs?.enqueueMessage(
190191
environment,
191192
dependentRun.queue,

apps/webapp/app/v3/services/resumeTaskDependency.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ export class ResumeTaskDependencyService extends BaseService {
4949
runId: dependentRun.id,
5050
}
5151
);
52+
53+
// TODO: use the new priority queue thingie
5254
await marqs?.enqueueMessage(
5355
dependency.taskRun.runtimeEnvironment,
5456
dependentRun.queue,

apps/webapp/app/v3/services/triggerTask.server.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,7 @@ export class TriggerTaskService extends BaseService {
186186
taskIdentifier: true,
187187
rootTaskRunId: true,
188188
depth: true,
189+
queueTimestamp: true,
189190
},
190191
},
191192
},
@@ -242,6 +243,7 @@ export class TriggerTaskService extends BaseService {
242243
taskIdentifier: true,
243244
rootTaskRunId: true,
244245
depth: true,
246+
queueTimestamp: true,
245247
},
246248
},
247249
},
@@ -367,6 +369,12 @@ export class TriggerTaskService extends BaseService {
367369
? dependentBatchRun.dependentTaskAttempt.taskRun.depth + 1
368370
: 0;
369371

372+
const queueTimestamp =
373+
dependentAttempt?.taskRun.queueTimestamp ??
374+
dependentBatchRun?.dependentTaskAttempt?.taskRun.queueTimestamp ??
375+
delayUntil ??
376+
new Date();
377+
370378
const taskRun = await tx.taskRun.create({
371379
data: {
372380
status: delayUntil ? "DELAYED" : "PENDING",
@@ -394,6 +402,7 @@ export class TriggerTaskService extends BaseService {
394402
isTest: body.options?.test ?? false,
395403
delayUntil,
396404
queuedAt: delayUntil ? undefined : new Date(),
405+
queueTimestamp,
397406
maxAttempts: body.options?.maxAttempts,
398407
taskEventStore: getTaskEventStore(),
399408
ttl,
@@ -573,7 +582,8 @@ export class TriggerTaskService extends BaseService {
573582
environmentId: environment.id,
574583
environmentType: environment.type,
575584
},
576-
body.options?.concurrencyKey
585+
body.options?.concurrencyKey,
586+
run.queueTimestamp ?? undefined
577587
);
578588
}
579589

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
-- DropIndex
2+
DROP INDEX "SecretStore_key_idx";
3+
4+
-- AlterTable
5+
ALTER TABLE "TaskRun" ADD COLUMN "queueTimestamp" TIMESTAMP(3);
6+
7+
-- CreateIndex
8+
CREATE INDEX "SecretStore_key_idx" ON "SecretStore"("key" text_pattern_ops);

references/v3-catalog/src/trigger/concurrency.ts

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,3 +129,110 @@ export const testReserveConcurrencyRecursiveWaits = task({
129129
};
130130
},
131131
});
132+
133+
export const testChildTaskPriorityController = task({
134+
id: "test-child-task-priority-controller",
135+
run: async ({ delay = 5000 }: { delay: number }) => {
136+
logger.info(`Delaying for ${delay}ms`);
137+
138+
await new Promise((resolve) => setTimeout(resolve, delay));
139+
140+
return {
141+
completedAt: new Date(),
142+
};
143+
},
144+
});
145+
146+
export const testChildTaskPriorityParent = task({
147+
id: "test-child-task-priority-parent",
148+
run: async ({ delay = 5000 }: { delay: number }) => {
149+
logger.info(`Delaying for ${delay}ms`);
150+
151+
await new Promise((resolve) => setTimeout(resolve, delay));
152+
153+
await testChildTaskPriorityChild.triggerAndWait({
154+
delay,
155+
});
156+
157+
logger.info(`Delay of ${delay}ms completed`);
158+
159+
return {
160+
completedAt: new Date(),
161+
};
162+
},
163+
});
164+
165+
export const testChildTaskPriorityChildCreator = task({
166+
id: "test-child-task-priority-child-creator",
167+
run: async ({ delay = 5000 }: { delay: number }) => {
168+
await testChildTaskPriorityChild.batchTrigger([
169+
{ payload: { delay, propagate: false } },
170+
{ payload: { delay, propagate: false } },
171+
{ payload: { delay, propagate: false } },
172+
{ payload: { delay, propagate: false } },
173+
]);
174+
175+
return {
176+
completedAt: new Date(),
177+
};
178+
},
179+
});
180+
181+
export const testChildTaskPriorityChild = task({
182+
id: "test-child-task-priority-child",
183+
queue: {
184+
concurrencyLimit: 1,
185+
},
186+
run: async ({ delay = 5000, propagate }: { delay: number; propagate?: boolean }) => {
187+
logger.info(`Delaying for ${delay}ms`);
188+
189+
await new Promise((resolve) => setTimeout(resolve, delay));
190+
191+
if (typeof propagate === "undefined" || propagate) {
192+
await testChildTaskPriorityGrandChild.triggerAndWait({
193+
delay,
194+
});
195+
}
196+
197+
logger.info(`Delay of ${delay}ms completed`);
198+
199+
return {
200+
completedAt: new Date(),
201+
};
202+
},
203+
});
204+
205+
export const testChildTaskPriorityGrandChildCreator = task({
206+
id: "test-child-task-priority-grand-child-creator",
207+
run: async ({ delay = 5000 }: { delay: number }) => {
208+
await testChildTaskPriorityGrandChild.batchTrigger([
209+
{ payload: { delay } },
210+
{ payload: { delay } },
211+
{ payload: { delay } },
212+
]);
213+
214+
logger.info(`Delay of ${delay}ms completed`);
215+
216+
return {
217+
completedAt: new Date(),
218+
};
219+
},
220+
});
221+
222+
export const testChildTaskPriorityGrandChild = task({
223+
id: "test-child-task-priority-grandchild",
224+
queue: {
225+
concurrencyLimit: 1,
226+
},
227+
run: async ({ delay = 5000 }: { delay: number }) => {
228+
logger.info(`Delaying for ${delay}ms`);
229+
230+
await new Promise((resolve) => setTimeout(resolve, delay));
231+
232+
logger.info(`Delay of ${delay}ms completed`);
233+
234+
return {
235+
completedAt: new Date(),
236+
};
237+
},
238+
});

0 commit comments

Comments
 (0)