Skip to main content

从 RisingWave 导出数据到 AWS Kinesis

本文介绍如何将数据从 RisingWave 导出到 AWS Kinesis Data Streams。

测试版功能

RisingWave 中的 AWS Kinesis Sink 连接器目前处于测试阶段。如果您遇到任何问题或有任何反馈,请联系我们。

句法

CREATE SINK [ IF NOT EXISTS ] sink_name  
[FROM sink_from | AS select_query]
WITH (
connector = 'kinesis',
connector_parameter = 'value', ...
)
FORMAT data_format ENCODE data_encode [ (
format_parameter = 'value'
) ]
;

基本参数

字段备注
stream必填。流的名称。
aws.region必填。AWS 区域。例如,US East (N. Virginia)。
endpoint可选。AWS Kinesis 服务的入口点 URL。
aws.credentials.access_key_id必填。AWS 的访问密钥 ID。
aws.credentials.secret_access_key必填。AWS 的秘密访问密钥。
aws.credentials.session_token可选。与临时安全凭证关联的会话令牌。
aws.credentials.role.arn可选。要扮演的角色的 Amazon 资源名称(ARN)。
aws.credentials.role.external_id可选。用于授权访问第三方资源的 外部 ID
primary_key必填。Sink 的主键。使用 ',' 分隔主键列。

Sink 参数

字段备注
data_format数据格式。允许的格式:
  • PLAIN:输出插入操作的数据。
  • DEBEZIUM:以 Debezium 格式输出变更数据捕获(CDC)日志。
  • UPSERT:以变更日志流的形式输出数据。在这种情况下必须指定 primary_key
要了解在创建 UPSERT Sink 时何时定义主键,请参见 概述
data_encode数据编码格式。支持的编码:JSON
force_append_only如果为 true,即使无法做到,也强制 Sink 为 PLAIN(即 append-only)。
timestamptz.handling.mode控制 timestamptz 输出格式。该参数特别适用于使用 JSON 编码的 Append-only 或 Upsert Sink。
- 如果省略,timestamptz 的输出格式为 2023-11-11T18:30:09.453000Z,包括 UTC 后缀 Z
- 当指定 utc_without_suffix 时,格式更改为 2023-11-11 18:30:09.453000

示例

CREATE SINK s1 FROM t WITH (  
connector = 'kinesis',
stream = 'kinesis-sink-demo',
aws.region = 'us-east-1',
aws.credentials.access_key_id = 'your_access_key',
aws.credentials.secret_access_key = 'your_secret_key'
)
FORMAT DEBEZIUM ENCODE JSON;