Flink 系列三 Flink 实战

爱被打了一巴掌 2024-03-17 18:02 233阅读 0赞

目录

编辑

前言

1、安装flink环境

2、在idea中创建flink的第一个demo

2.1、执行如下maven命令

2.2、填写’groupId’、’artifactId’、’version’、’package’

2.3、选择Yes即可生成创建好的工程

3、开发第一个flink程序

3.1、开发一个简单的统计程序

3.2、直接编译得到jar包

4、启动环境

4.1、启动已经下载好的flink环境

4.2、创建一个服务端的Tcp 监听

4.3、打开计算日志

4.4、在建立nc监听端口中输入text

4.5、在输出日志中就有统计


在这里插入图片描述

前言

Flink 做为一款流式计算框架,它可用来做批处理,即处理静态的数据集、历史的数据集;也可以用来做流处理,即实时的处理些实时数据流,实时的产生数据流结果,只要数据源源不断的过来,Flink 就能够一直计算下去。详细的介绍我本篇就不做阐述了,感兴趣的同学可以回复往期的文章:Flink 系列二 Flink 状态化流处理概述,Flink 系列一 开发机 安装。本篇作为 Flink 系列的第三篇,咱们尝试在本地安装和实操 Flink。

首先需要在你的本地环境安装apache-flink,执行如下命令即可,若采用docker安装更加方便。

  1. brew install apache-flink

2.1、执行如下maven命令

执行如下命令创建工程。这个命令的作用是使用Maven构建一个基于Apache Flink的Java快速启动项目模板。执行完后会下载对应的依赖包。

  1. mvn archetype:generate \
  2. -DarchetypeGroupId=org.apache.flink \
  3. -DarchetypeArtifactId=flink-quickstart-java \
  4. -DarchetypeVersion=1.8.0 \
  5. -DarchetypeCatalog=local

解释一下具体含义:

  • mvn是Maven的命令行工具。
  • archetype:generate表示使用原型机模板生成一个新项目。
  • -DarchetypeGroupId指定了项目模板的组ID,即Apache Flink团队为项目提供的默认模板组ID。
  • -DarchetypeArtifactId指定了项目模板的Artifact ID,即Apache Flink团队为项目提供的默认模板Artifact ID。
  • -DarchetypeVersion指定了项目模板的版本号。
  • -DarchetypeCatalog指定了本地的模板目录。
  • 反斜杠(\)是命令的折行符,它表示这个命令是连续的,但是出于格式上的考虑需要分成多行。

86e54aa357384674bb9307852b624d47.png

2.2、填写’groupId’、’artifactId’、’version’、’package’

  1. Define value for property 'groupId': com.lly.flink.java
  2. Define value for property 'artifactId': flink-traning
  3. Define value for property 'version' 1.0-SNAPSHOT: : 1.0.0
  4. Define value for property 'package' com.lly.flink.java: :
  5. Confirm properties configuration:
  6. groupId: com.lly.flink.java
  7. artifactId: flink-traning
  8. version: 1.0.0
  9. package: com.lly.flink.java

2.3、选择Yes即可生成创建好的工程

特别注意,这里一定要选择 “Y”,保证项目顺利生产。

  1. Y: : Y
  2. [INFO] ----------------------------------------------------------------------------
  3. [INFO] Using following parameters for creating project from Archetype: flink-quickstart-java:1.8.0
  4. [INFO] ----------------------------------------------------------------------------
  5. [INFO] Parameter: groupId, Value: com.lly.flink.java
  6. [INFO] Parameter: artifactId, Value: flink-traning
  7. [INFO] Parameter: version, Value: 1.0.0
  8. [INFO] Parameter: package, Value: com.lly.flink.java
  9. [INFO] Parameter: packageInPathFormat, Value: com/lly/flink/java
  10. [INFO] Parameter: package, Value: com.lly.flink.java
  11. [INFO] Parameter: version, Value: 1.0.0
  12. [INFO] Parameter: groupId, Value: com.lly.flink.java
  13. [INFO] Parameter: artifactId, Value: flink-traning
  14. [WARNING] CP Don't override file /Users/liluyang/flink-traning/src/main/resources
  15. [INFO] Project created from Archetype in dir: /Users/liluyang/flink-traning
  16. [INFO] ------------------------------------------------------------------------
  17. [INFO] BUILD SUCCESS
  18. [INFO] ------------------------------------------------------------------------
  19. [INFO] Total time: 01:17 min
  20. [INFO] Finished at: 2020-11-05T12:42:42+08:00
  21. [INFO] ------------------------------------------------------------------------

3.1、开发一个简单的统计程序

  1. package com.lly.flink.java;
  2. import org.apache.flink.api.common.functions.FlatMapFunction;
  3. import org.apache.flink.api.java.tuple.Tuple2;
  4. import org.apache.flink.streaming.api.datastream.DataStreamSource;
  5. import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  6. import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  7. import org.apache.flink.util.Collector;
  8. /**
  9. * @author lly
  10. * @date 2020-11-05
  11. **/
  12. public class SocketTextStreamWordCount {
  13. public static void main(String[] args) throws Exception {
  14. //参数检查
  15. if (args.length != 2) {
  16. System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>");
  17. return;
  18. }
  19. String hostname = args[0];
  20. Integer port = Integer.parseInt(args[1]);
  21. // set up the streaming execution environment
  22. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  23. //获取数据
  24. DataStreamSource<String> stream = env.socketTextStream(hostname, port);
  25. //计数
  26. SingleOutputStreamOperator<Tuple2<String, Integer>> sum = stream.flatMap(new LineSplitter())
  27. .keyBy(0)
  28. .sum(1);
  29. sum.print();
  30. env.execute("Java WordCount from SocketTextStream Example");
  31. }
  32. public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
  33. @Override
  34. public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {
  35. String[] tokens = s.toLowerCase().split("\\W+");
  36. for (String token : tokens) {
  37. if (token.length() > 0) {
  38. collector.collect(new Tuple2<String, Integer>(token, 1));
  39. }
  40. }
  41. }
  42. }
  43. }

我这里简单解释一些这段代码,希望刚开始学习的同学可以理解的更深刻。这段代码使用 Flink 实现了通过网络流读取数据,统计单词出现次数的功能。具体实现细节如下:

  1. 声明一个 SocketTextStreamWordCount 类,定义该类的 main 方法作为程序的入口。
  2. main 方法中,首先对传入的命令行参数进行检查,如果参数个数不为 2,则输出使用说明后直接返回。
  3. 然后获取主机地址和端口号,用于后续建立套接字连接。
  4. 接着创建 Flink 流处理的环境对象 StreamExecutionEnvironment,用于设置执行环境和创建数据流。
  5. 调用 socketTextStream 方法,获取一个 DataStreamSource<String> 对象,用于从套接字连接中获取数据流。
  6. 对获取的数据流进行 flatMap 操作,使用 LineSplitter 类作为转换器将每行文本数据切分成单词,并将单词转化为 "单词,1" 的元组格式,用于后续统计。
  7. 对转换后的数据流使用 keyBy 方法,按照第一个字段(即单词)进行分组。
  8. 对分组后的数据使用 sum 方法,对第二个字段(即出现次数)进行求和,返回一个 SingleOutputStreamOperator<Tuple2<String, Integer>> 类型的结果流。
  9. 最后调用 print 方法,将结果打印输出到控制台。
  10. 最后调用 execute 方法,传入一个字符串 “Java WordCount from SocketTextStream Example” 作为任务名称,开始执行整个 Flink 应用程序。
  11. 声明了一个静态内部类 LineSplitter,实现了 Flink 的 FlatMapFunction 接口,并重写了 flatMap 方法。该方法将输入的文本行按照非单词字符(如空格、逗号等)进行切分,并将每个单词转化为一个元组,其中第一个字段为单词,第二个字段为 1,表示该单词出现了 1 次。

3.2、直接编译得到jar包

3b752856b18242579cc600e97ca3336c.png

4、启动环境

flink run -c 业务类包路径 jar包路径 IP 端口 示例

  1. flink run -c 业务类包路径 jar包路径 IP 端口
  2. 示例:
  3. flink run -c com.lly.flink.SocketTextStreamWordCount /Users/liluyang/flink-traning/target/original-flink-traning-1.0.0.jar 127.0.0.1 9000

启动成功之后会生成Job ID

Job has been submitted with JobID b04bad9f4c05efd67344179ee676b513

启动成功之后访问:http://localhost:8081/就可以直接当问flink的的操作后台,操作后台可以直观的看到Job的执行情况和基本的操作

3fb1c6f6efe74e8490b91f92acc8f818.png

4.2、创建一个服务端的Tcp 监听

创建一个server监听并接受链接

nc -l 9000

4.3、打开计算日志

cd /usr/local/Cellar/apache-flink/1.10.0/libexec/log

619b5ff7e8984feb84f19d9b6465e907.png

4.4、在建立nc监听端口中输入text

  1. liluyang@liluyangdeMacBook-Pro ~ % nc -l 9000
  2. cda
  3. cda
  4. dsas
  5. assgasg
  6. nihao
  7. nihao
  8. nihao
  9. nihao
  10. 1
  11. 1
  12. 1
  13. 1
  14. 1
  15. 1
  16. 1

4.5、在输出日志中就有统计

  1. liluyang@liluyangdeMacBook-Pro log % tail -100f flink-liluyang-taskexecutor-0-liluyangdeMacBook-Pro.local.out
  2. (cda,1)
  3. (cda,2)
  4. (dsas,1)
  5. (assgasg,1)
  6. (nihao,1)
  7. (nihao,2)
  8. (nihao,3)
  9. (nihao,4)
  10. (1,1)
  11. (1,2)
  12. (1,3)
  13. (1,4)
  14. (1,5)
  15. (1,6)

至此:Mac 电脑上安装 Flink,并且运行它。接着通过一个简单的 Flink 程序来介绍如何构建及运行Flink 程序。

发表评论

表情:
评论列表 (有 0 条评论,233人围观)

还没有评论,来说两句吧...

相关阅读