【开发篇】二十一、SpringBoot整合Kafka

谁践踏了优雅 2023-10-16 18:15 198阅读 0赞

文章目录

  • 1、整合
  • 2、消息的生产
  • 3、消费
  • 4、补充:安装

Kafka主体不是用来做消息中间件的,但也有这个功能,接下来整合Kafka

1、整合

导入依赖坐标:

  1. <dependency>
  2. <groupId>org.springframework.kafka</groupId>
  3. <artifactId>spring-kafka</artifactId>
  4. </dependency>

添加相关配置:

  1. spring:
  2. kafka:
  3. bootstrap-servers: localhost:9092
  4. consumer:
  5. group-id: order # 不可省略,否则监听时报错No group.id found in consumer config

在需要的地方注入操作对象:

  1. @Autowired
  2. private KafkaTemplate<String ,String> kafkaTemplate;
  3. //key和value的泛型自适应

2、消息的生产

继续在Service层演示消息的发送,send方法,传入topic和message

  1. @Service
  2. @Slf4j
  3. public class MessageServiceKafkaImpl implements MessageService {
  4. @Autowired
  5. private KafkaTemplate<String ,String> kafkaTemplate;
  6. @Override
  7. public void sendMessage(String id) {
  8. log.info("使用Kafka将待发送短信的订单纳入处理队列,id:"+id);
  9. kafkaTemplate.send("kafka_topic",id);
  10. }
  11. }

3、消费

直接使用监听器监听队列,不演示手动自己拿。使用@KafkaListener注解,其有一属性topic,就是send时的那个topic。需要注意的是,监听时,message依旧在形参,但类型是ConsumerRecord

  1. @Component
  2. @Slf4j
  3. public class KafkaMessageListener{
  4. @KafkaListener(topics = {
  5. "kafka_topic"})
  6. public void onMessage(ConsumerRecord<?, ?> record) {
  7. log.info("已完成短信发送业务,id:"+record.value());
  8. }
  9. }

打印下ConsumerRecord对象看看:

在这里插入图片描述

注意监听时记得配置spring.kafka.consumer.group-id:

在这里插入图片描述

4、补充:安装

生产环境一般都用docker安装在容器里或者Linux上,这里备份下Windows安装,因为本地开发调式还得用。

  • 下载

    下载地址:https://kafka.apache.org/downloads
    windows 系统下3.0.0版本存在BUG,建议使用2.X版本

在这里插入图片描述

  • 安装:解压缩即安装
  • 启动zookeeper(注册中心),注意这里双重目录,用第二个kafka的bin/windows下的bat文件

    在windows/zookeeper-server-start.bat所在目录cmd,再执行以下指令

    默认端口:2181,zookeeper.properties中自行修改

    zookeeper-server-start.bat ....\config\zookeeper.properties

    报错输入行太长时,移动文件夹位置

    或者将文件夹的版本号重命名去掉

在这里插入图片描述

  • 启动Kafka

    在windows/kafka-server-start.bat所在目录cmd,再执行以下指令

    用第二个kafka的bin/windows下的bat文件

    默认端口:9092,server.properties中自行修改

    kafka-server-start.bat ....\config\server.properties

到此,安装启动成功。可测试下功能,进入对应的bat文件目录执行指令:

  • 创建topic

    kafka-topics.bat —create —zookeeper localhost:2181 —replication-factor 1 —partitions 1 —topic test123 # 分区等参数在kafka学

  • 查看topic

    kafka-topics.bat —zookeeper 127.0.0.1:2181 —list

  • 删除topic

    kafka-topics.bat —delete —zookeeper localhost:2181 —topic test

  • 生产者功能测试

    kafka-console-producer.bat —broker-list localhost:9092 —topic test123

  • 消费者功能测试

    kafka-console-consumer.bat —bootstrap-server localhost:9092 —topic test123 —from-beginning

在这里插入图片描述

发表评论

表情:
评论列表 (有 0 条评论,198人围观)

还没有评论,来说两句吧...

相关阅读