1- import { Effect , type Fiber , Option } from "effect"
1+ import { Effect , type Fiber , Option , Queue } from "effect"
22import type { TokenRawDenom } from "$lib/schema/token"
33import type { Chain , UniversalChainId } from "$lib/schema/chain"
44import { RawTokenBalance } from "$lib/schema/token"
5- import { createEvmBalanceQuery , type FetchEvmBalanceError } from "$lib/services/evm/balances"
6- import {
7- createCosmosBalanceQuery ,
8- type FetchCosmosBalanceError
9- } from "$lib/services/cosmos/balances"
5+ import { fetchEvmBalance , type FetchEvmBalanceError } from "$lib/services/evm/balances"
6+ import { fetchCosmosBalance , type FetchCosmosBalanceError } from "$lib/services/cosmos/balances"
107import { SvelteMap } from "svelte/reactivity"
118import {
129 AddressEvmCanonical ,
@@ -24,12 +21,29 @@ const createKey = (
2421 denom : TokenRawDenom
2522) : BalanceKey => `${ universalChainId } :${ address } :${ denom } `
2623
24+ // Type for a balance fetch request
25+ type BalanceFetchRequest = {
26+ chain : Chain
27+ address : AddressCanonicalBytes
28+ denom : TokenRawDenom
29+ }
30+
31+ // Type for chain key
32+ type ChainKey = `${UniversalChainId } :${AddressCanonicalBytes } `
33+
34+ // Helper to create the chain key
35+ const createChainKey = (
36+ universalChainId : UniversalChainId ,
37+ address : AddressCanonicalBytes
38+ ) : ChainKey => `${ universalChainId } :${ address } `
39+
2740export class BalancesStore {
2841 data = $state ( new SvelteMap < BalanceKey , RawTokenBalance > ( ) )
2942 errors = $state (
3043 new SvelteMap < BalanceKey , Option . Option < FetchEvmBalanceError | FetchCosmosBalanceError > > ( )
3144 )
32- fibers = $state ( new SvelteMap < BalanceKey , Fiber . RuntimeFiber < number , never > > ( ) )
45+ chainFibers = $state ( new SvelteMap < ChainKey , Fiber . RuntimeFiber < void , never > > ( ) )
46+ pendingRequests = $state ( new SvelteMap < ChainKey , Array < BalanceFetchRequest > > ( ) )
3347
3448 setBalance (
3549 universalChainId : UniversalChainId ,
@@ -65,37 +79,102 @@ export class BalancesStore {
6579 return this . errors . get ( createKey ( universalChainId , address , denom ) ) ?? Option . none ( )
6680 }
6781
68- fetchBalance ( chain : Chain , address : AddressCanonicalBytes , denom : TokenRawDenom ) {
69- const key = createKey ( chain . universal_chain_id , address , denom )
82+ // Process balance requests for a specific chain one at a time
83+ private processBatchedBalances (
84+ chain : Chain ,
85+ address : AddressCanonicalBytes ,
86+ denoms : Array < TokenRawDenom >
87+ ) {
88+ const chainKey = createChainKey ( chain . universal_chain_id , address )
89+ const self = this
7090
71- // If there's already a query running for this combination, don't start another one
72- if ( this . fibers . has ( key ) ) {
91+ // If there's already a query running for this chain, don't start another one
92+ if ( this . chainFibers . has ( chainKey ) ) {
93+ // Add these requests to pending
94+ const existing = this . pendingRequests . get ( chainKey ) || [ ]
95+ const newRequests = denoms . map ( denom => ( { chain, address, denom } ) )
96+ this . pendingRequests . set ( chainKey , [ ...existing , ...newRequests ] )
7397 return
7498 }
7599
76- let query =
77- chain . rpc_type === "evm"
78- ? createEvmBalanceQuery ( {
79- chain,
80- tokenAddress : denom ,
81- walletAddress : AddressEvmCanonical . make ( address ) ,
82- refetchInterval : "15 minutes" ,
83- writeData : balance =>
84- this . setBalance ( chain . universal_chain_id , address , denom , balance ) ,
85- writeError : error => this . setError ( chain . universal_chain_id , address , denom , error )
86- } )
87- : createCosmosBalanceQuery ( {
88- chain,
89- tokenAddress : denom ,
90- walletAddress : AddressCosmosCanonical . make ( address ) ,
91- refetchInterval : "15 minutes" ,
92- writeData : balance =>
93- this . setBalance ( chain . universal_chain_id , address , denom , balance ) ,
94- writeError : error => this . setError ( chain . universal_chain_id , address , denom , error )
95- } )
96-
97- const fiber = Effect . runFork ( query )
98- this . fibers . set ( key , fiber )
100+ // Create a queue for processing balance requests
101+ const batchProcessor = Effect . gen ( function * ( _ ) {
102+ // Create a queue for balance requests
103+ const queue = yield * Queue . unbounded < BalanceFetchRequest > ( )
104+
105+ // Add all denoms to the queue
106+ for ( const denom of denoms ) {
107+ yield * Queue . offer ( queue , { chain, address, denom } )
108+ }
109+
110+ yield * Effect . forever (
111+ Effect . gen ( function * ( _ ) {
112+ // Take the next request from the queue
113+ const request = yield * Queue . take ( queue )
114+ const { chain, address, denom } = request
115+
116+ // Process the balance request
117+ yield * Effect . gen ( function * ( _ ) {
118+ let balance : RawTokenBalance
119+ if ( chain . rpc_type === "evm" ) {
120+ balance = yield * fetchEvmBalance ( {
121+ chain,
122+ tokenAddress : denom ,
123+ walletAddress : AddressEvmCanonical . make ( address )
124+ } )
125+ } else {
126+ balance = yield * fetchCosmosBalance ( {
127+ chain,
128+ tokenAddress : denom ,
129+ walletAddress : AddressCosmosCanonical . make ( address )
130+ } )
131+ }
132+
133+ // Update the balance
134+ self . setBalance ( chain . universal_chain_id , address , denom , balance )
135+ self . setError ( chain . universal_chain_id , address , denom , Option . none ( ) )
136+ } ) . pipe (
137+ Effect . catchAll ( error => {
138+ // Update the error
139+ self . setError ( chain . universal_chain_id , address , denom , Option . some ( error ) )
140+ return Effect . succeed ( undefined )
141+ } )
142+ )
143+ } )
144+ )
145+ } ) . pipe (
146+ Effect . catchAll ( error => {
147+ Effect . logError ( "error processing balance batch:" , error )
148+ return Effect . succeed ( undefined )
149+ } ) ,
150+ Effect . ensuring (
151+ Effect . sync ( ( ) => {
152+ // Check if there are pending requests for this chain
153+ const pending = self . pendingRequests . get ( chainKey ) || [ ]
154+ self . pendingRequests . delete ( chainKey )
155+ self . chainFibers . delete ( chainKey )
156+
157+ // If there are pending requests, process them
158+ if ( pending . length > 0 ) {
159+ const pendingDenoms = pending . map ( req => req . denom )
160+ self . processBatchedBalances ( chain , address , pendingDenoms )
161+ }
162+ } )
163+ )
164+ )
165+
166+ // Run the batch processor
167+ const fiber = Effect . runFork ( batchProcessor )
168+ this . chainFibers . set ( chainKey , fiber )
169+ }
170+
171+ fetchBalance ( chain : Chain , address : AddressCanonicalBytes , denom : TokenRawDenom ) {
172+ this . processBatchedBalances ( chain , address , [ denom ] )
173+ }
174+
175+ fetchBalances ( chain : Chain , address : AddressCanonicalBytes , denoms : Array < TokenRawDenom > ) {
176+ if ( denoms . length === 0 ) return
177+ this . processBatchedBalances ( chain , address , denoms )
99178 }
100179}
101180
0 commit comments