您好,登錄后才能下訂單哦!
如何使用SpringBoot和SpringBatch ?針對這個(gè)問題,這篇文章詳細(xì)介紹了相對應(yīng)的分析和解答,希望可以幫助更多想解決這個(gè)問題的小伙伴找到更簡單易行的方法。
什么是Spring Batch
Spring Batch 是一個(gè)輕量級的、完善的批處理框架,旨在幫助企業(yè)建立健壯、高效的批處理應(yīng)用。Spring Batch是Spring的一個(gè)子項(xiàng)目,使用Java語言并基于Spring框架為基礎(chǔ)開發(fā),使的已經(jīng)使用 Spring 框架的開發(fā)者或者企業(yè)更容易訪問和利用企業(yè)服務(wù)。
Spring Batch 提供了大量可重用的組件,包括了日志、追蹤、事務(wù)、任務(wù)作業(yè)統(tǒng)計(jì)、任務(wù)重啟、跳過、重復(fù)、資源管理。對于大數(shù)據(jù)量和高性能的批處理任務(wù),Spring Batch 同樣提供了高級功能和特性來支持,比如分區(qū)功能、遠(yuǎn)程功能??傊?通過 Spring Batch 能夠支持簡單的、復(fù)雜的和大數(shù)據(jù)量的批處理作業(yè)。
Spring Batch 使用
我們首先配置Spring Batch 在Spring Boot 中的使用,數(shù)據(jù)庫用的是mysql,pom文件如下,因?yàn)镾pring Boot 中的Spring Batch 包含 hsqsldb 所以我們將其去除
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> <exclusions> <!-- 注意這里--> <exclusion> <groupId>org.hsqldb</groupId> <artifactId>hsqldb</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.hibernate</groupId> <artifactId>hibernate-validator</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.21</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency>
配置好我們需要的實(shí)體類。頁面就不展示了。
如果有數(shù)據(jù)校驗(yàn)添加的話那么我們需要配置自定義的檢驗(yàn)器。若果沒有課略過該步驟
public class CsvBeanValidator<T> implements Validator<T>,InitializingBean { private javax.validation.Validator validator; @Override public void validate(T value) throws ValidationException { Set<ConstraintViolation<T >> constraintViolations=validator.validate(value); if(constraintViolations.size()>0){ StringBuilder message=new StringBuilder(); for(ConstraintViolation<T> constraintViolation:constraintViolations){ message.append(constraintViolation.getMessage() +"\n"); } throw new ValidationException(message.toString()); } } //在這里我們使用的是JSR-303校驗(yàn)數(shù)據(jù),在此進(jìn)行初始化 @Override public void afterPropertiesSet() throws Exception { ValidatorFactory validatorFactory= Validation.buildDefaultValidatorFactory(); validator=validatorFactory.usingContext().getValidator(); } } public class CsvItemProcessor extends ValidatingItemProcessor<Person> { @Override public Person process(Person item) throws ValidationException { super.process(item); // 在這里啟動 然后才會調(diào)用我們自定義的校驗(yàn)器,否則不能通過 。 if (item.getNation().equals("漢族")){ item.setName("01"); }else{ item.setNation("02"); } return item; } }
進(jìn)行job任務(wù)監(jiān)聽 自定義類實(shí)現(xiàn)JobExecutionListener 即可
long startTime; long endTime; @Override public void beforeJob(JobExecution jobExecution) { startTime = System.currentTimeMillis(); System.out.println("任務(wù)處理開始"); } @Override public void afterJob(JobExecution jobExecution) { endTime = System.currentTimeMillis(); System.out.println("耗時(shí)多長時(shí)間:" + (endTime - startTime) + "ms"); System.out.println("任務(wù)處理結(jié)束"); }
進(jìn)行Spring Batch 的注入 方法有xml文件注入bean ,在這里選擇java注入
@Configuration @EnableBatchProcessing //開啟批處理 public class CsvBatchConfig { /**1 首先我們通過 FlatFileItemReader 讀取我們需要的文件 通過setResource來實(shí)現(xiàn) * 2 設(shè)置map 在這里通過先設(shè)置解析器 setLineTokenizer 來解析我們csv文件中的數(shù) 據(jù) * 3 setFieldSetMapper 將我們需要的數(shù)據(jù)轉(zhuǎn)化為我們的實(shí)體對象 存儲 * 4 如果想 跳過前面的幾行 需要使用setLinesToSkip就可以實(shí)現(xiàn) */ @Bean public ItemReader<Person> reader() throws Exception { FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>(); //1 reader.setResource(new ClassPathResource("people.csv")); //2 reader.setLineMapper(new DefaultLineMapper<Person>() {{ //3 setLineTokenizer(new DelimitedLineTokenizer() {{ setNames(new String[] { "name","age", "nation" ,"address"}); }}); setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>() {{ setTargetType(Person.class); }}); }}); reader.setLinesToSkip(3); return reader; } @Bean public ItemProcessor<Person, Person> processor() { CsvItemProcessor processor = new CsvItemProcessor(); //1 processor.setValidator(csvBeanValidator()); //2 return processor; } /** *寫入數(shù)據(jù)到數(shù)據(jù)庫中 * 1執(zhí)行的sql 語句 2 設(shè)置數(shù)據(jù)源 */ @Bean public ItemWriter<Person> writer(DataSource dataSource) {//1 JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<Person>(); //2 writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<Person>()); String sql = "insert into person " + "(id,name,age,nation,address) " + "values(hibernate_sequence.nextval, :name, :age, :nation,:address)"; writer.setSql(sql); //3 writer.setDataSource(dataSource); return writer; } // 作業(yè)的倉庫 就是設(shè)置數(shù)據(jù)源 @Bean public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception { JobRepositoryFactoryBean jobRepositoryFactoryBean = new JobRepositoryFactoryBean(); jobRepositoryFactoryBean.setDataSource(dataSource); jobRepositoryFactoryBean.setTransactionManager(transactionManager); jobRepositoryFactoryBean.setDatabaseType("mysql"); return jobRepositoryFactoryBean.getObject(); } //調(diào)度器 使用它來執(zhí)行 我們的批處理 @Bean public SimpleJobLauncher jobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception { SimpleJobLauncher jobLauncher = new SimpleJobLauncher(); jobLauncher.setJobRepository(jobRepository(dataSource, transactionManager)); return jobLauncher; } //將監(jiān)聽器加入到j(luò)ob中 @Bean public Job importJob(JobBuilderFactory jobs, Step s1) { return jobs.get("importJob") .incrementer(new RunIdIncrementer()) .flow(s1) //1 .end() .listener(csvJobListener()) //2 .build(); } //步驟綁定 reader 與writer 一次性處理65000條記錄 @Bean public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Person> reader, ItemWriter<Person> writer, ItemProcessor<Person,Person> processor) { return stepBuilderFactory .get("step1") .<Person, Person>chunk(65000) //1 .reader(reader) //2 .processor(processor) //3 .writer(writer) //4 .build(); } @Bean public CsvJobListener csvJobListener() { return new CsvJobListener(); } @Bean public Validator<Person> csvBeanValidator() { return new CsvBeanValidator<Person>(); } }
在配置文件中 啟動自動執(zhí)行批處理
spring.batch.job.names = job1,job2 #啟動時(shí)要執(zhí)行的Job,默認(rèn)執(zhí)行全部Job
spring.batch.job.enabled=true #是否自動執(zhí)行定義的Job,默認(rèn)是
spring.batch.initializer.enabled=true #是否初始化Spring Batch的數(shù)據(jù)庫,默認(rèn)為是
spring.batch.schema=
spring.batch.table-prefix= #設(shè)置SpringBatch的數(shù)據(jù)庫表的前綴
springboot一種全新的編程規(guī)范,其設(shè)計(jì)目的是用來簡化新Spring應(yīng)用的初始搭建以及開發(fā)過程,SpringBoot也是一個(gè)服務(wù)于框架的框架,服務(wù)范圍是簡化配置文件。
關(guān)于如何使用SpringBoot和SpringBatch 問題的解答就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關(guān)注億速云行業(yè)資訊頻道了解更多相關(guān)知識。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。