溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

什么是SynchronousQueue

發(fā)布時間:2021-10-18 11:27:35 來源:億速云 閱讀:124 作者:iii 欄目:web開發(fā)

這篇文章主要介紹“什么是SynchronousQueue”,在日常操作中,相信很多人在什么是SynchronousQueue問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”什么是SynchronousQueue”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!

大劉有段時間經(jīng)常會給一些程序員講課。這一方面是由于團隊培訓(xùn)的需要,一方面也是大劉自身想搞搞凡爾賽,嘚瑟一下自身的實力。

大劉講課是允許公司任何一個人進去聽的。提前一個星期把主題公布在公司群里,有人想聽到日子直接去就是了。

有一次,大劉在聊并發(fā)話題的時候,為了彰顯自己確實是個并發(fā)達人,用了個 SynchronousQueue  舉例子。他說這個隊列其實沒有容積的概念,就是線程持有數(shù)據(jù)互相匹配。

嗯,談到這里還是要說一下,大劉其實也不太懂  SynchronousQueue。只是一來這東西沒人用,自然就沒人懂;二來它的概念也比較晦澀,有些時候比較違背直覺,所以,即使隨口說的一些話可能不太對,也未必會被發(fā)現(xiàn),還能給人一種不明覺厲的感覺。

大劉用過幾次,感覺良好。因此沒事兒就要秀一下 SynchronousQueue,表示自己這么生僻的也懂,并發(fā)達人的名頭是沒有叫錯的。

也就那一次,恰恰被人拆了臺。

當時課上來了個新入職的技術(shù),此人長得中等身材,相貌平平,只是臉卻長的像種地多年的老農(nóng)的巴掌。臉上的疙瘩如同老農(nóng)巴掌上的老繭。這人姓張,這里由于他臉長得像個大巴掌,那就暫且叫他巴掌張。

這個巴掌張打斷了大劉的話,言之鑿鑿說大劉說的是錯的,說他看過這個 SynchronousQueue,并不是大劉說的這樣。

大劉有點心虛,脖子滲出了一圈汗,但是并發(fā)達人的稱呼大劉并不想丟掉。于是說了一大堆云里霧里的廢話,把話題帶偏了開去。并告訴巴掌張,下回要和他在這個舞臺上  PK 一二, 要好好看看誰是真正的 SynchronousQueue 的知心朋友。

由于大劉感覺被巴掌張的巴掌糊了臉,便就此下了決心要研究透 SynchronousQueue。

Google 和百度一起查,東西合璧,洋為中用,搞了好是一陣子。最后有個犄角旮旯的小破網(wǎng)站,有人說了這么一句話:

SynchronousQueue  的目的就是為了接頭,為了匹配,當接上頭了就雙方合作愉快,整個工作完成。但是一旦在接頭中,任何一方還沒到達,那么另一方就必須阻塞著等待。

這句話一下子就敲開了大劉的腦殼,讓聰明的智商重新占領(lǐng)了高地。

為啥這句話就點亮了大劉那本來已經(jīng)像燈泡的腦袋了呢?因為大劉想起了他每次的面試經(jīng)歷,就和這個接頭是一樣的。

大劉每次去面試,都很規(guī)矩的提前趕到新公司。但是大部分情況,時間到了之后都需要等很長時間才開始面試。大劉那時候也年輕,只是以為領(lǐng)導(dǎo)忙,所以倒也恭恭敬敬的等著。

直到大劉自己當了領(lǐng)導(dǎo),去面試別人的時候,被 HR  委婉的提醒了下,要讓候選人等一會兒再過去,顯的公司業(yè)務(wù)很忙,讓候選人對公司保持一定的敬畏。那時候,大劉才知道這是一種 PUA 術(shù)……

大劉對照著自己的面試經(jīng)歷,一下就理解了 SynchronousQueue 的概念。

SynchronousQueue 本身是為了交接、匹配而存在的。當一個線程往 SynchronousQueue  放東西,發(fā)現(xiàn)沒線程在等著拿,就給阻塞掉——這就像面試者來早了等面試官。

當一個線程去 SynchronousQueue 拿東西,發(fā)現(xiàn)沒東西,就去等的時候——就像面試官來早了等面試者。

搞懂 SynchronousQueue 的時候,正是一個冬天,屋外面的寒風(fēng)在虎虎生威,屋里面的大劉在熠熠生輝。

只是一個堂而皇之擺在 JDK 底層并發(fā)包中的隊列結(jié)構(gòu),SynchronousQueue 當然沒那么簡單,里面還存在著億點點細節(jié)。

所以,大劉在整體方向搞懂之后,開始研究起了細節(jié)。他要奮發(fā),狠狠把巴掌張的囂張氣焰壓下去,大劉要當公司技術(shù)的頭牌。

回到現(xiàn)實里,SynchronousQueue  真正的目的就是為了讓兩個線程的工作結(jié)果進行交接。這沒什么問題。但是,在這個交接中是需要嚴格保密的,沒有人可以窺視。

嗯,沒錯,就和你約了女朋友去鐘點房那樣的不能被窺視。

好,圍繞這個 SynchronousQueue 的鐘點房,咱們通過源代碼,來看這億點點細節(jié)。

首先,鐘點房嚴格保密,里面是多少人,就不能讓人知道。所以,就不能讓別人通過方法得到具體的數(shù)據(jù)。對于 SynchronousQueue 來說,自然就是通過  size() 你得不到什么信息。

/** * Always returns zero. * A {@code SynchronousQueue} has no internal capacity. * * @return zero */ public int size() {   return 0; }  /** * Always returns {@code true}. * A {@code SynchronousQueue} has no internal capacity. * * @return {@code true} */ public boolean isEmpty() {   return true; }

其次,鐘點房也不能隨便進去查房,看看都是誰。所以,自然就不能迭代。

/** * Returns an empty iterator in which {@code hasNext} always returns * {@code false}. * * @return an empty iterator */ public Iterator<E> iterator() {   return Collections.emptyIterator(); }

再次,鐘點房保護隱私,它也不能讓你鉆了漏子,不告訴你 XXX 是不是躲在了鐘點房里。所以,你也不能知道鐘點房里有沒有這個人。

/** * Always returns {@code false}. * A {@code SynchronousQueue} has no internal capacity. * * @param o the element * @return {@code false} */ public boolean contains(Object o) {   return false; }  /** * Returns {@code false} unless the given collection is empty. * A {@code SynchronousQueue} has no internal capacity. * * @param c the collection * @return {@code false} unless given collection is empty */ public boolean containsAll(Collection<?> c) {   return c.isEmpty(); }

自然,鐘點房也沒什么權(quán)力趕人出去。

/** * Always returns {@code false}. * A {@code SynchronousQueue} has no internal capacity. * * @param o the element to remove * @return {@code false} */ public boolean remove(Object o) {   return false; }

當然,作為一個商業(yè)化的鐘點房,SynchronousQueue 還是很注意安全的,它貼心的提供了緊急轉(zhuǎn)移的手段。

/** * @throws UnsupportedOperationException {@inheritDoc} * @throws ClassCastException            {@inheritDoc} * @throws NullPointerException          {@inheritDoc} * @throws IllegalArgumentException      {@inheritDoc} */ public int drainTo(Collection<? super E> c) {   if (c == null)     throw new NullPointerException();   if (c == this)     throw new IllegalArgumentException();      int n = 0;     for (E e; (e = poll()) != null;) {       c.add(e);       ++n;     }   return n; }  /**  * @throws UnsupportedOperationException {@inheritDoc} * @throws ClassCastException            {@inheritDoc} * @throws NullPointerException          {@inheritDoc} * @throws IllegalArgumentException      {@inheritDoc} */ public int drainTo(Collection<? super E> c, int maxElements) {   if (c == null)     throw new NullPointerException();   if (c == this)     throw new IllegalArgumentException();      int n = 0;     for (E e; n < maxElements && (e = poll()) != null;) {       c.add(e);       ++n;     }   return n; }

最后,鐘點房就只能搞搞交接工作了。交接嗎,自然是有交有接的,交的就得帶東西。

public void put(E e) throws InterruptedException {   if (e == null) throw new NullPointerException();   // put:帶著東西進屋子   if (transferer.transfer(e, false, 0) == null) {     Thread.interrupted();     throw new InterruptedException();   } }

接的肯定不會帶著東西,得留地方拿東西。

public E take() throws InterruptedException {   // take:從屋子里把東西拿出來   E e = transferer.transfer(null, false, 0);   if (e != null)     return e;   Thread.interrupted();   throw new InterruptedException(); }

但是呢,這交接工作啊,得在專人安排下進行。

為什么需要專人來幫忙?因為有時候我們的鐘點房太受歡迎了,客人多,得排隊管管。管這些排隊的就是 Transfer,它是鐘點房的經(jīng)理。

/** * The transferer. Set only in constructor, but cannot be declared * as final without further complicating serialization.  Since * this is accessed only at most once per public method, there * isn't a noticeable performance penalty for using volatile * instead of final here. */ private transient volatile Transferer<E> transferer;  /** * Shared internal API for dual stacks and queues. */ abstract static class Transferer<E> {   /**   * Performs a put or take.   *   * @param e if non-null, the item to be handed to a consumer;   * if null, requests that transfer return an item   * offered by producer.   * @param timed if this operation should timeout   * @param nanos the timeout, in nanoseconds   * @return if non-null, the item provided or received; if null,   * the operation failed due to timeout or interrupt --   * the caller can distinguish which of these occurred   * by checking Thread.interrupted.   */   abstract E transfer(E e, boolean timed, long nanos); }

Transfer 經(jīng)理每次開門營業(yè)的時候,會收到總部給的牌子,告訴他管理工作要注意方式方法,比如公平有效,比如優(yōu)先服務(wù) VIP 客人之類的。

/** * 默認給vip客人開點后門 */ public SynchronousQueue() {   this(false); }  /** * 總部遞牌子,告訴Transfer到底是公平還是不公平, */ public SynchronousQueue(boolean fair) {   transferer = fair ? new TransferQueue<E>() : new TransferStack<E>(); }

先看看適合勞苦大眾的公平模式,先來先享受,晚來沒折扣。

static final class TransferQueue<E> extends Transferer<E> {   static final class QNode{...}   transient volatile QNode head;       transient volatile QNode tail;   transient volatile QNode cleanMe;   TransferQueue() {  //經(jīng)典的鏈表套路,先搞個虛擬的頭結(jié)點     QNode h = new QNode(null, false);      head = h;     tail = h;   }   &hellip;&hellip;   &hellip;&hellip;

QNode 就是 Transfer 經(jīng)理需要的牌子,上面記錄點信息,別到時候弄錯了。

static final class QNode {   volatile QNode next; // 下一個排隊的哥們兒   volatile Object item; // 這次哥們帶來的要交接的東西   volatile Thread waiter; // 交接的線程   final boolean isData; // isData == true表示帶著東西    QNode(Object item, boolean isData) {     this.item = item;     this.isData = isData;   }     // ...省略一系列CAS方法 }

怎么搞,秘密都在 transfer() 里。

@SuppressWarnings("unchecked")   E transfer(E e, boolean timed, long nanos) {   //...先省略細節(jié)         }

transfer 本質(zhì)就是一直在等待交接完成或者交接被中斷,被取消,或者等待超時。

for (;;) {   QNode t = tail;   QNode h = head;  //因為初始化是在構(gòu)造函數(shù)里搞得,可能構(gòu)造函數(shù)沒有執(zhí)行完,就被用上了,就會出現(xiàn)t或者h為null的情況   if (t == null || h == null)              continue; //啥也不能做       //h==t表示沒人,t.isData == isData表示過來的哥們和前面的哥們目的一樣,那就只能考慮排隊等著了。   if (h == t || t.isData == isData) {      QNode tn = t.next;     //線程不安全需要考慮的,現(xiàn)在的尾巴不對,指錯了,重新確認下   if (t != tail)                         continue;          //隊尾確定了,發(fā)現(xiàn)又來了人,把尾巴指向新來的人     if (tn != null) {                    advanceTail(t, tn);       continue;     }        //超時了,別等了     if (timed && nanos <= 0)       return null;          //總算沒事兒了,哥們可以登記進屋了     if (s == null)       s = new QNode(e, isData);          //中間可能有人插隊,只能再等等     if (!t.casNext(null, s))               continue;        //準備進屋等著約的人     advanceTail(t, s);                   Object x = awaitFulfill(s, e, timed, nanos);        //同一個人出來,那就是任務(wù)失敗了     if (x == s) {       //清理下                          clean(t, s);       return null;     }          if (!s.isOffList()) { //還沒脫隊       advanceHead(t, s); //排前面單獨處理       if (x != null) //交接成功設(shè)一下標記         s.item = s;         s.waiter = null;     }          return (x != null) ? (E)x : e;

這段是不是看著很頭痛?其實 Transfer 這小子也頭痛。

它首先要面臨的第一個問題:資源競爭的問題。

客人源源不斷的來,由于 Transfer  強迫癥,他想每次必須從絕對的隊頭或者隊尾巴開始,所以,每次都要判斷下,到底他看到的隊頭或者隊尾,是不是真正的隊頭、隊尾。

確定沒問題了,新來的客人就開始被打造成真正的隊尾。

然后,成為隊尾的哥們就可以等著屬于自己的 Mr.Right 過來交接了。等著交接一直到成功或者失敗的方法就是 awaitFulfill(t,  tn)。

這邊有人在等待,同時另外一邊,交接的人們也開始陸續(xù)過來了。

else { // complementary-mode   QNode m = h.next; // node to fulfill   if (t != tail || m == null || h != head)     continue; // inconsistent read      Object x = m.item;     if (isData == (x != null) || // m already fulfilled       x == m || // m cancelled       !m.casItem(x, e)) { // 交接的核心語句         advanceHead(h, m); // dequeue and retry         continue;       }    advanceHead(h, m); // successfully fulfilled   LockSupport.unpark(m.waiter);   return (x != null) ? (E)x : e; }

交接最核心的其實就是 m.casItem(x, e)。交接成功,大家各回各家了。

整體的流程如下:

開始就是個經(jīng)典鏈表開局,head = tail

什么是SynchronousQueue

陸續(xù)開始有節(jié)點鏈接,put 的時候,isData = true;take 的時候,isData = false

什么是SynchronousQueue

可能會同時有很多的 put 操作,沒有對應(yīng)的 take 操作,他們就按照次序一個個鏈接起來,形成鏈表,并通過 awaitFulfill 方法等著對應(yīng)的  take

什么是SynchronousQueue

也可能同時會有很多的 take 操作,而沒有對應(yīng)的 put 操作,會形成鏈表,并通過 awaitFulfill 方法等著對應(yīng)的 put

什么是SynchronousQueue

take 操作會從鏈表頭開始找匹配的 put,然后通過 casItem 方法交接

什么是SynchronousQueue

put 操作會從鏈表頭開始找匹配的 take,然后通過 casItem 方法交接

什么是SynchronousQueue

所以,SynchronousQueue 你可以看到了,專門就是搞交接任務(wù)。

  • put 的哥們發(fā)現(xiàn)沒人 take,就等在那里,等著take操作。

  • take的哥們兒發(fā)現(xiàn)沒人put,也會等在那里,等著put操作。

這就是我們的 SynchronousQueue 鐘點房做的事情。

OK,鐘點房既然開門做生意,它也要賺錢的嘛。所以,它還得搞搞 VIP 客戶收費,也得為 VIP 客戶搞一些優(yōu)待。

對于這些 VIP 客人,我們的 Transfer  經(jīng)理會特意安排下,以棧的形式來安排客人,越后來的客人越大牌兒。所以,自然是后來的客人會優(yōu)先搞定交接了。這里簡短的介紹下,就不再贅述了。

Transfer 化身成 TransferStack,后來的優(yōu)先服務(wù)。

開始自然是鏈表開局,一個無意義的鏈表頭指向了 null

什么是SynchronousQueue

發(fā)現(xiàn)鏈表是空了,二話不說,客官,您進來先啦

什么是SynchronousQueue

和 TransferQueue 一樣,如果都是 take 過來,模式就是 REQUEST,就得排隊了

什么是SynchronousQueue

交接人出現(xiàn),哥們可以收攤兒了

什么是SynchronousQueue

其余的不說了,一樣的,說多了沒勁

話說,大劉搞清楚了這些細節(jié)之后,次日,當巴掌張再次進行挑釁時,大劉徹底穩(wěn)下來了。

當挨個把細節(jié)講的一清二楚之后,看著巴掌張那張落寞的巴掌臉,瞬間也不覺得像巴掌了,而是像是在猜拳中出的石頭剪刀布中的布。大劉沒忍住,對著這個布比劃出了個剪刀,光榮的結(jié)束了戰(zhàn)斗。

大劉依然在技術(shù)流中獨占鰲頭。

到此,關(guān)于“什么是SynchronousQueue”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注億速云網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI