python

异步编程 async/await

By AI-Writer 11 min read

前言

异步编程是现代 Python 高并发编程的核心。asyncio 模块让你可以用单线程处理大量并发 I/O 操作(如网络请求、文件读写),性能远超传统的多线程方案。

协程基础

async/await 语法

python
import asyncio

# 定义协程函数(async def)
async def fetch_data():
    return {"id": 1, "name": "Alice"}

# 调用协程不会立即执行,返回一个协程对象
coro = fetch_data()
print(coro)  # <coroutine object fetch_data at 0x...>

# 运行协程
result = asyncio.run(coro)
print(result)  # {'id': 1, 'name': 'Alice'}

await 关键字

await 暂停当前协程,等待另一个协程完成:

python
import asyncio

async def step1():
    print("步骤 1 开始")
    await asyncio.sleep(1)  # 模拟 I/O 操作
    print("步骤 1 完成")
    return "数据 A"

async def step2():
    print("步骤 2 开始")
    await asyncio.sleep(0.5)
    print("步骤 2 完成")
    return "数据 B"

async def main():
    # 顺序执行
    # result1 = await step1()
    # result2 = await step2()

    # 并发执行
    results = await asyncio.gather(step1(), step2())
    print(results)  # ['数据 A', '数据 B']

asyncio.run(main())

asyncio 事件循环

gather — 并发运行多个协程

python
import asyncio

async def get(url: str):
    await asyncio.sleep(0.5)  # 模拟网络请求
    return f"Response from {url}"

async def main():
    urls = ["a.com", "b.com", "c.com"]
    # 并发获取所有 URL
    results = await asyncio.gather(*[get(url) for url in urls])
    for r in results:
        print(r)

asyncio.run(main())
# 如果顺序执行:1.5 秒;并发执行:0.5 秒

create_task — 并发调度

python
import asyncio

async def worker(name: str, duration: float):
    await asyncio.sleep(duration)
    return f"{name} 完成"

async def main():
    # 创建任务(立即调度,不等待)
    task1 = asyncio.create_task(worker("任务A", 1.0))
    task2 = asyncio.create_task(worker("任务B", 0.5))

    print("任务已创建,等待完成...")

    # 等待任务完成
    result1 = await task1
    result2 = await task2
    print(result1, result2)

asyncio.run(main())

wait — 条件等待

python
import asyncio

async def task(name: str, delay: float):
    await asyncio.sleep(delay)
    return f"{name} 完成"

async def main():
    tasks = [
        asyncio.create_task(task("快速", 0.3)),
        asyncio.create_task(task("慢速", 1.0)),
    ]

    # 等待任意一个完成
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    print(f"已完成: {[t.result() for t in done]}")
    print(f"待完成: {len(pending)} 个")

    # 取消待完成的任务
    for t in pending:
        t.cancel()

asyncio.run(main())

异步上下文管理器

python
import asyncio

class AsyncTimer:
    """异步上下文管理器:测量代码执行时间"""

    async def __aenter__(self):
        import time
        self.start = time.perf_counter()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        import time
        elapsed = time.perf_counter() - self.start
        print(f"执行耗时: {elapsed:.4f}s")
        return False  # 不压制异常


async def main():
    async with AsyncTimer():
        await asyncio.sleep(0.5)

asyncio.run(main())

@asynccontextmanager

python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def async_timer(name: str):
    import time
    start = time.perf_counter()
    print(f"{name} 开始")
    yield
    elapsed = time.perf_counter() - start
    print(f"{name} 耗时: {elapsed:.4f}s")


async def main():
    async with async_timer("异步任务"):
        await asyncio.sleep(0.3)

asyncio.run(main())

asyncio 队列

python
import asyncio

async def producer(queue: asyncio.Queue):
    for i in range(5):
        await asyncio.sleep(0.1)
        await queue.put(i)
        print(f"生产: {i}")
    await queue.put(None)  # 发送结束信号


async def consumer(queue: asyncio.Queue):
    while True:
        item = await queue.get()
        if item is None:
            break
        print(f"消费: {item}")
        await asyncio.sleep(0.2)
        queue.task_done()


async def main():
    queue = asyncio.Queue()

    # 并发运行生产者和消费者
    await asyncio.gather(
        producer(queue),
        consumer(queue),
    )

asyncio.run(main())

aiohttp 异步 HTTP 客户端

需要安装:pip install aiohttp

python
import asyncio
import aiohttp

async def fetch(session: aiohttp.ClientSession, url: str):
    async with session.get(url) as response:
        return await response.text()


async def fetch_all(urls: list):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        responses = await asyncio.gather(*tasks, return_exceptions=True)
        return responses


async def main():
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/1",
    ]

    import time
    start = time.perf_counter()

    results = await fetch_all(urls)
    for i, r in enumerate(results):
        if isinstance(r, Exception):
            print(f"URL {i} 失败: {r}")
        else:
            print(f"URL {i} 成功,长度: {len(r)}")

    print(f"总耗时: {time.perf_counter() - start:.2f}s")
    # 顺序执行:~4s;并发执行:~2s


asyncio.run(main())

带超时的请求

python
import asyncio
import aiohttp

async def fetch_with_timeout(url: str, timeout: float = 5.0):
    async with aiohttp.ClientSession() as session:
        try:
            async with session.get(url) as response:
                return await response.text()
        except asyncio.TimeoutError:
            print(f"请求 {url} 超时({timeout}s)")
            return None


async def main():
    result = await fetch_with_timeout(
        "https://httpbin.org/delay/10",  # 故意延迟 10 秒
        timeout=2.0
    )
    print("完成" if result is None else "成功")

asyncio.run(main())

异步迭代器与生成器

python
import asyncio

# 异步生成器
async def async_range(start, end):
    for i in range(start, end):
        await asyncio.sleep(0.1)
        yield i

async def main():
    # 异步迭代
    async for i in async_range(0, 5):
        print(i)

asyncio.run(main())

常见陷阱

在非 async 函数中调用协程

python
# 错误:普通函数不能 await
def bad_main():
    result = asyncio.run(fetch_data())  # 虽然可以,但不应该

# 正确:使用 asyncio.run 或在 async 函数中调用
async def good_main():
    result = await fetch_data()

asyncio.run(good_main())

忘记 await

python
async def main():
    coro = fetch_data()
    result = coro  # ❌ 没有 await,结果是协程对象而非返回值
    result = await coro  # ✅ 正确

小结

  • async def 定义协程函数,await 等待协程完成
  • asyncio.run() 是运行顶级协程的入口
  • asyncio.gather() 并发运行多个协程,显著提升 I/O 效率
  • asyncio.create_task() 调度协程为任务并行执行
  • 异步上下文管理器使用 __aenter__ / __aexit__
  • aiohttp 是异步 HTTP 客户端,适合高并发爬虫和 API 调用
  • asyncio 只适合 I/O 密集型任务,CPU 密集型仍需 multiprocessing
#python #异步编程 #async #await #asyncio

评论

A

Written by

AI-Writer

Related Articles

python
#2

数据类型与运算符

深入理解 Python 内置数据类型(整数、浮点数、布尔值、复数)以及各类运算符的使用方法和注意事项

Read More
python
#5

函数定义与参数传递

系统掌握 Python 函数的定义方式、参数类型(默认参数、可变参数、关键字参数)、lambda 表达式以及作用域规则

Read More
python
#3

流程控制语句

详解 Python 的条件分支、循环语句以及 break、continue、pass 的使用场景,帮你掌握程序流程控制的精髓

Read More