1. 介绍
本文介绍如何使用kafka消息队列服务
2. 使用
在gradle文件中加入
implementation 'org.springframework.kafka:spring-kafka'
配置kafka的参数 在application.yml文件中加入如下的配置。
spring.kafka.bootstrap-servers: localhost:9092 #指明kafka节点或集群的位置
spring.kafka.consumer.group-id: transforGroup #指明分组名
#spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#spring.kafka.consumer.value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializer
#spring.kafka.producer.value-serializer: org.apache.kafka.common.serialization.StringSerializer
一、 定义发送类 创建Sender类文件
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class Sender {
@Autowired
private KafkaTemplate kafkaTemplate;
public void sendMessage(String message){
//第一个参数为topic,第二个参数为key,第三个参数为消息对象
kafkaTemplate.send("test1","kafka key",message);
}
}
二、 定义接收消费类 创建Receiver类文件
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@Component
public class Receiver {
@KafkaListener(topics = ["test1"])
public void processMessage(ConsumerRecord consumerRecord) {
println "kafka receiver: key="+consumerRecord.key()+" value="+consumerRecord.value();
}
}
三、 在应用中注入发送类使用消息队列 在controller或service中注入Sender
@Autowired
Sender sender;
public void send(){
sender.sendMessage("SOME MESSAGE");
}
四、使用docker开启kafka服务,避免本地安装
1. 在docker中运行kitematic
2. 在kitematic查询kafka-zookeeper,选择匹配的repository,如mohamnag/kafka-zookeeper,下载image
3. 配置kafka-zookeeper容器的端口映射9092/tcp -》 localhost:9092
4. 运行此容器
5. 运行程序测试Kafka消息队列