溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

Spring?Batch批處理框架操作實例分析

發(fā)布時間:2022-07-21 09:52:42 來源:億速云 閱讀:150 作者:iii 欄目:開發(fā)技術

這篇文章主要介紹“Spring Batch批處理框架操作實例分析”,在日常操作中,相信很多人在Spring Batch批處理框架操作實例分析問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Spring Batch批處理框架操作實例分析”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

簡介

Spring Batch 是 Spring 提供的一個數(shù)據(jù)處理框架。企業(yè)域中的許多應用程序需要批量處理才能在關鍵任務環(huán)境中執(zhí)行業(yè)務操作。

這些業(yè)務運營包括:

  • 無需用戶交互即可最有效地處理大量信息的自動化,復雜處理。這些操作通常包括基于時間的事件(例如月末計算,通知或通信)。

  • 在非常大的數(shù)據(jù)集中重復處理復雜業(yè)務規(guī)則的定期應用(例如,保險利益確定或費率調(diào)整)。

  • 集成從內(nèi)部和外部系統(tǒng)接收的信息,這些信息通常需要以事務方式格式化,驗證和處理到記錄系統(tǒng)中。批處理用于每天為企業(yè)處理數(shù)十億的交易。

Spring Batch 是一個輕量級,全面的批處理框架,旨在開發(fā)對企業(yè)系統(tǒng)日常運營至關重要的強大批處理應用程序。

Spring Batch 構建了人們期望的 Spring Framework 特性(生產(chǎn)力,基于 POJO 的開發(fā)方法和一般易用性),同時使開發(fā)人員可以在必要時輕松訪問和利用更高級的企業(yè)服務。Spring Batch 不是一個 schuedling 的框架。

Spring Batch 提供了可重用的功能,這些功能對于處理大量的數(shù)據(jù)至關重要,包括記錄/跟蹤,事務管理,作業(yè)處理統(tǒng)計,作業(yè)重啟,跳過和資源管理。

它還提供更高級的技術服務和功能,通過優(yōu)化和分區(qū)技術實現(xiàn)極高容量和高性能的批處理作業(yè)。

Spring Batch 可用于兩種簡單的用例(例如將文件讀入數(shù)據(jù)庫或運行存儲過程)以及復雜的大量用例(例如在數(shù)據(jù)庫之間移動大量數(shù)據(jù),轉換它等等) 上)。大批量批處理作業(yè)可以高度可擴展的方式利用該框架來處理大量信息。

Spring Batch 架構

一個典型的批處理應用程序大致如下:

  • 從數(shù)據(jù)庫,文件或隊列中讀取大量記錄。

  • 以某種方式處理數(shù)據(jù)。

  • 以修改之后的形式寫回數(shù)據(jù)。

其對應的示意圖如下:

Spring?Batch批處理框架操作實例分析

Spring Batch 的一個總體的架構如下:

Spring?Batch批處理框架操作實例分析

在 Spring Batch 中一個 job 可以定義很多的步驟 step,在每一個 step 里面可以定義其專屬的 ItemReader 用于讀取數(shù)據(jù)。

ItemProcesseor 用于處理數(shù)據(jù),ItemWriter 用于寫數(shù)據(jù),而每一個定義的 job 則都在 JobRepository 里面,我們可以通過 JobLauncher 來啟動某一個 job。

Spring Batch 核心概念

下面是一些概念是 Spring Batch 框架中的核心概念。

什么是 Job

Job 和 Step 是 Spring Batch 執(zhí)行批處理任務最為核心的兩個概念。

其中 Job 是一個封裝整個批處理過程的一個概念。Job 在 Spring Batch 的體系當中只是一個最頂層的一個抽象概念,體現(xiàn)在代碼當中則它只是一個最上層的接口。

其代碼如下:

/**
 * Batch domain object representing a job. Job is an explicit abstraction
 * representing the configuration of a job specified by a developer. It should
 * be noted that restart policy is applied to the job as a whole and not to a
 * step.
 */
public interface Job {
 String getName();
 boolean isRestartable();
 void execute(JobExecution execution);
 JobParametersIncrementer getJobParametersIncrementer();
 JobParametersValidator getJobParametersValidator();
}

在 Job 這個接口當中定義了五個方法,它的實現(xiàn)類主要有兩種類型的 job,一個是 simplejob,另一個是 flowjob。

在 Spring Batch 當中,job 是最頂層的抽象,除 job 之外我們還有 JobInstance 以及 JobExecution 這兩個更加底層的抽象。

一個 job 是我們運行的基本單位,它內(nèi)部由 step 組成。job 本質(zhì)上可以看成 step 的一個容器。

一個 job 可以按照指定的邏輯順序組合 step,并提供了我們給所有 step 設置相同屬性的方法,例如一些事件監(jiān)聽,跳過策略。

Spring Batch 以 SimpleJob 類的形式提供了 Job 接口的默認簡單實現(xiàn),它在 Job 之上創(chuàng)建了一些標準功能。

一個使用 java config 的例子代碼如下:

@Bean
public Job footballJob() {
    return this.jobBuilderFactory.get("footballJob")
                     .start(playerLoad())
                     .next(gameLoad())
                     .next(playerSummarization())
                     .end()
                     .build();
}

這個配置的意思是:首先給這個 job 起了一個名字叫 footballJob,接著指定了這個 job 的三個 step,他們分別由方法 playerLoad,gameLoad,playerSummarization 實現(xiàn)。

什么是 JobInstance

我們在上文已經(jīng)提到了 JobInstance,他是 Job 的更加底層的一個抽象,他的定義如下:

public interface JobInstance {
 /**
  * Get unique id for this JobInstance.
  * @return instance id
  */
 public long getInstanceId();
 /**
  * Get job name.
  * @return value of 'id' attribute from <job>
  */
 public String getJobName(); 
}

他的方法很簡單,一個是返回 Job 的 id,另一個是返回 Job 的名字。

JobInstance 指的是 job 運行當中,作業(yè)執(zhí)行過程當中的概念。Instance 本就是實例的意思。

比如說現(xiàn)在有一個批處理的 job,它的功能是在一天結束時執(zhí)行行一次。我們假定這個批處理 job 的名字為 'EndOfDay'。

在這個情況下,那么每天就會有一個邏輯意義上的 JobInstance,而我們必須記錄 job 的每次運行的情況。

什么是 JobParameters

在上文當中我們提到了,同一個 job 每天運行一次的話,那么每天都有一個 jobIntsance,但他們的 job 定義都是一樣的,那么我們怎么來區(qū)別一個 job 的不同 jobinstance 了。

不妨先做個猜想,雖然 jobinstance 的 job 定義一樣,但是他們有的東西就不一樣,例如運行時間。

Spring Batch 中提供的用來標識一個 jobinstance 的東西是:JobParameters。

JobParameters 對象包含一組用于啟動批處理作業(yè)的參數(shù),它可以在運行期間用于識別或甚至用作參考數(shù)據(jù)。我們假設的運行時間,就可以作為一個 JobParameters。

例如,我們前面的 'EndOfDay' 的 job 現(xiàn)在已經(jīng)有了兩個實例,一個產(chǎn)生于 1 月 1 日,另一個產(chǎn)生于 1 月 2 日,那么我們就可以定義兩個 JobParameter 對象:一個的參數(shù)是 01-01,, 另一個的參數(shù)是 01-02。

因此,識別一個 JobInstance 的方法可以定義為:

Spring?Batch批處理框架操作實例分析

因此,我么可以通過 Jobparameter 來操作正確的 JobInstance。

什么是 JobExecution

JobExecution 指的是單次嘗試運行一個我們定義好的 Job 的代碼層面的概念。job 的一次執(zhí)行可能以失敗也可能成功。只有當執(zhí)行成功完成時,給定的與執(zhí)行相對應的 JobInstance 才也被視為完成。

還是以前面描述的 EndOfDay 的 job 作為示例,假設第一次運行 01-01-2019 的 JobInstance 結果是失敗。

那么此時如果使用與第一次運行相同的 Jobparameter 參數(shù)(即 01-01-2019)作業(yè)參數(shù)再次運行,那么就會創(chuàng)建一個對應于之前 jobInstance 的一個新的 JobExecution 實例,JobInstance 仍然只有一個。

JobExecution 的接口定義如下:

public interface JobExecution {
 /**
  * Get unique id for this JobExecution.
  * @return execution id
  */
 public long getExecutionId();
 /**
  * Get job name.
  * @return value of 'id' attribute from <job>
  */
 public String getJobName(); 
 /**
  * Get batch status of this execution.
  * @return batch status value.
  */
 public BatchStatus getBatchStatus();
 /**
  * Get time execution entered STARTED status. 
  * @return date (time)
  */
 public Date getStartTime();
 /**
  * Get time execution entered end status: COMPLETED, STOPPED, FAILED 
  * @return date (time)
  */
 public Date getEndTime();
 /**
  * Get execution exit status.
  * @return exit status.
  */
 public String getExitStatus();
 /**
  * Get time execution was created.
  * @return date (time)
  */
 public Date getCreateTime();
 /**
  * Get time execution was last updated updated.
  * @return date (time)
  */
 public Date getLastUpdatedTime();
 /**
  * Get job parameters for this execution.
  * @return job parameters  
  */
 public Properties getJobParameters();

}

每一個方法的注釋已經(jīng)解釋的很清楚,這里不再多做解釋。只提一下 BatchStatus,JobExecution 當中提供了一個方法 getBatchStatus 用于獲取一個 job 某一次特地執(zhí)行的一個狀態(tài)。

BatchStatus 是一個代表 job 狀態(tài)的枚舉類,其定義如下:

public enum BatchStatus {STARTING, STARTED, STOPPING, 
   STOPPED, FAILED, COMPLETED, ABANDONED }

這些屬性對于一個 job 的執(zhí)行來說是非常關鍵的信息,并且 Spring Batch 會將他們持久到數(shù)據(jù)庫當中。

在使用 Spring Batch 的過程當中 Spring Batch 會自動創(chuàng)建一些表用于存儲一些 job 相關的信息,用于存儲 JobExecution 的表為 batch_job_execution。

下面是一個從數(shù)據(jù)庫當中截圖的實例:

Spring?Batch批處理框架操作實例分析

什么是 Step

每一個 Step 對象都封裝了批處理作業(yè)的一個獨立的階段。事實上,每一個 Job 本質(zhì)上都是由一個或多個步驟組成。每一個 step 包含定義和控制實際批處理所需的所有信息。

任何特定的內(nèi)容都由編寫 Job 的開發(fā)人員自行決定。一個 step 可以非常簡單也可以非常復雜。

例如,一個 step 的功能是將文件中的數(shù)據(jù)加載到數(shù)據(jù)庫中,那么基于現(xiàn)在 Spring Batch 的支持則幾乎不需要寫代碼。更復雜的 step 可能具有復雜的業(yè)務邏輯,這些邏輯作為處理的一部分。

與 Job 一樣,Step 具有與 JobExecution 類似的 StepExecution,如下圖所示:

Spring?Batch批處理框架操作實例分析

什么是 StepExecution

StepExecution 表示一次執(zhí)行 Step,每次運行一個 Step 時都會創(chuàng)建一個新的 StepExecution,類似于 JobExecution。

但是,某個步驟可能由于其之前的步驟失敗而無法執(zhí)行。且僅當 Step 實際啟動時才會創(chuàng)建 StepExecution。

一次 step 執(zhí)行的實例由 StepExecution 類的對象表示。每個 StepExecution 都包含對其相應步驟的引用以及 JobExecution 和事務相關的數(shù)據(jù),例如提交和回滾計數(shù)以及開始和結束時間。

此外,每個步驟執(zhí)行都包含一個 ExecutionContext,其中包含開發(fā)人員需要在批處理運行中保留的任何數(shù)據(jù),例如重新啟動所需的統(tǒng)計信息或狀態(tài)信息。

下面是一個從數(shù)據(jù)庫當中截圖的實例:

Spring?Batch批處理框架操作實例分析

什么是 ExecutionContext

ExecutionContext 即每一個 StepExecution 的執(zhí)行環(huán)境。它包含一系列的鍵值對。

我們可以用如下代碼獲取 ExecutionContext:

ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();

什么是 JobRepository

JobRepository 是一個用于將上述 job,step 等概念進行持久化的一個類。它同時給 Job 和 Step 以及下文會提到的 JobLauncher 實現(xiàn)提供 CRUD 操作。

首次啟動 Job 時,將從 repository 中獲取 JobExecution,并且在執(zhí)行批處理的過程中,StepExecution 和 JobExecution 將被存儲到 repository 當中。

@EnableBatchProcessing 注解可以為 JobRepository 提供自動配置。

什么是 JobLauncher

JobLauncher 這個接口的功能非常簡單,它是用于啟動指定了 JobParameters 的 Job,為什么這里要強調(diào)指定了 JobParameter,原因其實我們在前面已經(jīng)提到了,jobparameter 和 job 一起才能組成一次 job 的執(zhí)行。

下面是代碼實例:

public interface JobLauncher {
public JobExecution run(Job job, JobParameters jobParameters)
            throws JobExecutionAlreadyRunningException, JobRestartException,
                   JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}

上面 run 方法實現(xiàn)的功能是根據(jù)傳入的 job 以及 jobparamaters 從 JobRepository 獲取一個 JobExecution 并執(zhí)行 Job。

什么是 Item Reader

ItemReader 是一個讀數(shù)據(jù)的抽象,它的功能是為每一個 Step 提供數(shù)據(jù)輸入。當 ItemReader 以及讀完所有數(shù)據(jù)時,它會返回 null 來告訴后續(xù)操作數(shù)據(jù)已經(jīng)讀完。

Spring Batch 為 ItemReader 提供了非常多的有用的實現(xiàn)類,比如 JdbcPagingItemReader,JdbcCursorItemReader 等等。

ItemReader 支持的讀入的數(shù)據(jù)源也是非常豐富的,包括各種類型的數(shù)據(jù)庫,文件,數(shù)據(jù)流,等等。幾乎涵蓋了我們的所有場景。

下面是一個 JdbcPagingItemReader 的例子代碼:

@Bean
public JdbcPagingItemReader itemReader(DataSource dataSource, PagingQueryProvider queryProvider) {
        Map<String, Object> parameterValues = new HashMap<>();
        parameterValues.put("status", "NEW");

        return new JdbcPagingItemReaderBuilder<CustomerCredit>()
               .name("creditReader")
               .dataSource(dataSource)
               .queryProvider(queryProvider)
               .parameterValues(parameterValues)
               .rowMapper(customerCreditMapper())
               .pageSize(1000)
               .build();
}

@Bean
public SqlPagingQueryProviderFactoryBean queryProvider() {
        SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();

        provider.setSelectClause("select id, name, credit");
        provider.setFromClause("from customer");
        provider.setWhereClause("where status=:status");
        provider.setSortKey("id");

        return provider;
}

JdbcPagingItemReader 必須指定一個 PagingQueryProvider,負責提供 SQL 查詢語句來按分頁返回數(shù)據(jù)。

下面是一個 JdbcCursorItemReader 的例子代碼:

private JdbcCursorItemReader<Map<String, Object>> buildItemReader(final DataSource dataSource, String tableName,
            String tenant) {

        JdbcCursorItemReader<Map<String, Object>> itemReader = new JdbcCursorItemReader<>();
        itemReader.setDataSource(dataSource);
        itemReader.setSql("sql here");
        itemReader.setRowMapper(new RowMapper());
        return itemReader;
    }

什么是 Item Writer

既然 ItemReader 是讀數(shù)據(jù)的一個抽象,那么 ItemWriter 自然就是一個寫數(shù)據(jù)的抽象,它是為每一個 step 提供數(shù)據(jù)寫出的功能。

寫的單位是可以配置的,我們可以一次寫一條數(shù)據(jù),也可以一次寫一個 chunk 的數(shù)據(jù),關于 chunk 下文會有專門的介紹。ItemWriter 對于讀入的數(shù)據(jù)是不能做任何操作的。

Spring Batch 為 ItemWriter 也提供了非常多的有用的實現(xiàn)類,當然我們也可以去實現(xiàn)自己的 writer 功能。

什么是 Item Processor

ItemProcessor 對項目的業(yè)務邏輯處理的一個抽象,當 ItemReader 讀取到一條記錄之后,ItemWriter 還未寫入這條記錄之前,I 我們可以借助 temProcessor 提供一個處理業(yè)務邏輯的功能,并對數(shù)據(jù)進行相應操作。

如果我們在 ItemProcessor 發(fā)現(xiàn)一條數(shù)據(jù)不應該被寫入,可以通過返回 null 來表示。

ItemProcessor 和 ItemReader 以及 ItemWriter 可以非常好的結合在一起工作,他們之間的數(shù)據(jù)傳輸也非常方便。我們直接使用即可。

chunk 處理流程

Spring Batch 提供了讓我們按照 chunk 處理數(shù)據(jù)的能力,一個 chunk 的示意圖如下:

Spring?Batch批處理框架操作實例分析

它的意思就和圖示的一樣,由于我們一次batch的任務可能會有很多的數(shù)據(jù)讀寫操作,因此一條一條的處理并向數(shù)據(jù)庫提交的話效率不會很高。

因此 Spring Batch 提供了 chunk 這個概念,我們可以設定一個 chunk size,spring batch 將一條一條處理數(shù)據(jù),但不提交到數(shù)據(jù)庫,只有當處理的數(shù)據(jù)數(shù)量達到 chunk size 設定的值得時候,才一起去 commit。

java 的實例定義代碼如下:

Spring?Batch批處理框架操作實例分析

在上面這個 step 里面,chunk size 被設為了 10,當 ItemReader 讀的數(shù)據(jù)數(shù)量達到 10 的時候,這一批次的數(shù)據(jù)就一起被傳到 itemWriter,同時 transaction 被提交。

skip 策略和失敗處理

一個 batch 的 job 的 step,可能會處理非常大數(shù)量的數(shù)據(jù),難免會遇到出錯的情況,出錯的情況雖出現(xiàn)的概率較小,但是我們不得不考慮這些情況,因為我們做數(shù)據(jù)遷移最重要的是要保證數(shù)據(jù)的最終一致性。

Spring Batch 當然也考慮到了這種情況,并且為我們提供了相關的技術支持,請看如下 bean 的配置:

Spring?Batch批處理框架操作實例分析

我們需要留意這三個方法,分別是 skipLimit(),skip(),noSkip()。

skipLimit 方法的意思是我們可以設定一個我們允許的這個 step 可以跳過的異常數(shù)量,假如我們設定為 10,則當這個 step 運行時,只要出現(xiàn)的異常數(shù)目不超過 10,整個 step 都不會 fail。

注意,若不設定 skipLimit,則其默認值是 0。

skip 方法我們可以指定我們可以跳過的異常,因為有些異常的出現(xiàn),我們是可以忽略的。

noSkip 方法的意思則是指出現(xiàn)這個異常我們不想跳過,也就是從 skip 的所以 exception 當中排除這個 exception。

從上面的例子來說,也就是跳過所有除 FileNotFoundException 的 exception。

那么對于這個 step 來說,F(xiàn)ileNotFoundException 就是一個 fatal 的 exception,拋出這個 exception 的時候 step 就會直接 fail。

批處理操作指南

本部分是一些使用 Spring Batch 時的值得注意的點。

批處理原則

在構建批處理解決方案時,應考慮以下關鍵原則和注意事項:

  • 批處理體系結構通常會影響體系結構

  • 盡可能簡化并避免在單批應用程序中構建復雜的邏輯結構

  • 保持數(shù)據(jù)的處理和存儲在物理上靠得很近(換句話說,將數(shù)據(jù)保存在處理過程中)。

  • 最大限度地減少系統(tǒng)資源的使用,尤其是 I/O。在 internal memory 中執(zhí)行盡可能多的操作。

  • 查看應用程序 I/O(分析 SQL 語句)以確保避免不必要的物理 I/O。特別是,需要尋找以下四個常見缺陷:當數(shù)據(jù)可以被讀取一次并緩存或保存在工作存儲中時,讀取每個事務的數(shù)據(jù);重新讀取先前在同一事務中讀取數(shù)據(jù)的事務的數(shù)據(jù);導致不必要的表或索引掃描;未在 SQL 語句的 WHERE 子句中指定鍵值。

  • 在批處理運行中不要做兩次一樣的事情。例如,如果需要數(shù)據(jù)匯總以用于報告目的,則應該(如果可能)在最初處理數(shù)據(jù)時遞增存儲的總計,因此您的報告應用程序不必重新處理相同的數(shù)據(jù)。

  • 在批處理應用程序開始時分配足夠的內(nèi)存,以避免在此過程中進行耗時的重新分配。

  • 總是假設數(shù)據(jù)完整性最差。插入適當?shù)臋z查和記錄驗證以維護數(shù)據(jù)完整性。

  • 盡可能實施校驗和以進行內(nèi)部驗證。例如,對于一個文件里的數(shù)據(jù)應該有一個數(shù)據(jù)條數(shù)紀錄,告訴文件中的記錄總數(shù)以及關鍵字段的匯總。

  • 在具有真實數(shù)據(jù)量的類似生產(chǎn)環(huán)境中盡早計劃和執(zhí)行壓力測試。

  • 在大批量系統(tǒng)中,數(shù)據(jù)備份可能具有挑戰(zhàn)性,特別是如果系統(tǒng)以 24-7 在線的情況運行。數(shù)據(jù)庫備份通常在在線設計中得到很好的處理,但文件備份應該被視為同樣重要。如果系統(tǒng)依賴于文件,則文件備份過程不僅應該到位并記錄在案,還應定期進行測試。

如何默認不啟動 job

在使用 java config 使用 Spring Batch 的 job 時,如果不做任何配置,項目在啟動時就會默認去跑我們定義好的批處理 job。那么如何讓項目在啟動時不自動去跑 job 呢?

Spring Batch 的 job 會在項目啟動時自動 run,如果我們不想讓他在啟動時 run 的話,可以在 application.properties 中添加如下屬性:

spring.batch.job.enabled=false

在讀數(shù)據(jù)時內(nèi)存不夠

在使用 Spring Batch 做數(shù)據(jù)遷移時,發(fā)現(xiàn)在 job 啟動后,執(zhí)行到一定時間點時就卡在一個地方不動了,且 log 也不再打印,等待一段時間之后,得到如下錯誤:

Spring?Batch批處理框架操作實例分析

紅字的信息為:Resource exhaustion event:the JVM was unable to allocate memory from the heap.

翻譯過來的意思就是項目發(fā)出了一個資源耗盡的事件,告訴我們 java 虛擬機無法再為堆分配內(nèi)存。

造成這個錯誤的原因是:這個項目里的 batch job 的 reader 是一次性拿回了數(shù)據(jù)庫里的所有數(shù)據(jù),并沒有進行分頁,當這個數(shù)據(jù)量太大時,就會導致內(nèi)存不夠用。

解決的辦法有兩個:

  • 調(diào)整 reader 讀數(shù)據(jù)邏輯,按分頁讀取,但實現(xiàn)上會麻煩一些,且運行效率會下降

  • 增大 service 內(nèi)存

到此,關于“Spring Batch批處理框架操作實例分析”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續(xù)學習更多相關知識,請繼續(xù)關注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內(nèi)容。

AI