Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/domain/WarpGraph.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ export default class WarpGraph {
/** @type {number} */
this._patchesSinceCheckpoint = 0;

/** @type {number} */
this._maxObservedLamport = 0;

/** @type {{every: number}|null} */
this._checkpointPolicy = checkpointPolicy || null;

Expand Down
15 changes: 10 additions & 5 deletions src/domain/services/PatchBuilderV2.js
Original file line number Diff line number Diff line change
Expand Up @@ -524,19 +524,24 @@ export class PatchBuilderV2 {
throw err;
}

// 3. Calculate lamport and parent from current ref state
let lamport = 1;
// 3. Calculate lamport and parent from current ref state.
// Start from this._lamport (set by _nextLamport() in createPatch()), which already
// incorporates the globally-observed max Lamport tick via _maxObservedLamport.
// This ensures a first-time writer whose own chain is empty still commits at a tick
// above any previously-observed writer, winning LWW tiebreakers correctly.
let lamport = this._lamport;
let parentCommit = null;

if (currentRefSha) {
// Read the current patch commit to get its lamport timestamp
// Read the current patch commit to get its lamport timestamp and take the max,
// so the chain stays monotonic even if the ref advanced since createPatch().
const commitMessage = await this._persistence.showNode(currentRefSha);
const patchInfo = decodePatchMessage(commitMessage);
lamport = patchInfo.lamport + 1;
lamport = Math.max(this._lamport, patchInfo.lamport + 1);
parentCommit = currentRefSha;
}

// 3. Build PatchV2 structure with correct lamport
// 4. Build PatchV2 structure with correct lamport
// Note: Dots were assigned using constructor lamport, but commit lamport may differ.
// For now, we use the calculated lamport for the patch metadata.
// The dots themselves are independent of patch lamport (they use VV counters).
Expand Down
50 changes: 50 additions & 0 deletions src/domain/warp/materialize.methods.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,47 @@
import { reduceV5, createEmptyStateV5, cloneStateV5 } from '../services/JoinReducer.js';
import { ProvenanceIndex } from '../services/ProvenanceIndex.js';
import { diffStates, isEmptyDiff } from '../services/StateDiff.js';
import { decodePatchMessage, detectMessageKind } from '../services/WarpMessageCodec.js';

/**
* Scans the checkpoint frontier's tip commits for the maximum observed Lamport tick.
* Updates `graph._maxObservedLamport` in-place; best-effort (skips unreadable commits).
*
* @param {import('../WarpGraph.js').default} graph
* @param {Map<string, string>} frontier
* @returns {Promise<void>}
*/
async function scanFrontierForMaxLamport(graph, frontier) {
for (const tipSha of frontier.values()) {
try {
const msg = await graph._persistence.showNode(tipSha);
if (detectMessageKind(msg) === 'patch') {
const { lamport } = decodePatchMessage(msg);
if (lamport > graph._maxObservedLamport) {
graph._maxObservedLamport = lamport;
}
}
} catch {
// best-effort: skip unreadable frontier commits
}
}
}

/**
* Scans a list of patch entries for the maximum observed Lamport tick.
* Updates `graph._maxObservedLamport` in-place.
*
* @param {import('../WarpGraph.js').default} graph
* @param {Array<{patch: {lamport?: number}}>} patches
*/
function scanPatchesForMaxLamport(graph, patches) {
for (const { patch } of patches) {
const tick = patch.lamport ?? 0;
if (tick > graph._maxObservedLamport) {
graph._maxObservedLamport = tick;
}
}
}

/**
* Materializes the current graph state.
Expand Down Expand Up @@ -63,6 +104,13 @@ export async function materialize(options) {
// If checkpoint exists, use incremental materialization
if (checkpoint?.schema === 2 || checkpoint?.schema === 3) {
const patches = await this._loadPatchesSince(checkpoint);
// Update max observed Lamport so _nextLamport() issues globally-monotonic ticks.
// Read the checkpoint frontier's tip commit messages to capture the pre-checkpoint max,
// then scan the incremental patches for anything newer.
if (checkpoint.frontier instanceof Map) {
await scanFrontierForMaxLamport(this, checkpoint.frontier);
}
scanPatchesForMaxLamport(this, patches);
if (collectReceipts) {
const result = /** @type {{state: import('../services/JoinReducer.js').WarpStateV5, receipts: import('../types/TickReceipt.js').TickReceipt[]}} */ (reduceV5(/** @type {any} */ (patches), /** @type {import('../services/JoinReducer.js').WarpStateV5} */ (checkpoint.state), { receipts: true })); // TODO(ts-cleanup): type patch array
state = result.state;
Expand Down Expand Up @@ -109,6 +157,8 @@ export async function materialize(options) {
receipts = [];
}
} else {
// Update max observed Lamport from all loaded patches.
scanPatchesForMaxLamport(this, allPatches);
// 5. Reduce all patches to state
if (collectReceipts) {
const result = /** @type {{state: import('../services/JoinReducer.js').WarpStateV5, receipts: import('../types/TickReceipt.js').TickReceipt[]}} */ (reduceV5(/** @type {any} */ (allPatches), undefined, { receipts: true })); // TODO(ts-cleanup): type patch array
Expand Down
50 changes: 30 additions & 20 deletions src/domain/warp/patch.methods.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,30 +89,36 @@ export async function _nextLamport() {
const writerRef = buildWriterRef(this._graphName, this._writerId);
const currentRefSha = await this._persistence.readRef(writerRef);

if (!currentRefSha) {
// First commit for this writer
return { lamport: 1, parentSha: null };
}
let ownTick = 0;

// Read the current patch commit to get its lamport timestamp
const commitMessage = await this._persistence.showNode(currentRefSha);
const kind = detectMessageKind(commitMessage);
if (currentRefSha) {
// Read the current patch commit to get its lamport timestamp
const commitMessage = await this._persistence.showNode(currentRefSha);
const kind = detectMessageKind(commitMessage);

if (kind !== 'patch') {
// Writer ref doesn't point to a patch commit - treat as first commit
return { lamport: 1, parentSha: currentRefSha };
}
if (kind !== 'patch') {
// Writer ref doesn't point to a patch commit - treat as fresh start
return { lamport: Math.max(1, this._maxObservedLamport + 1), parentSha: currentRefSha };
}

try {
const patchInfo = decodePatchMessage(commitMessage);
return { lamport: patchInfo.lamport + 1, parentSha: currentRefSha };
} catch {
// Malformed message - error with actionable message
throw new Error(
`Failed to parse lamport from writer ref ${writerRef}: ` +
`commit ${currentRefSha} has invalid patch message format`
);
try {
const patchInfo = decodePatchMessage(commitMessage);
ownTick = patchInfo.lamport;
} catch {
throw new Error(
`Failed to parse lamport from writer ref ${writerRef}: ` +
`commit ${currentRefSha} has invalid patch message format`
);
}
}

// Standard Lamport clock rule: next tick = max(own chain, globally observed max) + 1.
// _maxObservedLamport is updated during materialize() and after each commit, so this
// is O(1) — no additional git reads required at commit time.
return {
lamport: Math.max(ownTick, this._maxObservedLamport) + 1,
parentSha: currentRefSha ?? null,
};
}

/**
Expand Down Expand Up @@ -193,6 +199,10 @@ export async function getWriterPatches(writerId, stopAtSha = null) {
*/
export async function _onPatchCommitted(writerId, { patch: committed, sha } = {}) {
vvIncrement(this._versionVector, writerId);
// Keep _maxObservedLamport up to date so _nextLamport() issues globally-monotonic ticks.
if (committed?.lamport !== undefined && committed.lamport > this._maxObservedLamport) {
this._maxObservedLamport = committed.lamport;
}
this._patchesSinceCheckpoint++;
// Eager re-materialize: apply the just-committed patch to cached state
// Only when the cache is clean — applying a patch to stale state would be incorrect
Expand Down
116 changes: 116 additions & 0 deletions test/unit/domain/WarpGraph.noCoordination.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -139,4 +139,120 @@ describe('No-coordination regression suite', () => {
{ seed: 4242, numRuns: 8 }
);
}, { timeout: 30000 });

describe('Lamport clock global-max monotonicity', () => {
it('first-time writer beats existing writer when it materializes first', async () => {
// Regression: when writer B makes its very first commit to a repo where writer A
// has already committed at tick N, B must commit at tick > N so its operations
// win the LWW CRDT tiebreaker — not lose to A's tick-1 commit.
const repo = await createGitRepo('lamport-mono');
try {
// Writer A seeds a node
const graphA = await WarpGraph.open({
persistence: repo.persistence,
graphName: 'test',
writerId: 'writer-zzz', // alphabetically > writer-aaa so A would win ties
autoMaterialize: true,
});
const pA = await graphA.createPatch();
pA.addNode('node:shared')
.setProperty('node:shared', 'value', 'from-A');
await pA.commit();

// Writer B opens a fresh handle, materializes to observe A, then mutates
const graphB = await WarpGraph.open({
persistence: repo.persistence,
graphName: 'test',
writerId: 'writer-aaa', // alphabetically < writer-zzz, would LOSE tick ties
autoMaterialize: true,
});
await graphB.syncCoverage();
await graphB.materialize(); // observes A's tick-1 patch → _maxObservedLamport = 1

const pB = await graphB.createPatch();
pB.setProperty('node:shared', 'value', 'from-B'); // must be at tick >= 2 to win
await pB.commit();

// B's own state should reflect its own mutation
const propsB = await graphB.getNodeProps('node:shared');
expect(propsB?.get('value')).toBe('from-B');

// A fresh reader that sees both writers must also resolve to B's value
const graphReader = await WarpGraph.open({
persistence: repo.persistence,
graphName: 'test',
writerId: 'reader',
autoMaterialize: true,
});
await graphReader.syncCoverage();
await graphReader.materialize();
const propsReader = await graphReader.getNodeProps('node:shared');
expect(propsReader?.get('value')).toBe('from-B');
} finally {
await repo.cleanup();
}
}, { timeout: 20000 });

it('_maxObservedLamport is updated after each commit on the same instance', async () => {
const repo = await createGitRepo('lamport-mono');
try {
const graph = await WarpGraph.open({
persistence: repo.persistence,
graphName: 'test',
writerId: 'writer-a',
autoMaterialize: true,
});

expect(graph._maxObservedLamport).toBe(0);

const p1 = await graph.createPatch();
p1.addNode('node:x').setProperty('node:x', 'v', '1');
await p1.commit();

expect(graph._maxObservedLamport).toBe(1);

const p2 = await graph.createPatch();
p2.setProperty('node:x', 'v', '2');
await p2.commit();

expect(graph._maxObservedLamport).toBe(2);
} finally {
await repo.cleanup();
}
}, { timeout: 10000 });

it('materialize updates _maxObservedLamport from observed patches', async () => {
const repo = await createGitRepo('lamport-mono');
try {
// Seed with writer-z at tick 1
const graphZ = await WarpGraph.open({
persistence: repo.persistence,
graphName: 'test',
writerId: 'writer-z',
autoMaterialize: true,
});
const p = await graphZ.createPatch();
p.addNode('node:x').setProperty('node:x', 'v', 'z');
await p.commit();

// Fresh writer-a: before materialize, max is 0
const graphA = await WarpGraph.open({
persistence: repo.persistence,
graphName: 'test',
writerId: 'writer-a',
autoMaterialize: true,
});
expect(graphA._maxObservedLamport).toBe(0);

await graphA.syncCoverage();
await graphA.materialize();

// After materialize, should have observed tick 1 from writer-z
expect(graphA._maxObservedLamport).toBe(1);
} finally {
await repo.cleanup();
}
}, { timeout: 10000 });
});

});
Loading