溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

詳細(xì)解讀tornado協(xié)程(coroutine)原理

發(fā)布時間:2020-09-29 03:27:28 來源:腳本之家 閱讀:225 作者:wyx819 欄目:開發(fā)技術(shù)

tornado中的協(xié)程是如何工作的

協(xié)程定義

Coroutines are computer program components that generalize subroutines for nonpreemptive multitasking, by allowing multiple entry points for suspending and resuming execution at certain locations.。 —— [ 維基百科 ]

我們在平常編程中,更習(xí)慣使用的是子例程(subroutine),通俗的叫法是函數(shù),或者過程。子例程,往往只有一個入口(函數(shù)調(diào)用,實參通過傳給形參開始執(zhí)行),一個出口(函數(shù)return,執(zhí)行完畢,或者引發(fā)異常,將控制權(quán)轉(zhuǎn)移給調(diào)用者)。但協(xié)程是子例程基礎(chǔ)上,一種更加寬泛定義的計算機程序模塊(子例程可以看做協(xié)程的特例),它可以有多個入口點,允許從一個入口點,執(zhí)行到下一個入口點之前暫停,保存執(zhí)行狀態(tài),等到合適的時機恢復(fù)執(zhí)行狀態(tài),從下一個入口點重新開始執(zhí)行,這也是協(xié)程應(yīng)該具有的能力。

定義

協(xié)程代碼塊

一個入口點和下一個入口點(或者退出點)中的代碼。

協(xié)程模塊

由n個入口點代碼,和n個協(xié)程代碼塊組成。第一個入口點通常是一個函 數(shù)入口點。其組織形式如:函數(shù)入口點->協(xié)程代碼塊->入口點->協(xié)程代碼塊…,入口點和代碼塊相間。

線性模塊

一個同步函數(shù)的函數(shù)體是線性執(zhí)行的。也就是說一個模塊中的每一行代碼,相繼執(zhí)行,一個模塊在執(zhí)行中,如果還沒有執(zhí)行完畢,不會去執(zhí)行其他模塊的代碼。稱這樣的代碼模塊為線性模塊。

一個協(xié)程模塊,如果只含有單一入口點和單一協(xié)程代碼塊(假設(shè)這個協(xié)程代碼塊全是同步代碼),當(dāng)然這個協(xié)程模塊是一個線性執(zhí)行模塊,但是如果含有多個入口點和多個協(xié)程代碼塊,那么就不是一個線性模塊。那么執(zhí)行一個協(xié)程模塊過程實際是分散的(不同的時間段,執(zhí)行不同的協(xié)程代碼塊,協(xié)程代碼塊的執(zhí)行時間段,彼此不相交),但也是順序的(后一個協(xié)程代碼塊在前一個協(xié)程代碼塊執(zhí)行結(jié)束后才執(zhí)行)。兩個屬于同一協(xié)程模塊的相繼協(xié)程代碼塊執(zhí)行的中間時間間隙,可能有很多其他協(xié)程模塊的協(xié)程代碼片段在執(zhí)行。

生成器和yield語義

談到協(xié)程,必須要說說python語義中的生成器(generator)。

在pep255中提到了”simple generator”和”yield語句”(此時還不是”yield表達(dá)式”)的實現(xiàn)。一個basic idea,提供一種函數(shù),能夠返回中間結(jié)果給調(diào)用者,然后維護(hù)函數(shù)的局部狀態(tài),以便函數(shù)當(dāng)離開后,也能恢復(fù)執(zhí)行。

prep255中舉了一個簡單的例子,生成斐波那契數(shù)列:

 def fib():
   a, b = 0, 1
   while 1:
    yield b
    a, b = b, a+b

a,b初始化為0,1。當(dāng)yield b被執(zhí)行,1被返回給調(diào)用者。當(dāng)fib恢復(fù)執(zhí)行,a,變成了1,b也是1,然后將1返回給調(diào)用者,如此循環(huán)。generator是一種非常自然的編程方式,因為對于fib來說,它的功能不變,都還是不斷生成下一個斐波那契數(shù)。而對于fib的調(diào)用者來說,fib像一個列表的迭代器,不斷迭代,可以獲取下一個斐波那契數(shù)。

def caller():
  for num in fib():
    print num

生成器是一個含有yield表達(dá)式的函數(shù),此時該函數(shù)叫生成器。一個生成器永遠(yuǎn)是異步的,即使生成器模塊中含有阻塞代碼。因為調(diào)用一個生成器,生成器的參數(shù)會綁定到生成器,結(jié)果返回是一個生成器對象,它的類型是types.GeneratorType,不會去執(zhí)行生成器主模塊中的代碼。

每次調(diào)用一個GeneratorType對象的next方法,生成器函數(shù)執(zhí)行到下一個yield語句或者,或者碰到一個return語句,或者執(zhí)行到生成器函數(shù)結(jié)束。

在pep342中,對Generator進(jìn)一步加強,增加了GeneratorType的send方法,和yield表達(dá)式語義。yield表達(dá)式,可以作為等號右邊的表達(dá)式。如果對Generator調(diào)用send(None)方法,生成器函數(shù)會從開始一直執(zhí)行到y(tǒng)ield表達(dá)式。那么下一次對Generator調(diào)用send(argument),Generator恢復(fù)執(zhí)行。那么可以在生成器函數(shù)體內(nèi)獲得這個argument,這個argument將會作為yield表達(dá)式的返回值。

從上面可以看到,Generator已經(jīng)具備協(xié)程的一些能力。如:能夠暫停執(zhí)行,保存狀態(tài);能夠恢復(fù)執(zhí)行;能夠異步執(zhí)行。

但是此時Generator還不是一個協(xié)程。一個真正的協(xié)程能夠控制代碼什么時候繼續(xù)執(zhí)行。而一個Generator執(zhí)行遇到一個yield表達(dá)式 或者語句,會將執(zhí)行控制權(quán)轉(zhuǎn)移給調(diào)用者。

However, it is still possible to implement coroutines on top of a generator facility, with the aid of a top-level dispatcher routine (a trampoline, essentially) that passes control explicitly to child generators identified by tokens passed back from the generators。 —— [ 維基百科 ]

在維基百科中提到,可以實現(xiàn)一個頂級的調(diào)度子例程,將執(zhí)行控制權(quán)轉(zhuǎn)移回Generator,從而讓它繼續(xù)執(zhí)行。在tornado中,ioLoop就是這樣的頂級調(diào)度子例程,每個協(xié)程模塊通過,函數(shù)裝飾器coroutine和ioLoop進(jìn)行通信,從而ioLoop可以在協(xié)程模塊執(zhí)行暫停后,在合適的時機重新調(diào)度協(xié)程模塊執(zhí)行。

不過,接下來還不能介紹coroutine和ioLoop,在介紹這兩者之前,先得明白tornado中在協(xié)程環(huán)境中一個非常重要的類Future.

Future類

Future類位于tornado源碼的concurrent模塊中。Future類的完整代碼,請查看tornado的源碼。在這里截取一部分代碼作為分析之用

class Future(object):
  def done(self):
    return self._done

  def result(self, timeout=None):
    self._clear_tb_log()
    if self._result is not None:
      return self._result
    if self._exc_info is not None:
      raise_exc_info(self._exc_info)
    self._check_done()
    return self._result

  def add_done_callback(self, fn):
    if self._done:
      fn(self)
    else:
      self._callbacks.append(fn)

  def set_result(self, result):
    self._result = result
    self._set_done()

  def _set_done(self):
    self._done = True
    for cb in self._callbacks:
      try:
        cb(self)
      except Exception:
        app_log.exception('exception calling callback %r for %r',
                 cb, self)
    self._callbacks = None

Future類重要成員函數(shù):

def done(self):

Future的_result成員是否被設(shè)置

def result(self, timeout=None):

獲取Future對象的結(jié)果

def add_done_callback(self, fn):

添加一個回調(diào)函數(shù)fn給Future對象。如果這個Future對象已經(jīng)done,則直接執(zhí)行fn,否則將fn加入到Future類的一個成員列表中保存。

def _set_done(self):

一個內(nèi)部函數(shù),主要是遍歷列表,逐個調(diào)用列表中的callback函數(shù),也就是前面add_done_calback加如來的。

def set_result(self, result):

給Future對象設(shè)置result,并且調(diào)用_set_done。也就是說,當(dāng)Future對象獲得result后,所有add_done_callback加入的回調(diào)函數(shù)就會執(zhí)行。

Future封裝了異步操作的結(jié)果。實際是它類似于在網(wǎng)頁html前端中,圖片異步加載的占位符,但加載后最終也是一個完整的圖片。Future也是同樣用處,tornado使用它,最終希望它被set_result,并且調(diào)用一些回調(diào)函數(shù)。Future對象實際是coroutine函數(shù)裝飾器和IOLoop的溝通使者,有著非常重要的作用。

IOLoop類

tornado框架的底層核心類,位于tornado的ioloop模塊。功能方面類似win32窗口的消息循環(huán)。每個窗口可以綁定一個窗口過程。窗口過程主要是一個消息循環(huán)在執(zhí)行。消息循環(huán)主要任務(wù)是利用PeekMessage系統(tǒng)調(diào)用,從消息隊列中取出各種類型的消息,判斷消息的類型,然后交給特定的消息handler進(jìn)行執(zhí)行。

tornado中的IOLoop與此相比具有很大的相似性,在協(xié)程運行環(huán)境中擔(dān)任著協(xié)程調(diào)度器的角色, 和win32的消息循環(huán)本質(zhì)上都是一種事件循環(huán),等待事件,然后運行對應(yīng)的事件處理器(handler)。不過IOLoop主要調(diào)度處理的是IO事件(如讀,寫,錯誤)。除此之外,還能調(diào)度callback和timeout事件。

在本博文中,我們暫時只關(guān)注callback事件,因為這個與協(xié)程調(diào)度的相關(guān)性最大。

def add_future(self, future, callback):
  assert is_future(future)
  callback = stack_context.wrap(callback)
  future.add_done_callback(
    lambda future: self.add_callback(callback, future))

add_future函數(shù)在基類IOLoop中實現(xiàn),函數(shù)參數(shù)是一個Future對象和一個callback函數(shù)。當(dāng)Future對象被set_result,執(zhí)行一個回調(diào)函數(shù),是個lambda函數(shù),在lambda函數(shù)中調(diào)用IOLoop的add_callback函數(shù)。將add_future的參數(shù)callback加入到IOLoop的統(tǒng)一調(diào)度中,讓callback在IOLoop下一次迭代中執(zhí)行。

def add_callback(self, callback, *args, **kwargs):
  with self._callback_lock:
    if self._closing:
      raise RuntimeError("IOLoop is closing")
    list_empty = not self._callbacks
    self._callbacks.append(functools.partial(
      stack_context.wrap(callback), *args, **kwargs))
    if list_empty and thread.get_ident() != self._thread_ident:
      self._waker.wake()

add_callback函數(shù)主要在IOLoop的子類PollIOLoop中實現(xiàn)。也很容易理解。

將傳入的callback函數(shù),利用偏函數(shù)進(jìn)行包裝,將所有callback真正運行時需要的參數(shù),都綁定到生成的偏函數(shù)中,實際上就是找個地方把callback運行時需要的參數(shù)保存起來。將包裝好的偏函數(shù)加入到回調(diào)函數(shù)列表。當(dāng)IOLoop下一次迭代運行的時候,遍歷callback函數(shù)列表,運行偏函數(shù)的時候,就不再需要傳入?yún)?shù)執(zhí)行,效果等同于用實參運行callback。

IOLoop對象調(diào)用start函數(shù),會運行event loop。在event loop中,首先遍歷callback列表,執(zhí)行回調(diào)函數(shù),然后遍歷timeout列表,執(zhí)行timeoutCallback。最后才執(zhí)行ioHandler。

coroutine函數(shù)裝飾器

函數(shù)裝飾器本質(zhì)是一個函數(shù),我們稱這個函數(shù)為裝飾器函數(shù)。裝飾器函數(shù)簽名含有一個 函數(shù)對象(可調(diào)用對象callable)參數(shù),返回的結(jié)果是一個裝飾器內(nèi)部定義的一個新函數(shù)對象。如果返回的函數(shù)對象被調(diào)用,裝飾器函數(shù)的參數(shù)(函數(shù)對象)也會被調(diào)用。不過,會在這個參數(shù)(裝飾器函數(shù)參數(shù))調(diào)用前做一些事情,或者在這個參數(shù)調(diào)用后做一些事情。實際上做的這些事情,就是利用內(nèi)部自定義的函數(shù)對象對參數(shù)(原函數(shù))的一些裝飾(額外操作)

當(dāng)一個函數(shù)被裝飾器裝飾。那么以后調(diào)用這個函數(shù)(此函數(shù)已經(jīng)非彼函數(shù))的時候,實際上調(diào)用的是裝飾器函數(shù)返回的內(nèi)部函數(shù)對象。理解tornado中coroutine修飾的函數(shù)如何執(zhí)行,主要是 理解coroutine這個裝飾器函數(shù)內(nèi)部定義的新函數(shù)對象所做的那些事兒。

def coroutine(func, replace_callback=True):
  return _make_coroutine_wrapper(func, replace_callback=True)

詳細(xì)解讀tornado協(xié)程(coroutine)原理

class Runner(object):
  def __init__(self, gen, result_future, first_yielded):
    self.gen = gen
    self.result_future = result_future
    self.future = _null_future
    self.yield_point = None
    self.pending_callbacks = None
    self.results = None
    self.running = False
    self.finished = False
    self.had_exception = False
    self.io_loop = IOLoop.current()
    self.stack_context_deactivate = None
    if self.handle_yield(first_yielded):
      self.run()

  def run(self):
    if self.running or self.finished:
      return
    try:
      self.running = True
      while True:
        future = self.future
        if not future.done():
          return
        self.future = None
        try:
          try:
            value = future.result()
          except Exception:
            self.had_exception = True
            yielded = self.gen.throw(*sys.exc_info())
          else:
            yielded = self.gen.send(value)
        except (StopIteration, Return) as e:
          self.finished = True
          self.future = _null_future
          self.result_future.set_result(getattr(e, 'value', None))
          self.result_future = None
          return
        except Exception:
          self.finished = True
          self.future = _null_future
          self.result_future.set_exc_info(sys.exc_info())
          self.result_future = None
          return
        if not self.handle_yield(yielded):
          return
    finally:
      self.running = False

  def handle_yield(self, yielded):

    try:
      self.future = convert_yielded(yielded)
    except BadYieldError:
      self.future = TracebackFuture()
      self.future.set_exc_info(sys.exc_info())

    if not self.future.done() or self.future is moment:
      self.io_loop.add_future(
        self.future, lambda f: self.run())
      return False
    return True

以上的代碼其實都對源碼進(jìn)行了一些調(diào)整。但函數(shù)調(diào)用進(jìn)入到Runner的構(gòu)造函數(shù)的時候,也就是說Generator的第一次執(zhí)行已經(jīng)完畢。那么接下來,調(diào)用的是,handle_yield,對第一次Generator執(zhí)行的返回結(jié)果進(jìn)行處理。當(dāng)然返回的結(jié)果可能是多種類型??赡苁且粋€Future對象,list,dict,或者其他類型對象,或者普通類型。通過convert_yield,self.future保存的是一個Future對象的引用(第一次Generator執(zhí)行返回的結(jié)果)。此時如果self.future還沒被set_result。對為self.future綁定一個done_callback(lambda f: self.run()),加入到self.io_loop中。

在前文說到。ioloop的add_future函數(shù)中,實際上是只有當(dāng)參數(shù)future,在某個地方調(diào)用了set_result, 才在執(zhí)行done_callback時,將參數(shù)callback加入到IOLoop中調(diào)度。換句話說。Runner類中,self.run要等到self.future在某個代碼塊被set_result,IOLoop才有可能在下一次迭代的時候執(zhí)行它,從而調(diào)度協(xié)程繼續(xù)恢復(fù)執(zhí)行。而在self.run函數(shù)中,我們可以看到將會通過Generator的send函數(shù),恢復(fù)執(zhí)行下一個協(xié)程代碼塊。所以關(guān)鍵的問題是我們需要明白Runner類中self.future,在什么時候被set_result?

從這里我們可以看到Future類的重要作用。future.set_result起到的作用是:

發(fā)送一個信號,告訴IOLoop去調(diào)度暫停的協(xié)程繼續(xù)執(zhí)行。

我們結(jié)合下面的代碼例子就可以明白協(xié)程調(diào)度的整個流程是如何進(jìn)行的了。

import tornado.ioloop
from tornado.gen import coroutine
from tornado.concurrent import Future

@coroutine
def asyn_sum(a, b):
  print("begin calculate:sum %d+%d"%(a,b))
  future = Future()

  def callback(a, b):
    print("calculating the sum of %d+%d:"%(a,b))
    future.set_result(a+b)
  tornado.ioloop.IOLoop.instance().add_callback(callback, a, b)

  result = yield future

  print("after yielded")
  print("the %d+%d=%d"%(a, b, result))

def main():
  asyn_sum(2,3)
  tornado.ioloop.IOLoop.instance().start()

if __name__ == "__main__":
  main()

實際的運行場景是:一個協(xié)程(asyn_sum)遇到y(tǒng)ield表達(dá)式被暫停執(zhí)行后,IOLoop調(diào)用另外一個代碼段(asyn_sum中的回調(diào)函數(shù)callback)執(zhí)行,而在callback中,剛好可以訪問到屬于被暫停協(xié)程(asyn_sum)中的future對象(也就是Runner對象中的self.future的引用),callback中將future調(diào)用set_result,那么這個暫停的協(xié)程(asyn_sum)在IOLoop下一次迭代調(diào)度回調(diào)函數(shù)時中,被恢復(fù)執(zhí)行。

總結(jié)

tornado中的協(xié)程實現(xiàn)基于python語言的Generator并且結(jié)合一個全局的調(diào)度器IOLoop,Generator通過函數(shù)裝飾器coroutine和IOLoop進(jìn)行通信。IOLoop并沒有直接控制能力,調(diào)度恢復(fù)被暫停的協(xié)程繼續(xù)執(zhí)行。future對象在協(xié)程中被yield。協(xié)程暫停,IOLoop調(diào)度另外一個代碼模塊執(zhí)行,而在這個執(zhí)行的代碼模塊中剛好,可以訪問這個future對象,將其set_result,結(jié)果通過IOLoop間接恢復(fù)暫停協(xié)程執(zhí)行。不同執(zhí)行代碼模塊中,共享future對象,彼此合作,協(xié)程調(diào)度得順利執(zhí)行。

從這種意義上來說,future對象,像window中的Event內(nèi)核對象的作用。window中的event用于線程中同步。而協(xié)程中的yield future相當(dāng)于WaitForSingleObject(event_object), 而future.set_result(result)。相當(dāng)于SetEvent(event_object)。而future和Event的不同點在于,協(xié)程借future來恢復(fù)執(zhí)行,而線程借Event來進(jìn)行線程間同步。

以上就是本文關(guān)于詳細(xì)解讀tornado協(xié)程(coroutine)原理的全部內(nèi)容,希望對大家有所幫助。感興趣的朋友可以繼續(xù)參閱本站其他相關(guān)專題,如有不足之處,歡迎留言指出。感謝朋友們對本站的支持!

向AI問一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI