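Hadoop 3.2.1 [YARN] Source Code Analysis: A Brief Look at the RMNode State Machine
1. Preface
In the ResourceManager, the data structure that maintains the lifecycle of a node is RMNode. RMNode records every state a node can be in. It is implemented by the RMNodeImpl class, and transitions between states are triggered by events.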
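2. The RMNode State Machine
2.1. Overview of the RMNode State Machine
RMNode is the data structure the ResourceManager uses to maintain the lifecycle of a node. Its implementation, RMNodeImpl, maintains a node state machine that records the states a node can be in and the events that cause transitions between them. When an event arrives, RMNodeImpl moves the node to the next state determined by its current state and the event, and runs the action attached to that transition.
RMNode has 8 basic states (NodeState) and 16 event types (RMNodeEventType) that drive transitions between them.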
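The idea of "(current state, event) selects the next state plus an action" can be illustrated with a minimal transition table. This is a simplified stand-in for Hadoop's StateMachineFactory, not its API: SketchState, SketchEvent, SketchStateMachine and its methods are hypothetical, and the event BECAME_UNHEALTHY does not exist in RMNodeEventType (the real code decides health inside the STATUS_UPDATE transition).

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical, trimmed-down state and event sets used only for this illustration.
enum SketchState { NEW, RUNNING, UNHEALTHY, LOST }

enum SketchEvent { STARTED, BECAME_UNHEALTHY, BECAME_HEALTHY, EXPIRE }

final class SketchStateMachine {
    /** One arc of the table: the target state plus a hook run when the arc is taken. */
    private static final class Arc {
        final SketchState target;
        final Runnable hook;

        Arc(SketchState target, Runnable hook) {
            this.target = target;
            this.hook = hook;
        }
    }

    private final Map<SketchState, Map<SketchEvent, Arc>> table =
            new EnumMap<SketchState, Map<SketchEvent, Arc>>(SketchState.class);
    private SketchState current;

    SketchStateMachine(SketchState initial) {
        this.current = initial;
    }

    /** Registers: in state 'from', event 'ev' runs 'hook' and moves to state 'to'. */
    SketchStateMachine addTransition(SketchState from, SketchEvent ev,
                                     SketchState to, Runnable hook) {
        table.computeIfAbsent(from, s -> new EnumMap<SketchEvent, Arc>(SketchEvent.class))
             .put(ev, new Arc(to, hook));
        return this;
    }

    /** Looks up (current, ev); an unregistered pair is an invalid transition. */
    SketchState doTransition(SketchEvent ev) {
        Map<SketchEvent, Arc> row = table.get(current);
        Arc arc = (row == null) ? null : row.get(ev);
        if (arc == null) {
            throw new IllegalStateException("Invalid event " + ev + " at " + current);
        }
        arc.hook.run();       // the action that accompanies the transition
        current = arc.target; // then the state actually changes
        return current;
    }

    SketchState current() {
        return current;
    }
}
```

Each (state, event) pair maps to exactly one target here. Hadoop's factory also supports transitions whose hook picks one of several target states, which is how a single STATUS_UPDATE in RUNNING can lead to either RUNNING or UNHEALTHY.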
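2.2. Basic RMNode States
❑ NEW: the initial state of the state machine. Each NodeManager has its own state machine, and every state machine starts in NEW.
❑ RUNNING: after a NodeManager starts, it registers with the ResourceManager through the RPC method ResourceTracker#registerNodeManager, and the node then enters the RUNNING state.
❑ UNHEALTHY: an administrator can configure a health-check script on each NodeManager, and a dedicated NodeManager thread runs the script periodically to decide whether the node is healthy. The NodeManager reports the script result to the ResourceManager through its heartbeats. If the ResourceManager finds the node unhealthy, it sets the node's state to UNHEALTHY, and from then on the ResourceManager assigns no new tasks to that node until it becomes healthy again. For more on the health-check script, see Chapter 7.
❑ DECOMMISSIONED: if a node is added to the exclude list (a blacklist), its NodeManager is set to DECOMMISSIONED. After that, the NodeManager can no longer communicate with the ResourceManager (an exception is thrown directly at the RPC layer, causing the NodeManager to exit).
❑ LOST: the NMLivelinessMonitor component of the ResourceManager tracks the heartbeats of every NodeManager. If a NodeManager sends no heartbeat within a certain interval, it is considered dead: RMNodeImpl sets it to LOST, and all containers that were running on it are marked FAILED.
❑ REBOOTED: if the ResourceManager finds that the heartbeat response ID reported by a NodeManager does not match the one it has stored, it sets the node to REBOOTED and asks the NodeManager to restart so that the two sides resynchronize.
❑ DECOMMISSIONING: the node is being decommissioned (a graceful decommission is in progress).
❑ SHUTDOWN: the node has shut down gracefully.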
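The response-ID comparison behind REBOOTED can be sketched as follows. This is a simplified illustration, not ResourceTrackerService code: HeartbeatVerdict and ResponseIdChecker are hypothetical names, and the duplicate-heartbeat branch and the wrap-around rule in nextResponseId are assumptions about how such a check is typically written. The starting value 0 matches the setResponseId(0) call in the RMNodeImpl constructor.

```java
// Hypothetical outcome of comparing the NodeManager's response ID with the RM's copy.
enum HeartbeatVerdict { ACCEPT, DUPLICATE, RESYNC }

final class ResponseIdChecker {
    // Last response ID the RM handed out; RMNodeImpl's constructor also starts at 0.
    private int lastResponseId = 0;

    /** Next ID, wrapping to 0 instead of going negative (assumed wrap rule). */
    static int nextResponseId(int id) {
        return (id + 1) & Integer.MAX_VALUE;
    }

    HeartbeatVerdict onHeartbeat(int nmResponseId) {
        if (nextResponseId(nmResponseId) == lastResponseId) {
            // The NM re-sent the heartbeat the RM has already answered.
            return HeartbeatVerdict.DUPLICATE;
        }
        if (nmResponseId != lastResponseId) {
            // The two sides disagree: this is where a REBOOTING event would be raised.
            return HeartbeatVerdict.RESYNC;
        }
        lastResponseId = nextResponseId(lastResponseId);
        return HeartbeatVerdict.ACCEPT;
    }
}
```

A RESYNC verdict marks the point at which the RM would raise a REBOOTING event, moving the node to REBOOTED.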
2.3. Basic RMNode Events
The basic RMNodeEventType events are:
❑ STARTED: after a NodeManager starts, it registers with the RM through the RPC method ResourceTracker#registerNodeManager, which triggers a STARTED event.
// Source: AdminService
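❑ DECOMMISSION: after a node has been added to the exclude list, the next time its NodeManager tries to communicate with the ResourceManager through the RPC method ResourceTracker#nodeHeartbeat, a DECOMMISSION event is triggered.
❑ GRACEFUL_DECOMMISSION: starts a graceful decommission; the node moves to DECOMMISSIONING.
❑ RECOMMISSION: recommissions the node, cancelling an in-progress decommission and returning the node to service.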
// Source: AdminService, ResourceTrackerService
❑ RESOURCE_UPDATE: the node's resources are updated.
// ResourceTrackerService
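❑ STATUS_UPDATE: a NodeManager periodically reports heartbeats to the RM through the RPC method ResourceTracker#nodeHeartbeat, and every heartbeat triggers a STATUS_UPDATE event.
❑ REBOOTING: triggered when the ResourceManager finds that the heartbeat response ID reported by a NodeManager does not match the one it has stored.
❑ RECONNECTED: if a NodeManager that is already registered with the ResourceManager requests registration again, the ResourceManager triggers a RECONNECTED event. On receiving it, RMNodeImpl updates the node's information with what the new registration reported.
❑ SHUTDOWN: the NodeManager is shutting down gracefully and unregistering from the RM.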
// Source: Application
❑ CLEANUP_APP: when an application finishes (successfully or not), a CLEANUP_APP event is triggered to clean up the resources the application occupied.
// Source: Container
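❑ CONTAINER_ALLOCATED: a container has been allocated on the node.
❑ CLEANUP_CONTAINER: when a container finishes (successfully or not), a CLEANUP_CONTAINER event is triggered to clean up the resources the container occupied.
❑ UPDATE_CONTAINER: a container on the node is updated (for example, its resources change).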
// Source: ClientRMService
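❑ SIGNAL_CONTAINER: a client asks, through ClientRMService, for a signal to be delivered to a container (for example, to request a thread dump or a shutdown).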
// Source: RMAppAttempt
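❑ FINISHED_CONTAINERS_PULLED_BY_AM: the ApplicationMaster has pulled the statuses of finished containers, so the RM can tell the NodeManager to remove those containers from its context.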
// Source: NMLivelinessMonitor
❑ EXPIRE: the NMLivelinessMonitor component of the ResourceManager tracks the heartbeats of every NodeManager. If a NodeManager sends no heartbeat within a certain interval, NMLivelinessMonitor triggers an EXPIRE event.
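How a heartbeat timeout turns into EXPIRE can be sketched with a small liveness tracker. NMLivelinessMonitor extends Hadoop's AbstractLivelinessMonitor, and the method names register and receivedPing loosely echo that class, but LivenessSketch itself is hypothetical and simplified: the real monitor runs its own background checking thread, whereas this sketch takes the current time as a parameter so it is deterministic. In YARN the timeout is set by yarn.nm.liveness-monitor.expiry-interval-ms (600000 ms, i.e. 10 minutes, by default).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical simplification of a heartbeat liveness monitor (not Hadoop's API).
final class LivenessSketch {
    private final long expiryIntervalMs;
    private final Map<String, Long> lastPing = new HashMap<>();

    LivenessSketch(long expiryIntervalMs) {
        this.expiryIntervalMs = expiryIntervalMs;
    }

    /** Start tracking a node, e.g. when it registers. */
    void register(String nodeId, long nowMs) {
        lastPing.put(nodeId, nowMs);
    }

    /** Refresh the timestamp on every heartbeat; unknown nodes are ignored. */
    void receivedPing(String nodeId, long nowMs) {
        if (lastPing.containsKey(nodeId)) {
            lastPing.put(nodeId, nowMs);
        }
    }

    /** Returns nodes silent for longer than the interval and stops tracking them. */
    List<String> checkExpired(long nowMs) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastPing.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (nowMs - e.getValue() > expiryIntervalMs) {
                expired.add(e.getKey());
                it.remove(); // the real RM would now dispatch EXPIRE for this node
            }
        }
        return expired;
    }
}
```

Each ID returned by checkExpired corresponds to one EXPIRE event, which RMNodeImpl turns into the LOST state.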
3. Fields
private static final RecordFactory recordFactory = RecordFactoryProvider
.getRecordFactory(null);
private final ReadLock readLock;
private final WriteLock writeLock;
private final ConcurrentLinkedQueue<UpdatedContainerInfo> nodeUpdateQueue;
private volatile boolean nextHeartBeat = true;
private final NodeId nodeId;
private final RMContext context;
private final String hostName;
private final int commandPort;
private int httpPort;
private final String nodeAddress; // The containerManager address
private String httpAddress;
/* Snapshot of total resources before receiving decommissioning command */
private volatile Resource originalTotalCapability;
private volatile Resource totalCapability;
private final Node node;
private String healthReport;
private long lastHealthReportTime;
private String nodeManagerVersion;
private Integer decommissioningTimeout;
private long timeStamp;
/* Aggregated resource utilization for the containers. */
private ResourceUtilization containersUtilization;
/* Resource utilization for the node. */
private ResourceUtilization nodeUtilization;
/** Physical resources in the node. */
private volatile Resource physicalResource;
/* Container Queue Information for the node.. Used by Distributed Scheduler */
private OpportunisticContainersStatus opportunisticContainersStatus;
private final ContainerAllocationExpirer containerAllocationExpirer;
/* set of containers that have just launched */
private final Set<ContainerId> launchedContainers =
new HashSet<ContainerId>();
/* track completed container globally */
private final Set<ContainerId> completedContainers =
new HashSet<ContainerId>();
/* set of containers that need to be cleaned */
private final Set<ContainerId> containersToClean = new TreeSet<ContainerId>(
new ContainerIdComparator());
/* set of containers that need to be signaled */
private final List<SignalContainerRequest> containersToSignal =
new ArrayList<SignalContainerRequest>();
/* set of containers to notify NM to remove them from its context. Currently, this includes containers that were notified to AM about their completion */
private final Set<ContainerId> containersToBeRemovedFromNM =
new HashSet<ContainerId>();
/* the list of applications that have finished and need to be purged */
private final List<ApplicationId> finishedApplications =
new ArrayList<ApplicationId>();
/* the list of applications that are running on this node */
private final List<ApplicationId> runningApplications =
new ArrayList<ApplicationId>();
private final Map<ContainerId, Container> toBeUpdatedContainers =
new HashMap<>();
// NOTE: This is required for backward compatibility.
private final Map<ContainerId, Container> toBeDecreasedContainers =
new HashMap<>();
private final Map<ContainerId, Container> nmReportedIncreasedContainers =
new HashMap<>();
private NodeHeartbeatResponse latestNodeHeartBeatResponse = recordFactory
.newRecordInstance(NodeHeartbeatResponse.class);
// The remaining two fields, stateMachineFactory and stateMachine, are rather long and are omitted here.
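The pairing of nodeUpdateQueue with the volatile nextHeartBeat flag is easier to see in isolation. The sketch below reflects my reading of RMNodeImpl: heartbeat handling buffers container changes in the queue, the scheduler drains it through pullContainerUpdates(), and nextHeartBeat keeps the node from notifying the scheduler again before the previous batch has been pulled. ContainerDelta and NodeUpdateBuffer are hypothetical stand-ins for UpdatedContainerInfo and RMNodeImpl, and the notification counter replaces the real scheduler event.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for UpdatedContainerInfo: container IDs from one heartbeat.
final class ContainerDelta {
    final List<String> launched;
    final List<String> completed;

    ContainerDelta(List<String> launched, List<String> completed) {
        this.launched = launched;
        this.completed = completed;
    }
}

final class NodeUpdateBuffer {
    private final ConcurrentLinkedQueue<ContainerDelta> queue = new ConcurrentLinkedQueue<>();
    private volatile boolean nextHeartBeat = true;
    // Counts scheduler notifications; only touched from the (serialized) heartbeat path.
    private int schedulerNotifications = 0;

    /** Heartbeat side: buffer non-empty changes, notify the scheduler only if it caught up. */
    void onHeartbeat(ContainerDelta delta) {
        if (!delta.launched.isEmpty() || !delta.completed.isEmpty()) {
            queue.add(delta);
        }
        if (nextHeartBeat) {
            nextHeartBeat = false;
            schedulerNotifications++;
        }
    }

    /** Scheduler side: drain everything buffered so far and re-arm the flag. */
    List<ContainerDelta> pullContainerUpdates() {
        List<ContainerDelta> drained = new ArrayList<>();
        ContainerDelta d;
        while ((d = queue.poll()) != null) {
            drained.add(d);
        }
        nextHeartBeat = true;
        return drained;
    }

    int schedulerNotifications() {
        return schedulerNotifications;
    }
}
```

Using a lock-free queue lets the heartbeat path and the scheduler thread exchange batches without sharing RMNodeImpl's write lock.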
4. Constructor
Code:
public RMNodeImpl(NodeId nodeId, RMContext context, String hostName,
int cmPort, int httpPort, Node node, Resource capability,
String nodeManagerVersion, Resource physResource) {
this.nodeId = nodeId;
this.context = context;
this.hostName = hostName;
this.commandPort = cmPort;
this.httpPort = httpPort;
this.totalCapability = capability;
this.nodeAddress = hostName + ":" + cmPort;
this.httpAddress = hostName + ":" + httpPort;
this.node = node;
this.healthReport = "Healthy";
this.lastHealthReportTime = System.currentTimeMillis();
this.nodeManagerVersion = nodeManagerVersion;
this.timeStamp = 0;
this.physicalResource = physResource;
this.latestNodeHeartBeatResponse.setResponseId(0);
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
this.readLock = lock.readLock();
this.writeLock = lock.writeLock();
this.stateMachine = stateMachineFactory.make(this);
this.nodeUpdateQueue = new ConcurrentLinkedQueue<UpdatedContainerInfo>();
this.containerAllocationExpirer = context.getContainerAllocationExpirer();
}
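The constructor derives readLock and writeLock from a single ReentrantReadWriteLock. As far as I can tell, RMNodeImpl's getters (such as getState() and getHealthReport()) take readLock, while handle() takes writeLock. A minimal sketch of that split, using a hypothetical GuardedNodeInfo class that keeps only two of the fields:

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;

// Hypothetical class showing the readLock/writeLock pattern used by RMNodeImpl.
final class GuardedNodeInfo {
    private final ReadLock readLock;
    private final WriteLock writeLock;
    private String healthReport = "Healthy"; // same initial value as the constructor above
    private long lastHealthReportTime = System.currentTimeMillis();

    GuardedNodeInfo() {
        ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        this.readLock = lock.readLock();
        this.writeLock = lock.writeLock();
    }

    /** Many readers may hold the read lock at the same time. */
    String getHealthReport() {
        readLock.lock();
        try {
            return healthReport;
        } finally {
            readLock.unlock();
        }
    }

    long getLastHealthReportTime() {
        readLock.lock();
        try {
            return lastHealthReportTime;
        } finally {
            readLock.unlock();
        }
    }

    /** The exclusive write lock keeps report and timestamp consistent for readers. */
    void setHealthReport(String report, long timeMs) {
        writeLock.lock();
        try {
            healthReport = report;
            lastHealthReportTime = timeMs;
        } finally {
            writeLock.unlock();
        }
    }
}
```

Read-mostly access (web UI, scheduler queries) can proceed concurrently, while state transitions are serialized.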
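5. Methods
I won't go through the methods in detail; most of them simply return basic information about the node.
The one worth a closer look is handle(): it takes the write lock, passes the event to the state machine via doTransition, logs an invalid transition instead of throwing, and logs the state change if one occurred.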
public void handle(RMNodeEvent event) {
LOG.debug("Processing " + event.getNodeId() + " of type " + event.getType());
try {
writeLock.lock();
NodeState oldState = getState();
try {
stateMachine.doTransition(event.getType(), event);
} catch (InvalidStateTransitionException e) {
LOG.error("Can't handle this event at current state", e);
LOG.error("Invalid event " + event.getType() +
" on Node " + this.nodeId + " oldState " + oldState);
}
if (oldState != getState()) {
LOG.info(nodeId + " Node Transitioned from " + oldState + " to "
+ getState());
}
}
finally {
writeLock.unlock();
}
}
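The structure of handle() can be reproduced in a self-contained sketch. MiniNode, MiniState, MiniEvent and the hard-coded transition() method are hypothetical stand-ins for RMNodeImpl, NodeState, RMNodeEventType and stateMachine.doTransition, and the log list replaces LOG. One deliberate difference: the sketch calls lock() just before the outer try rather than inside it, the usual idiom, because the unlock() in finally then only runs once the lock is actually held.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, minimal stand-ins; not Hadoop classes.
enum MiniState { NEW, RUNNING, LOST }

enum MiniEvent { STARTED, EXPIRE }

final class MiniNode {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private MiniState state = MiniState.NEW;
    final List<String> log = new ArrayList<>(); // collects what LOG.info/LOG.error would print

    /** Stand-in for stateMachine.doTransition: throws on an unregistered (state, event) pair. */
    private MiniState transition(MiniState s, MiniEvent e) {
        if (s == MiniState.NEW && e == MiniEvent.STARTED) return MiniState.RUNNING;
        if (s == MiniState.RUNNING && e == MiniEvent.EXPIRE) return MiniState.LOST;
        throw new IllegalStateException("Invalid event " + e + " at " + s);
    }

    void handle(MiniEvent event) {
        lock.writeLock().lock();
        try {
            MiniState oldState = state;
            try {
                state = transition(oldState, event);
            } catch (IllegalStateException ex) {
                // As in RMNodeImpl: log it and keep the current state instead of propagating.
                log.add("Invalid event " + event + " oldState " + oldState);
            }
            if (oldState != state) {
                log.add("Transitioned from " + oldState + " to " + state);
            }
        } finally {
            lock.writeLock().unlock();
        }
    }

    MiniState state() {
        return state;
    }
}
```

An event that is invalid for the current state therefore leaves the node where it was and only produces an error log entry.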