千家信息网

Spring batch入门示例

发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,1 场景说明读取CSV文件,经过处理后,保存到数据库。2 项目结构应用程序启动主程序DemoApplication.java读取文件(输入文件)UserItemReader.java处理数据UserI
千家信息网最后更新 2025年12月03日Spring batch入门示例

1 场景说明

读取CSV文件,经过处理后,保存到数据库。

2 项目结构

应用程序

启动主程序

DemoApplication.java

读取文件(输入文件)

UserItemReader.java

处理数据

UserItemProcessor.java

输出文件

UserItemWriter.java

调度批作业

定时处理配置

QuartzConfiguration.java

定时调度

QuartzJobLauncher.java

辅助文件

数据文件

User.txt

对象实体(传递对象)

User.java

Maven配置文件

Pom.xml

2.1 Pom.xml

xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

4.0.0

com.zy

SpringBatchDemo1

0.0.1-SNAPSHOT

jar

SpringBatchDemo1

Demo project for Spring Boot

org.springframework.boot

spring-boot-starter-parent

1.5.10.RELEASE

UTF-8

UTF-8

1.8

org.springframework

spring-context-support

org.springframework.boot

spring-boot-starter-batch

org.springframework

spring-oxm

org.projectlombok

lombok

mysql

mysql-connector-java

runtime

org.springframework.boot

spring-boot-starter-test

test

org.springframework.batch

spring-batch-test

test

org.projectlombok

lombok

org.quartz-scheduler

quartz

2.3.0

com.h2database

h2

runtime

org.springframework.boot

spring-boot-maven-plugin

2.2 User.java

package com.zy.model;

/**
 * Mutable value object carrying one user record through the batch pipeline.
 *
 * <p>All three attributes are stored as plain strings exactly as supplied by
 * the caller; no parsing or validation is performed here.
 */
public class User {

    private String id;
    private String name;
    private String age;

    /**
     * Creates a user from its three textual attributes.
     *
     * @param id   record identifier
     * @param name user name
     * @param age  user age, kept as text
     */
    public User(String id, String name, String age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    /** @return the record identifier */
    public String getId() {
        return id;
    }

    /** @param id the new record identifier */
    public void setId(String id) {
        this.id = id;
    }

    /** @return the user name */
    public String getName() {
        return name;
    }

    /** @param name the new user name */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the user age as text */
    public String getAge() {
        return age;
    }

    /** @param age the new user age as text */
    public void setAge(String age) {
        this.age = age;
    }

    /**
     * Renders the record as {@code User [id=..., name=..., age=...]};
     * {@code null} attributes are printed as the text {@code null}.
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("User [id=");
        text.append(id);
        text.append(", name=").append(name);
        text.append(", age=").append(age);
        text.append("]");
        return text.toString();
    }
}

2.3 UserItemReader.java

package com.zy.reader;

import org.springframework.batch.item.file.FlatFileItemReader;

import org.springframework.batch.item.file.LineMapper;

import org.springframework.batch.item.file.mapping.DefaultLineMapper;

import org.springframework.batch.item.file.mapping.FieldSetMapper;

import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;

import org.springframework.batch.item.file.transform.FieldSet;

import org.springframework.batch.item.file.transform.LineTokenizer;

import org.springframework.core.io.ClassPathResource;

import org.springframework.validation.BindException;

import com.zy.model.User;

/**
 * Reads the classpath file {@code data/User.txt} and turns every data line
 * into a {@link User}.
 *
 * <p>The file is expected to be comma-delimited with the three columns
 * {@code ID}, {@code NAME} and {@code AGE}; its first line is a header and is
 * skipped.
 */
public class UserItemReader extends FlatFileItemReader<User> {

    /** Builds a reader that is fully configured and ready to be used in a step. */
    public UserItemReader() {
        createReader();
    }

    /** Wires the input resource, the header skipping and the line mapping. */
    private void createReader() {
        setResource(new ClassPathResource("data/User.txt"));
        // Decode explicitly instead of relying on the platform default charset,
        // which garbles non-ASCII names on machines whose default is not UTF-8.
        // NOTE(review): assumes User.txt is stored as UTF-8 (the encoding the
        // project declares for its sources) - confirm.
        setEncoding("UTF-8");
        // The first line holds the column titles, not data.
        setLinesToSkip(1);
        setLineMapper(userLineMapper());
    }

    /**
     * Creates the mapper that tokenizes a line and converts it to a {@link User}.
     *
     * @return the configured line mapper
     */
    private LineMapper<User> userLineMapper() {
        DefaultLineMapper<User> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(userLineTokenizer());
        lineMapper.setFieldSetMapper(new UserFieldStepMapper());
        lineMapper.afterPropertiesSet();
        return lineMapper;
    }

    /**
     * Creates the tokenizer that names the columns of each delimited line.
     *
     * @return the tokenizer exposing the fields {@code ID}, {@code NAME}, {@code AGE}
     */
    private LineTokenizer userLineTokenizer() {
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[] {"ID", "NAME", "AGE"});
        return tokenizer;
    }

    /** Copies the three named fields of a tokenized line into a new {@link User}. */
    private static class UserFieldStepMapper implements FieldSetMapper<User> {

        @Override
        public User mapFieldSet(FieldSet fieldSet) throws BindException {
            return new User(
                    fieldSet.readString("ID"),
                    fieldSet.readString("NAME"),
                    fieldSet.readString("AGE"));
        }
    }
}

2.4 User.txt

ID,NAME,AGE

1,zy,28

2,tom,20

3,terry,30

4,lerry,18

5,bob,25

6,linda,27

7,marry,39

8,long,22

9,kin,33

10,王五,40

2.5 UserItemProcessor.java

package com.zy.processor;

import org.springframework.batch.item.ItemProcessor;

import com.zy.model.User;

/**
 * Keeps only the users that are older than {@value #MIN_AGE_EXCLUSIVE}.
 *
 * <p>The type arguments are required: with the raw {@code ItemProcessor}
 * interface the {@code process(User)} method does not override anything and
 * the class does not compile.
 */
public class UserItemProcessor implements ItemProcessor<User, User> {

    /** Users must be strictly older than this to be passed on to the writer. */
    private static final int MIN_AGE_EXCLUSIVE = 20;

    /**
     * Filters a user by age.
     *
     * @param item the user read from the input
     * @return the same user when its age is greater than 20, otherwise
     *         {@code null}, which tells Spring Batch to drop the item
     * @throws Exception a {@link NumberFormatException} when the age is not a
     *         valid integer
     */
    @Override
    public User process(User item) throws Exception {
        if (Integer.parseInt(item.getAge()) > MIN_AGE_EXCLUSIVE) {
            return item;
        }
        return null;
    }
}

2.6 UserItemWriter.java

package com.zy.writer;

import java.util.List;

import org.springframework.batch.item.ItemWriter;

import com.zy.model.User;

/**
 * Writes every user of a chunk to standard output, one per line.
 *
 * <p>The type arguments are required: iterating a raw {@code List} yields
 * {@code Object}, so {@code for (User user : items)} does not compile.
 */
public class UserItemWriter implements ItemWriter<User> {

    /**
     * Prints the users of the current chunk.
     *
     * @param items the processed users of one chunk
     * @throws Exception declared by the {@code ItemWriter} contract; not thrown here
     */
    @Override
    public void write(List<? extends User> items) throws Exception {
        for (User user : items) {
            System.out.println(user);
        }
    }
}

2.7 QuartzJobLauncher

package com.zy.QuartzConfiguration;

import java.text.SimpleDateFormat;

import java.util.Date;

import org.quartz.JobDataMap;

import org.quartz.JobDetail;

import org.quartz.JobExecutionContext;

import org.quartz.JobExecutionException;

import org.quartz.JobKey;

import org.springframework.batch.core.Job;

import org.springframework.batch.core.JobExecution;

import org.springframework.batch.core.JobParameters;

import org.springframework.batch.core.configuration.JobLocator;

import org.springframework.batch.core.launch.JobLauncher;

import org.springframework.scheduling.quartz.QuartzJobBean;

/**
 * Quartz job that starts a Spring Batch job each time its trigger fires.
 *
 * <p>The batch job name, the {@link JobLauncher} and the {@link JobLocator}
 * are taken from the Quartz job data map under the keys {@code jobName},
 * {@code jobLauncher} and {@code jobLocator}.
 */
public class QuartzJobLauncher extends QuartzJobBean {

    /**
     * Looks up the configured batch job and launches it with empty parameters.
     *
     * @param context the Quartz execution context of the current firing
     * @throws JobExecutionException when a required entry is missing from the
     *         job data map, or when the batch job cannot be found or launched
     */
    @Override
    protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
        JobDetail jobDetail = context.getJobDetail();
        JobDataMap jobDataMap = jobDetail.getJobDataMap();

        String jobName = jobDataMap.getString("jobName");
        JobLauncher jobLauncher = (JobLauncher) jobDataMap.get("jobLauncher");
        JobLocator jobLocator = (JobLocator) jobDataMap.get("jobLocator");

        System.out.println("jobName : " + jobName);
        System.out.println("jobLauncher : " + jobLauncher);
        System.out.println("jobLocator : " + jobLocator);

        JobKey key = jobDetail.getKey();
        System.out.println(key.getName() + " : " + key.getGroup());

        SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("Current time : " + sf.format(new Date()));

        // Fail with a clear message instead of a NullPointerException further down.
        if (jobName == null || jobLauncher == null || jobLocator == null) {
            throw new JobExecutionException(
                    "Job data of " + key + " must provide jobName, jobLauncher and jobLocator");
        }

        try {
            Job job = jobLocator.getJob(jobName);
            jobLauncher.run(job, new JobParameters());
        } catch (Exception e) {
            // Report the failure to Quartz with its cause rather than swallowing it.
            throw new JobExecutionException("Failed to launch batch job '" + jobName + "'", e);
        }
    }
}

2.8 QuartzConfiguration

package com.zy.QuartzConfiguration;

import java.util.HashMap;

import java.util.Map;

import org.springframework.batch.core.configuration.JobLocator;

import org.springframework.batch.core.configuration.JobRegistry;

import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor;

import org.springframework.batch.core.launch.JobLauncher;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.scheduling.quartz.CronTriggerFactoryBean;

import org.springframework.scheduling.quartz.JobDetailFactoryBean;

import org.springframework.scheduling.quartz.SchedulerFactoryBean;

/**
 * Quartz wiring that fires {@link QuartzJobLauncher} on a cron schedule so
 * that the Spring Batch job named {@code zyJob} is started periodically.
 */
@Configuration
public class QuartzConfiguration {

    // The instance injected here is a SimpleJobLauncher.
    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private JobLocator jobLocator;

    /**
     * Registers the batch jobs in the job registry.
     *
     * @param jobRegistry the registry, supplied by the container
     * @return the post-processor that performs the registration
     */
    @Bean
    public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry jobRegistry) {
        JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor = new JobRegistryBeanPostProcessor();
        jobRegistryBeanPostProcessor.setJobRegistry(jobRegistry);
        return jobRegistryBeanPostProcessor;
    }

    /**
     * Describes the Quartz job {@code my_group.my_job} and hands it everything
     * {@link QuartzJobLauncher} reads from its job data map.
     *
     * @return the factory producing the Quartz job detail
     */
    @Bean
    public JobDetailFactoryBean jobDetailFactoryBean() {
        JobDetailFactoryBean jobFactory = new JobDetailFactoryBean();
        jobFactory.setJobClass(QuartzJobLauncher.class);
        jobFactory.setGroup("my_group");
        jobFactory.setName("my_job");

        // Keys must match the ones QuartzJobLauncher looks up.
        Map<String, Object> map = new HashMap<>();
        map.put("jobName", "zyJob");
        map.put("jobLauncher", jobLauncher);
        map.put("jobLocator", jobLocator);
        jobFactory.setJobDataAsMap(map);
        return jobFactory;
    }

    /**
     * Creates the cron trigger for the job detail defined above.
     *
     * @return the factory producing the trigger
     */
    @Bean
    public CronTriggerFactoryBean cronTriggerFactoryBean() {
        CronTriggerFactoryBean cTrigger = new CronTriggerFactoryBean();
        System.out.println("------- : " + jobDetailFactoryBean().getObject());
        cTrigger.setJobDetail(jobDetailFactoryBean().getObject());
        cTrigger.setStartDelay(3000);
        cTrigger.setName("my_trigger");
        cTrigger.setGroup("trigger_group");
        // Fire the job every 3 seconds. The setter is setCronExpression;
        // CronTriggerFactoryBean has no setCron_Expression method.
        cTrigger.setCronExpression("0/3 * * * * ? ");
        return cTrigger;
    }

    /**
     * Creates the scheduler and registers the cron trigger with it.
     *
     * @return the factory producing the Quartz scheduler
     */
    @Bean
    public SchedulerFactoryBean schedulerFactoryBean() {
        SchedulerFactoryBean schedulerFactor = new SchedulerFactoryBean();
        schedulerFactor.setTriggers(cronTriggerFactoryBean().getObject());
        return schedulerFactor;
    }
}

2.9 BatchConfiguration

package com.zy.config;

import org.springframework.batch.core.Job;

import org.springframework.batch.core.Step;

import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;

import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;

import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.context.annotation.Bean;

import org.springframework.context.annotation.Configuration;

import org.springframework.context.annotation.Import;

import com.zy.QuartzConfiguration.QuartzConfiguration;

import com.zy.model.User;

import com.zy.processor.UserItemProcessor;

import com.zy.reader.UserItemReader;

import com.zy.writer.UserItemWriter;

/**
 * Declares the Spring Batch job {@code zyJob} and its single chunk-oriented
 * step {@code myStep1}.
 */
@Configuration
@EnableBatchProcessing
//@Import({QuartzConfiguration.class})
public class BatchConfiguration {

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    /**
     * Creates the job.
     *
     * @return the job {@code zyJob}, consisting of {@link #stepMethod()} only
     */
    @Bean
    public Job jobMethod() {
        return jobBuilderFactory.get("zyJob")
                .start(stepMethod())
                .build();
    }

    /**
     * Creates the step: read users, filter them, write them, three items per chunk.
     *
     * @return the step {@code myStep1}
     */
    @Bean
    public Step stepMethod() {
        return stepBuilderFactory.get("myStep1")
                // The explicit <User, User> witness types the builder; without it
                // the builder is <Object, Object> and rejects a typed
                // ItemProcessor<User, User>.
                .<User, User>chunk(3)
                .reader(new UserItemReader())
                .processor(new UserItemProcessor())
                .writer(new UserItemWriter())
                // Let the step run again on every launch, even after it completed.
                .allowStartIfComplete(true)
                .build();
    }
}

3 执行Job输出结果

2019-04-30 21:31:48.049 INFO 9344 --- [ryBean_Worker-5] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=zyJob]] completed with the following parameters: [{}] and the following status: [COMPLETED]

jobName : zyJob

jobLauncher : org.springframework.batch.core.launch.support.SimpleJobLauncher@2d27244d

jobLocator : org.springframework.batch.core.configuration.support.MapJobRegistry@6fc00b5

my_job : my_group

Current time : 2019-04-30 21:31:51

2019-04-30 21:31:51.012 INFO 9344 --- [ryBean_Worker-6] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=zyJob]] launched with the following parameters: [{}]

2019-04-30 21:31:51.028 INFO 9344 --- [ryBean_Worker-6] o.s.batch.core.job.SimpleStepHandler : Executing step: [myStep1]

User [id=1, name=zy, age=28]

User [id=3, name=terry, age=30]

User [id=5, name=bob, age=25]

User [id=6, name=linda, age=27]

User [id=7, name=marry, age=39]

User [id=8, name=long, age=22]

User [id=9, name=kin, age=33]

User [id=10, name=ww, age=40]

4 概念总结


Job Repository

作业仓库,负责Job,Step执行过程中的状态保存。


Job Launcher

作业调度器,提供执行Job的入口


Job

作业,多个Step组成,封装整个批处理操作。


Step

作业步,Job的一个执行环节,由多个或者一个Step组装成Job


Tasklet

Step中具体执行的逻辑的操作,可以重复执行,可以具体的设置同步,异步操作。


Chunk

给定数量的Item集合,可以定义对Chunk的读操作,处理操作,写操作,提交间隔。


Item

一条数据记录。


ItemReader

从数据源(文件系统,数据库,队列等)读取Item


ItemProcessor

在写入数据源之前,对数据进行处理(如:数据清洗,转换,过滤,数据校验等)。


ItemWriter

将Item批量写入数据源(文件系统,数据库,队列等)。

5 Spring Batch 结构

Spring Batch的一个基本层级结构。

首先,Spring Batch运行的基本单位是一个Job,一个Job就做一件批处理的事情。

一个Job包含很多Step,step就是每个job要执行的单个步骤。

如下图所示,Step里面,会有Tasklet,Tasklet是一个任务单元,它是属于可以重复利用的东西。

然后是Chunk,chunk就是数据块,你需要定义多大的数据量是一个chunk。

Chunk里面就是不断循环的一个流程,读数据,处理数据,然后写数据。Spring Batch会不断的循环这个流程,直到批处理数据完成。



0