Skip to content

Commit 8d67a4b

Browse files
committed
fix issue with swallowing batch completion error in the callback
1 parent e78b619 commit 8d67a4b

File tree

6 files changed

+100
-17
lines changed

6 files changed

+100
-17
lines changed

apps/webapp/app/env.server.ts

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -941,23 +941,15 @@ const EnvironmentSchema = z
941941
BATCH_TRIGGER_WORKER_REDIS_CLUSTER_MODE_ENABLED: z.string().default("0"),
942942

943943
// BatchQueue DRR settings (Run Engine v2)
944-
BATCH_QUEUE_DRR_QUANTUM: z.coerce.number().int().default(5),
945-
BATCH_QUEUE_MAX_DEFICIT: z.coerce.number().int().default(50),
946-
BATCH_QUEUE_CONSUMER_COUNT: z.coerce.number().int().optional(),
947-
BATCH_QUEUE_CONSUMER_INTERVAL_MS: z.coerce.number().int().optional(),
944+
BATCH_QUEUE_DRR_QUANTUM: z.coerce.number().int().default(25),
945+
BATCH_QUEUE_MAX_DEFICIT: z.coerce.number().int().default(100),
946+
BATCH_QUEUE_CONSUMER_COUNT: z.coerce.number().int().default(3),
947+
BATCH_QUEUE_CONSUMER_INTERVAL_MS: z.coerce.number().int().default(50),
948948
// Global rate limit: max items processed per second across all consumers
949949
// If not set, no global rate limiting is applied
950950
BATCH_QUEUE_GLOBAL_RATE_LIMIT: z.coerce.number().int().positive().optional(),
951-
952-
// Batch rate limits and concurrency by plan type
953-
// Rate limit: max items per minute for batch creation
954-
BATCH_RATE_LIMIT_FREE: z.coerce.number().int().default(100), // 100 items/min for free
955-
BATCH_RATE_LIMIT_PAID: z.coerce.number().int().default(10_000), // 10k items/min for paid
956-
BATCH_RATE_LIMIT_ENTERPRISE: z.coerce.number().int().default(100_000), // 100k items/min for enterprise
957-
// Processing concurrency: max concurrent batch items being processed
958-
BATCH_CONCURRENCY_FREE: z.coerce.number().int().default(1),
959-
BATCH_CONCURRENCY_PAID: z.coerce.number().int().default(10),
960-
BATCH_CONCURRENCY_ENTERPRISE: z.coerce.number().int().default(50),
951+
// Processing concurrency: max concurrent batch items being processed per environment
952+
BATCH_CONCURRENCY_DEFAULT_CONCURRENCY: z.coerce.number().int().default(1),
961953

962954
ADMIN_WORKER_ENABLED: z.string().default(process.env.WORKER_ENABLED ?? "true"),
963955
ADMIN_WORKER_CONCURRENCY_WORKERS: z.coerce.number().int().default(2),

apps/webapp/app/routes/api.v3.batches.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ const { action, loader } = createActionApiRoute(
3131
headers: HeadersSchema,
3232
body: CreateBatchRequestBody,
3333
allowJWT: true,
34-
maxContentLength: 65_536, // 64KB is plenty for the batch metadata
34+
maxContentLength: 131_072, // 128KB is plenty for the batch metadata
3535
authorization: {
3636
action: "batchTrigger",
3737
resource: () => ({

apps/webapp/app/runEngine/services/createBatch.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ export class CreateBatchService extends WithRunEngine {
3737
private readonly validator: DefaultTriggerTaskValidator;
3838

3939
constructor(protected readonly _prisma: PrismaClientOrTransaction = prisma) {
40-
super({ prisma });
40+
super({ prisma: _prisma });
4141

4242
this.queueConcern = new DefaultQueueManager(this._prisma, this._engine);
4343
this.validator = new DefaultTriggerTaskValidator();

apps/webapp/app/v3/runEngine.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ function createRunEngine() {
176176
consumerIntervalMs: env.BATCH_QUEUE_CONSUMER_INTERVAL_MS,
177177
// Default processing concurrency when no specific limit is set
178178
// This is overridden per-batch based on the plan type at batch creation
179-
defaultConcurrency: env.BATCH_CONCURRENCY_PAID, // Use paid plan default as baseline
179+
defaultConcurrency: env.BATCH_CONCURRENCY_DEFAULT_CONCURRENCY,
180180
// Optional global rate limiter - limits max items/sec processed across all consumers
181181
globalRateLimiter: env.BATCH_QUEUE_GLOBAL_RATE_LIMIT
182182
? createBatchGlobalRateLimiter(env.BATCH_QUEUE_GLOBAL_RATE_LIMIT)

apps/webapp/app/v3/runEngineHandlers.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -796,6 +796,8 @@ export function setupBatchQueueCallbacks() {
796796
batchId,
797797
error: error instanceof Error ? error.message : String(error),
798798
});
799+
// Re-throw to preserve Redis data for retry (BatchQueue expects errors to propagate)
800+
throw error;
799801
}
800802
});
801803

internal-packages/run-engine/src/batch-queue/tests/index.test.ts

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -569,4 +569,93 @@ describe("BatchQueue", () => {
569569
}
570570
});
571571
});
572+
573+
describe("completion callback error handling", () => {
574+
redisTest(
575+
"should preserve Redis data when completion callback throws an error",
576+
async ({ redisContainer }) => {
577+
const queue = createBatchQueue(redisContainer, { startConsumers: true });
578+
let callbackCallCount = 0;
579+
let lastCompletionResult: CompleteBatchResult | null = null;
580+
581+
try {
582+
queue.onProcessItem(async ({ itemIndex }) => {
583+
return { success: true, runId: `run_${itemIndex}` };
584+
});
585+
586+
queue.onBatchComplete(async (result) => {
587+
callbackCallCount++;
588+
lastCompletionResult = result;
589+
// Simulate database failure on first attempt
590+
if (callbackCallCount === 1) {
591+
throw new Error("Database temporarily unavailable");
592+
}
593+
});
594+
595+
await queue.initializeBatch(createInitOptions("batch1", "env1", 3));
596+
await enqueueItems(queue, "batch1", "env1", createBatchItems(3));
597+
598+
// Wait for completion callback to be called (and fail)
599+
await vi.waitFor(
600+
() => {
601+
expect(callbackCallCount).toBeGreaterThanOrEqual(1);
602+
},
603+
{ timeout: 5000 }
604+
);
605+
606+
// Redis data should still exist after callback failure
607+
const meta = await queue.getBatchMeta("batch1");
608+
expect(meta).not.toBeNull();
609+
expect(meta?.batchId).toBe("batch1");
610+
611+
// Verify the completion result was correct
612+
expect(lastCompletionResult).not.toBeNull();
613+
expect(lastCompletionResult!.batchId).toBe("batch1");
614+
expect(lastCompletionResult!.successfulRunCount).toBe(3);
615+
expect(lastCompletionResult!.runIds).toHaveLength(3);
616+
} finally {
617+
await queue.close();
618+
}
619+
}
620+
);
621+
622+
redisTest(
623+
"should cleanup Redis data when completion callback succeeds",
624+
async ({ redisContainer }) => {
625+
const queue = createBatchQueue(redisContainer, { startConsumers: true });
626+
let completionCalled = false;
627+
628+
try {
629+
queue.onProcessItem(async ({ itemIndex }) => {
630+
return { success: true, runId: `run_${itemIndex}` };
631+
});
632+
633+
queue.onBatchComplete(async () => {
634+
completionCalled = true;
635+
// Callback succeeds - no error thrown
636+
});
637+
638+
await queue.initializeBatch(createInitOptions("batch1", "env1", 3));
639+
await enqueueItems(queue, "batch1", "env1", createBatchItems(3));
640+
641+
// Wait for completion
642+
await vi.waitFor(
643+
() => {
644+
expect(completionCalled).toBe(true);
645+
},
646+
{ timeout: 5000 }
647+
);
648+
649+
// Small delay to ensure cleanup has occurred
650+
await new Promise((resolve) => setTimeout(resolve, 100));
651+
652+
// Redis data should be cleaned up after successful callback
653+
const meta = await queue.getBatchMeta("batch1");
654+
expect(meta).toBeNull();
655+
} finally {
656+
await queue.close();
657+
}
658+
}
659+
);
660+
});
572661
});

0 commit comments

Comments
 (0)