Skip to main content

从 RisingWave 导出数据到 Elasticsearch

您可以将在 RisingWave 中摄取和转换的数据传送导出到 Elasticsearch,以服务于搜索或分析的需求。

本文描述了如何使用 RisingWave 中的 Elasticsearch Sink 连接器将数据导出到 Elasticsearch。

Elasticsearch 是一个分布式的、RESTful 搜索和分析引擎,能够处理越来越多的用例。它将您的数据集中存储,以便于快速搜索、精细调整的相关性和轻松扩展的强大分析。

测试版功能

RisingWave 中的 Elasticsearch Sink 连接器目前是测试版功能,仅支持 Elasticsearch 的 7.x 和 8.x 版本。如果您遇到任何问题或有反馈,请联系我们。

note

RisingWave 中的 Elasticsearch Sink 连接器提供至少一次交付语义。在失败的情况下,事件可能会被重新传送。

开始之前

  • 确保 Elasticsearch 集群(7.x 或 8.x 版本)可以从 RisingWave 访问。

  • 如果您在本地从二进制文件运行 RisingWave,请确保您的环境中安装了 JDK 11 或更高版本。

创建 Elasticsearch Sink

使用以下句法创建 Elasticsearch Sink。一旦创建了 Sink,任何对 Sink 的插入或更新都将被流式传输到指定的 Elasticsearch 端点。

CREATE SINK sink_name
[ FROM sink_from | AS select_query ]
WITH (
connector = 'elasticsearch',
primary_key = '<sink_from 对象的主键>',
index = '<您的 Elasticsearch 索引>',
url = 'http://<ES 主机名>:<ES 端口>',
username = '<您的 ES 用户名>',
password = '<您的密码>',
delimiter='<分隔符>'
);

参数

参数描述
sink_name要创建的 Sink 的名称。
sink_from指定将从中输出数据的直接来源的子句。sink_from 可以是物化视图或表。必须指定此子句或 SELECT 查询中的一个。
AS select_query指定要输出到 Sink 的数据的 SELECT 查询。必须指定此查询或 FROM 子句中的一个。有关 SELECT 命令的句法和示例,请参见 SELECT
primary_key可选。Sink 的主键。如果主键有多个列,请在下面的 delimiter 参数中设置分隔符以连接它们。
index必填。目标 Elasticsearch 索引的名称。
url必填。Elasticsearch REST API 端点的 URL。
username可选。用于访问 Elasticsearch 端点的 elastic 用户名。必须与 password 一同指定。
password可选。用于访问 Elasticsearch 端点的密码。必须与 username 一同指定。
delimiter可选。当 Sink 的主键有多个列时,Elasticsearch ID 的分隔符。
note

在 8.x 以下版本中,有一个 type 参数。在 Elasticsearch 6.x 中,用户可以直接设置类型,但从 7.x 开始,它被设置为不推荐,并且默认值统一为 '_doc'。在 8.x 版本中,类型已被完全移除。有关更多详情,请参见 Elasticsearch 的官方文档

因此,如果您使用的是 Elasticsearch 7.x,我们将其设置为官方推荐的值,即 '_doc'。如果您使用的是 Elasticsearch 8.x,该参数已被 Elasticsearch 官方移除,因此无需设置。

主键和 Elasticsearch ID 的说明

Elasticsearch Sink 默认为 upsert Sink 类型。它不支持 append-only Sink 类型。

如果您想自定义 Elasticsearch ID,请通过 primary_key 参数指定。RisingWave 会将多个主键值组合成一个字符串,并使用您设置的分隔符作为 Elasticsearch ID。

如果您不想自定义 Elasticsearch ID,RisingWave 会使用 Sink 定义中的第一列作为 Elasticsearch ID。

数据类型映射

ElasticSearch 使用名为 动态字段映射 的机制,自动创建字段并自动确定其类型。它将所有整数类型视为 long,所有浮点类型视为 float。为了确保 RisingWave 中的数据类型正确映射到 Elasticsearch 中的数据类型,我们建议在创建 Sink 之前通过 索引模板动态模板 指定映射。

RisingWave 数据类型ElasticSearch 字段类型
booleanboolean
smallintlong
integerlong
bigintlong
numerictext
realfloat
double precisionfloat
character varyingtext
byteatext
datedate
time without time zonetext
timestamp without time zonetext
timestamp with time zonetext
intervaltext
structobject
arrayarray
JSONBobject (RisingWave 的 Elasticsearch Sink 会将 JSONB 作为 JSON 字符串发送,Elasticsearch 会将其转换为对象)
note

Elasticsearch 不要求用户显式地 CREATE TABLE。相反,它根据首次摄取的记录基础上推断模式。例如,如果第一条记录包含一个 jsonb '{v1: 100}',v1 将被推断为 long 类型。然而,如果下一条记录是 '{v1: "abc"}',摄取将失败,因为 "abc" 被推断为字符串,两种类型不兼容。 需要注意这个行为,否则您的数据可能会比预期少。在监控方面,您可以查看 Grafana,有面板显示所有 Sink 写入错误。