/**
 * Builds a Redis Stream listener container, registers {@code streamListener()} on the
 * {@code message_forward2} stream, starts the container and exposes the resulting
 * {@link Subscription} as a bean.
 *
 * <p>The read request is registered without a consumer group and uses
 * {@code ReadOffset.lastConsumed()} as its starting offset. Errors raised while reading are
 * logged and do not cancel the subscription.
 *
 * <p>NOTE(review): the container is a method-local object that is started here but never
 * stopped in this method and is not itself returned as a bean — confirm that its shutdown is
 * handled elsewhere, otherwise the polling task may outlive the application context.
 *
 * @param redisConnectionFactory connection factory the listener container is created from
 * @return the subscription returned by registering the read request with the container
 */
@Bean
public Subscription subscription(RedisConnectionFactory redisConnectionFactory) {
    final String streamKey = "message_forward2";

    // Container options: poll with a 100 ms timeout and map record values to ForwardMessage.
    StreamMessageListenerContainer.StreamMessageListenerContainerOptions<String, ObjectRecord<String, ForwardMessage>> containerOptions =
            StreamMessageListenerContainer.StreamMessageListenerContainerOptions
                    .builder()
                    .pollTimeout(Duration.ofMillis(100))
                    .targetType(ForwardMessage.class)
                    .build();

    // Create the message listener container.
    StreamMessageListenerContainer<String, ObjectRecord<String, ForwardMessage>> container =
            StreamMessageListenerContainer.create(redisConnectionFactory, containerOptions);

    // Stream consumption configuration.
    StreamOffset<String> streamOffset = StreamOffset.create(streamKey, ReadOffset.lastConsumed());
    StreamMessageListenerContainer.StreamReadRequest<String> readRequest =
            StreamMessageListenerContainer.StreamReadRequest.builder(streamOffset)
                    // Keep consuming after an error instead of cancelling the subscription.
                    .cancelOnError(err -> false)
                    // Pass the throwable itself so the exception type, stack trace and cause are
                    // logged; getMessage() alone loses them and may be null.
                    .errorHandler(err -> log.error("Error while consuming Redis stream " + streamKey, err))
                    .build();

    // Subscribe the listener, then start polling.
    Subscription subscription = container.register(readRequest, streamListener());
    container.start();
    return subscription;
}