内容
1. 目的
Spark附带交互式shell/scala提示符,交互式shell可以运行不同的命令来处理数据。本文介绍了Spark shell交互的基本spark命令/操作。我们将讨论创建RDD并执行诸如map()、filter()、partitions()、cache()、count()、take()等各种转换和操作,Spark如何读取来自HDFS的数据并写入HDFS。
2. Spark Shell命令
启动Spark Shell:
$bin/spark-shell
2.1 创建RDD
2.1.1 从本地文件系统读取文件并创建一个RDD
scala> val dataRdd= sc.textFile("data.txt")
注:sc是SparkContext对象,我们首先需要在spark home下创建一个data.txt文件。
2.1.2 创建一个并行集合RDD
scala> val arr= Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) scala> val arrayRdd= sc.parallelize(arr)
2.1.3 从现有RDD创建新的RDD
scala> val newRdd = arrayRdd.map(data => (data * 2))
上面是创建RDD的三种方法,当数据在外部系统(如本地文件系统,HDFS,HBase,Cassandra,S3等)时,我们可以使用第一种方法,通过调用Spark Context的textFile方法来创建RDD,第二种方法可以与现有集合一起使用,第三种方法可以从现有RDD中创建新的RDD。
2.2 统计RDD中的记录数量
scala> dataRdd.count()
2.3 Filter
过滤RDD并创建包含单词“spark”的新RDD。
scala> val filterRdd= dataRdd.filter(line => line.contains("spark"))
2.4 Transformation and Action
对于复杂的需求,我们可以将多个操作链接在一起,如过滤器转换和计数操作:
scala> dataRdd.filter(line => line.contains("spark")).count()
2.5 读取RDD第一条记录
scala> dataRdd.first()
2.6 读取RDD前五条记录
scala> dataRdd.take(5)
2.7 RDD分区
RDD由多个分区组成,得到分区数量
scala> dataRdd.partitions.length
注意:RDD中的分区数量为2(默认情况下),当我们从HDFS文件创建RDD时,块的数量等于分区数量。
2.8 缓存文件
缓存是优化技术。 一旦我们将RDD缓存在内存中,所有将来的计算都将在内存数据上运行,这可以节省磁盘搜索并提高性能。
scala> dataRdd.cache()
2.9 从HDFS文件读数据
从HDFS文件读取数据,我们可以指定完整的HDFS URL,如hdfs://IP:PORT/PATH
scala> var hFile = sc.textFile("hdfs://localhost:9000/data.txt")
2.10 WordCount程序
MapReduce最受欢迎的程序 – WordCount,统计文件中可用的所有单词。
scala> val wc = hFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _) scala> wc.take(5)
显示前5条数据
2.11 写数据到HDFS文件
scala> wc.saveAsTextFile("hdfs://localhost:9000/out")
3. 结论
使用Spark Shell命令,我们可以创建RDD(三种方式),从RDD读取数据和分区,缓存文件,从HDFS文件读取数据和写入数据到HDFS文件,并使用Spark Shell命令对数据执行各种操作。