您好,登錄后才能下訂單哦!
本篇內(nèi)容主要講解“srs使用的開(kāi)源c語(yǔ)言網(wǎng)絡(luò)協(xié)程庫(kù)state thread源碼是什么”,感興趣的朋友不妨來(lái)看看。本文介紹的方法操作簡(jiǎn)單快捷,實(shí)用性強(qiáng)。下面就讓小編來(lái)帶大家學(xué)習(xí)“srs使用的開(kāi)源c語(yǔ)言網(wǎng)絡(luò)協(xié)程庫(kù)state thread源碼是什么”吧!
state thread是一個(gè)開(kāi)源的c語(yǔ)言網(wǎng)絡(luò)協(xié)程庫(kù),它在用戶空間實(shí)現(xiàn)了協(xié)程調(diào)度
st最初是由網(wǎng)景(Netscape)公司的MSPR(Netscape Portable Runtime library)項(xiàng)目中剝離出來(lái),后由SGI(Silicon Graphic Inc)和Yahoo!公司(前者是主力)共同開(kāi)發(fā)維護(hù)。
2001年發(fā)布v1.0以來(lái)一直到2009年v1.9穩(wěn)定版后未再變動(dòng)
State Threads:回調(diào)終結(jié)者(必讀)
https://blog.csdn.net/caoshangpa/article/details/79565411
st-1.9.tar.gz 是原版, http://state-threads.sourceforge.net/
state-threads-1.9.1.tar.gz 是srs修改版, https://github.com/ossrs/state-threads
st源碼編譯
tar zxvf st-1.9.tar.gz
cd st-1.9
make linux-debug // make命令可以查看支持的編譯選項(xiàng)
obj目錄有編譯生成的文件st.h, lib*.so,lib*.a
examples目錄有幾個(gè)例子lookupdns,proxy,server
需要的知識(shí)點(diǎn)
1 匯編語(yǔ)言(非必需)
2 線程的棧管理(非必需)
3 線程的調(diào)度和同步(必須)。線程不同步的測(cè)試代碼thread.c
4 setjmp/longjmp的使用(必須)。測(cè)試代碼setjmp.c
5 epoll原理和使用(必須)。測(cè)試代碼epoll_server.c 和 epoll_client.c
測(cè)試代碼以及文檔下載地址
鏈接: https://pan.baidu.com/s/1kQz3S1YIt6zUwMKScrnHaQ
提取碼: pu9z
分析state_thread源碼的目的,是為了正確的使用它
st中thread其實(shí)是協(xié)程的概念
st_xxx分為 io類 和 延遲類
一些重要的數(shù)據(jù)結(jié)構(gòu)
_st_vp_t _st_this_vp; virtual processor 虛擬處理器
_st_thread_t *_st_this_thread;
_st_clist_t run_q, io_q, zombie_q, thread_q
_st_thread_t *idle_thread, *sleep_q
代碼分析
st庫(kù)自帶的example業(yè)務(wù)邏輯較為復(fù)雜,有興趣可以看下。
為了簡(jiǎn)化問(wèn)題,編寫(xiě)了測(cè)試代碼st-1.9/examples/st_epoll.c,依據(jù)此代碼提出問(wèn)題分析問(wèn)題。
st_init()做了什么?
_st_idle_thread_start()做了什么?
st_thread_create()做了什么?
st_thread_exit()做了什么?
st_usleep()做了什么?
主業(yè)務(wù)邏輯(無(wú)限循環(huán))協(xié)程是如何調(diào)度的?
監(jiān)聽(tīng)的文件描述符是如何調(diào)度的?
協(xié)程如何正常退出?
1 沒(méi)有設(shè)置終止條件變量(不可以被join)的協(xié)程直接return即可退出;
2 設(shè)置了終止條件變量(可以被join)的協(xié)程退出時(shí),先把自己加入到zombie_q中,然后通知等待的協(xié)程,等待的協(xié)程退出后,自己在退出。
協(xié)程的join(連接)是什么意思?
1 創(chuàng)建協(xié)程a的時(shí)候 st_thread_create(handle_cycle, NULL, 1, 0) 要設(shè)置為1, 表示該協(xié)程可以被join
2 協(xié)程b代碼里要掉用st_thread_join(thread, retvalp),表示我要join到協(xié)程a上
3 join的意思是 協(xié)程a和協(xié)程b 有一定關(guān)聯(lián)行,在協(xié)程退出時(shí),要先退出協(xié)程b 才能退出協(xié)程a
4 st中一個(gè)協(xié)程只能被另一個(gè)協(xié)程join,不能被多個(gè)協(xié)程join
5 可以被join的協(xié)程a,在沒(méi)有其他協(xié)程join時(shí),協(xié)程a無(wú)法正常退出
st里的mutex有什么用?
通常情況下st的多協(xié)程是不需要加鎖的,但是在有些情況下需要鎖來(lái)保證原子操作,下面會(huì)詳細(xì)說(shuō)明。
st_mutex_new(void); 創(chuàng)建鎖
st_mutex_destroy(st_mutex_t lock); 等待隊(duì)列必須為空才能銷毀鎖
st_mutex_lock(st_mutex_t lock); 第一次掉用能獲得鎖,以后掉用會(huì)加入鎖的等待隊(duì)列中(FIFO)
st_mutex_unlock(st_mutex_t lock); 釋放鎖并激活等待隊(duì)列的協(xié)程
st_mutex_trylock(st_mutex_t lock); 嘗試獲得鎖不會(huì)加入到等待隊(duì)列
st里的cond有什么用?
通常情況下st的多協(xié)程是不需要條件變量的,但是有些情況下需要條件變量來(lái)保證協(xié)程執(zhí)行的先后順序,比如:協(xié)程a要先于協(xié)程b執(zhí)行
st_cond_new(void); 創(chuàng)建條件變量
st_cond_destroy(st_cond_t cvar); 等待隊(duì)列必須為空才能銷毀條件變量
st_cond_timedwait(st_cond_t cvar, st_utime_t timeout); 限時(shí)等待條件變量,會(huì)加入條件變量的等待隊(duì)列中(FIFO),并加入到sleep_q隊(duì)列中(可能先于FIFO的順序被調(diào)度到)
st_cond_wait(st_cond_t cvar); 阻塞等待條件變量,會(huì)加入條件變量的等待隊(duì)列中(FIFO)
st_cond_signal(st_cond_t cvar); 喚醒阻塞在條件變量上的一個(gè)協(xié)程
st_cond_broadcast(st_cond_t cvar); 喚醒阻塞在條件變量上的全部協(xié)程
這個(gè)圖要配合測(cè)試代碼 st-1.9/examples/st_epoll.c
st中與調(diào)度有關(guān)的函數(shù)
st的setjmp
#define _ST_SWITCH_CONTEXT(_thread) \ 協(xié)程切換的兩個(gè)宏函數(shù)之一,停止當(dāng)前協(xié)程并運(yùn)行其他協(xié)程
ST_BEGIN_MACRO \
ST_SWITCH_OUT_CB(_thread); \ 協(xié)程切走時(shí)調(diào)用的函數(shù),一般不管用
if (!MD_SETJMP((_thread)->context)) \ 匯編語(yǔ)言實(shí)現(xiàn) 應(yīng)該跟setjmp()一樣 首次掉用返回0
{ \
_st_vp_schedule(); \ 核心調(diào)度函數(shù)
} \
ST_DEBUG_ITERATE_THREADS(); \
ST_SWITCH_IN_CB(_thread); \ 協(xié)程切回時(shí)調(diào)用的函數(shù),一般不管用
ST_END_MACRO
st的longjmp
#define _ST_RESTORE_CONTEXT(_thread) \ 協(xié)程切換的兩個(gè)宏函數(shù)之一,恢復(fù)線程運(yùn)行
ST_BEGIN_MACRO \
_ST_SET_CURRENT_THREAD(_thread); \ 設(shè)置全局變量 _st_this_thread = _thread
MD_LONGJMP((_thread)->context, 1); \ 匯編語(yǔ)言實(shí)現(xiàn) 應(yīng)該跟longjmp()一樣, 返回值永遠(yuǎn)為1
ST_END_MACRO
MD_SETJMP的時(shí)候,會(huì)使用匯編把所有寄存器的信息保留下來(lái),而MD_LONGJMP則會(huì)把所有的寄存器信息重新加載出來(lái)。兩者配合使用的時(shí)候,可以完成函數(shù)間的跳轉(zhuǎn)。
st的核心調(diào)度函數(shù)
void _st_vp_schedule(void)
{
_st_thread_t *thread;
printf("in _st_vp_schedule\n");
printf("_st_active_count = %d\n", _st_active_count);
if (_ST_RUNQ.next != &_ST_RUNQ)
{
printf("use runq\n");
/* Pull thread off of the run queue */
thread = _ST_THREAD_PTR(_ST_RUNQ.next);
_ST_DEL_RUNQ(thread);
}
else
{
printf("use idle\n");
/* If there are no threads to run, switch to the idle thread */
thread = _st_this_vp.idle_thread;
}
ST_ASSERT(thread->state == _ST_ST_RUNNABLE);
/* Resume the thread */
thread->state = _ST_ST_RUNNING;
_ST_RESTORE_CONTEXT(thread);
}
st輔助調(diào)度函數(shù)
void *_st_idle_thread_start(void *arg)
{
printf("i'm in _st_idle_thread_start()\n");
_st_thread_t *me = _ST_CURRENT_THREAD();
while (_st_active_count > 0)
{
/* Idle vp till I/O is ready or the smallest timeout expired */
printf("call _st_epoll_dispatch()\n");
_ST_VP_IDLE(); 處理io類事件
/* Check sleep queue for expired threads */
_st_vp_check_clock(); 處理延時(shí)類事件
me->state = _ST_ST_RUNNABLE;
_ST_SWITCH_CONTEXT(me); 從這里恢復(fù)運(yùn)行,然后判斷_st_active_count的值
}
/* No more threads */
exit(0); 整個(gè)程序退出
/* NOTREACHED */
return NULL;
}
會(huì)觸發(fā)協(xié)程切換的函數(shù)有哪些?
sched.c:86: _ST_SWITCH_CONTEXT(me); 59 int st_poll(struct pollfd *pds, int npds, st_utime_t timeout)
sched.c:234: _ST_SWITCH_CONTEXT(me); 221 void *_st_idle_thread_start(void *arg)
sched.c:261: _ST_SWITCH_CONTEXT(thread); 244 void st_thread_exit(void *retval)
sched.c:276: _ST_SWITCH_CONTEXT(thread); 244 void st_thread_exit(void *retval)
sync.c:131: _ST_SWITCH_CONTEXT(me); 115 int st_usleep(st_utime_t usecs)
sync.c:198: _ST_SWITCH_CONTEXT(me); 180 int st_cond_timedwait(_st_cond_t *cvar, st_utime_t timeout)
sync.c:315: _ST_SWITCH_CONTEXT(me); 290 int st_mutex_lock(_st_mutex_t *lock)
sched.c:134: _ST_RESTORE_CONTEXT(thread); 115 void _st_vp_schedule(void)
st中的interrupt
顯示調(diào)用void st_thread_interrupt(_st_thread_t *thread)會(huì)對(duì)協(xié)程設(shè)置interrupt狀態(tài),interrupt狀態(tài)會(huì)中斷協(xié)程的本次運(yùn)行(可能是個(gè)循環(huán)任務(wù)),是否導(dǎo)致協(xié)程退出,要看協(xié)程內(nèi)部對(duì)interrupt返回值的處理。下面以st_usleep()函數(shù)為例進(jìn)行說(shuō)明。
[ykMac:st-1.9]# grep -nr "_ST_FL_INTERRUPT" *
common.h:311:#define _ST_FL_INTERRUPT 0x08 interrupt的宏定義
sched.c:68: if (me->flags & _ST_FL_INTERRUPT) 59 int st_poll() ,調(diào)用函數(shù)時(shí),判斷是否設(shè)置interrupt
sched.c:70: me->flags &= ~_ST_FL_INTERRUPT; 如果設(shè)置就退出,退出前對(duì)interrupt取反
sched.c:107: if (me->flags & _ST_FL_INTERRUPT) 59 int st_poll(),變?yōu)檫\(yùn)行協(xié)程時(shí),判斷是否設(shè)置interrupt
sched.c:109: me->flags &= ~_ST_FL_INTERRUPT; 如果設(shè)置就退出,退出前對(duì)interrupt取反
sched.c:551: thread->flags |= _ST_FL_INTERRUPT; 在545 void st_thread_interrupt()中設(shè)置為interrupt
sync.c:119: if (me->flags & _ST_FL_INTERRUPT) { 115 int st_usleep(st_utime_t usecs),調(diào)用函數(shù)時(shí)
sync.c:120: me->flags &= ~_ST_FL_INTERRUPT;
sync.c:133: if (me->flags & _ST_FL_INTERRUPT) { 115 int st_usleep(st_utime_t usecs),變?yōu)檫\(yùn)行協(xié)程時(shí)
sync.c:134: me->flags &= ~_ST_FL_INTERRUPT;
sync.c:185: if (me->flags & _ST_FL_INTERRUPT) { 180 int st_cond_timedwait(),調(diào)用函數(shù)時(shí)
sync.c:186: me->flags &= ~_ST_FL_INTERRUPT;
sync.c:208: if (me->flags & _ST_FL_INTERRUPT) { 180 int st_cond_timedwait(),變?yōu)檫\(yùn)行協(xié)程時(shí)
sync.c:209: me->flags &= ~_ST_FL_INTERRUPT;
sync.c:294: if (me->flags & _ST_FL_INTERRUPT) { 290 int st_mutex_lock(),調(diào)用函數(shù)時(shí)
sync.c:295: me->flags &= ~_ST_FL_INTERRUPT;
sync.c:319: if ((me->flags & _ST_FL_INTERRUPT) && lock->owner != me) { 290 int st_mutex_lock(),變運(yùn)行時(shí)
sync.c:320: me->flags &= ~_ST_FL_INTERRUPT;
115 int st_usleep(st_utime_t usecs)
116 {
117 _st_thread_t *me = _ST_CURRENT_THREAD();
118
119 if (me->flags & _ST_FL_INTERRUPT) {
120 me->flags &= ~_ST_FL_INTERRUPT; 退出前對(duì)interrupt取反
121 errno = EINTR;
122 return -1; 如果不對(duì)errno或返回值做處理,循環(huán)還是會(huì)繼續(xù)的
123 }
124
125 if (usecs != ST_UTIME_NO_TIMEOUT) {
126 me->state = _ST_ST_SLEEPING;
127 _ST_ADD_SLEEPQ(me, usecs);
128 } else
129 me->state = _ST_ST_SUSPENDED;
130
131 _ST_SWITCH_CONTEXT(me);
132
133 if (me->flags & _ST_FL_INTERRUPT) {
134 me->flags &= ~_ST_FL_INTERRUPT;
135 errno = EINTR;
136 return -1;
137 }
138
139 return 0;
140 }
st的優(yōu)缺點(diǎn)
優(yōu)點(diǎn):
1 用戶空間實(shí)現(xiàn)協(xié)程調(diào)度,降低了用戶空間和內(nèi)核空間的切換,一定程度上提高了程序效率。
2 由于是在單核上的單線程多協(xié)程,同一時(shí)間只會(huì)有一個(gè)協(xié)程在運(yùn)行,所以對(duì)于全局變量也不需要做協(xié)程同步。
共享資源釋放函數(shù)只需做到可重入就行,所謂的可重入就是釋放之前先判斷是否為空值,釋放后要賦空值。
3 協(xié)程使用完,直接return即可,st會(huì)回收協(xié)程資源并做協(xié)程切換。
4 可以通過(guò)向run_q鏈表頭部加入?yún)f(xié)程,來(lái)實(shí)現(xiàn)優(yōu)先調(diào)度。
5 st支持多個(gè)操作系統(tǒng),比如 AIX,CYGWIN,DARWIN,FREEBSD,HPUX,IRIX,LINUX,NETBSD,OPENBSD,SOLARIS
缺點(diǎn):
1 所有I/O操作必須使用st提供的API,只有這樣協(xié)程才能被調(diào)度器管理。
2 所有協(xié)程里不能使用sleep(),sleep()會(huì)造成整個(gè)線程sleep。
3 被調(diào)度到的協(xié)程不會(huì)被限制運(yùn)行時(shí)長(zhǎng),如果有協(xié)程是cpu密集型或死循環(huán),就會(huì)嚴(yán)重阻礙其他協(xié)程運(yùn)行。
4 單進(jìn)程單線程,只能使用單核,想要通過(guò)多個(gè)cpu提高并發(fā)能力,只能開(kāi)多個(gè)程序(進(jìn)程),多進(jìn)程通信較麻煩。
補(bǔ)充知識(shí)點(diǎn)
1 線程為什么要同步?
線程由內(nèi)核自動(dòng)調(diào)度
同一個(gè)進(jìn)程上的線程共享該進(jìn)程的整個(gè)虛擬地址空間
同一個(gè)進(jìn)程上的線程代碼區(qū)是共享的,即不同的線程可以執(zhí)行同樣的函數(shù)
所以在并發(fā)環(huán)境中,多個(gè)線程同時(shí)對(duì)同一個(gè)內(nèi)存地址進(jìn)行寫(xiě)入,由于CPU寄存器時(shí)間調(diào)度上的問(wèn)題,寫(xiě)入數(shù)據(jù)會(huì)被多次的覆蓋,會(huì)造成共享數(shù)據(jù)損壞,所以就要使線程同步。
2 什么情況下需要線程同步?
線程同步指的是 不同時(shí)發(fā)生,就是線程要排隊(duì)
1 多核,單進(jìn)程多線程,不同線程會(huì)對(duì)全局變量讀寫(xiě),這種情況才需要對(duì)線程做同步控制
2 單核,單進(jìn)程多線程,不同線程會(huì)對(duì)全局變量讀寫(xiě),這種情況不需要對(duì)線程做同步控制
3 多核,單進(jìn)程多線程,不同線程不對(duì)全局變量讀寫(xiě),這種情況不需要對(duì)線程做同步控制
4 多核,單進(jìn)程多線程,不同線程會(huì)對(duì)全局變量讀,這種情況不需要對(duì)線程做同步控制
問(wèn)題:對(duì)于第2條,這個(gè)應(yīng)該是有點(diǎn)兒片面,有依賴有優(yōu)先級(jí)搶占也是要同步,除非像abcde這種幾個(gè)線程干一摸一樣的事情,而且項(xiàng)目之間不依賴。
有依賴 可以理解為 生產(chǎn)消費(fèi)關(guān)系,雖然是 單核 單進(jìn)程 多線程 但是同一時(shí)間只能有一個(gè)線程在運(yùn)行,也就是說(shuō) 生產(chǎn)和消費(fèi)不會(huì)同時(shí)發(fā)生,同樣多個(gè)生產(chǎn)也不會(huì)同時(shí)發(fā)生,所以不需要鎖。
線程是有優(yōu)先級(jí)控制,但是不管怎么控制,只要保證同一時(shí)間只能有一個(gè)線程在運(yùn)行,就不需要鎖了。
問(wèn)題:對(duì)于第2條,這個(gè)應(yīng)該是有點(diǎn)兒片面,有原子操作且原子操作過(guò)程中有線程切換,這種是需要鎖的。
比如,線程a 第一次讀取全局變量x并做處理,然后發(fā)生線程切換(線程由內(nèi)和自動(dòng)調(diào)度)后切回,然后第二次讀取全局變量x并做處理,我們想確保兩次讀取
x值相同,但是發(fā)生了線程切換x值可能被改變。
如何 確保 第一次讀取并處理和第二次讀取并處理是原子操作呢? 使用st_mutex_t
3 accept()序列化
亦稱驚群效應(yīng),亦亦稱Zeeg難題
https://uwsgi-docs-zh.readthedocs.io/zh_CN/latest/articles/SerializingAccept.html
在多次fork自己之后,每個(gè)進(jìn)程一般將會(huì)開(kāi)始阻塞在 accept() 上
每當(dāng)socket上嘗試進(jìn)行一個(gè)連接,阻塞在 accept() 上的每個(gè)進(jìn)程的 accept() 都會(huì)被喚醒。
只有其中一個(gè)進(jìn)程能夠真正接收到這個(gè)連接,而剩余的進(jìn)程將會(huì)獲得一個(gè)無(wú)聊的 EAGAIN 這導(dǎo)致了大量的CPU周期浪費(fèi),實(shí)際解決方法是把一個(gè)鎖放在 accept() 調(diào)用之前,來(lái)序列化它的使用
4 Internet Applications網(wǎng)絡(luò)程序架構(gòu)
多進(jìn)程架構(gòu) Multi-Process
一個(gè)進(jìn)程服務(wù)一個(gè)連接,要解決數(shù)據(jù)共享問(wèn)題
單進(jìn)程多線程架構(gòu) Multi-Threaded
一個(gè)線程服務(wù)一個(gè)連接,要解決數(shù)據(jù)同步問(wèn)題
事件驅(qū)動(dòng)的狀態(tài)機(jī)架構(gòu) Event-Driven State Machine
事件觸發(fā)回調(diào)函數(shù)(缺點(diǎn)是嵌套) 或 用戶空間實(shí)現(xiàn)協(xié)程調(diào)度
實(shí)際上 EDSM架構(gòu) 用很復(fù)雜的方式模擬了多線程
st提供的就是EDSM機(jī)制,它在用戶空間實(shí)現(xiàn)協(xié)程調(diào)度
到此,相信大家對(duì)“srs使用的開(kāi)源c語(yǔ)言網(wǎng)絡(luò)協(xié)程庫(kù)state thread源碼是什么”有了更深的了解,不妨來(lái)實(shí)際操作一番吧!這里是億速云網(wǎng)站,更多相關(guān)內(nèi)容可以進(jìn)入相關(guān)頻道進(jìn)行查詢,關(guān)注我們,繼續(xù)學(xué)習(xí)!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。