从 MySQL CDC 摄取数据
变更数据捕获(CDC)是指识别和捕获数据库中的数据变更,然后将变更实时传送给下游服务的过程。
RisingWave 支持从 MySQL 数据库的变更中摄取行级数据(INSERT
、UPDATE
和 DELETE
操作)。支持 MySQL 的 5.7 和 8.0.x 版本。
您可以通过两种方式从 MySQL 摄取 CDC 数据:
使用 RisingWave 中的原生 MySQL CDC 连接器
使用此连接器,RisingWave 可直接连接 MySQL 数据库,从 binlog 获取数据,而无需启动其他服务。
测试版功能RisingWave 中的原生 MySQL CDC 连接器目前处于测试阶段。如果您遇到任何问题或有任何反馈,请联系我们。
使用 CDC 工具和消息代理
您可以使用 CDC 工具,然后使用 Kafka、Pulsar 或 Kinesis 连接器将 CDC 数据发送到 RisingWave。
该内容介绍如何使用原生 MySQL CDC 连接器将 MySQL CDC 数据导入到 RisingWave。有关使用外部 CDC 工具和消息代理的介绍,请参阅 通过事件流系统创建 Source。
设置 MySQL
在 RisingWave 中使用原生 MySQL CDC 连接器之前,您需要在 MySQL 上完成几项配置。
- Self-hosted
- AWS RDS
要使用 MySQL CDC 功能,我们需要在所有 RisingWave 将从中进行读取的数据库上创建拥有适当权限的 MySQL 用户账户。
创建用户并授予权限
- 使用以下语句创建 MySQL 用户。
CREATE USER 'user'@'%' IDENTIFIED BY 'password';
- 授予用户适当的权限。
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user'@'%';
- 完成权限设置。
FLUSH PRIVILEGES;
启用 binlog
MySQL 复制必须启用 binlog。二进制日志可记录事务更新,以便复制工具传播更改。
- 检查
log-bin
是否已启用。
SHOW VARIABLES LIKE 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | OFF |
+---------------+-------+
- 如果是
OFF
,请在 MySQL 服务器配置文件 my.cnf 中配置以下属性。重启 MySQL 服务器,配置即可生效。
server-id = 223344
log_bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL
expire_logs_days = 10
- 再次检查
log-bin
以确认更改。
SHOW VARIABLES LIKE 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON |
+---------------+-------+
更多详细信息,请参阅 设置 MySQL。
如果您的 MySQL 托管在 AWS RDS 上,则配置过程有所不同。我们将使用标准类 MySQL 实例(不带多可用区部署)进行说明。
打开二进制日志,并为 Retention period 选择一个非零值。
为 MySQL 实例创建参数组。我们为运行 MySQL 5.7.x 的实例创建了一个名为 MySQL-CDC 的参数组。
单击 MySQL-CDC 参数组,将 binlog_format 的值改为 ROW,binlog_row_image 的值改为 full。
修改 RDS 实例,并将修改后的参数组应用于数据库。
点击 Continue 并选择 Apply immediately。最后,单击 Modify DB instance 保存更改。记得重启 MySQL 实例。
从二进制文件运行 RisingWave 的注意事项
如果从二进制文件在本地运行 RisingWave,并打算使用原生 CDC Source Connector 或 JDBC Sink Connector,请确保您的环境中安装了 JDK 11 或更高版本。
使用 RisingWave 中的原生 CDC 连接器创建表
为确保捕获所有数据更改,必须创建表并指定主键。更多详细信息,请参阅 CREATE TABLE
命令。
句法
创建 CDC 表的句法。注意必须指定主键。
CREATE TABLE [ IF NOT EXISTS ] table_name (
column_name data_type PRIMARY KEY , ...
PRIMARY KEY ( column_name, ... )
)
WITH (
connector='mysql-cdc',
connector_parameter='value', ...
)
[ FORMAT DEBEZIUM ENCODE JSON ];
创建 CDC Source 的句法。
CREATE SOURCE [ IF NOT EXISTS ] source_name WITH (
connector='mysql-cdc',
<field>=<value>, ...
);
连接器参数
下面列出的所有字段都是必填项。请注意,这些参数的值应用单引号括起来。
字段 | 注释 |
---|---|
hostname | 数据库的主机名。 |
port | 数据库的端口号。 |
username | 数据库的用户名。 |
password | 数据库的密码。 |
database.name | 数据库的名称。请注意,RisingWave 无法从内置的 MySQL 数据库(如 mysql 、sys 等)中读取数据。 |
table.name | 要从中摄取数据的表的名称。 |
server.id | 若创建共享 Source,则必须填写。数据库客户端的数字 ID。在 MySQL 集群中运行的所有数据库进程中,该 ID 必须是唯一的。如果未指定,RisingWave 将生成一个随机 ID。 |
transactional | 可选。指定是否要为即将创建的 CDC 表启用事务。默认值为 'false' 。多表事务的共享 CDC Source 也支持此功能。有关详细信息,请参阅 CDC 表内的事务。 |
数据格式
数据采用 Debezium JSON 格式。Debezium 是一款基于日志的 CDC 工具,可以从各种数据库管理系统(如 PostgreSQL、MySQL 和 SQL Server)中捕获行更改,并实时生成结构一致的事件。RisingWave 中的 MySQL CDC 连接器支持 JSON 作为 Debezium 数据的序列化格式。以 mysql-cdc
为 Source 创建表时,无需指定数据格式。
示例
创建单个 CDC 表
下面的示例可在 RisingWave 中创建一个表,从 MySQL 的 orders
表中读取 CDC 数据。连接到 MySQL 中的特定表时,请使用 CREATE TABLE
命令。
CREATE TABLE orders (
order_id int,
order_date bigint,
customer_name string,
price decimal,
product_id int,
order_status smallint,
PRIMARY KEY (order_id)
) WITH (
connector = 'mysql-cdc',
hostname = '127.0.0.1',
port = '3306',
username = 'root',
password = '123456',
database.name = 'mydb',
table.name = 'orders',
server.id = '5454'
);
使用同一 Source 创建多个 CDC 表
RisingWave 支持创建单个 MySQL Source,允许您从位于同一数据库中的多个表读取 CDC 数据。
使用 CREATE SOURCE
命令和 MySQL CDC 参数创建 CDC Source,连接到上游数据库。数据格式固定为 FORMAT PLAIN ENCODE JSON
,无需指定。
CREATE SOURCE mysql_mydb WITH (
connector = 'mysql-cdc',
hostname = '127.0.0.1',
port = '8306',
username = 'root',
password = '123456',
database.name = 'mydb',
server.id = 5888
);
Source 创建后即可创建多个 CDC 表,从上游数据库的不同表中摄取数据,无需再次指定数据库连接参数。
例如,RisingWave 中的以下 CDC 表从数据库 mydb
中的表 t1
中摄取数据。在关键字 TABLE
后面的 FROM
子句中指定 MySQL 表名时,还必须指定数据库的名称。
CREATE TABLE t1_rw (
v1 int,
v2 int,
PRIMARY KEY(v1)
) FROM mysql_mydb TABLE 'mydb.t1';
您可以在 RisingWave 中创建另一个 CDC 表,该表从同一数据库 mydb
的表 t3
中摄取数据。
CREATE TABLE t3_rw (
v1 INTEGER,
v2 timestamptz,
PRIMARY KEY (v1)
) FROM mysql_mydb TABLE 'mydb.t3';
要检查回填历史数据的进度,可使用 SHOW INTERNAL TABLES
命令找到相应的内部表并进行查询。例如,下面的 SQL 查询显示了名为 orders_rw
的 CDC 表的进度。
SELECT * FROM __internal_orders_rw_4002_streamcdcscan_5002;
-[ RECORD 1 ]-----+---------------------------------------------------------------
split_id | 5001
o_orderkey | 4024320
backfill_finished | f
row_count | 1006080
cdc_offset | {"MySql": {"filename": "binlog.000005", "position": 60946679}}
数据类型映射
下表显示了创建 Source 时应指定的 RisingWave 中的相应数据类型。有关原生 RisingWave 数据类型的详细信息,请参阅 数据类型概述。
标有星号的 RisingWave 数据类型表示,虽然没有相应的 RisingWave 数据类型,但摄取的数据仍可作为所列类型进行消费。
MySQL 类型 | RisingWave 类型 |
---|---|
BOOLEAN, BOOL | BOOLEAN |
BIT(1) | BOOLEAN* |
BIT(>1) | No support |
TINYINT | SMALLINT |
SMALLINT[(M)] | SMALLINT |
MEDIUMINT[(M)] | INTEGER |
INT, INTEGER[(M)] | INTEGER |
BIGINT[(M)] | BIGINT |
REAL[(M,D)] | REAL |
FLOAT[(P)] | REAL |
FLOAT(M,D) | DOUBLE PRECISION |
DOUBLE[(M,D)] | DOUBLE PRECISION |
CHAR[(M)] | CHARACTER VARYING |
VARCHAR[(M)] | CHARACTER VARYING |
BINARY[(M)] | BYTEA |
VARBINARY[(M)] | BYTEA |
TINYBLOB | BYTEA |
TINYTEXT | CHARACTER VARYING |
BLOB | BYTEA |
TEXT | CHARACTER VARYING |
MEDIUMBLOB | BYTEA |
MEDIUMTEXT | CHARACTER VARYING |
LONGBLOB | BYTEA |
LONGTEXT | BYTEA or CHARACTER VARYING |
JSON | JSONB |
ENUM | CHARACTER VARYING* |
SET | No support |
YEAR[(2|4)] | INTEGER |
TIMESTAMP[(M)] | TIMESTAMPTZ |
DATE | DATE |
TIME[(M)] | TIME |
DATETIME[(fsp)] Optional fractional seconds precision (fsp: 0-6). If omitted, the default precision is 0. | TIMESTAMP |
NUMERIC[(M[,D])] | NUMERIC |
DECIMAL[(M[,D])] | NUMERIC |
GEOMETRY, LINESTRING, POLYGON, MULTIPOINT, MULTILINESTRING, MULTIPOLYGON, GEOMETRYCOLLECTION | STRUCT |
请注意,MySQL 类型和 RisingWave 类型具体值的范围有所不同。有关详细信息,请参阅下表。
MySQL 类型 | RisingWave 类型 | MySQL 范围 | RisingWave 范围 |
---|---|---|---|
TIME | TIME | -838:59:59.000000 to 838:59:59.000000 | 00:00:00 to 23:59:59 |
DATE | DATE | 1000-01-01 to 9999-12-31 | 0001-01-01 to 9999-12-31 |
DATETIME | TIMESTAMP | 1000-01-01 00:00:00.000000 to 9999-12-31 23:59:59.49999 | 1973-03-03 09:46:40 to 5138-11-16 09:46:40 |
TIMESTAMP | TIMESTAMPTZ | 1970-01-01 00:00:01.000000 to 2038-01-19 03:14:07.499999 | 0001-01-01 00:00:00 to 9999-12-31 23:59:59 |