CREATE SOURCE
source 是 RisingWave 可以从中读取数据的资源。您可以使用 CREATE SOURCE
命令在 RisingWave 中创建 Source。
如果选择在 RisingWave 中保存 source 的数据,请使用带有连接器设置的 CREATE TABLE
命令。有关详细信息,请参阅 CREATE TABLE。
无论数据是否保存在 RisingWave 中,您都可以创建物化视图以进行分析或数据转换。
句法
CREATE SOURCE [ IF NOT EXISTS ] source_name (
col_name data_type [ AS generation_expression ],
...
[ watermark_clause ]
)
[ WITH (
connector='connector_name',
connector_parameter='value', ...)]
[FORMAT data_format ENCODE data_encode [ (
message='message',
schema.location='location', ...) ]
];
注释
生成列是用非确定性函数定义的。摄取数据时,该函数会生成该字段的值。
名称和未引用的标识符不区分大小写。这些字段必须添加双引号才能区分大小写。
要了解数据何时加载到 RisingWave,请在创建表或 source 时定义基于处理时间生成的列 (<column_name> timestamptz AS proctime()
)。
参数
参数 | 描述 |
---|---|
source_name | source 的名称。如果提供 schema 的名称(例如,CREATE SOURCE <schema>.<source> ... ),则表将在指定 schema 中创建。否则,它将在当前 schema 中创建。 |
col_name | 列的名称。 |
data_type | 列的数据类型。使用 struct 数据类型,可以创建嵌套表。嵌套表中的元素需要用尖括号("<>")括起来。 |
generation_expression | 生成列的表达式。有关生成列的详细信息,请参阅 生成列。 |
watermark_clause | 用于定义时间戳列的水位线。句法为 WATERMARK FOR column_name as expr 。有关水位线的详细信息,请参阅 水位线。 |
WITH 子句 | 如果尝试存储所有源数据,请在此处设置连接器。有关支持的 source 的完整列表,以及详细说明每个 source 句法的连接器页面链接,请参阅 支持的 source。 |
FORMAT 和 ENCODE 选项 | 指定源数据的数据格式和编码格式。要了解支持的数据格式,请参阅 支持的格式。 |
支持的 source
单击连接器名称,查看将 RisingWave 连接到该连接器的 SQL 句法、选项和示例语句。
::note
若要摄取 “T” 格式的数据,则需要创建(带连接器设置的)表。否则,您既可以选择创建 source,也可选择创建(带连接器设置的)表。
:::
连接器 | 版本 | 格式 |
---|---|---|
Kafka | 3.1.0 或更高版本 | Avro, JSON, protobuf, Debezium JSON (T), Debezium AVRO (T), DEBEZIUM_MONGO_JSON (T), Maxwell JSON (T), Canal JSON (T), Upsert JSON, Upsert AVRO, Bytes |
Redpanda | 最新版 | Avro, JSON, protobuf |
Pulsar | 2.8.0 或更高版本 | Avro, JSON, protobuf, Debezium JSON (T), Maxwell JSON (T), Canal JSON (T) |
Astra Streaming | 最新版 | Avro, JSON, protobuf |
Kinesis | 最新版 | Avro, JSON, protobuf, Debezium JSON (T), Maxwell JSON (T), Canal JSON (T) |
PostgreSQL CDC | 10, 11, 12, 13, 14 | Debezium JSON (T) |
MySQL CDC | 5.7, 8.0 | Debezium JSON (T) |
CDC via Kafka | Debezium JSON (T), Maxwell JSON (T), Canal JSON (T) | |
Amazon S3 | 最新版 | JSON, CSV |
Load generator | 内置 | JSON |
Google Pub/Sub | Avro, JSON, protobuf, Debezium JSON (T), Maxwell JSON (T), Canal JSON (T) | |
Google Cloud Storage | JSON |
创建 source 时,RisingWave 不会立即摄取数据。RisingWave 会在基于 source 创建物化视图后开始处理数据。
水位线
RisingWave 支持在创建 source 时生成水位线。水位线类似于跟踪事件时间进度的标记或信号,允许您在相应的时间窗口内处理事件。 WATERMARK
子句应该在 schema_definition
中使用。有关创建水位线的更多信息,请参阅 水位线。
变更数据捕获(CDC)
变更数据捕获(CDC)是指识别和捕获数据库中的数据更改,然后将更改实时传递给下游服务的过程。
RisingWave 提供原生的 MySQL 和 PostgreSQL CDC 连接器。使用这些 CDC 连接器可以直接摄取这些数据库的 CDC 数据,无需设置其他服务,如 Kafka。
如果 Kafka 是您技术栈的一部分,您还可以使用 RisingWave 中的 Kafka 连接器,将 CDC 数据以 Kafka topic 的形式从数据库摄取到 RisingWave。您需要使用 CDC 工具,例如 MySQL 的 Debezium 连接器 或 Maxwell's daemon,将 CDC 数据转换为 Kafka topic。
有关使用这两种方法从 MySQL 和 PostgreSQL 摄取数据的完整步骤,请参阅 从 MySQL 摄取数据 和 从 PostgreSQL 摄取数据。
支持的格式
创建 source 时,需在 CREATE SOURCE
或 CREATE TABLE
语句的 FORMAT
和 ENCODE
部分指定数据和编码格式。
Avro
对于 Avro 格式的数据,必须指定消息和 schema 文件的位置。schema 文件的位置可以是实际的 Web 位置,格式为 http://...
、https://...
或 S3://...
。对于 Avro 格式的 Kafka 数据,您可以提供 Confluent Schema Registry(而不是 schema 文件位置),RisingWave 可以从中获取 schema。有关在 Kafka 数据中使用 Schema Registry 的更多详细信息,请参阅 从 Schema Registry 读取 schema。
schema.registry
可以接受多个地址。RisingWave 将向所有 URL 发送请求,并返回第一个成功的结果。
如果设置了 schema.registry
,还可以定义 schema.registry.name.strategy
。可接受的选项包括 topic_name_strategy
、record_name_strategy
和 topic_record_name_strategy
。如果使用了 record_name_strategy
或 topic_record_name_strategy
,还必须定义 key.message
字段。有关名称策略的更多详细信息,请参阅 主题名称策略。
schema.registry.name.strategy
目前处于测试阶段。如果您遇到任何问题或有任何反馈,请联系我们。
请注意,RisingWave 中显示的时间戳可能与上游系统中的时间戳不同,因为 Avro 序列化过程中会丢失时区信息。
对于 Avro 数据,不能在 CREATE SOURCE
或 CREATE TABLE
语句的 schema_definition
部分指定 schema。
句法:
FORMAT PLAIN
ENCODE AVRO (
message = 'main_message',
schema.location = 'location' | schema.registry = 'schema_registry_url [, ...]',
[schema.registry.name.strategy = 'topic_name_strategy'],
[key.message = 'test_key']
)
Debezium AVRO
使用 Debezium AVRO 创建 source 时,不需要在 CREATE TABLE
语句中定义 source 的 schema,因为可以从 SCHEMA REGISTRY
中推断。这意味着必须指定 schema 文件的位置。schema 文件的位置可以是实际的 Web 位置(格式为 http://...
、https://...
或 S3://...
),也可以是 Confluent Schema Registry。有关在 Kafka 数据中使用 Schema Registry 的更多详细信息,请参阅 从 Schema Registry 读取 schema。
schema.registry
可以接受多个地址。RisingWave 将向所有 URL 发送请求,并返回第一个成功的结果。
如果设置了 schema.registry
,还可以定义 schema.registry.name.strategy
。可接受的选项包括 topic_name_strategy
、record_name_strategy
和 topic_record_name_strategy
。如果使用了 record_name_strategy
或 topic_record_name_strategy
,还必须定义 key.message
字段。有关名称策略的更多详细信息,请参阅 主题名称策略。
schema.registry.name.strategy
目前处于测试阶段。如果您遇到任何问题或有任何反馈,请联系我们。
句法:
FORMAT DEBEZIUM
ENCODE AVRO (
message = 'main_message',
schema.location = 'location' | schema.registry = 'schema_registry_url [, ...]',
[schema.registry.name.strategy = 'topic_name_strategy'],
[key.message = 'test_key']
)
Upsert AVRO
消费 Kafka topic 中 AVRO 格式的数据时,需要将 FORMAT
和 ENCODE
部分分别指定为 UPSERT
和 AVRO
。RisingWave 会知道源消息包含作为主列的关键字段以及 Kafka 消息值字段。如果消息的值字段不为空,若消息键不为空且已在数据库表中,则将更新行;若消息键不为空但不在数据库表中,则将插入行。如果值字段为空,则将删除行。
schema.registry
可以接受多个地址。RisingWave 将向所有 URL 发送请求,并返回第一个成功的结果。
如果设置了 schema.registry
,还可以定义 schema.registry.name.strategy
。可接受的选项包括 topic_name_strategy
、record_name_strategy
和 topic_record_name_strategy
。如果使用了record_name_strategy
或 topic_record_name_strategy
,还必须定义 key.message
字段。有关名称策略的更多详情,请参阅 主题名称策略。
schema.registry.name.strategy
目前处于测试阶段。如果您遇到任何问题或有任何反馈,请联系我们。
句法:
FORMAT UPSERT
ENCODE AVRO (
message = 'main_message',
schema.location = 'location' | schema.registry = 'schema_registry_url [, ...]',
[schema.registry.name.strategy = 'topic_name_strategy'],
[key.message = 'test_key']
)
JSON
RisingWave 可直接从外部 source 解码 JSON。从流式 JSON 数据创建 source 时,可以在 source 名称后的括号中定义 source 的 schema,或指定一个 schema.registry
。在 FORMAT
和 ENCODE
部分指定数据和编码格式。您可以直接引用 JSON 有效负载中的数据字段,将其名称作为 schema 中的列名。
schema.registry
可以接受多个地址。RisingWave 将向所有 URL 发送请求,并返回第一个成功的结果。
句法:
FORMAT PLAIN
ENCODE JSON [ (
schema.registry = 'schema_registry_url [, ...]',
[schema.registry.username = 'username'],
[schema.registry.password = 'password']
) ]
Canal JSON
RisingWave 支持 Canal CDC 格式的 TiCDC。从 TiCDC 中创建流的 source 时,可以在 source 名称(句法中为 schema_definition
)后的括号中定义 source 的 schema,并在 FORMAT
和 ENCODE
部分指定数据和编码格式。您可以直接引用 JSON 有效负载中的数据字段,将其名称作为 schema 中的列名。
句法:
FORMAT CANAL
ENCODE JSON
Debezium JSON
从 Debezium JSON 中的流创建 source 时,可以在 source 名称(句法中为 schema_definition
)后的括号中定义 source 的 schema,并在 FORMAT
和 ENCODE
部分中指定数据和编码格式。您可以直接引用 JSON 有效负载中的数据字段,将其名称作为 schema 中的列名。
请注意,如果要在 RisingWave 中摄取类型为 timestamp
或 timestamptz
的数据,上游值必须在 [1973-03-03 09:46:40, 5138-11-16 09:46:40] (UTC)
的范围内。该值在解析和摄取时可能会出现错误,但不显示警告信息。
句法:
FORMAT DEBEZIUM
ENCODE JSON
Debezium Mongo JSON
从 MongoDB Kafka topic 加载 Debezium Mongo JSON 格式的数据时,源表 schema 有一些限制。表 schema 必须包含 _id
和 payload
列。其中 _id
来自 MongoDB 文档的 id
,是主键,payload
是 jsonb
类型,包含文档的其余部分。如果文档的 _id
类型是 ObjectID
,那么在 RisingWave 中创建列时,应指定 _id
类型为 varchar
。如果文档的 _id
类型为 int32
或 int64
,则在 RisingWave 中指定 _id
类型为 int
或 bigint
。
句法:
FORMAT DEBEZIUM_MONGO
ENCODE JSON
Maxwell JSON
从 Maxwell JSON 中的数据流创建 source 时,您可以在 source 名称(句法中为 schema_definition
)后的括号内定义 source 的 schema,并在 FORMAT
和 ENCODE
部分指定数据和编码格式。您可以直接引用 JSON 有效负载中的数据字段,将其名称作为 schema 中的列名。
句法:
FORMAT MAXWELL
ENCODE JSON
Upsert JSON
从 Kafka topic 消费 JSON 格式的数据时,需要将 FORMAT
和 ENCODE
部分分别指定为 UPSERT
和 JSON
。RisingWave 会知道源消息包含作为主列的关键字段以及 Kafka 消息值字段。如果消息的值字段不为空,若消息键不为空且已在数据库表中,则将更新行;若消息键不为空但不在数据库表中,则将插入行。如果值字段为空,则将删除行。
您可以在 source 名称后面的括号中定义 source 的 schema,也可以指定一个 schema.registry
。schema.registry
可以接受多个地址。RisingWave 将向所有 URL 发送请求,并返回第一个成功的结果。
句法:
FORMAT UPSERT
ENCODE JSON [ (
schema.registry = 'schema_registry_url [, ...]',
[schema.registry.username = 'username'],
[schema.registry.password = 'password']
) ]
Protobuf
对于 protobuf 格式的数据,必须指定消息和 schema 位置。schema 位置可以是 http://...
、https://...
或 S3://...
格式的实际网络位置。对于 protobuf 中的 Kafka 数据,可以提供 Confluent Schema Registry (而不是 schema 文件位置),RisingWave 可以从中获取 schema。有关在 Kafka 数据中使用 Schema Registry 的更多详细信息,请参阅 从 Schema Registry 读取 schema。
schema.registry
可以接受多个地址。RisingWave 将向所有 URL 发送请求,并返回第一个成功的结果。
如果设置了 schema.registry
,还可以定义 schema.registry.name.strategy
。可接受的选项包括 topic_name_strategy
、record_name_strategy
和 topic_record_name_strategy
。如果使用了 record_name_strategy
或 topic_record_name_strategy
,还必须定义 key.message
字段。有关名称策略的更多详细信息,请参阅 主题名称策略。
schema.registry.name.strategy
目前处于测试阶段。如果您遇到任何问题或有任何反馈,请联系我们。
对于 protobuf 数据,不能在 CREATE SOURCE
或 CREATE TABLE
语句的 schema_definition
部分中指定 schema。
如果提供了文件位置,则 schema 文件必须是 FileDescriptorSet
文件,可以使用以下命令从 .proto
文件编译:
protoc -I=$include_path --include_imports --descriptor_set_out=schema.pb schema.proto
句法:
FORMAT PLAIN
ENCODE PROTOBUF (
message = 'main_message',
schema.location = 'location' | schema.registry = 'schema_registry_url [, ...]',
[schema.registry.name.strategy = 'topic_name_strategy'],
[key.message = 'test_key']
)
有关支持的 protobuf 类型的更多信息,请参阅 支持的 protobuf 类型.
Bytes
RisingWave 允许使用 BYTES
行格式读取数据流,无需解码数据。但表或 source 只能有一个 BYTEA
数据字段。
FORMAT PLAIN
ENCODE BYTES
另请参阅
DROP SOURCE
— 删除 source.SHOW CREATE SOURCE
— 显示用于创建 source 的 SQL 语句。