您好,登錄后才能下訂單哦!
tornado中的協(xié)程是如何工作的
Coroutines are computer program components that generalize subroutines for nonpreemptive multitasking, by allowing multiple entry points for suspending and resuming execution at certain locations.。 —— [ 維基百科 ]
我們在平常編程中,更習(xí)慣使用的是子例程(subroutine),通俗的叫法是函數(shù),或者過程。子例程,往往只有一個入口(函數(shù)調(diào)用,實參通過傳給形參開始執(zhí)行),一個出口(函數(shù)return,執(zhí)行完畢,或者引發(fā)異常,將控制權(quán)轉(zhuǎn)移給調(diào)用者)。但協(xié)程是子例程基礎(chǔ)上,一種更加寬泛定義的計算機程序模塊(子例程可以看做協(xié)程的特例),它可以有多個入口點,允許從一個入口點,執(zhí)行到下一個入口點之前暫停,保存執(zhí)行狀態(tài),等到合適的時機恢復(fù)執(zhí)行狀態(tài),從下一個入口點重新開始執(zhí)行,這也是協(xié)程應(yīng)該具有的能力。
定義
協(xié)程代碼塊
一個入口點和下一個入口點(或者退出點)中的代碼。
協(xié)程模塊
由n個入口點代碼,和n個協(xié)程代碼塊組成。第一個入口點通常是一個函 數(shù)入口點。其組織形式如:函數(shù)入口點->協(xié)程代碼塊->入口點->協(xié)程代碼塊…,入口點和代碼塊相間。
線性模塊
一個同步函數(shù)的函數(shù)體是線性執(zhí)行的。也就是說一個模塊中的每一行代碼,相繼執(zhí)行,一個模塊在執(zhí)行中,如果還沒有執(zhí)行完畢,不會去執(zhí)行其他模塊的代碼。稱這樣的代碼模塊為線性模塊。
一個協(xié)程模塊,如果只含有單一入口點和單一協(xié)程代碼塊(假設(shè)這個協(xié)程代碼塊全是同步代碼),當(dāng)然這個協(xié)程模塊是一個線性執(zhí)行模塊,但是如果含有多個入口點和多個協(xié)程代碼塊,那么就不是一個線性模塊。那么執(zhí)行一個協(xié)程模塊過程實際是分散的(不同的時間段,執(zhí)行不同的協(xié)程代碼塊,協(xié)程代碼塊的執(zhí)行時間段,彼此不相交),但也是順序的(后一個協(xié)程代碼塊在前一個協(xié)程代碼塊執(zhí)行結(jié)束后才執(zhí)行)。兩個屬于同一協(xié)程模塊的相繼協(xié)程代碼塊執(zhí)行的中間時間間隙,可能有很多其他協(xié)程模塊的協(xié)程代碼片段在執(zhí)行。
談到協(xié)程,必須要說說python語義中的生成器(generator)。
在pep255中提到了”simple generator”和”yield語句”(此時還不是”yield表達(dá)式”)的實現(xiàn)。一個basic idea,提供一種函數(shù),能夠返回中間結(jié)果給調(diào)用者,然后維護(hù)函數(shù)的局部狀態(tài),以便函數(shù)當(dāng)離開后,也能恢復(fù)執(zhí)行。
prep255中舉了一個簡單的例子,生成斐波那契數(shù)列:
def fib(): a, b = 0, 1 while 1: yield b a, b = b, a+b
a,b初始化為0,1。當(dāng)yield b被執(zhí)行,1被返回給調(diào)用者。當(dāng)fib恢復(fù)執(zhí)行,a,變成了1,b也是1,然后將1返回給調(diào)用者,如此循環(huán)。generator是一種非常自然的編程方式,因為對于fib來說,它的功能不變,都還是不斷生成下一個斐波那契數(shù)。而對于fib的調(diào)用者來說,fib像一個列表的迭代器,不斷迭代,可以獲取下一個斐波那契數(shù)。
def caller(): for num in fib(): print num
生成器是一個含有yield表達(dá)式的函數(shù),此時該函數(shù)叫生成器。一個生成器永遠(yuǎn)是異步的,即使生成器模塊中含有阻塞代碼。因為調(diào)用一個生成器,生成器的參數(shù)會綁定到生成器,結(jié)果返回是一個生成器對象,它的類型是types.GeneratorType,不會去執(zhí)行生成器主模塊中的代碼。
每次調(diào)用一個GeneratorType對象的next方法,生成器函數(shù)執(zhí)行到下一個yield語句或者,或者碰到一個return語句,或者執(zhí)行到生成器函數(shù)結(jié)束。
在pep342中,對Generator進(jìn)一步加強,增加了GeneratorType的send方法,和yield表達(dá)式語義。yield表達(dá)式,可以作為等號右邊的表達(dá)式。如果對Generator調(diào)用send(None)方法,生成器函數(shù)會從開始一直執(zhí)行到y(tǒng)ield表達(dá)式。那么下一次對Generator調(diào)用send(argument),Generator恢復(fù)執(zhí)行。那么可以在生成器函數(shù)體內(nèi)獲得這個argument,這個argument將會作為yield表達(dá)式的返回值。
從上面可以看到,Generator已經(jīng)具備協(xié)程的一些能力。如:能夠暫停執(zhí)行,保存狀態(tài);能夠恢復(fù)執(zhí)行;能夠異步執(zhí)行。
但是此時Generator還不是一個協(xié)程。一個真正的協(xié)程能夠控制代碼什么時候繼續(xù)執(zhí)行。而一個Generator執(zhí)行遇到一個yield表達(dá)式 或者語句,會將執(zhí)行控制權(quán)轉(zhuǎn)移給調(diào)用者。
However, it is still possible to implement coroutines on top of a generator facility, with the aid of a top-level dispatcher routine (a trampoline, essentially) that passes control explicitly to child generators identified by tokens passed back from the generators。 —— [ 維基百科 ]
在維基百科中提到,可以實現(xiàn)一個頂級的調(diào)度子例程,將執(zhí)行控制權(quán)轉(zhuǎn)移回Generator,從而讓它繼續(xù)執(zhí)行。在tornado中,ioLoop就是這樣的頂級調(diào)度子例程,每個協(xié)程模塊通過,函數(shù)裝飾器coroutine和ioLoop進(jìn)行通信,從而ioLoop可以在協(xié)程模塊執(zhí)行暫停后,在合適的時機重新調(diào)度協(xié)程模塊執(zhí)行。
不過,接下來還不能介紹coroutine和ioLoop,在介紹這兩者之前,先得明白tornado中在協(xié)程環(huán)境中一個非常重要的類Future.
Future類位于tornado源碼的concurrent模塊中。Future類的完整代碼,請查看tornado的源碼。在這里截取一部分代碼作為分析之用
class Future(object): def done(self): return self._done def result(self, timeout=None): self._clear_tb_log() if self._result is not None: return self._result if self._exc_info is not None: raise_exc_info(self._exc_info) self._check_done() return self._result def add_done_callback(self, fn): if self._done: fn(self) else: self._callbacks.append(fn) def set_result(self, result): self._result = result self._set_done() def _set_done(self): self._done = True for cb in self._callbacks: try: cb(self) except Exception: app_log.exception('exception calling callback %r for %r', cb, self) self._callbacks = None
Future類重要成員函數(shù):
def done(self):
Future的_result成員是否被設(shè)置
def result(self, timeout=None):
獲取Future對象的結(jié)果
def add_done_callback(self, fn):
添加一個回調(diào)函數(shù)fn給Future對象。如果這個Future對象已經(jīng)done,則直接執(zhí)行fn,否則將fn加入到Future類的一個成員列表中保存。
def _set_done(self):
一個內(nèi)部函數(shù),主要是遍歷列表,逐個調(diào)用列表中的callback函數(shù),也就是前面add_done_calback加如來的。
def set_result(self, result):
給Future對象設(shè)置result,并且調(diào)用_set_done。也就是說,當(dāng)Future對象獲得result后,所有add_done_callback加入的回調(diào)函數(shù)就會執(zhí)行。
Future封裝了異步操作的結(jié)果。實際是它類似于在網(wǎng)頁html前端中,圖片異步加載的占位符,但加載后最終也是一個完整的圖片。Future也是同樣用處,tornado使用它,最終希望它被set_result,并且調(diào)用一些回調(diào)函數(shù)。Future對象實際是coroutine函數(shù)裝飾器和IOLoop的溝通使者,有著非常重要的作用。
tornado框架的底層核心類,位于tornado的ioloop模塊。功能方面類似win32窗口的消息循環(huán)。每個窗口可以綁定一個窗口過程。窗口過程主要是一個消息循環(huán)在執(zhí)行。消息循環(huán)主要任務(wù)是利用PeekMessage系統(tǒng)調(diào)用,從消息隊列中取出各種類型的消息,判斷消息的類型,然后交給特定的消息handler進(jìn)行執(zhí)行。
tornado中的IOLoop與此相比具有很大的相似性,在協(xié)程運行環(huán)境中擔(dān)任著協(xié)程調(diào)度器的角色, 和win32的消息循環(huán)本質(zhì)上都是一種事件循環(huán),等待事件,然后運行對應(yīng)的事件處理器(handler)。不過IOLoop主要調(diào)度處理的是IO事件(如讀,寫,錯誤)。除此之外,還能調(diào)度callback和timeout事件。
在本博文中,我們暫時只關(guān)注callback事件,因為這個與協(xié)程調(diào)度的相關(guān)性最大。
def add_future(self, future, callback): assert is_future(future) callback = stack_context.wrap(callback) future.add_done_callback( lambda future: self.add_callback(callback, future))
add_future函數(shù)在基類IOLoop中實現(xiàn),函數(shù)參數(shù)是一個Future對象和一個callback函數(shù)。當(dāng)Future對象被set_result,執(zhí)行一個回調(diào)函數(shù),是個lambda函數(shù),在lambda函數(shù)中調(diào)用IOLoop的add_callback函數(shù)。將add_future的參數(shù)callback加入到IOLoop的統(tǒng)一調(diào)度中,讓callback在IOLoop下一次迭代中執(zhí)行。
def add_callback(self, callback, *args, **kwargs): with self._callback_lock: if self._closing: raise RuntimeError("IOLoop is closing") list_empty = not self._callbacks self._callbacks.append(functools.partial( stack_context.wrap(callback), *args, **kwargs)) if list_empty and thread.get_ident() != self._thread_ident: self._waker.wake()
add_callback函數(shù)主要在IOLoop的子類PollIOLoop中實現(xiàn)。也很容易理解。
將傳入的callback函數(shù),利用偏函數(shù)進(jìn)行包裝,將所有callback真正運行時需要的參數(shù),都綁定到生成的偏函數(shù)中,實際上就是找個地方把callback運行時需要的參數(shù)保存起來。將包裝好的偏函數(shù)加入到回調(diào)函數(shù)列表。當(dāng)IOLoop下一次迭代運行的時候,遍歷callback函數(shù)列表,運行偏函數(shù)的時候,就不再需要傳入?yún)?shù)執(zhí)行,效果等同于用實參運行callback。
IOLoop對象調(diào)用start函數(shù),會運行event loop。在event loop中,首先遍歷callback列表,執(zhí)行回調(diào)函數(shù),然后遍歷timeout列表,執(zhí)行timeoutCallback。最后才執(zhí)行ioHandler。
函數(shù)裝飾器本質(zhì)是一個函數(shù),我們稱這個函數(shù)為裝飾器函數(shù)。裝飾器函數(shù)簽名含有一個 函數(shù)對象(可調(diào)用對象callable)參數(shù),返回的結(jié)果是一個裝飾器內(nèi)部定義的一個新函數(shù)對象。如果返回的函數(shù)對象被調(diào)用,裝飾器函數(shù)的參數(shù)(函數(shù)對象)也會被調(diào)用。不過,會在這個參數(shù)(裝飾器函數(shù)參數(shù))調(diào)用前做一些事情,或者在這個參數(shù)調(diào)用后做一些事情。實際上做的這些事情,就是利用內(nèi)部自定義的函數(shù)對象對參數(shù)(原函數(shù))的一些裝飾(額外操作)
當(dāng)一個函數(shù)被裝飾器裝飾。那么以后調(diào)用這個函數(shù)(此函數(shù)已經(jīng)非彼函數(shù))的時候,實際上調(diào)用的是裝飾器函數(shù)返回的內(nèi)部函數(shù)對象。理解tornado中coroutine修飾的函數(shù)如何執(zhí)行,主要是 理解coroutine這個裝飾器函數(shù)內(nèi)部定義的新函數(shù)對象所做的那些事兒。
def coroutine(func, replace_callback=True): return _make_coroutine_wrapper(func, replace_callback=True)
class Runner(object): def __init__(self, gen, result_future, first_yielded): self.gen = gen self.result_future = result_future self.future = _null_future self.yield_point = None self.pending_callbacks = None self.results = None self.running = False self.finished = False self.had_exception = False self.io_loop = IOLoop.current() self.stack_context_deactivate = None if self.handle_yield(first_yielded): self.run() def run(self): if self.running or self.finished: return try: self.running = True while True: future = self.future if not future.done(): return self.future = None try: try: value = future.result() except Exception: self.had_exception = True yielded = self.gen.throw(*sys.exc_info()) else: yielded = self.gen.send(value) except (StopIteration, Return) as e: self.finished = True self.future = _null_future self.result_future.set_result(getattr(e, 'value', None)) self.result_future = None return except Exception: self.finished = True self.future = _null_future self.result_future.set_exc_info(sys.exc_info()) self.result_future = None return if not self.handle_yield(yielded): return finally: self.running = False def handle_yield(self, yielded): try: self.future = convert_yielded(yielded) except BadYieldError: self.future = TracebackFuture() self.future.set_exc_info(sys.exc_info()) if not self.future.done() or self.future is moment: self.io_loop.add_future( self.future, lambda f: self.run()) return False return True
以上的代碼其實都對源碼進(jìn)行了一些調(diào)整。但函數(shù)調(diào)用進(jìn)入到Runner的構(gòu)造函數(shù)的時候,也就是說Generator的第一次執(zhí)行已經(jīng)完畢。那么接下來,調(diào)用的是,handle_yield,對第一次Generator執(zhí)行的返回結(jié)果進(jìn)行處理。當(dāng)然返回的結(jié)果可能是多種類型??赡苁且粋€Future對象,list,dict,或者其他類型對象,或者普通類型。通過convert_yield,self.future保存的是一個Future對象的引用(第一次Generator執(zhí)行返回的結(jié)果)。此時如果self.future還沒被set_result。對為self.future綁定一個done_callback(lambda f: self.run()),加入到self.io_loop中。
在前文說到。ioloop的add_future函數(shù)中,實際上是只有當(dāng)參數(shù)future,在某個地方調(diào)用了set_result, 才在執(zhí)行done_callback時,將參數(shù)callback加入到IOLoop中調(diào)度。換句話說。Runner類中,self.run要等到self.future在某個代碼塊被set_result,IOLoop才有可能在下一次迭代的時候執(zhí)行它,從而調(diào)度協(xié)程繼續(xù)恢復(fù)執(zhí)行。而在self.run函數(shù)中,我們可以看到將會通過Generator的send函數(shù),恢復(fù)執(zhí)行下一個協(xié)程代碼塊。所以關(guān)鍵的問題是我們需要明白Runner類中self.future,在什么時候被set_result?
從這里我們可以看到Future類的重要作用。future.set_result起到的作用是:
發(fā)送一個信號,告訴IOLoop去調(diào)度暫停的協(xié)程繼續(xù)執(zhí)行。
我們結(jié)合下面的代碼例子就可以明白協(xié)程調(diào)度的整個流程是如何進(jìn)行的了。
import tornado.ioloop from tornado.gen import coroutine from tornado.concurrent import Future @coroutine def asyn_sum(a, b): print("begin calculate:sum %d+%d"%(a,b)) future = Future() def callback(a, b): print("calculating the sum of %d+%d:"%(a,b)) future.set_result(a+b) tornado.ioloop.IOLoop.instance().add_callback(callback, a, b) result = yield future print("after yielded") print("the %d+%d=%d"%(a, b, result)) def main(): asyn_sum(2,3) tornado.ioloop.IOLoop.instance().start() if __name__ == "__main__": main()
實際的運行場景是:一個協(xié)程(asyn_sum)遇到y(tǒng)ield表達(dá)式被暫停執(zhí)行后,IOLoop調(diào)用另外一個代碼段(asyn_sum中的回調(diào)函數(shù)callback)執(zhí)行,而在callback中,剛好可以訪問到屬于被暫停協(xié)程(asyn_sum)中的future對象(也就是Runner對象中的self.future的引用),callback中將future調(diào)用set_result,那么這個暫停的協(xié)程(asyn_sum)在IOLoop下一次迭代調(diào)度回調(diào)函數(shù)時中,被恢復(fù)執(zhí)行。
tornado中的協(xié)程實現(xiàn)基于python語言的Generator并且結(jié)合一個全局的調(diào)度器IOLoop,Generator通過函數(shù)裝飾器coroutine和IOLoop進(jìn)行通信。IOLoop并沒有直接控制能力,調(diào)度恢復(fù)被暫停的協(xié)程繼續(xù)執(zhí)行。future對象在協(xié)程中被yield。協(xié)程暫停,IOLoop調(diào)度另外一個代碼模塊執(zhí)行,而在這個執(zhí)行的代碼模塊中剛好,可以訪問這個future對象,將其set_result,結(jié)果通過IOLoop間接恢復(fù)暫停協(xié)程執(zhí)行。不同執(zhí)行代碼模塊中,共享future對象,彼此合作,協(xié)程調(diào)度得順利執(zhí)行。
從這種意義上來說,future對象,像window中的Event內(nèi)核對象的作用。window中的event用于線程中同步。而協(xié)程中的yield future相當(dāng)于WaitForSingleObject(event_object), 而future.set_result(result)。相當(dāng)于SetEvent(event_object)。而future和Event的不同點在于,協(xié)程借future來恢復(fù)執(zhí)行,而線程借Event來進(jìn)行線程間同步。
以上就是本文關(guān)于詳細(xì)解讀tornado協(xié)程(coroutine)原理的全部內(nèi)容,希望對大家有所幫助。感興趣的朋友可以繼續(xù)參閱本站其他相關(guān)專題,如有不足之處,歡迎留言指出。感謝朋友們對本站的支持!
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。