fastapi

认证与授权:JWT 与 OAuth2

By AI-Writer 20 min read

认证体系概述

现代 API 的认证通常分为两个阶段:身份验证(Authentication)——确认「你是谁」,通过凭证(用户名/密码)换取 Token;授权(Authorization)——确认「你能做什么」,通过 Token 中的权限信息控制资源访问。

FastAPI 提供了完整的 OAuth2 密码流实现,结合 JWT(JSON Web Token),可以构建出符合行业标准的安全认证系统。

JWT 工作原理

JWT 由三部分组成,用 . 连接:

plaintext
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
部分内容作用
Header算法 + 类型{"alg":"HS256","typ":"JWT"}
Payload声明(claims)用户 ID、过期时间、角色等
Signature签名用密钥对前两部分签名,防篡改
python
# pip install python-jose[cryptography] passlib[bcrypt]

密码哈希与验证

永远不要明文存储密码。使用 bcrypt

python
from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

def hash_password(password: str) -> str:
    return pwd_context.hash(password)

def verify_password(plain: str, hashed: str) -> bool:
    return pwd_context.verify(plain, hashed)

bcrypt 的计算成本(cost factor)使得暴力破解极不经济。verify 函数内部自动处理盐值比较,无需手动管理。

Token 生成与验证

JWT 工具函数

python
from datetime import datetime, timedelta, timezone
from jose import jwt, JWTError
from pydantic import BaseModel

# 配置
SECRET_KEY = "your-secret-key-change-in-production"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

class TokenData(BaseModel):
    user_id: str | None = None
    role: str = "user"

def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
    to_encode = data.copy()
    expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=15))
    to_encode.update({
        "exp": expire,
        "iat": datetime.now(timezone.utc),  # issued at
        "jti": f"{datetime.now(timezone.utc).timestamp()}"  # unique token id
    })
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

def decode_token(token: str) -> TokenData:
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        user_id: str = payload.get("sub")
        role: str = payload.get("role", "user")
        if user_id is None:
            raise HTTPException(status_code=401, detail="Invalid token payload")
        return TokenData(user_id=user_id, role=role)
    except JWTError as e:
        raise HTTPException(status_code=401, detail=f"Token decode error: {e}")

Token 过期管理

python
def create_token_with_refresh(user_id: str, role: str = "user") -> dict:
    """生成 access token 和 refresh token"""
    access_token = create_access_token(
        data={"sub": user_id, "role": role},
        expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    )
    refresh_token = create_access_token(
        data={"sub": user_id, "type": "refresh"},
        expires_delta=timedelta(days=7)
    )
    return {
        "access_token": access_token,
        "refresh_token": refresh_token,
        "token_type": "bearer"
    }

OAuth2 密码流实现

用户模型

python
from sqlalchemy import Column, Integer, String, Boolean
from sqlalchemy.orm import relationship

class User(Base):
    __tablename__ = "users"

    id = Column(Integer, primary_key=True, index=True)
    email = Column(String(255), unique=True, index=True, nullable=False)
    hashed_password = Column(String(255), nullable=False)
    is_active = Column(Boolean, default=True)
    role = Column(String(50), default="user")

    items = relationship("Item", back_populates="owner")

登录接口

python
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")

@router.post("/auth/login")
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: AsyncSession = Depends(get_db)
):
    # OAuth2PasswordRequestForm 包含 username 和 password 字段
    result = await db.execute(
        select(User).where(User.email == form_data.username)
    )
    user = result.scalar_one_or_none()

    if not user or not verify_password(form_data.password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect email or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    if not user.is_active:
        raise HTTPException(status_code=403, detail="Inactive user")

    return create_token_with_refresh(str(user.id), user.role)

受保护的路由

python
async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db)
) -> User:
    token_data = decode_token(token)
    result = await db.execute(
        select(User).where(User.id == int(token_data.user_id))
    )
    user = result.scalar_one_or_none()

    if user is None:
        raise HTTPException(status_code=404, detail="User not found")
    if not user.is_active:
        raise HTTPException(status_code=403, detail="Inactive user")

    return user

@router.get("/users/me")
async def read_current_user(
    current_user: User = Depends(get_current_user)
):
    return {
        "id": current_user.id,
        "email": current_user.email,
        "role": current_user.role,
        "is_active": current_user.is_active
    }

基于角色的访问控制(RBAC)

角色检查依赖

python
class RoleChecker:
    def __init__(self, allowed_roles: list[str]):
        self.allowed_roles = allowed_roles

    async def __call__(
        self,
        current_user: User = Depends(get_current_user)
    ) -> User:
        if current_user.role not in self.allowed_roles:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail="Insufficient permissions"
            )
        return current_user

# 使用装饰器或 Depends
admin_only = RoleChecker(["admin"])
editor_or_above = RoleChecker(["admin", "editor"])

@router.delete("/articles/{article_id}")
async def delete_article(
    article_id: int,
    db: AsyncSession = Depends(get_db),
    _: User = Depends(admin_only)  # 只需要管理员权限
):
    # ...
    pass

Token 刷新机制

python
@router.post("/auth/refresh")
async def refresh_token(
    refresh_token: str = Body(..., embed=True),
    db: AsyncSession = Depends(get_db)
):
    token_data = decode_token(refresh_token)
    if token_data is None:
        raise HTTPException(status_code=401, detail="Invalid refresh token")

    # 验证 token 类型
    try:
        payload = jwt.decode(refresh_token, SECRET_KEY, algorithms=[ALGORITHM])
        if payload.get("type") != "refresh":
            raise HTTPException(status_code=401, detail="Not a refresh token")
    except JWTError:
        raise HTTPException(status_code=401, detail="Invalid token")

    # 生成新的 access token
    user_id = token_data.user_id
    result = await db.execute(
        select(User).where(User.id == int(user_id))
    )
    user = result.scalar_one_or_none()
    if not user or not user.is_active:
        raise HTTPException(status_code=401, detail="User unavailable")

    return create_token_with_refresh(str(user.id), user.role)

测试认证接口

python
from fastapi.testclient import TestClient

def test_create_article_authenticated():
    # 注册
    client.post("/auth/register", json={
        "email": "test@example.com",
        "password": "testpassword"
    })

    # 登录获取 token
    response = client.post("/auth/login", data={
        "username": "test@example.com",
        "password": "testpassword"
    })
    token = response.json()["access_token"]

    # 携带 Token 访问受保护接口
    headers = {"Authorization": f"Bearer {token}"}
    article_response = client.post(
        "/articles",
        json={"title": "Test", "content": "Content"},
        headers=headers
    )
    assert article_response.status_code == 201

def test_unauthenticated_access():
    response = client.post("/articles", json={"title": "Test"})
    assert response.status_code == 401

安全最佳实践

  • HTTPS 强制:生产环境必须使用 HTTPS,防止 Token 在传输中被截获
  • 密钥管理:不要硬编码 SECRET_KEY,使用环境变量或密钥管理服务(Vault、AWS Secrets Manager)
  • Token 有效期access_token 建议 15-30 分钟,refresh_token 不超过 7 天
  • Token 撤销:实现 Token 黑名单(Redis)或短有效期策略应对主动登出场景
  • 密码强度:结合正则校验密码复杂度,使用 zxcvbn 做密码强度评估
  • 速率限制:登录接口配合 slowapi 做限流,防止暴力破解

小结

JWT + OAuth2 密码流是 FastAPI 认证的标准组合。核心链路:

plaintext
用户登录 → 验证密码 → 生成 JWT → 返回 access_token
客户端请求 → 携带 Bearer Token → 依赖注入解析 → 检查权限 → 执行业务

通过 OAuth2PasswordBearer + get_current_user + RoleChecker 三层依赖,可以构建出清晰、可测试、符合 RBAC 规范的认证体系。

#fastapi #jwt #oauth2 #authentication #security

评论

A

Written by

AI-Writer

Related Articles

fastapi
#6

CRUD 与数据库集成

掌握 FastAPI 与 SQLAlchemy 异步 ORM 的集成,实现完整的 CRUD API,包含连接池管理、事务控制和分页查询

Read More
fastapi
#4

响应模型与数据转换

使用 response_model 控制 API 输出格式,掌握 exclude_unset、response_model_exclude、状态码配置与模型嵌套的完整用法

Read More
fastapi
#3

请求体与 Pydantic 模型

使用 Pydantic BaseModel 定义请求体数据结构,掌握数据验证、序列化、嵌套模型、默认值与自定义验证器的完整用法

Read More