Skip to main content

从 RisingWave 导出数据到 ClickHouse

本文描述了如何使用 RisingWave 中的 ClickHouse Sink 连接器将数据导出到 ClickHouse。

ClickHouse® 是一个高性能的、面向列的 SQL 数据库管理系统(DBMS),用于在线分析处理(OLAP)。有关 ClickHouse 的更多信息,请参见 ClickHouse 官方网站

测试版功能

RisingWave 中的 ClickHouse Sink 连接器目前处于测试版。如果您遇到任何问题或有任何反馈,请联系我们。

开始之前

  • 确保您已经有一个可以导出数据的 ClickHouse 表。有关创建表和设置 ClickHouse 的更多指导,请参见此快速开始指南

  • 确保您有一个可以从中导出数据的上游物化视图或 Source。

note

我们强烈推荐在 ClickHouse 中使用 ReplacingMergeTree 引擎。这是因为它解决了在 RisingWave 恢复期间 ClickHouse 可能出现的重复写入问题,当主键可以重复时。

句法

CREATE SINK [ IF NOT EXISTS ] sink_name
[FROM sink_from | AS select_query]
WITH (
connector='clickhouse',
connector_parameter = 'value', ...
);

参数

参数名称描述
type必填。指定 Sink 是 upsert 还是 append-only。如果创建的是 upsert Sink,必须指定主键。
primary_key可选。指定 ClickHouse Sink 的主键的 Column 名称列表字符串,以逗号分隔。
clickhouse.url必填。目标 ClickHouse 服务器的地址。格式:http://ip:port。默认端口是 8123
clickhouse.user必填。用于访问 ClickHouse 服务器的用户名。
clickhouse.password必填。用于访问 ClickHouse 服务器的密码。
clickhouse.database必填。目标 ClickHouse 数据库的名称。
clickhouse.table必填。目标 ClickHouse 表的名称。
note

ClickHouse 不推荐使用 upsertupdatedelete)功能,因为它可能导致显著的性能下降。

Upsert Sink

尽管 RisingWave 支持所有 ClickHouse 引擎的 append-only Sink,但对 upsert Sink 的支持是有限的。此外,对于 ReplacingMergeTree 引擎,append-only Sink 将不会插入重复数据。

我们支持为 CollapsingMergeTree 和 VersionedCollapsingMergeTree 引擎创建 upsert Sink。RisingWave 会将 DELETE 转换为 INSERT SIGN = 1

示例

本节包括一些示例,如果您想快速尝试将数据导出到 ClickHouse,可以使用这些示例。

创建 ClickHouse 表(如果还没有的话)

例如,让我们创建一个基础的 ClickHouse 表,其主键为 seq_id,并将引擎设置为 ReplacingMergeTree。要强调的是,如果不使用 ReplacingMergeTree 或其他去重技术,将存在写入 ClickHouse 重复数据的显著风险。

请注意,仅支持与 S3 兼容的对象存储,如 AWS S3 或 MinIO。

CREATE TABLE demo_test(
seq_id Int32,
user_id Int32,
user_name String
) ENGINE = ReplacingMergeTree
PRIMARY KEY (seq_id);

创建上游物化视图或 Source

以下查询创建了一个 Append-only 的 Source。有关创建 Source 的更多详情,请参见 CREATE SOURCE

CREATE SOURCE s1_source (
seq_id integer,
user_id integer,
user_name varchar)
WITH (
connector ='datagen',
fields.seq_id.kind ='sequence',
fields.seq_id.start ='1',
fields.seq_id.end ='10000000',
fields.user_id.kind ='random',
fields.user_id.min ='1',
fields.user_id.max ='10000000',
fields.user_name.kind ='random',
fields.user_name.length ='10',
datagen.rows.per.second ='20000'
) FORMAT PLAIN ENCODE JSON;

另一个选项是创建一个支持原地更新的 Upsert 表。有关创建表的更多详情,请参见 CREATE TABLE

CREATE TABLE s1_table (
seq_id integer,
user_id integer,
user_name varchar)
WITH (
connector ='datagen',
fields.seq_id.kind ='sequence',
fields.seq_id.start ='1',
fields.seq_id.end ='10000000',
fields.user_id.kind ='random',
fields.user_id.min ='1',
fields.user_id.max ='10000000',
fields.user_name.kind ='random',
fields.user_name.length ='10',
datagen.rows.per.second ='20000'
) FORMAT PLAIN ENCODE JSON;

从 Append-only Source 创建 Append-only Sink

如果您有一个 Append-only 的 Source,并且想创建一个 Append-only 的 Sink,请在 CREATE SINK SQL 查询中设置 type = append-only

CREATE SINK s1_sink FROM s1_source
WITH (
connector = 'clickhouse',
type = 'append-only',
clickhouse.url = '${CLICKHOUSE_URL}',
clickhouse.user = '${CLICKHOUSE_USER}',
clickhouse.password = '${CLICKHOUSE_PASSWORD}',
clickhouse.database = 'default',
clickhouse.table='demo_test'
);

从 Upsert Source 创建 Append-only Sink

如果您有一个 Upsert Source,并想创建一个 的 Append-only Sink,请设置 type = append-onlyforce_append_only = true。这将忽略上游的删除消息,并将上游的更新消息转换为插入消息。

CREATE SINK s1_sink FROM s1_source
WITH (
connector = 'clickhouse',
type = 'append-only',
clickhouse.url = '${CLICKHOUSE_URL}',
clickhouse.user = '${CLICKHOUSE_USER}',
clickhouse.password = '${CLICKHOUSE_PASSWORD}',
clickhouse.database = 'default',
clickhouse.table='demo_test'
);

从 Upsert Source 创建 Upsert Sink

如果您有一个 Upsert Source,并想创建一个 Upsert Sink,请设置 type = upsert。当 Sink 类型为 Upsert 时,务必设置 primary_key 字段来指定下游 ClickHouse 表的主键。

CREATE SINK s1_sink FROM s1_table
WITH (
connector = 'clickhouse',
type = 'upsert',
primary_key = 'seq_id',
clickhouse.url = '${CLICKHOUSE_URL}',
clickhouse.user = '${CLICKHOUSE_USER}',
clickhouse.password = '${CLICKHOUSE_PASSWORD}',
clickhouse.database = 'default',
clickhouse.table='demo_test'
);

数据类型映射

RisingWave 数据类型ClickHouse 数据类型
booleanBool
smallintInt16 或 UInt16
integerInt32 或 UInt32
bigintInt64 或 UInt64
realFloat32
double precisionFloat64
decimalDecimal
character varyingString
bytea不支持
dateDate32
time without time zone不支持
timestamp不支持。您需要在将 timestamp 从 RisingWave 导出之前,将其转换为 timestamptz
timestamptzDateTime64
interval不支持
structNested
arrayArray
JSONB不支持
note

在 ClickHouse 中,Nested 数据类型不支持多级嵌套。因此,当将 RisingWave 的 struct 数据导出到 ClickHouse 时,您需要将嵌套数据扁平化或重构,以符合 ClickHouse 的要求。

请注意,特定值的范围在 ClickHouse 类型和 RisingWave 类型之间是有差异的。请参考下表以获取详细信息。

ClickHouse 类型RisingWave 类型ClickHouse 范围RisingWave 范围
Date32DATE1900-01-01 至 2299-12-310001-01-01 至 9999-12-31
DateTime64TIMESTAMPTZ1900-01-01 00:00:00 至 2299-12-31 23:59:59.999999990001-01-01 00:00:00 至 9999-12-31 23:59:59