Implementing a distributed shell on YARN
Code: https://github.com/xiaozhch5/hadoop-yarn-applications-distributedshell.git
Overview
Before running and studying this application, you should have a clear understanding of the following concepts:
Resource Manager
Node Manager
Application Master
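Application submission flow
The distributed shell client first launches an Application Master, which then executes the shell command in containers.
To submit a YARN application, the client first connects to the ResourceManager's ApplicationsManager (ASM) through ApplicationClientProtocol. ApplicationClientProtocol gives the client a way to obtain cluster information and to request a new ApplicationId.
For each application to be submitted, the client first creates an ApplicationSubmissionContext. The ApplicationSubmissionContext holds the details of the application, such as the ApplicationId, the application name, the application priority, and the queue. It also carries a ContainerLaunchContext, which describes the container in which the ApplicationMaster is launched.
The ContainerLaunchContext defines what the ApplicationMaster's container needs in order to start: local resources (jars, configuration files), the environment variables to set for the ApplicationMaster, and the command that starts the ApplicationMaster.
The client submits the application to the ResourceManager using the ApplicationSubmissionContext and then periodically asks the ResourceManager for an ApplicationReport. If the application runs for too long, the client can terminate it by sending a KillApplicationRequest to the ResourceManager.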
Code walkthrough
Getting the code
git clone https://github.com/xiaozhch5/hadoop-yarn-applications-distributedshell.git
The package org.apache.hadoop.yarn.applications.distributedshell contains the following Java files:
2021/09/11 18:14 74,693 ApplicationMaster.java
2021/09/12 12:30 54,976 Client.java
2021/09/11 21:13 3,132 DistributedShellTimelinePlugin.java
2021/09/11 18:14 1,937 DSConstants.java
2021/09/11 18:14 1,987 Log4jPropertyHelper.java
2021/09/11 18:14 869 package-info.java
2021/09/11 18:14 4,258 PlacementSpec.java
Client submits the application, and ApplicationMaster is the ApplicationMaster that gets launched on YARN.
Starting from Client.java
Let's begin with the main function of the client:
/**
* @param args Command line arguments
*/
public static void main(String[] args) {
boolean result = false;
try {
YarnConfiguration yarnConfiguration = new YarnConfiguration();
yarnConfiguration.set("yarn.resourcemanager.hostname", "192.168.241.128");
yarnConfiguration.set("fs.defaultFS", "hdfs://hadoop:9000");
yarnConfiguration.set("fs.hdfs.impl", org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
Client client = new Client(yarnConfiguration);
LOG.info("Initializing Client");
try {
boolean doRun = client.init(args);
if (!doRun) {
System.exit(0);
}
} catch (IllegalArgumentException e) {
System.err.println(e.getLocalizedMessage());
client.printUsage();
System.exit(-1);
}
result = client.run();
} catch (Throwable t) {
LOG.error("Error running Client", t);
System.exit(1);
}
if (result) {
LOG.info("Application completed successfully");
System.exit(0);
}
LOG.error("Application failed to complete successfully");
System.exit(2);
}
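The function can be divided into three parts:
- Initializing the Hadoop cluster configuration (yarnConfiguration)
- Initializing the client (client.init(args))
- Submitting the YARN application through the client (client.run())
The first part describes the Hadoop cluster we submit to: the hostname of the ResourceManager, the HDFS address, and the file system implementation to use.
The second part takes the command-line arguments and uses them to initialize the client's basic properties.
The third part does the actual work of talking to the ResourceManager and submitting the YARN application.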
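Each outcome of main ends in a different process exit code. The following is a minimal sketch of that mapping; the Outcome enum and the exitCodeFor helper are hypothetical names introduced here for illustration and are not part of the project.

```java
// Illustrative model of the exit codes used by Client.main.
// Outcome and exitCodeFor are hypothetical; the project calls System.exit directly.
final class ClientExitCodes {
    enum Outcome { NOTHING_TO_RUN, BAD_ARGUMENTS, UNEXPECTED_ERROR, SUCCESS, FAILURE }

    static int exitCodeFor(Outcome outcome) {
        switch (outcome) {
            case NOTHING_TO_RUN:   return 0;  // client.init(args) returned false
            case BAD_ARGUMENTS:    return -1; // init threw IllegalArgumentException
            case UNEXPECTED_ERROR: return 1;  // any other Throwable was caught
            case SUCCESS:          return 0;  // client.run() returned true
            default:               return 2;  // client.run() returned false
        }
    }

    public static void main(String[] args) {
        for (Outcome outcome : Outcome.values()) {
            System.out.println(outcome + " -> " + exitCodeFor(outcome));
        }
    }
}
```

On POSIX systems an exit status of -1 is reported to the calling shell as 255, which is worth keeping in mind when a wrapper script checks the client's status.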
Client initialization parameters
The client accepts the following command-line parameters:
usage: Client
-application_tags <arg> Application tags.
-appname <arg> Application Name. Default
value - DistributedShell
-attempt_failures_validity_interval <arg> when
attempt_failures_validity_
interval in milliseconds
is set to > 0,the failure
number will not take
failures which happen out
of the validityInterval
into failure count. If
failure count reaches to
maxAppAttempts, the
application will be
failed.
-container_failures_validity_interval <arg> Failures which are out of
the time window will not
be added to the number of
container retry attempts
-container_max_retries <arg> If container could retry,
it specifies max retires
-container_memory <arg> Amount of memory in MB to
be requested to run the
shell command
-container_resource_profile <arg> Resource profile for the
shell command
-container_resources <arg> Amount of resources to be
requested to run the shell
command. Specified as
resource type=value pairs
separated by commas. E.g.
-container_resources
memory-mb=256,vcores=1
-container_retry_error_codes <arg> When retry policy is set
to
RETRY_ON_SPECIFIC_ERROR_CO
DES, error codes is
specified with this
option, e.g.
--container_retry_error_co
des 1,2,3
-container_retry_interval <arg> Interval between each
retry, unit is
milliseconds
-container_retry_policy <arg> Retry policy when
container fails to run, 0:
NEVER_RETRY, 1:
RETRY_ON_ALL_ERRORS, 2:
RETRY_ON_SPECIFIC_ERROR_CO
DES
-container_type <arg> Container execution type,
GUARANTEED or
OPPORTUNISTIC
-container_vcores <arg> Amount of virtual cores to
be requested to run the
shell command
-create Flag to indicate whether
to create the domain
specified with -domain.
-debug Dump out debug information
-docker_client_config <arg> The docker client
configuration path. The
scheme should be supplied
(i.e. file:// or hdfs://).
Only used when the Docker
runtime is enabled and
requested.
-domain <arg> ID of the timeline domain
where the timeline
entities will be put
-enforce_execution_type Flag to indicate whether
to enforce execution type
of containers
-flow_name <arg> Flow name which the
distributed shell app
belongs to
-flow_run_id <arg> Flow run ID which the
distributed shell app
belongs to
-flow_version <arg> Flow version which the
distributed shell app
belongs to
-help Print usage
-jar <arg> Jar file containing the
application master
-keep_containers_across_application_attempts Flag to indicate whether
to keep containers across
application attempts. If
the flag is true, running
containers will not be
killed when application
attempt fails and these
containers will be
retrieved by the new
application attempt
-log_properties <arg> log4j.properties file
-master_memory <arg> Amount of memory in MB to
be requested to run the
application master
-master_resource_profile <arg> Resource profile for the
application master
-master_resources <arg> Amount of resources to be
requested to run the
application master.
Specified as resource
type=value pairs separated
by commas.E.g.
-master_resources
memory-mb=512,vcores=2
-master_vcores <arg> Amount of virtual cores to
be requested to run the
application master
-modify_acls <arg> Users and groups that
allowed to modify the
timeline entities in the
given domain
-node_label_expression <arg> Node label expression to
determine the nodes where
all the containers of this
application will be
allocated, "" means
containers can be
allocated anywhere, if you
don't specify the option,
default
node_label_expression of
queue will be used.
-num_containers <arg> No. of containers on which
the shell command needs to
be executed
-placement_spec <arg> Placement specification.
Please note, if this
option is specified, The
"num_containers" option
will be ignored. All
requested containers will
be of type GUARANTEED
-priority <arg> Application Priority.
Default 0
-promote_opportunistic_after_start Flag to indicate whether
to automatically promote
opportunistic containers
to guaranteed.
-queue <arg> RM Queue in which this
application is to be
submitted
-rolling_log_pattern <arg> pattern for files that
should be aggregated in a
rolling fashion
-shell_args <arg> Command line args for the
shell script.Multiple args
can be separated by empty
space.
-shell_cmd_priority <arg> Priority for the shell
command containers
-shell_command <arg> Shell command to be
executed by the
Application Master. Can
only specify either
--shell_command or
--shell_script
-shell_env <arg> Environment for shell
script. Specified as
env_key=env_val pairs
-shell_script <arg> Location of the shell
script to be executed. Can
only specify either
--shell_command or
--shell_script
-timeout <arg> Application timeout in
milliseconds
-view_acls <arg> Users and groups that
allowed to view the
timeline entities in the
given domain
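Of these, -jar and -shell_command must be specified (according to the usage text above, -shell_script can be given in place of -shell_command, but not both at once).
-jar specifies the local path of the jar that contains the ApplicationMaster.
-shell_command specifies the shell command to execute.
As mentioned earlier, the project contains ApplicationMaster.java, which is the ApplicationMaster we need to launch on the YARN cluster. We therefore have to package the project as a jar first and then pass the path of that jar with -jar. The packaging command is: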
mvn clean package
Client initialization also creates a YarnClient object, which is used to communicate with the ResourceManager.
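The usage text describes -container_resources and -master_resources as comma-separated type=value pairs. As a rough illustration of that format, here is a minimal parsing sketch. ResourceSpecParser is a hypothetical helper, not the project's own parsing code, and it assumes plain integer values without unit suffixes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper that parses a spec such as "memory-mb=256,vcores=1".
// Assumption: values are plain integers; unit suffixes are not handled.
final class ResourceSpecParser {
    static Map<String, Long> parse(String spec) {
        Map<String, Long> result = new LinkedHashMap<>();
        if (spec == null || spec.trim().isEmpty()) {
            return result; // nothing requested
        }
        for (String pair : spec.split(",")) {
            // Split on the first '=' only, so the value is kept intact.
            String[] kv = pair.trim().split("=", 2);
            if (kv.length != 2 || kv[0].trim().isEmpty()) {
                throw new IllegalArgumentException("Expected type=value but got: " + pair);
            }
            result.put(kv[0].trim(), Long.parseLong(kv[1].trim()));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parse("memory-mb=256,vcores=1")); // {memory-mb=256, vcores=1}
    }
}
```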
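Submitting the application to the ResourceManager
Client#run
Once yarnClient has been started inside the run method, the client can communicate with the ResourceManager.
- The following code retrieves information about the YARN cluster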
YarnClusterMetrics clusterMetrics = yarnClient.getYarnClusterMetrics();
LOG.info("Got Cluster metric info from ASM"
+ ", numNodeManagers=" + clusterMetrics.getNumNodeManagers());
List<NodeReport> clusterNodeReports = yarnClient.getNodeReports(
NodeState.RUNNING);
LOG.info("Got Cluster node info from ASM");
for (NodeReport node : clusterNodeReports) {
LOG.info("Got node report from ASM for"
+ ", nodeId=" + node.getNodeId()
+ ", nodeAddress=" + node.getHttpAddress()
+ ", nodeRackName=" + node.getRackName()
+ ", nodeNumContainers=" + node.getNumContainers());
}
QueueInfo queueInfo = yarnClient.getQueueInfo(this.amQueue);
if (queueInfo == null) {
throw new IllegalArgumentException(String
.format("Queue %s not present in scheduler configuration.",
this.amQueue));
}
LOG.info("Queue info"
+ ", queueName=" + queueInfo.getQueueName()
+ ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity()
+ ", queueMaxCapacity=" + queueInfo.getMaximumCapacity()
+ ", queueApplicationCount=" + queueInfo.getApplications().size()
+ ", queueChildQueueCount=" + queueInfo.getChildQueues().size());
List<QueueUserACLInfo> listAclInfo = yarnClient.getQueueAclsInfo();
for (QueueUserACLInfo aclInfo : listAclInfo) {
for (QueueACL userAcl : aclInfo.getUserAcls()) {
LOG.info("User ACL Info for Queue"
+ ", queueName=" + aclInfo.getQueueName()
+ ", userAcl=" + userAcl.name());
}
}
- The following code obtains an application ID and the maximum available resources from YARN
// Get a new application id
YarnClientApplication app = yarnClient.createApplication();
GetNewApplicationResponse appResponse = app.getNewApplicationResponse();
// TODO get min/max resource capabilities from RM and change memory ask if needed
// If we do not have min/max, we may not be able to correctly request
// the required resources from the RM for the app master
// Memory ask has to be a multiple of min and less than max.
// Dump out information about cluster capability as seen by the resource manager
long maxMem = appResponse.getMaximumResourceCapability().getMemorySize();
LOG.info("Max mem capability of resources in this cluster " + maxMem);
// A resource ask cannot exceed the max.
if (amMemory > maxMem) {
LOG.info("AM memory specified above max threshold of cluster. Using max value."
+ ", specified=" + amMemory
+ ", max=" + maxMem);
amMemory = maxMem;
}
int maxVCores = appResponse.getMaximumResourceCapability().getVirtualCores();
LOG.info("Max virtual cores capability of resources in this cluster " + maxVCores);
if (amVCores > maxVCores) {
LOG.info("AM virtual cores specified above max threshold of cluster. "
+ "Using max value." + ", specified=" + amVCores
+ ", max=" + maxVCores);
amVCores = maxVCores;
}
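The code above only caps the request at the cluster maximum, while its TODO comment notes that a memory ask should also be a multiple of the minimum allocation. The sketch below shows one way both rules could be combined. ResourceNormalizer and its min parameter are assumptions made for illustration; the client code shown here does not fetch a minimum from the ResourceManager.

```java
// Hypothetical helper: round a request up to a multiple of the minimum
// allocation, then cap it at the maximum. Not part of the project.
final class ResourceNormalizer {
    static long normalize(long requested, long min, long max) {
        if (min <= 0 || max < min) {
            throw new IllegalArgumentException("Need 0 < min <= max");
        }
        // Treat non-positive requests as the smallest possible ask.
        long ask = Math.max(requested, 1);
        // Integer round-up to the next multiple of min.
        long roundedUp = ((ask + min - 1) / min) * min;
        // A resource ask cannot exceed the maximum.
        return Math.min(roundedUp, max);
    }

    public static void main(String[] args) {
        System.out.println(normalize(1500, 1024, 8192));  // 2048
        System.out.println(normalize(10000, 1024, 8192)); // 8192
    }
}
```

If the maximum is not itself a multiple of the minimum, the capped result will not be one either; this sketch gives the cap priority in that case.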
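- As described in the application submission flow, the ApplicationSubmissionContext holds the details of the application. The following part sets the application name, the attempt failures validity interval, the tags, and similar information.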
// set the application name
ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
ApplicationId appId = appContext.getApplicationId();
// Set up resource type requirements
// For now, both memory and vcores are supported, so we set memory and
// vcores requirements
List<ResourceTypeInfo> resourceTypes = yarnClient.getResourceTypeInfo();
setAMResourceCapability(appContext, profiles, resourceTypes);
setContainerResources(profiles, resourceTypes);
appContext.setKeepContainersAcrossApplicationAttempts(keepContainers);
appContext.setApplicationName(appName);
if (attemptFailuresValidityInterval >= 0) {
appContext
.setAttemptFailuresValidityInterval(attemptFailuresValidityInterval);
}
Set<String> tags = new HashSet<String>();
if (applicationTags != null) {
tags.addAll(applicationTags);
}
if (flowName != null) {
tags.add(TimelineUtils.generateFlowNameTag(flowName));
}
if (flowVersion != null) {
tags.add(TimelineUtils.generateFlowVersionTag(flowVersion));
}
if (flowRunId != 0) {
tags.add(TimelineUtils.generateFlowRunIdTag(flowRunId));
}
appContext.setApplicationTags(tags);
- The following part copies the jar packaged above to HDFS and registers it as a local resource for the ApplicationMaster
LOG.info("Copy App Master jar from local filesystem and add to local environment");
// Copy the application master jar to the filesystem
// Create a local resource to point to the destination jar path
FileSystem fs = FileSystem.get(conf);
addToLocalResources(fs, appMasterJar, appMasterJarPath, appId.toString(),
localResources, null);
- If the shell command we run needs a shell script, the script has to be placed on HDFS where every container can read it.
// The shell script has to be made available on the final container(s)
// where it will be executed.
// To do this, we need to first copy into the filesystem that is visible
// to the yarn framework.
// We do not need to set this as a local resource for the application
// master as the application master does not need it.
String hdfsShellScriptLocation = "";
long hdfsShellScriptLen = 0;
long hdfsShellScriptTimestamp = 0;
if (!shellScriptPath.isEmpty()) {
Path shellSrc = new Path(shellScriptPath);
String shellPathSuffix =
appName + "/" + appId.toString() + "/" + SCRIPT_PATH;
Path shellDst =
new Path(fs.getHomeDirectory(), shellPathSuffix);
fs.copyFromLocalFile(false, true, shellSrc, shellDst);
hdfsShellScriptLocation = shellDst.toUri().toString();
FileStatus shellFileStatus = fs.getFileStatus(shellDst);
hdfsShellScriptLen = shellFileStatus.getLen();
hdfsShellScriptTimestamp = shellFileStatus.getModificationTime();
}
if (!shellCommand.isEmpty()) {
addToLocalResources(fs, null, shellCommandPath, appId.toString(),
localResources, shellCommand);
}
if (shellArgs.length > 0) {
addToLocalResources(fs, null, shellArgsPath, appId.toString(),
localResources, StringUtils.join(shellArgs, " "));
}
- The following part sets the environment variables for the Application Master
// Set the env variables to be setup in the env where the application master will be run
LOG.info("Set the environment for the application master");
Map<String, String> env = new HashMap<String, String>();
// put location of shell script into env
// using the env info, the application master will create the correct local resource for the
// eventual containers that will be launched to execute the shell scripts
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLOCATION, hdfsShellScriptLocation);
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTTIMESTAMP, Long.toString(hdfsShellScriptTimestamp));
env.put(DSConstants.DISTRIBUTEDSHELLSCRIPTLEN, Long.toString(hdfsShellScriptLen));
if (domainId != null && domainId.length() > 0) {
env.put(DSConstants.DISTRIBUTEDSHELLTIMELINEDOMAIN, domainId);
}
// Add AppMaster.jar location to classpath
// At some point we should not be required to add
// the hadoop specific classpaths to the env.
// It should be provided out of the box.
// For now setting all required classpaths including
// the classpath to "." for the application jar
StringBuilder classPathEnv = new StringBuilder(Environment.CLASSPATH.$$())
.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append("./*");
for (String c : conf.getStrings(
YarnConfiguration.YARN_APPLICATION_CLASSPATH,
YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH)) {
classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR);
classPathEnv.append(c.trim());
}
classPathEnv.append(ApplicationConstants.CLASS_PATH_SEPARATOR).append(
"./log4j.properties");
// add the runtime classpath needed for tests to work
if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
classPathEnv.append(':');
classPathEnv.append(System.getProperty("java.class.path"));
}
env.put("CLASSPATH", classPathEnv.toString());
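The CLASSPATH value assembled above is just an ordered list of entries joined by a separator token that YARN expands on the node. The sketch below reproduces that ordering with the standard library only. The literals "{{CLASSPATH}}" and "<CPS>" stand in for Environment.CLASSPATH.$$() and ApplicationConstants.CLASS_PATH_SEPARATOR and should be read as assumptions, as should the ClasspathSketch class itself.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: mirrors the order in which the client assembles CLASSPATH.
final class ClasspathSketch {
    // Assumed placeholders for the YARN constants used by the real client.
    static final String CLASSPATH_VAR = "{{CLASSPATH}}";
    static final String SEPARATOR = "<CPS>";

    static String build(List<String> yarnClasspathEntries) {
        List<String> parts = new ArrayList<>();
        parts.add(CLASSPATH_VAR);            // the node's existing classpath
        parts.add("./*");                    // jars localized into the container directory
        for (String entry : yarnClasspathEntries) {
            parts.add(entry.trim());         // entries from yarn.application.classpath
        }
        parts.add("./log4j.properties");     // optional logging configuration
        return String.join(SEPARATOR, parts);
    }

    public static void main(String[] args) {
        System.out.println(build(Arrays.asList(
            " $HADOOP_CONF_DIR ", "$HADOOP_COMMON_HOME/share/hadoop/common/*")));
    }
}
```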
- The following code builds the command line used to launch the ApplicationMaster
// Set the necessary command to execute the application master
Vector<CharSequence> vargs = new Vector<CharSequence>(30);
// Set java executable command
LOG.info("Setting up app master command");
// Need extra quote here because JAVA_HOME might contain space on Windows,
// e.g. C:/Program Files/Java...
vargs.add("\"" + Environment.JAVA_HOME.$$() + "/bin/java\"");
// Set Xmx based on am memory size
vargs.add("-Xmx" + amMemory + "m");
// Set class name
vargs.add(appMasterMainClass);
// Set params for Application Master
if (containerType != null) {
vargs.add("--container_type " + String.valueOf(containerType));
}
if (autoPromoteContainers) {
vargs.add("--promote_opportunistic_after_start");
}
if (enforceExecType) {
vargs.add("--enforce_execution_type");
}
if (containerMemory > 0) {
vargs.add("--container_memory " + String.valueOf(containerMemory));
}
if (containerVirtualCores > 0) {
vargs.add("--container_vcores " + String.valueOf(containerVirtualCores));
}
if (!containerResources.isEmpty()) {
Joiner.MapJoiner joiner = Joiner.on(',').withKeyValueSeparator("=");
vargs.add("--container_resources " + joiner.join(containerResources));
}
if (containerResourceProfile != null && !containerResourceProfile
.isEmpty()) {
vargs.add("--container_resource_profile " + containerResourceProfile);
}
vargs.add("--num_containers " + String.valueOf(numContainers));
if (placementSpec != null && placementSpec.length() > 0) {
// Encode the spec to avoid passing special chars via shell arguments.
String encodedSpec = Base64.getEncoder()
.encodeToString(placementSpec.getBytes(StandardCharsets.UTF_8));
LOG.info("Encode placement spec: " + encodedSpec);
vargs.add("--placement_spec " + encodedSpec);
}
if (null != nodeLabelExpression) {
appContext.setNodeLabelExpression(nodeLabelExpression);
}
vargs.add("--priority " + String.valueOf(shellCmdPriority));
if (keepContainers) {
vargs.add("--keep_containers_across_application_attempts");
}
for (Map.Entry<String, String> entry : shellEnv.entrySet()) {
vargs.add("--shell_env " + entry.getKey() + "=" + entry.getValue());
}
if (debugFlag) {
vargs.add("--debug");
}
vargs.addAll(containerRetryOptions);
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stdout");
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/AppMaster.stderr");
// Get final command
StringBuilder command = new StringBuilder();
for (CharSequence str : vargs) {
command.append(str).append(" ");
}
LOG.info("Completed setting up app master command " + command.toString());
List<String> commands = new ArrayList<String>();
commands.add(command.toString());
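The launch command is a list of fragments joined with spaces, and the placement spec is Base64-encoded so that its special characters survive the shell. The following sketch shows both ideas in isolation. AmCommandSketch is a hypothetical class, it covers only a few of the options, and the "{{JAVA_HOME}}" and "<LOG_DIR>" literals are assumed stand-ins for Environment.JAVA_HOME.$$() and ApplicationConstants.LOG_DIR_EXPANSION_VAR.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;

// Illustrative only: a reduced version of the ApplicationMaster command assembly.
final class AmCommandSketch {
    static String encodeSpec(String placementSpec) {
        return Base64.getEncoder()
            .encodeToString(placementSpec.getBytes(StandardCharsets.UTF_8));
    }

    // What a receiver would do to recover the original spec.
    static String decodeSpec(String encoded) {
        return new String(Base64.getDecoder().decode(encoded), StandardCharsets.UTF_8);
    }

    static String buildCommand(String mainClass, long amMemoryMb,
                               int numContainers, String placementSpec) {
        List<String> vargs = new ArrayList<>();
        vargs.add("\"{{JAVA_HOME}}/bin/java\"");   // quoted in case the path contains spaces
        vargs.add("-Xmx" + amMemoryMb + "m");       // heap size derived from AM memory
        vargs.add(mainClass);
        vargs.add("--num_containers " + numContainers);
        if (placementSpec != null && !placementSpec.isEmpty()) {
            vargs.add("--placement_spec " + encodeSpec(placementSpec));
        }
        vargs.add("1><LOG_DIR>/AppMaster.stdout");  // redirect stdout to the container log dir
        vargs.add("2><LOG_DIR>/AppMaster.stderr");  // redirect stderr likewise
        return String.join(" ", vargs);
    }

    public static void main(String[] args) {
        System.out.println(buildCommand(
            "org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster",
            1024, 2, "zk=3,NOTIN,NODE,zk"));
    }
}
```

Unlike the loop in the client, String.join leaves no trailing space; the resulting command is otherwise assembled in the same order.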
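- As described in the application submission flow, the ContainerLaunchContext defines what the ApplicationMaster's container needs in order to start. The following part uses the launch command built in the previous step, together with the local resources and environment set up earlier, to build the launch context of the container that runs the ApplicationMaster.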
// Set up the container launch context for the application master
ContainerLaunchContext amContainer = ContainerLaunchContext.newInstance(
localResources, env, commands, null, null, null);
// Service data is a binary blob that can be passed to the application
// Not needed in this scenario
// amContainer.setServiceData(serviceData);
// Setup security tokens
Credentials rmCredentials = null;
if (UserGroupInformation.isSecurityEnabled()) {
// Note: Credentials class is marked as LimitedPrivate for HDFS and MapReduce
rmCredentials = new Credentials();
String tokenRenewer = YarnClientUtils.getRmPrincipal(conf);
if (tokenRenewer == null || tokenRenewer.length() == 0) {
throw new IOException(
"Can't get Master Kerberos principal for the RM to use as renewer");
}
// For now, only getting tokens for the default file-system.
final Token<?> tokens[] =
fs.addDelegationTokens(tokenRenewer, rmCredentials);
if (tokens != null) {
for (Token<?> token : tokens) {
LOG.info("Got dt for " + fs.getUri() + "; " + token);
}
}
}
// Add the docker client config credentials if supplied.
Credentials dockerCredentials = null;
if (dockerClientConfig != null) {
dockerCredentials =
DockerClientConfigHandler.readCredentialsFromConfigFile(
new Path(dockerClientConfig), conf, appId.toString());
}
if (rmCredentials != null || dockerCredentials != null) {
DataOutputBuffer dob = new DataOutputBuffer();
if (rmCredentials != null) {
rmCredentials.writeTokenStorageToStream(dob);
}
if (dockerCredentials != null) {
dockerCredentials.writeTokenStorageToStream(dob);
}
ByteBuffer tokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
amContainer.setTokens(tokens);
}
appContext.setAMContainerSpec(amContainer);
- The following part sets the application's priority, the queue it is submitted to, and its log aggregation context
// Set the priority for the application master
// TODO - what is the range for priority? how to decide?
Priority pri = Priority.newInstance(amPriority);
appContext.setPriority(pri);
// Set the queue to which this application is to be submitted in the RM
appContext.setQueue(amQueue);
specifyLogAggregationContext(appContext);
- Once all of the settings above are in place, the application can be submitted
// Submit the application to the applications manager
// SubmitApplicationResponse submitResp = applicationsManager.submitApplication(appRequest);
// Ignore the response as either a valid response object is returned on success
// or an exception thrown to denote some form of a failure
LOG.info("Submitting application to ASM");
yarnClient.submitApplication(appContext);
- After the application has been submitted, the following code monitors it
// Monitor the application
return monitorApplication(appId);
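Looking at the monitorApplication function, we can see that it fetches the application's status once per second and stops when the application reaches the FINISHED state. In the upstream Hadoop version of this client, the loop also ends when the application is reported as KILLED or FAILED, or when the client-side timeout expires.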
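The polling pattern behind monitorApplication can be sketched without any YARN dependency. In the sketch below, MonitorSketch, its State enum, and the Supplier that stands in for fetching an ApplicationReport are all hypothetical; a real client would also check the final application status and send a kill request on timeout.

```java
import java.util.function.Supplier;

// Illustrative polling loop: ask for the state at a fixed interval until a
// terminal state is seen or the timeout expires.
final class MonitorSketch {
    enum State { ACCEPTED, RUNNING, FINISHED, FAILED, KILLED }

    // Returns true only if the application reached FINISHED in time.
    static boolean monitor(Supplier<State> reportSource, long pollMillis, long timeoutMillis) {
        long start = System.currentTimeMillis();
        while (true) {
            State state = reportSource.get();
            if (state == State.FINISHED) {
                return true;
            }
            if (state == State.FAILED || state == State.KILLED) {
                return false;
            }
            if (System.currentTimeMillis() - start > timeoutMillis) {
                return false; // a real client would request a kill here
            }
            try {
                Thread.sleep(pollMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
    }

    public static void main(String[] args) {
        State[] script = { State.ACCEPTED, State.RUNNING, State.FINISHED };
        int[] index = { 0 };
        // Replays a fixed sequence of states, repeating the last one.
        boolean ok = monitor(() -> script[Math.min(index[0]++, script.length - 1)], 10, 5_000);
        System.out.println("finished successfully: " + ok);
    }
}
```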