您好,登錄后才能下訂單哦!
1??? 場景說明
讀取CVS文件,經(jīng)過處理后,保存到數(shù)據(jù)庫。
?
2??? 項目結(jié)構(gòu)
應(yīng)用程序 | 啟動主程序 | DemoApplication.java |
讀取文件(輸入文件) | UserItemReader.java | |
處理數(shù)據(jù) | UserItemProcess.java | |
輸出文件 | UserItemWriter.java | |
調(diào)度批作業(yè) | 定時處理配置 | QuartzConfiguration.java |
定時調(diào)度 | QuartzJobLauncher.java | |
輔助文件 | 數(shù)據(jù)文件 | User.txt |
對象實體(傳遞對象) | User.java | |
Meaven配置文件 | Pom.xml |
2.1?? Pom.xml
<?xml version="1.0" ? encoding="UTF-8"?> <project ? xmlns="http://maven.apache.org/POM/4.0.0" ? xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" ??? xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 ? http://maven.apache.org/xsd/maven-4.0.0.xsd"> ??? <modelVersion>4.0.0</modelVersion> ? ??? <groupId>com.zy</groupId> ??? <artifactId>SpringBatchDemo1</artifactId> ??? <version>0.0.1-SNAPSHOT</version> ??? <packaging>jar</packaging> ? ??? <name>SpringBatchDemo1</name> ??? <description>Demo ? project for Spring Boot</description> ? ??? <parent> ?????? <groupId>org.springframework.boot</groupId> ?????? <artifactId>spring-boot-starter-parent</artifactId> ?????? <version>1.5.10.RELEASE</version> ?????? <relativePath ? /> <!-- lookup parent from repository --> ??? </parent> ? ??? <properties> ?????? <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> ??? ??? <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> ?????? <java.version>1.8</java.version> ??? </properties> ? ??? <dependencies> ?????? <dependency> ?????????? <groupId>org.springframework</groupId> ?????????? <artifactId>spring-context-support</artifactId> ?????? </dependency> ?????? <dependency> ?????????? <groupId>org.springframework.boot</groupId> ?????????? <artifactId>spring-boot-starter-batch</artifactId> ?????? </dependency> ?????? <dependency> ?????????? <groupId>org.springframework</groupId> ?????????? <artifactId>spring-oxm</artifactId> ?????? </dependency> ?????? <dependency> ?????????? <groupId>org.projectlombok</groupId> ?????????? <artifactId>lombok</artifactId> ?????? </dependency> ?????? <!-- ? <dependency> <groupId>com.h3database</groupId> ? <artifactId>h3</artifactId> ?????????? <scope>runtime</scope> ? </dependency> --> ?????? <dependency> ?????????? <groupId>mysql</groupId> ?????????? <artifactId>mysql-connector-java</artifactId> ?????????? <scope>runtime</scope> ?????? </dependency> ?????? <dependency> ?????????? <groupId>org.springframework.boot</groupId> ?????????? <artifactId>spring-boot-starter-test</artifactId> ?????????? <scope>test</scope> ?????? </dependency> ?????? <dependency> ?????????? <groupId>org.springframework.batch</groupId> ?????????? <artifactId>spring-batch-test</artifactId> ?????????? <scope>test</scope> ?????? </dependency> ?????? <dependency> ?????????? <groupId>org.projectlombok</groupId> ?????????? <artifactId>lombok</artifactId> ?????? </dependency> ?????? <dependency> ?????????? <groupId>org.quartz-scheduler</groupId> ?????????? <artifactId>quartz</artifactId> ?????????? <version>2.3.0</version> ?????? </dependency> ?????? <dependency> ?????????? <groupId>com.h3database</groupId> ?????????? <artifactId>h3</artifactId> ?????????? <scope>runtime</scope> ?????? </dependency> ??? </dependencies> ? ??? <build> ?????? <plugins> ?????????? <plugin> ????????????? <groupId>org.springframework.boot</groupId> ????????????? <artifactId>spring-boot-maven-plugin</artifactId> ?????????? </plugin> ?????? </plugins> ??? </build> ? ? </project> |
2.2?? User.java
package com.zy.model; ? public class User { ?????? private ? String id; ?????? private ? String name; ?????? private ? String age; ?????? ?????? public ? User(String id, String name, String age) { ????????????? this.id ? = id; ????????????? this.name ? = name; ????????????? this.age ? = age; ?????? } ? ?????? public ? String getId() { ????????????? return ? id; ?????? } ? ?????? public ? void setId(String id) { ????????????? this.id ? = id; ?????? } ? ?????? public ? String getName() { ????????????? return ? name; ?????? } ? ?????? public ? void setName(String name) { ????????????? this.name ? = name; ?????? } ? ?????? public ? String getAge() { ????????????? return ? age; ?????? } ? ?????? public ? void setAge(String age) { ????????????? this.age ? = age; ?????? } ? ?????? @Override ?????? public ? String toString() { ????????????? return ? "User [id=" + id + ", name=" + name + ", age=" ? + age + "]"; ?????? } ?????? } |
2.3?? UserItemReader.java
package com.zy.reader; ? import ? org.springframework.batch.item.file.FlatFileItemReader; import ? org.springframework.batch.item.file.LineMapper; import ? org.springframework.batch.item.file.mapping.DefaultLineMapper; import org.springframework.batch.item.file.mapping.FieldSetMapper; import ? org.springframework.batch.item.file.transform.DelimitedLineTokenizer; import ? org.springframework.batch.item.file.transform.FieldSet; import ? org.springframework.batch.item.file.transform.LineTokenizer; import ? org.springframework.core.io.ClassPathResource; import ? org.springframework.validation.BindException; ? import com.zy.model.User; //從user.txt文件中讀取信息到User public class UserItemReader extends ? FlatFileItemReader<User> { ?????? public ? UserItemReader(){ ????????????? createReader(); ?????? } ?????? ?????? private ? void createReader(){ ????????????? this.setResource(new ? ClassPathResource("data/User.txt")); ????????????? this.setLinesToSkip(1); ????????????? this.setLineMapper(userLineMapper()); ?????? } ?????? ?????? private ? LineMapper<User> userLineMapper(){ ????????????? DefaultLineMapper<User> ? lineMapper = new DefaultLineMapper<>(); ????????????? lineMapper.setLineTokenizer(userLineTokenizer()); ????????????? lineMapper.setFieldSetMapper(new ? UserFieldStepMapper()); ????????????? lineMapper.afterPropertiesSet(); ? ????????????? return ? lineMapper; ?????? } ?????? ??? ? private LineTokenizer userLineTokenizer(){ ?????? ? ?DelimitedLineTokenizer ? tokenizer = new DelimitedLineTokenizer(); ??????? ? tokenizer.setNames(new String[]{"ID", "NAME", ? "AGE"}); ??????? ? return tokenizer; ??? ? } ??? ? ??? ? private static class UserFieldStepMapper implements ? FieldSetMapper<User>{ ????????????? @Override ????????????? public ? User mapFieldSet(FieldSet fieldSet) throws BindException { ??????????? return new ? User(fieldSet.readString("ID"), ??????????????????? ? fieldSet.readString("NAME"), ??????????????????? ? fieldSet.readString("AGE")); ????????????? } ? ??? ? } ? ??? ? } |
2.4?? User.txt
ID,NAME,AGE 1,zy,28 2,tom,20 3,terry,30 4,lerry,18 5,bob,25 6,linda,27 7,marry,39 8,long,22 9,kin,33 10,王五,40 |
?
2.5?? UserItemProcessor.java
package com.zy.processor; import ? org.springframework.batch.item.ItemProcessor; import com.zy.model.User; ? public class UserItemProcessor implements ? ItemProcessor<User, User> { ? ?????? @Override ?????? public ? User process(User item) throws Exception { ????????????? if ? (Integer.parseInt(item.getAge()) > 20) { ???????????????? ???????????????????? return ? item; ????????????? } ????????????? return ? null; ?????? } ? } |
?
2.6?? UserItemWriter.java
package com.zy.writer; import java.util.List; import ? org.springframework.batch.item.ItemWriter; import com.zy.model.User; ? public class UserItemWriter implements ? ItemWriter<User> { ? ?????? @Override ?????? public ? void write(List<? extends User> items) throws Exception { ????????????? for(User ? user : items){ ???????????????????? System.out.println(user); ????????????? } ?????? } ? } |
2.7?? QuartzJobLauncher
package com.zy.QuartzConfiguration; ? import java.text.SimpleDateFormat; import java.util.Date; import org.quartz.JobDataMap; import org.quartz.JobDetail; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.quartz.JobKey; import ? org.springframework.batch.core.Job; import org.springframework.batch.core.JobExecution; import ? org.springframework.batch.core.JobParameters; import ? org.springframework.batch.core.configuration.JobLocator; import ? org.springframework.batch.core.launch.JobLauncher; import ? org.springframework.scheduling.quartz.QuartzJobBean; ? public class QuartzJobLauncher extends ? QuartzJobBean { ?????? @Override ?????? protected ? void executeInternal(JobExecutionContext context) throws JobExecutionException ? { ????????????? ????????????? JobDetail ? jobDetail = context.getJobDetail(); ????????????? JobDataMap ? jobDataMap = jobDetail.getJobDataMap(); ????????????? String ? jobName = jobDataMap.getString("jobName"); ????????????? JobLauncher ? jobLauncher = (JobLauncher) jobDataMap.get("jobLauncher"); ????????????? JobLocator ? jobLocator = (JobLocator) jobDataMap.get("jobLocator"); ????????????? System.out.println("jobName ? : " + jobName); ????????????? System.out.println("jobLauncher ? : " + jobLauncher); ????????????? System.out.println("jobLocator ? : " + jobLocator); ????????????? JobKey ? key = context.getJobDetail().getKey(); ????????????? System.out.println(key.getName() ? + " : " + key.getGroup()); ????????????? SimpleDateFormat ? sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); ????????????? System.out.println("Current ? time : " + sf.format(new Date())); ????????????? ????????????? try ? { ???????????????????? Job ? job = jobLocator.getJob(jobName); ???????????????????? JobExecution ? jobExecution = jobLauncher.run(job, new JobParameters()); ????????????? } ? catch (Exception e) { ???????????????????? e.printStackTrace(); ????????????? } ????????????? ?????? } ? } |
?
2.8?? QuartzConfiguration
package com.zy.QuartzConfiguration; ? import java.util.HashMap; import java.util.Map; ? import ? org.springframework.batch.core.configuration.JobLocator; import ? org.springframework.batch.core.configuration.JobRegistry; import ? org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor; import ? org.springframework.batch.core.launch.JobLauncher; import ? org.springframework.beans.factory.annotation.Autowired; import ? org.springframework.context.annotation.Bean; import ? org.springframework.context.annotation.Configuration; import ? org.springframework.scheduling.quartz.CronTriggerFactoryBean; import ? org.springframework.scheduling.quartz.JobDetailFactoryBean; import ? org.springframework.scheduling.quartz.SchedulerFactoryBean; ? @Configuration public class QuartzConfiguration { ?????? ?????? //自動注入進(jìn)來的是SimpleJobLauncher ?????? @Autowired ?????? private ? JobLauncher jobLauncher; ?????? ?????? @Autowired ?????? private ? JobLocator jobLocator; ?????? ?????? /*用來注冊job*/ ?????? /*JobRegistry會自動注入進(jìn)來*/ ?????? @Bean ?????? public ? JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry ? jobRegistry){ ????????????? JobRegistryBeanPostProcessor ? jobRegistryBeanPostProcessor = new JobRegistryBeanPostProcessor(); ????????????? jobRegistryBeanPostProcessor.setJobRegistry(jobRegistry); ????????????? return ? jobRegistryBeanPostProcessor; ?????? } ?????? ?????? @Bean ?????? public ? JobDetailFactoryBean jobDetailFactoryBean(){ ????????????? JobDetailFactoryBean ? jobFactory = new JobDetailFactoryBean(); ????????????? jobFactory.setJobClass(QuartzJobLauncher.class); ????????????? jobFactory.setGroup("my_group"); ????????????? jobFactory.setName("my_job"); ????????????? Map<String, ? Object> map = new HashMap<>(); ????????????? map.put("jobName", ? "zyJob"); ????????????? map.put("jobLauncher", ? jobLauncher); ????????????? map.put("jobLocator", ? jobLocator); ????????????? jobFactory.setJobDataAsMap(map); ????????????? return ? jobFactory; ?????? } ?????? ?????? @Bean ?????? public ? CronTriggerFactoryBean cronTriggerFactoryBean(){ ????????????? CronTriggerFactoryBean ? cTrigger = new CronTriggerFactoryBean(); ????????????? System.out.println("------- ? : " + jobDetailFactoryBean().getObject()); ????????????? cTrigger.setJobDetail(jobDetailFactoryBean().getObject()); ????????????? cTrigger.setStartDelay(3000); ????????????? cTrigger.setName("my_trigger"); ????????????? cTrigger.setGroup("trigger_group"); ????????????? cTrigger.setCronExpression("0/3 ? * * * * ? "); //每間隔3s觸發(fā)一次Job任務(wù) ????????????? return ? cTrigger; ?????? } ?????? ?????? @Bean ?????? public ? SchedulerFactoryBean schedulerFactoryBean(){ ????????????? SchedulerFactoryBean ? schedulerFactor = new SchedulerFactoryBean(); ????????????? schedulerFactor.setTriggers(cronTriggerFactoryBean().getObject()); ????????????? return ? schedulerFactor; ?????? } ? } |
?
?
2.9?? BatchConfiguration
package com.zy.config; import ? org.springframework.batch.core.Job; import ? org.springframework.batch.core.Step; import ? org.springframework.batch.core.configuration.annotation.EnableBatchProcessing; import ? org.springframework.batch.core.configuration.annotation.JobBuilderFactory; import ? org.springframework.batch.core.configuration.annotation.StepBuilderFactory; import ? org.springframework.beans.factory.annotation.Autowired; import ? org.springframework.context.annotation.Bean; import ? org.springframework.context.annotation.Configuration; import ? org.springframework.context.annotation.Import; import com.zy.QuartzConfiguration.QuartzConfiguration; import com.zy.model.User; import ? com.zy.processor.UserItemProcessor; import com.zy.reader.UserItemReader; import com.zy.writer.UserItemWriter; ? @Configuration @EnableBatchProcessing //@Import({QuartzConfiguration.class}) public class BatchConfiguration { ?????? ?????? @Autowired ?????? public ? JobBuilderFactory jobBuilderFactory; ?????? @Autowired ?????? public ? StepBuilderFactory stepBuilderFactory; ?????? ?????? ?????? /*創(chuàng)建job*/ ?????? @Bean ?????? public ? Job jobMethod(){ ????????????? return ? jobBuilderFactory.get("zyJob") ??????????????????????????? .start(stepMethod()) ??????????????????????????? .build(); ?????? } ?????? ?????? /*創(chuàng)建step*/ ?????? @Bean ?????? public ? Step stepMethod(){ ????????????? return ? stepBuilderFactory.get("myStep1") ??????????????????????????? .<User, ? User>chunk(3) ??????????????????????????? .reader(new ? UserItemReader()) ??????????????????????????? .processor(new ? UserItemProcessor()) ??????????????????????????? .writer(new ? UserItemWriter()) ??????????????????????????? .allowStartIfComplete(true) ??????????????????????????? .build(); ?????? } ?????? ? } |
?
3??? 執(zhí)行Job輸出結(jié)果
2019-04-30 21:31:48.049? INFO 9344 --- [ryBean_Worker-5] ? o.s.b.c.l.support.SimpleJobLauncher????? ? : Job: [SimpleJob: [name=zyJob]] completed with the following ? parameters: [{}] and the following status: [COMPLETED] jobName : zyJob jobLauncher : ? org.springframework.batch.core.launch.support.SimpleJobLauncher@2d27244d jobLocator : org.springframework.batch.core.configuration.support.MapJobRegistry@6fc00b5 my_job : my_group Current time : 2019-04-30 21:31:51 2019-04-30 21:31:51.012? INFO 9344 --- [ryBean_Worker-6] ? o.s.b.c.l.support.SimpleJobLauncher????? ? : Job: [SimpleJob: [name=zyJob]] launched with the following ? parameters: [{}] 2019-04-30 21:31:51.028? INFO 9344 --- [ryBean_Worker-6] ? o.s.batch.core.job.SimpleStepHandler???? ? : Executing step: [myStep1] User [id=1, name=zy, age=28] User [id=3, name=terry, age=30] User [id=5, name=bob, age=25] User [id=6, name=linda, age=27] User [id=7, name=marry, age=39] User [id=8, name=long, age=22] User [id=9, name=kin, age=33] User [id=10, name=ww, age=40] |
?
4??? 概念總結(jié)
Job Repository | 作業(yè)倉庫,負(fù)責(zé)Job,Step執(zhí)行過程中的狀態(tài)保存。 | |
Job Launcher | 作業(yè)調(diào)度器,提供執(zhí)行Job的入口 | |
Job | 作業(yè),多個Step組成,封裝整個批處理操作。 | |
Step | 作業(yè)步,Job的一個執(zhí)行環(huán)節(jié),由多個或者一個Step組裝成Job | |
Tasklet | Step中具體執(zhí)行的邏輯的操作,可以重復(fù)執(zhí)行,可以具體的設(shè)置同步,異步操作。 | |
Chunk | 給定數(shù)量的Item集合,可以定義對Chunk的讀操作,處理操作,寫操作,提交間隔。 | |
Item | 一條數(shù)據(jù)記錄。 | |
ItemReader | 從數(shù)據(jù)源(文件系統(tǒng),數(shù)據(jù)庫,隊列等)讀取Item | |
ItemProcessor | 在寫入數(shù)據(jù)源之前,對數(shù)據(jù)進(jìn)行處理(如:數(shù)據(jù)清洗,轉(zhuǎn)換,過濾,數(shù)據(jù)校驗等)。 | |
ItemWriter | 將Item批量寫入數(shù)據(jù)源(文件系統(tǒng),數(shù)據(jù)庫,隊列等)。 |
5??? Spring Batch 結(jié)構(gòu)
Spring Batch的一個基本層級結(jié)構(gòu)。
首先,Spring Batch運行的基本單位是一個Job,一個Job就做一件批處理的事情。
一個Job包含很多Step,step就是每個job要執(zhí)行的單個步驟。
如下圖所示,Step里面,會有Tasklet,Tasklet是一個任務(wù)單元,它是屬于可以重復(fù)利用的東西。
然后是Chunk,chunk就是數(shù)據(jù)塊,你需要定義多大的數(shù)據(jù)量是一個chunk。
Chunk里面就是不斷循環(huán)的一個流程,讀數(shù)據(jù),處理數(shù)據(jù),然后寫數(shù)據(jù)。Spring Batch會不斷的循環(huán)這個流程,直到批處理數(shù)據(jù)完成。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。