溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

Hadoo是怎么將作業(yè)提交給集群的

發(fā)布時(shí)間:2021-12-09 14:36:09 來源:億速云 閱讀:127 作者:iii 欄目:云計(jì)算

這篇文章主要介紹“Hadoo是怎么將作業(yè)提交給集群的”,在日常操作中,相信很多人在Hadoo是怎么將作業(yè)提交給集群的問題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”Hadoo是怎么將作業(yè)提交給集群的”的疑惑有所幫助!接下來,請(qǐng)跟著小編一起來學(xué)習(xí)吧!

一:MapReduce提交作業(yè)過程的流程圖

Hadoo是怎么將作業(yè)提交給集群的

通過圖可知主要有三個(gè)部分,即: 1) JobClient:作業(yè)客戶端。 2) JobTracker:作業(yè)的跟蹤器。 3) TaskTracker:任務(wù)的跟蹤器。

MapReduce將作業(yè)提交給JobClient,然后JobClient與JobTracker交互,JobTracker再去監(jiān)控與分配TaskTracker,完成具體作業(yè)的處理。

以下分析的是Hadoop2.6.4的源碼。請(qǐng)注意: 源碼與之前Hadoop版本的略有差別,所以有些概念還是與上圖有點(diǎn)差別。

二:MapReduce如何提交作業(yè)

2.1 完成作業(yè)的真正提交,即:

**job.waitForCompletion(true)**

跟蹤waitForCompletion, 注意其中的submit(),如下:

/**
   * Submit the job to the cluster and wait for it to finish.
   */

 public boolean waitForCompletion(boolean verbose
                                   ) throws IOException, InterruptedException,
                                            ClassNotFoundException {
    if (state == JobState.DEFINE) {
      submit();
    }
    if (verbose) {
      monitorAndPrintJob();
    } else {
      // get the completion poll interval from the client.
      int completionPollIntervalMillis = 
        Job.getCompletionPollInterval(cluster.getConf());
      while (!isComplete()) {
        try {
          Thread.sleep(completionPollIntervalMillis);
        } catch (InterruptedException ie) {
        }
      }
    }
    return isSuccessful();
  }

參數(shù) verbose ,如果想在控制臺(tái)打印當(dāng)前的任務(wù)執(zhí)行進(jìn)度,則設(shè)為true

**

2.2 submit()

** 在submit 方法中會(huì)把Job提交給對(duì)應(yīng)的Cluster,然后不等待Job執(zhí)行結(jié)束就立刻返回

同時(shí)會(huì)把Job實(shí)例的狀態(tài)設(shè)置為JobState.RUNNING,從而來表示Job正在進(jìn)行中

然后在Job運(yùn)行過程中,可以調(diào)用getJobState()來獲取Job的運(yùn)行狀態(tài)

 /**
   * Submit the job to the cluster and return immediately.
   */
  public void submit() 
         throws IOException, InterruptedException, ClassNotFoundException {
    ensureState(JobState.DEFINE);
    setUseNewAPI();
    connect();
    final JobSubmitter submitter = 
        getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
    status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
      public JobStatus run() throws IOException, InterruptedException, 
      ClassNotFoundException {
        return submitter.submitJobInternal(Job.this, cluster);
      }
    });
    state = JobState.RUNNING;
    LOG.info("The url to track the job: " + getTrackingURL());
   }

而在任務(wù)提交前,會(huì)先通過connect()方法鏈接集群(Cluster):

private synchronized void connect()
          throws IOException, InterruptedException, ClassNotFoundException {
    if (cluster == null) {
      cluster = 
        ugi.doAs(new PrivilegedExceptionAction<Cluster>() {
                   public Cluster run()
                          throws IOException, InterruptedException, 
                                 ClassNotFoundException {
                     return new Cluster(getConfiguration());
                   }
                 });
    }
  }

這是一個(gè)線程保護(hù)方法。這個(gè)方法中根據(jù)配置信息初始化了一個(gè)Cluster對(duì)象,即代表集群

public Cluster(Configuration conf) throws IOException {
    this(null, conf);
  }

  public Cluster(InetSocketAddress jobTrackAddr, Configuration conf) 
      throws IOException {
    this.conf = conf;
    this.ugi = UserGroupInformation.getCurrentUser();
    initialize(jobTrackAddr, conf);
  }

  private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
      throws IOException {

    synchronized (frameworkLoader) {
      for (ClientProtocolProvider provider : frameworkLoader) {
        LOG.debug("Trying ClientProtocolProvider : "
            + provider.getClass().getName());
        ClientProtocol clientProtocol = null; 
        try {
          if (jobTrackAddr == null) {
            clientProtocol = provider.create(conf);
          } else {
            clientProtocol = provider.create(jobTrackAddr, conf);
          }

          if (clientProtocol != null) {
            clientProtocolProvider = provider;
            client = clientProtocol;
            LOG.debug("Picked " + provider.getClass().getName()
                + " as the ClientProtocolProvider");
            break;
          }
          else {
            LOG.debug("Cannot pick " + provider.getClass().getName()
                + " as the ClientProtocolProvider - returned null protocol");
          }
        } 
        catch (Exception e) {
          LOG.info("Failed to use " + provider.getClass().getName()
              + " due to error: " + e.getMessage());
        }
      }
    }

    if (null == clientProtocolProvider || null == client) {
      throw new IOException(
          "Cannot initialize Cluster. Please check your configuration for "
              + MRConfig.FRAMEWORK_NAME
              + " and the correspond server addresses.");
    }
  }

而在上段代碼之前,

 private static ServiceLoader<ClientProtocolProvider> frameworkLoader =
      ServiceLoader.load(ClientProtocolProvider.class);

可以看出創(chuàng)建客戶端代理階段使用了java.util.ServiceLoader,包含LocalClientProtocolProvider(本地作業(yè))和YarnClientProtocolProvider(yarn作業(yè))(hadoop有一個(gè)Yarn參數(shù)mapreduce.framework.name用來控制你選擇的應(yīng)用框架。在MRv2里,mapreduce.framework.name有兩個(gè)值:local和yarn),此處會(huì)根據(jù)mapreduce.framework.name的配置創(chuàng)建相應(yīng)的客戶端

mapred-site.xml:

<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
</configuration>

2.3 實(shí)例化Cluster后開始真正的任務(wù)提交

submitter.submitJobInternal(Job.this, cluster);
 JobStatus submitJobInternal(Job job, Cluster cluster) 
  throws ClassNotFoundException, InterruptedException, IOException {

    //validate the jobs output specs 
    checkSpecs(job);

    Configuration conf = job.getConfiguration();
    addMRFrameworkToDistributedCache(conf);

    Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
    //configure the command line options correctly on the submitting dfs
    InetAddress ip = InetAddress.getLocalHost();
    if (ip != null) {
      submitHostAddress = ip.getHostAddress();
      submitHostName = ip.getHostName();
      conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
      conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
    }
    JobID jobId = submitClient.getNewJobID();
    job.setJobID(jobId);
    Path submitJobDir = new Path(jobStagingArea, jobId.toString());
    JobStatus status = null;
    try {
      conf.set(MRJobConfig.USER_NAME,
          UserGroupInformation.getCurrentUser().getShortUserName());
      conf.set("hadoop.http.filter.initializers", 
          "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
      LOG.debug("Configuring job " + jobId + " with " + submitJobDir 
          + " as the submit dir");
      // get delegation token for the dir
      TokenCache.obtainTokensForNamenodes(job.getCredentials(),
          new Path[] { submitJobDir }, conf);

      populateTokenCache(conf, job.getCredentials());

      // generate a secret to authenticate shuffle transfers
      if (TokenCache.getShuffleSecretKey(job.getCredentials()) == null) {
        KeyGenerator keyGen;
        try {

          int keyLen = CryptoUtils.isShuffleEncrypted(conf) 
              ? conf.getInt(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS, 
                  MRJobConfig.DEFAULT_MR_ENCRYPTED_INTERMEDIATE_DATA_KEY_SIZE_BITS)
              : SHUFFLE_KEY_LENGTH;
          keyGen = KeyGenerator.getInstance(SHUFFLE_KEYGEN_ALGORITHM);
          keyGen.init(keyLen);
        } catch (NoSuchAlgorithmException e) {
          throw new IOException("Error generating shuffle secret key", e);
        }
        SecretKey shuffleKey = keyGen.generateKey();
        TokenCache.setShuffleSecretKey(shuffleKey.getEncoded(),
            job.getCredentials());
      }

      copyAndConfigureFiles(job, submitJobDir);

      Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);

      // Create the splits for the job
      LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
      int maps = writeSplits(job, submitJobDir);
      conf.setInt(MRJobConfig.NUM_MAPS, maps);
      LOG.info("number of splits:" + maps);

      // write "queue admins of the queue to which job is being submitted"
      // to job file.
      String queue = conf.get(MRJobConfig.QUEUE_NAME,
          JobConf.DEFAULT_QUEUE_NAME);
      AccessControlList acl = submitClient.getQueueAdmins(queue);
      conf.set(toFullPropertyName(queue,
          QueueACL.ADMINISTER_JOBS.getAclName()), acl.getAclString());

      // removing jobtoken referrals before copying the jobconf to HDFS
      // as the tasks don't need this setting, actually they may break
      // because of it if present as the referral will point to a
      // different job.
      TokenCache.cleanUpTokenReferral(conf);

      if (conf.getBoolean(
          MRJobConfig.JOB_TOKEN_TRACKING_IDS_ENABLED,
          MRJobConfig.DEFAULT_JOB_TOKEN_TRACKING_IDS_ENABLED)) {
        // Add HDFS tracking ids
        ArrayList<String> trackingIds = new ArrayList<String>();
        for (Token<? extends TokenIdentifier> t :
            job.getCredentials().getAllTokens()) {
          trackingIds.add(t.decodeIdentifier().getTrackingId());
        }
        conf.setStrings(MRJobConfig.JOB_TOKEN_TRACKING_IDS,
            trackingIds.toArray(new String[trackingIds.size()]));
      }

      // Set reservation info if it exists
      ReservationId reservationId = job.getReservationId();
      if (reservationId != null) {
        conf.set(MRJobConfig.RESERVATION_ID, reservationId.toString());
      }

      // Write job file to submit dir
      writeConf(conf, submitJobFile);

      //
      // Now, actually submit the job (using the submit name)
      //
      printTokens(jobId, job.getCredentials());
      status = submitClient.submitJob(
          jobId, submitJobDir.toString(), job.getCredentials());
      if (status != null) {
        return status;
      } else {
        throw new IOException("Could not launch job");
      }
    } finally {
      if (status == null) {
        LOG.info("Cleaning up the staging area " + submitJobDir);
        if (jtFs != null && submitJobDir != null)
          jtFs.delete(submitJobDir, true);

      }
    }
  }

通過如下代碼正式提交Job到Y(jié)arn

 status = submitClient.submitJob(
          jobId, submitJobDir.toString(), job.getCredentials());

Hadoo是怎么將作業(yè)提交給集群的

到最后,通過RPC的調(diào)用,最終會(huì)返回一個(gè)JobStatus對(duì)象,它的toString方法可以在JobClient端打印運(yùn)行的相關(guān)日志信息。

if (status != null) {
        return status;
      }
public String toString() {
    StringBuffer buffer = new StringBuffer();
    buffer.append("job-id : " + jobid);
    buffer.append("uber-mode : " + isUber);
    buffer.append("map-progress : " + mapProgress);
    buffer.append("reduce-progress : " + reduceProgress);
    buffer.append("cleanup-progress : " + cleanupProgress);
    buffer.append("setup-progress : " + setupProgress);
    buffer.append("runstate : " + runState);
    buffer.append("start-time : " + startTime);
    buffer.append("user-name : " + user);
    buffer.append("priority : " + priority);
    buffer.append("scheduling-info : " + schedulingInfo);
    buffer.append("num-used-slots" + numUsedSlots);
    buffer.append("num-reserved-slots" + numReservedSlots);
    buffer.append("used-mem" + usedMem);
    buffer.append("reserved-mem" + reservedMem);
    buffer.append("needed-mem" + neededMem);
    return buffer.toString();
  }

(到這里任務(wù)都給yarn了,這里就只剩下監(jiān)控(如果設(shè)置為true的話)),即:

    if (verbose) {
        monitorAndPrintJob();
    }

這只是完成了作業(yè)Job的提交。

到此,關(guān)于“Hadoo是怎么將作業(yè)提交給集群的”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注億速云網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI