diff --git a/basic/barrier/README.md b/basic/barrier/README.md index 64795f23d..067499054 100644 --- a/basic/barrier/README.md +++ b/basic/barrier/README.md @@ -2,9 +2,8 @@ Barrier Sample ============== This example demonstrates the use of a process barrier component to suspend a thread until some asynchronous operation -completes. The first example uses an **HTTP Inbound Gateway**, splits the request, sends the splits to rabbitmq and then -waits for -the publisher confirms. Finally, the results are returned to the caller. +completes. The first example uses an **HTTP Inbound Gateway**, splits the request, sends the splits to RabbitMQ and then +waits for the publisher confirms. Finally, the results are returned to the caller. The sample is a Spring Boot application that loads 2 contexts: @@ -19,19 +18,35 @@ http -> splitter -> amqp amqp(Acks) -> aggregator -> barrier (release) -qmqp(inbound) -> nullChannel +amqp(inbound) -> nullChannel ``` The last flow drains the messages and allows the auto-delete queue to be removed when the application is closed. +## Configuration Options + +This sample supports both **XML-based** and **Java-based** Spring Integration configurations. You can select which configuration to use by setting the active profile in `application.properties`: + +* `spring.profiles.active=xml-config` - Uses XML configuration files from `META-INF/spring/integration/` +* `spring.profiles.active=java-config` - Uses Java `@Configuration` classes from the `configuration` package + ## Running the sample +### Using Maven + + $ mvn spring-boot:run + +Or with a specific profile: + + $ mvn spring-boot:run -Dspring-boot.run.arguments=--spring.profiles.active=java-config + +### Using Gradle $ gradlew :barrier:run This will package the application and run it using the [Gradle Application Plugin](https://www.gradle.org/docs/current/userguide/application_plugin.html) -#### Using an IDE such as SpringSource Tool Suite™ (STS) +### Using an IDE such as SpringSource Tool Suite™ (STS) In STS (Eclipse), go to package **org.springframework.integration.samples.barrier**, right-click **Application** and select **Run as** --> **Java Application** (or Spring Boot Application). @@ -52,8 +67,14 @@ An aggregator is used to aggregate the results; if there are no errors, the resu errors occurred, an exception is sent to release the barrier; this is thrown to the caller and has all the consolidated results in a property. +#### Running the Error Handling Example + You can run this example from an IDE, such as STS using the technique above; in this case, the class is -**ErrorHandlingApplication** in the **org.springframework.integration.samples.barrier** package. +**ErrorHandlingApplication** in the **org.springframework.integration.samples.barrier2** package. + +Or using Maven: + + $ mvn spring-boot:run -Dspring-boot.run.main-class=org.springframework.integration.samples.barrier2.ErrorHandlingApplication It sends a list of integers to the flow: diff --git a/basic/barrier/src/main/java/org/springframework/integration/samples/barrier/Application.java b/basic/barrier/src/main/java/org/springframework/integration/samples/barrier/Application.java index c225dc42b..a18a2cab0 100644 --- a/basic/barrier/src/main/java/org/springframework/integration/samples/barrier/Application.java +++ b/basic/barrier/src/main/java/org/springframework/integration/samples/barrier/Application.java @@ -25,9 +25,12 @@ import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.ImportResource; +import org.springframework.core.env.Environment; /** * @author Gary Russell + * @author Glenn Renfro + * * @since 4.2 */ @SpringBootApplication @@ -37,11 +40,22 @@ public class Application { public static void main(String[] args) { ConfigurableApplicationContext server = SpringApplication.run(Application.class, args); - runClient(args); - server.close(); + Environment env = server.getEnvironment(); + + // Get configurationType via profile + String configurationType = env.getProperty("spring.profiles.active"); + System.out.println("Configuration-Type: " + configurationType); + if (configurationType == null || configurationType.equals("xml-config")) { + runClientWithXMLConfig(args); + } + else { + RequestGateway requestGateway = server.getBean("requestGateway", RequestGateway.class); + echoMessage(requestGateway); + } + System.exit(0); } - static void runClient(String[] args) { + static void runClientWithXMLConfig(String[] args) { SpringApplication application = new SpringApplicationBuilder() .web(WebApplicationType.NONE) .bannerMode(Mode.OFF) @@ -52,11 +66,15 @@ static void runClient(String[] args) { ConfigurableApplicationContext client = application.run(args); RequestGateway requestGateway = client.getBean("requestGateway", RequestGateway.class); + echoMessage(requestGateway); + client.close(); + } + + private static void echoMessage(RequestGateway requestGateway) { String request = "A,B,C"; System.out.println("\n\n++++++++++++ Sending: " + request + " ++++++++++++\n"); String reply = requestGateway.echo(request); System.out.println("\n\n++++++++++++ Replied with: " + reply + " ++++++++++++\n"); - client.close(); } diff --git a/basic/barrier/src/main/java/org/springframework/integration/samples/barrier/RequestGateway.java b/basic/barrier/src/main/java/org/springframework/integration/samples/barrier/RequestGateway.java index 7142f426e..09701c1a3 100644 --- a/basic/barrier/src/main/java/org/springframework/integration/samples/barrier/RequestGateway.java +++ b/basic/barrier/src/main/java/org/springframework/integration/samples/barrier/RequestGateway.java @@ -15,12 +15,16 @@ */ package org.springframework.integration.samples.barrier; +import org.springframework.integration.annotation.MessagingGateway; /** * @author Oleg Zhurakousky * @author Gunnar Hillert + * @author Glenn Renfro * */ + +@MessagingGateway(defaultRequestChannel = "requestChannel") public interface RequestGateway { String echo(String request); diff --git a/basic/barrier/src/main/java/org/springframework/integration/samples/barrier/configuration/ClientConfiguration.java b/basic/barrier/src/main/java/org/springframework/integration/samples/barrier/configuration/ClientConfiguration.java new file mode 100644 index 000000000..9cadb71d9 --- /dev/null +++ b/basic/barrier/src/main/java/org/springframework/integration/samples/barrier/configuration/ClientConfiguration.java @@ -0,0 +1,66 @@ +package org.springframework.integration.samples.barrier.configuration; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Profile; +import org.springframework.http.HttpMethod; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.dsl.IntegrationFlow; +import org.springframework.integration.http.dsl.Http; +import org.springframework.messaging.MessageChannel; + +/** + * Configure client-side Spring Integration flows for the Barrier pattern sample. + *

+ * Define the integration flow for sending HTTP requests to the server endpoint. + * Demonstrate how to configure an HTTP outbound gateway that sends POST requests + * and waits for responses. + *

+ * Activate this configuration using the "java-config" profile. + * + * @author Glenn Renfro + */ +@Configuration +@Profile("java-config") +public class ClientConfiguration { + + /** + * The URL endpoint of the server's HTTP inbound gateway. + * Defaults to http://localhost:8080/postGateway if not configured. + */ + @Value("${barrier.url:http://localhost:8080/postGateway}") + private String url; + + /** + * Define the main client integration flow. + *

+ * Receive messages from the {@link #requestChannel()} and send them + * via HTTP POST to the configured server URL. Wait for and return + * the HTTP response as a String. + * + * @return the integration flow definition + */ + @Bean + public IntegrationFlow clientFlow() { + return IntegrationFlow.from(requestChannel()) + .handle(Http.outboundGateway(url) + .httpMethod(HttpMethod.POST) + .expectedResponseType(String.class)) + .get(); + } + + /** + * Create the request channel that feeds messages into the client flow. + *

+ * Use a direct channel that provides point-to-point semantics, + * delivering messages to a single subscriber. + * + * @return a new DirectChannel instance + */ + @Bean + public MessageChannel requestChannel() { + return new DirectChannel(); + } + +} diff --git a/basic/barrier/src/main/java/org/springframework/integration/samples/barrier/configuration/ServerConfiguration.java b/basic/barrier/src/main/java/org/springframework/integration/samples/barrier/configuration/ServerConfiguration.java new file mode 100644 index 000000000..548923b9f --- /dev/null +++ b/basic/barrier/src/main/java/org/springframework/integration/samples/barrier/configuration/ServerConfiguration.java @@ -0,0 +1,369 @@ +package org.springframework.integration.samples.barrier.configuration; + +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.DirectExchange; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.rabbit.support.ValueExpression; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Profile; +import org.springframework.http.HttpMethod; +import org.springframework.integration.aggregator.ExpressionEvaluatingCorrelationStrategy; +import org.springframework.integration.amqp.dsl.Amqp; +import org.springframework.integration.aggregator.BarrierMessageHandler; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.channel.PublishSubscribeChannel; +import org.springframework.integration.dsl.IntegrationFlow; +import org.springframework.integration.http.dsl.Http; +import org.springframework.integration.samples.barrier.AckAggregator; +import org.springframework.messaging.MessageChannel; + +/** + * Configure server-side Spring Integration flows demonstrating the Barrier pattern. + *

+ * Showcase an integration pattern where messages are: + *

    + *
  1. Received via HTTP endpoints
  2. + *
  3. Split into individual elements
  4. + *
  5. Published to RabbitMQ with publisher confirmations
  6. + *
  7. Held at a barrier until all confirmations are received
  8. + *
  9. Released as an aggregated result
  10. + *
+ *

+ * Ensure that a message is only considered successfully processed when all of its + * split components have been confirmed by the message broker. + *

+ * Activate this configuration using the "java-config" profile. + * + * @author Glenn Renfro + */ +@Configuration +@Profile("java-config") +public class ServerConfiguration { + + /** + * Create a channel for receiving incoming messages from HTTP endpoints. + * + * @return a DirectChannel for point-to-point message delivery + */ + @Bean + public MessageChannel receiveChannel() { + return new DirectChannel(); + } + + /** + * Create a channel for payload messages in the GET gateway flow. + * + * @return a DirectChannel for point-to-point message delivery + */ + @Bean + public MessageChannel createPayload() { + return new DirectChannel(); + } + + /** + * Create a channel for broadcasting messages to multiple subscribers. + *

+ * Use a publish-subscribe channel to send messages to both the barrier flow + * and the AMQP outbound flow simultaneously. + * + * @return a PublishSubscribeChannel for broadcasting messages + */ + @Bean + public MessageChannel processChannel() { + return new PublishSubscribeChannel(); + } + + /** + * Create a channel for receiving transformed messages from the barrier. + * + * @return a DirectChannel for point-to-point message delivery + */ + @Bean + public MessageChannel transform() { + return new DirectChannel(); + } + + /** + * Create a channel for receiving publisher confirmations (acks and nacks) from RabbitMQ. + * + * @return a DirectChannel for point-to-point message delivery + */ + @Bean + public MessageChannel confirmations() { + return new DirectChannel(); + } + + /** + * Create a channel for triggering the release of messages held at the barrier. + * + * @return a DirectChannel for point-to-point message delivery + */ + @Bean + public MessageChannel release() { + return new DirectChannel(); + } + + /** + * Create a channel for handling errors that occur during message processing. + * + * @return a DirectChannel for point-to-point message delivery + */ + @Bean + public MessageChannel errorChannel() { + return new DirectChannel(); + } + + /** + * Define an HTTP POST endpoint for receiving requests from clients. + *

+ * Expose a REST endpoint at "/postGateway" that accepts POST requests + * and forwards them to the {@link #receiveChannel()} for processing. + * + * @return the integration flow for handling POST requests + */ + @Bean + public IntegrationFlow postGatewayFlow() { + return IntegrationFlow.from(Http.inboundGateway("/postGateway") + .requestChannel(receiveChannel()) + .errorChannel(errorChannel()) + .requestMapping(m -> m.methods(HttpMethod.POST))) + .get(); + } + + /** + * Define an HTTP GET endpoint for creating a default payload. + *

+ * Expose a REST endpoint at "/getGateway" that accepts GET requests + * and creates a payload for testing purposes. + * + * @return the integration flow for handling GET requests + */ + @Bean + public IntegrationFlow getGatewayFlow() { + return IntegrationFlow.from(Http.inboundGateway("/getGateway") + .requestChannel(createPayload()) + .errorChannel(errorChannel()) + .requestMapping(m -> m.methods(HttpMethod.GET))) + .get(); + } + + /** + * Define a flow to transform GET requests into a comma-separated payload. + *

+ * Generate the default payload "A,B,C" for GET requests + * and forward it to the receive channel for processing. + * + * @return the integration flow for transforming GET requests + */ + @Bean + public IntegrationFlow transformGetFlow() { + return IntegrationFlow.from(createPayload()) + .transform("'A,B,C'") + .channel(receiveChannel()) + .get(); + } + + /** + * Define a flow to enrich message headers with correlation information. + *

+ * Add an "ackCorrelation" header containing the message ID to correlate + * publisher confirmations with the original message. + * + * @return the integration flow for header enrichment + */ + @Bean + public IntegrationFlow headerEnricherFlow() { + return IntegrationFlow.from(receiveChannel()) + .enrichHeaders(h -> h.header("ackCorrelation", "headers['id']")) + .channel(processChannel()) + .get(); + } + + /** + * Define a flow to split messages and publish them to RabbitMQ with publisher confirmations. + *

+ * Perform the following steps: + *

    + *
  1. Receive messages from the process channel
  2. + *
  3. Filter out HTTP-specific headers
  4. + *
  5. Split the payload by comma delimiter
  6. + *
  7. Publish each element to RabbitMQ
  8. + *
  9. Route confirmations (acks/nacks) to the confirmations channel
  10. + *
+ * + * @param rabbitTemplate the RabbitMQ template for publishing messages + * @return the integration flow for AMQP outbound processing + */ + @Bean + public IntegrationFlow amqpOutboundFlow(RabbitTemplate rabbitTemplate) { + return IntegrationFlow.from(processChannel()) + .headerFilter("content-type", "content-length") + .splitWith(s -> s.delimiters(",")) + .handle(Amqp.outboundAdapter(rabbitTemplate) + .exchangeName("barrier.sample.exchange") + .routingKey("barrier.sample.key") + .confirmAckChannel(confirmations()) + .confirmNackChannel(confirmations()) + .returnChannel(errorChannel()) + .confirmCorrelationExpression("#this")) + .get(); + } + + /** + * Create the barrier message handler that holds messages until conditions are met. + *

+ * Configure the barrier to wait for all split message parts to be confirmed by the broker + * before releasing the aggregated result. Use a 10-second timeout and correlate + * messages using the "ackCorrelation" header. + * + * @return the configured BarrierMessageHandler + */ + @Bean + public BarrierMessageHandler barrier() { + BarrierMessageHandler handler = new BarrierMessageHandler(10000, + new ExpressionEvaluatingCorrelationStrategy(new ValueExpression<>("ackCorrelation"))); + handler.setOutputChannelName("transform"); + return handler; + } + + /** + * Define a flow to send messages to the barrier where they wait for confirmations. + *

+ * Feed messages from the process channel into the barrier handler, + * where they are held until all corresponding confirmations are received. + * + * @return the integration flow for barrier processing + */ + @Bean + public IntegrationFlow barrierFlow() { + return IntegrationFlow.from(processChannel()) + .handle(barrier()) + .get(); + } + + /** + * Define a flow to extract and transform the result from the barrier. + *

+ * Take the aggregated result from the barrier (an array containing + * the original message and confirmations) and extract the second element + * (index 1), which contains the confirmation data. + * + * @return the integration flow for transforming barrier output + */ + @Bean + public IntegrationFlow transformFlow() { + return IntegrationFlow.from(transform()) + .transform("payload[1]") + .get(); + } + + /** + * Define a flow to aggregate publisher confirmations and trigger barrier release. + *

+ * Perform the following steps: + *

    + *
  1. Receive individual confirmations from RabbitMQ
  2. + *
  3. Filter out framework headers
  4. + *
  5. Aggregate confirmations by correlation ID
  6. + *
  7. Send the aggregated result to trigger barrier release
  8. + *
+ * + * @return the integration flow for processing confirmations + */ + @Bean + public IntegrationFlow confirmationsFlow() { + return IntegrationFlow.from(confirmations()) + .headerFilter("replyChannel", "errorChannel") + .handle((payload, headers) -> payload) + .aggregate(a -> a.processor(new AckAggregator())) + .channel(release()) + .get(); + } + + /** + * Define a flow to trigger the barrier to release held messages. + *

+ * Invoke the barrier's trigger method to release the corresponding + * message from the barrier when all confirmations for a message have been aggregated. + * + * @return the integration flow for barrier release + */ + @Bean + public IntegrationFlow releaseFlow() { + return IntegrationFlow.from(release()) + .handle(barrier(), "trigger") + .get(); + } + + /** + * Define a flow to consume messages from the RabbitMQ queue. + *

+ * Read messages from the queue and discard them to the null channel. + * In a production scenario, process the messages further as needed. + * + * @param rabbitConnectionFactory the RabbitMQ connection factory + * @return the integration flow for AMQP inbound processing + */ + @Bean + public IntegrationFlow amqpInboundFlow(ConnectionFactory rabbitConnectionFactory) { + return IntegrationFlow.from(Amqp.inboundAdapter(rabbitConnectionFactory, "barrier.sample.queue")) + .channel("nullChannel") + .get(); + } + + /** + * Create the RabbitMQ queue for receiving published messages. + *

+ * Configure the queue as: + *

+ * + * @return the configured Queue instance + */ + @Bean + public Queue barrierSampleQueue() { + return new Queue("barrier.sample.queue", false, false, true); + } + + /** + * Create the RabbitMQ exchange for routing messages. + *

+ * Configure the exchange as: + *

+ * + * @return the configured DirectExchange instance + */ + @Bean + public DirectExchange barrierSampleExchange() { + return new DirectExchange("barrier.sample.exchange", false, true); + } + + /** + * Bind the queue to the exchange with a routing key. + *

+ * Route messages published to the exchange with routing key "barrier.sample.key" + * to the barrier sample queue. + * + * @param barrierSampleQueue the queue to bind + * @param barrierSampleExchange the exchange to bind to + * @return the configured Binding instance + */ + @Bean + public Binding barrierSampleBinding(Queue barrierSampleQueue, DirectExchange barrierSampleExchange) { + return BindingBuilder.bind(barrierSampleQueue) + .to(barrierSampleExchange) + .with("barrier.sample.key"); + } + +} diff --git a/basic/barrier/src/main/java/org/springframework/integration/samples/barrier2/ErrorHandlingApplication.java b/basic/barrier/src/main/java/org/springframework/integration/samples/barrier2/ErrorHandlingApplication.java index a56156029..a9a6a8231 100644 --- a/basic/barrier/src/main/java/org/springframework/integration/samples/barrier2/ErrorHandlingApplication.java +++ b/basic/barrier/src/main/java/org/springframework/integration/samples/barrier2/ErrorHandlingApplication.java @@ -18,6 +18,7 @@ import java.util.Arrays; +import org.springframework.boot.SpringApplication; import org.springframework.boot.WebApplicationType; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.builder.SpringApplicationBuilder; @@ -26,6 +27,8 @@ /** * @author Gary Russell + * @author Glenn Renfro + * * @since 4.2 */ @SpringBootApplication @@ -41,7 +44,7 @@ public static void main(String[] args) { gateway.process(Arrays.asList(2, 0, 2, 0, 2), "foo"); } catch (Exception e) { - System.err.println(e.toString()); + System.err.println(e); } test.close(); } diff --git a/basic/barrier/src/main/java/org/springframework/integration/samples/barrier2/Gateway.java b/basic/barrier/src/main/java/org/springframework/integration/samples/barrier2/Gateway.java index ae4e6a6a3..d087de590 100644 --- a/basic/barrier/src/main/java/org/springframework/integration/samples/barrier2/Gateway.java +++ b/basic/barrier/src/main/java/org/springframework/integration/samples/barrier2/Gateway.java @@ -17,13 +17,19 @@ import java.util.Collection; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Payload; + /** * @author Gary Russell + * @author Glenn Renfro + * * @since 4.2 * */ public interface Gateway { - public void process(Collection numbers, String correlationId); + void process(@Payload Collection numbers, + @Header("barrierCorrelation") String correlationId); } diff --git a/basic/barrier/src/main/java/org/springframework/integration/samples/barrier2/configuration/ErrorHandlingConfiguration.java b/basic/barrier/src/main/java/org/springframework/integration/samples/barrier2/configuration/ErrorHandlingConfiguration.java new file mode 100644 index 000000000..95030361d --- /dev/null +++ b/basic/barrier/src/main/java/org/springframework/integration/samples/barrier2/configuration/ErrorHandlingConfiguration.java @@ -0,0 +1,259 @@ +package org.springframework.integration.samples.barrier2.configuration; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Profile; +import org.springframework.integration.aggregator.BarrierMessageHandler; +import org.springframework.integration.aggregator.HeaderAttributeCorrelationStrategy; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.channel.PublishSubscribeChannel; +import org.springframework.integration.channel.QueueChannel; +import org.springframework.integration.dsl.IntegrationFlow; +import org.springframework.integration.dsl.Pollers; +import org.springframework.integration.samples.barrier2.Aggregator; +import org.springframework.integration.samples.barrier2.Gateway; +import org.springframework.messaging.MessageChannel; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +/** + * Configure Spring Integration flows demonstrating error handling with the Barrier pattern. + *

+ * This configuration implements an error-handling scenario where a gateway launches asynchronous + * tasks and waits for completion, aggregating both successful results and failures. The barrier + * pattern ensures the calling thread is suspended until all tasks complete, then returns either + * the aggregated results or throws an exception containing all errors. + *

+ * Flow Overview: + *

    + *
  1. Gateway receives a collection of integers and a correlation ID
  2. + *
  3. Messages are split into individual elements and sent to a queue
  4. + *
  5. Polled messages are transformed (10 / payload), causing divide-by-zero for zero values
  6. + *
  7. Successful results and errors are aggregated by correlation
  8. + *
  9. Barrier is released when aggregation completes, returning to the caller
  10. + *
+ *

+ * Activate this configuration using the "java-config" profile in application.properties. + * + * @author Glenn Renfro + */ +@Configuration +@Profile("java-config") +public class ErrorHandlingConfiguration { + + /** + * Create the main process channel for broadcasting messages to multiple subscribers. + *

+ * Use a publish-subscribe channel to send messages to both the splitter flow + * and the barrier flow simultaneously. + * + * @return a PublishSubscribeChannel for message broadcasting + */ + @Bean + public MessageChannel processChannel() { + return new PublishSubscribeChannel(); + } + + /** + * Create a queue channel for buffering messages awaiting processing. + *

+ * Store split messages in this queue until the poller retrieves them for + * transformation. + * + * @return a QueueChannel for message buffering + */ + @Bean + public QueueChannel process() { + return new QueueChannel(); + } + + /** + * Create the aggregator channel for receiving processed messages and errors. + *

+ * Direct both successful transformations and errors to this channel for + * aggregation by correlation ID. + * + * @return a DirectChannel for aggregator input + */ + @Bean + public MessageChannel aggregatorChannel() { + return new DirectChannel(); + } + + /** + * Create the release channel for triggering barrier release. + *

+ * Send aggregated results to this channel to trigger the barrier handler's + * release mechanism, allowing the suspended thread to continue. + * + * @return a DirectChannel for barrier release signals + */ + @Bean + public MessageChannel release() { + return new DirectChannel(); + } + + /** + * Create the errors channel for receiving transformation failures. + *

+ * Route exceptions from the transformation step to this channel for header + * enrichment before aggregation. + * + * @return a DirectChannel for error handling + */ + @Bean + public MessageChannel errors() { + return new DirectChannel(); + } + + /** + * Create a thread pool executor for asynchronous message processing. + *

+ * Configure a task executor to handle polling and transformation operations + * on separate threads from the calling thread. + * + * @return a ThreadPoolTaskExecutor for async operations + */ + @Bean + public ThreadPoolTaskExecutor exec() { + return new ThreadPoolTaskExecutor(); + } + + /** + * Define the gateway integration flow for receiving messages from the Gateway interface. + *

+ * Create a gateway proxy from the {@link Gateway} interface that routes incoming + * messages to the process channel for splitting and barrier processing. + * + * @return the integration flow for gateway message ingress + */ + @Bean + public IntegrationFlow gatewayFlow() { + return IntegrationFlow.from(Gateway.class) + .channel(processChannel()) + .get(); + } + + /** + * Define the splitter flow for dividing collections into individual messages. + *

+ * Split incoming message payloads into individual elements and send each element + * to the process queue channel for independent transformation. + * + * @return the integration flow for message splitting + */ + @Bean + public IntegrationFlow splitterFlow() { + return IntegrationFlow.from(processChannel()) + .split() + .channel(process()) + .get(); + } + + /** + * Define the transform flow for processing messages with error handling. + *

+ * Poll messages from the queue every second using the configured task executor, + * transform each message using the expression "10 / payload" (causing ArithmeticException + * for zero values), and route results to the aggregator channel. Send any errors + * to the errors channel for special handling. + * + * @param exec the thread pool executor for polling operations + * @return the integration flow for message transformation + */ + @Bean + public IntegrationFlow transformFlow(ThreadPoolTaskExecutor exec) { + + @SuppressWarnings("unchecked") + org.springframework.integration.core.MessageSource messageSource = + () -> (org.springframework.messaging.Message) process().receive(1000); + return IntegrationFlow.from(messageSource, e -> e.poller(Pollers.fixedDelay(1000) + .errorChannel(errors()) + .taskExecutor(exec))) + .transform("10 / payload") + .channel(aggregatorChannel()) + .get(); + } + + /** + * Create the barrier message handler for suspending the calling thread. + *

+ * Configure a barrier with a 10-second timeout that correlates messages using + * the "barrierCorrelation" header. Suspend the gateway caller until the barrier + * is triggered by the release flow. + * + * @return the configured BarrierMessageHandler + */ + @Bean + public BarrierMessageHandler errorBarrier() { + return new BarrierMessageHandler(10000, new HeaderAttributeCorrelationStrategy("barrierCorrelation")); + } + + /** + * Define the barrier flow for suspending the calling thread. + *

+ * Send messages from the process channel to the barrier handler, which suspends + * the thread until the barrier is released by the release flow. + * + * @return the integration flow for barrier processing + */ + @Bean + public IntegrationFlow barrierFlow() { + return IntegrationFlow.from(processChannel()) + .handle(errorBarrier()) + .get(); + } + + /** + * Define the aggregator flow for combining results and errors. + *

+ * Aggregate messages from the aggregator channel using a custom aggregator + * that consolidates both successful results and exceptions. Send the aggregated + * result to the release channel to trigger barrier release. + * + * @return the integration flow for message aggregation + */ + @Bean + public IntegrationFlow aggregatorFlow() { + return IntegrationFlow.from(aggregatorChannel()) + .aggregate(a -> a.processor(new Aggregator())) + .channel(release()) + .get(); + } + + /** + * Define the release flow for triggering barrier release. + *

+ * Send aggregated results from the release channel to the barrier's trigger method, + * which releases the suspended thread and returns the result to the gateway caller. + * + * @return the integration flow for barrier release + */ + @Bean + public IntegrationFlow releaseFlow() { + return IntegrationFlow.from(release()) + .handle(errorBarrier(), "trigger") + .get(); + } + + /** + * Define the error flow for handling transformation failures. + *

+ * Enrich error messages with correlation headers extracted from the failed message, + * then route them to the aggregator channel for consolidation with successful results. + * Restore the correlationId, sequenceSize, and sequenceNumber headers to enable proper + * aggregation of error messages alongside successful transformations. + * + * @return the integration flow for error processing + */ + @Bean + public IntegrationFlow errorFlow() { + return IntegrationFlow.from(errors()) + .enrichHeaders(h -> h + .headerExpression("correlationId", "payload.failedMessage.headers.correlationId") + .headerExpression("sequenceSize", "payload.failedMessage.headers.sequenceSize") + .headerExpression("sequenceNumber", "payload.failedMessage.headers.sequenceNumber")) + .channel(aggregatorChannel()) + .get(); + } + +} diff --git a/basic/barrier/src/main/resources/META-INF/spring/integration/client-context.xml b/basic/barrier/src/main/resources/META-INF/spring/integration/client-context.xml index 5a963d8bb..f779c61a2 100644 --- a/basic/barrier/src/main/resources/META-INF/spring/integration/client-context.xml +++ b/basic/barrier/src/main/resources/META-INF/spring/integration/client-context.xml @@ -5,7 +5,8 @@ xmlns:int-http="http://www.springframework.org/schema/integration/http" xsi:schemaLocation="http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd - http://www.springframework.org/schema/integration/http https://www.springframework.org/schema/integration/http/spring-integration-http.xsd"> + http://www.springframework.org/schema/integration/http https://www.springframework.org/schema/integration/http/spring-integration-http.xsd" + profile="xml-config"> - - + diff --git a/basic/barrier/src/main/resources/META-INF/spring/integration/errorhandling-context.xml b/basic/barrier/src/main/resources/META-INF/spring/integration/errorhandling-context.xml index 4428e73e7..dda701675 100644 --- a/basic/barrier/src/main/resources/META-INF/spring/integration/errorhandling-context.xml +++ b/basic/barrier/src/main/resources/META-INF/spring/integration/errorhandling-context.xml @@ -2,22 +2,18 @@ + http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd" + profile="xml-config"> - - - + - + @@ -33,7 +29,7 @@ @@ -55,5 +51,6 @@ + diff --git a/basic/barrier/src/main/resources/META-INF/spring/integration/server-context.xml b/basic/barrier/src/main/resources/META-INF/spring/integration/server-context.xml index ee1039288..ae9f6f545 100644 --- a/basic/barrier/src/main/resources/META-INF/spring/integration/server-context.xml +++ b/basic/barrier/src/main/resources/META-INF/spring/integration/server-context.xml @@ -7,10 +7,10 @@ xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/integration/amqp https://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd - http://www.springframework.org/schema/integration/file https://www.springframework.org/schema/integration/file/spring-integration-file.xsd http://www.springframework.org/schema/rabbit https://www.springframework.org/schema/rabbit/spring-rabbit.xsd http://www.springframework.org/schema/integration/http https://www.springframework.org/schema/integration/http/spring-integration-http.xsd - http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd"> + http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd" + profile="xml-config"> + * This configuration demonstrates the Control Bus component functionality, + * which uses SpEL expressions to control Spring Integration endpoints + * (e.g., start/stop an inbound adapter). + *

+ * Activate this configuration using the "java-config" profile. + *

+ * This configuration is functionally equivalent to {@code ControlBusDemo-context.xml} + * and defines: + *

    + *
  • A control channel for sending control messages
  • + *
  • An output channel (queue-based) for the inbound adapter
  • + *
  • A control bus endpoint that processes control messages
  • + *
  • An inbound channel adapter that generates "Hello" messages at a fixed rate
  • + *
+ * + * @author Glenn Renfro + */ +@Configuration +@EnableIntegration +@Profile("java-config") +public class ControlBusConfiguration { + + /** + * Create a direct channel for sending control messages to the control bus. + *

+ * This channel is used to send SpEL expressions that control Spring Integration + * endpoints (e.g., {@code @inboundAdapter.start()} or {@code @inboundAdapter.stop()}). + * + * @return a DirectChannel for control messages + */ + @Bean + public MessageChannel controlChannel() { + return new DirectChannel(); + } + + /** + * Create a queue-based channel for receiving messages from the inbound adapter. + *

+ * This channel uses a queue to allow asynchronous message consumption. + * + * @return a QueueChannel for adapter output + */ + @Bean + public MessageChannel adapterOutputChannel() { + return new QueueChannel(); + } + + /** + * Create an integration flow that connects the control channel to the control bus. + *

+ * Messages sent to the control channel are processed by the control bus, + * which evaluates SpEL expressions to control Spring Integration endpoints. + * + * @param controlChannel input channel for control commands + * @return an IntegrationFlow connecting controlChannel to controlBus + */ + @Bean + public IntegrationFlow controlBusFlow(@Qualifier("controlChannel") MessageChannel controlChannel) { + return IntegrationFlow.from(controlChannel) + .controlBus() + .get(); + } + + /** + * Create an inbound channel adapter that generates messages. + *

+ * The adapter: + *

    + *
  • Generates the string "Hello" as the payload
  • + *
  • Sends messages to the adapterOutputChannel
  • + *
  • Starts with auto-startup disabled (must be started via control bus)
  • + *
  • Polls at a fixed rate of 1000ms
  • + *
+ * + * @return an IntegrationFlow representing the inbound adapter + */ + @Bean + public IntegrationFlow inboundAdapter() { + return IntegrationFlow.fromSupplier(() -> "Hello", + c -> c.id("inboundAdapter") + .autoStartup(false) + .poller(Pollers.fixedRate(1000))) + .channel(adapterOutputChannel()) + .get(); + } + +} + diff --git a/basic/control-bus/src/main/resources/META-INF/spring/integration/ControlBusDemo-context.xml b/basic/control-bus/src/main/resources/META-INF/spring/integration/ControlBusDemo-context.xml index 01b622890..9f513ac94 100644 --- a/basic/control-bus/src/main/resources/META-INF/spring/integration/ControlBusDemo-context.xml +++ b/basic/control-bus/src/main/resources/META-INF/spring/integration/ControlBusDemo-context.xml @@ -7,14 +7,14 @@ - + diff --git a/basic/control-bus/src/main/resources/application.properties b/basic/control-bus/src/main/resources/application.properties new file mode 100644 index 000000000..62bc0f301 --- /dev/null +++ b/basic/control-bus/src/main/resources/application.properties @@ -0,0 +1,2 @@ +spring.profiles.active=java-config + diff --git a/basic/control-bus/src/test/java/org/springframework/integration/samples/controlbus/ControlBusDemoTest.java b/basic/control-bus/src/test/java/org/springframework/integration/samples/controlbus/ControlBusDemoTest.java index 34c7ff626..a03e6f73f 100644 --- a/basic/control-bus/src/test/java/org/springframework/integration/samples/controlbus/ControlBusDemoTest.java +++ b/basic/control-bus/src/test/java/org/springframework/integration/samples/controlbus/ControlBusDemoTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2017-present the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,7 +20,9 @@ import org.junit.jupiter.api.Test; import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; +import org.springframework.core.env.StandardEnvironment; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.PollableChannel; import org.springframework.messaging.support.GenericMessage; @@ -28,6 +30,7 @@ /** * @author Oleg Zhurakousky * @author Gary Russell + * @author Glenn Renfro * */ public class ControlBusDemoTest { @@ -35,16 +38,31 @@ public class ControlBusDemoTest { private static Log logger = LogFactory.getLog(ControlBusDemoTest.class); @Test - public void demoControlBus(){ + public void demoControlBusWithJavaConfig() { + AnnotationConfigApplicationContext ac = new AnnotationConfigApplicationContext(); + ac.setEnvironment(new StandardEnvironment()); + ac.getEnvironment().setActiveProfiles("java-config"); + ac.scan("org.springframework.integration.samples.controlbus.config"); + ac.refresh(); + runControlBusDemo(ac); + ac.close(); + } + + @Test + public void demoControlBusWithXmlConfig() { ConfigurableApplicationContext ac = new ClassPathXmlApplicationContext( "/META-INF/spring/integration/ControlBusDemo-context.xml"); + runControlBusDemo(ac); + ac.close(); + } + + private void runControlBusDemo(ConfigurableApplicationContext ac) { MessageChannel controlChannel = ac.getBean("controlChannel", MessageChannel.class); - PollableChannel adapterOutputChanel = ac.getBean("adapterOutputChanel", PollableChannel.class); - logger.info("Received before adapter started: " + adapterOutputChanel.receive(1000)); + PollableChannel adapterOutputChannel = ac.getBean("adapterOutputChannel", PollableChannel.class); + logger.info("Received before adapter started: " + adapterOutputChannel.receive(1000)); controlChannel.send(new GenericMessage("@inboundAdapter.start()")); - logger.info("Received before adapter started: " + adapterOutputChanel.receive(1000)); + logger.info("Received before adapter started: " + adapterOutputChannel.receive(1000)); controlChannel.send(new GenericMessage("@inboundAdapter.stop()")); - logger.info("Received after adapter stopped: " + adapterOutputChanel.receive(1000)); - ac.close(); + logger.info("Received after adapter stopped: " + adapterOutputChannel.receive(1000)); } } diff --git a/basic/enricher/README.md b/basic/enricher/README.md index 03de82e3e..5d4249378 100644 --- a/basic/enricher/README.md +++ b/basic/enricher/README.md @@ -3,20 +3,53 @@ Spring Integration - Enricher Sample # Overview -This sample demonstrates how the Enricher components can be used. +This sample demonstrates how the Content Enricher pattern can be used in Spring Integration. The Content Enricher allows you to augment a message with additional data from an external source without requiring the original sender to know about the enrichment process. + +This sample supports both **XML-based** and **Java-based** Spring Integration configurations. You can select which configuration to use by setting the active Spring profile. + +## Configuration Options + +This sample supports two configuration modes: + +* **Java Configuration** (`java-config` profile) - Uses Java `@Configuration` classes from the `config` package. This is the **default** mode. +* **XML Configuration** (`xml-config` profile) - Uses XML configuration files from `META-INF/spring/integration/` + +### Profile Selection + +Only one configuration profile should be active at a time. The `java-config` and `xml-config` profiles are mutually exclusive. + +* If no profile is explicitly provided, `java-config` is used by default (as configured in `application.properties`). +* To use XML configuration, explicitly activate the `xml-config` profile at runtime. # Getting Started -You can run the sample application by either +## Using Java Configuration (Default) -* running the "Main" class from within STS (Right-click on Main class --> Run As --> Java Application) -* or from the command line execute: +The sample uses Java configuration by default. You can run the sample application by either: + +* Running the "Main" class from within your IDE (Right-click on Main class --> Run As --> Java Application) +* Or from the command line execute: $ gradlew :enricher:run -This example illustrates the usage of the Content Enricher. +In this mode, the application context is defined using Spring Integration Java configuration classes instead of XML. The configuration classes are located in the `org.springframework.integration.samples.enricher.config` package. + +## Using XML Configuration + +To run the sample with XML configuration, activate the `xml-config` profile: + +* From your IDE, set the system property: `-Dspring.profiles.active=xml-config` +* Or from the command line: + + $ gradlew :enricher:run -Dspring.profiles.active=xml-config + +This mode uses the legacy XML-based wiring from `spring-integration-context.xml`. + +# How It Works + +This example illustrates the usage of the Content Enricher pattern. -Once the application has started, please execute the various Content Enricher examples by +Once the application has started, please execute the various Content Enricher examples by: * entering 1 + Enter * entering 2 + Enter @@ -24,13 +57,19 @@ Once the application has started, please execute the various Content Enricher ex 3 different message flows are triggered. For use-cases 1+2 a **User** object containing only the **username** is passed in. For use-case 3 a Map with the **username** key is passed in and enriched with the **User** object using the **user** key: -* 1: In the *Enricher*, pass the full **User** object to the **request channel**. -* 2: In the *Enricher*, pass only the **username** to the **request channel** by using the **request-payload-expression** attribute. -* 3: In the *Enricher*, pass only the username to the **request channel**, executing the same Service Activator as in **2**. +* **1**: In the *Enricher*, pass the full **User** object to the **request channel**. +* **2**: In the *Enricher*, pass only the **username** to the **request channel** by using the **request-payload-expression** attribute. +* **3**: In the *Enricher*, pass only the username to the **request channel**, executing the same Service Activator as in **2**. + +## About This Pattern + +Spring Integration is moving toward Java configuration as the primary style for new development, while XML configuration remains available for users who prefer that style or are migrating older systems. This sample demonstrates how both approaches can coexist in the same project using Spring profiles to select the desired configuration mode at runtime. + +This pattern (Java first, XML as an alternative) is being applied across other samples in this repository, using `basic/barrier` as a reference model. # Resources For help please take a look at the Spring Integration documentation: -https://www.springsource.org/spring-integration +https://docs.spring.io/spring-integration/reference/ diff --git a/basic/enricher/src/main/java/org/springframework/integration/samples/enricher/Main.java b/basic/enricher/src/main/java/org/springframework/integration/samples/enricher/Main.java index 218b5bdaa..4971d7e80 100644 --- a/basic/enricher/src/main/java/org/springframework/integration/samples/enricher/Main.java +++ b/basic/enricher/src/main/java/org/springframework/integration/samples/enricher/Main.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2017 the original author or authors. + * Copyright 2011-present the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,17 +22,27 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.support.AbstractApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; +import org.springframework.core.env.StandardEnvironment; import org.springframework.integration.samples.enricher.domain.User; import org.springframework.integration.samples.enricher.service.UserService; /** * Starts the Spring Context and will initialize the Spring Integration routes. + *

+ * This application supports both Java-based and XML-based Spring Integration + * configurations. The configuration mode is selected via Spring profiles: + *

    + *
  • java-config (default) - Uses Java configuration classes
  • + *
  • xml-config - Uses XML configuration files
  • + *
* * @author Gunnar Hillert * @author Gary Russell + * @author Glenn Renfro * @version 1.0 * */ @@ -62,8 +72,23 @@ public static void main(final String... args) { + EMPTY_LINE + LINE_SEPARATOR ); - final AbstractApplicationContext context = - new ClassPathXmlApplicationContext("classpath:META-INF/spring/integration/*-context.xml"); + // Determine which profile to use + String profile = System.getProperty("spring.profiles.active", "java-config"); + + final AbstractApplicationContext context; + + if ("xml-config".equals(profile)) { + LOGGER.info("\n\n Using XML-based configuration\n\n"); + context = new ClassPathXmlApplicationContext("classpath:META-INF/spring/integration/*-context.xml"); + } else { + LOGGER.info("\n\n Using Java-based configuration\n\n"); + AnnotationConfigApplicationContext annotationContext = new AnnotationConfigApplicationContext(); + annotationContext.setEnvironment(new StandardEnvironment()); + annotationContext.getEnvironment().setActiveProfiles("java-config"); + annotationContext.scan("org.springframework.integration.samples.enricher.config"); + annotationContext.refresh(); + context = annotationContext; + } context.registerShutdownHook(); diff --git a/basic/enricher/src/test/java/org/springframework/integration/samples/enricher/service/UserServiceTest.java b/basic/enricher/src/test/java/org/springframework/integration/samples/enricher/service/UserServiceTest.java index b092df6fc..3e9e8b94d 100644 --- a/basic/enricher/src/test/java/org/springframework/integration/samples/enricher/service/UserServiceTest.java +++ b/basic/enricher/src/test/java/org/springframework/integration/samples/enricher/service/UserServiceTest.java @@ -1,5 +1,5 @@ /* - * Copyright 2002-2010 the original author or authors. + * Copyright 2011-present the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,18 +18,30 @@ import org.junit.jupiter.api.Test; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.integration.samples.enricher.domain.User; import static org.assertj.core.api.Assertions.assertThat; /** - * Verify that the Spring Integration Application Context starts successfully. + * Verify that the Spring Integration Application Context starts successfully + * with both Java and XML configurations. + * + * @author Glenn Renfro */ public class UserServiceTest { @Test - public void testStartupOfSpringIntegrationContext() throws Exception { + public void testStartupOfSpringIntegrationContextWithJavaConfig() throws Exception { + AnnotationConfigApplicationContext context = createJavaConfigContext(); + Thread.sleep(2000); + context.close(); + } + + @Test + public void testStartupOfSpringIntegrationContextWithXmlConfig() throws Exception { final ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( "/META-INF/spring/integration/spring-integration-context.xml", UserServiceTest.class); Thread.sleep(2000); @@ -37,11 +49,40 @@ public void testStartupOfSpringIntegrationContext() throws Exception { } @Test - public void testExecuteFindUser() { + public void testExecuteFindUserWithJavaConfig() { + AnnotationConfigApplicationContext context = createJavaConfigContext(); + + runFindUserTest(context); + context.close(); + } + + @Test + public void testExecuteFindUserWithXmlConfig() { + final ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( + "/META-INF/spring/integration/spring-integration-context.xml", UserServiceTest.class); + + runFindUserTest(context); + context.close(); + } + + @Test + public void testExecuteFindUserByUsernameWithJavaConfig() { + AnnotationConfigApplicationContext context = createJavaConfigContext(); + runFindUserByUsernameTest(context); + context.close(); + } + + @Test + public void testExecuteFindUserByUsernameWithXmlConfig() { final ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( "/META-INF/spring/integration/spring-integration-context.xml", UserServiceTest.class); + runFindUserByUsernameTest(context); + context.close(); + } + + private void runFindUserTest(ConfigurableApplicationContext context) { final UserService service = context.getBean(UserService.class); User user = new User("foo", null, null); @@ -50,15 +91,9 @@ public void testExecuteFindUser() { assertThat(fullUser.getUsername()).isEqualTo("foo"); assertThat(fullUser.getEmail()).isEqualTo("foo@springintegration.org"); assertThat(fullUser.getPassword()).isEqualTo("secret"); - context.close(); - } - @Test - public void testExecuteFindUserByUsername() { - final ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext( - "/META-INF/spring/integration/spring-integration-context.xml", UserServiceTest.class); - + private void runFindUserByUsernameTest(ConfigurableApplicationContext context) { final UserService service = context.getBean(UserService.class); User user = new User("foo", null, null); @@ -67,8 +102,14 @@ public void testExecuteFindUserByUsername() { assertThat(fullUser.getUsername()).isEqualTo("foo"); assertThat(fullUser.getEmail()).isEqualTo("foo@springintegration.org"); assertThat(fullUser.getPassword()).isEqualTo("secret"); - context.close(); - } + + private AnnotationConfigApplicationContext createJavaConfigContext() { + AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(); + context.getEnvironment().setActiveProfiles("java-config"); + context.scan("org.springframework.integration.samples.enricher.config"); + context.refresh(); + return context; + } } diff --git a/build.gradle b/build.gradle index d4ba721e6..64b0bb0d2 100644 --- a/build.gradle +++ b/build.gradle @@ -532,6 +532,9 @@ project('enricher') { api "com.h2database:h2:$h2Version" api "org.apache.logging.log4j:log4j-core:$log4jVersion" } + run { + standardInput = System.in + } } project('feed') {