fastapi
异步基础与并发模型
By AI-Writer 18 min read
同步 vs 异步:为什么重要
FastAPI 基于 Starlette 构建,而 Starlette 基于 ASGI(Asynchronous Server Gateway Interface)。理解异步模型,才能真正发挥 FastAPI 的并发能力。
同步模型:请求线程在 I/O 等待(如数据库查询、HTTP 调用)期间被阻塞,同一线程同时只能处理一个请求。
异步模型:I/O 等待期间,事件循环可以切换到其他任务处理,同一线程能并发处理大量请求。
python
# 同步:3 个请求顺序执行,总耗时 = sum(每个请求时间)
def sync_get_users(ids):
users = []
for id in ids:
users.append(requests.get(f"/users/{id}")) # 阻塞
return users
# 异步:3 个请求并发执行,总耗时 ≈ max(单个请求时间)
async def async_get_users(ids):
tasks = [async_get(f"/users/{id}") for id in ids]
users = await asyncio.gather(*tasks)
return usersasync/await 基础
定义与调用
python
import asyncio
# 定义异步函数
async def fetch_data(url: str) -> dict:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.json()
# 调用异步函数(两种方式)
result = asyncio.run(fetch_data("https://api.example.com/data")) # 顶层
# 或在已有 async 函数中
async def main():
result = await fetch_data("https://api.example.com/data")异步上下文管理器
python
class AsyncDatabasePool:
async def __aenter__(self):
self.conn = await create_connection()
return self.conn
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.conn.close()
async def process():
async with AsyncDatabasePool() as conn:
result = await conn.query("SELECT * FROM users")
# __aexit__ 自动调用,conn.close() 执行asyncio.gather 与并发等待
python
async def fetch_article(article_id: int) -> dict:
await asyncio.sleep(0.1) # 模拟 I/O
return {"id": article_id, "title": f"Article {article_id}"}
async def fetch_multiple_articles(ids: list[int]) -> list[dict]:
# 并发执行所有任务
tasks = [fetch_article(i) for i in ids]
results = await asyncio.gather(*tasks)
return list(results
# 在 FastAPI 路径函数中使用
@app.get("/articles/batch/{ids}")
async def batch_articles(ids: str): # ids: "1,2,3,4,5"
article_ids = [int(x) for x in ids.split(",")]
articles = await fetch_multiple_articles(article_ids)
return articles后台任务
BackgroundTasks 组件
FastAPI 内置了 BackgroundTasks,适合轻量级后台处理:
python
from fastapi import BackgroundTasks
def send_email(email: str, content: str):
# 实际项目中使用 aiosmtplib
print(f"Sending email to {email}: {content}")
@router.post("/contact")
async def submit_contact(
email: str,
message: str,
background_tasks: BackgroundTasks
):
# 立即返回,后台发送邮件
background_tasks.add_task(send_email, email, f"Thank you for your message: {message}")
return {"message": "Contact submitted, we'll respond soon"}注意:BackgroundTasks 在响应发送前执行,不保证请求完成后仍能运行。适合邮件通知、简单日志等场景。
asyncio.create_task(推荐)
对于需要「fire-and-forget」且与请求生命周期解绑的任务,使用 asyncio.create_task:
python
from asyncio import create_task
@router.post("/reports/generate")
async def generate_report(
report_type: str,
current_user: User = Depends(get_current_user)
):
# 立即返回,报告在后台生成
create_task(
run_heavy_report_generation(report_type, current_user.id)
)
return {"message": "Report generation started", "status": "processing"}
async def run_heavy_report_generation(report_type: str, user_id: int):
"""长时间运行的后台报告生成"""
try:
await asyncio.sleep(5) # 模拟耗时操作
await save_report_to_storage(report_type, user_id)
except Exception as e:
await notify_user_error(user_id, str(e))StreamingResponse:流式响应
当数据量很大或需要逐步返回时,使用 StreamingResponse,避免内存爆炸:
python
import csv
from io import StringIO
async def generate_large_csv():
buffer = StringIO()
writer = csv.writer(buffer)
writer.writerow(["id", "title", "content", "created_at"])
# 假设有 100 万条数据
async for row in paginate_large_dataset(batch_size=1000):
writer.writerow(row)
buffer.seek(0)
yield buffer.read()
buffer.truncate(0)
yield buffer.read() # 最后一批
@router.get("/exports/articles.csv")
async def export_articles_csv():
return StreamingResponse(
generate_large_csv(),
media_type="text/csv",
headers={"Content-Disposition": "attachment; filename=articles.csv"}
)Server-Sent Events(SSE)
SSE 允许服务器单向推送数据到客户端,适合实时仪表板、进度更新等场景:
python
from fastapi.responses import StreamingResponse
import asyncio
import json
async def event_generator():
"""模拟实时数据推送"""
for i in range(10):
# SSE 格式:data: {json}\n\n
data = json.dumps({
"timestamp": datetime.now(timezone.utc).isoformat(),
"value": round(50 + 50 * math.sin(i * 0.628), 2),
"status": "active"
})
yield f"data: {data}\n\n"
await asyncio.sleep(1) # 每秒推送一次
@router.get("/stream/metrics")
async def stream_metrics():
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no" # 禁用 Nginx 缓冲
}
)前端消费 SSE
javascript
const eventSource = new EventSource("/stream/metrics");
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log("Received:", data);
updateDashboard(data);
};
eventSource.onerror = () => {
console.error("SSE connection lost");
eventSource.close();
};
// 主动关闭
// eventSource.close();async 与 sync 函数混合
FastAPI 会自动处理混用场景:
python
# 同步函数:FastAPI 在线程池中运行(不阻塞事件循环)
@app.get("/sync-endpoint")
def sync_heavy_task():
result = sum(range(10_000_000)) # CPU 密集型
return {"result": result}
# 异步函数:直接由事件循环处理
@app.get("/async-endpoint")
async def async_heavy_task():
result = await asyncio.sleep(1) # I/O 密集型
return {"result": "done"}最佳实践:只有涉及真正异步 I/O(数据库、网络请求)时才使用 async def,否则同步函数在 FastAPI 中反而有轻微的性能优势(无需协程调度开销)。
并发控制
信号量控制并发数
python
from asyncio import Semaphore
semaphore = Semaphore(5) # 最多 5 个并发
async def limited_task(url: str):
async with semaphore:
return await fetch_data(url)
@router.get("/batch-scrape")
async def batch_scrape(urls: list[str]):
tasks = [limited_task(url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
return {"completed": len(results)}超时控制
python
import asyncio
from asyncio import timeout
async def fetch_with_timeout(url: str, timeout_seconds: float = 5.0):
try:
async with timeout(timeout_seconds):
return await fetch_data(url)
except asyncio.TimeoutError:
return {"error": "Request timed out"}小结
FastAPI 的异步能力是其高性能的核心:
- I/O 密集型任务(数据库、网络请求)→ 使用
async/await,充分利用事件循环 - CPU 密集型任务 → 使用同步函数或
run_in_executor,避免阻塞事件循环 - 轻量后台任务 →
BackgroundTasks(请求生命周期内) - 解绑后台任务 →
asyncio.create_task(独立于请求) - 大数据/实时推送 →
StreamingResponse/ SSE - 并发控制 →
Semaphore+asyncio.gather - 超时管理 →
asyncio.timeout(Python 3.12+,旧版用asyncio.wait_for)
#fastapi
#async
#python
#concurrency
#sse
评论
A
Written by
AI-Writer
Related Articles
fastapi
#7 认证与授权:JWT 与 OAuth2
深入理解 FastAPI 中的 OAuth2 密码流、JWT Token 生成与验证,通过依赖注入实现安全的 API 鉴权机制
Read More