从 RisingWave 导出数据到 Kafka
本文描述了如何将数据从 RisingWave 导出到 Kafka Broker,并如何指定安全(加密和认证)设置。
Sink 是一种外部目标,您可以将数据导出发送至 Sink。您可使用 CREATE SINK
语句从物化视图或表来创建一个 Sink。RisingWave 只支持以非事务模式写入消息。
句法
CREATE SINK [ IF NOT EXISTS ] sink_name
[FROM sink_from | AS select_query]
WITH (
connector='kafka',
connector_parameter = 'value', ...
)
FORMAT data_format ENCODE data_encode [ (
format_parameter = 'value'
) ]
;
名称和未加引号的标识符不区分大小写。因此,您必须为这些字段加上双引号,以便区分大小写。
基本参数
除另外说明,所有 WITH
选项都是必填的。
参数或子句 | 描述 |
---|---|
sink_name | 要创建的 Sink 名称。 |
sink_from | 指定将从中导出数据的直接来源。sink_from 可以是物化视图或表。必须指定此子句或 SELECT 查询。 |
AS select_query | 指定要输出到 Sink 的数据的 SELECT 查询。必须指定此查询或 FROM 子句。参见 SELECT 了解 SELECT 命令的句法和示例。 |
connector | Sink 连接器类型必须为 'kafka' 用于创建 Kafka Sink。 |
properties.bootstrap.server | Kafka Broker 的地址。格式:‘ip:port’ 。如果有多个 Broker,用逗号分隔。 |
topic | Kafka Topic的地址。一个 Sink 只能对应一个 Topic。 |
primary_key | 条件性。Sink 的主键。使用 ',' 分隔主键列。创建 PLAIN Sink 时此字段是可选的,但创建 DEBEZIUM 或 UPSERT Sink 时为必填项。 |
额外的 Kafka 参数
在 RisingWave 中创建 Kafka Sink 时,您可以指定以下特定于 Kafka 的参数。要设置参数,请在 WITH
选项中添加 Kafka 参数在 RisingWave 中的对应参数。有关这些参数的更多详细信息,请参阅 配置属性。
Kafka 参数名称 | RisingWave 参数名称 | 类型 |
---|---|---|
allow.auto.create.topics | properties.allow.auto.create.topics | bool |
batch.num.messages | properties.batch.num.messages | int |
batch.size | properties.batch.size | int |
client.id | properties.client.id | string |
enable.idempotence | properties.enable.idempotence | bool |
max.in.flight.requests.per.connection | properties.max.in.flight.requests.per.connection | int |
message.max.bytes | properties.message.max.bytes | int |
message.send.max.retries | properties.message.send.max.retries | int |
message.timeout.ms | properties.message.timeout.ms | int |
queue.buffering.max.kbytes | properties.queue.buffering.max.kbytes | int |
queue.buffering.max.messages | properties.queue.buffering.max.messages | int |
queue.buffering.max.ms | properties.queue.buffering.max.ms | float |
retry.backoff.ms | properties.retry.backoff.ms | int |
receive.message.max.bytes | properties.receive.message.max.bytes | int |
ssl.endpoint.identification.algorithm | properties.ssl.endpoint.identification.algorithm | str |
将 properties.ssl.endpoint.identification.algorithm
设置为 none
以绕过 CA 证书的验证并解决 SSL 握手失败。此参数可以设置为 https
或 none
。默认为 https
。
Sink 参数
字段 | 注释 |
---|---|
data_format | 数据格式。允许的格式:
UPSERT Sink 时何时定义主键,请参见数据导出概览。 |
data_encode | 数据编码。支持的编码:JSON 、AVRO 和 PROTOBUF 。对于 AVRO 编码,仅支持 UPSERT AVRO Sink。对于 PROTOBUF 编码,仅支持 PLAIN PROTOBUF Sink。 |
force_append_only | 如果为 true ,即使不能做到也会强制 Sink 为 PLAIN (即 append-only )。 |
timestamptz.handling.mode | 控制 timestamptz 输出格式。此参数特别适用于使用 JSON 编码的 Append-only 或 Upsert Sink。 - 如果省略,timestamptz 的输出格式为 2023-11-11T18:30:09.453000Z ,包括 UTC 后缀 Z 。- 当指定 utc_without_suffix 时,格式改为 2023-11-11 18:30:09.453000 。 |
schemas.enable | 仅对 Upsert JSON Sink 可配置。对于 Upsert JSON Sink,默认值为 false ,对于 Debezium JSON Sink,默认值为 true 。如果为 true ,RisingWave 将数据连同 Schema 一起导出到 Kafka Sink。请注意,这并不是指包含 JSON Schema 的 Schema 注册表,而是使用 Kafka Connect 定义的 Schema 格式。 |
Avro 特定参数
创建 upsert Avro Sink 时,可以在 FORMAT UPSERT ENCODE AVRO
后使用以下选项。
字段 | 注释 |
---|---|
schema.registry | 必填。 Schema 注册表的地址。 |
schema.registry.username | 可选。用于访问 Schema 注册表的用户名。 |
schema.registry.password | 可选。与用户名关联的密码。 |
schema.registry.name.strategy | 可选。接受的选项包括 topic_name_strategy (默认)、record_name_strategy 和 topic_record_name_strategy 。 |
key.message | 如果 schema.registry.name.strategy 设置为 record_name_strategy 或 topic_record_name_strategy ,则必填。 |
message | 如果 schema.registry.name.strategy 设置为 record_name_strategy 或 topic_record_name_strategy ,则必填。 |
句法:
FORMAT UPSERT
ENCODE AVRO (
schema.registry = 'schema_registry_url',
[schema.registry.username = 'username'],
[schema.registry.password = 'password'],
[schema.registry.name.strategy = 'topic_name_strategy'],
[key.message = 'test_key'],
[message = 'main_message',]
)
Protobuf 特定参数
创建 Append-only Protobuf Sink 时,可以在 FORMAT PLAIN ENCODE PROTOBUF
后使用以下选项。
字段 | 注释 |
---|---|
message | 必填。 Schema 定义中主消息的消息名称。 |
schema.location | 必填。 Schema 位置。可以是 file:// 、http:// 或 https:// 格式。 |
不建议在生产环境中使用 file://
格式。如果使用,它需要对元数据和 Compute 节点都可用。
句法:
FORMAT PLAIN
ENCODE PROTOBUF (
message = 'main_message',
schema.location = 'location'
)
示例
通过 SELECT 整个物化视图创建 Sink。
CREATE SINK sink1 FROM mv1
WITH (
connector='kafka',
properties.bootstrap.server='localhost:9092',
topic='test'
)
FORMAT PLAIN ENCODE JSON;
通过设置 properties.message.max.bytes
为 2000 来创建带有 Kafka 配置 message.max.bytes
设置为 2000 的 Sink。
CREATE SINK sink1 FROM mv1
WITH (
connector='kafka',
properties.bootstrap.server='localhost:9092',
topic='test',
properties.message.max.bytes = 2000
)
FORMAT PLAIN ENCODE JSON;
通过选择 taxi_trips
的平均 distance
和 duration
创建 Sink。
taxi_trips
的 Schema 如下:
{
"id": VARCHAR,
"distance": DOUBLE PRECISION,
"duration": DOUBLE PRECISION,
"fare": DOUBLE PRECISION
}
表可能看起来像这样:
id | distance | duration | city
----+----------+----------+----------
1 | 16 | 23 | Dallas
2 | 23 | 9 | New York
3 | 6 | 15 | Chicago
4 | 9 | 35 | New York
CREATE SINK sink2 AS
SELECT
avg(distance) as avg_distance,
avg(duration) as avg_duration
FROM taxi_trips
WITH (
connector='kafka',
properties.bootstrap.server='localhost:9092',
topic='test'
)
FORMAT PLAIN ENCODE JSON;
使用 VPC 连接创建 Sink
如果您的 Kafka Sink 服务位于与 RisingWave 不同的 VPC 中,请使用 AWS PrivateLink 或 GCP Private Service Connect 建立安全且直接的连接。有关如何设置 AWS PrivateLink 连接的详细信息,请参见创建 AWS PrivateLink 连接。
要使用 VPC 连接创建 Kafka Sink,在 CREATE SINK
语句的 WITH 部分指定以下参数。
参数 | 注释 |
---|---|
privatelink.targets | 对应于 Kafka Broker 的 PrivateLink 目标。目标应以 JSON 格式提供。请注意,列出的每个目标对应于 properties.bootstrap.server 字段中指定的每个 Broker。如果顺序不正确,将会有连接问题。 |
privatelink.endpoint | VPC 端点的 DNS 名称。 |
connection.name | 连接的名称,来自使用 CREATE CONNECTION 语句创建的连接。如果您已经使用 privatelink.endpoint 配置了 VPC 端点(推荐),则省略此参数。 |
以下是使用 PrivateLink 连接创建 Kafka Sink 的示例。注意 {"port": 8001}
对应于 Broker ip1:9092
,{"port": 8002}
对应于 Broker ip2:9092
。
CREATE SINK sink2 FROM mv2
WITH (
connector='kafka',
properties.bootstrap.server='b-1.xxx.amazonaws.com:9092,b-2.test.xxx.amazonaws.com:9092',
topic='msk_topic',
privatelink.endpoint='10.148.0.4',
privatelink.targets = '[{"port": 8001}, {"port": 8002}]'
)
FORMAT PLAIN ENCODE JSON (
force_append_only='true'
);
TLS/SSL 加密和 SASL 认证
RisingWave 可以将数据导出到使用 传输层安全性(TLS) 加密和/或通过 SASL 进行认证的 Kafka。
安全套接字层 (SSL) 是传输层安全性 (TLS) 的前身,自 2015 年 6 月以来已被弃用。出于历史原因,配置和代码中使用了 SSL
而不是 TLS
。
简单认证和安全层 (SASL) 是一种用于 Internet 协议中的认证和数据安全的框架。
RisingWave 支持这些 SASL 认证机制:
SASL/PLAIN
SASL/SCRAM
SSL 加密可以与 SASL 认证机制同时使用。
要了解如何在 Kafka 中启用 SSL 加密和 SASL 认证,包括如何生成密钥和证书,请参阅 Confluent 的 安全教程。
您需要在 CREATE SINK
语句的 WITH 部分指定加密和认证参数。
仅 SSL 无 SASL
要将数据与 SSL 加密但无 SASL 认证导出,请在 CREATE SINK
语句的 WITH 部分指定这些参数。
参数 | 注释 |
---|---|
properties.security.protocol | 设置为 SSL 。 |
properties.ssl.ca.location | |
properties.ssl.certificate.location | |
properties.ssl.key.location | |
properties.ssl.key.password |
有关参数的定义,请参见 librdkafka 属性列表。请注意,列表中的参数假设所有参数都以 properties.
开头,因此不包括此前缀。
以下是创建仅使用 SSL 加密而不使用 SASL 认证的 Sink 的示例。
CREATE SINK sink1 FROM mv1
WITH (
connector='kafka',
topic='quickstart-events',
properties.bootstrap.server='localhost:9093',
properties.security.protocol='SSL',
properties.ssl.ca.location='/home/ubuntu/kafka/secrets/ca-cert',
properties.ssl.certificate.location='/home/ubuntu/kafka/secrets/client_risingwave_client.pem',
properties.ssl.key.location='/home/ubuntu/kafka/secrets/client_risingwave_client.key',
properties.ssl.key.password='abcdefgh'
)
FORMAT PLAIN ENCODE JSON;
SASL/PLAIN
参数 | 注释 |
---|---|
properties.security.protocol | 对于不使用 SSL 的 SASL/PLAIN,设置为 SASL_PLAINTEXT 。对于使用 SSL 的 SASL/PLAIN,设置为 SASL_SSL 。 |
properties.sasl.mechanism | 设置为 PLAIN 。 |
properties.sasl.username | |
properties.sasl.password |
有关参数的定义,请参见 librdkafka 属性列表。请注意,列表中的参数假设所有参数都以 properties.
开头,因此不包括此前缀。
对于使用 SSL 的 SASL/PLAIN,您需要包含这些 SSL 参数:
properties.ssl.ca.location
properties.ssl.certificate.location
properties.ssl.key.location
properties.ssl.key.password
以下是创建使用 SASL/PLAIN 认证而不使用 SSL 加密的 Sink 的示例。
CREATE SINK sink1 FROM mv1
WITH (
connector='kafka',
topic='quickstart-events',
properties.bootstrap.server='localhost:9093',
properties.sasl.mechanism='PLAIN',
properties.security.protocol='SASL_PLAINTEXT',
properties.sasl.username='admin',
properties.sasl.password='admin-secret'
)
FORMAT PLAIN ENCODE JSON;
这是创建使用 SASL/PLAIN 认证和 SSL 加密的 Sink 的示例。
CREATE SINK sink1 FROM mv1
WITH (
connector='kafka',
topic='quickstart-events',
properties.bootstrap.server='localhost:9093',
properties.sasl.mechanism='PLAIN',
properties.security.protocol='SASL_SSL',
properties.sasl.username='admin',
properties.sasl.password='admin-secret',
properties.ssl.ca.location='/home/ubuntu/kafka/secrets/ca-cert',
properties.ssl.certificate.location='/home/ubuntu/kafka/secrets/client_risingwave_client.pem',
properties.ssl.key.location='/home/ubuntu/kafka/secrets/client_risingwave_client.key',
properties.ssl.key.password='abcdefgh'
)
FORMAT PLAIN ENCODE JSON;
SASL/SCRAM
参数 | 注释 |
---|---|
properties.security.protocol | 对于不使用 SSL 的 SASL/SCRAM,设置为 SASL_PLAINTEXT 。对于使用 SSL 的 SASL/SCRAM,设置为 SASL_SSL 。 |
properties.sasl.mechanism | 根据使用的加密方法设置为 SCRAM-SHA-256 或 SCRAM-SHA-512 。 |
properties.sasl.username | |
properties.sasl.password |
有关参数的定义,请参见 librdkafka 属性列表。请注意,列表中的参数假设所有参数都以 properties.
开头,因此不包括此前缀。
对于使用 SSL 的 SASL/SCRAM,您还需要包含这些 SSL 参数:
- properties.ssl.ca.location
- properties.ssl.certificate.location
- properties.ssl.key.location
- properties.ssl.key.password
以下是创建使用 SASL/SCRAM 认证而不使用 SSL 加密的 Sink 的示例。
CREATE SINK sink1 FROM mv1
WITH (
connector='kafka',
topic='quickstart-events',
properties.bootstrap.server='localhost:9093',
properties.sasl.mechanism='SCRAM-SHA-256',
properties.security.protocol='SASL_PLAINTEXT',
properties.sasl.username='admin',
properties.sasl.password='admin-secret'
)
FORMAT PLAIN ENCODE JSON;
数据类型映射 - RisingWave 和 Debezium JSON
RisingWave 数据类型 | JSON 中的 Schema 类型 | JSON 中的 Schema 名称 |
---|---|---|
boolean | boolean | 不适用 |
smallint | int16 | 不适用 |
integer | int32 | 不适用 |
bigint | int64 | 不适用 |
real | float | 不适用 |
double precision | double | 不适用 |
character varying | string | 不适用 |
bytea | bytes | 不适用 |
numeric | string | 不适用 |
date | int32 | org.apache.kafka.connect.data.Date |
time without time zone | int64 | org.apache.kafka.connect.data.Time |
timestamp | int64 | org.apache.kafka.connect.data.Timestamp |
timestamptz | string | io.debezium.time.ZonedTimestamp |
interval | string | io.debezium.time.Interval |
JSONB | string | io.debezium.data.Json |
struct | string | 不适用 |
array | string | 不适用 |