1.什么是 JWT?

JWT 的全称是 JSON Web Token。

通俗来说,它是一张由服务器签发的‘数字通行证’。当用户登录成功后,服务器会发给客户端(浏览器/APP)一个加密的字符串,此后,客户端每次请求服务器,只要带着这张‘通行证’,服务器就能立刻认出你是谁,并决定是否放行。

以前最主流的身份验证方式是Cookie-Session 机制。

1.Redis Session方案

Cookie-Session 机制是怎么做的呢,用户输入账号密码,服务器A验证通过,生成一个全局唯一的SessionID,并通过网络写入Redis。
然后服务器A通过Set-Cookie响应头,把SessionID返回给浏览器。
当用户再次请求的时候,可能到服务器B去了,服务器B从请求中取出SessionID,这时候他并不会去查自己的内存,而是去Redis查询。
如果Redis返回了用户信息,那么验证成功,开始处理业务。
没有就会要求重新登录。
(注:Redis 的全称是 Remote Dictionary Server(远程字典服务器)。它是一个开源、基于内存的数据结构存储系统,常被用作数据库、缓存和消息代理。)

流程图如下:
请添加图片描述

相比起上面的Redis Session,它是‘服务端可控’的方案,而JWT是‘服务端无状态’的方案。

当我们希望极简化架构(不引入Redis),请求量极大,且无法承受每次请求都查 Redis 的微小开销(其实 Redis 足够快,但对于极低延迟场景 JWT 更好)的时候就需要用到JWT了。

2.JWT方案

JWT是怎么做的?

用户输入账号密码,服务器验证通过,直接把用户信息(如用户ID,角色等)打包,加上一个独家密钥生成的签名,做成一个JWT字符串返回给客户端,服务器自己不会保存这个Token。
客户端自己把 JWT 存起来(比如存在 LocalStorage)。以后客户端每次请求,都在 HTTP 的 Header(Header 里的 Authorization)中带着这个 JWT。
服务器收到后,不需要查数据库,只需要用自己的独家密钥验算一下这个 Token 里的签名是否合法。如果合法,直接解密出里面的用户 ID,允许登录。

请添加图片描述

服务器变成了“无状态”的。 不管请求分流到哪台服务器,只要服务器有那个相同的“密钥”,就能独立完成验证,天然支持分布式和微服务架构。

2.JWT令牌的三部分结构

  1. 构造Header:
    Header通常包含令牌类型和签名算法。

    {
      "alg": "HS256",
      "typ": "JWT"
    }
    

    typ固定为JWT,alg是签名算法。
    对这个JSON 对象进行 Base64Url 编码,得到第一部分。

    eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9
    
  2. 构造 Payload(声明/载荷)
    Payload 包含你希望携带的信息,通常包括注册声明(如 iss, exp, sub)和自定义声明(如 user_id, role)。

    {
      "sub": "1234567890",
      "name": "John Doe",
      "iat": 1516239022,
      "exp": 1516242622
    }
    

    iat是签发时间,exp是过期时间。sub是主题,放用户唯一标识。
    同样,把这个 JSON 进行 Base64Url 编码,得到第二部分。

    eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyLCJleHAiOjE1MTYyNDI2MjJ9
    
  3. 生成签名(Signature)
    将前两部分编码后的字符串用点号 . 连接起来:

    eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyLCJleHAiOjE1MTYyNDI2MjJ9
    

    然后用 Header 中声明的算法(例如 HS256)和密钥对这个字符串进行签名。

    HMACSHA256(
      base64UrlEncode(header) + "." + base64UrlEncode(payload),
      secret
    )
    

    得到的签名结果再经过 Base64Url 编码,就是第三部分。

    SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
    
  4. 组合
    将三部分连起来,就是最终下发给客服端的JWT令牌。

    • 传输安全:HTTPS 加密整个 HTTP 请求,包括 JWT 字符串,所以中间人看不到任何内容。

    • JWT 内部:前两部分是 Base64Url 编码,等于明文(任何人都能解码阅读)。第三部分是签名,也是可见的(Base64Url 字符串),但它依赖密钥才能验证。

    • 签名的作用:防止前两部分被篡改。攻击者可以解码阅读,但改了任何字符,签名就失效。

3.使用

在 FastAPI 中使用 JWT 验证非常简单且优雅。FastAPI 内置了对 OAuth2 和 Bearer Token 的支持,再结合 pyjwt 库,几行代码就能搞定。

下面是一份完整的 JWT 认证实现思路,以及对应的 FastAPI 源码。整个过程不依赖任何外部存储(用户数据放在内存字典中),重点展示 JWT 的生成与验证。

思路步骤

  1. 准备依赖库
    需要安装 fastapi、uvicorn、python-jose(用于 JWT 的 encode/decode)、passlib(用于密码哈希)和 bcrypt。
    命令:pip install fastapi uvicorn python-jose[cryptography] passlib[bcrypt]

  2. 定义配置常量

    • SECRET_KEY:用于签名的密钥(生产环境必须从环境变量读取,不能硬编码)。
    • ALGORITHM:签名算法,这里使用 HS256。
    • ACCESS_TOKEN_EXPIRE_MINUTES:访问令牌的有效期(例如 15 分钟)。
  3. 模拟用户数据库
    用一个字典存储用户信息,其中密码使用 bcrypt 哈希存储。为了演示,可以事先用 pwd_context.hash("明文密码") 生成哈希值填入。

  4. 密码验证工具
    通过 passlib.context.CryptContext 创建密码上下文,提供 verify() 方法验证明文和哈希是否匹配。

  5. JWT 生成函数
    接收用户标识(如用户名)和过期时间增量,构造 payload(包含标准字段 subexp),调用 jwt.encode() 返回 token 字符串。

  6. 登录接口(/token)

    • 接收用户名和密码(使用 Pydantic 模型或 FastAPI 的 OAuth2PasswordRequestForm)。
    • 验证用户名是否存在以及密码是否正确。
    • 调用 JWT 生成函数,返回 {"access_token": token, "token_type": "bearer"}
  7. JWT 验证依赖项

    • 使用 OAuth2PasswordBearer(tokenUrl="token") 创建一个依赖对象,它负责从请求头的 Authorization: Bearer <token> 中提取 token。
    • 编写 get_current_user 函数,参数为 token: str = Depends(oauth2_scheme)
    • 在函数内部用 jwt.decode() 验证 token 的签名和过期时间,提取 sub 中的用户名。
    • 根据用户名从模拟数据库中获取用户信息,如果一切正常就返回用户对象或用户名;任何异常(签名错误、过期、用户不存在)都抛出 HTTP 401。
  8. 受保护路由
    在需要登录才能访问的接口中添加参数 current_user = Depends(get_current_user),FastAPI 会自动执行验证,验证通过后可以在路由函数中直接使用 current_user

  9. 公开路由
    不加任何依赖项,允许所有人访问。

完整源码(main.py)

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
from datetime import datetime, timedelta, timezone
from jose import jwt
from passlib.context import CryptContext

# ---------- 配置 ----------
SECRET_KEY = "your-secret-key-change-in-production"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 15

# 密码哈希工具
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# 模拟用户数据库(实际应使用真实数据库)
# 下面密码 "123456" 的哈希值是通过 pwd_context.hash("123456") 生成的
fake_users_db = {
    "alice": {
        "username": "alice",
        "hashed_password": "$2b$12$KxE5fHqDfGj5XfZxYxZxYxZxYxZxYxZxYxZxYxZxY",
        "disabled": False,
    }
}

# OAuth2 方案,tokenUrl 指向我们的登录端点
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

app = FastAPI()

# ---------- 辅助函数 ----------
def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)

def get_user(username: str):
    return fake_users_db.get(username)

def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

# ---------- 核心依赖:验证 JWT 并返回当前用户 ----------
async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="无效的认证凭据",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Token 已过期")
    except jwt.JWTError:
        raise credentials_exception

    user = get_user(username)
    if user is None:
        raise credentials_exception
    return user

# ---------- 路由 ----------
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    # OAuth2PasswordRequestForm 提供 username 和 password 字段
    user = get_user(form_data.username)
    if not user or not verify_password(form_data.password, user["hashed_password"]):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="用户名或密码错误",
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user["username"]}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}

@app.get("/protected")
async def protected_route(current_user: dict = Depends(get_current_user)):
    # 只有验证通过的用户才能访问
    return {"message": f"你好 {current_user['username']},你已通过 JWT 验证"}

@app.get("/public")
async def public_route():
    return {"message": "这是一个公开接口,无需认证"}

如何测试

  1. 保存上述代码为 main.py
  2. 启动服务:uvicorn main:app --reload
  3. 获取 token:
    curl -X POST "http://127.0.0.1:8000/token" \
         -H "Content-Type: application/x-www-form-urlencoded" \
         -d "username=alice&password=123456"
    
    返回示例:{"access_token":"eyJ...","token_type":"bearer"}
  4. 访问受保护接口:
    curl -X GET "http://127.0.0.1:8000/protected" \
         -H "Authorization: Bearer eyJ..."
    
    返回:{"message":"你好 alice,你已通过 JWT 验证"}
  5. 访问公开接口:curl http://127.0.0.1:8000/public

提醒

  • 生产环境 SECRET_KEY 必须使用随机长字符串,并从环境变量读取,严禁硬编码。
  • 必须启用 HTTPS,防止 token 在传输中被截获。
  • 密码哈希务必使用强算法(bcrypt 是好的选择)。
  • 根据业务需要调整 token 有效期,并可加入 refresh token 机制。
Logo

openEuler 是由开放原子开源基金会孵化的全场景开源操作系统项目,面向数字基础设施四大核心场景(服务器、云计算、边缘计算、嵌入式),全面支持 ARM、x86、RISC-V、loongArch、PowerPC、SW-64 等多样性计算架构

更多推荐