7.1 Spark-core概述
文章目录
- 1 环境准备
- 2 RDD
- 2.1 RDD 创建
- 2.1.1 从集合(内存)创建RDD
- 2.1.2 从外部存储(文件)创建RDD
- 2.1.3 从其他创建RDD
- 2.1.4 直接创建RDD(new)
- 2.2 RDD并行度 & 分区
- 2.3 RDD转换算子
- 2.3.1 Value类型
- 1 map
- 2 mapPartitions
- map & mapPartitions的区别
- 3 mapPartitionsWithIndex
- 4 flatMap 扁平映射
- 模式匹配
- 5 glom
- 6 groupBy
- 7 filter
- 8 sample
- 9 distinct
- reduceByKey aggregateByKey flodByKey combineByKey
- 10 coalesce
- 11 repartition
- Coalese & rapartition的区别
- 12 sortBY
- 13 pipe
- 2.3.2 双Value类型
- 15 union
- 16 substract
- 17 zip
- 两个RDD类型不相同
- 2.3.2 Key-Value类型
- 18 partitionBy
- 自定义分区器
- 19 reduceByKey
- 20 groupByKey
- reduceByKey & groupByKey的区别
- 21 aggregateByKey
- 22 foldByKey
- 23 combineByKey
- 24 sortByKey
- 25 join
- 26 leftOuterJoin
- 27 cogroup
- 2.4 RDD 行动算子
- 1 reduce
- 2 collect
- 3 count
- 4 first
- 5 take
- 6 takeOrdered
- 7 aggregate
- 8 fold
- 9 countByCount
- 10 save相关算子
- 11 foreach
- 广播变量
1 环境准备
[root@ifeng software]# tar -zxvf spark-2.4.6-bin-2.6.0-cdh5.16.2.tgz -C /home/ifeng/app
HS2/beeline
beeline ***
spark-class
spark-shell ***** 使用于测试
spark-submit ***** 提交Spark作业
spark-sql *****
Spark on YARN
start-shuffle-service.sh
stop-shuffle-service.sh
start-thriftserver.sh <== HiveServer2
stop-thriftserver.sh
start-history-server.sh
stop-history-server.sh
bin/spark-shell --master local[*]
sparkContext.textFile("hdfs://ifeng:9000/hdfsapi/wc.txt")
.flatMap(_.split(","))
.map((_,1))
.reduceByKey(_+_).collect()
2 RDD
/**
* A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
* partitioned collection of elements that can be operated on in parallel. This class contains the
* basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition,
* [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value
* pairs, such as `groupByKey` and `join`;
*
* Internally, each RDD is characterized by five main properties:
*
* - A list of partitions 一堆分区构成
* - A function for computing each split 一个方法作用在一个分区上的
* - A list of dependencies on other RDDs 新的RDD依赖前者 R1----> R2 ----> R3
* - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
* kv 可以作用上分区器
* - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
* an HDFS file)
* 作业调度到数据的最佳位置
*/
1 一堆分区构成
protected def getPartitions: Array[Partition]
2 一个方法作用在一个分区上的
def compute(split: Partition, context: TaskContext): Iterator[T]
3 新的RDD依赖前者 R1----> R2 ----> R3
protected def getDependencies: Seq[Dependency[_]] = deps
4 kv 可以作用上分区器
val partitioner: Option[Partitioner] = None
5 作业调度到数据的最佳位置
protected def getPreferredLocations(split: Partition): Seq[String] = Nil(空List)
case object Nil extends List[Nothing]
2.1 RDD 创建
Spark中创建RDD的方式分为四种:
2.1.1 从集合(内存)创建RDD
从集合中创建RDD 有parallelize 和 makeRDD 两个方法
parallelize
val rdd1 = sc.parallelize(List(1,2,3,4))
rdd1.collect().foreach(println)
makeRDD
val rdd2 = sc.makeRDD(List(1,2,3,4))
rdd2.collect().foreach(println)
从底层实现来看 ,makeRDD方法其实也是调用的parallelize方法
def makeRDD[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
parallelize(seq, numSlices)
}
2.1.2 从外部存储(文件)创建RDD
本地文件系统、HSFS
sc.textFile("hdfs://ifeng:9000/hdfsapi/wc.txt")
.flatMap(_.split(","))
.map((_,1))
.reduceByKey(_+_).collect()
2.1.3 从其他创建RDD
2.1.4 直接创建RDD(new)
使用new的方式直接构造RDD,一般由Spark框架自身使用
2.2 RDD并行度 & 分区
默认,Spark将一个作业切分成多个,分发给Executor节点进行并行计算,能够并行计算的任务数量我们称为并行度
可以在创建RDD时指定并行度,并行度 != 切分任务的数量
import org.apache.spark.rdd.RDD
val dataRDD:RDD[Int] = sc.makeRDD(List(1,2,3,4),4)
读取内存数据的时候,可以安装并行度进行分区操作,
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
(0 until numSlices).iterator.map {
i =>
val start = ((i * length) / numSlices).toInt
val end = (((i + 1) * length) / numSlices).toInt
(start, end)
}
}
读文件,按照Hadoop文件读取的规则进行切片分区
切片规则和数据读取的规则有些差异
public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
long totalSize = 0; // compute total size
for (FileStatus file: files) {
// check we have valid files
if (file.isDirectory()) {
throw new IOException("Not a file: "+ file.getPath());
}
totalSize += file.getLen();
}
long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
...
for (FileStatus file: files) {
...
if (isSplitable(fs, path)) {
long blockSize = file.getBlockSize();
long splitSize = computeSplitSize(goalSize, minSize, blockSize);
...
}
protected long computeSplitSize(long goalSize, long minSize,
long blockSize) {
return Math.max(minSize, Math.min(goalSize, blockSize));
}
2.3 RDD转换算子
RDD根据处理方式不同将算子整体上划分为
- Value类型
- 双value类型
- Key-Value类型
2.3.1 Value类型
1 map
def map[U: ClassTag](f: T => U): RDD[U]
将处理的数据烛台进行映射转换, 可以是 类型的转换 、 值的转换
默认分区数量不变
/*
* 1 map
* */
// 转换:旧RDD= => 算子 => 新RDD
val rdd1 = sc.parallelize(List(1, 2, 3, 4))
rdd1.map((i : Int) => {
i * 2})
rdd1.map((i : Int) => i * 2)
rdd1.map((i) => i * 2)
rdd1.map(i => i *2 )
rdd1.map(_ * 2)
map执行原理
分区按照分区内顺序执行 , 第一条数据走完了全部流程 ,第二条数据才会走
分区间的执行是没有顺序的 ,不确定
2 mapPartitions
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
将待处理的数据以分区为单位发送到计算节点进行处理
val dataRDD1: RDD[Int] = dataRDD.mapPartitions(
datas => {
datas.filter(_==2)
}
)
RDD3.mapPartitions(_.filter(_==2)).collect()
map方法是全量数据的操作,不能丢失数据 不能做过滤
mapPartitions 一次性过去分区的所i有数据,可以执行迭代器集合的所有操作 过滤 max min…
返回值要是是迭代器 ,需要转换
map & mapPartitions的区别
map 一次只处理一个数据
mapPartitions算子每一次都处理一个分区的数据
如果有一个分区的数据没有完全处理完毕,那么所有的数据都不会释放,容易出现OOM
当内存足够的时候,推荐使用mapPartition
3 mapPartitionsWithIndex
def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
处理的同时获取当前分区的索引 , 可以知道数据在哪个分区
dataRDD3.mapPartitionsWithIndex(
(index, datas) => {
datas.map(index, _)
}
).collect()
RDD3.mapPartitionsWithIndex(_.map())
map综合练习
逻辑处理是正确的,但是需要返回iter ,if else返回的是() , 向上取就是Any
/*
* map 综合练习
* 获取第二个分区的数据
* */
val dataRDD = sc.parallelize(List(1,2,3,4,5,6,7,8,9),3)
val rdd = dataRDD.mapPartitionsWithIndex((index,iter) =>{
if(index == 2){
iter
}else{
Nil.iterator
}
})
4 flatMap 扁平映射
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
将数据进行扁平化后再进行映射处理,称为扁平映射
val dataRDD = sc.makeRDD(List(
List(1,2),List(3,4)
),1)
val dataRDD1 = dataRDD.flatMap(
list => list
).collect()
模式匹配
3 不为 List , 应该进行模式匹配
/*
* 将List(List(1,2),3,List(4,5)) 进行扁平化操作
* */
val rddList = sc.parallelize(List(List(1, 2), 3, List(4, 5)))
rddList.flatMap( data => {
data match{
case list: List[_] => list
case d => List(d)
}
})
5 glom
def glom(): RDD[Array[T]]
将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变
val dataRDD = sc.makeRDD(List(
1,2,3,4
),1)
val dataRDD1:RDD[Array[Int]] = dataRDD.glom()
/*
* glom
* 计算所有分区最大值求和
* */
val value = sc.parallelize(List(1, 2, 3, 4, 5, 7, 6, 8, 9), 3)
//将每个分区转换成为数组
val value1: RDD[Array[Int]] = value.glom()
//从数组中获取最大值
val value2: RDD[Int] = value1.map(x => x.max)
//取出最大值求和
println(value2.sum)
6 groupBy
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
根据直送的规则进行分区,分区默认不变,数据被打乱进行重新分组,此过程称为shuffle
极限情况下,数据有可能被分到同一个分区中
一个组的数据在一个分区中,但是并不是说一个分区中只有一个组
val dataRDD = sc.makeRDD(List(1,2,3,4),1)
val dataRDD1 = dataRDD.groupBy(
_%2
).collect()
- groupBy 方法根据指定的规则进行分组,指定的规则的返回值就是分组的key
groupBy 方法的返回值为元组
- 元组的第一个元素,表示分组的key
- 元组的第二个元素,表示相同key的数据形成的可迭代的集合
groupBy方法执行后 ,会将数据进行分组操作,但是分区是不会改变的
- 不同组的数据 会打乱 再分入到不同的分区当中
- shuffle可能会存在空的分区,导致数据不均匀
- 可以传参数决定下游分区的数量
分组减少了吗???
打印结果为 3个分区
会产生一个空的分区
把上游分区的数据重新打乱组合到下游的分区当中,称为 shuffle
/*
* 2 groupBy
* */
fileRDD.flatMap(_.split(","))
.groupBy(x => x) //(ifeng,CompactBuffer(ifeng, ifeng, ifeng, ifeng, ifeng, ifeng))
.map {
case (word, iter) => {
(word, iter.size)
}
}.collect().mkString(",")
// .map(x => {
// (x._1 , x._2.size)
// }).foreach(println)
7 filter
根据指定规则对数据进行筛选,满足条件的保留,不满足的丢弃
数据经过筛选后,分区不变,可能会产生数据倾斜
def filter(f: T => Boolean): RDD[T]
指定规则进行过滤,
分区不变,但是分区内的数据可能不均衡,生产环境下 可能出现数据倾斜
val dataRDD = sparkContext.makeRDD(List(
1,2,3,4
),1)
val dataRDD1 = dataRDD.filter(_%2 == 0)
8 sample
def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong): RDD[T]
指定规则抽取数据
val dataRDD = sparkContext.makeRDD(List(
1,2,3,4
),1)
// 抽取数据不放回(伯努利算法)
// 伯努利算法:又叫0、1分布。例如扔硬币,要么正面,要么反面。
// 具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不要
// 第一个参数:抽取的数据是否放回,false:不放回
// 第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取;
// 第三个参数:随机数种子 抽两次之间的关系 设置为一个确定的值 , 多次抽取都是一样的
val dataRDD1 = dataRDD.sample(false, 0.5)
// 抽取数据放回(泊松算法)
// 第一个参数:抽取的数据是否放回,true:放回;false:不放回
// 第二个参数:重复数据的几率,范围大于等于0.表示每一个元素被期望抽取到的次数
// 第三个参数:随机数种子
val dataRDD2 = dataRDD.sample(true, 2)
随机数种子的作用
实际开发过程中,往往会出现数据倾斜,从数据倾斜的分区中抽取数据,查看数据的规则 , 分析后,可以进行改善处理
9 distinct
def distinct()(implicit ord: Ordering[T] = null): RDD[T]
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
将数据集中重复的数据去重
val dataRDD = sparkContext.makeRDD(List(
1,2,3,4,1,2
),1)
val dataRDD1 = dataRDD.distinct()
val dataRDD2 = dataRDD.distinct(2) # 2 指定新的分区数
reduceByKey aggregateByKey flodByKey combineByKey
从源码的角度来看 ,四个算子的底层是相同的
10 coalesce
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T]
根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行晓玲
过多小任务,coalesce方法,合并 减少分区个数 , 减少任务调度成本
val dataRDD = sparkContext.makeRDD(List(
1,2,3,4,1,2
),6)
val dataRDD1 = dataRDD.coalesce(2)
11 repartition
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
内部还是执行的coalesce操作,参数shuffle的默认值为true
重新分区 可变多也可以变少
val dataRDD = sparkContext.makeRDD(List(
1,2,3,4,1,2
),2)
val dataRDD1 = dataRDD.repartition(4)
Coalese & rapartition的区别
Coalesce方法默认情况下不会扩大分区,默认不会进行shuffle。扩大分区是没有意义的
reparation 是打乱重新组合,默认进行shuffle操作 。repatriation底层调用的也是coalesce ,默认shuffle为true
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
12 sortBY
def sortBy[K](
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
排序数据, 排序之前通过f 函数进行处理,之后按照f函数处理的结果进行排序,默认为正序排列
分区数一致
val dataRDD = sparkContext.makeRDD(List(
1,2,3,4,1,2
),2)
val dataRDD1 = dataRDD.sortBy(num=>num, false, 4)
13 pipe
def pipe(command: String): RDD[String]
针对每个分区,都调用一次shell脚本,返回输出的RDD
1) 编写一个脚本,并增加执行权限
[root@linux1 data]# vim pipe.sh
#!/bin/sh
echo "Start"
while read LINE; do
echo ">>>"${
LINE}
done
[root@linux1 data]# chmod 777 pipe.sh
2) 命令行工具中创建一个只有一个分区的RDD
scala> val rdd = sc.makeRDD(List("hi","Hello","how","are","you"), 1)
3) 将脚本作用该RDD并打印
scala> rdd.pipe("/opt/module/spark/pipe.sh").collect()
res18: Array[String] = Array(Start, >>>hi, >>>Hello, >>>how, >>>are, >>>you)
2.3.2 双Value类型
两个RDD进行的操作
- 交集 subtract
- 并集 union
- 差集 intersection
- 拉链 zip
15 union
数据合并 ,分区也会合并
def intersection(other: RDD[T]): RDD[T]
返回新的RDD = RDD1 + RDD2
val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
val dataRDD = dataRDD1.intersection(dataRDD2)
16 substract
def subtract(other: RDD[T]): RDD[T]
求差集
val dataRDD1 = sc.makeRDD(List(1,2,3,4))
val dataRDD2 = sc.makeRDD(List(4,5,6))
val dataRDD = dataRDD1.subtract(dataRDD2)
17 zip
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
将两个RDD中的元素,以键值对的形式进行合并。
val dataRDD1 = sc.makeRDD(List(1,2,3,4))
val dataRDD2 = sc.makeRDD(List(3,4,5,6))
dataRDD1.zip(dataRDD2).collect()
两个RDD类型不相同
union
zip
intersection
subtract
只有zip可以运行
2.3.2 Key-Value类型
18 partitionBy
patitionBy 参数为分区器对象
def partitionBy(partitioner: Partitioner): RDD[(K, V)]
指定Partitioner重新进行分区。 Spark默认的分区器是HashPartitioner
val rdd: RDD[(Int, String)] =
sc.makeRDD(Array((1,"aaa"),(2,"bbb"),(3,"ccc")),3)
import org.apache.spark.HashPartitioner
分区器对象:HashPartitioner & RangePartitioner
val rdd2: RDD[(Int, String)] =
rdd.partitionBy(new HashPartitioner(2))
传入的参数为分区器对象
自定义分区器
自己决定数据放置在哪个分区做处理
class MyPartitioner(num:Int) extends Partitioner{
override def numPartitions: Int = {
num}
//根据key来决定数据在哪个分区中进行处理
//方法的返回值表示分区编号(索引)
override def getPartition(key: Any): Int = {
key match{
case "NBA" => 0
case _ => 1
}
}
}
val baskerballl = sc.parallelize(List(
("NBA","热火"),("NBA","灰熊"),("NBA","湖人"),("NBA","马刺"),("","首钢"),
("NBA","骑士"),("NBA","火箭"),("CBA","广州"),("CBA","新疆"),("CBA","山东")
),1)
val NBA = baskerballl.partitionBy(new MyPartitioner(3))
NBA.mapPartitionsWithIndex((index,datas) => {
datas.map(
data => (index,data)
)
}).collect().foreach(println)
19 reduceByKey
分区内和分区间计算规则相同
先分组 再聚合
//TODO 根据数据的Key进行分组,然后对value进行聚合
/*
* 1 ReduceByKey
* */
fileRDD.flatMap(_.split(","))
.map((_,1))
.reduceByKey((x,y) => (x+y)).foreach(println)
// .reduceByKey(_+_).foreach(println)
20 groupByKey
只能根据Key进行分组,相当于groupBy的特殊形式
调用groupByKey之后,返回数据的类型为元组
元组的第一个元素表示用于分组的key
第二个元组表示相同key的value集合
def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
将分区数据直接转换为相同类型的内存数组进行后续处理
val dataRDD1 =sc.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = dataRDD1.groupByKey().collect()
val dataRDD3 = dataRDD1.groupByKey(2).collect()
val dataRDD4 = dataRDD1.groupByKey(new HashPartitioner(2)).collect()
/*
* 3 groupBykey
* */
fileRDD.flatMap(_.split(","))
.map((_,1))
.groupByKey() //(ifeng,CompactBuffer(1, 1, 1, 1, 1, 1))
.map{
case (word,iter) => {
(word,iter.sum)
}
}
// .map(x => {
// (x._1,x._2.sum)
// }).foreach(println)
// fileRDD.map()
reduceByKey & groupByKey的区别
- groupByKey
groupByKey 把一个分区的数据分组后不能继续操作, 需要等待其他分区的数据全部到达后,才能继续执行
但如果是在内存中等待,那么可能会导致内存不足,直接导致执行失败,所有这个等待的过程必须依靠磁盘
一个分区就是一个Task
- reduceByKey
在shuffle阶段减少数据,shuffle之前先聚合(预聚合) conbainer
21 aggregateByKey
aggregate : 聚合
Scala语法,函数柯里化
方法有两个参数列表需要传递参数
第一个参数列表中传递参数为zeroValue:计算的初始值
第二个参数列表中传递的参数为:
seqOp:分区内计算规则
comOp :分区间的计算规则
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
combOp: (U, U) => U): RDD[(K, U)]
根据不同规则进行分区内计算和分区间计算
将数据根据不同的规则进行分区内计算和分区间计算
val dataRDD1 =
sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 =
dataRDD1.aggregateByKey(0)(_+_,_+_)
取出每个分区内相同key的最大值然后分区间相加
// TODO : 取出每个分区内相同key的最大值然后分区间相加
// aggregateByKey算子是函数柯里化,存在两个参数列表
// 1. 第一个参数列表中的参数表示初始值
// 2. 第二个参数列表中含有两个参数
// 2.1 第一个参数表示分区内的计算规则
// 2.2 第二个参数表示分区间的计算规则
val rdd =
sc.makeRDD(List(
("a",1),("a",2),("c",3),
("b",4),("c",5),("c",6)
),2)
// 0:("a",1),("a",2),("c",3) => (a,10)(c,10)
// => (a,10)(b,10)(c,20)
// 1:("b",4),("c",5),("c",6) => (b,10)(c,10)
/*
* aggregateByKey
*
* */
// TODO 分区内求最大值,分区间求和
val rdd = sc.parallelize(List(
("a",2),("a",7),("a",1),
("b",3),("c",5),("b",1)
),2)
rdd.aggregateByKey(0)( //初始值为0 ,下如图紫色(a,0)
(x,y) => math.max(x,y),
(x,y) => x + y
)
22 foldByKey
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
分区内计算规则 == 分区间计算规则,aggregateByKey就可以简化为foldByKey
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
//val dataRDD2 = dataRDD1.aggregateByKey(0)(_+_,_+_)
val dataRDD2 = dataRDD1.foldByKey(0)(_+_)
23 combineByKey
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RDD[(K, C)]
最通用的对key-value型rdd进行聚集操作的聚集函数(aggregation function)。类似于aggregate(),combineByKey()允许用户返回值的类型与输入不一致
将数据List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98))求每个key的平均值
val list: List[(String, Int)] = List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98))
val input: RDD[(String, Int)] = sc.makeRDD(list, 2)
val combineRdd: RDD[(String, (Int, Int))] = input.combineByKey(
(_, 1),
(acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
(acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
/*
* combineByKey
TODO 每个Key的平均值
*
* */
val rdd1 = sc.parallelize(List(("a",88),("b",95),("a",91),("b",93),("b",98)),2)
// 1 改变value格式,88 ---> (88,1) ---> + 91 = (179,2)
val value = rdd1.combineByKey(
v => (v, 1),
(tuple: (Int, Int), v) => {
(tuple._1 + v, tuple._2 + 1)
},
(t1: (Int, Int), t2: (Int, Int)) => {
(t1._1 + t2._1, t1._2 + t2._2)
}
)
value.map{
case(key,(total,cnt)) => {
(key , total / cnt)
}
}
24 sortByKey
sortBy 使用了RangePartitioner
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
: RDD[(K, V)]
在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(true)
val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(false)
小功能:设置key为自定义类User
class User extends Ordered[User]{
override def compare(that: User): Int = {
1
}
}
25 join
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素连接在一起的(K,(V,W))的RDD
val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (3, 6)))
rdd.join(rdd1).collect().foreach(println)
26 leftOuterJoin
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
类似于SQL语句的左外连接
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val dataRDD2 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val rdd: RDD[(String, (Int, Option[Int]))] = dataRDD1.leftOuterJoin(dataRDD2)
27 cogroup
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("a",2),("c",3)))
val dataRDD2 = sparkContext.makeRDD(List(("a",1),("c",2),("c",3)))
val value: RDD[(String, (Iterable[Int], Iterable[Int]))] =
dataRDD1.cogroup(dataRDD2)
2.4 RDD 行动算子
1 reduce
def reduce(f: (T, T) => T): T
聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
// 聚合数据
val reduceResult: Int = rdd.reduce(_+_)
2 collect
def collect(): Array[T]
Array数组形式返回数据集所有元素
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
// 收集数据到Driver
rdd.collect().foreach(println)
3 count
def count(): Long
返回RDD中元素的个数
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
// 返回RDD中元素的个数
val countResult: Long = rdd.count()
4 first
def first(): T
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
// 返回RDD中元素的个数
val firstResult: Int = rdd.first()
println(firstResult)
5 take
def take(num: Int): Array[T]
返回一个由RDD的前n个元素组成的数组
vval rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
// 返回RDD中元素的个数
val takeResult: Array[Int] = rdd.take(2)
println(takeResult.mkString(","))
6 takeOrdered
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
val rdd: RDD[Int] = sc.makeRDD(List(1,3,2,4))
// 返回RDD中元素的个数
val result: Array[Int] = rdd.takeOrdered(2)
7 aggregate
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
[U: ClassTag]可以理解为是泛型,传递三个值:
zeroValue:初始值,只使用一次
seqOp:函数类型,作用是为每一个partition中的数据遍历应用一次函数 (注意:是每个partition,即1个partition,seqOp会执行一次,3个partition,会执行三次)
combOp:函数类型,在seqOp执行完之后执行,这个参数的输入数据是seqOp函数的输出结果,仅会执行一次,是用来聚合所有partition结果的
分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
// 将该RDD所有元素相加得到结果
//val result: Int = rdd.aggregate(0)(_ + _, _ + _)
val result: Int = rdd.aggregate(10)(_ + _, _ + _)
8 fold
def fold(zeroValue: T)(op: (T, T) => T): T
折叠操作,aggreagte的简化版操作
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val foldResult: Int = rdd.fold(0)(_+_)
9 countByCount
def countByKey(): Map[K, Long]
统计每种key的个数
val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (1, "a"), (1, "a"), (2, "b"), (3, "c"), (3, "c")))
// 统计每种key的个数
val result: collection.Map[Int, Long] = rdd.countByKey()
10 save相关算子
def saveAsTextFile(path: String): Unit
def saveAsObjectFile(path: String): Unit
def saveAsSequenceFile(
path: String,
codec: Option[Class[_ <: CompressionCodec]] = None): Unit
保存数据到不同的格式就是
// 保存成Text文件
rdd.saveAsTextFile("output")
// 序列化成对象保存到文件
rdd.saveAsObjectFile("output1")
// 保存成Sequencefile文件
rdd.map((_,1)).saveAsSequenceFile("output2")
11 foreach
def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
分布式遍历RDD中的每一个元素,调用指定函数
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
// 收集后打印
rdd.map(num=>num).collect().foreach(println)
println("****************")
// 分布式打印
rdd.foreach(println)
广播变量
sum变量 如果一个Task中存一个的话,会极大浪费 因此采用把 sum 变量提升到 Executor中 ,多个Task共享
//TODO 广播变量实现join
val rdd1 = sc.parallelize(List(("a",1),("b",2),("c",3)))
val List111 = List(("a",4),("b",5),("c",6))
// TODO 声明广播变量(只读)
val bcList:Broadcast[List[(String,Int)]] = sc.broadcast(List111)
val rdd2 = rdd1.map{
case (word,count1) => {
var count2 = 0
for(kv <- List111){
val w = kv._1
val v = kv._2
if(w == word){
count2 = v
}
}
(word,(count1,count2))
}
}
println(rdd2.collect().mkString(","))
sc.stop()
}
}
其他Executor访问广播变量
还没有评论,来说两句吧...