溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

如何使用SpringBoot和SpringBatch

發(fā)布時(shí)間:2021-05-17 17:18:19 來源:億速云 閱讀:146 作者:Leah 欄目:編程語言

如何使用SpringBoot和SpringBatch ?針對這個(gè)問題,這篇文章詳細(xì)介紹了相對應(yīng)的分析和解答,希望可以幫助更多想解決這個(gè)問題的小伙伴找到更簡單易行的方法。

什么是Spring Batch

Spring Batch 是一個(gè)輕量級的、完善的批處理框架,旨在幫助企業(yè)建立健壯、高效的批處理應(yīng)用。Spring Batch是Spring的一個(gè)子項(xiàng)目,使用Java語言并基于Spring框架為基礎(chǔ)開發(fā),使的已經(jīng)使用 Spring 框架的開發(fā)者或者企業(yè)更容易訪問和利用企業(yè)服務(wù)。

Spring Batch 提供了大量可重用的組件,包括了日志、追蹤、事務(wù)、任務(wù)作業(yè)統(tǒng)計(jì)、任務(wù)重啟、跳過、重復(fù)、資源管理。對于大數(shù)據(jù)量和高性能的批處理任務(wù),Spring Batch 同樣提供了高級功能和特性來支持,比如分區(qū)功能、遠(yuǎn)程功能??傊?通過 Spring Batch 能夠支持簡單的、復(fù)雜的和大數(shù)據(jù)量的批處理作業(yè)。

Spring Batch 使用

我們首先配置Spring Batch 在Spring Boot 中的使用,數(shù)據(jù)庫用的是mysql,pom文件如下,因?yàn)镾pring Boot 中的Spring Batch 包含 hsqsldb 所以我們將其去除

<dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-batch</artifactId>
      <exclusions> <!-- 注意這里-->
        <exclusion>
          <groupId>org.hsqldb</groupId>
          <artifactId>hsqldb</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-jdbc</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
<dependency>
    <groupId>org.hibernate</groupId>
    <artifactId>hibernate-validator</artifactId>
  </dependency>
  <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.21</version>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
  </dependency>

配置好我們需要的實(shí)體類。頁面就不展示了。

如果有數(shù)據(jù)校驗(yàn)添加的話那么我們需要配置自定義的檢驗(yàn)器。若果沒有課略過該步驟

public class CsvBeanValidator<T> implements Validator<T>,InitializingBean {
  private javax.validation.Validator  validator;
  @Override
  public void validate(T value) throws ValidationException {
    Set<ConstraintViolation<T >> constraintViolations=validator.validate(value);
    if(constraintViolations.size()>0){
      StringBuilder message=new StringBuilder();
      for(ConstraintViolation<T> constraintViolation:constraintViolations){
        message.append(constraintViolation.getMessage() +"\n");
      }
      throw new ValidationException(message.toString());
    }
  }
  //在這里我們使用的是JSR-303校驗(yàn)數(shù)據(jù),在此進(jìn)行初始化
  @Override
  public void afterPropertiesSet() throws Exception { 
    ValidatorFactory validatorFactory= Validation.buildDefaultValidatorFactory();
    validator=validatorFactory.usingContext().getValidator();
  }
}
public class CsvItemProcessor extends ValidatingItemProcessor<Person> {
  @Override
  public Person process(Person item) throws ValidationException {
     super.process(item); // 在這里啟動 然后才會調(diào)用我們自定義的校驗(yàn)器,否則不能通過 。
     if (item.getNation().equals("漢族")){
       item.setName("01");
     }else{
       item.setNation("02");
     }
     return item;
  }
}

進(jìn)行job任務(wù)監(jiān)聽 自定義類實(shí)現(xiàn)JobExecutionListener 即可

long startTime;
 long endTime;
 @Override
 public void beforeJob(JobExecution jobExecution) {
   startTime = System.currentTimeMillis();
   System.out.println("任務(wù)處理開始");
 }
 @Override
 public void afterJob(JobExecution jobExecution) {
   endTime = System.currentTimeMillis();
   System.out.println("耗時(shí)多長時(shí)間:" + (endTime - startTime) + "ms");
   System.out.println("任務(wù)處理結(jié)束");
 }

進(jìn)行Spring Batch 的注入 方法有xml文件注入bean ,在這里選擇java注入

@Configuration
@EnableBatchProcessing //開啟批處理
public class CsvBatchConfig {
  /**1 首先我們通過 FlatFileItemReader 讀取我們需要的文件 通過setResource來實(shí)現(xiàn)
   * 2 設(shè)置map 在這里通過先設(shè)置解析器 setLineTokenizer 來解析我們csv文件中的數(shù)   據(jù)
   * 3 setFieldSetMapper 將我們需要的數(shù)據(jù)轉(zhuǎn)化為我們的實(shí)體對象 存儲
   * 4 如果想 跳過前面的幾行 需要使用setLinesToSkip就可以實(shí)現(xiàn) 
   */ 
 @Bean
 public ItemReader<Person> reader() throws Exception {
   FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>(); //1
   reader.setResource(new ClassPathResource("people.csv")); //2
     reader.setLineMapper(new DefaultLineMapper<Person>() {{ //3
       setLineTokenizer(new DelimitedLineTokenizer() {{
         setNames(new String[] { "name","age", "nation" ,"address"});
       }});
       setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{
         setTargetType(Person.class);
       }});
     }});
       reader.setLinesToSkip(3); 
     return reader;
 }
 @Bean
 public ItemProcessor<Person, Person> processor() {
   CsvItemProcessor processor = new CsvItemProcessor(); //1
   processor.setValidator(csvBeanValidator()); //2
   return processor;
 }
   /** 
    *寫入數(shù)據(jù)到數(shù)據(jù)庫中
    * 1執(zhí)行的sql 語句 2 設(shè)置數(shù)據(jù)源 
     */
 @Bean
 public ItemWriter<Person> writer(DataSource dataSource) {//1
   JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<Person>(); //2
   writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>());
   String sql = "insert into person " + "(id,name,age,nation,address) "
       + "values(hibernate_sequence.nextval, :name, :age, :nation,:address)";
   writer.setSql(sql); //3
   writer.setDataSource(dataSource);
   return writer;
 }
  // 作業(yè)的倉庫 就是設(shè)置數(shù)據(jù)源
 @Bean
 public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager)
     throws Exception {
   JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean();
   jobRepositoryFactoryBean.setDataSource(dataSource);
   jobRepositoryFactoryBean.setTransactionManager(transactionManager);
   jobRepositoryFactoryBean.setDatabaseType("mysql");
   return jobRepositoryFactoryBean.getObject();
 }
   //調(diào)度器 使用它來執(zhí)行 我們的批處理
 @Bean
 public SimpleJobLauncher jobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager)
     throws Exception {
   SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
   jobLauncher.setJobRepository(jobRepository(dataSource, transactionManager));
   return jobLauncher;
 }
   //將監(jiān)聽器加入到j(luò)ob中
 @Bean
 public Job importJob(JobBuilderFactory jobs, Step s1) {
   return jobs.get("importJob")
       .incrementer(new RunIdIncrementer())
       .flow(s1) //1
       .end()
       .listener(csvJobListener()) //2
       .build();
 }
   //步驟綁定 reader 與writer 一次性處理65000條記錄
 @Bean
 public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Person> reader, ItemWriter<Person> writer,
     ItemProcessor<Person,Person> processor) {
   return stepBuilderFactory
       .get("step1")
       .<Person, Person>chunk(65000) //1
       .reader(reader) //2
       .processor(processor) //3
       .writer(writer) //4
       .build();
 }
 @Bean
 public CsvJobListener csvJobListener() {
   return new CsvJobListener();
 }
 @Bean
 public Validator<Person> csvBeanValidator() {
   return new CsvBeanValidator<Person>();
 }
}

在配置文件中 啟動自動執(zhí)行批處理

spring.batch.job.names = job1,job2 #啟動時(shí)要執(zhí)行的Job,默認(rèn)執(zhí)行全部Job

spring.batch.job.enabled=true #是否自動執(zhí)行定義的Job,默認(rèn)是

spring.batch.initializer.enabled=true #是否初始化Spring Batch的數(shù)據(jù)庫,默認(rèn)為是

spring.batch.schema=

spring.batch.table-prefix= #設(shè)置SpringBatch的數(shù)據(jù)庫表的前綴

springboot是什么

springboot一種全新的編程規(guī)范,其設(shè)計(jì)目的是用來簡化新Spring應(yīng)用的初始搭建以及開發(fā)過程,SpringBoot也是一個(gè)服務(wù)于框架的框架,服務(wù)范圍是簡化配置文件。

關(guān)于如何使用SpringBoot和SpringBatch 問題的解答就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關(guān)注億速云行業(yè)資訊頻道了解更多相關(guān)知識。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI