Skip to content

Commit 9f32a0f

Browse files
authored
Durable Queue (#459)
* initial implementation * Handle failure and add some test * Fix linting * update aws dep * add comment to env * review fix * review fix
1 parent 366d325 commit 9f32a0f

File tree

11 files changed

+565
-16
lines changed

11 files changed

+565
-16
lines changed
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
import { defineCloudflareConfig } from "@opennextjs/cloudflare";
22
import d1TagCache from "@opennextjs/cloudflare/d1-tag-cache";
33
import kvIncrementalCache from "@opennextjs/cloudflare/kv-cache";
4-
import memoryQueue from "@opennextjs/cloudflare/memory-queue";
4+
import doQueue from "@opennextjs/cloudflare/durable-queue";
55

66
export default defineCloudflareConfig({
77
incrementalCache: kvIncrementalCache,
88
tagCache: d1TagCache,
9-
queue: memoryQueue,
9+
queue: doQueue,
1010
});

examples/e2e/app-router/wrangler.jsonc

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,20 @@
88
"directory": ".open-next/assets",
99
"binding": "ASSETS"
1010
},
11+
"durable_objects": {
12+
"bindings": [
13+
{
14+
"name": "NEXT_CACHE_REVALIDATION_DURABLE_OBJECT",
15+
"class_name": "DurableObjectQueueHandler"
16+
}
17+
]
18+
},
19+
"migrations": [
20+
{
21+
"tag": "v1",
22+
"new_classes": ["DurableObjectQueueHandler"]
23+
}
24+
],
1125
"kv_namespaces": [
1226
{
1327
"binding": "NEXT_CACHE_WORKERS_KV",

packages/cloudflare/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@
7373
"dependencies": {
7474
"@ast-grep/napi": "^0.36.1",
7575
"@dotenvx/dotenvx": "catalog:",
76-
"@opennextjs/aws": "https://pkg.pr.new/@opennextjs/aws@7e23eee",
76+
"@opennextjs/aws": "https://pkg.pr.new/@opennextjs/aws@773",
7777
"enquirer": "^2.4.1",
7878
"glob": "catalog:",
7979
"yaml": "^2.7.0"

packages/cloudflare/src/api/cloudflare-context.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,22 @@
11
import type { Context, RunningCodeOptions } from "node:vm";
22

3+
import type { DurableObjectQueueHandler } from "./durable-objects/queue";
4+
35
declare global {
46
interface CloudflareEnv {
7+
// KV used for the incremental cache
58
NEXT_CACHE_WORKERS_KV?: KVNamespace;
9+
// D1 db used for the tag cache
610
NEXT_CACHE_D1?: D1Database;
11+
// D1 table to use for the tag cache for the tag/path mapping
712
NEXT_CACHE_D1_TAGS_TABLE?: string;
13+
// D1 table to use for the tag cache for storing the tag and their associated revalidation times
814
NEXT_CACHE_D1_REVALIDATIONS_TABLE?: string;
15+
// Service binding for the worker itself to be able to call itself from within the worker
916
NEXT_CACHE_REVALIDATION_WORKER?: Service;
17+
// Durable Object namespace to use for the durable object queue handler
18+
NEXT_CACHE_REVALIDATION_DURABLE_OBJECT?: DurableObjectNamespace<DurableObjectQueueHandler>;
19+
// Asset binding
1020
ASSETS?: Fetcher;
1121
}
1222
}
Lines changed: 312 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,312 @@
1+
import { describe, expect, it, vi } from "vitest";
2+
3+
import { DurableObjectQueueHandler } from "./queue";
4+
5+
vi.mock("cloudflare:workers", () => ({
6+
DurableObject: class {
7+
constructor(
8+
public ctx: DurableObjectState,
9+
public env: CloudflareEnv
10+
) {}
11+
},
12+
}));
13+
14+
const createDurableObjectQueue = ({
15+
fetchDuration,
16+
statusCode,
17+
headers,
18+
}: {
19+
fetchDuration: number;
20+
statusCode?: number;
21+
headers?: Headers;
22+
}) => {
23+
const mockState = {
24+
waitUntil: vi.fn(),
25+
blockConcurrencyWhile: vi.fn().mockImplementation(async (fn) => fn()),
26+
storage: {
27+
setAlarm: vi.fn(),
28+
getAlarm: vi.fn(),
29+
},
30+
};
31+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
32+
return new DurableObjectQueueHandler(mockState as any, {
33+
NEXT_CACHE_REVALIDATION_WORKER: {
34+
fetch: vi.fn().mockReturnValue(
35+
new Promise<Response>((res) =>
36+
setTimeout(
37+
() =>
38+
res(
39+
new Response(null, {
40+
status: statusCode,
41+
headers: headers ?? new Headers([["x-nextjs-cache", "REVALIDATED"]]),
42+
})
43+
),
44+
fetchDuration
45+
)
46+
)
47+
),
48+
connect: vi.fn(),
49+
},
50+
});
51+
};
52+
53+
const createMessage = (dedupId: string, lastModified = Date.now()) => ({
54+
MessageBody: { host: "test.local", url: "/test", eTag: "test", lastModified },
55+
MessageGroupId: "test.local/test",
56+
MessageDeduplicationId: dedupId,
57+
previewModeId: "test",
58+
});
59+
60+
describe("DurableObjectQueue", () => {
61+
describe("successful revalidation", () => {
62+
it("should process a single revalidation", async () => {
63+
const queue = createDurableObjectQueue({ fetchDuration: 10 });
64+
const firstRequest = await queue.revalidate(createMessage("id"));
65+
expect(firstRequest).toBeUndefined();
66+
expect(queue.ongoingRevalidations.size).toBe(1);
67+
expect(queue.ongoingRevalidations.has("id")).toBe(true);
68+
69+
await queue.ongoingRevalidations.get("id");
70+
71+
expect(queue.ongoingRevalidations.size).toBe(0);
72+
expect(queue.ongoingRevalidations.has("id")).toBe(false);
73+
expect(queue.service.fetch).toHaveBeenCalledWith("https://test.local/test", {
74+
method: "HEAD",
75+
headers: {
76+
"x-prerender-revalidate": "test",
77+
"x-isr": "1",
78+
},
79+
signal: expect.any(AbortSignal),
80+
});
81+
});
82+
83+
it("should dedupe revalidations", async () => {
84+
const queue = createDurableObjectQueue({ fetchDuration: 10 });
85+
await queue.revalidate(createMessage("id"));
86+
await queue.revalidate(createMessage("id"));
87+
expect(queue.ongoingRevalidations.size).toBe(1);
88+
expect(queue.ongoingRevalidations.has("id")).toBe(true);
89+
});
90+
91+
it("should block concurrency", async () => {
92+
const queue = createDurableObjectQueue({ fetchDuration: 10 });
93+
await queue.revalidate(createMessage("id"));
94+
await queue.revalidate(createMessage("id2"));
95+
await queue.revalidate(createMessage("id3"));
96+
await queue.revalidate(createMessage("id4"));
97+
await queue.revalidate(createMessage("id5"));
98+
// the next one should block until one of the previous ones finishes
99+
const blockedReq = queue.revalidate(createMessage("id6"));
100+
101+
expect(queue.ongoingRevalidations.size).toBe(queue.maxRevalidations);
102+
expect(queue.ongoingRevalidations.has("id6")).toBe(false);
103+
expect(Array.from(queue.ongoingRevalidations.keys())).toEqual(["id", "id2", "id3", "id4", "id5"]);
104+
105+
// @ts-expect-error
106+
expect(queue.ctx.blockConcurrencyWhile).toHaveBeenCalledTimes(1);
107+
108+
// Here we await the blocked request to ensure it's resolved
109+
await blockedReq;
110+
// We then need to await for the actual revalidation to finish
111+
await Promise.all(Array.from(queue.ongoingRevalidations.values()));
112+
expect(queue.ongoingRevalidations.size).toBe(0);
113+
expect(queue.service.fetch).toHaveBeenCalledTimes(6);
114+
});
115+
});
116+
117+
describe("failed revalidation", () => {
118+
it("should not put it in failed state for an incorrect 200", async () => {
119+
const queue = createDurableObjectQueue({
120+
fetchDuration: 10,
121+
statusCode: 200,
122+
headers: new Headers([["x-nextjs-cache", "MISS"]]),
123+
});
124+
await queue.revalidate(createMessage("id"));
125+
126+
await queue.ongoingRevalidations.get("id");
127+
128+
expect(queue.routeInFailedState.size).toBe(0);
129+
});
130+
131+
it("should not put it in failed state for a failed revalidation with 404", async () => {
132+
const queue = createDurableObjectQueue({
133+
fetchDuration: 10,
134+
statusCode: 404,
135+
});
136+
await queue.revalidate(createMessage("id"));
137+
138+
await queue.ongoingRevalidations.get("id");
139+
140+
expect(queue.routeInFailedState.size).toBe(0);
141+
expect(queue.service.fetch).toHaveBeenCalledTimes(1);
142+
143+
await queue.revalidate(createMessage("id"));
144+
145+
expect(queue.routeInFailedState.size).toBe(0);
146+
expect(queue.service.fetch).toHaveBeenCalledTimes(2);
147+
});
148+
149+
it("should put it in failed state if revalidation fails with 500", async () => {
150+
const queue = createDurableObjectQueue({
151+
fetchDuration: 10,
152+
statusCode: 500,
153+
});
154+
await queue.revalidate(createMessage("id"));
155+
156+
await queue.ongoingRevalidations.get("id");
157+
158+
expect(queue.routeInFailedState.size).toBe(1);
159+
expect(queue.routeInFailedState.has("id")).toBe(true);
160+
expect(queue.service.fetch).toHaveBeenCalledTimes(1);
161+
162+
await queue.revalidate(createMessage("id"));
163+
164+
expect(queue.routeInFailedState.size).toBe(1);
165+
expect(queue.service.fetch).toHaveBeenCalledTimes(1);
166+
});
167+
168+
it("should put it in failed state if revalidation fetch throw", async () => {
169+
const queue = createDurableObjectQueue({
170+
fetchDuration: 10,
171+
});
172+
// @ts-expect-error - This is mocked above
173+
queue.service.fetch.mockImplementationOnce(() => Promise.reject(new Error("fetch error")));
174+
await queue.revalidate(createMessage("id"));
175+
176+
await queue.ongoingRevalidations.get("id");
177+
178+
expect(queue.routeInFailedState.size).toBe(1);
179+
expect(queue.routeInFailedState.has("id")).toBe(true);
180+
expect(queue.ongoingRevalidations.size).toBe(0);
181+
expect(queue.service.fetch).toHaveBeenCalledTimes(1);
182+
183+
await queue.revalidate(createMessage("id"));
184+
185+
expect(queue.routeInFailedState.size).toBe(1);
186+
expect(queue.service.fetch).toHaveBeenCalledTimes(1);
187+
});
188+
});
189+
190+
describe("addAlarm", () => {
191+
const getStorage = (queue: DurableObjectQueueHandler): DurableObjectStorage => {
192+
// @ts-expect-error - ctx is a protected field
193+
return queue.ctx.storage;
194+
};
195+
196+
it("should not add an alarm if there are no failed states", async () => {
197+
const queue = createDurableObjectQueue({ fetchDuration: 10 });
198+
await queue.addAlarm();
199+
expect(getStorage(queue).setAlarm).not.toHaveBeenCalled();
200+
});
201+
202+
it("should add an alarm if there are failed states", async () => {
203+
const queue = createDurableObjectQueue({ fetchDuration: 10 });
204+
queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarmMs: 1000 });
205+
await queue.addAlarm();
206+
expect(getStorage(queue).setAlarm).toHaveBeenCalledWith(1000);
207+
});
208+
209+
it("should not add an alarm if there is already an alarm set", async () => {
210+
const queue = createDurableObjectQueue({ fetchDuration: 10 });
211+
queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarmMs: 1000 });
212+
// @ts-expect-error
213+
queue.ctx.storage.getAlarm.mockResolvedValueOnce(1000);
214+
await queue.addAlarm();
215+
expect(getStorage(queue).setAlarm).not.toHaveBeenCalled();
216+
});
217+
218+
it("should set the alarm to the lowest nextAlarm", async () => {
219+
const queue = createDurableObjectQueue({ fetchDuration: 10 });
220+
queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 0, nextAlarmMs: 1000 });
221+
queue.routeInFailedState.set("id2", { msg: createMessage("id2"), retryCount: 0, nextAlarmMs: 500 });
222+
await queue.addAlarm();
223+
expect(getStorage(queue).setAlarm).toHaveBeenCalledWith(500);
224+
});
225+
});
226+
227+
describe("addToFailedState", () => {
228+
it("should add a failed state", async () => {
229+
const queue = createDurableObjectQueue({ fetchDuration: 10 });
230+
await queue.addToFailedState(createMessage("id"));
231+
expect(queue.routeInFailedState.size).toBe(1);
232+
expect(queue.routeInFailedState.has("id")).toBe(true);
233+
expect(queue.routeInFailedState.get("id")?.retryCount).toBe(1);
234+
});
235+
236+
it("should add a failed state with the correct nextAlarm", async () => {
237+
const queue = createDurableObjectQueue({ fetchDuration: 10 });
238+
await queue.addToFailedState(createMessage("id"));
239+
expect(queue.routeInFailedState.get("id")?.nextAlarmMs).toBeGreaterThan(Date.now());
240+
expect(queue.routeInFailedState.get("id")?.retryCount).toBe(1);
241+
});
242+
243+
it("should add a failed state with the correct nextAlarm for a retry", async () => {
244+
const queue = createDurableObjectQueue({ fetchDuration: 10 });
245+
await queue.addToFailedState(createMessage("id"));
246+
await queue.addToFailedState(createMessage("id"));
247+
expect(queue.routeInFailedState.get("id")?.nextAlarmMs).toBeGreaterThan(Date.now());
248+
expect(queue.routeInFailedState.get("id")?.retryCount).toBe(2);
249+
});
250+
251+
it("should not add a failed state if it has been retried 6 times", async () => {
252+
const queue = createDurableObjectQueue({ fetchDuration: 10 });
253+
queue.routeInFailedState.set("id", { msg: createMessage("id"), retryCount: 6, nextAlarmMs: 1000 });
254+
await queue.addToFailedState(createMessage("id"));
255+
expect(queue.routeInFailedState.size).toBe(0);
256+
});
257+
});
258+
259+
describe("alarm", () => {
260+
it("should execute revalidations for expired events", async () => {
261+
const queue = createDurableObjectQueue({ fetchDuration: 10 });
262+
queue.routeInFailedState.set("id", {
263+
msg: createMessage("id"),
264+
retryCount: 0,
265+
nextAlarmMs: Date.now() - 1000,
266+
});
267+
queue.routeInFailedState.set("id2", {
268+
msg: createMessage("id2"),
269+
retryCount: 0,
270+
nextAlarmMs: Date.now() - 1000,
271+
});
272+
await queue.alarm();
273+
expect(queue.routeInFailedState.size).toBe(0);
274+
expect(queue.service.fetch).toHaveBeenCalledTimes(2);
275+
});
276+
277+
it("should execute revalidations for the next event to retry", async () => {
278+
const queue = createDurableObjectQueue({ fetchDuration: 10 });
279+
queue.routeInFailedState.set("id", {
280+
msg: createMessage("id"),
281+
retryCount: 0,
282+
nextAlarmMs: Date.now() + 1000,
283+
});
284+
queue.routeInFailedState.set("id2", {
285+
msg: createMessage("id2"),
286+
retryCount: 0,
287+
nextAlarmMs: Date.now() + 500,
288+
});
289+
await queue.alarm();
290+
expect(queue.routeInFailedState.size).toBe(1);
291+
expect(queue.service.fetch).toHaveBeenCalledTimes(1);
292+
expect(queue.routeInFailedState.has("id2")).toBe(false);
293+
});
294+
295+
it("should execute revalidations for the next event to retry and expired events", async () => {
296+
const queue = createDurableObjectQueue({ fetchDuration: 10 });
297+
queue.routeInFailedState.set("id", {
298+
msg: createMessage("id"),
299+
retryCount: 0,
300+
nextAlarmMs: Date.now() + 1000,
301+
});
302+
queue.routeInFailedState.set("id2", {
303+
msg: createMessage("id2"),
304+
retryCount: 0,
305+
nextAlarmMs: Date.now() - 1000,
306+
});
307+
await queue.alarm();
308+
expect(queue.routeInFailedState.size).toBe(0);
309+
expect(queue.service.fetch).toHaveBeenCalledTimes(2);
310+
});
311+
});
312+
});

0 commit comments

Comments
 (0)