Kafka整合SpringBoot

旧城等待, 2022-05-27 10:23 371阅读 0赞
  1. <!--Spring 整合kafka-->
  2. <dependency>
  3. <groupId>org.springframework.kafka</groupId>
  4. <artifactId>spring-kafka</artifactId>
  5. <version>1.1.1.RELEASE</version>
  6. </dependency>

生产者:

@Component
public class KakfaProducer2 {

  1. @Autowired
  2. private KafkaTemplate<String,String> kafkaTemplate;
  3. private Gson gson=new Gson();
  4. //发送消息的方法
  5. public void send()\{
  6. Message message=new Message();
  7. message.setId(System.currentTimeMillis());
  8. message.setMsg(UUID.randomUUID().toString());
  9. message.setSendTime(new Date());
  10. kafkaTemplate.send("topic",gson.toJson(message));
  11. System.out.println("发送消息:"+gson.toJson(message));
  12. \}

}

application.properties:

#Kafka的配置

#============== kafka ===================
# 指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=127.0.0.1:9092

#=============== provider =======================

spring.kafka.producer.retries=0
# 每次批量发送消息的数量
#spring.kafka.producer.batch-size=16384
#spring.kafka.producer.buffer-memory=33554432

# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer =======================
# 指定默认消费者group id
spring.kafka.consumer.group-id=test-consumer-group

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100

# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

pom.xml文件:




org.springframework.boot
spring-boot-starter-parent
1.5.6.RELEASE




org.springframework.boot
spring-boot-starter-web




org.springframework.boot
spring-boot-starter-test







junit
junit
3.8.1
test







org.springframework.boot
spring-boot-starter-data-mongodb





com.alibaba
fastjson
1.2.7




org.json
json
20131018





com.github.iweinzierl
jsonformat
1.0




com.squareup.okhttp
okhttp
2.3.0




commons-codec
commons-codec
1.9





org.apache.kafka
kafka_2.10
0.10.0.0







org.springframework.kafka
spring-kafka
1.1.1.RELEASE




com.google.code.gson
gson
2.8.2









com.netflix.hystrix
hystrix-core
1.5.12




org.slf4j
1.7.25
slf4j-log4j12






org.projectlombok
lombok
1.16.0
provided






commons-logging
commons-logging
1.2


org.apache.httpcomponents
httpclient
4.5.2




org.springframework.cloud
spring-cloud-starter-config


org.springframework.cloud
spring-cloud-starter-eureka


org.springframework.cloud
spring-cloud-starter-ribbon


org.springframework.cloud
spring-cloud-starter-hystrix










org.springframework.cloud
spring-cloud-dependencies
Dalston.SR1
pom
import



SpringBoot_ES_2

发表评论

表情:
评论列表 (有 0 条评论,371人围观)

还没有评论,来说两句吧...

相关阅读

    相关 SpringBoot整合kafka

    > 经过前三篇文章 安装jdk 安装zookeeper 以及安装kafka 全部已经竣工了,不知道小伙伴们成功搭建kafka了不。 > 憋了三天的大招,今天放出来吧。今天大家