SpringBoot整合kafka

比眉伴天荒 2022-01-15 01:07 466阅读 0赞

经过前三篇文章 安装jdk 安装zookeeper 以及安装kafka 全部已经竣工了,不知道小伙伴们成功搭建kafka了不。

憋了三天的大招,今天放出来吧。今天大家用java代码连接kafka。

第一步:修改kafka的server.properties文件

file

  1. 命令: vi server.properties
  2. 修改内容:
  3. broker.id=0
  4. port=9092
  5. host.name=192.168.241.134
  6. log.dirs=/DATA/kafka/kafka_2.12-2.0.0/log
  7. zookeeper.connect=localhost:2181

第二步开发端口9092端口

1.可以直接关闭防火墙、

  1. centos6关闭防火墙命令以下:
  2. 1. 永久性生效
  3. 开启:chkconfig iptables on
  4. 关闭:chkconfig iptables off
  5. 2. 即时生效,重启后失效
  6. 开启:service iptables start
  7. 关闭:service iptables stop
  8. CentOS 7.0默认使用的是firewall作为防火墙,使用iptables必须重新设置一下
  9. 1、直接关闭防火墙
  10. systemctl stop firewalld.service #停止firewall
  11. systemctl disable firewalld.service #禁止firewall开机启动

2、修改iptables 文件

  1. centos 6修改方法:
  2. 执行命令:vi /etc/sysconfig/iptables
  3. 然后在文件中增加一行
  4. -A RH-Firewall-1-INPUT -m state state NEW -m tcp -p tcp dport 9092 -j ACCEPT

file

  1. centos7 设置 iptables service
  2. yum -y install iptables-services
  3. 如果要修改防火墙配置, 增加kafka端口9092
  4. vi /etc/sysconfig/iptables
  5. 增加规则
  6. -A INPUT -m state --state NEW -m tcp -p tcp --dport 9092 -j ACCEPT
  7. 保存退出后
  8. systemctl restart iptables.service #重启防火墙使配置生效
  9. systemctl enable iptables.service #设置防火墙开机启动

启动kafka

  1. 1、启动zookeeper
  2. 命令: sh $zookeeper_home/bin/zkServer.sh start
  3. 2、启动kafka
  4. 命令:在kafka目录下 输入./kafkaStart.sh

file

我们来写一下消息生成者 创建一个SpringBoot kafka_product Demo

目录结构如下:

file

  1. pom代码:
  2. <?xml version="1.0" encoding="UTF-8"?>
  3. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <modelVersion>4.0.0</modelVersion>
  6. <groupId>cn.cnbuilder</groupId>
  7. <artifactId>kafka</artifactId>
  8. <version>0.0.1-SNAPSHOT</version>
  9. <name>kafka</name>
  10. <description>kafka Demo project for Spring Boot</description>
  11. <parent>
  12. <groupId>org.springframework.boot</groupId>
  13. <artifactId>spring-boot-starter-parent</artifactId>
  14. <version>1.5.8.RELEASE</version>
  15. <relativePath/> <!-- lookup parent from repository -->
  16. </parent>
  17. <properties>
  18. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  19. <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
  20. <java.version>1.8</java.version>
  21. </properties>
  22. <dependencies>
  23. <dependency>
  24. <groupId>org.springframework.boot</groupId>
  25. <artifactId>spring-boot-starter-freemarker</artifactId>
  26. </dependency>
  27. <dependency>
  28. <groupId>org.springframework.kafka</groupId>
  29. <artifactId>spring-kafka</artifactId>
  30. <version>1.0.6.RELEASE</version>
  31. </dependency>
  32. <dependency>
  33. <groupId>org.springframework.boot</groupId>
  34. <artifactId>spring-boot-starter-web</artifactId>
  35. </dependency>
  36. <dependency>
  37. <groupId>org.springframework.boot</groupId>
  38. <artifactId>spring-boot-starter-test</artifactId>
  39. <scope>test</scope>
  40. </dependency>
  41. </dependencies>
  42. <build>
  43. <plugins>
  44. <plugin>
  45. <groupId>org.springframework.boot</groupId>
  46. <artifactId>spring-boot-maven-plugin</artifactId>
  47. </plugin>
  48. </plugins>
  49. </build>
  50. </project>

KafkaProducer代码如下:

  1. package cn.cnbuilder.kafka.producer;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.kafka.core.KafkaTemplate;
  4. import org.springframework.scheduling.annotation.EnableScheduling;
  5. import org.springframework.scheduling.annotation.Scheduled;
  6. import org.springframework.stereotype.Component;
  7. import org.springframework.util.concurrent.ListenableFuture;
  8. import java.util.UUID;
  9. /**
  10. * 生产者
  11. * 使用@EnableScheduling注解开启定时任务
  12. */
  13. @Component
  14. @EnableScheduling
  15. public class KafkaProducer {
  16. @Autowired
  17. private KafkaTemplate kafkaTemplate;
  18. /**
  19. * 定时任务1
  20. */
  21. @Scheduled(cron = "00/1 * * * * ?")
  22. public void send(){
  23. String message = UUID.randomUUID().toString();
  24. ListenableFuture future = kafkaTemplate.send("test", message);
  25. future.addCallback(o -> System.out.println("消息发送成功:" + message), throwable -> System.out.println("消息发送失败:" + message));
  26. }
  27. }

KafkaApplication代码:

  1. package cn.cnbuilder.kafka;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. @SpringBootApplication
  5. public class KafkaApplication {
  6. public static void main(String[] args) {
  7. SpringApplication.run(KafkaApplication.class, args);
  8. }
  9. }

application-dev.yml代码:

  1. server:
  2. port: 8888
  3. spring:
  4. kafka:
  5. producer:
  6. bootstrap-servers: 192.168.241.134:9092 #服务器ip+端口

application.yml代码:

  1. spring:
  2. profiles:
  3. active: dev #选择要用那个配置文件

项目构建完成我们启动一下:

file

然后在服务器启动一下消费者

file

测试结果:

file

我们再来封装一下消费者(可以直接在生产者项目写消费者信息,但是为了给你们展示清楚,我就分成两个项目了。):

创建一个SpringBoot kafka_consumer Demo

目录结构:

file

代码如下:

pom文件代码:

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <groupId>cn.cnbuilder</groupId>
  6. <artifactId>consumer</artifactId>
  7. <version>0.0.1-SNAPSHOT</version>
  8. <name>kafka_consumer</name>
  9. <description>kafka_consumer Demo project for Spring Boot</description>
  10. <parent>
  11. <groupId>org.springframework.boot</groupId>
  12. <artifactId>spring-boot-starter-parent</artifactId>
  13. <version>1.5.8.RELEASE</version>
  14. <relativePath/> <!-- lookup parent from repository -->
  15. </parent>
  16. <properties>
  17. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  18. <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
  19. <java.version>1.8</java.version>
  20. </properties>
  21. <dependencies>
  22. <dependency>
  23. <groupId>org.springframework.boot</groupId>
  24. <artifactId>spring-boot-starter-freemarker</artifactId>
  25. </dependency>
  26. <dependency>
  27. <groupId>org.springframework.kafka</groupId>
  28. <artifactId>spring-kafka</artifactId>
  29. <version>1.0.6.RELEASE</version>
  30. </dependency>
  31. <dependency>
  32. <groupId>org.springframework.boot</groupId>
  33. <artifactId>spring-boot-starter-web</artifactId>
  34. </dependency>
  35. <dependency>
  36. <groupId>org.springframework.boot</groupId>
  37. <artifactId>spring-boot-starter-test</artifactId>
  38. <scope>test</scope>
  39. </dependency>
  40. </dependencies>
  41. <build>
  42. <plugins>
  43. <plugin>
  44. <groupId>org.springframework.boot</groupId>
  45. <artifactId>spring-boot-maven-plugin</artifactId>
  46. </plugin>
  47. </plugins>
  48. </build>
  49. </project>

KafkaConsumer代码:

  1. package cn.cnbuilder.consumer.consumer;
  2. import org.springframework.kafka.annotation.KafkaListener;
  3. import org.springframework.stereotype.Component;
  4. /**
  5. * 消费者
  6. * 使用@KafkaListener注解,可以指定:主题,分区,消费组
  7. */
  8. @Component
  9. public class KafkaConsumer {
  10. @KafkaListener(topics = {"test"})
  11. public void receive(String message){
  12. System.out.println("test--消费消息:" + message);
  13. }
  14. }

KafkaConsumerApplication

  1. package cn.cnbuilder.consumer;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. @SpringBootApplication
  5. public class KafkaConsumerApplication {
  6. public static void main(String[] args) {
  7. SpringApplication.run(KafkaConsumerApplication.class, args);
  8. }
  9. }

application.yml

  1. server:
  2. port: 8082
  3. spring:
  4. kafka:
  5. consumer:
  6. group-id: test
  7. bootstrap-servers: 192.168.241.134:9092 # ip和端口

启动项目:

file

启动命令行生产者

file

测试结果:

file

我们现在用代码来生产消息和消费消息(启动两个项目用两个端口号哦!)

file

终、、


以上就是SpringBoot整合kafka,有什么问题可以联系我哈。

代码vip下载地址:https://download.csdn.net/download/weixin_39984161/11241620

平民请移步:https://blog.cnbuilder.cn/archives/SpringBootzhKafka 结尾百度云盘哦!

鼓励作者写出更好的技术文档,就请我喝一瓶哇哈哈哈哈哈哈哈。。你们的赞助决定我更新的速度哦!

微信:

E5_BE_AE_E4_BF_A1_E5_9B_BE_E7_89_87_20180718143757_meitu_220180718150018666.jpg

支付宝:

E5_BE_AE_E4_BF_A1_E6_88_AA_E5_9B_BE_2018071814381620180718143910635.png


  1. 感谢一路支持我的人。。。。。
  2. Love me and hold me
  3. QQ:6967380416年老号)
  4. EMAIL:itw@tom.com
  5. 友链交换
  6. 如果有兴趣和本博客交换友链的话,请按照下面的格式在评论区进行评论,我会尽快添加上你的链接。

  1. 网站名称:KingYiFanS Blog
  2. 网站地址:http://blog.cnbuilder.cn
  3. 网站描述:年少是你未醒的梦话,风华是燃烬的彼岸花。
  4. 网站Logo/头像: [头像地址](https://blog.cnbuilder.cn/upload/2018/7/avatar20180720144536200.jpg)

发表评论

表情:
评论列表 (有 0 条评论,466人围观)

还没有评论,来说两句吧...

相关阅读

    相关 SpringBoot整合kafka

    > 经过前三篇文章 安装jdk 安装zookeeper 以及安装kafka 全部已经竣工了,不知道小伙伴们成功搭建kafka了不。 > 憋了三天的大招,今天放出来吧。今天大家