通过事件流系统摄取的 CDC 数据
变更数据捕获(CDC)是指识别和捕获数据库中的数据变更,然后将变更实时传送到下游服务的过程。
您可以使用事件流系统(如 Kafka、Pulsar 或 Kinesis)将来自 MySQL、PostgreSQL 和 TiDB 的更改流式传输到 RisingWave。在这种情况下,您将需要一个额外的 CDC 工具,以便在摄取数据流至 RisingWave 时,对数据库的更改进行流式传输,并指定相应的格式。
RisingWave 还提供本地 MySQL 和 PostgreSQL CDC 连接器。使用这些连接器,您可以直接从这些数据库摄取 CDC 数据,而无需设置 Kafka 等其他服务。有关使用本机 CDC 连接器摄取 MySQL 和 PostgreSQL 数据的完整步骤指南,请参阅 从 MySQL 摄取数据 和 从 PostgreSQL 摄取数据。本主题仅介绍使用 RisingWave 从事件流系统摄取 CDC 数据的配置。
要让 RisingWave 摄取 CDC 数据,您必须创建一个具有主键和连接器设置的表(CREATE TABLE
)。这与创建标准 source 不同,因为 CDC 数据需要在 RisingWave 中持久化以确保正确性。
RisingWave 接受这些数据格式:
Debezium JSON(适用于 MySQL、PostgreSQL 和 SQL Server)
对于 Debezium JSON(
FORMAT DEBEZIUM ENCODE JSON
),您可以使用 适用于 MySQL 的 Debezium 连接器、适用于 PostgreSQL 的 Debezium 连接器 或 适用于 SQL Server 的 Debezium 连接器 将 CDC 数据转换为 Kafka 或 Pulsar topic 或 Kinesis 数据流。请注意,如果要在 RisingWave 中摄取
timestamp
或timestamptz
类型的数据,上游值必须在[1973-03-03 09:46:40, 5138-11-16 09:46:40] (UTC)
的范围内。否则,该值可能会被错误解析和摄取,而不会发出警告信息。Debezium Mongo JSON(适用于 MongoDB)
对于 Debezium Mongo JSON (
FORMAT DEBEZIUM_MONGO ENCODE JSON
),您可以使用 适用于 MongoDB 的 Debezium 连接器 将 CDC 数据转换为 Kafka topic。Debezium AVRO(适用于 MySQL、PostgreSQL 和 SQL Server)
对于 Debezium AVRO(
FORMAT DEBEZIUM ENCODE AVRO
),您可以使用 适用于 MySQL 的 Debezium 连接器、适用于 PostgreSQL 的 Debezium 连接器 或 适用于 SQL Server 的 Debezium 连接器 将 CDC 数据转换为 Kafka topic。Maxwell JSON(仅适用于 MySQL)
对于 Maxwell JSON(
FORMAT MAXWELL ENCODE JSON
),您需要使用 Maxwell's daemon 将 MySQL 数据变更转换为 Kafka topic 或 Kinesis 数据流。要了解如何配置 MySQL 和部署 Maxwell's daemon,请参阅 快速入门。TiCDC Canal JSON(仅适用于 TiDB)
对于 TiCDC Canal JSON(
FORMAT CANAL ENCODE JSON
),您可以将 TiCDC 添加到现有的 TiDB 集群,以便将 TiDB 数据变更转换为 Kafka topic。您可能需要在 TiCDC 配置文件中定义 topic 名称。请注意,只有新的更改才会从 TiDB 中捕获。TiCDC 不会捕获目标表中已存在的数据。有关详细信息,请参阅 部署和维护 TiCDC。Canal JSON(仅适用于 MySQL)
对于 Canal JSON(
FORMAT CANAL ENCODE JSON
),您需要使用 Canal source 连接器 将 MySQL 数据变更转换为 Pulsar topic。
句法
CREATE TABLE [ IF NOT EXISTS ] source_name (
column_name data_type [ PRIMARY KEY ], ...
PRIMARY KEY ( column_name, ... )
)
WITH (
connector='connector',
connector_parameter='value', ...
)
FORMAT { DEBEZIUM | DEBEZIUM_MONGO | MAXWELL | CANAL | PLAIN }
ENCODE { JSON | AVRO | PROTOBUF | CSV } [( encode properties ... )];
连接器参数
有关连接参数,请参阅相应的数据摄取页面。
示例
Kafka
下面是使用 Kafka 连接器创建表以从 Kafka topic 摄取 CDC 数据的示例。
- Debezium JSON
- Debezium Mongo JSON
- Debezium AVRO
CREATE TABLE [IF NOT EXISTS] source_name (
column1 varchar,
column2 integer,
PRIMARY KEY (column1)
)
WITH (
connector='kafka',
topic='user_test_topic',
properties.bootstrap.server='172.10.1.1:9090,172.10.1.2:9090',
scan.startup.mode='earliest'
) FORMAT DEBEZIUM ENCODE JSON;
有关此行格式的更多详细信息,请参阅 Debezium Mongo JSON。
CREATE TABLE [IF NOT EXISTS] source_name (
_id BIGINT PRIMARY KEY
payload jsonb
)
WITH (
connector='kafka',
topic='debezium_mongo_json_customers',
properties.bootstrap.server='172.10.1.1:9090,172.10.1.2:9090',
) FORMAT DEBEZIUM_MONGO ENCODE JSON;
CREATE TABLE orders (
order_id INT PRIMARY KEY
)
WITH (
connector = 'kafka',
topic = 'mysql.mydb.orders',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
)
FORMAT DEBEZIUM ENCODE AVRO (
confluent_schema_registry = 'http://localhost:8081'
);
虽然 CREATE TABLE
命令只指定了一列,但上游 MySQL 表中的其他列仍会被导出和包含。
如果上游是 PostgreSQL,PostgreSQL 中的 interval
类型可能与 RisingWave 中的 bigint
或 varchar
不匹配,这取决于 Debezium 连接器设置中的 interval.handing.mode。
Pulsar
以下是使用 Pulsar 创建表来从 Pulsar topic 摄取 CDC 数据的示例。
CREATE TABLE source_name (
column1 varchar,
column2 integer,
PRIMARY KEY (column1)
)
WITH (
connector='pulsar',
topic='demo_topic',
service.url='pulsar://localhost:6650/',
admin.url='http://localhost:8080',
scan.startup.mode='latest',
scan.startup.timestamp_millis='140000000'
)FORMAT DEBEZIUM ENCODE JSON;
Kinesis
以下是使用 Kinesis 创建表来从 Kinesis 数据流中摄取 CDC 数据的示例。
CREATE TABLE source_name (
column1 varchar,
column2 integer,
PRIMARY KEY (column1)
)
WITH (
connector='kinesis',
stream='kafka',
aws.region='user_test_topic',
endpoint='172.10.1.1:9090,172.10.1.2:9090',
aws.credentials.session_token='AQoEXAMPLEH4aoAH0gNCAPyJxz4BlCFFxWNE1OPTgk5TthT+FvwqnKwRcOIfrRh3c/L To6UDdyJwOOvEVPvLXCrrrUtdnniCEXAMPLE/IvU1dYUg2RVAJBanLiHb4IgRmpRV3z rkuWJOgQs8IZZaIv2BXIa2R4OlgkBN9bkUDNCJiBeb/AXlzBBko7b15fjrBs2+cTQtp Z3CYWFXG8C5zqx37wnOE49mRl/+OtkIKGO7fAE',
aws.credentials.role.arn='arn:aws-cn:iam::602389639824:role/demo_role',
aws.credentials.role.external_id='demo_external_id'
) FORMAT DEBEZIUM ENCODE JSON;