当前位置: 首页 > news >正文

PHP数据同步与CDC变更数据捕获

PHP数据同步与CDC变更数据捕获

CDC(Change Data Capture)是一种跟踪数据库变更的技术。PHP可以通过多种方式实现CDC和数据同步。今天说说PHP中CDC的实现方案。

CDC的核心是捕获数据的插入、更新和删除操作,并将变更传播到其他系统。

```php
class ChangeTracker
{
private PDO $pdo;
private string $table;

public function __construct(PDO $pdo, string $table)
{
$this->pdo = $pdo;
$this->table = $table;
$this->initTrackingTable();
}

private function initTrackingTable(): void
{
$this->pdo->exec("
CREATE TABLE IF NOT EXISTS cdc_log (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
table_name VARCHAR(100) NOT NULL,
operation ENUM('insert', 'update', 'delete') NOT NULL,
entity_id INT NOT NULL,
old_data JSON,
new_data JSON,
changed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
processed BOOLEAN DEFAULT FALSE,
INDEX idx_unprocessed (processed, changed_at)
)
");
}

public function logInsert(int $entityId, array $data): void
{
$stmt = $this->pdo->prepare("
INSERT INTO cdc_log (table_name, operation, entity_id, new_data)
VALUES (?, 'insert', ?, ?)
");
$stmt->execute([$this->table, $entityId, json_encode($data, JSON_UNESCAPED_UNICODE)]);
}

public function logUpdate(int $entityId, array $oldData, array $newData): void
{
$stmt = $this->pdo->prepare("
INSERT INTO cdc_log (table_name, operation, entity_id, old_data, new_data)
VALUES (?, 'update', ?, ?, ?)
");
$stmt->execute([
$this->table,
$entityId,
json_encode($oldData, JSON_UNESCAPED_UNICODE),
json_encode($newData, JSON_UNESCAPED_UNICODE),
]);
}

public function logDelete(int $entityId, array $oldData): void
{
$stmt = $this->pdo->prepare("
INSERT INTO cdc_log (table_name, operation, entity_id, old_data)
VALUES (?, 'delete', ?, ?)
");
$stmt->execute([$this->table, $entityId, json_encode($oldData, JSON_UNESCAPED_UNICODE)]);
}

public function getUnprocessed(int $limit = 100): array
{
$stmt = $this->pdo->prepare("
SELECT * FROM cdc_log
WHERE processed = FALSE
ORDER BY id ASC
LIMIT ?
");
$stmt->execute([$limit]);
return $stmt->fetchAll();
}

public function markProcessed(int $id): void
{
$this->pdo->prepare("UPDATE cdc_log SET processed = TRUE WHERE id = ?")->execute([$id]);
}
}

class UserServiceWithCDC
{
private PDO $pdo;
private ChangeTracker $tracker;

public function __construct(PDO $pdo, ChangeTracker $tracker)
{
$this->pdo = $pdo;
$this->tracker = $tracker;
}

public function createUser(string $name, string $email): int
{
$stmt = $this->pdo->prepare("INSERT INTO users (name, email) VALUES (?, ?)");
$stmt->execute([$name, $email]);
$id = (int)$this->pdo->lastInsertId();

$this->tracker->logInsert($id, ['name' => $name, 'email' => $email]);
return $id;
}

public function updateUser(int $id, array $data): void
{
$oldStmt = $this->pdo->prepare("SELECT * FROM users WHERE id = ?");
$oldStmt->execute([$id]);
$oldData = $oldStmt->fetch(PDO::FETCH_ASSOC);

$sets = [];
$params = [];
foreach ($data as $key => $value) {
$sets[] = "{$key} = ?";
$params[] = $value;
}
$params[] = $id;

$sql = "UPDATE users SET " . implode(', ', $sets) . " WHERE id = ?";
$this->pdo->prepare($sql)->execute($params);

$this->tracker->logUpdate($id, $oldData, $data);
}

public function deleteUser(int $id): void
{
$stmt = $this->pdo->prepare("SELECT * FROM users WHERE id = ?");
$stmt->execute([$id]);
$oldData = $stmt->fetch(PDO::FETCH_ASSOC);

$this->pdo->prepare("DELETE FROM users WHERE id = ?")->execute([$id]);
$this->tracker->logDelete($id, $oldData);
}
}
?>

CDC消费者将变更同步到其他系统:

```php
class CdcConsumer
{
private ChangeTracker $tracker;
private array $handlers = [];

public function __construct(ChangeTracker $tracker)
{
$this->tracker = $tracker;
}

public function registerHandler(string $operation, callable $handler): void
{
$this->handlers[$operation][] = $handler;
}

public function process(): int
{
$processed = 0;
$changes = $this->tracker->getUnprocessed(50);

foreach ($changes as $change) {
try {
$handlers = $this->handlers[$change['operation']] ?? [];
foreach ($handlers as $handler) {
$handler($change);
}
$this->tracker->markProcessed($change['id']);
$processed++;
} catch (\Exception $e) {
error_log("CDC处理失败: {$e->getMessage()}");
}
}

return $processed;
}
}
?>

CDC是数据同步和事件驱动架构的基础技术。通过捕获数据库变更日志,可以将数据同步到缓存、搜索引擎或数据仓库。CDC的关键是变更的顺序性和幂等性,确保数据最终一致。PHP实现的CDC适合轻量级的同步场景,大规模场景建议使用Debezium等专业工具。

http://www.gsyq.cn/news/1448775.html

相关文章:

  • 2026新疆建筑资质/压力管道资质代办机构推荐排行 权威专业榜单 - 极欧测评
  • 山东喷涂工艺品牌2026最新排行:5家企业核心能力客观对比 - 奔跑123
  • ES2020七大新特性实战:构建单位价格计算器
  • 从AlphaZero到区块链:指数技术浪潮下的信任构建与伦理挑战
  • 别再炸机了!固定翼无人机重心调试保姆级指南(从原理到实操)
  • AI语音合成将如何重塑内容产业?:7大颠覆性趋势+3类已验证商业场景(附2025技术成熟度曲线)
  • # 总氮水质在线自动监测仪源头厂家推荐榜:2026国产技术突围与选型实战全解析 - 仪表品牌榜
  • 别再只会用Google了!网络安全工程师的“神器”FOFA,从语法到实战一次讲透
  • AI工具“免费”背后的精密算计:从Rate Limit到数据训练权,6大隐性条款如何 silently lock 你的生产力
  • 不只是libxcb-cursor:盘点Qt在Linux桌面(X11/Wayland)下那些容易缺失的图形库
  • 新鲜出炉!2026新疆建筑资质/压力管道资质代办机构推荐排行 专业评测榜 - 极欧测评
  • 2026 值得信赖的网站建设公司 TOP10 榜单,专业网站制作公司盘点 - 博客湾
  • 广州餐厅装修设计哪家好?10家公司实测对比 - 博客湾
  • 如何一键永久备份微信聊天记录:WeChatMsg完整解决方案
  • 2026东莞专业合同纠纷律所测评推荐前十:专业处理大额商事与复杂合同争议 - 速递信息
  • 垂直行业全覆盖|实在Agent2026 商业案例库 + 降本增效实操指南
  • 别再傻傻复制文件了!用Linux软链接管理项目依赖,效率翻倍(附ln命令详解)
  • 大连中志钢结构工程:金州专业的钢结构加工公司有哪些 - LYL仔仔
  • 闲置名表放着也是落灰!同城快速回款,变现高效又省心 - 合扬奢侈品交易中心
  • 如何快速配置猫抓扩展:5个简单步骤的完整指南
  • 从ParseArgs宏看C++命令行解析:手搓一个stressapptest同款参数解析器
  • 昇腾开发的“基石”——CANN-Infra基础设施仓库架构原理与实战指南
  • BOTW存档编辑器终极指南:5分钟掌握武器耐久与资源修改
  • AI时代必备:小白程序员如何利用大模型抢占职场先机?收藏这份进阶指南!
  • 从零开始:如何为qBittorrent编写自定义搜索插件
  • 自动化浪潮下计算机工程师的进化:从代码工人到系统架构师
  • 初识AI Agent:小白程序员必备的六大核心模块解析(收藏版)
  • 从零实现带噪梯度与空洞卷积的反向传播:NumPy手写深度学习核心算法
  • STM32F407基于USART1的DMA双工通信方案,含环形缓冲队列防丢包
  • Tessy新手避坑指南:从零搭建单元测试工程(含PDBX文件迁移配置)