Skip to main content

从 RisingWave 导出数据到 Kafka

本文描述了如何将数据从 RisingWave 导出到 Kafka Broker,并如何指定安全(加密和认证)设置。

Sink 是一种外部目标,您可以将数据导出发送至 Sink。您可使用 CREATE SINK 语句从物化视图或表来创建一个 Sink。RisingWave 只支持以非事务模式写入消息。

句法

CREATE SINK [ IF NOT EXISTS ] sink_name
[FROM sink_from | AS select_query]
WITH (
connector='kafka',
connector_parameter = 'value', ...
)
FORMAT data_format ENCODE data_encode [ (
format_parameter = 'value'
) ]
;
note

名称和未加引号的标识符不区分大小写。因此,您必须为这些字段加上双引号,以便区分大小写。

基本参数

除另外说明,所有 WITH 选项都是必填的。

参数或子句描述
sink_name要创建的 Sink 名称。
sink_from指定将从中导出数据的直接来源。sink_from 可以是物化视图或表。必须指定此子句或 SELECT 查询。
AS select_query指定要输出到 Sink 的数据的 SELECT 查询。必须指定此查询或 FROM 子句。参见 SELECT 了解 SELECT 命令的句法和示例。
connectorSink 连接器类型必须为 'kafka' 用于创建 Kafka Sink。
properties.bootstrap.serverKafka Broker 的地址。格式:‘ip:port’。如果有多个 Broker,用逗号分隔。
topicKafka Topic的地址。一个 Sink 只能对应一个 Topic。
primary_key条件性。Sink 的主键。使用 ',' 分隔主键列。创建 PLAIN Sink 时此字段是可选的,但创建 DEBEZIUMUPSERT Sink 时为必填项。

额外的 Kafka 参数

在 RisingWave 中创建 Kafka Sink 时,您可以指定以下特定于 Kafka 的参数。要设置参数,请在 WITH 选项中添加 Kafka 参数在 RisingWave 中的对应参数。有关这些参数的更多详细信息,请参阅 配置属性

Kafka 参数名称RisingWave 参数名称类型
allow.auto.create.topicsproperties.allow.auto.create.topicsbool
batch.num.messagesproperties.batch.num.messagesint
batch.sizeproperties.batch.sizeint
client.idproperties.client.idstring
enable.idempotenceproperties.enable.idempotencebool
max.in.flight.requests.per.connectionproperties.max.in.flight.requests.per.connectionint
message.max.bytesproperties.message.max.bytesint
message.send.max.retriesproperties.message.send.max.retriesint
message.timeout.msproperties.message.timeout.msint
queue.buffering.max.kbytesproperties.queue.buffering.max.kbytesint
queue.buffering.max.messagesproperties.queue.buffering.max.messagesint
queue.buffering.max.msproperties.queue.buffering.max.msfloat
retry.backoff.msproperties.retry.backoff.msint
receive.message.max.bytesproperties.receive.message.max.bytesint
ssl.endpoint.identification.algorithmproperties.ssl.endpoint.identification.algorithmstr
note

properties.ssl.endpoint.identification.algorithm 设置为 none 以绕过 CA 证书的验证并解决 SSL 握手失败。此参数可以设置为 httpsnone。默认为 https

Sink 参数

字段注释
data_format数据格式。允许的格式:
  • PLAIN: 以插入操作输出数据。
  • DEBEZIUM: 以 Debezium 格式输出变更数据捕获(CDC)日志。
  • UPSERT: 作为变更日志流输出数据。在这种情况下,必须指定 primary_key
要了解创建 UPSERT Sink 时何时定义主键,请参见数据导出概览
data_encode数据编码。支持的编码:JSONAVROPROTOBUF。对于 AVRO 编码,仅支持 UPSERT AVRO Sink。对于 PROTOBUF 编码,仅支持 PLAIN PROTOBUF Sink。
force_append_only如果为 true,即使不能做到也会强制 Sink 为 PLAIN(即 append-only)。
timestamptz.handling.mode控制 timestamptz 输出格式。此参数特别适用于使用 JSON 编码的 Append-only 或 Upsert Sink。
- 如果省略,timestamptz 的输出格式为 2023-11-11T18:30:09.453000Z,包括 UTC 后缀 Z
- 当指定 utc_without_suffix 时,格式改为 2023-11-11 18:30:09.453000
schemas.enable仅对 Upsert JSON Sink 可配置。对于 Upsert JSON Sink,默认值为 false,对于 Debezium JSON Sink,默认值为 true。如果为 true,RisingWave 将数据连同 Schema 一起导出到 Kafka Sink。请注意,这并不是指包含 JSON Schema 的 Schema 注册表,而是使用 Kafka Connect 定义的 Schema 格式。

Avro 特定参数

创建 upsert Avro Sink 时,可以在 FORMAT UPSERT ENCODE AVRO 后使用以下选项。

字段注释
schema.registry必填。 Schema 注册表的地址。
schema.registry.username可选。用于访问 Schema 注册表的用户名。
schema.registry.password可选。与用户名关联的密码。
schema.registry.name.strategy可选。接受的选项包括 topic_name_strategy(默认)、record_name_strategytopic_record_name_strategy
key.message如果 schema.registry.name.strategy 设置为 record_name_strategytopic_record_name_strategy,则必填。
message如果 schema.registry.name.strategy 设置为 record_name_strategytopic_record_name_strategy,则必填。

句法:

FORMAT UPSERT
ENCODE AVRO (
schema.registry = 'schema_registry_url',
[schema.registry.username = 'username'],
[schema.registry.password = 'password'],
[schema.registry.name.strategy = 'topic_name_strategy'],
[key.message = 'test_key'],
[message = 'main_message',]
)

Protobuf 特定参数

创建 Append-only Protobuf Sink 时,可以在 FORMAT PLAIN ENCODE PROTOBUF 后使用以下选项。

字段注释
message必填。 Schema 定义中主消息的消息名称。
schema.location必填。 Schema 位置。可以是 file://http://https:// 格式。
note

不建议在生产环境中使用 file:// 格式。如果使用,它需要对元数据和 Compute 节点都可用。

句法:

FORMAT PLAIN
ENCODE PROTOBUF (
message = 'main_message',
schema.location = 'location'
)

示例

通过 SELECT 整个物化视图创建 Sink。

CREATE SINK sink1 FROM mv1
WITH (
connector='kafka',
properties.bootstrap.server='localhost:9092',
topic='test'
)
FORMAT PLAIN ENCODE JSON;

通过设置 properties.message.max.bytes 为 2000 来创建带有 Kafka 配置 message.max.bytes 设置为 2000 的 Sink。

CREATE SINK sink1 FROM mv1
WITH (
connector='kafka',
properties.bootstrap.server='localhost:9092',
topic='test',
properties.message.max.bytes = 2000
)
FORMAT PLAIN ENCODE JSON;

通过选择 taxi_trips 的平均 distanceduration 创建 Sink。

taxi_trips 的 Schema 如下:

{
"id": VARCHAR,
"distance": DOUBLE PRECISION,
"duration": DOUBLE PRECISION,
"fare": DOUBLE PRECISION
}

表可能看起来像这样:

 id | distance | duration |   city
----+----------+----------+----------
1 | 16 | 23 | Dallas
2 | 23 | 9 | New York
3 | 6 | 15 | Chicago
4 | 9 | 35 | New York
CREATE SINK sink2 AS
SELECT
avg(distance) as avg_distance,
avg(duration) as avg_duration
FROM taxi_trips
WITH (
connector='kafka',
properties.bootstrap.server='localhost:9092',
topic='test'
)
FORMAT PLAIN ENCODE JSON;

使用 VPC 连接创建 Sink

如果您的 Kafka Sink 服务位于与 RisingWave 不同的 VPC 中,请使用 AWS PrivateLink 或 GCP Private Service Connect 建立安全且直接的连接。有关如何设置 AWS PrivateLink 连接的详细信息,请参见创建 AWS PrivateLink 连接

要使用 VPC 连接创建 Kafka Sink,在 CREATE SINK 语句的 WITH 部分指定以下参数。

参数注释
privatelink.targets对应于 Kafka Broker 的 PrivateLink 目标。目标应以 JSON 格式提供。请注意,列出的每个目标对应于 properties.bootstrap.server 字段中指定的每个 Broker。如果顺序不正确,将会有连接问题。
privatelink.endpointVPC 端点的 DNS 名称。
connection.name连接的名称,来自使用 CREATE CONNECTION 语句创建的连接。如果您已经使用 privatelink.endpoint 配置了 VPC 端点(推荐),则省略此参数。

以下是使用 PrivateLink 连接创建 Kafka Sink 的示例。注意 {"port": 8001} 对应于 Broker ip1:9092{"port": 8002} 对应于 Broker ip2:9092

CREATE SINK sink2 FROM mv2
WITH (
connector='kafka',
properties.bootstrap.server='b-1.xxx.amazonaws.com:9092,b-2.test.xxx.amazonaws.com:9092',
topic='msk_topic',
privatelink.endpoint='10.148.0.4',
privatelink.targets = '[{"port": 8001}, {"port": 8002}]'
)
FORMAT PLAIN ENCODE JSON (
force_append_only='true'
);

TLS/SSL 加密和 SASL 认证

RisingWave 可以将数据导出到使用 传输层安全性(TLS) 加密和/或通过 SASL 进行认证的 Kafka。

安全套接字层 (SSL) 是传输层安全性 (TLS) 的前身,自 2015 年 6 月以来已被弃用。出于历史原因,配置和代码中使用了 SSL 而不是 TLS

简单认证和安全层 (SASL) 是一种用于 Internet 协议中的认证和数据安全的框架。

RisingWave 支持这些 SASL 认证机制:

  • SASL/PLAIN
  • SASL/SCRAM

SSL 加密可以与 SASL 认证机制同时使用。

要了解如何在 Kafka 中启用 SSL 加密和 SASL 认证,包括如何生成密钥和证书,请参阅 Confluent 的 安全教程

您需要在 CREATE SINK 语句的 WITH 部分指定加密和认证参数。

仅 SSL 无 SASL

要将数据与 SSL 加密但无 SASL 认证导出,请在 CREATE SINK 语句的 WITH 部分指定这些参数。

参数注释
properties.security.protocol设置为 SSL
properties.ssl.ca.location
properties.ssl.certificate.location
properties.ssl.key.location
properties.ssl.key.password
note

有关参数的定义,请参见 librdkafka 属性列表。请注意,列表中的参数假设所有参数都以 properties. 开头,因此不包括此前缀。

以下是创建仅使用 SSL 加密而不使用 SASL 认证的 Sink 的示例。

CREATE SINK sink1 FROM mv1
WITH (
connector='kafka',
topic='quickstart-events',
properties.bootstrap.server='localhost:9093',
properties.security.protocol='SSL',
properties.ssl.ca.location='/home/ubuntu/kafka/secrets/ca-cert',
properties.ssl.certificate.location='/home/ubuntu/kafka/secrets/client_risingwave_client.pem',
properties.ssl.key.location='/home/ubuntu/kafka/secrets/client_risingwave_client.key',
properties.ssl.key.password='abcdefgh'
)
FORMAT PLAIN ENCODE JSON;

SASL/PLAIN

参数注释
properties.security.protocol对于不使用 SSL 的 SASL/PLAIN,设置为 SASL_PLAINTEXT。对于使用 SSL 的 SASL/PLAIN,设置为 SASL_SSL
properties.sasl.mechanism设置为 PLAIN
properties.sasl.username
properties.sasl.password
note

有关参数的定义,请参见 librdkafka 属性列表。请注意,列表中的参数假设所有参数都以 properties. 开头,因此不包括此前缀。

对于使用 SSL 的 SASL/PLAIN,您需要包含这些 SSL 参数:

  • properties.ssl.ca.location
  • properties.ssl.certificate.location
  • properties.ssl.key.location
  • properties.ssl.key.password

以下是创建使用 SASL/PLAIN 认证而不使用 SSL 加密的 Sink 的示例。

CREATE SINK sink1 FROM mv1
WITH (
connector='kafka',
topic='quickstart-events',
properties.bootstrap.server='localhost:9093',
properties.sasl.mechanism='PLAIN',
properties.security.protocol='SASL_PLAINTEXT',
properties.sasl.username='admin',
properties.sasl.password='admin-secret'
)
FORMAT PLAIN ENCODE JSON;

这是创建使用 SASL/PLAIN 认证和 SSL 加密的 Sink 的示例。

CREATE SINK sink1 FROM mv1
WITH (
connector='kafka',
topic='quickstart-events',
properties.bootstrap.server='localhost:9093',
properties.sasl.mechanism='PLAIN',
properties.security.protocol='SASL_SSL',
properties.sasl.username='admin',
properties.sasl.password='admin-secret',
properties.ssl.ca.location='/home/ubuntu/kafka/secrets/ca-cert',
properties.ssl.certificate.location='/home/ubuntu/kafka/secrets/client_risingwave_client.pem',
properties.ssl.key.location='/home/ubuntu/kafka/secrets/client_risingwave_client.key',
properties.ssl.key.password='abcdefgh'
)
FORMAT PLAIN ENCODE JSON;

SASL/SCRAM

参数注释
properties.security.protocol对于不使用 SSL 的 SASL/SCRAM,设置为 SASL_PLAINTEXT。对于使用 SSL 的 SASL/SCRAM,设置为 SASL_SSL
properties.sasl.mechanism根据使用的加密方法设置为 SCRAM-SHA-256SCRAM-SHA-512
properties.sasl.username
properties.sasl.password
note

有关参数的定义,请参见 librdkafka 属性列表。请注意,列表中的参数假设所有参数都以 properties. 开头,因此不包括此前缀。

对于使用 SSL 的 SASL/SCRAM,您还需要包含这些 SSL 参数:

  • properties.ssl.ca.location
  • properties.ssl.certificate.location
  • properties.ssl.key.location
  • properties.ssl.key.password

以下是创建使用 SASL/SCRAM 认证而不使用 SSL 加密的 Sink 的示例。

CREATE SINK sink1 FROM mv1
WITH (
connector='kafka',
topic='quickstart-events',
properties.bootstrap.server='localhost:9093',
properties.sasl.mechanism='SCRAM-SHA-256',
properties.security.protocol='SASL_PLAINTEXT',
properties.sasl.username='admin',
properties.sasl.password='admin-secret'
)
FORMAT PLAIN ENCODE JSON;

数据类型映射 - RisingWave 和 Debezium JSON

RisingWave 数据类型JSON 中的 Schema 类型JSON 中的 Schema 名称
booleanboolean不适用
smallintint16不适用
integerint32不适用
bigintint64不适用
realfloat不适用
double precisiondouble不适用
character varyingstring不适用
byteabytes不适用
numericstring不适用
dateint32org.apache.kafka.connect.data.Date
time without time zoneint64org.apache.kafka.connect.data.Time
timestampint64org.apache.kafka.connect.data.Timestamp
timestamptzstringio.debezium.time.ZonedTimestamp
intervalstringio.debezium.time.Interval
JSONBstringio.debezium.data.Json
structstring不适用
arraystring不适用