使用kafka消费flume的数据

喜欢ヅ旅行 2023-06-14 10:38 111阅读 0赞

本篇文章将在Apache Flume介绍和使用案例三这篇文章的基础上将logger sink修改为kafka sink(即整合flume到kafka完成实时数据的采集)

1. 先说一下,为什么要使用 Flume + Kafka?

以实时流处理项目为例,由于采集的数据量可能存在峰值和峰谷,假设是一个电商项目,那么峰值通常出现在秒杀时,这时如果直接将 Flume 聚合后的数据输入到 Storm 等分布式计算框架中,可能就会超过集群的处理能力,这时采用 Kafka 就可以起到削峰的作用。Kafka 天生为大数据场景而设计,具有高吞吐的特性,能很好地抗住峰值数据的冲击。

2. 大体流程如图所示:

在这里插入图片描述

将配置文件:avro-memory-logger.conf

  1. avro-memory-logger.sources = avro-source
  2. avro-memory-logger.sinks = logger-sink
  3. avro-memory-logger.channels = memory-channel
  4. avro-memory-logger.sources.avro-source.type = avro
  5. avro-memory-logger.sources.avro-source.bind = hadoop000
  6. avro-memory-logger.sources.avro-source.port = 44444
  7. avro-memory-logger.sinks.logger-sink.type = logger
  8. avro-memory-logger.channels.memory-channel.type = memory
  9. avro-memory-logger.sources.avro-source.channels = memory-channel
  10. avro-memory-logger.sinks.logger-sink.channel = memory-channel

修改为avro-memory-kafka.conf

  1. avro-memory-kafka.sources = avro-source
  2. avro-memory-kafka.sinks = kafka-sink
  3. avro-memory-kafka.channels = memory-channel
  4. avro-memory-kafka.sources.avro-source.type = avro
  5. avro-memory-kafka.sources.avro-source.bind = hadoop000
  6. avro-memory-kafka.sources.avro-source.port = 44444
  7. avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
  8. avro-memory-kafka.sinks.kafka-sink.brokerList = hadoop000:9092
  9. avro-memory-kafka.sinks.kafka-sink.topic = hello_topic
  10. avro-memory-kafka.sinks.kafka-sink.batchSize=5
  11. avro-memory-kafka.sinks.kafka-sink.requiredAcks=1
  12. avro-memory-kafka.channels.memory-channel.type = memory
  13. avro-memory-kafka.sources.avro-source.channels = memory-channel
  14. avro-memory-kafka.sinks.kafka-sink.channel = memory-channel

1. 启动zookeeper

  1. zkServer.sh start

2. 启动kafka

  1. kafka-server-start.sh $KAFKA_HOME/config/server.properties

3. 启动:Flume avro-memory-kafka

  1. flume-ng agent \
  2. --name avro-memory-kafka \
  3. --conf conf \
  4. --conf-file $FLUME_HOME/conf/avro-memory-kafka.conf
  5. -Dflume.root.logger=INFO,console

4. 启动:Flume exec-memory-avro

  1. flume-ng agent \
  2. --name exec-memory-avro \
  3. --conf conf \
  4. --conf-file $FLUME_HOME/conf/exec-memory-avro.conf
  5. -Dflume.root.logger=INFO,console

最后启动消费者

  1. kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic hello_topic #这里的hello_topic配置在`avro-memory-kafka.conf`

测试:增加两条信息促发Flume采集

  1. [hadoop@hadoop000 data]$ echo hello hadoop >> data.log
  2. [hadoop@hadoop000 data]$ echo hello spark >> data.log

并检测到kafka消费者消费信息

  1. [hadoop@hadoop000 ~]$ kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic hello_topic
  2. hello hadoop
  3. hello spark

参考:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#kafka-sink

发表评论

表情:
评论列表 (有 0 条评论,111人围观)

还没有评论,来说两句吧...

相关阅读

    相关 java使用kafka消费topic数据

    前言 在之前的篇章中,我们探讨了关于kafka的生产端的相关理论知识,本篇结合代码来演示下,在java代码中,如何消费某个topic的数据 环境准备 1、提前安装好k

    相关 查看kafka消费数据

    storm jar接收程序,如果指定了forceFromStart=false,则从最新的数据开始读,最新是指多长时间的,有具体的参数设置 如果指定了为true,则从最老的