From 9619e78c4e3d80ce2e1e53236e9d0bdcb3eb5148 Mon Sep 17 00:00:00 2001 From: mverzilli <651693+mverzilli@users.noreply.github.com> Date: Wed, 14 Jan 2026 09:05:22 +0000 Subject: [PATCH] refactor: staged writes in tagging stores Third part of the series started with https://github.com/AztecProtocol/aztec-packages/pull/19445. This makes the stores related to tagging synchronization work based on staged writes. --- .../oracle/private_execution_oracle.ts | 4 +- yarn-project/pxe/src/logs/log_service.ts | 1 + yarn-project/pxe/src/pxe.ts | 4 +- .../recipient_tagging_store.test.ts | 88 +++++ .../tagging_store/recipient_tagging_store.ts | 102 +++++- .../sender_tagging_store.test.ts | 316 +++++++++++------- .../tagging_store/sender_tagging_store.ts | 155 +++++++-- ...ate_logs_for_sender_recipient_pair.test.ts | 24 +- ..._private_logs_for_sender_recipient_pair.ts | 9 +- .../sync_sender_tagging_indexes.test.ts | 30 +- .../sync_sender_tagging_indexes.ts | 13 +- ...load_and_store_new_tagging_indexes.test.ts | 58 ++-- .../load_and_store_new_tagging_indexes.ts | 5 +- 13 files changed, 582 insertions(+), 227 deletions(-) create mode 100644 yarn-project/pxe/src/storage/tagging_store/recipient_tagging_store.test.ts diff --git a/yarn-project/pxe/src/contract_function_simulator/oracle/private_execution_oracle.ts b/yarn-project/pxe/src/contract_function_simulator/oracle/private_execution_oracle.ts index 38c46eff3b53..a19c34d72991 100644 --- a/yarn-project/pxe/src/contract_function_simulator/oracle/private_execution_oracle.ts +++ b/yarn-project/pxe/src/contract_function_simulator/oracle/private_execution_oracle.ts @@ -265,9 +265,9 @@ export class PrivateExecutionOracle extends UtilityExecutionOracle implements IP // This is a tagging secret we've not yet used in this tx, so first sync our store to make sure its indices // are up to date. We do this here because this store is not synced as part of the global sync because // that'd be wasteful as most tagging secrets are not used in each tx. - await syncSenderTaggingIndexes(secret, this.contractAddress, this.aztecNode, this.senderTaggingStore); + await syncSenderTaggingIndexes(secret, this.contractAddress, this.aztecNode, this.senderTaggingStore, this.jobId); - const lastUsedIndex = await this.senderTaggingStore.getLastUsedIndex(secret); + const lastUsedIndex = await this.senderTaggingStore.getLastUsedIndex(secret, this.jobId); // If lastUsedIndex is undefined, we've never used this secret, so start from 0 // Otherwise, the next index to use is one past the last used index return lastUsedIndex === undefined ? 0 : lastUsedIndex + 1; diff --git a/yarn-project/pxe/src/logs/log_service.ts b/yarn-project/pxe/src/logs/log_service.ts index 7a719572d80c..c1382093ed0e 100644 --- a/yarn-project/pxe/src/logs/log_service.ts +++ b/yarn-project/pxe/src/logs/log_service.ts @@ -123,6 +123,7 @@ export class LogService { this.aztecNode, this.recipientTaggingStore, anchorBlockNumber, + this.jobId, ), ), ); diff --git a/yarn-project/pxe/src/pxe.ts b/yarn-project/pxe/src/pxe.ts index 3b2e16ce1690..e5b5f8cab472 100644 --- a/yarn-project/pxe/src/pxe.ts +++ b/yarn-project/pxe/src/pxe.ts @@ -157,7 +157,7 @@ export class PXE { ); const jobCoordinator = new JobCoordinator(store); - jobCoordinator.registerStores([capsuleStore]); + jobCoordinator.registerStores([capsuleStore, senderTaggingStore, recipientTaggingStore]); const debugUtils = new PXEDebugUtils(contractStore, noteStore); @@ -673,7 +673,7 @@ export class PXE { // TODO(benesjan): The following is an expensive operation. Figure out a way to avoid it. const txHash = (await txProvingResult.toTx()).txHash; - await this.senderTaggingStore.storePendingIndexes(preTagsUsedInTheTx, txHash); + await this.senderTaggingStore.storePendingIndexes(preTagsUsedInTheTx, txHash, jobId); this.log.debug(`Stored used pre-tags as sender for the tx`, { preTagsUsedInTheTx, }); diff --git a/yarn-project/pxe/src/storage/tagging_store/recipient_tagging_store.test.ts b/yarn-project/pxe/src/storage/tagging_store/recipient_tagging_store.test.ts new file mode 100644 index 000000000000..49ff3ad27a89 --- /dev/null +++ b/yarn-project/pxe/src/storage/tagging_store/recipient_tagging_store.test.ts @@ -0,0 +1,88 @@ +import { Fr } from '@aztec/foundation/curves/bn254'; +import { openTmpStore } from '@aztec/kv-store/lmdb-v2'; +import { DirectionalAppTaggingSecret } from '@aztec/stdlib/logs'; + +import { RecipientTaggingStore } from './recipient_tagging_store.js'; + +describe('RecipientTaggingStore', () => { + let taggingStore: RecipientTaggingStore; + let secret1: DirectionalAppTaggingSecret; + let secret2: DirectionalAppTaggingSecret; + + beforeEach(async () => { + taggingStore = new RecipientTaggingStore(await openTmpStore('test')); + secret1 = DirectionalAppTaggingSecret.fromString(Fr.random().toString()); + secret2 = DirectionalAppTaggingSecret.fromString(Fr.random().toString()); + }); + + describe('staged writes', () => { + it('persists staged highest aged index to the store', async () => { + await taggingStore.updateHighestAgedIndex(secret1, 5, 'job1'); + + expect(await taggingStore.getHighestAgedIndex(secret1, 'job2')).toBeUndefined(); + + await taggingStore.commit('job1'); + + expect(await taggingStore.getHighestAgedIndex(secret1, 'job2')).toBe(5); + }); + + it('persists staged highest finalized index to the store', async () => { + await taggingStore.updateHighestFinalizedIndex(secret1, 10, 'job1'); + + expect(await taggingStore.getHighestFinalizedIndex(secret1, 'job2')).toBeUndefined(); + + await taggingStore.commit('job1'); + + expect(await taggingStore.getHighestFinalizedIndex(secret1, 'job2')).toBe(10); + }); + + it('persists multiple secrets for the same job', async () => { + await taggingStore.updateHighestAgedIndex(secret1, 5, 'job1'); + await taggingStore.updateHighestAgedIndex(secret2, 8, 'job1'); + await taggingStore.updateHighestFinalizedIndex(secret1, 3, 'job1'); + await taggingStore.updateHighestFinalizedIndex(secret2, 6, 'job1'); + + await taggingStore.commit('job1'); + + expect(await taggingStore.getHighestAgedIndex(secret1, 'job2')).toBe(5); + expect(await taggingStore.getHighestAgedIndex(secret2, 'job2')).toBe(8); + expect(await taggingStore.getHighestFinalizedIndex(secret1, 'job2')).toBe(3); + expect(await taggingStore.getHighestFinalizedIndex(secret2, 'job2')).toBe(6); + }); + + it('clears staged data after commit', async () => { + await taggingStore.updateHighestAgedIndex(secret1, 5, 'job1'); + await taggingStore.commit('job1'); + + // Updating again with a higher value in the same job should work + // (if staged data wasn't cleared, it would still have the old value cached) + await taggingStore.updateHighestAgedIndex(secret1, 10, 'job2'); + expect(await taggingStore.getHighestAgedIndex(secret1, 'job2')).toBe(10); + await taggingStore.commit('job2'); + + expect(await taggingStore.getHighestAgedIndex(secret1, 'job1')).toBe(10); + }); + + it('does not affect other jobs when committing', async () => { + await taggingStore.updateHighestAgedIndex(secret1, 5, 'job1'); + await taggingStore.updateHighestAgedIndex(secret1, 10, 'job2'); + + await taggingStore.commit('job2'); + + // job1's staged value should still be intact + expect(await taggingStore.getHighestAgedIndex(secret1, 'job1')).toBe(5); + }); + + it('discards staged highest aged index without persisting', async () => { + await taggingStore.updateHighestAgedIndex(secret1, 5, 'job1'); + await taggingStore.discardStaged('job1'); + expect(await taggingStore.getHighestAgedIndex(secret1, 'job1')).toBeUndefined(); + }); + + it('discards staged highest finalized index without persisting', async () => { + await taggingStore.updateHighestFinalizedIndex(secret1, 5, 'job1'); + await taggingStore.discardStaged('job1'); + expect(await taggingStore.getHighestFinalizedIndex(secret1, 'job1')).toBeUndefined(); + }); + }); +}); diff --git a/yarn-project/pxe/src/storage/tagging_store/recipient_tagging_store.ts b/yarn-project/pxe/src/storage/tagging_store/recipient_tagging_store.ts index 3c9c4d77a883..ce981d93dccf 100644 --- a/yarn-project/pxe/src/storage/tagging_store/recipient_tagging_store.ts +++ b/yarn-project/pxe/src/storage/tagging_store/recipient_tagging_store.ts @@ -1,6 +1,8 @@ import type { AztecAsyncKVStore, AztecAsyncMap } from '@aztec/kv-store'; import type { DirectionalAppTaggingSecret } from '@aztec/stdlib/logs'; +import type { StagedStore } from '../../job_coordinator/job_coordinator.js'; + /** * Data provider of tagging data used when syncing the logs as a recipient. The sender counterpart of this class * is called SenderTaggingStore. We have the providers separate for the sender and recipient because @@ -8,46 +10,120 @@ import type { DirectionalAppTaggingSecret } from '@aztec/stdlib/logs'; * * @dev Chain reorgs do not need to be handled here because both the finalized and aged indexes refer to finalized * blocks, which by definition cannot be affected by reorgs. - * - * TODO(benesjan): Relocate to yarn-project/pxe/src/storage/tagging_store */ -export class RecipientTaggingStore { +export class RecipientTaggingStore implements StagedStore { + storeName: string = 'recipient_tagging'; + #store: AztecAsyncKVStore; #highestAgedIndex: AztecAsyncMap; #highestFinalizedIndex: AztecAsyncMap; + // jobId => secret => number + #highestAgedIndexForJob: Map>; + + // jobId => secret => number + #highestFinalizedIndexForJob: Map>; + constructor(store: AztecAsyncKVStore) { this.#store = store; this.#highestAgedIndex = this.#store.openMap('highest_aged_index'); this.#highestFinalizedIndex = this.#store.openMap('highest_finalized_index'); + + this.#highestAgedIndexForJob = new Map(); + this.#highestFinalizedIndexForJob = new Map(); + } + + #getHighestAgedIndexForJob(jobId: string): Map { + let highestAgedIndexForJob = this.#highestAgedIndexForJob.get(jobId); + if (!highestAgedIndexForJob) { + highestAgedIndexForJob = new Map(); + this.#highestAgedIndexForJob.set(jobId, highestAgedIndexForJob); + } + return highestAgedIndexForJob; + } + + async #readHighestAgedIndex(jobId: string, secret: string): Promise { + return this.#getHighestAgedIndexForJob(jobId).get(secret) ?? (await this.#highestAgedIndex.getAsync(secret)); + } + + #writeHighestAgedIndex(jobId: string, secret: string, index: number) { + this.#getHighestAgedIndexForJob(jobId).set(secret, index); + } + + #getHighestFinalizedIndexForJob(jobId: string): Map { + let jobStagedHighestFinalizedIndex = this.#highestFinalizedIndexForJob.get(jobId); + if (!jobStagedHighestFinalizedIndex) { + jobStagedHighestFinalizedIndex = new Map(); + this.#highestFinalizedIndexForJob.set(jobId, jobStagedHighestFinalizedIndex); + } + return jobStagedHighestFinalizedIndex; + } + + async #readHighestFinalizedIndex(jobId: string, secret: string): Promise { + return ( + this.#getHighestFinalizedIndexForJob(jobId).get(secret) ?? (await this.#highestFinalizedIndex.getAsync(secret)) + ); + } + + #writeHighestFinalizedIndex(jobId: string, secret: string, index: number) { + this.#getHighestFinalizedIndexForJob(jobId).set(secret, index); + } + + /** + * Writes all job-specific in-memory data to persistent storage. + * + * @remark This method must run in a DB transaction context. It's designed to be called from JobCoordinator#commitJob. + */ + async commit(jobId: string): Promise { + const highestAgedIndexForJob = this.#highestAgedIndexForJob.get(jobId); + if (highestAgedIndexForJob) { + for (const [secret, index] of highestAgedIndexForJob.entries()) { + await this.#highestAgedIndex.set(secret, index); + } + } + + const highestFinalizedIndexForJob = this.#highestFinalizedIndexForJob.get(jobId); + if (highestFinalizedIndexForJob) { + for (const [secret, index] of highestFinalizedIndexForJob.entries()) { + await this.#highestFinalizedIndex.set(secret, index); + } + } + + return this.discardStaged(jobId); + } + + discardStaged(jobId: string): Promise { + this.#highestAgedIndexForJob.delete(jobId); + this.#highestFinalizedIndexForJob.delete(jobId); + return Promise.resolve(); } - getHighestAgedIndex(secret: DirectionalAppTaggingSecret): Promise { - return this.#highestAgedIndex.getAsync(secret.toString()); + getHighestAgedIndex(secret: DirectionalAppTaggingSecret, jobId: string): Promise { + return this.#readHighestAgedIndex(jobId, secret.toString()); } - async updateHighestAgedIndex(secret: DirectionalAppTaggingSecret, index: number): Promise { - const currentIndex = await this.#highestAgedIndex.getAsync(secret.toString()); + async updateHighestAgedIndex(secret: DirectionalAppTaggingSecret, index: number, jobId: string): Promise { + const currentIndex = await this.#readHighestAgedIndex(jobId, secret.toString()); if (currentIndex !== undefined && index <= currentIndex) { // Log sync should never set a lower highest aged index. throw new Error(`New highest aged index (${index}) must be higher than the current one (${currentIndex})`); } - await this.#highestAgedIndex.set(secret.toString(), index); + this.#writeHighestAgedIndex(jobId, secret.toString(), index); } - getHighestFinalizedIndex(secret: DirectionalAppTaggingSecret): Promise { - return this.#highestFinalizedIndex.getAsync(secret.toString()); + getHighestFinalizedIndex(secret: DirectionalAppTaggingSecret, jobId: string): Promise { + return this.#readHighestFinalizedIndex(jobId, secret.toString()); } - async updateHighestFinalizedIndex(secret: DirectionalAppTaggingSecret, index: number): Promise { - const currentIndex = await this.#highestFinalizedIndex.getAsync(secret.toString()); + async updateHighestFinalizedIndex(secret: DirectionalAppTaggingSecret, index: number, jobId: string): Promise { + const currentIndex = await this.#readHighestFinalizedIndex(jobId, secret.toString()); if (currentIndex !== undefined && index < currentIndex) { // Log sync should never set a lower highest finalized index but it can happen that it would try to set the same // one because we are loading logs from highest aged index + 1 and not from the highest finalized index. throw new Error(`New highest finalized index (${index}) must be higher than the current one (${currentIndex})`); } - await this.#highestFinalizedIndex.set(secret.toString(), index); + this.#writeHighestFinalizedIndex(jobId, secret.toString(), index); } } diff --git a/yarn-project/pxe/src/storage/tagging_store/sender_tagging_store.test.ts b/yarn-project/pxe/src/storage/tagging_store/sender_tagging_store.test.ts index 19d3073b223c..7b78ce0141a3 100644 --- a/yarn-project/pxe/src/storage/tagging_store/sender_tagging_store.test.ts +++ b/yarn-project/pxe/src/storage/tagging_store/sender_tagging_store.test.ts @@ -22,9 +22,9 @@ describe('SenderTaggingStore', () => { const txHash = TxHash.random(); const preTag: PreTag = { secret: secret1, index: 5 }; - await taggingStore.storePendingIndexes([preTag], txHash); + await taggingStore.storePendingIndexes([preTag], txHash, 'test'); - const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10); + const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10, 'test'); expect(txHashes).toHaveLength(1); expect(txHashes[0]).toEqual(txHash); }); @@ -36,13 +36,13 @@ describe('SenderTaggingStore', () => { { secret: secret2, index: 7 }, ]; - await taggingStore.storePendingIndexes(preTags, txHash); + await taggingStore.storePendingIndexes(preTags, txHash, 'test'); - const txHashes1 = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10); + const txHashes1 = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10, 'test'); expect(txHashes1).toHaveLength(1); expect(txHashes1[0]).toEqual(txHash); - const txHashes2 = await taggingStore.getTxHashesOfPendingIndexes(secret2, 0, 10); + const txHashes2 = await taggingStore.getTxHashesOfPendingIndexes(secret2, 0, 10, 'test'); expect(txHashes2).toHaveLength(1); expect(txHashes2[0]).toEqual(txHash); }); @@ -51,10 +51,10 @@ describe('SenderTaggingStore', () => { const txHash1 = TxHash.random(); const txHash2 = TxHash.random(); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash2); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1, 'test'); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash2, 'test'); - const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10); + const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10, 'test'); expect(txHashes).toHaveLength(2); expect(txHashes).toContainEqual(txHash1); expect(txHashes).toContainEqual(txHash2); @@ -64,10 +64,10 @@ describe('SenderTaggingStore', () => { const txHash = TxHash.random(); const preTag: PreTag = { secret: secret1, index: 5 }; - await taggingStore.storePendingIndexes([preTag], txHash); - await taggingStore.storePendingIndexes([preTag], txHash); + await taggingStore.storePendingIndexes([preTag], txHash, 'test'); + await taggingStore.storePendingIndexes([preTag], txHash, 'test'); - const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10); + const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10, 'test'); expect(txHashes).toHaveLength(1); expect(txHashes[0]).toEqual(txHash); }); @@ -79,7 +79,7 @@ describe('SenderTaggingStore', () => { { secret: secret1, index: 7 }, ]; - await expect(taggingStore.storePendingIndexes(preTags, txHash)).rejects.toThrow( + await expect(taggingStore.storePendingIndexes(preTags, txHash, 'test')).rejects.toThrow( 'Duplicate secrets found when storing pending indexes', ); }); @@ -88,10 +88,10 @@ describe('SenderTaggingStore', () => { const txHash = TxHash.random(); // First store an index - await taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash, 'test'); // Try to store a different index for the same secret + txHash pair - await expect(taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash)).rejects.toThrow( + await expect(taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash, 'test')).rejects.toThrow( /Cannot store index 7.*a different index 5 already exists/, ); }); @@ -101,11 +101,11 @@ describe('SenderTaggingStore', () => { const txHash2 = TxHash.random(); // First store and finalize an index - await taggingStore.storePendingIndexes([{ secret: secret1, index: 10 }], txHash1); - await taggingStore.finalizePendingIndexes([txHash1]); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 10 }], txHash1, 'test'); + await taggingStore.finalizePendingIndexes([txHash1], 'test'); // Try to store a pending index lower than the finalized index - await expect(taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash2)).rejects.toThrow( + await expect(taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash2, 'test')).rejects.toThrow( /Cannot store pending index 5.*lower than or equal to the last finalized index 10/, ); }); @@ -115,11 +115,11 @@ describe('SenderTaggingStore', () => { const txHash2 = TxHash.random(); // First store and finalize an index - await taggingStore.storePendingIndexes([{ secret: secret1, index: 10 }], txHash1); - await taggingStore.finalizePendingIndexes([txHash1]); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 10 }], txHash1, 'test'); + await taggingStore.finalizePendingIndexes([txHash1], 'test'); // Try to store a pending index equal to the finalized index - await expect(taggingStore.storePendingIndexes([{ secret: secret1, index: 10 }], txHash2)).rejects.toThrow( + await expect(taggingStore.storePendingIndexes([{ secret: secret1, index: 10 }], txHash2, 'test')).rejects.toThrow( /Cannot store pending index 10.*lower than or equal to the last finalized index 10/, ); }); @@ -129,13 +129,15 @@ describe('SenderTaggingStore', () => { const txHash2 = TxHash.random(); // First store and finalize an index - await taggingStore.storePendingIndexes([{ secret: secret1, index: 10 }], txHash1); - await taggingStore.finalizePendingIndexes([txHash1]); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 10 }], txHash1, 'test'); + await taggingStore.finalizePendingIndexes([txHash1], 'test'); // Store a pending index higher than the finalized index - should succeed - await expect(taggingStore.storePendingIndexes([{ secret: secret1, index: 15 }], txHash2)).resolves.not.toThrow(); + await expect( + taggingStore.storePendingIndexes([{ secret: secret1, index: 15 }], txHash2, 'test'), + ).resolves.not.toThrow(); - const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 20); + const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 20, 'test'); expect(txHashes).toHaveLength(1); expect(txHashes[0]).toEqual(txHash2); }); @@ -148,12 +150,12 @@ describe('SenderTaggingStore', () => { const indexBeyondWindow = finalizedIndex + UNFINALIZED_TAGGING_INDEXES_WINDOW_LEN + 1; // First store and finalize an index - await taggingStore.storePendingIndexes([{ secret: secret1, index: finalizedIndex }], txHash1); - await taggingStore.finalizePendingIndexes([txHash1]); + await taggingStore.storePendingIndexes([{ secret: secret1, index: finalizedIndex }], txHash1, 'test'); + await taggingStore.finalizePendingIndexes([txHash1], 'test'); // Try to store an index beyond the window await expect( - taggingStore.storePendingIndexes([{ secret: secret1, index: indexBeyondWindow }], txHash2), + taggingStore.storePendingIndexes([{ secret: secret1, index: indexBeyondWindow }], txHash2, 'test'), ).rejects.toThrow( `Highest used index ${indexBeyondWindow} is further than window length from the highest finalized index ${finalizedIndex}`, ); @@ -166,15 +168,15 @@ describe('SenderTaggingStore', () => { const indexAtBoundary = finalizedIndex + UNFINALIZED_TAGGING_INDEXES_WINDOW_LEN; // First store and finalize an index - await taggingStore.storePendingIndexes([{ secret: secret1, index: finalizedIndex }], txHash1); - await taggingStore.finalizePendingIndexes([txHash1]); + await taggingStore.storePendingIndexes([{ secret: secret1, index: finalizedIndex }], txHash1, 'test'); + await taggingStore.finalizePendingIndexes([txHash1], 'test'); // Store an index at the boundary, but check is >, so it should succeed await expect( - taggingStore.storePendingIndexes([{ secret: secret1, index: indexAtBoundary }], txHash2), + taggingStore.storePendingIndexes([{ secret: secret1, index: indexAtBoundary }], txHash2, 'test'), ).resolves.not.toThrow(); - const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, indexAtBoundary + 5); + const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, indexAtBoundary + 5, 'test'); expect(txHashes).toHaveLength(1); expect(txHashes[0]).toEqual(txHash2); }); @@ -183,7 +185,7 @@ describe('SenderTaggingStore', () => { describe('getTxHashesOfPendingIndexes', () => { it('returns empty array when no pending indexes exist', async () => { - const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10); + const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10, 'test'); expect(txHashes).toEqual([]); }); @@ -192,11 +194,11 @@ describe('SenderTaggingStore', () => { const txHash2 = TxHash.random(); const txHash3 = TxHash.random(); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash2); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 8 }], txHash3); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1, 'test'); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash2, 'test'); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 8 }], txHash3, 'test'); - const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 4, 9); + const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 4, 9, 'test'); expect(txHashes).toHaveLength(2); expect(txHashes).toContainEqual(txHash2); expect(txHashes).toContainEqual(txHash3); @@ -207,10 +209,10 @@ describe('SenderTaggingStore', () => { const txHash1 = TxHash.random(); const txHash2 = TxHash.random(); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash1); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 10 }], txHash2); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash1, 'test'); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 10 }], txHash2, 'test'); - const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 5, 10); + const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 5, 10, 'test'); expect(txHashes).toHaveLength(1); expect(txHashes[0]).toEqual(txHash1); }); @@ -221,15 +223,15 @@ describe('SenderTaggingStore', () => { const txHash3 = TxHash.random(); const txHash4 = TxHash.random(); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash2); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1, 'test'); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash2, 'test'); // We store different secret with txHash1 to check we correctly don't return it in the result - await taggingStore.storePendingIndexes([{ secret: secret2, index: 7 }], txHash1); + await taggingStore.storePendingIndexes([{ secret: secret2, index: 7 }], txHash1, 'test'); // Store "parallel" index for secret1 with a different tx (can happen when sending logs from multiple PXEs) - await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash3); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash4); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash3, 'test'); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash4, 'test'); - const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10); + const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10, 'test'); // Should have 3 unique tx hashes for secret1 expect(txHashes).toEqual(expect.arrayContaining([txHash1, txHash2, txHash3, txHash4])); }); @@ -237,32 +239,32 @@ describe('SenderTaggingStore', () => { describe('getLastFinalizedIndex', () => { it('returns undefined when no finalized index exists', async () => { - const lastFinalized = await taggingStore.getLastFinalizedIndex(secret1); + const lastFinalized = await taggingStore.getLastFinalizedIndex(secret1, 'test'); expect(lastFinalized).toBeUndefined(); }); it('returns the last finalized index after finalizePendingIndexes', async () => { const txHash = TxHash.random(); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash); - await taggingStore.finalizePendingIndexes([txHash]); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash, 'test'); + await taggingStore.finalizePendingIndexes([txHash], 'test'); - const lastFinalized = await taggingStore.getLastFinalizedIndex(secret1); + const lastFinalized = await taggingStore.getLastFinalizedIndex(secret1, 'test'); expect(lastFinalized).toBe(5); }); }); describe('getLastUsedIndex', () => { it('returns undefined when no indexes exist', async () => { - const lastUsed = await taggingStore.getLastUsedIndex(secret1); + const lastUsed = await taggingStore.getLastUsedIndex(secret1, 'test'); expect(lastUsed).toBeUndefined(); }); it('returns the last finalized index when no pending indexes exist', async () => { const txHash = TxHash.random(); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash); - await taggingStore.finalizePendingIndexes([txHash]); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash, 'test'); + await taggingStore.finalizePendingIndexes([txHash], 'test'); - const lastUsed = await taggingStore.getLastUsedIndex(secret1); + const lastUsed = await taggingStore.getLastUsedIndex(secret1, 'test'); expect(lastUsed).toBe(5); }); @@ -271,13 +273,13 @@ describe('SenderTaggingStore', () => { const txHash2 = TxHash.random(); // First, finalize an index - await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1); - await taggingStore.finalizePendingIndexes([txHash1]); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1, 'test'); + await taggingStore.finalizePendingIndexes([txHash1], 'test'); // Then add a higher pending index - await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash2); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash2, 'test'); - const lastUsed = await taggingStore.getLastUsedIndex(secret1); + const lastUsed = await taggingStore.getLastUsedIndex(secret1, 'test'); expect(lastUsed).toBe(7); }); @@ -286,11 +288,11 @@ describe('SenderTaggingStore', () => { const txHash2 = TxHash.random(); const txHash3 = TxHash.random(); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash2); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash3); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1, 'test'); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash2, 'test'); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash3, 'test'); - const lastUsed = await taggingStore.getLastUsedIndex(secret1); + const lastUsed = await taggingStore.getLastUsedIndex(secret1, 'test'); expect(lastUsed).toBe(7); }); }); @@ -300,19 +302,19 @@ describe('SenderTaggingStore', () => { const txHash1 = TxHash.random(); const txHash2 = TxHash.random(); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1); - await taggingStore.storePendingIndexes([{ secret: secret2, index: 5 }], txHash1); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash2); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1, 'test'); + await taggingStore.storePendingIndexes([{ secret: secret2, index: 5 }], txHash1, 'test'); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash2, 'test'); - await taggingStore.dropPendingIndexes([txHash1]); + await taggingStore.dropPendingIndexes([txHash1], 'test'); // txHash1 should be removed - const txHashes1 = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10); + const txHashes1 = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10, 'test'); expect(txHashes1).toHaveLength(1); expect(txHashes1[0]).toEqual(txHash2); // txHash1 should also be removed from secret2 - const txHashes2 = await taggingStore.getTxHashesOfPendingIndexes(secret2, 0, 10); + const txHashes2 = await taggingStore.getTxHashesOfPendingIndexes(secret2, 0, 10, 'test'); expect(txHashes2).toEqual([]); }); }); @@ -320,15 +322,15 @@ describe('SenderTaggingStore', () => { describe('finalizePendingIndexes', () => { it('moves pending index to finalized for a given tx hash', async () => { const txHash = TxHash.random(); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash, 'test'); - await taggingStore.finalizePendingIndexes([txHash]); + await taggingStore.finalizePendingIndexes([txHash], 'test'); - const lastFinalized = await taggingStore.getLastFinalizedIndex(secret1); + const lastFinalized = await taggingStore.getLastFinalizedIndex(secret1, 'test'); expect(lastFinalized).toBe(5); // Pending index should be removed - const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10); + const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10, 'test'); expect(txHashes).toEqual([]); }); @@ -336,13 +338,13 @@ describe('SenderTaggingStore', () => { const txHash1 = TxHash.random(); const txHash2 = TxHash.random(); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1); - await taggingStore.finalizePendingIndexes([txHash1]); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1, 'test'); + await taggingStore.finalizePendingIndexes([txHash1], 'test'); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash2); - await taggingStore.finalizePendingIndexes([txHash2]); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash2, 'test'); + await taggingStore.finalizePendingIndexes([txHash2], 'test'); - const lastFinalized = await taggingStore.getLastFinalizedIndex(secret1); + const lastFinalized = await taggingStore.getLastFinalizedIndex(secret1, 'test'); expect(lastFinalized).toBe(7); }); @@ -351,16 +353,16 @@ describe('SenderTaggingStore', () => { const txHash2 = TxHash.random(); // Store both pending indexes first - await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash1); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash2); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash1, 'test'); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash2, 'test'); // Finalize the higher index first - await taggingStore.finalizePendingIndexes([txHash1]); + await taggingStore.finalizePendingIndexes([txHash1], 'test'); // Then try to finalize the lower index - await taggingStore.finalizePendingIndexes([txHash2]); + await taggingStore.finalizePendingIndexes([txHash2], 'test'); - const lastFinalized = await taggingStore.getLastFinalizedIndex(secret1); + const lastFinalized = await taggingStore.getLastFinalizedIndex(secret1, 'test'); expect(lastFinalized).toBe(7); // Should remain at 7 }); @@ -369,16 +371,16 @@ describe('SenderTaggingStore', () => { const txHash2 = TxHash.random(); const txHash3 = TxHash.random(); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash2); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash3); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1, 'test'); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash2, 'test'); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash3, 'test'); // Finalize txHash2 (index 5) - await taggingStore.finalizePendingIndexes([txHash2]); + await taggingStore.finalizePendingIndexes([txHash2], 'test'); // txHash1 (index 3) should be pruned as it's lower than finalized // txHash3 (index 7) should remain - const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10); + const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10, 'test'); expect(txHashes).toHaveLength(1); expect(txHashes[0]).toEqual(txHash3); }); @@ -391,12 +393,13 @@ describe('SenderTaggingStore', () => { { secret: secret2, index: 7 }, ], txHash, + 'test', ); - await taggingStore.finalizePendingIndexes([txHash]); + await taggingStore.finalizePendingIndexes([txHash], 'test'); - const lastFinalized1 = await taggingStore.getLastFinalizedIndex(secret1); - const lastFinalized2 = await taggingStore.getLastFinalizedIndex(secret2); + const lastFinalized1 = await taggingStore.getLastFinalizedIndex(secret1, 'test'); + const lastFinalized2 = await taggingStore.getLastFinalizedIndex(secret2, 'test'); expect(lastFinalized1).toBe(3); expect(lastFinalized2).toBe(7); @@ -404,16 +407,16 @@ describe('SenderTaggingStore', () => { it('does nothing when tx hash does not exist', async () => { const txHash = TxHash.random(); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash, 'test'); - await taggingStore.finalizePendingIndexes([TxHash.random()]); + await taggingStore.finalizePendingIndexes([TxHash.random()], 'test'); // Original pending index should still be there - const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10); + const txHashes = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10, 'test'); expect(txHashes).toHaveLength(1); // Finalized index should not be set - const lastFinalized = await taggingStore.getLastFinalizedIndex(secret1); + const lastFinalized = await taggingStore.getLastFinalizedIndex(secret1, 'test'); expect(lastFinalized).toBeUndefined(); }); }); @@ -424,39 +427,39 @@ describe('SenderTaggingStore', () => { const txHash2 = TxHash.random(); // Step 1: Add pending index - await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1); - expect(await taggingStore.getLastUsedIndex(secret1)).toBe(3); - expect(await taggingStore.getLastFinalizedIndex(secret1)).toBeUndefined(); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1, 'test'); + expect(await taggingStore.getLastUsedIndex(secret1, 'test')).toBe(3); + expect(await taggingStore.getLastFinalizedIndex(secret1, 'test')).toBeUndefined(); // Step 2: Finalize the index - await taggingStore.finalizePendingIndexes([txHash1]); - expect(await taggingStore.getLastUsedIndex(secret1)).toBe(3); - expect(await taggingStore.getLastFinalizedIndex(secret1)).toBe(3); + await taggingStore.finalizePendingIndexes([txHash1], 'test'); + expect(await taggingStore.getLastUsedIndex(secret1, 'test')).toBe(3); + expect(await taggingStore.getLastFinalizedIndex(secret1, 'test')).toBe(3); // Step 3: Add a new higher pending index - await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash2); - expect(await taggingStore.getLastUsedIndex(secret1)).toBe(7); - expect(await taggingStore.getLastFinalizedIndex(secret1)).toBe(3); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash2, 'test'); + expect(await taggingStore.getLastUsedIndex(secret1, 'test')).toBe(7); + expect(await taggingStore.getLastFinalizedIndex(secret1, 'test')).toBe(3); // Step 4: Finalize the new index - await taggingStore.finalizePendingIndexes([txHash2]); - expect(await taggingStore.getLastUsedIndex(secret1)).toBe(7); - expect(await taggingStore.getLastFinalizedIndex(secret1)).toBe(7); + await taggingStore.finalizePendingIndexes([txHash2], 'test'); + expect(await taggingStore.getLastUsedIndex(secret1, 'test')).toBe(7); + expect(await taggingStore.getLastFinalizedIndex(secret1, 'test')).toBe(7); }); it('handles dropped transactions', async () => { const txHash1 = TxHash.random(); const txHash2 = TxHash.random(); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1); - await taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash2); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1, 'test'); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], txHash2, 'test'); - expect(await taggingStore.getLastUsedIndex(secret1)).toBe(5); + expect(await taggingStore.getLastUsedIndex(secret1, 'test')).toBe(5); // Drop txHash2 - await taggingStore.dropPendingIndexes([txHash2]); + await taggingStore.dropPendingIndexes([txHash2], 'test'); - expect(await taggingStore.getLastUsedIndex(secret1)).toBe(3); + expect(await taggingStore.getLastUsedIndex(secret1, 'test')).toBe(3); }); it('handles multiple secrets with different lifecycles', async () => { @@ -465,19 +468,96 @@ describe('SenderTaggingStore', () => { const txHash3 = TxHash.random(); // Secret1: pending -> finalized - await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1); - await taggingStore.finalizePendingIndexes([txHash1]); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1, 'test'); + await taggingStore.finalizePendingIndexes([txHash1], 'test'); // Secret2: pending (not finalized) - await taggingStore.storePendingIndexes([{ secret: secret2, index: 5 }], txHash2); + await taggingStore.storePendingIndexes([{ secret: secret2, index: 5 }], txHash2, 'test'); // Secret1: new pending - await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash3); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash3, 'test'); - expect(await taggingStore.getLastFinalizedIndex(secret1)).toBe(3); - expect(await taggingStore.getLastUsedIndex(secret1)).toBe(7); - expect(await taggingStore.getLastFinalizedIndex(secret2)).toBeUndefined(); - expect(await taggingStore.getLastUsedIndex(secret2)).toBe(5); + expect(await taggingStore.getLastFinalizedIndex(secret1, 'test')).toBe(3); + expect(await taggingStore.getLastUsedIndex(secret1, 'test')).toBe(7); + expect(await taggingStore.getLastFinalizedIndex(secret2, 'test')).toBeUndefined(); + expect(await taggingStore.getLastUsedIndex(secret2, 'test')).toBe(5); + }); + }); + + describe('staged writes', () => { + it('writes of uncommitted jobs are not visible outside the job that makes them', async () => { + const committedTxHash = TxHash.random(); + { + const commitJobId: string = 'commit-job'; + await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], committedTxHash, commitJobId); + await taggingStore.commit(commitJobId); + } + + const stagedTxHash = TxHash.random(); + const stagingJobId: string = 'staging-job'; + await taggingStore.storePendingIndexes([{ secret: secret1, index: 5 }], stagedTxHash, stagingJobId); + + // For a job without any staged data we should only get committed data + const txHashesWithoutJobId = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10, 'no-data-job'); + expect(txHashesWithoutJobId).toHaveLength(1); + expect(txHashesWithoutJobId[0]).toEqual(committedTxHash); + + // With stagingJobId, should get both committed and staged data + const txHashesWithJobId = await taggingStore.getTxHashesOfPendingIndexes(secret1, 0, 10, stagingJobId); + expect(txHashesWithJobId).toHaveLength(2); + expect(txHashesWithJobId).toContainEqual(committedTxHash); + expect(txHashesWithJobId).toContainEqual(stagedTxHash); + }); + + it('job staged data is correctly isolated when storing and finalizing pending indexes', async () => { + const txHash1 = TxHash.random(); + { + const commitJobId: string = 'commit-job'; + await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash1, commitJobId); + await taggingStore.finalizePendingIndexes([txHash1], commitJobId); + await taggingStore.commit(commitJobId); + } + + const txHash2 = TxHash.random(); + const stagingJobId: string = 'staging-job'; + + // Stage a higher finalized index (not committed) + await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash2, stagingJobId); + await taggingStore.finalizePendingIndexes([txHash2], stagingJobId); + + // With a different jobId, should get the committed finalized index + expect(await taggingStore.getLastFinalizedIndex(secret1, 'no-data-job')).toBe(3); + + // With stagingJobId, should get the staged finalized index + expect(await taggingStore.getLastFinalizedIndex(secret1, stagingJobId)).toBe(7); + }); + + it('discardStaged removes staged data without affecting persistent storage', async () => { + { + const txHash1 = TxHash.random(); + const txHash2 = TxHash.random(); + const commitJobId: string = 'commit-job'; + await taggingStore.storePendingIndexes([{ secret: secret1, index: 2 }], txHash1, commitJobId); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 3 }], txHash2, commitJobId); + await taggingStore.finalizePendingIndexes([txHash1], commitJobId); + await taggingStore.commit(commitJobId); + } + + const stagingJobId: string = 'staging-job'; + { + const txHash3 = TxHash.random(); + await taggingStore.storePendingIndexes([{ secret: secret1, index: 7 }], txHash3, stagingJobId); + await taggingStore.finalizePendingIndexes([txHash3], stagingJobId); + await taggingStore.discardStaged(stagingJobId); + } + + // Should still get the committed finalized index + expect(await taggingStore.getLastUsedIndex(secret1, 'no-data-job')).toBe(3); + expect(await taggingStore.getLastFinalizedIndex(secret1, 'no-data-job')).toBe(2); + + // With stagingJobId should fall back to committed since staging was discarded + expect(await taggingStore.getLastUsedIndex(secret1, stagingJobId)).toBe(3); + expect(await taggingStore.getLastFinalizedIndex(secret1, stagingJobId)).toBe(2); }); }); }); diff --git a/yarn-project/pxe/src/storage/tagging_store/sender_tagging_store.ts b/yarn-project/pxe/src/storage/tagging_store/sender_tagging_store.ts index d95f1491adc1..86224213231d 100644 --- a/yarn-project/pxe/src/storage/tagging_store/sender_tagging_store.ts +++ b/yarn-project/pxe/src/storage/tagging_store/sender_tagging_store.ts @@ -3,14 +3,17 @@ import type { AztecAsyncKVStore, AztecAsyncMap } from '@aztec/kv-store'; import type { DirectionalAppTaggingSecret, PreTag } from '@aztec/stdlib/logs'; import { TxHash } from '@aztec/stdlib/tx'; +import type { StagedStore } from '../../job_coordinator/job_coordinator.js'; import { UNFINALIZED_TAGGING_INDEXES_WINDOW_LEN } from '../../tagging/constants.js'; /** * Data provider of tagging data used when syncing the sender tagging indexes. The recipient counterpart of this class - * is called RecipientTaggingStore. We have the providers separate for the sender and recipient because + * is called RecipientTaggingStore. We have the data stores separate for sender and recipient because * the algorithms are completely disjoint and there is not data reuse between the two. */ -export class SenderTaggingStore { +export class SenderTaggingStore implements StagedStore { + readonly storeName = 'sender_tagging'; + #store: AztecAsyncKVStore; // Stores the pending indexes for each directional app tagging secret. Pending here means that the tx that contained @@ -21,18 +24,114 @@ export class SenderTaggingStore { // the smaller ones are irrelevant due to tx atomicity. // // TODO(#17615): This assumes no logs are used in the non-revertible phase. + // + // directional app tagging secret => { pending index, txHash }[] #pendingIndexes: AztecAsyncMap; + // jobId => directional app tagging secret => { pending index, txHash }[] + #pendingIndexesForJob: Map>; + // Stores the last (highest) finalized index for each directional app tagging secret. We care only about the last // index because unlike the pending indexes, it will never happen that a finalized index would be removed and hence // we don't need to store the history. + // + // directional app tagging secret => highest finalized index #lastFinalizedIndexes: AztecAsyncMap; + // jobId => directional app tagging secret => highest finalized index + #lastFinalizedIndexesForJob: Map>; + constructor(store: AztecAsyncKVStore) { this.#store = store; this.#pendingIndexes = this.#store.openMap('pending_indexes'); this.#lastFinalizedIndexes = this.#store.openMap('last_finalized_indexes'); + + this.#pendingIndexesForJob = new Map(); + this.#lastFinalizedIndexesForJob = new Map(); + } + + #getPendingIndexesForJob(jobId: string): Map { + let pendingIndexesForJob = this.#pendingIndexesForJob.get(jobId); + if (!pendingIndexesForJob) { + pendingIndexesForJob = new Map(); + this.#pendingIndexesForJob.set(jobId, pendingIndexesForJob); + } + return pendingIndexesForJob; + } + + #getLastFinalizedIndexesForJob(jobId: string): Map { + let jobStagedLastFinalizedIndexes = this.#lastFinalizedIndexesForJob.get(jobId); + if (!jobStagedLastFinalizedIndexes) { + jobStagedLastFinalizedIndexes = new Map(); + this.#lastFinalizedIndexesForJob.set(jobId, jobStagedLastFinalizedIndexes); + } + return jobStagedLastFinalizedIndexes; + } + + async #readPendingIndexes(jobId: string, secret: string): Promise<{ index: number; txHash: string }[]> { + const jobStagedPendingIndexes = this.#getPendingIndexesForJob(jobId); + const pendingIndexes = jobStagedPendingIndexes.get(secret); + // We return the staged version of this if it exists, if not, we read from the DB. + // If the DB also has nothing, we return an empty array []. + return pendingIndexes !== undefined ? pendingIndexes : ((await this.#pendingIndexes.getAsync(secret)) ?? []); + } + + #writePendingIndexes(jobId: string, secret: string, pendingIndexes: { index: number; txHash: string }[]) { + this.#getPendingIndexesForJob(jobId).set(secret, pendingIndexes); + } + + /** + * Returns a job view of all the secrets that have a corresponding list of pending indexes either in persistent + * storage or the current job. + */ + async #readSecretsWithPendingIndexes(jobId: string): Promise { + const storedSecrets = new Set(await toArray(this.#pendingIndexes.keysAsync())); + const stagedSecrets = this.#getPendingIndexesForJob(jobId).keys(); + return [...storedSecrets.union(new Set(stagedSecrets))]; + } + + async #readLastFinalizedIndex(jobId: string, secret: string): Promise { + return ( + this.#getLastFinalizedIndexesForJob(jobId).get(secret) ?? (await this.#lastFinalizedIndexes.getAsync(secret)) + ); + } + + #writeLastFinalizedIndex(jobId: string, secret: string, lastFinalizedIndex: number) { + this.#getLastFinalizedIndexesForJob(jobId).set(secret, lastFinalizedIndex); + } + + /** + * Writes all job-specific in-memory data to persistent storage. + * + * @remark This method must run in a DB transaction context. It's designed to be called from JobCoordinator#commitJob. + */ + async commit(jobId: string): Promise { + const pendingIndexesForJob = this.#pendingIndexesForJob.get(jobId); + if (pendingIndexesForJob) { + for (const [secret, pendingIndexes] of pendingIndexesForJob.entries()) { + if (pendingIndexes.length === 0) { + await this.#pendingIndexes.delete(secret); + } else { + await this.#pendingIndexes.set(secret, pendingIndexes); + } + } + } + + const lastFinalizedIndexesForJob = this.#lastFinalizedIndexesForJob.get(jobId); + if (lastFinalizedIndexesForJob) { + for (const [secret, lastFinalizedIndex] of lastFinalizedIndexesForJob.entries()) { + await this.#lastFinalizedIndexes.set(secret, lastFinalizedIndex); + } + } + + return this.discardStaged(jobId); + } + + discardStaged(jobId: string): Promise { + this.#pendingIndexesForJob.delete(jobId); + this.#lastFinalizedIndexesForJob.delete(jobId); + return Promise.resolve(); } /** @@ -43,6 +142,7 @@ export class SenderTaggingStore { * @param preTags - The pre-tags containing the directional app tagging secrets and the indexes that are to be * stored in the db. * @param txHash - The tx in which the pretags were used in private logs. + * @param jobId - job context for staged writes to this store. See `JobCoordinator` for more details. * @throws If any two pre-tags contain the same directional app tagging secret. This is enforced because we care * only about the highest index for a given secret that was used in the tx. Hence this check is a good way to catch * bugs. @@ -56,7 +156,7 @@ export class SenderTaggingStore { * This is enforced because this should never happen if the syncing is done correctly as we look for logs from higher * indexes than finalized ones. */ - async storePendingIndexes(preTags: PreTag[], txHash: TxHash) { + async storePendingIndexes(preTags: PreTag[], txHash: TxHash, jobId: string) { // The secrets in pre-tags should be unique because we always store just the highest index per given secret-txHash // pair. Below we check that this is the case. const secretsSet = new Set(preTags.map(preTag => preTag.secret.toString())); @@ -67,7 +167,7 @@ export class SenderTaggingStore { for (const { secret, index } of preTags) { // First we check that for any secret the highest used index in tx is not further than window length from // the highest finalized index. - const finalizedIndex = (await this.getLastFinalizedIndex(secret)) ?? 0; + const finalizedIndex = (await this.getLastFinalizedIndex(secret, jobId)) ?? 0; if (index > finalizedIndex + UNFINALIZED_TAGGING_INDEXES_WINDOW_LEN) { throw new Error( `Highest used index ${index} is further than window length from the highest finalized index ${finalizedIndex}. @@ -78,7 +178,7 @@ export class SenderTaggingStore { // Throw if the new pending index is lower than or equal to the last finalized index const secretStr = secret.toString(); - const lastFinalizedIndex = await this.#lastFinalizedIndexes.getAsync(secretStr); + const lastFinalizedIndex = await this.#readLastFinalizedIndex(jobId, secretStr); if (lastFinalizedIndex !== undefined && index <= lastFinalizedIndex) { throw new Error( `Cannot store pending index ${index} for secret ${secretStr}: ` + @@ -88,7 +188,7 @@ export class SenderTaggingStore { // Check if this secret + txHash combination already exists const txHashStr = txHash.toString(); - const existingForSecret = (await this.#pendingIndexes.getAsync(secretStr)) ?? []; + const existingForSecret = await this.#readPendingIndexes(jobId, secretStr); const existingForSecretAndTx = existingForSecret.find(entry => entry.txHash === txHashStr); if (existingForSecretAndTx) { @@ -102,7 +202,7 @@ export class SenderTaggingStore { // If it exists with the same index, ignore the update (no-op) } else { // If it doesn't exist, add it - await this.#pendingIndexes.set(secretStr, [...existingForSecret, { index, txHash: txHashStr }]); + this.#writePendingIndexes(jobId, secretStr, [...existingForSecret, { index, txHash: txHashStr }]); } } } @@ -120,8 +220,9 @@ export class SenderTaggingStore { secret: DirectionalAppTaggingSecret, startIndex: number, endIndex: number, + jobId: string, ): Promise { - const existing = (await this.#pendingIndexes.getAsync(secret.toString())) ?? []; + const existing = await this.#readPendingIndexes(jobId, secret.toString()); const txHashes = existing .filter(entry => entry.index >= startIndex && entry.index < endIndex) .map(entry => entry.txHash); @@ -133,8 +234,8 @@ export class SenderTaggingStore { * @param secret - The secret to get the last finalized index for. * @returns The last (highest) finalized index for the given secret. */ - getLastFinalizedIndex(secret: DirectionalAppTaggingSecret): Promise { - return this.#lastFinalizedIndexes.getAsync(secret.toString()); + getLastFinalizedIndex(secret: DirectionalAppTaggingSecret, jobId: string): Promise { + return this.#readLastFinalizedIndex(jobId, secret.toString()); } /** @@ -143,13 +244,13 @@ export class SenderTaggingStore { * @param secret - The directional app tagging secret to query the last used index for. * @returns The last used index. */ - async getLastUsedIndex(secret: DirectionalAppTaggingSecret): Promise { + async getLastUsedIndex(secret: DirectionalAppTaggingSecret, jobId: string): Promise { const secretStr = secret.toString(); - const pendingTxScopedIndexes = (await this.#pendingIndexes.getAsync(secretStr)) ?? []; + const pendingTxScopedIndexes = await this.#readPendingIndexes(jobId, secretStr); const pendingIndexes = pendingTxScopedIndexes.map(entry => entry.index); if (pendingTxScopedIndexes.length === 0) { - return this.#lastFinalizedIndexes.getAsync(secretStr); + return this.#readLastFinalizedIndex(jobId, secretStr); } // As the last used index we return the highest one from the pending indexes. Note that this value will be always @@ -160,23 +261,23 @@ export class SenderTaggingStore { /** * Drops all pending indexes corresponding to the given transaction hashes. */ - async dropPendingIndexes(txHashes: TxHash[]) { + async dropPendingIndexes(txHashes: TxHash[], jobId: string) { if (txHashes.length === 0) { return; } - const txHashStrs = new Set(txHashes.map(txHash => txHash.toString())); - const allSecrets = await toArray(this.#pendingIndexes.keysAsync()); + const txHashStrings = new Set(txHashes.map(txHash => txHash.toString())); + const allSecrets = await this.#readSecretsWithPendingIndexes(jobId); for (const secret of allSecrets) { - const pendingData = await this.#pendingIndexes.getAsync(secret); + const pendingData = await this.#readPendingIndexes(jobId, secret); if (pendingData) { - const filtered = pendingData.filter(item => !txHashStrs.has(item.txHash)); + const filtered = pendingData.filter(item => !txHashStrings.has(item.txHash)); if (filtered.length === 0) { - await this.#pendingIndexes.delete(secret); + this.#writePendingIndexes(jobId, secret, []); } else if (filtered.length !== pendingData.length) { // Some items were filtered out, so update the pending data - await this.#pendingIndexes.set(secret, filtered); + this.#writePendingIndexes(jobId, secret, filtered); } // else: No items were filtered out (txHashes not found for this secret) --> no-op } @@ -187,7 +288,7 @@ export class SenderTaggingStore { * Updates pending indexes corresponding to the given transaction hashes to be finalized and prunes any lower pending * indexes. */ - async finalizePendingIndexes(txHashes: TxHash[]) { + async finalizePendingIndexes(txHashes: TxHash[], jobId: string) { if (txHashes.length === 0) { return; } @@ -195,10 +296,10 @@ export class SenderTaggingStore { for (const txHash of txHashes) { const txHashStr = txHash.toString(); - const allSecrets = await toArray(this.#pendingIndexes.keysAsync()); + const allSecrets = await this.#readSecretsWithPendingIndexes(jobId); for (const secret of allSecrets) { - const pendingData = await this.#pendingIndexes.getAsync(secret); + const pendingData = await this.#readPendingIndexes(jobId, secret); if (!pendingData) { continue; } @@ -214,7 +315,7 @@ export class SenderTaggingStore { throw new Error(`Multiple pending indexes found for tx hash ${txHashStr} and secret ${secret}`); } - let lastFinalized = await this.#lastFinalizedIndexes.getAsync(secret); + let lastFinalized = await this.#readLastFinalizedIndex(jobId, secret); const newFinalized = matchingIndexes[0]; if (newFinalized < (lastFinalized ?? 0)) { @@ -225,7 +326,7 @@ export class SenderTaggingStore { ); } - await this.#lastFinalizedIndexes.set(secret, newFinalized); + this.#writeLastFinalizedIndex(jobId, secret, newFinalized); lastFinalized = newFinalized; // When we add pending indexes, we ensure they are higher than the last finalized index. However, because we @@ -234,9 +335,9 @@ export class SenderTaggingStore { // outdated pending indexes. const remainingItemsOfHigherIndex = pendingData.filter(item => item.index > (lastFinalized ?? 0)); if (remainingItemsOfHigherIndex.length === 0) { - await this.#pendingIndexes.delete(secret); + this.#writePendingIndexes(jobId, secret, []); } else { - await this.#pendingIndexes.set(secret, remainingItemsOfHigherIndex); + this.#writePendingIndexes(jobId, secret, remainingItemsOfHigherIndex); } } } diff --git a/yarn-project/pxe/src/tagging/recipient_sync/load_private_logs_for_sender_recipient_pair.test.ts b/yarn-project/pxe/src/tagging/recipient_sync/load_private_logs_for_sender_recipient_pair.test.ts index 37e28eed71a6..fbd936187ded 100644 --- a/yarn-project/pxe/src/tagging/recipient_sync/load_private_logs_for_sender_recipient_pair.test.ts +++ b/yarn-project/pxe/src/tagging/recipient_sync/load_private_logs_for_sender_recipient_pair.test.ts @@ -64,11 +64,12 @@ describe('loadPrivateLogsForSenderRecipientPair', () => { aztecNode, taggingStore, NON_INTERFERING_ANCHOR_BLOCK_NUMBER, + 'test', ); expect(logs).toHaveLength(0); - expect(await taggingStore.getHighestAgedIndex(secret)).toBeUndefined(); - expect(await taggingStore.getHighestFinalizedIndex(secret)).toBeUndefined(); + expect(await taggingStore.getHighestAgedIndex(secret, 'test')).toBeUndefined(); + expect(await taggingStore.getHighestFinalizedIndex(secret, 'test')).toBeUndefined(); }); it('loads log and updates highest finalized index but not highest aged index', async () => { @@ -97,11 +98,12 @@ describe('loadPrivateLogsForSenderRecipientPair', () => { aztecNode, taggingStore, NON_INTERFERING_ANCHOR_BLOCK_NUMBER, + 'test', ); expect(logs).toHaveLength(1); - expect(await taggingStore.getHighestFinalizedIndex(secret)).toBe(logIndex); - expect(await taggingStore.getHighestAgedIndex(secret)).toBeUndefined(); + expect(await taggingStore.getHighestFinalizedIndex(secret, 'test')).toBe(logIndex); + expect(await taggingStore.getHighestAgedIndex(secret, 'test')).toBeUndefined(); }); it('loads log and updates both highest aged and highest finalized indexes', async () => { @@ -130,11 +132,12 @@ describe('loadPrivateLogsForSenderRecipientPair', () => { aztecNode, taggingStore, NON_INTERFERING_ANCHOR_BLOCK_NUMBER, + 'test', ); expect(logs).toHaveLength(1); - expect(await taggingStore.getHighestAgedIndex(secret)).toBe(logIndex); - expect(await taggingStore.getHighestFinalizedIndex(secret)).toBe(logIndex); + expect(await taggingStore.getHighestAgedIndex(secret, 'test')).toBe(logIndex); + expect(await taggingStore.getHighestFinalizedIndex(secret, 'test')).toBe(logIndex); }); it('logs at boundaries are properly loaded, window and highest indexes advance as expected', async () => { @@ -150,8 +153,8 @@ describe('loadPrivateLogsForSenderRecipientPair', () => { const log2Tag = await computeSiloedTagForIndex(log2Index); // Set existing highest aged index and highest finalized index - await taggingStore.updateHighestAgedIndex(secret, highestAgedIndex); - await taggingStore.updateHighestFinalizedIndex(secret, highestFinalizedIndex); + await taggingStore.updateHighestAgedIndex(secret, highestAgedIndex, 'test'); + await taggingStore.updateHighestFinalizedIndex(secret, highestFinalizedIndex, 'test'); aztecNode.getL2Tips.mockResolvedValue(makeL2Tips(finalizedBlockNumber)); @@ -180,12 +183,13 @@ describe('loadPrivateLogsForSenderRecipientPair', () => { aztecNode, taggingStore, NON_INTERFERING_ANCHOR_BLOCK_NUMBER, + 'test', ); // Verify that both logs at the boundaries of the range were found and processed expect(logs).toHaveLength(2); - expect(await taggingStore.getHighestFinalizedIndex(secret)).toBe(log2Index); - expect(await taggingStore.getHighestAgedIndex(secret)).toBe(log1Index); + expect(await taggingStore.getHighestFinalizedIndex(secret, 'test')).toBe(log2Index); + expect(await taggingStore.getHighestAgedIndex(secret, 'test')).toBe(log1Index); // Verify that the window was moved forward correctly // Total range queried: from (highestAgedIndex + 1) to (log2Index + WINDOW_LEN + 1) exclusive diff --git a/yarn-project/pxe/src/tagging/recipient_sync/load_private_logs_for_sender_recipient_pair.ts b/yarn-project/pxe/src/tagging/recipient_sync/load_private_logs_for_sender_recipient_pair.ts index fab2cdf83a70..3ffb7988a6ee 100644 --- a/yarn-project/pxe/src/tagging/recipient_sync/load_private_logs_for_sender_recipient_pair.ts +++ b/yarn-project/pxe/src/tagging/recipient_sync/load_private_logs_for_sender_recipient_pair.ts @@ -21,6 +21,7 @@ export async function loadPrivateLogsForSenderRecipientPair( aztecNode: AztecNode, taggingStore: RecipientTaggingStore, anchorBlockNumber: BlockNumber, + jobId: string, ): Promise { // # Explanation of how the algorithm works // When we perform the sync we will look at logs that correspond to the tagging index range @@ -76,8 +77,8 @@ export async function loadPrivateLogsForSenderRecipientPair( let start: number, end: number; { - const currentHighestAgedIndex = await taggingStore.getHighestAgedIndex(secret); - const currentHighestFinalizedIndex = await taggingStore.getHighestFinalizedIndex(secret); + const currentHighestAgedIndex = await taggingStore.getHighestAgedIndex(secret, jobId); + const currentHighestFinalizedIndex = await taggingStore.getHighestFinalizedIndex(secret, jobId); // We don't want to include the highest aged index so we start from `currentHighestAgedIndex + 1` (or 0 if not set) start = currentHighestAgedIndex === undefined ? 0 : currentHighestAgedIndex + 1; @@ -107,7 +108,7 @@ export async function loadPrivateLogsForSenderRecipientPair( // Store updates in data provider and update local variables if (highestAgedIndex !== undefined) { - await taggingStore.updateHighestAgedIndex(secret, highestAgedIndex); + await taggingStore.updateHighestAgedIndex(secret, highestAgedIndex, jobId); } if (highestFinalizedIndex === undefined) { @@ -120,7 +121,7 @@ export async function loadPrivateLogsForSenderRecipientPair( throw new Error('Highest aged index lower than highest finalized index invariant violated'); } - await taggingStore.updateHighestFinalizedIndex(secret, highestFinalizedIndex); + await taggingStore.updateHighestFinalizedIndex(secret, highestFinalizedIndex, jobId); // For the next iteration we want to look only at indexes for which we have not attempted to load logs yet while // ensuring that we do not look further than WINDOW_LEN ahead of the highest finalized index. diff --git a/yarn-project/pxe/src/tagging/sender_sync/sync_sender_tagging_indexes.test.ts b/yarn-project/pxe/src/tagging/sender_sync/sync_sender_tagging_indexes.test.ts index ed4f2d5a4109..0d8051e9469f 100644 --- a/yarn-project/pxe/src/tagging/sender_sync/sync_sender_tagging_indexes.test.ts +++ b/yarn-project/pxe/src/tagging/sender_sync/sync_sender_tagging_indexes.test.ts @@ -44,11 +44,11 @@ describe('syncSenderTaggingIndexes', () => { return Promise.resolve(tags.map((_tag: SiloedTag) => [])); }); - await syncSenderTaggingIndexes(secret, contractAddress, aztecNode, taggingStore); + await syncSenderTaggingIndexes(secret, contractAddress, aztecNode, taggingStore, 'test'); // Highest used and finalized indexes should stay undefined - expect(await taggingStore.getLastUsedIndex(secret)).toBeUndefined(); - expect(await taggingStore.getLastFinalizedIndex(secret)).toBeUndefined(); + expect(await taggingStore.getLastUsedIndex(secret, 'test')).toBeUndefined(); + expect(await taggingStore.getLastFinalizedIndex(secret, 'test')).toBeUndefined(); }); // These tests need to be run together in sequence. @@ -84,13 +84,13 @@ describe('syncSenderTaggingIndexes', () => { // Mock getL2Tips to return a finalized block number >= the tx block number aztecNode.getL2Tips.mockResolvedValue(makeL2Tips(finalizedBlockNumberStep1)); - await syncSenderTaggingIndexes(secret, contractAddress, aztecNode, taggingStore); + await syncSenderTaggingIndexes(secret, contractAddress, aztecNode, taggingStore, 'test'); // Verify the highest finalized index is updated to 3 - expect(await taggingStore.getLastFinalizedIndex(secret)).toBe(finalizedIndexStep1); + expect(await taggingStore.getLastFinalizedIndex(secret, 'test')).toBe(finalizedIndexStep1); // Verify the highest used index also returns 3 (when there is no higher pending index the highest used index is // the highest finalized index). - expect(await taggingStore.getLastUsedIndex(secret)).toBe(finalizedIndexStep1); + expect(await taggingStore.getLastUsedIndex(secret, 'test')).toBe(finalizedIndexStep1); }); it('step 2: pending log is synced', async () => { @@ -111,12 +111,12 @@ describe('syncSenderTaggingIndexes', () => { aztecNode.getL2Tips.mockResolvedValue(makeL2Tips(finalizedBlockNumberStep1)); - await syncSenderTaggingIndexes(secret, contractAddress, aztecNode, taggingStore); + await syncSenderTaggingIndexes(secret, contractAddress, aztecNode, taggingStore, 'test'); // Verify the highest finalized index was not updated - expect(await taggingStore.getLastFinalizedIndex(secret)).toBe(finalizedIndexStep1); + expect(await taggingStore.getLastFinalizedIndex(secret, 'test')).toBe(finalizedIndexStep1); // Verify the highest used index was updated to the pending index - expect(await taggingStore.getLastUsedIndex(secret)).toBe(pendingIndexStep2); + expect(await taggingStore.getLastUsedIndex(secret, 'test')).toBe(pendingIndexStep2); }); it('step 3: syncs logs across 2 windows', async () => { @@ -178,10 +178,10 @@ describe('syncSenderTaggingIndexes', () => { // Mock getL2Tips with the new finalized block number aztecNode.getL2Tips.mockResolvedValue(makeL2Tips(newFinalizedBlockNumber)); - await syncSenderTaggingIndexes(secret, contractAddress, aztecNode, taggingStore); + await syncSenderTaggingIndexes(secret, contractAddress, aztecNode, taggingStore, 'test'); - expect(await taggingStore.getLastFinalizedIndex(secret)).toBe(newHighestFinalizedIndex); - expect(await taggingStore.getLastUsedIndex(secret)).toBe(newHighestUsedIndex); + expect(await taggingStore.getLastFinalizedIndex(secret, 'test')).toBe(newHighestFinalizedIndex); + expect(await taggingStore.getLastUsedIndex(secret, 'test')).toBe(newHighestUsedIndex); }); }); @@ -230,10 +230,10 @@ describe('syncSenderTaggingIndexes', () => { aztecNode.getL2Tips.mockResolvedValue(makeL2Tips(finalizedBlockNumber)); // Sync tagged logs - await syncSenderTaggingIndexes(secret, contractAddress, aztecNode, taggingStore); + await syncSenderTaggingIndexes(secret, contractAddress, aztecNode, taggingStore, 'test'); // Verify that both highest finalized and highest used were set to the pending and finalized index - expect(await taggingStore.getLastFinalizedIndex(secret)).toBe(pendingAndFinalizedIndex); - expect(await taggingStore.getLastUsedIndex(secret)).toBe(pendingAndFinalizedIndex); + expect(await taggingStore.getLastFinalizedIndex(secret, 'test')).toBe(pendingAndFinalizedIndex); + expect(await taggingStore.getLastUsedIndex(secret, 'test')).toBe(pendingAndFinalizedIndex); }); }); diff --git a/yarn-project/pxe/src/tagging/sender_sync/sync_sender_tagging_indexes.ts b/yarn-project/pxe/src/tagging/sender_sync/sync_sender_tagging_indexes.ts index ed90b0bdc1cd..7fdfb5e0b590 100644 --- a/yarn-project/pxe/src/tagging/sender_sync/sync_sender_tagging_indexes.ts +++ b/yarn-project/pxe/src/tagging/sender_sync/sync_sender_tagging_indexes.ts @@ -26,6 +26,7 @@ export async function syncSenderTaggingIndexes( app: AztecAddress, aztecNode: AztecNode, taggingStore: SenderTaggingStore, + jobId: string, ): Promise { // # Explanation of how syncing works // @@ -45,7 +46,7 @@ export async function syncSenderTaggingIndexes( // Each window advance requires two queries (logs + tx status). For example, syncing indexes 0–500 with a window of // 100 takes at least 10 round trips (5 windows × 2 queries). - const finalizedIndex = await taggingStore.getLastFinalizedIndex(secret); + const finalizedIndex = await taggingStore.getLastFinalizedIndex(secret, jobId); let start = finalizedIndex === undefined ? 0 : finalizedIndex + 1; let end = start + UNFINALIZED_TAGGING_INDEXES_WINDOW_LEN; @@ -56,21 +57,21 @@ export async function syncSenderTaggingIndexes( while (true) { // Load and store indexes for the current window. These indexes may already exist in the database if txs using // them were previously sent from this PXE. Any duplicates are handled by the tagging data provider. - await loadAndStoreNewTaggingIndexes(secret, app, start, end, aztecNode, taggingStore); + await loadAndStoreNewTaggingIndexes(secret, app, start, end, aztecNode, taggingStore, jobId); // Retrieve all indexes within the current window from storage and update their status accordingly. - const pendingTxHashes = await taggingStore.getTxHashesOfPendingIndexes(secret, start, end); + const pendingTxHashes = await taggingStore.getTxHashesOfPendingIndexes(secret, start, end, jobId); if (pendingTxHashes.length === 0) { break; } const { txHashesToFinalize, txHashesToDrop } = await getStatusChangeOfPending(pendingTxHashes, aztecNode); - await taggingStore.dropPendingIndexes(txHashesToDrop); - await taggingStore.finalizePendingIndexes(txHashesToFinalize); + await taggingStore.dropPendingIndexes(txHashesToDrop, jobId); + await taggingStore.finalizePendingIndexes(txHashesToFinalize, jobId); // We check if the finalized index has been updated. - newFinalizedIndex = await taggingStore.getLastFinalizedIndex(secret); + newFinalizedIndex = await taggingStore.getLastFinalizedIndex(secret, jobId); if (previousFinalizedIndex !== newFinalizedIndex) { // A new finalized index was found, so we'll run the loop again. For example: // - Previous finalized index: 10 diff --git a/yarn-project/pxe/src/tagging/sender_sync/utils/load_and_store_new_tagging_indexes.test.ts b/yarn-project/pxe/src/tagging/sender_sync/utils/load_and_store_new_tagging_indexes.test.ts index 8139e8ce721f..6565e0720e53 100644 --- a/yarn-project/pxe/src/tagging/sender_sync/utils/load_and_store_new_tagging_indexes.test.ts +++ b/yarn-project/pxe/src/tagging/sender_sync/utils/load_and_store_new_tagging_indexes.test.ts @@ -46,14 +46,14 @@ describe('loadAndStoreNewTaggingIndexes', () => { return Promise.resolve(tags.map((_tag: SiloedTag) => [])); }); - await loadAndStoreNewTaggingIndexes(secret, app, 0, 10, aztecNode, taggingStore); + await loadAndStoreNewTaggingIndexes(secret, app, 0, 10, aztecNode, taggingStore, 'test'); // Verify that no pending indexes were stored - expect(await taggingStore.getLastUsedIndex(secret)).toBeUndefined(); - expect(await taggingStore.getLastFinalizedIndex(secret)).toBeUndefined(); + expect(await taggingStore.getLastUsedIndex(secret, 'test')).toBeUndefined(); + expect(await taggingStore.getLastFinalizedIndex(secret, 'test')).toBeUndefined(); // Verify the entire window has no pending tx hashes - const txHashesInWindow = await taggingStore.getTxHashesOfPendingIndexes(secret, 0, 10); + const txHashesInWindow = await taggingStore.getTxHashesOfPendingIndexes(secret, 0, 10, 'test'); expect(txHashesInWindow).toHaveLength(0); }); @@ -66,15 +66,15 @@ describe('loadAndStoreNewTaggingIndexes', () => { return Promise.resolve(tags.map((t: SiloedTag) => (t.equals(tag) ? [makeLog(txHash, tag.value)] : []))); }); - await loadAndStoreNewTaggingIndexes(secret, app, 0, 10, aztecNode, taggingStore); + await loadAndStoreNewTaggingIndexes(secret, app, 0, 10, aztecNode, taggingStore, 'test'); // Verify that the pending index was stored for this txHash - const txHashesInRange = await taggingStore.getTxHashesOfPendingIndexes(secret, index, index + 1); + const txHashesInRange = await taggingStore.getTxHashesOfPendingIndexes(secret, index, index + 1, 'test'); expect(txHashesInRange).toHaveLength(1); expect(txHashesInRange[0].equals(txHash)).toBe(true); // Verify the last used index is correct - expect(await taggingStore.getLastUsedIndex(secret)).toBe(index); + expect(await taggingStore.getLastUsedIndex(secret, 'test')).toBe(index); }); it('for multiple logs with same txHash stores the highest index', async () => { @@ -97,19 +97,19 @@ describe('loadAndStoreNewTaggingIndexes', () => { ); }); - await loadAndStoreNewTaggingIndexes(secret, app, 0, 10, aztecNode, taggingStore); + await loadAndStoreNewTaggingIndexes(secret, app, 0, 10, aztecNode, taggingStore, 'test'); // Verify that only the highest index (7) was stored for this txHash and secret - const txHashesAtIndex2 = await taggingStore.getTxHashesOfPendingIndexes(secret, index2, index2 + 1); + const txHashesAtIndex2 = await taggingStore.getTxHashesOfPendingIndexes(secret, index2, index2 + 1, 'test'); expect(txHashesAtIndex2).toHaveLength(1); expect(txHashesAtIndex2[0].equals(txHash)).toBe(true); // Verify the lower index is not stored separately - const txHashesAtIndex1 = await taggingStore.getTxHashesOfPendingIndexes(secret, index1, index1 + 1); + const txHashesAtIndex1 = await taggingStore.getTxHashesOfPendingIndexes(secret, index1, index1 + 1, 'test'); expect(txHashesAtIndex1).toHaveLength(0); // Verify the last used index is the highest - expect(await taggingStore.getLastUsedIndex(secret)).toBe(index2); + expect(await taggingStore.getLastUsedIndex(secret, 'test')).toBe(index2); }); it('multiple logs with different txHashes', async () => { @@ -133,19 +133,19 @@ describe('loadAndStoreNewTaggingIndexes', () => { ); }); - await loadAndStoreNewTaggingIndexes(secret, app, 0, 10, aztecNode, taggingStore); + await loadAndStoreNewTaggingIndexes(secret, app, 0, 10, aztecNode, taggingStore, 'test'); // Verify that both txHashes have their respective indexes stored - const txHashesAtIndex1 = await taggingStore.getTxHashesOfPendingIndexes(secret, index1, index1 + 1); + const txHashesAtIndex1 = await taggingStore.getTxHashesOfPendingIndexes(secret, index1, index1 + 1, 'test'); expect(txHashesAtIndex1).toHaveLength(1); expect(txHashesAtIndex1[0].equals(txHash1)).toBe(true); - const txHashesAtIndex2 = await taggingStore.getTxHashesOfPendingIndexes(secret, index2, index2 + 1); + const txHashesAtIndex2 = await taggingStore.getTxHashesOfPendingIndexes(secret, index2, index2 + 1, 'test'); expect(txHashesAtIndex2).toHaveLength(1); expect(txHashesAtIndex2[0].equals(txHash2)).toBe(true); // Verify the last used index is the highest - expect(await taggingStore.getLastUsedIndex(secret)).toBe(index2); + expect(await taggingStore.getLastUsedIndex(secret, 'test')).toBe(index2); }); // Expected to happen if sending logs from multiple PXEs at a similar time. @@ -161,17 +161,17 @@ describe('loadAndStoreNewTaggingIndexes', () => { ); }); - await loadAndStoreNewTaggingIndexes(secret, app, 0, 10, aztecNode, taggingStore); + await loadAndStoreNewTaggingIndexes(secret, app, 0, 10, aztecNode, taggingStore, 'test'); // Verify that both txHashes have the same index stored - const txHashesAtIndex = await taggingStore.getTxHashesOfPendingIndexes(secret, index, index + 1); + const txHashesAtIndex = await taggingStore.getTxHashesOfPendingIndexes(secret, index, index + 1, 'test'); expect(txHashesAtIndex).toHaveLength(2); const txHashStrings = txHashesAtIndex.map(h => h.toString()); expect(txHashStrings).toContain(txHash1.toString()); expect(txHashStrings).toContain(txHash2.toString()); // Verify the last used index is correct - expect(await taggingStore.getLastUsedIndex(secret)).toBe(index); + expect(await taggingStore.getLastUsedIndex(secret, 'test')).toBe(index); }); it('complex scenario: multiple txHashes with multiple indexes', async () => { @@ -207,29 +207,29 @@ describe('loadAndStoreNewTaggingIndexes', () => { ); }); - await loadAndStoreNewTaggingIndexes(secret, app, 0, 10, aztecNode, taggingStore); + await loadAndStoreNewTaggingIndexes(secret, app, 0, 10, aztecNode, taggingStore, 'test'); // Verify txHash1 has highest index 8 (should not be at index 1) - const txHashesAtIndex1 = await taggingStore.getTxHashesOfPendingIndexes(secret, 1, 2); + const txHashesAtIndex1 = await taggingStore.getTxHashesOfPendingIndexes(secret, 1, 2, 'test'); expect(txHashesAtIndex1).toHaveLength(0); - const txHashesAtIndex8 = await taggingStore.getTxHashesOfPendingIndexes(secret, 8, 9); + const txHashesAtIndex8 = await taggingStore.getTxHashesOfPendingIndexes(secret, 8, 9, 'test'); expect(txHashesAtIndex8).toHaveLength(1); expect(txHashesAtIndex8[0].equals(txHash1)).toBe(true); // Verify txHash2 has highest index 5 (should not be at index 3) - const txHashesAtIndex3 = await taggingStore.getTxHashesOfPendingIndexes(secret, 3, 4); + const txHashesAtIndex3 = await taggingStore.getTxHashesOfPendingIndexes(secret, 3, 4, 'test'); expect(txHashesAtIndex3).toHaveLength(0); - const txHashesAtIndex5 = await taggingStore.getTxHashesOfPendingIndexes(secret, 5, 6); + const txHashesAtIndex5 = await taggingStore.getTxHashesOfPendingIndexes(secret, 5, 6, 'test'); expect(txHashesAtIndex5).toHaveLength(1); expect(txHashesAtIndex5[0].equals(txHash2)).toBe(true); // Verify txHash3 has index 9 - const txHashesAtIndex9 = await taggingStore.getTxHashesOfPendingIndexes(secret, 9, 10); + const txHashesAtIndex9 = await taggingStore.getTxHashesOfPendingIndexes(secret, 9, 10, 'test'); expect(txHashesAtIndex9).toHaveLength(1); expect(txHashesAtIndex9[0].equals(txHash3)).toBe(true); // Verify the last used index is the highest - expect(await taggingStore.getLastUsedIndex(secret)).toBe(9); + expect(await taggingStore.getLastUsedIndex(secret, 'test')).toBe(9); }); it('start is inclusive and end is exclusive', async () => { @@ -255,18 +255,18 @@ describe('loadAndStoreNewTaggingIndexes', () => { ); }); - await loadAndStoreNewTaggingIndexes(secret, app, start, end, aztecNode, taggingStore); + await loadAndStoreNewTaggingIndexes(secret, app, start, end, aztecNode, taggingStore, 'test'); // Verify that the log at start (inclusive) was processed - const txHashesAtStart = await taggingStore.getTxHashesOfPendingIndexes(secret, start, start + 1); + const txHashesAtStart = await taggingStore.getTxHashesOfPendingIndexes(secret, start, start + 1, 'test'); expect(txHashesAtStart).toHaveLength(1); expect(txHashesAtStart[0].equals(txHashAtStart)).toBe(true); // Verify that the log at end (exclusive) was NOT processed - const txHashesAtEnd = await taggingStore.getTxHashesOfPendingIndexes(secret, end, end + 1); + const txHashesAtEnd = await taggingStore.getTxHashesOfPendingIndexes(secret, end, end + 1, 'test'); expect(txHashesAtEnd).toHaveLength(0); // Verify the last used index is the start index (since end was not processed) - expect(await taggingStore.getLastUsedIndex(secret)).toBe(start); + expect(await taggingStore.getLastUsedIndex(secret, 'test')).toBe(start); }); }); diff --git a/yarn-project/pxe/src/tagging/sender_sync/utils/load_and_store_new_tagging_indexes.ts b/yarn-project/pxe/src/tagging/sender_sync/utils/load_and_store_new_tagging_indexes.ts index 8b3a5eb7e6c8..2a8cfeb4d812 100644 --- a/yarn-project/pxe/src/tagging/sender_sync/utils/load_and_store_new_tagging_indexes.ts +++ b/yarn-project/pxe/src/tagging/sender_sync/utils/load_and_store_new_tagging_indexes.ts @@ -17,6 +17,8 @@ import type { SenderTaggingStore } from '../../../storage/tagging_store/sender_t * @param end - The ending index (exclusive) of the window to process. * @param aztecNode - The Aztec node instance to query for logs. * @param taggingStore - The data provider to store pending indexes. + * @param jobId - Job identifier, used to keep writes in-memory until they can be persisted in a data integrity + * preserving way. */ export async function loadAndStoreNewTaggingIndexes( secret: DirectionalAppTaggingSecret, @@ -25,6 +27,7 @@ export async function loadAndStoreNewTaggingIndexes( end: number, aztecNode: AztecNode, taggingStore: SenderTaggingStore, + jobId: string, ) { // We compute the tags for the current window of indexes const preTagsForWindow: PreTag[] = Array(end - start) @@ -40,7 +43,7 @@ export async function loadAndStoreNewTaggingIndexes( // Now we iterate over the map, reconstruct the preTags and tx hash and store them in the db. for (const [txHashStr, highestIndex] of highestIndexMap.entries()) { const txHash = TxHash.fromString(txHashStr); - await taggingStore.storePendingIndexes([{ secret, index: highestIndex }], txHash); + await taggingStore.storePendingIndexes([{ secret, index: highestIndex }], txHash, jobId); } }