从 Kafka 摄取数据
本文介绍如何将 RisingWave 连接到要从中接收数据的 Kafka 代理,以及如何指定数据格式、Schema 和安全(加密和身份验证)设置。
Source 是 RisingWave 可以从中读取数据的资源。可以使用 CREATE SOURCE
命令在 RisingWave 中创建 Source。创建 Source 时,可以通过使用 CREATE TABLE
命令并指定连接设置和数据格式,在 RisingWave 中持久化 Source 中的数据。
无论数据是否在 RisingWave 中持久化,您都可以创建物化视图来进行分析或数据转换。
RisingWave 支持精确一次语义,只有在相关事务已提交的情况下才读取事务消息。这是 RisingWave 的默认行为,不可配置。
句法
CREATE {TABLE | SOURCE} [ IF NOT EXISTS ] source_name
[ schema_definition ]
WITH (
connector='kafka',
connector_parameter='value', ...
)
FORMAT data_format ENCODE data_encode (
message = 'message',
schema.location = 'location' | schema.registry = 'schema_registry_url'
);
schema_definition:
(
column_name data_type [ PRIMARY KEY ], ...
[ PRIMARY KEY ( column_name, ... ) ]
)
对于 Avro 和 Protobuf 数据,请勿在 CREATE SOURCE
语句中指定 schema_definition
。Schema 应在网络位置或 ROW SCHEMA LOCATION
部分的 Confluent Schema Registry 链接中提供。
RisingWave 对表执行主键约束检查,但不对 Source 执行该检查。如果需要执行检查,请创建表。
对于带有主键约束的表,如果新数据带有现有键,则新数据将覆盖现有数据。
连接器参数
字段 | 注释 |
---|---|
topic | 必填。Kafka Topic 的地址。一个 Source 只能对应一个 Topic。 |
properties.bootstrap.server | 必填。Kafka 代理的地址。格式:'ip:port,ip:port' 。 |
scan.startup.mode | 可选。RisingWave 用来消费数据的偏移模式。支持的两种模式是 earliest (最早的偏移)和 latest (最新的偏移)。如果未指定,将使用默认值 earliest 。 |
scan.startup.timestamp.millis | 可选。RisingWave 将从指定的 UNIX 时间戳(毫秒)开始消费数据。如果指定了该字段,将忽略 scan.startup.mode 的值。 |
properties.sync.call.timeout | 可选。指定超时时间。默认超时时间为 5 秒。 |
properties.client.id | 可选。与 Kafka 客户端关联的客户端 ID。 |
其他参数
字段 | 注释 |
---|---|
data_format | 数据格式。支持的格式:DEBEZIUM 、MAXWELL 、CANAL 、UPSERT 、PLAIN 。 |
data_encode | 数据编码。支持的编码:JSON 、AVRO 、PROTOBUF 、CSV 。 |
message | Schema 定义中主要消息的名称。对于 Protobuf 数据,是必需的。 |
location | Schema 文件的 Web 位置,格式为 http://... 、https://... 或 S3://... 。对于 Avro 和 Protobuf 数据,必须指定 Schema 位置或 Schema Registry,但不能同时指定两者。 |
schema.registry | Confluent Schema Registry URL。示例:http://127.0.0.1:8081 。对于 Avro 或 Protobuf 数据,必须指定 Schema 位置或 Confluent Schema Registry,但不能同时指定两者。 |
schema.registry.username | 有条件。Schema Registry 的用户名。必须与 schema.registry.password 一起指定。 |
schema.registry.password | 有条件。Schema Registry 的密码。必须与 schema.registry.username 一起指定。 |
其他 Kafka 参数
在 RisingWave 中创建 Source 时,可以指定以下 Kafka 参数。要设置参数,请在 WITH options
下添加 Kafka 参数的 RisingWave 等效项。有关这些参数的用法示例,请参阅 JSON 示例。有关这些参数的其他详细信息,请参阅 配置属性。
Kafka 参数名 | RisingWave 参数名 | 类型 |
---|---|---|
enable.auto.commit | properties.enable.auto.commit | boolean |
fetch.max.bytes | properties.fetch.max.bytes | int |
fetch.queue.backoff.ms | properties.fetch.queue.backoff.ms | int |
fetch.wait.max.ms | properties.fetch.wait.max.ms | int |
message.max.bytes | properties.message.max.bytes | int |
queued.max.messages.kbytes | properties.queued.max.messages.kbytes | int |
queued.min.messages | properties.queued.min.messages | int |
receive.message.max.bytes | properties.receive.message.max.bytes | int |
ssl.endpoint.identification.algorithm | properties.ssl.endpoint.identification.algorithm | str |
将 properties.ssl.endpoint.identification.algorithm
设置为 none
可绕过 CA 证书验证并解决 SSL 握手失败问题。该参数可设置为 https
或 none
。默认为 https
。
示例
以下是将 RisingWave 连接到 Kafka 代理以从单个 Topic 中读取数据的示例。
RisingWave 支持读取经过 zstd 压缩的消息。无需额外配置。
- Avro
- Upsert Avro
- JSON
- Upsert JSON
- Protobuf
- CSV
- Bytes
CREATE SOURCE IF NOT EXISTS source_abc
WITH (
connector='kafka',
topic='demo_topic',
properties.bootstrap.server='172.10.1.1:9090,172.10.1.2:9090',
scan.startup.mode='latest',
scan.startup.timestamp.millis='140000000'
) FORMAT PLAIN ENCODE AVRO (
message = 'message_name',
schema.registry = 'http://127.0.0.1:8081'
);
CREATE TABLE IF NOT EXISTS source_abc
WITH (
connector='kafka',
properties.bootstrap.server='localhost:9092',
topic='test_topic'
)
FORMAT UPSERT ENCODE AVRO (
schema.registry = 'http://127.0.0.1:8081',
schema.registry.username='your_schema_registry_username',
schema.registry.password='your_schema_registry_password'
);
CREATE SOURCE IF NOT EXISTS source_abc (
column1 varchar,
column2 integer,
)
WITH (
connector='kafka',
topic='demo_topic',
properties.bootstrap.server='172.10.1.1:9090,172.10.1.2:9090',
scan.startup.mode='latest',
scan.startup.timestamp.millis='140000000'
) FORMAT PLAIN ENCODE JSON;
创建 Source 时,分别使用 properties.queued.min.messages
和 properties.queued.max.messages.kbytes
指定额外的 Kafka 参数 queued.min.messages
和 queued.max.messages.kbytes
。
CREATE SOURCE s1 (v1 int, v2 varchar) with (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest',
properties.queued.min.messages = 10000,
properties.queued.max.messages.kbytes = 65536
) FORMAT PLAIN ENCODE JSON;
CREATE TABLE IF NOT EXISTS source_abc (
column1 varchar,
column2 integer,
)
WITH (
connector='kafka',
properties.bootstrap.server='localhost:9092',
topic='t1'
) FORMAT UPSERT ENCODE JSON;
CREATE SOURCE IF NOT EXISTS source_abc
WITH (
connector='kafka',
topic='demo_topic',
properties.bootstrap.server='172.10.1.1:9090,172.10.1.2:9090',
scan.startup.mode='latest',
scan.startup.timestamp.millis='140000000'
) FORMAT PLAIN ENCODE PROTOBUF (
message = 'package.message_name',
location = 'https://demo_bucket_name.s3-us-west-2.amazonaws.com/demo.proto'
);
CREATE TABLE s0 (v1 int, v2 varchar)
WITH (
connector = 'kafka',
topic = 'kafka_csv_topic',
properties.bootstrap.server = '127.0.0.1:29092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE CSV (
without_header = 'true',
delimiter = ','
);
使用 Kafka 连接器创建表时,不支持 CSV 表头。请在编码参数中添加
without_header
选项。delimiter
选项指定 CSV 数据中使用的分隔符。
CREATE SOURCE t1 (id bytea)
WITH (
connector='kafka',
topic='topic1',
properties.bootstrap.server='localhost:9093',
) FORMAT PLAIN ENCODE BYTES;
查询 Kafka 时间戳
对于创建的每个 Kafka Source,还将存在虚拟列 _rw_kafka_timestamp
。该列包含 Kafka 消息的时间戳。
可以将该列包含在视图或物化视图中,以显示 Kafka 时间戳。下面是一个示例。
CREATE MATERIALIZED VIEW v1 AS
SELECT _rw_kafka_timestamp, col1
FROM source_name;
如果直接从 Source 查询,可以使用 _rw_kafka_timestamp
来过滤特定时间段内发送的消息。例如,下面的查询只选择过去 10 分钟内发送的消息。
SELECT * FROM source_name
WHERE _rw_kafka_timestamp > now() - interval '10 minute';
从位置读取 Schema
RisingWave 支持从网络位置(http://...
、https://...
或 S3://...
格式)读取 Schema,或从 Confluent Schema Registry 中读取 Kafka 数据(Avro 或 Protobuf 格式)的 Schema。
对于 Protobuf 数据,如果指定了 Schema 位置,则 Schema 文件必须是 FileDescriptorSet
,可以使用以下命令从 .proto
文件编译:
protoc -I=$include_path --include_imports --descriptor_set_out=schema.pb schema.proto
要指定 Schema 位置,请在 CREATE SOURCE
语句中添加该子句。
ENCODE data_encode (
schema.location = 'location'
)
如果还需要定义主键,请使用表约束句法。
CREATE TABLE table1 (PRIMARY KEY(id))
从 Schema Registry 读取 Schema
Confluent Schema Registry 可为元数据提供服务层。它提供了一个 RESTful 接口,用于存储和检索 Schema。
RisingWave 支持从 Schema Registry 读取 Schema。当发出 CREATE SOURCE
语句时,将使用 TopicNameStrategy
策略从指定的 Schema Registry 检索最新 Schema。然后,RisingWave 中的 Schema 解析器将自动确定要在 Source 中使用的列和数据类型。
要指定 Schema Registry,请在 CREATE SOURCE
语句中添加该子句。
ENCODE data_encode (
schema.registry = 'schema_registry_url'
)
要了解有关 Confluent Schema Registry 以及如何构建 Schema Registry 的更多信息,请参阅 Confluent Schema Registry 文档。
如果还需要定义主键,请使用表约束句法。
CREATE TABLE table1 (PRIMARY KEY(id))
Schema 升级
根据为 Schema Registry 配置的兼容性类型,允许在不将 Schema 更改为不同版本的情况下进行一些更改。在这种情况下,RisingWave 将继续使用原始 Schema 定义。若要在 RisingWave 中使用更新版本的 Writer Schema,则需要删除并重新创建 Source。
要了解 Schema Registry 的兼容性类型以及允许的更改,请参阅 兼容性类型。
创建带有 VPC 连接的 Source
如果您的 Kafka Source 服务与 RisingWave 位于不同的 VPC,请使用 AWS PrivateLink 建立安全、直接的连接。有关如何创建 AWS PrivateLink 连接的详细信息,请参阅 创建 AWS PrivateLink 连接。
要创建带有 VPC 连接的 Kafka Source,请在 CREATE SOURCE
或 CREATE TABLE
语句的 WITH 部分指定以下参数。
参数 | 注释 |
---|---|
privatelink.targets | 与 Kafka Broker 相对应的 PrivateLink 目标。目标应为 JSON 格式。请注意,列出的每个目标都与 properties.bootstrap.server 字段中指定的每个 Broker 相对应。如果顺序不正确,就会出现连接问题。 |
privatelink.endpoint | VPC 端点的 DNS 名称。 |
connection.name | 连接的名称。 只有在使用 CREATE CONNECTION 语句创建的连接时,才应包含此参数。如果是使用 privatelink.endpoint (推荐)配置 VPC 端点,则省略此参数。 |
下面是使用 PrivateLink 连接创建 Kafka Source 的示例。请注意,{"port": 9094}
对应 Broker broker1-endpoint
,{"port": 9095}
对应 Broker broker2-endpoint
,{"port": 9096}
对应 Broker broker3-endpoint
。
CREATE TABLE IF NOT EXISTS crypto_source (
product_id VARCHAR,
price NUMERIC,
open_24h NUMERIC,
volume_24h NUMERIC,
low_24h NUMERIC,
high_24h NUMERIC,
volume_30d NUMERIC,
best_bid NUMERIC,
best_ask NUMERIC,
side VARCHAR,
time timestamp,
trade_id bigint,
)
WITH (
connector='kafka',
topic='crypto',
privatelink.endpoint='10.148.0.4',
privatelink.targets='[{"port": 9094}, {"port": 9095}, {"port": 9096}]',
properties.bootstrap.server='broker1-endpoint,broker2-endpoint,broker3-endpoint',
scan.startup.mode='latest'
) FORMAT PLAIN ENCODE JSON;
TLS/SSL 加密和 SASL 身份验证
RisingWave 可以读取使用传输层安全性协议(TLS)加密和/或使用 SASL 进行身份验证的 Kafka 数据。
安全套接字层(SSL)是传输层安全性协议(TLS)的前身,自 2015 年 6 月起已被弃用。由于历史原因,配置和代码中使用 SSL
而不是 TLS
。
简单身份验证和安全层(SASL)是互联网协议中用于身份验证和数据安全性的框架。
RisingWave 支持以下 SASL 身份验证机制:
SASL/PLAIN
SASL/SCRAM
SSL 加密可与 SASL 身份验证机制同时使用。
要了解如何在 Kafka 中启用 SSL 加密和 SASL 身份验证,包括如何生成密钥和证书,请参阅 Confluent 的 安全教程。
你需要在 CREATE SOURCE
语句的 WITH 部分指定加密和身份验证参数。
不使用 SASL 的 SSL
要读取使用 SSL 加密但不使用 SASL 身份验证的数据,请在 CREATE SOURCE
语句的 WITH 部分指定以下参数。
参数 | 注释 |
---|---|
properties.security.protocol | 设置为 SSL 。 |
properties.ssl.ca.location | |
properties.ssl.certificate.location | |
properties.ssl.key.location | |
properties.ssl.key.password |
有关参数的定义,请参阅 librdkafka 属性列表。请注意,列表中的参数假定所有参数都以 properties.
开头,因此不包含此前缀。
下面是创建使用 SSL 加密但不使用 SASL 身份验证的表的示例。
CREATE TABLE IF NOT EXISTS table_1 (
column1 varchar,
column2 integer,
)
WITH (
connector='kafka',
topic='quickstart-events',
properties.bootstrap.server='localhost:9093',
scan.startup.mode='earliest',
properties.security.protocol='SSL',
properties.ssl.ca.location='/home/ubuntu/kafka/secrets/ca-cert',
properties.ssl.certificate.location='/home/ubuntu/kafka/secrets/client_risingwave_client.pem',
properties.ssl.key.location='/home/ubuntu/kafka/secrets/client_risingwave_client.key',
properties.ssl.key.password='abcdefgh'
) FORMAT PLAIN ENCODE JSON;
SASL/PLAIN
参数 | 注释 |
---|---|
properties.security.protocol | 对于不使用 SSL 的 SASL/PLAIN,请设置为 SASL_PLAINTEXT 。对于使用 SSL 的 SASL/PLAIN,请设置为 SASL_SSL 。 |
properties.sasl.mechanism | 设置为 PLAIN 。 |
properties.sasl.username | |
properties.sasl.password |
有关参数的定义,请参阅 librdkafka 属性列表。请注意,列表中的参数假定所有参数都以 properties.
开头,因此不包含此前缀。
对于使用 SSL 的 SASL/PLAIN,需要包含以下 SSL 参数:
properties.ssl.ca.location
properties.ssl.certificate.location
properties.ssl.key.location
properties.ssl.key.password
下面是创建使用 SASL/PLAIN 进行身份验证但不使用 SSL 加密的 Source 的示例。
CREATE SOURCE IF NOT EXISTS source_2 (
column1 varchar,
column2 integer,
)
WITH (
connector='kafka',
topic='quickstart-events',
properties.bootstrap.server='localhost:9093',
scan.startup.mode='earliest',
properties.sasl.mechanism='PLAIN',
properties.security.protocol='SASL_PLAINTEXT',
properties.sasl.username='admin',
properties.sasl.password='admin-secret'
) FORMAT PLAIN ENCODE JSON;
下面是创建使用 SASL/PLAIN 进行身份验证且使用 SSL 加密的 Source 的示例。
CREATE SOURCE IF NOT EXISTS source_3 (
column1 varchar,
column2 integer,
)
WITH (
connector='kafka',
topic='quickstart-events',
properties.bootstrap.server='localhost:9093',
scan.startup.mode='earliest',
properties.sasl.mechanism='PLAIN',
properties.security.protocol='SASL_SSL',
properties.sasl.username='admin',
properties.sasl.password='admin-secret',
properties.ssl.ca.location='/home/ubuntu/kafka/secrets/ca-cert',
properties.ssl.certificate.location='/home/ubuntu/kafka/secrets/client_risingwave_client.pem',
properties.ssl.key.location='/home/ubuntu/kafka/secrets/client_risingwave_client.key',
properties.ssl.key.password='abcdefgh'
) FORMAT PLAIN ENCODE JSON;
SASL/SCRAM
参数 | 注释 |
---|---|
properties.security.protocol | 对于不使用 SSL 的 SASL/SCRAM,请设置为 SASL_PLAINTEXT 。对于使用 SSL 的 SASL/SRAM,请设置为 SASL_SSL 。 |
properties.sasl.mechanism | 根据使用的加密方法设置为 SCRAM-SHA-256 或 SCRAM-SHA-512 。 |
properties.sasl.username | |
properties.sasl.password |
有关参数的定义,请参阅 librdkafka 属性列表。请注意,列表中的参数假定所有参数都以 properties.
开头,因此不包含此前缀。
对于使用 SSL 的 SASL/SCRAM,还需要包含以下 SSL 参数:
properties.ssl.ca.location
properties.ssl.certificate.location
properties.ssl.key.location
properties.ssl.key.password
下面是创建使用 SASL/PLAIN 进行身份验证但不使用 SSL 加密的表的示例。
CREATE TABLE IF NOT EXISTS table_4 (
column1 varchar,
column2 integer,
)
WITH (
connector='kafka',
topic='quickstart-events',
properties.bootstrap.server='localhost:9093',
scan.startup.mode='earliest',
properties.sasl.mechanism='SCRAM-SHA-256',
properties.security.protocol='SASL_PLAINTEXT',
properties.sasl.username='admin',
properties.sasl.password='admin-secret'
) FORMAT PLAIN ENCODE JSON;