|
1 | | -import { batch, logger, task } from "@trigger.dev/sdk"; |
| 1 | +import { batch, logger, otel, task } from "@trigger.dev/sdk"; |
2 | 2 | import { createHash } from "node:crypto"; |
3 | 3 | import { setTimeout } from "node:timers/promises"; |
4 | 4 |
|
// Custom OTEL metric instruments — created once at module level so every
// task run in this process reports into the same instruments (re-creating
// them per run would fragment the metric streams).
const meter = otel.metrics.getMeter("hello-world");
// Monotonic counter: cumulative count of items processed across runs.
const itemsProcessedCounter = meter.createCounter("items.processed", {
  description: "Total number of items processed",
  unit: "items",
});
// Latency distribution for individual item processing, in milliseconds.
const itemDurationHistogram = meter.createHistogram("item.duration", {
  description: "Time spent processing each item",
  unit: "ms",
});
// UpDownCounter used gauge-style: incremented when items "arrive" and
// decremented as each is processed, so its value tracks current depth.
const queueDepthGauge = meter.createUpDownCounter("queue.depth", {
  description: "Current simulated queue depth",
  unit: "items",
});
| 19 | + |
5 | 20 | /** |
6 | 21 | * Tight computational loop that produces sustained high CPU utilization. |
7 | 22 | * Uses repeated SHA-256 hashing to keep the CPU busy. |
@@ -243,3 +258,69 @@ export const concurrentLoad = task({ |
243 | 258 | return { childRunIds: results.runs.map((r) => r.id) }; |
244 | 259 | }, |
245 | 260 | }); |
| 261 | + |
| 262 | +/** |
| 263 | + * Demonstrates custom OTEL metrics: counter, histogram, and up-down counter. |
| 264 | + * Simulates processing a queue of items with varying processing times. |
| 265 | + */ |
| 266 | +export const customMetrics = task({ |
| 267 | + id: "custom-metrics", |
| 268 | + run: async ( |
| 269 | + { |
| 270 | + itemCount = 20, |
| 271 | + minProcessingMs = 50, |
| 272 | + maxProcessingMs = 500, |
| 273 | + batchSize = 5, |
| 274 | + }: { |
| 275 | + itemCount?: number; |
| 276 | + minProcessingMs?: number; |
| 277 | + maxProcessingMs?: number; |
| 278 | + batchSize?: number; |
| 279 | + }, |
| 280 | + { ctx } |
| 281 | + ) => { |
| 282 | + logger.info("Starting custom metrics demo", { itemCount, batchSize }); |
| 283 | + |
| 284 | + // Simulate items arriving in the queue |
| 285 | + queueDepthGauge.add(itemCount); |
| 286 | + |
| 287 | + let totalProcessed = 0; |
| 288 | + |
| 289 | + for (let i = 0; i < itemCount; i += batchSize) { |
| 290 | + const currentBatch = Math.min(batchSize, itemCount - i); |
| 291 | + |
| 292 | + for (let j = 0; j < currentBatch; j++) { |
| 293 | + const processingTime = |
| 294 | + minProcessingMs + Math.random() * (maxProcessingMs - minProcessingMs); |
| 295 | + |
| 296 | + // Simulate work |
| 297 | + const start = performance.now(); |
| 298 | + let data = Buffer.from(`item-${i + j}`); |
| 299 | + const deadline = Date.now() + processingTime; |
| 300 | + while (Date.now() < deadline) { |
| 301 | + data = createHash("sha256").update(data).digest(); |
| 302 | + } |
| 303 | + const elapsed = performance.now() - start; |
| 304 | + |
| 305 | + // Record metrics |
| 306 | + itemsProcessedCounter.add(1, { "item.type": j % 2 === 0 ? "even" : "odd" }); |
| 307 | + itemDurationHistogram.record(elapsed, { "item.type": j % 2 === 0 ? "even" : "odd" }); |
| 308 | + queueDepthGauge.add(-1); |
| 309 | + |
| 310 | + totalProcessed++; |
| 311 | + } |
| 312 | + |
| 313 | + logger.info(`Processed batch`, { |
| 314 | + batchNumber: Math.floor(i / batchSize) + 1, |
| 315 | + totalProcessed, |
| 316 | + remaining: itemCount - totalProcessed, |
| 317 | + }); |
| 318 | + |
| 319 | + // Brief pause between batches |
| 320 | + await setTimeout(1000); |
| 321 | + } |
| 322 | + |
| 323 | + logger.info("Custom metrics demo complete", { totalProcessed }); |
| 324 | + return { totalProcessed }; |
| 325 | + }, |
| 326 | +}); |
0 commit comments