从 S3 存储桶摄取数据
使用下面的 SQL 语句将 RisingWave 连接到 Amazon S3 Source。RisingWave 同时支持 CSV 和 ndjson 文件格式。
句法
CREATE SOURCE [ IF NOT EXISTS ] source_name
schema_definition
WITH (
connector={ 's3' | 's3_v2' },
connector_parameter='value', ...
)
FORMAT data_format ENCODE data_encode (
without_header = 'true' | 'false',
delimiter = 'delimiter'
);
info
对于 CSV 数据,请在 ENCODE properties
中的 delimiter
选项中指定分隔符。
schema_definition:
(
column_name data_type [ PRIMARY KEY ], ...
[ PRIMARY KEY ( column_name, ... ) ]
)
参数
字段 | 注释 |
---|---|
connector | 必填。在 s3 和 s3_v2 (推荐)连接器之间进行选择。了解有关 s3_v2 的更多信息。 |
s3.region_name | 必填。服务区域。 |
s3.bucket_name | 必填。存储数据源的存储桶名称。 |
s3.credentials.access | 必填。此字段表示 AWS 的访问密钥 ID。 |
s3.credentials.secret | 必填。此字段表示 AWS 的秘密访问密钥。 |
match_pattern | 有条件。此字段用于查找 s3.bucket_name 中与给定模式匹配的对象键。支持标准 Unix 风格 glob 句法。 |
s3.endpoint_url | 有条件。与 S3 兼容的对象存储服务器的主机 URL。这允许用户使用与标准 S3 服务器不同的服务器。 |
note
CSV 文件中的空单元格会被解析为 NULL
。
字段 | 注释 |
---|---|
data_format | 支持的数据格式:PLAIN 。 |
data_encode | 支持的数据编码:CSV 、JSON 。 |
without_header | 第一行是否为标题。接受的值:'true' 、'false' 。默认值:'true' 。 |
delimiter | RisingWave 如何分割内容。对于 JSON 编码,分隔符是 /n 。 |
s3_v2
连接器
测试版功能
s3_v2
连接器目前处于测试阶段。如果您遇到任何问题或有任何反馈,请联系我们。
s3
连接器将文件视为 Split,导致在处理大量文件时可扩展性差,并且可能超时。
s3_v2
连接器旨在通过实施更高效的列表和获取机制来解决 s3
连接器的可扩展性和性能限制。若想了解这种新方法的技术细节,请参阅 设计文档。
示例
以下是将 RisingWave 连接到 S3 Source 以从单个流中读取数据的示例。
- CSV
- JSON
CREATE TABLE s(
id int,
name varchar,
age int,
primary key(id)
)
WITH (
connector = 's3_v2',
s3.region_name = 'ap-southeast-2',
s3.bucket_name = 'example-s3-source',
s3.credentials.access = 'xxxxx',
s3.credentials.secret = 'xxxxx'
) FORMAT PLAIN ENCODE CSV (
without_header = 'true',
delimiter = ','
);
CREATE TABLE s3(
id int,
name TEXT,
age int,
mark int,
)
WITH (
connector = 's3_v2',
match_pattern = '%Ring%*.ndjson',
s3.region_name = 'ap-southeast-2',
s3.bucket_name = 'example-s3-source',
s3.credentials.access = 'xxxxx',
s3.credentials.secret = 'xxxxx',
s3.endpoint_url = 'https://s3.us-east-1.amazonaws.com'
) FORMAT PLAIN ENCODE JSON;
重要考虑事项
S3 存储桶中的对象过滤
RisingWave 有一个用于过滤 S3 存储桶中对象的前缀参数。它依赖于 Apache Opendal,其前缀过滤器实现预计将很快发布。
源文件名作为列
创建包含源文件名的列的功能目前正在开发中。您可以在此处跟踪进度。
处理存储桶中的新文件
RisingWave 会自动摄取添加到存储桶中的新文件。但是,如果同时删除文件并添加同名的新文件,则其无法检测到文件的更新。此外,RisingWave 还会忽略文件的删除。
从 Source 读取数据
您需要从 Source 创建物化视图或使用 S3 连接器创建表来读取数据。下面是一些示例:
-- 从 Source 创建物化视图
CREATE SOURCE s3_source WITH ( connector = 's3_v2', ... );
CREATE MATERIALIZED VIEW mv AS SELECT * FROM s3_source;
-- 使用 S3 连接器创建表
CREATE TABLE s3_table ( ... ) WITH ( connector = 's3_v2', ... );
处理意外文件类型或格式不佳的文件
无论文件类型如何,RisingWave 都会尝试根据指定规则将文件解释和解析为 CSV 或 ndjson 格式。对于无法解析的文件部分,会报告警告信息,但 Source 的部分不会解析失败。文件中格式不佳的部分将被丢弃。