您好,登錄后才能下訂單哦!
本篇內(nèi)容主要講解“如何理解分布式調(diào)度框架Elastic-job”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學(xué)習(xí)“如何理解分布式調(diào)度框架Elastic-job”吧!
一、介紹
之所以產(chǎn)生這樣的結(jié)果,是因為 Quartz 在分布式集群環(huán)境下是通過數(shù)據(jù)庫鎖方式來實現(xiàn)有且只有一個有效的服務(wù)節(jié)點來運行服務(wù),從而保證服務(wù)在集群環(huán)境下定時任務(wù)不會被重復(fù)調(diào)用!
如果需要運行的定時任務(wù)很少的話,使用 Quartz 不會有太大的問題,但是如果 現(xiàn)在有這么一個需求,例如理財產(chǎn)品,每天6點系統(tǒng)需要計算每個賬戶昨天的收益,假如這個理財產(chǎn)品,有幾個億的用戶,如果都在一個服務(wù)實例上跑,可能第二天都無法處理完這項任務(wù)!
類似這樣場景還有很多很多,很顯然 Quartz 很難滿足我們這種大批量、任務(wù)執(zhí)行周期長的任務(wù)調(diào)度!
因此短板,當(dāng)當(dāng)網(wǎng)基于 Quartz 開發(fā)了一套適合在分布式環(huán)境下能高效率的使用服務(wù)器資源的 Elastic-Job 定時任務(wù)框架!
Elastic-Job-Lite最大的亮點就是支持彈性擴容縮容,怎么實現(xiàn)的呢?
比如現(xiàn)在有個任務(wù)要執(zhí)行,如果將任務(wù)進行分片成10個,那么可以同時在10個服務(wù)實例上并行執(zhí)行,互相不影響,從而大大的提升了任務(wù)執(zhí)行效率,并且充分的利用服務(wù)器資源!
對于上面的理財產(chǎn)品,如果這個任務(wù)需要處理1個億用戶,那么我們可以通過水平擴展,比如對任務(wù)進行分片為500,讓500個服務(wù)實例同時運行,每個服務(wù)實例處理20萬條數(shù)據(jù),不出意外的話,1 - 2個小時可以全部跑完,如果時間還是很長,還可以繼續(xù)水平擴張,添加服務(wù)實例來運行!
2015 年,當(dāng)當(dāng)網(wǎng)將其開源,瞬間吸引了一大批程序員的關(guān)注,同時登頂開源中國第一名!
下面我們就一起來了解一下這款使用非常廣泛的分布式調(diào)度框架。
二、項目架構(gòu)介紹
Elastic-Job 最開始只有一個 elastic-job-core 的項目,定位輕量級、無中心化,最核心的服務(wù)就是支持彈性擴容和數(shù)據(jù)分片!
從 2.X 版本以后,主要分為 Elastic-Job-Lite 和 Elastic-Job-Cloud 兩個子項目。
其中,Elastic-Job-Lite 定位為輕量級 無 中 心 化 解 決 方 案 , 使 用jar 包 的 形 式 提 供 分 布 式 任 務(wù) 的 協(xié) 調(diào) 服 務(wù) 。
而 Elastic-Job-Cloud 使用 Mesos + Docker 的解決方案,額外提供資源治理、應(yīng)用分發(fā)以及進程隔離等服務(wù)(跟 Lite 的區(qū)別只是部署方式不同,他們使用相同的 API,只要開發(fā)一次)。
今天我們主要介紹的是Elastic-Job-Lite,最主要的功能特性如下:
分布式調(diào)度協(xié)調(diào):采用 zookeeper 實現(xiàn)注冊中心,進行統(tǒng)一調(diào)度。
支持任務(wù)分片:將需要執(zhí)行的任務(wù)進行分片,實現(xiàn)并行調(diào)度。
支持彈性擴容縮容:將任務(wù)拆分為 n 個任務(wù)項后,各個服務(wù)器分別執(zhí)行各自分配到的任務(wù)項。一旦有新的服務(wù)器加入集群,或現(xiàn)有服務(wù)器下線,elastic-job 將在保留本次任務(wù)執(zhí)行不變的情況下,下次任務(wù)開始前觸發(fā)任務(wù)重分片。
當(dāng)然,還有失效轉(zhuǎn)移、錯過執(zhí)行作業(yè)重觸發(fā)等等功能,大家可以訪問官網(wǎng)文檔,以獲取更多詳細(xì)資料。
應(yīng)用在各自的節(jié)點執(zhí)行任務(wù),通過 zookeeper 注冊中心協(xié)調(diào)。節(jié)點注冊、節(jié)點選舉、任務(wù)分片、監(jiān)聽都在 E-Job 的代碼中完成。下圖是官網(wǎng)提供得架構(gòu)圖。
啥也不用多說了,下面我們直接通過實踐介紹,更容易了解里面是怎么玩的!
三、應(yīng)用實踐
3.1、zookeeper 安裝
elastic-job-lite,是直接依賴 zookeeper 的,因此在開發(fā)之前我們需要先準(zhǔn)備好對應(yīng)的 zookeeper 環(huán)境,關(guān)于 zookeeper 的安裝過程,就不多說了,非常簡單,網(wǎng)上都有教程!
3.2、elastic-job-lite-console 安裝
elastic-job-lite-console,主要是一個任務(wù)作業(yè)可視化界面管理系統(tǒng)。
可以單獨部署,與平臺不關(guān),主要是通過配置注冊中心和數(shù)據(jù)源來抓取數(shù)據(jù)。
獲取的方式也很簡單,直接訪問https://github.com/apache/shardingsphere-elasticjob地址,然后切換到2.1.5的版本號,然后執(zhí)行mvn clean install進行打包,獲取對應(yīng)的安裝包將其解壓,進行bin文件夾啟動服務(wù)即可!
如果你的網(wǎng)速像蝸牛一樣的慢,還有一個辦法就是從這個地址https://gitee.com/elasticjob/elastic-job獲取對應(yīng)的源碼!
啟動服務(wù)后,在瀏覽器訪問http://127.0.0.1:8899,輸入賬戶、密碼(都是root)即可進入控制臺頁面,類似如下界面!
進入之后,將上文所在的 zookeeper 注冊中心進行配置,包括數(shù)據(jù)庫 mysql 的數(shù)據(jù)源也可以配置一下!
3.3、創(chuàng)建工程
本文采用springboot來搭建工程為例,創(chuàng)建工程并添加elastic-job-lite依賴!
<!-- 引入elastic-job-lite核心模塊 --> <dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-core</artifactId> <version>2.1.5</version> </dependency> <!-- 使用springframework自定義命名空間時引入 --> <dependency> <groupId>com.dangdang</groupId> <artifactId>elastic-job-lite-spring</artifactId> <version>2.1.5</version> </dependency>
在配置文件application.properties中提前配置好 zookeeper 注冊中心相關(guān)信息!
#zookeeper config zookeeper.serverList=127.0.0.1:2181 zookeeper.namespace=example-elastic-job-test
3.4、新建 ZookeeperConfig 配置類
@Configuration @ConditionalOnExpression("'${zookeeper.serverList}'.length() > 0") public class ZookeeperConfig { /** * zookeeper 配置 * @return */ @Bean(initMethod = "init") public ZookeeperRegistryCenter zookeeperRegistryCenter(@Value("${zookeeper.serverList}") String serverList, @Value("${zookeeper.namespace}") String namespace){ return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList,namespace)); } }
3.5、新建任務(wù)處理類
elastic-job支持三種類型的作業(yè)任務(wù)處理!
Simple 類型作業(yè):Simple 類型用于一般任務(wù)的處理,只需實現(xiàn)SimpleJob接口。該接口僅提供單一方法用于覆蓋,此方法將定時執(zhí)行,與Quartz原生接口相似。
Dataflow 類型作業(yè):Dataflow 類型用于處理數(shù)據(jù)流,需實現(xiàn)DataflowJob接口。該接口提供2個方法可供覆蓋,分別用于抓取(fetchData)和處理(processData)數(shù)據(jù)。
Script類型作業(yè):Script 類型作業(yè)意為腳本類型作業(yè),支持 shell,python,perl等所有類型腳本。只需通過控制臺或代碼配置 scriptCommandLine 即可,無需編碼。執(zhí)行腳本路徑可包含參數(shù),參數(shù)傳遞完畢后,作業(yè)框架會自動追加最后一個參數(shù)為作業(yè)運行時信息。
3.6、新建 Simple 類型作業(yè)
編寫一個SimpleJob接口的實現(xiàn)類MySimpleJob,當(dāng)前工作主要是打印一條日志。
@Slf4j public class MySimpleJob implements SimpleJob { @Override public void execute(ShardingContext shardingContext) { log.info(String.format("Thread ID: %s, 作業(yè)分片總數(shù): %s, " + "當(dāng)前分片項: %s.當(dāng)前參數(shù): %s," + "作業(yè)名稱: %s.作業(yè)自定義參數(shù): %s" , Thread.currentThread().getId(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem(), shardingContext.getShardingParameter(), shardingContext.getJobName(), shardingContext.getJobParameter() )); } }
創(chuàng)建一個MyElasticJobListener任務(wù)監(jiān)聽器,用于監(jiān)聽MySimpleJob的任務(wù)執(zhí)行情況。
@Slf4j public class MyElasticJobListener implements ElasticJobListener { private long beginTime = 0; @Override public void beforeJobExecuted(ShardingContexts shardingContexts) { beginTime = System.currentTimeMillis(); log.info("===>{} MyElasticJobListener BEGIN TIME: {} <===",shardingContexts.getJobName(), DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss")); } @Override public void afterJobExecuted(ShardingContexts shardingContexts) { long endTime = System.currentTimeMillis(); log.info("===>{} MyElasticJobListener END TIME: {},TOTAL CAST: {} <===",shardingContexts.getJobName(), DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss"), endTime - beginTime); } }
創(chuàng)建一個MySimpleJobConfig類,將MySimpleJob其注入到zookeeper。
@Configuration public class MySimpleJobConfig { /** * 任務(wù)名稱 */ @Value("${simpleJob.mySimpleJob.name}") private String mySimpleJobName; /** * cron表達式 */ @Value("${simpleJob.mySimpleJob.cron}") private String mySimpleJobCron; /** * 作業(yè)分片總數(shù) */ @Value("${simpleJob.mySimpleJob.shardingTotalCount}") private int mySimpleJobShardingTotalCount; /** * 作業(yè)分片參數(shù) */ @Value("${simpleJob.mySimpleJob.shardingItemParameters}") private String mySimpleJobShardingItemParameters; /** * 自定義參數(shù) */ @Value("${simpleJob.mySimpleJob.jobParameters}") private String mySimpleJobParameters; @Autowired private ZookeeperRegistryCenter registryCenter; @Bean public MySimpleJob mySimpleJob() { return new MySimpleJob(); } @Bean(initMethod = "init") public JobScheduler simpleJobScheduler(final MySimpleJob mySimpleJob) { //配置任務(wù)監(jiān)聽器 MyElasticJobListener elasticJobListener = new MyElasticJobListener(); return new SpringJobScheduler(mySimpleJob, registryCenter, getLiteJobConfiguration(), elasticJobListener); } private LiteJobConfiguration getLiteJobConfiguration() { // 定義作業(yè)核心配置 JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder(mySimpleJobName, mySimpleJobCron, mySimpleJobShardingTotalCount). shardingItemParameters(mySimpleJobShardingItemParameters).jobParameter(mySimpleJobParameters).build(); // 定義SIMPLE類型配置 SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, MySimpleJob.class.getCanonicalName()); // 定義Lite作業(yè)根配置 LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build(); return simpleJobRootConfig; } }
在配置文件application.properties中配置好對應(yīng)的mySimpleJob參數(shù)!
#elastic job #simpleJob類型的job simpleJob.mySimpleJob.name=mySimpleJob simpleJob.mySimpleJob.cron=0/15 * * * * ? simpleJob.mySimpleJob.shardingTotalCount=3 simpleJob.mySimpleJob.shardingItemParameters=0=a,1=b,2=c simpleJob.mySimpleJob.jobParameters=helloWorld
運行程序,看看效果如何?
在上圖demo中,配置的分片數(shù)為3,這個時候會有3個線程進行同時執(zhí)行任務(wù),因為都是在一臺機器上執(zhí)行的,這個任務(wù)被執(zhí)行來3次,下面修改一下端口配置,創(chuàng)建三個相同的服務(wù)實例,在看看效果如下:
很清晰的看到任務(wù)被執(zhí)行一次!
3.7、新建 DataFlowJob 類型作業(yè)
DataFlowJob 類型的任務(wù)配置和SimpleJob類似,操作也很簡單!
創(chuàng)建一個DataflowJob類型的實現(xiàn)類MyDataFlowJob。
@Slf4j public class MyDataFlowJob implements DataflowJob<String> { private boolean flag = false; @Override public List<String> fetchData(ShardingContext shardingContext) { log.info("開始獲取數(shù)據(jù)"); if (flag) { return null; } return Arrays.asList("qingshan", "jack", "seven"); } @Override public void processData(ShardingContext shardingContext, List<String> data) { for (String val : data) { // 處理完數(shù)據(jù)要移除掉,不然就會一直跑,處理可以在上面的方法里執(zhí)行。這里采用 flag log.info("開始處理數(shù)據(jù):" + val); } flag = true; } }
接著創(chuàng)建MyDataFlowJob的配置類,將其注入到zookeeper注冊中心。
Configuration public class MyDataFlowJobConfig { /** * 任務(wù)名稱 */ @Value("${dataflowJob.myDataflowJob.name}") private String jobName; /** * cron表達式 */ @Value("${dataflowJob.myDataflowJob.cron}") private String jobCron; /** * 作業(yè)分片總數(shù) */ @Value("${dataflowJob.myDataflowJob.shardingTotalCount}") private int jobShardingTotalCount; /** * 作業(yè)分片參數(shù) */ @Value("${dataflowJob.myDataflowJob.shardingItemParameters}") private String jobShardingItemParameters; /** * 自定義參數(shù) */ @Value("${dataflowJob.myDataflowJob.jobParameters}") private String jobParameters; @Autowired private ZookeeperRegistryCenter registryCenter; @Bean public MyDataFlowJob myDataFlowJob() { return new MyDataFlowJob(); } @Bean(initMethod = "init") public JobScheduler dataFlowJobScheduler(final MyDataFlowJob myDataFlowJob) { MyElasticJobListener elasticJobListener = new MyElasticJobListener(); return new SpringJobScheduler(myDataFlowJob, registryCenter, getLiteJobConfiguration(), elasticJobListener); } private LiteJobConfiguration getLiteJobConfiguration() { // 定義作業(yè)核心配置 JobCoreConfiguration dataflowCoreConfig = JobCoreConfiguration.newBuilder(jobName, jobCron, jobShardingTotalCount). shardingItemParameters(jobShardingItemParameters).jobParameter(jobParameters).build(); // 定義DATAFLOW類型配置 DataflowJobConfiguration dataflowJobConfig = new DataflowJobConfiguration(dataflowCoreConfig, MyDataFlowJob.class.getCanonicalName(), false); // 定義Lite作業(yè)根配置 LiteJobConfiguration dataflowJobRootConfig = LiteJobConfiguration.newBuilder(dataflowJobConfig).overwrite(true).build(); return dataflowJobRootConfig; } }
最后,在配置文件application.properties中配置好對應(yīng)的myDataflowJob參數(shù)!
#dataflow類型的job dataflowJob.myDataflowJob.name=myDataflowJob dataflowJob.myDataflowJob.cron=0/15 * * * * ? dataflowJob.myDataflowJob.shardingTotalCount=1 dataflowJob.myDataflowJob.shardingItemParameters=0=a,1=b,2=c dataflowJob.myDataflowJob.jobParameters=myDataflowJobParamter
運行程序,看看效果如何?
需要注意的地方是,如果配置的是流式處理類型,它會不停的拉取數(shù)據(jù)、處理數(shù)據(jù),在拉取的時候,如果返回為空,就不會處理數(shù)據(jù)!
如果配置的是非流式處理類型,和上面介紹的simpleJob類型,處理一樣!
3.8、新建 ScriptJob 類型作業(yè)
ScriptJob 類型的任務(wù)配置和上面類似,主要是用于定時執(zhí)行某個腳本,一般用的比較少!
因為目標(biāo)是腳本,沒有執(zhí)行的任務(wù),所以無需編寫任務(wù)作業(yè)類型!
只需要編寫一個ScriptJob類型的配置類即可,命令是echo 'Hello World !內(nèi)容!
@Configuration public class MyScriptJobConfig { /** * 任務(wù)名稱 */ @Value("${scriptJob.myScriptJob.name}") private String jobName; /** * cron表達式 */ @Value("${scriptJob.myScriptJob.cron}") private String jobCron; /** * 作業(yè)分片總數(shù) */ @Value("${scriptJob.myScriptJob.shardingTotalCount}") private int jobShardingTotalCount; /** * 作業(yè)分片參數(shù) */ @Value("${scriptJob.myScriptJob.shardingItemParameters}") private String jobShardingItemParameters; /** * 自定義參數(shù) */ @Value("${scriptJob.myScriptJob.jobParameters}") private String jobParameters; @Autowired private ZookeeperRegistryCenter registryCenter; @Bean(initMethod = "init") public JobScheduler scriptJobScheduler() { MyElasticJobListener elasticJobListener = new MyElasticJobListener(); return new JobScheduler(registryCenter, getLiteJobConfiguration(), elasticJobListener); } private LiteJobConfiguration getLiteJobConfiguration() { // 定義作業(yè)核心配置 JobCoreConfiguration scriptCoreConfig = JobCoreConfiguration.newBuilder(jobName, jobCron, jobShardingTotalCount). shardingItemParameters(jobShardingItemParameters).jobParameter(jobParameters).build(); // 定義SCRIPT類型配置 ScriptJobConfiguration scriptJobConfig = new ScriptJobConfiguration(scriptCoreConfig, "echo 'Hello World !'"); // 定義Lite作業(yè)根配置 LiteJobConfiguration scriptJobRootConfig = LiteJobConfiguration.newBuilder(scriptJobConfig).overwrite(true).build(); return scriptJobRootConfig; } }
在配置文件application.properties中配置好對應(yīng)的myScriptJob參數(shù)!
#script類型的job scriptJob.myScriptJob.name=myScriptJob scriptJob.myScriptJob.cron=0/15 * * * * ? scriptJob.myScriptJob.shardingTotalCount=3 scriptJob.myScriptJob.shardingItemParameters=0=a,1=b,2=c scriptJob.myScriptJob.jobParameters=myScriptJobParamter
運行程序,看看效果如何?
3.9、將任務(wù)狀態(tài)持久化到數(shù)據(jù)庫
可能有的人會發(fā)出疑問,elastic-job是如何存儲數(shù)據(jù)的,用ZooInspector客戶端鏈接zookeeper注冊中心,你發(fā)現(xiàn)對應(yīng)的任務(wù)配置被存儲到相應(yīng)的樹根上!
而具體作業(yè)任務(wù)執(zhí)行軌跡和狀態(tài)結(jié)果是不會存儲到zookeeper,需要我們在項目中通過數(shù)據(jù)源方式進行持久化!
將任務(wù)狀態(tài)持久化到數(shù)據(jù)庫配置過程也很簡單,只需要在對應(yīng)的配置類上注入數(shù)據(jù)源即可,以MySimpleJobConfig為例,代碼如下:
@Configuration public class MySimpleJobConfig { /** * 任務(wù)名稱 */ @Value("${simpleJob.mySimpleJob.name}") private String mySimpleJobName; /** * cron表達式 */ @Value("${simpleJob.mySimpleJob.cron}") private String mySimpleJobCron; /** * 作業(yè)分片總數(shù) */ @Value("${simpleJob.mySimpleJob.shardingTotalCount}") private int mySimpleJobShardingTotalCount; /** * 作業(yè)分片參數(shù) */ @Value("${simpleJob.mySimpleJob.shardingItemParameters}") private String mySimpleJobShardingItemParameters; /** * 自定義參數(shù) */ @Value("${simpleJob.mySimpleJob.jobParameters}") private String mySimpleJobParameters; @Autowired private ZookeeperRegistryCenter registryCenter; @Autowired private DataSource dataSource;; @Bean public MySimpleJob stockJob() { return new MySimpleJob(); } @Bean(initMethod = "init") public JobScheduler simpleJobScheduler(final MySimpleJob mySimpleJob) { //添加事件數(shù)據(jù)源配置 JobEventConfiguration jobEventConfig = new JobEventRdbConfiguration(dataSource); MyElasticJobListener elasticJobListener = new MyElasticJobListener(); return new SpringJobScheduler(mySimpleJob, registryCenter, getLiteJobConfiguration(), jobEventConfig, elasticJobListener); } private LiteJobConfiguration getLiteJobConfiguration() { // 定義作業(yè)核心配置 JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder(mySimpleJobName, mySimpleJobCron, mySimpleJobShardingTotalCount). shardingItemParameters(mySimpleJobShardingItemParameters).jobParameter(mySimpleJobParameters).build(); // 定義SIMPLE類型配置 SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, MySimpleJob.class.getCanonicalName()); // 定義Lite作業(yè)根配置 LiteJobConfiguration simpleJobRootConfig = LiteJobConfiguration.newBuilder(simpleJobConfig).overwrite(true).build(); return simpleJobRootConfig; } }
同時,需要在配置文件application.properties中配置好對應(yīng)的datasource參數(shù)!
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/example-elastic-job-test spring.datasource.username=root spring.datasource.password=root spring.datasource.driver-class-name=com.mysql.jdbc.Driver
運行程序,然后在elastic-job-lite-console控制臺配置對應(yīng)的數(shù)據(jù)源!
最后,點擊【作業(yè)軌跡】即可查看對應(yīng)作業(yè)執(zhí)行情況!
到此,相信大家對“如何理解分布式調(diào)度框架Elastic-job”有了更深的了解,不妨來實際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進入相關(guān)頻道進行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。