数据导入和导出

数据导入导出

ProtonBase 支持对一个表的数据进行高效的导入和导出。在向目标表导入大量数据的时候,通常使用 INSERT 或者 COPY FROM 方式。如果要导入的数据量比较少,或者是对存量数据表进行增量导入数据的场景,建议使用 INSERT 操作。如果要导入的数据量比较大,或者是一个初始全量数据的导入场景,建议使用 COPY FROM 命令。该命令会直接读取文件中的数据,在解析之后直接批量写成存储引擎底层的文件格式,然后导入,这样能够避免每条数据 INSERT 操作的 SQL 事务处理流程的开销,大幅提升数据导入体验。此外,我们也可以通过 COPY TO 命令将一个表或者一个查询结果批量导出到一个文件中,供后续的导入或者其他处理。

比如,我们创建了如下的一张 students 表:

CREATE TABLE students(
   id int PRIMARY KEY,
   name varchar(255) NOT NULL,
   phone varchar(16)
);

同时, 有一个准备好的文件(stu.txt)存储了满足上面表 schema 的一些数据,内容如下:

123        Jacob        \N
456        "Alex Xia"   13888811888
789        Henry        \N

其中,每一行的不同字段数据用\t 单字符来分割,空的 field 用两个字符\N 来表示。

数据导入

COPY FROM 导入

当我们用 psql 连上 ProtonBase 的 postgres 默认 DB 以后,就可以执行下面的命令来导入数据和查看结果:

postgres=> \COPY students FROM stu.txt;
COPY 3
 
postgres=> SELECT * FROM students;
 id  |    name    |    phone
-----+------------+-------------
 123 | Jacob      |
 456 | "Alex Xia" | 13888811888
 789 | Henry      |
(3 rows)

详细的批量导入的 COPY FROM 命令语法如下:

COPY table_name [ ( column_name [, ...] ) ]
FROM [ 'filename' | STDIN ]
[ [ WITH ] ( option [, ...] ) ]

其中,WITH `支持的选项有:

  • DELIMITER 'delimiter_character':这个 one-byte 的的 delimiter 用来指定文件中每行数据的列分隔符,默认是制表符 \t。注意,如果使用 text 格式的文件输入,这个 delimiter 如果在正常的字段内容中出现的话,需要进行转义。
  • NULL 'null_string':这个选项用来指定表示一列为 NULL 值的字符串,默认是两个字符的文本 \N.
  • ON CONFLICT DO NOTHING:当指定这个选项的时候,若被 copy 的文件中某行的数据的 PK 在已有数据中已经存在的情况下,这行数据就会被忽略。
  • FORMAT ‘format_name':指定导入时数据传输的格式,支持 TEXTCSVBINARY三种格式,默认为TEXTBINARY是一种紧凑的二进制格式,避免了文本解析的开销,以获得更好的性能。

有关COPY更多的使用语法,请参考COPY

可以使用 postgresql 提供的 CopyManager 来使用 JDBC 驱动的方式进行数据导入

import org.postgresql.copy.CopyManager;
public static void copyToFile(Connection connection, String filePath, String tableOrQuery)
        throws SQLException, IOException {
    FileOutputStream fileOutputStream = null;
    try {
        CopyManager copyManager = new CopyManager((BaseConnection) connection);
        fileOutputStream = new FileOutputStream(filePath);
        String copyOut = "COPY " + tableOrQuery + " TO STDOUT DELIMITER AS ','";
        final long line = copyManager.copyOut(copyOut, fileOutputStream);
        System.out.println(line);
    } finally {
        if (fileOutputStream != null) {
            try {
                fileOutputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

INSERT 导入

INSERT 语句用于向表中插入新的记录。插入时可以逐行插入,也可以一次插入多行。语法如下:

[ WITH [ RECURSIVE ] with_query [, ...] ]
INSERT INTO table_name [ AS alias ] [ ( column_name [, ...] ) ]
    [ OVERRIDING { SYSTEM | USER } VALUE ]
    { DEFAULT VALUES | VALUES ( { expression | DEFAULT } [, ...] ) [, ...] | query }
    [ ON CONFLICT [ conflict_target ] conflict_action ]
    [ RETURNING * | output_expression [ [ AS ] output_name ] [, ...] ]
 
where conflict_target can be one of:
 
    ( { index_column_name | ( index_expression ) } [ COLLATE collation ] [ opclass ] [, ...] ) [ WHERE index_predicate ]
    ON CONSTRAINT constraint_nameand conflict_action is one of:
 
    DO NOTHING
    DO UPDATE SET { column_name = { expression | DEFAULT } |
                    ( column_name [, ...] ) = [ ROW ] ( { expression | DEFAULT } [, ...] ) |
                    ( column_name [, ...] ) = ( sub-SELECT )
                  } [, ...]
              [ WHERE condition ]

有关INSERT更多的使用语法,请参考INSERT

测试表定义如下。注: 后续如无特殊说明,均使用上面的表定义进行测试

CREATE TABLE t1(
    id int primary key,
    name text,
    age smallint,
    gender char(1)
);
  1. 显式指定列名,字段列和数值列顺序相同,一一对应。可以只指定部分列名和对应 VALUE, 其余列的 VALUE 会按照 default value 进行更新

使用示例:

protonbase=> INSERT INTO t1 (id, name, age, gender) VALUES (1,'testname','22','f');
INSERT 0 1
 
protonbase=> INSERT INTO t1 (id ,name) VALUES (2,'testname');
INSERT 0 1
  1. 不指定列名,需要在 VALUES 里指定所有字段的值

使用示例:

protonbase=> INSERT INTO t1 VALUES (3,'testname3','33','f');
INSERT 0 1
  1. 批量 insert,可以一次 insert 多条

使用示例:

protonbase=> INSERT INTO t1 (id, name, age, gender) VALUES (4,'testname4','22','f'), (5,'testname4','22','m');
INSERT 0 2
  1. 将查询结果作为 INSERT INTO 的数据

使用示例:

protonbase=> INSERT INTO t1 (id, name, age, gender) SELECT id, name, age, gender FROM t2 WHERE id>10;
INSERT 0 1
  1. 针对写入数据有主键冲突的场景,可以使用 INSERT INTO ON CONFLICT 语法来定义主键冲突后的行为,CONFLICT_ACTTION 主要有两种:
  • NOTHING, 主键冲突后不执行任何操作,丢弃待插入的冲突数据
  • UPDATE, 主键冲突后,更新指定列(注意: 虽然语法上允许,但应避免对主键列进行更新)

一般建议 on conflict 按主键或者配置了唯一性的索引。否则会触发全表扫描,性能较差。

使用示例:

protonbase=> \d t1
                                 数据表 "public.t1"
  栏位  |     类型     | 校对规则 |  可空的  |                 预设
--------+--------------+----------+----------+--------------------------------------
 id     | integer      |          | not null | nextval('t1_id_seq'::text::regclass)
 name   | text         |          |          |
 age    | smallint     |          |          |
 gender | character(1) |          |          |
索引:
    "t1_pkey" PRIMARY KEY, btree (id ASC)
 
protonbase=> SELECT * FROM t1 WHERE id=1;
 id |   name   | age | gender
----+----------+-----+--------
  1 | testname |  22 | f
(1 行记录)
 
-- 使用INSERT INTO ON CONFLICT DO NONTHING的方式插入有主键冲突的数据
protonbase=> INSERT INTO t1 (id, name, age, gender) VALUES (1,'testname1', 11, 'm') ON CONFLICT(id) DO NOTHING;
INSERT 0 0
 
-- 数据并未更新
protonbase=> SELECT * FROM t1 WHERE id=1;
 id |   name   | age | gender
----+----------+-----+--------
  1 | testname |  22 | f
(1 行记录)
 
-- 使用INSERT INTO ON CONFLICT DO UPDATE的方式插入有主键冲突的数据
protonbase=> INSERT INTO t1 (id, name, age, gender) VALUES (1,'testname1', 11, 'm') ON CONFLICT(id) DO UPDATE SET name=EXCLUDED.name,age=EXCLUDED.age,gender=EXCLUDED.gender;
INSERT 0 1
 
-- 数据按指定列更新
protonbase=> SELECT * FROM t1 WHERE id=1;
 id |   name    | age | gender
----+-----------+-----+--------
  1 | testname1 |  11 | m
(1 行记录)

性能优化

对于需要插入大量数据的场景,使用批量提交语句可以将多条 SQL 语句一次性提交到 ProtonBase, 通过攒批的方式来优化导入性能。示例如下:

INSERT INTO test (col1, col2) VALUES ('A', 10), ('B', 20), ('C', 30)
Connection conn = dataSource.getConnection();
try {
    conn.setAutoCommit(false); // 关闭自动提交,开启事务
 
    PreparedStatement pstat = conn.prepareStatement("INSERT INTO test (col1, col2) VALUES (?, ?)");
 
    // Batch方式添加记录
    pstat.setString(1, "A");
    pstat.setInt(2, 10);
    pstat.addBatch();
 
    pstat.setString(1, "B");
    pstat.setInt(2, 20);
    pstat.addBatch();
 
    pstat.setString(1, "C");
    pstat.setInt(2, 30);
    pstat.addBatch();
 
    // 批量提交
    pstat.executeBatch();
    conn.commit(); // 提交事务
} catch (Exception e) {
    conn.rollback(); // 回滚事务
    e.printStackTrace();
} finally {
    conn.setAutoCommit(true); // 恢复自动提交
    conn.close(); // 关闭连接
}

对往一个大数据量表上进行增量更新的场景,比如在 30 亿底表上希望更新 300 万的增量。由于 COPY 不支持 ON CONFLICT DO UPDATE,可以通过 COPY FROM 到临时表 + INSERT INTO SELECT FROM (临时表) ON CONFLICT 的方案来获取整体最佳导入性能。示例步骤如下:

-- 测试表的格式
test=> \d test
 
            数据表 "public.test"
 栏位 |  类型   | 校对规则 |  可空的  | 预设
------+---------+----------+----------+------
 col1 | integer |          | not null |
 col2 | text    |          |          |
 col3 | text    |          |          |
索引:
    "test_pkey" PRIMARY KEY, btree (col1 ASC)
 
-- 创建临时表
test=> CREATE TABLE test_tmp (like test including all);
 
-- 执行copy导入到临时表
test=> \copy "test_tmp" FROM csv_file;
 
-- 通过 insert into select from 批量导入
test=> INSERT INTO test(col1, col2, col3) SELECT col1, col2, col3 FROM test_tmp on conflict (col1) do update set col1=excluded.col1,col2=excluded.col2,col3=excluded.col3;

数据导出

COPY TO 导出

详细的 COPY TO 命令的语法如下:

COPY { table_name [ ( column_name [, ...] ) ] | ( query ) }
TO { 'filename' | STDOUT }
[ [ WITH ] ( option [, ...] ) ]

其中,WITH 支持的选项有:

  • DELIMITER 'delimiter_character'
  • NULL 'null_string'
  • FORMAT 'format_name'

它们的语义和上面 COPY FROM 中作用一样。 作为例子:

  • 若想将 students 表全部导出,可以执行如下的命令:
postgres=> \COPY students to stu.out;
COPY 3
  • 如果想把学号前 200 的学生导出,可以执行如下的命令:
postgres=> \COPY (SELECT * FROM students WHERE id<=200) TO stu.out;
COPY 1

可以使用 postgresql 提供的 CopyManager 来使用 JDBC 驱动的方式进行数据导出

import org.postgresql.copy.CopyManager;
public static long copyFromFile(Connection connection, String filePath, String tableName)
        throws SQLException, IOException {
    FileInputStream fileInputStream = null;
    try {
        CopyManager copyManager = new CopyManager((BaseConnection) connection);
        fileInputStream = new FileInputStream(filePath);
        String copyIn = "COPY " + tableName + " FROM STDIN DELIMITER AS ','";
        return copyManager.copyIn(copyIn, fileInputStream);
    } finally {
        if (fileInputStream != null) {
            try {
                fileInputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

流式数据导出

ProtonBase 支持通过流复制协议进行数据导出,参考Change Data Capture