SpringBoot整合Kafka
前言
kafka 也作为消息中间件的一员,与其他消息中间件相比,它的优点在于拥有极高的吞吐量,ms 级的延迟,是一个高性能,分布式的系统。
源码
GitHub地址:https://github.com/intomylife/SpringBoot
环境
- JDK 1.8.0 +
- Maven 3.0 +
- SpringBoot 2.0.3
- ZooKeeper-3.4.5
- kafka_2.12-2.2.0
开发工具
- IntelliJ IDEA
正文
commons 工程 - POM 文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<!-- 三坐标 -->
<groupId>com.zwc</groupId>
<artifactId>springboot-kafka-commons</artifactId>
<version>0.0.1-SNAPSHOT</version>
<!-- 工程名称和描述 -->
<name>springboot-kafka-commons</name>
<description>公用工程</description>
<!-- 打包方式 -->
<packaging>jar</packaging>
<!-- 在properties下声明相应的版本信息,然后在dependency下引用的时候用${spring-version}就可以引入该版本jar包了 -->
<properties>
<!-- 编码 -->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!-- jdk -->
<java.version>1.8</java.version>
<!-- springboot -->
<platform-bom.version>Cairo-SR3</platform-bom.version>
<!-- ali json -->
<fastjson.version>1.2.47</fastjson.version>
<jackson.mapper.asl.version>1.9.9</jackson.mapper.asl.version>
</properties>
<!-- 加入依赖 -->
<dependencies>
<!-- ali json依赖 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>${jackson.mapper.asl.version}</version>
</dependency>
<!-- kafka 依赖 -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
<!-- 依赖 jar 包版本管理的管理器 -->
<!-- 如果 dependencies 里的 dependency 自己没有声明 version 元素,那么 maven 就此处来找版本声明。 -->
<!-- 如果有,就会继承它;如果没有就会报错,告诉你没有版本信息 -->
<!-- 优先级:如果 dependencies 里的 dependency 已经声明了版本信息,就不会生效此处的版本信息了 -->
<dependencyManagement>
<dependencies>
<!-- SpringBoot -->
<dependency>
<groupId>io.spring.platform</groupId>
<artifactId>platform-bom</artifactId>
<version>${platform-bom.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<!-- 插件依赖 -->
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
- 配置一些共用依赖,其中包括 spring-kafka 依赖来整合 Kafka
commons 工程 - system.properties
# kafka 配置
## kafka 服务地址
spring.kafka.bootstrap-servers=127.0.0.1:9092
## producer 提供者
### 如果该值大于零时,表示启用重试失败的发送次数
spring.kafka.producer.retries=0
### 每次批量发送消息的数量
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=33554432
### 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
## consumer 消费者
### 指定默认消费者group id
spring.kafka.consumer.group-id=springboot-consumer-group
### 当Kafka中没有初始偏移量或者服务器上不再存在当前偏移量时该怎么办,默认值为latest,表示自动将偏移重置为最新的偏移量,可选的值为latest, earliest, none
spring.kafka.consumer.auto-offset-reset=earliest
### 如果为true,则消费者的偏移量将在后台定期提交,默认值为true
spring.kafka.consumer.enable-auto-commit=false
### 如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000
spring.kafka.consumer.auto-commit-interval=100
### 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
- 一些共用配置,不经常修改的,或者是可以统一修改的
- 比如还可以配置 OSS 的配置信息,Redis 的配置信息,MongoDB 的配置信息等等..
commons 工程 - 项目结构
service 工程
service 工程是一个父工程,里面包含 基础模块,用户模块,每个模块中又会分为 core 和 api
此工程中 base-service 作为 Provider(提供者),user-service 作为 Consumer(消费者)
Provider(提供者)
service 工程 - base-service - base-service-core
package com.zwc.base.kafka;
import com.zwc.core.constants.KafkaConstants;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
/**
* @ClassName kafkaProducer
* @Desc TODO Kafka 提供者
* @Date 2019/4/20 19:41
* @Version 1.0
*/
@Service
public class KafkaProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
/*
* @ClassName kafkaProducer
* @Desc TODO 发送订阅者消息
* @Date 2019/4/20 19:46
* @Version 1.0
*/
public void sendTopicMessage(){
kafkaTemplate.send(KafkaConstants.KAFKA_TOPIC_NAME,"EN: From springboot-kafka! I'm topic. CN: springboot 整合 kafka 发送订阅者消息。");
}
}
- 直接使用 @Autowired 注解把 KafkaTemplate 装配进来
- 使用 kafkaTemplate 对象的 send() 方法发送消息
- 记得使用 @Service 注解把此 Bean 交给 spring 管理
service 工程 - base-service - 启动项目
注:项目启动前需要启动 zkServer 和 kafka(在文末处会单独说一下 windows 环境下安装启动 kafka 主要注意的
事项)
- 端口:8081(具体可以根据自己的喜好,在 application.properties 配置文件中配置 server.port)
- 发送消息接口:http://localhost:8081/kafka/send(调用成功会在页面显示 success)
Consumer(消费者)
service 工程 - user-service - user-service-core
消费者一号
package com.zwc.user.kafka;
import com.zwc.core.constants.KafkaConstants;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
/**
* @ClassName KafkaConsumer
* @Desc TODO Kafka 消费者 - 一号
* @Date 2019/4/20 19:47
* @Version 1.0
*/
@Service
public class KafkaConsumerOne {
/*
* @ClassName KafkaConsumer
* @Desc TODO 接收订阅者消息
* @Date 2019/4/20 19:50
* @Version 1.0
*/
@KafkaListener(topics = KafkaConstants.KAFKA_TOPIC_NAME , groupId = KafkaConstants.KAFKA_GROUP_ID_ONE)
public void receiveTopicMessage(String message){
System.out.println("KafkaConsumerOne ---> receiveTopicMessage:接收订阅者模式发送的消息,内容为:" + message);
}
}
- 使用 @KafkaListener 注解开始监听消息任务
- topics 参数指定消息主题
- groupId 参数指定组
记得使用 @Service 注解把此 Bean 交给 spring 管理
消费者二号
package com.zwc.user.kafka;
import com.zwc.core.constants.KafkaConstants;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;/**
- @ClassName KafkaConsumer
- @Desc TODO Kafka 消费者 - 二号
- @Date 2019/4/20 19:47
@Version 1.0
*/
@Service
public class KafkaConsumerTwo {/*
- @ClassName KafkaConsumer
- @Desc TODO 接收订阅者消息
- @Date 2019/4/20 19:50
- @Version 1.0
*/
@KafkaListener(topics = KafkaConstants.KAFKA_TOPIC_NAME , groupId = KafkaConstants.KAFKA_GROUP_ID_ONE)
public void receiveTopicMessage(String message){
System.out.println(“KafkaConsumerTwo —-> receiveTopicMessage:接收订阅者模式发送的消息,内容为:” + message);
}
}
除了输出消息的类名称不一样以外其他代码与消费者一号基本一样
service 工程 - user-service - 启动项目
注:项目启动前需要启动 zkServer 和 kafka(在文末处会单独说一下 windows 环境下安装启动 kafka 主要注意的
事项)
端口:8082(具体可以根据自己的喜好,在 application.properties 配置文件中配置 server.port)
启动后观察 idea 的控制台,会有一条字符串打印出来
一共有两个消费者,为何只有一条打印的信息?
因为在同一个组中的 Consumer,同一个主题只会被一个 Consumer 接收。类似于列队模式。
停掉 user-service 项目,把消费者二号的 groupId 修改为 KafkaConstants.KAFKA_GROUP_ID_TWO
再次启动 user-service 项目,观察 idea 的控制台,会有一条字符串打印出来
消费者二号改变了组,没有改变消息主题,因此读取到了此主题的历史消息。
再次调用 base-service 项目的发送消息接口:http://localhost:8081/kafka/send(调用成功会在页面显示
success)
观察 idea 的控制台,会有两条字符串打印出来
两个消费者不处于一个组,可以同时接收到相同主题的消息。类似于发布者订阅者模式。
service 工程 - 项目结构
- 在 service 总工程中创建了 base-service (基础模块)和 user-service(用户模块)
- 每一个模块中都包含 api 和 core
把多工程项目使用 IntelliJ IDEA 打开
- 把项目从 GitHub 中下载到你的本地
- 打开 IntelliJ IDEA
- 点击 File -> Open
- 打开你下载到本地的项目目录
- springboot-kafka -> springboot-kafka-service(选择打开此工程)
- 打开 service 工程后
- 再次点击 File -> Project Structrue
- 选择 Modules,点击 ‘+’ 符号
- 点击 Import Module
- 还是打开你下载到本地的项目目录
- springboot-kafka -> springboot-kafka-commons -> pom.xml
- 点击 OK
- 点击 Next,Finish
- 点击 Apply,OK
扩展
windows 本地安装和启动 kafka
就在 windows 本地启动 kafka 来回折腾了我一个小时左右。目前 Kafka 已经更新到了 2.2.0 版本,2.2.0 和 2.1.1
版本有镜像,下载速度还是很快的。
1. 下载
a) Kafka 官网下载地址:http://kafka.apache.org/downloads
b) 看描述说最新的版本为 2.2.0,目前的稳定版本也是2.2.0,就直接下载它了
c) 下载后,准备使用 kafka-server-start.bat ..\..\config\server.properties 指令启动 Kafka,却一直报错
d) 后来发现下载错了压缩包,我点击下载的是
e) 正确的下载入口是
f) 点击 kafka_2.12-2.2.0.tgz 后跳转到一个新的页面,点击最上面的建议下载地址就可以了
注:路径中不要有空格!!!
2. 启动
a) 前提:jdk 版本最低为 1.8.0,并且要配置到环境变量中;必须先启动 zkServer
b) 有的版本需要如下改动,打开 kafka_2.12-2.2.0\bin\windows 文件夹,使用文本编辑器打开
kafka-run-class.bat,把 179 行中 %CLASSPATH% 使用双引号包起来
![20190421160611733.png][]
c) 假如把 kafka_2.12-2.2.0 文件解压到了 D 盘根目录
① 在 kafka\_2.12-2.2.0 文件夹里新建 kafka-logs 文件夹
② 打开 kafka\_2.12-2.2.0\\config 文件夹,
③ 使用文本编辑器打开 server.properties,把 60 行中日志文件路径修改一如下:
**D:\\\\kafka\_2.12-2.2.0\\\\kafka-logs**
![20190625101431286.png][]
d) 打开 cmd, 进入到 kafka_2.12-2.2.0 文件夹下 bin\windows 目录
使用指令 **kafka-server-start.bat ..\\..\\config\\server.properties** 启动 kafka
使用指定 **kafka-topics.bat --list --zookeeper localhost:2181** 查看消息列表
结语
到此 SpringBoot 整合 Kafka 就结束了,注意几个关键的点,多多尝试,一定会成功的!
希望能够帮助到你
over
还没有评论,来说两句吧...