diff --git a/ROADMAP.md b/ROADMAP.md
index d7dcdcd..619dcf5 100644
--- a/ROADMAP.md
+++ b/ROADMAP.md
@@ -1,7 +1,8 @@
 # Roadmap
 
 > Execution plan for `@git-stunts/empty-graph` from v7.1.0 onward.
-> Current version: v7.0.0. All milestones prior to this document (M1–M4) are complete.
+> Current release: v7.0.0. Main branch: v7.1.0 complete (AUTOPILOT merged, unreleased).
+> Active milestone: GROUNDSKEEPER (v7.2.0), now complete (unreleased); WEIGHTED (v7.3.0) is next.
 
 ## How to Read This Document
 
@@ -44,15 +45,15 @@ Eliminates manual state freshness management. Cached state stays fresh after loc
 
 Indexes, GC, and frontier tracking manage themselves.
 
-**Features:**
+**Features (recommended order):**
+- GK/FRONTIER — Frontier change detection (multiplier — unlocks PULSE, useful for status/debug/CI)
 - GK/IDX — Index staleness tracking
-- GK/GC — Auto-GC after materialization
-- GK/FRONTIER — Frontier change detection
+- GK/GC — Auto-GC after materialization (last — most likely to cause surprise perf behavior)
 
 **User-Facing Changes:**
-- Bitmap index stores frontier at build time; `loadIndex()` warns or auto-rebuilds when stale.
-- `materialize()` automatically runs GC when tombstone ratio exceeds threshold.
 - New `graph.hasFrontierChanged()` method for cheap "has anything changed?" polling.
+- Bitmap index stores frontier at build time; `loadIndex()` warns when stale; pass `autoRebuild: true` to rebuild automatically.
+- `materialize()` runs GC when `gcPolicy: { enabled: true }` is set and tombstone ratio exceeds threshold. Default: warn only, no automatic GC.
 
 ### v7.3.0 — WEIGHTED
 
@@ -70,7 +71,7 @@ Extends the data model with properties on edges.
 - New `patch.setEdgeProperty(from, to, label, key, value)` API.
 - `getEdges()` and query results include edge `props` field.
 - Schema v3 with backward-compatible v2 reader support.
-- Mixed-version sync safety between v2 and v3 writers.
+- Mixed-version sync: v2 readers fail fast with `E_SCHEMA_UNSUPPORTED` on unknown edge prop ops (never silently drop data).
 
 ### v7.4.0 — HANDSHAKE
 
@@ -186,8 +187,8 @@ Observer-scoped views, translation costs, and temporal queries from Paper IV.
 
 | # | Codename | Version | Theme | Status |
 |---|----------|---------|-------|--------|
-| 1 | **AUTOPILOT** | v7.1.0 | Kill the Materialize Tax | In progress |
-| 2 | **GROUNDSKEEPER** | v7.2.0 | Self-Managing Infrastructure | Planned |
+| 1 | **AUTOPILOT** | v7.1.0 | Kill the Materialize Tax | Complete (merged, unreleased) |
+| 2 | **GROUNDSKEEPER** | v7.2.0 | Self-Managing Infrastructure | Complete (unreleased) |
 | 3 | **WEIGHTED** | v7.3.0 | Edge Properties | Planned |
 | 4 | **HANDSHAKE** | v7.4.0 | Multi-Writer Ergonomics | Planned |
 | 5 | **COMPASS** | v7.5.0 | Advanced Query Language | Planned |
 
@@ -238,11 +239,11 @@ AUTOPILOT (v7.1.0)     █████████████████
   ■ AP/LAZY/1 → AP/LAZY/2
   ■ AP/LAZY/2
 
-GROUNDSKEEPER (v7.2.0) ░░░░░░░░░░░░░░░░░░░░ 0% (0/4)
-  ◆ GK/FRONTIER/1 → PL/WATCH/2
-  ◆ GK/GC/1
-  ◆ GK/IDX/1 → GK/IDX/2
-  ○ GK/IDX/2
+GROUNDSKEEPER (v7.2.0) ████████████████████ 100% (4/4)
+  ■ GK/FRONTIER/1 → PL/WATCH/2
+  ■ GK/GC/1
+  ■ GK/IDX/1 → GK/IDX/2
+  ■ GK/IDX/2
 
 WEIGHTED (v7.3.0) ░░░░░░░░░░░░░░░░░░░░ 0% (0/7)
   ◆ WT/EPKEY/1 → WT/OPS/1, WT/SCHEMA/1
 
@@ -427,23 +428,25 @@ The single biggest DX problem. Developers manually orchestrate state freshness a
 - Affected methods: `hasNode()`, `getNodeProps()`, `neighbors()`, `getNodes()`, `getEdges()`, `query().run()`, all `traverse.*` methods.
 - When `autoMaterialize === false`, preserve current behavior (throw or return stale).
 - Guard must be async-safe (callers already await these methods).
+ - **Core invariant: concurrent auto-materialize calls MUST coalesce.** Store one in-flight materialize promise on the graph instance; concurrent callers await it; clear when resolved/rejected. Without this, N concurrent queries trigger N materializations and the library becomes unusable under load. - **Acceptance Criteria:** - With autoMaterialize on: open graph → addNode → commit → hasNode returns true (no explicit materialize). - With autoMaterialize on: open graph → query().run() works on first call (no prior materialize). - With autoMaterialize off: current behavior unchanged. -- **Scope:** Add guard to all query entry points. + - 20 concurrent queries trigger exactly 1 materialize() call (coalescing invariant). +- **Scope:** Add guard to all query entry points with materialize coalescing. - **Out of Scope:** Incremental/partial materialization strategy. - **Estimated Hours:** 4 - **Estimated LOC:** ~60 prod + ~200 test - **Blocked by:** AP/LAZY/1, AP/INVAL/1 - **Blocking:** None -- **Definition of Done:** All query methods auto-materialize when enabled. Existing tests unaffected. +- **Definition of Done:** All query methods auto-materialize when enabled. Concurrent calls coalesce. Existing tests unaffected. - **Test Plan:** - Golden path: fresh open → query with autoMaterialize → results returned. - Golden path: dirty state → query → auto-rematerializes → fresh results. - Known failures: autoMaterialize off → null state → appropriate error. - - Edge cases: concurrent auto-materialize calls coalesce (no double work). - - Stress: 50 rapid queries, only one materialize() call triggered. + - **Core invariant test:** 20 concurrent queries → exactly 1 materialize() call triggered. + - Stress: 50 rapid queries, coalescing verified via spy count. --- @@ -587,50 +590,58 @@ Once the materialize tax is gone, the next friction layer is infrastructure that #### GK/IDX/1 — Store frontier snapshot in index metadata at build time -- **Status:** `OPEN` +- **Status:** `CLOSED` - **User Story:** As the system, I need to record the frontier when an index was built so I can later detect staleness. - **Requirements:** - At `BitmapIndexBuilder.serialize()` time, accept and store current frontier (writer ID → tip SHA map). - - Write as `frontier.json` blob in the index tree alongside existing shard files. - - Format: `{ "version": 1, "frontier": { "alice": "abc...", "bob": "def..." } }`. + - **Authoritative format:** Write `frontier.cbor` blob in the index tree using the existing CborCodec. CBOR gives deterministic bytes, faster parsing on the hot staleness-check path, and prevents manual edits from creating lies. + - **Debug artifact:** Also write `frontier.json` blob as a human-readable debug artifact, generated from the same data. This is optional output — the system never reads from it. + - CBOR payload: `{ version: 1, writerCount: N, frontier: { "alice": "abc...", "bob": "def..." } }`. + - JSON payload: identical structure, canonical JSON (sorted keys, no whitespace variance, UTF-8). + - On read (GK/IDX/2), prefer `frontier.cbor`; fall back to `frontier.json` if CBOR missing (forward compat during rollout). - **Acceptance Criteria:** - - Built index contains `frontier.json` with correct writer tips. - - Existing index loading code ignores `frontier.json` if not present (backward compat). + - Built index contains `frontier.cbor` with correct writer tips. + - Built index contains `frontier.json` as debug artifact. + - Existing index loading code ignores both files if not present (backward compat). 
- **Scope:** Write frontier metadata at build time. - **Out of Scope:** Reading/comparing frontier (that's GK/IDX/2). - **Estimated Hours:** 3 -- **Estimated LOC:** ~50 prod + ~80 test +- **Estimated LOC:** ~60 prod + ~80 test - **Blocked by:** None - **Blocking:** GK/IDX/2 -- **Definition of Done:** Index tree contains frontier metadata. Backward compatible with existing indexes. +- **Definition of Done:** Index tree contains frontier metadata in both CBOR (authoritative) and JSON (debug). Backward compatible with existing indexes. - **Test Plan:** - - Golden path: build index → frontier.json present with correct data. + - Golden path: build index → frontier.cbor present with correct data, frontier.json present and matches. - Edge cases: empty frontier (no writers), single writer. + - Round-trip: CBOR encode → decode matches original frontier map. #### GK/IDX/2 — Detect and report index staleness on load -- **Status:** `BLOCKED` +- **Status:** `CLOSED` - **User Story:** As a developer, I want to know if my index is stale so I can decide whether to rebuild. - **Requirements:** - - On `loadIndex()`, read `frontier.json` from index tree. + - On `loadIndex()`, read `frontier.cbor` (or `frontier.json` fallback) from index tree. - Compare stored frontier against current writer refs. - - If diverged, log warning via LoggerPort: `[warp] Index is stale. N writers have advanced since last build.` - - Add `autoRebuild: boolean` option (default `false`). When true, trigger rebuild on staleness. + - **Default behavior: warn only.** If diverged, log warning via LoggerPort: `[warp] Index is stale. N writers have advanced since last build. Call rebuildIndex() to update.` + - **Opt-in rebuild:** Add `autoRebuild: boolean` option (default `false`). When `true`, trigger rebuild on staleness. Users must explicitly opt in to expensive work. + - Warning message must include clear "what to do next" guidance. - **Acceptance Criteria:** - - Stale index → warning logged. + - Stale index → warning logged with recovery guidance. - Fresh index → no warning. - `autoRebuild: true` → index rebuilt automatically. + - `autoRebuild: false` (default) → warning only, no rebuild. - **Scope:** Staleness detection and optional auto-rebuild. - **Out of Scope:** Incremental index update (always full rebuild). - **Estimated Hours:** 4 - **Estimated LOC:** ~60 prod + ~120 test - **Blocked by:** GK/IDX/1 - **Blocking:** None -- **Definition of Done:** Stale indexes detected and reported. Auto-rebuild works when enabled. +- **Definition of Done:** Stale indexes detected and reported with guidance. Auto-rebuild works only when explicitly enabled. - **Test Plan:** - - Golden path: build index → advance writer → load → warning. + - Golden path: build index → advance writer → load → warning logged with guidance. - Golden path: build index → load immediately → no warning. - - Known failures: index has no frontier.json (legacy) → no warning, no crash. + - Golden path: build index → advance writer → load with autoRebuild:true → index rebuilt. + - Known failures: index has no frontier.cbor/frontier.json (legacy) → no warning, no crash. - Edge cases: new writer added since index build, writer removed. --- @@ -641,29 +652,32 @@ Once the materialize tax is gone, the next friction layer is infrastructure that #### GK/GC/1 — Wire GC check into post-materialize path -- **Status:** `OPEN` +- **Status:** `CLOSED` - **User Story:** As a developer, I want tombstones cleaned up automatically so I don't have to think about GC. 
- **Requirements:** - - After `materialize()` completes (and after optional auto-checkpoint from AP/CKPT/3), check `getGCMetrics()` against configured `gcPolicy`. - - If `shouldRunGC()` returns true, execute GC. - - Flow: `materialize() → apply patches → maybe checkpoint → maybe GC`. + - Accept `gcPolicy` option on `WarpGraph.open()`: `{ enabled: boolean, tombstoneRatioThreshold?: number, ... }`. + - **Default behavior: warn only.** When no `gcPolicy` is set (or `enabled: false`), log a warning via LoggerPort when thresholds are exceeded but do NOT execute GC. Users must never be surprised by expensive work they didn't ask for. + - **Opt-in execution:** When `gcPolicy: { enabled: true }` is set, after `materialize()` completes (and after optional auto-checkpoint), check `getGCMetrics()` against policy thresholds and execute GC if exceeded. + - Flow: `materialize() → apply patches → maybe checkpoint → maybe warn/GC`. - Log GC execution and results via LoggerPort. - GC failure does not break materialize — log warning and continue. - **Acceptance Criteria:** - - After materialize with 40% tombstone ratio (threshold 30%), GC runs automatically. - - After materialize with 10% tombstone ratio, GC does not run. + - No gcPolicy set + 40% tombstone ratio → warning logged, GC does NOT run. + - `gcPolicy: { enabled: true }` + 40% tombstone ratio (threshold 30%) → GC runs automatically. + - `gcPolicy: { enabled: true }` + 10% tombstone ratio → GC does not run. - GC failure logged but materialize still returns valid state. -- **Scope:** Wire existing GC into materialize path. +- **Scope:** Wire existing GC into materialize path with opt-in semantics. - **Out of Scope:** New GC algorithms, concurrent GC. - **Estimated Hours:** 3 - **Estimated LOC:** ~30 prod + ~100 test - **Blocked by:** None (uses existing GC infrastructure) - **Blocking:** None -- **Definition of Done:** GC runs automatically when thresholds exceeded after materialize. +- **Definition of Done:** GC warns by default, runs only when explicitly enabled, and never surprises users with unexpected latency. - **Test Plan:** - - Golden path: create graph with many tombstones → materialize → GC runs. + - Golden path: gcPolicy enabled + many tombstones → materialize → GC runs. + - Golden path: gcPolicy absent + many tombstones → warning logged, no GC. - Known failures: GC throws → materialize still succeeds. - - Edge cases: exactly at threshold, gcPolicy not configured. + - Edge cases: exactly at threshold, gcPolicy not configured, gcPolicy enabled with custom thresholds. --- @@ -673,7 +687,7 @@ Once the materialize tax is gone, the next friction layer is infrastructure that #### GK/FRONTIER/1 — Implement hasFrontierChanged() method -- **Status:** `OPEN` +- **Status:** `CLOSED` - **User Story:** As a developer, I want to cheaply check if the graph has new data without materializing. - **Requirements:** - `graph.hasFrontierChanged()` compares `_cachedState.observedFrontier` against current writer refs. @@ -868,23 +882,28 @@ Extends the data model to support properties on edges, enabling weighted graphs, - **Status:** `BLOCKED` - **User Story:** As a developer, I want to sync between v2 and v3 writers without data loss. - **Requirements:** - - v3 writer syncing with v2 writer: v2 patches applied normally (no edge props). - - v2 writer syncing with v3 writer: v3 patches with edge prop ops decoded but edge props silently dropped (v2 has no edge prop support). 
- - Alternatively: v2 writer encountering v3 patch throws clear error with upgrade guidance. - - Choose one strategy and document the decision. + - **Decision: fail fast. Never silently drop data.** + - v3 writer syncing with v2 writer: v2 patches applied normally (no edge props). This direction is safe. + - v2 writer encountering v3 patch: throw `E_SCHEMA_UNSUPPORTED` with message: "Upgrade to >=7.3.0 (WEIGHTED) to sync edge properties." + - **Rationale:** Silent dropping is data corruption with a smile. Users will sync, lose edge property semantics, and only notice when it's too late. Failing fast forces a conscious upgrade decision. + - Exception: v3 patches containing only node/edge ops (no edge property ops) SHOULD be accepted by v2 readers — the schema bump alone is not a rejection criterion; only unknown op types trigger rejection. - **Acceptance Criteria:** - - Mixed-version sync either degrades gracefully or fails clearly. - - No silent data corruption. + - v3→v2 sync with edge prop ops → `E_SCHEMA_UNSUPPORTED` error with upgrade guidance. + - v3→v2 sync with only node/edge ops → succeeds (no edge prop ops to misunderstand). + - v2→v3 sync → succeeds (v2 patches are always valid v3 input). + - No silent data corruption in any direction. - **Scope:** Sync compatibility behavior. -- **Out of Scope:** Online migration, schema negotiation protocol. +- **Out of Scope:** Online migration, schema negotiation protocol, explicit version downgrade path. - **Estimated Hours:** 4 - **Estimated LOC:** ~60 prod + ~150 test - **Blocked by:** WT/SCHEMA/1 - **Blocking:** None -- **Definition of Done:** Mixed-version sync tested and behavior documented. +- **Definition of Done:** Mixed-version sync tested. v2 readers fail fast on unknown ops with actionable error. - **Test Plan:** - - Golden path: v2 writer ↔ v3 writer sync in both directions. - - Known failures: unsupported schema error path. + - Golden path: v2→v3 sync succeeds. + - Golden path: v3→v2 sync with edge prop ops → E_SCHEMA_UNSUPPORTED. + - Golden path: v3→v2 sync with node-only ops → succeeds. + - Known failures: unsupported schema error includes version and upgrade guidance. - Edge cases: v3 patch with only node ops (should work with v2). --- @@ -1059,27 +1078,31 @@ The multi-writer story works but has sharp edges around writer identity, sync wo - **Status:** `BLOCKED` - **User Story:** As a developer, I want node deletion to fail or warn when the node has attached data. - **Requirements:** - - In `commitPatch()`, before finalizing: inspect `NodeRemove` ops. - - For each removed node, check current state for properties and connected edges. + - **Two-layer validation:** + - **PatchBuilder (best-effort):** When `removeNode()` is called, check cached state for attached edges/props. If found, fail early (reject) or warn immediately. This catches most issues at build time with zero network cost. + - **Commit path (authoritative):** In `commitPatch()`, before finalizing, re-inspect `NodeRemove` ops against the state at commit time (post-CAS). This is the ground truth because patches may be built against stale state. - `'reject'`: throw error listing the attached data. - `'warn'`: log warning via LoggerPort, proceed with deletion. + - **Rationale:** Fail early when possible, recheck at commit time because of CAS and sync realities. - **Acceptance Criteria:** - Reject mode: delete node with props → error thrown, commit aborted. - Reject mode: delete node with edges → error thrown, commit aborted. 
- Warn mode: delete node with props → warning logged, commit succeeds.
 - Delete node with no data → succeeds in all modes.
+ - Best-effort validation catches issues even when the commit-time check isn't reached.
-- **Scope:** Reject and warn validation.
+- **Scope:** Reject and warn validation at both patch-build and commit time.
 - **Out of Scope:** Cascade mode (that's HS/DELGUARD/3).
 - **Estimated Hours:** 4
 - **Estimated LOC:** ~80 prod + ~150 test
 - **Blocked by:** HS/DELGUARD/1
 - **Blocking:** None
-- **Definition of Done:** Reject and warn modes work correctly for all cases.
+- **Definition of Done:** Reject and warn modes work correctly at both validation layers.
 - **Test Plan:**
   - Golden path: reject mode blocks deletion of node with props.
   - Golden path: reject mode blocks deletion of node with edges.
   - Golden path: warn mode logs and proceeds.
-  - Edge cases: node with both props and edges, node with only outgoing edges, node with only incoming edges.
+  - Golden path: best-effort validation catches the issue at build time (before commit).
+  - Edge cases: node with both props and edges, node with only outgoing edges, node with only incoming edges, stale state at build time but fresh at commit time.
 
 #### HS/DELGUARD/3 — Implement cascade mode
 
@@ -1363,7 +1386,7 @@ The library is opaque at runtime. Users can't see what's happening without addin
 - For OR-Set add: record whether dot was new or re-add.
 - For OR-Set remove: record whether remove was effective.
 - Return receipts in materialize result: `{ state, receipts }`.
-- When `{ receipts: false }` (default), no overhead — receipts array not allocated.
+- **Zero-cost invariant:** When `{ receipts: false }` (default), strictly zero overhead — no receipt array allocated, no decision strings constructed, no allocations on the hot path. This is non-negotiable. Receipts must never become a permanent perf tax that leaks into normal materialization.
 - **Acceptance Criteria:**
   - `materialize({ receipts: true })` returns receipts array.
   - Each receipt corresponds to one patch with per-op decisions.
diff --git a/index.d.ts b/index.d.ts
index 6065521..34b71aa 100644
--- a/index.d.ts
+++ b/index.d.ts
@@ -504,7 +504,7 @@ export class BitmapIndexBuilder {
   addEdge(srcSha: string, tgtSha: string): void;
 
   /** Serializes the index to a tree structure of buffers */
-  serialize(): Record<string, Buffer>;
+  serialize(options?: { frontier?: Map<string, string> }): Record<string, Buffer>;
 }
 
 /**
@@ -888,7 +888,16 @@ export default class WarpGraph {
     writerId: string;
     logger?: LoggerPort;
     adjacencyCacheSize?: number;
-    gcPolicy?: { type: string; [key: string]: unknown };
+    gcPolicy?: {
+      enabled?: boolean;
+      tombstoneRatioThreshold?: number;
+      entryCountThreshold?: number;
+      minPatchesSinceCompaction?: number;
+      maxTimeSinceCompaction?: number;
+      compactOnCheckpoint?: boolean;
+    };
+    checkpointPolicy?: { every: number };
+    autoMaterialize?: boolean;
   }): Promise<WarpGraph>;
 
   /**
@@ -953,6 +962,12 @@ export default class WarpGraph {
    */
   getFrontier(): Promise<Map<string, string>>;
 
+  /**
+   * Checks whether any writer tip has changed since the last materialize.
+   * O(writers) comparison — cheap "has anything changed?" check without materialization.
+   */
+  hasFrontierChanged(): Promise<boolean>;
+
   /**
    * Creates a checkpoint snapshot of the current materialized state.
*/ diff --git a/src/domain/WarpGraph.js b/src/domain/WarpGraph.js index 48ec55d..5ec4d62 100644 --- a/src/domain/WarpGraph.js +++ b/src/domain/WarpGraph.js @@ -92,8 +92,9 @@ export default class WarpGraph { * @param {number} [options.adjacencyCacheSize] - Max materialized adjacency cache entries * @param {{every: number}} [options.checkpointPolicy] - Auto-checkpoint policy; creates a checkpoint every N patches * @param {boolean} [options.autoMaterialize=false] - If true, query methods auto-materialize instead of throwing + * @param {import('../ports/LoggerPort.js').default} [options.logger] - Logger for structured logging */ - constructor({ persistence, graphName, writerId, gcPolicy = {}, adjacencyCacheSize = DEFAULT_ADJACENCY_CACHE_SIZE, checkpointPolicy, autoMaterialize = false }) { + constructor({ persistence, graphName, writerId, gcPolicy = {}, adjacencyCacheSize = DEFAULT_ADJACENCY_CACHE_SIZE, checkpointPolicy, autoMaterialize = false, logger }) { /** @type {import('../ports/GraphPersistencePort.js').default} */ this._persistence = persistence; @@ -141,6 +142,12 @@ export default class WarpGraph { /** @type {import('./utils/LRUCache.js').default|null} */ this._adjacencyCache = adjacencyCacheSize > 0 ? new LRUCache(adjacencyCacheSize) : null; + + /** @type {Map|null} */ + this._lastFrontier = null; + + /** @type {import('../ports/LoggerPort.js').default|null} */ + this._logger = logger || null; } /** @@ -154,6 +161,7 @@ export default class WarpGraph { * @param {number} [options.adjacencyCacheSize] - Max materialized adjacency cache entries * @param {{every: number}} [options.checkpointPolicy] - Auto-checkpoint policy; creates a checkpoint every N patches * @param {boolean} [options.autoMaterialize] - If true, query methods auto-materialize instead of throwing + * @param {import('../ports/LoggerPort.js').default} [options.logger] - Logger for structured logging * @returns {Promise} The opened graph instance * @throws {Error} If graphName, writerId, or checkpointPolicy is invalid * @@ -164,7 +172,7 @@ export default class WarpGraph { * writerId: 'node-1' * }); */ - static async open({ persistence, graphName, writerId, gcPolicy = {}, adjacencyCacheSize, checkpointPolicy, autoMaterialize }) { + static async open({ persistence, graphName, writerId, gcPolicy = {}, adjacencyCacheSize, checkpointPolicy, autoMaterialize, logger }) { // Validate inputs validateGraphName(graphName); validateWriterId(writerId); @@ -188,7 +196,7 @@ export default class WarpGraph { throw new Error('autoMaterialize must be a boolean'); } - const graph = new WarpGraph({ persistence, graphName, writerId, gcPolicy, adjacencyCacheSize, checkpointPolicy, autoMaterialize }); + const graph = new WarpGraph({ persistence, graphName, writerId, gcPolicy, adjacencyCacheSize, checkpointPolicy, autoMaterialize, logger }); // Validate migration boundary await graph._validateMigrationBoundary(); @@ -503,6 +511,7 @@ export default class WarpGraph { } this._setMaterializedState(state); + this._lastFrontier = await this.getFrontier(); this._patchesSinceCheckpoint = patchCount; // Auto-checkpoint if policy is set and threshold exceeded. @@ -516,6 +525,8 @@ export default class WarpGraph { } } + this._maybeRunGC(state); + return state; } @@ -1015,6 +1026,46 @@ export default class WarpGraph { // Garbage Collection // ============================================================================ + /** + * Post-materialize GC check. Warn by default; execute only when enabled. + * GC failure never breaks materialize. 
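+   *
+   * A minimal opt-in sketch (graphName, writerId, and logger are placeholders;
+   * the policy fields are the ones defined in GCPolicy.js):
+   *
+   *   const graph = await WarpGraph.open({
+   *     persistence, graphName: 'g', writerId: 'w', logger,
+   *     gcPolicy: { enabled: true, tombstoneRatioThreshold: 0.3 },
+   *   });
+   *   await graph.materialize(); // logs 'Auto-GC completed' once thresholds trip
+   *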
+ * + * @param {import('./services/JoinReducer.js').WarpStateV5} state + * @private + */ + _maybeRunGC(state) { + try { + const metrics = collectGCMetrics(state); + const inputMetrics = { + ...metrics, + patchesSinceCompaction: this._patchesSinceGC, + timeSinceCompaction: Date.now() - this._lastGCTime, + }; + const { shouldRun, reasons } = shouldRunGC(inputMetrics, this._gcPolicy); + + if (!shouldRun) { + return; + } + + if (this._gcPolicy.enabled) { + const appliedVV = computeAppliedVV(state); + const result = executeGC(state, appliedVV); + this._lastGCTime = Date.now(); + this._patchesSinceGC = 0; + if (this._logger) { + this._logger.info('Auto-GC completed', { ...result, reasons }); + } + } else if (this._logger) { + this._logger.warn( + 'GC thresholds exceeded but auto-GC is disabled. Set gcPolicy: { enabled: true } to auto-compact.', + { reasons }, + ); + } + } catch { + // GC failure never breaks materialize + } + } + /** * Checks if GC should run based on current metrics and policy. * If thresholds are exceeded, runs GC on the cached state. @@ -1133,6 +1184,34 @@ export default class WarpGraph { return frontier; } + /** + * Checks whether any writer tip has changed since the last materialize. + * + * O(writers) comparison of stored writer tip SHAs against current refs. + * Cheap "has anything changed?" check without materialization. + * + * @returns {Promise} True if frontier has changed (or never materialized) + */ + async hasFrontierChanged() { + if (this._lastFrontier === null) { + return true; + } + + const current = await this.getFrontier(); + + if (current.size !== this._lastFrontier.size) { + return true; + } + + for (const [writerId, tipSha] of current) { + if (this._lastFrontier.get(writerId) !== tipSha) { + return true; + } + } + + return false; + } + /** * Creates a sync request to send to a remote peer. * The request contains the local frontier for comparison. diff --git a/src/domain/services/BitmapIndexBuilder.js b/src/domain/services/BitmapIndexBuilder.js index f6a6cc1..c19585f 100644 --- a/src/domain/services/BitmapIndexBuilder.js +++ b/src/domain/services/BitmapIndexBuilder.js @@ -1,5 +1,6 @@ import { createHash } from 'crypto'; import { getRoaringBitmap32, getNativeRoaringAvailable } from '../utils/roaring.js'; +import { encode as cborEncode } from '../../infrastructure/codecs/CborCodec.js'; /** * Shard format version for forward compatibility. @@ -51,6 +52,32 @@ const ensureRoaringBitmap32 = () => { return RoaringBitmap32; }; +/** + * Wraps data in a version/checksum envelope. + * @param {Object} data - The data to wrap + * @returns {Object} Envelope with version, checksum, and data + */ +const wrapShard = (data) => ({ + version: SHARD_VERSION, + checksum: computeChecksum(data), + data, +}); + +/** + * Serializes a frontier Map into CBOR and JSON blobs in the given tree. + * @param {Map} frontier - Writer→tip SHA map + * @param {Record} tree - Target tree to add entries to + */ +function serializeFrontierToTree(frontier, tree) { + const sorted = {}; + for (const key of Array.from(frontier.keys()).sort()) { + sorted[key] = frontier.get(key); + } + const envelope = { version: 1, writerCount: frontier.size, frontier: sorted }; + tree['frontier.cbor'] = Buffer.from(cborEncode(envelope)); + tree['frontier.json'] = Buffer.from(canonicalStringify(envelope)); +} + /** * Builder for constructing bitmap indexes in memory. 
* @@ -125,20 +152,9 @@ export default class BitmapIndexBuilder { * * @returns {Record} Map of path → serialized content */ - serialize() { + serialize({ frontier } = {}) { const tree = {}; - /** - * Wraps data in a version/checksum envelope. - * @param {Object} data - The data to wrap - * @returns {Object} Envelope with version, checksum, and data - */ - const wrapShard = (data) => ({ - version: SHARD_VERSION, - checksum: computeChecksum(data), - data, - }); - // Serialize ID mappings (sharded by prefix) const idShards = {}; for (const [sha, id] of this.shaToId) { @@ -172,6 +188,10 @@ export default class BitmapIndexBuilder { } } + if (frontier) { + serializeFrontierToTree(frontier, tree); + } + return tree; } diff --git a/src/domain/services/GCPolicy.js b/src/domain/services/GCPolicy.js index eb853c7..e2ed93b 100644 --- a/src/domain/services/GCPolicy.js +++ b/src/domain/services/GCPolicy.js @@ -7,6 +7,7 @@ import { collectGCMetrics } from './GCMetrics.js'; /** * @typedef {Object} GCPolicy + * @property {boolean} enabled - Whether automatic GC is enabled (default: false) * @property {number} tombstoneRatioThreshold - Ratio of tombstones that triggers GC (0.0-1.0) * @property {number} entryCountThreshold - Total entries that triggers GC * @property {number} minPatchesSinceCompaction - Minimum patches between GCs @@ -38,6 +39,7 @@ import { collectGCMetrics } from './GCMetrics.js'; /** @type {Readonly} */ export const DEFAULT_GC_POLICY = Object.freeze({ + enabled: false, // Must opt-in to automatic GC tombstoneRatioThreshold: 0.3, // 30% tombstones triggers GC entryCountThreshold: 50000, // 50K entries triggers GC minPatchesSinceCompaction: 1000, // Min patches between GCs diff --git a/src/domain/services/IndexRebuildService.js b/src/domain/services/IndexRebuildService.js index 745d981..7cf148b 100644 --- a/src/domain/services/IndexRebuildService.js +++ b/src/domain/services/IndexRebuildService.js @@ -2,6 +2,7 @@ import { performance } from 'perf_hooks'; import BitmapIndexBuilder from './BitmapIndexBuilder.js'; import BitmapIndexReader from './BitmapIndexReader.js'; import StreamingBitmapIndexBuilder from './StreamingBitmapIndexBuilder.js'; +import { loadIndexFrontier, checkStaleness } from './IndexStalenessChecker.js'; import NoOpLogger from '../../infrastructure/adapters/NoOpLogger.js'; import { checkAborted } from '../utils/cancellation.js'; @@ -69,6 +70,8 @@ export default class IndexRebuildService { * Receives { processedNodes, currentMemoryBytes }. * @param {AbortSignal} [options.signal] - Optional AbortSignal for cancellation support. * When aborted, throws OperationAbortedError at the next loop boundary. + * @param {Map} [options.frontier] - Frontier to persist alongside the rebuilt index. + * Maps writer IDs to their tip SHAs; stored in the index tree for staleness detection. 
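+   *
+   * Frontier-aware rebuild sketch (`graph` is assumed to be an open WarpGraph;
+   * its getFrontier() supplies the writer → tip map persisted here):
+   *
+   *   const treeOid = await rebuildService.rebuild(ref, {
+   *     frontier: await graph.getFrontier(),
+   *   });
+   *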
* @returns {Promise} OID of the created tree containing the index * @throws {Error} If ref is invalid or limit is out of range * @@ -83,7 +86,7 @@ export default class IndexRebuildService { * onFlush: ({ flushCount }) => console.log(`Flush #${flushCount}`), * }); */ - async rebuild(ref, { limit = 10_000_000, maxMemoryBytes, onFlush, onProgress, signal } = {}) { + async rebuild(ref, { limit = 10_000_000, maxMemoryBytes, onFlush, onProgress, signal, frontier } = {}) { if (maxMemoryBytes !== undefined && maxMemoryBytes <= 0) { throw new Error('maxMemoryBytes must be a positive number'); } @@ -101,9 +104,9 @@ export default class IndexRebuildService { try { let treeOid; if (maxMemoryBytes !== undefined) { - treeOid = await this._rebuildStreaming(ref, { limit, maxMemoryBytes, onFlush, onProgress, signal }); + treeOid = await this._rebuildStreaming(ref, { limit, maxMemoryBytes, onFlush, onProgress, signal, frontier }); } else { - treeOid = await this._rebuildInMemory(ref, { limit, onProgress, signal }); + treeOid = await this._rebuildInMemory(ref, { limit, onProgress, signal, frontier }); } const durationMs = performance.now() - startTime; @@ -140,7 +143,7 @@ export default class IndexRebuildService { * @returns {Promise} Tree OID * @private */ - async _rebuildInMemory(ref, { limit, onProgress, signal }) { + async _rebuildInMemory(ref, { limit, onProgress, signal, frontier }) { const builder = new BitmapIndexBuilder(); let processedNodes = 0; @@ -159,7 +162,7 @@ export default class IndexRebuildService { } } - return await this._persistIndex(builder); + return await this._persistIndex(builder, { frontier }); } /** @@ -175,7 +178,7 @@ export default class IndexRebuildService { * @returns {Promise} Tree OID * @private */ - async _rebuildStreaming(ref, { limit, maxMemoryBytes, onFlush, onProgress, signal }) { + async _rebuildStreaming(ref, { limit, maxMemoryBytes, onFlush, onProgress, signal, frontier }) { const builder = new StreamingBitmapIndexBuilder({ storage: this.storage, maxMemoryBytes, @@ -203,7 +206,7 @@ export default class IndexRebuildService { } } - return await builder.finalize({ signal }); + return await builder.finalize({ signal, frontier }); } /** @@ -216,8 +219,8 @@ export default class IndexRebuildService { * @returns {Promise} OID of the created tree * @private */ - async _persistIndex(builder) { - const treeStructure = builder.serialize(); + async _persistIndex(builder, { frontier } = {}) { + const treeStructure = builder.serialize({ frontier }); const flatEntries = []; for (const [path, buffer] of Object.entries(treeStructure)) { const oid = await this.storage.writeBlob(buffer); @@ -251,6 +254,12 @@ export default class IndexRebuildService { * @param {boolean} [options.strict=true] - Enable strict integrity verification (fail-closed). * When true, throws on any shard validation or corruption errors. * When false, attempts graceful degradation. + * @param {Map} [options.currentFrontier] - Frontier to compare for staleness. + * Maps writer IDs to their current tip SHAs. When provided, triggers a staleness + * check against the frontier stored in the index. + * @param {boolean} [options.autoRebuild=false] - Auto-rebuild when a stale index is detected. + * Requires `rebuildRef` to be set. + * @param {string} [options.rebuildRef] - Git ref to rebuild from when `autoRebuild` is true. 
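+   *
+   * Staleness-aware load sketch (ref name illustrative; a rebuild fires only
+   * when autoRebuild is true and rebuildRef is provided):
+   *
+   *   const reader = await rebuildService.load(treeOid, {
+   *     currentFrontier: await graph.getFrontier(),
+   *     autoRebuild: true,
+   *     rebuildRef: 'refs/empty-graph/test/writers/writer-1',
+   *   });
+   *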
* @returns {Promise} Configured reader ready for O(1) queries * @throws {Error} If treeOid is invalid or tree cannot be read * @throws {ShardValidationError} (strict mode) If shard structure validation fails @@ -278,7 +287,7 @@ export default class IndexRebuildService { * const savedOid = await storage.readRef('refs/empty-graph/index'); * const reader = await rebuildService.load(savedOid); */ - async load(treeOid, { strict = true } = {}) { + async load(treeOid, { strict = true, currentFrontier, autoRebuild = false, rebuildRef } = {}) { this.logger.debug('Loading index', { operation: 'load', treeOid, @@ -289,6 +298,29 @@ export default class IndexRebuildService { const shardOids = await this.storage.readTreeOids(treeOid); const shardCount = Object.keys(shardOids).length; + // Staleness check + if (currentFrontier) { + const indexFrontier = await loadIndexFrontier(shardOids, this.storage); + if (indexFrontier) { + const result = checkStaleness(indexFrontier, currentFrontier); + if (result.stale) { + this.logger.warn('Index is stale', { + operation: 'load', + reason: result.reason, + hint: 'Rebuild the index or pass autoRebuild: true', + }); + if (autoRebuild && rebuildRef) { + const newTreeOid = await this.rebuild(rebuildRef, { frontier: currentFrontier }); + return await this.load(newTreeOid, { strict }); + } + } + } else { + this.logger.debug('No frontier in index (legacy); skipping staleness check', { + operation: 'load', + }); + } + } + const reader = new BitmapIndexReader({ storage: this.storage, strict, logger: this.logger.child({ component: 'BitmapIndexReader' }) }); reader.setup(shardOids); diff --git a/src/domain/services/IndexStalenessChecker.js b/src/domain/services/IndexStalenessChecker.js new file mode 100644 index 0000000..84dd40d --- /dev/null +++ b/src/domain/services/IndexStalenessChecker.js @@ -0,0 +1,100 @@ +/** + * IndexStalenessChecker - Detects stale bitmap indexes by comparing + * frontier metadata stored at build time against current writer refs. + */ + +import { decode as cborDecode } from '../../infrastructure/codecs/CborCodec.js'; + +/** @private */ +function validateEnvelope(envelope, label) { + if (!envelope || typeof envelope !== 'object' || !envelope.frontier || typeof envelope.frontier !== 'object') { + throw new Error(`invalid frontier envelope for ${label}`); + } +} + +/** + * Loads the frontier from an index tree's shard OIDs. 
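+ *
+ * Read-path sketch: prefer the CBOR envelope, fall back to JSON, and return
+ * null for legacy indexes so callers can skip the staleness check:
+ *
+ *   const frontier = await loadIndexFrontier(shardOids, storage);
+ *   if (frontier === null) {
+ *     // legacy index: nothing to compare against
+ *   }
+ *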
+ * + * @param {Record} shardOids - Map of path → blob OID from readTreeOids + * @param {import('../../ports/IndexStoragePort.js').default} storage - Storage adapter + * @returns {Promise|null>} Frontier map, or null if not present (legacy index) + */ +export async function loadIndexFrontier(shardOids, storage) { + const cborOid = shardOids['frontier.cbor']; + if (cborOid) { + const buffer = await storage.readBlob(cborOid); + const envelope = cborDecode(buffer); + validateEnvelope(envelope, 'frontier.cbor'); + return new Map(Object.entries(envelope.frontier)); + } + + const jsonOid = shardOids['frontier.json']; + if (jsonOid) { + const buffer = await storage.readBlob(jsonOid); + const envelope = JSON.parse(buffer.toString('utf-8')); + validateEnvelope(envelope, 'frontier.json'); + return new Map(Object.entries(envelope.frontier)); + } + + return null; +} + +/** + * @typedef {Object} StalenessResult + * @property {boolean} stale - Whether the index is stale + * @property {string} reason - Human-readable summary + * @property {string[]} advancedWriters - Writers whose tips changed + * @property {string[]} newWriters - Writers not in index frontier + * @property {string[]} removedWriters - Writers in index but not current + */ + +/** @private */ +function buildReason({ stale, advancedWriters, newWriters, removedWriters }) { + if (!stale) { + return 'index is current'; + } + const parts = []; + if (advancedWriters.length > 0) { + parts.push(`${advancedWriters.length} writer(s) advanced`); + } + if (newWriters.length > 0) { + parts.push(`${newWriters.length} new writer(s)`); + } + if (removedWriters.length > 0) { + parts.push(`${removedWriters.length} writer(s) removed`); + } + return parts.join(', '); +} + +/** + * Compares index frontier against current frontier to detect staleness. + * + * @param {Map} indexFrontier - Frontier stored in the index + * @param {Map} currentFrontier - Current frontier from refs + * @returns {StalenessResult} + */ +export function checkStaleness(indexFrontier, currentFrontier) { + const advancedWriters = []; + const newWriters = []; + const removedWriters = []; + + for (const [writerId, tipSha] of currentFrontier) { + const indexTip = indexFrontier.get(writerId); + if (indexTip === undefined) { + newWriters.push(writerId); + } else if (indexTip !== tipSha) { + advancedWriters.push(writerId); + } + } + + for (const writerId of indexFrontier.keys()) { + if (!currentFrontier.has(writerId)) { + removedWriters.push(writerId); + } + } + + const stale = advancedWriters.length > 0 || newWriters.length > 0 || removedWriters.length > 0; + const reason = buildReason({ stale, advancedWriters, newWriters, removedWriters }); + + return { stale, reason, advancedWriters, newWriters, removedWriters }; +} diff --git a/src/domain/services/StreamingBitmapIndexBuilder.js b/src/domain/services/StreamingBitmapIndexBuilder.js index 36ac743..c1b57f9 100644 --- a/src/domain/services/StreamingBitmapIndexBuilder.js +++ b/src/domain/services/StreamingBitmapIndexBuilder.js @@ -5,6 +5,25 @@ import ShardValidationError from '../errors/ShardValidationError.js'; import NoOpLogger from '../../infrastructure/adapters/NoOpLogger.js'; import { checkAborted } from '../utils/cancellation.js'; import { getRoaringBitmap32 } from '../utils/roaring.js'; +import { encode as cborEncode } from '../../infrastructure/codecs/CborCodec.js'; + +/** + * Produces canonical JSON with lexicographically sorted keys at all levels. 
+ * @param {*} value - Value to serialize + * @returns {string} Canonical JSON string + */ +function canonicalJson(value) { + return JSON.stringify(value, (_key, val) => { + if (val && typeof val === 'object' && !Array.isArray(val)) { + const sorted = {}; + for (const k of Object.keys(val).sort()) { + sorted[k] = val[k]; + } + return sorted; + } + return val; + }); +} /** * Current shard format version. @@ -332,7 +351,7 @@ export default class StreamingBitmapIndexBuilder { * @param {AbortSignal} [options.signal] - Optional AbortSignal for cancellation * @returns {Promise} OID of the created tree containing the index */ - async finalize({ signal } = {}) { + async finalize({ signal, frontier } = {}) { this.logger.debug('Finalizing index', { operation: 'finalize', nodeCount: this.shaToId.size, @@ -350,6 +369,20 @@ export default class StreamingBitmapIndexBuilder { checkAborted(signal, 'finalize'); const bitmapEntries = await this._processBitmapShards({ signal }); const flatEntries = [...metaEntries, ...bitmapEntries]; + + // Store frontier metadata for staleness detection + if (frontier) { + const sorted = {}; + for (const key of Array.from(frontier.keys()).sort()) { + sorted[key] = frontier.get(key); + } + const envelope = { version: 1, writerCount: frontier.size, frontier: sorted }; + const cborOid = await this.storage.writeBlob(Buffer.from(cborEncode(envelope))); + flatEntries.push(`100644 blob ${cborOid}\tfrontier.cbor`); + const jsonOid = await this.storage.writeBlob(Buffer.from(canonicalJson(envelope))); + flatEntries.push(`100644 blob ${jsonOid}\tfrontier.json`); + } + const treeOid = await this.storage.writeTree(flatEntries); this.logger.debug('Index finalized', { diff --git a/test/unit/domain/WarpGraph.autoGC.test.js b/test/unit/domain/WarpGraph.autoGC.test.js new file mode 100644 index 0000000..d99a122 --- /dev/null +++ b/test/unit/domain/WarpGraph.autoGC.test.js @@ -0,0 +1,228 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest'; +import WarpGraph from '../../../src/domain/WarpGraph.js'; +import { createEmptyStateV5 } from '../../../src/domain/services/JoinReducer.js'; +import { orsetAdd } from '../../../src/domain/crdt/ORSet.js'; +import { createVersionVector } from '../../../src/domain/crdt/VersionVector.js'; + +/** + * GK/GC/1 — Wire GC into post-materialize (opt-in, warn-by-default). + * + * After materialize, check GC metrics. Warn by default. Execute only + * when gcPolicy.enabled === true. + */ + +function createMockPersistence() { + return { + readRef: vi.fn().mockResolvedValue(null), + showNode: vi.fn(), + writeBlob: vi.fn(), + writeTree: vi.fn(), + readBlob: vi.fn(), + readTreeOids: vi.fn(), + commitNode: vi.fn(), + commitNodeWithTree: vi.fn(), + updateRef: vi.fn(), + listRefs: vi.fn().mockResolvedValue([]), + getNodeInfo: vi.fn(), + ping: vi.fn().mockResolvedValue({ ok: true, latencyMs: 1 }), + configGet: vi.fn().mockResolvedValue(null), + configSet: vi.fn().mockResolvedValue(undefined), + }; +} + +function createMockLogger() { + return { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + child: vi.fn().mockReturnThis(), + }; +} + +/** + * Create a state with lots of tombstones to trigger GC thresholds. + * Adds nodes and then removes them, leaving tombstone entries. 
function createHighTombstoneState() {
+  const state = createEmptyStateV5();
+  const vv = createVersionVector();
+
+  // Add many nodes then tombstone them to create high tombstone ratio
+  for (let i = 0; i < 100; i++) {
+    const dot = `writer-1:${i + 1}`;
+    orsetAdd(state.nodeAlive, `node-${i}`, dot);
+  }
+  // Remove them all (add tombstones for each dot)
+  for (let i = 0; i < 100; i++) {
+    const dot = `writer-1:${i + 1}`;
+    state.nodeAlive.tombstones.add(dot);
+  }
+
+  state.observedFrontier = vv;
+  return state;
+}
+
+describe('WarpGraph auto-GC after materialize (GK/GC/1)', () => {
+  let persistence;
+
+  beforeEach(() => {
+    persistence = createMockPersistence();
+  });
+
+  it('default gcPolicy (enabled: false) + high tombstones → warning logged, no GC', async () => {
+    const logger = createMockLogger();
+    const graph = await WarpGraph.open({
+      persistence,
+      graphName: 'test',
+      writerId: 'writer-1',
+      logger,
+      gcPolicy: {
+        tombstoneRatioThreshold: 0.01, // Very low threshold to trigger
+        minPatchesSinceCompaction: 0,
+        maxTimeSinceCompaction: 0,
+        entryCountThreshold: 0,
+      },
+    });
+
+    await graph.materialize();
+
+    // materialize() over the empty mock reduces to an empty state, which never
+    // trips the thresholds itself, so exercise the post-materialize hook
+    // directly with an injected high-tombstone state.
+    graph._maybeRunGC(createHighTombstoneState());
+
+    expect(logger.warn).toHaveBeenCalledWith(
+      expect.stringContaining('auto-GC is disabled'),
+      expect.objectContaining({ reasons: expect.any(Array) }),
+    );
+    expect(logger.info).not.toHaveBeenCalled();
+  });
+
+  it('gcPolicy: { enabled: true } + high tombstones → GC executed, logger.info', async () => {
+    const logger = createMockLogger();
+    const graph = await WarpGraph.open({
+      persistence,
+      graphName: 'test',
+      writerId: 'writer-1',
+      logger,
+      gcPolicy: {
+        enabled: true,
+        tombstoneRatioThreshold: 0.01,
+        minPatchesSinceCompaction: 0,
+        maxTimeSinceCompaction: 0,
+        entryCountThreshold: 0,
+      },
+    });
+
+    await graph.materialize();
+    graph._maybeRunGC(createHighTombstoneState());
+
+    expect(logger.info).toHaveBeenCalledWith(
+      'Auto-GC completed',
+      expect.objectContaining({
+        tombstonesRemoved: expect.any(Number),
+        reasons: expect.any(Array),
+      }),
+    );
+    expect(logger.warn).not.toHaveBeenCalled();
+  });
+
+  it('low tombstones → no warning, no GC', async () => {
+    const logger = createMockLogger();
+    const graph = await WarpGraph.open({
+      persistence,
+      graphName: 'test',
+      writerId: 'writer-1',
+      logger,
+    });
+
+    // Set recent GC time so time-since-compaction doesn't trigger
+    graph._lastGCTime = Date.now();
+    graph._patchesSinceGC = 0;
+
+    await graph.materialize();
+    logger.warn.mockClear();
+    logger.info.mockClear();
+
+    // Empty state → no tombstones → no GC needed
+    graph._maybeRunGC(createEmptyStateV5());
+
+    expect(logger.warn).not.toHaveBeenCalled();
+    expect(logger.info).not.toHaveBeenCalled();
+  });
+
+  it('GC throws → materialize still succeeds', async () => {
+    const logger = createMockLogger();
+    const graph = await WarpGraph.open({
+      persistence,
+      graphName: 'test',
+      writerId: 'writer-1',
+      logger,
+      gcPolicy: {
+        enabled: true,
+        tombstoneRatioThreshold: 0.01,
+        minPatchesSinceCompaction: 0,
+        maxTimeSinceCompaction: 0,
+        entryCountThreshold: 0,
+      },
+    });
+
+    await graph.materialize();
+
+    // 
Create a broken state that will cause GC to throw + const badState = { nodeAlive: null, edgeAlive: null }; + + // Should not throw despite internal error + expect(() => graph._maybeRunGC(badState)).not.toThrow(); + }); + + it('_lastGCTime and _patchesSinceGC reset after GC', async () => { + const graph = await WarpGraph.open({ + persistence, + graphName: 'test', + writerId: 'writer-1', + logger: createMockLogger(), + gcPolicy: { + enabled: true, + tombstoneRatioThreshold: 0.01, + minPatchesSinceCompaction: 0, + maxTimeSinceCompaction: 0, + entryCountThreshold: 0, + }, + }); + + graph._patchesSinceGC = 999; + graph._lastGCTime = 0; + + await graph.materialize(); + graph._maybeRunGC(createHighTombstoneState()); + + expect(graph._patchesSinceGC).toBe(0); + expect(graph._lastGCTime).toBeGreaterThan(0); + }); + + it('no logger provided → no crash', async () => { + const graph = await WarpGraph.open({ + persistence, + graphName: 'test', + writerId: 'writer-1', + gcPolicy: { + tombstoneRatioThreshold: 0.01, + minPatchesSinceCompaction: 0, + maxTimeSinceCompaction: 0, + entryCountThreshold: 0, + }, + }); + + await graph.materialize(); + + // No logger → should still work without crashing + expect(() => graph._maybeRunGC(createHighTombstoneState())).not.toThrow(); + }); +}); diff --git a/test/unit/domain/WarpGraph.frontierChanged.test.js b/test/unit/domain/WarpGraph.frontierChanged.test.js new file mode 100644 index 0000000..6b163eb --- /dev/null +++ b/test/unit/domain/WarpGraph.frontierChanged.test.js @@ -0,0 +1,211 @@ +import { describe, it, expect, vi, beforeEach } from 'vitest'; +import WarpGraph from '../../../src/domain/WarpGraph.js'; +import { encodePatchMessage } from '../../../src/domain/services/WarpMessageCodec.js'; +import { encode as cborEncode } from '../../../src/infrastructure/codecs/CborCodec.js'; + +/** + * GK/FRONTIER/1 — hasFrontierChanged() + * + * O(writers) method comparing stored writer tip SHAs against current refs. + * Cheap "has anything changed?" check without materialization. 
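+
+// Intended consumer pattern (sketch): poll cheaply, materialize only on change.
+//   if (await graph.hasFrontierChanged()) { await graph.materialize(); }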
+ */ + +const FAKE_BLOB_OID = 'a'.repeat(40); +const FAKE_COMMIT_SHA = 'c'.repeat(40); +const FAKE_COMMIT_SHA_2 = 'd'.repeat(40); + +/** CBOR-encoded empty V5 patch with required context field */ +const EMPTY_PATCH_CBOR = Buffer.from(cborEncode({ schema: 2, ops: [], context: {} })); + +function createMockPersistence() { + return { + readRef: vi.fn(), + showNode: vi.fn(), + writeBlob: vi.fn(), + writeTree: vi.fn(), + readBlob: vi.fn(), + readTreeOids: vi.fn(), + commitNode: vi.fn(), + commitNodeWithTree: vi.fn(), + updateRef: vi.fn(), + listRefs: vi.fn().mockResolvedValue([]), + getNodeInfo: vi.fn(), + ping: vi.fn().mockResolvedValue({ ok: true, latencyMs: 1 }), + configGet: vi.fn().mockResolvedValue(null), + configSet: vi.fn().mockResolvedValue(undefined), + }; +} + +/** Configure mocks for a single writer with one patch */ +function mockSingleWriter(persistence, { writerRef, commitSha, patchMessage }) { + persistence.listRefs.mockResolvedValue([writerRef]); + persistence.readRef.mockImplementation((ref) => { + if (ref === writerRef) return Promise.resolve(commitSha); + return Promise.resolve(null); + }); + persistence.getNodeInfo.mockResolvedValue({ + sha: commitSha, + message: patchMessage, + parents: [], + }); + persistence.readBlob.mockResolvedValue(EMPTY_PATCH_CBOR); + persistence.showNode.mockResolvedValue(patchMessage); +} + +describe('WarpGraph.hasFrontierChanged() (GK/FRONTIER/1)', () => { + let persistence; + let graph; + + beforeEach(async () => { + persistence = createMockPersistence(); + graph = await WarpGraph.open({ + persistence, + graphName: 'test', + writerId: 'writer-1', + }); + }); + + it('returns true when never materialized', async () => { + expect(await graph.hasFrontierChanged()).toBe(true); + }); + + it('returns false after materialize with no changes', async () => { + persistence.listRefs.mockResolvedValue([]); + await graph.materialize(); + expect(await graph.hasFrontierChanged()).toBe(false); + }); + + it('returns false after materialize with existing writer and no changes', async () => { + const writerRef = 'refs/empty-graph/test/writers/writer-1'; + const patchMessage = encodePatchMessage({ + graph: 'test', writer: 'writer-1', lamport: 1, + patchOid: FAKE_BLOB_OID, schema: 2, + }); + + mockSingleWriter(persistence, { writerRef, commitSha: FAKE_COMMIT_SHA, patchMessage }); + await graph.materialize(); + + expect(await graph.hasFrontierChanged()).toBe(false); + }); + + it('returns true when writer tip SHA changes', async () => { + const writerRef = 'refs/empty-graph/test/writers/writer-1'; + const patchMessage = encodePatchMessage({ + graph: 'test', writer: 'writer-1', lamport: 1, + patchOid: FAKE_BLOB_OID, schema: 2, + }); + + mockSingleWriter(persistence, { writerRef, commitSha: FAKE_COMMIT_SHA, patchMessage }); + await graph.materialize(); + + // Writer tip advances + persistence.readRef.mockImplementation((ref) => { + if (ref === writerRef) return Promise.resolve(FAKE_COMMIT_SHA_2); + return Promise.resolve(null); + }); + + expect(await graph.hasFrontierChanged()).toBe(true); + }); + + it('returns true when new writer appears', async () => { + const writerRef1 = 'refs/empty-graph/test/writers/writer-1'; + const patchMessage = encodePatchMessage({ + graph: 'test', writer: 'writer-1', lamport: 1, + patchOid: FAKE_BLOB_OID, schema: 2, + }); + + mockSingleWriter(persistence, { writerRef: writerRef1, commitSha: FAKE_COMMIT_SHA, patchMessage }); + await graph.materialize(); + + // Second writer appears + const writerRef2 = 'refs/empty-graph/test/writers/writer-2'; 
+ persistence.listRefs.mockResolvedValue([writerRef1, writerRef2]); + persistence.readRef.mockImplementation((ref) => { + if (ref === writerRef1) return Promise.resolve(FAKE_COMMIT_SHA); + if (ref === writerRef2) return Promise.resolve(FAKE_COMMIT_SHA_2); + return Promise.resolve(null); + }); + + expect(await graph.hasFrontierChanged()).toBe(true); + }); + + it('returns true when writer removed', async () => { + const writerRef1 = 'refs/empty-graph/test/writers/writer-1'; + const writerRef2 = 'refs/empty-graph/test/writers/writer-2'; + const patchMessage1 = encodePatchMessage({ + graph: 'test', writer: 'writer-1', lamport: 1, + patchOid: FAKE_BLOB_OID, schema: 2, + }); + const patchMessage2 = encodePatchMessage({ + graph: 'test', writer: 'writer-2', lamport: 1, + patchOid: FAKE_BLOB_OID, schema: 2, + }); + + persistence.listRefs.mockResolvedValue([writerRef1, writerRef2]); + persistence.readRef.mockImplementation((ref) => { + if (ref === writerRef1) return Promise.resolve(FAKE_COMMIT_SHA); + if (ref === writerRef2) return Promise.resolve(FAKE_COMMIT_SHA_2); + return Promise.resolve(null); + }); + persistence.getNodeInfo.mockImplementation((sha) => { + if (sha === FAKE_COMMIT_SHA) { + return Promise.resolve({ sha, message: patchMessage1, parents: [] }); + } + return Promise.resolve({ sha, message: patchMessage2, parents: [] }); + }); + persistence.readBlob.mockResolvedValue(EMPTY_PATCH_CBOR); + persistence.showNode.mockImplementation((sha) => { + if (sha === FAKE_COMMIT_SHA) return Promise.resolve(patchMessage1); + return Promise.resolve(patchMessage2); + }); + + await graph.materialize(); + + // Only writer-1 remains + persistence.listRefs.mockResolvedValue([writerRef1]); + persistence.readRef.mockImplementation((ref) => { + if (ref === writerRef1) return Promise.resolve(FAKE_COMMIT_SHA); + return Promise.resolve(null); + }); + + expect(await graph.hasFrontierChanged()).toBe(true); + }); + + it('returns false after re-materialize incorporates changes', async () => { + const writerRef = 'refs/empty-graph/test/writers/writer-1'; + const patchMessage = encodePatchMessage({ + graph: 'test', writer: 'writer-1', lamport: 1, + patchOid: FAKE_BLOB_OID, schema: 2, + }); + + mockSingleWriter(persistence, { writerRef, commitSha: FAKE_COMMIT_SHA, patchMessage }); + await graph.materialize(); + + // Tip advances + const patchMessage2 = encodePatchMessage({ + graph: 'test', writer: 'writer-1', lamport: 2, + patchOid: FAKE_BLOB_OID, schema: 2, + }); + persistence.readRef.mockImplementation((ref) => { + if (ref === writerRef) return Promise.resolve(FAKE_COMMIT_SHA_2); + return Promise.resolve(null); + }); + persistence.getNodeInfo.mockImplementation((sha) => { + if (sha === FAKE_COMMIT_SHA_2) { + return Promise.resolve({ sha, message: patchMessage2, parents: [FAKE_COMMIT_SHA] }); + } + return Promise.resolve({ sha, message: patchMessage, parents: [] }); + }); + persistence.showNode.mockImplementation((sha) => { + if (sha === FAKE_COMMIT_SHA_2) return Promise.resolve(patchMessage2); + return Promise.resolve(patchMessage); + }); + + expect(await graph.hasFrontierChanged()).toBe(true); + + // Re-materialize + await graph.materialize(); + + expect(await graph.hasFrontierChanged()).toBe(false); + }); +}); diff --git a/test/unit/domain/services/BitmapIndexBuilder.frontier.test.js b/test/unit/domain/services/BitmapIndexBuilder.frontier.test.js new file mode 100644 index 0000000..3e65f48 --- /dev/null +++ b/test/unit/domain/services/BitmapIndexBuilder.frontier.test.js @@ -0,0 +1,127 @@ +import { describe, it, 
expect } from 'vitest'; +import BitmapIndexBuilder from '../../../../src/domain/services/BitmapIndexBuilder.js'; +import { decode as cborDecode } from '../../../../src/infrastructure/codecs/CborCodec.js'; + +/** + * GK/IDX/1 — Store frontier in index metadata at build time. + * + * frontier.cbor (authoritative) and frontier.json (debug) are added + * to the bitmap index tree when a frontier Map is provided to serialize(). + */ + +describe('BitmapIndexBuilder frontier metadata (GK/IDX/1)', () => { + it('no frontier option → no frontier files in output', () => { + const builder = new BitmapIndexBuilder(); + builder.registerNode('aabbcc'); + + const tree = builder.serialize(); + + expect(tree['frontier.cbor']).toBeUndefined(); + expect(tree['frontier.json']).toBeUndefined(); + }); + + it('serialize() without options → no frontier files', () => { + const builder = new BitmapIndexBuilder(); + builder.registerNode('aabbcc'); + + const tree = builder.serialize({}); + + expect(tree['frontier.cbor']).toBeUndefined(); + expect(tree['frontier.json']).toBeUndefined(); + }); + + it('with frontier → frontier.cbor and frontier.json present', () => { + const builder = new BitmapIndexBuilder(); + builder.registerNode('aabbcc'); + + const frontier = new Map([ + ['writer-a', 'sha-aaa'], + ['writer-b', 'sha-bbb'], + ]); + + const tree = builder.serialize({ frontier }); + + expect(tree['frontier.cbor']).toBeInstanceOf(Buffer); + expect(tree['frontier.json']).toBeInstanceOf(Buffer); + }); + + it('CBOR roundtrip: decode → verify envelope structure', () => { + const builder = new BitmapIndexBuilder(); + const frontier = new Map([ + ['writer-b', 'sha-bbb'], + ['writer-a', 'sha-aaa'], + ]); + + const tree = builder.serialize({ frontier }); + const envelope = cborDecode(tree['frontier.cbor']); + + expect(envelope.version).toBe(1); + expect(envelope.writerCount).toBe(2); + expect(envelope.frontier).toEqual({ + 'writer-a': 'sha-aaa', + 'writer-b': 'sha-bbb', + }); + }); + + it('JSON matches CBOR content', () => { + const builder = new BitmapIndexBuilder(); + const frontier = new Map([ + ['writer-b', 'sha-bbb'], + ['writer-a', 'sha-aaa'], + ]); + + const tree = builder.serialize({ frontier }); + const cborEnvelope = cborDecode(tree['frontier.cbor']); + const jsonEnvelope = JSON.parse(tree['frontier.json'].toString('utf-8')); + + expect(jsonEnvelope).toEqual(cborEnvelope); + }); + + it('frontier keys are sorted in output', () => { + const builder = new BitmapIndexBuilder(); + const frontier = new Map([ + ['zulu', 'sha-z'], + ['alpha', 'sha-a'], + ['mike', 'sha-m'], + ]); + + const tree = builder.serialize({ frontier }); + const envelope = JSON.parse(tree['frontier.json'].toString('utf-8')); + const keys = Object.keys(envelope.frontier); + + expect(keys).toEqual(['alpha', 'mike', 'zulu']); + }); + + it('empty frontier → writerCount: 0, frontier: {}', () => { + const builder = new BitmapIndexBuilder(); + const frontier = new Map(); + + const tree = builder.serialize({ frontier }); + const envelope = cborDecode(tree['frontier.cbor']); + + expect(envelope.version).toBe(1); + expect(envelope.writerCount).toBe(0); + expect(envelope.frontier).toEqual({}); + }); + + it('existing shards unaffected by frontier addition', () => { + const builder = new BitmapIndexBuilder(); + builder.registerNode('aabbcc'); + builder.addEdge('aabbcc', 'aaddee'); + + const treeWithout = builder.serialize(); + const treeWith = builder.serialize({ + frontier: new Map([['w1', 'sha1']]), + }); + + // All non-frontier entries should be identical + for 
diff --git a/test/unit/domain/services/IndexStalenessChecker.test.js b/test/unit/domain/services/IndexStalenessChecker.test.js
new file mode 100644
index 0000000..1adb93f
--- /dev/null
+++ b/test/unit/domain/services/IndexStalenessChecker.test.js
@@ -0,0 +1,224 @@
+import { describe, it, expect, vi, beforeEach } from 'vitest';
+import { loadIndexFrontier, checkStaleness } from '../../../../src/domain/services/IndexStalenessChecker.js';
+import { encode as cborEncode } from '../../../../src/infrastructure/codecs/CborCodec.js';
+import IndexRebuildService from '../../../../src/domain/services/IndexRebuildService.js';
+
+/**
+ * GK/IDX/2 — Detect and report index staleness on load.
+ */
+
+describe('loadIndexFrontier', () => {
+  it('with CBOR present → correct Map', async () => {
+    const envelope = { version: 1, writerCount: 2, frontier: { alice: 'sha-a', bob: 'sha-b' } };
+    const cborBuffer = Buffer.from(cborEncode(envelope));
+    const storage = { readBlob: vi.fn().mockResolvedValue(cborBuffer) };
+    const shardOids = { 'frontier.cbor': 'cbor-oid' };
+
+    const result = await loadIndexFrontier(shardOids, storage);
+
+    expect(result).toBeInstanceOf(Map);
+    expect(result.get('alice')).toBe('sha-a');
+    expect(result.get('bob')).toBe('sha-b');
+    expect(result.size).toBe(2);
+  });
+
+  it('with JSON fallback → correct Map', async () => {
+    const envelope = { version: 1, writerCount: 1, frontier: { alice: 'sha-a' } };
+    const jsonBuffer = Buffer.from(JSON.stringify(envelope));
+    const storage = { readBlob: vi.fn().mockResolvedValue(jsonBuffer) };
+    const shardOids = { 'frontier.json': 'json-oid' };
+
+    const result = await loadIndexFrontier(shardOids, storage);
+
+    expect(result).toBeInstanceOf(Map);
+    expect(result.get('alice')).toBe('sha-a');
+  });
+
+  it('with neither → null', async () => {
+    const storage = { readBlob: vi.fn() };
+    const result = await loadIndexFrontier({}, storage);
+    expect(result).toBeNull();
+  });
+});
+
+describe('checkStaleness', () => {
+  it('identical → stale: false', () => {
+    const index = new Map([['a', 'sha1'], ['b', 'sha2']]);
+    const current = new Map([['a', 'sha1'], ['b', 'sha2']]);
+
+    const result = checkStaleness(index, current);
+
+    expect(result.stale).toBe(false);
+    expect(result.advancedWriters).toEqual([]);
+    expect(result.newWriters).toEqual([]);
+    expect(result.removedWriters).toEqual([]);
+  });
+
+  it('writer advanced → stale: true, advancedWriters populated', () => {
+    const index = new Map([['alice', 'sha-old']]);
+    const current = new Map([['alice', 'sha-new']]);
+
+    const result = checkStaleness(index, current);
+
+    expect(result.stale).toBe(true);
+    expect(result.advancedWriters).toEqual(['alice']);
+  });
+
+  it('new writer → newWriters populated', () => {
+    const index = new Map([['alice', 'sha-a']]);
+    const current = new Map([['alice', 'sha-a'], ['bob', 'sha-b']]);
+
+    const result = checkStaleness(index, current);
+
+    expect(result.stale).toBe(true);
+    expect(result.newWriters).toEqual(['bob']);
+  });
+
+  it('writer removed → removedWriters populated', () => {
+    const index = new Map([['alice', 'sha-a'], ['bob', 'sha-b']]);
+    const current = new Map([['alice', 'sha-a']]);
+
+    const result = checkStaleness(index, current);
+
+    expect(result.stale).toBe(true);
+    expect(result.removedWriters).toEqual(['bob']);
+  });
+
+  it('all changes combined', () => {
+    const index = new Map([['alice', 'sha-old'], ['charlie', 'sha-c']]);
+    const current = new Map([['alice', 'sha-new'], ['bob', 'sha-b']]);
+
+    const result = checkStaleness(index, current);
+
+    expect(result.stale).toBe(true);
+    expect(result.advancedWriters).toEqual(['alice']);
+    expect(result.newWriters).toEqual(['bob']);
+    expect(result.removedWriters).toEqual(['charlie']);
+  });
+
+  it('reason string describes changes', () => {
+    const index = new Map([['a', 'old']]);
+    const current = new Map([['a', 'new'], ['b', 'sha-b']]);
+
+    const result = checkStaleness(index, current);
+
+    expect(result.reason).toContain('1 writer(s) advanced');
+    expect(result.reason).toContain('1 new writer(s)');
+  });
+});
+
+describe('IndexRebuildService.load() staleness integration', () => {
+  let storage;
+  let logger;
+  let graphService;
+
+  beforeEach(() => {
+    storage = {
+      readTreeOids: vi.fn(),
+      readBlob: vi.fn(),
+      writeBlob: vi.fn(),
+      writeTree: vi.fn(),
+    };
+    logger = {
+      debug: vi.fn(),
+      info: vi.fn(),
+      warn: vi.fn(),
+      error: vi.fn(),
+      child: vi.fn().mockReturnThis(),
+    };
+    graphService = {
+      iterateNodes: vi.fn(),
+    };
+  });
+
+  it('logs warning on stale index', async () => {
+    const envelope = { version: 1, writerCount: 1, frontier: { alice: 'sha-old' } };
+    const cborBuffer = Buffer.from(cborEncode(envelope));
+
+    storage.readTreeOids.mockResolvedValue({
+      'meta_aa.json': 'meta-oid',
+      'frontier.cbor': 'frontier-oid',
+    });
+    storage.readBlob.mockResolvedValue(cborBuffer);
+
+    const service = new IndexRebuildService({ graphService, storage, logger });
+    const currentFrontier = new Map([['alice', 'sha-new']]);
+
+    await service.load('tree-oid', { currentFrontier });
+
+    expect(logger.warn).toHaveBeenCalledWith(
+      'Index is stale',
+      expect.objectContaining({ reason: expect.stringContaining('1 writer(s) advanced') }),
+    );
+  });
+
+  it('no warning on current index', async () => {
+    const envelope = { version: 1, writerCount: 1, frontier: { alice: 'sha-a' } };
+    const cborBuffer = Buffer.from(cborEncode(envelope));
+
+    storage.readTreeOids.mockResolvedValue({
+      'meta_aa.json': 'meta-oid',
+      'frontier.cbor': 'frontier-oid',
+    });
+    storage.readBlob.mockResolvedValue(cborBuffer);
+
+    const service = new IndexRebuildService({ graphService, storage, logger });
+    const currentFrontier = new Map([['alice', 'sha-a']]);
+
+    await service.load('tree-oid', { currentFrontier });
+
+    expect(logger.warn).not.toHaveBeenCalled();
+  });
+
+  it('no frontier (legacy) → debug log, no warning', async () => {
+    storage.readTreeOids.mockResolvedValue({
+      'meta_aa.json': 'meta-oid',
+    });
+
+    const service = new IndexRebuildService({ graphService, storage, logger });
+    const currentFrontier = new Map([['alice', 'sha-a']]);
+
+    await service.load('tree-oid', { currentFrontier });
+
+    expect(logger.warn).not.toHaveBeenCalled();
+    expect(logger.debug).toHaveBeenCalledWith(
+      expect.stringContaining('legacy'),
+      expect.any(Object),
+    );
+  });
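+
+  /*
+   * The test below exercises the opt-in rebuild path: the first load()
+   * sees a stale frontier and warns; because autoRebuild is set, it
+   * rebuilds and reloads, and the rebuilt tree is mocked without a
+   * frontier blob, so the second pass skips the staleness check.
+   */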
+  it('autoRebuild: true triggers rebuild on stale index', async () => {
+    const envelope = { version: 1, writerCount: 1, frontier: { alice: 'sha-old' } };
+    const cborBuffer = Buffer.from(cborEncode(envelope));
+
+    // First call: stale index
+    storage.readTreeOids.mockResolvedValueOnce({
+      'meta_aa.json': 'meta-oid',
+      'frontier.cbor': 'frontier-oid',
+    });
+    storage.readBlob.mockResolvedValueOnce(cborBuffer);
+
+    // rebuild() returns new tree OID
+    // Second call: rebuilt index (no frontier = no staleness check)
+    storage.readTreeOids.mockResolvedValueOnce({
+      'meta_aa.json': 'new-meta-oid',
+    });
+
+    const currentFrontier = new Map([['alice', 'sha-new']]);
+
+    // Mock graphService.iterateNodes to yield nothing (empty graph)
+    graphService.iterateNodes = function* () { /* empty */ };
+
+    const service = new IndexRebuildService({ graphService, storage, logger });
+
+    const reader = await service.load('tree-oid', {
+      currentFrontier,
+      autoRebuild: true,
+      rebuildRef: 'HEAD',
+    });
+
+    expect(reader).toBeDefined();
+    // The warn should have been called for the stale detection
+    expect(logger.warn).toHaveBeenCalled();
+  });
+});
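Taken together, the `checkStaleness` tests specify a complete contract: a three-way diff over the two writer maps plus a human-readable reason. A sketch that satisfies exactly those assertions; the result fields and the reason fragments (`'N writer(s) advanced'`, `'N new writer(s)'`) come straight from the tests, while the wording for the removed-writer fragment and the non-stale reason are assumptions:

```js
// Sketch: three-way diff of index-time frontier vs. current frontier.
// Returns the shape the tests above assert on.
function checkStaleness(indexFrontier, currentFrontier) {
  const advancedWriters = [];
  const newWriters = [];
  const removedWriters = [];

  for (const [writer, tip] of currentFrontier) {
    if (!indexFrontier.has(writer)) newWriters.push(writer);           // appeared since build
    else if (indexFrontier.get(writer) !== tip) advancedWriters.push(writer); // tip moved
  }
  for (const writer of indexFrontier.keys()) {
    if (!currentFrontier.has(writer)) removedWriters.push(writer);     // gone since build
  }

  const parts = [];
  if (advancedWriters.length) parts.push(`${advancedWriters.length} writer(s) advanced`);
  if (newWriters.length) parts.push(`${newWriters.length} new writer(s)`);
  if (removedWriters.length) parts.push(`${removedWriters.length} writer(s) removed`); // assumed wording

  const stale = parts.length > 0;
  return {
    stale,
    advancedWriters,
    newWriters,
    removedWriters,
    reason: stale ? parts.join(', ') : 'index is current', // non-stale text is assumed
  };
}
```

The companion `loadIndexFrontier` contract is simpler, as its three tests show: prefer `frontier.cbor`, fall back to `frontier.json`, and return `null` when neither blob is present so legacy indexes load without a staleness check.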