From e807c7e437f4fc7b7a5457cbd497fb7caa57314e Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Fri, 2 May 2025 14:56:34 +0100 Subject: [PATCH 1/5] make testcontainers wait until container has stopped --- internal-packages/testcontainers/src/index.ts | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/internal-packages/testcontainers/src/index.ts b/internal-packages/testcontainers/src/index.ts index 7253470998..ae5dcc76d6 100644 --- a/internal-packages/testcontainers/src/index.ts +++ b/internal-packages/testcontainers/src/index.ts @@ -53,7 +53,9 @@ const postgresContainer = async ( try { await use(container); } finally { - await container.stop(); + // WARNING: Testcontainers by default will not wait until the container has stopped. It will simply issue the stop command and return immediately. + // If you need to wait for the container to be stopped, you can provide a timeout. The unit of timeout option here is second + await container.stop({ timeout: 10 }); } }; @@ -92,7 +94,9 @@ const redisContainer = async ( try { await use(container); } finally { - await container.stop(); + // WARNING: Testcontainers by default will not wait until the container has stopped. It will simply issue the stop command and return immediately. + // If you need to wait for the container to be stopped, you can provide a timeout. The unit of timeout option here is second + await container.stop({ timeout: 10 }); } }; @@ -142,7 +146,9 @@ const electricOrigin = async ( try { await use(origin); } finally { - await container.stop(); + // WARNING: Testcontainers by default will not wait until the container has stopped. It will simply issue the stop command and return immediately. + // If you need to wait for the container to be stopped, you can provide a timeout. The unit of timeout option here is second + await container.stop({ timeout: 10 }); } }; From b64b448180b50e4095cb8990ed6db9d38427567b Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Fri, 2 May 2025 14:56:56 +0100 Subject: [PATCH 2/5] require unit tests for publishing again --- .github/workflows/publish.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 696abadc8c..674b410eaa 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -56,14 +56,14 @@ jobs: secrets: inherit publish-webapp: - needs: [typecheck] + needs: [typecheck, units] uses: ./.github/workflows/publish-webapp.yml secrets: inherit with: image_tag: ${{ inputs.image_tag }} publish-worker: - needs: [typecheck] + needs: [typecheck, units] uses: ./.github/workflows/publish-worker.yml secrets: inherit with: From 84e3a4a31c11f43941e10fb28f498f868b9b0df0 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Fri, 2 May 2025 15:35:12 +0100 Subject: [PATCH 3/5] add failing test case --- .../src/run-queue/tests/nack.test.ts | 61 +++++++++++++++++++ 1 file changed, 61 insertions(+) diff --git a/internal-packages/run-engine/src/run-queue/tests/nack.test.ts b/internal-packages/run-engine/src/run-queue/tests/nack.test.ts index f7b8aa1449..4b1e832023 100644 --- a/internal-packages/run-engine/src/run-queue/tests/nack.test.ts +++ b/internal-packages/run-engine/src/run-queue/tests/nack.test.ts @@ -214,4 +214,65 @@ describe("RunQueue.nackMessage", () => { } } ); + + redisTest( + "nacking a message with retryAt sets the correct requeue time", + async ({ redisContainer }) => { + const queue = new RunQueue({ + ...testOptions, + queueSelectionStrategy: new FairQueueSelectionStrategy({ + redis: { + keyPrefix: "runqueue:test:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + keys: testOptions.keys, + }), + redis: { + keyPrefix: "runqueue:test:", + host: redisContainer.getHost(), + port: redisContainer.getPort(), + }, + }); + + try { + const envMasterQueue = `env:${authenticatedEnvDev.id}`; + + // Enqueue message + await queue.enqueueMessage({ + env: authenticatedEnvDev, + message: messageDev, + masterQueues: ["main", envMasterQueue], + }); + + // Dequeue message + const dequeued = await queue.dequeueMessageFromMasterQueue( + "test_12345", + envMasterQueue, + 10 + ); + expect(dequeued.length).toBe(1); + + // Set retryAt to 5 seconds in the future + const retryAt = Date.now() + 5000; + await queue.nackMessage({ + orgId: messageDev.orgId, + messageId: messageDev.runId, + retryAt, + }); + + // Check the score of the message in the queue + const queueKey = queue.keys.queueKey(authenticatedEnvDev, messageDev.queue); + const score = await queue.oldestMessageInQueue(authenticatedEnvDev, messageDev.queue); + expect(typeof score).toBe("number"); + if (typeof score !== "number") { + throw new Error("Expected score to be a number, but got undefined"); + } + // Should be within 100ms of retryAt + expect(Math.abs(score - retryAt)).toBeLessThanOrEqual(100); + } finally { + await queue.quit(); + } + } + ); }); From 2e0e6410340a6ac3b7ab765edf1217c271cf0ba3 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Fri, 2 May 2025 15:36:12 +0100 Subject: [PATCH 4/5] make it pass --- internal-packages/run-engine/src/run-queue/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal-packages/run-engine/src/run-queue/index.ts b/internal-packages/run-engine/src/run-queue/index.ts index c34247f11c..b792dda793 100644 --- a/internal-packages/run-engine/src/run-queue/index.ts +++ b/internal-packages/run-engine/src/run-queue/index.ts @@ -541,7 +541,7 @@ export class RunQueue { } } - await this.#callNackMessage({ message }); + await this.#callNackMessage({ message, retryAt }); return true; }, From 0d4064e2957833c0a2cfb2bac477c5a73bf061f9 Mon Sep 17 00:00:00 2001 From: nicktrn <55853254+nicktrn@users.noreply.github.com> Date: Fri, 2 May 2025 15:37:03 +0100 Subject: [PATCH 5/5] add retry threshold ms env var and use it --- apps/webapp/app/env.server.ts | 3 ++- apps/webapp/app/v3/runEngine.server.ts | 1 + internal-packages/run-engine/src/engine/index.ts | 1 + 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index f0931683b4..589c9a0a33 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -460,6 +460,7 @@ const EnvironmentSchema = z.object({ RUN_ENGINE_REUSE_SNAPSHOT_COUNT: z.coerce.number().int().default(0), RUN_ENGINE_MAXIMUM_ENV_COUNT: z.coerce.number().int().optional(), RUN_ENGINE_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000), + RUN_ENGINE_RETRY_WARM_START_THRESHOLD_MS: z.coerce.number().int().default(30_000), RUN_ENGINE_WORKER_REDIS_HOST: z .string() @@ -717,7 +718,7 @@ const EnvironmentSchema = z.object({ SLACK_BOT_TOKEN: z.string().optional(), SLACK_SIGNUP_REASON_CHANNEL_ID: z.string().optional(), - + // kapa.ai KAPA_AI_WEBSITE_ID: z.string().optional(), }); diff --git a/apps/webapp/app/v3/runEngine.server.ts b/apps/webapp/app/v3/runEngine.server.ts index ad9e1c9aeb..01a9d3e5d4 100644 --- a/apps/webapp/app/v3/runEngine.server.ts +++ b/apps/webapp/app/v3/runEngine.server.ts @@ -95,6 +95,7 @@ function createRunEngine() { ...(env.RUN_ENGINE_RUN_QUEUE_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), }, }, + retryWarmStartThresholdMs: env.RUN_ENGINE_RETRY_WARM_START_THRESHOLD_MS, }); return engine; diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index d21b1d6795..f1102a195c 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -304,6 +304,7 @@ export class RunEngine { waitpointSystem: this.waitpointSystem, delayedRunSystem: this.delayedRunSystem, machines: this.options.machines, + retryWarmStartThresholdMs: this.options.retryWarmStartThresholdMs, }); this.dequeueSystem = new DequeueSystem({