溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Spring batch入門示例

發(fā)布時間:2020-06-29 07:01:54 來源:網(wǎng)絡(luò) 閱讀:718 作者:A零零八 欄目:大數(shù)據(jù)

1??? 場景說明

讀取CVS文件,經(jīng)過處理后,保存到數(shù)據(jù)庫。

?

Spring batch入門示例

2??? 項目結(jié)構(gòu)

應(yīng)用程序

啟動主程序

DemoApplication.java

讀取文件(輸入文件)

UserItemReader.java

處理數(shù)據(jù)

UserItemProcess.java

輸出文件

UserItemWriter.java

調(diào)度批作業(yè)

定時處理配置

QuartzConfiguration.java

定時調(diào)度

QuartzJobLauncher.java

輔助文件

數(shù)據(jù)文件

User.txt

對象實體(傳遞對象)

User.java

Meaven配置文件

Pom.xml

Spring batch入門示例

2.1?? Pom.xml

<?xml version="1.0" ? encoding="UTF-8"?>

<project ? xmlns="http://maven.apache.org/POM/4.0.0" ? xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

??? xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 ? http://maven.apache.org/xsd/maven-4.0.0.xsd">

??? <modelVersion>4.0.0</modelVersion>

?

??? <groupId>com.zy</groupId>

??? <artifactId>SpringBatchDemo1</artifactId>

??? <version>0.0.1-SNAPSHOT</version>

??? <packaging>jar</packaging>

?

??? <name>SpringBatchDemo1</name>

??? <description>Demo ? project for Spring Boot</description>

?

??? <parent>

?????? <groupId>org.springframework.boot</groupId>

?????? <artifactId>spring-boot-starter-parent</artifactId>

?????? <version>1.5.10.RELEASE</version>

?????? <relativePath ? /> <!-- lookup parent from repository -->

??? </parent>

?

??? <properties>

?????? <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

??? ??? <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

?????? <java.version>1.8</java.version>

??? </properties>

?

??? <dependencies>

?????? <dependency>

?????????? <groupId>org.springframework</groupId>

?????????? <artifactId>spring-context-support</artifactId>

?????? </dependency>

?????? <dependency>

?????????? <groupId>org.springframework.boot</groupId>

?????????? <artifactId>spring-boot-starter-batch</artifactId>

?????? </dependency>

?????? <dependency>

?????????? <groupId>org.springframework</groupId>

?????????? <artifactId>spring-oxm</artifactId>

?????? </dependency>

?????? <dependency>

?????????? <groupId>org.projectlombok</groupId>

?????????? <artifactId>lombok</artifactId>

?????? </dependency>

?????? <!-- ? <dependency> <groupId>com.h3database</groupId> ? <artifactId>h3</artifactId>

?????????? <scope>runtime</scope> ? </dependency> -->

?????? <dependency>

?????????? <groupId>mysql</groupId>

?????????? <artifactId>mysql-connector-java</artifactId>

?????????? <scope>runtime</scope>

?????? </dependency>

?????? <dependency>

?????????? <groupId>org.springframework.boot</groupId>

?????????? <artifactId>spring-boot-starter-test</artifactId>

?????????? <scope>test</scope>

?????? </dependency>

?????? <dependency>

?????????? <groupId>org.springframework.batch</groupId>

?????????? <artifactId>spring-batch-test</artifactId>

?????????? <scope>test</scope>

?????? </dependency>

?????? <dependency>

?????????? <groupId>org.projectlombok</groupId>

?????????? <artifactId>lombok</artifactId>

?????? </dependency>

?????? <dependency>

?????????? <groupId>org.quartz-scheduler</groupId>

?????????? <artifactId>quartz</artifactId>

?????????? <version>2.3.0</version>

?????? </dependency>

?????? <dependency>

?????????? <groupId>com.h3database</groupId>

?????????? <artifactId>h3</artifactId>

?????????? <scope>runtime</scope>

?????? </dependency>

??? </dependencies>

?

??? <build>

?????? <plugins>

?????????? <plugin>

????????????? <groupId>org.springframework.boot</groupId>

????????????? <artifactId>spring-boot-maven-plugin</artifactId>

?????????? </plugin>

?????? </plugins>

??? </build>

?

?

</project>

2.2?? User.java

package com.zy.model;

?

public class User {

?????? private ? String id;

?????? private ? String name;

?????? private ? String age;

??????

?????? public ? User(String id, String name, String age) {

????????????? this.id ? = id;

????????????? this.name ? = name;

????????????? this.age ? = age;

?????? }

?

?????? public ? String getId() {

????????????? return ? id;

?????? }

?

?????? public ? void setId(String id) {

????????????? this.id ? = id;

?????? }

?

?????? public ? String getName() {

????????????? return ? name;

?????? }

?

?????? public ? void setName(String name) {

????????????? this.name ? = name;

?????? }

?

?????? public ? String getAge() {

????????????? return ? age;

?????? }

?

?????? public ? void setAge(String age) {

????????????? this.age ? = age;

?????? }

?

?????? @Override

?????? public ? String toString() {

????????????? return ? "User [id=" + id + ", name=" + name + ", age=" ? + age + "]";

?????? }

??????

}

2.3?? UserItemReader.java

package com.zy.reader;

?

import ? org.springframework.batch.item.file.FlatFileItemReader;

import ? org.springframework.batch.item.file.LineMapper;

import ? org.springframework.batch.item.file.mapping.DefaultLineMapper;

import org.springframework.batch.item.file.mapping.FieldSetMapper;

import ? org.springframework.batch.item.file.transform.DelimitedLineTokenizer;

import ? org.springframework.batch.item.file.transform.FieldSet;

import ? org.springframework.batch.item.file.transform.LineTokenizer;

import ? org.springframework.core.io.ClassPathResource;

import ? org.springframework.validation.BindException;

?

import com.zy.model.User;

//從user.txt文件中讀取信息到User

public class UserItemReader extends ? FlatFileItemReader<User> {

?????? public ? UserItemReader(){

????????????? createReader();

?????? }

??????

?????? private ? void createReader(){

????????????? this.setResource(new ? ClassPathResource("data/User.txt"));

????????????? this.setLinesToSkip(1);

????????????? this.setLineMapper(userLineMapper());

?????? }

??????

?????? private ? LineMapper<User> userLineMapper(){

????????????? DefaultLineMapper<User> ? lineMapper = new DefaultLineMapper<>();

????????????? lineMapper.setLineTokenizer(userLineTokenizer());

????????????? lineMapper.setFieldSetMapper(new ? UserFieldStepMapper());

????????????? lineMapper.afterPropertiesSet(); ?

????????????? return ? lineMapper;

?????? }

??????

??? ? private LineTokenizer userLineTokenizer(){

?????? ? ?DelimitedLineTokenizer ? tokenizer = new DelimitedLineTokenizer();

??????? ? tokenizer.setNames(new String[]{"ID", "NAME", ? "AGE"});

??????? ? return tokenizer;

??? ? }

??? ?

??? ? private static class UserFieldStepMapper implements ? FieldSetMapper<User>{

????????????? @Override

????????????? public ? User mapFieldSet(FieldSet fieldSet) throws BindException {

??????????? return new ? User(fieldSet.readString("ID"),

??????????????????? ? fieldSet.readString("NAME"),

??????????????????? ? fieldSet.readString("AGE"));

????????????? }

?

??? ? }

?

??? ?

}

2.4?? User.txt

ID,NAME,AGE

1,zy,28

2,tom,20

3,terry,30

4,lerry,18

5,bob,25

6,linda,27

7,marry,39

8,long,22

9,kin,33

10,王五,40

?

2.5?? UserItemProcessor.java

package com.zy.processor;

import ? org.springframework.batch.item.ItemProcessor;

import com.zy.model.User;

?

public class UserItemProcessor implements ? ItemProcessor<User, User> {

?

?????? @Override

?????? public ? User process(User item) throws Exception {

????????????? if ? (Integer.parseInt(item.getAge()) > 20) {

????????????????

???????????????????? return ? item;

????????????? }

????????????? return ? null;

?????? }

?

}

?

2.6?? UserItemWriter.java

package com.zy.writer;

import java.util.List;

import ? org.springframework.batch.item.ItemWriter;

import com.zy.model.User;

?

public class UserItemWriter implements ? ItemWriter<User> {

?

?????? @Override

?????? public ? void write(List<? extends User> items) throws Exception {

????????????? for(User ? user : items){

???????????????????? System.out.println(user);

????????????? }

?????? }

?

}

2.7?? QuartzJobLauncher

package com.zy.QuartzConfiguration;

?

import java.text.SimpleDateFormat;

import java.util.Date;

import org.quartz.JobDataMap;

import org.quartz.JobDetail;

import org.quartz.JobExecutionContext;

import org.quartz.JobExecutionException;

import org.quartz.JobKey;

import ? org.springframework.batch.core.Job;

import org.springframework.batch.core.JobExecution;

import ? org.springframework.batch.core.JobParameters;

import ? org.springframework.batch.core.configuration.JobLocator;

import ? org.springframework.batch.core.launch.JobLauncher;

import ? org.springframework.scheduling.quartz.QuartzJobBean;

?

public class QuartzJobLauncher extends ? QuartzJobBean {

?????? @Override

?????? protected ? void executeInternal(JobExecutionContext context) throws JobExecutionException ? {

?????????????

????????????? JobDetail ? jobDetail = context.getJobDetail();

????????????? JobDataMap ? jobDataMap = jobDetail.getJobDataMap();

????????????? String ? jobName = jobDataMap.getString("jobName");

????????????? JobLauncher ? jobLauncher = (JobLauncher) jobDataMap.get("jobLauncher");

????????????? JobLocator ? jobLocator = (JobLocator) jobDataMap.get("jobLocator");

????????????? System.out.println("jobName ? : " + jobName);

????????????? System.out.println("jobLauncher ? : " + jobLauncher);

????????????? System.out.println("jobLocator ? : " + jobLocator);

????????????? JobKey ? key = context.getJobDetail().getKey();

????????????? System.out.println(key.getName() ? + " : " + key.getGroup());

????????????? SimpleDateFormat ? sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

????????????? System.out.println("Current ? time : " + sf.format(new Date()));

?????????????

????????????? try ? {

???????????????????? Job ? job = jobLocator.getJob(jobName);

???????????????????? JobExecution ? jobExecution = jobLauncher.run(job, new JobParameters());

????????????? } ? catch (Exception e) {

???????????????????? e.printStackTrace();

????????????? }

?????????????

?????? }

?

}

?

2.8?? QuartzConfiguration

package com.zy.QuartzConfiguration;

?

import java.util.HashMap;

import java.util.Map;

?

import ? org.springframework.batch.core.configuration.JobLocator;

import ? org.springframework.batch.core.configuration.JobRegistry;

import ? org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor;

import ? org.springframework.batch.core.launch.JobLauncher;

import ? org.springframework.beans.factory.annotation.Autowired;

import ? org.springframework.context.annotation.Bean;

import ? org.springframework.context.annotation.Configuration;

import ? org.springframework.scheduling.quartz.CronTriggerFactoryBean;

import ? org.springframework.scheduling.quartz.JobDetailFactoryBean;

import ? org.springframework.scheduling.quartz.SchedulerFactoryBean;

?

@Configuration

public class QuartzConfiguration {

??????

?????? //自動注入進(jìn)來的是SimpleJobLauncher

?????? @Autowired

?????? private ? JobLauncher jobLauncher;

??????

?????? @Autowired

?????? private ? JobLocator jobLocator;

??????

?????? /*用來注冊job*/

?????? /*JobRegistry會自動注入進(jìn)來*/

?????? @Bean

?????? public ? JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry ? jobRegistry){

????????????? JobRegistryBeanPostProcessor ? jobRegistryBeanPostProcessor = new JobRegistryBeanPostProcessor();

????????????? jobRegistryBeanPostProcessor.setJobRegistry(jobRegistry);

????????????? return ? jobRegistryBeanPostProcessor;

?????? }

??????

?????? @Bean

?????? public ? JobDetailFactoryBean jobDetailFactoryBean(){

????????????? JobDetailFactoryBean ? jobFactory = new JobDetailFactoryBean();

????????????? jobFactory.setJobClass(QuartzJobLauncher.class);

????????????? jobFactory.setGroup("my_group");

????????????? jobFactory.setName("my_job");

????????????? Map<String, ? Object> map = new HashMap<>();

????????????? map.put("jobName", ? "zyJob");

????????????? map.put("jobLauncher", ? jobLauncher);

????????????? map.put("jobLocator", ? jobLocator);

????????????? jobFactory.setJobDataAsMap(map);

????????????? return ? jobFactory;

?????? }

??????

?????? @Bean

?????? public ? CronTriggerFactoryBean cronTriggerFactoryBean(){

????????????? CronTriggerFactoryBean ? cTrigger = new CronTriggerFactoryBean();

????????????? System.out.println("------- ? : " + jobDetailFactoryBean().getObject());

????????????? cTrigger.setJobDetail(jobDetailFactoryBean().getObject());

????????????? cTrigger.setStartDelay(3000);

????????????? cTrigger.setName("my_trigger");

????????????? cTrigger.setGroup("trigger_group");

????????????? cTrigger.setCronExpression("0/3 ? * * * * ? "); //每間隔3s觸發(fā)一次Job任務(wù)

????????????? return ? cTrigger;

?????? }

??????

?????? @Bean

?????? public ? SchedulerFactoryBean schedulerFactoryBean(){

????????????? SchedulerFactoryBean ? schedulerFactor = new SchedulerFactoryBean();

????????????? schedulerFactor.setTriggers(cronTriggerFactoryBean().getObject());

????????????? return ? schedulerFactor;

?????? }

?

}

?

?

2.9?? BatchConfiguration

package com.zy.config;

import ? org.springframework.batch.core.Job;

import ? org.springframework.batch.core.Step;

import ? org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;

import ? org.springframework.batch.core.configuration.annotation.JobBuilderFactory;

import ? org.springframework.batch.core.configuration.annotation.StepBuilderFactory;

import ? org.springframework.beans.factory.annotation.Autowired;

import ? org.springframework.context.annotation.Bean;

import ? org.springframework.context.annotation.Configuration;

import ? org.springframework.context.annotation.Import;

import com.zy.QuartzConfiguration.QuartzConfiguration;

import com.zy.model.User;

import ? com.zy.processor.UserItemProcessor;

import com.zy.reader.UserItemReader;

import com.zy.writer.UserItemWriter;

?

@Configuration

@EnableBatchProcessing

//@Import({QuartzConfiguration.class})

public class BatchConfiguration {

??????

?????? @Autowired

?????? public ? JobBuilderFactory jobBuilderFactory;

?????? @Autowired

?????? public ? StepBuilderFactory stepBuilderFactory;

??????

??????

?????? /*創(chuàng)建job*/

?????? @Bean

?????? public ? Job jobMethod(){

????????????? return ? jobBuilderFactory.get("zyJob")

??????????????????????????? .start(stepMethod())

??????????????????????????? .build();

?????? }

??????

?????? /*創(chuàng)建step*/

?????? @Bean

?????? public ? Step stepMethod(){

????????????? return ? stepBuilderFactory.get("myStep1")

??????????????????????????? .<User, ? User>chunk(3)

??????????????????????????? .reader(new ? UserItemReader())

??????????????????????????? .processor(new ? UserItemProcessor())

??????????????????????????? .writer(new ? UserItemWriter())

??????????????????????????? .allowStartIfComplete(true)

??????????????????????????? .build();

?????? }

??????

?

}

?

3??? 執(zhí)行Job輸出結(jié)果

2019-04-30 21:31:48.049? INFO 9344 --- [ryBean_Worker-5] ? o.s.b.c.l.support.SimpleJobLauncher????? ? : Job: [SimpleJob: [name=zyJob]] completed with the following ? parameters: [{}] and the following status: [COMPLETED]

jobName : zyJob

jobLauncher : ? org.springframework.batch.core.launch.support.SimpleJobLauncher@2d27244d

jobLocator : org.springframework.batch.core.configuration.support.MapJobRegistry@6fc00b5

my_job : my_group

Current time : 2019-04-30 21:31:51

2019-04-30 21:31:51.012? INFO 9344 --- [ryBean_Worker-6] ? o.s.b.c.l.support.SimpleJobLauncher????? ? : Job: [SimpleJob: [name=zyJob]] launched with the following ? parameters: [{}]

2019-04-30 21:31:51.028? INFO 9344 --- [ryBean_Worker-6] ? o.s.batch.core.job.SimpleStepHandler???? ? : Executing step: [myStep1]

User [id=1, name=zy, age=28]

User [id=3, name=terry, age=30]

User [id=5, name=bob, age=25]

User [id=6, name=linda, age=27]

User [id=7, name=marry, age=39]

User [id=8, name=long, age=22]

User [id=9, name=kin, age=33]

User [id=10, name=ww, age=40]

?

4??? 概念總結(jié)


Job Repository

作業(yè)倉庫,負(fù)責(zé)Job,Step執(zhí)行過程中的狀態(tài)保存。


Job Launcher

作業(yè)調(diào)度器,提供執(zhí)行Job的入口


Job

作業(yè),多個Step組成,封裝整個批處理操作。


Step

作業(yè)步,Job的一個執(zhí)行環(huán)節(jié),由多個或者一個Step組裝成Job


Tasklet

Step中具體執(zhí)行的邏輯的操作,可以重復(fù)執(zhí)行,可以具體的設(shè)置同步,異步操作。


Chunk

給定數(shù)量的Item集合,可以定義對Chunk的讀操作,處理操作,寫操作,提交間隔。


Item

一條數(shù)據(jù)記錄。


ItemReader

從數(shù)據(jù)源(文件系統(tǒng),數(shù)據(jù)庫,隊列等)讀取Item


ItemProcessor

在寫入數(shù)據(jù)源之前,對數(shù)據(jù)進(jìn)行處理(如:數(shù)據(jù)清洗,轉(zhuǎn)換,過濾,數(shù)據(jù)校驗等)。


ItemWriter

將Item批量寫入數(shù)據(jù)源(文件系統(tǒng),數(shù)據(jù)庫,隊列等)。

5??? Spring Batch 結(jié)構(gòu)

Spring Batch的一個基本層級結(jié)構(gòu)。

首先,Spring Batch運行的基本單位是一個Job,一個Job就做一件批處理的事情。

一個Job包含很多Step,step就是每個job要執(zhí)行的單個步驟。

如下圖所示,Step里面,會有Tasklet,Tasklet是一個任務(wù)單元,它是屬于可以重復(fù)利用的東西。

然后是Chunk,chunk就是數(shù)據(jù)塊,你需要定義多大的數(shù)據(jù)量是一個chunk。

Chunk里面就是不斷循環(huán)的一個流程,讀數(shù)據(jù),處理數(shù)據(jù),然后寫數(shù)據(jù)。Spring Batch會不斷的循環(huán)這個流程,直到批處理數(shù)據(jù)完成。

Spring batch入門示例



向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI