从 RisingWave 导出数据到 Elasticsearch
您可以将在 RisingWave 中摄取和转换的数据传送导出到 Elasticsearch,以服务于搜索或分析的需求。
本文描述了如何使用 RisingWave 中的 Elasticsearch Sink 连接器将数据导出到 Elasticsearch。
Elasticsearch 是一个分布式的、RESTful 搜索和分析引擎,能够处理越来越多的用例。它将您的数据集中存储,以便于快速搜索、精细调整的相关性和轻松扩展的强大分析。
RisingWave 中的 Elasticsearch Sink 连接器目前是测试版功能,仅支持 Elasticsearch 的 7.x 和 8.x 版本。如果您遇到任何问题或有反馈,请联系我们。
RisingWave 中的 Elasticsearch Sink 连接器提供至少一次交付语义。在失败的情况下,事件可能会被重新传送。
开始之前
确保 Elasticsearch 集群(7.x 或 8.x 版本)可以从 RisingWave 访问。
如果您在本地从二进制文件运行 RisingWave,请确保您的环境中安装了 JDK 11 或更高版本。
创建 Elasticsearch Sink
使用以下句法创建 Elasticsearch Sink。一旦创建了 Sink,任何对 Sink 的插入或更新都将被流式传输到指定的 Elasticsearch 端点。
CREATE SINK sink_name
[ FROM sink_from | AS select_query ]
WITH (
connector = 'elasticsearch',
primary_key = '<sink_from 对象的主键>',
index = '<您的 Elasticsearch 索引>',
url = 'http://<ES 主机名>:<ES 端口>',
username = '<您的 ES 用户名>',
password = '<您的密码>',
delimiter='<分隔符>'
);
参数
参数 | 描述 |
---|---|
sink_name | 要创建的 Sink 的名称。 |
sink_from | 指定将从中输出数据的直接来源的子句。sink_from 可以是物化视图或表。必须指定此子句或 SELECT 查询中的一个。 |
AS select_query | 指定要输出到 Sink 的数据的 SELECT 查询。必须指定此查询或 FROM 子句中的一个。有关 SELECT 命令的句法和示例,请参见 SELECT。 |
primary_key | 可选。Sink 的主键。如果主键有多个列,请在下面的 delimiter 参数中设置分隔符以连接它们。 |
index | 必填。目标 Elasticsearch 索引的名称。 |
url | 必填。Elasticsearch REST API 端点的 URL。 |
username | 可选。用于访问 Elasticsearch 端点的 elastic 用户名。必须与 password 一同指定。 |
password | 可选。用于访问 Elasticsearch 端点的密码。必须与 username 一同指定。 |
delimiter | 可选。当 Sink 的主键有多个列时,Elasticsearch ID 的分隔符。 |
在 8.x 以下版本中,有一个 type
参数。在 Elasticsearch 6.x 中,用户可以直接设置类型,但从 7.x 开始,它被设置为不推荐,并且默认值统一为 '_doc'。在 8.x 版本中,类型已被完全移除。有关更多详情,请参见 Elasticsearch 的官方文档。
因此,如果您使用的是 Elasticsearch 7.x,我们将其设置为官方推荐的值,即 '_doc'。如果您使用的是 Elasticsearch 8.x,该参数已被 Elasticsearch 官方移除,因此无需设置。
主键和 Elasticsearch ID 的说明
Elasticsearch Sink 默认为 upsert
Sink 类型。它不支持 append-only
Sink 类型。
如果您想自定义 Elasticsearch ID,请通过 primary_key
参数指定。RisingWave 会将多个主键值组合成一个字符串,并使用您设置的分隔符作为 Elasticsearch ID。
如果您不想自定义 Elasticsearch ID,RisingWave 会使用 Sink 定义中的第一列作为 Elasticsearch ID。
数据类型映射
ElasticSearch 使用名为 动态字段映射 的机制,自动创建字段并自动确定其类型。它将所有整数类型视为 long,所有浮点类型视为 float。为了确保 RisingWave 中的数据类型正确映射到 Elasticsearch 中的数据类型,我们建议在创建 Sink 之前通过 索引模板 或 动态模板 指定映射。
RisingWave 数据类型 | ElasticSearch 字段类型 |
---|---|
boolean | boolean |
smallint | long |
integer | long |
bigint | long |
numeric | text |
real | float |
double precision | float |
character varying | text |
bytea | text |
date | date |
time without time zone | text |
timestamp without time zone | text |
timestamp with time zone | text |
interval | text |
struct | object |
array | array |
JSONB | object (RisingWave 的 Elasticsearch Sink 会将 JSONB 作为 JSON 字符串发送,Elasticsearch 会将其转换为对象) |
Elasticsearch 不要求用户显式地 CREATE TABLE
。相反,它根据首次摄取的记录基础上推断模式。例如,如果第一条记录包含一个 jsonb '{v1: 100}',v1 将被推断为 long 类型。然而,如果下一条记录是 '{v1: "abc"}',摄取将失败,因为 "abc" 被推断为字符串,两种类型不兼容。
需要注意这个行为,否则您的数据可能会比预期少。在监控方面,您可以查看 Grafana,有面板显示所有 Sink 写入错误。