博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
《Spark官方文档》Spark Streaming编程指南(二)
阅读量:6292 次
发布时间:2019-06-22

本文共 17402 字,大约阅读时间需要 58 分钟。

累加器和广播变量

首先需要注意的是,累加器()和广播变量()是无法从Spark Streaming的检查点中恢复回来的。所以如果你开启了检查点功能,并同时在使用累加器和广播变量,那么你最好是使用懒惰实例化的单例模式,因为这样累加器和广播变量才能在驱动器(driver)故障恢复后重新实例化。代码示例如下:

object WordBlacklist {  @volatile private var instance: Broadcast[Seq[String]] = null  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {    if (instance == null) {      synchronized {        if (instance == null) {          val wordBlacklist = Seq("a", "b", "c")          instance = sc.broadcast(wordBlacklist)        }      }    }    instance  }}object DroppedWordsCounter {  @volatile private var instance: Accumulator[Long] = null  def getInstance(sc: SparkContext): Accumulator[Long] = {    if (instance == null) {      synchronized {        if (instance == null) {          instance = sc.accumulator(0L, "WordsInBlacklistCounter")        }      }    }    instance  }}wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {  // 获取现有或注册新的blacklist广播变量  val blacklist = WordBlacklist.getInstance(rdd.sparkContext)  // 获取现有或注册新的 droppedWordsCounter 累加器  val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)  // 基于blacklist来过滤词,并将过滤掉的词的个数累加到 droppedWordsCounter 中  val counts = rdd.filter { case (word, count) =>    if (blacklist.value.contains(word)) {      droppedWordsCounter += count      false    } else {      true    }  }.collect()  val output = "Counts at time " + time + " " + counts})

这里有完整代码:。


DataFrame和SQL相关算子

在Streaming应用中可以调用来处理流式数据。开发者可以用通过StreamingContext中的SparkContext对象来创建一个SQLContext,并且,开发者需要确保一旦驱动器(driver)故障恢复后,该SQLContext对象能重新创建出来。同样,你还是可以使用懒惰创建的单例模式来实例化SQLContext,如下面的代码所示,这里我们将最开始的那个小栗子做了一些修改,使用DataFrame和SQL来统计单词计数。其实就是,将每个RDD都转化成一个DataFrame,然后注册成临时表,再用SQL查询这些临时表。

/** streaming应用中调用DataFrame算子 */val words: DStream[String] = ...words.foreachRDD { rdd =>  // 获得SQLContext单例  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)  import sqlContext.implicits._  // 将RDD[String] 转为 DataFrame  val wordsDataFrame = rdd.toDF("word")  // DataFrame注册为临时表  wordsDataFrame.registerTempTable("words")  // 再用SQL语句查询,并打印出来  val wordCountsDataFrame =     sqlContext.sql("select word, count(*) as total from words group by word")  wordCountsDataFrame.show()}

See the full .

这里有完整代码:。

你也可以在其他线程里执行SQL查询(异步查询,即:执行SQL查询的线程和运行StreamingContext的线程不同)。不过这种情况下,你需要确保查询的时候 StreamingContext 没有把所需的数据丢弃掉,否则StreamingContext有可能已将老的RDD数据丢弃掉了,那么异步查询的SQL语句也可能无法得到查询结果。举个栗子,如果你需要查询上一个批次的数据,但是你的SQL查询可能要执行5分钟,那么你就需要StreamingContext至少保留最近5分钟的数据:streamingContext.remember(Minutes(5)) (这是Scala为例,其他语言差不多)

更多DataFrame和SQL的文档见这里: 


MLlib算子

 提供了很多机器学习算法。首先,你需要关注的是流式计算相关的机器学习算法(如:, ),这些流式算法可以在流式数据上一边学习训练模型,一边用最新的模型处理数据。除此以外,对更多的机器学习算法而言,你需要离线训练这些模型,然后将训练好的模型用于在线的流式数据。详见。


缓存/持久化

和RDD类似,DStream也支持将数据持久化到内存中。只需要调用 DStream的persist() 方法,该方法内部会自动调用DStream中每个RDD的persist方法进而将数据持久化到内存中。这对于可能需要计算很多次的DStream非常有用(例如:对于同一个批数据调用多个算子)。对于基于滑动窗口的算子,如:reduceByWindow和reduceByKeyAndWindow,或者有状态的算子,如:updateStateByKey,数据持久化就更重要了。因此,滑动窗口算子产生的DStream对象默认会自动持久化到内存中(不需要开发者调用persist)。

对于从网络接收数据的输入数据流(如:Kafka、Flume、socket等),默认的持久化级别会将数据持久化到两个不同的节点上互为备份副本,以便支持容错。

注意,与RDD不同的是,DStream的默认持久化级别是将数据序列化到内存中。进一步的讨论见性能调优这一小节。关于持久化级别(或者存储级别)的更详细说明见Spark编程指南()。


检查点

一般来说Streaming 应用都需要7*24小时长期运行,所以必须对一些与业务逻辑无关的故障有很好的容错(如:系统故障、JVM崩溃等)。对于这些可能性,Spark Streaming 必须在检查点保存足够的信息到一些可容错的外部存储系统中,以便能够随时从故障中恢复回来。所以,检查点需要保存以下两种数据:

  • 元数据检查点(Metadata checkpointing) – 保存流式计算逻辑的定义信息到外部可容错存储系统(如:HDFS)。主要用途是用于在故障后回复应用程序本身(后续详谈)。元数包括:
    • Configuration – 创建Streaming应用程序的配置信息。
    • DStream operations – 定义流式处理逻辑的DStream操作信息。
    • Incomplete batches – 已经排队但未处理完的批次信息。
  • 数据检查点(Data checkpointing) – 将生成的RDD保存到可靠的存储中。这对一些需要跨批次组合数据或者有状态的算子来说很有必要。在这种转换算子中,往往新生成的RDD是依赖于前几个批次的RDD,因此随着时间的推移,有可能产生很长的依赖链条。为了避免在恢复数据的时候需要恢复整个依赖链条上所有的数据,检查点需要周期性地保存一些中间RDD状态信息,以斩断无限制增长的依赖链条和恢复时间。

总之,元数据检查点主要是为了恢复驱动器节点上的故障,而数据或RDD检查点是为了支持对有状态转换操作的恢复。

何时启用检查点

如果有以下情况出现,你就必须启用检查点了:

  • 使用了有状态的转换算子(Usage of stateful transformations) – 不管是用了 updateStateByKey 还是用了 reduceByKeyAndWindow(有”反归约”函数的那个版本),你都必须配置检查点目录来周期性地保存RDD检查点。
  • 支持驱动器故障中恢复(Recovering from failures of the driver running the application) – 这时候需要元数据检查点以便恢复流式处理的进度信息。

注意,一些简单的流式应用,如果没有用到前面所说的有状态转换算子,则完全可以不开启检查点。不过这样的话,驱动器(driver)故障恢复后,有可能会丢失部分数据(有些已经接收但还未处理的数据可能会丢失)。不过通常这点丢失时可接受的,很多Spark Streaming应用也是这样运行的。对非Hadoop环境的支持未来还会继续改进。

如何配置检查点

检查点的启用,只需要设置好保存检查点信息的检查点目录即可,一般会会将这个目录设为一些可容错的、可靠性较高的文件系统(如:HDFS、S3等)。开发者只需要调用 streamingContext.checkpoint(checkpointDirectory)。设置好检查点,你就可以使用前面提到的有状态转换算子了。另外,如果你需要你的应用能够支持从驱动器故障中恢复,你可能需要重写部分代码,实现以下行为:

  • 如果程序是首次启动,就需要new一个新的StreamingContext,并定义好所有的数据流处理,然后调用StreamingContext.start()。
  • 如果程序是故障后重启,就需要从检查点目录中的数据中重新构建StreamingContext对象。
 

不过这个行为可以用StreamingContext.getOrCreate来实现,示例如下:

// 首次创建StreamingContext并定义好数据流处理逻辑def functionToCreateContext(): StreamingContext = {    val ssc = new StreamingContext(...)   // 新建一个StreamingContext对象    val lines = ssc.socketTextStream(...) // 创建DStreams    ...    ssc.checkpoint(checkpointDirectory)   // 设置好检查点目录    ssc}// 创建新的StreamingContext对象,或者从检查点构造一个val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)// 无论是否是首次启动都需要设置的工作在这里context. ...// 启动StreamingContext对象context.start()context.awaitTermination()

如果 checkpointDirectory 目录存在,则context对象会从检查点数据重新构建出来。如果该目录不存在(如:首次运行),则 functionToCreateContext 函数会被调用,创建一个新的StreamingContext对象并定义好DStream数据流。完整的示例请参见,这个例子会将网络数据中的单词计数统计结果添加到一个文件中。

除了使用getOrCreate之外,开发者还需要确保驱动器进程能在故障后重启。这一点只能由应用的部署环境基础设施来保证。进一步的讨论见部署()这一节。

另外需要注意的是,RDD检查点会增加额外的保存数据的开销。这可能会导致数据流的处理时间变长。因此,你必须仔细的调整检查点间隔时间。如果批次间隔太小(比如:1秒),那么对每个批次保存检查点数据将大大减小吞吐量。另一方面,检查点保存过于频繁又会导致血统信息和任务个数的增加,这同样会影响系统性能。对于需要RDD检查点的有状态转换算子,默认的间隔是批次间隔的整数倍,且最小10秒。开发人员可以这样来自定义这个间隔:dstream.checkpoint(checkpointInterval)。一般推荐设为批次间隔时间的5~10倍。


部署应用

本节中将主要讨论一下如何部署Spark Streaming应用。

前提条件

要运行一个Spark Streaming 应用,你首先需要具备以下条件:

  • 集群以及集群管理器 – 这是一般Spark应用的基本要求,详见 。
  • 给Spark应用打个JAR包 – 你需要将你的应用打成一个JAR包。如果使用 提交应用,那么你不需要提供Spark和Spark Streaming的相关JAR包。但是,如果你使用了高级数据源( – 如:Kafka、Flume、Twitter等),那么你需要将这些高级数据源相关的JAR包及其依赖一起打包并部署。例如,如果你使用了TwitterUtils,那么就必须将spark-streaming-twitter_2.10及其相关依赖都打到应用的JAR包中。
  • 为执行器(executor)预留足够的内存 – 执行器必须配置预留好足够的内存,因为接受到的数据都得存在内存里。注意,如果某些窗口长度达到10分钟,那也就是说你的系统必须知道保留10分钟的数据在内存里。可见,到底预留多少内存是取决于你的应用处理逻辑的。
  • 配置检查点 – 如果你的流式应用需要检查点,那么你需要配置一个Hadoop API兼容的可容错存储目录作为检查点目录,流式应用的信息会写入这个目录,故障恢复时会用到这个目录下的数据。详见前面的检查点小节。
  • 配置驱动程序自动重启 – 流式应用自动恢复的前提就是,部署基础设施能够监控驱动器进程,并且能够在其故障时,自动重启之。不同的集群管理器有不同的工具来实现这一功能:
    • Spark独立部署 – Spark独立部署集群可以支持将Spark应用的驱动器提交到集群的某个worker节点上运行。同时,Spark的集群管理器可以对该驱动器进程进行监控,一旦驱动器退出且返回非0值,或者因worker节点原始失败,Spark集群管理器将自动重启这个驱动器。详见Spark独立部署指南()。
    • YARN – YARN支持和独立部署类似的重启机制。详细请参考YARN的文档。
    • Mesos – Mesos上需要用来实现这一功能。
  • 配置WAL(write ahead log)- 从Spark 1.2起,我们引入了write ahead log来提高容错性。如果启用这个功能,则所有接收到的数据都会以write ahead log形式写入配置好的检查点目录中。这样就能确保数据零丢失(容错语义有详细的讨论)。用户只需将 spark.streaming.receiver.writeAheadLog 设为true。不过,这同样可能会导致接收器的吞吐量下降。不过你可以启动多个接收器并行接收数据,从而提升整体的吞吐量()。另外,建议在启用WAL后禁用掉接收数据多副本功能,因为WAL其实已经是存储在一个多副本存储系统中了。你只需要把存储级别设为 StorageLevel.MEMORY_AND_DISK_SER。如果是使用S3(或者其他不支持flushing的文件系统)存储WAL,一定要记得启用这两个标识:spark.streaming.driver.writeAheadLog.closeFileAfterWrite 和 spark.streaming.receiver.writeAheadLog.closeFileAfterWrite。更详细请参考: 。
  • 设置好最大接收速率 – 如果集群可用资源不足以跟上接收数据的速度,那么可以在接收器设置一下最大接收速率,即:每秒接收记录的条数。相关的主要配置有:spark.streaming.receiver.maxRate,如果使用Kafka Direct API 还需要设置 spark.streaming.kafka.maxRatePerPartition。从Spark 1.5起,我们引入了backpressure的概念来动态地根据集群处理速度,评估并调整该接收速率。用户只需将 spark.streaming.backpressure.enabled设为true即可启用该功能。

升级应用代码

升级Spark Streaming应用程序代码,可以使用以下两种方式:

  • 新的Streaming程序和老的并行跑一段时间,新程序完成初始化以后,再关闭老的。注意,这种方式适用于能同时发送数据到多个目标的数据源(即:数据源同时将数据发给新老两个Streaming应用程序)。
  • 老程序能够优雅地退出(参考   or  ),即:确保所收到的数据都已经处理完毕后再退出。然后再启动新的Streaming程序,而新程序将接着在老程序退出点上继续拉取数据。注意,这种方式需要数据源支持数据缓存(或者叫数据堆积,如:Kafka、Flume),因为在新旧程序交接的这个空档时间,数据需要在数据源处缓存。目前还不能支持从检查点重启,因为检查点存储的信息包含老程序中的序列化对象信息,在新程序中将其反序列化可能会出错。这种情况下,只能要么指定一个新的检查点目录,要么删除老的检查点目录。

应用监控

除了Spark自身的监控能力()之外,对Spark Streaming还有一些额外的监控功能可用。如果实例化了StreamingContext,那么你可以在上看到多出了一个Streaming tab页,上面显示了正在运行的接收器(是否活跃,接收记录的条数,失败信息等)和处理完的批次信息(批次处理时间,查询延时等)。这些信息都可以用来监控streaming应用。

web UI上有两个度量特别重要:

  • 批次处理耗时(Processing Time) – 处理单个批次耗时
  • 批次调度延时(Scheduling Delay) -各批次在队列中等待时间(等待上一个批次处理完)

如果批次处理耗时一直比批次间隔时间大,或者批次调度延时持续上升,就意味着系统处理速度跟不上数据接收速度。这时候你就得考虑一下怎么把批次处理时间降下来()。

Spark Streaming程序的处理进度可以用接口来监听,这个接口可以监听到接收器的状态和处理时间。不过需要注意的是,这是一个developer API接口,换句话说这个接口未来很可能会变动(可能会增加更多度量信息)。



性能调优

要获得Spark Streaming应用的最佳性能需要一点点调优工作。本节将深入解释一些能够改进Streaming应用性能的配置和参数。总体上来说,你需要考虑这两方面的事情:

  1. 提高集群资源利用率,减少单批次处理耗时。
  2. 设置合适的批次大小,以便使数据处理速度能跟上数据接收速度。

减少批次处理时间

有不少优化手段都可以减少Spark对每个批次的处理时间。细节将在优化指南()中详谈。这里仅列举一些最重要的。

数据接收并发度

跨网络接收数据(如:从Kafka、Flume、socket等接收数据)需要在Spark中序列化并存储数据。

如果接收数据的过程是系统瓶颈,那么可以考虑增加数据接收的并行度。注意,每个输入DStream只包含一个单独的接收器(receiver,运行约worker节点),每个接收器单独接收一路数据流。所以,配置多个输入DStream就能从数据源的不同分区分别接收多个数据流。例如,可以将从Kafka拉取两个topic的数据流分成两个Kafka输入数据流,每个数据流拉取其中一个topic的数据,这样一来会同时有两个接收器并行地接收数据,因而增加了总体的吞吐量。同时,另一方面我们又可以把这些DStream数据流合并成一个,然后可以在合并后的DStream上使用任何可用的transformation算子。示例代码如下:

val numStreams = 5val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }val unifiedStream = streamingContext.union(kafkaStreams)unifiedStream.print()

另一个可以考虑优化的参数就是接收器的阻塞间隔,该参数由配置参数()spark.streaming.blockInterval决定。大多数接收器都会将数据合并成一个个数据块,然后再保存到spark内存中。对于map类算子来说,每个批次中数据块的个数将会决定处理这批数据并行任务的个数,每个接收器每批次数据处理任务数约等于 (批次间隔 / 数据块间隔)。例如,对于2秒的批次间隔,如果数据块间隔为200ms,则创建的并发任务数为10。如果任务数太少(少于单机cpu core个数),则资源利用不够充分。如需增加这个任务数,对于给定的批次间隔来说,只需要减少数据块间隔即可。不过,我们还是建议数据块间隔至少要50ms,否则任务的启动开销占比就太高了。

另一个切分接收数据流的方法是,显示地将输入数据流划分为多个分区(使用 inputStream.repartition(<number of partitions>))。该操作会在处理前,将数据散开重新分发到集群中多个节点上。

数据处理并发度

在计算各个阶段(stage)中,任何一个阶段的并发任务数不足都有可能造成集群资源利用率低。例如,对于reduce类的算子,如:reduceByKey 和 reduceByKeyAndWindow,其默认的并发任务数是由 spark.default.parallelism 决定的。你既可以修改这个默认值(spark.default.parallelism),也可以通过参数指定这个并发数量(见)。

数据序列化

调整数据的序列化格式可以大大减少数据序列化的开销。在spark Streaming中主要有两种类型的数据需要序列化:

  • 输入数据: 默认地,接收器收到的数据是以  的存储级别存储到执行器(executor)内存中的。也就是说,收到的数据会被序列化以减少GC开销,同时保存两个副本以容错。同时,数据会优先保存在内存里,当内存不足时才吐出到磁盘上。很明显,这个过程中会有数据序列化的开销 – 接收器首先将收到的数据反序列化,然后再以spark所配置指定的格式来序列化数据。
  • Streaming算子所生产的持久化的RDDs: Streaming计算所生成的RDD可能会持久化到内存中。例如,基于窗口的算子会将数据持久化到内存,因为窗口数据可能会多次处理。所不同的是,spark core默认用  级别持久化RDD数据,而spark streaming默认使用 级别持久化接收到的数据,以便尽量减少GC开销。

不管是上面哪一种数据,都可以使用Kryo序列化来减少CPU和内存开销,详见。另,对于Kryo,你可以考虑这些优化:注册自定义类型,禁用对象引用跟踪(详见)。

在一些特定的场景下,如果数据量不是很大,那么你可以考虑不用序列化格式,不过你需要注意的是取消序列化是否会导致大量的GC开销。例如,如果你的批次间隔比较短(几秒)并且没有使用基于窗口的算子,这种情况下你可以考虑禁用序列化格式。这样可以减少序列化的CPU开销以优化性能,同时GC的增长也不多。

任务启动开销

如果每秒启动的任务数过多(比如每秒50个以上),那么将任务发送给slave节点的开销会明显增加,那么你也就很难达到亚秒级(sub-second)的延迟。不过以下两个方法可以减少任务的启动开销:

  • 任务序列化(Task Serialization): 使用Kryo来序列化任务,以减少任务本身的大小,从而提高发送任务的速度。任务的序列化格式是由 spark.closure.serializer 属性决定的。不过,目前还不支持闭包序列化,未来的版本可能会增加对此的支持。
  • 执行模式(Execution mode): Spark独立部署或者Mesos粗粒度模式下任务的启动时间比Mesos细粒度模式下的任务启动时间要短。详见。

这些调整有可能能够减少100ms的批次处理时间,这也使得亚秒级的批次间隔成为可能。


设置合适的批次间隔

要想streaming应用在集群上稳定运行,那么系统处理数据的速度必须能跟上其接收数据的速度。换句话说,批次数据的处理速度应该和其生成速度一样快。对于特定的应用来说,可以从其对应的监控()页面上观察验证,页面上显示的处理耗时应该要小于批次间隔时间。

根据spark streaming计算的性质,在一定的集群资源限制下,批次间隔的值会极大地影响系统的数据处理能力。例如,在WordCountNetwork示例中,对于特定的数据速率,一个系统可能能够在批次间隔为2秒时跟上数据接收速度,但如果把批次间隔改为500毫秒系统可能就处理不过来了。所以,批次间隔需要谨慎设置,以确保生产系统能够处理得过来。

要找出适合的批次间隔,你可以从一个比较保守的批次间隔值(如5~10秒)开始测试。要验证系统是否能跟上当前的数据接收速率,你可能需要检查一下端到端的批次处理延迟(可以看看Spark驱动器log4j日志中的Total delay,也可以用接口来检测)。如果这个延迟能保持和批次间隔差不多,那么系统基本就是稳定的。否则,如果这个延迟持久在增长,也就是说系统跟不上数据接收速度,那也就意味着系统不稳定。一旦系统文档下来后,你就可以尝试提高数据接收速度,或者减少批次间隔值。不过需要注意,瞬间的延迟增长可以只是暂时的,只要这个延迟后续会自动降下来就没有问题(如:降到小于批次间隔值)


内存调优

Spark应用内存占用和GC调优已经在调优指南()中有详细的讨论。墙裂建议你读一读那篇文档。本节中,我们只是讨论一下几个专门用于Spark Streaming的调优参数。

Spark Streaming应用在集群中占用的内存量严重依赖于具体所使用的tranformation算子。例如,如果想要用一个窗口算子操纵最近10分钟的数据,那么你的集群至少需要在内存里保留10分钟的数据;另一个例子是updateStateByKey,如果key很多的话,相对应的保存的key的state也会很多,而这些都需要占用内存。而如果你的应用只是做一个简单的 “映射-过滤-存储”(map-filter-store)操作的话,那需要的内存就很少了。

一般情况下,streaming接收器接收到的数据会以 StorageLevel.MEMORY_AND_DISK_SER_2 这个存储级别存到spark中,也就是说,如果内存装不下,数据将被吐到磁盘上。数据吐到磁盘上会大大降低streaming应用的性能,因此还是建议根据你的应用处理的数据量,提供充足的内存。最好就是,一边小规模地放大内存,再观察评估,然后再放大,再评估。

另一个内存调优的方向就是垃圾回收。因为streaming应用往往都需要低延迟,所以肯定不希望出现大量的或耗时较长的JVM垃圾回收暂停。

以下是一些能够帮助你减少内存占用和GC开销的参数或手段:

  • DStream持久化级别(Persistence Level of DStreams): 前面数据序列化()这小节已经提到过,默认streaming的输入RDD会被持久化成序列化的字节流。相对于非序列化数据,这样可以减少内存占用和GC开销。如果启用Kryo序列化,还能进一步减少序列化数据大小和内存占用量。如果你还需要进一步减少内存占用的话,可以开启数据压缩(通过spark.rdd.compress这个配置设定),只不过数据压缩会增加CPU消耗。
  • 清除老数据(Clearing old data): 默认情况下,所有的输入数据以及DStream的transformation算子产生的持久化RDD都是自动清理的。Spark Streaming会根据所使用的transformation算子来清理老数据。例如,你用了一个窗口操作处理最近10分钟的数据,那么Spark Streaming会保留至少10分钟的数据,并且会主动把更早的数据都删掉。当然,你可以设置 streamingContext.remember 以保留更长时间段的数据(比如:你可能会需要交互式地查询更老的数据)。
  • CMS垃圾回收器(CMS Garbage Collector): 为了尽量减少GC暂停的时间,我们墙裂建议使用CMS垃圾回收器(concurrent mark-and-sweep GC)。虽然CMS GC会稍微降低系统的总体吞吐量,但我们仍建议使用它,因为CMS GC能使批次处理的时间保持在一个比较恒定的水平上。最后,你需要确保在驱动器(通过spark-submit中的–driver-java-options设置)和执行器(使用spark.executor.extraJavaOptions配置参数)上都设置了CMS GC。
  • 其他提示: 如果还想进一步减少GC开销,以下是更进一步的可以尝试的手段:
    • 配合Tachyon使用堆外内存来持久化RDD。详见Spark编程指南()
    • 使用更多但是更小的执行器进程。这样GC压力就会分散到更多的JVM堆中。


容错语义

本节中,我们将讨论Spark Streaming应用在出现失败时的具体行为。

背景

要理解Spark Streaming所提供的容错语义,我们首先需要回忆一下Spark RDD所提供的基本容错语义。

  1. RDD是不可变的,可重算的,分布式数据集。每个RDD都记录了其创建算子的血统信息,其中每个算子都以可容错的数据集作为输入数据。
  2. 如果RDD的某个分区因为节点失效而丢失,则该分区可以根据RDD的血统信息以及相应的原始输入数据集重新计算出来。
  3. 假定所有RDD transformation算子计算过程都是确定性的,那么通过这些算子得到的最终RDD总是包含相同的数据,而与Spark集群的是否故障无关。

Spark主要操作一些可容错文件系统的数据,如:HDFS或S3。因此,所有从这些可容错数据源产生的RDD也是可容错的。然而,对于Spark Streaming并非如此,因为多数情况下Streaming需要从网络远端接收数据,这回导致Streaming的数据源并不可靠(尤其是对于使用了fileStream的应用)。要实现RDD相同的容错属性,数据接收就必须用多个不同worker节点上的Spark执行器来实现(默认副本因子是2)。因此一旦出现故障,系统需要恢复两种数据:

  1. 接收并保存了副本的数据 – 数据不会因为单个worker节点故障而丢失,因为有副本!
  2. 接收但尚未保存副本数据 – 因为数据并没有副本,所以一旦故障,只能从数据源重新获取。

此外,还有两种可能的故障类型需要考虑:

  1. Worker节点故障 – 任何运行执行器的worker节点一旦故障,节点上内存中的数据都会丢失。如果这些节点上有接收器在运行,那么其包含的缓存数据也会丢失。
  2. Driver节点故障 – 如果Spark Streaming的驱动节点故障,那么很显然SparkContext对象就没了,所有执行器及其内存数据也会丢失。

有了以上这些基本知识,下面我们就进一步了解一下Spark Streaming的容错语义。

定义

流式系统的可靠度语义可以据此来分类:单条记录在系统中被处理的次数保证。一个流式系统可能提供保证必定是以下三种之一(不管系统是否出现故障):

  1. 至多一次(At most once): 每条记录要么被处理一次,要么就没有处理。
  2. 至少一次(At least once): 每条记录至少被处理过一次(一次或多次)。这种保证能确保没有数据丢失,比“至多一次”要强。但有可能出现数据重复。
  3. 精确一次(Exactly once): 每条记录都精确地只被处理一次 – 也就是说,既没有数据丢失,也不会出现数据重复。这是三种保证中最强的一种。

基础语义

任何流式处理系统一般都会包含以下三个数据处理步骤:

  1. 数据接收(Receiving the data): 从数据源拉取数据。
  2. 数据转换(Transforming the data): 将接收到的数据进行转换(使用DStream和RDD transformation算子)。
  3. 数据推送(Pushing out the data): 将转换后最终数据推送到外部文件系统,数据库或其他展示系统。

如果Streaming应用需要做到端到端的“精确一次”的保证,那么就必须在以上三个步骤中各自都保证精确一次:即,每条记录必须,只接收一次、处理一次、推送一次。下面让我们在Spark Streaming的上下文环境中来理解一下这三个步骤的语义:

  1. 数据接收: 不同数据源提供的保证不同,下一节再详细讨论。
  2. 数据转换: 所有的数据都会被“精确一次”处理,这要归功于RDD提供的保障。即使出现故障,只要数据源还能访问,最终所转换得到的RDD总是包含相同的内容。
  3. 数据推送: 输出操作默认保证“至少一次”的语义,是否能“精确一次”还要看所使用的输出算子(是否幂等)以及下游系统(是否支持事务)。不过用户也可以开发自己的事务机制来实现“精确一次”语义。这个后续会有详细讨论。

接收数据语义

不同的输入源提供不同的数据可靠性级别,从“至少一次”到“精确一次”。

从文件接收数据

如果所有的输入数据都来源于可容错的文件系统,如HDFS,那么Spark Streaming就能在任何故障中恢复并处理所有的数据。这种情况下就能保证精确一次语义,也就是说不管出现什么故障,所有的数据总是精确地只处理一次,不多也不少。

基于接收器接收数据

对于基于接收器的输入源,容错语义将同时依赖于故障场景和接收器类型。前面也已经提到过,spark Streaming主要有两种类型的接收器:

  1. 可靠接收器 – 这类接收器会在数据接收并保存好副本后,向可靠数据源发送确认信息。这类接收器故障时,是不会给缓存的(已接收但尚未保存副本)数据发送确认信息。因此,一旦接收器重启,没有收到确认的数据,会重新从数据源再获取一遍,所以即使有故障也不会丢数据。
  2. 不可靠接收器 – 这类接收器不会发送确认信息,因此一旦worker和driver出现故障,就有可能会丢失数据。

对于不同的接收器,我们可以获得如下不同的语义。如果一个worker节点故障了,对于可靠接收器来书,不会有数据丢失。而对于不可靠接收器,缓存的(接收但尚未保存副本)数据可能会丢失。如果driver节点故障了,除了接收到的数据之外,其他的已经接收且已经保存了内存副本的数据都会丢失,这将会影响有状态算子的计算结果。

为了避免丢失已经收到且保存副本的数,从 spark 1.2 开始引入了WAL(write ahead logs),以便将这些数据写入到可容错的存储中。只要你使用可靠接收器,同时启用WAL(),那么久再也不用为数据丢失而担心了。并且这时候,还能提供“至少一次”的语义保证。

下表总结了故障情况下的各种语义:

部署场景 Worker 故障 Driver 故障
Spark 1.1及以前版本 或者
Spark 1.2及以后版本,且未开启WAL
若使用不可靠接收器,则可能丢失缓存(已接收但尚未保存副本)数据;
若使用可靠接收器,则没有数据丢失,且提供至少一次处理语义
若使用不可靠接收器,则缓存数据和已保存数据都可能丢失;
若使用可靠接收器,则没有缓存数据丢失,但已保存数据可能丢失,且不提供语义保证
Spark 1.2及以后版本,并启用WAL 若使用可靠接收器,则没有数据丢失,且提供至少一次语义保证 若使用可靠接收器和文件,则无数据丢失,且提供至少一次语义保证

从Kafka Direct API接收数据

从Spark 1.3开始,我们引入Kafka Direct API,该API能为Kafka数据源提供“精确一次”语义保证。有了这个输入API,再加上输出算子的“精确一次”保证,你就能真正实现端到端的“精确一次”语义保证。(改功能截止Spark 1.6.1还是实验性的)更详细的说明见:。

输出算子的语义

输出算子(如 foreachRDD)提供“至少一次”语义保证,也就是说,如果worker故障,单条输出数据可能会被多次写入外部实体中。不过这对于文件系统来说是可以接受的(使用saveAs***Files 多次保存文件会覆盖之前的),所以我们需要一些额外的工作来实现“精确一次”语义。主要有两种实现方式:

  • 幂等更新(Idempotent updates): 就是说多次操作,产生的结果相同。例如,多次调用saveAs***Files保存的文件总是包含相同的数据。
  • 事务更新(Transactional updates): 所有的更新都是事务性的,这样一来就能保证更新的原子性。以下是一种实现方式:
    • 用批次时间(在foreachRDD中可用)和分区索引创建一个唯一标识,该标识代表流式应用中唯一的一个数据块。
    • 基于这个标识建立更新事务,并使用数据块数据更新外部系统。也就是说,如果该标识未被提交,则原子地将标识代表的数据更新到外部系统。否则,就认为该标识已经被提交,直接忽略之。
      dstream.foreachRDD { (rdd, time) =>  rdd.foreachPartition { partitionIterator =>    val partitionId = TaskContext.get.partitionId()    val uniqueId = generateUniqueId(time.milliseconds, partitionId)    // 使用uniqueId作为事务的唯一标识,基于uniqueId实现partitionIterator所指向数据的原子事务提交  }}


迁移指南 – 从0.9.1及以下升级到1.x

在Spark 0.9.1和Spark 1.0之间,有一些API接口变更,变更目的是为了保障未来版本API的稳定。本节将详细说明一下从已有版本迁移升级到1.0所需的工作。

输入DStream(Input DStreams): 所有创建输入流的算子(如:StreamingContext.socketStream, FlumeUtils.createStream 等)的返回值不再是DStream(对Java来说是JavaDStream),而是  / (对Java来说是 /  / / )。这样才能确保特定输入流的功能能够在未来持续增加到这些class中,而不会打破二进制兼容性。注意,已有的Spark Streaming应用应该不需要任何代码修改(新的返回类型都是DStream的子类),只不过需要基于Spark 1.0重新编译一把。

定制网络接收器(Custom Network Receivers): 自从Spark Streaming发布以来,Scala就能基于NetworkReceiver来定制网络接收器。但由于错误处理和汇报API方便的限制,该类型不能在Java中使用。所以Spark 1.0开始,用  来替换掉这个NetworkReceiver,主要的好处如下:

  • 该类型新增了stop和restart方法,便于控制接收器的生命周期。详见。
  • 定制接收器用Scala和Java都能实现。

为了将已有的基于NetworkReceiver的自定义接收器迁移到Receiver上来,你需要如下工作:

  • 首先你的自定义接收器类型需要从 继承,而不再是org.apache.spark.streaming.dstream.NetworkReceiver。
  • 原先,我们需要在自定义接收器中创建一个BlockGenerator来保存接收到的数据。你必须显示的实现onStart() 和 onStop() 方法。而在新的Receiver class中,这些都不需要了,你只需要调用它的store系列的方法就能将数据保存到Spark中。所以你接下来需要做的迁移工作就是,删除BlockGenerator对象(这个类型在Spark 1.0之后也没有了~),然后用store(…)方法来保存接收到的数据。

基于Actor的接收器(Actor-based Receivers): 从actor class继承后,并实 后,即可从Akka Actors中获取数据。获取数据的类被重命名为   ,而保存数据的pushBlocks(…)方法也被重命名为 store(…)。其他org.apache.spark.streaming.receivers包中的工具类也被移到   包下并重命名,新的类名应该比之前更加清晰。



下一步

  • 其他相关参考文档
  • API文档
    • Scala 文档
      •  和 
      • , , , , , 以及 
    • Java 文档
      • ,  以及 
      • , ,  , , 以及 
    • Python 文档
      •  和 
  • 其他示例: , 以及 
  • Spark Streaming相关的  和 。
转载自 
你可能感兴趣的文章
jdk6.0 + Tomcat6.0的简单jsp,Servlet,javabean的调试
查看>>
Android:apk签名
查看>>
2(2).选择排序_冒泡(双向循环链表)
查看>>
MySQL 索引 BST树、B树、B+树、B*树
查看>>
微信支付
查看>>
CodeBlocks中的OpenGL
查看>>
短址(short URL)
查看>>
第十三章 RememberMe——《跟我学Shiro》
查看>>
mysql 时间函数 时间戳转为日期
查看>>
索引失效 ORA-01502
查看>>
Oracle取月份,不带前面的0
查看>>
Linux Network Device Name issue
查看>>
IP地址的划分实例解答
查看>>
如何查看Linux命令源码
查看>>
运维基础命令
查看>>
入门到进阶React
查看>>
SVN 命令笔记
查看>>
检验手机号码
查看>>
重叠(Overlapped)IO模型
查看>>
Git使用教程
查看>>