【Kafka】基本使用:SpringBoot整合Kafka-clinets

迷南。 2022-12-21 04:48 210阅读 0赞

首先说明一点,SpringBoot 的版本和 Kafka-clinets 的版本有一个对照表格,如果没有按照正确的版本来引入,那么会存在版本问题导致ClassNotFound的问题,具体请参考 https://spring.io/projects/spring-kafka

在这里插入图片描述

依赖:

  1. <dependency>
  2. <groupId>org.springframework.kafka</groupId>
  3. <artifactId>spring-kafka</artifactId>
  4. <version>2.2.0.RELEASE</version>
  5. </dependency>

1.Producer

  1. @Component
  2. public class MyKafKaProducer {
  3. @Resource
  4. // Integer:消息key的类型,每条消息都需一个 key,它决定了该消息会去哪个 Partition
  5. // String:消息类型
  6. private KafkaTemplate<Integer, String> kafkaTemplate;
  7. public void send() {
  8. // 通过kafka进行消息发送
  9. // ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
  10. kafkaTemplate.send("test", 1, "msgData");
  11. }
  12. }

2.Consumer

  1. @Component
  2. public class MyKafkaConsumer {
  3. // 订阅相应主题。接收到的消息会被注入ConsumerRecord
  4. @KafkaListener(topics = {
  5. "test"})
  6. public void listener(ConsumerRecord record) {
  7. // Optional,jdk8新特性,用于null处理
  8. Optional msg = Optional.ofNullable(record.value());
  9. // isPresent判斷值是否存在
  10. if (msg.isPresent()) {
  11. // 打印消息
  12. System.out.println(msg.get()); // get,从optional中取出对应值
  13. }
  14. }
  15. }

3.application.properties

  1. # kafka服务器
  2. spring.kafka.bootstrapservers=43.107.136.126:9092
  3. # 编码(Producer)
  4. spring.kafka.producer.keyserializer=org.apache.kafka.common.serialization.IntegerSerializer
  5. spring.kafka.producer.valueserializer=org.apache.kafka.common.serialization.StringSerializer
  6. # 解码(Consumer)
  7. spring.kafka.consumer.keydeserializer=org.apache.kafka.common.serialization.IntegerDeserializer
  8. spring.kafka.consumer.valuedeserializer=org.apache.kafka.common.serialization.StringDeserializer
  9. # 对Consumer的特殊配置
  10. spring.kafka.consumer.group-id=test-consumer-group
  11. spring.kafka.consumer.auto-offset-reset=earliest
  12. spring.kafka.consumer.enable-auto-commit=true

注:如果我们关闭自动提交,即 enable-auto-commit 为 false 时,我们还可以配置 spring.kafka.listener.concurrency 去配置监听消息的线程数,即每个 @KafkaListener 注解标识的 topic 会有几个线程从队列中取出消息。具体参考请这篇文章…

4.启动类

  1. @SpringBootApplication
  2. public class TestKafkaApplication {
  3. public static void main(String[] args) throws InterruptedException {
  4. ConfigurableApplicationContext context =
  5. SpringApplication.run(TestKafkaApplication.class, args);
  6. // 获取producer的Bean实例,send消息
  7. MyKafKaProducer kafKaProducer = context.getBean(MyKafKaProducer.class);
  8. for (int i = 0; i < 10; i++) {
  9. // Producer发送消息
  10. // 发送之后Consumer自己就会poll,然后打印
  11. kafKaProducer.send();
  12. TimeUnit.SECONDS.sleep(2);
  13. }
  14. }
  15. }

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MzkzNTkyNw_size_16_color_FFFFFF_t_70_pic_center 1

发表评论

表情:
评论列表 (有 0 条评论,210人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Springboot整合kafka基本使用

    同样的,需要我们搭建一个maven来一起看下完整的pom.xml配置也很简单然后新建一个启动类,看下控制台是否成功链接了Kafka,在启动之前别忘了开启Kafka集群。

    相关 SpringBoot整合kafka

    > 经过前三篇文章 安装jdk 安装zookeeper 以及安装kafka 全部已经竣工了,不知道小伙伴们成功搭建kafka了不。 > 憋了三天的大招,今天放出来吧。今天大家