Skip to main content

水位线(Watermarks)

在流处理中,当使用基于事件时间的处理逻辑和操作时,水位线是不可或缺的。水位线就像标记或信号,跟踪事件时间的进展,允许您在相应的时间窗口内处理事件。水位线是到目前为止观察到的最大事件时间的估计,或者是一个阈值,表示到目前为止收到的事件的时间戳晚于或等于当前水位线。时间戳早于当前水位线的到达事件被认为是迟到的,并且不会在其时间窗口内被处理。

让我们通过一个例子来看看水位线是如何生成和在窗口计算中使用的。假设以下是一些事件及其对应的事件时间戳。

事件时间戳
事件 F11:59:30 AM
事件 G12:00:00 PM
事件 H12:00:10 PM
事件 I11:59:50 PM

我们假设水位线被设置为:到目前为止观察到的最大事件时间减去 10 秒。于是将生成以下水位线。

事件时间戳水位线
事件 F11:59:30 AM11:59:20 AM
事件 G12:00:00 PM11:59:50 AM
事件 H12:00:11 PM12:00:01 PM
事件 I11:59:50 PM12:00:01 PM

现在,假设我们有一个下午 12 点截止的窗口,那么,这个窗口将一直等待,直到有一个时间戳至少为 12:00:00 PM 的水位线出现后,才会产生结果。因此,事件 F 和 G 被认为是准时的,将被包括在计算中。事件 H 和 I 将不会被包括在这个截至下午 12 点的窗口计算中,事件 I 被认为是迟到的,因为其事件时间戳比当前水位线时间戳早。

句法

水位线可以直接在 Source 上生成。

WATERMARK 子句在 RisingWave 中的句法如下:

WATERMARK FOR column_name AS expr

column_name 是在生成 Source 时创建的列,通常是事件时间列。

expr 指定水位线生成策略。水位线的返回类型必须是 timestamp 类型。如果返回值大于当前水位线,则水位线会更新。

例如,水位线生成策略可以指定为:

  • 观察到的最大时间戳

    WATERMARK FOR time_col AS time_col
  • 观察到的最大时间戳加上延迟

    WATERMARK FOR time_col AS time_col - INTERVAL 'string' time_unit

    我们支持的 time_unit 值包括:second, minute, hour, day, month, 和 year。更多详情,参见 数据类型概览 下的 interval 数据类型。

示例

我们可以生成水位线,作为 order_time 中观察到的最新时间戳减去 5 秒。

CREATE SOURCE s1 (
product VARCHAR,
user VARCHAR,
price DOUBLE PRECISION,
order_time TIMESTAMP,
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
connector = 'kafka',
topic = 'test_topic',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;