从 RisingWave 导出数据到 Apache Iceberg
本文介绍了如何使用 RisingWave 中的 Iceberg Sink 连接器将数据导出到 Apache Iceberg。Apache Iceberg 是一种旨在支持大型表的格式。更多信息,请参见 Apache Iceberg。
RisingWave 中的 Iceberg Sink 连接器目前处于测试版。如果您遇到任何问题或有反馈,请与我们联系。
开始之前
确保您已经创建了目标 Iceberg 表。 有关创建表和设置 Iceberg 的更多指导,请参见此 快速上手指南。
确保您已创建可以从中导出数据的上游物化视图或 Source。
句法
CREATE SINK [ IF NOT EXISTS ] sink_name
[FROM sink_from | AS select_query]
WITH (
connector='iceberg',
connector_parameter = 'value', ...
);
参数
参数名称 | 描述 |
---|---|
type | 必填。允许的值:append-only 和 upsert 。 |
force_append_only | 可选。如果为 true ,即使无法做到,也强制 Sink 为 append-only 。 |
s3.endpoint | 可选。S3 的端点。 |
s3.region | 可选。S3 存储桶所在的区域。必须指定 s3.endpoint 或 s3.region 其中之一。 |
s3.access.key | 必填。与 S3 兼容的对象存储的访问密钥。 |
s3.secret.key | 必填。与 S3 兼容的对象存储的密钥。 |
database.name | 必填。目标 Iceberg 表所属的数据库。 |
table.name | 必填。目标 Iceberg 表的名称。 |
catalog.type | 可选。此表使用的目录类型。目前支持的值为 storage 和 rest 。如果未指定,则使用 storage 。详情见 目录。 |
warehouse.path | 有条件必填。Iceberg 仓库的路径。目前仅支持与 S3 兼容的对象存储系统,如 AWS S3 和 MinIO。如果 catalog.type 为 storage ,则此项为必填项。 |
catalog.uri | 有条件必填。目录的 URL。当 catalog.type 为 rest 时,此项为必填项。 |
primary_key | Upsert Sink 的主键。仅适用于 Upsert 模式。 |
数据类型映射
RisingWave 将根据以下数据类型映射表与 Iceberg 转换数据类型:
RisingWave 类型 | Iceberg 类型 |
---|---|
boolean | boolean |
int | integer |
bigint | long |
real | float |
double | double |
varchar | string |
date | date |
timestamptz | timestamptz |
timestamp | timestamp |
目录
Iceberg 支持两种类型的目录:
- 存储目录:存储目录在底层文件系统(如 Hadoop 或 S3)中存储所有元数据。目前,我们仅支持 S3 作为底层文件系统。
- REST 目录:RisingWave 支持 REST 目录,它充当 Hive、JDBC 和 Nessie 目录等其他目录的代理。这是与 Iceberg 表一起使用 RisingWave 的推荐方法。
Iceberg 表格式
目前,RisingWave 仅支持 v2 格式的 Iceberg 表。
示例
本节包括一些示例,如果您想快速尝试将数据导出到 Iceberg,可以使用这些示例。
创建 Iceberg 表(如果还没有的话)
例如,以下 spark-sql
命令在 AWS S3 中创建了一个名为 table
的 Iceberg 表,位于 dev
数据库下。表位于名为 my-iceberg-bucket
的 S3 存储桶中,区域为 ap-southeast-1
,路径为 path/to/warehouse
。表具有 format-version=2
属性,因此支持 upsert 选项。应存在一个名为 s3://my-iceberg-bucket/path/to/warehouse/dev/table/metadata
的文件夹。
请注意,仅支持与 S3 兼容的对象存储,如 AWS S3 或 MinIO。
spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.3.1,org.apache.hadoop:hadoop-aws:3.3.2\
--conf spark.sql.catalog.demo=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.demo.type=hadoop \
--conf spark.sql.catalog.demo.warehouse=s3a://my-iceberg-bucket/path/to/warehouse \
--conf spark.sql.catalog.demo.hadoop.fs.s3a.endpoint=https://s3.ap-southeast-1.amazonaws.com \
--conf spark.sql.catalog.demo.hadoop.fs.s3a.path.style.access=true \
--conf spark.sql.catalog.demo.hadoop.fs.s3a.access.key=${ACCESS_KEY} \
--conf spark.sql.catalog.demo.hadoop.fs.s3a.secret.key=${SECRET_KEY} \
--conf spark.sql.defaultCatalog=demo \
--e "drop table if exists demo.dev.`table`;
CREATE TABLE demo.dev.`table`
(
seq_id bigint,
user_id bigint,
user_name string
) TBLPROPERTIES ('format-version'='2')";
创建上游物化视图或 Source
以下查询创建了一个 Append-only 的 Source。有关创建 Source 的更多详情,请参见 CREATE SOURCE
。
CREATE SOURCE s1_source (
seq_id bigint,
user_id bigint,
user_name varchar)
WITH (
connector = 'datagen',
fields.seq_id.kind = 'sequence',
fields.seq_id.start = '1',
fields.seq_id.end = '10000000',
fields.user_id.kind = 'random',
fields.user_id.min = '1',
fields.user_id.max = '10000000',
fields.user_name.kind = 'random',
fields.user_name.length = '10',
datagen.rows.per.second = '20000'
) FORMAT PLAIN ENCODE JSON;
另一个选项是创建一个支持原地更新的 Upsert 表。有关创建表的更多详情,请参见 CREATE TABLE
。
CREATE TABLE s1_table (
seq_id bigint,
user_id bigint,
user_name varchar)
WITH (
connector = 'datagen',
fields.seq_id.kind = 'sequence',
fields.seq_id.start = '1',
fields.seq_id.end = '10000000',
fields.user_id.kind = 'random',
fields.user_id.min = '1',
fields.user_id.max = '10000000',
fields.user_name.kind = 'random',
fields.user_name.length = '10',
datagen.rows.per.second = '20000'
) FORMAT PLAIN ENCODE JSON;
从 Append-only Source 创建 Append-only Sink
如果您有一个 Append-only 的 Source 并希望创建一个 Append-only 的 Sink,请在 CREATE SINK
SQL 查询中设置 type = append-only
。
CREATE SINK s1_sink FROM s1_source
WITH (
connector = 'iceberg',
type = 'append-only',
warehouse.path = 's3a://my-iceberg-bucket/path/to/warehouse',
s3.endpoint = 'https://s3.ap-southeast-1.amazonaws.com',
s3.access.key = '${ACCESS_KEY}',
s3.secret.key = '${SECRET_KEY}',
database.name='dev',
table.name='table'
);
从 Upsert Source 创建 Append-only Sink
如果您有一个 Upsert Source 并希望创建一个仅追加的 Sink,请设置 type = append-only
和 force_append_only = true
。这将忽略上游的删除消息,并将上游更新消息转换为插入消息。
CREATE SINK s1_sink FROM s1_table
WITH (
connector = 'iceberg',
type = 'append-only',
force_append_only = 'true',
warehouse.path = 's3a://my-iceberg-bucket/path/to/warehouse',
s3.endpoint = 'https://s3.ap-southeast-1.amazonaws.com',
s3.access.key = '${ACCESS_KEY}',
s3.secret.key = '${SECRET_KEY}',
database.name='dev',
table.name='table'
);
从 Upsert Source 创建 Upsert Sink
在 RisingWave 中,您可以直接将数据作为 upserts 导入到 Iceberg 表中。
CREATE SINK s1_sink FROM s1_table
WITH (
connector = 'iceberg',
warehouse.path = 's3a://my-iceberg-bucket/path/to/warehouse',
s3.endpoint = 'https://s3.ap-southeast-1.amazonaws.com',
s3.access.key = '${ACCESS_KEY}',
s3.secret.key = '${SECRET_KEY}',
database.name='dev',
table.name='table',
primary_key='seq_id'
);