Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/webapp/app/db.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ function getClient() {
message: log.message,
target: log.target,
},
ignoreError: true,
});
});
}
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/models/runtimeEnvironment.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export async function findEnvironmentByApiKey(

if (environment.type === "PREVIEW") {
if (!branchName) {
logger.error("findEnvironmentByApiKey(): Preview env with no branch name provided", {
logger.warn("findEnvironmentByApiKey(): Preview env with no branch name provided", {
environmentId: environment.id,
});
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,12 @@ export const loader = createLoaderWorkerApiRoute(

return json(continuationResult);
} catch (error) {
logger.error("Failed to suspend run", { runFriendlyId, snapshotFriendlyId, error });
throw error;
logger.warn("Failed to suspend run", { runFriendlyId, snapshotFriendlyId, error });
if (error instanceof Error) {
throw json({ error: error.message }, { status: 422 });
}

throw json({ error: "Failed to continue run execution" }, { status: 422 });
}
}
);
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,15 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder {

return new Response(null, { status: 200 });
} catch (error) {
if (error instanceof Error) {
if ("code" in error && error.code === "ECONNRESET") {
logger.info("[RealtimeStreams][ingestData] Connection reset during ingestData:", {
error,
});
return new Response(null, { status: 500 });
}
}

logger.error("[RealtimeStreams][ingestData] Error in ingestData:", { error });

return new Response(null, { status: 500 });
Expand Down
3 changes: 3 additions & 0 deletions apps/webapp/app/v3/alertsWorker.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ function initializeWorker() {
retry: {
maxAttempts: 3,
},
logErrors: false,
},
"v3.performDeploymentAlerts": {
schema: z.object({
Expand All @@ -42,6 +43,7 @@ function initializeWorker() {
retry: {
maxAttempts: 3,
},
logErrors: false,
},
"v3.deliverAlert": {
schema: z.object({
Expand All @@ -51,6 +53,7 @@ function initializeWorker() {
retry: {
maxAttempts: 3,
},
logErrors: false,
},
},
concurrency: {
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/v3/eventRepository.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1246,7 +1246,7 @@ export class EventRepository {
span.setAttribute("prisma_error_code", errorDetails.code);
}

logger.error("Failed to insert events, will attempt bisection", {
logger.info("Failed to insert events, will attempt bisection", {
error: errorDetails,
});

Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/v3/machinePresets.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ export function machinePresetFromConfig(config: unknown): MachinePreset {
const parsedConfig = MachineConfig.safeParse(config);

if (!parsedConfig.success) {
logger.error("Failed to parse machine config", { config });
logger.info("Failed to parse machine config", { config });

return machinePresetFromName("small-1x");
}
Expand Down
3 changes: 2 additions & 1 deletion apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,8 @@ export class SharedQueueConsumer {
const worker = deployment?.worker;

if (!deployment || !worker) {
logger.error("No matching deployment found for task run", {
// This happens when a run is "WAITING_FOR_DEPLOY" and is expected
logger.info("No matching deployment found for task run", {
queueMessage: message.data,
messageId: message.messageId,
});
Expand Down
37 changes: 26 additions & 11 deletions apps/webapp/app/v3/runEngineHandlers.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { updateMetadataService } from "~/services/metadata/updateMetadataInstanc
import { findEnvironmentFromRun } from "~/models/runtimeEnvironment.server";
import { env } from "~/env.server";
import { getTaskEventStoreTableForRun } from "./taskEventStore.server";
import { MetadataTooLargeError } from "~/utils/packets";

export function registerRunEngineEventBusHandlers() {
engine.eventBus.on("runSucceeded", async ({ time, run }) => {
Expand Down Expand Up @@ -381,17 +382,31 @@ export function registerRunEngineEventBusHandlers() {
try {
await updateMetadataService.call(run.id, run.metadata, env);
} catch (e) {
logger.error("[runMetadataUpdated] Failed to update metadata", {
taskRun: run.id,
error:
e instanceof Error
? {
name: e.name,
message: e.message,
stack: e.stack,
}
: e,
});
if (e instanceof MetadataTooLargeError) {
logger.warn("[runMetadataUpdated] Failed to update metadata, too large", {
taskRun: run.id,
error:
e instanceof Error
? {
name: e.name,
message: e.message,
stack: e.stack,
}
: e,
});
} else {
logger.error("[runMetadataUpdated] Failed to update metadata", {
taskRun: run.id,
error:
e instanceof Error
? {
name: e.name,
message: e.message,
stack: e.stack,
}
: e,
});
}
}
});

Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/v3/services/alerts/deliverAlert.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -926,7 +926,7 @@ export class DeliverAlertService extends BaseService {
});

if (!response.ok) {
logger.error("[DeliverAlert] Failed to send alert webhook", {
logger.info("[DeliverAlert] Failed to send alert webhook", {
status: response.status,
statusText: response.statusText,
url: webhook.url,
Expand Down
2 changes: 1 addition & 1 deletion apps/webapp/app/v3/services/cancelTaskRunV1.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export class CancelTaskRunServiceV1 extends BaseService {

// Make sure the task run is in a cancellable state
if (!isCancellableRunStatus(taskRun.status)) {
logger.error("Task run is not in a cancellable state", {
logger.info("Task run is not in a cancellable state", {
runId: taskRun.id,
status: taskRun.status,
});
Expand Down
4 changes: 2 additions & 2 deletions apps/webapp/app/v3/services/createCheckpoint.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ export class CreateCheckpointService extends BaseService {
}

if (childRun.dependency?.resumedAt) {
logger.error("CreateCheckpointService: Child run already resumed", {
logger.info("CreateCheckpointService: Child run already resumed", {
childRun,
params,
});
Expand Down Expand Up @@ -168,7 +168,7 @@ export class CreateCheckpointService extends BaseService {
}

if (batchRun.resumedAt) {
logger.error("CreateCheckpointService: Batch already resumed", {
logger.info("CreateCheckpointService: Batch already resumed", {
batchRun,
params,
});
Expand Down
2 changes: 1 addition & 1 deletion internal-packages/run-engine/src/engine/machinePresets.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export function getMachinePreset({
const parsedConfig = MachineConfig.safeParse(config);

if (!parsedConfig.success) {
logger.error("Failed to parse machine config", { config });
logger.info("Failed to parse machine config", { config });

return machinePresetFromName(machines, "small-1x");
}
Expand Down
14 changes: 12 additions & 2 deletions internal-packages/run-engine/src/engine/systems/dequeueSystem.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,21 @@ export class DequeueSystem {

if (isExecuting(snapshot.executionStatus)) {
this.$.logger.error(
`RunEngine.dequeueFromWorkerQueue(): Run is not in a valid state to be dequeued: ${runId}\n ${snapshot.id}:${snapshot.executionStatus}`
`RunEngine.dequeueFromWorkerQueue(): Run is not in a valid state to be dequeued`,
{
runId,
snapshotId: snapshot.id,
executionStatus: snapshot.executionStatus,
}
);
} else {
this.$.logger.warn(
`RunEngine.dequeueFromWorkerQueue(): Run is in an expected not valid state to be dequeued: ${runId}\n ${snapshot.id}:${snapshot.executionStatus}`
`RunEngine.dequeueFromWorkerQueue(): Run is in an expected not valid state to be dequeued`,
{
runId,
snapshotId: snapshot.id,
executionStatus: snapshot.executionStatus,
}
);
}

Expand Down
4 changes: 3 additions & 1 deletion packages/core/src/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ export class Logger {

this.#structuredLog(console.error, message, "error", ...args);

if (Logger.onError) {
const ignoreError = args.some((arg) => arg?.ignoreError);

if (Logger.onError && !ignoreError) {
Logger.onError(message, ...args);
}
}
Expand Down
44 changes: 29 additions & 15 deletions packages/redis-worker/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ export type WorkerCatalog = {
retry?: RetryOptions;
cron?: string;
jitterInMs?: number;
/** Defaults to true. If false, errors will not be logged. */
logErrors?: boolean;
};
};

Expand Down Expand Up @@ -541,12 +543,12 @@ class Worker<TCatalog extends WorkerCatalog> {
const catalogItem = this.options.catalog[job as any];
const handler = this.jobs[job as any];
if (!handler) {
this.logger.error(`No handler found for job type: ${job}`);
this.logger.error(`Worker no handler found for job type: ${job}`);
return;
}

if (!catalogItem) {
this.logger.error(`No catalog item found for job type: ${job}`);
this.logger.error(`Worker no catalog item found for job type: ${job}`);
return;
}

Expand Down Expand Up @@ -590,15 +592,25 @@ class Worker<TCatalog extends WorkerCatalog> {
}
).catch(async (error) => {
const errorMessage = error instanceof Error ? error.message : String(error);
this.logger.error(`Error processing item:`, {

const shouldLogError = catalogItem.logErrors ?? true;

const logAttributes = {
name: this.options.name,
id,
job,
item,
visibilityTimeoutMs,
error,
errorMessage,
});
};

if (shouldLogError) {
this.logger.error(`Worker error processing item`, logAttributes);
} else {
this.logger.info(`Worker failed to process item`, logAttributes);
}

// Attempt requeue logic.
try {
const newAttempt = attempt + 1;
Expand All @@ -609,15 +621,17 @@ class Worker<TCatalog extends WorkerCatalog> {
const retryDelay = calculateNextRetryDelay(retrySettings, newAttempt);

if (!retryDelay) {
this.logger.error(`Item ${id} reached max attempts. Moving to DLQ.`, {
name: this.options.name,
id,
job,
item,
visibilityTimeoutMs,
attempt: newAttempt,
errorMessage,
});
if (shouldLogError) {
this.logger.error(`Worker item reached max attempts. Moving to DLQ.`, {
...logAttributes,
attempt: newAttempt,
});
} else {
this.logger.info(`Worker item reached max attempts. Moving to DLQ.`, {
...logAttributes,
attempt: newAttempt,
});
}

await this.queue.moveToDeadLetterQueue(id, errorMessage);

Expand All @@ -629,7 +643,7 @@ class Worker<TCatalog extends WorkerCatalog> {
}

const retryDate = new Date(Date.now() + retryDelay);
this.logger.info(`Requeuing failed item ${id} with delay`, {
this.logger.info(`Worker requeuing failed item with delay`, {
name: this.options.name,
id,
job,
Expand All @@ -649,7 +663,7 @@ class Worker<TCatalog extends WorkerCatalog> {
});
} catch (requeueError) {
this.logger.error(
`Failed to requeue item ${id}. It will be retried after the visibility timeout.`,
`Worker failed to requeue item. It will be retried after the visibility timeout.`,
{
name: this.options.name,
id,
Expand Down
Loading