Skip to main content

从 PostgreSQL CDC 摄取数据

变更数据捕获(CDC)是指识别和捕获数据库中的数据变更,然后将变更实时传送给下游服务的过程。

RisingWave 支持从 PostgreSQL 摄取 CDC 数据。支持 PostgreSQL 的 10、11、12、13、14 和 15 版本。

您可以通过两种方式从 PostgreSQL 将 CDC 数据摄取到 RisingWave:

  • 使用内置的 PostgreSQL CDC 连接器

    使用此连接器,RisingWave 可直接连接 PostgreSQL 数据库,从 binlog 获取数据,而无需启动其他服务。

    测试版功能

    RisingWave 内置的 PostgreSQL CDC 连接器目前处于测试阶段。如果您遇到任何问题或有任何反馈,请联系我们。

  • 使用 CDC 工具和消息代理

    您可以使用 CDC 工具,然后使用 Kafka、Pulsar 或 Kinesis 连接器将 CDC 数据摄取到 RisingWave。更多详细信息,请参阅 通过事件流系统创建 Source 相关内容。

设置 PostgreSQL

  1. 确保 wal_levellogical。使用以下语句检查。

    SHOW wal_level;

    默认情况下,它是 replica。对于 CDC,需要在数据库配置文件(postgresql.conf)中或通过 psql 命令将其设置为 logical。以下命令将更改 wal_level

    ALTER SYSTEM SET wal_level = logical;

    请注意,更改 wal_level 需要重启 PostgreSQL 实例,可能影响数据库性能。

    note

    如果选择在不使用共享 Source 的情况下创建多个 CDC 表,请确保将 max_wal_senders 设置为大于或等于同步表的数量。默认情况下,max_wal_senders 为 10。

  2. REPLICATIONLOGINCREATEDB 角色属性分配给用户。

    对于现有用户,可运行以下语句分配属性:

    ALTER USER <username> REPLICATION LOGIN CREATEDB;

    对于新用户,可运行以下语句创建用户并分配属性:

    CREATE USER <username> REPLICATION LOGIN CREATEDB;

    可以使用 \du psql 命令检查角色属性:

    dev-# \du
    List of roles
    Role name | Attributes | Member of
    -----------+-----------------------------------------------------------+---------
    rw | Create DB, Replication | {}
    postgres | Superuser, Create role, Create DB, Replication, Bypass RLS | {}
  3. 授予用户所需的权限。

    运行以下语句可授予用户所需的权限。

    GRANT CONNECT ON DATABASE <database_name> TO <username>;   
    GRANT USAGE ON SCHEMA <schema_name> TO <username>;
    GRANT SELECT ON ALL TABLES IN SCHEMA <schema_name> TO <username>;
    GRANT CREATE ON DATABASE <database_name> TO <username>;

    可以使用以下语句检查用户对表的权限:

    postgres=# SELECT table_name, grantee, privilege_type
    FROM information_schema.role_table_grants
    WHERE grantee='<username>';

    示例结果:

     table_name | grantee | privilege_type
    -----------+---------+----------------
    lineitem | rw | SELECT
    customer | rw | SELECT
    nation | rw | SELECT
    orders | rw | SELECT
    part | rw | SELECT
    partsupp | rw | SELECT
    supplier | rw | SELECT
    region | rw | SELECT
    (8 rows)

从二进制文件运行 RisingWave 的注意事项

如果从二进制文件在本地运行 RisingWave,并打算使用原生 CDC Source Connector 或 JDBC Sink Connector,请确保您的环境中安装了 JDK 11 或更高版本。

使用原生 CDC 连接器创建表

为确保捕获所有数据更改,必须创建表或 Source 并指定主键。更多详细信息,请参阅 CREATE TABLE 命令。

句法

创建 CDC 表的句法。注意必须指定主键。

CREATE {TABLE | SOURCE} [ IF NOT EXISTS ] table_name (
column_name data_type PRIMARY KEY , ...
PRIMARY KEY ( column_name, ... )
)
WITH (
connector='postgres-cdc',
connector_parameter='value', ...
)
[ FORMAT DEBEZIUM ENCODE JSON ];

创建 CDC Source 的句法。

CREATE SOURCE [ IF NOT EXISTS ] source_name WITH (
connector='postgres-cdc',
<field>=<value>, ...
);

连接器参数

除非另有说明,否则所列字段均为必填项。请注意,这些参数的值应用单引号括起来。

字段注释
hostname数据库的主机名。
port数据库的端口号。
username数据库的用户名。
password数据库的密码。
database.name数据库的名称。
schema.name可选。Schema 名称。默认值为 public
table.name要从中摄取数据的表的名称。
slot.name可选。此 PostgreSQL Source 的 复制槽。默认情况下,将随机生成一个唯一的槽名。每个 Source 都应有唯一的槽名。
publication.name可选。Publication 的名称。默认值为 rw_publication。更多信息,请参阅 多个 CDC 源表
publication.create.enable可选。默认值为 'true'。如果 publication.name 不存在且该值为 'true',则将创建 publication.name。如果 publication.name 不存在且该值为 'false',则将返回错误信息。
transactional可选。指定是否要为即将创建的 CDC 表启用事务。默认值为 'false'。多表事务的共享 CDC Source 也支持此功能。有关详细信息,请参阅 CDC 表内的事务
note

RisingWave 通过 PostgreSQL 复制实现 CDC。通过 pg_replication_slots 视图检查当前进度。通过 pg_drop_replication_slot(),删除不活动的复制槽。RisingWave 不会自动删除不活动的复制槽。您必须手动执行此操作,防止 WAL 文件在上游 PostgreSQL 数据库中累积。

数据格式

数据采用 Debezium JSON 格式。Debezium 是一款基于日志的 CDC 工具,可从各种数据库管理系统(如 PostgreSQL、MySQL 和 SQL Server)中捕获行更改,并实时生成结构一致的事件。RisingWave 中的 PostgreSQL CDC 连接器支持 JSON 作为 Debezium 数据的序列化格式。以 postgres-cdc 为 Source 创建表时,无需指定数据格式。

示例

创建单个 CDC 表

下面的示例可在 RisingWave 中创建一个表,从 PostgreSQL 的 shipments 表中读取 CDC 数据。shipments 表位于 dev 数据库下的 public Schema 中。连接到 PostgreSQL 中的特定表时,请使用 CREATE TABLE 命令。

 CREATE TABLE shipments (
shipment_id integer,
order_id integer,
origin string,
destination string,
is_arrived boolean,
PRIMARY KEY (shipment_id)
) WITH (
connector = 'postgres-cdc',
hostname = '127.0.0.1',
port = '5432',
username = 'postgres',
password = 'postgres',
database.name = 'dev',
schema.name = 'public',
table.name = 'shipments'
);

使用同一 Source 创建多个 CDC 表

RisingWave 支持创建单个 PostgreSQL Source,允许您从位于同一数据库中的多个表读取 CDC 数据。

使用 CREATE SOURCE 命令和 PostgreSQL CDC 参数创建 CDC Source,连接到上游数据库。数据格式固定为 FORMAT PLAIN ENCODE JSON,无需指定。

CREATE SOURCE pg_mydb WITH (
connector = 'postgres-cdc',
hostname = '127.0.0.1',
port = '8306',
username = 'root',
password = '123456',
database.name = 'mydb',
slot.name = 'mydb_slot'
);

Source 创建后即可创建多个 CDC 表,从上游数据库的不同表和 Schema 中摄取数据,无需再次指定数据库连接参数。

例如,RisingWave 中的以下 CDC 表从 Schema public 中的表 tt3 摄取数据。在关键字 TABLE 后面的 FROM 子句中指定 PostgreSQL 表名时,还必须指定 Schema 的名称。

CREATE TABLE tt3 (
v1 integer primary key,
v2 timestamp with time zone
) FROM pg_mydb TABLE 'public.tt3';

您可以在 RisingWave 中创建另一个 CDC 表,该表从 Schema ods 的表 tt4 中摄取数据。

CREATE TABLE tt4 (
v1 integer primary key,
v2 varchar,
PRIMARY KEY (v1)
) FROM pg_mydb TABLE 'ods.tt4';

要检查回填历史数据的进度,可使用 SHOW INTERNAL TABLES 命令找到相应的内部表并进行查询。

数据类型映射

下表显示了创建 Source 时应指定的 RisingWave 中的相应数据类型。有关原生 RisingWave 数据类型的详细信息,请参阅 数据类型概述

标有星号的 RisingWave 数据类型表示,虽然没有相应的 RisingWave 数据类型,但摄取的数据仍可作为所列类型进行消费。

note

RisingWave 无法正确解析 PostgreSQL 中的 Composite 类型,因为 Debezium 不支持 PostgreSQL 中的 Composite 类型。

PostgreSQL 类型RisingWave 类型
BOOLEANBOOLEAN
BIT(1)BOOLEAN
BIT( > 1)No support
BIT VARYING[(M)]No support
SMALLINT, SMALLSERIALSMALLINT
INTEGER, SERIALINTEGER
BIGINT, BIGSERIAL, OIDBIGINT
REALREAL
DOUBLE PRECISIONDOUBLE PRECISION
CHAR[(M)]CHARACTER VARYING
VARCHAR[(M)]CHARACTER VARYING
CHARACTER[(M)]CHARACTER VARYING
CHARACTER VARYING[(M)]CHARACTER VARYING
TIMESTAMPTZ, TIMESTAMP WITH TIME ZONETIMESTAMP WITH TIME ZONE
TIMETZ, TIME WITH TIME ZONETIME WITHOUT TIME ZONE (assume UTC time zone)
INTERVAL [P]INTERVAL
BYTEABYTEA
JSON, JSONBJSONB
XMLCHARACTER VARYING
UUIDCHARACTER VARYING
POINTSTRUCT (with form <x REAL, y REAL>)
LTREENo support
CITEXTCHARACTER VARYING*
INETCHARACTER VARYING*
INT4RANGECHARACTER VARYING*
INT8RANGECHARACTER VARYING*
NUMRANGECHARACTER VARYING*
TSRANGECHARACTER VARYING*
TSTZRANGECHARACTER VARYING*
DATERANGECHARACTER VARYING*
ENUMCHARACTER VARYING*
DATEDATE
TIME(1), TIME(2), TIME(3), TIME(4), TIME(5), TIME(6)TIME WITHOUT TIME ZONE (limited to [1973-03-03 09:46:40, 5138-11-16 09:46:40))
TIMESTAMP(1), TIMESTAMP(2), TIMESTAMP(3)TIMESTAMP WITHOUT TIME ZONE (limited to [1973-03-03 09:46:40, 5138-11-16 09:46:40))
TIMESTAMP(4), TIMESTAMP(5), TIMESTAMP(6), TIMESTAMPTIMESTAMP WITHOUT TIME ZONE
NUMERIC[(M[,D])]NUMERIC
DECIMAL[(M[,D])]NUMERIC
MONEY[(M[,D])]NUMERIC
HSTORENo support
HSTORENo support
INETCHARACTER VARYING*
CIDRCHARACTER VARYING*
MACADDRCHARACTER VARYING*
MACADDR8CHARACTER VARYING*