Skip to main content

从 RisingWave 导出数据到 Apache Doris

本文介绍了如何将数据从 RisingWave 导出到 Apache Doris。Apache Doris 是一个适用于在线分析处理(OLAP)的开源实时数据仓库。更多信息,请参见 Apache Doris

开始之前

  • 确保 RisingWave 能够访问 Doris 后端和前端所在的网络。更多详情,请参见 通过外部表同步数据

  • 确保您有一个可以从中导出数据的上游物化视图或 Source。更多详情,请参见 CREATE SOURCECREATE MATERIALIZED VIEW

  • 确保对于 struct 元素,Doris 和 RisingWave 中的名称和类型相同。如果它们不相同,值将被设置为 NULL 或默认值。有关 struct 数据类型的更多详情,请参见 Struct

句法

CREATE SINK [ IF NOT EXISTS ] sink_name
[FROM sink_from | AS select_query]
WITH (
connector='doris',
connector_parameter = 'value', ...
);

参数

参数名称描述
type必填。指定 Sink 是 upsert 还是 append-only。如果创建 upsert Sink,您导出到的表需要有 UNIQUE KEY
doris.url必填。Doris 前端的连接端口。非 MySQL 连接端口。
doris.username必填。Doris 用户的用户名。
doris.password必填。与 Doris 用户关联的密码。
doris.database必填。目标 Doris 数据库。
doris.table必填。目标 Doris 表。
force_append_only可选。如果为 true,即使无法做到也强制使 Sink 为 append-only
primary_key可选。Sink 的主键。使用 ',' 分隔主键列。

示例

创建 Append-only Sink

要创建一个 append-only Sink,在 CREATE SINK 查询中设置 type = 'append-only'

CREATE SINK doris_sink FROM mv1
WITH (
connector = 'doris',
type = 'append-only',
doris.url = 'http://fe:8030',
doris.user = 'xxxx',
doris.password = 'xxxx',
doris.database = 'demo',
doris.table='demo_bhv_table',
force_append_only='true'
);

创建一个 Upsert Sink

要创建一个 upsert Sink,在 CREATE SINK 查询中设置 type = 'upsert'。创建 upsert sink 时,Doris 表必须有 UNIQUE KEY

CREATE SINK doris_sink FROM mv1 
WITH (
connector = 'doris',
type = 'upsert',
doris.url = 'http://fe:8030',
doris.user = 'xxxx',
doris.password = 'xxxx',
doris.database = 'demo',
doris.table='demo_bhv_table',
primary_key = 'user_id'
);

数据类型映射

以下表格显示了在创建 Sink 时应指定的 RisingWave 和 Doris 之间对应的数据类型。关于原生 RisingWave 数据类型的详情,请参见 数据类型概述

对于 decimal 类型,RisingWave 将四舍五入到最近的小数位以确保其精度与 Doris 匹配。确保导入 Doris 的 decimal 类型长度不超过 Doris 的 decimal 长度,否则将导入失败。

Doris 类型RisingWave 类型
BOOLEANBOOLEAN
SMALLINTSMALLINT
INTINTEGER
BIGINTBIGINT
FLOATREAL
DOUBLEDOUBLE
DECIMALDECIMAL
DATEDATE
STRING, VARCHARVARCHAR
不支持TIME
DATETIMETIMESTAMP WITHOUT TIME ZONE
不支持TIMESTAMP WITH TIME ZONE
不支持INTERVAL
STRUCTSTRUCT
ARRAYARRAY
不支持BYTEA
JSONBJSONB
BIGINTSERIAL