溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Java 并發(fā)框架的介紹和使用方法

發(fā)布時間:2021-06-18 17:20:26 來源:億速云 閱讀:260 作者:chen 欄目:編程語言

這篇文章主要講解了“ Java 并發(fā)框架的介紹和使用方法”,文中的講解內(nèi)容簡單清晰,易于學(xué)習(xí)與理解,下面請大家跟著小編的思路慢慢深入,一起來研究和學(xué)習(xí)“ Java 并發(fā)框架的介紹和使用方法”吧!

1. 為什么要寫這篇文章

幾年前 NoSQL 開始流行的時候,像其他團隊一樣,我們的團隊也熱衷于令人興奮的新東西,并且計劃替換一個應(yīng)用程序的數(shù)據(jù)庫。但是,當深入實現(xiàn)細節(jié)時,我們想起了一位智者曾經(jīng)說過的話:“細節(jié)決定成敗”。最終我們意識到 NoSQL 不是解決所有問題的銀彈,而 NoSQL vs RDMS 的答案是:“視情況而定”。類似地,去年RxJava 和 Spring Reactor 這樣的并發(fā)庫加入了讓人充滿激情的語句,如異步非阻塞方法等。為了避免再犯同樣的錯誤,我們嘗試評估諸如 ExecutorService、 RxJava、Disruptor 和 Akka 這些并發(fā)框架彼此之間的差異,以及如何確定各自框架的正確用法。

本文中用到的術(shù)語在這里有更詳細的描述。

2. 分析并發(fā)框架的示例用例

Java 并發(fā)框架的介紹和使用方法

3. 快速更新線程配置

在開始比較并發(fā)框架的之前,讓我們快速復(fù)習(xí)一下如何配置最佳線程數(shù)以提高并行任務(wù)的性能。這個理論適用于所有框架,并且在所有框架中使用相同的線程配置來度量性能。

  • 對于內(nèi)存任務(wù),線程的數(shù)量大約等于具有最佳性能的內(nèi)核的數(shù)量,盡管它可以根據(jù)各自處理器中的超線程特性進行一些更改。

    • 例如,在8核機器中,如果對應(yīng)用程序的每個請求都必須在內(nèi)存中并行執(zhí)行4個任務(wù),那么這臺機器上的負載應(yīng)該保持為 @2 req/sec,在 ThreadPool 中保持8個線程。

  • 對于 I/O 任務(wù),ExecutorService 中配置的線程數(shù)應(yīng)該取決于外部服務(wù)的延遲。

    • 與內(nèi)存中的任務(wù)不同,I/O 任務(wù)中涉及的線程將被阻塞,并處于等待狀態(tài),直到外部服務(wù)響應(yīng)或超時。因此,當涉及 I/O 任務(wù)線程被阻塞時,應(yīng)該增加線程的數(shù)量,以處理來自并發(fā)請求的額外負載。

    • I/O 任務(wù)的線程數(shù)應(yīng)該以保守的方式增加,因為處于活動狀態(tài)的許多線程帶來了上下文切換的成本,這將影響應(yīng)用程序的性能。為了避免這種情況,應(yīng)該根據(jù) I/O 任務(wù)中涉及的線程的等待時間按比例增加此機器的線程的確切數(shù)量以及負載。

參考: http://baddotrobot.com/blog/2013/06/01/optimum-number-of-threads/

4. 性能測試結(jié)果

性能測試配置 GCP -> 處理器:Intel(R) Xeon(R) CPU @ 2.30GHz;架構(gòu):x86_64;CPU 內(nèi)核:8個(注意:這些結(jié)果僅對該配置有意義,并不表示一個框架比另一個框架更好)。

Java 并發(fā)框架的介紹和使用方法

5. 使用執(zhí)行器服務(wù)并行化 IO 任務(wù)

5.1 何時使用?

如果一個應(yīng)用程序部署在多個節(jié)點上,并且每個節(jié)點的 req/sec 小于可用的核心數(shù)量,那么 ExecutorService 可用于并行化任務(wù),更快地執(zhí)行代碼。

5.2 什么時候適用?

如果一個應(yīng)用程序部署在多個節(jié)點上,并且每個節(jié)點的 req/sec 遠遠高于可用的核心數(shù)量,那么使用 ExecutorService 進一步并行化只會使情況變得更糟。

當外部服務(wù)延遲增加到 400ms 時,性能測試結(jié)果如下(請求速率 @50 req/sec,8核)。

Java 并發(fā)框架的介紹和使用方法

5.3 所有任務(wù)按順序執(zhí)行示例

// I/O 任務(wù):調(diào)用外部服務(wù)
String posts = JsonService.getPosts();
String comments = JsonService.getComments();
String albums = JsonService.getAlbums();
String photos = JsonService.getPhotos();

// 合并來自外部服務(wù)的響應(yīng)
// (內(nèi)存中的任務(wù)將作為此操作的一部分執(zhí)行)
int userId = new Random().nextInt(10) + 1;
String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments);
String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos);

// 構(gòu)建最終響應(yīng)并將其發(fā)送回客戶端
String response = postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;
return response;

5.4 I/O 任務(wù)與 ExecutorService 并行執(zhí)行代碼示例

// 添加 I/O 任務(wù)
List<Callable<String>> ioCallableTasks = new ArrayList<>();
ioCallableTasks.add(JsonService::getPosts);
ioCallableTasks.add(JsonService::getComments);
ioCallableTasks.add(JsonService::getAlbums);
ioCallableTasks.add(JsonService::getPhotos);

// 調(diào)用所有并行任務(wù)
ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize);
List<Future<String>> futuresOfIOTasks = ioExecutorService.invokeAll(ioCallableTasks);

// 獲取 I/O  操作(阻塞調(diào)用)結(jié)果
String posts = futuresOfIOTasks.get(0).get();
String comments = futuresOfIOTasks.get(1).get();
String albums = futuresOfIOTasks.get(2).get();
String photos = futuresOfIOTasks.get(3).get();

// 合并響應(yīng)(內(nèi)存中的任務(wù)是此操作的一部分)
String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments);
String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos);

// 構(gòu)建最終響應(yīng)并將其發(fā)送回客戶端
return postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;

6. 使用執(zhí)行器服務(wù)并行化 IO 任務(wù)(CompletableFuture)

與上述情況類似:處理傳入請求的 HTTP 線程被阻塞,而 CompletableFuture 用于處理并行任務(wù)

6.1 何時使用?

如果沒有 AsyncResponse,性能與 ExecutorService 相同。如果多個 API 調(diào)用必須異步并且鏈接起來,那么這種方法更好(類似 Node 中的 Promises)。

ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize);

// I/O 任務(wù)
CompletableFuture<String> postsFuture = CompletableFuture.supplyAsync(JsonService::getPosts, ioExecutorService);
CompletableFuture<String> commentsFuture = CompletableFuture.supplyAsync(JsonService::getComments,
   ioExecutorService);
CompletableFuture<String> albumsFuture = CompletableFuture.supplyAsync(JsonService::getAlbums,
   ioExecutorService);
CompletableFuture<String> photosFuture = CompletableFuture.supplyAsync(JsonService::getPhotos,
   ioExecutorService);
CompletableFuture.allOf(postsFuture, commentsFuture, albumsFuture, photosFuture).get();

// 從 I/O 任務(wù)(阻塞調(diào)用)獲得響應(yīng)
String posts = postsFuture.get();
String comments = commentsFuture.get();
String albums = albumsFuture.get();
String photos = photosFuture.get();

// 合并響應(yīng)(內(nèi)存中的任務(wù)將是此操作的一部分)
String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments);
String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos);

// 構(gòu)建最終響應(yīng)并將其發(fā)送回客戶端
return postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;

7. 使用 ExecutorService 并行處理所有任務(wù)

使用 ExecutorService 并行處理所有任務(wù),并使用 @suspended AsyncResponse response 以非阻塞方式發(fā)送響應(yīng)。

Java 并發(fā)框架的介紹和使用方法

圖片來自 http://tutorials.jenkov.com/java-nio/nio-vs-io.html

  • HTTP 線程處理傳入請求的連接,并將處理傳遞給 Executor Pool,當所有任務(wù)完成后,另一個 HTTP 線程將把響應(yīng)發(fā)送回客戶端(異步非阻塞)。

  • 性能下降原因:

    • 在同步通信中,盡管 I/O 任務(wù)中涉及的線程被阻塞,但是只要進程有額外的線程來承擔并發(fā)請求負載,它仍然處于運行狀態(tài)。

    • 因此,以非阻塞方式保持線程所帶來的好處非常少,而且在此模式中處理請求所涉及的成本似乎很高。

    • 通常,對這里討論采用的例子使用異步非阻塞方法會降低應(yīng)用程序的性能。

7.1 何時使用?

如果用例類似于服務(wù)器端聊天應(yīng)用程序,在客戶端響應(yīng)之前,線程不需要保持連接,那么異步、非阻塞方法比同步通信更受歡迎。在這些用例中,系統(tǒng)資源可以通過異步、非阻塞方法得到更好的利用,而不僅僅是等待。

// 為異步執(zhí)行提交并行任務(wù)
ExecutorService ioExecutorService = CustomThreads.getExecutorService(ioPoolSize);
CompletableFuture<String> postsFuture = CompletableFuture.supplyAsync(JsonService::getPosts, ioExecutorService);
CompletableFuture<String> commentsFuture = CompletableFuture.supplyAsync(JsonService::getComments,
ioExecutorService);
CompletableFuture<String> albumsFuture = CompletableFuture.supplyAsync(JsonService::getAlbums,
ioExecutorService);
CompletableFuture<String> photosFuture = CompletableFuture.supplyAsync(JsonService::getPhotos,
ioExecutorService);

// 當 /posts API 返回響應(yīng)時,它將與來自 /comments API 的響應(yīng)結(jié)合在一起
// 作為這個操作的一部分,將執(zhí)行內(nèi)存中的一些任務(wù)
CompletableFuture<String> postsAndCommentsFuture = postsFuture.thenCombineAsync(commentsFuture,
(posts, comments) -> ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments),
ioExecutorService);

// 當 /albums API 返回響應(yīng)時,它將與來自 /photos API 的響應(yīng)結(jié)合在一起
// 作為這個操作的一部分,將執(zhí)行內(nèi)存中的一些任務(wù)
CompletableFuture<String> albumsAndPhotosFuture = albumsFuture.thenCombineAsync(photosFuture,
(albums, photos) -> ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos),
ioExecutorService);

// 構(gòu)建最終響應(yīng)并恢復(fù) http 連接,把響應(yīng)發(fā)送回客戶端
postsAndCommentsFuture.thenAcceptBothAsync(albumsAndPhotosFuture, (s1, s2) -> {
LOG.info("Building Async Response in Thread " + Thread.currentThread().getName());
String response = s1 + s2;
asyncHttpResponse.resume(response);
}, ioExecutorService);

8. RxJava

  • 這與上面的情況類似,唯一的區(qū)別是 RxJava 提供了更好的 DSL 可以進行流式編程,下面的例子中沒有體現(xiàn)這一點。

  • 性能優(yōu)于 CompletableFuture 處理并行任務(wù)。

8.1 何時使用?

如果編碼的場景適合異步非阻塞方式,那么可以首選 RxJava 或任何響應(yīng)式開發(fā)庫。還具有諸如 back-pressure 之類的附加功能,可以在生產(chǎn)者和消費者之間平衡負載。

int userId = new Random().nextInt(10) + 1;
ExecutorService executor = CustomThreads.getExecutorService(8);

// I/O 任務(wù)
Observable<String> postsObservable = Observable.just(userId).map(o -> JsonService.getPosts())
.subscribeOn(Schedulers.from(executor));
Observable<String> commentsObservable = Observable.just(userId).map(o -> JsonService.getComments())
.subscribeOn(Schedulers.from(executor));
Observable<String> albumsObservable = Observable.just(userId).map(o -> JsonService.getAlbums())
.subscribeOn(Schedulers.from(executor));
Observable<String> photosObservable = Observable.just(userId).map(o -> JsonService.getPhotos())
.subscribeOn(Schedulers.from(executor));

// 合并來自 /posts 和 /comments API 的響應(yīng)
// 作為這個操作的一部分,將執(zhí)行內(nèi)存中的一些任務(wù)
Observable<String> postsAndCommentsObservable = Observable
.zip(postsObservable, commentsObservable,
(posts, comments) -> ResponseUtil.getPostsAndCommentsOfRandomUser(userId, posts, comments))
.subscribeOn(Schedulers.from(executor));

// 合并來自 /albums 和 /photos API 的響應(yīng)
// 作為這個操作的一部分,將執(zhí)行內(nèi)存中的一些任務(wù)
Observable<String> albumsAndPhotosObservable = Observable
.zip(albumsObservable, photosObservable,
(albums, photos) -> ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, albums, photos))
.subscribeOn(Schedulers.from(executor));

// 構(gòu)建最終響應(yīng)
Observable.zip(postsAndCommentsObservable, albumsAndPhotosObservable, (r1, r2) -> r1 + r2)
.subscribeOn(Schedulers.from(executor))
.subscribe((response) -> asyncResponse.resume(response), e -> asyncResponse.resume("error"));

9. Disruptor

Java 并發(fā)框架的介紹和使用方法

[Queue vs RingBuffer]

Java 并發(fā)框架的介紹和使用方法

圖片1:http://tutorials.jenkov.com/java-concurrency/blocking-queues.html

圖片2:https://www.baeldung.com/lmax-disruptor-concurrency

  • 在本例中,HTTP 線程將被阻塞,直到 disruptor 完成任務(wù),并且使用 countdowlatch 將 HTTP 線程與 ExecutorService 中的線程同步。

  • 這個框架的主要特點是在沒有任何鎖的情況下處理線程間通信。在 ExecutorService 中,生產(chǎn)者和消費者之間的數(shù)據(jù)將通過 Queue傳遞,在生產(chǎn)者和消費者之間的數(shù)據(jù)傳輸過程中涉及到一個鎖。Disruptor 框架通過一個名為 Ring Buffer 的數(shù)據(jù)結(jié)構(gòu)(它是循環(huán)數(shù)組隊列的擴展版本)來處理這種生產(chǎn)者-消費者通信,并且不需要任何鎖。

  • 這個庫不適用于我們在這里討論的這種用例。僅出于好奇而添加。

9.1 何時使用?

Disruptor 框架在下列場合性能更好:與事件驅(qū)動的體系結(jié)構(gòu)一起使用,或主要關(guān)注內(nèi)存任務(wù)的單個生產(chǎn)者和多個消費者。

static {
   int userId = new Random().nextInt(10) + 1;

   // 示例 Event-Handler; count down latch 用于使線程與 http 線程同步
   EventHandler<Event> postsApiHandler = (event, sequence, endOfBatch) -> {
       event.posts = JsonService.getPosts();
       event.countDownLatch.countDown();
   };

   // 配置 Disputor 用于處理事件
   DISRUPTOR.handleEventsWith(postsApiHandler, commentsApiHandler, albumsApiHandler)
   .handleEventsWithWorkerPool(photosApiHandler1, photosApiHandler2)
   .thenHandleEventsWithWorkerPool(postsAndCommentsResponseHandler1, postsAndCommentsResponseHandler2)
   .handleEventsWithWorkerPool(albumsAndPhotosResponseHandler1, albumsAndPhotosResponseHandler2);
   DISRUPTOR.start();
}

// 對于每個請求,在 RingBuffer 中發(fā)布一個事件:
Event event = null;
RingBuffer<Event> ringBuffer = DISRUPTOR.getRingBuffer();
long sequence = ringBuffer.next();
CountDownLatch countDownLatch = new CountDownLatch(6);
try {
   event = ringBuffer.get(sequence);
   event.countDownLatch = countDownLatch;
   event.startTime = System.currentTimeMillis();
} finally {
   ringBuffer.publish(sequence);
}
try {
   event.countDownLatch.await();
} catch (InterruptedException e) {
   e.printStackTrace();
}

10. Akka

Java 并發(fā)框架的介紹和使用方法

圖片來自:https://blog.codecentric.de/en/2015/08/introduction-to-akka-actors/

  • Akka 庫的主要優(yōu)勢在于它擁有構(gòu)建分布式系統(tǒng)的本地支持。

  • 它運行在一個叫做 Actor System 的系統(tǒng)上。這個系統(tǒng)抽象了線程的概念,Actor System 中的 Actor 通過異步消息進行通信,這類似于生產(chǎn)者和消費者之間的通信。

  • 這種額外的抽象級別有助于 Actor System 提供諸如容錯、位置透明等特性。

  • 使用正確的 Actor-to-Thread 策略,可以對該框架進行優(yōu)化,使其性能優(yōu)于上表所示的結(jié)果。雖然它不能在單個節(jié)點上與傳統(tǒng)方法的性能匹敵,但是由于其構(gòu)建分布式和彈性系統(tǒng)的能力,仍然是首選。

10.1 示例代碼

// 來自 controller :
Actors.masterActor.tell(new Master.Request("Get Response", event, Actors.workerActor), ActorRef.noSender());

// handler :
public Receive createReceive() {
   return receiveBuilder().match(Request.class, request -> {
   Event event = request.event; // Ideally, immutable data structures should be used here.
   request.worker.tell(new JsonServiceWorker.Request("posts", event), getSelf());
   request.worker.tell(new JsonServiceWorker.Request("comments", event), getSelf());
   request.worker.tell(new JsonServiceWorker.Request("albums", event), getSelf());
   request.worker.tell(new JsonServiceWorker.Request("photos", event), getSelf());
   }).match(Event.class, e -> {
   if (e.posts != null && e.comments != null & e.albums != null & e.photos != null) {
   int userId = new Random().nextInt(10) + 1;
   String postsAndCommentsOfRandomUser = ResponseUtil.getPostsAndCommentsOfRandomUser(userId, e.posts,
   e.comments);
   String albumsAndPhotosOfRandomUser = ResponseUtil.getAlbumsAndPhotosOfRandomUser(userId, e.albums,
   e.photos);
   String response = postsAndCommentsOfRandomUser + albumsAndPhotosOfRandomUser;
   e.response = response;
   e.countDownLatch.countDown();
   }
   }).build();
}

11. 總結(jié)

  • 根據(jù)機器的負載決定 Executor 框架的配置,并檢查是否可以根據(jù)應(yīng)用程序中并行任務(wù)的數(shù)量進行負載平衡。

  • 對于大多數(shù)傳統(tǒng)應(yīng)用程序來說,使用響應(yīng)式開發(fā)庫或任何異步庫都會降低性能。只有當用例類似于服務(wù)器端聊天應(yīng)用程序時,這個模式才有用,其中線程在客戶機響應(yīng)之前不需要保留連接。

  • Disruptor 框架在與事件驅(qū)動的架構(gòu)模式一起使用時性能很好; 但是當 Disruptor 模式與傳統(tǒng)架構(gòu)混合使用時,就我們在這里討論的用例而言,它并不符合標準。

感謝各位的閱讀,以上就是“ Java 并發(fā)框架的介紹和使用方法”的內(nèi)容了,經(jīng)過本文的學(xué)習(xí)后,相信大家對 Java 并發(fā)框架的介紹和使用方法這一問題有了更深刻的體會,具體使用情況還需要大家實踐驗證。這里是億速云,小編將為大家推送更多相關(guān)知識點的文章,歡迎關(guān)注!

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI