Skip to main content

从 Kinesis 摄取数据

使用下面的 SQL 语句将 RisingWave 连接到 Kinesis 数据流。

创建 Source 时,您可以选择在 RisingWave 中持久化 Source 中的数据,方法是使用 CREATE TABLE 而不是 CREATE SOURCE,并指定连接设置和数据格式。

句法

CREATE {TABLE | SOURCE} [ IF NOT EXISTS ] source_name 
[ schema_definition ]
WITH (
connector='kinesis',
connector_parameter='value', ...
)
FORMAT data_format ENCODE data_encode (
message = 'message',
schema.location = 'location' | schema.registry = 'schema_registry_url'
);

schema_definition:

(
column_name data_type [ PRIMARY KEY ], ...
[ PRIMARY KEY ( column_name, ... ) ]
)
info

对于 Avro 和 Protobuf 数据,请勿在 CREATE SOURCECREATE TABLE 语句中指定 schema_definition。应在 ENCODE 部分的选项 schema.location 中的网络位置提供 Schema。

note

RisingWave 会对带有连接器设置的表执行主键约束检查,但不会对常规 Source 执行此操作。如果需要执行检查,请创建带有连接器设置的表。

对于带有主键约束的表,如果新数据带有现有键,则新数据将覆盖现有数据。

连接器参数

字段注释
stream必填。流的名称。
aws.region必填。AWS 服务区域。例如,美国东部(弗吉尼亚州北部)。
endpoint可选。AWS Kinesis 服务入口点的 URL。
aws.credentials.access_key_id必填。此字段表示 AWS 的访问密钥 ID。
aws.credentials.secret_access_key必填。此字段表示 AWS 的秘密访问密钥。
aws.credentials.session_token可选。与临时安全凭证关联的会话令牌。
aws.credentials.role.arn可选。要担任角色的亚马逊资源名称(ARN)。
aws.credentials.role.external_id可选。用于授权访问第三方资源的 外部 id
scan.startup.mode可选。Kinesis 消费者的启动 Schema。支持的 Schema 有:earliest(从最早的偏移开始)、latest(从最近的偏移开始)和 timestamp(从特定的时间戳开始,由 scan.startup.timestamp.millis 指定)。默认模式为 earliest
scan.startup.timestamp.millis可选。该字段可指定时间戳(以 i64 表示),从该时间戳开始进行消费。

其他参数

字段注释
data_format支持的格式:debeziummaxwellcanal
data_encode支持的编码:JSONAVROPROTOBUFCSV
messageSchema 定义中主要消息的名称。当 data_encodePROTOBUF 时必填。
locationSchema 文件的网络位置,格式为 http://...https://...S3://...。当 data_encodeAVROPROTOBUF 时必填。示例:
https://<example_host>/risingwave/proto-simple-schema.proto
s3://risingwave-demo/schema-location

示例

下面是将 RisingWave 连接到 Kinesis 数据流以从单个数据流中读取数据的示例。

CREATE {TABLE | SOURCE} [IF NOT EXISTS] source_name
WITH (
connector='kinesis',
stream='kafka',
aws.region='user_test_topic',
endpoint='172.10.1.1:9090,172.10.1.2:9090',
aws.credentials.session_token='AQoEXAMPLEH4aoAH0gNCAPyJxz4BlCFFxWNE1OPTgk5TthT+FvwqnKwRcOIfrRh3c/L To6UDdyJwOOvEVPvLXCrrrUtdnniCEXAMPLE/IvU1dYUg2RVAJBanLiHb4IgRmpRV3z rkuWJOgQs8IZZaIv2BXIa2R4OlgkBN9bkUDNCJiBeb/AXlzBBko7b15fjrBs2+cTQtp Z3CYWFXG8C5zqx37wnOE49mRl/+OtkIKGO7fAE',
aws.credentials.role.arn='arn:aws-cn:iam::602389639824:role/demo_role',
aws.credentials.role.external_id='demo_external_id',
aws.credentials.access_key_id = 'your_access_key',
aws.credentials.secret_access_key = 'your_secret_key'
) FORMAT PLAIN ENCODE AVRO (
schema.location = 'https://demo_bucket_name.s3-us-west-2.amazonaws.com/demo.avsc'
);