您好,登錄后才能下訂單哦!
這篇文章主要講解了“DAG實(shí)現(xiàn)任務(wù)調(diào)度以及優(yōu)化”,文中的講解內(nèi)容簡(jiǎn)單清晰,易于學(xué)習(xí)與理解,下面請(qǐng)大家跟著小編的思路慢慢深入,一起來(lái)研究和學(xué)習(xí)“DAG實(shí)現(xiàn)任務(wù)調(diào)度以及優(yōu)化”吧!
github:https://github.com/smartxing/algorithm
1 有向圖的構(gòu)建
DAG dag = new DAG(); dag.addVertex("A"); dag.addVertex("B"); dag.addVertex("C"); dag.addVertex("D"); dag.addEdge("A", "B"); dag.addEdge("A", "C"); System.out.println(dag);
2 拓?fù)渑判驒z測(cè)圖中是否有環(huán)
public boolean isCircularity() { Set<Object> set = inDegree.keySet(); //入度表 Map<Object, AtomicInteger> inDegree = set.stream().collect(Collectors .toMap(k -> k, k -> new AtomicInteger(this.inDegree.get(k).size()))); //入度為0的節(jié)點(diǎn) Set sources = getSources(); LinkedList<Object> queue = new LinkedList(); queue.addAll(sources); while (!queue.isEmpty()) { Object o = queue.removeFirst(); outDegree.get(o) .forEach(so -> { if (inDegree.get(so).decrementAndGet() == 0) { queue.add(so); } }); } return inDegree.values().stream().filter(x -> x.intValue() > 0).count() > 0; }
3 stage優(yōu)化
eg 如果任務(wù)存在如下的關(guān)系 , task1 執(zhí)行完后執(zhí)行 task2 ,task2 執(zhí)行完后執(zhí)行task3 ... Task1 -> Task2 -> Task3 -> Task4 這些task 本來(lái)就要串行執(zhí)行的 可以把這些task 打包在一塊 減少線程上下文的切換 eg : 復(fù)雜一點(diǎn)的DAG: /** * H * \ * G * \ * A -> B * \ * C- D -E - F-> J * * * * 優(yōu)化后得 ==> * * (H,G) * \ * A -> B * \ * (C,D,E) - (F,J) * */ 詳見chain方法: 關(guān)鍵代碼如下 private void chain_(Set sources, final LinkedHashSetMultimap foutChain, final LinkedHashSetMultimap finChain) { sources.forEach(sourceNode -> { ArrayList<Object> maxStage = Lists.newArrayList(); findMaxStage(sourceNode, maxStage); if (maxStage.size() > 1) { //存在需要合并的stage addVertex(foutChain, finChain, maxStage);//添加一個(gè)新節(jié)點(diǎn) Object o = maxStage.get(maxStage.size() - 1); //最后一個(gè)節(jié)點(diǎn) reChain_(foutChain, finChain, maxStage, o); } if (maxStage.size() == 1) { //不存在需要合并的stage addVertex(foutChain, finChain, sourceNode);//添加一個(gè)新節(jié)點(diǎn) Set subNodes = outDegree.get(sourceNode); addSubNodeage(foutChain, finChain, sourceNode, subNodes); } }); } 4 測(cè)試DAG 執(zhí)行 測(cè)試程序: 詳見 DAGExecTest 1 新建一個(gè)task 只打印一句話 public static class Task implements Runnable { private String taskName; public Task(final String taskName) { this.taskName = taskName; } @Override public void run() { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("i am running my name is " + taskName + " finish ThreadID: " + Thread.currentThread().getId()); } public String getTaskName() { return taskName; } @Override public String toString() { return taskName; } } 2 構(gòu)建DAG DAG dag = DAG.create(); Task a = new Task("a"); Task b = new Task("b"); Task c = new Task("c"); Task d = new Task("d"); Task e = new Task("e"); Task f = new Task("f"); Task g = new Task("g"); Task h = new Task("h"); Task j = new Task("j"); dag.addVertex(a); dag.addVertex(b); dag.addVertex(c); dag.addVertex(d); dag.addVertex(e); dag.addVertex(f); dag.addVertex(g); dag.addVertex(h); dag.addVertex(j); dag.addEdge(h, g); dag.addEdge(g, b); dag.addEdge(a, b); dag.addEdge(b, f); dag.addEdge(c, d); dag.addEdge(d, e); dag.addEdge(e, f); dag.addEdge(f, j); 構(gòu)建完成后如圖 * H * \ * G * \ * A -> B * \ * C- D -E - F-> J 3 stage 切分 DAG chain = dag.chain(); 執(zhí)行完圖入下: * (H,G) * \ * A -> B * \ * (C,D,E) - (F,J) 4 執(zhí)行 DAG DAGExecTest 最終結(jié)果打印如下如下: 可以發(fā)現(xiàn)有3個(gè)Stage stage1 包含3個(gè)task task分別在不同的線程里面執(zhí)行 其中c-d-e g-c f-j是經(jīng)過(guò)優(yōu)化的在同一個(gè)線程里面執(zhí)行,減少了不必要的上下文切換 i am running my name is a finish ThreadID: 10 i am running my name is c finish ThreadID: 11 i am running my name is h finish ThreadID: 12 i am running my name is d finish ThreadID: 11 i am running my name is g finish ThreadID: 12 i am running my name is e finish ThreadID: 11 stage 結(jié)束 : task detached:a, task chain c-d-e task chain h-g ----------------------------------------------- i am running my name is b finish ThreadID: 14 stage 結(jié)束 : task detached:b, ----------------------------------------------- i am running my name is f finish ThreadID: 11 i am running my name is j finish ThreadID: 11 stage 結(jié)束 : task chain f-j 測(cè)試執(zhí)行關(guān)鍵代碼如下: chain.execute(col -> { Set set = (Set) col; List<CompletableFuture> completableFutures = Lists.newArrayList(); StringBuilder sb = new StringBuilder(); set.stream().forEach(x -> { if (x instanceof Task) { CompletableFuture<Void> future = CompletableFuture.runAsync((Task) x, executorService); completableFutures.add(future); sb.append(" task detached:" + ((Task) x).getTaskName()).append(","); } if (x instanceof List) { List<Task> taskList = (List) x; CompletableFuture<Void> future = CompletableFuture.runAsync(()-> taskList.forEach(Task::run)); completableFutures.add(future); sb.append( " task chain " + Joiner.on("-").join(taskList.stream().map(Task::getTaskName).collect(Collectors.toList()))); } }); CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()])).join(); System.out.println("stage 結(jié)束 : " + sb.toString()); System.out.println("-----------------------------------------------"); });
感謝各位的閱讀,以上就是“DAG實(shí)現(xiàn)任務(wù)調(diào)度以及優(yōu)化”的內(nèi)容了,經(jīng)過(guò)本文的學(xué)習(xí)后,相信大家對(duì)DAG實(shí)現(xiàn)任務(wù)調(diào)度以及優(yōu)化這一問(wèn)題有了更深刻的體會(huì),具體使用情況還需要大家實(shí)踐驗(yàn)證。這里是億速云,小編將為大家推送更多相關(guān)知識(shí)點(diǎn)的文章,歡迎關(guān)注!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。