|
18 | 18 | import io.lettuce.core.XAddArgs; |
19 | 19 | import io.lettuce.core.XClaimArgs; |
20 | 20 | import io.lettuce.core.XGroupCreateArgs; |
| 21 | +import io.lettuce.core.XPendingArgs; |
21 | 22 | import io.lettuce.core.XReadArgs; |
22 | 23 | import io.lettuce.core.XReadArgs.StreamOffset; |
23 | 24 | import io.lettuce.core.cluster.api.reactive.RedisClusterReactiveCommands; |
|
56 | 57 | * @author Tugdual Grall |
57 | 58 | * @author Dengliming |
58 | 59 | * @author Mark John Moreno |
| 60 | + * @author Jeonggyu Choi |
59 | 61 | * @since 2.2 |
60 | 62 | */ |
61 | 63 | class LettuceReactiveStreamCommands implements ReactiveStreamCommands { |
@@ -235,9 +237,16 @@ public Flux<CommandResponse<PendingRecordsCommand, PendingMessages>> xPending( |
235 | 237 | io.lettuce.core.Limit limit = command.isLimited() ? io.lettuce.core.Limit.from(command.getCount()) |
236 | 238 | : io.lettuce.core.Limit.unlimited(); |
237 | 239 |
|
238 | | - Flux<PendingMessage> publisher = command.hasConsumer() ? cmd.xpending(command.getKey(), |
239 | | - io.lettuce.core.Consumer.from(groupName, ByteUtils.getByteBuffer(command.getConsumerName())), range, limit) |
240 | | - : cmd.xpending(command.getKey(), groupName, range, limit); |
| 240 | + XPendingArgs<ByteBuffer> xPendingArgs = XPendingArgs.Builder.xpending(groupName, range, limit); |
| 241 | + if (command.hasConsumer()) { |
| 242 | + io.lettuce.core.Consumer<ByteBuffer> consumer = io.lettuce.core.Consumer.from(groupName, ByteUtils.getByteBuffer(command.getConsumerName())); |
| 243 | + xPendingArgs.consumer(consumer); |
| 244 | + } |
| 245 | + if (command.hasIdle()) { |
| 246 | + xPendingArgs.idle(command.getIdle()); |
| 247 | + } |
| 248 | + |
| 249 | + Flux<PendingMessage> publisher = cmd.xpending(command.getKey(), xPendingArgs); |
241 | 250 |
|
242 | 251 | return publisher.collectList().map(it -> { |
243 | 252 |
|
|
0 commit comments