From ff417dd03e7f3914162f0b65596115b0a79e1f18 Mon Sep 17 00:00:00 2001 From: James Ross Date: Tue, 17 Feb 2026 04:26:21 -0800 Subject: [PATCH 1/2] fix: extract Lamport scan helpers to resolve lint complexity/nesting Extract scanFrontierForMaxLamport and scanPatchesForMaxLamport as module-private functions in materialize.methods.js, replacing inline scanning loops that pushed materialize() to complexity 38 (max 35) and nesting depth 7 (max 6). No behaviour change: same logic, same _maxObservedLamport update semantics. All Lamport clock monotonicity tests continue to pass. --- src/domain/WarpGraph.js | 3 + src/domain/services/PatchBuilderV2.js | 15 ++- src/domain/warp/materialize.methods.js | 49 ++++++++ src/domain/warp/patch.methods.js | 50 +++++--- .../domain/WarpGraph.noCoordination.test.js | 116 ++++++++++++++++++ 5 files changed, 208 insertions(+), 25 deletions(-) diff --git a/src/domain/WarpGraph.js b/src/domain/WarpGraph.js index 8eff78e6..19eed42c 100644 --- a/src/domain/WarpGraph.js +++ b/src/domain/WarpGraph.js @@ -95,6 +95,9 @@ export default class WarpGraph { /** @type {number} */ this._patchesSinceCheckpoint = 0; + /** @type {number} */ + this._maxObservedLamport = 0; + /** @type {{every: number}|null} */ this._checkpointPolicy = checkpointPolicy || null; diff --git a/src/domain/services/PatchBuilderV2.js b/src/domain/services/PatchBuilderV2.js index c29974a1..9802daa7 100644 --- a/src/domain/services/PatchBuilderV2.js +++ b/src/domain/services/PatchBuilderV2.js @@ -524,19 +524,24 @@ export class PatchBuilderV2 { throw err; } - // 3. Calculate lamport and parent from current ref state - let lamport = 1; + // 3. Calculate lamport and parent from current ref state. + // Start from this._lamport (set by _nextLamport() in createPatch()), which already + // incorporates the globally-observed max Lamport tick via _maxObservedLamport. + // This ensures a first-time writer whose own chain is empty still commits at a tick + // above any previously-observed writer, winning LWW tiebreakers correctly. + let lamport = this._lamport; let parentCommit = null; if (currentRefSha) { - // Read the current patch commit to get its lamport timestamp + // Read the current patch commit to get its lamport timestamp and take the max, + // so the chain stays monotonic even if the ref advanced since createPatch(). const commitMessage = await this._persistence.showNode(currentRefSha); const patchInfo = decodePatchMessage(commitMessage); - lamport = patchInfo.lamport + 1; + lamport = Math.max(this._lamport, patchInfo.lamport + 1); parentCommit = currentRefSha; } - // 3. Build PatchV2 structure with correct lamport + // 4. Build PatchV2 structure with correct lamport // Note: Dots were assigned using constructor lamport, but commit lamport may differ. // For now, we use the calculated lamport for the patch metadata. // The dots themselves are independent of patch lamport (they use VV counters). diff --git a/src/domain/warp/materialize.methods.js b/src/domain/warp/materialize.methods.js index ae2fd74a..83822d12 100644 --- a/src/domain/warp/materialize.methods.js +++ b/src/domain/warp/materialize.methods.js @@ -9,6 +9,46 @@ import { reduceV5, createEmptyStateV5, cloneStateV5 } from '../services/JoinReducer.js'; import { ProvenanceIndex } from '../services/ProvenanceIndex.js'; import { diffStates, isEmptyDiff } from '../services/StateDiff.js'; +import { decodePatchMessage, detectMessageKind } from '../services/WarpMessageCodec.js'; + +/** + * Scans the checkpoint frontier's tip commits for the maximum observed Lamport tick. + * Updates `graph._maxObservedLamport` in-place; best-effort (skips unreadable commits). + * + * @param {import('../WarpGraph.js').default} graph + * @param {Map} frontier + * @returns {Promise} + */ +async function scanFrontierForMaxLamport(graph, frontier) { + for (const tipSha of frontier.values()) { + try { + const msg = await graph._persistence.showNode(tipSha); + if (detectMessageKind(msg) === 'patch') { + const { lamport } = decodePatchMessage(msg); + if (lamport > graph._maxObservedLamport) { + graph._maxObservedLamport = lamport; + } + } + } catch { + // best-effort: skip unreadable frontier commits + } + } +} + +/** + * Scans a list of patch entries for the maximum observed Lamport tick. + * Updates `graph._maxObservedLamport` in-place. + * + * @param {import('../WarpGraph.js').default} graph + * @param {Array<{patch: {lamport?: number}}>} patches + */ +function scanPatchesForMaxLamport(graph, patches) { + for (const { patch } of patches) { + if ((patch.lamport ?? 0) > graph._maxObservedLamport) { + graph._maxObservedLamport = patch.lamport; + } + } +} /** * Materializes the current graph state. @@ -63,6 +103,13 @@ export async function materialize(options) { // If checkpoint exists, use incremental materialization if (checkpoint?.schema === 2 || checkpoint?.schema === 3) { const patches = await this._loadPatchesSince(checkpoint); + // Update max observed Lamport so _nextLamport() issues globally-monotonic ticks. + // Read the checkpoint frontier's tip commit messages to capture the pre-checkpoint max, + // then scan the incremental patches for anything newer. + if (checkpoint.frontier instanceof Map) { + await scanFrontierForMaxLamport(this, checkpoint.frontier); + } + scanPatchesForMaxLamport(this, patches); if (collectReceipts) { const result = /** @type {{state: import('../services/JoinReducer.js').WarpStateV5, receipts: import('../types/TickReceipt.js').TickReceipt[]}} */ (reduceV5(/** @type {any} */ (patches), /** @type {import('../services/JoinReducer.js').WarpStateV5} */ (checkpoint.state), { receipts: true })); // TODO(ts-cleanup): type patch array state = result.state; @@ -109,6 +156,8 @@ export async function materialize(options) { receipts = []; } } else { + // Update max observed Lamport from all loaded patches. + scanPatchesForMaxLamport(this, allPatches); // 5. Reduce all patches to state if (collectReceipts) { const result = /** @type {{state: import('../services/JoinReducer.js').WarpStateV5, receipts: import('../types/TickReceipt.js').TickReceipt[]}} */ (reduceV5(/** @type {any} */ (allPatches), undefined, { receipts: true })); // TODO(ts-cleanup): type patch array diff --git a/src/domain/warp/patch.methods.js b/src/domain/warp/patch.methods.js index 5e4ff929..2602baf7 100644 --- a/src/domain/warp/patch.methods.js +++ b/src/domain/warp/patch.methods.js @@ -89,30 +89,36 @@ export async function _nextLamport() { const writerRef = buildWriterRef(this._graphName, this._writerId); const currentRefSha = await this._persistence.readRef(writerRef); - if (!currentRefSha) { - // First commit for this writer - return { lamport: 1, parentSha: null }; - } + let ownTick = 0; - // Read the current patch commit to get its lamport timestamp - const commitMessage = await this._persistence.showNode(currentRefSha); - const kind = detectMessageKind(commitMessage); + if (currentRefSha) { + // Read the current patch commit to get its lamport timestamp + const commitMessage = await this._persistence.showNode(currentRefSha); + const kind = detectMessageKind(commitMessage); - if (kind !== 'patch') { - // Writer ref doesn't point to a patch commit - treat as first commit - return { lamport: 1, parentSha: currentRefSha }; - } + if (kind !== 'patch') { + // Writer ref doesn't point to a patch commit - treat as fresh start + return { lamport: Math.max(1, this._maxObservedLamport + 1), parentSha: currentRefSha }; + } - try { - const patchInfo = decodePatchMessage(commitMessage); - return { lamport: patchInfo.lamport + 1, parentSha: currentRefSha }; - } catch { - // Malformed message - error with actionable message - throw new Error( - `Failed to parse lamport from writer ref ${writerRef}: ` + - `commit ${currentRefSha} has invalid patch message format` - ); + try { + const patchInfo = decodePatchMessage(commitMessage); + ownTick = patchInfo.lamport; + } catch { + throw new Error( + `Failed to parse lamport from writer ref ${writerRef}: ` + + `commit ${currentRefSha} has invalid patch message format` + ); + } } + + // Standard Lamport clock rule: next tick = max(own chain, globally observed max) + 1. + // _maxObservedLamport is updated during materialize() and after each commit, so this + // is O(1) — no additional git reads required at commit time. + return { + lamport: Math.max(ownTick, this._maxObservedLamport) + 1, + parentSha: currentRefSha ?? null, + }; } /** @@ -193,6 +199,10 @@ export async function getWriterPatches(writerId, stopAtSha = null) { */ export async function _onPatchCommitted(writerId, { patch: committed, sha } = {}) { vvIncrement(this._versionVector, writerId); + // Keep _maxObservedLamport up to date so _nextLamport() issues globally-monotonic ticks. + if (committed?.lamport !== undefined && committed.lamport > this._maxObservedLamport) { + this._maxObservedLamport = committed.lamport; + } this._patchesSinceCheckpoint++; // Eager re-materialize: apply the just-committed patch to cached state // Only when the cache is clean — applying a patch to stale state would be incorrect diff --git a/test/unit/domain/WarpGraph.noCoordination.test.js b/test/unit/domain/WarpGraph.noCoordination.test.js index 8d7a1f15..b8839c44 100644 --- a/test/unit/domain/WarpGraph.noCoordination.test.js +++ b/test/unit/domain/WarpGraph.noCoordination.test.js @@ -139,4 +139,120 @@ describe('No-coordination regression suite', () => { { seed: 4242, numRuns: 8 } ); }, { timeout: 30000 }); + + describe('Lamport clock global-max monotonicity', () => { + it('first-time writer beats existing writer when it materializes first', async () => { + // Regression: when writer B makes its very first commit to a repo where writer A + // has already committed at tick N, B must commit at tick > N so its operations + // win the LWW CRDT tiebreaker — not lose to A's tick-1 commit. + const repo = await createGitRepo('lamport-mono'); + try { + // Writer A seeds a node + const graphA = await WarpGraph.open({ + persistence: repo.persistence, + graphName: 'test', + writerId: 'writer-zzz', // alphabetically > writer-aaa so A would win ties + autoMaterialize: true, + }); + const pA = await graphA.createPatch(); + pA.addNode('node:shared') + .setProperty('node:shared', 'value', 'from-A'); + await pA.commit(); + + // Writer B opens a fresh handle, materializes to observe A, then mutates + const graphB = await WarpGraph.open({ + persistence: repo.persistence, + graphName: 'test', + writerId: 'writer-aaa', // alphabetically < writer-zzz, would LOSE tick ties + autoMaterialize: true, + }); + await graphB.syncCoverage(); + await graphB.materialize(); // observes A's tick-1 patch → _maxObservedLamport = 1 + + const pB = await graphB.createPatch(); + pB.setProperty('node:shared', 'value', 'from-B'); // must be at tick >= 2 to win + await pB.commit(); + + // B's own state should reflect its own mutation + const propsB = await graphB.getNodeProps('node:shared'); + expect(propsB?.get('value')).toBe('from-B'); + + // A fresh reader that sees both writers must also resolve to B's value + const graphReader = await WarpGraph.open({ + persistence: repo.persistence, + graphName: 'test', + writerId: 'reader', + autoMaterialize: true, + }); + await graphReader.syncCoverage(); + await graphReader.materialize(); + const propsReader = await graphReader.getNodeProps('node:shared'); + expect(propsReader?.get('value')).toBe('from-B'); + } finally { + await repo.cleanup(); + } + }, { timeout: 20000 }); + + it('_maxObservedLamport is updated after each commit on the same instance', async () => { + const repo = await createGitRepo('lamport-mono'); + try { + const graph = await WarpGraph.open({ + persistence: repo.persistence, + graphName: 'test', + writerId: 'writer-a', + autoMaterialize: true, + }); + + expect(graph._maxObservedLamport).toBe(0); + + const p1 = await graph.createPatch(); + p1.addNode('node:x').setProperty('node:x', 'v', '1'); + await p1.commit(); + + expect(graph._maxObservedLamport).toBe(1); + + const p2 = await graph.createPatch(); + p2.setProperty('node:x', 'v', '2'); + await p2.commit(); + + expect(graph._maxObservedLamport).toBe(2); + } finally { + await repo.cleanup(); + } + }, { timeout: 10000 }); + + it('materialize updates _maxObservedLamport from observed patches', async () => { + const repo = await createGitRepo('lamport-mono'); + try { + // Seed with writer-z at tick 1 + const graphZ = await WarpGraph.open({ + persistence: repo.persistence, + graphName: 'test', + writerId: 'writer-z', + autoMaterialize: true, + }); + const p = await graphZ.createPatch(); + p.addNode('node:x').setProperty('node:x', 'v', 'z'); + await p.commit(); + + // Fresh writer-a: before materialize, max is 0 + const graphA = await WarpGraph.open({ + persistence: repo.persistence, + graphName: 'test', + writerId: 'writer-a', + autoMaterialize: true, + }); + expect(graphA._maxObservedLamport).toBe(0); + + await graphA.syncCoverage(); + await graphA.materialize(); + + // After materialize, should have observed tick 1 from writer-z + expect(graphA._maxObservedLamport).toBe(1); + } finally { + await repo.cleanup(); + } + }, { timeout: 10000 }); + }); + }); From f2c71072c437434d91e7c3a340285f6a41f29d48 Mon Sep 17 00:00:00 2001 From: James Ross Date: Tue, 17 Feb 2026 04:36:09 -0800 Subject: [PATCH 2/2] fix: use local tick variable in scanPatchesForMaxLamport to satisfy TS narrowing `patch.lamport` is typed `number | undefined`. The previous assignment `graph._maxObservedLamport = patch.lamport` was guarded by `?? 0` only in the condition, which TypeScript could not narrow through. Extracting `const tick = patch.lamport ?? 0` makes the type `number` at the assignment site and satisfies strict checks with no behaviour change. --- src/domain/warp/materialize.methods.js | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/domain/warp/materialize.methods.js b/src/domain/warp/materialize.methods.js index 83822d12..cc855066 100644 --- a/src/domain/warp/materialize.methods.js +++ b/src/domain/warp/materialize.methods.js @@ -44,8 +44,9 @@ async function scanFrontierForMaxLamport(graph, frontier) { */ function scanPatchesForMaxLamport(graph, patches) { for (const { patch } of patches) { - if ((patch.lamport ?? 0) > graph._maxObservedLamport) { - graph._maxObservedLamport = patch.lamport; + const tick = patch.lamport ?? 0; + if (tick > graph._maxObservedLamport) { + graph._maxObservedLamport = tick; } } }