【Kafka】基本使用:SpringBoot整合Kafka-clinets
首先说明一点,SpringBoot 的版本和 Kafka-clinets 的版本有一个对照表格,如果没有按照正确的版本来引入,那么会存在版本问题导致ClassNotFound的问题,具体请参考 https://spring.io/projects/spring-kafka
依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.0.RELEASE</version>
</dependency>
1.Producer
@Component
public class MyKafKaProducer {
@Resource
// Integer:消息key的类型,每条消息都需一个 key,它决定了该消息会去哪个 Partition
// String:消息类型
private KafkaTemplate<Integer, String> kafkaTemplate;
public void send() {
// 通过kafka进行消息发送
// ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
kafkaTemplate.send("test", 1, "msgData");
}
}
2.Consumer
@Component
public class MyKafkaConsumer {
// 订阅相应主题。接收到的消息会被注入ConsumerRecord
@KafkaListener(topics = {
"test"})
public void listener(ConsumerRecord record) {
// Optional,jdk8新特性,用于null处理
Optional msg = Optional.ofNullable(record.value());
// isPresent判斷值是否存在
if (msg.isPresent()) {
// 打印消息
System.out.println(msg.get()); // get,从optional中取出对应值
}
}
}
3.application.properties
# kafka服务器
spring.kafka.bootstrapservers=43.107.136.126:9092
# 编码(Producer)
spring.kafka.producer.keyserializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.valueserializer=org.apache.kafka.common.serialization.StringSerializer
# 解码(Consumer)
spring.kafka.consumer.keydeserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.valuedeserializer=org.apache.kafka.common.serialization.StringDeserializer
# 对Consumer的特殊配置
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
注:如果我们关闭自动提交,即 enable-auto-commit 为 false 时,我们还可以配置 spring.kafka.listener.concurrency 去配置监听消息的线程数,即每个 @KafkaListener 注解标识的 topic 会有几个线程从队列中取出消息。具体参考请这篇文章…
4.启动类
@SpringBootApplication
public class TestKafkaApplication {
public static void main(String[] args) throws InterruptedException {
ConfigurableApplicationContext context =
SpringApplication.run(TestKafkaApplication.class, args);
// 获取producer的Bean实例,send消息
MyKafKaProducer kafKaProducer = context.getBean(MyKafKaProducer.class);
for (int i = 0; i < 10; i++) {
// Producer发送消息
// 发送之后Consumer自己就会poll,然后打印
kafKaProducer.send();
TimeUnit.SECONDS.sleep(2);
}
}
}
还没有评论,来说两句吧...