Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions apps/webapp/app/env.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ const EnvironmentSchema = z.object({
SMTP_PORT: z.coerce.number().optional(),
SMTP_SECURE: z.coerce.boolean().optional(),
SMTP_USER: z.string().optional(),
SMTP_PASSWORD: z.string().optional(),
SMTP_PASSWORD: z.string().optional(),

PLAIN_API_KEY: z.string().optional(),
RUNTIME_PLATFORM: z.enum(["docker-compose", "ecs", "local"]).default("local"),
Expand Down Expand Up @@ -210,8 +210,7 @@ const EnvironmentSchema = z.object({
ALERT_SMTP_PORT: z.coerce.number().optional(),
ALERT_SMTP_SECURE: z.coerce.boolean().optional(),
ALERT_SMTP_USER: z.string().optional(),
ALERT_SMTP_PASSWORD: z.string().optional(),

ALERT_SMTP_PASSWORD: z.string().optional(),

MAX_SEQUENTIAL_INDEX_FAILURE_COUNT: z.coerce.number().default(96),

Expand Down Expand Up @@ -261,6 +260,8 @@ const EnvironmentSchema = z.object({

REALTIME_STREAM_VERSION: z.enum(["v1", "v2"]).default("v1"),
BATCH_METADATA_OPERATIONS_FLUSH_INTERVAL_MS: z.coerce.number().int().default(1000),
BATCH_METADATA_OPERATIONS_FLUSH_ENABLED: z.string().default("1"),
BATCH_METADATA_OPERATIONS_FLUSH_LOGGING_ENABLED: z.string().default("1"),
});

export type Environment = z.infer<typeof EnvironmentSchema>;
Expand Down
38 changes: 29 additions & 9 deletions apps/webapp/app/services/metadata/updateMetadata.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ export class UpdateMetadataService extends BaseService {

constructor(
protected readonly _prisma: PrismaClientOrTransaction = prisma,
private readonly flushIntervalMs: number = 5000
private readonly flushIntervalMs: number = 5000,
private readonly flushEnabled: boolean = true,
private readonly flushLoggingEnabled: boolean = true
) {
super();

Expand All @@ -39,6 +41,14 @@ export class UpdateMetadataService extends BaseService {

// Start a loop that periodically flushes buffered operations
private _startFlushing() {
if (!this.flushEnabled) {
logger.info("[UpdateMetadataService] 🚽 Flushing disabled");

return;
}

logger.info("[UpdateMetadataService] 🚽 Flushing started");

// Create a program that sleeps, then processes buffered ops
const program = Effect.gen(this, function* (_) {
while (true) {
Expand All @@ -50,9 +60,11 @@ export class UpdateMetadataService extends BaseService {
this._bufferedOperations.clear();

yield* Effect.sync(() => {
logger.debug(`[UpdateMetadataService] Flushing operations`, {
operations: Object.fromEntries(currentOperations),
});
if (this.flushLoggingEnabled) {
logger.debug(`[UpdateMetadataService] Flushing operations`, {
operations: Object.fromEntries(currentOperations),
});
}
});

// If we have operations, process them
Expand Down Expand Up @@ -87,10 +99,12 @@ export class UpdateMetadataService extends BaseService {
}

yield* Effect.sync(() => {
logger.debug(`[UpdateMetadataService] Processing operations for run`, {
runId,
operationsCount: processedOps.length,
});
if (this.flushLoggingEnabled) {
logger.debug(`[UpdateMetadataService] Processing operations for run`, {
runId,
operationsCount: processedOps.length,
});
}
});

// Update run with retry
Expand Down Expand Up @@ -410,5 +424,11 @@ export class UpdateMetadataService extends BaseService {

export const updateMetadataService = singleton(
"update-metadata-service",
() => new UpdateMetadataService(prisma, env.BATCH_METADATA_OPERATIONS_FLUSH_INTERVAL_MS)
() =>
new UpdateMetadataService(
prisma,
env.BATCH_METADATA_OPERATIONS_FLUSH_INTERVAL_MS,
env.BATCH_METADATA_OPERATIONS_FLUSH_ENABLED === "1",
env.BATCH_METADATA_OPERATIONS_FLUSH_LOGGING_ENABLED === "1"
)
);
Loading