01 Python并发的上下文管理
梳理在Python的各种并发场景,各种隔离级别的数据定义方式,当前重点关注Python内存中的定义和使用要点;
1.1 线程数据隔离threading.local()
代码的运行都是有上下文环境的,即运行内存空间中的局部变量是什么,全局变量是什么. 一般情况下,全局变量的名称空间(比如模块中定义的变量)对于不同的线程,都是统一的,每一个线程都能去读,写它. 当处于并发的情况下,如果想要不改变变量位于全局代码位置的封装形式,又想让变量在各个线程中都是独立的, 这时候就需要要使用thread.local().
import threading
context = threading.local()
context.value = 0 # 初始值
context.name = "global"
def func1():
print("in func1:")
print("context.value is {}, context.name is {}".format(context.value, context.name))
def func2():
print("in func2:")
context.value = 2 # 子线程内部初始化
context.name = "func2"
print("context.value is {}, context.name is {}".format(context.value, context.name))
def main():
print("in main first: context.value is {}, context.name is {}".format(
context.value, context.name))
t = threading.Thread(target=func1)
t.start()
t.join()
t = threading.Thread(target=func2)
t.start()
t.join()
print("in main second: context.value is {}, context.name is {}".format(
context.value, context.name))
if __name__ == "__main__":
main()
运行代码输出如下结果:
in main first: context.value is 0, context.name is global
in func1:
Exception in thread Thread-1 (func1):
Traceback (most recent call last):
File "/usr/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
self.run()
File "/usr/lib/python3.10/threading.py", line 953, in run
self._target(*self._args, **self._kwargs)
File "/root/workspace/writedocs/source/01python/demo.py", line 12, in func1
print("context.value is {}, context.name is {}".format(context.value, context.name))
AttributeError: '_thread._local' object has no attribute 'value'
in func2:
context.value is 2, context.name is func2
in main second: context.value is 0, context.name is global
Note:
当在func1中,子线程没有设置值,所有抛出了异常; 而在func2中,子线程设置了值,但是设置的值并没有影响外面主线程的变量值; 我们可以得出如下结论: threading.local()的属性在每一个线程都是独立,并且都需要独立初始化. 每个线程在修改属性变量时,不会影响其他线程的数据.这样就达到了数据在不同线程独立的目的.
每个线程的数据都是独立的,需要在线程启动的时候初始化,否则在读取的时候就会报错;
1.2 基于ContextVar的上下文数据隔离
1.2.1 要点整理
contextvars.Context(上下文)是 ContextVar 及其当前值的映射容器。
每个线程对应一个 PyThreadState, PyThreadState 中维护当前激活的 Context(current_context)。
asyncio.Task 保存一个独立的 Context, 该 Context 在 Task 创建时通过 copy_context() 捕获。 当 Task 被调度运行时, 其保存的 Context 会被设置为线程当前激活的 Context。
copy_context() 的时间复杂度为 O(1)。
它不会遍历复制所有 ContextVar。
新旧 Context 会共享底层状态结构, 因此复制成本与 ContextVar 数量无关。
当执行 ContextVar.set() 时, 当前 Context 会产生新的状态版本, 修改仅对当前 Context 可见, 不会影响其他共享历史状态的 Context。
context.run(func, *args, **kwargs) 的本质是在当前线程中临时切换 current_context。
执行前保存旧 Context。
将 current_context 设置为指定 Context。
执行 func。
func 退出后恢复原 Context。
run() 不会自动执行 copy_context(), 也不会创建新的 Context, 而是直接在传入的 Context 上运行代码。
同一个 Context 不允许被并发进入。 如果 context_a.run(...) 尚未退出, 另一个线程(或执行流)再次进入同一个 Context, 将抛出 RuntimeError。
1.2.2 验证示例
import asyncio
import contextvars
import threading
import time
import sys
import gc
# 1. 定义一个用于追踪内存死活的测试对象
class MemoryTracker:
def __init__(self, name):
self.name = name
print(f"[创建对象]:{self.name}")
def __del__(self):
print(f"[释放对象]:{self.name}")
ctx_var = contextvars.ContextVar("my_tracker")
def thread_worker(thread_name, obj_name):
print(f"\n[线程 {thread_name}] 启动...")
# 在当前线程的 Context 树中写入变量
tracker = MemoryTracker(obj_name)
ctx_var.set(tracker)
print(f"[线程 {thread_name}] 读取当前内存地址: {hex(id(ctx_var.get()))}")
time.sleep(1.5) # 故意阻塞,等待另一个线程干扰
# 再次读取,验证是否被污染
print(f"[线程 {thread_name}] 阻塞结束后再次读取: {ctx_var.get().name}")
# --- 协程工作函数 ---
async def async_worker(task_name, obj_name):
print(f"\n[协程 {task_name}] 启动...")
# 协程启动时自动隐式触发 copy_context() 软拷贝
tracker = MemoryTracker(obj_name)
ctx_var.set(tracker)
print(f"[协程 {task_name}] 读取当前内存地址: {hex(id(ctx_var.get()))}")
# 遇到 await 挂起,此时事件循环会切走线程的 current_context 指针
await asyncio.sleep(1)
# 唤醒后再次读取,验证指针切回来后数据是否完好
print(f"[协程 {task_name}] 唤醒后再次读取: {ctx_var.get().name}")
def run_isolation_test():
# A. 启动两个线程(物理隔离)
t1 = threading.Thread(target=thread_worker, args=("A", "Thread-Data-A"))
t2 = threading.Thread(target=thread_worker, args=("B", "Thread-Data-B"))
t1.start()
t2.start()
t1.join()
t2.join()
# B. 在单线程内启动两个协程(指针频繁闪现切换)
async def main_async():
await asyncio.gather(
async_worker("1", "Coroutine-Data-1"),
async_worker("2", "Coroutine-Data-2")
)
asyncio.run(main_async())
def run_lifecycle_test():
# 创建一个局部的旧上下文
old_ctx = contextvars.copy_context()
def scope_func():
# 在这个临时环境中存入对象
local_tracker = MemoryTracker("Lifecycle-Target")
ctx_var.set(local_tracker)
print(f" [当前状态] 对象在临时 Context 中的引用计数: {sys.getrefcount(local_tracker) - 1}")
print("\n[步骤 1] 通过 ctx.run 进入临时上下文并赋值:")
old_ctx.run(scope_func)
print("\n[步骤 2] 退出 ctx.run,回到外层上下文。")
print(" 此时线程指针已经切走,原临时 Context 失去强引用。")
print("\n[步骤 3] 手动触发垃圾回收(GC)...")
del old_ctx # 销毁上下文容器
gc.collect() # 强制触发 Python 内存回收
print("\n[测试结束] 如果上面没有打印 [释放对象],说明发生了内存泄漏。")
if __name__ == "__main__":
run_isolation_test()
run_lifecycle_test()