溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

java并發(fā)編程之同步器代碼示例

發(fā)布時(shí)間:2020-09-08 16:53:41 來源:腳本之家 閱讀:198 作者:Blessing_H 欄目:編程語(yǔ)言

同步器是一些使線程能夠等待另一個(gè)線程的對(duì)象,允許它們協(xié)調(diào)動(dòng)作。最常用的同步器是CountDownLatch和Semaphore,不常用的是Barrier和Exchanger

隊(duì)列同步器AbstractQueuedSynchronizer是用來構(gòu)建鎖或者其他同步組件的基礎(chǔ)框架,它內(nèi)部使用了一個(gè)volatiole修飾的int類型的成員變量state來表示同步狀態(tài),通過內(nèi)置的FIFO隊(duì)列來完成資源獲取線程的排隊(duì)工作。

同步器的主要使用方式是繼承,子類通過繼承同步器并實(shí)現(xiàn)它的抽象方法來管理同步狀態(tài),在抽象方法的實(shí)現(xiàn)過程中免不了要對(duì)同步狀態(tài)進(jìn)行修改,這時(shí)就需要使用同步器來提供的3個(gè)方法(getState()、setState(intnewState)/和compareAndSetState(intexpect,intupdate))來進(jìn)行操作,因?yàn)樗麄兡軌虮WC狀態(tài)的改變是安全的。子類推薦被定義為自定義同步組件的靜態(tài)內(nèi)部類,同步器自身沒有實(shí)現(xiàn)任何同步接口,它僅僅是定義了若干同步狀態(tài)獲取個(gè)釋放的方法來供自定義同步組件使用,同步器既可以獨(dú)占式的獲取同步狀態(tài),也可以支持共享式的獲取同步狀態(tài),這樣就可以方便實(shí)現(xiàn)不同類型的同步組件(ReentrantLock、ReadWriteLock、和CountDownLatch等)。

同步器是實(shí)現(xiàn)鎖的關(guān)鍵,在鎖的實(shí)現(xiàn)中聚合同步器,利用同步器實(shí)現(xiàn)鎖的語(yǔ)義。他們二者直接的關(guān)系就是:鎖是面向使用者的,它定義了使用者與鎖交互的接口,隱藏了實(shí)現(xiàn)的細(xì)節(jié);同步器則是面向鎖的實(shí)現(xiàn)者,它簡(jiǎn)化了鎖的實(shí)現(xiàn)方式,屏蔽了同步狀態(tài)管理、線程的排隊(duì)、等待與喚醒等底層操作。鎖和同步器很好的隔離了使用者與實(shí)現(xiàn)者所需關(guān)注的領(lǐng)域。

同步器的設(shè)計(jì)是基于模版方法模式實(shí)現(xiàn)的,使用者需要繼承同步器并重寫這頂?shù)姆椒?,隨后將同步器組合在自定義同步組件的實(shí)現(xiàn)中,并調(diào)用同步器提供的模版方法,而這些模版方法將會(huì)調(diào)用使用者重寫的方法。

同步器提供的模版方法基本上分為3類:獨(dú)占式獲取鎖與釋放同步狀態(tài)、共享式獲取與釋放同步狀態(tài)和查詢同步隊(duì)列中的等待線程情況。自定義同步組件將使用同步器提供的模版方法來實(shí)現(xiàn)自己的同步語(yǔ)義。倒計(jì)數(shù)器鎖存器是一次性障礙,允許一個(gè)或者多個(gè)線程等待一個(gè)或者多個(gè)其它線程來做某些事情。CountDownLatch的唯一構(gòu)造器帶一個(gè)int類型的參數(shù),這個(gè)int參數(shù)是指允許所有在等待線程被處理之前,必須在鎖存器上調(diào)用countDown方法的次數(shù)。

EG:

package hb.java.thread;
import java.util.concurrent.CountDownLatch;
/**
 * 
 * @author hb
 *     CountDownLatch最重要的方法是countDown()和await(),前者主要是倒數(shù)一次,后者是等待倒數(shù)到0,如果沒有到達(dá)0
 *     ,就只有阻塞等待了。 *JAVA同步器之
 *     CountDownLatch(不能循環(huán)使用,如果需要循環(huán)使用可以考慮使用CyclicBarrier) 兩種比較常規(guī)用法: 1:new
 *     CountDownLatch(1);所有的線程在開始工作前需要做一些準(zhǔn)備工作,當(dāng)所有的線程都準(zhǔn)備到位后再統(tǒng)一執(zhí)行時(shí)有用 2:new
 *     CountDownLatch(THREAD_COUNT);當(dāng)所有的線程都執(zhí)行完畢后,等待這些線程的其他線程才開始繼續(xù)執(zhí)行時(shí)有用
 */
public class CountDownLatchTest {
	private static final int THREAD_COUNT = 10;
	// 在調(diào)用startSingal.countDown()之前調(diào)用了startSingal.await()的線程一律等待,直到startSingal.countDown()的調(diào)用
	private static final CountDownLatch startSingal = new CountDownLatch(1);
	// 在finishedSingal的初始化記數(shù)量通過調(diào)用finishedSingal.countDown()減少為0時(shí)調(diào)用了finishedSingal.await()的線程一直阻塞
	private static final CountDownLatch finishedSingal = new CountDownLatch(
				THREAD_COUNT);
	public static void main(String[] args) throws InterruptedException {
		for (int i = 0; i < THREAD_COUNT; i++) {
			new Thread("Task " + i) {
				public void run() {
					System.out.println(Thread.currentThread().getName()
												+ " prepared!!");
					try {
						startSingal.await();
					}
					catch (InterruptedException e) {
						e.printStackTrace();
					}
					System.out.println(Thread.currentThread().getName()
												+ " finished!!");
					finishedSingal.countDown();
				}
				;
			}
			.start();
		}
		Thread.sleep(1000);
		startSingal.countDown();
		// 所有的線程被喚醒,同時(shí)開始工作.countDown 方法的線程等到計(jì)數(shù)到達(dá)零時(shí)才繼續(xù)
		finishedSingal.await();
		// 等待所有的線程完成!!
		System.out.println("All task are finished!!");
	}
}
package hb.java.thread;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/** 
 * 
 * JAVA同步器之Barrier(能夠循環(huán)使用,當(dāng)計(jì)數(shù)器增加到Barrier的初始化計(jì)數(shù)器之后馬上會(huì)被置為0為下一次循環(huán)使用做準(zhǔn)備) 
 * Barrier能夠?yàn)橹付ǖ囊粋€(gè)或多個(gè)(一般為多個(gè))線程設(shè)置一道屏障,只有當(dāng)所有的線程都到達(dá)該屏障后才能一起沖過該屏障繼續(xù)其他任務(wù) 一般可以new 
 * CyclicBarrier(ThreadCount)來進(jìn)行初始化,也可以new 
 * CyclicBarrier(ThreadCount,RunableAction)當(dāng)初始化數(shù)量的線程都調(diào)用 
 * 了await()方法后觸發(fā)RunableAction線程,也可以通過初始化一個(gè)new 
 * CyclicBarrier(ThreadCount+1)的Barrier在前置線程未執(zhí)行完成時(shí)一直阻塞一個(gè)或多個(gè) 
 * 后續(xù)線程,這一點(diǎn)類似于CountDownLatch 
 */
public class BarrierTest {
	private static final int THREAD_COUNT = 10;
	private static final CyclicBarrier barrier = new CyclicBarrier( 
	      THREAD_COUNT + 1, new Runnable() {
		@Override 
		        public void run() {
			System.out.println("All task are prepared or finished!!");
		}
	}
	);
	public static void main(String[] args) throws InterruptedException, 
	      BrokenBarrierException {
		for (int i = 0; i < THREAD_COUNT; i++) {
			new Thread("Task " + i) {
				public void run() {
					try {
						System.out.println(Thread.currentThread().getName() 
						                + " prepared!!");
						barrier.await();
					}
					catch (InterruptedException e) {
						// TODO Auto-generated catch block 
						e.printStackTrace();
					}
					catch (BrokenBarrierException e) {
						// TODO Auto-generated catch block 
						e.printStackTrace();
					}
					// do something 
					System.out.println(Thread.currentThread().getName() 
					              + " finished!!");
				}
				;
			}
			.start();
		}
		barrier.await();
		// --------------開始準(zhǔn)備循環(huán)使用-------------- 
		for (int i = 0; i < THREAD_COUNT; i++) {
			new Thread("Task " + i) {
				public void run() {
					// do something 
					System.out.println(Thread.currentThread().getName() 
					              + " finished!!");
					try {
						barrier.await();
					}
					catch (InterruptedException e) {
						// TODO Auto-generated catch block 
						e.printStackTrace();
					}
					catch (BrokenBarrierException e) {
						// TODO Auto-generated catch block 
						e.printStackTrace();
					}
				}
				;
			}
			.start();
		}
		barrier.await();
	}
}
package hb.java.thread;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;
public class ExchangerTest {
	final static Exchanger<List<String>> exchanger = new Exchanger<List<String>>();
	public static void main(String[] args) {
		new Producer("Producer", exchanger).start();
		new Consumer("Consumer", exchanger).start();
	}
	static class Producer extends Thread {
		private Exchanger<List<String>> exchanger;
		/** 
     *  
     */
		public Producer(String threadName, Exchanger<List<String>> exchanger) {
			super(threadName);
			this.exchanger = exchanger;
		}
		/* 
     * (non-Javadoc) 
     * 
     * @see java.lang.Thread#run() 
     */
		@Override 
		    public void run() {
			List<String> products = new ArrayList<String>();
			for (int i = 0; i < 10; i++) {
				products.add("product " + i);
			}
			try {
				List<String> results = exchanger.exchange(products);
				System.out.println("get results from consumer");
				for (String s : results) {
					System.out.println(s);
				}
			}
			catch (InterruptedException e) {
				// TODO Auto-generated catch block 
				e.printStackTrace();
			}
		}
	}
	static class Consumer extends Thread {
		private Exchanger<List<String>> exchanger;
		/** 
     *  
     */
		public Consumer(String threadName, Exchanger<List<String>> exchanger) {
			super(threadName);
			this.exchanger = exchanger;
		}
		/* 
     * (non-Javadoc) 
     * 
     * @see java.lang.Thread#run() 
     */
		@Override 
		    public void run() {
			List<String> products = new ArrayList<String>();
			for (int i = 0; i < 10; i++) {
				products.add("consumed " + i);
			}
			try {
				List<String> results = exchanger.exchange(products);
				System.out.println("got products from produces");
				for (String s : results) {
					System.out.println(s);
				}
			}
			catch (InterruptedException e) {
				// TODO Auto-generated catch block 
				e.printStackTrace();
			}
		}
	}
}

總結(jié)

以上就是本文關(guān)于java并發(fā)編程之同步器代碼示例的全部?jī)?nèi)容,希望對(duì)大家有所幫助。感興趣的朋友可以繼續(xù)參閱本站:

深入分析java并發(fā)編程中volatile的實(shí)現(xiàn)原理

Javaweb應(yīng)用使用限流處理大量的并發(fā)請(qǐng)求詳解

java并發(fā)學(xué)習(xí)之BlockingQueue實(shí)現(xiàn)生產(chǎn)者消費(fèi)者詳解

如有不足之處,歡迎留言指出。

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI