Structure streaming-去除重复数据

亦凉 2023-02-20 06:28 146阅读 0赞

根据唯一的 id 实现数据去重.
意思就是输出过的id,下次再次输入就不会显示了
数据`

  1. 1,2019-09-14 11:50:00,dog1
  2. 2,2019-09-14 11:51:00,dog2
  3. 1,2019-09-14 11:55:00,dog3
  4. 3,2019-09-14 11:53:00,dog4
  5. 1,2019-09-14 11:50:00,dog5
  6. 4,2019-09-14 11:45:00,dog6
  7. import java.sql.Timestamp
  8. import org.apache.spark.sql.SparkSession
  9. object Drop {
  10. def main(args: Array[String]): Unit = {
  11. val spark = SparkSession
  12. .builder()
  13. .master("local[*]")
  14. .appName("Test")
  15. .getOrCreate()
  16. import spark.implicits._
  17. val lines = spark.readStream
  18. .format("socket")
  19. .option("host", "hadoop102")
  20. .option("port", 10001)
  21. .load()
  22. val words = lines.as[String].map(line => {
  23. val arr = line.split(",")
  24. (arr(0), Timestamp.valueOf(arr(1)), arr(2))
  25. }).toDF("uid", "ts", "word")
  26. val wordcount = words
  27. .withWatermark("ts", "2 minutes")
  28. .dropDuplicates("uid")
  29. wordcount.writeStream
  30. .outputMode("append")
  31. .format("console")
  32. .start()
  33. .awaitTermination()
  34. }
  35. }

第一批
在这里插入图片描述
在这里插入图片描述
第二批
在这里插入图片描述
在这里插入图片描述
第三批
在这里插入图片描述
在这里插入图片描述
id 重复无输出
第 4 批: 3,2019-09-14 11:53:00,dog
省略
第 5 批: 1,2019-09-14 11:50:00,dog 数据重复, 并且数据过期, 所以无输出
第 6 批 4,2019-09-14 11:45:00,dog 数据过时, 所以无输出
注意
注意:

  1. dropDuplicates 不可用在聚合之后, 即通过聚合得到的 df/ds 不能调用 dropDuplicates
  2. 使用 watermask - 如果重复记录的到达时间有上限,则可以在事件时间列上定义水印, 并使用guid和事件时间列进行重复数据删除。该查询将使用水印从过去的记录中删除旧 的状态数据,这些记录不会再被重复。这限制了查询必须维护的状态量。
  3. 没有 watermask - 由于重复记录可能到达时没有界限,查询将来自所有过去记录的数据 存储为状态。

发表评论

表情:
评论列表 (有 0 条评论,146人围观)

还没有评论,来说两句吧...

相关阅读

    相关 mysql去除重复数据

    mysql去除重复数据 在crm中,电话号码就往往就代表了一个客户,所以往往电话号码不能出现重复数据,下面就根据客户表client的phone字段去除重复数据: