Flink DataSet API (一) Data Source

水深无声 2023-05-31 07:22 139阅读 0赞

一、DataStream 和 DataSet

Flink用DataStream 表示无界数据集,用DataSet表示有界数据集,前者用于流处理应用程序,后者用于批处理应用程序。从操作形式上看,DataStream 和 DataSet 与集合 Collection 有些相似,但两者有着本质的区别:

(1)DataStream 和 DataSet 是不可变的数据集合,因此不可以想操作集合那样增加或者删除 DataStream 和 DataSet 中的元素,也不可以通过诸如下标等方式访问某个元素。

(2)Flink 应用程序通过 Source 创建 DataStream 对象和 DataSet 对象,通过转换操作产生新的 DataStream 对象和 DataSet 对象。

运行时是应用程序被调度执行时的上下文环境,通过StreamExecutionEnvironment或ExecutionEnvironment方法会根据当前环境自动选择本地或者集群运行时环境。

flink在批处理中常见的source主要有两大类。

  1. 1.基于本地集合的sourceCollection-based-source
  2. 2.基于文件的sourceFile-based-source

在flink最常见的创建DataSet方式有三种。

  1. 1.使用env.fromElements(),这种方式也支持Tuple,自定义对象等复合形式。
  2. 2.使用env.fromCollection(),这种方式支持多种Collection的具体类型
  3. 3.使用env.generateSequence()方法创建基于SequenceDataSet

1、基于本地集合的

  1. package datasetapi.sources
  2. import org.apache.flink.api.scala.ExecutionEnvironment
  3. import scala.collection.mutable
  4. import scala.collection.mutable.{ArrayBuffer, ListBuffer}
  5. /**
  6. * \* Created with IntelliJ IDEA.
  7. * \* User: sunxianpeng
  8. * \* Date: 2019/10/23
  9. * \* Time: 20:04
  10. * \* To change this template use File | Settings | File Templates.
  11. * \* Description:
  12. * \*/
  13. object SourceTest {
  14. import org.apache.flink.api.scala.extensions._
  15. import org.apache.flink.api.scala._
  16. import org.apache.flink.streaming.api.scala.extensions._
  17. def main(args: Array[String]): Unit = {
  18. val env = ExecutionEnvironment.getExecutionEnvironment
  19. //0.用element创建DataSet(fromElements)
  20. val ds0: DataSet[String] = env.fromElements("spark", "flink")
  21. ds0.print()
  22. //1.用Tuple创建DataSet(fromElements)
  23. val ds1: DataSet[(Int, String)] = env.fromElements((1, "spark"), (2, "flink"))
  24. ds1.print()
  25. //2.用Array创建DataSet
  26. val ds2: DataSet[String] = env.fromCollection(Array("spark", "flink"))
  27. ds2.print()
  28. //3.用ArrayBuffer创建DataSet
  29. val ds3: DataSet[String] = env.fromCollection(ArrayBuffer("spark", "flink"))
  30. ds3.print()
  31. //4.用List创建DataSet
  32. val ds4: DataSet[String] = env.fromCollection(List("spark", "flink"))
  33. ds4.print()
  34. //5.用List创建DataSet
  35. val ds5: DataSet[String] = env.fromCollection(ListBuffer("spark", "flink"))
  36. ds5.print()
  37. //6.用Vector创建DataSet
  38. val ds6: DataSet[String] = env.fromCollection(Vector("spark", "flink"))
  39. ds6.print()
  40. //7.用Queue创建DataSet
  41. val ds7: DataSet[String] = env.fromCollection(mutable.Queue("spark", "flink"))
  42. ds7.print()
  43. //8.用Stack创建DataSet
  44. val ds8: DataSet[String] = env.fromCollection(mutable.Stack("spark", "flink"))
  45. ds8.print()
  46. //9.用Stream创建DataSet(Stream相当于lazy List,避免在中间过程中生成不必要的集合)
  47. val ds9: DataSet[String] = env.fromCollection(Stream("spark", "flink"))
  48. ds9.print()
  49. //10.用Seq创建DataSet
  50. val ds10: DataSet[String] = env.fromCollection(Seq("spark", "flink"))
  51. ds10.print()
  52. //11.用Set创建DataSet
  53. val ds11: DataSet[String] = env.fromCollection(Set("spark", "flink"))
  54. ds11.print()
  55. //12.用Iterable创建DataSet
  56. val ds12: DataSet[String] = env.fromCollection(Iterable("spark", "flink"))
  57. ds12.print()
  58. //13.用ArraySeq创建DataSet
  59. val ds13: DataSet[String] = env.fromCollection(mutable.ArraySeq("spark", "flink"))
  60. ds13.print()
  61. //14.用ArrayStack创建DataSet
  62. val ds14: DataSet[String] = env.fromCollection(mutable.ArrayStack("spark", "flink"))
  63. ds14.print()
  64. //15.用Map创建DataSet
  65. val ds15: DataSet[(Int, String)] = env.fromCollection(Map(1 -> "spark", 2 -> "flink"))
  66. ds15.print()
  67. //16.用Range创建DataSet
  68. val ds16: DataSet[Int] = env.fromCollection(Range(1, 9))
  69. ds16.print()
  70. //17.用fromElements创建DataSet
  71. val ds17: DataSet[Long] = env.generateSequence(1,9)
  72. ds17.print()
  73. }
  74. }

二、基于文件的source(File-based-source)

(1):读取本地文件

  1. //TODO 使用readTextFile读取本地文件
  2. //TODO 初始化环境
  3. val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
  4. //TODO 加载数据
  5. val datas: DataSet[String] = environment.readTextFile("data.txt")
  6. //TODO 指定数据的转化
  7. val flatmap_data: DataSet[String] = datas.flatMap(line => line.split("\\W+"))
  8. val tuple_data: DataSet[(String, Int)] = flatmap_data.map(line => (line , 1))
  9. val groupData: GroupedDataSet[(String, Int)] = tuple_data.groupBy(line => line._1)
  10. val result: DataSet[(String, Int)] = groupData.reduce((x, y) => (x._1 , x._2+y._2))
  11. result.print()

(2):读取hdfs数据

  1. //TODO readTextFile读取hdfs数据
  2. //todo 初始化环境
  3. val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
  4. //TODO 加载数据
  5. val file: DataSet[String] = environment.readTextFile("hdfs://hadoop01:9000/README.txt")
  6. val flatData: DataSet[String] = file.flatMap(line => line.split("\\W+"))
  7. val map_data: DataSet[(String, Int)] = flatData.map(line => (line , 1))
  8. val groupdata: GroupedDataSet[(String, Int)] = map_data.groupBy(line => line._1)
  9. val result_data: DataSet[(String, Int)] = groupdata.reduce((x, y) => (x._1 , x._2+y._2))
  10. result_data.print()

(3):读取CSV数据

  1. //TODO 读取csv数据
  2. val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
  3. val path = "data2.csv"
  4. val ds3 = environment.readCsvFile[(String, String, String, String,String,Int,Int,Int)](
  5. filePath = path,
  6. lineDelimiter = "\n",
  7. fieldDelimiter = ",",
  8. lenient = false,
  9. ignoreFirstLine = true,
  10. includedFields = Array(0, 1, 2, 3 , 4 , 5 , 6 , 7))
  11. val first = ds3.groupBy(0 , 1).first(50)
  12. first.print()

三、基于文件的source(遍历目录)

flink支持对一个文件目录内的所有文件,包括所有子目录中的所有文件的遍历访问方式。

对于从文件中读取数据,当读取的数个文件夹的时候,嵌套的文件默认是不会被读取的,只会读取第一个文件,其他的都会被忽略。所以我们需要使用 recursive.file.enumeration 进行递归读取

  1. val env = ExecutionEnvironment.getExecutionEnvironment
  2. val parameters = new Configuration
  3. // recursive.file.enumeration 开启递归
  4. parameters.setBoolean("recursive.file.enumeration", true)
  5. val ds1 = env.readTextFile("test").withParameters(parameters)
  6. ds1.print()

四、读取压缩文件

对于以下压缩类型,不需要指定任何额外的 inputformat 方法,flink可以自动识别并且解压。但是,压缩文件可能不会并行读取,可能是顺序读取的,这样可能会影响作业的可伸缩性。

  1. //TODO 读取压缩文件
  2. val env = ExecutionEnvironment.getExecutionEnvironment
  3. val file = env.readTextFile("test/data1/zookeeper.out.gz").print()
  4. tar -czvf ***.tar.gz

发表评论

表情:
评论列表 (有 0 条评论,139人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Flink DataSet API编程指南

    Flink中的DataSet程序是实现数据集转换的常规程序(例如,过滤,映射,连接,分组)。数据集最初是从某些来源创建的(例如,通过读取文件或从本地集合创建)。结果通过接收器返