Skip to main content

从 WarpStream 摄取数据

您可以将 WarpStream(直接构建在 S3 之上、与 Apache Kafka 兼容的数据流平台)中的数据导入到 RisingWave 中。

本指南将帮助您设置 WarpStream,将数据导入到 RisingWave,并在 RisingWave 中创建和查询物化视图。

设置 WarpStream

安装 WarpStream Agent

要与 WarpStream 集群交互,请安装 WarpStream Agent 或 CLI:

curl https://console.warpstream.com/install.sh | bash

创建 Kafka Topic 并生成示例数据

安装完成后,在终端运行以下命令:

  • 启动 WarpStream Playground:

    warpstream playground
  • 生成带有虚假点击流数据的 Kafka Topic:

    warpstream demo

将数据导入到 RisingWave 中

创建 Source

在 RisingWave 中,创建名为 "website_visits" 的 Source,将 RisingWave 连接到 WarpStream Topic:

CREATE SOURCE IF NOT EXISTS website_visits_stream (
timestamp timestamp,
user_id varchar,
page_id varchar,
action varchar
)
WITH (
connector='kafka',
topic='demo-stream',
properties.bootstrap.server='localhost:9092',
scan.startup.mode='earliest'
) ROW FORMAT JSON;

创建物化视图

在 RisingWave 中创建物化视图,深入了解用户在不同页面上的行为,分析网站流量和用户参与度。

CREATE MATERIALIZED VIEW visits_stream_mv AS 
SELECT page_id,
count(*) AS total_visits,
count(DISTINCT user_id) AS unique_visitors,
max(timestamp) AS last_visit_time
FROM website_visits_stream
GROUP BY page_id;

查询物化视图

让我们从创建的物化视图中检索数据:

SELECT * FROM visits_stream_mv;

预期结果:


page_id | total_visits | unique_visitors | last_visit_time
---------+--------------+-----------------+---------------------
page_0 | 2 | 2 | 2023-07-26 19:03:08
page_4 | 9 | 9 | 2023-07-26 19:03:00
page_8 | 9 | 9 | 2023-07-26 19:02:57
page_3 | 14 | 14 | 2023-07-26 19:03:09
page_7 | 4 | 4 | 2023-07-26 19:02:52
page_1 | 7 | 6 | 2023-07-26 19:02:55
page_5 | 9 | 9 | 2023-07-26 19:03:01
page_9 | 12 | 12 | 2023-07-26 19:02:48
page_2 | 4 | 4 | 2023-07-26 19:02:58
page_6 | 7 | 6 | 2023-07-26 19:03:03

您已成功将 WarpStream 中的数据导入到 RisingWave 中,创建了物化视图,并在 RisingWave 中对其进行了查询。