springboot整合kafka

r囧r小猫 2022-05-14 03:53 343阅读 0赞
  1. maven项目依赖包


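    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>1.0.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>1.0.0</version>
    </dependency>

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>1.3.0.RELEASE</version>
    </dependency>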
  2. 生产者

配置文件producer.xml

  <?xml version="1.0" encoding="UTF-8"?>
  <!--
    Producer-side Spring configuration: a property map, a DefaultKafkaProducerFactory
    built from that map, and a KafkaTemplate that uses the factory.
  -->
  <beans xmlns="http://www.springframework.org/schema/beans"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns:context="http://www.springframework.org/schema/context"
         xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <!--
      Settings handed to the Kafka Java producer. Only keys that producer recognises are listed.
      Removed from the original listing: group.id (a consumer-only setting) and
      producer.type, batch.num.messages, queue.buffering.max.ms (settings of the legacy
      Scala producer). The Java producer does not use them; batch.size and linger.ms
      below are the settings that control batching.
      The bean id spelling is kept unchanged because it is referenced by the factory bean.
    -->
    <bean id="kafkaProducerProperites" class="java.util.HashMap">
      <constructor-arg>
        <map>
          <entry key="bootstrap.servers" value="localhost:9092"/>
          <entry key="retries" value="3"/>
          <entry key="batch.size" value="16384"/>
          <entry key="linger.ms" value="1"/>
          <entry key="send.buffer.bytes" value="104857600"/>
          <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
          <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
        </map>
      </constructor-arg>
    </bean>

    <!-- Creates Kafka producers from the property map above. -->
    <bean id="kafkaProducerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
      <constructor-arg ref="kafkaProducerProperites"/>
    </bean>

    <!--
      Template used by application code to send messages.
      NOTE(review): autoFlush=true asks the template to flush after every send, which
      presumably cancels most of the benefit of batch.size / linger.ms; confirm this is intended.
    -->
    <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
      <constructor-arg ref="kafkaProducerFactory"/>
      <constructor-arg name="autoFlush" value="true"/>
      <!-- Topic used when a send call does not name one. -->
      <property name="defaultTopic" value="defaultTopic"/>
    </bean>
  </beans>

KafkaService类(起名随意),需要注入kafkaTemplate类用来发送消息。

  1. import net.sf.json.JSONObject;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.kafka.core.KafkaTemplate;
  4. import org.springframework.stereotype.Service;
  5. import org.springframework.transaction.annotation.Transactional;
  6. @Service
  7. @Transactional
  8. public class KafkaService {
  9. @Autowired
  10. private KafkaTemplate kafkaTemplate;
  11. /**
  12. * 向kafka里传入数据
  13. * @param message 要传入的数据
  14. */
  15. public void sendMes(String message){
  16. //第一个参数是自己定义的topic的名字,第二个参数是你要发送的消息
  17. kafkaTemplate.send("TestTopic", message);
  18. }
  19. }

主程序类

  1. import org.springframework.boot.SpringApplication;
  2. import org.springframework.boot.autoconfigure.SpringBootApplication;
  3. import org.springframework.context.ApplicationContext;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.support.ClassPathXmlApplicationContext;
  6. import org.springframework.kafka.core.KafkaTemplate;
  7. @SpringBootApplication
  8. public class Application {
  9. public static void main(String[] args) {
  10. SpringApplication.run(Application.class, args);
  11. }
  12. @Bean
  13. public KafkaTemplate getKafkaTemplate(){
  14. ApplicationContext ctx = new ClassPathXmlApplicationContext("classpath:producer.xml");
  15. return ctx.getBean("kafkaTemplate", KafkaTemplate.class);
  16. }
  17. }
  3. 消费者

配置文件consumer.xml

  <?xml version="1.0" encoding="UTF-8"?>
  <!--
    Consumer-side Spring configuration: a property map, a DefaultKafkaConsumerFactory,
    the container properties (topics + listener) and the listener container itself.
  -->
  <beans xmlns="http://www.springframework.org/schema/beans"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns:context="http://www.springframework.org/schema/context"
         xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- Component scan of the package that holds the listener class (KafkaConsumerListener.java in this example). -->
    <context:component-scan base-package="com.xxx.consumer"/>

    <!--
      Settings handed to the Kafka Java consumer.
      Changed from the original listing: num.consumer.fetchers was dropped and
      socket.receive.buffer.bytes was renamed to receive.buffer.bytes. Both original keys
      are settings of the legacy Scala consumer and are not recognised by the Java consumer.
      The bean id spelling is kept unchanged because it is referenced by the factory bean.
    -->
    <bean id="kafkaConsumerProperites" class="java.util.HashMap">
      <constructor-arg>
        <map>
          <entry key="bootstrap.servers" value="localhost:9092"/>
          <entry key="group.id" value="0"/>
          <entry key="enable.auto.commit" value="true"/>
          <entry key="auto.commit.interval.ms" value="1000"/>
          <entry key="session.timeout.ms" value="30000"/>
          <entry key="receive.buffer.bytes" value="67108864"/>
          <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
          <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
        </map>
      </constructor-arg>
    </bean>

    <!-- Creates Kafka consumers from the property map above. -->
    <bean id="kafkaConsumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
      <constructor-arg ref="kafkaConsumerProperites"/>
    </bean>

    <!-- More than one topic may be listed here. -->
    <bean id="consumerContainerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
      <constructor-arg name="topics">
        <list>
          <value>TestTopic</value>
          <value>TestTopic2</value>
        </list>
      </constructor-arg>
      <!--
        The ref must equal the bean name of the listener class, which is declared as
        @Component("kafkaConsumerListener"); a ref to any other name fails at startup
        because no such bean exists.
      -->
      <property name="messageListener" ref="kafkaConsumerListener"/>
    </bean>

    <!-- Listener container that polls the topics above and hands records to the listener. -->
    <bean id="conusmerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
      <constructor-arg ref="kafkaConsumerFactory"/>
      <constructor-arg ref="consumerContainerProperties"/>
    </bean>
  </beans>

监听类KafkaConsumerListener

需要监听多个topic时,只需在onMessage方法上的@KafkaListener注解的topics属性中加入其他topic(例如 topics = {"test1", "test2"}),也可以再创建一个监听类来监听另一个topic(需确认该类位于配置文件的包扫描路径下)。

  1. import org.apache.kafka.clients.consumer.ConsumerRecord;
  2. import org.springframework.kafka.annotation.KafkaListener;
  3. import org.springframework.kafka.listener.MessageListener;
  4. import org.springframework.stereotype.Component;
  5. /**
  6. * kafka的消费者监听类,topic为TestTopic
  7. */
  8. @Component("kafkaConsumerListener")
  9. public class KafkaConsumerListener implements MessageListener<String, String> {
  10. @Override
  11. @KafkaListener(topics = "TestTopic")
  12. public void onMessage(ConsumerRecord<String, String> record) {
  13. String topic = record.topic();
  14. String key = record.key();
  15. //拿到从kafka里传回来的数据
  16. String val = record.value();
  17. long offset = record.offset();
  18. int partition = record.partition();
  19. //System.out.println("------------------------------------------------------------");
  20. System.out.printf("receive msg -- topic:%s key:%s val:%s offset:%s partition:%s \r\n",topic,key,val,offset,partition);
  21. }
  22. }

主程序类

  1. import org.springframework.boot.SpringApplication;
  2. import org.springframework.boot.autoconfigure.SpringBootApplication;
  3. import org.springframework.boot.web.servlet.ServletComponentScan;
  4. import org.springframework.context.annotation.ImportResource;
  5. @ServletComponentScan
  6. @SpringBootApplication
  7. @ImportResource("classpath:consumer.xml")
  8. public class Application {
  9. public static void main(String[] args) {
  10. SpringApplication.run(Application.class, args);
  11. }
  12. }

发表评论

表情:
评论列表 (有 0 条评论,343人围观)

还没有评论,来说两句吧...

相关阅读

    相关 SpringBoot整合kafka

    > 经过前三篇文章 安装jdk 安装zookeeper 以及安装kafka 全部已经竣工了,不知道小伙伴们成功搭建kafka了不。 > 憋了三天的大招,今天放出来吧。今天大家