Skip to content

Commit 9bc5fa8

Browse files
committed
feat: add dispatchMany for bulk job dispatch
1 parent b2594d2 commit 9bc5fa8

File tree

12 files changed

+674
-1
lines changed

12 files changed

+674
-1
lines changed

.changelog/push-many.md

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
# Bulk job dispatch
2+
3+
## New feature
4+
5+
### dispatchMany
6+
7+
Jobs can now be dispatched in batches using `Job.dispatchMany()`. This is more efficient than calling `dispatch()` multiple times as it uses optimized batch operations (Redis MULTI/EXEC transaction, SQL batch insert).
8+
9+
```typescript
10+
// Dispatch multiple jobs at once
11+
const { jobIds } = await SendEmailJob.dispatchMany([
12+
{ to: 'user1@example.com', subject: 'Newsletter' },
13+
{ to: 'user2@example.com', subject: 'Newsletter' },
14+
{ to: 'user3@example.com', subject: 'Newsletter' },
15+
])
16+
.group('newsletter-jan-2025')
17+
.toQueue('emails')
18+
.priority(3)
19+
.run()
20+
21+
console.log(`Dispatched ${jobIds.length} jobs`)
22+
```
23+
24+
### Use cases
25+
26+
- **Newsletters**: Send thousands of emails in a single batch operation
27+
- **Bulk exports**: Create export jobs for multiple users
28+
- **Data migrations**: Queue many transformation jobs at once
29+
- **Notifications**: Dispatch notifications to many recipients
30+
31+
### API
32+
33+
The `JobBatchDispatcher` supports the same fluent API as `JobDispatcher`:
34+
35+
```typescript
36+
await SendEmailJob.dispatchMany(payloads)
37+
.toQueue('emails') // Target queue
38+
.group('batch-123') // Group all jobs together
39+
.priority(1) // Set priority for all jobs
40+
.with('redis') // Use specific adapter
41+
.run()
42+
```
43+
44+
### Adapter API
45+
46+
For low-level access, adapters now support `pushMany()` and `pushManyOn()`:
47+
48+
```typescript
49+
await adapter.pushManyOn('emails', [
50+
{ id: 'uuid1', name: 'SendEmailJob', payload: {...}, attempts: 0 },
51+
{ id: 'uuid2', name: 'SendEmailJob', payload: {...}, attempts: 0 },
52+
])
53+
```

index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ export { QueueManager } from './src/queue_manager.js'
44
export { Locator } from './src/locator.js'
55
export { Schedule } from './src/schedule.js'
66
export { ScheduleBuilder } from './src/schedule_builder.js'
7-
export type { AdapterFactory, JobFactory } from './src/types/main.js'
7+
export { JobBatchDispatcher } from './src/job_batch_dispatcher.js'
8+
export type { AdapterFactory, JobFactory, DispatchManyResult } from './src/types/main.js'
89
export {
910
customBackoff,
1011
linearBackoff,

src/contracts/adapter.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,27 @@ export interface Adapter {
147147
*/
148148
pushLaterOn(queue: string, jobData: JobData, delay: number): Promise<void>
149149

150+
/**
151+
* Push multiple jobs to the default queue for immediate processing.
152+
*
153+
* This is more efficient than calling push() multiple times as it
154+
* batches the operations (e.g., Redis pipeline, SQL batch insert).
155+
*
156+
* @param jobs - Array of job data to push
157+
*/
158+
pushMany(jobs: JobData[]): Promise<void>
159+
160+
/**
161+
* Push multiple jobs to a specific queue for immediate processing.
162+
*
163+
* This is more efficient than calling pushOn() multiple times as it
164+
* batches the operations (e.g., Redis pipeline, SQL batch insert).
165+
*
166+
* @param queue - The queue name to push to
167+
* @param jobs - Array of job data to push
168+
*/
169+
pushManyOn(queue: string, jobs: JobData[]): Promise<void>
170+
150171
/**
151172
* Get the number of pending jobs in the default queue.
152173
*

src/drivers/knex_adapter.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -445,6 +445,27 @@ export class KnexAdapter implements Adapter {
445445
})
446446
}
447447

448+
async pushMany(jobs: JobData[]): Promise<void> {
449+
return this.pushManyOn('default', jobs)
450+
}
451+
452+
async pushManyOn(queue: string, jobs: JobData[]): Promise<void> {
453+
if (jobs.length === 0) return
454+
455+
await this.#ensureTables()
456+
457+
const now = Date.now()
458+
const rows = jobs.map((job) => ({
459+
id: job.id,
460+
queue,
461+
status: 'pending' as const,
462+
data: JSON.stringify(job),
463+
score: calculateScore(job.priority ?? DEFAULT_PRIORITY, now),
464+
}))
465+
466+
await this.#connection(this.#jobsTable).insert(rows)
467+
}
468+
448469
async size(): Promise<number> {
449470
return this.sizeOf('default')
450471
}

src/drivers/redis_adapter.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -648,6 +648,27 @@ export class RedisAdapter implements Adapter {
648648
)
649649
}
650650

651+
pushMany(jobs: JobData[]): Promise<void> {
652+
return this.pushManyOn('default', jobs)
653+
}
654+
655+
async pushManyOn(queue: string, jobs: JobData[]): Promise<void> {
656+
if (jobs.length === 0) return
657+
658+
const keys = this.#getKeys(queue)
659+
const now = Date.now()
660+
const multi = this.#connection.multi()
661+
662+
for (const job of jobs) {
663+
const priority = job.priority ?? DEFAULT_PRIORITY
664+
const score = calculateScore(priority, now)
665+
multi.hset(keys.data, job.id, JSON.stringify(job))
666+
multi.zadd(keys.pending, score, job.id)
667+
}
668+
669+
await multi.exec()
670+
}
671+
651672
size(): Promise<number> {
652673
return this.sizeOf('default')
653674
}

src/drivers/sync_adapter.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,16 @@ export class SyncAdapter implements Adapter {
4545
return Promise.resolve()
4646
}
4747

48+
pushMany(jobs: JobData[]): Promise<void> {
49+
return this.pushManyOn('default', jobs)
50+
}
51+
52+
async pushManyOn(queue: string, jobs: JobData[]): Promise<void> {
53+
for (const job of jobs) {
54+
await this.pushOn(queue, job)
55+
}
56+
}
57+
4858
size(): Promise<number> {
4959
return this.sizeOf('default')
5060
}

src/job.ts

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { JobDispatcher } from './job_dispatcher.js'
2+
import { JobBatchDispatcher } from './job_batch_dispatcher.js'
23
import { ScheduleBuilder } from './schedule_builder.js'
34
import type { JobContext, JobOptions } from './types/main.js'
45

@@ -193,6 +194,57 @@ export abstract class Job<Payload = any> {
193194
return dispatcher
194195
}
195196

197+
/**
198+
* Dispatch multiple jobs to the queue in a single batch.
199+
*
200+
* Returns a JobBatchDispatcher for fluent configuration before dispatching.
201+
* The jobs are not actually dispatched until `.run()` is called or the
202+
* dispatcher is awaited.
203+
*
204+
* This is more efficient than calling `dispatch()` multiple times as it
205+
* uses batched operations (e.g., Redis pipeline, SQL batch insert).
206+
*
207+
* @param payloads - Array of data to pass to each job
208+
* @returns A JobBatchDispatcher for fluent configuration
209+
*
210+
* @example
211+
* ```typescript
212+
* // Batch dispatch for newsletter
213+
* const { jobIds } = await SendEmailJob.dispatchMany([
214+
* { to: 'user1@example.com', subject: 'Newsletter' },
215+
* { to: 'user2@example.com', subject: 'Newsletter' },
216+
* ])
217+
* .group('newsletter-jan-2025')
218+
* .toQueue('emails')
219+
* .run()
220+
*
221+
* console.log(`Dispatched ${jobIds.length} jobs`)
222+
* ```
223+
*/
224+
static dispatchMany<T extends Job>(
225+
this: new (...args: any[]) => T,
226+
payloads: (T extends Job<infer P> ? P : never)[]
227+
): JobBatchDispatcher<T extends Job<infer P> ? P : never> {
228+
const options = (this as any).options || {}
229+
const jobName = options.name || this.name
230+
231+
const dispatcher = new JobBatchDispatcher<T extends Job<infer P> ? P : never>(jobName, payloads)
232+
233+
if (options.queue) {
234+
dispatcher.toQueue(options.queue)
235+
}
236+
237+
if (options.adapter) {
238+
dispatcher.with(options.adapter)
239+
}
240+
241+
if (options.priority !== undefined) {
242+
dispatcher.priority(options.priority)
243+
}
244+
245+
return dispatcher
246+
}
247+
196248
/**
197249
* Create a schedule for this job.
198250
*

0 commit comments

Comments
 (0)