本文共 17402 字,大约阅读时间需要 58 分钟。
首先需要注意的是,累加器()和广播变量()是无法从Spark Streaming的检查点中恢复回来的。所以如果你开启了检查点功能,并同时在使用累加器和广播变量,那么你最好是使用懒惰实例化的单例模式,因为这样累加器和广播变量才能在驱动器(driver)故障恢复后重新实例化。代码示例如下:
object WordBlacklist { @volatile private var instance: Broadcast[Seq[String]] = null def getInstance(sc: SparkContext): Broadcast[Seq[String]] = { if (instance == null) { synchronized { if (instance == null) { val wordBlacklist = Seq("a", "b", "c") instance = sc.broadcast(wordBlacklist) } } } instance }}object DroppedWordsCounter { @volatile private var instance: Accumulator[Long] = null def getInstance(sc: SparkContext): Accumulator[Long] = { if (instance == null) { synchronized { if (instance == null) { instance = sc.accumulator(0L, "WordsInBlacklistCounter") } } } instance }}wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => { // 获取现有或注册新的blacklist广播变量 val blacklist = WordBlacklist.getInstance(rdd.sparkContext) // 获取现有或注册新的 droppedWordsCounter 累加器 val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext) // 基于blacklist来过滤词,并将过滤掉的词的个数累加到 droppedWordsCounter 中 val counts = rdd.filter { case (word, count) => if (blacklist.value.contains(word)) { droppedWordsCounter += count false } else { true } }.collect() val output = "Counts at time " + time + " " + counts})
这里有完整代码:。
在Streaming应用中可以调用来处理流式数据。开发者可以用通过StreamingContext中的SparkContext对象来创建一个SQLContext,并且,开发者需要确保一旦驱动器(driver)故障恢复后,该SQLContext对象能重新创建出来。同样,你还是可以使用懒惰创建的单例模式来实例化SQLContext,如下面的代码所示,这里我们将最开始的那个小栗子做了一些修改,使用DataFrame和SQL来统计单词计数。其实就是,将每个RDD都转化成一个DataFrame,然后注册成临时表,再用SQL查询这些临时表。
/** streaming应用中调用DataFrame算子 */val words: DStream[String] = ...words.foreachRDD { rdd => // 获得SQLContext单例 val sqlContext = SQLContext.getOrCreate(rdd.sparkContext) import sqlContext.implicits._ // 将RDD[String] 转为 DataFrame val wordsDataFrame = rdd.toDF("word") // DataFrame注册为临时表 wordsDataFrame.registerTempTable("words") // 再用SQL语句查询,并打印出来 val wordCountsDataFrame = sqlContext.sql("select word, count(*) as total from words group by word") wordCountsDataFrame.show()}
See the full .
这里有完整代码:。
你也可以在其他线程里执行SQL查询(异步查询,即:执行SQL查询的线程和运行StreamingContext的线程不同)。不过这种情况下,你需要确保查询的时候 StreamingContext 没有把所需的数据丢弃掉,否则StreamingContext有可能已将老的RDD数据丢弃掉了,那么异步查询的SQL语句也可能无法得到查询结果。举个栗子,如果你需要查询上一个批次的数据,但是你的SQL查询可能要执行5分钟,那么你就需要StreamingContext至少保留最近5分钟的数据:streamingContext.remember(Minutes(5)) (这是Scala为例,其他语言差不多)
更多DataFrame和SQL的文档见这里:
提供了很多机器学习算法。首先,你需要关注的是流式计算相关的机器学习算法(如:, ),这些流式算法可以在流式数据上一边学习训练模型,一边用最新的模型处理数据。除此以外,对更多的机器学习算法而言,你需要离线训练这些模型,然后将训练好的模型用于在线的流式数据。详见。
和RDD类似,DStream也支持将数据持久化到内存中。只需要调用 DStream的persist() 方法,该方法内部会自动调用DStream中每个RDD的persist方法进而将数据持久化到内存中。这对于可能需要计算很多次的DStream非常有用(例如:对于同一个批数据调用多个算子)。对于基于滑动窗口的算子,如:reduceByWindow和reduceByKeyAndWindow,或者有状态的算子,如:updateStateByKey,数据持久化就更重要了。因此,滑动窗口算子产生的DStream对象默认会自动持久化到内存中(不需要开发者调用persist)。
对于从网络接收数据的输入数据流(如:Kafka、Flume、socket等),默认的持久化级别会将数据持久化到两个不同的节点上互为备份副本,以便支持容错。
注意,与RDD不同的是,DStream的默认持久化级别是将数据序列化到内存中。进一步的讨论见性能调优这一小节。关于持久化级别(或者存储级别)的更详细说明见Spark编程指南()。
一般来说Streaming 应用都需要7*24小时长期运行,所以必须对一些与业务逻辑无关的故障有很好的容错(如:系统故障、JVM崩溃等)。对于这些可能性,Spark Streaming 必须在检查点保存足够的信息到一些可容错的外部存储系统中,以便能够随时从故障中恢复回来。所以,检查点需要保存以下两种数据:
总之,元数据检查点主要是为了恢复驱动器节点上的故障,而数据或RDD检查点是为了支持对有状态转换操作的恢复。
如果有以下情况出现,你就必须启用检查点了:
注意,一些简单的流式应用,如果没有用到前面所说的有状态转换算子,则完全可以不开启检查点。不过这样的话,驱动器(driver)故障恢复后,有可能会丢失部分数据(有些已经接收但还未处理的数据可能会丢失)。不过通常这点丢失时可接受的,很多Spark Streaming应用也是这样运行的。对非Hadoop环境的支持未来还会继续改进。
检查点的启用,只需要设置好保存检查点信息的检查点目录即可,一般会会将这个目录设为一些可容错的、可靠性较高的文件系统(如:HDFS、S3等)。开发者只需要调用 streamingContext.checkpoint(checkpointDirectory)。设置好检查点,你就可以使用前面提到的有状态转换算子了。另外,如果你需要你的应用能够支持从驱动器故障中恢复,你可能需要重写部分代码,实现以下行为:
不过这个行为可以用StreamingContext.getOrCreate来实现,示例如下:
// 首次创建StreamingContext并定义好数据流处理逻辑def functionToCreateContext(): StreamingContext = { val ssc = new StreamingContext(...) // 新建一个StreamingContext对象 val lines = ssc.socketTextStream(...) // 创建DStreams ... ssc.checkpoint(checkpointDirectory) // 设置好检查点目录 ssc}// 创建新的StreamingContext对象,或者从检查点构造一个val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)// 无论是否是首次启动都需要设置的工作在这里context. ...// 启动StreamingContext对象context.start()context.awaitTermination()
如果 checkpointDirectory 目录存在,则context对象会从检查点数据重新构建出来。如果该目录不存在(如:首次运行),则 functionToCreateContext 函数会被调用,创建一个新的StreamingContext对象并定义好DStream数据流。完整的示例请参见,这个例子会将网络数据中的单词计数统计结果添加到一个文件中。
除了使用getOrCreate之外,开发者还需要确保驱动器进程能在故障后重启。这一点只能由应用的部署环境基础设施来保证。进一步的讨论见部署()这一节。
另外需要注意的是,RDD检查点会增加额外的保存数据的开销。这可能会导致数据流的处理时间变长。因此,你必须仔细的调整检查点间隔时间。如果批次间隔太小(比如:1秒),那么对每个批次保存检查点数据将大大减小吞吐量。另一方面,检查点保存过于频繁又会导致血统信息和任务个数的增加,这同样会影响系统性能。对于需要RDD检查点的有状态转换算子,默认的间隔是批次间隔的整数倍,且最小10秒。开发人员可以这样来自定义这个间隔:dstream.checkpoint(checkpointInterval)。一般推荐设为批次间隔时间的5~10倍。
本节中将主要讨论一下如何部署Spark Streaming应用。
要运行一个Spark Streaming 应用,你首先需要具备以下条件:
升级Spark Streaming应用程序代码,可以使用以下两种方式:
除了Spark自身的监控能力()之外,对Spark Streaming还有一些额外的监控功能可用。如果实例化了StreamingContext,那么你可以在上看到多出了一个Streaming tab页,上面显示了正在运行的接收器(是否活跃,接收记录的条数,失败信息等)和处理完的批次信息(批次处理时间,查询延时等)。这些信息都可以用来监控streaming应用。
web UI上有两个度量特别重要:
如果批次处理耗时一直比批次间隔时间大,或者批次调度延时持续上升,就意味着系统处理速度跟不上数据接收速度。这时候你就得考虑一下怎么把批次处理时间降下来()。
Spark Streaming程序的处理进度可以用接口来监听,这个接口可以监听到接收器的状态和处理时间。不过需要注意的是,这是一个developer API接口,换句话说这个接口未来很可能会变动(可能会增加更多度量信息)。
要获得Spark Streaming应用的最佳性能需要一点点调优工作。本节将深入解释一些能够改进Streaming应用性能的配置和参数。总体上来说,你需要考虑这两方面的事情:
有不少优化手段都可以减少Spark对每个批次的处理时间。细节将在优化指南()中详谈。这里仅列举一些最重要的。
跨网络接收数据(如:从Kafka、Flume、socket等接收数据)需要在Spark中序列化并存储数据。
如果接收数据的过程是系统瓶颈,那么可以考虑增加数据接收的并行度。注意,每个输入DStream只包含一个单独的接收器(receiver,运行约worker节点),每个接收器单独接收一路数据流。所以,配置多个输入DStream就能从数据源的不同分区分别接收多个数据流。例如,可以将从Kafka拉取两个topic的数据流分成两个Kafka输入数据流,每个数据流拉取其中一个topic的数据,这样一来会同时有两个接收器并行地接收数据,因而增加了总体的吞吐量。同时,另一方面我们又可以把这些DStream数据流合并成一个,然后可以在合并后的DStream上使用任何可用的transformation算子。示例代码如下:
val numStreams = 5val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }val unifiedStream = streamingContext.union(kafkaStreams)unifiedStream.print()
另一个可以考虑优化的参数就是接收器的阻塞间隔,该参数由配置参数()spark.streaming.blockInterval决定。大多数接收器都会将数据合并成一个个数据块,然后再保存到spark内存中。对于map类算子来说,每个批次中数据块的个数将会决定处理这批数据并行任务的个数,每个接收器每批次数据处理任务数约等于 (批次间隔 / 数据块间隔)。例如,对于2秒的批次间隔,如果数据块间隔为200ms,则创建的并发任务数为10。如果任务数太少(少于单机cpu core个数),则资源利用不够充分。如需增加这个任务数,对于给定的批次间隔来说,只需要减少数据块间隔即可。不过,我们还是建议数据块间隔至少要50ms,否则任务的启动开销占比就太高了。
另一个切分接收数据流的方法是,显示地将输入数据流划分为多个分区(使用 inputStream.repartition(<number of partitions>))。该操作会在处理前,将数据散开重新分发到集群中多个节点上。
在计算各个阶段(stage)中,任何一个阶段的并发任务数不足都有可能造成集群资源利用率低。例如,对于reduce类的算子,如:reduceByKey 和 reduceByKeyAndWindow,其默认的并发任务数是由 spark.default.parallelism 决定的。你既可以修改这个默认值(spark.default.parallelism),也可以通过参数指定这个并发数量(见)。
调整数据的序列化格式可以大大减少数据序列化的开销。在spark Streaming中主要有两种类型的数据需要序列化:
不管是上面哪一种数据,都可以使用Kryo序列化来减少CPU和内存开销,详见。另,对于Kryo,你可以考虑这些优化:注册自定义类型,禁用对象引用跟踪(详见)。
在一些特定的场景下,如果数据量不是很大,那么你可以考虑不用序列化格式,不过你需要注意的是取消序列化是否会导致大量的GC开销。例如,如果你的批次间隔比较短(几秒)并且没有使用基于窗口的算子,这种情况下你可以考虑禁用序列化格式。这样可以减少序列化的CPU开销以优化性能,同时GC的增长也不多。
如果每秒启动的任务数过多(比如每秒50个以上),那么将任务发送给slave节点的开销会明显增加,那么你也就很难达到亚秒级(sub-second)的延迟。不过以下两个方法可以减少任务的启动开销:
这些调整有可能能够减少100ms的批次处理时间,这也使得亚秒级的批次间隔成为可能。
要想streaming应用在集群上稳定运行,那么系统处理数据的速度必须能跟上其接收数据的速度。换句话说,批次数据的处理速度应该和其生成速度一样快。对于特定的应用来说,可以从其对应的监控()页面上观察验证,页面上显示的处理耗时应该要小于批次间隔时间。
根据spark streaming计算的性质,在一定的集群资源限制下,批次间隔的值会极大地影响系统的数据处理能力。例如,在WordCountNetwork示例中,对于特定的数据速率,一个系统可能能够在批次间隔为2秒时跟上数据接收速度,但如果把批次间隔改为500毫秒系统可能就处理不过来了。所以,批次间隔需要谨慎设置,以确保生产系统能够处理得过来。
要找出适合的批次间隔,你可以从一个比较保守的批次间隔值(如5~10秒)开始测试。要验证系统是否能跟上当前的数据接收速率,你可能需要检查一下端到端的批次处理延迟(可以看看Spark驱动器log4j日志中的Total delay,也可以用接口来检测)。如果这个延迟能保持和批次间隔差不多,那么系统基本就是稳定的。否则,如果这个延迟持久在增长,也就是说系统跟不上数据接收速度,那也就意味着系统不稳定。一旦系统文档下来后,你就可以尝试提高数据接收速度,或者减少批次间隔值。不过需要注意,瞬间的延迟增长可以只是暂时的,只要这个延迟后续会自动降下来就没有问题(如:降到小于批次间隔值)
Spark应用内存占用和GC调优已经在调优指南()中有详细的讨论。墙裂建议你读一读那篇文档。本节中,我们只是讨论一下几个专门用于Spark Streaming的调优参数。
Spark Streaming应用在集群中占用的内存量严重依赖于具体所使用的tranformation算子。例如,如果想要用一个窗口算子操纵最近10分钟的数据,那么你的集群至少需要在内存里保留10分钟的数据;另一个例子是updateStateByKey,如果key很多的话,相对应的保存的key的state也会很多,而这些都需要占用内存。而如果你的应用只是做一个简单的 “映射-过滤-存储”(map-filter-store)操作的话,那需要的内存就很少了。
一般情况下,streaming接收器接收到的数据会以 StorageLevel.MEMORY_AND_DISK_SER_2 这个存储级别存到spark中,也就是说,如果内存装不下,数据将被吐到磁盘上。数据吐到磁盘上会大大降低streaming应用的性能,因此还是建议根据你的应用处理的数据量,提供充足的内存。最好就是,一边小规模地放大内存,再观察评估,然后再放大,再评估。
另一个内存调优的方向就是垃圾回收。因为streaming应用往往都需要低延迟,所以肯定不希望出现大量的或耗时较长的JVM垃圾回收暂停。
以下是一些能够帮助你减少内存占用和GC开销的参数或手段:
本节中,我们将讨论Spark Streaming应用在出现失败时的具体行为。
要理解Spark Streaming所提供的容错语义,我们首先需要回忆一下Spark RDD所提供的基本容错语义。
Spark主要操作一些可容错文件系统的数据,如:HDFS或S3。因此,所有从这些可容错数据源产生的RDD也是可容错的。然而,对于Spark Streaming并非如此,因为多数情况下Streaming需要从网络远端接收数据,这回导致Streaming的数据源并不可靠(尤其是对于使用了fileStream的应用)。要实现RDD相同的容错属性,数据接收就必须用多个不同worker节点上的Spark执行器来实现(默认副本因子是2)。因此一旦出现故障,系统需要恢复两种数据:
此外,还有两种可能的故障类型需要考虑:
有了以上这些基本知识,下面我们就进一步了解一下Spark Streaming的容错语义。
流式系统的可靠度语义可以据此来分类:单条记录在系统中被处理的次数保证。一个流式系统可能提供保证必定是以下三种之一(不管系统是否出现故障):
任何流式处理系统一般都会包含以下三个数据处理步骤:
如果Streaming应用需要做到端到端的“精确一次”的保证,那么就必须在以上三个步骤中各自都保证精确一次:即,每条记录必须,只接收一次、处理一次、推送一次。下面让我们在Spark Streaming的上下文环境中来理解一下这三个步骤的语义:
不同的输入源提供不同的数据可靠性级别,从“至少一次”到“精确一次”。
如果所有的输入数据都来源于可容错的文件系统,如HDFS,那么Spark Streaming就能在任何故障中恢复并处理所有的数据。这种情况下就能保证精确一次语义,也就是说不管出现什么故障,所有的数据总是精确地只处理一次,不多也不少。
对于基于接收器的输入源,容错语义将同时依赖于故障场景和接收器类型。前面也已经提到过,spark Streaming主要有两种类型的接收器:
对于不同的接收器,我们可以获得如下不同的语义。如果一个worker节点故障了,对于可靠接收器来书,不会有数据丢失。而对于不可靠接收器,缓存的(接收但尚未保存副本)数据可能会丢失。如果driver节点故障了,除了接收到的数据之外,其他的已经接收且已经保存了内存副本的数据都会丢失,这将会影响有状态算子的计算结果。
为了避免丢失已经收到且保存副本的数,从 spark 1.2 开始引入了WAL(write ahead logs),以便将这些数据写入到可容错的存储中。只要你使用可靠接收器,同时启用WAL(),那么久再也不用为数据丢失而担心了。并且这时候,还能提供“至少一次”的语义保证。
下表总结了故障情况下的各种语义:
部署场景 | Worker 故障 | Driver 故障 |
---|---|---|
Spark 1.1及以前版本 或者 Spark 1.2及以后版本,且未开启WAL | 若使用不可靠接收器,则可能丢失缓存(已接收但尚未保存副本)数据; 若使用可靠接收器,则没有数据丢失,且提供至少一次处理语义 | 若使用不可靠接收器,则缓存数据和已保存数据都可能丢失; 若使用可靠接收器,则没有缓存数据丢失,但已保存数据可能丢失,且不提供语义保证 |
Spark 1.2及以后版本,并启用WAL | 若使用可靠接收器,则没有数据丢失,且提供至少一次语义保证 | 若使用可靠接收器和文件,则无数据丢失,且提供至少一次语义保证 |
从Spark 1.3开始,我们引入Kafka Direct API,该API能为Kafka数据源提供“精确一次”语义保证。有了这个输入API,再加上输出算子的“精确一次”保证,你就能真正实现端到端的“精确一次”语义保证。(改功能截止Spark 1.6.1还是实验性的)更详细的说明见:。
输出算子(如 foreachRDD)提供“至少一次”语义保证,也就是说,如果worker故障,单条输出数据可能会被多次写入外部实体中。不过这对于文件系统来说是可以接受的(使用saveAs***Files 多次保存文件会覆盖之前的),所以我们需要一些额外的工作来实现“精确一次”语义。主要有两种实现方式:
dstream.foreachRDD { (rdd, time) => rdd.foreachPartition { partitionIterator => val partitionId = TaskContext.get.partitionId() val uniqueId = generateUniqueId(time.milliseconds, partitionId) // 使用uniqueId作为事务的唯一标识,基于uniqueId实现partitionIterator所指向数据的原子事务提交 }}
在Spark 0.9.1和Spark 1.0之间,有一些API接口变更,变更目的是为了保障未来版本API的稳定。本节将详细说明一下从已有版本迁移升级到1.0所需的工作。
输入DStream(Input DStreams): 所有创建输入流的算子(如:StreamingContext.socketStream, FlumeUtils.createStream 等)的返回值不再是DStream(对Java来说是JavaDStream),而是 / (对Java来说是 / / / )。这样才能确保特定输入流的功能能够在未来持续增加到这些class中,而不会打破二进制兼容性。注意,已有的Spark Streaming应用应该不需要任何代码修改(新的返回类型都是DStream的子类),只不过需要基于Spark 1.0重新编译一把。
定制网络接收器(Custom Network Receivers): 自从Spark Streaming发布以来,Scala就能基于NetworkReceiver来定制网络接收器。但由于错误处理和汇报API方便的限制,该类型不能在Java中使用。所以Spark 1.0开始,用 来替换掉这个NetworkReceiver,主要的好处如下:
为了将已有的基于NetworkReceiver的自定义接收器迁移到Receiver上来,你需要如下工作:
继承,而不再是
org.apache.spark.streaming.dstream.NetworkReceiver。 基于Actor的接收器(Actor-based Receivers): 从actor class继承后,并实 后,
即可从Akka Actors中获取数据。获取数据的类被重命名为 ,而保存数据的pushBlocks(…)方法也被重命名为 store(…)。其他org.apache.spark.streaming.receivers包中的工具类也被移到 包下并重命名,新的类名应该比之前更加清晰。