使用 JDBC 连接器从 RisingWave 导出数据到 MySQL
本文介绍如何使用 JDBC Sink 连接器,将数据从 RisingWave 导出到支持 JDBC 的数据库中。MySQL 是一种常用的关系数据库系统(RDS),具有 JDBC 驱动程序,并且可通过 AWS 作为云数据库轻松设置和维护。我们将向您展示如何配置 MySQL 和 RisingWave 以创建 MySQL Sink。连接到任何支持 JDBC 的数据库时,RisingWave 的配置都是相同的。
支持的 MySQL 版本为 5.7 和 8.0.x。
设置 MySQL 数据库
- AWS RDS
- 自托管 MySQL
在使用 RisingWave 中的原生 MySQL CDC 连接器之前,您需要完成 MySQL 上的几项配置。
在 AWS 上设置 MySQL RDS 实例
登录 AWS 控制台。在服务中搜索 “RDS” 并选择 RDS 面板。
创建一个数据库,将 引擎类型 设置为 MySQL。我们推荐设置用户名和密码或使用其他安全选项。
新实例可用后,点击其面板。
从 连接性 面板中,我们可以找到端点和连接端口信息。
从 MySQL 连接到 RDS 实例
现在我们可以连接到 RDS 实例。确保您的本地机器上安装了 MySQL,并打开了 MySQL Prompt。在连接参数中填写端点、端口和登录凭证。
mysql -h rw-to-mysql.xxxxxx.us-east-1.rds.amazonaws.com -P 3306 -u <username> -p <password>
有关更多登录选项,请参阅 RDS 连接指南。
设置目标表
使用以下查询在 RDS 实例中设置一个数据库和表。
CREATE TABLE test_db.personnel (
id integer,
name varchar(200),
PRIMARY KEY (id)
);
如果创建成功,将返回一条消息。
Query OK, 0 rows affected (0.10 sec)
连接到 MySQL
连接到您的 MySQL 服务器。有关更多详情,请参见 连接到 MySQL 服务器 指南。
设置目标表
使用以下查询在 MySQL 中设置一个数据库和表。
CREATE DATABASE test_db;
USE test_db;
CREATE TABLE personnel (
id integer,
name varchar(200),
PRIMARY KEY (id)
);
设置 RisingWave
安装并启动 RisingWave
要在本地安装并启动 RisingWave,请参阅 快速上手指南。我们推荐以测试模式在本地运行 RisingWave。
从二进制文件运行 RisingWave 的注意事项
如果您打算使用原生 CDC Source 连接器或 JDBC Sink 连接器在本地从二进制文件运行 RisingWave,请确保您的环境中安装了 JDK 11 或更高版本。
创建 Sink
句法
CREATE SINK [ IF NOT EXISTS ] sink_name
[FROM sink_from | AS select_query]
WITH (
connector='jdbc',
field_name = 'field', ...
);
参数
所有 WITH
选项都是必填的。
参数或子句 | 描述 |
---|---|
sink_name | 要创建的 Sink 的名称。 |
sink_from | 指定将从中导出数据的直接来源的子句。sink_from 可以是物化视图或表。必须指定此子句或 SELECT 查询。 |
AS select_query | 指定要输出到 Sink 的数据的 SELECT 查询。必须指定此查询或 FROM 子句。有关 SELECT 命令的句法和示例,请参见 SELECT。 |
connector | Sink 连接器类型必须为 'jdbc' ,用于 MySQL Sink。 |
jdbc.url | 目标数据库的 JDBC URL,对于驱动程序识别并连接到数据库至关重要。 |
table.name | 目标数据库中的表。 |
type | 数据格式。允许的格式:
upsert :以变更日志流输出数据。 |
primary_key | 如果 type 是 upsert ,则必填。下游表的主键。 |
从 RisingWave 导出数据到 MySQL
创建表和 Sink
要导出到 MySQL,请确保 RisingWave 表和 MySQL 表具有相同的表架构。在 RisingWave 中使用以下查询来创建表和 Sink。
jdbc.url
必须准确。格式根据您使用的是 AWS RDS MySQL 还是自托管的 MySQL 版本而略有不同。如果您的 MySQL 是自托管的,则 jdbc.url
的格式为 jdbc:mysql://127.0.0.1:3306/testdb?user=<username>&password=<password>
。
CREATE TABLE personnel (
id integer,
name varchar,
);
CREATE SINK s_mysql FROM personnel WITH (
connector='jdbc',
jdbc.url='jdbc:mysql://<aws_rds_endpoint>:<port>/test_db?user=<username>&password=<password>',
table.name='personnel',
type = 'upsert',
primary_key = 'id'
);
更新表
使用以下查询插入一些数据。记得使用 FLUSH
命令来提交更新。
INSERT INTO personnel VALUES (1, 'Alice'), (2, 'Bob');
FLUSH;
验证 Sink 连接
数据变化将同步到 MySQL。为了验证更新,连接到 MySQL 并查询表。您对表所做的更改将会反映出来。
SELECT * FROM personnel;
+------+-------+
| id | name |
+------+-------+
| 1 | Alice |
+------+-------+
| 2 | Bob |
+------+-------+
数据类型映射
有关 MySQL 数据类型映射表,请参见 从 MySQL CDC 导入数据 - 数据类型映射表。
请注意,当从 RisingWave 导出到 MySQL 时,数组数据类型将被转换为字符串。只有一维数组可以导出到 MySQL。例如,当 ARRAY['Value 1', 'Value 2']
导出到 MySQL 时,会被转换为字符串 Value 1, Value 2
。