Springboot项目配置多Kafka集群

当一个项目中出现多于一个 Kafka 集群时,这时需要让 Springboot 自动装配失效。

首先在 pom.xml 里面添加依赖

1
2
3
4
5
<!-- Spring for Apache Kafka: brings in KafkaTemplate, producer/consumer factories, etc. -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>3.1.1</version>
</dependency>

在项目启动类上加上如下注解:

1
2
3
@SpringBootApplication(exclude = {
KafkaAutoConfiguration.class
})

在 yaml 文件中配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
spring:
  # Kafka configuration. Each cluster lives under its own custom prefix
  # (cluster1 / cluster2) so the default spring.kafka.* auto-binding is bypassed.
  kafka:
    # Kafka cluster 1
    cluster1:
      bootstrap-servers: server1:9092,server2:9092,server3:9092
      producer:
        # Producers need *serializers* (the original snippet mistakenly used
        # deserializer keys/classes here, which KafkaProperties silently ignores).
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializer
        retries: 3
        acks: 1

    # Kafka cluster 2
    cluster2:
      bootstrap-servers: server4:9092,server5:9092,server6:9092
      producer:
        key-serializer: org.apache.kafka.common.serialization.StringSerializer
        value-serializer: org.apache.kafka.common.serialization.StringSerializer
        retries: 3
        acks: 1
      consumer:
        group-id: consumer_group
        # Consumers are the side that needs deserializers.
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        auto-offset-reset: earliest

创建 Kafka 配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package gy.finolo.config.kafka;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;


/**
 * Manual Kafka wiring for two independent clusters.
 *
 * <p>KafkaAutoConfiguration is excluded on the application class, so each
 * cluster gets its own {@link KafkaProperties} bean bound from a custom
 * prefix (spring.kafka.cluster1 / spring.kafka.cluster2), plus its own
 * producer factory and {@link KafkaTemplate}.
 */
@Configuration
public class KafkaConfig {

    /** Properties for cluster 1, bound from spring.kafka.cluster1.*. */
    @Bean
    @ConfigurationProperties(prefix = "spring.kafka.cluster1")
    public KafkaProperties cluster1KafkaProperties() {
        return new KafkaProperties();
    }

    /** Properties for cluster 2, bound from spring.kafka.cluster2.*. */
    @Bean
    @ConfigurationProperties(prefix = "spring.kafka.cluster2")
    public KafkaProperties cluster2KafkaProperties() {
        return new KafkaProperties();
    }

    /** Template for sending to cluster 1; inject with @Qualifier("cluster1KafkaTemplate"). */
    @Bean("cluster1KafkaTemplate")
    public KafkaTemplate<String, String> cluster1KafkaTemplate() {
        return new KafkaTemplate<>(cluster1ProducerFactory());
    }

    /** Template for sending to cluster 2; inject with @Qualifier("cluster2KafkaTemplate"). */
    @Bean("cluster2KafkaTemplate")
    public KafkaTemplate<String, String> cluster2KafkaTemplate() {
        return new KafkaTemplate<>(cluster2ProducerFactory());
    }

    @Bean("cluster1ProducerFactory")
    public ProducerFactory<String, String> cluster1ProducerFactory() {
        return buildProducerFactory(cluster1KafkaProperties());
    }

    @Bean("cluster2ProducerFactory")
    public ProducerFactory<String, String> cluster2ProducerFactory() {
        return buildProducerFactory(cluster2KafkaProperties());
    }

    /**
     * Builds a producer factory from one cluster's bound properties.
     * Shared by both clusters — the two original methods were line-for-line
     * duplicates except for the properties bean they read.
     */
    private ProducerFactory<String, String> buildProducerFactory(KafkaProperties properties) {
        Map<String, Object> configProps = new HashMap<>();

        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                properties.getBootstrapServers());
        // Serializers default to StringSerializer semantics via KafkaProperties
        // when not set in YAML; these getters are never null.
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                properties.getProducer().getKeySerializer());
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                properties.getProducer().getValueSerializer());

        // acks/retries are nullable (Integer/String) when absent from YAML;
        // a null value in the config map would break the Kafka client, so
        // only add them when actually configured.
        if (properties.getProducer().getAcks() != null) {
            configProps.put(ProducerConfig.ACKS_CONFIG,
                    properties.getProducer().getAcks());
        }
        if (properties.getProducer().getRetries() != null) {
            configProps.put(ProducerConfig.RETRIES_CONFIG,
                    properties.getProducer().getRetries());
        }

        return new DefaultKafkaProducerFactory<>(configProps);
    }

}

在服务类里面注入对应集群的 KafkaTemplate(通过 @Qualifier 指定 Bean 名称),然后调用 send 方法即可发送消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Slf4j
@Service
public class KafkaMessageService {

    /**
     * Template bound to cluster 1 — switch the qualifier to
     * "cluster2KafkaTemplate" to publish to the other cluster.
     * Constructor injection (rather than field @Autowired) keeps the field
     * final, fails fast on mis-wiring, and makes the class unit-testable.
     */
    private final KafkaTemplate<String, String> kafkaTemplate;

    public KafkaMessageService(
            @Qualifier("cluster1KafkaTemplate") KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    /**
     * Sends {@code message} to the fixed topic "kafka_topic" on cluster 1.
     *
     * @param message payload to publish
     * @return future completing with the broker's send result
     */
    public CompletableFuture<SendResult<String, String>> sendMessage(String message) {
        String topic = "kafka_topic";
        return kafkaTemplate.send(topic, message);
    }
}