Skip to content

Commit 93edc24

Browse files
committed
Add Kafka tests with Testcontainers
1 parent d4d7df7 commit 93edc24

File tree

6 files changed

+119
-2
lines changed

6 files changed

+119
-2
lines changed

pom.xml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,24 @@
5454
<artifactId>spring-kafka-test</artifactId>
5555
<scope>test</scope>
5656
</dependency>
57+
<dependency>
58+
<groupId>org.testcontainers</groupId>
59+
<artifactId>testcontainers</artifactId>
60+
<version>1.17.3</version>
61+
<scope>test</scope>
62+
</dependency>
63+
<dependency>
64+
<groupId>org.testcontainers</groupId>
65+
<artifactId>junit-jupiter</artifactId>
66+
<version>1.17.3</version>
67+
<scope>test</scope>
68+
</dependency>
69+
<dependency>
70+
<groupId>org.testcontainers</groupId>
71+
<artifactId>kafka</artifactId>
72+
<version>1.17.3</version>
73+
<scope>test</scope>
74+
</dependency>
5775
</dependencies>
5876

5977
<build>

src/main/java/com/madadipouya/springkafkatest/kafka/consumer/UserKafkaConsumer.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.madadipouya.springkafkatest.kafka.consumer;
22

33
import com.madadipouya.springkafkatest.dto.User;
4+
import com.madadipouya.springkafkatest.service.UserService;
45
import org.slf4j.Logger;
56
import org.slf4j.LoggerFactory;
67
import org.springframework.kafka.annotation.KafkaListener;
@@ -12,7 +13,13 @@
1213
@Component
1314
public class UserKafkaConsumer {
1415

15-
private final Logger logger = LoggerFactory.getLogger(UserKafkaConsumer.class);
16+
private static final Logger logger = LoggerFactory.getLogger(UserKafkaConsumer.class);
17+
18+
private final UserService userService;
19+
20+
public UserKafkaConsumer(UserService userService) {
21+
this.userService = userService;
22+
}
1623

1724
@KafkaListener(topics = "${spring.kafka.topic.name}",
1825
concurrency = "${spring.kafka.consumer.level.concurrency:3}")
@@ -21,6 +28,7 @@ public void logKafkaMessages(@Payload User user,
2128
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
2229
@Header(KafkaHeaders.OFFSET) Long offset) {
2330
logger.info("Received a message contains a user information with id {}, from {} topic, " +
24-
"{} partition, and {} offset", user.getUuid(), topic, partition, offset);
31+
"{} partition, and {} offset", user.getUuid(), topic, partition, offset);
32+
userService.save(user);
2533
}
2634
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package com.madadipouya.springkafkatest.service;
2+
3+
import com.madadipouya.springkafkatest.dto.User;
4+
5+
public interface UserService {
6+
7+
void save(User user);
8+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package com.madadipouya.springkafkatest.service.impl;
2+
3+
import com.madadipouya.springkafkatest.dto.User;
4+
import com.madadipouya.springkafkatest.service.UserService;
5+
import org.slf4j.Logger;
6+
import org.slf4j.LoggerFactory;
7+
import org.springframework.stereotype.Service;
8+
9+
@Service
10+
public class DefaultUserService implements UserService {
11+
12+
private static final Logger logger = LoggerFactory.getLogger(DefaultUserService.class);
13+
14+
@Override
15+
public void save(User user) {
16+
logger.info("Saving user with id = {}", user.getUuid());
17+
}
18+
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package com.madadipouya.springkafkatest.kafka;
2+
3+
import com.madadipouya.springkafkatest.dto.User;
4+
import com.madadipouya.springkafkatest.kafka.consumer.UserKafkaConsumer;
5+
import com.madadipouya.springkafkatest.kafka.producer.UserKafkaProducer;
6+
import com.madadipouya.springkafkatest.service.UserService;
7+
import org.junit.jupiter.api.Test;
8+
import org.mockito.ArgumentCaptor;
9+
import org.springframework.beans.factory.annotation.Autowired;
10+
import org.springframework.boot.test.context.SpringBootTest;
11+
import org.springframework.boot.test.mock.mockito.MockBean;
12+
import org.springframework.test.context.DynamicPropertyRegistry;
13+
import org.springframework.test.context.DynamicPropertySource;
14+
import org.testcontainers.containers.KafkaContainer;
15+
import org.testcontainers.junit.jupiter.Container;
16+
import org.testcontainers.junit.jupiter.Testcontainers;
17+
import org.testcontainers.utility.DockerImageName;
18+
19+
import static org.junit.jupiter.api.Assertions.assertEquals;
20+
import static org.junit.jupiter.api.Assertions.assertNotNull;
21+
import static org.mockito.Mockito.timeout;
22+
import static org.mockito.Mockito.verify;
23+
24+
@Testcontainers
25+
@SpringBootTest
26+
class UserKafkaTestcontainersTest {
27+
28+
@Container
29+
static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest"));
30+
31+
@DynamicPropertySource
32+
static void kafkaProperties(DynamicPropertyRegistry registry) {
33+
registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
34+
}
35+
36+
@Autowired
37+
private UserKafkaProducer userKafkaProducer;
38+
39+
@Autowired
40+
private UserKafkaConsumer userKafkaConsumer;
41+
42+
@MockBean
43+
private UserService userService;
44+
45+
@Test
46+
void testProduceAndConsumeKafkaMessage() {
47+
ArgumentCaptor<User> captor = ArgumentCaptor.forClass(User.class);
48+
User user = new User("11111", "John", "Wick");
49+
50+
userKafkaProducer.writeToKafka(user);
51+
52+
verify(userService, timeout(5000)).save(captor.capture());
53+
assertNotNull(captor.getValue());
54+
assertEquals("11111", captor.getValue().getUuid());
55+
assertEquals("John", captor.getValue().getFirstName());
56+
assertEquals("Wick", captor.getValue().getLastName());
57+
}
58+
}

src/test/java/com/madadipouya/springkafkatest/kafka/consumer/UserKafkaConsumerTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.fasterxml.jackson.core.JsonProcessingException;
44
import com.fasterxml.jackson.databind.ObjectMapper;
55
import com.madadipouya.springkafkatest.dto.User;
6+
import com.madadipouya.springkafkatest.service.UserService;
67
import org.apache.kafka.clients.producer.Producer;
78
import org.apache.kafka.clients.producer.ProducerRecord;
89
import org.apache.kafka.common.serialization.StringSerializer;
@@ -14,6 +15,7 @@
1415
import org.mockito.Captor;
1516
import org.springframework.beans.factory.annotation.Autowired;
1617
import org.springframework.boot.test.context.SpringBootTest;
18+
import org.springframework.boot.test.mock.mockito.MockBean;
1719
import org.springframework.boot.test.mock.mockito.SpyBean;
1820
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
1921
import org.springframework.kafka.test.EmbeddedKafkaBroker;
@@ -25,6 +27,7 @@
2527

2628
import static org.junit.jupiter.api.Assertions.assertEquals;
2729
import static org.junit.jupiter.api.Assertions.assertNotNull;
30+
import static org.mockito.ArgumentMatchers.any;
2831
import static org.mockito.Mockito.timeout;
2932
import static org.mockito.Mockito.verify;
3033

@@ -46,6 +49,9 @@ class UserKafkaConsumerTest {
4649
@SpyBean
4750
private UserKafkaConsumer userKafkaConsumer;
4851

52+
@MockBean
53+
private UserService userService;
54+
4955
@Captor
5056
ArgumentCaptor<User> userArgumentCaptor;
5157

@@ -85,6 +91,7 @@ void testLogKafkaMessages() throws JsonProcessingException {
8591
assertEquals(TOPIC_NAME, topicArgumentCaptor.getValue());
8692
assertEquals(0, partitionArgumentCaptor.getValue());
8793
assertEquals(0, offsetArgumentCaptor.getValue());
94+
verify(userService).save(any(User.class));
8895
}
8996

9097
@AfterAll

0 commit comments

Comments
 (0)