fastapi

CRUD 与数据库集成

By AI-Writer 22 min read

为什么选择 SQLAlchemy 异步模式

在传统的同步 Web 应用中,数据库查询会阻塞整个请求线程。FastAPI 基于 Starlette,支持完整的 async/await,因此推荐使用 SQLAlchemy 2.0 的异步引擎(create_async_engine),配合 asyncpg(PostgreSQL)或 aiomysql(MySQL)驱动,实现真正的非阻塞 I/O。

python
# 环境准备
# pip install sqlalchemy[asyncio] asyncpg databases

数据库配置与连接池

创建异步引擎

python
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase

DATABASE_URL = "postgresql+asyncpg://user:password@localhost:5432/mydb"

# Async engine backed by a connection pool.
engine = create_async_engine(
    DATABASE_URL,
    pool_size=10,        # size of the connection pool
    max_overflow=20,     # connections allowed on top of pool_size
    pool_pre_ping=True,  # check that a connection is alive before each use
    echo=True,           # print SQL in development; switch off in production
)

# Session factory. With expire_on_commit=False, objects are not expired by a
# commit, so their loaded attributes stay readable afterwards.
async_session = async_sessionmaker(
    bind=engine,
    class_=AsyncSession,
    expire_on_commit=False,
)

# Base class for the ORM models
class Base(DeclarativeBase):
    """Declarative base that the ORM model classes inherit from."""

依赖注入数据库会话

python
from fastapi import Depends

async def get_db():
    """Yield a request-scoped ``AsyncSession`` for use with ``Depends``.

    The session is committed once the consumer finishes without raising.
    On any exception it is rolled back and the exception is re-raised.

    Yields:
        A session created by the module-level ``async_session`` factory.
    """
    # ``async with`` closes the session on exit, so no explicit
    # ``finally: await session.close()`` is needed.
    async with async_session() as session:
        try:
            yield session
            await session.commit()  # commit automatically on success
        except Exception:
            await session.rollback()  # roll back automatically on error
            raise

定义数据模型

python
from sqlalchemy import Column, Integer, String, DateTime, Boolean, Text, ForeignKey
from sqlalchemy.orm import relationship
from datetime import datetime

class Article(Base):
    """ORM model for a row of the ``articles`` table."""

    __tablename__ = "articles"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String(200), nullable=False, index=True)
    content = Column(Text, nullable=False)
    published = Column(Boolean, default=False)
    # Timestamps are generated client-side as naive UTC datetimes.
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # One-to-many relationships. Both use "selectin" eager loading: with an
    # AsyncSession, touching a lazily loaded collection outside an awaited call
    # (for example while a response model reads ``article.comments``) raises
    # MissingGreenlet instead of issuing the query.
    tags = relationship("Tag", back_populates="article", lazy="selectin")
    comments = relationship(
        "Comment",
        back_populates="article",
        cascade="all, delete-orphan",
        lazy="selectin",
    )

class Tag(Base):
    """ORM model for a row of the ``tags`` table; each row references one article."""
    __tablename__ = "tags"

    id = Column(Integer, primary_key=True)
    # NOTE(review): unique=True is table-wide, so the same tag name cannot be
    # stored for two different articles. Confirm this is intended; sharing tags
    # across articles would need a many-to-many association instead.
    name = Column(String(50), unique=True, nullable=False)
    article_id = Column(Integer, ForeignKey("articles.id"))

    # Many-to-one counterpart of Article.tags.
    article = relationship("Article", back_populates="tags")

class Comment(Base):
    """ORM model for a row of the ``comments`` table; each row references one article."""
    __tablename__ = "comments"

    id = Column(Integer, primary_key=True)
    body = Column(Text, nullable=False)
    article_id = Column(Integer, ForeignKey("articles.id"))

    # Many-to-one counterpart of Article.comments.
    article = relationship("Article", back_populates="comments")

Pydantic 模型设计

将数据模型层(Pydantic)与 ORM 层分离,是 FastAPI 的最佳实践:

python
from datetime import datetime
from typing import Optional

from pydantic import BaseModel, ConfigDict, Field

# Read (response) models
# Tag as returned to clients.
class TagResponse(BaseModel):
    model_config = ConfigDict(from_attributes=True)  # build from object attributes (ORM rows)
    id: int
    name: str

# Comment as returned to clients.
class CommentResponse(BaseModel):
    model_config = ConfigDict(from_attributes=True)  # build from object attributes (ORM rows)
    id: int
    body: str

# Article list item (lightweight: no content, no related collections)
class ArticleList(BaseModel):
    model_config = ConfigDict(from_attributes=True)  # build from object attributes (ORM rows)
    id: int
    title: str
    published: bool
    created_at: datetime

# Article detail (includes the related tags and comments)
class ArticleDetail(BaseModel):
    model_config = ConfigDict(from_attributes=True)  # build from object attributes (ORM rows)
    id: int
    title: str
    content: str
    published: bool
    created_at: datetime
    # Related collections; each entry is validated through its own response model.
    tags: list[TagResponse] = []
    comments: list[CommentResponse] = []

# Create / update (request) models
# Payload for creating an article.
class ArticleCreate(BaseModel):
    title: str = Field(..., min_length=1, max_length=200)  # required, 1-200 characters
    content: str = Field(..., min_length=1)  # required, non-empty
    published: bool = False
    # NOTE(review): names are not length-checked here, while Tag.name is
    # declared as String(50) -- consider validating each entry.
    tag_names: list[str] = []

# Payload for a partial update: every field is optional and defaults to None.
# Because the fields are Optional, an explicit JSON null also passes validation.
class ArticleUpdate(BaseModel):
    title: Optional[str] = Field(None, min_length=1, max_length=200)
    content: Optional[str] = Field(None, min_length=1)
    published: Optional[bool] = None

CRUD 操作实现

Create(创建)

python
# Query and select are imported here because the handlers registered on this
# router use them (Query in signature defaults, select when building queries).
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

# Router for the article endpoints; every path registered on it is prefixed
# with /articles.
router = APIRouter(prefix="/articles", tags=["Articles"])

@router.post("", response_model=ArticleDetail, status_code=status.HTTP_201_CREATED)
async def create_article(
    data: ArticleCreate,
    db: AsyncSession = Depends(get_db)
):
    """Create an article together with its tags.

    Args:
        data: validated request payload.
        db: request-scoped database session.

    Returns:
        The stored article, refreshed from the database.
    """
    article = Article(
        title=data.title,
        content=data.content,
        published=data.published,
    )
    db.add(article)
    await db.flush()  # assigns the auto-increment ID

    # Drop repeated names while keeping first-seen order: Tag.name is unique,
    # so inserting the same name twice in one request would fail the commit.
    unique_names = dict.fromkeys(data.tag_names)
    db.add_all([Tag(name=name, article_id=article.id) for name in unique_names])

    await db.commit()
    await db.refresh(article)
    return article

Read(读取)

python
# The ":int" converter makes this route match numeric path segments only.
# Without it the pattern also captures fixed paths such as "/search" or
# "/stats" when those routes are registered later on the same router, and the
# request ends in a 422 because the segment is not an integer.
@router.get("/{article_id:int}", response_model=ArticleDetail)
async def get_article(
    article_id: int,
    db: AsyncSession = Depends(get_db)
):
    """Return a single article by its ID.

    Raises:
        HTTPException: 404 if no article has the given ID.
    """
    article = await db.get(Article, article_id)
    if article is None:
        raise HTTPException(status_code=404, detail="Article not found")
    return article

@router.get("", response_model=list[ArticleList])
async def list_articles(
    skip: int = Query(0, ge=0),
    limit: int = Query(20, ge=1, le=100),
    published: Optional[bool] = None,
    db: AsyncSession = Depends(get_db)
):
    """List articles, newest first, one page at a time.

    Args:
        skip: number of rows to skip (>= 0).
        limit: page size (1-100).
        published: when given, only articles with this published flag.
        db: request-scoped database session.
    """
    query = select(Article)
    if published is not None:
        query = query.where(Article.published == published)

    # created_at alone is not unique. The id tie-breaker gives OFFSET/LIMIT a
    # total order, so consecutive pages cannot overlap or skip rows.
    query = (
        query.order_by(Article.created_at.desc(), Article.id.desc())
        .offset(skip)
        .limit(limit)
    )

    result = await db.execute(query)
    return result.scalars().all()

Update(更新)

python
@router.patch("/{article_id}", response_model=ArticleDetail)
async def update_article(
    article_id: int,
    data: ArticleUpdate,
    db: AsyncSession = Depends(get_db)
):
    """Partially update an article.

    Only fields present in the request are applied; fields sent as null are
    ignored.

    Raises:
        HTTPException: 404 if no article has the given ID.
    """
    article = await db.get(Article, article_id)
    if article is None:
        raise HTTPException(status_code=404, detail="Article not found")

    # exclude_unset keeps only the fields the client actually sent.
    # exclude_none additionally drops explicit nulls, which would otherwise be
    # written into columns declared nullable=False and fail at commit time.
    changes = data.model_dump(exclude_unset=True, exclude_none=True)
    for field, value in changes.items():
        setattr(article, field, value)

    await db.commit()
    await db.refresh(article)
    return article

Delete(删除)

python
@router.delete("/{article_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_article(
    article_id: int,
    db: AsyncSession = Depends(get_db)
):
    # Look the row up first so an unknown ID is answered with 404.
    target = await db.get(Article, article_id)
    if target is None:
        raise HTTPException(status_code=404, detail="Article not found")

    # Nothing is returned: a 204 No Content response carries no body.
    await db.delete(target)
    await db.commit()

高级查询技巧

搜索与过滤

python
from sqlalchemy import select, func, or_

# NOTE(review): "/search" only reaches this handler if no route registered
# earlier on the router captures it; a GET "/{article_id}" pattern registered
# first would match "search" as the ID.
@router.get("/search")
async def search_articles(
    q: str = Query("", min_length=0),
    tag: Optional[str] = None,
    db: AsyncSession = Depends(get_db)
):
    """Search published articles by keyword and/or tag, newest first.

    Args:
        q: text looked for, case-insensitively, in title or content; an empty
            string disables the keyword filter.
        tag: when given, only articles that have a tag with exactly this name.
        db: request-scoped database session.
    """
    query = select(Article).where(Article.published.is_(True))

    if q:
        # autoescape makes "%" and "_" typed by the user match literally
        # instead of acting as LIKE wildcards.
        query = query.where(
            or_(
                Article.title.icontains(q, autoescape=True),
                Article.content.icontains(q, autoescape=True),
            )
        )

    if tag:
        query = query.join(Tag).where(Tag.name == tag)

    result = await db.execute(query.order_by(Article.created_at.desc()))
    return result.scalars().all()

统计聚合

python
@router.get("/stats")
async def article_stats(db: AsyncSession = Depends(get_db)):
    # Two COUNT queries: all articles first, then only the published ones.
    total = await db.scalar(select(func.count(Article.id)))
    n_published = await db.scalar(
        select(func.count(Article.id)).where(Article.published == True)
    )

    # Everything that is not published is reported as a draft.
    n_draft = total - n_published
    return {"total": total, "published": n_published, "draft": n_draft}

数据库迁移管理

使用 Alembic 管理数据库版本:

bash
pip install alembic
alembic init alembic
ini
# alembic.ini
sqlalchemy.url = postgresql+asyncpg://user:password@localhost:5432/mydb
python
# alembic/env.py
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context
from alembic.runtime.migration import MigrationContext

# Alembic configuration object for the running command (values from alembic.ini).
config = context.config

# async_engine_from_config expects a plain dict of options, so pass the main
# ini section rather than the Config object itself.
connectable = async_engine_from_config(
    config.get_section(config.config_ini_section, {}),
    prefix="sqlalchemy.",
)

# TODO: set this to the application's Base.metadata so that
# "alembic revision --autogenerate" has models to compare against.
target_metadata = None


def do_run_migrations(connection):
    """Configure the migration context on a sync connection and run the migrations."""
    context.configure(connection=connection, target_metadata=target_metadata)
    with context.begin_transaction():
        context.run_migrations()


async def run_migrations():
    """Open an async connection and run the migrations through ``run_sync``.

    env.py still has to invoke this coroutine, e.g. ``asyncio.run(run_migrations())``.
    """
    async with connectable.connect() as conn:
        await conn.run_sync(do_run_migrations)
    # Release the engine's pooled connections once the migrations are done.
    await connectable.dispose()

# 生成迁移
# alembic revision --autogenerate -m "add articles table"

小结

完整的 CRUD 流程涉及:异步引擎配置 → ORM 模型定义 → Pydantic 序列化层 → 依赖注入会话 → 各操作函数。关键原则:

  • 始终使用异步驱动(asyncpg),避免阻塞事件循环
  • Pydantic 模型与 SQLAlchemy 模型分离,通过 ConfigDict(from_attributes=True) 打通
  • exclude_unset=True 用于部分更新,防止覆盖请求中未提供的字段
  • 连接池的 pool_pre_ping 选项是生产环境的必要配置
  • 使用 Alembic 做数据库迁移,不要手动修改生产表结构
#fastapi #sqlalchemy #async #database #orm

评论

A

Written by

AI-Writer

Related Articles

fastapi
#5

FastAPI 依赖注入系统

深入理解 FastAPI 的 Depends 机制、依赖链、类依赖、异步依赖与多级注入,掌握构建可复用逻辑组件的核心技能

Read More
fastapi
#3

请求体与 Pydantic 模型

使用 Pydantic BaseModel 定义请求体数据结构,掌握数据验证、序列化、嵌套模型、默认值与自定义验证器的完整用法

Read More