Stream Receiver is not type casting the object as expected #3102

@sunilbm

I have a custom DTO User:

{
    "event_ts": 1658940994914,
    "event": "session.started",
    "name": "",
    "email": "",
    "address": {
        "state": "",
        "city": ""
    }
}
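
For readers who want to reproduce this, here is a minimal sketch of what the DTO might look like, inferred only from the JSON sample above. The record names, field names, and types are assumptions, since the real `User` class is not shown in this report. With Jackson, the `event_ts` key would presumably need a mapping such as `@JsonProperty("event_ts")` or a naming strategy; that is left out here to keep the sketch dependency-free.

```java
// Hypothetical shape of the User DTO, inferred from the JSON sample only.
// Field names and types are assumptions; the real class is not part of this report.
record Address(String state, String city) {}

record User(long eventTs, String event, String name, String email, Address address) {}

class UserDtoSketch {
    public static void main(String[] args) {
        // Build an instance that mirrors the sample payload.
        User user = new User(1658940994914L, "session.started", "", "",
                new Address("", ""));
        System.out.println(user);
    }
}
```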

The Redis template's serialization context is built as follows:


    Jackson2JsonRedisSerializer<User> serializer = new Jackson2JsonRedisSerializer<>(
            CustomObjectMapper, User.class);
    final RedisSerializationContextBuilder<String, User> builder = RedisSerializationContext
            .newSerializationContext(new StringRedisSerializer());

    RedisSerializationContext<String, User> serializationContext = builder
            .value(serializer)
            .hashValue(serializer)
            .build();

Data is sent to the Redis stream with this code:

    public Mono<RecordId> sendWithRetry(final T record, final String streamName) {
        final ObjectRecord<String, T> payload = StreamRecords.newRecord()
                .ofObject(record)
                .withStreamKey(streamName);

        return this.redisTemplate.opsForStream(new Jackson2HashMapper(JsonUtils.allocateDefaultObjectMapper(), true))
            .add(payload);
    }

On the receiving end, we have configured a StreamReceiver as shown below:

    StreamReceiver.StreamReceiverOptions
            .builder()
            .batchSize(100)
            .pollTimeout(Duration.ofSeconds(0))
            .objectMapper(new Jackson2HashMapper(JsonUtils.allocateDefaultObjectMapper(), true))
            .targetType(User.class)
            .build();

The receiver code is written as follows:


    final Consumer consumer = Consumer.from(consumerGroupName(), consumerName());
    final StreamOffset<String> streamOffset = StreamOffset.create(streamName(), ReadOffset.lastConsumed());

    final Flux<ObjectRecord<String, User>> streamListener = this.streamReceiver.receive(consumer, streamOffset);

    streamListener
            .repeatWhen(longFlux -> Flux.interval(Duration.ofMillis(100)))
            .doOnNext(record -> {
                // Log the stream key, record ID, and mapped value of each record.
                getLog().debug("Received message from the stream [{}] with ID [{}] and message [{}]",
                        record.getStream(), record.getId(), record.getValue());
            })
            .subscribe();

I have two issues to report:

  1. Even though I have defined the target type in the stream receiver, the value of each received record is a LinkedHashMap rather than a User.
  2. In the debugger, the values in the map are wrapped in an additional pair of double quotes. For example, if my data has the attribute "event": "session.started", the value read back looks like "event": ""session.started"". I considered converting the returned Map to an object explicitly with an ObjectMapper, but the extra double quotes cause trouble.
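
For illustration, the doubled quotes in issue 2 look like what one gets when a string value is JSON-encoded on write and then read back as a plain string without decoding. Whether that is what actually happens inside Spring Data Redis here is an assumption, not something confirmed in this report. The sketch below uses only the JDK; `jsonQuote` and `jsonUnquote` are hypothetical helpers written for this example and are not library APIs.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class DoubleQuoteSketch {

    // Hypothetical helper: encodes a plain string as a JSON string literal.
    // Only backslashes and double quotes are escaped, which is enough here.
    static String jsonQuote(String raw) {
        return "\"" + raw.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Hypothetical helper: strips one level of JSON string quoting.
    // Values that are not wrapped in double quotes are returned unchanged.
    static String jsonUnquote(String value) {
        if (value.length() < 2 || !value.startsWith("\"") || !value.endsWith("\"")) {
            return value;
        }
        String inner = value.substring(1, value.length() - 1);
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < inner.length(); i++) {
            char c = inner.charAt(i);
            if (c == '\\' && i + 1 < inner.length()) {
                out.append(inner.charAt(++i)); // keep the escaped character
            } else {
                out.append(c);
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        // Assumed write path: each hash value is stored as a JSON string literal.
        Map<String, String> stored = new LinkedHashMap<>();
        stored.put("event", jsonQuote("session.started"));

        // Assumed read path: the stored text is taken as a plain string,
        // so the surrounding quotes become part of the value.
        String asRead = stored.get("event");
        System.out.println("read without decoding: " + asRead);
        System.out.println("after unquoting:       " + jsonUnquote(asRead));
    }
}
```

If this assumption holds, the cleaner fix would be to make the value serialization symmetric on the write and read sides, rather than stripping quotes after the fact.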

I have attached a screenshot of how the data is returned.

Image

Can someone help me with this? I have tested on 3.4.2 and 3.4.0, and the problem is reproducible in both versions.
