Skip to main content

从 S3 存储桶摄取数据

使用下面的 SQL 语句将 RisingWave 连接到 Amazon S3 Source。RisingWave 同时支持 CSV 和 ndjson 文件格式。

句法

CREATE SOURCE [ IF NOT EXISTS ] source_name 
schema_definition
WITH (
connector={ 's3' | 's3_v2' },
connector_parameter='value', ...
)
FORMAT data_format ENCODE data_encode (
without_header = 'true' | 'false',
delimiter = 'delimiter'
);
info

对于 CSV 数据,请在 ENCODE properties 中的 delimiter 选项中指定分隔符。

schema_definition:

(
column_name data_type [ PRIMARY KEY ], ...
[ PRIMARY KEY ( column_name, ... ) ]
)

参数

字段注释
connector必填。在 s3s3_v2(推荐)连接器之间进行选择。了解有关 s3_v2 的更多信息
s3.region_name必填。服务区域。
s3.bucket_name必填。存储数据源的存储桶名称。
s3.credentials.access必填。此字段表示 AWS 的访问密钥 ID。
s3.credentials.secret必填。此字段表示 AWS 的秘密访问密钥。
match_pattern有条件。此字段用于查找 s3.bucket_name 中与给定模式匹配的对象键。支持标准 Unix 风格 glob 句法。
s3.endpoint_url有条件。与 S3 兼容的对象存储服务器的主机 URL。这允许用户使用与标准 S3 服务器不同的服务器。
note

CSV 文件中的空单元格会被解析为 NULL

字段注释
data_format支持的数据格式:PLAIN
data_encode支持的数据编码:CSVJSON
without_header第一行是否为标题。接受的值:'true''false'。默认值:'true'
delimiterRisingWave 如何分割内容。对于 JSON 编码,分隔符是 /n

s3_v2 连接器

测试版功能

s3_v2 连接器目前处于测试阶段。如果您遇到任何问题或有任何反馈,请联系我们。

s3 连接器将文件视为 Split,导致在处理大量文件时可扩展性差,并且可能超时。

s3_v2 连接器旨在通过实施更高效的列表和获取机制来解决 s3 连接器的可扩展性和性能限制。若想了解这种新方法的技术细节,请参阅 设计文档

示例

以下是将 RisingWave 连接到 S3 Source 以从单个流中读取数据的示例。

CREATE TABLE s(
id int,
name varchar,
age int,
primary key(id)
)
WITH (
connector = 's3_v2',
s3.region_name = 'ap-southeast-2',
s3.bucket_name = 'example-s3-source',
s3.credentials.access = 'xxxxx',
s3.credentials.secret = 'xxxxx'
) FORMAT PLAIN ENCODE CSV (
without_header = 'true',
delimiter = ','
);

重要考虑事项

S3 存储桶中的对象过滤

RisingWave 有一个用于过滤 S3 存储桶中对象的前缀参数。它依赖于 Apache Opendal,其前缀过滤器实现预计将很快发布。

源文件名作为列

创建包含源文件名的列的功能目前正在开发中。您可以在此处跟踪进度。

处理存储桶中的新文件

RisingWave 会自动摄取添加到存储桶中的新文件。但是,如果同时删除文件并添加同名的新文件,则其无法检测到文件的更新。此外,RisingWave 还会忽略文件的删除。

从 Source 读取数据

您需要从 Source 创建物化视图或使用 S3 连接器创建表来读取数据。下面是一些示例:

-- 从 Source 创建物化视图
CREATE SOURCE s3_source WITH ( connector = 's3_v2', ... );
CREATE MATERIALIZED VIEW mv AS SELECT * FROM s3_source;

-- 使用 S3 连接器创建表
CREATE TABLE s3_table ( ... ) WITH ( connector = 's3_v2', ... );

处理意外文件类型或格式不佳的文件

无论文件类型如何,RisingWave 都会尝试根据指定规则将文件解释和解析为 CSV 或 ndjson 格式。对于无法解析的文件部分,会报告警告信息,但 Source 的部分不会解析失败。文件中格式不佳的部分将被丢弃。