从 PostgreSQL CDC 摄取数据
变更数据捕获(CDC)是指识别和捕获数据库中的数据变更,然后将变更实时传送给下游服务的过程。
RisingWave 支持从 PostgreSQL 摄取 CDC 数据。支持 PostgreSQL 的 10、11、12、13、14 和 15 版本。
您可以通过两种方式从 PostgreSQL 将 CDC 数据摄取到 RisingWave:
使用内置的 PostgreSQL CDC 连接器
使用此连接器,RisingWave 可直接连接 PostgreSQL 数据库,从 binlog 获取数据,而无需启动其他服务。
测试版功能RisingWave 内置的 PostgreSQL CDC 连接器目前处于测试阶段。如果您遇到任何问题或有任何反馈,请联系我们。
使用 CDC 工具和消息代理
您可以使用 CDC 工具,然后使用 Kafka、Pulsar 或 Kinesis 连接器将 CDC 数据摄取到 RisingWave。更多详细信息,请参阅 通过事件流系统创建 Source 相关内容。
设置 PostgreSQL
- Self-hosted
- AWS RDS
确保
wal_level
为logical
。使用以下语句检查。SHOW wal_level;
默认情况下,它是
replica
。对于 CDC,需要在数据库配置文件(postgresql.conf
)中或通过psql
命令将其设置为 logical。以下命令将更改wal_level
。ALTER SYSTEM SET wal_level = logical;
请注意,更改
wal_level
需要重启 PostgreSQL 实例,可能影响数据库性能。note如果选择在不使用共享 Source 的情况下创建多个 CDC 表,请确保将
max_wal_senders
设置为大于或等于同步表的数量。默认情况下,max_wal_senders
为 10。将
REPLICATION
、LOGIN
和CREATEDB
角色属性分配给用户。对于现有用户,可运行以下语句分配属性:
ALTER USER <username> REPLICATION LOGIN CREATEDB;
对于新用户,可运行以下语句创建用户并分配属性:
CREATE USER <username> REPLICATION LOGIN CREATEDB;
可以使用
\du
psql 命令检查角色属性:dev-# \du
List of roles
Role name | Attributes | Member of
-----------+-----------------------------------------------------------+---------
rw | Create DB, Replication | {}
postgres | Superuser, Create role, Create DB, Replication, Bypass RLS | {}授予用户所需的权限。
运行以下语句可授予用户所需的权限。
GRANT CONNECT ON DATABASE <database_name> TO <username>;
GRANT USAGE ON SCHEMA <schema_name> TO <username>;
GRANT SELECT ON ALL TABLES IN SCHEMA <schema_name> TO <username>;
GRANT CREATE ON DATABASE <database_name> TO <username>;可以使用以下语句检查用户对表的权限:
postgres=# SELECT table_name, grantee, privilege_type
FROM information_schema.role_table_grants
WHERE grantee='<username>';示例结果:
table_name | grantee | privilege_type
-----------+---------+----------------
lineitem | rw | SELECT
customer | rw | SELECT
nation | rw | SELECT
orders | rw | SELECT
part | rw | SELECT
partsupp | rw | SELECT
supplier | rw | SELECT
region | rw | SELECT
(8 rows)
这里我们将使用一个标准类 MySQL 实例(不带多可用区部署)作为示例。
检查
wal_level
参数是否设置为logical
。如果是logical
,则已完成设置。否则,请为 Postgres 实例创建一个参数组。我们为运行 Postgres 12 的实例创建了一个名为 pg-cdc 的参数组。接下来,点击 pg-cdc 参数组,将rds.logical_replication
的值改为 1。如果选择在不使用共享 Source 的情况下创建多个 CDC 表,请将
max_wal_senders
设置为大于或等于同步表的数量。默认情况下,在版本 13 及更高版本中,max_wal_senders
为 20。noteAWS Aurora PostgreSQL 的 WAL 透写缓存存在已知问题,会导致数据丢失。这会影响到 Aurora PostgreSQL 版本 14.5、13.8、12.12 和 11.17。为了避免这种情况,请将
rds.logical_wal_cache
参数设置为 0。进入 Databases 页面,修改实例以使用 pg-cdc 参数组。
点击 Continue 并选择 Apply immediately。最后,点击 Modify DB instance 保存更改。记得要重启 Postgres 实例才能使更改生效。
授予用户 RDS 复制权限。
GRANT rds_replication TO <username>;
从二进制文件运行 RisingWave 的注意事项
如果从二进制文件在本地运行 RisingWave,并打算使用原生 CDC Source Connector 或 JDBC Sink Connector,请确保您的环境中安装了 JDK 11 或更高版本。
使用原生 CDC 连接器创建表
为确保捕获所有数据更改,必须创建表或 Source 并指定主键。更多详细信息,请参阅 CREATE TABLE
命令。
句法
创建 CDC 表的句法。注意必须指定主键。
CREATE {TABLE | SOURCE} [ IF NOT EXISTS ] table_name (
column_name data_type PRIMARY KEY , ...
PRIMARY KEY ( column_name, ... )
)
WITH (
connector='postgres-cdc',
connector_parameter='value', ...
)
[ FORMAT DEBEZIUM ENCODE JSON ];
创建 CDC Source 的句法。
CREATE SOURCE [ IF NOT EXISTS ] source_name WITH (
connector='postgres-cdc',
<field>=<value>, ...
);
连接器参数
除非另有说明,否则所列字段均为必填项。请注意,这些参数的值应用单引号括起来。
字段 | 注释 |
---|---|
hostname | 数据库的主机名。 |
port | 数据库的端口号。 |
username | 数据库的用户名。 |
password | 数据库的密码。 |
database.name | 数据库的名称。 |
schema.name | 可选。Schema 名称。默认值为 public 。 |
table.name | 要从中摄取数据的表的名称。 |
slot.name | 可选。此 PostgreSQL Source 的 复制槽。默认情况下,将随机生成一个唯一的槽名。每个 Source 都应有唯一的槽名。 |
publication.name | 可选。Publication 的名称。默认值为 rw_publication 。更多信息,请参阅 多个 CDC 源表。 |
publication.create.enable | 可选。默认值为 'true' 。如果 publication.name 不存在且该值为 'true' ,则将创建 publication.name 。如果 publication.name 不存在且该值为 'false' ,则将返回错误信息。 |
transactional | 可选。指定是否要为即将创建的 CDC 表启用事务。默认值为 'false' 。多表事务的共享 CDC Source 也支持此功能。有关详细信息,请参阅 CDC 表内的事务。 |
RisingWave 通过 PostgreSQL 复制实现 CDC。通过 pg_replication_slots
视图检查当前进度。通过 pg_drop_replication_slot()
,删除不活动的复制槽。RisingWave 不会自动删除不活动的复制槽。您必须手动执行此操作,防止 WAL 文件在上游 PostgreSQL 数据库中累积。
数据格式
数据采用 Debezium JSON 格式。Debezium 是一款基于日志的 CDC 工具,可从各种数据库管理系统(如 PostgreSQL、MySQL 和 SQL Server)中捕获行更改,并实时生成结构一致的事件。RisingWave 中的 PostgreSQL CDC 连接器支持 JSON 作为 Debezium 数据的序列化格式。以 postgres-cdc
为 Source 创建表时,无需指定数据格式。
示例
创建单个 CDC 表
下面的示例可在 RisingWave 中创建一个表,从 PostgreSQL 的 shipments
表中读取 CDC 数据。shipments
表位于 dev
数据库下的 public
Schema 中。连接到 PostgreSQL 中的特定表时,请使用 CREATE TABLE
命令。
CREATE TABLE shipments (
shipment_id integer,
order_id integer,
origin string,
destination string,
is_arrived boolean,
PRIMARY KEY (shipment_id)
) WITH (
connector = 'postgres-cdc',
hostname = '127.0.0.1',
port = '5432',
username = 'postgres',
password = 'postgres',
database.name = 'dev',
schema.name = 'public',
table.name = 'shipments'
);
使用同一 Source 创建多个 CDC 表
RisingWave 支持创建单个 PostgreSQL Source,允许您从位于同一数据库中的多个表读取 CDC 数据。
使用 CREATE SOURCE
命令和 PostgreSQL CDC 参数创建 CDC Source,连接到上游数据库。数据格式固定为 FORMAT PLAIN ENCODE JSON
,无需指定。
CREATE SOURCE pg_mydb WITH (
connector = 'postgres-cdc',
hostname = '127.0.0.1',
port = '8306',
username = 'root',
password = '123456',
database.name = 'mydb',
slot.name = 'mydb_slot'
);
Source 创建后即可创建多个 CDC 表,从上游数据库的不同表和 Schema 中摄取数据,无需再次指定数据库连接参数。
例如,RisingWave 中的以下 CDC 表从 Schema public
中的表 tt3
摄取数据。在关键字 TABLE
后面的 FROM
子句中指定 PostgreSQL 表名时,还必须指定 Schema 的名称。
CREATE TABLE tt3 (
v1 integer primary key,
v2 timestamp with time zone
) FROM pg_mydb TABLE 'public.tt3';
您可以在 RisingWave 中创建另一个 CDC 表,该表从 Schema ods
的表 tt4
中摄取数据。
CREATE TABLE tt4 (
v1 integer primary key,
v2 varchar,
PRIMARY KEY (v1)
) FROM pg_mydb TABLE 'ods.tt4';
要检查回填历史数据的进度,可使用 SHOW INTERNAL TABLES
命令找到相应的内部表并进行查询。
数据类型映射
下表显示了创建 Source 时应指定的 RisingWave 中的相应数据类型。有关原生 RisingWave 数据类型的详细信息,请参阅 数据类型概述。
标有星号的 RisingWave 数据类型表示,虽然没有相应的 RisingWave 数据类型,但摄取的数据仍可作为所列类型进行消费。
RisingWave 无法正确解析 PostgreSQL 中的 Composite 类型,因为 Debezium 不支持 PostgreSQL 中的 Composite 类型。
PostgreSQL 类型 | RisingWave 类型 |
---|---|
BOOLEAN | BOOLEAN |
BIT(1) | BOOLEAN |
BIT( > 1) | No support |
BIT VARYING[(M)] | No support |
SMALLINT, SMALLSERIAL | SMALLINT |
INTEGER, SERIAL | INTEGER |
BIGINT, BIGSERIAL, OID | BIGINT |
REAL | REAL |
DOUBLE PRECISION | DOUBLE PRECISION |
CHAR[(M)] | CHARACTER VARYING |
VARCHAR[(M)] | CHARACTER VARYING |
CHARACTER[(M)] | CHARACTER VARYING |
CHARACTER VARYING[(M)] | CHARACTER VARYING |
TIMESTAMPTZ, TIMESTAMP WITH TIME ZONE | TIMESTAMP WITH TIME ZONE |
TIMETZ, TIME WITH TIME ZONE | TIME WITHOUT TIME ZONE (assume UTC time zone) |
INTERVAL [P] | INTERVAL |
BYTEA | BYTEA |
JSON, JSONB | JSONB |
XML | CHARACTER VARYING |
UUID | CHARACTER VARYING |
POINT | STRUCT (with form <x REAL, y REAL> ) |
LTREE | No support |
CITEXT | CHARACTER VARYING* |
INET | CHARACTER VARYING* |
INT4RANGE | CHARACTER VARYING* |
INT8RANGE | CHARACTER VARYING* |
NUMRANGE | CHARACTER VARYING* |
TSRANGE | CHARACTER VARYING* |
TSTZRANGE | CHARACTER VARYING* |
DATERANGE | CHARACTER VARYING* |
ENUM | CHARACTER VARYING* |
DATE | DATE |
TIME(1), TIME(2), TIME(3), TIME(4), TIME(5), TIME(6) | TIME WITHOUT TIME ZONE (limited to [1973-03-03 09:46:40, 5138-11-16 09:46:40) ) |
TIMESTAMP(1), TIMESTAMP(2), TIMESTAMP(3) | TIMESTAMP WITHOUT TIME ZONE (limited to [1973-03-03 09:46:40, 5138-11-16 09:46:40) ) |
TIMESTAMP(4), TIMESTAMP(5), TIMESTAMP(6), TIMESTAMP | TIMESTAMP WITHOUT TIME ZONE |
NUMERIC[(M[,D])] | NUMERIC |
DECIMAL[(M[,D])] | NUMERIC |
MONEY[(M[,D])] | NUMERIC |
HSTORE | No support |
HSTORE | No support |
INET | CHARACTER VARYING* |
CIDR | CHARACTER VARYING* |
MACADDR | CHARACTER VARYING* |
MACADDR8 | CHARACTER VARYING* |