Skip to main content

从 NATS JetStream 摄取数据

可以使用 RisingWave 中的 NATS Source Connector 使得 RisingWave 能从 NATS JetStream 摄取数据。

NATS 是面向云原生应用的开源消息系统。它为高性能消息传递提供了轻量级发布-订阅架构。

NATS JetStream 是建立在 NATS 基础上的流数据平台。它通过持久订阅和消费者组实现对数据流的实时和历史访问。

测试版功能

RisingWave 中的 NATS Source Connector 目前处于测试阶段。如果您遇到任何问题或有任何反馈,请联系我们。

开始之前

将数据从 NATS JetStream 导入到 RisingWave 之前,请确保:

  • NATS JetStream 服务器正在运行并可从您的 RisingWave 集群访问。
  • 如果 NATS JetStream 服务器需要进行身份验证,请确保您有客户端用户名和密码。客户端用户必须拥有主题的 subscribe 权限。
  • 创建要从中摄取数据的 NATS 主题。
  • 确保您的 RisingWave 集群正在运行。

将数据摄入 RisingWave

创建 Source 时,您可以选择在 RisingWave 中持久化 Source 中的数据,方法是使用 CREATE TABLE 而不是 CREATE SOURCE,并指定连接设置和数据格式。

句法

CREATE { TABLE | SOURCE} [ IF NOT EXISTS ] source_name 
[ schema_definition ]
WITH (
connector='nats',
server_url='<your nats server>:<port>', [ <another_server_url_if_available>, ...]
subject='<subject>[,<another_subject...]',
stream='stream_name',

-- 可选参数
connect_mode=<connect_mode>
username='<your user name>',
password='<your password>'
jwt=`<your jwt>`,
nkey=`<your nkey>`

-- 传递参数
scan.startup.mode=`startup_mode`
scan.startup.timestamp.millis='xxxxx',
)
FORMAT PLAIN ENCODE JSON;

schema_definition:

(
column_name data_type [ PRIMARY KEY ], ...
[ PRIMARY KEY ( column_name, ... ) ]
)
note

RisingWave 会对带有连接器设置的表执行主键约束检查,但不会对常规 Source 执行主键约束检查。如果需要执行检查,请创建带有连接器设置的表。

对于带有主键约束的表,如果新数据带有现有键,则新数据将覆盖现有数据。

note

根据 NATS 文档,流名称必须遵守主题命名规则,并对文件系统友好。推荐依据以下准则对流名称进行命名:

  • 使用字母数字值。
  • 避免空格、制表符、句点(.)、大于号(>)或星号(*)。
  • 不要包含路径分隔符(正斜杠或反斜杠)。
  • 将名称长度限制在 32 个字符以内,因为 JetStream 存储目录包括账户、流名称和消费者名称。
  • 避免使用保留的文件名,如 NULLPT1
  • 注意文件系统的大小写敏感性。为防止冲突,请确保流或帐户名称不会由于大小写差异而发生冲突。例如,在 Windows 或 macOS 系统中,Foofoo 就会发生冲突。

参数

字段注释
server_url必填。NATS JetStream 服务器的 URL,格式为 地址:端口。如果指定多个地址,请用逗号分隔。
subject必填。要从中摄取数据的 NATS 主题。要指定多个主题,请使用逗号。
stream必填。要从中摄取数据的 NATS 流。
connect_mode必填。连接的身份验证模式。允许值:
  • plain:无身份验证。
  • user_and_password:使用用户名和密码进行身份验证。对于此选项,必须指定 usernamepassword
  • credential:使用 JSON Web 令牌(JWT)和 NKeys 进行身份验证。对于此选项,必须指定 jwtnkey
jwtnkeyJWT 和 NKEY 用于身份验证。有关详细信息,请参阅 JWTNKeys
usernamepassword有条件。客户端用户名和密码。connect_modeuser_and_password 时必填。
scan.startup.mode可选。RisingWave 用于消费数据的偏移模式。支持的模式有:
  • earliest:从最早的偏移量开始消费数据。
  • latest:从最新的偏移量开始消费数据。
  • timestamp_millis:从特定的 UNIX 时间戳消费数据,该时间戳通过 scan.startup.timestamp.millis 指定。
如果未指定,将使用默认值 earliest
scan.startup.timestamp.millis有条件。scan.startup.modetimestamp_millis 时必填。RisingWave 将从指定的 UNIX 时间戳(毫秒)开始消费数据。

下一步

创建 Source 或表后,可以创建物化视图来转换或分析流数据。