spring boot整合Kafka

梦里梦外; 2024-04-03 13:07 140阅读 0赞

文章目录

      • Kafka安装
          • 安装
      • springboot整合Kafka
        • 小结:

Kafka安装

安装

windows版安装包下载地址:https://kafka.apache.org/downloads

下载完毕后得到tgz压缩文件,使用解压缩软件解压缩即可使用,解压后得到如下文件

image-20221016191055340

启动服务器

kafka服务器的功能相当于RocketMQ中的broker,kafka运行还需要一个类似于命名服务器的服务。在kafka安装目录中自带一个类似于命名服务器的工具,叫做zookeeper,它的作用是注册中心,相关知识请到对应课程中学习。

  1. zookeeper-server-start.bat ..\..\config\zookeeper.properties //启动zookeeper
  2. kafka-server-start.bat ..\..\config\server.properties //启动kafka

1.启动zookeeper

运行bin目录下的windows目录下的zookeeper-server-start命令即可启动注册中心,默认对外服务端口2181

image-20221016191012305

2.启动kafka

运行bin目录下的windows目录下的kafka-server-start命令即可启动kafka服务器,默认对外服务端口9092

image-20221016191446374

创建主题

和之前操作其他MQ产品相似,kakfa也是基于主题操作,操作之前需要先初始化topic

  1. # 创建topic
  2. kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic qil0820
  3. # 查询topic
  4. kafka-topics.bat --zookeeper 127.0.0.1:2181 --list
  5. # 删除topic
  6. kafka-topics.bat --delete --zookeeper localhost:2181 --topic qil0820

创建topic以及查看topic

image-20221016193132449

测试服务器启动状态

Kafka提供有一套测试服务器功能的测试程序,运行bin目录下的windows目录下的命令即可使用。

  1. kafka-console-producer.bat --broker-list localhost:9092 --topic qil0820 # 测试生产消息
  2. kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic qil0820 --from-beginning # 测试消息消费

image-20221016193447492

springboot整合Kafka

步骤①:导入springboot整合Kafka的starter,此坐标由springboot维护版本

  1. <dependency>
  2. <groupId>org.springframework.kafka</groupId>
  3. <artifactId>spring-kafka</artifactId>
  4. </dependency>

步骤②:配置Kafka的服务器地址

  1. spring:
  2. kafka:
  3. bootstrap-servers: localhost:9092
  4. consumer:
  5. group-id: Qgchun// 设置默认的生产者消费者所属组id

步骤③:使用KafkaTemplate操作Kafka

  1. @Service
  2. public class MessageServiceKafkaImpl implements MessageService {
  3. @Resource
  4. private KafkaTemplate<String,String> kafkaTemplate;
  5. @Override
  6. public void sendMessage(String id) {
  7. System.out.println("待发送短信的订单已纳入处理队列(kafka),id:"+id);
  8. kafkaTemplate.send("qil0820",id);//使用send方法发送消息,需要传入topic名称
  9. }
  10. @Override
  11. public String doMessage() {
  12. return null;
  13. }
  14. }

步骤④:使用消息监听器在服务器启动后,监听指定位置,当消息出现后,立即消费消息

  1. @Component
  2. public class MessageListener {
  3. @KafkaListener(topics = "qil0820")
  4. public void onMessage(ConsumerRecord<String,String> record){
  5. System.out.println("已完成短信发送业务(kafka),id:"+record.value());
  6. }
  7. }

使用注解@KafkaListener定义当前方法监听Kafka中指定topic的消息,接收到的消息封装在对象ConsumerRecord中,获取数据从ConsumerRecord对象中获取即可。

验证成功:

image-20221016195013760

小结:

1.springboot整合Kafka使用KafkaTemplate对象作为客户端操作消息队列
2.操作Kafka需要配置Kafka服务器地址,默认端口9092
3.企业开发时通常使用监听器来处理消息队列中的消息,设置监听器使用注解@KafkaListener。接收消息保存在形参ConsumerRecord对象中

发表评论

表情:
评论列表 (有 0 条评论,140人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Spring Boot整合Kafka

    Kafka是一个分布式的、可分区的、可复制的消息系统,下面是Kafka的几个基本术语: 1. Kafka将消息以topic为单位进行归纳; 2. 将向Kafka topi