Skip to main content

在 Java 中使用 UDF

本章将帮助您:安装 RisingWave Java UDF SDK、使用 Java 定义函数、启动 Java 进程作为 UDF 服务器、在 RisingWave 中声明和使用 UDF(User-defined Function,用户定义函数)。

开始之前

  • 请确保您的计算机上已安装 Java 开发者工具包 (JDK)(11 或更高版本)。

  • 请确保您的计算机上已安装 Apache Maven(3.0 或更高版本)。Maven 是一个构建工具,帮助管理 Java 项目和依赖关系。

1. 从模板创建 Maven 项目

RisingWave Java UDF SDK 以 Maven 构件的形式进行分发。我们准备了一个示例项目,因此您不必从零开始创建。请运行以下命令克隆模板仓库。

git clone https://github.com/risingwavelabs/risingwave-java-udf-template.git
我想从头开始

要使用 RisingWave Java UDF SDK 创建一个新项目,请遵循以下步骤:

生成一个新的 Maven 项目:

mvn archetype:generate -DgroupId=com.example -DartifactId=udf-example -DarchetypeArtifactId=maven-archetype-quickstart -DarchetypeVersion=1.4 -DinteractiveMode=false

配置您的 pom.xml 文件:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>udf-example</artifactId>
<version>1.0-SNAPSHOT</version>

<dependencies>
<dependency>
<groupId>com.risingwave.java</groupId>
<artifactId>risingwave-udf</artifactId>
<version>0.1.1</version>
</dependency>
</dependencies>
</project>

运行 Maven 单元测试时,必须添加 --add-opens 标志:

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M7</version>
<configuration>
<argLine>--add-opens=java.base/java.nio=ALL-UNNAMED</argLine>
</configuration>
</plugin>
</plugins>
</build>

2. 在 Java 中定义您的函数

标量函数

用户自定义的标量函数会将零个、一个或多个标量值映射到一个新的标量值。

为了定义一个标量函数,您必须创建一个新类,该类实现 com.risingwave.functions 中的 ScalarFunction 接口,并且实现一个名为 eval(...) 的评估方法。这个方法必须声明为公开且非静态。

我们支持的数据类型 列表中的任何数据类型都可以作为评估方法的参数或返回类型。

以下是一个计算两个整数的最大公约数 (GCD) 的标量函数示例:

import com.risingwave.functions.ScalarFunction;

public class Gcd implements ScalarFunction {
public int eval(int a, int b) {
while (b != 0) {
int temp = b;
b = a % b;
a = temp;
}
return a;
}
}
与 Flink 的区别
  • ScalarFunction 是一个接口,而不是一个抽象类。
  • 不支持多个重载的 eval 方法。
  • 不支持如 eval(Integer...) 这样的可变参数。

表函数

用户自定义的表函数会将零个、一个或多个标量值映射到一行或多行(结构化类型)。

为了定义一个表函数,您必须创建一个新类,该类实现 com.risingwave.functions 中的 TableFunction 接口,并且实现一个名为 eval(...) 的评估方法。这个方法必须声明为公开且非静态。

返回类型必须是我们支持的数据类型 列表中任意数据类型的 Iterator

类似于标量函数,输入和输出数据类型使用反射自动提取。这包括返回值的泛型参数 T,用于确定输出数据类型。

以下是一个生成一系列整数的表函数示例:

import com.risingwave.functions.TableFunction;

public class Series implements TableFunction {
public Iterator<Integer> eval(int n) {
return java.util.stream.IntStream.range(0, n).iterator();
}
}
与 Flink 的区别
  • TableFunction 是一个接口而不是一个抽象类。它没有泛型参数。
  • 不是调用 collect 来发出行,而是 eval 方法返回输出行的 Iterator
  • 不支持多个重载的 eval 方法。
  • 不支持如 eval(Integer...) 这样的可变参数。
  • 在 SQL 中,表函数可以直接在 FROM 子句中使用。不支持 JOIN LATERAL TABLE

3. 启动 UDF 服务器

请运行以下命令,创建一个 UDF 服务器,并为您定义的函数注册。

import com.risingwave.functions.UdfServer;

public class App {
public static void main(String[] args) {
try (var server = new UdfServer("0.0.0.0", 8815)) {
// 注册函数
server.addFunction("gcd", new Gcd());
server.addFunction("series", new Series());
// 启动服务器
server.start();
server.awaitTermination();
} catch (Exception e) {
e.printStackTrace();
}
}
}

再运行以下命令启动 UDF 服务器。

_JAVA_OPTIONS="--add-opens=java.base/java.nio=ALL-UNNAMED" mvn exec:java -Dexec.mainClass="com.example.App"

现在,UDF 服务器将开始运行,允许您从 RisingWave 调用定义的 UDF。

4. 在 RisingWave 中声明您的函数

在 RisingWave 中,使用 CREATE FUNCTION 命令来声明您定义的函数。

以下是用于声明在 步骤 3 中定义的两个 UDF 的 SQL 语句。

CREATE FUNCTION gcd(int, int) RETURNS int  
AS gcd
USING LINK 'http://localhost:8815';

CREATE FUNCTION series(int) RETURNS TABLE (x int)
AS series
USING LINK 'http://localhost:8815';

5. 在 RisingWave 中使用您的函数

一旦在 RisingWave 中声明了 UDF,您就可以像使用任何内置函数一样在 SQL 查询中使用它们。例如:

SELECT gcd(25, 15);
SELECT * FROM series(10);

支持的数据类型

RisingWave Java UDF SDK 支持以下数据类型:

SQL 类型Java 类型注释
BOOLEANboolean, Boolean
SMALLINTshort, Short
INTint, Integer
BIGINTlong, Long
REALfloat, Float
DOUBLE PRECISIONdouble, Double
DECIMALBigDecimal
DATEjava.time.LocalDate
TIMEjava.time.LocalTime
TIMESTAMPjava.time.LocalDateTime
INTERVALcom.risingwave.functions.PeriodDuration
VARCHARString
BYTEAbyte[]
JSONBString使用 @DataTypeHint("JSONB") String 作为类型。参见 示例
T[]T'[]T 可以是上述任何一种 SQL 类型。T' 应该是相应的 Java 类型。
STRUCT<>用户自定义的类定义一个数据类作为类型。参见 示例

示例 - JSONB

在 Java 中定义函数
import com.google.gson.Gson;

// 返回 JSON 数组的第 i 个元素。
public class JsonbAccess implements ScalarFunction {
static Gson gson = new Gson();

public @DataTypeHint("JSONB") String eval(@DataTypeHint("JSONB") String json, int index) {
if (json == null)
return null;
var array = gson.fromJson(json, Object[].class);
if (index >= array.length || index < 0)
return null;
var obj = array[index];
return gson.toJson(obj);
}
}
在 RisingWave 中创建函数
CREATE FUNCTION jsonb_access(jsonb, int) RETURNS jsonb
AS jsonb_access USING link 'http://localhost:8815';

示例 - Struct 类型

在 Java 中定义函数
// 将 socket 地址分割为主机和端口。
public static class IpPort implements ScalarFunction {
public static class SocketAddr {
public String host;
public short port;
}

public SocketAddr eval(String addr) {
var socketAddr = new SocketAddr();
var parts = addr.split(":");
socketAddr.host = parts[0];
socketAddr.port = Short.parseShort(parts[1]);
return socketAddr;
}
}
在 RisingWave 中声明函数
CREATE FUNCTION ip_port(varchar) RETURNS struct<host varchar, port smallint>
AS ip_port USING link 'http://localhost:8815';