当前位置: 首页 > news >正文

MonkeyCode实现OAuth2认证:从零到生产级SSO

为什么不用Session+Cookie了?

传统Session方案的痛点:

问题表现
扩展性差Session存在单台服务器内存,多实例无法共享
CSRF风险Cookie自动携带,容易被恶意网站利用
跨域麻烦Cookie在跨域场景下各种限制
移动端不友好App/小程序很难处理Cookie

JWT + OAuth2是无状态、跨平台、支持SSO的现代方案。

OAuth2四种授权模式

MonkeyCode帮你选最合适的:

模式适用场景典型用户
授权码模式有后端的Web应用(最安全)你的SaaS平台
隐式模式纯前端SPA(已淘汰,不推荐)
密码模式高度信任的第一方App你自己的官方App
客户端凭证模式服务间调用微服务A调用微服务B

99%的Web应用应该用:授权码模式 + PKCE(防止授权码被截获)。

实战:用Authlib实现OAuth2 + JWT

安装依赖

让MonkeyCode生成requirements.txt:

authlib==1.3.0 fastapi==0.104.0 pydantic==2.5.0 python-jose[cryptography]==3.3.0 passlib[bcrypt]==1.7.4

完整实现(授权码模式 + JWT)

# app/auth.py - 完整认证模块 from fastapi import FastAPI, Depends, HTTPException, Request, status from authlib.integrations.starlette import OAuth from starlette.middleware.sessions import SessionMiddleware from jose import JWTError, jwt from passlib.context import CryptContext from datetime import datetime, timedelta from typing import Optional, List import os app = FastAPI(title="OAuth2 JWT Demo") app.add_middleware(SessionMiddleware, secret_key=os.getenv("SESSION_SECRET", "dev-secret")) # ─── 密码哈希 ─── pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") def hash_password(password: str) -> str: return pwd_context.hash(password) def verify_password(plain: str, hashed: str) -> bool: return pwd_context.verify(plain, hashed) # ─── JWT工具 ─── SECRET_KEY = os.getenv("JWT_SECRET", "dev-jwt-secret-change-in-prod") ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30 REFRESH_TOKEN_EXPIRE_DAYS = 7 def create_access_token(data: dict) -> str: to_encode = data.copy() expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) to_encode.update({"exp": expire, "type": "access"}) return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) def create_refresh_token(data: dict) -> str: to_encode = data.copy() expire = datetime.utcnow() + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS) to_encode.update({"exp": expire, "type": "refresh"}) return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) def decode_token(token: str) -> dict: try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) return payload except JWTError: raise HTTPException(status_code=401, detail="Token无效或已过期") # ─── OAuth2第三方登录(GitHub示例)─── oauth = OAuth() oauth.register( name="github", client_id=os.getenv("GITHUB_CLIENT_ID"), client_secret=os.getenv("GITHUB_CLIENT_SECRET"), access_token_url="https://github.com/login/oauth/access_token", access_token_params=None, authorize_url="https://github.com/login/oauth/authorize", authorize_params=None, api_base_url="https://api.github.com/", client_kwargs={"scope": "user:email"}, ) @app.get("/auth/github/login") async def github_login(request: Request): redirect_uri = request.url_for("github_callback") return await oauth.github.authorize_redirect(request, redirect_uri) @app.get("/auth/github/callback") async def github_callback(request: Request): token = await oauth.github.authorize_access_token(request) user_info = await oauth.github.get("user", token=token) user_email = user_info.json().get("email") # 查找或创建用户 user = await db.get_user_by_email(user_email) if not user: user = await db.create_user(email=user_email, name=user_info.json()["login"]) # 签发JWT access_token = create_access_token({"sub": str(user.id), "email": user.email}) refresh_token = create_refresh_token({"sub": str(user.id)}) # 重定向回前端,把token放在URL fragment(不会发到后端) frontend_url = os.getenv("FRONTEND_URL", "http://localhost:3000") return RedirectResponse( url=f"{frontend_url}/auth/callback?access_token={access_token}&refresh_token={refresh_token}" ) # ─── 本地注册/登录 ─── from pydantic import BaseModel class RegisterRequest(BaseModel): email: str password: str name: str class LoginRequest(BaseModel): email: str password: str @app.post("/auth/register") async def register(req: RegisterRequest): existing = await db.get_user_by_email(req.email) if existing: raise HTTPException(400, "邮箱已注册") hashed = hash_password(req.password) user = await db.create_user(email=req.email, name=req.name, password_hash=hashed) access_token = create_access_token({"sub": str(user.id), "email": user.email}) refresh_token = create_refresh_token({"sub": str(user.id)}) return { "access_token": access_token, "refresh_token": refresh_token, "token_type": "bearer", "expires_in": ACCESS_TOKEN_EXPIRE_MINUTES * 60 } @app.post("/auth/login") async def login(req: LoginRequest): user = await db.get_user_by_email(req.email) if not user or not verify_password(req.password, user.password_hash): raise HTTPException(401, "邮箱或密码错误") access_token = create_access_token({"sub": str(user.id), "email": user.email}) refresh_token = create_refresh_token({"sub": str(user.id)}) return { "access_token": access_token, "refresh_token": refresh_token, "token_type": "bearer", "expires_in": ACCESS_TOKEN_EXPIRE_MINUTES * 60 } # ─── 刷新Token ─── class RefreshRequest(BaseModel): refresh_token: str @app.post("/auth/refresh") async def refresh(req: RefreshRequest): payload = decode_token(req.refresh_token) if payload.get("type") != "refresh": raise HTTPException(401, "无效的refresh token") new_access_token = create_access_token({"sub": payload["sub"]}) return {"access_token": new_access_token, "token_type": "bearer"} # ─── 获取当前用户(依赖注入)─── from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials security = HTTPBearer() async def get_current_user(cred: HTTPAuthorizationCredentials = Depends(security)) -> dict: payload = decode_token(cred.credentials) user_id = payload.get("sub") if not user_id: raise HTTPException(401, "无效的token") user = await db.get_user_by_id(int(user_id)) if not user: raise HTTPException(401, "用户不存在") return user # ─── 受保护的接口 ─── @app.get("/api/me") async def get_me(current_user: dict = Depends(get_current_user)): return { "id": current_user["id"], "email": current_user["email"], "name": current_user["name"] } @app.get("/api/protected") async def protected_route(current_user: dict = Depends(get_current_user)): return {"message": f"你好,{current_user['name']}!"}

前端如何携带JWT

// 登录后存储token const login = async (email, password) => { const resp = await fetch("http://localhost:8000/auth/login", { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify({ email, password }) }); const data = await resp.json(); localStorage.setItem("access_token", data.access_token); localStorage.setItem("refresh_token", data.refresh_token); }; // 请求拦截器:自动附加token const apiFetch = async (url, options = {}) => { const token = localStorage.getItem("access_token"); const headers = { ...options.headers, "Authorization": `Bearer ${token}` }; let resp = await fetch(url, { ...options, headers }); // Token过期,尝试刷新 if (resp.status === 401) { const refreshed = await refreshToken(); if (refreshed) { const newToken = localStorage.getItem("access_token"); headers["Authorization"] = `Bearer ${newToken}`; resp = await fetch(url, { ...options, headers }); } } return resp; }; const refreshToken = async () => { const refreshToken = localStorage.getItem("refresh_token"); const resp = await fetch("http://localhost:8000/auth/refresh", { method: "POST", headers: { "Content-Type": "application/json" }, body: JSON.stringify({ refresh_token: refreshToken }) }); if (resp.ok) { const data = await resp.json(); localStorage.setItem("access_token", data.access_token); return true; } // Refresh token也过期,跳转登录 localStorage.clear(); window.location.href = "/login"; return false; };

PKCE:为什么授权码模式需要它?

传统授权码模式有个漏洞:授权码在回调URL中,可能被恶意App截获(Android/iOS的Intent劫持)。

PKCE(Proof Key for Code Exchange)解决这个问题:

前端生成 code_verifier(随机字符串) ↓ hash → code_challenge ↓ 把 code_challenge 发给授权服务器 授权服务器返回授权码 ↓ 前端带着 授权码 + code_verifier 换token 服务器验证 hash(code_verifier) == code_challenge? ✅ 通过才发token

MonkeyCode生成的PKCE实现:

import hashlib import base64 import secrets def generate_pkce_pair(): """生成code_verifier和code_challenge""" code_verifier = base64.urlsafe_b64encode( secrets.token_bytes(32) ).decode().rstrip("=") code_challenge = base64.urlsafe_b64encode( hashlib.sha256(code_verifier.encode()).digest() ).decode().rstrip("=") return code_verifier, code_challenge # 前端登录时 verifier, challenge = generate_pkce_pair() session["pkce_verifier"] = verifier # 存在session redirect_url = f"https://auth-server/oauth/authorize?...&code_challenge={challenge}&code_challenge_method=S256" # 回调时 stored_verifier = session.pop("pkce_verifier") # 用 verifier 换token,服务器会验证

安全最佳实践(MonkeyCode自动检查)

让MonkeyCode检查我的认证代码的安全性,列出所有风险点
检查项风险修复
JWT Secret硬编码Token可被伪造用环境变量,长度≥32字符
没用HTTPSToken被中间人截获生产环境强制HTTPS
access token过期时间太长泄露后影响大≤30分钟,用refresh token续期
没做登出黑名单token被盗用无法撤销登出时把token加入黑名单(Redis)
没限制登录尝试容易被暴力破解登录失败5次锁定15分钟
密码强度不够弱密码被撞库注册时强制8位+大小写+数字
http://www.gsyq.cn/news/1604161.html

相关文章:

  • 级别的AutoBuilder,一键干掉80%的重复CRUD工作
  • 费可商用 PHP 管理后台 CatchAdmin V5.3.1 发布 后台打包直降 5s 内
  • 高校汉服租赁网站源码 Java+SpringBoot+Vue 万字文档
  • FDE标准:FDE落地最后一公里,在银行、政务,石油,电力,金融的产品、标准和落地案例
  • IEC 60205-2026
  • 竣宝潜龙尾盘副选精准抓主力洗盘尾巴主升浪信号 九点智投三步点金,五星智投双紫擒龙指标选股魔方量化指标公式
  • item0(1):接地
  • 最新小学生学习前端vue 多插图
  • AMAT 0100-1200印刷电路板
  • WinUtil:革命性Windows系统管理工具,一键完成软件部署与系统优化
  • AutoUnipus终极指南:快速掌握U校园智能刷课工具完整教程
  • 告别图片!三种 CSS 原生方案实现任意方向三角形
  • leetcode:两个数组的交集
  • MouseTester:免费开源的鼠标性能终极测试工具
  • 从工具函数中注入消息
  • 二维数组知识
  • 3D Web 服务器环境搭建
  • 为什么你用光模块测试FPGA IBERT不通
  • AI插件开发实战:基于JS脚本的Illustrator色标生成器设计与实现
  • 特殊上位机权限管理方案
  • 三角洲S10裂变新赛季上线[特殊字符]Mac玩家再也不用错过核电站新图!
  • C# CAD二次开发消息提示技巧
  • TUSB4020B评估模块拆解:从电源设计到信号完整性,打造稳定USB集线器
  • LangGraph 架构避坑:智能体职责拆分与流式回调透传机制剖析
  • 启鸣AI赋能大学课堂,西班牙访学团沉浸式体验天立智慧教学
  • Dataify 跨境电商数据采集全攻略实战
  • 私钥登录ssh服务器
  • 深度把玩劳力士3235机芯的老哥,先放大50倍看看这组表盘序列号的防伪公差
  • 用Python调用百度热搜榜API:从零实现实时热搜数据抓取与可视化
  • SUMO仿真控制新维度:Python与TraCI接口实战指南