数据导入和导出

数据导入导出

为什么需要数据导入导出?

在数据管理和分析场景中,高效的数据导入导出能力是至关重要的。无论是将历史数据迁移到新系统、定期备份重要数据,还是在不同系统间共享数据,都需要可靠的导入导出机制。ProtonBase 提供了多种数据导入导出方式,能够满足不同场景的需求:对于大量数据的批量导入,COPY FROM 命令提供了最优性能;对于小量数据或增量更新,INSERT 语句更为灵活;而 COPY TO 命令则能高效地将数据导出供其他系统使用。掌握这些技术能帮助用户构建更高效的数据处理流程。

ProtonBase 支持对一个表的数据进行高效的导入和导出。在向目标表导入大量数据的时候,通常使用 INSERT 或者 COPY FROM 方式。如果要导入的数据量比较少,或者是对存量数据表进行增量导入数据的场景,建议使用 INSERT 操作。如果要导入的数据量比较大,或者是一个初始全量数据的导入场景,建议使用 COPY FROM 命令。该命令会直接读取文件中的数据,在解析之后直接批量写成存储引擎底层的文件格式,然后导入,这样能够避免每条数据 INSERT 操作的 SQL 事务处理流程的开销,大幅提升数据导入体验。此外,我们也可以通过 COPY TO 命令将一个表或者一个查询结果批量导出到一个文件中,供后续的导入或者其他处理。

比如,我们创建了如下的一张 students 表:

CREATE TABLE students(
   id int PRIMARY KEY,
   name varchar(255) NOT NULL,
   phone varchar(16)
);

同时, 有一个准备好的文件(stu.txt)存储了满足上面表 schema 的一些数据,内容如下:

123        Jacob        \N
456        "Alex Xia"   13888811888
789        Henry        \N

其中,每一行的不同字段数据用\t 单字符来分割,空的 field 用两个字符\N 来表示。

数据导入

COPY FROM 导入

当我们用 psql 连上 ProtonBase 的 postgres 默认 DB 以后,就可以执行下面的命令来导入数据和查看结果:

postgres=> \COPY students FROM stu.txt;
COPY 3
 
postgres=> SELECT * FROM students;
 id  |    name    |    phone
-----+------------+-------------
 123 | Jacob      |
 456 | "Alex Xia" | 13888811888
 789 | Henry      |
(3 rows)

详细的批量导入的 COPY FROM 命令语法如下:

COPY table_name [ ( column_name [, ...] ) ]
FROM [ 'filename' | STDIN ]
[ [ WITH ] ( option [, ...] ) ]

其中,WITH `支持的选项有:

  • DELIMITER 'delimiter_character':这个 one-byte 的的 delimiter 用来指定文件中每行数据的列分隔符,默认是制表符 \t。注意,如果使用 text 格式的文件输入,这个 delimiter 如果在正常的字段内容中出现的话,需要进行转义。
  • NULL 'null_string':这个选项用来指定表示一列为 NULL 值的字符串,默认是两个字符的文本 \N.
  • ON CONFLICT DO NOTHING:当指定这个选项的时候,若被 copy 的文件中某行的数据的 PK 在已有数据中已经存在的情况下,这行数据就会被忽略。
  • FORMAT ‘format_name':指定导入时数据传输的格式,支持 TEXTCSVBINARY三种格式,默认为TEXTBINARY是一种紧凑的二进制格式,避免了文本解析的开销,以获得更好的性能。

有关COPY更多的使用语法,请参考COPY

可以使用 postgresql 提供的 CopyManager 来使用 JDBC 驱动的方式进行数据导入

import org.postgresql.copy.CopyManager;
public static void copyToFile(Connection connection, String filePath, String tableOrQuery)
        throws SQLException, IOException {
    FileOutputStream fileOutputStream = null;
    try {
        CopyManager copyManager = new CopyManager((BaseConnection) connection);
        fileOutputStream = new FileOutputStream(filePath);
        String copyOut = "COPY " + tableOrQuery + " TO STDOUT DELIMITER AS ','";
        final long line = copyManager.copyOut(copyOut, fileOutputStream);
        System.out.println(line);
    } finally {
        if (fileOutputStream != null) {
            try {
                fileOutputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

INSERT 导入

INSERT 语句用于向表中插入新的记录。插入时可以逐行插入,也可以一次插入多行。语法如下:

[ WITH [ RECURSIVE ] with_query [, ...] ]
INSERT INTO table_name [ AS alias ] [ ( column_name [, ...] ) ]
    [ OVERRIDING { SYSTEM | USER } VALUE ]
    { DEFAULT VALUES | VALUES ( { expression | DEFAULT } [, ...] ) [, ...] | query }
    [ ON CONFLICT [ conflict_target ] conflict_action ]
    [ RETURNING * | output_expression [ [ AS ] output_name ] [, ...] ]
 
where conflict_target can be one of:
 
    ( { index_column_name | ( index_expression ) } [ COLLATE collation ] [ opclass ] [, ...] ) [ WHERE index_predicate ]
    ON CONSTRAINT constraint_nameand conflict_action is one of:
 
    DO NOTHING
    DO UPDATE SET { column_name = { expression | DEFAULT } |
                    ( column_name [, ...] ) = [ ROW ] ( { expression | DEFAULT } [, ...] ) |
                    ( column_name [, ...] ) = ( sub-SELECT )
                  } [, ...]
              [ WHERE condition ]

有关INSERT更多的使用语法,请参考INSERT

测试表定义如下。注: 后续如无特殊说明,均使用上面的表定义进行测试

CREATE TABLE t1(
    id int primary key,
    name text,
    age smallint,
    gender char(1)
);
  1. 显式指定列名,字段列和数值列顺序相同,一一对应。可以只指定部分列名和对应 VALUE, 其余列的 VALUE 会按照 default value 进行更新

使用示例:

protonbase=> INSERT INTO t1 (id, name, age, gender) VALUES (1,'testname','22','f');
INSERT 0 1
 
protonbase=> INSERT INTO t1 (id ,name) VALUES (2,'testname');
INSERT 0 1
  1. 不指定列名,需要在 VALUES 里指定所有字段的值

使用示例:

protonbase=> INSERT INTO t1 VALUES (3,'testname3','33','f');
INSERT 0 1
  1. 批量 insert,可以一次 insert 多条

使用示例:

protonbase=> INSERT INTO t1 (id, name, age, gender) VALUES (4,'testname4','22','f'), (5,'testname4','22','m');
INSERT 0 2
  1. 将查询结果作为 INSERT INTO 的数据

使用示例:

protonbase=> INSERT INTO t1 (id, name, age, gender) SELECT id, name, age, gender FROM t2 WHERE id>10;
INSERT 0 1
  1. 针对写入数据有主键冲突的场景,可以使用 INSERT INTO ON CONFLICT 语法来定义主键冲突后的行为,CONFLICT_ACTTION 主要有两种:
  • NOTHING, 主键冲突后不执行任何操作,丢弃待插入的冲突数据
  • UPDATE, 主键冲突后,更新指定列(注意: 虽然语法上允许,但应避免对主键列进行更新)

一般建议 on conflict 按主键或者配置了唯一性的索引。否则会触发全表扫描,性能较差。

使用示例:

protonbase=> \d t1
                                 数据表 "public.t1"
  栏位  |     类型     | 校对规则 |  可空的  |                 预设
--------+--------------+----------+----------+--------------------------------------
 id     | integer      |          | not null | nextval('t1_id_seq'::text::regclass)
 name   | text         |          |          |
 age    | smallint     |          |          |
 gender | character(1) |          |          |
索引:
    "t1_pkey" PRIMARY KEY, btree (id ASC)
 
protonbase=> SELECT * FROM t1 WHERE id=1;
 id |   name   | age | gender
----+----------+-----+--------
  1 | testname |  22 | f
(1 行记录)
 
-- 使用INSERT INTO ON CONFLICT DO NONTHING的方式插入有主键冲突的数据
protonbase=> INSERT INTO t1 (id, name, age, gender) VALUES (1,'testname1', 11, 'm') ON CONFLICT(id) DO NOTHING;
INSERT 0 0
 
-- 数据并未更新
protonbase=> SELECT * FROM t1 WHERE id=1;
 id |   name   | age | gender
----+----------+-----+--------
  1 | testname |  22 | f
(1 行记录)
 
-- 使用INSERT INTO ON CONFLICT DO UPDATE的方式插入有主键冲突的数据
protonbase=> INSERT INTO t1 (id, name, age, gender) VALUES (1,'testname1', 11, 'm') ON CONFLICT(id) DO UPDATE SET name=EXCLUDED.name,age=EXCLUDED.age,gender=EXCLUDED.gender;
INSERT 0 1
 
-- 数据按指定列更新
protonbase=> SELECT * FROM t1 WHERE id=1;
 id |   name    | age | gender
----+-----------+-----+--------
  1 | testname1 |  11 | m
(1 行记录)

性能优化

对于需要插入大量数据的场景,使用批量提交语句可以将多条 SQL 语句一次性提交到 ProtonBase, 通过攒批的方式来优化导入性能。示例如下:

INSERT INTO test (col1, col2) VALUES ('A', 10), ('B', 20), ('C', 30)
Connection conn = dataSource.getConnection();
try {
    conn.setAutoCommit(false); // 关闭自动提交,开启事务
 
    PreparedStatement pstat = conn.prepareStatement("INSERT INTO test (col1, col2) VALUES (?, ?)");
 
    // Batch方式添加记录
    pstat.setString(1, "A");
    pstat.setInt(2, 10);
    pstat.addBatch();
 
    pstat.setString(1, "B");
    pstat.setInt(2, 20);
    pstat.addBatch();
 
    pstat.setString(1, "C");
    pstat.setInt(2, 30);
    pstat.addBatch();
 
    // 批量提交
    pstat.executeBatch();
    conn.commit(); // 提交事务
} catch (Exception e) {
    conn.rollback(); // 回滚事务
    e.printStackTrace();
} finally {
    conn.setAutoCommit(true); // 恢复自动提交
    conn.close(); // 关闭连接
}

对往一个大数据量表上进行增量更新的场景,比如在 30 亿底表上希望更新 300 万的增量。由于 COPY 不支持 ON CONFLICT DO UPDATE,可以通过 COPY FROM 到临时表 + INSERT INTO SELECT FROM (临时表) ON CONFLICT 的方案来获取整体最佳导入性能。示例步骤如下:

-- 测试表的格式
test=> \d test
 
            数据表 "public.test"
 栏位 |  类型   | 校对规则 |  可空的  | 预设
------+---------+----------+----------+------
 col1 | integer |          | not null |
 col2 | text    |          |          |
 col3 | text    |          |          |
索引:
    "test_pkey" PRIMARY KEY, btree (col1 ASC)
 
-- 创建临时表
test=> CREATE TABLE test_tmp (like test including all);
 
-- 执行copy导入到临时表
test=> \copy "test_tmp" FROM csv_file;
 
-- 通过 insert into select from 批量导入
test=> INSERT INTO test(col1, col2, col3) SELECT col1, col2, col3 FROM test_tmp on conflict (col1) do update set col1=excluded.col1,col2=excluded.col2,col3=excluded.col3;

数据导出

COPY TO 导出

详细的 COPY TO 命令的语法如下:

COPY { table_name [ ( column_name [, ...] ) ] | ( query ) }
TO { 'filename' | STDOUT }
[ [ WITH ] ( option [, ...] ) ]

其中,WITH 支持的选项有:

  • DELIMITER 'delimiter_character'
  • NULL 'null_string'
  • FORMAT 'format_name'

它们的语义和上面 COPY FROM 中作用一样。 作为例子:

  • 若想将 students 表全部导出,可以执行如下的命令:
postgres=> \COPY students to stu.out;
COPY 3
  • 如果想把学号前 200 的学生导出,可以执行如下的命令:
postgres=> \COPY (SELECT * FROM students WHERE id<=200) TO stu.out;
COPY 1

可以使用 postgresql 提供的 CopyManager 来使用 JDBC 驱动的方式进行数据导出

import org.postgresql.copy.CopyManager;
public static long copyFromFile(Connection connection, String filePath, String tableName)
        throws SQLException, IOException {
    FileInputStream fileInputStream = null;
    try {
        CopyManager copyManager = new CopyManager((BaseConnection) connection);
        fileInputStream = new FileInputStream(filePath);
        String copyIn = "COPY " + tableName + " FROM STDIN DELIMITER AS ','";
        return copyManager.copyIn(copyIn, fileInputStream);
    } finally {
        if (fileInputStream != null) {
            try {
                fileInputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

流式数据导出

ProtonBase 支持通过流复制协议进行数据导出,参考Change Data Capture

实际操作指南:数据导入导出最佳实践

以下是一个完整的实际操作指南,展示如何在实际业务场景中高效地进行数据导入导出。

1. 大规模数据导入

准备数据文件

首先,创建一个包含大量数据的CSV文件用于测试:

# 创建测试数据文件
cat > users.csv << EOF
1,Alice Johnson,alice@example.com,28
2,Bob Smith,bob@example.com,35
3,Charlie Brown,charlie@example.com,22
4,Diana Prince,diana@example.com,31
5,Edward Norton,edward@example.com,45
EOF

创建目标表

-- 创建用户表
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    email VARCHAR(100) UNIQUE,
    age INTEGER
);

使用COPY FROM导入数据

-- 使用COPY FROM命令导入数据
\copy users FROM 'users.csv' WITH (FORMAT csv, HEADER false);
 
-- 验证导入结果
SELECT COUNT(*) FROM users;
SELECT * FROM users LIMIT 5;

处理导入错误

-- 创建错误日志表
CREATE TABLE import_errors (
    error_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    error_message TEXT
);
 
-- 使用错误处理导入
\copy users FROM 'users_with_errors.csv' WITH (FORMAT csv, HEADER false, ON_ERROR STOP);

2. 批量INSERT操作

使用批量INSERT导入数据

-- 批量插入数据
INSERT INTO users (name, email, age) VALUES 
('Frank Miller', 'frank@example.com', 29),
('Grace Lee', 'grace@example.com', 33),
('Henry Wilson', 'henry@example.com', 41),
('Ivy Chen', 'ivy@example.com', 26),
('Jack Davis', 'jack@example.com', 38);

使用事务批量导入

-- 开始事务
BEGIN;
 
-- 批量插入数据
INSERT INTO users (name, email, age) VALUES 
('Kate White', 'kate@example.com', 32),
('Leo Garcia', 'leo@example.com', 27),
('Mia Taylor', 'mia@example.com', 34);
 
-- 提交事务
COMMIT;

3. 处理主键冲突

使用ON CONFLICT处理重复数据

-- 插入可能冲突的数据
INSERT INTO users (id, name, email, age) VALUES 
(1, 'Alice Updated', 'alice.new@example.com', 29),
(6, 'New User', 'newuser@example.com', 30)
ON CONFLICT (id) DO UPDATE SET 
    name = EXCLUDED.name,
    email = EXCLUDED.email,
    age = EXCLUDED.age;

4. 数据导出操作

导出完整表数据

-- 导出所有用户数据到CSV文件
\copy users TO 'users_export.csv' WITH (FORMAT csv, HEADER true);
 
-- 导出特定条件的数据
\copy (SELECT * FROM users WHERE age > 30) TO 'users_over_30.csv' WITH (FORMAT csv, HEADER true);

导出到标准输出

-- 导出到标准输出
\copy users TO STDOUT WITH (FORMAT csv, HEADER true);

5. 使用JDBC进行数据导入导出

Java代码示例:数据导入

import java.sql.*;
import org.postgresql.copy.CopyManager;
import org.postgresql.core.BaseConnection;
 
public class DataImportExport {
    
    // 使用COPY FROM导入数据
    public static void importDataWithCopy(Connection connection, String filePath, String tableName) 
            throws SQLException {
        CopyManager copyManager = new CopyManager((BaseConnection) connection);
        String copySql = "COPY " + tableName + " FROM STDIN WITH (FORMAT csv, HEADER true)";
        
        try (FileInputStream fis = new FileInputStream(filePath)) {
            long rowsAffected = copyManager.copyIn(copySql, fis);
            System.out.println("Imported " + rowsAffected + " rows");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    // 使用COPY TO导出数据
    public static void exportDataWithCopy(Connection connection, String filePath, String query) 
            throws SQLException {
        CopyManager copyManager = new CopyManager((BaseConnection) connection);
        String copySql = "COPY (" + query + ") TO STDOUT WITH (FORMAT csv, HEADER true)";
        
        try (FileOutputStream fos = new FileOutputStream(filePath)) {
            long rowsAffected = copyManager.copyOut(copySql, fos);
            System.out.println("Exported " + rowsAffected + " rows");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    // 使用批量INSERT导入数据
    public static void batchInsertData(Connection connection, List<User> users) 
            throws SQLException {
        String sql = "INSERT INTO users (name, email, age) VALUES (?, ?, ?)";
        
        try (PreparedStatement pstmt = connection.prepareStatement(sql)) {
            connection.setAutoCommit(false);
            
            for (User user : users) {
                pstmt.setString(1, user.getName());
                pstmt.setString(2, user.getEmail());
                pstmt.setInt(3, user.getAge());
                pstmt.addBatch();
            }
            
            int[] results = pstmt.executeBatch();
            connection.commit();
            
            System.out.println("Inserted " + results.length + " rows");
        } catch (SQLException e) {
            connection.rollback();
            throw e;
        } finally {
            connection.setAutoCommit(true);
        }
    }
}

6. 性能优化技巧

优化导入性能

-- 临时关闭自动提交以提高性能
\set AUTOCOMMIT off
 
-- 导入大量数据
\copy large_table FROM 'large_data.csv' WITH (FORMAT csv, HEADER true);
 
-- 提交事务
COMMIT;
 
-- 重新启用自动提交
\set AUTOCOMMIT on

使用临时表优化大表更新

-- 创建临时表
CREATE TEMP TABLE temp_updates (
    id INTEGER,
    name VARCHAR(100),
    email VARCHAR(100),
    age INTEGER
);
 
-- 导入更新数据到临时表
\copy temp_updates FROM 'updates.csv' WITH (FORMAT csv, HEADER true);
 
-- 执行批量更新
INSERT INTO users (id, name, email, age)
SELECT id, name, email, age FROM temp_updates
ON CONFLICT (id) DO UPDATE SET
    name = EXCLUDED.name,
    email = EXCLUDED.email,
    age = EXCLUDED.age;
 
-- 清理临时表
DROP TABLE temp_updates;

通过以上实际操作指南,您可以高效地进行数据导入导出操作,并根据具体业务需求选择最适合的方法。