Skip to main content

使用 RisingWave 为 Supabase 赋予流处理能力

Supabase 是一个开源的 Firebase 替代品。它使用 PostgreSQL 作为底层存储系统,因此可以无缝地与 RisingWave 集成。

您可以将数据从 Supabase 导入到 RisingWave 进行处理,并将处理好的数据从 RisingWave 导出到 Supabase。

我们有端到端 Demo来展示 Supabase 用户如何无缝集成 RisingWave 以增强产品的流处理能力。

本指南提供了 RisingWave 与 Supabase 之间的简化版的集成,模拟了社交媒体监控场景。

在本文中,我们将涵盖以下任务:

  • 从 Supabase 导入用户信息和帖子数据到 RisingWave。
  • 计算最近帖子的实时结果,如用户发送的帖子数量。
  • 将实时结果从 RisingWave 导出到 Supabase。

开始之前

创建 Supabase 表并启用 Table Replication

首先,在 Supabase 中创建两个表:users 表存储用户信息,posts 表存储用户发送的帖子。

Supabase 表

确保启用这两个表的数据复制(data replication)。有关 Supabase 表数据复制的更多信息,请见 Supabase 文档 - Replication

在 Supabase 中启用表复制

将数据导入 RisingWave

接下来,我们可以使用 PostgreSQL CDC 连接器 将数据从 Supabase 复制到 RisingWave。

要实时将数据导入 RisingWave,您需要在 RisingWave 中创建带有连接器设置的两个表:

表1
CREATE TABLE users (
id int8,
created_at TIMESTAMPTZ,
name string,
PRIMARY KEY(id)
)
WITH (
connector='postgres-cdc',
hostname = 'db.xxxxxx.supabase.co',
port = '5432',
username = 'postgres',
password = 'xxxxxx',
database.name = 'postgres',
schema.name = 'public',
table.name = 'users',
publication.name = 'rw_publication' -- Supabase 中的数据库 Replication 的名称
);
表2
CREATE TABLE posts (
id int8,
created_at TIMESTAMPTZ,
user_id int8,
content string,
PRIMARY KEY(id)
)
WITH (
...... -- 与上面相同
);

计算实时结果

有了导入到 RisingWave 的数据,现在可以使用物化视图进行实时数据计算了。RisingWave 中的物化视图支持增量计算,确保每次查询物化视图时都能获得最新结果。

获取最新的帖子

以下 SQL 语句在 RisingWave 中创建了一个物化视图,以获取最新的 100 条帖子。

CREATE MATERIALIZED VIEW recent_posts AS (
SELECT name, content, posts.created_at as created_at FROM posts
JOIN users ON posts.user_id = users.id
ORDER BY posts.created_at DESC LIMIT 100
);

实时获取热门话题标签

以下 SQL 语句在 RisingWave 中创建了一个物化视图,以获取每天的热门话题标签。

CREATE MATERIALIZED VIEW hot_hashtags AS WITH tags AS (
SELECT
regexp_matches(content, '#\w+', 'g') AS hashtag,
created_at
FROM posts
)
SELECT
hashtag,
COUNT(*) AS hashtag_occurrences,
window_start
FROM
TUMBLE(tags, created_at, INTERVAL '1 day')
GROUP BY
hashtag,
window_start;

获取用户发送的帖子数量

以下 SQL 语句在 RisingWave 中创建了一个物化视图,以获取用户发送的帖子数量。

CREATE MATERIALIZED VIEW user_posts_cnt AS (
SELECT
users.id AS user_id,
COUNT(posts.id) AS cnt
FROM posts JOIN users ON users.id = posts.user_id
GROUP BY users.id
);

将实时结果导出发送至 Supabase

尽管 RisingWave 可以直接提供实时结果,但您可能更喜欢将这些结果发送到 Supabase 进行进一步分析。

您可以使用 JDBC 连接器将来自 RisingWave 的数据发送到 Supabase。

让我们将用户发送的帖子数量的实时结果发送到 Supabase。

在 RisingWave 中创建 Sink 之前,我们需要在 Supabase 中创建目标表user_posts_cnt。模式如下所示:

在Supabase中创建sink的新表

表创建完成后,我们现在可以在 RisingWave 中运行以下语句,将结果发送到刚刚创建的 Supabase 表中。

CREATE SINK supabase_user_posts_cnt 
FROM user_posts_cnt WITH (
connector='jdbc',
jdbc.url='jdbc:postgresql://db.xxxxxx.supabase.co:5432/postgres?user=postgres&password=xxxxxx',
table.name = 'user_posts_cnt',
type = 'upsert',
primary_key= 'user_id'
)

成功创建 Sink 后,您应该能够在 Supabase 中看到结果。尝试向 usersposts 表中添加新行,您将实时看到 user_posts_cnt 中的结果已更新。