springboot 整合 kafka

「爱情、让人受尽委屈。」 2022-05-09 14:12 416阅读 0赞

maven依赖

springboot 依赖省略

  1. <dependency>
  2. <groupId>org.springframework.kafka</groupId>
  3. <artifactId>spring-kafka</artifactId>
  4. <version>2.1.9.RELEASE</version>
  5. </dependency>

代码部分

配置部分

  1. spring:
  2. kafka:
  3. bootstrap-servers: 127.0.0.1:9092
  4. producer:
  5. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  6. value-serializer: org.apache.kafka.common.serialization.StringSerializer
  7. consumer:
  8. # 应该可以消费者类指定
  9. group-id: 9
  10. enable-auto-commit: true
  11. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  12. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

生产者

  1. import org.springframework.beans.factory.annotation.Autowired;
  2. import org.springframework.kafka.core.KafkaTemplate;
  3. import org.springframework.scheduling.annotation.EnableScheduling;
  4. import org.springframework.scheduling.annotation.Scheduled;
  5. import org.springframework.stereotype.Component;
  6. import java.text.SimpleDateFormat;
  7. import java.util.Date;
  8. /** * @author lf * @描述 生产者 * @date 2018-09-07 */
  9. @EnableScheduling
  10. @Component
  11. public class Producer {
  12. @Autowired
  13. private KafkaTemplate<String, String> kafkaTemplate;
  14. // 3秒发送一个数据
  15. @Scheduled(fixedDelay=3000)
  16. public void send() {
  17. Date date = new Date();
  18. SimpleDateFormat simpleDateFormat=new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
  19. String format = simpleDateFormat.format(date);
  20. // topic 名字 test
  21. kafkaTemplate.send("test",format+" boot");
  22. }
  23. }

消费者

消费者只是简单的试用了几个配置

  1. import org.apache.kafka.clients.consumer.ConsumerRecord;
  2. import org.springframework.kafka.annotation.KafkaListener;
  3. import org.springframework.kafka.annotation.PartitionOffset;
  4. import org.springframework.kafka.annotation.TopicPartition;
  5. import org.springframework.kafka.support.Acknowledgment;
  6. import org.springframework.stereotype.Component;
  7. /** * @author lf * @描述 消费者 * @date 2018-09-07 */
  8. @Component
  9. public class Consumer {
  10. @KafkaListener(id = "Consumer1",//管理此端点的容器的唯一标识符
  11. // groupId = "",仅为此侦听器使用此值覆盖使用者工厂的属性。
  12. // 消费topic属性配置
  13. //配置topic和分区:监听两个topic,分别为test1、test2,
  14. // test1只接收分区0,1的消息,
  15. //test2接收分区0和分区2的消息,但是分区1的消费者初始位置(offset)为8
  16. topicPartitions =
  17. {
  18. @TopicPartition(topic = "test1", partitions = { "0", "1" }),
  19. @TopicPartition(topic = "test2", partitions = "0",
  20. partitionOffsets = @PartitionOffset(partition = "2", initialOffset = "7"))
  21. })
  22. public void listen (ConsumerRecord<?, ?> record, Acknowledgment ack) throws Exception {
  23. System.out.printf("topic = %s, offset = %d, value = %s partition=%s\n", record.topic(), record.offset(), record.value(),record.partition());
  24. // 开启即手动提交
  25. //ack.acknowledge();
  26. }
  27. }

属性配置参考官网https://docs.spring.io/spring-kafka/api/org/springframework/kafka/annotation/KafkaListener.html

发表评论

表情:
评论列表 (有 0 条评论,416人围观)

还没有评论,来说两句吧...

相关阅读

    相关 SpringBoot整合kafka

    > 经过前三篇文章 安装jdk 安装zookeeper 以及安装kafka 全部已经竣工了,不知道小伙伴们成功搭建kafka了不。 > 憋了三天的大招,今天放出来吧。今天大家