Skip to main content

从 Pulsar 摄取数据

您可以通过使用 RisingWave 中的 Pulsar Source Connector 使得 RisingWave 能从 Pulsar 摄取数据。

测试版功能

RisingWave 中的 Pulsar Source Connector 目前处于测试阶段。如果您遇到任何问题或有任何反馈,请联系我们。

创建 Source 时,您可以选择在 RisingWave 中持久化 Source 中的数据,方法是使用 CREATE TABLE 而不是 CREATE SOURCE,并指定连接设置和数据格式。

句法

CREATE {TABLE | SOURCE} [ IF NOT EXISTS ] source_name 
[ schema_definition ]
WITH (
connector='pulsar',
connector_parameter='value', ...
)
FORMAT data_format ENCODE data_encode (
message = 'message',
schema.location = 'location' | schema.registry = 'schema_registry_url'
);

schema_definition:

(
column_name data_type [ PRIMARY KEY ], ...
[ PRIMARY KEY ( column_name, ... ) ]
)
info

对于 Avro 和 Protobuf 数据,请勿在 CREATE SOURCECREATE TABLE 语句中指定 schema_definition。应在 ENCODE 部分的选项 schema.location 中的网络位置提供 Schema。

note

RisingWave 会对带有连接器设置的表执行主键约束检查,但不会对常规 Source 执行主键约束检查。如果需要执行检查,请创建带有连接器设置的表。

对于带有主键约束的表,如果新数据带有现有键,则新数据将覆盖现有数据。

字段注释
topic必填。Pulsar Topic 的地址。一个 Source 只能对应一个 Topic。
service.url必填。Pulsar 服务的地址。
scan.startup.mode可选。RisingWave 用来消费数据的偏移模式。支持的两种模式是 earliest(最早的偏移)和 latest(最新的偏移)。如果未指定,将使用默认值 earliest
scan.startup.timestamp.millis.可选。RisingWave 将从指定的 UNIX 时间戳(毫秒)开始消费数据。
auth.token可选。用于身份验证的令牌。如果同时设置了 auth.tokenoauth,则只有 oauth 身份验证生效。
oauth.issuer.url有条件。OAuth2 的颁发者 URL。如果指定了其他 oauth 字段,则必须填写此字段。
oauth.credentials.url有条件。凭据文件的路径,以 file:// 开头。如果指定了其他 oauth 字段,则必须填写此字段。
oauth.audience有条件。OAuth2 的受众。如果指定了其他 oauth 字段,则必须填写此字段。
oauth.scope可选。OAuth2 的范围。
access_key可选。用于从 S3 加载的 AWS 访问密钥。如果将 oauth.credentials.url 指定为本地路径,则无需填写此字段。
secret_access可选。用于从 S3 加载的 AWS 秘密访问密钥。如果将 oauth.credentials.url 指定为本地路径,则无需填写此字段。
字段注释
data_format支持的格式:DEBEZIUMMAXWELLCANAL
data_encode支持的编码:JSONAVROPROTOBUFCSV
messageSchema 定义中主要消息的名称。data_encodePROTOBUF 时必填。
locationSchema 文件的网络位置,格式为 http://...https://...S3://...data_encodeAVROPROTOBUF 时必填。示例:
https://<example_host>/risingwave/proto-simple-schema.proto
s3://risingwave-demo/schema-location

从位置读取 Schema

RisingWave 支持从网络位置(http://...https://...S3://... 格式)读取 Pulsar 数据(Avro 或 Protobuf 格式)的 Schema。

对于 Protobuf 数据,如果指定了 Schema 位置,则 Schema 文件必须是 FileDescriptorSet,可以使用以下命令从 .proto 文件编译:

protoc -I=$include_path --include_imports --descriptor_set_out=schema.pb schema.proto

要指定 Schema 位置,请在 CREATE SOURCE 语句中添加该子句。

ROW SCHEMA LOCATION 'location'

如果还需要定义主键,请使用表约束句法。

CREATE TABLE table1 (PRIMARY KEY(id)) 

示例

以下是将 RisingWave 连接到 Pulsar Broker 以从单个 Topic 读取数据的示例。

CREATE {TABLE | SOURCE} IF NOT EXISTS source_abc 
WITH (
connector='pulsar',
topic='demo_topic',
service.url='pulsar://localhost:6650/',
oauth.issuer.url='https://auth.streamnative.cloud/',
oauth.credentials.url='s3://bucket_name/your_key_file.file',
oauth.audience='urn:sn:pulsar:o-d6fgh:instance-0',
access_key='access_key',
secret_access='secret_access',
scan.startup.mode='latest',
scan.startup.timestamp.millis='140000000'
) FORMAT PLAIN ENCODE AVRO (
message = 'message',
schema.location = 'https://demo_bucket_name.s3-us-west-2.amazonaws.com/demo.avsc'
);