从 MongoDB CDC 摄取数据
该内容将向您介绍 RisingWave 从 MongoDB 摄取变更流的步骤。
要让 RisingWave 从 MongoDB 摄取 CDC 数据,您需要使用 MongoDB 的 Debezium 连接器,先将 MongoDB 的变更流转换为 Kafka Topic,然后再从 RisingWave 摄取这些 Kafka Topic。
步骤概述
配置 MongoDB
部署 MongoDB 的 Debezium 连接器
从 RisingWave 摄取这些数据
配置 MongoDB
首先,您需要确保已安装并正确配置了 MongoDB。
请注意,MongoDB 的 Debezium 连接器使用 MongoDB 的变更流来捕获变更,因此该连接器仅适用于 MongoDB 副本集或分片集群(其中每个分片都是一个单独的副本集)。
要了解如何设置副本集或分片集群,请参阅 MongoDB 文档。
你还必须拥有一个拥有适当权限的 MongoDB 用户,并执行其他配置。按照 设置 MongoDB 部分的说明,完成 MongoDB 中的配置。
部署 MongoDB 的 Debezium 连接器
要部署 Debezium MongoDB 连接器,需要安装 Debezium MongoDB 连接器归档文件,配置连接器,并通过将其配置添加到 Kafka Connect 来启动连接器。
连接器启动后,Debezium 将在 Kafka 中创建一个 CDC Topic,并将捕获的事件发布到此 Topic。
有关完整的部署说明,请参阅 部署。
MongoDB 连接器配置示例
下面是一个连接器配置示例。
{
"name": "mongodb-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "<mongodb host>:27017",
"mongodb.name": "dbserver1",
"database.history.kafka.bootstrap.servers": "<message queue host>:29092"
}
}
<mongodb host>:27017
指的是副本集中 MongoDB 服务器的主机名和端口对(形式为 'host' 或 'host:port')的逗号分隔列表。
<message queue host>:29092
指的是用于存储数据库 Schema 历史 Topic 的 Kafka 集群的引导服务器。
将数据导入到 RisingWave 中
为确保捕获所有数据变更,必须在 RisingWave 中创建表并指定主键。创建表时,请将连接器指定为 kafka
,并使用 DEBEZIUM_MONGO
作为格式,JSON
作为编码选项。
有关句法和参数的详细信息,请参阅 CREATE TABLE
。
CREATE TABLE source_name (
_id jsonb PRIMARY KEY
payload jsonb
)
WITH (
connector='kafka',
topic='debezium_mongo_json_customers',
properties.bootstrap.server='172.10.1.1:9090,172.10.1.2:9090',
scan.startup.mode = 'earliest'
) FORMAT DEBEZIUM_MONGO ENCODE JSON;
表创建后,可以根据需要查看和转换数据。