Flume-实时监控单目录下的多个新文件案例,实时监控多目录下的多个追加文件案例

╰+哭是因爲堅強的太久メ 2022-11-25 13:09 288阅读 0赞

文章目录

    • 实时监控目录下多个新文件
    • 实时监控多目录下的多个追加文件

实时监控目录下多个新文件

1)案例需求:
使用flume监控某个目录下的日志文件,当某个目录下出现符合要求的文件名称的文件时,则对文件中的日志数据进行读取,并将数据最终写入到hdfs上

2)需求分析:
在这里插入图片描述
3)实现步骤:

(1)创建配置文件files-flume-hdfs.conf

创建一个文件

  1. [qinjl@hadoop102 job]$ vim files-flume-hdfs.conf

添加如下内容

  1. a3.sources = r3
  2. a3.sinks = k3
  3. a3.channels = c3
  4. # Describe/configure the source
  5. a3.sources.r3.type = spooldir
  6. a3.sources.r3.spoolDir = /opt/module/flume/upload
  7. # 每次读完,都会给读完的文件增加.COMPLETED后缀,从而忽略这些文件
  8. a3.sources.r3.fileSuffix = .COMPLETED
  9. #忽略所有以.tmp结尾的文件,不上传
  10. a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
  11. # Describe the sink
  12. a3.sinks.k3.type = hdfs
  13. a3.sinks.k3.hdfs.path = /flume/upload/%Y%m%d/%H
  14. #上传文件的前缀
  15. a3.sinks.k3.hdfs.filePrefix = upload-
  16. #是否按照时间滚动文件夹
  17. a3.sinks.k3.hdfs.round = true
  18. #多少时间单位创建一个新的文件夹
  19. a3.sinks.k3.hdfs.roundValue = 1
  20. #重新定义时间单位
  21. a3.sinks.k3.hdfs.roundUnit = hour
  22. #是否使用本地时间戳
  23. a3.sinks.k3.hdfs.useLocalTimeStamp = true
  24. #积攒多少个Event才flush到HDFS一次
  25. a3.sinks.k3.hdfs.batchSize = 100
  26. #设置文件类型,可支持压缩,默认的数据流的格式为SequenceFile
  27. a3.sinks.k3.hdfs.fileType = DataStream
  28. #多久生成一个新的文件
  29. a3.sinks.k3.hdfs.rollInterval = 60
  30. #设置每个文件的滚动大小大概是128M
  31. a3.sinks.k3.hdfs.rollSize = 134217700
  32. #文件的滚动与Event数量无关
  33. a3.sinks.k3.hdfs.rollCount = 0
  34. # Use a channel which buffers events in memory
  35. a3.channels.c3.type = memory
  36. a3.channels.c3.capacity = 1000
  37. a3.channels.c3.transactionCapacity = 100
  38. # Bind the source and sink to the channel
  39. a3.sources.r3.channels = c3
  40. a3.sinks.k3.channel = c3

在这里插入图片描述
(2)启动监控文件夹命令

  1. [qinjl@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/files-flume-hdfs.conf
  • 说明:在使用Spooling Directory Source时,不要在监控目录中创建并持续修改文件;上传完成的文件会以.COMPLETED结尾;被监控文件夹每500毫秒扫描一次文件变动。

(3)向upload文件夹中添加文件

在/opt/module/flume目录下创建upload文件夹

  1. [qinjl@hadoop102 flume]$ mkdir upload

向upload文件夹中添加文件

  1. [qinjl@hadoop102 upload]$ touch qinjl.txt
  2. [qinjl@hadoop102 upload]$ touch qinjl.tmp
  3. [qinjl@hadoop102 upload]$ touch qinjl.log

(4)查看HDFS上的数据
在这里插入图片描述
(5)等待1s,再次查询upload文件夹

  1. [qinjl@hadoop102 upload]$ ll
  2. 总用量 0
  3. -rw-rw-r--. 1 qinjl qinjl 0 5 20 22:31 qinjl.log.COMPLETED
  4. -rw-rw-r--. 1 qinjl qinjl 0 5 20 22:31 qinjl.tmp
  5. -rw-rw-r--. 1 qinjl qinjl 0 5 20 22:31 qinjl.txt.COMPLETED

实时监控多目录下的多个追加文件

Exec source 适用于监控一个实时追加的文件,不能实现断点续传;Spooldir Source 适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步;而Taildir Source适合用于监听多个实时追加的文件,并且能够实现断点续传。

1)案例需求:
使用Flume监听整个目录的实时追加文件,并上传至HDFS

2)需求分析:
在这里插入图片描述

3)3)实现步骤:

(1)创建配置文件taildir-flume-hdfs.conf

创建一个文件

  1. [qinjl@hadoop102 job]$ vim taildir-flume-hdfs.conf

添加如下内容

  1. a3.sources = r3
  2. a3.sinks = k3
  3. a3.channels = c3
  4. # Describe/configure the source
  5. a3.sources.r3.type = TAILDIR
  6. a3.sources.r3.positionFile = /opt/module/flume/tail_dir.json
  7. a3.sources.r3.filegroups = f1 f2
  8. a3.sources.r3.filegroups.f1 = /opt/module/file1/file1.txt
  9. a3.sources.r3.filegroups.f2 = /opt/module/file2/file2.txt
  10. # Describe the sink
  11. a3.sinks.k3.type = hdfs
  12. a3.sinks.k3.hdfs.path =/flume/upload/%Y%m%d/%H
  13. #上传文件的前缀
  14. a3.sinks.k3.hdfs.filePrefix = upload-
  15. #是否按照时间滚动文件夹
  16. a3.sinks.k3.hdfs.round = true
  17. #多少时间单位创建一个新的文件夹
  18. a3.sinks.k3.hdfs.roundValue = 1
  19. #重新定义时间单位
  20. a3.sinks.k3.hdfs.roundUnit = hour
  21. #是否使用本地时间戳
  22. a3.sinks.k3.hdfs.useLocalTimeStamp = true
  23. #积攒多少个Event才flush到HDFS一次
  24. a3.sinks.k3.hdfs.batchSize = 100
  25. #设置文件类型,可支持压缩
  26. a3.sinks.k3.hdfs.fileType = DataStream
  27. #多久生成一个新的文件
  28. a3.sinks.k3.hdfs.rollInterval = 60
  29. #设置每个文件的滚动大小大概是128M
  30. a3.sinks.k3.hdfs.rollSize = 134217000
  31. #文件的滚动与Event数量无关
  32. a3.sinks.k3.hdfs.rollCount = 0
  33. # Use a channel which buffers events in memory
  34. a3.channels.c3.type = memory
  35. a3.channels.c3.capacity = 1000
  36. a3.channels.c3.transactionCapacity = 100
  37. # Bind the source and sink to the channel
  38. a3.sources.r3.channels = c3
  39. a3.sinks.k3.channel = c3

(2)启动监控文件夹命令

  1. [qinjl@hadoop102 flume]$ bin/flume-ng agent --conf conf/ --name a3 --conf-file job/taildir-flume-hdfs.conf

(3)向files文件夹中追加内容

在/opt/module/flume目录下创建file1和file2文件夹

  1. [qinjl@hadoop102 flume]$ mkdir file1
  2. [qinjl@hadoop102 flume]$ mkdir file2

向file1和file2文件夹中添加文件

  1. [qinjl@hadoop102 file1]$ echo hello >> file1.txt
  2. [qinjl@hadoop102 file2]$ echo world >> file2.txt

(4)查看HDFS上的数据
在这里插入图片描述
Taildir说明:

Taildir Source维护了一个json格式的position File,其会定期的往position File中更新每个文件读取到的最新的位置(pos为偏移量),偏移量是在提交事务之后记录的,(即事务提交之后,才会记录偏移量,若事务回滚,则不记录),因此能够实现断点续传。Position File的格式如下:

  1. {
  2. "inode":2496272,"pos":12,"file":"/opt/module/flume/file1/file1.txt"}
  3. {
  4. "inode":2496275,"pos":12,"file":"/opt/module/flume/file2/file2.txt"}
  • 注:Linux中储存文件元数据的区域就叫做inode,每个inode都有一个号码,操作系统用inode号码来识别不同的文件,Unix/Linux系统内部不使用文件名,而使用inode号码来识别文件。

发表评论

表情:
评论列表 (有 0 条评论,288人围观)

还没有评论,来说两句吧...

相关阅读