fastapi
CRUD 与数据库集成
By AI-Writer 22 min read
为什么选择 SQLAlchemy 异步模式
在传统的同步 Web 应用中,数据库查询会阻塞整个请求线程。FastAPI 基于 Starlette,支持完整的 async/await,因此推荐使用 SQLAlchemy 2.0 的异步引擎(create_async_engine),配合 asyncpg(PostgreSQL)或 aiomysql(MySQL)驱动,实现真正的非阻塞 I/O。
python
# 环境准备
# pip install sqlalchemy[asyncio] asyncpg databases

数据库配置与连接池
创建异步引擎
python
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase
# Connection URL: async PostgreSQL driver (asyncpg)
DATABASE_URL = "postgresql+asyncpg://user:password@localhost:5432/mydb"

engine = create_async_engine(
    DATABASE_URL,
    echo=True,  # log SQL in development; turn off in production
    pool_size=10,  # base connection-pool size
    max_overflow=20,  # extra connections allowed beyond pool_size
    pool_pre_ping=True,  # liveness-check each connection before use
)

async_session = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,  # keep attribute values usable after commit
)
# Declarative base class shared by every ORM model below
class Base(DeclarativeBase):
    """SQLAlchemy 2.0 declarative base."""
    pass

依赖注入数据库会话
python
from fastapi import Depends
async def get_db():
    """FastAPI dependency that yields one AsyncSession per request.

    Commits when the request handler succeeds, rolls back (and
    re-raises) on any exception, and closes the session afterwards.
    """
    async with async_session() as session:
        try:
            yield session
            await session.commit()  # auto-commit on request success
        except Exception:
            await session.rollback()  # auto-rollback on error
            raise
        finally:
            # Redundant with the async-with block (the context manager
            # already closes the session), but harmless.
            await session.close()

定义数据模型
python
from sqlalchemy import Column, Integer, String, DateTime, Boolean, Text, ForeignKey
from sqlalchemy.orm import relationship
from datetime import datetime
class Article(Base):
    """Blog article; owns its tags and comments."""
    __tablename__ = "articles"

    id = Column(Integer, primary_key=True, index=True)
    title = Column(String(200), nullable=False, index=True)
    content = Column(Text, nullable=False)
    published = Column(Boolean, default=False)
    # NOTE(review): datetime.utcnow is deprecated since Python 3.12;
    # consider a tz-aware default such as lambda: datetime.now(timezone.utc).
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
    # One-to-many relationships: tags eager-load via SELECT..IN;
    # comments are deleted together with their article (delete-orphan).
    tags = relationship("Tag", back_populates="article", lazy="selectin")
    comments = relationship("Comment", back_populates="article", cascade="all, delete-orphan")
class Tag(Base):
    """Tag attached to a single article."""
    __tablename__ = "tags"

    id = Column(Integer, primary_key=True)
    # NOTE(review): name is globally unique while each row belongs to one
    # article, so two articles cannot use the same tag name — confirm
    # that is intended (create_article below inserts a Tag per article).
    name = Column(String(50), unique=True, nullable=False)
    article_id = Column(Integer, ForeignKey("articles.id"))
    article = relationship("Article", back_populates="tags")
class Comment(Base):
__tablename__ = "comments"
id = Column(Integer, primary_key=True)
body = Column(Text, nullable=False)
article_id = Column(Integer, ForeignKey("articles.id"))
article = relationship("Article", back_populates="comments")Pydantic 模型设计
将数据模型层(Pydantic)与 ORM 层分离,是 FastAPI 的最佳实践:
python
from pydantic import BaseModel, ConfigDict
from datetime import datetime
from typing import Optional
# Read (response) schemas — from_attributes lets Pydantic read ORM objects
class TagResponse(BaseModel):
    """Tag as returned to API clients."""
    model_config = ConfigDict(from_attributes=True)

    id: int
    name: str

class CommentResponse(BaseModel):
    """Comment as returned to API clients."""
    model_config = ConfigDict(from_attributes=True)

    id: int
    body: str
# Lightweight list item (no body text, no relationships)
class ArticleList(BaseModel):
    """Summary row for article listings."""
    model_config = ConfigDict(from_attributes=True)

    id: int
    title: str
    published: bool
    created_at: datetime

# Full detail, including related tags and comments
class ArticleDetail(BaseModel):
    """Complete article representation for single-item responses."""
    model_config = ConfigDict(from_attributes=True)

    id: int
    title: str
    content: str
    published: bool
    created_at: datetime
    tags: list[TagResponse] = []
    comments: list[CommentResponse] = []
# 创建/更新(请求)模型
class ArticleCreate(BaseModel):
title: str = Field(..., min_length=1, max_length=200)
content: str = Field(..., min_length=1)
published: bool = False
tag_names: list[str] = []
class ArticleUpdate(BaseModel):
title: Optional[str] = Field(None, min_length=1, max_length=200)
content: Optional[str] = Field(None, min_length=1)
published: Optional[bool] = NoneCRUD 操作实现
Create(创建)
python
from fastapi import APIRouter, Depends, HTTPException, Query, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
router = APIRouter(prefix="/articles", tags=["Articles"])

@router.post("", response_model=ArticleDetail, status_code=status.HTTP_201_CREATED)
async def create_article(
    data: ArticleCreate,
    db: AsyncSession = Depends(get_db)
):
    """Create an article and its tags in a single transaction."""
    article = Article(
        title=data.title,
        content=data.content,
        published=data.published,
    )
    db.add(article)
    await db.flush()  # assigns the autoincrement primary key

    # Create the associated tags
    for tag_name in data.tag_names:
        db.add(Tag(name=tag_name, article_id=article.id))

    await db.commit()
    # Explicitly load the relationships inside an await: serializing
    # ArticleDetail would otherwise trigger a synchronous lazy-load,
    # which raises MissingGreenlet under the async engine.
    await db.refresh(article, attribute_names=["tags", "comments"])
    return article

Read(读取)
python
@router.get("/{article_id}", response_model=ArticleDetail)
async def get_article(
    article_id: int,
    db: AsyncSession = Depends(get_db)
):
    """Return one article with its tags and comments, or 404.

    NOTE(review): this parameterized route shadows the literal routes
    /search and /stats that are registered later in this file — those
    must be registered before "/{article_id}" to be reachable.
    """
    from sqlalchemy.orm import selectinload

    # Eager-load both relationships inside the await; otherwise
    # serializing ArticleDetail triggers a synchronous lazy-load
    # (MissingGreenlet) on the async engine.
    article = await db.get(
        Article,
        article_id,
        options=[selectinload(Article.tags), selectinload(Article.comments)],
    )
    if article is None:
        raise HTTPException(status_code=404, detail="Article not found")
    return article
@router.get("", response_model=list[ArticleList])
async def list_articles(
    skip: int = Query(0, ge=0),
    limit: int = Query(20, ge=1, le=100),
    published: Optional[bool] = None,
    db: AsyncSession = Depends(get_db)
):
    """Paginated article list, newest first, optionally filtered by published."""
    query = select(Article)
    if published is not None:
        query = query.where(Article.published == published)
    query = query.order_by(Article.created_at.desc()).offset(skip).limit(limit)
    result = await db.execute(query)
    return result.scalars().all()

Update(更新)
python
@router.patch("/{article_id}", response_model=ArticleDetail)
async def update_article(
    article_id: int,
    data: ArticleUpdate,
    db: AsyncSession = Depends(get_db)
):
    """Partially update an article; only fields present in the payload change."""
    article = await db.get(Article, article_id)
    if article is None:
        raise HTTPException(status_code=404, detail="Article not found")

    # exclude_unset=True keeps a PATCH from clobbering omitted fields
    update_data = data.model_dump(exclude_unset=True)
    for field, value in update_data.items():
        setattr(article, field, value)

    await db.commit()
    # Load the relationships inside an await so serializing ArticleDetail
    # does not trigger a synchronous lazy-load (MissingGreenlet) under
    # the async engine.
    await db.refresh(article, attribute_names=["tags", "comments"])
    return article

Delete(删除)
python
@router.delete("/{article_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_article(
    article_id: int,
    db: AsyncSession = Depends(get_db)
):
    """Delete an article (its comments cascade via delete-orphan); 404 if absent."""
    article = await db.get(Article, article_id)
    if article is None:
        raise HTTPException(status_code=404, detail="Article not found")
    await db.delete(article)
    await db.commit()
    # 204 No Content: intentionally no response body

高级查询技巧
搜索与过滤
python
from sqlalchemy import select, func, or_
# NOTE(review): this literal path is registered AFTER the parameterized
# "/{article_id}" route above, so GET /articles/search is captured by
# get_article and fails with 422. Register /search (and /stats) before
# "/{article_id}".
@router.get("/search")
async def search_articles(
    q: str = Query("", min_length=0),
    tag: Optional[str] = None,
    db: AsyncSession = Depends(get_db)
):
    """Substring search over published articles, optionally filtered by tag."""
    stmt = select(Article).where(Article.published == True)  # noqa: E712 — SQL expression
    if q:
        stmt = stmt.where(
            or_(
                Article.title.ilike(f"%{q}%"),
                Article.content.ilike(f"%{q}%"),
            )
        )
    if tag:
        stmt = stmt.join(Tag).where(Tag.name == tag)
    result = await db.execute(stmt.order_by(Article.created_at.desc()))
    return result.scalars().all()

统计聚合
python
# NOTE(review): like /search, this route must be registered before the
# parameterized "/{article_id}" route or it will be shadowed.
@router.get("/stats")
async def article_stats(db: AsyncSession = Depends(get_db)):
    """Return total / published / draft article counts."""
    # Total number of articles
    total = await db.scalar(select(func.count(Article.id)))
    # Number of published articles
    published_count = await db.scalar(
        select(func.count(Article.id)).where(Article.published == True)  # noqa: E712
    )
    return {
        "total": total,
        "published": published_count,
        "draft": total - published_count,
    }

数据库迁移管理
使用 Alembic 管理数据库版本:
bash
pip install alembic
alembic init alembic

ini
# alembic.ini
sqlalchemy.url = postgresql+asyncpg://user:password@localhost:5432/mydb

python
# alembic/env.py
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic.runtime.migration import MigrationContext
# NOTE(review): `context` and `do_run_migrations` come from Alembic's
# generated env.py scaffolding — this snippet shows only the async parts.
config = context.config
connectable = async_engine_from_config(config, prefix="sqlalchemy.")

async def run_migrations():
    """Run Alembic migrations on the async engine via run_sync."""
    async with connectable.connect() as conn:
        await conn.run_sync(do_run_migrations)

# Generate a migration:
# alembic revision --autogenerate -m "add articles table"

小结
完整的 CRUD 流程涉及:异步引擎配置 → ORM 模型定义 → Pydantic 序列化层 → 依赖注入会话 → 各操作函数。关键原则:
- 始终使用异步驱动(asyncpg),避免阻塞事件循环
- Pydantic 模型与 SQLAlchemy 模型分离,通过 ConfigDict(from_attributes=True) 打通
- exclude_unset=True 用于部分更新,防止覆盖未提交的字段
- 连接池和 pool_pre_ping 是生产环境的必要配置
- 使用 Alembic 做数据库迁移,不要手动修改生产表结构
#fastapi
#sqlalchemy
#async
#database
#orm
评论
A
Written by
AI-Writer
Related Articles
fastapi
#3 请求体与 Pydantic 模型
使用 Pydantic BaseModel 定义请求体数据结构,掌握数据验证、序列化、嵌套模型、默认值与自定义验证器的完整用法
Read More