溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

多線程并發(fā)編程如何在Java項(xiàng)目中實(shí)現(xiàn)

發(fā)布時(shí)間:2020-11-20 15:46:52 來(lái)源:億速云 閱讀:301 作者:Leah 欄目:編程語(yǔ)言

本篇文章為大家展示了多線程并發(fā)編程如何在Java項(xiàng)目中實(shí)現(xiàn) ,內(nèi)容簡(jiǎn)明扼要并且容易理解,絕對(duì)能使你眼前一亮,通過(guò)這篇文章的詳細(xì)介紹希望你能有所收獲。

一、多線程

1、操作系統(tǒng)有兩個(gè)容易混淆的概念,進(jìn)程和線程。

進(jìn)程:一個(gè)計(jì)算機(jī)程序的運(yùn)行實(shí)例,包含了需要執(zhí)行的指令;有自己的獨(dú)立地址空間,包含程序內(nèi)容和數(shù)據(jù);不同進(jìn)程的地址空間是互相隔離的;進(jìn)程擁有各種資源和狀態(tài)信息,包括打開的文件、子進(jìn)程和信號(hào)處理。

線程:表示程序的執(zhí)行流程,是CPU調(diào)度執(zhí)行的基本單位;線程有自己的程序計(jì)數(shù)器、寄存器、堆棧和幀。同一進(jìn)程中的線程共用相同的地址空間,同時(shí)共享進(jìn)進(jìn)程鎖擁有的內(nèi)存和其他資源。

2、Java標(biāo)準(zhǔn)庫(kù)提供了進(jìn)程和線程相關(guān)的API,進(jìn)程主要包括表示進(jìn)程的java.lang.Process類和創(chuàng)建進(jìn)程的java.lang.ProcessBuilder類;

表示線程的是java.lang.Thread類,在虛擬機(jī)啟動(dòng)之后,通常只有Java類的main方法這個(gè)普通線程運(yùn)行,運(yùn)行時(shí)可以創(chuàng)建和啟動(dòng)新的線程;還有一類守護(hù)線程(damon thread),守護(hù)線程在后臺(tái)運(yùn)行,提供程序運(yùn)行時(shí)所需的服務(wù)。當(dāng)虛擬機(jī)中運(yùn)行的所有線程都是守護(hù)線程時(shí),虛擬機(jī)終止運(yùn)行。

3、線程間的可見性:一個(gè)線程對(duì)進(jìn)程中共享的數(shù)據(jù)的修改,是否對(duì)另一個(gè)線程可見

可見性問(wèn)題:

a、CPU采用時(shí)間片輪轉(zhuǎn)等不同算法來(lái)對(duì)線程進(jìn)行調(diào)度

public class IdGenerator{ 
 private int value = 0; 
 public int getNext(){ 
 return value++; 
 } 
} 

 對(duì)于IdGenerator的getNext()方法,在多線程下不能保證返回值是不重復(fù)的:各個(gè)線程之間相互競(jìng)爭(zhēng)CPU時(shí)間來(lái)獲取運(yùn)行機(jī)會(huì),CPU切換可能發(fā)生在執(zhí)行間隙。

以上代碼getNext()的指令序列:CPU切換可能發(fā)生在7條指令之間,多個(gè)getNext的指令交織在一起。

aload_0 
dup 
getfield #12 
dup_x1 
iconst_1 
iadd 
putfield #12 

b、CPU緩存:

目前CPU一般采用層次結(jié)構(gòu)的多級(jí)緩存的架構(gòu),有的CPU提供了L1、L2和L3三級(jí)緩存。當(dāng)CPU需要讀取主存中某個(gè)位置的數(shù)據(jù)時(shí),會(huì)一次檢查各級(jí)緩存中是否存在對(duì)應(yīng)的數(shù)據(jù)。如果有,直接從緩存中讀取,這比從主存中讀取速度快很多。當(dāng)CPU需要寫入時(shí),數(shù)據(jù)先被寫入緩存中,之后再某個(gè)時(shí)間點(diǎn)寫回主存。所以某些時(shí)間點(diǎn)上,緩存中的數(shù)據(jù)與主存中的數(shù)據(jù)可能是不一致。

c、指令順序重排

出行性能考慮,編譯器在編譯時(shí)可能會(huì)對(duì)字節(jié)代碼的指令順序進(jìn)行重新排列,以優(yōu)化指令的執(zhí)行順序,在單線程中不會(huì)有問(wèn)題,但在多線程可能產(chǎn)生與可見性相關(guān)的問(wèn)題。

二、Java內(nèi)存模型(Java Memory Model)

屏蔽了CPU緩存等細(xì)節(jié),只關(guān)注主存中的共享變量;關(guān)注對(duì)象的實(shí)例域、靜態(tài)域和數(shù)組元素;關(guān)注線程間的動(dòng)作。

1、volatile關(guān)鍵詞:用來(lái)對(duì)共享變量的訪問(wèn)進(jìn)行同步,上一次寫入操作的結(jié)果對(duì)下一次讀取操作是肯定可見的。(在寫入volatile變量值之后,CPU緩存中的內(nèi)容會(huì)被寫回內(nèi)存;在讀取volatile變量時(shí),CPU緩存中的對(duì)應(yīng)內(nèi)容會(huì)被置為失效,重新從主存中進(jìn)行讀取),volatile不使用鎖,性能優(yōu)于synchronized關(guān)鍵詞。

用來(lái)確保對(duì)一個(gè)變量的修改被正確地傳播到其他線程中。

例子:A線程是Worker,一直跑循環(huán),B線程調(diào)用setDone(true),A線程即停止任務(wù)

public class Worker{ 
 private volatile boolean done; 
 public void setDone(boolean done){ 
 this.done = done; 
 } 
 public void work(){ 
 while(!done){ 
  //執(zhí)行任務(wù); 
 } 
 } 
} 

例子:錯(cuò)誤使用。因?yàn)闆](méi)有鎖的支持,volatile的修改不能依賴于當(dāng)前值,當(dāng)前值可能在其他線程中被修改。(Worker是直接賦新值與當(dāng)前值無(wú)關(guān))

public class Counter { 
 public volatile static int count = 0; 
 public static void inc() { 
 //這里延遲1毫秒,使得結(jié)果明顯 
 try { 
  Thread.sleep(1); 
 } catch (InterruptedException e) { 
 } 
 count++; 
 } 
 public static void main(String[] args) { 
 //同時(shí)啟動(dòng)1000個(gè)線程,去進(jìn)行i++計(jì)算,看看實(shí)際結(jié)果 
 for (int i = 0; i < 1000; i++) { 
  new Thread(new Runnable() { 
  @Override 
  public void run() { 
   Counter.inc(); 
  } 
  }).start(); 
 } 
 //這里每次運(yùn)行的值都有可能不同,可能不為1000 
 System.out.println("運(yùn)行結(jié)果:Counter.count=" + Counter.count); 
 } 
} 

2、final關(guān)鍵詞

final關(guān)鍵詞聲明的域的值只能被初始化一次,一般在構(gòu)造方法中初始化。。(在多線程開發(fā)中,final域通常用來(lái)實(shí)現(xiàn)不可變對(duì)象)

當(dāng)對(duì)象中的共享變量的值不可能發(fā)生變化時(shí),在多線程中也就不需要同步機(jī)制來(lái)進(jìn)行處理,故在多線程開發(fā)中應(yīng)盡可能使用不可變對(duì)象。

另外,在代碼執(zhí)行時(shí),final域的值可以被保存在寄存器中,而不用從主存中頻繁重新讀取。

3、java基本類型的原子操作

1)基本類型,引用類型的復(fù)制引用是原子操作;(即一條指令完成)

2)long與double的賦值,引用是可以分割的,非原子操作;

3)要在線程間共享long或double的字段時(shí),必須在synchronized中操作,或是聲明成volatile

三、Java提供的線程同步方式

1、synchronized關(guān)鍵字

方法或代碼塊的互斥性來(lái)完成實(shí)際上的一個(gè)原子操作。(方法或代碼塊在被一個(gè)線程調(diào)用時(shí),其他線程處于等待狀態(tài))

所有的Java對(duì)象都有一個(gè)與synchronzied關(guān)聯(lián)的監(jiān)視器對(duì)象(monitor),允許線程在該監(jiān)視器對(duì)象上進(jìn)行加鎖和解鎖操作。

a、靜態(tài)方法:Java類對(duì)應(yīng)的Class類的對(duì)象所關(guān)聯(lián)的監(jiān)視器對(duì)象。

b、實(shí)例方法:當(dāng)前對(duì)象實(shí)例所關(guān)聯(lián)的監(jiān)視器對(duì)象。

c、代碼塊:代碼塊聲明中的對(duì)象所關(guān)聯(lián)的監(jiān)視器對(duì)象。

注:當(dāng)鎖被釋放,對(duì)共享變量的修改會(huì)寫入主存;當(dāng)活得鎖,CPU緩存中的內(nèi)容被置為無(wú)效。編譯器在處理synchronized方法或代碼塊,不會(huì)把其中包含的代碼移動(dòng)到synchronized方法或代碼塊之外,從而避免了由于代碼重排而造成的問(wèn)題。

例:以下方法getNext()和getNextV2() 都獲得了當(dāng)前實(shí)例所關(guān)聯(lián)的監(jiān)視器對(duì)象

public class SynchronizedIdGenerator{ 
 private int value = 0; 
 public synchronized int getNext(){ 
 return value++; 
 } 
 public int getNextV2(){ 
 synchronized(this){ 
  return value++; 
 } 
 } 
} 

2、Object類的wait、notify和notifyAll方法

生產(chǎn)者和消費(fèi)者模式,判斷緩沖區(qū)是否滿來(lái)消費(fèi),緩沖區(qū)是否空來(lái)生產(chǎn)的邏輯。如果用while 和 volatile也可以做,不過(guò)本質(zhì)上會(huì)讓線程處于忙等待,占用CPU時(shí)間,對(duì)性能造成影響。

wait: 將當(dāng)前線程放入,該對(duì)象的等待池中,線程A調(diào)用了B對(duì)象的wait()方法,線程A進(jìn)入B對(duì)象的等待池,并且釋放B的鎖。(這里,線程A必須持有B的鎖,所以調(diào)用的代碼必須在synchronized修飾下,否則直接拋出java.lang.IllegalMonitorStateException異常)。

notify:將該對(duì)象中等待池中的線程,隨機(jī)選取一個(gè)放入對(duì)象的鎖池,當(dāng)當(dāng)前線程結(jié)束后釋放掉鎖, 鎖池中的線程即可競(jìng)爭(zhēng)對(duì)象的鎖來(lái)獲得執(zhí)行機(jī)會(huì)。

notifyAll:將對(duì)象中等待池中的線程,全部放入鎖池。

(notify鎖喚醒的線程選擇由虛擬機(jī)實(shí)現(xiàn)來(lái)決定,不能保證一個(gè)對(duì)象鎖關(guān)聯(lián)的等待集合中的線程按照所期望的順序被喚醒,很可能一個(gè)線程被喚醒之后,發(fā)現(xiàn)他所要求的條件并沒(méi)有滿足,而重新進(jìn)入等待池。因?yàn)楫?dāng)?shù)却刂邪鄠€(gè)線程時(shí),一般使用notifyAll方法,不過(guò)該方法會(huì)導(dǎo)致線程在沒(méi)有必要的情況下被喚醒,之后又馬上進(jìn)入等待池,對(duì)性能有影響,不過(guò)能保證程序的正確性)

工作流程:

a、Consumer線程A 來(lái) 看產(chǎn)品,發(fā)現(xiàn)產(chǎn)品為空,調(diào)用產(chǎn)品對(duì)象的wait(),線程A進(jìn)入產(chǎn)品對(duì)象的等待池并釋放產(chǎn)品的鎖。

b、Producer線程B獲得產(chǎn)品的鎖,執(zhí)行產(chǎn)品的notifyAll(),Consumer線程A從產(chǎn)品的等待池進(jìn)入鎖池,Producer線程B生產(chǎn)產(chǎn)品,然后退出釋放鎖。

c、Consumer線程A獲得產(chǎn)品鎖,進(jìn)入執(zhí)行,發(fā)現(xiàn)有產(chǎn)品,消費(fèi)產(chǎn)品,然后退出。

例子:

public synchronized String pop(){ 
 this.notifyAll();// 喚醒對(duì)象等待池中的所有線程,可能喚醒的就是 生產(chǎn)者(當(dāng)生產(chǎn)者發(fā)現(xiàn)產(chǎn)品滿,就會(huì)進(jìn)入對(duì)象的等待池,這里代碼省略,基本略同) 
 while(index == -1){//如果發(fā)現(xiàn)沒(méi)產(chǎn)品,就釋放鎖,進(jìn)入對(duì)象等待池 
 this.wait(); 
 }//當(dāng)生產(chǎn)者生產(chǎn)完后,消費(fèi)者從this.wait()方法再開始執(zhí)行,第一次還會(huì)執(zhí)行循環(huán),萬(wàn)一產(chǎn)品還是為空,則再等待,所以這里必須用while循環(huán),不能用if 
 String good = buffer[index]; 
 buffer[index] = null; 
 index--; 
 return good;// 消費(fèi)完產(chǎn)品,退出。 
} 

注:wait()方法有超時(shí)和不超時(shí)之分,超時(shí)的在經(jīng)過(guò)一段時(shí)間,線程還在對(duì)象的等待池中,那么線程也會(huì)推出等待狀態(tài)。

3、線程狀態(tài)轉(zhuǎn)換:

已經(jīng)廢棄的方法:stop、suspend、resume、destroy,這些方法在實(shí)現(xiàn)上時(shí)不安全的。

線程的狀態(tài):NEW、RUNNABLE、BLOCKED、WAITING、TIMED_WAITING(有超時(shí)的等待)、TERMINATED。

a、方法sleep()進(jìn)入的阻塞狀態(tài),不會(huì)釋放對(duì)象的鎖(即大家一起睡,誰(shuí)也別想執(zhí)行代碼),所以不要讓sleep方法處在synchronized方法或代碼塊中,否則造成其他等待獲取鎖的線程長(zhǎng)時(shí)間處于等待。

b、方法join()則是主線程等待子線程完成,再往下執(zhí)行。例如main方法新建兩個(gè)線程A和B

public static void main(String[] args) throws InterruptedException { 
Thread t1 = new Thread(new ThreadTesterA()); 
Thread t2 = new Thread(new ThreadTesterB()); 
t1.start(); 
t1.join(); // 等t1執(zhí)行完再往下執(zhí)行 
t2.start(); 
t2.join(); // 在虛擬機(jī)執(zhí)行中,這句可能被忽略 
}

c、方法interrupt(),向被調(diào)用的對(duì)象線程發(fā)起中斷請(qǐng)求。如線程A通過(guò)調(diào)用線程B的d的interrupt方法來(lái)發(fā)出中斷請(qǐng)求,線程B來(lái)處理這個(gè)請(qǐng)求,當(dāng)然也可以忽略,這不是必須的。Object類的wait()、Thread類的join()和sleep方法都會(huì)拋出受檢異常java.lang.InterruptedException,通過(guò)interrupt方法中斷該線程會(huì)導(dǎo)致線程離開等待狀態(tài)。對(duì)于wait()調(diào)用來(lái)說(shuō),線程需要重新獲取監(jiān)視器對(duì)象上的鎖之后才能拋出InterruptedException異常,并致以異常的處理邏輯。
可以通過(guò)Thread類的isInterrupted方法來(lái)判斷是否有中斷請(qǐng)求發(fā)生,通??梢岳眠@個(gè)方法來(lái)判斷是否退出線程(類似上面的volatitle修飾符的例子);

Thread類還有個(gè)方法Interrupted(),該方法不但可以判斷當(dāng)前線程是否被中斷,還會(huì)清楚線程內(nèi)部的中斷標(biāo)記,如果返回true,即曾被請(qǐng)求中斷,同時(shí)調(diào)用完后,清除中斷標(biāo)記。

如果一個(gè)線程在某個(gè)對(duì)象的等待池,那么notify和interrupt 都可以使該線程從等待池中被移除。如果同時(shí)發(fā)生,那么看實(shí)際發(fā)生順序。如果是notify先,那照常喚醒,沒(méi)影響。如果是interrupt先,并且虛擬機(jī)選擇讓該線程中斷,那么即使nofity,也會(huì)忽略該線程,而喚醒等待池中的另一個(gè)線程。

e、yield(),嘗試讓出所占有的CPU資源,讓其他線程獲取運(yùn)行機(jī)會(huì),對(duì)操作系統(tǒng)上的調(diào)度器來(lái)說(shuō)是一個(gè)信號(hào),不一定立即切換線程。(在實(shí)際開發(fā)中,測(cè)試階段頻繁調(diào)用yeid方法使線程切換更頻繁,從而讓一些多線程相關(guān)的錯(cuò)誤更容易暴露出來(lái))。

多線程并發(fā)編程如何在Java項(xiàng)目中實(shí)現(xiàn)

四、非阻塞方式

線程之間同步機(jī)制的核心是監(jiān)視對(duì)象上的鎖,競(jìng)爭(zhēng)鎖來(lái)獲得執(zhí)行代碼的機(jī)會(huì)。當(dāng)一個(gè)對(duì)象獲取對(duì)象的鎖,然后其他嘗試獲取鎖的對(duì)象會(huì)處于等待狀態(tài),這種鎖機(jī)制的實(shí)現(xiàn)方式很大程度限制了多線程程序的吞吐量和性能(線程阻塞),且會(huì)帶來(lái)死鎖(線程A有a對(duì)象鎖,等著獲取b對(duì)象鎖,線程B有b對(duì)象鎖,等待獲取a對(duì)象鎖)和優(yōu)先級(jí)倒置(優(yōu)先級(jí)低的線程獲得鎖,優(yōu)先級(jí)高的只能等待對(duì)方釋放鎖)等問(wèn)題。

如果能不阻塞線程,又能保證多線程程序的正確性,就能有更好的性能。

在程序中,對(duì)共享變量的使用一般遵循一定的模式,即讀取、修改和寫入三步組成。之前碰到的問(wèn)題是,這三步執(zhí)行中可能線程執(zhí)行切換,造成非原子操作。鎖機(jī)制是把這三步變成一個(gè)原子操作。

目前CPU本身實(shí)現(xiàn) 將這三步 合起來(lái) 形成一個(gè)原子操作,無(wú)需線程鎖機(jī)制干預(yù),常見的指令是“比較和替換”(compare and swap,CAS),這個(gè)指令會(huì)先比較某個(gè)內(nèi)存地址的當(dāng)前值是不是指定的舊指,如果是,就用新值替換,否則什么也不做,指令返回的結(jié)果是內(nèi)存地址的當(dāng)前值。通過(guò)CAS指令可以實(shí)現(xiàn)不依賴鎖機(jī)制的非阻塞算法。一般做法是把CAS指令的調(diào)用放在一個(gè)無(wú)限循環(huán)中,不斷嘗試,知道CAS指令成功完成修改。

java.util.concurrent.atomic包中提供了CAS指令。(不是所有CPU都支持CAS,在某些平臺(tái),java.util.concurrent.atomic的實(shí)現(xiàn)仍然是鎖機(jī)制)

atomic包中提供的Java類分成三類:

1、支持以原子操作來(lái)進(jìn)行更新的數(shù)據(jù)類型的Java類(AtomicBoolean、AtomicInteger、AtomicReference),在內(nèi)存模型相關(guān)的語(yǔ)義上,這四個(gè)類的對(duì)象類似于volatile變量。

類中的常用方法:

a、compareAndSet:接受兩個(gè)參數(shù),一個(gè)是期望的舊值,一個(gè)是替換的新值。

b、weakCompareAndSet:效果同compareAndSet(JSR中表示weak原子方式讀取和有條件地寫入變量但不創(chuàng)建任何 happen-before 排序,但在源代碼中和compareAndSet完全一樣,所以并沒(méi)有按JSR實(shí)現(xiàn))

c、get和set:分別用來(lái)直接獲取和設(shè)置變量的值。

d、lazySet:與set類似,但允許編譯器把lazySet方法的調(diào)用與后面的指令進(jìn)行重排,因此對(duì)值得設(shè)置操作有可能被推遲。

例:

public class AtomicIdGenerator{ 
 private final AtomicInter counter = new AtomicInteger(0); 
 public int getNext(){ 
 return counter.getAndIncrement(); 
 } 
} 
// getAndIncrement方法的內(nèi)部實(shí)現(xiàn)方式,這也是CAS方法的一般模式,CAS方法不一定成功,所以包裝在一個(gè)無(wú)限循環(huán)中,直到成功 
public final int getAndIncrement(){ 
 for(;;){ 
 int current = get(); 
 int next = current +1; 
 if(compareAndSet(current,next)) 
  return current; 
 } 
} 

2、提供對(duì)數(shù)組類型的變量進(jìn)行處理的Java類,AtomicIntegerArray、AtomicLongArray和AtomicReferenceArray類。(同上,只是放在類數(shù)組里,調(diào)用時(shí)也只是多了一個(gè)操作元素索引的參數(shù))

3、通過(guò)反射的方式對(duì)任何對(duì)象中包含的volatitle變量使用CAS方法,AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater。他們提供了一種方式把CAS的功能擴(kuò)展到了任何Java類中聲明為volatitle的域上。(靈活,但語(yǔ)義較弱,因?yàn)閷?duì)象的volatitle可能被非atomic的其他方式被修改)

public class TreeNode{ 
 private volatile TreeNode parent; 
// 靜態(tài)工廠方法 
 private static final AtomicReferenceFieldUpdater<TreeNode, TreeNode> parentUpdater = AtomicReferenceFieldUpdater.newUpdater(TreeNode.class,TreeNode.class,"parent"); 
public boolean compareAndSetParent(TreeNode expect, TreeNode update){ 
 return parentUpdater.compareAndSet(this, expect, update); 
} 
} 

注:java.util.concurrent.atomic包中的Java類屬于比較底層的實(shí)現(xiàn),一般作為java.util.concurrent包中很多非阻塞的數(shù)據(jù)結(jié)構(gòu)的實(shí)現(xiàn)基礎(chǔ)。

比較多的用AtomicBoolean、AtomicInteger、AtomicLong和AtomicReference。在實(shí)現(xiàn)線程安全的計(jì)數(shù)器時(shí),AtomicInteger和AtomicLong類時(shí)最佳的選擇。

五、高級(jí)同步機(jī)制(比synchronized更靈活的加鎖機(jī)制)

synchronized和volatile,以及wait、notify等方法抽象層次低,在程序開發(fā)中使用比較繁瑣,易出錯(cuò)。

而多線程之間的交互來(lái)說(shuō),存在某些固定的模式,如生產(chǎn)者-消費(fèi)者和讀者-寫者模式,把這些模式抽象成高層API,使用起來(lái)會(huì)非常方便。

java.util.concurrent包為多線程提供了高層的API,滿足日常開發(fā)中的常見需求。

常用接口

1、Lock接口,表示一個(gè)鎖方法:

a、lock(),獲取所,如果無(wú)法獲取所鎖,會(huì)處于等待狀態(tài)

b、unlock(),釋放鎖。(一般放在finally代碼塊中)

c、lockInterruptibly(),與lock()類似,但允許當(dāng)前線程在等待獲取鎖的過(guò)程中被中斷。(所以要處理InterruptedException)

d、tryLock(),以非阻塞方式獲取鎖,如果無(wú)法獲取鎖,則返回false。(tryLock()的另一個(gè)重載可以指定超時(shí),如果指定超時(shí),當(dāng)無(wú)法獲取鎖,會(huì)等待而阻塞,同時(shí)線程可以被中斷)

2、ReadWriteLock接口,表示兩個(gè)鎖,讀取的共享鎖和寫入的排他鎖。(適合常見的讀者--寫者場(chǎng)景)

ReadWriteLock接口的readLock和writeLock方法來(lái)獲取對(duì)應(yīng)的鎖的Lock接口的實(shí)現(xiàn)。

在多數(shù)線程讀取,少數(shù)線程寫入的情況下,可以提高多線程的性能,提高使用該數(shù)據(jù)結(jié)構(gòu)的吞吐量。

如果是相反的情況,較多的線程寫入,則接口會(huì)降低性能。

3、ReentrantLock類和ReentrantReadWriteLock,分別為上面兩個(gè)接口的實(shí)現(xiàn)類。

他們具有重入性:即允許一個(gè)線程多次獲取同一個(gè)鎖(他們會(huì)記住上次獲取鎖并且未釋放的線程對(duì)象,和加鎖的次數(shù),getHoldCount())

同一個(gè)線程每次獲取鎖,加鎖數(shù)+1,每次釋放鎖,加鎖數(shù)-1,到0,則該鎖被釋放,可以被其他線程獲取。

public class LockIdGenrator{ 
//new ReentrantLock(true)是重載,使用更加公平的加鎖機(jī)制,在鎖被釋放后,會(huì)優(yōu)先給等待時(shí)間最長(zhǎng)的線程,避免一些線程長(zhǎng)期無(wú)法獲得鎖 
 private int ReentrantLock lock = ReentrantLock(); 
 privafte int value = 0; 
 public int getNext(){ 
 lock.lock(); //進(jìn)來(lái)就加鎖,沒(méi)有鎖會(huì)等待 
 try{ 
  return value++;//實(shí)際操作 
 }finally{ 
  lock.unlock();//釋放鎖 
 } 
 } 
} 

注:重入性減少了鎖在各個(gè)線程之間的等待,例如便利一個(gè)HashMap,每次next()之前加鎖,之后釋放,可以保證一個(gè)線程一口氣完成便利,而不會(huì)每次next()之后釋放鎖,然后和其他線程競(jìng)爭(zhēng),降低了加鎖的代價(jià), 提供了程序整體的吞吐量。(即,讓一個(gè)線程一口氣完成任務(wù),再把鎖傳遞給其他線程)。

4、Condition接口,Lock接口代替了synchronized,Condition接口替代了object的wait、nofity。

a、await(),使當(dāng)前線程進(jìn)入等待狀態(tài),知道被喚醒或中斷。重載形式可以指定超時(shí)時(shí)間。

b、awaitNanos(),以納秒為單位等待。
c、awaitUntil(),指定超時(shí)發(fā)生的時(shí)間點(diǎn),而不是經(jīng)過(guò)的時(shí)間,參數(shù)為java.util.Date。

d、awaitUninterruptibly(),前面幾種會(huì)響應(yīng)其他線程發(fā)出的中斷請(qǐng)求,他會(huì)無(wú)視,直到被喚醒。

注:與Object類的wait()相同,await()會(huì)釋放其所持有的鎖。

e、signal()和signalAll, 相當(dāng)于 notify和notifyAll

Lock lock = new ReentrantLock(); 
Condition condition = lock.newCondition(); 
lock.lock(); 
try{ 
 while(/*邏輯條件不滿足*/){ 
 condition.await(); 
 } 
}finally{ 
 lock.unlock(); 
}

六、底層同步器

多線程程序中,線程之間存在多種不同的同步方式。除了Java標(biāo)準(zhǔn)庫(kù)提供的同步方式之外,程序中特有的同步方式需要由開發(fā)人員自己來(lái)實(shí)現(xiàn)。

常見的一種需求是 對(duì)有限個(gè)共享資源的訪問(wèn),比如多臺(tái)個(gè)人電腦,2臺(tái)打印機(jī),當(dāng)多個(gè)線程在等待同一個(gè)資源時(shí),從公平角度出發(fā),會(huì)用FIFO隊(duì)列。

  如果程序中的同步方式可以抽象成對(duì)有限個(gè)資源的訪問(wèn),那么可以使用java.util.concurrent.locks包中的AbstractQueuedSynchronizer類和AbstractQueuedLongSynchronizer類作為實(shí)現(xiàn)的基礎(chǔ),前者用int類型的變量來(lái)維護(hù)內(nèi)部狀態(tài),而后者用long類型。(可以將這個(gè)變量理解為共享資源個(gè)數(shù))

通過(guò)getState、setState、和compareAndSetState3個(gè)方法更新內(nèi)部變量的值。

AbstractQueuedSynchronizer類是abstract的,需要覆蓋其中包含的部分方法,通常做法是把其作為一個(gè)Java類的內(nèi)部類,外部類提供具體的同步方式,內(nèi)部類則作為實(shí)現(xiàn)的基礎(chǔ)。有兩種模式,排他模式和共享模式,分別對(duì)應(yīng)方法 tryAcquire()、tryRelease 和 tryAcquireShared、tryReleaseShared,在這些方法中,使用getState、setState、compareAndSetState3個(gè)方法來(lái)修改內(nèi)部變量的值,以此來(lái)反應(yīng)資源的狀態(tài)。

public class SimpleResourceManager{ 
 private final InnerSynchronizer synchronizer; 
 private static class InnerSynchronizer extends AbstractQueuedSynchronizer{ 
 InnerSynchronizer(int numOfResources){ 
  setState(numOfResources); 
 } 
 protected int tryAcquireShared(int acquires){ 
  for(;;){ 
  int available = getState(); 
  int remain = available - acquires; 
  if(remain <0 || comapreAndSetState(available, remain){ 
  return remain; 
  } 
  } 
 } 
 protected boolean try ReleaseShared(int releases){ 
  for(;;){ 
  int available = getState(); 
  int next = available + releases; 
  if(compareAndSetState(available,next){ 
  return true; 
  } 
  } 
 } 
 } 
 public SimpleResourceManager(int numOfResources){ 
 synchronizer = new InnerSynchronizer(numOfResources); 
 } 
 public void acquire() throws InterruptedException{ 
 synchronizer.acquireSharedInterruptibly(1); 
 } 
 pubic void release(){ 
 synchronizer.releaseShared(1); 
 } 
} 

七、高級(jí)同步對(duì)象(提高開發(fā)效率)

atomic和locks包提供的Java類可以滿足基本的互斥和同步訪問(wèn)的需求,但這些Java類的抽象層次較低,使用比較復(fù)雜。
更簡(jiǎn)單的做法是使用java.util.concurrent包中的高級(jí)同步對(duì)象。

1、信號(hào)量。

信號(hào)量一般用來(lái)數(shù)量有限的資源,每類資源有一個(gè)對(duì)象的信號(hào)量,信號(hào)量的值表示資源的可用數(shù)量。

在使用資源時(shí),需要從該信號(hào)量上獲取許可,成功獲取許可,資源的可用數(shù)-1;完成對(duì)資源的使用,釋放許可,資源可用數(shù)+1; 當(dāng)資源數(shù)為0時(shí),需要獲取資源的線程以阻塞的方式來(lái)等待資源,或過(guò)段時(shí)間之后再來(lái)檢查資源是否可用。(上面的SimpleResourceManager類實(shí)際上時(shí)信號(hào)量的一個(gè)簡(jiǎn)單實(shí)現(xiàn))

java.util.concurrent.Semaphore類,在創(chuàng)建Semaphore類的對(duì)象時(shí)指定資源的可用數(shù)

a、acquire(),以阻塞方式獲取許可

b、tryAcquire(),以非阻塞方式獲取許可

c、release(),釋放許可。

d、accquireUninterruptibly(),accquire()方法獲取許可以的過(guò)程可以被中斷,如果不希望被中斷,使用此方法。

public class PrinterManager{ 
 private final Semphore semaphore; 
 private final List<Printer> printers = new ArrayList<>(): 
 public PrinterManager(Collection<&#63; extends Printer> printers){ 
  this.printers.addAll(printers); 
  //這里重載方法,第二個(gè)參數(shù)為true,以公平競(jìng)爭(zhēng)模式,防止線程饑餓 
  this.semaphore = new Semaphore(this.printers.size(),true); 
 } 
 public Printer acquirePrinter() throws InterruptedException{ 
  semaphore.acquire(); 
  return getAvailablePrinter(); 
 } 
 public void releasePrinter(Printer printer){ 
  putBackPrinter(pinter); 
  semaphore.release(); 
 } 
 private synchronized Printer getAvailablePrinter(){ 
  printer result = printers.get(0); 
  printers.remove(0); 
  return result; 
 } 
 private synchronized void putBackPrinter(Printer printer){ 
  printers.add(printer); 
 } 
} 

2、倒數(shù)閘門

多線程協(xié)作時(shí),一個(gè)線程等待另外的線程完成任務(wù)才能繼續(xù)進(jìn)行。

java.util.concurrent.CountDownLatch類,創(chuàng)建該類時(shí),指定等待完成的任務(wù)數(shù);當(dāng)一個(gè)任務(wù)完成,調(diào)用countDonw(),任務(wù)數(shù)-1。等待任務(wù)完成的線程通過(guò)await(),進(jìn)入阻塞狀態(tài),直到任務(wù)數(shù)量為0。CountDownLatch類為一次性,一旦任務(wù)數(shù)為0,再調(diào)用await()不再阻塞當(dāng)前線程,直接返回。

例:

public class PageSizeSorter{ 
 // 并發(fā)性能遠(yuǎn)遠(yuǎn)優(yōu)于HashTable的 Map實(shí)現(xiàn),hashTable做任何操作都需要獲得鎖,同一時(shí)間只有有個(gè)線程能使用,而ConcurrentHashMap是分段加鎖,不同線程訪問(wèn)不同的數(shù)據(jù)段,完全不受影響,忘記HashTable吧。 
 private static final ConcurrentHashMap<String , Interger> sizeMap = new ConcurrentHashMap<>(); 
 private static class GetSizeWorker implements Runnable{ 
  private final String urlString; 
  public GetSizeWorker(String urlString , CountDownLatch signal){ 
   this.urlString = urlStirng; 
   this.signal = signal; 
  } 
  public void run(){ 
   try{ 
   InputStream is = new URL(urlString).openStream(); 
   int size = IOUtils.toByteArray(is).length; 
   sizeMap.put(urlString, size); 
   }catch(IOException e){ 
   sizeMap.put(urlString, -1); 
   }finally{ 
   signal.countDown()://完成一個(gè)任務(wù) , 任務(wù)數(shù)-1 
   } 
  } 
 } 
 private void sort(){ 
  List<Entry<String, Integer> list = new ArrayList<sizeMap.entrySet()); 
  Collections.slort(list, new Comparator<Entry<String,Integer>>(){ 
   public int compare (Entry<String, Integer> o1, Entry<Sting , Integer> o2){ 
   return Integer.compare(o2.getValue(),o1.getValue()); 
  }; 
  System.out.println(Arrays.deepToString(list.toArray())); 
 } 
 public void sortPageSize(Collection<String> urls) throws InterruptedException{ 
  CountDownLatch sortSignal = new CountDownLatch(urls.size()); 
  for(String url: urls){ 
   new Thread(new GetSizeWorker(url, sortSignal)).start(); 
  } 
  sortSignal.await()://主線程在這里等待,任務(wù)數(shù)歸0,則繼續(xù)執(zhí)行 
  sort(); 
 } 
} 

3、循環(huán)屏障

循環(huán)屏障在作用上類似倒數(shù)閘門,不過(guò)他不像倒數(shù)閘門是一次性的,可以循環(huán)使用。另外,線程之間是互相平等的,彼此都需要等待對(duì)方完成,當(dāng)一個(gè)線程完成自己的任務(wù)之后,等待其他線程完成。當(dāng)所有線程都完成任務(wù)之后,所有線程才可以繼續(xù)運(yùn)行。

當(dāng)線程之間需要再次進(jìn)行互相等待時(shí),可以復(fù)用同一個(gè)循環(huán)屏障。

類java.uti.concurrent.CyclicBarrier用來(lái)表示循環(huán)屏障,創(chuàng)建時(shí)指定使用該對(duì)象的線程數(shù)目,還可以指定一個(gè)Runnable接口的對(duì)象作為每次循環(huán)后執(zhí)行的動(dòng)作。(當(dāng)最后一個(gè)線程完成任務(wù)之后,所有線程繼續(xù)執(zhí)行之前,被執(zhí)行。如果線程之間需要更新一些共享的內(nèi)部狀態(tài),可以利用這個(gè)Runnalbe接口的對(duì)象來(lái)處理)。

每個(gè)線程任務(wù)完成之后,通過(guò)調(diào)用await方法進(jìn)行等待,當(dāng)所有線程都調(diào)用await方法之后,處于等待狀態(tài)的線程都可以繼續(xù)執(zhí)行。在所有線程中,只要有一個(gè)在等待中被中斷,超時(shí)或是其他錯(cuò)誤,整個(gè)循環(huán)屏障會(huì)失敗,所有等待中的其他線程拋出java.uti.concurrent.BrokenBarrierException。

例:每個(gè)線程負(fù)責(zé)找一個(gè)數(shù)字區(qū)間的質(zhì)數(shù),當(dāng)所有線程完成后,如果質(zhì)數(shù)數(shù)目不夠,繼續(xù)擴(kuò)大范圍查找

public class PrimeNumber{ 
 private static final int TOTAL_COUTN = 5000; 
 private static final int RANGE_LENGTH= 200; 
 private static final int WORKER_NUMBER = 5; 
 private static volatitle boolean done = false; 
 private static int rangeCount = 0; 
 private static final List<Long> results = new ArrayList<Long>(): 
 private static final CyclicBarrier barrier = new CyclicBarrier(WORKER_NUMBER, new Runnable(){ 
  public void run(){ 
   if(results.size() >= TOTAL_COUNT){ 
   done = true; 
   } 
  } 
 }); 
 private static class PrimeFinder implements Runnable{ 
  public void run(){ 
   while(!done){// 整個(gè)過(guò)程在一個(gè) while循環(huán)下,await()等待,下次循環(huán)開始,會(huì)再次判斷 執(zhí)行條件 
   int range = getNextRange(); 
   long start = rang * RANGE_LENGTH; 
   long end = (range + 1) * RANGE_LENGTH; 
   for(long i = start; i<end;i++){ 
    if(isPrime(i)){ 
     updateResult(i); 
    } 
   } 
   try{ 
    barrier.await(); 
   }catch (InterruptedException | BokenBarrierException e){ 
    done = true; 
   } 
   } 
  } 
 } 
 private synchronized static void updateResult(long value){ 
  results.add(value); 
 } 
 private synchronized static int getNextRange(){ 
  return rangeCount++; 
 } 
 private static boolean isPrime(long number){ 
  //找質(zhì)數(shù)的代碼 
 } 
 public void calculate(){ 
  for(int i=0;i<WORKER_NUMBER;i++){ 
   new Thread(new PrimeFinder()).start(); 
  } 
  while(!done){ 
 
  } 
  //計(jì)算完成 
 } 
} 

4、對(duì)象交換器

適合于兩個(gè)線程需要進(jìn)行數(shù)據(jù)交換的場(chǎng)景。(一個(gè)線程完成后,把結(jié)果交給另一個(gè)線程繼續(xù)處理)
java.util.concurrent.Exchanger類,提供了這種對(duì)象交換能力,兩個(gè)線程共享一個(gè)Exchanger類的對(duì)象,一個(gè)線程完成對(duì)數(shù)據(jù)的處理之后,調(diào)用Exchanger類的exchange()方法把處理之后的數(shù)據(jù)作為參數(shù)發(fā)送給另外一個(gè)線程。而exchange方法的返回結(jié)果是另外一個(gè)線程鎖提供的相同類型的對(duì)象。如果另外一個(gè)線程未完成對(duì)數(shù)據(jù)的處理,那么exchange()會(huì)使當(dāng)前線程進(jìn)入等待狀態(tài),直到另外一個(gè)線程也調(diào)用了exchange方法來(lái)進(jìn)行數(shù)據(jù)交換。

例:

public class SendAndReceiver{ 
 private final Exchanger<StringBuilder> exchanger = new Exchanger<StringBuilder>(); 
 private class Sender implements Runnable{ 
  public void run(){ 
   try{ 
   StringBuilder content = new StringBuilder("Hello"); 
   content = exchanger.exchange(content); 
   }catch(InterruptedException e){ 
   Thread.currentThread().interrupt(); 
   } 
  } 
 } 
 private class Receiver implements Runnable{ 
  public void run(){ 
   try{ 
   StringBuilder content = new StringBuilder("World"); 
   content = exchanger.exchange(content); 
   }catch(InterruptedException e){ 
   Thread.currentThread().interrupt(); 
   } 
  } 
 } 
 public void exchange(){ 
  new Thread(new Sender()).start(); 
  new Thread(new Receiver()).start(); 
 } 
} 

八、數(shù)據(jù)結(jié)構(gòu)(多線程程序使用的高性能數(shù)據(jù)結(jié)構(gòu))

java.util.concurrent包中提供了一些適合多線程程序使用的高性能數(shù)據(jù)結(jié)構(gòu),包括隊(duì)列和集合類對(duì)象等。

1、隊(duì)列

a、BlockingQueue接口:線程安全的阻塞式隊(duì)列;當(dāng)隊(duì)列已滿時(shí),想隊(duì)列添加會(huì)阻塞;當(dāng)隊(duì)列空時(shí),取數(shù)據(jù)會(huì)阻塞。(非常適合消費(fèi)者-生產(chǎn)者模式)

阻塞方式:put()、take()。

非阻塞方式:offer()、poll()。

實(shí)現(xiàn)類:基于數(shù)組的固定元素個(gè)數(shù)的ArrayBolockingQueue和基于鏈表結(jié)構(gòu)的不固定元素個(gè)數(shù)的LinkedBlockQueue類。

b、BlockingDeque接口: 與BlockingQueue相似,但可以對(duì)頭尾進(jìn)行添加和刪除操作的雙向隊(duì)列;方法分為兩類,分別在隊(duì)首和對(duì)尾進(jìn)行操作。

實(shí)現(xiàn)類:標(biāo)準(zhǔn)庫(kù)值提供了一個(gè)基于鏈表的實(shí)現(xiàn),LinkedBlockgingDeque。

2、集合類

在多線程程序中,如果共享變量時(shí)集合類的對(duì)象,則不適合直接使用java.util包中的集合類。這些類要么不是線程安全,要么在多線程下性能比較差。

應(yīng)該使用java.util.concurrent包中的集合類。

a、ConcurrentMap接口: 繼承自java.util.Map接口

putIfAbsent():只有在散列表不包含給定鍵時(shí),才會(huì)把給定的值放入。

remove():刪除條目。

replace(key,value):把value 替換到給定的key上。

replace(key, oldvalue, newvalue):CAS的實(shí)現(xiàn)。

實(shí)現(xiàn)類:ConcurrentHashMap:

創(chuàng)建時(shí),如果可以預(yù)估可能包含的條目個(gè)數(shù),可以優(yōu)化性能。(因?yàn)閯?dòng)態(tài)調(diào)整所能包含的數(shù)目操作比較耗時(shí),這個(gè)

HashMap也一樣,只是多線程下更耗時(shí))。創(chuàng)建時(shí),預(yù)估進(jìn)行更新操作的線程數(shù),這樣實(shí)現(xiàn)中會(huì)根據(jù)這個(gè)數(shù)把內(nèi)部空間劃分為對(duì)應(yīng)數(shù)量的部分。(默認(rèn)是16,如果只有一個(gè)線程進(jìn)行寫操作,其他都是讀取,那么把值設(shè)為1 可以提高性能)。

注:當(dāng)從集合中創(chuàng)建出迭代器遍歷Map元素時(shí),不一定能看到正在添加的數(shù)據(jù),只能和集合保證弱一致性。(當(dāng)然使用迭代器不會(huì)因?yàn)椴榭凑诟淖兊腗ap,而拋出java.util.ConcurrentModifycationException)

b、CopyOnWriteArrayList接口:繼承自java.util.List接口。

顧名思義,在CopyOnWriteArrayList的實(shí)現(xiàn)類,所有對(duì)列表的更新操作都會(huì)新創(chuàng)建一個(gè)底層數(shù)組的副本,并使用副本來(lái)存儲(chǔ)數(shù)據(jù);對(duì)列表更新操作加鎖,讀取操作不加鎖。

適合多讀取少修改的場(chǎng)景,如果更新操作多,那么不適合用,同樣迭代器只能表示創(chuàng)建時(shí)列表的狀態(tài),更新后使用了新的底層數(shù)組,迭代器還是引用舊的底層數(shù)組。

九、多線程任務(wù)的執(zhí)行

過(guò)去線程的執(zhí)行,是先創(chuàng)建Thread類的想,再調(diào)用start方法啟動(dòng),這種做法要求開發(fā)人員對(duì)線程進(jìn)行維護(hù),在線程較多時(shí),一般創(chuàng)建一個(gè)線程池同一管理,同時(shí)降低重復(fù)創(chuàng)建線程的開銷

在J2SE5.0中,java.util.concurrent包提供了豐富的用來(lái)管理線程和執(zhí)行任務(wù)的實(shí)現(xiàn)。

1、基本接口(描述任務(wù))

a、Callable接口:

Runnable接口受限于run方法的類型簽名,而Callable只有一個(gè)方法call(),可以有返回值,可以拋出受檢異常。

b、Future接口:

過(guò)去,需要異步線程的任務(wù)執(zhí)行結(jié)果,要求主線程和任務(wù)執(zhí)行線程之間進(jìn)行同步和數(shù)據(jù)傳遞。

Future簡(jiǎn)化了任務(wù)的異步執(zhí)行,作為異步操作的一個(gè)抽象。調(diào)用get()方法可以獲取異步的執(zhí)行結(jié)果,如果任務(wù)沒(méi)有執(zhí)行完,會(huì)等待,直到任務(wù)完成或被取消,cancel()可以取消。

c、Delayed接口:

延遲執(zhí)行任務(wù),getDelay()返回當(dāng)前剩余的延遲時(shí)間,如果不大于0,說(shuō)明延遲時(shí)間已經(jīng)過(guò)去,應(yīng)該調(diào)度并執(zhí)行該任務(wù)。

2、組合接口(描述任務(wù))

a、RunnableFuture接口:繼承自Runnable接口和Future接口。

當(dāng)來(lái)自Runnalbe接口中的run方法成功執(zhí)行之后,相當(dāng)于Future接口表示的異步任務(wù)已經(jīng)完成,可以通過(guò)get()獲取運(yùn)行結(jié)果。

b、ScheduledFuture接口:繼承Future接口和Delayed接口,表示一個(gè)可以調(diào)用的異步操作。

c、RunnableScheduledFuture接口:繼承自Runnable、Delayed和Future,接口中包含isPeriodic,表明該異步操作是否可以被重復(fù)執(zhí)行。

3、Executor接口、ExcutorServer接口、ScheduleExecutorService接口和CompletionService接口(描述任務(wù)執(zhí)行)

a、executor接口,execute()用來(lái)執(zhí)行一個(gè)Runnable接口的實(shí)現(xiàn)對(duì)象,不同的Executor實(shí)現(xiàn)采取不同執(zhí)行策略,但提供的任務(wù)執(zhí)行功能比較弱。

b、excutorServer接口,繼承自executor;

提供了對(duì)任務(wù)的管理:submit(),可以吧Callable和Runnable作為任務(wù)提交,得到一個(gè)Future作為返回,可以獲取任務(wù)結(jié)果或取消任務(wù)。

提供批量執(zhí)行:invokeAll()和invokeAny(),同時(shí)提交多個(gè)Callable;invokeAll(),會(huì)等待所有任務(wù)都執(zhí)行完成,返回一個(gè)包含每個(gè)任務(wù)對(duì)應(yīng)Future的列表;invokeAny(),任何一個(gè)任務(wù)成功完成,即返回該任務(wù)結(jié)果。

提供任務(wù)關(guān)閉:shutdown()、shutdownNow()來(lái)關(guān)閉服務(wù),前者不允許新的任務(wù)提交,后者試圖終止正在運(yùn)行和等待的任務(wù),并返回已經(jīng)提交單沒(méi)有被運(yùn)行的任務(wù)列表。(兩個(gè)方法都不會(huì)等待服務(wù)真正關(guān)閉,只是發(fā)出關(guān)閉請(qǐng)求。)。

shutdownDow,通常做法是向線程發(fā)出中斷請(qǐng)求,所以確保提交的任務(wù)實(shí)現(xiàn)了正確的中斷處理邏輯。

c、ScheduleExecutorService接口,繼承自excutorServer接口:支持任務(wù)的延遲執(zhí)行和定期執(zhí)行,可以執(zhí)行Callable或Runnable。

schedule(),調(diào)度一個(gè)任務(wù)在延遲若干時(shí)間之后執(zhí)行;
scheduleAtFixedRate():在初始延遲后,每隔一段時(shí)間循環(huán)執(zhí)行;在下一次執(zhí)行開始時(shí),上一次執(zhí)行可能還未結(jié)束。(同一時(shí)間,可能有多個(gè))

scheduleWithFixedDelay:同上,只是在上一次任務(wù)執(zhí)行完后,經(jīng)過(guò)給定的間隔時(shí)間再開始下一次執(zhí)行。(同一時(shí)間,只有一個(gè))

以上三個(gè)方法都返回ScheduledFuture接口的實(shí)現(xiàn)對(duì)象。

d、CompletionService接口,共享任務(wù)執(zhí)行結(jié)果。

通常在使用ExecutorService接口,通過(guò)submit提交任務(wù),并得到一個(gè)Future接口來(lái)獲取任務(wù)結(jié)果,如果任務(wù)提交者和執(zhí)行結(jié)果的使用者是程序的不同部分,那就要把Future在不同部分進(jìn)行傳遞;而CompletionService就是解決這個(gè)問(wèn)題,程序不同部分可以共享CompletionService,任務(wù)提交后,執(zhí)行結(jié)果可以通過(guò)take(阻塞),poll(非阻塞)來(lái)獲取。

標(biāo)準(zhǔn)庫(kù)提供的實(shí)現(xiàn)是 ExecutorCompletionService,在創(chuàng)建時(shí),需要提供一個(gè)Executor接口的實(shí)現(xiàn)作為參數(shù),用來(lái)實(shí)際執(zhí)行任務(wù)。

例:多線程方式下載文件

public class FileDownloader{ 
 // 線程池 
 private final ExecutorService executor = Executors.newFixedThreadPool(10); 
 public boolean download(final URL url, final Path path){ 
 Future<Path> future = executor.submit(new Callable<Path>(){ //submit提交任務(wù) 
  public Path call(){ 
   //這里就省略IOException的處理了 
   InputStream is = url.openStream(); 
   Files.copy(is, path, StandardCopyOption.REPLACE_EXISTING); 
   return path; 
  }); 
  try{ 
   return future.get() !=null &#63; true : false; 
  }<span >catch(InterruptedException | ExecutionException e){</span> 
    return false; 
  } 
 } 
 public void close(){//當(dāng)不再使用FileDownloader類的對(duì)象時(shí),應(yīng)該使用close方法關(guān)閉其中包含的ExecutorService接口的實(shí)現(xiàn)對(duì)象,否則虛擬機(jī)不會(huì)退出,占用內(nèi)存不釋放 
  executor.shutdown();// 發(fā)出關(guān)閉請(qǐng)求,此時(shí)不會(huì)再接受新任務(wù) 
  try{ 
   if(!executor.awaitTermination(3, TimeUnit.MINUTES)){// awaitTermination 來(lái)等待一段時(shí)間,使正在執(zhí)行的任務(wù)或等待的任務(wù)有機(jī)會(huì)完成 
   executor.shutdownNow();// 如果等待時(shí)間過(guò)后還有任務(wù)沒(méi)完成,則強(qiáng)制結(jié)束 
   executor.awaitTermination(1, TimeUnit.MINUTES);// 再等待一段時(shí)間,使被強(qiáng)制結(jié)束的任務(wù)完成必要的清理工作 
   } 
  }catch(InterruptedException e){ 
   executor.shutdownNow(); 
   Thread.currentThread().interrupt(); 
  } 
 } 
} 

十、Java SE 7 新特性

對(duì)java.util.concurrent包進(jìn)行更新,增加了新的輕量級(jí)任務(wù)執(zhí)行框架fork/join和多階段線程同步工具。

1、輕量級(jí)任務(wù)執(zhí)行框架fork/join

這個(gè)框架的目的主要是更好地利用底層平臺(tái)上的多核和多處理器來(lái)進(jìn)行并行處理。

通過(guò)分治算法或map/reduce算法來(lái)解決問(wèn)題。

fork/join 類比于 map/reduce。

fork操作是把一個(gè)大的問(wèn)題劃分為若干個(gè)較小的問(wèn)題,劃分過(guò)程一般為遞歸,直到可以直接進(jìn)行計(jì)算的粒度適合的子問(wèn)題;子問(wèn)題在結(jié)算后,可以得到整個(gè)問(wèn)題的部分解

join操作收集子結(jié)果,合并,得到完整解,也可能是 遞歸進(jìn)行的。

相對(duì)一般的線程池實(shí)現(xiàn),F(xiàn)/J框架的優(yōu)勢(shì)在任務(wù)的處理方式上。在一般線程池中,一個(gè)線程由于某些原因無(wú)法運(yùn)行,會(huì)等待;而在F/J,某個(gè)子問(wèn)題由于等待另外一個(gè)子問(wèn)題的完成而無(wú)法繼續(xù)運(yùn)行,那么處理該子問(wèn)題的線程會(huì)主動(dòng)尋找其他尚未運(yùn)行的子問(wèn)題來(lái)執(zhí)行。這種方式減少了等待時(shí)間,提高了性能。

為了F/J能高效,在每個(gè)子問(wèn)題視線中應(yīng)避免使用synchronized或其他方式進(jìn)行同步,也不應(yīng)使用阻塞式IO或過(guò)多訪問(wèn)共享變量。在理想情況下,每個(gè)子問(wèn)題都應(yīng)值進(jìn)行CPU計(jì)算,只使用每個(gè)問(wèn)題的內(nèi)部對(duì)象,唯一的同步應(yīng)只發(fā)生在子問(wèn)題和創(chuàng)建它的父問(wèn)題之間。(這完全就是Hadoop的MapReduce嘛)

a、ForkJoinTask類:表示一個(gè)由F/J框架執(zhí)行的任務(wù),該類實(shí)現(xiàn)了Future接口,可以按照Future接口的方式來(lái)使用。(表示任務(wù))

fork(),異步方式啟動(dòng)任務(wù)的執(zhí)行。

join(),等待任務(wù)完成并返回執(zhí)行結(jié)果。

在創(chuàng)建自己的任務(wù)時(shí),最好不要直接繼承自ForkJoinTask,而是繼承其子類,RecuriveTask或RecursiveAction,前者可以返回結(jié)果,后者不行。

b、ForkJoinPool類:表示任務(wù)執(zhí)行,實(shí)現(xiàn)了ExecutorService接口,除了可以執(zhí)行ForkJoinTask,也可以執(zhí)行Callable和Runnable。(任務(wù)執(zhí)行)

執(zhí)行任務(wù)的兩大類:

第一類:execute、invoke或submit方法:直接提交任務(wù)。

第二類:fork():運(yùn)行ForkJoinTask在執(zhí)行過(guò)程中的子任務(wù)。

一般作法是表示整個(gè)問(wèn)題的ForkJoinTask用第一類提交,執(zhí)行過(guò)程中產(chǎn)生的子任務(wù)不需要處理,F(xiàn)orkJoinPool會(huì)負(fù)責(zé)子任務(wù)執(zhí)行。

例:查找數(shù)組中的最大值

private static class MaxValueTask extends RecursiveTask<Long>{ 
 private final long[] array; 
 private final int start; 
 private final int end; 
 MaxValueTask(long[] array, int start, int end){ 
  this.array = array; 
  this.start = start; 
  this.end = end; 
 } 
 //compute是RecursiveTask的主方法 
 protected long compute(){ 
  long max = Long.MIN_VALUE; 
  if(end - start < RANG_LENGTH){//尋找最大值 
   for(int i = start; i<end;i++{ 
   if(array[i] > max){ 
    max = array[i]; 
   } 
   } 
  }else{// 二分任務(wù) 
   int mid = (start + end) /2; 
   MaxValueTask lowTask = new MaxValueTask(array, start , mid); 
   MaxValueTask highTask = new MaxValueTask(array, mid, end); 
   lowTask.fork();// 異步啟動(dòng)任務(wù) 
   highTask.fork(); 
   max = Math.max(max, lowTask.join());//等待執(zhí)行結(jié)果 
   max = Math.max(max, highTask.join(); 
  } 
  return max; 
 } 
 public Long calculate(long[] array){ 
  MaxValueTask task = new MaxValueTask(array, 0 , array.length); 
  Long result = forkJoinPool.invoke(task); 
  return result; 
 } 
} 

注:這個(gè)例子是示例,但從性能上說(shuō)直接對(duì)整個(gè)數(shù)組順序比較效率高,畢竟多線程所帶來(lái)的額外開銷過(guò)大。
在實(shí)際中,F(xiàn)/J框架發(fā)揮作用的場(chǎng)合很多,比如在一個(gè)目錄包含的所有文本中搜索某個(gè)關(guān)鍵字,可以每個(gè)文件創(chuàng)建一個(gè)子任務(wù)。

如果相關(guān)的功能可以用遞歸和分治來(lái)解決,就適合F/J。

2、多階段線程同步工具

Phaser類是Java SE 7中新增的一個(gè)使用同步工具,功能和靈活性比倒數(shù)閘門和循環(huán)屏障要強(qiáng)很多。

在F/J框架中的子任務(wù)之間要進(jìn)行同步時(shí),應(yīng)優(yōu)先考慮Phaser。

Phaser把多個(gè)線程寫作執(zhí)行的任務(wù)劃分成多個(gè)階段(phase),編程時(shí)要明確各個(gè)階段的任務(wù),每個(gè)階段都可以有任意個(gè)參與者,線程可以隨時(shí)注冊(cè)并參與到某個(gè)階段,當(dāng)一個(gè)階段中所有線程都成功完成之后,Phaser的onAdvance()被調(diào)用,可以通過(guò)覆蓋添加自定義處理邏輯(類似循環(huán)屏障的使用的Runnable接口),然后Phaser類會(huì)自動(dòng)進(jìn)入下個(gè)階段。如此循環(huán),知道Phaser不再包含任何參與者。

Phaser創(chuàng)建后,初始階段編號(hào)為0,構(gòu)造函數(shù)中指定初始參與個(gè)數(shù)。

register(),bulkRegister(),動(dòng)態(tài)添加一個(gè)或多個(gè)參與者。

arrive(),某個(gè)參與者完成任務(wù)后調(diào)用

arriveAndDeregister(),任務(wù)完成,取消自己的注冊(cè)。

arriveAndAwaitAdvance(),自己完成等待其他參與者完成。,進(jìn)入阻塞,直到Phaser成功進(jìn)入下個(gè)階段。

awaitAdvance()、awaitAdvanceInterruptibly(),等待phaser進(jìn)入下個(gè)階段,參數(shù)為當(dāng)前階段的編號(hào),后者可以設(shè)置超時(shí)和處理中斷請(qǐng)求。

另外,Phaser的一個(gè)重要特征是多個(gè)Phaser可以組成樹形結(jié)構(gòu),Phaser提供了構(gòu)造方法來(lái)指定當(dāng)前對(duì)象的父對(duì)象;當(dāng)一個(gè)子對(duì)象參與者>0,會(huì)自動(dòng)注冊(cè)到父對(duì)象中;當(dāng)=0,自動(dòng)解除注冊(cè)。

例:從指定網(wǎng)址,下載img標(biāo)簽的照片

階段

1、處理網(wǎng)址對(duì)應(yīng)的html文本,和抽取img的鏈接;

2、創(chuàng)建圖片下載子線程,主線程等待;

3、子線程下載圖片,主線程等待;

4、任務(wù)完成退出

public class WebPageImageDownloader{ 
 private final Phaser phaser = new Phaser(1);//初始參與數(shù)1,代表主線程。 
 public void download(URL url, final Path path) throws IOException{ 
  String content = getContent(url);//獲得HTML文本,省略。 
  List<URL> imageUrls = extractImageUrls(content);//獲得圖片鏈接,省略。 
  for(final URL imageUrl : imageUrls){ 
   phaser.register();//子線程注冊(cè) 
   new Thread(){ 
   public void run(){ 
    phaser.arriveAndAwaitAdvance();//第二階段的等待,等待進(jìn)入第三階段 
    try{ 
     InputStream is = imageUrl.openStream(); 
     File.copy(is, getSavePath(path, imageUrl), StandardCopyOption.REPLACE_EXISTING); 
    }catch(IOException e){ 
     e.printStackTrace(): 
    }finally{ 
     phaser.arriveAndDeregister();//子線程完成任務(wù),退出。 
    } 
   } 
  }.start(); 
  } 
  phaser.arriveAndAwaitAdvance();//第二階段等待,子線程在注冊(cè) 
  phaser.arriveAndAwaitAdvance();//第三階段等待,子線程在下載 
  phaser.arriveAndDeregister();//所有線程退出。 
 } 
} 

十一、ThreadLocal類

java.lang.ThreadLocal,線程局部變量,把一個(gè)共享變量變?yōu)橐粋€(gè)線程的私有對(duì)象。不同線程訪問(wèn)一個(gè)ThreadLocal類的對(duì)象時(shí),鎖訪問(wèn)和修改的事每個(gè)線程變量各自獨(dú)立的對(duì)象。通過(guò)ThreadLocal可以快速把一個(gè)非線程安全的對(duì)象轉(zhuǎn)換成線程安全的對(duì)象。(同時(shí)也就不能達(dá)到數(shù)據(jù)傳遞的作用了)。

a、get()和set()分別用來(lái)獲取和設(shè)置當(dāng)前線程中包含的對(duì)象的值。

b、remove(),刪除。

c、initialValue(),初始化值。如果沒(méi)有通過(guò)set方法設(shè)置值,第一個(gè)調(diào)用get,會(huì)通過(guò)initValue來(lái)獲取對(duì)象的初始值。

ThreadLoacl的一般用法,創(chuàng)建一個(gè)ThreadLocal的匿名子類并覆蓋initalValue(),把ThreadLoacl的使用封裝在另一個(gè)類中

public class ThreadLocalIdGenerator{ 
 private static final ThreadLocal<IdGenerator> idGenerator = new ThreadLocal<IdGenerator>(){ 
   protected IdGenerator initalValue(){ 
   return new IdGenerator();//IdGenerator 是個(gè)初始int value =0,然后getNext(){ return value++} 
   } 
  }; 
 public static int getNext(){ 
  return idGenerator.get().getNext(); 
 } 
} 

ThreadLoal的另外一個(gè)作用是創(chuàng)建線程唯一的對(duì)象,在有些情況,一個(gè)對(duì)象在代碼中各個(gè)部分都需要用到,傳統(tǒng)做法是把這個(gè)對(duì)象作為參數(shù)在代碼間傳遞,如果使用這個(gè)對(duì)I昂的代碼都在同一個(gè)線程,可以封裝在ThreadLocal中。

如:在多線程中,生成隨機(jī)數(shù)

java.util.Random會(huì)帶來(lái)競(jìng)爭(zhēng)問(wèn)題,java.util.concurrent.ThreadLocalRandom類提供多線程下的隨機(jī)數(shù)聲場(chǎng),底層是ThreadLoacl。

上述內(nèi)容就是多線程并發(fā)編程如何在Java項(xiàng)目中實(shí)現(xiàn) ,你們學(xué)到知識(shí)或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識(shí)儲(chǔ)備,歡迎關(guān)注億速云行業(yè)資訊頻道。

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI