数据导入导出
为什么需要数据导入导出?
在数据管理和分析场景中,高效的数据导入导出能力是至关重要的。无论是将历史数据迁移到新系统、定期备份重要数据,还是在不同系统间共享数据,都需要可靠的导入导出机制。ProtonBase 提供了多种数据导入导出方式,能够满足不同场景的需求:对于大量数据的批量导入,COPY FROM 命令提供了最优性能;对于小量数据或增量更新,INSERT 语句更为灵活;而 COPY TO 命令则能高效地将数据导出供其他系统使用。掌握这些技术能帮助用户构建更高效的数据处理流程。
ProtonBase 支持对一个表的数据进行高效的导入和导出。在向目标表导入大量数据的时候,通常使用 INSERT
或者 COPY FROM
方式。如果要导入的数据量比较少,或者是对存量数据表进行增量导入数据的场景,建议使用 INSERT 操作。如果要导入的数据量比较大,或者是一个初始全量数据的导入场景,建议使用 COPY FROM
命令。该命令会直接读取文件中的数据,在解析之后直接批量写成存储引擎底层的文件格式,然后导入,这样能够避免每条数据 INSERT
操作的 SQL 事务处理流程的开销,大幅提升数据导入体验。此外,我们也可以通过 COPY TO
命令将一个表或者一个查询结果批量导出到一个文件中,供后续的导入或者其他处理。
比如,我们创建了如下的一张 students 表:
CREATE TABLE students(
id int PRIMARY KEY,
name varchar(255) NOT NULL,
phone varchar(16)
);
同时, 有一个准备好的文件(stu.txt)存储了满足上面表 schema 的一些数据,内容如下:
123 Jacob \N
456 "Alex Xia" 13888811888
789 Henry \N
其中,每一行的不同字段数据用\t 单字符来分割,空的 field 用两个字符\N 来表示。
数据导入
COPY FROM 导入
当我们用 psql 连上 ProtonBase 的 postgres 默认 DB 以后,就可以执行下面的命令来导入数据和查看结果:
postgres=> \COPY students FROM stu.txt;
COPY 3
postgres=> SELECT * FROM students;
id | name | phone
-----+------------+-------------
123 | Jacob |
456 | "Alex Xia" | 13888811888
789 | Henry |
(3 rows)
详细的批量导入的 COPY FROM
命令语法如下:
COPY table_name [ ( column_name [, ...] ) ]
FROM [ 'filename' | STDIN ]
[ [ WITH ] ( option [, ...] ) ]
其中,WITH
`支持的选项有:
DELIMITER 'delimiter_character'
:这个 one-byte 的的 delimiter 用来指定文件中每行数据的列分隔符,默认是制表符\t
。注意,如果使用 text 格式的文件输入,这个 delimiter 如果在正常的字段内容中出现的话,需要进行转义。NULL 'null_string'
:这个选项用来指定表示一列为 NULL 值的字符串,默认是两个字符的文本 \N.ON CONFLICT DO NOTHING
:当指定这个选项的时候,若被 copy 的文件中某行的数据的 PK 在已有数据中已经存在的情况下,这行数据就会被忽略。FORMAT ‘format_name'
:指定导入时数据传输的格式,支持 TEXT,CSV和BINARY三种格式,默认为TEXT,BINARY是一种紧凑的二进制格式,避免了文本解析的开销,以获得更好的性能。
有关COPY
更多的使用语法,请参考COPY
。
可以使用 postgresql 提供的 CopyManager 来使用 JDBC 驱动的方式进行数据导入
import org.postgresql.copy.CopyManager;
public static void copyToFile(Connection connection, String filePath, String tableOrQuery)
throws SQLException, IOException {
FileOutputStream fileOutputStream = null;
try {
CopyManager copyManager = new CopyManager((BaseConnection) connection);
fileOutputStream = new FileOutputStream(filePath);
String copyOut = "COPY " + tableOrQuery + " TO STDOUT DELIMITER AS ','";
final long line = copyManager.copyOut(copyOut, fileOutputStream);
System.out.println(line);
} finally {
if (fileOutputStream != null) {
try {
fileOutputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
INSERT 导入
INSERT
语句用于向表中插入新的记录。插入时可以逐行插入,也可以一次插入多行。语法如下:
[ WITH [ RECURSIVE ] with_query [, ...] ]
INSERT INTO table_name [ AS alias ] [ ( column_name [, ...] ) ]
[ OVERRIDING { SYSTEM | USER } VALUE ]
{ DEFAULT VALUES | VALUES ( { expression | DEFAULT } [, ...] ) [, ...] | query }
[ ON CONFLICT [ conflict_target ] conflict_action ]
[ RETURNING * | output_expression [ [ AS ] output_name ] [, ...] ]
where conflict_target can be one of:
( { index_column_name | ( index_expression ) } [ COLLATE collation ] [ opclass ] [, ...] ) [ WHERE index_predicate ]
ON CONSTRAINT constraint_nameand conflict_action is one of:
DO NOTHING
DO UPDATE SET { column_name = { expression | DEFAULT } |
( column_name [, ...] ) = [ ROW ] ( { expression | DEFAULT } [, ...] ) |
( column_name [, ...] ) = ( sub-SELECT )
} [, ...]
[ WHERE condition ]
有关INSERT
更多的使用语法,请参考INSERT
。
测试表定义如下。注: 后续如无特殊说明,均使用上面的表定义进行测试
CREATE TABLE t1(
id int primary key,
name text,
age smallint,
gender char(1)
);
- 显式指定列名,字段列和数值列顺序相同,一一对应。可以只指定部分列名和对应 VALUE, 其余列的 VALUE 会按照 default value 进行更新
使用示例:
protonbase=> INSERT INTO t1 (id, name, age, gender) VALUES (1,'testname','22','f');
INSERT 0 1
protonbase=> INSERT INTO t1 (id ,name) VALUES (2,'testname');
INSERT 0 1
- 不指定列名,需要在 VALUES 里指定所有字段的值
使用示例:
protonbase=> INSERT INTO t1 VALUES (3,'testname3','33','f');
INSERT 0 1
- 批量 insert,可以一次 insert 多条
使用示例:
protonbase=> INSERT INTO t1 (id, name, age, gender) VALUES (4,'testname4','22','f'), (5,'testname4','22','m');
INSERT 0 2
- 将查询结果作为 INSERT INTO 的数据
使用示例:
protonbase=> INSERT INTO t1 (id, name, age, gender) SELECT id, name, age, gender FROM t2 WHERE id>10;
INSERT 0 1
- 针对写入数据有主键冲突的场景,可以使用 INSERT INTO ON CONFLICT 语法来定义主键冲突后的行为,CONFLICT_ACTTION 主要有两种:
- NOTHING, 主键冲突后不执行任何操作,丢弃待插入的冲突数据
- UPDATE, 主键冲突后,更新指定列(注意: 虽然语法上允许,但应避免对主键列进行更新)
一般建议 on conflict 按主键或者配置了唯一性的索引。否则会触发全表扫描,性能较差。
使用示例:
protonbase=> \d t1
数据表 "public.t1"
栏位 | 类型 | 校对规则 | 可空的 | 预设
--------+--------------+----------+----------+--------------------------------------
id | integer | | not null | nextval('t1_id_seq'::text::regclass)
name | text | | |
age | smallint | | |
gender | character(1) | | |
索引:
"t1_pkey" PRIMARY KEY, btree (id ASC)
protonbase=> SELECT * FROM t1 WHERE id=1;
id | name | age | gender
----+----------+-----+--------
1 | testname | 22 | f
(1 行记录)
-- 使用INSERT INTO ON CONFLICT DO NONTHING的方式插入有主键冲突的数据
protonbase=> INSERT INTO t1 (id, name, age, gender) VALUES (1,'testname1', 11, 'm') ON CONFLICT(id) DO NOTHING;
INSERT 0 0
-- 数据并未更新
protonbase=> SELECT * FROM t1 WHERE id=1;
id | name | age | gender
----+----------+-----+--------
1 | testname | 22 | f
(1 行记录)
-- 使用INSERT INTO ON CONFLICT DO UPDATE的方式插入有主键冲突的数据
protonbase=> INSERT INTO t1 (id, name, age, gender) VALUES (1,'testname1', 11, 'm') ON CONFLICT(id) DO UPDATE SET name=EXCLUDED.name,age=EXCLUDED.age,gender=EXCLUDED.gender;
INSERT 0 1
-- 数据按指定列更新
protonbase=> SELECT * FROM t1 WHERE id=1;
id | name | age | gender
----+-----------+-----+--------
1 | testname1 | 11 | m
(1 行记录)
性能优化
对于需要插入大量数据的场景,使用批量提交语句可以将多条 SQL 语句一次性提交到 ProtonBase, 通过攒批的方式来优化导入性能。示例如下:
INSERT INTO test (col1, col2) VALUES ('A', 10), ('B', 20), ('C', 30)
Connection conn = dataSource.getConnection();
try {
conn.setAutoCommit(false); // 关闭自动提交,开启事务
PreparedStatement pstat = conn.prepareStatement("INSERT INTO test (col1, col2) VALUES (?, ?)");
// Batch方式添加记录
pstat.setString(1, "A");
pstat.setInt(2, 10);
pstat.addBatch();
pstat.setString(1, "B");
pstat.setInt(2, 20);
pstat.addBatch();
pstat.setString(1, "C");
pstat.setInt(2, 30);
pstat.addBatch();
// 批量提交
pstat.executeBatch();
conn.commit(); // 提交事务
} catch (Exception e) {
conn.rollback(); // 回滚事务
e.printStackTrace();
} finally {
conn.setAutoCommit(true); // 恢复自动提交
conn.close(); // 关闭连接
}
对往一个大数据量表上进行增量更新的场景,比如在 30 亿底表上希望更新 300 万的增量。由于 COPY 不支持 ON CONFLICT DO UPDATE,可以通过 COPY FROM 到临时表 + INSERT INTO SELECT FROM (临时表) ON CONFLICT 的方案来获取整体最佳导入性能。示例步骤如下:
-- 测试表的格式
test=> \d test
数据表 "public.test"
栏位 | 类型 | 校对规则 | 可空的 | 预设
------+---------+----------+----------+------
col1 | integer | | not null |
col2 | text | | |
col3 | text | | |
索引:
"test_pkey" PRIMARY KEY, btree (col1 ASC)
-- 创建临时表
test=> CREATE TABLE test_tmp (like test including all);
-- 执行copy导入到临时表
test=> \copy "test_tmp" FROM csv_file;
-- 通过 insert into select from 批量导入
test=> INSERT INTO test(col1, col2, col3) SELECT col1, col2, col3 FROM test_tmp on conflict (col1) do update set col1=excluded.col1,col2=excluded.col2,col3=excluded.col3;
数据导出
COPY TO 导出
详细的 COPY TO
命令的语法如下:
COPY { table_name [ ( column_name [, ...] ) ] | ( query ) }
TO { 'filename' | STDOUT }
[ [ WITH ] ( option [, ...] ) ]
其中,WITH 支持的选项有:
- DELIMITER 'delimiter_character'
- NULL 'null_string'
- FORMAT 'format_name'
它们的语义和上面 COPY FROM 中作用一样。 作为例子:
- 若想将 students 表全部导出,可以执行如下的命令:
postgres=> \COPY students to stu.out;
COPY 3
- 如果想把学号前 200 的学生导出,可以执行如下的命令:
postgres=> \COPY (SELECT * FROM students WHERE id<=200) TO stu.out;
COPY 1
可以使用 postgresql 提供的 CopyManager 来使用 JDBC 驱动的方式进行数据导出
import org.postgresql.copy.CopyManager;
public static long copyFromFile(Connection connection, String filePath, String tableName)
throws SQLException, IOException {
FileInputStream fileInputStream = null;
try {
CopyManager copyManager = new CopyManager((BaseConnection) connection);
fileInputStream = new FileInputStream(filePath);
String copyIn = "COPY " + tableName + " FROM STDIN DELIMITER AS ','";
return copyManager.copyIn(copyIn, fileInputStream);
} finally {
if (fileInputStream != null) {
try {
fileInputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
流式数据导出
ProtonBase 支持通过流复制协议进行数据导出,参考Change Data Capture。
实际操作指南:数据导入导出最佳实践
以下是一个完整的实际操作指南,展示如何在实际业务场景中高效地进行数据导入导出。
1. 大规模数据导入
准备数据文件
首先,创建一个包含大量数据的CSV文件用于测试:
# 创建测试数据文件
cat > users.csv << EOF
1,Alice Johnson,alice@example.com,28
2,Bob Smith,bob@example.com,35
3,Charlie Brown,charlie@example.com,22
4,Diana Prince,diana@example.com,31
5,Edward Norton,edward@example.com,45
EOF
创建目标表
-- 创建用户表
CREATE TABLE users (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
email VARCHAR(100) UNIQUE,
age INTEGER
);
使用COPY FROM导入数据
-- 使用COPY FROM命令导入数据
\copy users FROM 'users.csv' WITH (FORMAT csv, HEADER false);
-- 验证导入结果
SELECT COUNT(*) FROM users;
SELECT * FROM users LIMIT 5;
处理导入错误
-- 创建错误日志表
CREATE TABLE import_errors (
error_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
error_message TEXT
);
-- 使用错误处理导入
\copy users FROM 'users_with_errors.csv' WITH (FORMAT csv, HEADER false, ON_ERROR STOP);
2. 批量INSERT操作
使用批量INSERT导入数据
-- 批量插入数据
INSERT INTO users (name, email, age) VALUES
('Frank Miller', 'frank@example.com', 29),
('Grace Lee', 'grace@example.com', 33),
('Henry Wilson', 'henry@example.com', 41),
('Ivy Chen', 'ivy@example.com', 26),
('Jack Davis', 'jack@example.com', 38);
使用事务批量导入
-- 开始事务
BEGIN;
-- 批量插入数据
INSERT INTO users (name, email, age) VALUES
('Kate White', 'kate@example.com', 32),
('Leo Garcia', 'leo@example.com', 27),
('Mia Taylor', 'mia@example.com', 34);
-- 提交事务
COMMIT;
3. 处理主键冲突
使用ON CONFLICT处理重复数据
-- 插入可能冲突的数据
INSERT INTO users (id, name, email, age) VALUES
(1, 'Alice Updated', 'alice.new@example.com', 29),
(6, 'New User', 'newuser@example.com', 30)
ON CONFLICT (id) DO UPDATE SET
name = EXCLUDED.name,
email = EXCLUDED.email,
age = EXCLUDED.age;
4. 数据导出操作
导出完整表数据
-- 导出所有用户数据到CSV文件
\copy users TO 'users_export.csv' WITH (FORMAT csv, HEADER true);
-- 导出特定条件的数据
\copy (SELECT * FROM users WHERE age > 30) TO 'users_over_30.csv' WITH (FORMAT csv, HEADER true);
导出到标准输出
-- 导出到标准输出
\copy users TO STDOUT WITH (FORMAT csv, HEADER true);
5. 使用JDBC进行数据导入导出
Java代码示例:数据导入
import java.sql.*;
import org.postgresql.copy.CopyManager;
import org.postgresql.core.BaseConnection;
public class DataImportExport {
// 使用COPY FROM导入数据
public static void importDataWithCopy(Connection connection, String filePath, String tableName)
throws SQLException {
CopyManager copyManager = new CopyManager((BaseConnection) connection);
String copySql = "COPY " + tableName + " FROM STDIN WITH (FORMAT csv, HEADER true)";
try (FileInputStream fis = new FileInputStream(filePath)) {
long rowsAffected = copyManager.copyIn(copySql, fis);
System.out.println("Imported " + rowsAffected + " rows");
} catch (Exception e) {
e.printStackTrace();
}
}
// 使用COPY TO导出数据
public static void exportDataWithCopy(Connection connection, String filePath, String query)
throws SQLException {
CopyManager copyManager = new CopyManager((BaseConnection) connection);
String copySql = "COPY (" + query + ") TO STDOUT WITH (FORMAT csv, HEADER true)";
try (FileOutputStream fos = new FileOutputStream(filePath)) {
long rowsAffected = copyManager.copyOut(copySql, fos);
System.out.println("Exported " + rowsAffected + " rows");
} catch (Exception e) {
e.printStackTrace();
}
}
// 使用批量INSERT导入数据
public static void batchInsertData(Connection connection, List<User> users)
throws SQLException {
String sql = "INSERT INTO users (name, email, age) VALUES (?, ?, ?)";
try (PreparedStatement pstmt = connection.prepareStatement(sql)) {
connection.setAutoCommit(false);
for (User user : users) {
pstmt.setString(1, user.getName());
pstmt.setString(2, user.getEmail());
pstmt.setInt(3, user.getAge());
pstmt.addBatch();
}
int[] results = pstmt.executeBatch();
connection.commit();
System.out.println("Inserted " + results.length + " rows");
} catch (SQLException e) {
connection.rollback();
throw e;
} finally {
connection.setAutoCommit(true);
}
}
}
6. 性能优化技巧
优化导入性能
-- 临时关闭自动提交以提高性能
\set AUTOCOMMIT off
-- 导入大量数据
\copy large_table FROM 'large_data.csv' WITH (FORMAT csv, HEADER true);
-- 提交事务
COMMIT;
-- 重新启用自动提交
\set AUTOCOMMIT on
使用临时表优化大表更新
-- 创建临时表
CREATE TEMP TABLE temp_updates (
id INTEGER,
name VARCHAR(100),
email VARCHAR(100),
age INTEGER
);
-- 导入更新数据到临时表
\copy temp_updates FROM 'updates.csv' WITH (FORMAT csv, HEADER true);
-- 执行批量更新
INSERT INTO users (id, name, email, age)
SELECT id, name, email, age FROM temp_updates
ON CONFLICT (id) DO UPDATE SET
name = EXCLUDED.name,
email = EXCLUDED.email,
age = EXCLUDED.age;
-- 清理临时表
DROP TABLE temp_updates;
通过以上实际操作指南,您可以高效地进行数据导入导出操作,并根据具体业务需求选择最适合的方法。