Spark SQL优化

1. 目的

本章介绍Spark SQL优化框架,为什么使用Catalyst优化器,Catalyst 优化器的基本原理,Catalyst的通用树转换框架。

2. Spark SQL优化简介

Spark SQL是Spark中技术最为复杂的组件,它处理SQL查询和DataFrame API。Spark SQL使用Catalyst优化器,Catalyst允许您使用一些高级编程语言特性(例如:Scala的模式匹配)来构建可扩展的查询优化器。

Catalyst优化器支持基于规则和成本的优化。在基于规则的优化中,使用一组规则来确定如何执行查询。在基于成本的优化中,使用规则生成多个计划,然后计算其成本,最后找到执行SQL语句的最合适的方法。

3. Catalyst优化器的目的

Catalyst的可扩展设计有两个目的:

1)希望能够轻松地为Spark SQL添加新的优化技术和功能,尤其是为了解决我们在使用大数据时遇到的各种问题(例如:半结构化数据和高级分析)

2)需要一种简单的方法让开发人员可以扩展优化器。

4. Catalyst 优化器的基本原理

Catalyst的核心是使用一个通用库生成树并使用规则操作树。在该框架的基础上,构建了用于关系查询处理库(例如:表达式,逻辑查询计划)和处理执行查询不同阶段的规则:分析、逻辑优化、物理计划和代码生成,代码生成将部分查询编译为Java字节码。对于后者,使用了Scala特性Quasiquotes,它可以很容易地在运行时由组合表达式生成代码。最后,Catalyst提供了几个公共扩展点,包括外部数据源和用户定义的类型。

4.1 树

Catalyst中的主要数据类型是由节点对象组成的树,每个节点都有一个节点类型和零个或多个子节点。新的节点类型是TreeNode类的子类。这些对象是不可变的,并可以使用函数转换来操作,

例如,我们有三个节点类:worth, attribute, sub

worth(value: Int):常数值

attribute(name: String):输入行的属性,例如“x”

sub (left: TreeNode, right: TreeNode):两个表达式的差

例如:x – ( 1 – 2 )

4.2 规则

虽然规则可以在其输入树上运行任意代码(因为该树只是一个Scala对象),但最常见的方法是使用一组模式匹配函数来查找和替换具有特定结构的子树。

模式匹配是许多函数式语言的一个特性,它允许从代数数据类型的潜在嵌套结构中提取值。在Catalyst中,树提供了一种转换方法,该方法递归地在树的所有节点上应用模式匹配函数,将每个模式匹配转换为结果。

例如:

tree.transform {case Add(worth(c1), worth(c2)) => worth(c1+c2)}

将此应用于x + (1 + 2)的树会产生新的树x + 3。这里关键是使用了Scala的标准模式匹配语法,它可用于匹配对象的类型和为提取的值(这里为c1和c2)提供名称。

传递给变换的模式匹配表达式是一个部分函数,这意味着它只需要匹配所有输入树的子集。 Catalyst将测试规则适用树的那些部分,自动跳过并下降到不匹配的子树。这种能力意味着规则只需对给定适用优化的树进行推理,而对那些不适用的树不进行推理。因此,当新的操作符新增到系统中时,这些规则不需要修改。

规则(和一般的Scala模式匹配)可以在同一个变换调用中匹配多个模式,这使得一次实现多个转换非常简洁。

tree.transform {
   case Add(worth(c1), worth(c2)) => worth(c1+c2)
   case Add(left, worth(0)) => left
   case Add(worth(0), right) => right
}

实际上,规则可能需要多次执行才能完全转换树。Catalyst将规则形成批处理,并执行每个批处理至固定点,该固定点是树应用其规则后不发生改变。

5. Spark SQL执行计划

Catalyst的通用树转换框架分为四个阶段:

1)解析

2)逻辑计划优化

3)物理计划

4)代码生成

5.1 解析

某一列的类型是否有效,在查询包含这一列的表元数据之前这些都是未知的。如果不知道它的类型或没有将它匹配到输入表(或别名)时,那么该属性称为未解析。Spark SQL使用Catalyst规则和记录所有表元数据的Catalog对象来解析这些属性的。构建具有未绑定属性和数据类型的“未解析的逻辑计划”树后,然后执行以下规则:

1)从Catalog中查找名称关系。

2)将命名属性(如col)映射到操作符的子项。

3)将那些属性引用相同的值给它们一个唯一的ID(随后遇到如col=col时可以进行优化)。

4)通过表达式传递和强制类型,例如:我们无法知道1+col的返回类型,直到解析出col并将其子表达式转换为兼容类型。

5.2 逻辑计划优化

在逻辑计划优化阶段,逻辑计划应用了标准的基于规则的优化(基于成本的优化通过规则生成多个计划,然后计算其成本来执行),这些优化包括常量折叠、谓词下推、项目裁剪、空值传播、布尔表达式简化等。

5.3 物理计划

在物理计划阶段,Spark SQL使用逻辑计划生成一个或多个物理计划,这个过程采用了匹配Spark执行引擎的物理运算符,然后使用成本模型选择计划。目前,基于成本的优化仅用于选择连接算法:对于已知很小的关系,Spark SQL使用Spark中的点对点广播工具进行广播连接,不过,该框架支持更深入地使用基于成本的优化,因为可以使用规则对整棵树进行递归估计。因此,我们打算在未来实施更加丰富的基于成本的优化。

物理计划还执行基于规则的物理优化,例如:将管道项目或过滤器合并到一个Spark映射操作中,另外,它可以将操作从逻辑计划推送到支持谓词或项目下推的数据源。

5.4 代码生成

查询优化的最后阶段涉及生成Java字节码用于在每台机器上运行。由于Spark SQL经常在内存数据集上运行,其中处理受CPU限制,我们希望Spark SQL支持代码生成以加快执行速度。代码生成引擎的构建通常很复杂,特别是编译器。Catalyst依靠Scala语言的特殊功能Quasiquotes来简化代码生成。Quasiquotes允许在Scala语言中对抽象语法树(AST)进行编程式构建,然后在运行时将其提供给Scala编译器以生成字节码。使用Catalyst将表示SQL表达式的树转换为Scala代码的AST用于描述表达式,然后编译并运行生成的代码。

Spark SQL优化
滚动到顶部