大数据面试之Spark Streaming

港控/mmm° 2023-02-17 15:55 72阅读 0赞

大数据面试之Spark Streaming

    1. Spark Streaming
    • 1.1 Spark Streaming工作原理
    • 1.1 Spark Streaming如何读取Kafka中数据?
    • 1.2 Spark Streaming编写步骤

说明,感谢亮哥长期对我的帮助,此处多篇文章均为亮哥带我整理。以及参考诸多博主的文章。如果侵权,请及时指出,我会立马停止该行为;如有不足之处,还请大佬不吝指教,以期共同进步。

1. Spark Streaming

image

  1. Spark StreamingSpark Core的扩展应用,它具有可扩展,高吞吐量,对于流数据的可容错性等特点。可以监控来自Kafka,Flumn,HDFSKinesis,TwitterZeroMQ或者Scoket套接字的数据通过复杂的算法以及一系列的计算分析数据,并且可以将分析结果存入到HDFS文件系统,数据库以及前端页面中。
  2. - 高可扩展性,可以运行在上百台机器上(Scales to hundreds of nodes
  3. - 低延迟,可以在秒级别上对数据进行处理(Achieves low latency
  4. - 高可容错性(Efficiently recover from failures
  5. - 能够集成并行计算程序,比如Spark CoreIntegrates with batch and interactive processing

1.1 Spark Streaming工作原理

  1. 对于Spark Core它的核心就是RDD,对于Spark Streaming来说,它的核心是DStreamDStream类似于RDD,它实质上一系列的RDD的集合,DStream可以按照秒数将数据流进行批量的划分。首先从接收到流数据之后,将其划分为多个batch,然后提交给Spark集群进行计算,最后将结果批量输出到HDFS或者数据库以及前端页面展示等等

参考
参考

1.1 Spark Streaming如何读取Kafka中数据?

  1. 一种是利用接收器(receiver)和kafaka的高层API实现
  2. 一种是不利用接收器,直接用kafka底层的API来实现(spark1.3以后引入)
  3. Receiver 方式
  4. Receiver是使用Kafka的高层次Consumer API来实现的。receiverKafka中获取的数据都是存储在Spark Executor的内存中的,然后Spark Streaming启动的job会去处理那些数据。
  5. 然而,在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead LogWAL)。该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中。所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复。
  6. object KafkaRecciver {
  7. def main(args: Array[String]): Unit = {
  8. Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
  9. Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
  10. val conf = new SparkConf().setAppName("SparkFlumeNGWordCount").setMaster("local[2]")
  11. val ssc = new StreamingContext(conf, Seconds(5))
  12. ssc.checkpoint("hdfs://bigdata111:9000/checkpoint")
  13. //创建kafka对象 生产者 和消费者
  14. //模式1 采取的是 receiver 方式 reciver 每次只能读取一条记录
  15. val topic = Map("mydemo2" -> 1)
  16. //直接读取的方式 由于kafka 是分布式消息系统需要依赖Zookeeper
  17. val data = KafkaUtils.createStream(ssc, "192.168.128.111:2181", "mygroup", topic, StorageLevel.MEMORY_AND_DISK)
  18. //数据累计计算
  19. val updateFunc =(curVal:Seq[Int],preVal:Option[Int])=>{
  20. //进行数据统计当前值加上之前的值
  21. var total = curVal.sum
  22. //最初的值应该是0
  23. var previous = preVal.getOrElse(0)
  24. //Some 代表最终的返回值
  25. Some(total+previous)
  26. }
  27. val result = data.map(_._2).flatMap(_.split(" ")).map(word=>(word,1)).updateStateByKey(updateFunc).print()
  28. //启动ssc
  29. ssc.start()
  30. ssc.awaitTermination()
  31. }
  32. Direct方式
  33. 这种新的直接方式,是在Spark1.3中引入的,从而能够确保更加健壮的机制。替代掉使用Receiver来接收数据后,这种方式会周期性地查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batchoffset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。
  34. object KafkaDirector {
  35. def main(args: Array[String]): Unit = {
  36. Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
  37. Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
  38. //构建conf ssc 对象
  39. val conf = new SparkConf().setAppName("Kafka_director").setMaster("local[2]")
  40. val ssc = new StreamingContext(conf,Seconds(3))
  41. //设置数据检查点进行累计统计单词
  42. ssc.checkpoint("hdfs://192.168.128.111:9000/checkpoint")
  43. //kafka 需要Zookeeper 需要消费者组
  44. val topics = Set("mydemo2")
  45. // broker的原信息 ip地址以及端口号
  46. val kafkaPrams = Map[String,String]("metadata.broker.list" -> "192.168.128.111:9092")
  47. // 数据的输入了类型 数据的解码类型
  48. val data = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, kafkaPrams, topics)
  49. val updateFunc =(curVal:Seq[Int],preVal:Option[Int])=>{
  50. //进行数据统计当前值加上之前的值
  51. var total = curVal.sum
  52. //最初的值应该是0
  53. var previous = preVal.getOrElse(0)
  54. //Some 代表最终的但会值
  55. Some(total+previous)
  56. }
  57. //统计结果
  58. val result = data.map(_._2).flatMap(_.split(" ")).map(word=>(word,1)).updateStateByKey(updateFunc).print()
  59. //启动程序
  60. ssc.start()
  61. ssc.awaitTermination()
  62. }

参考
参考
参考
参考
参考

1.2 Spark Streaming编写步骤

  1. 1. 通过创建输入DStream来定义输入源。
  2. 2. 通过对DStream应用转换操作和输出操作来丁意思流计算
  3. 3. streamingContext.start()来开始接收数据和处理流程
  4. 4. 通过streamingContext.awaitTermination()方法来等待处理结束(手动结束或因为错误而结束)
  5. 6. 可以通过streamingContext.stop()来手动结束流计算进程

发表评论

表情:
评论列表 (有 0 条评论,72人围观)

还没有评论,来说两句吧...

相关阅读