Flink-电商用户行为分析(订单支付实时监控-CEP实现)
数据
链接:https://pan.baidu.com/s/1nSMh3JaNDW1SheQ5I4J0FQ
提取码:e49w
在电商平台中,最终创造收入和利润的是用户下单购买的环节;更具体一点,是用户真正完成支付动作的时候。用户下单的行为可以表明用户对商品的需求,但在现实中,并不是每次下单都会被用户立刻支付。当拖延一段时间后,用户支付的意愿会降低。所以为了让用户更有紧迫感从而提高支付转化率,同时也为了防范订单支付环节的安全风险,电商网站往往会对订单状态进行监控,设置一个失效时间(比如 15 分钟),如果下单后一段时间仍未付,订单就会被取消。
需求
用户支付,订单15分钟内,查看支付成功或超时的代码实现
import java.util
import org.apache.flink.cep.{ PatternSelectFunction, PatternTimeoutFunction}
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
//定义输入订单事件的样例类
case class OrderEvent(orderId:Long,envenType:String,txId:String,eventTime:Long)
//定义输出结果样例类
case class OrderResult(orderId:Long,resultMsg:String)
object OrderTimeOut {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.setParallelism(1)
//1.读取订单数据
val orderEventStream = env.readTextFile("D:\\idea\\flinkUser\\LoginFailDelete\\src\\main\\resources\\OrderLog.csv")
.map(data=>{
val dataArray = data.split(",")
OrderEvent(dataArray(0).trim.toLong,dataArray(1).trim,dataArray(2).trim,dataArray(3).trim.toLong)
})
.assignAscendingTimestamps(_.eventTime * 1000L)
.keyBy(_.orderId)
//2.定义匹配模式 followedBy() 非严格模式
val orderPayPattern = Pattern.begin[OrderEvent]("begin").where(_.envenType == "create")
.followedBy("follow").where(_.envenType == "pay")
.within(Time.minutes(15))
//3.把模式应用到stream上,得到一个pattern stream
val patternStream = CEP.pattern(orderEventStream,orderPayPattern)
//4.调用select方法,提取时间序列,超时的事件要做报警提示
val orderTimeOutputTag = new OutputTag[OrderResult]("orderTimeOut")
val resultStream = patternStream.select(orderTimeOutputTag,
new OrderTimeOutSelect(),
new OrderPaySelect()
)
resultStream.print("payed")
resultStream.getSideOutput(orderTimeOutputTag).print("timeout")
env.execute("order timeout job")
}
}
//自定义超时事件序列处理函数
class OrderTimeOutSelect() extends PatternTimeoutFunction[OrderEvent,OrderResult]{
override def timeout(map: util.Map[String, util.List[OrderEvent]], l: Long): OrderResult = {
val timeoutOrderId = map.get("begin").iterator().next().orderId
OrderResult(timeoutOrderId,"timeout")
}
}
//自定义正常支付事件序列处理函数
class OrderPaySelect() extends PatternSelectFunction[OrderEvent,OrderResult]{
override def select(map: util.Map[String, util.List[OrderEvent]]): OrderResult = {
val payedOrderId = map.get("follow").iterator().next().orderId
OrderResult(payedOrderId,"payed successfully")
}
}
最终结果(在15中支付了的和没有支付超时了的)
还没有评论,来说两句吧...