spring整合kafka

桃扇骨 2022-10-12 01:02 260阅读 0赞

1、导入pom依赖。spring-kafka会传递引入对应版本的kafka-clients,客户端版本应与kafka服务端版本兼容(建议保持一致)

  1. <dependency>
  2. <groupId>org.springframework.kafka</groupId>
  3. <artifactId>spring-kafka</artifactId>
  4. <version>2.5.12.RELEASE</version>
  5. </dependency>

2、注入KafkaTemplate模板

  1. @Configuration
  2. @EnableKafka
  3. public class KafkaConfig {
  4. private final static String CONSUMER_GROUP_ID="yd-group";
  5. public final static String TOPIC_NAME="yd-kf-topic";
  6. @Bean
  7. public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
  8. ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
  9. factory.setConsumerFactory(consumerFactory());
  10. return factory;
  11. }
  12. /**
  13. * 消费工厂
  14. * @return
  15. */
  16. @Bean
  17. public ConsumerFactory<String, String> consumerFactory() {
  18. Map<String, Object> props = new HashMap<>(8);
  19. props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "PLAINTEXT://192.168.81.200:9092");
  20. props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);
  21. props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
  22. props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
  23. props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
  24. props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  25. props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  26. return new DefaultKafkaConsumerFactory<>(props);
  27. }
  28. /**
  29. * 生产工厂
  30. * @return
  31. */
  32. @Bean
  33. public ProducerFactory<String, String> producerFactory() {
  34. Map<String, Object> props = new HashMap<>(8);
  35. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "PLAINTEXT://192.168.81.200:9092");
  36. props.put(ProducerConfig.ACKS_CONFIG, "all");
  37. props.put(ProducerConfig.RETRIES_CONFIG, 0);
  38. props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
  39. props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
  40. props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
  41. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  42. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  43. return new DefaultKafkaProducerFactory<>(props);
  44. }
  45. /**
  46. * kafka模板
  47. * @return
  48. */
  49. @Bean("kafkaTemplate")
  50. public KafkaTemplate<String, String> kafkaTemplate() {
  51. return new KafkaTemplate<>(producerFactory());
  52. }
  53. }

3、消息生产、消息消费监听

消息发送

  1. @Slf4j
  2. @Service
  3. public class KafkaProducer {
  4. @Autowired
  5. private KafkaTemplate<String, String> kafkaTemplate;
  6. public String sendSyncMessage(String key,String msg){
  7. String s;
  8. try {
  9. ListenableFuture<SendResult<String, String>> tagA = kafkaTemplate.send(KafkaConfig.TOPIC_NAME, key, msg);
  10. s = tagA.get().toString();
  11. log.info("生产kafka消息 {}",s);
  12. return s;
  13. } catch (InterruptedException|ExecutionException e) {
  14. e.printStackTrace();
  15. s=e.getMessage();
  16. log.error("sendSyncMessage-->发送消息异常{}",e.getMessage());
  17. }
  18. return s;
  19. }
  20. }

监听消息消费

  1. @Slf4j
  2. @Component
  3. public class CustomKafkaListener /**implements MessageListener<String,String>*/ {
  4. @KafkaListener(topics = {KafkaConfig.TOPIC_NAME},id = KafkaConfig.TOPIC_NAME)
  5. public void onMessage1(String msg){
  6. log.info("onMessage1消费kafka消息 {} ",msg);
  7. }
  8. }

4、测试发送

  1. @RestController
  2. public class KafkaSendController {
  3. @Autowired
  4. private KafkaProducer kafkaProducer;
  5. @GetMapping("/kafka/sendMsg")
  6. public String sendMsg(String key,String msg){
  7. return kafkaProducer.sendSyncMessage(key,msg);
  8. }
  9. }

发表评论

表情:
评论列表 (有 0 条评论,260人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Spring Boot整合Kafka

    Kafka是一个分布式的、可分区的、可复制的消息系统,下面是Kafka的几个基本术语: 1. Kafka将消息以topic为单位进行归纳; 2. 将向Kafka topi