您好,登錄后才能下訂單哦!
在對調(diào)度系統(tǒng)架構(gòu)說明之前,我們先來認識一下調(diào)度系統(tǒng)常用的名詞
DAG: 全稱Directed Acyclic Graph,簡稱DAG。工作流中的Task任務以有向無環(huán)圖的形式組裝起來,從入度為零的節(jié)點進行拓撲遍歷,直到無后繼節(jié)點為止。舉例如下圖:
流程定義:通過拖拽任務節(jié)點并建立任務節(jié)點的關(guān)聯(lián)所形成的可視化DAG
流程實例:流程實例是流程定義的實例化,可以通過手動啟動或定時調(diào)度生成
任務實例:任務實例是流程定義中任務節(jié)點的實例化,標識著具體的任務執(zhí)行狀態(tài)
任務類型: 目前支持有SHELL、SQL、SUB_PROCESS、PROCEDURE、MR、SPARK、PYTHON、DEPENDENT,同時計劃支持動態(tài)插件擴展,注意:其中子 SUB_PROCESS 也是一個單獨的流程定義,是可以單獨啟動執(zhí)行的
調(diào)度方式: 系統(tǒng)支持基于cron表達式的定時調(diào)度和手動調(diào)度。命令類型支持:啟動工作流、從當前節(jié)點開始執(zhí)行、恢復被容錯的工作流、恢復暫停流程、從失敗節(jié)點開始執(zhí)行、補數(shù)、調(diào)度、重跑、暫停、停止、恢復等待線程。其中 恢復被容錯的工作流 和 恢復等待線程 兩種命令類型是由調(diào)度內(nèi)部控制使用,外部無法調(diào)用
定時調(diào)度:系統(tǒng)采用 quartz 分布式調(diào)度器,并同時支持cron表達式可視化的生成
依賴:系統(tǒng)不單單支持 DAG 簡單的前驅(qū)和后繼節(jié)點之間的依賴,同時還提供任務依賴節(jié)點,支持流程間的自定義任務依賴
優(yōu)先級 :支持流程實例和任務實例的優(yōu)先級,如果流程實例和任務實例的優(yōu)先級不設置,則默認是先進先出
郵件告警:支持 SQL任務 查詢結(jié)果郵件發(fā)送,流程實例運行結(jié)果郵件告警及容錯告警通知
失敗策略:對于并行運行的任務,如果有任務失敗,提供兩種失敗策略處理方式,繼續(xù)是指不管并行運行任務的狀態(tài),直到流程失敗結(jié)束。結(jié)束是指一旦發(fā)現(xiàn)失敗任務,則同時Kill掉正在運行的并行任務,流程失敗結(jié)束
補數(shù):補歷史數(shù)據(jù),支持區(qū)間并行和串行兩種補數(shù)方式
MasterServer
MasterServer采用分布式無中心設計理念,MasterServer主要負責 DAG 任務切分、任務提交監(jiān)控,并同時監(jiān)聽其它MasterServer和WorkerServer的健康狀態(tài)。
MasterServer服務啟動時向Zookeeper注冊臨時節(jié)點,通過監(jiān)聽Zookeeper臨時節(jié)點變化來進行容錯處理。
Distributed Quartz分布式調(diào)度組件,主要負責定時任務的啟停操作,當quartz調(diào)起任務后,Master內(nèi)部會有線程池具體負責處理任務的后續(xù)操作
MasterSchedulerThread是一個掃描線程,定時掃描數(shù)據(jù)庫中的 command 表,根據(jù)不同的命令類型進行不同的業(yè)務操作
MasterExecThread主要是負責DAG任務切分、任務提交監(jiān)控、各種不同命令類型的邏輯處理
WorkerServer
WorkerServer也采用分布式無中心設計理念,WorkerServer主要負責任務的執(zhí)行和提供日志服務。WorkerServer服務啟動時向Zookeeper注冊臨時節(jié)點,并維持心跳。
FetchTaskThread主要負責不斷從Task Queue中領取任務,并根據(jù)不同任務類型調(diào)用TaskScheduleThread對應執(zhí)行器。
ZooKeeper
ZooKeeper服務,系統(tǒng)中的MasterServer和WorkerServer節(jié)點都通過ZooKeeper來進行集群管理和容錯。另外系統(tǒng)還基于ZooKeeper進行事件監(jiān)聽和分布式鎖。
我們也曾經(jīng)基于Redis實現(xiàn)過隊列,不過我們希望EasyScheduler依賴到的組件盡量地少,所以最后還是去掉了Redis實現(xiàn)。
Task Queue
提供任務隊列的操作,目前隊列也是基于Zookeeper來實現(xiàn)。由于隊列中存的信息較少,不必擔心隊列里數(shù)據(jù)過多的情況,實際上我們壓測過百萬級數(shù)據(jù)存隊列,對系統(tǒng)穩(wěn)定性和性能沒影響。
Alert
提供告警相關(guān)接口,接口主要包括告警兩種類型的告警數(shù)據(jù)的存儲、查詢和通知功能。其中通知功能又有郵件通知和SNMP(暫未實現(xiàn))兩種。
API
API接口層,主要負責處理前端UI層的請求。該服務統(tǒng)一提供RESTful api向外部提供請求服務。
接口包括工作流的創(chuàng)建、定義、查詢、修改、發(fā)布、下線、手工啟動、停止、暫停、恢復、從該節(jié)點開始執(zhí)行等等。
UI
系統(tǒng)的前端頁面,提供系統(tǒng)的各種可視化操作界面,詳見系統(tǒng)使用手冊部分。
中心化的設計理念比較簡單,分布式集群中的節(jié)點按照角色分工,大體上分為兩種角色:
中心化思想設計存在的問題:
實際上,真正去中心化的分布式系統(tǒng)并不多見。反而動態(tài)中心化分布式系統(tǒng)正在不斷涌出。在這種架構(gòu)下,集群中的管理者是被動態(tài)選擇出來的,而不是預置的,并且集群在發(fā)生故障的時候,集群的節(jié)點會自發(fā)的舉行"會議"來選舉新的"管理者"去主持工作。最典型的案例就是ZooKeeper及Go語言實現(xiàn)的Etcd。
EasyScheduler使用ZooKeeper分布式鎖來實現(xiàn)同一時刻只有一臺Master執(zhí)行Scheduler,或者只有一臺Worker執(zhí)行任務的提交。
上圖中MainFlowThread等待SubFlowThread1結(jié)束,SubFlowThread1等待SubFlowThread2結(jié)束, SubFlowThread2等待SubFlowThread3結(jié)束,而SubFlowThread3等待線程池有新線程,則整個DAG流程不能結(jié)束,從而其中的線程也不能釋放。這樣就形成的子父流程循環(huán)等待的狀態(tài)。此時除非啟動新的Master來增加線程來打破這樣的”僵局”,否則調(diào)度集群將不能再使用。
對于啟動新Master來打破僵局,似乎有點差強人意,于是我們提出了以下三種方案來降低這種風險:
注意:Master Scheduler線程在獲取Command的時候是FIFO的方式執(zhí)行的。
于是我們選擇了第三種方式來解決線程不足的問題。
容錯分為服務宕機容錯和任務重試,服務宕機容錯又分為Master容錯和Worker容錯兩種情況
服務容錯設計依賴于ZooKeeper的Watcher機制,實現(xiàn)原理如圖:
其中Master監(jiān)控其他Master和Worker的目錄,如果監(jiān)聽到remove事件,則會根據(jù)具體的業(yè)務邏輯進行流程實例容錯或者任務實例容錯。
ZooKeeper Master容錯完成之后則重新由EasyScheduler中Scheduler線程調(diào)度,遍歷 DAG 找到”正在運行”和“提交成功”的任務,對”正在運行”的任務監(jiān)控其任務實例的狀態(tài),對”提交成功”的任務需要判斷Task Queue中是否已經(jīng)存在,如果存在則同樣監(jiān)控任務實例的狀態(tài),如果不存在則重新提交任務實例。
Master Scheduler線程一旦發(fā)現(xiàn)任務實例為” 需要容錯”狀態(tài),則接管任務并進行重新提交。
注意:由于” 網(wǎng)絡抖動”可能會使得節(jié)點短時間內(nèi)失去和ZooKeeper的心跳,從而發(fā)生節(jié)點的remove事件。對于這種情況,我們使用最簡單的方式,那就是節(jié)點一旦和ZooKeeper發(fā)生超時連接,則直接將Master或Worker服務停掉。
這里首先要區(qū)分任務失敗重試、流程失敗恢復、流程失敗重跑的概念:
接下來說正題,我們將工作流中的任務節(jié)點分了兩種類型。
一種是業(yè)務節(jié)點,這種節(jié)點都對應一個實際的腳本或者處理語句,比如Shell節(jié)點,MR節(jié)點、Spark節(jié)點、依賴節(jié)點等。
每一個業(yè)務節(jié)點都可以配置失敗重試的次數(shù),當該任務節(jié)點失敗,會自動重試,直到成功或者超過配置的重試次數(shù)。邏輯節(jié)點不支持失敗重試。但是邏輯節(jié)點里的任務支持重試。
如果工作流中有任務失敗達到最大重試次數(shù),工作流就會失敗停止,失敗的工作流可以手動進行重跑操作或者流程恢復操作
在早期調(diào)度設計中,如果沒有優(yōu)先級設計,采用公平調(diào)度設計的話,會遇到先行提交的任務可能會和后繼提交的任務同時完成的情況,而不能做到設置流程或者任務的優(yōu)先級,因此我們對此進行了重新設計,目前我們設計如下:
按照不同流程實例優(yōu)先級優(yōu)先于同一個流程實例優(yōu)先級優(yōu)先于同一流程內(nèi)任務優(yōu)先級優(yōu)先于同一流程內(nèi)任務提交順序依次從高到低進行任務處理。
具體實現(xiàn)是根據(jù)任務實例的json解析優(yōu)先級,然后把流程實例優(yōu)先級_流程實例id_任務優(yōu)先級_任務id信息保存在ZooKeeper任務隊列中,當從任務隊列獲取的時候,通過字符串比較即可得出最需要優(yōu)先執(zhí)行的任務
- 任務的優(yōu)先級也分為5級,依次為HIGHEST、HIGH、MEDIUM、LOW、LOWEST。如下圖
由于Web(UI)和Worker不一定在同一臺機器上,所以查看日志不能像查詢本地文件那樣。有兩種方案:
FileAppender主要實現(xiàn)如下:
/**
* task log appender
*/
public class TaskLogAppender extends FileAppender<ILoggingEvent {
...
@Override
protected void append(ILoggingEvent event) {
if (currentlyActiveFile == null){
currentlyActiveFile = getFile();
}
String activeFile = currentlyActiveFile;
// thread name: taskThreadName-processDefineId_processInstanceId_taskInstanceId
String threadName = event.getThreadName();
String[] threadNameArr = threadName.split("-");
// logId = processDefineId_processInstanceId_taskInstanceId
String logId = threadNameArr[1];
...
super.subAppend(event);
}
}
以/流程定義id/流程實例id/任務實例id.log的形式生成日志
過濾匹配以TaskLogInfo開始的線程名稱:
TaskLogFilter實現(xiàn)如下:
/**
* task log filter
*/
public class TaskLogFilter extends Filter<ILoggingEvent {
@Override
public FilterReply decide(ILoggingEvent event) {
if (event.getThreadName().startsWith("TaskLogInfo-")){
return FilterReply.ACCEPT;
}
return FilterReply.DENY;
}
}
本文從調(diào)度出發(fā),初步介紹了大數(shù)據(jù)分布式工作流調(diào)度系統(tǒng)--EasyScheduler的架構(gòu)原理及實現(xiàn)思路。
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。