java集成kafka 0.10版本

た 入场券 2022-05-20 06:52 197阅读 0赞

一、添加maven依赖

  1. <dependency>
  2. <groupId>org.apache.kafka</groupId>
  3. <artifactId>kafka-clients</artifactId>
  4. <version>0.10.2.2</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.kafka</groupId>
  8. <artifactId>kafka_2.11</artifactId>
  9. <version>0.10.2.2</version>
  10. </dependency>

二、java代码

(1)生产者

  1. package com.ldy.kafka.service;
  2. import java.util.Properties;
  3. import org.apache.kafka.clients.producer.KafkaProducer;
  4. import org.apache.kafka.clients.producer.Producer;
  5. import org.apache.kafka.clients.producer.ProducerRecord;
  6. import org.springframework.stereotype.Service;
  7. /**
  8. * @类名: KafkaMsgProducer<br>
  9. * @描述: kafka消息生产者(0.10.x版本)<br>
  10. * @创建者: lidongyang<br>
  11. * @修改时间: 2018年7月13日 下午5:07:40<br>
  12. */
  13. @Service
  14. public class KafkaMsgProducer {
  15. private static Properties props = new Properties();
  16. private static String servers = "11.2.3.4:9092,11.2.3.5:9092,11.2.3.6:9092,12.2.3.7:9092,11.2.3.8:9092";
  17. static {
  18. props.put("bootstrap.servers", servers);
  19. props.put("acks", "all");
  20. props.put("retries ", 1);
  21. props.put("buffer.memory", 33554432);
  22. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  23. props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  24. }
  25. /**
  26. * @方法名: sendMsg<br>
  27. * @描述: 发送消息<br>
  28. * @创建者: lidongyan<br>
  29. * @修改时间: 2018年7月13日 下午5:05:52<br>
  30. * @param topic
  31. * @param msg
  32. */
  33. public static void sendMsg(String topic, String msg) {
  34. Producer<String, String> producer = null;
  35. try {
  36. producer = new KafkaProducer<String, String>(props);
  37. producer.send(new ProducerRecord<String, String>(topic, msg));
  38. } catch (Exception e) {
  39. e.printStackTrace();
  40. } finally {
  41. if(null != producer) {
  42. producer.close();
  43. }
  44. }
  45. }
  46. /**
  47. * @方法名: sendMsg<br>
  48. * @描述: 发送消息 K,V格式<br>
  49. * @创建者: lidongyang<br>
  50. * @修改时间: 2018年7月13日 下午5:09:52<br>
  51. * @param topic
  52. * @param key
  53. * @param value
  54. */
  55. public static void sendMsg(String topic, String key, String value) {
  56. Producer<String, String> producer = null;
  57. try {
  58. producer = new KafkaProducer<String, String>(props);
  59. producer.send(new ProducerRecord<String, String>(topic, key, value));
  60. } catch (Exception e) {
  61. e.printStackTrace();
  62. } finally {
  63. if(null != producer) {
  64. producer.close();
  65. }
  66. }
  67. }
  68. public static void main(String[] args) {
  69. for (int i = 0; i < 10; i++) {
  70. sendMsg("test_topic", "content" + i);
  71. }
  72. }
  73. }

(2)消费者

  1. package com.ldy.kafka.service;
  2. import java.util.Arrays;
  3. import java.util.Properties;
  4. import org.apache.commons.lang3.StringUtils;
  5. import org.apache.kafka.clients.consumer.Consumer;
  6. import org.apache.kafka.clients.consumer.ConsumerRecord;
  7. import org.apache.kafka.clients.consumer.ConsumerRecords;
  8. import org.apache.kafka.clients.consumer.KafkaConsumer;
  9. /**
  10. * @类名: KafkaMsgConsumer<br>
  11. * @描述: kafka消息消费者(0.10.x版本)<br>
  12. * @创建者: lidongyang<br>
  13. * @修改时间: 2018年7月14日 上午10:35:23<br>
  14. */
  15. public class KafkaMsgConsumer {
  16. private static Properties props = new Properties();
  17. private static String servers = "11.2.3.4:9092,11.2.3.5:9092,11.2.3.6:9092,12.2.3.7:9092,11.2.3.8:9092";
  18. static {
  19. props.put("bootstrap.servers", servers);
  20. props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  21. props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  22. }
  23. /**
  24. * @方法名: receiveMsg<br>
  25. * @描述: 接收消息<br>
  26. * @创建者: lidongyang<br>
  27. * @修改时间: 2018年7月14日 上午10:44:18<br>
  28. * @param topic
  29. * @param groupId
  30. */
  31. public static void receiveMsg(String topic, String groupId) {
  32. if (StringUtils.isNotBlank(groupId)) {
  33. props.put("group.id", groupId);
  34. }
  35. try {
  36. @SuppressWarnings("resource")
  37. Consumer<String, String> consumer = new KafkaConsumer<String, String>(props);
  38. // 配置topic
  39. consumer.subscribe(Arrays.asList(topic));
  40. while (true) {
  41. // 得到ConsumerRecords实例
  42. ConsumerRecords<String, String> records = consumer.poll(100);
  43. for (ConsumerRecord<String, String> record : records) {
  44. // 直接通过record.offset()得到offset的值
  45. System.out.println("offset = " + record.offset());
  46. System.out.println("key = " + record.key() + ",value=" + record.value());
  47. }
  48. }
  49. } catch (Exception e) {
  50. e.printStackTrace();
  51. }
  52. }
  53. public static void main(String[] args) {
  54. String topic = "test_topic_ldy";
  55. String groupId = "group1";
  56. receiveMsg(topic, groupId);
  57. }
  58. }

发表评论

表情:
评论列表 (有 0 条评论,197人围观)

还没有评论,来说两句吧...

相关阅读

    相关 kafka集成flume

    理论知识: 1、定位:分布式的消息队列系统,同时提供数据分布式缓存功能(默认7天) 2、消息持久化到磁盘,达到O(1)访问速度,预读和后写,对磁盘的顺序访问(比内存随机访问还要快)……(摘要被截断)