JobTracker节点对Task实例状态报告的处理
前面谈到过,每一个TaskTracker节点都要在它向JobTracker节点发送心跳包的时候顺带报告运行在其上的Task的状态信息,这些Task是指正在TaskTracker节点上运行的,或者从上一次报告到现在的时间间隔中完成的或者是失败的任务。JobTracker节点之所以需要收集这些正在执行或者刚完成的任务的状态信息,是因为它要及时掌握各个作业的执行进度,一方面将此报告给用户,一方面还调整作业的任务调度,如任务的意外处理,任务/作业的善后处理等。这里说的任务的意外处理包括,对于失败的任务或者是拖后退的任务需要从新安排其它的TaskTracker节点来执行等,详细情况我会在后面再作介绍。
对于JobTracker节点处理TaskTracker发送过来的Task状态报告的详细过程,我将结合源代码来阐述。但在此之前,我想先介绍一下相关的几个概念:作业/作业实例、任务/任务实例。作业指的是用户提交的作业,用Job的实例对象来表示,作业实例指的是Job运行的状态信息,用JobInProgress的实例对象来表示;任务指的是该任务的执行进度状态信息,用TaskInProgress的实例对象来表示,任务实例则指的是在将该任务交由某一个TaskTacker节点执行的任务,用TaskAttempID的实例对象来表示。一个作业对应一个作业实例,而一个任务可能对应多个任务实例,这是因为一个任务可能交给了多个TaskTracker节点来执行。JobTracker节点处理Task实例状态报告的源码如下:
void updateTaskStatuses(TaskTrackerStatus status) {
String trackerName = status.getTrackerName();
for (TaskStatus report : status.getTaskReports()) {
report.setTaskTracker(trackerName);
TaskAttemptID taskId = report.getTaskID();
// expire it
expireLaunchingTasks.removeTask(taskId);
JobInProgress job = getJob(taskId.getJobID());
if (job == null) {
// if job is not there in the cleanup list ... add it
synchronized (trackerToJobsToCleanup) {
Set<JobID> jobs = trackerToJobsToCleanup.get(trackerName);
if (jobs == null) {
jobs = new HashSet<JobID>();
trackerToJobsToCleanup.put(trackerName, jobs);
}
jobs.add(taskId.getJobID());
}
continue;
}
if (!job.inited()) {
// if job is not yet initialized ... kill the attempt
synchronized (trackerToTasksToCleanup) {
Set<TaskAttemptID> tasks = trackerToTasksToCleanup.get(trackerName);
if (tasks == null) {
tasks = new HashSet<TaskAttemptID>();
trackerToTasksToCleanup.put(trackerName, tasks);
}
tasks.add(taskId);
}
continue;
}
TaskInProgress tip = taskidToTIPMap.get(taskId);
// Check if the tip is known to the jobtracker. In case of a restarted
// jt, some tasks might join in later
if (tip != null || hasRestarted()) {
if (tip == null) {
tip = job.getTaskInProgress(taskId.getTaskID());
job.addRunningTaskToTIP(tip, taskId, status, false);
}
// Update the job and inform the listeners if necessary
JobStatus prevStatus = (JobStatus)job.getStatus().clone();
// Clone TaskStatus object here, because JobInProgress
// or TaskInProgress can modify this object and
// the changes should not get reflected in TaskTrackerStatus.
// An old TaskTrackerStatus is used later in countMapTasks, etc.
job.updateTaskStatus(tip, (TaskStatus)report.clone());
JobStatus newStatus = (JobStatus)job.getStatus().clone();
// Update the listeners if an incomplete job completes
if (prevStatus.getRunState() != newStatus.getRunState()) {
JobStatusChangeEvent event = new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus, newStatus);
updateJobInProgressListeners(event);
}
} else {
LOG.info("Serious problem. While updating status, cannot find taskid " + report.getTaskID());
}
// Process 'failed fetch' notifications
List<TaskAttemptID> failedFetchMaps = report.getFetchFailedMaps();
if (failedFetchMaps != null) {
for (TaskAttemptID mapTaskId : failedFetchMaps) {
TaskInProgress failedFetchMap = taskidToTIPMap.get(mapTaskId);
if (failedFetchMap != null) {
// Gather information about the map which has to be failed, if need be
String failedFetchTrackerName = getAssignedTracker(mapTaskId);
if (failedFetchTrackerName == null) {
failedFetchTrackerName = "Lost task tracker";
}
failedFetchMap.getJob().fetchFailureNotification(failedFetchMap, mapTaskId, failedFetchTrackerName);
}
}
}
}
}
从上面的源代码可以看出,JobTracker节点对于每一个Task状态报告的处理考虑到了一些意外情况,这些意外情况主要包括:
1).Task实例所属的作业不在JobTracker节点的任务队列内。出现这个情况的可能原因是这个Task实例是一个拖后退的任务,当它完成时,该Task实例所属的Job早已被完成了,或者该Job被用户kill掉了。对于这种情况,JobTracker节点就直接让该TaskTracker把运行在其上的所有属于该Job的Task全部清除(这个TaskTracker节点上可能还运行有该Job的拖后腿任务)。
2).Task实例所属的作业还没有初始化(以前说过,没有被初始化的Job是不能被JobTracker节点调度执行的)。出现这种情况的一个可能的原因是TaskTracker节点刚刚重启过,在恢复作业的时候还没有来得及初始化该Job。对于这种情况,JobTracker节点就直接让该TaskTracker清除该任务。
3).Task实例所属的任务还没有被JobTracker节点调度执行。出现这种情况的一个可能的原因是JobTracker节点在该Task实例执行期间重启了,重启之后还没有来得及调度该任务。对于这种情况,JobTracker节点为了提高性能就认为该情况是还算是正常的,不过得先得通知该Task实例所属的Job实例。
4).对于Reduce Task实例的进度状态报告,它还会附带额外的报告信息,就是报告那些它无法fetch到map输出的Map任务,JobTracker节点需要将这些消息转告给该Task实例所属的作业实例,以便该Job实例能够及时调整其内部任务的调度。至于Job实例是如何处理的,并不是本文所要介绍的重点,不过我会在以后的博文中详细阐述。
在正常的情况下,JobTracker节点不会直接的处理这些Task实例的状态报告,而是将其交给它们所属的Job实例来处理,Job实例处理之后如果它的状态发生变化,JobTracker节点会通知对应的事件监听器。那么,Job实例又是如何根据它的一个Task实例状态来更新自己当前的状态呢?由于这个过程相当的复杂不适合于此时介绍。
还没有评论,来说两句吧...