diff --git a/apps/webapp/app/db.server.ts b/apps/webapp/app/db.server.ts index c99b2e2c43..8435182e63 100644 --- a/apps/webapp/app/db.server.ts +++ b/apps/webapp/app/db.server.ts @@ -201,6 +201,7 @@ function getClient() { message: log.message, target: log.target, }, + ignoreError: true, }); }); } diff --git a/apps/webapp/app/models/runtimeEnvironment.server.ts b/apps/webapp/app/models/runtimeEnvironment.server.ts index adde2db5ca..67119acd08 100644 --- a/apps/webapp/app/models/runtimeEnvironment.server.ts +++ b/apps/webapp/app/models/runtimeEnvironment.server.ts @@ -37,7 +37,7 @@ export async function findEnvironmentByApiKey( if (environment.type === "PREVIEW") { if (!branchName) { - logger.error("findEnvironmentByApiKey(): Preview env with no branch name provided", { + logger.warn("findEnvironmentByApiKey(): Preview env with no branch name provided", { environmentId: environment.id, }); return null; diff --git a/apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.continue.ts b/apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.continue.ts index b35f26a10e..5a436b6575 100644 --- a/apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.continue.ts +++ b/apps/webapp/app/routes/engine.v1.worker-actions.runs.$runFriendlyId.snapshots.$snapshotFriendlyId.continue.ts @@ -27,8 +27,12 @@ export const loader = createLoaderWorkerApiRoute( return json(continuationResult); } catch (error) { - logger.error("Failed to suspend run", { runFriendlyId, snapshotFriendlyId, error }); - throw error; + logger.warn("Failed to suspend run", { runFriendlyId, snapshotFriendlyId, error }); + if (error instanceof Error) { + throw json({ error: error.message }, { status: 422 }); + } + + throw json({ error: "Failed to continue run execution" }, { status: 422 }); } } ); diff --git a/apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts b/apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts index 776b6179fa..0f2c3d011a 100644 --- a/apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts +++ b/apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts @@ -184,6 +184,15 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder { return new Response(null, { status: 200 }); } catch (error) { + if (error instanceof Error) { + if ("code" in error && error.code === "ECONNRESET") { + logger.info("[RealtimeStreams][ingestData] Connection reset during ingestData:", { + error, + }); + return new Response(null, { status: 500 }); + } + } + logger.error("[RealtimeStreams][ingestData] Error in ingestData:", { error }); return new Response(null, { status: 500 }); diff --git a/apps/webapp/app/v3/alertsWorker.server.ts b/apps/webapp/app/v3/alertsWorker.server.ts index 3e1917ead1..46670887a7 100644 --- a/apps/webapp/app/v3/alertsWorker.server.ts +++ b/apps/webapp/app/v3/alertsWorker.server.ts @@ -33,6 +33,7 @@ function initializeWorker() { retry: { maxAttempts: 3, }, + logErrors: false, }, "v3.performDeploymentAlerts": { schema: z.object({ @@ -42,6 +43,7 @@ function initializeWorker() { retry: { maxAttempts: 3, }, + logErrors: false, }, "v3.deliverAlert": { schema: z.object({ @@ -51,6 +53,7 @@ function initializeWorker() { retry: { maxAttempts: 3, }, + logErrors: false, }, }, concurrency: { diff --git a/apps/webapp/app/v3/eventRepository.server.ts b/apps/webapp/app/v3/eventRepository.server.ts index 687fbe9e76..1e4350b4b3 100644 --- a/apps/webapp/app/v3/eventRepository.server.ts +++ b/apps/webapp/app/v3/eventRepository.server.ts @@ -1246,7 +1246,7 @@ export class EventRepository { span.setAttribute("prisma_error_code", errorDetails.code); } - logger.error("Failed to insert events, will attempt bisection", { + logger.info("Failed to insert events, will attempt bisection", { error: errorDetails, }); diff --git a/apps/webapp/app/v3/machinePresets.server.ts b/apps/webapp/app/v3/machinePresets.server.ts index 024cb9f114..ee01aace82 100644 --- a/apps/webapp/app/v3/machinePresets.server.ts +++ b/apps/webapp/app/v3/machinePresets.server.ts @@ -6,7 +6,7 @@ export function machinePresetFromConfig(config: unknown): MachinePreset { const parsedConfig = MachineConfig.safeParse(config); if (!parsedConfig.success) { - logger.error("Failed to parse machine config", { config }); + logger.info("Failed to parse machine config", { config }); return machinePresetFromName("small-1x"); } diff --git a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts index 075732544c..a075a17847 100644 --- a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts +++ b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts @@ -621,7 +621,8 @@ export class SharedQueueConsumer { const worker = deployment?.worker; if (!deployment || !worker) { - logger.error("No matching deployment found for task run", { + // This happens when a run is "WAITING_FOR_DEPLOY" and is expected + logger.info("No matching deployment found for task run", { queueMessage: message.data, messageId: message.messageId, }); diff --git a/apps/webapp/app/v3/runEngineHandlers.server.ts b/apps/webapp/app/v3/runEngineHandlers.server.ts index fffee59368..f40f4b0176 100644 --- a/apps/webapp/app/v3/runEngineHandlers.server.ts +++ b/apps/webapp/app/v3/runEngineHandlers.server.ts @@ -17,6 +17,7 @@ import { updateMetadataService } from "~/services/metadata/updateMetadataInstanc import { findEnvironmentFromRun } from "~/models/runtimeEnvironment.server"; import { env } from "~/env.server"; import { getTaskEventStoreTableForRun } from "./taskEventStore.server"; +import { MetadataTooLargeError } from "~/utils/packets"; export function registerRunEngineEventBusHandlers() { engine.eventBus.on("runSucceeded", async ({ time, run }) => { @@ -381,17 +382,31 @@ export function registerRunEngineEventBusHandlers() { try { await updateMetadataService.call(run.id, run.metadata, env); } catch (e) { - logger.error("[runMetadataUpdated] Failed to update metadata", { - taskRun: run.id, - error: - e instanceof Error - ? { - name: e.name, - message: e.message, - stack: e.stack, - } - : e, - }); + if (e instanceof MetadataTooLargeError) { + logger.warn("[runMetadataUpdated] Failed to update metadata, too large", { + taskRun: run.id, + error: + e instanceof Error + ? { + name: e.name, + message: e.message, + stack: e.stack, + } + : e, + }); + } else { + logger.error("[runMetadataUpdated] Failed to update metadata", { + taskRun: run.id, + error: + e instanceof Error + ? { + name: e.name, + message: e.message, + stack: e.stack, + } + : e, + }); + } } }); diff --git a/apps/webapp/app/v3/services/alerts/deliverAlert.server.ts b/apps/webapp/app/v3/services/alerts/deliverAlert.server.ts index 1d8e644876..a27d738094 100644 --- a/apps/webapp/app/v3/services/alerts/deliverAlert.server.ts +++ b/apps/webapp/app/v3/services/alerts/deliverAlert.server.ts @@ -926,7 +926,7 @@ export class DeliverAlertService extends BaseService { }); if (!response.ok) { - logger.error("[DeliverAlert] Failed to send alert webhook", { + logger.info("[DeliverAlert] Failed to send alert webhook", { status: response.status, statusText: response.statusText, url: webhook.url, diff --git a/apps/webapp/app/v3/services/cancelTaskRunV1.server.ts b/apps/webapp/app/v3/services/cancelTaskRunV1.server.ts index fa30d7fc7b..8c208265de 100644 --- a/apps/webapp/app/v3/services/cancelTaskRunV1.server.ts +++ b/apps/webapp/app/v3/services/cancelTaskRunV1.server.ts @@ -43,7 +43,7 @@ export class CancelTaskRunServiceV1 extends BaseService { // Make sure the task run is in a cancellable state if (!isCancellableRunStatus(taskRun.status)) { - logger.error("Task run is not in a cancellable state", { + logger.info("Task run is not in a cancellable state", { runId: taskRun.id, status: taskRun.status, }); diff --git a/apps/webapp/app/v3/services/createCheckpoint.server.ts b/apps/webapp/app/v3/services/createCheckpoint.server.ts index ec87d5bacd..761f3a5185 100644 --- a/apps/webapp/app/v3/services/createCheckpoint.server.ts +++ b/apps/webapp/app/v3/services/createCheckpoint.server.ts @@ -132,7 +132,7 @@ export class CreateCheckpointService extends BaseService { } if (childRun.dependency?.resumedAt) { - logger.error("CreateCheckpointService: Child run already resumed", { + logger.info("CreateCheckpointService: Child run already resumed", { childRun, params, }); @@ -168,7 +168,7 @@ export class CreateCheckpointService extends BaseService { } if (batchRun.resumedAt) { - logger.error("CreateCheckpointService: Batch already resumed", { + logger.info("CreateCheckpointService: Batch already resumed", { batchRun, params, }); diff --git a/internal-packages/run-engine/src/engine/machinePresets.ts b/internal-packages/run-engine/src/engine/machinePresets.ts index 4c526942a7..a2edf5b1b1 100644 --- a/internal-packages/run-engine/src/engine/machinePresets.ts +++ b/internal-packages/run-engine/src/engine/machinePresets.ts @@ -26,7 +26,7 @@ export function getMachinePreset({ const parsedConfig = MachineConfig.safeParse(config); if (!parsedConfig.success) { - logger.error("Failed to parse machine config", { config }); + logger.info("Failed to parse machine config", { config }); return machinePresetFromName(machines, "small-1x"); } diff --git a/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts b/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts index 84ef841775..3935802a5c 100644 --- a/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/dequeueSystem.ts @@ -135,11 +135,21 @@ export class DequeueSystem { if (isExecuting(snapshot.executionStatus)) { this.$.logger.error( - `RunEngine.dequeueFromWorkerQueue(): Run is not in a valid state to be dequeued: ${runId}\n ${snapshot.id}:${snapshot.executionStatus}` + `RunEngine.dequeueFromWorkerQueue(): Run is not in a valid state to be dequeued`, + { + runId, + snapshotId: snapshot.id, + executionStatus: snapshot.executionStatus, + } ); } else { this.$.logger.warn( - `RunEngine.dequeueFromWorkerQueue(): Run is in an expected not valid state to be dequeued: ${runId}\n ${snapshot.id}:${snapshot.executionStatus}` + `RunEngine.dequeueFromWorkerQueue(): Run is in an expected not valid state to be dequeued`, + { + runId, + snapshotId: snapshot.id, + executionStatus: snapshot.executionStatus, + } ); } diff --git a/packages/core/src/logger.ts b/packages/core/src/logger.ts index 1e7a811bcb..3d5620a9d2 100644 --- a/packages/core/src/logger.ts +++ b/packages/core/src/logger.ts @@ -71,7 +71,9 @@ export class Logger { this.#structuredLog(console.error, message, "error", ...args); - if (Logger.onError) { + const ignoreError = args.some((arg) => arg?.ignoreError); + + if (Logger.onError && !ignoreError) { Logger.onError(message, ...args); } } diff --git a/packages/redis-worker/src/worker.ts b/packages/redis-worker/src/worker.ts index a5e77d3a35..92880d1f07 100644 --- a/packages/redis-worker/src/worker.ts +++ b/packages/redis-worker/src/worker.ts @@ -36,6 +36,8 @@ export type WorkerCatalog = { retry?: RetryOptions; cron?: string; jitterInMs?: number; + /** Defaults to true. If false, errors will not be logged. */ + logErrors?: boolean; }; }; @@ -541,12 +543,12 @@ class Worker { const catalogItem = this.options.catalog[job as any]; const handler = this.jobs[job as any]; if (!handler) { - this.logger.error(`No handler found for job type: ${job}`); + this.logger.error(`Worker no handler found for job type: ${job}`); return; } if (!catalogItem) { - this.logger.error(`No catalog item found for job type: ${job}`); + this.logger.error(`Worker no catalog item found for job type: ${job}`); return; } @@ -590,7 +592,10 @@ class Worker { } ).catch(async (error) => { const errorMessage = error instanceof Error ? error.message : String(error); - this.logger.error(`Error processing item:`, { + + const shouldLogError = catalogItem.logErrors ?? true; + + const logAttributes = { name: this.options.name, id, job, @@ -598,7 +603,14 @@ class Worker { visibilityTimeoutMs, error, errorMessage, - }); + }; + + if (shouldLogError) { + this.logger.error(`Worker error processing item`, logAttributes); + } else { + this.logger.info(`Worker failed to process item`, logAttributes); + } + // Attempt requeue logic. try { const newAttempt = attempt + 1; @@ -609,15 +621,17 @@ class Worker { const retryDelay = calculateNextRetryDelay(retrySettings, newAttempt); if (!retryDelay) { - this.logger.error(`Item ${id} reached max attempts. Moving to DLQ.`, { - name: this.options.name, - id, - job, - item, - visibilityTimeoutMs, - attempt: newAttempt, - errorMessage, - }); + if (shouldLogError) { + this.logger.error(`Worker item reached max attempts. Moving to DLQ.`, { + ...logAttributes, + attempt: newAttempt, + }); + } else { + this.logger.info(`Worker item reached max attempts. Moving to DLQ.`, { + ...logAttributes, + attempt: newAttempt, + }); + } await this.queue.moveToDeadLetterQueue(id, errorMessage); @@ -629,7 +643,7 @@ class Worker { } const retryDate = new Date(Date.now() + retryDelay); - this.logger.info(`Requeuing failed item ${id} with delay`, { + this.logger.info(`Worker requeuing failed item with delay`, { name: this.options.name, id, job, @@ -649,7 +663,7 @@ class Worker { }); } catch (requeueError) { this.logger.error( - `Failed to requeue item ${id}. It will be retried after the visibility timeout.`, + `Worker failed to requeue item. It will be retried after the visibility timeout.`, { name: this.options.name, id,