Flink教程(6)-Flink Window 详解

「爱情、让人受尽委屈。」 2022-09-08 10:29 399阅读 0赞

文章目录

      • Flink Window
        • window原理与分类
          • 基于时间的Time Window
          • 基于计数的Count Window
        • window api
            • 1)滚动时间窗口
            • 2)滑动时间窗口
            • 3)会话窗口
            • 4)滚动计数窗口
            • 5)滑动计数窗口
            • 6)全量窗口
            • 7)简写方式
            • 8)WindowAssigner
            • 9)开窗函数(window function)
            • 10)案例:自定义数据源,每10秒钟统计一下窗口内所有输入数据的最大值
      • Flink中的时间语义

实际应用中真实的流数据都是无界的,即源源不断地传入。那么如何处理这种无界的数据呢?从微观的角度出发,我们可以把无限的流数据进行切分,得到一个个局部有限数据集。比如1分钟内的数据、1小时内的数据、1天的数据;前5个数据、前10个数据、前100个数据。这种将无限流划分为有限流的方式在flink中称之为window。Flink通过window机制将无限的流数据分发到具有有限大小的bucket(桶)中进行处理。

window原理与分类

基于时间的Time Window
  • 滚动时间窗口(tumbling windows)

    按照固定的时间间隔对数据进行切分。时间对齐,窗口跨度固定,没有重叠。

    比如下图中的窗口划分,每隔1分钟统计一次这1分钟内的数据之和。第一个窗口[1min,2min)期间,user1这条数据流的和3、第二个窗口[2min,3min)和为4、第三个窗口[3min,4min)和为5,第四个窗口[4min,5min)和为24。

在这里插入图片描述

  • 滑动时间窗口(sliding windows)

    滑动窗口可以看作是固定窗口的更一般化形式。滑动窗口由固定的窗口长度和滑动间隔构成,窗口长度固定,窗口可以重叠。

    比如下图中的窗口划分,每隔半分钟(30秒)统计一次1分钟内的数据之和。第一个窗口[1min,2min)期间,user1这条数据流的和3、第二个窗口[1.5min,2.5min)和为3、第三个窗口[2min,3min)和为4,第四个窗口[2.5min,3.5min)和为3。

在这里插入图片描述

  • 会话窗口(session windows)

    类似web应用的session。由一系列事件加上一个指定长度的timeout间隔组成,比如指定的一段时间内没有收到新数据生成一个新的窗口。比如下图中的窗口划分,每隔10秒没有新的数据到达则开启一个会话窗口统计这段时间内数据之和。user1数据流第一个会话窗口内数据和为10、第二个会话窗口内数据和为14;user2数据流第一个会话窗口内数据和为27。
    在这里插入图片描述

滚动窗口前闭后开,滑动的步子大一点,等于一个窗口大小,就退化为滚动窗口。

基于计数的Count Window
  • 滚动计数窗口

    ​ 按照固定的元素个数(事件数量)对数据进行切分。窗口元素个数跨度固定,没有重叠。

    比如下图中,数据流中每到达3个数据统计一次和。user1数据流第一个窗口内的数据和为10、第二个窗口内数据和为17。user2数据流第一个窗口内数据和为10、第二个窗口和为14。

在这里插入图片描述

  • 滑动计数窗口

    ​ 由固定的窗口长度(元素个数)和滑动个数隔构成,窗口长度固定,可以重叠。

    比如下图中,每隔2个数据统计一次3个数之和。user1数据流第一个窗口内的数据之和为5、第二个窗口内数据和为15、第三个窗口内数据和为17。

在这里插入图片描述

3)Global Window 和 Keyed Window

在使用窗口计算时,Flink根据上游数据集是否为KeyedStream类型,对应的Window类型也会有所不同。

  • Keyed Window

    上游数据集如果是KeyedStream类型,即keyBy之后的数据,则调用DataStream API 的Window()方法时数据会根据Key在不同的Task中分别聚合,最后得出窗口内针对每个Key分别统计的结果。

  • Global Window
    如果上游数据集是Non-Keyed类型,即没有keyBy操作,则调用WindowsAll()方法,所有的数据都会在同一个窗口中汇到一个Task中计算,并得出窗口内的全局统计结果。
  • 举例:

    // Global Window
    dataStream.windowAll(自定义的WindowAssigner)
    //KeyedWindow
    dataStream.keyBy(_.filed).window(自定义的WindowAssigner)

window api

前文讲述了几种window的分类和概念。本节展示在flink应用程序中如何使用window。

首先要引入flink的依赖包

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-java</artifactId>
  4. <version>1.11.2</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.flink</groupId>
  8. <artifactId>flink-streaming-java_2.12</artifactId>
  9. <version>1.11.2</version>
  10. </dependency>
  11. <dependency>
  12. <groupId>org.apache.flink</groupId>
  13. <artifactId>flink-clients_2.12</artifactId>
  14. <version>1.11.2</version>
  15. </dependency>

在flink中通常使用.window()开启一个Keyed Window窗口,用.windowAll()方法开启Global Window。即window()方法必须在keyBy之后才能调用,然后可以基于此window做业务聚合操作。下面以一个统计司机实时上报里程数的例子展示各种window的用法。

实体类DriverMileages

  1. public class DriverMileages {
  2. public String driverId;
  3. /** * 上报时的时间戳,单位毫秒 */
  4. public long timestamp;
  5. /** * 相比上一个时间戳内行驶的里程数,单位米 */
  6. public double currentMileage;
  7. //省略getter/setter/toString方法
  8. }
1)滚动时间窗口

.window(TumblingProcessingTimeWindows.of(Time.seconds(x)))

简写方式:.timeWindow(Time.seconds(x))

需求:每隔5秒钟统计一次这5s内各个司机行驶的总里程数

  1. public class WindowDemo1 {
  2. public static void main(String[] args) throws Exception {
  3. int port = 9001;
  4. String hostname = "192.168.174.136";
  5. String delimiter = "\n";
  6. StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
  7. environment.setParallelism(1);
  8. //连接socket获取输入数据
  9. DataStreamSource<String> data = environment.socketTextStream(hostname, port, delimiter);
  10. //字符串数据转换为实体类
  11. data.map(new MapFunction<String, DriverMileages>() {
  12. @Override
  13. public DriverMileages map(String value) throws Exception {
  14. String[] split = value.split(",");
  15. DriverMileages driverMileages = new DriverMileages();
  16. driverMileages.driverId = split[0];
  17. driverMileages.currentMileage = Double.parseDouble(split[1]);
  18. driverMileages.timestamp = Long.parseLong(split[2]);
  19. return driverMileages;
  20. }
  21. })
  22. .keyBy(DriverMileages::getDriverId)
  23. .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
  24. //.timeWindow(Time.seconds(5))
  25. .sum("currentMileage")
  26. .print();
  27. //启动计算任务
  28. environment.execute("window demo1");
  29. }
  30. }

Linux 命令行开启socket监听,输入数据

  1. [root@vm1 ~]# nc -lk 9001
  2. 1001,20,1605419689100
  3. 1002,25,1605419690100
  4. 1001,20,1605419689100

IDEA控制台输出

  1. DriverMileages{driverId='1001', timestamp=1605419689100, mileage=40.0}
  2. DriverMileages{driverId='1002', timestamp=1605419690100, mileage=25.0}

如果不设置窗口,则会从开始统计时,一直累加,不会划分窗口来统计。

2)滑动时间窗口

.window(SlidingProcessingTimeWindows.of(Time.seconds(x), Time.seconds(y)))

简写方式:.timeWindow(Time.seconds(x),Time.seconds(y))

需求:每隔2秒钟统计一次司机5秒内的总里程数

  1. public static void main(String[] args) throws Exception {
  2. int port = 9001;
  3. String hostname = "192.168.174.136";
  4. String delimiter = "\n";
  5. StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
  6. environment.setParallelism(1);
  7. //连接socket获取输入数据
  8. DataStreamSource<String> data = environment.socketTextStream(hostname, port, delimiter);
  9. //字符串数据转换为实体类
  10. data.map(new MapFunction<String, DriverMileages>() {
  11. @Override
  12. public DriverMileages map(String value) throws Exception {
  13. String[] split = value.split(",");
  14. DriverMileages driverMileages = new DriverMileages();
  15. driverMileages.driverId = split[0];
  16. driverMileages.currentMileage = Double.parseDouble(split[1]);
  17. driverMileages.timestamp = Long.parseLong(split[2]);
  18. return driverMileages;
  19. }
  20. })
  21. .keyBy(DriverMileages::getDriverId)
  22. .window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(2)))
  23. //.timeWindow(Time.seconds(5),Time.seconds(2))
  24. .sum("currentMileage")
  25. .print();
  26. //启动计算任务
  27. environment.execute("window demo1");
  28. }
3)会话窗口

.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))

需求:若司机每隔10s没有新数据则输出一次统计结果

  1. public static void main(String[] args) throws Exception {
  2. int port = 9001;
  3. String hostname = "192.168.174.136";
  4. String delimiter = "\n";
  5. StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
  6. environment.setParallelism(1);
  7. //连接socket获取输入数据
  8. DataStreamSource<String> data = environment.socketTextStream(hostname, port, delimiter);
  9. //字符串数据转换为实体类
  10. data.map(new MapFunction<String, DriverMileages>() {
  11. @Override
  12. public DriverMileages map(String value) throws Exception {
  13. String[] split = value.split(",");
  14. DriverMileages driverMileages = new DriverMileages();
  15. driverMileages.driverId = split[0];
  16. driverMileages.currentMileage = Double.parseDouble(split[1]);
  17. driverMileages.timestamp = Long.parseLong(split[2]);
  18. return driverMileages;
  19. }
  20. })
  21. .keyBy(DriverMileages::getDriverId)
  22. .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
  23. .sum("currentMileage")
  24. .print();
  25. //启动计算任务
  26. environment.execute("window demo1");
  27. }

数据源源不断地进入flink,如果两个数据之间的时间间隔小于设定的10秒,它们还是属于同一个时间窗口。如果两个数据之间的时间间隔大于10秒了,则前一个窗口关闭,后一个窗口开启。

4)滚动计数窗口

需求:司机每上传3次数据统计一次这3次的总里程

.countWindow(x)

  1. public static void main(String[] args) throws Exception {
  2. int port = 9001;
  3. String hostname = "192.168.174.136";
  4. String delimiter = "\n";
  5. StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
  6. environment.setParallelism(1);
  7. //连接socket获取输入数据
  8. DataStreamSource<String> data = environment.socketTextStream(hostname, port, delimiter);
  9. //字符串数据转换为实体类
  10. data.map(new MapFunction<String, DriverMileages>() {
  11. @Override
  12. public DriverMileages map(String value) throws Exception {
  13. String[] split = value.split(",");
  14. DriverMileages driverMileages = new DriverMileages();
  15. driverMileages.driverId = split[0];
  16. driverMileages.currentMileage = Double.parseDouble(split[1]);
  17. driverMileages.timestamp = Long.parseLong(split[2]);
  18. return driverMileages;
  19. }
  20. })
  21. .keyBy(DriverMileages::getDriverId)
  22. .countWindow(3)
  23. .sum("currentMileage")
  24. .print();
  25. //启动计算任务
  26. environment.execute("window demo1");
  27. }

linux命令行输入如下数据,可见只有3个数据到达时才会触发计算

  1. [root@vm1 ~]# nc -lk 9001
  2. 1002,25,1605419690100
  3. 1001,20,1605419689100
  4. 1002,25,1605419690100
  5. 1001,20,1605419689100
  6. 1002,25,1605419690100
  7. 1001,20,1605419689100

IDEA控制台输出

  1. DriverMileages{ driverId='1002', timestamp=1605419690100, mileage=75.0}
  2. DriverMileages{ driverId='1001', timestamp=1605419689100, mileage=60.0}
5)滑动计数窗口

需求:司机每上传1次数据统计一次前3次(包含本次)数据的总里程

.countWindow(x,y)

  1. public static void main(String[] args) throws Exception {
  2. int port = 9001;
  3. String hostname = "192.168.174.136";
  4. String delimiter = "\n";
  5. StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
  6. environment.setParallelism(1);
  7. //连接socket获取输入数据
  8. DataStreamSource<String> data = environment.socketTextStream(hostname, port, delimiter);
  9. //字符串数据转换为实体类
  10. data.map(new MapFunction<String, DriverMileages>() {
  11. @Override
  12. public DriverMileages map(String value) throws Exception {
  13. String[] split = value.split(",");
  14. DriverMileages driverMileages = new DriverMileages();
  15. driverMileages.driverId = split[0];
  16. driverMileages.currentMileage = Double.parseDouble(split[1]);
  17. driverMileages.timestamp = Long.parseLong(split[2]);
  18. return driverMileages;
  19. }
  20. })
  21. .keyBy(DriverMileages::getDriverId)
  22. .countWindow(3,1)
  23. .sum("currentMileage")
  24. .print();
  25. //启动计算任务
  26. environment.execute("window demo1");
  27. }
6)全量窗口

.windowAll统计时,不再需要根据key区分数据,而是对全量数据在窗口期内做统计。

全量时间:.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)));每5秒钟统计一次

全量计数:.windowAll(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(2)));每2个数据统计一次

  1. public class WindowDemo1 {
  2. public static void main(String[] args) throws Exception {
  3. int port = 9001;
  4. String hostname = "192.168.174.136";
  5. String delimiter = "\n";
  6. StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
  7. environment.setParallelism(1);
  8. //连接socket获取输入数据
  9. DataStreamSource<String> data = environment.socketTextStream(hostname, port, delimiter);
  10. //字符串数据转换为实体类
  11. data.map(new MapFunction<String, DriverMileages>() {
  12. @Override
  13. public DriverMileages map(String value) throws Exception {
  14. String[] split = value.split(",");
  15. DriverMileages driverMileages = new DriverMileages();
  16. driverMileages.driverId = split[0];
  17. driverMileages.currentMileage = Double.parseDouble(split[1]);
  18. driverMileages.timestamp = Long.parseLong(split[2]);
  19. return driverMileages;
  20. }
  21. })
  22. .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
  23. .sum("currentMileage")
  24. .print();
  25. //启动计算任务
  26. environment.execute("windowAll demo1");
  27. }
  28. }

此时,不管是按时间还是计数方式,输出的是全量数据统计结果,不按司机id区分

  1. [root@vm1 ~]# nc -lk 9001
  2. 1001,50,1605419689100
  3. 1002,25,1605419690100
  4. DriverMileages{ driverId='1001', timestamp=1605419689100, mileage=75.0}
7)简写方式

在上述api的例子中,可以看到Flink为内置的窗口提供了简写方式,不需要调用.window()方法,直接简写即可,比如:
滚动时间窗口.timeWindow(Time.seconds(10))
滑动时间窗口.timeWindow(Time.seconds(10),Time.seconds(2))
滚动计数窗口.countWindow(10)
滑动计数窗口.countWindow(10,2)

简写方式,传一个参数表示滚动窗口,2个参数表示滑动窗口。滑动窗口有2个参数,时间和滑动步长。

滚动窗口有offset参数,比如9:00->10:00 是1小时, 9:05->10:05也是1小时,这里的5分钟就是offset。

简化方式也不能指定offset偏移量,原始方式可以。

会话窗口没有简写方式。

8)WindowAssigner

​ .window()方法的输入参数实质上是一个WindowAssigner类型,到达窗口的元素被传递给 WindowAssigner,由WindowAssigner负责把每个输入数据分发到正确的window中。

Flink内置了一些通用的WindowAssigner。

TumblingEventTimeWindows:基于EventTime的滚动时间窗口分配器

TumblingProcessingTimeWindows:基于ProcessingTime的滚动时间窗口分配器

SlidingEventTimeWindows:基于EventTime的滑动时间窗口分配器

SlidingProcessingTimeWindows:基于ProcessingTime的滑动时间窗口分配器

EventTimeSessionWindows:基于EventTime的会话窗口分配器

ProcessingTimeSessionWindows:基于ProcessingTime的会话窗口分配器

GlobalWindows:全局窗口分配器,所有数据分配到一个窗口

上面例子中,窗口操作都没有设置时间语义,因此用的是Flink默认的时间语义,即ProcessTime。也可以使用EventTime(事件时间)。事件时间一般结合watermark使用,具体使用方式在讲解watermark时给出。

9)开窗函数(window function)

开窗函数定义了对window中收集的数据如何处理,可以分为两类。

  • 增量聚合函数
    来一条数据计算一次,比如ReduceFunction和AggregationFunction。性能比全量聚合高,但无法获取窗口元数据。

    ReduceFunction函数:使用reduce函数自行实现求和功能。

    1. public class WindowFunctionDemo1 {
    2. public static void main(String[] args) throws Exception {
    3. int port = 9001;
    4. String hostname = "192.168.174.136";
    5. String delimiter = "\n";
    6. StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    7. environment.setParallelism(1);
    8. //连接socket获取输入数据
    9. DataStreamSource<String> data = environment.socketTextStream(hostname, port, delimiter);
    10. //字符串数据转换为实体类
    11. data.map(new MapFunction<String, DriverMileages>() {
    12. @Override
    13. public DriverMileages map(String value) throws Exception {
    14. String[] split = value.split(",");
    15. DriverMileages driverMileages = new DriverMileages();
    16. driverMileages.driverId = split[0];
    17. driverMileages.currentMileage = Double.parseDouble(split[1]);
    18. driverMileages.timestamp = Long.parseLong(split[2]);
    19. return driverMileages;
    20. }
    21. })
    22. .keyBy(DriverMileages::getDriverId)
    23. .timeWindow(Time.seconds(5))
    24. .reduce(new ReduceFunction<DriverMileages>() {
    25. @Override
    26. public DriverMileages reduce(DriverMileages t1, DriverMileages t2) throws Exception {
    27. DriverMileages newItem = new DriverMileages();
    28. newItem.driverId = t1.driverId;
    29. newItem.currentMileage = t1.getCurrentMileage() + t2.getCurrentMileage();
    30. return newItem;
    31. }
    32. })
    33. .print();
    34. //启动计算任务
    35. environment.execute("window demo1");

    AggregationFunction函数,需求:使用aggregation函数实现求和功能。Aggregation可以看作是ReduceFunction的通用版本。

    Aggregation支持三个参数:输入类型(IN)、累加器类型(ACC)和输出类型(OUT),可以使用自定义实体类。

    1. public static void main(String[] args) throws Exception {
    2. int port = 9001;
    3. String hostname = "192.168.174.136";
    4. String delimiter = "\n";
    5. StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    6. environment.setParallelism(1);
    7. //连接socket获取输入数据
    8. DataStreamSource<String> data = environment.socketTextStream(hostname, port, delimiter);
    9. //字符串数据转换为实体类
    10. data.map(new MapFunction<String, DriverMileages>() {
    11. @Override
    12. public DriverMileages map(String value) throws Exception {
    13. String[] split = value.split(",");
    14. DriverMileages driverMileages = new DriverMileages();
    15. driverMileages.driverId = split[0];
    16. driverMileages.currentMileage = Double.parseDouble(split[1]);
    17. driverMileages.timestamp = Long.parseLong(split[2]);
    18. return driverMileages;
    19. }
    20. })
    21. .keyBy(DriverMileages::getDriverId)
    22. .timeWindow(Time.seconds(5))
    23. .aggregate(new AggregateFunction<DriverMileages, DriverMileages, DriverMileages>() {
    24. /** * 创建累加器保存中间状态 * @return */
    25. @Override
    26. public DriverMileages createAccumulator() {
    27. return new DriverMileages();
    28. }
    29. /** * 将元素添加到累加器并返回新的累加器 * @param value * @param accumulator * @return */
    30. @Override
    31. public DriverMileages add(DriverMileages value, DriverMileages accumulator) {
    32. DriverMileages newItem = new DriverMileages();
    33. newItem.currentMileage = value.currentMileage + accumulator.currentMileage;
    34. newItem.driverId = value.driverId;
    35. return newItem;
    36. }
    37. /** * 从累加器提取结果 * @param accumulator * @return */
    38. @Override
    39. public DriverMileages getResult(DriverMileages accumulator) {
    40. return accumulator;
    41. }
    42. /** * 合并累加器 * @param a * @param b * @return */
    43. @Override
    44. public DriverMileages merge(DriverMileages a, DriverMileages b) {
    45. DriverMileages newItem = new DriverMileages();
    46. newItem.currentMileage = a.currentMileage + b.currentMileage;
    47. newItem.driverId = a.driverId;
    48. return newItem;
    49. }
    50. })
    51. .print();
    52. //启动计算任务
    53. environment.execute("window demo1");
    54. }
  • 全量窗口函数

    先把窗口期内的所有数据收集起来,等到计算的时候一次性处理所有数据,比如ProcessWindowFunction。由于先要缓存窗口内的所有元素,因此性能略低,但是可以拿到窗口元数据。

    ProcessWindowFunction:实现司机里程数求和功能,并输出详细的window信息。

    1. public static void main(String[] args) throws Exception {
    2. int port = 9001;
    3. String hostname = "192.168.174.136";
    4. String delimiter = "\n";
    5. StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    6. environment.setParallelism(1);
    7. //连接socket获取输入数据
    8. DataStreamSource<String> data = environment.socketTextStream(hostname, port, delimiter);
    9. //字符串数据转换为实体类
    10. data.map(new MapFunction<String, DriverMileages>() {
    11. @Override
    12. public DriverMileages map(String value) throws Exception {
    13. String[] split = value.split(",");
    14. DriverMileages driverMileages = new DriverMileages();
    15. driverMileages.driverId = split[0];
    16. driverMileages.currentMileage = Double.parseDouble(split[1]);
    17. driverMileages.timestamp = Long.parseLong(split[2]);
    18. return driverMileages;
    19. }
    20. })
    21. .keyBy(DriverMileages::getDriverId)
    22. .timeWindow(Time.seconds(5))
    23. .process(new ProcessWindowFunction<DriverMileages, Object, String, TimeWindow>() {
    24. @Override
    25. public void process(String s, Context context, Iterable<DriverMileages> elements, Collector<Object> out) throws Exception {
    26. Iterator<DriverMileages> iterator = elements.iterator();
    27. long total = 0;
    28. int count = 0;//统计窗口内元素个数
    29. String key = "";
    30. while (iterator.hasNext()) {
    31. DriverMileages next = iterator.next();
    32. total += next.currentMileage;
    33. count++;
    34. key = next.driverId;
    35. }
    36. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
    37. out.collect("key:" + key + ",window[" + sdf.format(context.window().getStart()) + "," + sdf.format(context.window().getEnd()) + "),count:" + count + ",sum is:" + total);
    38. }
    39. })
    40. .print();
    41. //启动计算任务
    42. environment.execute("window demo1");
    43. }

    linux命令行输入

    1. [root@vm1 ~]# nc -lk 9001
    2. 1002,25,1605419690100
    3. 1002,25,1605419690100
    4. 1001,70,1605419689100

    IDEA控制台输出

    1. key:1002,window[2020-11-15 21:20:20.000,2020-11-15 21:20:25.000),count:2,sum is:50
    2. key:1001,window[2020-11-15 21:20:40.000,2020-11-15 21:20:45.000),count:1,sum is:70

    如果上游数据是windowedStream,则可以直接使用apply方法。process方法更底层一些,keyBy后就可以直接调用process()。

    1. public static void main(String[] args) throws Exception {
    2. int port = 9001;
    3. String hostname = "192.168.174.136";
    4. String delimiter = "\n";
    5. StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    6. environment.setParallelism(1);
    7. //连接socket获取输入数据
    8. DataStreamSource<String> data = environment.socketTextStream(hostname, port, delimiter);
    9. //字符串数据转换为实体类
    10. data.map(new MapFunction<String, DriverMileages>() {
    11. @Override
    12. public DriverMileages map(String value) throws Exception {
    13. String[] split = value.split(",");
    14. DriverMileages driverMileages = new DriverMileages();
    15. driverMileages.driverId = split[0];
    16. driverMileages.currentMileage = Double.parseDouble(split[1]);
    17. driverMileages.timestamp = Long.parseLong(split[2]);
    18. return driverMileages;
    19. }
    20. })
    21. .keyBy(DriverMileages::getDriverId)
    22. .timeWindow(Time.seconds(5))
    23. .apply(new WindowFunction<DriverMileages, Object, String, TimeWindow>() {
    24. @Override
    25. public void apply(String s, TimeWindow window, Iterable<DriverMileages> input, Collector<Object> out) throws Exception {
    26. Iterator<DriverMileages> iterator = input.iterator();
    27. long total = 0;
    28. int count = 0;
    29. while (iterator.hasNext()) {
    30. DriverMileages next = iterator.next();
    31. total += next.currentMileage;
    32. count++;
    33. }
    34. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
    35. out.collect("key:" + s + ",window[" + sdf.format(window.getStart()) + "," + sdf.format(window.getEnd()) + "],count:" + count + ",sum is:" + total);
    36. }
    37. })
    38. .print();
    39. //启动计算任务
    40. environment.execute("window demo1");
    41. }
10)案例:自定义数据源,每10秒钟统计一下窗口内所有输入数据的最大值
  1. public class MyDataSource implements SourceFunction<Integer> {
  2. private boolean isRunning = true;
  3. /** * run方法里编写数据产生逻辑 * * @param ctx * @throws Exception */
  4. @Override
  5. public void run(SourceContext<Integer> ctx) throws Exception {
  6. SecureRandom secureRandom = new SecureRandom();
  7. while (isRunning) {
  8. int i = secureRandom.nextInt(1000);
  9. ctx.collect(i);
  10. System.out.println("source time:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")) + ",value:" + i);
  11. Thread.sleep(1000);//1秒钟生成一次数据
  12. }
  13. }
  14. @Override
  15. public void cancel() {
  16. isRunning = false;
  17. }
  18. }
  19. public class WindowDemo {
  20. public static void main(String[] args) throws Exception {
  21. final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  22. env.setParallelism(1);
  23. DataStreamSource<Integer> dataStreamSource = env.addSource(new MyDataSource());
  24. dataStreamSource
  25. .timeWindowAll(Time.seconds(10))
  26. .apply(new AllWindowFunction<Integer, Object, TimeWindow>() {
  27. @Override
  28. public void apply(TimeWindow window, Iterable<Integer> values, Collector<Object> out) throws Exception {
  29. List<Integer> items = new ArrayList<>();
  30. Iterator<Integer> iterator = values.iterator();
  31. while (iterator.hasNext()) {
  32. items.add(iterator.next());
  33. }
  34. Collections.sort(items);
  35. Integer max = items.get(items.size() - 1);
  36. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
  37. out.collect("window[" + sdf.format(window.getStart()) + "," + sdf.format(window.getEnd()) + "),max:" + max);
  38. }
  39. })
  40. .print();
  41. env.execute("window demo");
  42. }
  43. }

Flink中窗口时间的划分与数据无关,是系统定义好了的。Flink先按照自然时间将 window 划分,比如定义的 window 大小是 5秒,那么 1 分钟内会把 window 划分为如下的区间[左闭右开):

  1. [00:00:00 00:00:05)
  2. [00:00:05 00:00:10)
  3. [00:00:10 00:00:15)
  4. ...
  5. [00:00:55 00:01:00)

运行上述window api相关的demo,便会发现此特征。比如设置的window时间是5秒,则每次都是在05,10,15…60这些时间点触发窗口计算。并不是真正意义上的”每隔”5秒,如果数据达到时间很接近窗口边界,那么很快就会触发。

前文讲解window时我们提到了设置窗口时间,那么这个时间到底是什么时间呢?事件发生的时间还是数据被处理的时间呢?这就取决于在flink应用程序中设置的时间语义了。

在flink中,有3中不同的时间语义

  • event time:事件创建的时间。比如用户点击按钮触发时。
  • ingestion time:数据进入flink的时间。即经过一系列的网络传输,进入flink source的时间。
  • processing time:flink执行operator操作时本地系统时间。从source进来到分配到TaskManager中的slot处理也是耗时的,比如数据重分区,因此存在理论上的时间差。即理论上processing time晚于ingestion time。
    在这里插入图片描述

3种时间语义中Flink默认使用ProcessTime,实际业务开发中一般更关心事件发生的时间,即event time。

事件时间event time是数据本身携带的时间,需要从业务数据中提取。比如日志数据中,一般都会自带timestamp业务字段。

使用EventTime要在应用程序中显式告诉flink如何从输入数据中提取事件时间戳,否则应用程序启动报错。

相比Event Time ,使用Ingestion Time 时应用程序无法处理任何乱序事件或延迟数据,当然程序中也不必指定如何生成watermark,会自动分配时间戳和生成水印。

flink中设置流的时间特性:

  1. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

如果不设置时间特性,flink默认使用processing time:这是数据一致性和处理性能之间权衡的结果。

发表评论

表情:
评论列表 (有 0 条评论,399人围观)

还没有评论,来说两句吧...

相关阅读

    相关 Flink窗口(Flink Window)

    上一篇讲到Flink 中事件时间和水位线的概念,那它们有什么具体应用呢?当然是做基于时间的处理计算了。其中最常见的场景,就是窗口聚合计算。之前我们已经了解了 Flink ...

    相关 Flink教程

    Flink教程 【订阅[专栏合集][Link 1],作者所有付费文章都能看(持续更新)】 推荐【Kafka教程】[https://bigbird.blog.csdn.n