Skip to content

Commit 442b5e3

Browse files
committed
Better class naming
1 parent 2af1af3 commit 442b5e3

File tree

4 files changed

+244
-0
lines changed

4 files changed

+244
-0
lines changed
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package com.madadipouya.springkafkatest.kafka.consumer;
2+
3+
import com.madadipouya.springkafkatest.dto.User;
4+
import org.slf4j.Logger;
5+
import org.slf4j.LoggerFactory;
6+
import org.springframework.kafka.annotation.KafkaListener;
7+
import org.springframework.kafka.support.KafkaHeaders;
8+
import org.springframework.messaging.handler.annotation.Header;
9+
import org.springframework.messaging.handler.annotation.Payload;
10+
import org.springframework.stereotype.Component;
11+
12+
@Component
13+
public class UserKafkaConsumer {
14+
15+
private final Logger logger = LoggerFactory.getLogger(UserKafkaConsumer.class);
16+
17+
@KafkaListener(topics = "${spring.kafka.topic.name}",
18+
concurrency = "${spring.kafka.consumer.level.concurrency:3}")
19+
public void logKafkaMessages(@Payload User user,
20+
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
21+
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
22+
@Header(KafkaHeaders.OFFSET) Long offset) {
23+
logger.info("Received a message contains a user information with id {}, from {} topic, " +
24+
"{} partition, and {} offset", user.getUuid(), topic, partition, offset);
25+
}
26+
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package com.madadipouya.springkafkatest.kafka.producer;
2+
3+
import com.madadipouya.springkafkatest.dto.User;
4+
import org.apache.kafka.clients.admin.NewTopic;
5+
import org.springframework.beans.factory.annotation.Value;
6+
import org.springframework.context.annotation.Bean;
7+
import org.springframework.core.annotation.Order;
8+
import org.springframework.kafka.core.KafkaTemplate;
9+
import org.springframework.stereotype.Component;
10+
11+
@Component
12+
public class UserKafkaProducer {
13+
14+
private final KafkaTemplate<String, User> kafkaTemplate;
15+
16+
@Value("${spring.kafka.topic.name}")
17+
private String topic;
18+
19+
@Value("${spring.kafka.replication.factor:1}")
20+
private int replicationFactor;
21+
22+
@Value("${spring.kafka.partition.number:1}")
23+
private int partitionNumber;
24+
25+
public UserKafkaProducer(KafkaTemplate<String, User> kafkaTemplate) {
26+
this.kafkaTemplate = kafkaTemplate;
27+
}
28+
29+
public void writeToKafka(User user) {
30+
kafkaTemplate.send(topic, user.getUuid(), user);
31+
}
32+
33+
@Bean
34+
@Order(-1)
35+
public NewTopic createNewTopic() {
36+
return new NewTopic(topic, partitionNumber, (short) replicationFactor);
37+
}
38+
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package com.madadipouya.springkafkatest.kafka.consumer;
2+
3+
import com.fasterxml.jackson.core.JsonProcessingException;
4+
import com.fasterxml.jackson.databind.ObjectMapper;
5+
import com.madadipouya.springkafkatest.dto.User;
6+
import org.apache.kafka.clients.producer.Producer;
7+
import org.apache.kafka.clients.producer.ProducerRecord;
8+
import org.apache.kafka.common.serialization.StringSerializer;
9+
import org.junit.jupiter.api.BeforeAll;
10+
import org.junit.jupiter.api.Test;
11+
import org.junit.jupiter.api.TestInstance;
12+
import org.mockito.ArgumentCaptor;
13+
import org.mockito.Captor;
14+
import org.springframework.beans.factory.annotation.Autowired;
15+
import org.springframework.boot.test.context.SpringBootTest;
16+
import org.springframework.boot.test.mock.mockito.SpyBean;
17+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
18+
import org.springframework.kafka.test.EmbeddedKafkaBroker;
19+
import org.springframework.kafka.test.context.EmbeddedKafka;
20+
import org.springframework.kafka.test.utils.KafkaTestUtils;
21+
22+
import java.util.HashMap;
23+
import java.util.Map;
24+
25+
import static org.junit.jupiter.api.Assertions.assertEquals;
26+
import static org.junit.jupiter.api.Assertions.assertNotNull;
27+
import static org.mockito.Mockito.timeout;
28+
import static org.mockito.Mockito.verify;
29+
30+
@SpringBootTest
31+
@EmbeddedKafka(ports = 9092)
32+
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
33+
class UserKafkaConsumerTest {
34+
35+
private final String TOPIC_NAME = "com.madadipouya.kafka.user";
36+
37+
private Producer<String, String> producer;
38+
39+
@Autowired
40+
private EmbeddedKafkaBroker embeddedKafkaBroker;
41+
42+
@Autowired
43+
private ObjectMapper objectMapper;
44+
45+
@SpyBean
46+
private UserKafkaConsumer userKafkaConsumer;
47+
48+
@Captor
49+
ArgumentCaptor<User> userArgumentCaptor;
50+
51+
@Captor
52+
ArgumentCaptor<String> topicArgumentCaptor;
53+
54+
@Captor
55+
ArgumentCaptor<Integer> partitionArgumentCaptor;
56+
57+
@Captor
58+
ArgumentCaptor<Long> offsetArgumentCaptor;
59+
60+
@BeforeAll
61+
void setUp() {
62+
Map<String, Object> configs = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
63+
producer = new DefaultKafkaProducerFactory<>(configs, new StringSerializer(), new StringSerializer()).createProducer();
64+
}
65+
66+
@Test
67+
void testLogKafkaMessages() throws JsonProcessingException {
68+
// Write a message (John Wick user) to Kafka using a test producer
69+
String uuid = "11111";
70+
String message = objectMapper.writeValueAsString(new User(uuid, "John", "Wick"));
71+
producer.send(new ProducerRecord<>(TOPIC_NAME, 0, uuid, message));
72+
producer.flush();
73+
74+
// Read the message and assert its properties
75+
verify(userKafkaConsumer, timeout(5000).times(1))
76+
.logKafkaMessages(userArgumentCaptor.capture(), topicArgumentCaptor.capture(),
77+
partitionArgumentCaptor.capture(), offsetArgumentCaptor.capture());
78+
79+
User user = userArgumentCaptor.getValue();
80+
assertNotNull(user);
81+
assertEquals("11111", user.getUuid());
82+
assertEquals("John", user.getFirstName());
83+
assertEquals("Wick", user.getLastName());
84+
assertEquals(TOPIC_NAME, topicArgumentCaptor.getValue());
85+
assertEquals(0, partitionArgumentCaptor.getValue());
86+
assertEquals(0, offsetArgumentCaptor.getValue());
87+
}
88+
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package com.madadipouya.springkafkatest.kafka.producer;
2+
3+
import com.fasterxml.jackson.core.JsonProcessingException;
4+
import com.fasterxml.jackson.databind.ObjectMapper;
5+
import com.madadipouya.springkafkatest.dto.User;
6+
import org.apache.kafka.clients.consumer.ConsumerConfig;
7+
import org.apache.kafka.clients.consumer.ConsumerRecord;
8+
import org.apache.kafka.common.serialization.StringDeserializer;
9+
import org.junit.jupiter.api.AfterAll;
10+
import org.junit.jupiter.api.BeforeAll;
11+
import org.junit.jupiter.api.Test;
12+
import org.junit.jupiter.api.TestInstance;
13+
import org.springframework.beans.factory.annotation.Autowired;
14+
import org.springframework.boot.test.context.SpringBootTest;
15+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
16+
import org.springframework.kafka.listener.ContainerProperties;
17+
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
18+
import org.springframework.kafka.listener.MessageListener;
19+
import org.springframework.kafka.test.EmbeddedKafkaBroker;
20+
import org.springframework.kafka.test.context.EmbeddedKafka;
21+
import org.springframework.kafka.test.utils.ContainerTestUtils;
22+
23+
import java.util.Map;
24+
import java.util.concurrent.BlockingQueue;
25+
import java.util.concurrent.LinkedBlockingQueue;
26+
import java.util.concurrent.TimeUnit;
27+
28+
import static org.junit.jupiter.api.Assertions.assertEquals;
29+
import static org.junit.jupiter.api.Assertions.assertNotNull;
30+
31+
@SpringBootTest
32+
@EmbeddedKafka(ports = 9092)
33+
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
34+
class UserKafkaProducerTest {
35+
36+
private BlockingQueue<ConsumerRecord<String, String>> records;
37+
38+
private KafkaMessageListenerContainer<String, String> container;
39+
40+
@Autowired
41+
private EmbeddedKafkaBroker embeddedKafkaBroker;
42+
43+
@Autowired
44+
private UserKafkaProducer producer;
45+
46+
@Autowired
47+
private ObjectMapper objectMapper;
48+
49+
@BeforeAll
50+
void setUp() {
51+
DefaultKafkaConsumerFactory<String, String> consumerFactory = new DefaultKafkaConsumerFactory<>(getConsumerProperties());
52+
ContainerProperties containerProperties = new ContainerProperties("com.madadipouya.kafka.user");
53+
container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
54+
records = new LinkedBlockingQueue<>();
55+
container.setupMessageListener((MessageListener<String, String>) records::add);
56+
container.start();
57+
ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
58+
}
59+
60+
@Test
61+
void testWriteToKafka() throws InterruptedException, JsonProcessingException {
62+
// Create a user and write to Kafka
63+
User user = new User("11111", "John", "Wick");
64+
producer.writeToKafka(user);
65+
66+
// Read the message (John Wick user) with a test consumer from Kafka and assert its properties
67+
ConsumerRecord<String, String> message = records.poll(500, TimeUnit.MILLISECONDS);
68+
assertNotNull(message);
69+
assertEquals("11111", message.key());
70+
User result = objectMapper.readValue(message.value(), User.class);
71+
assertNotNull(result);
72+
assertEquals("John", result.getFirstName());
73+
assertEquals("Wick", result.getLastName());
74+
}
75+
76+
private Map<String, Object> getConsumerProperties() {
77+
return Map.of(
78+
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker.getBrokersAsString(),
79+
ConsumerConfig.GROUP_ID_CONFIG, "consumer",
80+
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true",
81+
ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10",
82+
ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000",
83+
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
84+
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
85+
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
86+
}
87+
88+
@AfterAll
89+
void tearDown() {
90+
container.stop();
91+
}
92+
}

0 commit comments

Comments
 (0)