The ApplicationMaster of the Custom Distributed Shell

肖钟城

The ResourceManager launches the ApplicationMaster inside a container. The first thing the ApplicationMaster must do is register itself with the ResourceManager. The registration information can include the port the ApplicationMaster exposes for external access, but in this distributed shell example no such port is set.

At the same time, the ApplicationMaster must send periodic heartbeats to the ResourceManager to show that it is still alive. ApplicationMasterProtocol#allocate is the call used to send these heartbeats.

For a job, the ApplicationMaster uses an AllocateRequest to ask the ResourceManager for containers, and a ResourceRequest to describe the resources it needs, such as memory, disk, and CPU. The ResourceManager returns an AllocateResponse that tells the ApplicationMaster which containers were newly allocated, along with related status information.

For each allocated container, the ApplicationMaster builds a ContainerLaunchContext with the necessary information, such as local resources, environment variables, and the command to execute, and then submits a StartContainerRequest to the ContainerManagementProtocol to run that command inside the container.

The ApplicationMaster can monitor running containers either through the ResourceManager, via ApplicationMasterProtocol#allocate, or through the ContainerManagementProtocol.

Once the work is done, the ApplicationMaster sends a FinishApplicationMasterRequest to the ResourceManager to report that it has finished.
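
To make this flow concrete, below is a minimal sketch of the register / heartbeat / finish life cycle. It is an illustration under assumptions, not code from the distributed shell: it uses the synchronous AMRMClient wrapper (which speaks ApplicationMasterProtocol underneath) instead of the asynchronous client the distributed shell uses, and it leaves out error handling and the actual container launch.

    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
    import org.apache.hadoop.yarn.api.records.Container;
    import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
    import org.apache.hadoop.yarn.api.records.Priority;
    import org.apache.hadoop.yarn.api.records.Resource;
    import org.apache.hadoop.yarn.client.api.AMRMClient;
    import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class MinimalAmLifecycle {
      public static void main(String[] args) throws Exception {
        Configuration conf = new YarnConfiguration();

        // Client-side wrapper around ApplicationMasterProtocol.
        AMRMClient<ContainerRequest> rmClient = AMRMClient.createAMRMClient();
        rmClient.init(conf);
        rmClient.start();

        // 1. Register. Host, RPC port and tracking URL may be left unset,
        //    just as the distributed shell leaves the port unset.
        rmClient.registerApplicationMaster("", -1, "");

        // 2. Ask for one small container: 256 MB, 1 vcore, priority 0.
        rmClient.addContainerRequest(new ContainerRequest(
            Resource.newInstance(256, 1), null, null, Priority.newInstance(0)));

        // 3. Heartbeat loop: allocate() is both the heartbeat and the way
        //    newly allocated containers and completed-container statuses
        //    come back from the ResourceManager.
        List<Container> allocated;
        do {
          AllocateResponse response = rmClient.allocate(0.5f);
          allocated = response.getAllocatedContainers();
          Thread.sleep(1000);
        } while (allocated.isEmpty());
        // ... launch the shell command on each container via the NM client ...

        // 4. Finish: this sends a FinishApplicationMasterRequest so the RM
        //    can clean up the application, then stop the client.
        rmClient.unregisterApplicationMaster(
            FinalApplicationStatus.SUCCEEDED, "", "");
        rmClient.stop();
      }
    }

The distributed shell implements the same life cycle with the asynchronous clients AMRMClientAsync and NMClientAsync, which drive the heartbeat on a background thread and deliver results through callbacks, as the code below shows.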

Starting with the ApplicationMaster

The AM has three parts: argument initialization, the run phase, and the finish phase in which the AM wraps up and reports its result.

  /**
   * @param args Command line args
   */
  public static void main(String[] args) {
    boolean result = false;
    try {
      ApplicationMaster appMaster = new ApplicationMaster();
      LOG.info("Initializing ApplicationMaster");
      boolean doRun = appMaster.init(args);
      if (!doRun) {
        System.exit(0);
      }
      appMaster.run();
      result = appMaster.finish();
    } catch (Throwable t) {
      LOG.error("Error running ApplicationMaster", t);
      LogManager.shutdown();
      ExitUtil.terminate(1, t);
    }
    if (result) {
      LOG.info("Application Master completed successfully. exiting");
      System.exit(0);
    } else {
      LOG.error("Application Master failed. exiting");
      System.exit(2);
    }
  }

Argument initialization

The AM accepts the following arguments:

usage: ApplicationMaster
 -app_attempt_id <arg>                          App Attempt ID. Not to be
                                                used unless for testing
                                                purposes
 -container_failures_validity_interval <arg>    Failures which are out of
                                                the time window will not
                                                be added to the number of
                                                container retry attempts
 -container_max_retries <arg>                   If container could retry,
                                                it specifies max retires
 -container_memory <arg>                        Amount of memory in MB to
                                                be requested to run the
                                                shell command
 -container_resource_profile <arg>              Resource profile to be
                                                requested to run the shell
                                                command
 -container_resources <arg>                     Amount of resources to be
                                                requested to run the shell
                                                command. Specified as
                                                resource type=value pairs
                                                separated by commas. E.g.
                                                -container_resources
                                                memory-mb=512,vcores=1
 -container_retry_error_codes <arg>             When retry policy is set
                                                to
                                                RETRY_ON_SPECIFIC_ERROR_CO
                                                DES, error codes is
                                                specified with this
                                                option, e.g.
                                                --container_retry_error_co
                                                des 1,2,3
 -container_retry_interval <arg>                Interval between each
                                                retry, unit is
                                                milliseconds
 -container_retry_policy <arg>                  Retry policy when
                                                container fails to run, 0:
                                                NEVER_RETRY, 1:
                                                RETRY_ON_ALL_ERRORS, 2:
                                                RETRY_ON_SPECIFIC_ERROR_CO
                                                DES
 -container_type <arg>                          Container execution type,
                                                GUARANTEED or
                                                OPPORTUNISTIC
 -container_vcores <arg>                        Amount of virtual cores to
                                                be requested to run the
                                                shell command
 -debug                                         Dump out debug information
 -enforce_execution_type                        Flag to indicate whether
                                                to enforce execution type
                                                of containers
 -help                                          Print usage
 -keep_containers_across_application_attempts   Flag to indicate whether
                                                to keep containers across
                                                application attempts. If
                                                the flag is true, running
                                                containers will not be
                                                killed when application
                                                attempt fails and these
                                                containers will be
                                                retrieved by the new
                                                application attempt
 -num_containers <arg>                          No. of containers on which
                                                the shell command needs to
                                                be executed
 -placement_spec <arg>                          Placement specification
 -priority <arg>                                Application Priority.
                                                Default 0
 -promote_opportunistic_after_start             Flag to indicate whether
                                                to automatically promote
                                                opportunistic containers
                                                to guaranteed.
 -shell_env <arg>                               Environment for shell
                                                script. Specified as
                                                env_key=env_val pairs

The ApplicationMaster run phase

Next, let's look at the run method.

First, it obtains the credentials of the user who submitted the application:

    // Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class
    // are marked as LimitedPrivate
    Credentials credentials =
        UserGroupInformation.getCurrentUser().getCredentials();
    DataOutputBuffer dob = new DataOutputBuffer();
    credentials.writeTokenStorageToStream(dob);
    // Now remove the AM->RM token so that containers cannot access it.
    Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
    LOG.info("Executing with tokens:");
    while (iter.hasNext()) {
      Token<?> token = iter.next();
      LOG.info(token.toString());
      if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
        iter.remove();
      }
    }
    allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());

    // Create appSubmitterUgi and add original tokens to it
    String appSubmitterUserName =
        System.getenv(ApplicationConstants.Environment.USER.name());
    appSubmitterUgi =
        UserGroupInformation.createRemoteUser(appSubmitterUserName);
    appSubmitterUgi.addCredentials(credentials);

Right after that, it creates two clients: one for interacting with the ResourceManager and one for interacting with the NodeManagers.

    AMRMClientAsync.AbstractCallbackHandler allocListener =
        new RMCallbackHandler();
    amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
    amRMClient.init(conf);
    amRMClient.start();

    containerListener = createNMCallbackHandler();
    nmClientAsync = new NMClientAsyncImpl(containerListener);
    nmClientAsync.init(conf);
    nmClientAsync.start();

The AM must register with the ResourceManager; registration starts the heartbeat that proves the AM is alive.

    // Setup local RPC Server to accept status requests directly from clients
    // TODO need to setup a protocol for client to be able to communicate to
    // the RPC server
    // TODO use the rpc port info to register with the RM for the client to
    // send requests to this app master

    // Register self with ResourceManager
    // This will start heartbeating to the RM
    appMasterHostname = NetUtils.getHostname();
    Map<Set<String>, PlacementConstraint> placementConstraintMap = null;
    if (this.placementSpecs != null) {
      placementConstraintMap = new HashMap<>();
      for (PlacementSpec spec : this.placementSpecs.values()) {
        if (spec.constraint != null) {
          Set<String> allocationTags = Strings.isNullOrEmpty(spec.sourceTag) ?
              Collections.emptySet() : Collections.singleton(spec.sourceTag);
          placementConstraintMap.put(allocationTags, spec.constraint);
        }
      }
    }

    RegisterApplicationMasterResponse response = amRMClient
        .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
            appMasterTrackingUrl, placementConstraintMap);
    resourceProfiles = response.getResourceProfiles();
    ResourceUtils.reinitializeResources(response.getResourceTypes());

The registration response reports the cluster's maximum resource capability. The AM checks the resources requested for this job against it; if the request exceeds what the cluster can provide, it is capped at the cluster maximum.

    // Dump out information about cluster capability as seen by the
    // resource manager
    long maxMem = response.getMaximumResourceCapability().getMemorySize();
    LOG.info("Max mem capability of resources in this cluster " + maxMem);
    
    int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
    LOG.info("Max vcores capability of resources in this cluster " + maxVCores);

    // A resource ask cannot exceed the max.
    if (containerMemory > maxMem) {
      LOG.info("Container memory specified above max threshold of cluster."
          + " Using max value." + ", specified=" + containerMemory + ", max="
          + maxMem);
      containerMemory = maxMem;
    }

    if (containerVirtualCores > maxVCores) {
      LOG.info("Container virtual cores specified above max threshold of cluster."
          + " Using max value." + ", specified=" + containerVirtualCores + ", max="
          + maxVCores);
      containerVirtualCores = maxVCores;
    }

Based on the job's requirements, the AM asks the RM for a set of containers to run its tasks: we compute how many containers are needed and request them.

The setupContainerAskForRM() method builds each container request; it sets two things, the resource capability and the request priority (a sketch of this method follows the code block below).

    int numTotalContainersToRequest =
        numTotalContainers - previousAMRunningContainers.size();
    // Setup ask for containers from RM
    // Send request for containers to RM
    // Until we get our fully allocated quota, we keep on polling RM for
    // containers
    // Keep looping until all the containers are launched and shell script
    // executed on them ( regardless of success/failure).
    if (this.placementSpecs == null) {
      LOG.info("placementSpecs null");
      for (int i = 0; i < numTotalContainersToRequest; ++i) {
        ContainerRequest containerAsk = setupContainerAskForRM();
        amRMClient.addContainerRequest(containerAsk);
      }
    } else {
      LOG.info("placementSpecs to create req:" + placementSpecs);
      List<SchedulingRequest> schedReqs = new ArrayList<>();
      for (PlacementSpec pSpec : this.placementSpecs.values()) {
        LOG.info("placementSpec :" + pSpec + ", container:" + pSpec
            .getNumContainers());
        for (int i = 0; i < pSpec.getNumContainers(); i++) {
          SchedulingRequest sr = setupSchedulingRequest(pSpec);
          schedReqs.add(sr);
        }
      }
      amRMClient.addSchedulingRequests(schedReqs);
    }
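
The body of setupContainerAskForRM() is not included in this excerpt. Conceptually it does something like the sketch below, which assumes only memory, vcores, and a priority field (named requestPriority here as an assumption) matter; the real AM is more involved, since it also supports resource profiles and container execution types.

    // Simplified stand-in for setupContainerAskForRM(): request one container
    // with the configured memory/vcores at the configured priority.
    // (Would live inside the ApplicationMaster class; Priority and Resource
    // come from org.apache.hadoop.yarn.api.records, ContainerRequest from
    // org.apache.hadoop.yarn.client.api.AMRMClient.)
    private ContainerRequest setupContainerAskForRM() {
      Priority pri = Priority.newInstance(requestPriority);

      // containerMemory is in MB; containerVirtualCores is a plain count.
      Resource capability =
          Resource.newInstance(containerMemory, containerVirtualCores);

      // No node or rack constraints for the distributed shell.
      return new ContainerRequest(capability, null, null, pri);
    }

Each request built this way is handed to amRMClient.addContainerRequest(), as the loop above shows.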

After the ApplicationMaster has submitted its container requests, the callback handler registered with the AMRMClientAsync client launches the containers asynchronously. The handler must implement the AMRMClientAsync callback contract; in this code it is the RMCallbackHandler created earlier, which extends AMRMClientAsync.AbstractCallbackHandler.
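
For reference, a bare-bones handler could look like the sketch below. It is a skeleton with placeholder comments, not the distributed shell's RMCallbackHandler, and it implements the older AMRMClientAsync.CallbackHandler interface; the AbstractCallbackHandler used by the distributed shell adds a few more callbacks (for example, for container updates).

    import java.util.List;

    import org.apache.hadoop.yarn.api.records.Container;
    import org.apache.hadoop.yarn.api.records.ContainerStatus;
    import org.apache.hadoop.yarn.api.records.NodeReport;
    import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;

    // Skeleton only: a real handler (like RMCallbackHandler) tracks allocation
    // counts and launches work from onContainersAllocated.
    class SkeletonRMCallbackHandler implements AMRMClientAsync.CallbackHandler {

      @Override
      public void onContainersAllocated(List<Container> containers) {
        // New containers granted by the RM; launch a task on each of them.
      }

      @Override
      public void onContainersCompleted(List<ContainerStatus> statuses) {
        // Inspect exit status, count successes/failures, possibly re-request.
      }

      @Override
      public void onShutdownRequest() {
        // The RM asked this AM to shut down.
      }

      @Override
      public void onNodesUpdated(List<NodeReport> updatedNodes) {
        // Node health/availability changed; simple AMs usually ignore this.
      }

      @Override
      public float getProgress() {
        // Progress value delivered to the RM on every heartbeat.
        return 0.0f;
      }

      @Override
      public void onError(Throwable e) {
        // Fatal client error; typically stop the AM.
      }
    }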

When containers are allocated, the handler starts a thread for each one to run the code that launches the container.

    @Override
    public void onContainersAllocated(List<Container> allocatedContainers) {
      LOG.info("Got response from RM for container ask, allocatedCnt="
          + allocatedContainers.size());
      for (Container allocatedContainer : allocatedContainers) {
        if (numAllocatedContainers.get() == numTotalContainers) {
          LOG.info("The requested number of containers have been allocated."
              + " Releasing the extra container allocation from the RM.");
          amRMClient.releaseAssignedContainer(allocatedContainer.getId());
        } else {
          numAllocatedContainers.addAndGet(1);
          String yarnShellId = Integer.toString(yarnShellIdCounter);
          yarnShellIdCounter++;
          LOG.info(
              "Launching shell command on a new container."
                  + ", containerId=" + allocatedContainer.getId()
                  + ", yarnShellId=" + yarnShellId
                  + ", containerNode="
                  + allocatedContainer.getNodeId().getHost()
                  + ":" + allocatedContainer.getNodeId().getPort()
                  + ", containerNodeURI="
                  + allocatedContainer.getNodeHttpAddress()
                  + ", containerResourceMemory"
                  + allocatedContainer.getResource().getMemorySize()
                  + ", containerResourceVirtualCores"
                  + allocatedContainer.getResource().getVirtualCores());

          Thread launchThread =
              createLaunchContainerThread(allocatedContainer, yarnShellId);

          // launch and start the container on a separate thread to keep
          // the main thread unblocked
          // as all containers may not be allocated at one go.
          launchThreads.add(launchThread);
          launchedContainers.add(allocatedContainer.getId());
          launchThread.start();

          // Remove the corresponding request
          Collection<AMRMClient.ContainerRequest> requests =
              amRMClient.getMatchingRequests(
                  allocatedContainer.getAllocationRequestId());
          if (requests.iterator().hasNext()) {
            AMRMClient.ContainerRequest request = requests.iterator().next();
            amRMClient.removeContainerRequest(request);
          }
        }
      }
    }

On each heartbeat, the callback handler reports the application's progress to the RM.

    @Override
    public float getProgress() {
      // set progress to deliver to RM on next heartbeat
      float progress = (float) numCompletedContainers.get()
          / numTotalContainers;
      return progress;
    }

The container launch thread is what actually starts the container on the NodeManager. After a container has been allocated to the AM, the AM follows much the same procedure the client followed when it set up the ContainerLaunchContext for the AM itself, only now for the task that will run in the allocated container. Once the ContainerLaunchContext is defined, the AM can start it through NMClientAsync.

The run() method below is implemented in LaunchContainerRunnable, the Runnable created by the createLaunchContainerThread() call shown above:

    @Override
    /**
     * Connects to CM, sets up container launch context 
     * for shell command and eventually dispatches the container 
     * start request to the CM. 
     */
    public void run() {
      LOG.info("Setting up container launch container for containerid="
          + container.getId() + " with shellid=" + shellId);

      // Set the local resources
      Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();

      // The container for the eventual shell commands needs its own local
      // resources too.
      // In this scenario, if a shell script is specified, we need to have it
      // copied and made available to the container.
      if (!scriptPath.isEmpty()) {
        Path renamedScriptPath = null;
        if (Shell.WINDOWS) {
          renamedScriptPath = new Path(scriptPath + ".bat");
        } else {
          renamedScriptPath = new Path(scriptPath + ".sh");
        }

        try {
          // rename the script file based on the underlying OS syntax.
          renameScriptFile(renamedScriptPath);
        } catch (Exception e) {
          LOG.error(
              "Not able to add suffix (.bat/.sh) to the shell script filename",
              e);
          // We know we cannot continue launching the container
          // so we should release it.
          numCompletedContainers.incrementAndGet();
          numFailedContainers.incrementAndGet();
          return;
        }

        URL yarnUrl = null;
        try {
          yarnUrl = URL.fromURI(new URI(renamedScriptPath.toString()));
        } catch (URISyntaxException e) {
          LOG.error("Error when trying to use shell script path specified"
              + " in env, path=" + renamedScriptPath, e);
          // A failure scenario on bad input such as invalid shell script path
          // We know we cannot continue launching the container
          // so we should release it.
          // TODO
          numCompletedContainers.incrementAndGet();
          numFailedContainers.incrementAndGet();
          return;
        }
        LocalResource shellRsrc = LocalResource.newInstance(yarnUrl,
          LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
          shellScriptPathLen, shellScriptPathTimestamp);
        localResources.put(Shell.WINDOWS ? EXEC_BAT_SCRIPT_STRING_PATH :
            EXEC_SHELL_STRING_PATH, shellRsrc);
        shellCommand = Shell.WINDOWS ? windows_command : linux_bash_command;
      }

      // Set the necessary command to execute on the allocated container
      Vector<CharSequence> vargs = new Vector<CharSequence>(5);

      // Set executable command
      vargs.add(shellCommand);
      // Set shell script path
      if (!scriptPath.isEmpty()) {
        vargs.add(Shell.WINDOWS ? EXEC_BAT_SCRIPT_STRING_PATH
            : EXEC_SHELL_STRING_PATH);
      }

      // Set args for the shell command if any
      vargs.add(shellArgs);
      // Add log redirect params
      vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
      vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");

      // Get final commmand
      StringBuilder command = new StringBuilder();
      for (CharSequence str : vargs) {
        command.append(str).append(" ");
      }

      List<String> commands = new ArrayList<String>();
      commands.add(command.toString());

      // Set up ContainerLaunchContext, setting local resource, environment,
      // command and token for constructor.

      // Note for tokens: Set up tokens for the container too. Today, for normal
      // shell commands, the container in distribute-shell doesn't need any
      // tokens. We are populating them mainly for NodeManagers to be able to
      // download anyfiles in the distributed file-system. The tokens are
      // otherwise also useful in cases, for e.g., when one is running a
      // "hadoop dfs" command inside the distributed shell.
      Map<String, String> myShellEnv = new HashMap<String, String>(shellEnv);
      myShellEnv.put(YARN_SHELL_ID, shellId);
      ContainerRetryContext containerRetryContext =
          ContainerRetryContext.newInstance(
              containerRetryPolicy, containerRetryErrorCodes,
              containerMaxRetries, containrRetryInterval,
              containerFailuresValidityInterval);
      ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
        localResources, myShellEnv, commands, null, allTokens.duplicate(),
          null, containerRetryContext);
      containerListener.addContainer(container.getId(), container);
      nmClientAsync.startContainerAsync(container, ctx);
    }

The NMClientAsync object, together with its callback handler, handles container events: container start, stop, status updates, and errors.
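
A bare-bones sketch of such a handler is shown below; it implements the basic NMClientAsync.CallbackHandler interface with placeholder comments only. The distributed shell's own handler, created by createNMCallbackHandler() earlier, extends NMClientAsync.AbstractCallbackHandler and additionally keeps track of the launched containers, which is what the containerListener.addContainer() call in the run() method above relies on.

    import java.nio.ByteBuffer;
    import java.util.Map;

    import org.apache.hadoop.yarn.api.records.ContainerId;
    import org.apache.hadoop.yarn.api.records.ContainerStatus;
    import org.apache.hadoop.yarn.client.api.async.NMClientAsync;

    // Skeleton only: shows which container events an NM callback handler sees.
    class SkeletonNMCallbackHandler implements NMClientAsync.CallbackHandler {

      @Override
      public void onContainerStarted(ContainerId containerId,
          Map<String, ByteBuffer> allServiceResponse) {
        // The container process has been launched on the NodeManager.
      }

      @Override
      public void onContainerStatusReceived(ContainerId containerId,
          ContainerStatus containerStatus) {
        // Response to a getContainerStatusAsync() query.
      }

      @Override
      public void onContainerStopped(ContainerId containerId) {
        // The container was stopped via stopContainerAsync().
      }

      @Override
      public void onStartContainerError(ContainerId containerId, Throwable t) {
        // The launch failed; a real handler counts this as a failed container.
      }

      @Override
      public void onGetContainerStatusError(ContainerId containerId,
          Throwable t) {
        // A status query failed.
      }

      @Override
      public void onStopContainerError(ContainerId containerId, Throwable t) {
        // A stop request failed.
      }
    }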

Once the ApplicationMaster determines that its work is done, it must unregister itself through the AM-RM client and then stop the clients.

    try {
      amRMClient.unregisterApplicationMaster(appStatus, message, null);
    } catch (YarnException | IOException ex) {
      LOG.error("Failed to unregister application", ex);
    }
    amRMClient.stop();
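
The appStatus and message passed to unregisterApplicationMaster() above are computed earlier in the finish phase. A plausible sketch, based only on the success/failure counters seen earlier in this walkthrough (the exact logic in the distributed shell differs in its details), might be:

    // Decide what to report to the RM, based on how many containers completed
    // and how many of those failed. FinalApplicationStatus comes from
    // org.apache.hadoop.yarn.api.records.
    FinalApplicationStatus appStatus;
    String message = null;
    if (numFailedContainers.get() == 0
        && numCompletedContainers.get() >= numTotalContainers) {
      appStatus = FinalApplicationStatus.SUCCEEDED;
    } else {
      appStatus = FinalApplicationStatus.FAILED;
      message = "Diagnostics: total=" + numTotalContainers
          + ", completed=" + numCompletedContainers.get()
          + ", failed=" + numFailedContainers.get();
    }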