溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點(diǎn)擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

Java中Condition類的示例分析

發(fā)布時間:2021-09-01 13:37:21 來源:億速云 閱讀:112 作者:小新 欄目:開發(fā)技術(shù)

這篇文章主要介紹了Java中Condition類的示例分析,具有一定借鑒價值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。

一 condition 介紹及demo

 Condition是在java 1.5中才出現(xiàn)的,它用來替代傳統(tǒng)的Object的wait()、notify()實(shí)現(xiàn)線程間的協(xié)作,相比使用Object的wait()、notify(),使用Condition的await()、signal()這種方式實(shí)現(xiàn)線程間協(xié)作更加安全和高效。因此通常來說比較推薦使用Condition,阻塞隊列實(shí)際上是使用了Condition來模擬線程間協(xié)作。

Condition是個接口,基本的方法就是await()和signal()方法;
Condition依賴于Lock接口,生成一個Condition的基本代碼是lock.newCondition()
調(diào)用Condition的await()和signal()方法,都必須在lock保護(hù)之內(nèi),就是說必須在lock.lock()和lock.unlock之間才可以使用
Conditon中的await()對應(yīng)Object的wait();

Condition中的signal()對應(yīng)Object的notify();

Condition中的signalAll()對應(yīng)Object的notifyAll()。

Java中Condition類的示例分析

condition常見例子arrayblockingqueue。下面是demo:

package thread;
 
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * 
 * @author zhangliang
 *
 * 2016年4月8日 下午5:48:54
 */
public class ConTest {
	
	 final Lock lock = new ReentrantLock();
	 final Condition condition = lock.newCondition();
 
	public static void main(String[] args) {
		// TODO Auto-generated method stub
		ConTest test = new ConTest();
	    Producer producer = test.new Producer();
	    Consumer consumer = test.new Consumer();
	          
	    
	    consumer.start(); 
	    producer.start();
	}
	
	 class Consumer extends Thread{
         
	        @Override
	        public void run() {
	            consume();
	        }
	          
	        private void consume() {
	           	             
	                try {
	                	   lock.lock();
	                    System.out.println("我在等一個新信號"+this.currentThread().getName());
	                    condition.await();
	                    
	                } catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					} finally{
						System.out.println("拿到一個信號"+this.currentThread().getName());
	                    lock.unlock();
	                }
	            
	        }
	    }
	 
	 class Producer extends Thread{
         
	        @Override
	        public void run() {
	            produce();
	        }
	          
	        private void produce() {	             
	                try {
	                	   lock.lock();
	                       System.out.println("我拿到鎖"+this.currentThread().getName());
	                        condition.signalAll();	                         
	                    System.out.println("我發(fā)出了一個信號:"+this.currentThread().getName());
	                } finally{
	                    lock.unlock();
	                }
	            }
	 }
	    
}

運(yùn)行結(jié)果:

Java中Condition類的示例分析

Condition的執(zhí)行方式,是當(dāng)在線程Consumer中調(diào)用await方法后,線程Consumer將釋放鎖,并且將自己沉睡,等待喚醒,線程Producer獲取到鎖后,開始做事,完畢后,調(diào)用Condition的signalall方法,喚醒線程Consumer,線程Consumer恢復(fù)執(zhí)行。

以上說明Condition是一個多線程間協(xié)調(diào)通信的工具類,使得某個,或者某些線程一起等待某個條件(Condition),只有當(dāng)該條件具備( signal 或者 signalAll方法被帶調(diào)用)時 ,這些等待線程才會被喚醒,從而重新爭奪鎖。

Condition實(shí)現(xiàn)生產(chǎn)者、消費(fèi)者模式:

package thread;
 
import java.util.PriorityQueue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
 
public class ConTest2 {
	    private int queueSize = 10;
	    private PriorityQueue<Integer> queue = new PriorityQueue<Integer>(queueSize);
	    private Lock lock = new ReentrantLock();
	    private Condition notFull = lock.newCondition();
	    private Condition notEmpty = lock.newCondition();
	     
	    public static void main(String[] args) throws InterruptedException  {
	    	ConTest2 test = new ConTest2();
	        Producer producer = test.new Producer();
	        Consumer consumer = test.new Consumer();	          
	        producer.start();
	        consumer.start();
	        Thread.sleep(0);
	        producer.interrupt();
	        consumer.interrupt();
	    }
	      
	    class Consumer extends Thread{	          
	        @Override
	        public void run() {
	            consume();
	        }
	        volatile boolean flag=true;  
	        private void consume() {
	            while(flag){
	                lock.lock();
	                try {
	                    while(queue.isEmpty()){
	                        try {
	                            System.out.println("隊列空,等待數(shù)據(jù)");
	                            notEmpty.await();
	                        } catch (InterruptedException e) {	                          
	                            flag =false;
	                        }
	                    }
	                    queue.poll();                //每次移走隊首元素
	                    notFull.signal();
	                    System.out.println("從隊列取走一個元素,隊列剩余"+queue.size()+"個元素");
	                } finally{
	                    lock.unlock();
	                }
	            }
	        }
	    }
	      
	    class Producer extends Thread{	          
	        @Override
	        public void run() {
	            produce();
	        }
	        volatile boolean flag=true;  
	        private void produce() {
	            while(flag){
	                lock.lock();
	                try {
	                    while(queue.size() == queueSize){
	                        try {
	                            System.out.println("隊列滿,等待有空余空間");
	                            notFull.await();
	                        } catch (InterruptedException e) {
	                            
	                            flag =false;
	                        }
	                    }
	                    queue.offer(1);        //每次插入一個元素
	                    notEmpty.signal();
	                    System.out.println("向隊列取中插入一個元素,隊列剩余空間:"+(queueSize-queue.size()));
	                } finally{
	                    lock.unlock();
	                }
	            }
	        }
	    }
	}

運(yùn)行結(jié)果:

Java中Condition類的示例分析

二 Condition接口

condition可以通俗的理解為條件隊列。當(dāng)一個線程在調(diào)用了await方法以后,直到線程等待的某個條件為真的時候才會被喚醒。這種方式為線程提供了更加簡單的等待/通知模式。Condition必須要配合鎖一起使用,因?yàn)閷蚕頎顟B(tài)變量的訪問發(fā)生在多線程環(huán)境下。一個Condition的實(shí)例必須與一個Lock綁定,因此Condition一般都是作為Lock的內(nèi)部實(shí)現(xiàn)。

await() :造成當(dāng)前線程在接到信號或被中斷之前一直處于等待狀態(tài)。
await(long time, TimeUnit unit) :造成當(dāng)前線程在接到信號、被中斷或到達(dá)指定等待時間之前一直處于等待狀態(tài)
awaitNanos(long nanosTimeout) :造成當(dāng)前線程在接到信號、被中斷或到達(dá)指定等待時間之前一直處于等待狀態(tài)。返回值表示剩余時間,如果在nanosTimesout之前喚醒,那么返回值 = nanosTimeout - 消耗時間,如果返回值 <= 0 ,則可以認(rèn)定它已經(jīng)超時了。

awaitUninterruptibly() :造成當(dāng)前線程在接到信號之前一直處于等待狀態(tài)?!咀⒁猓涸摲椒▽χ袛嗖幻舾小俊?br/>awaitUntil(Date deadline) :造成當(dāng)前線程在接到信號、被中斷或到達(dá)指定最后期限之前一直處于等待狀態(tài)。如果沒有到指定時間就被通知,則返回true,否則表示到了指定時間,返回返回false。
signal() :喚醒一個等待線程。該線程從等待方法返回前必須獲得與Condition相關(guān)的鎖。
signal()All :喚醒所有等待線程。能夠從等待方法返回的線程必須獲得與Condition相關(guān)的鎖。

三 condition實(shí)現(xiàn)分析:

Java中Condition類的示例分析

Condition接口包含了多種await方式和兩個通知方法
ConditionObject實(shí)現(xiàn)了Condition接口,是AbstractQueuedSynchronizer的內(nèi)部類(因?yàn)镃ondition的操作都需要獲取想關(guān)聯(lián)的鎖)
Reentrantlock的newCondition方法返回與某個lock實(shí)例相關(guān)的Condition對象

public abstract class AbstractQueuedLongSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {

結(jié)合上面的類圖,我們看到condition實(shí)現(xiàn)是依賴于aqs,而aqs是個抽象類。里面定義了同步器的基本框架,實(shí)現(xiàn)了基本的結(jié)構(gòu)功能。只留有狀態(tài)條件的維護(hù)由具體同步器根據(jù)具體場景來定制,如常見的 ReentrantLock 、 RetrantReadWriteLock和CountDownLatch 等等,

3.1 等待隊列

Condition是AQS的內(nèi)部類。每個Condition對象都包含一個隊列(等待隊列)。等待隊列是一個FIFO的隊列,在隊列中的每個節(jié)點(diǎn)都包含了一個線程引用,該線程就是在Condition對象上等待的線程,如果一個線程調(diào)用了Condition.await()方法,那么該線程將會釋放鎖、構(gòu)造成節(jié)點(diǎn)加入等待隊列并進(jìn)入等待狀態(tài)。AQS有一個同步隊列和多個等待隊列,節(jié)點(diǎn)都是Node。等待隊列的基本結(jié)構(gòu)如下所示。

Java中Condition類的示例分析

等待分為首節(jié)點(diǎn)和尾節(jié)點(diǎn)。當(dāng)一個線程調(diào)用Condition.await()方法,將會以當(dāng)前線程構(gòu)造節(jié)點(diǎn),并將節(jié)點(diǎn)從尾部加入等待隊列。新增節(jié)點(diǎn)就是將尾部節(jié)點(diǎn)指向新增的節(jié)點(diǎn)。節(jié)點(diǎn)引用更新本來就是在獲取鎖以后的操作,所以不需要CAS保證。同時也是線程安全的操作。

public class ConditionObject implements Condition, java.io.Serializable {
    private static final long serialVersionUID = 1173984872572414699L;
    /** First node of condition queue. */
    private transient Node firstWaiter;
    /** Last node of condition queue. */
    private transient Node lastWaiter;

3.2 等待

當(dāng)線程調(diào)用了Condition的await()方法以后。線程就作為隊列中的一個節(jié)點(diǎn)被加入到等待隊列中去了。同時會釋放鎖的擁有。當(dāng)從await方法返回的時候。當(dāng)前線程一定會獲取condition相關(guān)聯(lián)的鎖。

如果從隊列(同步隊列和等待隊列)的角度去看await()方法,當(dāng)調(diào)用await()方法時,相當(dāng)于同步隊列的首節(jié)點(diǎn)(獲取鎖的節(jié)點(diǎn))移動到Condition的等待隊列中。

調(diào)用該方法的線程成功的獲取鎖的線程,也就是同步隊列的首節(jié)點(diǎn),該方法會將當(dāng)前線程構(gòu)造成節(jié)點(diǎn)并加入到等待隊列中,然后釋放同步狀態(tài),喚醒同步隊列中的后繼節(jié)點(diǎn),然后當(dāng)前線程會進(jìn)入等待狀態(tài)。

當(dāng)?shù)却犃兄械墓?jié)點(diǎn)被喚醒的時候,則喚醒節(jié)點(diǎn)的線程開始嘗試獲取同步狀態(tài)。如果不是通過 其他線程調(diào)用Condition.signal()方法喚醒,而是對等待線程進(jìn)行中斷,則會拋出InterruptedException異常信息。

Java中Condition類的示例分析

我們看一下這個await的方法,它是AQS的方法,

public final void await() throws InterruptedException {
	if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter(); //將當(dāng)前線程包裝下后,
//添加到Condition自己維護(hù)的一個鏈表中。
int savedState = fullyRelease(node);//釋放當(dāng)前線程占有的鎖,從demo中看到,
//調(diào)用await前,當(dāng)前線程是占有鎖的

int interruptMode = 0;
	 while (!isOnSyncQueue(node)) {//釋放完畢后,遍歷AQS的隊列,看當(dāng)前節(jié)點(diǎn)是否在隊列中,
//不在 說明它還沒有競爭鎖的資格,所以繼續(xù)將自己沉睡。
//直到它被加入到隊列中,聰明的你可能猜到了,
//沒有錯,在singal的時候加入不就可以了?
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//被喚醒后,重新開始正式競爭鎖,同樣,如果競爭不到還是會將自己沉睡,等待喚醒重新開始競爭。
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

結(jié)合代碼去看,同步隊列的首節(jié)點(diǎn) 并不會直接加入等待隊列,而是通過addConditionWaiter把當(dāng)前線程構(gòu)造成一個新節(jié)點(diǎn)并加入到等待隊列中。

/**
         * Adds a new waiter to wait queue.
         * @return its new wait node
         */
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

3.3 通知

調(diào)用Condition的signal()方法,將會喚醒在等待隊列中等待最長時間的節(jié)點(diǎn)(條件隊列里的首節(jié)點(diǎn)),在喚醒節(jié)點(diǎn)前,會將節(jié)點(diǎn)移到同步隊列中。當(dāng)前線程加入到等待隊列中如圖所示:

Java中Condition類的示例分析

回到上面的demo,鎖被釋放后,線程Consumer開始沉睡,這個時候線程因?yàn)榫€程Consumer沉睡時,會喚醒AQS隊列中的頭結(jié)點(diǎn),所所以線程Producer會開始競爭鎖,并獲取到,執(zhí)行完后線程Producer會調(diào)用signal方法,“發(fā)出”signal信號,signal方法如下:

public final void signal() {
	 if (!isHeldExclusively())
	 throw new IllegalMonitorStateException();
	 Node first = firstWaiter; //firstWaiter為condition自己維護(hù)的一個鏈表的頭結(jié)點(diǎn),
	                          //取出第一個節(jié)點(diǎn)后開始喚醒操作
	 if (first != null)
	 doSignal(first);
	 }

在調(diào)用signal()方法之前必須先判斷是否獲取到了鎖(isHeldExclusively方法)。接著獲取等待隊列的首節(jié)點(diǎn),將其移動到同步隊列并且利用LockSupport喚醒節(jié)點(diǎn)中的線程。
被喚醒的線程將從await方法中的while循環(huán)中退出( while (!isOnSyncQueue(node)) { 方法返回true,節(jié)點(diǎn)已經(jīng)在同步隊列中)。隨后調(diào)用同步器的acquireQueued()方法加入到同步狀態(tài)的競爭當(dāng)中去。成功獲取到競爭的線程從先前調(diào)用await方法返回,此時該線程已經(jīng)成功獲取了鎖。

AQS的同步隊列與Condition的等待隊列,兩個隊列的作用是不同,事實(shí)上,每個線程也僅僅會同時存在以上兩個隊列中的一個,流程是這樣的:

Java中Condition類的示例分析

注意:

1.線程producer調(diào)用signal方法,這個時候Condition的等待隊列中只有線程Consumer一個節(jié)點(diǎn),于是它被取出來,并被加入到AQS的等待隊列中。 注意,這個時候,線程Consumer 并沒有被喚醒。

2.Sync是AQS的抽象子類,實(shí)現(xiàn)可重入和互斥的大部分功能。在Sync的子類中有FairSync和NonfairSync兩種代表公平鎖策略和非公平鎖策略。Sync lock方法留給子類去實(shí)現(xiàn),NonfairSync的實(shí)現(xiàn):

 final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

其中如果一開始獲取鎖成功,是直接設(shè)置當(dāng)前線程。
否則執(zhí)行acquire(1),也就是進(jìn)入aqs等待隊列。這里不展開細(xì)節(jié)。

可以這樣理解,整個協(xié)作過程是靠結(jié)點(diǎn)在AQS的等待隊列和Condition的等待隊列中來回移動實(shí)現(xiàn)的,每個隊列的意義不同,Condition作為一個條件類,很好的自己維護(hù)了一個等待信號的隊列,并在適時的時候?qū)⒔Y(jié)點(diǎn)加入到AQS的等待隊列中來實(shí)現(xiàn)的喚醒操作

感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“Java中Condition類的示例分析”這篇文章對大家有幫助,同時也希望大家多多支持億速云,關(guān)注億速云行業(yè)資訊頻道,更多相關(guān)知識等著你來學(xué)習(xí)!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI