您好,登錄后才能下訂單哦!
本篇內(nèi)容主要講解“如何使用Java高并發(fā)編程之Semaphore”,感興趣的朋友不妨來看看。本文介紹的方法操作簡單快捷,實用性強。下面就讓小編來帶大家學習“如何使用Java高并發(fā)編程之Semaphore”吧!
共享鎖、獨占鎖
學習semaphore之前我們必須要先了解下什么是共享鎖。
共享鎖:它是允許多個線程同時獲取鎖,并發(fā)的訪問共享資源
獨占鎖:也有人把它叫做“獨享鎖”,它是是獨占的,排他的,只能被一個線程可持有, 當獨占鎖已經(jīng)被某個線程持有時,其他線程只能等待它被釋放后,才能去爭鎖,并且同一時刻只有一個線程能爭鎖成功。
什么是Semaphore
Semaphore(信號量)是用來控制同時訪問特定資源的線程數(shù)量,它通過協(xié)調(diào)各個線程,以保證合理的使用公共資源。很多年以來,我都覺得從字面上很難理解Semaphore所表達的含義,只能把它比作是控制流量的紅綠燈,比如XX馬路要限制流量,只允許同時有一百輛車在這條路上行使,其他的都必須在路口等待,所以前一百輛車會看到綠燈,可以開進這條馬路,后面的車會看到紅燈,不能駛?cè)隭X馬路,但是如果前一百輛中有五輛車已經(jīng)離開了XX馬路,那么后面就允許有5輛車駛?cè)腭R路,這個例子里說的車就是線程,駛?cè)腭R路就表示線程在執(zhí)行,離開馬路就表示線程執(zhí)行完成,看見紅燈就表示線程被阻塞,不能執(zhí)行。”
Semaphore機制是提供給線程搶占式獲取許可,所以他可以實現(xiàn)公平或者非公平,類似于ReentrantLock。說了這么多我們來個實際的例子看一看,比如我們?nèi)ネ\噲鐾\?,停車場總共只?個車位,但是現(xiàn)在有8輛汽車來停車,剩下的3輛汽車要么等其他汽車開走后進行停車,或者去找別的停車位?
/** * @author: 公眾號【Java金融】 */ public class SemaphoreTest { public static void main(String[] args) throws InterruptedException { // 初始化五個車位 Semaphore semaphore = new Semaphore(5); // 等所有車子 final CountDownLatch latch = new CountDownLatch(8); for (int i = 0; i < 8; i++) { int finalI = i; if (i == 5) { Thread.sleep(1000); new Thread(() -> { stopCarNotWait(semaphore, finalI); latch.countDown(); }).start(); continue; } new Thread(() -> { stopCarWait(semaphore, finalI); latch.countDown(); }).start(); } latch.await(); log("總共還剩:" + semaphore.availablePermits() + "個車位"); } private static void stopCarWait(Semaphore semaphore, int finalI) { String format = String.format("車牌號%d", finalI); try { semaphore.acquire(1); log(format + "找到車位了,去停車了"); Thread.sleep(10000); } catch (Exception e) { e.printStackTrace(); } finally { semaphore.release(1); log(format + "開走了"); } } private static void stopCarNotWait(Semaphore semaphore, int finalI) { String format = String.format("車牌號%d", finalI); try { if (semaphore.tryAcquire()) { log(format + "找到車位了,去停車了"); Thread.sleep(10000); log(format + "開走了"); semaphore.release(); } else { log(format + "沒有停車位了,不在這里等了去其他地方停車去了"); } } catch (Exception e) { e.printStackTrace(); } } public static void log(String content) { // 格式化 DateTimeFormatter fmTime = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); // 當前時間 LocalDateTime now = LocalDateTime.now(); System.out.println(now.format(fmTime) + " "+content); } }
2021-03-01 18:54:57 車牌號0找到車位了,去停車了 2021-03-01 18:54:57 車牌號3找到車位了,去停車了 2021-03-01 18:54:57 車牌號2找到車位了,去停車了 2021-03-01 18:54:57 車牌號1找到車位了,去停車了 2021-03-01 18:54:57 車牌號4找到車位了,去停車了 2021-03-01 18:54:58 車牌號5沒有停車位了,不在這里等了去其他地方停車去了 2021-03-01 18:55:07 車牌號7找到車位了,去停車了 2021-03-01 18:55:07 車牌號6找到車位了,去停車了 2021-03-01 18:55:07 車牌號2開走了 2021-03-01 18:55:07 車牌號0開走了 2021-03-01 18:55:07 車牌號3開走了 2021-03-01 18:55:07 車牌號4開走了 2021-03-01 18:55:07 車牌號1開走了 2021-03-01 18:55:17 車牌號7開走了 2021-03-01 18:55:17 車牌號6開走了 2021-03-01 18:55:17 總共還剩:5個車位
從輸出結(jié)果我們可以看到車牌號5這輛車看見沒有車位了,就不在這個地方傻傻的等了,而是去其他地方了,但是車牌號6和車牌號7分別需要等到車庫開出兩輛車空出兩個車位后才停進去。這就體現(xiàn)了Semaphore 的acquire 方法如果沒有獲取到憑證它就會阻塞,而tryAcquire方法如果沒有獲取到憑證不會阻塞的。
semaphore在dubbo中的應用
在Dubbo中可以給Provider配置線程池大小來控制系統(tǒng)提供服務的最大并行度,默認是200。
<dubbo:provider threads="200"/>
比如我現(xiàn)在這個訂單系統(tǒng)有三個接口,分別為創(chuàng)單、取消訂單、修改訂單。這三個接口加起來的并發(fā)是200但是創(chuàng)單接口是核心接口,我想讓它多分點線程來執(zhí)行 讓它可以有最大150個線程,取消訂單和修改訂單分別最大25個線程執(zhí)行就可以了。dubbo提供了executes這一屬性來實現(xiàn)這個功能
<dubbo:service interface="cn.javajr.service.CreateOrderService" executes="150"/> <dubbo:service interface="cn.javajr.service.CancelOrderService" executes="25"/> <dubbo:service interface="cn.javajr.service.EditOrderService" executes="25"/>
我們可以看看dubbo內(nèi)部是如何來executes的,具體實現(xiàn)是在ExecuteLimitFilter這個類我們可以
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { URL url = invoker.getUrl(); String methodName = invocation.getMethodName(); Semaphore executesLimit = null; boolean acquireResult = false; int max = url.getMethodParameter(methodName, Constants.EXECUTES_KEY, 0); if (max > 0) { RpcStatus count = RpcStatus.getStatus(url, invocation.getMethodName()); // 如果當前使用的線程數(shù)量已經(jīng)大于等于設置的閾值,那么直接拋出異常 // if (count.getActive() >= max) { // throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service // using threads greater than <dubbo:service executes=\"" + max + "\" /> limited."); /** * http://manzhizhen.iteye.com/blog/2386408 * use semaphore for concurrency control (to limit thread number) */ executesLimit = count.getSemaphore(max); if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())) { throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max + "\" /> limited."); } } long begin = System.currentTimeMillis(); boolean isSuccess = true; // 計數(shù)器+1 RpcStatus.beginCount(url, methodName); try { Result result = invoker.invoke(invocation); return result; } catch (Throwable t) { isSuccess = false; if (t instanceof RuntimeException) { throw (RuntimeException) t; } else { throw new RpcException("unexpected exception when ExecuteLimitFilter", t); } } finally { // 計數(shù)器-1 RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, isSuccess); if(acquireResult) { executesLimit.release(); } } }
從上述代碼我們也可以看出早期這個是沒有采用Semaphore來實現(xiàn)的,而是直接采用被注釋的 if (count.getActive() >= max) 這個來來實現(xiàn)的,由于這個count.getActive() >= max 和這個計數(shù)加1不是原子性的,所以會有問題,具體bug號可以看https://github.com/apache/dubbo/pull/582后面才采用上述代碼用Semaphore來修復非原子性問題。具體更詳細的分析可以參見代碼的鏈接。不過現(xiàn)在最新版本(2.7.9)我看是采用采用自旋加上和CAS來實現(xiàn)的。
Semaphore
上面就是對Semaphore一個簡單的使用以及dubbo中用到的例子,說句實話Semaphore在工作中用的還是比較少的,不過面試又有可能會被問到,所以還是有必要來一起學習一下它。我們前面《Java高并發(fā)編程基礎之AQS》通過ReentrantLock 一起學習了下AQS,其實Semaphore同樣也是通過AQS來是實現(xiàn)的,我們可以一起來對照下獨占鎖的方法,基本上都是有方法一一相對應的。圖片這里有兩點稍微需要注意的地方:
在獨占鎖模式中,我們只有在獲取了獨占鎖的節(jié)點釋放鎖時,才會喚醒后繼節(jié)點,因為獨占鎖只能被一個線程持有,如果它還沒有被釋放,就沒有必要去喚醒它的后繼節(jié)點。
在共享鎖模式下,當一個節(jié)點獲取到了共享鎖,我們在獲取成功后就可以喚醒后繼節(jié)點了,而不需要等到該節(jié)點釋放鎖的時候,這是因為共享鎖可以被多個線程同時持有,一個鎖獲取到了,則后繼的節(jié)點都可以直接來獲取。因此,在共享鎖模式下,在獲取鎖和釋放鎖結(jié)束時,都會喚醒后繼節(jié)點。
獲取憑證
我們同樣還是通過非公平鎖的模式來獲取憑證 我們可以看下acquire的核心方法
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } // 主要看下這個方法,這個方法返回的值也就是tryAcquireShared返回的值,因為tryAcquireShared->nonfairTryAcquireShared final int nonfairTryAcquireShared(int acquires) { //自旋 for (;;) { //Semaphore用AQS的state變量的值代表可用許可數(shù) int available = getState(); //可用許可數(shù)減去本次需要獲取的許可數(shù)即為剩余許可數(shù) int remaining = available - acquires; //如果剩余許可數(shù)小于0或者CAS將當前可用許可數(shù)設置為剩余許可數(shù)成功,則返回成功許可數(shù) if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; }
當tryAcquireShared 獲取返回許可書小于0時說明獲取許可失敗需要進入doAcquireSharedInterruptibly這個方法去休眠。
當tryAcquireShared 獲取返回許可書小于0時說明獲取許可成功直接結(jié)束。
doAcquireSharedInterruptibly
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { // 獨占鎖的acquireQueued調(diào)用的是addWaiter(Node.EXCLUSIVE), // 而共享鎖調(diào)用的是addWaiter(Node.SHARED),表明了該節(jié)點處于共享模式 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
這個方法是不是跟我們上篇文章講的AQS的獨占鎖的acquireQueued很像,不過獨占鎖它是直接調(diào)用了用了setHead(node)方法,而共享鎖調(diào)用的是setHeadAndPropagate(node, r)這個方法除了調(diào)用setHead 里面還調(diào)用了doReleaseShared(喚醒后繼節(jié)點)
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
其他的方法基本上是和ReentrantLock來實現(xiàn)的獨占鎖差不多,我相信大家對源碼分析感興趣的應該也不多,其他更多細節(jié)問題還是需要自己親自動手去看源碼的。
總結(jié)
當信號量Semaphore初始化設置許可證為1 時,它也可以當作互斥鎖使用。其中0、1就相當于它的狀態(tài),當=1時表示其他線程可以獲取,當=0時,排他,即其他線程必須要等待。
Semaphore是JUC包中的一個很簡單的工具類,用來實現(xiàn)多線程下對于資源的同一時刻的訪問線程數(shù)限制
Semaphore中存在一個【許可】的概念,即訪問資源之前,先要獲得許可,如果當前許可數(shù)量為0,那么線程阻塞,直到獲得許可
Semaphore內(nèi)部使用AQS實現(xiàn),由抽象內(nèi)部類Sync繼承了AQS。因為Semaphore天生就是共享的場景,所以其內(nèi)部實際上類似于共享鎖的實現(xiàn)
共享鎖的調(diào)用框架和獨占鎖很相似,它們最大的不同在于獲取鎖的邏輯——共享鎖可以被多個線程同時持有,而獨占鎖同一時刻只能被一個線程持有。
由于共享鎖同一時刻可以被多個線程持有,因此當頭節(jié)點獲取到共享鎖時,可以立即喚醒后繼節(jié)點來爭鎖,而不必等到釋放鎖的時候。因此,共享鎖觸發(fā)喚醒后繼節(jié)點的行為可能有兩處,一處在當前節(jié)點成功獲得共享鎖后,一處在當前節(jié)點釋放共享鎖后。
采用semaphore來進行限流的話會產(chǎn)生突刺現(xiàn)象。
★指在一定時間內(nèi)的一小段時間內(nèi)就用完了所有資源,后大部分時間中無資源可用。比如在限流方法中的計算器算法,設置1s內(nèi)的最大請求數(shù)為100,在前100ms已經(jīng)有了100個請求,則后面900ms將無法處理請求,這就是突刺現(xiàn)象。
到此,相信大家對“如何使用Java高并發(fā)編程之Semaphore”有了更深的了解,不妨來實際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進入相關(guān)頻道進行查詢,關(guān)注我們,繼續(xù)學習!
免責聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。