
Async Fundamentals and the Concurrency Model

By AI-Writer 18 min read

Sync vs Async: Why It Matters

FastAPI is built on Starlette, which in turn is built on ASGI (Asynchronous Server Gateway Interface). Understanding the async model is what lets you actually exploit FastAPI's concurrency.

Synchronous model: the request thread is blocked for the duration of every I/O wait (database query, HTTP call), so one thread can handle only one request at a time.

Asynchronous model: while one task waits on I/O, the event loop switches to other tasks, so a single thread can serve many requests concurrently.

python
import asyncio
import requests

# Sync: 3 requests run one after another; total time = sum of per-request times
def sync_get_users(ids):
    users = []
    for id in ids:
        users.append(requests.get(f"/users/{id}"))  # blocks the thread
    return users

# Async: 3 requests run concurrently; total time ≈ the slowest single request
# (async_get stands for any async HTTP helper, e.g. one built on aiohttp)
async def async_get_users(ids):
    tasks = [async_get(f"/users/{id}") for id in ids]
    users = await asyncio.gather(*tasks)
    return users
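The timing claim above is easy to verify with `asyncio.sleep` standing in for network I/O (a self-contained sketch; `fake_io` is a made-up stand-in, not part of FastAPI):

```python
import asyncio
import time

async def fake_io(delay: float) -> float:
    # Stand-in for a network call: yields to the event loop while "waiting"
    await asyncio.sleep(delay)
    return delay

async def main() -> None:
    start = time.perf_counter()
    # Three 0.1 s "requests" run concurrently: total wall time ≈ 0.1 s, not 0.3 s
    results = await asyncio.gather(fake_io(0.1), fake_io(0.1), fake_io(0.1))
    elapsed = time.perf_counter() - start
    print(f"results={results}, elapsed={elapsed:.2f}s")

asyncio.run(main())
```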

async/await Basics

Defining and calling

python
import asyncio
import aiohttp

# Define an async function
async def fetch_data(url: str) -> dict:
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()

# Call it (two ways)
result = asyncio.run(fetch_data("https://api.example.com/data"))  # at the top level
# or from inside another async function
async def main():
    result = await fetch_data("https://api.example.com/data")

Async context managers

python
class AsyncDatabasePool:
    async def __aenter__(self):
        self.conn = await create_connection()  # create_connection: your driver's async connect
        return self.conn

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.conn.close()

async def process():
    async with AsyncDatabasePool() as conn:
        result = await conn.query("SELECT * FROM users")
    # __aexit__ has run at this point, so conn.close() has been awaited
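The same acquire/release pattern can often be written more compactly with `contextlib.asynccontextmanager`. A runnable sketch, with a plain dict simulating the connection object since no real database is assumed here:

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def database_pool():
    conn = {"open": True}       # simulated connection object
    await asyncio.sleep(0)      # stands in for `await create_connection()`
    try:
        yield conn              # the `async with` body runs here
    finally:
        conn["open"] = False    # stands in for `await conn.close()`

async def process():
    async with database_pool() as conn:
        assert conn["open"]     # connection is usable inside the block
    return conn                 # cleanup has already run by this point

print(asyncio.run(process()))
```

The `try/finally` around the `yield` guarantees cleanup even if the body raises, mirroring `__aexit__`.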

asyncio.gather and Concurrent Awaiting

python
import asyncio

async def fetch_article(article_id: int) -> dict:
    await asyncio.sleep(0.1)  # simulated I/O
    return {"id": article_id, "title": f"Article {article_id}"}

async def fetch_multiple_articles(ids: list[int]) -> list[dict]:
    # Run all tasks concurrently
    tasks = [fetch_article(i) for i in ids]
    results = await asyncio.gather(*tasks)
    return list(results)

# Used from a FastAPI path operation
@app.get("/articles/batch/{ids}")
async def batch_articles(ids: str):  # ids: "1,2,3,4,5"
    article_ids = [int(x) for x in ids.split(",")]
    articles = await fetch_multiple_articles(article_ids)
    return articles

Background Tasks

The BackgroundTasks component

FastAPI ships with BackgroundTasks, which suits lightweight background work:

python
from fastapi import BackgroundTasks

def send_email(email: str, content: str):
    # In a real project, use something like aiosmtplib
    print(f"Sending email to {email}: {content}")

@router.post("/contact")
async def submit_contact(
    email: str,
    message: str,
    background_tasks: BackgroundTasks
):
    # Return immediately; the email goes out in the background
    background_tasks.add_task(send_email, email, f"Thank you for your message: {message}")
    return {"message": "Contact submitted, we'll respond soon"}
    return {"message": "Contact submitted, we'll respond soon"}

Note: BackgroundTasks runs after the response has been sent, but inside the same worker process; if the worker dies, the task is lost, and there is no retry or persistence. It suits best-effort work like email notifications and simple logging.

asyncio.create_task (recommended)

For fire-and-forget work that should be decoupled from the request lifecycle, use asyncio.create_task:

python
import asyncio
from asyncio import create_task

@router.post("/reports/generate")
async def generate_report(
    report_type: str,
    current_user: User = Depends(get_current_user)
):
    # Return immediately; the report is generated in the background
    create_task(
        run_heavy_report_generation(report_type, current_user.id)
    )
    return {"message": "Report generation started", "status": "processing"}

async def run_heavy_report_generation(report_type: str, user_id: int):
    """Long-running background report generation."""
    try:
        await asyncio.sleep(5)  # simulated slow work
        await save_report_to_storage(report_type, user_id)
    except Exception as e:
        await notify_user_error(user_id, str(e))
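One caveat with this pattern: the event loop holds only a weak reference to tasks, so a task whose `create_task` return value is discarded can be garbage-collected before it finishes. The common fix is to keep each task in a set and discard it on completion. A self-contained sketch with a simulated job (`heavy_job` and `spawn` are illustrative names, not FastAPI APIs):

```python
import asyncio

background_tasks: set[asyncio.Task] = set()

async def heavy_job(log: list) -> None:
    await asyncio.sleep(0.01)  # simulated long-running work
    log.append("report saved")

def spawn(coro) -> asyncio.Task:
    task = asyncio.create_task(coro)   # must be called with a running event loop
    background_tasks.add(task)         # strong reference keeps the task alive
    task.add_done_callback(background_tasks.discard)
    return task

async def main() -> list:
    log: list = []
    spawn(heavy_job(log))
    await asyncio.sleep(0.05)  # give the background job time to finish
    return log

print(asyncio.run(main()))
```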

StreamingResponse: Streaming Responses

When the payload is large or should be delivered incrementally, use StreamingResponse to avoid blowing up memory:

python
import csv
from io import StringIO
from fastapi.responses import StreamingResponse

async def generate_large_csv():
    buffer = StringIO()
    writer = csv.writer(buffer)
    writer.writerow(["id", "title", "content", "created_at"])

    # Suppose there are ~1 million rows; paginate_large_dataset is an async generator
    async for row in paginate_large_dataset(batch_size=1000):
        writer.writerow(row)
        yield buffer.getvalue()
        buffer.seek(0)      # reset both position and contents before the next row;
        buffer.truncate(0)  # truncate(0) alone would leave the cursor past the end

@router.get("/exports/articles.csv")
async def export_articles_csv():
    return StreamingResponse(
        generate_large_csv(),
        media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=articles.csv"}
    )

Server-Sent Events (SSE)

SSE lets the server push data to the client one way, which fits live dashboards, progress updates, and similar scenarios:

python
from fastapi.responses import StreamingResponse
from datetime import datetime, timezone
import asyncio
import json
import math

async def event_generator():
    """Simulated live metric pushes."""
    for i in range(10):
        # SSE wire format: "data: {json}\n\n"
        data = json.dumps({
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "value": round(50 + 50 * math.sin(i * 0.628), 2),
            "status": "active"
        })
        yield f"data: {data}\n\n"
        await asyncio.sleep(1)  # push once per second

@router.get("/stream/metrics")
async def stream_metrics():
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no"  # disable Nginx buffering
        }
    )

Consuming SSE on the frontend

javascript
const eventSource = new EventSource("/stream/metrics");

eventSource.onmessage = (event) => {
    const data = JSON.parse(event.data);
    console.log("Received:", data);
    updateDashboard(data);
};

eventSource.onerror = () => {
    console.error("SSE connection lost");
    eventSource.close();
};

// Close explicitly when done
// eventSource.close();

Mixing async and sync Functions

FastAPI handles mixed usage automatically:

python
# Sync function: FastAPI runs it in a thread pool (the event loop is not blocked)
@app.get("/sync-endpoint")
def sync_heavy_task():
    result = sum(range(10_000_000))  # CPU-bound
    return {"result": result}

# Async function: handled directly on the event loop
@app.get("/async-endpoint")
async def async_heavy_task():
    await asyncio.sleep(1)  # I/O-bound
    return {"result": "done"}

Best practice: use async def only when the handler performs genuinely async I/O (database calls, network requests). Otherwise, a plain sync function is actually slightly better in FastAPI: it runs in the thread pool, so blocking work cannot stall the event loop, and there is no coroutine scheduling overhead.
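When a blocking, CPU-heavy call has to live inside an `async def` handler anyway, it can be pushed off the event loop explicitly. `asyncio.to_thread` (Python 3.9+) is the simplest form of the `run_in_executor` pattern; a self-contained sketch outside FastAPI (`cpu_heavy` and `endpoint_body` are illustrative names):

```python
import asyncio

def cpu_heavy() -> int:
    # Blocking, CPU-bound work that would stall the loop if awaited inline
    return sum(range(10_000_000))

async def endpoint_body() -> dict:
    # Offload to a worker thread; the event loop stays responsive meanwhile
    result = await asyncio.to_thread(cpu_heavy)
    return {"result": result}

print(asyncio.run(endpoint_body()))
```

Note that because of the GIL this keeps the loop responsive but does not parallelize pure-Python computation; for true parallelism, pair `loop.run_in_executor` with a `ProcessPoolExecutor`.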

Concurrency Control

Limiting concurrency with a semaphore

python
import asyncio
from asyncio import Semaphore

semaphore = Semaphore(5)  # at most 5 tasks run concurrently

async def limited_task(url: str):
    async with semaphore:
        return await fetch_data(url)

@router.get("/batch-scrape")
async def batch_scrape(urls: list[str]):
    tasks = [limited_task(url) for url in urls]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return {"completed": len(results)}

Timeout control

python
import asyncio
from asyncio import timeout  # Python 3.11+

async def fetch_with_timeout(url: str, timeout_seconds: float = 5.0):
    try:
        async with timeout(timeout_seconds):
            return await fetch_data(url)
    except asyncio.TimeoutError:
        return {"error": "Request timed out"}
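On interpreters where `asyncio.timeout` is not available (pre-3.11), the same guard is written with `asyncio.wait_for`. A runnable sketch with a simulated slow call (`slow_call` is a stand-in for `fetch_data`):

```python
import asyncio

async def slow_call() -> str:
    await asyncio.sleep(0.1)  # simulated slow I/O
    return "ok"

async def fetch_with_wait_for(timeout_seconds: float) -> dict:
    try:
        return {"data": await asyncio.wait_for(slow_call(), timeout=timeout_seconds)}
    except asyncio.TimeoutError:
        return {"error": "Request timed out"}

print(asyncio.run(fetch_with_wait_for(0.01)))  # times out before slow_call finishes
```

`wait_for` cancels the inner coroutine on timeout, so the slow call does not keep running in the background.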

Summary

FastAPI's async capabilities are at the core of its performance:

  • I/O-bound work (database, network requests) → use async/await and let the event loop multiplex
  • CPU-bound work → use a sync function or run_in_executor, so the event loop is never blocked
  • Lightweight background work → BackgroundTasks (within the request lifecycle)
  • Detached background work → asyncio.create_task (independent of the request)
  • Large data / live pushes → StreamingResponse / SSE
  • Concurrency limits → Semaphore + asyncio.gather
  • Timeouts → asyncio.timeout (Python 3.11+; on older versions use asyncio.wait_for)
#fastapi #async #python #concurrency #sse
