Spark--sparkStreaming 左手的ㄟ右手 2022-05-17 07:10 259阅读 0赞 # # ## 概述 ## Spark Streaming是核心Spark API的扩展,可实现可扩展、高吞吐量、可容错的实时数据流处理。数据可以从诸如Kafka,Flume,Kinesis或TCP套接字等众多来源获取,并且可以使用由高级函数(如map,reduce,join和window)开发的复杂算法进行流数据处理。最后,处理后的数据可以被推送到文件系统,数据库和实时仪表板。而且,您还可以在数据流上应用Spark提供的机器学习和图处理算法。 ![70][] 在内部,它的工作原理如下。Spark Streaming接收实时输入数据流,并将数据切分成批,然后由Spark引擎对其进行处理,最后生成“批”形式的结果流。![70 1][] Spark Streaming将连续的数据流抽象为*discretizedstream*或*DStream*。 可以从诸如Kafka,Flume和Kinesis等来源的输入数据流中创建DStream,或者通过对其他DStream应用高级操作来创建。在内部,DStream 由一个RDD序列表示。 本指南介绍如何开始利用DStreams编写Spark Streaming程序。您可以在Scala,Java或Python中编写SparkStreaming程序(在Spark 1.2中引入),所有这些都在本指南中介绍。 您可以在本指南中找到标签,让您可以选择不同语言的代码段(译者注:本文内容仅翻译了Java部分,如果想学习其他语言接口,请参阅官网)。 ## A Quick Example ## 在我们详细介绍如何编写自己的Spark Streaming程序之前,让我们快速了解一下简单的Spark Streaming程序是什么样的。 假设我们想要计算从TCP套接字上侦听的数据服务器接收的文本数据中的字数。 您需要做的就是如下: 首先,我们将Spark Streaming相关的类和StreamingContext的一些隐式转换导入到我们的环境中,以便为我们需要的其他类(如DStream)添加有用的方法。**StreamingContext****是所有流功能的主要入口点**。我们创建一个带有两个执行线程(译者注:如果要执行本例,必须确保机器cpu核心大于2)的本地StreamingContext,并且设置流数据每批的间隔为1秒。 import org.apache.spark.SparkConf; import org.apache.spark.streaming.StreamingContext; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import java.util.Arrays; // Create a local StreamingContext with two working thread and batch interval of 1 second. // The master requires 2 cores to prevent from a starvation scenario. SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount"); JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1)); 使用此context,我们可以创建一个DStream,它表示来自特定主机名(例如localhost)和端口(例如9999)TCP源的流数据。 // Create a DStream that will connect to hostname:port, like localhost:9999 JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999); 在这行代码中,DStream表示从数据服务器接收的数据流。此DStream中的每个记录都是一行文本。接下来,我们要将每行文本以空格符为分隔符切分成一个个单词。 // Split each line into words JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator()); flatMap是一个一对多的DStream操作,该操作通过从源DStream中的每个记录生成多个新记录来创建新的DStream。在这种情况下,每一行将被分割成多个单词,并将单词流表示为单词DStream。接下来,我们对这些单词进行计数。 // Count each word in each batch JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1)); JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2); // Print the first ten elements of each RDD generated in this DStream to the console wordCounts.print(); 用PairFunction对象将单词DStream进一步映射(一对一变换)到(word,1) 键值对的DStream,然后用function2进行聚合以获得每批数据中的单词的频率。最后,wordCounts.print()将打印每秒产生的计数结果中的若干条记录。 请注意,当执行这些代码时,Spark Streaming仅是设置了预计算流程,目前为止这些计算还没有真正的开始执行。在设置好所有计算操作后,要开始真正的执行过程,我们最终需要调用如下方法: jssc.start(); // Start the computation jssc.awaitTermination(); // Wait for the computation to terminate ## Basic Concepts ## 要从Spark Streaming核心API中不存在的Kafka,Flume和Kinesis等源中提取数据,您必须将相应的工件spark-streaming-xyz\_2.11添加到依赖项中。 ### Initializing StreamingContext ### 要初始化Spark Streaming程序,必须创建一个StreamingContext对象,它是所有Spark Streaming功能的主要入口点。 SparkConf conf = new SparkConf().setAppName(appName).setMaster(master); JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000)); appName参数是应用程序在集群UI上显示的名称。 master是Spark,Mesos或YARN群集URL,或者是在本地模式下运行的特殊“local \[\*\]”字符串。 实际上,当在群集上运行时,您不希望在程序中对master进行硬编码,而是使用spark-submit启动应用程序并在那里接收它。 但是,对于本地测试和单元测试,您可以传递“local \[\*\]”以在进程中运行Spark Streaming。 请注意,这会在内部创建一个JavaSparkContext(所有Spark功能的起点),可以作为ssc.sparkContext访问。 必须根据应用程序的延迟要求和可用的群集资源设置批处理间隔。 有关更多详细信息,请参见性能调整部分。 还可以从现有JavaSparkContext创建JavaStreamingContext对象。 JavaSparkContext sc = ... //existing JavaSparkContext JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1)); 定义上下文后,您必须执行以下操作。 1. 通过创建输入DStreams来定义输入源。 2. 通过将转换和输出操作应用于DStream来定义流式计算。 3. 开始接收数据并使用streamingContext.start()处理它。 4. 等待使用streamingContext.awaitTermination()停止处理(手动或由于任何错误)。 5. 可以使用streamingContext.stop()手动停止处理。 要记住的要点: 1. 一旦启动了上下文,就不能设置或添加新的流式计算。 2. 上下文停止后,无法重新启动。 3. 在JVM中只能同时激活一个StreamingContext。 4. StreamingContext上的stop()也会停止SparkContext。要仅停止StreamingContext,请将名为stopSparkContext的stop()的可选参数设置为false。 5. 只要在创建下一个StreamingContext之前停止前一个StreamingContext(不停止SparkContext),就可以重复使用SparkContext来创建多个StreamingContexts。 ### Discretized Streams (DStreams) ### Discretized Stream或DStream是Spark Streaming提供的基本抽象。 它表示连续的数据流,可以是从源接收的输入数据流,也可以是通过转换输入流生成的已处理数据流。 在内部,DStream由一系列连续的RDD表示,这是Spark对不可变分布式数据集的抽象(有关更多详细信息,请参阅Spark编程指南)。 DStream中的每个RDD都包含来自特定时间间隔的数据,如下图所示。 ![70 2][] 应用于DStream的任何操作都转换为底层RDD上的操作。 例如,在先前将行流转换为单词的示例中,flatMap操作应用于行DStream中的每个RDD以生成单词DStream的RDD。 如下图所示。 ![70 3][] 这些底层RDD转换由Spark引擎计算。 DStream操作隐藏了大部分细节,并为开发人员提供了更高级别的API以方便使用。 这些操作将在后面的章节中详细讨论。 ### Input DStreams and Receivers ### 输入DStream是表示从流源接收的输入数据流的DStream。在快速示例中,lines是输入DStream,因为它表示从netcat服务器接收的数据流。每个输入DStream(文件流除外,本节稍后讨论)都与Receiver(Scala doc,Java doc)对象相关联,该对象从源接收数据并将其存储在Spark的内存中进行处理。 Spark Streaming提供**两类内置流媒体源**。 1. 基本来源:StreamingContext API中直接提供的源。示例:文件系统和套接字连接。 2. 高级资源:Kafka,Flume,Kinesis等资源可通过额外的实用程序类获得。这些需要链接额外的依赖关系,如链接部分所述。 我们将在本节后面讨论每个类别中的一些来源。 请注意,如果要在流应用程序中并行接收多个数据流,可以创建多个输入DStream(在“性能调整”部分中进一步讨论)。这将创建多个接收器,这些接收器将同时接收多个数据流。但请注意,Spark worker / executor是一个长期运行的任务,因此它占用了分配给Spark Streaming应用程序的其中一个核心。因此,重要的是要记住,Spark Streaming应用程序需要分配足够的内核(或线程,如果在本地运行)来处理接收的数据,以及运行接收器。 要记住的要点 * 在本地运行Spark Streaming程序时,请勿使用“local”或“local \[1\]”作为主URL。 这两种方法都意味着只有一个线程将用于本地运行任务。 如果您正在使用基于接收器的输入DStream(例如套接字,Kafka,Flume等),那么将使用单个线程来运行接收器,而不留下用于处理接收数据的线程。 因此,在本地运行时,始终使用“local \[n\]”作为主URL,其中n>要运行的接收器数量(有关如何设置主服务器的信息,请参阅Spark属性)。 * 将逻辑扩展到在集群上运行,分配给Spark Streaming应用程序的核心数必须大于接收器数。 否则系统将接收数据,但无法处理数据。 Basic Sources 我们已经在快速示例中查看了ssc.socketTextStream(...),它通过TCP套接字连接从文本数据创建DStream。 除了套接字之外,StreamingContext API还提供了从文件创建DStream作为输入源的方法。 File Streams 对于从与HDFS API兼容的任何文件系统(即HDFS,S3,NFS等)上的文件读取数据,可以通过StreamingContext.fileStream \[KeyClass,ValueClass,InputFormatClass\]创建DStream。 文件流不需要运行接收器,因此不需要分配任何内核来接收文件数据。 对于简单的文本文件,最简单的方法是StreamingContext.textFileStream(dataDirectory)。 streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory); streamingContext.textFileStream(dataDirectory); 如何监控目录 1. Spark Streaming将监视目录dataDirectory并处理在该目录中创建的所有文件。 2. 可以监视一个简单的目录,例如“hdfs:// namenode:8040 / logs /”。直接在这种路径下的所有文件将在发现时进行处理。 3. 可以提供POSIX glob模式,例如“hdfs:// namenode:8040 / logs / 2017 / \*”。这里,DStream将包含与模式匹配的目录中的所有文件。那就是:它是目录的模式,而不是目录中的文件。 4. 所有文件必须采用相同的数据格式。 5. 根据文件的修改时间而不是创建时间,文件被视为时间段的一部分。 6. 处理完毕后,对当前窗口中文件的更改不会导致重新读取文件。即:忽略更新。 7. 目录下的文件越多,扫描更改所需的时间就越长 - 即使没有修改过任何文件。 8. 如果使用通配符来标识目录,例如“hdfs:// namenode:8040 / logs / 2016- \*”,则重命名整个目录以匹配路径将把目录添加到受监视目录列表中。只有修改时间在当前窗口内的目录中的文件才会包含在流中。 9. 调用FileSystem.setTimes()来修复时间戳是一种在稍后的窗口中拾取文件的方法,即使其内容未更改。 使用对象存储作为数据源 “完整”文件系统(如HDFS)会在创建输出流后立即在其文件上设置修改时间。打开文件时,即使在完全写入数据之前,它也可能包含在DStream中 - 之后将忽略对同一窗口中文件的更新。即:可能会遗漏更改,并从流中省略数据。 要保证在窗口中选择更改,请将文件写入不受监视的目录,然后在关闭输出流后立即将其重命名为目标目录。如果重命名的文件在其创建窗口期间出现在扫描的目标目录中,则将拾取新数据。 相比之下,Amazon S3和Azure Storage等对象存储通常具有较慢的重命名操作,因为实际上是复制了数据。此外,重命名的对象可能具有rename()操作的时间作为其修改时间,因此可能不被视为原始创建时间所暗示的窗口的一部分。 需要对目标对象存储进行仔细测试,以验证存储的时间戳行为是否与Spark Streaming所期望的一致。可能是直接写入目标目录是通过所选对象库流式传输数据的适当策略。 有关此主题的更多详细信息,请参阅Hadoop文件系统规范。 基于自定义接收器的流 可以使用通过自定义接收器接收的数据流创建DStream。 有关详细信息,请参阅自定义接收器指南。 RDD作为流的队列 为了使用测试数据测试Spark Streaming应用程序,还可以使用streamingContext.queueStream(queueOfRDDs)基于RDD队列创建DStream。 推入队列的每个RDD将被视为DStream中的一批数据,并像流一样处理。 有关来自套接字和文件的流的更多详细信息,请参阅StrealaContext for Scala,JavaStreamingContext for Java和StreamingContext for Python中相关函数的API文档。 Advanced Sources 此类源需要与外部非Spark库连接,其中一些具有复杂的依赖性(例如,Kafka和Flume)。因此,为了最大限度地减少与依赖项版本冲突相关的问题,从这些源创建DStream的功能已移至可在必要时显式链接的单独库。 请注意,Spark shell中不提供这些高级源,因此无法在shell中测试基于这些高级源的应用程序。如果您真的想在Spark shell中使用它们,则必须下载相应的Maven工件JAR及其依赖项,并将其添加到类路径中。 其中一些高级资源如下。 Kafka:Spark Streaming 2.3.1与Kafka经纪人版本0.8.2.1或更高版本兼容。有关更多详细信息,请参阅Kafka集成指南。 Flume:Spark Streaming 2.3.1与Flume 1.6.0兼容。有关详细信息,请参阅Flume集成指南。 Kinesis:Spark Streaming 2.3.1与Kinesis Client Library 1.2.1兼容。有关详细信息,请参阅Kinesis集成指南。 Custom Sources 输入DStream也可以从自定义数据源创建。 您所要做的就是实现一个用户定义的接收器(参见下一节以了解它是什么),它可以从自定义源接收数据并将其推送到Spark。 有关详细信息,请参阅自定义接收器指南 Receiver Reliability 根据其可靠性,可以有两种数据源。 来源(如Kafka和Flume)允许传输数据得到确认。 如果从这些可靠来源接收数据的系统正确地确认接收到的数据,则可以确保不会因任何类型的故障而丢失数据。 这导致两种接收器: 1. 可靠的接收器 - 可靠的接收器在接收到数据并通过复制存储在Spark中时正确地向可靠的源发送确认。 2. 不可靠的接收器 - 不可靠的接收器不会向源发送确认。 这可以用于不支持确认的源,甚至可以用于不需要或需要进入确认复杂性的可靠源。 “自定义接收器指南”中讨论了如何编写可靠接收器的详细信息。 ### Transformations on Dstreams ### 与RDD类似,转换允许修改来自输入DStream的数据。 DStreams支持普通Spark RDD上可用的许多转换。 一些常见的如下。 **map**(func)通过函数func传递源DStream的每个元素来返回一个新的DStream。 **flatMap**(func)与map类似,但每个输入项可以映射到0个或更多输出项。 **filter**(func)通过仅选择func返回true的源DStream的记录来返回新的DStream。 **repartition****(numPartitions****)**通过创建更多或更少的分区来更改此DStream中的并行度级别。 **union****(otherStream****)**返回一个新的DStream,它包含源DStream和otherDStream中元素的并集。 **count****()**通过计算源DStream的每个RDD中的元素数量,返回单元素RDD的新DStream。 **reduce****(func****)**通过使用函数func(它接受两个参数并返回一个)聚合源DStream的每个RDD中的元素,返回单元素RDD的新DStream。该函数应该是关联的和可交换的,以便可以并行计算。 **countByValue****()**当在类型为K的元素的DStream上调用时,返回(K,Long)对的新DStream,其中每个键的值是其在源DStream的每个RDD中的频率。 **reduceByKey****(func****,\[numTasks\]****)**在(K,V)对的DStream上调用时,返回(K,V)对的新DStream,其中使用给定的reduce函数聚合每个键的值。注意:默认情况下,这使用Spark的默认并行任务数(2表示本地模式,在集群模式下,数字由配置属性spark.default.parallelism确定)进行分组。您可以传递可选的numTasks参数来设置不同数量的任务。 **join****(otherStream****,\[numTasks\]****)**当调用(K,V)和(K,W)对的两个DStream时,返回一个新的DStream(K,(V,W))对,每个键的所有元素对。 **cogroup****(otherStream****,\[numTasks\]****)**在(K,V)和(K,W)对的DStream上调用时,返回(K,Seq \[V\],Seq \[W\])元组的新DStream。 **transform**(func)通过将RDD-to-RDD函数应用于源DStream的每个RDD来返回新的DStream。这可以用于在DStream上执行任意RDD操作。 **updateStateByKey**(func)返回一个新的“状态”DStream,其中通过在键的先前状态和键的新值上应用给定函数来更新每个键的状态。这可用于维护每个密钥的任意状态数据。 UpdateStateByKey Operation updateStateByKey操作允许您在使用新信息不断更新时保持任意状态。 要使用它,您必须执行两个步骤。 定义状态 - 状态可以是任意数据类型。 定义状态更新功能 - 使用函数指定如何使用先前状态和输入流中的新值更新状态。 在每个批处理中,Spark都会对所有现有密钥应用状态更新功能,无论它们是否在批处理中都有新数据。 如果更新函数返回None,则将删除键值对。 让我们举一个例子来说明这一点。 假设您要维护文本数据流中看到的每个单词的运行计数。 这里,运行计数是状态,它是一个整数。 我们将更新函数定义为: Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction = (values, state) -> { Integer newSum = ... // add the new values with the previous running count to get the new count return Optional.of(newSum); }; 这适用于包含单词的DStream(例如,快速示例中包含(word,1)对的DStream对)。 JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction); 将为每个单词调用更新函数,newValues的序列为1(来自(word,1)对),runningCount具有前一个计数。 有关完整的Java代码,请查看示例JavaStatefulNetworkWordCount.java。 请注意,使用updateStateByKey需要配置检查点目录,这将在检查点部分中详细讨论。 Transform Operation 变换操作(以及像transformWith这样的变体)允许在DStream上应用任意RDD到RDD功能。 它可用于应用未在DStream API中公开的任何RDD操作。 例如,将数据流中的每个批次与另一个数据集连接的功能不会直接在DStream API中公开。 但是,您可以轻松地使用transform来执行此操作。 这使得非常强大的可能性。 例如,可以通过将输入数据流与预先计算的垃圾邮件信息(也可以使用Spark生成)连接,然后根据它进行过滤来进行实时数据清理。 // RDD containing spam information JavaPairRDD<String, Double> spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...); JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(rdd -> { rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning ... }); 请注意,在每个批处理间隔中都会调用提供的函数。 这允许您进行时变RDD操作,即RDD操作,分区数,广播变量等可以在批次之间进行更改。 Window Operations Spark Streaming还提供窗口计算,允许您在滑动数据窗口上应用转换。 下图说明了此滑动窗口。 如图所示,每当窗口在源DStream上滑动时,落入窗口内的源RDD被组合并操作以产生窗口化DStream的RDD。在这种特定情况下,操作应用于最后3个时间单位的数据,并按2个时间单位滑动。这表明任何窗口操作都需要指定两个参数。 1. 窗口长度 - 窗口的持续时间(图中的3)。 2. 滑动间隔 - 执行窗口操作的间隔(图中的2)。 这两个参数必须是源DStream的批处理间隔的倍数(图中的1)。 让我们举一个例子来说明窗口操作。比如说,您希望通过每隔10秒在最后30秒的数据中生成字数来扩展前面的示例。为此,我们必须在最后30秒的数据上对(word,1)对的DStream对应用reduceByKey操作。这是使用**reduceByKeyAndWindow**操作完成的。 ***JavaPairDStream***<***String***, ***Integer***> **windowedWordCounts** = pairs.reduceByKeyAndWindow((*i1*, *i2*) \-> *i1* \+ *i2*, ***Durations***.seconds(30), ***Durations***.seconds(10)); 一些常见的窗口操作如下。 所有这些操作都采用上述两个参数 - **windowLength**和**slideInterval**。 1. **window****(windowLength****,slideInterval****)**返回一个新的DStream,它是根据源DStream的窗口批次计算的。 2. **countByWindow****(windowLength****,slideInterval****)**返回流中元素的滑动窗口数。 3. **reduceByWindow****(func****,windowLength****,slideInterval****)**返回一个新的单元素流,它是通过使用func在滑动间隔内聚合流中的元素而创建的。该函数应该是关联的和可交换的,以便可以并行正确计算。 4. **reduceByKeyAndWindow****(func****,windowLength****,slideInterval****,\[numTasks\]****)**在(K,V)对的DStream上调用时,返回(K,V)对的新DStream,其中使用给定的reduce函数func聚合每个键的值在滑动窗口中批量生产。注意:默认情况下,这使用Spark的默认并行任务数(2表示本地模式,在集群模式下,数字由配置属性spark.default.parallelism确定)进行分组。您可以传递可选的numTasks参数来设置不同数量的任务。 5. **reduceByKeyAndWindow****(func****,invFunc****,windowLength****,slideInterval****,\[numTasks\]****)** 上述reduceByKeyAndWindow()的更高效版本,其中每个窗口的reduce值使用前一个窗口的reduce值逐步计算。这是通过减少进入滑动窗口的新数据,并“反向减少”离开窗口的旧数据来完成的。一个例子是当窗口滑动时“添加”和“减去”键的计数。但是,它仅适用于“可逆减少函数”,即那些具有相应“反向减少”函数的减函数(作为参数invFunc)。与reduceByKeyAndWindow类似,reduce任务的数量可通过可选参数进行配置。请注意,必须启用检查点才能使用此操作。 1. **countByValueAndWindow****(windowLength****,slideInterval****,\[numTasks\]****)**当在(K,V)对的DStream上调用时,返回(K,Long)对的新DStream,其中每个键的值是其在滑动窗口内的频率。与reduceByKeyAndWindow类似,reduce任务的数量可通过可选参数进行配置。 Join Operations Stream-stream joins ***JavaPairDStream***<***String***, ***String***> **stream1** = ... ***JavaPairDStream***<***String***, ***String***> **stream2** = ... ***JavaPairDStream***<***String***, ***Tuple2***<***String***, ***String***>> **joinedStream** = **stream1**.join(**stream2**); 这里,在每个批处理间隔中,stream1生成的RDD将与stream2生成的RDD连接。 你也可以做leftOuterJoin,rightOuterJoin,fullOuterJoin。 此外,在流的窗口上进行连接通常非常有用。 这也很容易。 ***JavaPairDStream***<***String***, ***String***> **windowedStream1** = **stream1**.window(***Durations***.seconds(20)); ***JavaPairDStream***<***String***, ***String***> **windowedStream2** = **stream2**.window(***Durations***.minutes(1)); ***JavaPairDStream***<***String***, ***Tuple2***<***String***, ***String***>> **joinedStream** = **windowedStream1**.join(**windowedStream2**); Stream-dataset joins ***JavaPairRDD***<***String***, ***String***> **dataset** = ... ***JavaPairDStream***<***String***, ***String***> **windowedStream** = stream.window(***Durations***.seconds(20)); ***JavaPairDStream***<***String***, ***String***> **joinedStream** = **windowedStream**.transform(*rdd* \-> *rdd*.join(**dataset**)); 实际上,您还可以动态更改要加入的数据集。 每个批处理间隔都会评估为转换提供的函数,因此将使用数据集引用所指向的当前数据集。 API文档中提供了完整的DStream转换列表。 对于Scala API,请参阅DStream和PairDStreamFunctions。 对于Java API,请参阅JavaDStream和JavaPairDStream。 对于Python API,请参阅DStream。 ### Output Operations on Dstreams ### 输出操作允许将DStream的数据推送到外部系统,如数据库或文件系统。 由于输出操作实际上允许外部系统使用转换后的数据,因此它们会触发所有DStream转换的实际执行(类似于RDD的操作)。 目前,定义了以下输出操作: **print****()**在运行流应用程序的驱动程序节点上打印DStream中每批数据的前十个元素。这对开发和调试很有用。 Python API这在Python API中称为pprint()。 **saveAsTextFiles****(prefix****,\[suffix\]****)**将此DStream的内容保存为文本文件。每个批处理间隔的文件名基于前缀和后缀生成:“prefix-TIME\_IN\_MS \[.suffix\]”。 **saveAsObjectFiles****(prefix****,\[suffix\]****)**将此DStream的内容保存为序列化Java对象的SequenceFiles。每个批处理间隔的文件名基于前缀和后缀生成:“prefix-TIME\_IN\_MS \[.suffix\]”。 Python API这在Python API中不可用。 **saveAsHadoopFiles****(prefix****,\[suffix\]****)**将此DStream的内容保存为Hadoop文件。每个批处理间隔的文件名基于前缀和后缀生成:“prefix-TIME\_IN\_MS \[.suffix\]”。 Python API这在Python API中不可用。 **foreachRDD****(func****)**最通用的输出运算符,它将函数func应用于从流生成的每个RDD。此函数应将每个RDD中的数据推送到外部系统,例如将RDD保存到文件,或通过网络将其写入数据库。请注意,函数func在运行流应用程序的驱动程序进程中执行,并且通常会在其中执行RDD操作,这将强制计算流式RDD。 Design Patterns for using foreachRDD dstream.foreachRDD是一个功能强大的原语,允许将数据发送到外部系统。 但是,了解如何正确有效地使用此原语非常重要。 一些常见的错误要避免如下。 通常将数据写入外部系统需要创建连接对象(例如,到远程服务器的TCP连接)并使用它将数据发送到远程系统。 为此,开发人员可能无意中尝试在Spark驱动程序中创建连接对象,然后尝试在Spark工作程序中使用它来保存RDD中的记录。 例如(在Scala中), dstream.foreachRDD(rdd -> \{ Connection connection = createNewConnection(); // executed at the driver rdd.foreach(*record* \-> \{ connection.send(*record*); // executed at the worker \}); \}); 这是不正确的,因为这需要连接对象被序列化并从驱动程序发送到worker。 这种连接对象很少可以跨机器转移。 此错误可能表现为序列化错误(连接对象不可序列化),初始化错误(需要在worker处初始化连接对象)等。正确的解决方案是在worker处创建连接对象。 但是,这可能会导致另一个常见错误 - 为每条记录创建一个新连接。 例如, dstream.foreachRDD(rdd -> \{ rdd.foreach(*record* \-> \{ Connection connection = createNewConnection(); connection.send(*record*); connection.close(); \}); \}); 通常,创建连接对象会产生时间和资源开销。 因此,为每个记录创建和销毁连接对象可能会产生不必要的高开销,并且可能显着降低系统的总吞吐量。 更好的解决方案是使用rdd.foreachPartition - 创建单个连接对象并使用该连接发送RDD分区中的所有记录。 dstream.foreachRDD(rdd -> \{ rdd.foreachPartition(*partitionOfRecords* \-> \{ Connection connection = createNewConnection(); while (*partitionOfRecords*.hasNext()) \{ connection.send(*partitionOfRecords*.next()); \} connection.close(); \}); \}); 这会在许多记录上分摊连接创建开销。 最后,通过跨多个RDD /批处理重用连接对象,可以进一步优化这一点。 由于多个批次的RDD被推送到外部系统,因此可以维护连接对象的静态池,而不是可以重用的连接对象,从而进一步减少了开销。 dstream.foreachRDD(rdd -> \{ rdd.foreachPartition(*partitionOfRecords* \-> \{ // ConnectionPool is a static, lazily initialized pool of connections Connection connection = ***ConnectionPool***.getConnection(); while (*partitionOfRecords*.hasNext()) \{ connection.send(*partitionOfRecords*.next()); \} ***ConnectionPool***.returnConnection(connection); // return to the pool for future reuse \}); \}); 请注意,池中的连接应根据需要延迟创建,如果暂时不使用,则会超时。 这实现了最有效的数据发送到外部系统。 其他要记住的要点: 1. DStreams由输出操作延迟执行,就像RDD由RDD操作延迟执行一样。 具体而言,DStream输出操作中的RDD操作会强制处理接收到的数据。 因此,如果您的应用程序没有任何输出操作,或者具有dstream.foreachRDD()之类的输出操作而其中没有任何RDD操作,则不会执行任何操作。 系统将简单地接收数据并将其丢弃。 1. 默认情况下,输出操作一次执行一次。 它们按照应用程序中定义的顺序执行。 ### DataFrame and SQL Operations ### 您可以轻松地对流数据使用DataFrames和SQL操作。 您必须使用StreamingContext正在使用的SparkContext创建SparkSession。 此外,必须这样做以便可以在驱动器故障时重新启动。 这是通过创建一个延迟实例化的SparkSession单例实例来完成的。 这在以下示例中显示。 它修改了早期的单词计数示例,以使用DataFrames和SQL生成单词计数。 每个RDD都转换为DataFrame,注册为临时表,然后使用SQL进行查询。 public final class JavaSqlNetworkWordCount \{ private static final Pattern SPACE = Pattern.compile(" "); public static void main(String\[\] args) throws Exception \{ if (args.length < 2) \{ System.err.println("Usage: JavaNetworkWordCount <hostname> <port>"); System.exit(1); \} StreamingExamples.setStreamingLogLevels(); // Create the context with a 1 second batch size SparkConf sparkConf = new SparkConf().setAppName("JavaSqlNetworkWordCount"); JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1)); // Create a JavaReceiverInputDStream on target ip:port and count the // words in input stream of \\n delimited text (eg. generated by 'nc') // Note that no duplication in storage level only for running locally. // Replication necessary in distributed scenario for fault tolerance. JavaReceiverInputDStream<String> lines = ssc.socketTextStream( args\[0\], Integer.parseInt(args\[1\]), StorageLevels.MEMORY\_AND\_DISK\_SER); JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x)).iterator()); // Convert RDDs of the words DStream to DataFrame and run SQL query words.foreachRDD((rdd, time) \-> \{ SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf()); // Convert JavaRDD\[String\] to JavaRDD\[bean class\] to DataFrame JavaRDD<JavaRecord> rowRDD = rdd.map(word -> \{ JavaRecord record = new JavaRecord(); record.setWord(word); return record; \}); Dataset<Row> wordsDataFrame = spark.createDataFrame(rowRDD, JavaRecord.class); // Creates a temporary view using the DataFrame wordsDataFrame.createOrReplaceTempView("words"); // Do word count on table using SQL and print it Dataset<Row> wordCountsDataFrame = spark.sql("select word, count(\*) as total from words group by word"); System.out.println("========= " \+ time \+ "========="); wordCountsDataFrame.show(); \}); ssc.start(); ssc.awaitTermination(); \} \} /\*\* Lazily instantiated singleton instance of SparkSession \*/ class JavaSparkSessionSingleton \{ private static transient SparkSession instance = null; public static SparkSession getInstance(SparkConf sparkConf) \{ if (instance == null) \{ instance = SparkSession .builder() .config(sparkConf) .getOrCreate(); \} return instance; \} \} 您还可以对从不同线程(即,与正在运行的StreamingContext异步)的流数据上定义的表运行SQL查询。 只需确保将StreamingContext设置为记住足够数量的流数据,以便查询可以运行。 否则,不知道任何异步SQL查询的StreamingContext将在查询完成之前删除旧的流数据。 例如,如果要查询最后一批,但查询可能需要5分钟才能运行,则调用streamingContext.remember(Minutes(5))(在Scala中,或在其他语言中等效)。 有关DataFrame的详细信息,请参阅DataFrames and SQL指南。 ### MLlib Operations ### 您还可以轻松使用MLlib提供的机器学习算法。 首先,存在流机器学习算法(例如,流线性回归,流KMeans等),其可以同时从流数据中学习以及将模型应用于流数据。 除此之外,对于更大类的机器学习算法,您可以离线学习学习模型(即使用历史数据),然后在线将数据应用于流数据。 有关详细信息,请参阅MLlib指南。 ### Caching / Persistence ### 与RDD类似,DStreams还允许开发人员将流的数据保存在内存中。也就是说,在DStream上使用persist()方法会自动将该DStream的每个RDD保存在内存中。如果DStream中的数据将被多次计算(例如,对相同数据的多个操作),则这是有用的。对于像reduceByWindow和reduceByKeyAndWindow这样的基于窗口的操作以及像updateStateByKey这样的基于状态的操作,这是隐含的。因此,基于窗口的操作生成的DStream会自动保留在内存中,而开发人员不会调用persist()。 对于通过网络接收数据的输入流(例如,Kafka,Flume,套接字等),默认持久性级别设置为将数据复制到两个节点以实现容错。 请注意,与RDD不同,DStreams的默认持久性级别使数据在内存中保持序列化。 “Performance Tuning”部分对此进行了进一步讨论。有关不同持久性级别的更多信息,请参阅“Spark programming Guide”。 ### Checkpointing ### 流应用程序必须全天候运行,因此必须能够适应与应用程序逻辑无关的故障(例如,系统故障,JVM崩溃等)。为了实现这一点,Spark Streaming需要将足够的信息检查到容错存储系统,以便它可以从故障中恢复。检查点有两种类型的数据。 1. 元数据检查点 - 将定义流式计算的信息保存到容错存储(如HDFS)。这用于从运行流应用程序的驱动程序的节点的故障中恢复(稍后详细讨论)。元数据包括: * 配置 - 用于创建流应用程序的配置。 * DStream操作 - 定义流应用程序的DStream操作集。 * 不完整的批次 - 其工作排队但尚未完成的批次。 1. 数据检查点 - 将生成的RDD保存到可靠的存储。在一些跨多个批次组合数据的有状态转换中,这是必需的。在这种转换中,生成的RDD依赖于先前批次的RDD,这导致依赖链的长度随时间增加。为了避免恢复时间的这种无限增加(与依赖链成比例),有状态变换的中间RDD周期性地检查点到可靠存储(例如HDFS)以切断依赖链。 总而言之,元数据检查点主要用于从驱动程序故障中恢复,而如果使用状态转换,即使对于基本功能也需要数据或RDD检查点。 When to enable Checkpointing 必须为具有以下任何要求的应用程序启用检查点: 1. 有状态转换的用法 - 如果在应用程序中使用updateStateByKey或reduceByKeyAndWindow(具有反函数),则必须提供检查点目录以允许定期RDD检查点。 2. 从运行应用程序的驱动程序的故障中恢复 - 元数据检查点用于使用进度信息进行恢复。 请注意,可以在不启用检查点的情况下运行没有上述有状态转换的简单流应用程序。 在这种情况下,驱动程序故障的恢复也将是部分的(某些已接收但未处理的数据可能会丢失)。 这通常是可以接受的,并且许多以这种方式运行Spark Streaming应用程序。 预计对非Hadoop环境的支持将在未来得到改善。 How to configure Checkpointing 可以通过在容错,可靠的文件系统(例如,HDFS,S3等)中设置目录来启用检查点,其中将保存检查点信息。 这是通过使用**streamingContext.checkpoint****(checkpointDirectory****)**完成的。 这将允许您使用上述有状态转换。 此外,如果要使应用程序从驱动程序故障中恢复,则应重写流应用程序以使其具有以下行为。 当程序第一次启动时,它将创建一个新的StreamingContext,设置所有流然后调用start()。 在失败后重新启动程序时,它将从检查点目录中的检查点数据重新创建StreamingContext。 //This behavior is made simple by using JavaStreamingContext.getOrCreate. This is used as follows. // Create a factory object that can create and setup a new JavaStreamingContext JavaStreamingContextFactory **contextFactory** = new JavaStreamingContextFactory() \{ @Override public ***JavaStreamingContext create***() \{ ***JavaStreamingContext*** jssc = new JavaStreamingContext(...); // new context ***JavaDStream***<***String***> lines = jssc.socketTextStream(...); // create DStreams ... jssc.checkpoint(checkpointDirectory); // set checkpoint directory return jssc; \} \}; // Get JavaStreamingContext from checkpoint data or create a new one ***JavaStreamingContext*** **context** = ***JavaStreamingContext***.getOrCreate(checkpointDirectory, **contextFactory**); // Do additional setup on context that needs to be done, // irrespective of whether it is being started or restarted **context**. ... // Start the context **context**.start(); **context**.awaitTermination(); 如果checkpointDirectory存在,则将从检查点数据重新创建上下文。如果目录不存在(即第一次运行),则将调用函数contextFactory以创建新上下文并设置DStream。请参阅Java示例**JavaRecoverableNetworkWordCount**。此示例将网络数据的字数附加到文件中。 除了使用getOrCreate之外,还需要确保驱动程序进程在失败时自动重新启动。这只能通过用于运行应用程序的部署基础结构来完成。这在“部署Deployment”部分中进一步讨论。 请注意,**RDD****的检查点会导致节省可靠存储的成本**。这可能导致RDD被检查点的那些批次的处理时间增加。因此,需要仔细设置检查点的间隔。在小批量(例如1秒)下,每批次检查点可能会显着降低操作吞吐量。相反,检查点过于频繁会导致谱系和任务大小增加,这可能会产生不利影响。**对于需要****RDD****检查点的有状态转换,默认时间间隔是批处理间隔的倍数,至少为10****秒。可以使用dstream.checkpoint****(checkpointInterval****)进行设置。通常,DStream****的5-10****个滑动间隔的检查点间隔是一个很好的设置**。 ### Accumulators, Broadcast Variables, and Checkpoints ### 无法从Spark Streaming中的**检查点**恢复**累加器**和**广播变量**。 如果启用了检查点并使用累加器或广播变量,则必须为**累加器和广播变量创建延迟实例化的单例实例**,以便在驱动程序重新启动失败后重新实例化它们。 这在以下示例中显示。 **参照例子:JavaRecoverableNetworkWordCount****。** ### Deploying Applications ### 本节讨论部署Spark Streaming应用程序的步骤。 Requirements 要运行Spark Streaming应用程序,您需要具备以下条件。 1. 具有集群管理器的集群 - 这是任何Spark应用程序的一般要求,并在部署指南中进行了详细讨论。 2. 打包应用程序JAR - 您必须将流应用程序编译为JAR。如果您使用spark-submit来启动应用程序,那么您将不需要在JAR中提供Spark和Spark Streaming。但是,如果您的应用程序使用高级源(例如Kafka,Flume),那么您将不得不将它们链接的额外工件及其依赖项打包在用于部署应用程序的JAR中。例如,使用KafkaUtils的应用程序必须在应用程序JAR中包含spark-streaming-kafka-0-10\_2.11及其所有传递依赖项。 3. 为执行程序配置足够的内存 - 由于接收的数据必须存储在内存中,因此必须为执行程序配置足够的内存来保存接收的数据。请注意,如果您正在进行10分钟的窗口操作,则系统必须至少将最后10分钟的数据保留在内存中。因此,应用程序的内存要求取决于其中使用的操作。 4. 配置检查点 - 如果流应用程序需要它,然后在Hadoop的API兼容的容错存储(如HDFS,S3等)的目录必须配置为检查点目录和书面的方式流媒体应用程序,检查点信息用于故障恢复。有关详细信息,请参阅检查点部分。 5. 配置应用程序驱动程序的自动重新启动 - 要自动从驱动程序故障中恢复,用于运行流应用程序的部署基础结构必须监视驱动程序进程并在驱动程序失败时重新启动驱动程序。不同的集群管理器有不同的工具来实现这一点 * Spark Standalone - 可以提交Spark应用程序驱动程序以在Spark Standalone集群中运行(请参阅集群部署模式),即应用程序驱动程序本身在其中一个工作节点上运行。此外,可以指示独立集群管理器监督驱动程序,如果驱动程序由于非零退出代码或由于运行驱动程序的节点故障而失败,则重新启动它。有关详细信息,请参阅Spark Standalone指南中的集群模式和监督。 * YARN - Yarn支持类似的机制来自动重启应用程序。有关更多详细信息,请参阅YARN文档。 * Mesos - Marathon已被用于与Mesos实现这一目标。 1. 配置预写日志 - 自Spark 1.2起,我们引入了预写日志以实现强大的容错保证。如果启用,则从接收器接收的所有数据都将写入配置检查点目录中的预写日志。这可以防止驱动程序恢复时的数据丢失,从而确保零数据丢失(在容错语义部分中详细讨论)。可以通过将配置参数spark.streaming.receiver.writeAheadLog.enable设置为true来启用此功能。然而,这些更强的语义可能以单个接收器的接收吞吐量为代价。这可以通过并行运行更多接收器来更正聚合吞吐量来纠正。此外,建议在启用预写日志时禁用Spark中接收数据的复制,因为日志已存储在复制存储系统中。这可以通过将输入流的存储级别设置为StorageLevel.MEMORY\_AND\_DISK\_SER来完成。使用S3(或任何不支持刷新的文件系统)进行预写日志时,请记得启用spark.streaming.driver.writeAheadLog.closeFileAfterWrite和spark.streaming.receiver.writeAheadLog.closeFileAfterWrite。有关详细信息,请参阅Spark Streaming配置。请注意,启用I / O加密时,Spark不会加密写入预写日志的数据。如果需要加密预写日志数据,则应将其存储在本机支持加密的文件系统中。 1. 设置最大接收速率 - 如果群集资源不足以使流应用程序以接收数据的速度处理数据,则可以通过设置记录/秒的最大速率限制来限制接收器。请参阅接收器的配置参数spark.streaming.receiver.maxRate和直接Kafka方法的spark.streaming.kafka.maxRatePerPartition。在Spark 1.5中,我们引入了一项称为背压的功能,无需设置此速率限制,因为Spark Streaming会自动计算出速率限制,并在处理条件发生变化时动态调整它们。可以通过将配置参数spark.streaming.backpressure.enabled设置为true来启用此背压。 Upgrading Application Code升级应用程序代码 如果需要使用新的应用程序代码升级正在运行的Spark Streaming应用程序,则有两种可能: 1. 升级的Spark Streaming应用程序启动并与现有应用程序并行运行。一旦新的(接收与旧的数据相同的数据)已经预热并准备好黄金时间,旧的可以被放下。请注意,这可以用于支持将数据发送到两个目的地(即早期和升级的应用程序)的数据源。 1. 正常关闭现有应用程序(请参阅StreamingContext.stop(...)或JavaStreamingContext.stop(...)以获得正常关闭选项),以确保在关闭之前完全处理已接收的数据。然后可以启动升级的应用程序,该应用程序将从早期应用程序停止的同一点开始处理。请注意,这只能通过支持源端缓冲的输入源(如Kafka和Flume)来完成,因为在前一个应用程序关闭且升级的应用程序尚未启动时需要缓冲数据。并且无法从早期检查点重新启动升级前代码的信息。检查点信息基本上包含序列化的Scala / Java / Python对象,并尝试使用新的修改类反序列化对象可能会导致错误。在这种情况下,要么使用不同的检查点目录启动升级的应用程序,要么删除以前的检查点目录。 ### Monitoring Applications ### 除了Spark的监控功能外,还有一些特定于Spark Streaming的功能。使用StreamingContext时,Spark Web UI会显示另一个Streaming选项卡,其中显示有关运行接收器的统计信息(接收器是否处于活动状态,接收的记录数,接收器错误等)和已完成的批处理(批处理时间,排队延迟等) )。这可用于监视流应用程序的进度。 Web UI中的以下两个指标尤为重要: 1. 处理时间 - 处理每批数据的时间。 2. 计划延迟 - 批处理在队列中等待处理先前批处理完成的时间。 如果批处理时间始终大于批处理间隔和/或排队延迟不断增加,则表明系统无法以最快的速度处理批处理并且落后。在这种情况下,请考虑减少批处理时间。 还可以使用StreamingListener接口监视Spark Streaming程序的进度,该接口允许您获取接收器状态和处理时间。请注意,这是一个开发人员API,未来可能会对其进行改进(即报告更多信息)。 ## Performance Tuning ## 从群集上的Spark Streaming应用程序中获得最佳性能需要进行一些调整。 本节介绍了许多可以调整以提高应用程序性能的参数和配置。 在高层次上,您需要考虑两件事: 1. 通过有效使用群集资源减少每批数据的处理时间。 1. 设置正确的批量大小,以便可以像接收到的那样快速处理批量数据(即,数据处理与数据摄取保持同步)。 ### Reducing the Batch Processing Times ### 可以在Spark中进行许多优化,以最大限度地缩短每个批处理的处理时间。 这些已在“Tuning Guide”中详细讨论过。 本节重点介绍一些最重要的内容。 Level of Parallelism in Data Receiving 通过网络接收数据(如Kafka,Flume,socket等)需要将数据反序列化并存储在Spark中。如果数据接收成为系统中的瓶颈,则考虑并行化数据接收。请注意,每个输入DStream都会创建一个接收单个数据流的接收器(在工作机器上运行)。因此,可以通过创建多个输入DStream并将它们配置为从源接收数据流的不同分区来实现接收多个数据流。例如,接收两个数据主题的单个Kafka输入DStream可以分成两个Kafka输入流,每个输入流只接收一个主题。这将运行两个接收器,允许并行接收数据,从而提高整体吞吐量。这些多个DStream可以组合在一起以创建单个DStream。然后,可以在统一流上应用在单个输入DStream上应用的转换。这样做如下。 int **numStreams** = 5; List<***JavaPairDStream***<***String***, ***String***>> **kafkaStreams** = new ArrayList<>(**numStreams**); for (int **i** = 0; i < **numStreams**; **i**\++) \{ **kafkaStreams**.add(***KafkaUtils***.createStream(...)); \} ***JavaPairDStream***<***String***, ***String***> **unifiedStream** = streamingContext.union(**kafkaStreams**.get(0), **kafkaStreams**.subList(1, **kafkaStreams**.size())); **unifiedStream**.print(); 应考虑的另一个参数是接收器的块间隔,它由配置参数spark.streaming.blockInterval确定。对于大多数接收器,接收的数据在存储在Spark的内存中之前合并为数据块。每个批次中的块数决定了在类似地图的转换中用于处理接收数据的任务数。每批每个接收器的任务数量大约是(批处理间隔/块间隔)。例如,200 ms的块间隔将每2秒批次创建10个任务。如果任务数量太少(即,少于每台机器的核心数),那么效率将会很低,因为所有可用的核心都不会用于处理数据。要增加给定批处理间隔的任务数,请减少块间隔。但是,块间隔的建议最小值约为50 ms,低于该值时,任务启动开销可能会成为问题。 使用多个输入流/接收器接收数据的替代方法是显式地重新分区输入数据流(使用inputStream.repartition(<分区数>))。这会在进一步处理之前将收到的批量数据分布到群集中指定数量的计算机上。 Level of Parallelism in Data Processing 如果在计算的任何阶段中使用的并行任务的数量不够高,则群集资源可能未被充分利用。 例如,对于reduceByKey和reduceByKeyAndWindow等分布式reduce操作,默认的并行任务数由spark.default.parallelism配置属性控制。 您可以将并行级别作为参数传递(请参阅PairDStreamFunctions文档),或者设置spark.default.parallelism配置属性以更改默认值。 Data Serialization 通过调整序列化格式可以减少数据序列化的开销。在流式传输的情况下,有两种类型的数据被序列化。 1. 输入数据:默认情况下,通过Receiver接收的输入数据通过StorageLevel.MEMORY\_AND\_DISK\_SER\_2存储在执行程序的内存中。也就是说,数据被序列化为字节以减少GC开销,并且复制以容忍执行程序失败。此外,数据首先保存在内存中,并且仅在内存不足以保存流式计算所需的所有输入数据时才溢出到磁盘。这种序列化显然有开销 - 接收器必须反序列化接收的数据并使用Spark的序列化格式重新序列化。 1. 流式传输操作生成的持久RDD:流式计算生成的RDD可以保留在内存中。例如,窗口操作将数据保留在内存中,因为它们将被多次处理。但是,与StorageLevel.MEMORY\_ONLY的Spark Core默认值不同,默认情况下,通过流式计算生成的持久RDD与StorageLevel.MEMORY\_ONLY\_SER(即序列化)一起保留,以最小化GC开销。 在这两种情况下,使用Kryo序列化可以减少CPU和内存开销。有关详细信息,请参阅Spark Tuning Guide。对于Kryo,请考虑注册自定义类,并禁用对象引用跟踪(请参阅“配置指南”中的Kryo相关配置)。 在需要为流应用程序保留的数据量不大的特定情况下,将数据(两种类型)保存为反序列化对象可能是可行的,而不会产生过多的GC开销。例如,如果您使用的是几秒钟的批处理间隔而没有窗口操作,则可以尝试通过相应地显式设置存储级别来禁用持久数据中的序列化。这将减少由于序列化导致的CPU开销,可能在没有太多GC开销的情况下提高性能。 Task Launching Overheads 如果每秒启动的任务数量很高(例如,每秒50或更多),则向从属设备发送任务的开销可能很大,并且将难以实现亚秒级延迟。 通过以下更改可以减少开销: 1. 执行模式:在独立模式或粗粒度Mesos模式下运行Spark可以获得比细粒度Mesos模式更好的任务启动时间。 有关更多详细信息,请参阅Running on Mesos指南。 这些更改可以将批处理时间减少100毫秒,从而允许亚秒级批量大小可行。 ### Setting the Right Batch Interval ### 要使群集上运行的Spark Streaming应用程序保持稳定,系统应该能够以接收数据的速度处理数据。换句话说,批处理数据应该在生成时尽快处理。通过监视流式Web UI中的处理时间可以找到是否适用于应用程序,其中批处理时间应小于批处理间隔。 根据流式计算的性质,所使用的批处理间隔可能对应用程序在固定的一组集群资源上可以维持的数据速率产生重大影响。例如,让我们考虑一下早期的WordCountNetwork示例。对于特定数据速率,系统可能能够每2秒(即,2秒的批处理间隔)跟上报告字数,但不是每500毫秒。因此需要设置批处理间隔,以便可以维持生产中的预期数据速率。 确定应用程序正确批量大小的好方法是使用保守的批处理间隔(例如,5-10秒)和低数据速率进行测试。要验证系统是否能够跟上数据速率,您可以检查每个已处理批处理所遇到的端到端延迟的值(在Spark驱动程序log4j日志中查找“总延迟”,或使用StreamingListener接口)。如果延迟保持与批量大小相当,则系统稳定。否则,如果延迟不断增加,则意味着系统无法跟上,因此不稳定。一旦了解了稳定配置,就可以尝试提高数据速率和/或减小批量大小。注意,只要延迟减小到低值(即,小于批量大小),由于临时数据速率增加引起的延迟的瞬时增加可能是正常的。 ### Memory Tuning ### “调优指南”中详细讨论了调整Spark应用程序的内存使用情况和GC行为。强烈建议您阅读。在本节中,我们将特别在Spark Streaming应用程序的上下文中讨论一些调优参数。 Spark Streaming应用程序所需的集群内存量在很大程度上取决于所使用的转换类型。例如,如果要在最后10分钟的数据上使用窗口操作,那么您的群集应该有足够的内存来在内存中保存10分钟的数据。或者,如果要将updateStateByKey与大量键一起使用,则必要的内存将很高。相反,如果你想做一个简单的map-filter-store操作,那么必要的内存就会很少。 通常,由于通过接收器接收的数据与StorageLevel.MEMORY\_AND\_DISK\_SER\_2一起存储,因此不适合内存的数据将溢出到磁盘。这可能会降低流应用程序的性能,因此建议您根据流应用程序的需要提供足够的内存。最好尝试小规模地查看内存使用情况并进行相应估算。 内存调整的另一个方面是垃圾收集。对于需要低延迟的流应用程序,不希望由JVM垃圾收集导致大的暂停。 有一些参数可以帮助您调整内存使用和GC开销: 1. DStream的持久性级别:如前面数据序列化部分所述,输入数据和RDD默认持久化为序列化字节。与反序列化持久性相比,这减少了内存使用和GC开销。启用Kryo序列化可进一步减少序列化大小和内存使用量。通过压缩(参见Spark配置spark.rdd.compress)可以实现内存使用的进一步减少,但代价是CPU时间。 1. 清除旧数据:默认情况下,DStream转换生成的所有输入数据和持久RDD都会自动清除。 Spark Streaming根据使用的转换决定何时清除数据。例如,如果您使用10分钟的窗口操作,那么Spark Streaming将保留最后10分钟的数据,并主动丢弃旧数据。通过设置streamingContext.remember,可以将数据保留更长的时间(例如,交互式查询旧数据)。 1. CMS垃圾收集器:强烈建议使用并发标记和清除GC,以保持GC相关的暂停始终较低。尽管已知并发GC会降低系统的整体处理吞吐量,但仍建议使用它来实现更一致的批处理时间。确保在驱动程序(使用spark-submit中的--driver-java-options)和执行程序(使用Spark配置spark.executor.extraJavaOptions)上设置CMS GC。 1. 其他提示:为了进一步降低GC开销,这里有一些尝试的提示。 * 使用OFF\_HEAP存储级别保留RDD。请参阅Spark编程指南中的更多详细信息。 * 使用具有较小堆大小的更多执行程序。这将降低每个JVM堆中的GC压力。 Important points to remember: DStream与单个接收器相关联。为了获得读取并行性,需要创建多个接收器,即多个DStream。接收器在执行器内运行。它占据一个核心。确保在预订接收器插槽后有足够的内核进行处理,即spark.cores.max应考虑接收器插槽。接收器以循环方式分配给执行器。 当从流源接收数据时,接收器创建数据块。每隔blockInterval毫秒生成一个新的数据块。在batchInterval期间创建N个数据块,其中N = batchInterval / blockInterval。这些块由当前执行程序的BlockManager分发给其他执行程序的块管理器。之后,在驱动程序上运行的网络输入跟踪器将被告知有关进一步处理的块位置。 在驱动程序上为batchInterval期间创建的块创建RDD。 batchInterval期间生成的块是RDD的分区。每个分区都是spark中的任务。 blockInterval == batchinterval意味着创建了一个分区,并且可能在本地处理它。 块中的映射任务在执行器中处理(一个接收块,另一个块复制块),具有块而不管块间隔,除非非本地调度启动。具有更大的blockinterval意味着更大的块。 spark.locality.wait的值很高,增加了在本地节点上处理块的机会。需要在这两个参数之间找到平衡,以确保在本地处理更大的块。 您可以通过调用inputDstream.repartition(n)来定义分区数,而不是依赖于batchInterval和blockInterval。这会随机重新调整RDD中的数据以创建n个分区。是的,为了更大的并行性。虽然以洗牌为代价。 RDD的处理由驾驶员的jobcheduler作为工作安排。在给定的时间点,只有一个作业处于活动状态。因此,如果一个作业正在执行,则其他作业将排队。 如果你有两个dstream,那么将形成两个RDD,并且将创建两个将一个接一个地安排的作业。为了避免这种情况,你可以结合两个dstreams。这将确保为dstream的两个RDD形成单个unionRDD。然后,此unionRDD被视为单个作业。但是,RDD的分区不受影响。 如果批处理时间超过批处理间隔,那么显然接收方的内存将开始填满,最终会抛出异常(最有可能是BlockNotFoundException)。目前没有办法暂停接收器。使用SparkConf配置spark.streaming.receiver.maxRate,可以限制接收器的速率。 ## Fault-tolerance Semantics ## 在本节中,我们将讨论Spark Streaming应用程序在发生故障时的行为。 ### Background ### 要理解Spark Streaming提供的语义,让我们记住Spark的RDD的基本容错语义。 1. RDD是一个不可变的,确定性可重新计算的分布式数据集。 每个RDD都会记住在容错输入数据集上用于创建它的确定性操作的沿袭。 2. 如果由于工作节点故障导致RDD的任何分区丢失,则可以使用操作系列从原始容错数据集重新计算该分区。 3. 假设所有RDD转换都是确定性的,那么无论Spark集群中的故障如何,最终转换的RDD中的数据将始终相同。 Spark对容错文件系统(如HDFS或S3)中的数据进行操作。 因此,从容错数据生成的所有RDD也是容错的。 但是,Spark Streaming不是这种情况,因为大多数情况下的数据是通过网络接收的(除非使用了fileStream)。 要为所有生成的RDD实现相同的容错属性,接收的数据将在群集中的工作节点中的多个Spark执行程序之间进行复制(默认复制因子为2)。 这导致系统中需要在发生故障时恢复的两种数据: 1. 接收和复制的数据 - 此数据在单个工作节点发生故障时仍然存在,因为其副本存在于其中一个节点上。 2. 收到但缓冲复制的数据 - 由于没有复制,恢复此数据的唯一方法是从源中再次获取它。 有了这些基础知识,让我们了解Spark Streaming的容错语义。 ### Definitions ### 流系统的语义通常根据系统处理每条记录的次数来捕获。 系统可以在所有可能的操作条件下提供三种类型的保证(尽管出现故障等) 1. 最多一次:每条记录将被处理一次或根本不处理。 2. 至少一次:每条记录将被处理一次或多次。 这比最多一次强,因为它确保不会丢失任何数据。 但可能有重复。 3. 恰好一次:每条记录只处理一次 - 不会丢失任何数据,也不会多次处理任何数据。 这显然是三者的最有力保障。 ### Basic Semantics ### 在任何流处理系统中,从广义上讲,处理数据有三个步骤。 1. 接收数据:使用接收器或其他方式从数据源接收数据。 2. 转换数据:使用DStream和RDD转换转换接收的数据。 3. 推出数据:最终转换的数据被推送到外部系统,如文件系统,数据库,仪表板等。 如果流应用程序必须实现端到端的一次性保证,那么每个步骤都必须提供一次性保证。也就是说,每条记录必须只接收一次,转换一次,然后推送到下游系统一次。让我们理解Spark Streaming上下文中这些步骤的语义。 1. 接收数据:不同的输入源提供不同的保证。这将在下一小节中详细讨论。 2. 转换数据:由于RDD提供的保证,所有已接收的数据将只处理一次。即使存在故障,只要可以访问接收的输入数据,最终变换的RDD将始终具有相同的内容。 3. 推出数据:默认情况下输出操作至少确保一次语义,因为它取决于输出操作的类型(幂等或不等)和下游系统的语义(支持或不支持事务)。但是用户可以实现自己的事务机制来实现一次性语义。本节稍后将对此进行更详细的讨论。 ### Semantics of Received Data ### 不同的输入源提供不同的保证,范围从至少一次到恰好一次。阅读更多详情。 With Files 如果所有输入数据都已存在于HDFS等容错文件系统中,则Spark Streaming始终可以从任何故障中恢复并处理所有数据。这给出了一次性语义,这意味着无论失败什么,所有数据都将被处理一次。 使用基于Receiver的源 对于基于接收器的输入源,容错语义取决于故障情形和接收器类型。如前所述,有两种类型的接收器: 1. 可靠的接收器 - 这些接收器仅在确保已复制接收的数据后才确认可靠的源。如果此类接收器发生故障,则源将不会收到对缓冲(未复制)数据的确认。因此,如果重新启动接收器,源将重新发送数据,并且不会因失败而丢失数据。 2. 不可靠的接收器 - 此类接收器不发送确认,因此当它们因工作人员或驱动程序故障而失败时可能会丢失数据。 根据使用的接收器类型,我们实现以下语义。如果工作节点发生故障,则可靠接收器不会丢失数据。对于不可靠的接收器,接收但未复制的数据可能会丢失。如果驱动程序节点出现故障,那么除了这些丢失之外,在内存中接收和复制的所有过去数据都将丢失。这将影响有状态转换的结果。 为了避免丢失过去收到的数据,Spark 1.2引入了预写日志,将接收到的数据保存到容错存储中。通过启用预写日志和可靠的接收器,数据丢失为零。在语义方面,它提供至少一次保证。 ### Semantics of output operations ### 输出操作(如foreachRDD)至少具有一次语义,也就是说,在工作者发生故障时,转换后的数据可能会多次写入外部实体。虽然使用saveAs \*\*\* Files操作保存到文件系统是可以接受的(因为文件只会被相同的数据覆盖),但是可能需要额外的努力来实现一次性语义。有两种方法。 1. 幂等更新:多次尝试始终写入相同的数据。例如,saveAs \*\*\* Files始终将相同的数据写入生成的文件。 1. 事务性更新:所有更新都是以事务方式进行的,以便以原子方式完成更新。一种方法是: * 使用批处理时间(在foreachRDD中可用)和RDD的分区索引来创建标识符。该标识符唯一地标识流应用程序中的blob数据。 * 使用标识符以事务方式(即,一次,原子地)使用此blob更新外部系统。也就是说,如果标识符尚未提交,则以原子方式提交分区数据和标识符。否则,如果已经提交,请跳过更新。 dstream.foreachRDD { (rdd, time) => rdd.foreachPartition { partitionIterator => val partitionId = TaskContext.get.partitionId() val uniqueId = generateUniqueId(time.milliseconds, partitionId) // use this uniqueId to transactionally commit the data in partitionIterator } } [70]: /images/20220517/d6d3aacd8cb34c14a1a700f3ad1f5978.png [70 1]: /images/20220517/e1c061fd1c9f4ccbbff057ef36ae7b75.png [70 2]: /images/20220517/05f416d23a194800a206fe015e869639.png [70 3]: /images/20220517/ceed2a1d583445aa8c2266fb1837c4e2.png
还没有评论,来说两句吧...