Skip to content

Commit 5595160

Browse files
committed
implement more
1 parent bf97d4b commit 5595160

File tree

3 files changed

+75
-34
lines changed

3 files changed

+75
-34
lines changed

apps/webapp/app/v3/eventRepository/clickhouseEventRepository.server.ts

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import {
2828
generateTraceId,
2929
parseEventsField,
3030
convertDateToNanoseconds,
31+
createExceptionPropertiesFromError,
3132
} from "./common.server";
3233
import { serializeTraceparent } from "@trigger.dev/core/v3/isomorphic";
3334
import {
@@ -46,6 +47,8 @@ import {
4647
import { unflattenAttributes } from "@trigger.dev/core/v3/utils/flattenAttributes";
4748
import { TaskEventLevel } from "@trigger.dev/database";
4849
import { originalRunIdCache } from "./originalRunIdCache.server";
50+
import { SemanticInternalAttributes } from "@trigger.dev/core/v3/semanticInternalAttributes";
51+
import { createJsonErrorObject } from "@trigger.dev/core/v3/errors";
4952

5053
export type ClickhouseEventRepositoryConfig = {
5154
clickhouse: ClickHouse;
@@ -500,9 +503,46 @@ export class ClickhouseEventRepository implements IEventRepository {
500503
expires_at: convertDateToClickhouseDateTime(new Date(Date.now() + 30 * 24 * 60 * 60 * 1000)),
501504
};
502505

503-
// TODO: Handle failedWithError by creating a SPAN_EVENT
506+
const originalRunId =
507+
options.attributes.properties?.[SemanticInternalAttributes.ORIGINAL_RUN_ID];
504508

505-
this._flushScheduler.addToBatch([event]);
509+
if (typeof originalRunId === "string") {
510+
await originalRunIdCache.set(traceId, spanId, originalRunId);
511+
}
512+
513+
const events = [event];
514+
515+
if (failedWithError) {
516+
const error = createJsonErrorObject(failedWithError);
517+
518+
events.push({
519+
environment_id: options.environment.id,
520+
organization_id: options.environment.organizationId,
521+
project_id: options.environment.projectId,
522+
task_identifier: options.taskSlug,
523+
run_id: options.attributes.runId,
524+
start_time: startTime.toString(),
525+
duration: String(options.incomplete ? 0 : duration),
526+
trace_id: traceId,
527+
span_id: spanId,
528+
parent_span_id: parentId ?? "",
529+
message: "exception",
530+
kind: "SPAN_EVENT",
531+
status: "ERROR",
532+
attributes: {
533+
error,
534+
},
535+
metadata: JSON.stringify({
536+
exception: createExceptionPropertiesFromError(failedWithError),
537+
}),
538+
// TODO: make sure configurable and by org
539+
expires_at: convertDateToClickhouseDateTime(
540+
new Date(Date.now() + 30 * 24 * 60 * 60 * 1000)
541+
),
542+
});
543+
}
544+
545+
this._flushScheduler.addToBatch(events);
506546

507547
return result;
508548
}

apps/webapp/app/v3/eventRepository/common.server.ts

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { Attributes } from "@opentelemetry/api";
22
import { RandomIdGenerator } from "@opentelemetry/sdk-trace-base";
33
import { parseTraceparent } from "@trigger.dev/core/v3/isomorphic";
4-
import { SpanEvents } from "@trigger.dev/core/v3/schemas";
4+
import { ExceptionEventProperties, SpanEvents, TaskRunError } from "@trigger.dev/core/v3/schemas";
55
import { unflattenAttributes } from "@trigger.dev/core/v3/utils/flattenAttributes";
66
import { createHash } from "node:crypto";
77

@@ -92,3 +92,34 @@ export function parseEventsField(events: unknown): SpanEvents {
9292

9393
return unsafe as SpanEvents;
9494
}
95+
96+
export function createExceptionPropertiesFromError(error: TaskRunError): ExceptionEventProperties {
97+
switch (error.type) {
98+
case "BUILT_IN_ERROR": {
99+
return {
100+
type: error.name,
101+
message: error.message,
102+
stacktrace: error.stackTrace,
103+
};
104+
}
105+
case "CUSTOM_ERROR": {
106+
return {
107+
type: "Error",
108+
message: error.raw,
109+
};
110+
}
111+
case "INTERNAL_ERROR": {
112+
return {
113+
type: "Internal error",
114+
message: [error.code, error.message].filter(Boolean).join(": "),
115+
stacktrace: error.stackTrace,
116+
};
117+
}
118+
case "STRING_ERROR": {
119+
return {
120+
type: "Error",
121+
message: error.raw,
122+
};
123+
}
124+
}
125+
}

apps/webapp/app/v3/eventRepository/eventRepository.server.ts

Lines changed: 1 addition & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import { startSpan } from "../tracing.server";
3535
import {
3636
calculateDurationFromStart,
3737
convertDateToNanoseconds,
38+
createExceptionPropertiesFromError,
3839
extractContextFromCarrier,
3940
generateDeterministicSpanId,
4041
generateSpanId,
@@ -1590,37 +1591,6 @@ function initializeEventRepo() {
15901591
return repo;
15911592
}
15921593

1593-
export function createExceptionPropertiesFromError(error: TaskRunError): ExceptionEventProperties {
1594-
switch (error.type) {
1595-
case "BUILT_IN_ERROR": {
1596-
return {
1597-
type: error.name,
1598-
message: error.message,
1599-
stacktrace: error.stackTrace,
1600-
};
1601-
}
1602-
case "CUSTOM_ERROR": {
1603-
return {
1604-
type: "Error",
1605-
message: error.raw,
1606-
};
1607-
}
1608-
case "INTERNAL_ERROR": {
1609-
return {
1610-
type: "Internal error",
1611-
message: [error.code, error.message].filter(Boolean).join(": "),
1612-
stacktrace: error.stackTrace,
1613-
};
1614-
}
1615-
case "STRING_ERROR": {
1616-
return {
1617-
type: "Error",
1618-
message: error.raw,
1619-
};
1620-
}
1621-
}
1622-
}
1623-
16241594
/**
16251595
* Filters out partial events from a batch of creatable events, excluding those that have a corresponding full event.
16261596
* @param batch - The batch of creatable events to filter.

0 commit comments

Comments
 (0)