First Flink Example: Writing from Flink to Kafka

迷南。 2022-10-24 13:16
  • Dependencies
  • Code
  • Running

Dependencies

1. ZooKeeper

Standalone mode; see: https://blog.csdn.net/sndayYU/article/details/90718238

Basically, for the portable ("green") distribution you just point the log and data directories in conf/zoo.cfg somewhere sensible and create those directories; see the sketch below.
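For reference, a minimal standalone conf/zoo.cfg might look like this; the directory paths are just examples and must match directories you actually create (forward slashes are the least error-prone way to write Windows paths here):

    # conf/zoo.cfg -- minimal standalone setup (paths are examples)
    tickTime=2000
    dataDir=D:/tmp/zookeeper/data
    dataLogDir=D:/tmp/zookeeper/log
    clientPort=2181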

2. Kafka

Standalone mode; see: https://blog.csdn.net/sndayYU/article/details/90718786

Basically, creating the D:\tmp\kafka-logs directory is enough; see the config excerpt below.
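That directory is what log.dirs in Kafka's config/server.properties points at. An excerpt of the relevant settings (the log.dirs path assumes the layout above; Kafka parses this file as Java properties, so forward slashes avoid backslash-escaping issues on Windows):

    # config/server.properties (excerpt)
    log.dirs=D:/tmp/kafka-logs
    zookeeper.connect=localhost:2181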

Code

1.pom.xml

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13</version>
            <scope>test</scope>
        </dependency>
        <!-- keep the Scala suffix (_2.12) consistent across all Flink artifacts -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.12</artifactId>
            <version>1.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>1.8.0</version>
        </dependency>
    </dependencies>
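A note on scopes: the Flink core dependencies (flink-java, flink-streaming-java) use the default compile scope here so the job can be launched directly from the IDE. For a jar submitted to an actual Flink cluster you would typically mark them provided, since the Flink runtime already supplies those classes.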

2.FlinkToKafka

    package com.ydfind;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Properties;

    /**
     * Flink continuously generates data and sends it to Kafka.
     * @author drd
     * @date 2021.1.29
     */
    public class FlinkToKafka {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Register the data source
            DataStreamSource<String> text = env.addSource(new MyFlinkProducer()).setParallelism(1);

            // Kafka producer writing to the flink-test topic
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "localhost:9092");
            FlinkKafkaProducer<String> producer =
                    new FlinkKafkaProducer<>("flink-test", new SimpleStringSchema(), properties);
            producer.setWriteTimestampToKafka(true);

            // Attach the sink
            text.addSink(producer);

            // Start execution
            env.execute();
        }

        public static class MyFlinkProducer implements SourceFunction<String> {
            // volatile: cancel() is called from a different thread than run()
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<String> sourceContext) throws Exception {
                String format = "yyyy-MM-dd HH:mm:ss.SSS";
                SimpleDateFormat sdf = new SimpleDateFormat(format);
                while (running) {
                    String msg = sdf.format(new Date(System.currentTimeMillis())) + ", this is from MySourceProducer";
                    sourceContext.collect(msg);
                    Thread.sleep(3000);
                }
            }

            @Override
            public void cancel() {
                running = false;
            }
        }
    }
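A couple of notes on the sink: the three-argument FlinkKafkaProducer constructor used here serializes each record as a plain UTF-8 string via SimpleStringSchema and, for this universal Kafka connector, defaults to at-least-once delivery; setWriteTimestampToKafka(true) additionally writes Flink's record timestamps into the Kafka messages.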

Running

1. Start ZooKeeper

    Windows 10:
    .\bin\zkServer.cmd
    CentOS:
    ./bin/zkServer.sh start
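Optionally, confirm ZooKeeper is actually listening on its default port 2181 before moving on; the commands below assume the stock Windows/CentOS tooling:

    // Windows: check that something is listening on 2181
    netstat -ano | findstr 2181
    // CentOS: ask ZooKeeper itself
    ./bin/zkServer.sh status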

2. Kafka

    // Start the Kafka broker first (leave this window running)
    cd D:\YDGreenNew\kafka_2.12-2.2.0
    bin\windows\kafka-server-start.bat .\config\server.properties

    // Then, in a second terminal, create the Kafka topic
    bin\windows\kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic flink-test
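To verify the topic exists, the same tool can list topics:

    bin\windows\kafka-topics.bat --list --bootstrap-server localhost:9092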

3. Run the code in IDEA

4. Watch the Kafka consumer output

    // Start a console consumer to observe the results
    .\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic flink-test --from-beginning
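If everything is wired up, the consumer prints one message roughly every three seconds in the format produced by MyFlinkProducer (timestamps below are illustrative):

    2021-01-29 13:16:00.123, this is from MySourceProducer
    2021-01-29 13:16:03.125, this is from MySourceProducer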

