从 ElasticSearch 迁移

从 ElasticSearch 迁移到 ProtonBase

Elasticsearch(下文简称ES)是一个基于Lucene的开源、分布式、RESTful搜索引擎,由Elastic公司开发。它以其快速搜索、高扩展性和实时分析能力而闻名。Elasticsearch广泛应用于搜索、全文检索日志分析等领域,支持大规模数据集的实时索引和搜索,同时提供强大的数据分析和可视化功能。

本文介绍如何将ES的数据迁移到 ProtonBase 中。

操作步骤

数据导出

  1. 安装Node.js
    1. Windows: 安装包可以从Node.js官网 (opens in a new tab)获取
    2. Linux: 从仓库中安装或自行编译;
  2. 安装elasticdump: npm install -g elasticdump
  3. 导出数据
# 导出全部数据
elasticdump \
--input 'http://localhost:9200/' \
--output 'dump.json' \
--timeout 600000 \
 
 
# 导出特定index数据
INDEX_NAME="gharchive"
elasticdump \
--input 'http://localhost:9200/${INDEX_NAME}' \
--output 'dump.json' \
--timeout 600000 \

创建目标表

CREATE TABLE gharchive(
  id TEXT NOT NULL PRIMARY KEY
  ,doc JSONB
 )
 using hybrid -- 根据需要选择存储格式,可选值为 [row, columnar, hybrid]
 ;

ETL导入数据

import json
import psycopg2
 
def make_connection():
    return psycopg2.connect('host=... port=5432 dbname=postgres username=... password=...')
    
    
def main():
    inited_tables = set()
    with open('dump.json', 'r', encoding='utf8') as fd, make_connection() as conn:
        with conn.cursor() as cursor:
            for line in fd:
                line = line.strip()
                if not line:
                    continue
                payload = json.loads(line)
                index_name = payload['_index']
                document_id = payload['_id']
                document = payload['_source']
                if index_name not in inited_tables:
                    # 自动初始化表
                    init_sql = f'''CREATE TABLE IF NOT EXISTS {index_name}(id TEXT NOT NULL PRIMARY KEY, doc JSONB) USING hybrid;'''
                    cursor.execute(init_sql)
                    conn.commit()
                # 写入document
                insert_sql = f'''INSERT INTO {index_name}(id, doc) VALUES(%s, %s)'''
                cursor.execute(insert_sql, (document_id, json.dumps(document)))
                conn.commit()
 
if __name__ == '__main__':
    main()

应用端改造

CRUD 操作

写入文档

-- 写入文档
INSERT INTO index_name(id, doc)
VALUES('123', '{"key": "val"}'::JSONB);
 
-- 更新文档
UPDATE index_name
    SET doc='{"key": "val2", "flag": false}'::JSONB
WHERE id='123';
 
 
-- upsert文档
INSERT INTO index_name(id, doc)
VALUES('123', '{"key": "val"}'::JSONB)
ON CONFLICT(id) DO UPDATE SET doc=EXCLUDED.doc;
 
-- partial update
UPDATE index_name
    SET doc=jsonb_set(doc, '{flag}', 'false')
WHERE id='123';

取回文档

/*
# 取回整个文档
curl http://localhost:9200/${INDEX_NAME}/_doc/${DOCUMENT_ID}
*/
 
SELECT doc FROM index_name
WHERE id='123';
 
 
/*
取回文档部分内容
curl http://localhost:9200/${INDEX_NAME}/_doc/${DOCUMENT_ID}?_source_includes=id,type
*/
SELECT
    doc->>'id' AS "id",
    doc->>'type' AS "type"
FROM index_name
WHERE id='123';
 

删除文档

/*
curl -XDELETE http://localhost:9200/${INDEX_NAME}/_doc/${DOCUMENT_ID}
*/
 
DELETE FROM index_name
WHERE id='123';

搜索

索引查询

/*
搜索以下内容
1. q=key:val ==> 索引key的值为val
2. sort=weight:desc ==> 按照索引weight进行反向排序
3. size=100 ==> 取回100条结果
curl http://localhost:9200/${INDEX_NAME}/_search?q=key:val&sort=weight:desc&size=100
*/
 
SELECT doc FROM index_name
WHERE (doc->>'key') = 'val'
ORDER BY weight DESC
LIMIT 100

全文检索

  1. 若需使用全文检索功能,需提前创建split_gin索引,详情可参考 索引设计
    1. 使用tsvector方式进行搜索,建议建立索引 CREATE INDEX ON index_name USING split_gin(to_tsvector('english', doc->>'key'), tsvector_ops);
    2. 使用trgm方式进行搜索,建议建立索引CREATE INDEX ON index_name USING split_gin(doc->>'key', gin_trgm_ops);
/*
curl "http://localhost:9200/${INDEX_NAME}/_search" \
--header 'content-type: application/json' \
--data 'query payload'
*/
 
 
-- 精确文本匹配
SELECT doc FROM index_name
WHERE (doc->>'key') = 'val'
LIMIT 100;
 
-- 使用like进行模糊匹配
SELECT doc FROM index_name
WHERE (doc->>'key') like '%val%'
LIMIT 100;
 
-- 使用tsvector进行全文检索
SELECT doc FROM index_name
WHERE to_tsvector('english', (doc->>'key')) @@ to_tsquery('english', 'val')
LIMIT 100;
 
-- 使用trgm进行文本相似度搜索
SELECT doc FROM index_name
WHERE (doc->>'key') % 'val' -- % 是trgm扩展的运算符
LIMIT 100

聚合查询

NDV统计

/*
{
   "aggs":{
      "distinct_name_count":{"cardinality":{"field":"fees"}}
   }
}
*/
-- 迁移后SQL
 
SELECT COUNT(DISTINCT (doc->>'fees')) AS ndv
FROM index_name

最大聚合 & 最小聚合

/*
{
   "aggs" : {
      "fees_stats" : { "max" : { "field" : "fees" } }
   }
}
*/
SELECT max(fees) FROM index_name;
 
/*
{
   "aggs" : {
      "min_fees" : { "min" : { "field" : "fees" } }
   }
}
*/
SELECT min(fees) FROM index_name;
 

汇总聚合

/*
{
   "aggs" : {
      "total_fees" : { "sum" : { "field" : "fees" } }
   }
}
*/
SELECT sum(fees) FROM index_name;