Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 84 additions & 61 deletions ROADMAP.md

Large diffs are not rendered by default.

19 changes: 17 additions & 2 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ export class BitmapIndexBuilder {
addEdge(srcSha: string, tgtSha: string): void;

/** Serializes the index to a tree structure of buffers */
serialize(): Record<string, Buffer>;
serialize(options?: { frontier?: Map<string, string> }): Record<string, Buffer>;
}

/**
Expand Down Expand Up @@ -888,7 +888,16 @@ export default class WarpGraph {
writerId: string;
logger?: LoggerPort;
adjacencyCacheSize?: number;
gcPolicy?: { type: string; [key: string]: unknown };
gcPolicy?: {
enabled?: boolean;
tombstoneRatioThreshold?: number;
entryCountThreshold?: number;
minPatchesSinceCompaction?: number;
maxTimeSinceCompaction?: number;
compactOnCheckpoint?: boolean;
};
checkpointPolicy?: { every: number };
autoMaterialize?: boolean;
}): Promise<WarpGraph>;

/**
Expand Down Expand Up @@ -953,6 +962,12 @@ export default class WarpGraph {
*/
getFrontier(): Promise<Map<string, string>>;

/**
* Checks whether any writer tip has changed since the last materialize.
* O(writers) comparison — cheap "has anything changed?" check without materialization.
*/
hasFrontierChanged(): Promise<boolean>;

/**
* Creates a checkpoint snapshot of the current materialized state.
*/
Expand Down
85 changes: 82 additions & 3 deletions src/domain/WarpGraph.js
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,9 @@ export default class WarpGraph {
* @param {number} [options.adjacencyCacheSize] - Max materialized adjacency cache entries
* @param {{every: number}} [options.checkpointPolicy] - Auto-checkpoint policy; creates a checkpoint every N patches
* @param {boolean} [options.autoMaterialize=false] - If true, query methods auto-materialize instead of throwing
* @param {import('../ports/LoggerPort.js').default} [options.logger] - Logger for structured logging
*/
constructor({ persistence, graphName, writerId, gcPolicy = {}, adjacencyCacheSize = DEFAULT_ADJACENCY_CACHE_SIZE, checkpointPolicy, autoMaterialize = false }) {
constructor({ persistence, graphName, writerId, gcPolicy = {}, adjacencyCacheSize = DEFAULT_ADJACENCY_CACHE_SIZE, checkpointPolicy, autoMaterialize = false, logger }) {
/** @type {import('../ports/GraphPersistencePort.js').default} */
this._persistence = persistence;

Expand Down Expand Up @@ -141,6 +142,12 @@ export default class WarpGraph {

/** @type {import('./utils/LRUCache.js').default|null} */
this._adjacencyCache = adjacencyCacheSize > 0 ? new LRUCache(adjacencyCacheSize) : null;

/** @type {Map<string, string>|null} */
this._lastFrontier = null;

/** @type {import('../ports/LoggerPort.js').default|null} */
this._logger = logger || null;
}

/**
Expand All @@ -154,6 +161,7 @@ export default class WarpGraph {
* @param {number} [options.adjacencyCacheSize] - Max materialized adjacency cache entries
* @param {{every: number}} [options.checkpointPolicy] - Auto-checkpoint policy; creates a checkpoint every N patches
* @param {boolean} [options.autoMaterialize] - If true, query methods auto-materialize instead of throwing
* @param {import('../ports/LoggerPort.js').default} [options.logger] - Logger for structured logging
* @returns {Promise<WarpGraph>} The opened graph instance
* @throws {Error} If graphName, writerId, or checkpointPolicy is invalid
*
Expand All @@ -164,7 +172,7 @@ export default class WarpGraph {
* writerId: 'node-1'
* });
*/
static async open({ persistence, graphName, writerId, gcPolicy = {}, adjacencyCacheSize, checkpointPolicy, autoMaterialize }) {
static async open({ persistence, graphName, writerId, gcPolicy = {}, adjacencyCacheSize, checkpointPolicy, autoMaterialize, logger }) {
// Validate inputs
validateGraphName(graphName);
validateWriterId(writerId);
Expand All @@ -188,7 +196,7 @@ export default class WarpGraph {
throw new Error('autoMaterialize must be a boolean');
}

const graph = new WarpGraph({ persistence, graphName, writerId, gcPolicy, adjacencyCacheSize, checkpointPolicy, autoMaterialize });
const graph = new WarpGraph({ persistence, graphName, writerId, gcPolicy, adjacencyCacheSize, checkpointPolicy, autoMaterialize, logger });

// Validate migration boundary
await graph._validateMigrationBoundary();
Expand Down Expand Up @@ -503,6 +511,7 @@ export default class WarpGraph {
}

this._setMaterializedState(state);
this._lastFrontier = await this.getFrontier();
this._patchesSinceCheckpoint = patchCount;

// Auto-checkpoint if policy is set and threshold exceeded.
Expand All @@ -516,6 +525,8 @@ export default class WarpGraph {
}
}

this._maybeRunGC(state);

return state;
}

Expand Down Expand Up @@ -1015,6 +1026,46 @@ export default class WarpGraph {
// Garbage Collection
// ============================================================================

/**
* Post-materialize GC check. Warn by default; execute only when enabled.
* GC failure never breaks materialize.
*
* @param {import('./services/JoinReducer.js').WarpStateV5} state
* @private
*/
_maybeRunGC(state) {
try {
const metrics = collectGCMetrics(state);
const inputMetrics = {
...metrics,
patchesSinceCompaction: this._patchesSinceGC,
timeSinceCompaction: Date.now() - this._lastGCTime,
};
const { shouldRun, reasons } = shouldRunGC(inputMetrics, this._gcPolicy);

if (!shouldRun) {
return;
}

if (this._gcPolicy.enabled) {
const appliedVV = computeAppliedVV(state);
const result = executeGC(state, appliedVV);
this._lastGCTime = Date.now();
this._patchesSinceGC = 0;
if (this._logger) {
this._logger.info('Auto-GC completed', { ...result, reasons });
}
} else if (this._logger) {
this._logger.warn(
'GC thresholds exceeded but auto-GC is disabled. Set gcPolicy: { enabled: true } to auto-compact.',
{ reasons },
);
}
} catch {
// GC failure never breaks materialize
}
}

/**
* Checks if GC should run based on current metrics and policy.
* If thresholds are exceeded, runs GC on the cached state.
Expand Down Expand Up @@ -1133,6 +1184,34 @@ export default class WarpGraph {
return frontier;
}

/**
* Checks whether any writer tip has changed since the last materialize.
*
* O(writers) comparison of stored writer tip SHAs against current refs.
* Cheap "has anything changed?" check without materialization.
*
* @returns {Promise<boolean>} True if frontier has changed (or never materialized)
*/
async hasFrontierChanged() {
if (this._lastFrontier === null) {
return true;
}

const current = await this.getFrontier();

if (current.size !== this._lastFrontier.size) {
return true;
}

for (const [writerId, tipSha] of current) {
if (this._lastFrontier.get(writerId) !== tipSha) {
return true;
}
}

return false;
}

/**
* Creates a sync request to send to a remote peer.
* The request contains the local frontier for comparison.
Expand Down
44 changes: 32 additions & 12 deletions src/domain/services/BitmapIndexBuilder.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { createHash } from 'crypto';
import { getRoaringBitmap32, getNativeRoaringAvailable } from '../utils/roaring.js';
import { encode as cborEncode } from '../../infrastructure/codecs/CborCodec.js';

/**
* Shard format version for forward compatibility.
Expand Down Expand Up @@ -51,6 +52,32 @@ const ensureRoaringBitmap32 = () => {
return RoaringBitmap32;
};

/**
* Wraps data in a version/checksum envelope.
* @param {Object} data - The data to wrap
* @returns {Object} Envelope with version, checksum, and data
*/
const wrapShard = (data) => ({
version: SHARD_VERSION,
checksum: computeChecksum(data),
data,
});

/**
* Serializes a frontier Map into CBOR and JSON blobs in the given tree.
* @param {Map<string, string>} frontier - Writer→tip SHA map
* @param {Record<string, Buffer>} tree - Target tree to add entries to
*/
function serializeFrontierToTree(frontier, tree) {
const sorted = {};
for (const key of Array.from(frontier.keys()).sort()) {
sorted[key] = frontier.get(key);
}
const envelope = { version: 1, writerCount: frontier.size, frontier: sorted };
tree['frontier.cbor'] = Buffer.from(cborEncode(envelope));
tree['frontier.json'] = Buffer.from(canonicalStringify(envelope));
}

/**
* Builder for constructing bitmap indexes in memory.
*
Expand Down Expand Up @@ -125,20 +152,9 @@ export default class BitmapIndexBuilder {
*
* @returns {Record<string, Buffer>} Map of path → serialized content
*/
serialize() {
serialize({ frontier } = {}) {
const tree = {};

/**
* Wraps data in a version/checksum envelope.
* @param {Object} data - The data to wrap
* @returns {Object} Envelope with version, checksum, and data
*/
const wrapShard = (data) => ({
version: SHARD_VERSION,
checksum: computeChecksum(data),
data,
});

// Serialize ID mappings (sharded by prefix)
const idShards = {};
for (const [sha, id] of this.shaToId) {
Expand Down Expand Up @@ -172,6 +188,10 @@ export default class BitmapIndexBuilder {
}
}

if (frontier) {
serializeFrontierToTree(frontier, tree);
}

return tree;
}

Expand Down
2 changes: 2 additions & 0 deletions src/domain/services/GCPolicy.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { collectGCMetrics } from './GCMetrics.js';

/**
* @typedef {Object} GCPolicy
* @property {boolean} enabled - Whether automatic GC is enabled (default: false)
* @property {number} tombstoneRatioThreshold - Ratio of tombstones that triggers GC (0.0-1.0)
* @property {number} entryCountThreshold - Total entries that triggers GC
* @property {number} minPatchesSinceCompaction - Minimum patches between GCs
Expand Down Expand Up @@ -38,6 +39,7 @@ import { collectGCMetrics } from './GCMetrics.js';

/** @type {Readonly<GCPolicy>} */
export const DEFAULT_GC_POLICY = Object.freeze({
enabled: false, // Must opt-in to automatic GC
tombstoneRatioThreshold: 0.3, // 30% tombstones triggers GC
entryCountThreshold: 50000, // 50K entries triggers GC
minPatchesSinceCompaction: 1000, // Min patches between GCs
Expand Down
Loading