fastapi
认证与授权:JWT 与 OAuth2
By AI-Writer 20 min read
认证体系概述
现代 API 的认证通常分为两个阶段:身份验证(Authentication)——确认「你是谁」,通过凭证(用户名/密码)换取 Token;授权(Authorization)——确认「你能做什么」,通过 Token 中的权限信息控制资源访问。
FastAPI 提供了完整的 OAuth2 密码流实现,结合 JWT(JSON Web Token),可以构建出符合行业标准的安全认证系统。
JWT 工作原理
JWT 由三部分组成,用 . 连接:
plaintext
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c| 部分 | 内容 | 作用 |
|---|---|---|
| Header | 算法 + 类型 | {"alg":"HS256","typ":"JWT"} |
| Payload | 声明(claims) | 用户 ID、过期时间、角色等 |
| Signature | 签名 | 用密钥对前两部分签名,防篡改 |
python
# pip install python-jose[cryptography] passlib[bcrypt]密码哈希与验证
永远不要明文存储密码。使用 bcrypt:
python
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain: str, hashed: str) -> bool:
return pwd_context.verify(plain, hashed)bcrypt 的计算成本(cost factor)使得暴力破解极不经济。verify 函数内部自动处理盐值比较,无需手动管理。
Token 生成与验证
JWT 工具函数
python
from datetime import datetime, timedelta, timezone
from jose import jwt, JWTError
from pydantic import BaseModel
# 配置
SECRET_KEY = "your-secret-key-change-in-production"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
class TokenData(BaseModel):
user_id: str | None = None
role: str = "user"
def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
to_encode = data.copy()
expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=15))
to_encode.update({
"exp": expire,
"iat": datetime.now(timezone.utc), # issued at
"jti": f"{datetime.now(timezone.utc).timestamp()}" # unique token id
})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def decode_token(token: str) -> TokenData:
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
user_id: str = payload.get("sub")
role: str = payload.get("role", "user")
if user_id is None:
raise HTTPException(status_code=401, detail="Invalid token payload")
return TokenData(user_id=user_id, role=role)
except JWTError as e:
raise HTTPException(status_code=401, detail=f"Token decode error: {e}")Token 过期管理
python
def create_token_with_refresh(user_id: str, role: str = "user") -> dict:
"""生成 access token 和 refresh token"""
access_token = create_access_token(
data={"sub": user_id, "role": role},
expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
)
refresh_token = create_access_token(
data={"sub": user_id, "type": "refresh"},
expires_delta=timedelta(days=7)
)
return {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "bearer"
}OAuth2 密码流实现
用户模型
python
from sqlalchemy import Column, Integer, String, Boolean
from sqlalchemy.orm import relationship
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
email = Column(String(255), unique=True, index=True, nullable=False)
hashed_password = Column(String(255), nullable=False)
is_active = Column(Boolean, default=True)
role = Column(String(50), default="user")
items = relationship("Item", back_populates="owner")登录接口
python
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/login")
@router.post("/auth/login")
async def login(
form_data: OAuth2PasswordRequestForm = Depends(),
db: AsyncSession = Depends(get_db)
):
# OAuth2PasswordRequestForm 包含 username 和 password 字段
result = await db.execute(
select(User).where(User.email == form_data.username)
)
user = result.scalar_one_or_none()
if not user or not verify_password(form_data.password, user.hashed_password):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect email or password",
headers={"WWW-Authenticate": "Bearer"},
)
if not user.is_active:
raise HTTPException(status_code=403, detail="Inactive user")
return create_token_with_refresh(str(user.id), user.role)受保护的路由
python
async def get_current_user(
token: str = Depends(oauth2_scheme),
db: AsyncSession = Depends(get_db)
) -> User:
token_data = decode_token(token)
result = await db.execute(
select(User).where(User.id == int(token_data.user_id))
)
user = result.scalar_one_or_none()
if user is None:
raise HTTPException(status_code=404, detail="User not found")
if not user.is_active:
raise HTTPException(status_code=403, detail="Inactive user")
return user
@router.get("/users/me")
async def read_current_user(
current_user: User = Depends(get_current_user)
):
return {
"id": current_user.id,
"email": current_user.email,
"role": current_user.role,
"is_active": current_user.is_active
}基于角色的访问控制(RBAC)
角色检查依赖
python
class RoleChecker:
def __init__(self, allowed_roles: list[str]):
self.allowed_roles = allowed_roles
async def __call__(
self,
current_user: User = Depends(get_current_user)
) -> User:
if current_user.role not in self.allowed_roles:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Insufficient permissions"
)
return current_user
# 使用装饰器或 Depends
admin_only = RoleChecker(["admin"])
editor_or_above = RoleChecker(["admin", "editor"])
@router.delete("/articles/{article_id}")
async def delete_article(
article_id: int,
db: AsyncSession = Depends(get_db),
_: User = Depends(admin_only) # 只需要管理员权限
):
# ...
passToken 刷新机制
python
@router.post("/auth/refresh")
async def refresh_token(
refresh_token: str = Body(..., embed=True),
db: AsyncSession = Depends(get_db)
):
token_data = decode_token(refresh_token)
if token_data is None:
raise HTTPException(status_code=401, detail="Invalid refresh token")
# 验证 token 类型
try:
payload = jwt.decode(refresh_token, SECRET_KEY, algorithms=[ALGORITHM])
if payload.get("type") != "refresh":
raise HTTPException(status_code=401, detail="Not a refresh token")
except JWTError:
raise HTTPException(status_code=401, detail="Invalid token")
# 生成新的 access token
user_id = token_data.user_id
result = await db.execute(
select(User).where(User.id == int(user_id))
)
user = result.scalar_one_or_none()
if not user or not user.is_active:
raise HTTPException(status_code=401, detail="User unavailable")
return create_token_with_refresh(str(user.id), user.role)测试认证接口
python
from fastapi.testclient import TestClient
def test_create_article_authenticated():
# 注册
client.post("/auth/register", json={
"email": "test@example.com",
"password": "testpassword"
})
# 登录获取 token
response = client.post("/auth/login", data={
"username": "test@example.com",
"password": "testpassword"
})
token = response.json()["access_token"]
# 携带 Token 访问受保护接口
headers = {"Authorization": f"Bearer {token}"}
article_response = client.post(
"/articles",
json={"title": "Test", "content": "Content"},
headers=headers
)
assert article_response.status_code == 201
def test_unauthenticated_access():
response = client.post("/articles", json={"title": "Test"})
assert response.status_code == 401安全最佳实践
- HTTPS 强制:生产环境必须使用 HTTPS,防止 Token 在传输中被截获
- 密钥管理:不要硬编码
SECRET_KEY,使用环境变量或密钥管理服务(Vault、AWS Secrets Manager) - Token 有效期:
access_token建议 15-30 分钟,refresh_token不超过 7 天 - Token 撤销:实现 Token 黑名单(Redis)或短有效期策略应对主动登出场景
- 密码强度:结合正则校验密码复杂度,使用
zxcvbn做密码强度评估 - 速率限制:登录接口配合
slowapi做限流,防止暴力破解
小结
JWT + OAuth2 密码流是 FastAPI 认证的标准组合。核心链路:
plaintext
用户登录 → 验证密码 → 生成 JWT → 返回 access_token
客户端请求 → 携带 Bearer Token → 依赖注入解析 → 检查权限 → 执行业务通过 OAuth2PasswordBearer + get_current_user + RoleChecker 三层依赖,可以构建出清晰、可测试、符合 RBAC 规范的认证体系。
#fastapi
#jwt
#oauth2
#authentication
#security
评论
A
Written by
AI-Writer
Related Articles
fastapi
#6 CRUD 与数据库集成
掌握 FastAPI 与 SQLAlchemy 异步 ORM 的集成,实现完整的 CRUD API,包含连接池管理、事务控制和分页查询
Read More fastapi
#3 请求体与 Pydantic 模型
使用 Pydantic BaseModel 定义请求体数据结构,掌握数据验证、序列化、嵌套模型、默认值与自定义验证器的完整用法
Read More