溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶(hù)服務(wù)條款》

python中如何使用contextvars模塊

發(fā)布時(shí)間:2023-05-18 15:40:02 來(lái)源:億速云 閱讀:148 作者:iii 欄目:編程語(yǔ)言

這篇“python中如何使用contextvars模塊”文章的知識(shí)點(diǎn)大部分人都不太理解,所以小編給大家總結(jié)了以下內(nèi)容,內(nèi)容詳細(xì),步驟清晰,具有一定的借鑒價(jià)值,希望大家閱讀完這篇文章能有所收獲,下面我們一起來(lái)看看這篇“python中如何使用contextvars模塊”文章吧。

    前記

    在Python3.7后官方庫(kù)出現(xiàn)了contextvars模塊, 它的主要功能就是可以為多線程以及asyncio生態(tài)添加上下文功能,即使程序在多個(gè)協(xié)程并發(fā)運(yùn)行的情況下,也能調(diào)用到程序的上下文變量, 從而使我們的邏輯解耦.

    上下文,可以理解為我們說(shuō)話的語(yǔ)境, 在聊天的過(guò)程中, 有些話脫離了特定的語(yǔ)境,他的意思就變了,程序的運(yùn)行也是如此.在線程中也是有他的上下文,只不過(guò)稱(chēng)為堆棧,如在python中就是保存在thread.local變量中,而協(xié)程也有他自己的上下文,但是沒(méi)有暴露出來(lái),不過(guò)有了contextvars模塊后我們可以通過(guò)contextvars模塊去保存與讀取.

    使用contextvars的好處不僅可以防止’一個(gè)變量傳遍天’的事情發(fā)生外,還能很好的結(jié)合TypeHint,可以讓自己的代碼可以被mypy以及IDE檢查,讓自己的代碼更加適應(yīng)工程化.
    不過(guò)用了contextvars后會(huì)多了一些隱性的調(diào)用, 需要解決好這些隱性的成本.

    更新說(shuō)明

    • 切換web框架sanicstarlette

    • 增加一個(gè)自己編寫(xiě)且可用于starlette,fastapi的context說(shuō)明

    • 更新fast_tools.context的最新示例以及簡(jiǎn)單的修改行文。

    1.有無(wú)上下文傳變量的區(qū)別

    如果有用過(guò)Flask框架, 就知道了Flask擁有自己的上下文功能, 而contextvars跟它很像, 而且還增加了對(duì)asyncio的上下文提供支持。
    Flask的上下文是基于threading.local實(shí)現(xiàn)的, threading.local的隔離效果很好,但是他是只針對(duì)線程的,只隔離線程之間的數(shù)據(jù)狀態(tài), 而werkzeug為了支持在gevent中運(yùn)行,自己實(shí)現(xiàn)了一個(gè)Local變量, 常用的Flask上下文變量request的例子如下:

    from flask import Flask, request
    app = Flask(__name__)
    @app.route('/')
    def root():
        so1n_name = request.get('so1n_name')
        return f'Name is {so1n_name}'

    與之相比的是Python的另一個(gè)經(jīng)典Web框架Djano, 它沒(méi)有上下文的支持, 所以只能顯示的傳request對(duì)象, 例子如下:

    from django.http import HttpResponse
    def root(request):
        so1n_name = request.get('so1n_name')
        return HttpResponse(f'Name is {so1n_name}')

    通過(guò)上面兩者的對(duì)比可以發(fā)現(xiàn), 在Django中,我們需要顯示的傳一個(gè)叫request的變量,而Flask則是import一個(gè)叫request的全局變量,并在視圖中直接使用,達(dá)到解耦的目的.

    可能會(huì)有人說(shuō), 也就是傳個(gè)變量的區(qū)別,為了省傳這個(gè)變量,而花許多功夫去維護(hù)一個(gè)上下文變量,有點(diǎn)不值得,那可以看看下面的例子,如果層次多就會(huì)出現(xiàn)’一個(gè)參數(shù)傳一天’的情況(不過(guò)分層做的好或者需求不坑爹一般不會(huì)出現(xiàn)像下面的情況,一個(gè)好的程序員能做好代碼的分層, 但可能也有出現(xiàn)一堆爛需求的時(shí)候)

    # 偽代碼,舉個(gè)例子一個(gè)request傳了3個(gè)函數(shù)
    from django.http import HttpResponse
    def is_allow(request, uid):
        if request.ip == '127.0.0.1' and check_permissions(uid):
            return True
        else:
            return False
    def check_permissions(request, uid):
        pass
    
    def root(request):
        user_id = request.GET.get('uid')
        if is_allow(request, id):
        	return HttpResponse('ok')
        else
            return HttpResponse('error')

    此外, 除了防止一個(gè)參數(shù)傳一天這個(gè)問(wèn)題外, 通過(guò)上下文, 可以進(jìn)行一些解耦, 比如有一個(gè)最經(jīng)典的技術(shù)業(yè)務(wù)需求就是在日志打印request_id, 從而方便鏈路排查, 這時(shí)候如果有上下文模塊, 就可以把讀寫(xiě)request_id給解耦出來(lái), 比如下面這個(gè)基于Flask框架讀寫(xiě)request_id的例子:

    import logging
    from typing import Any
    from flask import g  # type: ignore
    from flask.logging import default_handler
    # 這是一個(gè)Python logging.Filter的對(duì)象, 日志在生成之前會(huì)經(jīng)過(guò)Filter步驟, 這時(shí)候我們可以為他綁定request_id變量
    class RequestIDLogFilter(logging.Filter):
        """
        Log filter to inject the current request id of the request under `log_record.request_id`
        """
        def filter(self, record: Any) -> Any:
            record.request_id = g.request_id or None
            return record
    # 配置日志的format格式, 這里多配了一個(gè)request_id變量
    format_string: str = (
        "[%(asctime)s][%(levelname)s][%(filename)s:%(lineno)d:%(funcName)s:%(request_id)s]" " %(message)s"
    )
    # 為flask的默認(rèn)logger設(shè)置format和增加一個(gè)logging.Filter對(duì)象
    default_handler.setFormatter(logging.Formatter(format_string))
    default_handler.addFilter(RequestIDLogFilter())
    # 該方法用于設(shè)置request_id
    def set_request_id() -> None:
        g.request_id = request.headers.get("X-Request-Id", str(uuid4()))
    # 初始化FLask對(duì)象, 并設(shè)置before_request
    app: Flask = Flask("demo")
    app.before_request(set_request_id)

    2.如何使用contextvars模塊

    這里舉了一個(gè)例子, 但這個(gè)例子也有別的解決方案. 只不過(guò)通過(guò)這個(gè)例子順便說(shuō)如何使用contextvar模塊

    首先看看未使用contextvars時(shí),asyncio的web框架是如何傳變量的,根據(jù)starlette的文檔,在未使用contextvars時(shí),傳遞Redis客戶(hù)端實(shí)例的辦法是通過(guò)request.stat這個(gè)變量保存Redis客戶(hù)端的實(shí)例,改寫(xiě)代碼如下:

    # demo/web_tools.py
    # 通過(guò)中間件把變量給存進(jìn)去
    class RequestContextMiddleware(BaseHTTPMiddleware):
        async def dispatch(
                self, request: Request, call_next: RequestResponseEndpoint
        ) -> Response:
            request.stat.redis = REDIS_POOL
            response = await call_next(request)
            return response
    # demo/server.py
    # 調(diào)用變量
    @APP.route('/')
    async def homepage(request):
        # 偽代碼,這里是執(zhí)行redis命令
        await request.stat.redis.execute()
        return JSONResponse({'hello': 'world'})

    代碼非常簡(jiǎn)便, 也可以正常的運(yùn)行, 但你下次在重構(gòu)時(shí), 比如簡(jiǎn)單的把redis這個(gè)變量名改為new_redis, 那IDE不會(huì)識(shí)別出來(lái), 需要一個(gè)一個(gè)改。 同時(shí), 在寫(xiě)代碼的時(shí)候, IDE永遠(yuǎn)不知道這個(gè)方法調(diào)用到的變量的類(lèi)型是什么, IDE也無(wú)法智能的幫你檢查(如輸入request.stat.redis.時(shí),IDE不會(huì)出現(xiàn)execute,或者出錯(cuò)時(shí),IDE并不會(huì)提示). 這非常不利于項(xiàng)目的工程化, 而通過(guò)contextvarsTypeHints, 恰好能解決這個(gè)問(wèn)題.

    說(shuō)了那么多, 下面以一個(gè)Redis client為例子,展示如何在asyncio生態(tài)中使用contextvars, 并引入TypeHints(詳細(xì)解釋見(jiàn)代碼).

    # demo/context.py
    # 該文件存放contextvars相關(guān)
    import contextvars
    if TYPE_CHECKING:
        from demo.redis_dal import RDS  # 這里是一個(gè)redis的封裝實(shí)例
    # 初始化一個(gè)redis相關(guān)的全局context
    redis_pool_context = contextvars.ContextVar('redis_pool')
    # 通過(guò)函數(shù)調(diào)用可以獲取到當(dāng)前協(xié)程運(yùn)行時(shí)的context上下文
    def get_redis() -> 'RDS':
        return redis_pool_context.get()
    # demo/web_tool.py
    # 該文件存放starlette相關(guān)模塊
    from starlette.middleware.base import BaseHTTPMiddleware
    from starlette.requests import Request
    from starlette.middleware.base import RequestResponseEndpoint
    from starlette.responses import Response
    from demo.redis_dal import RDS
    # 初始化一個(gè)redis客戶(hù)端變量,當(dāng)前為空
    REDIS_POOL = None  # type: Optional[RDS]
    class RequestContextMiddleware(BaseHTTPMiddleware):
        async def dispatch(
                self, request: Request, call_next: RequestResponseEndpoint
        ) -> Response:
            # 通過(guò)中間件,在進(jìn)入路由之前,把redis客戶(hù)端放入當(dāng)前協(xié)程的上下文之中
            token = redis_pool_context.set(REDIS_POOL)
            try:
            	response = await call_next(request)
                return response
            finally:
            	# 調(diào)用完成,回收當(dāng)前請(qǐng)求設(shè)置的redis客戶(hù)端的上下文
                redis_pool_context.reset(token)
    async def startup_event() -> None:
        global REDIS_POOL
        REDIS_POOL = RDS() # 初始化客戶(hù)端,里面通過(guò)asyncio.ensure_future邏輯延后連接
    async def shutdown_event() -> None:
        if REDIS_POOL:
            await REDIS_POOL.close() # 關(guān)閉redis客戶(hù)端
    # demo/server.py
    # 該文件存放starlette main邏輯
    from starlette.applications import Starlette
    from starlette.responses import JSONResponse
    from demo.web_tool import RequestContextMiddleware
    from demo.context import get_redis
    APP = Starlette()
    APP.add_middleware(RequestContextMiddleware)
    @APP.route('/')
    async def homepage(request):
        # 偽代碼,這里是執(zhí)行redis命令
        # 只要驗(yàn)證 id(get_redis())等于demo.web_tool里REDID_POOL的id一致,那證明contextvars可以為asyncio維護(hù)一套上下文狀態(tài)
        await get_redis().execute()
        return JSONResponse({'hello': 'world'})

    3.如何優(yōu)雅的使用contextvars

    從上面的示例代碼來(lái)看, 使用contextvarTypeHint確實(shí)能讓讓IDE可以識(shí)別到這個(gè)變量是什么了, 但增加的代碼太多了,更恐怖的是, 每多一個(gè)變量,就需要自己去寫(xiě)一個(gè)context,一個(gè)變量的初始化,一個(gè)變量的get函數(shù),同時(shí)在引用時(shí)使用函數(shù)會(huì)比較別扭.

    自己在使用了contextvars一段時(shí)間后,覺(jué)得這樣太麻煩了,每次都要做一堆重復(fù)的操作,且平時(shí)使用最多的就是把一個(gè)實(shí)例或者提煉出Headers的參數(shù)放入contextvars中,所以寫(xiě)了一個(gè)封裝fast_tools.context(同時(shí)兼容fastapistarlette), 它能屏蔽所有與contextvars的相關(guān)邏輯,其中由ContextModel負(fù)責(zé)contextvars的set和get操作,ContextMiddleware管理contextvars的周期,HeaderHeader負(fù)責(zé)托管Headers相關(guān)的參數(shù), 調(diào)用者只需要在ContextModel中寫(xiě)入自己需要的變量,引用時(shí)調(diào)用ContextModel的屬性即可.

    以下是調(diào)用者的代碼示例, 這里的實(shí)例化變量由一個(gè)http client代替, 且都會(huì)每次請(qǐng)求分配一個(gè)客戶(hù)端實(shí)例, 但在實(shí)際使用中并不會(huì)為每一個(gè)請(qǐng)求都分配一個(gè)客戶(hù)端實(shí)例, 很影響性能:

    import asyncio
    import uuid
    from contextvars import Context, copy_context
    from functools import partial
    from typing import Optional, Set
    import httpx
    from fastapi import FastAPI, Request, Response
    from fast_tools.context import ContextBaseModel, ContextMiddleware, HeaderHelper
    app: FastAPI = FastAPI()
    check_set: Set[int] = set()
    class ContextModel(ContextBaseModel):
        """
    	通過(guò)該實(shí)例可以屏蔽大部分與contextvars相關(guān)的操作,如果要添加一個(gè)變量,則在該實(shí)例添加一個(gè)屬性即可.
    	屬性必須要使用Type Hints的寫(xiě)法,不然不會(huì)識(shí)別(強(qiáng)制使用Type Hints)
        """
        # 用于把自己的實(shí)例(如上文所說(shuō)的redis客戶(hù)端)存放于contextvars中
        http_client: httpx.AsyncClient
        # HeaderHepler用于把header的變量存放于contextvars中
        request_id: str = HeaderHelper.i("X-Request-Id", default_func=lambda request: str(uuid.uuid4()))
        ip: str = HeaderHelper.i("X-Real-IP", default_func=lambda request: request.client.host)
        user_agent: str = HeaderHelper.i("User-Agent")
    
        async def before_request(self, request: Request) -> None:
            # 請(qǐng)求之前的鉤子, 通過(guò)該鉤子可以設(shè)置自己的變量
            self.http_client = httpx.AsyncClient()
            check_set.add(id(self.http_client))
    
        async def before_reset_context(self, request: Request, response: Optional[Response]) -> None:
            # 準(zhǔn)備退出中間件的鉤子, 這步奏后會(huì)清掉上下文
            await self.http_client.aclose()
    context_model: ContextModel = ContextModel()
    app.add_middleware(ContextMiddleware, context_model=context_model)
    async def test_ensure_future() -> None:
        assert id(context_model.http_client) in check_set
    def test_run_in_executor() -> None:
        assert id(context_model.http_client) in check_set
    def test_call_soon() -> None:
        assert id(context_model.http_client) in check_set
    @app.get("/")
    async def root() -> dict:
        # 在使用asyncio.ensure_future開(kāi)啟另外一個(gè)子協(xié)程跑任務(wù)時(shí), 也可以復(fù)用上下文
        asyncio.ensure_future(test_ensure_future())
        loop: "asyncio.AbstractEventLoop" = asyncio.get_event_loop()
        # 使用call_soon也能復(fù)用上下文
        loop.call_soon(test_call_soon)
        # 使用run_in_executor也能復(fù)用上下文, 但必須使用上下文的run方法, copy_context表示復(fù)制當(dāng)前的上下文
        ctx: Context = copy_context()
        await loop.run_in_executor(None, partial(ctx.run, test_run_in_executor))  # type: ignore
        return {
            "message": context_model.to_dict(is_safe_return=True),  # not return CustomQuery
            "client_id": id(context_model.http_client),
        }
    if __name__ == "__main__":
        import uvicorn  # type: ignore
        uvicorn.run(app)

    可以從例子中看到, 通過(guò)封裝的上下文調(diào)用會(huì)變得非常愉快, 只要通過(guò)一兩步方法就能設(shè)置好自己的上下文屬性, 同時(shí)不用考慮如何編寫(xiě)上下文的生命周期. 另外也能通過(guò)這個(gè)例子看出, 在asyncio生態(tài)中, contextvars能運(yùn)用到包括子協(xié)程, 多線程等所有的場(chǎng)景中.

    4.contextvars的原理

    在第一次使用時(shí),我就很好奇contextvars是如何去維護(hù)程序的上下文的,好在contextvars的作者出了一個(gè)向下兼容的contextvars庫(kù),雖然他不支持asyncio,但我們還是可以通過(guò)代碼了解到他的基本原理.

    4.1 ContextMeta,ContextVarMeta和TokenMeta

    代碼倉(cāng)中有ContextMeta,ContextVarMetaTokenMeta這幾個(gè)對(duì)象, 它們的功能都是防止用戶(hù)來(lái)繼承Context,ContextVarToken,原理都是通過(guò)元類(lèi)來(lái)判斷類(lèi)名是否是自己編寫(xiě)類(lèi)的名稱(chēng),如果不是則拋錯(cuò).

    class ContextMeta(type(collections.abc.Mapping)):
        # contextvars.Context is not subclassable.
        def __new__(mcls, names, bases, dct):
            cls = super().__new__(mcls, names, bases, dct)
            if cls.__module__ != 'contextvars' or cls.__name__ != 'Context':
                raise TypeError("type 'Context' is not an acceptable base type")
            return cls
    4.2 Token

    上下文的本質(zhì)是一個(gè)堆棧, 每次set一次對(duì)象就向堆棧增加一層數(shù)據(jù), 每次reset就是pop掉最上層的數(shù)據(jù), 而在Contextvars中, 通過(guò)Token對(duì)象來(lái)維護(hù)堆棧之間的交互.

    class Token(metaclass=TokenMeta):
        MISSING = object()
        def __init__(self, context, var, old_value):
            # 分別存放上下文變量, 當(dāng)前set的數(shù)據(jù)以及上次set的數(shù)據(jù)
            self._context = context
            self._var = var
            self._old_value = old_value
            self._used = False
        @property
        def var(self):
            return self._var
        @property
        def old_value(self):
            return self._old_value
        def __repr__(self):
            r = '<Token '
            if self._used:
                r += ' used'
            r += ' var={!r} at {:0x}>'.format(self._var, id(self))
            return r

    可以看到Token的代碼很少, 它只保存當(dāng)前的context變量, 本次調(diào)用set的數(shù)據(jù)和上一次被set的舊數(shù)據(jù). 用戶(hù)只有在調(diào)用contextvar.context后才能得到Token, 返回的Token可以被用戶(hù)在調(diào)用context后, 通過(guò)調(diào)用context.reset(token)來(lái)清空保存的上下文,方便本次context的變量能及時(shí)的被回收, 回到上上次的數(shù)據(jù).

    4.3 全局唯一context

    前面說(shuō)過(guò), Python中由threading.local()負(fù)責(zé)每個(gè)線程的context, 協(xié)程屬于線程的&rsquo;子集&rsquo;,所以contextvar直接基于threading.local()生成自己的全局context. 從他的源代碼可以看到, _state就是threading.local()的引用, 并通過(guò)設(shè)置和讀取_statecontext屬性來(lái)寫(xiě)入和讀取當(dāng)前的上下文, copy_context調(diào)用也很簡(jiǎn)單, 同樣也是調(diào)用到threading.local()API.

    def copy_context():
        return _get_context().copy()
    def _get_context():
        ctx = getattr(_state, 'context', None)
        if ctx is None:
            ctx = Context()
            _state.context = ctx
        return ctx
    def _set_context(ctx):
        _state.context = ctx
    _state = threading.local()

    關(guān)于threading.local(),雖然不是本文重點(diǎn),但由于contextvars是基于threading.local()進(jìn)行封裝的,所以還是要明白threading.local()的原理,這里并不直接通過(guò)源碼分析, 而是做一個(gè)簡(jiǎn)單的示例解釋.

    在一個(gè)線程里面使用線程的局部變量會(huì)比直接使用全局變量的性能好,因?yàn)榫植孔兞恐挥芯€程自己能看見(jiàn),不會(huì)影響其他線程,而全局變量的修改必須加鎖, 性能會(huì)變得很差, 比如下面全局變量的例子:

    pet_dict = {}
    def get_pet(pet_name):
        return pet_dict[pet_name]
    def set_pet(pet_name):
        return pet_dict[pet_name]

    這份代碼就是模仿一個(gè)簡(jiǎn)單的全局變量調(diào)用, 如果是多線程調(diào)用的話, 那就需要加鎖啦, 每次在讀寫(xiě)之前都要等到持有鎖的線程放棄了鎖后再去競(jìng)爭(zhēng), 而且還可能污染到了別的線程存放的數(shù)據(jù).

    而線程的局部變量則是讓每個(gè)線程有一個(gè)自己的pet_dict, 假設(shè)每個(gè)線程調(diào)用get_pet,set_pet時(shí),都會(huì)把自己的pid傳入進(jìn)來(lái), 那么就可以避免多個(gè)線程去同時(shí)競(jìng)爭(zhēng)資源, 同時(shí)也不會(huì)污染到別的線程的數(shù)據(jù), 那么代碼可以改為這樣子:

    pet_dict = {}
    def get_pet(pet_name, pid):
        return pet_dict[pid][pet_name]
    def set_pet(pet_name, pid):
        return pet_dict[pid][pet_name]

    不過(guò)這樣子使用起來(lái)非常方便, 同時(shí)示例例子沒(méi)有對(duì)異常檢查和初始化等處理, 如果值比較復(fù)雜, 我們還要維護(hù)異常狀況, 這樣太麻煩了.

    這時(shí)候threading.local()就應(yīng)運(yùn)而生了,他負(fù)責(zé)幫我們處理這些維護(hù)的工作,我們只要對(duì)他進(jìn)行一些調(diào)用即可,調(diào)用起來(lái)跟單線程調(diào)用一樣簡(jiǎn)單方便, 應(yīng)用threading.local()后的代碼如下:

    import threading
    thread_local=threading.local()
    def get_pet(pet_name):
        return thread_local[pet_name]
    def set_pet(pet_name):
        return thread_local[pet_name]

    可以看到代碼就像調(diào)用全局變量一樣, 但是又不會(huì)產(chǎn)生競(jìng)爭(zhēng)狀態(tài)。

    4.4contextvar自己封裝的Context

    contextvars自己封裝的Context比較簡(jiǎn)單, 這里只展示他的兩個(gè)核心方法(其他的魔術(shù)方法就像dict的魔術(shù)方法一樣):

    class Context(collections.abc.Mapping, metaclass=ContextMeta):
        def __init__(self):
            self._data = immutables.Map()
            self._prev_context = None
        def run(self, callable, *args, **kwargs):
            if self._prev_context is not None:
                raise RuntimeError(
                    'cannot enter context: {} is already entered'.format(self))
            self._prev_context = _get_context()
            try:
                _set_context(self)
                return callable(*args, **kwargs)
            finally:
                _set_context(self._prev_context)
                self._prev_context = None
        def copy(self):
            new = Context()
            new._data = self._data
            return new

    首先, 在__init__方法可以看到self._data,這里使用到了一個(gè)叫immutables.Map()的不可變對(duì)象,并對(duì)immutables.Map()進(jìn)行一些封裝,所以context可以看成一個(gè)不可變的dict。這樣可以防止調(diào)用copy方法后得到的上下文的變動(dòng)會(huì)影響到了原本的上下文變量。

    查看immutables.Map()的示例代碼可以看到,每次對(duì)原對(duì)象的修改時(shí),原對(duì)象并不會(huì)發(fā)生改變,并會(huì)返回一個(gè)已經(jīng)發(fā)生改變的新對(duì)象.

    map2 = map.set('a', 10)
    print(map, map2)
    # will print:
    #   <immutables.Map({'a': 1, 'b': 2})>
    #   <immutables.Map({'a': 10, 'b': 2})>
    map3 = map2.delete('b')
    print(map, map2, map3)
    # will print:
    #   <immutables.Map({'a': 1, 'b': 2})>
    #   <immutables.Map({'a': 10, 'b': 2})>
    #   <immutables.Map({'a': 10})>

    此外,context還有一個(gè)叫run的方法, 上面在執(zhí)行loop.run_in_executor時(shí)就用過(guò)run方法, 目的就是可以產(chǎn)生一個(gè)新的上下文變量給另外一個(gè)線程使用, 同時(shí)這個(gè)新的上下文變量跟原來(lái)的上下文變量是一致的.
    執(zhí)行run的時(shí)候,可以看出會(huì)copy一個(gè)新的上下文來(lái)調(diào)用傳入的函數(shù), 由于immutables.Map的存在, 函數(shù)中對(duì)上下文的修改并不會(huì)影響舊的上下文變量, 達(dá)到進(jìn)程復(fù)制數(shù)據(jù)時(shí)的寫(xiě)時(shí)復(fù)制的目的. 在run方法的最后, 函數(shù)執(zhí)行完了會(huì)再次set舊的上下文, 從而完成一次上下文切換.

    def run(self, callable, *args, **kwargs):
        # 已經(jīng)存在舊的context,拋出異常,防止多線程循環(huán)調(diào)用
        if self._prev_context is not None:
            raise RuntimeError(
                'cannot enter context: {} is already entered'.format(self))
        self._prev_context = _get_context()  # 保存當(dāng)前的context
        try:
            _set_context(self) # 設(shè)置新的context
            return callable(*args, **kwargs)  # 執(zhí)行函數(shù)
        finally:
            _set_context(self._prev_context)  # 設(shè)置為舊的context
            self._prev_context = None
    4.5 ContextVar

    我們一般在使用contextvars模塊時(shí),經(jīng)常使用的就是ContextVar這個(gè)類(lèi)了,這個(gè)類(lèi)很簡(jiǎn)單,主要提供了set&ndash;設(shè)置值,get&ndash;獲取值,reset&ndash;重置值三個(gè)方法, 從Context類(lèi)中寫(xiě)入和獲取值, 而set和reset的就是通過(guò)上面的token類(lèi)進(jìn)行交互的.

    set &ndash; 為當(dāng)前上下文設(shè)置變量

    def set(self, value):
        ctx = _get_context()  # 獲取當(dāng)前上下文對(duì)象`Context`
        data = ctx._data
        try:
            old_value = data[self]  # 獲取Context舊對(duì)象
        except KeyError:
            old_value = Token.MISSING  # 獲取不到則填充一個(gè)object(全局唯一)
        updated_data = data.set(self, value) # 設(shè)置新的值
        ctx._data = updated_data
        return Token(ctx, self, old_value) # 返回帶有舊值的token

    get &ndash; 從當(dāng)前上下文獲取變量

    def get(self, default=_NO_DEFAULT):
        ctx = _get_context()  # 獲取當(dāng)前上下文對(duì)象`Context`
        try:
            return ctx[self]  # 返回獲取的值
        except KeyError:
            pass
        if default is not _NO_DEFAULT:
            return default    # 返回調(diào)用get時(shí)設(shè)置的值
        if self._default is not _NO_DEFAULT:
            return self._default  # 返回初始化context時(shí)設(shè)置的默認(rèn)值
        raise LookupError  # 都沒(méi)有則會(huì)拋錯(cuò)

    reset &ndash; 清理本次用到的上下文數(shù)據(jù)

    def reset(self, token):
           if token._used:
           	# 判斷token是否已經(jīng)被使用
               raise RuntimeError("Token has already been used once")
           if token._var is not self:
           	# 判斷token是否是當(dāng)前contextvar返回的
               raise ValueError(
                   "Token was created by a different ContextVar")
           if token._context is not _get_context():
           	# 判斷token的上下文是否跟contextvar上下文一致
               raise ValueError(
                   "Token was created in a different Context")
           ctx = token._context
           if token._old_value is Token.MISSING:
           	# 如果沒(méi)有舊值則刪除該值
               ctx._data = ctx._data.delete(token._var)
           else:
           	# 有舊值則當(dāng)前contextvar變?yōu)榕f值
               ctx._data = ctx._data.set(token._var, token._old_value)
           token._used = True  # 設(shè)置flag,標(biāo)記token已經(jīng)被使用了

    則此,contextvar的原理了解完了,接下來(lái)再看看他是如何在asyncio運(yùn)行的.

    5.contextvars asyncio

    由于向下兼容的contextvars并不支持asyncio, 所以這里通過(guò)aiotask-context的源碼簡(jiǎn)要的了解如何在asyncio中如何獲取和設(shè)置context。

    5.1在asyncio中獲取context

    相比起contextvars復(fù)雜的概念,在asyncio中,我們可以很簡(jiǎn)單的獲取到當(dāng)前協(xié)程的task, 然后通過(guò)task就可以很方便的獲取到task的context了,由于Pyhon3.7對(duì)asyncio的高級(jí)API 重新設(shè)計(jì),所以可以看到需要對(duì)獲取當(dāng)前task進(jìn)行封裝

    PY37 = sys.version_info >= (3, 7)
    if PY37:
        def asyncio_current_task(loop=None):
            """Return the current task or None."""
            try:
                return asyncio.current_task(loop)
            except RuntimeError:
                # simulate old behaviour
                return None
    else:
        asyncio_current_task = asyncio.Task.current_task

    不同的版本有不同的獲取task方法, 之后我們就可以通過(guò)調(diào)用asyncio_current_task().context即可獲取到當(dāng)前的上下文了&hellip;

    5.2 對(duì)上下文的操作

    同樣的,在得到上下文后, 我們這里也需要set, get, reset的操作,不過(guò)十分簡(jiǎn)單, 類(lèi)似dict一樣的操作即可, 它沒(méi)有token的邏輯:

    set

    def set(key, value):
        """
        Sets the given value inside Task.context[key]. If the key does not exist it creates it.
        :param key: identifier for accessing the context dict.
        :param value: value to store inside context[key].
        :raises
        """
        current_task = asyncio_current_task()
        if not current_task:
            raise ValueError(NO_LOOP_EXCEPTION_MSG.format(key))
    
        current_task.context[key] = value

    get

    def get(key, default=None):
        """
        Retrieves the value stored in key from the Task.context dict. If key does not exist,
        or there is no event loop running, default will be returned
        :param key: identifier for accessing the context dict.
        :param default: None by default, returned in case key is not found.
        :return: Value stored inside the dict[key].
        """
        current_task = asyncio_current_task()
        if not current_task:
            raise ValueError(NO_LOOP_EXCEPTION_MSG.format(key))
        return current_task.context.get(key, default)

    clear &ndash; 也就是contextvar.ContextVars中的reset

    def clear():
        """
        Clear the Task.context.
        :raises ValueError: if no current task.
        """
        current_task = asyncio_current_task()
        if not current_task:
            raise ValueError("No event loop found")
        current_task.context.clear()
    5.2 copying_task_factory和chainmap_task_factory

    在Python的更高級(jí)版本中,已經(jīng)支持設(shè)置context了,所以這兩個(gè)方法可以不再使用了.他們最后都用到了task_factory的方法.
    task_factory簡(jiǎn)單說(shuō)就是創(chuàng)建一個(gè)新的task,再通過(guò)工廠方法合成context,最后把context設(shè)置到task

    def task_factory(loop, coro, copy_context=False, context_factory=None):
        """
        By default returns a task factory that uses a simple dict as the task context,
        but allows context creation and inheritance to be customized via ``context_factory``.
        """
        # 生成context工廠函數(shù)
        context_factory = context_factory or partial(
            dict_context_factory, copy_context=copy_context)
        # 創(chuàng)建task, 跟asyncio.ensure_future一樣
        task = asyncio.tasks.Task(coro, loop=loop)
        if task._source_traceback:
            del [-1]
    
        # 獲取task的context
        try:
            context = asyncio_current_task(loop=loop).context
        except AttributeError:
            context = None
        # 從context工廠中處理context并賦值在task
        task.context = context_factory(context)
        return task

    aiotask-context提供了兩個(gè)對(duì)context處理的函數(shù)dict_context_factorychainmap_context_factory.在aiotask-context中,context是一個(gè)dict對(duì)象,dict_context_factory可以選擇賦值或者設(shè)置新的context

    def dict_context_factory(parent_context=None, copy_context=False):
        """A traditional ``dict`` context to keep things simple"""
        if parent_context is None:
            # initial context
            return {}
        else:
            # inherit context
            new_context = parent_context
            if copy_context:
                new_context = deepcopy(new_context)
            return new_context

    chainmap_context_factorydict_context_factory的區(qū)別就是在合并context而不是直接繼承.同時(shí)借用ChainMap保證合并context后,還能同步context的改變

    def chainmap_context_factory(parent_context=None):
        """
        A ``ChainMap`` context, to avoid copying any data
        and yet preserve strict one-way inheritance
        (just like with dict copying)
        """
        if parent_context is None:
            # initial context
            return ChainMap()
        else:
            # inherit context
            if not isinstance(parent_context, ChainMap):
                # if a dict context was previously used, then convert
                # (without modifying the original dict)
                parent_context = ChainMap(parent_context)
            return parent_context.new_child()

    以上就是關(guān)于“python中如何使用contextvars模塊”這篇文章的內(nèi)容,相信大家都有了一定的了解,希望小編分享的內(nèi)容對(duì)大家有幫助,若想了解更多相關(guān)的知識(shí)內(nèi)容,請(qǐng)關(guān)注億速云行業(yè)資訊頻道。

    向AI問(wèn)一下細(xì)節(jié)

    免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

    AI