Skip to main content

从 MySQL CDC 摄取数据

变更数据捕获(CDC)是指识别和捕获数据库中的数据变更,然后将变更实时传送给下游服务的过程。

RisingWave 支持从 MySQL 数据库的变更中摄取行级数据(INSERTUPDATEDELETE 操作)。支持 MySQL 的 5.7 和 8.0.x 版本。

您可以通过两种方式从 MySQL 摄取 CDC 数据:

  • 使用 RisingWave 中的原生 MySQL CDC 连接器

    使用此连接器,RisingWave 可直接连接 MySQL 数据库,从 binlog 获取数据,而无需启动其他服务。

    测试版功能

    RisingWave 中的原生 MySQL CDC 连接器目前处于测试阶段。如果您遇到任何问题或有任何反馈,请联系我们。

  • 使用 CDC 工具和消息代理

    您可以使用 CDC 工具,然后使用 Kafka、Pulsar 或 Kinesis 连接器将 CDC 数据发送到 RisingWave。

该内容介绍如何使用原生 MySQL CDC 连接器将 MySQL CDC 数据导入到 RisingWave。有关使用外部 CDC 工具和消息代理的介绍,请参阅 通过事件流系统创建 Source

设置 MySQL

在 RisingWave 中使用原生 MySQL CDC 连接器之前,您需要在 MySQL 上完成几项配置。

要使用 MySQL CDC 功能,我们需要在所有 RisingWave 将从中进行读取的数据库上创建拥有适当权限的 MySQL 用户账户。

创建用户并授予权限

  1. 使用以下语句创建 MySQL 用户。
CREATE USER 'user'@'%' IDENTIFIED BY 'password';
  1. 授予用户适当的权限。
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user'@'%';
  1. 完成权限设置。
FLUSH PRIVILEGES;

启用 binlog

MySQL 复制必须启用 binlog。二进制日志可记录事务更新,以便复制工具传播更改。

  1. 检查 log-bin 是否已启用。
SHOW VARIABLES LIKE 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | OFF |
+---------------+-------+
  1. 如果是 OFF,请在 MySQL 服务器配置文件 my.cnf 中配置以下属性。重启 MySQL 服务器,配置即可生效。
server-id         = 223344
log_bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL
expire_logs_days = 10
  1. 再次检查 log-bin 以确认更改。
SHOW VARIABLES LIKE 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON |
+---------------+-------+

更多详细信息,请参阅 设置 MySQL

从二进制文件运行 RisingWave 的注意事项

如果从二进制文件在本地运行 RisingWave,并打算使用原生 CDC Source Connector 或 JDBC Sink Connector,请确保您的环境中安装了 JDK 11 或更高版本。

使用 RisingWave 中的原生 CDC 连接器创建表

为确保捕获所有数据更改,必须创建表并指定主键。更多详细信息,请参阅 CREATE TABLE 命令。

句法

创建 CDC 表的句法。注意必须指定主键。

CREATE TABLE [ IF NOT EXISTS ] table_name (
column_name data_type PRIMARY KEY , ...
PRIMARY KEY ( column_name, ... )
)
WITH (
connector='mysql-cdc',
connector_parameter='value', ...
)
[ FORMAT DEBEZIUM ENCODE JSON ];

创建 CDC Source 的句法。

CREATE SOURCE [ IF NOT EXISTS ] source_name WITH (
connector='mysql-cdc',
<field>=<value>, ...
);

连接器参数

下面列出的所有字段都是必填项。请注意,这些参数的值应用单引号括起来。

字段注释
hostname数据库的主机名。
port数据库的端口号。
username数据库的用户名。
password数据库的密码。
database.name数据库的名称。请注意,RisingWave 无法从内置的 MySQL 数据库(如 mysqlsys 等)中读取数据。
table.name要从中摄取数据的表的名称。
server.id若创建共享 Source,则必须填写。数据库客户端的数字 ID。在 MySQL 集群中运行的所有数据库进程中,该 ID 必须是唯一的。如果未指定,RisingWave 将生成一个随机 ID。
transactional可选。指定是否要为即将创建的 CDC 表启用事务。默认值为 'false'。多表事务的共享 CDC Source 也支持此功能。有关详细信息,请参阅 CDC 表内的事务

数据格式

数据采用 Debezium JSON 格式。Debezium 是一款基于日志的 CDC 工具,可以从各种数据库管理系统(如 PostgreSQL、MySQL 和 SQL Server)中捕获行更改,并实时生成结构一致的事件。RisingWave 中的 MySQL CDC 连接器支持 JSON 作为 Debezium 数据的序列化格式。以 mysql-cdc 为 Source 创建表时,无需指定数据格式。

示例

创建单个 CDC 表

下面的示例可在 RisingWave 中创建一个表,从 MySQL 的 orders 表中读取 CDC 数据。连接到 MySQL 中的特定表时,请使用 CREATE TABLE 命令。

CREATE TABLE orders (
order_id int,
order_date bigint,
customer_name string,
price decimal,
product_id int,
order_status smallint,
PRIMARY KEY (order_id)
) WITH (
connector = 'mysql-cdc',
hostname = '127.0.0.1',
port = '3306',
username = 'root',
password = '123456',
database.name = 'mydb',
table.name = 'orders',
server.id = '5454'
);

使用同一 Source 创建多个 CDC 表

RisingWave 支持创建单个 MySQL Source,允许您从位于同一数据库中的多个表读取 CDC 数据。

使用 CREATE SOURCE 命令和 MySQL CDC 参数创建 CDC Source,连接到上游数据库。数据格式固定为 FORMAT PLAIN ENCODE JSON,无需指定。

CREATE SOURCE mysql_mydb WITH (
connector = 'mysql-cdc',
hostname = '127.0.0.1',
port = '8306',
username = 'root',
password = '123456',
database.name = 'mydb',
server.id = 5888
);

Source 创建后即可创建多个 CDC 表,从上游数据库的不同表中摄取数据,无需再次指定数据库连接参数。

例如,RisingWave 中的以下 CDC 表从数据库 mydb 中的表 t1 中摄取数据。在关键字 TABLE 后面的 FROM 子句中指定 MySQL 表名时,还必须指定数据库的名称。

CREATE TABLE t1_rw (
v1 int,
v2 int,
PRIMARY KEY(v1)
) FROM mysql_mydb TABLE 'mydb.t1';

您可以在 RisingWave 中创建另一个 CDC 表,该表从同一数据库 mydb 的表 t3 中摄取数据。

CREATE TABLE t3_rw (
v1 INTEGER,
v2 timestamptz,
PRIMARY KEY (v1)
) FROM mysql_mydb TABLE 'mydb.t3';

要检查回填历史数据的进度,可使用 SHOW INTERNAL TABLES 命令找到相应的内部表并进行查询。例如,下面的 SQL 查询显示了名为 orders_rw 的 CDC 表的进度。

SELECT * FROM __internal_orders_rw_4002_streamcdcscan_5002;

-[ RECORD 1 ]-----+---------------------------------------------------------------
split_id | 5001
o_orderkey | 4024320
backfill_finished | f
row_count | 1006080
cdc_offset | {"MySql": {"filename": "binlog.000005", "position": 60946679}}

数据类型映射

下表显示了创建 Source 时应指定的 RisingWave 中的相应数据类型。有关原生 RisingWave 数据类型的详细信息,请参阅 数据类型概述

标有星号的 RisingWave 数据类型表示,虽然没有相应的 RisingWave 数据类型,但摄取的数据仍可作为所列类型进行消费。

MySQL 类型RisingWave 类型
BOOLEAN, BOOLBOOLEAN
BIT(1)BOOLEAN*
BIT(>1)No support
TINYINTSMALLINT
SMALLINT[(M)]SMALLINT
MEDIUMINT[(M)]INTEGER
INT, INTEGER[(M)]INTEGER
BIGINT[(M)]BIGINT
REAL[(M,D)]REAL
FLOAT[(P)]REAL
FLOAT(M,D)DOUBLE PRECISION
DOUBLE[(M,D)]DOUBLE PRECISION
CHAR[(M)]CHARACTER VARYING
VARCHAR[(M)]CHARACTER VARYING
BINARY[(M)]BYTEA
VARBINARY[(M)]BYTEA
TINYBLOBBYTEA
TINYTEXTCHARACTER VARYING
BLOBBYTEA
TEXTCHARACTER VARYING
MEDIUMBLOBBYTEA
MEDIUMTEXTCHARACTER VARYING
LONGBLOBBYTEA
LONGTEXTBYTEA or CHARACTER VARYING
JSONJSONB
ENUMCHARACTER VARYING*
SETNo support
YEAR[(2|4)]INTEGER
TIMESTAMP[(M)]TIMESTAMPTZ
DATEDATE
TIME[(M)]TIME
DATETIME[(fsp)]
Optional fractional seconds precision (fsp: 0-6). If omitted, the default precision is 0.
TIMESTAMP
NUMERIC[(M[,D])]NUMERIC
DECIMAL[(M[,D])]NUMERIC
GEOMETRY, LINESTRING, POLYGON,
MULTIPOINT, MULTILINESTRING,
MULTIPOLYGON, GEOMETRYCOLLECTION
STRUCT

请注意,MySQL 类型和 RisingWave 类型具体值的范围有所不同。有关详细信息,请参阅下表。

MySQL 类型RisingWave 类型MySQL 范围RisingWave 范围
TIMETIME-838:59:59.000000 to 838:59:59.00000000:00:00 to 23:59:59
DATEDATE1000-01-01 to 9999-12-310001-01-01 to 9999-12-31
DATETIMETIMESTAMP1000-01-01 00:00:00.000000 to 9999-12-31 23:59:59.499991973-03-03 09:46:40 to 5138-11-16 09:46:40
TIMESTAMPTIMESTAMPTZ1970-01-01 00:00:01.000000 to 2038-01-19 03:14:07.4999990001-01-01 00:00:00 to 9999-12-31 23:59:59