SparkStreaming-从kafka采集数据进行消费 ﹏ヽ暗。殇╰゛Y 2023-02-19 13:29 6阅读 0赞 用法及说明 在工程中需要引入 Maven 工件 spark- streaming-kafka\_2.10 来使用它。包内提供的 KafkaUtils 对象可以在 StreamingContext 和 JavaStreamingContext 中以你的 Kafka 消息创建出 DStream。由于 KafkaUtils 可以订阅多个主题,因此它创建出的 DStream 由成对的主题和消息组成。要创建出一个流数据,需要使用 StreamingContext 实例、一个由逗号隔开的 ZooKeeper 主机列表字符串、消费者组的名字(唯一名字),以及一个从主题到针对这个主题的接收器线程数的映射表来调用 createStream() 方法。 **`案例实操`** 需求:通过SparkStreaming从Kafka读取数据,并将读取过来的数据做简单计算(WordCount),最终打印到控制台。 (1)导入依赖 <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-8_2.11</artifactId> <version>2.1.1</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.11.0.2</version> </dependency> (2)编写代码 import org.apache.spark.SparkConf import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming.{ Seconds, StreamingContext} object SparkStream_kafka { def main(args: Array[String]): Unit = { //使用SparkStream完成WordCount //Spark配置对象 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wordcount") //实时数据分析的环境对象 //采集周期,以指定的时间为周期采集实时数据 val streamingContext = new StreamingContext(sparkConf, Seconds(5)) //从kafka中采集数据 val kafkaDStream = KafkaUtils.createStream( streamingContext, "hadoop102:2181", "donglin", Map("donglin" -> 3) ) //将采集的数据进行分解(扁平化) val wordDStream = kafkaDStream.flatMap(t => t._2.split(" ")) //将数据进行结构的转换,方便统计分析 val mapDStream = wordDStream.map((_, 1)) //将转换结构的数据进行聚合处理 val wordToSumDStream = mapDStream.reduceByKey(_ + _) //将结果打印出来 wordToSumDStream.print() //能停止采集程序 //启动采集器 streamingContext.start() //Driver等待采集器执行 streamingContext.awaitTermination() } }
还没有评论,来说两句吧...