使用 PL/Python 编写 UDF
1 PL/Python 扩展概述
PL/Python 是一种可加载的过程语言,它允许你使用 Python 编写数据库的用户定义函数(UDF)或存储过程。这意味着你可以在数据库中直接利用 Python 生态系统的强大功能进行复杂的数据处理、分析和机器学习推理,无需将数据移出数据库,从而提升开发效率并减少数据移动的开销。
核心价值:
-
扩展性:突破 SQL 内置函数的限制,实现任意复杂逻辑。
-
灵活性:利用 Python 庞大的生态系统(如 NumPy, Pandas, Scikit-learn, Requests 等)。
-
便利性:数据处理逻辑离数据更近,减少数据移动,提升开发效率。
在 ProtonBase 中,支持的 PL/Python 扩展包括:
plpython3u
:用于支持 Python 3 的不可信(untrusted)语言扩展。u
代表 "untrusted",意味着它具有更少的安全沙盒限制,功能强大但需谨慎使用。plpythonvec3u
:用于 Python 3 的、支持向量化执行的不可信语言扩展。它旨在显著提升处理批量数据行的性能。
注:使用 PL/Python 需要首先绑定 UDF Server,并保持 UDF Server 处于运行状态。
2 内置 Python 库支持
为了提升开发便利性和性能,系统已预装了以下常用的 Python 库,您可以在 PL/Python UDF 中直接导入使用,无需额外安装:
2.1 数据库连接与操作
- psycopg2: PostgreSQL 适配器,用于在 Python 中连接和操作 PostgreSQL数据库
- SQLAlchemy: Python SQL 工具包和对象关系映射器,提供高效的数据访问模式
2.2 数据处理与分析
-
numpy: 基础科学计算库,提供高性能的多维数组对象和数学函数
-
pandas: 强大的数据分析和操作工具,提供 DataFrame 数据结构
-
SciPy: 基于 numpy 的科学计算库,提供数学、科学和工程计算功能
-
Scikit-learn: 机器学习库,提供各种分类、回归和聚类算法
2.3 安全与加密
-
pycryptodome: 加密工具包,提供各种加密算法实现
-
cryptography: 密码学库,提供加密配方和基元
-
passlib: 密码哈希库,支持多种密码哈希方案
-
pyOpenSSL: OpenSSL 绑定,提供 SSL/TLS 功能
-
base58: Base58 编码/解码库,常用于加密货币地址编码
2.4 网络与云服务
-
Requests: HTTP 库,用于发送 HTTP 请求和调用 REST API
-
boto3: AWS SDK for Python,用于与 AWS 服务进行交互
2.5 工具与工具
-
pytest: 测试框架,可用于编写和运行测试用例
-
pendulum: 日期时间库,提供更直观的日期时间操作接口
这些预装库覆盖了从数据处理、机器学习到安全加密和云服务集成的常见用例,让您能够快速构建复杂的数据库内处理逻辑。
3 启用 PL/Python 支持
要使用 PL/Python,首先需要在目标数据库中启用相应的扩展。
3.1 启用 plpython3u
使用 CREATE EXTENSION
命令来启用 plpython3u
。由于是不可信语言,需要超级用户权限。
- 连接到目标数据库后执行
CREATE EXTENSION IF NOT EXISTS plpython3u;
3.2 启用 plpythonvec3u
-- 使用超级用户执行
CREATE EXTENSION IF NOT EXISTS plpythonvec3u;
4 基于 plpython3u 编写 PL/Python UDF
PL/Python UDF 的核心是在 SQL 函数定义中编写 Python 脚本。
4.1 基本结构
一个基本的 PL/Python UDF 创建语法如下:
CREATE OR REPLACE FUNCTION function_name (argument_list)
RETURNS return_type
AS $$
# Python 脚本在这里
# 函数逻辑
return result # 返回结果
$$ LANGUAGE plpython3u;
4.2 数据类型映射
PL/Python 会自动在 SQL 和 Python 数据类型之间进行转换:
PostgreSQL 类型 | Python 类型 |
---|---|
TEXT | str |
INTEGER | int |
FLOAT8 | float |
BOOL | bool |
ARRAY (e.g., INTEGER[] ) | list (e.g., [int, int, ...] ) |
JSON /JSONB | dict /list (Python 对象) |
NULL | None |
4.3 基础示例
4.3.1 字符串处理函数
CREATE OR REPLACE FUNCTION capitalize_name(name TEXT)
RETURNS TEXT
AS $$
if name is None:
return None
return name.title() # 将姓名转换为首字母大写
$$ LANGUAGE plpython3u;
-- 使用示例
SELECT capitalize_name('john doe'); -- 返回 'John Doe'
4.3.2 数值计算与数组处理
CREATE OR REPLACE FUNCTION calculate_average(arr float8[])
RETURNS float8
AS $$
# 计算列表平均值
if not arr or len(arr) == 0:
return 0.0
return sum(arr) / len(arr)
$$ LANGUAGE plpython3u;
-- 使用示例
SELECT calculate_average(ARRAY[1.0, 2.5, 3.7, 4.1]); -- 返回 2.825
4.3.3 返回数组
CREATE FUNCTION return_py_int_array()
RETURNS int[]
AS $$
return [1, 11, 21, 31]
$$ LANGUAGE plpython3u;
SELECT return_py_int_array(); -- 返回 {1,11,21,31}
5 基于 plpythonvec3u 编写 PL/Python UDF
5.1 向量化执行与 plpythonvec3u
plpythonvec3u
是一个旨在显著提升 PL/Python UDF 处理批量数据性能的扩展。
核心概念:向量化执行
- 传统 UDF (plpython3u):对查询结果集中的每一行都会调用一次 Python 函数。这意味着如果你处理 100 万行数据,Python 解释器会被调用 100 万次,并且每次调用只处理一行数据。这种模式的开销(进程间通信、函数调用)非常大。
- 向量化 UDF (plpythonvec3u):一次性接收一批行(例如一个数组或矩阵),在 Python 函数内部进行向量化或批处理操作。这样极大地减少了函数调用次数和进程间通信的开销,并能更好地利用 Python 科学计算库(如 NumPy、Pandas)的向量化操作优势。
plpythonvec3u 的预期优势
- 大幅提升性能:减少函数调用和进程切换开销,尤其适合需要处理大量行的聚合、转换和机器学习推理场景。
- 更好的硬件利用:能够更高效地利用现代 CPU 的 SIMD 指令和多核并行能力。
- 与科学计算库无缝集成:直接接收和返回 NumPy 数组或 Pandas DataFrame,便于使用这些库优化过的批量计算函数。
5.2 使用示例
-- 标准的单条处理函数
CREATE OR REPLACE FUNCTION process_text_single(text_input text)
RETURNS text
AS $$
import re
result = text_input.upper()
result = re.sub(r'\s+', '_', result)
return f"SINGLE_PROCESSED: {result}"
$$ LANGUAGE plpython3u;
-- 使用:每行都会单独调用一次函数
SELECT id, original_text, process_text_single(original_text) as processed
FROM (
VALUES
(1, 'hello world'),
(2, 'python function'),
(3, 'database processing')
) AS t(id, original_text);
-- 相同的函数签名,但内部会收到批量数据
CREATE OR REPLACE FUNCTION process_text_batch(text_input text)
RETURNS text
AS $$
import re
results = []
for item in text_input: # text_input 是 ['hello world', 'python function', 'database processing']
if item is not None:
processed = item.upper()
processed = re.sub(r'\s+', '_', processed)
results.append(f"BATCH_PROCESSED: {processed}")
else:
results.append(None)
return results # 返回结果列表
$$ LANGUAGE plpythonvec3u;
-- 使用方式完全相同!但内部是批量处理
SELECT id, original_text, process_text_batch(original_text) as processed
FROM (
VALUES
(1, 'hello world'),
(2, 'python function'),
(3, 'database processing')
) AS t(id, original_text);
6 内置 Python 库 UDF 使用示例
6.1 NumPy UDF 数组统计分析示例
CREATE OR REPLACE FUNCTION numpy_array_stats(arr float8[])
RETURNS JSONB
AS $$
import numpy as np
import json
if arr is None or len(arr) == 0:
return json.dumps({"error": "Empty array provided"})
np_array = np.array(arr)
stats = {
"mean": float(np.mean(np_array)),
"median": float(np.median(np_array)),
"std_dev": float(np.std(np_array)),
"variance": float(np.var(np_array)),
"min": float(np.min(np_array)),
"max": float(np.max(np_array)),
"sum": float(np.sum(np_array)),
"percentile_25": float(np.percentile(np_array, 25)),
"percentile_75": float(np.percentile(np_array, 75))
}
return json.dumps(stats)
$$ LANGUAGE plpython3u;
-- 使用示例
SELECT numpy_array_stats(ARRAY[1.0, 2.5, 3.7, 4.1, 5.8, 6.2, 7.9]);
6.2 Pandas UDF 数据清洗和转换示例
CREATE OR REPLACE FUNCTION pandas_data_cleanup(data_json JSONB)
RETURNS JSONB
AS $$
import pandas as pd
import json
from datetime import datetime
# 将 JSON 数据转换为 DataFrame
data_dict = json.loads(data_json)
df = pd.DataFrame(data_dict)
# 数据清洗操作
# 1. 处理缺失值
df.fillna({'age': df['age'].mean(), 'salary': df['salary'].median()}, inplace=True)
# 2. 数据类型转换
df['join_date'] = pd.to_datetime(df['join_date'])
df['age'] = df['age'].astype(int)
# 3. 创建新特征
current_year = datetime.now().year
df['years_of_service'] = current_year - df['join_date'].dt.year
# 4. 数据过滤
df = df[df['salary'] > 30000]
# 5. 将 Timestamp 转换为字符串,以便 JSON 序列化
df['join_date'] = df['join_date'].dt.strftime('%Y-%m-%d')
# 6. 数据聚合
summary = {
'total_employees': len(df),
'average_salary': float(df['salary'].mean()),
'max_salary': float(df['salary'].max()),
'department_counts': df['department'].value_counts().to_dict(),
'cleaned_data': df.to_dict('records')
}
return json.dumps(summary)
$$ LANGUAGE plpython3u;
-- 使用示例
SELECT pandas_data_cleanup('[
{"name": "John", "age": 30, "salary": 50000, "department": "IT", "join_date": "2018-05-15"},
{"name": "Jane", "age": null, "salary": 45000, "department": "HR", "join_date": "2020-02-20"},
{"name": "Bob", "age": 35, "salary": null, "department": "IT", "join_date": "2019-11-10"}
]'::jsonb);
6.3 Cryptography UDF 密码哈希验证示例
CREATE OR REPLACE FUNCTION hash_password(password TEXT)
RETURNS TEXT
AS $$
from passlib.hash import bcrypt
try:
# 检查密码是否为空
if password is None or password.strip() == '':
return NULL
# 使用 bcrypt 算法哈希密码
hashed_password = bcrypt.hash(password)
return hashed_password
except Exception:
# 发生异常时返回NULL
return NULL
$$ LANGUAGE plpython3u;
CREATE OR REPLACE FUNCTION verify_password(password TEXT, hashed_password TEXT)
RETURNS BOOLEAN
AS $$
from passlib.hash import bcrypt
try:
# 检查输入参数是否有效
if password is None or password.strip() == '':
return False
if hashed_password is None or hashed_password.strip() == '':
return False
# 验证哈希值格式(基本检查)
if not (hashed_password.startswith('$2b$') or
hashed_password.startswith('$2a$') or
hashed_password.startswith('$2y$')):
return False
# 验证密码是否匹配哈希值
return bcrypt.verify(password, hashed_password)
except Exception:
# 发生任何异常都返回false
return False
$$ LANGUAGE plpython3u;
-- 使用示例
SELECT hash_password('your_key_123');
SELECT verify_password('your_key_123', '$2b$12$ah/---');
-- 测试异常场景
SELECT verify_password('', '$2b$12$ah---'); -- 空密码
SELECT verify_password('your_key_123', ''); -- 空哈希值
SELECT verify_password('your_key_123', 'invalid_hash_format'); -- 无效哈希格式
SELECT verify_password(NULL, '$2b$12$...'); -- NULL密码
SELECT verify_password('your_key_123', NULL); -- NULL哈希值
6.4 Requests 汇率查询示例
CREATE OR REPLACE FUNCTION get_exchange_rate(base_currency TEXT, target_currency TEXT)
RETURNS JSONB
AS $$
import requests
import json
# 使用免费的汇率API(示例)
url = f'https://api.exchangerate-api.com/v4/latest/{base_currency}'
try:
response = requests.get(url, timeout=5)
response.raise_for_status()
rates_data = response.json()
if target_currency in rates_data['rates']:
result = {
"base_currency": base_currency,
"target_currency": target_currency,
"exchange_rate": rates_data['rates'][target_currency],
"last_updated": rates_data['date']
}
return json.dumps(result)
else:
return json.dumps({"error": f"Currency {target_currency} not found"})
except requests.exceptions.RequestException as e:
return json.dumps({"error": f"Exchange rate request failed: {str(e)}"})
$$ LANGUAGE plpython3u;
-- 使用示例
SELECT get_exchange_rate('USD', 'EUR');
SELECT get_exchange_rate('EUR', 'JPY');
7 安全性与生产环境注意事项
- 不可信语言:
plpython3u
和plpythonvec3u
都是不可信语言,功能强大但风险较高。切勿将使用或创建这些函数的权限授予不受信任的用户。 - 模块安装:UDF 中使用的 Python 库(如
numpy
,pandas
)只能使用内置的 Python 库,不支持手动安装第三方库。 - 错误处理:在 Python 代码中使用
try-except
块来妥善处理潜在异常,避免整个查询因未捕获的异常而失败。 - 资源管理:复杂的 Python 函数可能会消耗大量 CPU 和内存资源。监控数据库资源使用情况,确保 UDF 不会影响数据库的整体稳定性。
- 代码质量:确保编写的 Python 代码健壮、高效,并进行充分测试,特别是处理边界情况(如空输入、NULL 值)。