六、Structured Streaming Sink到mysql

柔情只为你懂 2024-04-18 15:17 111阅读 0赞

1、Spark2.4中,StructuredStreaming内置支持的sink有FileSink、KafkaSink、ConsoleSink、MemorySink、ForeachSink,以及2.4新增的ForeachBatchSink(foreachBatch)

2、要使用ForeachSink自定义sink,必须继承抽象类ForeachWriter[T],并实现open()、process()、close()三个方法:

3、在每个batch中,open()和close()对每个分区各调用一次,process()则对该分区中的每一行数据调用一次。

  /**
   * Skeleton of a custom foreach sink: extend `ForeachWriter[Row]` and fill in
   * the three lifecycle callbacks below.
   */
  class redisSink extends ForeachWriter[Row]() {

    /**
     * Initialization hook, e.g. obtain a Redis connection.
     *
     * @param partitionId id of the partition being written
     * @param version     version (batch/epoch) identifier supplied by Spark
     * @return `true` so that the rows of this partition are passed to `process`
     */
    override def open(partitionId: Long, version: Long): Boolean = {
      // Perform initialization here, e.g. acquire the Redis connection.
      true
    }

    /**
     * Handles a single row.
     *
     * @param value the row to write
     */
    override def process(value: Row): Unit = {
      // Actual processing logic: write the record to the database.
    }

    /**
     * Cleanup hook.
     *
     * @param errorOrNull the error that stopped processing, or null on success
     */
    override def close(errorOrNull: Throwable): Unit = {
      // Close the connection.
    }
  }

4、sink到mysql实例

  /**
   * Demo application: reads the built-in `rate` source, adds a `batchId` column,
   * keeps the rows whose `num` is even and writes them out through `MySqlSink`.
   */
  object ForeachSinkApp {

    /**
     * Entry point. Blocks until the streaming query terminates.
     *
     * @param args command-line arguments (currently unused; sizes are hard-coded)
     */
    def main(args: Array[String]): Unit = {
      val window_size = 10 // args(0) -- window length
      // The slide distance must be less than or equal to the window length.
      val slide_size = 5 // if (args.length == 2) args(1).toInt else window_size
      if (slide_size > window_size) throw new Exception("===wrong paragrams===")
      // NOTE(review): the two duration strings are not used by the query below.
      val windowDuration = s"$window_size seconds"
      val slideDuration = s"$slide_size seconds"

      Logger.getLogger("org").setLevel(Level.ERROR)
      val spark = SparkSession.builder()
        .appName(" event-time-window_App")
        .master("local[*]")
        .getOrCreate()
      import spark.implicits._

      // NOTE(review): batchId is a driver-side var written by the listener and
      // captured by the map closure further down; whether tasks observe the
      // latest value depends on when the closure is serialized -- verify before
      // relying on the batchId column.
      var batchId: Long = 0

      // Register a listener on the queries to obtain per-batch progress information.
      spark.streams.addListener(new StreamingQueryListener() {
        override def onQueryStarted(event: StreamingQueryListener.QueryStartedEvent): Unit = {}

        override def onQueryProgress(event: StreamingQueryListener.QueryProgressEvent): Unit = {
          val progress: StreamingQueryProgress = event.progress
          batchId = progress.batchId
          val inputRowsPerSecond: Double = progress.inputRowsPerSecond
          val processRowsPerSecond: Double = progress.processedRowsPerSecond
          val numInputRows: Long = progress.numInputRows
          // Build ONE string; passing two comma-separated arguments to println
          // would be auto-tupled and printed as "(…,…)".
          println(s"batchId=$batchId numInputRows=$numInputRows " +
            s"inputRowsPerSecond=$inputRowsPerSecond processRowsPerSecond=$processRowsPerSecond")
        }

        override def onQueryTerminated(event: StreamingQueryListener.QueryTerminatedEvent): Unit = {}
      })

      // Generate data with the source that ships with Structured Streaming:
      //  |-- timestamp: timestamp (nullable = true)
      //  |-- value: long (nullable = true)
      val rateSource: DataFrame = spark.readStream
        .format("rate")
        .option("rowsPerSecond", 100)
        .load()

      // Add a batchId column.
      val addDF: DataFrame = rateSource.as[(Timestamp, Long)]
        .map { case (ts, num) => (batchId, ts, num) }
        .toDF("batchId", "timestamp", "num")

      // Filter.
      // 1. Print to the console:
      // val resultDS: Dataset[Row] = addDF.filter("num%2=0")
      // resultDS.writeStream.format("console")
      //   .foreachBatch((df, id) => {
      //     println(s"batchid = $id")
      //     df.show()
      //   }).start().awaitTermination()

      // 2. Write out to MySQL.
      val resultDS: Dataset[Row] = addDF.filter("num%2=0")
      resultDS.writeStream
        .outputMode("update") // "update" is an output mode, not a source format
        .foreach(new MySqlSink)
        .start()
        .awaitTermination()
    }
  }

MySqlSink类

  /**
   * `ForeachWriter` that inserts every incoming `Row` into a MySQL table over JDBC.
   *
   * A connection and a prepared statement are created in `open` and released in
   * `close`; `process` binds the first three columns of the row and executes the insert.
   */
  class MySqlSink extends ForeachWriter[Row] {
    var conn: Connection = _
    var ps: PreparedStatement = _
    // The insert statement executed for every row.
    val sql = "insert into structured_stream_sink(batch_id,timestamp,num) values(?, ?, ?);"

    /**
     * Opens the JDBC connection and prepares the insert statement.
     *
     * @param partitionId id of the partition being written
     * @param version     version (batch/epoch) identifier supplied by Spark
     * @return always `true`
     * @throws IllegalStateException if no connection could be obtained
     */
    override def open(partitionId: Long, version: Long): Boolean = {
      conn = getConnection()
      if (conn == null) {
        // Fail with a clear message instead of a NullPointerException below.
        throw new IllegalStateException(
          "MySqlSink could not obtain a MySQL connection for partition " + partitionId)
      }
      try {
        ps = conn.prepareStatement(sql)
      } catch {
        case e: Exception =>
          // Do not leak the connection when the statement cannot be prepared.
          conn.close()
          conn = null
          throw e
      }
      true
    }

    /**
     * Creates a new JDBC connection to the MySQL server.
     *
     * @return the connection, or `null` if it could not be established
     *         (the failure is printed to standard output)
     */
    def getConnection(): Connection = {
      var con: Connection = null
      try {
        Class.forName("com.mysql.jdbc.Driver")
        // NOTE(review): URL and credentials are hard-coded; move them to configuration.
        con = DriverManager.getConnection("jdbc:mysql://192.168.120.158:3306/test_sun?useUnicode=true&characterEncoding=UTF-8", "root", "1qaz@WSX3edc")
      } catch {
        case e: Exception => System.out.println("-----------mysql get connection has exception , structured_stream_sink = " + e.getMessage)
      }
      con
    }

    /**
     * Inserts one row: columns 0, 1 and 2 are bound to the three statement parameters.
     *
     * @param value the row to insert
     */
    override def process(value: Row): Unit = {
      ps.setObject(1, value.get(0))
      ps.setObject(2, value.get(1))
      ps.setObject(3, value.get(2))
      val i: Int = ps.executeUpdate()
      println(i + " " + value.get(0) + " " + value.get(1) + " " + value.get(2))
    }

    /**
     * Releases the JDBC resources.
     *
     * @param errorOrNull the error that stopped processing, or null on success
     */
    override def close(errorOrNull: Throwable): Unit = {
      // Close the statement before its connection; the finally block makes sure
      // the connection is closed even if closing the statement throws.
      try {
        if (ps != null) {
          ps.close()
        }
      } finally {
        if (conn != null) {
          conn.close()
        }
      }
    }
  }

发表评论

表情:
评论列表 (有 0 条评论,111人围观)

还没有评论,来说两句吧...

相关阅读