Spark SQL DataSet

1. 目的

本章将介绍DataSet,Encoder以及它在DataSet中的重要性,为什么需要DataSet,DataSet的特点以及DataSet实例。

2. DataSet介绍

DataSet是分布式的数据集合,是从Spark 1.6开始引入的一个新的抽象。它集中了RDD的优点(强类型和强大的lambda函数)以及Spark SQL优化的执行引擎。DataSet可以通过JVM的对象进行构建,可以用函数式的转换(map、flatmap、filter)进行操作。

DataSet API在Scala和Java中都可以用,它通过Encoder实现了自定义的序列化格式,使得某些操作可以在无需序列化情况下进行,另外Dataset还进行了包括Tungsten优化在内的很多性能方面的优化。

Encoder是Spark SQL中序列化和反序列化框架的主要概念。Encoder在JVM对象和Spark的自定义二进制格式之间进行转换,Spark具有非常先进的内置Encoder,它生成字节码与堆外数据进行交互。Encoder可以按需访问各个属性,而无需对整个对象进行反序列化。由于Encoder知道记录的模式(Schema),因此可以实现序列化和反序列化。

DataSet提供了RDD和DataFrame的功能,它提供:

1)RDD的便利性。

2)DataFrame的性能优化。

3)Scala的静态类型安全。

因此,DataSet提供了更多功能的编程接口来处理结构化数据。

3. 为什么需要 DataSet

为了克服RDD和DataFrame的限制,所以需要DataSet。在DataFrame中,没有提供编译时类型安全性,不知道结构,数据不能改变。在RDD中没有自动优化,我们在需要时手动完成优化。

4. DataSet的特点

1)优化的查询

DataSet使用Catalyst Query Optimizer和Tungsten提供优化查询。

2)编译时分析

使用DataSet,我们可以在编译时检查语法和分析,使用DataFrame,RDD或常规SQL查询是不可能的。

3)持久存储

DataSet是可序列化和可查询的,我们可以将其保存到持久存储中。

4)内部转换

我们可以将类型安全的DataSet转换为“无类型”的DataFrame。Datasetholder提供了三种从Seq [T]或RDD [T]类型转换为Dataset [T]的方法:

toDS(): Dataset[T]

toDF(): DataFrame

toDF(colNames: String*): DataFrame

5)更快的计算

DataSet的实现比RDD实现快的多,从而提高了系统的性能。要使RDD达到相同性能,用户需手动完成优化。

6)更少的内存消耗

在缓存时,DataSet会创建更优化的内存布局。

7)适用于Scala和Java 的统一API

它为Scala和Java提供了一个统一的接口。

5. DataSet Word Count实例

完整的代码如下:

package com.doudianyun.spark
import org.apache.spark.sql.SparkSession
object DatasetWordCount {
   def main(args: Array[String]) {
     val sparkSession = SparkSession.builder.
      master("local")
      .appName("DatasetWordCount")
      .getOrCreate()
     import sparkSession.implicits._
     val data = sparkSession.read.text("/from/to/datafile").as[String]
     val words = data.flatMap(value => value.split("\\s+"))
     val groupedWords = words.groupByKey(_.toUpperCase)
     val count = groupedWords.count()
     count.show()
   }
 }

解析:

1)创建SparkSession

val sparkSession = SparkSession.builder.master("local").appName("DatasetWordCount").getOrCreate()

2)读取数据并将它转换成DataSet

import sparkSession.implicits._
val data = sparkSession.read.text("/from/to/datafile").as[String]

上面代码中,我们使用read.text来读取数据,as[String]可以为DataSet提供相关的模式。

3)分割单词并且对单词进行分组

val words = data.flatMap(value => value.split("\\s+"))
val groupedWords = words.groupByKey(_.toUpperCase)

以上代码中,我们没有创建出一个key/value键值对,因为DataSet是工作在行级别的抽象,每个值将被看作是带有多列的行数据,并且每个值都可以看作是group的key。

4)计数

val count = groupedWords.count()

以上代码中,使用count方法对每个单词进行计数。

5)打印结果

count.show()

以上代码中,用show来触发计算,显示结果。

Spark SQL DataSet
滚动到顶部