1. 目的
在Spark中,RDD之间的所有依赖关系都将记录在一张图表中,这就是我们在Spark中称的血统。我们将介绍在Spark逻辑执行计划中包含的RDD lineage概念,如何通过toDebugString方法获取RDD Lineage。
2. RDD介绍
RDD是“弹性分布式数据集(Resilient Distributed Dataset)”的首字母缩写,是Spark的基础数据结构,具体来说,RDD是Spark中不可变对象的集合。不可变对象集合有助于在群集的不同节点上进行计算。
弹性:
意味着容错,通过使用RDD血统,我们可以重新计算由于节点故障而丢失或损坏的分区。
分布式:
意味着数据驻留在多个节点上。
数据集:
是使用的数据的记录,可以从外部加载数据集,例如:JSON文件、CSV文件、文本文件或数据库。
3. RDD Lineage介绍
RDD本质上是延迟执行的,这意味着在RDD上执行一系列转换,这些转换不会立即执行。
我们从现有的RDD创建新的RDD,新的RDD带有指向父RDD的指针,这与图中记录的RDD之间的所有依赖关系相同,这就是我们所说的Lineage图。RDD Lineage是RDD的所有父RDD的图表,我们也称之为RDD依赖图。具体而言,它是将转换应用于Spark的输出,然后创建一个逻辑执行计划。
例如:
上图描绘了一个RDD图,它是以下一系列转换的结果:
val r00 = sc.parallelize(0 to 9) val r01 = sc.parallelize(0 to 90 by 10) val r10 = r00 filter(_>0) val r11 = r00.map(n => (n, n)) val r12 = r00 zip r01 val r13 = r01.keyBy(_ / 20) val r20 = Seq(r11, r12, r13).foldLeft(r10)(_ union _)
在一个动作被调用之后,这是一个需要执行什么转换的图表。
换句话说,无论何时在现有RDD的基础上,我们创建新的RDD,Spark使用Lineage管理这些依赖关系,基本上,每个RDD维护一个或多个父指针。
例如:
val b=a.map()
RDD b保留对其父RDD a的引用,这是一种RDD Lineage。
4. 使用toDebugString方法获取RDD Lineage
有几种方法可以在Spark中获取RDD Lineage,其中一种是调用toDebugString方法(toDebugString: String)。基本上,我们可以借助这个方法了解Spark RDD Lineage。
scala> val wordCount1 = sc.textFile(“README.md”).flatMap(_.split(“\\s+”)).map((_, 1)).reduceByKey(_ + _) wordCount1: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[21] at reduceByKey at <console>:24 scala> wordCount1.toDebugString res13: String = (2) ShuffledRDD[21] at reduceByKey at <console>:24 [] +-(2) MapPartitionsRDD[20] at map at <console>:24 [] | MapPartitionsRDD[19] at flatMap at <console>:24 [] | README.md MapPartitionsRDD[18] at textFile at <console>:24 [] | README.md HadoopRDD[17] at textFile at <console>:24 [] scala> wordCount1.getNumPartitions res14: Int = 2
启用spark.logLineage属性后,当执行Action时能够自动调用toDebugString方法。
$ ./bin/spark-shell -conf spark.logLineage=true scala> sc.textFile(“README.md”, 4).count … 15/10/17 14:46:42 INFO SparkContext: Starting job: count at <console>:25 15/10/17 14:46:42 INFO SparkContext: RDD’s recursive dependencies: (4) MapPartitionsRDD[1] at textFile at <console>:25 [] | README.md HadoopRDD[0] at textFile at <console>:25 []