Skip to main content

CREATE SOURCE

source 是 RisingWave 可以从中读取数据的资源。您可以使用 CREATE SOURCE 命令在 RisingWave 中创建 Source。 如果选择在 RisingWave 中保存 source 的数据,请使用带有连接器设置的 CREATE TABLE 命令。有关详细信息,请参阅 CREATE TABLE

无论数据是否保存在 RisingWave 中,您都可以创建物化视图以进行分析或数据转换。

句法

CREATE SOURCE [ IF NOT EXISTS ] source_name (
col_name data_type [ AS generation_expression ],
...
[ watermark_clause ]
)
[ WITH (
connector='connector_name',
connector_parameter='value', ...)]
[FORMAT data_format ENCODE data_encode [ (
message='message',
schema.location='location', ...) ]
];

注释

生成列是用非确定性函数定义的。摄取数据时,该函数会生成该字段的值。

名称和未引用的标识符不区分大小写。这些字段必须添加双引号才能区分大小写。

要了解数据何时加载到 RisingWave,请在创建表或 source 时定义基于处理时间生成的列 (<column_name> timestamptz AS proctime())。

参数

参数描述
source_namesource 的名称。如果提供 schema 的名称(例如,CREATE SOURCE <schema>.<source> ...),则表将在指定 schema 中创建。否则,它将在当前 schema 中创建。
col_name列的名称。
data_type列的数据类型。使用 struct 数据类型,可以创建嵌套表。嵌套表中的元素需要用尖括号("<>")括起来。
generation_expression生成列的表达式。有关生成列的详细信息,请参阅 生成列
watermark_clause用于定义时间戳列的水位线。句法为 WATERMARK FOR column_name as expr。有关水位线的详细信息,请参阅 水位线
WITH 子句如果尝试存储所有源数据,请在此处设置连接器。有关支持的 source 的完整列表,以及详细说明每个 source 句法的连接器页面链接,请参阅 支持的 source
FORMATENCODE 选项指定源数据的数据格式和编码格式。要了解支持的数据格式,请参阅 支持的格式

支持的 source

单击连接器名称,查看将 RisingWave 连接到该连接器的 SQL 句法、选项和示例语句。

::note

若要摄取 “T” 格式的数据,则需要创建(带连接器设置的)表。否则,您既可以选择创建 source,也可选择创建(带连接器设置的)表。

:::

连接器版本格式
Kafka3.1.0 或更高版本Avro, JSON, protobuf, Debezium JSON (T), Debezium AVRO (T), DEBEZIUM_MONGO_JSON (T), Maxwell JSON (T), Canal JSON (T), Upsert JSON, Upsert AVRO, Bytes
Redpanda最新版Avro, JSON, protobuf
Pulsar2.8.0 或更高版本Avro, JSON, protobuf, Debezium JSON (T), Maxwell JSON (T), Canal JSON (T)
Astra Streaming最新版Avro, JSON, protobuf
Kinesis最新版Avro, JSON, protobuf, Debezium JSON (T), Maxwell JSON (T), Canal JSON (T)
PostgreSQL CDC10, 11, 12, 13, 14Debezium JSON (T)
MySQL CDC5.7, 8.0Debezium JSON (T)
CDC via KafkaDebezium JSON (T), Maxwell JSON (T), Canal JSON (T)
Amazon S3最新版JSON, CSV
Load generator内置JSON
Google Pub/SubAvro, JSON, protobuf, Debezium JSON (T), Maxwell JSON (T), Canal JSON (T)
Google Cloud StorageJSON
note

创建 source 时,RisingWave 不会立即摄取数据。RisingWave 会在基于 source 创建物化视图后开始处理数据。

水位线

RisingWave 支持在创建 source 时生成水位线。水位线类似于跟踪事件时间进度的标记或信号,允许您在相应的时间窗口内处理事件。 WATERMARK 子句应该在 schema_definition 中使用。有关创建水位线的更多信息,请参阅 水位线

变更数据捕获(CDC)

变更数据捕获(CDC)是指识别和捕获数据库中的数据更改,然后将更改实时传递给下游服务的过程。

RisingWave 提供原生的 MySQL 和 PostgreSQL CDC 连接器。使用这些 CDC 连接器可以直接摄取这些数据库的 CDC 数据,无需设置其他服务,如 Kafka。

如果 Kafka 是您技术栈的一部分,您还可以使用 RisingWave 中的 Kafka 连接器,将 CDC 数据以 Kafka topic 的形式从数据库摄取到 RisingWave。您需要使用 CDC 工具,例如 MySQL 的 Debezium 连接器Maxwell's daemon,将 CDC 数据转换为 Kafka topic。

有关使用这两种方法从 MySQL 和 PostgreSQL 摄取数据的完整步骤,请参阅 从 MySQL 摄取数据从 PostgreSQL 摄取数据

支持的格式

创建 source 时,需在 CREATE SOURCECREATE TABLE 语句的 FORMATENCODE 部分指定数据和编码格式。

Avro

对于 Avro 格式的数据,必须指定消息和 schema 文件的位置。schema 文件的位置可以是实际的 Web 位置,格式为 http://...https://...S3://...。对于 Avro 格式的 Kafka 数据,您可以提供 Confluent Schema Registry(而不是 schema 文件位置),RisingWave 可以从中获取 schema。有关在 Kafka 数据中使用 Schema Registry 的更多详细信息,请参阅 从 Schema Registry 读取 schema

schema.registry 可以接受多个地址。RisingWave 将向所有 URL 发送请求,并返回第一个成功的结果。

如果设置了 schema.registry,还可以定义 schema.registry.name.strategy。可接受的选项包括 topic_name_strategyrecord_name_strategytopic_record_name_strategy。如果使用了 record_name_strategytopic_record_name_strategy,还必须定义 key.message 字段。有关名称策略的更多详细信息,请参阅 主题名称策略

测试版功能

schema.registry.name.strategy 目前处于测试阶段。如果您遇到任何问题或有任何反馈,请联系我们。

请注意,RisingWave 中显示的时间戳可能与上游系统中的时间戳不同,因为 Avro 序列化过程中会丢失时区信息。

info

对于 Avro 数据,不能在 CREATE SOURCECREATE TABLE 语句的 schema_definition 部分指定 schema。

句法:

FORMAT PLAIN
ENCODE AVRO (
message = 'main_message',
schema.location = 'location' | schema.registry = 'schema_registry_url [, ...]',
[schema.registry.name.strategy = 'topic_name_strategy'],
[key.message = 'test_key']
)

Debezium AVRO

使用 Debezium AVRO 创建 source 时,不需要在 CREATE TABLE 语句中定义 source 的 schema,因为可以从 SCHEMA REGISTRY 中推断。这意味着必须指定 schema 文件的位置。schema 文件的位置可以是实际的 Web 位置(格式为 http://...https://...S3://...),也可以是 Confluent Schema Registry。有关在 Kafka 数据中使用 Schema Registry 的更多详细信息,请参阅 从 Schema Registry 读取 schema

schema.registry 可以接受多个地址。RisingWave 将向所有 URL 发送请求,并返回第一个成功的结果。

如果设置了 schema.registry,还可以定义 schema.registry.name.strategy。可接受的选项包括 topic_name_strategyrecord_name_strategytopic_record_name_strategy。如果使用了 record_name_strategytopic_record_name_strategy,还必须定义 key.message 字段。有关名称策略的更多详细信息,请参阅 主题名称策略

测试版功能

schema.registry.name.strategy 目前处于测试阶段。如果您遇到任何问题或有任何反馈,请联系我们。

句法:

FORMAT DEBEZIUM
ENCODE AVRO (
message = 'main_message',
schema.location = 'location' | schema.registry = 'schema_registry_url [, ...]',
[schema.registry.name.strategy = 'topic_name_strategy'],
[key.message = 'test_key']
)

Upsert AVRO

消费 Kafka topic 中 AVRO 格式的数据时,需要将 FORMATENCODE 部分分别指定为 UPSERTAVRO。RisingWave 会知道源消息包含作为主列的关键字段以及 Kafka 消息值字段。如果消息的值字段不为空,若消息键不为空且已在数据库表中,则将更新行;若消息键不为空但不在数据库表中,则将插入行。如果值字段为空,则将删除行。

schema.registry 可以接受多个地址。RisingWave 将向所有 URL 发送请求,并返回第一个成功的结果。

如果设置了 schema.registry,还可以定义 schema.registry.name.strategy。可接受的选项包括 topic_name_strategyrecord_name_strategytopic_record_name_strategy。如果使用了record_name_strategytopic_record_name_strategy,还必须定义 key.message 字段。有关名称策略的更多详情,请参阅 主题名称策略

测试版功能

schema.registry.name.strategy 目前处于测试阶段。如果您遇到任何问题或有任何反馈,请联系我们。

句法:

FORMAT UPSERT
ENCODE AVRO (
message = 'main_message',
schema.location = 'location' | schema.registry = 'schema_registry_url [, ...]',
[schema.registry.name.strategy = 'topic_name_strategy'],
[key.message = 'test_key']
)

JSON

RisingWave 可直接从外部 source 解码 JSON。从流式 JSON 数据创建 source 时,可以在 source 名称后的括号中定义 source 的 schema,或指定一个 schema.registry。在 FORMATENCODE 部分指定数据和编码格式。您可以直接引用 JSON 有效负载中的数据字段,将其名称作为 schema 中的列名。

schema.registry 可以接受多个地址。RisingWave 将向所有 URL 发送请求,并返回第一个成功的结果。

句法:

FORMAT PLAIN 
ENCODE JSON [ (
schema.registry = 'schema_registry_url [, ...]',
[schema.registry.username = 'username'],
[schema.registry.password = 'password']
) ]

Canal JSON

RisingWave 支持 Canal CDC 格式的 TiCDC。从 TiCDC 中创建流的 source 时,可以在 source 名称(句法中为 schema_definition)后的括号中定义 source 的 schema,并在 FORMATENCODE 部分指定数据和编码格式。您可以直接引用 JSON 有效负载中的数据字段,将其名称作为 schema 中的列名。

句法:

FORMAT CANAL
ENCODE JSON

Debezium JSON

从 Debezium JSON 中的流创建 source 时,可以在 source 名称(句法中为 schema_definition)后的括号中定义 source 的 schema,并在 FORMATENCODE 部分中指定数据和编码格式。您可以直接引用 JSON 有效负载中的数据字段,将其名称作为 schema 中的列名。

请注意,如果要在 RisingWave 中摄取类型为 timestamptimestamptz 的数据,上游值必须在 [1973-03-03 09:46:40, 5138-11-16 09:46:40] (UTC) 的范围内。该值在解析和摄取时可能会出现错误,但不显示警告信息。

句法:

FORMAT DEBEZIUM
ENCODE JSON

Debezium Mongo JSON

从 MongoDB Kafka topic 加载 Debezium Mongo JSON 格式的数据时,源表 schema 有一些限制。表 schema 必须包含 _idpayload 列。其中 _id 来自 MongoDB 文档的 id,是主键,payloadjsonb 类型,包含文档的其余部分。如果文档的 _id 类型是 ObjectID,那么在 RisingWave 中创建列时,应指定 _id 类型为 varchar。如果文档的 _id 类型为 int32int64,则在 RisingWave 中指定 _id 类型为 intbigint

句法:

FORMAT DEBEZIUM_MONGO
ENCODE JSON

Maxwell JSON

从 Maxwell JSON 中的数据流创建 source 时,您可以在 source 名称(句法中为 schema_definition)后的括号内定义 source 的 schema,并在 FORMATENCODE 部分指定数据和编码格式。您可以直接引用 JSON 有效负载中的数据字段,将其名称作为 schema 中的列名。

句法:

FORMAT MAXWELL
ENCODE JSON

Upsert JSON

从 Kafka topic 消费 JSON 格式的数据时,需要将 FORMATENCODE 部分分别指定为 UPSERTJSON。RisingWave 会知道源消息包含作为主列的关键字段以及 Kafka 消息值字段。如果消息的值字段不为空,若消息键不为空且已在数据库表中,则将更新行;若消息键不为空但不在数据库表中,则将插入行。如果值字段为空,则将删除行。

您可以在 source 名称后面的括号中定义 source 的 schema,也可以指定一个 schema.registryschema.registry 可以接受多个地址。RisingWave 将向所有 URL 发送请求,并返回第一个成功的结果。

句法:

FORMAT UPSERT
ENCODE JSON [ (
schema.registry = 'schema_registry_url [, ...]',
[schema.registry.username = 'username'],
[schema.registry.password = 'password']
) ]

Protobuf

对于 protobuf 格式的数据,必须指定消息和 schema 位置。schema 位置可以是 http://...https://...S3://... 格式的实际网络位置。对于 protobuf 中的 Kafka 数据,可以提供 Confluent Schema Registry (而不是 schema 文件位置),RisingWave 可以从中获取 schema。有关在 Kafka 数据中使用 Schema Registry 的更多详细信息,请参阅 从 Schema Registry 读取 schema

schema.registry 可以接受多个地址。RisingWave 将向所有 URL 发送请求,并返回第一个成功的结果。

如果设置了 schema.registry,还可以定义 schema.registry.name.strategy。可接受的选项包括 topic_name_strategyrecord_name_strategytopic_record_name_strategy。如果使用了 record_name_strategytopic_record_name_strategy,还必须定义 key.message 字段。有关名称策略的更多详细信息,请参阅 主题名称策略

测试版功能

schema.registry.name.strategy 目前处于测试阶段。如果您遇到任何问题或有任何反馈,请联系我们。

info

对于 protobuf 数据,不能在 CREATE SOURCECREATE TABLE 语句的 schema_definition 部分中指定 schema。

如果提供了文件位置,则 schema 文件必须是 FileDescriptorSet 文件,可以使用以下命令从 .proto 文件编译:

protoc -I=$include_path --include_imports --descriptor_set_out=schema.pb schema.proto

句法:

FORMAT PLAIN
ENCODE PROTOBUF (
message = 'main_message',
schema.location = 'location' | schema.registry = 'schema_registry_url [, ...]',
[schema.registry.name.strategy = 'topic_name_strategy'],
[key.message = 'test_key']
)

有关支持的 protobuf 类型的更多信息,请参阅 支持的 protobuf 类型.

Bytes

RisingWave 允许使用 BYTES 行格式读取数据流,无需解码数据。但表或 source 只能有一个 BYTEA 数据字段。

FORMAT PLAIN
ENCODE BYTES

另请参阅