溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

處理java異步事件的阻塞和非阻塞方法分析

發(fā)布時間:2020-09-19 15:51:17 來源:腳本之家 閱讀:198 作者:Dennis Sosnoski 欄目:編程語言

前言

由于多核系統(tǒng)普遍存在,并發(fā)性編程的應用無疑比以往任何時候都要廣泛。但并發(fā)性很難正確實現(xiàn),用戶需要借助新工具來使用它。很多基于 JVM 的語言都屬于這類開發(fā)工具,Scala 在這一領域尤為活躍。本系列文章將介紹一些針對 Java 和 Scala 語言的較新的并發(fā)性編程方法。

在任何并發(fā)性應用程序中,異步事件處理都至關重要。事件來源可能是不同的計算任務、I/O 操作或與外部系統(tǒng)的交互。無論來源是什么,應用程序代碼都必須跟蹤事件,協(xié)調(diào)為響應事件而采取的操作。

Java 應用程序可采用兩種基本的異步事件處理方法:該應用程序有一個協(xié)調(diào)線程等待事件,然后采取操作,或者事件可在完成時直接執(zhí)行某項操作(通常采取執(zhí)行應用程序所提供的代碼的方式)。讓線程等待事件的方法被稱為阻塞 方法。讓事件執(zhí)行操作、線程無需顯式等待事件的方法被稱為非阻塞 方法。

在計算中,根據(jù)具體上下文,阻塞 和非阻塞 這兩個詞的使用通常會有所不同。舉例而言,共享數(shù)據(jù)結(jié)構(gòu)的非阻塞算法不需要線程等待訪問數(shù)據(jù)結(jié)構(gòu)。在非阻塞 I/O 中,應用程序線程可以啟動一個 I/O 操作,然后離開執(zhí)行其他事情,同時該操作會異步地執(zhí)行。在本文中,非阻塞 指的是在無需等待線程的情況下完成某個執(zhí)行操作的事件。這些用法中的一個共同概念是,阻塞操作需要一個線程來等待某個結(jié)果,而非阻塞操作不需要。

合成事件

等待事件的完成很簡單:您有一個線程等待該事件,線程恢復運行時,您就可以知道該事件已經(jīng)完成。如果您的線程在此期間有其他事要做,它會做完這些事再等待。該線程甚至可以使用輪詢方法,通過該方法中斷它的其他活動,從而檢查事件是否已完成。但基本原理是相同的:需要事件的結(jié)果時,您會讓線程???(park),以便等待事件完成。

阻塞很容易完成且相對簡單,只要您有一個等待事件完成的單一主線程。使用多個因為彼此等待而阻塞的線程時,可能遇到一些問題,比如:

  • 死鎖:兩個或更多線程分別控制其他線程繼續(xù)執(zhí)行所需的資源。
  • 饑餓 (Starvation):一些線程可能無法繼續(xù)執(zhí)行,因為其他線程貪婪地消耗著共享資源。
  • 活鎖:線程嘗試針對彼此而調(diào)整,但最終沒有進展。

非阻塞方法為創(chuàng)造力留出的空間要多得多?;卣{(diào)是非阻塞事件處理的一種常見技術(shù)?;卣{(diào)是靈活性的象征,因為您可以在發(fā)生事件時執(zhí)行任何想要的代碼。回調(diào)的缺點是,在使用回調(diào)處理許多事件時,您的代碼會變得凌亂。而且回調(diào)特別難調(diào)試,因為控制流與應用程序中的代碼順序不匹配。

Java 8 CompletableFuture 同時支持阻塞和非阻塞的事件處理方法,包括常規(guī)回調(diào)。CompletableFuture 也提供了多種合成和組合事件的方式,實現(xiàn)了回調(diào)的靈活性以及干凈、簡單、可讀的代碼。在本節(jié)中,您將看到處理由 CompletableFuture 表示的事件的阻塞和非阻塞方法的示例。

任務和排序

應用程序在一個特定操作中通常必須執(zhí)行多個處理步驟。例如,在向用戶返回結(jié)果之前,Web 應用程序可能需要:

1.在一個數(shù)據(jù)庫中查找用戶的信息
2.使用查找到的信息來執(zhí)行 Web 服務調(diào)用,并執(zhí)行另一次數(shù)據(jù)庫查詢。
3.基于來自上一步的結(jié)果而執(zhí)行數(shù)據(jù)庫更新。

圖 1 演示了這種結(jié)構(gòu)類型。

圖 1. 應用程序任務流

處理java異步事件的阻塞和非阻塞方法分析

圖 1 將處理過程分解為 4 個不同的任務,它們通過表示順序依賴關系的箭頭相連接。任務 1 可直接執(zhí)行,任務 2 和任務 3 都在任務 1 完成后執(zhí)行,任務 4 在任務 2 和任務 3 都完成后執(zhí)行。這是我在本文中用于演示異步事件處理的任務結(jié)構(gòu)。真實應用程序(尤其是具有多個移動部分的服務器應用程序)可能要復雜得多,但這個簡單的示例僅用于演示所涉及的原理。

建模異步事件

在真實系統(tǒng)中,異步事件的來源一般是并行計算或某種形式的 I/O 操作。但是,使用簡單的時間延遲來建模這種系統(tǒng)會更容易,這也是本文所采用的方法。清單 1 顯示了我用于生成事件的基本的賦時事件 (timed-event) 代碼,這些事件采用了 CompletableFuture 格式。

清單 1. 賦時事件代碼

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.CompletableFuture;
public class TimedEventSupport {
private static final Timer timer = new Timer();
/**
* Build a future to return the value after a delay.
*
* @param delay
* @param value
* @return future
*/
public static <T> CompletableFuture<T> delayedSuccess(int delay, T value) {
CompletableFuture<T> future = new CompletableFuture<T>();
TimerTask task = new TimerTask() {
public void run() {
future.complete(value);
}
};
timer.schedule(task, delay * 1000);
return future;
}
/**
* Build a future to return a throwable after a delay.
* 
* @param delay
* @param t
* @return future
*/
public static <T> CompletableFuture<T> delayedFailure(int delay, Throwable t) {
CompletableFuture<T> future = new CompletableFuture<T>();
TimerTask task = new TimerTask() {
public void run() {
future.completeExceptionally(t);
}
};
timer.schedule(task, delay * 1000);
return future;
}
}

為什么不采用 lambda?

清單 1 中的 TimerTask 被實現(xiàn)為一個匿名內(nèi)部類,僅包含一個 run() 方法。您可能認為這里可以使用 lambda 代替內(nèi)部類。但是,lambda 僅能用作接口的實例,而 TimerTask 被定義為一種抽象類。除非 lambda 特性的 future 擴展添加了對抽象類的支持(有可能,但由于設計問題,未必行得通),或者為 TimerTask 等情形定義了并行接口,否則您必須繼續(xù)使用 Java 內(nèi)部類創(chuàng)建單一方法實現(xiàn)。

清單 1 的代碼使用一個 java.util.Timer 來計劃 java.util.TimerTask 在一定的延遲后執(zhí)行。每個 TimerTask 在運行時完成一個有關聯(lián)的 future。delayedSuccess() 計劃一個任務來成功完成一個 CompletableFuture<T> 并將 future 返回調(diào)用方。delayedFailure() 計劃了一個任務來完成一個 CompletableFuture<T> 并拋出異常,然后將 future 返回給調(diào)用方。

清單 2 展示了如何使用 清單 1 中的代碼創(chuàng)建 CompletableFuture<Integer> 形式的事件,這些事件與 圖 1 中的 4 個任務相匹配。(此代碼來自示例代碼中的 EventComposition 類。)

清單 2. 示例任務的事件

// task definitions
private static CompletableFuture<Integer> task1(int input) {
return TimedEventSupport.delayedSuccess(1, input + 1);
}
private static CompletableFuture<Integer> task2(int input) {
return TimedEventSupport.delayedSuccess(2, input + 2);
}
private static CompletableFuture<Integer> task3(int input) {
return TimedEventSupport.delayedSuccess(3, input + 3);
}
private static CompletableFuture<Integer> task4(int input) {
return TimedEventSupport.delayedSuccess(1, input + 4);
}

清單 2 中 4 個任務方法中的每一個都為該任務的完成時刻使用了特定的延遲值:task1 為 1 秒,task2 為 2 秒,task3 為 3 秒,task4 重新變?yōu)?1 秒。每個任務還接受一個輸入值,是該輸入加上任務編號作為 future 的(最終)結(jié)果值。這些方法都使用了 future 的成功形式;稍后我們將會查看一些使用失敗形式的例子。

這些任務要求您按 圖 1 中所示的順序運行它們,向每個任務傳遞上一個任務返回的結(jié)果值(或者對于 task4,傳遞前兩個任務結(jié)果的和)。如果中間兩個任務是同時執(zhí)行的,那么總執(zhí)行時間大約為 5 秒(1 秒 + (2 秒、3 秒中的最大值)+ 1 秒。

如果 task1 的輸入為 1,那么結(jié)果為 2。如果該結(jié)果傳遞給 task2 和 task3,結(jié)果將為 4 和 5。如果這兩個結(jié)果的和 (9) 作為輸入傳遞給 task4,最終結(jié)果將為 13。

阻塞等待

在設置了執(zhí)行環(huán)境后,是時候設置一些操作了。協(xié)調(diào) 4 個任務的執(zhí)行的最簡單方式是使用阻塞等待:主要線程等待每個任務完成。清單 3(同樣來自示例代碼中的 EventComposition 類)給出了此方法。

清單 3. 阻塞等待任務執(zhí)行

private static CompletableFuture<Integer> runBlocking() {
Integer i1 = task1(1).join();
CompletableFuture<Integer> future2 = task2(i1);
CompletableFuture<Integer> future3 = task3(i1);
Integer result = task4(future2.join() + future3.join()).join();
return CompletableFuture.completedFuture(result);
}

清單 3 使用 CompletableFuture 的 join() 方法來完成阻塞等待。join() 等待任務完成,然后,如果成功完成任務,則返回結(jié)果值,或者如果失敗或被取消,則拋出一個未經(jīng)檢查的異常。該代碼首先等待 task1 的結(jié)果,然后同時啟動 task2 和 task3,并等待兩個任務依次返回 future,最后等待 task4 的結(jié)果。runBlocking() 返回一個 CompletableFuture,以便與我接下來將展示的非阻塞形式保持一致,但在本例中,future 實際上將在該方法返回之前完成。

合成和組合 future

清單 4(同樣來自示例代碼中的 EventComposition 類)展示了如何將 future 連接在一起,以便按正確順序并使用正確的依賴關系執(zhí)行任務,而不使用阻塞。

清單 4. 非阻塞的合成和組合

private static CompletableFuture<Integer> runNonblocking() {
return task1(1).thenCompose(i1 -> ((CompletableFuture<Integer>)task2(i1)
.thenCombine(task3(i1), (i2,i3) -> i2+i3)))
.thenCompose(i4 -> task4(i4));
}

清單 4 中的代碼基本上構(gòu)造了一個執(zhí)行計劃,指定不同的任務如何執(zhí)行和它們彼此有何關聯(lián)。此代碼精美而簡潔,但是,如果您不熟悉 CompletableFuture 方法,或許難以理解該代碼。清單 5 通過將 task2 和 task3 部分分離到一個新方法 runTask2and3 中,將同樣的代碼重構(gòu)為更容易理解的形式。

清單 5. 重構(gòu)后的非阻塞的合成和組合

private static CompletableFuture<Integer> runTask2and3(Integer i1) {
CompletableFuture<Integer> task2 = task2(i1);
CompletableFuture<Integer> task3 = task3(i1);
BiFunction<Integer, Integer, Integer> sum = (a, b) -> a + b;
return task2.thenCombine(task3, sum);
}
private static CompletableFuture<Integer> runNonblockingAlt() {
CompletableFuture<Integer> task1 = task1(1);
CompletableFuture<Integer> comp123 = task1.thenCompose(EventComposition::runTask2and3);
return comp123.thenCompose(EventComposition::task4); }

在 清單 5 中,runTask2and3() 方法表示任務流的中間部分,其中 task2 和 task3 同時執(zhí)行,然后將它們的結(jié)果值組合在一起。此順序是使用一個 future 上的 thenCombine() 方法來編碼的,該方法接受另一個 future 作為它的第一個參數(shù),接受一個二進制函數(shù)實例(其輸入類型與 future 的結(jié)果類型匹配)作為其第二個參數(shù)。thenCombine() 返回了第三個 future,表示應用到最初的兩個 future 的結(jié)果上的函數(shù)的值。在本例中,兩個 future 是 task2 和 task3,該函數(shù)將結(jié)果值求和。

runNonblockingAlt() 方法使用在一個 future 上調(diào)用了 thenCompose() 方法兩次。thenCompose() 的參數(shù)是一個函數(shù)實例,它接收原始 future 的值類型作為輸入,返回另一個 future 作為輸出。thenCompose() 的結(jié)果是第三個 future,具有與該函數(shù)相同的結(jié)果類型。這個 future 用作在原始 future 完成后,該函數(shù)最終將返回的 future 的占位符。

對 task1.thenCompose() 的調(diào)用將會返回一個 future,表示對 task1 的結(jié)果應用 runTask2and3() 函數(shù)的結(jié)果,該結(jié)果被保存為 comp123。對 comp123.thenCompose() 的調(diào)用返回一個 future,表示對第一個 henCompose() 的結(jié)果應用 task4() 函數(shù)的結(jié)果,這是執(zhí)行所有任務的總體結(jié)果。

試用示例

示例代碼包含一個 main() 方法,以便依次運行事件代碼的每個版本,并顯示完成事件(約 5 秒)和結(jié)果 (13) 是正確的。清單 6 顯示了從一個控制臺運行這個 main() 方法的結(jié)果。

清單 6. 運行 main() 方法

dennis@linux-guk3:~/devworks/scala3/code/bin> java com.sosnoski.concur.article3.EventComposition
Starting runBlocking
runBlocking returned 13 in 5008 ms.
Starting runNonblocking
runNonblocking returned 13 in 5002 ms.
Starting runNonblockingAlt
runNonblockingAlt returned 13 in 5001 ms.

不順利的道路

目前為止,您看到了以 future 形式協(xié)調(diào)事件的代碼,這些代碼總是能夠成功完成。在真實應用程序中,不能寄希望于事情總是這么順利。處理任務過程中將發(fā)生問題,而且在 Java 術(shù)語中,這些問題通常表示為 Throwable。

更改 清單 2 中的任務定義很容易,只需使用 delayedFailure() 代替 delayedSuccess() 方法即可,如這里的 task4 所示:

private static CompletableFuture<Integer> task4(int input) {
return TimedEventSupport.delayedFailure(1, new IllegalArgumentException("This won't work!"));
}

如果運行 清單 3 并且僅將 task4 修改為完成時拋出異常,那么您會得到 task4 上的 join() 調(diào)用所拋出的預期的 IllegalArgumentException。如果在 runBlocking() 方法中沒有捕獲該問題,該異常會在調(diào)用鏈中一直傳遞,最終如果仍未捕獲問題,則會終止執(zhí)行線程。幸運的是,修改該代碼很容易,因此,如果在任何任務完成時拋出異常,該異常會通過返回的 future 傳遞給調(diào)用方來處理。清單 7 展示了這一更改。

清單 7. 具有異常的阻塞等待

private static CompletableFuture<Integer> runBlocking() {
try {
Integer i1 = task1(1).join();
CompletableFuture<Integer> future2 = task2(i1);
CompletableFuture<Integer> future3 = task3(i1);
Integer result = task4(future2.join() + future3.join()).join();
return CompletableFuture.completedFuture(result);
} catch (CompletionException e) {
CompletableFuture<Integer> result = new CompletableFuture<Integer>();
result.completeExceptionally(e.getCause());
return result;
}}

清單 7 非常淺顯易懂。最初的代碼包裝在一個 try/catch 中,catch 在返回的 future 完成時傳回異常。此方法稍微增加了一點復雜性,但任何 Java 開發(fā)人員應該仍然很容易理解它。

清單 4 中的非阻塞代碼甚至不需要添加 try/catch。CompletableFuture 合成和組合操作負責自動為您傳遞異常,以便依賴的 future 也會在完成時拋出異常。

阻塞還是不阻塞

您已經(jīng)查看了由 CompletableFuture 表示的事件的阻塞和非阻塞處理方法。至少對于本文中建模的基本的任務流,兩種方法都非常簡單。對于更復雜的任務流,該代碼也會更加復雜。

在阻塞情況下,增加的復雜性不是大問題,您仍然只需要等待事件完成。如果要在線程之間執(zhí)行其他類型的同步,則會遇到線程饑餓問題,甚至是死鎖問題。

在非阻塞情況下,事件的完成所觸發(fā)的代碼執(zhí)行很難調(diào)試。在執(zhí)行許多類型的事件且事件之間存在許多交互時,跟蹤哪個事件觸發(fā)了哪次執(zhí)行就會變得很難。這種情形基本上就是回調(diào)的噩夢,無論是使用傳統(tǒng)的回調(diào)還是 CompletableFuture 組合和合成操作。

總而言之,阻塞代碼通常具有簡單性優(yōu)勢。那么為什么有人希望使用非阻塞方法?本節(jié)將給出一些重要的理由。

切換的成本

一個線程阻塞時,以前執(zhí)行該線程的處理器核心會轉(zhuǎn)而執(zhí)行另一個線程。以前執(zhí)行的線程的執(zhí)行狀態(tài)必須保存到內(nèi)存中,并加載新線程的狀態(tài)。這種將核心從運行一個線程切換到運行另一個線程的操作稱為上下文切換。

除了直接的上下文切換性能成本,新線程一般會使用來自前一個線程的不同數(shù)據(jù)。內(nèi)存訪問比處理器時鐘慢得多,所以現(xiàn)代系統(tǒng)會在處理器核心與主要內(nèi)存之間使用多層緩存。盡管比主要內(nèi)存快得多,但緩存的容量也小得多(一般而言,緩存越快,容量越?。匀魏螘r刻只能在緩存中保存總內(nèi)存的小部分。

發(fā)生線程切換且一個核心開始執(zhí)行一個新線程時,新線程需要的內(nèi)存數(shù)據(jù)可能不在緩存中,所以該核心必須等待該數(shù)據(jù)從主要內(nèi)存加載。

組合的上下文切換和內(nèi)存訪問延遲,會導致直接的顯著性能成本。圖 2 顯示了我使用 Oracle 的 Java 8 for 64-bit Linux 的四核 AMD 系統(tǒng)上的線程切換開銷。

此測試使用了可變數(shù)量的線程,數(shù)量從 1 到 4,096 按 2 的冪次變化,每個線程的內(nèi)存塊大小也是可變的,介于 0 到 64KB 之間。線程依次執(zhí)行,使用 CompletableFuture 來觸發(fā)線程執(zhí)行。每次一個線程執(zhí)行時,它首先使用針對線程的數(shù)據(jù)返回一個簡單計算結(jié)果,以顯示將此數(shù)據(jù)加載到緩存中的開銷,然后增加一個共享的靜態(tài)變量。

最后創(chuàng)建一個新 CompletableFuture 實例來觸發(fā)它的下一次執(zhí)行,然后通過完成該線程等待的 CompletableFuture 來啟動序列中的下一個線程。最后,如果需要再次執(zhí)行它,那么該線程會等待新創(chuàng)建的 CompletableFuture 完成。

圖 2. 線程切換成本

處理java異步事件的阻塞和非阻塞方法分析

可以在圖 2 的圖表中看到線程數(shù)量和每個線程的內(nèi)存的影響。線程數(shù)量為 4 個時影響最大,只要特定于線程的數(shù)據(jù)足夠小,兩個線程的運行速度幾乎與單個線程一樣快。線程數(shù)量超過 4 個后,對性能的影響相對較小。每個線程的內(nèi)存量越大,兩層緩存就會越快溢出,導致切換成本增高。

圖 2 中所示的時間值來自我的有點過時的主要系統(tǒng)。您系統(tǒng)上相應的時間將不同,可能會小得多。但是,曲線的形狀應大體相同。

圖 2 顯示了一次線程切換的微秒級開銷,所以即使線程切換的成本達到數(shù)萬個處理器時鐘,但絕對數(shù)字并不大。對于中等數(shù)量的線程,16KB 數(shù)據(jù)具有 12.5 微秒的切換時間(圖表中的黃線),系統(tǒng)每秒可執(zhí)行 80,000 次線程切換。與您在任何精心編寫的單用戶應用程序以及甚至許多服務器應用程序中看到的結(jié)果相比,這一線程切換次數(shù)可能多得多。但對于每秒處理數(shù)千個事件的高性能服務器應用程序,阻塞的開銷可能成為影響性能的主要因素。對于這種應用程序,盡可能使用非阻塞代碼來最大限度減少線程切換非常重要。

認識到這些時間數(shù)據(jù)來自最理想的場景也很重要。運行線程切換程序時,會運行足夠的 CPU 活動來讓所有核心全速運行(至少在我的系統(tǒng)上是這樣)。在真實應用程序中,處理負載可能具有更大的突發(fā)性。在活動量低的時間,現(xiàn)代處理器將一些核心過渡到休眠狀態(tài),以減少總功耗和產(chǎn)生的熱量。這個降低功耗的過程的惟一問題是,在需求增多時,它需要時間來將核心從休眠狀態(tài)喚醒。從深度休眠狀態(tài)過渡到全速運行所需的時間可能達到微秒級別,而不是在這個線程切換時間示例中看到的毫秒級。

反應式應用程序

對于許多應用程序,不在特定線程上阻塞的另一個原因是,這些線程用于處理需要及時響應的事件。經(jīng)典的例子就是 UI 線程。如果在 UI 線程中執(zhí)行會阻塞來等待異步事件完成的代碼,那么您會延遲用戶輸入事件的處理。沒有人喜歡等待應用程序響應他們的鍵入、單擊或觸控操作,所以 UI 線程中的阻塞可能很快在來自用戶的錯誤報告中反映出來。

UI 線程概念以一種更一般性的原則作為支撐。許多類型的應用程序,甚至非 GUI 應用程序,也必須迅速響應事件,而且在許多情況下,保持較短的響應事件至關重要。對于這些類型的應用程序,阻塞等待不是可接受的選擇。

反應式編程 這個名稱表示為響應靈敏且可擴展的應用程序采用的編程風格。反應式編程的核心原則是應用程序應能夠:

  • 對事件做出反應:應用程序應是事件驅(qū)動的,在由異步通信所鏈接的每個級別上具有松散耦合的組件。
  • 對負載做出反應:應用程序應該是可擴展的,以便可以輕松地升級應用程序來處理增加的需求。
  • 對故障做出反應:應用程序應具有恢復能力,能將故障的影響局部化并迅速更正。
  • 對用戶做出反應:應用程序應能迅速響應用戶,甚至在具有負載和存在故障的情況下。

使用阻塞式事件處理方法的應用程序無法滿足這些原則。線程是有限的資源,所以在阻塞等待中占用它們會限制可伸縮性,還會增加延遲(應用程序響應時間),因為阻塞的線程無法立即響應事件。非阻塞應用程序可更快地響應事件,降低成本,同時減少線程切換開銷并改善吞吐量。

反應式編程比非阻塞代碼的復雜得多。反應式編程涉及到關注您應用程序中的數(shù)據(jù)流并將這些數(shù)據(jù)流實現(xiàn)為異步交互,而不會讓接收方負擔過重或讓發(fā)送方積滯。這種對數(shù)據(jù)流的關注,有助于避免傳統(tǒng)的并發(fā)編程的許多復雜性。

結(jié)束語

本文介紹了如何使用 Java 8 CompletableFuture 類將事件合成和組合到一種執(zhí)行計劃中,這個計劃可以使用代碼來輕松而又簡潔地表示。這種非阻塞可合成性對編寫能快速響應工作負載并適當處理故障的反應式應用程序不可或缺。

文中通過示例代碼介紹的非常詳細,對大家的學習或者工作具有一定的參考學習價值,

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI