Skip to main content

快速上手

在本文中,我们将帮助您启动并连接 RisingWave,然后用它完成一些常见任务。

启动 RisingWave

info

以下提供的几种方式启动的都是 Playground 模式,这种模式主要用于快速测试。此模式下,数据会被存储于内存中,服务会在无活动 30 分钟后自动终止,所有用户数据会被随之删除。由于内存容量有限,资源密集型操作可能导致内存不足(OOM)错误。

对于较大规模的测试或单机部署,请考虑通过 Docker Compose 启动 RisingWave。若要在生产环境中使用 RisingWave,请使用 OperatorHelm 在 Kubernetes 上部署 RisingWave。

curl 安装单机模式

运行以下 curl 命令,根据提示安装。

curl https://risingwave.com/sh | sh

Docker

请先确保您已安装并运行 Docker Desktop,然后运行以下命令。

docker run -it --pull=always -p 4566:4566 -p 5691:5691 risingwavelabs/risingwave:latest playground

Homebrew

请先确保您已安装 Homebrew,然后运行以下命令:

brew tap risingwavelabs/risingwave
brew install risingwave
risingwave playground

二进制文件

wget https://github.com/risingwavelabs/risingwave/releases/download/v1.5.0/risingwave-v1.5.0-x86_64-unknown-linux-all-in-one.tar.gz
tar xvf risingwave-v1.5.0-x86_64-unknown-linux-all-in-one.tar.gz
# 不要移动解压后的文件或文件夹。这可能会在启动 RisingWave 时引起问题。
./risingwave playground

在线体验

点击此链接可通过浏览器在线体验 RisingWave:https://playground.risingwave.dev/

连接 RisingWave

在启动并运行 RisingWave 后,我们将通过 Postgres 交互式终端 psql 连接 RisingWave。请确保您的环境中已安装 psql。如需了解如何安装,请参见在不安装 PostgreSQL 的情况下安装 psql

要用 psql 连接 RisingWave,请打开一个新的终端窗口,并运行以下命令:

psql -h localhost -p 4566 -d dev -U root

关于 psql 选项的说明:

  • -h 选项用于指定要连接的 PostgreSQL 服务器的主机名或 IP 地址。
  • -p 选项用于指定服务器监听的端口号。
  • -d 选项用于指定要连接的数据库的名称。
  • -U 选项用于指定进行连接的数据库用户。
  • 默认情况下,PostgreSQL 服务器使用 root 用户来连接 dev 数据库。注意,此用户连接时不需要密码。

创建表

RisingWave 是一个数据库,所以您可以直接创建表并向其中插入数据。例如,我们可以创建一个表,用来存储有关网页访问的数据。

CREATE TABLE website_visits (
timestamp timestamp with time zone,
user_id varchar,
page_id varchar,
action varchar
);

插入数据

您可以通过两种方式将数据插入 RisingWave:直接插入数据,或者从流式数据源(streaming data source)导入数据。

对于直接插入数据,RisingWave 的操作与其他 SQL 数据库相同。例如,我们可以向刚刚创建的 website_visits 表直接插入 5 行数据。

INSERT INTO website_visits (timestamp, user_id, page_id, action) VALUES
('2023-06-13T10:00:00Z', 'user1', 'page1', 'view'),
('2023-06-13T10:01:00Z', 'user2', 'page2', 'view'),
('2023-06-13T10:02:00Z', 'user3', 'page3', 'view'),
('2023-06-13T10:03:00Z', 'user4', 'page1', 'view'),
('2023-06-13T10:04:00Z', 'user5', 'page2', 'view');

对于第二种方式,我们将在下一节讲解。

连接到数据源

通过上游 source(数据源)导入数据是 RisingWave 中最常见的数据获取方式,这些 source 包括 message queues(消息队列)、Change Data Capture(更改数据捕获,CDC)流等。对于这种方式,您需要先使用 CREATE SOURCE 命令连接一个数据源。

在这个演示里,我们用 Kafka 作为上游 source。假设您在 Kafka 中创建了一个名为 test 的 topic,并已有与 website_visits 表相同 schema 的如下数据:

{"timestamp": "2023-06-13T10:05:00Z", "user_id": "user1", "page_id": "page1", "action": "click"}
{"timestamp": "2023-06-13T10:06:00Z", "user_id": "user2", "page_id": "page2", "action": "scroll"}
{"timestamp": "2023-06-13T10:07:00Z", "user_id": "user3", "page_id": "page1", "action": "view"}
{"timestamp": "2023-06-13T10:08:00Z", "user_id": "user4", "page_id": "page2", "action": "view"}
{"timestamp": "2023-06-13T10:09:00Z", "user_id": "user5", "page_id": "page3", "action": "view"}

然后您可以通过以下命令从 RisingWave 连接该 topic:

CREATE SOURCE IF NOT EXISTS website_visits_stream (
timestamp timestamp with time zone,
user_id varchar,
page_id varchar,
action varchar
)
WITH (
connector='kafka',
topic='test',
properties.bootstrap.server='localhost:9092',
scan.startup.mode='earliest'
) FORMAT PLAIN ENCODE JSON;

请注意,在创建 source 之后,数据不会自动导入到 RisingWave 中。您需要创建一个物化视图来开始数据传输。

RisingWave 支持从包括主流消息队列和数据库在内的数据源导入数据。有关它们的详细信息,请参见我们支持的 source我们支持的格式

使用物化视图转换数据

在 RisingWave 中,数据通过物化视图进行 Join 和转换。您无需设置处理作业 (processing job) 或管道 (pipeline)。

物化视图可以基于表或 Source 创建,也可以基于表和 Source Join 之后的数据创建。

让我们创建一个物化视图,基于上一节创建的 source 中的数据,获取每个页面的总访问次数、去重访客数和最后访问时间。

CREATE MATERIALIZED VIEW visits_stream_mv AS 
SELECT page_id,
count(*) AS total_visits,
count(DISTINCT user_id) AS unique_visitors,
max(timestamp) AS last_visit_time
FROM website_visits_stream
GROUP BY page_id;

查询数据

可使用 SELECT 命令查询表或物化视图中的数据。

例如,让我们查看物化视图 visits_stream_mv 的最新结果:

SELECT * FROM visits_stream_mv;
 ----返回结果
 page_id | total_visits | unique_visitors |   last_visit_time
---------+--------------+-----------------+---------------------------
page2 | 2 | 2 | 2023-06-13 10:08:00+00:00
page1 | 2 | 2 | 2023-06-13 10:07:00+00:00
page3 | 1 | 1 | 2023-06-13 10:09:00+00:00
(3 rows)

随着新数据进入 RisingWave,visits_stream_mv 中的结果会自动更新,RisingWave 会在新数据进入时执行增量计算。

例如,如果您在 test topic 中再输入五行数据:

{"timestamp": "2023-06-13T10:10:00Z", "user_id": "user1", "page_id": "page3", "action": "scroll"}
{"timestamp": "2023-06-13T10:11:00Z", "user_id": "user2", "page_id": "page1", "action": "click"}
{"timestamp": "2023-06-13T10:12:00Z", "user_id": "user3", "page_id": "page2", "action": "scroll"}
{"timestamp": "2023-06-13T10:13:00Z", "user_id": "user4", "page_id": "page3", "action": "view"}
{"timestamp": "2023-06-13T10:14:00Z", "user_id": "user5", "page_id": "page1", "action": "click"}

再次查询数据时,您看到的结果将更新:

SELECT * FROM visits_stream_mv;
----返回结果
page_id | total_visits | unique_visitors | last_visit_time
---------+--------------+-----------------+---------------------------
page2 | 3 | 3 | 2023-06-13 10:12:00+00:00
page3 | 3 | 3 | 2023-06-13 10:13:00+00:00
page1 | 4 | 4 | 2023-06-13 10:14:00+00:00
(3 rows)

从 RisingWave 中导出数据

表和物化视图中的数据是存储在 RisingWave 里的。因此您可以将数据从 RisingWave 导出到 Kafka Topic 或其他数据库中。

要从 RisingWave 导出数据,您需要使用 CREATE SINK 命令创建一个 sink。sink 可以基于现有的表、source 或物化视图创建,也可以基于一个临时 SELECT 查询创建。

例如,我们可以将 visits_stream_mv 中的所有数据导出到一个 Kafka topic 中:

CREATE SINK sink1 FROM visits_stream_mv
WITH (
connector='kafka',
properties.bootstrap.server='localhost:9092',
topic='sink1'
) FORMAT PLAIN ENCODE JSON (
force_append_only='true',
);