Skip to content

Commit 2af1af3

Browse files
committed
Create a new topic with custom topic and replication factor
1 parent 7bbc570 commit 2af1af3

File tree

2 files changed

+17
-0
lines changed

2 files changed

+17
-0
lines changed

src/main/java/com/madadipouya/springkafkatest/producer/UserKafkaProducer.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
package com.madadipouya.springkafkatest.producer;
22

33
import com.madadipouya.springkafkatest.dto.User;
4+
import org.apache.kafka.clients.admin.NewTopic;
45
import org.springframework.beans.factory.annotation.Value;
6+
import org.springframework.context.annotation.Bean;
7+
import org.springframework.core.annotation.Order;
58
import org.springframework.kafka.core.KafkaTemplate;
69
import org.springframework.stereotype.Component;
710

@@ -13,11 +16,23 @@ public class UserKafkaProducer {
1316
@Value("${spring.kafka.topic.name}")
1417
private String topic;
1518

19+
@Value("${spring.kafka.replication.factor:1}")
20+
private int replicationFactor;
21+
22+
@Value("${spring.kafka.partition.number:1}")
23+
private int partitionNumber;
24+
1625
public UserKafkaProducer(KafkaTemplate<String, User> kafkaTemplate) {
1726
this.kafkaTemplate = kafkaTemplate;
1827
}
1928

2029
public void writeToKafka(User user) {
2130
kafkaTemplate.send(topic, user.getUuid(), user);
2231
}
32+
33+
@Bean
34+
@Order(-1)
35+
public NewTopic createNewTopic() {
36+
return new NewTopic(topic, partitionNumber, (short) replicationFactor);
37+
}
2338
}

src/main/resources/application.properties

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
spring.kafka.bootstrap-servers=localhost:9092
22
spring.kafka.topic.name=com.madadipouya.kafka.user
3+
spring.kafka.replication.factor=3
4+
spring.kafka.partition.number=2
35
spring.kafka.consumer.bootstrap-servers=localhost:9092
46
spring.kafka.consumer.group-id=kafka-user-listener
57
spring.kafka.consumer.auto-offset-reset=earliest

0 commit comments

Comments
 (0)