使用 PL/Python 编写 UDF

1 PL/Python 扩展概述

PL/Python 是一种可加载的过程语言,它允许你使用 Python 编写数据库的用户定义函数(UDF)或存储过程。这意味着你可以在数据库中直接利用 Python 生态系统的强大功能进行复杂的数据处理、分析和机器学习推理,无需将数据移出数据库,从而提升开发效率并减少数据移动的开销。

核心价值:

  • 扩展性:突破 SQL 内置函数的限制,实现任意复杂逻辑。

  • 灵活性:利用 Python 庞大的生态系统(如 NumPy, Pandas, Scikit-learn, Requests 等)。

  • 便利性:数据处理逻辑离数据更近,减少数据移动,提升开发效率。

在 ProtonBase 中,支持的 PL/Python 扩展包括:

  • plpython3u:用于支持 Python 3 的不可信(untrusted)语言扩展。u 代表 "untrusted",意味着它具有更少的安全沙盒限制,功能强大但需谨慎使用。
  • plpythonvec3u:用于 Python 3 的、支持向量化执行的不可信语言扩展。它旨在显著提升处理批量数据行的性能。

注:使用 PL/Python 需要首先绑定 UDF Server,并保持 UDF Server 处于运行状态。

2 内置 Python 库支持

为了提升开发便利性和性能,系统已预装了以下常用的 Python 库,您可以在 PL/Python UDF 中直接导入使用,无需额外安装:

2.1 数据库连接与操作

  • psycopg2: PostgreSQL 适配器,用于在 Python 中连接和操作 PostgreSQL数据库
  • SQLAlchemy: Python SQL 工具包和对象关系映射器,提供高效的数据访问模式

2.2 数据处理与分析

  • numpy: 基础科学计算库,提供高性能的多维数组对象和数学函数

  • pandas: 强大的数据分析和操作工具,提供 DataFrame 数据结构

  • SciPy: 基于 numpy 的科学计算库,提供数学、科学和工程计算功能

  • Scikit-learn: 机器学习库,提供各种分类、回归和聚类算法

2.3 安全与加密

  • pycryptodome: 加密工具包,提供各种加密算法实现

  • cryptography: 密码学库,提供加密配方和基元

  • passlib: 密码哈希库,支持多种密码哈希方案

  • pyOpenSSL: OpenSSL 绑定,提供 SSL/TLS 功能

  • base58: Base58 编码/解码库,常用于加密货币地址编码

2.4 网络与云服务

  • Requests: HTTP 库,用于发送 HTTP 请求和调用 REST API

  • boto3: AWS SDK for Python,用于与 AWS 服务进行交互

2.5 工具与工具

  • pytest: 测试框架,可用于编写和运行测试用例

  • pendulum: 日期时间库,提供更直观的日期时间操作接口

这些预装库覆盖了从数据处理、机器学习到安全加密和云服务集成的常见用例,让您能够快速构建复杂的数据库内处理逻辑。

3 启用 PL/Python 支持

要使用 PL/Python,首先需要在目标数据库中启用相应的扩展。

3.1 启用 plpython3u

使用 CREATE EXTENSION 命令来启用 plpython3u。由于是不可信语言,需要超级用户权限

- 连接到目标数据库后执行
CREATE EXTENSION IF NOT EXISTS plpython3u;

3.2 启用 plpythonvec3u

-- 使用超级用户执行
CREATE EXTENSION IF NOT EXISTS plpythonvec3u;

4 基于 plpython3u 编写 PL/Python UDF

PL/Python UDF 的核心是在 SQL 函数定义中编写 Python 脚本。

4.1 基本结构

一个基本的 PL/Python UDF 创建语法如下:

CREATE OR REPLACE FUNCTION function_name (argument_list)
  RETURNS return_type
AS $$
  # Python 脚本在这里
  # 函数逻辑
  return result  # 返回结果
$$ LANGUAGE plpython3u;

4.2 数据类型映射

PL/Python 会自动在 SQL 和 Python 数据类型之间进行转换:

PostgreSQL 类型Python 类型
TEXTstr
INTEGERint
FLOAT8float
BOOLbool
ARRAY (e.g., INTEGER[])list (e.g., [int, int, ...])
JSON/JSONBdict/list (Python 对象)
NULLNone

4.3 基础示例

4.3.1 字符串处理函数

CREATE OR REPLACE FUNCTION capitalize_name(name TEXT)
  RETURNS TEXT
AS $$
  if name is None:
      return None
  return name.title()  # 将姓名转换为首字母大写
$$ LANGUAGE plpython3u;
 
-- 使用示例
SELECT capitalize_name('john doe');  -- 返回 'John Doe'

4.3.2 数值计算与数组处理

CREATE OR REPLACE FUNCTION calculate_average(arr float8[])
  RETURNS float8
AS $$
  # 计算列表平均值
  if not arr or len(arr) == 0:
      return 0.0
  return sum(arr) / len(arr)
$$ LANGUAGE plpython3u;
 
-- 使用示例
SELECT calculate_average(ARRAY[1.0, 2.5, 3.7, 4.1]);  -- 返回 2.825

4.3.3 返回数组

CREATE FUNCTION return_py_int_array()
  RETURNS int[]
AS $$
  return [1, 11, 21, 31]
$$ LANGUAGE plpython3u;
 
SELECT return_py_int_array(); -- 返回 {1,11,21,31}

5 基于 plpythonvec3u 编写 PL/Python UDF

5.1 向量化执行与 plpythonvec3u

plpythonvec3u 是一个旨在显著提升 PL/Python UDF 处理批量数据性能的扩展。

核心概念:向量化执行

  • 传统 UDF (plpython3u):对查询结果集中的每一行都会调用一次 Python 函数。这意味着如果你处理 100 万行数据,Python 解释器会被调用 100 万次,并且每次调用只处理一行数据。这种模式的开销(进程间通信、函数调用)非常大。
  • 向量化 UDF (plpythonvec3u)一次性接收一批行(例如一个数组或矩阵),在 Python 函数内部进行向量化或批处理操作。这样极大地减少了函数调用次数和进程间通信的开销,并能更好地利用 Python 科学计算库(如 NumPy、Pandas)的向量化操作优势。

plpythonvec3u 的预期优势

  1. 大幅提升性能:减少函数调用和进程切换开销,尤其适合需要处理大量行的聚合、转换和机器学习推理场景。
  2. 更好的硬件利用:能够更高效地利用现代 CPU 的 SIMD 指令和多核并行能力。
  3. 与科学计算库无缝集成:直接接收和返回 NumPy 数组或 Pandas DataFrame,便于使用这些库优化过的批量计算函数。

5.2 使用示例

-- 标准的单条处理函数
CREATE OR REPLACE FUNCTION process_text_single(text_input text)
RETURNS text
AS $$
    import re
    result = text_input.upper()
    result = re.sub(r'\s+', '_', result)
    return f"SINGLE_PROCESSED: {result}"
$$ LANGUAGE plpython3u;
 
-- 使用:每行都会单独调用一次函数
SELECT id, original_text, process_text_single(original_text) as processed
FROM (
    VALUES
    (1, 'hello world'),
    (2, 'python function'),
    (3, 'database processing')
) AS t(id, original_text);
-- 相同的函数签名,但内部会收到批量数据
CREATE OR REPLACE FUNCTION process_text_batch(text_input text)
RETURNS text
AS $$
    import re
    results = []
    for item in text_input:  # text_input 是 ['hello world', 'python function', 'database processing']
        if item is not None:
            processed = item.upper()
            processed = re.sub(r'\s+', '_', processed)
            results.append(f"BATCH_PROCESSED: {processed}")
        else:
            results.append(None)
    
    return results  # 返回结果列表
$$ LANGUAGE plpythonvec3u;
 
-- 使用方式完全相同!但内部是批量处理
SELECT id, original_text, process_text_batch(original_text) as processed
FROM (
    VALUES
    (1, 'hello world'),
    (2, 'python function'),
    (3, 'database processing')
) AS t(id, original_text);

6 内置 Python 库 UDF 使用示例

6.1 NumPy UDF 数组统计分析示例

CREATE OR REPLACE FUNCTION numpy_array_stats(arr float8[])
RETURNS JSONB
AS $$
    import numpy as np
    import json
    
    if arr is None or len(arr) == 0:
        return json.dumps({"error": "Empty array provided"})
    
    np_array = np.array(arr)
    
    stats = {
        "mean": float(np.mean(np_array)),
        "median": float(np.median(np_array)),
        "std_dev": float(np.std(np_array)),
        "variance": float(np.var(np_array)),
        "min": float(np.min(np_array)),
        "max": float(np.max(np_array)),
        "sum": float(np.sum(np_array)),
        "percentile_25": float(np.percentile(np_array, 25)),
        "percentile_75": float(np.percentile(np_array, 75))
    }
    
    return json.dumps(stats)
$$ LANGUAGE plpython3u;
 
-- 使用示例
SELECT numpy_array_stats(ARRAY[1.0, 2.5, 3.7, 4.1, 5.8, 6.2, 7.9]);

6.2 Pandas UDF 数据清洗和转换示例

CREATE OR REPLACE FUNCTION pandas_data_cleanup(data_json JSONB)
RETURNS JSONB
AS $$
    import pandas as pd
    import json
    from datetime import datetime
    
    # 将 JSON 数据转换为 DataFrame
    data_dict = json.loads(data_json)
    df = pd.DataFrame(data_dict)
    
    # 数据清洗操作
    # 1. 处理缺失值
    df.fillna({'age': df['age'].mean(), 'salary': df['salary'].median()}, inplace=True)
    
    # 2. 数据类型转换
    df['join_date'] = pd.to_datetime(df['join_date'])
    df['age'] = df['age'].astype(int)
    
    # 3. 创建新特征
    current_year = datetime.now().year
    df['years_of_service'] = current_year - df['join_date'].dt.year
    
    # 4. 数据过滤
    df = df[df['salary'] > 30000]
    
    # 5. 将 Timestamp 转换为字符串,以便 JSON 序列化
    df['join_date'] = df['join_date'].dt.strftime('%Y-%m-%d')
    
    # 6. 数据聚合
    summary = {
        'total_employees': len(df),
        'average_salary': float(df['salary'].mean()),
        'max_salary': float(df['salary'].max()),
        'department_counts': df['department'].value_counts().to_dict(),
        'cleaned_data': df.to_dict('records')
    }
    
    return json.dumps(summary)
$$ LANGUAGE plpython3u;
 
-- 使用示例
SELECT pandas_data_cleanup('[
    {"name": "John", "age": 30, "salary": 50000, "department": "IT", "join_date": "2018-05-15"},
    {"name": "Jane", "age": null, "salary": 45000, "department": "HR", "join_date": "2020-02-20"},
    {"name": "Bob", "age": 35, "salary": null, "department": "IT", "join_date": "2019-11-10"}
]'::jsonb);

6.3 Cryptography UDF 密码哈希验证示例

CREATE OR REPLACE FUNCTION hash_password(password TEXT)
RETURNS TEXT
AS $$
    from passlib.hash import bcrypt
    
    try:
        # 检查密码是否为空
        if password is None or password.strip() == '':
            return NULL
            
        # 使用 bcrypt 算法哈希密码
        hashed_password = bcrypt.hash(password)
        return hashed_password
        
    except Exception:
        # 发生异常时返回NULL
        return NULL
        
$$ LANGUAGE plpython3u;
 
CREATE OR REPLACE FUNCTION verify_password(password TEXT, hashed_password TEXT)
RETURNS BOOLEAN
AS $$
    from passlib.hash import bcrypt
    
    try:
        # 检查输入参数是否有效
        if password is None or password.strip() == '':
            return False
            
        if hashed_password is None or hashed_password.strip() == '':
            return False
            
        # 验证哈希值格式(基本检查)
        if not (hashed_password.startswith('$2b$') or
                hashed_password.startswith('$2a$') or
                hashed_password.startswith('$2y$')):
            return False
            
        # 验证密码是否匹配哈希值
        return bcrypt.verify(password, hashed_password)
        
    except Exception:
        # 发生任何异常都返回false
        return False
        
$$ LANGUAGE plpython3u;
 
-- 使用示例
SELECT hash_password('your_key_123');
SELECT verify_password('your_key_123', '$2b$12$ah/---');
 
-- 测试异常场景
SELECT verify_password('', '$2b$12$ah---'); -- 空密码
SELECT verify_password('your_key_123', ''); -- 空哈希值
SELECT verify_password('your_key_123', 'invalid_hash_format'); -- 无效哈希格式
SELECT verify_password(NULL, '$2b$12$...'); -- NULL密码
SELECT verify_password('your_key_123', NULL); -- NULL哈希值

6.4 Requests 汇率查询示例

CREATE OR REPLACE FUNCTION get_exchange_rate(base_currency TEXT, target_currency TEXT)
RETURNS JSONB
AS $$
    import requests
    import json
    
    # 使用免费的汇率API(示例)
    url = f'https://api.exchangerate-api.com/v4/latest/{base_currency}'
    
    try:
        response = requests.get(url, timeout=5)
        response.raise_for_status()
        
        rates_data = response.json()
        
        if target_currency in rates_data['rates']:
            result = {
                "base_currency": base_currency,
                "target_currency": target_currency,
                "exchange_rate": rates_data['rates'][target_currency],
                "last_updated": rates_data['date']
            }
            return json.dumps(result)
        else:
            return json.dumps({"error": f"Currency {target_currency} not found"})
            
    except requests.exceptions.RequestException as e:
        return json.dumps({"error": f"Exchange rate request failed: {str(e)}"})
$$ LANGUAGE plpython3u;
 
-- 使用示例
SELECT get_exchange_rate('USD', 'EUR');
SELECT get_exchange_rate('EUR', 'JPY');

7 安全性与生产环境注意事项

  1. 不可信语言plpython3uplpythonvec3u 都是不可信语言,功能强大但风险较高切勿将使用或创建这些函数的权限授予不受信任的用户
  2. 模块安装:UDF 中使用的 Python 库(如 numpy, pandas)只能使用内置的 Python 库,不支持手动安装第三方库。
  3. 错误处理:在 Python 代码中使用 try-except 块来妥善处理潜在异常,避免整个查询因未捕获的异常而失败。
  4. 资源管理:复杂的 Python 函数可能会消耗大量 CPU 和内存资源。监控数据库资源使用情况,确保 UDF 不会影响数据库的整体稳定性。
  5. 代码质量:确保编写的 Python 代码健壮、高效,并进行充分测试,特别是处理边界情况(如空输入、NULL 值)。