快速上手
在本文中,我们将帮助您启动并连接 RisingWave,然后用它完成一些常见任务。
启动 RisingWave
以下提供的几种方式启动的都是 Playground 模式,这种模式主要用于快速测试。此模式下,数据会被存储于内存中,服务会在无活动 30 分钟后自动终止,所有用户数据会被随之删除。由于内存容量有限,资源密集型操作可能导致内存不足(OOM)错误。
对于较大规模的测试或单机部署,请考虑通过 Docker Compose 启动 RisingWave。若要在生产环境中使用 RisingWave,请使用 Operator 或 Helm 在 Kubernetes 上部署 RisingWave。
curl
安装单机模式
运行以下 curl
命令,根据提示安装。
curl https://risingwave.com/sh | sh
Docker
请先确保您已安装并运行 Docker Desktop,然后运行以下命令。
docker run -it --pull=always -p 4566:4566 -p 5691:5691 risingwavelabs/risingwave:latest playground
Homebrew
请先确保您已安装 Homebrew,然后运行以下命令:
brew tap risingwavelabs/risingwave
brew install risingwave
risingwave playground
二进制文件
wget https://github.com/risingwavelabs/risingwave/releases/download/v1.5.0/risingwave-v1.5.0-x86_64-unknown-linux-all-in-one.tar.gz
tar xvf risingwave-v1.5.0-x86_64-unknown-linux-all-in-one.tar.gz
# 不要移动解压后的文件或文件夹。这可能会在启动 RisingWave 时引起问题。
./risingwave playground
在线体验
点击此链接可通过浏览器在线体验 RisingWave:https://playground.risingwave.dev/。
连接 RisingWave
在启动并运行 RisingWave 后,我们将通过 Postgres 交互式终端 psql
连接 RisingWave。请确保您的环境中已安装 psql
。如需了解如何安装,请参见在不安装 PostgreSQL 的情况下安装 psql
。
要用 psql
连接 RisingWave,请打开一个新的终端窗口,并运行以下命令:
psql -h localhost -p 4566 -d dev -U root
关于 psql
选项的说明:
-h
选项用于指定要连接的 PostgreSQL 服务器的主机名或 IP 地址。-p
选项用于指定服务器监听的端口号。-d
选项用于指定要连接的数据库的名称。-U
选项用于指定进行连接的数据库用户。- 默认情况下,PostgreSQL 服务器使用
root
用户来连接dev
数据库。注意,此用户连接时不需要密码。
创建表
RisingWave 是一个数据库,所以您可以直接创建表并向其中插入数据。例如,我们可以创建一个表,用来存储有关网页访问的数据。
CREATE TABLE website_visits (
timestamp timestamp with time zone,
user_id varchar,
page_id varchar,
action varchar
);
插入数据
您可以通过两种方式将数据插入 RisingWave:直接插入数据,或者从流式数据源(streaming data source)导入数据。
对于直接插入数据,RisingWave 的操作与其他 SQL 数据库相同。例如,我们可以向刚刚创建的 website_visits
表直接插入 5 行数据。
INSERT INTO website_visits (timestamp, user_id, page_id, action) VALUES
('2023-06-13T10:00:00Z', 'user1', 'page1', 'view'),
('2023-06-13T10:01:00Z', 'user2', 'page2', 'view'),
('2023-06-13T10:02:00Z', 'user3', 'page3', 'view'),
('2023-06-13T10:03:00Z', 'user4', 'page1', 'view'),
('2023-06-13T10:04:00Z', 'user5', 'page2', 'view');
对于第二种方式,我们将在下一节讲解。
连接到数据源
通过上游 source(数据源)导入数据是 RisingWave 中最常见的数据获取方式,这些 source 包括 message queues(消息队列)、Change Data Capture(更改数据捕获,CDC)流等。对于这种方式,您需要先使用 CREATE SOURCE
命令连接一个数据源。
在这个演示里,我们用 Kafka 作为上游 source。假设您在 Kafka 中创建了一个名为 test
的 topic,并已有与 website_visits
表相同 schema 的如下数据:
{"timestamp": "2023-06-13T10:05:00Z", "user_id": "user1", "page_id": "page1", "action": "click"}
{"timestamp": "2023-06-13T10:06:00Z", "user_id": "user2", "page_id": "page2", "action": "scroll"}
{"timestamp": "2023-06-13T10:07:00Z", "user_id": "user3", "page_id": "page1", "action": "view"}
{"timestamp": "2023-06-13T10:08:00Z", "user_id": "user4", "page_id": "page2", "action": "view"}
{"timestamp": "2023-06-13T10:09:00Z", "user_id": "user5", "page_id": "page3", "action": "view"}
然后您可以通过以下命令从 RisingWave 连接该 topic:
CREATE SOURCE IF NOT EXISTS website_visits_stream (
timestamp timestamp with time zone,
user_id varchar,
page_id varchar,
action varchar
)
WITH (
connector='kafka',
topic='test',
properties.bootstrap.server='localhost:9092',
scan.startup.mode='earliest'
) FORMAT PLAIN ENCODE JSON;
请注意,在创建 source 之后,数据不会自动导入到 RisingWave 中。您需要创建一个物化视图来开始数据传输。
RisingWave 支持从包括主流消息队列和数据库在内的数据源导入数据。有关它们的详细信息,请参见我们支持的 source和我们支持的格式。
使用物化视图转换数据
在 RisingWave 中,数据通过物化视图进行 Join 和转换。您无需设置处理作业 (processing job) 或管道 (pipeline)。
物化视图可以基于表或 Source 创建,也可以基于表和 Source Join 之后的数据创建。
让我们创建一个物化视图,基于上一节创建的 source 中的数据,获取每个页面的总访问次数、去重访客数和最后访问时间。
CREATE MATERIALIZED VIEW visits_stream_mv AS
SELECT page_id,
count(*) AS total_visits,
count(DISTINCT user_id) AS unique_visitors,
max(timestamp) AS last_visit_time
FROM website_visits_stream
GROUP BY page_id;
查询数据
可使用 SELECT
命令查询表或物化视图中的数据。
例如,让我们查看物化视图 visits_stream_mv
的最新结果:
SELECT * FROM visits_stream_mv;
----返回结果
page_id | total_visits | unique_visitors | last_visit_time
---------+--------------+-----------------+---------------------------
page2 | 2 | 2 | 2023-06-13 10:08:00+00:00
page1 | 2 | 2 | 2023-06-13 10:07:00+00:00
page3 | 1 | 1 | 2023-06-13 10:09:00+00:00
(3 rows)
随着新数据进入 RisingWave,visits_stream_mv
中的结果会自动更新,RisingWave 会在新数据进入时执行增量计算。
例如,如果您在 test
topic 中再输入五行数据:
{"timestamp": "2023-06-13T10:10:00Z", "user_id": "user1", "page_id": "page3", "action": "scroll"}
{"timestamp": "2023-06-13T10:11:00Z", "user_id": "user2", "page_id": "page1", "action": "click"}
{"timestamp": "2023-06-13T10:12:00Z", "user_id": "user3", "page_id": "page2", "action": "scroll"}
{"timestamp": "2023-06-13T10:13:00Z", "user_id": "user4", "page_id": "page3", "action": "view"}
{"timestamp": "2023-06-13T10:14:00Z", "user_id": "user5", "page_id": "page1", "action": "click"}
再次查询数据时,您看到的结果将更新:
SELECT * FROM visits_stream_mv;
----返回结果
page_id | total_visits | unique_visitors | last_visit_time
---------+--------------+-----------------+---------------------------
page2 | 3 | 3 | 2023-06-13 10:12:00+00:00
page3 | 3 | 3 | 2023-06-13 10:13:00+00:00
page1 | 4 | 4 | 2023-06-13 10:14:00+00:00
(3 rows)
从 RisingWave 中导出数据
表和物化视图中的数据是存储在 RisingWave 里的。因此您可以将数据从 RisingWave 导出到 Kafka Topic 或其他数据库中。
要从 RisingWave 导出数据,您需要使用 CREATE SINK
命令创建一个 sink。sink 可以基于现有的表、source 或物化视图创建,也可以基于一个临时 SELECT
查询创建。
例如,我们可以将 visits_stream_mv
中的所有数据导出到一个 Kafka topic 中:
CREATE SINK sink1 FROM visits_stream_mv
WITH (
connector='kafka',
properties.bootstrap.server='localhost:9092',
topic='sink1'
) FORMAT PLAIN ENCODE JSON (
force_append_only='true',
);