Skip to main content

使用 UDF 查询外部数据

本章提供了一个示例,帮助您通过 RisingWave 查询存储在不同数据库中的数据。在这个示例中,我们使用 Python 查询一个 Postgres 数据库,其大致步骤也适用于其他语言和数据库。

要通过 UDF(User-defined Function,用户定义函数)查询外部数据,您需要为每个希望查询的表创建一个 UDF。

性能注意事项

查询外部数据效率不高,因为 RisingWave 将调用一个 UDF 服务器,该服务器本身需要与外部数据库建立连接。如果性能是您比较重视的因素,建议:

  1. 如果您的数据变化频繁,可以使用 CDC 从这个数据库中流式传输变化。

  2. 如果您的数据不经常变化,可以将整个数据库表加载到 RisingWave 中。

开始之前

1. UDF 设置

运行以下命令下载并安装 RisingWave UDF API 包及其依赖项。

pip install risingwave
无法运行此命令?
如果返回“command not found: pip”,请检查 pip 是否在您的环境中可用 请确保它是最新版本

创建一个包含 UDF 的 python 文件

如何创建?
这里有几种创建 Python 文件的方法。
这里我们以 VS Code 为例。
  1. 打开 VS Code,通过顶部菜单选择 文件,然后点击 新建文件

  2. 输入 udf.py 作为文件的名称和扩展名。

  3. 将下面的脚本复制并粘贴到新创建的文件中。

  4. 保存编辑。

这里我们以 Vim 文本编辑器为例。
  1. 打开一个终端窗口。

  2. 运行 vim udf.py 来创建并在 Vim 中打开该文件。

  3. I 键进入 Vim 的插入模式。

  4. 将下面的脚本复制并粘贴到编辑器中。

  5. Esc 键退出插入模式。

  6. 输入 :wq 保存文件并退出 Vim。

udf.py
from risingwave.udf import udf, udtf, UdfServer
import psycopg2

@udtf(input_types=['VARCHAR'], result_types=['INT', 'INT', 'VARCHAR'])
def select_people_table_pg(query):
connection = None
people_records = []
try:
connection = psycopg2.connect(user="postgres", # 在生产环境中建议使用只读用户
password="mysecretpassword", # 请不要在生产环境中硬编码您的密码!
host="127.0.0.1",
port="5432",
database="people")

cursor = connection.cursor()
postgres_select_query = query
cursor.execute(postgres_select_query)
print("Selecting rows from people table using cursor.fetchall")
people_records = cursor.fetchall()

# UDF 服务器将显示错误
except (Exception, psycopg2.Error) as error:
print("从 PostgreSQL 获取数据时出错", error)

finally:
# 关闭数据库连接
if connection is not None:
cursor.close()
connection.close()
print("PostgreSQL 连接已关闭")

# 返回所有结果元组
for row in people_records:
yield row


# 启动 UDF 服务器
if __name__ == '__main__':
server = UdfServer(location="0.0.0.0:8815")
server.add_function(select_people_table_pg)
server.serve()
查看代码解释

我们使用 udtf 装饰器来声明一个一次返回多个元组的 UDF。我们将查询字符串作为参数 query 传递给 select_people_table_pg。该查询在运行在 127.0.0.1:5432 的 postgres 服务器上执行。

创建 udf.py 后,启动 UDF 服务器。

  1. 在终端窗口中,导航到保存 udf.py 的目录。

  2. 运行以下命令以执行 udf.py

    python3 udf.py

2. Postgres 设置

为了让这个过程更有趣,我们将创建一些我们的 UDF 可以查询的数据。为此,我们将使用 docker 运行一个本地 Postgres 数据库。

  1. 在终端运行以下命令:

    docker run --rm -it -p 5432:5432 -e POSTGRES_PASSWORD=mysecretpassword postgres
  2. 在另一个终端窗口中连接到运行中的 postgres 服务器。

    psql "port=5432 host=localhost user=postgres sslmode=disable" 
  3. 提供密码 mysecretpassword

  4. 通过运行以下命令创建我们的示例数据。

    -- 创建数据库
    CREATE DATABASE people;

    -- 连接到数据库
    \c people;

    -- 创建表
    CREATE TABLE people (
    id SERIAL PRIMARY KEY,
    age INT,
    name VARCHAR(50)
    );

    -- 插入 10 行虚拟数据
    INSERT INTO people (age, name) VALUES
    (30, 'Neo'),
    (29, 'Trinity'),
    (44, 'Morpheus'),
    (NULL, 'Agent Smith'), -- 假设 AI 程序的年龄为 NULL
    (35, 'Cypher'),
    (28, 'Tank'),
    (30, 'Dozer'),
    (27, 'Switch'),
    (22, 'Mouse'),
    (NULL, 'The Oracle');

    -- 验证插入的数据
    SELECT * FROM people;
  5. 通过输入 exit 退出与 postgres 的连接。

3. 在 RisingWave 中声明您的函数

在 RisingWave 中,使用 CREATE FUNCTION 命令声明您定义的函数。

  1. 请连接本地 RisingWave 服务器,并输入:
CREATE FUNCTION select_people_table_pg(VARCHAR) RETURNS TABLE (id int, age int, name varchar) LANGUAGE python AS select_people_table_pg USING LINK 'http://localhost:8815'; -- 如果您使用 Docker 运行 RisingWave,请将地址替换为 'http://host.docker.internal:8815'。

4. 通过 RisingWave 查询 Potsgres 中的外部数据

一旦在 RisingWave 中创建了 UDF,您就可以在 SQL 查询中使用它们来查询 people 表中的外部数据,就像它们存储在 RisingWave 中一样。

select * from select_people_table_pg('SELECT * FROM people WHERE age > 25;'::VARCHAR);
---返回结果
id | age | name
----+-----+----------
1 | 30 | Neo
2 | 29 | Trinity
3 | 44 | Morpheus
5 | 35 | Cypher
6 | 28 | Tank
7 | 30 | Dozer
8 | 27 | Switch
(7 rows)

-- 下面的查询等同于上面的这个
-- 但是 WHERE 子句是在 RisingWave 中应用的,而不是直接在 postgres 中过滤
-- 因此性能比上面的差
select * from select_people_table_pg('SELECT * FROM people;'::VARCHAR) WHERE age > 25;

请注意,如果在评估 UDF 时发生错误,结果将为空。错误消息可以在 Compute 节点的日志和 UDF 服务器中找到,例如:

select * from select_people_table_pg('an invalid query;'::VARCHAR);
---
id | age | name
----+-----+----------
(0 rows)