Springboot整合kafka基本使用

蔚落 2024-04-29 19:04 153阅读 0赞

一. 项目搭建

同样的,我们需要创建一个基于Maven的项目,并且整合非常简单。我们只需要使用以下的.

  1. <!--kafka依赖-->
  2. <dependency>
  3. <groupId>org.springframework.kafka</groupId>
  4. <artifactId>spring-kafka</artifactId>
  5. </dependency>

模块中的完整的pom.xml文件:

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <modelVersion>4.0.0</modelVersion>
  6. <groupId>org.example</groupId>
  7. <artifactId>springboot-kafka-all</artifactId>
  8. <version>1.0-SNAPSHOT</version>
  9. <properties>
  10. <java.version>1.8</java.version>
  11. </properties>
  12. <parent>
  13. <groupId>org.springframework.boot</groupId>
  14. <artifactId>spring-boot-starter-parent</artifactId>
  15. <version>2.1.3.RELEASE</version>
  16. </parent>
  17. <dependencies>
  18. <!--web-->
  19. <dependency>
  20. <groupId>org.springframework.boot</groupId>
  21. <artifactId>spring-boot-starter-web</artifactId>
  22. </dependency>
  23. <!--test-->
  24. <dependency>
  25. <groupId>org.springframework.boot</groupId>
  26. <artifactId>spring-boot-starter-test</artifactId>
  27. </dependency>
  28. <!-- kafka的依赖-->
  29. <dependency>
  30. <groupId>org.springframework.kafka</groupId>
  31. <artifactId>spring-kafka</artifactId>
  32. </dependency>
  33. <!--Hutool依赖-->
  34. <dependency>
  35. <groupId>cn.hutool</groupId>
  36. <artifactId>hutool-all</artifactId>
  37. <version>5.8.4</version>
  38. </dependency>
  39. <!--fast-json格式-->
  40. <dependency>
  41. <groupId>com.alibaba</groupId>
  42. <artifactId>fastjson</artifactId>
  43. <version>1.2.58</version>
  44. </dependency>
  45. <dependency>
  46. <groupId> org.slf4j </groupId>
  47. <artifactId> slf4j-api </artifactId>
  48. <version> 1.6.4 </version>
  49. </dependency>
  50. <dependency>
  51. <groupId>org.slf4j</groupId>
  52. <artifactId>slf4j-simple</artifactId>
  53. <version>1.7.25</version>
  54. <scope>compile</scope>
  55. </dependency>
  56. <!-- set get-->
  57. <dependency>
  58. <groupId>org.projectlombok</groupId>
  59. <artifactId>lombok</artifactId>
  60. </dependency>
  61. </dependencies>
  62. <build>
  63. <plugins>
  64. <plugin>
  65. <groupId>org.springframework.boot</groupId>
  66. <artifactId>spring-boot-maven-plugin</artifactId>
  67. <version>2.1.3.RELEASE</version>
  68. </plugin>
  69. </plugins>
  70. </build>
  71. </project>

配置文件中的也很简单 application.yml。指定项目启动的端口号以及kafka的端口路径

  1. server:
  2. port: 8081
  3. spring:
  4. kafka:
  5. producer:
  6. bootstrap-servers: 127.0.0.1:9092

然后新建一个启动类,在启动之前别忘了开启Kafka集群,看下控制台是否成功链接了Kafka,

二. 基本使用如下

先从一个简单的例子,来快速体验一下Kafka,新建HelloController.java接收信息和发送信息

  1. @Slf4j
  2. @RestController
  3. public class HelloController {
  4. private static final String topic = "test";
  5. @Autowired
  6. private KafkaTemplate<Object, Object> kafkaTemplate;
  7. // 接收消息
  8. @KafkaListener(id = "helloGroup", topics = topic)
  9. public void listen(String msg) {
  10. log.info("hello receive value: {}" , msg);
  11. // hello receive value: hello kafka
  12. }
  13. @GetMapping("/hello")
  14. public String hello() {
  15. // 发送消息
  16. kafkaTemplate.send(topic, "hello kafka");
  17. return "hello";
  18. }
  19. }

我们通过KafkaTemplate进行消息的发送, 通过@KafkaListener进行消息的消费,我们可以指定消费者ID以及监听的topic,请求localhost:8081/hello观察控制台的变化。请求后,发现消息发送和接收的非常快,我们也可以观察UI后台的消息详情,同步对比

topic创建

之前我们的topic是在UI后台创建的,那么在SpringBoot中如何创建呢? 下面我们试着发送一个不存在的topic

  1. // 当topic不存在时 会默认创建一个topic
  2. // num.partitions = 1 #默认Topic分区数
  3. // num.replica.fetchers = 1 #默认副本数
  4. @GetMapping("/hello1")
  5. public String hello1() {
  6. // 发送消息
  7. kafkaTemplate.send("hello1", "hello1");
  8. return "hello1";
  9. }
  10. // 接收消息
  11. @KafkaListener(id = "hello1Group", topics = "hello1")
  12. public void listen1(String msg) {
  13. log.info("hello1 receive value: {}" , msg);
  14. // hello1 receive value: hello1
  15. }

在请求之后,当我们观察控制台和管理后台时,我们发现没有出现任何错误,并且系统还自动为我们创建了一个话题。在自动创建话题过程中,默认使用了一组参数。

  1. num.partitions = 1 #默认Topic分区数
  2. num.replica.fetchers = 1 #默认副本数

如果我想手动创建的话,我们可以通过NewTopic来手动创建如下:

  1. @Configuration
  2. public class KafkaConfig {
  3. @Bean
  4. public KafkaAdmin admin(KafkaProperties properties){
  5. KafkaAdmin admin = new KafkaAdmin(properties.buildAdminProperties());
  6. // 默认False,在Broker不可用时,如果你觉得Broker不可用影响正常业务需要显示的将这个值设置为True
  7. admin.setFatalIfBrokerNotAvailable(true);
  8. // setAutoCreate(false) : 默认值为True,也就是Kafka实例化后会自动创建已经实例化的NewTopic对象
  9. // initialize():当setAutoCreate为false时,需要我们程序显示的调用admin的initialize()方法来初始化NewTopic对象
  10. return admin;
  11. }
  12. /**
  13. * 创建指定参数的 topic
  14. * @return
  15. */
  16. @Bean
  17. public NewTopic topic() {
  18. return new NewTopic("hello2", 0, (short) 0);
  19. }
  20. }

创建之后我们需要更新他的话直接可以按照下面的方式

  1. /**
  2. * 更新 topic
  3. * @return
  4. */
  5. @Bean
  6. public NewTopic topicUpdate() {
  7. return new NewTopic("hello2", 1, (short) 1);
  8. }

注意这里的参数只能+不能-

这种方式太简单了,如果我想在代码逻辑中来创建呢?我们可以通过AdminClient来手动创建

  1. /**
  2. * AdminClient 创建
  3. */
  4. @Autowired
  5. private KafkaProperties properties;
  6. /**
  7. * 创建主题
  8. * @param topicName 主题名称
  9. * @return 创建的主题名称
  10. */
  11. @GetMapping("/create/{topicName}")
  12. public String createTopic(@PathVariable String topicName) {
  13. AdminClient client = AdminClient.create(properties.buildAdminProperties());
  14. // 判断AdminClient是否创建成功
  15. if (client != null) {
  16. try {
  17. Collection<NewTopic> newTopics = new ArrayList<>(1);
  18. newTopics.add(new NewTopic(topicName, 1, (short) 1));
  19. client.createTopics(newTopics);
  20. } catch (Throwable e) {
  21. e.printStackTrace();
  22. } finally {
  23. client.close();
  24. }
  25. }
  26. return topicName;
  27. }

观察下管理后台,发现topic都创建成功了

三. 获取消息发送的结果

有时候我们发送消息不知道是不是发成功了,需要有一个结果通知。有两种方式,一种是同步一种是异步

1. 同步获取结果

  1. /**
  2. * 获取通知结果
  3. * @return
  4. */
  5. @GetMapping("/hello2")
  6. public String hello2() {
  7. // 同步获取结果
  8. ListenableFuture<SendResult<Object,Object>> future = kafkaTemplate.send("hello2","hello2");
  9. try {
  10. SendResult<Object,Object> result = future.get();
  11. log.info("success >>> {}", result.getRecordMetadata().topic()); // success >>> hello2
  12. }catch (Throwable e){
  13. e.printStackTrace();
  14. }
  15. return "hello2";
  16. }

2. 异步获取

  1. /**
  2. * 获取通知结果
  3. * @return
  4. */
  5. @GetMapping("/hello2")
  6. public String hello2() {
  7. // 发送消息 - 异步获取通知结果
  8. kafkaTemplate.send("hello2", "async hello2").addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
  9. /**
  10. * 当异步消息发送失败时,会执行该方法
  11. * @param throwable 异常信息
  12. */
  13. @Override
  14. public void onFailure(Throwable throwable) {
  15. log.error("fail >>>>{}", throwable.getMessage());
  16. }
  17. /**
  18. * 当异步消息发送成功时,会执行该方法
  19. * @param objectObjectSendResult 发送结果
  20. */
  21. @Override
  22. public void onSuccess(SendResult<Object, Object> objectObjectSendResult) {
  23. log.info("async success >>> {}", objectObjectSendResult.getRecordMetadata().topic()); // async success >>> hello2
  24. }
  25. });
  26. return "hello2";
  27. }

四. Kafka事务

同样的,消息也会存在事务,如果第一条消息发送成功,再发第二条消息的时候出现异常,那么就会抛出异常并回滚第一条消息,下面通过一个简单的例子体会一下

  1. /**
  2. * 这是一个使用 Spring Cloud 的 @KafkaListener 注解接收消息的示例,
  3. * 定义了一个名为 hello3 的方法,该方法在执行过程中,会发送两条消息到指定的主题中,
  4. * 然后接收该主题的消息,并打印出来。
  5. */
  6. @GetMapping("/hello3")
  7. public String hello3() {
  8. kafkaTemplate.executeInTransaction(t -> {
  9. t.send("hello3", "msg1");
  10. if (true) {
  11. throw new RuntimeException("failed");
  12. }
  13. t.send("hello3", "msg2");
  14. return true;
  15. });
  16. return "hello3";
  17. }
  18. /**
  19. * 这是一个使用 Spring Cloud 的 @KafkaListener 注解接收消息的示例,
  20. * 定义了一个名为 listen3 的方法,该方法在执行过程中,会接收指定主题的消息,
  21. * 并打印出来。
  22. */
  23. @KafkaListener(id = "hello3Group", topics = "hello3")
  24. public void listen3(String msg) {
  25. log.info("hello3 receive value: {}", msg);
  26. }

默认情况下,Spring-kafka自动生成的KafkaTemplate实例,是不具有事务消息发送能力的。我们需要添加transaction-id-prefix来激活它

  1. spring:
  2. kafka:
  3. producer:
  4. bootstrap-servers: 127.0.0.1:9092
  5. transaction-id-prefix: kafka_.

启动之后,观察控制台的变化 ,除此之外,还可以使用注解的方式@Transactional来开启事务

  1. /**
  2. * 这是一个使用 Spring Cloud 的 @Transactional 注解进行事务控制的示例,
  3. * 定义了一个名为 hello4 的方法,该方法在执行过程中,会发送两条消息到指定的主题中,
  4. * 然后如果抛出了一个 RuntimeException,则会进行事务回滚,否则继续执行。
  5. */
  6. @Transactional(rollbackFor = RuntimeException.class)
  7. @GetMapping("/hello4")
  8. public String hello4() {
  9. kafkaTemplate.send("hello3", "msg1");
  10. if (true) {
  11. throw new RuntimeException("failed");
  12. }
  13. kafkaTemplate.send("hello3", "msg2");
  14. return "hello4";
  15. }

以上就是简单的使用kafka的集成,可以学习到对他的基本认识。

面试题:

1. Kafka是什么?它的主要特性是什么?

Kafka是一个分布式流处理平台,可以处理和存储大规模的流式数据。它具有以下主要特性:
- 可靠性:Kafka以分布式和冗余的方式存储数据,可以提供高可靠性的数据传输和存储。
- 可扩展性:Kafka可以水平扩展,适应各种数据规模和负载。
- 高性能:Kafka通过批量处理和零拷贝技术来实现高性能的数据传输。
- 消息顺序保证:Kafka会保证相同分区内的消息按照发送顺序进行处理。
- 多样的使用场景:Kafka可以用于日志收集、流式处理、消息队列、事件源等多个应用场景。

2. Kafka的消息传输机制是怎样的?

Kafka采用发布-订阅模式的消息传输机制。生产者将消息发送到一个或多个主题(topic)中,消费者通过订阅这些主题来接收消息。主题被分成多个分区(partition),每个分区中的消息有一个唯一的偏移量(offset),用于标识消息在分区中的位置。消费者可以以线性的方式读取和处理消息,每个消费者只能消费特定分区中的消息。

3. Kafka的数据持久化机制是怎样的?

Kafka使用日志(log)的方式来持久化数据。每个分区都有一个对应的日志文件,通过追加的方式将消息写入到日志文件中。消息在写入时会被持久化到磁盘,在内存中存在一定时间后,可以被清理。消费者读取消息时,可以根据偏移量来定位消息的位置。

4. Kafka如何保证数据的可靠性?

Kafka通过复制机制来保证数据的可靠性。每个分区可以有多个副本(replica),每个副本都保存了完整的分区数据。当一个副本宕机时,其他副本可以接替其工作,保证数据的连续性。同时,Kafka还可以配置多个副本之间的同步机制,以保证数据在不同副本之间的一致性。

5. Kafka的数据消费是如何实现的?

消费者通过订阅主题来接收消息。Kafka使用拉取模式(pull)来实现消费者的消息读取。消费者通过请求服务器获取分区中的消息,然后将这些消息进行处理。消费者可以手动控制消息的偏移量,以实现精确的消息读取和处理。

6. Kafka的容错机制是怎样的?

Kafka使用分布式的复制机制来实现容错。每个主题的分区可以有多个副本,当一个副本发生故障时,其他副本可以继续提供服务。Kafka还使用持久化的方式来存储消息,即使服务器发生故障,数据也不会丢失。此外,Kafka还具有监控和警报机制,可以及时发现和处理故障情况。

7. Kafka的并发控制是如何实现的?

Kafka使用分区来实现消息的并发处理。每个主题被分成多个分区,每个分区只能由一个消费者进行处理。这样可以保证相同分区内的消息按照发送顺序进行处理,同时能够实现消息的并发处理。

8. Kafka的优缺点是什么?

Kafka的优点包括高吞吐量、高可靠性、高扩展性、低延迟、灵活的数据模型和丰富的生态系统。它可以用于处理和存储大规模的流式数据,并支持多种使用场景。

Kafka的缺点包括相对复杂的部署和配置、对于小规模数据的应用可能过于庞大、消息一旦被消费则无法再次获取等。

9. Kafka的主要应用场景有哪些?

Kafka可以应用于日志收集、流式处理、消息队列、事件源等多个场景。在日志收集中,Kafka可以将日志数据进行实时处理和存储。在流式处理中,Kafka可以作为数据源和数据目的地,实现实时的数据处理和分析。在消息队列中,Kafka可以用于异步的消息传递和处理。在事件源中,Kafka可以作为事件的发布和订阅平台,实现实时的事件处理和分发。

10. Kafka与其他消息队列系统的比较有哪些?

Kafka与其他消息队列系统相比,主要有以下特点:
- 高吞吐量:Kafka通过批量处理和零拷贝技术实现了高吞吐量的数据传输和存储。
- 高可靠性:Kafka采用分布式复制机制和持久化存储,可以保证数据的高可靠性。
- 分布式和可扩展:Kafka可以水平扩展,适应各种数据规模和负载。
- 支持多种使用场景:Kafka可以应用于日志收集、流式处理、消息队列、事件源等多个场景。
- 较复杂的部署和配置:相对于其他消息队列系统,Kafka的部署和配置可能会较为复杂。

发表评论

表情:
评论列表 (有 0 条评论,153人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Springboot整合kafka基本使用

    同样的,需要我们搭建一个maven来一起看下完整的pom.xml配置也很简单然后新建一个启动类,看下控制台是否成功链接了Kafka,在启动之前别忘了开启Kafka集群。

    相关 SpringBoot整合kafka

    > 经过前三篇文章 安装jdk 安装zookeeper 以及安装kafka 全部已经竣工了,不知道小伙伴们成功搭建kafka了不。 > 憋了三天的大招,今天放出来吧。今天大家