How JobControl Is Implemented
Published: 2019-06-13


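A motivating example: Bayesian classification

Bayesian classification is a statistical method that classifies items using probability theory. It consists of two steps: training on samples and classification.

It can be implemented with several MapReduce jobs, as shown in the figure. Training takes three MapReduce jobs:

The first job (ExtractJob) extracts document features. It needs only a Map phase.

The second job (ClassPriorJob) computes the prior probability of each class: it counts the documents in each class and derives the class probability.

The third job (ConditionalProbilityJob) computes the conditional probability of each word: it counts how often each <label, word> pair occurs across all documents and derives the word's conditional probability.

The last two jobs are implemented much like WordCount. Classification is done by a single job (PredictJob). Its map() function computes, for each document to be classified, the probability that it belongs to each class. Its reduce() function picks the class with the highest probability for each document and emits <docid, label> (the document with ID docid belongs to class label).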
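To make the classification step concrete, here is a minimal single-process sketch of what PredictJob's map() and reduce() compute between them. It is not the MapReduce implementation: the class name NaiveBayesSketch, the toy probabilities, and the unseenProb fallback for words missing from a class are all assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class NaiveBayesSketch {
    /**
     * Returns the label with the highest log-probability for the given words.
     * unseenProb is a hypothetical fallback for words a class has never seen.
     */
    static String predict(Map<String, Double> prior,
                          Map<String, Map<String, Double>> conditional,
                          List<String> words,
                          double unseenProb) {
        String bestLabel = null;
        double bestScore = Double.NEGATIVE_INFINITY;
        for (Map.Entry<String, Double> entry : prior.entrySet()) {
            String label = entry.getKey();
            // "map" side: score of the document for one class, in log space
            double score = Math.log(entry.getValue());
            Map<String, Double> wordProbs = conditional.get(label);
            for (String word : words) {
                score += Math.log(wordProbs.getOrDefault(word, unseenProb));
            }
            // "reduce" side: keep the class with the highest score
            if (score > bestScore) {
                bestScore = score;
                bestLabel = label;
            }
        }
        return bestLabel;
    }

    /** Classifies one toy document against a made-up two-class model. */
    static String demo() {
        Map<String, Double> prior = new LinkedHashMap<>();
        prior.put("sports", 0.5);
        prior.put("tech", 0.5);

        Map<String, Double> sportsWords = new LinkedHashMap<>();
        sportsWords.put("goal", 0.4);
        sportsWords.put("code", 0.05);
        Map<String, Double> techWords = new LinkedHashMap<>();
        techWords.put("goal", 0.05);
        techWords.put("code", 0.4);

        Map<String, Map<String, Double>> conditional = new LinkedHashMap<>();
        conditional.put("sports", sportsWords);
        conditional.put("tech", techWords);

        return predict(prior, conditional, Arrays.asList("goal", "goal", "code"), 1e-6);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Working in log space avoids underflow when many small probabilities are multiplied. With the toy numbers above, the document is assigned to "sports".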


A complete Bayesian classification algorithm may therefore need four interdependent MapReduce jobs. The traditional approach is to create a JobConf object for each job and submit the jobs one after another (serially) in dependency order, as follows:

    // Create a JobConf object for each of the four jobs
    JobConf extractJobConf = new JobConf(ExtractJob.class);
    JobConf classPriorJobConf = new JobConf(ClassPriorJob.class);
    JobConf conditionalProbilityJobConf = new JobConf(ConditionalProbilityJob.class);
    JobConf predictJobConf = new JobConf(PredictJob.class);
    ... // configure each JobConf
    // Submit the jobs one by one, in dependency order
    JobClient.runJob(extractJobConf);
    JobClient.runJob(classPriorJobConf);
    JobClient.runJob(conditionalProbilityJobConf);
    JobClient.runJob(predictJobConf);

With JobControl, the user only has to declare the dependencies between jobs through addDependingJob(); JobControl then schedules the jobs in dependency order. The code looks like this:

    JobConf extractJobConf = new JobConf(ExtractJob.class);
    JobConf classPriorJobConf = new JobConf(ClassPriorJob.class);
    JobConf conditionalProbilityJobConf = new JobConf(ConditionalProbilityJob.class);
    JobConf predictJobConf = new JobConf(PredictJob.class);
    ... // configure each JobConf
    // Create the Job objects. Note that JobControl requires every job to be wrapped in a Job object
    Job extractJob = new Job(extractJobConf);
    Job classPriorJob = new Job(classPriorJobConf);
    Job conditionalProbilityJob = new Job(conditionalProbilityJobConf);
    Job predictJob = new Job(predictJobConf);
    // Declare the dependencies to build a DAG of jobs
    classPriorJob.addDependingJob(extractJob);
    conditionalProbilityJob.addDependingJob(extractJob);
    predictJob.addDependingJob(classPriorJob);
    predictJob.addDependingJob(conditionalProbilityJob);
    // Create the JobControl object, which monitors and schedules the jobs
    JobControl JC = new JobControl("Naive Bayes");
    JC.addJob(extractJob); // add the four jobs to the JobControl
    JC.addJob(classPriorJob);
    JC.addJob(conditionalProbilityJob);
    JC.addJob(predictJob);
    JC.run(); // run the DAG; this call blocks until stop() is called from another thread

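At run time, extractJob, which depends on no other job, is scheduled first. Once it completes, classPriorJob and conditionalProbilityJob are scheduled at the same time, and once both of them have completed, predictJob is scheduled. Comparing the two approaches leads to a simple conclusion: writing a DAG of jobs with JobControl is more convenient, and it lets jobs that do not depend on each other run in parallel.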
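The scheduling order described above can be reproduced with a small sketch that groups jobs into "waves", where every job in a wave has all of its dependencies in earlier waves. This is not how JobControl is written (it polls job states instead); the class DagWaves and its method names are hypothetical, and only the four job names come from the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class DagWaves {
    /** Groups jobs into waves; jobs in the same wave could run in parallel. */
    static List<List<String>> waves(Map<String, List<String>> deps) {
        List<List<String>> result = new ArrayList<>();
        Set<String> done = new HashSet<>();
        while (done.size() < deps.size()) {
            List<String> wave = new ArrayList<>();
            for (Map.Entry<String, List<String>> e : deps.entrySet()) {
                // runnable now: not finished yet, and every dependency finished earlier
                if (!done.contains(e.getKey()) && done.containsAll(e.getValue())) {
                    wave.add(e.getKey());
                }
            }
            if (wave.isEmpty()) {
                throw new IllegalStateException("cycle or unknown dependency");
            }
            done.addAll(wave); // the whole wave finishes before the next one starts
            result.add(wave);
        }
        return result;
    }

    /** Dependencies of the Bayesian classification example: job -> jobs it depends on. */
    static Map<String, List<String>> bayesDeps() {
        Map<String, List<String>> deps = new LinkedHashMap<>();
        deps.put("extractJob", Collections.<String>emptyList());
        deps.put("classPriorJob", Arrays.asList("extractJob"));
        deps.put("conditionalProbilityJob", Arrays.asList("extractJob"));
        deps.put("predictJob", Arrays.asList("classPriorJob", "conditionalProbilityJob"));
        return deps;
    }

    public static void main(String[] args) {
        System.out.println(waves(bayesDeps()));
    }
}
```

For the four jobs of the example this yields three waves: extractJob alone, then classPriorJob together with conditionalProbilityJob, then predictJob.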

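Design of JobControl

JobControl consists of two classes: Job and JobControl. The Job class wraps one MapReduce job together with its dependencies. Its main responsibility is to monitor the states of the jobs it depends on and update its own state accordingly; its state transitions are shown in the figure. A job starts in the WAITING state. If it has no depending jobs, or all of them have completed successfully, it moves to READY. A READY job can be submitted to the Hadoop cluster, at which point it enters RUNNING. From RUNNING it moves to SUCCESS or FAILED depending on how the run goes. Note that if a job it depends on fails, the job fails as well (its state becomes DEPENDENT_FAILED). This produces a "domino effect": every downstream job fails too.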
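The transitions just described can be written down as a small table. The sketch below is an illustration only: Hadoop's Job class uses int constants rather than an enum, and the class JobStates and its canTransition() helper are hypothetical. The READY to FAILED edge reflects a submission that fails.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

class JobStates {
    enum State { WAITING, READY, RUNNING, SUCCESS, FAILED, DEPENDENT_FAILED }

    // For each state, the states a job may move to next
    private static final Map<State, Set<State>> NEXT = new EnumMap<>(State.class);
    static {
        NEXT.put(State.WAITING, EnumSet.of(State.READY, State.DEPENDENT_FAILED));
        NEXT.put(State.READY, EnumSet.of(State.RUNNING, State.FAILED));
        NEXT.put(State.RUNNING, EnumSet.of(State.SUCCESS, State.FAILED));
        // terminal states: no outgoing transitions
        NEXT.put(State.SUCCESS, EnumSet.noneOf(State.class));
        NEXT.put(State.FAILED, EnumSet.noneOf(State.class));
        NEXT.put(State.DEPENDENT_FAILED, EnumSet.noneOf(State.class));
    }

    static boolean canTransition(State from, State to) {
        return NEXT.get(from).contains(to);
    }

    public static void main(String[] args) {
        System.out.println(canTransition(State.WAITING, State.READY));
        System.out.println(canTransition(State.WAITING, State.RUNNING));
    }
}
```

A job can never jump from WAITING straight to RUNNING: it has to become READY first, which is what lets the controller decide when to submit it.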


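The JobControl class wraps a set of MapReduce jobs and their dependencies. It keeps jobs in different hash tables according to their state and moves them between tables following the state transitions shown in the figure, until all jobs have finished. Internally, JobControl has a thread that periodically monitors and updates the state of every job, schedules jobs whose dependencies have completed, and submits jobs in the READY state. It also provides APIs to suspend, resume, and stop that thread.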

A closer look at the Job class

The Job class begins by declaring its fields: the state the job is in and other related information. The code is as follows:

    import java.util.ArrayList;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.JobID;

    // A job will be in one of the following states
    final public static int SUCCESS = 0;           // succeeded
    final public static int WAITING = 1;           // waiting
    final public static int RUNNING = 2;           // running
    final public static int READY = 3;             // ready
    final public static int FAILED = 4;            // failed
    final public static int DEPENDENT_FAILED = 5;  // a depending job failed

    private JobConf theJobConf;
    private int state;
    private String jobID;       // assigned and used by the JobControl class
    private JobID mapredJobID;  // the job ID assigned by map/reduce
    private String jobName;     // external name, assigned/used by the client app
    private String message;     // some info for human consumption,
                                // e.g. the reason why the job failed
    private ArrayList<Job> dependingJobs;  // the jobs the current job depends on
    private JobClient jc = null;           // the map reduce job client

Two constructors follow:

    /**
     * Construct a job.
     * @param jobConf a mapred job configuration representing a job to be executed.
     * @param dependingJobs an array of jobs the current job depends on
     */
    public Job(JobConf jobConf, ArrayList<Job> dependingJobs) throws IOException {
      this.theJobConf = jobConf;
      this.dependingJobs = dependingJobs;
      this.state = Job.WAITING;
      this.jobID = "unassigned";
      this.mapredJobID = null; //not yet assigned
      this.jobName = "unassigned";
      this.message = "just initialized";
      this.jc = new JobClient(jobConf);
    }

    /**
     * Construct a job.
     *
     * @param jobConf mapred job configuration representing a job to be executed.
     * @throws IOException
     */
    public Job(JobConf jobConf) throws IOException {
      this(jobConf, null);
    }

Next, the class overrides the toString() method inherited from Object:

    @Override
    public String toString() {
      StringBuffer sb = new StringBuffer();
      sb.append("job name:\t").append(this.jobName).append("\n");
      sb.append("job id:\t").append(this.jobID).append("\n");
      sb.append("job state:\t").append(this.state).append("\n");
      sb.append("job mapred id:\t").append(this.mapredJobID==null ? "unassigned"
          : this.mapredJobID).append("\n");
      sb.append("job message:\t").append(this.message).append("\n");

      if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
        sb.append("job has no depending job:\t").append("\n");
      } else {
        sb.append("job has ").append(this.dependingJobs.size()).append(" dependeng jobs:\n");
        for (int i = 0; i < this.dependingJobs.size(); i++) {
          sb.append("\t depending job ").append(i).append(":\t");
          sb.append((this.dependingJobs.get(i)).getJobName()).append("\n");
        }
      }
      return sb.toString();
    }

Then comes a long series of getters and setters:

    /**
     * @return the job name of this job
     */
    public String getJobName() {
      return this.jobName;
    }

    /**
     * Set the job name for this job.
     * @param jobName the job name
     */
    public void setJobName(String jobName) {
      this.jobName = jobName;
    }

    /**
     * @return the job ID of this job assigned by JobControl
     */
    public String getJobID() {
      return this.jobID;
    }

    /**
     * Set the job ID for this job.
     * @param id the job ID
     */
    public void setJobID(String id) {
      this.jobID = id;
    }

    /**
     * @return the mapred ID of this job
     * @deprecated use {@link #getAssignedJobID()} instead
     */
    @Deprecated
    public String getMapredJobID() {
      return this.mapredJobID.toString();
    }

    /**
     * Set the mapred ID for this job.
     * @param mapredJobID the mapred job ID for this job.
     * @deprecated use {@link #setAssignedJobID(JobID)} instead
     */
    @Deprecated
    public void setMapredJobID(String mapredJobID) {
      this.mapredJobID = JobID.forName(mapredJobID);
    }

    /**
     * @return the mapred ID of this job as assigned by the
     * mapred framework.
     */
    public JobID getAssignedJobID() {
      return this.mapredJobID;
    }

    /**
     * Set the mapred ID for this job as assigned by the
     * mapred framework.
     * @param mapredJobID the mapred job ID for this job.
     */
    public void setAssignedJobID(JobID mapredJobID) {
      this.mapredJobID = mapredJobID;
    }

    /**
     * @return the mapred job conf of this job
     */
    public JobConf getJobConf() {
      return this.theJobConf;
    }

    /**
     * Set the mapred job conf for this job.
     * @param jobConf the mapred job conf for this job.
     */
    public void setJobConf(JobConf jobConf) {
      this.theJobConf = jobConf;
    }

    /**
     * @return the state of this job
     */
    public synchronized int getState() {
      return this.state;
    }

    /**
     * Set the state for this job.
     * @param state the new state for this job.
     */
    protected synchronized void setState(int state) {
      this.state = state;
    }

    /**
     * @return the message of this job
     */
    public String getMessage() {
      return this.message;
    }

    /**
     * Set the message for this job.
     * @param message the message for this job.
     */
    public void setMessage(String message) {
      this.message = message;
    }

    /**
     * @return the job client of this job
     */
    public JobClient getJobClient(){
      return this.jc;
    }

    /**
     * @return the depending jobs of this job
     */
    public ArrayList<Job> getDependingJobs() {
      return this.dependingJobs;
    }

While a Job is in the WAITING state, jobs it depends on can be added to its dependency list:

    /**
     * Add a job to this jobs' dependency list. Dependent jobs can only be added while a Job
     * is waiting to run, not during or afterwards.
     *
     * @param dependingJob Job that this Job depends on.
     * @return true if the Job was added.
     */
    public synchronized boolean addDependingJob(Job dependingJob) {
      if (this.state == Job.WAITING) { //only allowed to add jobs when waiting
        if (this.dependingJobs == null) {
          this.dependingJobs = new ArrayList<Job>();
        }
        return this.dependingJobs.add(dependingJob);
      } else {
        return false;
      }
    }

The class also provides methods to test whether the job has completed and whether it is ready:

    /**
     * @return true if this job is in a complete state
     */
    public boolean isCompleted() {
      return this.state == Job.FAILED ||
        this.state == Job.DEPENDENT_FAILED ||
        this.state == Job.SUCCESS;
    }

    /**
     * @return true if this job is in READY state
     */
    public boolean isReady() {
      return this.state == Job.READY;
    }

checkRunningState() checks the state of a running job and, if the job has completed, records whether it succeeded or failed:

    /**
     * Check the state of this running job. The state may
     * remain the same, become SUCCESS or FAILED.
     */
    private void checkRunningState() {
      RunningJob running = null;
      try {
        running = jc.getJob(this.mapredJobID);
        if (running.isComplete()) {
          if (running.isSuccessful()) {
            this.state = Job.SUCCESS;
          } else {
            this.state = Job.FAILED;
            this.message = "Job failed!";
            try {
              running.killJob();
            } catch (IOException e1) {
            }
            try {
              this.jc.close();
            } catch (IOException e2) {
            }
          }
        }
      } catch (IOException ioe) {
        this.state = Job.FAILED;
        this.message = StringUtils.stringifyException(ioe);
        try {
          if (running != null)
            running.killJob();
        } catch (IOException e1) {
        }
        try {
          this.jc.close();
        } catch (IOException e1) {
        }
      }
    }

The checkState() method checks and updates the state of the Job:

    /**
     * Check and update the state of this job. The state changes
     * depending on its current state and the states of the depending jobs.
     */
    synchronized int checkState() {
      if (this.state == Job.RUNNING) {
        checkRunningState();
      }
      if (this.state != Job.WAITING) {
        return this.state;
      }
      if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
        this.state = Job.READY;
        return this.state;
      }
      Job pred = null;
      int n = this.dependingJobs.size();
      for (int i = 0; i < n; i++) {
        pred = this.dependingJobs.get(i);
        int s = pred.checkState();
        if (s == Job.WAITING || s == Job.READY || s == Job.RUNNING) {
          break; // a pred is still not completed, continue in WAITING
          // state
        }
        if (s == Job.FAILED || s == Job.DEPENDENT_FAILED) {
          this.state = Job.DEPENDENT_FAILED;
          this.message = "depending job " + i + " with jobID "
            + pred.getJobID() + " failed. " + pred.getMessage();
          break;
        }
        // pred must be in success state
        if (i == n - 1) {
          this.state = Job.READY;
        }
      }
      return this.state;
    }
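The "domino effect" follows directly from this recursion: a failed predecessor turns every job that checks it into DEPENDENT_FAILED, and jobs further downstream see that state in turn. The sketch below mirrors the WAITING branch of the logic without any Hadoop dependency. The class DominoDemo, its SimJob stand-in, and demo() are hypothetical names, and the outcome of extractJob is set by hand instead of coming from a cluster.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class DominoDemo {
    enum State { WAITING, READY, RUNNING, SUCCESS, FAILED, DEPENDENT_FAILED }

    /** Hypothetical stand-in for Job: a name, a state, and the jobs it depends on. */
    static class SimJob {
        final String name;
        final List<SimJob> deps = new ArrayList<>();
        State state = State.WAITING;

        SimJob(String name, SimJob... deps) {
            this.name = name;
            this.deps.addAll(Arrays.asList(deps));
        }

        /** Same decision order as the WAITING branch of the real checkState(). */
        State checkState() {
            if (state != State.WAITING) {
                return state;
            }
            if (deps.isEmpty()) {
                state = State.READY;
                return state;
            }
            for (int i = 0; i < deps.size(); i++) {
                State s = deps.get(i).checkState();
                if (s == State.WAITING || s == State.READY || s == State.RUNNING) {
                    break; // a predecessor has not completed: stay in WAITING
                }
                if (s == State.FAILED || s == State.DEPENDENT_FAILED) {
                    state = State.DEPENDENT_FAILED; // the failure propagates
                    break;
                }
                if (i == deps.size() - 1) {
                    state = State.READY; // every predecessor succeeded
                }
            }
            return state;
        }
    }

    /** Builds the Bayes DAG, forces extractJob's outcome, and reports the other three states. */
    static String demo(State extractOutcome) {
        SimJob extract = new SimJob("extractJob");
        SimJob classPrior = new SimJob("classPriorJob", extract);
        SimJob conditional = new SimJob("conditionalProbilityJob", extract);
        SimJob predict = new SimJob("predictJob", classPrior, conditional);
        extract.state = extractOutcome;
        return classPrior.checkState() + " " + conditional.checkState() + " " + predict.checkState();
    }

    public static void main(String[] args) {
        System.out.println(demo(State.FAILED));
        System.out.println(demo(State.SUCCESS));
    }
}
```

When extractJob fails, all three downstream jobs end up DEPENDENT_FAILED without ever being submitted. When it succeeds, the two middle jobs become READY while predictJob stays WAITING until they have actually run.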

Finally, submit() submits the Job:

    /**
     * Submit this job to mapred. The state becomes RUNNING if submission
     * is successful, FAILED otherwise.
     */
    protected synchronized void submit() {
      try {
        if (theJobConf.getBoolean("create.empty.dir.if.nonexist", false)) {
          FileSystem fs = FileSystem.get(theJobConf);
          Path inputPaths[] = FileInputFormat.getInputPaths(theJobConf);
          for (int i = 0; i < inputPaths.length; i++) {
            if (!fs.exists(inputPaths[i])) {
              try {
                fs.mkdirs(inputPaths[i]);
              } catch (IOException e) {
              }
            }
          }
        }
        RunningJob running = jc.submitJob(theJobConf);
        this.mapredJobID = running.getID();
        this.state = Job.RUNNING;
      } catch (IOException ioe) {
        this.state = Job.FAILED;
        this.message = StringUtils.stringifyException(ioe);
      }
    }

    } // end of class Job

The complete source code of the Job class:

    package org.apache.hadoop.mapred.jobcontrol;

    import java.io.IOException;
    import java.util.ArrayList;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapred.FileInputFormat;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.JobID;
    import org.apache.hadoop.mapred.RunningJob;
    import org.apache.hadoop.util.StringUtils;

    /** This class encapsulates a MapReduce job and its dependency. It monitors
     *  the states of the depending jobs and updates the state of this job.
     *  A job starts in the WAITING state. If it does not have any depending jobs, or
     *  all of the depending jobs are in SUCCESS state, then the job state will become
     *  READY. If any depending jobs fail, the job will fail too.
     *  When in READY state, the job can be submitted to Hadoop for execution, with
     *  the state changing into RUNNING state. From RUNNING state, the job can get into
     *  SUCCESS or FAILED state, depending the status of the job execution.
     *
     */
    public class Job {

      // A job will be in one of the following states
      final public static int SUCCESS = 0;
      final public static int WAITING = 1;
      final public static int RUNNING = 2;
      final public static int READY = 3;
      final public static int FAILED = 4;
      final public static int DEPENDENT_FAILED = 5;

      private JobConf theJobConf;
      private int state;
      private String jobID;       // assigned and used by JobControl class
      private JobID mapredJobID;  // the job ID assigned by map/reduce
      private String jobName;     // external name, assigned/used by client app
      private String message;     // some info for human consumption,
                                  // e.g. the reason why the job failed
      private ArrayList<Job> dependingJobs;  // the jobs the current job depends on

      private JobClient jc = null;  // the map reduce job client

      /**
       * Construct a job.
       * @param jobConf a mapred job configuration representing a job to be executed.
       * @param dependingJobs an array of jobs the current job depends on
       */
      public Job(JobConf jobConf, ArrayList<Job> dependingJobs) throws IOException {
        this.theJobConf = jobConf;
        this.dependingJobs = dependingJobs;
        this.state = Job.WAITING;
        this.jobID = "unassigned";
        this.mapredJobID = null; //not yet assigned
        this.jobName = "unassigned";
        this.message = "just initialized";
        this.jc = new JobClient(jobConf);
      }

      /**
       * Construct a job.
       *
       * @param jobConf mapred job configuration representing a job to be executed.
       * @throws IOException
       */
      public Job(JobConf jobConf) throws IOException {
        this(jobConf, null);
      }

      @Override
      public String toString() {
        StringBuffer sb = new StringBuffer();
        sb.append("job name:\t").append(this.jobName).append("\n");
        sb.append("job id:\t").append(this.jobID).append("\n");
        sb.append("job state:\t").append(this.state).append("\n");
        sb.append("job mapred id:\t").append(this.mapredJobID==null ? "unassigned"
            : this.mapredJobID).append("\n");
        sb.append("job message:\t").append(this.message).append("\n");

        if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
          sb.append("job has no depending job:\t").append("\n");
        } else {
          sb.append("job has ").append(this.dependingJobs.size()).append(" dependeng jobs:\n");
          for (int i = 0; i < this.dependingJobs.size(); i++) {
            sb.append("\t depending job ").append(i).append(":\t");
            sb.append((this.dependingJobs.get(i)).getJobName()).append("\n");
          }
        }
        return sb.toString();
      }

      /**
       * @return the job name of this job
       */
      public String getJobName() {
        return this.jobName;
      }

      /**
       * Set the job name for this job.
       * @param jobName the job name
       */
      public void setJobName(String jobName) {
        this.jobName = jobName;
      }

      /**
       * @return the job ID of this job assigned by JobControl
       */
      public String getJobID() {
        return this.jobID;
      }

      /**
       * Set the job ID for this job.
       * @param id the job ID
       */
      public void setJobID(String id) {
        this.jobID = id;
      }

      /**
       * @return the mapred ID of this job
       * @deprecated use {@link #getAssignedJobID()} instead
       */
      @Deprecated
      public String getMapredJobID() {
        return this.mapredJobID.toString();
      }

      /**
       * Set the mapred ID for this job.
       * @param mapredJobID the mapred job ID for this job.
       * @deprecated use {@link #setAssignedJobID(JobID)} instead
       */
      @Deprecated
      public void setMapredJobID(String mapredJobID) {
        this.mapredJobID = JobID.forName(mapredJobID);
      }

      /**
       * @return the mapred ID of this job as assigned by the
       * mapred framework.
       */
      public JobID getAssignedJobID() {
        return this.mapredJobID;
      }

      /**
       * Set the mapred ID for this job as assigned by the
       * mapred framework.
       * @param mapredJobID the mapred job ID for this job.
       */
      public void setAssignedJobID(JobID mapredJobID) {
        this.mapredJobID = mapredJobID;
      }

      /**
       * @return the mapred job conf of this job
       */
      public JobConf getJobConf() {
        return this.theJobConf;
      }

      /**
       * Set the mapred job conf for this job.
       * @param jobConf the mapred job conf for this job.
       */
      public void setJobConf(JobConf jobConf) {
        this.theJobConf = jobConf;
      }

      /**
       * @return the state of this job
       */
      public synchronized int getState() {
        return this.state;
      }

      /**
       * Set the state for this job.
       * @param state the new state for this job.
       */
      protected synchronized void setState(int state) {
        this.state = state;
      }

      /**
       * @return the message of this job
       */
      public String getMessage() {
        return this.message;
      }

      /**
       * Set the message for this job.
       * @param message the message for this job.
       */
      public void setMessage(String message) {
        this.message = message;
      }

      /**
       * @return the job client of this job
       */
      public JobClient getJobClient(){
        return this.jc;
      }

      /**
       * @return the depending jobs of this job
       */
      public ArrayList<Job> getDependingJobs() {
        return this.dependingJobs;
      }

      /**
       * Add a job to this jobs' dependency list. Dependent jobs can only be added while a Job
       * is waiting to run, not during or afterwards.
       *
       * @param dependingJob Job that this Job depends on.
       * @return true if the Job was added.
       */
      public synchronized boolean addDependingJob(Job dependingJob) {
        if (this.state == Job.WAITING) { //only allowed to add jobs when waiting
          if (this.dependingJobs == null) {
            this.dependingJobs = new ArrayList<Job>();
          }
          return this.dependingJobs.add(dependingJob);
        } else {
          return false;
        }
      }

      /**
       * @return true if this job is in a complete state
       */
      public boolean isCompleted() {
        return this.state == Job.FAILED ||
          this.state == Job.DEPENDENT_FAILED ||
          this.state == Job.SUCCESS;
      }

      /**
       * @return true if this job is in READY state
       */
      public boolean isReady() {
        return this.state == Job.READY;
      }

      /**
       * Check the state of this running job. The state may
       * remain the same, become SUCCESS or FAILED.
       */
      private void checkRunningState() {
        RunningJob running = null;
        try {
          running = jc.getJob(this.mapredJobID);
          if (running.isComplete()) {
            if (running.isSuccessful()) {
              this.state = Job.SUCCESS;
            } else {
              this.state = Job.FAILED;
              this.message = "Job failed!";
              try {
                running.killJob();
              } catch (IOException e1) {
              }
              try {
                this.jc.close();
              } catch (IOException e2) {
              }
            }
          }
        } catch (IOException ioe) {
          this.state = Job.FAILED;
          this.message = StringUtils.stringifyException(ioe);
          try {
            if (running != null)
              running.killJob();
          } catch (IOException e1) {
          }
          try {
            this.jc.close();
          } catch (IOException e1) {
          }
        }
      }

      /**
       * Check and update the state of this job. The state changes
       * depending on its current state and the states of the depending jobs.
       */
      synchronized int checkState() {
        if (this.state == Job.RUNNING) {
          checkRunningState();
        }
        if (this.state != Job.WAITING) {
          return this.state;
        }
        if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
          this.state = Job.READY;
          return this.state;
        }
        Job pred = null;
        int n = this.dependingJobs.size();
        for (int i = 0; i < n; i++) {
          pred = this.dependingJobs.get(i);
          int s = pred.checkState();
          if (s == Job.WAITING || s == Job.READY || s == Job.RUNNING) {
            break; // a pred is still not completed, continue in WAITING
            // state
          }
          if (s == Job.FAILED || s == Job.DEPENDENT_FAILED) {
            this.state = Job.DEPENDENT_FAILED;
            this.message = "depending job " + i + " with jobID "
              + pred.getJobID() + " failed. " + pred.getMessage();
            break;
          }
          // pred must be in success state
          if (i == n - 1) {
            this.state = Job.READY;
          }
        }
        return this.state;
      }

      /**
       * Submit this job to mapred. The state becomes RUNNING if submission
       * is successful, FAILED otherwise.
       */
      protected synchronized void submit() {
        try {
          if (theJobConf.getBoolean("create.empty.dir.if.nonexist", false)) {
            FileSystem fs = FileSystem.get(theJobConf);
            Path inputPaths[] = FileInputFormat.getInputPaths(theJobConf);
            for (int i = 0; i < inputPaths.length; i++) {
              if (!fs.exists(inputPaths[i])) {
                try {
                  fs.mkdirs(inputPaths[i]);
                } catch (IOException e) {
                }
              }
            }
          }
          RunningJob running = jc.submitJob(theJobConf);
          this.mapredJobID = running.getID();
          this.state = Job.RUNNING;
        } catch (IOException ioe) {
          this.state = Job.FAILED;
          this.message = StringUtils.stringifyException(ioe);
        }
      }

    }

A closer look at the JobControl class

The JobControl class begins by declaring its fields: the state of the thread and other related information. The code is as follows:

    // The thread can be in one of the following state
    private static final int RUNNING = 0;
    private static final int SUSPENDED = 1;
    private static final int STOPPED = 2;
    private static final int STOPPING = 3;
    private static final int READY = 4;

    private int runnerState;  // the thread state

    private Map<String, Job> waitingJobs;
    private Map<String, Job> readyJobs;
    private Map<String, Job> runningJobs;
    private Map<String, Job> successfulJobs;
    private Map<String, Job> failedJobs;

    private long nextJobID;
    private String groupName;

Then comes the constructor:

    /**
     * Construct a job control for a group of jobs.
     * @param groupName a name identifying this group
     */
    public JobControl(String groupName) {
      this.waitingJobs = new Hashtable<String, Job>();
      this.readyJobs = new Hashtable<String, Job>();
      this.runningJobs = new Hashtable<String, Job>();
      this.successfulJobs = new Hashtable<String, Job>();
      this.failedJobs = new Hashtable<String, Job>();
      this.nextJobID = -1;
      this.groupName = groupName;
      this.runnerState = JobControl.READY;
    }

Next is a helper, toArrayList(), that copies the jobs held in a Map into an ArrayList:

    private static ArrayList<Job> toArrayList(Map<String, Job> jobs) {
      ArrayList<Job> retv = new ArrayList<Job>();
      synchronized (jobs) {
        for (Job job : jobs.values()) {
          retv.add(job);
        }
      }
      return retv;
    }

The class also has a few getters, along with getNextJobID(), which generates the ID for the next job added to the group:

    /**
     * @return the jobs in the success state
     */
    public ArrayList<Job> getSuccessfulJobs() {
      return JobControl.toArrayList(this.successfulJobs);
    }

    public ArrayList<Job> getFailedJobs() {
      return JobControl.toArrayList(this.failedJobs);
    }

    private String getNextJobID() {
      nextJobID += 1;
      return this.groupName + this.nextJobID;
    }

There are also methods that put a Job into a job queue:

    private static void addToQueue(Job aJob, Map<String, Job> queue) {
      synchronized(queue) {
        queue.put(aJob.getJobID(), aJob);
      }
    }

    private void addToQueue(Job aJob) {
      Map<String, Job> queue = getQueue(aJob.getState());
      addToQueue(aJob, queue);
    }

To know which queue a job belongs in, getQueue() returns the queue that matches a given job state:

    private Map<String, Job> getQueue(int state) {
      Map<String, Job> retv = null;
      if (state == Job.WAITING) {
        retv = this.waitingJobs;
      } else if (state == Job.READY) {
        retv = this.readyJobs;
      } else if (state == Job.RUNNING) {
        retv = this.runningJobs;
      } else if (state == Job.SUCCESS) {
        retv = this.successfulJobs;
      } else if (state == Job.FAILED || state == Job.DEPENDENT_FAILED) {
        retv = this.failedJobs;
      }
      return retv;
    }

Methods for adding new jobs:

    /**
     * Add a new job.
     * @param aJob the new job
     */
    synchronized public String addJob(Job aJob) {
      String id = this.getNextJobID();
      aJob.setJobID(id);
      aJob.setState(Job.WAITING);
      this.addToQueue(aJob);
      return id;
    }

    /**
     * Add a collection of jobs
     *
     * @param jobs
     */
    public void addJobs(Collection<Job> jobs) {
      for (Job job : jobs) {
        addJob(job);
      }
    }

Methods for getting the thread state and for stopping, suspending, and resuming the thread:

    /**
     * @return the thread state
     */
    public int getState() {
      return this.runnerState;
    }

    /**
     * set the thread state to STOPPING so that the
     * thread will stop when it wakes up.
     */
    public void stop() {
      this.runnerState = JobControl.STOPPING;
    }

    /**
     * suspend the running thread
     */
    public void suspend () {
      if (this.runnerState == JobControl.RUNNING) {
        this.runnerState = JobControl.SUSPENDED;
      }
    }

    /**
     * resume the suspended thread
     */
    public void resume () {
      if (this.runnerState == JobControl.SUSPENDED) {
        this.runnerState = JobControl.RUNNING;
      }
    }

Methods that re-check the running and waiting jobs, submit the ready jobs, and put each job into the queue that matches its new state:

    synchronized private void checkRunningJobs() {
      Map<String, Job> oldJobs = null;
      oldJobs = this.runningJobs;
      this.runningJobs = new Hashtable<String, Job>();

      for (Job nextJob : oldJobs.values()) {
        int state = nextJob.checkState();
        /*
        if (state != Job.RUNNING) {
          System.out.println("The state of the running job " +
            nextJob.getJobName() + " has changed to: " + nextJob.getState());
        }
        */
        this.addToQueue(nextJob);
      }
    }

    synchronized private void checkWaitingJobs() {
      Map<String, Job> oldJobs = null;
      oldJobs = this.waitingJobs;
      this.waitingJobs = new Hashtable<String, Job>();

      for (Job nextJob : oldJobs.values()) {
        int state = nextJob.checkState();
        /*
        if (state != Job.WAITING) {
          System.out.println("The state of the waiting job " +
            nextJob.getJobName() + " has changed to: " + nextJob.getState());
        }
        */
        this.addToQueue(nextJob);
      }
    }

    synchronized private void startReadyJobs() {
      Map<String, Job> oldJobs = null;
      oldJobs = this.readyJobs;
      this.readyJobs = new Hashtable<String, Job>();

      for (Job nextJob : oldJobs.values()) {
        //System.out.println("Job to submit to Hadoop: " + nextJob.getJobName());
        nextJob.submit();
        //System.out.println("Hadoop ID: " + nextJob.getMapredJobID());
        this.addToQueue(nextJob);
      }
    }
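All three methods share one pattern: swap the table for an empty one, re-evaluate every job that was in it, and file each job under whatever state it has now. A compact sketch of that pattern, independent of Hadoop, might look as follows. The class RebucketSketch, its SimJob stand-in, and the checker function are assumptions for illustration; the real code keeps five separate Map fields rather than an EnumMap.

```java
import java.util.EnumMap;
import java.util.Hashtable;
import java.util.Map;
import java.util.function.Function;

class RebucketSketch {
    enum State { WAITING, READY, RUNNING, SUCCESS, FAILED }

    /** Hypothetical stand-in for Job: just an ID and a state. */
    static class SimJob {
        final String id;
        State state;

        SimJob(String id, State state) {
            this.id = id;
            this.state = state;
        }
    }

    // One table per state, keyed by job ID
    private final Map<State, Map<String, SimJob>> tables = new EnumMap<>(State.class);

    RebucketSketch() {
        for (State s : State.values()) {
            tables.put(s, new Hashtable<>());
        }
    }

    /** Files a job under the table that matches its current state. */
    synchronized void add(SimJob job) {
        tables.get(job.state).put(job.id, job);
    }

    /** Swaps out one table, re-evaluates each job in it, and re-files the job. */
    synchronized void recheck(State bucket, Function<SimJob, State> checker) {
        Map<String, SimJob> oldJobs = tables.put(bucket, new Hashtable<>());
        for (SimJob job : oldJobs.values()) {
            job.state = checker.apply(job);
            add(job);
        }
    }

    synchronized boolean contains(State s, String id) {
        return tables.get(s).containsKey(id);
    }

    synchronized int size(State s) {
        return tables.get(s).size();
    }

    /** Two running jobs; the checker reports that job "a" has finished. */
    static RebucketSketch demo() {
        RebucketSketch sketch = new RebucketSketch();
        sketch.add(new SimJob("a", State.RUNNING));
        sketch.add(new SimJob("b", State.RUNNING));
        sketch.recheck(State.RUNNING, job -> job.id.equals("a") ? State.SUCCESS : State.RUNNING);
        return sketch;
    }

    public static void main(String[] args) {
        RebucketSketch sketch = demo();
        System.out.println("running=" + sketch.size(State.RUNNING)
            + " success=" + sketch.size(State.SUCCESS));
    }
}
```

Swapping the table before iterating means a job whose state has not changed is simply put back, so no separate "remove" step is needed.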

A method that tells whether all jobs have finished:

    synchronized public boolean allFinished() {
      return this.waitingJobs.size() == 0 &&
        this.readyJobs.size() == 0 &&
        this.runningJobs.size() == 0;
    }

The run() method, which checks the states of running jobs, updates the states of waiting jobs, and submits the jobs in the READY state:

    /**
     *  The main loop for the thread.
     *  The loop does the following:
     *      Check the states of the running jobs
     *      Update the states of waiting jobs
     *      Submit the jobs in ready state
     */
    public void run() {
      this.runnerState = JobControl.RUNNING;
      while (true) {
        while (this.runnerState == JobControl.SUSPENDED) {
          try {
            Thread.sleep(5000);
          }
          catch (Exception e) {

          }
        }
        checkRunningJobs();
        checkWaitingJobs();
        startReadyJobs();
        if (this.runnerState != JobControl.RUNNING &&
            this.runnerState != JobControl.SUSPENDED) {
          break;
        }
        try {
          Thread.sleep(5000);
        }
        catch (Exception e) {

        }
        if (this.runnerState != JobControl.RUNNING &&
            this.runnerState != JobControl.SUSPENDED) {
          break;
        }
      }
      this.runnerState = JobControl.STOPPED;
    }

    } // end of class JobControl
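Because this loop exits only after stop() has been called, a client normally runs the controller in its own thread, polls allFinished(), and then calls stop(). The sketch below shows that usage pattern with a tiny stand-in controller so that it runs without a Hadoop cluster. MiniControl, poll(), finishedOrder(), and demo() are hypothetical names; "running" a job here just means recording its name, and the stop flag is declared volatile, which the listing above does not do.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class MiniControl implements Runnable {
    private final Map<String, List<String>> deps = new LinkedHashMap<>(); // job -> its dependencies
    private final Set<String> done = new HashSet<>();
    private final List<String> order = new ArrayList<>();
    private volatile boolean stopping = false; // written by the client, read by the runner

    synchronized void addJob(String name, String... dependsOn) {
        deps.put(name, Arrays.asList(dependsOn));
    }

    /** One polling pass: "runs" every job whose dependencies are all done. */
    private synchronized void poll() {
        for (Map.Entry<String, List<String>> e : deps.entrySet()) {
            if (!done.contains(e.getKey()) && done.containsAll(e.getValue())) {
                order.add(e.getKey());
                done.add(e.getKey());
            }
        }
    }

    synchronized boolean allFinished() {
        return done.size() == deps.size();
    }

    synchronized List<String> finishedOrder() {
        return new ArrayList<>(order);
    }

    void stop() {
        stopping = true;
    }

    @Override
    public void run() {
        while (!stopping) {
            poll();
            try {
                Thread.sleep(10); // polling interval, far shorter than the 5 s used above
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Client side: start the runner thread, wait for completion, then stop it. */
    static List<String> demo() {
        MiniControl control = new MiniControl();
        control.addJob("extractJob");
        control.addJob("classPriorJob", "extractJob");
        control.addJob("conditionalProbilityJob", "extractJob");
        control.addJob("predictJob", "classPriorJob", "conditionalProbilityJob");

        Thread runner = new Thread(control);
        runner.start();
        try {
            while (!control.allFinished()) {
                Thread.sleep(5);
            }
            control.stop();
            runner.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return control.finishedOrder();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

With the real classes the client side would have the same shape: start a Thread around the JobControl object, sleep while allFinished() returns false, then call stop().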

The complete JobControl class:

    /**
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */

    package org.apache.hadoop.mapred.jobcontrol;

    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.Hashtable;
    import java.util.Map;

    /** This class encapsulates a set of MapReduce jobs and its dependency. It tracks
     *  the states of the jobs by placing them into different tables according to their
     *  states.
     *
     *  This class provides APIs for the client app to add a job to the group and to get
     *  the jobs in the group in different states. When a
     *  job is added, an ID unique to the group is assigned to the job.
     *
     *  This class has a thread that submits jobs when they become ready, monitors the
     *  states of the running jobs, and updates the states of jobs based on the state changes
     *  of their depending jobs states. The class provides APIs for suspending/resuming
     *  the thread,and for stopping the thread.
     *
     */
    public class JobControl implements Runnable{

      // The thread can be in one of the following state
      private static final int RUNNING = 0;
      private static final int SUSPENDED = 1;
      private static final int STOPPED = 2;
      private static final int STOPPING = 3;
      private static final int READY = 4;

      private int runnerState;  // the thread state

      private Map<String, Job> waitingJobs;
      private Map<String, Job> readyJobs;
      private Map<String, Job> runningJobs;
      private Map<String, Job> successfulJobs;
      private Map<String, Job> failedJobs;

      private long nextJobID;
      private String groupName;

      /**
       * Construct a job control for a group of jobs.
       * @param groupName a name identifying this group
       */
      public JobControl(String groupName) {
        this.waitingJobs = new Hashtable<String, Job>();
        this.readyJobs = new Hashtable<String, Job>();
        this.runningJobs = new Hashtable<String, Job>();
        this.successfulJobs = new Hashtable<String, Job>();
        this.failedJobs = new Hashtable<String, Job>();
        this.nextJobID = -1;
        this.groupName = groupName;
        this.runnerState = JobControl.READY;
      }

      private static ArrayList<Job> toArrayList(Map<String, Job> jobs) {
        ArrayList<Job> retv = new ArrayList<Job>();
        synchronized (jobs) {
          for (Job job : jobs.values()) {
            retv.add(job);
          }
        }
        return retv;
      }

      /**
       * @return the jobs in the waiting state
       */
      public ArrayList<Job> getWaitingJobs() {
        return JobControl.toArrayList(this.waitingJobs);
      }

      /**
       * @return the jobs in the running state
       */
      public ArrayList<Job> getRunningJobs() {
        return JobControl.toArrayList(this.runningJobs);
      }

      /**
       * @return the jobs in the ready state
       */
      public ArrayList<Job> getReadyJobs() {
        return JobControl.toArrayList(this.readyJobs);
      }

      /**
       * @return the jobs in the success state
       */
      public ArrayList<Job> getSuccessfulJobs() {
        return JobControl.toArrayList(this.successfulJobs);
      }

      public ArrayList<Job> getFailedJobs() {
        return JobControl.toArrayList(this.failedJobs);
      }

      private String getNextJobID() {
        nextJobID += 1;
        return this.groupName + this.nextJobID;
      }

      private static void addToQueue(Job aJob, Map<String, Job> queue) {
        synchronized(queue) {
          queue.put(aJob.getJobID(), aJob);
        }
      }

      private void addToQueue(Job aJob) {
        Map<String, Job> queue = getQueue(aJob.getState());
        addToQueue(aJob, queue);
      }

      private Map<String, Job> getQueue(int state) {
        Map<String, Job> retv = null;
        if (state == Job.WAITING) {
          retv = this.waitingJobs;
        } else if (state == Job.READY) {
          retv = this.readyJobs;
        } else if (state == Job.RUNNING) {
          retv = this.runningJobs;
        } else if (state == Job.SUCCESS) {
          retv = this.successfulJobs;
        } else if (state == Job.FAILED || state == Job.DEPENDENT_FAILED) {
          retv = this.failedJobs;
        }
        return retv;
      }

      /**
       * Add a new job.
       * @param aJob the new job
       */
      synchronized public String addJob(Job aJob) {
        String id = this.getNextJobID();
        aJob.setJobID(id);
        aJob.setState(Job.WAITING);
        this.addToQueue(aJob);
        return id;
      }

      /**
       * Add a collection of jobs
       *
       * @param jobs
       */
      public void addJobs(Collection<Job> jobs) {
        for (Job job : jobs) {
          addJob(job);
        }
      }

      /**
       * @return the thread state
       */
      public int getState() {
        return this.runnerState;
      }

      /**
       * set the thread state to STOPPING so that the
       * thread will stop when it wakes up.
       */
      public void stop() {
        this.runnerState = JobControl.STOPPING;
      }

      /**
       * suspend the running thread
       */
      public void suspend () {
        if (this.runnerState == JobControl.RUNNING) {
          this.runnerState = JobControl.SUSPENDED;
        }
      }

      /**
       * resume the suspended thread
       */
      public void resume () {
        if (this.runnerState == JobControl.SUSPENDED) {
          this.runnerState = JobControl.RUNNING;
        }
      }

      synchronized private void checkRunningJobs() {
        Map<String, Job> oldJobs = null;
        oldJobs = this.runningJobs;
        this.runningJobs = new Hashtable<String, Job>();

        for (Job nextJob : oldJobs.values()) {
          int state = nextJob.checkState();
          /*
          if (state != Job.RUNNING) {
            System.out.println("The state of the running job " +
              nextJob.getJobName() + " has changed to: " + nextJob.getState());
          }
          */
          this.addToQueue(nextJob);
        }
      }

      synchronized private void checkWaitingJobs() {
        Map<String, Job> oldJobs = null;
        oldJobs = this.waitingJobs;
        this.waitingJobs = new Hashtable<String, Job>();

        for (Job nextJob : oldJobs.values()) {
          int state = nextJob.checkState();
          /*
          if (state != Job.WAITING) {
            System.out.println("The state of the waiting job " +
              nextJob.getJobName() + " has changed to: " + nextJob.getState());
          }
          */
          this.addToQueue(nextJob);
        }
      }

      synchronized private void startReadyJobs() {
        Map<String, Job> oldJobs = null;
        oldJobs = this.readyJobs;
        this.readyJobs = new Hashtable<String, Job>();

        for (Job nextJob : oldJobs.values()) {
          //System.out.println("Job to submit to Hadoop: " + nextJob.getJobName());
          nextJob.submit();
          //System.out.println("Hadoop ID: " + nextJob.getMapredJobID());
          this.addToQueue(nextJob);
        }
      }

      synchronized public boolean allFinished() {
        return this.waitingJobs.size() == 0 &&
          this.readyJobs.size() == 0 &&
          this.runningJobs.size() == 0;
      }

      /**
       *  The main loop for the thread.
       *  The loop does the following:
       *      Check the states of the running jobs
       *      Update the states of waiting jobs
       *      Submit the jobs in ready state
       */
      public void run() {
        this.runnerState = JobControl.RUNNING;
        while (true) {
          while (this.runnerState == JobControl.SUSPENDED) {
            try {
              Thread.sleep(5000);
            }
            catch (Exception e) {

            }
          }
          checkRunningJobs();
          checkWaitingJobs();
          startReadyJobs();
          if (this.runnerState != JobControl.RUNNING &&
              this.runnerState != JobControl.SUSPENDED) {
            break;
          }
          try {
            Thread.sleep(5000);
          }
          catch (Exception e) {

          }
          if (this.runnerState != JobControl.RUNNING &&
              this.runnerState != JobControl.SUSPENDED) {
            break;
          }
        }
        this.runnerState = JobControl.STOPPED;
      }

    }

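References

《Hadoop技术内幕:深入理解MapReduce架构设计与实现原理》 (Hadoop Internals: An In-Depth Look at the Architecture Design and Implementation of MapReduce)

Reposted from: https://www.cnblogs.com/wuyudong/p/hadoop-jobcontrol.html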
