大数据从0到1的完美落地之Flume案例1

短命女 2023-10-12 20:33 169阅读 0赞

2fc557fe1242d86443f8c310f7ffeff5.png

案例演示

案例演示:Avro+Memory+Logger

Avro Source:监听一个指定的Avro端口,通过Avro端口可以获取到Avro client发送过来的文件,即只要应用程序通过Avro端口发送文件,source组件就可以获取到该文件中的内容,输出位置为Logger

编写采集方案
  1. [root@qianfeng01 flume-1.9.0]# mkdir flumeconf
  2. [root@qianfeng01 flume-1.9.0]# cd flumeconf
  3. [root@qianfeng01 flumeconf]# vi avro-logger.conf
  4. #定义各个组件的名字
  5. a1.sources=avro-sour1
  6. a1.channels=mem-chan1
  7. a1.sinks=logger-sink1
  8. #定义sources组件的相关属性
  9. a1.sources.avro-sour1.type=avro
  10. a1.sources.avro-sour1.bind=qianfeng01
  11. a1.sources.avro-sour1.port=9999
  12. #定义channels组件的相关属性
  13. a1.channels.mem-chan1.type=memory
  14. #定义sinks组件的相关属性
  15. a1.sinks.logger-sink1.type=logger
  16. a1.sinks.logger-sink1.maxBytesToLog=100
  17. #组件之间进行绑定
  18. a1.sources.avro-sour1.channels=mem-chan1
  19. a1.sinks.logger-sink1.channel=mem-chan1
  20. 复制代码
启动Agent
  1. [root@qianfeng01 flumeconf]# flume-ng agent -c ../conf -f ./avro-logger.conf -n a1 -Dflume.root.logger=INFO,console
  2. 复制代码
测试数据
  1. [root@qianfeng01 ~]# mkdir flumedata
  2. [root@qianfeng01 ~]# cd flumedata/
  3. [root@qianfeng01 flumedata]#
  4. [root@qianfeng01 flumedata]# date >> test.data
  5. [root@qianfeng01 flumedata]# cat test.data
  6. 2019 11 21 星期四 21:22:36 CST
  7. [root@qianfeng01 flumedata]# ping qianfeng01 >> test.data
  8. [root@qianfeng01 flumedata]# cat test.data
  9. ....省略....
  10. [root@qianfeng01 flumedata]# flume-ng avro-client -c /usr/local/flume-1.9.0/conf/ -H qianfeng01 -p 9999 -F ./test.data
  11. 复制代码
实时采集(监听文件):Exec+Memory+HDFS

Exec Source:监听一个指定的命令,获取一条命令的结果作为它的数据源
#常用的是tail -F file指令,即只要应用程序向日志(文件)里面写数据,source组件就可以获取到日志(文件)中最新的内容

memory:传输数据的Channel为Memory

hdfs 是输出目标为Hdfs

配置方案
  1. [root@qianfeng01 flumeconf]# vi exec-hdfs.conf
  2. #定义各个组件的名字
  3. a1.sources=r1
  4. a1.channels=c1
  5. a1.sinks=k1
  6. a1.sources=r1
  7. a1.sources.r1.type=exec
  8. a1.sources.r1.command=tail -F /root/flumedata/test.data
  9. a1.channels=c1
  10. a1.channels.c1.type=memory
  11. #通道中可以保存的最大事件数量
  12. a1.channels.c1.capacity=1000
  13. #通道从一个source可以获取的最大事件数量或者每个事务中给一个sink的最大事件数量
  14. a1.channels.c1.transactionCapacity=100
  15. a1.sinks=k1
  16. a1.sinks.k1.type=hdfs
  17. a1.sinks.k1.hdfs.path=hdfs://qianfeng01:8020/flume/tailout/%y-%m-%d/%H%M/
  18. #设置文件的前缀
  19. a1.sinks.k1.hdfs.filePrefix=events-
  20. #时间戳是否四舍五入
  21. a1.sinks.k1.hdfs.round=true
  22. #时间戳舍入的最高位数
  23. a1.sinks.k1.hdfs.roundValue=10
  24. #时间戳舍入的单位
  25. a1.sinks.k1.hdfs.roundUnit=second
  26. #设置滚动的条件(关闭当前文件,开启新文件)---3秒钟滚动一次
  27. a1.sinks.k1.hdfs.rollInterval=3
  28. #设置滚动的条件---20字节
  29. a1.sinks.k1.hdfs.rollSize=20
  30. #设置滚动的条件---5个事件
  31. a1.sinks.k1.hdfs.rollCount=5
  32. #刷新进hdfs的事件数量
  33. a1.sinks.k1.hdfs.batchSize=100
  34. #是否使用本地时间戳(自定义拦截器中)---true是使用本地的
  35. a1.sinks.k1.hdfs.useLocalTimeStamp=true
  36. a1.sinks.k1.hdfs.fileType=DataStream
  37. a1.sources.r1.channels=c1
  38. a1.sinks.k1.channel=c1
  39. 复制代码
启动Agent
  1. [root@qianfeng01 flumeconf]# flume-ng agent -c ../conf -f ./exec-hdfs.conf -n a1 -Dflume.root.logger=INFO,console
  2. 复制代码

报错解决:

  1. 报错:
  2. (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:459)] process failed
  3. java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V
  4. 原因:com.google.common.base.Preconditions.checkArgument 这是因为flume-1.9.0内依赖的guava-11.02.jarhadoop内的(guava-27.0-jre.jar)版本不一致造成的。
  5. 检验方法:
  6. 查看hadoop安装目录下share/hadoop/common/libguava.jar版本
  7. 查看Flume安装目录下libguava.jar的版本
  8. 如果两者不一致,删除版本低的,并拷贝高版本过去
  9. 复制代码
测试数据
  1. [root@qianfeng01 flumedata]# ping qianfeng01 >> test.data
  2. 复制代码
实时采集(监听文件) Exec+Memory+Logger

Exec Source:监听一个指定的命令,获取一条命令的结果作为它的数据源
#常用的是tail -F file指令,即只要应用程序向日志(文件)里面写数据,source组件就可以获取到日志(文件)中最新的内容 ,

logger为日志格式输出

配置方案
  1. [root@qianfeng01 flumeconf]# vi exec-logger.conf
  2. a2.sources = r1
  3. a2.channels = c1
  4. a2.sinks = s1
  5. a2.sources.r1.type = exec
  6. a2.sources.r1.command = tail -F /root/flumedata/log.01
  7. a2.channels.c1.type=memory
  8. a2.channels.c1.capacity=1000
  9. a2.channels.c1.transactionCapacity=100
  10. a2.channels.c1.keep-alive=3
  11. #通道中的事件总容量(byteCapacity)和预估总事件容量的百分比
  12. a2.channels.c1.byteCapacityBufferPercentage=20
  13. a2.channels.c1.byteCapacity=800000
  14. a2.sinks.s1.type=logger
  15. a2.sinks.s1.maxBytesToLog=16
  16. a2.sources.r1.channels=c1
  17. a2.sinks.s1.channel=c1
  18. 复制代码
启动agent
  1. [root@qianfeng01 flumeconf]# flume-ng agent -c ../conf -f ./exec-logger.conf -n a2 -Dflume.root.logger=INFO,console
  2. 复制代码
测试:
  1. [root@qianfeng01 ~]# echo "nice" >> /root/flumedata/log.01

更多大数据精彩内容欢迎B站搜索“千锋教育”或者扫码领取全套资料

发表评论

表情:
评论列表 (有 0 条评论,169人围观)

还没有评论,来说两句吧...

相关阅读