Flink-电商用户行为分析(恶意登录监控-CEP实现)

冷不防 2023-02-25 07:10 131阅读 0赞

数据
链接:https://pan.baidu.com/s/1x1lJ1iuuefTn-tTGC6C3UQ
提取码:iz9n
需求
在 2 秒之内连续两次登录失败

  1. import java.util
  2. import org.apache.flink.cep.PatternSelectFunction
  3. import org.apache.flink.cep.scala.CEP
  4. import org.apache.flink.cep.scala.pattern.Pattern
  5. import org.apache.flink.streaming.api.TimeCharacteristic
  6. import org.apache.flink.streaming.api.scala._
  7. import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
  8. import org.apache.flink.streaming.api.windowing.time.Time
  /**
   * A single login attempt, as parsed from one comma-separated line of the input log.
   *
   * @param userId    id of the user who attempted to log in; the stream is keyed by this field
   * @param ip        address field of the log line, stored as-is
   * @param eventType outcome of the attempt; the detection pattern compares it against "fail"
   * @param eventTime time of the attempt (presumably epoch seconds: the job multiplies it by
   *                  1000 to obtain the event timestamp -- confirm against the data file)
   */
  case class LoginEvent(
    userId: Long,
    ip: String,
    eventType: String,
    eventTime: Long
  )
  /**
   * Alert record emitted when a suspicious login-failure sequence is detected.
   *
   * @param userId       id of the user the alert is about
   * @param fistFailTime event time of the first failure in the matched sequence
   *                     (the name is a typo for "firstFailTime", kept so existing callers compile)
   * @param lastFailTime event time of the last failure in the matched sequence
   * @param warningMsg   human-readable description of the alert
   */
  case class Warning(
    userId: Long,
    fistFailTime: Long,
    lastFailTime: Long,
    warningMsg: String
  )
  /**
   * Flink job that reports users with two strictly consecutive failed logins
   * within two seconds, implemented with the CEP library.
   */
  object LoginFailWithCep {

    /**
     * Builds and runs the detection pipeline: read the log, assign event-time
     * timestamps, key by user, apply the failure pattern and print the warnings.
     *
     * @param args command-line arguments (unused)
     * @throws IllegalStateException if `/LoginLog.csv` is not on the classpath
     */
    def main(args: Array[String]): Unit = {
      val env = StreamExecutionEnvironment.getExecutionEnvironment
      env.setParallelism(1)
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

      // 1. Read the login events.
      // getResource returns null when the file is missing; fail with a clear
      // message instead of a NullPointerException on the next line.
      val resource = Option(getClass.getResource("/LoginLog.csv")).getOrElse(
        throw new IllegalStateException("Resource /LoginLog.csv was not found on the classpath")
      )
      val loginEventStream = env.readTextFile(resource.getPath)
        .map(data => {
          // Expected line layout: userId,ip,eventType,eventTime
          val dataArray = data.split(",")
          LoginEvent(
            dataArray(0).trim.toLong,
            dataArray(1).trim,
            dataArray(2).trim,
            dataArray(3).trim.toLong
          )
        })
        // The extractor is constructed with a 10-second out-of-orderness bound.
        .assignTimestampsAndWatermarks(
          new BoundedOutOfOrdernessTimestampExtractor[LoginEvent](Time.seconds(10)) {
            // eventTime is scaled by 1000 (presumably seconds -> milliseconds; confirm with the data).
            override def extractTimestamp(t: LoginEvent): Long = t.eventTime * 1000L
          }
        )
        .keyBy(_.userId)

      // 2. Define the pattern. next() means strict contiguity, i.e. the two
      //    failures must directly follow each other for the same key.
      val loginFailPattern = Pattern.begin[LoginEvent]("begin").where(_.eventType == "fail")
        .next("next").where(_.eventType == "fail")
        .within(Time.seconds(2))

      // 3. Apply the pattern to the keyed event stream to obtain a pattern stream.
      val patternStream = CEP.pattern(loginEventStream, loginFailPattern)

      // 4. Apply a select function to the pattern stream to turn each matched
      //    event sequence into a Warning.
      val loginFailDataStream = patternStream.select(new LoginFailMatch())

      loginFailDataStream.print()
      env.execute("login fail with cep job")
    }
  }
  /**
   * Select function that converts one matched event sequence into a [[Warning]].
   *
   * It reads the events stored under the pattern names "begin" and "next".
   */
  class LoginFailMatch() extends PatternSelectFunction[LoginEvent, Warning] {

    /**
     * @param map matched events, grouped by the name of the pattern step that captured them
     * @return a warning carrying the user id and event time of the "begin" event
     *         and the event time of the "next" event
     */
    override def select(map: util.Map[String, util.List[LoginEvent]]): Warning = {
      // Look up the first event recorded under the given pattern name.
      def firstEventOf(patternName: String): LoginEvent =
        map.get(patternName).iterator().next()

      val beginEvent = firstEventOf("begin")
      val nextEvent = firstEventOf("next")

      Warning(
        userId = beginEvent.userId,
        fistFailTime = beginEvent.eventTime,
        lastFailTime = nextEvent.eventTime,
        warningMsg = "log fail!"
      )
    }
  }

在这里插入图片描述

发表评论

表情:
评论列表 (有 0 条评论,131人围观)

还没有评论,来说两句吧...

相关阅读