您好,登錄后才能下訂單哦!
在面試中關(guān)于多線程同步,你必須要思考的問題 一文中,我們知道glibc的pthread_cond_timedwait
底層是用linux futex機制實現(xiàn)的。
理想的同步機制應(yīng)該是沒有鎖沖突時在用戶態(tài)利用原子指令就解決問題,而需要掛起等待時再使用內(nèi)核提供的系統(tǒng)調(diào)用進行睡眠與喚醒。換句話說,在用戶態(tài)的自旋失敗時,能不能讓進程掛起,由持有鎖的線程釋放鎖時將其喚醒?
如果你沒有較深入地考慮過這個問題,很可能想當(dāng)然的認為類似于這樣就行了(偽代碼):
void?lock(int?lockval)?{ //trylock是用戶級的自旋鎖 while(!trylock(lockval))?{ wait();//釋放cpu,并將當(dāng)期線程加入等待隊列,是系統(tǒng)調(diào)用 } }boolean?trylock(int?lockval){ int?i=0;? //localval=1代表上鎖成功 while(!compareAndSet(lockval,0,1)){ if(++i>10){ return?false; } } return?true; }void?unlock(int?lockval)?{ ?compareAndSet(lockval,1,0); ?notify(); }
上述代碼的問題是trylock和wait兩個調(diào)用之間存在一個窗口:
如果一個線程trylock失敗,在調(diào)用wait時持有鎖的線程釋放了鎖,當(dāng)前線程還是會調(diào)用wait進行等待,但之后就沒有人再喚醒該線程了。
為了解決上述問題,linux內(nèi)核引入了futex機制,futex主要包括等待和喚醒兩個方法:futex_wait
和futex_wake
,其定義如下
//uaddr指向一個地址,val代表這個地址期待的值,當(dāng)*uaddr==val時,才會進行waitint?futex_wait(int?*uaddr,?int?val);//喚醒n個在uaddr指向的鎖變量上掛起等待的進程int?futex_wake(int?*uaddr,?int?n);
futex在真正將進程掛起之前會檢查addr指向的地址的值是否等于val,如果不相等則會立即返回,由用戶態(tài)繼續(xù)trylock。否則將當(dāng)期線程插入到一個隊列中去,并掛起。
在關(guān)于同步的一點思考-上文章中對futex的背景與基本原理有介紹,對futex不熟悉的人可以先看下。
本文將深入分析futex的實現(xiàn),讓讀者對于鎖的最底層實現(xiàn)方式有直觀認識,再結(jié)合之前的兩篇文章(關(guān)于同步的一點思考-上和關(guān)于同步的一點思考-下)能對操作系統(tǒng)的同步機制有個全面的理解。
下文中的進程一詞包括常規(guī)進程與線程。
在看下面的源碼分析前,先思考一個問題:如何確保掛起進程時,val的值是沒有被其他進程修改過的?
代碼在kernel/futex.c中
static?int?futex_wait(u32?__user?*uaddr,?int?fshared, ??????u32?val,?ktime_t?*abs_time,?u32?bitset,?int?clockrt){ struct?hrtimer_sleeper?timeout,?*to?=?NULL; struct?restart_block?*restart; struct?futex_hash_bucket?*hb; struct?futex_q?q; int?ret; ... //設(shè)置hrtimer定時任務(wù):在一定時間(abs_time)后,如果進程還沒被喚醒則喚醒wait的進程 if?(abs_time)?{ ????... hrtimer_init_sleeper(to,?current); ... } retry: //該函數(shù)中判斷uaddr指向的值是否等于val,以及一些初始化操作 ret?=?futex_wait_setup(uaddr,?val,?fshared,?&q,?&hb); //如果val發(fā)生了改變,則直接返回 if?(ret) goto?out; //將當(dāng)前進程狀態(tài)改為TASK_INTERRUPTIBLE,并插入到futex等待隊列,然后重新調(diào)度。 futex_wait_queue_me(hb,?&q,?to); /*?If?we?were?woken?(and?unqueued),?we?succeeded,?whatever.?*/ ret?=?0; //如果unqueue_me成功,則說明是超時觸發(fā)(因為futex_wake喚醒時,會將該進程移出等待隊列,所以這里會失敗) if?(!unqueue_me(&q)) goto?out_put_key; ret?=?-ETIMEDOUT; if?(to?&&?!to->task) goto?out_put_key; /* ?*?We?expect?signal_pending(current),?but?we?might?be?the ?*?victim?of?a?spurious?wakeup?as?well. ?*/ if?(!signal_pending(current))?{ put_futex_key(fshared,?&q.key); goto?retry; } ret?=?-ERESTARTSYS; if?(!abs_time) goto?out_put_key; ... out_put_key: put_futex_key(fshared,?&q.key); out: if?(to)?{ //取消定時任務(wù) hrtimer_cancel(&to->timer); destroy_hrtimer_on_stack(&to->timer); } return?ret; }
在將進程阻塞前會將當(dāng)期進程插入到一個等待隊列中,需要注意的是這里說的等待隊列其實是一個類似Java HashMap的結(jié)構(gòu),全局唯一。
struct?futex_hash_bucket?{ spinlock_t?lock; //雙向鏈表 struct?plist_head?chain;};static?struct?futex_hash_bucket?futex_queues[1<<FUTEX_HASHBITS];
著重看futex_wait_setup
和兩個函數(shù)futex_wait_queue_me
static?int?futex_wait_setup(u32?__user?*uaddr,?u32?val,?int?fshared, ???struct?futex_q?*q,?struct?futex_hash_bucket?**hb){ u32?uval; int?ret; retry: q->key?=?FUTEX_KEY_INIT; //初始化futex_q ret?=?get_futex_key(uaddr,?fshared,?&q->key,?VERIFY_READ); if?(unlikely(ret?!=?0)) return?ret; retry_private: //獲得自旋鎖 *hb?=?queue_lock(q); //原子的將uaddr的值設(shè)置到uval中 ret?=?get_futex_value_locked(&uval,?uaddr); ???...? //如果當(dāng)期uaddr指向的值不等于val,即說明其他進程修改了 //uaddr指向的值,等待條件不再成立,不用阻塞直接返回。 if?(uval?!=?val)?{ //釋放鎖 queue_unlock(q,?*hb); ret?=?-EWOULDBLOCK; } ???... return?ret; }
函數(shù)futex_wait_setup
中主要做了兩件事,一是獲得自旋鎖,二是判斷*uaddr是否為預(yù)期值。
static?void?futex_wait_queue_me(struct?futex_hash_bucket?*hb,?struct?futex_q?*q, struct?hrtimer_sleeper?*timeout) { //設(shè)置進程狀態(tài)為TASK_INTERRUPTIBLE,cpu調(diào)度時只會選擇 //狀態(tài)為TASK_RUNNING的進程 set_current_state(TASK_INTERRUPTIBLE); //將當(dāng)期進程(q封裝)插入到等待隊列中去,然后釋放自旋鎖 queue_me(q,?hb); //啟動定時任務(wù) if?(timeout)?{ hrtimer_start_expires(&timeout->timer,?HRTIMER_MODE_ABS); if?(!hrtimer_active(&timeout->timer)) timeout->task?=?NULL; } /* ?*?If?we?have?been?removed?from?the?hash?list,?then?another?task ?*?has?tried?to?wake?us,?and?we?can?skip?the?call?to?schedule(). ?*/ if?(likely(!plist_node_empty(&q->list)))?{ ? ?//如果沒有設(shè)置過期時間?||?設(shè)置了過期時間且還沒過期 if?(!timeout?||?timeout->task) //系統(tǒng)重新進行進程調(diào)度,這個時候cpu會去執(zhí)行其他進程,該進程會阻塞在這里 schedule(); } //走到這里說明又被cpu選中運行了 __set_current_state(TASK_RUNNING); }
futex_wait_queue_me
中主要做幾件事:
將當(dāng)期進程插入到等待隊列
啟動定時任務(wù)
重新調(diào)度進程
在futex_wait_setup
方法中會加自旋鎖;在futex_wait_queue_me
中將狀態(tài)設(shè)置為TASK_INTERRUPTIBLE
,調(diào)用queue_me
將當(dāng)期線程插入到等待隊列中,然后才釋放自旋鎖。也就是說檢查uaddr的值的過程跟進程掛起的過程放在同一個臨界區(qū)中。當(dāng)釋放自旋鎖后,這時再更改addr地址的值已經(jīng)沒有關(guān)系了,因為當(dāng)期進程已經(jīng)加入到等待隊列中,能被wake喚醒,不會出現(xiàn)本文開頭提到的沒人喚醒的問題。
總結(jié)下futex_wait
流程:
加自旋鎖
檢測*uaddr是否等于val,如果不相等則會立即返回
將進程狀態(tài)設(shè)置為TASK_INTERRUPTIBLE
將當(dāng)期進程插入到等待隊列中
釋放自旋鎖
創(chuàng)建定時任務(wù):當(dāng)超過一定時間還沒被喚醒時,將進程喚醒
掛起當(dāng)前進程
futex_wake
static?int?futex_wake(u32?__user?*uaddr,?int?fshared,?int?nr_wake,?u32?bitset){ struct?futex_hash_bucket?*hb; struct?futex_q?*this,?*next; struct?plist_head?*head; union?futex_key?key?=?FUTEX_KEY_INIT; int?ret; ... //根據(jù)uaddr的值填充&key的內(nèi)容 ret?=?get_futex_key(uaddr,?fshared,?&key,?VERIFY_READ); if?(unlikely(ret?!=?0)) goto?out; //根據(jù)&key獲得對應(yīng)uaddr所在的futex_hash_bucket hb?=?hash_futex(&key); //對該hb加自旋鎖 spin_lock(&hb->lock); head?=?&hb->chain; //遍歷該hb的鏈表,注意鏈表中存儲的節(jié)點是plist_node類型,而而這里的this卻是futex_q類型,這種類型轉(zhuǎn)換是通過c中的container_of機制實現(xiàn)的 plist_for_each_entry_safe(this,?next,?head,?list)?{ if?(match_futex?(&this->key,?&key))?{ ... //喚醒對應(yīng)進程 wake_futex(this); if?(++ret?>=?nr_wake) break; } } //釋放自旋鎖 spin_unlock(&hb->lock); put_futex_key(fshared,?&key); out: return?ret; }
futex_wake
流程如下:
找到uaddr對應(yīng)的futex_hash_bucket
,即代碼中的hb
對hb加自旋鎖
遍歷fb的鏈表,找到uaddr對應(yīng)的節(jié)點
調(diào)用wake_futex
喚起等待的進程
釋放自旋鎖
wake_futex
中將制定進程狀態(tài)設(shè)置為TASK_RUNNING
并加入到系統(tǒng)調(diào)度列表中,同時將進程從futex的等待隊列中移除掉,具體代碼就不分析了,有興趣的可以自行研究。
Java中的ReentrantLock,Object.wait和Thread.sleep等等底層都是用futex進行線程同步,理解futex的實現(xiàn)能幫助你更好的理解與使用這些上層的同步機制。另外因篇幅與精力有限,涉及到進程調(diào)度的相關(guān)內(nèi)容沒有具體分析,不過并不妨礙理解文章內(nèi)容。
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。