Structured Streaming file sink
The file sink stores the query output in a directory and supports only append mode.
Requirement: pair each word with its reversal and write the result to a directory in JSON format.
import org.apache.spark.sql.SparkSession

object FileSink {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[1]")
      .appName("Test")
      .getOrCreate()
    import spark.implicits._

    // Read lines from a socket source
    val lines = spark.readStream
      .format("socket")
      .option("host", "hadoop102")
      .option("port", 10001)
      .load()

    // Split each line into words and pair every word with its reversal
    val words = lines.as[String]
      .flatMap(line => line.split(" ").map(word => (word, word.reverse)))
      .toDF("word", "reversed")

    words.writeStream
      .outputMode("append")                   // the file sink only supports append mode
      .format("json")                         // supported file formats: json, orc, csv (also parquet, text)
      .option("path", "./filesink")           // output directory
      .option("checkpointLocation", "./ck1")  // a checkpoint location is required for the file sink
      .start()
      .awaitTermination()
  }
}
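To try this out (using the host hadoop102 and port 10001 from the code above), start a netcat server on that host before launching the job, then type a few words:

nc -lk 10001
spark hello

Each micro-batch is written to ./filesink as a part file with one JSON object per word, e.g. {"word":"spark","reversed":"kraps"} and {"word":"hello","reversed":"olleh"}.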
还没有评论,来说两句吧...
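Once at least one micro-batch has completed, the sink directory can be read back as an ordinary batch DataFrame for a quick sanity check. A minimal sketch, assuming the same ./filesink path and an active SparkSession:

// Read the streaming output back as a static DataFrame and inspect it
val result = spark.read.json("./filesink")
result.show()  // rows such as (spark, kraps), (hello, olleh)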