|
1 | 1 | import { Logger } from "@trigger.dev/core/logger"; |
| 2 | +import { tryCatch } from "@trigger.dev/core/utils"; |
2 | 3 | import { nanoid } from "nanoid"; |
3 | 4 | import pLimit from "p-limit"; |
4 | 5 | import { signalsEmitter } from "~/services/signals.server"; |
@@ -195,47 +196,62 @@ export class DynamicFlushScheduler<T> { |
195 | 196 | // Schedule all batches for concurrent processing |
196 | 197 | const flushPromises = batchesToFlush.map((batch) => |
197 | 198 | this.limiter(async () => { |
198 | | - const flushId = nanoid(); |
199 | 199 | const itemCount = batch.length; |
200 | 200 |
|
201 | | - try { |
202 | | - const startTime = Date.now(); |
203 | | - await this.callback(flushId, batch); |
204 | | - |
205 | | - const duration = Date.now() - startTime; |
206 | | - this.totalQueuedItems -= itemCount; |
207 | | - this.consecutiveFlushFailures = 0; |
208 | | - this.lastFlushTime = Date.now(); |
209 | | - this.metrics.flushedBatches++; |
210 | | - this.metrics.totalItemsFlushed += itemCount; |
211 | | - |
212 | | - this.logger.debug("Batch flushed successfully", { |
213 | | - flushId, |
214 | | - itemCount, |
215 | | - duration, |
216 | | - remainingQueueDepth: this.totalQueuedItems, |
217 | | - activeConcurrency: this.limiter.activeCount, |
218 | | - pendingConcurrency: this.limiter.pendingCount, |
219 | | - }); |
220 | | - } catch (error) { |
221 | | - this.consecutiveFlushFailures++; |
222 | | - this.metrics.failedBatches++; |
| 201 | + const self = this; |
| 202 | + |
| 203 | + async function tryFlush(flushId: string, batchToFlush: T[], attempt: number = 1) { |
| 204 | + try { |
| 205 | + const startTime = Date.now(); |
| 206 | + await self.callback(flushId, batchToFlush); |
| 207 | + |
| 208 | + const duration = Date.now() - startTime; |
| 209 | + self.totalQueuedItems -= itemCount; |
| 210 | + self.consecutiveFlushFailures = 0; |
| 211 | + self.lastFlushTime = Date.now(); |
| 212 | + self.metrics.flushedBatches++; |
| 213 | + self.metrics.totalItemsFlushed += itemCount; |
| 214 | + |
| 215 | + self.logger.debug("Batch flushed successfully", { |
| 216 | + flushId, |
| 217 | + itemCount, |
| 218 | + duration, |
| 219 | + remainingQueueDepth: self.totalQueuedItems, |
| 220 | + activeConcurrency: self.limiter.activeCount, |
| 221 | + pendingConcurrency: self.limiter.pendingCount, |
| 222 | + }); |
| 223 | + } catch (error) { |
| 224 | + self.consecutiveFlushFailures++; |
| 225 | + self.metrics.failedBatches++; |
| 226 | + |
| 227 | + self.logger.error("Error attempting to flush batch", { |
| 228 | + flushId, |
| 229 | + itemCount, |
| 230 | + error, |
| 231 | + consecutiveFailures: self.consecutiveFlushFailures, |
| 232 | + attempt, |
| 233 | + }); |
| 234 | + |
| 235 | + // Back off on failures |
| 236 | + if (self.consecutiveFlushFailures > 5) { |
| 237 | + self.adjustConcurrency(true); |
| 238 | + } |
| 239 | + |
| 240 | + if (attempt <= 3) { |
| 241 | + await new Promise((resolve) => setTimeout(resolve, 500)); |
| 242 | + return await tryFlush(flushId, batchToFlush, attempt + 1); |
| 243 | + } else { |
| 244 | + throw error; |
| 245 | + } |
| 246 | + } |
| 247 | + } |
| 248 | + |
| 249 | + const [flushError] = await tryCatch(tryFlush(nanoid(), batch)); |
223 | 250 |
|
| 251 | + if (flushError) { |
224 | 252 | this.logger.error("Error flushing batch", { |
225 | | - flushId, |
226 | | - itemCount, |
227 | | - error, |
228 | | - consecutiveFailures: this.consecutiveFlushFailures, |
| 253 | + error: flushError, |
229 | 254 | }); |
230 | | - |
231 | | - // Re-queue the batch at the front if it fails |
232 | | - this.batchQueue.unshift(batch); |
233 | | - this.totalQueuedItems += itemCount; |
234 | | - |
235 | | - // Back off on failures |
236 | | - if (this.consecutiveFlushFailures > 3) { |
237 | | - this.adjustConcurrency(true); |
238 | | - } |
239 | 255 | } |
240 | 256 | }) |
241 | 257 | ); |
|
0 commit comments