spring boot整合kafka记录

心已赠人 2021-09-24 11:12 368阅读 0赞

pom文件

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-web</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.kafka</groupId>
  8. <artifactId>spring-kafka</artifactId>
  9. </dependency>

yml配置

  1. spring
  2. kafka:
  3. # bootstrap-servers: 192.168.10.45:9092
  4. bootstrap-servers: 192.168.198.128:9092
  5. producer:
  6. # 重试次数,默认Integer.MAX_VALUE
  7. retries: 1
  8. # 同一批次内存大小(默认16K)
  9. batch-size: 16384
  10. # 生产者内存缓存区大小(32M)
  11. buffer-memory: 33554432
  12. # key和value的序列化(默认,可以不设置)
  13. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  14. value-serializer: org.apache.kafka.common.serialization.StringSerializer
  15. # ack应答机制,默认1,即只需要确认leader收到消息
  16. acks: 1
  17. # springboot1.5.16自动装配中不支持properties下的其他配置,不知道为啥。2.x版本可以
  18. #properties:
  19. # 使用自定义的分区选择器
  20. #{partitioner.class: com.msy.kafka.MyPartition, acks: all}
  21. consumer:
  22. group-id: test
  23. enable-auto-commit: false
  24. # earliest:从头开始消费 latest:从最新的开始消费 默认latest
  25. auto-offset-reset: latest
  26. # key和value反序列化(默认,可以不设置)
  27. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  28. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  29. #最大消费数
  30. max-poll-records: 100
  31. listener:
  32. # 消费者并发能力 这个并发量根据分区数决定,必须小于等于分区数,否则会有线程一直处于空闲状态 创建一个分区数为8的Topic。
  33. concurrency: 6
  34. # 设置手动提交的时候,需要设置ackMode
  35. ack-mode: MANUAL
  36. topic: test5

生产者(需要显式调用 sendMessage 方法才会发送数据)

  1. package com.example.sms.middleware.sms_middleware.kafka;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.beans.factory.annotation.Value;
  4. import org.springframework.boot.ApplicationArguments;
  5. import org.springframework.boot.ApplicationRunner;
  6. import org.springframework.kafka.core.KafkaTemplate;
  7. import org.springframework.stereotype.Component;
  8. /**
  9. * @Author kang
  10. * @Date 2020/6/19$ 10:03$
  11. **/
  12. @Component
  13. public class Producer {
  14. @Autowired
  15. private KafkaTemplate<String, String> kafkaTemplate;
  16. @Value("${spring.kafka.topic}")
  17. private String topic;
  18. public void sendMessage(){
  19. kafkaTemplate.send(topic,"message");
  20. }
  21. // @Override
  22. // public void run(ApplicationArguments args) throws Exception {
  23. // System.out.println("11111");
  24. new Producer().sendMessage();
  25. // }
  26. }

消费者

  1. package com.example.sms.middleware.sms_middleware.kafka;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.kafka.clients.consumer.ConsumerRecord;
  4. import org.springframework.kafka.annotation.KafkaListener;
  5. import org.springframework.kafka.support.Acknowledgment;
  6. import org.springframework.stereotype.Component;
  7. /**
  8. * @Author kang
  9. * @Date 2020/6/19$ 10:04$
  10. **/
  11. @Component
  12. @Slf4j
  13. public class Consumer {
  14. @KafkaListener(topics = "test5") // 支持监听多个topic的消息
  15. public void consumerMessage(ConsumerRecord<String, String> consumerRecord, Acknowledgment ack) {
  16. try {
  17. System.out.println("1212121");
  18. String value = consumerRecord.value();
  19. System.out.println("1212121"+value);
  20. log.info("监听到的消息为:{}", value);
  21. // 业务处理......
  22. } catch (Exception e) {
  23. e.printStackTrace();
  24. } finally {
  25. ack.acknowledge();
  26. }
  27. }
  28. }

可以直接在application.properties中添加spring.kafka.listener.concurrency=3,然后使用@KafkaListener并发消费。

对于只有一个分区的 topic,不需要按分区消费,因为没有意义。下面的例子是针对有 2 个分区的情况(我的代码中有 4 个 listenPartitionX 方法,对应设置了 4 个分区的 topic)。

  1. @KafkaListener(id = "id0", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "0" }) })
  2. public void listenPartition0(List<ConsumerRecord<?, ?>> records) {
  3. log.info("Id0 Listener, Thread ID: " + Thread.currentThread().getId());
  4. log.info("Id0 records size " + records.size());
  5. for (ConsumerRecord<?, ?> record : records) {
  6. String value = record.value();
  7. }
  8. }
  9. }
  10. @KafkaListener(id = "id1", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "1" }) })
  11. public void listenPartition1(List<ConsumerRecord<?, ?>> records) {
  12. log.info("Id1 Listener, Thread ID: " + Thread.currentThread().getId());
  13. log.info("Id1 records size " + records.size());
  14. for (ConsumerRecord<?, ?> record : records) {
  15. String value = record.value();
  16. }
  17. }

常见错误:接口访问不到,报 404。

解决:

  1. @SpringBootApplication 默认的扫描位置就是启动类(Application)所在的包及其子包。如果 Controller 不在这些包下就会出现 404,解决办法是把启动类移动到上层包,或者通过 @ComponentScan(或 scanBasePackages 属性)显式指定要扫描的包。

发表评论

表情:
评论列表 (有 0 条评论,368人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Spring Boot整合Kafka

    Kafka是一个分布式的、可分区的、可复制的消息系统,下面是Kafka的几个基本术语: 1. Kafka将消息以topic为单位进行归纳; 2. 将向Kafka topi