Skip to main content

从 Citus CDC 摄取数据

RisingWave 支持从 Citus 数据库摄取变更数据捕获 (CDC) 数据。支持 Citus 10.2 版本。

Citus 数据库是 PostgreSQL 的扩展,可将 PostgreSQL 转变为分布式数据库。有关详细信息,请参阅 Citus 数据

测试版功能

从 Citus 数据库摄取 CDC 数据目前处于测试阶段。如果您遇到任何问题或有任何反馈,请联系我们。

设置 Citus

  1. 在 Citus 集群中创建 superuser 角色。

    ---在 Coordinator 节点上创建角色
    CREATE ROLE dbz Superuser LOGIN;

    ---在 Worker 节点上创建角色
    SELECT run_command_on_workers ($cmd$
    CREATE ROLE dbz Superuser LOGIN;
    $cmd$);
  2. 通过修改 postgresql.conf 确保每个 Worker 节点上的 wal_levellogical

  3. 将要从中摄取数据的表的复制标识设置为 FULL

    ---在 Coordinator 节点上执行
    ALTER TABLE github_events REPLICA IDENTITY FULL;

限制

在 RisingWave 中从 Citus 摄取 CDC 数据时有一些限制。

  • 需要 PostgreSQL superuser 角色。
  • 仅支持 分布式表
  • 无法检测到新添加的 Worker 节点。

从二进制文件运行 RisingWave 的注意事项

如果您在本地通过二进制文件运行 RisingWave,并打算使用原生 CDC Source Connector 或 JDBC Sink Connector,请确保您的环境中已安装 JDK 11 或更高版本。

使用原生 CDC 连接器在 RisingWave 中创建表

句法

注意必须指定主键。

CREATE TABLE [ IF NOT EXISTS ] source_name (
column_name data_type PRIMARY KEY , ...
[PRIMARY KEY ( column_name, ... )]
)
WITH (
connector='citus-cdc',
<field>=<value>, ...
)
[ FORMAT DEBEZIUM ENCODE JSON ];

WITH 参数

除非另有说明,否则所列字段为必填项。

字段注释
hostnameCoordinator 节点的主机名。
portCoordinator 节点的端口号。
username数据库的用户名。
password数据库的密码。
database.serversCitus Worker 节点的主机和端口。
database.name数据库的名称。
schema.name可选。Schema 的名称。默认值为 public
table.name要从中摄取数据的表的名称。
slot.name可选。每个 Source 的 Slot 的名称。每个 Source 都应有唯一的 Slot 名称。

示例

CREATE TABLE github_events_rw (
event_id bigint,
event_type text,
event_public boolean,
repo_id bigint,
payload jsonb,
repo jsonb,
user_id bigint,
org jsonb,
created_at timestamp,
PRIMARY KEY (event_id, user_id)
) WITH (
connector = 'citus-cdc',
hostname = '127.0.0.1',
port = '5432',
username = 'dbz',
password = '123456',
database.servers = '172.31.29.245:5432,172.31.31.177:5432',
database.name = 'postgres',
schema.name = 'public',
table.name = 'github_events',
slot.name = 'github_events_dbz_slot1',
);