导出到 Kafka

Teleport 支持将 CDC(Change Data Capture)事件推送到下游 Kafka,便于下游系统集成和实时数据处理。

导出格式

以下数据样例基于如下示例表:

CREATE TABLE tx(
    hash text,
    maker text,
    ts timestamp,
    volume numeric(160,60),
    PRIMARY KEY(hash)
);

KV JSON 格式

仅输出更新后的数据,适用于前端状态展示、状态同步等场景。

// 执行SQL: INSERT INTO tx VALUES('0496f994-3c42-4cf3-86f0-7faf20699bd5', '840', NOW(), 230.33037109759)
// 输出内容:
{
    "hash": "0496f994-3c42-4cf3-86f0-7faf20699bd5",
    "maker": "840",
    "ts": "2025-09-09 16:56:56.449000",
    "volume": 230.33037109759
}

Maxwell JSON 格式

Maxwell 是一种 CDC JSON 协议,完整记录所有 INSERT/UPDATE/DELETE 操作。

字段说明

  • database:变更操作所在数据库
  • table:变更操作的表
  • type:操作类型(insert/update/delete)
  • ts:变更时间(Unix 时间戳)
  • xid:变更事务 ID
  • commit:是否提交
  • data:变更后的行数据

数据样例

// 执行: insert into tx VALUES(gen_random_uuid(), (random() * 1024)::bigint::text, now(), random() * 1024);
{
    "database": "test",
    "table": "tx",
    "type": "insert",
    "ts": 1757417323,
    "xid": 616699495,
    "commit": true,
    "data": {
        "hash": "5dc0ac66-6f92-4aa9-8672-4312b3666c15",
        "maker": "503",
        "ts": "2025-09-09 19:28:43.484600",
        "volume": 505.356811461980000000000000000000000000000000000000000000000000
    }
}
 
// 执行: UPDATE tx SET volume = 1024 WHERE hash = '5dc0ac66-6f92-4aa9-8672-4312b3666c15';
{
    "database": "test",
    "table": "tx",
    "type": "update",
    "ts": 1757417442,
    "xid": -1527714998,
    "commit": true,
    "data": {
        "hash": "5dc0ac66-6f92-4aa9-8672-4312b3666c15",
        "maker": "503",
        "ts": "2025-09-09 19:28:43.484600",
        "volume": 1024.000000000000000000000000000000000000000000000000000000000000
    },
    "old": {
        // old 默认只输出主键,修改订阅配置可输出完整的旧值
        "hash": "5dc0ac66-6f92-4aa9-8672-4312b3666c15",
        "maker": null,
        "ts": null,
        "volume": null
    }
}
 
// 执行: DELETE FROM tx WHERE hash = '5dc0ac66-6f92-4aa9-8672-4312b3666c15';
{
    "database": "test",
    "table": "tx",
    "type": "delete",
    "ts": 1757417562,
    "xid": -448725205,
    "commit": true,
    "data": {
        "hash": "5dc0ac66-6f92-4aa9-8672-4312b3666c15",
        "maker": null,
        "ts": null,
        "volume": null
    }
}

导出作业配置

1. 创建导出任务

  1. 登录产品控制台
  2. 选择 "数据同步" --> "数据导出" --> "数据导出作业"

2. 选择目标类型

创建任务时,输出目标选择 Kafka:

3. 配置连接信息

Source 端配置

  1. 左侧 ProtonBase 端选择需要同步的实例
  2. 点击 "连接测试" 测试连接
  3. 测试成功会显示 "连接成功"

Sink 端配置(Kafka)

  1. 右侧填入目标 Kafka 的连接信息
  2. 网络配置:
    • 首先需要解决 Teleport 与目标 Kafka 之间的网络联通问题。
    • Bootstrap server:填入 Kafka 服务器地址和端口
    • 若通过公网进行 CDC 推送,需要修改 Kafka 的 advertised.listeners 属性,避免连接问题
  3. 安全协议:支持 "PLAINTEXT", "SASL_PLAINTEXT", "SASL_SSL",根据实际情况选择
    • PLAINTEXT:明文传输,适用于内网环境
    • SASL_PLAINTEXT:SASL 认证 + 明文传输,适用于需要认证但不要求加密的场景
    • SASL_SSL:SASL 认证 + SSL 加密传输,适用于公网或对安全性要求较高的场景
  4. 配置完成后点击 "连接测试" 测试连接
  5. 测试成功会显示 "连接成功"

4. 选择同步对象

选择需要同步的 schema 和 table:

可选:同步物化视图(MV)

如需同步物化视图,需在配置中勾选 Mview 类别。

5. 输出配置

同步模式(Sync Mode)

提供三种同步模式选项:

  1. 全量 FULL:读取整张表的当前数据快照,写入 Kafka 后退出任务。适用于一次性数据迁移场景。
  2. 增量 INCREMENTAL:只读取增量 CDC(Change Data Capture)事件,持续写入 Kafka。适用于实时数据同步场景。
  3. 全量 FULL + 增量 INCREMENTAL:先读取表的当前数据快照,然后从快照点开始持续推送所有的 CDC JSON。适用于既需要历史数据又需要实时同步的场景。

其他配置项

  • Topic:写入 Kafka 的 topic 名称(需预先创建)
  • Zone ID:任务时区,默认 UTC

6. 高级参数说明

时区处理

Zone ID 是基于 JDK 的时区处理参数。由于 Maxwell 格式中没有 timestamptz 类型,PostgreSQL 的 timestamptz 到 Maxwell 的转换会按照配置的时区格式化后去掉时区信息。其他字段均遵循 Maxwell 标准,与 Maxwell 工具保持一致。

Zone ID 的有效值参考 Java 的时区标识符,常用的包括:

  • UTC:协调世界时(默认值)
  • Asia/Shanghai:中国标准时间
  • America/New_York:美国东部时间
  • Europe/London:英国时间
  • Japan:日本时间

选择合适的时区可以确保时间数据在不同系统间正确转换和显示。

其他参数说明

  • output_binlog_position:是否输出 binlog 位置信息(PG 位点)
  • output_server_id:是否输出 server ID(mock 实现)
  • output_thread_id:是否输出线程 ID(mock 实现)
  • output_schema_id:schema 变更时递增
  • output_primary_keys:是否输出主键值
  • output_primary_key_columns:是否输出主键列名
  • output_push_timestamp:是否输出发送时间
  • kafka_key_format:Kafka key 生成方式
  • producer_partition_by:分区策略
  • producer_partition_columns:分区列名(如按列分区)
  • producer_partition_by_fallback:分区策略失效时的备选方式
  • kafka_partition_hash:分区 hash 算法