Skip to content

Commit 3cfa5c9

Browse files
committed
fixing metadata WIP
1 parent 4b9f330 commit 3cfa5c9

File tree

10 files changed

+630
-17
lines changed

10 files changed

+630
-17
lines changed

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ export class RunEngineTriggerTaskService {
4646
private readonly engine: RunEngine;
4747
private readonly tracer: Tracer;
4848
private readonly traceEventConcern: TraceEventConcern;
49+
private readonly metadataMaximumSize: number;
4950

5051
constructor(opts: {
5152
prisma: PrismaClientOrTransaction;
@@ -57,6 +58,7 @@ export class RunEngineTriggerTaskService {
5758
runNumberIncrementer: RunNumberIncrementer;
5859
traceEventConcern: TraceEventConcern;
5960
tracer: Tracer;
61+
metadataMaximumSize: number;
6062
}) {
6163
this.prisma = opts.prisma;
6264
this.engine = opts.engine;
@@ -67,6 +69,7 @@ export class RunEngineTriggerTaskService {
6769
this.runNumberIncrementer = opts.runNumberIncrementer;
6870
this.tracer = opts.tracer;
6971
this.traceEventConcern = opts.traceEventConcern;
72+
this.metadataMaximumSize = opts.metadataMaximumSize;
7073
}
7174

7275
public async call({
@@ -188,7 +191,8 @@ export class RunEngineTriggerTaskService {
188191
const metadataPacket = body.options?.metadata
189192
? handleMetadataPacket(
190193
body.options?.metadata,
191-
body.options?.metadataType ?? "application/json"
194+
body.options?.metadataType ?? "application/json",
195+
this.metadataMaximumSize
192196
)
193197
: undefined;
194198

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
import { singleton } from "~/utils/singleton";
2+
import { env } from "~/env.server";
3+
import { UpdateMetadataService } from "./updateMetadata.server";
4+
import { prisma } from "~/db.server";
5+
6+
export const updateMetadataService = singleton(
7+
"update-metadata-service",
8+
() =>
9+
new UpdateMetadataService({
10+
prisma,
11+
flushIntervalMs: env.BATCH_METADATA_OPERATIONS_FLUSH_INTERVAL_MS,
12+
flushEnabled: env.BATCH_METADATA_OPERATIONS_FLUSH_ENABLED === "1",
13+
flushLoggingEnabled: env.BATCH_METADATA_OPERATIONS_FLUSH_LOGGING_ENABLED === "1",
14+
maximumSize: env.TASK_RUN_METADATA_MAXIMUM_SIZE,
15+
logLevel: env.BATCH_METADATA_OPERATIONS_FLUSH_LOGGING_ENABLED === "1" ? "debug" : "info",
16+
})
17+
);

apps/webapp/app/utils/packets.ts

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import { IOPacket } from "@trigger.dev/core/v3/utils/ioSerialization";
2-
import { env } from "~/env.server";
3-
import { ServiceValidationError } from "~/v3/services/baseService.server";
2+
import { ServiceValidationError } from "~/v3/services/common.server";
43

54
export class MetadataTooLargeError extends ServiceValidationError {
65
constructor(message: string) {
@@ -9,7 +8,13 @@ export class MetadataTooLargeError extends ServiceValidationError {
98
}
109
}
1110

12-
export function handleMetadataPacket(metadata: any, metadataType: string): IOPacket | undefined {
11+
export function handleMetadataPacket(
12+
metadata: any,
13+
metadataType: string,
14+
maximumSize: number
15+
): IOPacket | undefined {
16+
console.log("handleMetadataPacket.maximumSize", maximumSize);
17+
1318
let metadataPacket: IOPacket | undefined = undefined;
1419

1520
if (typeof metadata === "string") {
@@ -26,10 +31,10 @@ export function handleMetadataPacket(metadata: any, metadataType: string): IOPac
2631

2732
const byteLength = Buffer.byteLength(metadataPacket.data, "utf8");
2833

29-
if (byteLength > env.TASK_RUN_METADATA_MAXIMUM_SIZE) {
30-
throw new MetadataTooLargeError(
31-
`Metadata exceeds maximum size of ${env.TASK_RUN_METADATA_MAXIMUM_SIZE} bytes`
32-
);
34+
console.log("handleMetadataPacket.byteLength", byteLength);
35+
36+
if (byteLength > maximumSize) {
37+
throw new MetadataTooLargeError(`Metadata exceeds maximum size of ${maximumSize} bytes`);
3338
}
3439

3540
return metadataPacket;

apps/webapp/app/v3/runEngineHandlers.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import { roomFromFriendlyRunId, socketIo } from "./handleSocketIo.server";
1313
import { engine } from "./runEngine.server";
1414
import { PerformTaskRunAlertsService } from "./services/alerts/performTaskRunAlerts.server";
1515
import { RunId } from "@trigger.dev/core/v3/isomorphic";
16-
import { updateMetadataService } from "~/services/metadata/updateMetadata.server";
16+
import { updateMetadataService } from "~/services/metadata/updateMetadataInstance.server";
1717
import { findEnvironmentFromRun } from "~/models/runtimeEnvironment.server";
1818
import { env } from "~/env.server";
1919
import { getTaskEventStoreTableForRun } from "./taskEventStore.server";

apps/webapp/app/v3/services/baseService.server.ts

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@ import { $replica, PrismaClientOrTransaction, prisma } from "~/db.server";
33
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
44
import { attributesFromAuthenticatedEnv, tracer } from "../tracer.server";
55
import { engine, RunEngine } from "../runEngine.server";
6+
import { ServiceValidationError } from "./common.server";
7+
8+
export { ServiceValidationError };
69

710
export abstract class BaseService {
811
constructor(
@@ -54,10 +57,3 @@ export class WithRunEngine extends BaseService {
5457
this._engine = opts.engine ?? engine;
5558
}
5659
}
57-
58-
export class ServiceValidationError extends Error {
59-
constructor(message: string, public status?: number) {
60-
super(message);
61-
this.name = "ServiceValidationError";
62-
}
63-
}
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
export class ServiceValidationError extends Error {
2+
constructor(message: string, public status?: number) {
3+
super(message);
4+
this.name = "ServiceValidationError";
5+
}
6+
}

apps/webapp/app/v3/services/finalizeTaskRun.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { type Prisma, type TaskRun } from "@trigger.dev/database";
33
import { findQueueInEnvironment } from "~/models/taskQueue.server";
44
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
55
import { logger } from "~/services/logger.server";
6-
import { updateMetadataService } from "~/services/metadata/updateMetadata.server";
6+
import { updateMetadataService } from "~/services/metadata/updateMetadataInstance.server";
77
import { marqs } from "~/v3/marqs/index.server";
88
import { generateFriendlyId } from "../friendlyIdentifiers";
99
import { socketIo } from "../handleSocketIo.server";

apps/webapp/app/v3/services/triggerTask.server.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import { eventRepository } from "../eventRepository.server";
1313
import { tracer } from "../tracer.server";
1414
import { WithRunEngine } from "./baseService.server";
1515
import { TriggerTaskServiceV1 } from "./triggerTaskV1.server";
16+
import { env } from "~/env.server";
1617

1718
export type TriggerTaskServiceOptions = {
1819
idempotencyKey?: string;
@@ -107,6 +108,7 @@ export class TriggerTaskService extends WithRunEngine {
107108
runNumberIncrementer: new DefaultRunNumberIncrementer(),
108109
traceEventConcern,
109110
tracer: tracer,
111+
metadataMaximumSize: env.TASK_RUN_METADATA_MAXIMUM_SIZE,
110112
});
111113

112114
return await service.call({

apps/webapp/test/engine/triggerTask.test.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ describe("RunEngineTriggerTaskService", () => {
164164
validator: new MockTriggerTaskValidator(),
165165
traceEventConcern: new MockTraceEventConcern(),
166166
tracer: trace.getTracer("test", "0.0.0"),
167+
metadataMaximumSize: 1024 * 1024 * 1, // 1MB
167168
});
168169

169170
const result = await triggerTaskService.call({
@@ -254,6 +255,7 @@ describe("RunEngineTriggerTaskService", () => {
254255
validator: new MockTriggerTaskValidator(),
255256
traceEventConcern: new MockTraceEventConcern(),
256257
tracer: trace.getTracer("test", "0.0.0"),
258+
metadataMaximumSize: 1024 * 1024 * 1, // 1MB
257259
});
258260

259261
const result = await triggerTaskService.call({
@@ -395,6 +397,7 @@ describe("RunEngineTriggerTaskService", () => {
395397
validator: new MockTriggerTaskValidator(),
396398
traceEventConcern: new MockTraceEventConcern(),
397399
tracer: trace.getTracer("test", "0.0.0"),
400+
metadataMaximumSize: 1024 * 1024 * 1, // 1MB
398401
});
399402

400403
// Test case 1: Trigger with lockToVersion but no specific queue

0 commit comments

Comments
 (0)