Skip to main content

从 RisingWave 导出数据到 Cassandra 或 ScyllaDB

您可以将数据从 RisingWave 导出到 Cassandra。由于 ScyllaDB 可以作为 Cassandra 的直接替代品,这意味着您也可以将数据从 RisingWave 导出到 ScyllaDB。

本文描述了如何使用 RisingWave 中的 Cassandra Sink 连接器将数据导出到 Cassandra 或 ScyllaDB。

测试版功能

RisingWave 中的 Cassandra Sink 连接器目前处于测试阶段。如果您遇到任何问题或有任何反馈,请联系我们。

开始之前

  • 确保您的 Cassandra 或 ScyllaDB 集群可以从 RisingWave 访问。

  • 如果您在本地通过二进制文件运行 RisingWave,并打算使用原生 CDC Source 连接器或 JDBC Sink 连接器,请确保您的环境中安装了 JDK 11 或更高版本。

句法

要将数据导出到 Cassandra 或 ScyllaDB,请使用以下句法在 RisingWave 中创建 Cassandra Sink:

CREATE SINK [ IF NOT EXISTS ] sink_name
[FROM sink_from | AS select_query]
WITH (
connector='cassandra',
type='<type>',
cassandra.url = '<node1>,<node2>,<node3>',
cassandra.keyspace = '<keyspace>',
cassandra.table = '<cassandra_table>',
cassandra.datacenter = '<data_center>'
);

创建 Sink 后,数据变更将被流式传输到指定的表。

参数

参数名称描述
sink_name要创建的 Sink 的名称。
sink_from指定将从中输出数据的直接来源的子句。sink_from 可以是物化视图或表。必须指定此子句或 select_query 查询之一。
AS select_query指定要输出到 Sink 的数据的 SELECT 查询。必须指定此查询或一个 sink_from 子句。有关 SELECT 命令的句法和示例,请参见 SELECT
type必填。指定 Sink 是 upsert 还是 append-only。如果创建的是 upsert Sink,必须指定主键。
primary_key可选。指定 Cassandra Sink 的主键的 Column 名称列表字符串,以逗号分隔。
force_append_only如果为 true,即使无法做到也会强制 Sink 为 append-only
cassandra.url必填。您想要连接的 Cassandra 或 ScyllaDB 集群或节点的 URL 或 IP 地址。
cassandra.keyspace必填。您想要存储数据的 Cassandra 数据库或 ScyllaDB 中的 keyspace 名称。Keyspace 是用于在 Cassandra 中组织数据的逻辑容器。
cassandra.table必填。指定 keyspace 中您想要插入或更新数据的表的名称。
cassandra.datacenter可选。如果是多数据中心的 Cassandra,您可能需要指定目标数据中心的名称,数据应写入该数据中心。
note

RisingWave 中的 Cassandra Sink 提供至少一次交付语义。在失败的情况下,事件可能会被重新交付。我们推荐使用 upsert Sink 类型以避免重复数据。

数据类型映射

RisingWave 数据类型Cassandra 数据类型
booleanboolean
smallintsmallint
integerint
bigintbigint
numericdouble
realfloat
double precisiondouble
character varying (varchar)text
byteablob
datedate
time without time zonetime
timestamp without time zonetimestamp
timestamp with time zonetimestamp
intervalduration
struct不支持
array不支持
JSONB不支持