Skip to main content

使用 JDBC 连接器从 RisingWave 导出数据到 MySQL

本文介绍如何使用 JDBC Sink 连接器,将数据从 RisingWave 导出到支持 JDBC 的数据库中。MySQL 是一种常用的关系数据库系统(RDS),具有 JDBC 驱动程序,并且可通过 AWS 作为云数据库轻松设置和维护。我们将向您展示如何配置 MySQL 和 RisingWave 以创建 MySQL Sink。连接到任何支持 JDBC 的数据库时,RisingWave 的配置都是相同的。

note

支持的 MySQL 版本为 5.7 和 8.0.x。

设置 MySQL 数据库

在使用 RisingWave 中的原生 MySQL CDC 连接器之前,您需要完成 MySQL 上的几项配置。

在 AWS 上设置 MySQL RDS 实例

  1. 登录 AWS 控制台。在服务中搜索 “RDS” 并选择 RDS 面板。

    Search for RDS
  2. 创建一个数据库,将 引擎类型 设置为 MySQL。我们推荐设置用户名和密码或使用其他安全选项。

    Configurations for setting up a MySQL RDS
  3. 新实例可用后,点击其面板。

    MySQL instance panel
  4. 连接性 面板中,我们可以找到端点和连接端口信息。

    Endpoint and port information

从 MySQL 连接到 RDS 实例

现在我们可以连接到 RDS 实例。确保您的本地机器上安装了 MySQL,并打开了 MySQL Prompt。在连接参数中填写端点、端口和登录凭证。

mysql -h rw-to-mysql.xxxxxx.us-east-1.rds.amazonaws.com -P 3306 -u <username> -p <password>

有关更多登录选项,请参阅 RDS 连接指南

设置目标表

使用以下查询在 RDS 实例中设置一个数据库和表。

CREATE TABLE test_db.personnel (
id integer,
name varchar(200),
PRIMARY KEY (id)
);

如果创建成功,将返回一条消息。

Query OK, 0 rows affected (0.10 sec)

设置 RisingWave

安装并启动 RisingWave

要在本地安装并启动 RisingWave,请参阅 快速上手指南。我们推荐以测试模式在本地运行 RisingWave。

从二进制文件运行 RisingWave 的注意事项

如果您打算使用原生 CDC Source 连接器或 JDBC Sink 连接器在本地从二进制文件运行 RisingWave,请确保您的环境中安装了 JDK 11 或更高版本。

创建 Sink

句法

CREATE SINK [ IF NOT EXISTS ] sink_name
[FROM sink_from | AS select_query]
WITH (
connector='jdbc',
field_name = 'field', ...
);

参数

所有 WITH 选项都是必填的。

参数或子句描述
sink_name要创建的 Sink 的名称。
sink_from指定将从中导出数据的直接来源的子句。sink_from 可以是物化视图或表。必须指定此子句或 SELECT 查询。
AS select_query指定要输出到 Sink 的数据的 SELECT 查询。必须指定此查询或 FROM 子句。有关 SELECT 命令的句法和示例,请参见 SELECT
connectorSink 连接器类型必须为 'jdbc',用于 MySQL Sink。
jdbc.url目标数据库的 JDBC URL,对于驱动程序识别并连接到数据库至关重要。
table.name目标数据库中的表。
type数据格式。允许的格式:
  • append-only:以插入操作输出数据。
upsert:以变更日志流输出数据。
primary_key如果 typeupsert,则必填。下游表的主键。

从 RisingWave 导出数据到 MySQL

创建表和 Sink

要导出到 MySQL,请确保 RisingWave 表和 MySQL 表具有相同的表架构。在 RisingWave 中使用以下查询来创建表和 Sink。

jdbc.url 必须准确。格式根据您使用的是 AWS RDS MySQL 还是自托管的 MySQL 版本而略有不同。如果您的 MySQL 是自托管的,则 jdbc.url 的格式为 jdbc:mysql://127.0.0.1:3306/testdb?user=<username>&password=<password>

CREATE TABLE personnel (
id integer,
name varchar,
);

CREATE SINK s_mysql FROM personnel WITH (
connector='jdbc',
jdbc.url='jdbc:mysql://<aws_rds_endpoint>:<port>/test_db?user=<username>&password=<password>',
table.name='personnel',
type = 'upsert',
primary_key = 'id'
);

更新表

使用以下查询插入一些数据。记得使用 FLUSH 命令来提交更新。

INSERT INTO personnel VALUES (1, 'Alice'), (2, 'Bob');

FLUSH;

验证 Sink 连接

数据变化将同步到 MySQL。为了验证更新,连接到 MySQL 并查询表。您对表所做的更改将会反映出来。

SELECT * FROM personnel;

+------+-------+
| id | name |
+------+-------+
| 1 | Alice |
+------+-------+
| 2 | Bob |
+------+-------+

数据类型映射

有关 MySQL 数据类型映射表,请参见 从 MySQL CDC 导入数据 - 数据类型映射表

请注意,当从 RisingWave 导出到 MySQL 时,数组数据类型将被转换为字符串。只有一维数组可以导出到 MySQL。例如,当 ARRAY['Value 1', 'Value 2'] 导出到 MySQL 时,会被转换为字符串 Value 1, Value 2