Skip to main content

在 Python 中使用 UDF

本章将帮助您完成以下步骤:安装 RisingWave UDF API、在 Python 文件中定义函数、启动 UDF 服务器、在 RisingWave 中声明和使用 UDF(User-defined Function,用户定义函数)。

开始之前

1. 安装 Python 版的 RisingWave UDF API

运行以下命令下载并安装 RisingWave UDF API 包及其依赖。

pip install risingwave
无法运行此命令?
如果返回 "command not found: pip",请[检查 pip 是否在您的环境中可用](https://packaging.python.org/en/latest/tutorials/installing-packages/#ensure-you-can-run-pip-from-the-command-line)并[确保它是最新的](https://packaging.python.org/en/latest/tutorials/installing-packages/#ensure-pip-setuptools-and-wheel-are-up-to-date)。

2. 在 Python 文件中定义您的函数

为了更好地演示这一步,我们准备了一个示例脚本供您尝试。请创建一个名为 udf.py 的 Python 文件,并插入以下脚本。

如何操作?
这里有几种创建 Python 文件的方法。
这里我们以 VS Code 为例。
  1. 打开 VS Code,通过选择顶部菜单中的 文件(File) 并点击 新建文件(New File) 来创建一个新文件。

  2. 输入 udf.py 作为文件的名称和扩展名。

  3. 将下面的脚本复制并粘贴到新创建的文件中。

  4. 保存编辑。

udf.py
# 从 risingwave.udf 模块导入组件
from risingwave.udf import udf, udtf, UdfServer
import struct
import socket

# 定义一个返回单个值的标量函数
@udf(input_types=['INT', 'INT'], result_type='INT')
def gcd(x, y):
while y != 0:
(x, y) = (y, x % y)
return x

# 定义一个返回多个值(在一个 struct 中)的标量函数
@udf(input_types=['BYTEA'], result_type='STRUCT<VARCHAR, VARCHAR, SMALLINT, SMALLINT>')
def extract_tcp_info(tcp_packet: bytes):
src_addr, dst_addr = struct.unpack('!4s4s', tcp_packet[12:20])
src_port, dst_port = struct.unpack('!HH', tcp_packet[20:24])
src_addr = socket.inet_ntoa(src_addr)
dst_addr = socket.inet_ntoa(dst_addr)
return src_addr, dst_addr, src_port, dst_port

# 定义一个表函数
@udtf(input_types='INT', result_types='INT')
def series(n):
for i in range(n):
yield i

# 启动 UDF 服务器
if __name__ == '__main__':
server = UdfServer(location="0.0.0.0:8815") # 您可以使用系统中任何可用的端口。这里我们使用端口 8815。
server.add_function(gcd)
server.add_function(extract_tcp_info)
server.add_function(series)
server.serve()
查看代码解释

脚本首先导入了 struct模块、socket 模块,以及 risingwave.udf 模块的三个组件——udfudtfUdfServer

udfudtf 是用于定义标量函数和表函数的装饰器。

代码定义了两个标量函数和一个表函数:

  • 标量函数 gcd,使用 @udf 装饰,接受两个整数输入,并返回这两个整数的最大公约数。

  • 标量函数 extract_tcp_info,使用 @udf 装饰,接受一个二进制输入,并返回一个结构化输出。

    该函数接受一个类型为 bytes 的参数 tcp_packet,使用 struct 模块从 tcp_packet 中解包源地址和目的地址以及端口号,然后使用 socket.inet_ntoa 将二进制 IP 地址转换为字符串。

    函数返回一个包含源 IP 地址、目的 IP 地址、源端口号和目的端口号的元组,所有这些都转换为相应的类型。返回类型使用 result_type 参数指定为一个包含四个字段的 struct 。

  • 表函数 series,由 @udtf 装饰,接受一个整数输入并生成从 0 到 n-1 的一系列整数。

最后,脚本使用 UdfServer 启动了一个 UDF 服务器,并在本机的 8815 端口监听传入请求。然后,它将 gcdextract_tcp_infoseries 函数添加到服务器,并使用 serve() 方法启动服务器。if __name__ == '__main__': 条件用于确保只有在直接运行脚本时才启动服务器,而不是作为模块导入时。

info

我们会持续往 udf.py 中添加新的示例函数,如 JSONB 函数。要查看最新内容,请参见源文件

一些示例函数仍在测试中,可能还没有完全实现功能或彻底优化。

3. 启动 UDF 服务器

  1. 在终端窗口中,导航到保存 udf.py 的目录。

  2. 运行以下命令来执行 udf.py

    python3 udf.py

UDF 服务器将开始运行,允许您从 RisingWave 调用已定义的 UDF。

4. 在 RisingWave 中声明您的函数

在 RisingWave 中,使用 CREATE FUNCTION 命令来声明您定义的函数。

这里是声明 第 2 步 中定义的三个 UDF 的 SQL 语句。

CREATE FUNCTION gcd(int, int) RETURNS int
LANGUAGE python AS gcd USING LINK 'http://localhost:8815'; -- 如果您正在使用 Docker 运行 RisingWave,请将地址替换为 'http://host.docker.internal:8815'。

CREATE FUNCTION extract_tcp_info(bytea)
RETURNS struct<src_ip varchar, dst_ip varchar, src_port smallint, dst_port smallint>
LANGUAGE python AS extract_tcp_info USING LINK 'http://localhost:8815'; -- 如果您正在使用 Docker 运行 RisingWave,请将地址替换为 'http://host.docker.internal:8815'。

CREATE FUNCTION series(int) RETURNS TABLE (x int)
LANGUAGE python AS series USING LINK 'http://localhost:8815'; -- 如果您正在使用 Docker 运行 RisingWave,请将地址替换为 'http://host.docker.internal:8815'。

5. 在 RisingWave 中使用您的函数

一旦在 RisingWave 中创建了 UDF,您就可以像使用任何内置函数一样在 SQL 查询中使用它们。

示例

SELECT gcd(25, 15);
---
5

SELECT extract_tcp_info(E'\\x45000034a8a8400040065b8ac0a8000ec0a80001035d20b6d971b900000000080020200493310000020405b4' :: bytea);
---
(192.168.0.14,192.168.0.1,861,8374)

SELECT * FROM series(10);
---
0
1
2
3
4
5
6
7
8
9

6. 扩展 UDF 服务器

由于 Python 解释器的 全局解释器锁(GIL) 的限制,UDF 服务器在处理请求时只能使用单个 CPU 核心。如果您发现 UDF 服务器的吞吐量不足,请考虑扩展 UDF 服务器。

info

如何确定 UDF 服务器需要扩展?

您可以使用 top 等工具监控 UDF 服务器的 CPU 使用情况。如果 CPU 使用率接近 100%,这表明 UDF 服务器的 CPU 资源不足,需要扩展。

要扩展 UDF 服务器,您可以在不同的端口上启动多个 UDF 服务器,并使用负载均衡器在这些服务器之间分配请求。

具体代码如下:

udf.py
from multiprocessing import Pool

def start_server(port: int):
"""在指定端口上启动 UDF 服务器。"""
server = UdfServer(location=f"localhost:{port}")
# 添加函数 ...
server.serve()

if __name__ == "__main__":
"""在不同的端口上启动多个服务器。"""
n = 4
with Pool(n) as p:
p.map(start_server, range(8816, 8816 + n))

然后,您可以启动一个负载均衡器,例如 Nginx。它监听端口 8815 并将请求转发到端口 8816-8819 上的 UDF 服务器。