变化数据捕捉(Change Data Capture)
ProtonBase 提供了完整的逻辑复制(Logical Replication)功能,支持通过 CDC(Change Data Capture)机制捕获数据变更。本文档将重点介绍如何使用 publication 和 replication slot 这些核心概念来实现高效的数据变更捕获。
核心概念
- Publication (发布)
Publication 定义了要从数据库中复制的表及其变更类型(INSERT, UPDATE, DELETE)。您可以创建针对特定表的 publication,也可以创建包含所有表的 publication。
- Replication Slot (复制槽)
Replication slot 是服务器端的机制,用于跟踪消费者的复制进度,确保在消费者确认接收变更前,WAL(Write-Ahead Log)日志不会被删除。这为消费者提供了"至少一次"的交付保证。
其他概念
- Logical Decoding(逻辑编码)
数据库中实时写入数据时,会写入内部格式的 Write Ahead Log(WAL),但是这个格式,对于用户来说是并不理解的,逻辑编码的作用是将 WAL,编码成客户端可以理解的格式。
- Decoding Plugins(输出插件)
实现数据流的输出格式是由输出插件决定的,目前 ProtonBase 支持两种输出插件分别是 test_decoding 和 pgoutput。test_decoding 的输出格式是类似于 SQL 的文本格式,仅用于测试验证。pgoutput 是性能更好的遵循 postgresql logical replication protocol 的二进制格式,但需要客户端去做 decode,Flink CDC 使用的是 pgoutput 格式.
更多编码协议内容,可参考Logical Replication Message Formats (opens in a new tab)
前置条件
首先,确保 ProtonBase 已配置为支持逻辑复制:需要调整WAL_LEVEL
到logical级别(logical级别高于默认的replica级别),请注意,更改WAL_LEVEL
需要重新建立连接才能生效。
-- 检查当前配置
SHOW wal_level;
-- 如果未设置为 logical,需要修改配置并重建连接
ALTER DATABASE <db_name> SET WAL_LEVEL TO LOGICAL;
Publication
Publication 是 ProtonBase中实现表数据变更订阅的核心机制。通过 Publication,您可以定义一个或多个表的变更集合,然后由 Subscription 进行订阅,实现数据的实时同步和分发。
创建 Publication
创建 Publication
-- 创建一个包含特定表变更的 Publication
CREATE PUBLICATION my_publication FOR TABLE table1, table2;
-- 创建一个包含所有表变更的 Publication
CREATE PUBLICATION all_tables_pub FOR ALL TABLES;
-- 通过父表发布变更
ALTER PUBLICATION partitioned_table_pub SET (publish_via_partition_root = true);
创建指定操作类型的 Publication
-- 只发布 INSERT 操作
CREATE PUBLICATION insert_only_pub FOR TABLE table1 WITH (publish = 'insert');
-- 发布 INSERT 和 UPDATE 操作
CREATE PUBLICATION insert_update_pub FOR TABLE table1, table2 WITH (publish = 'insert, update');
-- 发布所有操作 (默认)
CREATE PUBLICATION all_ops_pub FOR TABLE table1 WITH (publish = 'insert, update, delete, truncate');
修改 Publication
添加表到 Publication
ALTER PUBLICATION my_publication ADD TABLE table3, table4;
从 Publication 移除表
ALTER PUBLICATION my_publication DROP TABLE table1;
修改发布的操作类型
ALTER PUBLICATION my_publication SET (publish = 'insert, update');
查看 Publication
列出所有 Publication
SELECT * FROM pg_publication;
查看特定 Publication 的详细信息
SELECT * FROM pg_publication WHERE pubname = 'my_publication';
删除 Publication
可以在 publication 创建的 database 内对 publication 进行删除。
-- 删除 publication
DROP PUBLICATION my_publication;
-- 检查依赖关系后再删除
-- 查看哪些 slot 使用了该 publication
SELECT slot_name FROM pg_replication_slots WHERE active_pid IS NOT NULL AND confirmed_flush_lsn IS NOT NULL;
Replication Slot
创建 Slot
-- 创建逻辑复制槽,noexport_snapshot参数为 true,暂不支持导出快照,空间使用更节省
SELECT * FROM pg_create_logical_replication_slot('my_slot', 'pgoutput', false, true);
更新 Slot
在某些情况下,您可能需要手动更新 replication slot 的位置(LSN):
-- 将 slot 推进到当前 WAL 位置
SELECT pg_replication_slot_advance('my_slot', pg_current_wal_lsn());
查看 Slot
-- 查看所有 replication slot
SELECT * FROM pg_replication_slots;
-- 查看 replication slot 的滞后情况
SELECT slot_name, confirmed_flush_lsn, pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) FROM pg_replication_slots;
删除 Slot
可以在 replication slot 创建的 database 内对 slot 进行删除。
删除前建议先检查 slot 的状态
-- 检查 slot 是否活跃
SELECT active FROM pg_replication_slots WHERE slot_name = 'my_slot';
-- 检查是否有消费者连接
SELECT
r.pid,
r.usename,
r.application_name,
r.client_addr,
s.slot_name,
r.state,
r.sent_lsn,
r.flush_lsn,
s.confirmed_flush_lsn
FROM pg_stat_replication r
JOIN pg_replication_slots s ON r.pid = s.active_pid
WHERE s.slot_name = 'my_slot';
-- 安全删除 replication slot
SELECT pg_drop_replication_slot('my_slot');
Apache Flink 消费 CDC
Apache Flink 可以使用官方的postgres-cdc connector消费 CDC 事件,消费指定的 publication,并在 Flink 执行 checkpoint 时,在 slot 中确认更新 LSN,具体使用参考 Apache Flink 集成。
有关更多postgres-cdc connector的使用参数,参考Postgres CDC Connector (opens in a new tab)。
使用示例
创建测试表:
-- 创建示例表
CREATE TABLE users (
id SERIAL PRIMARY KEY,
username VARCHAR(50) NOT NULL,
email VARCHAR(100),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE orders (
id SERIAL PRIMARY KEY,
user_id INTEGER REFERENCES users(id),
amount DECIMAL(10,2),
status VARCHAR(20),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
创建 Publication:
-- 或者只包含特定表
CREATE PUBLICATION flink_cdc_pub FOR TABLE users, orders;
创建 Replication Slot:
-- 为 Flink 创建专用的 replication slot
SELECT * FROM pg_create_logical_replication_slot('flink_cdc_slot', 'pgoutput', false, true);
-- 验证 slot 创建
SELECT slot_name, plugin, slot_type, active FROM pg_replication_slots WHERE slot_name = 'flink_cdc_slot';
Flink 消费 CDC 相关作业配置,请参参考 Apache Flink 集成。
监控 Slot 状态:
-- 查看 slot 状态和滞后情况
SELECT
slot_name,
active,
pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS lag_pretty,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS restart_lag_pretty
FROM pg_replication_slots
WHERE slot_name = 'flink_cdc_slot';
处理 Slot 停滞,如果发现 slot 停滞(active=false 但未被使用):
-- 尝试重新激活
SELECT pg_replication_slot_advance('flink_cdc_slot', pg_current_wal_lsn());
-- 如果确认不再需要,可以删除重建
SELECT pg_drop_replication_slot('flink_cdc_slot');
SELECT * FROM pg_create_logical_replication_slot('flink_cdc_slot', 'pgoutput', false, true);
当作业完成或需要重新配置时,按顺序清理资源:
-
停止 Flink 作业,确保 Flink 作业已完全停止,不再使用 CDC 连接。
-
删除 Replication Slot
-- 安全删除 slot
SELECT pg_drop_replication_slot('flink_cdc_slot');
-- 验证删除
SELECT * FROM pg_replication_slots WHERE slot_name = 'flink_cdc_slot';
- 删除 Publication
-- 删除 publication
DROP PUBLICATION flink_cdc_pub;
-- 验证删除
SELECT * FROM pg_publication WHERE pubname = 'flink_cdc_pub';
pg_logical函数
可以通过 pg_logical 相关函数和 test_decoding 插件在数据库内进行 CDC 事件的验证。
具体如下:
postgres=# -- 使用输出插件'test_decoding'创建一个名为'regression_slot'的槽
postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
slot_name | lsn
-----------------+---------------------
regression_slot | 1698640579589000000
(1 row)
postgres=# SELECT slot_name, plugin, slot_type, database, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
slot_name | plugin | slot_type | database | active | restart_lsn | confirmed_flush_lsn
-----------------+---------------+-----------+----------+--------+---------------------+---------------------
regression_slot | test_decoding | logical | postgres | f | 1698640579588460626 | 0
(1 row)
postgres=# -- 目前还看不到更改
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
lsn | xid | data
-----+-----+------
(0 rows)
postgres=# CREATE TABLE data(id serial primary key, data text);
CREATE TABLE
postgres=# -- DDL 没有被复制,因此你将看到的东西只有事务,跳过空事务
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
lsn | xid | data
-----+-----+------
(0 rows)
postgres=# BEGIN;
postgres=# INSERT INTO data(data) VALUES('1');
postgres=# INSERT INTO data(data) VALUES('2');
postgres=# COMMIT;
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
lsn | xid | data
-------------------+-------------+---------------------------------------------------------
1792C8F7/7B0A3F65 | -1611778953 | BEGIN
1792C8F7/7B0A3F65 | -1611778953 | table public.data: INSERT: id[integer]:1 data[text]:'1'
1792C8F7/7B0A3F65 | -1611778953 | table public.data: INSERT: id[integer]:2 data[text]:'2'
1792C8F7/7B0A3F65 | -1611778953 | COMMIT
(4 rows)
postgres=# INSERT INTO data(data) VALUES('3');
postgres=# -- 你也可以不消费更改而在更改流中先看一看
postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
lsn | xid | data
-------------------+------------+---------------------------------------------------------
1792C909/DFC392DA | 2109858110 | BEGIN
1792C909/DFC392DA | 2109858110 | table public.data: INSERT: id[integer]:3 data[text]:'3'
1792C909/DFC392DA | 2109858110 | COMMIT
(3 rows)
postgres=# -- 接下来对 pg_logical_slot_peek_changes() 的调用再次返回相同的更改
postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
lsn | xid | data
-------------------+------------+---------------------------------------------------------
1792C909/DFC392DA | 2109858110 | BEGIN
1792C909/DFC392DA | 2109858110 | table public.data: INSERT: id[integer]:3 data[text]:'3'
1792C909/DFC392DA | 2109858110 | COMMIT
(3 rows)
postgres=# -- 当不再需要一个槽后记住销毁它以停止消耗服务器资源:
postgres=# SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot
-----------------------
(1 row)
流复制协议
除了使用上述例子中的函数进行流复制数据导出的测试以外,更高效的方式是以单独流复制协议的方式。
一般不同的语言的 postgresql jdbc driver 都有相应的封装,也可以用过普通 connection 的方式来连接,自己来处理协议,协议内容可参考:Streaming Replication Protocol (opens in a new tab)