从 RisingWave 导出数据到 ClickHouse
本文描述了如何使用 RisingWave 中的 ClickHouse Sink 连接器将数据导出到 ClickHouse。
ClickHouse® 是一个高性能的、面向列的 SQL 数据库管理系统(DBMS),用于在线分析处理(OLAP)。有关 ClickHouse 的更多信息,请参见 ClickHouse 官方网站。
RisingWave 中的 ClickHouse Sink 连接器目前处于测试版。如果您遇到任何问题或有任何反馈,请联系我们。
开始之前
确保您已经有一个可以导出数据的 ClickHouse 表。有关创建表和设置 ClickHouse 的更多指导,请参见此快速开始指南。
确保您有一个可以从中导出数据的上游物化视图或 Source。
我们强烈推荐在 ClickHouse 中使用 ReplacingMergeTree 引擎。这是因为它解决了在 RisingWave 恢复期间 ClickHouse 可能出现的重复写入问题,当主键可以重复时。
句法
CREATE SINK [ IF NOT EXISTS ] sink_name
[FROM sink_from | AS select_query]
WITH (
connector='clickhouse',
connector_parameter = 'value', ...
);
参数
参数名称 | 描述 |
---|---|
type | 必填。指定 Sink 是 upsert 还是 append-only 。如果创建的是 upsert Sink,必须指定主键。 |
primary_key | 可选。指定 ClickHouse Sink 的主键的 Column 名称列表字符串,以逗号分隔。 |
clickhouse.url | 必填。目标 ClickHouse 服务器的地址。格式:http://ip:port 。默认端口是 8123 。 |
clickhouse.user | 必填。用于访问 ClickHouse 服务器的用户名。 |
clickhouse.password | 必填。用于访问 ClickHouse 服务器的密码。 |
clickhouse.database | 必填。目标 ClickHouse 数据库的名称。 |
clickhouse.table | 必填。目标 ClickHouse 表的名称。 |
ClickHouse 不推荐使用 upsert
(update
和 delete
)功能,因为它可能导致显著的性能下降。
Upsert Sink
尽管 RisingWave 支持所有 ClickHouse 引擎的 append-only
Sink,但对 upsert
Sink 的支持是有限的。此外,对于 ReplacingMergeTree 引擎,append-only
Sink 将不会插入重复数据。
我们支持为 CollapsingMergeTree 和 VersionedCollapsingMergeTree 引擎创建 upsert
Sink。RisingWave 会将 DELETE
转换为 INSERT SIGN = 1
。
示例
本节包括一些示例,如果您想快速尝试将数据导出到 ClickHouse,可以使用这些示例。
创建 ClickHouse 表(如果还没有的话)
例如,让我们创建一个基础的 ClickHouse 表,其主键为 seq_id
,并将引擎设置为 ReplacingMergeTree
。要强调的是,如果不使用 ReplacingMergeTree
或其他去重技术,将存在写入 ClickHouse 重复数据的显著风险。
请注意,仅支持与 S3 兼容的对象存储,如 AWS S3 或 MinIO。
CREATE TABLE demo_test(
seq_id Int32,
user_id Int32,
user_name String
) ENGINE = ReplacingMergeTree
PRIMARY KEY (seq_id);
创建上游物化视图或 Source
以下查询创建了一个 Append-only 的 Source。有关创建 Source 的更多详情,请参见 CREATE SOURCE
。
CREATE SOURCE s1_source (
seq_id integer,
user_id integer,
user_name varchar)
WITH (
connector ='datagen',
fields.seq_id.kind ='sequence',
fields.seq_id.start ='1',
fields.seq_id.end ='10000000',
fields.user_id.kind ='random',
fields.user_id.min ='1',
fields.user_id.max ='10000000',
fields.user_name.kind ='random',
fields.user_name.length ='10',
datagen.rows.per.second ='20000'
) FORMAT PLAIN ENCODE JSON;
另一个选项是创建一个支持原地更新的 Upsert 表。有关创建表的更多详情,请参见 CREATE TABLE
。
CREATE TABLE s1_table (
seq_id integer,
user_id integer,
user_name varchar)
WITH (
connector ='datagen',
fields.seq_id.kind ='sequence',
fields.seq_id.start ='1',
fields.seq_id.end ='10000000',
fields.user_id.kind ='random',
fields.user_id.min ='1',
fields.user_id.max ='10000000',
fields.user_name.kind ='random',
fields.user_name.length ='10',
datagen.rows.per.second ='20000'
) FORMAT PLAIN ENCODE JSON;
从 Append-only Source 创建 Append-only Sink
如果您有一个 Append-only 的 Source,并且想创建一个 Append-only 的 Sink,请在 CREATE SINK
SQL 查询中设置 type = append-only
。
CREATE SINK s1_sink FROM s1_source
WITH (
connector = 'clickhouse',
type = 'append-only',
clickhouse.url = '${CLICKHOUSE_URL}',
clickhouse.user = '${CLICKHOUSE_USER}',
clickhouse.password = '${CLICKHOUSE_PASSWORD}',
clickhouse.database = 'default',
clickhouse.table='demo_test'
);
从 Upsert Source 创建 Append-only Sink
如果您有一个 Upsert Source,并想创建一个 的 Append-only Sink,请设置 type = append-only
、 force_append_only = true
。这将忽略上游的删除消息,并将上游的更新消息转换为插入消息。
CREATE SINK s1_sink FROM s1_source
WITH (
connector = 'clickhouse',
type = 'append-only',
clickhouse.url = '${CLICKHOUSE_URL}',
clickhouse.user = '${CLICKHOUSE_USER}',
clickhouse.password = '${CLICKHOUSE_PASSWORD}',
clickhouse.database = 'default',
clickhouse.table='demo_test'
);
从 Upsert Source 创建 Upsert Sink
如果您有一个 Upsert Source,并想创建一个 Upsert Sink,请设置 type = upsert
。当 Sink 类型为 Upsert 时,务必设置 primary_key
字段来指定下游 ClickHouse 表的主键。
CREATE SINK s1_sink FROM s1_table
WITH (
connector = 'clickhouse',
type = 'upsert',
primary_key = 'seq_id',
clickhouse.url = '${CLICKHOUSE_URL}',
clickhouse.user = '${CLICKHOUSE_USER}',
clickhouse.password = '${CLICKHOUSE_PASSWORD}',
clickhouse.database = 'default',
clickhouse.table='demo_test'
);
数据类型映射
RisingWave 数据类型 | ClickHouse 数据类型 |
---|---|
boolean | Bool |
smallint | Int16 或 UInt16 |
integer | Int32 或 UInt32 |
bigint | Int64 或 UInt64 |
real | Float32 |
double precision | Float64 |
decimal | Decimal |
character varying | String |
bytea | 不支持 |
date | Date32 |
time without time zone | 不支持 |
timestamp | 不支持。您需要在将 timestamp 从 RisingWave 导出之前,将其转换为 timestamptz 。 |
timestamptz | DateTime64 |
interval | 不支持 |
struct | Nested |
array | Array |
JSONB | 不支持 |
在 ClickHouse 中,Nested
数据类型不支持多级嵌套。因此,当将 RisingWave 的 struct
数据导出到 ClickHouse 时,您需要将嵌套数据扁平化或重构,以符合 ClickHouse 的要求。
请注意,特定值的范围在 ClickHouse 类型和 RisingWave 类型之间是有差异的。请参考下表以获取详细信息。
ClickHouse 类型 | RisingWave 类型 | ClickHouse 范围 | RisingWave 范围 |
---|---|---|---|
Date32 | DATE | 1900-01-01 至 2299-12-31 | 0001-01-01 至 9999-12-31 |
DateTime64 | TIMESTAMPTZ | 1900-01-01 00:00:00 至 2299-12-31 23:59:59.99999999 | 0001-01-01 00:00:00 至 9999-12-31 23:59:59 |