从 Citus CDC 摄取数据
RisingWave 支持从 Citus 数据库摄取变更数据捕获 (CDC) 数据。支持 Citus 10.2 版本。
Citus 数据库是 PostgreSQL 的扩展,可将 PostgreSQL 转变为分布式数据库。有关详细信息,请参阅 Citus 数据。
测试版功能
从 Citus 数据库摄取 CDC 数据目前处于测试阶段。如果您遇到任何问题或有任何反馈,请联系我们。
设置 Citus
在 Citus 集群中创建
superuser
角色。---在 Coordinator 节点上创建角色
CREATE ROLE dbz Superuser LOGIN;
---在 Worker 节点上创建角色
SELECT run_command_on_workers ($cmd$
CREATE ROLE dbz Superuser LOGIN;
$cmd$);通过修改
postgresql.conf
确保每个 Worker 节点上的wal_level
为logical
。将要从中摄取数据的表的复制标识设置为
FULL
。---在 Coordinator 节点上执行
ALTER TABLE github_events REPLICA IDENTITY FULL;
限制
在 RisingWave 中从 Citus 摄取 CDC 数据时有一些限制。
- 需要 PostgreSQL
superuser
角色。 - 仅支持 分布式表。
- 无法检测到新添加的 Worker 节点。
从二进制文件运行 RisingWave 的注意事项
如果您在本地通过二进制文件运行 RisingWave,并打算使用原生 CDC Source Connector 或 JDBC Sink Connector,请确保您的环境中已安装 JDK 11 或更高版本。
使用原生 CDC 连接器在 RisingWave 中创建表
句法
注意必须指定主键。
CREATE TABLE [ IF NOT EXISTS ] source_name (
column_name data_type PRIMARY KEY , ...
[PRIMARY KEY ( column_name, ... )]
)
WITH (
connector='citus-cdc',
<field>=<value>, ...
)
[ FORMAT DEBEZIUM ENCODE JSON ];
WITH 参数
除非另有说明,否则所列字段为必填项。
字段 | 注释 |
---|---|
hostname | Coordinator 节点的主机名。 |
port | Coordinator 节点的端口号。 |
username | 数据库的用户名。 |
password | 数据库的密码。 |
database.servers | Citus Worker 节点的主机和端口。 |
database.name | 数据库的名称。 |
schema.name | 可选。Schema 的名称。默认值为 public 。 |
table.name | 要从中摄取数据的表的名称。 |
slot.name | 可选。每个 Source 的 Slot 的名称。每个 Source 都应有唯一的 Slot 名称。 |
示例
CREATE TABLE github_events_rw (
event_id bigint,
event_type text,
event_public boolean,
repo_id bigint,
payload jsonb,
repo jsonb,
user_id bigint,
org jsonb,
created_at timestamp,
PRIMARY KEY (event_id, user_id)
) WITH (
connector = 'citus-cdc',
hostname = '127.0.0.1',
port = '5432',
username = 'dbz',
password = '123456',
database.servers = '172.31.29.245:5432,172.31.31.177:5432',
database.name = 'postgres',
schema.name = 'public',
table.name = 'github_events',
slot.name = 'github_events_dbz_slot1',
);