RDD操作实例

1. RDD操作

RDD支持两种类型的操作:转换(Transformation)和行动(Action)。

2. RDD转换(Transformation)

转换是将RDD作为输入并生成新RDD作为输出的函数,转换不改变输入RDD,通过转换函数生成新的RDD,例如:map()、filter()、reduceByKey()等。转换是延迟操作,当调用行动(Action)时创建新的RDD。

转换后,生成的RDD始终与其父RDD不同。它可以更小(例如filter、distinct、sample),更大(例如flatMap、union、cartesian)或相同的大小(例如map)。

每个转换操作都会生成一个新的RDD,所以RDD之间就会形成类似流水线的前后依赖关系,RDD有两种依赖:窄依赖和宽依赖,窄依赖可以在一个阶段(Stage)中进行流水线处理,这是一种优化方法用于提高计算性能。

1) 窄依赖

指每个父RDD的一个Partition最多被子RDD的一个Partition所使用,例如map、flatMap、filter、union等操作都会产生窄依赖。

2) 宽依赖

指一个父RDD的Partition会被多个子RDD的Partition所使用,例如groupByKey、reduceByKey、join等操作都会产生宽依赖。

实例:

2.1 map

map[U](f: (T) ? U)(implicit arg0: ClassTag[U]): RDD[U]

对该RDD的所有元素应用函数f,返回新的RDD。

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
object  mapTest{
def main(args: Array[String]) = {
val spark = SparkSession.builder.appName("mapExample").master("local").getOrCreate()
val data = spark.read.textFile("spark_test.txt").rdd
val mapFile = data.map(line => (line,line.length))
mapFile.foreach(println)
}
}

在上面的代码中,map函数将文件的每一行与其长度进行映射。

spark_test.txt的内容如下:

hello…user! this file is created to check the operations of spark.

and how can we apply functions on that RDD partitions?. All this will be done through spark programming which is done with the help of scala language support…

2.2 flatMap

flatMap[U](f: (T) ? TraversableOnce[U])(implicit arg0: ClassTag[U]): RDD[U]

对该RDD的所有元素应用函数f,然后展平结果,返回新的RDD。

val data = spark.read.textFile("spark_test.txt").rdd
val flatmapFile = data.flatMap(lines => lines.split(" "))
flatmapFile.foreach(println)

在上面的代码中,flatMap函数将文件的每一行按照空格分隔成多个单词。

2.3 filter

filter(f: (T) ? Boolean): RDD[T]

返回仅包含满足谓词的元素的新RDD。

val data = spark.read.textFile("spark_test.txt").rdd
val mapFile = data.flatMap(lines => lines.split(" ")).filter(value => value=="spark")
println(mapFile.count())

在上面的代码中,flatMap函数将文件的每一行按照空格分隔成多个单词,然后过滤单词“spark”,最后使用count()对单词“spark”进行计数。

2.4 mapPartitions

mapPartitions[U](f: (Iterator[T]) ? Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]

通过将函数f应用于此RDD的每个分区来返回新的RDD。

mapPartitions和map类似,只不过映射的参数由RDD的每个元素变成了RDD中的每一个分区的迭代器。

preservesPartitioning表示是否保留父RDD的partitioner分区信息,preservesPartitioning默认是false。如果这个RDD是一个Pair RDD,而且函数f不改变key,我们可以设置preservesPartitioning为true。

2.5 mapPartitionWithIndex

mapPartitionsWithIndex[U](f: (Int, Iterator[T]) ? Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]

mapPartitionWithIndex类似与mapPartitions,只是输出参数多了一个分区索引。

2.6 union

union(other: RDD[T]): RDD[T]

返回此RDD和另一个的联合。任何相同的元素将出现多次(使用distinct()消除重复元素)。

val rdd1 = spark.sparkContext.parallelize(Seq((1,"jan",2016),(3,"nov",2014),(16,"feb",2014)))
val rdd2 = spark.sparkContext.parallelize(Seq((5,"dec",2014),(17,"sep",2015)))
val rdd3 = spark.sparkContext.parallelize(Seq((6,"dec",2011),(16,"may",2015)))
val rddUnion = rdd1.union(rdd2).union(rdd3)
rddUnion.foreach(println)

2.7 intersection 

intersection(other: RDD[T]): RDD[T]

返回此RDD与另一个RDD的交集,输出不会包含重复的元素。

val rdd1 = spark.sparkContext.parallelize(Seq((1,"jan",2016),(3,"nov",2014, (16,"feb",2014)))
val rdd2 = spark.sparkContext.parallelize(Seq((5,"dec",2014),(1,"jan",2016)))
val rdd3= rdd1.intersection(rdd2)
rdd3.foreach(println)

2.8 distinct 

distinct(): RDD[T]

去除RDD重复的元素,返回所有元素不重复的RDD。

val rdd1 = park.sparkContext.parallelize(Seq((1,"jan",2016),(3,"nov",2014),(16,"feb",2014),(3,"nov",2014)))
val result = rdd1.distinct()
println(result.collect().mkString(", "))

将删除重复的元素 (3,"nov",2014)

2.9 groupByKey 

groupByKey(): RDD[(K, Iterable[V])]

将RDD[K,V]中每个键K对应的值V合并到一个集合Iterable[V]中。

val data = spark.sparkContext.parallelize(Array(('k',5),('s',3),('s',4),('p',7),('p',5),('t',8),('k',6)),3)
val group = data.groupByKey().collect()
group.foreach(println)

2.10 reduceByKey 

reduceByKey(func: (V, V) ? V): RDD[(K, V)]

将RDD[K,V]中每个键K对应的值V根据映射函数来运算。

val words = Array("one","two","two","four","five","six","six","eight","nine","ten")
val data = spark.sparkContext.parallelize(words).map(w => (w,1)).reduceByKey(_+_)
data.foreach(println)

2.11 sortByKey 

sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length): RDD[(K, V)]

按键对RDD进行排序,每个分区都是有序的。

val data = spark.sparkContext.parallelize(Seq(("maths",52), ("english",75), ("science",82), ("computer",65), ("maths",85)))
val sorted = data.sortByKey()
sorted.foreach(println)

在上面的代码中,sortByKey函数将数据RDD按键(字符串)的升序排序。

2.12 join 

join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]

返回一个RDD,包含所有在这两个RDD中具有匹配键的元素对。

val data = spark.sparkContext.parallelize(Array(('A',1),('b',2),('c',3)))
val data2 =spark.sparkContext.parallelize(Array(('A',4),('A',6),('b',7),('c',3),('c',8)))
val result = data.join(data2)
println(result.collect().mkString(","))

2.13 coalesce 

coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null): RDD[T]

返回一个减少到numPartitions个分区的新RDD。coalesce使用HashPartitioner进行重分区,第一个参数为重分区数目,第二个为是否进行shuffle,默认为false。

val rdd1 = spark.sparkContext.parallelize(Array("jan","feb","mar","april","may","jun"),3)
val result = rdd1.coalesce(2)
result.foreach(println)

3. RDD行动(Action)

当RDD执行Action时候才会触发作业提交,执行相应的计算操作,例如:first()、take()、reduce()、collect()、count()、saveAsTextFile()、saveAsSequenceFile()等。

3.1 count

count(): Long

返回RDD中元素的数量。

val data = spark.read.textFile("spark_test.txt").rdd
val mapFile = data.flatMap(lines => lines.split(" ")).filter(value => value=="spark")
println(mapFile.count())

3.2 collect 

collect(): Array[T]

返回包含此RDD中所有元素的数组。

val data = spark.sparkContext.parallelize(Array(('A',1),('b',2),('c',3)))
val data2 =spark.sparkContext.parallelize(Array(('A',4),('A',6),('b',7),('c',3),('c',8)))
val result = data.join(data2)
println(result.collect().mkString(","))

3.3 take 

take(num: Int): Array[T]

返回包含此RDD中前num个元素的数组

val data = spark.sparkContext.parallelize(Array(('k',5),('s',3),('s',4),('p',7),('p',5),('t',8),('k',6)),3)
val twoRec = data.take(2)
twoRec.foreach(println)

3.4 top

top(num: Int)(implicit ord: Ordering[T]): Array[T]

按照默认(降序)或者指定的排序规则,返回前num个元素。

val data = spark.read.textFile("spark_test.txt").rdd
val mapFile = data.map(line => (line,line.length))
val res = mapFile.top(3)
res.foreach(println)

3.5 reduce 

reduce(f: (T, T) ? T): T

根据映射函数f,对RDD中的元素进行二元计算。

val rdd1 = spark.sparkContext.parallelize(List(20,32,45,62,8,5))
val sum = rdd1.reduce(_+_)
println(sum)

3.6 fold 

fold(zeroValue: T)(op: (T, T) ? T): T

使用给定的函数op和初始zeroValue值聚合每个分区的元素,然后聚合所有分区的结果。

val rdd1 = spark.sparkContext.parallelize(List(("maths", 80),("science", 90)))
val additionalMarks = ("extra", 4)
val sum = rdd1.fold(additionalMarks){ (acc, marks) => val add = acc._2 + marks._2
("total", add)
}
println(sum)

3.7 aggregate 

aggregate[U](zeroValue: U)(seqOp: (U, T) ? U, combOp: (U, U) ? U)(implicit arg0: ClassTag[U]): U

先使用seqOp将RDD中每个分区中的T类型元素聚合成U类型,再使用combOp将之前每个分区聚合后的U类型聚合成U类型。注意:seqOp和combOp都会使用zeroValue值。fold是aggregate的简化,将aggregate中的seqOp和combOp使用同一个函数op。

3.8 foreach 

foreach(f: (T) ? Unit): Unit

将函数f应用于此RDD的所有元素。

val data = spark.sparkContext.parallelize(Array(('k',5),('s',3),('s',4),('p',7),('p',5),('t',8),('k',6)),3)
val group = data.groupByKey().collect()
group.foreach(println)

4. 结论

在向RDD应用transformation时会创建新的RDD,在RDD上应用action时会计算结果,这种延迟计算减少了计算开销,使系统更加高效。

RDD操作实例
滚动到顶部