SpringBoot整合kafka

深藏阁楼爱情的钟 2022-04-18 00:09 434阅读 0赞

Maven依赖

  1. <parent>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-parent</artifactId>
  4. <version>2.0.1.RELEASE</version>
  5. </parent>
  6. <dependencies>
  7. <!-- springBoot集成kafka -->
  8. <dependency>
  9. <groupId>org.springframework.kafka</groupId>
  10. <artifactId>spring-kafka</artifactId>
  11. </dependency>
  12. <!-- SpringBoot整合Web组件 -->
  13. <dependency>
  14. <groupId>org.springframework.boot</groupId>
  15. <artifactId>spring-boot-starter-web</artifactId>
  16. </dependency>
  17. </dependencies>

application.yml配置文件

  1. # kafka
  2. spring:
  3. kafka:
  4. # kafka服务器地址(可以多个)
  5. bootstrap-servers: 192.168.128.139:9092,192.168.128.140:9092,192.168.128.141:9092
  6. consumer:
  7. # 指定一个默认的组名
  8. group-id: kafka2
  9. # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
  10. # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
  11. # none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
  12. auto-offset-reset: earliest
  13. # key/value的反序列化
  14. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  15. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  16. producer:
  17. # key/value的序列化
  18. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  19. value-serializer: org.apache.kafka.common.serialization.StringSerializer
  20. # 批量抓取
  21. batch-size: 65536
  22. # 缓存容量
  23. buffer-memory: 524288
  24. # 服务器地址
  25. bootstrap-servers: 192.168.128.139:9092,192.168.128.140:9092,192.168.128.141:9092

测试代码如下:

  1. import org.apache.kafka.clients.consumer.ConsumerRecord;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.boot.SpringApplication;
  4. import org.springframework.boot.autoconfigure.SpringBootApplication;
  5. import org.springframework.kafka.annotation.KafkaListener;
  6. import org.springframework.kafka.core.KafkaTemplate;
  7. import org.springframework.web.bind.annotation.RequestMapping;
  8. import org.springframework.web.bind.annotation.RestController;
  9. @RestController
  10. @SpringBootApplication
  11. public class KafkaController {
  12. /**
  13. * 注入kafkaTemplate
  14. */
  15. @Autowired
  16. private KafkaTemplate<String, String> kafkaTemplate;
  17. /**
  18. * 发送消息的方法
  19. *
  20. * @param key
  21. * 推送数据的key
  22. * @param data
  23. * 推送数据的data
  24. */
  25. private void send(String key, String data) {
  26. // topic 名称 key data 消息数据
  27. kafkaTemplate.send("test-all", key, data);
  28. }
  29. @RequestMapping("/kafka")
  30. public String testKafka() {
  31. for (int i = 6; i <= 10; i++) {
  32. send("key" + i, "data" + i);
  33. }
  34. return "success";
  35. }
  36. public static void main(String[] args) {
  37. SpringApplication.run(KafkaController.class, args);
  38. }
  39. /**
  40. * 消费者使用日志打印消息
  41. */
  42. @KafkaListener(topics = "test-all")
  43. public void receive(ConsumerRecord<?, ?> consumer) {
  44. System.out.println("topic名称:" + consumer.topic() + ",key:" + consumer.key() + ",分区位置:" + consumer.partition()
  45. + ", 下标" + consumer.offset());
  46. }
  47. }

消费者收到的消息:

20181105204449845.png

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3FxXzM4MjcwMTA2_size_16_color_FFFFFF_t_70

发表评论

表情:
评论列表 (有 0 条评论,434人围观)

还没有评论,来说两句吧...

相关阅读

    相关 SpringBoot整合kafka

    > 经过前三篇文章 安装jdk 安装zookeeper 以及安装kafka 全部已经竣工了,不知道小伙伴们成功搭建kafka了不。 > 憋了三天的大招,今天放出来吧。今天大家