Change Data Capture(CDC)

变化数据捕捉(Change Data Capture)

ProtonBase 提供了逻辑流复制的方法将执行的数据写入/删除通过 SQL 以流的方式传送给外部消费者。这种功能可以被用于多种目的,包括 CDC(Change Data Capture)和数据的审计。

流复制协议的使用包含两种方式:

  • 通过 pg_logical 相关的函数来获取数据,一般用来做少量数据的测试

  • 通过完整的 Logical Replication Protocol 来获取完整的获取数据

前置条件

启用Database级别的CDC能力,需要调整WAL_LEVELlogical级别(logical级别高于默认的replica级别),请注意,更改WAL_LEVEL需要重新建立连接才能生效。

ALTER DATABASE <db_name> SET WAL_LEVEL TO LOGICAL;

通过pg_logical函数获取数据变更

举一个例子:

postgres=# -- 使用输出插件'test_decoding'创建一个名为'regression_slot'的槽
postgres=# SELECT * FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
    slot_name    |         lsn
-----------------+---------------------
 regression_slot | 1698640579589000000
(1 row)
 
postgres=# SELECT slot_name, plugin, slot_type, database, active, restart_lsn, confirmed_flush_lsn FROM pg_replication_slots;
    slot_name    |    plugin     | slot_type | database | active |     restart_lsn     | confirmed_flush_lsn
-----------------+---------------+-----------+----------+--------+---------------------+---------------------
 regression_slot | test_decoding | logical   | postgres | f      | 1698640579588460626 | 0
(1 row)
 
postgres=# -- 目前还看不到更改
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 lsn | xid | data
-----+-----+------
(0 rows)
 
postgres=# CREATE TABLE data(id serial primary key, data text);
CREATE TABLE
 
postgres=# -- DDL 没有被复制,因此你将看到的东西只有事务,跳过空事务
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
 lsn | xid | data
-----+-----+------
(0 rows)
 
 
postgres=# BEGIN;
postgres=# INSERT INTO data(data) VALUES('1');
postgres=# INSERT INTO data(data) VALUES('2');
postgres=# COMMIT;
 
postgres=# SELECT * FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
        lsn        |     xid     |                          data
-------------------+-------------+---------------------------------------------------------
 1792C8F7/7B0A3F65 | -1611778953 | BEGIN
 1792C8F7/7B0A3F65 | -1611778953 | table public.data: INSERT: id[integer]:1 data[text]:'1'
 1792C8F7/7B0A3F65 | -1611778953 | table public.data: INSERT: id[integer]:2 data[text]:'2'
 1792C8F7/7B0A3F65 | -1611778953 | COMMIT
(4 rows)
 
postgres=# INSERT INTO data(data) VALUES('3');
 
postgres=# -- 你也可以不消费更改而在更改流中先看一看
postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
        lsn        |    xid     |                          data
-------------------+------------+---------------------------------------------------------
 1792C909/DFC392DA | 2109858110 | BEGIN
 1792C909/DFC392DA | 2109858110 | table public.data: INSERT: id[integer]:3 data[text]:'3'
 1792C909/DFC392DA | 2109858110 | COMMIT
(3 rows)
 
postgres=# -- 接下来对 pg_logical_slot_peek_changes() 的调用再次返回相同的更改
postgres=# SELECT * FROM pg_logical_slot_peek_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
        lsn        |    xid     |                          data
-------------------+------------+---------------------------------------------------------
 1792C909/DFC392DA | 2109858110 | BEGIN
 1792C909/DFC392DA | 2109858110 | table public.data: INSERT: id[integer]:3 data[text]:'3'
 1792C909/DFC392DA | 2109858110 | COMMIT
(3 rows)
 
postgres=# -- 当不再需要一个槽后记住销毁它以停止消耗服务器资源:
postgres=# SELECT pg_drop_replication_slot('regression_slot');
 pg_drop_replication_slot
-----------------------
 
(1 row)

这里提到了几个概念:

  • 逻辑编码(Logical Decoding)

数据库中实时写入数据时,会写入内部格式的 Write Ahead Log(WAL),但是这个格式,对于用户来说是并不理解的,逻辑编码的作用是将 WAL,编码成客户端可以理解的格式,如上面例子里,test_decoding 是将实时数据编码成类似于 SQL 的形式

  • 复制槽(Replication Slot)

每一个复制槽代表着一个 Database 订阅的一条实时数据流。每个 Database 可以创建多个复制槽,同时这几个复制槽的消费进度互相隔离。

  • 输出插件(Decoding Plugins)

实现数据流的输出格式是由输出插件决定的,目前 ProtonBase 支持两种输出插件分别是 test_decodingpgoutput

test_decoding 的输出格式是类似于 SQL 的文本格式

pgoutput 是性能更好的遵循 postgresql logical replication protocol 的二进制格式,但需要客户端去做 decode.

编码协议内容,可参考Logical Replication Message Formats (opens in a new tab)

流复制协议

除了使用上述例子中的函数进行流复制数据导出的测试以外,更高效的方式是以单独流复制协议的方式。

一般不同的语言的 postgresql jdbc driver 都有相应的封装,也可以用过普通 connection 的方式来连接,自己来处理协议,协议内容可参考:Streaming Replication Protocol (opens in a new tab)

使用Flink消费CDC

Flink可以通过postgres-cdc connector获取ProtonBase的CDC数据,需要提前在数据库中创建好Slot

以下例子中,首先通过CDC方式读取Source表,然后写入Sink表。

CREATE TEMPORARY TABLE jdbc_source (
        ins_time timestamp(6) NULL,
        store_id string NULL,
        store_code string NOT NULL,
        store_name string NULL,
        time_section_no bigint NULL,
        stat_date string NOT NULL,
        PRIMARY KEY (store_code,stat_date) NOT ENFORCED
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'host-url',
  'port' = '5432',
  'username' = 'xxxx',
  'password' = 'xxxx',
  'database-name' = 'dbname',
  'schema-name' = 'schemaname',
  'table-name' = 'tablename',
  'slot.name' = 'slotname',
  'scan.startup.mode' = 'latest-offset',
  'scan.incremental.snapshot.enabled' = 'true',
  'decoding.plugin.name' = 'pgoutput'
);
 
CREATE TEMPORARY TABLE jdbc_sink(
        store_id string NULL,
        cnt bigint NOT NULL,
        PRIMARY KEY (store_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:postgresql://localhost:5432/dbname',
  'table-name' = 'schema.table_name',
  'username' = 'xxxx',
  'password' = 'xxxx'
);
 
INSERT INTO jdbc_sink 
SELECT store_id, count(*) as cnt 
FROM jdbc_source GROUP BY store_id;