时间窗口函数
在流处理过程中,时间窗口是我们划分事件和执行数据计算所依据的时间间隔。
RisingWave 支持两种类型的时间窗口:
- Tumbling 窗口
- Hopping 窗口
每种类型的时间窗口都有相应的时间窗口函数(以下简称为“时间窗口函数”),用于创建该类型的窗口。对于 Tumbling 窗口,函数为 tumble()
。对于 Hopping 窗口,函数为 hop()
。
在 RisingWave 中,时间窗口函数的结果是一张表,其中每一行都包含一个时间窗口的数据。时间窗口函数可通过两个新列 window_start
和 window_end
扩展原始表的 schema,分别表示时间窗口的开始和结束。
在 RisingWave 中,可在 FROM 子句中调用时间窗口函数。有关两个时间窗口函数的句法,请参阅以下内容。
tumble()
时间窗口函数
Tumbling 窗口是连续的时间间隔。
tumble()
窗口函数的句法如下:
SELECT [ ALL | DISTINCT ] [ * | expression [ AS output_name ] [, expression [ AS output_name ]...] ]
FROM TUMBLE ( table_or_source, start_time, window_size [, offset ] );
start_time 可以是时间戳格式,也可以是带时区的时间戳格式。
带时区的时间戳格式示例:2022-01-01 10:00:00+00:00。
window_size 的格式是
INTERVAL 'interval'
。示例:
INTERVAL '2 MINUTES'
。也支持标准 SQL 格式,即把时间单位放在引号外(例如:INTERVAL '2' MINUTE
)。offset 是可选参数,可用于调整 tumbling 窗口的起点。
默认情况下,tumbling 窗口在窗口结束时是包含的,在窗口开始时是不包含的。通过指定 offset,可以将 start_time 移动指定的持续时间。
假设我们有一个名为 taxi_trips
的表,该表由以下列组成:trip_id
、taxi_id
、completed_at
、distance
和 duration
。
trip_id | taxi_id | completed_at | distance | duration |
---|---|---|---|---|
1 | 1001 | 2022-07-01 22:00:00 | 4 | 6 |
2 | 1002 | 2022-07-01 22:01:00 | 6 | 9 |
3 | 1003 | 2022-07-01 22:02:00 | 3 | 5 |
4 | 1004 | 2022-07-01 22:03:00 | 7 | 15 |
5 | 1005 | 2022-07-01 22:05:00 | 2 | 4 |
6 | 1006 | 2022-07-01 22:05:30 | 8 | 17 |
下面是使用 tumble 窗口函数的示例。
SELECT trip_id, taxi_id, completed_at, window_start, window_end
FROM TUMBLE (taxi_trips, completed_at, INTERVAL '2 MINUTES');
结果如下:
trip_id | taxi_id | completed_at | window_start | window_end
--------+-----------+-----------------------+-----------------------+---------------------
1 | 1001 | 2022-07-01 22:00:00 | 2022-07-01 22:00:00 | 2022-07-01 22:02:00
2 | 1002 | 2022-07-01 22:01:00 | 2022-07-01 22:00:00 | 2022-07-01 22:02:00
3 | 1003 | 2022-07-01 22:02:10 | 2022-07-01 22:02:00 | 2022-07-01 22:04:00
4 | 1004 | 2022-07-01 22:03:00 | 2022-07-01 22:02:00 | 2022-07-01 22:04:00
5 | 1005 | 2022-07-01 22:05:00 | 2022-07-01 22:04:00 | 2022-07-01 22:06:00
6 | 1006 | 2022-07-01 22:06:00 | 2022-07-01 22:06:00 | 2022-07-01 22:08:00
hop()
时间窗口函数
Hopping 窗口是预定的时间间隔。Hopping 窗口由三个与时间相关的参数组成:开始时间、跳跃步长和窗口大小。
有关 hop()
窗口函数的句法,请参阅下文。
SELECT [ ALL | DISTINCT] [ * | expression [ AS output_name ] [, expression [ AS output_name ]...] ]
FROM HOP ( table_or_source, start_time, hop_size, window_size [, offset ]);
start_time 可以是时间戳格式,也可以是带时区的时间戳格式。
带时区的时间戳格式示例:2022-01-01 10:00:00+00:00。
hop_size 和 window_size 的格式都是
INTERVAL '<interval>'
。例如:
INTERVAL '2 MINUTES'
。也支持标准 SQL 格式,即把时间单位放在引号外(例如:INTERVAL '2' MINUTE
)。offset 是可选参数,可用于调整 hopping 窗口的起点。
默认情况下,hopping 窗口在窗口结束时是包含的,在窗口开始时是不包含的。通过指定 offset,可以将 start_time 移动指定的持续时间。
下面是一个示例。
SELECT trip_id, taxi_id, completed_at, window_start, window_end
FROM HOP (taxi_trips, completed_at, INTERVAL '1 MINUTE', INTERVAL '2 MINUTES')
ORDER BY window_start;
结果如下表所示。请注意,hop 窗口函数结果中的行数是原始表中行数的 N 倍,其中 N 是窗口大小除以跳跃步长。
trip_id | taxi_id | completed_at | window_start | window_end
---------+---------+------------------------+-----------------------+--------------------
1 | 1001 | 2022-07-01 22:00:00 | 2022-07-01 21:59:00 | 2022-07-01 22:01:00
2 | 1002 | 2022-07-01 22:01:00 | 2022-07-01 22:00:00 | 2022-07-01 22:02:00
1 | 1001 | 2022-07-01 22:00:00 | 2022-07-01 22:00:00 | 2022-07-01 22:02:00
3 | 1003 | 2022-07-01 22:02:10 | 2022-07-01 22:01:00 | 2022-07-01 22:03:00
2 | 1002 | 2022-07-01 22:01:00 | 2022-07-01 22:01:00 | 2022-07-01 22:03:00
4 | 1004 | 2022-07-01 22:03:00 | 2022-07-01 22:02:00 | 2022-07-01 22:04:00
3 | 1003 | 2022-07-01 22:02:10 | 2022-07-01 22:02:00 | 2022-07-01 22:04:00
4 | 1004 | 2022-07-01 22:03:00 | 2022-07-01 22:03:00 | 2022-07-01 22:05:00
5 | 1005 | 2022-07-01 22:05:00 | 2022-07-01 22:04:00 | 2022-07-01 22:06:00
6 | 1006 | 2022-07-01 22:06:00 | 2022-07-01 22:05:00 | 2022-07-01 22:07:00
5 | 1005 | 2022-07-01 22:05:00 | 2022-07-01 22:05:00 | 2022-07-01 22:07:00
6 | 1006 | 2022-07-01 22:06:00 | 2022-07-01 22:06:00 | 2022-07-01 22:08:00
(12 rows)
窗口聚合
让我们看看如何执行时间窗口聚合。
Tumble 窗口聚合
以下是 Tumble 窗口聚合的示例。在此示例中,我们要获取每个 Tumble 窗口(2 分钟)的行程数量和总距离。
SELECT window_start, window_end, count(trip_id) as no_of_trips, sum(distance) as total_distance
FROM TUMBLE (taxi_trips, completed_at, INTERVAL '2 MINUTES')
GROUP BY window_start, window_end
ORDER BY window_start ASC;
结果如下:
window_start | window_end | no_of_trips | total_distance
---------------------+---------------------+-------------+----------------
2022-07-01 22:00:00 | 2022-07-01 22:02:00 | 2 | 10
2022-07-01 22:02:00 | 2022-07-01 22:04:00 | 2 | 10
2022-07-01 22:04:00 | 2022-07-01 22:06:00 | 1 | 2
2022-07-01 22:06:00 | 2022-07-01 22:08:00 | 1 | 8
Hop 窗口聚合
以下是 Hopping 窗口聚合的示例。在此示例中,我们要获取每分钟内每个 Hop 窗口(2 分钟)的行程数量和总距离。
SELECT window_start, window_end, count(trip_id) as no_of_trips, sum(distance) as total_distance
FROM HOP (taxi_trips, completed_at, INTERVAL '1 MINUTES', INTERVAL '2 MINUTES')
GROUP BY window_start, window_end
ORDER BY window_start ASC;
结果如下:
window_start | window_end | no_of_trips | total_distance
---------------------+---------------------+-------------+----------------
2022-07-01 21:59:00 | 2022-07-01 22:01:00 | 1 | 4
2022-07-01 22:00:00 | 2022-07-01 22:02:00 | 2 | 10
2022-07-01 22:01:00 | 2022-07-01 22:03:00 | 2 | 9
2022-07-01 22:02:00 | 2022-07-01 22:04:00 | 2 | 10
2022-07-01 22:03:00 | 2022-07-01 22:05:00 | 1 | 7
2022-07-01 22:04:00 | 2022-07-01 22:06:00 | 1 | 2
2022-07-01 22:05:00 | 2022-07-01 22:07:00 | 2 | 10
2022-07-01 22:06:00 | 2022-07-01 22:08:00 | 1 | 8
窗口连接
可以将时间窗口与表或另一个具有相同类型和相同时间属性的时间窗口连接。
与表连接
让我们来看看如何将时间窗口与表连接起来。
假设有一个包含以下数据的简单表 taxi_simple
:
taxi_id |company
---------------+-------------------
1001 |'SAFE TAXI'
1002 |'SUPER TAXI'
1003 |'FAST TAXI'
1004 |'BEST TAXI'
1005 |'WEST TAXI'
1006 |'EAST TAXI'
可以将其与时间窗口连接起来:
SELECT trip.window_start, trip.window_end, trip.distance, taxi_simple.company
FROM TUMBLE (taxi_trips, completed_at, INTERVAL '2 MINUTES') as trip
JOIN taxi_simple
ON trip.taxi_id = taxi_simple.taxi_id
ORDER BY trip.window_start ASC;
结果如下:
window_start | window_end | distance | company
---------------------+---------------------+----------+------------
2022-07-01 22:00:00 | 2022-07-01 22:02:00 | 6 | SAFE TAXI
2022-07-01 22:00:00 | 2022-07-01 22:02:00 | 4 | SUPER TAXI
2022-07-01 22:02:00 | 2022-07-01 22:04:00 | 3 | FAST TAXI
2022-07-01 22:02:00 | 2022-07-01 22:04:00 | 7 | BEST TAXI
2022-07-01 22:04:00 | 2022-07-01 22:06:00 | 2 | WEST TAXI
2022-07-01 22:06:00 | 2022-07-01 22:08:00 | 8 | EAST TAXI
窗口连接
可以连接两个 tumble 时间窗口来获取行程和车费信息。相应的表是 taxi_trips
和 taxi_fare
。
taxi_fare
表有以下数据:
trip_id| completed_at | total_fare | payment_status
------+--------------+--------------+--------------
1 | 2022-07-01 22:00:00 | 8 | COMPLETED
2 | 2022-07-01 22:01:00 | 12 | PROCESSING
3 | 2022-07-01 22:02:10 | 5 | COMPLETED
4 | 2022-07-01 22:03:00 | 15 | COMPLETED
5 | 2022-07-01 22:06:00 | 5 | REJECTED
6 | 2022-07-01 22:06:00 | 20 | COMPLETED
可以连接两个时间窗口:
SELECT trip.window_start, trip.window_end, trip.distance, fare.total_fare, fare.payment_status
FROM TUMBLE (taxi_trips, completed_at, INTERVAL '2 MINUTES') as trip
JOIN TUMBLE (taxi_fare, completed_at, INTERVAL '2 MINUTES') as fare
ON trip.trip_id = fare.trip_id AND trip.window_start = fare.window_start
ORDER BY trip.window_start ASC;
结果如下。
window_start | window_end | distance | total_fare | payment_status
---------------------+---------------------+----------+------------+----------------
2022-07-01 22:00:00 | 2022-07-01 22:02:00 | 4 | 8 | COMPLETED
2022-07-01 22:00:00 | 2022-07-01 22:02:00 | 6 | 12 | PROCESSING
2022-07-01 22:02:00 | 2022-07-01 22:04:00 | 7 | 15 | COMPLETED
2022-07-01 22:02:00 | 2022-07-01 22:04:00 | 3 | 5 | COMPLETED
2022-07-01 22:04:00 | 2022-07-01 22:06:00 | 2 | 5 | REJECTED
2022-07-01 22:06:00 | 2022-07-01 22:08:00 | 8 | 20 | COMPLETED