Skip to main content

从 RisingWave 导出数据到 Apache Pulsar

本文描述了如何从 RisingWave 导出数据到 Apache Pulsar。

Apache Pulsar 是一个开源的分布式发布-订阅消息系统和事件流平台,可扩展,支持异地备份。

测试版功能

RisingWave 中的 Pulsar Sink 连接器目前处于测试版。如果您遇到任何问题或有任何反馈,请联系我们。

开始之前

在从 RisingWave 导出数据到 Pulsar 之前,请确保满足以下条件:

  • 已运行 Pulsar 集群并且可从 RisingWave 访问。
  • 您有权限访问您想要导出数据到的目标 Pulsar Topic。

句法

CREATE SINK [ IF NOT EXISTS ] sink_name
[FROM sink_from | AS select_query]
WITH (
connector='pulsar',
connector_parameter = 'value', ...
)
FORMAT data_format ENCODE data_encode [ (
format_parameter = 'value' ) ]
;

参数

参数名称描述
topic必填。Pulsar Topic 的地址。一个源只能对应一个 Topic。
service.url必填。Pulsar 服务的地址。
auth.token可选。用于认证的令牌。如果同时设置了 auth.tokenoauth,只考虑 oauth 认证。
oauth.issuer.url有条件必填。OAuth2 的发行者 URL。如果指定了其他 oauth 字段,则必须填写此字段。
oauth.credentials.url有条件必填。凭证文件的路径,以 file:// 开头。如果指定了其他 oauth 字段,则必须填写此字段。
oauth.audience有条件必填。OAuth2 的受众。如果指定了其他 oauth 字段,则必须填写此字段。
oauth.scope可选。OAuth2 的范围。
access_key可选。用于从 S3 加载的 AWS 访问密钥。如果 oauth.credentials.url 指定了本地路径,则不需要填写此字段。
secret_access可选。用于从 S3 加载的 AWS 秘密访问密钥。如果 oauth.credentials.url 指定了本地路径,则不需要填写此字段。
max_retry_num可选。向 Pulsar 发送批次的最大重试次数。这允许在遇到暂时性错误时重试。默认值为 3。
retry_interval可选。失败后重试发送批次之前的等待时间,以毫秒为单位。默认值为 100ms。
primary_key有条件必填。Sink 的主键。使用 ',' 来分隔主键列。创建 PLAIN Sink 时主键是可选的,但对于 UPSERTDEBEZIUM Sink 来说是必填项。

Sink 参数

字段注释
data_format数据格式。允许的格式:
  • PLAIN: 以插入操作输出数据。
  • DEBEZIUM: 以 Debezium 格式输出变更数据捕获(CDC)日志。
  • UPSERT: 作为变更日志流输出数据。在这种情况下,必须指定 primary_key
若要了解创建 UPSERT Sink 时何时定义主键,请参见概览
data_encode数据编码。支持的编码:JSON
force_append_only如果为 true,即使不能做到也会强制 Sink 为 PLAIN(即 append-only)。
timestamptz.handling.mode控制 timestamptz 输出格式。此参数特别适用于使用 JSON 编码的仅追加或 Upsert Sink。
- 如果省略,timestamptz 的输出格式为 2023-11-11T18:30:09.453000Z,包括 UTC 后缀 Z
- 当指定 utc_without_suffix 时,格式改为 2023-11-11 18:30:09.453000

示例

以下 SQL 查询在 RisingWave 中创建了一个 Pulsar Sink。

CREATE SINK IF NOT EXISTS pulsar_sink
FROM mv_name
WITH (
connector = 'pulsar',
topic = 'test-topic',
service.url = 'pulsar://broker:6650',

-- OAuth
oauth.issuer.url = 'https://issuer.com',
oauth.credentials.url = 'https://provider.com',
oauth.audience = 'test-aud',
oauth.scope = 'consume',

-- S3 凭证用于 OAuth 文件
access_key = 'xxx',
secret_access = 'xxx'
)
FORMAT DEBEZIUM ENCODE JSON;