内容
概念和流程
一般的概念是应用程序提交客户端向YARN ResourceManager(RM)提交应用程序。这可以通过设置一个YarnClient对象来完成。在YarnClient启动后,客户端可以设置应用程序上下文,准备包含ApplicationMaster(AM)的应用程序的第一个 运行容器,然后提交应用程序。您需要提供信息,例如需要可供您的应用程序运行的本地文件/ jar的详细信息,需要执行的实际命令(具有必要的命令行参数),任何操作系统环境设置(可选)等等。实际上,您需要描述需要为您的ApplicationMaster启动的Unix进程。
YARN ResourceManager然后将在分配的容器上启动ApplicationMaster(按照指定)。 ApplicationMaster与YARN集群通信,并处理应用程序的执行。它以异步方式执行操作。在应用程序启动期间,ApplicationMaster的主要任务是:a)与ResourceManager进行通信,为将来的容器协商和分配资源,b)在容器分配后,通信YARN * NodeManager *(NM)以在其上启动应用程序容器。可以通过AMRMClientAsync对象异步执行任务a),并使用AMRMClientAsync.CallbackHandler类型的事件处理程序中指定的事件处理方法。事件处理程序需要明确地设置到客户端。任务b)可以通过启动一个可运行的对象来执行再分配的容器中,然后在分配容器时启动容器。作为启动此容器的一部分,AM必须指定具有启动信息(如命令行规范,环境等)的ContainerLaunchContext。
在执行应用程序期间,ApplicationMaster通过NMClientAsync对象与NodeManager进行通信。所有容器事件都由NMClientAsync.CallbackHandler处理,与NMClientAsync相关联。典型的回调处理程序处理客户端启动,停止,状态更新和错误。 ApplicationMaster还通过处理AMRMClientAsync.CallbackHandler的getProgress()方法向ResourceManager报告执行进度。
除了异步客户端以外,还有某些工作流程的同步版本(AMRMClient和NMClient)。由于简单的用法,推荐使用异步客户端,本文主要介绍异步客户端。有关同步客户端的更多信息,请参阅AMRMClient和NMClient。
接口
以下是重要的接口:
Client< – >的ResourceManager
通过使用YarnClient对象。
ApplicationMaster< – >ResourceManager
通过使用AMRMClientAsync对象,通过AMRMClientAsync.CallbackHandler异步处理事件
ApplicationMaster< – > NodeManager
启动容器。 通过使用NMClientAsync对象与NodeManager进行通信,通过NMClientAsync.CallbackHandler处理容器事件
注意
YARN应用程序的三个主要协议(ApplicationClientProtocol,ApplicationMasterProtocol和ContainerManagementProtocol)仍然保留。 这3个客户端包装这3个协议,为YARN应用程序提供更简单的编程模型。
YARN 协议介绍
ApplicationClientProtocol
客户端和ResourceManager之间的协议,用于提交/中止作业以及获取有关应用程序,群集度量,节点,队列和ACL的信息。
主要方法如下:
ApplicationConstants
这是包含YARN为应用程序和容器设置的常量的应用程序的API。
ApplicationHistoryProtocol
客户端和ApplicationHistoryServer之间的协议,以获取已完成应用程序的信息等。
ApplicationMasterProtocol
ApplicationMaster的活动实例和ResourceManager之间的协议。
主要方法如下:
ClientSCMProtocol
客户端和SharedCacheManager之间的协议声明并释放共享缓存中的资源。
ContainerManagementProtocol
ApplicationMaster和NodeManager之间的协议,用于启动/停止容器并获取正在运行的容器的状态。
主要方法如下:
详细API 介绍,可以参见HADOOP 官网。
https://hadoop.apache.org/docs/r2.7.2/api/org/apache/hadoop/yarn/api/package-summary.html
编写简单的YARN应用程序
写简单的客户端
客户端需要做的第一步是初始化并启动一个YarnClient。
YarnClient yarnClient = YarnClient.createYarnClient(); yarnClient.init(conf); yarnClient.start();
一旦建立了客户端,客户端需要创建一个应用程序并获取其应用程序ID。
YarnClientApplication app = yarnClient.createApplication(); GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
YarnClientApplication对新应用程序的响应还包含有关集群的信息,例如集群的最小/最大资源能力。 这是必需的,以确保您可以正确设置将启动ApplicationMaster的容器的规格。 有关更多详细信息,请参阅GetNewApplicationResponse。
客户端的主要关键是设置ApplicationSubmissionContext,它定义了RM启动AM所需的所有信息。 客户需要在上下文中设置以下内容:
应用程序信息:ID,名称
队列,优先级信息:将向其提交应用程序的队列以及为应用程序分配的优先级。
用户:提交应用程序的用户
ContainerLaunchContext:定义将在其中启动并运行AM的容器的信息。 如前所述,ContainerLaunchContext定义了运行应用程序所需的所有必需信息,例如本地资源(二进制文件,jar文件等),环境设置(CLASSPATH等),要执行的命令和安全Token等。
// set the application submission context ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext(); ApplicationId appId = appContext.getApplicationId(); appContext.setKeepContainersAcrossApplicationAttempts(keepContainers); appContext.setApplicationName(appName); // set local resources for the application master // local files or archives as needed // In this scenario, the jar file for the application master is part of the local resources Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); LOG.info("Copy App Master jar from local filesystem and add to local environment"); // Copy the application master jar to the filesystem // Create a local resource to point to the destination jar path FileSystem fs = FileSystem.get(conf); addToLocalResources(fs, appMasterJar, appMasterJarPath, appId.toString(), localResources, null); // Set the log4j properties if needed if (!log4jPropFile.isEmpty()) { addToLocalResources(fs, log4jPropFile, log4jPath, appId.toString(), localResources, null); } // The shell script has to be made available on the final container(s) // where it will be executed. // To do this, we need to first copy into the filesystem that is visible // to the yarn framework. // We do not need to set this as a local resource for the application // master as the application master does not need it. String hdfsShellScriptLocation = ""; long hdfsShellScriptLen = 0; long hdfsShellScriptTimestamp = 0; if (!shellScriptPath.isEmpty()) { Path shellSrc = new Path(shellScriptPath); String shellPathSuffix = appName + "/" + appId.toString() + "/" + SCRIPT_PATH; Path shellDst = new Path(fs.getHomeDirectory(), shellPathSuffix); fs.copyFromLocalFile(false, true, shellSrc, shellDst); hdfsShellScriptLocation = shellDst.toUri().toString(); FileStatus shellFileStatus = fs.getFileStatus(shellDst); hdfsShellScriptLen = shellFileStatus.getLen(); hdfsShellScriptTimestamp = shellFileStatus.getModificationTime(); } if (!shellCommand.isEmpty()) { addToLocalResources(fs, null, shellCommandPath, appId.toString(), localResources, shellCommand); } if (shellArgs.length > 0) { addToLocalResources(fs, null, shellArgsPath, appId.toString(), localResources, StringUtils.join(shellArgs, " ")); } // Set the env variables to be setup in the env where the application master will be run LOG.info("Set the environment for the application master"); Map<String, String> env = new HashMap<String, String>(); // put location of shell script into env // using the env info, the application master will create the correct local resource for the // eventual containers that will be launched to execute the shell scripts env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation); env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp)); env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen)); // Add AppMaster.jar location to classpath // At some point we should not be required to add // the hadoop specific classpaths to the env. // It should be provided out of the box. // For now setting all required classpaths including // the classpath to "." for the application jar StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$$()) .append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*"); for (String c : conf.getStrings( YarnConfiguration.YARN_APPLICATION_CLASSPATH, YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) { classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR); classPathEnv.append(c.trim()); } classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append( "./log4j.properties"); // Set the necessary command to execute the application master Vector<CharSequence> vargs = new Vector<CharSequence>(30); // Set java executable command LOG.info("Setting up app master command"); vargs.add(Environment.JAVA_HOME.$$() + "/bin/java"); // Set Xmx based on am memory size vargs.add("-Xmx" + amMemory + "m"); // Set class name vargs.add(appMasterMainClass); // Set params for Application Master vargs.add("--container_memory " + String.valueOf(containerMemory)); vargs.add("--container_vcores " + String.valueOf(containerVirtualCores)); vargs.add("--num_containers " + String.valueOf(numContainers)); vargs.add("--priority " + String.valueOf(shellCmdPriority)); for (Map.Entry<String, String> entry : shellEnv.entrySet()) { vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue()); } if (debugFlag) { vargs.add("--debug"); } vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout"); vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr"); // Get final command StringBuilder command = new StringBuilder(); for (CharSequence str : vargs) { command.append(str).append(" "); } LOG.info("Completed setting up app master command " + command.toString()); List<String> commands = new ArrayList<String>(); commands.add(command.toString()); // Set up the container launch context for the application master ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance( localResources, env, commands, null, null, null); // Set up resource type requirements // For now, both memory and vcores are supported, so we set memory and // vcores requirements Resource capability = Resource.newInstance(amMemory, amVCores); appContext.setResource(capability); // Service data is a binary blob that can be passed to the application // Not needed in this scenario // amContainer.setServiceData(serviceData); // Setup security tokens if (UserGroupInformation.isSecurityEnabled()) { // Note: Credentials class is marked as LimitedPrivate for HDFS and MapReduce Credentials credentials = new Credentials(); String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL); if (tokenRenewer == null | | tokenRenewer.length() == 0) { throw new IOException( "Can't get Master Kerberos principal for the RM to use as renewer"); } // For now, only getting tokens for the default file-system. final Token<?> tokens[] = fs.addDelegationTokens(tokenRenewer, credentials); if (tokens != null) { for (Token<?> token : tokens) { LOG.info("Got dt for " + fs.getUri() + "; " + token); } } DataOutputBuffer dob = new DataOutputBuffer(); credentials.writeTokenStorageToStream(dob); ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); amContainer.setTokens(fsTokens); } appContext.setAMContainerSpec(amContainer);
上述过程完成后,客户端准备提交具有指定优先级和队列的应用程序。
// Set the priority for the application master Priority pri = Priority.newInstance(amPriority); appContext.setPriority(pri); // Set the queue to which this application is to be submitted in the RM appContext.setQueue(amQueue); // Submit the application to the applications manager // SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest); yarnClient.submitApplication(appContext);
此时,RM将接受应用程序,并且在后台将通过分配具有所需指定的容器的过程,然后最终在分配的容器上设置和启动AM。
客户可以通过多种方式跟踪实际任务的进度。
它可以与RM进行通信,并通过YarnClient的getApplicationReport()方法请求应用程序的报告。
// Get application report for the appId we are interested in ApplicationReport report = yarnClient.getApplicationReport(appId);
从RM收到的ApplicationReport包含以下内容:
一般应用程序信息:应用程序ID,应用程序提交到的队列,提交应用程序的用户以及应用程序的开始时间。
ApplicationMaster详细信息:运行AM的主机,侦听客户端请求的rpc端口(如果有)以及客户端需要与AM进行通信的令牌。
应用程序跟踪信息:如果应用程序支持某种形式的进度跟踪,则可以通过ApplicationReport的getTrackingUrl()方法设置一个跟踪url,客户端可以通过该方法来监视进度。
应用程序状态:ResourceManager所看到的应用程序的状态可通过ApplicationReport#getYarnApplicationState获得。如果YarnApplicationState设置为FINISHED,则客户端应引用ApplicationReport#getFinalApplicationStatus来检查应用程序任务本身的实际成功/失败。如果出现故障,ApplicationReport#getDiagnostics可能会对失败情况有所帮助。
如果ApplicationMaster支持,客户端可以通过主机从应用程序报告中获得的rpcport信息直接查询AM本身的进度更新。 如果可用,它也可以使用从报告中获取的跟踪网址。
在某些情况下,如果应用程序耗时过长或者由于其他因素,客户可能希望终止该应用程序。 YarnClient支持killApplication调用,该调用允许客户端通过ResourceManager向AM发送终止信号。
yarnClient.killApplication(appId);
编写ApplicationMaster(AM)
AM是工作的实际所有者。它将由RM启动,并通过客户端提供所有必要的信息和资源,它负责监督和完成任务。
由于AM是在可能与其他容器共享物理主机的容器内启动的,考虑到多租户特性以及其他问题,它不能假设对任何预配置端口进行监听。
AM启动时,通过系统环境可以使用多个参数。其中包括AM容器的ContainerId,应用程序提交时间以及运行ApplicationMaster的NM(NodeManager)主机的详细信息。参考ApplicationConstants的参数名称。
与RM的所有交互都需要一个ApplicationAttemptId(如果发生故障,每个应用程序可能会有多次尝试)。 ApplicationAttemptId可以从AM的容器ID中获得。有API将从环境获得的值转换为对象。
Map<String, String> envs = System.getenv(); String containerIdString = envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV); if (containerIdString == null) { // container id should always be set in the env by the framework throw new IllegalArgumentException( "ContainerId not set in the environment"); } ContainerId containerId = ConverterUtils.toContainerId(containerIdString); ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId();
AM完成初始化后,我们可以启动两个客户端:一个到ResourceManager,一个到NodeManagers。 我们用我们自定义的事件处理程序 (event handler)设置它们,我们将在本文后面详细讨论这些事件处理程序。
AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler(); amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener); amRMClient.init(conf); amRMClient.start(); containerListener = createNMCallbackHandler(); nmClientAsync = new NMClientAsyncImpl(containerListener); nmClientAsync.init(conf); nmClientAsync.start();
AM必须向RM发出心跳信号,以通知AM已处于活动状态并仍在运行。 RM的超时时间间隔由可通过YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS访问的配置设置定义,默认情况下由YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS定义。 ApplicationMaster需要向ResourceManager注册以开始心跳。
// Register self with ResourceManager // This will start heartbeating to the RM appMasterHostname = NetUtils.getHostname(); RegisterApplicationMasterResponse response = amRMClient .registerApplicationMaster(appMasterHostname, appMasterRpcPort, appMasterTrackingUrl);
在注册的响应中,包含最大资源能力。 你可能想用它来检查应用程序的请求。
// Dump out information about cluster capability as seen by the // resource manager int maxMem = response.getMaximumResourceCapability().getMemory(); LOG.info("Max mem capability of resources in this cluster " + maxMem); int maxVCores = response.getMaximumResourceCapability().getVirtualCores(); LOG.info("Max vcores capability of resources in this cluster " + maxVCores); // A resource ask cannot exceed the max. if (containerMemory > maxMem) { LOG.info("Container memory specified above max threshold of cluster." + " Using max value." + ", specified=" + containerMemory + ", max=" + maxMem); containerMemory = maxMem; } if (containerVirtualCores > maxVCores) { LOG.info("Container virtual cores specified above max threshold of cluster." + " Using max value." + ", specified=" + containerVirtualCores + ", max=" + maxVCores); containerVirtualCores = maxVCores; } List<Container> previousAMRunningContainers = response.getContainersFromPreviousAttempts(); LOG.info("Received " + previousAMRunningContainers.size() + " previous AM's running containers on AM registration.");
根据任务要求,AM可以要求一组容器来运行其任务。 现在我们可以计算出我们需要多少个容器,并请求这些容器。
List<Container> previousAMRunningContainers = response.getContainersFromPreviousAttempts(); LOG.info("Received " + previousAMRunningContainers.size() + " previous AM's running containers on AM registration."); int numTotalContainersToRequest = numTotalContainers - previousAMRunningContainers.size(); // Setup ask for containers from RM // Send request for containers to RM // Until we get our fully allocated quota, we keep on polling RM for // containers // Keep looping until all the containers are launched and shell script // executed on them ( regardless of success/failure). for (int i = 0; i < numTotalContainersToRequest; ++i) { ContainerRequest containerAsk = setupContainerAskForRM(); amRMClient.addContainerRequest(containerAsk); }
在setupContainerAskForRM()中,需要设置以下两件事情:
资源能力:目前,YARN支持基于内存的资源需求,因此请求应该定义需要多少内存。 该值以MB为单位定义,并且必须小于群集的最大容量和最小容量的精确倍数。 内存资源与在任务容器上的物理内存限制相对应。 它还将支持基于计算的资源(vCore),如代码所示。
优先级:当询问一组容器时,AM可以为每组定义不同的优先级。 例如,Map-Reduce AM可以为Map任务所需的容器分配更高的优先级,并为Reduce任务的容器分配更低的优先级。
private ContainerRequest setupContainerAskForRM() { // setup requirements for hosts // using * as any host will do for the distributed shell app // set the priority for the request Priority pri = Priority.newInstance(requestPriority); // Set up resource type requirements // For now, memory and CPU are supported so we set memory and cpu requirements Resource capability = Resource.newInstance(containerMemory, containerVirtualCores); ContainerRequest request = new ContainerRequest(capability, null, null, pri); LOG.info("Requested container ask: " + request.toString()); return request; }
在应用程序管理器发送容器分配请求后,容器将通过AMRMClientAsync客户端的事件处理程序异步启动。 处理程序应该实现AMRMClientAsync.CallbackHandler接口。
在分配容器时,处理程序会设置一个运行代码以启动容器的线程。 这里我们使用名称LaunchContainerRunnable来演示。 我们将在本文的下一部分讨论LaunchContainerRunnable类。
@Override public void onContainersAllocated(List<Container> allocatedContainers) { LOG.info("Got response from RM for container ask, allocatedCnt=" + allocatedContainers.size()); numAllocatedContainers.addAndGet(allocatedContainers.size()); for (Container allocatedContainer : allocatedContainers) { LaunchContainerRunnable runnableLaunchContainer = new LaunchContainerRunnable(allocatedContainer, containerListener); Thread launchThread = new Thread(runnableLaunchContainer); // launch and start the container on a separate thread to keep // the main thread unblocked // as all containers may not be allocated at one go. launchThreads.add(launchThread); launchThread.start(); } }
在心跳时,事件处理程序报告应用程序的进度。
@Override public float getProgress() { // set progress to deliver to RM on next heartbeat float progress = (float) numCompletedContainers.get() / numTotalContainers; return progress; }
容器启动线程实际上启动NM上的容器。 在将容器分配给AM后,它需要遵循类似的过程,以便客户端为将要在分配的容器上运行的最终任务设置ContainerLaunchContext。 一旦定义了ContainerLaunchContext,AM可以通过NMClientAsync启动它。
// Set the necessary command to execute on the allocated container Vector<CharSequence> vargs = new Vector<CharSequence>(5); // Set executable command vargs.add(shellCommand); // Set shell script path if (!scriptPath.isEmpty()) { vargs.add(Shell.WINDOWS ? ExecBatScripStringtPath : ExecShellStringPath); } // Set args for the shell command if any vargs.add(shellArgs); // Add log redirect params vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout"); vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"); // Get final command StringBuilder command = new StringBuilder(); for (CharSequence str : vargs) { command.append(str).append(" "); } List<String> commands = new ArrayList<String>(); commands.add(command.toString()); // Set up ContainerLaunchContext, setting local resource, environment, // command and token for constructor. // Note for tokens: Set up tokens for the container too. Today, for normal // shell commands, the container in distribute-shell doesn't need any // tokens. We are populating them mainly for NodeManagers to be able to // download anyfiles in the distributed file-system. The tokens are // otherwise also useful in cases, for e.g., when one is running a // "hadoop dfs" command inside the distributed shell. ContainerLaunchContext ctx = ContainerLaunchContext.newInstance( localResources, shellEnv, commands, null, allTokens.duplicate(), null); containerListener.addContainer(container.getId(), container); nmClientAsync.startContainerAsync(container, ctx);
NMClientAsync对象与其事件处理程序一起处理容器事件。 包括容器启动,停止,状态更新,发生错误等。
在ApplicationMaster确定工作完成后,它需要通过AM-RM客户端注销自己,然后停止客户端。
try { amRMClient.unregisterApplicationMaster(appStatus, appMessage, null); } catch (YarnException ex) { LOG.error("Failed to unregister application", ex); } catch (IOException e) { LOG.error("Failed to unregister application", e); } amRMClient.stop();
FAQ
如何将应用程序的jar包分发到需要它的YARN群集中的所有节点?
您可以使用LocalResource将资源添加到您的应用程序请求中。 这将导致YARN将资源分发给ApplicationMaster节点。 如果资源是tgz,zip或jar – 您可以用YARN解压缩。 然后,您只需将解压缩的文件夹添加到您的类路径中即可。 例如,在创建您的应用程序请求时:
File packageFile = new File(packagePath); URL packageUrl = ConverterUtils.getYarnUrlFromPath( FileContext.getFileContext().makeQualified(new Path(packagePath))); packageResource.setResource(packageUrl); packageResource.setSize(packageFile.length()); packageResource.setTimestamp(packageFile.lastModified()); packageResource.setType(LocalResourceType.ARCHIVE); packageResource.setVisibility(LocalResourceVisibility.APPLICATION); resource.setMemory(memory); containerCtx.setResource(resource); containerCtx.setCommands(ImmutableList.of( "java -cp './package/*' some.class.to.Run " + "1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout " + "2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")); containerCtx.setLocalResources( Collections.singletonMap("package", packageResource)); appCtx.setApplicationId(appId); appCtx.setUser(user.getShortUserName); appCtx.setAMContainerSpec(containerCtx); yarnClient.submitApplication(appCtx);
如您所见,setLocalResources命令将名称映射到资源。可以使用./package/*引用内部工件。
注意:Java的类路径(cp)参数非常敏感。 确保语法完全正确。
一旦你的软件包被分发到你的AM中,每当你的AM开始一个新的容器时(假设你想把资源发送到你的容器),需要遵循相同的过程。 这个代码是一样的。 您只需确保为AM提供包路径(HDFS或本地),以便它可以将资源URL与容器ctx一起发送。
如何获得ApplicationMaster的ApplicationAttemptId?
ApplicationAttemptId将通过环境传递给AM,并且来自环境的值可以通过ConverterUtils辅助函数转换为ApplicationAttemptId对象。
为什么容器被NodeManager杀死了?
这很可能是由于内存使用量超出了请求的容器内存大小。有很多原因可能导致这种情况。首先,查看NodeManager在杀死容器时存储的进程树。你感兴趣的两件事是物理内存和虚拟内存。如果您超出了物理内存限制,则您的应用使用的物理内存太多。如果您正在运行Java应用程序,则可以使用-hprof来查看占用堆空间的内容。如果超出虚拟内存,则可能需要增加群集范围配置变量yarn.nodemanager.vmem-pmem-ratio的值。
如何包含本地库?
在启动容器时在命令行上设置-Djava.library.path可能会导致Hadoop使用的本机库无法正确加载,并可能导致错误。用LD_LIBRARY_PATH代替它更清洁。