快速处理 Twitter 事件
概览
社交媒体平台每天处理来自数百万用户的海量消息。因为热门话题在不断变化,对企业来说,跟上这些帖子是具有挑战性的。但分析这些消息是必要的,因为它能让企业通过了解消费者和竞争对手的价值观来做出更具战略性的商业决策。针对这种情形,流式系统会很有帮助,因为它让公司能够及时了解时下流行话题。
为了跟踪话题,像 Twitter 这样的社交媒体平台使用了话题标签来展示帖子内容。话题标签的使用次数能反映出用户参与度。如果某话题标签经常被使用,它表明这个特定的话题很受欢迎。如果我们跟踪话题标签随时间的使用频率,可以确定观众参与度是在增加还是在减少。
在本教程中,您将学习如何使用 RisingWave 从文本数据中提取有价值的洞察。我们为本教程设置了一个演示集群,以便您可以轻松尝试。
开始之前
- 确保您的环境中安装了 Docker 和 Docker Compose。请注意,Docker Compose 包含在 Windows 和 macOS 的 Docker Desktop 中。如果您使用 Docker Desktop,请确保在启动演示集群之前已经运行。
- 确保 PostgreSQL 交互式终端
psql
已安装在您的环境中。有关详细说明,请参阅 下载 PostgreSQL。
第 1 步:启动演示集群
在演示集群中,我们打包了 RisingWave 和一个工作负载生成器。一旦集群启动,工作负载生成器将开始生成随机流量并将它们发送到 Kafka。
首先,将 risingwave 仓库克隆到环境中。
git clone https://github.com/risingwavelabs/risingwave.git
导航到 integration_tests/twitter
目录,并从 docker compose 文件启动演示集群。
cd risingwave/integration_tests/twitter
docker compose up -d
Compose V2 中的默认命令行句法以 docker compose
开头。详见 Docker 文档。
如果您使用的是 Compose V1,请改用 docker-compose
。
必要的 RisingWave 组件将被启动,包括 Frontend 节点、compute 节点、Meta 节点和 MinIO。工作负载生成器将开始生成随机数据并将它们发送到 Kafka topic。在这个演示集群中,物化视图的数据将存储在 MinIO 实例中。
连接到 RisingWave 以管理数据流并执行数据分析。
psql -h localhost -p 4566 -d dev -U root
第 2 步:将 RisingWave 连接到数据流
本教程将使用 RisingWave 来消费数据流并执行数据分析。推文将被用作示例数据,这样我们就可以查询给定日期上最受欢迎的话题标签,以跟踪流行话题。
以下是推文和 Twitter 用户的 Schema。在 tweet
Schema 中,text
包含了一条推文的内容,created_at
包含了一条推文发布的日期和时间。话题标签将从 text
中提取。
{
"data": {
"created_at": "2020-02-12T17:09:56.000Z",
"id": "1227640996038684673",
"text": "Doctors: Googling stuff online does not make you a doctor\n\nDevelopers: https://t.co/mrju5ypPkb",
"lang": "English"
},
"author": {
"created_at": "2013-12-14T04:35:55.000Z",
"id": "2244994945",
"name": "Singularity Data",
"username": "singularitty"
}
}
使用以下 SQL 语句连接到数据流。
CREATE SOURCE twitter (
data STRUCT < created_at TIMESTAMP WITH TIME ZONE,
id VARCHAR,
text VARCHAR,
lang VARCHAR >,
author STRUCT < created_at TIMESTAMP WITH TIME ZONE,
id VARCHAR,
name VARCHAR,
username VARCHAR,
followers INT >
) WITH (
connector = 'kafka',
topic = 'twitter',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
请注意,SQL 语句使用了 STRUCT 数据类型。有关 STRUCT 数据类型的详细信息,请参阅 数据类型。
第 3 步:定义物化视图并分析数据
本教程将创建一个物化视图,用于跟踪每个话题标签每天的使用频率。
首先,使用 regexp_matches
函数从推文中提取所有使用的话题标签。例如,给定以下推文:
为高昂的扩展成本感到苦恼?别担心!#RisingWave 关心性能和成本效益。我们使用分层架构,充分利用 #cloud 资源,为用户提供对成本和性能的细粒度控制。
regexp_matches
函数将找到推文中与 RegEx 模式 #\w+
匹配的所有文本。这将提取推文中的所有话题标签并将它们存储在一个数组中。
hashtag | created_at
----------------------+--------------------------
[#RisingWave, #cloud] | 2022-05-18 17:00:00+00:00
然后 unnest
函数将把数组中的每个项目分离成单独的行。
hashtag | created_at
----------------------------------------
#RisingWave | 2022-05-18 17:00:00+00:00
#cloud | 2022-05-18 17:00:00+00:00
最后,我们可以按 hashtag
和 window_start
分组,计算每个话题标签每天的使用次数。
CREATE MATERIALIZED VIEW hot_hashtags AS WITH tags AS (
SELECT
unnest(regexp_matches((data).text, '#\w+', 'g')) AS hashtag,
(data).created_at AS created_at
FROM
twitter
)
SELECT
hashtag,
COUNT(*) AS hashtag_occurrences,
window_start
FROM
TUMBLE(tags, created_at, INTERVAL '1 day')
GROUP BY
hashtag,
window_start;
第 4 步:查询结果
我们可以查询十个最常使用的话题标签。
SELECT * FROM hot_hashtags
ORDER BY hashtag_occurrences DESC
LIMIT 10;
结果可能如下所示:
hashtag | hashtag_occurrences | window_start
------------------------------------------------------------
#Multi | 262 | 2022-08-18 00:00:00+00:00
#zero | 198 | 2022-08-18 00:00:00+00:00
knowledge | 150 | 2022-08-18 00:00:00+00:00
#Open | 148 | 2022-08-18 00:00:00+00:00
#User | 142 | 2022-08-18 00:00:00+00:00
#Cross | 141 | 2022-08-18 00:00:00+00:00
#local | 139 | 2022-08-18 00:00:00+00:00
#client | 138 | 2022-08-18 00:00:00+00:00
#system | 135 | 2022-08-18 00:00:00+00:00
#Re | 132 | 2022-08-18 00:00:00+00:00
如果工作负载生成器运行多天,将显示不同日期最常使用的话题标签。
完成后,运行以下命令断开 RisingWave 的连接。
\q
可选:要删除容器和生成的数据,请使用以下命令。
docker compose down -v
总结
在本教程中,我们学到了:
- 如何使用 RisingWave 定义嵌套表。
- 如何使用正则表达式从字符串中提取字符组合。