Change Data Capture(CDC)

变化数据捕捉(Change Data Capture)

ProtonBase 提供了完整的逻辑复制(Logical Replication)功能,支持通过 CDC(Change Data Capture)机制捕获数据变更。本文档将重点介绍如何使用 publication 和 replication slot 这些核心概念来实现高效的数据变更捕获。

核心概念

  • Publication (发布)

Publication 定义了要从数据库中复制的表及其变更类型(INSERT, UPDATE, DELETE)。您可以创建针对特定表的 publication,也可以创建包含所有表的 publication。

  • Replication Slot (复制槽)

Replication slot 是服务器端的机制,用于跟踪消费者的复制进度,确保在消费者确认接收变更前,WAL(Write-Ahead Log)日志不会被删除。这为消费者提供了"至少一次"的交付保证。

其他概念

  • Logical Decoding(逻辑编码)

数据库中实时写入数据时,会写入内部格式的 Write Ahead Log(WAL),但是这个格式,对于用户来说是并不理解的,逻辑编码的作用是将 WAL,编码成客户端可以理解的格式。

  • Decoding Plugins(输出插件)

实现数据流的输出格式是由输出插件决定的,目前 ProtonBase 支持两种输出插件分别是 test_decodingpgoutput。test_decoding 的输出格式是类似于 SQL 的文本格式,仅用于测试验证。pgoutput 是性能更好的遵循 postgresql logical replication protocol 的二进制格式,但需要客户端去做 decode,Flink CDC 使用的是 pgoutput 格式.

更多编码协议内容,可参考Logical Replication Message Formats (opens in a new tab)

前置条件

首先,确保 ProtonBase 已配置为支持逻辑复制:需要调整WAL_LEVELlogical级别(logical级别高于默认的replica级别),请注意,更改WAL_LEVEL需要重新建立连接才能生效。

-- 检查当前配置
SHOW wal_level;
 
-- 如果未设置为 logical,需要修改配置并重建连接
ALTER DATABASE <db_name> SET WAL_LEVEL TO LOGICAL;

Publication

Publication 是 ProtonBase中实现表数据变更订阅的核心机制。通过 Publication,您可以定义一个或多个表的变更集合,然后由 Subscription 进行订阅,实现数据的实时同步和分发。

创建 Publication

创建 Publication

-- 创建一个包含特定表变更的 Publication
CREATE PUBLICATION my_publication FOR TABLE table1, table2;
 
-- 创建一个包含所有表变更的 Publication
CREATE PUBLICATION all_tables_pub FOR ALL TABLES;
 
-- 通过父表发布变更
ALTER PUBLICATION partitioned_table_pub SET (publish_via_partition_root = true);

创建指定操作类型的 Publication

-- 只发布 INSERT 操作
CREATE PUBLICATION insert_only_pub FOR TABLE table1 WITH (publish = 'insert');
 
-- 发布 INSERT 和 UPDATE 操作
CREATE PUBLICATION insert_update_pub FOR TABLE table1, table2 WITH (publish = 'insert, update');
 
-- 发布所有操作 (默认)
CREATE PUBLICATION all_ops_pub FOR TABLE table1 WITH (publish = 'insert, update, delete, truncate');

修改 Publication

添加表到 Publication

ALTER PUBLICATION my_publication ADD TABLE table3, table4;

从 Publication 移除表

ALTER PUBLICATION my_publication DROP TABLE table1;

修改发布的操作类型

ALTER PUBLICATION my_publication SET (publish = 'insert, update');

查看 Publication

列出所有 Publication

SELECT * FROM pg_publication;

查看特定 Publication 的详细信息

SELECT * FROM pg_publication WHERE pubname = 'my_publication';

删除 Publication

可以在 publication 创建的 database 内对 publication 进行删除。

-- 删除 publication
DROP PUBLICATION my_publication;
 
-- 检查依赖关系后再删除
-- 查看哪些 slot 使用了该 publication
SELECT slot_name FROM pg_replication_slots WHERE active_pid IS NOT NULL AND confirmed_flush_lsn IS NOT NULL;

Replication Slot

创建 Slot

-- 创建逻辑复制槽,noexport_snapshot参数为 true,暂不支持导出快照,空间使用更节省
SELECT * FROM pg_create_logical_replication_slot('my_slot', 'pgoutput', false, true);

更新 Slot

在某些情况下,您可能需要手动更新 replication slot 的位置(LSN):

-- 将 slot 推进到当前 WAL 位置
SELECT pg_replication_slot_advance('my_slot', pg_current_wal_lsn());

查看 Slot

-- 查看所有 replication slot
SELECT * FROM pg_replication_slots;
 
-- 查看 replication slot 的滞后情况
SELECT slot_name, confirmed_flush_lsn, pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) FROM pg_replication_slots;

删除 Slot

可以在 replication slot 创建的 database 内对 slot 进行删除。

删除前建议先检查 slot 的状态

-- 检查 slot 是否活跃
SELECT active FROM pg_replication_slots WHERE slot_name = 'my_slot';
 
-- 检查是否有消费者连接
SELECT
    r.pid,
    r.usename,
    r.application_name,
    r.client_addr,
    s.slot_name,
    r.state,
    r.sent_lsn,
    r.flush_lsn,
    s.confirmed_flush_lsn
FROM pg_stat_replication r
JOIN pg_replication_slots s ON r.pid = s.active_pid
WHERE s.slot_name = 'my_slot';
-- 安全删除 replication slot
SELECT pg_drop_replication_slot('my_slot');

Apache Flink 消费 CDC

Apache Flink 可以使用官方的postgres-cdc connector消费 CDC 事件,消费指定的 publication,并在 Flink 执行 checkpoint 时,在 slot 中确认更新 LSN,具体使用参考 Apache Flink 集成

有关更多postgres-cdc connector的使用参数,参考Postgres CDC Connector (opens in a new tab)

使用示例

创建测试表:

-- 创建示例表
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(50) NOT NULL,
    email VARCHAR(100),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
 
CREATE TABLE orders (
    id SERIAL PRIMARY KEY,
    user_id INTEGER REFERENCES users(id),
    amount DECIMAL(10,2),
    status VARCHAR(20),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

创建 Publication:

-- 或者只包含特定表
CREATE PUBLICATION flink_cdc_pub FOR TABLE users, orders;

创建 Replication Slot:

-- 为 Flink 创建专用的 replication slot
SELECT * FROM pg_create_logical_replication_slot('flink_cdc_slot', 'pgoutput', false, true);
 
-- 验证 slot 创建
SELECT slot_name, plugin, slot_type, active FROM pg_replication_slots WHERE slot_name = 'flink_cdc_slot';

Flink 消费 CDC 相关作业配置,请参参考 Apache Flink 集成

监控 Slot 状态:

-- 查看 slot 状态和滞后情况
SELECT
    slot_name,
    active,
    pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS lag_pretty,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS restart_lag_pretty
FROM pg_replication_slots
WHERE slot_name = 'flink_cdc_slot';

处理 Slot 停滞,如果发现 slot 停滞(active=false 但未被使用):

-- 尝试重新激活
SELECT pg_replication_slot_advance('flink_cdc_slot', pg_current_wal_lsn());
 
-- 如果确认不再需要,可以删除重建
SELECT pg_drop_replication_slot('flink_cdc_slot');
SELECT * FROM pg_create_logical_replication_slot('flink_cdc_slot', 'pgoutput', false, true);

当作业完成或需要重新配置时,按顺序清理资源:

  • 停止 Flink 作业,确保 Flink 作业已完全停止,不再使用 CDC 连接。

  • 删除 Replication Slot

-- 安全删除 slot
SELECT pg_drop_replication_slot('flink_cdc_slot');
 
-- 验证删除
SELECT * FROM pg_replication_slots WHERE slot_name = 'flink_cdc_slot';
  • 删除 Publication
-- 删除 publication
DROP PUBLICATION flink_cdc_pub;
 
-- 验证删除
SELECT * FROM pg_publication WHERE pubname = 'flink_cdc_pub';

pg_logical函数

可以通过 pg_logical 相关函数和 test_decoding 插件在数据库内进行 CDC 事件的验证。

具体如下:

postgres=# -- 使用输出插件'test_decoding'创建一个名为'regression_slot'的槽
postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
    slot_name    |         lsn
-----------------+---------------------
 regression_slot | 1698640579589000000
(1 row)
 
postgres=# SELECT slot_name, plugin, slot_type, database, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
    slot_name    |    plugin     | slot_type | database | active |     restart_lsn     | confirmed_flush_lsn
-----------------+---------------+-----------+----------+--------+---------------------+---------------------
 regression_slot | test_decoding | logical   | postgres | f      | 1698640579588460626 | 0
(1 row)
 
postgres=# -- 目前还看不到更改
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 lsn | xid | data
-----+-----+------
(0 rows)
 
postgres=# CREATE TABLE data(id serial primary key, data text);
CREATE TABLE
 
postgres=# -- DDL 没有被复制,因此你将看到的东西只有事务,跳过空事务
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 lsn | xid | data
-----+-----+------
(0 rows)
 
 
postgres=# BEGIN;
postgres=# INSERT INTO data(data) VALUES('1');
postgres=# INSERT INTO data(data) VALUES('2');
postgres=# COMMIT;
 
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
        lsn        |     xid     |                          data
-------------------+-------------+---------------------------------------------------------
 1792C8F7/7B0A3F65 | -1611778953 | BEGIN
 1792C8F7/7B0A3F65 | -1611778953 | table public.data: INSERT: id[integer]:1 data[text]:'1'
 1792C8F7/7B0A3F65 | -1611778953 | table public.data: INSERT: id[integer]:2 data[text]:'2'
 1792C8F7/7B0A3F65 | -1611778953 | COMMIT
(4 rows)
 
postgres=# INSERT INTO data(data) VALUES('3');
 
postgres=# -- 你也可以不消费更改而在更改流中先看一看
postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
        lsn        |    xid     |                          data
-------------------+------------+---------------------------------------------------------
 1792C909/DFC392DA | 2109858110 | BEGIN
 1792C909/DFC392DA | 2109858110 | table public.data: INSERT: id[integer]:3 data[text]:'3'
 1792C909/DFC392DA | 2109858110 | COMMIT
(3 rows)
 
postgres=# -- 接下来对 pg_logical_slot_peek_changes() 的调用再次返回相同的更改
postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
        lsn        |    xid     |                          data
-------------------+------------+---------------------------------------------------------
 1792C909/DFC392DA | 2109858110 | BEGIN
 1792C909/DFC392DA | 2109858110 | table public.data: INSERT: id[integer]:3 data[text]:'3'
 1792C909/DFC392DA | 2109858110 | COMMIT
(3 rows)
 
postgres=# -- 当不再需要一个槽后记住销毁它以停止消耗服务器资源:
postgres=# SELECT pg_drop_replication_slot('regression_slot');
 pg_drop_replication_slot
-----------------------
 
(1 row)

流复制协议

除了使用上述例子中的函数进行流复制数据导出的测试以外,更高效的方式是以单独流复制协议的方式。

一般不同的语言的 postgresql jdbc driver 都有相应的封装,也可以用过普通 connection 的方式来连接,自己来处理协议,协议内容可参考:Streaming Replication Protocol (opens in a new tab)