kafka集群搭建及结合springboot使用

Myth丶恋晨 2023-07-15 13:19 16阅读 0赞

1.场景描述

因kafka以前用的不多,只往topic中写入和读取过数据,这次刚好又要用到,记录下kafka集群搭建及结合springboot使用。

2. 解决方案

2.1 简单介绍

(一)关于kafka,网上的介绍有很多,简单说就是消息中间件,大数据项目中经常使用,我们项目是用于接收日志流水数据。

(二)关于消息中间件,主要有四个:

(1)ActiveMQ:历史悠久,以前项目中使用多,现在更新慢,性能相对不高。
(2)RabbitMQ:可靠性高、安全,模式比较多,java使用比较多,每秒十万级别
(3)Kafka:分布式、高性能、跨语言,性能超高,每秒百万级别,模式简单。
(4)RocketMQ:阿里开源的消息中间件,纯Java实现,有商业版,收费,导致推广一般。

(三)kafka与其他三个相比,优势在于:

(1)性能高,每秒百万级别;

(2)分布式,高可用,水平扩展。

(四) kafka官网图

在这里插入图片描述

有中文官网,可以详细看看。

地址:http://kafka.apachecn.org/intro.html

在这里插入图片描述

2.2 软件下载

2.2.1 kakfa下载

地址:http://kafka.apache.org/downloads,下载最新的2.4.1。

在这里插入图片描述

2.2.2 zookeeper下载

(1)因为kafka要依赖于zookeeper做调度,kafka中实际自带的有kafka,但是一般建议使用独立的zookeeper,方便后续升级及公用。

(2)下载地址:

http://zookeeper.apache.org/,最新的是3.6.0,不过发布不久,建议先跟kafka内置zookeeper保持一致,使用3.5.7版本

在这里插入图片描述

2.2.3 下载说明

文件都不大,zk是9m多,kafka是50多兆

在这里插入图片描述

2.3 kafka单机部署及集群部署

软件老王本地弄了三台虚拟机,ip分别为:

  1. 192.168.85.158
  2. 192.168.85.168
  3. 192.168.85.178
2.3.1 单机部署

(1)上传jar包,就不再新建用户了,直接在root账户下执行,将kafka和zookeeper的tar包上传到/root/tools目录下。

(2)解压

  1. [root@ruanjianlaowang158 tools]# tar -zxvf kafka_2.12-2.4.1.tgz
  2. [root@ruanjianlaowang158 tools]# tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz

(3)配置zookeeper及启动

  1. [root@ruanjianlaowang158 apache-zookeeper-3.5.7-bin]# cd /root/tools/apache-zookeeper-3.5.7-bin
  2. #软件老王,首先创建个空文件夹,在接下来的配置文件中配置
  3. [root@ruanjianlaowang158 apache-zookeeper-3.5.7-bin]# mkdir data
  4. [root@ruanjianlaowang158 conf]# cd /root/tools/apache-zookeeper-3.5.7-bin/conf
  5. [root@ruanjianlaowang158 conf]# cp zoo_sample.cfg zoo.cfg
  6. [root@ruanjianlaowang158 conf]# vi zoo.cfg
  7. #单机只改一个值,保存退出。
  8. #dataDir=/tmp/zookeeper
  9. dataDir=/root/tools/apache-zookeeper-3.5.7-bin/data
  10. #启动zookeeper
  11. [root@ruanjianlaowang158 bin]# cd /root/tools/apache-zookeeper-3.5.7-bin/bin
  12. [root@ruanjianlaowang158 bin]# ./zkServer.sh start

(4)配置kafka及启动

  1. [root@ruanjianlaowang158 kafka_2.12-2.4.1]# cd /root/tools/kafka_2.12-2.4.1
  2. #软件老王,新建个空文件夹
  3. [root@ruanjianlaowang158 kafka_2.12-2.4.1]# mkdir data
  4. #软件老王,更改配置文件
  5. [root@ruanjianlaowang158 config]# cd /root/tools/kafka_2.12-2.4.1/config
  6. [root@ruanjianlaowang158 config]# vi server.properties
  7. #需要改3个值
  8. #log.dirs=/tmp/kafka-logs
  9. log.dirs=/root/tools/kafka_2.12-2.4.1/data
  10. #listeners = PLAINTEXT://your.host.name:9092
  11. listeners=PLAINTEXT://192.168.85.158:9092
  12. #zookeeper.connect=localhost:2181
  13. zookeeper.connect=192.168.85.158:2181
  14. #启动kafka
  15. [root@ruanjianlaowang158 bin]# cd /root/tools/kafka_2.12-2.4.1/bin
  16. [root@ruanjianlaowang158 bin]# ./kafka-server-start.sh ../config/server.properties &

启动完毕,单机验证就不验证了,直接在集群中进行验证。

2.3.2 集群部署

(1)集群方式,首先把上面的单机模式,再在192.168.85.168和192.168.85.178服务器上先解压配置一遍。

(2)zookeeper是还是更改zoo.cfg

158,168,178三台服务器一样:

  1. [root@ruanjianlaowang158 conf]# cd /root/tools/apache-zookeeper-3.5.7-bin/conf
  2. [root@ruanjianlaowang158 conf]# vi zoo.cfg
  3. #其他不变,最后面新加,三行,三台服务器配置一样,软件老王
  4. server.1=192.168.85.158:2888:3888
  5. server.2=192.168.85.168:2888:3888
  6. server.3=192.168.85.178:2888:3888
  7. 158服务器执行:
  8. echo "1" > /root/tools/apache-zookeeper-3.5.7-bin/data/myid
  9. 168服务器执行:
  10. echo "2" > /root/tools/apache-zookeeper-3.5.7-bin/data/myid
  11. 178服务器执行:
  12. echo "3" > /root/tools/apache-zookeeper-3.5.7-bin/data/myid

(3)kafka集群配置

  1. [root@ruanjianlaowang158 config]# cd /root/tools/kafka_2.12-2.4.1/config
  2. [root@ruanjianlaowang158 config]# vi server.properties
  3. #broker.id 三台服务器不一样,158服务器设置为1,168服务器设置为2,178服务器设置为3
  4. broker.id=1
  5. #三个服务器配置一样
  6. zookeeper.connect=192.168.85.158:2181,192.168.85.168:2181,192.168.85.178:2181

Kafka常用Broker配置说明:





























































配置项 默认值/示例值 说明
broker.id 0 Broker唯一标识
listeners PLAINTEXT://192.168.85.158:9092 监听信息,PLAINTEXT表示明文传输
log.dirs /root/tools/apache-zookeeper-3.5.7-bin/data kafka数据存放地址,可以填写多个。用”,”间隔
message.max.bytes message.max.bytes 单个消息长度限制,单位是字节
num.partitions 1 默认分区数
log.flush.interval.messages Long.MaxValue 在数据被写入到硬盘和消费者可用前最大累积的消息的数量
log.flush.interval.ms Long.MaxValue 在数据被写入到硬盘前的最大时间
log.flush.scheduler.interval.ms Long.MaxValue 检查数据是否要写入到硬盘的时间间隔。
log.retention.hours 24 控制一个log保留时间,单位:小时
zookeeper.connect 192.168.85.158:2181,
192.168.85.168:2181,
192.168.85.178:2181
ZooKeeper服务器地址,多台用”,”间隔

(4)集群启动

启动方式跟单机一样:

  1. #启动zookeeper
  2. [root@ruanjianlaowang158 bin]# cd /root/tools/apache-zookeeper-3.5.7-bin/bin
  3. [root@ruanjianlaowang158 bin]# ./zkServer.sh start
  4. #启动kafka
  5. [root@ruanjianlaowang158 bin]# cd /root/tools/kafka_2.12-2.4.1/bin
  6. [root@ruanjianlaowang158 bin]# ./kafka-server-start.sh ../config/server.properties &

(5)注意点

  1. 集群启动的时候,单机那台服务器(158)可能会报:KafkaConfigured broker.id 2 doesn't match stored broker.id 0 in meta.properties.
  2. 方案:在158服务器data中有个文件:meta.properties,文件中的broker.id也需要修改成与server.properties中的broker.id一样,所以造成了这个问题。

(6)创建个topic,后面springboot项目测试使用。

  1. [root@ruanjianlaowang158 bin]# cd /root/tools/kafka_2.12-2.4.1/bin
  2. [root@ruanjianlaowang158 bin]# ./kafka-topics.sh --create --zookeeper 192.168.85.158:2181,192.168.85.168:2181,192.168.85.178:2181 --replication-factor 3 --partitions 5 --topic aaaa

2.4 结合springboot项目

2.4.1 pom文件
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <parent>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-parent</artifactId>
  8. <version>2.2.0.RELEASE</version>
  9. <relativePath/> <!-- lookup parent from repository -->
  10. </parent>
  11. <groupId>com.itany</groupId>
  12. <artifactId>kafka</artifactId>
  13. <version>0.0.1-SNAPSHOT</version>
  14. <name>kafka</name>
  15. <description>Demo project for Spring Boot</description>
  16. <properties>
  17. <java.version>1.8</java.version>
  18. </properties>
  19. <dependencies>
  20. <dependency>
  21. <groupId>org.springframework.boot</groupId>
  22. <artifactId>spring-boot-starter-web</artifactId>
  23. </dependency>
  24. <dependency>
  25. <groupId>org.springframework.kafka</groupId>
  26. <artifactId>spring-kafka</artifactId>
  27. </dependency>
  28. </dependencies>
  29. <build>
  30. <plugins>
  31. <plugin>
  32. <groupId>org.springframework.boot</groupId>
  33. <artifactId>spring-boot-maven-plugin</artifactId>
  34. </plugin>
  35. </plugins>
  36. </build>
  37. </project>

说明:

主要就两个gav,一个是spring-boot-starter-web,启动web服务使用;一个是spring-kafka,这个是springboot集成额kafka核心包。

2.4.2 application.yml
  1. spring:
  2. kafka:
  3. # 软件老王,kafka集群服务器地址
  4. bootstrap-servers: 192.168.85.158:9092,192.168.85.168:9092,192.168.85.178:9092
  5. producer:
  6. key-serializer: org.apache.kafka.common.serialization.StringSerializer
  7. value-serializer: org.apache.kafka.common.serialization.StringSerializer
  8. consumer:
  9. group-id: test
  10. key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  11. value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
2.4.3 producer(消息生产者)
  1. @RestController
  2. public class KafkaProducer {
  3. @Autowired
  4. private KafkaTemplate template;
  5. //软件老王,topic使用上测试创建的aaaa
  6. @RequestMapping("/sendMsg")
  7. public String sendMsg(String topic, String message){
  8. template.send(topic,message);
  9. return "success";
  10. }
  11. }
2.3.4 consumer(消费者)
  1. @Component
  2. public class KafkaConsumer {
  3. //软件老王,这里是监控aaaa这个topic,直接打印到idea中,软件老王
  4. @KafkaListener(topics = {"aaaa"})
  5. public void listen(ConsumerRecord record){
  6. System.out.println(record.topic()+":"+record.value());
  7. }
  8. }
2.4.5 验证结果

(1)浏览器上输入

  1. http://localhost:8080/sendMsg?topic=aaaa&message=bbbb

(2)软件老王的idea控制台打印信息

在这里插入图片描述


更多知识请关注公众号:「软件老王」,IT技术与相关干货分享,回复关键字获取对应干货,java,送必看的10本“武功秘籍”;图片,送100多万张可商用高清图片;面试,送刚毕业就能月薪“20k”的java面试题,后续不断更新中,比如“软考”、“工具”等,已经在整理中。

在这里插入图片描述

发表评论

表情:
评论列表 (有 0 条评论,16人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Kafka

    Kafka使用背景 在我们大量使用分布式数据库、分布式计算集群的时候,是否会遇到这样一些问题: 我想分析一下用户行为(pageviews),以便我能设计出更

    相关 Kafka

    Kafka初识 1、Kafka使用背景 在我们大量使用分布式数据库、分布式计算集群的时候,是否会遇到这样的一些问题: 1. 我们想分析下用户行为(pageviews

    相关 Kafka

    前言:kafka作为一个消息中间件,由linkedin使用scala编写,用作LinkedIn的活动流,和运营数据处理管道的基础,其特点在于具有高水平扩展也就是动态扩容和高吞吐

    相关 Kafka

    一,环境准备           \ Zookeeper单点/集群服务(演示单点使用单点, 演示集群使用集群)           \ kafka安装包        

    相关 Kafka

    Kafka集群搭建 1、软件环境 1、linux一台或多台,大于等于2 2、已经搭建好的zookeeper集群(参考我上一篇zk集群搭建:[https://blo