spring boot 整合kafka
一、spirng boot 集成这里跳过了
简单入门例子:https://www.jianshu.com/p/d24bceea7665
二、pom文件
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.2.2.RELEASE</version>
</dependency>
三、spring boot配置文件
我采用了yml配置
部分配置如下:
spring:
kafka:
bootstrap-servers: 192.168.133.128:9092
producer:
#value-serializer: org.apache.kafka.common.serialization.ByteArraySerializer
consumer:
group-id: test1
#保证每个组一个消费者消费同一条消息,若设置为earliest,那么会从头开始读partition(none)
auto-offset-reset: latest
更多可以配置的属性参考 KafkaProperties 。
四、消费者监听
package com.st.kafka.consumer;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSONObject;
@Component
public class Listener {
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
@KafkaListener(topics = {
"test"},id="t2")
public void listen(ConsumerRecord<?, ?> record) {
logger.info("kafka的key: " + record.key());
List<String> list=(List<String>) JSONObject.parse(record.value().toString());
for(int i=0;i<list.size();i++){
logger.info("kafka遍历: " + list.get(i));
}
logger.info("kafka的value: " + record.value().toString());
}
}
五、简单测试生产者
package com.st.kafka.controller;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import com.alibaba.fastjson.JSONObject;
import com.st.base.dto.BaseResult;
import com.st.base.util.ResultUtil;
import io.swagger.annotations.ApiOperation;
@RestController
@RequestMapping("/kafka")
public class KafkaController {
protected final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private KafkaTemplate kafkaTemplate;
@ApiOperation("测试卡夫卡生产者")
@RequestMapping(value = "/send", method = RequestMethod.GET)
public BaseResult sendKafka(@RequestParam String key,@RequestParam String message) {
try {
List<String> list=new ArrayList<String>();
for(long i=0;i<1000;i++){
list.add(i+"");
}
logger.info("kafka的消息={}", list);
String temp=JSONObject.toJSONString(list);
kafkaTemplate.send("test","key501", temp);
logger.info("发送kafka成功.");
return ResultUtil.success("SUCCESS","发送kafka成功");
} catch (Exception e) {
logger.error("发送kafka失败", e);
return ResultUtil.success("FAIL","发送kafka失败");
}
}
@KafkaListener(id = "t1", topics = "test")
public void listenT1(ConsumerRecord<?, ?> cr) throws Exception {
logger.info("{} - {} : {}", cr.topic(), cr.key(), cr.value());
}
}
使用kafkaTemplate send 直接传obj 会报错 求各位大神指点解决方案
还没有评论,来说两句吧...