Skip to content

Commit 55141f8

Browse files
mp911dechristophstrobl
authored andcommitted
DATAREDIS-1230 - Revise error handling in StreamReceiver and StreamMessageListenerContainer.
Reading and deserialization in StreamReceiver and StreamMessageListener is now decoupled from each other to allow fine-grained control over errors and resumption. Previously, we used the Template API to read and deserialize stream records. Now the actual read happens before the deserialization so that errors during deserialization of individual messages can be handled properly. This split also allows advancing in the stream read. Previously, the failed deserialization prevented of getting hold of the non-serialized Stream record which caused the stream receiver to remain at the offset that fetched the offending record which effectively lead to an infinite loop. We also support a resume function in StreamReceiver to control whether stream reads should be resumed or terminated. Original Pull Request: #576
1 parent be22fc6 commit 55141f8

12 files changed

+413
-84
lines changed

src/main/java/org/springframework/data/redis/core/DefaultReactiveStreamOperations.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -416,7 +416,8 @@ private HV deserializeHashValue(ByteBuffer buffer) {
416416
return (HV) serializationContext.getHashValueSerializationPair().read(buffer);
417417
}
418418

419-
private MapRecord<K, HK, HV> deserializeRecord(ByteBufferRecord record) {
419+
@Override
420+
public MapRecord<K, HK, HV> deserializeRecord(ByteBufferRecord record) {
420421
return record.map(it -> it.mapEntries(this::deserializeRecordFields).withStreamKey(readKey(record.getStream())));
421422
}
422423

src/main/java/org/springframework/data/redis/core/DefaultStreamOperations.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,11 @@ public <V> HashMapper<V, HK, HV> getHashMapper(Class<V> targetType) {
347347
return objectMapper.getHashMapper(targetType);
348348
}
349349

350+
@Override
351+
public MapRecord<K, HK, HV> deserializeRecord(ByteRecord record) {
352+
return record.deserialize(keySerializer(), hashKeySerializer(), hashValueSerializer());
353+
}
354+
350355
protected byte[] serializeHashValueIfRequires(HV value) {
351356
return hashValueSerializerPresent() ? serialize(value, hashValueSerializer())
352357
: objectMapper.getConversionService().convert(value, byte[].class);
@@ -386,7 +391,7 @@ public final List<MapRecord<K, HK, HV>> doInRedis(RedisConnection connection) {
386391

387392
List<MapRecord<K, HK, HV>> result = new ArrayList<>();
388393
for (ByteRecord record : raw) {
389-
result.add(record.deserialize(keySerializer(), hashKeySerializer(), hashValueSerializer()));
394+
result.add(deserializeRecord(record));
390395
}
391396

392397
return result;

src/main/java/org/springframework/data/redis/core/ReactiveStreamOperations.java

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.Map;
2323

2424
import org.reactivestreams.Publisher;
25+
2526
import org.springframework.data.domain.Range;
2627
import org.springframework.data.redis.connection.RedisZSetCommands.Limit;
2728
import org.springframework.data.redis.connection.stream.*;
@@ -343,7 +344,7 @@ default <V> Flux<ObjectRecord<K, V>> range(Class<V> targetType, K key, Range<Str
343344

344345
Assert.notNull(targetType, "Target type must not be null");
345346

346-
return range(key, range, limit).map(it -> StreamObjectMapper.toObjectRecord(this, it, targetType));
347+
return range(key, range, limit).map(it -> map(it, targetType));
347348
}
348349

349350
/**
@@ -424,7 +425,7 @@ default <V> Flux<ObjectRecord<K, V>> read(Class<V> targetType, StreamReadOptions
424425

425426
Assert.notNull(targetType, "Target type must not be null");
426427

427-
return read(readOptions, streams).map(it -> StreamObjectMapper.toObjectRecord(this, it, targetType));
428+
return read(readOptions, streams).map(it -> map(it, targetType));
428429
}
429430

430431
/**
@@ -478,7 +479,7 @@ default <V> Flux<ObjectRecord<K, V>> read(Class<V> targetType, Consumer consumer
478479

479480
Assert.notNull(targetType, "Target type must not be null");
480481

481-
return read(consumer, readOptions, streams).map(it -> StreamObjectMapper.toObjectRecord(this, it, targetType));
482+
return read(consumer, readOptions, streams).map(it -> map(it, targetType));
482483
}
483484

484485
/**
@@ -532,7 +533,7 @@ default <V> Flux<ObjectRecord<K, V>> reverseRange(Class<V> targetType, K key, Ra
532533

533534
Assert.notNull(targetType, "Target type must not be null");
534535

535-
return reverseRange(key, range, limit).map(it -> StreamObjectMapper.toObjectRecord(this, it, targetType));
536+
return reverseRange(key, range, limit).map(it -> map(it, targetType));
536537
}
537538

538539
/**
@@ -566,4 +567,29 @@ default <V> Flux<ObjectRecord<K, V>> reverseRange(Class<V> targetType, K key, Ra
566567
*/
567568
@Override
568569
<V> HashMapper<V, HK, HV> getHashMapper(Class<V> targetType);
570+
571+
/**
572+
* Map records from {@link MapRecord} to {@link ObjectRecord}.
573+
*
574+
* @param record the stream records to map.
575+
* @param targetType the target type of the payload.
576+
* @return the mapped {@link ObjectRecord}.
577+
* @since 2.x
578+
*/
579+
default <V> ObjectRecord<K, V> map(MapRecord<K, HK, HV> record, Class<V> targetType) {
580+
581+
Assert.notNull(record, "Records must not be null");
582+
Assert.notNull(targetType, "Target type must not be null");
583+
584+
return StreamObjectMapper.toObjectRecord(record, this, targetType);
585+
}
586+
587+
/**
588+
* Deserialize a {@link ByteBufferRecord} using the configured serialization context into a {@link MapRecord}.
589+
*
590+
* @param record the stream record to map.
591+
* @return deserialized {@link MapRecord}.
592+
* @since 2.x
593+
*/
594+
MapRecord<K, HK, HV> deserializeRecord(ByteBufferRecord record);
569595
}

src/main/java/org/springframework/data/redis/core/StreamObjectMapper.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -128,13 +128,13 @@ static <K, V, HK, HV> MapRecord<K, HK, HV> toMapRecord(HashMapperProvider<HK, HV
128128
/**
129129
* Convert the given {@link Record} into an {@link ObjectRecord}.
130130
*
131-
* @param provider provider for {@link HashMapper} to apply mapping for {@link ObjectRecord}.
132131
* @param source the source value.
132+
* @param provider provider for {@link HashMapper} to apply mapping for {@link ObjectRecord}.
133133
* @param targetType the desired target type.
134134
* @return the converted {@link ObjectRecord}.
135135
*/
136-
static <K, V, HK, HV> ObjectRecord<K, V> toObjectRecord(HashMapperProvider<HK, HV> provider,
137-
MapRecord<K, HK, HV> source, Class<V> targetType) {
136+
static <K, V, HK, HV> ObjectRecord<K, V> toObjectRecord(MapRecord<K, HK, HV> source,
137+
HashMapperProvider<HK, HV> provider, Class<V> targetType) {
138138
return source.toObjectRecord(provider.getHashMapper(targetType));
139139
}
140140

@@ -149,7 +149,7 @@ static <K, V, HK, HV> ObjectRecord<K, V> toObjectRecord(HashMapperProvider<HK, H
149149
* {@literal null}.
150150
*/
151151
@Nullable
152-
static <K, V, HK, HV> List<ObjectRecord<K, V>> map(@Nullable List<MapRecord<K, HK, HV>> records,
152+
static <K, V, HK, HV> List<ObjectRecord<K, V>> toObjectRecords(@Nullable List<MapRecord<K, HK, HV>> records,
153153
HashMapperProvider<HK, HV> hashMapperProvider, Class<V> targetType) {
154154

155155
if (records == null) {
@@ -161,7 +161,7 @@ static <K, V, HK, HV> List<ObjectRecord<K, V>> map(@Nullable List<MapRecord<K, H
161161
}
162162

163163
if (records.size() == 1) {
164-
return Collections.singletonList(toObjectRecord(hashMapperProvider, records.get(0), targetType));
164+
return Collections.singletonList(toObjectRecord(records.get(0), hashMapperProvider, targetType));
165165
}
166166

167167
List<ObjectRecord<K, V>> transformed = new ArrayList<>(records.size());

src/main/java/org/springframework/data/redis/core/StreamOperations.java

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,7 @@ default <V> List<ObjectRecord<K, V>> range(Class<V> targetType, K key, Range<Str
345345

346346
Assert.notNull(targetType, "Target type must not be null");
347347

348-
return StreamObjectMapper.map(range(key, range, limit), this, targetType);
348+
return map(range(key, range, limit), targetType);
349349
}
350350

351351
/**
@@ -398,7 +398,7 @@ default <V> List<ObjectRecord<K, V>> read(Class<V> targetType, StreamReadOptions
398398

399399
Assert.notNull(targetType, "Target type must not be null");
400400

401-
return StreamObjectMapper.map(read(readOptions, streams), this, targetType);
401+
return map(read(readOptions, streams), targetType);
402402
}
403403

404404
/**
@@ -456,7 +456,7 @@ default <V> List<ObjectRecord<K, V>> read(Class<V> targetType, Consumer consumer
456456

457457
Assert.notNull(targetType, "Target type must not be null");
458458

459-
return StreamObjectMapper.map(read(consumer, readOptions, streams), this, targetType);
459+
return map(read(consumer, readOptions, streams), targetType);
460460
}
461461

462462
/**
@@ -512,7 +512,7 @@ default <V> List<ObjectRecord<K, V>> reverseRange(Class<V> targetType, K key, Ra
512512

513513
Assert.notNull(targetType, "Target type must not be null");
514514

515-
return StreamObjectMapper.map(reverseRange(key, range, limit), this, targetType);
515+
return map(reverseRange(key, range, limit), targetType);
516516
}
517517

518518
/**
@@ -549,4 +549,45 @@ default <V> List<ObjectRecord<K, V>> reverseRange(Class<V> targetType, K key, Ra
549549
@Override
550550
<V> HashMapper<V, HK, HV> getHashMapper(Class<V> targetType);
551551

552+
/**
553+
* Map record from {@link MapRecord} to {@link ObjectRecord}.
554+
*
555+
* @param record the stream record to map.
556+
* @param targetType the target type of the payload.
557+
* @return the mapped {@link ObjectRecord}.
558+
* @since 2.x
559+
*/
560+
default <V> ObjectRecord<K, V> map(MapRecord<K, HK, HV> record, Class<V> targetType) {
561+
562+
Assert.notNull(record, "Record must not be null");
563+
Assert.notNull(targetType, "Target type must not be null");
564+
565+
return StreamObjectMapper.toObjectRecord(record, this, targetType);
566+
}
567+
568+
/**
569+
* Map records from {@link MapRecord} to {@link ObjectRecord}s.
570+
*
571+
* @param records the stream records to map.
572+
* @param targetType the target type of the payload.
573+
* @return the mapped {@link ObjectRecord object records}.
574+
* @since 2.x
575+
*/
576+
@Nullable
577+
default <V> List<ObjectRecord<K, V>> map(@Nullable List<MapRecord<K, HK, HV>> records, Class<V> targetType) {
578+
579+
Assert.notNull(records, "Records must not be null");
580+
Assert.notNull(targetType, "Target type must not be null");
581+
582+
return StreamObjectMapper.toObjectRecords(records, this, targetType);
583+
}
584+
585+
/**
586+
* Deserialize a {@link ByteRecord} using the configured serializers into a {@link MapRecord}.
587+
*
588+
* @param record the stream record to map.
589+
* @return deserialized {@link MapRecord}.
590+
* @since 2.x
591+
*/
592+
MapRecord<K, HK, HV> deserializeRecord(ByteRecord record);
552593
}

src/main/java/org/springframework/data/redis/stream/DefaultStreamMessageListenerContainer.java

Lines changed: 40 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,25 @@
1919
import java.util.ArrayList;
2020
import java.util.List;
2121
import java.util.concurrent.Executor;
22-
import java.util.function.BiFunction;
22+
import java.util.function.Function;
2323

2424
import org.apache.commons.logging.Log;
2525
import org.apache.commons.logging.LogFactory;
26+
27+
import org.springframework.core.convert.TypeDescriptor;
2628
import org.springframework.dao.DataAccessResourceFailureException;
2729
import org.springframework.data.redis.connection.RedisConnectionFactory;
30+
import org.springframework.data.redis.connection.stream.ByteRecord;
2831
import org.springframework.data.redis.connection.stream.Consumer;
32+
import org.springframework.data.redis.connection.stream.MapRecord;
2933
import org.springframework.data.redis.connection.stream.ReadOffset;
3034
import org.springframework.data.redis.connection.stream.Record;
3135
import org.springframework.data.redis.connection.stream.StreamOffset;
3236
import org.springframework.data.redis.connection.stream.StreamReadOptions;
37+
import org.springframework.data.redis.core.RedisCallback;
3338
import org.springframework.data.redis.core.RedisTemplate;
3439
import org.springframework.data.redis.core.StreamOperations;
40+
import org.springframework.data.redis.serializer.RedisSerializer;
3541
import org.springframework.util.Assert;
3642
import org.springframework.util.ErrorHandler;
3743
import org.springframework.util.ObjectUtils;
@@ -79,8 +85,8 @@ class DefaultStreamMessageListenerContainer<K, V extends Record<K, ?>> implement
7985
this.template = createRedisTemplate(connectionFactory, containerOptions);
8086
this.containerOptions = containerOptions;
8187

82-
if (containerOptions.getHashMapper() != null) {
83-
this.streamOperations = this.template.opsForStream(containerOptions.getHashMapper());
88+
if (containerOptions.hasHashMapper()) {
89+
this.streamOperations = this.template.opsForStream(containerOptions.getRequiredHashMapper());
8490
} else {
8591
this.streamOperations = this.template.opsForStream();
8692
}
@@ -207,16 +213,39 @@ public Subscription register(StreamReadRequest<K> streamRequest, StreamListener<
207213
return doRegister(getReadTask(streamRequest, listener));
208214
}
209215

210-
@SuppressWarnings("unchecked")
216+
@SuppressWarnings({ "unchecked", "rawtypes" })
211217
private StreamPollTask<K, V> getReadTask(StreamReadRequest<K> streamRequest, StreamListener<K, V> listener) {
212218

213-
BiFunction<K, ReadOffset, List<? extends Record<?, ?>>> readFunction = getReadFunction(streamRequest);
219+
Function<ReadOffset, List<ByteRecord>> readFunction = getReadFunction(streamRequest);
220+
Function<ByteRecord, V> deserializerToUse = getDeserializer();
214221

215-
return new StreamPollTask<>(streamRequest, listener, errorHandler, (BiFunction) readFunction);
222+
TypeDescriptor targetType = TypeDescriptor
223+
.valueOf(containerOptions.hasHashMapper() ? containerOptions.getTargetType() : MapRecord.class);
224+
225+
return new StreamPollTask<>(streamRequest, listener, errorHandler, targetType, readFunction, deserializerToUse);
226+
}
227+
228+
@SuppressWarnings({ "unchecked", "rawtypes" })
229+
private Function<ByteRecord, V> getDeserializer() {
230+
231+
Function<ByteRecord, MapRecord<K, Object, Object>> deserializer = streamOperations::deserializeRecord;
232+
233+
if (containerOptions.getHashMapper() == null) {
234+
return (Function) deserializer;
235+
}
236+
237+
return source -> {
238+
239+
MapRecord<K, Object, Object> intermediate = deserializer.apply(source);
240+
return (V) streamOperations.map(intermediate, this.containerOptions.getTargetType());
241+
};
216242
}
217243

218244
@SuppressWarnings("unchecked")
219-
private BiFunction<K, ReadOffset, List<? extends Record<?, ?>>> getReadFunction(StreamReadRequest<K> streamRequest) {
245+
private Function<ReadOffset, List<ByteRecord>> getReadFunction(StreamReadRequest<K> streamRequest) {
246+
247+
byte[] rawKey = ((RedisSerializer<K>) template.getKeySerializer())
248+
.serialize(streamRequest.getStreamOffset().getKey());
220249

221250
if (streamRequest instanceof StreamMessageListenerContainer.ConsumerStreamReadRequest) {
222251

@@ -226,20 +255,12 @@ private StreamPollTask<K, V> getReadTask(StreamReadRequest<K> streamRequest, Str
226255
: this.readOptions;
227256
Consumer consumer = consumerStreamRequest.getConsumer();
228257

229-
if (this.containerOptions.getHashMapper() != null) {
230-
return (key, offset) -> streamOperations.read(this.containerOptions.getTargetType(), consumer, readOptions,
231-
StreamOffset.create(key, offset));
232-
}
233-
234-
return (key, offset) -> streamOperations.read(consumer, readOptions, StreamOffset.create(key, offset));
235-
}
236-
237-
if (this.containerOptions.getHashMapper() != null) {
238-
return (key, offset) -> streamOperations.read(this.containerOptions.getTargetType(), readOptions,
239-
StreamOffset.create(key, offset));
258+
return (offset) -> template.execute((RedisCallback<List<ByteRecord>>) connection -> connection.streamCommands()
259+
.xReadGroup(consumer, readOptions, StreamOffset.create(rawKey, offset)));
240260
}
241261

242-
return (key, offset) -> streamOperations.read(readOptions, StreamOffset.create(key, offset));
262+
return (offset) -> template.execute((RedisCallback<List<ByteRecord>>) connection -> connection.streamCommands()
263+
.xRead(readOptions, StreamOffset.create(rawKey, offset)));
243264
}
244265

245266
private Subscription doRegister(Task task) {

0 commit comments

Comments
 (0)