YARN 应用开发

概念和流程

一般的概念是应用程序提交客户端向YARN ResourceManager(RM)提交应用程序。这可以通过设置一个YarnClient对象来完成。在YarnClient启动后,客户端可以设置应用程序上下文,准备包含ApplicationMaster(AM)的应用程序的第一个 运行容器,然后提交应用程序。您需要提供信息,例如需要可供您的应用程序运行的本地文件/ jar的详细信息,需要执行的实际命令(具有必要的命令行参数),任何操作系统环境设置(可选)等等。实际上,您需要描述需要为您的ApplicationMaster启动的Unix进程。

YARN ResourceManager然后将在分配的容器上启动ApplicationMaster(按照指定)。 ApplicationMaster与YARN集群通信,并处理应用程序的执行。它以异步方式执行操作。在应用程序启动期间,ApplicationMaster的主要任务是:a)与ResourceManager进行通信,为将来的容器协商和分配资源,b)在容器分配后,通信YARN * NodeManager *(NM)以在其上启动应用程序容器。可以通过AMRMClientAsync对象异步执行任务a),并使用AMRMClientAsync.CallbackHandler类型的事件处理程序中指定的事件处理方法。事件处理程序需要明确地设置到客户端。任务b)可以通过启动一个可运行的对象来执行再分配的容器中,然后在分配容器时启动容器。作为启动此容器的一部分,AM必须指定具有启动信息(如命令行规范,环境等)的ContainerLaunchContext。

在执行应用程序期间,ApplicationMaster通过NMClientAsync对象与NodeManager进行通信。所有容器事件都由NMClientAsync.CallbackHandler处理,与NMClientAsync相关联。典型的回调处理程序处理客户端启动,停止,状态更新和错误。 ApplicationMaster还通过处理AMRMClientAsync.CallbackHandler的getProgress()方法向ResourceManager报告执行进度。

除了异步客户端以外,还有某些工作流程的同步版本(AMRMClient和NMClient)。由于简单的用法,推荐使用异步客户端,本文主要介绍异步客户端。有关同步客户端的更多信息,请参阅AMRMClient和NMClient。

接口

以下是重要的接口:

Client< – >的ResourceManager

通过使用YarnClient对象。

ApplicationMaster< – >ResourceManager

通过使用AMRMClientAsync对象,通过AMRMClientAsync.CallbackHandler异步处理事件

ApplicationMaster< – > NodeManager

启动容器。 通过使用NMClientAsync对象与NodeManager进行通信,通过NMClientAsync.CallbackHandler处理容器事件

注意

YARN应用程序的三个主要协议(ApplicationClientProtocol,ApplicationMasterProtocol和ContainerManagementProtocol)仍然保留。 这3个客户端包装这3个协议,为YARN应用程序提供更简单的编程模型。

 YARN 协议介绍

ApplicationClientProtocol

客户端和ResourceManager之间的协议,用于提交/中止作业以及获取有关应用程序,群集度量,节点,队列和ACL的信息。

主要方法如下:

1542031698769536.png

ApplicationConstants

这是包含YARN为应用程序和容器设置的常量的应用程序的API。

ApplicationHistoryProtocol

客户端和ApplicationHistoryServer之间的协议,以获取已完成应用程序的信息等。

ApplicationMasterProtocol

ApplicationMaster的活动实例和ResourceManager之间的协议。

主要方法如下:

1542031816288088.png

ClientSCMProtocol

客户端和SharedCacheManager之间的协议声明并释放共享缓存中的资源。

ContainerManagementProtocol

ApplicationMaster和NodeManager之间的协议,用于启动/停止容器并获取正在运行的容器的状态。

主要方法如下:

1542032021671974.png

详细API 介绍,可以参见HADOOP 官网。 

https://hadoop.apache.org/docs/r2.7.2/api/org/apache/hadoop/yarn/api/package-summary.html

编写简单的YARN应用程序

写简单的客户端

客户端需要做的第一步是初始化并启动一个YarnClient。

YarnClient yarnClient = YarnClient.createYarnClient();
  yarnClient.init(conf);
  yarnClient.start();

一旦建立了客户端,客户端需要创建一个应用程序并获取其应用程序ID。

  YarnClientApplication app = yarnClient.createApplication();
  GetNewApplicationResponse appResponse = app.getNewApplicationResponse();

YarnClientApplication对新应用程序的响应还包含有关集群的信息,例如集群的最小/最大资源能力。 这是必需的,以确保您可以正确设置将启动ApplicationMaster的容器的规格。 有关更多详细信息,请参阅GetNewApplicationResponse。

客户端的主要关键是设置ApplicationSubmissionContext,它定义了RM启动AM所需的所有信息。 客户需要在上下文中设置以下内容:

应用程序信息:ID,名称

队列,优先级信息:将向其提交应用程序的队列以及为应用程序分配的优先级。

用户:提交应用程序的用户

ContainerLaunchContext:定义将在其中启动并运行AM的容器的信息。 如前所述,ContainerLaunchContext定义了运行应用程序所需的所有必需信息,例如本地资源(二进制文件,jar文件等),环境设置(CLASSPATH等),要执行的命令和安全Token等。

// set the application submission context
ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
ApplicationId appId = appContext.getApplicationId();
appContext.setKeepContainersAcrossApplicationAttempts(keepContainers);
appContext.setApplicationName(appName);
// set local resources for the application master
// local files or archives as needed
// In this scenario, the jar file for the application master is part of the local resources
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
LOG.info("Copy App Master jar from local filesystem and add to local environment");
// Copy the application master jar to the filesystem
// Create a local resource to point to the destination jar path
FileSystem fs = FileSystem.get(conf);
addToLocalResources(fs, appMasterJar, appMasterJarPath, appId.toString(),
    localResources, null);
// Set the log4j properties if needed
if (!log4jPropFile.isEmpty()) {
  addToLocalResources(fs, log4jPropFile, log4jPath, appId.toString(),
      localResources, null);
}
// The shell script has to be made available on the final container(s)
// where it will be executed.
// To do this, we need to first copy into the filesystem that is visible
// to the yarn framework.
// We do not need to set this as a local resource for the application
// master as the application master does not need it.
String hdfsShellScriptLocation = "";
long hdfsShellScriptLen = 0;
long hdfsShellScriptTimestamp = 0;
if (!shellScriptPath.isEmpty()) {
  Path shellSrc = new Path(shellScriptPath);
  String shellPathSuffix =
      appName + "/" + appId.toString() + "/" + SCRIPT_PATH;
  Path shellDst =
      new Path(fs.getHomeDirectory(), shellPathSuffix);
  fs.copyFromLocalFile(false, true, shellSrc, shellDst);
  hdfsShellScriptLocation = shellDst.toUri().toString();
  FileStatus shellFileStatus = fs.getFileStatus(shellDst);
  hdfsShellScriptLen = shellFileStatus.getLen();
  hdfsShellScriptTimestamp = shellFileStatus.getModificationTime();
}
if (!shellCommand.isEmpty()) {
  addToLocalResources(fs, null, shellCommandPath, appId.toString(),
      localResources, shellCommand);
}
if (shellArgs.length > 0) {
  addToLocalResources(fs, null, shellArgsPath, appId.toString(),
      localResources, StringUtils.join(shellArgs, " "));
}
// Set the env variables to be setup in the env where the application master will be run
LOG.info("Set the environment for the application master");
Map<String, String> env = new HashMap<String, String>();
// put location of shell script into env
// using the env info, the application master will create the correct local resource for the
// eventual containers that will be launched to execute the shell scripts
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation);
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp));
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen));
// Add AppMaster.jar location to classpath
// At some point we should not be required to add
// the hadoop specific classpaths to the env.
// It should be provided out of the box.
// For now setting all required classpaths including
// the classpath to "." for the application jar
StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$$())
  .append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*");
for (String c : conf.getStrings(
    YarnConfiguration.YARN_APPLICATION_CLASSPATH,
    YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
  classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
  classPathEnv.append(c.trim());
}
classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append(
  "./log4j.properties");
// Set the necessary command to execute the application master
Vector<CharSequence> vargs = new Vector<CharSequence>(30);
// Set java executable command
LOG.info("Setting up app master command");
vargs.add(Environment.JAVA_HOME.$$() + "/bin/java");
// Set Xmx based on am memory size
vargs.add("-Xmx" + amMemory + "m");
// Set class name
vargs.add(appMasterMainClass);
// Set params for Application Master
vargs.add("--container_memory " + String.valueOf(containerMemory));
vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));
vargs.add("--num_containers " + String.valueOf(numContainers));
vargs.add("--priority " + String.valueOf(shellCmdPriority));
for (Map.Entry<String, String> entry : shellEnv.entrySet()) {
  vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue());
}
if (debugFlag) {
  vargs.add("--debug");
}
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");
// Get final command
StringBuilder command = new StringBuilder();
for (CharSequence str : vargs) {
  command.append(str).append(" ");
}
LOG.info("Completed setting up app master command " + command.toString());
List<String> commands = new ArrayList<String>();
commands.add(command.toString());
// Set up the container launch context for the application master
ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
  localResources, env, commands, null, null, null);
// Set up resource type requirements
// For now, both memory and vcores are supported, so we set memory and
// vcores requirements
Resource capability = Resource.newInstance(amMemory, amVCores);
appContext.setResource(capability);
// Service data is a binary blob that can be passed to the application
// Not needed in this scenario
// amContainer.setServiceData(serviceData);
// Setup security tokens
if (UserGroupInformation.isSecurityEnabled()) {
  // Note: Credentials class is marked as LimitedPrivate for HDFS and MapReduce
  Credentials credentials = new Credentials();
  String tokenRenewer = conf.get(YarnConfiguration.RM_PRINCIPAL);
  if (tokenRenewer == null | | tokenRenewer.length() == 0) {
    throw new IOException(
      "Can't get Master Kerberos principal for the RM to use as renewer");
  }
  // For now, only getting tokens for the default file-system.
  final Token<?> tokens[] =
      fs.addDelegationTokens(tokenRenewer, credentials);
  if (tokens != null) {
    for (Token<?> token : tokens) {
      LOG.info("Got dt for " + fs.getUri() + "; " + token);
    }
  }
  DataOutputBuffer dob = new DataOutputBuffer();
  credentials.writeTokenStorageToStream(dob);
  ByteBuffer fsTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
  amContainer.setTokens(fsTokens);
}
appContext.setAMContainerSpec(amContainer);

上述过程完成后,客户端准备提交具有指定优先级和队列的应用程序。

// Set the priority for the application master
Priority pri = Priority.newInstance(amPriority);
appContext.setPriority(pri);
// Set the queue to which this application is to be submitted in the RM
appContext.setQueue(amQueue);
// Submit the application to the applications manager
// SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest);
yarnClient.submitApplication(appContext);

此时,RM将接受应用程序,并且在后台将通过分配具有所需指定的容器的过程,然后最终在分配的容器上设置和启动AM。

客户可以通过多种方式跟踪实际任务的进度。

它可以与RM进行通信,并通过YarnClient的getApplicationReport()方法请求应用程序的报告。

// Get application report for the appId we are interested in
ApplicationReport report = yarnClient.getApplicationReport(appId);

从RM收到的ApplicationReport包含以下内容:

一般应用程序信息:应用程序ID,应用程序提交到的队列,提交应用程序的用户以及应用程序的开始时间。

ApplicationMaster详细信息:运行AM的主机,侦听客户端请求的rpc端口(如果有)以及客户端需要与AM进行通信的令牌。

应用程序跟踪信息:如果应用程序支持某种形式的进度跟踪,则可以通过ApplicationReport的getTrackingUrl()方法设置一个跟踪url,客户端可以通过该方法来监视进度。

应用程序状态:ResourceManager所看到的应用程序的状态可通过ApplicationReport#getYarnApplicationState获得。如果YarnApplicationState设置为FINISHED,则客户端应引用ApplicationReport#getFinalApplicationStatus来检查应用程序任务本身的实际成功/失败。如果出现故障,ApplicationReport#getDiagnostics可能会对失败情况有所帮助。

如果ApplicationMaster支持,客户端可以通过主机从应用程序报告中获得的rpcport信息直接查询AM本身的进度更新。 如果可用,它也可以使用从报告中获取的跟踪网址。

在某些情况下,如果应用程序耗时过长或者由于其他因素,客户可能希望终止该应用程序。 YarnClient支持killApplication调用,该调用允许客户端通过ResourceManager向AM发送终止信号。 

yarnClient.killApplication(appId);

编写ApplicationMaster(AM)

AM是工作的实际所有者。它将由RM启动,并通过客户端提供所有必要的信息和资源,它负责监督和完成任务。

由于AM是在可能与其他容器共享物理主机的容器内启动的,考虑到多租户特性以及其他问题,它不能假设对任何预配置端口进行监听。

AM启动时,通过系统环境可以使用多个参数。其中包括AM容器的ContainerId,应用程序提交时间以及运行ApplicationMaster的NM(NodeManager)主机的详细信息。参考ApplicationConstants的参数名称。

与RM的所有交互都需要一个ApplicationAttemptId(如果发生故障,每个应用程序可能会有多次尝试)。 ApplicationAttemptId可以从AM的容器ID中获得。有API将从环境获得的值转换为对象。

Map<String, String> envs = System.getenv();
String containerIdString =
    envs.get(ApplicationConstants.AM_CONTAINER_ID_ENV);
if (containerIdString == null) {
  // container id should always be set in the env by the framework
  throw new IllegalArgumentException(
      "ContainerId not set in the environment");
}
ContainerId containerId = ConverterUtils.toContainerId(containerIdString);
ApplicationAttemptId appAttemptID = containerId.getApplicationAttemptId();

AM完成初始化后,我们可以启动两个客户端:一个到ResourceManager,一个到NodeManagers。 我们用我们自定义的事件处理程序 (event handler)设置它们,我们将在本文后面详细讨论这些事件处理程序。

AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
  amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
  amRMClient.init(conf);
  amRMClient.start();
  containerListener = createNMCallbackHandler();
  nmClientAsync = new NMClientAsyncImpl(containerListener);
  nmClientAsync.init(conf);
  nmClientAsync.start();

AM必须向RM发出心跳信号,以通知AM已处于活动状态并仍在运行。 RM的超时时间间隔由可通过YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS访问的配置设置定义,默认情况下由YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS定义。 ApplicationMaster需要向ResourceManager注册以开始心跳。

// Register self with ResourceManager
// This will start heartbeating to the RM
appMasterHostname = NetUtils.getHostname();
RegisterApplicationMasterResponse response = amRMClient
    .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
        appMasterTrackingUrl);

在注册的响应中,包含最大资源能力。 你可能想用它来检查应用程序的请求。

// Dump out information about cluster capability as seen by the
// resource manager
int maxMem = response.getMaximumResourceCapability().getMemory();
LOG.info("Max mem capability of resources in this cluster " + maxMem);
int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
LOG.info("Max vcores capability of resources in this cluster " + maxVCores);
// A resource ask cannot exceed the max.
if (containerMemory > maxMem) {
  LOG.info("Container memory specified above max threshold of cluster."
      + " Using max value." + ", specified=" + containerMemory + ", max="
      + maxMem);
  containerMemory = maxMem;
}
if (containerVirtualCores > maxVCores) {
  LOG.info("Container virtual cores specified above max threshold of  cluster."
    + " Using max value." + ", specified=" + containerVirtualCores + ", max="
    + maxVCores);
  containerVirtualCores = maxVCores;
}
List<Container> previousAMRunningContainers =
    response.getContainersFromPreviousAttempts();
LOG.info("Received " + previousAMRunningContainers.size()
        + " previous AM's running containers on AM registration.");

根据任务要求,AM可以要求一组容器来运行其任务。 现在我们可以计算出我们需要多少个容器,并请求这些容器。

List<Container> previousAMRunningContainers =
    response.getContainersFromPreviousAttempts();
LOG.info("Received " + previousAMRunningContainers.size()
    + " previous AM's running containers on AM registration.");
int numTotalContainersToRequest =
    numTotalContainers - previousAMRunningContainers.size();
// Setup ask for containers from RM
// Send request for containers to RM
// Until we get our fully allocated quota, we keep on polling RM for
// containers
// Keep looping until all the containers are launched and shell script
// executed on them ( regardless of success/failure).
for (int i = 0; i < numTotalContainersToRequest; ++i) {
  ContainerRequest containerAsk = setupContainerAskForRM();
  amRMClient.addContainerRequest(containerAsk);
}

在setupContainerAskForRM()中,需要设置以下两件事情:

资源能力:目前,YARN支持基于内存的资源需求,因此请求应该定义需要多少内存。 该值以MB为单位定义,并且必须小于群集的最大容量和最小容量的精确倍数。 内存资源与在任务容器上的物理内存限制相对应。 它还将支持基于计算的资源(vCore),如代码所示。

优先级:当询问一组容器时,AM可以为每组定义不同的优先级。 例如,Map-Reduce AM可以为Map任务所需的容器分配更高的优先级,并为Reduce任务的容器分配更低的优先级。

private ContainerRequest setupContainerAskForRM() {
  // setup requirements for hosts
  // using * as any host will do for the distributed shell app
  // set the priority for the request
  Priority pri = Priority.newInstance(requestPriority);
  // Set up resource type requirements
  // For now, memory and CPU are supported so we set memory and cpu requirements
  Resource capability = Resource.newInstance(containerMemory,
    containerVirtualCores);
  ContainerRequest request = new ContainerRequest(capability, null, null,
      pri);
  LOG.info("Requested container ask: " + request.toString());
  return request;
}

在应用程序管理器发送容器分配请求后,容器将通过AMRMClientAsync客户端的事件处理程序异步启动。 处理程序应该实现AMRMClientAsync.CallbackHandler接口。

在分配容器时,处理程序会设置一个运行代码以启动容器的线程。 这里我们使用名称LaunchContainerRunnable来演示。 我们将在本文的下一部分讨论LaunchContainerRunnable类。

@Override
public void onContainersAllocated(List<Container> allocatedContainers) {
  LOG.info("Got response from RM for container ask, allocatedCnt="
      + allocatedContainers.size());
  numAllocatedContainers.addAndGet(allocatedContainers.size());
  for (Container allocatedContainer : allocatedContainers) {
    LaunchContainerRunnable runnableLaunchContainer =
        new LaunchContainerRunnable(allocatedContainer, containerListener);
    Thread launchThread = new Thread(runnableLaunchContainer);
    // launch and start the container on a separate thread to keep
    // the main thread unblocked
    // as all containers may not be allocated at one go.
    launchThreads.add(launchThread);
    launchThread.start();
  }
}

在心跳时,事件处理程序报告应用程序的进度。

@Override
public float getProgress() {
  // set progress to deliver to RM on next heartbeat
  float progress = (float) numCompletedContainers.get()
      / numTotalContainers;
  return progress;
}

容器启动线程实际上启动NM上的容器。 在将容器分配给AM后,它需要遵循类似的过程,以便客户端为将要在分配的容器上运行的最终任务设置ContainerLaunchContext。 一旦定义了ContainerLaunchContext,AM可以通过NMClientAsync启动它。

// Set the necessary command to execute on the allocated container
Vector<CharSequence> vargs = new Vector<CharSequence>(5);
// Set executable command
vargs.add(shellCommand);
// Set shell script path
if (!scriptPath.isEmpty()) {
  vargs.add(Shell.WINDOWS ? ExecBatScripStringtPath
    : ExecShellStringPath);
}
// Set args for the shell command if any
vargs.add(shellArgs);
// Add log redirect params
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");
// Get final command
StringBuilder command = new StringBuilder();
for (CharSequence str : vargs) {
  command.append(str).append(" ");
}
List<String> commands = new ArrayList<String>();
commands.add(command.toString());
// Set up ContainerLaunchContext, setting local resource, environment,
// command and token for constructor.
// Note for tokens: Set up tokens for the container too. Today, for normal
// shell commands, the container in distribute-shell doesn't need any
// tokens. We are populating them mainly for NodeManagers to be able to
// download anyfiles in the distributed file-system. The tokens are
// otherwise also useful in cases, for e.g., when one is running a
// "hadoop dfs" command inside the distributed shell.
ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
  localResources, shellEnv, commands, null, allTokens.duplicate(), null);
containerListener.addContainer(container.getId(), container);
nmClientAsync.startContainerAsync(container, ctx);

NMClientAsync对象与其事件处理程序一起处理容器事件。 包括容器启动,停止,状态更新,发生错误等。

在ApplicationMaster确定工作完成后,它需要通过AM-RM客户端注销自己,然后停止客户端。

try {
  amRMClient.unregisterApplicationMaster(appStatus, appMessage, null);
} catch (YarnException ex) {
  LOG.error("Failed to unregister application", ex);
} catch (IOException e) {
  LOG.error("Failed to unregister application", e);
}
amRMClient.stop();

FAQ

如何将应用程序的jar包分发到需要它的YARN群集中的所有节点?

您可以使用LocalResource将资源添加到您的应用程序请求中。 这将导致YARN将资源分发给ApplicationMaster节点。 如果资源是tgz,zip或jar – 您可以用YARN解压缩。 然后,您只需将解压缩的文件夹添加到您的类路径中即可。 例如,在创建您的应用程序请求时:

File packageFile = new File(packagePath);
URL packageUrl = ConverterUtils.getYarnUrlFromPath(
    FileContext.getFileContext().makeQualified(new Path(packagePath)));
packageResource.setResource(packageUrl);
packageResource.setSize(packageFile.length());
packageResource.setTimestamp(packageFile.lastModified());
packageResource.setType(LocalResourceType.ARCHIVE);
packageResource.setVisibility(LocalResourceVisibility.APPLICATION);
resource.setMemory(memory);
containerCtx.setResource(resource);
containerCtx.setCommands(ImmutableList.of(
    "java -cp './package/*' some.class.to.Run "
    + "1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout "
    + "2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"));
containerCtx.setLocalResources(
    Collections.singletonMap("package", packageResource));
appCtx.setApplicationId(appId);
appCtx.setUser(user.getShortUserName);
appCtx.setAMContainerSpec(containerCtx);
yarnClient.submitApplication(appCtx);

如您所见,setLocalResources命令将名称映射到资源。可以使用./package/*引用内部工件。

注意:Java的类路径(cp)参数非常敏感。 确保语法完全正确。

一旦你的软件包被分发到你的AM中,每当你的AM开始一个新的容器时(假设你想把资源发送到你的容器),需要遵循相同的过程。 这个代码是一样的。 您只需确保为AM提供包路径(HDFS或本地),以便它可以将资源URL与容器ctx一起发送。

如何获得ApplicationMaster的ApplicationAttemptId?

ApplicationAttemptId将通过环境传递给AM,并且来自环境的值可以通过ConverterUtils辅助函数转换为ApplicationAttemptId对象。

为什么容器被NodeManager杀死了?

这很可能是由于内存使用量超出了请求的容器内存大小。有很多原因可能导致这种情况。首先,查看NodeManager在杀死容器时存储的进程树。你感兴趣的两件事是物理内存和虚拟内存。如果您超出了物理内存限制,则您的应用使用的物理内存太多。如果您正在运行Java应用程序,则可以使用-hprof来查看占用堆空间的内容。如果超出虚拟内存,则可能需要增加群集范围配置变量yarn.nodemanager.vmem-pmem-ratio的值。

如何包含本地库?

在启动容器时在命令行上设置-Djava.library.path可能会导致Hadoop使用的本机库无法正确加载,并可能导致错误。用LD_LIBRARY_PATH代替它更清洁。

YARN 应用开发

发表评论

邮箱地址不会被公开。 必填项已用*标注

二十 七 ÷ = 3

滚动到顶部