Skip to main content

修改流式作业

本文将介绍如何在 RisingWave 中修改流式作业,了解该机制可以有效管理数据处理工作流。

修改表格或 Source

要从表格或 Source 中添加或删除列,只需使用ALTER TABLEALTER SOURCE 命令。例如:

ALTER TABLE customers ADD COLUMN birth_date;

ALTER SOURCE test_source ADD COLUMN birth_date;

现有条目新增加的列将显示 NULL

修改物化视图

要修改物化视图,需要创建一个新物化视图并删除之前已有的物化视图。

假设我们要向物化视图mv1添加新列:

CREATE MATERIALIZED VIEW mv1 AS
SELECT
customer_id,
SUM(total_price) AS sales_amount,
FROM test_source
GROUP BY customer_id;

创建新物化视图mv1_new,并包含新列sales_count

CREATE MATERIALIZED VIEW mv1_new AS
SELECT
customer_id,
SUM(total_price) AS sales_amount,
COUNT(*) AS sales_count -- The new column
FROM test_source
GROUP BY customer_id;

创建新物化视图后,删除已有的物化视图 mv1 并且将 mv1_new 重命名为 mv1

DROP MATERIALIZED VIEW mv1;
ALTER MATERIALIZED VIEW mv1_new RENAME TO mv1;

修改 Sink

要修改 Sink, 需要创建新 Sink 并删除已有的 Sink。请查看上一章节的示例。

如果 Sink 是基于物化视图而创建,可以使用 CREATE SINK ... FROM ... 命令。在命令中加上 without_backfill = true 可以排除现有数据。

CREATE SINK ... FROM ... WITH (without_backfill = true).

为什么不能直接修改流式作业?

像 RisingWave 这样的流系统需要为流算子(如 Join 和 Aggregation)维护内部状态。通常,修改物化视图需要相应地对内部状态进行一致性更改,这并非总是可行。具体请看以下示例。

表格 adult_users 用于追踪年龄 ≥ 18 的用户数量:

CREATE MATERIALIZED VIEW adult_users AS
SELECT
COUNT(*) as user_count
FROM users
WHERE age >= 18;

后期发现法定成年年龄为 ≥16。用户可能先考虑将过滤条件从 age >= 18 修改为 age >= 16 作为直接解决方案。然而,在流处理中,这是不可行的,因为年龄在 16 到 18 之间的记录已被过滤掉。因此,恢复丢失的数据的唯一选择是从头重新计算整个流作业。

因此,我们建议用长期存储解决方案来处理源数据,例如RisingWave 表格,这样在需要修改流作业时可以方便重新计算物化视图。