JobTracker节点对Task实例状态报告的处理

今天药忘吃喽~ 2022-06-13 01:58 204阅读 0赞
  1. 前面谈到过,每一个TaskTracker节点都要在它向JobTracker节点发送心跳包的时候顺带报告运行在其上的Task的状态信息,这些Task是指正在TaskTracker节点上运行的,或者从上一次报告到现在的时间间隔中完成的或者是失败的任务。JobTracker节点之所以需要收集这些正在执行或者刚完成的任务的状态信息,是因为它要及时掌握各个作业的执行进度,一方面将此报告给用户,一方面还调整作业的任务调度,如任务的意外处理,任务/作业的善后处理等。这里说的任务的意外处理包括,对于失败的任务或者是拖后退的任务需要从新安排其它的TaskTracker节点来执行等,详细情况我会在后面再作介绍。
  2. 对于JobTracker节点处理TaskTracker发送过来的Task状态报告的详细过程,我将结合源代码来阐述。但在此之前,我想先介绍一下相关的几个概念:作业/作业实例、任务/任务实例。作业指的是用户提交的作业,用Job的实例对象来表示,作业实例指的是Job运行的状态信息,用JobInProgress的实例对象来表示;任务指的是该任务的执行进度状态信息,用TaskInProgress的实例对象来表示,任务实例则指的是在将该任务交由某一个TaskTacker节点执行的任务,用TaskAttempID的实例对象来表示。一个作业对应一个作业实例,而一个任务可能对应多个任务实例,这是因为一个任务可能交给了多个TaskTracker节点来执行。JobTracker节点处理Task实例状态报告的源码如下:
  3. void updateTaskStatuses(TaskTrackerStatus status) {
  4. String trackerName = status.getTrackerName();
  5. for (TaskStatus report : status.getTaskReports()) {
  6. report.setTaskTracker(trackerName);
  7. TaskAttemptID taskId = report.getTaskID();
  8. // expire it
  9. expireLaunchingTasks.removeTask(taskId);
  10. JobInProgress job = getJob(taskId.getJobID());
  11. if (job == null) {
  12. // if job is not there in the cleanup list ... add it
  13. synchronized (trackerToJobsToCleanup) {
  14. Set<JobID> jobs = trackerToJobsToCleanup.get(trackerName);
  15. if (jobs == null) {
  16. jobs = new HashSet<JobID>();
  17. trackerToJobsToCleanup.put(trackerName, jobs);
  18. }
  19. jobs.add(taskId.getJobID());
  20. }
  21. continue;
  22. }
  23. if (!job.inited()) {
  24. // if job is not yet initialized ... kill the attempt
  25. synchronized (trackerToTasksToCleanup) {
  26. Set<TaskAttemptID> tasks = trackerToTasksToCleanup.get(trackerName);
  27. if (tasks == null) {
  28. tasks = new HashSet<TaskAttemptID>();
  29. trackerToTasksToCleanup.put(trackerName, tasks);
  30. }
  31. tasks.add(taskId);
  32. }
  33. continue;
  34. }
  35. TaskInProgress tip = taskidToTIPMap.get(taskId);
  36. // Check if the tip is known to the jobtracker. In case of a restarted
  37. // jt, some tasks might join in later
  38. if (tip != null || hasRestarted()) {
  39. if (tip == null) {
  40. tip = job.getTaskInProgress(taskId.getTaskID());
  41. job.addRunningTaskToTIP(tip, taskId, status, false);
  42. }
  43. // Update the job and inform the listeners if necessary
  44. JobStatus prevStatus = (JobStatus)job.getStatus().clone();
  45. // Clone TaskStatus object here, because JobInProgress
  46. // or TaskInProgress can modify this object and
  47. // the changes should not get reflected in TaskTrackerStatus.
  48. // An old TaskTrackerStatus is used later in countMapTasks, etc.
  49. job.updateTaskStatus(tip, (TaskStatus)report.clone());
  50. JobStatus newStatus = (JobStatus)job.getStatus().clone();
  51. // Update the listeners if an incomplete job completes
  52. if (prevStatus.getRunState() != newStatus.getRunState()) {
  53. JobStatusChangeEvent event = new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus, newStatus);
  54. updateJobInProgressListeners(event);
  55. }
  56. } else {
  57. LOG.info("Serious problem. While updating status, cannot find taskid " + report.getTaskID());
  58. }
  59. // Process 'failed fetch' notifications
  60. List<TaskAttemptID> failedFetchMaps = report.getFetchFailedMaps();
  61. if (failedFetchMaps != null) {
  62. for (TaskAttemptID mapTaskId : failedFetchMaps) {
  63. TaskInProgress failedFetchMap = taskidToTIPMap.get(mapTaskId);
  64. if (failedFetchMap != null) {
  65. // Gather information about the map which has to be failed, if need be
  66. String failedFetchTrackerName = getAssignedTracker(mapTaskId);
  67. if (failedFetchTrackerName == null) {
  68. failedFetchTrackerName = "Lost task tracker";
  69. }
  70. failedFetchMap.getJob().fetchFailureNotification(failedFetchMap, mapTaskId, failedFetchTrackerName);
  71. }
  72. }
  73. }
  74. }
  75. }
  76. 从上面的源代码可以看出,JobTracker节点对于每一个Task状态报告的处理考虑到了一些意外情况,这些意外情况主要包括:
  77. 1).Task实例所属的作业不在JobTracker节点的任务队列内。出现这个情况的可能原因是这个Task实例是一个拖后退的任务,当它完成时,该Task实例所属的Job早已被完成了,或者该Job被用户kill掉了。对于这种情况,JobTracker节点就直接让该TaskTracker把运行在其上的所有属于该JobTask全部清除(这个TaskTracker节点上可能还运行有该Job的拖后腿任务)。
  78. 2).Task实例所属的作业还没有初始化(以前说过,没有被初始化的Job是不能被JobTracker节点调度执行的)。出现这种情况的一个可能的原因是TaskTracker节点刚刚重启过,在恢复作业的时候还没有来得及初始化该Job。对于这种情况,JobTracker节点就直接让该TaskTracker清除该任务。

3).Task实例所属的任务还没有被JobTracker节点调度执行。出现这种情况的一个可能的原因是JobTracker节点在该Task实例执行期间重启了,重启之后还没有来得及调度该任务。对于这种情况,JobTracker节点为了提高性能就认为该情况是还算是正常的,不过得先得通知该Task实例所属的Job实例。

  1. 4).对于Reduce Task实例的进度状态报告,它还会附带额外的报告信息,就是报告那些它无法fetchmap输出的Map任务,JobTracker节点需要将这些消息转告给该Task实例所属的作业实例,以便该Job实例能够及时调整其内部任务的调度。至于Job实例是如何处理的,并不是本文所要介绍的重点,不过我会在以后的博文中详细阐述。
  2. 在正常的情况下,JobTracker节点不会直接的处理这些Task实例的状态报告,而是将其交给它们所属的Job实例来处理,Job实例处理之后如果它的状态发生变化,JobTracker节点会通知对应的事件监听器。那么,Job实例又是如何根据它的一个Task实例状态来更新自己当前的状态呢?由于这个过程相当的复杂不适合于此时介绍。

发表评论

表情:
评论列表 (有 0 条评论,204人围观)

还没有评论,来说两句吧...

相关阅读