溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

linux內(nèi)核級同步機制--futex

發(fā)布時間:2020-07-22 10:24:51 來源:網(wǎng)絡(luò) 閱讀:2158 作者:Java筆記丶 欄目:編程語言

在面試中關(guān)于多線程同步,你必須要思考的問題 一文中,我們知道glibc的pthread_cond_timedwait底層是用linux futex機制實現(xiàn)的。

理想的同步機制應(yīng)該是沒有鎖沖突時在用戶態(tài)利用原子指令就解決問題,而需要掛起等待時再使用內(nèi)核提供的系統(tǒng)調(diào)用進行睡眠與喚醒。換句話說,在用戶態(tài)的自旋失敗時,能不能讓進程掛起,由持有鎖的線程釋放鎖時將其喚醒?
如果你沒有較深入地考慮過這個問題,很可能想當(dāng)然的認為類似于這樣就行了(偽代碼):

void?lock(int?lockval)?{	//trylock是用戶級的自旋鎖
	while(!trylock(lockval))?{
		wait();//釋放cpu,并將當(dāng)期線程加入等待隊列,是系統(tǒng)調(diào)用
	}
}boolean?trylock(int?lockval){	int?i=0;?
	//localval=1代表上鎖成功
	while(!compareAndSet(lockval,0,1)){		if(++i>10){			return?false;
		}
	}	return?true;
}void?unlock(int?lockval)?{
	?compareAndSet(lockval,1,0);
	?notify();
}

上述代碼的問題是trylock和wait兩個調(diào)用之間存在一個窗口:
如果一個線程trylock失敗,在調(diào)用wait時持有鎖的線程釋放了鎖,當(dāng)前線程還是會調(diào)用wait進行等待,但之后就沒有人再喚醒該線程了。

為了解決上述問題,linux內(nèi)核引入了futex機制,futex主要包括等待和喚醒兩個方法:futex_waitfutex_wake,其定義如下

//uaddr指向一個地址,val代表這個地址期待的值,當(dāng)*uaddr==val時,才會進行waitint?futex_wait(int?*uaddr,?int?val);//喚醒n個在uaddr指向的鎖變量上掛起等待的進程int?futex_wake(int?*uaddr,?int?n);

futex在真正將進程掛起之前會檢查addr指向的地址的值是否等于val,如果不相等則會立即返回,由用戶態(tài)繼續(xù)trylock。否則將當(dāng)期線程插入到一個隊列中去,并掛起。

在關(guān)于同步的一點思考-上文章中對futex的背景與基本原理有介紹,對futex不熟悉的人可以先看下。

本文將深入分析futex的實現(xiàn),讓讀者對于鎖的最底層實現(xiàn)方式有直觀認識,再結(jié)合之前的兩篇文章(關(guān)于同步的一點思考-上和關(guān)于同步的一點思考-下)能對操作系統(tǒng)的同步機制有個全面的理解。

下文中的進程一詞包括常規(guī)進程與線程。

futex_wait

在看下面的源碼分析前,先思考一個問題:如何確保掛起進程時,val的值是沒有被其他進程修改過的?

代碼在kernel/futex.c中

static?int?futex_wait(u32?__user?*uaddr,?int?fshared,
		??????u32?val,?ktime_t?*abs_time,?u32?bitset,?int?clockrt){	struct?hrtimer_sleeper?timeout,?*to?=?NULL;
	struct?restart_block?*restart;
	struct?futex_hash_bucket?*hb;
	struct?futex_q?q;
	int?ret;

	...	//設(shè)置hrtimer定時任務(wù):在一定時間(abs_time)后,如果進程還沒被喚醒則喚醒wait的進程
	if?(abs_time)?{
	????...
		hrtimer_init_sleeper(to,?current);
		...
	}

retry:	//該函數(shù)中判斷uaddr指向的值是否等于val,以及一些初始化操作
	ret?=?futex_wait_setup(uaddr,?val,?fshared,?&q,?&hb);	//如果val發(fā)生了改變,則直接返回
	if?(ret)		goto?out;	//將當(dāng)前進程狀態(tài)改為TASK_INTERRUPTIBLE,并插入到futex等待隊列,然后重新調(diào)度。
	futex_wait_queue_me(hb,?&q,?to);	/*?If?we?were?woken?(and?unqueued),?we?succeeded,?whatever.?*/
	ret?=?0;	//如果unqueue_me成功,則說明是超時觸發(fā)(因為futex_wake喚醒時,會將該進程移出等待隊列,所以這里會失敗)
	if?(!unqueue_me(&q))		goto?out_put_key;
	ret?=?-ETIMEDOUT;	if?(to?&&?!to->task)		goto?out_put_key;	/*
	?*?We?expect?signal_pending(current),?but?we?might?be?the
	?*?victim?of?a?spurious?wakeup?as?well.
	?*/
	if?(!signal_pending(current))?{
		put_futex_key(fshared,?&q.key);		goto?retry;
	}

	ret?=?-ERESTARTSYS;	if?(!abs_time)		goto?out_put_key;

	...

out_put_key:
	put_futex_key(fshared,?&q.key);
out:	if?(to)?{		//取消定時任務(wù)
		hrtimer_cancel(&to->timer);
		destroy_hrtimer_on_stack(&to->timer);
	}	return?ret;
}

在將進程阻塞前會將當(dāng)期進程插入到一個等待隊列中,需要注意的是這里說的等待隊列其實是一個類似Java HashMap的結(jié)構(gòu),全局唯一。

struct?futex_hash_bucket?{
	spinlock_t?lock;	//雙向鏈表
	struct?plist_head?chain;};static?struct?futex_hash_bucket?futex_queues[1<<FUTEX_HASHBITS];

著重看futex_wait_setup和兩個函數(shù)futex_wait_queue_me

static?int?futex_wait_setup(u32?__user?*uaddr,?u32?val,?int?fshared,			???struct?futex_q?*q,?struct?futex_hash_bucket?**hb){
	u32?uval;	int?ret;
retry:
	q->key?=?FUTEX_KEY_INIT;	//初始化futex_q
	ret?=?get_futex_key(uaddr,?fshared,?&q->key,?VERIFY_READ);	if?(unlikely(ret?!=?0))		return?ret;

retry_private:	//獲得自旋鎖
	*hb?=?queue_lock(q);	//原子的將uaddr的值設(shè)置到uval中
	ret?=?get_futex_value_locked(&uval,?uaddr);

???...?
	//如果當(dāng)期uaddr指向的值不等于val,即說明其他進程修改了
	//uaddr指向的值,等待條件不再成立,不用阻塞直接返回。
	if?(uval?!=?val)?{		//釋放鎖
		queue_unlock(q,?*hb);
		ret?=?-EWOULDBLOCK;
	}

???...	return?ret;
}

函數(shù)futex_wait_setup中主要做了兩件事,一是獲得自旋鎖,二是判斷*uaddr是否為預(yù)期值。

static?void?futex_wait_queue_me(struct?futex_hash_bucket?*hb,?struct?futex_q?*q,
				struct?hrtimer_sleeper?*timeout)
{	//設(shè)置進程狀態(tài)為TASK_INTERRUPTIBLE,cpu調(diào)度時只會選擇
	//狀態(tài)為TASK_RUNNING的進程
	set_current_state(TASK_INTERRUPTIBLE);	//將當(dāng)期進程(q封裝)插入到等待隊列中去,然后釋放自旋鎖
	queue_me(q,?hb);	//啟動定時任務(wù)
	if?(timeout)?{
		hrtimer_start_expires(&timeout->timer,?HRTIMER_MODE_ABS);		if?(!hrtimer_active(&timeout->timer))
			timeout->task?=?NULL;
	}	/*
	?*?If?we?have?been?removed?from?the?hash?list,?then?another?task
	?*?has?tried?to?wake?us,?and?we?can?skip?the?call?to?schedule().
	?*/
	if?(likely(!plist_node_empty(&q->list)))?{		?
		?//如果沒有設(shè)置過期時間?||?設(shè)置了過期時間且還沒過期
		if?(!timeout?||?timeout->task)			//系統(tǒng)重新進行進程調(diào)度,這個時候cpu會去執(zhí)行其他進程,該進程會阻塞在這里
			schedule();
	}	//走到這里說明又被cpu選中運行了
	__set_current_state(TASK_RUNNING);
}

futex_wait_queue_me中主要做幾件事:

  1. 將當(dāng)期進程插入到等待隊列

  2. 啟動定時任務(wù)

  3. 重新調(diào)度進程

如何保證條件與等待之間的原子性

futex_wait_setup方法中會加自旋鎖;在futex_wait_queue_me中將狀態(tài)設(shè)置為TASK_INTERRUPTIBLE,調(diào)用queue_me將當(dāng)期線程插入到等待隊列中,然后才釋放自旋鎖。也就是說檢查uaddr的值的過程跟進程掛起的過程放在同一個臨界區(qū)中。當(dāng)釋放自旋鎖后,這時再更改addr地址的值已經(jīng)沒有關(guān)系了,因為當(dāng)期進程已經(jīng)加入到等待隊列中,能被wake喚醒,不會出現(xiàn)本文開頭提到的沒人喚醒的問題。

futex_wait小結(jié)

總結(jié)下futex_wait流程:

  1. 加自旋鎖

  2. 檢測*uaddr是否等于val,如果不相等則會立即返回

  3. 將進程狀態(tài)設(shè)置為TASK_INTERRUPTIBLE

  4. 將當(dāng)期進程插入到等待隊列中

  5. 釋放自旋鎖

  6. 創(chuàng)建定時任務(wù):當(dāng)超過一定時間還沒被喚醒時,將進程喚醒

  7. 掛起當(dāng)前進程

futex_wake

futex_wake

static?int?futex_wake(u32?__user?*uaddr,?int?fshared,?int?nr_wake,?u32?bitset){	struct?futex_hash_bucket?*hb;
	struct?futex_q?*this,?*next;
	struct?plist_head?*head;
	union?futex_key?key?=?FUTEX_KEY_INIT;	int?ret;

	...	//根據(jù)uaddr的值填充&key的內(nèi)容
	ret?=?get_futex_key(uaddr,?fshared,?&key,?VERIFY_READ);	if?(unlikely(ret?!=?0))		goto?out;	//根據(jù)&key獲得對應(yīng)uaddr所在的futex_hash_bucket
	hb?=?hash_futex(&key);	//對該hb加自旋鎖
	spin_lock(&hb->lock);
	head?=?&hb->chain;	//遍歷該hb的鏈表,注意鏈表中存儲的節(jié)點是plist_node類型,而而這里的this卻是futex_q類型,這種類型轉(zhuǎn)換是通過c中的container_of機制實現(xiàn)的
	plist_for_each_entry_safe(this,?next,?head,?list)?{		if?(match_futex?(&this->key,?&key))?{
			...			//喚醒對應(yīng)進程
			wake_futex(this);			if?(++ret?>=?nr_wake)				break;
		}
	}	//釋放自旋鎖
	spin_unlock(&hb->lock);
	put_futex_key(fshared,?&key);
out:	return?ret;
}

futex_wake流程如下:

  1. 找到uaddr對應(yīng)的futex_hash_bucket,即代碼中的hb

  2. 對hb加自旋鎖

  3. 遍歷fb的鏈表,找到uaddr對應(yīng)的節(jié)點

  4. 調(diào)用wake_futex喚起等待的進程

  5. 釋放自旋鎖

wake_futex中將制定進程狀態(tài)設(shè)置為TASK_RUNNING并加入到系統(tǒng)調(diào)度列表中,同時將進程從futex的等待隊列中移除掉,具體代碼就不分析了,有興趣的可以自行研究。

End

Java中的ReentrantLock,Object.wait和Thread.sleep等等底層都是用futex進行線程同步,理解futex的實現(xiàn)能幫助你更好的理解與使用這些上層的同步機制。另外因篇幅與精力有限,涉及到進程調(diào)度的相關(guān)內(nèi)容沒有具體分析,不過并不妨礙理解文章內(nèi)容。


向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI