导出到 Kafka
Teleport 支持将 CDC(Change Data Capture)事件推送到下游 Kafka,便于下游系统集成和实时数据处理。
导出格式
以下数据样例基于如下示例表:
CREATE TABLE tx(
hash text,
maker text,
ts timestamp,
volume numeric(160,60),
PRIMARY KEY(hash)
);
KV JSON 格式
仅输出更新后的数据,适用于前端状态展示、状态同步等场景。
// 执行SQL: INSERT INTO tx VALUES('0496f994-3c42-4cf3-86f0-7faf20699bd5', '840', NOW(), 230.33037109759)
// 输出内容:
{
"hash": "0496f994-3c42-4cf3-86f0-7faf20699bd5",
"maker": "840",
"ts": "2025-09-09 16:56:56.449000",
"volume": 230.33037109759
}
Maxwell JSON 格式
Maxwell 是一种 CDC JSON 协议,完整记录所有 INSERT
/UPDATE
/DELETE
操作。
字段说明
database
:变更操作所在数据库table
:变更操作的表type
:操作类型(insert/update/delete)ts
:变更时间(Unix 时间戳)xid
:变更事务 IDcommit
:是否提交data
:变更后的行数据
数据样例
// 执行: insert into tx VALUES(gen_random_uuid(), (random() * 1024)::bigint::text, now(), random() * 1024);
{
"database": "test",
"table": "tx",
"type": "insert",
"ts": 1757417323,
"xid": 616699495,
"commit": true,
"data": {
"hash": "5dc0ac66-6f92-4aa9-8672-4312b3666c15",
"maker": "503",
"ts": "2025-09-09 19:28:43.484600",
"volume": 505.356811461980000000000000000000000000000000000000000000000000
}
}
// 执行: UPDATE tx SET volume = 1024 WHERE hash = '5dc0ac66-6f92-4aa9-8672-4312b3666c15';
{
"database": "test",
"table": "tx",
"type": "update",
"ts": 1757417442,
"xid": -1527714998,
"commit": true,
"data": {
"hash": "5dc0ac66-6f92-4aa9-8672-4312b3666c15",
"maker": "503",
"ts": "2025-09-09 19:28:43.484600",
"volume": 1024.000000000000000000000000000000000000000000000000000000000000
},
"old": {
// old 默认只输出主键,修改订阅配置可输出完整的旧值
"hash": "5dc0ac66-6f92-4aa9-8672-4312b3666c15",
"maker": null,
"ts": null,
"volume": null
}
}
// 执行: DELETE FROM tx WHERE hash = '5dc0ac66-6f92-4aa9-8672-4312b3666c15';
{
"database": "test",
"table": "tx",
"type": "delete",
"ts": 1757417562,
"xid": -448725205,
"commit": true,
"data": {
"hash": "5dc0ac66-6f92-4aa9-8672-4312b3666c15",
"maker": null,
"ts": null,
"volume": null
}
}
导出作业配置
1. 创建导出任务
- 登录产品控制台
- 选择 "数据同步" --> "数据导出" --> "数据导出作业"
2. 选择目标类型
创建任务时,输出目标选择 Kafka:
3. 配置连接信息
Source 端配置
- 左侧 ProtonBase 端选择需要同步的实例
- 点击 "连接测试" 测试连接
- 测试成功会显示 "连接成功"
Sink 端配置(Kafka)
- 右侧填入目标 Kafka 的连接信息
- 网络配置:
- 首先需要解决 Teleport 与目标 Kafka 之间的网络联通问题。
- Bootstrap server:填入 Kafka 服务器地址和端口
- 若通过公网进行 CDC 推送,需要修改 Kafka 的
advertised.listeners
属性,避免连接问题
- 安全协议:支持 "PLAINTEXT", "SASL_PLAINTEXT", "SASL_SSL",根据实际情况选择
PLAINTEXT
:明文传输,适用于内网环境SASL_PLAINTEXT
:SASL 认证 + 明文传输,适用于需要认证但不要求加密的场景SASL_SSL
:SASL 认证 + SSL 加密传输,适用于公网或对安全性要求较高的场景
- 配置完成后点击 "连接测试" 测试连接
- 测试成功会显示 "连接成功"
4. 选择同步对象
选择需要同步的 schema 和 table:
可选:同步物化视图(MV)
如需同步物化视图,需在配置中勾选 Mview 类别。
5. 输出配置
同步模式(Sync Mode)
提供三种同步模式选项:
全量 FULL
:读取整张表的当前数据快照,写入 Kafka 后退出任务。适用于一次性数据迁移场景。增量 INCREMENTAL
:只读取增量 CDC(Change Data Capture)事件,持续写入 Kafka。适用于实时数据同步场景。全量 FULL + 增量 INCREMENTAL
:先读取表的当前数据快照,然后从快照点开始持续推送所有的 CDC JSON。适用于既需要历史数据又需要实时同步的场景。
其他配置项
- Topic:写入 Kafka 的 topic 名称(需预先创建)
- Zone ID:任务时区,默认 UTC
6. 高级参数说明
时区处理
Zone ID 是基于 JDK 的时区处理参数。由于 Maxwell 格式中没有 timestamptz
类型,PostgreSQL 的 timestamptz
到 Maxwell 的转换会按照配置的时区格式化后去掉时区信息。其他字段均遵循 Maxwell 标准,与 Maxwell 工具保持一致。
Zone ID 的有效值参考 Java 的时区标识符,常用的包括:
UTC
:协调世界时(默认值)Asia/Shanghai
:中国标准时间America/New_York
:美国东部时间Europe/London
:英国时间Japan
:日本时间
选择合适的时区可以确保时间数据在不同系统间正确转换和显示。
其他参数说明
output_binlog_position
:是否输出 binlog 位置信息(PG 位点)output_server_id
:是否输出 server ID(mock 实现)output_thread_id
:是否输出线程 ID(mock 实现)output_schema_id
:schema 变更时递增output_primary_keys
:是否输出主键值output_primary_key_columns
:是否输出主键列名output_push_timestamp
:是否输出发送时间kafka_key_format
:Kafka key 生成方式producer_partition_by
:分区策略producer_partition_columns
:分区列名(如按列分区)producer_partition_by_fallback
:分区策略失效时的备选方式kafka_partition_hash
:分区 hash 算法