The ApplicationMaster of the custom distributed shell
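The ResourceManager launches the ApplicationMaster in a container. The first thing the ApplicationMaster must do is register itself with the ResourceManager. The registration information includes the port that the ApplicationMaster exposes for external access, although the distributed shell described here does not set that port.
Meanwhile, the ApplicationMaster must send periodic heartbeats to the ResourceManager to show that it is still alive. ApplicationMasterProtocol#allocate is the call that carries the heartbeat.
For a given job, the ApplicationMaster uses an AllocateRequest to ask the ResourceManager for containers, and ResourceRequest entries to describe the resources it needs, such as memory, disk, and CPU. The ResourceManager replies with an AllocateResponse that tells the ApplicationMaster about newly allocated containers and their status.
For each allocated container, the ApplicationMaster uses a ContainerLaunchContext to set the required information, such as local resources, environment variables, and the command to execute. It then submits a StartContainerRequest through ContainerManagementProtocol to run the command in that container.
The ApplicationMaster can monitor running containers either through the ResourceManager's ApplicationMasterProtocol#allocate or through ContainerManagementProtocol.
Once the job has finished, the ApplicationMaster must send a FinishApplicationMasterRequest to the ResourceManager to report that it is done.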
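The call order described above (register first, then heartbeat through allocate, then finish) can be sketched as a tiny state machine. This is only an in-memory illustration: the class AmLifecycleSketch and its methods are hypothetical and do not use or reproduce the YARN API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory model of the AM call order; it does not use the YARN API. */
final class AmLifecycleSketch {
  enum State { NEW, REGISTERED, FINISHED }

  private State state = State.NEW;
  private final List<String> calls = new ArrayList<>();

  /** Step 1: register with the ResourceManager, exactly once. */
  void register() {
    if (state != State.NEW) {
      throw new IllegalStateException("register() must be the first call");
    }
    state = State.REGISTERED;
    calls.add("register");
  }

  /** Step 2: allocate() doubles as the heartbeat; legal only while registered. */
  void allocate() {
    requireRegistered("allocate");
    calls.add("allocate");
  }

  /** Step 3: tell the ResourceManager that the AM is done. */
  void finish() {
    requireRegistered("finish");
    state = State.FINISHED;
    calls.add("finish");
  }

  /** Returns the calls made so far, in order. */
  List<String> calls() {
    return calls;
  }

  private void requireRegistered(String call) {
    if (state != State.REGISTERED) {
      throw new IllegalStateException(call + "() requires a registered AM");
    }
  }
}
```

Calling register(), allocate(), and finish() in that order succeeds, while calling allocate() before register() throws, which mirrors the requirement that registration comes first.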
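Starting from the ApplicationMaster
The AM runs in three phases: parameter initialization, the run phase, and a final phase that collects the result once the AM has finished.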
/**
 * @param args Command line args
 */
public static void main(String[] args) {
  boolean result = false;
  try {
    ApplicationMaster appMaster = new ApplicationMaster();
    LOG.info("Initializing ApplicationMaster");
    boolean doRun = appMaster.init(args);
    if (!doRun) {
      System.exit(0);
    }
    appMaster.run();
    result = appMaster.finish();
  } catch (Throwable t) {
    LOG.error("Error running ApplicationMaster", t);
    LogManager.shutdown();
    ExitUtil.terminate(1, t);
  }
  if (result) {
    LOG.info("Application Master completed successfully. exiting");
    System.exit(0);
  } else {
    LOG.error("Application Master failed. exiting");
    System.exit(2);
  }
}
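To make the exit codes of main easier to see, here is a minimal sketch that maps the three phases to the code the process would end with. The class AmExitCodeSketch and the method exitCodeFor are hypothetical names introduced for illustration; the sketch returns the code instead of calling System.exit.

```java
import java.util.concurrent.Callable;
import java.util.function.BooleanSupplier;

/** Hypothetical model of the control flow in main(); returns the exit code. */
final class AmExitCodeSketch {
  static int exitCodeFor(Callable<Boolean> init, Runnable run, BooleanSupplier finish) {
    boolean result;
    try {
      if (!init.call()) {
        return 0;               // init decided there is nothing to run
      }
      run.run();
      result = finish.getAsBoolean();
    } catch (Throwable t) {
      return 1;                 // any error in init, run, or finish
    }
    return result ? 0 : 2;      // 0 on success, 2 when finish reports failure
  }
}
```

For example, exitCodeFor(() -> true, () -> {}, () -> false) yields 2, matching the "Application Master failed" branch.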
Parameter initialization
The AM accepts the following parameters:
usage: ApplicationMaster
 -app_attempt_id <arg>
     App Attempt ID. Not to be used unless for testing purposes
 -container_failures_validity_interval <arg>
     Failures which are out of the time window will not be added to the number of container retry attempts
 -container_max_retries <arg>
     If container could retry, it specifies max retires
 -container_memory <arg>
     Amount of memory in MB to be requested to run the shell command
 -container_resource_profile <arg>
     Resource profile to be requested to run the shell command
 -container_resources <arg>
     Amount of resources to be requested to run the shell command. Specified as resource type=value pairs separated by commas. E.g. -container_resources memory-mb=512,vcores=1
 -container_retry_error_codes <arg>
     When retry policy is set to RETRY_ON_SPECIFIC_ERROR_CODES, error codes is specified with this option, e.g. --container_retry_error_codes 1,2,3
 -container_retry_interval <arg>
     Interval between each retry, unit is milliseconds
 -container_retry_policy <arg>
     Retry policy when container fails to run, 0: NEVER_RETRY, 1: RETRY_ON_ALL_ERRORS, 2: RETRY_ON_SPECIFIC_ERROR_CODES
 -container_type <arg>
     Container execution type, GUARANTEED or OPPORTUNISTIC
 -container_vcores <arg>
     Amount of virtual cores to be requested to run the shell command
 -debug
     Dump out debug information
 -enforce_execution_type
     Flag to indicate whether to enforce execution type of containers
 -help
     Print usage
 -keep_containers_across_application_attempts
     Flag to indicate whether to keep containers across application attempts. If the flag is true, running containers will not be killed when application attempt fails and these containers will be retrieved by the new application attempt
 -num_containers <arg>
     No. of containers on which the shell command needs to be executed
 -placement_spec <arg>
     Placement specification
 -priority <arg>
     Application Priority. Default 0
 -promote_opportunistic_after_start
     Flag to indicate whether to automatically promote opportunistic containers to guaranteed.
 -shell_env <arg>
     Environment for shell script. Specified as env_key=env_val pairs
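As an illustration of the "type=value pairs separated by commas" format accepted by -container_resources, the following sketch parses such a string into a map. The class ResourcePairsSketch is hypothetical and is not how the distributed shell itself parses the option; in particular, it assumes plain integer values without unit suffixes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical parser for strings such as "memory-mb=512,vcores=1". */
final class ResourcePairsSketch {
  static Map<String, Long> parse(String spec) {
    Map<String, Long> resources = new LinkedHashMap<>();
    if (spec == null || spec.trim().isEmpty()) {
      return resources;
    }
    for (String pair : spec.split(",")) {
      // Split on the first '=' only, so the value part stays intact.
      String[] kv = pair.trim().split("=", 2);
      if (kv.length != 2 || kv[0].trim().isEmpty()) {
        throw new IllegalArgumentException("Expected type=value, got: " + pair);
      }
      // Assumption: values are plain integers (no units such as "Gi").
      resources.put(kv[0].trim(), Long.parseLong(kv[1].trim()));
    }
    return resources;
  }
}
```

With the example from the usage text, parse("memory-mb=512,vcores=1") returns a map with memory-mb mapped to 512 and vcores mapped to 1.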
ApplicationMaster run phase
Next, look at the run method.
First, it obtains the credentials of the user who submitted the current job:
// Note: Credentials, Token, UserGroupInformation, DataOutputBuffer class
// are marked as LimitedPrivate
Credentials credentials =
    UserGroupInformation.getCurrentUser().getCredentials();
DataOutputBuffer dob = new DataOutputBuffer();
credentials.writeTokenStorageToStream(dob);
// Now remove the AM->RM token so that containers cannot access it.
Iterator<Token<?>> iter = credentials.getAllTokens().iterator();
LOG.info("Executing with tokens:");
while (iter.hasNext()) {
  Token<?> token = iter.next();
  LOG.info(token.toString());
  if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
    iter.remove();
  }
}
allTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());

// Create appSubmitterUgi and add original tokens to it
String appSubmitterUserName =
    System.getenv(ApplicationConstants.Environment.USER.name());
appSubmitterUgi =
    UserGroupInformation.createRemoteUser(appSubmitterUserName);
appSubmitterUgi.addCredentials(credentials);
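The loop above removes one kind of token while iterating, which is only safe through Iterator.remove. A minimal sketch of that pattern, using plain strings as stand-ins for token kinds (the class TokenFilterSketch and the kind string used for the AM-RM token are assumptions made for illustration, not the Hadoop types):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in: token kinds are modelled as plain strings. */
final class TokenFilterSketch {
  // Assumed placeholder for the kind that identifies the AM->RM token.
  static final String AMRM_KIND = "YARN_AM_RM_TOKEN";

  /** Returns a copy of the given kinds with the AM->RM kind removed. */
  static List<String> withoutAmRmToken(List<String> tokenKinds) {
    List<String> remaining = new ArrayList<>(tokenKinds);
    Iterator<String> iter = remaining.iterator();
    while (iter.hasNext()) {
      // Iterator.remove avoids a ConcurrentModificationException.
      if (AMRM_KIND.equals(iter.next())) {
        iter.remove();
      }
    }
    return remaining;
  }
}
```

Removing through the list itself inside a for-each loop would fail at runtime, which is why the original code also goes through the iterator.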
Next, it creates two clients: one for talking to the ResourceManager and one for talking to the NodeManagers.
AMRMClientAsync.AbstractCallbackHandler allocListener =
    new RMCallbackHandler();
amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
amRMClient.init(conf);
amRMClient.start();

containerListener = createNMCallbackHandler();
nmClientAsync = new NMClientAsyncImpl(containerListener);
nmClientAsync.init(conf);
nmClientAsync.start();
The AM must register with the ResourceManager. Registering also starts the heartbeat that shows the AM is alive.
// Setup local RPC Server to accept status requests directly from clients
// TODO need to setup a protocol for client to be able to communicate to
// the RPC server
// TODO use the rpc port info to register with the RM for the client to
// send requests to this app master

// Register self with ResourceManager
// This will start heartbeating to the RM
appMasterHostname = NetUtils.getHostname();
Map<Set<String>, PlacementConstraint> placementConstraintMap = null;
if (this.placementSpecs != null) {
  placementConstraintMap = new HashMap<>();
  for (PlacementSpec spec : this.placementSpecs.values()) {
    if (spec.constraint != null) {
      Set<String> allocationTags = Strings.isNullOrEmpty(spec.sourceTag) ?
          Collections.emptySet() : Collections.singleton(spec.sourceTag);
      placementConstraintMap.put(allocationTags, spec.constraint);
    }
  }
}
RegisterApplicationMasterResponse response = amRMClient
    .registerApplicationMaster(appMasterHostname, appMasterRpcPort,
        appMasterTrackingUrl, placementConstraintMap);
resourceProfiles = response.getResourceProfiles();
ResourceUtils.reinitializeResources(response.getResourceTypes());
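The registration response reports the cluster's maximum resource capability. The AM compares it with what the job asked for: if the job requests more than the cluster maximum, the request is lowered to that maximum.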
// Dump out information about cluster capability as seen by the
// resource manager
long maxMem = response.getMaximumResourceCapability().getMemorySize();
LOG.info("Max mem capability of resources in this cluster " + maxMem);

int maxVCores = response.getMaximumResourceCapability().getVirtualCores();
LOG.info("Max vcores capability of resources in this cluster " + maxVCores);

// A resource ask cannot exceed the max.
if (containerMemory > maxMem) {
  LOG.info("Container memory specified above max threshold of cluster."
      + " Using max value." + ", specified=" + containerMemory + ", max="
      + maxMem);
  containerMemory = maxMem;
}

if (containerVirtualCores > maxVCores) {
  LOG.info("Container virtual cores specified above max threshold of cluster."
      + " Using max value." + ", specified=" + containerVirtualCores + ", max="
      + maxVCores);
  containerVirtualCores = maxVCores;
}
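The two if blocks above amount to capping each requested value at the cluster maximum. A compact sketch of the same rule, with the hypothetical class ResourceClampSketch standing in for the AM fields:

```java
/** Hypothetical helper: caps a container ask at the cluster maximum. */
final class ResourceClampSketch {
  /** Returns {memoryMb, vcores}, each no larger than the cluster maximum. */
  static long[] clamp(long memoryMb, int vcores, long maxMemoryMb, int maxVcores) {
    long cappedMemory = Math.min(memoryMb, maxMemoryMb);
    long cappedVcores = Math.min(vcores, maxVcores);
    return new long[] {cappedMemory, cappedVcores};
  }
}
```

For instance, a request of 4096 MB and 8 vcores against a maximum of 2048 MB and 4 vcores becomes 2048 MB and 4 vcores, while a request already under the maximum is returned unchanged.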
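Based on the job's requirements, the AM requests a set of containers to run its tasks. We can work out how many containers are needed and request exactly that many.
The setupContainerAskForRM() method builds each container request. It needs two settings: the resource amount and the priority.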
int numTotalContainersToRequest =
    numTotalContainers - previousAMRunningContainers.size();

// Setup ask for containers from RM
// Send request for containers to RM
// Until we get our fully allocated quota, we keep on polling RM for
// containers
// Keep looping until all the containers are launched and shell script
// executed on them ( regardless of success/failure).
if (this.placementSpecs == null) {
  LOG.info("placementSpecs null");
  for (int i = 0; i < numTotalContainersToRequest; ++i) {
    ContainerRequest containerAsk = setupContainerAskForRM();
    amRMClient.addContainerRequest(containerAsk);
  }
} else {
  LOG.info("placementSpecs to create req:" + placementSpecs);
  List<SchedulingRequest> schedReqs = new ArrayList<>();
  for (PlacementSpec pSpec : this.placementSpecs.values()) {
    LOG.info("placementSpec :" + pSpec + ", container:" + pSpec
        .getNumContainers());
    for (int i = 0; i < pSpec.getNumContainers(); i++) {
      SchedulingRequest sr = setupSchedulingRequest(pSpec);
      schedReqs.add(sr);
    }
  }
  amRMClient.addSchedulingRequests(schedReqs);
}
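The number of requests sent in each branch can be worked out with simple arithmetic. The sketch below uses the hypothetical class ContainerAskCountSketch; the Math.max guard against a negative count is an addition of this sketch and is not present in the code above.

```java
import java.util.Map;

/** Hypothetical helper that counts how many container requests would be sent. */
final class ContainerAskCountSketch {
  /** Without placement specs: total minus containers kept from a previous attempt. */
  static int withoutPlacementSpecs(int numTotalContainers, int previouslyRunning) {
    // Guard added here: never return a negative number of requests.
    return Math.max(0, numTotalContainers - previouslyRunning);
  }

  /** With placement specs: one scheduling request per container of every spec. */
  static int withPlacementSpecs(Map<String, Integer> containersPerSpec) {
    int total = 0;
    for (int count : containersPerSpec.values()) {
      total += count;
    }
    return total;
  }
}
```

With 5 containers in total and 2 still running from an earlier attempt, withoutPlacementSpecs(5, 2) gives 3 new requests.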
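After the ApplicationMaster has sent its container allocation requests, containers are launched asynchronously by the event handler of the AMRMClientAsync client. The handler should extend AMRMClientAsync.AbstractCallbackHandler, as RMCallbackHandler does here.
When containers are allocated, the handler starts a thread for each one that runs the code to launch the container.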
@Override
public void onContainersAllocated(List<Container> allocatedContainers) {
  LOG.info("Got response from RM for container ask, allocatedCnt="
      + allocatedContainers.size());
  for (Container allocatedContainer : allocatedContainers) {
    if (numAllocatedContainers.get() == numTotalContainers) {
      LOG.info("The requested number of containers have been allocated."
          + " Releasing the extra container allocation from the RM.");
      amRMClient.releaseAssignedContainer(allocatedContainer.getId());
    } else {
      numAllocatedContainers.addAndGet(1);
      String yarnShellId = Integer.toString(yarnShellIdCounter);
      yarnShellIdCounter++;
      LOG.info(
          "Launching shell command on a new container."
              + ", containerId=" + allocatedContainer.getId()
              + ", yarnShellId=" + yarnShellId
              + ", containerNode="
              + allocatedContainer.getNodeId().getHost()
              + ":" + allocatedContainer.getNodeId().getPort()
              + ", containerNodeURI="
              + allocatedContainer.getNodeHttpAddress()
              + ", containerResourceMemory"
              + allocatedContainer.getResource().getMemorySize()
              + ", containerResourceVirtualCores"
              + allocatedContainer.getResource().getVirtualCores());

      Thread launchThread =
          createLaunchContainerThread(allocatedContainer, yarnShellId);

      // launch and start the container on a separate thread to keep
      // the main thread unblocked
      // as all containers may not be allocated at one go.
      launchThreads.add(launchThread);
      launchedContainers.add(allocatedContainer.getId());
      launchThread.start();

      // Remove the corresponding request
      Collection<AMRMClient.ContainerRequest> requests =
          amRMClient.getMatchingRequests(
              allocatedContainer.getAllocationRequestId());
      if (requests.iterator().hasNext()) {
        AMRMClient.ContainerRequest request = requests.iterator().next();
        amRMClient.removeContainerRequest(request);
      }
    }
  }
}
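The core decision in onContainersAllocated is whether to launch a container or release it because the quota is already met. The following sketch isolates that decision; AllocationQuotaSketch is a hypothetical class in which containers are plain string IDs and "launching" just records the ID.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of the launch-or-release decision; containers are string IDs. */
final class AllocationQuotaSketch {
  private final int numTotalContainers;
  private final AtomicInteger numAllocatedContainers = new AtomicInteger();
  final List<String> launched = new ArrayList<>();
  final List<String> released = new ArrayList<>();

  AllocationQuotaSketch(int numTotalContainers) {
    this.numTotalContainers = numTotalContainers;
  }

  void onContainersAllocated(List<String> allocatedContainerIds) {
    for (String containerId : allocatedContainerIds) {
      if (numAllocatedContainers.get() == numTotalContainers) {
        // Quota already met: hand the extra container back.
        released.add(containerId);
      } else {
        numAllocatedContainers.incrementAndGet();
        // The real handler starts a launch thread here.
        launched.add(containerId);
      }
    }
  }
}
```

If the quota is 2 and the ResourceManager hands back three containers, the first two are launched and the third is released.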
On each heartbeat, the event handler reports the application's progress.
@Override
public float getProgress() {
  // set progress to deliver to RM on next heartbeat
  float progress = (float) numCompletedContainers.get()
      / numTotalContainers;
  return progress;
}
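Progress here is simply the fraction of containers that have completed. A standalone sketch of the same ratio follows; the class ProgressSketch is hypothetical, and the guards for a zero total and for values above 1 are additions of this sketch, not part of getProgress above.

```java
/** Hypothetical helper computing the completed fraction reported on a heartbeat. */
final class ProgressSketch {
  static float progress(int numCompletedContainers, int numTotalContainers) {
    if (numTotalContainers <= 0) {
      return 0f;  // guard added in this sketch to avoid dividing by zero
    }
    float ratio = (float) numCompletedContainers / numTotalContainers;
    return Math.min(1f, ratio);  // cap added in this sketch
  }
}
```

With 1 of 4 containers completed, progress(1, 4) is 0.25.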
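The container launch thread is what actually starts the container on the NM. Once a container has been allocated to the AM, the AM follows a process similar to the one the client followed when it set up the ContainerLaunchContext for the AM itself, this time for the final task that will run in the allocated container. Once the ContainerLaunchContext is defined, the AM can start the container through NMClientAsync.
The run method below belongs to LaunchContainerRunnable, the Runnable behind the launch thread that RMCallbackHandler creates.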
/**
 * Connects to CM, sets up container launch context
 * for shell command and eventually dispatches the container
 * start request to the CM.
 */
@Override
public void run() {
  LOG.info("Setting up container launch container for containerid="
      + container.getId() + " with shellid=" + shellId);

  // Set the local resources
  Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();

  // The container for the eventual shell commands needs its own local
  // resources too.
  // In this scenario, if a shell script is specified, we need to have it
  // copied and made available to the container.
  if (!scriptPath.isEmpty()) {
    Path renamedScriptPath = null;
    if (Shell.WINDOWS) {
      renamedScriptPath = new Path(scriptPath + ".bat");
    } else {
      renamedScriptPath = new Path(scriptPath + ".sh");
    }

    try {
      // rename the script file based on the underlying OS syntax.
      renameScriptFile(renamedScriptPath);
    } catch (Exception e) {
      LOG.error(
          "Not able to add suffix (.bat/.sh) to the shell script filename",
          e);
      // We know we cannot continue launching the container
      // so we should release it.
      numCompletedContainers.incrementAndGet();
      numFailedContainers.incrementAndGet();
      return;
    }

    URL yarnUrl = null;
    try {
      yarnUrl = URL.fromURI(new URI(renamedScriptPath.toString()));
    } catch (URISyntaxException e) {
      LOG.error("Error when trying to use shell script path specified"
          + " in env, path=" + renamedScriptPath, e);
      // A failure scenario on bad input such as invalid shell script path
      // We know we cannot continue launching the container
      // so we should release it.
      // TODO
      numCompletedContainers.incrementAndGet();
      numFailedContainers.incrementAndGet();
      return;
    }
    LocalResource shellRsrc = LocalResource.newInstance(yarnUrl,
        LocalResourceType.FILE, LocalResourceVisibility.APPLICATION,
        shellScriptPathLen, shellScriptPathTimestamp);
    localResources.put(Shell.WINDOWS ? EXEC_BAT_SCRIPT_STRING_PATH :
        EXEC_SHELL_STRING_PATH, shellRsrc);
    shellCommand = Shell.WINDOWS ? windows_command : linux_bash_command;
  }

  // Set the necessary command to execute on the allocated container
  Vector<CharSequence> vargs = new Vector<CharSequence>(5);

  // Set executable command
  vargs.add(shellCommand);
  // Set shell script path
  if (!scriptPath.isEmpty()) {
    vargs.add(Shell.WINDOWS ? EXEC_BAT_SCRIPT_STRING_PATH
        : EXEC_SHELL_STRING_PATH);
  }

  // Set args for the shell command if any
  vargs.add(shellArgs);
  // Add log redirect params
  vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
  vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr");

  // Get final commmand
  StringBuilder command = new StringBuilder();
  for (CharSequence str : vargs) {
    command.append(str).append(" ");
  }

  List<String> commands = new ArrayList<String>();
  commands.add(command.toString());

  // Set up ContainerLaunchContext, setting local resource, environment,
  // command and token for constructor.

  // Note for tokens: Set up tokens for the container too. Today, for normal
  // shell commands, the container in distribute-shell doesn't need any
  // tokens. We are populating them mainly for NodeManagers to be able to
  // download anyfiles in the distributed file-system. The tokens are
  // otherwise also useful in cases, for e.g., when one is running a
  // "hadoop dfs" command inside the distributed shell.
  Map<String, String> myShellEnv = new HashMap<String, String>(shellEnv);
  myShellEnv.put(YARN_SHELL_ID, shellId);
  ContainerRetryContext containerRetryContext =
      ContainerRetryContext.newInstance(
          containerRetryPolicy, containerRetryErrorCodes,
          containerMaxRetries, containrRetryInterval,
          containerFailuresValidityInterval);
  ContainerLaunchContext ctx = ContainerLaunchContext.newInstance(
      localResources, myShellEnv, commands, null, allTokens.duplicate(),
      null, containerRetryContext);
  containerListener.addContainer(container.getId(), container);
  nmClientAsync.startContainerAsync(container, ctx);
}
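The part of run that assembles the command line is easy to try in isolation. The sketch below rebuilds it with plain strings; ShellCommandSketch is a hypothetical class, the "<LOG_DIR>" placeholder is assumed to be what ApplicationConstants.LOG_DIR_EXPANSION_VAR expands to, and the parts are joined with single spaces rather than the trailing-space loop used above.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical rebuild of the command assembly, using plain strings only. */
final class ShellCommandSketch {
  // Assumed placeholder that the NodeManager later replaces with the log directory.
  static final String LOG_DIR = "<LOG_DIR>";

  static String build(String shellCommand, String scriptPath, String shellArgs) {
    List<String> vargs = new ArrayList<>();
    vargs.add(shellCommand);            // executable command
    if (!scriptPath.isEmpty()) {
      vargs.add(scriptPath);            // script path, only when a script was given
    }
    vargs.add(shellArgs);               // arguments for the command
    vargs.add("1>" + LOG_DIR + "/stdout");  // redirect stdout to the container log dir
    vargs.add("2>" + LOG_DIR + "/stderr");  // redirect stderr to the container log dir
    return String.join(" ", vargs);
  }
}
```

For example, build("ls", "", "-l") produces the single command string "ls -l 1><LOG_DIR>/stdout 2><LOG_DIR>/stderr".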
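The NMClientAsync object, together with its event handler, handles container events, including container start, stop, status updates, and errors.
Once the ApplicationMaster determines that the work is done, it must unregister itself through the AM-RM client and then stop the client.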
try {
  amRMClient.unregisterApplicationMaster(appStatus, message, null);
} catch (YarnException | IOException ex) {
  LOG.error("Failed to unregister application", ex);
}

amRMClient.stop();