diff --git a/basic/grpc-client/README.md b/basic/grpc-client/README.md new file mode 100644 index 000000000..210053db3 --- /dev/null +++ b/basic/grpc-client/README.md @@ -0,0 +1,47 @@ +gRPC Client Sample +================== + +This sample demonstrates how to use Spring Integration's gRPC support with an **Outbound Gateway** to make gRPC requests to a server. + +The sample implements a gRPC client that calls a `HelloWorldService` demonstrating different communication patterns: + +* **SayHello** - Unary RPC (single request, single response) +* **StreamSayHello** - Server streaming RPC (single request, multiple responses) + +The client automatically executes both examples on startup using `ApplicationRunner` beans. + +## Running the Sample + +**Important:** Start the gRPC server first (see the grpc-server sample) and it must have the same gRPC proto files as the client. + +Then start the gRPC client using Gradle: + + $ gradlew :grpc-client:bootRun + +#### Using an IDE such as SpringSource Tool Suite™ (STS) + +In STS (Eclipse), go to package **org.springframework.integration.samples.grpc**, right-click **Application** and select **Run as** --> **Java Application** (or Spring Boot Application). + +### Output + +The client will automatically send requests to the server and display the responses: + +``` +Single response reply: message: "Hello Jack" + +Stream received reply: Hello Jack +Stream received reply: Hello again! +``` + +## Configuration + +The gRPC server connection is configured in `application.properties`: + +```properties +grpc.server.host=localhost +grpc.server.port=9090 +``` + +## Resources + +* [Spring Integration gRPC Documentation](https://docs.spring.io/spring-integration/reference/grpc.html) diff --git a/basic/grpc-client/src/main/java/org/springframework/integration/samples/grpc/Application.java b/basic/grpc-client/src/main/java/org/springframework/integration/samples/grpc/Application.java new file mode 100644 index 000000000..b88d85c3d --- /dev/null +++ b/basic/grpc-client/src/main/java/org/springframework/integration/samples/grpc/Application.java @@ -0,0 +1,44 @@ +/* + * Copyright 2026-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.samples.grpc; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * Main application class for the gRPC client sample. + * This Spring Boot application demonstrates how to use Spring Integration + * with gRPC for client-side communication. + * + * @author Glenn Renfro + */ +@SpringBootApplication +public final class Application { + + private Application() { + } + + /** + * Main entry point for the gRPC client application. + * + * @param args command line arguments + */ + public static void main(String[] args) { + SpringApplication.run(Application.class, args); + } + +} diff --git a/basic/grpc-client/src/main/java/org/springframework/integration/samples/grpc/configuration/ClientHelloWorldConfiguration.java b/basic/grpc-client/src/main/java/org/springframework/integration/samples/grpc/configuration/ClientHelloWorldConfiguration.java new file mode 100644 index 000000000..040d076d0 --- /dev/null +++ b/basic/grpc-client/src/main/java/org/springframework/integration/samples/grpc/configuration/ClientHelloWorldConfiguration.java @@ -0,0 +1,201 @@ +/* + * Copyright 2026-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.samples.grpc.configuration; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import reactor.core.publisher.Flux; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.ApplicationRunner; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.channel.DirectChannel; +import org.springframework.integration.channel.FluxMessageChannel; +import org.springframework.integration.channel.QueueChannel; +import org.springframework.integration.dsl.IntegrationFlow; +import org.springframework.integration.grpc.dsl.Grpc; +import org.springframework.integration.grpc.outbound.GrpcOutboundGateway; +import org.springframework.integration.grpc.proto.HelloReply; +import org.springframework.integration.grpc.proto.HelloRequest; +import org.springframework.integration.grpc.proto.HelloWorldServiceGrpc; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; + +/** + * Configuration class for the gRPC client sample. + * Configures the gRPC channel, message channels, and integration flows + * for both single response and streaming response scenarios. + * + * @author Glenn Renfro + */ +@Configuration +class ClientHelloWorldConfiguration { + + private static final Log LOG = LogFactory.getLog(ClientHelloWorldConfiguration.class); + + /** + * Creates a managed gRPC channel for communication with the server. + * + * @param host the gRPC server host (default: localhost) + * @param port the gRPC server port (default: 9090) + * @return the configured managed channel + */ + @Bean(destroyMethod = "shutdownNow") + ManagedChannel managedChannel(@Value("${grpc.server.host:localhost}") String host, + @Value("${grpc.server.port:9090}") int port) { + return ManagedChannelBuilder.forAddress(host, port) + .usePlaintext() + .build(); + } + + /** + * Creates a message channel for single response gRPC requests. + * + * @return the direct message channel + */ + @Bean + MessageChannel grpcInputChannelSingleResponse() { + return new DirectChannel(); + } + + /** + * Creates a message channel for streaming response gRPC requests. + * + * @return the direct message channel + */ + @Bean + MessageChannel grpcInputChannelStreamResponse() { + return new DirectChannel(); + } + + /** + * Creates a FluxMessageChannel for output. + * + * @return the flux message channel + */ + @Bean + FluxMessageChannel grpcStreamOutputChannel() { + return new FluxMessageChannel(); + } + + /** + * Creates an application runner that sends a single gRPC request and receives a single response. + * + * @param grpcInputChannelSingleResponse the message channel for single response requests + * @param replyTimeout the time in milliseconds to await for the response. Defaults to 10,000 milliseconds. + * @return the application runner + */ + @Bean + ApplicationRunner grpcClientSingleResponse(MessageChannel grpcInputChannelSingleResponse, + @Value("${grpc.client.single.reply.timeout:10000}") long replyTimeout) { + return args -> { + HelloRequest request = HelloRequest.newBuilder().setName("Jack").build(); + QueueChannel replyChannel = new QueueChannel(); + Message requestMessage = MessageBuilder.withPayload(request) + .setReplyChannel(replyChannel) + .build(); + grpcInputChannelSingleResponse.send(requestMessage); + Message reply = replyChannel.receive(replyTimeout); + if (reply != null) { + LOG.info("Single response reply: " + reply.getPayload()); + } + else { + LOG.warn("No reply received"); + } + }; + } + + /** + * Creates an application runner that sends a gRPC request and receives a stream of responses. + * + * @param grpcInputChannelStreamResponse the message channel for streaming response requests + * @param grpcStreamOutputChannel channel that contains the responses + * @param replyTimeout the time in seconds to await for the response. Defaults to 1 second. + * @return the application runner + */ + @Bean + ApplicationRunner grpcClientStreamResponse(MessageChannel grpcInputChannelStreamResponse, + FluxMessageChannel grpcStreamOutputChannel, + @Value("${grpc.client.stream.reply.timeout:1}") int replyTimeout) { + return args -> { + CountDownLatch latch = new CountDownLatch(1); + + Flux.from(grpcStreamOutputChannel) + .doOnSubscribe(subscription -> { + HelloRequest request = HelloRequest.newBuilder().setName("Jack").build(); + Message requestMessage = MessageBuilder.withPayload(request).build(); + grpcInputChannelStreamResponse.send(requestMessage); + }) + .map(message -> (HelloReply) message.getPayload()) + .map(HelloReply::getMessage) + .doOnNext(msg -> LOG.info("Stream received reply: " + msg)) + .doOnComplete(latch::countDown) + .doOnError(error -> { + LOG.error("Error in stream: " + error.getMessage(), error); + latch.countDown(); + }) + .subscribe(); + + latch.await(replyTimeout, TimeUnit.SECONDS); + }; + } + + /** + * Creates an integration flow for outbound gRPC requests with single responses. + * + * @param managedChannel the gRPC managed channel + * @param grpcInputChannelSingleResponse the input message channel + * @return the integration flow + */ + @Bean + IntegrationFlow grpcOutboundFlowSingleResponse(ManagedChannel managedChannel, + MessageChannel grpcInputChannelSingleResponse) { + return IntegrationFlow.from(grpcInputChannelSingleResponse) + .handle(Grpc.outboundGateway(managedChannel, HelloWorldServiceGrpc.class) + .methodName("SayHello")) + .get(); + } + + /** + * Creates an integration flow for outbound gRPC requests with streaming responses. + * + * @param managedChannel the gRPC managed channel + * @param grpcInputChannelStreamResponse the input message channel + * @param grpcStreamOutputChannel channel containing the results + * @return the integration flow + */ + @Bean + IntegrationFlow grpcOutboundFlowStreamResponse(ManagedChannel managedChannel, + MessageChannel grpcInputChannelStreamResponse, + FluxMessageChannel grpcStreamOutputChannel) { + GrpcOutboundGateway gateway = new GrpcOutboundGateway(managedChannel, HelloWorldServiceGrpc.class); + gateway.setMethodName("StreamSayHello"); + + return IntegrationFlow.from(grpcInputChannelStreamResponse) + .handle(gateway) + .channel(grpcStreamOutputChannel) + .get(); + } + +} diff --git a/basic/grpc-client/src/main/proto/hello.proto b/basic/grpc-client/src/main/proto/hello.proto new file mode 100644 index 000000000..9112bd4c1 --- /dev/null +++ b/basic/grpc-client/src/main/proto/hello.proto @@ -0,0 +1,27 @@ +syntax = "proto3"; +import "types.proto"; + +package integration.grpc.hello; + +option java_package = "org.springframework.integration.grpc.proto"; + +option java_multiple_files = true; + +option java_outer_classname = "HelloWorldProto"; + +// The greeting service definition. +service HelloWorldService { + + // Sends a greeting + rpc SayHello(HelloRequest) returns (HelloReply) {} + + // Sends a greeting and something else + rpc StreamSayHello(HelloRequest) returns (stream HelloReply) {} + + // Sends a greeting to everyone presenting + rpc HelloToEveryOne(stream HelloRequest) returns (HelloReply) {} + + // Streams requests and replies + rpc BidiStreamHello(stream HelloRequest) returns (stream HelloReply) {} + +} diff --git a/basic/grpc-client/src/main/proto/types.proto b/basic/grpc-client/src/main/proto/types.proto new file mode 100644 index 000000000..e6bb279cc --- /dev/null +++ b/basic/grpc-client/src/main/proto/types.proto @@ -0,0 +1,23 @@ +syntax = "proto3"; + +package integration.grpc.hello; + +option java_package = "org.springframework.integration.grpc.proto"; + +option java_multiple_files = true; + +option java_outer_classname = "TypesProto"; + +// The request message containing the user's name. +message HelloRequest { + + string name = 1; + +} + +// The response message containing the greetings +message HelloReply { + + string message = 1; + +} diff --git a/basic/grpc-client/src/main/resources/application.properties b/basic/grpc-client/src/main/resources/application.properties new file mode 100644 index 000000000..a7b7cd779 --- /dev/null +++ b/basic/grpc-client/src/main/resources/application.properties @@ -0,0 +1,4 @@ +grpc.server.host=localhost +grpc.server.port=9090 +grpc.client.stream.reply.timeout=1 +grpc.client.single.reply.timeout=10000 diff --git a/basic/grpc-client/src/test/java/org/springframework/integration/samples/grpc/GrpcClientTests.java b/basic/grpc-client/src/test/java/org/springframework/integration/samples/grpc/GrpcClientTests.java new file mode 100644 index 000000000..1bfa6aaea --- /dev/null +++ b/basic/grpc-client/src/test/java/org/springframework/integration/samples/grpc/GrpcClientTests.java @@ -0,0 +1,186 @@ +/* + * Copyright 2026-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.samples.grpc; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import io.grpc.Server; +import io.grpc.ServerBuilder; +import io.grpc.stub.StreamObserver; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.ApplicationRunner; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.integration.channel.FluxMessageChannel; +import org.springframework.integration.channel.QueueChannel; +import org.springframework.integration.grpc.proto.HelloReply; +import org.springframework.integration.grpc.proto.HelloRequest; +import org.springframework.integration.grpc.proto.HelloWorldServiceGrpc; +import org.springframework.integration.support.MessageBuilder; +import org.springframework.messaging.Message; +import org.springframework.messaging.MessageChannel; +import org.springframework.test.context.DynamicPropertyRegistry; +import org.springframework.test.context.DynamicPropertySource; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Glenn Renfro + */ +@SpringBootTest +class GrpcClientTests { + + private static Server mockGrpcServer; + + private static int serverPort; + + @Autowired + private MessageChannel grpcInputChannelSingleResponse; + + @Autowired + private MessageChannel grpcInputChannelStreamResponse; + + @Autowired + private FluxMessageChannel grpcStreamOutputChannel; + + @DynamicPropertySource + static void grpcServerProperties(DynamicPropertyRegistry registry) throws IOException { + mockGrpcServer = ServerBuilder.forPort(0) + .addService(new MockHelloWorldService()) + .build() + .start(); + + serverPort = mockGrpcServer.getPort(); + + registry.add("grpc.server.host", () -> "localhost"); + registry.add("grpc.server.port", () -> serverPort); + } + + @AfterAll + static void tearDown() { + if (mockGrpcServer != null) { + mockGrpcServer.shutdownNow(); + } + } + + @Test + void shouldSendSingleRequestAndReceiveSingleResponse() { + HelloRequest request = HelloRequest.newBuilder() + .setName("TestUser") + .build(); + + QueueChannel replyChannel = new QueueChannel(); + Message requestMessage = MessageBuilder.withPayload(request) + .setReplyChannel(replyChannel) + .build(); + + this.grpcInputChannelSingleResponse.send(requestMessage); + + Message reply = replyChannel.receive(5000); + + assertThat(reply).isNotNull(); + assertThat(reply.getPayload()).isInstanceOf(HelloReply.class); + + HelloReply helloReply = (HelloReply) reply.getPayload(); + assertThat(helloReply.getMessage()).isEqualTo("Hello TestUser"); + } + + @Test + void shouldSendStreamRequestAndReceiveMultipleResponses() throws InterruptedException { + HelloRequest request = HelloRequest.newBuilder() + .setName("StreamUser") + .build(); + + List receivedReplies = new ArrayList<>(); + CountDownLatch latch = new CountDownLatch(1); + + Flux.from(this.grpcStreamOutputChannel) + .doOnSubscribe(subscription -> { + Message requestMessage = MessageBuilder.withPayload(request).build(); + this.grpcInputChannelStreamResponse.send(requestMessage); + }).take(2) + .map(message -> (HelloReply) message.getPayload()) + .doOnNext(receivedReplies::add) + .doOnComplete(latch::countDown) + .doOnError(error -> latch.countDown()) + .subscribe(); + + boolean completed = latch.await(10, TimeUnit.SECONDS); + + assertThat(completed).isTrue(); + assertThat(receivedReplies).hasSize(2).extracting(HelloReply::getMessage) + .containsExactly("Hello StreamUser", "Hello again!"); + } + + /** + * Mock gRPC service implementation for testing. + */ + private static class MockHelloWorldService extends HelloWorldServiceGrpc.HelloWorldServiceImplBase { + + @Override + public void sayHello(HelloRequest request, StreamObserver responseObserver) { + HelloReply reply = HelloReply.newBuilder() + .setMessage("Hello " + request.getName()) + .build(); + responseObserver.onNext(reply); + responseObserver.onCompleted(); + } + + @Override + public void streamSayHello(HelloRequest request, StreamObserver responseObserver) { + HelloReply reply1 = HelloReply.newBuilder() + .setMessage("Hello " + request.getName()) + .build(); + HelloReply reply2 = HelloReply.newBuilder() + .setMessage("Hello again!") + .build(); + + responseObserver.onNext(reply1); + responseObserver.onNext(reply2); + responseObserver.onCompleted(); + } + + } + + @TestConfiguration + private static class GrpcClientTestConfiguration { + + @Bean + public ApplicationRunner grpcClientSingleResponse() { + return args -> { + // No-op for tests + }; + } + + @Bean + public ApplicationRunner grpcClientStreamResponse() { + return args -> { + // No-op for tests + }; + } + + } +} diff --git a/basic/grpc-server/README.md b/basic/grpc-server/README.md new file mode 100644 index 000000000..61179cf49 --- /dev/null +++ b/basic/grpc-server/README.md @@ -0,0 +1,40 @@ +gRPC Server Sample +================== + +This sample demonstrates how to use Spring Integration's gRPC support with an **Inbound Gateway** to handle gRPC requests. + +The sample implements a gRPC server that exposes a `HelloWorldService` with multiple RPC methods demonstrating different communication patterns: + +* **SayHello** - Unary RPC (single request, single response) +* **StreamSayHello** - Server streaming RPC (single request, multiple responses) + +## Running the Sample + +Start the gRPC server using Gradle: + + $ gradlew :grpc-server:bootRun + +The server will start on port 9090 by default (configured in `application.properties`). + +#### Using an IDE such as SpringSource Tool Suite™ (STS) + +In STS (Eclipse), go to package **org.springframework.integration.samples.grpc**, right-click **Application** and select **Run as** --> **Java Application** (or Spring Boot Application). + +## Testing the Server + +You can test the server using the gRPC client sample or tools like `grpcurl`: + +```bash +# List available services +grpcurl -plaintext localhost:9090 list + +# Call the SayHello method +grpcurl -plaintext -d '{"name": "World"}' localhost:9090 integration.grpc.hello.HelloWorldService/SayHello + +# Call the StreamSayHello method (server streaming) +grpcurl -plaintext -d '{"name": "World"}' localhost:9090 integration.grpc.hello.HelloWorldService/StreamSayHello +``` + +## Resources + +* [Spring Integration gRPC Documentation](https://docs.spring.io/spring-integration/reference/grpc.html) diff --git a/basic/grpc-server/src/main/java/org/springframework/integration/samples/grpc/Application.java b/basic/grpc-server/src/main/java/org/springframework/integration/samples/grpc/Application.java new file mode 100644 index 000000000..6f6885a35 --- /dev/null +++ b/basic/grpc-server/src/main/java/org/springframework/integration/samples/grpc/Application.java @@ -0,0 +1,44 @@ +/* + * Copyright 2026-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.samples.grpc; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** + * Main application class for the gRPC server sample. + * This Spring Boot application demonstrates how to use Spring Integration + * with gRPC for server-side communication. + * + * @author Glenn Renfro + */ +@SpringBootApplication +public final class Application { + + private Application() { + } + + /** + * Main entry point for the gRPC server application. + * + * @param args command line arguments + */ + public static void main(String[] args) { + SpringApplication.run(Application.class, args); + } + +} diff --git a/basic/grpc-server/src/main/java/org/springframework/integration/samples/grpc/configuration/ServerHelloWorldConfiguration.java b/basic/grpc-server/src/main/java/org/springframework/integration/samples/grpc/configuration/ServerHelloWorldConfiguration.java new file mode 100644 index 000000000..a732eea72 --- /dev/null +++ b/basic/grpc-server/src/main/java/org/springframework/integration/samples/grpc/configuration/ServerHelloWorldConfiguration.java @@ -0,0 +1,88 @@ +/* + * Copyright 2026-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.samples.grpc.configuration; + +import reactor.core.publisher.Flux; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.integration.dsl.IntegrationFlow; +import org.springframework.integration.grpc.GrpcHeaders; +import org.springframework.integration.grpc.inbound.GrpcInboundGateway; +import org.springframework.integration.grpc.proto.HelloReply; +import org.springframework.integration.grpc.proto.HelloRequest; +import org.springframework.integration.grpc.proto.HelloWorldServiceGrpc; +import org.springframework.messaging.Message; + +/** + * Configuration class for the gRPC server sample. + * Configures the gRPC inbound gateway and integration flows for handling + * gRPC service methods for single request/response and request/streaming responses. + * + * @author Glenn Renfro + */ +@Configuration +class ServerHelloWorldConfiguration { + + /** + * Creates the main integration flow for handling incoming gRPC requests. + * Routes requests to different sub-flows based on the service method name. + * + * @param helloWorldService the gRPC inbound gateway + * @return the integration flow + */ + @Bean + IntegrationFlow grpcIntegrationFlow(GrpcInboundGateway helloWorldService) { + return IntegrationFlow.from(helloWorldService) + .route(Message.class, message -> + message.getHeaders().get(GrpcHeaders.SERVICE_METHOD, String.class), + router -> router + + .subFlowMapping("SayHello", flow -> flow + .transform(this::requestReply)) + + .subFlowMapping("StreamSayHello", flow -> flow + .transform(this::streamReply)) + ) + .get(); + } + + private HelloReply requestReply(HelloRequest helloRequest) { + return newHelloReply("Hello " + helloRequest.getName()); + } + + private Flux streamReply(HelloRequest helloRequest) { + return Flux.just( + newHelloReply("Hello " + helloRequest.getName()), + newHelloReply("Hello again!")); + } + + private static HelloReply newHelloReply(String message) { + return HelloReply.newBuilder().setMessage(message).build(); + } + + /** + * Creates the gRPC inbound gateway for the HelloWorld service. + * + * @return the configured gRPC inbound gateway + */ + @Bean + GrpcInboundGateway helloWorldService() { + return new GrpcInboundGateway(HelloWorldServiceGrpc.HelloWorldServiceImplBase.class); + } + +} diff --git a/basic/grpc-server/src/main/proto/hello.proto b/basic/grpc-server/src/main/proto/hello.proto new file mode 100644 index 000000000..9112bd4c1 --- /dev/null +++ b/basic/grpc-server/src/main/proto/hello.proto @@ -0,0 +1,27 @@ +syntax = "proto3"; +import "types.proto"; + +package integration.grpc.hello; + +option java_package = "org.springframework.integration.grpc.proto"; + +option java_multiple_files = true; + +option java_outer_classname = "HelloWorldProto"; + +// The greeting service definition. +service HelloWorldService { + + // Sends a greeting + rpc SayHello(HelloRequest) returns (HelloReply) {} + + // Sends a greeting and something else + rpc StreamSayHello(HelloRequest) returns (stream HelloReply) {} + + // Sends a greeting to everyone presenting + rpc HelloToEveryOne(stream HelloRequest) returns (HelloReply) {} + + // Streams requests and replies + rpc BidiStreamHello(stream HelloRequest) returns (stream HelloReply) {} + +} diff --git a/basic/grpc-server/src/main/proto/types.proto b/basic/grpc-server/src/main/proto/types.proto new file mode 100644 index 000000000..e6bb279cc --- /dev/null +++ b/basic/grpc-server/src/main/proto/types.proto @@ -0,0 +1,23 @@ +syntax = "proto3"; + +package integration.grpc.hello; + +option java_package = "org.springframework.integration.grpc.proto"; + +option java_multiple_files = true; + +option java_outer_classname = "TypesProto"; + +// The request message containing the user's name. +message HelloRequest { + + string name = 1; + +} + +// The response message containing the greetings +message HelloReply { + + string message = 1; + +} diff --git a/basic/grpc-server/src/main/resources/application.properties b/basic/grpc-server/src/main/resources/application.properties new file mode 100644 index 000000000..0de3ffebc --- /dev/null +++ b/basic/grpc-server/src/main/resources/application.properties @@ -0,0 +1 @@ +spring.grpc.server.port=9090 diff --git a/basic/grpc-server/src/test/java/org/springframework/integration/samples/grpc/GrpcServerTests.java b/basic/grpc-server/src/test/java/org/springframework/integration/samples/grpc/GrpcServerTests.java new file mode 100644 index 000000000..6b1d05546 --- /dev/null +++ b/basic/grpc-server/src/test/java/org/springframework/integration/samples/grpc/GrpcServerTests.java @@ -0,0 +1,121 @@ +/* + * Copyright 2026-present the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.integration.samples.grpc; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import io.grpc.stub.StreamObserver; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.grpc.server.lifecycle.GrpcServerLifecycle; +import org.springframework.integration.grpc.proto.HelloReply; +import org.springframework.integration.grpc.proto.HelloRequest; +import org.springframework.integration.grpc.proto.HelloWorldServiceGrpc; +import org.springframework.test.context.TestPropertySource; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Glenn Renfro + */ +@SpringBootTest +@TestPropertySource(properties = "spring.grpc.server.port=0") +class GrpcServerTests { + + @Autowired + private GrpcServerLifecycle grpcServerLifecycle; + + private int grpcServerPort; + + private ManagedChannel channel; + + private HelloWorldServiceGrpc.HelloWorldServiceBlockingStub blockingStub; + + private HelloWorldServiceGrpc.HelloWorldServiceStub asyncStub; + + @BeforeEach + void setUp() { + this.grpcServerPort = this.grpcServerLifecycle.getPort(); + + this.channel = ManagedChannelBuilder.forAddress("localhost", this.grpcServerPort) + .usePlaintext() + .build(); + this.blockingStub = HelloWorldServiceGrpc.newBlockingStub(this.channel); + this.asyncStub = HelloWorldServiceGrpc.newStub(this.channel); + } + + @AfterEach + void tearDown() throws InterruptedException { + if (this.channel != null) { + this.channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); + } + } + + @Test + void testSayHelloBlockingCall() { + HelloRequest request = HelloRequest.newBuilder() + .setName("Integration Test") + .build(); + + HelloReply response = this.blockingStub.sayHello(request); + + assertThat(response).isNotNull().extracting(HelloReply::getMessage).isEqualTo("Hello Integration Test"); + } + + @Test + void testStreamSayHello() throws InterruptedException { + HelloRequest request = HelloRequest.newBuilder() + .setName("Streaming Test") + .build(); + + List responses = new ArrayList<>(); + CountDownLatch latch = new CountDownLatch(1); + + StreamObserver responseObserver = new StreamObserver() { + @Override + public void onNext(HelloReply value) { + responses.add(value); + } + + @Override + public void onError(Throwable t) { + latch.countDown(); + } + + @Override + public void onCompleted() { + latch.countDown(); + } + }; + + this.asyncStub.streamSayHello(request, responseObserver); + + assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); + assertThat(responses).hasSize(2).extracting(HelloReply::getMessage) + .containsExactly("Hello Streaming Test", "Hello again!"); + } + +} diff --git a/build.gradle b/build.gradle index a33d37ffc..1a9a0d522 100644 --- a/build.gradle +++ b/build.gradle @@ -14,6 +14,7 @@ buildscript { classpath 'org.gretty:gretty:4.1.10' classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlinVersion" classpath "org.jetbrains.kotlin:kotlin-allopen:$kotlinVersion" + classpath "com.google.protobuf:protobuf-gradle-plugin:0.9.4" } } @@ -31,6 +32,8 @@ ext { linkScmDevConnection = 'scm:git:ssh://git@github.com:spring-projects/spring-integration-samples.git' } + + allprojects { group = 'org.springframework.integration.samples' @@ -306,6 +309,7 @@ subprojects { subproject -> flexjsonVersion = '3.3' graalvmVersion = '25.0.2' groovyVersion = '5.0.3' + grpcVersion = '1.78.0' hsqldbVersion = '2.7.4' h2Version = '2.4.240' jacksonVersion = '2.21.0' @@ -325,6 +329,7 @@ subprojects { subproject -> mongoDriverVersion = '5.6.2' oracleDriverVersion = '23.26.0.0.0' postgresVersion = '42.7.9' + protobufVersion = '4.29.4' slf4jVersion = '2.0.17' springCloudVersion = '2025.1.0' springIntegrationVersion = '7.1.0-SNAPSHOT' @@ -642,7 +647,6 @@ project('ftp') { runtimeOnly "org.slf4j:slf4j-log4j12:$slf4jVersion" testImplementation "org.apache.logging.log4j:log4j-core:$log4jVersion" - testImplementation 'org.springframework.integration:spring-integration-test' } test { @@ -807,7 +811,6 @@ project('mqtt') { api 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.5' testImplementation 'org.springframework.boot:spring-boot-starter-test' - testImplementation "org.springframework.integration:spring-integration-test" testImplementation "org.testcontainers:testcontainers-junit-jupiter:$testcontainersVersion" } @@ -826,6 +829,120 @@ project('mqtt') { } +project('grpc-server') { + description = 'gRPC Server Basic Sample' + + apply plugin: 'org.springframework.boot' + apply plugin: 'com.google.protobuf' + + dependencies { + implementation 'org.springframework.boot:spring-boot-starter-integration' + implementation "org.springframework.integration:spring-integration-grpc:$springIntegrationVersion" + implementation 'org.springframework.grpc:spring-grpc-spring-boot-starter:1.0.2' + + implementation("com.google.protobuf:protobuf-java:$protobufVersion") + + //Test + testImplementation 'org.springframework.boot:spring-boot-starter-test' + } + + + protobuf { + protoc { + artifact = "com.google.protobuf:protoc:$protobufVersion" + } + plugins { + grpc { + artifact = "io.grpc:protoc-gen-grpc-java:$grpcVersion" + } + } + generateProtoTasks { + all()*.plugins { + grpc { + option '@generated=omit' + } + } + } + } + + // Exclude generated protobuf/grpc code from checkstyle + checkstyleMain { + exclude '**/proto/**' + } + + springBoot { + mainClass = 'org.springframework.integration.samples.grpc.Application' + } + + tasks.register('run', JavaExec) { + mainClass = 'org.springframework.integration.samples.grpc.Application' + classpath = sourceSets.main.runtimeClasspath + } + + tasks.withType(JavaExec) { + standardInput = System.in + } + +} + + +project('grpc-client') { + description = 'gRPC client Basic Sample' + + apply plugin: 'org.springframework.boot' + apply plugin: 'com.google.protobuf' + + dependencies { + implementation 'org.springframework.boot:spring-boot-starter-integration' + implementation "org.springframework.integration:spring-integration-grpc:$springIntegrationVersion" + implementation 'org.springframework.grpc:spring-grpc-spring-boot-starter:1.0.2' + + implementation("com.google.protobuf:protobuf-java:$protobufVersion") + + //Test + testImplementation 'org.springframework.boot:spring-boot-starter-test' + } + + + protobuf { + protoc { + artifact = "com.google.protobuf:protoc:$protobufVersion" + } + plugins { + grpc { + artifact = "io.grpc:protoc-gen-grpc-java:$grpcVersion" + } + } + generateProtoTasks { + all()*.plugins { + grpc { + option '@generated=omit' + } + } + } + } + + // Exclude generated protobuf/grpc code from checkstyle + checkstyleMain { + exclude '**/proto/**' + } + + springBoot { + mainClass = 'org.springframework.integration.samples.grpc.Application' + } + + tasks.register('run', JavaExec) { + mainClass = 'org.springframework.integration.samples.grpc.Application' + classpath = sourceSets.main.runtimeClasspath + } + + tasks.withType(JavaExec) { + standardInput = System.in + } + +} + + project('si4demo') { description = 'Java Configuration/DSL Sample' @@ -911,8 +1028,6 @@ project('jms') { } api "org.apache.activemq:artemis-jakarta-client:$artemisVersion" api "org.apache.logging.log4j:log4j-core:$log4jVersion" - - testImplementation 'org.springframework.integration:spring-integration-test' } } @@ -1085,7 +1200,6 @@ project('tcp-client-server') { dependencies { api 'org.springframework.integration:spring-integration-ip' api "org.apache.commons:commons-lang3:$commonsLangVersion" - api 'org.springframework.integration:spring-integration-test' api "org.apache.logging.log4j:log4j-core:$log4jVersion" } } @@ -1107,7 +1221,6 @@ project('testcontainers-rabbitmq') { testImplementation('org.springframework.boot:spring-boot-starter-test') { exclude group: 'org.junit.vintage', module: 'junit-vintage-engine' } - testImplementation 'org.springframework.integration:spring-integration-test' testImplementation 'org.springframework.amqp:spring-rabbit-test' testImplementation 'org.springframework.cloud:spring-cloud-starter' @@ -1138,8 +1251,6 @@ project('testing-examples') { api 'org.springframework.integration:spring-integration-ws' api 'org.springframework:spring-webmvc' api "org.apache.logging.log4j:log4j-core:$log4jVersion" - - testImplementation 'org.springframework.integration:spring-integration-test' } } @@ -1277,8 +1388,6 @@ project('mail-attachments') { api "commons-io:commons-io:$commonsIoVersion" api "org.apache.logging.log4j:log4j-core:$log4jVersion" - testImplementation 'org.springframework.integration:spring-integration-test' - } } @@ -1496,7 +1605,6 @@ project('tcp-async-bi-directional') { api "org.springframework.integration:spring-integration-ip" testImplementation 'org.springframework.boot:spring-boot-starter-test' - testImplementation "org.springframework.integration:spring-integration-test" } springBoot { @@ -1515,8 +1623,6 @@ project('tcp-client-server-multiplex') { dependencies { api 'org.springframework.integration:spring-integration-ip' api "org.apache.logging.log4j:log4j-core:$log4jVersion" - - testImplementation 'org.springframework.integration:spring-integration-test' } } @@ -1642,7 +1748,6 @@ project('file-split-ftp') { api "org.eclipse.angus:jakarta.mail:$mailVersion" testImplementation 'org.springframework.boot:spring-boot-starter-test' - testImplementation 'org.springframework.integration:spring-integration-test' testImplementation('com.icegreen:greenmail:2.1.0-alpha-3') { exclude group: 'com.sun.mail' exclude group: 'jakarta.activation'