Springboot整合kafka

1. 整合kafka

1、引入依赖

  1. <dependency>
  2. <groupId>org.springframework.kafka</groupId>
  3. <artifactId>spring-kafka</artifactId>
  4. </dependency>

2、设置yml文件

  1. spring:
  2. application:
  3. name: demo
  4. kafka:
  5. bootstrap-servers: 52.82.98.209:10903,52.82.98.209:10904
  6. producer: # producer 生产者
  7. retries: 0 # 重试次数
  8. acks: 1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选0、1、all/-1)
  9. batch-size: 16384 # 批量大小
  10. buffer-memory: 33554432 # 生产端缓冲区大小
  11. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  12. # value-serializer: com.itheima.demo.config.MySerializer
  13. value-serializer: org.apache.kafka.common.serialization.StringSerializer
  14. consumer: # consumer消费者
  15. group-id: javagroup # 默认的消费组ID
  16. enable-auto-commit: true # 是否自动提交offset
  17. auto-commit-interval: 100 # 提交offset延时(接收到消息后多久提交offset)
  18. # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
  19. # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
  20. # none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
  21. auto-offset-reset: latest
  22. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  23. # value-deserializer: com.itheima.demo.config.MyDeserializer
  24. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3、启动项目
在这里插入图片描述

2. 消息发送

2.1 发送类型

KafkaTemplate调用send时默认采用异步发送,如果需要同步获取发送结果,调用get方法

异步发送生产者:

  1. @RestController
  2. public class KafkaProducer {
  3. @Resource
  4. private KafkaTemplate<String, Object> kafkaTemplate;
  5. @GetMapping("/kafka/test/{msg}")
  6. public void sendMessage(@PathVariable("msg") String msg) {
  7. Message message = new Message();
  8. message.setMessage(msg);
  9. kafkaTemplate.send("test", JSON.toJSONString(message));
  10. }
  11. }

同步发送生产者:

  1. //测试同步发送与监听
  2. @RestController
  3. public class AsyncProducer {
  4. private final static Logger logger = LoggerFactory.getLogger(AsyncProducer.class);
  5. @Resource
  6. private KafkaTemplate<String, Object> kafkaTemplate;
  7. //同步发送
  8. @GetMapping("/kafka/sync/{msg}")
  9. public void sync(@PathVariable("msg") String msg) throws Exception {
  10. Message message = new Message();
  11. message.setMessage(msg);
  12. ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("test", JSON.toJSONString(message));
  13. //注意,可以设置等待时间,超出后,不再等候结果
  14. SendResult<String, Object> result = future.get(3,TimeUnit.SECONDS);
  15. logger.info("send result:{}",result.getProducerRecord().value());
  16. }
  17. }

消费者:

  1. @Component
  2. public class KafkaConsumer {
  3. private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
  4. //不指定group,默认取yml里配置的
  5. @KafkaListener(topics = {
  6. "test"})
  7. public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {
  8. Optional<?> optional = Optional.ofNullable(consumerRecord.value());
  9. if (optional.isPresent()) {
  10. Object msg = optional.get();
  11. logger.info("message:{}", msg);
  12. }
  13. }
  14. }

那么我们怎么看出来同步发送和异步发送的区别呢?

①首先在服务器上,将kafka暂停服务。
②在swagger发送消息

  • 调同步发送:请求被阻断,一直等待,超时后返回错误
    在这里插入图片描述
  • 而调异步发送的(默认发送接口),请求立刻返回。
    在这里插入图片描述

那么,异步发送的消息怎么确认发送情况呢?
我们使用注册监听
即新建一个类:KafkaListener.java

  1. @Configuration
  2. public class KafkaListener {
  3. private final static Logger logger = LoggerFactory.getLogger(KafkaListener.class);
  4. @Autowired
  5. KafkaTemplate kafkaTemplate;
  6. //配置监听
  7. @PostConstruct
  8. private void listener() {
  9. kafkaTemplate.setProducerListener(new ProducerListener<String, Object>() {
  10. @Override
  11. public void onSuccess(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata) {
  12. logger.info("ok,message={}", producerRecord.value());
  13. }
  14. @Override
  15. public void onError(ProducerRecord<String, Object> producerRecord, Exception exception) {
  16. logger.error("error!message={}", producerRecord.value());
  17. }
  18. });
  19. }
  20. }

查看控制台,等待一段时间后,异步发送失败的消息会被回调给注册过的listener
在这里插入图片描述
如果是正常发送异步消息,则会获得该消息。可以看到,在内部类 KafkaListener$1 中,即注册的Listener的消息。
在这里插入图片描述

2.2 序列化

消费者使用:KafkaConsumer.java

  1. @Component
  2. public class KafkaConsumer {
  3. private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
  4. //不指定group,默认取yml里配置的
  5. @KafkaListener(topics = {
  6. "test"})
  7. public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {
  8. Optional<?> optional = Optional.ofNullable(consumerRecord.value());
  9. if (optional.isPresent()) {
  10. Object msg = optional.get();
  11. logger.info("message:{}", msg);
  12. }
  13. }
  14. }

1)序列化详解

  • 前面用到的是Kafka自带的字符串序列化器(org.apache.kafka.common.serialization.StringSerializer
  • 除此之外还有:ByteArray、ByteBuffer、Bytes、Double、Integer、Long 等
  • 这些序列化器都实现了接口(org.apache.kafka.common.serialization.Serializer
  • 基本上,可以满足绝大多数场景

2)自定义序列化
自己实现,实现对应的接口即可,有以下方法:

  1. public interface Serializer<T> extends Closeable {
  2. default void configure(Map<String, ?> configs, Boolean isKey) {
  3. }
  4. //理论上,只实现这个即可正常运行
  5. byte[] serialize(String var1, T var2);
  6. //默认调上面的方法
  7. default byte[] serialize(String topic, Headers headers, T data) {
  8. return this.serialize(topic, data);
  9. }
  10. default void close() {
  11. }
  12. }

我们来自己实现一个序列化器:MySerializer.java

  1. public class MySerializer implements Serializer {
  2. @Override
  3. public byte[] serialize(String s, Object o) {
  4. String json = JSON.toJSONString(o);
  5. return json.getBytes();
  6. }
  7. }

3)解码
MyDeserializer.java,实现方式与编码器几乎一样.

  1. public class MyDeserializer implements Deserializer {
  2. private final static Logger logger = LoggerFactory.getLogger(MyDeserializer.class);
  3. @Override
  4. public Object deserialize(String s, byte[] bytes) {
  5. try {
  6. String json = new String(bytes,"utf-8");
  7. return JSON.parse(json);
  8. } catch (UnsupportedEncodingException e) {
  9. e.printStackTrace();
  10. }
  11. return null;
  12. }
  13. }

4)在yaml中配置自己的编码器、解码器
在这里插入图片描述

再次收发,消息正常
在这里插入图片描述

2.3 分区策略

分区策略决定了消息根据key投放到哪个分区,也是顺序消费保障的基石。

  • 给定了分区号,直接将数据发送到指定的分区里面去
  • 没有给定分区号,给定数据的key值,通过key取上hashCode进行分区
  • 既没有给定分区号,也没有给定key值,直接轮循进行分区(默认
  • 自定义分区,你想怎么做就怎么做

1)验证默认分区规则
发送者代码参考:PartitionProducer.java

  1. //测试分区发送
  2. @RestController
  3. public class PartitionProducer {
  4. @Resource
  5. private KafkaTemplate<String, Object> kafkaTemplate;
  6. // 指定分区发送
  7. // 不管你key是什么,到同一个分区
  8. @GetMapping("/kafka/partitionSend/{key}")
  9. public void setPartition(@PathVariable("key") String key) {
  10. kafkaTemplate.send("test", 0, key, "key=" + key + ",msg=指定0号分区");
  11. }
  12. // 指定key发送,不指定分区
  13. // 根据key做hash,相同的key到同一个分区
  14. @GetMapping("/kafka/keysend/{key}")
  15. public void setKey(@PathVariable("key") String key) {
  16. kafkaTemplate.send("test", key, "key=" + key + ",msg=不指定分区");
  17. }
  18. }

消费者代码使用:PartitionConsumer.java

  1. @Component
  2. public class PartitionConsumer {
  3. private final Logger logger = LoggerFactory.getLogger(PartitionConsumer.class);
  4. //分区消费
  5. @KafkaListener(topics = {
  6. "test"},topicPattern = "0")
  7. public void onMessage(ConsumerRecord<?, ?> consumerRecord) {
  8. Optional<?> optional = Optional.ofNullable(consumerRecord.value());
  9. if (optional.isPresent()) {
  10. Object msg = optional.get();
  11. logger.info("partition=0,message:[{}]", msg);
  12. }
  13. }
  14. @KafkaListener(topics = {
  15. "test"},topicPattern = "1")
  16. public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {
  17. Optional<?> optional = Optional.ofNullable(consumerRecord.value());
  18. if (optional.isPresent()) {
  19. Object msg = optional.get();
  20. logger.info("partition=1,message:[{}]", msg);
  21. }
  22. }
  23. }

通过swagger访问setKey(也就是只给了key的方法):
在这里插入图片描述

可以看到key相同的被hash到了同一个分区

再访问setPartition来设置分区号0来发送:
在这里插入图片描述

可以看到无论key是什么,都是分区0来消费

2)自定义分区
参考代码:MyPartitioner.java , MyPartitionTemplate.java。
发送使用:MyPartitionProducer.java。

  1. public class MyPartitioner implements Partitioner {
  2. @Override
  3. public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
  4. // 定义自己的分区策略
  5. // 如果key以0开头,发到0号分区
  6. // 其他都扔到1号分区
  7. String keyStr = key+"";
  8. if (keyStr.startsWith("0")){
  9. return 0;
  10. }else {
  11. return 1;
  12. }
  13. }
  14. @Override
  15. public void close() {
  16. }
  17. @Override
  18. public void configure(Map<String, ?> map) {
  19. }
  20. }
  21. @Configuration
  22. public class MyPartitionTemplate {
  23. private final Logger logger = LoggerFactory.getLogger(this.getClass());
  24. @Value("${spring.kafka.bootstrap-servers}")
  25. private String bootstrapServers;
  26. KafkaTemplate kafkaTemplate;
  27. @PostConstruct
  28. public void setKafkaTemplate() {
  29. Map<String, Object> props = new HashMap<>();
  30. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  31. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  32. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  33. //注意分区器在这里!!!
  34. props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);
  35. this.kafkaTemplate = new KafkaTemplate<String, String>(new DefaultKafkaProducerFactory<>(props));
  36. }
  37. public KafkaTemplate getKafkaTemplate(){
  38. return kafkaTemplate;
  39. }
  40. }
  41. //测试自定义分区发送
  42. @RestController
  43. public class MyPartitionProducer {
  44. @Autowired
  45. MyPartitionTemplate template;
  46. // 使用0开头和其他任意字母开头的key发送消息
  47. // 看控制台的输出,在哪个分区里?
  48. @GetMapping("/kafka/myPartitionSend/{key}")
  49. public void setPartition(@PathVariable("key") String key) {
  50. template.getKafkaTemplate().send("test", key,"key="+key+",msg=自定义分区策略");
  51. }
  52. }

使用swagger,发送0开头和非0开头两种key
在这里插入图片描述

3. 消息消费

3.1 消息组别

发送者使用:KafkaProducer.java

  1. @RestController
  2. public class KafkaProducer {
  3. @Resource
  4. private KafkaTemplate<String, Object> kafkaTemplate;
  5. @GetMapping("/kafka/test/{msg}")
  6. public void sendMessage(@PathVariable("msg") String msg) {
  7. Message message = new Message();
  8. message.setMessage(msg);
  9. kafkaTemplate.send("test", JSON.toJSONString(message));
  10. }
  11. }

1)代码参考:GroupConsumer.java,Listener拷贝3份,分别赋予两组group,验证分组消费:

  1. //测试组消费
  2. @Component
  3. public class GroupConsumer {
  4. private final Logger logger = LoggerFactory.getLogger(GroupConsumer.class);
  5. //组1,消费者1
  6. @KafkaListener(topics = {
  7. "test"},groupId = "group1")
  8. public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {
  9. Optional<?> optional = Optional.ofNullable(consumerRecord.value());
  10. if (optional.isPresent()) {
  11. Object msg = optional.get();
  12. logger.info("group:group1-1 , message:{}", msg);
  13. }
  14. }
  15. //组1,消费者2
  16. @KafkaListener(topics = {
  17. "test"},groupId = "group1")
  18. public void onMessage2(ConsumerRecord<?, ?> consumerRecord) {
  19. Optional<?> optional = Optional.ofNullable(consumerRecord.value());
  20. if (optional.isPresent()) {
  21. Object msg = optional.get();
  22. logger.info("group:group1-2 , message:{}", msg);
  23. }
  24. }
  25. //组2,只有一个消费者
  26. @KafkaListener(topics = {
  27. "test"},groupId = "group2")
  28. public void onMessage3(ConsumerRecord<?, ?> consumerRecord) {
  29. Optional<?> optional = Optional.ofNullable(consumerRecord.value());
  30. if (optional.isPresent()) {
  31. Object msg = optional.get();
  32. logger.info("group:group2 , message:{}", msg);
  33. }
  34. }
  35. }

2)启动
在这里插入图片描述
3)通过swagger发送2条消息
在这里插入图片描述

  • 同一group下的两个消费者,在group1均分消息
  • group2下只有一个消费者,得到全部消息

4)消费端闲置
注意分区数与消费者数的搭配,如果 ( 消费者数 > 分区数量 ),将会出现消费者闲置(因为一个分区只能分配给一个消费者),浪费资源!

验证方式:
停掉项目,删掉test主题,重新建一个 ,这次只给它分配一个分区。
重新发送两条消息,试一试
在这里插入图片描述

  • group2可以消费到1、2两条消息
  • group1下有两个消费者,但是只分配给了 1 , 2这个进程被闲置

3.2 位移提交

1)自动提交
前面的案例中,我们设置了以下两个选项,则kafka会按延时设置自动提交

  1. enable-auto-commit: true # 是否自动提交offset
  2. auto-commit-interval: 100 # 提交offset延时(接收到消息后多久提交offset,默认单位为ms)

2)手动提交
有些时候,我们需要手动控制偏移量的提交时机,比如确保消息严格消费后再提交,以防止丢失或重复。
下面我们自己定义配置,覆盖上面的参数
代码参考:MyOffsetConfig.java

  1. @Configuration
  2. public class MyOffsetConfig {
  3. private final Logger logger = LoggerFactory.getLogger(this.getClass());
  4. @Value("${spring.kafka.bootstrap-servers}")
  5. private String bootstrapServers;
  6. @Bean
  7. public KafkaListenerContainerFactory<?> manualKafkaListenerContainerFactory() {
  8. Map<String, Object> configProps = new HashMap<>();
  9. configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  10. configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  11. configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  12. // 注意这里!!!设置手动提交
  13. configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
  14. ConcurrentKafkaListenerContainerFactory<String, String> factory =
  15. new ConcurrentKafkaListenerContainerFactory<>();
  16. factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(configProps));
  17. // ack模式:
  18. // AckMode针对ENABLE_AUTO_COMMIT_CONFIG=false时生效,有以下几种:
  19. //
  20. // RECORD
  21. // 每处理一条commit一次
  22. //
  23. // BATCH(默认)
  24. // 每次poll的时候批量提交一次,频率取决于每次poll的调用频率
  25. //
  26. // TIME
  27. // 每次间隔ackTime的时间去commit(跟auto commit interval有什么区别呢?)
  28. //
  29. // COUNT
  30. // 累积达到ackCount次的ack去commit
  31. //
  32. // COUNT_TIME
  33. // ackTime或ackCount哪个条件先满足,就commit
  34. //
  35. // MANUAL
  36. // listener负责ack,但是背后也是批量上去
  37. //
  38. // MANUAL_IMMEDIATE
  39. // listner负责ack,每调用一次,就立即commit
  40. factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
  41. return factory;
  42. }
  43. }

然后通过在消费端的Consumer来提交偏移量
MyOffsetConsumer:

  1. @Component
  2. public class MyOffsetConsumer {
  3. private final Logger logger = LoggerFactory.getLogger(this.getClass());
  4. @KafkaListener(topics = "test", groupId = "myoffset-group-1", containerFactory = "manualKafkaListenerContainerFactory")
  5. public void manualCommit(@Payload String message,
  6. @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
  7. @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
  8. Consumer consumer,
  9. Acknowledgment ack) {
  10. logger.info("手动提交偏移量 , partition={}, msg={}", partition, message);
  11. // 同步提交
  12. consumer.commitSync();
  13. //异步提交
  14. //consumer.commitAsync();
  15. // ack提交也可以,会按设置的ack策略走(参考MyOffsetConfig.java里的ack模式)
  16. // ack.acknowledge();
  17. }
  18. @KafkaListener(topics = "test", groupId = "myoffset-group-2", containerFactory = "manualKafkaListenerContainerFactory")
  19. public void noCommit(@Payload String message,
  20. @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
  21. @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
  22. Consumer consumer,
  23. Acknowledgment ack) {
  24. logger.info("忘记提交偏移量, partition={}, msg={}", partition, message);
  25. // 不做commit!
  26. }
  27. /**
  28. * 现实状况:
  29. * commitSync和commitAsync组合使用
  30. * <p>
  31. * 手工提交异步 consumer.commitAsync();
  32. * 手工同步提交 consumer.commitSync()
  33. * <p>
  34. * commitSync()方法提交最后一个偏移量。在成功提交或碰到无怯恢复的错误之前,
  35. * commitSync()会一直重试,但是commitAsync()不会。
  36. * <p>
  37. * 一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大问题
  38. * 因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。
  39. * 但如果这是发生在关闭消费者或再均衡前的最后一次提交,就要确保能够提交成功。否则就会造成重复消费
  40. * 因此,在消费者关闭前一般会组合使用commitAsync()和commitSync()。
  41. */
  42. // @KafkaListener(topics = "test", groupId = "myoffset-group-3",containerFactory = "manualKafkaListenerContainerFactory")
  43. public void manualOffset(@Payload String message,
  44. @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
  45. @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
  46. Consumer consumer,
  47. Acknowledgment ack) {
  48. try {
  49. logger.info("同步异步搭配 , partition={}, msg={}", partition, message);
  50. //先异步提交
  51. consumer.commitAsync();
  52. //继续做别的事
  53. } catch (Exception e) {
  54. System.out.println("commit failed");
  55. } finally {
  56. try {
  57. consumer.commitSync();
  58. } finally {
  59. consumer.close();
  60. }
  61. }
  62. }
  63. /**
  64. * 甚至可以手动提交,指定任意位置的偏移量
  65. * 不推荐日常使用!!!
  66. */
  67. // @KafkaListener(topics = "test", groupId = "myoffset-group-4",containerFactory = "manualKafkaListenerContainerFactory")
  68. public void offset(ConsumerRecord record, Consumer consumer) {
  69. logger.info("手动指定任意偏移量, partition={}, msg={}", record.partition(), record);
  70. Map<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<>();
  71. currentOffset.put(new TopicPartition(record.topic(), record.partition()),
  72. new OffsetAndMetadata(record.offset() + 1));
  73. consumer.commitSync(currentOffset);
  74. }
  75. }

3)重复消费问题
如果手动提交模式被打开,一定不要忘记提交偏移量。否则会造成重复消费!

用km将test主题删除,新建一个test空主题。方便观察消息偏移 注释掉其他Consumer的Component注解,只保留当前MyOffsetConsumer.java 启动项目,使用swagger的KafkaProducer发送连续几条消息 留心控制台,都能消费,没问题:
在这里插入图片描述
但是!重启项目:
在这里插入图片描述
无论重启多少次,不提交偏移量的消费组,会重复消费一遍!!!

再通过命令行查询偏移量
在这里插入图片描述
4)经验与总结
commitSync()方法,即同步提交,会提交最后一个偏移量。在成功提交或碰到无怯恢复的错误之前,commitSync()会一直重试,但是commitAsync()不会。

这就造成一个陷阱:
如果异步提交,针对偶尔出现的提交失败,不进行重试不会有太大问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。只要成功一次,偏移量就会提交上去。

但是!如果这是发生在关闭消费者时的最后一次提交,就要确保能够提交成功,如果还没提交完就停掉了进程。就会造成重复消费!

因此,在消费者关闭前一般会组合使用commitAsync()和commitSync()。
详细代码参考:MyOffsetConsumer.manualOffset()

发表评论

表情:
评论列表 (有 0 条评论,15人围观)

还没有评论,来说两句吧...

相关阅读

    相关 SpringBoot整合kafka

    > 经过前三篇文章 安装jdk 安装zookeeper 以及安装kafka 全部已经竣工了,不知道小伙伴们成功搭建kafka了不。 > 憋了三天的大招,今天放出来吧。今天大家