溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Schedulerx2.0分布式計算原理及最佳實(shí)踐是怎么樣的

發(fā)布時間:2021-12-03 16:39:50 來源:億速云 閱讀:124 作者:柒染 欄目:云計算

本篇文章為大家展示了Schedulerx2.0分布式計算原理及最佳實(shí)踐是怎么樣的,內(nèi)容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細(xì)介紹希望你能有所收獲。

1. 前言

Schedulerx2.0的客戶端提供分布式執(zhí)行、多種任務(wù)類型、統(tǒng)一日志等框架,用戶只要依賴schedulerx-worker這個jar包,通過schedulerx2.0提供的編程模型,簡單幾行代碼就能實(shí)現(xiàn)一套高可靠可運(yùn)維的分布式執(zhí)行引擎。

這篇文章重點(diǎn)是介紹基于schedulerx2.0的分布式執(zhí)行引擎原理和最佳實(shí)踐,相信看完這篇文章,大家都能寫出高效率的分布式作業(yè),說不定速度能提升好幾倍:)

2. 可擴(kuò)展的執(zhí)行引擎

Worker總體架構(gòu)參考Yarn的架構(gòu),分為TaskMaster, Container, Processor三層:

Schedulerx2.0分布式計算原理及最佳實(shí)踐是怎么樣的

  • TaskMaster:類似于yarn的AppMaster,支持可擴(kuò)展的分布式執(zhí)行框架,進(jìn)行整個jobInstance的生命周期管理、container的資源管理,同時還有failover等能力。默認(rèn)實(shí)現(xiàn)StandaloneTaskMaster(單機(jī)執(zhí)行),BroadcastTaskMaster(廣播執(zhí)行),MapTaskMaster(并行計算、內(nèi)存網(wǎng)格、網(wǎng)格計算),MapReduceTaskMaster(并行計算、內(nèi)存網(wǎng)格、網(wǎng)格計算)。

  • Container:執(zhí)行業(yè)務(wù)邏輯的容器框架,支持線程/進(jìn)程/docker/actor等。

  • Processor:業(yè)務(wù)邏輯框架,不同的processor表示不同的任務(wù)類型。

以MapTaskMaster為例,大概的原理如下圖所示:

Schedulerx2.0分布式計算原理及最佳實(shí)踐是怎么樣的

3. 分布式編程模型之Map模型

Schedulerx2.0提供了多種分布式編程模型,這篇文章主要介紹Map模型(之后的文章還會介紹MapReduce模型,適用更多的業(yè)務(wù)場景),簡單幾行代碼就可以將海量數(shù)據(jù)分布式到多臺機(jī)器上進(jìn)行分布式跑批,非常簡單易用。

針對不同的跑批場景,map模型作業(yè)還提供了并行計算、內(nèi)存網(wǎng)格、網(wǎng)格計算三種執(zhí)行方式:

  • 并行計算:子任務(wù)300以下,有子任務(wù)列表。

  • 內(nèi)存網(wǎng)格:子任務(wù)5W以下,無子任務(wù)列表,速度快。

  • 網(wǎng)格計算:子任務(wù)100W以下,無子任務(wù)列表。

4. 并行計算原理

因?yàn)椴⑿腥蝿?wù)具有子任務(wù)列表:

Schedulerx2.0分布式計算原理及最佳實(shí)踐是怎么樣的
如上圖,子任務(wù)列表可以看到每個子任務(wù)的狀態(tài)、機(jī)器,還有重跑、查看日志等操作。

因?yàn)椴⑿杏嬎阋龅阶尤蝿?wù)級別的可視化,并且worker掛了、重啟還能支持手動重跑,就需要把task持久化到server端:

Schedulerx2.0分布式計算原理及最佳實(shí)踐是怎么樣的
如上圖所示:

  1. server觸發(fā)jobInstance到某個worker,選中為master。

  2. MapTaskMaster選擇某個worker執(zhí)行root任務(wù),當(dāng)執(zhí)行map方法時,會回調(diào)MapTaskMaster。

  3. MapTaskMaster收到map方法,會把task持久化到server端。

  4. 同時,MapTaskMaster還有個pull線程,不停拉取INIT狀態(tài)的task,并派發(fā)給其他worker執(zhí)行。

5. 網(wǎng)格計算原理

網(wǎng)格計算要支持百萬級別的task,如果所有任務(wù)都往server回寫,server肯定扛不住,所以網(wǎng)格計算的存儲實(shí)際上是分布式在用戶自己的機(jī)器上的:

Schedulerx2.0分布式計算原理及最佳實(shí)踐是怎么樣的

如上圖所示:

  1. server觸發(fā)jobInstance到某個worker,選中為master。

  2. MapTaskMaster選擇某個worker執(zhí)行root任務(wù),當(dāng)執(zhí)行map方法時,會回調(diào)MapTaskMaster。

  3. MapTaskMaster收到map方法,會把task持久化到本地h3數(shù)據(jù)庫。

  4. 同時,MapTaskMaster還有個pull線程,不停拉取INIT狀態(tài)的task,并派發(fā)給其他worker執(zhí)行。

6. 最佳實(shí)踐

6.1 需求

舉個例子:

  1. 讀取A表中status=0的數(shù)據(jù)。

  2. 處理這些數(shù)據(jù),插入B表。

  3. 把A表中處理過的數(shù)據(jù)的修改status=1。

  4. 數(shù)據(jù)量有4億+,希望縮短時間。

6.2 反面案例

我們先看下如下代碼是否有問題?

public class ScanSingleTableProcessor extends MapJobProcessor {
    private static int pageSize = 1000;

    @Override
    public ProcessResult process(JobContext context) {
        String taskName = context.getTaskName();
        Object task = context.getTask();

        if (WorkerConstants.MAP_TASK_ROOT_NAME.equals(taskName)) {
            int recordCount = queryRecordCount();
            int pageAmount = recordCount / pageSize;//計算分頁數(shù)量
            for(int i = 0 ; i < pageAmount ; i ++) {
                List<Record> recordList = queryRecord(i);//根據(jù)分頁查詢一頁數(shù)據(jù)
                map(recordList, "record記錄");//把子任務(wù)分發(fā)出去并行處理
            }
            return new ProcessResult(true);//true表示執(zhí)行成功,false表示失敗
        } else if ("record記錄".equals(taskName)) {
            //TODO
            return new ProcessResult(true);
        }
        return new ProcessResult(false);
    }
}

如上面的代碼所示,在root任務(wù)中,會把數(shù)據(jù)庫所有記錄讀取出來,每一行就是一個Record,然后分發(fā)出去,分布式到不同的worker上去執(zhí)行。邏輯是沒有問題的,但是實(shí)際上性能非常的差。結(jié)合網(wǎng)格計算原理,我們把上面的代碼繪制成下面這幅圖:

Schedulerx2.0分布式計算原理及最佳實(shí)踐是怎么樣的

如上圖所示,root任務(wù)一開始會全量的讀取A表的數(shù)據(jù),然后會全量的存到h3中,pull線程還會全量的從h3讀取一次所有的task,還會分發(fā)給所有客戶端。所以實(shí)際上對A表中的數(shù)據(jù):

  • 全量讀2次

  • 全量寫一次

  • 全量傳輸一次

這個效率是非常低的。

6.3 正面案例

下面給出正面案例的代碼:

public class ScanSingleTableJobProcessor extends MapJobProcessor {
    private static final int pageSize = 100;

    static class PageTask {
        private int startId;
        private int endId;
        public PageTask(int startId, int endId) {
             this.startId = startId;
             this.endId = endId;
        }
        public int getStartId() {
              return startId;
        }
        public int getEndId() {
              return endId;
        }
    }

    @Override
    public ProcessResult process(JobContext context) {
        String taskName = context.getTaskName();
        Object task = context.getTask();
        if (taskName.equals(WorkerConstants.MAP_TASK_ROOT_NAME)) {
            System.out.println("start root task");
            Pair<Integer, Integer> idPair = queryMinAndMaxId();
            int minId = idPair.getFirst();
            int maxId = idPair.getSecond();
            List<PageTask> taskList = Lists.newArrayList();
            int step = (int) ((maxId - minId) / pageSize); //計算分頁數(shù)量
            for (int i = minId; i < maxId; i+=step) {
                taskList.add(new PageTask(i, (i+step > maxId ? maxId : i+step)));
            }
            return map(taskList, "Level1Dispatch");
        } else if (taskName.equals("Level1Dispatch")) {
            PageTask record = (PageTask)task;
            long startId = record.getStartId();
            long endId = record.getEndId();
            //TODO
            return new ProcessResult(true);
        }
        return new ProcessResult(true);
    }

    @Override
    public void postProcess(JobContext context) {
        //TODO
        System.out.println("all tasks is finished.");
    }

    private Pair<Integer, Integer> queryMinAndMaxId() {
        //TODO select min(id),max(id) from xxx
        return null;
    }
}

如上面的代碼所示,

  • 每個task不是整行記錄的record,而是PageTask,里面就2個字段,startId和endId。

  • root任務(wù),沒有全量的讀取A表,而是讀一下整張表的minId和maxId,然后構(gòu)造PageTask進(jìn)行分頁。比如task1表示PageTask[1,1000],task2表示PageTask[1001,2000]。每個task處理A表不同的數(shù)據(jù)。

  • 在下一級task中,如果拿到的是PageTask,再根據(jù)id區(qū)間去A表處理數(shù)據(jù)。

根據(jù)上面的代碼和網(wǎng)格計算原理,得出下面這幅圖:

Schedulerx2.0分布式計算原理及最佳實(shí)踐是怎么樣的

如上圖所示,

  • A表只需要全量讀取一次。

  • 子任務(wù)數(shù)量比反面案例少了上千、上萬倍。

  • 子任務(wù)的body非常小,如果recod中有大字段,也少了上千、上萬倍。

綜上,對A表訪問次數(shù)少了好幾倍,對h3存儲壓力少了上萬倍,不但執(zhí)行速度可以快很多,還保證不會把自己本地的h3數(shù)據(jù)庫搞掛。

上述內(nèi)容就是Schedulerx2.0分布式計算原理及最佳實(shí)踐是怎么樣的,你們學(xué)到知識或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識儲備,歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI