-
Notifications
You must be signed in to change notification settings - Fork 2.5k
Add gRPC server and client samples with Spring Integration #377
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,47 @@ | ||
| gRPC Client Sample | ||
| ================== | ||
|
|
||
| This sample demonstrates how to use Spring Integration's gRPC support with an **Outbound Gateway** to make gRPC requests to a server. | ||
|
|
||
| The sample implements a gRPC client that calls a `HelloWorldService` demonstrating different communication patterns: | ||
|
|
||
| * **SayHello** - Unary RPC (single request, single response) | ||
| * **StreamSayHello** - Server streaming RPC (single request, multiple responses) | ||
|
|
||
| The client automatically executes both examples on startup using `ApplicationRunner` beans. | ||
|
|
||
| ## Running the Sample | ||
|
|
||
| **Important:** Start the gRPC server first (see the grpc-server sample) and it must have the same gRPC proto files as the client. | ||
|
|
||
| Then start the gRPC client using Gradle: | ||
|
|
||
| $ gradlew :grpc-client:bootRun | ||
|
|
||
| #### Using an IDE such as SpringSource Tool Suite™ (STS) | ||
|
|
||
| In STS (Eclipse), go to package **org.springframework.integration.samples.grpc**, right-click **Application** and select **Run as** --> **Java Application** (or Spring Boot Application). | ||
|
|
||
| ### Output | ||
|
|
||
| The client will automatically send requests to the server and display the responses: | ||
|
|
||
| ``` | ||
| Single response reply: message: "Hello Jack" | ||
|
|
||
| Stream received reply: Hello Jack | ||
| Stream received reply: Hello again! | ||
| ``` | ||
|
|
||
| ## Configuration | ||
|
|
||
| The gRPC server connection is configured in `application.properties`: | ||
|
|
||
| ```properties | ||
| grpc.server.host=localhost | ||
| grpc.server.port=9090 | ||
| ``` | ||
|
|
||
| ## Resources | ||
|
|
||
| * [Spring Integration gRPC Documentation](https://docs.spring.io/spring-integration/reference/grpc.html) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,44 @@ | ||
| /* | ||
| * Copyright 2026-present the original author or authors. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * https://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.springframework.integration.samples.grpc; | ||
|
|
||
| import org.springframework.boot.SpringApplication; | ||
| import org.springframework.boot.autoconfigure.SpringBootApplication; | ||
|
|
||
| /** | ||
| * Main application class for the gRPC client sample. | ||
| * This Spring Boot application demonstrates how to use Spring Integration | ||
| * with gRPC for client-side communication. | ||
| * | ||
| * @author Glenn Renfro | ||
| */ | ||
| @SpringBootApplication | ||
| public final class Application { | ||
|
|
||
| private Application() { | ||
| } | ||
|
|
||
| /** | ||
| * Main entry point for the gRPC client application. | ||
| * | ||
| * @param args command line arguments | ||
| */ | ||
| public static void main(String[] args) { | ||
| SpringApplication.run(Application.class, args); | ||
| } | ||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,201 @@ | ||
| /* | ||
| * Copyright 2026-present the original author or authors. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * https://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| package org.springframework.integration.samples.grpc.configuration; | ||
|
|
||
| import java.util.concurrent.CountDownLatch; | ||
| import java.util.concurrent.TimeUnit; | ||
|
|
||
| import io.grpc.ManagedChannel; | ||
| import io.grpc.ManagedChannelBuilder; | ||
| import org.apache.commons.logging.Log; | ||
| import org.apache.commons.logging.LogFactory; | ||
| import reactor.core.publisher.Flux; | ||
|
|
||
| import org.springframework.beans.factory.annotation.Value; | ||
| import org.springframework.boot.ApplicationRunner; | ||
| import org.springframework.context.annotation.Bean; | ||
| import org.springframework.context.annotation.Configuration; | ||
| import org.springframework.integration.channel.DirectChannel; | ||
| import org.springframework.integration.channel.FluxMessageChannel; | ||
| import org.springframework.integration.channel.QueueChannel; | ||
| import org.springframework.integration.dsl.IntegrationFlow; | ||
| import org.springframework.integration.grpc.dsl.Grpc; | ||
| import org.springframework.integration.grpc.outbound.GrpcOutboundGateway; | ||
| import org.springframework.integration.grpc.proto.HelloReply; | ||
| import org.springframework.integration.grpc.proto.HelloRequest; | ||
| import org.springframework.integration.grpc.proto.HelloWorldServiceGrpc; | ||
| import org.springframework.integration.support.MessageBuilder; | ||
| import org.springframework.messaging.Message; | ||
| import org.springframework.messaging.MessageChannel; | ||
|
|
||
| /** | ||
| * Configuration class for the gRPC client sample. | ||
| * Configures the gRPC channel, message channels, and integration flows | ||
| * for both single response and streaming response scenarios. | ||
| * | ||
| * @author Glenn Renfro | ||
| */ | ||
| @Configuration | ||
| class ClientHelloWorldConfiguration { | ||
|
|
||
| private static final Log LOG = LogFactory.getLog(ClientHelloWorldConfiguration.class); | ||
|
|
||
| /** | ||
| * Creates a managed gRPC channel for communication with the server. | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not imperative? So, see also about those blank lines in method Javadocs. |
||
| * | ||
| * @param host the gRPC server host (default: localhost) | ||
| * @param port the gRPC server port (default: 9090) | ||
| * @return the configured managed channel | ||
| */ | ||
| @Bean(destroyMethod = "shutdownNow") | ||
| ManagedChannel managedChannel(@Value("${grpc.server.host:localhost}") String host, | ||
| @Value("${grpc.server.port:9090}") int port) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As we talked not one time: if method declaration is multi-line, a blank line before before method body. I also wonder if there are some Spring gRPC auto-configuration for clients. |
||
| return ManagedChannelBuilder.forAddress(host, port) | ||
| .usePlaintext() | ||
| .build(); | ||
| } | ||
|
|
||
| /** | ||
| * Creates a message channel for single response gRPC requests. | ||
| * | ||
| * @return the direct message channel | ||
| */ | ||
| @Bean | ||
| MessageChannel grpcInputChannelSingleResponse() { | ||
| return new DirectChannel(); | ||
| } | ||
|
|
||
| /** | ||
| * Creates a message channel for streaming response gRPC requests. | ||
| * | ||
| * @return the direct message channel | ||
| */ | ||
| @Bean | ||
| MessageChannel grpcInputChannelStreamResponse() { | ||
| return new DirectChannel(); | ||
| } | ||
|
|
||
| /** | ||
| * Creates a FluxMessageChannel for output. | ||
| * | ||
| * @return the flux message channel | ||
| */ | ||
| @Bean | ||
| FluxMessageChannel grpcStreamOutputChannel() { | ||
| return new FluxMessageChannel(); | ||
| } | ||
|
|
||
| /** | ||
| * Creates an application runner that sends a single gRPC request and receives a single response. | ||
| * | ||
| * @param grpcInputChannelSingleResponse the message channel for single response requests | ||
| * @param replyTimeout the time in milliseconds to await for the response. Defaults to 10,000 milliseconds. | ||
| * @return the application runner | ||
| */ | ||
| @Bean | ||
| ApplicationRunner grpcClientSingleResponse(MessageChannel grpcInputChannelSingleResponse, | ||
| @Value("${grpc.client.single.reply.timeout:10000}") long replyTimeout) { | ||
| return args -> { | ||
| HelloRequest request = HelloRequest.newBuilder().setName("Jack").build(); | ||
| QueueChannel replyChannel = new QueueChannel(); | ||
| Message<?> requestMessage = MessageBuilder.withPayload(request) | ||
| .setReplyChannel(replyChannel) | ||
| .build(); | ||
| grpcInputChannelSingleResponse.send(requestMessage); | ||
| Message<?> reply = replyChannel.receive(replyTimeout); | ||
| if (reply != null) { | ||
| LOG.info("Single response reply: " + reply.getPayload()); | ||
| } | ||
| else { | ||
| LOG.warn("No reply received"); | ||
| } | ||
| }; | ||
| } | ||
|
|
||
| /** | ||
| * Creates an application runner that sends a gRPC request and receives a stream of responses. | ||
| * | ||
| * @param grpcInputChannelStreamResponse the message channel for streaming response requests | ||
| * @param grpcStreamOutputChannel channel that contains the responses | ||
| * @param replyTimeout the time in seconds to await for the response. Defaults to 1 second. | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why no consistency? |
||
| * @return the application runner | ||
| */ | ||
| @Bean | ||
| ApplicationRunner grpcClientStreamResponse(MessageChannel grpcInputChannelStreamResponse, | ||
| FluxMessageChannel grpcStreamOutputChannel, | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's see if this channel could be used as a |
||
| @Value("${grpc.client.stream.reply.timeout:1}") int replyTimeout) { | ||
| return args -> { | ||
| CountDownLatch latch = new CountDownLatch(1); | ||
|
|
||
| Flux.from(grpcStreamOutputChannel) | ||
| .doOnSubscribe(subscription -> { | ||
| HelloRequest request = HelloRequest.newBuilder().setName("Jack").build(); | ||
| Message<?> requestMessage = MessageBuilder.withPayload(request).build(); | ||
| grpcInputChannelStreamResponse.send(requestMessage); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When we do |
||
| }) | ||
| .map(message -> (HelloReply) message.getPayload()) | ||
| .map(HelloReply::getMessage) | ||
| .doOnNext(msg -> LOG.info("Stream received reply: " + msg)) | ||
| .doOnComplete(latch::countDown) | ||
| .doOnError(error -> { | ||
| LOG.error("Error in stream: " + error.getMessage(), error); | ||
| latch.countDown(); | ||
| }) | ||
| .subscribe(); | ||
|
|
||
| latch.await(replyTimeout, TimeUnit.SECONDS); | ||
| }; | ||
| } | ||
|
|
||
| /** | ||
| * Creates an integration flow for outbound gRPC requests with single responses. | ||
| * | ||
| * @param managedChannel the gRPC managed channel | ||
| * @param grpcInputChannelSingleResponse the input message channel | ||
| * @return the integration flow | ||
| */ | ||
| @Bean | ||
| IntegrationFlow grpcOutboundFlowSingleResponse(ManagedChannel managedChannel, | ||
| MessageChannel grpcInputChannelSingleResponse) { | ||
| return IntegrationFlow.from(grpcInputChannelSingleResponse) | ||
| .handle(Grpc.outboundGateway(managedChannel, HelloWorldServiceGrpc.class) | ||
| .methodName("SayHello")) | ||
| .get(); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What do you think about the lamba-based ? |
||
| } | ||
|
|
||
| /** | ||
| * Creates an integration flow for outbound gRPC requests with streaming responses. | ||
| * | ||
| * @param managedChannel the gRPC managed channel | ||
| * @param grpcInputChannelStreamResponse the input message channel | ||
| * @param grpcStreamOutputChannel channel containing the results | ||
| * @return the integration flow | ||
| */ | ||
| @Bean | ||
| IntegrationFlow grpcOutboundFlowStreamResponse(ManagedChannel managedChannel, | ||
| MessageChannel grpcInputChannelStreamResponse, | ||
| FluxMessageChannel grpcStreamOutputChannel) { | ||
| GrpcOutboundGateway gateway = new GrpcOutboundGateway(managedChannel, HelloWorldServiceGrpc.class); | ||
| gateway.setMethodName("StreamSayHello"); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why this is still here instead of |
||
|
|
||
| return IntegrationFlow.from(grpcInputChannelStreamResponse) | ||
| .handle(gateway) | ||
| .channel(grpcStreamOutputChannel) | ||
| .get(); | ||
| } | ||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,27 @@ | ||
| syntax = "proto3"; | ||
| import "types.proto"; | ||
|
|
||
| package integration.grpc.hello; | ||
|
|
||
| option java_package = "org.springframework.integration.grpc.proto"; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need to double check if these packages have a proper name. |
||
|
|
||
| option java_multiple_files = true; | ||
|
|
||
| option java_outer_classname = "HelloWorldProto"; | ||
|
|
||
| // The greeting service definition. | ||
| service HelloWorldService { | ||
|
|
||
| // Sends a greeting | ||
| rpc SayHello(HelloRequest) returns (HelloReply) {} | ||
|
|
||
| // Sends a greeting and something else | ||
| rpc StreamSayHello(HelloRequest) returns (stream HelloReply) {} | ||
|
|
||
| // Sends a greeting to everyone presenting | ||
| rpc HelloToEveryOne(stream HelloRequest) returns (HelloReply) {} | ||
|
|
||
| // Streams requests and replies | ||
| rpc BidiStreamHello(stream HelloRequest) returns (stream HelloReply) {} | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we have so many service contracts if we don't implement them in the sample? |
||
|
|
||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,23 @@ | ||
| syntax = "proto3"; | ||
|
|
||
| package integration.grpc.hello; | ||
|
|
||
| option java_package = "org.springframework.integration.grpc.proto"; | ||
|
|
||
| option java_multiple_files = true; | ||
|
|
||
| option java_outer_classname = "TypesProto"; | ||
|
|
||
| // The request message containing the user's name. | ||
| message HelloRequest { | ||
|
|
||
| string name = 1; | ||
|
|
||
| } | ||
|
|
||
| // The response message containing the greetings | ||
| message HelloReply { | ||
|
|
||
| string message = 1; | ||
|
|
||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,4 @@ | ||
| grpc.server.host=localhost | ||
| grpc.server.port=9090 | ||
| grpc.client.stream.reply.timeout=1 | ||
| grpc.client.single.reply.timeout=10000 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we really need this extra code?
I don't think start.spring.io generates such a ctor for us.