Skip to content

Commit 089c935

Browse files
committed
Use unkey cache for the created at filter caching
1 parent f4a7369 commit 089c935

File tree

2 files changed

+43
-6
lines changed

2 files changed

+43
-6
lines changed

apps/webapp/app/services/realtimeClient.server.ts

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ import { createRedisClient, RedisClient, RedisWithClusterOptions } from "~/redis
77
import { longPollingFetch } from "~/utils/longPollingFetch";
88
import { logger } from "./logger.server";
99
import { jumpHash } from "@trigger.dev/core/v3/serverOnly";
10+
import { Cache, createCache, DefaultStatefulContext, Namespace } from "@unkey/cache";
11+
import { MemoryStore } from "@unkey/cache/stores";
12+
import { RedisCacheStore } from "./unkey/redisCacheStore.server";
1013

1114
export interface CachedLimitProvider {
1215
getCachedLimit: (organizationId: string, defaultValue: number) => Promise<number | undefined>;
@@ -67,12 +70,38 @@ export class RealtimeClient {
6770
private redis: RedisClient;
6871
private expiryTimeInSeconds: number;
6972
private cachedLimitProvider: CachedLimitProvider;
73+
private cache: Cache<{ createdAtFilter: string }>;
7074

7175
constructor(private options: RealtimeClientOptions) {
7276
this.redis = createRedisClient("trigger:realtime", options.redis);
7377
this.expiryTimeInSeconds = options.expiryTimeInSeconds ?? 60 * 5; // default to 5 minutes
7478
this.cachedLimitProvider = options.cachedLimitProvider;
7579
this.#registerCommands();
80+
81+
const ctx = new DefaultStatefulContext();
82+
const memory = new MemoryStore({ persistentMap: new Map() });
83+
const redisCacheStore = new RedisCacheStore({
84+
connection: {
85+
keyPrefix: "tr:cache:realtime",
86+
port: options.redis.port,
87+
host: options.redis.host,
88+
username: options.redis.username,
89+
password: options.redis.password,
90+
tlsDisabled: options.redis.tlsDisabled,
91+
clusterMode: options.redis.clusterMode,
92+
},
93+
});
94+
95+
// This cache holds the limits fetched from the platform service
96+
const cache = createCache({
97+
createdAtFilter: new Namespace<string>(ctx, {
98+
stores: [memory, redisCacheStore],
99+
fresh: 60_000 * 60 * 24 * 7, // 1 week
100+
stale: 60_000 * 60 * 24 * 14, // 2 weeks
101+
}),
102+
});
103+
104+
this.cache = cache;
76105
}
77106

78107
async streamChunks(
@@ -196,14 +225,22 @@ export class RealtimeClient {
196225
}
197226

198227
async #getCreatedAtFilter(shapeId: string) {
199-
// TODO: replace this with unkey cache so we can use in memory
200-
const createdAtFilterRawValue = await this.redis.get(`shapes:${shapeId}:filters:createdAt`);
228+
const createdAtFilterCacheResult = await this.cache.createdAtFilter.get(shapeId);
229+
230+
if (createdAtFilterCacheResult.err) {
231+
logger.error("[realtimeClient] Failed to get createdAt filter", {
232+
shapeId,
233+
error: createdAtFilterCacheResult.err,
234+
});
235+
236+
return;
237+
}
201238

202-
if (!createdAtFilterRawValue) {
239+
if (!createdAtFilterCacheResult.val) {
203240
return;
204241
}
205242

206-
return new Date(createdAtFilterRawValue);
243+
return new Date(createdAtFilterCacheResult.val);
207244
}
208245

209246
async #setCreatedAtFilterFromResponse(response: Response, createdAtFilter: Date) {
@@ -213,7 +250,7 @@ export class RealtimeClient {
213250
return;
214251
}
215252

216-
await this.redis.set(`shapes:${shapeId}:filters:createdAt`, createdAtFilter.toISOString());
253+
await this.cache.createdAtFilter.set(shapeId, createdAtFilter.toISOString());
217254
}
218255

219256
async #streamRunsWhere(

references/hello-world/src/trigger/realtime.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ export const realtimeByTagsTask = task({
2121

2222
for await (const run of runs.subscribeToRunsWithTag(
2323
"hello-world",
24-
{ createdAt: "10s", skipColumns: ["payload", "output", "number"] },
24+
{ createdAt: "2m", skipColumns: ["payload", "output", "number"] },
2525
{ signal: $signal }
2626
)) {
2727
logger.info("run", { run });

0 commit comments

Comments
 (0)