在 Java 中使用 UDF
本章将帮助您:安装 RisingWave Java UDF SDK、使用 Java 定义函数、启动 Java 进程作为 UDF 服务器、在 RisingWave 中声明和使用 UDF(User-defined Function,用户定义函数)。
开始之前
请确保您的计算机上已安装 Java 开发者工具包 (JDK)(11 或更高版本)。
请确保您的计算机上已安装 Apache Maven(3.0 或更高版本)。Maven 是一个构建工具,帮助管理 Java 项目和依赖关系。
1. 从模板创建 Maven 项目
RisingWave Java UDF SDK 以 Maven 构件的形式进行分发。我们准备了一个示例项目,因此您不必从零开始创建。请运行以下命令克隆模板仓库。
git clone https://github.com/risingwavelabs/risingwave-java-udf-template.git
我想从头开始
要使用 RisingWave Java UDF SDK 创建一个新项目,请遵循以下步骤:
生成一个新的 Maven 项目:
mvn archetype:generate -DgroupId=com.example -DartifactId=udf-example -DarchetypeArtifactId=maven-archetype-quickstart -DarchetypeVersion=1.4 -DinteractiveMode=false
配置您的 pom.xml
文件:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>udf-example</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.risingwave.java</groupId>
<artifactId>risingwave-udf</artifactId>
<version>0.1.1</version>
</dependency>
</dependencies>
</project>
运行 Maven 单元测试时,必须添加 --add-opens
标志:
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.0.0-M7</version>
<configuration>
<argLine>--add-opens=java.base/java.nio=ALL-UNNAMED</argLine>
</configuration>
</plugin>
</plugins>
</build>
2. 在 Java 中定义您的函数
标量函数
用户自定义的标量函数会将零个、一个或多个标量值映射到一个新的标量值。
为了定义一个标量函数,您必须创建一个新类,该类实现 com.risingwave.functions
中的 ScalarFunction
接口,并且实现一个名为 eval(...)
的评估方法。这个方法必须声明为公开且非静态。
我们支持的数据类型 列表中的任何数据类型都可以作为评估方法的参数或返回类型。
以下是一个计算两个整数的最大公约数 (GCD) 的标量函数示例:
import com.risingwave.functions.ScalarFunction;
public class Gcd implements ScalarFunction {
public int eval(int a, int b) {
while (b != 0) {
int temp = b;
b = a % b;
a = temp;
}
return a;
}
}
ScalarFunction
是一个接口,而不是一个抽象类。- 不支持多个重载的
eval
方法。 - 不支持如
eval(Integer...)
这样的可变参数。
表函数
用户自定义的表函数会将零个、一个或多个标量值映射到一行或多行(结构化类型)。
为了定义一个表函数,您必须创建一个新类,该类实现 com.risingwave.functions
中的 TableFunction
接口,并且实现一个名为 eval(...)
的评估方法。这个方法必须声明为公开且非静态。
返回类型必须是我们支持的数据类型 列表中任意数据类型的 Iterator
。
类似于标量函数,输入和输出数据类型使用反射自动提取。这包括返回值的泛型参数 T,用于确定输出数据类型。
以下是一个生成一系列整数的表函数示例:
import com.risingwave.functions.TableFunction;
public class Series implements TableFunction {
public Iterator<Integer> eval(int n) {
return java.util.stream.IntStream.range(0, n).iterator();
}
}
TableFunction
是一个接口而不是一个抽象类。它没有泛型参数。- 不是调用
collect
来发出行,而是eval
方法返回输出行的Iterator
。 - 不支持多个重载的
eval
方法。 - 不支持如
eval(Integer...)
这样的可变参数。 - 在 SQL 中,表函数可以直接在
FROM
子句中使用。不支持JOIN LATERAL TABLE
。
3. 启动 UDF 服务器
请运行以下命令,创建一个 UDF 服务器,并为您定义的函数注册。
import com.risingwave.functions.UdfServer;
public class App {
public static void main(String[] args) {
try (var server = new UdfServer("0.0.0.0", 8815)) {
// 注册函数
server.addFunction("gcd", new Gcd());
server.addFunction("series", new Series());
// 启动服务器
server.start();
server.awaitTermination();
} catch (Exception e) {
e.printStackTrace();
}
}
}
再运行以下命令启动 UDF 服务器。
_JAVA_OPTIONS="--add-opens=java.base/java.nio=ALL-UNNAMED" mvn exec:java -Dexec.mainClass="com.example.App"
现在,UDF 服务器将开始运行,允许您从 RisingWave 调用定义的 UDF。
4. 在 RisingWave 中声明您的函数
在 RisingWave 中,使用 CREATE FUNCTION
命令来声明您定义的函数。
以下是用于声明在 步骤 3 中定义的两个 UDF 的 SQL 语句。
CREATE FUNCTION gcd(int, int) RETURNS int
AS gcd
USING LINK 'http://localhost:8815';
CREATE FUNCTION series(int) RETURNS TABLE (x int)
AS series
USING LINK 'http://localhost:8815';
5. 在 RisingWave 中使用您的函数
一旦在 RisingWave 中声明了 UDF,您就可以像使用任何内置函数一样在 SQL 查询中使用它们。例如:
SELECT gcd(25, 15);
SELECT * FROM series(10);
支持的数据类型
RisingWave Java UDF SDK 支持以下数据类型:
SQL 类型 | Java 类型 | 注释 |
---|---|---|
BOOLEAN | boolean, Boolean | |
SMALLINT | short, Short | |
INT | int, Integer | |
BIGINT | long, Long | |
REAL | float, Float | |
DOUBLE PRECISION | double, Double | |
DECIMAL | BigDecimal | |
DATE | java.time.LocalDate | |
TIME | java.time.LocalTime | |
TIMESTAMP | java.time.LocalDateTime | |
INTERVAL | com.risingwave.functions.PeriodDuration | |
VARCHAR | String | |
BYTEA | byte[] | |
JSONB | String | 使用 @DataTypeHint("JSONB") String 作为类型。参见 示例。 |
T[] | T'[] | T 可以是上述任何一种 SQL 类型。T' 应该是相应的 Java 类型。 |
STRUCT<> | 用户自定义的类 | 定义一个数据类作为类型。参见 示例。 |
示例 - JSONB
import com.google.gson.Gson;
// 返回 JSON 数组的第 i 个元素。
public class JsonbAccess implements ScalarFunction {
static Gson gson = new Gson();
public @DataTypeHint("JSONB") String eval(@DataTypeHint("JSONB") String json, int index) {
if (json == null)
return null;
var array = gson.fromJson(json, Object[].class);
if (index >= array.length || index < 0)
return null;
var obj = array[index];
return gson.toJson(obj);
}
}
CREATE FUNCTION jsonb_access(jsonb, int) RETURNS jsonb
AS jsonb_access USING link 'http://localhost:8815';
示例 - Struct 类型
// 将 socket 地址分割为主机和端口。
public static class IpPort implements ScalarFunction {
public static class SocketAddr {
public String host;
public short port;
}
public SocketAddr eval(String addr) {
var socketAddr = new SocketAddr();
var parts = addr.split(":");
socketAddr.host = parts[0];
socketAddr.port = Short.parseShort(parts[1]);
return socketAddr;
}
}
CREATE FUNCTION ip_port(varchar) RETURNS struct<host varchar, port smallint>
AS ip_port USING link 'http://localhost:8815';