Kafka Flink Example (on macOS)

柔情只为你懂 2022-12-30 13:47

Requires a Java environment and depends on ZooKeeper.

Kafka

Download and install

Configure the Java environment

Extract the downloaded archives to the /opt directory
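A minimal sketch for the Java step, assuming a JDK is already installed (the 1.8 version selector is just an example):

    # macOS helper that locates an installed JDK
    export JAVA_HOME=$(/usr/libexec/java_home -v 1.8)
    export PATH=$JAVA_HOME/bin:$PATH
    java -version   # verify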

Start ZooKeeper

    cd /opt/zookeeper-3.5.8
    bin/zkServer.sh start
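An optional check (not in the original post) that ZooKeeper actually came up:

    bin/zkServer.sh status   # a single node should report Mode: standalone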

Start Kafka

    cd /opt/kafka_2.12-2.7.0
    bin/kafka-server-start.sh config/server.properties
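If you'd rather keep the broker in the background, kafka-server-start.sh also accepts a -daemon flag:

    bin/kafka-server-start.sh -daemon config/server.properties
    tail -f logs/server.log   # watch the broker logs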

Check port status

    netstat -tnlp
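Note that netstat -tnlp is Linux syntax; macOS's netstat does not support those flags. On macOS an equivalent check is:

    lsof -nP -iTCP -sTCP:LISTEN   # expect ZooKeeper on 2181 and Kafka on 9092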

Test: create a topic

    cd /opt/kafka_2.12-2.7.0
    bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic flink-test
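An optional sanity check that the topic exists:

    bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181
    bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic flink-test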

Simulate receiving messages (console consumer)

    cd /opt/kafka_2.12-2.7.0
    bin/kafka-console-consumer.sh --topic flink-test --from-beginning --bootstrap-server 127.0.0.1:9092

Simulate sending messages (console producer)

    cd /opt/kafka_2.12-2.7.0
    bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic flink-test
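With both consoles open, every line typed at the producer prompt should immediately show up in the consumer terminal, for example:

    > hello flink
    > hello kafka

This confirms the broker works end to end before involving Flink.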

Now we can write the Java program!

We need a Maven project that pulls in the Flink core package, flink-streaming, and the flink-kafka connector:

    <properties>
        <flink.version>1.11.3</flink.version>
    </properties>
    <repositories>
        <repository>
            <id>aliyun</id>
            <name>aliyun</name>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
        </repository>
    </repositories>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
The job itself reads from Kafka and does a streaming word count. Note that the broker and ZooKeeper addresses below point at 192.168.0.106/107; for the single-node setup above, use 127.0.0.1:9092 and 127.0.0.1:2181 instead.

    package com.hi7s.tech;

    // StringUtils comes from commons-lang3; add it to the pom explicitly if it is not on the classpath
    import org.apache.commons.lang3.StringUtils;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.util.Collector;

    import java.util.Properties;

    public class StreamWorkCount {

        private static final String KAFKA_URL = "192.168.0.106:9092,192.168.0.107:9092";
        private static final String ZK_URL = "192.168.0.106:2181,192.168.0.107:2181";
        private static final String TOPIC = "flink-test";
        private static final String GROUP_ID = "test-consumer-group";
        private static final Properties PROP = new Properties();

        static {
            PROP.setProperty("bootstrap.servers", KAFKA_URL);
            // zookeeper.connect is ignored by the modern Kafka consumer; kept from the original setup
            PROP.setProperty("zookeeper.connect", ZK_URL);
            PROP.setProperty("group.id", GROUP_ID);
        }

        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(1000);
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            // TOPIC must be a topic that already exists in Kafka (flink-test, created above)
            FlinkKafkaConsumer<String> kafkaConsumer =
                    new FlinkKafkaConsumer<String>(TOPIC, new SimpleStringSchema(), PROP);
            DataStream<Tuple2<String, Integer>> keyedStream = env.addSource(kafkaConsumer)
                    .flatMap(new LineSplitter())
                    .keyBy(0)   // key by the word (tuple field 0)
                    .sum(1);    // sum the counts (tuple field 1)
            keyedStream.print();
            env.execute("Flink Streaming Java API Skeleton");
        }

        // Splits each incoming line into lowercase words and emits (word, 1) pairs
        static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
            @Override
            public void flatMap(String val, Collector<Tuple2<String, Integer>> collector) {
                for (String word : val.toLowerCase().split("\\W+")) {
                    if (StringUtils.isNotBlank(word)) {
                        collector.collect(new Tuple2<String, Integer>(word, 1));
                    }
                }
            }
        }
    }
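To run the job end to end (a sketch; the main class can equally be run from an IDE — the exec:java invocation below relies on Maven's default plugin resolution):

    mvn clean package
    mvn exec:java -Dexec.mainClass="com.hi7s.tech.StreamWorkCount"

Then type a few words into the console producer; the job prints running counts to stdout, along the lines of (the `n>` subtask prefixes will vary):

    1> (hello,1)
    1> (hello,2)
    3> (flink,1)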
