Spark/Streaming

我就是我 2022-08-02 12:53 337阅读 0赞

Spark Streaming 是基于Spark处理流式数据的框架,对数据进行实时处理。

Streaming

streaming将数据离散化,按照batch size分成一段段的Dstream,然后每一个Dstream转换为一个RDD,将RDD经过操作变成中间结果保存在内存中。整个流式计算可以对中间的结果进行叠加,或者存储到外部设备。

Spark Streaming将流式计算分解成多个Spark Job,对于每一段数据的处理都会经过Spark DAG图的分解,以及Spark的任务集的调度过程。

193919bdfdeegmekwhhh0t.png

val ssc=new StreamingContext( “Spark://“ ,”name”,Seconds(20))

数据源:

1.外部文件系统,如HDFS。Streaming可以监控一个目录中新产生的数据,并及时处理。出错时重新读取数据。

2.网络系统,Kafka、Flume、TCP socket。Streaming会默认在两个不同节点加载数据到内存,一个节点出错,系统可以通过另一个节点的数据重算。

File System

streamingContext.fileStream(dataDirectory)

TCP socket

通过socket作为输入流,监听某个特定的端口,得到输入的DStream。

val lines=ssc.socketTextStream(serverIP,serverPort)

Kafka

KafkaUtils.createStream (streamingContext , kafkaParams,…)

Spark为每个input dstream运行一个receiver,这意味着多个input dstream 可以运行在多个core上并行读取。在kafka中,如果每个input dstream读取不同的topic,可以实现同时读取。

Spark Streaming 支持并使用的数据流为Dstream,类似于RDD。Dstream是一种连续的RDDs。

textFileStream,Spark Streaming 以文件系统作为输入流。

(1)path目录下的文件格式都是一样的。

(2)在这个目录下创建文件都是通过移动或者重命名的方式创建的

(3)创建文件后不能修改

操作DStream

Streaming中的Dstream支持两种操作:Transformation和output。

1.Transformation,对离散化数据集进行处理

window( windowlength , slideInterval)

repartition(numPartitions) 增加分区,提高并行度

union(otherstream) 合并两个流

join(otherstream ,[numTasks]) 把(K,V)和(K,W)的Dstream连接成一个(K,(V,W))的新的Dstream

cogroup(otherstream ,[numTasks]) 把(K,V)和(K,W)的Dstream连接成一个(K,Seq[V],Seq[W])的新的Dstream

2.Output操作

print() 打印到控制台

foreachRDD(func) 对Dstream里面的每个RDD执行func,保存到外部系统

3.窗口操作

窗口操作涉及两个参数:滑动窗口的宽度window Duration,窗口滑动频率Slide Duration,这两个参数必须是batch size的倍数。

reduceByKeyAndWindow(_+_,Seconds(5s) , seconds(1)) 以过去5s作为一个输入窗口,每1s执行一次,然后将过去5s的执行结果统计叠加。

4.状态操作

UpdateStateByKey 保存状态信息,使用该操作持续更新状态数值。

(1) 定义状态state,state可以是任意类型的数据类型。

(2) 定义状态更新函数,从一个状态更改到新状态。

状态更新函数对里面的每个元素调用一下更新函数。

def updateFunction(newValues:Seq[Int] ,oldValues:Option[Int]): Option[Int] ={

  1. ...

Some(newValues+oldValues)

}

RDD检查点

状态的操作是基于多个批次的数据的。因为状态的操作要依赖于上一个批次的数据,所以要根据时间,不断积累数据。为了清空数据,支持周期性的检查点,通过把中间结果保存在hdfs上。通常,5-10秒的检查间隔时间是比较合适的。

ssc.checkpoint(hdfspath) //设置检查点的保存位置

dstream.checkpoint(checkpointInterval) //设置检查点间隔

代码总结

val sparkconf = new SparkConf()

//Seconds指定Spark Streaming 处理数据的时间间隔为30秒

val ssc=new StreamingContext(sparkconf, Seconds(30))

//监控目录

val lines=ssc.textFileStream(“file:///path”)

//启动监控

ssc.start()

//等待计算完毕退出

ssc.awaitTermination()

发表评论

表情:
评论列表 (有 0 条评论,337人围观)

还没有评论,来说两句吧...

相关阅读