Skip to content

Commit b89e5ab

Browse files
committed
Cancel run events which then propogate cancellation status to span ancestors
1 parent 7d17730 commit b89e5ab

File tree

7 files changed

+134
-109
lines changed

7 files changed

+134
-109
lines changed

apps/webapp/app/presenters/v3/SpanPresenter.server.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,7 @@ export class SpanPresenter extends BasePresenter {
251251
engine: run.engine,
252252
region,
253253
workerQueue: run.workerQueue,
254+
traceId: run.traceId,
254255
spanId: run.spanId,
255256
isCached: !!span.originalRun,
256257
machinePreset: machine?.name,

apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -772,6 +772,14 @@ function RunBody({
772772
<Property.Label>Worker queue</Property.Label>
773773
<Property.Value>{run.workerQueue}</Property.Value>
774774
</Property.Item>
775+
<Property.Item>
776+
<Property.Label>Trace ID</Property.Label>
777+
<Property.Value>{run.traceId}</Property.Value>
778+
</Property.Item>
779+
<Property.Item>
780+
<Property.Label>Span ID</Property.Label>
781+
<Property.Value>{run.spanId}</Property.Value>
782+
</Property.Item>
775783
</div>
776784
)}
777785
</Property.Table>

apps/webapp/app/v3/eventRepository.server.ts

Lines changed: 73 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,14 @@ import {
2121
unflattenAttributes,
2222
} from "@trigger.dev/core/v3";
2323
import { parseTraceparent, serializeTraceparent } from "@trigger.dev/core/v3/isomorphic";
24-
import { Prisma, TaskEvent, TaskEventKind, TaskEventStatus } from "@trigger.dev/database";
24+
import {
25+
Prisma,
26+
RuntimeEnvironmentType,
27+
TaskEvent,
28+
TaskEventKind,
29+
TaskEventStatus,
30+
TaskRun,
31+
} from "@trigger.dev/database";
2532
import { nanoid } from "nanoid";
2633
import { createHash } from "node:crypto";
2734
import { EventEmitter } from "node:stream";
@@ -350,18 +357,44 @@ export class EventRepository {
350357
return completedEvent;
351358
}
352359

353-
async cancelEvent(event: TaskEventRecord, cancelledAt: Date, reason: string) {
354-
if (!event.isPartial) {
355-
return;
356-
}
360+
async cancelRunEvent({
361+
reason,
362+
run,
363+
cancelledAt,
364+
projectRef,
365+
environmentType,
366+
organizationId,
367+
}: {
368+
reason: string;
369+
run: TaskRun;
370+
cancelledAt: Date;
371+
projectRef: string;
372+
organizationId: string;
373+
environmentType: RuntimeEnvironmentType;
374+
}) {
375+
const startTime = convertDateToNanoseconds(run.createdAt);
357376

358377
await this.insertImmediate({
359-
...omit(event, "id"),
378+
message: run.taskIdentifier,
379+
serviceName: "api server",
380+
serviceNamespace: "trigger.dev",
381+
level: "TRACE",
382+
kind: "SERVER",
383+
traceId: run.traceId,
384+
spanId: run.spanId,
385+
parentId: run.parentSpanId,
386+
runId: run.id,
387+
taskSlug: run.taskIdentifier,
388+
projectRef,
389+
projectId: run.projectId,
390+
environmentId: run.runtimeEnvironmentId,
391+
environmentType,
392+
organizationId,
360393
isPartial: false,
361-
isError: false,
394+
isError: true,
362395
isCancelled: true,
363396
status: "ERROR",
364-
links: event.links ?? [],
397+
runIsTest: run.isTest,
365398
events: [
366399
{
367400
name: "cancellation",
@@ -370,56 +403,18 @@ export class EventRepository {
370403
reason,
371404
},
372405
},
373-
...((event.events as any[]) ?? []),
374406
],
375-
duration: calculateDurationFromStart(event.startTime, cancelledAt),
376-
properties: event.properties as Attributes,
377-
metadata: event.metadata as Attributes,
378-
style: event.style as Attributes,
379-
output: event.output as Attributes,
380-
outputType: event.outputType,
381-
payload: event.payload as Attributes,
382-
payloadType: event.payloadType,
407+
startTime,
408+
properties: {},
409+
metadata: undefined,
410+
style: undefined,
411+
duration: calculateDurationFromStart(startTime, cancelledAt),
412+
output: undefined,
413+
payload: undefined,
414+
payloadType: undefined,
383415
});
384416
}
385417

386-
async cancelEvents(events: TaskEventRecord[], cancelledAt: Date, reason: string) {
387-
const eventsToCancel = events.filter((event) => event.isPartial);
388-
389-
if (eventsToCancel.length === 0) {
390-
return;
391-
}
392-
393-
await this.insertMany(
394-
eventsToCancel.map((event) => ({
395-
...omit(event, "id"),
396-
isPartial: false,
397-
isError: false,
398-
isCancelled: true,
399-
status: "ERROR",
400-
links: event.links ?? [],
401-
events: [
402-
{
403-
name: "cancellation",
404-
time: cancelledAt,
405-
properties: {
406-
reason,
407-
},
408-
},
409-
...((event.events as any[]) ?? []),
410-
],
411-
duration: calculateDurationFromStart(event.startTime, cancelledAt),
412-
properties: event.properties as Attributes,
413-
metadata: event.metadata as Attributes,
414-
style: event.style as Attributes,
415-
output: event.output as Attributes,
416-
outputType: event.outputType,
417-
payload: event.payload as Attributes,
418-
payloadType: event.payloadType,
419-
}))
420-
);
421-
}
422-
423418
async crashEvent({
424419
event,
425420
crashedAt,
@@ -567,7 +562,19 @@ export class EventRepository {
567562
}
568563

569564
if (event.isCancelled || !event.isPartial) {
570-
eventsBySpanId.set(event.spanId, event);
565+
// If we have a cancelled event and an existing partial event,
566+
// merge them: use cancelled event data but preserve style from the partial event
567+
if (event.isCancelled && existingEvent.isPartial && !existingEvent.isCancelled) {
568+
const mergedEvent: PreparedEvent = {
569+
...event, // Use cancelled event as base (has correct timing, status, events)
570+
// Preserve style from the original partial event
571+
style: existingEvent.style,
572+
};
573+
eventsBySpanId.set(event.spanId, mergedEvent);
574+
} else {
575+
// For non-cancelled events or other cases, use the standard replacement logic
576+
eventsBySpanId.set(event.spanId, event);
577+
}
571578
}
572579
}
573580

@@ -583,6 +590,9 @@ export class EventRepository {
583590
event.duration
584591
);
585592

593+
const isCancelled =
594+
event.isCancelled === true ? true : event.isPartial && ancestorCancelled;
595+
586596
const span = {
587597
id: event.spanId,
588598
parentId: event.parentId ?? undefined,
@@ -592,9 +602,9 @@ export class EventRepository {
592602
message: event.message,
593603
style: event.style,
594604
duration,
595-
isError: event.isError,
605+
isError: isCancelled ? false : event.isError,
596606
isPartial: ancestorCancelled ? false : event.isPartial,
597-
isCancelled: event.isCancelled === true ? true : event.isPartial && ancestorCancelled,
607+
isCancelled,
598608
isDebug: event.kind === TaskEventKind.LOG,
599609
startTime: getDateFromNanoseconds(event.startTime),
600610
level: event.level,
@@ -618,6 +628,8 @@ export class EventRepository {
618628
return;
619629
}
620630

631+
logger.debug("[getTraceSummary] result", { rootSpan, spans });
632+
621633
return {
622634
rootSpan,
623635
spans,
@@ -1141,11 +1153,6 @@ export class EventRepository {
11411153
): Promise<TResult> {
11421154
const propagatedContext = extractContextFromCarrier(options.context ?? {});
11431155

1144-
logger.debug("[otelContext]", {
1145-
propagatedContext,
1146-
options,
1147-
});
1148-
11491156
const start = process.hrtime.bigint();
11501157
const startTime = options.startTime ?? getNowInNanoseconds();
11511158

@@ -1895,6 +1902,10 @@ export function getDateFromNanoseconds(nanoseconds: bigint) {
18951902
return new Date(Number(nanoseconds) / 1_000_000);
18961903
}
18971904

1905+
function convertDateToNanoseconds(date: Date) {
1906+
return BigInt(date.getTime()) * BigInt(1_000_000);
1907+
}
1908+
18981909
function nanosecondsToMilliseconds(nanoseconds: bigint | number): number {
18991910
return Number(nanoseconds) / 1_000_000;
19001911
}

apps/webapp/app/v3/runEngineHandlers.server.ts

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -299,20 +299,42 @@ export function registerRunEngineEventBusHandlers() {
299299

300300
engine.eventBus.on("runCancelled", async ({ time, run }) => {
301301
try {
302-
const eventStore = getTaskEventStoreTableForRun(run);
303-
304-
const inProgressEvents = await eventRepository.queryIncompleteEvents(
305-
eventStore,
306-
{
307-
runId: run.friendlyId,
302+
const taskRun = await $replica.taskRun.findFirst({
303+
where: {
304+
id: run.id,
308305
},
309-
run.createdAt,
310-
run.completedAt ?? undefined
311-
);
306+
include: {
307+
project: {
308+
select: {
309+
externalRef: true,
310+
},
311+
},
312+
runtimeEnvironment: {
313+
select: {
314+
type: true,
315+
organizationId: true,
316+
},
317+
},
318+
},
319+
});
320+
321+
if (!taskRun) {
322+
logger.error("[runCancelled] Task run not found", {
323+
runId: run.id,
324+
});
325+
return;
326+
}
312327

313328
const error = createJsonErrorObject(run.error);
314329

315-
await eventRepository.cancelEvents(inProgressEvents, time, error.message);
330+
await eventRepository.cancelRunEvent({
331+
reason: error.message,
332+
run: taskRun,
333+
cancelledAt: time,
334+
projectRef: taskRun.project.externalRef,
335+
organizationId: taskRun.runtimeEnvironment.organizationId,
336+
environmentType: taskRun.runtimeEnvironment.type,
337+
});
316338
} catch (error) {
317339
logger.error("[runCancelled] Failed to cancel event", {
318340
error: error instanceof Error ? error.message : error,

apps/webapp/app/v3/services/cancelAttempt.server.ts

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -72,25 +72,6 @@ export class CancelAttemptService extends BaseService {
7272
error: isCancellable ? { type: "STRING_ERROR", raw: reason } : undefined,
7373
});
7474
});
75-
76-
const inProgressEvents = await eventRepository.queryIncompleteEvents(
77-
getTaskEventStoreTableForRun(taskRunAttempt.taskRun),
78-
{
79-
runId: taskRunAttempt.taskRun.friendlyId,
80-
},
81-
taskRunAttempt.taskRun.createdAt,
82-
taskRunAttempt.taskRun.completedAt ?? undefined
83-
);
84-
85-
logger.debug("Cancelling in-progress events", {
86-
inProgressEvents: inProgressEvents.map((event) => event.id),
87-
});
88-
89-
await Promise.all(
90-
inProgressEvents.map((event) => {
91-
return eventRepository.cancelEvent(event, cancelledAt, reason);
92-
})
93-
);
9475
});
9576
}
9677
}

apps/webapp/app/v3/services/cancelTaskRunV1.server.ts

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { type Prisma, type TaskRun } from "@trigger.dev/database";
1+
import { type Prisma } from "@trigger.dev/database";
22
import assertNever from "assert-never";
33
import { logger } from "~/services/logger.server";
44
import { eventRepository } from "../eventRepository.server";
@@ -8,9 +8,9 @@ import { CANCELLABLE_ATTEMPT_STATUSES, isCancellableRunStatus } from "../taskSta
88
import { BaseService } from "./baseService.server";
99
import { CancelAttemptService } from "./cancelAttempt.server";
1010
import { CancelTaskAttemptDependenciesService } from "./cancelTaskAttemptDependencies.server";
11-
import { FinalizeTaskRunService } from "./finalizeTaskRun.server";
12-
import { getTaskEventStoreTableForRun } from "../taskEventStore.server";
1311
import { CancelableTaskRun } from "./cancelTaskRun.server";
12+
import { FinalizeTaskRunService } from "./finalizeTaskRun.server";
13+
import { tryCatch } from "@trigger.dev/core/utils";
1414

1515
type ExtendedTaskRun = Prisma.TaskRunGetPayload<{
1616
include: {
@@ -92,6 +92,7 @@ export class CancelTaskRunServiceV1 extends BaseService {
9292
},
9393
runtimeEnvironment: true,
9494
lockedToVersion: true,
95+
project: true,
9596
},
9697
attemptStatus: "CANCELED",
9798
error: {
@@ -100,21 +101,23 @@ export class CancelTaskRunServiceV1 extends BaseService {
100101
},
101102
});
102103

103-
const inProgressEvents = await eventRepository.queryIncompleteEvents(
104-
getTaskEventStoreTableForRun(taskRun),
105-
{
106-
runId: taskRun.friendlyId,
107-
},
108-
taskRun.createdAt,
109-
taskRun.completedAt ?? undefined
104+
const [cancelRunEventError] = await tryCatch(
105+
eventRepository.cancelRunEvent({
106+
reason: opts.reason,
107+
run: cancelledTaskRun,
108+
cancelledAt: opts.cancelledAt,
109+
projectRef: cancelledTaskRun.project.externalRef,
110+
organizationId: cancelledTaskRun.runtimeEnvironment.organizationId,
111+
environmentType: cancelledTaskRun.runtimeEnvironment.type,
112+
})
110113
);
111114

112-
logger.debug("Cancelling in-progress events", {
113-
inProgressEvents: inProgressEvents.map((event) => event.id),
114-
eventCount: inProgressEvents.length,
115-
});
116-
117-
await eventRepository.cancelEvents(inProgressEvents, opts.cancelledAt, opts.reason);
115+
if (cancelRunEventError) {
116+
logger.error("[CancelTaskRunServiceV1] Failed to cancel run event", {
117+
error: cancelRunEventError,
118+
runId: cancelledTaskRun.id,
119+
});
120+
}
118121

119122
// Cancel any in progress attempts
120123
if (opts.cancelAttempts) {

packages/core/src/v3/workers/taskExecutor.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -352,8 +352,7 @@ export class TaskExecutor {
352352
? runTimelineMetrics.convertMetricsToSpanEvents()
353353
: undefined,
354354
},
355-
traceContext.extractContext(),
356-
signal
355+
traceContext.extractContext()
357356
);
358357

359358
return { result };

0 commit comments

Comments
 (0)