Kafka:提高生产者吞吐量

朴灿烈づ我的快乐病毒、 2024-03-25 21:03 185阅读 0赞

4ea6f05ad3bc0153b26582e20c01a9a1.png

  1. import org.apache.kafka.clients.producer.KafkaProducer;
  2. import org.apache.kafka.clients.producer.ProducerConfig;
  3. import org.apache.kafka.clients.producer.ProducerRecord;
  4. import org.apache.kafka.common.serialization.StringSerializer;
  5. import java.util.Properties;
  6. public class CustomProducerParameters {
  7. public static void main(String[] args) {
  8. // 0 配置
  9. Properties properties = new Properties();
  10. // 连接kafka集群
  11. properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.25.129:9092,192.168.25.128:9092");
  12. // 序列化
  13. properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
  14. properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());
  15. // 缓冲区大小 默认32m 分区越多适当提高一般为64m
  16. properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);
  17. // 批次大小 默认为16k 一般写32k
  18. properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);
  19. // 等待时间 单位ms 默认为0,即为以来消息就向Kafka发送消息,减少延迟;一般写为5-100之间
  20. properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
  21. // 压缩
  22. properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");
  23. // 1 创建生产者
  24. // KafkaProducer<K, V>
  25. // 泛型K为key一般为String类型 泛型V为传递消息的类型,此处发送字符串用String类型
  26. // 下列发送数据即为 “” “hello ”
  27. KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
  28. // 2 发送数据
  29. for (int i = 0; i < 5; i++) {
  30. kafkaProducer.send(new ProducerRecord<>("test","hello"+i));
  31. }
  32. // 3 关闭资源
  33. kafkaProducer.close();
  34. }
  35. }

发表评论

表情:
评论列表 (有 0 条评论,185人围观)

还没有评论,来说两句吧...

相关阅读