diff --git a/package.json b/package.json index 75d2d2b470..097ab1aebb 100644 --- a/package.json +++ b/package.json @@ -36,6 +36,7 @@ "@rivetkit/cloudflare-workers": "workspace:*", "@rivetkit/next-js": "workspace:*", "@rivetkit/db": "workspace:*", + "@rivetkit/sqlite-vfs": "workspace:*", "@rivetkit/engine-api-full": "workspace:*", "@types/react": "^19", "@types/react-dom": "^19", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 6db1626ab2..821858b52d 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -10,6 +10,7 @@ overrides: '@rivetkit/cloudflare-workers': workspace:* '@rivetkit/next-js': workspace:* '@rivetkit/db': workspace:* + '@rivetkit/sqlite-vfs': workspace:* '@rivetkit/engine-api-full': workspace:* '@types/react': ^19 '@types/react-dom': ^19 @@ -2934,7 +2935,7 @@ importers: version: 0.31.5 drizzle-orm: specifier: ^0.44.2 - version: 0.44.6(@cloudflare/workers-types@4.20251014.0)(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(better-sqlite3@11.10.0)(bun-types@1.3.0(@types/react@19.2.2))(kysely@0.28.8) + version: 0.44.6(@cloudflare/workers-types@4.20251014.0)(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/pg@8.16.0)(better-sqlite3@11.10.0)(bun-types@1.3.0(@types/react@19.2.2))(kysely@0.28.8)(pg@8.17.2) devDependencies: '@types/node': specifier: ^22.13.9 @@ -3856,7 +3857,7 @@ importers: version: 3.13.12(react-dom@19.1.0(react@19.1.0))(react@19.1.0) '@uiw/codemirror-extensions-basic-setup': specifier: ^4.25.1 - version: 4.25.1(@codemirror/autocomplete@6.19.0)(@codemirror/commands@6.8.1)(@codemirror/language@6.11.3)(@codemirror/lint@6.9.0)(@codemirror/search@6.5.11)(@codemirror/state@6.5.2)(@codemirror/view@6.38.2) + version: 4.25.1(@codemirror/autocomplete@6.19.0)(@codemirror/commands@6.9.0)(@codemirror/language@6.11.3)(@codemirror/lint@6.9.0)(@codemirror/search@6.5.11)(@codemirror/state@6.5.2)(@codemirror/view@6.38.2) '@uiw/codemirror-theme-github': specifier: ^4.25.1 version: 4.25.1(@codemirror/language@6.11.3)(@codemirror/state@6.5.2)(@codemirror/view@6.38.2) @@ -4292,6 +4293,9 @@ importers: '@rivetkit/on-change': specifier: ^6.0.2-rc.1 version: 6.0.2-rc.1 + '@rivetkit/sqlite-vfs': + specifier: workspace:* + version: link:../sqlite-vfs '@rivetkit/traces': specifier: workspace:* version: link:../traces @@ -4312,7 +4316,7 @@ importers: version: 0.31.5 drizzle-orm: specifier: ^0.44.2 - version: 0.44.6(@cloudflare/workers-types@4.20251014.0)(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(better-sqlite3@11.10.0)(bun-types@1.3.0(@types/react@19.2.2))(kysely@0.28.8) + version: 0.44.6(@cloudflare/workers-types@4.20251014.0)(@opentelemetry/api@1.9.0)(@types/better-sqlite3@7.6.13)(@types/pg@8.16.0)(better-sqlite3@11.10.0)(bun-types@1.3.0(@types/react@19.2.2))(kysely@0.28.8)(pg@8.17.2) get-port: specifier: ^7.1.0 version: 7.1.0 @@ -4337,9 +4341,6 @@ importers: vbare: specifier: ^0.0.4 version: 0.0.4 - wa-sqlite: - specifier: ^1.0.0 - version: 1.0.0 zod: specifier: ^4.1.0 version: 4.1.13 @@ -4414,6 +4415,40 @@ importers: specifier: ^8.5.0 version: 8.5.0(@microsoft/api-extractor@7.53.2(@types/node@22.18.1))(jiti@1.21.7)(postcss@8.5.6)(tsx@4.20.6)(typescript@5.9.3)(yaml@2.8.2) + rivetkit-typescript/packages/sqlite-vfs: + dependencies: + '@rivetkit/bare-ts': + specifier: ^0.6.2 + version: 0.6.2 + vbare: + specifier: ^0.0.4 + version: 0.0.4 + wa-sqlite: + specifier: ^1.0.0 + version: 1.0.0 + devDependencies: + '@bare-ts/tools': + specifier: ^0.13.0 + version: 0.13.0(@bare-ts/lib@0.6.0) + '@types/node': + specifier: ^22.13.1 + version: 22.19.5 + commander: + specifier: ^12.0.0 + version: 12.1.0 + tsup: + specifier: ^8.4.0 + version: 8.5.1(@microsoft/api-extractor@7.53.2(@types/node@22.19.5))(jiti@1.21.7)(postcss@8.5.6)(tsx@4.20.6)(typescript@5.9.3)(yaml@2.8.2) + tsx: + specifier: ^4.7.0 + version: 4.20.6 + typescript: + specifier: ^5.7.3 + version: 5.9.3 + vitest: + specifier: ^3.1.1 + version: 3.2.4(@types/debug@4.1.12)(@types/node@22.19.5)(less@4.4.1)(lightningcss@1.30.2)(sass@1.93.2)(stylus@0.62.0)(terser@5.44.1) + rivetkit-typescript/packages/traces: dependencies: '@rivetkit/bare-ts': @@ -5875,9 +5910,6 @@ packages: resolution: {integrity: sha512-ooWCrlZP11i8GImSjTHYHLkvFDP48nS4+204nGb1RiX/WXYHmJA2III9/e2DWVabCESdW7hBAEzHRqUn9OUVvQ==} engines: {node: '>=0.1.90'} - '@corex/deepmerge@4.0.43': - resolution: {integrity: sha512-N8uEMrMPL0cu/bdboEWpQYb/0i2K5Qn8eCsxzOmxSggJbbQte7ljMRoXm917AbntqTGOzdTu+vP3KOOzoC70HQ==} - '@cspotcode/source-map-support@0.8.1': resolution: {integrity: sha512-IchNf6dN4tHoMFIn/7OE8LWZ19Y6q/67Bmf6vnGREv8RSbBVb9LPJxEcnwrcwX6ixSvaiGoomAUvu4YSxXrVgw==} engines: {node: '>=12'} @@ -10862,10 +10894,6 @@ packages: resolution: {integrity: sha512-+W/5efTR7y5HRD7gACw9yQjqMVvEMLBHmboM/kPWam+H+Hmyrgjh6YncVKK122YZkXrLudzTuAukUw9FnMf7IQ==} engines: {node: 10.* || >= 12.*} - cli-width@4.1.0: - resolution: {integrity: sha512-ouuZd4/dm2Sw5Gmqy6bGyNNNe1qt9RpmxveLSO7KcgsTnU7RXfsw+/bukWGo1abgBiMAic068rclZsO4IWmmxQ==} - engines: {node: '>= 12'} - client-only@0.0.1: resolution: {integrity: sha512-IV3Ou0jSMzZrd3pZ48nLkT9DA7Ag1pnPzaiQhpW7c3RbcqqzvzzVu+L8gfqMp/8IM2MQtSiqaCxrrcfu8I8rMA==} @@ -18743,8 +18771,6 @@ snapshots: '@colors/colors@1.5.0': optional: true - '@corex/deepmerge@4.0.43': {} - '@cspotcode/source-map-support@0.8.1': dependencies: '@jridgewell/trace-mapping': 0.3.9 @@ -19930,6 +19956,7 @@ snapshots: dependencies: hono: 4.9.8 optional: true + '@hono/node-ws@1.2.0(@hono/node-server@1.19.1(hono@4.11.3))(hono@4.11.3)': dependencies: '@hono/node-server': 1.19.1(hono@4.11.3) @@ -20702,7 +20729,7 @@ snapshots: react-simple-code-editor: 0.14.1(react-dom@19.1.0(react@19.1.0))(react@19.1.0) serve-handler: 6.1.6 tailwind-merge: 2.6.0 - tailwindcss-animate: 1.0.7(tailwindcss@3.4.17(ts-node@10.9.2(@types/node@20.19.13)(typescript@5.9.2))) + tailwindcss-animate: 1.0.7(tailwindcss@3.4.17(ts-node@10.9.2(@types/node@20.19.13)(typescript@5.9.3))) zod: 3.25.76 transitivePeerDependencies: - '@cfworker/json-schema' @@ -23200,6 +23227,16 @@ snapshots: '@codemirror/state': 6.5.2 '@codemirror/view': 6.38.2 + '@uiw/codemirror-extensions-basic-setup@4.25.1(@codemirror/autocomplete@6.19.0)(@codemirror/commands@6.9.0)(@codemirror/language@6.11.3)(@codemirror/lint@6.9.0)(@codemirror/search@6.5.11)(@codemirror/state@6.5.2)(@codemirror/view@6.38.2)': + dependencies: + '@codemirror/autocomplete': 6.19.0 + '@codemirror/commands': 6.9.0 + '@codemirror/language': 6.11.3 + '@codemirror/lint': 6.9.0 + '@codemirror/search': 6.5.11 + '@codemirror/state': 6.5.2 + '@codemirror/view': 6.38.2 + '@uiw/codemirror-theme-github@4.25.1(@codemirror/language@6.11.3)(@codemirror/state@6.5.2)(@codemirror/view@6.38.2)': dependencies: '@uiw/codemirror-themes': 4.25.1(@codemirror/language@6.11.3)(@codemirror/state@6.5.2)(@codemirror/view@6.38.2) @@ -24620,8 +24657,6 @@ snapshots: optionalDependencies: '@colors/colors': 1.5.0 - cli-width@4.1.0: {} - client-only@0.0.1: {} clipboardy@3.0.0: diff --git a/rivetkit-typescript/CLAUDE.md b/rivetkit-typescript/CLAUDE.md new file mode 100644 index 0000000000..ab33198ff4 --- /dev/null +++ b/rivetkit-typescript/CLAUDE.md @@ -0,0 +1,8 @@ +# rivetkit-typescript/CLAUDE.md + +## Tree-Shaking Boundaries + +- Do not import `@rivetkit/workflow-engine` outside the `rivetkit/workflow` entrypoint so it remains tree-shakeable. +- Do not import SQLite VFS or `wa-sqlite` outside the `rivetkit/db` (or `@rivetkit/sqlite-vfs`) entrypoint so SQLite support remains tree-shakeable. +- Importing `rivetkit/db` (or `@rivetkit/sqlite-vfs`) is the explicit opt-in for SQLite. Do not lazily load SQLite from `rivetkit/db`; it may be imported eagerly inside that entrypoint. +- Core drivers must remain SQLite-agnostic. Any SQLite-specific wiring belongs behind the `rivetkit/db` or `@rivetkit/sqlite-vfs` boundary. diff --git a/rivetkit-typescript/packages/rivetkit/package.json b/rivetkit-typescript/packages/rivetkit/package.json index 9e55381bd4..b66787a34e 100644 --- a/rivetkit-typescript/packages/rivetkit/package.json +++ b/rivetkit-typescript/packages/rivetkit/package.json @@ -214,6 +214,7 @@ "@rivetkit/engine-runner": "workspace:*", "@rivetkit/fast-json-patch": "^3.1.2", "@rivetkit/on-change": "^6.0.2-rc.1", + "@rivetkit/sqlite-vfs": "workspace:*", "@rivetkit/traces": "workspace:*", "@rivetkit/virtual-websocket": "workspace:*", "cbor-x": "^1.6.0", @@ -225,7 +226,6 @@ "pino": "^9.5.0", "uuid": "^12.0.0", "vbare": "^0.0.4", - "wa-sqlite": "^1.0.0", "zod": "^4.1.0" }, "devDependencies": { diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/contexts/base/actor.ts b/rivetkit-typescript/packages/rivetkit/src/actor/contexts/base/actor.ts index 21abdc4305..c9359982e2 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/contexts/base/actor.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/contexts/base/actor.ts @@ -14,7 +14,9 @@ import { ActorKv } from "../../instance/kv"; import { ActorQueue } from "../../instance/queue"; import type { Schedule } from "../../schedule"; -export const ACTOR_CONTEXT_INTERNAL_SYMBOL = Symbol("actorContextInternal"); +export const ACTOR_CONTEXT_INTERNAL_SYMBOL = Symbol.for( + "rivetkit.actorContextInternal", +); /** * ActorContext class that provides access to actor methods and state diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/definition.ts b/rivetkit-typescript/packages/rivetkit/src/actor/definition.ts index 2352973ce8..75c5ee83fe 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/definition.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/definition.ts @@ -37,11 +37,43 @@ export class ActorDefinition< instantiate(): ActorInstance { // Lazy import to avoid pulling server-only dependencies (traces, fdb-tuple, etc.) // into browser bundles. This method is only called on the server. - const { ActorInstance: ActorInstanceClass } = require("./instance/mod"); - return new ActorInstanceClass(this.#config); + const requireFn = typeof require === "undefined" ? undefined : require; + if (!requireFn) { + throw new Error( + "ActorDefinition.instantiate requires a Node.js environment", + ); + } + + try { + const { ActorInstance: ActorInstanceClass } = + requireFn("./instance/mod"); + return new ActorInstanceClass(this.#config); + } catch (error) { + if (!isInstanceModuleNotFound(error)) { + throw error; + } + + try { + // In tests, register tsx so require() can resolve .ts files. + requireFn("tsx/cjs"); + } catch { + throw error; + } + + const { ActorInstance: ActorInstanceClass } = + requireFn("./instance/mod"); + return new ActorInstanceClass(this.#config); + } } } +function isInstanceModuleNotFound(error: unknown): boolean { + if (!error || typeof error !== "object") return false; + const err = error as { code?: string; message?: string }; + if (err.code !== "MODULE_NOT_FOUND") return false; + return (err.message ?? "").includes("./instance/mod"); +} + export function lookupInRegistry( config: RegistryConfig, name: string, diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/driver.ts b/rivetkit-typescript/packages/rivetkit/src/actor/driver.ts index 838ad2d4d6..84b33c1dc5 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/driver.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/driver.ts @@ -7,7 +7,6 @@ import type { RegistryConfig } from "@/registry/config"; import type { RawDatabaseClient, } from "@/db/config"; -import type { SqliteVfs } from "@/db/vfs/mod"; import { BaseSQLiteDatabase } from "drizzle-orm/sqlite-core"; export type ActorDriverBuilder = ( @@ -68,14 +67,6 @@ export interface ActorDriver { actorId: string, ): Promise | undefined>; - /** - * SQLite VFS instance for creating KV-backed databases. - * Each driver should create its own instance to avoid concurrency issues. - * If not provided, the db() provider will throw an error. - * @experimental - */ - sqliteVfs?: SqliteVfs; - /** * Requests the actor to go to sleep. * diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts b/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts index 2f3d0bed6e..f4688a96e1 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/instance/mod.ts @@ -155,7 +155,7 @@ export class ActorInstance { // MARK: - Inspector #inspectorToken?: string; - #inspector = new ActorInspector(this); + #inspector!: ActorInspector; // MARK: - Tracing #traces!: Traces; @@ -164,6 +164,7 @@ export class ActorInstance { constructor(config: ActorConfig) { this.#config = config; this.actorContext = new ActorContext(this); + this.#inspector = new ActorInspector(this); } // MARK: - Public Getters @@ -1280,7 +1281,6 @@ export class ActorInstance { batchDelete: (keys) => this.driver.kvBatchDelete(this.#actorId, keys), }, - sqliteVfs: this.driver.sqliteVfs, }); this.#rLog.info({ msg: "database migration starting" }); await this.#config.db.onMigrate?.(client); diff --git a/rivetkit-typescript/packages/rivetkit/src/actor/instance/queue-manager.ts b/rivetkit-typescript/packages/rivetkit/src/actor/instance/queue-manager.ts index 791a276d4f..62ca92d3fc 100644 --- a/rivetkit-typescript/packages/rivetkit/src/actor/instance/queue-manager.ts +++ b/rivetkit-typescript/packages/rivetkit/src/actor/instance/queue-manager.ts @@ -48,6 +48,15 @@ interface QueueWaiter { timeoutHandle?: ReturnType; } +interface QueueNameWaiter { + id: string; + nameSet: Set; + resolve: () => void; + reject: (error: Error) => void; + signal?: AbortSignal; + abortHandler?: () => void; +} + interface QueueCompletionWaiter { id: string; messageId: bigint; @@ -78,6 +87,7 @@ export class QueueManager { #actor: ActorInstance; #driver: ActorDriver; #waiters = new Map(); + #nameWaiters = new Map(); #completionWaiters = new Map(); #metadata: QueueMetadata = { ...DEFAULT_METADATA }; #pendingMessageId: bigint | undefined; @@ -302,6 +312,53 @@ export class QueueManager { return promise; } + /** Waits until any message for the provided queue names is available. */ + async waitForNames( + names: string[], + abortSignal: AbortSignal, + ): Promise { + this.#actor.assertReady(); + + const nameSet = new Set(names); + const now = Date.now(); + const existing = await this.#loadQueueMessages(); + if ( + existing.some( + (message) => + nameSet.has(message.name) && + !message.inFlight && + message.availableAt <= now, + ) + ) { + return; + } + + const { promise, resolve, reject } = promiseWithResolvers(); + const waiterId = crypto.randomUUID(); + const waiter: QueueNameWaiter = { + id: waiterId, + nameSet, + resolve, + reject, + signal: abortSignal, + }; + + const onAbort = () => { + this.#nameWaiters.delete(waiterId); + waiter.reject(new errors.ActorAborted()); + }; + + if (abortSignal.aborted) { + onAbort(); + return promise; + } + abortSignal.addEventListener("abort", onAbort, { once: true }); + waiter.abortHandler = onAbort; + + this.#nameWaiters.set(waiterId, waiter); + return promise; + } + /** Waits for a specific queue message to complete. */ async waitForCompletion( messageId: bigint, @@ -326,20 +383,6 @@ export class QueueManager { } this.#completionWaiters.set(messageId, waiter); - - const [messageEntry] = await this.#driver.kvBatchGet(this.#actor.id, [ - makeQueueMessageKey(messageId), - ]); - if (!messageEntry) { - const existing = this.#completionWaiters.get(messageId); - if (existing?.id === waiterId) { - this.#completionWaiters.delete(messageId); - if (existing.timeoutHandle) { - clearTimeout(existing.timeoutHandle); - } - resolve({ status: "completed", response: undefined }); - } - } return promise; } @@ -597,7 +640,45 @@ export class QueueManager { this.#redeliveryTimeout = undefined; this.#redeliveryAt = undefined; } - if (this.#waiters.size === 0) { + const hasReceiveWaiters = this.#waiters.size > 0; + const hasNameWaiters = this.#nameWaiters.size > 0; + if (!hasReceiveWaiters && !hasNameWaiters) { + return; + } + + if (hasNameWaiters) { + const entries = await this.#loadQueueMessages(); + const now = Date.now(); + const nameWaiters = [...this.#nameWaiters.values()]; + for (const waiter of nameWaiters) { + if (waiter.signal?.aborted) { + this.#nameWaiters.delete(waiter.id); + waiter.reject(new errors.ActorAborted()); + continue; + } + + const hasMatch = entries.some( + (message) => + waiter.nameSet.has(message.name) && + !message.inFlight && + message.availableAt <= now, + ); + if (!hasMatch) { + continue; + } + + this.#nameWaiters.delete(waiter.id); + if (waiter.abortHandler) { + waiter.signal?.removeEventListener( + "abort", + waiter.abortHandler, + ); + } + waiter.resolve(); + } + } + + if (!hasReceiveWaiters) { return; } const pending = [...this.#waiters.values()]; diff --git a/rivetkit-typescript/packages/rivetkit/src/db/config.ts b/rivetkit-typescript/packages/rivetkit/src/db/config.ts index 20fd37988f..533e11e18f 100644 --- a/rivetkit-typescript/packages/rivetkit/src/db/config.ts +++ b/rivetkit-typescript/packages/rivetkit/src/db/config.ts @@ -1,5 +1,3 @@ -import type { SqliteVfs } from "./vfs/mod"; - export type AnyDatabaseProvider = DatabaseProvider | undefined; /** @@ -33,12 +31,6 @@ export interface DatabaseProviderContext { batchGet: (keys: Uint8Array[]) => Promise<(Uint8Array | null)[]>; batchDelete: (keys: Uint8Array[]) => Promise; }; - - /** - * SQLite VFS instance for creating KV-backed databases. - * Each driver creates its own instance to avoid concurrency issues. - */ - sqliteVfs?: SqliteVfs; } export type DatabaseProvider = { diff --git a/rivetkit-typescript/packages/rivetkit/src/db/drizzle/mod.ts b/rivetkit-typescript/packages/rivetkit/src/db/drizzle/mod.ts index bfce27de4d..8dfb0e8c87 100644 --- a/rivetkit-typescript/packages/rivetkit/src/db/drizzle/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/db/drizzle/mod.ts @@ -4,7 +4,7 @@ import { } from "drizzle-orm/better-sqlite3"; import { drizzle as durableDrizzle } from "drizzle-orm/durable-sqlite"; import { migrate as durableMigrate } from "drizzle-orm/durable-sqlite/migrator"; -import type { KvVfsOptions } from "../vfs/mod"; +import { SqliteVfs, type KvVfsOptions } from "@rivetkit/sqlite-vfs"; import type { DatabaseProvider, RawAccess } from "../config"; export * from "drizzle-orm/sqlite-core"; @@ -61,6 +61,8 @@ export function db< >( config?: DatabaseFactoryConfig, ): DatabaseProvider & RawAccess> { + const sqliteVfs = new SqliteVfs(); + return { createClient: async (ctx) => { // Check if override is provided @@ -83,12 +85,8 @@ export function db< } // Construct KV-backed client using actor driver's KV operations - if (!ctx.sqliteVfs) { - throw new Error("SqliteVfs instance not provided in context. The driver must provide a sqliteVfs instance."); - } - const kvStore = createActorKvStore(ctx.kv); - const db = await ctx.sqliteVfs.open(ctx.actorId, kvStore); + const db = await sqliteVfs.open(ctx.actorId, kvStore); // Wrap the KV-backed client with Drizzle const rawClient = { diff --git a/rivetkit-typescript/packages/rivetkit/src/db/mod.ts b/rivetkit-typescript/packages/rivetkit/src/db/mod.ts index 775992e6df..592d085ced 100644 --- a/rivetkit-typescript/packages/rivetkit/src/db/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/db/mod.ts @@ -1,4 +1,4 @@ -import type { KvVfsOptions } from "./vfs/mod"; +import { SqliteVfs, type KvVfsOptions } from "@rivetkit/sqlite-vfs"; import type { DatabaseProvider, RawAccess } from "./config"; interface DatabaseFactoryConfig { @@ -55,13 +55,11 @@ export function db({ } satisfies RawAccess; } - // Construct KV-backed client using actor driver's KV operations - if (!ctx.sqliteVfs) { - throw new Error("SqliteVfs instance not provided in context. The driver must provide a sqliteVfs instance."); - } + const sqliteVfs = new SqliteVfs(); + // Construct KV-backed client using actor driver's KV operations const kvStore = createActorKvStore(ctx.kv); - const db = await ctx.sqliteVfs.open(ctx.actorId, kvStore); + const db = await sqliteVfs.open(ctx.actorId, kvStore); return { execute: async (query, ...args) => { diff --git a/rivetkit-typescript/packages/rivetkit/src/db/vfs/mod.ts b/rivetkit-typescript/packages/rivetkit/src/db/vfs/mod.ts deleted file mode 100644 index b68ecdbfcf..0000000000 --- a/rivetkit-typescript/packages/rivetkit/src/db/vfs/mod.ts +++ /dev/null @@ -1,711 +0,0 @@ -/** - * SQLite raw database with KV storage backend - * - * This module provides a SQLite API that uses a KV-backed VFS - * for storage. Each SqliteVfs instance is independent and can be - * used concurrently with other instances. - */ - -// Note: wa-sqlite VFS.Base type definitions have incorrect types for xRead/xWrite -// The actual runtime uses Uint8Array, not the {size, value} object shown in types -import * as VFS from "wa-sqlite/src/VFS.js"; - -// VFS debug logging - set VFS_DEBUG=1 to enable -const VFS_DEBUG = process.env.VFS_DEBUG === "1"; -function vfsLog(op: string, details: Record) { - if (VFS_DEBUG) { - console.log(`[VFS] ${op}`, JSON.stringify(details)); - } -} -import SQLiteESMFactory from "wa-sqlite/dist/wa-sqlite-async.mjs"; -import { Factory } from "wa-sqlite"; -import { readFileSync } from "node:fs"; -import { createRequire } from "node:module"; -import { CHUNK_SIZE, getMetaKey, getChunkKey } from "./kv"; -import { - FILE_META_VERSIONED, - CURRENT_VERSION, -} from "./schemas/file-meta/versioned"; -import type { FileMeta } from "./schemas/file-meta/mod"; - -/** - * Options for creating the KV VFS - * Operations are scoped to a specific actor's KV store - */ -export interface KvVfsOptions { - /** Get a single value by key. Returns null if missing. */ - get: (key: Uint8Array) => Promise; - /** Get multiple values by keys. Returns null for missing keys. */ - getBatch: (keys: Uint8Array[]) => Promise<(Uint8Array | null)[]>; - /** Put a single key-value pair */ - put: (key: Uint8Array, value: Uint8Array) => Promise; - /** Put multiple key-value pairs */ - putBatch: (entries: [Uint8Array, Uint8Array][]) => Promise; - /** Delete multiple keys */ - deleteBatch: (keys: Uint8Array[]) => Promise; -} - -/** - * Represents an open file - */ -interface OpenFile { - /** File path */ - path: string; - /** File size in bytes */ - size: number; - /** Open flags */ - flags: number; - /** KV options for this file */ - options: KvVfsOptions; -} - -/** - * Encodes file metadata to a Uint8Array using BARE schema - */ -function encodeFileMeta(size: number): Uint8Array { - const meta: FileMeta = { size: BigInt(size) }; - return FILE_META_VERSIONED.serializeWithEmbeddedVersion( - meta, - CURRENT_VERSION, - ); -} - -/** - * Decodes file metadata from a Uint8Array using BARE schema - */ -function decodeFileMeta(data: Uint8Array): number { - const meta = FILE_META_VERSIONED.deserializeWithEmbeddedVersion(data); - return Number(meta.size); -} - -/** - * SQLite API interface (subset needed for VFS registration) - * This is part of wa-sqlite but not exported in TypeScript types - */ -interface SQLite3Api { - vfs_register: (vfs: unknown, makeDefault?: boolean) => number; - open_v2: ( - filename: string, - flags: number, - vfsName?: string, - ) => Promise; - close: (db: number) => Promise; - exec: ( - db: number, - sql: string, - callback?: (row: unknown[], columns: string[]) => void, - ) => Promise; - SQLITE_OPEN_READWRITE: number; - SQLITE_OPEN_CREATE: number; -} - -/** - * Simple async mutex for serializing database operations - * wa-sqlite is not safe for concurrent open_v2 calls - */ -class AsyncMutex { - #locked = false; - #waiting: (() => void)[] = []; - - async acquire(): Promise { - while (this.#locked) { - await new Promise((resolve) => this.#waiting.push(resolve)); - } - this.#locked = true; - } - - release(): void { - this.#locked = false; - const next = this.#waiting.shift(); - if (next) { - next(); - } - } -} - -/** - * Database wrapper that provides a simplified SQLite API - */ -export class Database { - readonly #sqlite3: SQLite3Api; - readonly #handle: number; - readonly #fileName: string; - readonly #onClose: () => void; - - constructor(sqlite3: SQLite3Api, handle: number, fileName: string, onClose: () => void) { - this.#sqlite3 = sqlite3; - this.#handle = handle; - this.#fileName = fileName; - this.#onClose = onClose; - } - - /** - * Execute SQL with optional row callback - * @param sql - SQL statement to execute - * @param callback - Called for each result row with (row, columns) where row is an array of values and columns is an array of column names - */ - async exec(sql: string, callback?: (row: unknown[], columns: string[]) => void): Promise { - return this.#sqlite3.exec(this.#handle, sql, callback); - } - - /** - * Close the database - */ - async close(): Promise { - await this.#sqlite3.close(this.#handle); - this.#onClose(); - } - - /** - * Get the raw wa-sqlite API (for advanced usage) - */ - get sqlite3(): SQLite3Api { - return this.#sqlite3; - } - - /** - * Get the raw database handle (for advanced usage) - */ - get handle(): number { - return this.#handle; - } -} - -/** - * SQLite VFS backed by KV storage. - * - * Each instance is independent and has its own wa-sqlite WASM module. - * This allows multiple instances to operate concurrently without interference. - */ -export class SqliteVfs { - #sqlite3: SQLite3Api | null = null; - #sqliteSystem: SqliteSystem | null = null; - #initPromise: Promise | null = null; - #openMutex = new AsyncMutex(); - #instanceId: string; - - constructor() { - // Generate unique instance ID for VFS name - this.#instanceId = crypto.randomUUID().replace(/-/g, '').slice(0, 8); - } - - /** - * Initialize wa-sqlite and VFS (called once per instance) - */ - async #ensureInitialized(): Promise { - // Fast path: already initialized - if (this.#sqlite3 && this.#sqliteSystem) { - return; - } - - // Synchronously create the promise if not started - if (!this.#initPromise) { - this.#initPromise = (async () => { - // Load WASM binary (Node.js environment) - const require = createRequire(import.meta.url); - const wasmPath = require.resolve("wa-sqlite/dist/wa-sqlite-async.wasm"); - const wasmBinary = readFileSync(wasmPath); - - // Initialize wa-sqlite module - each instance gets its own module - const module = await SQLiteESMFactory({ wasmBinary }); - this.#sqlite3 = Factory(module) as SQLite3Api; - - // Create and register VFS with unique name - this.#sqliteSystem = new SqliteSystem(this.#sqlite3, `kv-vfs-${this.#instanceId}`); - this.#sqliteSystem.register(); - })(); - } - - // Wait for initialization - await this.#initPromise; - } - - /** - * Open a SQLite database using KV storage backend - * - * @param fileName - The database file name (typically the actor ID) - * @param options - KV storage operations for this database - * @returns A Database instance - */ - async open( - fileName: string, - options: KvVfsOptions, - ): Promise { - // Serialize all open operations within this instance - await this.#openMutex.acquire(); - try { - // Initialize wa-sqlite and SqliteSystem on first call - await this.#ensureInitialized(); - - if (!this.#sqlite3 || !this.#sqliteSystem) { - throw new Error("Failed to initialize SQLite"); - } - - // Register this filename with its KV options - this.#sqliteSystem.registerFile(fileName, options); - - // Open database - const db = await this.#sqlite3.open_v2( - fileName, - this.#sqlite3.SQLITE_OPEN_READWRITE | this.#sqlite3.SQLITE_OPEN_CREATE, - this.#sqliteSystem.name, - ); - - // Create cleanup callback - const sqliteSystem = this.#sqliteSystem; - const onClose = () => { - sqliteSystem.unregisterFile(fileName); - }; - - return new Database(this.#sqlite3, db, fileName, onClose); - } finally { - this.#openMutex.release(); - } - } -} - -/** - * Internal VFS implementation - */ -class SqliteSystem extends VFS.Base { - readonly name: string; - readonly #fileOptions: Map = new Map(); - readonly #openFiles: Map = new Map(); - readonly #sqlite3: SQLite3Api; - - constructor(sqlite3: SQLite3Api, name: string) { - super(); - this.#sqlite3 = sqlite3; - this.name = name; - } - - /** - * Registers the VFS with SQLite - */ - register(): void { - this.#sqlite3.vfs_register(this, false); - } - - /** - * Registers a file with its KV options (before opening) - */ - registerFile(fileName: string, options: KvVfsOptions): void { - this.#fileOptions.set(fileName, options); - } - - /** - * Unregisters a file's KV options (after closing) - */ - unregisterFile(fileName: string): void { - this.#fileOptions.delete(fileName); - } - - /** - * Gets KV options for a file, handling journal/wal files by using the main database's options - */ - #getOptionsForPath(path: string): KvVfsOptions | undefined { - let options = this.#fileOptions.get(path); - if (!options) { - // Try to find the main database file by removing common SQLite suffixes - const mainDbPath = path - .replace(/-journal$/, "") - .replace(/-wal$/, "") - .replace(/-shm$/, ""); - - if (mainDbPath !== path) { - options = this.#fileOptions.get(mainDbPath); - } - } - return options; - } - - /** - * Opens a file - */ - xOpen( - path: string | null, - fileId: number, - flags: number, - pOutFlags: DataView, - ): number { - return this.handleAsync(async () => { - if (!path) { - return VFS.SQLITE_CANTOPEN; - } - - // Get the registered KV options for this file - // For journal/wal files, use the main database's options - const options = this.#getOptionsForPath(path); - if (!options) { - throw new Error(`No KV options registered for file: ${path}`); - } - - // Get existing file size if the file exists - const metaKey = getMetaKey(path); - const sizeData = await options.get(metaKey); - - let size: number; - - if (sizeData) { - // File exists, use existing size - size = decodeFileMeta(sizeData); - } else if (flags & VFS.SQLITE_OPEN_CREATE) { - // File doesn't exist, create it - size = 0; - await options.put(metaKey, encodeFileMeta(size)); - } else { - // File doesn't exist and we're not creating it - return VFS.SQLITE_CANTOPEN; - } - - // Store open file info with options - this.#openFiles.set(fileId, { - path, - size, - flags, - options, - }); - - // Set output flags - pOutFlags.setInt32(0, flags & VFS.SQLITE_OPEN_READONLY ? 1 : 0, true); - - return VFS.SQLITE_OK; - }); - } - - /** - * Closes a file - */ - xClose(fileId: number): number { - return this.handleAsync(async () => { - const file = this.#openFiles.get(fileId); - if (!file) { - return VFS.SQLITE_OK; - } - - // Delete file if SQLITE_OPEN_DELETEONCLOSE flag was set - if (file.flags & VFS.SQLITE_OPEN_DELETEONCLOSE) { - await this.#delete(file.path); - } - - this.#openFiles.delete(fileId); - return VFS.SQLITE_OK; - }); - } - - /** - * Reads data from a file - */ - // @ts-expect-error - VFS.Base types are incorrect, runtime uses Uint8Array - xRead(fileId: number, pData: Uint8Array, iOffset: number): number { - return this.handleAsync(async () => { - const file = this.#openFiles.get(fileId); - if (!file) { - return VFS.SQLITE_IOERR_READ; - } - - const options = file.options; - const requestedLength = pData.length; - const fileSize = file.size; - - // If offset is beyond file size, return short read with zeroed buffer - if (iOffset >= fileSize) { - pData.fill(0); - return VFS.SQLITE_IOERR_SHORT_READ; - } - - // Calculate which chunks we need to read - const startChunk = Math.floor(iOffset / CHUNK_SIZE); - const endChunk = Math.floor((iOffset + requestedLength - 1) / CHUNK_SIZE); - - // Fetch all needed chunks - const chunkKeys: Uint8Array[] = []; - for (let i = startChunk; i <= endChunk; i++) { - chunkKeys.push(getChunkKey(file.path, i)); - } - - const readStart = performance.now(); - const chunks = await options.getBatch(chunkKeys); - vfsLog("xRead", { file: file.path, offset: iOffset, len: requestedLength, chunks: chunkKeys.length, ms: (performance.now() - readStart).toFixed(2) }); - - // Copy data from chunks to output buffer - for (let i = startChunk; i <= endChunk; i++) { - const chunkData = chunks[i - startChunk]; - const chunkOffset = i * CHUNK_SIZE; - - // Calculate the range within this chunk - const readStart = Math.max(0, iOffset - chunkOffset); - const readEnd = Math.min( - CHUNK_SIZE, - iOffset + requestedLength - chunkOffset, - ); - - if (chunkData) { - // Copy available data - const sourceStart = readStart; - const sourceEnd = Math.min(readEnd, chunkData.length); - const destStart = chunkOffset + readStart - iOffset; - - if (sourceEnd > sourceStart) { - pData.set( - chunkData.slice(sourceStart, sourceEnd), - destStart, - ); - } - - // Zero-fill if chunk is smaller than expected - if (sourceEnd < readEnd) { - const zeroStart = destStart + (sourceEnd - sourceStart); - const zeroEnd = destStart + (readEnd - readStart); - pData.fill(0, zeroStart, zeroEnd); - } - } else { - // Chunk doesn't exist, zero-fill - const destStart = chunkOffset + readStart - iOffset; - const destEnd = destStart + (readEnd - readStart); - pData.fill(0, destStart, destEnd); - } - } - - // If we read less than requested (past EOF), return short read - const actualBytes = Math.min(requestedLength, fileSize - iOffset); - if (actualBytes < requestedLength) { - pData.fill(0, actualBytes); - return VFS.SQLITE_IOERR_SHORT_READ; - } - - return VFS.SQLITE_OK; - }); - } - - /** - * Writes data to a file - */ - // @ts-expect-error - VFS.Base types are incorrect, runtime uses Uint8Array - xWrite(fileId: number, pData: Uint8Array, iOffset: number): number { - return this.handleAsync(async () => { - const file = this.#openFiles.get(fileId); - if (!file) { - return VFS.SQLITE_IOERR_WRITE; - } - - const options = file.options; - const writeLength = pData.length; - - // Calculate which chunks we need to modify - const startChunk = Math.floor(iOffset / CHUNK_SIZE); - const endChunk = Math.floor((iOffset + writeLength - 1) / CHUNK_SIZE); - - // Fetch existing chunks that we'll need to modify - const chunkKeys: Uint8Array[] = []; - for (let i = startChunk; i <= endChunk; i++) { - chunkKeys.push(getChunkKey(file.path, i)); - } - - const getBatchStart = performance.now(); - const existingChunks = await options.getBatch(chunkKeys); - const getBatchMs = performance.now() - getBatchStart; - - // Prepare new chunk data - const entriesToWrite: [Uint8Array, Uint8Array][] = []; - - for (let i = startChunk; i <= endChunk; i++) { - const chunkOffset = i * CHUNK_SIZE; - const existingChunk = existingChunks[i - startChunk]; - - // Calculate the range within this chunk that we're writing - const writeStart = Math.max(0, iOffset - chunkOffset); - const writeEnd = Math.min( - CHUNK_SIZE, - iOffset + writeLength - chunkOffset, - ); - - // Calculate the size this chunk needs to be - const requiredSize = writeEnd; - - // Create new chunk data - let newChunk: Uint8Array; - if (existingChunk && existingChunk.length >= requiredSize) { - // Use existing chunk (copy it so we can modify) - newChunk = new Uint8Array(Math.max(existingChunk.length, requiredSize)); - newChunk.set(existingChunk); - } else if (existingChunk) { - // Need to expand existing chunk - newChunk = new Uint8Array(requiredSize); - newChunk.set(existingChunk); - } else { - // Create new chunk - newChunk = new Uint8Array(requiredSize); - } - - // Copy data from input buffer to chunk - const sourceStart = chunkOffset + writeStart - iOffset; - const sourceEnd = sourceStart + (writeEnd - writeStart); - newChunk.set(pData.slice(sourceStart, sourceEnd), writeStart); - - entriesToWrite.push([getChunkKey(file.path, i), newChunk]); - } - - // Update file size if we wrote past the end - const newSize = Math.max(file.size, iOffset + writeLength); - if (newSize !== file.size) { - file.size = newSize; - entriesToWrite.push([getMetaKey(file.path), encodeFileMeta(file.size)]); - } - - // Write all chunks and metadata - const putBatchStart = performance.now(); - await options.putBatch(entriesToWrite); - vfsLog("xWrite", { file: file.path, offset: iOffset, len: writeLength, readChunks: chunkKeys.length, writeEntries: entriesToWrite.length, getBatchMs: getBatchMs.toFixed(2), putBatchMs: (performance.now() - putBatchStart).toFixed(2) }); - - return VFS.SQLITE_OK; - }); - } - - /** - * Truncates a file - */ - xTruncate(fileId: number, size: number): number { - return this.handleAsync(async () => { - const file = this.#openFiles.get(fileId); - if (!file) { - return VFS.SQLITE_IOERR_TRUNCATE; - } - - const options = file.options; - - // If truncating to larger size, just update metadata - if (size >= file.size) { - return VFS.SQLITE_OK; - } - - // Calculate which chunks to delete - // Note: When size=0, lastChunkToKeep = floor(-1/4096) = -1, which means - // all chunks (starting from index 0) will be deleted in the loop below. - const lastChunkToKeep = Math.floor((size - 1) / CHUNK_SIZE); - const lastExistingChunk = Math.floor((file.size - 1) / CHUNK_SIZE); - - // Delete chunks beyond the new size - const keysToDelete: Uint8Array[] = []; - for (let i = lastChunkToKeep + 1; i <= lastExistingChunk; i++) { - keysToDelete.push(getChunkKey(file.path, i)); - } - - if (keysToDelete.length > 0) { - await options.deleteBatch(keysToDelete); - } - - // Truncate the last kept chunk if needed - if (size > 0 && size % CHUNK_SIZE !== 0) { - const lastChunkKey = getChunkKey(file.path, lastChunkToKeep); - const lastChunkData = await options.get(lastChunkKey); - - if (lastChunkData && lastChunkData.length > size % CHUNK_SIZE) { - const truncatedChunk = lastChunkData.slice(0, size % CHUNK_SIZE); - await options.put(lastChunkKey, truncatedChunk); - } - } - - // Update file size - file.size = size; - await options.put(getMetaKey(file.path), encodeFileMeta(file.size)); - - return VFS.SQLITE_OK; - }); - } - - /** - * Syncs file data to storage - */ - xSync(fileId: number, _flags: number): number { - return this.handleAsync(async () => { - // KV storage is immediately durable, so sync is a no-op - // But we should ensure size is persisted - const file = this.#openFiles.get(fileId); - if (!file) { - return VFS.SQLITE_OK; - } - - const options = file.options; - await options.put(getMetaKey(file.path), encodeFileMeta(file.size)); - return VFS.SQLITE_OK; - }); - } - - /** - * Gets the file size - */ - xFileSize(fileId: number, pSize: DataView): number { - return this.handleAsync(async () => { - const file = this.#openFiles.get(fileId); - if (!file) { - return VFS.SQLITE_IOERR_FSTAT; - } - - // Set size as 64-bit integer (low and high parts) - pSize.setBigInt64(0, BigInt(file.size), true); - return VFS.SQLITE_OK; - }); - } - - /** - * Deletes a file - */ - xDelete(path: string, _syncDir: number): number { - return this.handleAsync(async () => { - await this.#delete(path); - return VFS.SQLITE_OK; - }); - } - - /** - * Internal delete implementation - */ - async #delete(path: string): Promise { - const options = this.#getOptionsForPath(path); - if (!options) { - throw new Error(`No KV options registered for file: ${path}`); - } - - // Get file size to find out how many chunks to delete - const metaKey = getMetaKey(path); - const sizeData = await options.get(metaKey); - - if (!sizeData) { - // File doesn't exist, that's OK - return; - } - - const size = decodeFileMeta(sizeData); - - // Delete all chunks - const keysToDelete: Uint8Array[] = [metaKey]; - const numChunks = Math.ceil(size / CHUNK_SIZE); - for (let i = 0; i < numChunks; i++) { - keysToDelete.push(getChunkKey(path, i)); - } - - await options.deleteBatch(keysToDelete); - } - - /** - * Checks file accessibility - */ - xAccess(path: string, _flags: number, pResOut: DataView): number { - return this.handleAsync(async () => { - const options = this.#getOptionsForPath(path); - if (!options) { - // File not registered, doesn't exist - pResOut.setInt32(0, 0, true); - return VFS.SQLITE_OK; - } - - const metaKey = getMetaKey(path); - const metaData = await options.get(metaKey); - - // Set result: 1 if file exists, 0 otherwise - pResOut.setInt32(0, metaData ? 1 : 0, true); - return VFS.SQLITE_OK; - }); - } -} diff --git a/rivetkit-typescript/packages/rivetkit/src/db/vfs/schemas/file-meta/mod.ts b/rivetkit-typescript/packages/rivetkit/src/db/vfs/schemas/file-meta/mod.ts deleted file mode 100644 index c16a04f105..0000000000 --- a/rivetkit-typescript/packages/rivetkit/src/db/vfs/schemas/file-meta/mod.ts +++ /dev/null @@ -1 +0,0 @@ -export * from "./v1"; diff --git a/rivetkit-typescript/packages/rivetkit/src/db/vfs/schemas/file-meta/v1.ts b/rivetkit-typescript/packages/rivetkit/src/db/vfs/schemas/file-meta/v1.ts deleted file mode 100644 index e091877457..0000000000 --- a/rivetkit-typescript/packages/rivetkit/src/db/vfs/schemas/file-meta/v1.ts +++ /dev/null @@ -1,37 +0,0 @@ -import * as bare from "@bare-ts/lib" - -const config = /* @__PURE__ */ bare.Config({}) - -export type u64 = bigint - -export type FileMeta = { - readonly size: u64, -} - -export function readFileMeta(bc: bare.ByteCursor): FileMeta { - return { - size: bare.readU64(bc), - } -} - -export function writeFileMeta(bc: bare.ByteCursor, x: FileMeta): void { - bare.writeU64(bc, x.size) -} - -export function encodeFileMeta(x: FileMeta): Uint8Array { - const bc = new bare.ByteCursor( - new Uint8Array(config.initialBufferLength), - config - ) - writeFileMeta(bc, x) - return new Uint8Array(bc.view.buffer, bc.view.byteOffset, bc.offset) -} - -export function decodeFileMeta(bytes: Uint8Array): FileMeta { - const bc = new bare.ByteCursor(bytes, config) - const result = readFileMeta(bc) - if (bc.offset < bc.view.byteLength) { - throw new bare.BareError(bc.offset, "remaining bytes") - } - return result -} diff --git a/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts b/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts index dd11ec6bef..771aadcba3 100644 --- a/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts +++ b/rivetkit-typescript/packages/rivetkit/src/drivers/engine/actor-driver.ts @@ -33,7 +33,6 @@ import type { RivetMessageEvent, UniversalWebSocket, } from "@/common/websocket-interface"; -import { SqliteVfs } from "@/db/vfs/mod"; import { type ActorDriver, type AnyActorInstance, @@ -83,9 +82,6 @@ export class EngineActorDriver implements ActorDriver { #actors: Map = new Map(); #actorRouter: ActorRouter; - /** SQLite VFS instance for creating KV-backed databases */ - readonly sqliteVfs = new SqliteVfs(); - #runnerStarted: PromiseWithResolvers = promiseWithResolvers(); #runnerStopped: PromiseWithResolvers = promiseWithResolvers(); #isRunnerStopped: boolean = false; diff --git a/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/actor.ts b/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/actor.ts index bbfe8a5ccf..094777b0d2 100644 --- a/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/actor.ts +++ b/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/actor.ts @@ -1,6 +1,5 @@ import type { AnyClient } from "@/client/client"; import type { RawDatabaseClient } from "@/db/config"; -import type { SqliteVfs } from "@/db/vfs/mod"; import type { ActorDriver, AnyActorInstance, @@ -195,11 +194,6 @@ export class FileSystemActorDriver implements ActorDriver { } } - /** SQLite VFS instance for creating KV-backed databases */ - get sqliteVfs(): SqliteVfs { - return this.#state.sqliteVfs; - } - startSleep(actorId: string): void { // Spawns the sleepActor promise this.#state.sleepActor(actorId); diff --git a/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/global-state.ts b/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/global-state.ts index 3598e89eea..197b44242d 100644 --- a/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/global-state.ts +++ b/rivetkit-typescript/packages/rivetkit/src/drivers/file-system/global-state.ts @@ -5,7 +5,6 @@ import type { AnyActorInstance } from "@/actor/instance/mod"; import type { ActorKey } from "@/actor/mod"; import type { AnyClient } from "@/client/client"; import { type ActorDriver, getInitialActorKvState } from "@/driver-helpers/mod"; -import { SqliteVfs } from "@/db/vfs/mod"; import type { RegistryConfig } from "@/registry/config"; import type { RunnerConfig } from "@/registry/run-config"; import type * as schema from "@/schemas/file-system-driver/mod"; @@ -94,9 +93,6 @@ export class FileSystemGlobalState { #persist: boolean; #useNativeSqlite: boolean; - /** SQLite VFS instance for this driver. */ - readonly sqliteVfs = new SqliteVfs(); - // IMPORTANT: Never delete from this map. Doing so will result in race // conditions since the actor generation will cease to be tracked // correctly. Always increment generation if a new actor is created. diff --git a/rivetkit-typescript/packages/rivetkit/tsconfig.json b/rivetkit-typescript/packages/rivetkit/tsconfig.json index c4fe95879e..c75c8546d6 100644 --- a/rivetkit-typescript/packages/rivetkit/tsconfig.json +++ b/rivetkit-typescript/packages/rivetkit/tsconfig.json @@ -9,6 +9,7 @@ "@rivetkit/traces": ["../traces/src/index.ts"], "@rivetkit/traces/encoding": ["../traces/src/encoding.ts"], "@rivetkit/traces/otlp": ["../traces/src/otlp-entry.ts"], + "@rivetkit/sqlite-vfs": ["../sqlite-vfs/src/index.ts"], // Used for test fixtures "rivetkit": ["./src/mod.ts"], "rivetkit/utils": ["./src/utils.ts"], diff --git a/rivetkit-typescript/packages/sqlite-vfs/package.json b/rivetkit-typescript/packages/sqlite-vfs/package.json new file mode 100644 index 0000000000..58c2baa3ed --- /dev/null +++ b/rivetkit-typescript/packages/sqlite-vfs/package.json @@ -0,0 +1,48 @@ +{ + "name": "@rivetkit/sqlite-vfs", + "version": "0.0.1", + "description": "SQLite VFS backed by KV storage for RivetKit", + "license": "Apache-2.0", + "type": "module", + "files": [ + "dist", + "src", + "schemas", + "package.json" + ], + "exports": { + ".": { + "import": { + "types": "./dist/tsup/index.d.ts", + "default": "./dist/tsup/index.js" + }, + "require": { + "types": "./dist/tsup/index.d.cts", + "default": "./dist/tsup/index.cjs" + } + } + }, + "engines": { + "node": ">=18.0.0" + }, + "scripts": { + "build": "pnpm run compile:bare && tsup src/index.ts", + "compile:bare": "tsx scripts/compile-bare.ts compile schemas/file-meta/v1.bare -o dist/schemas/file-meta/v1.ts", + "check-types": "pnpm run compile:bare && tsc --noEmit", + "test": "pnpm run compile:bare && vitest run" + }, + "dependencies": { + "@rivetkit/bare-ts": "^0.6.2", + "vbare": "^0.0.4", + "wa-sqlite": "^1.0.0" + }, + "devDependencies": { + "@bare-ts/tools": "^0.13.0", + "@types/node": "^22.13.1", + "commander": "^12.0.0", + "tsx": "^4.7.0", + "tsup": "^8.4.0", + "typescript": "^5.7.3", + "vitest": "^3.1.1" + } +} diff --git a/rivetkit-typescript/packages/sqlite-vfs/schemas/file-meta/mod.ts b/rivetkit-typescript/packages/sqlite-vfs/schemas/file-meta/mod.ts new file mode 100644 index 0000000000..cc5ca385c7 --- /dev/null +++ b/rivetkit-typescript/packages/sqlite-vfs/schemas/file-meta/mod.ts @@ -0,0 +1,2 @@ +export * from "./versioned"; +export * from "../../dist/schemas/file-meta/v1"; diff --git a/rivetkit-typescript/packages/sqlite-vfs/schemas/file-meta/v1.bare b/rivetkit-typescript/packages/sqlite-vfs/schemas/file-meta/v1.bare new file mode 100644 index 0000000000..4836c09e89 --- /dev/null +++ b/rivetkit-typescript/packages/sqlite-vfs/schemas/file-meta/v1.bare @@ -0,0 +1,7 @@ +# File Meta BARE Schema v1 + +# File metadata for KV-backed SQLite storage + +type FileMeta struct { + size: u64 +} diff --git a/rivetkit-typescript/packages/rivetkit/src/db/vfs/schemas/file-meta/versioned.ts b/rivetkit-typescript/packages/sqlite-vfs/schemas/file-meta/versioned.ts similarity index 91% rename from rivetkit-typescript/packages/rivetkit/src/db/vfs/schemas/file-meta/versioned.ts rename to rivetkit-typescript/packages/sqlite-vfs/schemas/file-meta/versioned.ts index 870d999e8e..4565390c1f 100644 --- a/rivetkit-typescript/packages/rivetkit/src/db/vfs/schemas/file-meta/versioned.ts +++ b/rivetkit-typescript/packages/sqlite-vfs/schemas/file-meta/versioned.ts @@ -1,5 +1,5 @@ import { createVersionedDataHandler } from "vbare"; -import * as v1 from "./v1"; +import * as v1 from "../../dist/schemas/file-meta/v1"; export const CURRENT_VERSION = 1; diff --git a/rivetkit-typescript/packages/sqlite-vfs/scripts/compile-bare.ts b/rivetkit-typescript/packages/sqlite-vfs/scripts/compile-bare.ts new file mode 100644 index 0000000000..f14de109ce --- /dev/null +++ b/rivetkit-typescript/packages/sqlite-vfs/scripts/compile-bare.ts @@ -0,0 +1,115 @@ +#!/usr/bin/env -S tsx + +/** + * BARE schema compiler for TypeScript + * + * This script compiles .bare schema files to TypeScript using @bare-ts/tools, + * then post-processes the output to: + * 1. Replace @bare-ts/lib import with @rivetkit/bare-ts + * 2. Replace Node.js assert import with a custom assert function + */ + +import * as fs from "node:fs/promises"; +import * as path from "node:path"; +import { type Config, transform } from "@bare-ts/tools"; +import { Command } from "commander"; + +const program = new Command(); + +program + .name("bare-compiler") + .description("Compile BARE schemas to TypeScript") + .version("0.0.1"); + +program + .command("compile") + .description("Compile a BARE schema file") + .argument("", "Input BARE schema file") + .option("-o, --output ", "Output file path") + .option("--pedantic", "Enable pedantic mode", false) + .option("--generator ", "Generator type (ts, js, dts, bare)", "ts") + .action(async (input: string, options) => { + try { + const schemaPath = path.resolve(input); + const outputPath = options.output + ? path.resolve(options.output) + : schemaPath.replace(/\.bare$/, ".ts"); + + await compileSchema({ + schemaPath, + outputPath, + config: { + pedantic: options.pedantic, + generator: options.generator, + }, + }); + + console.log(`Successfully compiled ${input} to ${outputPath}`); + } catch (error) { + console.error("Failed to compile schema:", error); + process.exit(1); + } + }); + +program.parse(); + +export interface CompileOptions { + schemaPath: string; + outputPath: string; + config?: Partial; +} + +export async function compileSchema(options: CompileOptions): Promise { + const { schemaPath, outputPath, config = {} } = options; + + const schema = await fs.readFile(schemaPath, "utf-8"); + const outputDir = path.dirname(outputPath); + + await fs.mkdir(outputDir, { recursive: true }); + + const defaultConfig: Partial = { + pedantic: true, + generator: "ts", + ...config, + }; + + let result = transform(schema, defaultConfig); + + result = postProcess(result); + + await fs.writeFile(outputPath, result); +} + +const POST_PROCESS_MARKER = "// @generated - post-processed by compile-bare.ts\n"; + +const ASSERT_FUNCTION = ` +function assert(condition: boolean, message?: string): asserts condition { + if (!condition) throw new Error(message ?? "Assertion failed") +} +`; + +/** + * Post-process the generated TypeScript file to: + * 1. Replace @bare-ts/lib import with @rivetkit/bare-ts + * 2. Replace Node.js assert import with a custom assert function + */ +function postProcess(code: string): string { + if (code.startsWith(POST_PROCESS_MARKER)) { + return code; + } + + code = code.replace(/@bare-ts\/lib/g, "@rivetkit/bare-ts"); + code = code.replace(/^import assert from "assert"/m, ""); + code = POST_PROCESS_MARKER + code + `\n${ASSERT_FUNCTION}`; + + if (code.includes("@bare-ts/lib")) { + throw new Error("Failed to replace @bare-ts/lib import"); + } + if (code.includes("import assert from")) { + throw new Error("Failed to remove Node.js assert import"); + } + + return code; +} + +export type { Config } from "@bare-ts/tools"; diff --git a/rivetkit-typescript/packages/sqlite-vfs/src/index.ts b/rivetkit-typescript/packages/sqlite-vfs/src/index.ts new file mode 100644 index 0000000000..5180ce6149 --- /dev/null +++ b/rivetkit-typescript/packages/sqlite-vfs/src/index.ts @@ -0,0 +1,651 @@ +/** + * SQLite raw database with KV storage backend + * + * This module provides a SQLite API that uses a KV-backed VFS + * for storage. Each SqliteVfs instance is independent and can be + * used concurrently with other instances. + */ + +// Note: wa-sqlite VFS.Base type definitions have incorrect types for xRead/xWrite +// The actual runtime uses Uint8Array, not the {size, value} object shown in types +import * as VFS from "wa-sqlite/src/VFS.js"; + +// VFS debug logging - set VFS_DEBUG=1 to enable +const VFS_DEBUG = process.env.VFS_DEBUG === "1"; +function vfsLog(op: string, details: Record) { + if (VFS_DEBUG) { + console.log(`[VFS] ${op}`, JSON.stringify(details)); + } +} +import SQLiteESMFactory from "wa-sqlite/dist/wa-sqlite-async.mjs"; +import { Factory } from "wa-sqlite"; +import { readFileSync } from "node:fs"; +import { createRequire } from "node:module"; +import { CHUNK_SIZE, getMetaKey, getChunkKey } from "./kv"; +import { + FILE_META_VERSIONED, + CURRENT_VERSION, +} from "../schemas/file-meta/versioned.js"; +import type { FileMeta } from "../schemas/file-meta/mod.js"; + +/** + * Options for creating the KV VFS + * Operations are scoped to a specific actor's KV store + */ +export interface KvVfsOptions { + /** Get a single value by key. Returns null if missing. */ + get: (key: Uint8Array) => Promise; + /** Get multiple values by keys. Returns null for missing keys. */ + getBatch: (keys: Uint8Array[]) => Promise<(Uint8Array | null)[]>; + /** Put a single key-value pair */ + put: (key: Uint8Array, value: Uint8Array) => Promise; + /** Put multiple key-value pairs */ + putBatch: (entries: [Uint8Array, Uint8Array][]) => Promise; + /** Delete multiple keys */ + deleteBatch: (keys: Uint8Array[]) => Promise; +} + +/** + * Represents an open file + */ +interface OpenFile { + /** File path */ + path: string; + /** File size in bytes */ + size: number; + /** Open flags */ + flags: number; + /** KV options for this file */ + options: KvVfsOptions; +} + +/** + * Encodes file metadata to a Uint8Array using BARE schema + */ +function encodeFileMeta(size: number): Uint8Array { + const meta: FileMeta = { size: BigInt(size) }; + return FILE_META_VERSIONED.serializeWithEmbeddedVersion( + meta, + CURRENT_VERSION, + ); +} + +/** + * Decodes file metadata from a Uint8Array using BARE schema + */ +function decodeFileMeta(data: Uint8Array): number { + const meta = FILE_META_VERSIONED.deserializeWithEmbeddedVersion(data); + return Number(meta.size); +} + +/** + * SQLite API interface (subset needed for VFS registration) + * This is part of wa-sqlite but not exported in TypeScript types + */ +interface SQLite3Api { + vfs_register: (vfs: unknown, makeDefault?: boolean) => number; + open_v2: ( + filename: string, + flags: number, + vfsName?: string, + ) => Promise; + close: (db: number) => Promise; + exec: ( + db: number, + sql: string, + callback?: (row: unknown[], columns: string[]) => void, + ) => Promise; + SQLITE_OPEN_READWRITE: number; + SQLITE_OPEN_CREATE: number; +} + +/** + * Simple async mutex for serializing database operations + * wa-sqlite is not safe for concurrent open_v2 calls + */ +class AsyncMutex { + #locked = false; + #waiting: (() => void)[] = []; + + async acquire(): Promise { + while (this.#locked) { + await new Promise((resolve) => this.#waiting.push(resolve)); + } + this.#locked = true; + } + + release(): void { + this.#locked = false; + const next = this.#waiting.shift(); + if (next) { + next(); + } + } + + async run(fn: () => Promise): Promise { + await this.acquire(); + try { + return await fn(); + } finally { + this.release(); + } + } +} + +/** + * Database wrapper that provides a simplified SQLite API + */ +export class Database { + readonly #sqlite3: SQLite3Api; + readonly #handle: number; + readonly #fileName: string; + readonly #onClose: () => void; + readonly #mutex: AsyncMutex; + + constructor( + sqlite3: SQLite3Api, + handle: number, + fileName: string, + onClose: () => void, + mutex: AsyncMutex, + ) { + this.#sqlite3 = sqlite3; + this.#handle = handle; + this.#fileName = fileName; + this.#onClose = onClose; + this.#mutex = mutex; + } + + /** + * Execute SQL with optional row callback + * @param sql - SQL statement to execute + * @param callback - Called for each result row with (row, columns) where row is an array of values and columns is an array of column names + */ + async exec(sql: string, callback?: (row: unknown[], columns: string[]) => void): Promise { + return this.#mutex.run(async () => { + return this.#sqlite3.exec(this.#handle, sql, callback); + }); + } + + /** + * Close the database + */ + async close(): Promise { + await this.#mutex.run(async () => { + await this.#sqlite3.close(this.#handle); + }); + this.#onClose(); + } + + /** + * Get the raw wa-sqlite API (for advanced usage) + */ + get sqlite3(): SQLite3Api { + return this.#sqlite3; + } + + /** + * Get the raw database handle (for advanced usage) + */ + get handle(): number { + return this.#handle; + } +} + +/** + * SQLite VFS backed by KV storage. + * + * Each instance is independent and has its own wa-sqlite WASM module. + * This allows multiple instances to operate concurrently without interference. + */ +export class SqliteVfs { + #sqlite3: SQLite3Api | null = null; + #sqliteSystem: SqliteSystem | null = null; + #initPromise: Promise | null = null; + #operationMutex = new AsyncMutex(); + #instanceId: string; + + constructor() { + // Generate unique instance ID for VFS name + this.#instanceId = crypto.randomUUID().replace(/-/g, "").slice(0, 8); + } + + /** + * Initialize wa-sqlite and VFS (called once per instance) + */ + async #ensureInitialized(): Promise { + // Fast path: already initialized + if (this.#sqlite3 && this.#sqliteSystem) { + return; + } + + // Synchronously create the promise if not started + if (!this.#initPromise) { + this.#initPromise = (async () => { + // Load WASM binary (Node.js environment) + const require = createRequire(import.meta.url); + const wasmPath = require.resolve("wa-sqlite/dist/wa-sqlite-async.wasm"); + const wasmBinary = readFileSync(wasmPath); + + // Initialize wa-sqlite module - each instance gets its own module + const module = await SQLiteESMFactory({ wasmBinary }); + this.#sqlite3 = Factory(module) as SQLite3Api; + + // Create and register VFS with unique name + this.#sqliteSystem = new SqliteSystem(this.#sqlite3, `kv-vfs-${this.#instanceId}`); + this.#sqliteSystem.register(); + })(); + } + + // Wait for initialization + await this.#initPromise; + } + + /** + * Open a SQLite database using KV storage backend + * + * @param fileName - The database file name (typically the actor ID) + * @param options - KV storage operations for this database + * @returns A Database instance + */ + async open( + fileName: string, + options: KvVfsOptions, + ): Promise { + return this.#operationMutex.run(async () => { + // Initialize wa-sqlite and SqliteSystem on first call + await this.#ensureInitialized(); + + if (!this.#sqlite3 || !this.#sqliteSystem) { + throw new Error("SQLite not initialized"); + } + + // Register the file with its KV options + this.#sqliteSystem.registerFile(fileName, options); + + const db = await this.#sqlite3.open_v2( + fileName, + this.#sqlite3.SQLITE_OPEN_READWRITE | + this.#sqlite3.SQLITE_OPEN_CREATE, + this.#sqliteSystem.name, + ); + + const sqliteSystem = this.#sqliteSystem; + const onClose = () => { + sqliteSystem.unregisterFile(fileName); + }; + + return new Database( + this.#sqlite3, + db, + fileName, + onClose, + this.#operationMutex, + ); + }); + } +} + +/** + * KV-backed VFS implementation for wa-sqlite + */ +class SqliteSystem extends (VFS.Base as typeof VFS.Base) { + readonly #sqlite3: SQLite3Api; + readonly #name: string; + readonly #openFiles: Map = new Map(); + readonly #fileOptions: Map = new Map(); + #nextFileId = 1; + + constructor(sqlite3: SQLite3Api, name: string) { + super(); + this.#sqlite3 = sqlite3; + this.#name = name; + } + + get name(): string { + return this.#name; + } + + register(): void { + this.#sqlite3.vfs_register(this, false); + } + + registerFile(fileName: string, options: KvVfsOptions): void { + this.#fileOptions.set(fileName, options); + } + + unregisterFile(fileName: string): void { + this.#fileOptions.delete(fileName); + } + + #getOptionsForPath(path: string): KvVfsOptions | undefined { + const direct = this.#fileOptions.get(path); + if (direct) { + return direct; + } + + const suffixes = ["-journal", "-wal", "-shm"]; + for (const suffix of suffixes) { + if (path.endsWith(suffix)) { + const basePath = path.slice(0, -suffix.length); + const baseOptions = this.#fileOptions.get(basePath); + if (baseOptions) { + return baseOptions; + } + } + } + + return undefined; + } + + // @ts-expect-error - wa-sqlite types are incorrect + xOpen( + name: string, + fileId: number, + flags: number, + pOutFlags: DataView, + ): number { + return this.handleAsync(async () => { + try { + const resolvedName = name && name.length > 0 ? name : `temp-${fileId}`; + let options = this.#getOptionsForPath(resolvedName); + if (!options && this.#fileOptions.size === 1) { + options = this.#fileOptions.values().next().value; + } + if (!options) { + throw new Error(`File not registered: ${resolvedName}`); + } + + if (VFS_DEBUG) { + vfsLog("xOpen", { file: resolvedName, flags }); + } + + const key = getMetaKey(resolvedName); + const metaData = await options.get(key); + let size = 0; + + if (metaData) { + size = decodeFileMeta(metaData); + } + + const file: OpenFile = { + path: resolvedName, + size, + flags, + options, + }; + + this.#openFiles.set(fileId, file); + pOutFlags.setInt32(0, flags, true); + return VFS.SQLITE_OK; + } catch (error) { + vfsLog("xOpen", { + file: name, + error: String(error), + }); + return VFS.SQLITE_CANTOPEN; + } + }); + } + + // @ts-expect-error - wa-sqlite types are incorrect + xClose(fileId: number): number { + this.#openFiles.delete(fileId); + return VFS.SQLITE_OK; + } + + // @ts-expect-error - wa-sqlite types are incorrect + xRead( + fileId: number, + pData: Uint8Array, + iOffset: number, + iAmt: number, + ): number { + return this.handleAsync(async () => { + const file = this.#openFiles.get(fileId); + if (!file) { + return VFS.SQLITE_IOERR; + } + + const offsetRaw = + typeof iOffset === "bigint" ? Number(iOffset) : iOffset; + const amountRaw = typeof iAmt === "bigint" ? Number(iAmt) : iAmt; + const offset = Number.isFinite(offsetRaw) ? offsetRaw : 0; + const amount = Number.isFinite(amountRaw) ? amountRaw : pData.length; + const readStart = performance.now(); + const chunkKeys: Uint8Array[] = []; + const startChunk = Math.floor(offset / CHUNK_SIZE); + const endChunk = Math.floor((offset + amount - 1) / CHUNK_SIZE); + + for (let chunkIndex = startChunk; chunkIndex <= endChunk; chunkIndex++) { + chunkKeys.push(getChunkKey(file.path, chunkIndex)); + } + + const chunks = await file.options.getBatch(chunkKeys); + + // Copy the requested data from the chunks into the buffer + let bytesCopied = 0; + for (let i = 0; i < chunks.length; i++) { + const chunkIndex = startChunk + i; + const chunkData = chunks[i]; + + const chunkStartOffset = chunkIndex * CHUNK_SIZE; + const readStartOffset = Math.max(offset - chunkStartOffset, 0); + const readEndOffset = Math.min( + CHUNK_SIZE, + offset + amount - chunkStartOffset, + ); + + const readLength = readEndOffset - readStartOffset; + if (readLength <= 0) { + continue; + } + + if (chunkData) { + pData.set( + chunkData.subarray(readStartOffset, readEndOffset), + bytesCopied, + ); + } else { + // If chunk missing, fill with zeros + pData.fill(0, bytesCopied, bytesCopied + readLength); + } + + bytesCopied += readLength; + } + + if (VFS_DEBUG) { + vfsLog("xRead", { + file: file.path, + offset, + len: amount, + chunks: chunkKeys.length, + ms: (performance.now() - readStart).toFixed(2), + }); + } + + if (offset + amount > file.size) { + return VFS.SQLITE_IOERR_SHORT_READ; + } + + return VFS.SQLITE_OK; + }); + } + + // @ts-expect-error - wa-sqlite types are incorrect + xWrite( + fileId: number, + pData: Uint8Array, + iOffset: number, + iAmt: number, + ): number { + return this.handleAsync(async () => { + const file = this.#openFiles.get(fileId); + if (!file) { + return VFS.SQLITE_IOERR; + } + + const offsetRaw = + typeof iOffset === "bigint" ? Number(iOffset) : iOffset; + const amountRaw = typeof iAmt === "bigint" ? Number(iAmt) : iAmt; + const offset = Number.isFinite(offsetRaw) ? offsetRaw : 0; + const amount = Number.isFinite(amountRaw) ? amountRaw : pData.length; + const writeStart = performance.now(); + const chunkKeys: Uint8Array[] = []; + const startChunk = Math.floor(offset / CHUNK_SIZE); + const endChunk = Math.floor((offset + amount - 1) / CHUNK_SIZE); + + for (let chunkIndex = startChunk; chunkIndex <= endChunk; chunkIndex++) { + chunkKeys.push(getChunkKey(file.path, chunkIndex)); + } + + const getBatchStart = performance.now(); + const chunks = await file.options.getBatch(chunkKeys); + const getBatchMs = performance.now() - getBatchStart; + + const entriesToWrite: [Uint8Array, Uint8Array][] = []; + + // Update each chunk with the new data + let bytesWritten = 0; + for (let i = 0; i < chunks.length; i++) { + const chunkIndex = startChunk + i; + const chunkData = chunks[i]; + + const chunkStartOffset = chunkIndex * CHUNK_SIZE; + const writeStartOffset = Math.max(offset - chunkStartOffset, 0); + const writeEndOffset = Math.min( + CHUNK_SIZE, + offset + amount - chunkStartOffset, + ); + + const writeLength = writeEndOffset - writeStartOffset; + if (writeLength <= 0) { + continue; + } + + // Create or clone the chunk data + const newChunkData = chunkData + ? new Uint8Array(chunkData) + : new Uint8Array(CHUNK_SIZE); + + // Copy data into the chunk + newChunkData.set( + pData.subarray(bytesWritten, bytesWritten + writeLength), + writeStartOffset, + ); + + entriesToWrite.push([chunkKeys[i], newChunkData]); + bytesWritten += writeLength; + } + + const putBatchStart = performance.now(); + await file.options.putBatch(entriesToWrite); + const putBatchMs = performance.now() - putBatchStart; + + // Update file size if needed + const newSize = Math.max(file.size, offset + amount); + if (newSize !== file.size) { + file.size = newSize; + const metaKey = getMetaKey(file.path); + const metaData = encodeFileMeta(newSize); + await file.options.put(metaKey, metaData); + } + + if (VFS_DEBUG) { + vfsLog("xWrite", { + file: file.path, + offset, + len: amount, + readChunks: chunkKeys.length, + writeEntries: entriesToWrite.length, + getBatchMs: getBatchMs.toFixed(2), + putBatchMs: putBatchMs.toFixed(2), + ms: (performance.now() - writeStart).toFixed(2), + }); + } + + return VFS.SQLITE_OK; + }); + } + + // @ts-expect-error - wa-sqlite types are incorrect + xTruncate(fileId: number, size: number): number { + const file = this.#openFiles.get(fileId); + if (!file) { + return VFS.SQLITE_IOERR; + } + + const nextSize = typeof size === "bigint" ? Number(size) : size; + file.size = Number.isFinite(nextSize) ? nextSize : 0; + return VFS.SQLITE_OK; + } + + // @ts-expect-error - wa-sqlite types are incorrect + xSync(fileId: number): number { + return this.handleAsync(async () => { + const file = this.#openFiles.get(fileId); + if (!file) { + return VFS.SQLITE_IOERR; + } + + // Update metadata + const metaKey = getMetaKey(file.path); + const metaData = encodeFileMeta(file.size); + await file.options.put(metaKey, metaData); + + return VFS.SQLITE_OK; + }); + } + + // @ts-expect-error - wa-sqlite types are incorrect + xFileSize(fileId: number, pSize64: DataView): number { + const file = this.#openFiles.get(fileId); + if (!file) { + pSize64.setBigInt64(0, BigInt(0), true); + return VFS.SQLITE_OK; + } + + pSize64.setBigInt64(0, BigInt(file.size), true); + return VFS.SQLITE_OK; + } + + // @ts-expect-error - wa-sqlite types are incorrect + xDelete(name: string, _syncDir: number): number { + // In a KV store, we can't easily delete all chunks without scanning + // For now, we'll just remove the metadata + const options = this.#getOptionsForPath(name); + if (options) { + const metaKey = getMetaKey(name); + void options.deleteBatch([metaKey]); + } + return VFS.SQLITE_OK; + } + + // @ts-expect-error - wa-sqlite types are incorrect + xAccess( + name: string, + _flags: number, + pResOut: DataView, + ): number { + return this.handleAsync(async () => { + let options = this.#getOptionsForPath(name); + if (!options && this.#fileOptions.size === 1) { + options = this.#fileOptions.values().next().value; + } + if (!options) { + pResOut.setInt32(0, 0, true); + return VFS.SQLITE_OK; + } + + if (_flags === VFS.SQLITE_ACCESS_EXISTS) { + const metaKey = getMetaKey(name); + const metaData = await options.get(metaKey); + pResOut.setInt32(0, metaData ? 1 : 0, true); + return VFS.SQLITE_OK; + } + + pResOut.setInt32(0, 1, true); + return VFS.SQLITE_OK; + }); + } +} diff --git a/rivetkit-typescript/packages/rivetkit/src/db/vfs/kv.ts b/rivetkit-typescript/packages/sqlite-vfs/src/kv.ts similarity index 100% rename from rivetkit-typescript/packages/rivetkit/src/db/vfs/kv.ts rename to rivetkit-typescript/packages/sqlite-vfs/src/kv.ts diff --git a/rivetkit-typescript/packages/rivetkit/src/db/vfs/wa-sqlite.d.ts b/rivetkit-typescript/packages/sqlite-vfs/src/wa-sqlite.d.ts similarity index 100% rename from rivetkit-typescript/packages/rivetkit/src/db/vfs/wa-sqlite.d.ts rename to rivetkit-typescript/packages/sqlite-vfs/src/wa-sqlite.d.ts diff --git a/rivetkit-typescript/packages/sqlite-vfs/tests/sqlite-vfs.test.ts b/rivetkit-typescript/packages/sqlite-vfs/tests/sqlite-vfs.test.ts new file mode 100644 index 0000000000..a07a7029a2 --- /dev/null +++ b/rivetkit-typescript/packages/sqlite-vfs/tests/sqlite-vfs.test.ts @@ -0,0 +1,66 @@ +import { describe, expect, it } from "vitest"; +import { SqliteVfs, type KvVfsOptions } from "../src/index"; + +function keyToString(key: Uint8Array): string { + return Buffer.from(key).toString("hex"); +} + +function createKvStore(): KvVfsOptions { + const store = new Map(); + + return { + get: async (key) => { + const value = store.get(keyToString(key)); + return value ? new Uint8Array(value) : null; + }, + getBatch: async (keys) => { + return keys.map((key) => { + const value = store.get(keyToString(key)); + return value ? new Uint8Array(value) : null; + }); + }, + put: async (key, value) => { + store.set(keyToString(key), new Uint8Array(value)); + }, + putBatch: async (entries) => { + for (const [key, value] of entries) { + store.set(keyToString(key), new Uint8Array(value)); + } + }, + deleteBatch: async (keys) => { + for (const key of keys) { + store.delete(keyToString(key)); + } + }, + }; +} + +describe("sqlite-vfs", () => { + it("persists data across VFS instances", async () => { + const kvStore = createKvStore(); + + const vfs = new SqliteVfs(); + const db = await vfs.open("actor-1", kvStore); + await db.exec( + "CREATE TABLE IF NOT EXISTS test_data (id INTEGER PRIMARY KEY AUTOINCREMENT, value TEXT NOT NULL)", + ); + await db.exec("INSERT INTO test_data (value) VALUES ('alpha')"); + await db.exec("INSERT INTO test_data (value) VALUES ('beta')"); + await db.close(); + + const vfsReloaded = new SqliteVfs(); + const dbReloaded = await vfsReloaded.open("actor-1", kvStore); + + const values: string[] = []; + await dbReloaded.exec( + "SELECT value FROM test_data ORDER BY id", + (row) => { + values.push(String(row[0])); + }, + ); + + expect(values).toEqual(["alpha", "beta"]); + + await dbReloaded.close(); + }); +}); diff --git a/rivetkit-typescript/packages/sqlite-vfs/tsconfig.json b/rivetkit-typescript/packages/sqlite-vfs/tsconfig.json new file mode 100644 index 0000000000..2767f744b3 --- /dev/null +++ b/rivetkit-typescript/packages/sqlite-vfs/tsconfig.json @@ -0,0 +1,9 @@ +{ + "extends": "../../../tsconfig.base.json", + "compilerOptions": { + "paths": { + "@/*": ["./src/*"] + } + }, + "include": ["src/**/*", "schemas/**/*", "dist/schemas/**/*", "tests/**/*"] +} diff --git a/rivetkit-typescript/packages/sqlite-vfs/tsup.config.ts b/rivetkit-typescript/packages/sqlite-vfs/tsup.config.ts new file mode 100644 index 0000000000..d8652c0151 --- /dev/null +++ b/rivetkit-typescript/packages/sqlite-vfs/tsup.config.ts @@ -0,0 +1,7 @@ +import { defineConfig } from "tsup"; +import defaultConfig from "../../../tsup.base.ts"; + +export default defineConfig({ + ...defaultConfig, + outDir: "dist/tsup/", +}); diff --git a/rivetkit-typescript/packages/sqlite-vfs/turbo.json b/rivetkit-typescript/packages/sqlite-vfs/turbo.json new file mode 100644 index 0000000000..29d4cb2625 --- /dev/null +++ b/rivetkit-typescript/packages/sqlite-vfs/turbo.json @@ -0,0 +1,4 @@ +{ + "$schema": "https://turbo.build/schema.json", + "extends": ["//"] +}