溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

DAG實(shí)現(xiàn)任務(wù)調(diào)度以及優(yōu)化

發(fā)布時(shí)間:2021-06-22 16:55:54 來(lái)源:億速云 閱讀:768 作者:chen 欄目:大數(shù)據(jù)

這篇文章主要講解了“DAG實(shí)現(xiàn)任務(wù)調(diào)度以及優(yōu)化”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來(lái)研究和學(xué)習(xí)“DAG實(shí)現(xiàn)任務(wù)調(diào)度以及優(yōu)化”吧!

DAG Task 任務(wù)調(diào)度算法 實(shí)現(xiàn)

github:https://github.com/smartxing/algorithm

1 有向圖的構(gòu)建

DAG dag = new DAG();
    dag.addVertex("A");
    dag.addVertex("B");
    dag.addVertex("C");
    dag.addVertex("D");
    dag.addEdge("A", "B");
    dag.addEdge("A", "C");
    System.out.println(dag);

2 拓?fù)渑判驒z測(cè)圖中是否有環(huán)

public boolean isCircularity() {
    Set<Object> set = inDegree.keySet();
    //入度表
    Map<Object, AtomicInteger> inDegree = set.stream().collect(Collectors
        .toMap(k -> k, k -> new AtomicInteger(this.inDegree.get(k).size())));
    //入度為0的節(jié)點(diǎn)
    Set sources = getSources();
    LinkedList<Object> queue = new LinkedList();
    queue.addAll(sources);
    while (!queue.isEmpty()) {
      Object o = queue.removeFirst();
      outDegree.get(o)
          .forEach(so -> {
            if (inDegree.get(so).decrementAndGet() == 0) {
              queue.add(so);
            }
          });
    }
    return inDegree.values().stream().filter(x -> x.intValue() > 0).count() > 0;
  }

3 stage優(yōu)化

  eg   
  如果任務(wù)存在如下的關(guān)系 , task1 執(zhí)行完后執(zhí)行 task2 ,task2 執(zhí)行完后執(zhí)行task3 ...     
  Task1 -> Task2 -> Task3 -> Task4 
  這些task 本來(lái)就要串行執(zhí)行的 可以把這些task 打包在一塊 減少線程上下文的切換  
  
  eg : 復(fù)雜一點(diǎn)的DAG:
   /**
     *  H
     *    \
     *      G
     *        \
     *     A -> B
     *            \
     *  C- D  -E  - F-> J
     *
     *
     *
     *    優(yōu)化后得  ==>
     *
     *     (H,G)
     *         \
     *     A -> B
     *            \
     *  (C,D,E)  - (F,J)
     *
     */
     
      詳見chain方法: 關(guān)鍵代碼如下
     
     private void chain_(Set sources, final LinkedHashSetMultimap foutChain, final LinkedHashSetMultimap finChain) {
         sources.forEach(sourceNode -> {
     
           ArrayList<Object> maxStage = Lists.newArrayList();
           findMaxStage(sourceNode, maxStage);
           if (maxStage.size() > 1) { //存在需要合并的stage
             addVertex(foutChain, finChain, maxStage);//添加一個(gè)新節(jié)點(diǎn)
             Object o = maxStage.get(maxStage.size() - 1); //最后一個(gè)節(jié)點(diǎn)
             reChain_(foutChain, finChain, maxStage, o);
           }
           if (maxStage.size() == 1) {
             //不存在需要合并的stage
             addVertex(foutChain, finChain, sourceNode);//添加一個(gè)新節(jié)點(diǎn)
             Set subNodes = outDegree.get(sourceNode);
             addSubNodeage(foutChain, finChain, sourceNode, subNodes);
           }
         });
       }
     
     
     
      
     
4 測(cè)試DAG 執(zhí)行
  
  測(cè)試程序: 詳見 DAGExecTest
  1 新建一個(gè)task  只打印一句話
  public static class Task implements Runnable {
 
     private String taskName;
 
     public Task(final String taskName) {
       this.taskName = taskName;
     }
 
     @Override public void run() {
       try {
         Thread.sleep(2000);
       } catch (InterruptedException e) {
         e.printStackTrace();
       }
       System.out.println("i am running  my name is " + taskName + "  finish ThreadID: " + Thread.currentThread().getId());
     }
 
     public String getTaskName() {
       return taskName;
     }
 
     @Override public String toString() {
       return  taskName;
     }
   }
   2 構(gòu)建DAG 
       DAG dag = DAG.create();
       Task a = new Task("a");
       Task b = new Task("b");
       Task c = new Task("c");
       Task d = new Task("d");
       Task e = new Task("e");
       Task f = new Task("f");
       Task g = new Task("g");
       Task h = new Task("h");
       Task j = new Task("j");
       dag.addVertex(a);
       dag.addVertex(b);
       dag.addVertex(c);
       dag.addVertex(d);
       dag.addVertex(e);
       dag.addVertex(f);
       dag.addVertex(g);
       dag.addVertex(h);
       dag.addVertex(j);
       dag.addEdge(h, g);
       dag.addEdge(g, b);
       dag.addEdge(a, b);
       dag.addEdge(b, f);
       dag.addEdge(c, d);
       dag.addEdge(d, e);
       dag.addEdge(e, f);
       dag.addEdge(f, j);
     構(gòu)建完成后如圖
          *   H
          *    \
          *      G
          *        \
          *     A -> B
          *            \
          *  C- D  -E  - F-> J
          
    3 stage 切分  
    DAG chain = dag.chain();
    執(zhí)行完圖入下:
         *     (H,G)
         *         \
         *     A -> B
         *            \
         *  (C,D,E)  - (F,J)
         
    4 執(zhí)行 DAG  DAGExecTest   最終結(jié)果打印如下如下:
     
     可以發(fā)現(xiàn)有3個(gè)Stage   stage1  包含3個(gè)task  task分別在不同的線程里面執(zhí)行  
     其中c-d-e   g-c  f-j是經(jīng)過(guò)優(yōu)化的在同一個(gè)線程里面執(zhí)行,減少了不必要的上下文切換 
    
      i am running  my name is a  finish ThreadID: 10
      i am running  my name is c  finish ThreadID: 11
      i am running  my name is h  finish ThreadID: 12
      i am running  my name is d  finish ThreadID: 11
      i am running  my name is g  finish ThreadID: 12
      i am running  my name is e  finish ThreadID: 11
      stage 結(jié)束 :  task detached:a, task chain c-d-e task chain h-g
      -----------------------------------------------
      i am running  my name is b  finish ThreadID: 14
      stage 結(jié)束 :  task detached:b,
      -----------------------------------------------
      i am running  my name is f  finish ThreadID: 11
      i am running  my name is j  finish ThreadID: 11
      stage 結(jié)束 :  task chain f-j
      測(cè)試執(zhí)行關(guān)鍵代碼如下:
      chain.execute(col -> {
            Set set = (Set) col;
            List<CompletableFuture> completableFutures = Lists.newArrayList();
            StringBuilder sb = new StringBuilder();
            set.stream().forEach(x -> {
              if (x instanceof Task) {
                CompletableFuture<Void> future = CompletableFuture.runAsync((Task) x, executorService);
                completableFutures.add(future);
                sb.append(" task detached:" + ((Task) x).getTaskName()).append(",");
              }
              if (x instanceof List) {
                List<Task> taskList = (List) x;
                CompletableFuture<Void> future = CompletableFuture.runAsync(()->
                  taskList.forEach(Task::run));
                completableFutures.add(future);
                sb.append(
                    " task chain " + Joiner.on("-").join(taskList.stream().map(Task::getTaskName).collect(Collectors.toList())));
              }
            });
            CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()])).join();
            System.out.println("stage 結(jié)束 : " + sb.toString());
            System.out.println("-----------------------------------------------");
          });

感謝各位的閱讀,以上就是“DAG實(shí)現(xiàn)任務(wù)調(diào)度以及優(yōu)化”的內(nèi)容了,經(jīng)過(guò)本文的學(xué)習(xí)后,相信大家對(duì)DAG實(shí)現(xiàn)任務(wù)調(diào)度以及優(yōu)化這一問(wèn)題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是億速云,小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

dag
AI