SpringBoot——》@KafkaListener 冷不防 2024-03-31 15:21 56阅读 0赞 > 推荐链接: > [总结——》【Java】][Java] > [总结——》【Mysql】][Mysql] > [总结——》【Redis】][Redis] > [总结——》【Spring】][Spring] > [总结——》【SpringBoot】][SpringBoot] > [总结——》【MyBatis、MyBatis-Plus】][MyBatis_MyBatis-Plus] #### SpringBoot——》@KafkaListener #### * 一、监听器id * * 1、在相同容器中,监听器id不能重复 * 2、使用默认配置的消费组 * 3、使用自定义的消费组 * 二、监听器工厂 * * 1、定义kafkaListenerContainerFactory * 2、配置containerFactory参数 * 三、监听器topics * * 1、固定监听topics * 2、动态监听topics * 四、监听器topics匹配正则表达式 * 五、监听器分区 * 六、异常处理器 * * 1、实现KafkaListenerErrorHandler * 2、配置errorHandler参数 * 七、分组id * 八、是否使用id作为groupId <table> <thead> <tr> <th align="left">方法</th> <th align="left">功能</th> </tr> </thead> <tbody> <tr> <td align="left">String id() default “”;</td> <td align="left">监听器id</td> </tr> <tr> <td align="left">String containerFactory() default “”;</td> <td align="left">监听器工厂</td> </tr> <tr> <td align="left">String[] topics() default {};</td> <td align="left">监听器topics</td> </tr> <tr> <td align="left">String topicPattern() default “”;</td> <td align="left">监听器topics匹配正则表达式</td> </tr> <tr> <td align="left">TopicPartition[] topicPartitions() default {};</td> <td align="left">监听器分区</td> </tr> <tr> <td align="left">String errorHandler() default “”;</td> <td align="left">异常处理器</td> </tr> <tr> <td align="left">String groupId() default “”;</td> <td align="left">分组id</td> </tr> <tr> <td align="left">boolean idIsGroup() default true;</td> <td align="left">是否使用id作为groupId</td> </tr> </tbody> </table> ## 一、监听器id ## @KafkaListener(id = "listenerForSyncEsfCommunity", topics = "test_topic1") ### 1、在相同容器中,监听器id不能重复 ### 如果ID重复,会报错`Caused by: java.lang.IllegalStateException: Another endpoint is already registered with id` ### 2、使用默认配置的消费组 ### `kafka.consumer.group-id = xxxxx` // 消费组为xxxxx @KafkaListener(id = "listenerForSyncEsfCommunity",idIsGroup = false) ### 3、使用自定义的消费组 ### // 方式一:消费组为listenerForSyncEsfCommunity @KafkaListener(id = "listenerForSyncEsfCommunity") // 方式二:消费组为groupId-test @KafkaListener(id = "listenerForSyncEsfCommunity",idIsGroup = false,groupId = "groupId-test") ## 二、监听器工厂 ## ### 1、定义kafkaListenerContainerFactory ### @Bean("kafkaListenerContainerFactory") KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); // consumerGroupId为空时,会用默认的groupId factory.setConsumerFactory(consumerFactory("g1")); factory.setConcurrency(4); // 设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG factory.setBatchListener(true); factory.getContainerProperties().setPollTimeout(3000); return factory; } ### 2、配置containerFactory参数 ### @KafkaListener(id = "listenerForSyncEsfCommunity", topics = "test_topic1", containerFactory = "kafkaListenerContainerFactory") ## 三、监听器topics ## ### 1、固定监听topics ### // 指定多个topic @KafkaListener(id = "listenerForSyncEsfCommunity", topics = { "test_topic1","test_topic2"}) ### 2、动态监听topics ### 自定义配置:`kafka.consumer.topics=topic1,topic2` // Spring的SpEl表达式 @KafkaListener(topics = "#{'${kafka.consumer.topics}'.split(',')}") ## 四、监听器topics匹配正则表达式 ## @KafkaListener(id = "listenerForSyncEsfCommunity", topicPattern = "test_.*topic.*") ## 五、监听器分区 ## @KafkaListener(id = "listenerForSyncEsfCommunity", topicPartitions = { @TopicPartition(topic = "topic1", partitions = { "0", "1" }), @TopicPartition(topic = "topic2", partitions = "0") }) ## 六、异常处理器 ## 异常处理有2种方式: * 方式一:consumer中手动try/catch * 方式二:实现KafkaListenerErrorHandler,重写异常处理逻辑 ### 1、实现KafkaListenerErrorHandler ### @Component("kafkaErrorHandler") public class KafkaDefaultListenerErrorHandler implements KafkaListenerErrorHandler { @Override public Object handleError(Message<?> message, ListenerExecutionFailedException exception) { return null; } @Override public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) { //TODO return null; } } ### 2、配置errorHandler参数 ### // 调用的时候errorHandler的值填写beanName @KafkaListener(id = "listenerForSyncEsfCommunity", topics = "topic1",errorHandler = "kafkaErrorHandler") ## 七、分组id ## 参考监听器id ## 八、是否使用id作为groupId ## 参考监听器id [Java]: https://blog.csdn.net/weixin_43453386/article/details/84788317 [Mysql]: https://blog.csdn.net/weixin_43453386/article/details/88667709 [Redis]: https://blog.csdn.net/weixin_43453386/article/details/127966762 [Spring]: https://blog.csdn.net/weixin_43453386/article/details/124900806 [SpringBoot]: https://blog.csdn.net/weixin_43453386/article/details/84788714 [MyBatis_MyBatis-Plus]: https://blog.csdn.net/weixin_43453386/article/details/84788053
还没有评论,来说两句吧...