Skip to content

Commit e1bc7b5

Browse files
committed
refactor!: separate dependency injection from job hydration
1 parent c12654a commit e1bc7b5

File tree

6 files changed

+176
-139
lines changed

6 files changed

+176
-139
lines changed

README.md

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -239,9 +239,9 @@ Schedule jobs to run in the future:
239239
```typescript
240240
// Various time formats
241241
await SendEmailJob.dispatch(payload).in('30s') // 30 seconds
242-
await SendEmailJob.dispatch(payload).in('5m') // 5 minutes
243-
await SendEmailJob.dispatch(payload).in('2h') // 2 hours
244-
await SendEmailJob.dispatch(payload).in('1d') // 1 day
242+
await SendEmailJob.dispatch(payload).in('5m') // 5 minutes
243+
await SendEmailJob.dispatch(payload).in('2h') // 2 hours
244+
await SendEmailJob.dispatch(payload).in('1d') // 1 day
245245
```
246246

247247
## Priority
@@ -326,19 +326,44 @@ const config = {
326326
}
327327
```
328328

329+
### Handling Timeout Gracefully
330+
331+
Jobs have access to an abort signal via `this.signal` to handle timeouts gracefully:
332+
333+
```typescript
334+
export default class LongRunningJob extends Job<Payload> {
335+
static readonly jobName = 'LongRunningJob'
336+
337+
static options: JobOptions = {
338+
timeout: '30s',
339+
}
340+
341+
async execute(): Promise<void> {
342+
for (const item of this.payload.items) {
343+
// Check if the job has been aborted
344+
if (this.signal?.aborted) {
345+
throw new Error('Job timed out')
346+
}
347+
348+
await this.processItem(item)
349+
}
350+
}
351+
352+
private async processItem(item: any): Promise<void> {
353+
// Pass the signal to fetch or other async operations
354+
await fetch(item.url, { signal: this.signal })
355+
}
356+
}
357+
```
358+
329359
## Job Context
330360

331361
Every job has access to execution context via `this.context`. This provides metadata about the current job execution:
332362

333363
```typescript
334364
import { Job } from '@boringnode/queue'
335-
import type { JobContext } from '@boringnode/queue'
336365

337366
export default class MyJob extends Job<Payload> {
338-
constructor(payload: Payload, context: JobContext) {
339-
super(payload, context)
340-
}
341-
342367
async execute(): Promise<void> {
343368
console.log(`Job ID: ${this.context.jobId}`)
344369
console.log(`Attempt: ${this.context.attempt}`) // 1, 2, 3...
@@ -367,17 +392,17 @@ export default class MyJob extends Job<Payload> {
367392

368393
## Dependency Injection
369394

370-
Use the `jobFactory` option to integrate with IoC containers for dependency injection. This allows your jobs to receive injected services in their constructor.
395+
Use the `jobFactory` option to integrate with IoC containers for dependency injection. The constructor is reserved for injecting dependencies - payload and context are provided separately by the worker.
371396

372397
```typescript
373398
import { QueueManager } from '@boringnode/queue'
374399

375400
await QueueManager.init({
376401
default: 'redis',
377402
adapters: { redis: redis(connection) },
378-
jobFactory: async (JobClass, payload, context) => {
403+
jobFactory: async (JobClass) => {
379404
// Use your IoC container to instantiate jobs
380-
return app.container.make(JobClass, [payload, context])
405+
return app.container.make(JobClass)
381406
},
382407
})
383408
```
@@ -386,7 +411,6 @@ Example with injected dependencies:
386411

387412
```typescript
388413
import { Job } from '@boringnode/queue'
389-
import type { JobContext } from '@boringnode/queue'
390414

391415
interface SendEmailPayload {
392416
to: string
@@ -397,12 +421,10 @@ export default class SendEmailJob extends Job<SendEmailPayload> {
397421
static readonly jobName = 'SendEmailJob'
398422

399423
constructor(
400-
payload: SendEmailPayload,
401-
context: JobContext,
402424
private mailer: MailerService, // Injected by IoC container
403425
private logger: Logger // Injected by IoC container
404426
) {
405-
super(payload, context)
427+
super()
406428
}
407429

408430
async execute(): Promise<void> {
@@ -412,7 +434,7 @@ export default class SendEmailJob extends Job<SendEmailPayload> {
412434
}
413435
```
414436

415-
Without a `jobFactory`, jobs are instantiated with `new JobClass(payload, context)`.
437+
Without a `jobFactory`, jobs are instantiated with `new JobClass()`.
416438

417439
## Scheduled Jobs
418440

src/drivers/sync_adapter.ts

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -122,21 +122,20 @@ export class SyncAdapter implements Adapter {
122122
throw new Error(`Job class ${jobName} not found.`)
123123
}
124124

125-
const context: JobContext = Object.freeze({
125+
const context: JobContext = {
126126
jobId: `sync-${Date.now()}`,
127127
name: jobName,
128128
attempt: 1,
129129
queue,
130130
priority: DEFAULT_PRIORITY,
131131
acquiredAt: new Date(),
132132
stalledCount: 0,
133-
})
133+
}
134134

135135
const jobFactory = QueueManager.getJobFactory()
136-
const jobInstance = jobFactory
137-
? await jobFactory(JobClass, payload, context)
138-
: new JobClass(payload, context)
136+
const jobInstance = jobFactory ? await jobFactory(JobClass) : new JobClass()
139137

138+
jobInstance.$hydrate(payload, context)
140139
await jobInstance.execute()
141140
}
142141
}

src/job.ts

Lines changed: 79 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,14 @@ import type { JobContext, JobOptions } from './types/main.js'
88
* Extend this class to create your own jobs. Each job must implement
99
* the `execute()` method which contains the job's business logic.
1010
*
11+
* The constructor is reserved for dependency injection. Payload and context
12+
* are provided separately via the `$hydrate()` method (called by the worker).
13+
*
1114
* @typeParam Payload - The type of data this job receives
1215
*
1316
* @example
1417
* ```typescript
1518
* import { Job } from '@boringnode/queue'
16-
* import type { JobContext } from '@boringnode/queue'
1719
*
1820
* interface SendEmailPayload {
1921
* to: string
@@ -27,13 +29,14 @@ import type { JobContext, JobOptions } from './types/main.js'
2729
* maxRetries: 3,
2830
* }
2931
*
30-
* constructor(payload: SendEmailPayload, context: JobContext) {
31-
* super(payload, context)
32+
* // Constructor is for dependency injection only
33+
* constructor(private mailer: MailerService) {
34+
* super()
3235
* }
3336
*
3437
* async execute() {
3538
* console.log(`Attempt ${this.context.attempt} for job ${this.context.jobId}`)
36-
* await sendEmail(this.payload.to, this.payload.subject, this.payload.body)
39+
* await this.mailer.send(this.payload.to, this.payload.subject, this.payload.body)
3740
* }
3841
*
3942
* async failed(error: Error) {
@@ -43,13 +46,43 @@ import type { JobContext, JobOptions } from './types/main.js'
4346
* ```
4447
*/
4548
export abstract class Job<Payload = any> {
46-
readonly #payload: Payload
47-
readonly #context: JobContext
49+
#payload!: Payload
50+
#context!: JobContext
51+
#signal?: AbortSignal
4852

49-
/** Static options for this job class (queue, retries, timeout, etc.) */
53+
/**
54+
* Static options for this job class.
55+
*
56+
* Override this property in subclasses to configure job behavior
57+
* such as queue name, retry policy, timeout, and more.
58+
*
59+
* @example
60+
* ```typescript
61+
* class SendEmailJob extends Job<SendEmailPayload> {
62+
* static options = {
63+
* queue: 'emails',
64+
* maxRetries: 3,
65+
* timeout: '30s',
66+
* }
67+
* }
68+
* ```
69+
*/
5070
static options: JobOptions = {}
5171

52-
/** The payload data passed to this job instance */
72+
/**
73+
* The payload data passed to this job instance.
74+
*
75+
* Contains the data provided when the job was dispatched.
76+
* Available after the job has been hydrated by the worker.
77+
*
78+
* @example
79+
* ```typescript
80+
* async execute() {
81+
* const { to, subject, body } = this.payload
82+
* await sendEmail(to, subject, body)
83+
* }
84+
* ```
85+
*/
5386
get payload(): Payload {
5487
return this.#payload
5588
}
@@ -75,14 +108,42 @@ export abstract class Job<Payload = any> {
75108
}
76109

77110
/**
78-
* Create a new job instance.
111+
* The abort signal for timeout handling.
112+
*
113+
* Check `signal.aborted` in long-running operations to handle timeouts gracefully.
114+
*
115+
* @example
116+
* ```typescript
117+
* async execute() {
118+
* for (const item of this.payload.items) {
119+
* if (this.signal?.aborted) {
120+
* throw new Error('Job timed out')
121+
* }
122+
* await processItem(item)
123+
* }
124+
* }
125+
* ```
126+
*/
127+
get signal(): AbortSignal | undefined {
128+
return this.#signal
129+
}
130+
131+
/**
132+
* Hydrate the job with payload, context, and optional abort signal.
133+
*
134+
* This method is called by the worker after instantiation to provide
135+
* the job's runtime data. It should not be called directly by user code.
79136
*
80137
* @param payload - The data to be processed by this job
81-
* @param context - The job execution context (provided by the worker)
138+
* @param context - The job execution context
139+
* @param signal - Optional abort signal for timeout handling
140+
*
141+
* @internal
82142
*/
83-
constructor(payload: Payload, context: JobContext) {
143+
$hydrate(payload: Payload, context: JobContext, signal?: AbortSignal): void {
84144
this.#payload = payload
85145
this.#context = Object.freeze(context)
146+
this.#signal = signal
86147
}
87148

88149
/**
@@ -109,7 +170,7 @@ export abstract class Job<Payload = any> {
109170
* ```
110171
*/
111172
static dispatch<T extends Job>(
112-
this: new (payload: any, context: JobContext) => T,
173+
this: new (...args: any[]) => T,
113174
payload: T extends Job<infer P> ? P : never
114175
): JobDispatcher<T extends Job<infer P> ? P : never> {
115176
const dispatcher = new JobDispatcher<T extends Job<infer P> ? P : never>(
@@ -158,7 +219,7 @@ export abstract class Job<Payload = any> {
158219
* ```
159220
*/
160221
static schedule<T extends Job>(
161-
this: new (payload: any, context: JobContext) => T,
222+
this: new (...args: any[]) => T,
162223
payload: T extends Job<infer P> ? P : never
163224
): ScheduleBuilder {
164225
return new ScheduleBuilder((this as any).jobName, payload)
@@ -170,23 +231,23 @@ export abstract class Job<Payload = any> {
170231
* This method is called by the worker when processing the job.
171232
* Implement your job's logic here.
172233
*
173-
* @param signal - Optional AbortSignal for timeout handling.
174-
* Check `signal.aborted` for long-running operations.
234+
* For timeout handling, use `this.signal` which is available after hydration.
235+
*
175236
* @throws Any error thrown will trigger retry logic (if configured)
176237
*
177238
* @example
178239
* ```typescript
179-
* async execute(signal?: AbortSignal) {
240+
* async execute() {
180241
* for (const item of this.payload.items) {
181-
* if (signal?.aborted) {
242+
* if (this.signal?.aborted) {
182243
* throw new Error('Job timed out')
183244
* }
184245
* await processItem(item)
185246
* }
186247
* }
187248
* ```
188249
*/
189-
abstract execute(signal?: AbortSignal): Promise<void>
250+
abstract execute(): Promise<void>
190251

191252
/**
192253
* Called when the job has permanently failed (after all retries exhausted).

src/types/main.ts

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -181,39 +181,40 @@ export interface JobContext {
181181
stalledCount: number
182182
}
183183

184-
export type JobClass<T extends Job = Job> = (new (payload: any, context: JobContext) => T) & {
184+
/**
185+
* Type representing a Job class constructor.
186+
*
187+
* The constructor accepts any arguments for dependency injection.
188+
* Payload and context are provided separately via `$hydrate()`.
189+
*/
190+
export type JobClass<T extends Job = Job> = (new (...args: any[]) => T) & {
185191
options?: JobOptions
186192
}
187193

188194
/**
189195
* Factory function for custom job instantiation.
190196
*
191197
* Use this to integrate with IoC containers for dependency injection.
192-
* The factory receives the job class, payload, and context, and must return
193-
* a job instance (or a Promise that resolves to one).
198+
* The factory receives only the job class and should return an instance
199+
* with all dependencies injected. The worker will call `$hydrate()` separately
200+
* to provide payload, context, and signal.
194201
*
195202
* @param JobClass - The job class to instantiate
196-
* @param payload - The payload data for the job
197-
* @param context - The job execution context (jobId, attempt, queue, etc.)
198203
* @returns The job instance, or a Promise resolving to the instance
199204
*
200205
* @example
201206
* ```typescript
202207
* // With AdonisJS IoC container
203-
* const worker = new Worker({
204-
* worker: {
205-
* jobFactory: async (JobClass, payload, context) => {
206-
* return app.container.make(JobClass, [payload, context])
207-
* }
208+
* await QueueManager.init({
209+
* default: 'redis',
210+
* adapters: { redis: redis() },
211+
* jobFactory: async (JobClass) => {
212+
* return app.container.make(JobClass)
208213
* }
209214
* })
210215
* ```
211216
*/
212-
export type JobFactory = (
213-
JobClass: JobClass,
214-
payload: any,
215-
context: JobContext
216-
) => Job | Promise<Job>
217+
export type JobFactory = (JobClass: JobClass) => Job | Promise<Job>
217218

218219
export interface RetryConfig {
219220
maxRetries?: number
@@ -414,15 +415,17 @@ export interface QueueManagerConfig {
414415
* Custom factory function for job instantiation.
415416
*
416417
* Use this to integrate with IoC containers for dependency injection.
417-
* When provided, this factory is called instead of `new JobClass(payload, context)`.
418+
* When provided, this factory is called instead of `new JobClass()`.
419+
* The worker will call `$hydrate()` on the returned instance to provide
420+
* payload, context, and signal.
418421
*
419422
* @example
420423
* ```typescript
421424
* await QueueManager.init({
422425
* default: 'redis',
423426
* adapters: { redis: redis() },
424-
* jobFactory: async (JobClass, payload, context) => {
425-
* return app.container.make(JobClass, [payload, context])
427+
* jobFactory: async (JobClass) => {
428+
* return app.container.make(JobClass)
426429
* }
427430
* })
428431
* ```

0 commit comments

Comments
 (0)