Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -265,9 +265,9 @@ export class PrivateExecutionOracle extends UtilityExecutionOracle implements IP
// This is a tagging secret we've not yet used in this tx, so first sync our store to make sure its indices
// are up to date. We do this here because this store is not synced as part of the global sync because
// that'd be wasteful as most tagging secrets are not used in each tx.
await syncSenderTaggingIndexes(secret, this.contractAddress, this.aztecNode, this.senderTaggingStore);
await syncSenderTaggingIndexes(secret, this.contractAddress, this.aztecNode, this.senderTaggingStore, this.jobId);

const lastUsedIndex = await this.senderTaggingStore.getLastUsedIndex(secret);
const lastUsedIndex = await this.senderTaggingStore.getLastUsedIndex(secret, this.jobId);
// If lastUsedIndex is undefined, we've never used this secret, so start from 0
// Otherwise, the next index to use is one past the last used index
return lastUsedIndex === undefined ? 0 : lastUsedIndex + 1;
Expand Down
1 change: 1 addition & 0 deletions yarn-project/pxe/src/logs/log_service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ export class LogService {
this.aztecNode,
this.recipientTaggingStore,
anchorBlockNumber,
this.jobId,
),
),
);
Expand Down
4 changes: 2 additions & 2 deletions yarn-project/pxe/src/pxe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ export class PXE {
);

const jobCoordinator = new JobCoordinator(store);
jobCoordinator.registerStores([capsuleStore]);
jobCoordinator.registerStores([capsuleStore, senderTaggingStore, recipientTaggingStore]);

const debugUtils = new PXEDebugUtils(contractStore, noteStore);

Expand Down Expand Up @@ -673,7 +673,7 @@ export class PXE {
// TODO(benesjan): The following is an expensive operation. Figure out a way to avoid it.
const txHash = (await txProvingResult.toTx()).txHash;

await this.senderTaggingStore.storePendingIndexes(preTagsUsedInTheTx, txHash);
await this.senderTaggingStore.storePendingIndexes(preTagsUsedInTheTx, txHash, jobId);
this.log.debug(`Stored used pre-tags as sender for the tx`, {
preTagsUsedInTheTx,
});
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import { Fr } from '@aztec/foundation/curves/bn254';
import { openTmpStore } from '@aztec/kv-store/lmdb-v2';
import { DirectionalAppTaggingSecret } from '@aztec/stdlib/logs';

import { RecipientTaggingStore } from './recipient_tagging_store.js';

describe('RecipientTaggingStore', () => {
let taggingStore: RecipientTaggingStore;
let secret1: DirectionalAppTaggingSecret;
let secret2: DirectionalAppTaggingSecret;

beforeEach(async () => {
taggingStore = new RecipientTaggingStore(await openTmpStore('test'));
secret1 = DirectionalAppTaggingSecret.fromString(Fr.random().toString());
secret2 = DirectionalAppTaggingSecret.fromString(Fr.random().toString());
});

describe('staged writes', () => {
it('persists staged highest aged index to the store', async () => {
await taggingStore.updateHighestAgedIndex(secret1, 5, 'job1');

expect(await taggingStore.getHighestAgedIndex(secret1, 'job2')).toBeUndefined();

await taggingStore.commit('job1');

expect(await taggingStore.getHighestAgedIndex(secret1, 'job2')).toBe(5);
});

it('persists staged highest finalized index to the store', async () => {
await taggingStore.updateHighestFinalizedIndex(secret1, 10, 'job1');

expect(await taggingStore.getHighestFinalizedIndex(secret1, 'job2')).toBeUndefined();

await taggingStore.commit('job1');

expect(await taggingStore.getHighestFinalizedIndex(secret1, 'job2')).toBe(10);
});

it('persists multiple secrets for the same job', async () => {
await taggingStore.updateHighestAgedIndex(secret1, 5, 'job1');
await taggingStore.updateHighestAgedIndex(secret2, 8, 'job1');
await taggingStore.updateHighestFinalizedIndex(secret1, 3, 'job1');
await taggingStore.updateHighestFinalizedIndex(secret2, 6, 'job1');

await taggingStore.commit('job1');

expect(await taggingStore.getHighestAgedIndex(secret1, 'job2')).toBe(5);
expect(await taggingStore.getHighestAgedIndex(secret2, 'job2')).toBe(8);
expect(await taggingStore.getHighestFinalizedIndex(secret1, 'job2')).toBe(3);
expect(await taggingStore.getHighestFinalizedIndex(secret2, 'job2')).toBe(6);
});

it('clears staged data after commit', async () => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know it's annoying but given that aged and finalized indexes are stored completely separately we should have this test case (and some of the following) duplicated for both type of indexes.

await taggingStore.updateHighestAgedIndex(secret1, 5, 'job1');
await taggingStore.commit('job1');

// Updating again with a higher value in the same job should work
// (if staged data wasn't cleared, it would still have the old value cached)
Comment on lines +57 to +58
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are not using the same job below though so this seems out of place?

await taggingStore.updateHighestAgedIndex(secret1, 10, 'job2');
expect(await taggingStore.getHighestAgedIndex(secret1, 'job2')).toBe(10);
await taggingStore.commit('job2');

expect(await taggingStore.getHighestAgedIndex(secret1, 'job1')).toBe(10);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
expect(await taggingStore.getHighestAgedIndex(secret1, 'job1')).toBe(10);
// Now we verify that we don't receive the data originally staged in job1
expect(await taggingStore.getHighestAgedIndex(secret1, 'job1')).toBe(10);

Took me a bit to figure out how exactly this is testing the staged gets cleared.

});

it('does not affect other jobs when committing', async () => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one also should have the finalized index counterpart.

await taggingStore.updateHighestAgedIndex(secret1, 5, 'job1');
await taggingStore.updateHighestAgedIndex(secret1, 10, 'job2');

await taggingStore.commit('job2');

// job1's staged value should still be intact
expect(await taggingStore.getHighestAgedIndex(secret1, 'job1')).toBe(5);
});

it('discards staged highest aged index without persisting', async () => {
await taggingStore.updateHighestAgedIndex(secret1, 5, 'job1');
await taggingStore.discardStaged('job1');
expect(await taggingStore.getHighestAgedIndex(secret1, 'job1')).toBeUndefined();
});

it('discards staged highest finalized index without persisting', async () => {
await taggingStore.updateHighestFinalizedIndex(secret1, 5, 'job1');
await taggingStore.discardStaged('job1');
expect(await taggingStore.getHighestFinalizedIndex(secret1, 'job1')).toBeUndefined();
});
});
});
102 changes: 89 additions & 13 deletions yarn-project/pxe/src/storage/tagging_store/recipient_tagging_store.ts
Original file line number Diff line number Diff line change
@@ -1,53 +1,129 @@
import type { AztecAsyncKVStore, AztecAsyncMap } from '@aztec/kv-store';
import type { DirectionalAppTaggingSecret } from '@aztec/stdlib/logs';

import type { StagedStore } from '../../job_coordinator/job_coordinator.js';

/**
* Data provider of tagging data used when syncing the logs as a recipient. The sender counterpart of this class
* is called SenderTaggingStore. We have the providers separate for the sender and recipient because
* the algorithms are completely disjoint and there is not data reuse between the two.
*
* @dev Chain reorgs do not need to be handled here because both the finalized and aged indexes refer to finalized
* blocks, which by definition cannot be affected by reorgs.
*
* TODO(benesjan): Relocate to yarn-project/pxe/src/storage/tagging_store
*/
export class RecipientTaggingStore {
export class RecipientTaggingStore implements StagedStore {
storeName: string = 'recipient_tagging';

#store: AztecAsyncKVStore;

#highestAgedIndex: AztecAsyncMap<string, number>;
#highestFinalizedIndex: AztecAsyncMap<string, number>;

// jobId => secret => number
#highestAgedIndexForJob: Map<string, Map<string, number>>;

// jobId => secret => number
#highestFinalizedIndexForJob: Map<string, Map<string, number>>;

constructor(store: AztecAsyncKVStore) {
this.#store = store;

this.#highestAgedIndex = this.#store.openMap('highest_aged_index');
this.#highestFinalizedIndex = this.#store.openMap('highest_finalized_index');

this.#highestAgedIndexForJob = new Map();
this.#highestFinalizedIndexForJob = new Map();
}

#getHighestAgedIndexForJob(jobId: string): Map<string, number> {
let highestAgedIndexForJob = this.#highestAgedIndexForJob.get(jobId);
if (!highestAgedIndexForJob) {
highestAgedIndexForJob = new Map();
this.#highestAgedIndexForJob.set(jobId, highestAgedIndexForJob);
}
return highestAgedIndexForJob;
}

async #readHighestAgedIndex(jobId: string, secret: string): Promise<number | undefined> {
return this.#getHighestAgedIndexForJob(jobId).get(secret) ?? (await this.#highestAgedIndex.getAsync(secret));
}

#writeHighestAgedIndex(jobId: string, secret: string, index: number) {
this.#getHighestAgedIndexForJob(jobId).set(secret, index);
}

#getHighestFinalizedIndexForJob(jobId: string): Map<string, number> {
let jobStagedHighestFinalizedIndex = this.#highestFinalizedIndexForJob.get(jobId);
if (!jobStagedHighestFinalizedIndex) {
jobStagedHighestFinalizedIndex = new Map();
this.#highestFinalizedIndexForJob.set(jobId, jobStagedHighestFinalizedIndex);
}
return jobStagedHighestFinalizedIndex;
}

async #readHighestFinalizedIndex(jobId: string, secret: string): Promise<number | undefined> {
return (
this.#getHighestFinalizedIndexForJob(jobId).get(secret) ?? (await this.#highestFinalizedIndex.getAsync(secret))
);
}

#writeHighestFinalizedIndex(jobId: string, secret: string, index: number) {
this.#getHighestFinalizedIndexForJob(jobId).set(secret, index);
}

/**
* Writes all job-specific in-memory data to persistent storage.
*
* @remark This method must run in a DB transaction context. It's designed to be called from JobCoordinator#commitJob.
*/
async commit(jobId: string): Promise<void> {
const highestAgedIndexForJob = this.#highestAgedIndexForJob.get(jobId);
if (highestAgedIndexForJob) {
for (const [secret, index] of highestAgedIndexForJob.entries()) {
await this.#highestAgedIndex.set(secret, index);
}
}

const highestFinalizedIndexForJob = this.#highestFinalizedIndexForJob.get(jobId);
if (highestFinalizedIndexForJob) {
for (const [secret, index] of highestFinalizedIndexForJob.entries()) {
await this.#highestFinalizedIndex.set(secret, index);
}
}

return this.discardStaged(jobId);
}

discardStaged(jobId: string): Promise<void> {
this.#highestAgedIndexForJob.delete(jobId);
this.#highestFinalizedIndexForJob.delete(jobId);
return Promise.resolve();
}

getHighestAgedIndex(secret: DirectionalAppTaggingSecret): Promise<number | undefined> {
return this.#highestAgedIndex.getAsync(secret.toString());
getHighestAgedIndex(secret: DirectionalAppTaggingSecret, jobId: string): Promise<number | undefined> {
return this.#readHighestAgedIndex(jobId, secret.toString());
}

async updateHighestAgedIndex(secret: DirectionalAppTaggingSecret, index: number): Promise<void> {
const currentIndex = await this.#highestAgedIndex.getAsync(secret.toString());
async updateHighestAgedIndex(secret: DirectionalAppTaggingSecret, index: number, jobId: string): Promise<void> {
const currentIndex = await this.#readHighestAgedIndex(jobId, secret.toString());
if (currentIndex !== undefined && index <= currentIndex) {
// Log sync should never set a lower highest aged index.
throw new Error(`New highest aged index (${index}) must be higher than the current one (${currentIndex})`);
}
await this.#highestAgedIndex.set(secret.toString(), index);
this.#writeHighestAgedIndex(jobId, secret.toString(), index);
}

getHighestFinalizedIndex(secret: DirectionalAppTaggingSecret): Promise<number | undefined> {
return this.#highestFinalizedIndex.getAsync(secret.toString());
getHighestFinalizedIndex(secret: DirectionalAppTaggingSecret, jobId: string): Promise<number | undefined> {
return this.#readHighestFinalizedIndex(jobId, secret.toString());
}

async updateHighestFinalizedIndex(secret: DirectionalAppTaggingSecret, index: number): Promise<void> {
const currentIndex = await this.#highestFinalizedIndex.getAsync(secret.toString());
async updateHighestFinalizedIndex(secret: DirectionalAppTaggingSecret, index: number, jobId: string): Promise<void> {
const currentIndex = await this.#readHighestFinalizedIndex(jobId, secret.toString());
if (currentIndex !== undefined && index < currentIndex) {
// Log sync should never set a lower highest finalized index but it can happen that it would try to set the same
// one because we are loading logs from highest aged index + 1 and not from the highest finalized index.
throw new Error(`New highest finalized index (${index}) must be higher than the current one (${currentIndex})`);
}
await this.#highestFinalizedIndex.set(secret.toString(), index);
this.#writeHighestFinalizedIndex(jobId, secret.toString(), index);
}
}
Loading
Loading