Spring boot集成kafka生产者发布消息事件

系统管理员 2020-11-14 17:39 858阅读 0赞

Spring boot集成kafka

1、引入pom文件,版本的可以自己去maven中找

  1. <dependency>
  2. <groupId>org.springframework.kafka</groupId>
  3. <artifactId>spring-kafka</artifactId>
  4. <version>1.2.2.RELEASE</version>
  5. </dependency>

2、然后在配置文件中放入你kafka的一些配置

  1. kafka.producer.servers=172.16.100.8:9092
  2. kafka.producer.retries=0
  3. kafka.producer.batch.size=16384
  4. kafka.producer.linger=1
  5. kafka.producer.buffer.memory=33554432

3、然后增加kafka 的producer的配置类

  1. @Configuration
  2. @EnableKafka
  3. public class KafkaProducerConfig {
  4. @Value("${kafka.producer.servers}")
  5. private String servers;
  6. @Value("${kafka.producer.retries}")
  7. private int retries;
  8. @Value("${kafka.producer.batch.size}")
  9. private int batchSize;
  10. @Value("${kafka.producer.linger}")
  11. private int linger;
  12. @Value("${kafka.producer.buffer.memory}")
  13. private int bufferMemory;
  14. public Map<String, Object> producerConfigs() {
  15. Map<String, Object> props = new HashMap<>();
  16. props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
  17. props.put(ProducerConfig.RETRIES_CONFIG, retries);
  18. props.put(ProducerConfig.ACKS_CONFIG, "all");
  19. props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
  20. props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
  21. props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
  22. props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  23. props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  24. return props;
  25. }
  26. public ProducerFactory<String, String> producerFactory() {
  27. return new DefaultKafkaProducerFactory<>(producerConfigs());
  28. }
  29. @Bean
  30. public KafkaProducer<String, String> kafkaProducer(){
  31. return new KafkaProducer<String, String>(producerConfigs(), new StringSerializer(), new StringSerializer());
  32. }
  33. @Bean
  34. public KafkaTemplate<String, String> kafkaTemplate() {
  35. return new KafkaTemplate<String, String>(producerFactory());
  36. }
  37. }

这里我是配置了 kafkaProducer 和 kafkaTemplate两个。这两个都可以使用。因为我习惯用 kafkaProducer。

5、测试一下

  1. @Autowired
  2. private KafkaProducer<String, String> kafkaProducer;
  3. private void publish(String topic, Event event) {
  4. if (StringUtils.isEmpty(topic)) {
  5. throw new RuntimeException("topic can not be empty!");
  6. }
  7. if (event == null) {
  8. throw new NullPointerException("event can not be null!");
  9. }
  10. String json = JSON.toJSONString(event);
  11. ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, json);
  12. // kafkaTemplate.send(topic, json);
  13. kafkaProducer.send(record);
  14. }

这是我写的一个utils,试了一下是可以的。Event的话是一个实体类!

  1. public class Event {
  2. /** 事件类型 */
  3. private String type;
  4. /** 事件唯一标识 */
  5. private String id;
  6. /** 事件时间 */
  7. private Date time;
  8. /** 事件负载 */
  9. private String playloadJson;
  10. /**
  11. * 构造方法
  12. */
  13. public Event() {
  14. }
  15. /**
  16. * 构造方法
  17. *
  18. * @param type
  19. */
  20. public Event(String type) {
  21. this.type = type;
  22. this.id = UUID.randomUUID().toString();
  23. this.time = new Date();
  24. }
  25. /**
  26. * 取得事件负载
  27. *
  28. * @param clazz
  29. * @return
  30. */
  31. public <T> T getPayload(Class<T> clazz) {
  32. if (playloadJson != null) {
  33. return JSON.parseObject(playloadJson, clazz);
  34. }
  35. return null;
  36. }
  37. /**
  38. * 取得事件负载
  39. *
  40. * @param typeReference
  41. * @return
  42. */
  43. public <T> T getPayload(TypeReference<T> typeReference) {
  44. if (playloadJson != null) {
  45. return JSON.parseObject(playloadJson, typeReference);
  46. }
  47. return null;
  48. }
  49. /**
  50. * 取得事件负载
  51. *
  52. * @param clazz
  53. * @return
  54. */
  55. public <T> T getPayload(Class<T> clazz, T defaultValue) {
  56. if (playloadJson != null) {
  57. return JSON.parseObject(playloadJson, clazz);
  58. }
  59. return defaultValue;
  60. }
  61. /**
  62. * 取得事件负载
  63. *
  64. * @param typeReference
  65. * @return
  66. */
  67. public <T> T getPayload(TypeReference<T> typeReference, T defaultValue) {
  68. if (playloadJson != null) {
  69. return JSON.parseObject(playloadJson, typeReference);
  70. }
  71. return defaultValue;
  72. }
  73. /**
  74. * @param playload
  75. * the playload to set
  76. */
  77. public void setPayload(Object playload) {
  78. this.playloadJson = JSON.toJSONString(playload, SerializerFeature.DisableCircularReferenceDetect);
  79. }
  80. }

还有一些get和 set方法我没有复制出来。

发表评论

表情:
评论列表 (有 0 条评论,858人围观)

还没有评论,来说两句吧...

相关阅读