Flume案例四:实时监控目录下多个新文件(Spooling Directory Source)

我会带着你远行 2022-10-09 12:10 383阅读 0赞

本文接上篇博客:Flume介绍、安装、使用案例、自定义Source/Sink、监控
Flume 版本:1.9.0
本文hdfs sink,需 Hadoop 支持,Hadoop相关内容,请参考:Hadoop专栏

1.实时监控目录下多个新文件

选型:spooling directory source + memory channel + hdfs sink

文档参考:
spooling directory source:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#spooling-directory-source
memory channel:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#memory-channel
hdfs sink:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#hdfs-sink

提示:
  Exec source 适用于监控一个实时追加的文件,但不能保证数据不丢失;Spooling Directory Source 能够保证数据不丢失,且能够实现断点续传,但延迟较高,不能实时监控;而 Taildir Source 既能够实现断点续传,又可以保证数据不丢失,还能够进行实时监控,集两者优点于一身,更推荐使用Taildir Source。

推荐:
  如果你要使用 Spooling Directory Source,生产上更推荐使用 Taildir source,本文仅做了解学习使用。Taildir Source 参考:https://blog.csdn.net/lzb348110175/article/details/118189312

2.需求图示

在这里插入图片描述

3.flume配置

flume-spooldir-hdfs.conf

  1. # Name the components on this agent
  2. a1.sources = r1
  3. a1.sinks = k1
  4. a1.channels = c1
  5. # Describe/configure the source
  6. a1.sources.r1.type = spooldir
  7. a1.sources.r1.spoolDir = /opt/module/testdir
  8. a1.sources.r1.fileHeader = true
  9. a1.sources.r1.fileSuffix = .COMPLETED
  10. #忽略所有以.tmp 结尾的文件,不上传
  11. a1.sources.r1.ignorePattern = ([^ ]*\.tmp)
  12. # Describe the sink
  13. a1.sinks.k1.type = hdfs
  14. a1.sinks.k1.hdfs.path = /flume/spooldir/%Y-%m-%d/%H
  15. a1.sinks.k1.hdfs.filePrefix = testlog
  16. # 是否使用本地时间戳
  17. a1.sinks.k1.hdfs.useLocalTimeStamp = true
  18. # 是否按照时间滚动文件夹
  19. a1.sinks.k1.hdfs.round = true
  20. # 多少时间单位创建一个新的文件夹
  21. a1.sinks.k1.hdfs.roundValue = 1
  22. # 重新定义时间单位
  23. a1.sinks.k1.hdfs.roundUnit = hour
  24. # 积攒多少个 Event 才 flush 到 HDFS 一次
  25. a2.sinks.k2.hdfs.batchSize = 1000
  26. # 多久生成一个新的文件(seconds)
  27. a1.sinks.k1.hdfs.rollInterval = 30
  28. # 设置每个文件的滚动大小
  29. a1.sinks.k1.hdfs.rollSize = 134217700
  30. # 文件的滚动与 Event 数量无关
  31. a1.sinks.k1.hdfs.rollCount = 0
  32. # 设置文件类型,可支持压缩(不加该配置的话,Flume写入HDFS的文件会出现SEQ !org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable)
  33. a1.sinks.k1.hdfs.fileType = DataStream
  34. # 关闭HDFS文件出错时重试次数,之前设为0即无限重试。现在改为2
  35. test.sinks.sink.hdfs.closeTries=2
  36. # Use a channel which buffers events in memory
  37. a1.channels.c1.type = memory
  38. a1.channels.c1.capacity = 1000
  39. a1.channels.c1.transactionCapacity = 100
  40. # Bind the source and sink to the channel
  41. a1.sources.r1.channels = c1
  42. a1.sinks.k1.channel = c1

4.启动命令

  1. bin/flume-ng agent -c conf -n a1 -f job/flume-spooldir-hdfs.conf

5.异常处理

写入HDFS,报如下错误:java.lang.NoSuchMethodError:com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V

  这是因为hadoop目录下得guava版本和flume下的guava版本的问题。进入 flume/lib 目录下,将 guava-11.0.2.jar 包移除即可

6.测试图示

  Spooling Directory Source 实时监听 testdir 目录,提前准备好两个测试文件test001.txttest002.txt,测试 mv 移动文件到 testdir 目录下,flume 端能够正常接收到新增的文件,通过 hdfs sink方式,将文件内容写出到 /flume/spooldir 目录下,测试数据如图所示:
在这里插入图片描述

测试结果,如图所示:
在这里插入图片描述


博主写作不易,加个关注呗

求关注、求点赞,加个关注不迷路 ヾ(◍°∇°◍)ノ゙

我不能保证所写的内容都正确,但是可以保证不复制、不粘贴。保证每一句话、每一行代码都是亲手敲过的,错误也请指出,望轻喷 Thanks♪(・ω・)ノ

发表评论

表情:
评论列表 (有 0 条评论,383人围观)

还没有评论,来说两句吧...

相关阅读