01 Python并发的上下文管理

梳理在Python的各种并发场景,各种隔离级别的数据定义方式,当前重点关注Python内存中的定义和使用要点;

1.1 线程数据隔离threading.local()

代码的运行都是有上下文环境的,即运行内存空间中的局部变量是什么,全局变量是什么. 一般情况下,全局变量的名称空间(比如模块中定义的变量)对于不同的线程,都是统一的,每一个线程都能去读,写它. 当处于并发的情况下,如果想要不改变变量位于全局代码位置的封装形式,又想让变量在各个线程中都是独立的, 这时候就需要要使用thread.local().

import threading

context = threading.local()
context.value = 0  # 初始值
context.name = "global"


def func1():

    print("in func1:")
    print("context.value is {}, context.name is {}".format(context.value, context.name))


def func2():
    print("in func2:")
    context.value = 2  # 子线程内部初始化
    context.name = "func2"
    print("context.value is {}, context.name is {}".format(context.value, context.name))


def main():
    print("in main first: context.value is {}, context.name is {}".format(
        context.value, context.name))
    t = threading.Thread(target=func1)
    t.start()
    t.join()

    t = threading.Thread(target=func2)
    t.start()
    t.join()
    print("in main second: context.value is {}, context.name is {}".format(
        context.value, context.name))


if __name__ == "__main__":
    main()

运行代码输出如下结果:

in main first: context.value is 0, context.name is global
in func1:
Exception in thread Thread-1 (func1):
Traceback (most recent call last):
File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
File "/usr/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
File "/root/workspace/writedocs/source/01python/demo.py", line 12, in func1
    print("context.value is {}, context.name is {}".format(context.value, context.name))
AttributeError: '_thread._local' object has no attribute 'value'
in func2:
context.value is 2, context.name is func2
in main second: context.value is 0, context.name is global

Note:

当在func1中,子线程没有设置值,所有抛出了异常; 而在func2中,子线程设置了值,但是设置的值并没有影响外面主线程的变量值; 我们可以得出如下结论: threading.local()的属性在每一个线程都是独立,并且都需要独立初始化. 每个线程在修改属性变量时,不会影响其他线程的数据.这样就达到了数据在不同线程独立的目的.

每个线程的数据都是独立的,需要在线程启动的时候初始化,否则在读取的时候就会报错;

1.2 基于ContextVar的上下文数据隔离

1.2.1 要点整理

  • contextvars.Context(上下文)是 ContextVar 及其当前值的映射容器。

  • 每个线程对应一个 PyThreadState, PyThreadState 中维护当前激活的 Context(current_context)。

  • asyncio.Task 保存一个独立的 Context, 该 Context 在 Task 创建时通过 copy_context() 捕获。 当 Task 被调度运行时, 其保存的 Context 会被设置为线程当前激活的 Context。

  • copy_context() 的时间复杂度为 O(1)。

    • 它不会遍历复制所有 ContextVar。

    • 新旧 Context 会共享底层状态结构, 因此复制成本与 ContextVar 数量无关。

    • 当执行 ContextVar.set() 时, 当前 Context 会产生新的状态版本, 修改仅对当前 Context 可见, 不会影响其他共享历史状态的 Context。

  • context.run(func, *args, **kwargs) 的本质是在当前线程中临时切换 current_context。

    • 执行前保存旧 Context。

    • 将 current_context 设置为指定 Context。

    • 执行 func。

    • func 退出后恢复原 Context。

    • run() 不会自动执行 copy_context(), 也不会创建新的 Context, 而是直接在传入的 Context 上运行代码。

    • 同一个 Context 不允许被并发进入。 如果 context_a.run(...) 尚未退出, 另一个线程(或执行流)再次进入同一个 Context, 将抛出 RuntimeError。

1.2.2 验证示例

import asyncio
import contextvars
import threading
import time
import sys
import gc

# 1. 定义一个用于追踪内存死活的测试对象
class MemoryTracker:
    def __init__(self, name):
        self.name = name
        print(f"[创建对象]:{self.name}")

    def __del__(self):
        print(f"[释放对象]:{self.name}")

ctx_var = contextvars.ContextVar("my_tracker")

def thread_worker(thread_name, obj_name):
    print(f"\n[线程 {thread_name}] 启动...")
    # 在当前线程的 Context 树中写入变量
    tracker = MemoryTracker(obj_name)
    ctx_var.set(tracker)
    
    print(f"[线程 {thread_name}] 读取当前内存地址: {hex(id(ctx_var.get()))}")
    time.sleep(1.5)  # 故意阻塞,等待另一个线程干扰
    
    # 再次读取,验证是否被污染
    print(f"[线程 {thread_name}] 阻塞结束后再次读取: {ctx_var.get().name}")

# --- 协程工作函数 ---
async def async_worker(task_name, obj_name):
    print(f"\n[协程 {task_name}] 启动...")
    # 协程启动时自动隐式触发 copy_context() 软拷贝
    tracker = MemoryTracker(obj_name)
    ctx_var.set(tracker)
    
    print(f"[协程 {task_name}] 读取当前内存地址: {hex(id(ctx_var.get()))}")
    # 遇到 await 挂起,此时事件循环会切走线程的 current_context 指针
    await asyncio.sleep(1) 
    
    # 唤醒后再次读取,验证指针切回来后数据是否完好
    print(f"[协程 {task_name}] 唤醒后再次读取: {ctx_var.get().name}")


def run_isolation_test():
    # A. 启动两个线程(物理隔离)
    t1 = threading.Thread(target=thread_worker, args=("A", "Thread-Data-A"))
    t2 = threading.Thread(target=thread_worker, args=("B", "Thread-Data-B"))
    t1.start()
    t2.start()
    t1.join()
    t2.join()

    # B. 在单线程内启动两个协程(指针频繁闪现切换)
    async def main_async():
        await asyncio.gather(
            async_worker("1", "Coroutine-Data-1"),
            async_worker("2", "Coroutine-Data-2")
        )
    asyncio.run(main_async())


def run_lifecycle_test():
    # 创建一个局部的旧上下文
    old_ctx = contextvars.copy_context()
    
    def scope_func():
        # 在这个临时环境中存入对象
        local_tracker = MemoryTracker("Lifecycle-Target")
        ctx_var.set(local_tracker)
        print(f"  [当前状态] 对象在临时 Context 中的引用计数: {sys.getrefcount(local_tracker) - 1}")

    print("\n[步骤 1] 通过 ctx.run 进入临时上下文并赋值:")
    old_ctx.run(scope_func)

    print("\n[步骤 2] 退出 ctx.run,回到外层上下文。")
    print("  此时线程指针已经切走,原临时 Context 失去强引用。")
    
    print("\n[步骤 3] 手动触发垃圾回收(GC)...")
    del old_ctx  # 销毁上下文容器
    gc.collect() # 强制触发 Python 内存回收
    
    print("\n[测试结束] 如果上面没有打印 [释放对象],说明发生了内存泄漏。")


if __name__ == "__main__":
    run_isolation_test()
    run_lifecycle_test()