7.1 Spark-core概述

亦凉 2022-12-02 10:53 316阅读 0赞

文章目录

  • 1 环境准备
  • 2 RDD
    • 2.1 RDD 创建
      • 2.1.1 从集合(内存)创建RDD
      • 2.1.2 从外部存储(文件)创建RDD
      • 2.1.3 从其他创建RDD
      • 2.1.4 直接创建RDD(new)
    • 2.2 RDD并行度 & 分区
    • 2.3 RDD转换算子
      • 2.3.1 Value类型
          • 1 map
          • 2 mapPartitions
          • map & mapPartitions的区别
          • 3 mapPartitionsWithIndex
          • 4 flatMap 扁平映射
          • 模式匹配
          • 5 glom
          • 6 groupBy
          • 7 filter
          • 8 sample
          • 9 distinct
          • reduceByKey aggregateByKey flodByKey combineByKey
          • 10 coalesce
          • 11 repartition
          • Coalese & rapartition的区别
          • 12 sortBY
            • 13 pipe
      • 2.3.2 双Value类型
          • 15 union
          • 16 substract
          • 17 zip
        • 两个RDD类型不相同
      • 2.3.2 Key-Value类型
          • 18 partitionBy
          • 自定义分区器
          • 19 reduceByKey
          • 20 groupByKey
          • reduceByKey & groupByKey的区别
          • 21 aggregateByKey
          • 22 foldByKey
          • 23 combineByKey
          • 24 sortByKey
          • 25 join
          • 26 leftOuterJoin
          • 27 cogroup
    • 2.4 RDD 行动算子
          • 1 reduce
          • 2 collect
          • 3 count
          • 4 first
          • 5 take
          • 6 takeOrdered
          • 7 aggregate
          • 8 fold
          • 9 countByCount
          • 10 save相关算子
          • 11 foreach
      • 广播变量

1 环境准备

  1. [root@ifeng software]# tar -zxvf spark-2.4.6-bin-2.6.0-cdh5.16.2.tgz -C /home/ifeng/app
  2. HS2/beeline
  3. beeline ***
  4. spark-class
  5. spark-shell ***** 使用于测试
  6. spark-submit ***** 提交Spark作业
  7. spark-sql *****
  8. Spark on YARN
  9. start-shuffle-service.sh
  10. stop-shuffle-service.sh
  11. start-thriftserver.sh <== HiveServer2
  12. stop-thriftserver.sh
  13. start-history-server.sh
  14. stop-history-server.sh
  15. bin/spark-shell --master local[*]

在这里插入图片描述
在这里插入图片描述

  1. sparkContext.textFile("hdfs://ifeng:9000/hdfsapi/wc.txt")
  2. .flatMap(_.split(","))
  3. .map((_,1))
  4. .reduceByKey(_+_).collect()

在这里插入图片描述

2 RDD

  1. /**
  2. * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
  3. * partitioned collection of elements that can be operated on in parallel. This class contains the
  4. * basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition,
  5. * [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value
  6. * pairs, such as `groupByKey` and `join`;
  7. *
  8. * Internally, each RDD is characterized by five main properties:
  9. *
  10. * - A list of partitions 一堆分区构成
  11. * - A function for computing each split 一个方法作用在一个分区上的
  12. * - A list of dependencies on other RDDs 新的RDD依赖前者 R1----> R2 ----> R3
  13. * - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
  14. * kv 可以作用上分区器
  15. * - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
  16. * an HDFS file)
  17. * 作业调度到数据的最佳位置
  18. */
  19. 1 一堆分区构成
  20. protected def getPartitions: Array[Partition]
  21. 2 一个方法作用在一个分区上的
  22. def compute(split: Partition, context: TaskContext): Iterator[T]
  23. 3 新的RDD依赖前者 R1----> R2 ----> R3
  24. protected def getDependencies: Seq[Dependency[_]] = deps
  25. 4 kv 可以作用上分区器
  26. val partitioner: Option[Partitioner] = None
  27. 5 作业调度到数据的最佳位置
  28. protected def getPreferredLocations(split: Partition): Seq[String] = Nil(空List)
  29. case object Nil extends List[Nothing]

2.1 RDD 创建

Spark中创建RDD的方式分为四种:

2.1.1 从集合(内存)创建RDD

从集合中创建RDD 有parallelize 和 makeRDD 两个方法

  1. parallelize

    val rdd1 = sc.parallelize(List(1,2,3,4))

    1. rdd1.collect().foreach(println)
  2. makeRDD

    val rdd2 = sc.makeRDD(List(1,2,3,4))

    1. rdd2.collect().foreach(println)

从底层实现来看 ,makeRDD方法其实也是调用的parallelize方法

  1. def makeRDD[T: ClassTag](
  2. seq: Seq[T],
  3. numSlices: Int = defaultParallelism): RDD[T] = withScope {
  4. parallelize(seq, numSlices)
  5. }

2.1.2 从外部存储(文件)创建RDD

本地文件系统、HSFS

  1. sc.textFile("hdfs://ifeng:9000/hdfsapi/wc.txt")
  2. .flatMap(_.split(","))
  3. .map((_,1))
  4. .reduceByKey(_+_).collect()

2.1.3 从其他创建RDD

2.1.4 直接创建RDD(new)

使用new的方式直接构造RDD,一般由Spark框架自身使用

2.2 RDD并行度 & 分区

默认,Spark将一个作业切分成多个,分发给Executor节点进行并行计算,能够并行计算的任务数量我们称为并行度

可以在创建RDD时指定并行度,并行度 != 切分任务的数量

  1. import org.apache.spark.rdd.RDD
  2. val dataRDD:RDD[Int] = sc.makeRDD(List(1,2,3,4),4)

读取内存数据的时候,可以安装并行度进行分区操作

  1. def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
  2. (0 until numSlices).iterator.map {
  3. i =>
  4. val start = ((i * length) / numSlices).toInt
  5. val end = (((i + 1) * length) / numSlices).toInt
  6. (start, end)
  7. }
  8. }

读文件,按照Hadoop文件读取的规则进行切片分区

切片规则和数据读取的规则有些差异

  1. public InputSplit[] getSplits(JobConf job, int numSplits)
  2. throws IOException {
  3. long totalSize = 0; // compute total size
  4. for (FileStatus file: files) {
  5. // check we have valid files
  6. if (file.isDirectory()) {
  7. throw new IOException("Not a file: "+ file.getPath());
  8. }
  9. totalSize += file.getLen();
  10. }
  11. long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
  12. long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
  13. FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
  14. ...
  15. for (FileStatus file: files) {
  16. ...
  17. if (isSplitable(fs, path)) {
  18. long blockSize = file.getBlockSize();
  19. long splitSize = computeSplitSize(goalSize, minSize, blockSize);
  20. ...
  21. }
  22. protected long computeSplitSize(long goalSize, long minSize,
  23. long blockSize) {
  24. return Math.max(minSize, Math.min(goalSize, blockSize));
  25. }

2.3 RDD转换算子

RDD根据处理方式不同将算子整体上划分为

  • Value类型
  • 双value类型
  • Key-Value类型

2.3.1 Value类型

1 map
  1. def map[U: ClassTag](f: T => U): RDD[U]

将处理的数据烛台进行映射转换, 可以是 类型的转换 、 值的转换

默认分区数量不变

在这里插入图片描述

  1. /*
  2. * 1 map
  3. * */
  4. // 转换:旧RDD= => 算子 => 新RDD
  5. val rdd1 = sc.parallelize(List(1, 2, 3, 4))
  6. rdd1.map((i : Int) => {
  7. i * 2})
  8. rdd1.map((i : Int) => i * 2)
  9. rdd1.map((i) => i * 2)
  10. rdd1.map(i => i *2 )
  11. rdd1.map(_ * 2)

map执行原理

在这里插入图片描述

分区按照分区内顺序执行 , 第一条数据走完了全部流程 ,第二条数据才会走
分区间的执行是没有顺序的 ,不确定

2 mapPartitions
  1. def mapPartitions[U: ClassTag](
  2. f: Iterator[T] => Iterator[U],
  3. preservesPartitioning: Boolean = false): RDD[U]

将待处理的数据以分区为单位发送到计算节点进行处理

  1. val dataRDD1: RDD[Int] = dataRDD.mapPartitions(
  2. datas => {
  3. datas.filter(_==2)
  4. }
  5. )
  6. RDD3.mapPartitions(_.filter(_==2)).collect()

在这里插入图片描述

map方法是全量数据的操作,不能丢失数据 不能做过滤

mapPartitions 一次性过去分区的所i有数据,可以执行迭代器集合的所有操作 过滤 max min…

在这里插入图片描述
返回值要是是迭代器 ,需要转换

在这里插入图片描述

map & mapPartitions的区别

map 一次只处理一个数据

mapPartitions算子每一次都处理一个分区的数据
如果有一个分区的数据没有完全处理完毕,那么所有的数据都不会释放,容易出现OOM
当内存足够的时候,推荐使用mapPartition

3 mapPartitionsWithIndex

在这里插入图片描述

  1. def mapPartitionsWithIndex[U: ClassTag](
  2. f: (Int, Iterator[T]) => Iterator[U],
  3. preservesPartitioning: Boolean = false): RDD[U]

处理的同时获取当前分区的索引 , 可以知道数据在哪个分区

  1. dataRDD3.mapPartitionsWithIndex(
  2. (index, datas) => {
  3. datas.map(index, _)
  4. }
  5. ).collect()
  6. RDD3.mapPartitionsWithIndex(_.map())

在这里插入图片描述

map综合练习

在这里插入图片描述
逻辑处理是正确的,但是需要返回iter ,if else返回的是() , 向上取就是Any

  1. /*
  2. * map 综合练习
  3. * 获取第二个分区的数据
  4. * */
  5. val dataRDD = sc.parallelize(List(1,2,3,4,5,6,7,8,9),3)
  6. val rdd = dataRDD.mapPartitionsWithIndex((index,iter) =>{
  7. if(index == 2){
  8. iter
  9. }else{
  10. Nil.iterator
  11. }
  12. })
4 flatMap 扁平映射
  1. def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]

将数据进行扁平化后再进行映射处理,称为扁平映射

  1. val dataRDD = sc.makeRDD(List(
  2. List(1,2),List(3,4)
  3. ),1)
  4. val dataRDD1 = dataRDD.flatMap(
  5. list => list
  6. ).collect()

在这里插入图片描述

模式匹配

3 不为 List , 应该进行模式匹配

  1. /*
  2. * 将List(List(1,2),3,List(4,5)) 进行扁平化操作
  3. * */
  4. val rddList = sc.parallelize(List(List(1, 2), 3, List(4, 5)))
  5. rddList.flatMap( data => {
  6. data match{
  7. case list: List[_] => list
  8. case d => List(d)
  9. }
  10. })
5 glom
  1. def glom(): RDD[Array[T]]

将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变

  1. val dataRDD = sc.makeRDD(List(
  2. 1,2,3,4
  3. ),1)
  4. val dataRDD1:RDD[Array[Int]] = dataRDD.glom()
  5. /*
  6. * glom
  7. * 计算所有分区最大值求和
  8. * */
  9. val value = sc.parallelize(List(1, 2, 3, 4, 5, 7, 6, 8, 9), 3)
  10. //将每个分区转换成为数组
  11. val value1: RDD[Array[Int]] = value.glom()
  12. //从数组中获取最大值
  13. val value2: RDD[Int] = value1.map(x => x.max)
  14. //取出最大值求和
  15. println(value2.sum)
6 groupBy
  1. def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]

根据直送的规则进行分区,分区默认不变,数据被打乱进行重新分组,此过程称为shuffle

极限情况下,数据有可能被分到同一个分区中

一个组的数据在一个分区中,但是并不是说一个分区中只有一个组

  1. val dataRDD = sc.makeRDD(List(1,2,3,4),1)
  2. val dataRDD1 = dataRDD.groupBy(
  3. _%2
  4. ).collect()

在这里插入图片描述

  • groupBy 方法根据指定的规则进行分组,指定的规则的返回值就是分组的key
  • groupBy 方法的返回值为元组

    • 元组的第一个元素,表示分组的key
    • 元组的第二个元素,表示相同key的数据形成的可迭代的集合
  • groupBy方法执行后 ,会将数据进行分组操作,但是分区是不会改变的

    • 不同组的数据 会打乱 再分入到不同的分区当中
    • shuffle可能会存在空的分区,导致数据不均匀
    • 可以传参数决定下游分区的数量

在这里插入图片描述
分组减少了吗???
在这里插入图片描述
打印结果为 3个分区

在这里插入图片描述
会产生一个空的分区

把上游分区的数据重新打乱组合到下游的分区当中,称为 shuffle

  1. /*
  2. * 2 groupBy
  3. * */
  4. fileRDD.flatMap(_.split(","))
  5. .groupBy(x => x) //(ifeng,CompactBuffer(ifeng, ifeng, ifeng, ifeng, ifeng, ifeng))
  6. .map {
  7. case (word, iter) => {
  8. (word, iter.size)
  9. }
  10. }.collect().mkString(",")
  11. // .map(x => {
  12. // (x._1 , x._2.size)
  13. // }).foreach(println)
7 filter

根据指定规则对数据进行筛选,满足条件的保留,不满足的丢弃

数据经过筛选后,分区不变,可能会产生数据倾斜

  1. def filter(f: T => Boolean): RDD[T]

指定规则进行过滤,
分区不变,但是分区内的数据可能不均衡,生产环境下 可能出现数据倾斜

  1. val dataRDD = sparkContext.makeRDD(List(
  2. 1,2,3,4
  3. ),1)
  4. val dataRDD1 = dataRDD.filter(_%2 == 0)
8 sample
  1. def sample(
  2. withReplacement: Boolean,
  3. fraction: Double,
  4. seed: Long = Utils.random.nextLong): RDD[T]

指定规则抽取数据

  1. val dataRDD = sparkContext.makeRDD(List(
  2. 1,2,3,4
  3. ),1)
  4. // 抽取数据不放回(伯努利算法)
  5. // 伯努利算法:又叫0、1分布。例如扔硬币,要么正面,要么反面。
  6. // 具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不要
  7. // 第一个参数:抽取的数据是否放回,false:不放回
  8. // 第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取;
  9. // 第三个参数:随机数种子 抽两次之间的关系 设置为一个确定的值 , 多次抽取都是一样的
  10. val dataRDD1 = dataRDD.sample(false, 0.5)
  11. // 抽取数据放回(泊松算法)
  12. // 第一个参数:抽取的数据是否放回,true:放回;false:不放回
  13. // 第二个参数:重复数据的几率,范围大于等于0.表示每一个元素被期望抽取到的次数
  14. // 第三个参数:随机数种子
  15. val dataRDD2 = dataRDD.sample(true, 2)

随机数种子的作用

实际开发过程中,往往会出现数据倾斜,从数据倾斜的分区中抽取数据,查看数据的规则 , 分析后,可以进行改善处理

9 distinct
  1. def distinct()(implicit ord: Ordering[T] = null): RDD[T]
  2. def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

将数据集中重复的数据去重

  1. val dataRDD = sparkContext.makeRDD(List(
  2. 1,2,3,4,1,2
  3. ),1)
  4. val dataRDD1 = dataRDD.distinct()
  5. val dataRDD2 = dataRDD.distinct(2) # 2 指定新的分区数

在这里插入图片描述
在这里插入图片描述

reduceByKey aggregateByKey flodByKey combineByKey

从源码的角度来看 ,四个算子的底层是相同的

10 coalesce
  1. def coalesce(numPartitions: Int, shuffle: Boolean = false,
  2. partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
  3. (implicit ord: Ordering[T] = null)
  4. : RDD[T]

根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行晓玲

过多小任务,coalesce方法,合并 减少分区个数 , 减少任务调度成本

  1. val dataRDD = sparkContext.makeRDD(List(
  2. 1,2,3,4,1,2
  3. ),6)
  4. val dataRDD1 = dataRDD.coalesce(2)
11 repartition
  1. def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

内部还是执行的coalesce操作,参数shuffle的默认值为true

重新分区 可变多也可以变少

  1. val dataRDD = sparkContext.makeRDD(List(
  2. 1,2,3,4,1,2
  3. ),2)
  4. val dataRDD1 = dataRDD.repartition(4)
Coalese & rapartition的区别

Coalesce方法默认情况下不会扩大分区,默认不会进行shuffle。扩大分区是没有意义的
在这里插入图片描述

reparation 是打乱重新组合,默认进行shuffle操作 。repatriation底层调用的也是coalesce ,默认shuffle为true

  1. def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  2. coalesce(numPartitions, shuffle = true)
  3. }
12 sortBY
  1. def sortBy[K](
  2. f: (T) => K,
  3. ascending: Boolean = true,
  4. numPartitions: Int = this.partitions.length)
  5. (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

排序数据, 排序之前通过f 函数进行处理,之后按照f函数处理的结果进行排序,默认为正序排列

分区数一致

  1. val dataRDD = sparkContext.makeRDD(List(
  2. 1,2,3,4,1,2
  3. ),2)
  4. val dataRDD1 = dataRDD.sortBy(num=>num, false, 4)
13 pipe
  1. def pipe(command: String): RDD[String]

针对每个分区,都调用一次shell脚本,返回输出的RDD

  1. 1) 编写一个脚本,并增加执行权限
  2. [root@linux1 data]# vim pipe.sh
  3. #!/bin/sh
  4. echo "Start"
  5. while read LINE; do
  6. echo ">>>"${
  7. LINE}
  8. done
  9. [root@linux1 data]# chmod 777 pipe.sh
  10. 2) 命令行工具中创建一个只有一个分区的RDD
  11. scala> val rdd = sc.makeRDD(List("hi","Hello","how","are","you"), 1)
  12. 3) 将脚本作用该RDD并打印
  13. scala> rdd.pipe("/opt/module/spark/pipe.sh").collect()
  14. res18: Array[String] = Array(Start, >>>hi, >>>Hello, >>>how, >>>are, >>>you)

在这里插入图片描述

2.3.2 双Value类型

两个RDD进行的操作

  • 交集 subtract
  • 并集 union
  • 差集 intersection
  • 拉链 zip
15 union

数据合并 ,分区也会合并

  1. def intersection(other: RDD[T]): RDD[T]

返回新的RDD = RDD1 + RDD2

  1. val dataRDD1 = sparkContext.makeRDD(List(1,2,3,4))
  2. val dataRDD2 = sparkContext.makeRDD(List(3,4,5,6))
  3. val dataRDD = dataRDD1.intersection(dataRDD2)
16 substract
  1. def subtract(other: RDD[T]): RDD[T]

求差集

  1. val dataRDD1 = sc.makeRDD(List(1,2,3,4))
  2. val dataRDD2 = sc.makeRDD(List(4,5,6))
  3. val dataRDD = dataRDD1.subtract(dataRDD2)

在这里插入图片描述

17 zip
  1. def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]

将两个RDD中的元素,以键值对的形式进行合并。

  1. val dataRDD1 = sc.makeRDD(List(1,2,3,4))
  2. val dataRDD2 = sc.makeRDD(List(3,4,5,6))
  3. dataRDD1.zip(dataRDD2).collect()

在这里插入图片描述

两个RDD类型不相同

union
zip
intersection
subtract

只有zip可以运行

2.3.2 Key-Value类型

18 partitionBy

patitionBy 参数为分区器对象

  1. def partitionBy(partitioner: Partitioner): RDD[(K, V)]

指定Partitioner重新进行分区。 Spark默认的分区器是HashPartitioner

  1. val rdd: RDD[(Int, String)] =
  2. sc.makeRDD(Array((1,"aaa"),(2,"bbb"),(3,"ccc")),3)
  3. import org.apache.spark.HashPartitioner
  4. 分区器对象:HashPartitioner & RangePartitioner
  5. val rdd2: RDD[(Int, String)] =
  6. rdd.partitionBy(new HashPartitioner(2))

传入的参数为分区器对象

在这里插入图片描述

自定义分区器

自己决定数据放置在哪个分区做处理

  1. class MyPartitioner(num:Int) extends Partitioner{
  2. override def numPartitions: Int = {
  3. num}
  4. //根据key来决定数据在哪个分区中进行处理
  5. //方法的返回值表示分区编号(索引)
  6. override def getPartition(key: Any): Int = {
  7. key match{
  8. case "NBA" => 0
  9. case _ => 1
  10. }
  11. }
  12. }
  13. val baskerballl = sc.parallelize(List(
  14. ("NBA","热火"),("NBA","灰熊"),("NBA","湖人"),("NBA","马刺"),("","首钢"),
  15. ("NBA","骑士"),("NBA","火箭"),("CBA","广州"),("CBA","新疆"),("CBA","山东")
  16. ),1)
  17. val NBA = baskerballl.partitionBy(new MyPartitioner(3))
  18. NBA.mapPartitionsWithIndex((index,datas) => {
  19. datas.map(
  20. data => (index,data)
  21. )
  22. }).collect().foreach(println)
19 reduceByKey

分区内和分区间计算规则相同

先分组 再聚合
//TODO 根据数据的Key进行分组,然后对value进行聚合

  1. /*
  2. * 1 ReduceByKey
  3. * */
  4. fileRDD.flatMap(_.split(","))
  5. .map((_,1))
  6. .reduceByKey((x,y) => (x+y)).foreach(println)
  7. // .reduceByKey(_+_).foreach(println)
20 groupByKey

只能根据Key进行分组,相当于groupBy的特殊形式

调用groupByKey之后,返回数据的类型为元组
元组的第一个元素表示用于分组的key
第二个元组表示相同key的value集合

  1. def groupByKey(): RDD[(K, Iterable[V])]
  2. def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
  3. def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]

将分区数据直接转换为相同类型的内存数组进行后续处理

  1. val dataRDD1 =sc.makeRDD(List(("a",1),("b",2),("c",3)))
  2. val dataRDD2 = dataRDD1.groupByKey().collect()
  3. val dataRDD3 = dataRDD1.groupByKey(2).collect()
  4. val dataRDD4 = dataRDD1.groupByKey(new HashPartitioner(2)).collect()
  5. /*
  6. * 3 groupBykey
  7. * */
  8. fileRDD.flatMap(_.split(","))
  9. .map((_,1))
  10. .groupByKey() //(ifeng,CompactBuffer(1, 1, 1, 1, 1, 1))
  11. .map{
  12. case (word,iter) => {
  13. (word,iter.sum)
  14. }
  15. }
  16. // .map(x => {
  17. // (x._1,x._2.sum)
  18. // }).foreach(println)
  19. // fileRDD.map()
reduceByKey & groupByKey的区别
  1. groupByKey

groupByKey 把一个分区的数据分组后不能继续操作, 需要等待其他分区的数据全部到达后,才能继续执行

但如果是在内存中等待,那么可能会导致内存不足,直接导致执行失败,所有这个等待的过程必须依靠磁盘

一个分区就是一个Task

在这里插入图片描述

  1. reduceByKey

在shuffle阶段减少数据,shuffle之前先聚合(预聚合) conbainer

在这里插入图片描述

21 aggregateByKey

aggregate : 聚合

Scala语法,函数柯里化
方法有两个参数列表需要传递参数
第一个参数列表中传递参数为zeroValue:计算的初始值
第二个参数列表中传递的参数为:
seqOp:分区内计算规则
comOp :分区间的计算规则

  1. def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
  2. combOp: (U, U) => U): RDD[(K, U)]

根据不同规则进行分区内计算和分区间计算

  1. 将数据根据不同的规则进行分区内计算和分区间计算
  2. val dataRDD1 =
  3. sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
  4. val dataRDD2 =
  5. dataRDD1.aggregateByKey(0)(_+_,_+_)
  6. 取出每个分区内相同key的最大值然后分区间相加
  7. // TODO : 取出每个分区内相同key的最大值然后分区间相加
  8. // aggregateByKey算子是函数柯里化,存在两个参数列表
  9. // 1. 第一个参数列表中的参数表示初始值
  10. // 2. 第二个参数列表中含有两个参数
  11. // 2.1 第一个参数表示分区内的计算规则
  12. // 2.2 第二个参数表示分区间的计算规则
  13. val rdd =
  14. sc.makeRDD(List(
  15. ("a",1),("a",2),("c",3),
  16. ("b",4),("c",5),("c",6)
  17. ),2)
  18. // 0:("a",1),("a",2),("c",3) => (a,10)(c,10)
  19. // => (a,10)(b,10)(c,20)
  20. // 1:("b",4),("c",5),("c",6) => (b,10)(c,10)
  21. /*
  22. * aggregateByKey
  23. *
  24. * */
  25. // TODO 分区内求最大值,分区间求和
  26. val rdd = sc.parallelize(List(
  27. ("a",2),("a",7),("a",1),
  28. ("b",3),("c",5),("b",1)
  29. ),2)
  30. rdd.aggregateByKey(0)( //初始值为0 ,下如图紫色(a,0)
  31. (x,y) => math.max(x,y),
  32. (x,y) => x + y
  33. )

在这里插入图片描述

22 foldByKey
  1. def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

分区内计算规则 == 分区间计算规则,aggregateByKey就可以简化为foldByKey

  1. val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
  2. //val dataRDD2 = dataRDD1.aggregateByKey(0)(_+_,_+_)
  3. val dataRDD2 = dataRDD1.foldByKey(0)(_+_)
23 combineByKey
  1. def combineByKey[C](
  2. createCombiner: V => C,
  3. mergeValue: (C, V) => C,
  4. mergeCombiners: (C, C) => C): RDD[(K, C)]

最通用的对key-value型rdd进行聚集操作的聚集函数(aggregation function)。类似于aggregate(),combineByKey()允许用户返回值的类型与输入不一致

  1. 将数据List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98))求每个key的平均值
  2. val list: List[(String, Int)] = List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98))
  3. val input: RDD[(String, Int)] = sc.makeRDD(list, 2)
  4. val combineRdd: RDD[(String, (Int, Int))] = input.combineByKey(
  5. (_, 1),
  6. (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
  7. (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)

在这里插入图片描述
在这里插入图片描述

  1. /*
  2. * combineByKey
  3. TODO 每个Key的平均值
  4. *
  5. * */
  6. val rdd1 = sc.parallelize(List(("a",88),("b",95),("a",91),("b",93),("b",98)),2)
  7. // 1 改变value格式,88 ---> (88,1) ---> + 91 = (179,2)
  8. val value = rdd1.combineByKey(
  9. v => (v, 1),
  10. (tuple: (Int, Int), v) => {
  11. (tuple._1 + v, tuple._2 + 1)
  12. },
  13. (t1: (Int, Int), t2: (Int, Int)) => {
  14. (t1._1 + t2._1, t1._2 + t2._2)
  15. }
  16. )
  17. value.map{
  18. case(key,(total,cnt)) => {
  19. (key , total / cnt)
  20. }
  21. }
24 sortByKey

sortBy 使用了RangePartitioner

  1. def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
  2. : RDD[(K, V)]

在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的

  1. val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
  2. val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(true)
  3. val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(false)
  4. 小功能:设置key为自定义类User
  5. class User extends Ordered[User]{
  6. override def compare(that: User): Int = {
  7. 1
  8. }
  9. }
25 join
  1. def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]

在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素连接在一起的(K,(V,W))的RDD

  1. val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "a"), (2, "b"), (3, "c")))
  2. val rdd1: RDD[(Int, Int)] = sc.makeRDD(Array((1, 4), (2, 5), (3, 6)))
  3. rdd.join(rdd1).collect().foreach(println)
26 leftOuterJoin
  1. def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
  2. 类似于SQL语句的左外连接
  3. val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
  4. val dataRDD2 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
  5. val rdd: RDD[(String, (Int, Option[Int]))] = dataRDD1.leftOuterJoin(dataRDD2)
27 cogroup
  1. def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
  2. 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD
  3. val dataRDD1 = sparkContext.makeRDD(List(("a",1),("a",2),("c",3)))
  4. val dataRDD2 = sparkContext.makeRDD(List(("a",1),("c",2),("c",3)))
  5. val value: RDD[(String, (Iterable[Int], Iterable[Int]))] =
  6. dataRDD1.cogroup(dataRDD2)

2.4 RDD 行动算子

1 reduce
  1. def reduce(f: (T, T) => T): T
  2. 聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据
  3. val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
  4. // 聚合数据
  5. val reduceResult: Int = rdd.reduce(_+_)
2 collect
  1. def collect(): Array[T]

Array数组形式返回数据集所有元素

  1. val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
  2. // 收集数据到Driver
  3. rdd.collect().foreach(println)
3 count
  1. def count(): Long

返回RDD中元素的个数

  1. val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
  2. // 返回RDD中元素的个数
  3. val countResult: Long = rdd.count()
4 first
  1. def first(): T
  2. val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
  3. // 返回RDD中元素的个数
  4. val firstResult: Int = rdd.first()
  5. println(firstResult)
5 take
  1. def take(num: Int): Array[T]
  2. 返回一个由RDD的前n个元素组成的数组
  3. vval rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
  4. // 返回RDD中元素的个数
  5. val takeResult: Array[Int] = rdd.take(2)
  6. println(takeResult.mkString(","))
6 takeOrdered
  1. def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
  2. val rdd: RDD[Int] = sc.makeRDD(List(1,3,2,4))
  3. // 返回RDD中元素的个数
  4. val result: Array[Int] = rdd.takeOrdered(2)
7 aggregate
  1. def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
  2. [U: ClassTag]可以理解为是泛型,传递三个值:
  3. zeroValue:初始值,只使用一次
  4. seqOp:函数类型,作用是为每一个partition中的数据遍历应用一次函数 (注意:是每个partition,即1partitionseqOp会执行一次,3partition,会执行三次)
  5. combOp:函数类型,在seqOp执行完之后执行,这个参数的输入数据是seqOp函数的输出结果,仅会执行一次,是用来聚合所有partition结果的

分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合

  1. val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
  2. // 将该RDD所有元素相加得到结果
  3. //val result: Int = rdd.aggregate(0)(_ + _, _ + _)
  4. val result: Int = rdd.aggregate(10)(_ + _, _ + _)
8 fold

def fold(zeroValue: T)(op: (T, T) => T): T

折叠操作,aggreagte的简化版操作

  1. val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
  2. val foldResult: Int = rdd.fold(0)(_+_)
9 countByCount
  1. def countByKey(): Map[K, Long]

统计每种key的个数

  1. val rdd: RDD[(Int, String)] = sc.makeRDD(List((1, "a"), (1, "a"), (1, "a"), (2, "b"), (3, "c"), (3, "c")))
  2. // 统计每种key的个数
  3. val result: collection.Map[Int, Long] = rdd.countByKey()
10 save相关算子
  1. def saveAsTextFile(path: String): Unit
  2. def saveAsObjectFile(path: String): Unit
  3. def saveAsSequenceFile(
  4. path: String,
  5. codec: Option[Class[_ <: CompressionCodec]] = None): Unit

保存数据到不同的格式就是

  1. // 保存成Text文件
  2. rdd.saveAsTextFile("output")
  3. // 序列化成对象保存到文件
  4. rdd.saveAsObjectFile("output1")
  5. // 保存成Sequencefile文件
  6. rdd.map((_,1)).saveAsSequenceFile("output2")
11 foreach
  1. def foreach(f: T => Unit): Unit = withScope {
  2. val cleanF = sc.clean(f)
  3. sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
  4. }

分布式遍历RDD中的每一个元素,调用指定函数

  1. val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
  2. // 收集后打印
  3. rdd.map(num=>num).collect().foreach(println)
  4. println("****************")
  5. // 分布式打印
  6. rdd.foreach(println)

广播变量

sum变量 如果一个Task中存一个的话,会极大浪费 因此采用把 sum 变量提升到 Executor中 ,多个Task共享
在这里插入图片描述

  1. //TODO 广播变量实现join
  2. val rdd1 = sc.parallelize(List(("a",1),("b",2),("c",3)))
  3. val List111 = List(("a",4),("b",5),("c",6))
  4. // TODO 声明广播变量(只读)
  5. val bcList:Broadcast[List[(String,Int)]] = sc.broadcast(List111)
  6. val rdd2 = rdd1.map{
  7. case (word,count1) => {
  8. var count2 = 0
  9. for(kv <- List111){
  10. val w = kv._1
  11. val v = kv._2
  12. if(w == word){
  13. count2 = v
  14. }
  15. }
  16. (word,(count1,count2))
  17. }
  18. }
  19. println(rdd2.collect().mkString(","))
  20. sc.stop()
  21. }
  22. }

其他Executor访问广播变量

在这里插入图片描述

发表评论

表情:
评论列表 (有 0 条评论,316人围观)

还没有评论,来说两句吧...

相关阅读

    相关 初学者sparkCore入门

    1,概述 Spark 是一个基于内存的用于处理、分析大数据的集群计算框架它提供了一套简单的编程接口,从而使得应用程序开发者方便使用集群节点的 CPU 、内存、存储资源来处

    相关 SparkCore杂记一

             接触Spark也有一段时间了,最开始一直都是上网看一些博客,自己搭建个虚拟机倒腾,都是一些零散的学习,回头想想还是有必要系统性的学习、理解一遍,本系列博客将会

    相关 SparkCore(一)

    1.什么是RDD RDD分布式数据集,是Spark中最基础的数据抽象,代码中谁一个抽象类,它代表一不可变、可分区、元素可并行计算的集合。 1.2RDD的特点 分区