Skip to main content

从 MongoDB CDC 摄取数据

该内容将向您介绍 RisingWave 从 MongoDB 摄取变更流的步骤。

要让 RisingWave 从 MongoDB 摄取 CDC 数据,您需要使用 MongoDB 的 Debezium 连接器,先将 MongoDB 的变更流转换为 Kafka Topic,然后再从 RisingWave 摄取这些 Kafka Topic。

Ingest data from MongoDB CDC to RisingWave

步骤概述

  1. 配置 MongoDB

  2. 部署 MongoDB 的 Debezium 连接器

  3. 从 RisingWave 摄取这些数据

配置 MongoDB

首先,您需要确保已安装并正确配置了 MongoDB。

请注意,MongoDB 的 Debezium 连接器使用 MongoDB 的变更流来捕获变更,因此该连接器仅适用于 MongoDB 副本集或分片集群(其中每个分片都是一个单独的副本集)。

要了解如何设置副本集或分片集群,请参阅 MongoDB 文档。

你还必须拥有一个拥有适当权限的 MongoDB 用户,并执行其他配置。按照 设置 MongoDB 部分的说明,完成 MongoDB 中的配置。

部署 MongoDB 的 Debezium 连接器

要部署 Debezium MongoDB 连接器,需要安装 Debezium MongoDB 连接器归档文件,配置连接器,并通过将其配置添加到 Kafka Connect 来启动连接器。

连接器启动后,Debezium 将在 Kafka 中创建一个 CDC Topic,并将捕获的事件发布到此 Topic。

有关完整的部署说明,请参阅 部署

MongoDB 连接器配置示例

下面是一个连接器配置示例。

{
"name": "mongodb-connector",
"config": {
"connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
"tasks.max": "1",
"mongodb.hosts": "<mongodb host>:27017",
"mongodb.name": "dbserver1",
"database.history.kafka.bootstrap.servers": "<message queue host>:29092"
}
}

<mongodb host>:27017 指的是副本集中 MongoDB 服务器的主机名和端口对(形式为 'host' 或 'host:port')的逗号分隔列表。

<message queue host>:29092 指的是用于存储数据库 Schema 历史 Topic 的 Kafka 集群的引导服务器。

将数据导入到 RisingWave 中

为确保捕获所有数据变更,必须在 RisingWave 中创建表并指定主键。创建表时,请将连接器指定为 kafka,并使用 DEBEZIUM_MONGO 作为格式,JSON 作为编码选项。

有关句法和参数的详细信息,请参阅 CREATE TABLE

示例
CREATE TABLE source_name (
_id jsonb PRIMARY KEY
payload jsonb
)
WITH (
connector='kafka',
topic='debezium_mongo_json_customers',
properties.bootstrap.server='172.10.1.1:9090,172.10.1.2:9090',
scan.startup.mode = 'earliest'
) FORMAT DEBEZIUM_MONGO ENCODE JSON;

表创建后,可以根据需要查看和转换数据。