Add ConfirmationChannel for async publisher confirmations#1824
Add ConfirmationChannel for async publisher confirmations#1824lukebakken wants to merge 1 commit intorabbitmq:mainfrom
ConfirmationChannel for async publisher confirmations#1824Conversation
|
Hey @acogoluegnes! Here's a proposal for implementing "automatic" publisher confirmation tracking in a similar manner to the .NET client. As you're the expert here, please suggest better names, a better implementation, etc, as I'm just barely familiar enough with Java and this project to implement this feature with the help of a genie. I did, of course, review the code. If you like the way this is going, I thought I'd also add rate-throttling in a similar manner to the .NET client, i.e. as the outstanding confirmation window closes, increase delay between publishes to (hopefully) allow the broker to catch up. |
6a31a46 to
714370e
Compare
|
Thanks for this contributon @lukebakken. I think it is on the right track. I added some comments in the code. Just a few more remarks:
|
714370e to
44c3aca
Compare
c64f7ba to
5520ef4
Compare
|
@lukebakken I have sent you an external contributor invite for this repo. |
5520ef4 to
dd2fb5e
Compare
ConfirmationChannel for async publisher confirmations
|
@michaelklishin @acogoluegnes I took this comment to heart and re-implemented this feature as a |
041b932 to
b0761bf
Compare
The new `ConfirmationChannel` API introduced in rabbitmq/rabbitmq-java-client#1824 provides asynchronous publisher confirmation tracking with a `CompletableFuture`-based API, rate limiting, and message correlation support. This change adds `PublisherConfirmsAsync.java` to demonstrate the `ConfirmationChannel` API. The tutorial shows how to create a `ConfirmationChannel` wrapper with rate limiting, publish messages asynchronously with correlation context, and wait for all confirmations using `CompletableFuture.allOf()`. Depends on rabbitmq/rabbitmq-java-client#1824
b0761bf to
49a0ff5
Compare
c002375 to
8112046
Compare
|
Alrighty gang, all set I think. Small tutorial here - rabbitmq/rabbitmq-tutorials#707 |
Traditional publisher confirms in the Java client require manual tracking of sequence numbers and correlation of Basic.Return messages. This makes per-message error handling complex and provides no built-in async pattern, backpressure mechanism, or message correlation support. This change introduces `ConfirmationChannel`, a wrapper that provides automatic publisher confirmation tracking with a `CompletableFuture`-based API, optional throttling, and generic context parameter for message correlation. The implementation uses listener-based integration with existing `Channel` instances, requiring no modifications to `ChannelN`. New API components: - `ConfirmationChannel` interface - Extends `Channel` and adds `basicPublishAsync()` methods that return `CompletableFuture<T>` - `ConfirmationChannelN` implementation - Wraps any `Channel` instance and tracks confirmations via return/confirm/shutdown listeners - `PublishException` - Exception thrown when message is nack'd or returned, with sequence number, routing details, and user context The wrapper maintains independent sequence numbers using `AtomicLong` and stores confirmation state in a `ConcurrentHashMap`. Each entry holds the future, rate limiter permit, and user-provided context. Messages include an `x-seq-no` header for correlating Basic.Return responses. Rate limiting is optional via `RateLimiter` parameter. The `ThrottlingRateLimiter` implementation uses progressive delays (0-1000ms) based on capacity usage, applying backpressure when available permits fall below a threshold (default 50%). The `basicPublish()` and `waitForConfirms()` methods throw `UnsupportedOperationException` on `ConfirmationChannel` to prevent mixing synchronous and asynchronous patterns. All other `Channel` methods delegate to the wrapped instance. Tests include 9 unit tests for `ThrottlingRateLimiter` and 24 integration tests for publisher confirmation tracking with context parameter verification, rate limiting scenarios, and error handling.
8112046 to
9a31acc
Compare
|
@lukebakken since this is a feature by most definitions, we now would have to go through a special approval process on our end :( |
The new `ConfirmationChannel` API introduced in rabbitmq/rabbitmq-java-client#1824 provides asynchronous publisher confirmation tracking with a `CompletableFuture`-based API, rate limiting, and message correlation support. This change adds `PublisherConfirmsAsync.java` to demonstrate the `ConfirmationChannel` API. The tutorial shows how to create a `ConfirmationChannel` wrapper with rate limiting, publish messages asynchronously with correlation context, and wait for all confirmations using `CompletableFuture.allOf()`. Depends on rabbitmq/rabbitmq-java-client#1824
Traditional publisher confirms in the Java client require manual
tracking of sequence numbers and correlation of Basic.Return messages.
This makes per-message error handling complex and provides no built-in
async pattern, backpressure mechanism, or message correlation support.
This change introduces
ConfirmationChannel, a wrapper that providesautomatic publisher confirmation tracking with a
CompletableFuture-based API, optional throttling, and generic contextparameter for message correlation. The implementation uses
listener-based integration with existing
Channelinstances, requiringno modifications to
ChannelN.New API components:
ConfirmationChannelinterface - ExtendsChanneland addsbasicPublishAsync()methods that returnCompletableFuture<T>ConfirmationChannelNimplementation - Wraps anyChannelinstanceand tracks confirmations via return/confirm/shutdown listeners
PublishException- Exception thrown when message is nack'd orreturned, with sequence number, routing details, and user context
The wrapper maintains independent sequence numbers using
AtomicLongand stores confirmation state in a
ConcurrentHashMap. Each entry holdsthe future, rate limiter permit, and user-provided context. Messages
include an
x-seq-noheader for correlating Basic.Return responses.Rate limiting is optional via
RateLimiterparameter. TheThrottlingRateLimiterimplementation uses progressive delays(0-1000ms) based on capacity usage, applying backpressure when available
permits fall below a threshold (default 50%).
The
basicPublish()andwaitForConfirms()methods throwUnsupportedOperationExceptiononConfirmationChannelto preventmixing synchronous and asynchronous patterns. All other
Channelmethods delegate to the wrapped instance.