从 RisingWave 导出数据到 Apache Pulsar
本文描述了如何从 RisingWave 导出数据到 Apache Pulsar。
Apache Pulsar 是一个开源的分布式发布-订阅消息系统和事件流平台,可扩展,支持异地备份。
测试版功能
RisingWave 中的 Pulsar Sink 连接器目前处于测试版。如果您遇到任何问题或有任何反馈,请联系我们。
开始之前
在从 RisingWave 导出数据到 Pulsar 之前,请确保满足以下条件:
- 已运行 Pulsar 集群并且可从 RisingWave 访问。
- 您有权限访问您想要导出数据到的目标 Pulsar Topic。
句法
CREATE SINK [ IF NOT EXISTS ] sink_name
[FROM sink_from | AS select_query]
WITH (
connector='pulsar',
connector_parameter = 'value', ...
)
FORMAT data_format ENCODE data_encode [ (
format_parameter = 'value' ) ]
;
参数
参数名称 | 描述 |
---|---|
topic | 必填。Pulsar Topic 的地址。一个源只能对应一个 Topic。 |
service.url | 必填。Pulsar 服务的地址。 |
auth.token | 可选。用于认证的令牌。如果同时设置了 auth.token 和 oauth ,只考虑 oauth 认证。 |
oauth.issuer.url | 有条件必填。OAuth2 的发行者 URL。如果指定了其他 oauth 字段,则必须填写此字段。 |
oauth.credentials.url | 有条件必填。凭证文件的路径,以 file:// 开头。如果指定了其他 oauth 字段,则必须填写此字段。 |
oauth.audience | 有条件必填。OAuth2 的受众。如果指定了其他 oauth 字段,则必须填写此字段。 |
oauth.scope | 可选。OAuth2 的范围。 |
access_key | 可选。用于从 S3 加载的 AWS 访问密钥。如果 oauth.credentials.url 指定了本地路径,则不需要填写此字段。 |
secret_access | 可选。用于从 S3 加载的 AWS 秘密访问密钥。如果 oauth.credentials.url 指定了本地路径,则不需要填写此字段。 |
max_retry_num | 可选。向 Pulsar 发送批次的最大重试次数。这允许在遇到暂时性错误时重试。默认值为 3。 |
retry_interval | 可选。失败后重试发送批次之前的等待时间,以毫秒为单位。默认值为 100ms。 |
primary_key | 有条件必填。Sink 的主键。使用 ',' 来分隔主键列。创建 PLAIN Sink 时主键是可选的,但对于 UPSERT 和 DEBEZIUM Sink 来说是必填项。 |
Sink 参数
字段 | 注释 |
---|---|
data_format | 数据格式。允许的格式:
UPSERT Sink 时何时定义主键,请参见概览。 |
data_encode | 数据编码。支持的编码:JSON 。 |
force_append_only | 如果为 true ,即使不能做到也会强制 Sink 为 PLAIN (即 append-only )。 |
timestamptz.handling.mode | 控制 timestamptz 输出格式。此参数特别适用于使用 JSON 编码的仅追加或 Upsert Sink。 - 如果省略,timestamptz 的输出格式为 2023-11-11T18:30:09.453000Z ,包括 UTC 后缀 Z 。- 当指定 utc_without_suffix 时,格式改为 2023-11-11 18:30:09.453000 。 |
示例
以下 SQL 查询在 RisingWave 中创建了一个 Pulsar Sink。
CREATE SINK IF NOT EXISTS pulsar_sink
FROM mv_name
WITH (
connector = 'pulsar',
topic = 'test-topic',
service.url = 'pulsar://broker:6650',
-- OAuth
oauth.issuer.url = 'https://issuer.com',
oauth.credentials.url = 'https://provider.com',
oauth.audience = 'test-aud',
oauth.scope = 'consume',
-- S3 凭证用于 OAuth 文件
access_key = 'xxx',
secret_access = 'xxx'
)
FORMAT DEBEZIUM ENCODE JSON;