SpringBoot整合Kafka

阳光穿透心脏的1/2处 2022-02-16 01:48 430阅读 0赞

前言

kafka 也作为消息中间件的一员,与其他消息中间件相比,它的优点在于拥有极高的吞吐量,ms 级的延迟,是一个高性能,分布式的系统。

源码

GitHub地址:https://github.com/intomylife/SpringBoot

环境

  • JDK 1.8.0 +
  • Maven 3.0 +
  • SpringBoot 2.0.3
  • ZooKeeper-3.4.5
  • kafka_2.12-2.2.0

开发工具

  • IntelliJ IDEA

正文

commons 工程 - POM 文件

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <!-- 三坐标 -->
  6. <groupId>com.zwc</groupId>
  7. <artifactId>springboot-kafka-commons</artifactId>
  8. <version>0.0.1-SNAPSHOT</version>
  9. <!-- 工程名称和描述 -->
  10. <name>springboot-kafka-commons</name>
  11. <description>公用工程</description>
  12. <!-- 打包方式 -->
  13. <packaging>jar</packaging>
  14. <!-- 在properties下声明相应的版本信息,然后在dependency下引用的时候用${spring-version}就可以引入该版本jar包了 -->
  15. <properties>
  16. <!-- 编码 -->
  17. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  18. <!-- jdk -->
  19. <java.version>1.8</java.version>
  20. <!-- springboot -->
  21. <platform-bom.version>Cairo-SR3</platform-bom.version>
  22. <!-- ali json -->
  23. <fastjson.version>1.2.47</fastjson.version>
  24. <jackson.mapper.asl.version>1.9.9</jackson.mapper.asl.version>
  25. </properties>
  26. <!-- 加入依赖 -->
  27. <dependencies>
  28. <!-- ali json依赖 -->
  29. <dependency>
  30. <groupId>com.alibaba</groupId>
  31. <artifactId>fastjson</artifactId>
  32. <version>${fastjson.version}</version>
  33. </dependency>
  34. <dependency>
  35. <groupId>org.codehaus.jackson</groupId>
  36. <artifactId>jackson-mapper-asl</artifactId>
  37. <version>${jackson.mapper.asl.version}</version>
  38. </dependency>
  39. <!-- kafka 依赖 -->
  40. <dependency>
  41. <groupId>org.springframework.kafka</groupId>
  42. <artifactId>spring-kafka</artifactId>
  43. </dependency>
  44. </dependencies>
  45. <!-- 依赖 jar 包版本管理的管理器 -->
  46. <!-- 如果 dependencies 里的 dependency 自己没有声明 version 元素,那么 maven 就此处来找版本声明。 -->
  47. <!-- 如果有,就会继承它;如果没有就会报错,告诉你没有版本信息 -->
  48. <!-- 优先级:如果 dependencies 里的 dependency 已经声明了版本信息,就不会生效此处的版本信息了 -->
  49. <dependencyManagement>
  50. <dependencies>
  51. <!-- SpringBoot -->
  52. <dependency>
  53. <groupId>io.spring.platform</groupId>
  54. <artifactId>platform-bom</artifactId>
  55. <version>${platform-bom.version}</version>
  56. <type>pom</type>
  57. <scope>import</scope>
  58. </dependency>
  59. </dependencies>
  60. </dependencyManagement>
  61. <!-- 插件依赖 -->
  62. <build>
  63. <plugins>
  64. <plugin>
  65. <groupId>org.springframework.boot</groupId>
  66. <artifactId>spring-boot-maven-plugin</artifactId>
  67. </plugin>
  68. </plugins>
  69. </build>
  70. </project>
  • 配置一些共用依赖,其中包括 spring-kafka 依赖来整合 Kafka

commons 工程 - system.properties

  1. # kafka 配置
  2. ## kafka 服务地址
  3. spring.kafka.bootstrap-servers=127.0.0.1:9092
  4. ## producer 提供者
  5. ### 如果该值大于零时,表示启用重试失败的发送次数
  6. spring.kafka.producer.retries=0
  7. ### 每次批量发送消息的数量
  8. spring.kafka.producer.batch-size=16384
  9. spring.kafka.producer.buffer-memory=33554432
  10. ### 指定消息key和消息体的编解码方式
  11. spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
  12. spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
  13. ## consumer 消费者
  14. ### 指定默认消费者group id
  15. spring.kafka.consumer.group-id=springboot-consumer-group
  16. ### 当Kafka中没有初始偏移量或者服务器上不再存在当前偏移量时该怎么办,默认值为latest,表示自动将偏移重置为最新的偏移量,可选的值为latest, earliest, none
  17. spring.kafka.consumer.auto-offset-reset=earliest
  18. ### 如果为true,则消费者的偏移量将在后台定期提交,默认值为true
  19. spring.kafka.consumer.enable-auto-commit=false
  20. ### 如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000
  21. spring.kafka.consumer.auto-commit-interval=100
  22. ### 指定消息key和消息体的编解码方式
  23. spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
  24. spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
  • 一些共用配置,不经常修改的,或者是可以统一修改的
  • 比如还可以配置 OSS 的配置信息,Redis 的配置信息,MongoDB 的配置信息等等..

commons 工程 - 项目结构

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQxNDAyMjAw_size_16_color_FFFFFF_t_70

service 工程

service 工程是一个父工程,里面包含 基础模块,用户模块,每个模块中又会分为 core 和 api

此工程中 base-service 作为 Provider(提供者),user-service 作为 Consumer(消费者)

Provider(提供者)

service 工程 - base-service - base-service-core

  1. package com.zwc.base.kafka;
  2. import com.zwc.core.constants.KafkaConstants;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.kafka.core.KafkaTemplate;
  5. import org.springframework.stereotype.Service;
  6. /**
  7. * @ClassName kafkaProducer
  8. * @Desc TODO Kafka 提供者
  9. * @Date 2019/4/20 19:41
  10. * @Version 1.0
  11. */
  12. @Service
  13. public class KafkaProducer {
  14. @Autowired
  15. private KafkaTemplate kafkaTemplate;
  16. /*
  17. * @ClassName kafkaProducer
  18. * @Desc TODO 发送订阅者消息
  19. * @Date 2019/4/20 19:46
  20. * @Version 1.0
  21. */
  22. public void sendTopicMessage(){
  23. kafkaTemplate.send(KafkaConstants.KAFKA_TOPIC_NAME,"EN: From springboot-kafka! I'm topic. CN: springboot 整合 kafka 发送订阅者消息。");
  24. }
  25. }
  • 直接使用 @Autowired 注解把 KafkaTemplate 装配进来
  • 使用 kafkaTemplate 对象的 send() 方法发送消息
  • 记得使用 @Service 注解把此 Bean 交给 spring 管理

service 工程 - base-service - 启动项目

注:项目启动前需要启动 zkServer 和 kafka(在文末处会单独说一下 windows 环境下安装启动 kafka 主要注意的

事项)

  1. 端口:8081(具体可以根据自己的喜好,在 application.properties 配置文件中配置 server.port)
  2. 发送消息接口:http://localhost:8081/kafka/send(调用成功会在页面显示 success)

Consumer(消费者)

service 工程 - user-service - user-service-core

消费者一号

  1. package com.zwc.user.kafka;
  2. import com.zwc.core.constants.KafkaConstants;
  3. import org.springframework.kafka.annotation.KafkaListener;
  4. import org.springframework.stereotype.Service;
  5. /**
  6. * @ClassName KafkaConsumer
  7. * @Desc TODO Kafka 消费者 - 一号
  8. * @Date 2019/4/20 19:47
  9. * @Version 1.0
  10. */
  11. @Service
  12. public class KafkaConsumerOne {
  13. /*
  14. * @ClassName KafkaConsumer
  15. * @Desc TODO 接收订阅者消息
  16. * @Date 2019/4/20 19:50
  17. * @Version 1.0
  18. */
  19. @KafkaListener(topics = KafkaConstants.KAFKA_TOPIC_NAME , groupId = KafkaConstants.KAFKA_GROUP_ID_ONE)
  20. public void receiveTopicMessage(String message){
  21. System.out.println("KafkaConsumerOne ---> receiveTopicMessage:接收订阅者模式发送的消息,内容为:" + message);
  22. }
  23. }
  • 使用 @KafkaListener 注解开始监听消息任务
  • topics 参数指定消息主题
  • groupId 参数指定组
  • 记得使用 @Service 注解把此 Bean 交给 spring 管理

    消费者二号

    package com.zwc.user.kafka;

    import com.zwc.core.constants.KafkaConstants;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Service;

    /**

    • @ClassName KafkaConsumer
    • @Desc TODO Kafka 消费者 - 二号
    • @Date 2019/4/20 19:47
    • @Version 1.0
      */
      @Service
      public class KafkaConsumerTwo {

      /*

      • @ClassName KafkaConsumer
      • @Desc TODO 接收订阅者消息
      • @Date 2019/4/20 19:50
      • @Version 1.0
        */
        @KafkaListener(topics = KafkaConstants.KAFKA_TOPIC_NAME , groupId = KafkaConstants.KAFKA_GROUP_ID_ONE)
        public void receiveTopicMessage(String message){
        System.out.println(“KafkaConsumerTwo —-> receiveTopicMessage:接收订阅者模式发送的消息,内容为:” + message);
        }

    }

  • 除了输出消息的类名称不一样以外其他代码与消费者一号基本一样

service 工程 - user-service - 启动项目

注:项目启动前需要启动 zkServer 和 kafka(在文末处会单独说一下 windows 环境下安装启动 kafka 主要注意的

事项)

  1. 端口:8082(具体可以根据自己的喜好,在 application.properties 配置文件中配置 server.port)

  2. 启动后观察 idea 的控制台,会有一条字符串打印出来

20190421151804321.png

一共有两个消费者,为何只有一条打印的信息?

因为在同一个组中的 Consumer,同一个主题只会被一个 Consumer 接收。类似于列队模式。

  1. 停掉 user-service 项目,把消费者二号的 groupId 修改为 KafkaConstants.KAFKA_GROUP_ID_TWO

  2. 再次启动 user-service 项目,观察 idea 的控制台,会有一条字符串打印出来

20190421152505343.png

消费者二号改变了组,没有改变消息主题,因此读取到了此主题的历史消息。

  1. 再次调用 base-service 项目的发送消息接口:http://localhost:8081/kafka/send(调用成功会在页面显示

    success)

  2. 观察 idea 的控制台,会有两条字符串打印出来

20190421152858403.png

两个消费者不处于一个组,可以同时接收到相同主题的消息。类似于发布者订阅者模式。

service 工程 - 项目结构

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQxNDAyMjAw_size_16_color_FFFFFF_t_70 1

  • 在 service 总工程中创建了 base-service (基础模块)和 user-service(用户模块)
  • 每一个模块中都包含 api 和 core

把多工程项目使用 IntelliJ IDEA 打开

  1. 把项目从 GitHub 中下载到你的本地
  2. 打开 IntelliJ IDEA
  3. 点击 File -> Open
  4. 打开你下载到本地的项目目录
  5. springboot-kafka -> springboot-kafka-service(选择打开此工程)
  6. 打开 service 工程后
  7. 再次点击 File -> Project Structrue
  8. 选择 Modules,点击 ‘+’ 符号
  9. 点击 Import Module
  10. 还是打开你下载到本地的项目目录
  11. springboot-kafka -> springboot-kafka-commons -> pom.xml
  12. 点击 OK
  13. 点击 Next,Finish
  14. 点击 Apply,OK

扩展

windows 本地安装和启动 kafka

就在 windows 本地启动 kafka 来回折腾了我一个小时左右。目前 Kafka 已经更新到了 2.2.0 版本,2.2.0 和 2.1.1

版本有镜像,下载速度还是很快的。

1. 下载

a) Kafka 官网下载地址:http://kafka.apache.org/downloads

b) 看描述说最新的版本为 2.2.0,目前的稳定版本也是2.2.0,就直接下载它了

c) 下载后,准备使用 kafka-server-start.bat ..\..\config\server.properties 指令启动 Kafka,却一直报错

20190421155037133.png

d) 后来发现下载错了压缩包,我点击下载的是

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQxNDAyMjAw_size_16_color_FFFFFF_t_70 2

e) 正确的下载入口是

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQxNDAyMjAw_size_16_color_FFFFFF_t_70 3

f) 点击 kafka_2.12-2.2.0.tgz 后跳转到一个新的页面,点击最上面的建议下载地址就可以了

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzQxNDAyMjAw_size_16_color_FFFFFF_t_70 4

注:路径中不要有空格!!!

2. 启动

a) 前提:jdk 版本最低为 1.8.0,并且要配置到环境变量中;必须先启动 zkServer

b) 有的版本需要如下改动,打开 kafka_2.12-2.2.0\bin\windows 文件夹,使用文本编辑器打开

  1. kafka-run-class.bat,把 179 行中 %CLASSPATH% 使用双引号包起来
  2. ![20190421160611733.png][]

c) 假如把 kafka_2.12-2.2.0 文件解压到了 D 盘根目录

  1. kafka\_2.12-2.2.0 文件夹里新建 kafka-logs 文件夹
  2. 打开 kafka\_2.12-2.2.0\\config 文件夹,
  3. 使用文本编辑器打开 server.properties,把 60 行中日志文件路径修改一如下:
  4. **D:\\\\kafka\_2.12-2.2.0\\\\kafka-logs**
  5. ![20190625101431286.png][]

d) 打开 cmd, 进入到 kafka_2.12-2.2.0 文件夹下 bin\windows 目录

  1. 使用指令 **kafka-server-start.bat ..\\..\\config\\server.properties** 启动 kafka
  2. 使用指定 **kafka-topics.bat --list --zookeeper localhost:2181** 查看消息列表

结语

到此 SpringBoot 整合 Kafka 就结束了,注意几个关键的点,多多尝试,一定会成功的!

希望能够帮助到你

over

发表评论

表情:
评论列表 (有 0 条评论,430人围观)

还没有评论,来说两句吧...

相关阅读

    相关 SpringBoot整合kafka

    > 经过前三篇文章 安装jdk 安装zookeeper 以及安装kafka 全部已经竣工了,不知道小伙伴们成功搭建kafka了不。 > 憋了三天的大招,今天放出来吧。今天大家