Skip to main content

通过事件流系统摄取的 CDC 数据

变更数据捕获(CDC)是指识别和捕获数据库中的数据变更,然后将变更实时传送到下游服务的过程。

您可以使用事件流系统(如 Kafka、Pulsar 或 Kinesis)将来自 MySQL、PostgreSQL 和 TiDB 的更改流式传输到 RisingWave。在这种情况下,您将需要一个额外的 CDC 工具,以便在摄取数据流至 RisingWave 时,对数据库的更改进行流式传输,并指定相应的格式。

RisingWave 还提供本地 MySQL 和 PostgreSQL CDC 连接器。使用这些连接器,您可以直接从这些数据库摄取 CDC 数据,而无需设置 Kafka 等其他服务。有关使用本机 CDC 连接器摄取 MySQL 和 PostgreSQL 数据的完整步骤指南,请参阅 从 MySQL 摄取数据从 PostgreSQL 摄取数据。本主题仅介绍使用 RisingWave 从事件流系统摄取 CDC 数据的配置。

要让 RisingWave 摄取 CDC 数据,您必须创建一个具有主键和连接器设置的表(CREATE TABLE)。这与创建标准 source 不同,因为 CDC 数据需要在 RisingWave 中持久化以确保正确性。

RisingWave 接受这些数据格式:

  • Debezium JSON(适用于 MySQL、PostgreSQL 和 SQL Server)

    对于 Debezium JSON(FORMAT DEBEZIUM ENCODE JSON),您可以使用 适用于 MySQL 的 Debezium 连接器适用于 PostgreSQL 的 Debezium 连接器适用于 SQL Server 的 Debezium 连接器 将 CDC 数据转换为 Kafka 或 Pulsar topic 或 Kinesis 数据流。

    请注意,如果要在 RisingWave 中摄取 timestamptimestamptz 类型的数据,上游值必须在 [1973-03-03 09:46:40, 5138-11-16 09:46:40] (UTC) 的范围内。否则,该值可能会被错误解析和摄取,而不会发出警告信息。

  • Debezium Mongo JSON(适用于 MongoDB)

    对于 Debezium Mongo JSON (FORMAT DEBEZIUM_MONGO ENCODE JSON),您可以使用 适用于 MongoDB 的 Debezium 连接器 将 CDC 数据转换为 Kafka topic。

  • Debezium AVRO(适用于 MySQL、PostgreSQL 和 SQL Server)

    对于 Debezium AVRO(FORMAT DEBEZIUM ENCODE AVRO),您可以使用 适用于 MySQL 的 Debezium 连接器适用于 PostgreSQL 的 Debezium 连接器适用于 SQL Server 的 Debezium 连接器 将 CDC 数据转换为 Kafka topic。

  • Maxwell JSON(仅适用于 MySQL)

    对于 Maxwell JSON(FORMAT MAXWELL ENCODE JSON),您需要使用 Maxwell's daemon 将 MySQL 数据变更转换为 Kafka topic 或 Kinesis 数据流。要了解如何配置 MySQL 和部署 Maxwell's daemon,请参阅 快速入门

  • TiCDC Canal JSON(仅适用于 TiDB)

    对于 TiCDC Canal JSON(FORMAT CANAL ENCODE JSON),您可以将 TiCDC 添加到现有的 TiDB 集群,以便将 TiDB 数据变更转换为 Kafka topic。您可能需要在 TiCDC 配置文件中定义 topic 名称。请注意,只有新的更改才会从 TiDB 中捕获。TiCDC 不会捕获目标表中已存在的数据。有关详细信息,请参阅 部署和维护 TiCDC

  • Canal JSON(仅适用于 MySQL)

    对于 Canal JSON(FORMAT CANAL ENCODE JSON),您需要使用 Canal source 连接器 将 MySQL 数据变更转换为 Pulsar topic。

句法

CREATE TABLE [ IF NOT EXISTS ] source_name (
column_name data_type [ PRIMARY KEY ], ...
PRIMARY KEY ( column_name, ... )
)
WITH (
connector='connector',
connector_parameter='value', ...
)
FORMAT { DEBEZIUM | DEBEZIUM_MONGO | MAXWELL | CANAL | PLAIN }
ENCODE { JSON | AVRO | PROTOBUF | CSV } [( encode properties ... )];

连接器参数

有关连接参数,请参阅相应的数据摄取页面。

示例

Kafka

下面是使用 Kafka 连接器创建表以从 Kafka topic 摄取 CDC 数据的示例。

CREATE TABLE [IF NOT EXISTS] source_name (
column1 varchar,
column2 integer,
PRIMARY KEY (column1)
)
WITH (
connector='kafka',
topic='user_test_topic',
properties.bootstrap.server='172.10.1.1:9090,172.10.1.2:9090',
scan.startup.mode='earliest'
) FORMAT DEBEZIUM ENCODE JSON;

Pulsar

以下是使用 Pulsar 创建表来从 Pulsar topic 摄取 CDC 数据的示例。

CREATE TABLE source_name (
column1 varchar,
column2 integer,
PRIMARY KEY (column1)
)
WITH (
connector='pulsar',
topic='demo_topic',
service.url='pulsar://localhost:6650/',
admin.url='http://localhost:8080',
scan.startup.mode='latest',
scan.startup.timestamp_millis='140000000'
)FORMAT DEBEZIUM ENCODE JSON;

Kinesis

以下是使用 Kinesis 创建表来从 Kinesis 数据流中摄取 CDC 数据的示例。

CREATE TABLE source_name (
column1 varchar,
column2 integer,
PRIMARY KEY (column1)
)
WITH (
connector='kinesis',
stream='kafka',
aws.region='user_test_topic',
endpoint='172.10.1.1:9090,172.10.1.2:9090',
aws.credentials.session_token='AQoEXAMPLEH4aoAH0gNCAPyJxz4BlCFFxWNE1OPTgk5TthT+FvwqnKwRcOIfrRh3c/L To6UDdyJwOOvEVPvLXCrrrUtdnniCEXAMPLE/IvU1dYUg2RVAJBanLiHb4IgRmpRV3z rkuWJOgQs8IZZaIv2BXIa2R4OlgkBN9bkUDNCJiBeb/AXlzBBko7b15fjrBs2+cTQtp Z3CYWFXG8C5zqx37wnOE49mRl/+OtkIKGO7fAE',
aws.credentials.role.arn='arn:aws-cn:iam::602389639824:role/demo_role',
aws.credentials.role.external_id='demo_external_id'
) FORMAT DEBEZIUM ENCODE JSON;