python
异步编程 async/await
By AI-Writer 11 min read
前言
异步编程是现代 Python 高并发编程的核心。asyncio 模块让你可以用单线程处理大量并发 I/O 操作(如网络请求、文件读写),性能远超传统的多线程方案。
协程基础
async/await 语法
python
import asyncio
# 定义协程函数(async def)
async def fetch_data():
return {"id": 1, "name": "Alice"}
# 调用协程不会立即执行,返回一个协程对象
coro = fetch_data()
print(coro) # <coroutine object fetch_data at 0x...>
# 运行协程
result = asyncio.run(coro)
print(result) # {'id': 1, 'name': 'Alice'}await 关键字
await 暂停当前协程,等待另一个协程完成:
python
import asyncio
async def step1():
print("步骤 1 开始")
await asyncio.sleep(1) # 模拟 I/O 操作
print("步骤 1 完成")
return "数据 A"
async def step2():
print("步骤 2 开始")
await asyncio.sleep(0.5)
print("步骤 2 完成")
return "数据 B"
async def main():
# 顺序执行
# result1 = await step1()
# result2 = await step2()
# 并发执行
results = await asyncio.gather(step1(), step2())
print(results) # ['数据 A', '数据 B']
asyncio.run(main())asyncio 事件循环
gather — 并发运行多个协程
python
import asyncio
async def get(url: str):
await asyncio.sleep(0.5) # 模拟网络请求
return f"Response from {url}"
async def main():
urls = ["a.com", "b.com", "c.com"]
# 并发获取所有 URL
results = await asyncio.gather(*[get(url) for url in urls])
for r in results:
print(r)
asyncio.run(main())
# 如果顺序执行:1.5 秒;并发执行:0.5 秒create_task — 并发调度
python
import asyncio
async def worker(name: str, duration: float):
await asyncio.sleep(duration)
return f"{name} 完成"
async def main():
# 创建任务(立即调度,不等待)
task1 = asyncio.create_task(worker("任务A", 1.0))
task2 = asyncio.create_task(worker("任务B", 0.5))
print("任务已创建,等待完成...")
# 等待任务完成
result1 = await task1
result2 = await task2
print(result1, result2)
asyncio.run(main())wait — 条件等待
python
import asyncio
async def task(name: str, delay: float):
await asyncio.sleep(delay)
return f"{name} 完成"
async def main():
tasks = [
asyncio.create_task(task("快速", 0.3)),
asyncio.create_task(task("慢速", 1.0)),
]
# 等待任意一个完成
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
print(f"已完成: {[t.result() for t in done]}")
print(f"待完成: {len(pending)} 个")
# 取消待完成的任务
for t in pending:
t.cancel()
asyncio.run(main())异步上下文管理器
python
import asyncio
class AsyncTimer:
"""异步上下文管理器:测量代码执行时间"""
async def __aenter__(self):
import time
self.start = time.perf_counter()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
import time
elapsed = time.perf_counter() - self.start
print(f"执行耗时: {elapsed:.4f}s")
return False # 不压制异常
async def main():
async with AsyncTimer():
await asyncio.sleep(0.5)
asyncio.run(main())@asynccontextmanager
python
import asyncio
from contextlib import asynccontextmanager
@asynccontextmanager
async def async_timer(name: str):
import time
start = time.perf_counter()
print(f"{name} 开始")
yield
elapsed = time.perf_counter() - start
print(f"{name} 耗时: {elapsed:.4f}s")
async def main():
async with async_timer("异步任务"):
await asyncio.sleep(0.3)
asyncio.run(main())asyncio 队列
python
import asyncio
async def producer(queue: asyncio.Queue):
for i in range(5):
await asyncio.sleep(0.1)
await queue.put(i)
print(f"生产: {i}")
await queue.put(None) # 发送结束信号
async def consumer(queue: asyncio.Queue):
while True:
item = await queue.get()
if item is None:
break
print(f"消费: {item}")
await asyncio.sleep(0.2)
queue.task_done()
async def main():
queue = asyncio.Queue()
# 并发运行生产者和消费者
await asyncio.gather(
producer(queue),
consumer(queue),
)
asyncio.run(main())aiohttp 异步 HTTP 客户端
需要安装:pip install aiohttp
python
import asyncio
import aiohttp
async def fetch(session: aiohttp.ClientSession, url: str):
async with session.get(url) as response:
return await response.text()
async def fetch_all(urls: list):
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
responses = await asyncio.gather(*tasks, return_exceptions=True)
return responses
async def main():
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/delay/1",
]
import time
start = time.perf_counter()
results = await fetch_all(urls)
for i, r in enumerate(results):
if isinstance(r, Exception):
print(f"URL {i} 失败: {r}")
else:
print(f"URL {i} 成功,长度: {len(r)}")
print(f"总耗时: {time.perf_counter() - start:.2f}s")
# 顺序执行:~4s;并发执行:~2s
asyncio.run(main())带超时的请求
python
import asyncio
import aiohttp
async def fetch_with_timeout(url: str, timeout: float = 5.0):
async with aiohttp.ClientSession() as session:
try:
async with session.get(url) as response:
return await response.text()
except asyncio.TimeoutError:
print(f"请求 {url} 超时({timeout}s)")
return None
async def main():
result = await fetch_with_timeout(
"https://httpbin.org/delay/10", # 故意延迟 10 秒
timeout=2.0
)
print("完成" if result is None else "成功")
asyncio.run(main())异步迭代器与生成器
python
import asyncio
# 异步生成器
async def async_range(start, end):
for i in range(start, end):
await asyncio.sleep(0.1)
yield i
async def main():
# 异步迭代
async for i in async_range(0, 5):
print(i)
asyncio.run(main())常见陷阱
在非 async 函数中调用协程
python
# 错误:普通函数不能 await
def bad_main():
result = asyncio.run(fetch_data()) # 虽然可以,但不应该
# 正确:使用 asyncio.run 或在 async 函数中调用
async def good_main():
result = await fetch_data()
asyncio.run(good_main())忘记 await
python
async def main():
coro = fetch_data()
result = coro # ❌ 没有 await,结果是协程对象而非返回值
result = await coro # ✅ 正确小结
async def定义协程函数,await等待协程完成asyncio.run()是运行顶级协程的入口asyncio.gather()并发运行多个协程,显著提升 I/O 效率asyncio.create_task()调度协程为任务并行执行- 异步上下文管理器使用
__aenter__/__aexit__ aiohttp是异步 HTTP 客户端,适合高并发爬虫和 API 调用asyncio只适合 I/O 密集型任务,CPU 密集型仍需multiprocessing
#python
#异步编程
#async
#await
#asyncio
评论
A
Written by
AI-Writer