当前位置: 首页 > news >正文

Python安全序列化

"""
Python 安全序列化 —— 规避 Pickle 风险,使用安全替代方案
涵盖 pickle 漏洞演示、JSON/msgpack 安全序列化、itsdangerous 签名加密
"""

# 安装依赖:pip install msgpack itsdangerous
# 序列化安全是 Python 安全中常被忽视但后果严重的领域

import pickle
import json
import msgpack
import hashlib
import hmac
import os
from typing import Any, Dict, Optional
from itsdangerous import URLSafeSerializer, TimedSerializer, BadSignature

# ========== 第一部分:Pickle 安全风险 ==========

class UnsafePickleDemo:
"""
Pickle 反序列化漏洞演示。
pickle.loads() 可以执行任意 Python 代码,这就是为什么
永远不要对不可信数据使用 pickle 反序列化。
"""

@staticmethod
def create_malicious_payload() -> bytes:
"""
创建一个恶意的 pickle 载荷。
实际攻击中,攻击者会让服务器反序列化此载荷以执行恶意代码。
"""
# 使用 __reduce__ 定义反序列化时执行的代码
class EvilPickle:
def __reduce__(self):
# 反序列化时将执行 os.system 命令
import os
cmd = "echo '攻击成功!反序列化执行了任意命令'"
return (os.system, (cmd,))

evil = EvilPickle()
malicious_data = pickle.dumps(evil)
print(f"恶意 pickle 载荷已生成({len(malicious_data)} 字节)")
return malicious_data

@staticmethod
def dangerous_deserialize(data: bytes) -> Any:
"""
不安全的反序列化 —— 直接调用 pickle.loads。
这就是生产代码中绝对不能做的事情!
"""
print("警告:正在反序列化不可信的 pickle 数据!")
obj = pickle.loads(data) # 这里会执行任意代码
return obj


# ========== 第二部分:安全的序列化替代方案 ==========

class SafeSerialization:
"""
安全的序列化方案 —— 使用不执行代码的格式。
JSON 和 MessagePack 都是纯数据格式,不会执行代码。
"""

@staticmethod
def serialize_json(data: Any) -> str:
"""
使用 JSON 进行安全序列化。
JSON 是最通用的跨语言序列化格式。
"""
return json.dumps(data, ensure_ascii=False, default=str)

@staticmethod
def deserialize_json(data: str) -> Any:
"""
安全反序列化 JSON 数据。
"""
return json.loads(data)

@staticmethod
def serialize_msgpack(data: Any) -> bytes:
"""
使用 MessagePack 进行安全序列化。
MessagePack 类似 JSON 但更紧凑,适合高性能场景。
"""
return msgpack.packb(data)

@staticmethod
def deserialize_msgpack(data: bytes) -> Any:
"""
安全反序列化 MessagePack 数据。
"""
return msgpack.unpackb(data)

@staticmethod
def compare_formats(data: Dict) -> None:
"""
比较不同序列化格式的效率和大小。
"""
json_data = SafeSerialization.serialize_json(data)
msgpack_data = SafeSerialization.serialize_msgpack(data)

print(f"原始数据大小:{len(str(data))} 字节")
print(f"JSON 序列化大小:{len(json_data)} 字节")
print(f"MessagePack 序列化大小:{len(msgpack_data)} 字节")
print(f"MessagePack 节省了 {len(json_data) - len(msgpack_data)} 字节")


# ========== 第三部分:带签名和加密的安全序列化 ==========

class SecureSerializer:
"""
安全序列化器 —— 使用 itsdangerous 库对数据进行签名和加密。
itsdangerous 提供:
- 签名:防止数据被篡改
- 时间戳:防止重放攻击
- 加密:保护数据机密性
"""

def __init__(self, secret_key: str):
"""
初始化安全序列化器。
secret_key 必须是高熵的密钥(建议使用 secrets.token_hex() 生成)。
"""
self.secret_key = secret_key
# URL 安全序列化器:对数据进行签名
self.url_serializer = URLSafeSerializer(secret_key)
# 时间戳序列化器:带过期时间的签名
self.timed_serializer = TimedSerializer(secret_key)

def sign_data(self, data: Any) -> str:
"""
对数据进行签名,防止篡改。
签名后的数据以 base64 编码的字符串形式返回。
"""
# URLSafeSerializer 自动处理 JSON 序列化和签名
signed = self.url_serializer.dumps(data)
print(f"数据已签名(长度:{len(signed)} 字符)")
return signed

def verify_signed_data(self, signed_data: str) -> Optional[Any]:
"""
验证签名并还原数据。
如果签名无效(数据被篡改),抛出 BadSignature 异常。
"""
try:
data = self.url_serializer.loads(signed_data)
print("签名验证通过,数据完整未被篡改")
return data
except BadSignature:
print("警告:数据签名无效,数据可能被篡改!")
return None

def sign_with_expiry(self, data: Any,
max_age_seconds: int = 3600) -> str:
"""
带过期时间的签名。
适用于需要限时使用的令牌(如密码重置链接)。
"""
signed = self.timed_serializer.dumps(data)
print(f"数据已签名(有效期 {max_age_seconds} 秒)")
return signed

def verify_with_expiry(self, signed_data: str,
max_age_seconds: int = 3600) -> Optional[Any]:
"""
验证签名并检查是否过期。
"""
try:
data = self.timed_serializer.loads(
signed_data, max_age=max_age_seconds
)
print("签名验证通过,且在有效期内")
return data
except BadSignature:
print("警告:签名无效或已过期!")
return None


class SignEncryptSerializer:
"""
签名 + 加密的序列化方案 —— 自己实现 HMAC 签名。
确保数据的完整性和机密性。
"""

def __init__(self, signing_key: str, encryption_key: Optional[str] = None):
self.signing_key = signing_key.encode()
self.encryption_key = encryption_key

def serialize_secure(self, data: Any) -> bytes:
"""
安全序列化:先编码 -> 再加密(可选)-> 最后签名。
先签名后加密可以防止签名碰撞攻击。
"""
# 第一步:序列化为 JSON
payload = json.dumps(data, ensure_ascii=False).encode()

# 第二步:计算 HMAC 签名
signature = hmac.new(
self.signing_key, payload, hashlib.sha256
).hexdigest()

# 第三步:组合签名和数据
result = json.dumps({
"signature": signature,
"payload": payload.decode()
}).encode()
return result

def deserialize_secure(self, data: bytes) -> Optional[Any]:
"""
安全反序列化:先验证签名 -> 再解密(可选)-> 最后解码。
"""
try:
# 第一步:解析外层 JSON
wrapper = json.loads(data.decode())

# 第二步:提取签名和载荷
expected_sig = wrapper["signature"]
payload_str = wrapper["payload"]
payload = payload_str.encode()

# 第三步:验证签名
actual_sig = hmac.new(
self.signing_key, payload, hashlib.sha256
).hexdigest()

# 使用 hmac.compare_digest 防止时序攻击
if not hmac.compare_digest(expected_sig, actual_sig):
print("警告:HMAC 签名不匹配,数据被篡改!")
return None

# 第四步:解码 JSON
return json.loads(payload)

except (json.JSONDecodeError, KeyError, Exception) as e:
print(f"反序列化失败:{e}")
return None


# ========== 第四部分:Notrust 反序列化模式 ==========

class NotrustDeserializer:
"""
"不信任"反序列化模式 —— 处理不可信数据时的安全最佳实践。
即使使用安全的格式,也要验证数据的结构和内容。
"""

@staticmethod
def safe_deserialize(data: str,
expected_schema: Optional[Dict] = None) -> Optional[Dict]:
"""
安全的反序列化:验证类型、结构和内容后再使用。
"""
try:
# 第一步:使用安全格式解析
obj = json.loads(data)

# 第二步:验证类型
if not isinstance(obj, dict):
print("错误:期望 JSON 对象,得到其他类型")
return None

# 第三步:验证必填字段
if expected_schema:
for field, field_type in expected_schema.items():
if field not in obj:
print(f"错误:缺少必填字段 '{field}'")
return None
if not isinstance(obj[field], field_type):
print(f"错误:字段 '{field}' 类型错误")
return None

# 第四步:验证值范围
for key, value in obj.items():
if isinstance(value, str) and len(value) > 1000:
print(f"错误:字段 '{key}' 超过最大长度")
return None
if isinstance(value, (int, float)) and abs(value) > 1e9:
print(f"错误:字段 '{key}' 数值超出范围")
return None

print("反序列化验证通过")
return obj

except json.JSONDecodeError:
print("错误:无效的 JSON 格式")
return None


# ========== 第五部分:演示 ==========

def demo_secure_serialization():
"""
演示各种序列化的安全特性和风险。
"""
print("=== 安全序列化演示 ===\n")

# 1. Pickle 风险演示
print("--- Pickle 反序列化风险 ---")
malicious = UnsafePickleDemo.create_malicious_payload()
print("注意:在生产代码中绝不要对不可信数据调用 pickle.loads()")

# 2. 安全格式比较
print("\n--- 序列化格式比较 ---")
test_data = {
"user": "alice",
"scores": [95, 87, 92],
"metadata": {"version": "1.0", "timestamp": 1234567890}
}
SafeSerialization.compare_formats(test_data)

# 3. itsdangerous 签名
print("\n--- 带签名的安全序列化 ---")
serializer = SecureSerializer(
secret_key="my-secret-key-change-in-production"
)
user_data = {"user_id": 123, "role": "admin"}
signed = serializer.sign_data(user_data)
verified = serializer.verify_signed_data(signed)
print(f"还原数据:{verified}")

# 4. 篡改检测演示
print("\n--- 篡改检测演示 ---")
tampered = signed[:-1] + ("X" if signed[-1] != "X" else signed[-2])
result = serializer.verify_signed_data(tampered)
print(f"篡改数据还原结果:{result}")

# 5. Schema 验证
print("\n--- Notrust 反序列化验证 ---")
safe_data = '{"name": "Alice", "age": 30}'
schema = {"name": str, "age": int}
result = NotrustDeserializer.safe_deserialize(safe_data, schema)
print(f"验证通过的数据:{result}")

# 6. 序列化安全建议
print("\n--- 序列化安全最佳实践 ---")
print("""
1. 永远不要对不可信数据使用 pickle.loads()
2. 优先使用 JSON 序列化(纯数据,不执行代码)
3. 对序列化数据进行 HMAC 签名以防止篡改
4. itsdangerous 提供了简洁的签名+序列化方案
5. 反序列化后验证数据结构和值范围
6. 敏感数据在序列化前应加密
""")


if __name__ == "__main__":
demo_secure_serialization()

http://www.gsyq.cn/news/1433301.html

相关文章:

  • 5分钟极简方案:在Mac上解锁QQ音乐加密文件
  • ESP32-S3 + LVGL 8.3实战:如何为你的3.5寸SPI屏(ILI9488)定制UI并优化性能
  • 多智能体AI系统在风险投资决策中的架构设计与工程实践
  • Python安全会话管理
  • 喜讯!奋飞咨询春明老师辅导客户斩获Ecovadis铜牌! - 奋飞咨询ecovadis
  • AI Wrapper实战指南:从API调用到构建可持续AI产品的核心挑战
  • AI与区块链融合:Obizcoin如何重塑创业协作与信任机制
  • 2026年淮安市CPPM报名十大核心问题全流程答疑 - 众智商学院课程中心
  • 机器学习在游戏难度动态平衡中的应用与策略层设计
  • 盘点!8款热门CRM平台全维度评测,综合实力大比拼 - Joyky
  • 别再只会用预设了!用Unity粒子系统手搓一个带拖尾和二次爆炸的烟花(附完整项目文件)
  • C语言深度解析:从内存管理到系统编程的实战指南
  • 别再到处找了!一份SMIC 0.18um工艺库文件详解,带你搞懂每个文件夹是干嘛的
  • STM32H723ZGT6网络通信避坑实录:CubeMX配置LWIP+FreeRTOS,就差这行PHY复位代码
  • 期权策略分析——希腊字母与盈亏图Excel绘制
  • NS-USBloader:一站式解决Switch游戏安装与系统引导的三大痛点
  • 不止于游戏:用Unity的Animation系统模拟智能家居‘自动门’(从建模到触发逻辑全流程)
  • DownKyi终极指南:3步掌握B站视频批量下载与管理的完整方案
  • 告别文献混乱:手把手教你用Zotero打造个人知识库(含标签、关联与RSS订阅配置)
  • Windows/Mac通用!用Anaconda+PyTorch搞定CodeFormer环境搭建,附国内镜像加速
  • 告别连接烦恼:手把手教你用SecureCRT 8.5搞定服务器远程管理(附激活避坑指南)
  • MATRIX:下一代去中心化预言机与可验证计算协议深度解析
  • 抖音Scheme抓包实战:从Fiddler到反编译,手把手教你获取最新跳转链接
  • 量子计算与高性能计算融合:架构解析与编程实践
  • 从50MHz到随心所欲:我的QuartusII+FPGA数控分频器踩坑实录(附完整代码与仿真)
  • 保姆级避坑指南:用树莓派Zero 2 W搭建智能花盆,从传感器接线到Python代码调试全流程
  • 避开这些坑!STM32F407 SD卡擦除与文件系统(FATFS)移植关键步骤详解
  • 数据科学家必知:伦理AI工具库实战指南与工作流整合
  • 从调试工具到系统思维:工程师构建终身调试能力的实战指南
  • Modelsim 2024配置Vivado IP仿真库全记录:从库编译到工程搭建的完整避坑手册