Structured Streaming流数据Sink到Mysql

梦里梦外; 2022-03-27 01:36 308阅读 0赞

Structured Streaming写入Mysql

Structured Streaming的writeStream没有提供jdbc的输出格式(format),所以需要通过foreach并自行实现ForeachWriter来写入Mysql。具体实现代码如下:

  • StructuredWriteMysql类

    package com.test

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.sql.SparkSession

  /**
   * Demo entry point: reads comma-separated text lines from a local socket and
   * streams each parsed record into MySQL through a [[MysqlSink]].
   */
  object StructuredWriteMysql {

    /**
     * Builds the streaming pipeline and blocks until the query terminates.
     *
     * @param args command-line arguments (not used)
     */
    def main(args: Array[String]): Unit = {
      // Keep Spark's own logging at WARN so the console stays readable.
      Logger.getLogger("org.apache.spark").setLevel(Level.WARN)

      val session = SparkSession.builder().master("local").appName("Test").getOrCreate()

      // Socket source on localhost:9998; registered as view tmp1.
      val socketLines = session.readStream
        .format("socket")
        .option("host", "localhost")
        .option("port", 9998)
        .load()
      socketLines.createOrReplaceTempView("tmp1")

      // Split every incoming line on commas and expose the array as view tmp2.
      session.sql("select split(value,',') as a from tmp1").createOrReplaceTempView("tmp2")

      // Project the first four array elements as the named columns the sink writes.
      val people =
        session.sql("select a[0] as name, a[1] as age, a[2] as sex,a[3] as uuid from tmp2")

      // Append-mode query that hands each row to the custom MySQL writer.
      people.writeStream
        .outputMode("append")
        .foreach(new MysqlSink("jdbc:mysql://localhost:3306/test", "root", "root"))
        .start()
        .awaitTermination()
    }
  }

MysqlSink中的open(),process(),close()会依次执行:open()建立数据库连接,process()逐行写入数据,close()关闭连接。


  • 自定义MysqlSink类

    package com.test

    import java.sql.{Connection, DriverManager}

    import org.apache.spark.sql.{ForeachWriter, Row}

    /**
     * ForeachWriter that writes each streaming Row into the MySQL `people` table
     * with a `replace into` statement.
     *
     * The JDBC connection is created in open(); a single prepared statement is
     * created on the first process() call after open(), reused for every following
     * row, and released together with the connection in close().
     *
     * @param url  JDBC URL handed to DriverManager.getConnection
     * @param user database user name
     * @param pwd  database password
     */
    class MysqlSink(url: String, user: String, pwd: String) extends ForeachWriter[Row] {

      var conn: Connection = _

      // Reused across rows so that a statement is not created (and leaked) per row.
      private var stmt: java.sql.PreparedStatement = _

      /**
       * Loads the MySQL driver and opens the connection.
       *
       * @param partitionId partition id supplied by Spark (not used)
       * @param epochId     epoch id supplied by Spark (not used)
       * @return always true, so every row is passed to process()
       */
      override def open(partitionId: Long, epochId: Long): Boolean = {
        Class.forName("com.mysql.jdbc.Driver")
        conn = DriverManager.getConnection(url, user, pwd)
        // Any statement left from an earlier open() belongs to an old connection.
        stmt = null
        true
      }

      /**
       * Binds the first four columns of the row (name, age, sex, uuid) as strings
       * and executes the statement. A null column is bound as SQL NULL instead of
       * failing on `toString`.
       *
       * @param value row whose columns 0..3 are written
       */
      override def process(value: Row): Unit = {
        if (stmt == null) {
          stmt = conn.prepareStatement("replace into people(name,age,sex,uuid) values(?,?,?,?)")
        }
        (0 until 4).foreach { i =>
          // JDBC parameter indexes are 1-based, Row indexes are 0-based.
          if (value.isNullAt(i)) stmt.setNull(i + 1, java.sql.Types.VARCHAR)
          else stmt.setString(i + 1, value(i).toString)
        }
        stmt.execute()
      }

      /**
       * Releases the statement and the connection. Both are null-checked so that a
       * failure before they were created does not raise a NullPointerException here.
       *
       * @param errorOrNull error that ended processing, or null on success (not used)
       */
      override def close(errorOrNull: Throwable): Unit = {
        try {
          if (stmt != null) stmt.close()
        } finally {
          stmt = null
          if (conn != null) {
            conn.close()
            conn = null
          }
        }
      }
    }

先执行nc -lk 9998启动监听,再运行程序,然后在nc中输入下面数据

  1. caocao,32,male,1001
  2. liubei,30,male,1002
  3. guanyu,28,male,1003

查询mysql:

image

github项目链接

发表评论

表情:
评论列表 (有 0 条评论,308人围观)

还没有评论,来说两句吧...

相关阅读