内容
1. 目的
SparkContext是Spark功能的入口,任何Spark应用程序最重要的步骤是生成SparkContext,它允许Spark应用程序通过资源管理器(YARN/Mesos)访问Spark Cluster。
在这一节,我们将深入理解什么是SparkContext,学习SparkContext的各种任务以及如何停止Spark中的SparkContext。
2. SparkContext是什么
SparkContext是Spark功能的入口,任何Spark应用程序最重要的步骤是生成SparkContext,它允许Spark应用程序通过资源管理器访问Spark Cluster。资源管理器可以是Local、Standalone、YARN、Mesos和EC2。
3. 怎么创建SparkContext
如果想创建SparkContext,首先创建SparkConf,SparkConf包含了Spark集群配置的各种参数,一些参数定义了Spark应用程序的属性,一些参数用来分配集群上的资源,例如工作节点上执行器(Executor)的内存大小和CPU核数。简而言之,SparkConf指导如何访问Spark群集。创建一个SparkContext对象后,我们可以调用诸如textFile、sequenceFile、parallelize等功能创建RDD,广播变量和累加器,运行作业。所有这些都可以在SparkContext停止之前执行。
4. 停止SparkContext
每个JVM只能有一个SparkContext处于活动状态,在创建一个新的之前,您必须先停止它,如下所示:
stop(): Unit
它将显示以下消息:
INFO SparkContext: Successfully stopped SparkContext
5. 示例Word Count
让我们看看如何使用SparkConf创建SparkContext
package com.doudianyun.spark.example import org.apache.spark.SparkContext import org.apache.spark.SparkConf object WordCount { def main(args: Array[String]) { //Check whether sufficient params are supplied if (args.length < 2) { println("Usage: WordCount <input> <output>") System.exit(1) } //Create conf object val conf = new SparkConf().setAppName("WordCount") //create spark context object val sc = new SparkContext(conf) //Read file and create RDD val rawData = sc.textFile(args(0)) //convert the lines into words using flatMap operation val words = rawData.flatMap(line => line.split(" ")) //count the individual words using map and reduceByKey operation val wordCount = words.map(word => (word, 1)).reduceByKey(_ + _) //Save the result wordCount.saveAsTextFile(args(1)) //stop the spark context sc.stop } }
6. SparkContext在Spark中的功能
6.1 获取Spark应用程序的当前状态
1) SparkConf
SparkConf可以针对应用程序进行属性配置,可以配置一些常见的属性,如Spark Master URL和应用程序名称,也可以通过set()方法配置任意键值对。
2) 部署环境
Spark部署环境有两种类型,即本地模式和集群模式。本地模式是非分布式单JVM部署模式,所有执行组件 – 驱动程序、执行程序、LocalSchedulerBackend和主程序都存在于同一个JVM中。本地模式主要是为了测试、调试或演示目的。集群模式是分布式运行。
6.2 设置配置属性
1) Master URL
master方法返回spark.master的当前值。
2) 本地属性
我们可以设置本地属性来影响从线程提交的Spark作业,例如Spark公平调度池。
3) 默认日志记录级别
允许在Spark应用程序中设置根日志记录级别,例如Spark Shell。
6.3 访问服务
访问TaskScheduler、LiveListenBus、BlockManager、SchedulerBackend、ShuffelManager等服务。
6.4 取消工作
cancleJob简单地要求DAGScheduler取消一个作业。
6.5 取消阶段(Stage)
cancleStage简单地要求DAGScheduler取消一个阶段。
6.6 注册Spark监听器
通过addSparkListener注册一个自定义Spark监听器,也可以通过spark.extraListeners注册Spark监听器。
6.7 可编程动态分配
提供以下方法用于动态分配executor:
requestExecutors,killExecutors,requestTotalExecutors,getExecutorIds。
6.8 访问缓存的RDD
getPersistentRDDs用于访问缓存的RDD。
6.9 移除缓存的RDD
unpersist 移除缓存的RDD。
7. 结论
SparkContext在Spark中提供了各种功能,如获取Spark应用程序的当前状态,设置配置,取消作业,取消阶段等,它是Spark功能的入口。