1. 介绍

本文介绍如何使用kafka消息队列服务

2. 使用

在gradle文件中加入

implementation 'org.springframework.kafka:spring-kafka'

配置kafka的参数 在application.yml文件中加入如下的配置。

spring.kafka.bootstrap-servers: localhost:9092   #kafka
spring.kafka.consumer.group-id: transforGroup        #
#spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#spring.kafka.consumer.value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializer
#spring.kafka.producer.value-serializer: org.apache.kafka.common.serialization.StringSerializer

一、 定义发送类 创建Sender类文件

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class Sender {
    @Autowired
    private KafkaTemplate kafkaTemplate;
    public void sendMessage(String message){
        //第一个参数为topic,第二个参数为key,第三个参数为消息对象
        kafkaTemplate.send("test1","kafka key",message);
    }
}

二、 定义接收消费类 创建Receiver类文件

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.apache.kafka.clients.consumer.ConsumerRecord;
@Component
public class Receiver {
    @KafkaListener(topics = ["test1"])
    public void processMessage(ConsumerRecord consumerRecord) {
        println "kafka receiver: key="+consumerRecord.key()+"   value="+consumerRecord.value();
    }
}

三、 在应用中注入发送类使用消息队列 在controller或service中注入Sender

    @Autowired
    Sender sender;
    public void send(){
        sender.sendMessage("SOME MESSAGE");
    }

四、使用docker开启kafka服务,避免本地安装

1. 在docker中运行kitematic
2. 在kitematic查询kafka-zookeeper,选择匹配的repository,如mohamnag/kafka-zookeeper,下载image
3. 配置kafka-zookeeper容器的端口映射9092/tcp -》 localhost:9092
4. 运行此容器
5. 运行程序测试Kafka消息队列