【Flink】基于 Flink 的电商用户行为分析(一)

妖狐艹你老母 2023-02-18 09:33 139阅读 0赞

1、项目整体介绍

电商的用户行为

电商平台中的用户行为频繁且较复杂,系统上线运行一段时间后,可以收集到大量的用户行为数据,进而利用大数据技术进行深入挖掘和分析,得到感兴趣的商业指标并增强对风险的控制。
电商用户行为数据多样,整体可以分为用户行为习惯数据和业务行为数据两大类。用户的行为习惯数据包括了用户的登录方式、上线的时间点及时长、点击和浏览页面、页面停留时间以及页面跳转等等,我们可以从中进行流量统计和热门商品的统计,也可以深入挖掘用户的特征;这些数据往往可以从 web 服务器日志中直接读取到。而业务行为数据就是用户在电商平台中针对每个业务(通常是某个具体商品)所作的操作,我们一般会在业务系统中相应的位置埋点,然后收集日志进行分析。业务行为数据又可以简单分为两类:一类是能够明显地表现出用户兴趣的行为,比如对商品的收藏、喜欢、评分和评价,我们可以从中对数据进行深入分析,得到用户画像,进而对用户给出个性化的推荐商品列表,这个过程往往会用到机器学习相关的算法;另一类则是常规的业务操作,但需要着重关注一些异常状况以做好风
控,比如登录和订单支付。
watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2dvbmd4aWZhY2FpX2JlbGlldmU_size_16_color_FFFFFF_t_70_pic_center

项目主要模块

基于对电商用户行为数据的基本分类,我们可以发现主要有以下三个分析方向:

  1. 热门统计
    利用用户的点击浏览行为,进行流量统计、近期热门商品统计等。
  2. 偏好统计
    利用用户的偏好行为,比如收藏、喜欢、评分等,进行用户画像分析,给出个性化的商品推荐列表。
  3. 风险控制
    利用用户的常规业务行为,比如登录、下单、支付等,分析数据,对异常情况进行报警提示。
    本项目限于数据,我们只实现热门统计和风险控制中的部分内容,将包括以下四大模块:实时热门商品统计、实时流量统计、恶意登录监控和订单支付失效监控。

watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2dvbmd4aWZhY2FpX2JlbGlldmU_size_16_color_FFFFFF_t_70_pic_center 1
由于对实时性要求较高,我们会用 flink 作为数据处理的框架。在项目中,我们将综合运用 flink 的各种 API,基于 EventTime 去处理基本的业务需求,并且灵活地使用底层的 processFunction,基于状态编程和 CEP 去处理更加复杂的情形。

数据源解析

我们准备了一份淘宝用户行为数据集,保存为 csv 文件。本数据集包含了淘宝上某一天随机一百万用户的所有行为(包括点击、购买、收藏、喜欢)。数据集的每一行表示一条用户行为,由用户 ID、商品 ID、商品类目 ID、行为类型和时间戳组成,并以逗号分隔。关于数据集中每一列的详细描述如下:




































字段名 数据类型 说明
userId Long 加密后的用户 ID
itemId Long 加密后的商品 ID
categoryId Int 加密后的商品所属类别 ID
behavior String 用户行为类型,包括(‘pv’, ‘’buy, ‘cart’,‘fav’)
timestamp Long 行为发生的时间戳,单位秒

另外,我们还可以拿到 web 服务器的日志数据,这里以 apache 服务器的一份 log 为例,每一行日志记录了访问者的 IP、userId、访问时间、访问方法以及访问的 url,具体描述如下:




































字段名 数据类型 说明
ip String 访问的 IP
userId Long 访问的 user ID
eventTime Long 访问时间
method String 访问方法 GET/POST/PUT/DELETE
url String 访问的 url

由于行为数据有限,在实时热门商品统计模块中可以使用 UserBehavior 数据集,而对于恶意登录监控和订单支付失效监控,我们只以示例数据来做演示。

2、实时热门商品统计

首先要实现的是实时热门商品统计,我们将会基于 UserBehavior 数据集来进行分析。
项目主体用 Scala 编写,采用 IDEA 作为开发环境进行项目编写,采用 maven 作为项目构建和管理工具。首先我们需要搭建项目框架。

项目框架搭建

打开 IDEA,创建一个 maven 项目,命名为 UserBehaviorAnalysis。由于包含了多个模块,我们可以以 UserBehaviorAnalysis 作为父项目,并在其下建一个名为 HotItemsAnalysis 的子项目,用于实时统计热门 topN 商品。
在 UserBehaviorAnalysis 下新 建一个 maven module 作 为子项 目,命名为 HotItemsAnalysis。
父项目只是为了规范化项目结构,方便依赖管理,本身是不需要代码实现的,所以 UserBehaviorAnalysis 下的 src 文件夹可以删掉。

声明项目中工具的版本信息

我们整个项目需要的工具的不同版本可能会对程序运行造成影响,所以应该在最外层的 UserBehaviorAnalysis 中声明所有子模块共用的版本信息。
在 pom.xml 中加入以下配置:
UserBehaviorAnalysis/pom.xml

  1. <properties>
  2. <flink.version>1.7.2</flink.version>
  3. <scala.binary.version>2.11</scala.binary.version>
  4. <kafka.version>2.2.0</kafka.version>
  5. </properties>
添加项目依赖

对于整个项目而言,所有模块都会用到 flink 相关的组件,所以我们在 UserBehaviorAnalysis 中引入公有依赖:
UserBehaviorAnalysis/pom.xml

  1. <dependencies>
  2. <dependency>
  3. <groupId>org.apache.flink</groupId>
  4. <artifactId>flink-scala_${scala.binary.version}</artifactId>
  5. <version>${flink.version}</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.apache.flink</groupId>
  9. <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
  10. <version>${flink.version}</version>
  11. </dependency>
  12. <dependency>
  13. <groupId>org.apache.kafka</groupId>
  14. <artifactId>kafka_${scala.binary.version}</artifactId>
  15. <version>${kafka.version}</version>
  16. </dependency>
  17. <dependency>
  18. <groupId>org.apache.flink</groupId>
  19. <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
  20. <version>${flink.version}</version>
  21. </dependency>
  22. </dependencies>

同样,对于 maven 项目的构建,可以引入公有的插件:

  1. <build>
  2. <plugins>
  3. <!-- 该插件用于将 Scala 代码编译成 class 文件 -->
  4. <plugin>
  5. <groupId>net.alchim31.maven</groupId>
  6. <artifactId>scala-maven-plugin</artifactId>
  7. <version>3.4.6</version>
  8. <executions>
  9. <execution>
  10. <!-- 声明绑定到 maven 的 compile 阶段 -->
  11. <goals>
  12. <goal>testCompile</goal>
  13. </goals>
  14. </execution>
  15. </executions>
  16. </plugin>
  17. <plugin>
  18. <groupId>org.apache.maven.plugins</groupId>
  19. <artifactId>maven-assembly-plugin</artifactId>
  20. <version>3.0.0</version>
  21. <configuration>
  22. <descriptorRefs>
  23. <descriptorRef>
  24. jar-with-dependencies
  25. </descriptorRef>
  26. </descriptorRefs>
  27. </configuration>
  28. <executions>
  29. <execution>
  30. <id>make-assembly</id>
  31. <phase>package</phase>
  32. <goals>
  33. <goal>single</goal>
  34. </goals>
  35. </execution>
  36. </executions>
  37. </plugin>
  38. </plugins>
  39. </build>

在 HotItemsAnalysis 子模块中,我们并没有引入更多的依赖,所以不需要改动 pom 文件。

数据准备

在 src/main/目录下,可以看到已有的默认源文件目录是 java,我们可以将其改名为 scala。将数据文件 UserBehavior.csv 复制到资源文件目录 src/main/resources 下,我们将从这里读取数据。
至此,我们的准备工作都已完成,接下来可以写代码了。

模块代码实现

我们将实现一个“实时热门商品”的需求,可以将“实时热门商品”翻译成程序员更好理解的需求:每隔 5 分钟输出最近一小时内点击量最多的前 N 个商品。将这个需求进行分解我们大概要做这么几件事情:

  • 抽取出业务时间戳,告诉 Flink 框架基于业务时间做窗口
  • 过滤出点击行为数据
  • 按一小时的窗口大小,每 5 分钟统计一次,做滑动窗口聚合(Sliding Window)
  • 按每个窗口聚合,输出每个窗口中点击量前 N 名的商品
程序主体

在 src/main/scala 下创建 HotItems.scala 文件,新建一个单例对象。定义样例类 UserBehavior 和 ItemViewCount,在 main 函数中创建 StreamExecutionEnvironment 并做配置,然后从 UserBehavior.csv 文件中读取数据,并包装成 UserBehavior 类型。
代码如下:
HotItemsAnalysis/src/main/scala/HotItems.scala

  1. case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)
  2. case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)
  3. object HotItems {
  4. def main(args: Array[String]): Unit = {
  5. // 创建一个 StreamExecutionEnvironment
  6. val env = StreamExecutionEnvironment.getExecutionEnvironment
  7. // 设定 Time 类型为 EventTime
  8. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  9. // 为了打印到控制台的结果不乱序,我们配置全局的并发为 1 ,这里改变并发对结果正确性没有影响
  10. env.setParallelism(1)
  11. val stream = env
  12. // 以 window 下为例,需替换成自己的路径
  13. .readTextFile("YOUR_PATH\\resources\\UserBehavior.csv")
  14. .map(line => {
  15. val linearray = line.split(",")
  16. UserBehavior(linearray(0).toLong, linearray(1).toLong, linearray(2).toInt, linearray(3), linearray(4).toLong)
  17. })
  18. // 指定时间戳和 watermark
  19. .assignAscendingTimestamps(_.timestamp * 1000)
  20. env.execute("Hot Items Job")
  21. }

这里注意,我们需要统计业务时间上的每小时的点击量,所以要基于 EventTime 来处理。那么如果让 Flink 按照我们想要的业务时间来处理呢?这里主要有两件事情要做。
第一件是告诉 Flink 我们现在按照 EventTime 模式进行处理,Flink 默认使用 ProcessingTime 处理,所以我们要显式设置如下:

  1. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

第二件事情是指定如何获得业务时间,以及生成 Watermark。Watermark 是用来追踪业务事件的概念,可以理解成 EventTime 世界中的时钟,用来指示当前处理到什么时刻的数据了。由于我们的数据源的数据已经经过整理,没有乱序,即事件的时间戳是单调递增的,所以可以将每条数据的业务时间就当做 Watermark。这里我们用 assignAscendingTimestamps 来实现时间戳的抽取和 Watermark 的生成。
注:真实业务场景一般都是乱序的,所以一般不用assignAscendingTimestamps,而是使用 BoundedOutOfOrdernessTimestampExtractor。

  1. .assignAscendingTimestamps(_.timestamp * 1000)

这样我们就得到了一个带有时间标记的数据流了,后面就能做一些窗口的操作。

过滤出点击事件

在开始窗口操作之前,先回顾下需求“每隔 5 分钟输出过去一小时内点击量最多的前 N 个商品”。由于原始数据中存在点击、购买、收藏、喜欢各种行为的数据,但是我们只需要统计点击量,所以先使用 filter 将点击行为数据过滤出来。

  1. .filter(_.behavior == "pv")
设置滑动窗口,统计点击量

由于要每隔 5 分钟统计一次最近一小时每个商品的点击量,所以窗口大小是一小时,每隔 5 分钟滑动一次。即分别要统计[09:00, 10:00), [09:05, 10:05), [09:10,10:10)…等窗口的商品点击量。是一个常见的滑动窗口需求(Sliding Window)。

  1. .keyBy("itemId")
  2. .timeWindow(Time.minutes(60), Time.minutes(5))
  3. .aggregate(new CountAgg(), new WindowResultFunction());

我们使用.keyBy(“itemId”)对商品进行分组,使用.timeWindow(Time size, Time slide)对每个商品做滑动窗口(1 小时窗口,5 分钟滑动一次)。然后我们使用 .aggregate(AggregateFunction af, WindowFunction wf) 做增量的聚合操作,它能使用 AggregateFunction 提 前 聚 合 掉 数 据 , 减 少 state 的 存 储 压 力 。 较之 .apply(WindowFunction wf) 会将窗口中的数据都存储下来,最后一起计算要高效得多。这里的 CountAgg 实现了 AggregateFunction 接口,功能是统计窗口中的条数,即遇到一条数据就加一。

  1. // COUNT 统计的聚合函数实现,每出现一条记录就加一
  2. class CountAgg extends AggregateFunction[UserBehavior, Long, Long] {
  3. override def createAccumulator(): Long = 0L
  4. override def add(userBehavior: UserBehavior, acc: Long): Long = acc + 1
  5. override def getResult(acc: Long): Long = acc
  6. override def merge(acc1: Long, acc2: Long): Long = acc1 + acc2
  7. }

聚合操作.aggregate(AggregateFunction af, WindowFunction wf)的第二个参数 WindowFunction 将每个 key 每个窗口聚合后的结果带上其他信息进行输出。我们这里实现的 WindowResultFunction 将 <主键商品 ID,窗 口 , 点击量>封装成了 ItemViewCount 进行输出。

  1. // 商品点击量(窗口操作的输出类型)
  2. case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)

代码如下:

  1. // 用于输出窗口的结果
  2. class WindowResultFunction extends WindowFunction[Long, ItemViewCount, Tuple, TimeWindow] {
  3. override def apply(key: Tuple, window: TimeWindow, aggregateResult: Iterable[Long], collector: Collector[ItemViewCount]) : Unit = {
  4. val itemId: Long = key.asInstanceOf[Tuple1[Long]].f0
  5. val count = aggregateResult.iterator.next
  6. collector.collect(ItemViewCount(itemId, window.getEnd, count))
  7. }
  8. }

现在我们就得到了每个商品在每个窗口的点击量的数据流。

计算最热门 Top N 商品

为了统计每个窗口下最热门的商品,我们需要再次按窗口进行分组,这里根据 ItemViewCount 中的 windowEnd 进行 keyBy()操作。然后使用ProcessFunction 实现一个自定义的 TopN 函数 TopNHotItems 来计算点击量排名前 3 名的商品,并将排名结果格式化成字符串,便于后续输出。

  1. .keyBy("windowEnd")
  2. .process(new TopNHotItems(3)); // 求点击量前 3 名的商品

ProcessFunction 是 Flink 提供的一个 low-level API,用于实现更高级的功能。它主要提供了定时器 timer 的功能(支持 EventTime 或 ProcessingTime)。本案例中我们将利用 timer 来判断何时收齐了某个 window 下所有商品的点击量数据。由于 Watermark 的进度是全局的,在 processElement 方法中,每当收到一条数据 ItemViewCount,我们就注册一个 windowEnd+1 的定时器(Flink 框架会自动忽略同一时间的重复注册)。windowEnd+1 的定时器被触发时,意味着收到了 windowEnd+1
的 Watermark,即收齐了该 windowEnd 下的所有商品窗口统计值。我们在 onTimer() 中处理将收集的所有商品及点击量进行排序,选出 TopN,并将排名信息格式化成字符串后进行输出。
这里我们还使用了 ListState来存储收到的每条 ItemViewCount 消息,保证在发生故障时,状态数据的不丢失和一致性。ListState 是 Flink 提供的类似 Java List 接口的 State API,它集成了框架的 checkpoint 机制,自动做到了 exactly-once 的语义保证。

  1. // 求某个窗口中前 N 名的热门点击商品,key 为窗口时间戳,输出为 TopN 的结果字符串
  2. class TopNHotItems(topSize: Int) extends KeyedProcessFunction[Tuple, ItemViewCount, String] {
  3. private var itemState : ListState[ItemViewCount] = _
  4. override def open(parameters: Configuration): Unit = {
  5. super.open(parameters)
  6. // 命名状态变量的名字和状态变量的类型
  7. val itemsStateDesc = new ListStateDescriptor[ItemViewCount]("itemState-state", classOf[ItemViewCount])
  8. // 定义状态变量
  9. itemState = getRuntimeContext.getListState(itemsStateDesc)
  10. }
  11. override def processElement(input: ItemViewCount, context: KeyedProcessFunction[Tuple, ItemViewCount, String]#Context, collector: Collector[String]): Unit = {
  12. // 每条数据都保存到状态中
  13. itemState.add(input)
  14. // 注册 windowEnd+1 的 EventTime Timer, 当触发时,说明收齐了属于windowEnd 窗口的所有商品数据
  15. // 也就是当程序看到 windowend + 1 的水位线 watermark 时,触发 onTimer 回调函数
  16. context.timerService.registerEventTimeTimer(input.windowEnd + 1)
  17. }
  18. override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, ItemViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
  19. // 获取收到的所有商品点击量
  20. val allItems: ListBuffer[ItemViewCount] = ListBuffer()
  21. import scala.collection.JavaConversions._
  22. for (item <- itemState.get) {
  23. allItems += item
  24. }
  25. // 提前清除状态中的数据,释放空间
  26. itemState.clear()
  27. // 按照点击量从大到小排序
  28. val sortedItems = allItems.sortBy(_.count)(Ordering.Long.reverse).take(topSize)
  29. // 将排名信息格式化成 String, 便于打印
  30. val result: StringBuilder = new StringBuilder
  31. result.append("====================================\n")
  32. result.append(" 时间: ").append(new Timestamp(timestamp - 1)).append("\n")
  33. for(i <- sortedItems.indices){
  34. val currentItem: ItemViewCount = sortedItems(i)
  35. // e.g. No1 : 商品 ID=12224 浏览量 =2413
  36. result.append("No").append(i+1).append(":")
  37. .append(" 商品 ID=").append(currentItem.itemId)
  38. .append(" 浏览量=").append(currentItem.count).append("\n")
  39. }
  40. result.append("====================================\n\n")
  41. // 控制输出频率,模拟实时滚动结果
  42. Thread.sleep(1000)
  43. out.collect(result.toString)
  44. }
  45. }

最后我们可以在 main 函数中将结果打印输出到控制台,方便实时观测:
.print();至此整个程序代码全部完成,我们直接运行 main 函数,就可以在控制台看到不断输出的各个时间点统计出的热门商品。

完整代码

最终完整代码如下:

  1. case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)
  2. case class ItemViewCount(itemId: Long, windowEnd: Long, count: Long)
  3. object HotItems {
  4. def main(args: Array[String]): Unit = {
  5. val env = StreamExecutionEnvironment.getExecutionEnvironment
  6. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  7. env.setParallelism(1)
  8. val stream = env
  9. .readTextFile("YOUR_PATH\\resources\\UserBehavior.csv")
  10. .map(line => {
  11. val linearray = line.split(",")
  12. UserBehavior(linearray(0).toLong, linearray(1).toLong, linearray(2).toInt, linearray(3), linearray(4).toLong)
  13. })
  14. .assignAscendingTimestamps(_.timestamp * 1000)
  15. .filter(_.behavior=="pv")
  16. .keyBy("itemId")
  17. .timeWindow(Time.minutes(60), Time.minutes(5))
  18. .aggregate(new CountAgg(), new WindowResultFunction())
  19. .keyBy(1)
  20. .process(new TopNHotItems(3))
  21. .print()
  22. env.execute("Hot Items Job")
  23. }
  24. // COUNT 统计的聚合函数实现,每出现一条记录加一
  25. class CountAgg extends AggregateFunction[UserBehavior, Long, Long] {
  26. override def createAccumulator(): Long = 0L
  27. override def add(userBehavior: UserBehavior, acc: Long): Long = acc + 1
  28. override def getResult(acc: Long): Long = acc
  29. override def merge(acc1: Long, acc2: Long): Long = acc1 + acc2
  30. }
  31. // 用于输出窗口的结果
  32. class WindowResultFunction extends WindowFunction[Long, ItemViewCount, Tuple, TimeWindow] {
  33. override def apply(key: Tuple, window: TimeWindow, aggregateResult: Iterable[Long], collector: Collector[ItemViewCount]) : Unit = {
  34. val itemId: Long = key.asInstanceOf[Tuple1[Long]].f0
  35. val count = aggregateResult.iterator.next
  36. collector.collect(ItemViewCount(itemId, window.getEnd, count))
  37. }
  38. }
  39. // 求某个窗口中前 N 名的热门点击商品,key 为窗口时间戳,输出为 TopN 的结果字符串
  40. class TopNHotItems(topSize: Int) extends KeyedProcessFunction[Tuple, ItemViewCount, String] {
  41. private var itemState : ListState[ItemViewCount] = _
  42. override def open(parameters: Configuration): Unit = {
  43. super.open(parameters)
  44. // 命名状态变量的名字和状态变量的类型
  45. val itemsStateDesc = new ListStateDescriptor[ItemViewCount]("itemState-state", classOf[ItemViewCount])
  46. // 从运行时上下文中获取状态并赋值
  47. itemState = getRuntimeContext.getListState(itemsStateDesc)
  48. }
  49. override def processElement(input: ItemViewCount, context: KeyedProcessFunction[Tuple, ItemViewCount, String]#Context, collector: Collector[String]): Unit = {
  50. // 每条数据都保存到状态中
  51. itemState.add(input)
  52. // 注册 windowEnd+1 的 EventTime Timer,当触发时,说明收齐了属于windowEnd 窗口的所有商品数据
  53. // 也就是当程序看到 windowend + 1 的水位线 watermark 时,触发 onTimer 回调函数
  54. context.timerService.registerEventTimeTimer(input.windowEnd + 1)
  55. }
  56. override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, ItemViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
  57. // 获取收到的所有商品点击量
  58. val allItems: ListBuffer[ItemViewCount] = ListBuffer()
  59. import scala.collection.JavaConversions._
  60. for (item <- itemState.get) {
  61. allItems += item
  62. }
  63. // 提前清除状态中的数据,释放空间
  64. itemState.clear()
  65. // 按照点击量从大到小排序
  66. val sortedItems = allItems.sortBy(_.count)(Ordering.Long.reverse).take(topSize)
  67. // 将排名信息格式化成 String, 便于打印
  68. val result: StringBuilder = new StringBuilder()
  69. result.append("====================================\n")
  70. result.append(" 时间: ").append(new Timestamp(timestamp - 1)).append("\n")
  71. for(i <- sortedItems.indices){
  72. val currentItem: ItemViewCount = sortedItems(i)
  73. // e.g. No1: 商品 ID=12224 浏览量 =2413
  74. result.append("No").append(i+1).append(":")
  75. .append(" 商品 ID=").append(currentItem.itemId)
  76. .append(" 浏览量=").append(currentItem.count).append("\n")
  77. }
  78. result.append("====================================\n\n")
  79. // 控制输出频率,模拟实时滚动结果
  80. Thread.sleep(1000)
  81. out.collect(result.toString)
  82. }
  83. }
  84. }
更换 Kafka 作为数据源

实际生产环境中,我们的数据流往往是从 Kafka 获取到的。如果要让代码更贴近生产实际,我们只需将 source 更换为 Kafka 即可:

  1. val properties = new Properties()
  2. properties.setProperty("bootstrap.servers", "localhost:9092")
  3. properties.setProperty("group.id", "consumer-group")
  4. properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  5. properties.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer")
  6. properties.setProperty("auto.offset.reset", "latest")
  7. val env = StreamExecutionEnvironment.getExecutionEnvironment
  8. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  9. env.setParallelism(1)
  10. val stream = env.addSource(new FlinkKafkaConsumer[String]("hotitems", new SimpleStringSchema(),properties))

当然,根据实际的需要,我们还可以将 Sink 指定为 Kafka、ES、Redis 或其它存储,这里就不一一展开实现了。

3、实时流量统计

模块创建和数据准备

在 UserBehaviorAnalysis 下新 建一个 maven module 作 为子项 目, 命名为 NetworkFlowAnalysis。在这个子模块中,我们同样并没有引入更多的依赖,所以也不需要改动 pom 文件。
在 src/main/目录下,将默认源文件目录 java 改名为 scala。将 apache 服务器的日志文件 apache.log 复制到资源文件目录 src/main/resources 下,我们将从这里读取数据。
当然,我们也可以仍然用 UserBehavior.csv 作为数据源,这时我们分析的就不是每一次对服务器的访问请求了,而是具体的页面浏览(“pv”)操作。

基于服务器 log 的热门页面浏览量统计

我们现在要实现的模块是 “实时流量统计”。对于一个电商平台而言,用户登录的入口流量、不同页面的访问流量都是值得分析的重要数据,而这些数据,可以简单地从 web 服务器的日志中提取出来。
我们在这里先实现“热门页面浏览数”的统计,也就是读取服务器日志中的每一行 log,统计在一段时间内用户访问每一个 url 的次数,然后排序输出显示。
具体做法为:每隔 5 秒,输出最近 10 分钟内访问量最多的前 N 个 URL。可以看出,这个需求与之前“实时热门商品统计”非常类似,所以我们完全可以借鉴此前的代码。
在 src/main/scala 下创建 NetworkFlow.scala 文件,新建一个单例对象。定义样例类 ApacheLogEvent,这是输入的日志数据流;另外还有 UrlViewCount,这是窗口操作统计的输出数据类型。在 main 函数中创建 StreamExecutionEnvironment 并做配置,然后从 apache.log 文件中读取数据,并包装成 ApacheLogEvent 类型。
需要注意的是,原始日志中的时间是“dd/MM/yyyy:HH:mm:ss”的形式,需要定义一个 DateTimeFormat 将其转换为我们需要的时间戳格式:

  1. .map(line => {
  2. val linearray = line.split(" ")
  3. val sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
  4. val timestamp = sdf.parse(linearray(3)).getTime
  5. ApacheLogEvent(linearray(0), linearray(2), timestamp, linearray(5), linearray(6))
  6. })

完整代码如下:
NetworkFlowAnalysis/src/main/scala/NetworkFlow.scala

  1. case class ApacheLogEvent(ip: String, userId: String, eventTime: Long, method: String, url: String)
  2. case class UrlViewCount(url: String, windowEnd: Long, count: Long)
  3. object NetworkFlow{
  4. def main(args: Array[String]): Unit = {
  5. val env = StreamExecutionEnvironment.getExecutionEnvironment
  6. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  7. env.setParallelism(1)
  8. val stream = env
  9. // 以 window 下为例,需替换成自己的路径
  10. .readTextFile("YOUR_PATH\\resources\\apache.log")
  11. .map(line => {
  12. val linearray = line.split(" ")
  13. val simpleDateFormat = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
  14. val timestamp = simpleDateFormat.parse(linearray(3)).getTime
  15. ApacheLogEvent(linearray(0), linearray(2), timestamp, linearray(5), linearray(6))
  16. })
  17. .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[ApacheLogEvent](Time.milliseconds(1000)) {
  18. override def extractTimestamp(t: ApacheLogEvent): Long = {
  19. t.eventTime
  20. }
  21. })
  22. .filter( data => {
  23. val pattern = "^((?!\\.(css|js)$).)*$".r
  24. (pattern findFirstIn data.url).nonEmpty
  25. })
  26. .keyBy("url")
  27. .timeWindow(Time.minutes(10), Time.seconds(5))
  28. .aggregate(new CountAgg(), new WindowResultFunction())
  29. .keyBy(1)
  30. .process(new TopNHotUrls(5))
  31. .print()
  32. env.execute("Network Flow Job")
  33. }
  34. class CountAgg extends AggregateFunction[ApacheLogEvent, Long, Long] {
  35. override def createAccumulator(): Long = 0L
  36. override def add(apacheLogEvent: ApacheLogEvent, acc: Long): Long = acc + 1
  37. override def getResult(acc: Long): Long = acc
  38. override def merge(acc1: Long, acc2: Long): Long = acc1 + acc2
  39. }
  40. class WindowResultFunction extends WindowFunction[Long, UrlViewCount, Tuple, TimeWindow] {
  41. override def apply(key: Tuple, window: TimeWindow, aggregateResult: Iterable[Long], collector: Collector[UrlViewCount]) : Unit = {
  42. val url: String = key.asInstanceOf[Tuple1[String]].f0
  43. val count = aggregateResult.iterator.next
  44. collector.collect(UrlViewCount(url, window.getEnd, count))
  45. }
  46. }
  47. class TopNHotUrls(topsize: Int) extends KeyedProcessFunction[Tuple, UrlViewCount, String] {
  48. private var urlState : ListState[UrlViewCount] = _
  49. override def open(parameters: Configuration): Unit = {
  50. super.open(parameters)
  51. val urlStateDesc = new ListStateDescriptor[UrlViewCount]("urlState-state", classOf[UrlViewCount])
  52. urlState = getRuntimeContext.getListState(urlStateDesc)
  53. }
  54. override def processElement(input: UrlViewCount, context: KeyedProcessFunction[Tuple, UrlViewCount, String]#Context, collector: Collector[String]): Unit = {
  55. // 每条数据都保存到状态中
  56. urlState.add(input)
  57. context.timerService.registerEventTimeTimer(input.windowEnd + 1)
  58. }
  59. override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Tuple, UrlViewCount, String]#OnTimerContext, out: Collector[String]): Unit = {
  60. // 获取收到的所有 URL 访问量
  61. val allUrlViews: ListBuffer[UrlViewCount] = ListBuffer()
  62. import scala.collection.JavaConversions._
  63. for (urlView <- urlState.get) {
  64. allUrlViews += urlView
  65. }
  66. // 提前清除状态中的数据,释放空间
  67. urlState.clear()
  68. // 按照访问量从大到小排序
  69. val sortedUrlViews = allUrlViews
  70. .sortBy(_.count)(Ordering.Long.reverse)
  71. .take(topSize)
  72. // 将排名信息格式化成 String,便于打印
  73. var result: StringBuilder = new StringBuilder
  74. result.append("====================================\n")
  75. result.append(" 时间: ").append(new Timestamp(timestamp - 1)).append("\n")
  76. for (i <- sortedUrlViews.indices) {
  77. val currentUrlView: UrlViewCount = sortedUrlViews(i)
  78. // e.g. No1: URL =/blog/tags/firefox?flav=rss20 流量 =55
  79. result.append("No").append(i+1).append(":")
  80. .append(" URL=").append(currentUrlView.url)
  81. .append(" 流量=").append(currentUrlView.count).append("\n")
  82. }
  83. result.append("====================================\n\n")
  84. // 控制输出频率,模拟实时滚动结果
  85. Thread.sleep(1000)
  86. out.collect(result.toString)
  87. }
  88. }
  89. }
基于埋点日志数据的网络流量统计

我们发现,从 web 服务器 log 中得到的 url,往往更多的是请求某个资源地址(/.js、/.css),如果要针对页面进行统计往往还需要进行过滤。而在实际电商应用中,相比每个单独页面的访问量,我们可能更加关心整个电商网站的网络流量。
这个指标,除了合并之前每个页面的统计结果之外,还可以通过统计埋点日志数据中的“pv”行为来得到。

网站总浏览量(PV )的统计

衡量网站流量一个最简单的指标,就是网站的页面浏览量(Page View,PV)。用户每次打开一个页面便记录 1 次 PV,多次打开同一页面则浏览量累计。一般来说,PV 与来访者的数量成正比,但是 PV 并不直接决定页面的真实来访者数量,如同一个来访者通过不断的刷新页面,也可以制造出非常高的 PV。
我们知道,用户浏览页面时,会从浏览器向网络服务器发出一个请求(Request),网络服务器接到这个请求后,会将该请求对应的一个网页(Page)发送给浏览器,从而产生了一个 PV。所以我们的统计方法,可以是从 web 服务器的日志中去提取对应的页面访问然后统计,就向上一节中的做法一样;也可以直接从埋点日志中提取用户发来的页面请求,从而统计出总浏览量。
所以,接下来我们用 UserBehavior.csv 作为数据源,实现一个网站总浏览量的统计。我们可以设置滚动时间窗口,实时统计每小时内的网站 PV。
在 src/main/scala 下创建 PageView.scala 文件,具体代码如下:
NetworkFlowAnalysis/src/main/scala/PageView.scala

  1. case class UserBehavior(userId: Long, itemId: Long, categoryId: Int, behavior: String, timestamp: Long)
  2. object PageView {
  3. def main(args: Array[String]): Unit = {
  4. val resourcesPath = getClass.getResource("/UserBehaviorTest.csv")
  5. val env = StreamExecutionEnvironment.getExecutionEnvironment
  6. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  7. env.setParallelism(1)
  8. val stream = env.readTextFile(resourcesPath.getPath)
  9. .map(data => {
  10. val dataArray = data.split(",")
  11. UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong)
  12. })
  13. .assignAscendingTimestamps(_.timestamp * 1000)
  14. .filter(_.behavior == "pv")
  15. .map(x => ("pv", 1))
  16. .keyBy(_._1)
  17. .timeWindow(Time.seconds(60 * 60))
  18. .sum(1)
  19. .print()
  20. env.execute("Page View Job")
  21. }
  22. }
网站独立访客数 (UV )的统计

在上节的例子中,我们统计的是所有用户对页面的所有浏览行为,也就是说,同一用户的浏览行为会被重复统计。而在实际应用中,我们往往还会关注,在一段时间内到底有多少不同的用户访问了网站。
另外一个统计流量的重要指标是网站的独立访客数(Unique Visitor,UV)。UV 指的是一段时间(比如一小时)内访问网站的总人数,1 天内同一访客的多次访问只记录为一个访客。通过 IP 和 cookie 一般是判断 UV 值的两种方式。当客户端第一次访问某个网站服务器的时候,网站服务器会给这个客户端的电脑发出一个 Cookie,通常放在这个客户端电脑的 C盘当中。在这个 Cookie中会分配一个独一无二的编号,这其中会记录一些访问服务器的信息,如访问时间,访问了哪些页面等等。当你下次再访问这个服务器的时候,服务器就可以直接从你的电脑中找到上一次放进去的 Cookie 文件,并且对其进行一些更新,但那个独一无二的编号是不会变的。
当然,对于 UserBehavior 数据源来说,我们直接可以根据 userId 来区分不同的用户。
在 src/main/scala 下创建 UniqueVisitor.scala 文件,具体代码如下:
NetworkFlowAnalysis/src/main/scala/UniqueVisitor.scala

  1. case class UvCount(windowEnd: Long, count: Long)
  2. object UniqueVisitor {
  3. def main(args: Array[String]): Unit = {
  4. val resourcesPath = getClass.getResource("/UserBehaviorTest.csv")
  5. val env = StreamExecutionEnvironment.getExecutionEnvironment
  6. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  7. env.setParallelism(1)
  8. val stream = env
  9. .readTextFile(resourcesPath.getPath)
  10. .map(line => {
  11. val linearray = line.split(",")
  12. UserBehavior(linearray(0).toLong, linearray(1).toLong, linearray(2).toInt,linearray(3), linearray(4).toLong)
  13. })
  14. .assignAscendingTimestamps(_.timestamp * 1000)
  15. .filter(_.behavior == "pv")
  16. .timeWindowAll(Time.seconds(60 * 60))
  17. .apply(new UvCountByWindow())
  18. .print()
  19. env.execute("Unique Visitor Job")
  20. }
  21. }
  22. class UvCountByWindow extends AllWindowFunction[UserBehavior, UvCount, TimeWindow] {
  23. override def apply(window: TimeWindow, input: Iterable[UserBehavior], out: Collector[UvCount]): Unit = {
  24. val s: collection.mutable.Set[Long] = collection.mutable.Set()
  25. var idSet = Set[Long]()
  26. for ( userBehavior <- input) {
  27. idSet += userBehavior.userId
  28. }
  29. out.collect(UvCount(window.getEnd, idSet.size))
  30. }
  31. }
使用布隆过滤器的 UV 统计

在上节的例子中,我们把所有数据的 userId 都存在了窗口计算的状态里,在窗口收集数据的过程中,状态会不断增大。一般情况下,只要不超出内存的承受范围,这种做法也没什么问题;但如果我们遇到的数据量很大呢?
把所有数据暂存放到内存里,显然不是一个好注意。我们会想到,可以利用 redis 这种内存级 k-v 数据库,为我们做一个缓存。但如果我们遇到的情况非常极端,数据大到惊人呢?比如上亿级的用户,要去重计算 UV。
如果放到 redis 中,亿级的用户 id(每个 20 字节左右的话)可能需要几 G 甚至几十 G 的空间来存储。当然放到 redis 中,用集群进行扩展也不是不可以,但明显代价太大了。
一个更好的想法是,其实我们不需要完整地存储用户 ID 的信息,只要知道他在不在就行了。所以其实我们可以进行压缩处理,用一位(bit)就可以表示一个用户的状态。这个思想的具体实现就是布隆过滤器(Bloom Filter)。
本质上布隆过滤器是一种数据结构,比较巧妙的概率型数据结构(probabilistic data structure),特点是高效地插入和查询,可以用来告诉你 “某样东西一定不存在或者可能存在”。
它本身是一个很长的二进制向量,既然是二进制的向量,那么显而易见的,存放的不是 0,就是 1。相比于传统的 List、Set、Map 等数据结构,它更高效、占用空间更少,但是缺点是其返回的结果是概率性的,而不是确切的。
我们的目标就是,利用某种方法(一般是 Hash 函数)把每个数据,对应到一个位图的某一位上去;如果数据存在,那一位就是 1,不存在则为 0。
接下来我们就来具体实现一下。
注意这里我们用到了 redis 连接存取数据,所以需要加入 redis 客户端的依赖:

  1. <dependencies>
  2. <dependency>
  3. <groupId>redis.clients</groupId>
  4. <artifactId>jedis</artifactId>
  5. <version>2.8.1</version>
  6. </dependency>
  7. </dependencies>

在 src/main/scala 下创建 UniqueVisitor.scala 文件,具体代码如下:
NetworkFlowAnalysis/src/main/scala/UvWithBloom.scala

  1. object UvWithBloomFilter {
  2. def main(args: Array[String]): Unit = {
  3. val env = StreamExecutionEnvironment.getExecutionEnvironment
  4. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  5. env.setParallelism(1)
  6. val resourcesPath = getClass.getResource("/UserBehaviorTest.csv")
  7. val stream = env
  8. .readTextFile(resourcesPath.getPath)
  9. .map(data => {
  10. val dataArray = data.split(",")
  11. UserBehavior(dataArray(0).toLong, dataArray(1).toLong, dataArray(2).toInt, dataArray(3), dataArray(4).toLong)
  12. })
  13. .assignAscendingTimestamps(_.timestamp * 1000)
  14. .filter(_.behavior == "pv")
  15. .map(data => ("dummyKey", data.userId))
  16. .keyBy(_._1)
  17. .timeWindow(Time.seconds(60 * 60))
  18. .trigger(new MyTrigger()) // 自定义窗口触发规则
  19. .process(new UvCountWithBloom()) // 自定义窗口处理规则
  20. stream.print()
  21. env.execute("Unique Visitor with bloom Job")
  22. }
  23. }
  24. // 自定义触发器
  25. class MyTrigger() extends Trigger[(String, Long), TimeWindow] {
  26. override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
  27. TriggerResult.CONTINUE
  28. }
  29. override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
  30. TriggerResult.CONTINUE
  31. }
  32. override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
  33. }
  34. override def onElement(element: (String, Long), timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext): TriggerResult = {
  35. // 每来一条数据,就触发窗口操作并清空
  36. TriggerResult.FIRE_AND_PURGE
  37. }
  38. }
  39. // 自定义窗口处理函数
  40. class UvCountWithBloom() extends ProcessWindowFunction[(String, Long), UvCount, String, TimeWindow] {
  41. // 创建 redis 连接
  42. lazy val jedis = new Jedis("localhost", 6379)
  43. lazy val bloom = new Bloom(1 << 29)
  44. override def process(key: String, context: Context, elements: Iterable[(String,Long)], out: Collector[UvCount]): Unit = {
  45. val storeKey = context.window.getEnd.toString
  46. var count = 0L
  47. if (jedis.hget("count", storeKey) != null) {
  48. count = jedis.hget("count", storeKey).toLong
  49. }
  50. val userId = elements.last._2.toString
  51. val offset = bloom.hash(userId, 61)
  52. val isExist = jedis.getbit(storeKey, offset)
  53. if (!isExist) {
  54. jedis.setbit(storeKey, offset, true)
  55. jedis.hset("count", storeKey, (count + 1).toString)
  56. out.collect(UvCount(storeKey.toLong, count + 1))
  57. } else {
  58. out.collect(UvCount(storeKey.toLong, count))
  59. }
  60. }
  61. }
  62. // 定义一个布隆过滤器
  63. class Bloom(size: Long) extends Serializable {
  64. private val cap = size
  65. def hash(value: String, seed: Int): Long = {
  66. var result = 0
  67. for (i <- 0 until value.length) {
  68. // 最简单的 hash 算法,每一位字符的 ascii 码值,乘以 seed 之后,做叠加
  69. result = result * seed + value.charAt(i)
  70. }
  71. (cap - 1) & result
  72. }
  73. }

发表评论

表情:
评论列表 (有 0 条评论,139人围观)

还没有评论,来说两句吧...

相关阅读