第五难:MongoDB到PostgreSQL的类型转换
问题
MongoDB和PostgreSQL的数据类型完全不兼容:
| MongoDB | PostgreSQL | 问题 |
|---|---|---|
| ObjectId | 无对应类型 | 主键转换 |
| BSON对象 | JSONB | 嵌套结构 |
| 数组 | Array | 类型声明 |
解决方案
在配置表的扩展字段定义类型映射:
{ "mongoCollection": "user_profile", "pgTable": "user_profile", "fieldMapping": { "_id": "id", "preferences": "preferences", "tags": "tags" }, "typeMapping": { "_id": "OBJECTID_TO_VARCHAR", "preferences": "JSONB", "tags": "INTEGER_ARRAY" } }类型转换代码:
private String convertValue(Object value, String typeRule) { if (value == null) return "NULL"; switch (typeRule) { case "JSONB": // {name: "test"} → '{"name":"test"}'::jsonb String json = toJsonString(value); return "'" + escapeSql(json) + "'::jsonb"; case "INTEGER_ARRAY": // [1,2,3] → ARRAY[1,2,3]::INTEGER[] List<Integer> list = (List) value; return "ARRAY[" + String.join(",", list) + "]::INTEGER[]"; case "OBJECTID_TO_VARCHAR": // ObjectId("507f...") → '507f...' return "'" + value.toString() + "'"; default: return convertDefault(value); } }复盘:一个月完成迁移的关键
整体架构:塔外-塔内双链路
┌──────────── 塔外系统 (Outer) ────────────┐
│ │
│ ① API触发同步 │
│ ② 查询配置表 → 拆分公司级/店铺级配置 │
│ ③ 构建MQ消息 → 投递RocketMQ │
│ ④ MQ Consumer │
│ ├─ SHOW CREATE TABLE 获取表结构 │
│ ├─ 流式读取源数据库 │
│ ├─ 生成 DELETE + INSERT SQL │
│ ├─ 分号替换为特殊符号 │
│ └─ 上传到 OSS │
└───────────────────────────────────────────┘
│
│ OSS中转
↓
┌──────────── 塔内系统 (Inner) ────────────┐
│ │
│ ⑤ 定时任务 / 手动触发 │
│ ⑥ 扫描OSS目录 → 获取待处理SQL文件列表 │
│ ⑦ 流式下载SQL文件 → 逐行读取 │
│ ├─ 特殊符号还原为分号 │
│ ├─ 批量执行(1000条/批) │
│ └─ setAutoCommit(true) 防止事务过大 │
│ ⑧ 执行成功 → 立即删除OSS文件 │
└───────────────────────────────────────────┘
核心亮点总结
| 技术点 | 传统方案 | 本方案 | 效果 |
|---|---|---|---|
| 表结构获取 | 手写100个Mapper | SHOW CREATE TABLE动态解析 | 零硬编码,支持任意表 |
| SQL分隔符 | 用;判断结束 | 特殊符号;#END# | 支持数据含分号、换行符 |
| 同步策略 | 全量同步or硬编码 | 配置表+占位符 | 灵活配置,4种策略 |
| 大数据量处理 | 一次性加载(OOM) | 流式读取+临时文件 | 常量级内存,50W+行稳定 |
| 扩展性 | 新增表需改代码 | 只需加配置 | 秒级上线新表同步 |
做对的3件事
1. 从工具中偷师学艺
Navicat的导入/导出功能启发了整体方案,SHOW CREATE TABLE是突破口
2. 把复杂逻辑放在塔外
塔内只负责执行SQL,逻辑简单;塔外可以随意调试、优化
3. 配置驱动,而非代码驱动
新增表只需加配置,不改代码。后续维护成本趋近于0
最终效果
| 指标 | 数据 |
|---|---|
| 迁移表数量 | 200+张(含后续新增) |
| 最大单表数据 | 1000+万行 |
| 首次全量同步 | 10-30分钟 |
| 日常增量同步 | 公司级表约30秒,店铺级表约1分钟 |
| 内存占用 | 稳定在200MB左右 |
| OOM次数 | 0(连续运行3个月) |
| 工期 | 25天(提前5天完成) |
写在最后
以上便是我这次迁移实战的全部分享。绝非标准答案,但希望能为你带来一丝灵感。
这次迁移让我深刻体会到:
好的架构不是设计出来的,而是从实际问题中"偷"出来的。
当你面对技术难题时,不妨问自己:
- 有没有现成的工具已经解决了类似问题?不要重复造轮子!!(Navicat)
- 数据库/框架本身提供了什么能力?(SHOW CREATE TABLE、setFetchSize)
- 能否用配置代替硬编码?(配置表+占位符)
感谢那些"默默扛下所有"的技术细节
SHOW CREATE TABLE—— 你扛下了表结构解析的苦活stmt.setFetchSize(Integer.MIN_VALUE)—— 你默默守护了内存安全;#END#—— 你可能是全网最诡异但最实用的分隔符- RocketMQ的TAG过滤—— 你让消息路由变得优雅
- CompletableFuture—— 你让塔内并发处理成为可能
System.lineSeparator()—— 你让SQL文件格式清晰明了
