Spring Boot 2.1.x集成Kafka,其他版本也是类似方法。
确定Kafka版本
在Spring Boot中添加Kafka Client依赖。
1 2 3 4
| <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
|
进入此依赖,查看kafka-clients版本号。
1 2 3 4 5 6
| <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.0.1</version> <scope>compile</scope> </dependency>
|
所以在安装Kafka的时候,选择2.0.1版本就可以了。Scala版本选择最高版本就行。
安装Kafka集群
zookeeper单机,Kafka在同一台机器上安装两个实例。
在config/server.properties
里面修改如下属性。
broker.id=0
Broker的id号,第一个实例为0,第二个实例为1。
listeners=PLAINTEXT://:9092
第一个实例用默认端口9092,第二个实例用9093。
advertised.listeners=PLAINTEXT://{host-ip}:9092
第一个实例用默认端口9092,第二个实例用9093。
如果不设置上述暴露给生产者和消费者的地址和端口,可能会出现如下错误:
1
| java.io.IOException: Can't resolve address
|
网上有说到解决这个问题的办法是在hosts里面配置主机的ip地址,个人觉得这个办法不是最佳实践。
log.dirs=/tmp/kafka-logs
单机时,这个文件夹得用不同的。
启动zookeeper和kafka我就不在这里讲了。第二个实例启动后可以看到如下信息。
1
| INFO [KafkaServer id=1] started (kafka.server.KafkaServer)
|
Spring Boot集成
在application.yml里面需要配置如下:
1 2 3 4 5
| spring: kafka: bootstrap-servers: - {host-ip}:9092 - {host-ip}:9093
|
我们再看看默认配置的值。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| [16:38:24.889] INFO org.apache.kafka.common.config.AbstractConfig 279 logAll - ProducerConfig values: acks = 1 batch.size = 16384 bootstrap.servers = [{host-ip}:9092, {host-ip}:9093:9093] buffer.memory = 33554432 client.id = compression.type = none connections.max.idle.ms = 540000 enable.idempotence = false interceptor.classes = [] key.serializer = class org.apache.kafka.common.serialization.StringSerializer linger.ms = 0 max.block.ms = 60000 max.in.flight.requests.per.connection = 5 max.request.size = 1048576 metadata.max.age.ms = 300000 metric.reporters = [] metrics.num.samples = 2 metrics.recording.level = INFO metrics.sample.window.ms = 30000 partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner receive.buffer.bytes = 32768 reconnect.backoff.max.ms = 1000 reconnect.backoff.ms = 50 request.timeout.ms = 30000 retries = 0 retry.backoff.ms = 100 sasl.client.callback.handler.class = null sasl.jaas.config = null sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.min.time.before.relogin = 60000 sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 sasl.kerberos.ticket.renew.window.factor = 0.8 sasl.login.callback.handler.class = null sasl.login.class = null sasl.login.refresh.buffer.seconds = 300 sasl.login.refresh.min.period.seconds = 60 sasl.login.refresh.window.factor = 0.8 sasl.login.refresh.window.jitter = 0.05 sasl.mechanism = GSSAPI security.protocol = PLAINTEXT send.buffer.bytes = 131072 ssl.cipher.suites = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.endpoint.identification.algorithm = https ssl.key.password = null ssl.keymanager.algorithm = SunX509 ssl.keystore.location = null ssl.keystore.password = null ssl.keystore.type = JKS ssl.protocol = TLS ssl.provider = null ssl.secure.random.implementation = null ssl.trustmanager.algorithm = PKIX ssl.truststore.location = null ssl.truststore.password = null ssl.truststore.type = JKS transaction.timeout.ms = 60000 transactional.id = null value.serializer = class org.apache.kafka.common.serialization.StringSerializer
|
Java代码
1 2 3 4 5 6 7 8 9 10 11
| @RestController public class KafkaProducer {
@Autowired private KafkaTemplate<String, Object> kafkaTemplate;
@GetMapping("/kafka/{message}") public void sendMessage(@PathVariable String message) { kafkaTemplate.send("test-topic", message); } }
|
通过console-consumer就可以消费消息了。