Spark Shell命令

1. 目的

Spark附带交互式shell/scala提示符,交互式shell可以运行不同的命令来处理数据。本文介绍了Spark shell交互的基本spark命令/操作。我们将讨论创建RDD并执行诸如map()、filter()、partitions()、cache()、count()、take()等各种转换和操作,Spark如何读取来自HDFS的数据并写入HDFS。

2. Spark Shell命令

启动Spark Shell:

$bin/spark-shell

2.1 创建RDD

2.1.1 从本地文件系统读取文件并创建一个RDD

scala> val dataRdd= sc.textFile("data.txt")

注:sc是SparkContext对象,我们首先需要在spark home下创建一个data.txt文件。

2.1.2 创建一个并行集合RDD

scala> val arr= Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> val arrayRdd= sc.parallelize(arr)

2.1.3 从现有RDD创建新的RDD

scala> val newRdd = arrayRdd.map(data => (data * 2))

上面是创建RDD的三种方法,当数据在外部系统(如本地文件系统,HDFS,HBase,Cassandra,S3等)时,我们可以使用第一种方法,通过调用Spark Context的textFile方法来创建RDD,第二种方法可以与现有集合一起使用,第三种方法可以从现有RDD中创建新的RDD。

2.2 统计RDD中的记录数量

scala> dataRdd.count()

2.3 Filter

过滤RDD并创建包含单词“spark”的新RDD。

scala> val filterRdd= dataRdd.filter(line => line.contains("spark"))

2.4 Transformation and Action

对于复杂的需求,我们可以将多个操作链接在一起,如过滤器转换和计数操作:

scala> dataRdd.filter(line => line.contains("spark")).count()

2.5 读取RDD第一条记录

scala> dataRdd.first()

2.6 读取RDD前五条记录

scala> dataRdd.take(5)

2.7 RDD分区

RDD由多个分区组成,得到分区数量

scala> dataRdd.partitions.length

注意:RDD中的分区数量为2(默认情况下),当我们从HDFS文件创建RDD时,块的数量等于分区数量。

2.8 缓存文件

缓存是优化技术。 一旦我们将RDD缓存在内存中,所有将来的计算都将在内存数据上运行,这可以节省磁盘搜索并提高性能。

scala> dataRdd.cache()

2.9 从HDFS文件读数据

从HDFS文件读取数据,我们可以指定完整的HDFS URL,如hdfs://IP:PORT/PATH

scala> var hFile = sc.textFile("hdfs://localhost:9000/data.txt")

2.10 WordCount程序

MapReduce最受欢迎的程序 – WordCount,统计文件中可用的所有单词。

scala> val wc = hFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
scala> wc.take(5)

显示前5条数据

2.11 写数据到HDFS文件

scala> wc.saveAsTextFile("hdfs://localhost:9000/out")

3. 结论

使用Spark Shell命令,我们可以创建RDD(三种方式),从RDD读取数据和分区,缓存文件,从HDFS文件读取数据和写入数据到HDFS文件,并使用Spark Shell命令对数据执行各种操作。

Spark Shell命令
滚动到顶部