六、Structured Streaming Sink to MySQL
1、In Spark 2.4, the sinks Structured Streaming ships with are FileSink, KafkaSink, ConsoleSink, MemorySink, and ForeachSink (Spark 2.4 also added the foreachBatch API, which appears in the commented-out variant below).
2、To build a custom sink on ForeachSink, you must implement ForeachWriter[T] and its three methods: open(), process(), and close().
3、open() and close() are called once per partition per batch, while process() is called once for every row in that partition.
class RedisSink extends ForeachWriter[Row] {
  override def open(partitionId: Long, version: Long): Boolean = {
    // initialization, e.g. obtaining a Redis connection;
    // return true to have this partition processed
    true
  }
  override def process(value: Row): Unit = {
    // per-row logic: write the data to the store
  }
  override def close(errorOrNull: Throwable): Unit = {
    // release the connection
  }
}
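The writer is then attached with foreach on the stream writer; a minimal sketch, assuming a streaming DataFrame named df:

df.writeStream
  .foreach(new RedisSink) // the skeleton above
  .start()
  .awaitTermination()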
4、Example: sinking to MySQL
import java.sql.{Connection, DriverManager, PreparedStatement, Timestamp}

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.streaming.{StreamingQueryListener, StreamingQueryProgress}
import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row, SparkSession}

object ForeachSinkApp {
  def main(args: Array[String]): Unit = {
    val window_size = 10 // args(0), window length in seconds
    // the slide interval must be less than or equal to the window length
    val slide_size = 5 // if (args.length == 2) args(1).toInt else window_size
    if (slide_size > window_size) throw new Exception("===wrong parameters===")
    val windowDuration = s"$window_size seconds"
    val slideDuration = s"$slide_size seconds"
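    // Note: windowDuration and slideDuration are built here but not used further
    // below; they would normally feed an event-time window aggregation, e.g.
    // (a sketch, not part of the original flow; rateSource is defined below):
    //   rateSource.groupBy(window($"timestamp", windowDuration, slideDuration)).count()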
    Logger.getLogger("org").setLevel(Level.ERROR)
    val spark = SparkSession.builder().appName("event-time-window_App").master("local[*]")
      .getOrCreate()
    import spark.implicits._
    var batchId: Long = 0
    // attach a listener to the query to collect per-batch progress information
    spark.streams.addListener(new StreamingQueryListener() {
      override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}
      override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
        val progress: StreamingQueryProgress = event.progress
        batchId = progress.batchId
        val inputRowsPerSecond: Double = progress.inputRowsPerSecond
        val processedRowsPerSecond: Double = progress.processedRowsPerSecond
        val numInputRows: Long = progress.numInputRows
        println(s"batchId=$batchId numInputRows=$numInputRows inputRowsPerSecond=$inputRowsPerSecond" +
          s" processedRowsPerSecond=$processedRowsPerSecond")
      }
      override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
    })
    // use the rate source built into Structured Streaming to generate data; its schema is:
    //  |-- timestamp: timestamp (nullable = true)
    //  |-- value: long (nullable = true)
    val rateSource: DataFrame = spark.readStream
      .format("rate")
      .option("rowsPerSecond", 100)
      .load()
    // add a batchId column; note that batchId is a driver-side variable updated by
    // the listener, so tasks see the value captured when each micro-batch is
    // planned (it roughly lags the current batch by one)
    val addDF: DataFrame = rateSource.as[(Timestamp, Long)].map(x => {
      (batchId, x._1, x._2)
    }).toDF("batchId", "timestamp", "num")
    // filter, then:
    // 1. print to the console
    // val resultDS: Dataset[Row] = addDF.filter("num % 2 = 0")
    // resultDS.writeStream
    //   .foreachBatch((df, id) => {
    //     println(s"batchId = $id")
    //     df.show()
    //   }).start().awaitTermination()
    // 2. write out to MySQL
    val resultDS: Dataset[Row] = addDF.filter("num % 2 = 0")
    resultDS.writeStream.outputMode("update")
      .foreach(new MySqlSink)
      .start().awaitTermination()
  }
}
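The post never shows the target table; a plausible DDL (an assumption, adjust types and names to taste) would be:

create table structured_stream_sink (
  batch_id  bigint,
  timestamp datetime,
  num       bigint
);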
The MySqlSink class:
class MySqlSink extends ForeachWriter[Row] {
  var conn: Connection = _
  var ps: PreparedStatement = _
  // a single statement, matching the structured_stream_sink table used throughout
  val sql = "insert into structured_stream_sink(batch_id, timestamp, num) values(?, ?, ?)"

  override def open(partitionId: Long, version: Long): Boolean = {
    conn = getConnection()
    ps = conn.prepareStatement(sql)
    true
  }
  def getConnection(): Connection = {
    var con: Connection = null
    try {
      Class.forName("com.mysql.jdbc.Driver")
      con = DriverManager.getConnection("jdbc:mysql://192.168.120.158:3306/test_sun?useUnicode=true&characterEncoding=UTF-8", "root", "1qaz@WSX3edc")
    } catch {
      // note: on failure con stays null, and open() will then throw an NPE
      case e: Exception => println("-----------mysql get connection has exception, structured_stream_sink = " + e.getMessage)
    }
    con
  }
  override def process(value: Row): Unit = {
    // one executeUpdate per row; fine for a demo, but batching
    // (addBatch/executeBatch) would be faster for real workloads
    ps.setObject(1, value.get(0))
    ps.setObject(2, value.get(1))
    ps.setObject(3, value.get(2))
    val i: Int = ps.executeUpdate()
    println(s"$i ${value.get(0)} ${value.get(1)} ${value.get(2)}")
  }
  override def close(errorOrNull: Throwable): Unit = {
    // release the statement before closing the connection
    if (ps != null) {
      ps.close()
    }
    if (conn != null) {
      conn.close()
    }
  }
}
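An alternative to the row-at-a-time ForeachWriter is the foreachBatch API added in Spark 2.4, which writes each micro-batch with the regular JDBC writer. A minimal sketch, reusing the connection settings above (the column renames are an assumption to match the table):

resultDS.writeStream
  .outputMode("update")
  .foreachBatch { (df: Dataset[Row], id: Long) =>
    df.toDF("batch_id", "timestamp", "num") // rename to match the table columns
      .write
      .format("jdbc")
      .option("url", "jdbc:mysql://192.168.120.158:3306/test_sun?useUnicode=true&characterEncoding=UTF-8")
      .option("dbtable", "structured_stream_sink")
      .option("user", "root")
      .option("password", "1qaz@WSX3edc")
      .mode("append")
      .save()
  }
  .start().awaitTermination()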