如何使用contextvars模块和源码分析

前记

在Python3.7后出现了contextvars模块, contextvars模块最主要的功能就是可以为多线程以及asyncio生态添加上下文功能,即使程序在多个协程并发运行的情况下,也能调用到程序的上下文变量, 可以让我们的逻辑解耦.

上下文,可以理解为我们说话的语境, 在聊天的过程中, 有些话脱离了特定的语境,他的意思就变了,程序的运行也是如此.在线程中也是有他的上下文,只不过称为堆栈,如在python中就是保存在thread.local变量中,而协程也有他自己的上下文,但是没有暴露出来,不过有了contextvars模块后我们可以通过contextvars模块去保存与读取.

使用contextvars的好处不仅可以防止’一个变量传遍天’的事情发生外,还能很好的结合TypeHint,可以让自己的代码可以被mypy以及IDE检查,让自己的代码更加适应工程化.
不过用了contextvars后会多了一些隐性的调用, 需要解决好这些隐性的成本.

原文地址:so1n.me/2019/06/13/…


1.有无上下文传变量的区别

如果有用过Flask框架, 就知道了Flask用到了自己的上下文功能, 而contextvars跟它很像.
如果有去了解Flask的上下文是怎么实现的就会知道, Flask的上下文是基于threading.local实现的, threading.local的隔离效果很好,但是他是只针对线程的,只隔离线程之间的数据状态, 而werkzeug为了支持在gevent中运行,自己实现了一个Local变量, contextvars与它相似,不过contextvars除此之外还增加了对asyncio的上下文提供支持.

与之相比Python另一个经典Web框架Djano则没有上下文的支持, 最能体现和接触他们两者直接的差异是在调用request对象的时候.
先看看Django的显示传变量的例子

from django.http import HttpResponse


def root(request):
    so1n_name = request.get('so1n_name')
    return HttpResponse(f'Name is {so1n_name}')
复制代码

再看看Flask通过上下文传变量的例子:

from flask import Flask, request


app = Flask(__name__)

@app.route('/')
def root():
    so1n_name = request.get('so1n_name')
    return f'Name is {so1n_name}'
复制代码

通过上面的对比可以发现,在Django中,我们需要显示的传一个叫request的变量,而Flask则是import一个叫request的全局变量,并在视图中直接使用,达到解耦的目的.

可能会有人说, 也就是传个变量的区别,为了省传这个变量,而花许多功夫去维护一个上下文变量,有点不值得,那可以看看下面的例子,如果层次多就会出现’一个参数传一天’的情况(不过分层做的好或者需求不坑爹一般不会出现像下面的情况,一个好的程序员能做好代码的分层, 但可能也有出现一堆烂需求的时候)

# 伪代码,举个例子一个request传了3个函数
from django.http import HttpResponse


def is_allow(request, uid):
	if request.ip == '127.0.0.1' and check_permissions(uid):
		return True
	else:
		return False

def check_permissions(request, uid):
	pass

def root(request):
	user_id = request.GET.get('uid')
	if is_allow(request, id):
    	return HttpResponse('ok')
    else
	    return HttpResponse('error')
复制代码

除了防止一个参数传一天这个问题外, 通过上下文, 可以进行一些解耦, 比如有一个最经典的技术业务需求就是在日志打印request_id, 方便链路排查, 这时候如果有上下文模块, 就可以把读写request_id给解耦出来, 比如下面这个基于Flask框架读写request_id的例子:

import logging
from typing import Any

from flask import g  # type: ignore
from flask.logging import default_handler


# 这是一个Python logging.Filter的对象, 日志在生成之前会经过Filter步骤, 这时候我们可以为他绑定request_id变量
class RequestIDLogFilter(logging.Filter):
    """
    Log filter to inject the current request id of the request under `log_record.request_id`
    """

    def filter(self, record: Any) -> Any:
        record.request_id = g.request_id or None
        return record

# 配置日志的format格式, 这里多配了一个request_id变量
format_string: str = (
    "[%(asctime)s][%(levelname)s][%(filename)s:%(lineno)d:%(funcName)s:%(request_id)s]" " %(message)s"
)
# 为flask的默认logger设置format和增加一个logging.Filter对象
default_handler.setFormatter(logging.Formatter(format_string))
default_handler.addFilter(RequestIDLogFilter())

# 该方法用于设置request_id
def set_request_id() -> None:
    g.request_id = request.headers.get("X-Request-Id", str(uuid4()))

# 初始化FLask对象, 并设置before_request
app: Flask = Flask("demo")
app.before_request(set_request_id)
复制代码

2.如何使用contextvars模块

这里举了一个例子, 但这个例子也有别的解决方案. 只不过通过这个例子顺便说如何使用contextvar模块

首先看看未使用contextvars时,asyncio的web框架是如何传变量的,根据starlette的文档,在未使用contextvars时,传递Redis客户端实例的办法是通过request.stat这个变量保存Redis客户端的实例,改写代码如下:

# demo/web_tools.py
# 通过中间件把变量给存进去
class RequestContextMiddleware(BaseHTTPMiddleware):
    async def dispatch(
            self, request: Request, call_next: RequestResponseEndpoint
    ) -> Response:
        request.stat.redis = REDIS_POOL
        response = await call_next(request)
        return response
# demo/server.py
# 调用变量
@APP.route('/')
async def homepage(request):
	# 伪代码,这里是执行redis命令
	await request.stat.redis.execute()
    return JSONResponse({'hello': 'world'})
复制代码

代码非常简便, 也可以正常的运行, 但你下次在重构时, 比如简单的把redis这个变量名改为new_redis, 那IDE不会识别出来, 需要一个一个改, 同时, 在写代码的时候, IDE永远不知道这个方法调用到的变量的类型是什么, IDE也无法智能的帮你检查(如输入request.stat.redis.时,IDE不会出现execute,或者出错时,IDE并不会提示). 这非常不利于项目的工程化, 而通过contextvarsTypehints, 恰好能解决这个问题.

说了那么多, 下面以一个Redis client为例子,展示如何在asyncio生态中使用contextvars, 并引入TypeHints(详细解释见代码).

# demo/context.py
# 该文件存放contextvars相关
import contextvars

if TYPE_CHECKING:
    from demo.redis_dal import RDS  # 这里是一个redis的封装实例

# 初始化一个redis相关的全局context
redis_pool_context = contextvars.ContextVar('redis_pool')

# 通过函数调用可以获取到当前协程运行时的context上下文
def get_redis() -> 'RDS':
    return redis_pool_context.get()

# demo/web_tool.py
# 该文件存放starlette相关模块
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.middleware.base import RequestResponseEndpoint
from starlette.responses import Response
from demo.redis_dal import RDS


# 初始化一个redis客户端变量,当前为空
REDIS_POOL = None  # type: Optional[RDS]


class RequestContextMiddleware(BaseHTTPMiddleware):
    async def dispatch(
            self, request: Request, call_next: RequestResponseEndpoint
    ) -> Response:
        # 通过中间件,在进入路由之前,把redis客户端放入当前协程的上下文之中
        token = redis_pool_context.set(REDIS_POOL)
        try:
        	response = await call_next(request)
            return response
        finally:
        	# 调用完成,回收当前请求设置的redis客户端的上下文
	        redis_pool_context.reset(token)

async def startup_event() -> None:
    global REDIS_POOL

    REDIS_POOL = RDS() # 初始化客户端,里面通过asyncio.ensure_future逻辑延后连接

async def shutdown_event() -> None:
    if REDIS_POOL:
        await REDIS_POOL.close() # 关闭redis客户端

# demo/server.py
# 该文件存放starlette main逻辑
from starlette.applications import Starlette
from starlette.responses import JSONResponse

from demo.web_tool import RequestContextMiddleware
from demo.context import get_redis


APP = Starlette()
APP.add_middleware(RequestContextMiddleware)


@APP.route('/')
async def homepage(request):
	# 伪代码,这里是执行redis命令
	# 只要验证 id(get_redis())等于demo.web_tool里REDID_POOL的id一致,那证明contextvars可以为asyncio维护一套上下文状态
	await get_redis().execute()
    return JSONResponse({'hello': 'world'})
复制代码

3.如何优雅的使用contextvars

从上面的示例代码来看, 使用contextvarTypeHint确实能让让IDE可以识别到这个变量是什么了, 但增加的代码太多了,更恐怖的是, 每多一个变量,就需要自己去写一个context,一个变量的初始化,一个变量的get函数,同时在引用时使用函数会比较别扭.

自己在使用了contextvars一段时间后,觉得这样太麻烦了,每次都要做一堆重复的操作,且平时使用最多的就是把一个实例或者提炼出Headers的参数放入contextvars中,所以写了一个封装fast_tools.context(同时兼容fastapistarlette),它能屏蔽所有与contextvars的相关逻辑,其中由ContextModel负责contextvars的set和get操作,ContextMiddleware管理contextvars的周期,HeaderHeader和CustomHeader分别托管Headers相关的参数和用户自己的实例.调用者只需要在ContextModel中写入自己需要的变量,引用时调用ContextModel的属性即可.

以下是调用者的代码示例, 这里的实例化变量由一个http client代替, 且都会每次请求分配一个客户端实例, 但在实际使用中并不会为每一个请求都分配一个客户端实例, 很影响性能:

import asyncio
import logging
import uuid
from contextvars import Context, copy_context
from functools import partial
from typing import Optional

import httpx
from fastapi import FastAPI, Request, Response

from fast_tools.context import ContextBaseModel, ContextMiddleware, CustomHelper, HeaderHelper

app: FastAPI = FastAPI()


class ContextModel(ContextBaseModel):
    """
	通过该实例可以屏蔽大部分与contextvars相关的操作,如果要添加一个变量,则在该实例添加一个属性即可.
	属性必须要使用Type Hints的写法,不然不会识别(强制使用Type Hints)
    """
    # HeaderHepler用于把header的变量存放于contextvars中
    request_id: str = HeaderHelper.i("X-Request-Id", default_func=lambda request: str(uuid.uuid4()))
    ip: str = HeaderHelper.i("X-Real-IP", default_func=lambda request: request.client.host)
    user_agent: str = HeaderHelper.i("User-Agent")
    # CustomHelper用于把自己的实例(如上文所说的redis客户端)存放于contextvars中
    http_client: httpx.AsyncClient = CustomHelper.i("http_client")

    async def before_request(self, request: Request) -> None:
        # 请求之前的钩子, 通过该钩子可以设置自己的变量
        self.http_client = httpx.AsyncClient()

    async def after_response(self, request: Request, response: Response) -> None:
        # 正常响应的钩子
        raise NotImplementedError()

    async def before_reset_context(self, request: Request, response: Optional[Response]) -> None:
        # 准备退出中间件的钩子, 这步奏后会清掉上下文
        await self.http_client.aclose()


# 添加一个可以维护context的中间件,该中间件会自动维护该请求期间context_model的状态, 并调用对应的钩子
context_model: ContextModel = ContextModel()
app.add_middleware(ContextMiddleware, context_model=context_model)


async def test_ensure_future() -> None:
    logging.debug(f"test_ensure_future {id(context_model.http_client)}")


def test_run_in_executor() -> None:
    logging.info(f"test_run_in_executor {id(context_model.http_client)}")


def test_call_soon() -> None:
    logging.warning(f"test_call_soon {id(context_model.http_client)}")


@app.get("/")
async def root() -> dict:
    # 在使用asyncio.ensure_future开启另外一个子协程跑任务时, 也可以复用上下文
    asyncio.ensure_future(test_ensure_future())
    loop: "asyncio.AbstractEventLoop" = asyncio.get_event_loop()

    # 使用call_soon也能复用上下文
    loop.call_soon(test_call_soon)

    # 使用run_in_executor也能复用上下文, 但必须使用上下文的run方法, copy_context表示复制当前的上下文
    ctx: Context = copy_context()
    await loop.run_in_executor(None, partial(ctx.run, test_run_in_executor))  # type: ignore
    return {
        "message": context_model.to_dict(is_safe_return=True), # 返回的响应不能带有实例对象, 会序列化失败 
        "client_id": id(context_model.http_client),
    }


if __name__ == "__main__":
    import uvicorn  # type: ignore

    uvicorn.run(app)
复制代码

可以从例子中看到, 通过封装的上下文调用会变得非常愉快, 只要通过一两步方法就能设置好自己的上下文属性, 同时不用考虑如何编写上下文的生命周期. 另外也能通过这个例子看出, 在asyncio生态中, contextvars能运用到包括子协程, 多线程等所有的场景中.

4.contextvars的原理

在第一次使用时,我就很好奇contextvars是如何去维护程序的上下文的,好在contextvars的作者出了一个向下兼容的contextvars库,虽然他不支持asyncio,但我们还是可以通过代码了解到他的基本原理.

4.1 ContextMeta,ContextVarMeta和TokenMeta

代码仓中有ContextMeta,ContextVarMetaTokenMeta这几个对象, 它们的功能都是防止用户来继承Context,ContextVarToken,原理都是通过元类来判断类名是否是自己编写类的名称,如果不是则抛错.

class ContextMeta(type(collections.abc.Mapping)):

    # contextvars.Context is not subclassable.

    def __new__(mcls, names, bases, dct):
        cls = super().__new__(mcls, names, bases, dct)
        if cls.__module__ != 'contextvars' or cls.__name__ != 'Context':
            raise TypeError("type 'Context' is not an acceptable base type")
        return cls
复制代码

4.2 Token

上下文的本质是一个堆栈, 每次set一次对象就向堆栈增加一层数据, 每次reset就是pop掉最上层的数据, 而在Contextvars中, 通过Token对象来维护堆栈之间的交互.

class Token(metaclass=TokenMeta):

    MISSING = object()

    def __init__(self, context, var, old_value):
        # 分别存放上下文变量, 当前set的数据, 上次set的数据
        self._context = context
        self._var = var
        self._old_value = old_value
        self._used = False

    @property
    def var(self):
        return self._var

    @property
    def old_value(self):
        return self._old_value

    def __repr__(self):
        r = '<Token '
        if self._used:
            r += ' used'
        r += ' var={!r} at {:0x}>'.format(self._var, id(self))
        return r
复制代码

可以看到Token的代码很少, 它只保存当前的context变量, 本次调用set的数据和上一次被set的旧数据. 用户只有在调用contextvar.context后才能得到Token, 返回的Token可以被用户在调用context后,调用context.reset(token)来清空保存的上下文,方便本次context的变量能及时的被回收, 回到上上次的数据.

4.3 全局唯一context

前面说过, Python中由threading.local()负责每个线程的context, 协程属于线程的’子集’,所以contextvar直接基于threading.local()生成自己的全局context. 从他的源代码可以看到, _state就是threading.local()的引用, 并通过设置和读取_statecontext属性来写入和读取当前的上下文, copy_context调用也很简单, 同样也是调用到threading.local()API.

def copy_context():
    return _get_context().copy()


def _get_context():
    ctx = getattr(_state, 'context', None)
    if ctx is None:
        ctx = Context()
        _state.context = ctx
    return ctx


def _set_context(ctx):
    _state.context = ctx


_state = threading.local()
复制代码

4.3.1 threading.local()

关于threading.local(),虽然不是本文重点,但由于contextvars是基于threading.local()进行封装的,所以还是要明白threading.local()的原理,这里并不直接通过源码分析, 而是做一个简单的示例解释.

一个线程使用自己的局部变量比使用全局变量好,因为局部变量只有线程自己能看见,不会影响其他线程,而全局变量的修改必须加锁, 性能会变得很差. 但是局部变量也有问题,就是在函数调用的时候,传递起来很麻烦.
先看全局变量的例子:

pet_dict = {}

def get_pet(pet_name):
	return pet_dict[pet_name]

def set_pet(pet_name):
	return pet_dict[pet_name]
复制代码

就是一个简单的调用, 如果是多线程调用的话, 那就需要加锁啦, 每次在读写之前都要等到持有锁的线程放弃了锁后再去竞争, 而且还可能污染到了别的线程存放的数据. 为了避免这种情况, 最简单的方法让每个线程有一个自己的pet_dict,假设每个线程调用get_pet,set_pet时,都会把自己的pid传入进来, 那么久可以避免多个线程去同时竞争资源, 同时也不会污染到别的线程的数据, 那么代码可以改为这样子:

pet_dict = {}

def get_pet(pet_name, pid):
	return pet_dict[pid][pet_name]

def set_pet(pet_name, pid):
	return pet_dict[pid][pet_name]
复制代码

这样子使用起来非常方便, 但是示例例子没有对异常检查和初始化等处理,如果值比较复杂,靠我们手动维护太麻烦了.
这时候threading.local()就应运而生了,他负责帮我们处理这些维护的工作,我们只要对他进行一些调用即可,调用起来跟单线程调用一样简单方便.

import threading


thread_local=threading.local()

def get_pet(pet_name):
	return thread_local[pet_name]

def set_pet(pet_name, pid):
	return thread_local[pet_name]
复制代码

4.4contextvar自己封装的Context

contextvars自己封装的Context比较简单, 这里只展示他的两个核心方法, 其他的魔术方法就像dict的魔术方法一样.

class Context(collections.abc.Mapping, metaclass=ContextMeta):

    def __init__(self):
        self._data = immutables.Map()
        self._prev_context = None

    def run(self, callable, *args, **kwargs):
        if self._prev_context is not None:
            raise RuntimeError(
                'cannot enter context: {} is already entered'.format(self))

        self._prev_context = _get_context()
        try:
            _set_context(self)
            return callable(*args, **kwargs)
        finally:
            _set_context(self._prev_context)
            self._prev_context = None

    def copy(self):
        new = Context()
        new._data = self._data
        return new
复制代码

首先, 在__init__方法可以看到self._data,这里使用到了一个叫immutables.Map()的不可变对象,并对immutables.Map()进行一些封装,所以context可以看成一个不可变的dict.
这样可以防止才调用copy方法后得到的上下文后影响到了原本的上下文变量.

查看immutables.Map()的示例代码可以看到,每次对原对象的修改时,原对象并不会发生改变,并会返回一个已经发生改变的新对象.

map2 = map.set('a', 10)
print(map, map2)
# will print:
#   <immutables.Map({'a': 1, 'b': 2})>
#   <immutables.Map({'a': 10, 'b': 2})>

map3 = map2.delete('b')
print(map, map2, map3)
# will print:
#   <immutables.Map({'a': 1, 'b': 2})>
#   <immutables.Map({'a': 10, 'b': 2})>
#   <immutables.Map({'a': 10})>
复制代码

此外,context还有一个叫run的方法, 上面在执行loop.run_in_executor时就用过run方法, 目的就是可以产生一个新的上下文变量给另外一个线程使用. 执行run的时候,可以看出会copy一个新的上下文来调用传入的函数, 由于immutables.Map的存在, 函数中对上下文的修改并不会影响旧的上下文变量, 达到进程复制数据时的写时复制的目的. 在run方法的最后, 函数执行完了会再次set刚才的上下文,完成一次上下文切换.

def run(self, callable, *args, **kwargs):
    # 已经存在旧的context,抛出异常,防止多线程循环调用
    if self._prev_context is not None:
        raise RuntimeError(
            'cannot enter context: {} is already entered'.format(self))

    self._prev_context = _get_context()  # 保存当前的context
    try:
        _set_context(self) # 设置新的context
        return callable(*args, **kwargs)  # 执行函数
    finally:
        _set_context(self._prev_context)  # 设置为旧的context
        self._prev_context = None
复制代码

4.5 ContextVar

我们一般在使用contextvars模块时,经常使用的就是ContextVar这个类了,这个类很简单,主要提供了set–设置值,get–获取值,reset–重置值三个方法, 从Context类中写入和获取值, 而set和reset的就是通过上面的token类进行交互的.

  • set — 为当前上下文设置变量
    def set(self, value):
        ctx = _get_context()  # 获取当前上下文对象`Context`
        data = ctx._data
        try:
            old_value = data[self]  # 获取Context旧对象
        except KeyError:
            old_value = Token.MISSING  # 获取不到则填充一个object(全局唯一)
    
        updated_data = data.set(self, value) # 设置新的值
        ctx._data = updated_data
        return Token(ctx, self, old_value) # 返回带有旧值的token
    复制代码
  • get — 从当前上下文获取变量
    def get(self, default=_NO_DEFAULT):
        ctx = _get_context()  # 获取当前上下文对象`Context`
        try:
            return ctx[self]  # 返回获取的值
        except KeyError:
            pass
    
        if default is not _NO_DEFAULT:
            return default    # 返回调用get时设置的值
    
        if self._default is not _NO_DEFAULT:
            return self._default  # 返回初始化context时设置的默认值
    
        raise LookupError  # 都没有则会抛错
    复制代码
  • reset — 清理本次用到的上下文数据
    def reset(self, token):
        if token._used:
        	# 判断token是否已经被使用
            raise RuntimeError("Token has already been used once")
    
        if token._var is not self:
        	# 判断token是否是当前contextvar返回的
            raise ValueError(
                "Token was created by a different ContextVar")
    
        if token._context is not _get_context():
        	# 判断token的上下文是否跟contextvar上下文一致
            raise ValueError(
                "Token was created in a different Context")
    
        ctx = token._context
        if token._old_value is Token.MISSING:
        	# 如果没有旧值则删除该值
            ctx._data = ctx._data.delete(token._var)
        else:
        	# 有旧值则当前contextvar变为旧值
            ctx._data = ctx._data.set(token._var, token._old_value)
    
        token._used = True  # 设置flag,标记token已经被使用了
    复制代码

则此,contextvar的原理了解完了,接下来再看看他是如何在asyncio运行的.

5.contextvars asyncio

由于向下兼容的contextvars并不支持asyncio, 同时Python3.7对于asyncio的context有非常好的支持, 获取他的上下文非常方便,所以这里通过aiotask-context的源码简要的了解如何在asyncio中如何获取和设置context

5.1在asyncio中获取context

相比起contextvars复杂的概念,在asyncio中,我们可以很简单的获取到当前协程的task,并由task获取context,由于Pyhon3.7对asyncio的高级API 重新设计,所以可以看到需要对获取当前task进行封装

PY37 = sys.version_info >= (3, 7)

if PY37:
    def asyncio_current_task(loop=None):
        """Return the current task or None."""
        try:
            return asyncio.current_task(loop)
        except RuntimeError:
            # simulate old behaviour
            return None
else:
    asyncio_current_task = asyncio.Task.current_task
复制代码

之后我们调用asyncio_current_task().context即可获取到当前的上下文了…

5.2 对上下文的操作

同样的,在得到上下文后, 我们这里也需要set, get, reset的操作,不过十分简单, 类似dict一样的操作即可

  • set
    def set(key, value):
        """
        Sets the given value inside Task.context[key]. If the key does not exist it creates it.
        :param key: identifier for accessing the context dict.
        :param value: value to store inside context[key].
        :raises
        """
        current_task = asyncio_current_task()
        if not current_task:
            raise ValueError(NO_LOOP_EXCEPTION_MSG.format(key))
    
        current_task.context[key] = value
    复制代码
  • get
    def get(key, default=None):
        """
        Retrieves the value stored in key from the Task.context dict. If key does not exist,
        or there is no event loop running, default will be returned
        :param key: identifier for accessing the context dict.
        :param default: None by default, returned in case key is not found.
        :return: Value stored inside the dict[key].
        """
        current_task = asyncio_current_task()
        if not current_task:
            raise ValueError(NO_LOOP_EXCEPTION_MSG.format(key))
    
        return current_task.context.get(key, default)
    复制代码
  • clear — 也就是contextvar.ContextVars中的reset
    def clear():
        """
        Clear the Task.context.
        :raises ValueError: if no current task.
        """
        current_task = asyncio_current_task()
        if not current_task:
            raise ValueError("No event loop found")
    
        current_task.context.clear()
    复制代码

5.2 copying_task_factory和chainmap_task_factory

在Python的更高级版本中,已经支持设置context了,所以这两个方法可以不再使用了.他们最后都用到了task_factory的方法.
task_factory简单说就是创建一个新的task,再通过工厂方法合成context,最后把context设置到task

def task_factory(loop, coro, copy_context=False, context_factory=None):
    """
    By default returns a task factory that uses a simple dict as the task context,
    but allows context creation and inheritance to be customized via ``context_factory``.
    """
    # 生成context工厂函数
    context_factory = context_factory or partial(
        dict_context_factory, copy_context=copy_context)

	# 创建task, 跟asyncio.ensure_future一样
    task = asyncio.tasks.Task(coro, loop=loop)
    if task._source_traceback:
        del [-1]

	# 获取task的context
    try:
        context = asyncio_current_task(loop=loop).context
    except AttributeError:
        context = None
	
	# 从context工厂中处理context并赋值在task
    task.context = context_factory(context)

    return task
复制代码

aiotask-context提供了两个对context处理的函数dict_context_factorychainmap_context_factory.在aiotask-context中,context是一个dict对象,dict_context_factory可以选择赋值或者设置新的context

def dict_context_factory(parent_context=None, copy_context=False):
    """A traditional ``dict`` context to keep things simple"""
    if parent_context is None:
        # initial context
        return {}
    else:
        # inherit context
        new_context = parent_context
        if copy_context:
            new_context = deepcopy(new_context)
        return new_context
复制代码

chainmap_context_factorydict_context_factory的区别就是在合并context而不是直接继承.同时借用ChainMap保证合并context后,还能同步context的改变

def chainmap_context_factory(parent_context=None):
    """
    A ``ChainMap`` context, to avoid copying any data
    and yet preserve strict one-way inheritance
    (just like with dict copying)
    """
    if parent_context is None:
        # initial context
        return ChainMap()
    else:
        # inherit context
        if not isinstance(parent_context, ChainMap):
            # if a dict context was previously used, then convert
            # (without modifying the original dict)
            parent_context = ChainMap(parent_context)
        return parent_context.new_child()
复制代码

至此, asyncio中context的调用就简单的分析完了, 如果想要深入的了解asyncio是怎么传上下文的, 可以查看asyncio都源码.

6.总结

contextvars本身原理很简单,但他可以让我们调用起来更加方便便捷,减少我们的传参次数,同时还可以结合TypeHint使项目更加工成化, 但是还是仁者见仁. 不过在使用时最好能加上一层封装, 最好的实践应该是一个协程共享同一个context而不是每个变量一个context.

© 版权声明
THE END
喜欢就支持一下吧
点赞0 分享