@@ -28,6 +28,7 @@ export type ReleaseConcurrencyQueueOptions<T> = {
2828 pollInterval ?: number ;
2929 batchSize ?: number ;
3030 retry ?: ReleaseConcurrencyQueueRetryOptions ;
31+ disableConsumers ?: boolean ;
3132} ;
3233
3334const QueueItemMetadata = z . object ( {
@@ -74,7 +75,10 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
7475 } ;
7576
7677 this . #registerCommands( ) ;
77- this . #startConsumers( ) ;
78+
79+ if ( ! options . disableConsumers ) {
80+ this . #startConsumers( ) ;
81+ }
7882 }
7983
8084 public async quit ( ) {
@@ -93,6 +97,12 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
9397 const maxTokens = await this . #callMaxTokens( releaseQueueDescriptor ) ;
9498
9599 if ( maxTokens === 0 ) {
100+ this . logger . debug ( "No tokens available, skipping release" , {
101+ releaseQueueDescriptor,
102+ releaserId,
103+ maxTokens,
104+ } ) ;
105+
96106 return ;
97107 }
98108
@@ -109,6 +119,14 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
109119 String ( Date . now ( ) )
110120 ) ;
111121
122+ this . logger . debug ( "Consumed token in attemptToRelease" , {
123+ releaseQueueDescriptor,
124+ releaserId,
125+ maxTokens,
126+ result,
127+ releaseQueue,
128+ } ) ;
129+
112130 if ( ! ! result ) {
113131 await this . #callExecutor( releaseQueueDescriptor , releaserId , {
114132 retryCount : 0 ,
@@ -119,6 +137,7 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
119137 releaseQueueDescriptor,
120138 releaserId,
121139 maxTokens,
140+ releaseQueue,
122141 } ) ;
123142 }
124143 }
@@ -130,13 +149,19 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
130149 */
131150 public async consumeToken ( releaseQueueDescriptor : T , releaserId : string ) {
132151 const maxTokens = await this . #callMaxTokens( releaseQueueDescriptor ) ;
152+ const releaseQueue = this . keys . fromDescriptor ( releaseQueueDescriptor ) ;
133153
134154 if ( maxTokens === 0 ) {
155+ this . logger . debug ( "No tokens available, skipping consume" , {
156+ releaseQueueDescriptor,
157+ releaserId,
158+ maxTokens,
159+ releaseQueue,
160+ } ) ;
161+
135162 return ;
136163 }
137164
138- const releaseQueue = this . keys . fromDescriptor ( releaseQueueDescriptor ) ;
139-
140165 await this . redis . consumeToken (
141166 this . masterQueuesKey ,
142167 this . #bucketKey( releaseQueue ) ,
@@ -147,6 +172,13 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
147172 String ( maxTokens ) ,
148173 String ( Date . now ( ) )
149174 ) ;
175+
176+ this . logger . debug ( "Consumed token in consumeToken" , {
177+ releaseQueueDescriptor,
178+ releaserId,
179+ maxTokens,
180+ releaseQueue,
181+ } ) ;
150182 }
151183
152184 /**
@@ -157,6 +189,11 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
157189 public async returnToken ( releaseQueueDescriptor : T , releaserId : string ) {
158190 const releaseQueue = this . keys . fromDescriptor ( releaseQueueDescriptor ) ;
159191
192+ this . logger . debug ( "Returning token in returnToken" , {
193+ releaseQueueDescriptor,
194+ releaserId,
195+ } ) ;
196+
160197 await this . redis . returnTokenOnly (
161198 this . masterQueuesKey ,
162199 this . #bucketKey( releaseQueue ) ,
@@ -165,6 +202,12 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
165202 releaseQueue ,
166203 releaserId
167204 ) ;
205+
206+ this . logger . debug ( "Returned token in returnToken" , {
207+ releaseQueueDescriptor,
208+ releaserId,
209+ releaseQueue,
210+ } ) ;
168211 }
169212
170213 /**
@@ -177,10 +220,20 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
177220 const releaseQueue = this . keys . fromDescriptor ( releaseQueueDescriptor ) ;
178221
179222 if ( amount < 0 ) {
223+ this . logger . debug ( "Cannot refill with negative tokens" , {
224+ releaseQueueDescriptor,
225+ amount,
226+ } ) ;
227+
180228 throw new Error ( "Cannot refill with negative tokens" ) ;
181229 }
182230
183231 if ( amount === 0 ) {
232+ this . logger . debug ( "Cannot refill with 0 tokens" , {
233+ releaseQueueDescriptor,
234+ amount,
235+ } ) ;
236+
184237 return [ ] ;
185238 }
186239
@@ -192,6 +245,13 @@ export class ReleaseConcurrencyTokenBucketQueue<T> {
192245 String ( amount ) ,
193246 String ( maxTokens )
194247 ) ;
248+
249+ this . logger . debug ( "Refilled tokens in refillTokens" , {
250+ releaseQueueDescriptor,
251+ releaseQueue,
252+ amount,
253+ maxTokens,
254+ } ) ;
195255 }
196256
197257 /**
0 commit comments