溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

Spring-batch的文件footer處理是怎樣的

發(fā)布時(shí)間:2021-10-19 18:27:53 來(lái)源:億速云 閱讀:203 作者:柒染 欄目:大數(shù)據(jù)

這篇文章給大家介紹Spring-batch的文件footer處理是怎樣的,內(nèi)容非常詳細(xì),感興趣的小伙伴們可以參考借鑒,希望對(duì)大家能有所幫助。

Spring-batch對(duì)文件處理時(shí),可以是:
1)單純body文件數(shù)據(jù)形式;
2)header+body文件數(shù)據(jù)形式。
但是當(dāng)文件數(shù)據(jù)是header+body+footer的場(chǎng)合,對(duì)于footer的處理則沒(méi)有很好的方式。
重寫(xiě)FileItemReader類(lèi)實(shí)現(xiàn)對(duì)于footer的Callback處理。(類(lèi)似skippedLinesCallback)

1)FileItemReader
2)FileReadFooterHandler
3)job.xml

package l.c.w;

import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ReaderNotOpenException;
import org.springframework.batch.item.file.BufferedReaderFactory;
import org.springframework.batch.item.file.DefaultBufferedReaderFactory;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.FlatFileParseException;
import org.springframework.batch.item.file.LineCallbackHandler;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.NonTransientFlatFileException;
import org.springframework.batch.item.file.ResourceAwareItemReaderItemStream;
import org.springframework.batch.item.file.separator.RecordSeparatorPolicy;
import org.springframework.batch.item.file.separator.SimpleRecordSeparatorPolicy;
import org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.io.Resource;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.StringUtils;

/**
 * Restartable {@link ItemReader} that reads lines from input {@link #setResource(Resource)}. Line is defined by the
 * {@link #setRecordSeparatorPolicy(RecordSeparatorPolicy)} and mapped to item using {@link #setLineMapper(LineMapper)}.
 * If an exception is thrown during line mapping it is rethrown as {@link FlatFileParseException} adding information
 * about the problematic line and its line number.
 *
 * @author Robert Kasanicky
 * @author wcl
 */
public class FileItemReader<T> extends AbstractItemCountingItemStreamItemReader<T> implements
        ResourceAwareItemReaderItemStream<T>, InitializingBean {

    private static final Log logger = LogFactory.getLog(FileItemReader.class);

    // default encoding for input files
    public static final String DEFAULT_CHARSET = Charset.defaultCharset().name();

    private RecordSeparatorPolicy recordSeparatorPolicy = new SimpleRecordSeparatorPolicy();

    private Resource resource;

    private BufferedReader reader;

    private int lineCount = 0;

    private String[] comments = new String[] { "#" };

    private boolean noInput = false;

    private String encoding = DEFAULT_CHARSET;

    private LineMapper<T> lineMapper;

    private int linesToSkip = 0;

    private LineCallbackHandler skippedLinesCallback;

    private boolean strict = true;

    private BufferedReaderFactory bufferedReaderFactory = new DefaultBufferedReaderFactory();

    private FileReadFooterHandler fileReadFooterHandler;

    private List<String> footerLines = null;

    public FileItemReader() {
        setName(ClassUtils.getShortName(FlatFileItemReader.class));
    }

    /**
     * In strict mode the reader will throw an exception on
     * {@link #open(org.springframework.batch.item.ExecutionContext)} if the input resource does not exist.
     * @param strict <code>true</code> by default
     */
    public void setStrict(boolean strict) {
        this.strict = strict;
    }

    /**
     * @param skippedLinesCallback will be called for each one of the initial skipped lines before any items are read.
     */
    public void setSkippedLinesCallback(LineCallbackHandler skippedLinesCallback) {
        this.skippedLinesCallback = skippedLinesCallback;
    }

    /**
     * Public setter for the number of lines to skip at the start of a file. Can be used if the file contains a header
     * without useful (column name) information, and without a comment delimiter at the beginning of the lines.
     *
     * @param linesToSkip the number of lines to skip
     */
    public void setLinesToSkip(int linesToSkip) {
        this.linesToSkip = linesToSkip;
    }

    /**
     * Setter for line mapper. This property is required to be set.
     * @param lineMapper maps line to item
     */
    public void setLineMapper(LineMapper<T> lineMapper) {
        this.lineMapper = lineMapper;
    }

    /**
     * Setter for the encoding for this input source. Default value is {@link #DEFAULT_CHARSET}.
     *
     * @param encoding a properties object which possibly contains the encoding for this input file;
     */
    public void setEncoding(String encoding) {
        this.encoding = encoding;
    }

    /**
     * Factory for the {@link BufferedReader} that will be used to extract lines from the file. The default is fine for
     * plain text files, but this is a useful strategy for binary files where the standard BufferedReaader from java.io
     * is limiting.
     *
     * @param bufferedReaderFactory the bufferedReaderFactory to set
     */
    public void setBufferedReaderFactory(BufferedReaderFactory bufferedReaderFactory) {
        this.bufferedReaderFactory = bufferedReaderFactory;
    }

    /**
     * Setter for comment prefixes. Can be used to ignore header lines as well by using e.g. the first couple of column
     * names as a prefix.
     *
     * @param comments an array of comment line prefixes.
     */
    public void setComments(String[] comments) {
        this.comments = new String[comments.length];
        System.arraycopy(comments, 0, this.comments, 0, comments.length);
    }

    /**
     * Public setter for the input resource.
     */
    @Override
    public void setResource(Resource resource) {
        this.resource = resource;
    }

    /**
     * Public setter for the recordSeparatorPolicy. Used to determine where the line endings are and do things like
     * continue over a line ending if inside a quoted string.
     *
     * @param recordSeparatorPolicy the recordSeparatorPolicy to set
     */
    public void setRecordSeparatorPolicy(RecordSeparatorPolicy recordSeparatorPolicy) {
        this.recordSeparatorPolicy = recordSeparatorPolicy;
    }

    /**
     * Public setter for the fileReadFooterHandler.
     *
     * @param fileReadFooterHandler the fileReadFooterHandler to set
     */
    public void setFileReadFooterHandler(FileReadFooterHandler fileReadFooterHandler) {
        this.fileReadFooterHandler = fileReadFooterHandler;
    }

    /**
     * @return string corresponding to logical record according to
     * {@link #setRecordSeparatorPolicy(RecordSeparatorPolicy)} (might span multiple lines in file).
     */
    @Override
    protected T doRead() throws Exception {
        if (noInput) {
            return null;
        }

        String line = readLine();

        if (line == null) {
            return null;
        }
        else {
            try {
                return lineMapper.mapLine(line, lineCount);
            }
            catch (Exception ex) {
                throw new FlatFileParseException("Parsing error at line: " + lineCount + " in resource=["
                        + resource.getDescription() + "], input=[" + line + "]", ex, line, lineCount);
            }
        }
    }

    /**
     * @return next line (skip comments).getCurrentResource
     */
    private String readLine() {

        if (reader == null) {
            throw new ReaderNotOpenException("Reader must be open before it can be read.");
        }

        String line = null;

        try {
            line = this.reader.readLine();
            if (line == null) {
                return null;
            }
            lineCount++;
            while (isComment(line) || isFooter(line)) {
                line = reader.readLine();
                if (line == null) {
                    return null;
                }
                lineCount++;
            }

            line = applyRecordSeparatorPolicy(line);
        }
        catch (IOException e) {
            // Prevent IOException from recurring indefinitely
            // if client keeps catching and re-calling
            noInput = true;
            throw new NonTransientFlatFileException("Unable to read from resource: [" + resource + "]", e, line,
                    lineCount);
        }
        return line;
    }

    private boolean isComment(String line) {
        for (String prefix : comments) {
            if (line.startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    private boolean isFooter(String line) {
        if (footerLines == null) {
            footerLines = fileReadFooterHandler.footerLines();
        }
        for (String footer : footerLines) {
            if (line.equals(footer)) {
                return true;
            }
        }
        return false;
    }

    @Override
    protected void doClose() throws Exception {
        lineCount = 0;
        if (reader != null) {
            reader.close();
        }
    }

    @Override
    protected void doOpen() throws Exception {
        Assert.notNull(resource, "Input resource must be set");
        Assert.notNull(recordSeparatorPolicy, "RecordSeparatorPolicy must be set");

        noInput = true;
        if (!resource.exists()) {
            if (strict) {
                throw new IllegalStateException("Input resource must exist (reader is in 'strict' mode): " + resource);
            }
            logger.warn("Input resource does not exist " + resource.getDescription());
            return;
        }

        if (!resource.isReadable()) {
            if (strict) {
                throw new IllegalStateException("Input resource must be readable (reader is in 'strict' mode): "
                        + resource);
            }
            logger.warn("Input resource is not readable " + resource.getDescription());
            return;
        }

        reader = bufferedReaderFactory.create(resource, encoding);
        for (int i = 0; i < linesToSkip; i++) {
            String line = readLine();
            if (skippedLinesCallback != null) {
                skippedLinesCallback.handleLine(line);
            }
        }
        noInput = false;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        Assert.notNull(lineMapper, "LineMapper is required");
    }

    @Override
    protected void jumpToItem(int itemIndex) throws Exception {
        for (int i = 0; i < itemIndex; i++) {
            readLine();
        }
    }

    private String applyRecordSeparatorPolicy(String line) throws IOException {

        String record = line;
        while (line != null && !recordSeparatorPolicy.isEndOfRecord(record)) {
            line = this.reader.readLine();
            if (line == null) {
                if (StringUtils.hasText(record)) {
                    // A record was partially complete since it hasn't ended but
                    // the line is null
                    throw new FlatFileParseException("Unexpected end of file before record complete", record, lineCount);
                }
                else {
                    // Record has no text but it might still be post processed
                    // to something (skipping preProcess since that was already
                    // done)
                    break;
                }
            }
            else {
                lineCount++;
            }
            record = recordSeparatorPolicy.preProcess(record) + line;
        }

        return recordSeparatorPolicy.postProcess(record);

    }

}
package l.c.w;

import java.io.RandomAccessFile;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;

import l.c.w.common.contants.JobConstants;
import l.c.w.common.utils.StringUtil;

import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.annotation.BeforeStep;

/**
 *
 * footer line skip.
 *
 * @author wcl
 *
 */
public class FileReadFooterHandler {
    // default encoding for input files
    public static final String DEFAULT_CHARSET = Charset.defaultCharset().name();

    private StepExecution stepExecution;

    private String charset = DEFAULT_CHARSET;

    private int lines = 0;

    @BeforeStep
    public void beforeStep(StepExecution stepExecution) throws Exception {
        this.stepExecution = stepExecution;

        // resource
        String fileName = this.stepExecution.getJobExecution().getJobParameters().
                getString("inFile");

        List<String> footers = new ArrayList<String>();
        setFooterLines(footers, fileName);

        this.stepExecution.getExecutionContext().put("footer_line_list", footers);

    }

    @SuppressWarnings("unchecked")
    public List<String> footerLines() {
        List<String> footers = (List<String>) this.stepExecution.getExecutionContext().get(
                "footer_line_list");

        return footers;

    }

    /**
     * footer line data get.
     *
     * @param footers
     * @throws Exception
     */
    private void setFooterLines(List<String> footers, String fileName) throws Exception {
        RandomAccessFile raf = null;
        try {
            raf = new RandomAccessFile(fileName, "r");
            long len = raf.length();
            if (len == 0L) {
                return;
            }
            long pos = len - 1;
            while (pos > 0 && lines > 0) {
                pos--;
                raf.seek(pos);
                if (raf.read() == '\n' || raf.read() == '\r') {
                    String line = raf.readLine();
                    if (StringUtil.isNotEmpty(line)) {
                        footers.add(0, new String(line.getBytes("ISO-8859-1"), charset));
                        lines--;
                    }
                }
            }
        } finally {
            if (raf != null) {
                raf.close();
            }
        }
    }

    /**
     * Setter for the charset for this input source. Default value is {@link #DEFAULT_CHARSET}.
     *
     * @param encoding a properties object which possibly contains the encoding for this input file;
     */
    public void setCharset(String charset) {
        this.charset = charset;
    }

    /**
     * Public setter for footer line.
     */
    public void setLines(int lines) {
        this.lines = lines;
    }

}
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:batch="http://www.springframework.org/schema/batch" xmlns:p="http://www.springframework.org/schema/p"
    xmlns:c="http://www.springframework.org/schema/c" xmlns:mybatis="http://mybatis.org/schema/mybatis-spring"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
             http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
             http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
             http://mybatis.org/schema/mybatis-spring http://mybatis.org/schema/mybatis-spring.xsd">

    <import resource="classpath:META-INF/spring/job-base-context.xml" />

    <!-- file read -->
    <bean id="reader" class="l.c.w.FileItemReader"
        scope="step"
        p:resource="file:#{jobParameters['inFile']}" p:encoding="UTF-8"
        p:strict="true"
        p:linesToSkip="2"
        p:skippedLinesCallback-ref="fileReadHeaderHandler"
        p:fileReadFooterHandler-ref="fileReadFooterHandler">
        <property name="lineMapper">
            <bean class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
                <property name="lineTokenizer">
                    <bean
                        class="org.terasoluna.batch.item.file.transform.FixedByteLengthLineTokenizer"
                        p:names="aa,bb,cc,dd,ee" c:ranges="1-2, 3-5, 6-9, 10-14, 15-20"
                        c:charset="UTF-8" />
                </property>
                <property name="fieldSetMapper">
                    <bean
                        class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper"
                        p:targetType="l.c.w.dto.demo.Demo3SrcData" />
                </property>
            </bean>
        </property>
    </bean>

    <bean id="fileReadFooterHandler" class="l.c.w.FileReadFooterHandler"
        p:charset="UTF-8"
        p:lines="2">
    </bean>

    <!-- file write -->
    <bean id="writer" class="org.springframework.batch.item.file.FlatFileItemWriter"
        scope="step"
        p:resource="file:#{jobParameters['outFile']}" p:encoding="UTF-8"
        p:lineSeparator="&#x0A;"
        p:appendAllowed="false"
        p:shouldDeleteIfExists="true"
        p:shouldDeleteIfEmpty="false"
        p:transactional="true"
        p:headerCallback-ref="fileWriteHeaderHandler"
        p:footerCallback-ref="fileWriteFooterHandler">
        <property name="lineAggregator">
            <bean
                class="org.springframework.batch.item.file.transform.FormatterLineAggregator"
                p:format="%2s%3s%4s%5s%6s">
                <property name="fieldExtractor">
                    <bean
                        class="org.springframework.batch.item.file.transform.BeanWrapperFieldExtractor"
                        p:names="aa,bb,cc,dd,ee " />
                </property>
            </bean>
        </property>
    </bean>

    <!-- job -->
    <batch:job id="demo3" job-repository="jobRepository">
        <batch:step id="demo3.step01">
            <batch:tasklet transaction-manager="transactionManager">
                <batch:chunk reader="reader" processor="demo3Processor"
                    writer="writer" commit-interval="500" skip-limit="5000">
                    <!-- step -->
                    <batch:skippable-exception-classes>
                        <batch:include class="java.lang.Exception" />
                    </batch:skippable-exception-classes>
                    <!-- listener -->
                    <batch:listeners>
                        <batch:listener ref="tableItemWriteListener" />
                        <batch:listener ref="tableSkipListener" />
                        <batch:listener ref="fileReadHeaderHandler" />
                        <batch:listener ref="fileReadFooterHandler" />
                        <batch:listener ref="fileWriteHeaderHandler" />
                        <batch:listener ref="fileWriteFooterHandler" />
                    </batch:listeners>
                </batch:chunk>
            </batch:tasklet>
        </batch:step>
        <batch:listeners>
            <batch:listener ref="jobExcuteListener" />
        </batch:listeners>
    </batch:job>
</beans>

關(guān)于Spring-batch的文件footer處理是怎樣的就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,可以學(xué)到更多知識(shí)。如果覺(jué)得文章不錯(cuò),可以把它分享出去讓更多的人看到。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀(guān)點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI