Skip to content

Commit a34f03f

Browse files
committed
Improve dynamic flush scheduler for otel data
The changes introduce a more flexible, adaptive dynamic flush scheduler to address production issues where the system wasn't flushing data fast enough, causing memory growth and crashes. The problem arose because the existing scheduler handled only a single flush at a time, limiting concurrency and failing to keep up with the influx of logs. - Added configuration options for minimum and maximum concurrency levels, maximum batch size, and a memory-pressure threshold, so flush operations adjust dynamically to workload and pressure. - Introduced `pLimit` to run flush operations concurrently, with the concurrency adjusted according to batch-queue length and memory pressure. - Added metrics reporting to monitor the scheduler's dynamic behavior, helping identify performance issues and tune its operation accordingly.
1 parent 0d136a3 commit a34f03f

File tree

3 files changed

+272
-19
lines changed

3 files changed

+272
-19
lines changed

apps/webapp/app/env.server.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,10 @@ const EnvironmentSchema = z.object({
252252
EVENTS_BATCH_SIZE: z.coerce.number().int().default(100),
253253
EVENTS_BATCH_INTERVAL: z.coerce.number().int().default(1000),
254254
EVENTS_DEFAULT_LOG_RETENTION: z.coerce.number().int().default(7),
255+
EVENTS_MIN_CONCURRENCY: z.coerce.number().int().default(1),
256+
EVENTS_MAX_CONCURRENCY: z.coerce.number().int().default(10),
257+
EVENTS_MAX_BATCH_SIZE: z.coerce.number().int().default(500),
258+
EVENTS_MEMORY_PRESSURE_THRESHOLD: z.coerce.number().int().default(2000),
255259
SHARED_QUEUE_CONSUMER_POOL_SIZE: z.coerce.number().int().default(10),
256260
SHARED_QUEUE_CONSUMER_INTERVAL_MS: z.coerce.number().int().default(100),
257261
SHARED_QUEUE_CONSUMER_NEXT_TICK_INTERVAL_MS: z.coerce.number().int().default(100),
Lines changed: 211 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,84 @@
11
import { nanoid } from "nanoid";
2+
import pLimit from "p-limit";
3+
import { logger } from "~/services/logger.server";
24

35
export type DynamicFlushSchedulerConfig<T> = {
46
batchSize: number;
57
flushInterval: number;
68
callback: (flushId: string, batch: T[]) => Promise<void>;
9+
// New configuration options
10+
minConcurrency?: number;
11+
maxConcurrency?: number;
12+
maxBatchSize?: number;
13+
memoryPressureThreshold?: number; // Number of items that triggers increased concurrency
714
};
815

916
export class DynamicFlushScheduler<T> {
10-
private batchQueue: T[][]; // Adjust the type according to your data structure
11-
private currentBatch: T[]; // Adjust the type according to your data structure
17+
private batchQueue: T[][];
18+
private currentBatch: T[];
1219
private readonly BATCH_SIZE: number;
1320
private readonly FLUSH_INTERVAL: number;
1421
private flushTimer: NodeJS.Timeout | null;
1522
private readonly callback: (flushId: string, batch: T[]) => Promise<void>;
23+
24+
// New properties for dynamic scaling
25+
private readonly minConcurrency: number;
26+
private readonly maxConcurrency: number;
27+
private readonly maxBatchSize: number;
28+
private readonly memoryPressureThreshold: number;
29+
private limiter: ReturnType<typeof pLimit>;
30+
private currentBatchSize: number;
31+
private totalQueuedItems: number = 0;
32+
private consecutiveFlushFailures: number = 0;
33+
private lastFlushTime: number = Date.now();
34+
private metrics = {
35+
flushedBatches: 0,
36+
failedBatches: 0,
37+
totalItemsFlushed: 0,
38+
};
1639

1740
constructor(config: DynamicFlushSchedulerConfig<T>) {
1841
this.batchQueue = [];
1942
this.currentBatch = [];
2043
this.BATCH_SIZE = config.batchSize;
44+
this.currentBatchSize = config.batchSize;
2145
this.FLUSH_INTERVAL = config.flushInterval;
2246
this.callback = config.callback;
2347
this.flushTimer = null;
48+
49+
// Initialize dynamic scaling parameters
50+
this.minConcurrency = config.minConcurrency ?? 1;
51+
this.maxConcurrency = config.maxConcurrency ?? 10;
52+
this.maxBatchSize = config.maxBatchSize ?? config.batchSize * 5;
53+
this.memoryPressureThreshold = config.memoryPressureThreshold ?? config.batchSize * 20;
54+
55+
// Start with minimum concurrency
56+
this.limiter = pLimit(this.minConcurrency);
57+
2458
this.startFlushTimer();
59+
this.startMetricsReporter();
2560
}
2661

2762
addToBatch(items: T[]): void {
2863
this.currentBatch.push(...items);
64+
this.totalQueuedItems += items.length;
2965

30-
if (this.currentBatch.length >= this.BATCH_SIZE) {
31-
this.batchQueue.push(this.currentBatch);
32-
this.currentBatch = [];
33-
this.flushNextBatch();
34-
this.resetFlushTimer();
66+
// Check if we need to create a batch
67+
if (this.currentBatch.length >= this.currentBatchSize) {
68+
this.createBatch();
3569
}
70+
71+
// Adjust concurrency based on queue pressure
72+
this.adjustConcurrency();
73+
}
74+
75+
private createBatch(): void {
76+
if (this.currentBatch.length === 0) return;
77+
78+
this.batchQueue.push(this.currentBatch);
79+
this.currentBatch = [];
80+
this.flushBatches();
81+
this.resetFlushTimer();
3682
}
3783

3884
private startFlushTimer(): void {
@@ -48,23 +94,169 @@ export class DynamicFlushScheduler<T> {
4894

4995
private checkAndFlush(): void {
5096
if (this.currentBatch.length > 0) {
51-
this.batchQueue.push(this.currentBatch);
52-
this.currentBatch = [];
97+
this.createBatch();
5398
}
54-
this.flushNextBatch();
99+
this.flushBatches();
55100
}
56101

57-
private async flushNextBatch(): Promise<void> {
58-
if (this.batchQueue.length === 0) return;
102+
private async flushBatches(): Promise<void> {
103+
const batchesToFlush: T[][] = [];
104+
105+
// Dequeue all available batches up to current concurrency limit
106+
while (this.batchQueue.length > 0 && batchesToFlush.length < this.limiter.concurrency) {
107+
const batch = this.batchQueue.shift();
108+
if (batch) {
109+
batchesToFlush.push(batch);
110+
}
111+
}
112+
113+
if (batchesToFlush.length === 0) return;
59114

60-
const batchToFlush = this.batchQueue.shift();
61-
try {
62-
await this.callback(nanoid(), batchToFlush!);
115+
// Schedule all batches for concurrent processing
116+
const flushPromises = batchesToFlush.map((batch) =>
117+
this.limiter(async () => {
118+
const flushId = nanoid();
119+
const itemCount = batch.length;
120+
121+
try {
122+
const startTime = Date.now();
123+
await this.callback(flushId, batch);
124+
125+
const duration = Date.now() - startTime;
126+
this.totalQueuedItems -= itemCount;
127+
this.consecutiveFlushFailures = 0;
128+
this.lastFlushTime = Date.now();
129+
this.metrics.flushedBatches++;
130+
this.metrics.totalItemsFlushed += itemCount;
131+
132+
logger.debug("Batch flushed successfully", {
133+
flushId,
134+
itemCount,
135+
duration,
136+
remainingQueueDepth: this.totalQueuedItems,
137+
activeConcurrency: this.limiter.activeCount,
138+
pendingConcurrency: this.limiter.pendingCount,
139+
});
140+
} catch (error) {
141+
this.consecutiveFlushFailures++;
142+
this.metrics.failedBatches++;
143+
144+
logger.error("Error flushing batch", {
145+
flushId,
146+
itemCount,
147+
error,
148+
consecutiveFailures: this.consecutiveFlushFailures,
149+
});
150+
151+
// Re-queue the batch at the front if it fails
152+
this.batchQueue.unshift(batch);
153+
this.totalQueuedItems += itemCount;
154+
155+
// Back off on failures
156+
if (this.consecutiveFlushFailures > 3) {
157+
this.adjustConcurrency(true);
158+
}
159+
}
160+
})
161+
);
162+
163+
// Don't await here - let them run concurrently
164+
Promise.allSettled(flushPromises).then(() => {
165+
// After flush completes, check if we need to flush more
63166
if (this.batchQueue.length > 0) {
64-
this.flushNextBatch();
167+
this.flushBatches();
65168
}
66-
} catch (error) {
67-
console.error("Error inserting batch:", error);
169+
});
170+
}
171+
172+
private adjustConcurrency(backOff: boolean = false): void {
173+
const currentConcurrency = this.limiter.concurrency;
174+
let newConcurrency = currentConcurrency;
// Hoisted before the if/else: the original declared queuePressure inside the
// else block, but it is also read by the batch-size adjustment and the
// "Adjusted flush concurrency" log after that block closes — a block-scoping
// compile error. (The inner declaration now merely shadows this with the same value.)
const queuePressure = this.totalQueuedItems / this.memoryPressureThreshold;
175+
176+
if (backOff) {
177+
// Reduce concurrency on failures
178+
newConcurrency = Math.max(this.minConcurrency, Math.floor(currentConcurrency * 0.75));
179+
} else {
180+
// Calculate pressure metrics
181+
const queuePressure = this.totalQueuedItems / this.memoryPressureThreshold;
182+
const timeSinceLastFlush = Date.now() - this.lastFlushTime;
183+
184+
if (queuePressure > 0.8 || timeSinceLastFlush > this.FLUSH_INTERVAL * 2) {
185+
// High pressure - increase concurrency
186+
newConcurrency = Math.min(this.maxConcurrency, currentConcurrency + 2);
187+
} else if (queuePressure < 0.2 && currentConcurrency > this.minConcurrency) {
188+
// Low pressure - decrease concurrency
189+
newConcurrency = Math.max(this.minConcurrency, currentConcurrency - 1);
190+
}
191+
}
192+
193+
// Adjust batch size based on pressure
194+
if (this.totalQueuedItems > this.memoryPressureThreshold) {
195+
this.currentBatchSize = Math.min(
196+
this.maxBatchSize,
197+
Math.floor(this.BATCH_SIZE * (1 + queuePressure))
198+
);
199+
} else {
200+
this.currentBatchSize = this.BATCH_SIZE;
201+
}
202+
203+
// Update concurrency if changed
204+
if (newConcurrency !== currentConcurrency) {
205+
// Mutate the existing limiter rather than replacing it: a fresh pLimit instance
// starts with activeCount === 0, so shutdown()'s activeCount check would miss
// flushes still running on the old limiter. p-limit's `concurrency` accessor is writable.
this.limiter.concurrency = newConcurrency;
206+
207+
logger.info("Adjusted flush concurrency", {
208+
previousConcurrency: currentConcurrency,
209+
newConcurrency,
210+
queuePressure,
211+
totalQueuedItems: this.totalQueuedItems,
212+
currentBatchSize: this.currentBatchSize,
213+
});
214+
}
215+
}
216+
217+
private startMetricsReporter(): void {
218+
// Report metrics every 30 seconds
219+
setInterval(() => {
220+
logger.info("DynamicFlushScheduler metrics", {
221+
totalQueuedItems: this.totalQueuedItems,
222+
batchQueueLength: this.batchQueue.length,
223+
currentBatchLength: this.currentBatch.length,
224+
currentConcurrency: this.limiter.concurrency,
225+
activeConcurrent: this.limiter.activeCount,
226+
pendingConcurrent: this.limiter.pendingCount,
227+
currentBatchSize: this.currentBatchSize,
228+
metrics: this.metrics,
229+
});
230+
}, 30000).unref(); // don't let the metrics interval keep the process alive after shutdown()
231+
}
232+
233+
// Method to get current status
234+
getStatus() {
235+
return {
236+
queuedItems: this.totalQueuedItems,
237+
batchQueueLength: this.batchQueue.length,
238+
currentBatchSize: this.currentBatch.length,
239+
concurrency: this.limiter.concurrency,
240+
activeFlushes: this.limiter.activeCount,
241+
pendingFlushes: this.limiter.pendingCount,
242+
metrics: { ...this.metrics },
243+
};
244+
}
245+
246+
// Graceful shutdown
247+
async shutdown(): Promise<void> {
248+
if (this.flushTimer) {
249+
clearInterval(this.flushTimer);
250+
}
251+
252+
// Flush any remaining items
253+
if (this.currentBatch.length > 0) {
254+
this.createBatch();
255+
}
256+
257+
// Wait for all pending flushes to complete
258+
while (this.batchQueue.length > 0 || this.limiter.activeCount > 0) {
259+
await new Promise((resolve) => setTimeout(resolve, 100));
68260
}
69261
}
70-
}
262+
}

apps/webapp/app/v3/eventRepository.server.ts

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,10 @@ export type EventRepoConfig = {
107107
retentionInDays: number;
108108
partitioningEnabled: boolean;
109109
tracer?: Tracer;
110+
minConcurrency?: number;
111+
maxConcurrency?: number;
112+
maxBatchSize?: number;
113+
memoryPressureThreshold?: number;
110114
};
111115

112116
export type QueryOptions = Prisma.TaskEventWhereInput;
@@ -199,6 +203,10 @@ export class EventRepository {
199203
return this._subscriberCount;
200204
}
201205

206+
get flushSchedulerStatus() {
207+
return this._flushScheduler.getStatus();
208+
}
209+
202210
constructor(
203211
db: PrismaClient = prisma,
204212
readReplica: PrismaReplicaClient = $replica,
@@ -208,6 +216,10 @@ export class EventRepository {
208216
batchSize: _config.batchSize,
209217
flushInterval: _config.batchInterval,
210218
callback: this.#flushBatch.bind(this),
219+
minConcurrency: _config.minConcurrency,
220+
maxConcurrency: _config.maxConcurrency,
221+
maxBatchSize: _config.maxBatchSize,
222+
memoryPressureThreshold: _config.memoryPressureThreshold,
211223
});
212224

213225
this._redisPublishClient = createRedisClient("trigger:eventRepoPublisher", this._config.redis);
@@ -1324,6 +1336,10 @@ function initializeEventRepo() {
13241336
batchInterval: env.EVENTS_BATCH_INTERVAL,
13251337
retentionInDays: env.EVENTS_DEFAULT_LOG_RETENTION,
13261338
partitioningEnabled: env.TASK_EVENT_PARTITIONING_ENABLED === "1",
1339+
minConcurrency: env.EVENTS_MIN_CONCURRENCY,
1340+
maxConcurrency: env.EVENTS_MAX_CONCURRENCY,
1341+
maxBatchSize: env.EVENTS_MAX_BATCH_SIZE,
1342+
memoryPressureThreshold: env.EVENTS_MEMORY_PRESSURE_THRESHOLD,
13271343
redis: {
13281344
port: env.PUBSUB_REDIS_PORT,
13291345
host: env.PUBSUB_REDIS_HOST,
@@ -1343,6 +1359,47 @@ function initializeEventRepo() {
13431359
registers: [metricsRegister],
13441360
});
13451361

1362+
// Add metrics for flush scheduler
1363+
new Gauge({
1364+
name: "event_flush_scheduler_queued_items",
1365+
help: "Total number of items queued in the flush scheduler",
1366+
collect() {
1367+
const status = repo.flushSchedulerStatus;
1368+
this.set(status.queuedItems);
1369+
},
1370+
registers: [metricsRegister],
1371+
});
1372+
1373+
new Gauge({
1374+
name: "event_flush_scheduler_batch_queue_length",
1375+
help: "Number of batches waiting to be flushed",
1376+
collect() {
1377+
const status = repo.flushSchedulerStatus;
1378+
this.set(status.batchQueueLength);
1379+
},
1380+
registers: [metricsRegister],
1381+
});
1382+
1383+
new Gauge({
1384+
name: "event_flush_scheduler_concurrency",
1385+
help: "Current concurrency level of the flush scheduler",
1386+
collect() {
1387+
const status = repo.flushSchedulerStatus;
1388+
this.set(status.concurrency);
1389+
},
1390+
registers: [metricsRegister],
1391+
});
1392+
1393+
new Gauge({
1394+
name: "event_flush_scheduler_active_flushes",
1395+
help: "Number of active flush operations",
1396+
collect() {
1397+
const status = repo.flushSchedulerStatus;
1398+
this.set(status.activeFlushes);
1399+
},
1400+
registers: [metricsRegister],
1401+
});
1402+
13461403
return repo;
13471404
}
13481405

0 commit comments

Comments
 (0)