- Implement AES-256-GCM encryption for sensitive data - Implement decryption function - Implement Bearer token authentication verification - Add comprehensive tests for encryption/decryption roundtrip - Add tests for API token verification (success and failure cases) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
79 lines
1.9 KiB
Python
79 lines
1.9 KiB
Python
import base64
|
|
import os
|
|
from typing import Optional
|
|
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
|
|
from cryptography.hazmat.backends import default_backend
|
|
|
|
|
|
def encrypt_data(data: bytes, key_b64: str) -> bytes:
|
|
"""
|
|
使用 AES-256-GCM 加密数据.
|
|
|
|
Args:
|
|
data: 原始数据
|
|
key_b64: Base64 编码的加密密钥
|
|
|
|
Returns:
|
|
加密后的数据 (nonce + ciphertext + tag)
|
|
"""
|
|
key = base64.b64decode(key_b64)
|
|
nonce = os.urandom(12) # GCM 96-bit nonce
|
|
|
|
cipher = Cipher(
|
|
algorithms.AES(key),
|
|
modes.GCM(nonce),
|
|
backend=default_backend()
|
|
)
|
|
encryptor = cipher.encryptor()
|
|
ciphertext = encryptor.update(data) + encryptor.finalize()
|
|
|
|
# 返回 nonce + ciphertext + tag
|
|
return nonce + ciphertext + encryptor.tag
|
|
|
|
|
|
def decrypt_data(encrypted_data: bytes, key_b64: str) -> bytes:
|
|
"""
|
|
解密使用 encrypt_data 加密的数据.
|
|
|
|
Args:
|
|
encrypted_data: 加密数据 (nonce + ciphertext + tag)
|
|
key_b64: Base64 编码的加密密钥
|
|
|
|
Returns:
|
|
解密后的原始数据
|
|
"""
|
|
key = base64.b64decode(key_b64)
|
|
nonce = encrypted_data[:12]
|
|
tag = encrypted_data[-16:]
|
|
ciphertext = encrypted_data[12:-16]
|
|
|
|
cipher = Cipher(
|
|
algorithms.AES(key),
|
|
modes.GCM(nonce, tag),
|
|
backend=default_backend()
|
|
)
|
|
decryptor = cipher.decryptor()
|
|
return decryptor.update(ciphertext) + decryptor.finalize()
|
|
|
|
|
|
def verify_api_token(authorization: Optional[str]) -> bool:
|
|
"""
|
|
验证 Bearer Token.
|
|
|
|
Args:
|
|
authorization: Authorization 头部值
|
|
|
|
Returns:
|
|
Token 是否有效
|
|
"""
|
|
if not authorization:
|
|
return False
|
|
|
|
if not authorization.startswith('Bearer '):
|
|
return False
|
|
|
|
from app.config import get_settings
|
|
settings = get_settings()
|
|
token = authorization[7:] # 移除 'Bearer ' 前缀
|
|
return token == settings.api_token
|