Skip to main content

从 RisingWave 导出数据到 Apache Iceberg

本文介绍了如何使用 RisingWave 中的 Iceberg Sink 连接器将数据导出到 Apache Iceberg。Apache Iceberg 是一种旨在支持大型表的格式。更多信息,请参见 Apache Iceberg

测试版功能

RisingWave 中的 Iceberg Sink 连接器目前处于测试版。如果您遇到任何问题或有反馈,请与我们联系。

开始之前

  • 确保您已经创建了目标 Iceberg 表。 有关创建表和设置 Iceberg 的更多指导,请参见此 快速上手指南

  • 确保您已创建可以从中导出数据的上游物化视图或 Source。

句法

CREATE SINK [ IF NOT EXISTS ] sink_name
[FROM sink_from | AS select_query]
WITH (
connector='iceberg',
connector_parameter = 'value', ...
);

参数

参数名称描述
type必填。允许的值:append-onlyupsert
force_append_only可选。如果为 true,即使无法做到,也强制 Sink 为 append-only
s3.endpoint可选。S3 的端点。
s3.region可选。S3 存储桶所在的区域。必须指定 s3.endpoints3.region 其中之一。
s3.access.key必填。与 S3 兼容的对象存储的访问密钥。
s3.secret.key必填。与 S3 兼容的对象存储的密钥。
database.name必填。目标 Iceberg 表所属的数据库。
table.name必填。目标 Iceberg 表的名称。
catalog.type可选。此表使用的目录类型。目前支持的值为 storagerest。如果未指定,则使用 storage。详情见 目录
warehouse.path有条件必填。Iceberg 仓库的路径。目前仅支持与 S3 兼容的对象存储系统,如 AWS S3 和 MinIO。如果 catalog.typestorage,则此项为必填项。
catalog.uri有条件必填。目录的 URL。当 catalog.typerest 时,此项为必填项。
primary_keyUpsert Sink 的主键。仅适用于 Upsert 模式。

数据类型映射

RisingWave 将根据以下数据类型映射表与 Iceberg 转换数据类型:

RisingWave 类型Iceberg 类型
booleanboolean
intinteger
bigintlong
realfloat
doubledouble
varcharstring
datedate
timestamptztimestamptz
timestamptimestamp

目录

Iceberg 支持两种类型的目录:

  • 存储目录:存储目录在底层文件系统(如 Hadoop 或 S3)中存储所有元数据。目前,我们仅支持 S3 作为底层文件系统。
  • REST 目录:RisingWave 支持 REST 目录,它充当 Hive、JDBC 和 Nessie 目录等其他目录的代理。这是与 Iceberg 表一起使用 RisingWave 的推荐方法。

Iceberg 表格式

目前,RisingWave 仅支持 v2 格式的 Iceberg 表。

示例

本节包括一些示例,如果您想快速尝试将数据导出到 Iceberg,可以使用这些示例。

创建 Iceberg 表(如果还没有的话)

例如,以下 spark-sql 命令在 AWS S3 中创建了一个名为 table 的 Iceberg 表,位于 dev 数据库下。表位于名为 my-iceberg-bucket 的 S3 存储桶中,区域为 ap-southeast-1,路径为 path/to/warehouse。表具有 format-version=2 属性,因此支持 upsert 选项。应存在一个名为 s3://my-iceberg-bucket/path/to/warehouse/dev/table/metadata 的文件夹。

请注意,仅支持与 S3 兼容的对象存储,如 AWS S3 或 MinIO。

spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.3.1,org.apache.hadoop:hadoop-aws:3.3.2\
--conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.demo.type=hadoop \
--conf spark.sql.catalog.demo.warehouse=s3a://my-iceberg-bucket/path/to/warehouse \
--conf spark.sql.catalog.demo.hadoop.fs.s3a.endpoint=https://s3.ap-southeast-1.amazonaws.com \
--conf spark.sql.catalog.demo.hadoop.fs.s3a.path.style.access=true \
--conf spark.sql.catalog.demo.hadoop.fs.s3a.access.key=${ACCESS_KEY} \
--conf spark.sql.catalog.demo.hadoop.fs.s3a.secret.key=${SECRET_KEY} \
--conf spark.sql.defaultCatalog=demo \
--e "drop table if exists demo.dev.`table`;

CREATE TABLE demo.dev.`table`
(
seq_id bigint,
user_id bigint,
user_name string
) TBLPROPERTIES ('format-version'='2')";

创建上游物化视图或 Source

以下查询创建了一个 Append-only 的 Source。有关创建 Source 的更多详情,请参见 CREATE SOURCE

CREATE SOURCE s1_source (
seq_id bigint,
user_id bigint,
user_name varchar)
WITH (
connector = 'datagen',
fields.seq_id.kind = 'sequence',
fields.seq_id.start = '1',
fields.seq_id.end = '10000000',
fields.user_id.kind = 'random',
fields.user_id.min = '1',
fields.user_id.max = '10000000',
fields.user_name.kind = 'random',
fields.user_name.length = '10',
datagen.rows.per.second = '20000'
) FORMAT PLAIN ENCODE JSON;

另一个选项是创建一个支持原地更新的 Upsert 表。有关创建表的更多详情,请参见 CREATE TABLE

CREATE TABLE s1_table (
seq_id bigint,
user_id bigint,
user_name varchar)
WITH (
connector = 'datagen',
fields.seq_id.kind = 'sequence',
fields.seq_id.start = '1',
fields.seq_id.end = '10000000',
fields.user_id.kind = 'random',
fields.user_id.min = '1',
fields.user_id.max = '10000000',
fields.user_name.kind = 'random',
fields.user_name.length = '10',
datagen.rows.per.second = '20000'
) FORMAT PLAIN ENCODE JSON;

从 Append-only Source 创建 Append-only Sink

如果您有一个 Append-only 的 Source 并希望创建一个 Append-only 的 Sink,请在 CREATE SINK SQL 查询中设置 type = append-only

CREATE SINK s1_sink FROM s1_source
WITH (
connector = 'iceberg',
type = 'append-only',
warehouse.path = 's3a://my-iceberg-bucket/path/to/warehouse',
s3.endpoint = 'https://s3.ap-southeast-1.amazonaws.com',
s3.access.key = '${ACCESS_KEY}',
s3.secret.key = '${SECRET_KEY}',
database.name='dev',
table.name='table'
);

从 Upsert Source 创建 Append-only Sink

如果您有一个 Upsert Source 并希望创建一个仅追加的 Sink,请设置 type = append-onlyforce_append_only = true。这将忽略上游的删除消息,并将上游更新消息转换为插入消息。

CREATE SINK s1_sink FROM s1_table
WITH (
connector = 'iceberg',
type = 'append-only',
force_append_only = 'true',
warehouse.path = 's3a://my-iceberg-bucket/path/to/warehouse',
s3.endpoint = 'https://s3.ap-southeast-1.amazonaws.com',
s3.access.key = '${ACCESS_KEY}',
s3.secret.key = '${SECRET_KEY}',
database.name='dev',
table.name='table'
);

从 Upsert Source 创建 Upsert Sink

在 RisingWave 中,您可以直接将数据作为 upserts 导入到 Iceberg 表中。

CREATE SINK s1_sink FROM s1_table
WITH (
connector = 'iceberg',
warehouse.path = 's3a://my-iceberg-bucket/path/to/warehouse',
s3.endpoint = 'https://s3.ap-southeast-1.amazonaws.com',
s3.access.key = '${ACCESS_KEY}',
s3.secret.key = '${SECRET_KEY}',
database.name='dev',
table.name='table',
primary_key='seq_id'
);