Structured Streaming流数据Sink到Mysql
Structured Streaming写入Mysql
Structured Streaming 在写出(writeStream)时并没有内置 jdbc 的输出 format,所以需要通过 foreach 自行实现写入逻辑。具体实现代码如下:
StructuredWriteMysql类
package com.test
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
object StructuredWriteMysql {

  /**
   * Reads comma-separated records from a local socket stream and upserts
   * them into MySQL through the custom [[MysqlSink]] ForeachWriter.
   * Expected input format per line: name,age,sex,uuid
   */
  def main(args: Array[String]): Unit = {
    // Keep the console readable: only WARN and above from Spark internals.
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

    val spark = SparkSession
      .builder()
      .appName("Test")
      .master("local")
      .getOrCreate()

    // Socket source: one streaming row (column "value") per text line
    // received from localhost:9998.
    val socketLines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9998)
      .load()

    // Stage 1: split each raw line into an array column "a".
    socketLines.createOrReplaceTempView("tmp1")
    spark.sql("select split(value,',') as a from tmp1").createOrReplaceTempView("tmp2")

    // Stage 2: project the array elements into named columns.
    val parsed =
      spark.sql("select a[0] as name, a[1] as age, a[2] as sex,a[3] as uuid from tmp2")

    // NOTE(review): credentials are hard-coded here, as in the original post.
    val sink = new MysqlSink("jdbc:mysql://localhost:3306/test", "root", "root")

    // Append mode: every new row is handed to the sink; block until terminated.
    parsed.writeStream
      .outputMode("append")
      .foreach(sink)
      .start()
      .awaitTermination()
  }
}
open(),process(),close()会依次执行。
自定义MysqlSink类
package com.test
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.spark.sql.{ForeachWriter, Row}
/**
 * A ForeachWriter that upserts streaming rows into the MySQL table
 * `people(name, age, sex, uuid)` using `REPLACE INTO`.
 *
 * Lifecycle per partition/epoch: open() -> process() for each row -> close().
 *
 * @param url  JDBC connection url, e.g. jdbc:mysql://localhost:3306/test
 * @param user MySQL user name
 * @param pwd  MySQL password
 */
class MysqlSink(url: String, user: String, pwd: String) extends ForeachWriter[Row] {

  var conn: Connection = _
  // Prepared once per open() and reused for every row. The original code
  // created (and never closed) a new PreparedStatement on every process()
  // call, leaking statements on the server side.
  var stmt: PreparedStatement = _

  /** Opens the connection and prepares the statement; returns true to accept rows. */
  override def open(partitionId: Long, epochId: Long): Boolean = {
    // Class.forName is unnecessary with JDBC 4+ drivers but harmless;
    // kept for compatibility with older mysql-connector versions.
    Class.forName("com.mysql.jdbc.Driver")
    conn = DriverManager.getConnection(url, user, pwd)
    stmt = conn.prepareStatement("replace into people(name,age,sex,uuid) values(?,?,?,?)")
    true
  }

  /** Writes one row; expects columns in order (name, age, sex, uuid). */
  override def process(value: Row): Unit = {
    stmt.setString(1, value(0).toString)
    stmt.setString(2, value(1).toString)
    stmt.setString(3, value(2).toString)
    stmt.setString(4, value(3).toString)
    stmt.execute()
  }

  /** Releases JDBC resources; errorOrNull is non-null when processing failed. */
  override def close(errorOrNull: Throwable): Unit = {
    // Null guards: open() may have failed before either was assigned.
    if (stmt != null) stmt.close()
    if (conn != null) conn.close()
  }
}
直接运行程序,然后在 `nc -lk 9998` 中输入下面的数据:
caocao,32,male,1001
liubei,30,male,1002
guanyu,28,male,1003
查询mysql:
github项目链接
还没有评论,来说两句吧...