Skip to main content

时间窗口函数

在流处理过程中,时间窗口是我们划分事件和执行数据计算所依据的时间间隔。

RisingWave 支持两种类型的时间窗口:

  • Tumbling 窗口
  • Hopping 窗口

每种类型的时间窗口都有相应的时间窗口函数(以下简称为“时间窗口函数”),用于创建该类型的窗口。对于 Tumbling 窗口,函数为 tumble()。对于 Hopping 窗口,函数为 hop()

在 RisingWave 中,时间窗口函数的结果是一张表,其中每一行都包含一个时间窗口的数据。时间窗口函数可通过两个新列 window_startwindow_end 扩展原始表的 schema,分别表示时间窗口的开始和结束。

在 RisingWave 中,可在 FROM 子句中调用时间窗口函数。有关两个时间窗口函数的句法,请参阅以下内容。

tumble() 时间窗口函数

Tumbling 窗口是连续的时间间隔。

tumble() 窗口函数的句法如下:

SELECT [ ALL | DISTINCT ] [ * | expression [ AS output_name ] [, expression [ AS output_name ]...] ]
FROM TUMBLE ( table_or_source, start_time, window_size [, offset ] );
  • start_time 可以是时间戳格式,也可以是带时区的时间戳格式。

    带时区的时间戳格式示例:2022-01-01 10:00:00+00:00。

  • window_size 的格式是 INTERVAL 'interval'

    示例:INTERVAL '2 MINUTES'。也支持标准 SQL 格式,即把时间单位放在引号外(例如:INTERVAL '2' MINUTE)。

  • offset 是可选参数,可用于调整 tumbling 窗口的起点。

    默认情况下,tumbling 窗口在窗口结束时是包含的,在窗口开始时是不包含的。通过指定 offset,可以将 start_time 移动指定的持续时间。

假设我们有一个名为 taxi_trips 的表,该表由以下列组成:trip_idtaxi_idcompleted_atdistanceduration

trip_idtaxi_idcompleted_atdistanceduration
110012022-07-01 22:00:0046
210022022-07-01 22:01:0069
310032022-07-01 22:02:0035
410042022-07-01 22:03:00715
510052022-07-01 22:05:0024
610062022-07-01 22:05:30817

下面是使用 tumble 窗口函数的示例。

SELECT trip_id, taxi_id, completed_at, window_start, window_end
FROM TUMBLE (taxi_trips, completed_at, INTERVAL '2 MINUTES');

结果如下:

trip_id | taxi_id   | completed_at          | window_start          | window_end 
--------+-----------+-----------------------+-----------------------+---------------------
1 | 1001 | 2022-07-01 22:00:00 | 2022-07-01 22:00:00 | 2022-07-01 22:02:00
2 | 1002 | 2022-07-01 22:01:00 | 2022-07-01 22:00:00 | 2022-07-01 22:02:00
3 | 1003 | 2022-07-01 22:02:10 | 2022-07-01 22:02:00 | 2022-07-01 22:04:00
4 | 1004 | 2022-07-01 22:03:00 | 2022-07-01 22:02:00 | 2022-07-01 22:04:00
5 | 1005 | 2022-07-01 22:05:00 | 2022-07-01 22:04:00 | 2022-07-01 22:06:00
6 | 1006 | 2022-07-01 22:06:00 | 2022-07-01 22:06:00 | 2022-07-01 22:08:00

hop() 时间窗口函数

Hopping 窗口是预定的时间间隔。Hopping 窗口由三个与时间相关的参数组成:开始时间、跳跃步长和窗口大小。

有关 hop() 窗口函数的句法,请参阅下文。

SELECT [ ALL | DISTINCT] [ * | expression [ AS output_name ] [, expression [ AS output_name ]...] ]
FROM HOP ( table_or_source, start_time, hop_size, window_size [, offset ]);
  • start_time 可以是时间戳格式,也可以是带时区的时间戳格式。

    带时区的时间戳格式示例:2022-01-01 10:00:00+00:00。

  • hop_sizewindow_size 的格式都是 INTERVAL '<interval>'

    例如:INTERVAL '2 MINUTES'。也支持标准 SQL 格式,即把时间单位放在引号外(例如:INTERVAL '2' MINUTE)。

  • offset 是可选参数,可用于调整 hopping 窗口的起点。

    默认情况下,hopping 窗口在窗口结束时是包含的,在窗口开始时是不包含的。通过指定 offset,可以将 start_time 移动指定的持续时间。

下面是一个示例。

SELECT trip_id, taxi_id, completed_at, window_start, window_end
FROM HOP (taxi_trips, completed_at, INTERVAL '1 MINUTE', INTERVAL '2 MINUTES')
ORDER BY window_start;

结果如下表所示。请注意,hop 窗口函数结果中的行数是原始表中行数的 N 倍,其中 N 是窗口大小除以跳跃步长。

 trip_id | taxi_id  | completed_at          | window_start          | window_end 
---------+---------+------------------------+-----------------------+--------------------
1 | 1001 | 2022-07-01 22:00:00 | 2022-07-01 21:59:00 | 2022-07-01 22:01:00
2 | 1002 | 2022-07-01 22:01:00 | 2022-07-01 22:00:00 | 2022-07-01 22:02:00
1 | 1001 | 2022-07-01 22:00:00 | 2022-07-01 22:00:00 | 2022-07-01 22:02:00
3 | 1003 | 2022-07-01 22:02:10 | 2022-07-01 22:01:00 | 2022-07-01 22:03:00
2 | 1002 | 2022-07-01 22:01:00 | 2022-07-01 22:01:00 | 2022-07-01 22:03:00
4 | 1004 | 2022-07-01 22:03:00 | 2022-07-01 22:02:00 | 2022-07-01 22:04:00
3 | 1003 | 2022-07-01 22:02:10 | 2022-07-01 22:02:00 | 2022-07-01 22:04:00
4 | 1004 | 2022-07-01 22:03:00 | 2022-07-01 22:03:00 | 2022-07-01 22:05:00
5 | 1005 | 2022-07-01 22:05:00 | 2022-07-01 22:04:00 | 2022-07-01 22:06:00
6 | 1006 | 2022-07-01 22:06:00 | 2022-07-01 22:05:00 | 2022-07-01 22:07:00
5 | 1005 | 2022-07-01 22:05:00 | 2022-07-01 22:05:00 | 2022-07-01 22:07:00
6 | 1006 | 2022-07-01 22:06:00 | 2022-07-01 22:06:00 | 2022-07-01 22:08:00
(12 rows)

窗口聚合

让我们看看如何执行时间窗口聚合。

Tumble 窗口聚合

以下是 Tumble 窗口聚合的示例。在此示例中,我们要获取每个 Tumble 窗口(2 分钟)的行程数量和总距离。

SELECT window_start, window_end, count(trip_id) as no_of_trips, sum(distance) as total_distance 
FROM TUMBLE (taxi_trips, completed_at, INTERVAL '2 MINUTES')
GROUP BY window_start, window_end
ORDER BY window_start ASC;

结果如下:

 window_start        | window_end          | no_of_trips | total_distance 
---------------------+---------------------+-------------+----------------
2022-07-01 22:00:00 | 2022-07-01 22:02:00 | 2 | 10
2022-07-01 22:02:00 | 2022-07-01 22:04:00 | 2 | 10
2022-07-01 22:04:00 | 2022-07-01 22:06:00 | 1 | 2
2022-07-01 22:06:00 | 2022-07-01 22:08:00 | 1 | 8

Hop 窗口聚合

以下是 Hopping 窗口聚合的示例。在此示例中,我们要获取每分钟内每个 Hop 窗口(2 分钟)的行程数量和总距离。

SELECT window_start, window_end, count(trip_id) as no_of_trips, sum(distance) as total_distance 
FROM HOP (taxi_trips, completed_at, INTERVAL '1 MINUTES', INTERVAL '2 MINUTES')
GROUP BY window_start, window_end
ORDER BY window_start ASC;

结果如下:

 window_start        | window_end          | no_of_trips | total_distance 
---------------------+---------------------+-------------+----------------
2022-07-01 21:59:00 | 2022-07-01 22:01:00 | 1 | 4
2022-07-01 22:00:00 | 2022-07-01 22:02:00 | 2 | 10
2022-07-01 22:01:00 | 2022-07-01 22:03:00 | 2 | 9
2022-07-01 22:02:00 | 2022-07-01 22:04:00 | 2 | 10
2022-07-01 22:03:00 | 2022-07-01 22:05:00 | 1 | 7
2022-07-01 22:04:00 | 2022-07-01 22:06:00 | 1 | 2
2022-07-01 22:05:00 | 2022-07-01 22:07:00 | 2 | 10
2022-07-01 22:06:00 | 2022-07-01 22:08:00 | 1 | 8

窗口连接

可以将时间窗口与表或另一个具有相同类型和相同时间属性的时间窗口连接。

与表连接

让我们来看看如何将时间窗口与表连接起来。

假设有一个包含以下数据的简单表 taxi_simple

taxi_id        |company    
---------------+-------------------
1001 |'SAFE TAXI'
1002 |'SUPER TAXI'
1003 |'FAST TAXI'
1004 |'BEST TAXI'
1005 |'WEST TAXI'
1006 |'EAST TAXI'

可以将其与时间窗口连接起来:

SELECT trip.window_start, trip.window_end, trip.distance, taxi_simple.company
FROM TUMBLE (taxi_trips, completed_at, INTERVAL '2 MINUTES') as trip
JOIN taxi_simple
ON trip.taxi_id = taxi_simple.taxi_id
ORDER BY trip.window_start ASC;

结果如下:

 window_start        | window_end          | distance | company 
---------------------+---------------------+----------+------------
2022-07-01 22:00:00 | 2022-07-01 22:02:00 | 6 | SAFE TAXI
2022-07-01 22:00:00 | 2022-07-01 22:02:00 | 4 | SUPER TAXI
2022-07-01 22:02:00 | 2022-07-01 22:04:00 | 3 | FAST TAXI
2022-07-01 22:02:00 | 2022-07-01 22:04:00 | 7 | BEST TAXI
2022-07-01 22:04:00 | 2022-07-01 22:06:00 | 2 | WEST TAXI
2022-07-01 22:06:00 | 2022-07-01 22:08:00 | 8 | EAST TAXI

窗口连接

可以连接两个 tumble 时间窗口来获取行程和车费信息。相应的表是 taxi_tripstaxi_fare

taxi_fare 表有以下数据:

trip_id| completed_at | total_fare | payment_status
------+--------------+--------------+--------------
1 | 2022-07-01 22:00:00 | 8 | COMPLETED
2 | 2022-07-01 22:01:00 | 12 | PROCESSING
3 | 2022-07-01 22:02:10 | 5 | COMPLETED
4 | 2022-07-01 22:03:00 | 15 | COMPLETED
5 | 2022-07-01 22:06:00 | 5 | REJECTED
6 | 2022-07-01 22:06:00 | 20 | COMPLETED

可以连接两个时间窗口:

SELECT trip.window_start, trip.window_end, trip.distance, fare.total_fare, fare.payment_status
FROM TUMBLE (taxi_trips, completed_at, INTERVAL '2 MINUTES') as trip
JOIN TUMBLE (taxi_fare, completed_at, INTERVAL '2 MINUTES') as fare
ON trip.trip_id = fare.trip_id AND trip.window_start = fare.window_start
ORDER BY trip.window_start ASC;

结果如下。

 window_start        | window_end          | distance | total_fare | payment_status 
---------------------+---------------------+----------+------------+----------------
2022-07-01 22:00:00 | 2022-07-01 22:02:00 | 4 | 8 | COMPLETED
2022-07-01 22:00:00 | 2022-07-01 22:02:00 | 6 | 12 | PROCESSING
2022-07-01 22:02:00 | 2022-07-01 22:04:00 | 7 | 15 | COMPLETED
2022-07-01 22:02:00 | 2022-07-01 22:04:00 | 3 | 5 | COMPLETED
2022-07-01 22:04:00 | 2022-07-01 22:06:00 | 2 | 5 | REJECTED
2022-07-01 22:06:00 | 2022-07-01 22:08:00 | 8 | 20 | COMPLETED