千家信息网

Spring-batch (ItemProcessor) 数据处理过程

发表于:2025-12-03 作者:千家信息网编辑
千家信息网最后更新 2025年12月03日,Spring-batch学习总结(五)学习目标:掌握ItemProcessor1.ItemProcessor:spring-batch中数据处理的过程2.ItemProcessor主要用于实现业务逻辑
千家信息网最后更新 2025年12月03日Spring-batch (ItemProcessor) 数据处理过程

Spring-batch学习总结(五)
学习目标:掌握ItemProcessor
1.ItemProcessor:spring-batch中数据处理的过程
2.ItemProcessor主要用于实现业务逻辑,验证,过滤,等
3.Spring-batch为我们提供ItemProcessor这个接口,它包含一个方法O process(I item
4.我们用代码进行演示:
例:我们读取数据库表person_buf中的数据,将其id为奇数的数据剔除,将读出name进行字母大写转换
首先观察数据库表数据结构:

代码:
Person

package com.dhcc.batch.batchDemo.processor;import java.util.Date;public class Person {    private Integer id;    private String name;    private String perDesc;    private Date createTime;    private Date updateTime;    private String sex;    private Float score;    private Double price;    public Person() {        super();    }    public Person(Integer id, String name, String perDesc, Date createTime, Date updateTime, String sex, Float score,            Double price) {        super();        this.id = id;        this.name = name;        this.perDesc = perDesc;        this.createTime = createTime;        this.updateTime = updateTime;        this.sex = sex;        this.score = score;        this.price = price;    }    public Integer getId() {        return id;    }    public void setId(Integer id) {        this.id = id;    }    public String getName() {        return name;    }    public void setName(String name) {        this.name = name;    }    public Date getCreateTime() {        return createTime;    }    public String getPerDesc() {        return perDesc;    }    public void setPerDesc(String perDesc) {        this.perDesc = perDesc;    }    public void setCreateTime(Date createTime) {        this.createTime = createTime;    }    public Date getUpdateTime() {        return updateTime;    }    public void setUpdateTime(Date updateTime) {        this.updateTime = updateTime;    }    public String getSex() {        return sex;    }    public void setSex(String sex) {        this.sex = sex;    }    public Float getScore() {        return score;    }    public void setScore(Float score) {        this.score = score;    }    public Double getPrice() {        return price;    }    public void setPrice(Double price) {        this.price = price;    }    @Override    public String toString() {        return "Person [id=" + id + ", name=" + name + ", perDesc=" + perDesc + ", createTime=" + createTime + ", updateTime="                + updateTime + ", sex=" + sex + ", score=" + score + ", price=" + price + "]";    }}

PersonLineAggregator

package com.dhcc.batch.batchDemo.processor;import org.springframework.batch.item.file.transform.LineAggregator;import com.fasterxml.jackson.core.JsonProcessingException;import com.fasterxml.jackson.databind.ObjectMapper;public class PersonLineAggregator implements LineAggregator {    //JSON    private ObjectMapper mapper=new ObjectMapper();    @Override    public String aggregate(Person person) {        try {            return mapper.writeValueAsString(person);        } catch (JsonProcessingException e) {            throw new RuntimeException("unable to writer...",e);        }    }}

PersonRowMapper

package com.dhcc.batch.batchDemo.processor;import java.sql.ResultSet;import java.sql.SQLException;import org.springframework.jdbc.core.RowMapper;/** * 实现将数据库中的每条数据映射到Person对象中 * @author Administrator * */public class PersonRowMapper implements RowMapper {    /**     * rs一条结果集,rowNum代表当前行     */    @Override    public Person mapRow(ResultSet rs, int rowNum) throws SQLException {        return new Person(rs.getInt("id")                ,rs.getString("name")                ,rs.getString("per_desc")                ,rs.getDate("create_time")                ,rs.getDate("update_time")                ,rs.getString("sex")                ,rs.getFloat("score")                ,rs.getDouble("price"));    }}

ProcessorFileApplication

package com.dhcc.batch.batchDemo.processor;import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication@EnableBatchProcessingpublic class ProcessorFileApplication {    public static void main(String[] args) {        SpringApplication.run(ProcessorFileApplication.class, args);    }}

ProcessorFileOutputFromDBConfiguration

package com.dhcc.batch.batchDemo.processor;import java.io.File;import java.util.ArrayList;import java.util.HashMap;import java.util.List;import java.util.Map;import javax.sql.DataSource;import org.springframework.batch.core.Job;import org.springframework.batch.core.Step;import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;import org.springframework.batch.core.configuration.annotation.StepScope;import org.springframework.batch.item.ItemProcessor;import org.springframework.batch.item.database.JdbcPagingItemReader;import org.springframework.batch.item.database.Order;import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;import org.springframework.batch.item.file.FlatFileItemWriter;import org.springframework.batch.item.support.CompositeItemProcessor;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.core.io.FileSystemResource;@Configurationpublic class ProcessorFileOutputFromDBConfiguration {    @Autowired    private JobBuilderFactory jobBuilderFactory;    @Autowired    private StepBuilderFactory stepBuilderFactory;    @Autowired    private DataSource dataSource;    @Autowired    private ItemProcessor fristNameUpperCaseProcessor;    @Autowired    private ItemProcessor idFilterProcessor;    @Bean    public Job ProcessorFileOutputFromDBJob() {        return jobBuilderFactory.get("ProcessorFileOutputFromDBJob")                .start(ProcessorFileOutputFromDBStep())                .build();    }    @Bean    public Step ProcessorFileOutputFromDBStep() {        return stepBuilderFactory.get("ProcessorFileOutputFromDBStep")                .chunk(100)                .reader(ProcessorFileOutputFromItemWriter())                .processor(personDataProcessor())                .writer(ProcessorFileOutputFromItemReader())                .build();    }    @Bean    @StepScope    public JdbcPagingItemReader ProcessorFileOutputFromItemWriter() {        JdbcPagingItemReader reader = new JdbcPagingItemReader<>();        reader.setDataSource(this.dataSource); // 设置数据源        reader.setFetchSize(100); // 设置一次最大读取条数        reader.setRowMapper(new PersonRowMapper()); // 把数据库中的每条数据映射到AlipaytranDo对像中        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();        queryProvider.setSelectClause("id,name,per_desc,create_time,update_time,sex,score,price"); // 设置查询的列        queryProvider.setFromClause("from person_buf"); // 设置要查询的表        Map sortKeys = new HashMap();// 定义一个集合用于存放排序列        sortKeys.put("id", Order.ASCENDING);// 按照升序排序        queryProvider.setSortKeys(sortKeys);        reader.setQueryProvider(queryProvider);// 设置排序列        return reader;    }    @Bean    public CompositeItemProcessor personDataProcessor(){        CompositeItemProcessor processor=new CompositeItemProcessor<>();        List> listProcessor=new ArrayList<>();        listProcessor.add(fristNameUpperCaseProcessor);        listProcessor.add(idFilterProcessor);        processor.setDelegates(listProcessor);        return processor;    }    @Bean    @StepScope    public FlatFileItemWriter ProcessorFileOutputFromItemReader() {        FlatFileItemWriter writer = new FlatFileItemWriter();        try {            File path = new File("D:" + File.separator + "newPerson.json").getAbsoluteFile();            System.out.println("file is create in :" + path);            writer.setResource(new FileSystemResource(path));            writer.setLineAggregator(new PersonLineAggregator());            writer.afterPropertiesSet();        } catch (Exception e) {            e.printStackTrace();        }        return writer;    }}

FristNameUpperCaseProcessor

package com.dhcc.batch.batchDemo.processor;import org.springframework.batch.item.ItemProcessor;import org.springframework.stereotype.Component;@Componentpublic class FristNameUpperCaseProcessor implements ItemProcessor {    @Override    public Person process(Person item) throws Exception {        return new Person(item.getId(), item.getName().toUpperCase(), item.getPerDesc(), item.getCreateTime(),                item.getUpdateTime(), item.getSex(), item.getScore(), item.getPrice());    }}

IdFilterProcessor

package com.dhcc.batch.batchDemo.processor;import org.springframework.batch.item.ItemProcessor;import org.springframework.stereotype.Component;@Componentpublic class IdFilterProcessor implements ItemProcessor {    @Override    public Person process(Person item) throws Exception {        if (item.getId() % 2 == 0) {            return item;        } else {            return null;        }    }}

运行结果:

观察写入完成后的文件:

可以看出我们已经完成了我们的目标

数据 数据库 排序 代码 目标 结果 学习 查询 观察 数据处理 过程 处理 最大 业务 代表 升序 大写 奇数 字母 对象 数据库的安全要保护哪些东西 数据库安全各自的含义是什么 生产安全数据库录入 数据库的安全性及管理 数据库安全策略包含哪些 海淀数据库安全审计系统 建立农村房屋安全信息数据库 易用的数据库客户端支持安全管理 连接数据库失败ssl安全错误 数据库的锁怎样保障安全 拉萨警示教育展馆软件开发 网络安全你我同行的手抄报内容 网络安全法宣传日活动 全亚网络技术有限公司 腾讯 网络安全 招聘 绵阳工业软件开发 杰思网络安全模式 网络安全咨询实施方案 计算机网络技术初级课程 河南畅想网络技术有限公司 服务器如何建网站 软件开发商为一个人 机关网络安全事件应急措施 物联网卡使用的网络技术 服务器上行带宽 简述数据库设计的四种方法 区块链 企业网络安全 学习计算机网络技术的规划 华为软件开发工程师做什么 昌平区综合网络技术服务怎么样 我的世界服务器如何管理员 电子科技大学的网络安全专业 数据库记录二维码信息 网络安全考什么证书好 web服务器怎么防护 pg数据库用户权限都有什么 2020年网络安全工作方案信息 数据库中为何要设置主键 学生网络安全会议上讲话 宝塔面板数据库误删恢复
0