创建RDD

1. 目的

了解在Spark中创建RDD的三种方法 – 从程序中的集合创建RDD、从外部存储系统创建RDD、从现有RDD创建RDD。

2. 三种方法创建RDD

1) 从程序中的集合创建RDD

2) 从外部存储系统创建RDD(例如:HDFS、HBase等)。

3) 从现有的RDD创建RDD。

2.1 从程序中的集合创建RDD

在学习Spark的初始阶段,通常是通过程序中的集合创建RDD,即把程序中的集合传递给SparkContext的parallelize()方法。此方法用于学习Spark的初始阶段,因为它可以快速在Spark shell中创建自己的RDD并对它们执行操作。此方法很少在测试和原型之外使用,因为此方法需要在一台机器上存储整个数据集。

实例:

val data = sc.parallelize(Seq(("maths",52),("english",75),("science",82), ("computer",65),("maths",85)))
val sorted = data.sortByKey()
sorted.foreach(println)

使用parallelize方法注意的关键点是数据集中的分区数量,Spark将为每个分区运行一个任务。 Spark根据我们的集群设置分区数量,但是我们也可以通过将分区数作为第二个参数来手动设置分区的数量。

在下面的实例,我们手动给出分区数为3。

val rdd1 = sc.parallelize(Array("jan","feb","mar","april","may","jun"),3)
val result = rdd1.coalesce(2)
result.foreach(println)

2.2 从外部存储系统创建RDD

在Spark中,数据集可以由Hadoop支持的任何数据源构成,包括本地文件系统、HDFS、Cassandra、HBase等。

实例:

1)csv

从HDFS中加载一个CSV文件并将结果作为Dataset<Row>返回。

import org.apache.spark.sql.SparkSession
def main(args: Array[String]):Unit = {
object DataFormat {
val spark =  SparkSession.builder.appName("AvgAnsTime").master("local").getOrCreate()
val dataRDD = spark.read.csv("hdfs:///path/of/csv/file").rdd

2)json

从HDFS中加载一个JSON文件(每行一个对象)并将结果作为Dataset<Row>返回

val dataRDD = spark.read.json("hdfs:///path/of/json/file").rdd

3)textFile

从HDFS中加载文本文件并返回字符串数据集。

val dataRDD = spark.read.textFile("hdfs:///path/of/text/file").rdd

2.3 从现有的RDD创建RDD

转换(transformation)是从现有的RDD创建新的RDD,现有的RDD没有变化。以下列出一些转换:map、flatMap、filter、union、join、cogroup、cartesian等。

实例:

val words = sc.parallelize(Seq("the", "quick", "brown", "fox", "jumps", "over", "the", "lazy", "dog"))
val wordPairs = words.map(w => (w.charAt(0), w))
wordPairs.foreach(println)

在上面的代码中,“wordPairs”RDD是从现有的“words”RDD创建的。

创建RDD
滚动到顶部