导出到 Kafka
Teleport 支持将 CDC(Change Data Capture)事件推送到下游 Kafka,便于下游系统集成和实时数据处理。
导出格式
以下数据样例基于如下示例表:
CREATE TABLE tx(
    hash text,
    maker text,
    ts timestamp,
    volume numeric(160,60),
    PRIMARY KEY(hash)
);KV JSON 格式
仅输出更新后的数据,适用于前端状态展示、状态同步等场景。
// 执行SQL: INSERT INTO tx VALUES('0496f994-3c42-4cf3-86f0-7faf20699bd5', '840', NOW(), 230.33037109759)
// 输出内容:
{
    "hash": "0496f994-3c42-4cf3-86f0-7faf20699bd5",
    "maker": "840",
    "ts": "2025-09-09 16:56:56.449000",
    "volume": 230.33037109759
}Maxwell JSON 格式
Maxwell 是一种 CDC JSON 协议,完整记录所有 INSERT/UPDATE/DELETE 操作。
字段说明
- database:变更操作所在数据库
- table:变更操作的表
- type:操作类型(insert/update/delete)
- ts:变更时间(Unix 时间戳)
- xid:变更事务 ID
- commit:是否提交
- data:变更后的行数据
数据样例
// 执行: insert into tx VALUES(gen_random_uuid(), (random() * 1024)::bigint::text, now(), random() * 1024);
{
    "database": "test",
    "table": "tx",
    "type": "insert",
    "ts": 1757417323,
    "xid": 616699495,
    "commit": true,
    "data": {
        "hash": "5dc0ac66-6f92-4aa9-8672-4312b3666c15",
        "maker": "503",
        "ts": "2025-09-09 19:28:43.484600",
        "volume": 505.356811461980000000000000000000000000000000000000000000000000
    }
}
 
// 执行: UPDATE tx SET volume = 1024 WHERE hash = '5dc0ac66-6f92-4aa9-8672-4312b3666c15';
{
    "database": "test",
    "table": "tx",
    "type": "update",
    "ts": 1757417442,
    "xid": -1527714998,
    "commit": true,
    "data": {
        "hash": "5dc0ac66-6f92-4aa9-8672-4312b3666c15",
        "maker": "503",
        "ts": "2025-09-09 19:28:43.484600",
        "volume": 1024.000000000000000000000000000000000000000000000000000000000000
    },
    "old": {
        // old 默认只输出主键,修改订阅配置可输出完整的旧值
        "hash": "5dc0ac66-6f92-4aa9-8672-4312b3666c15",
        "maker": null,
        "ts": null,
        "volume": null
    }
}
 
// 执行: DELETE FROM tx WHERE hash = '5dc0ac66-6f92-4aa9-8672-4312b3666c15';
{
    "database": "test",
    "table": "tx",
    "type": "delete",
    "ts": 1757417562,
    "xid": -448725205,
    "commit": true,
    "data": {
        "hash": "5dc0ac66-6f92-4aa9-8672-4312b3666c15",
        "maker": null,
        "ts": null,
        "volume": null
    }
}导出作业配置
1. 创建导出任务
- 登录产品控制台
- 选择 "数据同步" --> "数据导出" --> "数据导出作业"

2. 选择目标类型
创建任务时,输出目标选择 Kafka:

3. 配置连接信息
Source 端配置
- 左侧 ProtonBase 端选择需要同步的实例
- 点击 "连接测试" 测试连接
- 测试成功会显示 "连接成功"
Sink 端配置(Kafka)
- 右侧填入目标 Kafka 的连接信息
- 网络配置:
- 首先需要解决 Teleport 与目标 Kafka 之间的网络联通问题。
- Bootstrap server:填入 Kafka 服务器地址和端口
- 若通过公网进行 CDC 推送,需要修改 Kafka 的 advertised.listeners属性,避免连接问题
 
- 安全协议:支持 "PLAINTEXT", "SASL_PLAINTEXT", "SASL_SSL",根据实际情况选择
- PLAINTEXT:明文传输,适用于内网环境
- SASL_PLAINTEXT:SASL 认证 + 明文传输,适用于需要认证但不要求加密的场景
- SASL_SSL:SASL 认证 + SSL 加密传输,适用于公网或对安全性要求较高的场景
 
- 配置完成后点击 "连接测试" 测试连接
- 测试成功会显示 "连接成功"

4. 选择同步对象
选择需要同步的 schema 和 table:

可选:同步物化视图(MV)
如需同步物化视图,需在配置中勾选 Mview 类别。

5. 输出配置
同步模式(Sync Mode)
提供三种同步模式选项:
- 全量 FULL:读取整张表的当前数据快照,写入 Kafka 后退出任务。适用于一次性数据迁移场景。
- 增量 INCREMENTAL:只读取增量 CDC(Change Data Capture)事件,持续写入 Kafka。适用于实时数据同步场景。
- 全量 FULL + 增量 INCREMENTAL:先读取表的当前数据快照,然后从快照点开始持续推送所有的 CDC JSON。适用于既需要历史数据又需要实时同步的场景。
其他配置项
- Topic:写入 Kafka 的 topic 名称(需预先创建)
- Zone ID:任务时区,默认 UTC

6. 高级参数说明
时区处理
Zone ID 是基于 JDK 的时区处理参数。由于 Maxwell 格式中没有 timestamptz 类型,PostgreSQL 的 timestamptz 到 Maxwell 的转换会按照配置的时区格式化后去掉时区信息。其他字段均遵循 Maxwell 标准,与 Maxwell 工具保持一致。
Zone ID 的有效值参考 Java 的时区标识符,常用的包括:
- UTC:协调世界时(默认值)
- Asia/Shanghai:中国标准时间
- America/New_York:美国东部时间
- Europe/London:英国时间
- Japan:日本时间
选择合适的时区可以确保时间数据在不同系统间正确转换和显示。
其他参数说明
- output_binlog_position:是否输出 binlog 位置信息(PG 位点)
- output_server_id:是否输出 server ID(mock 实现)
- output_thread_id:是否输出线程 ID(mock 实现)
- output_schema_id:schema 变更时递增
- output_primary_keys:是否输出主键值
- output_primary_key_columns:是否输出主键列名
- output_push_timestamp:是否输出发送时间
- kafka_key_format:Kafka key 生成方式
- producer_partition_by:分区策略
- producer_partition_columns:分区列名(如按列分区)
- producer_partition_by_fallback:分区策略失效时的备选方式
- kafka_partition_hash:分区 hash 算法