Flink-电商用户行为分析(恶意登录监控-CEP实现)
数据
链接:https://pan.baidu.com/s/1x1lJ1iuuefTn-tTGC6C3UQ
提取码:iz9n需求
在 2 秒之内连续两次登录失败
import java.util
import org.apache.flink.cep.PatternSelectFunction
import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.windowing.time.Time
//输出登录事件样例类
case class LoginEvent(userId:Long,ip:String,eventType:String,eventTime:Long)
//输出的异常报警信息样例类
case class Warning(userId:Long,fistFailTime:Long,lastFailTime:Long,warningMsg:String)
object LoginFailWithCep {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//1.读取事件数据
val resource = getClass.getResource("/LoginLog.csv")
val loginEventStream = env.readTextFile(resource.getPath)
.map(data=>{
val dataArray = data.split(",")
LoginEvent(dataArray(0).trim.toLong,dataArray(1).trim,dataArray(2).trim,dataArray(3).trim.toLong)
})
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[LoginEvent](Time.seconds(10)) {
override def extractTimestamp(t: LoginEvent): Long = t.eventTime * 1000L
})
.keyBy(_.userId)
//2.定义匹配模式 next()是严格紧邻,意思就是连续
val loginFailPattern = Pattern.begin[LoginEvent]("begin").where(_.eventType == "fail")
.next("next").where(_.eventType == "fail")
.within(Time.seconds(2))
//3.在时间流上应用模式,得到一个pattern stream
val patternSream = CEP.pattern(loginEventStream,loginFailPattern)
//4.从pattern stream上应用select function ,检测匹配事件系列
val loginFailDataStream = patternSream.select(new LoginFailMatch())
loginFailDataStream.print()
env.execute("login fail with cep job")
}
}
class LoginFailMatch() extends PatternSelectFunction[LoginEvent,Warning]{
override def select(map: util.Map[String, util.List[LoginEvent]]): Warning = {
//从map中按照名称取出对应的事件
val firstFail = map.get("begin").iterator().next()
val lastFail = map.get("next").iterator().next()
Warning(firstFail.userId,firstFail.eventTime,lastFail.eventTime,"log fail!")
}
}
还没有评论,来说两句吧...