Programming Spark: RDD 本质论

函数式编程 · horance · 于 发布 · 933 次阅读
3

Essential RDD

RDD(Resilient Distributed Dataset),是Spark最令人青睐的抽象,是Spark设计的核心。其本质是一个只读的分区记录,并能够被并行操作的集合,它具有如下几方面的特点:

  • 分区:可分区的数据(partitions);
  • 函数式:只读的,不可变的,惰性求值,并行处理
  • 容错:分区数据的自动恢复;
  • 序列化

特征

RDD具有5个最基本的特征或属性:

  • 分区集合(splits)
  • 依赖的RDD集合
  • 分区的计算方法
  • Preferred Locations(可选)
  • Partitioner(可选)

可参阅RDD的源代码:

abstract class RDD[T: ClassTag](
    private var sc: SparkContext,
    private var deps: Seq[Dependency[_]]
  ) {

  def getPartitions: Array[Partition]
  def compute(split: Partition, context: TaskContext): Iterator[T]

  def getDependencies: Seq[Dependency[_]] = deps
  def getPreferredLocations(split: Partition): Seq[String] = Nil
  val partitioner: Option[Partitioner] = None
}

一个简单的例子

val file = sc.textFile("hdfs://...")

首先生成HadoopRDD,在进行了一次Transformation变换为MapPartitionsRDD

val errors = file.filter(_.contains("ERROR"))

filter操作再进行了一次Transformation变换为另外一个MapPartitionsRDD

HadoopRDD

  • partitions = one per HDFS block
  • dependencies = None
  • compute(partition) = read corresponding block
  • preferredLocations(part) = HDFS block location
  • partitioner = None

MapPartitionsRDD

  • partitions = same as parent RDD
  • dependencies = "one-to-one" on parent
  • compute(partition) = compute parent and filter it
  • preferredLocations(part) = None(ask parent)
  • partitioner = None

RDD的计算

RDD中的dependencies, 及其compute(partition)RDD最重要的两个特征函数,其描述了RDD的最基本的行为特征。

首先解读RDD的计算过程,依赖关系后文重点讲述。

函数原型

RDD是针对于一个Partition(分区)的,而且由一个Task负责执行的;也就是说,RDD的分区与Task具有一一对应的关系);其计算结果表示一个类型为T的集合。

为此compute的函数原型正如上文所示:

def compute(split: Partition, context: TaskContext): Iterator[T]

惰性求值

RDDcompute并没有执行真正的计算,它只描绘了计算过程的蓝图,而且计算之间是通过组合来完成的,是一种典型的函数式设计的思维。

依赖关系

RDD的依赖关系可分为两类:

  • Narrow Dependency: 一个父RDD之多被一个子RDD引用;
  • Shuffle/Wide Dependency: 一个父RDD被多个子RDD引用;

也就是说,Narrow Dependency的出度为1Shuffle/Wide Dependency的出度大于1

区分的意义

  • Narrow Dependency可以支持在同一个Cluster Node上以Pipeline的形式执行并发运算多条命令;
  • Narrow Dependency的数据容错性会更有效,它只需重新计算丢失了的父分区即可,并且可以并行地在不同节点上重计算。

Stage划分

DAGScheduler根据RDD之间的依赖关系,识别出Stage列表,并依次将Stage(TaskSet)提交至TaskScheduler进行调度执行。

也就是说,DAGScheduler最终的职责就是完成Stage的划分算法。

划分准则

  • Wide Dependency是划分Stage的边界;
  • Narrow DependencyRDD被放在同一个Stage之中;

一个简单的例子

以下图为例,讲解DAG的划分算法。GFinalRDD,从后往前按照RDD的依赖关系,进行深度遍历算法,依次识别出各个Stage的起始边界。

  • Stage 3的划分:
  1. GB之间是Narrow Dependency,规约为同一Stage(3);
  2. BA之间是Wide DependencyA为新的FinalRDD,递归调用此过程;
  3. GF之间是Wide DependencyF为新的FinalRDD,递归调用此过程;
  • Stage 1的划分
  1. A没有父亲RDDStage(1)划分结束。特殊地Stage(1)仅包含RDD A
  • Stage 2的划分:
  1. RDD之间的关系都为Narrow Dependency,规约为同一个Stage(2);
  2. 直至RDD C,因没有父亲RDDStage(2)划分结束;

最终,形成了Stage的依赖关系,按照广度优先遍历算法,依次提交Stage(TaskSet)TaskScheduler进行调度执行。

算法解读

思考Stage划分的3个基本的问题:

  1. 如何确定Stage的起始边界?

    • 开始:读外部数据源,或读Shuffle
    • 结束:发生Shuffle写,或者Job结束;
  2. 如何确定FinalStage

    • 触发ActionRDD所在的Stage
  3. 如何表示一个Stage

private[spark] abstract class Stage(
    val id: Int,
    val rdd: RDD[_],
    val numTasks: Int)

其中rddStage中最后一个RDD,可反向推演出完整的StagenumTasks表示Stage并发执行的的任务数,等于RDD的分区数。

生命周期

总结一下,一个应用程序的整个生命周期如下图所示:

  • 根据RDD的依赖关系,由DAGScheduler识别出Stage列表;
  • DAGScheduler依次将Stage(TaskSet)提交至TaskScheduler执行;
  • TaskScheduler选择合适的Worker,将Task提交至Executor上计算;
  • Executor从线程池中分配一个空闲的线程执行此Task

「软件匠艺社区」旨在传播匠艺精神,通过分享好的「工作方式」和「习惯」以帮助程序员更加快乐高效地编程。
本帖已被设为精华帖!
暂无回复。
需要 登录 后回复方可回复, 如果你还没有账号你可以 注册 一个帐号。