python

并发与并行编程

By AI-Writer 9 min read

前言

Python 有三条主要的并发路径:多线程(threading)、多进程(multiprocessing)和异步(asyncio)。理解它们的适用场景和 GIL 的影响,是选择正确方案的前提。

GIL 全局解释器锁

Python 的 GIL(Global Interpreter Lock)是 CPython 的核心特性——同一时刻只有一个线程在执行 Python 字节码。这导致多线程无法真正利用多核 CPU 提升计算性能。

python
import dis

def count_up(n):
    """Return the sum of the integers 0..n-1."""
    return sum(range(n))

# Disassemble the function to inspect the bytecode CPython executes.
dis.dis(count_up)
# LOAD_GLOBAL  pushes a global name (here: sum, range)
# LOAD_FAST    pushes a local variable (here: n)
# ...

GIL 的影响

| 场景 | 适合方案 | 原因 |
| --- | --- | --- |
| I/O 密集型(网络请求、文件读写) | threading / asyncio | I/O 等待时释放 GIL |
| CPU 密集型(数值计算、图像处理) | multiprocessing | 绕过 GIL,利用多核 |
| 混合型 | multiprocessing 或 ProcessPoolExecutor | 各取所长 |

threading 多线程

基本使用

python
import threading
import time

def download(url: str):
    """Simulate downloading one URL with a 1-second blocking wait."""
    print(f"开始下载: {url}")
    time.sleep(1)  # simulated network I/O
    print(f"完成下载: {url}")

urls = ["a.com", "b.com", "c.com"]

# One worker thread per URL; start them all so their waits overlap.
threads = [threading.Thread(target=download, args=(u,)) for u in urls]
for worker in threads:
    worker.start()

# Block the main thread until every worker has finished.
for worker in threads:
    worker.join()

print("全部完成")

线程间同步

python
import threading

# Shared counter guarded by a mutex; must be initialized at module level.
counter = 0
lock = threading.Lock()

def increment():
    """Add 1 to the shared counter 100,000 times, one lock hold per add."""
    global counter
    for _ in range(100000):
        # The lock makes the read-modify-write atomic across threads.
        with lock:
            counter += 1

workers = [threading.Thread(target=increment) for _ in range(4)]
for w in workers:
    w.start()
for w in workers:
    w.join()

print(counter)  # 400000 — the race-free result

其他同步原语

python
import threading

# RLock: reentrant lock — the same thread may acquire it multiple times.
rlock = threading.RLock()

# Event: a simple one-shot signal shared between threads.
event = threading.Event()

def worker():
    """Block until the event is set, then continue."""
    print("等待事件...")
    event.wait()  # sleeps until some thread calls event.set()
    print("事件触发,继续执行")

waiter = threading.Thread(target=worker)
waiter.start()

print("主线程设置事件")
event.set()  # wakes every thread blocked in event.wait()
waiter.join()

Queue 线程安全队列

python
import threading
import queue

def producer(q: queue.Queue):
    """Put the integers 0..4 onto the queue."""
    for i in range(5):
        q.put(i)
        print(f"生产: {i}")

def consumer(q: queue.Queue):
    """Drain the queue until the None sentinel arrives."""
    while True:
        item = q.get()
        if item is None:  # sentinel — stop consuming
            break
        print(f"消费: {item}")
        q.task_done()

q = queue.Queue()
prod = threading.Thread(target=producer, args=(q,))
cons = threading.Thread(target=consumer, args=(q,))

prod.start()
prod.join()
q.put(None)  # signal the consumer that production is finished
cons.start()
cons.join()

multiprocessing 多进程

多进程绕过 GIL,每个进程有独立的 Python 解释器和内存空间:

python
import multiprocessing
import time

def cpu_task(n: int) -> int:
    """CPU-bound work: sum of the integers 0..n-1."""
    return sum(range(n))

if __name__ == "__main__":
    start = time.perf_counter()

    # Fan eight jobs out across a pool of four worker processes.
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(cpu_task, [1000000] * 8)

    print(f"结果: {sum(results)}")
    print(f"耗时: {time.perf_counter() - start:.2f}s")

进程间通信

python
import multiprocessing

def worker(shared_list, shared_value):
    """Mutate manager-backed shared state from a child process."""
    shared_list.append(1)
    # NOTE(review): += on a Manager Value proxy is a read-then-write,
    # not atomic — concurrent workers can lose updates. Demo only.
    shared_value.value += 1

if __name__ == "__main__":
    # A Manager process serves shared data structures through proxies.
    with multiprocessing.Manager() as manager:
        shared_list = manager.list()
        shared_value = manager.Value("i", 0)

        procs = [
            multiprocessing.Process(target=worker, args=(shared_list, shared_value))
            for _ in range(4)
        ]

        for p in procs:
            p.start()
        for p in procs:
            p.join()

        print(f"列表: {list(shared_list)}")
        print(f"值: {shared_value.value}")

Pipe 管道

python
import multiprocessing

def producer(conn):
    """Send a single message through the pipe, then close our endpoint."""
    conn.send("Hello from child")
    conn.close()

if __name__ == "__main__":
    # A Pipe yields two connected endpoints, one for each process.
    parent_conn, child_conn = multiprocessing.Pipe()

    child = multiprocessing.Process(target=producer, args=(child_conn,))
    child.start()

    # recv() blocks until the child has sent its message.
    msg = parent_conn.recv()
    print(f"收到: {msg}")

    child.join()

concurrent.futures

concurrent.futures 是比 threading 和 multiprocessing 更高级的抽象,接口统一:

ThreadPoolExecutor(线程池)

python
import concurrent.futures
import time

def fetch(url: str) -> str:
    """Simulate fetching a URL with a 0.5-second blocking wait."""
    time.sleep(0.5)  # simulated network I/O
    return f"Result: {url}"

urls = ["a.com", "b.com", "c.com"]

# Thread pools suit I/O-bound work: the threads' waits overlap.
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    pending = [executor.submit(fetch, u) for u in urls]

    # as_completed yields futures in completion order, not submit order.
    for done in concurrent.futures.as_completed(pending):
        print(done.result())

ProcessPoolExecutor(进程池)

python
import concurrent.futures

def compute(n: int) -> int:
    """CPU-bound work: sum of the integers 0..n-1."""
    return sum(range(n))

# Process pools suit CPU-bound work: each worker is a separate
# interpreter, so the GIL does not serialize the computation.
if __name__ == "__main__":
    # The __main__ guard is required here: under the spawn start method
    # (default on Windows and macOS) every worker re-imports this module,
    # and an unguarded executor would re-execute and spawn recursively.
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(compute, [1000000] * 8))
        print(f"总结果: {sum(results)}")

map 与 submit

python
import concurrent.futures

def square(n: int) -> int:
    """Return n squared."""
    return n * n

with concurrent.futures.ThreadPoolExecutor() as executor:
    # map: results come back in the same order as the inputs.
    results = list(executor.map(square, range(5)))
    print(results)  # [0, 1, 4, 9, 16]

    # submit: one Future per task; as_completed yields them as they
    # finish, so the printed order may differ from the submit order.
    futs = [executor.submit(square, i) for i in range(5)]
    for f in concurrent.futures.as_completed(futs):
        print(f.result(), end=" ")

性能对比

python
import time
import asyncio
import threading
import multiprocessing
import concurrent.futures

def cpu_bound(n: int) -> int:
    return sum(range(n))

def io_bound(delay: float) -> str:
    time.sleep(delay)
    return "done"

def async_io(delay: float):
    async def task():
        await asyncio.sleep(delay)
        return "done"
    return asyncio.run(task())  # 每次调用创建新的事件循环,有一定开销

# 多进程:绕过 GIL,利用多核并行处理 CPU 密集任务
start = time.perf_counter()
with concurrent.futures.ProcessPoolExecutor() as pool:
    list(pool.map(cpu_bound, tasks))
print(f"多进程: {time.perf_counter() - start:.2f}s")

# I/O 密集型(4 个 1 秒任务)
tasks = [1.0] * 4

# 顺序执行:每个任务串行等待,共约 4 秒
start = time.perf_counter()
for t in tasks:
    io_bound(t)
print(f"顺序: {time.perf_counter() - start:.2f}s")  # ~4s

# 线程池:4 个线程并发等待,每个 ~1 秒后同时完成
start = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor() as pool:
    list(pool.map(io_bound, tasks))
print(f"线程池: {time.perf_counter() - start:.2f}s")  # ~1s

# asyncio:单线程事件循环并发等待,效率最高
start = time.perf_counter()
asyncio.run(asyncio.gather(*[async_io(t) for t in tasks]))
print(f"asyncio: {time.perf_counter() - start:.2f}s")  # ~1s

实战选择建议

| 场景 | 推荐方案 |
| --- | --- |
| 高并发 HTTP 请求 | aiohttp + asyncio |
| 并发文件处理 | concurrent.futures.ThreadPoolExecutor |
| CPU 密集计算(矩阵运算、加密等) | ProcessPoolExecutor |
| 同时需要 I/O 和 CPU 计算 | ProcessPoolExecutor 或混合 |
| Web 后台任务(Flask/Django) | threading 或 Celery(多进程) |

小结

  • GIL 限制了多线程的 CPU 并行能力,但 I/O 等待时会释放
  • threading:适合 I/O 密集型,使用 Lock/Queue 进行同步
  • multiprocessing:绕过 GIL,适合 CPU 密集型,进程间通信开销较大
  • concurrent.futures:统一的线程池/进程池接口,推荐优先使用
  • ProcessPoolExecutor + ThreadPoolExecutor 组合应对混合场景
  • asyncio 是单线程异步方案,适合高并发 I/O,通常优于多线程
#python #并发 #并行 #threading #multiprocessing #GIL

评论

A

Written by

AI-Writer

Related Articles

python
#11

装饰器与元编程

深入掌握 Python 函数装饰器、类装饰器、带参数的装饰器、functools.wraps,以及类元编程与 __new__ 的高级用法

Read More
python
#7

面向对象编程

深入理解 Python 的类与实例、继承与多态、MRO、特殊方法、属性装饰器与描述符协议,成为面向对象编程的高手

Read More