Hadoop 3.2.1 [YARN] Source Code Analysis: A Brief Look at the RMNode State Machine

╰半橙微兮° 2023-01-01 03:49

1. Introduction

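RMNode is the data structure ResourceManager uses to track the lifecycle of a single node. It is implemented by the RMNodeImpl class and records every state the node can be in; transitions between states are triggered by events.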

2. The RMNode State Machine

2.1. Overview of the RMNode State Machine

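RMNodeImpl, the implementation of RMNode, maintains a node state machine that records the states a node can be in and the events that cause transitions between them. When an event arrives, RMNodeImpl moves the node to the appropriate next state and runs the action attached to that transition.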

RMNode has 8 basic states (the NodeState enum) and 16 events (the RMNodeEventType enum) that cause transitions between those states.
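To make the "state + event -> next state + action" idea concrete, here is a minimal table-driven sketch. It is not the Hadoop code: the class name, the reduced set of states, the HEALTH_BAD / HEALTH_OK events and the action strings are all made up for illustration (in YARN, health changes arrive inside STATUS_UPDATE events).

```java
import java.util.ArrayList;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Illustrative only: a tiny table-driven state machine, not the Hadoop classes.
class NodeStateMachineSketch {
  enum State { NEW, RUNNING, UNHEALTHY, LOST }
  // HEALTH_BAD / HEALTH_OK are hypothetical simplifications of STATUS_UPDATE.
  enum Event { STARTED, HEALTH_BAD, HEALTH_OK, EXPIRE }

  // One table entry: the state to move to and the action to run on the way.
  static final class Transition {
    final State next;
    final Runnable hook;
    Transition(State next, Runnable hook) {
      this.next = next;
      this.hook = hook;
    }
  }

  private final Map<State, Map<Event, Transition>> table = new EnumMap<>(State.class);
  private final List<String> log = new ArrayList<>();
  private State current = State.NEW;

  NodeStateMachineSketch() {
    add(State.NEW, Event.STARTED, State.RUNNING, "register node with scheduler");
    add(State.RUNNING, Event.HEALTH_BAD, State.UNHEALTHY, "stop scheduling on node");
    add(State.UNHEALTHY, Event.HEALTH_OK, State.RUNNING, "resume scheduling on node");
    add(State.RUNNING, Event.EXPIRE, State.LOST, "fail containers on node");
  }

  private void add(State from, Event on, State to, String action) {
    table.computeIfAbsent(from, s -> new EnumMap<>(Event.class))
        .put(on, new Transition(to, () -> log.add(action)));
  }

  // Look up (current state, event); run the action, then move to the next state.
  State handle(Event event) {
    Map<Event, Transition> row = table.get(current);
    Transition t = (row == null) ? null : row.get(event);
    if (t == null) {
      throw new IllegalStateException("Invalid event " + event + " in state " + current);
    }
    t.hook.run();
    current = t.next;
    return current;
  }

  State getState() { return current; }

  List<String> getLog() { return log; }

  public static void main(String[] args) {
    NodeStateMachineSketch node = new NodeStateMachineSketch();
    node.handle(Event.STARTED);
    node.handle(Event.HEALTH_BAD);
    System.out.println(node.getState() + " " + node.getLog());
  }
}
```

Any (state, event) pair that is missing from the table is rejected, which is the same idea as the invalid-transition error the real state machine raises.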


2.2. RMNode Basic States

NEW: The initial state of the state machine. Each NodeManager has its own state machine, and every state machine starts in NEW.

RUNNING: After a NodeManager starts, it registers with ResourceManager through the RPC method ResourceTracker#registerNodeManager, at which point the node enters the RUNNING state.

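UNHEALTHY: An administrator can configure a health-check script on each NodeManager. A dedicated thread in the NodeManager runs the script periodically to decide whether the NodeManager is healthy, and the NodeManager reports the result to ResourceManager through its heartbeat. If ResourceManager finds the node unhealthy, it sets the state to UNHEALTHY and stops assigning new tasks to the node until it becomes healthy again. For details on the health-check script, see Chapter 7.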

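DECOMMISSIONED: If a node is added to the exclude list (blacklist), its NodeManager is put into the DECOMMISSIONED state. From then on the NodeManager can no longer communicate with ResourceManager (an exception is thrown directly at the RPC layer, which causes the NodeManager to exit abnormally).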

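LOST: The NMLivelinessMonitor component in ResourceManager tracks the heartbeats of every NodeManager. If a NodeManager reports no heartbeat within a given interval, it is considered dead and RMNodeImpl sets its state to LOST; all containers that were running on it are then marked FAILED.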

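REBOOTED: If ResourceManager finds that the heartbeat ID reported by a NodeManager does not match the one it has stored, it sets the node to REBOOTED, which asks the NodeManager to restart so that the two sides are back in sync.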

DECOMMISSIONING: The node is in the process of being decommissioned…

SHUTDOWN: The node has exited gracefully.
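The RUNNING/UNHEALTHY pair is the easiest one to picture in code. The sketch below is a rough model of the behaviour described for UNHEALTHY: each heartbeat carries a health flag, the flag decides the node state, and only RUNNING nodes are offered for new work. The class and method names (HealthTrackingSketch, onHeartbeat, schedulableNodes) are hypothetical and do not exist in Hadoop.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: health-driven switching between RUNNING and UNHEALTHY.
class HealthTrackingSketch {
  enum State { RUNNING, UNHEALTHY }

  // Node id -> current state, in registration order.
  private final Map<String, State> nodes = new LinkedHashMap<>();

  // A node that has just registered starts out RUNNING.
  void register(String nodeId) {
    nodes.put(nodeId, State.RUNNING);
  }

  // Apply the health flag carried by one heartbeat and return the new state.
  State onHeartbeat(String nodeId, boolean healthy) {
    if (!nodes.containsKey(nodeId)) {
      throw new IllegalArgumentException("Unknown node " + nodeId);
    }
    State next = healthy ? State.RUNNING : State.UNHEALTHY;
    nodes.put(nodeId, next);
    return next;
  }

  // Only RUNNING nodes are candidates for new work.
  List<String> schedulableNodes() {
    List<String> result = new ArrayList<>();
    for (Map.Entry<String, State> e : nodes.entrySet()) {
      if (e.getValue() == State.RUNNING) {
        result.add(e.getKey());
      }
    }
    return result;
  }

  public static void main(String[] args) {
    HealthTrackingSketch rm = new HealthTrackingSketch();
    rm.register("nm1");
    rm.register("nm2");
    rm.onHeartbeat("nm2", false);
    System.out.println(rm.schedulableNodes());
  }
}
```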

2.3. RMNode Basic Events


The basic RMNodeEventType events are:

STARTED: After a NodeManager starts, it registers with the RM through the RPC method ResourceTracker#registerNodeManager, which triggers a STARTED event.

// Source: AdminService
DECOMMISSION: After a node has been added to the exclude list, a DECOMMISSION event is triggered when its NodeManager tries to contact ResourceManager through the RPC method ResourceTracker#nodeHeartbeat.

GRACEFUL_DECOMMISSION: Graceful decommission.

RECOMMISSION: Recommission, i.e. put the node back into service.

// Source: AdminService, ResourceTrackerService
RESOURCE_UPDATE: Resource update.

// ResourceTrackerService

STATUS_UPDATE: A NodeManager periodically reports heartbeats to the RM through the RPC method ResourceTracker#nodeHeartbeat, and every heartbeat triggers a STATUS_UPDATE event.

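REBOOTING: When ResourceManager finds that the heartbeat ID reported by a NodeManager does not match the one it has stored, it triggers a REBOOTING event.
RECONNECTED: If a NodeManager that is already registered with ResourceManager asks to register again, ResourceManager triggers a RECONNECTED event; on receiving it, RMNodeImpl replaces the stored NodeManager information with what the new registration reports.
SHUTDOWN: Shutdown.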

// Source: Application
CLEANUP_APP: When an application finishes (whether it succeeded or failed), a CLEANUP_APP event is triggered to clean up the resources the application occupied.

// Source: Container
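CONTAINER_ALLOCATED: A container has been allocated.
CLEANUP_CONTAINER: When a container finishes (whether it succeeded or failed), a CLEANUP_CONTAINER event is triggered to clean up the resources the container occupied.
UPDATE_CONTAINER: A container has been updated.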

// Source: ClientRMService
SIGNAL_CONTAINER

// Source: RMAppAttempt
FINISHED_CONTAINERS_PULLED_BY_AM

// Source: NMLivelinessMonitor
EXPIRE: The NMLivelinessMonitor component in ResourceManager tracks the heartbeats of every NodeManager. If a NodeManager reports no heartbeat within a given interval, NMLivelinessMonitor triggers an EXPIRE event.
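The EXPIRE event is basically a timeout check over the last heartbeat time of each node. Here is a minimal sketch of that idea; it is not the real NMLivelinessMonitor. The class name, the checkExpired method and the explicit nowMs parameter (passed in so the example is deterministic) are my own assumptions.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative only: heartbeat expiry tracking, not the Hadoop implementation.
class LivelinessMonitorSketch {
  private final long expiryIntervalMs;
  // Node id -> timestamp of the last heartbeat seen (sorted for stable output).
  private final Map<String, Long> lastHeartbeat = new TreeMap<>();

  LivelinessMonitorSketch(long expiryIntervalMs) {
    this.expiryIntervalMs = expiryIntervalMs;
  }

  // Called for every heartbeat; refreshes the node's timestamp.
  void receivedPing(String nodeId, long nowMs) {
    lastHeartbeat.put(nodeId, nowMs);
  }

  // Returns the nodes that have been silent for longer than the interval and
  // stops tracking them; a real monitor would fire an EXPIRE event for each.
  List<String> checkExpired(long nowMs) {
    List<String> expired = new ArrayList<>();
    Iterator<Map.Entry<String, Long>> it = lastHeartbeat.entrySet().iterator();
    while (it.hasNext()) {
      Map.Entry<String, Long> e = it.next();
      if (nowMs - e.getValue() > expiryIntervalMs) {
        expired.add(e.getKey());
        it.remove();
      }
    }
    return expired;
  }

  public static void main(String[] args) {
    LivelinessMonitorSketch monitor = new LivelinessMonitorSketch(1000);
    monitor.receivedPing("nm1", 0);
    monitor.receivedPing("nm2", 500);
    System.out.println(monitor.checkExpired(1200));
  }
}
```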

3. Fields

    private static final RecordFactory recordFactory = RecordFactoryProvider
        .getRecordFactory(null);
    private final ReadLock readLock;
    private final WriteLock writeLock;
    private final ConcurrentLinkedQueue<UpdatedContainerInfo> nodeUpdateQueue;
    private volatile boolean nextHeartBeat = true;
    private final NodeId nodeId;
    private final RMContext context;
    private final String hostName;
    private final int commandPort;
    private int httpPort;
    private final String nodeAddress; // The containerManager address
    private String httpAddress;
    /* Snapshot of total resources before receiving decommissioning command */
    private volatile Resource originalTotalCapability;
    private volatile Resource totalCapability;
    private final Node node;
    private String healthReport;
    private long lastHealthReportTime;
    private String nodeManagerVersion;
    private Integer decommissioningTimeout;
    private long timeStamp;
    /* Aggregated resource utilization for the containers. */
    private ResourceUtilization containersUtilization;
    /* Resource utilization for the node. */
    private ResourceUtilization nodeUtilization;
    /** Physical resources in the node. */
    private volatile Resource physicalResource;
    /* Container Queue Information for the node. Used by Distributed Scheduler */
    private OpportunisticContainersStatus opportunisticContainersStatus;
    private final ContainerAllocationExpirer containerAllocationExpirer;
    /* set of containers that have just launched */
    private final Set<ContainerId> launchedContainers =
        new HashSet<ContainerId>();
    /* track completed container globally */
    private final Set<ContainerId> completedContainers =
        new HashSet<ContainerId>();
    /* set of containers that need to be cleaned */
    private final Set<ContainerId> containersToClean = new TreeSet<ContainerId>(
        new ContainerIdComparator());
    /* set of containers that need to be signaled */
    private final List<SignalContainerRequest> containersToSignal =
        new ArrayList<SignalContainerRequest>();
    /*
     * set of containers to notify NM to remove them from its context. Currently,
     * this includes containers that were notified to AM about their completion
     */
    private final Set<ContainerId> containersToBeRemovedFromNM =
        new HashSet<ContainerId>();
    /* the list of applications that have finished and need to be purged */
    private final List<ApplicationId> finishedApplications =
        new ArrayList<ApplicationId>();
    /* the list of applications that are running on this node */
    private final List<ApplicationId> runningApplications =
        new ArrayList<ApplicationId>();
    private final Map<ContainerId, Container> toBeUpdatedContainers =
        new HashMap<>();
    // NOTE: This is required for backward compatibility.
    private final Map<ContainerId, Container> toBeDecreasedContainers =
        new HashMap<>();
    private final Map<ContainerId, Container> nmReportedIncreasedContainers =
        new HashMap<>();
    private NodeHeartbeatResponse latestNodeHeartBeatResponse = recordFactory
        .newRecordInstance(NodeHeartbeatResponse.class);
    // The remaining two are fairly long: the StateMachineFactory and the stateMachine

4. Constructor

Code:

    public RMNodeImpl(NodeId nodeId, RMContext context, String hostName,
        int cmPort, int httpPort, Node node, Resource capability,
        String nodeManagerVersion, Resource physResource) {
      this.nodeId = nodeId;
      this.context = context;
      this.hostName = hostName;
      this.commandPort = cmPort;
      this.httpPort = httpPort;
      this.totalCapability = capability;
      this.nodeAddress = hostName + ":" + cmPort;
      this.httpAddress = hostName + ":" + httpPort;
      this.node = node;
      this.healthReport = "Healthy";
      this.lastHealthReportTime = System.currentTimeMillis();
      this.nodeManagerVersion = nodeManagerVersion;
      this.timeStamp = 0;
      this.physicalResource = physResource;
      this.latestNodeHeartBeatResponse.setResponseId(0);
      ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
      this.readLock = lock.readLock();
      this.writeLock = lock.writeLock();
      this.stateMachine = stateMachineFactory.make(this);
      this.nodeUpdateQueue = new ConcurrentLinkedQueue<UpdatedContainerInfo>();
      this.containerAllocationExpirer = context.getContainerAllocationExpirer();
    }
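The call stateMachineFactory.make(this) in the constructor is worth a note: the transition table is built once and shared, while every node gets its own machine instance with its own current state. The sketch below shows that split in a simplified form. Factory, Machine and the String owner are stand-ins I made up; the real StateMachineFactory is generic and also attaches transition hooks.

```java
import java.util.EnumMap;
import java.util.Map;

// Illustrative only: one shared transition table, one machine per node.
class SharedFactorySketch {
  enum State { NEW, RUNNING, LOST }
  enum Event { STARTED, EXPIRE }

  // Holds the transition table; built once and shared by all machines.
  static final class Factory {
    private final Map<State, Map<Event, State>> table = new EnumMap<>(State.class);
    private final State initial;

    Factory(State initial) {
      this.initial = initial;
    }

    Factory addTransition(State from, Event on, State to) {
      table.computeIfAbsent(from, s -> new EnumMap<>(Event.class)).put(on, to);
      return this;
    }

    // Each call returns a fresh machine that starts in the initial state.
    Machine make(String owner) {
      return new Machine(owner, initial, table);
    }
  }

  // Per-node machine: shares the table but owns its current state.
  static final class Machine {
    private final String owner;
    private final Map<State, Map<Event, State>> table;
    private State current;

    Machine(String owner, State initial, Map<State, Map<Event, State>> table) {
      this.owner = owner;
      this.current = initial;
      this.table = table;
    }

    State doTransition(Event event) {
      Map<Event, State> row = table.get(current);
      State next = (row == null) ? null : row.get(event);
      if (next == null) {
        throw new IllegalStateException(
            "Invalid event " + event + " on " + owner + " in state " + current);
      }
      current = next;
      return current;
    }

    State getCurrentState() { return current; }
  }

  // Shared by every node, like a static final factory field.
  static final Factory FACTORY = new Factory(State.NEW)
      .addTransition(State.NEW, Event.STARTED, State.RUNNING)
      .addTransition(State.RUNNING, Event.EXPIRE, State.LOST);

  public static void main(String[] args) {
    Machine nm1 = FACTORY.make("nm1");
    Machine nm2 = FACTORY.make("nm2");
    nm1.doTransition(Event.STARTED);
    System.out.println(nm1.getCurrentState() + " " + nm2.getCurrentState());
  }
}
```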

5. Methods

I won't go through the methods in detail; most of them are simple getters for basic node information.

The one that stands out is the handle method, although it too simply hands the event to the state machine for processing.

    public void handle(RMNodeEvent event) {
      LOG.debug("Processing " + event.getNodeId() + " of type " + event.getType());
      try {
        // Events for one node are processed one at a time under the write lock.
        writeLock.lock();
        NodeState oldState = getState();
        try {
          // Let the state machine pick the transition for (current state, event type).
          stateMachine.doTransition(event.getType(), event);
        } catch (InvalidStateTransitionException e) {
          // An event that is not valid in the current state is logged, not rethrown.
          LOG.error("Can't handle this event at current state", e);
          LOG.error("Invalid event " + event.getType() +
              " on Node " + this.nodeId + " oldState " + oldState);
        }
        if (oldState != getState()) {
          LOG.info(nodeId + " Node Transitioned from " + oldState + " to "
              + getState());
        }
      }
      finally {
        writeLock.unlock();
      }
    }
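The locking pattern in handle can be tried out in isolation. The sketch below is a cut-down stand-in, not RMNodeImpl: the two-state machine, the rejectedEvents counter and the class name are assumptions for illustration. It takes the write lock before entering try, which is the more common Java idiom, whereas the listing above acquires it as the first statement inside try.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative only: getters under the read lock, event handling under the write lock.
class LockedNodeSketch {
  enum State { NEW, RUNNING }
  enum Event { STARTED }

  private final Lock readLock;
  private final Lock writeLock;
  private State state = State.NEW;
  private int rejectedEvents = 0; // stands in for the "invalid event" error log

  LockedNodeSketch() {
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    this.readLock = lock.readLock();
    this.writeLock = lock.writeLock();
  }

  // Readers can run concurrently with each other, but not with handle().
  State getState() {
    readLock.lock();
    try {
      return state;
    } finally {
      readLock.unlock();
    }
  }

  int getRejectedEvents() {
    readLock.lock();
    try {
      return rejectedEvents;
    } finally {
      readLock.unlock();
    }
  }

  void handle(Event event) {
    writeLock.lock();
    try {
      // A thread holding the write lock may also take the read lock,
      // so calling getState() here does not deadlock.
      State oldState = getState();
      if (oldState == State.NEW && event == Event.STARTED) {
        state = State.RUNNING;
      } else {
        // Invalid in the current state: record it and leave the state unchanged.
        rejectedEvents++;
      }
    } finally {
      writeLock.unlock();
    }
  }

  public static void main(String[] args) {
    LockedNodeSketch node = new LockedNodeSketch();
    node.handle(Event.STARTED);
    node.handle(Event.STARTED); // rejected: the node is already RUNNING
    System.out.println(node.getState() + " rejected=" + node.getRejectedEvents());
  }
}
```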
