1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
| package gy.finolo.config.kafka;
import org.apache.kafka.clients.producer.ProducerConfig; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap; import java.util.Map;
@Configuration public class KafkaConfig {
@Bean @ConfigurationProperties(prefix = "spring.kafka.cluster1") public KafkaProperties cluster1KafkaProperties() { return new KafkaProperties(); }
@Bean @ConfigurationProperties(prefix = "spring.kafka.cluster2") public KafkaProperties cluster2KafkaProperties() { return new KafkaProperties(); }
@Bean("cluster1KafkaTemplate") public KafkaTemplate<String, String> cluster1KafkaTemplate() { return new KafkaTemplate<>(cluster1ProducerFactory()); }
@Bean("cluster2KafkaTemplate") public KafkaTemplate<String, String> cluster2KafkaTemplate() { return new KafkaTemplate<>(cluster2ProducerFactory()); }
@Bean("cluster1ProducerFactory") public ProducerFactory<String, String> cluster1ProducerFactory() { Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster1KafkaProperties().getBootstrapServers()); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, cluster1KafkaProperties().getProducer().getKeySerializer()); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, cluster1KafkaProperties().getProducer().getValueSerializer());
configProps.put(ProducerConfig.ACKS_CONFIG, cluster1KafkaProperties().getProducer().getAcks()); configProps.put(ProducerConfig.RETRIES_CONFIG, cluster1KafkaProperties().getProducer().getRetries());
return new DefaultKafkaProducerFactory<>(configProps); }
@Bean("cluster2ProducerFactory") public ProducerFactory<String, String> cluster2ProducerFactory() { Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster2KafkaProperties().getBootstrapServers()); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, cluster2KafkaProperties().getProducer().getKeySerializer()); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, cluster2KafkaProperties().getProducer().getValueSerializer());
configProps.put(ProducerConfig.ACKS_CONFIG, cluster2KafkaProperties().getProducer().getAcks()); configProps.put(ProducerConfig.RETRIES_CONFIG, cluster2KafkaProperties().getProducer().getRetries());
return new DefaultKafkaProducerFactory<>(configProps); }
}
|