您好,登錄后才能下訂單哦!
這篇文章主要介紹“Spring Batch批處理框架操作實例分析”,在日常操作中,相信很多人在Spring Batch批處理框架操作實例分析問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Spring Batch批處理框架操作實例分析”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!
Spring Batch 是 Spring 提供的一個數(shù)據(jù)處理框架。企業(yè)域中的許多應用程序需要批量處理才能在關鍵任務環(huán)境中執(zhí)行業(yè)務操作。
這些業(yè)務運營包括:
無需用戶交互即可最有效地處理大量信息的自動化,復雜處理。這些操作通常包括基于時間的事件(例如月末計算,通知或通信)。
在非常大的數(shù)據(jù)集中重復處理復雜業(yè)務規(guī)則的定期應用(例如,保險利益確定或費率調(diào)整)。
集成從內(nèi)部和外部系統(tǒng)接收的信息,這些信息通常需要以事務方式格式化,驗證和處理到記錄系統(tǒng)中。批處理用于每天為企業(yè)處理數(shù)十億的交易。
Spring Batch 是一個輕量級,全面的批處理框架,旨在開發(fā)對企業(yè)系統(tǒng)日常運營至關重要的強大批處理應用程序。
Spring Batch 構建了人們期望的 Spring Framework 特性(生產(chǎn)力,基于 POJO 的開發(fā)方法和一般易用性),同時使開發(fā)人員可以在必要時輕松訪問和利用更高級的企業(yè)服務。Spring Batch 不是一個 schuedling 的框架。
Spring Batch 提供了可重用的功能,這些功能對于處理大量的數(shù)據(jù)至關重要,包括記錄/跟蹤,事務管理,作業(yè)處理統(tǒng)計,作業(yè)重啟,跳過和資源管理。
它還提供更高級的技術服務和功能,通過優(yōu)化和分區(qū)技術實現(xiàn)極高容量和高性能的批處理作業(yè)。
Spring Batch 可用于兩種簡單的用例(例如將文件讀入數(shù)據(jù)庫或運行存儲過程)以及復雜的大量用例(例如在數(shù)據(jù)庫之間移動大量數(shù)據(jù),轉換它等等) 上)。大批量批處理作業(yè)可以高度可擴展的方式利用該框架來處理大量信息。
一個典型的批處理應用程序大致如下:
從數(shù)據(jù)庫,文件或隊列中讀取大量記錄。
以某種方式處理數(shù)據(jù)。
以修改之后的形式寫回數(shù)據(jù)。
其對應的示意圖如下:
Spring Batch 的一個總體的架構如下:
在 Spring Batch 中一個 job 可以定義很多的步驟 step,在每一個 step 里面可以定義其專屬的 ItemReader 用于讀取數(shù)據(jù)。
ItemProcesseor 用于處理數(shù)據(jù),ItemWriter 用于寫數(shù)據(jù),而每一個定義的 job 則都在 JobRepository 里面,我們可以通過 JobLauncher 來啟動某一個 job。
下面是一些概念是 Spring Batch 框架中的核心概念。
Job 和 Step 是 Spring Batch 執(zhí)行批處理任務最為核心的兩個概念。
其中 Job 是一個封裝整個批處理過程的一個概念。Job 在 Spring Batch 的體系當中只是一個最頂層的一個抽象概念,體現(xiàn)在代碼當中則它只是一個最上層的接口。
其代碼如下:
/** * Batch domain object representing a job. Job is an explicit abstraction * representing the configuration of a job specified by a developer. It should * be noted that restart policy is applied to the job as a whole and not to a * step. */ public interface Job { String getName(); boolean isRestartable(); void execute(JobExecution execution); JobParametersIncrementer getJobParametersIncrementer(); JobParametersValidator getJobParametersValidator(); }
在 Job 這個接口當中定義了五個方法,它的實現(xiàn)類主要有兩種類型的 job,一個是 simplejob,另一個是 flowjob。
在 Spring Batch 當中,job 是最頂層的抽象,除 job 之外我們還有 JobInstance 以及 JobExecution 這兩個更加底層的抽象。
一個 job 是我們運行的基本單位,它內(nèi)部由 step 組成。job 本質(zhì)上可以看成 step 的一個容器。
一個 job 可以按照指定的邏輯順序組合 step,并提供了我們給所有 step 設置相同屬性的方法,例如一些事件監(jiān)聽,跳過策略。
Spring Batch 以 SimpleJob 類的形式提供了 Job 接口的默認簡單實現(xiàn),它在 Job 之上創(chuàng)建了一些標準功能。
一個使用 java config 的例子代碼如下:
@Bean public Job footballJob() { return this.jobBuilderFactory.get("footballJob") .start(playerLoad()) .next(gameLoad()) .next(playerSummarization()) .end() .build(); }
這個配置的意思是:首先給這個 job 起了一個名字叫 footballJob,接著指定了這個 job 的三個 step,他們分別由方法 playerLoad,gameLoad,playerSummarization 實現(xiàn)。
我們在上文已經(jīng)提到了 JobInstance,他是 Job 的更加底層的一個抽象,他的定義如下:
public interface JobInstance { /** * Get unique id for this JobInstance. * @return instance id */ public long getInstanceId(); /** * Get job name. * @return value of 'id' attribute from <job> */ public String getJobName(); }
他的方法很簡單,一個是返回 Job 的 id,另一個是返回 Job 的名字。
JobInstance 指的是 job 運行當中,作業(yè)執(zhí)行過程當中的概念。Instance 本就是實例的意思。
比如說現(xiàn)在有一個批處理的 job,它的功能是在一天結束時執(zhí)行行一次。我們假定這個批處理 job 的名字為 'EndOfDay'。
在這個情況下,那么每天就會有一個邏輯意義上的 JobInstance,而我們必須記錄 job 的每次運行的情況。
在上文當中我們提到了,同一個 job 每天運行一次的話,那么每天都有一個 jobIntsance,但他們的 job 定義都是一樣的,那么我們怎么來區(qū)別一個 job 的不同 jobinstance 了。
不妨先做個猜想,雖然 jobinstance 的 job 定義一樣,但是他們有的東西就不一樣,例如運行時間。
Spring Batch 中提供的用來標識一個 jobinstance 的東西是:JobParameters。
JobParameters 對象包含一組用于啟動批處理作業(yè)的參數(shù),它可以在運行期間用于識別或甚至用作參考數(shù)據(jù)。我們假設的運行時間,就可以作為一個 JobParameters。
例如,我們前面的 'EndOfDay' 的 job 現(xiàn)在已經(jīng)有了兩個實例,一個產(chǎn)生于 1 月 1 日,另一個產(chǎn)生于 1 月 2 日,那么我們就可以定義兩個 JobParameter 對象:一個的參數(shù)是 01-01,, 另一個的參數(shù)是 01-02。
因此,識別一個 JobInstance 的方法可以定義為:
因此,我么可以通過 Jobparameter 來操作正確的 JobInstance。
JobExecution 指的是單次嘗試運行一個我們定義好的 Job 的代碼層面的概念。job 的一次執(zhí)行可能以失敗也可能成功。只有當執(zhí)行成功完成時,給定的與執(zhí)行相對應的 JobInstance 才也被視為完成。
還是以前面描述的 EndOfDay 的 job 作為示例,假設第一次運行 01-01-2019 的 JobInstance 結果是失敗。
那么此時如果使用與第一次運行相同的 Jobparameter 參數(shù)(即 01-01-2019)作業(yè)參數(shù)再次運行,那么就會創(chuàng)建一個對應于之前 jobInstance 的一個新的 JobExecution 實例,JobInstance 仍然只有一個。
JobExecution 的接口定義如下:
public interface JobExecution { /** * Get unique id for this JobExecution. * @return execution id */ public long getExecutionId(); /** * Get job name. * @return value of 'id' attribute from <job> */ public String getJobName(); /** * Get batch status of this execution. * @return batch status value. */ public BatchStatus getBatchStatus(); /** * Get time execution entered STARTED status. * @return date (time) */ public Date getStartTime(); /** * Get time execution entered end status: COMPLETED, STOPPED, FAILED * @return date (time) */ public Date getEndTime(); /** * Get execution exit status. * @return exit status. */ public String getExitStatus(); /** * Get time execution was created. * @return date (time) */ public Date getCreateTime(); /** * Get time execution was last updated updated. * @return date (time) */ public Date getLastUpdatedTime(); /** * Get job parameters for this execution. * @return job parameters */ public Properties getJobParameters(); }
每一個方法的注釋已經(jīng)解釋的很清楚,這里不再多做解釋。只提一下 BatchStatus,JobExecution 當中提供了一個方法 getBatchStatus 用于獲取一個 job 某一次特地執(zhí)行的一個狀態(tài)。
BatchStatus 是一個代表 job 狀態(tài)的枚舉類,其定義如下:
public enum BatchStatus {STARTING, STARTED, STOPPING, STOPPED, FAILED, COMPLETED, ABANDONED }
這些屬性對于一個 job 的執(zhí)行來說是非常關鍵的信息,并且 Spring Batch 會將他們持久到數(shù)據(jù)庫當中。
在使用 Spring Batch 的過程當中 Spring Batch 會自動創(chuàng)建一些表用于存儲一些 job 相關的信息,用于存儲 JobExecution 的表為 batch_job_execution。
下面是一個從數(shù)據(jù)庫當中截圖的實例:
每一個 Step 對象都封裝了批處理作業(yè)的一個獨立的階段。事實上,每一個 Job 本質(zhì)上都是由一個或多個步驟組成。每一個 step 包含定義和控制實際批處理所需的所有信息。
任何特定的內(nèi)容都由編寫 Job 的開發(fā)人員自行決定。一個 step 可以非常簡單也可以非常復雜。
例如,一個 step 的功能是將文件中的數(shù)據(jù)加載到數(shù)據(jù)庫中,那么基于現(xiàn)在 Spring Batch 的支持則幾乎不需要寫代碼。更復雜的 step 可能具有復雜的業(yè)務邏輯,這些邏輯作為處理的一部分。
與 Job 一樣,Step 具有與 JobExecution 類似的 StepExecution,如下圖所示:
StepExecution 表示一次執(zhí)行 Step,每次運行一個 Step 時都會創(chuàng)建一個新的 StepExecution,類似于 JobExecution。
但是,某個步驟可能由于其之前的步驟失敗而無法執(zhí)行。且僅當 Step 實際啟動時才會創(chuàng)建 StepExecution。
一次 step 執(zhí)行的實例由 StepExecution 類的對象表示。每個 StepExecution 都包含對其相應步驟的引用以及 JobExecution 和事務相關的數(shù)據(jù),例如提交和回滾計數(shù)以及開始和結束時間。
此外,每個步驟執(zhí)行都包含一個 ExecutionContext,其中包含開發(fā)人員需要在批處理運行中保留的任何數(shù)據(jù),例如重新啟動所需的統(tǒng)計信息或狀態(tài)信息。
下面是一個從數(shù)據(jù)庫當中截圖的實例:
ExecutionContext 即每一個 StepExecution 的執(zhí)行環(huán)境。它包含一系列的鍵值對。
我們可以用如下代碼獲取 ExecutionContext:
ExecutionContext ecStep = stepExecution.getExecutionContext(); ExecutionContext ecJob = jobExecution.getExecutionContext();
JobRepository 是一個用于將上述 job,step 等概念進行持久化的一個類。它同時給 Job 和 Step 以及下文會提到的 JobLauncher 實現(xiàn)提供 CRUD 操作。
首次啟動 Job 時,將從 repository 中獲取 JobExecution,并且在執(zhí)行批處理的過程中,StepExecution 和 JobExecution 將被存儲到 repository 當中。
@EnableBatchProcessing 注解可以為 JobRepository 提供自動配置。
JobLauncher 這個接口的功能非常簡單,它是用于啟動指定了 JobParameters 的 Job,為什么這里要強調(diào)指定了 JobParameter,原因其實我們在前面已經(jīng)提到了,jobparameter 和 job 一起才能組成一次 job 的執(zhí)行。
下面是代碼實例:
public interface JobLauncher { public JobExecution run(Job job, JobParameters jobParameters) throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException, JobParametersInvalidException; }
上面 run 方法實現(xiàn)的功能是根據(jù)傳入的 job 以及 jobparamaters 從 JobRepository 獲取一個 JobExecution 并執(zhí)行 Job。
ItemReader 是一個讀數(shù)據(jù)的抽象,它的功能是為每一個 Step 提供數(shù)據(jù)輸入。當 ItemReader 以及讀完所有數(shù)據(jù)時,它會返回 null 來告訴后續(xù)操作數(shù)據(jù)已經(jīng)讀完。
Spring Batch 為 ItemReader 提供了非常多的有用的實現(xiàn)類,比如 JdbcPagingItemReader,JdbcCursorItemReader 等等。
ItemReader 支持的讀入的數(shù)據(jù)源也是非常豐富的,包括各種類型的數(shù)據(jù)庫,文件,數(shù)據(jù)流,等等。幾乎涵蓋了我們的所有場景。
下面是一個 JdbcPagingItemReader 的例子代碼:
@Bean public JdbcPagingItemReader itemReader(DataSource dataSource, PagingQueryProvider queryProvider) { Map<String, Object> parameterValues = new HashMap<>(); parameterValues.put("status", "NEW"); return new JdbcPagingItemReaderBuilder<CustomerCredit>() .name("creditReader") .dataSource(dataSource) .queryProvider(queryProvider) .parameterValues(parameterValues) .rowMapper(customerCreditMapper()) .pageSize(1000) .build(); } @Bean public SqlPagingQueryProviderFactoryBean queryProvider() { SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean(); provider.setSelectClause("select id, name, credit"); provider.setFromClause("from customer"); provider.setWhereClause("where status=:status"); provider.setSortKey("id"); return provider; }
JdbcPagingItemReader 必須指定一個 PagingQueryProvider,負責提供 SQL 查詢語句來按分頁返回數(shù)據(jù)。
下面是一個 JdbcCursorItemReader 的例子代碼:
private JdbcCursorItemReader<Map<String, Object>> buildItemReader(final DataSource dataSource, String tableName, String tenant) { JdbcCursorItemReader<Map<String, Object>> itemReader = new JdbcCursorItemReader<>(); itemReader.setDataSource(dataSource); itemReader.setSql("sql here"); itemReader.setRowMapper(new RowMapper()); return itemReader; }
既然 ItemReader 是讀數(shù)據(jù)的一個抽象,那么 ItemWriter 自然就是一個寫數(shù)據(jù)的抽象,它是為每一個 step 提供數(shù)據(jù)寫出的功能。
寫的單位是可以配置的,我們可以一次寫一條數(shù)據(jù),也可以一次寫一個 chunk 的數(shù)據(jù),關于 chunk 下文會有專門的介紹。ItemWriter 對于讀入的數(shù)據(jù)是不能做任何操作的。
Spring Batch 為 ItemWriter 也提供了非常多的有用的實現(xiàn)類,當然我們也可以去實現(xiàn)自己的 writer 功能。
ItemProcessor 對項目的業(yè)務邏輯處理的一個抽象,當 ItemReader 讀取到一條記錄之后,ItemWriter 還未寫入這條記錄之前,I 我們可以借助 temProcessor 提供一個處理業(yè)務邏輯的功能,并對數(shù)據(jù)進行相應操作。
如果我們在 ItemProcessor 發(fā)現(xiàn)一條數(shù)據(jù)不應該被寫入,可以通過返回 null 來表示。
ItemProcessor 和 ItemReader 以及 ItemWriter 可以非常好的結合在一起工作,他們之間的數(shù)據(jù)傳輸也非常方便。我們直接使用即可。
Spring Batch 提供了讓我們按照 chunk 處理數(shù)據(jù)的能力,一個 chunk 的示意圖如下:
它的意思就和圖示的一樣,由于我們一次batch的任務可能會有很多的數(shù)據(jù)讀寫操作,因此一條一條的處理并向數(shù)據(jù)庫提交的話效率不會很高。
因此 Spring Batch 提供了 chunk 這個概念,我們可以設定一個 chunk size,spring batch 將一條一條處理數(shù)據(jù),但不提交到數(shù)據(jù)庫,只有當處理的數(shù)據(jù)數(shù)量達到 chunk size 設定的值得時候,才一起去 commit。
java 的實例定義代碼如下:
在上面這個 step 里面,chunk size 被設為了 10,當 ItemReader 讀的數(shù)據(jù)數(shù)量達到 10 的時候,這一批次的數(shù)據(jù)就一起被傳到 itemWriter,同時 transaction 被提交。
一個 batch 的 job 的 step,可能會處理非常大數(shù)量的數(shù)據(jù),難免會遇到出錯的情況,出錯的情況雖出現(xiàn)的概率較小,但是我們不得不考慮這些情況,因為我們做數(shù)據(jù)遷移最重要的是要保證數(shù)據(jù)的最終一致性。
Spring Batch 當然也考慮到了這種情況,并且為我們提供了相關的技術支持,請看如下 bean 的配置:
我們需要留意這三個方法,分別是 skipLimit(),skip(),noSkip()。
skipLimit 方法的意思是我們可以設定一個我們允許的這個 step 可以跳過的異常數(shù)量,假如我們設定為 10,則當這個 step 運行時,只要出現(xiàn)的異常數(shù)目不超過 10,整個 step 都不會 fail。
注意,若不設定 skipLimit,則其默認值是 0。
skip 方法我們可以指定我們可以跳過的異常,因為有些異常的出現(xiàn),我們是可以忽略的。
noSkip 方法的意思則是指出現(xiàn)這個異常我們不想跳過,也就是從 skip 的所以 exception 當中排除這個 exception。
從上面的例子來說,也就是跳過所有除 FileNotFoundException 的 exception。
那么對于這個 step 來說,F(xiàn)ileNotFoundException 就是一個 fatal 的 exception,拋出這個 exception 的時候 step 就會直接 fail。
本部分是一些使用 Spring Batch 時的值得注意的點。
在構建批處理解決方案時,應考慮以下關鍵原則和注意事項:
批處理體系結構通常會影響體系結構
盡可能簡化并避免在單批應用程序中構建復雜的邏輯結構
保持數(shù)據(jù)的處理和存儲在物理上靠得很近(換句話說,將數(shù)據(jù)保存在處理過程中)。
最大限度地減少系統(tǒng)資源的使用,尤其是 I/O。在 internal memory 中執(zhí)行盡可能多的操作。
查看應用程序 I/O(分析 SQL 語句)以確保避免不必要的物理 I/O。特別是,需要尋找以下四個常見缺陷:當數(shù)據(jù)可以被讀取一次并緩存或保存在工作存儲中時,讀取每個事務的數(shù)據(jù);重新讀取先前在同一事務中讀取數(shù)據(jù)的事務的數(shù)據(jù);導致不必要的表或索引掃描;未在 SQL 語句的 WHERE 子句中指定鍵值。
在批處理運行中不要做兩次一樣的事情。例如,如果需要數(shù)據(jù)匯總以用于報告目的,則應該(如果可能)在最初處理數(shù)據(jù)時遞增存儲的總計,因此您的報告應用程序不必重新處理相同的數(shù)據(jù)。
在批處理應用程序開始時分配足夠的內(nèi)存,以避免在此過程中進行耗時的重新分配。
總是假設數(shù)據(jù)完整性最差。插入適當?shù)臋z查和記錄驗證以維護數(shù)據(jù)完整性。
盡可能實施校驗和以進行內(nèi)部驗證。例如,對于一個文件里的數(shù)據(jù)應該有一個數(shù)據(jù)條數(shù)紀錄,告訴文件中的記錄總數(shù)以及關鍵字段的匯總。
在具有真實數(shù)據(jù)量的類似生產(chǎn)環(huán)境中盡早計劃和執(zhí)行壓力測試。
在大批量系統(tǒng)中,數(shù)據(jù)備份可能具有挑戰(zhàn)性,特別是如果系統(tǒng)以 24-7 在線的情況運行。數(shù)據(jù)庫備份通常在在線設計中得到很好的處理,但文件備份應該被視為同樣重要。如果系統(tǒng)依賴于文件,則文件備份過程不僅應該到位并記錄在案,還應定期進行測試。
在使用 java config 使用 Spring Batch 的 job 時,如果不做任何配置,項目在啟動時就會默認去跑我們定義好的批處理 job。那么如何讓項目在啟動時不自動去跑 job 呢?
Spring Batch 的 job 會在項目啟動時自動 run,如果我們不想讓他在啟動時 run 的話,可以在 application.properties 中添加如下屬性:
spring.batch.job.enabled=false
在使用 Spring Batch 做數(shù)據(jù)遷移時,發(fā)現(xiàn)在 job 啟動后,執(zhí)行到一定時間點時就卡在一個地方不動了,且 log 也不再打印,等待一段時間之后,得到如下錯誤:
紅字的信息為:Resource exhaustion event:the JVM was unable to allocate memory from the heap.
翻譯過來的意思就是項目發(fā)出了一個資源耗盡的事件,告訴我們 java 虛擬機無法再為堆分配內(nèi)存。
造成這個錯誤的原因是:這個項目里的 batch job 的 reader 是一次性拿回了數(shù)據(jù)庫里的所有數(shù)據(jù),并沒有進行分頁,當這個數(shù)據(jù)量太大時,就會導致內(nèi)存不夠用。
解決的辦法有兩個:
調(diào)整 reader 讀數(shù)據(jù)邏輯,按分頁讀取,但實現(xiàn)上會麻煩一些,且運行效率會下降
增大 service 內(nèi)存
到此,關于“Spring Batch批處理框架操作實例分析”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續(xù)學習更多相關知識,請繼續(xù)關注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權內(nèi)容。