YARN Resource Management

PHD 3.0
Capacity Scheduler

This guide describes how to use the Capacity Scheduler to allocate shared cluster resources among users and groups.

Introduction

    The fundamental unit of scheduling in YARN is the queue. Each queue in the Capacity Scheduler has the following properties:
  • A short queue name.

  • A full queue path name.

  • A list of associated child-queues and applications.

  • The guaranteed capacity of the queue.

  • The maximum capacity of the queue.

  • A list of active users and their corresponding resource allocation limits.

  • The state of the queue.

  • Access control lists (ACLs) governing access to the queue.

The following sections describe how to configure these properties, and how the Capacity Scheduler uses these properties to make various scheduling decisions.

Enabling Capacity Scheduler

To enable the Capacity Scheduler, set the following property in the /etc/hadoop/conf/yarn-site.xml file on the ResourceManager host:

<property>
  <name>yarn.resourcemanager.scheduler.class</name>
  <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
</property>

The settings for the Capacity Scheduler are contained in the /etc/hadoop/conf/capacity-scheduler.xml file on the ResourceManager host. The Capacity Scheduler reads this file when starting, and also when an administrator modifies the capacity-scheduler.xml file and then reloads the settings by running the following command:

yarn rmadmin -refreshQueues

This command can only be run by cluster administrators. Administrator privileges are configured with the yarn.admin.acl property on the ResourceManager.

Setting up Queues

The fundamental unit of scheduling in YARN is a queue. The capacity of each queue specifies the percentage of cluster resources that are available for applications submitted to the queue. Queues can be set up in a hierarchy that reflects the organizational structure, resource requirements, and access restrictions required by the various organizations, groups, and users that utilize cluster resources.

For example, suppose that a company has three organizations: Engineering, Support, and Marketing. The Engineering organization has two sub-teams: Development and QA. The Support organization has two sub-teams: Training and Services. And finally, the Marketing organization is divided into Sales and Advertising. The following image shows the queue hierarchy for this example:

Each child queue is tied to its parent queue with the yarn.scheduler.capacity.<queue-path>.queues configuration property in the capacity-scheduler.xml file. The top-level "support", "engineering", and "marketing" queues would be tied to the "root" queue as follows:

Property: yarn.scheduler.capacity.root.queues

Value: support,engineering,marketing

Example:

<property>
  <name>yarn.scheduler.capacity.root.queues</name>
  <value>support,engineering,marketing</value>
  <description>The top-level queues below root.</description>
</property>

Similarly, the children of the "support" queue would be defined as follows:

Property: yarn.scheduler.capacity.root.support.queues

Value: training,services

Example:

<property>
  <name>yarn.scheduler.capacity.root.support.queues</name>
  <value>training,services</value>
  <description>child queues under support</description>
</property>

The children of the "engineering" queue would be defined as follows: Property: yarn.scheduler.capacity.engineering.queues Value: development,qa

Example:

<property>
  <name>yarn.scheduler.capacity.root.engineering.queues</name>
  <value>development,qa</value>
  <description>child queues under engineering</description>
</property>

And the children of the "marketing" queue would be defined as follows: Property: yarn.scheduler.capacity.marketing.queues Value: sales,advertising

Example:

<property>
  <name>yarn.scheduler.capacity.root.marketing.queues</name>
  <value>sales,advertising</value>
  <description>child queues under marketing</description>
</property>

Hierarchical Queue Characteristics

  • There are two types of queues: parent queues and leaf queues.

  • Parent queues enable the management of resources across organizations and sub-organizations. They can contain more parent queues or leaf queues. They do not themselves accept any application submissions directly.

  • Leaf queues are the queues that live under a parent queue and accept applications. Leaf queues do not have any child queues, and therefore do not have any configuration property that ends with ".queues".

  • There is a top-level parent root queue that does not belong to any organization, but instead represents the cluster itself.

  • Using parent and leaf queues, administrators can specify capacity allocations for various organizations and sub-organizations.

Scheduling Among Queues

Hierarchical queues ensure that guaranteed resources are first shared among the sub-queues of an organization before any remaining free resources are shared with queues belonging to other organizations. This enables each organization to have control over the utilization of its guaranteed resources.

  • At each level in the hierarchy, every parent queue keeps the list of its child queues in a sorted manner based on demand. The sorting of the queues is determined by the currently used fraction of each queue’s capacity (or the full-path queue names if the reserved capacity of any two queues is equal).

  • The root queue understands how the cluster capacity needs to be distributed among the first level of parent queues and invokes scheduling on each of its child queues.

  • Every parent queue applies its capacity constraints to all of its child queues.

  • Leaf queues hold the list of active applications (potentially from multiple users) and schedule resources in a FIFO (first-in, first-out) manner, while at the same time adhering to capacity limits specified for individual users.

Controlling Access to Queues with ACLs

Access-control lists (ACLs) can be used to restrict user and administrator access to queues. Application submission can only happen at the leaf queue level, but an ACL restriction set on a parent queue is applied to all of its descendant queues.

In the Capacity Scheduler, ACLs are configured by granting queue access to a list of users and groups with the acl_submit_applications property. The format of the list is "user1,user2 group1,group2" -- a comma-separated list of users, followed by a space, followed by a comma-separated list of groups.

The value of acl_submit_applications can also be set to "*" (asterisk) to allow access to all users and groups, or can be set to " " (space character) to block access to all users and groups.

As mentioned previously, ACL settings on a parent queue are applied to all of its descendant queues. Therefore, if the parent queue uses the "*" (asterisk) value (or is not specified) to allow access to all users and groups, its child queues cannot restrict access. Similarly, before you can restrict access to a child queue, you must first set the parent queue to " " (space character) to block access to all users and groups.

For example, the following properties would set the root acl_submit_applications value to " " (space character) to block access to all users and groups, and also restrict access to its child "support" queue to the users "sherlock" and "pacioli" and the members of the "cfo-group" group:

<property>
  <name>yarn.scheduler.capacity.root.acl_submit_applications</name>
  <value> </value>
</property>

<property>
  <name>yarn.scheduler.capacity.root.support.acl_submit_applications</name>
  <value>sherlock,pacioli cfo-group</value>
</property>

A separate ACL can be used to control the administration of queues at various levels. Queue administrators can submit applications to the queue, kill applications in the queue, and obtain information about any application in the queue (whereas normal users are restricted from viewing all of the details of other users' applications).

Administrator ACLs are configured with the acl_administer_queue property. ACLs for this property are inherited from the parent queue if not specified. For example, the following properties would set the root acl_administer_queue value to " " (space character) to block access to all users and groups, and also grant administrator access to its child "support" queue to the users "sherlock" and "pacioli" and the members of the "cfo-group" group:

<property>
  <name>yarn.scheduler.capacity.root.acl_administer_queue</name>
  <value> </value>
</property>

<property>
  <name>yarn.scheduler.capacity.root.support.acl_administer_queue</name>
  <value>sherlock,pacioli cfo-group</value>
</property>

Managing Cluster Capacity with Queues

The Capacity Scheduler is designed to allow organizations to share compute clusters using the very familiar notion of FIFO (first-in, first-out) queues. YARN does not assign entire nodes to queues. Queues own a fraction of the capacity of the cluster, and this specified queue capacity can be fulfilled from any number of nodes in a dynamic fashion.

Scheduling is the process of matching resource requirements -- of multiple applications from various users, and submitted to different queues at multiple levels in the queue hierarchy -- with the free capacity available on the nodes in the cluster. Because total cluster capacity can vary, capacity configuration values are expressed as percentages.

The capacity property can be used by administrators to allocate a percentage of cluster capacity to a queue. The following properties would divide the cluster resources between the Engineering, Support, and Marketing organizations in a 6:1:3 ratio (60%, 10%, and 30%).

Property: yarn.scheduler.capacity.root.engineering.capacity

Value: 60

Property: yarn.scheduler.capacity.root.support.capacity

Value: 10

Property: yarn.scheduler.capacity.root.marketing.capacity

Value: 30
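
Expressed in the capacity-scheduler.xml file, these allocations would look like the following:

<property>
  <name>yarn.scheduler.capacity.root.engineering.capacity</name>
  <value>60</value>
</property>

<property>
  <name>yarn.scheduler.capacity.root.support.capacity</name>
  <value>10</value>
</property>

<property>
  <name>yarn.scheduler.capacity.root.marketing.capacity</name>
  <value>30</value>
</property>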

Now suppose that the Engineering group decides to split its capacity between the Development and QA sub-teams in a 1:4 ratio. That would be implemented by setting the following properties:

Property: yarn.scheduler.capacity.root.engineering.development.capacity

Value: 20

Property: yarn.scheduler.capacity.root.engineering.qa.capacity

Value: 80
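
In the capacity-scheduler.xml file, the sub-team split would be expressed as follows (each value is relative to the "engineering" parent queue):

<property>
  <name>yarn.scheduler.capacity.root.engineering.development.capacity</name>
  <value>20</value>
</property>

<property>
  <name>yarn.scheduler.capacity.root.engineering.qa.capacity</name>
  <value>80</value>
</property>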

The following image illustrates this cluster capacity configuration:

Resource Distribution Workflow

    During scheduling, queues at any level in the hierarchy are sorted in the order of their current used capacity, and available resources are distributed among them starting with queues that are currently the most under-served. With respect to capacities alone, the resource scheduling has the following workflow:
  • The more under-served a queue is, the higher the priority it receives during resource allocation. The most under-served queue is the queue with the least ratio of used capacity as compared to the total cluster capacity.

    • The used capacity of any parent queue is defined as the aggregate sum of used capacity of all of its descendant queues, recursively.

    • The used capacity of a leaf queue is the amount of resources used by the allocated Containers of all of the applications running in that queue.

  • Once it is decided to give a parent queue the currently available free resources, further scheduling is done recursively to determine which child queue gets to use the resources -- based on the previously described concept of used capacities.

  • Further scheduling happens inside each leaf queue to allocate resources to applications in a FIFO order.

    • This is also dependent on locality, user level limits, and application limits.

    • Once an application within a leaf queue is chosen, scheduling also happens within the application. Applications may have different priorities of resource requests.

  • To ensure elasticity, capacity that is configured but not utilized by any queue due to lack of demand is automatically assigned to the queues that are in need of resources.

Resource Distribution Workflow Example

Suppose our cluster has 100 nodes, each with 10 GB of memory allocated for YARN Containers, for a total cluster capacity of 1000 GB (1 TB). According to the previously described configuration, the Engineering organization is assigned 60% of the cluster capacity, i.e., an absolute capacity of 600 GB. Similarly, the Support organization is assigned 100 GB, and the Marketing organization gets 300 GB.

Under the Engineering organization, capacity is distributed between the Development team and the QA team in a 1:4 ratio. So Development gets 120 GB, and 480 GB is assigned to QA.

    Now consider the following timeline of events:
  • Initially, the entire "engineering" queue is free with no applications running, while the "support" and "marketing" queues are utilizing their full capacities.

  • Users Sid and Hitesh first submit applications to the "development" leaf queue. Their applications are elastic and can run with either all of the resources available in the cluster, or with a subset of cluster resources (depending upon the state of the resource-usage).

    • Even though the "development" queue is allocated 120 GB, Sid and Hitesh are each allowed to occupy 120 GB, for a total of 240 GB.

    • This can happen despite the fact that the "development" queue is configured to be run with a capacity of 120 GB. Capacity Scheduler allows elastic sharing of cluster resources for better utilization of available cluster resources. Since there are no other users in the "engineering" queue, Sid and Hitesh are allowed to use the available free resources.

  • Next, users Jian, Zhijie and Xuan submit more applications to the "development" leaf queue. Even though each is restricted to 120 GB, the overall used capacity in the queue becomes 600 GB -- essentially taking over all of the resources allocated to the "qa" leaf queue.

  • User Gupta now submits an application to the "qa" queue. With no free resources available in the cluster, his application must wait.

    • Given that the "development" queue is utilizing all of the available cluster resources, Gupta may or may not be able to immediately get back the guaranteed capacity of his "qa" queue -- depending upon whether or not preemption is enabled.

  • As the applications of Sid, Hitesh, Jian, Zhijie, and Xuan finish running and resources become available, the newly available Containers will be allocated to Gupta’s application.

This will continue until the cluster stabilizes at the intended 1:4 resource usage ratio for the "development" and "qa" queues.

From this example, you can see that it is possible for abusive users to submit applications continuously, and thereby lock out other queues from resource allocation until Containers finish running or get preempted. To avoid this scenario, Capacity Scheduler supports limits on the elastic growth of any queue. For example, to restrict the "development" queue from monopolizing the "engineering" queue capacity, an administrator can set the maximum-capacity property:

Property: yarn.scheduler.capacity.root.engineering.development.maximum-capacity

Value: 40
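
In the capacity-scheduler.xml file, this setting would be expressed as follows:

<property>
  <name>yarn.scheduler.capacity.root.engineering.development.maximum-capacity</name>
  <value>40</value>
</property>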

Once this is set, users of the "development" queue can still go beyond their capacity of 120 GB, but they will not be allocated any more than 40% of the "engineering" parent queue's capacity (i.e., 40% of 600 GB = 240 GB).

The capacity and maximum-capacity properties can be used to control sharing and elasticity across the organizations and sub-organizations utilizing a YARN cluster. Administrators should balance these properties to avoid strict limits that result in a loss of utilization, and to avoid excessive cross-organization sharing.

Capacity and maximum capacity settings can be dynamically changed at run-time using yarn rmadmin -refreshQueues.

Setting User Limits

The minimum-user-limit-percent property can be used to set the minimum percentage of resources allocated to each leaf queue user. For example, to enable equal sharing of the "services" leaf queue capacity among five users, you would set the minimum-user-limit-percent property to 20:

Property: yarn.scheduler.capacity.root.support.services.minimum-user-limit-percent

Value: 20
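
In the capacity-scheduler.xml file, this setting would be expressed as follows:

<property>
  <name>yarn.scheduler.capacity.root.support.services.minimum-user-limit-percent</name>
  <value>20</value>
</property>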

This setting determines the minimum limit that any user’s share of the queue capacity can shrink to. Irrespective of this limit, any user can come into the queue and take more than his or her allocated share if there are idle resources available.

The following table shows how the queue resources are adjusted as users submit jobs to a queue with a minimum-user-limit-percent value of 20%. Resources are shared equally among the active users, down to the 20% minimum:

Number of users    Resources guaranteed to each user
1                  100% of the queue capacity
2                  50%
3                  33.33%
4                  25%
5                  20%

  • The Capacity Scheduler also manages resources for decreasing numbers of users. As users’ applications finish running, other existing users with outstanding requirements begin to reclaim that share.

  • Note that despite this sharing among users, the FIFO application scheduling order of Capacity Scheduler does not change. This guarantees that users cannot monopolize queues by submitting new applications continuously. Applications (and thus the corresponding users) that are submitted first always get a higher priority than applications that are submitted later.

Capacity Scheduler’s leaf queues can also use the user-limit-factor property to control user resource allocations. This property sets a limit on the resources a single user can consume, expressed as a multiple of the queue's configured capacity, and it applies regardless of whether or not there are idle resources in the cluster.

Property: yarn.scheduler.capacity.root.support.user-limit-factor

Value: 1

The default value of "1" means that any single user in the queue can at maximum only occupy the queue’s configured capacity. This prevents users in a single queue from monopolizing resources across all queues in a cluster. Setting the value to "2" would allow the queue's users to consume up to twice the queue’s configured capacity. Setting it to a value of 0.5 would restrict any user from using resources beyond half of the queue capacity.
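
For reference, the default setting for the "support" queue would appear in the capacity-scheduler.xml file as follows:

<property>
  <name>yarn.scheduler.capacity.root.support.user-limit-factor</name>
  <value>1</value>
</property>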

These settings can also be dynamically changed at run-time using yarn rmadmin -refreshQueues.

Application Reservations

The Capacity Scheduler is responsible for matching free resources in the cluster with the resource requirements of an application. Many times, a scheduling cycle occurs such that even though there are free resources on a node, they are not sized large enough to satisfy the application waiting for a resource at the head of the queue. This typically happens with high-memory applications whose resource demand for Containers is much larger than the typical application running in the cluster. This mismatch can lead to starving these resource- intensive applications.

    The Capacity Scheduler reservations feature addresses this issue:
  • When a node reports in with a finished Container, the Capacity Scheduler selects an appropriate queue to utilize the newly available resources based on capacity and maximum capacity settings.

  • Within that selected queue, the Capacity Scheduler looks at the applications in a FIFO order along with the user limits. Once a needy application is found, the Capacity Scheduler tries to see if the requirements of that application can be met by the node’s free capacity.

  • If there is a size mismatch, the Capacity Scheduler immediately creates a reservation on the node for the application’s required Container.

  • Once a reservation is made for an application on a node, those resources are not used by the Capacity Scheduler for any other queue, application, or Container until the application reservation is fulfilled.

  • The node on which a reservation is made reports back when enough Containers finish running such that the total free capacity on the node now matches the reservation size. When that happens, the Capacity Scheduler marks the reservation as fulfilled, removes it, and allocates a Container on the node.

  • In some cases another node fulfills the resources required by the application, so the application no longer needs the reserved capacity on the first node. In this situation, the reservation is simply cancelled.

To prevent the number of reservations from growing in an unbounded manner, and to avoid any potential scheduling deadlocks, the Capacity Scheduler maintains only one active reservation at a time on each node.

Starting and Stopping Queues

Queues in YARN can be in two states: RUNNING or STOPPED. A RUNNING state indicates that a queue can accept application submissions, and a STOPPED queue does not accept application submissions. The default state of any configured queue is RUNNING.

In Capacity Scheduler, parent queues, leaf queues, and the root queue can all be stopped. For an application to be accepted at any leaf queue, all the queues in the hierarchy all the way up to the root queue must be running. This means that if a parent queue is stopped, all of the descendant queues in that hierarchy are inactive, even if their own state is RUNNING.

The following example sets the value of the state property of the "support" queue to RUNNING:

Property: yarn.scheduler.capacity.root.support.state

Value: RUNNING
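
In the capacity-scheduler.xml file, this setting would be expressed as follows:

<property>
  <name>yarn.scheduler.capacity.root.support.state</name>
  <value>RUNNING</value>
</property>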

Administrators can use the ability to stop and drain applications in a queue for a number of reasons, such as when decommissioning a queue and migrating its users to other queues. Administrators can stop queues at run-time, so that while current applications run to completion, no new applications are admitted. Existing applications can continue until they finish running, and thus the queue can be drained gracefully without any end-user impact.

Administrators can also restart the stopped queues by modifying the state configuration property and then refreshing the queue using yarn rmadmin -refreshQueues as previously described.

Setting Application Limits

To avoid system-thrash due to an unmanageable load -- caused either by malicious users, or by accident -- the Capacity Scheduler enables you to place a static, configurable limit on the total number of concurrently active (both running and pending) applications at any one time. The maximum-applications configuration property is used to set this limit, with a default value of 10,000:

Property: yarn.scheduler.capacity.maximum-applications

Value: 10000
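
In the capacity-scheduler.xml file, the default cluster-wide limit would be expressed as follows:

<property>
  <name>yarn.scheduler.capacity.maximum-applications</name>
  <value>10000</value>
</property>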

The limit for running applications in any specific queue is a fraction of this total limit, proportional to its capacity. This is a hard limit, which means that once this limit is reached for a queue, any new applications to that queue will be rejected, and clients will have to wait and retry later. This limit can be explicitly overridden on a per-queue basis with the following configuration property:

Property: yarn.scheduler.capacity.<queue-path>.maximum-applications

Value: absolute-capacity * yarn.scheduler.capacity.maximum-applications
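
For example, with the default cluster-wide limit of 10,000 and the "support" queue's absolute capacity of 10%, the "support" queue would by default be limited to 1,000 concurrently active applications. To raise that limit explicitly, you could set a per-queue value (the value of 2000 shown here is purely illustrative):

<property>
  <name>yarn.scheduler.capacity.root.support.maximum-applications</name>
  <value>2000</value>
</property>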

There is another resource limit that can be used to set a maximum percentage of cluster resources allocated specifically to ApplicationMasters. The maximum-am-resource-percent property has a default value of 10%, and exists to avoid cross-application deadlocks where significant resources in the cluster are occupied entirely by the Containers running ApplicationMasters. This property also indirectly controls the number of concurrent running applications in the cluster, with each queue limited to a number of running applications proportional to its capacity.

Property: yarn.scheduler.capacity.maximum-am-resource-percent

Value: 0.1
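
In the capacity-scheduler.xml file, the default would be expressed as follows:

<property>
  <name>yarn.scheduler.capacity.maximum-am-resource-percent</name>
  <value>0.1</value>
</property>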

As with maximum-applications, this limit can also be overridden on a per-queue basis:

Property: yarn.scheduler.capacity.<queue-path>.maximum-am-resource-percent

Value: 0.1

All of these limits ensure that no single application, user, or queue can cause catastrophic failure, or monopolize the cluster and cause excessive degradation of cluster performance.

Preemption

As mentioned previously, a scenario can occur in which a queue has a guaranteed level of cluster resources, but must wait to run applications because other queues are utilizing all of the available resources. If Preemption is enabled, higher-priority applications do not have to wait because lower priority applications have taken up the available capacity. With Preemption enabled, under-served queues can begin to claim their allocated cluster resources almost immediately, without having to wait for other queues' applications to finish running.

Preemption Workflow

Preemption is governed by a set of capacity monitor policies, which must be enabled by setting the yarn.resourcemanager.scheduler.monitor.enable property to "true". These capacity monitor policies apply Preemption in configurable intervals based on defined capacity allocations, and in as graceful a manner as possible. Containers are only killed as a last resort. The following image demonstrates the Preemption workflow:

Preemption Configuration

The following properties in the /etc/hadoop/conf/yarn-site.xml file on the ResourceManager host are used to enable and configure Preemption.

  • Property: yarn.resourcemanager.scheduler.monitor.enable

    Value: true

    Description: Setting this property to "true" enables Preemption. It enables a set of periodic monitors that affect the Capacity Scheduler. The default value for this property is "false" (disabled).

  • Property: yarn.resourcemanager.scheduler.monitor.policies

    Value: org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy

    Description: The list of SchedulingEditPolicy classes that interact with the scheduler. The only policy currently available for preemption is the "ProportionalCapacityPreemptionPolicy".

  • Property: yarn.resourcemanager.monitor.capacity.preemption.monitoring_interval

    Value: 3000

    Description: The time in milliseconds between invocations of this policy. Setting this value to a longer time interval will cause the Capacity Monitor to run less frequently.

  • Property: yarn.resourcemanager.monitor.capacity.preemption.max_wait_before_kill

    Value: 15000

    Description: The time in milliseconds between requesting a preemption from an application and killing the container. Setting this to a higher value will give applications more time to respond to preemption requests and gracefully release Containers.

  • Property: yarn.resourcemanager.monitor.capacity.preemption.total_preemption_per_round

    Value: 0.1

    Description: The maximum percentage of resources preempted in a single round. You can use this value to restrict the pace at which Containers are reclaimed from the cluster. After computing the total desired preemption, the policy scales it back to this limit.
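
Taken together, a minimal Preemption configuration in the /etc/hadoop/conf/yarn-site.xml file would look like the following sketch, which simply uses the values listed above:

<property>
  <name>yarn.resourcemanager.scheduler.monitor.enable</name>
  <value>true</value>
</property>

<property>
  <name>yarn.resourcemanager.scheduler.monitor.policies</name>
  <value>org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy</value>
</property>

<property>
  <name>yarn.resourcemanager.monitor.capacity.preemption.monitoring_interval</name>
  <value>3000</value>
</property>

<property>
  <name>yarn.resourcemanager.monitor.capacity.preemption.max_wait_before_kill</name>
  <value>15000</value>
</property>

<property>
  <name>yarn.resourcemanager.monitor.capacity.preemption.total_preemption_per_round</name>
  <value>0.1</value>
</property>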

Scheduler User Interface

You can use the Scheduler page in the Hadoop User Interface (UI) to view the status and settings of Capacity Scheduler queues. The following image shows the Hadoop UI Scheduler page (http://<hostname>:8088/cluster/scheduler) with the "support" queue selected:

CGroups

You can use CGroups to isolate CPU-heavy processes in a Hadoop cluster. If you are using CPU scheduling, you should also use CGroups to constrain and manage CPU processes. If you are not using CPU scheduling, do not enable CGroups.

Introduction

When you enable CPU scheduling, queues are still used to allocate cluster resources, but both CPU and memory are taken into consideration using a scheduler that utilizes Dominant Resource Fairness (DRF). In the DRF model, resource allocation takes into account the dominant resource required by a process. CPU-heavy processes receive more CPU and less memory. Memory-heavy processes (such as MapReduce) receive more memory and less CPU. The DRF scheduler is designed to fairly distribute memory and CPU resources among different types of processes in a mixed-workload cluster.

CGroups complements CPU scheduling by providing CPU resource isolation. It enables you to set limits on the amount of CPU resources granted to individual YARN containers, and also lets you set a limit on the total amount of CPU resources used by YARN processes.

CGroups represents one aspect of YARN resource management capabilities that includes CPU scheduling, node labels, archival storage, and memory as storage. If CPU scheduling is used, CGroups should be used along with it to constrain and manage CPU processes.

Enabling CGroups

CGroups is a Linux kernel feature. Currently PHD supports CGroups on RHEL6 only. At this time there is no CGroups equivalent for Windows. CGroups are not enabled by default on PHD.

Enable CGroups

On CentOS 6, CGroups are not set up by default. Run the following commands to set up CGroups:

yum install libcgroup
sudo mkdir /cgroup
mount -t cgroup -o cpu cpu /cgroup

Set the following properties in the /etc/hadoop/conf/yarn-site.xml file on the ResourceManager and NodeManager hosts:

Property: yarn.nodemanager.container-executor.class

Value: org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor

Example:

<property>
  <name>yarn.nodemanager.container-executor.class</name>
  <value>org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor</value>
</property>

Property: yarn.nodemanager.linux-container-executor.group

Value: hadoop

Example:

<property>
  <name>yarn.nodemanager.linux-container-executor.group</name>
  <value>hadoop</value>
</property>

Property: yarn.nodemanager.linux-container-executor.resources-handler.class

Value: org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler

Example:

<property>
  <name>yarn.nodemanager.linux-container-executor.resources-handler.class</name>
  <value>org.apache.hadoop.yarn.server.nodemanager.util.CgroupsLCEResourcesHandler</value>
</property>

Property: yarn.nodemanager.linux-container-executor.cgroups.hierarchy

Value: /yarn

Example:

<property>
  <name>yarn.nodemanager.linux-container-executor.cgroups.hierarchy</name>
  <value>/yarn</value>
</property>

Property: yarn.nodemanager.linux-container-executor.cgroups.mount

Value: true

Example:

<property>
  <name>yarn.nodemanager.linux-container-executor.cgroups.mount</name>
  <value>true</value>
</property>

Property: yarn.nodemanager.linux-container-executor.cgroups.mount-path

Value: /cgroup

Example:

<property>
  <name>yarn.nodemanager.linux-container-executor.cgroups.mount-path</name>
  <value>/cgroup</value>
</property>
Set the Percentage of CPU used by YARN

Set the percentage of CPU that can be allocated for YARN containers. In most cases, the default value of 100% should be used. If you have another process that needs to run on a node that also requires CPU resources, you can lower the percentage of CPU allocated to YARN to free up resources for the other process.

Property: yarn.nodemanager.resource.percentage-physical-cpu-limit

Value: 100

Example:

<property>
  <name>yarn.nodemanager.resource.percentage-physical-cpu-limit</name>
  <value>100</value>
</property>
Set Flexible or Strict CPU limits

CPU jobs are constrained with CPU scheduling and CGroups enabled, but by default these are flexible limits. If spare CPU cycles are available, containers are allowed to exceed the CPU limits set for them. With flexible limits, the amount of CPU resources available for containers to use can vary based on cluster usage -- the amount of CPU available in the cluster at any given time.

You can use CGroups to set strict limits on CPU usage. When strict limits are enabled, each process receives only the amount of CPU resources it requests. With strict limits, a CPU process will receive the same amount of cluster resources every time it runs.

Strict limits are not enabled (set to false) by default. To enable strict CPU limits, set the following property to true.

Property: yarn.nodemanager.linux-container-executor.cgroups.strict-resource-usage

Value: true

Example:

<property>
  <name>yarn.nodemanager.linux-container-executor.cgroups.strict-resource-usage</name>
  <value>true</value>
</property>

Using CGroups

Strict CGroup CPU limits can be used to constrain CPU processes in mixed workload clusters.

When you enable strict CGroup CPU limits, each process gets only the CPU resources it asks for, even if there is extra CPU available. This would be useful for scenarios involving charge-backs or strict SLA enforcement, where you always need to know exactly what percentage of CPU is being used.

Also, enabling strict CPU limits would make job performance predictable, whereas without setting strict limits a CPU-intensive job would run faster when the cluster was not under heavy use, but slower when more jobs were running in the cluster. Strict CPU limits would therefore also be useful for benchmarking.

You could also use node labels in conjunction with CGroups and CPU scheduling to restrict mixed workload jobs to a subset of cluster nodes.

If you are using CGroups and want more information on CPU performance, you can review the statistics available in the /cgroup/cpu/yarn/cpu.stat file.

CPU Scheduling

This guide describes how to allocate shared CPU and memory resources among users and groups in a Hadoop cluster.

Introduction

As discussed in the Capacity Scheduler guide, the fundamental unit of scheduling in YARN is the queue. The capacity of each queue specifies the percentage of cluster resources that are available for applications submitted to the queue. Queues can be set up in a hierarchy that reflects the organizational structure, resource requirements, and access restrictions required by the various organizations, groups, and users that utilize cluster resources. When the default resource calculator (DefaultResourceCalculator) is used, resources are allocated based on memory alone.

CPU scheduling is enabled by using the Dominant Resource Calculator (DominantResourceCalculator) rather than the default resource calculator. The Dominant Resource Calculator is based on the Dominant Resource Fairness (DRF) model of resource allocation.

With the Dominant Resource Calculator, queues are still used to allocate cluster resources, but both CPU and memory are taken into consideration. In the DRF model, resource allocation takes into account the dominant resource required by a process. The Dominant Resource Calculator schedules both CPU-heavy and memory-heavy processes on the same node. CPU-heavy processes receive more CPU and less memory. Memory-heavy processes (such as MapReduce) receive more memory and less CPU. The DRF scheduler is designed to fairly distribute memory and CPU resources among different types of processes in a mixed-workload cluster.

CPU scheduling represents one aspect of YARN resource management capabilities that includes CGroups, node labels, archival storage, and memory as storage. CGroups should be used with CPU scheduling to constrain and manage CPU processes.

CPU scheduling is only recommended for Linux. Currently there is no isolation mechanism (CGroups equivalent) for Windows, so do not enable CPU scheduling on Windows.

Enabling CPU Scheduling

Enable CPU Scheduling in capacity-scheduler.xml

CPU scheduling is not enabled by default. To enable CPU scheduling, set the following property in the /etc/hadoop/conf/capacity-scheduler.xml file on the ResourceManager and NodeManager hosts:

Replace the DefaultResourceCalculator with the DominantResourceCalculator.

Property: yarn.scheduler.capacity.resource-calculator

Value: org.apache.hadoop.yarn.util.resource.DominantResourceCalculator

<property>
  <name>yarn.scheduler.capacity.resource-calculator</name>
  <!-- <value>org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator</value> -->
  <value>org.apache.hadoop.yarn.util.resource.DominantResourceCalculator</value>
</property>
Set Vcores in yarn-site.xml

In YARN, vcores (virtual cores) are used to normalize CPU resources across the cluster. The yarn.nodemanager.resource.cpu-vcores value sets the number of CPU cores that can be allocated for containers.

The number of vcores should be set to match the number of physical CPU cores on the NodeManager hosts. Set the following property in the /etc/hadoop/conf/yarn-site.xml file on the ResourceManager and NodeManager hosts:

Property: yarn.nodemanager.resource.cpu-vcores

Value: <number_of_physical_cores>

Example:

<property>
  <name>yarn.nodemanager.resource.cpu-vcores</name>
  <value>16</value>
</property>

It is also recommended that you enable CGroups along with CPU scheduling. CGroups are used as the isolation mechanism for CPU processes. With CGroups strict enforcement turned on, each CPU process gets only the resources it asks for. Without CGroups turned on, the DRF scheduler attempts to balance the load, but unpredictable behavior may occur.

Currently there is no isolation mechanism (CGroups equivalent) for Windows, so do not enable CPU scheduling on Windows.

Using CPU Scheduling

MapReduce Jobs Only

If you primarily run MapReduce jobs on your cluster, you probably will not see much of a change in performance if you enable CPU scheduling. The dominant resource for MapReduce is memory, so the DRF scheduler continues to balance MapReduce jobs much as the default resource calculator would. In the single-resource case, DRF reduces to max-min fairness for that resource.

Mixed Workloads

CGroups can be used along with CPU scheduling to help manage mixed workloads. CGroups provides isolation for CPU-intensive processes, thereby enabling you to predictably plan and constrain the CPU-intensive jobs.

You could also use node labels in conjunction with CPU scheduling and CGroups to restrict jobs to a subset of cluster nodes.

Dominant Resource Fairness (DRF)

The Dominant Resource Calculator (DominantResourceCalculator) is used to enable CPU scheduling. The Dominant Resource Calculator is based on the Dominant Resource Fairness (DRF) model of resource allocation.

DRF uses the concept of the dominant resource to compare multi-dimensional resources. The idea is that in a multi-resource environment, resource allocation should be determined by the dominant share of an entity (user or queue), which is the maximum share that the entity has been allocated of any resource (memory or CPU). Essentially, the DRF seeks to maximize the minimum dominant share across all entities.

For example, if user A runs CPU-heavy tasks and user B runs memory-heavy tasks, the DRF attempts to equalize CPU share of user A with the memory share of user B. In this case, the DRF would allocate more CPU and less memory to the tasks run by user A, and allocate less CPU and more memory to the tasks run by user B. In the single resource case -- where all jobs are requesting the same resources -- the DRF reduces to max-min fairness for that resource.

For more information about DRF, see Dominant Resource Fairness: Fair Allocation of Multiple Resources.

Log Aggregation for Long-Running Applications

This guide describes how to use log aggregation to collect logs for long-running YARN applications.

Introduction

In Hadoop, logs are collected for an application only when it finishes running. This works well for applications that only run for a short time, but is not ideal for long-running applications such as HBase running on YARN.

If an application runs for days or weeks, it is useful to collect log information while the application is running. This log information can be used to:

  • Debug hardware and software issues

  • Review and optimize application performance

  • Review application resource utilization

A long-running application may generate a large amount of log information. The application can use log rotation to prevent the log file from becoming excessively large. The YARN NodeManager will periodically aggregate the latest completed log files and make them accessible.

Enable Log Aggregation

Log aggregation is enabled in the yarn-site.xml file. The yarn.log-aggregation-enable property enables log aggregation for running applications. You must also specify a log aggregation time interval using the yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds property. The logs for running applications are aggregated at the specified interval. The minimum monitoring interval value is 3600 seconds (one hour).

You can also set the monitoring interval value to -1 to disable log aggregation for running applications, and wait until the application finishes running to enable log aggregation.

  <property>
    <name>yarn.log-aggregation-enable</name>
    <value>true</value>
  </property>

  <property>
    <name>yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds</name>
    <value>3600</value>
  </property>

Node Labels

This guide describes how to use Node labels to restrict YARN applications so that they run only on cluster nodes that have a specified node label.

Introduction

As discussed in the Capacity Scheduler guide, the fundamental unit of scheduling in YARN is the queue. The capacity of each queue specifies the percentage of cluster resources that are available for applications submitted to the queue. Queues can be set up in a hierarchy that reflects the resource requirements and access restrictions required by the various organizations, groups, and users that utilize cluster resources.

Node labels can be assigned to cluster nodes. You can then associate node labels with capacity scheduler queues to specify which node label each queue is allowed to access.

When a queue is associated with one or more node labels, all applications submitted to the queue run only on nodes with those specified labels. If no node label is assigned to a queue, the applications submitted to the queue can run on any node without a node label.

Node labels represent one aspect of YARN resource management capabilities that includes CPU scheduling, CGroups, archival storage, and memory as storage.

Configuring Node Labels

To enable node labels, make the following configuration changes on the YARN ResourceManager host.

1. Create a Label Directory in HDFS

Use the following commands to create a "node-labels" directory in which to store the node labels in HDFS.

sudo su hdfs
hadoop fs -mkdir -p /yarn/node-labels
hadoop fs -chown -R yarn:yarn /yarn
hadoop fs -chmod -R 700 /yarn

-chmod -R 700 specifies that only the yarn user can access the "node-labels" directory.

You can then use the following command to confirm that the directory was created in HDFS.

hadoop fs -ls /yarn

The new node label directory should appear in the list returned by this command. The owner should be yarn, and the permission should be drwx.

Found 1 items
drwx------   - yarn yarn          0 2014-11-24 13:09 /yarn/node-labels

Use the following commands to create a /user/yarn directory that is required by the distributed shell.

hadoop fs -mkdir -p /user/yarn
hadoop fs -chown -R yarn:yarn /user/yarn
hadoop fs -chmod -R 700 /user/yarn

The preceding commands assume that the yarn user will be submitting jobs with the distributed shell. To run the distributed shell with a different user, create the user, then use /user/<user_name> in the file paths of the commands above to create a new user directory.

2. Configure YARN for Node Labels

Add the following properties to the /etc/hadoop/conf/yarn-site.xml file on the ResourceManager host.

Set the following property to enable node labels:

<property>
    <name>yarn.node-labels.manager-class</name>
    <value>org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager</value>
</property>

Set the following property to reference the HDFS node label directory:

<property>
      <name>yarn.node-labels.fs-store.root-dir</name>
      <value>hdfs://<host>:<port>/<absolute_path_to_node_label_directory></value>
   </property>

For example:

<property>
      <name>yarn.node-labels.fs-store.root-dir</name>
      <value>hdfs://node-1.example.com:8020/yarn/node-labels/</value>
   </property>
3. Start or Restart the YARN ResourceManager

In order for the configuration changes in the yarn-site.xml file to take effect, you must stop and restart the YARN ResourceManager if it is running, or start the ResourceManager if it is not running. To start or stop the ResourceManager, use the applicable commands in the "Controlling PHD Services Manually" section of the PHD Reference Guide.

4. Add and Assign Node Labels

For demonstration purposes, the following commands show how to use the yarn rmadmin client to add the node labels "x" and "y", but you can add your own node labels. You should run these commands as the yarn user. Node labels must be added before they can be assigned to nodes and associated with queues.

sudo su yarn
yarn rmadmin -addToClusterNodeLabels "x,y"

You can use the yarn cluster --list-node-labels command to confirm that node labels have been added:

[root@node-1 /]# yarn cluster --list-node-labels
14/11/21 13:09:55 INFO impl.TimelineClientImpl: Timeline service address: http://node-1.example.com:8188/ws/v1/timeline/
14/11/21 13:09:55 INFO client.RMProxy: Connecting to ResourceManager at node-1.example.com/192.0.2.0:8032
Node Labels: x,y
[root@node-1 /]# 

You can use the following command format to remove node labels:

yarn rmadmin -removeFromClusterNodeLabels "<label1>,<label2>"

You can use the following command format to add or replace node label assignments on cluster nodes:

yarn rmadmin -replaceLabelsOnNode "<node1>:<port>,<label1>,<label2> <node2>:<port>,<label1>,<label2>"

For example, the following commands assign node label "x" to "node-1.example.com", and node label "y" to "node-2.example.com".

sudo su yarn
yarn rmadmin -replaceLabelsOnNode "node-1.example.com,x node-2.example.com,y"

To remove node label assignments from a node, use -replaceLabelsOnNode, but do not specify any labels. For example, you would use the following commands to remove the "x" label from node-1.example.com:

sudo su yarn
yarn rmadmin -replaceLabelsOnNode "node-1.example.com"
5. Associate Node Labels with Queues

Now that we have created node labels, we can associate them with queues in the /etc/hadoop/conf/capacity-scheduler.xml file.

You must specify a capacity for each node label of each queue, and ensure that, at every level, the sum of the node-label capacities of the direct children of a parent queue equals 100% for each label. The node labels that a queue can access (the accessible node labels of a queue) must be the same as, or a subset of, the accessible node labels of its parent queue.

Example:

Assume that a cluster has a total of 8 nodes. The first 3 nodes (n1-n3) have node label=x, the next 3 nodes (n4-n6) have node label=y, and the final 2 nodes (n7, n8) do not have any node labels. Each node can run 10 containers.

The queue hierarchy is as follows:

Assume that queue “a” can access node labels “x” and “y”, and queue “b” can only access node label “y”. By definition, nodes without labels can be accessed by all queues.

Consider the following example label configuration for the queues:

capacity(a) = 40, capacity(a, label=x) = 100, capacity(a, label=y) = 50
capacity(b) = 60, capacity(b, label=y) = 50

This means that:

  • Queue “a” can access 40% of the resources on nodes without any labels, 100% of the resources on nodes with label=x, and 50% of the resources on nodes with label=y.

  • Queue “b” can access 60% of the resources on nodes without any labels, and 50% of the resources on nodes with label=y.

You can also see that for this configuration:

capacity(a) + capacity(b) = 100
capacity(a, label=x) + capacity(b, label=x) = 100 (queue "b" cannot access label=x, so its capacity for label=x is 0)
capacity(a, label=y) + capacity(b, label=y) = 100

For child queues under the same parent queue, the sum of the capacity for each label should equal 100%.

Similarly, we can set the capacities of the child queues a1, a2, and b1:

a1 and a2:

capacity(a.a1) = 40, capacity(a.a1, label=x) = 30, capacity(a.a1, label=y) = 50
capacity(a.a2) = 60, capacity(a.a2, label=x) = 70, capacity(a.a2, label=y) = 50

b1:

capacity(b.b1) = 100, capacity(b.b1, label=y) = 100

You can see that for the a1 and a2 configuration:

capacity(a.a1) + capacity(a.a2) = 100
capacity(a.a1, label=x) + capacity(a.a2, label=x) = 100
capacity(a.a1, label=y) + capacity(a.a2, label=y) = 100

How many resources can queue a1 access?

Resources on nodes without any labels: Resource = 20 (total containers that can be allocated on nodes without a label, in this case n7 and n8) * 40% (a.capacity) * 40% (a.a1.capacity) = 3.2 (containers)

Resources on nodes with label=x: Resource = 30 (total containers that can be allocated on nodes with label=x, in this case n1-n3) * 100% (a.label-x.capacity) * 30% (a.a1.label-x.capacity) = 9 (containers)

To implement this example configuration, you would add the following properties in the /etc/hadoop/conf/capacity-scheduler.xml file.

  <property>
    <name>yarn.scheduler.capacity.root.queues</name>
    <value>a,b</value>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.accessible-node-labels.x.capacity</name>
    <value>100</value>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.accessible-node-labels.y.capacity</name>
    <value>100</value>
  </property>

  <!-- configuration of queue-a -->
  <property>
    <name>yarn.scheduler.capacity.root.a.accessible-node-labels</name>
    <value>x,y</value>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.a.capacity</name>
    <value>40</value>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.a.accessible-node-labels.x.capacity</name>
    <value>100</value>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.a.accessible-node-labels.y.capacity</name>
    <value>50</value>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.a.queues</name>
    <value>a1,a2</value>
  </property>

  <!-- configuration of queue-b -->
  <property>
    <name>yarn.scheduler.capacity.root.b.accessible-node-labels</name>
    <value>y</value>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.b.capacity</name>
    <value>60</value>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.b.accessible-node-labels.y.capacity</name>
    <value>50</value>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.b.queues</name>
    <value>b1</value>
  </property>

  <!-- configuration of queue-a.a1 -->
  <property>
    <name>yarn.scheduler.capacity.root.a.a1.accessible-node-labels</name>
    <value>x,y</value>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.a.a1.capacity</name>
    <value>40</value>
  </property>

  <property>
   <name>yarn.scheduler.capacity.root.a.a1.accessible-node-labels.x.capacity</name>
    <value>30</value>
  </property>

  <property>
   <name>yarn.scheduler.capacity.root.a.a1.accessible-node-labels.y.capacity</name>
    <value>50</value>
  </property>

  <!-- configuration of queue-a.a2 -->
  <property>
    <name>yarn.scheduler.capacity.root.a.a2.accessible-node-labels</name>
    <value>x,y</value>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.a.a2.capacity</name>
    <value>60</value>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.a.a2.accessible-node-labels.x.capacity</name>
    <value>70</value>
  </property>

  <property>
   <name>yarn.scheduler.capacity.root.a.a2.accessible-node-labels.y.capacity</name>
    <value>50</value>
  </property>

  <!-- configuration of queue-b.b1 -->
  <property>
    <name>yarn.scheduler.capacity.root.b.b1.accessible-node-labels</name>
    <value>y</value>
  </property>

  <property>
    <name>yarn.scheduler.capacity.root.b.b1.capacity</name>
    <value>100</value>
  </property>

  <property>
   <name>yarn.scheduler.capacity.root.b.b1.accessible-node-labels.y.capacity</name>
    <value>100</value>
  </property>
6. Refresh Queues

After adding or updating queue node label properties in the capacity-scheduler.xml file, you must run the following commands to refresh the queues:

sudo su yarn
yarn rmadmin -refreshQueues 
7. Confirm Node Label Assignments
    You can use the following commands to view information about node labels.
  • List all running nodes in the cluster: yarn node -list Example:

    [root@node-1 /]# yarn node -list
    14/11/21 12:14:06 INFO impl.TimelineClientImpl: Timeline service address: http://node-1.example.com:8188/ws/v1/timeline/
    14/11/21 12:14:07 INFO client.RMProxy: Connecting to ResourceManager at node-1.example.com/192.0.2.0:8032
    Total Nodes:3
             Node-Id	     Node-State	Node-Http-Address	Number-of-Running-Containers
    node-3.example.com:45454	        RUNNING	node-3.example.com:50060	                           0
    node-1.example.com:45454	        RUNNING	node-1.example.com:50060	                           0
    node-2.example.com:45454	        RUNNING	node-2.example.com:50060	                           0
    [root@node-1 /]# 
  • List all node labels in the cluster: yarn cluster --list-node-labels Example:

    [root@node-1 /]# yarn cluster --list-node-labels
    14/11/21 13:09:55 INFO impl.TimelineClientImpl: Timeline service address: http://node-1.example.com:8188/ws/v1/timeline/
    14/11/21 13:09:55 INFO client.RMProxy: Connecting to ResourceManager at node-1.example.com/192.0.2.0:8032
    Node Labels: x,y
    [root@node-1 /]# 
  • List the status of a node (includes node labels): yarn node -status <Node_ID> Example:

    [root@node-1 /]# yarn node -status node-1.example.com:45454
    14/11/21 06:32:35 INFO impl.TimelineClientImpl: Timeline service address: http://node-1.example.com:8188/ws/v1/timeline/
    14/11/21 06:32:35 INFO client.RMProxy: Connecting to ResourceManager at node-1.example.com/192.0.2.0:8032
    Node Report : 
    	Node-Id : node-1.example.com:45454
    	Rack : /default-rack
    	Node-State : RUNNING
    	Node-Http-Address : node-1.example.com:50060
    	Last-Health-Update : Fri 21/Nov/14 06:32:09:473PST
    	Health-Report : 
    	Containers : 0
    	Memory-Used : 0MB
    	Memory-Capacity : 1408MB
    	CPU-Used : 0 vcores
    	CPU-Capacity : 8 vcores
    	Node-Labels : x
    
    [root@node-1 /]# 

Node labels are also displayed in the ResourceManager UI on the Nodes and Scheduler pages.

Specifying a Child Queue with No Node Label

If no node label is specified for a child queue, it inherits the node label setting of its parent queue. To specify a child queue with no node label, use a blank space for the value of the node label. For example:

  <property>
    <name>yarn.scheduler.capacity.root.b.b1.accessible-node-labels</name>
    <value> </value>
  </property>
Setting a Default Queue Node Label Expression

You can set a default node label on a queue. The default node label will be used if no label is specified when the job is submitted.

For example, to set "x"as the default node label for queue "b1", you would add the following property in the capacity-scheduler.xml file.

<property>
  <name>yarn.scheduler.capacity.root.b.b1.default-node-label-expression</name>
   <value>x</value>
</property> 

Using Node Labels

ResourceManager UI

The ResourceManager UI displays the node labels on the Nodes page, and also on the Scheduler page.

Setting Node Labels when Submitting Jobs

You can specify a label (using the -node_label_expression parameter) and a queue when you submit YARN jobs using the distributed shell client. If the queue has a label that satisfies the label expression, it will run the job on the labeled node(s). If the label expression does not reference a label associated with the specified queue, the job will not run and an error will be returned.

For example, the following commands run a simple YARN distributed shell "sleep for a long time" job. In this example we are asking for more containers than the cluster can run so we can see which node the job runs on. We are specifying that the job should run on queue "a1", which our user has permission to run jobs on. We are also using the -node_label_expression parameter to specify that the job will run on all nodes with label "x".

sudo su yarn
hadoop jar /usr/phd/current/hadoop-yarn-client/hadoop-yarn-applications-distributedshell.jar -shell_command "sleep 100" -jar /usr/phd/current/hadoop-yarn-client/hadoop-yarn-applications-distributedshell.jar -num_containers 30 -queue a1 -node_label_expression x

If we run this job on the example cluster we configured previously, containers are allocated on node-1, as this node has been assigned node label "x", and queue "a1" also has node label "x":

The following commands run the same job, again specifying node label "x", but this time specifying queue "b1" rather than queue "a1".

sudo su yarn
hadoop jar /usr/phd/current/hadoop-yarn-client/hadoop-yarn-applications-distributedshell.jar -shell_command "sleep 100000" -jar /usr/phd/current/hadoop-yarn-client/hadoop-yarn-applications-distributedshell.jar -num_containers 30 -queue b1 -node_label_expression x

When we attempt to run this job on our example cluster, the job will fail with the following error message because label "x" is not associated with queue "b1".

14/11/24 13:42:21 INFO distributedshell.Client: Submitting application to ASM
14/11/24 13:42:21 FATAL distributedshell.Client: Error running Client
org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException: Invalid resource request, queue=b1 doesn't have permission to access all labels in resource request. labelExpression of resource request=x. Queue labels=y
MapReduce Jobs and Node Labels

Currently you cannot specify a node label when submitting a MapReduce job. However, if you submit a MapReduce job to a queue that has a default node label expression, the default node label will be applied to the MapReduce job.

Using default node label expressions tends to constrain larger portions of the cluster, which at some point becomes counter-productive for jobs, such as MapReduce jobs, that benefit from distributed parallel processing.
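
For example, a MapReduce job can be directed to a specific queue at submission time; if that queue defines a default node label expression, the label is applied to the job automatically. A minimal sketch using the Hadoop examples jar (the jar path and queue name are illustrative):

# Submit the pi example to queue a1; any default node label expression
# defined on that queue is applied to this job automatically
hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar pi -Dmapreduce.job.queuename=a1 10 10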

Running Multiple MapReduce Versions Using the YARN Distributed Cache

Introduction

Beginning in PHD 3.0, multiple versions of the MapReduce framework can be deployed using the YARN Distributed Cache. By setting the appropriate configuration properties, you can run jobs using a different version of the MapReduce framework than the one currently installed on the cluster. Distributed cache ensures that the MapReduce job framework version is consistent throughout the entire job lifecycle. This enables you to maintain consistent results from MapReduce jobs during a rolling upgrade of the cluster. Without using Distributed Cache, a MapReduce job might start with one framework version, but finish with the new (upgrade) version, which could lead to unpredictable results.

YARN Distributed Cache enables you to efficiently distribute large read-only files (text files, archives, .jar files, etc.) for use by YARN applications. Applications use URLs (hdfs://) to specify the files to be cached, and the Distributed Cache framework copies the necessary files to the applicable nodes before any tasks for the job are executed. Its efficiency stems from the fact that the files are copied only once per job, and archives are extracted after they are copied to the applicable nodes. Note that Distributed Cache assumes that the files to be cached (and specified via hdfs:// URLs) are already present on the HDFS file system and are accessible by every node in the cluster.

Configuring MapReduce for the YARN Distributed Cache
  • Copy the tarball that contains the version of MapReduce you would like to use into an HDFS directory that applications can access.

    $HADOOP_HOME/bin/hdfs dfs -put mapreduce.tar.gz /mapred/framework/
  • In the mapred-site.xml file, set the mapreduce.application.framework.path property to the URL of the archive file you just uploaded. The URL allows you to create an alias for the archive if a URL fragment identifier is specified. In the following example, mr-framework is specified as the alias:

    <property>
      <name>mapreduce.application.framework.path</name>
      <value>hdfs:/mapred/framework/mapreduce.tar.gz#mr-framework</value>
    </property>
  • In the mapred-site.xml file, the default value of the mapreduce.application.classpath property uses the ${phd.version} environment variable to reference the currently installed version of PHD:

    <property>
      <name>mapreduce.application.classpath</name>
      <value>$PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/lib/*:$PWD/mr-framework/hadoop/share/hadoop/common/*:$PWD/mr-framework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:/usr/phd/${phd.version}/hadoop/lib/hadoop-lzo-0.6.0.${phd.version}.jar</value>
    </property>

    Change the value of the mapreduce.application.classpath property to reference the applicable version of the MapReduce framework .jar files. In this case we need to replace ${phd.version} with the applicable PHD version, which in our example is 3.0.0.0-249. Note that in the following example the mr-framework alias is used in the path references.

    <property>
      <name>mapreduce.application.classpath</name>      
    <value>$PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/lib/*:$PWD/mr-framework/hadoop/share/hadoop/common/*:$PWD/mr-framework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:/usr/phd/3.0.0.0-249/hadoop/lib/hadoop-lzo-0.6.0.3.0.0.0-249.jar</value>
    </property>

    With this configuration in place, MapReduce jobs will run on the version 3.0.0.0-249 framework referenced in the mapred-site.xml file.

    You can upload multiple versions of the MapReduce framework to HDFS and create a separate mapred-site.xml file to reference each version of the framework. Users can then run jobs against a specific version by referencing the applicable mapred-site.xml file. The following example would run a MapReduce job on version 2.1 of the MapReduce framework:

    hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar pi -conf etc/phd-2.1.0.0/mapred-site.xml 10 10

    You can use the ApplicationMaster log file to confirm that the job ran on the localized version of MapReduce on the Distributed Cache. For example:

    2014-06-10 08:19:30,199 INFO [main] org.mortbay.log: Extract jar: file:/<nm-local-dirs>/filecache/10/hadoop-2.3.0.tar.gz/hadoop-2.3.0/share/hadoop/yarn/hadoop-yarn-common-2.3.0.jar!/webapps/mapreduce to /tmp/Jetty_0_0_0_0_42544_mapreduce____.pryk9q/webapp
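
    As a quick check after uploading the framework tarball (the first step above), you can confirm that it is present in HDFS before pointing mapreduce.application.framework.path at it. A minimal sketch, using the /mapred/framework directory from the example:

    # Confirm that the uploaded MapReduce framework tarball is present in HDFS
    $HADOOP_HOME/bin/hdfs dfs -ls /mapred/framework/
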
Limitations

Support for deploying the MapReduce framework via the YARN Distributed Cache currently does not address the job client code used to submit and query jobs. It also does not address the ShuffleHandler code that runs as an auxiliary service within each NodeManager. Therefore, the following limitations apply to MapReduce versions that can be successfully deployed via the Distributed Cache:

  • The MapReduce version must be compatible with the job client code used to submit and query jobs. If it is incompatible, the job client must be upgraded separately on any node on which jobs are submitted using the new MapReduce version.

  • The MapReduce version must be compatible with the configuration files used by the job client submitting the jobs. If it is incompatible with that configuration (that is, a new property must be set, or an existing property value must be changed), the configuration must be updated before submitting jobs.

  • The MapReduce version must be compatible with the ShuffleHandler version running on the cluster nodes. If it is incompatible, the new ShuffleHandler code must be deployed to all nodes in the cluster, and the NodeManagers must be restarted to pick up the new ShuffleHandler code.

Troubleshooting Tips
  • You can use the ApplicationMaster log file to check the version of MapReduce being used by a running job. For example:

    2014-11-20 08:19:30,199 INFO [main] org.mortbay.log: Extract jar: file:/<nm-local-dirs>/filecache/{...}/hadoop-2.6.0.tar.gz/hadoop-2.6.0/share/hadoop/yarn/hadoop-yarn-common-2.6.0.jar!/webapps/mapreduce to /tmp/Jetty_0_0_0_0_42544_mapreduce____.pryk9q/webapp
  • If shuffle encryption is enabled, MapReduce jobs may fail with the following exception:

    2014-10-10 02:17:16,600 WARN [fetcher#1] org.apache.hadoop.mapreduce.task.reduce.Fetcher: Failed to connect to junping-du-centos6.x-3.cs1cloud.internal:13562 with 1 map outputs
    javax.net.ssl.SSLHandshakeException: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
        at com.sun.net.ssl.internal.ssl.Alerts.getSSLException(Alerts.java:174)
        at com.sun.net.ssl.internal.ssl.SSLSocketImpl.fatal(SSLSocketImpl.java:1731)
        at com.sun.net.ssl.internal.ssl.Handshaker.fatalSE(Handshaker.java:241)
        at com.sun.net.ssl.internal.ssl.Handshaker.fatalSE(Handshaker.java:235)
        at com.sun.net.ssl.internal.ssl.ClientHandshaker.serverCertificate(ClientHandshaker.java:1206)
        at com.sun.net.ssl.internal.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:136)
        at com.sun.net.ssl.internal.ssl.Handshaker.processLoop(Handshaker.java:593)
        at com.sun.net.ssl.internal.ssl.Handshaker.process_record(Handshaker.java:529)
        at com.sun.net.ssl.internal.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:925)
        at com.sun.net.ssl.internal.ssl.SSLSocketImpl.performInitialHandshake(SSLSocketImpl.java:1170)
        at com.sun.net.ssl.internal.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1197)
        at com.sun.net.ssl.internal.ssl.SSLSocketImpl.startHandshake(SSLSocketImpl.java:1181)
        at sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:434)
        at sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.setNewClient(AbstractDelegateHttpsURLConnection.java:81)
        at sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.setNewClient(AbstractDelegateHttpsURLConnection.java:61)
        at sun.net.www.protocol.http.HttpURLConnection.writeRequests(HttpURLConnection.java:584)
        at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1193)
        at java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:379)
        at sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:318)
        at org.apache.hadoop.mapreduce.task.reduce.Fetcher.verifyConnection(Fetcher.java:427)
    ....

    To fix this problem, create a sub-directory under $HADOOP_CONF ($HADOOP_HOME/etc/hadoop by default), and copy the ssl-client.xml file to that directory. Add this new directory path (/etc/hadoop/conf/secure) to the MapReduce classpath specified in mapreduce.application.classpath in the mapred-site.xml file.
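
    A minimal sketch of those steps, assuming the configuration directory is /etc/hadoop/conf and the ssl-client.xml file is located there:

    # Create the secure sub-directory and copy the SSL client configuration into it
    mkdir -p /etc/hadoop/conf/secure
    cp /etc/hadoop/conf/ssl-client.xml /etc/hadoop/conf/secure/
    # Then add /etc/hadoop/conf/secure to the value of mapreduce.application.classpath
    # in the mapred-site.xml file.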

Timeline Server (Technical Preview)

This guide describes how to configure and run the Timeline Server, which enables you to collect generic and per-framework information about YARN applications.

Introduction

The Timeline Server maintains historical state and provides metrics visibility for YARN applications, similar to the functionality the Job History Server provides for MapReduce.

    The Timeline Server provides the following information:
  • Generic Information about Completed Applications. Generic information includes application-level data such as queue name, user information, information about application attempts, a list of Containers that were run under each application attempt, and information about each Container. Generic data about completed applications can be accessed using the web UI or via REST APIs.

  • Per-Framework Information for Running and Completed Applications. Per-framework information is specific to an application or framework. For example, the Hadoop MapReduce framework can include pieces of information such as the number of map tasks, reduce tasks, counters, etc. Application developers can publish this information to the Timeline Server via the TimelineClient (from within a client), the ApplicationMaster, or the application's Containers. This information can then be queried via REST APIs that enable rendering by application/framework-specific UIs.

The Timeline Server is a stand-alone server daemon that is deployed to a cluster node. It may or may not be co-located with the ResourceManager.

Configuring the Timeline Server

Required Properties
    Only one property needs to be specified in the /etc/hadoop/conf/yarn-site.xml file in order to enable the Timeline Server:
  • yarn.timeline-service.hostname The host name of the Timeline Server web application. Example:

    <property>
         <description>The hostname of the timeline server web application.</description>
         <name>yarn.timeline-service.hostname</name>
         <value>0.0.0.0</value>
    </property>
Advanced Properties

In addition to the host name, administrators can also configure the ports of the RPC and the web interfaces, as well as the number of RPC handler threads.

  • yarn.timeline-service.address The default address for the Timeline Server to start the RPC server. Example:

    <property>
         <description>This is default address for the timeline server to start the RPC server.</description>
         <name>yarn.timeline-service.address</name>
         <value>${yarn.timeline-service.hostname}:10200</value>
    </property>
  • yarn.timeline-service.webapp.address The HTTP address of the Timeline Server web application. Example:

    <property>
         <description>The http address of the timeline server web application.</description>
         <name>yarn.timeline-service.webapp.address</name>
         <value>${yarn.timeline-service.hostname}:8188</value>
    </property>
  • yarn.timeline-service.webapp.https.address The HTTPS address of the Timeline Server web application. Example:

    <property>
         <description>The https address of the timeline server web application.</description>
         <name>yarn.timeline-service.webapp.https.address</name>
         <value>${yarn.timeline-service.hostname}:8190</value>
    </property>
  • yarn.timeline-service.handler-thread-count The handler thread count to serve the client RPC requests. Example:

    <property>
         <description>Handler thread count to serve the client RPC requests.</description>
         <name>yarn.timeline-service.handler-thread-count</name>
         <value>10</value>
    </property>

Enabling Generic Data Collection

  • yarn.resourcemanager.system-metrics-publisher.enabled This property indicates to the ResourceManager, as well as to clients, whether or not the Generic History Service (GHS) is enabled. If the GHS is enabled, the ResourceManager begins recording historical data that the GHS can consume, and clients can redirect to the GHS when applications finish running. Example:

    <property>
         <description>Enable or disable the GHS</description>
         <name>yarn.resourcemanager.system-metrics-publisher.enabled</name>
         <value>true</value>
    </property>

Configuring Per-Framework Data Collection

  • yarn.timeline-service.enabled Indicates to clients whether or not the Timeline Server is enabled. If it is enabled, the TimelineClient library used by end-users will post entities and events to the Timeline Server. Example:

    <property>
         <description>Enable or disable the Timeline Server.</description>
         <name>yarn.timeline-service.enabled</name>
         <value>true</value>
    </property>

Configuring the Timeline Server Store

  • yarn.timeline-service.store-class The class name for the Timeline store. Example:

    <property>
         <description>Store class name for timeline store</description>
         <name>yarn.timeline-service.store-class</name>
         <value>org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore</value>
    </property>
  • yarn.timeline-service.leveldb-timeline-store.path The store file path and name for the Timeline Server LevelDB store (if the LevelDB store is used). Example:

    <property>
         <description>Store file name for leveldb timeline store</description>
         <name>yarn.timeline-service.leveldb-timeline-store.path</name>
         <value>${yarn.log.dir}/timeline</value>
    </property>
  • yarn.timeline-service.ttl-enable Enable age-off of timeline store data. Example:

    <property>
      <description>Enable age off of timeline store data.</description>
      <name>yarn.timeline-service.ttl-enable</name>
      <value>true</value>
    </property>
  • yarn.timeline-service.ttl-ms The time-to-live for timeline store data, in milliseconds; the example value of 604800000 corresponds to 7 days. Example:

    <property>
      <description>Time to live for timeline store data in milliseconds.</description>
      <name>yarn.timeline-service.ttl-ms</name>
      <value>604800000</value>
    </property>

Configuring Timeline Server Security

Configuring Kerberos Authentication

To configure Kerberos Authentication for the Timeline Server, add the following properties to the yarn-site.xml file.

<property>
  <name>yarn.timeline-service.http-authentication.type</name>
  <value>kerberos</value>
</property>

<property>
  <name>yarn.timeline-service.http-authentication.kerberos.principal</name>
  <value>HTTP/localhost@EXAMPLE.COM</value>
</property>

<property>
  <name>yarn.timeline-service.http-authentication.kerberos.keytab</name>
  <value>/etc/krb5.keytab</value>
</property>
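
Before restarting the Timeline Server with these settings, it can be useful to confirm that the keytab actually contains the configured principal. A minimal sketch, using the example principal and keytab path shown above:

# List the principals stored in the keytab
klist -kt /etc/krb5.keytab
# Optionally confirm that a ticket can be obtained for the HTTP principal
kinit -kt /etc/krb5.keytab HTTP/localhost@EXAMPLE.COM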

Configuring Timeline Server Authorization (ACLs)

Timeline Server ACLs are configured in the same way as other YARN ACLs. To configure Timeline Server authorization with ACLs, add the following properties to the yarn-site.xml file.

<property>
  <name>yarn.acl.enable</name>
  <value>true</value>
</property>

<property>
  <name>yarn.admin.acl</name>
  <value> </value>
</property>

Configuring Timeline Server SSL

Timeline Server SSL is configured in the same way as other Hadoop components. To configure Timeline Server SSL, add the following properties to the core-site.xml file.

<property>
  <name>hadoop.ssl.require.client.cert</name>
  <value>false</value>
</property>

<property>
  <name>hadoop.ssl.hostname.verifier</name>
  <value>DEFAULT</value>
</property>

<property>
  <name>hadoop.ssl.keystores.factory.class</name>
  <value>org.apache.hadoop.security.ssl.FileBasedKeyStoresFactory</value>
</property>

<property>
  <name>hadoop.ssl.server.conf</name>
  <value>ssl-server.xml</value>
</property>

<property>
  <name>hadoop.ssl.client.conf</name>
  <value>ssl-client.xml</value>
</property>

Running the Timeline Server

To start the Timeline Server, run the following command:

yarn timelineserver

To start the Timeline Server as a daemon, run the following command:

sbin/yarn-daemon.sh start timelineserver
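
Once the Timeline Server is running, you can confirm that its web services are reachable by querying the REST endpoint on the configured webapp address. A minimal sketch, assuming the default port 8188 (replace localhost with the Timeline Server host):

curl -v 'http://localhost:8188/ws/v1/timeline/'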

Accessing Generic Data from the Command Line

You can use the following commands to access application generic history data from the command-line. Note that these same commands can be used to obtain corresponding information about running applications.

yarn application -status <Application ID>
yarn applicationattempt -list <Application ID>
yarn applicationattempt -status <Application Attempt ID>
yarn container -list <Application Attempt ID>
yarn container -status <Container ID>
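
For example, to view the generic data for a specific application (the application ID shown here is illustrative):

# Query generic application data; works for both running and completed applications
yarn application -status application_1409421698529_0012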

Publishing Per-Framework Data in Applications

Developers can define the information they would like to record for their applications by composing TimelineEntity and TimelineEvent objects, and then putting the entities and events to the Timeline server via TimelineClient. For example:

  // Create and start the Timeline client
  TimelineClient client = TimelineClient.createTimelineClient();
  client.init(conf);
  client.start();

  // Compose a minimal entity (the identifier, type, and start time here are illustrative)
  TimelineEntity entity = new TimelineEntity();
  entity.setEntityId("exampleEntityId");
  entity.setEntityType("EXAMPLE_ENTITY_TYPE");
  entity.setStartTime(System.currentTimeMillis());
  try {
    TimelinePutResponse response = client.putEntities(entity);
  } catch (IOException e) {
    // Handle the exception
  } catch (YarnException e) {
    // Handle the exception
  }

  // Stop the Timeline client
  client.stop();

Using the YARN REST APIs to Manage Applications

This guide describes how to use the YARN REST APIs to submit, monitor, and kill applications.

Get an Application ID

You can use the New Application API to get an application ID, which can then be used to submit an application. For example:

curl -v -X POST 'http://localhost:8088/ws/v1/cluster/apps/new-application'

The response returns the application ID, and also includes the maximum resource capabilities available on the cluster. For example:

{
  "application-id":"application_1409421698529_0012",
  "maximum-resource-capability":{"memory":"8192","vCores":"32"}
}

Set Up an Application .json File

Before submitting an application, you must set up a .json file with the parameters required by the application. This is analogous to creating your own ApplicationMaster. The application .json file contains all of the fields you are required to submit in order to launch the application.

The following is an example of an application .json file:

  {
    "application-id":"application_1404203615263_0001",
    "application-name":"test",
    "am-container-spec":
    {
      "local-resources":
      {
        "entry":
        [
          {
            "key":"AppMaster.jar",
            "value":
            {
              "resource":"hdfs://hdfs-namenode:9000/user/testuser/DistributedShell/demo-app/AppMaster.jar",
              "type":"FILE",
              "visibility":"APPLICATION",
              "size": "43004",
              "timestamp": "1405452071209"
            }
          }
        ]
      },
      "commands":
      {
        "command":"{{JAVA_HOME}}/bin/java -Xmx10m org.apache.hadoop.yarn.applications.distributedshell.ApplicationMaster --container_memory 10 --container_vcores 1 --num_containers 1 --priority 0 1><LOG_DIR>/AppMaster.stdout 2><LOG_DIR>/AppMaster.stderr"
      },
      "environment":
      {
        "entry":
        [
          {
            "key": "DISTRIBUTEDSHELLSCRIPTTIMESTAMP",
            "value": "1405459400754"
          },
          {
            "key": "CLASSPATH",
            "value": "{{CLASSPATH}}<CPS>./*<CPS>{{HADOOP_CONF_DIR}}<CPS>{{HADOOP_COMMON_HOME}}/share/hadoop/common/*<CPS>{{HADOOP_COMMON_HOME}}/share/hadoop/common/lib/*<CPS>{{HADOOP_HDFS_HOME}}/share/hadoop/hdfs/*<CPS>{{HADOOP_HDFS_HOME}}/share/hadoop/hdfs/lib/*<CPS>{{HADOOP_YARN_HOME}}/share/hadoop/yarn/*<CPS>{{HADOOP_YARN_HOME}}/share/hadoop/yarn/lib/*<CPS>./log4j.properties"
          },
          {
            "key": "DISTRIBUTEDSHELLSCRIPTLEN",
            "value": "6"
          },
          {
            "key": "DISTRIBUTEDSHELLSCRIPTLOCATION",
            "value": "hdfs://hdfs-namenode:9000/user/testuser/demo-app/shellCommands"
          }
        ]
      }
    },
    "unmanaged-AM":"false",
    "max-app-attempts":"2",
    "resource":
    {
      "memory":"1024",
      "vCores":"1"
    },
    "application-type":"YARN",
    "keep-containers-across-application-attempts":"false"
  }

Submit an Application

You can use the Submit Application API to submit applications. For example:

curl -v -X POST -d @example-submit-app.json -H "Content-type: application/json" 'http://localhost:8088/ws/v1/cluster/apps'

After you submit an application, the response includes the following HTTP status:

HTTP/1.1 202 Accepted

The response also includes a Location field, which contains the application ID and can be used to check the application status. The following is an example of a returned Location field:

Location: http://localhost:8088/ws/v1/cluster/apps/application_1409421698529_0012

Monitor an Application

You can use the Application State API to query the application state. To return only the state of a running application, use the following command format:

curl 'http://localhost:8088/ws/v1/cluster/apps/application_1409421698529_0012/state'

You can also use the value of the Location field (returned in the application submission response) to check the application status. For example:

curl -v 'http://localhost:8088/ws/v1/cluster/apps/application_1409421698529_0012'

You can use the following command format to check the logs:

yarn logs -appOwner 'dr.who' -applicationId application_1409421698529_0012 | less

Kill an Application

You can also use the Application State API to kill an application by using a PUT operation to set the application state to KILLED. For example:

curl -v -X PUT -d '{"state": "KILLED"}' 'http://localhost:8088/ws/v1/cluster/apps/application_1409421698529_0012'
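
You can then query the state endpoint again to confirm that the application has transitioned to the KILLED state. For example:

curl 'http://localhost:8088/ws/v1/cluster/apps/application_1409421698529_0012/state'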

Access the Apache YARN REST API Specification

For more information, see the Apache YARN REST APIs documentation.

Work-Preserving Restart

This guide describes how to configure YARN to preserve the work of running applications in the event of a ResourceManager or NodeManager restart. Work-preserving ResourceManager and NodeManager restart ensures that node restart or fail-over is completely transparent to end-users, with minimal impact to running applications.

Configuring the ResourceManager for Work-Preserving Restart

Work-preserving ResourceManager restart ensures that applications continue to function during a ResourceManager restart, with minimal impact to end-users. The overall concept is that the ResourceManager preserves application queue state in a pluggable state store, and reloads that state on restart. While the ResourceManager is down, ApplicationMasters and NodeManagers continuously poll the ResourceManager until it restarts. When the ResourceManager comes back online, the ApplicationMasters and NodeManagers re-register with the newly started ResourceManager. When the ResourceManager restarts, it also recovers container information by absorbing the container statuses sent from all NodeManagers. Thus, no work is lost due to a ResourceManager crash-reboot event.

To configure work-preserving restart for the ResourceManager, set the following properties in the yarn-site.xml file.

Property: yarn.resourcemanager.recovery.enabled Value: true

Description: Enables ResourceManager restart. The default value is false. If this configuration property is set to true, running applications will resume when the ResourceManager is restarted.

Example:

  <property>
    <name>yarn.resourcemanager.recovery.enabled</name>
    <value>true</value>
  </property>

Property: yarn.resourcemanager.store.class Value: <specified_state_store>

Description: Specifies the state-store used to store application and application-attempt state and other credential information to enable restart. The available state-store implementations are:

  • org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore – a state-store implementation persisting state to a file system such as HDFS. This is the default value.

  • org.apache.hadoop.yarn.server.resourcemanager.recovery.LeveldbRMStateStore – a LevelDB-based state-store implementation.

  • org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore – a ZooKeeper-based state-store implementation.

Example:

  <property>
    <name>yarn.resourcemanager.store.class</name>
    <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore</value>
  </property>
FileSystemRMStateStore Configuration

The following properties apply only if org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore has been specified as the state-store in the yarn.resourcemanager.store.class property.

Property: yarn.resourcemanager.fs.state-store.uri Value: <hadoop.tmp.dir>/yarn/system/rmstore

Description: The URI pointing to the location of the file system path where the RM state will be stored (e.g. hdfs://localhost:9000/rmstore). The default value is <hadoop.tmp.dir>/yarn/system/rmstore.

Example:

  <property>
    <name>yarn.resourcemanager.fs.state-store.uri</name>
    <value>hdfs://localhost:9000/rmstore</value>
  </property>
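
Once the ResourceManager is running with recovery enabled, you can confirm that state is being written to the configured location. A minimal sketch, using the example URI above:

# List the ResourceManager state store directory in HDFS
hdfs dfs -ls hdfs://localhost:9000/rmstore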

Property: yarn.resourcemanager.fs.state-store.retry-policy-spec Value: 2000, 500

Description: The Hadoop FileSystem client retry policy specification. Hadoop FileSystem client retry is always enabled. The policy is specified in pairs of sleep-time and number-of-retries, i.e. (t0, n0), (t1, n1), ...; the first n0 retries sleep t0 milliseconds on average, the following n1 retries sleep t1 milliseconds on average, and so on. The default value is (2000, 500).

Example:

  <property>
    <name>yarn.resourcemanager.fs.state-store.retry-policy-spec</name>
    <value>2000, 500</value>
  </property>
LeveldbRMStateStore Configuration

The following properties apply only if org.apache.hadoop.yarn.server.resourcemanager.recovery.LeveldbRMStateStore has been specified as the state-store in the yarn.resourcemanager.store.class property.

Property: yarn.resourcemanager.leveldb-state-store.path Value: <hadoop.tmp.dir>/yarn/system/rmstore

Description: The local path where the RM state will be stored.

Example:

  <property>
    <name>yarn.resourcemanager.leveldb-state-store.path</name>
    <value><hadoop.tmp.dir>/yarn/system/rmstore</value>
  </property>
ZKRMStateStore Configuration

The following properties apply only if org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore has been specified as the state-store in the yarn.resourcemanager.store.class property.

Property: yarn.resourcemanager.zk-address Value: <host>:<port>

Description: A comma-separated list of <host>:<port> pairs, each corresponding to a server in a ZooKeeper cluster where the ResourceManager state will be stored.

Example:

  <property>
    <name>yarn.resourcemanager.zk-address</name>
    <value>127.0.0.1:2181</value>
  </property>

Property: yarn.resourcemanager.zk-state-store.parent-path Value: /rmstore

Description: The full path of the root znode where RM state will be stored. The default value is /rmstore.

Example:

  <property>
    <name>yarn.resourcemanager.zk-state-store.parent-path</name>
    <value>/rmstore</value>
  </property>

Property: yarn.resourcemanager.zk-num-retries Value: 500

Description: The number of times the ZooKeeper-client running inside the ZKRMStateStore tries to connect to ZooKeeper in case of connection timeouts. The default value is 500.

Example:

  <property>
    <name>yarn.resourcemanager.zk-num-retries</name>
    <value>500</value>
  </property>

Property: yarn.resourcemanager.zk-retry-interval-ms Value: 2000

Description: The interval in milliseconds between retries when connecting to a ZooKeeper server. The default value is 2 seconds.

Example:

  <property>
    <name>yarn.resourcemanager.zk-retry-interval-ms</name>
    <value>2000</value>
  </property>

Property: yarn.resourcemanager.zk-timeout-ms Value: 10000

Description: The ZooKeeper session timeout in milliseconds. This configuration is used by the ZooKeeper server to determine when the session expires. Session expiration happens when the server does not hear from the client (i.e. no heartbeat) within the session timeout period specified by this property. The default value is 10 seconds.

Example:

  <property>
    <name>yarn.resourcemanager.zk-timeout-ms</name>
    <value>10000</value>
  </property>

Property: yarn.resourcemanager.zk-acl Value: world:anyone:rwcda

Description: The ACLs to be used for setting permissions on ZooKeeper znodes. The default value is world:anyone:rwcda.

Example:

<property> 
 <name>yarn.resourcemanager.zk-acl</name>
 <value>world:anyone:rwcda</value> 
</property>
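
If you use the ZooKeeper state store, you can confirm that the ResourceManager is writing state under the configured parent path with the ZooKeeper command-line client. A minimal sketch, assuming zkCli.sh is on the PATH and the example address and parent path shown above:

# List the znodes under the RM state store root
zkCli.sh -server 127.0.0.1:2181 ls /rmstore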

Configuring NodeManagers for Work-Preserving Restart

NodeManager work-preserving restart enables a NodeManager to be restarted without losing the active containers running on the node. At a high level, the NodeManager stores any necessary state in a local state store as it processes container management requests. When the NodeManager restarts, it recovers by first loading the state for its various subsystems, and then letting those subsystems perform recovery using the loaded state.

To configure work-preserving restart for NodeManagers, set the following properties in the yarn-site.xml file on all NodeManagers in the cluster.

Property: yarn.nodemanager.recovery.enabled Value: true

Description: Enables the NodeManager to recover after a restart.

Example:

  <property>
    <name>yarn.nodemanager.recovery.enabled</name>
    <value>true</value>
  </property>

Property: yarn.nodemanager.recovery.dir Value: <yarn_log_dir_prefix>/nodemanager/recovery-state

Description: The local file system directory in which the NodeManager will store state information when recovery is enabled.

Example:

  <property>
    <name>yarn.nodemanager.recovery.dir</name>
    <value><yarn_log_dir_prefix>/nodemanager/recovery-state</value>
  </property>

You should also confirm that the yarn.nodemanager.address port is set to a non-zero value, e.g. 45454:

  <property>
    <name>yarn.nodemanager.address</name>
    <value>0.0.0.0:45454</value>
  </property>
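
With recovery enabled and a non-ephemeral yarn.nodemanager.address port configured, a NodeManager can be restarted without losing its running containers. A minimal sketch, assuming the standard YARN daemon scripts are used:

# Restart a NodeManager; with recovery enabled, its containers are recovered on startup
sbin/yarn-daemon.sh stop nodemanager
sbin/yarn-daemon.sh start nodemanager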