Skip to main content

从 Upstash Kafka 摄取数据

您可以将部署在 Upstash 中的 Kafka 数据导入到 RisingWave 中。Upstash 是一个无服务器平台,提供 Redis、Kafka 和 Qstash 服务,具有可扩展性、高级安全选项和专门支持服务等优势。

在 Upstash 上搭建 Kafka

本指南将介绍在 Upstash 上搭建 Kafka 集群并将其连接到 RisingWave 以进行数据摄取的步骤。有关从 Upstash 摄取数据的更多信息,请参阅 Upstash 文档

注册 Upstash Cloud 账户

首先注册一个免费的 Upstash Cloud 账户,该账户可提供对 Kafka 服务的访问权限。要创建账户,请访问 Upstash Cloud 账户

Sign up for Upstash Cloud

搭建 Kafka 集群

登录后,使用以下详细信息搭建 Kafka 集群:

  • Cluster Name:为您的 Kafka 集群指定唯一的标识名称。
  • Region:选择托管 Kafka 集群的区域。
  • Cluster Type:选择适合您需求的集群类型。
Create a cluster

创建 Kafka Topic

搭建 Kafka 集群后,可创建 Kafka Topic。Upstash Kafka 为分区数量和保留策略提供了默认配置,简化了创建过程。

Create a topic

连接并与您的 Kafka 集群交互

现即可使用各种 Kafka 客户端连接到您的 Kafka 集群。这些客户端使您能够从 Kafka Topic 中生成和消费数据。因此,您可以从 Python Wikipedia API 中提取实时数据,并将其导入 Upstash 中的 Kafka Topic。

Connect and interact with your Kafka cluster

通过这些步骤,您就可以利用 Upstash Kafka 和 RisingWave 的功能来构建流处理应用和管道了!

有关详细文档和特定客户端指南,请参阅 Upstash Kafka 文档

从 Upstash Kafka 摄取和处理数据

创建 RisingWave 集群

请参阅快速上手指南安装、启动、连接 RisingWave。

创建 Source

启动 RisingWave 集群后,可使用以下 SQL 查询创建 Source。

CREATE SOURCE wiki_source (
contributor VARCHAR,
title VARCHAR,
edit_timestamp TIMESTAMPTZ,
registration TIMESTAMPTZ,
gender VARCHAR,
edit_count VARCHAR
)
WITH(
connector = 'kafka',
topic = '<topic-name>',
properties.bootstrap.server = '<broker-url>',
scan.startup.mode = 'earliest',
properties.sasl.mechanism = 'SCRAM-SHA-512',
properties.security.protocol = 'SASL_SSL',
properties.sasl.username = '<your-username>',
properties.sasl.password = '<your-password>'
) FORMAT PLAIN ENCODE JSON;

创建物化视图

让我们基于 Source wiki_source 创建一个名为 wiki_mv 的物化视图,该视图可过滤掉带有空值的行。

CREATE MATERIALIZED VIEW wiki_mv AS
SELECT
contributor,
title,
CAST(edit_timestamp AS TIMESTAMP) AS edit_timestamp,
CAST(registration AS TIMESTAMP) AS registration,
gender,
CAST(edit_count AS INT) AS edit_count
FROM wiki_source
WHERE edit_timestamp IS NOT NULL
AND registration IS NOT NULL
AND edit_count IS NOT NULL;

查询物化视图

可以查询物化视图来检索 Source 的最新数据:

SELECT * FROM wiki_mv LIMIT 5;

检索结果应该如下所示:

contributor    |   title                     |     edit_timestamp             |       registration        | gender  | edit_count
---------------+-----------------------------+---------------------------+---------------------------+---------+-----------

Omnipaedista | Template:Good and evil | 2023-12-03 10:22:02+00:00 | 2008-12-14 06:02:32+00:00 | male | 222563
PepeBonus | Moshi mo Inochi ga Egaketara| 2023-12-03 10:22:16+00:00 | 2012-06-02 13:39:53+00:00 | unknown | 20731
Koulog | Ionikos F.C. | 2023-12-03 10:23:00+00:00 | 2023-10-28 05:52:35+00:00 | unknown | 691
Fau Tzy | 2023 Liga 3 Maluku | 2023-12-03 10:23:17+00:00 | 2022-07-23 09:53:11+00:00 | unknown | 4697
Cavarrone | Cheers (season 8) | 2023-12-03 10:23:40+00:00 | 2008-08-23 11:13:14+00:00 | male | 83643

(5 rows)

您已成功从 Upstash Kafka Topic 消费数据到 RisingWave,创建了一个 Source 和一个物化视图,然后对其进行了查询。