从 NATS JetStream 摄取数据
可以使用 RisingWave 中的 NATS Source Connector 使得 RisingWave 能从 NATS JetStream 摄取数据。
NATS 是面向云原生应用的开源消息系统。它为高性能消息传递提供了轻量级发布-订阅架构。
NATS JetStream 是建立在 NATS 基础上的流数据平台。它通过持久订阅和消费者组实现对数据流的实时和历史访问。
测试版功能
RisingWave 中的 NATS Source Connector 目前处于测试阶段。如果您遇到任何问题或有任何反馈,请联系我们。
开始之前
将数据从 NATS JetStream 导入到 RisingWave 之前,请确保:
- NATS JetStream 服务器正在运行并可从您的 RisingWave 集群访问。
- 如果 NATS JetStream 服务器需要进行身份验证,请确保您有客户端用户名和密码。客户端用户必须拥有主题的
subscribe
权限。 - 创建要从中摄取数据的 NATS 主题。
- 确保您的 RisingWave 集群正在运行。
将数据摄入 RisingWave
创建 Source 时,您可以选择在 RisingWave 中持久化 Source 中的数据,方法是使用 CREATE TABLE
而不是 CREATE SOURCE
,并指定连接设置和数据格式。
句法
CREATE { TABLE | SOURCE} [ IF NOT EXISTS ] source_name
[ schema_definition ]
WITH (
connector='nats',
server_url='<your nats server>:<port>', [ <another_server_url_if_available>, ...]
subject='<subject>[,<another_subject...]',
stream='stream_name',
-- 可选参数
connect_mode=<connect_mode>
username='<your user name>',
password='<your password>'
jwt=`<your jwt>`,
nkey=`<your nkey>`
-- 传递参数
scan.startup.mode=`startup_mode`
scan.startup.timestamp.millis='xxxxx',
)
FORMAT PLAIN ENCODE JSON;
schema_definition:
(
column_name data_type [ PRIMARY KEY ], ...
[ PRIMARY KEY ( column_name, ... ) ]
)
note
RisingWave 会对带有连接器设置的表执行主键约束检查,但不会对常规 Source 执行主键约束检查。如果需要执行检查,请创建带有连接器设置的表。
对于带有主键约束的表,如果新数据带有现有键,则新数据将覆盖现有数据。
note
根据 NATS 文档,流名称必须遵守主题命名规则,并对文件系统友好。推荐依据以下准则对流名称进行命名:
- 使用字母数字值。
- 避免空格、制表符、句点(
.
)、大于号(>
)或星号(*
)。 - 不要包含路径分隔符(正斜杠或反斜杠)。
- 将名称长度限制在 32 个字符以内,因为 JetStream 存储目录包括账户、流名称和消费者名称。
- 避免使用保留的文件名,如
NUL
或LPT1
。 - 注意文件系统的大小写敏感性。为防止冲突,请确保流或帐户名称不会由于大小写差异而发生冲突。例如,在 Windows 或 macOS 系统中,
Foo
和foo
就会发生冲突。
参数
字段 | 注释 |
---|---|
server_url | 必填。NATS JetStream 服务器的 URL,格式为 地址:端口。如果指定多个地址,请用逗号分隔。 |
subject | 必填。要从中摄取数据的 NATS 主题。要指定多个主题,请使用逗号。 |
stream | 必填。要从中摄取数据的 NATS 流。 |
connect_mode | 必填。连接的身份验证模式。允许值:
|
jwt 和 nkey | JWT 和 NKEY 用于身份验证。有关详细信息,请参阅 JWT 和 NKeys。 |
username 和 password | 有条件。客户端用户名和密码。connect_mode 为 user_and_password 时必填。 |
scan.startup.mode | 可选。RisingWave 用于消费数据的偏移模式。支持的模式有:
earliest 。 |
scan.startup.timestamp.millis | 有条件。scan.startup.mode 为 timestamp_millis 时必填。RisingWave 将从指定的 UNIX 时间戳(毫秒)开始消费数据。 |
下一步
创建 Source 或表后,可以创建物化视图来转换或分析流数据。