溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

SpringBoot2整合ElasticJob框架過(guò)程詳解

發(fā)布時(shí)間:2020-09-01 19:33:55 來(lái)源:腳本之家 閱讀:214 作者:我是小白1 欄目:編程語(yǔ)言

一、ElasticJob

簡(jiǎn)介

1、定時(shí)任務(wù)

在前面的文章中,說(shuō)過(guò)QuartJob這個(gè)定時(shí)任務(wù),被廣泛應(yīng)用的定時(shí)任務(wù)標(biāo)準(zhǔn)。但Quartz核心點(diǎn)在于執(zhí)行定時(shí)任務(wù)并不是在于關(guān)注的業(yè)務(wù)模式和場(chǎng)景,缺少高度自定義的功能。Quartz能夠基于數(shù)據(jù)庫(kù)實(shí)現(xiàn)任務(wù)的高可用,但是不具備分布式并行調(diào)度的功能。

-> QuartJob定時(shí)任務(wù)

2、ElasticJob說(shuō)明基礎(chǔ)簡(jiǎn)介

Elastic-Job 是一個(gè)開(kāi)源的分布式調(diào)度中間件,由兩個(gè)相互獨(dú)立的子項(xiàng)目 Elastic-Job-Lite 和 Elastic-Job-Cloud 組成。Elastic-Job-Lite 為輕量級(jí)無(wú)中心化解決方案,使用 jar 包提供分布式任務(wù)的調(diào)度和治理。 Elastic-Job-Cloud 是一個(gè) Mesos Framework,依托于Mesos額外提供資源治理、應(yīng)用分發(fā)以及進(jìn)程隔離等服務(wù)。

功能特點(diǎn)

  • 分布式調(diào)度
  • 協(xié)調(diào)彈性擴(kuò)容縮容
  • 失效轉(zhuǎn)移
  • 錯(cuò)過(guò)執(zhí)行
  • 作業(yè)重觸發(fā)作業(yè)分片一致性,保證同一分片在分布式環(huán)境中僅一個(gè)執(zhí)行實(shí)例

補(bǔ)刀:人家官網(wǎng)這樣描述的,這里贅述一下,充實(shí)一下文章。

基礎(chǔ)框架結(jié)構(gòu)

該圖片來(lái)自ElasticJob官網(wǎng)。

SpringBoot2整合ElasticJob框架過(guò)程詳解

由圖可知如下內(nèi)容:

需要Zookeeper組件支持,作為分布式的調(diào)度任務(wù),有良好的監(jiān)聽(tīng)機(jī)制,和控制臺(tái),下面的案例也就沖這個(gè)圖解來(lái)。

3、分片管理

這個(gè)概念在ElasticJob中是最具有特點(diǎn)的,實(shí)用性極好。

分片概念

任務(wù)的分布式執(zhí)行,需要將一個(gè)任務(wù)拆分為多個(gè)獨(dú)立的任務(wù)項(xiàng),然后由分布式的服務(wù)器分別執(zhí)行某一個(gè)或幾個(gè)分片項(xiàng)。

場(chǎng)景描述:假設(shè)有服務(wù)3臺(tái),分3片管理,要處理數(shù)據(jù)表100條,那就可以100%3,按照余數(shù)0,1,2分散到三臺(tái)服務(wù)上執(zhí)行,看到這里分庫(kù)分表的基本邏輯涌上心頭,這就是為何很多大牛講說(shuō),編程思維很重要。

個(gè)性化參數(shù)

個(gè)性化參數(shù)即shardingItemParameter,可以和分片項(xiàng)匹配對(duì)應(yīng)關(guān)系,用于將分片項(xiàng)的數(shù)字轉(zhuǎn)換為更加可讀的業(yè)務(wù)代碼。

場(chǎng)景描述:這里猛一讀好像很飄逸,其實(shí)就是這個(gè)意思,如果分3片,取名[0,1,2]不好看,或者不好標(biāo)識(shí),可以分別給個(gè)別名標(biāo)識(shí)一下,[0=A,1=B,2=C]。

二、定時(shí)任務(wù)加載

1、核心依賴(lài)包

這里使用2.0+的版本。

<dependency>
  <groupId>com.dangdang</groupId>
  <artifactId>elastic-job-lite-core</www.lanboylsy.com artifactId>
  <version>2.1.5</version>
</dependency>
<dependency>
  <groupId>com.dangdang<www.yuanyangyul.com /groupId>
  <artifactId>elastic-job-lite-spring<www.lexuancaizc.cn /artifactId>
  <version>2.1.5</version>
</dependency>

2、核心配置文件

這里主要配置一下Zookeeper中間件,分片和分片參數(shù)。

zookeeper:
 server: 127.0.0.1:2181
 namespace: es-job
job-config:
 cron: 0/10 * * * * ?
 shardCount: 1
 shardItem: 0=A,1=B,2=shentuylzc.cn C,3www.yongxinylzn.cn=D

3、自定義注解

看了官方的案例,沒(méi)看到好用的注解,這里只能自己編寫(xiě)一個(gè),基于案例的加載過(guò)程和核心API作為參考。

核心配置類(lèi):

com.dangdang.ddframe.job.lite.config.LiteJobConfiguration

根據(jù)自己想如何使用注解的思路,比如我只想注解定時(shí)任務(wù)名稱(chēng)和Cron表達(dá)式這兩個(gè)功能,其他參數(shù)直接統(tǒng)一配置(這里可能是受QuartJob影響太深,可能根本就是想省事...)

@Inherited
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
public @interface TaskJobSign www.zhuyngyule.cn{

  @AliasFor("cron"www.feiyuptzc.cn)
  String value(www.yinmao2zhuce.cn) default "";

  @AliasFor("value")
  String cron(www.wujiu5zhuce.cn) default "";

  String jobName(www.shengyunyule.cn) default "";

}

4、作業(yè)案例

這里打印一些基本參數(shù),對(duì)照配置和注解,一目了然。

@Component
@TaskJobSign(cron = www.anxing4zc.cn"0/5 * * * * ?",jobName =www.jucaiyle.cn "Hello-Job")
public class HelloJob implements SimpleJob {

  private static final Logger LOG = LoggerFactory.getLogger(HelloJob.class.getName()) ;

  @Override
  public void execute(ShardingContext shardingContext) {
    LOG.info("當(dāng)前線(xiàn)程: "+Thread.currentThread().getId());
    LOG.info("任務(wù)分片:"+shardingContext.getShardingTotalCount());
    LOG.info("當(dāng)前分片:"+shardingContext.getShardingItem());
    LOG.info("分片參數(shù):"+shardingContext.getShardingParameter());
    LOG.info("任務(wù)參數(shù):"+shardingContext.getJobParameter());
  }
}

5、加載定時(shí)任務(wù)

既然自定義注解,那加載過(guò)程自然也要自定義一下,讀取自定義的注解,配置化,加入容器,然后初始化,等著任務(wù)執(zhí)行就好。

@Configuration
public class ElasticJobConfig {

  @Resource
  private ApplicationContext applicationContext ;
  @Resource
  private ZookeeperRegistryCenter zookeeperRegistryCenter;

  @Value("${job-config.cron}") private String cron ;
  @Value("${job-config.shardCount}"www.jucaiylzc.cn) private int shardCount ;
  @Value("${job-config.shardItem}") private String shardItem ;
  
  /**
   * 配置任務(wù)監(jiān)聽(tīng)器
   */
  @Bean
  public ElasticJobListener elasticJobListener() {
    return new TaskJobListener();
  }
  /**
   * 初始化配置任務(wù)
   */
  @PostConstruct
  public void initTaskJob() {
    Map<String, SimpleJob> jobMap = this.applicationContext.getBeansOfType(SimpleJob.class);
    Iterator iterator = jobMap.entrySet().iterator();
    while (iterator.hasNext()) {
      // 自定義注解管理
      Map.Entry<String, SimpleJob> entry = (Map.Entry)iterator.next();
      SimpleJob simpleJob = entry.getValue();
      TaskJobSign taskJobSign = simpleJob.getClass().getAnnotation(TaskJobSign.class);
      if (taskJobSign != null){
        String cron = taskJobSign.cron() ;
        String jobName = taskJobSign.jobName() ;
        // 生成配置
        SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(
                        JobCoreConfiguration.newBuilder(jobName, cron, shardCount)
                        .shardingItemParameters(shardItem).jobParameter(jobName).build(),
                        simpleJob.getClass().getCanonicalName());
        LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(
                        simpleJobConfiguration).overwrite(true).build();
        TaskJobListener taskJobListener = new TaskJobListener();
        // 初始化任務(wù)
        SpringJobScheduler jobScheduler = new SpringJobScheduler(
                        simpleJob, zookeeperRegistryCenter,
                        liteJobConfiguration, taskJobListener);
        jobScheduler.init();
      }
    }
  }
}

絮叨一句:不要疑問(wèn)這些API是怎么知道,看下官方文檔的案例,他們?cè)趺词褂眠@些核心API,這里就是照著寫(xiě)過(guò)來(lái),就是多一步自定義注解類(lèi)的加載過(guò)程。當(dāng)然官方文檔大致讀一遍還是很有必要的。

補(bǔ)刀一句:如何快速學(xué)習(xí)一些組件的用法,首先找到官方文檔,或者開(kāi)源庫(kù)Wiki,再不濟(jì)ReadMe文檔(如果都沒(méi)有,酌情放棄,另尋其他),熟悉基本功能是否符合自己的需求,如果符合,就看下基本用法案例,熟悉API,最后就是研究自己需要的功能模塊,個(gè)人經(jīng)驗(yàn)來(lái)看,該過(guò)程是彎路最少,坑最少的。

6、任務(wù)監(jiān)聽(tīng)

用法非常簡(jiǎn)單,實(shí)現(xiàn)ElasticJobListener接口。

@Component
public class TaskJobListener implements ElasticJobListener {
  private static final Logger LOG = LoggerFactory.getLogger(TaskJobListener.class);

  private long beginTime = 0;

  @Override
  public void beforeJobExecuted(ShardingContexts shardingContexts) {
    beginTime = System.currentTimeMillis();
    LOG.info(shardingContexts.getJobName()+"===>開(kāi)始...");
  }

  @Override
  public void afterJobExecuted(ShardingContexts shardingContexts) {
    long endTime = System.currentTimeMillis();
    LOG.info(shardingContexts.getJobName()+
    "===>結(jié)束...[耗時(shí):"+(endTime - beginTime)+"]");
  }
}

絮叨一句:before和after執(zhí)行前后,中間執(zhí)行目標(biāo)方法,標(biāo)準(zhǔn)的AOP切面思想,所以底層水平?jīng)Q定了對(duì)上層框架的理解速度,那本《Java編程思想》上的灰塵是不是該擦擦?

三、動(dòng)態(tài)添加

1、作業(yè)任務(wù)

有部分場(chǎng)景需要?jiǎng)討B(tài)添加和管理定時(shí)任務(wù),基于上面的加載流程,在自定義一些步驟就可以。

@Component
public class GetTimeJob implements SimpleJob {

  private static final Logger LOG = LoggerFactory.getLogger(GetTimeJob.class.getName()) ;

  private static final SimpleDateFormat format =
      new SimpleDateFormat("yyyy-MM-dd HH:mm:ss") ;

  @Override
  public void execute(ShardingContext shardingContext) {
    LOG.info("Job Name:"+shardingContext.getJobName());
    LOG.info("Local Time:"+format.format(new Date()));
  }
}

2、添加任務(wù)服務(wù)

這里就動(dòng)態(tài)添加上面的任務(wù)。

@Service
public class TaskJobService {

  @Resource
  private ZookeeperRegistryCenter zookeeperRegistryCenter;

  public void addTaskJob(final String jobName,final SimpleJob simpleJob,
              final String cron,final int shardCount,final String shardItem) {
    // 配置過(guò)程
    JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder(
                          jobName, cron, shardCount)
                          .shardingItemParameters(shardItem).build();
    JobTypeConfiguration jobTypeConfiguration = new SimpleJobConfiguration(jobCoreConfiguration,
                          simpleJob.getClass().getCanonicalName());
    LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(
                          jobTypeConfiguration).overwrite(true).build();
    TaskJobListener taskJobListener = new TaskJobListener();
    // 加載執(zhí)行
    SpringJobScheduler jobScheduler = new SpringJobScheduler(
        simpleJob, zookeeperRegistryCenter,
        liteJobConfiguration, taskJobListener);
    jobScheduler.init();
  }

}

補(bǔ)刀一句:這里添加之后,任務(wù)就會(huì)定時(shí)執(zhí)行,如何停止任務(wù)又是一個(gè)問(wèn)題,可以在任務(wù)名上做一些配置,比如在數(shù)據(jù)庫(kù)生成一條記錄[1,job1,state],如果調(diào)度到state為停止?fàn)顟B(tài)的任務(wù),直接截胡即可。

3、測(cè)試接口

@RestController
public class TaskJobController {

  @Resource
  private TaskJobService taskJobService ;

  @RequestMapping("/addJob")
  public String addJob(@RequestParam("cron") String cron,@RequestParam("jobName") String jobName,
             @RequestParam("shardCount") Integer shardCount,
             @RequestParam("shardItem") String shardItem) {
    taskJobService.addTaskJob(jobName, new GetTimeJob(), cron, shardCount, shardItem);
    return "success";
  }
}

四、源代碼地址

GitHub

·地址https://github.com/cicadasmile/middle-ware-parentGitEE

·地址https://gitee.com/cicadasmile/middle-ware-parent

以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持億速云。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI