@@ -52,6 +52,9 @@ pub struct State {
5252 pub accounts : AccountsState ,
5353 /// HAMT containing all blobs keyed by blob hash.
5454 pub blobs : BlobsState ,
55+ /// The next account to debit in the current debit cycle.
56+ /// If this is None, we have finished the debit cycle.
57+ pub next_debit_addr : Option < Address > ,
5558}
5659
5760/// Key used to namespace subscriptions in the expiry index.
@@ -121,6 +124,7 @@ impl State {
121124 pending : BlobsProgressCollection :: new ( store, "pending blobs queue" ) ?,
122125 accounts : AccountsState :: new ( store) ?,
123126 blobs : BlobsState :: new ( store) ?,
127+ next_debit_addr : None ,
124128 } )
125129 }
126130
@@ -550,35 +554,43 @@ impl State {
550554 & mut self ,
551555 store : & BS ,
552556 current_epoch : ChainEpoch ,
557+ blob_delete_batch_size : u64 ,
558+ account_debit_batch_size : u64 ,
553559 ) -> anyhow:: Result < HashSet < Hash > , ActorError > {
554560 // Delete expired subscriptions
555561 let mut delete_from_disc = HashSet :: new ( ) ;
556562 let mut num_deleted = 0 ;
557- let expiries = self . expiries . clone ( ) ;
558- expiries. foreach_up_to_epoch ( store, current_epoch, |_, subscriber, key| {
559- match self . delete_blob (
560- store,
561- subscriber,
562- subscriber,
563- current_epoch,
564- key. hash ,
565- key. id . clone ( ) ,
566- ) {
567- Ok ( ( from_disc, _) ) => {
568- num_deleted += 1 ;
569- if from_disc {
570- delete_from_disc. insert ( key. hash ) ;
563+ let mut expiries = self . expiries . clone ( ) ;
564+
565+ expiries. foreach_up_to_epoch (
566+ store,
567+ current_epoch,
568+ Some ( blob_delete_batch_size) ,
569+ |_, subscriber, key| {
570+ match self . delete_blob (
571+ store,
572+ subscriber,
573+ subscriber,
574+ current_epoch,
575+ key. hash ,
576+ key. id . clone ( ) ,
577+ ) {
578+ Ok ( ( from_disc, _) ) => {
579+ num_deleted += 1 ;
580+ if from_disc {
581+ delete_from_disc. insert ( key. hash ) ;
582+ }
583+ }
584+ Err ( e) => {
585+ warn ! (
586+ "failed to delete blob {} for {} (id: {}): {}" ,
587+ key. hash, subscriber, key. id, e
588+ )
571589 }
572590 }
573- Err ( e) => {
574- warn ! (
575- "failed to delete blob {} for {} (id: {}): {}" ,
576- key. hash, subscriber, key. id, e
577- )
578- }
579- }
580- Ok ( ( ) )
581- } ) ?;
591+ Ok ( ( ) )
592+ } ,
593+ ) ?;
582594 debug ! ( "deleted {} expired subscriptions" , num_deleted) ;
583595 debug ! (
584596 "{} blobs marked for deletion from disc" ,
@@ -587,19 +599,32 @@ impl State {
587599 // Debit for existing usage
588600 let reader = self . accounts . hamt ( store) ?;
589601 let mut writer = self . accounts . hamt ( store) ?;
590- reader. for_each ( |address, account| {
591- let mut account = account. clone ( ) ;
592- let debit_blocks = current_epoch - account. last_debit_epoch ;
593- let debit_credits =
594- Credit :: from_whole ( self . get_storage_cost ( debit_blocks, & account. capacity_used ) ) ;
595- self . credit_debited += & debit_credits;
596- self . credit_committed -= & debit_credits;
597- account. credit_committed -= & debit_credits;
598- account. last_debit_epoch = current_epoch;
599- debug ! ( "debited {} credits from {}" , debit_credits, address) ;
600- writer. set ( & address, account) ?;
601- Ok ( ( ) )
602- } ) ?;
602+
603+ let start_key = self
604+ . next_debit_addr
605+ . map ( |address| BytesKey :: from ( address. to_bytes ( ) ) ) ;
606+ let ( count, next_account) = reader. for_each_ranged (
607+ start_key. as_ref ( ) ,
608+ Some ( account_debit_batch_size as usize ) ,
609+ |address, account| {
610+ let mut account = account. clone ( ) ;
611+ let debit_blocks = current_epoch - account. last_debit_epoch ;
612+ let debit_credits =
613+ Credit :: from_whole ( self . get_storage_cost ( debit_blocks, & account. capacity_used ) ) ;
614+ self . credit_debited += & debit_credits;
615+ self . credit_committed -= & debit_credits;
616+ account. credit_committed -= & debit_credits;
617+ account. last_debit_epoch = current_epoch;
618+ debug ! ( "debited {} credits from {}" , debit_credits, address) ;
619+ writer. set ( & address, account) ?;
620+ Ok ( ( ) )
621+ } ,
622+ ) ?;
623+ debug ! (
624+ "finished debiting {:#?} accounts, next account: {:#?}" ,
625+ count, next_account
626+ ) ;
627+ self . next_debit_addr = next_account;
603628 self . accounts . root = writer. flush ( ) ?;
604629 Ok ( delete_from_disc)
605630 }
@@ -2281,7 +2306,14 @@ mod tests {
22812306
22822307 // Debit all accounts at an epoch between the two expiries (3601-3621)
22832308 let debit_epoch = ChainEpoch :: from ( config. blob_min_ttl + 11 ) ;
2284- let deletes_from_disc = state. debit_accounts ( & store, debit_epoch) . unwrap ( ) ;
2309+ let deletes_from_disc = state
2310+ . debit_accounts (
2311+ & store,
2312+ debit_epoch,
2313+ config. blob_delete_batch_size ,
2314+ config. account_debit_batch_size ,
2315+ )
2316+ . unwrap ( ) ;
22852317 assert ! ( deletes_from_disc. is_empty( ) ) ;
22862318
22872319 // Check the account balance
@@ -2301,7 +2333,14 @@ mod tests {
23012333
23022334 // Debit all accounts at an epoch greater than group expiry (3621)
23032335 let debit_epoch = ChainEpoch :: from ( config. blob_min_ttl + 31 ) ;
2304- let deletes_from_disc = state. debit_accounts ( & store, debit_epoch) . unwrap ( ) ;
2336+ let deletes_from_disc = state
2337+ . debit_accounts (
2338+ & store,
2339+ debit_epoch,
2340+ config. blob_delete_batch_size ,
2341+ config. account_debit_batch_size ,
2342+ )
2343+ . unwrap ( ) ;
23052344 assert ! ( !deletes_from_disc. is_empty( ) ) ; // blob is marked for deletion
23062345
23072346 // Check the account balance
@@ -2901,7 +2940,14 @@ mod tests {
29012940
29022941 // Debit all accounts
29032942 let debit_epoch = ChainEpoch :: from ( 41 ) ;
2904- let deletes_from_disc = state. debit_accounts ( & store, debit_epoch) . unwrap ( ) ;
2943+ let deletes_from_disc = state
2944+ . debit_accounts (
2945+ & store,
2946+ debit_epoch,
2947+ config. blob_delete_batch_size ,
2948+ config. account_debit_batch_size ,
2949+ )
2950+ . unwrap ( ) ;
29052951 assert ! ( deletes_from_disc. is_empty( ) ) ;
29062952
29072953 // Check the account balance
@@ -3261,7 +3307,14 @@ mod tests {
32613307
32623308 // Debit accounts to trigger a refund when we fail below
32633309 let debit_epoch = ChainEpoch :: from ( 11 ) ;
3264- let deletes_from_disc = state. debit_accounts ( & store, debit_epoch) . unwrap ( ) ;
3310+ let deletes_from_disc = state
3311+ . debit_accounts (
3312+ & store,
3313+ debit_epoch,
3314+ config. blob_delete_batch_size ,
3315+ config. account_debit_batch_size ,
3316+ )
3317+ . unwrap ( ) ;
32653318 assert ! ( deletes_from_disc. is_empty( ) ) ;
32663319
32673320 // Check the account balance
@@ -4522,7 +4575,14 @@ mod tests {
45224575
45234576 // Every debit interval epochs we debit all acounts
45244577 if epoch % debit_interval == 0 {
4525- let deletes_from_disc = state. debit_accounts ( & store, epoch) . unwrap ( ) ;
4578+ let deletes_from_disc = state
4579+ . debit_accounts (
4580+ & store,
4581+ epoch,
4582+ config. blob_delete_batch_size ,
4583+ config. account_debit_batch_size ,
4584+ )
4585+ . unwrap ( ) ;
45264586 warn ! (
45274587 "deleting {} blobs at epoch {}" ,
45284588 deletes_from_disc. len( ) ,
@@ -4565,4 +4625,151 @@ mod tests {
45654625 assert_eq ! ( stats. num_resolving, 0 ) ;
45664626 assert_eq ! ( stats. bytes_resolving, 0 ) ;
45674627 }
4628+
4629+ #[ test]
4630+ fn test_paginated_debit_accounts ( ) {
4631+ let config = RecallConfig {
4632+ account_debit_batch_size : 5 , // Process 5 accounts at a time (10 accounts total)
4633+ ..Default :: default ( )
4634+ } ;
4635+
4636+ let store = MemoryBlockstore :: default ( ) ;
4637+ let mut state = State :: new ( & store) . unwrap ( ) ;
4638+ let current_epoch = ChainEpoch :: from ( 1 ) ;
4639+
4640+ // Create more than one batch worth of accounts (>5)
4641+ for i in 0 ..10 {
4642+ let address = Address :: new_id ( 1000 + i) ;
4643+ let token_amount = TokenAmount :: from_whole ( 10 ) ;
4644+
4645+ // Buy credits for each account
4646+ state
4647+ . buy_credit (
4648+ & config,
4649+ & store,
4650+ address,
4651+ token_amount. clone ( ) ,
4652+ current_epoch,
4653+ )
4654+ . unwrap ( ) ;
4655+
4656+ // Add some storage usage
4657+ let mut accounts = state. accounts . hamt ( & store) . unwrap ( ) ;
4658+ let mut account = accounts. get ( & address) . unwrap ( ) . unwrap ( ) ;
4659+ account. capacity_used = 1000 ;
4660+ accounts. set ( & address, account) . unwrap ( ) ;
4661+ }
4662+
4663+ // First batch (should process 5 accounts)
4664+ assert ! ( state. next_debit_addr. is_none( ) ) ;
4665+ let deletes1 = state
4666+ . debit_accounts (
4667+ & store,
4668+ current_epoch + 1 ,
4669+ config. blob_delete_batch_size ,
4670+ config. account_debit_batch_size ,
4671+ )
4672+ . unwrap ( ) ;
4673+ assert ! ( deletes1. is_empty( ) ) ; // No expired blobs
4674+ assert ! ( state. next_debit_addr. is_some( ) ) ;
4675+
4676+ // Second batch (should process remaining 5 accounts and clear state)
4677+ let deletes2 = state
4678+ . debit_accounts (
4679+ & store,
4680+ current_epoch + 1 ,
4681+ config. blob_delete_batch_size ,
4682+ config. account_debit_batch_size ,
4683+ )
4684+ . unwrap ( ) ;
4685+ assert ! ( deletes2. is_empty( ) ) ;
4686+ assert ! ( state. next_debit_addr. is_none( ) ) ; // State should be cleared after all accounts processed
4687+
4688+ // Verify all accounts were processed
4689+ let reader = state. accounts . hamt ( & store) . unwrap ( ) ;
4690+ reader
4691+ . for_each ( |_, account| {
4692+ assert_eq ! ( account. last_debit_epoch, current_epoch + 1 ) ;
4693+ Ok ( ( ) )
4694+ } )
4695+ . unwrap ( ) ;
4696+ }
4697+
4698+ #[ test]
4699+ fn test_multiple_debit_cycles ( ) {
4700+ let config = RecallConfig {
4701+ account_debit_batch_size : 5 , // Process 5 accounts at a time (10 accounts total)
4702+ ..Default :: default ( )
4703+ } ;
4704+
4705+ let store = MemoryBlockstore :: default ( ) ;
4706+ let mut state = State :: new ( & store) . unwrap ( ) ;
4707+ let current_epoch = ChainEpoch :: from ( 1 ) ;
4708+
4709+ // Create accounts
4710+ for i in 0 ..10 {
4711+ let address = Address :: new_id ( 1000 + i) ;
4712+ let token_amount = TokenAmount :: from_whole ( 10 ) ;
4713+ state
4714+ . buy_credit (
4715+ & config,
4716+ & store,
4717+ address,
4718+ token_amount. clone ( ) ,
4719+ current_epoch,
4720+ )
4721+ . unwrap ( ) ;
4722+
4723+ let mut accounts = state. accounts . hamt ( & store) . unwrap ( ) ;
4724+ let mut account = accounts. get ( & address) . unwrap ( ) . unwrap ( ) ;
4725+ account. capacity_used = 1000 ;
4726+ accounts. set ( & address, account) . unwrap ( ) ;
4727+ }
4728+
4729+ // First cycle
4730+ let deletes1 = state
4731+ . debit_accounts (
4732+ & store,
4733+ current_epoch + 1 ,
4734+ config. blob_delete_batch_size ,
4735+ config. account_debit_batch_size ,
4736+ )
4737+ . unwrap ( ) ;
4738+ assert ! ( deletes1. is_empty( ) ) ;
4739+ assert ! ( state. next_debit_addr. is_some( ) ) ;
4740+
4741+ let deletes2 = state
4742+ . debit_accounts (
4743+ & store,
4744+ current_epoch + 1 ,
4745+ config. blob_delete_batch_size ,
4746+ config. account_debit_batch_size ,
4747+ )
4748+ . unwrap ( ) ;
4749+ assert ! ( deletes2. is_empty( ) ) ;
4750+ assert ! ( state. next_debit_addr. is_none( ) ) ; // First cycle complete
4751+
4752+ // Second cycle
4753+ let deletes3 = state
4754+ . debit_accounts (
4755+ & store,
4756+ current_epoch + 2 ,
4757+ config. blob_delete_batch_size ,
4758+ config. account_debit_batch_size ,
4759+ )
4760+ . unwrap ( ) ;
4761+ assert ! ( deletes3. is_empty( ) ) ;
4762+ assert ! ( state. next_debit_addr. is_some( ) ) ;
4763+
4764+ let deletes4 = state
4765+ . debit_accounts (
4766+ & store,
4767+ current_epoch + 2 ,
4768+ config. blob_delete_batch_size ,
4769+ config. account_debit_batch_size ,
4770+ )
4771+ . unwrap ( ) ;
4772+ assert ! ( deletes4. is_empty( ) ) ;
4773+ assert ! ( state. next_debit_addr. is_none( ) ) ; // Second cycle complete
4774+ }
45684775}
0 commit comments