溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

java異步并發(fā)請求和請求合并舉例分析

發(fā)布時間:2021-11-16 16:03:39 來源:億速云 閱讀:209 作者:iii 欄目:大數(shù)據(jù)

這篇文章主要介紹“java異步并發(fā)請求和請求合并舉例分析”,在日常操作中,相信很多人在java異步并發(fā)請求和請求合并舉例分析問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”java異步并發(fā)請求和請求合并舉例分析”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!

概述

在做業(yè)務(wù)系統(tǒng)需求開發(fā)中,經(jīng)常需要從其他服務(wù)獲取數(shù)據(jù),拼接數(shù)據(jù),然后返回數(shù)據(jù)給前端使用;常見的服務(wù)調(diào)用就是通過http接口調(diào)用,而對于http,通常一個請求會分配一個線程執(zhí)行,在同步調(diào)用接口的情況下,整個線程是一直被占用或者阻塞的;如果有大量的這種請求,整個系統(tǒng)的吞吐量就比較低,而在依賴的服務(wù)響應(yīng)時間比較低的情況下,我們希望先讓出cpu,讓其他請求先執(zhí)行,等依賴的服務(wù)請求返回結(jié)果時再繼續(xù)往下執(zhí)行,這時我們會考慮將請求異步化,或者將相同的請求合并,從而達(dá)到提高系統(tǒng)執(zhí)行效率和吞吐量的目的。

異步并發(fā)請求

目前常見的幾種調(diào)用方式是同步調(diào)用,線程池+future,異步回調(diào)completableFuture;協(xié)程也是異步調(diào)用的解決方式,但java目前不支持協(xié)程;對于future方式,只能用get或者while(!isDone)輪詢這種阻塞的方式直到線程執(zhí)行完成,這也不是我們希望的異步執(zhí)行方式,jdk8提供的completableFuture其實也不是異步的方式,只是對依賴多服務(wù)的Callback調(diào)用結(jié)果處理做結(jié)果編排,來彌補(bǔ)Callback的不足,從而實現(xiàn)異步鏈?zhǔn)秸{(diào)用的目的,這也是比較推薦的方式。

同步調(diào)用
	RpcService rpcService = new RpcService();
	HttpService httpService = new HttpService();
	// 假設(shè)rpc1耗時10ms
	Map<String, String> rpcResult1=rpcService.getRpcResult();
	// 假設(shè)rpc2耗時20ms
	Integer rpcResult2 = httpService.getHttpResult();
	
	// 則線程總耗時:30ms
future
	ExecutorService executor = Executors.newFixedThreadPool(2);
	RpcService rpcService = new RpcService();
	HttpService httpService = new HttpService();
	future1 = executor.submit(() -> rpcService.getRpcResult());
	future2 = executor.submit(() -> httpService.getHttpResult());
	//rpc1耗時10ms
	Map<String, String> rpcResult1 = future1.get(300, TimeUnit.MILLISECONDS);
	//rpc2耗時20ms
	Integer rpcResult2 = future2.get(300, TimeUnit.MILLISECONDS);

	//則線程總耗時20ms
CompletableFuture
	/** 
	* 場景:兩個接口并發(fā)異步調(diào)用,返回CompletableFuture,不阻塞主線程 
	* 兩個服務(wù)也是異步非阻塞調(diào)用
	**/
	CompletableFuture future1 = service.getHttpData("http://www.vip.com/showGoods/50");
	CompletableFuture future2 = service.getHttpData("http://www.vip.com/showGoods/50");
	CompletableFuture future3 = future1.thenCombine(future2, (f1, f2) -> {
	    //處理業(yè)務(wù)....

	    return f1 + "," + f2;
	}).exceptionally(e -> {

	    return "";
	});	

CompletableFuture使用ForkJoinPool執(zhí)行線程,在ForkJoinPool類注冊ForkJoinWorkerThread線程時可以看到,F(xiàn)orkJoinPool里面的線程都是daemon線程(垃圾回收線程就是一個典型的deamon線程),

/**
     * Callback from ForkJoinWorkerThread constructor to establish and
     * record its WorkQueue.
     *
     * @param wt the worker thread
     * @return the worker's queue
     */
    final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
        UncaughtExceptionHandler handler;
		
	    #注冊守護(hù)線程
        wt.setDaemon(true);          // configure thread
		
        if ((handler = ueh) != null)
            wt.setUncaughtExceptionHandler(handler);
        WorkQueue w = new WorkQueue(this, wt);
        int i = 0;                                    // assign a pool index
        int mode = config & MODE_MASK;
        int rs = lockRunState();
        try {
            WorkQueue[] ws; int n;                    // skip if no array
            if ((ws = workQueues) != null && (n = ws.length) > 0) {
                int s = indexSeed += SEED_INCREMENT;  // unlikely to collide
                int m = n - 1;
                i = ((s << 1) | 1) & m;               // odd-numbered indices
                if (ws[i] != null) {                  // collision
                    int probes = 0;                   // step by approx half n
                    int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
                    while (ws[i = (i + step) & m] != null) {
                        if (++probes >= n) {
                            workQueues = ws = Arrays.copyOf(ws, n <<= 1);
                            m = n - 1;
                            probes = 0;
                        }
                    }
                }
                w.hint = s;                           // use as random seed
                w.config = i | mode;
                w.scanState = i;                      // publication fence
                ws[i] = w;
            }
        } finally {
            unlockRunState(rs, rs & ~RSLOCK);
        }
        wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
        return w;
    }

當(dāng)主線程執(zhí)行完后,jvm就會退出,所以需要考慮主線程執(zhí)行完成時間和fork出去的線程執(zhí)行時間,也需要考慮線程池的大小,默認(rèn)為當(dāng)前cpu的核數(shù)-1,可以參考下其他系統(tǒng)的故障記錄:CompletableFuture線程池問題。

請求合并

當(dāng)系統(tǒng)遇到瞬間產(chǎn)生大量請求時,可以考慮將相同的請求合并,最大化利用系統(tǒng)IO,提高系統(tǒng)的吞吐量。

設(shè)計時,可以將符合條件的url請求,先收集起來,直到滿足以下條件之一時進(jìn)行合并發(fā)送:

  1. 收集到的請求數(shù)超過預(yù)設(shè)的最大請求數(shù)。

  2. 距離上次請求發(fā)送時長超過預(yù)設(shè)的最大時長。

實現(xiàn)方案有自行使用阻塞隊列方式:并發(fā)環(huán)境下的請求合并,也可以考慮Hystrix:Hystrix實現(xiàn)請求合并/請求緩存

目前公司網(wǎng)關(guān)組件janus也是通過合并auth請求的方式減少網(wǎng)絡(luò)開銷,提高cpu的利用率和系統(tǒng)吞吐量的。

nginx同樣有合并請求模塊nginx-http-concat用來減少請求io,參考:nginx 合并多個js/css請求為一個請求

賴澤坤@vipshop.com

到此,關(guān)于“java異步并發(fā)請求和請求合并舉例分析”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI