-
Notifications
You must be signed in to change notification settings - Fork 581
refactor: staged writes in tagging stores #19476
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,88 @@ | ||||||||
| import { Fr } from '@aztec/foundation/curves/bn254'; | ||||||||
| import { openTmpStore } from '@aztec/kv-store/lmdb-v2'; | ||||||||
| import { DirectionalAppTaggingSecret } from '@aztec/stdlib/logs'; | ||||||||
|
|
||||||||
| import { RecipientTaggingStore } from './recipient_tagging_store.js'; | ||||||||
|
|
||||||||
| describe('RecipientTaggingStore', () => { | ||||||||
| let taggingStore: RecipientTaggingStore; | ||||||||
| let secret1: DirectionalAppTaggingSecret; | ||||||||
| let secret2: DirectionalAppTaggingSecret; | ||||||||
|
|
||||||||
| beforeEach(async () => { | ||||||||
| taggingStore = new RecipientTaggingStore(await openTmpStore('test')); | ||||||||
| secret1 = DirectionalAppTaggingSecret.fromString(Fr.random().toString()); | ||||||||
| secret2 = DirectionalAppTaggingSecret.fromString(Fr.random().toString()); | ||||||||
| }); | ||||||||
|
|
||||||||
| describe('staged writes', () => { | ||||||||
| it('persists staged highest aged index to the store', async () => { | ||||||||
| await taggingStore.updateHighestAgedIndex(secret1, 5, 'job1'); | ||||||||
|
|
||||||||
| expect(await taggingStore.getHighestAgedIndex(secret1, 'job2')).toBeUndefined(); | ||||||||
|
|
||||||||
| await taggingStore.commit('job1'); | ||||||||
|
|
||||||||
| expect(await taggingStore.getHighestAgedIndex(secret1, 'job2')).toBe(5); | ||||||||
| }); | ||||||||
|
|
||||||||
| it('persists staged highest finalized index to the store', async () => { | ||||||||
| await taggingStore.updateHighestFinalizedIndex(secret1, 10, 'job1'); | ||||||||
|
|
||||||||
| expect(await taggingStore.getHighestFinalizedIndex(secret1, 'job2')).toBeUndefined(); | ||||||||
|
|
||||||||
| await taggingStore.commit('job1'); | ||||||||
|
|
||||||||
| expect(await taggingStore.getHighestFinalizedIndex(secret1, 'job2')).toBe(10); | ||||||||
| }); | ||||||||
|
|
||||||||
| it('persists multiple secrets for the same job', async () => { | ||||||||
| await taggingStore.updateHighestAgedIndex(secret1, 5, 'job1'); | ||||||||
| await taggingStore.updateHighestAgedIndex(secret2, 8, 'job1'); | ||||||||
| await taggingStore.updateHighestFinalizedIndex(secret1, 3, 'job1'); | ||||||||
| await taggingStore.updateHighestFinalizedIndex(secret2, 6, 'job1'); | ||||||||
|
|
||||||||
| await taggingStore.commit('job1'); | ||||||||
|
|
||||||||
| expect(await taggingStore.getHighestAgedIndex(secret1, 'job2')).toBe(5); | ||||||||
| expect(await taggingStore.getHighestAgedIndex(secret2, 'job2')).toBe(8); | ||||||||
| expect(await taggingStore.getHighestFinalizedIndex(secret1, 'job2')).toBe(3); | ||||||||
| expect(await taggingStore.getHighestFinalizedIndex(secret2, 'job2')).toBe(6); | ||||||||
| }); | ||||||||
|
|
||||||||
| it('clears staged data after commit', async () => { | ||||||||
| await taggingStore.updateHighestAgedIndex(secret1, 5, 'job1'); | ||||||||
| await taggingStore.commit('job1'); | ||||||||
|
|
||||||||
| // Updating again with a higher value in the same job should work | ||||||||
| // (if staged data wasn't cleared, it would still have the old value cached) | ||||||||
|
Comment on lines
+57
to
+58
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We are not using the same job below though so this seems out of place? |
||||||||
| await taggingStore.updateHighestAgedIndex(secret1, 10, 'job2'); | ||||||||
| expect(await taggingStore.getHighestAgedIndex(secret1, 'job2')).toBe(10); | ||||||||
| await taggingStore.commit('job2'); | ||||||||
|
|
||||||||
| expect(await taggingStore.getHighestAgedIndex(secret1, 'job1')).toBe(10); | ||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Took me a bit to figure out how exactly this is testing the staged gets cleared. |
||||||||
| }); | ||||||||
|
|
||||||||
| it('does not affect other jobs when committing', async () => { | ||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This one also should have the finalized index counterpart. |
||||||||
| await taggingStore.updateHighestAgedIndex(secret1, 5, 'job1'); | ||||||||
| await taggingStore.updateHighestAgedIndex(secret1, 10, 'job2'); | ||||||||
|
|
||||||||
| await taggingStore.commit('job2'); | ||||||||
|
|
||||||||
| // job1's staged value should still be intact | ||||||||
| expect(await taggingStore.getHighestAgedIndex(secret1, 'job1')).toBe(5); | ||||||||
| }); | ||||||||
|
|
||||||||
| it('discards staged highest aged index without persisting', async () => { | ||||||||
mverzilli marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||
| await taggingStore.updateHighestAgedIndex(secret1, 5, 'job1'); | ||||||||
| await taggingStore.discardStaged('job1'); | ||||||||
| expect(await taggingStore.getHighestAgedIndex(secret1, 'job1')).toBeUndefined(); | ||||||||
| }); | ||||||||
|
|
||||||||
| it('discards staged highest finalized index without persisting', async () => { | ||||||||
| await taggingStore.updateHighestFinalizedIndex(secret1, 5, 'job1'); | ||||||||
| await taggingStore.discardStaged('job1'); | ||||||||
| expect(await taggingStore.getHighestFinalizedIndex(secret1, 'job1')).toBeUndefined(); | ||||||||
| }); | ||||||||
| }); | ||||||||
| }); | ||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,53 +1,129 @@ | ||
| import type { AztecAsyncKVStore, AztecAsyncMap } from '@aztec/kv-store'; | ||
| import type { DirectionalAppTaggingSecret } from '@aztec/stdlib/logs'; | ||
|
|
||
| import type { StagedStore } from '../../job_coordinator/job_coordinator.js'; | ||
|
|
||
| /** | ||
| * Data provider of tagging data used when syncing the logs as a recipient. The sender counterpart of this class | ||
| * is called SenderTaggingStore. We have the providers separate for the sender and recipient because | ||
| * the algorithms are completely disjoint and there is not data reuse between the two. | ||
| * | ||
| * @dev Chain reorgs do not need to be handled here because both the finalized and aged indexes refer to finalized | ||
| * blocks, which by definition cannot be affected by reorgs. | ||
| * | ||
| * TODO(benesjan): Relocate to yarn-project/pxe/src/storage/tagging_store | ||
| */ | ||
| export class RecipientTaggingStore { | ||
| export class RecipientTaggingStore implements StagedStore { | ||
| storeName: string = 'recipient_tagging'; | ||
|
|
||
| #store: AztecAsyncKVStore; | ||
|
|
||
| #highestAgedIndex: AztecAsyncMap<string, number>; | ||
| #highestFinalizedIndex: AztecAsyncMap<string, number>; | ||
|
|
||
| // jobId => secret => number | ||
| #highestAgedIndexForJob: Map<string, Map<string, number>>; | ||
|
|
||
| // jobId => secret => number | ||
| #highestFinalizedIndexForJob: Map<string, Map<string, number>>; | ||
|
|
||
| constructor(store: AztecAsyncKVStore) { | ||
| this.#store = store; | ||
|
|
||
| this.#highestAgedIndex = this.#store.openMap('highest_aged_index'); | ||
| this.#highestFinalizedIndex = this.#store.openMap('highest_finalized_index'); | ||
|
|
||
| this.#highestAgedIndexForJob = new Map(); | ||
| this.#highestFinalizedIndexForJob = new Map(); | ||
| } | ||
|
|
||
| #getHighestAgedIndexForJob(jobId: string): Map<string, number> { | ||
| let highestAgedIndexForJob = this.#highestAgedIndexForJob.get(jobId); | ||
| if (!highestAgedIndexForJob) { | ||
| highestAgedIndexForJob = new Map(); | ||
| this.#highestAgedIndexForJob.set(jobId, highestAgedIndexForJob); | ||
| } | ||
| return highestAgedIndexForJob; | ||
| } | ||
|
|
||
| async #readHighestAgedIndex(jobId: string, secret: string): Promise<number | undefined> { | ||
| return this.#getHighestAgedIndexForJob(jobId).get(secret) ?? (await this.#highestAgedIndex.getAsync(secret)); | ||
| } | ||
|
|
||
| #writeHighestAgedIndex(jobId: string, secret: string, index: number) { | ||
| this.#getHighestAgedIndexForJob(jobId).set(secret, index); | ||
| } | ||
|
|
||
| #getHighestFinalizedIndexForJob(jobId: string): Map<string, number> { | ||
| let jobStagedHighestFinalizedIndex = this.#highestFinalizedIndexForJob.get(jobId); | ||
| if (!jobStagedHighestFinalizedIndex) { | ||
| jobStagedHighestFinalizedIndex = new Map(); | ||
| this.#highestFinalizedIndexForJob.set(jobId, jobStagedHighestFinalizedIndex); | ||
| } | ||
| return jobStagedHighestFinalizedIndex; | ||
| } | ||
|
|
||
| async #readHighestFinalizedIndex(jobId: string, secret: string): Promise<number | undefined> { | ||
| return ( | ||
| this.#getHighestFinalizedIndexForJob(jobId).get(secret) ?? (await this.#highestFinalizedIndex.getAsync(secret)) | ||
| ); | ||
| } | ||
|
|
||
| #writeHighestFinalizedIndex(jobId: string, secret: string, index: number) { | ||
| this.#getHighestFinalizedIndexForJob(jobId).set(secret, index); | ||
| } | ||
|
|
||
| /** | ||
| * Writes all job-specific in-memory data to persistent storage. | ||
| * | ||
| * @remark This method must run in a DB transaction context. It's designed to be called from JobCoordinator#commitJob. | ||
| */ | ||
| async commit(jobId: string): Promise<void> { | ||
| const highestAgedIndexForJob = this.#highestAgedIndexForJob.get(jobId); | ||
| if (highestAgedIndexForJob) { | ||
| for (const [secret, index] of highestAgedIndexForJob.entries()) { | ||
| await this.#highestAgedIndex.set(secret, index); | ||
| } | ||
| } | ||
|
|
||
| const highestFinalizedIndexForJob = this.#highestFinalizedIndexForJob.get(jobId); | ||
| if (highestFinalizedIndexForJob) { | ||
| for (const [secret, index] of highestFinalizedIndexForJob.entries()) { | ||
| await this.#highestFinalizedIndex.set(secret, index); | ||
| } | ||
| } | ||
|
|
||
| return this.discardStaged(jobId); | ||
| } | ||
|
|
||
| discardStaged(jobId: string): Promise<void> { | ||
| this.#highestAgedIndexForJob.delete(jobId); | ||
| this.#highestFinalizedIndexForJob.delete(jobId); | ||
| return Promise.resolve(); | ||
| } | ||
|
|
||
| getHighestAgedIndex(secret: DirectionalAppTaggingSecret): Promise<number | undefined> { | ||
| return this.#highestAgedIndex.getAsync(secret.toString()); | ||
| getHighestAgedIndex(secret: DirectionalAppTaggingSecret, jobId: string): Promise<number | undefined> { | ||
| return this.#readHighestAgedIndex(jobId, secret.toString()); | ||
| } | ||
|
|
||
| async updateHighestAgedIndex(secret: DirectionalAppTaggingSecret, index: number): Promise<void> { | ||
| const currentIndex = await this.#highestAgedIndex.getAsync(secret.toString()); | ||
| async updateHighestAgedIndex(secret: DirectionalAppTaggingSecret, index: number, jobId: string): Promise<void> { | ||
| const currentIndex = await this.#readHighestAgedIndex(jobId, secret.toString()); | ||
| if (currentIndex !== undefined && index <= currentIndex) { | ||
| // Log sync should never set a lower highest aged index. | ||
| throw new Error(`New highest aged index (${index}) must be higher than the current one (${currentIndex})`); | ||
| } | ||
| await this.#highestAgedIndex.set(secret.toString(), index); | ||
| this.#writeHighestAgedIndex(jobId, secret.toString(), index); | ||
| } | ||
|
|
||
| getHighestFinalizedIndex(secret: DirectionalAppTaggingSecret): Promise<number | undefined> { | ||
| return this.#highestFinalizedIndex.getAsync(secret.toString()); | ||
| getHighestFinalizedIndex(secret: DirectionalAppTaggingSecret, jobId: string): Promise<number | undefined> { | ||
| return this.#readHighestFinalizedIndex(jobId, secret.toString()); | ||
| } | ||
|
|
||
| async updateHighestFinalizedIndex(secret: DirectionalAppTaggingSecret, index: number): Promise<void> { | ||
| const currentIndex = await this.#highestFinalizedIndex.getAsync(secret.toString()); | ||
| async updateHighestFinalizedIndex(secret: DirectionalAppTaggingSecret, index: number, jobId: string): Promise<void> { | ||
| const currentIndex = await this.#readHighestFinalizedIndex(jobId, secret.toString()); | ||
| if (currentIndex !== undefined && index < currentIndex) { | ||
| // Log sync should never set a lower highest finalized index but it can happen that it would try to set the same | ||
| // one because we are loading logs from highest aged index + 1 and not from the highest finalized index. | ||
| throw new Error(`New highest finalized index (${index}) must be higher than the current one (${currentIndex})`); | ||
| } | ||
| await this.#highestFinalizedIndex.set(secret.toString(), index); | ||
| this.#writeHighestFinalizedIndex(jobId, secret.toString(), index); | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know it's annoying but given that aged and finalized indexes are stored completely separately we should have this test case (and some of the following) duplicated for both type of indexes.