数据导入导出
ProtonBase 支持对一个表的数据进行高效的导入和导出。在向目标表导入大量数据的时候,通常使用 INSERT
或者 COPY FROM
方式。如果要导入的数据量比较少,或者是对存量数据表进行增量导入数据的场景,建议使用 INSERT 操作。如果要导入的数据量比较大,或者是一个初始全量数据的导入场景,建议使用 COPY FROM
命令。该命令会直接读取文件中的数据,在解析之后直接批量写成存储引擎底层的文件格式,然后导入,这样能够避免每条数据 INSERT
操作的 SQL 事务处理流程的开销,大幅提升数据导入体验。此外,我们也可以通过 COPY TO
命令将一个表或者一个查询结果批量导出到一个文件中,供后续的导入或者其他处理。
比如,我们创建了如下的一张 students 表:
CREATE TABLE students(
id int PRIMARY KEY,
name varchar(255) NOT NULL,
phone varchar(16)
);
同时, 有一个准备好的文件(stu.txt)存储了满足上面表 schema 的一些数据,内容如下:
123 Jacob \N
456 "Alex Xia" 13888811888
789 Henry \N
其中,每一行的不同字段数据用\t 单字符来分割,空的 field 用两个字符\N 来表示。
数据导入
COPY FROM 导入
当我们用 psql 连上 ProtonBase 的 postgres 默认 DB 以后,就可以执行下面的命令来导入数据和查看结果:
postgres=> \COPY students FROM stu.txt;
COPY 3
postgres=> SELECT * FROM students;
id | name | phone
-----+------------+-------------
123 | Jacob |
456 | "Alex Xia" | 13888811888
789 | Henry |
(3 rows)
详细的批量导入的 COPY FROM
命令语法如下:
COPY table_name [ ( column_name [, ...] ) ]
FROM [ 'filename' | STDIN ]
[ [ WITH ] ( option [, ...] ) ]
其中,WITH
`支持的选项有:
DELIMITER 'delimiter_character'
:这个 one-byte 的的 delimiter 用来指定文件中每行数据的列分隔符,默认是制表符\t
。注意,由于目前 ProtonBase 只支持 text 格式的文件输入,所以这个 delimiter 如果在正常的字段内容中出现的话,需要进行转义。这个问题会在后续的版本会通过支持 csv 和 binary 格式的文件输入来解决。NULL 'null_string'
:这个选项用来指定表示一列为 NULL 值的字符串,默认是两个字符的文本 \N.ON CONFLICT DO NOTHING
:当指定这个选项的时候,若被 copy 的文件中某行的数据的 PK 在已有数据中已经存在的情况下,这行数据就会被忽略。
有关COPY
更多的使用语法,请参考COPY
。
可以使用 postgresql 提供的 CopyManager 来使用 JDBC 驱动的方式进行数据导入
import org.postgresql.copy.CopyManager;
public static void copyToFile(Connection connection, String filePath, String tableOrQuery)
throws SQLException, IOException {
FileOutputStream fileOutputStream = null;
try {
CopyManager copyManager = new CopyManager((BaseConnection) connection);
fileOutputStream = new FileOutputStream(filePath);
String copyOut = "COPY " + tableOrQuery + " TO STDOUT DELIMITER AS ','";
final long line = copyManager.copyOut(copyOut, fileOutputStream);
System.out.println(line);
} finally {
if (fileOutputStream != null) {
try {
fileOutputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
INSERT 导入
INSERT
语句用于向表中插入新的记录。插入时可以逐行插入,也可以一次插入多行。语法如下:
[ WITH [ RECURSIVE ] with_query [, ...] ]
INSERT INTO table_name [ AS alias ] [ ( column_name [, ...] ) ]
[ OVERRIDING { SYSTEM | USER } VALUE ]
{ DEFAULT VALUES | VALUES ( { expression | DEFAULT } [, ...] ) [, ...] | query }
[ ON CONFLICT [ conflict_target ] conflict_action ]
[ RETURNING * | output_expression [ [ AS ] output_name ] [, ...] ]
where conflict_target can be one of:
( { index_column_name | ( index_expression ) } [ COLLATE collation ] [ opclass ] [, ...] ) [ WHERE index_predicate ]
ON CONSTRAINT constraint_nameand conflict_action is one of:
DO NOTHING
DO UPDATE SET { column_name = { expression | DEFAULT } |
( column_name [, ...] ) = [ ROW ] ( { expression | DEFAULT } [, ...] ) |
( column_name [, ...] ) = ( sub-SELECT )
} [, ...]
[ WHERE condition ]
有关INSERT
更多的使用语法,请参考INSERT
。
测试表定义如下。注: 后续如无特殊说明,均使用上面的表定义进行测试
CREATE TABLE t1(
id int primary key,
name text,
age smallint,
gender char(1)
);
- 显式指定列名,字段列和数值列顺序相同,一一对应。可以只指定部分列名和对应 VALUE, 其余列的 VALUE 会按照 default value 进行更新
使用示例:
protonbase=> INSERT INTO t1 (id, name, age, gender) VALUES (1,'testname','22','f');
INSERT 0 1
protonbase=> INSERT INTO t1 (id ,name) VALUES (2,'testname');
INSERT 0 1
- 不指定列名,需要在 VALUES 里指定所有字段的值
使用示例:
protonbase=> INSERT INTO t1 VALUES (3,'testname3','33','f');
INSERT 0 1
- 批量 insert,可以一次 insert 多条
使用示例:
protonbase=> INSERT INTO t1 (id, name, age, gender) VALUES (4,'testname4','22','f'), (5,'testname4','22','m');
INSERT 0 2
- 将查询结果作为 INSERT INTO 的数据
使用示例:
protonbase=> INSERT INTO t1 (id, name, age, gender) SELECT id, name, age, gender FROM t2 WHERE id>10;
INSERT 0 1
- 针对写入数据有主键冲突的场景,可以使用 INSERT INTO ON CONFLICT 语法来定义主键冲突后的行为,CONFLICT_ACTTION 主要有两种:
- NOTHING, 主键冲突后不执行任何操作,丢弃待插入的冲突数据
- UPDATE, 主键冲突后,更新指定列(注意: 虽然语法上允许,但应避免对主键列进行更新)
一般建议 on conflict 按主键或者配置了唯一性的索引。否则会触发全表扫描,性能较差。
使用示例:
protonbase=> \d t1
数据表 "public.t1"
栏位 | 类型 | 校对规则 | 可空的 | 预设
--------+--------------+----------+----------+--------------------------------------
id | integer | | not null | nextval('t1_id_seq'::text::regclass)
name | text | | |
age | smallint | | |
gender | character(1) | | |
索引:
"t1_pkey" PRIMARY KEY, btree (id ASC)
protonbase=> SELECT * FROM t1 WHERE id=1;
id | name | age | gender
----+----------+-----+--------
1 | testname | 22 | f
(1 行记录)
-- 使用INSERT INTO ON CONFLICT DO NONTHING的方式插入有主键冲突的数据
protonbase=> INSERT INTO t1 (id, name, age, gender) VALUES (1,'testname1', 11, 'm') ON CONFLICT(id) DO NOTHING;
INSERT 0 0
-- 数据并未更新
protonbase=> SELECT * FROM t1 WHERE id=1;
id | name | age | gender
----+----------+-----+--------
1 | testname | 22 | f
(1 行记录)
-- 使用INSERT INTO ON CONFLICT DO UPDATE的方式插入有主键冲突的数据
protonbase=> INSERT INTO t1 (id, name, age, gender) VALUES (1,'testname1', 11, 'm') ON CONFLICT(id) DO UPDATE SET name=EXCLUDED.name,age=EXCLUDED.age,gender=EXCLUDED.gender;
INSERT 0 1
-- 数据按指定列更新
protonbase=> SELECT * FROM t1 WHERE id=1;
id | name | age | gender
----+-----------+-----+--------
1 | testname1 | 11 | m
(1 行记录)
性能优化
对于需要插入大量数据的场景,使用批量提交语句可以将多条 SQL 语句一次性提交到 ProtonBase, 通过攒批的方式来优化导入性能。示例如下:
INSERT INTO test (col1, col2) VALUES ('A', 10), ('B', 20), ('C', 30)
Connection conn = dataSource.getConnection();
try {
conn.setAutoCommit(false); // 关闭自动提交,开启事务
PreparedStatement pstat = conn.prepareStatement("INSERT INTO test (col1, col2) VALUES (?, ?)");
// Batch方式添加记录
pstat.setString(1, "A");
pstat.setInt(2, 10);
pstat.addBatch();
pstat.setString(1, "B");
pstat.setInt(2, 20);
pstat.addBatch();
pstat.setString(1, "C");
pstat.setInt(2, 30);
pstat.addBatch();
// 批量提交
pstat.executeBatch();
conn.commit(); // 提交事务
} catch (Exception e) {
conn.rollback(); // 回滚事务
e.printStackTrace();
} finally {
conn.setAutoCommit(true); // 恢复自动提交
conn.close(); // 关闭连接
}
对往一个大数据量表上进行增量更新的场景,比如在 30 亿底表上希望更新 300 万的增量。由于 COPY 不支持 ON CONFLICT DO UPDATE,可以通过 COPY FROM 到临时表 + INSERT INTO SELECT FROM (临时表) ON CONFLICT 的方案来获取整体最佳导入性能。示例步骤如下:
-- 测试表的格式
test=> \d test
数据表 "public.test"
栏位 | 类型 | 校对规则 | 可空的 | 预设
------+---------+----------+----------+------
col1 | integer | | not null |
col2 | text | | |
col3 | text | | |
索引:
"test_pkey" PRIMARY KEY, btree (col1 ASC)
-- 创建临时表
test=> CREATE TABLE test_tmp (like test including all);
-- 执行copy导入到临时表
test=> \copy "test_tmp" FROM csv_file;
-- 通过 insert into select from 批量导入
test=> INSERT INTO test(col1, col2, col3) SELECT col1, col2, col3 FROM test_tmp on conflict (col1) do update set col1=excluded.col1,col2=excluded.col2,col3=excluded.col3;
数据导出
COPY TO 导出
详细的 COPY TO
命令的语法如下:
COPY { table_name [ ( column_name [, ...] ) ] | ( query ) }
TO { 'filename' | STDOUT }
[ [ WITH ] ( option [, ...] ) ]
其中,WITH 支持的选项有:
- DELIMITER 'delimiter_character'
- NULL 'null_string'
它们的语义和上面 COPY FROM 中作用一样。 作为例子:
- 若想将 students 表全部导出,可以执行如下的命令:
postgres=> \COPY students to stu.out;
COPY 3
- 如果想把学号前 200 的学生导出,可以执行如下的命令:
postgres=> \COPY (SELECT * FROM students WHERE id<=200) TO stu.out;
COPY 1
可以使用 postgresql 提供的 CopyManager 来使用 JDBC 驱动的方式进行数据导出
import org.postgresql.copy.CopyManager;
public static long copyFromFile(Connection connection, String filePath, String tableName)
throws SQLException, IOException {
FileInputStream fileInputStream = null;
try {
CopyManager copyManager = new CopyManager((BaseConnection) connection);
fileInputStream = new FileInputStream(filePath);
String copyIn = "COPY " + tableName + " FROM STDIN DELIMITER AS ','";
return copyManager.copyIn(copyIn, fileInputStream);
} finally {
if (fileInputStream != null) {
try {
fileInputStream.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
流式数据导出
ProtonBase 支持通过流复制协议进行数据导出,参考Change Data Capture。