SparkContext

1. 目的

SparkContext是Spark功能的入口,任何Spark应用程序最重要的步骤是生成SparkContext,它允许Spark应用程序通过资源管理器(YARN/Mesos)访问Spark Cluster。

在这一节,我们将深入理解什么是SparkContext,学习SparkContext的各种任务以及如何停止Spark中的SparkContext。

2. SparkContext是什么

SparkContext是Spark功能的入口,任何Spark应用程序最重要的步骤是生成SparkContext,它允许Spark应用程序通过资源管理器访问Spark Cluster。资源管理器可以是Local、Standalone、YARN、Mesos和EC2。

3. 怎么创建SparkContext

如果想创建SparkContext,首先创建SparkConf,SparkConf包含了Spark集群配置的各种参数,一些参数定义了Spark应用程序的属性,一些参数用来分配集群上的资源,例如工作节点上执行器(Executor)的内存大小和CPU核数。简而言之,SparkConf指导如何访问Spark群集。创建一个SparkContext对象后,我们可以调用诸如textFile、sequenceFile、parallelize等功能创建RDD,广播变量和累加器,运行作业。所有这些都可以在SparkContext停止之前执行。

4. 停止SparkContext

每个JVM只能有一个SparkContext处于活动状态,在创建一个新的之前,您必须先停止它,如下所示:

stop(): Unit

它将显示以下消息:

INFO SparkContext: Successfully stopped SparkContext

5. 示例Word Count

让我们看看如何使用SparkConf创建SparkContext

package com.doudianyun.spark.example
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
object WordCount {
  def main(args: Array[String]) {
    //Check whether sufficient params are supplied
    if (args.length < 2) {
      println("Usage: WordCount <input> <output>")
      System.exit(1)
    }
    //Create conf object
    val conf = new SparkConf().setAppName("WordCount")
    //create spark context object
    val sc = new SparkContext(conf)
    //Read file and create RDD
    val rawData = sc.textFile(args(0))
    //convert the lines into words using flatMap operation
    val words = rawData.flatMap(line => line.split(" "))
    //count the individual words using map and reduceByKey operation
    val wordCount = words.map(word => (word, 1)).reduceByKey(_ + _)
    //Save the result
    wordCount.saveAsTextFile(args(1))
    //stop the spark context
    sc.stop
  }
}

6. SparkContext在Spark中的功能

6.1 获取Spark应用程序的当前状态

1) SparkConf

SparkConf可以针对应用程序进行属性配置,可以配置一些常见的属性,如Spark Master URL和应用程序名称,也可以通过set()方法配置任意键值对。

2) 部署环境

Spark部署环境有两种类型,即本地模式和集群模式。本地模式是非分布式单JVM部署模式,所有执行组件 – 驱动程序、执行程序、LocalSchedulerBackend和主程序都存在于同一个JVM中。本地模式主要是为了测试、调试或演示目的。集群模式是分布式运行。

6.2 设置配置属性

1) Master URL

master方法返回spark.master的当前值。

2) 本地属性

我们可以设置本地属性来影响从线程提交的Spark作业,例如Spark公平调度池。

3) 默认日志记录级别

允许在Spark应用程序中设置根日志记录级别,例如Spark Shell。

6.3 访问服务

访问TaskScheduler、LiveListenBus、BlockManager、SchedulerBackend、ShuffelManager等服务。

6.4 取消工作

cancleJob简单地要求DAGScheduler取消一个作业。

6.5 取消阶段(Stage)

cancleStage简单地要求DAGScheduler取消一个阶段。

6.6 注册Spark监听器

通过addSparkListener注册一个自定义Spark监听器,也可以通过spark.extraListeners注册Spark监听器。

6.7 可编程动态分配

提供以下方法用于动态分配executor:

requestExecutors,killExecutors,requestTotalExecutors,getExecutorIds。

6.8 访问缓存的RDD

getPersistentRDDs用于访问缓存的RDD。

6.9 移除缓存的RDD

unpersist 移除缓存的RDD。

7. 结论

SparkContext在Spark中提供了各种功能,如获取Spark应用程序的当前状态,设置配置,取消作业,取消阶段等,它是Spark功能的入口。

SparkContext
滚动到顶部