使用 RisingWave 为 Supabase 赋予流处理能力
Supabase 是一个开源的 Firebase 替代品。它使用 PostgreSQL 作为底层存储系统,因此可以无缝地与 RisingWave 集成。
您可以将数据从 Supabase 导入到 RisingWave 进行处理,并将处理好的数据从 RisingWave 导出到 Supabase。
我们有端到端 Demo来展示 Supabase 用户如何无缝集成 RisingWave 以增强产品的流处理能力。
本指南提供了 RisingWave 与 Supabase 之间的简化版的集成,模拟了社交媒体监控场景。
在本文中,我们将涵盖以下任务:
- 从 Supabase 导入用户信息和帖子数据到 RisingWave。
- 计算最近帖子的实时结果,如用户发送的帖子数量。
- 将实时结果从 RisingWave 导出到 Supabase。
开始之前
- 安装并连接到 RisingWave。详情见 快速上手指南。
- 创建一个新的 Supabase Project。详情见 Supabase 文档。
创建 Supabase 表并启用 Table Replication
首先,在 Supabase 中创建两个表:users
表存储用户信息,posts
表存储用户发送的帖子。
确保启用这两个表的数据复制(data replication)。有关 Supabase 表数据复制的更多信息,请见 Supabase 文档 - Replication。
将数据导入 RisingWave
接下来,我们可以使用 PostgreSQL CDC 连接器 将数据从 Supabase 复制到 RisingWave。
要实时将数据导入 RisingWave,您需要在 RisingWave 中创建带有连接器设置的两个表:
CREATE TABLE users (
id int8,
created_at TIMESTAMPTZ,
name string,
PRIMARY KEY(id)
)
WITH (
connector='postgres-cdc',
hostname = 'db.xxxxxx.supabase.co',
port = '5432',
username = 'postgres',
password = 'xxxxxx',
database.name = 'postgres',
schema.name = 'public',
table.name = 'users',
publication.name = 'rw_publication' -- Supabase 中的数据库 Replication 的名称
);
CREATE TABLE posts (
id int8,
created_at TIMESTAMPTZ,
user_id int8,
content string,
PRIMARY KEY(id)
)
WITH (
...... -- 与上面相同
);
计算实时结果
有了导入到 RisingWave 的数据,现在可以使用物化视图进行实时数据计算了。RisingWave 中的物化视图支持增量计算,确保每次查询物化视图时都能获得最新结果。
获取最新的帖子
以下 SQL 语句在 RisingWave 中创建了一个物化视图,以获取最新的 100 条帖子。
CREATE MATERIALIZED VIEW recent_posts AS (
SELECT name, content, posts.created_at as created_at FROM posts
JOIN users ON posts.user_id = users.id
ORDER BY posts.created_at DESC LIMIT 100
);
实时获取热门话题标签
以下 SQL 语句在 RisingWave 中创建了一个物化视图,以获取每天的热门话题标签。
CREATE MATERIALIZED VIEW hot_hashtags AS WITH tags AS (
SELECT
regexp_matches(content, '#\w+', 'g') AS hashtag,
created_at
FROM posts
)
SELECT
hashtag,
COUNT(*) AS hashtag_occurrences,
window_start
FROM
TUMBLE(tags, created_at, INTERVAL '1 day')
GROUP BY
hashtag,
window_start;
获取用户发送的帖子数量
以下 SQL 语句在 RisingWave 中创建了一个物化视图,以获取用户发送的帖子数量。
CREATE MATERIALIZED VIEW user_posts_cnt AS (
SELECT
users.id AS user_id,
COUNT(posts.id) AS cnt
FROM posts JOIN users ON users.id = posts.user_id
GROUP BY users.id
);
将实时结果导出发送至 Supabase
尽管 RisingWave 可以直接提供实时结果,但您可能更喜欢将这些结果发送到 Supabase 进行进一步分析。
您可以使用 JDBC 连接器将来自 RisingWave 的数据发送到 Supabase。
让我们将用户发送的帖子数量的实时结果发送到 Supabase。
在 RisingWave 中创建 Sink 之前,我们需要在 Supabase 中创建目标表user_posts_cnt
。模式如下所示:
表创建完成后,我们现在可以在 RisingWave 中运行以下语句,将结果发送到刚刚创建的 Supabase 表中。
CREATE SINK supabase_user_posts_cnt
FROM user_posts_cnt WITH (
connector='jdbc',
jdbc.url='jdbc:postgresql://db.xxxxxx.supabase.co:5432/postgres?user=postgres&password=xxxxxx',
table.name = 'user_posts_cnt',
type = 'upsert',
primary_key= 'user_id'
)
成功创建 Sink 后,您应该能够在 Supabase 中看到结果。尝试向 users
和 posts
表中添加新行,您将实时看到 user_posts_cnt
中的结果已更新。