1. 目的
本章将介绍DataSet,Encoder以及它在DataSet中的重要性,为什么需要DataSet,DataSet的特点以及DataSet实例。
2. DataSet介绍
DataSet是分布式的数据集合,是从Spark 1.6开始引入的一个新的抽象。它集中了RDD的优点(强类型和强大的lambda函数)以及Spark SQL优化的执行引擎。DataSet可以通过JVM的对象进行构建,可以用函数式的转换(map、flatmap、filter)进行操作。
DataSet API在Scala和Java中都可以用,它通过Encoder实现了自定义的序列化格式,使得某些操作可以在无需序列化情况下进行,另外Dataset还进行了包括Tungsten优化在内的很多性能方面的优化。
Encoder是Spark SQL中序列化和反序列化框架的主要概念。Encoder在JVM对象和Spark的自定义二进制格式之间进行转换,Spark具有非常先进的内置Encoder,它生成字节码与堆外数据进行交互。Encoder可以按需访问各个属性,而无需对整个对象进行反序列化。由于Encoder知道记录的模式(Schema),因此可以实现序列化和反序列化。
DataSet提供了RDD和DataFrame的功能,它提供:
1)RDD的便利性。
2)DataFrame的性能优化。
3)Scala的静态类型安全。
因此,DataSet提供了更多功能的编程接口来处理结构化数据。
3. 为什么需要 DataSet
为了克服RDD和DataFrame的限制,所以需要DataSet。在DataFrame中,没有提供编译时类型安全性,不知道结构,数据不能改变。在RDD中没有自动优化,我们在需要时手动完成优化。
4. DataSet的特点
1)优化的查询
DataSet使用Catalyst Query Optimizer和Tungsten提供优化查询。
2)编译时分析
使用DataSet,我们可以在编译时检查语法和分析,使用DataFrame,RDD或常规SQL查询是不可能的。
3)持久存储
DataSet是可序列化和可查询的,我们可以将其保存到持久存储中。
4)内部转换
我们可以将类型安全的DataSet转换为“无类型”的DataFrame。Datasetholder提供了三种从Seq [T]或RDD [T]类型转换为Dataset [T]的方法:
toDS(): Dataset[T]
toDF(): DataFrame
toDF(colNames: String*): DataFrame
5)更快的计算
DataSet的实现比RDD实现快的多,从而提高了系统的性能。要使RDD达到相同性能,用户需手动完成优化。
6)更少的内存消耗
在缓存时,DataSet会创建更优化的内存布局。
7)适用于Scala和Java 的统一API
它为Scala和Java提供了一个统一的接口。
5. DataSet Word Count实例
完整的代码如下:
package com.doudianyun.spark import org.apache.spark.sql.SparkSession object DatasetWordCount { def main(args: Array[String]) { val sparkSession = SparkSession.builder. master("local") .appName("DatasetWordCount") .getOrCreate() import sparkSession.implicits._ val data = sparkSession.read.text("/from/to/datafile").as[String] val words = data.flatMap(value => value.split("\\s+")) val groupedWords = words.groupByKey(_.toUpperCase) val count = groupedWords.count() count.show() } }
解析:
1)创建SparkSession
val sparkSession = SparkSession.builder.master("local").appName("DatasetWordCount").getOrCreate()
2)读取数据并将它转换成DataSet
import sparkSession.implicits._ val data = sparkSession.read.text("/from/to/datafile").as[String]
上面代码中,我们使用read.text来读取数据,as[String]可以为DataSet提供相关的模式。
3)分割单词并且对单词进行分组
val words = data.flatMap(value => value.split("\\s+")) val groupedWords = words.groupByKey(_.toUpperCase)
以上代码中,我们没有创建出一个key/value键值对,因为DataSet是工作在行级别的抽象,每个值将被看作是带有多列的行数据,并且每个值都可以看作是group的key。
4)计数
val count = groupedWords.count()
以上代码中,使用count方法对每个单词进行计数。
5)打印结果
count.show()
以上代码中,用show来触发计算,显示结果。