
How Hadoop Submits a Job to the Cluster


This article explains how Hadoop submits a job to the cluster. Many people are unsure about this process in day-to-day work, so this post pulls the relevant material together into a simple, practical walkthrough. Hopefully it resolves your doubts; follow along below!

1: Flow Diagram of the MapReduce Job Submission Process

As the diagram shows, there are three main components: 1) JobClient: the job client. 2) JobTracker: the job tracker. 3) TaskTracker: the task tracker.

MapReduce hands the job to the JobClient, which interacts with the JobTracker; the JobTracker in turn monitors and assigns TaskTrackers, which carry out the actual processing of the job.

The analysis below is based on the Hadoop 2.6.4 source code. Note that this source differs slightly from earlier Hadoop versions, so some concepts deviate a little from the diagram above.

2: How MapReduce Submits a Job

2.1 The real submission of the job, namely:

**job.waitForCompletion(true)**

Tracing into waitForCompletion, note the call to submit() inside it:

/**
 * Submit the job to the cluster and wait for it to finish.
 */
public boolean waitForCompletion(boolean verbose
                                 ) throws IOException, InterruptedException,
                                          ClassNotFoundException {
  if (state == JobState.DEFINE) {
    submit();
  }
  if (verbose) {
    monitorAndPrintJob();
  } else {
    // get the completion poll interval from the client.
    int completionPollIntervalMillis =
        Job.getCompletionPollInterval(cluster.getConf());
    while (!isComplete()) {
      try {
        Thread.sleep(completionPollIntervalMillis);
      } catch (InterruptedException ie) {
      }
    }
  }
  return isSuccessful();
}

The verbose parameter: set it to true if you want the job's current progress printed to the console.
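For context, here is a minimal driver sketch showing where this call sits in a typical job. It is a sketch only: for brevity it runs the identity Mapper/Reducer rather than real ones, and the input/output paths are hypothetical command-line arguments.

// Minimal driver sketch showing where job.waitForCompletion(true) fits.
// Uses the identity Mapper/Reducer for brevity; the default TextInputFormat
// produces LongWritable keys and Text values, which pass straight through.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MinimalDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "minimal-demo");
    job.setJarByClass(MinimalDriver.class);
    job.setMapperClass(Mapper.class);          // identity map
    job.setReducerClass(Reducer.class);        // identity reduce
    job.setOutputKeyClass(LongWritable.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // verbose=true: block until completion, printing progress to the console
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}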


2.2 submit()

In the submit() method, the Job is handed off to the corresponding Cluster, and the call returns immediately rather than waiting for the Job to finish.

At the same time, the Job instance's state is set to JobState.RUNNING, indicating that the Job is in progress.

While the Job is running, getJobState() can be called to query its current state (see the sketch after the listing below).

/**
 * Submit the job to the cluster and return immediately.
 */
public void submit()
        throws IOException, InterruptedException, ClassNotFoundException {
  ensureState(JobState.DEFINE);
  setUseNewAPI();
  connect();
  final JobSubmitter submitter =
      getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
  status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
    public JobStatus run() throws IOException, InterruptedException,
        ClassNotFoundException {
      return submitter.submitJobInternal(Job.this, cluster);
    }
  });
  state = JobState.RUNNING;
  LOG.info("The url to track the job: " + getTrackingURL());
}
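For contrast with the blocking waitForCompletion, here is a minimal sketch of non-blocking submission with manual polling, assuming job is an already configured Job instance (the 5-second interval is an arbitrary choice for the sketch):

// Non-blocking sketch: submit() hands the job off and returns at once,
// so the client polls the state itself. Assumes `job` is fully configured.
job.submit();
while (!job.isComplete()) {
  System.out.println("state=" + job.getJobState()
      + " map=" + job.mapProgress()
      + " reduce=" + job.reduceProgress());
  Thread.sleep(5000);  // arbitrary poll interval for this sketch
}
System.out.println("successful: " + job.isSuccessful());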

Before the job is submitted, the connect() method is first called to connect to the cluster (Cluster):

private synchronized void connect()
        throws IOException, InterruptedException, ClassNotFoundException {
  if (cluster == null) {
    cluster =
        ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                   public Cluster run()
                          throws IOException, InterruptedException,
                                 ClassNotFoundException {
                     return new Cluster(getConfiguration());
                   }
                 });
  }
}

This is a thread-safe (synchronized) method. It initializes a Cluster object from the configuration; that object represents the cluster itself.
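The idiom here is ordinary synchronized lazy initialization. A minimal generic sketch, where ExpensiveResource is a hypothetical stand-in for Cluster:

// Sketch of the synchronized lazy-initialization idiom used by connect():
// the synchronized keyword guarantees only one thread creates the instance.
public class LazyHolder {
  private ExpensiveResource resource;    // shared, created at most once

  private synchronized void connect() {
    if (resource == null) {              // only the first caller initializes
      resource = new ExpensiveResource();
    }
  }

  static class ExpensiveResource {}      // placeholder for the real resource
}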

public Cluster(Configuration conf) throws IOException {
  this(null, conf);
}

public Cluster(InetSocketAddress jobTrackAddr, Configuration conf)
    throws IOException {
  this.conf = conf;
  this.ugi = UserGroupInformation.getCurrentUser();
  initialize(jobTrackAddr, conf);
}

private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
    throws IOException {
  synchronized (frameworkLoader) {
    for (ClientProtocolProvider provider : frameworkLoader) {
      LOG.debug("Trying ClientProtocolProvider : "
          + provider.getClass().getName());
      ClientProtocol clientProtocol = null;
      try {
        if (jobTrackAddr == null) {
          clientProtocol = provider.create(conf);
        } else {
          clientProtocol = provider.create(jobTrackAddr, conf);
        }
        if (clientProtocol != null) {
          clientProtocolProvider = provider;
          client = clientProtocol;
          LOG.debug("Picked " + provider.getClass().getName()
              + " as the ClientProtocolProvider");
          break;
        } else {
          LOG.debug("Cannot pick " + provider.getClass().getName()
              + " as the ClientProtocolProvider - returned null protocol");
        }
      } catch (Exception e) {
        LOG.info("Failed to use " + provider.getClass().getName()
            + " due to error: " + e.getMessage());
      }
    }
  }
  if (null == clientProtocolProvider || null == client) {
    throw new IOException(
        "Cannot initialize Cluster. Please check your configuration for "
            + MRConfig.FRAMEWORK_NAME
            + " and the correspond server addresses.");
  }
}

Before that code runs, the framework loader has already been created as a static field:

private static ServiceLoader<ClientProtocolProvider> frameworkLoader =
    ServiceLoader.load(ClientProtocolProvider.class);

As this shows, the client proxy is created via java.util.ServiceLoader. The available providers are LocalClientProtocolProvider (for local jobs) and YarnClientProtocolProvider (for YARN jobs). Hadoop uses the parameter mapreduce.framework.name to control which framework is chosen; in MRv2 it takes two values, local and yarn. The matching client is created according to this configuration. The sketch below illustrates the underlying SPI mechanism.
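The ServiceLoader mechanism is plain Java SPI: each provider JAR lists its implementation class in a META-INF/services file, and the loader discovers them at runtime. A minimal standalone sketch, with a hypothetical GreeterProvider interface standing in for ClientProtocolProvider:

// Java SPI sketch mirroring how Cluster discovers ClientProtocolProviders.
// A provider JAR ships a file META-INF/services/GreeterProvider (fully
// qualified name) whose single line names the implementation class.
import java.util.ServiceLoader;

interface GreeterProvider {         // hypothetical SPI interface
  String create(String framework);  // returns null if it can't handle this
}

public class SpiDemo {
  public static void main(String[] args) {
    ServiceLoader<GreeterProvider> loader =
        ServiceLoader.load(GreeterProvider.class);
    for (GreeterProvider p : loader) {  // iterate providers on the classpath
      String result = p.create("yarn");
      if (result != null) {             // first provider that accepts wins,
        System.out.println(result);     // just like Cluster.initialize()
        break;
      }
    }
  }
}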

mapred-site.xml:

<property>
  <name>mapreduce.framework.name</name>
  <value>yarn</value>
</property>

2.3 With the Cluster instantiated, the real job submission begins

submitter.submitJobInternal(Job.this, cluster);

JobStatus submitJobInternal(Job job, Cluster cluster)
    throws ClassNotFoundException, InterruptedException, IOException {

  //validate the jobs output specs
  checkSpecs(job);

  Configuration conf = job.getConfiguration();
  addMRFrameworkToDistributedCache(conf);

  Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
  //configure the command line options correctly on the submitting dfs
  InetAddress ip = InetAddress.getLocalHost();
  if (ip != null) {
    submitHostAddress = ip.getHostAddress();
    submitHostName = ip.getHostName();
    conf.set(MRJobConfig.JOB_SUBMITHOST, submitHostName);
    conf.set(MRJobConfig.JOB_SUBMITHOSTADDR, submitHostAddress);
  }
  JobID jobId = submitClient.getNewJobID();
  job.setJobID(jobId);
  Path submitJobDir = new Path(jobStagingArea, jobId.toString());
  JobStatus status = null;
  try {
    conf.set(MRJobConfig.USER_NAME,
        UserGroupInformation.getCurrentUser().getShortUserName());
    conf.set("hadoop.http.filter.initializers",
        "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
    conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
    LOG.debug("Configuring job " + jobId + " with " + submitJobDir
        + " as the submit dir");
    // get delegation token for the dir
    TokenCache.obtainTokensForNamenodes(job.getCredentials(),
        new Path[] { submitJobDir }, conf);

    populateTokenCache(conf, job.getCredentials());

    // generate a secret to authenticate shuffle transfers
    if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
      KeyGenerator keyGen;
      try {
        int keyLen = CryptoUtils.isShuffleEncrypted(conf)
            ? conf.getInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS,
                MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS)
            : SHUFFLE_KEY_LENGTH;
        keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
        keyGen.init(keyLen);
      } catch (NoSuchAlgorithmException e) {
        throw new IOException("Error generating shuffle secret key", e);
      }
      SecretKey shuffleKey = keyGen.generateKey();
      TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
          job.getCredentials());
    }

    copyAndConfigureFiles(job, submitJobDir);

    Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

    // Create the splits for the job
    LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
    int maps = writeSplits(job, submitJobDir);
    conf.setInt(MRJobConfig.NUM_MAPS, maps);
    LOG.info("number of splits:" + maps);

    // write "queue admins of the queue to which job is being submitted"
    // to job file.
    String queue = conf.get(MRJobConfig.QUEUE_NAME,
        JobConf.DEFAULT_QUEUE_NAME);
    AccessControlList acl = submitClient.getQueueAdmins(queue);
    conf.set(toFullPropertyName(queue,
        QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

    // removing jobtoken referrals before copying the jobconf to HDFS
    // as the tasks don't need this setting, actually they may break
    // because of it if present as the referral will point to a
    // different job.
    TokenCache.cleanUpTokenReferral(conf);

    if (conf.getBoolean(
        MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
        MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
      // Add HDFS tracking ids
      ArrayList<String> trackingIds = new ArrayList<String>();
      for (Token<? extends TokenIdentifier> t :
          job.getCredentials().getAllTokens()) {
        trackingIds.add(t.decodeIdentifier().getTrackingId());
      }
      conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
          trackingIds.toArray(new String[trackingIds.size()]));
    }

    // Set reservation info if it exists
    ReservationId reservationId = job.getReservationId();
    if (reservationId != null) {
      conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
    }

    // Write job file to submit dir
    writeConf(conf, submitJobFile);

    //
    // Now, actually submit the job (using the submit name)
    //
    printTokens(jobId, job.getCredentials());
    status = submitClient.submitJob(
        jobId, submitJobDir.toString(), job.getCredentials());
    if (status != null) {
      return status;
    } else {
      throw new IOException("Could not launch job");
    }
  } finally {
    if (status == null) {
      LOG.info("Cleaning up the staging area " + submitJobDir);
      if (jtFs != null && submitJobDir != null)
        jtFs.delete(submitJobDir, true);
    }
  }
}

The job is formally submitted to YARN by the following code:

status = submitClient.submitJob(
    jobId, submitJobDir.toString(), job.getCredentials());

Finally, via an RPC call, a JobStatus object is returned; its toString method can print the relevant runtime information on the JobClient side. A small usage sketch follows the listing.

if (status != null) {
  return status;
}
public String toString() {
  StringBuffer buffer = new StringBuffer();
  buffer.append("job-id : " + jobid);
  buffer.append("uber-mode : " + isUber);
  buffer.append("map-progress : " + mapProgress);
  buffer.append("reduce-progress : " + reduceProgress);
  buffer.append("cleanup-progress : " + cleanupProgress);
  buffer.append("setup-progress : " + setupProgress);
  buffer.append("runstate : " + runState);
  buffer.append("start-time : " + startTime);
  buffer.append("user-name : " + user);
  buffer.append("priority : " + priority);
  buffer.append("scheduling-info : " + schedulingInfo);
  buffer.append("num-used-slots" + numUsedSlots);
  buffer.append("num-reserved-slots" + numReservedSlots);
  buffer.append("used-mem" + usedMem);
  buffer.append("reserved-mem" + reservedMem);
  buffer.append("needed-mem" + neededMem);
  return buffer.toString();
}
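On the client side, this status can also be fetched and inspected directly. A small sketch, assuming job is a Job handle that has already been submitted:

// Fetch the latest JobStatus over RPC and print a few of its fields.
// Assumes `job` is an org.apache.hadoop.mapreduce.Job already submitted.
JobStatus status = job.getStatus();
System.out.println("id:      " + status.getJobID());
System.out.println("state:   " + status.getState());
System.out.println("setup:   " + status.getSetupProgress());
System.out.println("map:     " + status.getMapProgress());
System.out.println("reduce:  " + status.getReduceProgress());
System.out.println("cleanup: " + status.getCleanupProgress());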

At this point the job has been handed over to YARN; all that remains on the client is monitoring (if verbose was set to true), namely:

if (verbose) {
  monitorAndPrintJob();
}

This completes only the submission of the Job; execution itself is handled by the cluster.

That concludes this walkthrough of how Hadoop submits a job to the cluster. Hopefully it has cleared up the common points of confusion; pairing the theory with hands-on practice is the best way to make it stick, so give it a try!
