变化数据捕捉(Change Data Capture)
ProtonBase 提供了逻辑流复制的方法将执行的数据写入/删除通过 SQL 以流的方式传送给外部消费者。这种功能可以被用于多种目的,包括 CDC(Change Data Capture)和数据的审计。
流复制协议的使用包含两种方式:
-
通过 pg_logical 相关的函数来获取数据,一般用来做少量数据的测试
-
通过完整的 Logical Replication Protocol 来获取完整的获取数据
前置条件
启用Database级别的CDC能力,需要调整WAL_LEVEL
到logical级别(logical级别高于默认的replica级别),请注意,更改WAL_LEVEL
需要重新建立连接才能生效。
ALTER DATABASE <db_name> SET WAL_LEVEL TO LOGICAL;
通过pg_logical函数获取数据变更
举一个例子:
postgres=# -- 使用输出插件'test_decoding'创建一个名为'regression_slot'的槽
postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
slot_name | lsn
-----------------+---------------------
regression_slot | 1698640579589000000
(1 row)
postgres=# SELECT slot_name, plugin, slot_type, database, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
slot_name | plugin | slot_type | database | active | restart_lsn | confirmed_flush_lsn
-----------------+---------------+-----------+----------+--------+---------------------+---------------------
regression_slot | test_decoding | logical | postgres | f | 1698640579588460626 | 0
(1 row)
postgres=# -- 目前还看不到更改
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
lsn | xid | data
-----+-----+------
(0 rows)
postgres=# CREATE TABLE data(id serial primary key, data text);
CREATE TABLE
postgres=# -- DDL 没有被复制,因此你将看到的东西只有事务,跳过空事务
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
lsn | xid | data
-----+-----+------
(0 rows)
postgres=# BEGIN;
postgres=# INSERT INTO data(data) VALUES('1');
postgres=# INSERT INTO data(data) VALUES('2');
postgres=# COMMIT;
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
lsn | xid | data
-------------------+-------------+---------------------------------------------------------
1792C8F7/7B0A3F65 | -1611778953 | BEGIN
1792C8F7/7B0A3F65 | -1611778953 | table public.data: INSERT: id[integer]:1 data[text]:'1'
1792C8F7/7B0A3F65 | -1611778953 | table public.data: INSERT: id[integer]:2 data[text]:'2'
1792C8F7/7B0A3F65 | -1611778953 | COMMIT
(4 rows)
postgres=# INSERT INTO data(data) VALUES('3');
postgres=# -- 你也可以不消费更改而在更改流中先看一看
postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
lsn | xid | data
-------------------+------------+---------------------------------------------------------
1792C909/DFC392DA | 2109858110 | BEGIN
1792C909/DFC392DA | 2109858110 | table public.data: INSERT: id[integer]:3 data[text]:'3'
1792C909/DFC392DA | 2109858110 | COMMIT
(3 rows)
postgres=# -- 接下来对 pg_logical_slot_peek_changes() 的调用再次返回相同的更改
postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
lsn | xid | data
-------------------+------------+---------------------------------------------------------
1792C909/DFC392DA | 2109858110 | BEGIN
1792C909/DFC392DA | 2109858110 | table public.data: INSERT: id[integer]:3 data[text]:'3'
1792C909/DFC392DA | 2109858110 | COMMIT
(3 rows)
postgres=# -- 当不再需要一个槽后记住销毁它以停止消耗服务器资源:
postgres=# SELECT pg_drop_replication_slot('regression_slot');
pg_drop_replication_slot
-----------------------
(1 row)
这里提到了几个概念:
- 逻辑编码(Logical Decoding)
数据库中实时写入数据时,会写入内部格式的 Write Ahead Log(WAL),但是这个格式,对于用户来说是并不理解的,逻辑编码的作用是将 WAL,编码成客户端可以理解的格式,如上面例子里,test_decoding 是将实时数据编码成类似于 SQL 的形式
- 复制槽(Replication Slot)
每一个复制槽代表着一个 Database 订阅的一条实时数据流。每个 Database 可以创建多个复制槽,同时这几个复制槽的消费进度互相隔离。
- 输出插件(Decoding Plugins)
实现数据流的输出格式是由输出插件决定的,目前 ProtonBase 支持两种输出插件分别是 test_decoding 和 pgoutput
test_decoding 的输出格式是类似于 SQL 的文本格式
pgoutput 是性能更好的遵循 postgresql logical replication protocol 的二进制格式,但需要客户端去做 decode.
编码协议内容,可参考Logical Replication Message Formats (opens in a new tab)
流复制协议
除了使用上述例子中的函数进行流复制数据导出的测试以外,更高效的方式是以单独流复制协议的方式。
一般不同的语言的 postgresql jdbc driver 都有相应的封装,也可以用过普通 connection 的方式来连接,自己来处理协议,协议内容可参考:Streaming Replication Protocol (opens in a new tab)
使用Flink消费CDC
Flink可以通过postgres-cdc connector获取ProtonBase的CDC数据,需要提前在数据库中创建好Slot。
以下例子中,首先通过CDC方式读取Source表,然后写入Sink表。
CREATE TEMPORARY TABLE jdbc_source (
ins_time timestamp(6) NULL,
store_id string NULL,
store_code string NOT NULL,
store_name string NULL,
time_section_no bigint NULL,
stat_date string NOT NULL,
PRIMARY KEY (store_code,stat_date) NOT ENFORCED
) WITH (
'connector' = 'postgres-cdc',
'hostname' = 'host-url',
'port' = '5432',
'username' = 'xxxx',
'password' = 'xxxx',
'database-name' = 'dbname',
'schema-name' = 'schemaname',
'table-name' = 'tablename',
'slot.name' = 'slotname',
'scan.startup.mode' = 'latest-offset',
'scan.incremental.snapshot.enabled' = 'true',
'decoding.plugin.name' = 'pgoutput'
);
CREATE TEMPORARY TABLE jdbc_sink(
store_id string NULL,
cnt bigint NOT NULL,
PRIMARY KEY (store_id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:postgresql://localhost:5432/dbname',
'table-name' = 'schema.table_name',
'username' = 'xxxx',
'password' = 'xxxx'
);
INSERT INTO jdbc_sink
SELECT store_id, count(*) as cnt
FROM jdbc_source GROUP BY store_id;