Skip to content

Commit e1df4d0

Browse files
committed
Added priority support when triggering
1 parent 9326c6b commit e1df4d0

File tree

6 files changed

+86
-0
lines changed

6 files changed

+86
-0
lines changed

internal-packages/run-engine/src/engine/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1305,6 +1305,7 @@ export class RunEngine {
13051305
/** @deprecated */
13061306
baseCostInCents: run.baseCostInCents,
13071307
traceContext: run.traceContext as Record<string, string | undefined>,
1308+
priority: run.priorityMs === 0 ? undefined : run.priorityMs / 1_000,
13081309
},
13091310
queue: {
13101311
id: queue.friendlyId,

packages/core/src/v3/schemas/api.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ export const TriggerTaskRequestBody = z.object({
118118
tags: RunTags.optional(),
119119
test: z.boolean().optional(),
120120
ttl: z.string().or(z.number().nonnegative().int()).optional(),
121+
priority: z.number().optional(),
121122
})
122123
.optional(),
123124
});
@@ -160,6 +161,7 @@ export const BatchTriggerTaskItem = z.object({
160161
tags: RunTags.optional(),
161162
test: z.boolean().optional(),
162163
ttl: z.string().or(z.number().nonnegative().int()).optional(),
164+
priority: z.number().optional(),
163165
})
164166
.optional(),
165167
});

packages/core/src/v3/schemas/common.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,8 @@ export const TaskRun = z.object({
229229
* @link https://trigger.dev/docs/run-usage
230230
*/
231231
baseCostInCents: z.number().default(0),
232+
/** The priority of the run. Wih a value of 10 it will be dequeued before runs that were triggered 9 seconds before it (assuming they had no priority set). */
233+
priority: z.number().optional(),
232234
});
233235

234236
export type TaskRun = z.infer<typeof TaskRun>;

packages/core/src/v3/types/tasks.ts

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -707,9 +707,26 @@ export type TriggerOptions = {
707707
* Specify a duration string like "1h", "10s", "30m", etc.
708708
*/
709709
idempotencyKeyTTL?: string;
710+
711+
/**
712+
* The maximum number of retry attempts for the task if it fails.
713+
* If not specified, it will use the task or the default retry policy from your trigger.config file.
714+
*/
710715
maxAttempts?: number;
716+
717+
/**
718+
* You can override the queue for the task. If a queue doesn't exist for the given name, it will be created.
719+
* Setting the `concurrencyLimit` here will modify the limit for this queue everywhere it's used.
720+
*/
711721
queue?: TaskRunConcurrencyOptions;
722+
723+
/**
724+
* The `concurrencyKey` creates a copy of the queue for every unique value of the key.
725+
* For example, if the queue (set when triggering or on the task) has a concurrency limit of 10,
726+
* and you set the concurrency key to `userId`, then each user will have their own queue with a concurrency limit of 10.
727+
*/
712728
concurrencyKey?: string;
729+
713730
/**
714731
* The delay before the task is executed. This can be a string like "1h" or a Date object.
715732
*
@@ -739,6 +756,26 @@ export type TriggerOptions = {
739756
*/
740757
ttl?: string | number;
741758

759+
/**
760+
* If triggered at the same time, a higher priority run will be executed first.
761+
*
762+
The value is a time offset in seconds that determines the order of dequeuing.
763+
* If you trigger two runs 9 seconds apart but the second one has `priority: 10`, it will be executed before the first one.
764+
*
765+
* @example
766+
* ```ts
767+
// no priority = 0
768+
await myTask.trigger({ foo: "bar" });
769+
770+
//... imagine 9s pass by
771+
772+
// this run will start before the run above that was triggered 9s ago (with no priority)
773+
await myTask.trigger({ foo: "bar" }, { priority: 10 });
774+
```
775+
*
776+
*/
777+
priority?: number;
778+
742779
/**
743780
* Tags to attach to the run. Tags can be used to filter runs in the dashboard and using the SDK.
744781
*
@@ -777,6 +814,10 @@ export type TriggerOptions = {
777814
export type TriggerAndWaitOptions = TriggerOptions;
778815

779816
export type BatchTriggerOptions = {
817+
/**
818+
* If no idempotencyKey is set on an individual item in the batch, it will use this key on each item + the array index.
819+
* This is useful to prevent work being done again if the task has to retry.
820+
*/
780821
idempotencyKey?: IdempotencyKey | string | string[];
781822
idempotencyKeyTTL?: string;
782823

packages/trigger-sdk/src/v3/shared.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -619,6 +619,7 @@ export async function batchTriggerById<TTask extends AnyTask>(
619619
(await makeIdempotencyKey(item.options?.idempotencyKey)) ?? batchItemIdempotencyKey,
620620
idempotencyKeyTTL: item.options?.idempotencyKeyTTL ?? options?.idempotencyKeyTTL,
621621
machine: item.options?.machine,
622+
priority: item.options?.priority,
622623
},
623624
} satisfies BatchTriggerTaskV2RequestBody["items"][0];
624625
})
@@ -786,6 +787,7 @@ export async function batchTriggerByIdAndWait<TTask extends AnyTask>(
786787
batchItemIdempotencyKey,
787788
idempotencyKeyTTL: item.options?.idempotencyKeyTTL ?? options?.idempotencyKeyTTL,
788789
machine: item.options?.machine,
790+
priority: item.options?.priority,
789791
},
790792
} satisfies BatchTriggerTaskV2RequestBody["items"][0];
791793
})
@@ -944,6 +946,7 @@ export async function batchTriggerTasks<TTasks extends readonly AnyTask[]>(
944946
(await makeIdempotencyKey(item.options?.idempotencyKey)) ?? batchItemIdempotencyKey,
945947
idempotencyKeyTTL: item.options?.idempotencyKeyTTL ?? options?.idempotencyKeyTTL,
946948
machine: item.options?.machine,
949+
priority: item.options?.priority,
947950
},
948951
} satisfies BatchTriggerTaskV2RequestBody["items"][0];
949952
})
@@ -1113,6 +1116,7 @@ export async function batchTriggerAndWaitTasks<TTasks extends readonly AnyTask[]
11131116
batchItemIdempotencyKey,
11141117
idempotencyKeyTTL: item.options?.idempotencyKeyTTL ?? options?.idempotencyKeyTTL,
11151118
machine: item.options?.machine,
1119+
priority: item.options?.priority,
11161120
},
11171121
} satisfies BatchTriggerTaskV2RequestBody["items"][0];
11181122
})
@@ -1184,6 +1188,7 @@ async function trigger_internal<TRunTypes extends AnyRunTypes>(
11841188
maxDuration: options?.maxDuration,
11851189
parentRunId: taskContext.ctx?.run.id,
11861190
machine: options?.machine,
1191+
priority: options?.priority,
11871192
},
11881193
},
11891194
{
@@ -1250,6 +1255,7 @@ async function batchTrigger_internal<TRunTypes extends AnyRunTypes>(
12501255
(await makeIdempotencyKey(item.options?.idempotencyKey)) ?? batchItemIdempotencyKey,
12511256
idempotencyKeyTTL: item.options?.idempotencyKeyTTL ?? options?.idempotencyKeyTTL,
12521257
machine: item.options?.machine,
1258+
priority: item.options?.priority,
12531259
},
12541260
} satisfies BatchTriggerTaskV2RequestBody["items"][0];
12551261
})
@@ -1333,6 +1339,7 @@ async function triggerAndWait_internal<TIdentifier extends string, TPayload, TOu
13331339
idempotencyKey: await makeIdempotencyKey(options?.idempotencyKey),
13341340
idempotencyKeyTTL: options?.idempotencyKeyTTL,
13351341
machine: options?.machine,
1342+
priority: options?.priority,
13361343
},
13371344
},
13381345
{},
@@ -1418,6 +1425,7 @@ async function batchTriggerAndWait_internal<TIdentifier extends string, TPayload
14181425
batchItemIdempotencyKey,
14191426
idempotencyKeyTTL: item.options?.idempotencyKeyTTL ?? options?.idempotencyKeyTTL,
14201427
machine: item.options?.machine,
1428+
priority: item.options?.priority,
14211429
},
14221430
} satisfies BatchTriggerTaskV2RequestBody["items"][0];
14231431
})
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import { logger, task } from "@trigger.dev/sdk/v3";
2+
3+
export const priorityParent = task({
4+
id: "priority-parent",
5+
run: async (payload: any, { ctx }) => {
6+
logger.log("Hello, world from the parent", { payload });
7+
8+
const batch1 = await priorityChild.batchTriggerAndWait([
9+
{
10+
payload: { order: 1 },
11+
},
12+
{
13+
payload: { order: 2 },
14+
options: { priority: 1 },
15+
},
16+
{
17+
payload: { order: 3 },
18+
options: { priority: 2 },
19+
},
20+
]);
21+
},
22+
});
23+
24+
export const priorityChild = task({
25+
id: "priority-child",
26+
queue: {
27+
concurrencyLimit: 1,
28+
},
29+
run: async ({ order }: { order: number }, { ctx }) => {
30+
logger.log(`Priority ${ctx.run.priority}`);
31+
},
32+
});

0 commit comments

Comments
 (0)