Flink-电商用户行为分析(订单支付实时监控-CEP实现)

骑猪看日落 2023-02-25 07:12 125阅读 0赞

数据
链接:https://pan.baidu.com/s/1nSMh3JaNDW1SheQ5I4J0FQ
提取码:e49w
在电商平台中,最终创造收入和利润的是用户下单购买的环节;更具体一点,是用户真正完成支付动作的时候。用户下单的行为可以表明用户对商品的需求,但在现实中,并不是每次下单都会被用户立刻支付。当拖延一段时间后,用户支付的意愿会降低。所以为了让用户更有紧迫感从而提高支付转化率,同时也为了防范订单支付环节的安全风险,电商网站往往会对订单状态进行监控,设置一个失效时间(比如 15 分钟),如果下单后一段时间仍未付,订单就会被取消。
需求
用户支付,订单15分钟内,查看支付成功或超时的
代码实现

  1. import java.util
  2. import org.apache.flink.cep.{ PatternSelectFunction, PatternTimeoutFunction}
  3. import org.apache.flink.cep.scala.CEP
  4. import org.apache.flink.cep.scala.pattern.Pattern
  5. import org.apache.flink.streaming.api.TimeCharacteristic
  6. import org.apache.flink.streaming.api.scala._
  7. import org.apache.flink.streaming.api.windowing.time.Time
  8. //定义输入订单事件的样例类
  9. case class OrderEvent(orderId:Long,envenType:String,txId:String,eventTime:Long)
  10. //定义输出结果样例类
  11. case class OrderResult(orderId:Long,resultMsg:String)
  12. object OrderTimeOut {
  13. def main(args: Array[String]): Unit = {
  14. val env = StreamExecutionEnvironment.getExecutionEnvironment
  15. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  16. env.setParallelism(1)
  17. //1.读取订单数据
  18. val orderEventStream = env.readTextFile("D:\\idea\\flinkUser\\LoginFailDelete\\src\\main\\resources\\OrderLog.csv")
  19. .map(data=>{
  20. val dataArray = data.split(",")
  21. OrderEvent(dataArray(0).trim.toLong,dataArray(1).trim,dataArray(2).trim,dataArray(3).trim.toLong)
  22. })
  23. .assignAscendingTimestamps(_.eventTime * 1000L)
  24. .keyBy(_.orderId)
  25. //2.定义匹配模式 followedBy() 非严格模式
  26. val orderPayPattern = Pattern.begin[OrderEvent]("begin").where(_.envenType == "create")
  27. .followedBy("follow").where(_.envenType == "pay")
  28. .within(Time.minutes(15))
  29. //3.把模式应用到stream上,得到一个pattern stream
  30. val patternStream = CEP.pattern(orderEventStream,orderPayPattern)
  31. //4.调用select方法,提取时间序列,超时的事件要做报警提示
  32. val orderTimeOutputTag = new OutputTag[OrderResult]("orderTimeOut")
  33. val resultStream = patternStream.select(orderTimeOutputTag,
  34. new OrderTimeOutSelect(),
  35. new OrderPaySelect()
  36. )
  37. resultStream.print("payed")
  38. resultStream.getSideOutput(orderTimeOutputTag).print("timeout")
  39. env.execute("order timeout job")
  40. }
  41. }
  42. //自定义超时事件序列处理函数
  43. class OrderTimeOutSelect() extends PatternTimeoutFunction[OrderEvent,OrderResult]{
  44. override def timeout(map: util.Map[String, util.List[OrderEvent]], l: Long): OrderResult = {
  45. val timeoutOrderId = map.get("begin").iterator().next().orderId
  46. OrderResult(timeoutOrderId,"timeout")
  47. }
  48. }
  49. //自定义正常支付事件序列处理函数
  50. class OrderPaySelect() extends PatternSelectFunction[OrderEvent,OrderResult]{
  51. override def select(map: util.Map[String, util.List[OrderEvent]]): OrderResult = {
  52. val payedOrderId = map.get("follow").iterator().next().orderId
  53. OrderResult(payedOrderId,"payed successfully")
  54. }
  55. }

最终结果(在15中支付了的和没有支付超时了的)
在这里插入图片描述

发表评论

表情:
评论列表 (有 0 条评论,125人围观)

还没有评论,来说两句吧...

相关阅读