Slipstream中的Window Stream(窗口流)

桃扇骨 2021-09-18 01:12 555阅读 0赞
  1. Window StreamDerived Stream的一种,对一个Stream进行窗口变形需要两个重要的参数:LengthSlideLength是窗口的持续时间,Slide则是执行窗口操作的时间间隔。

1 流处理的窗口以及窗口的切分方式

  1. 流处理的窗口有两种,分别是滑动窗口和跳动窗口。
  2. 滑动窗口需要由两个量来定义:窗口长度(LENGTH)和滑动间隔(SLIDE)。滑动窗口是指按照一定的SLIDE向未来滑动的长度为LENGTH 的窗口。相邻两个窗口之间可能会有重叠的部分。例如:如果窗口长度为2s,滑动间隔为1s,那么第一个窗口为\[0s, 2s),第二个窗口为\[1s, 3s),第三个窗口为\[2s, 4s),以此类推。
  3. 当窗口间隔和滑动间隔相同,滑动窗口就退化为跳动窗口。换句话说,跳动窗口就是滑动窗口LENGTH等于SLIDE的特例。所以跳动窗口只需要一个时间长度(INTERVAL)即可定义,它既是窗口长度也是滑动间隔。例如:INTERVAL2s跳动窗口第一个区间为\[0s, 2s),第二个区间为\[2s, 4s),第三个区间为\[4s, 6s),以此类推。
  4. Slipstream中窗口的切分方式也分为两种,分别是按照系统时间切分和按照事件时间切分。
  5. 按照系统时间切分即为以流处理引擎的时间为基准切分窗口。
  6. 按照事件时间切分即为将数据中的某个字段作为时间字段切分窗口。
  7. Slipstream中,滑动窗口和跳动窗口都可以按照系统时间和事件时间进行切分。

1.1 系统时间切分滑动窗口

  1. 首先创建一个输入流用于接收Kafka传来的数据:
  2. CREATE STREAM s1(id INT, name STRING)
  3. ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  4. TBLPROPERTIES("topic"="yxy",
  5. "kafka.zookeeper"="node3:2181,node2:2181,node1:2181",
  6. "kafka.broker.list"="node3:9092,node2:9092,node1:9092",
  7. "transwarp.consumer.security.protocol"="SASL_PLAINTEXT",
  8. "transwarp.consumer.sasl.mechanism"="GSSAPI",
  9. "transwarp.consumer.sasl.kerberos.service.name"="kafka",
  10. "transwarp.consumer.sasl.jaas.config"="com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/etc/slipstream1/conf/kafka.keytab\" principal=\"kafka@TDH\""
  11. );
  12. 创建一个接收流数据的表:
  13. CREATE TABLE t1 (id INT, name STRING);
  14. 触发一个窗口大小为2秒,滑动间隔为1秒的滑动窗口:
  15. INSERT INTO t1 SELECT * FROM s1 STREAMWINDOW w1 AS (LENGTH '2' SECOND SLIDE '1' SECOND);

20190212135620505.png

  1. 使用Kafka生产数据,数据每隔1秒查询2秒之内的数据并插入到t1中,查询t1表可得:

20190212135233650.png

20190212135656558.png

20190212135233642.png

1.2 事件时间切分滑动窗口

  1. 创建一个输入流用于接收Kafka传来的数据:
  2. CREATE STREAM s2(id INT, name STRING, ts TIMESTAMP)
  3. ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  4. TBLPROPERTIES("topic"="yxy",
  5. "kafka.zookeeper"="node3:2181,node2:2181,node1:2181",
  6. "kafka.broker.list"="node3:9092,node2:9092,node1:9092",
  7. "transwarp.consumer.security.protocol"="SASL_PLAINTEXT",
  8. "transwarp.consumer.sasl.mechanism"="GSSAPI",
  9. "transwarp.consumer.sasl.kerberos.service.name"="kafka",
  10. "transwarp.consumer.sasl.jaas.config"="com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab=\"/etc/slipstream1/conf/kafka.keytab\" principal=\"kafka@TDH\""
  11. );
  12. 创建一个接收流数据的表:
  13. CREATE TABLE t2 (id INT, name STRING, ts TIMESTAMP);
  14. 触发一个窗口大小为2秒,滑动间隔为1秒的滑动窗口,使用SEPARATED BY指定切分时间的字段:
  15. INSERT INTO t2 SELECT * FROM s2 STREAMWINDOW w1 AS (SEPARATED BY ts LENGTH '2' SECOND SLIDE '1' SECOND);
  16. 使用Kafka生产数据,数据每隔1秒查询2秒之内的数据并插入到t2中。

1.3 系统时间切分跳动窗口

  1. 用相同的方法创建一个输入流(s3)并建立对应的表(t3),使用如下命令触发一个窗口大小和时间间隔均为2秒的跳动窗口:
  2. INSERT INTO t3 SELECT * FROM s3 STREAMWINDOW w1 AS (INTERVAL '2' SECOND);
  3. 跳动窗口以INTERVAL指定窗口长度以及窗口间隔。上述语句的触发流,以2秒为间隔的跳动时间窗口对它进行查询,窗口切分的依据是系统时间。

1.4 事件时间切分跳动窗口

  1. 事件时间切分跳动窗口与系统时间切分跳动窗口的不同之处在于触发流的时候,事件时间切分跳动窗口的触发流语句如下:
  2. INSERT INTO t4 SELECT * FROM s4 STREAMWINDOW w1 AS (SEPARATED BY ts INTERVAL '2' SECOND);
  3. 上述语句以2秒为间隔的跳动时间窗口对它进行查询,窗口切分的依据是s4中的字段ts

发表评论

表情:
评论列表 (有 0 条评论,555人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Slipstream高可用(HA)

           一个Application或者一个StreamJob,如果上游的流发生故障(例如意外退出)无法及时恢复,可能会导致整个系统的瘫痪。因此,流处理系统的高可用性显得尤