Structured Streaming 简介

雨点打透心脏的1/2处 2023-06-19 11:12 165阅读 0赞
  1. Structured Streaming 关键思想

    把数据流视作一张数据不断增加的表,这样用户就可以基于这张表进行数据处理,就好像使用批处理来处理静态数据一样,但实际Spark 底层是把新数据不断地增量添加到这张无界的表的下一行中。
    watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MzIxNTI1MA_size_16_color_FFFFFF_t_70

  2. 示例代码
    1. import org.apache.log4j.{Level, Logger}
    2. import org.apache.spark.SparkConf
    3. import org.apache.spark.sql.streaming.OutputMode
    4. import org.apache.spark.sql.{DataFrame, SparkSession}
    5. /**
    6. * 监听网络端口发来的内容,然后进行 WordCount
    7. */
    8. object StructuredStreamingDemo {
    9. def main(args: Array[String]): Unit = {
    10. Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    11. val conf = new SparkConf()
    12. .setIfMissing("spark.master", "local[4]")
    13. .setAppName("Structured Network Count")
    14. .set("fs.defaultFS","file://D:/temp/defaultFS/")
    15. // 创建程序入口 SparkSession,并引入 spark.implicits 来允许 Scalaobject 隐式转换为 DataFrame
    16. val spark: SparkSession = SparkSession.builder.config(conf).getOrCreate()
    17. import spark.implicits._
    18. // 第二步: 创建流。配置从 socket 读取流数据,地址和端口为 localhost: 9999
    19. val lines: DataFrame = spark.readStream.format("socket")
    20. .option("host", "192.168.1.101")
    21. .option("port", "9999")
    22. .load()
    23. // 第三步: 进行单词统计。这里 lines 是 DataFrame ,使用 as[String]给它定义类型转换为 Dataset, 之后在 Dataset 里进行单词统计。
    24. val words: Dataset[String] = lines.as[String].flatMap(_.split(" "))
    25. val wordcount: DataFrame = words.groupBy("value").count()
    26. // 第四步: 创建查询句柄,定义打印结果方式并启动程序 这里使用 writeStream 方法, 输出模式为全部输出到控制台。
    27. val query: StreamingQuery = wordcount.writeStream
    28. .outputMode(OutputMode.Complete)
    29. .format("console")
    30. .start()
    31. // 调用 awaitTermination 方法来防止程序在处理数据时停止
    32. query.awaitTermination()
    33. }
    34. }
  3. 运行结果
    1. ...
    2. Connected to the target VM, address: '127.0.0.1:61600', transport: 'socket'
    3. Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    4. 19/12/06 15:52:13 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.
    5. -------------------------------------------
    6. Batch: 0
    7. -------------------------------------------
    8. +------+-----+
    9. | value|count|
    10. +------+-----+
    11. |apache| 1|
    12. | spark| 1|
    13. +------+-----+
    14. -------------------------------------------
    15. Batch: 1
    16. -------------------------------------------
    17. +------+-----+
    18. | value|count|
    19. +------+-----+
    20. |apache| 2|
    21. | spark| 1|
    22. |hadoop| 1|
    23. +------+-----+
    24. ...
  4. 遇到错误及解决

    错误日志:

    1. Connected to the target VM, address: '127.0.0.1:64189', transport: 'socket'
    2. Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    3. 19/12/06 10:36:54 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.
    4. Exception in thread "main" java.lang.IllegalArgumentException: Pathname /C:/Users/admin/AppData/Local/Temp/temporary-58e0d2c8-c72e-4f8d-8670-c0931c2f5bfe/offsets from C:/Users/admin/AppData/Local/Temp/temporary-58e0d2c8-c72e-4f8d-8670-c0931c2f5bfe/offsets is not a valid DFS filename.
    5. at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:196)
    6. at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:105)
    7. at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1118)
    8. at org.apache.hadoop.hdfs.DistributedFileSystem$18.doCall(DistributedFileSystem.java:1114)
    9. at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    10. at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1114)
    11. at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1400)
    12. at org.apache.spark.sql.streaming.StreamingQueryManager.createQuery(StreamingQueryManager.scala:221)
    13. at org.apache.spark.sql.streaming.StreamingQueryManager.startQuery(StreamingQueryManager.scala:278)
    14. at org.apache.spark.sql.streaming.DataStreamWriter.start(DataStreamWriter.scala:282)
    15. at com.cloudera.StructuredStreamingDemo$.main(StructuredStreamingDemo.scala:40)
    16. at com.cloudera.StructuredStreamingDemo.main(StructuredStreamingDemo.scala)
    17. Disconnected from the target VM, address: '127.0.0.1:64189', transport: 'socket'
    18. Process finished with exit code 1

    解决办法:

    1. 去掉 core-site.xml 配置文件或注释掉该文件中的 fs.defaultFS 配置

      1. <property>
      2. <name>fs.defaultFS</name>
      3. <value>hdfs://cdh01:8020</value>
      4. </property>
    2. 代码中添加 set("fs.defaultFS","file://D:/temp/defaultFS/")
  5. Structured Streaming 输出模式
    1. CompleteMode 完整模式

      整个更新的结果表将被写入外部存储器。由存储连接器决定如何处理整张表的写入。聚合操作以及聚合之后的排序操作支持这种模式。

      上面示例中使用的是 CompleteMode ,程序中接收数据的输入表是 lines ,它是DataFrame ,新来的数据会被添加进去。之后的 wordCounts 是结果表。当程序启动时, Spark 会不断检测是否有新数据加入到 lines 中,如果有新数据,则运行 个增量的查询,与上一次查询的结果合井,并且更新结果表。

      watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MzIxNTI1MA_size_16_color_FFFFFF_t_70 1

      在上面的 CompleteMode 下, Spark 因为只在有新数据进来的时候才会更新结果,所以帮用户解决了容错和数据一致性的问题。如 at-most-once, at-least-once, exactly-once 问题。

    2. AppendMode 附加模式

      只有自上次触发执行后在结果表中附加的新行会被写入外部存储器。这仅适用于结果表中 的现有行不会更改的查询,如 select、 where、 map、 flatMap、 filter、 join 等操作支持这种模式。

    3. UpdateMode 更新模式

      自Spark 2.1.1起可用。只有自上次触发执行后在结果表中更新的行将被写入外部存储器(不输出未更改的行)。

      注意: 与完成模式的不同之处在于此模式仅输出自上次触发以来已更改的行。如果查询不包含聚合,它将等同于追加模式。

  6. Event-time 和 Late Data

    event-time 是嵌入事件本身的时间,记录了事件发生的时间。很多时候我们需要用这个时间来实现业务逻辑,例如,我们要获取 IOT 设备每分钟产生的事件数量,则可能需要使用生成数据的时间(即数据中的 event-time ),而不是 Spark 接收的时间。在这个模式下, event-time 作为每行数据中的一列,可以用于基于时间窗口的聚合(例如,每分钟的事件数),看成是 event-time 列的一种特殊的分组和聚合的特殊类型——每个时间窗口是一个组,每行可以属于多个窗口/组。

  7. 容错语义

    关于容错方面,提供端到端的 exactly-once 语义是 Structured Streaming 主要设计目标之一,为实现(exactly-once),设计了结构化流源(Structured Streaming sources)、执行引擎 (execution) 和存储 (sinks) 3个方面来可靠地跟踪处理的确切进度,以便可以通过重新启动或重新处理来处理任何类型的故障。

    Structured Streaming 是这样实现的: 假定每个数据源都有偏移量(类似于 kafka 的 offset 或 Kinesis 序列号) 用来追溯跟踪数据在数据流中的位置;在执行引擎中会通过 checkpoint (检查点) 和 WAL (writeaheadlogs 预写日志) 记录每个触发器中正在被处理的数据的偏移量范围在内的程序运行进度信息;在存储层设计成多次处理结果幕等,即处理多次结果相同。这样在任何故障下确保了 Structured Streaming 端到端 exactly-once 的语义一致性。

  8. 事件时间窗口操作

    sliding event-time window 上的聚合对于 Structured Streaming 而言非常简单,类似于分组聚合。在分组聚合中,在用户指定的分组列中为每个唯一值维护聚合值。在基于窗口聚合的情况中,行事件时间所属的每个窗口都会维护聚合值。
    示例:
    我们想每5分钟统计一次10分钟内的单词数。也就是说,在10分钟窗口 12:00-12:10、12:05-12:15、12:10-12:20等之间接收的单词中的单词计数。请注意,12:00-12:10 表示数据12:00之后但12:10之前到达。比如在 12:07 收到的单词。这个单词应该在 12:00-12:10 和 12:05-12:15 两个窗口中都要被统计。因此,计数将通过分组键(即单词)和窗口(可以从事件时间计算)来索引。如图:
    watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MzIxNTI1MA_size_16_color_FFFFFF_t_70 2

    示例代码:

    1. import java.sql.Timestamp
    2. import org.apache.log4j.{Level, Logger}
    3. import org.apache.spark.SparkConf
    4. import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery}
    5. import org.apache.spark.sql.{DataFrame, SparkSession}
    6. object WindowOnEventTime {
    7. def main(args: Array[String]): Unit = {
    8. Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    9. val conf = new SparkConf()
    10. .setIfMissing("spark.master", "local[4]")
    11. .setAppName("Structured Streaming Window-Event-Time")
    12. .set("fs.defaultFS","file://E:/CDHProjectDemo/spark-demo/temp/")
    13. val spark: SparkSession = SparkSession.builder.config(conf).getOrCreate()
    14. import spark.implicits._
    15. val lines: DataFrame = spark.readStream.format("socket")
    16. .option("host", "192.168.1.101")
    17. .option("port", "9999")
    18. //添加时间戳
    19. .option("includeTimestamp", true)
    20. .load()
  1. val words = lines.as[(String, Timestamp)].flatMap(line =>
  2. line._1.split(" ")
  3. .map(word => TimeWord(word, line._2))
  4. ).toDF()
  5. // 假如输入的数据 words 格式是 timestamp: Timestamp. word: String
  6. import org.apache.spark.sql.functions._
  7. val windowedCounts = words.groupBy(
  8. // 设置窗口按照 timestamp 列为参照时间, 20seconds 为窗口大小,10seconds 滑动一次,并且按照 word 进行分组计数
  9. window($"timestamp", "60 seconds", "30 seconds"),
  10. $"word"
  11. ).count
  12. val query: StreamingQuery = windowedCounts.writeStream
  13. .outputMode(OutputMode.Complete)
  14. .format("console")
  15. .option("truncate", "false")
  16. .start()
  17. query.awaitTermination()
  18. }
  19. case class TimeWord(word: String, timestamp: Timestamp)
  20. }
  21. **运行日志:**
  22. Connected to the target VM, address: '127.0.0.1:59350', transport: 'socket'
  23. Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
  24. 19/12/09 13:43:06 WARN TextSocketSourceProvider: The socket source should not be used for production applications! It does not support recovery.
  25. -------------------------------------------
  26. Batch: 0
  27. -------------------------------------------
  28. +---------------------------------------------+----+-----+
  29. |window |word|count|
  30. +---------------------------------------------+----+-----+
  31. |[2019-12-09 13:43:30.0,2019-12-09 13:44:30.0]|dog |3 |
  32. |[2019-12-09 13:43:30.0,2019-12-09 13:44:30.0]|cat |1 |
  33. |[2019-12-09 13:44:00.0,2019-12-09 13:45:00.0]|dog |3 |
  34. |[2019-12-09 13:44:00.0,2019-12-09 13:45:00.0]|cat |1 |
  35. +---------------------------------------------+----+-----+
  36. -------------------------------------------
  37. Batch: 1
  38. -------------------------------------------
  39. +---------------------------------------------+----+-----+
  40. |window |word|count|
  41. +---------------------------------------------+----+-----+
  42. |[2019-12-09 13:44:30.0,2019-12-09 13:45:30.0]|cat |1 |
  43. |[2019-12-09 13:43:30.0,2019-12-09 13:44:30.0]|dog |3 |
  44. |[2019-12-09 13:43:30.0,2019-12-09 13:44:30.0]|cat |1 |
  45. |[2019-12-09 13:44:00.0,2019-12-09 13:45:00.0]|dog |3 |
  46. |[2019-12-09 13:44:30.0,2019-12-09 13:45:30.0]|owl |1 |
  47. |[2019-12-09 13:44:00.0,2019-12-09 13:45:00.0]|cat |2 |
  48. |[2019-12-09 13:44:00.0,2019-12-09 13:45:00.0]|owl |1 |
  49. +---------------------------------------------+----+-----+
  50. -------------------------------------------
  51. Batch: 2
  52. -------------------------------------------
  53. +---------------------------------------------+----+-----+
  54. |window |word|count|
  55. +---------------------------------------------+----+-----+
  56. |[2019-12-09 13:44:30.0,2019-12-09 13:45:30.0]|cat |1 |
  57. |[2019-12-09 13:45:00.0,2019-12-09 13:46:00.0]|dog |1 |
  58. |[2019-12-09 13:44:30.0,2019-12-09 13:45:30.0]|dog |1 |
  59. |[2019-12-09 13:43:30.0,2019-12-09 13:44:30.0]|dog |3 |
  60. |[2019-12-09 13:43:30.0,2019-12-09 13:44:30.0]|cat |1 |
  61. |[2019-12-09 13:44:00.0,2019-12-09 13:45:00.0]|dog |3 |
  62. |[2019-12-09 13:45:00.0,2019-12-09 13:46:00.0]|owl |1 |
  63. |[2019-12-09 13:44:30.0,2019-12-09 13:45:30.0]|owl |2 |
  64. |[2019-12-09 13:44:00.0,2019-12-09 13:45:00.0]|cat |2 |
  65. |[2019-12-09 13:44:00.0,2019-12-09 13:45:00.0]|owl |1 |
  66. +---------------------------------------------+----+-----+
  1. 处理延迟的数据和 Watermarking

    考虑假如消息到达应用延迟的情况。例如,假如一个单词是在 12:04(即事件时间) 产生,但是在 12:11 被接收到。应用程序应使用的时间是 12:04 而不是 12:11 去更新 12:00-12:10 这个窗口。这在基于窗口的分组中很自然地发生- Structured Streaming 可以长时间维持部分聚合的中间状态,以便后期数据可以正确更新旧窗口的聚合,如下所示。

    watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MzIxNTI1MA_size_16_color_FFFFFF_t_70 3

    但是,为了持续几天运行这个查询,系统必须限制其累积的中间内存状态的数量。这意味着系统需要知道何时可以从内存中状态删除旧的聚合,因为应用程序将不再该聚合接收到较晚的数据。为了实现这一点,在 Spark2.1 中引入了 Watermarking 功能,该功能让引擎自动跟踪数据中的当前 event-time ,并尝试相应地清除旧状态。您可以通过指定事件时间列和有关事件时间期望数据延迟的阈值来定义查询的 watermarking。对于在时间T结束的特定窗口,引擎将维持状态并允许延迟数据更新状态,直到(最大事件时间-延迟阈值>T)。换句话说,阈值内的延迟数据将被汇总,但是比阈值晚的数据将被丢弃。让我们通过一个例子来理解这一点。我们可以使用 withWatermark() 轻松定义上一个例子中的watermarking。

    1. import spark.implicits._
    2. val windowedCounts = words
    3. .withWatermark("timestamp", "10 minutes")
    4. .groupBy(
    5. // 设置窗口按照 timestamp 列为参照时间, 10minutes 为窗口大小, 5minutes 滑动一次,并且按照 word 进行分组计数
    6. window($"timestamp", "10 minutes", "5 minutes"),$"word"
    7. ).count

    Update 模式:

    在触发计算时它依然高于Watermark 12:04,
    如图所示,引擎跟踪的最大事件时间是蓝色虚线,在每次触发开始时设置的watermark(计算方法是运算截止到触发点时收到的数据最大的 event-time 减去 latethreshold ,也就是减去 10 )。当水印时间小于窗口的结束时间时,计算的数据都被保留为中间数据,当水印时间大于窗口结束时间时,就把这个窗口的运算结果加入到结果表中去,之后即使再收到属于这个窗口的数据,也不再进行计算,而直接忽略掉。
    例如,当引擎观察到数据时 (12:14, dog),它将下一个触发器的watermark设置为12:04。watermark 可让引擎再保持10分钟的中间状态,以便对迟到的数据进行统计。数据(12:09, cat)不正确且延迟,落在窗口12:00 - 12:10和中12:05 - 12:15。由于在触发计算时它仍在Watermark 12:04 之前,因此引擎仍将中间计数保持为状态,并正确更新相关窗口的计数。但是,当 watermark 更新为12:11时,(12:00 - 12:10)窗口的中间状态会被清除,并且所有后续数据 (例如(12:04, donkey))都被认为太迟而被忽略。请注意,按照更新模式规定,在每次触发之后,更新的计数(即紫色行)将写入到接收器中。
    注意:

    1. 某些接收器(例如文件)可能不支持更新模式所需的细粒度更新。
    2. withWatermark在非流数据集上使用是无效的。由于watermark不应以任何方式影响任何批量查询,因此将会直接忽略 watermark。

    watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MzIxNTI1MA_size_16_color_FFFFFF_t_70 4

    Append 模式:
    仅将最终计数写入接收器。与之前的 Update 模式基本类似,引擎为每个窗口维护中间计数。但是,部分计数不会更新到结果表,也不会写入接收器 sink。引擎等待“10分钟”来计算延迟日期,然后将窗口 < watermark的中间状态丢弃,并将最终计数附加到结果表/接收器。例如,只有在将watermark 更新为12:11之后,窗口12:00 - 12:10的最终计数才 append 到结果表中。。
    watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MzIxNTI1MA_size_16_color_FFFFFF_t_70 5

    watermark 清除聚合状态的条件

    必须满足以下条件

    1. 输出模式必须是 Append 或者 UpdateComplete 模式要求保留所有聚合数据,因此不能使用 watermark 来中断状态。
    2. 聚合必须具有 event-time 或 event-time 窗口。
    3. withWatermark 必须在与聚合中使用的时间戳列相同的列上调用。例如:df.withWatermark(“time”, “1 min”).groupBy(“time2”).count() 是在Append模式下是无效的,因为 watermark 定义的列和聚合的列不一致。
    4. withWatermark 必须在使用水印详细信息的聚合之前调用。例如,df.groupBy(“time”).count().withWatermark(“time”,“1 min”) 在 Append 模式下无效。

参考: http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

发表评论

表情:
评论列表 (有 0 条评论,165人围观)

还没有评论,来说两句吧...

相关阅读