从 Kinesis 摄取数据
使用下面的 SQL 语句将 RisingWave 连接到 Kinesis 数据流。
创建 Source 时,您可以选择在 RisingWave 中持久化 Source 中的数据,方法是使用 CREATE TABLE
而不是 CREATE SOURCE
,并指定连接设置和数据格式。
句法
CREATE {TABLE | SOURCE} [ IF NOT EXISTS ] source_name
[ schema_definition ]
WITH (
connector='kinesis',
connector_parameter='value', ...
)
FORMAT data_format ENCODE data_encode (
message = 'message',
schema.location = 'location' | schema.registry = 'schema_registry_url'
);
schema_definition:
(
column_name data_type [ PRIMARY KEY ], ...
[ PRIMARY KEY ( column_name, ... ) ]
)
info
对于 Avro 和 Protobuf 数据,请勿在 CREATE SOURCE
或 CREATE TABLE
语句中指定 schema_definition
。应在 ENCODE
部分的选项 schema.location
中的网络位置提供 Schema。
note
RisingWave 会对带有连接器设置的表执行主键约束检查,但不会对常规 Source 执行此操作。如果需要执行检查,请创建带有连接器设置的表。
对于带有主键约束的表,如果新数据带有现有键,则新数据将覆盖现有数据。
连接器参数
字段 | 注释 |
---|---|
stream | 必填。流的名称。 |
aws.region | 必填。AWS 服务区域。例如,美国东部(弗吉尼亚州北部)。 |
endpoint | 可选。AWS Kinesis 服务入口点的 URL。 |
aws.credentials.access_key_id | 必填。此字段表示 AWS 的访问密钥 ID。 |
aws.credentials.secret_access_key | 必填。此字段表示 AWS 的秘密访问密钥。 |
aws.credentials.session_token | 可选。与临时安全凭证关联的会话令牌。 |
aws.credentials.role.arn | 可选。要担任角色的亚马逊资源名称(ARN)。 |
aws.credentials.role.external_id | 可选。用于授权访问第三方资源的 外部 id。 |
scan.startup.mode | 可选。Kinesis 消费者的启动 Schema。支持的 Schema 有:earliest (从最早的偏移开始)、latest (从最近的偏移开始)和 timestamp (从特定的时间戳开始,由 scan.startup.timestamp.millis 指定)。默认模式为 earliest 。 |
scan.startup.timestamp.millis | 可选。该字段可指定时间戳(以 i64 表示),从该时间戳开始进行消费。 |
其他参数
字段 | 注释 |
---|---|
data_format | 支持的格式:debezium 、maxwell 、canal 。 |
data_encode | 支持的编码:JSON 、AVRO 、PROTOBUF 、CSV 。 |
message | Schema 定义中主要消息的名称。当 data_encode 为 PROTOBUF 时必填。 |
location | Schema 文件的网络位置,格式为 http://... 、https://... 或 S3://... 。当 data_encode 为 AVRO 或 PROTOBUF 时必填。示例:https://<example_host>/risingwave/proto-simple-schema.proto s3://risingwave-demo/schema-location |
示例
下面是将 RisingWave 连接到 Kinesis 数据流以从单个数据流中读取数据的示例。
- Avro
- JSON
- Protobuf
CREATE {TABLE | SOURCE} [IF NOT EXISTS] source_name
WITH (
connector='kinesis',
stream='kafka',
aws.region='user_test_topic',
endpoint='172.10.1.1:9090,172.10.1.2:9090',
aws.credentials.session_token='AQoEXAMPLEH4aoAH0gNCAPyJxz4BlCFFxWNE1OPTgk5TthT+FvwqnKwRcOIfrRh3c/L To6UDdyJwOOvEVPvLXCrrrUtdnniCEXAMPLE/IvU1dYUg2RVAJBanLiHb4IgRmpRV3z rkuWJOgQs8IZZaIv2BXIa2R4OlgkBN9bkUDNCJiBeb/AXlzBBko7b15fjrBs2+cTQtp Z3CYWFXG8C5zqx37wnOE49mRl/+OtkIKGO7fAE',
aws.credentials.role.arn='arn:aws-cn:iam::602389639824:role/demo_role',
aws.credentials.role.external_id='demo_external_id',
aws.credentials.access_key_id = 'your_access_key',
aws.credentials.secret_access_key = 'your_secret_key'
) FORMAT PLAIN ENCODE AVRO (
schema.location = 'https://demo_bucket_name.s3-us-west-2.amazonaws.com/demo.avsc'
);
CREATE {TABLE | SOURCE} [IF NOT EXISTS] source_name (
column1 varchar,
column2 integer,
)
WITH (
connector='kinesis',
stream='kafka',
aws.region='user_test_topic',
endpoint='172.10.1.1:9090,172.10.1.2:9090',
aws.credentials.session_token='AQoEXAMPLEH4aoAH0gNCAPyJxz4BlCFFxWNE1OPTgk5TthT+FvwqnKwRcOIfrRh3c/L To6UDdyJwOOvEVPvLXCrrrUtdnniCEXAMPLE/IvU1dYUg2RVAJBanLiHb4IgRmpRV3z rkuWJOgQs8IZZaIv2BXIa2R4OlgkBN9bkUDNCJiBeb/AXlzBBko7b15fjrBs2+cTQtp Z3CYWFXG8C5zqx37wnOE49mRl/+OtkIKGO7fAE',
aws.credentials.role.arn='arn:aws-cn:iam::602389639824:role/demo_role',
aws.credentials.role.external_id='demo_external_id',
aws.credentials.access_key_id = 'your_access_key',
aws.credentials.secret_access_key = 'your_secret_key'
) FORMAT PLAIN ENCODE JSON;
CREATE {TABLE | SOURCE} [IF NOT EXISTS] source_name
WITH (
connector='kinesis',
stream='kafka',
aws.region='user_test_topic',
endpoint='172.10.1.1:9090,172.10.1.2:9090',
aws.credentials.session_token='AQoEXAMPLEH4aoAH0gNCAPyJxz4BlCFFxWNE1OPTgk5TthT+FvwqnKwRcOIfrRh3c/L To6UDdyJwOOvEVPvLXCrrrUtdnniCEXAMPLE/IvU1dYUg2RVAJBanLiHb4IgRmpRV3z rkuWJOgQs8IZZaIv2BXIa2R4OlgkBN9bkUDNCJiBeb/AXlzBBko7b15fjrBs2+cTQtp Z3CYWFXG8C5zqx37wnOE49mRl/+OtkIKGO7fAE',
aws.credentials.role.arn='arn:aws-cn:iam::602389639824:role/demo_role',
aws.credentials.role.external_id='demo_external_id',
aws.credentials.access_key_id = 'your_access_key',
aws.credentials.secret_access_key = 'your_secret_key'
) FORMAT PLAIN ENCODE PROTOBUF (
message = 'package.message_name',
schema.location = 'https://demo_bucket_name.s3-us-west-2.amazonaws.com/demo.proto'
);