Concurrency and Parallel Programming
By AI-Writer · 9 min read
Preface
Python offers three main paths to concurrency: multithreading (threading), multiprocessing (multiprocessing), and asynchronous I/O (asyncio). Understanding where each fits, and how the GIL affects each of them, is a prerequisite for choosing the right one.
The GIL (Global Interpreter Lock)
Python's GIL (Global Interpreter Lock) is a core feature of CPython: only one thread executes Python bytecode at any given moment. As a result, multithreading cannot use multiple CPU cores to speed up computation.
```python
import dis

def count_up(n):
    return sum(range(n))

# Inspect the bytecode
dis.dis(count_up)
# LOAD_GLOBAL  loads a global name
# LOAD_FAST    loads a local variable
# ...
```
Impact of the GIL
| Scenario | Best fit | Why |
|---|---|---|
| I/O-bound (network requests, file I/O) | threading / asyncio | The GIL is released while waiting on I/O |
| CPU-bound (numeric computation, image processing) | multiprocessing | Bypasses the GIL and uses multiple cores |
| Mixed | multiprocessing or ProcessPoolExecutor | Best of both worlds |
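The first row's claim is easy to verify for yourself: for a pure-Python CPU-bound function, adding threads buys nothing, because the GIL serializes bytecode execution. A minimal sketch (the task size is arbitrary, and actual timings are machine-dependent):

```python
import time
import threading

def cpu_task(n: int) -> int:
    """Pure-Python CPU work: the GIL stays held throughout."""
    return sum(range(n))

N = 2_000_000

# Sequential: four tasks, one after another
start = time.perf_counter()
for _ in range(4):
    cpu_task(N)
seq = time.perf_counter() - start

# Threaded: four threads, but the GIL lets only one run bytecode at a time
start = time.perf_counter()
threads = [threading.Thread(target=cpu_task, args=(N,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
thr = time.perf_counter() - start

print(f"sequential: {seq:.2f}s, threaded: {thr:.2f}s")
```

On CPython you should see the two times come out roughly equal; the threaded run can even be slightly slower due to lock contention and context switching.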
threading: multithreading
Basic usage
```python
import threading
import time

def download(url: str):
    print(f"Start downloading: {url}")
    time.sleep(1)  # simulate network I/O
    print(f"Finished downloading: {url}")

threads = []
urls = ["a.com", "b.com", "c.com"]

# Create and start the threads
for url in urls:
    t = threading.Thread(target=download, args=(url,))
    threads.append(t)
    t.start()

# Wait for all threads to finish
for t in threads:
    t.join()
print("All done")
```
Thread synchronization
```python
import threading

# Lock: a mutex
counter = 0  # must be initialized at module level first
lock = threading.Lock()

def increment():
    global counter
    for _ in range(100000):
        with lock:  # only one thread touches counter at a time
            counter += 1

threads = [threading.Thread(target=increment) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 400000 (the correct result)
```
Other synchronization primitives
```python
import threading

# RLock: reentrant lock (the same thread may acquire it multiple times)
rlock = threading.RLock()

# Event: a signal between threads
event = threading.Event()

def worker():
    print("Waiting for the event...")
    event.wait()  # block until the event is set
    print("Event set, continuing")

t = threading.Thread(target=worker)
t.start()
print("Main thread sets the event")
event.set()  # wake up any waiters
t.join()
```
Queue: a thread-safe queue
```python
import threading
import queue

def producer(q: queue.Queue):
    for i in range(5):
        q.put(i)
        print(f"Produced: {i}")

def consumer(q: queue.Queue):
    while True:
        item = q.get()
        if item is None:  # sentinel: stop consuming
            break
        print(f"Consumed: {item}")
        q.task_done()

q = queue.Queue()
t1 = threading.Thread(target=producer, args=(q,))
t2 = threading.Thread(target=consumer, args=(q,))
t1.start()
t2.start()  # start the consumer alongside the producer
t1.join()
q.put(None)  # send the stop sentinel
t2.join()
```
multiprocessing: multiple processes
Multiprocessing bypasses the GIL; each process gets its own Python interpreter and memory space:
```python
import multiprocessing
import time

def cpu_task(n: int) -> int:
    """A CPU-bound task"""
    return sum(range(n))

if __name__ == "__main__":
    start = time.perf_counter()
    # Create a pool of worker processes
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(cpu_task, [1000000] * 8)
    print(f"Result: {sum(results)}")
    print(f"Elapsed: {time.perf_counter() - start:.2f}s")
```
Inter-process communication
```python
import multiprocessing

def worker(shared_list, shared_value):
    shared_list.append(1)
    shared_value.value += 1  # note: += on a Manager Value is not atomic

if __name__ == "__main__":
    # Manager: proxies for shared data structures
    manager = multiprocessing.Manager()
    shared_list = manager.list()
    shared_value = manager.Value("i", 0)
    processes = [
        multiprocessing.Process(target=worker, args=(shared_list, shared_value))
        for _ in range(4)
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print(f"List: {list(shared_list)}")
    print(f"Value: {shared_value.value}")
```
Pipe
```python
import multiprocessing

def producer(conn):
    conn.send("Hello from child")
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = multiprocessing.Pipe()
    p = multiprocessing.Process(target=producer, args=(child_conn,))
    p.start()
    msg = parent_conn.recv()
    print(f"Received: {msg}")
    p.join()
```
concurrent.futures
concurrent.futures is a higher-level abstraction over threading and multiprocessing with a unified interface:
ThreadPoolExecutor (thread pool)
```python
import concurrent.futures
import time

def fetch(url: str) -> str:
    time.sleep(0.5)  # simulate network I/O
    return f"Result: {url}"

urls = ["a.com", "b.com", "c.com"]

# Use a thread pool (good for I/O-bound work)
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(fetch, url) for url in urls]
    for future in concurrent.futures.as_completed(futures):
        print(future.result())
```
ProcessPoolExecutor (process pool)
```python
import concurrent.futures

def compute(n: int) -> int:
    """CPU-bound"""
    return sum(range(n))

if __name__ == "__main__":
    # Use a process pool (good for CPU-bound work)
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(compute, [1000000] * 8))
    print(f"Total: {sum(results)}")
```
map vs. submit
```python
import concurrent.futures

def square(n: int) -> int:
    return n ** 2

with concurrent.futures.ThreadPoolExecutor() as executor:
    # map: results come back in input order
    results = list(executor.map(square, range(5)))
    print(results)  # [0, 1, 4, 9, 16]

    # submit: schedule a single task, get a Future back
    futures = [executor.submit(square, i) for i in range(5)]
    for f in concurrent.futures.as_completed(futures):
        print(f.result(), end=" ")  # printed in completion order
```
Performance comparison
```python
import time
import asyncio
import concurrent.futures

def cpu_bound(n: int) -> int:
    return sum(range(n))

def io_bound(delay: float) -> str:
    time.sleep(delay)
    return "done"

async def async_io(delay: float) -> str:
    await asyncio.sleep(delay)
    return "done"

if __name__ == "__main__":
    # CPU-bound (four identical summation tasks)
    cpu_tasks = [2_000_000] * 4

    # Process pool: bypasses the GIL, runs CPU-bound tasks on multiple cores
    start = time.perf_counter()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        list(pool.map(cpu_bound, cpu_tasks))
    print(f"processes: {time.perf_counter() - start:.2f}s")

    # I/O-bound (four 1-second tasks)
    tasks = [1.0] * 4

    # Sequential: each task waits in turn
    start = time.perf_counter()
    for t in tasks:
        io_bound(t)
    print(f"sequential: {time.perf_counter() - start:.2f}s")  # ~4s

    # Thread pool: four threads wait concurrently, finishing together
    start = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor() as pool:
        list(pool.map(io_bound, tasks))
    print(f"threads: {time.perf_counter() - start:.2f}s")  # ~1s

    # asyncio: a single-threaded event loop waits on all tasks at once
    async def run_all():
        return await asyncio.gather(*(async_io(t) for t in tasks))

    start = time.perf_counter()
    asyncio.run(run_all())
    print(f"asyncio: {time.perf_counter() - start:.2f}s")  # ~1s
```
Practical recommendations
| Scenario | Recommendation |
|---|---|
| High-concurrency HTTP requests | aiohttp + asyncio |
| Concurrent file processing | concurrent.futures.ThreadPoolExecutor |
| CPU-heavy computation (matrix math, encryption, etc.) | ProcessPoolExecutor |
| Both I/O and CPU work | ProcessPoolExecutor, or a mix |
| Web background tasks (Flask/Django) | threading, or Celery (multi-process) |
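For the mixed row, one common pattern is to fan the I/O out to a thread pool and hand the CPU-heavy stage to a process pool. A sketch under illustrative assumptions (the `fetch` and `crunch` helpers here are stubs, not real network or numeric code):

```python
import concurrent.futures

def fetch(url: str) -> str:
    """I/O stage (stub): pretend we downloaded some text."""
    return f"payload-from-{url}"

def crunch(text: str) -> int:
    """CPU stage (stub): pure-Python work on the payload."""
    return sum(ord(c) for c in text) * 1000

if __name__ == "__main__":
    urls = ["a.com", "b.com", "c.com"]

    # Stage 1: threads overlap the I/O waits (GIL released while blocked)
    with concurrent.futures.ThreadPoolExecutor() as tpool:
        payloads = list(tpool.map(fetch, urls))

    # Stage 2: processes run the CPU work on separate cores
    with concurrent.futures.ProcessPoolExecutor() as ppool:
        results = list(ppool.map(crunch, payloads))

    print(results)
```

Because both executors share the same interface, each stage can be tuned or swapped independently; the only constraint is that functions sent to the process pool must be picklable (defined at module level).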
Summary
- The GIL prevents CPU parallelism across threads, but is released during I/O waits
- threading: good for I/O-bound work; synchronize with Lock/Queue
- multiprocessing: bypasses the GIL, good for CPU-bound work; inter-process communication has overhead
- concurrent.futures: a unified thread-pool/process-pool interface, recommended as the default
- Combine ProcessPoolExecutor with ThreadPoolExecutor for mixed workloads
- asyncio is a single-threaded async model, best for high-concurrency I/O, where it usually beats threads
#python
#并发
#并行
#threading
#multiprocessing
#GIL