spring cloud(boot)整合kafka

心已赠人 2023-02-16 09:27 129阅读 0赞

引入依赖

  1. <!--kafka支持-->
  2. <dependency>
  3. <groupId>org.springframework.kafka</groupId>
  4. <artifactId>spring-kafka</artifactId>
  5. </dependency>
  6. <!--kafka支持-->

配置yml

  # Spring Boot Kafka settings (application.yml).
  spring:
    kafka:
      bootstrap-servers: 192.168.1.63:9092 # Kafka broker address
      template:
        default-topic: mykafka # default topic name
      listener:
        concurrency: 5 # number of threads in the listener container, used to raise consumption throughput
      consumer:
        group-id: myGroup # default consumer group id
        client-id: 200
        max-poll-records: 200
        auto-offset-reset: earliest # start from the earliest available offset when the group has no committed offset
      producer:
        batch-size: 1000 # upper bound of one producer batch, in bytes (not a number of messages)
        retries: 3
        client-id: 200

构建生产者

  1. import org.apache.kafka.clients.producer.RecordMetadata;
  2. import org.springframework.kafka.support.SendResult;
  3. import org.springframework.stereotype.Service;
  4. import org.springframework.util.concurrent.ListenableFuture;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.kafka.core.KafkaTemplate;
  7. import org.springframework.stereotype.Component;
  8. import java.util.concurrent.ExecutionException;
  9. /** * spring 2020/6/9 */
  10. @Service
  11. @Component
  12. public class Producer {
  13. @Autowired
  14. private KafkaTemplate<String, String> kafkaTemplate;
  15. /** * 发送消息到kafka */
  16. public RecordMetadata sendChannelMess(String topic, String message) {
  17. ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
  18. RecordMetadata recordMetadata = null;
  19. try {
  20. recordMetadata = future.get().getRecordMetadata();
  21. } catch (InterruptedException | ExecutionException e) {
  22. e.printStackTrace();
  23. System.out.println("发送失败");
  24. }
  25. System.out.println("发送成功");
  26. System.out.println("partition:" + recordMetadata.partition());
  27. System.out.println("offset:" + recordMetadata.offset());
  28. System.out.println("topic:" + recordMetadata.topic());
  29. return recordMetadata;
  30. }
  31. }

构建消费者

  1. import org.slf4j.Logger;
  2. import org.slf4j.LoggerFactory;
  3. import org.springframework.kafka.annotation.KafkaListener;
  4. import org.springframework.stereotype.Component;
  5. import org.springframework.stereotype.Service;
  6. /** * spring 2020/6/9 */
  7. @Service
  8. @Component
  9. public class Consumer {
  10. private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
  11. /** * 有消息就读取,只读取消息value */
  12. @KafkaListener(topics = { "mykafka"})
  13. public void receiveMessage(String message){
  14. logger.error("***********************************************************");
  15. logger.error(message);
  16. logger.error("***********************************************************");
  17. }
  18. }

发表评论

表情:
评论列表 (有 0 条评论,129人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Spring Boot整合Kafka

    Kafka是一个分布式的、可分区的、可复制的消息系统,下面是Kafka的几个基本术语: 1. Kafka将消息以topic为单位进行归纳; 2. 将向Kafka topi