Integrating Kafka with Spring Boot


Article last updated: 2021/10/29

1. Create the Spring Boot project

The creation steps are not described again here; the project structure after creation looks like this:

(screenshot: project structure)

POM file

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.zhbf</groupId>
    <artifactId>kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafka</name>
    <description>Kafka sample project</description>

    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>2.2.9.RELEASE</spring-boot.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!-- Kafka dependency -->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.4.1.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.3.7.RELEASE</version>
                <configuration>
                    <mainClass>com.zhbf.kafka.KafkaApplication</mainClass>
                </configuration>
                <executions>
                    <execution>
                        <id>repackage</id>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
```

application.yml configuration file

```yaml
server:
  port: 8080

spring:
  application:
    name: kafka
  kafka:
    bootstrap-servers: 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 # Kafka cluster addresses
    producer: # producer settings
      retries: 3 # if greater than 0, the client resends records whose delivery failed
      batch-size: 16384   # 16 KB
      buffer-memory: 33554432 # 32 MB
      acks: 1
      # serializers for the message key and the message value
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: zhTestGroup # consumer group
      enable-auto-commit: false # disable auto commit
      auto-offset-reset: earliest # if the partition has a committed offset, resume from it; otherwise start from the beginning
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # RECORD           - commit after each record has been processed by the listener (ListenerConsumer)
      # BATCH            - commit after each batch returned by poll() has been processed
      # TIME             - commit after a processed batch once more than TIME has elapsed since the last commit
      # COUNT            - commit after a processed batch once at least COUNT records have been processed
      # COUNT_TIME       - commit when either the TIME or the COUNT condition is met
      # MANUAL           - commit after the batch has been processed and Acknowledgment.acknowledge() has been called
      # MANUAL_IMMEDIATE - commit immediately when Acknowledgment.acknowledge() is called; the most commonly used mode
      ack-mode: manual_immediate
```
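The listener's ack-mode can also be set programmatically on the listener container factory. The sketch below is purely illustrative of how the YAML key maps onto the spring-kafka API; the KafkaConsumerConfig class and its bean are assumptions, not part of the original project, and with Spring Boot the YAML setting alone is sufficient:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;

// Hypothetical configuration class, shown only to illustrate how
// spring.kafka.listener.ack-mode maps onto the container properties.
@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // equivalent of "ack-mode: manual_immediate" in application.yml
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}
```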

Start ZooKeeper and the Kafka brokers, and start the consumer listener

See the previous article for how to start them.

Start the application's main class (the POM's mainClass points to com.zhbf.kafka.KafkaApplication).
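The article does not show that class; a minimal sketch of the standard Spring Boot entry point, assuming it contains nothing beyond the bootstrap call:

```java
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, args);
    }
}
```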

If the screen below appears, the project has been created successfully:

(screenshot: Spring Boot startup log)

2. Create the Kafka producer class and call it through a controller

Kafka producer class

```java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Kafka producer (in practice just a Controller that produces messages)
 */
@RestController
public class KafkaProducer {

    private final static String TOPIC_NAME = "zhTest"; // topic name

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/send")
    public void send() {
        // sending a message is a single line of code
        kafkaTemplate.send(TOPIC_NAME, "key", "test message send~");
    }
}
```
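KafkaTemplate.send() returns a ListenableFuture<SendResult<...>>, so the controller could also check whether the send succeeded instead of firing and forgetting. A small sketch of that variant, to be placed in the KafkaProducer class above; the /sendWithCallback mapping is hypothetical and not part of the original example:

```java
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.RequestMapping;

// Hypothetical extra endpoint on the KafkaProducer controller above:
// sends the same message but logs the outcome of the send.
@RequestMapping("/sendWithCallback")
public void sendWithCallback() {
    ListenableFuture<SendResult<String, String>> future =
            kafkaTemplate.send(TOPIC_NAME, "key", "test message send~");
    future.addCallback(
            result -> System.out.println("sent to partition "
                    + result.getRecordMetadata().partition()
                    + " at offset " + result.getRecordMetadata().offset()),
            ex -> System.err.println("send failed: " + ex.getMessage()));
}
```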

3. Create the Kafka consumer class and consume messages with @KafkaListener

Kafka consumer class

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

/**
 * Kafka consumer
 */
@Component
public class KafkaConsumer {

    // Kafka listener: topic "zhTest", consumer group "zhTestGroup"
    @KafkaListener(topics = "zhTest", groupId = "zhTestGroup")
    public void listenZhugeGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String value = record.value();
        System.out.println(value);
        System.out.println(record);
        // commit the offset manually
        ack.acknowledge();
    }

    /*
    // an additional consumer group on the same topic
    @KafkaListener(topics = "zhTest", groupId = "zhTestGroup2")
    public void listenTulingGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String value = record.value();
        System.out.println(value);
        System.out.println(record);
        ack.acknowledge();
    }
    */
}
```
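Both the producer and the listener assume that the zhTest topic already exists on the broker (or that the broker is allowed to auto-create topics). One way to make that explicit is to declare the topic as a bean, which Spring Boot's auto-configured KafkaAdmin creates at startup if it is missing. A minimal sketch; the KafkaTopicConfig class, partition count, and replication factor are illustrative assumptions, not values from the original article:

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical configuration class: declares the topic so it is created on startup if missing.
@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic zhTestTopic() {
        // topic name, number of partitions, replication factor (last two are illustrative values)
        return new NewTopic("zhTest", 3, (short) 1);
    }
}
```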

Start the service and call the /send endpoint (e.g. http://localhost:8080/send); the consumer should print the message to the console:

(screenshots: calling the /send endpoint and the consumer's console output)

Source: https://www.cnblogs.com/riches/p/11720068.html
