Spark Programming Guide

客官°小女子只卖身不卖艺 2023-06-18 12:53 138阅读 0赞

Table of Contents

概述

构建spark程序

初始化 Spark

Spark-shell

RDD

并行集合

外部数据集

RDD算子

基础

将函数作为参数传递

闭包

Shuffle

RDD 持久化

如何选择存储级别

共享变量

Broadcast

Accumulators

部署到集群

java和scala的启动方式


概述

每个 Spark 程序都有一个 Driver 程序,该程序用来执行 main 函数,创建 SparkContext,准备 Spark 程序的执行环境。弹性分布式数据集 RDD,是Spark 中的基本数据结构,通过 HDFS 文件系统或者另一个 RDD 来创建。也可以持久化到内存中。

第二个抽象的概念是可以在并行的任务中使用相同的变量,即共享变量。Spark支持两种类型的共享变量:广播变量(可用于在所有节点的内存中缓存一个值)和累加器(仅“添加”到其中的变量,如计数器和)。

构建spark程序

默认情况下,Spark 2.1.1是使用Scala 2.11构建和发布的。(Spark也可以与Scala的其他版本兼容。)要用 Scala 编写应用程序,你需要使用一个兼容的Scala版本(例如2.11.X)。

  1. <scala.binary.version>2.11</scala.binary.version>
  2. <scala.version>2.11.12</scala.version>
  3. <spark.version>2.1.1</spark.version>
  4. <dependency>
  5. <groupId>org.apache.spark</groupId>
  6. <artifactId>spark-core_${scala.binary.version}</artifactId>
  7. <version>${spark.version}</version>
  8. </dependency>

需要导入的类

  1. import org.apache.spark.SparkContext
  2. import org.apache.spark.SparkConf

初始化 Spark

先创建一个 SparkConf 配置信息对象,再创建一个 SparkContext 上下文对象。

  1. val conf = new SparkConf().setAppName(appName).setMaster(master)
  2. val context = new SparkContext(conf)

每个 JVM 只能激活一个 SparkContext。在创建新上下文之前,必须 context.stop() 来停止当前活动的 SparkContext。

appName 是你的程序在集群 UI 界面上显示的名称。master 是 Messo、Yarn 或 local 本地模式。

Spark-shell

使用 Spark-shell 已经创建了一个默认的 SparkContext 对象,可以通过 —master 指定提交的资源管理集群,通过 —jars 传一个都好分隔的 jar 添加到 classpath 中,通过 —packages 自定maven 依赖。如下使用4个core来执行:

  1. $ ./bin/spark-shell --master local[4]

添加 jar 到 classpath

  1. $ ./bin/spark-shell --master local[4] --jars code.jar

指定 maven 坐标

  1. $ ./bin/spark-shell --master local[4] --packages "org.example:example:0.1"
  2. [root@single bin]# ./spark-shell --help
  3. Usage: ./bin/spark-shell [options]
  4. Options:
  5. --master MASTER_URL spark://host:port, mesos://host:port, yarn, or local.
  6. --deploy-mode DEPLOY_MODE Whether to launch the driver program locally ("client") or
  7. on one of the worker machines inside the cluster ("cluster")
  8. (Default: client).
  9. --class CLASS_NAME 用户程序 main class (for Java / Scala apps).
  10. --name NAME application 名称
  11. --jars JARS driver executer classpath上的逗号分隔的本地jar
  12. --packages 格式为:groupId:artifactId:version
  13. --exclude-packages groupId:artifactId,避免 --packages 中的依赖冲突
  14. --repositories 附加远程存储库的逗号分隔列表,搜索——包给出的maven坐标。
  15. --py-files PY_FILES 要放置的.zip、.egg或.py文件的逗号分隔列表,Python应用程序的
  16. Python路径。
  17. --files FILES 要放置在工作中的文件的逗号分隔列表
  18. 每个 executer 的目录。
  19. --conf PROP=VALUE 任意 spark 的配置属性
  20. --properties-file FILE 加载额外属性的文件的路径。 如果不
  21. 指定将使用 conf/spark-defaults.conf.
  22. --driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 1024M).
  23. --driver-java-options Extra Java options to pass to the driver.
  24. --driver-library-path Extra library path entries to pass to the driver.
  25. --driver-class-path Extra class path entries to pass to the driver. Note that
  26. jars added with --jars are automatically included in the
  27. classpath.
  28. --executor-memory MEM Memory per executor (e.g. 1000M, 2G) (Default: 1G).
  29. --proxy-user NAME User to impersonate when submitting the application.
  30. This argument does not work with --principal / --keytab.
  31. --help, -h Show this help message and exit.
  32. --verbose, -v Print additional debug output.
  33. --version, Print the version of current Spark.
  34. Spark standalone with cluster deploy mode only:
  35. --driver-cores NUM Cores for driver (Default: 1).
  36. Spark standalone or Mesos with cluster deploy mode only:
  37. --supervise If given, restarts the driver on failure.
  38. --kill SUBMISSION_ID If given, kills the driver specified.
  39. --status SUBMISSION_ID If given, requests the status of the driver specified.
  40. Spark standalone and Mesos only:
  41. --total-executor-cores NUM Total cores for all executors.
  42. Spark standalone and YARN only:
  43. --executor-cores NUM Number of cores per executor. (Default: 1 in YARN mode,
  44. or all available cores on the worker in standalone mode)
  45. YARN-only:
  46. --driver-cores NUM Number of cores used by the driver, only in cluster mode
  47. (Default: 1).
  48. --queue QUEUE_NAME The YARN queue to submit to (Default: "default").
  49. --num-executors NUM Number of executors to launch (Default: 2).
  50. If dynamic allocation is enabled, the initial number of
  51. executors will be at least NUM.
  52. --archives ARCHIVES Comma separated list of archives to be extracted into the
  53. working directory of each executor.
  54. --principal PRINCIPAL Principal to be used to login to KDC, while running on
  55. secure HDFS.
  56. --keytab KEYTAB The full path to the file that contains the keytab for the
  57. principal specified above. This keytab will be copied to
  58. the node running the Application Master via the Secure
  59. Distributed Cache, for renewing the login tickets and the
  60. delegation tokens periodically.

RDD

创建 RDD 有两种方法:并行化驱动程序中的现有集合,或者引用外部存储系统中的数据集,比如共享文件系统、HDFS、HBase 或任何提供 Hadoop InputFormat 的数据源。

并行集合

并行化集合是通过在程序中调用 SparkContext 的 parallelize 方法来创建的。将集合的元素复制以形成可并行操作的分布式数据集。例如,这里是如何创建一个并行的集合容纳数字1到5:

  1. val data = Array(1, 2, 3, 4, 5)
  2. val distData = sc.parallelize(data)

创建完成之后,就可以对 RDD 进行操作了,如使用 distData.reduce((a, b) => a + b) 对列表中的数据进行累加。

并行集合的计算的一个重要参数是按指定数量对数据进行分区。每一个分区对应一个 task,一般情况下一个 CPU 对应2-4个分区,spark 集群会自动设置分区数量。也可以手动将分区数量作为第二个参数传入:sc.parallelize(data, 10)。

外部数据集

Spark 可以从 Hadoop 支持的任何存储源创建分布式数据集,包括本地文件系统、HDFS、Cassandra、HBase、Amazon S3等。Spark 支持文本文件、Sequence 文件和任何其他 Hadoop InputFormat。

使用 SparkContext 的 textFile 方法创建文本文件 RDD,获取文件的地址:本地路径、hdfs:\\、s3n://,并将其作为行 list 读取,如下实例:

  1. scala> val distFile = sc.textFile("data.txt")
  2. distFile: org.apache.spark.rdd.RDD[String] = data.txt MapPartitionsRDD[10] at textFile at <console>:26

创建完成之后,就可以对数据集进行计算,如使用 distFile.map(s=>s.length).reduce((a,b)=>a+b) 来计算字符数量。

以下是关于读文件的注意事项:

  • 如果使用本地路径,则文件必须在所有 work 节点上,或者共享的网络上。
  • Spark 所有基于文件的输入方法,支持目录、压缩文件和通配符,例如:textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz")
  • textFile 还接收第二个参数用来控制分区的数量。默认情况下,一个数据块产生一个分区,可以通过传入第二个参数来增加分区,注意:分区的数量不能少于数据块的数量

除了文本文件,scala 还支持其他几种数据格式:

  • sparkConext.wholeTextFiles 可以读取包含多个小文件的目录,并以(filename,content)键值对的形式返回,与 textFile 相反,每个文件每行作为一条记录返回。
  • 对于 SequenceFile,SparkContext.sequenceFile[K,V]方法中的 KV 对应文件中键和值的类型。应该是 Hadoop 的可写接口的子类,比如 IntWritable 和 Text,Spark 支持一些常见的类型,如 sequenceFile[Int, Stirng] 对应 Intwritable 和 Text。
  • 对于其他的 Hadoop InputFormat,可以使用 SparkContext.hadoopRDD 方法,通过传入 JobConf、输入格式类、key 类、value 类。与 Hadoop 相同的方式,sparkContext.newAPIHadoopRDD
  • RDD.saveAsObjectFile 和 SparkContext.objectFile 支持java 序列化的方式保存 RDD。但是没有 Avro 格式好,但是提供最简单的方式保存任何 RDD

RDD算子

RDD 支持两种类型的操作:action 和 transformation。例如 map 是一个 transformation 算子,通过传递一个函数并返回一个新的 RDD 数据集。reduce 算子将使用传入的函数聚合所有 RDD 数据集,并将结果返回给 Driver。

所有的 transformation 算子都是惰性的,也就是说真正的开始执行发生在 action 算子上,这种设计使 Spark 的计算更高效的执行。

默认情况下,可以连续的对 RDD 进行 transformation,也可以使用persist 和 cache 方法 将数据持久化到集群内存中,用来减少 IO,提高计算效率,这是 Spark 的核心特征,也可以持久化到磁盘或跨多个节点复制 RDD。

基础

  1. val lines = sc.textFile("data.txt")
  2. val lineLengths = lines.map(s => s.length)
  3. val totalLength = lineLengths.reduce((a, b) => a + b)
  • 第一行定义了来自外部文件的数据 RDD。这个数据集没有加载到内存中,也没有在其他地方执行:行只是指向文件的指针。
  • 第二行将 lineLengths 定义为映射转换的结果。同样,由于懒惰,lineLengths不是立即计算的。
  • 最后,运行 reduce,这是一个动作。此时,Spark 将计算分解为在不同的机器上运行的任务,每台机器都运行其部分映射和局部约简,只向驱动程序返回其答案。

如果以后还要使用该数据的话,可以持久化:

  1. lineLengths.persist()

在 reduce 之前,会在第一次计算之后保存到内存中。

将函数作为参数传递

Spark 的 API 严重依赖传递给 Driver 的函数,通常有两种传递方式:

  • 匿名函数;
  • 全局单例对象中的静态方法。如:

    object MyFunctions {
    def func1(s: String): String = { … }
    }

    myRdd.map(MyFunctions.func1)

注意:将引用传递给类中的方法时,也需要将类中的对象和方法传进去。如:

  1. class MyClass {
  2. def func1(s: String): String = { ... }
  3. def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(func1) }
  4. }
  5. class MyClass {
  6. val field = "Hello"
  7. def doStuff(rdd: RDD[String]): RDD[String] = { rdd.map(x => field + x) }
  8. }

等同于编写rdd.map(x = > this.field + x),它引用了所有这些。为了避免这个问题,最简单的方法是将字段复制到一个局部变量中,而不是从外部访问它:

  1. def doStuff(rdd: RDD[String]): RDD[String] = {
  2. val field_ = this.field
  3. rdd.map(x => field_ + x)
  4. }

闭包

涉及共享变量和方法的有效范围和生命周期。

例子

下面简单的 RDD 元素累加,单机模式和集群模式下是不同的。

  1. var counter = 0
  2. var rdd = sc.parallelize(data)
  3. // Wrong: Don't do this!!
  4. rdd.foreach(x => counter += x)
  5. println("Counter value: " + counter)

Local vs. cluster modes

上述代码的行为是未定义的,可能无法按预期工作。为了执行作业,Spark 将 RDD 操作的处理分解为任务,每个任务由执行程序执行。在执行之前,Spark 计算任务的闭包。闭包是那些执行程序在 RDD 上执行其计算时必须可见的变量和方法(在本例中为foreach())。这个闭包被序列化并发送给每个执行器。

传递给每个执行器的闭包中的变量是副本,因此,当在 foreach 函数中引用 counter 时,它不再是 Dirver 节点上的计数器。在 Driver 节点的内存中仍然有一个计数器,但它对 Executrer 不再可见!Executer 只看到来自序列化闭包的副本。因此,counter 的最终值仍然是零,因为 counter 上的所有操作都引用了序列化闭包中的值。

在本地模式下某些情况,foreach 函数实际上会在与 Driver 相同的 JVM 中执行,并引用相同的原始计数器,并可能实际更新它。

为了确保在这类场景中定义良好的行为,应该使用累加器。Spark 中的累加器专门用于提供一种机制,以便在集群中的工作节点之间执行分割时安全地更新变量。

一般来说,像循环或局部定义方法这样的闭包结构不应该用来改变全局状态。Spark不定义或保证闭包外部引用的对象的突变行为。一些这样做的代码可能在本地模式下工作,但那只是偶然的,而且这样的代码在分布式模式下不会像预期的那样工作。如果需要全局聚合,则使用累加器。

Printing elements of an RDD

另一个常见的习惯用法是尝试使用 RDD.foreach(println) 或 RDD.map(println)打印出 RDD 的元素。在一台机器上,这将生成预期的输出并打印所有的 RDD 元素。但是,在集群模式下,执行器调用的 stdout 输出现在是写入执行器的 stdout,而不是写入驱动程序上的 stdout,所以驱动程序上的 stdout 不会显示这些!要打印驱动程序上的所有元素,可以使用 collect() 方法首先将 RDD 带到驱动程序节点,如下所示:RDD.collect().foreach(println)。这可能会导致 Driver 的内存耗尽。

KV键值对类型的数据在 Spark 是如何使用、Transformation 算子、Action 算子:Spark RDD 算子

Shuffle

shuffle 是 spark 中对数据进行分区和分组的一种机制,通常是跨 Executer 和机器复制数据,使其成为耗资源的操作,带有 shuffle 的算子有:

  1. sortByKey
  2. repartitionAndSortWithinPartitions
  3. partitionBy
  4. coalesce
  5. combineByKey
  6. aggregateByKey
  7. foldByKey
  8. reduceByKey
  9. countApproxDistinctByKey
  10. groupByKey
  11. cogroup
  12. subtractByKey

背景

为了理解在 shuffle 期间会发生什么,我们以 reduceByKey 为例子。reduceByKey 生成一个新的 RDD,其中单个键的所有值都被组合成一个元组(键和对)与该键关联的所有值执行 reduce 函数的结果。挑战在于,单个键的所有值不一定都位于相同的分区,甚至也不一定位于同一台机器上,但它们必须位于同一位置才能计算结果。

在 Spark 中,数据不会存放在指定的位置,计算的时候,任务会分发到数据节点中执行,reduceByKey 中的 reduce 从所有分区中读取所有 key 的值,然后将各个分区的值放在一起,计算每个 key 的最终结果,整个过程称之为 shuffle。

  • mappartition来对每个分区进行排序,例如,使用.sort
  • repartitionAndSortWithinPartitions可以有效地对分区进行排序,同时进行重新分区
  • 创建一个全局有序的RDD

可能导致混乱的算子包括 repartition 算子(如 repartition 和 coalesce)、ByKey 算子(除 counting 外)(如 groupByKey 和 reduceByKey )以及 join 算子(如 cogroup 和 join)。

性能

shuffle 操作会产生磁盘IO、数据序列化、网络IO,所以会影响整个流处理的性能。

在 Spark 内部,来自单个 map 任务的结果会保存在内存中,知道任务结束。然后根据目标分区排序并写入文件。在 reduce 端,读取相关的数据块。

某些 shuffle 算子会消耗大量堆内存,因为它们使用内存中的数据结构来组织传输之前或之后的记录。具体来说,reduceByKey 和 aggregateByKey 在 map 端创建这些结构,而’ ByKey 操作在 reduce 端生成这些结构。当数据不适合内存时,Spark 会将这些表溢出到磁盘,导致磁盘I/O的额外开销和增加的垃圾收集。

Shuffle 还会在磁盘上生成大量的中间文件。从Spark 1.3开始,这些文件将一直保留到不再使用相应的 RDDs 并进行垃圾收集。这样做是为了在重新计算时不需要重新创建 shuffle 文件。如果应用程序保留对这些 rdds 的引用,或者 GC 不经常启动,那么垃圾收集可能只会在很长一段时间之后才会发生。这意味着长时间运行的 Spark 作业可能会消耗大量磁盘空间。SparkContxt 的 spark.local.dir 用来配置指定缓存文件存储的目录。

RDD 持久化

跨机器在内存中持久化 RDD,每个节点保存 RDD 的一部分可计算数据。通过 persist 和 cache 方法来持久化。Spark 的缓存是容错性的,如果数据丢失,spark 会从最初创建他的算子中自动重新计算。此外,持久化是有存储级别的:




































Storage Level Meaning
MEMORY_ONLY 将 RDD 作为反序列化的 Java 对象存储在 JVM 中。如果 RDD 不适合内存,那么一些分区将不会被缓存,而是在需要它们时动态地重新计算。这是默认级别。
MEMORY_AND_DISK 将 RDD 作为反序列化的 Java 对象存储在 JVM 中。如果 RDD 不适合内存,那么将不适合的分区存储在磁盘上,并在需要时从那里读取它们。
MEMORY_ONLY_SER
(Java and Scala)
将 RDD 存储为序列化的 Java 对象(每个分区一个字节数组)。这通常比反序列化对象更节省空间,特别是在使用快速序列化器时,但读取时需要更多cpu。
MEMORY_AND_DISK_SER
(Java and Scala)
类似于 MEMORY_ONLY_SER,但是将不适合内存的分区溢出到磁盘,而不是每次需要时动态地重新计算它们。
DISK_ONLY 只在磁盘上存储 RDD 分区。
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. 与上面的级别相同,但是在两个集群节点上复制每个分区。
OFF_HEAP (experimental) 类似 于MEMORY_ONLY_SER,但将数据存储在堆外内存中。这需要启用堆外内存。

注意:在Python中,存储的对象将始终使用Pickle库进行序列化,因此是否选择序列化级别并不重要。Python中可用的存储级别包括MEMORY_ONLY、MEMORY_ONLY_2、MEMORY_AND_DISK、MEMORY_AND_DISK_2、DISK_ONLY和DISK_ONLY_2。

甚至在没有用户调用 persist 的情况下,Spark 也会自动持久化一些 shuffle 算子中的中间数据(例如 reduceByKey)。这样做是为了避免在节点转移期间失败时重新计算整个输入。我们仍然建议用户在计划重用结果 RDD 时调用 persist。

如何选择存储级别

存储级别实际上就是在 cpu 和内存上做权衡。

  • MEMORY_ONLY 是cpu效率最高的选项,允许 rdd 上的算子尽可能快的执行。
  • 使用MEMORY_ONLY_SER并选择一个快速序列化库,使对象更节省空间,且访问速度仍然相当快。(Java和Scala)
  • 如果重新计算的速度比读磁盘的速度快,那么就不要持久化到磁盘。
  • 如果需要快速的故障恢复,请使用复制的存储级别(例如,如果使用Spark为来自web应用程序的请求提供服务)。通过重新计算丢失的数据,所有存储级别都提供了完全的容错能力,但是复制的存储级别允许您在RDD上继续运行任务,而不必等待重新计算丢失的分区。

Spark 自动监视每个节点上的缓存使用情况,并以最近最少使用(LRU)的方式删除旧的数据分区。 也可以使用RDD.unpersist()方法直接删除。

共享变量

顾名思义,就是在不同节点上操作同一个变量。Spark 提供了以下两种方式。

Broadcast

广播是在每台机器上缓存一个只读变量,不是将其副本与任务一起发送。

只有当跨多个阶段的任务需要相同的数据,或者以反序列化的形式缓存数据很重要时,显式地创建广播变量才有用。

Broadcast 变量是通过调用 SparkContext.broadcast(v) 从变量 v 创建的。broadcast 变量是 v 的包装器,它的值可以通过调用value 方法来访问。代码如下所示:

  1. scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
  2. broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
  3. scala> broadcastVar.value
  4. res0: Array[Int] = Array(1, 2, 3)

对象被执行广播操作之后不应该再修改,以确保所有节点得到广播变量的相同值。

Accumulators

可以通过调用SparkContext.longAccumulator()或SparkContext.doubleAccumulator()来分别累积Long或Double类型的值来创建数值累加器。然后,可以使用add方法将运行在集群上的任务添加到集群中。然而,他们无法读取它的值。只有 Driver 可以读取累加器的值,使用 value 方法。

下面的代码显示了一个累加器,用于将数组中的元素相加:

  1. scala> val accum = sc.longAccumulator("My Accumulator")
  2. accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 0, name: Some(My Accumulator), value: 0)
  3. scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum.add(x))
  4. scala> accum.value
  5. res2: Long = 10

也可以继承 AccumulatorV2 实现自定义计数器,需要重写以下方法:reset(将累加器重置为零)、add(将另一个值添加到累加器中)、merge(将另一个相同类型的累加器合并到这个累加器中)。

  1. class VectorAccumulatorV2 extends AccumulatorV2[MyVector, MyVector] {
  2. private val myVector: MyVector = MyVector.createZeroVector
  3. def reset(): Unit = {
  4. myVector.reset()
  5. }
  6. def add(v: MyVector): Unit = {
  7. myVector.add(v)
  8. }
  9. ...
  10. }
  11. // Then, create an Accumulator of this type:
  12. val myVectorAcc = new VectorAccumulatorV2
  13. // Then, register it into spark context:
  14. sc.register(myVectorAcc, "MyVectorAcc1")

自定义累加器结果类型可能与添加的元素的类型不同。

累加器的更新只存在于 action 算子,Spark 保证每个任务对累加器的更新只应用一次,即重启任务不会再次更新。但是在 Transformation 算子中重启任务每个任务的更新可能会是多次。

累加器的更新只在 rdd 的 action 算子中进行。因此不能保证在 map() 这样的延迟算子中执行累加器的更新。如下代码:

  1. val accum = sc.longAccumulator
  2. data.map { x => accum.add(x); x }
  3. // 至此,累加器的值还是0,因为没有action 算子开始执行计算

部署到集群

Spark Submitting Applications Guide

java和scala的启动方式

启动Spark应用程序的库。

这个库允许应用程序以编程方式启动Spark。这个库只有一个入口点——SparkLauncher类。可用于启动Spark并提供一个助手来监视和控制运行中的应用程序:

  1. import org.apache.spark.launcher.SparkAppHandle;
  2. import org.apache.spark.launcher.SparkLauncher;
  3. public class MyLauncher {
  4. public static void main(String[] args) throws Exception {
  5. SparkAppHandle handle = new SparkLauncher()
  6. .setAppResource("/my/app.jar")
  7. .setMainClass("my.spark.app.Main")
  8. .setMaster("local")
  9. .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
  10. .startApplication();
  11. // Use handle API to monitor / control application.
  12. }
  13. }

也可以使用SparkLauncher.launch()方法来启动原始的子进程:

  1. import org.apache.spark.launcher.SparkLauncher;
  2. public class MyLauncher {
  3. public static void main(String[] args) throws Exception {
  4. Process spark = new SparkLauncher()
  5. .setAppResource("/my/app.jar")
  6. .setMainClass("my.spark.app.Main")
  7. .setMaster("local")
  8. .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
  9. .launch();
  10. spark.waitFor();
  11. }
  12. }

此方法要求调用代码手动管理子进程,包括其输出流(以避免可能的死锁)。建议使用 SparkLauncher.startApplication( org.apache.spark.launcher.SparkAppHandle.Listener…)

原文地址:http://spark.apache.org/docs/2.1.1/api/java/index.html?org/apache/spark/launcher/package-summary.html

发表评论

表情:
评论列表 (有 0 条评论,138人围观)

还没有评论,来说两句吧...

相关阅读