Questions About Multithreaded Synchronization You Must Think Through Before an Interview

Published: 2020-07-17 17:26:29 | Source: web | Views: 780 | Author: Java筆記丶 | Category: Programming languages

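Plenty of articles already cover how ReentrantLock is implemented. This one only sketches its Java-level implementation and concentrates on how a thread is blocked after it fails to acquire the lock.
To keep the length manageable, synchronized is left for the next article.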

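Implementation of Java Lock

ReentrantLock is a commonly used lock implementation in the JDK. Its logic is mainly built on AQS (most synchronizers in the juc package are built on AQS). What follows is a brief introduction to how AQS works; its implementation details and its various applications will be analyzed in a separate article.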

AQS

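AQS is short for the class AbstractQueuedSynchronizer. In the JUC package, ReentrantLock, CountDownLatch and CyclicBarrier all rely on AQS (CyclicBarrier indirectly, through ReentrantLock and its Condition).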

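Roughly, it works as follows:

  1. AQS maintains an int variable called state and a doubly linked list. state represents the synchronization status, and the list holds the threads waiting for the lock.

  2. To lock, AQS first calls tryAcquire to try to take the lock. If that fails, it inserts the thread into the doubly linked list and calls LockSupport.park() to block the current thread.

  3. To unlock, AQS calls LockSupport.unpark() to wake the thread of the first waiting node in the list. The woken thread goes through the lock competition again.

tryAcquire is a hook method: AQS's default implementation only throws UnsupportedOperationException, so the actual behavior depends on the subclass. The difference between what we usually call fair and non-fair locks lies in how this method is implemented.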
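The three steps above can be sketched with a tiny non-reentrant mutex built on AQS. This is only an illustration modeled on the example in the AbstractQueuedSynchronizer Javadoc; the class name SimpleMutex and its methods are hypothetical and not part of the JDK.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical class for illustration; not part of the JDK.
class SimpleMutex {
    // state == 0: unlocked, state == 1: locked
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int acquires) {
            // A single CAS attempt; if it fails, AQS enqueues and parks the caller.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int releases) {
            if (getExclusiveOwnerThread() != Thread.currentThread()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0); // after this returns true, AQS unparks the first waiter
            return true;
        }

        boolean isLocked() {
            return getState() == 1;
        }
    }

    private final Sync sync = new Sync();

    void lock()        { sync.acquire(1); }
    boolean tryLock()  { return sync.tryAcquire(1); }
    void unlock()      { sync.release(1); }
    boolean isLocked() { return sync.isLocked(); }
}
```

Only tryAcquire and tryRelease are written by hand; the queueing, parking and unparking all come from AQS.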

ReentrantLock

ReentrantLock comes in a fair and a non-fair variant; we only look at the fair one.
ReentrantLock.lock calls into ReentrantLock#FairSync.lock:

FairSync.java

    static final class FairSync extends Sync {

        final void lock() {
            acquire(1);
        }

        /**
         * Fair version of tryAcquire.  Don't grant access unless
         * recursive call or no waiters or is first.
         */
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }
    }

AbstractQueuedSynchronizer.java

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

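As you can see, FairSync.lock calls AQS's acquire method, and acquire first calls tryAcquire to try to take the lock. tryAcquire returns true in two cases:

  1. state == 0 (no thread holds the lock), no thread is queued ahead of the current one (the fairness check, hasQueuedPredecessors), and the CAS on state succeeds.

  2. The current thread already holds the lock, and this call is a reentrant acquisition.

If tryAcquire fails, acquireQueued is called to block the current thread. acquireQueued eventually calls LockSupport.park() to block the thread.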
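The two successful paths of tryAcquire can be observed from the outside through the hold count. The sketch below is a minimal illustration using the public ReentrantLock API; the class and method names are made up for this example.

```java
import java.util.Arrays;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; only ReentrantLock itself comes from the JDK.
class FairLockDemo {
    // Returns the hold count after the first lock, after the nested lock, and after full release.
    static int[] nestedHoldCounts() {
        ReentrantLock lock = new ReentrantLock(true); // true selects the fair variant
        int[] counts = new int[3];
        lock.lock();                 // case 1: state goes 0 -> 1 via CAS
        try {
            counts[0] = lock.getHoldCount();
            lock.lock();             // case 2: same owner, state goes 1 -> 2
            try {
                counts[1] = lock.getHoldCount();
            } finally {
                lock.unlock();
            }
        } finally {
            lock.unlock();
        }
        counts[2] = lock.getHoldCount();
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(nestedHoldCounts())); // prints [1, 2, 0]
    }
}
```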

LockSupport.park

In my view, a key part of understanding locking in depth is understanding how the system actually blocks a thread.

LockSupport.java

    public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0L);
        setBlocker(t, null);
    }

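The blocker argument of park is the synchronization object responsible for this block; when AQS calls park, that object is the AQS instance itself. We know the synchronized keyword needs an object (the current instance or the current class when applied to a method); in a similar way, blocker is the object that LockSupport associates with the block. Unlike a monitor object, however, the blocker is only recorded for monitoring and diagnostic tools and does not affect the blocking itself.

park calls the native method UNSAFE.park. The first argument says whether the second argument is an absolute time, and the second argument is the maximum time to block. The call above, park(false, 0L), blocks with no timeout.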
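A small experiment makes the permit behavior of park and unpark visible. This is a minimal sketch using only the public LockSupport API; the class and method names are hypothetical.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class for illustration.
class ParkPermitDemo {
    // Grants the permit first, then parks: park consumes the permit and returns at once.
    static long parkAfterUnparkMillis() {
        LockSupport.unpark(Thread.currentThread());
        long start = System.nanoTime();
        LockSupport.park(ParkPermitDemo.class); // the blocker is only recorded for diagnostics
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    // Without a permit, a timed park blocks until the timeout
    // (it may also return earlier, since park is allowed to return spuriously).
    static long timedParkMillis(long timeoutMillis) {
        long start = System.nanoTime();
        LockSupport.parkNanos(ParkPermitDemo.class, TimeUnit.MILLISECONDS.toNanos(timeoutMillis));
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }
}
```

Because park may return for no reason at all, real callers such as AQS always re-check their condition in a loop after park returns.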

Its implementation is shown below with only the core code kept; see unsafe.cpp for the full source.

 Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time) {
   ...
   thread->parker()->park(isAbsolute != 0, time);
   ...
 }

The park method is in os_linux.cpp (the implementations for other operating systems are in the corresponding os_xxx files).

void Parker::park(bool isAbsolute, jlong time) {

  ...
  // Get the current thread
  Thread* thread = Thread::current();
  assert(thread->is_Java_thread(), "Must be JavaThread");
  JavaThread *jt = (JavaThread *)thread;

  // If the current thread has its interrupted flag set, return immediately
  if (Thread::is_interrupted(thread, false)) {
    return;
  }

  if (time > 0) {
    // unpackTime fills in the absTime struct according to isAbsolute:
    // when isAbsolute is true, time is an absolute time in milliseconds,
    // otherwise time is a relative time in nanoseconds
    // absTime.tv_sec holds the seconds of the target time
    // absTime.tv_nsec holds the nanoseconds of the target time
    unpackTime(&absTime, isAbsolute, time);
  }

  // Call the mutex trylock
  if (Thread::is_interrupted(thread, false) || pthread_mutex_trylock(_mutex) != 0) {
    return;
  }

  // _counter is the number of permits, similar in principle to the state counter
  // kept by ReentrantLock. A call to unpark sets _counter to 1.
  // _counter > 0 means someone has already called unpark, so there is no need to block
  int status;
  if (_counter > 0) { // no wait needed
    _counter = 0;
    // Release the mutex
    status = pthread_mutex_unlock(_mutex);
    return;
  }

  // Set the thread state to CONDVAR_WAIT
  OSThreadWaitState osts(thread->osthread(), false /* not Object.wait() */);
  ...
  // Wait
  _cur_index = isAbsolute ? ABS_INDEX : REL_INDEX;
  pthread_cond_timedwait(&_cond[_cur_index], _mutex, &absTime);

  ...
  // Release the mutex
  status = pthread_mutex_unlock(_mutex);
}

park blocks the thread with POSIX's pthread_cond_timedwait. The mutex must be held before pthread_cond_timedwait is called, so the main flow of park is:

  1. Call pthread_mutex_trylock to try to take the mutex; if that fails, return immediately

  2. Call pthread_cond_timedwait to wait

  3. Call pthread_mutex_unlock to release the mutex
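The flow above can be modeled in Java to make the role of the permit counter easier to see. This is a rough sketch, not the HotSpot code: ParkerModel is a hypothetical class, a ReentrantLock and Condition stand in for the pthread mutex and condition variable, and the unpark side is an assumption based on the comment in the native code that unpark sets _counter to 1.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical Java model of the native Parker; field names mirror the C++ ones.
class ParkerModel {
    private final ReentrantLock mutex = new ReentrantLock();
    private final Condition cond = mutex.newCondition();
    private int counter = 0; // permit: 0 or 1, guarded by mutex

    // Returns true if a permit was consumed, false on timeout, interrupt or failed trylock.
    boolean park(long timeoutMillis) {
        if (Thread.currentThread().isInterrupted() || !mutex.tryLock()) {
            return false;          // step 1: trylock failed, return without blocking
        }
        try {
            if (counter > 0) {     // unpark already happened: no wait needed
                counter = 0;
                return true;
            }
            try {
                cond.await(timeoutMillis, TimeUnit.MILLISECONDS); // step 2: wait
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            boolean hadPermit = counter > 0;
            counter = 0;
            return hadPermit;
        } finally {
            mutex.unlock();        // step 3: release the mutex
        }
    }

    // Assumed counterpart of park: publish one permit and wake a waiter.
    void unpark() {
        mutex.lock();
        try {
            counter = 1;           // permits do not accumulate
            cond.signal();
        } finally {
            mutex.unlock();
        }
    }
}
```

Calling unpark() twice and then park() twice shows that permits do not add up: the first park returns at once, the second waits for its timeout.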

In addition, before blocking the current thread, the constructor of OSThreadWaitState sets the thread state to CONDVAR_WAIT. The thread state enum in the JVM is as follows:

  enum ThreadState {
    ALLOCATED,                    // Memory has been allocated but not initialized
    INITIALIZED,                  // The thread has been initialized but yet started
    RUNNABLE,                     // Has been started and is runnable, but not necessarily running
    MONITOR_WAIT,                 // Waiting on a contended monitor lock
    CONDVAR_WAIT,                 // Waiting on a condition variable
    OBJECT_WAIT,                  // Waiting on an Object.wait() call
    BREAKPOINTED,                 // Suspended at breakpoint
    SLEEPING,                     // Thread.sleep()
    ZOMBIE                        // All done, but not reclaimed yet
  };

timedwait on Linux

From the above we know that LockSupport.park is ultimately implemented with POSIX's pthread_cond_timedwait.
Let us now look at how pthread_mutex_trylock, pthread_cond_timedwait and pthread_mutex_unlock are implemented.

On Linux the relevant code is in the glibc library.

pthread_mutex_trylock

Let us start with trylock.
The code is in glibc's pthread_mutex_trylock.c. The function is long, so we only look at the main part:

// pthread_mutex_t is the POSIX mutex structure
int
__pthread_mutex_trylock (mutex)
     pthread_mutex_t *mutex;
{
  int oldval;
  pid_t id = THREAD_GETMEM (THREAD_SELF, tid);

  switch (__builtin_expect (PTHREAD_MUTEX_TYPE (mutex),
                            PTHREAD_MUTEX_TIMED_NP))
    {
    case PTHREAD_MUTEX_ERRORCHECK_NP:
    case PTHREAD_MUTEX_TIMED_NP:
    case PTHREAD_MUTEX_ADAPTIVE_NP:
      /* Normal mutex.  */
      if (lll_trylock (mutex->__data.__lock) != 0)
        break;

      /* Record the ownership.  */
      mutex->__data.__owner = id;
      ++mutex->__data.__nusers;

      return 0;
    }

}

// The following code is in lowlevellock.h
#define __lll_trylock(futex) \
  (atomic_compare_and_exchange_val_acq (futex, 1, 0) != 0)
#define lll_trylock(futex) __lll_trylock (&(futex))

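By default a mutex is of type PTHREAD_MUTEX_NORMAL (the same as PTHREAD_MUTEX_TIMED_NP),
so lll_trylock is called first. lll_trylock is really a CAS operation: if mutex->__data.__lock == 0 it changes it to 1 and returns 0, otherwise it returns 1.

On success, the owner field of the mutex is set to the current thread.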
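The CAS at the heart of lll_trylock maps directly onto AtomicInteger.compareAndSet. The following Java model is only an illustration and not glibc code: TryLockModel, its fields and unlock() are hypothetical, and returning EBUSY on failure is an assumption taken from the POSIX description of pthread_mutex_trylock, since the excerpt above elides the code after the switch.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical Java model of the trylock fast path; not glibc code.
class TryLockModel {
    static final int EBUSY = 16; // numeric value of EBUSY on Linux

    private final AtomicInteger lockWord = new AtomicInteger(0); // models __lock: 0 free, 1 held
    private volatile long owner = 0; // models __owner
    private int nusers = 0;          // models __nusers, touched only by the holder

    // Returns 0 on success and EBUSY when the lock word was not 0.
    int tryLock() {
        if (!lockWord.compareAndSet(0, 1)) {
            return EBUSY;
        }
        owner = Thread.currentThread().getId(); // record the ownership
        ++nusers;
        return 0;
    }

    // Simplified release for the demo: clear the owner, then free the lock word.
    void unlock() {
        owner = 0;
        --nusers;
        lockWord.set(0);
    }

    long ownerId() {
        return owner;
    }
}
```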

pthread_mutex_unlock

pthread_mutex_unlock.c

int
internal_function attribute_hidden
__pthread_mutex_unlock_usercnt (mutex, decr)
     pthread_mutex_t *mutex;
     int decr;
{
  if (__builtin_expect (type, PTHREAD_MUTEX_TIMED_NP)
      == PTHREAD_MUTEX_TIMED_NP)
    {
      /* Always reset the owner field.  */
    normal:
      mutex->__data.__owner = 0;
      if (decr)
        /* One less user.  */
        --mutex->__data.__nusers;

      /* Unlock.  */
      lll_unlock (mutex->__data.__lock, PTHREAD_MUTEX_PSHARED (mutex));
      return 0;
    }
 }

pthread_mutex_unlock clears the owner field of the mutex and calls lll_unlock.

lowlevellock.h

  #define __lll_unlock(futex, private) \
  ((void) ({ \
    int *__futex = (futex); \
    int __val = atomic_exchange_rel (__futex, 0); \
 \
    if (__builtin_expect (__val > 1, 0)) \
      lll_futex_wake (__futex, 1, private); \
  }))

  #define lll_unlock(futex, private) __lll_unlock(&(futex), private)

  #define lll_futex_wake(ftx, nr, private) \
  ({ \
     DO_INLINE_SYSCALL(futex, 3, (long) (ftx), \
                       __lll_private_flag (FUTEX_WAKE, private), \
                       (int) (nr)); \
     _r10 == -1 ? -_retval : _retval; \
  })

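lll_unlock has two steps:

  1. Set the futex to 0 and fetch the value it had before (a user-space operation)

  2. If the previous value was greater than 1, there was contention, meaning some thread called FUTEX_WAIT and is sleeping, so the FUTEX_WAKE system call is used to wake a sleeping thread

FUTEX_WAKE was analyzed in the previous article. The core of the futex mechanism is this: to take the lock, a thread tries to CAS an int variable (a user-space operation). If the original value is 0, the CAS succeeds and the thread holds the lock; otherwise the current thread is put on a wait queue, and threads on the wait queue are not scheduled by the system (a kernel-space operation).

The futex variable takes one of three values: 0 means the lock is free, 1 means a thread holds the lock, and 2 means the lock is contended. The futex is initialized to 0. A call to try_lock uses CAS to change it to 1 (see the trylock function above). A call to lll_lock changes it to 1 if there is no contention and to 2 otherwise.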

#define __lll_lock(futex, private) \
  ((void) ({ \
    int *__futex = (futex); \
    if (__builtin_expect (atomic_compare_and_exchange_bool_acq (__futex, \
                                                                1, 0), 0)) \
      { \
        if (__builtin_constant_p (private) && (private) == LLL_PRIVATE) \
          __lll_lock_wait_private (__futex); \
        else \
          __lll_lock_wait (__futex, private); \
      } \
  }))

#define lll_lock(futex, private) __lll_lock (&(futex), private)

void
__lll_lock_wait_private (int *futex)
{
  // On first entry futex is typically 1, so this branch is not taken
  if (*futex == 2)
    lll_futex_wait (futex, 2, LLL_PRIVATE);

  // Set futex to 2 here and call futex_wait so that the current thread waits
  while (atomic_exchange_acq (futex, 2) != 0)
    lll_futex_wait (futex, 2, LLL_PRIVATE);
}
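The three futex values and the lock and unlock paths described above can be traced with a small state model. This is a sketch in Java rather than the glibc code: FutexWordModel is a hypothetical class that tracks only the value of the lock word and never blocks, so the places where the real code would call futex_wait or FUTEX_WAKE are reported through return values.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical single-word model; it tracks state only and never blocks.
class FutexWordModel {
    static final int FREE = 0, LOCKED = 1, CONTENDED = 2;
    private final AtomicInteger word = new AtomicInteger(FREE);

    // Fast path shared by trylock and lll_lock: CAS 0 -> 1.
    boolean tryLock() {
        return word.compareAndSet(FREE, LOCKED);
    }

    // Slow path of lll_lock: exchange in 2. Returns true if the old value was 0
    // (lock acquired); false is where the real code would call futex_wait.
    boolean markContendedAndTryAcquire() {
        return word.getAndSet(CONTENDED) == FREE;
    }

    // lll_unlock: exchange in 0; an old value above 1 means FUTEX_WAKE is needed.
    boolean unlockNeedsWake() {
        return word.getAndSet(FREE) > LOCKED;
    }

    int value() {
        return word.get();
    }
}
```

Note that a thread that acquires the lock through the slow path leaves the word at 2, so its unlock always issues a wake even if nobody is waiting. That extra wake is harmless and keeps the uncontended path free of system calls.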

pthread_cond_timedwait

pthread_cond_timedwait blocks a thread and makes it wait.
The code is in glibc's pthread_cond_timedwait.c. It is long; skim it once first, then read it again after the analysis that follows.

int
__pthread_cond_timedwait (cond, mutex, abstime)
     pthread_cond_t *cond;
     pthread_mutex_t *mutex;
     const struct timespec *abstime;
{
  struct _pthread_cleanup_buffer buffer;
  struct _condvar_cleanup_buffer cbuffer;
  int result = 0;

  /* Catch invalid parameters.  */
  if (abstime->tv_nsec < 0 || abstime->tv_nsec >= 1000000000)
    return EINVAL;

  int pshared = (cond->__data.__mutex == (void *) ~0l)
                ? LLL_SHARED : LLL_PRIVATE;

  // 1. Acquire the cond lock
  lll_lock (cond->__data.__lock, pshared);

  // 2. Release the mutex
  int err = __pthread_mutex_unlock_usercnt (mutex, 0);
  if (err)
    {
      lll_unlock (cond->__data.__lock, pshared);
      return err;
    }

  /* We have one new user of the condvar.  */
  // Every wait (pthread_cond_timedwait/pthread_cond_wait) increments __total_seq
  ++cond->__data.__total_seq;
  // The variable that futex_wait operates on
  ++cond->__data.__futex;
  // Tracks how many threads still use this cond; pthread_cond_destroy has to wait for all of them
  cond->__data.__nwaiters += 1 << COND_NWAITERS_SHIFT;

  /* Remember the mutex we are using here.  If there is already a
     different address store this is a bad user bug.  Do not store
     anything for pshared condvars.  */
  // Save the mutex
  if (cond->__data.__mutex != (void *) ~0l)
    cond->__data.__mutex = mutex;

  /* Prepare structure passed to cancellation handler.  */
  cbuffer.cond = cond;
  cbuffer.mutex = mutex;

  /* Before we block we enable cancellation.  Therefore we have to
     install a cancellation handler.  */
  __pthread_cleanup_push (&buffer, __condvar_cleanup, &cbuffer);

  /* The current values of the wakeup counter.  The "woken" counter
     must exceed this value.  */
  // Record, before futex_wait, __wakeup_seq (signal calls on this cond plus timeouts)
  // and __broadcast_seq (broadcast calls on this cond)
  unsigned long long int val;
  unsigned long long int seq;
  val = seq = cond->__data.__wakeup_seq;
  /* Remember the broadcast counter.  */
  cbuffer.bc_seq = cond->__data.__broadcast_seq;

  while (1)
    {
      // 3. Compute the relative time to wait
      struct timespec rt;
      {
#ifdef __NR_clock_gettime
        INTERNAL_SYSCALL_DECL (err);
        int ret;
        ret = INTERNAL_VSYSCALL (clock_gettime, err, 2,
                                 (cond->__data.__nwaiters
                                  & ((1 << COND_NWAITERS_SHIFT) - 1)),
                                 &rt);
# ifndef __ASSUME_POSIX_TIMERS
        if (__builtin_expect (INTERNAL_SYSCALL_ERROR_P (ret, err), 0))
          {
            struct timeval tv;
            (void) gettimeofday (&tv, NULL);

            /* Convert the absolute timeout value to a relative timeout.  */
            rt.tv_sec = abstime->tv_sec - tv.tv_sec;
            rt.tv_nsec = abstime->tv_nsec - tv.tv_usec * 1000;
          }
        else
# endif
          {
            /* Convert the absolute timeout value to a relative timeout.  */
            rt.tv_sec = abstime->tv_sec - rt.tv_sec;
            rt.tv_nsec = abstime->tv_nsec - rt.tv_nsec;
          }
#else
        /* Get the current time.  So far we support only one clock.  */
        struct timeval tv;
        (void) gettimeofday (&tv, NULL);

        /* Convert the absolute timeout value to a relative timeout.  */
        rt.tv_sec = abstime->tv_sec - tv.tv_sec;
        rt.tv_nsec = abstime->tv_nsec - tv.tv_usec * 1000;
#endif
      }
      if (rt.tv_nsec < 0)
        {
          rt.tv_nsec += 1000000000;
          --rt.tv_sec;
        }
      /* --- end of relative wait time computation --- */

      // Has the timeout already passed?
      /* Did we already time out?  */
      if (__builtin_expect (rt.tv_sec < 0, 0))
        {
          // Woken by broadcast. Open question: why is __wakeup_seq not checked here?
          if (cbuffer.bc_seq != cond->__data.__broadcast_seq)
            goto bc_out;

          goto timeout;
        }

      unsigned int futex_val = cond->__data.__futex;

      // 4. Release the cond lock and get ready to wait
      lll_unlock (cond->__data.__lock, pshared);

      /* Enable asynchronous cancellation.  Required by the standard.  */
      cbuffer.oldtype = __pthread_enable_asynccancel ();

      // 5. Call futex_wait
      /* Wait until woken by signal or broadcast.  */
      err = lll_futex_timed_wait (&cond->__data.__futex,
                                  futex_val, &rt, pshared);

      /* Disable asynchronous cancellation.  */
      __pthread_disable_asynccancel (cbuffer.oldtype);

      // 6. Reacquire the cond lock, because cond's data is about to be read and modified again
      lll_lock (cond->__data.__lock, pshared);

      // __broadcast_seq changed, so some thread called broadcast
      if (cbuffer.bc_seq != cond->__data.__broadcast_seq)
        goto bc_out;

      // Check whether we were woken by signal; signal increments __wakeup_seq.
      // The second condition, cond->__data.__woken_seq != val, covers this case:
      // threads A and B are both waiting, one signal wakes A, and B wakes up because of its timeout.
      // For B the first condition also holds at this point, so the caller would get a result that is not a timeout.
      // Hence the check whether __woken_seq (threads already woken on this cond)
      // equals __wakeup_seq (signal calls plus timeouts).
      val = cond->__data.__wakeup_seq;
      if (val != seq && cond->__data.__woken_seq != val)
        break;

      /* Not woken yet.  Maybe the time expired?  */
      if (__builtin_expect (err == -ETIMEDOUT, 0))
        {
        timeout:
          /* Yep.  Adjust the counters.  */
          ++cond->__data.__wakeup_seq;
          ++cond->__data.__futex;

          /* The error value.  */
          result = ETIMEDOUT;
          break;
        }
    }

  // One thread has woken up, so increment __woken_seq
  ++cond->__data.__woken_seq;

 bc_out:
  cond->__data.__nwaiters -= 1 << COND_NWAITERS_SHIFT;

  /* If pthread_cond_destroy was called on this variable already,
     notify the pthread_cond_destroy caller all waiters have left
     and it can be successfully destroyed.  */
  if (cond->__data.__total_seq == -1ULL
      && cond->__data.__nwaiters < (1 << COND_NWAITERS_SHIFT))
    lll_futex_wake (&cond->__data.__nwaiters, 1, pshared);

  // 9. Done modifying cond's data; release the cond lock
  lll_unlock (cond->__data.__lock, pshared);

  /* The cancellation handling is back to normal, remove the handler.  */
  __pthread_cleanup_pop (&buffer, 0);

  // 10. Reacquire the mutex
  err = __pthread_mutex_cond_lock (mutex);

  return err ?: result;
}

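Even with the comments, most people will not follow the code above on a first read.
Let us sort it out. The code involves two locks: the mutex and the cond lock. Also, the caller must lock and unlock the mutex around the call to pthread_cond_timedwait, with pthread_mutex_lock(&mutex) before and pthread_mutex_unlock(&mutex) after.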

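Using pthread_cond_timedwait therefore breaks down into these steps:

  1. Lock the mutex (before calling pthread_cond_timedwait)

  2. Lock the cond lock

  3. Release the mutex

  4. Modify the cond data

  5. Release the cond lock

  6. Call futex_wait

  7. Reacquire the cond lock

  8. Compare the cond data to decide whether the thread was woken normally or by a timeout, and whether it has to wait again

  9. Modify the cond data

  10. Release the cond lock

  11. Reacquire the mutex

  12. Release the mutex (after pthread_cond_timedwait returns)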
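From the caller's point of view only steps 1 and 12 are visible; everything in between happens inside the wait call. The sketch below shows the same calling pattern with Java's ReentrantLock and Condition, which is the JDK analogue and not the pthread API itself. TimedWaitDemo and its members are hypothetical names.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; Condition.awaitNanos plays the role of pthread_cond_timedwait.
class TimedWaitDemo {
    private final ReentrantLock mutex = new ReentrantLock();
    private final Condition cond = mutex.newCondition();
    private boolean ready = false; // the wait condition, guarded by mutex

    // Returns true if the condition became true before the timeout.
    boolean awaitReady(long timeoutMillis) {
        long nanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        mutex.lock();                     // step 1: take the mutex before waiting
        try {
            while (!ready) {
                if (nanos <= 0L) {
                    return false;         // timed out
                }
                // Inside the call: the mutex is released, the thread sleeps,
                // and the mutex is reacquired before the call returns.
                nanos = cond.awaitNanos(nanos);
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            mutex.unlock();               // step 12: release the mutex afterwards
        }
    }

    void setReady() {
        mutex.lock();
        try {
            ready = true;
            cond.signal();
        } finally {
            mutex.unlock();
        }
    }
}
```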

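At this point you may have a few questions: why are two locks needed, and what do the mutex and the cond lock each do?

The mutex lock

Before discussing the mutex, let us recall how Object.wait is used in Java. Object.wait must be called inside a synchronized block. Imagine what would go wrong if Object.wait could run without synchronized: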

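Object condObj = new Object();
volatile int flag = 0;

// Thought experiment only: real Java throws IllegalMonitorStateException here,
// because wait() and notify() are called without holding condObj's monitor.
public void waitTest() throws InterruptedException {
	if (flag == 0) {
		condObj.wait();
	}
}

public void notifyTest() {
	flag = 1;
	condObj.notify();
}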

In the code above, thread A calls waitTest. At this point flag == 0, so it is about to call wait and go to sleep. Now thread B runs, calls notifyTest, sets flag to 1 and calls notify. Note that thread A has not yet called wait, so notify wakes nobody. Thread A then continues and calls wait, and since nobody will ever wake it, thread A waits forever.

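Object condObj = new Object();
volatile int flag = 0;

public void waitTest() throws InterruptedException {
	synchronized (condObj) {
		// A loop instead of a single if: wait() may also return spuriously
		while (flag == 0) {
			condObj.wait();
		}
	}
}

public void notifyTest() {
	synchronized (condObj) {
		flag = 1;
		condObj.notify();
	}
}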

With the lock in place, flag is guaranteed to be 0 at the moment condObj.wait is called, so the thread can no longer wait forever.
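The guarded version can be run as a complete program to check that the waiter always terminates, whichever thread gets to the monitor first. This is a minimal sketch; GuardedFlag, runOnce and the timeout value are assumptions made for this example.

```java
// Hypothetical demo class wrapping the synchronized wait/notify pattern.
class GuardedFlag {
    private final Object condObj = new Object();
    private int flag = 0; // guarded by condObj's monitor

    void waitForFlag() throws InterruptedException {
        synchronized (condObj) {
            while (flag == 0) {   // check and wait are atomic with respect to setFlag
                condObj.wait();   // releases the monitor while sleeping
            }
        }
    }

    void setFlag() {
        synchronized (condObj) {
            flag = 1;
            condObj.notify();
        }
    }

    // Starts a waiter thread, sets the flag, and reports whether the waiter finished in time.
    static boolean runOnce(long joinMillis) {
        GuardedFlag g = new GuardedFlag();
        Thread waiter = new Thread(() -> {
            try {
                g.waitForFlag();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        waiter.start();
        g.setFlag();
        try {
            waiter.join(joinMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !waiter.isAlive();
    }
}
```

If setFlag wins the race, the waiter sees flag == 1 and never calls wait. If the waiter wins, it is already inside wait when notify arrives. Either way no wakeup is lost.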

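Back to pthread_cond_timedwait: the reason it needs the mutex is now obvious. The mutex makes the wait and the check of the wait condition atomic.

Whether it is glibc's pthread_cond_timedwait/pthread_cond_signal, Java's Object.wait/Object.notify, or the JDK AQS Condition.await/Condition.signal, every condition mechanism has to be used while holding a lock. The fundamental reason is to guarantee that the condition being waited on has not been changed by another thread at the moment the thread goes to sleep.

Pay attention to when the mutex is released. Looking back at the flow of pthread_cond_timedwait, the mutex is released at step 3 of the list above (marked 2 in the code comments), and only afterwards is futex_wait called to sleep. Why release the mutex before sleeping? The reason is simple: if the thread went to sleep while still holding the mutex, no other thread could ever change the condition and call signal to wake it, because the signaling thread has to take the mutex first.

After the thread is woken, the mutex is reacquired at step 11 (marked 10 in the code comments) to preserve the semantics of the lock (think about what would happen if the mutex were not reacquired).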

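The cond lock

The purpose of the cond lock is simple: it keeps the cond->data object thread safe.
pthread_cond_timedwait has to modify the data in cond->data, for example incrementing __total_seq (how many waits have been performed on this cond in total) and __nwaiters (how many threads are currently waiting on this cond), so the cond lock must be held whenever cond->data is read or modified.

One thing I could not work out: the mutex could also keep modifications of cond->data thread safe, as long as it were released a little later. Why release the mutex first and then rely on the cond lock for thread safety? Is it to avoid holding the mutex over too large a region?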

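The answer can be found in the comment by @11800222:

The mutex cannot keep modifications of cond->data thread safe, because the thread calling signal does not protect its critical section that modifies cond with the mutex.

The pthread_cond_wait/signal pair can sleep and wake using the cond lock alone for synchronization.
wait takes the mutex as an argument because the mutex must be released before sleeping, yet there must be no unlocked gap before the thread sleeps. The solution is to release the mutex only after the cond lock has been taken.
signal does not need to release the mutex first: it signals while holding the mutex and releases the mutex afterwards.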

How a sleeping thread is woken

The code that wakes a sleeping thread is fairly simple; it mainly calls lll_futex_wake.

int
__pthread_cond_signal (cond)
     pthread_cond_t *cond;
{
  int pshared = (cond->__data.__mutex == (void *) ~0l)
                ? LLL_SHARED : LLL_PRIVATE;

  // cond's data is about to be modified, so take the cond lock
  lll_lock (cond->__data.__lock, pshared);

  /* Are there any waiters to be woken?  */
  if (cond->__data.__total_seq > cond->__data.__wakeup_seq)
    {
      // __wakeup_seq is the number of signal calls plus the number of timeouts
      ++cond->__data.__wakeup_seq;
      ++cond->__data.__futex;

      ...
      // Wake a waiting thread
      lll_futex_wake (&cond->__data.__futex, 1, pshared);
    }

  /* We are done.  */
  lll_unlock (cond->__data.__lock, pshared);

  return 0;
}
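The interplay of __total_seq, __wakeup_seq and __woken_seq is easier to follow when traced by hand. The following Java model is a single-threaded sketch of the counter logic only, not the glibc implementation: CondCountersModel and its methods are hypothetical, nothing blocks, and the cond lock is left out.

```java
// Hypothetical single-threaded model of the condvar counters; it never blocks.
class CondCountersModel {
    long totalSeq;   // models __total_seq: number of waits started
    long wakeupSeq;  // models __wakeup_seq: signals delivered plus timeouts
    long wokenSeq;   // models __woken_seq: waiters that have actually left

    // Called on entry to wait; returns the __wakeup_seq snapshot ("seq" in the C code).
    long beginWait() {
        ++totalSeq;
        return wakeupSeq;
    }

    // Models pthread_cond_signal; returns true if it would issue a FUTEX_WAKE.
    boolean signal() {
        if (totalSeq > wakeupSeq) {
            ++wakeupSeq;
            return true;
        }
        return false; // no waiter left to wake
    }

    // Models the check after futex_wait returns: val != seq && __woken_seq != val.
    boolean tryConsumeWakeup(long seq) {
        long val = wakeupSeq;
        if (val != seq && wokenSeq != val) {
            ++wokenSeq;
            return true;
        }
        return false; // this wakeup belongs to someone else; keep waiting or time out
    }
}
```

With two waiters A and B and a single signal, A consumes the wakeup and B's check fails because __woken_seq has caught up with __wakeup_seq. This is the A/B situation described in the comments of pthread_cond_timedwait.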

End

This article briefly introduced how ReentrantLock is implemented in Java and analyzed in detail pthread_cond_timedwait, the mechanism underlying LockSupport.park.

After reading it you may still have questions: is the synchronized lock implemented the same way as ReentrantLock? How does the way Thread.sleep and Object.wait put a thread to sleep differ from LockSupport.park? How exactly is futex implemented in the Linux kernel?
