Integrating Kafka with Spring Boot

古城微笑少年丶 2022-05-31 02:41

I. Spring Boot setup (skipped here)

  1. A simple getting-started example: https://www.jianshu.com/p/d24bceea7665

II. The pom file

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>1.2.2.RELEASE</version>
    </dependency>

III. Spring Boot configuration file
I use YAML for configuration. The relevant part is shown below:

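    spring:
      kafka:
        bootstrap-servers: 192.168.133.128:9092
        producer:
          #value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
        consumer:
          # Consumers that share a group-id split the messages, so each message is consumed by one consumer in the group.
          group-id: test1
          # Applies when the group has no committed offset: latest reads only new messages,
          # earliest reads the partition from the beginning, none raises an error.
          auto-offset-reset: latest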

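For the other properties that can be configured, see the KafkaProperties class (org.springframework.boot.autoconfigure.kafka.KafkaProperties).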
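To make the auto-offset-reset comment in the configuration concrete, here is a minimal sketch of how a consumer picks its starting position. It is a simplified model, not Kafka's code: the class name OffsetResetSketch and the method startPosition are hypothetical, and the case of a committed offset that is out of range is left out.

```java
import java.util.OptionalLong;

public class OffsetResetSketch {

    // Hypothetical helper: decides where a consumer starts reading one partition.
    // committed = offset previously committed by the group, if any
    // policy    = value of auto-offset-reset ("earliest", "latest" or "none")
    // beginning = first offset still available in the partition
    // end       = offset that the next produced message will get
    static long startPosition(OptionalLong committed, String policy, long beginning, long end) {
        if (committed.isPresent()) {
            // A committed offset wins; the reset policy is not consulted.
            return committed.getAsLong();
        }
        switch (policy) {
            case "earliest":
                return beginning; // replay the partition from the start
            case "latest":
                return end;       // only messages produced from now on
            case "none":
                throw new IllegalStateException("no committed offset for this group");
            default:
                throw new IllegalArgumentException("unknown policy: " + policy);
        }
    }

    public static void main(String[] args) {
        System.out.println(startPosition(OptionalLong.empty(), "latest", 0, 100));   // 100
        System.out.println(startPosition(OptionalLong.empty(), "earliest", 0, 100)); // 0
        System.out.println(startPosition(OptionalLong.of(42), "earliest", 0, 100));  // 42
    }
}
```

With latest, as configured here, a brand-new group such as test1 would therefore skip anything already in the topic and only see messages sent after it joins.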

IV. Consumer listener

    package com.st.kafka.consumer;

    import java.util.List;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    import com.alibaba.fastjson.JSONObject;

    @Component
    public class Listener {

        protected final Logger logger = LoggerFactory.getLogger(this.getClass());

        // Invoked for each record received from the "test" topic.
        @KafkaListener(topics = {"test"}, id = "t2")
        public void listen(ConsumerRecord<?, ?> record) {
            logger.info("kafka key: " + record.key());
            String value = record.value().toString();
            // The producer sends a JSON array of strings, so parse it back into a typed list.
            List<String> list = JSONObject.parseArray(value, String.class);
            for (String item : list) {
                logger.info("kafka item: " + item);
            }
            logger.info("kafka value: " + value);
        }
    }

V. A simple test producer

    package com.st.kafka.controller;

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;

    import com.alibaba.fastjson.JSONObject;
    import com.st.base.dto.BaseResult;
    import com.st.base.util.ResultUtil;

    import io.swagger.annotations.ApiOperation;

    @RestController
    @RequestMapping("/kafka")
    public class KafkaController {

        protected final Logger logger = LoggerFactory.getLogger(this.getClass());

        // Typed template: the default String serializers expect String keys and values.
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;

        @ApiOperation("Test the Kafka producer")
        @RequestMapping(value = "/send", method = RequestMethod.GET)
        public BaseResult sendKafka(@RequestParam String key, @RequestParam String message) {
            // Note: this test ignores the key and message parameters and sends a fixed payload.
            try {
                List<String> list = new ArrayList<String>();
                for (long i = 0; i < 1000; i++) {
                    list.add(i + "");
                }
                logger.info("kafka message={}", list);
                // Serialize the list to a JSON string before sending.
                String temp = JSONObject.toJSONString(list);
                // send() is asynchronous: reaching the next line means the record was handed to the producer.
                kafkaTemplate.send("test", "key501", temp);
                logger.info("Sent to kafka.");
                return ResultUtil.success("SUCCESS", "Sent to kafka");
            } catch (Exception e) {
                logger.error("Failed to send to kafka", e);
                // Kept as in the original: the failure is also reported through ResultUtil.success.
                return ResultUtil.success("FAIL", "Failed to send to kafka");
            }
        }

        // A second listener on the same topic, declared inside the controller.
        @KafkaListener(id = "t1", topics = "test")
        public void listenT1(ConsumerRecord<?, ?> cr) throws Exception {
            logger.info("{} - {} : {}", cr.topic(), cr.key(), cr.value());
        }
    }
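The controller registers a second listener (t1) on the same topic that the Listener class (t2) already consumes. Whether both see every message depends on whether they end up in the same consumer group. Assuming both use the group-id test1 from the configuration, the partitions of the topic are divided between them, and with a single partition one listener stays idle. The sketch below illustrates this with a simplified round-robin assignment. The class and method names are hypothetical, and this is not Kafka's actual assignor.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupAssignmentSketch {

    // Simplified round-robin: partition p goes to consumer number p % n.
    // Illustration only; the point is that each partition has exactly one
    // owner inside a group, so surplus consumers receive nothing.
    static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            String owner = consumers.get(p % consumers.size());
            assignment.get(owner).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> group = Arrays.asList("t1", "t2");
        System.out.println(assign(group, 1)); // {t1=[0], t2=[]}
        System.out.println(assign(group, 4)); // {t1=[0, 2], t2=[1, 3]}
    }
}
```

If both listeners are meant to receive every message, they would need to be in different consumer groups.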

Passing an object directly to kafkaTemplate.send throws an error. If anyone knows a solution, I would appreciate some pointers.
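A likely cause, assuming the default configuration is in use: Spring Boot configures Kafka's StringSerializer for values unless value-serializer is set, so any value that is not a String fails when the producer tries to serialize it. The sketch below reproduces the mechanism with hypothetical stand-in types (this Serializer interface and StringSerializer class are local imitations, not the Kafka classes), so it runs without a broker.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

public class SendObjectSketch {

    // Hypothetical stand-in for Kafka's Serializer<T> interface.
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    // Stand-in for a String-only serializer: it can only turn a String into bytes.
    static class StringSerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String topic, String data) {
            return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Mimics a raw (untyped) template: the value type is not checked at compile time.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static byte[] sendRaw(Serializer serializer, String topic, Object value) {
        return serializer.serialize(topic, value);
    }

    // Returns true when the String serializer rejects the value with a ClassCastException.
    static boolean failsWithClassCast(Object value) {
        try {
            sendRaw(new StringSerializer(), "test", value);
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        List<String> payload = Arrays.asList("0", "1", "2");
        System.out.println(failsWithClassCast(payload));            // true: a List is not a String
        System.out.println(failsWithClassCast(payload.toString())); // false: converted to a String first
    }
}
```

Two ways around this seem reasonable. One is to keep converting the object to a JSON string before sending, as the controller does with JSONObject.toJSONString. The other is to set spring.kafka.producer.value-serializer to a serializer that accepts objects, for example org.springframework.kafka.support.serializer.JsonSerializer, after checking that the spring-kafka version in use provides it. The consumer then needs a matching deserializer.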
