Skip to main content

从 RisingWave 导出数据到 Delta Lake

本指南介绍如何将数据从 RisingWave 导出到 Delta Lake。Delta Lake 是一个开源存储框架,旨在允许您使用其他计算引擎构建 Lakehouse 架构。有关更多信息,请参见 Delta Lake

开始之前

  • 确保您已经有一个可以导出的 Delta Lake 表。有关创建表和设置 Delta Lake 的更多指南,请参阅此 快速上手指南

  • 确保您有一个可以从中导出数据的上游物化视图或 Source。

句法

CREATE SINK [ IF NOT EXISTS ] sink_name
[FROM sink_from | AS select_query]
WITH (
connector='deltalake',
connector_parameter = 'value', ...
);

参数

参数名称描述
type必填。目前仅支持 append-only(追加写入)。
location必填。Delta Lake 表读取数据的文件路径,在创建 Delta Lake 表时指定。
s3.endpoint必填。S3 的端点。
s3.access.key必填。与 S3 兼容的对象存储的访问密钥。
s3.secret.key必填。与 S3 兼容的对象存储的密钥。

示例

以下是如何将数据从 RisingWave 导出到 Delta Lake 的逐步示例。

创建 Delta Lake 表

spark-sql Shell 中,创建一个 Delta 表。有关更多信息,请参见 Delta Lake 快速入门指南

例如,以下 spark-sql 命令在 AWS S3 中创建一个 Delta Lake 表。该表位于名为 my-delta-lake-bucket 的 S3 存储桶中,区域为 ap-southeast-1,路径为 path/to/table。在运行以下命令创建 Delta Lake 表之前,请创建一个空的目录 path/to/table。表位置的完整 URL 是 s3://my-delta-lake-bucket/path/to/table

请注意,仅支持与 S3 兼容的对象存储,例如 AWS S3 或 MinIO。

spark-sql --packages io.delta:delta-core_2.12:2.2.0,org.apache.hadoop:hadoop-aws:3.3.2\
--conf 'spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog' \
--conf 'spark.hadoop.fs.s3a.access.key=${ACCESS_KEY}' \
--conf 'spark.hadoop.fs.s3a.secret.key=${SECRET_KEY}' \
--conf 'spark.hadoop.fs.s3a.endpoint=https://s3.ap-southeast-1.amazonaws.com' \
--conf 'spark.hadoop.fs.s3a.path.style.access=true' \
--e "create table delta.\`s3a://my-delta-lake-bucket/path/to/table\`(id int, name string) using delta"

创建上游物化视图或 Source

以下查询使用内置的负载生成器创建 Source,该生成器会生成模拟数据。有关更多详细信息,请参阅 CREATE SOURCE生成测试数据。如果需要,您可以使用额外的 SQL 查询来转换数据。

CREATE SOURCE s1_source (id int, name varchar)
WITH (
connector = 'datagen',
fields.id.kind = 'sequence',
fields.id.start = '1',
fields.id.end = '10000',
fields.name.kind = 'random',
fields.name.length = '10',
datagen.rows.per.second = '200'
) FORMAT PLAIN ENCODE JSON;

您也可以选择创建一个支持就地更新的 Upsert 表。有关创建表的更多详细信息,请参阅 CREATE TABLE

CREATE TABLE s1_table (id int, name varchar)
WITH (
connector = 'datagen',
fields.id.kind = 'sequence',
fields.id.start = '1',
fields.id.end = '10000',
fields.name.kind = 'random',
fields.name.length = '10',
datagen.rows.per.second = '200'
) FORMAT UPSERT ENCODE JSON;

创建 Sink

从 Append-only Source 创建 Append-only Sink

如果您有一个 Append-only Source 并想创建一个 Append-only Sink,请在 CREATE SINK 查询中设置 type = append-only

CREATE SINK s1_sink FROM s1_source
WITH (
connector = 'deltalake',
type = 'append-only',
location = 's3a://my-delta-lake-bucket/path/to/table',
s3.endpoint = 'https://s3.ap-southeast-1.amazonaws.com',
s3.access.key = '${ACCESS_KEY}',
s3.secret.key = '${SECRET_KEY}'
);

从 Upsert 表创建 Append-only Sink

如果您有一个非 Append-only 的表或 Source,并想创建一个 Append-only Sink,请在 CREATE SINK 查询中设置 type = append-only 以及 force_append_only = true

CREATE SINK s1_sink FROM s1_table
WITH (
connector = 'deltalake',
type = 'append-only',
force_append_only = 'true',
location = 's3a://my-delta-lake-bucket/path/to/table',
s3.endpoint = 'https://s3.ap-southeast-1.amazonaws.com',
s3.access.key = '${ACCESS_KEY}',
s3.secret.key = '${SECRET_KEY}'
);

查询 Delta Lake 中的数据

要确保数据被刷新到 Sink,请在 RisingWave 中使用 FLUSH 命令。

以下查询使用 spark-sql 能检查已导出到 Delta Lake 表的记录总数。

spark-sql --packages io.delta:delta-core_2.12:2.2.0,org.apache.hadoop:hadoop-aws:3.3.2\
--conf 'spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog' \
--conf 'spark.hadoop.fs.s3a.access.key=${ACCESS_KEY}' \
--conf 'spark.hadoop.fs.s3a.secret.key=${SECRET_KEY}' \
--conf 'spark.hadoop.fs.s3a.endpoint=https://s3.ap-southeast-1.amazonaws.com' \
--conf 'spark.hadoop.fs.s3a.path.style.access=true' \
--e "select count(*) from delta.\`s3a://my-delta-lake-bucket/path/to/table\`"