diff --git a/src/remote/remote-controller.ts b/src/remote/remote-controller.ts index b3bc96c..a5cd46d 100644 --- a/src/remote/remote-controller.ts +++ b/src/remote/remote-controller.ts @@ -41,7 +41,7 @@ export class RemoteController { } else if (request.method === "POST") { return await this.onFullSync(request); } else if (request.method === "GET") { - return this.getStatus(); + return await this.getStatus(); } } else if ( url.pathname === "/postgres/reset" && request.method === "POST" @@ -62,21 +62,24 @@ export class RemoteController { remote.on("restoreLog", this.makeLoggingHandler("pg_restore").bind(this)); } - private getStatus(): Response { + private async getStatus(): Promise { if (!this.syncResponse || this.syncStatus !== SyncStatus.COMPLETED) { return Response.json({ status: this.syncStatus }); } const { schema, meta } = this.syncResponse; - const queries = this.remote.optimizer.getQueries(); - this.remote.pollQueriesOnce().catch((error) => { - log.error("Failed to poll queries", "remote-controller"); - console.error(error); - }); + const { queries, diffs } = await this.remote.getStatus(); + let deltas: DeltasResult; + if (diffs.status === "fulfilled") { + deltas = { type: "ok", value: diffs.value }; + } else { + deltas = { type: "error", value: String(diffs.reason) }; + } return Response.json({ status: this.syncStatus, meta, schema, queries: { type: "ok", value: queries }, + deltas, }); } @@ -179,3 +182,14 @@ export class RemoteController { ); } } + +type DeltasResult = { + type: "ok"; + // the type of this is not super important + // currently the frontend only cares whether + // or not this array is empty + value: unknown[]; +} | { + type: "error"; + value: string; +}; diff --git a/src/remote/remote.test.ts b/src/remote/remote.test.ts index 3f9d56a..f818255 100644 --- a/src/remote/remote.test.ts +++ b/src/remote/remote.test.ts @@ -6,6 +6,7 @@ import { assertEquals } from "@std/assert/equals"; import { ConnectionManager } from "../sync/connection-manager.ts"; import { assertArrayIncludes } from "@std/assert"; import { PgIdentifier } from "@query-doctor/core"; +import { type Op } from "jsondiffpatch/formatters/jsonpatch"; const TEST_TARGET_CONTAINER_NAME = "postgres:17"; const TEST_TARGET_CONTAINER_TIMESCALEDB_NAME = @@ -423,3 +424,95 @@ Deno.test({ } }, }); + +Deno.test({ + name: "schema loader detects changes after database modification", + sanitizeOps: false, + sanitizeResources: false, + fn: async () => { + const [sourceDb, targetDb] = await Promise.all([ + new PostgreSqlContainer("postgres:17") + .withCopyContentToContainer([ + { + content: ` + create extension pg_stat_statements; + create table testing(a int, b text); + insert into testing values (1, 'test'); + create index "testing_b_idx" on testing(b); + select * from testing where a = 1; + `, + target: "/docker-entrypoint-initdb.d/init.sql", + }, + ]) + .withCommand(["-c", "shared_preload_libraries=pg_stat_statements"]) + .start(), + testSpawnTarget(), + ]); + + try { + const target = Connectable.fromString(targetDb.getConnectionUri()); + const source = Connectable.fromString(sourceDb.getConnectionUri()); + + const manager = ConnectionManager.forLocalDatabase(); + await using remote = new Remote(target, manager); + + const sourcePg = postgres(source.toString()); + + await remote.syncFrom(source); + await remote.optimizer.finish; + + const initialStatus = await remote.getStatus(); + const initialDiffsResult = initialStatus.diffs; + assertEquals( + initialDiffsResult.status, + "fulfilled", + "Schema poll should succeed", + ); + const initialDiffs = initialDiffsResult.status === "fulfilled" + ? initialDiffsResult.value + : []; + assertEquals( + initialDiffs.length, + 0, + "Should have no diffs initially after sync", + ); + + await sourcePg.unsafe(` + alter table testing add column c int; + create index "testing_c_idx" on testing(c); + `); + + const statusAfterChange = await remote.getStatus(); + const diffsResult = statusAfterChange.diffs; + + assertEquals( + diffsResult.status, + "fulfilled", + "Schema poll should succeed", + ); + const diffs = diffsResult.status === "fulfilled" + ? diffsResult.value + : []; + + assertEquals( + diffs.length, + 2, + "Should detect 2 schema changes (added column and index)", + ); + + const addedColumnDiff = diffs.find((diff: Op) => + typeof diff.path === "string" && diff.path.includes("columns") + ); + assertEquals(addedColumnDiff?.op, "add", "Should detect column addition"); + + const addedIndexDiff = diffs.find((diff: Op) => + typeof diff.path === "string" && diff.path.includes("indexes") + ); + assertEquals(addedIndexDiff?.op, "add", "Should detect index addition"); + + await sourcePg.end(); + } finally { + await Promise.all([sourceDb.stop(), targetDb.stop()]); + } + }, +}); diff --git a/src/remote/remote.ts b/src/remote/remote.ts index ba860e3..71fa536 100644 --- a/src/remote/remote.ts +++ b/src/remote/remote.ts @@ -9,12 +9,14 @@ import { type Connectable } from "../sync/connectable.ts"; import { DumpCommand, RestoreCommand } from "../sync/schema-link.ts"; import { ConnectionManager } from "../sync/connection-manager.ts"; import { type RecentQuery } from "../sql/recent-query.ts"; -import { type FullSchema, SchemaDiffer } from "../sync/schema_differ.ts"; +import { type Op } from "jsondiffpatch/formatters/jsonpatch"; +import { type FullSchema } from "../sync/schema_differ.ts"; import { type RemoteSyncFullSchemaResponse } from "./remote.dto.ts"; import { QueryOptimizer } from "./query-optimizer.ts"; import { EventEmitter } from "node:events"; import { log } from "../log.ts"; import { QueryLoader } from "./query-loader.ts"; +import { SchemaLoader } from "./schema-loader.ts"; type RemoteEvents = { dumpLog: [line: string]; @@ -40,7 +42,6 @@ export class Remote extends EventEmitter { */ private static readonly STATS_ROWS_THRESHOLD = 5_000; - private readonly differ = new SchemaDiffer(); readonly optimizer: QueryOptimizer; /** @@ -58,6 +59,7 @@ export class Remote extends EventEmitter { private isPolling = false; private queryLoader?: QueryLoader; + private schemaLoader?: SchemaLoader; constructor( /** This has to be a local url. Very bad things will happen if this is a remote URL */ @@ -100,7 +102,7 @@ export class Remote extends EventEmitter { ]); if (fullSchema.status === "fulfilled") { - this.differ.put(source, fullSchema.value); + this.schemaLoader?.update(fullSchema.value); } // Second: resolve stats strategy using table list from schema @@ -147,11 +149,33 @@ export class Remote extends EventEmitter { }; } + async getStatus() { + const queries = this.optimizer.getQueries(); + const [diffs] = await Promise.allSettled([ + this.schemaLoader?.poll().then( + (results) => results.diffs, + (error) => { + log.error("Failed to poll schema", "remote"); + console.error(error); + throw error; + }, + ) ?? + [] as Op[], /* no panic in case schemaLoader has not loaded in yet */ + this.pollQueriesOnce().catch((error) => { + log.error("Failed to poll queries", "remote"); + console.error(error); + throw error; + }), + ]); + + return { queries, diffs }; + } + /** * Runs a single poll of pg_stat_statements if * there isn't already an in-flight request */ - async pollQueriesOnce() { + private async pollQueriesOnce() { if (this.queryLoader && !this.isPolling) { try { this.isPolling = true; @@ -299,6 +323,7 @@ export class Remote extends EventEmitter { this.queryLoader.stop(); } this.queryLoader = new QueryLoader(this.sourceManager, source); + this.schemaLoader = new SchemaLoader(this.sourceManager, source); this.queryLoader.on("pollError", (error) => { log.error("Failed to poll queries", "remote"); console.error(error); diff --git a/src/remote/schema-loader.ts b/src/remote/schema-loader.ts new file mode 100644 index 0000000..e2407a3 --- /dev/null +++ b/src/remote/schema-loader.ts @@ -0,0 +1,24 @@ +import { Connectable } from "../sync/connectable.ts"; +import { ConnectionManager } from "../sync/connection-manager.ts"; +import { FullSchema, SchemaDiffer } from "../sync/schema_differ.ts"; + +export class SchemaLoader { + constructor( + private readonly sourceManager: ConnectionManager, + private readonly connectable: Connectable, + ) {} + + private readonly differ = new SchemaDiffer(); + + async poll() { + const connector = this.sourceManager.getConnectorFor(this.connectable); + const schema = await connector.getSchema(); + + const diffs = this.update(schema) ?? []; + return { diffs }; + } + + update(fullSchema: FullSchema) { + return this.differ.put(this.connectable, fullSchema); + } +}