diff --git a/basic/barrier/README.md b/basic/barrier/README.md
index 64795f23d..067499054 100644
--- a/basic/barrier/README.md
+++ b/basic/barrier/README.md
@@ -2,9 +2,8 @@ Barrier Sample
==============
This example demonstrates the use of a process barrier component to suspend a thread until some asynchronous operation
-completes. The first example uses an **HTTP Inbound Gateway**, splits the request, sends the splits to rabbitmq and then
-waits for
-the publisher confirms. Finally, the results are returned to the caller.
+completes. The first example uses an **HTTP Inbound Gateway**, splits the request, sends the splits to RabbitMQ and then
+waits for the publisher confirms. Finally, the results are returned to the caller.
The sample is a Spring Boot application that loads 2 contexts:
@@ -19,19 +18,35 @@ http -> splitter -> amqp
amqp(Acks) -> aggregator -> barrier (release)
-qmqp(inbound) -> nullChannel
+amqp(inbound) -> nullChannel
```
The last flow drains the messages and allows the auto-delete queue to be removed when the application is closed.
+## Configuration Options
+
+This sample supports both **XML-based** and **Java-based** Spring Integration configurations. You can select which configuration to use by setting the active profile in `application.properties`:
+
+* `spring.profiles.active=xml-config` - Uses XML configuration files from `META-INF/spring/integration/`
+* `spring.profiles.active=java-config` - Uses Java `@Configuration` classes from the `configuration` package
+
## Running the sample
+### Using Maven
+
+ $ mvn spring-boot:run
+
+Or with a specific profile:
+
+ $ mvn spring-boot:run -Dspring-boot.run.arguments=--spring.profiles.active=java-config
+
+### Using Gradle
$ gradlew :barrier:run
This will package the application and run it using the [Gradle Application Plugin](https://www.gradle.org/docs/current/userguide/application_plugin.html)
-#### Using an IDE such as SpringSource Tool Suite™ (STS)
+### Using an IDE such as SpringSource Tool Suite™ (STS)
In STS (Eclipse), go to package **org.springframework.integration.samples.barrier**, right-click **Application** and select **Run as** --> **Java Application** (or Spring Boot Application).
@@ -52,8 +67,14 @@ An aggregator is used to aggregate the results; if there are no errors, the resu
errors occurred, an exception is sent to release the barrier; this is thrown to the caller and has all the consolidated
results in a property.
+#### Running the Error Handling Example
+
You can run this example from an IDE, such as STS using the technique above; in this case, the class is
-**ErrorHandlingApplication** in the **org.springframework.integration.samples.barrier** package.
+**ErrorHandlingApplication** in the **org.springframework.integration.samples.barrier2** package.
+
+Or using Maven:
+
+ $ mvn spring-boot:run -Dspring-boot.run.main-class=org.springframework.integration.samples.barrier2.ErrorHandlingApplication
It sends a list of integers to the flow:
diff --git a/basic/barrier/src/main/java/org/springframework/integration/samples/barrier/Application.java b/basic/barrier/src/main/java/org/springframework/integration/samples/barrier/Application.java
index c225dc42b..a18a2cab0 100644
--- a/basic/barrier/src/main/java/org/springframework/integration/samples/barrier/Application.java
+++ b/basic/barrier/src/main/java/org/springframework/integration/samples/barrier/Application.java
@@ -25,9 +25,12 @@
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.ImportResource;
+import org.springframework.core.env.Environment;
/**
* @author Gary Russell
+ * @author Glenn Renfro
+ *
* @since 4.2
*/
@SpringBootApplication
@@ -37,11 +40,22 @@ public class Application {
public static void main(String[] args) {
ConfigurableApplicationContext server = SpringApplication.run(Application.class, args);
- runClient(args);
- server.close();
+ Environment env = server.getEnvironment();
+
+ // Get configurationType via profile
+ String configurationType = env.getProperty("spring.profiles.active");
+ System.out.println("Configuration-Type: " + configurationType);
+ if (configurationType == null || configurationType.equals("xml-config")) {
+ runClientWithXMLConfig(args);
+ }
+ else {
+ RequestGateway requestGateway = server.getBean("requestGateway", RequestGateway.class);
+ echoMessage(requestGateway);
+ }
+ System.exit(0);
}
- static void runClient(String[] args) {
+ static void runClientWithXMLConfig(String[] args) {
SpringApplication application = new SpringApplicationBuilder()
.web(WebApplicationType.NONE)
.bannerMode(Mode.OFF)
@@ -52,11 +66,15 @@ static void runClient(String[] args) {
ConfigurableApplicationContext client = application.run(args);
RequestGateway requestGateway = client.getBean("requestGateway", RequestGateway.class);
+ echoMessage(requestGateway);
+ client.close();
+ }
+
+ private static void echoMessage(RequestGateway requestGateway) {
String request = "A,B,C";
System.out.println("\n\n++++++++++++ Sending: " + request + " ++++++++++++\n");
String reply = requestGateway.echo(request);
System.out.println("\n\n++++++++++++ Replied with: " + reply + " ++++++++++++\n");
- client.close();
}
diff --git a/basic/barrier/src/main/java/org/springframework/integration/samples/barrier/RequestGateway.java b/basic/barrier/src/main/java/org/springframework/integration/samples/barrier/RequestGateway.java
index 7142f426e..09701c1a3 100644
--- a/basic/barrier/src/main/java/org/springframework/integration/samples/barrier/RequestGateway.java
+++ b/basic/barrier/src/main/java/org/springframework/integration/samples/barrier/RequestGateway.java
@@ -15,12 +15,16 @@
*/
package org.springframework.integration.samples.barrier;
+import org.springframework.integration.annotation.MessagingGateway;
/**
* @author Oleg Zhurakousky
* @author Gunnar Hillert
+ * @author Glenn Renfro
*
*/
+
+@MessagingGateway(defaultRequestChannel = "requestChannel")
public interface RequestGateway {
String echo(String request);
diff --git a/basic/barrier/src/main/java/org/springframework/integration/samples/barrier/configuration/ClientConfiguration.java b/basic/barrier/src/main/java/org/springframework/integration/samples/barrier/configuration/ClientConfiguration.java
new file mode 100644
index 000000000..9cadb71d9
--- /dev/null
+++ b/basic/barrier/src/main/java/org/springframework/integration/samples/barrier/configuration/ClientConfiguration.java
@@ -0,0 +1,66 @@
+package org.springframework.integration.samples.barrier.configuration;
+
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Profile;
+import org.springframework.http.HttpMethod;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.dsl.IntegrationFlow;
+import org.springframework.integration.http.dsl.Http;
+import org.springframework.messaging.MessageChannel;
+
+/**
+ * Configure client-side Spring Integration flows for the Barrier pattern sample.
+ *
+ * Define the integration flow for sending HTTP requests to the server endpoint.
+ * Demonstrate how to configure an HTTP outbound gateway that sends POST requests
+ * and waits for responses.
+ *
+ * Activate this configuration using the "java-config" profile.
+ *
+ * @author Glenn Renfro
+ */
+@Configuration
+@Profile("java-config")
+public class ClientConfiguration {
+
+ /**
+ * The URL endpoint of the server's HTTP inbound gateway.
+ * Defaults to http://localhost:8080/postGateway if not configured.
+ */
+ @Value("${barrier.url:http://localhost:8080/postGateway}")
+ private String url;
+
+ /**
+ * Define the main client integration flow.
+ *
+ * Receive messages from the {@link #requestChannel()} and send them
+ * via HTTP POST to the configured server URL. Wait for and return
+ * the HTTP response as a String.
+ *
+ * @return the integration flow definition
+ */
+ @Bean
+ public IntegrationFlow clientFlow() {
+ return IntegrationFlow.from(requestChannel())
+ .handle(Http.outboundGateway(url)
+ .httpMethod(HttpMethod.POST)
+ .expectedResponseType(String.class))
+ .get();
+ }
+
+ /**
+ * Create the request channel that feeds messages into the client flow.
+ *
+ * Use a direct channel that provides point-to-point semantics,
+ * delivering messages to a single subscriber.
+ *
+ * @return a new DirectChannel instance
+ */
+ @Bean
+ public MessageChannel requestChannel() {
+ return new DirectChannel();
+ }
+
+}
diff --git a/basic/barrier/src/main/java/org/springframework/integration/samples/barrier/configuration/ServerConfiguration.java b/basic/barrier/src/main/java/org/springframework/integration/samples/barrier/configuration/ServerConfiguration.java
new file mode 100644
index 000000000..548923b9f
--- /dev/null
+++ b/basic/barrier/src/main/java/org/springframework/integration/samples/barrier/configuration/ServerConfiguration.java
@@ -0,0 +1,369 @@
+package org.springframework.integration.samples.barrier.configuration;
+
+import org.springframework.amqp.core.Binding;
+import org.springframework.amqp.core.BindingBuilder;
+import org.springframework.amqp.core.DirectExchange;
+import org.springframework.amqp.core.Queue;
+import org.springframework.amqp.rabbit.connection.ConnectionFactory;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.amqp.rabbit.support.ValueExpression;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Profile;
+import org.springframework.http.HttpMethod;
+import org.springframework.integration.aggregator.ExpressionEvaluatingCorrelationStrategy;
+import org.springframework.integration.amqp.dsl.Amqp;
+import org.springframework.integration.aggregator.BarrierMessageHandler;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.channel.PublishSubscribeChannel;
+import org.springframework.integration.dsl.IntegrationFlow;
+import org.springframework.integration.http.dsl.Http;
+import org.springframework.integration.samples.barrier.AckAggregator;
+import org.springframework.messaging.MessageChannel;
+
+/**
+ * Configure server-side Spring Integration flows demonstrating the Barrier pattern.
+ *
+ * Showcase an integration pattern where messages are:
+ *
+ *
Received via HTTP endpoints
+ *
Split into individual elements
+ *
Published to RabbitMQ with publisher confirmations
+ *
Held at a barrier until all confirmations are received
+ *
Released as an aggregated result
+ *
+ *
+ * Ensure that a message is only considered successfully processed when all of its
+ * split components have been confirmed by the message broker.
+ *
+ * Activate this configuration using the "java-config" profile.
+ *
+ * @author Glenn Renfro
+ */
+@Configuration
+@Profile("java-config")
+public class ServerConfiguration {
+
+ /**
+ * Create a channel for receiving incoming messages from HTTP endpoints.
+ *
+ * @return a DirectChannel for point-to-point message delivery
+ */
+ @Bean
+ public MessageChannel receiveChannel() {
+ return new DirectChannel();
+ }
+
+ /**
+ * Create a channel for payload messages in the GET gateway flow.
+ *
+ * @return a DirectChannel for point-to-point message delivery
+ */
+ @Bean
+ public MessageChannel createPayload() {
+ return new DirectChannel();
+ }
+
+ /**
+ * Create a channel for broadcasting messages to multiple subscribers.
+ *
+ * Use a publish-subscribe channel to send messages to both the barrier flow
+ * and the AMQP outbound flow simultaneously.
+ *
+ * @return a PublishSubscribeChannel for broadcasting messages
+ */
+ @Bean
+ public MessageChannel processChannel() {
+ return new PublishSubscribeChannel();
+ }
+
+ /**
+ * Create a channel for receiving transformed messages from the barrier.
+ *
+ * @return a DirectChannel for point-to-point message delivery
+ */
+ @Bean
+ public MessageChannel transform() {
+ return new DirectChannel();
+ }
+
+ /**
+ * Create a channel for receiving publisher confirmations (acks and nacks) from RabbitMQ.
+ *
+ * @return a DirectChannel for point-to-point message delivery
+ */
+ @Bean
+ public MessageChannel confirmations() {
+ return new DirectChannel();
+ }
+
+ /**
+ * Create a channel for triggering the release of messages held at the barrier.
+ *
+ * @return a DirectChannel for point-to-point message delivery
+ */
+ @Bean
+ public MessageChannel release() {
+ return new DirectChannel();
+ }
+
+ /**
+ * Create a channel for handling errors that occur during message processing.
+ *
+ * @return a DirectChannel for point-to-point message delivery
+ */
+ @Bean
+ public MessageChannel errorChannel() {
+ return new DirectChannel();
+ }
+
+ /**
+ * Define an HTTP POST endpoint for receiving requests from clients.
+ *
+ * Expose a REST endpoint at "/postGateway" that accepts POST requests
+ * and forwards them to the {@link #receiveChannel()} for processing.
+ *
+ * @return the integration flow for handling POST requests
+ */
+ @Bean
+ public IntegrationFlow postGatewayFlow() {
+ return IntegrationFlow.from(Http.inboundGateway("/postGateway")
+ .requestChannel(receiveChannel())
+ .errorChannel(errorChannel())
+ .requestMapping(m -> m.methods(HttpMethod.POST)))
+ .get();
+ }
+
+ /**
+ * Define an HTTP GET endpoint for creating a default payload.
+ *
+ * Expose a REST endpoint at "/getGateway" that accepts GET requests
+ * and creates a payload for testing purposes.
+ *
+ * @return the integration flow for handling GET requests
+ */
+ @Bean
+ public IntegrationFlow getGatewayFlow() {
+ return IntegrationFlow.from(Http.inboundGateway("/getGateway")
+ .requestChannel(createPayload())
+ .errorChannel(errorChannel())
+ .requestMapping(m -> m.methods(HttpMethod.GET)))
+ .get();
+ }
+
+ /**
+ * Define a flow to transform GET requests into a comma-separated payload.
+ *
+ * Generate the default payload "A,B,C" for GET requests
+ * and forward it to the receive channel for processing.
+ *
+ * @return the integration flow for transforming GET requests
+ */
+ @Bean
+ public IntegrationFlow transformGetFlow() {
+ return IntegrationFlow.from(createPayload())
+ .transform("'A,B,C'")
+ .channel(receiveChannel())
+ .get();
+ }
+
+ /**
+ * Define a flow to enrich message headers with correlation information.
+ *
+ * Add an "ackCorrelation" header containing the message ID to correlate
+ * publisher confirmations with the original message.
+ *
+ * @return the integration flow for header enrichment
+ */
+ @Bean
+ public IntegrationFlow headerEnricherFlow() {
+ return IntegrationFlow.from(receiveChannel())
+ .enrichHeaders(h -> h.header("ackCorrelation", "headers['id']"))
+ .channel(processChannel())
+ .get();
+ }
+
+ /**
+ * Define a flow to split messages and publish them to RabbitMQ with publisher confirmations.
+ *
+ * Perform the following steps:
+ *
+ *
Receive messages from the process channel
+ *
Filter out HTTP-specific headers
+ *
Split the payload by comma delimiter
+ *
Publish each element to RabbitMQ
+ *
Route confirmations (acks/nacks) to the confirmations channel
+ *
+ *
+ * @param rabbitTemplate the RabbitMQ template for publishing messages
+ * @return the integration flow for AMQP outbound processing
+ */
+ @Bean
+ public IntegrationFlow amqpOutboundFlow(RabbitTemplate rabbitTemplate) {
+ return IntegrationFlow.from(processChannel())
+ .headerFilter("content-type", "content-length")
+ .splitWith(s -> s.delimiters(","))
+ .handle(Amqp.outboundAdapter(rabbitTemplate)
+ .exchangeName("barrier.sample.exchange")
+ .routingKey("barrier.sample.key")
+ .confirmAckChannel(confirmations())
+ .confirmNackChannel(confirmations())
+ .returnChannel(errorChannel())
+ .confirmCorrelationExpression("#this"))
+ .get();
+ }
+
+ /**
+ * Create the barrier message handler that holds messages until conditions are met.
+ *
+ * Configure the barrier to wait for all split message parts to be confirmed by the broker
+ * before releasing the aggregated result. Use a 10-second timeout and correlate
+ * messages using the "ackCorrelation" header.
+ *
+ * @return the configured BarrierMessageHandler
+ */
+ @Bean
+ public BarrierMessageHandler barrier() {
+ BarrierMessageHandler handler = new BarrierMessageHandler(10000,
+ new ExpressionEvaluatingCorrelationStrategy(new ValueExpression<>("ackCorrelation")));
+ handler.setOutputChannelName("transform");
+ return handler;
+ }
+
+ /**
+ * Define a flow to send messages to the barrier where they wait for confirmations.
+ *
+ * Feed messages from the process channel into the barrier handler,
+ * where they are held until all corresponding confirmations are received.
+ *
+ * @return the integration flow for barrier processing
+ */
+ @Bean
+ public IntegrationFlow barrierFlow() {
+ return IntegrationFlow.from(processChannel())
+ .handle(barrier())
+ .get();
+ }
+
+ /**
+ * Define a flow to extract and transform the result from the barrier.
+ *
+ * Take the aggregated result from the barrier (an array containing
+ * the original message and confirmations) and extract the second element
+ * (index 1), which contains the confirmation data.
+ *
+ * @return the integration flow for transforming barrier output
+ */
+ @Bean
+ public IntegrationFlow transformFlow() {
+ return IntegrationFlow.from(transform())
+ .transform("payload[1]")
+ .get();
+ }
+
+ /**
+ * Define a flow to aggregate publisher confirmations and trigger barrier release.
+ *
+ * Perform the following steps:
+ *
+ *
Receive individual confirmations from RabbitMQ
+ *
Filter out framework headers
+ *
Aggregate confirmations by correlation ID
+ *
Send the aggregated result to trigger barrier release
+ *
+ *
+ * @return the integration flow for processing confirmations
+ */
+ @Bean
+ public IntegrationFlow confirmationsFlow() {
+ return IntegrationFlow.from(confirmations())
+ .headerFilter("replyChannel", "errorChannel")
+ .handle((payload, headers) -> payload)
+ .aggregate(a -> a.processor(new AckAggregator()))
+ .channel(release())
+ .get();
+ }
+
+ /**
+ * Define a flow to trigger the barrier to release held messages.
+ *
+ * Invoke the barrier's trigger method to release the corresponding
+ * message from the barrier when all confirmations for a message have been aggregated.
+ *
+ * @return the integration flow for barrier release
+ */
+ @Bean
+ public IntegrationFlow releaseFlow() {
+ return IntegrationFlow.from(release())
+ .handle(barrier(), "trigger")
+ .get();
+ }
+
+ /**
+ * Define a flow to consume messages from the RabbitMQ queue.
+ *
+ * Read messages from the queue and discard them to the null channel.
+ * In a production scenario, process the messages further as needed.
+ *
+ * @param rabbitConnectionFactory the RabbitMQ connection factory
+ * @return the integration flow for AMQP inbound processing
+ */
+ @Bean
+ public IntegrationFlow amqpInboundFlow(ConnectionFactory rabbitConnectionFactory) {
+ return IntegrationFlow.from(Amqp.inboundAdapter(rabbitConnectionFactory, "barrier.sample.queue"))
+ .channel("nullChannel")
+ .get();
+ }
+
+ /**
+ * Create the RabbitMQ queue for receiving published messages.
+ *
+ * Configure the queue as:
+ *
+ *
Non-durable (not persisted to disk)
+ *
Non-exclusive (can be accessed by multiple connections)
+ *
Auto-delete (deleted when no longer in use)
+ *
+ *
+ * @return the configured Queue instance
+ */
+ @Bean
+ public Queue barrierSampleQueue() {
+ return new Queue("barrier.sample.queue", false, false, true);
+ }
+
+ /**
+ * Create the RabbitMQ exchange for routing messages.
+ *
+ * Configure the exchange as:
+ *
+ *
Direct exchange (routes based on exact routing key match)
+ *
Non-durable (not persisted to disk)
+ *
Auto-delete (deleted when no longer bound to any queues)
+ *
+ *
+ * @return the configured DirectExchange instance
+ */
+ @Bean
+ public DirectExchange barrierSampleExchange() {
+ return new DirectExchange("barrier.sample.exchange", false, true);
+ }
+
+ /**
+ * Bind the queue to the exchange with a routing key.
+ *
+ * Route messages published to the exchange with routing key "barrier.sample.key"
+ * to the barrier sample queue.
+ *
+ * @param barrierSampleQueue the queue to bind
+ * @param barrierSampleExchange the exchange to bind to
+ * @return the configured Binding instance
+ */
+ @Bean
+ public Binding barrierSampleBinding(Queue barrierSampleQueue, DirectExchange barrierSampleExchange) {
+ return BindingBuilder.bind(barrierSampleQueue)
+ .to(barrierSampleExchange)
+ .with("barrier.sample.key");
+ }
+
+}
diff --git a/basic/barrier/src/main/java/org/springframework/integration/samples/barrier2/ErrorHandlingApplication.java b/basic/barrier/src/main/java/org/springframework/integration/samples/barrier2/ErrorHandlingApplication.java
index a56156029..a9a6a8231 100644
--- a/basic/barrier/src/main/java/org/springframework/integration/samples/barrier2/ErrorHandlingApplication.java
+++ b/basic/barrier/src/main/java/org/springframework/integration/samples/barrier2/ErrorHandlingApplication.java
@@ -18,6 +18,7 @@
import java.util.Arrays;
+import org.springframework.boot.SpringApplication;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
@@ -26,6 +27,8 @@
/**
* @author Gary Russell
+ * @author Glenn Renfro
+ *
* @since 4.2
*/
@SpringBootApplication
@@ -41,7 +44,7 @@ public static void main(String[] args) {
gateway.process(Arrays.asList(2, 0, 2, 0, 2), "foo");
}
catch (Exception e) {
- System.err.println(e.toString());
+ System.err.println(e);
}
test.close();
}
diff --git a/basic/barrier/src/main/java/org/springframework/integration/samples/barrier2/Gateway.java b/basic/barrier/src/main/java/org/springframework/integration/samples/barrier2/Gateway.java
index ae4e6a6a3..d087de590 100644
--- a/basic/barrier/src/main/java/org/springframework/integration/samples/barrier2/Gateway.java
+++ b/basic/barrier/src/main/java/org/springframework/integration/samples/barrier2/Gateway.java
@@ -17,13 +17,19 @@
import java.util.Collection;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.messaging.handler.annotation.Payload;
+
/**
* @author Gary Russell
+ * @author Glenn Renfro
+ *
* @since 4.2
*
*/
public interface Gateway {
- public void process(Collection numbers, String correlationId);
+ void process(@Payload Collection numbers,
+ @Header("barrierCorrelation") String correlationId);
}
diff --git a/basic/barrier/src/main/java/org/springframework/integration/samples/barrier2/configuration/ErrorHandlingConfiguration.java b/basic/barrier/src/main/java/org/springframework/integration/samples/barrier2/configuration/ErrorHandlingConfiguration.java
new file mode 100644
index 000000000..95030361d
--- /dev/null
+++ b/basic/barrier/src/main/java/org/springframework/integration/samples/barrier2/configuration/ErrorHandlingConfiguration.java
@@ -0,0 +1,259 @@
+package org.springframework.integration.samples.barrier2.configuration;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Profile;
+import org.springframework.integration.aggregator.BarrierMessageHandler;
+import org.springframework.integration.aggregator.HeaderAttributeCorrelationStrategy;
+import org.springframework.integration.channel.DirectChannel;
+import org.springframework.integration.channel.PublishSubscribeChannel;
+import org.springframework.integration.channel.QueueChannel;
+import org.springframework.integration.dsl.IntegrationFlow;
+import org.springframework.integration.dsl.Pollers;
+import org.springframework.integration.samples.barrier2.Aggregator;
+import org.springframework.integration.samples.barrier2.Gateway;
+import org.springframework.messaging.MessageChannel;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+/**
+ * Configure Spring Integration flows demonstrating error handling with the Barrier pattern.
+ *
+ * This configuration implements an error-handling scenario where a gateway launches asynchronous
+ * tasks and waits for completion, aggregating both successful results and failures. The barrier
+ * pattern ensures the calling thread is suspended until all tasks complete, then returns either
+ * the aggregated results or throws an exception containing all errors.
+ *
+ * Flow Overview:
+ *
+ *
Gateway receives a collection of integers and a correlation ID
+ *
Messages are split into individual elements and sent to a queue
+ *
Polled messages are transformed (10 / payload), causing divide-by-zero for zero values
+ *
Successful results and errors are aggregated by correlation
+ *
Barrier is released when aggregation completes, returning to the caller
+ *
+ *
+ * Activate this configuration using the "java-config" profile in application.properties.
+ *
+ * @author Glenn Renfro
+ */
+@Configuration
+@Profile("java-config")
+public class ErrorHandlingConfiguration {
+
+ /**
+ * Create the main process channel for broadcasting messages to multiple subscribers.
+ *
+ * Use a publish-subscribe channel to send messages to both the splitter flow
+ * and the barrier flow simultaneously.
+ *
+ * @return a PublishSubscribeChannel for message broadcasting
+ */
+ @Bean
+ public MessageChannel processChannel() {
+ return new PublishSubscribeChannel();
+ }
+
+ /**
+ * Create a queue channel for buffering messages awaiting processing.
+ *
+ * Store split messages in this queue until the poller retrieves them for
+ * transformation.
+ *
+ * @return a QueueChannel for message buffering
+ */
+ @Bean
+ public QueueChannel process() {
+ return new QueueChannel();
+ }
+
+ /**
+ * Create the aggregator channel for receiving processed messages and errors.
+ *
+ * Direct both successful transformations and errors to this channel for
+ * aggregation by correlation ID.
+ *
+ * @return a DirectChannel for aggregator input
+ */
+ @Bean
+ public MessageChannel aggregatorChannel() {
+ return new DirectChannel();
+ }
+
+ /**
+ * Create the release channel for triggering barrier release.
+ *
+ * Send aggregated results to this channel to trigger the barrier handler's
+ * release mechanism, allowing the suspended thread to continue.
+ *
+ * @return a DirectChannel for barrier release signals
+ */
+ @Bean
+ public MessageChannel release() {
+ return new DirectChannel();
+ }
+
+ /**
+ * Create the errors channel for receiving transformation failures.
+ *
+ * Route exceptions from the transformation step to this channel for header
+ * enrichment before aggregation.
+ *
+ * @return a DirectChannel for error handling
+ */
+ @Bean
+ public MessageChannel errors() {
+ return new DirectChannel();
+ }
+
+ /**
+ * Create a thread pool executor for asynchronous message processing.
+ *
+ * Configure a task executor to handle polling and transformation operations
+ * on separate threads from the calling thread.
+ *
+ * @return a ThreadPoolTaskExecutor for async operations
+ */
+ @Bean
+ public ThreadPoolTaskExecutor exec() {
+ return new ThreadPoolTaskExecutor();
+ }
+
+ /**
+ * Define the gateway integration flow for receiving messages from the Gateway interface.
+ *
+ * Create a gateway proxy from the {@link Gateway} interface that routes incoming
+ * messages to the process channel for splitting and barrier processing.
+ *
+ * @return the integration flow for gateway message ingress
+ */
+ @Bean
+ public IntegrationFlow gatewayFlow() {
+ return IntegrationFlow.from(Gateway.class)
+ .channel(processChannel())
+ .get();
+ }
+
+ /**
+ * Define the splitter flow for dividing collections into individual messages.
+ *
+ * Split incoming message payloads into individual elements and send each element
+ * to the process queue channel for independent transformation.
+ *
+ * @return the integration flow for message splitting
+ */
+ @Bean
+ public IntegrationFlow splitterFlow() {
+ return IntegrationFlow.from(processChannel())
+ .split()
+ .channel(process())
+ .get();
+ }
+
+ /**
+ * Define the transform flow for processing messages with error handling.
+ *
+ * Poll messages from the queue every second using the configured task executor,
+ * transform each message using the expression "10 / payload" (causing ArithmeticException
+ * for zero values), and route results to the aggregator channel. Send any errors
+ * to the errors channel for special handling.
+ *
+ * @param exec the thread pool executor for polling operations
+ * @return the integration flow for message transformation
+ */
+ @Bean
+ public IntegrationFlow transformFlow(ThreadPoolTaskExecutor exec) {
+
+ @SuppressWarnings("unchecked")
+ org.springframework.integration.core.MessageSource