从 ElasticSearch 迁移到 ProtonBase
Elasticsearch(下文简称ES)是一个基于Lucene的开源、分布式、RESTful搜索引擎,由Elastic公司开发。它以其快速搜索、高扩展性和实时分析能力而闻名。Elasticsearch广泛应用于搜索、全文检索日志分析等领域,支持大规模数据集的实时索引和搜索,同时提供强大的数据分析和可视化功能。
本文介绍如何将ES的数据迁移到 ProtonBase 中。
操作步骤
数据导出
- 安装Node.js
- Windows: 安装包可以从Node.js官网 (opens in a new tab)获取
- Linux: 从仓库中安装或自行编译;
- 安装elasticdump:
npm install -g elasticdump
- 导出数据
# 导出全部数据
elasticdump \
--input 'http://localhost:9200/' \
--output 'dump.json' \
--timeout 600000 \
# 导出特定index数据
INDEX_NAME="gharchive"
elasticdump \
--input 'http://localhost:9200/${INDEX_NAME}' \
--output 'dump.json' \
--timeout 600000 \
创建目标表
CREATE TABLE gharchive(
id TEXT NOT NULL PRIMARY KEY
,doc JSONB
)
using hybrid -- 根据需要选择存储格式,可选值为 [row, columnar, hybrid]
;
ETL导入数据
import json
import psycopg2
def make_connection():
return psycopg2.connect('host=... port=5432 dbname=postgres username=... password=...')
def main():
inited_tables = set()
with open('dump.json', 'r', encoding='utf8') as fd, make_connection() as conn:
with conn.cursor() as cursor:
for line in fd:
line = line.strip()
if not line:
continue
payload = json.loads(line)
index_name = payload['_index']
document_id = payload['_id']
document = payload['_source']
if index_name not in inited_tables:
# 自动初始化表
init_sql = f'''CREATE TABLE IF NOT EXISTS {index_name}(id TEXT NOT NULL PRIMARY KEY, doc JSONB) USING hybrid;'''
cursor.execute(init_sql)
conn.commit()
# 写入document
insert_sql = f'''INSERT INTO {index_name}(id, doc) VALUES(%s, %s)'''
cursor.execute(insert_sql, (document_id, json.dumps(document)))
conn.commit()
if __name__ == '__main__':
main()
应用端改造
CRUD 操作
写入文档
-- 写入文档
INSERT INTO index_name(id, doc)
VALUES('123', '{"key": "val"}'::JSONB);
-- 更新文档
UPDATE index_name
SET doc='{"key": "val2", "flag": false}'::JSONB
WHERE id='123';
-- upsert文档
INSERT INTO index_name(id, doc)
VALUES('123', '{"key": "val"}'::JSONB)
ON CONFLICT(id) DO UPDATE SET doc=EXCLUDED.doc;
-- partial update
UPDATE index_name
SET doc=jsonb_set(doc, '{flag}', 'false')
WHERE id='123';
取回文档
/*
# 取回整个文档
curl http://localhost:9200/${INDEX_NAME}/_doc/${DOCUMENT_ID}
*/
SELECT doc FROM index_name
WHERE id='123';
/*
取回文档部分内容
curl http://localhost:9200/${INDEX_NAME}/_doc/${DOCUMENT_ID}?_source_includes=id,type
*/
SELECT
doc->>'id' AS "id",
doc->>'type' AS "type"
FROM index_name
WHERE id='123';
删除文档
/*
curl -XDELETE http://localhost:9200/${INDEX_NAME}/_doc/${DOCUMENT_ID}
*/
DELETE FROM index_name
WHERE id='123';
搜索
索引查询
/*
搜索以下内容
1. q=key:val ==> 索引key的值为val
2. sort=weight:desc ==> 按照索引weight进行反向排序
3. size=100 ==> 取回100条结果
curl http://localhost:9200/${INDEX_NAME}/_search?q=key:val&sort=weight:desc&size=100
*/
SELECT doc FROM index_name
WHERE (doc->>'key') = 'val'
ORDER BY weight DESC
LIMIT 100
全文检索
- 若需使用全文检索功能,需提前创建split_gin索引,详情可参考 索引设计
- 使用tsvector方式进行搜索,建议建立索引
CREATE INDEX ON index_name USING split_gin(to_tsvector('english', doc->>'key'), tsvector_ops)
; - 使用trgm方式进行搜索,建议建立索引
CREATE INDEX ON index_name USING split_gin(doc->>'key', gin_trgm_ops)
;
- 使用tsvector方式进行搜索,建议建立索引
/*
curl "http://localhost:9200/${INDEX_NAME}/_search" \
--header 'content-type: application/json' \
--data 'query payload'
*/
-- 精确文本匹配
SELECT doc FROM index_name
WHERE (doc->>'key') = 'val'
LIMIT 100;
-- 使用like进行模糊匹配
SELECT doc FROM index_name
WHERE (doc->>'key') like '%val%'
LIMIT 100;
-- 使用tsvector进行全文检索
SELECT doc FROM index_name
WHERE to_tsvector('english', (doc->>'key')) @@ to_tsquery('english', 'val')
LIMIT 100;
-- 使用trgm进行文本相似度搜索
SELECT doc FROM index_name
WHERE (doc->>'key') % 'val' -- % 是trgm扩展的运算符
LIMIT 100
聚合查询
NDV统计
/*
{
"aggs":{
"distinct_name_count":{"cardinality":{"field":"fees"}}
}
}
*/
-- 迁移后SQL
SELECT COUNT(DISTINCT (doc->>'fees')) AS ndv
FROM index_name
最大聚合 & 最小聚合
/*
{
"aggs" : {
"fees_stats" : { "max" : { "field" : "fees" } }
}
}
*/
SELECT max(fees) FROM index_name;
/*
{
"aggs" : {
"min_fees" : { "min" : { "field" : "fees" } }
}
}
*/
SELECT min(fees) FROM index_name;
汇总聚合
/*
{
"aggs" : {
"total_fees" : { "sum" : { "field" : "fees" } }
}
}
*/
SELECT sum(fees) FROM index_name;