使用 dbt 进行数据转换
本章将帮助您使用 dbt 在 RisingWave 中管理实时数据转换。
安装插件
要在 RisingWave 中使用 dbt 管理数据,您需要安装 dbt-risingwave 插件。
请先确保您的环境已安装 Python3,然后运行以下命令安装插件:
python3 -m pip install dbt-risingwave
插件安装后,运行以下命令以确保 risingwave
显示在插件下。
dbt --version
如果您看到如下所示,说明插件已成功安装:
Plugins:
...
- risingwave: 1.6.0 - Up to date!
...
初始化一个 dbt 项目
在您初始化一个 dbt 项目之前,您需要确保 RisingWave 已安装并正在运行。要了解如何安装和运行 RisingWave,请参见 快速上手 下的主题。
您可以通过运行以下命令来初始化一个 dbt 项目。
dbt init
它会要求您输入项目名称,选择您要使用的数据库(risingwave
),并指定其他数据库配置,如主机名、端口、用户名等。
默认数据库配置为:
- 主机:localhost
- 端口:4566
- 用户:root
- 数据库:dev
如果您使用了不同的配置,请确保这里的配置与您在 RisingWave 中的配置匹配。
定义 dbt 模型
用于管理 RisingWave 中数据转换的 dbt 模型类似于典型的 dbt SQL 模型,其主要区别在于 Materialization。我们自定义了 Materialization,以适应 RisingWave 的数据处理模型。
RisingWave 接受以下这些 Materialization。
Materialization | 注释 |
---|---|
table | 此 Materialization 创建一个表。要使用此 Materialization ,请在您的模型 SQL 文件中添加 {{ config(materialized='table') }} 。 |
view | 创建一个视图。要使用此 Materialization,请在您的模型 SQL 文件中添加 {{ config(materialized='view') }} 。 |
ephemeral | 此 Materialization 在 RisingWave 中使用公共表表达式。要使用此 Materialization ,请在您的模型 SQL 文件中添加 {{ config(materialized='ephemeral') }} 。 |
materializedview | 将被废弃。仅为向后兼容而可用。请改用 materialized_view 。 |
materialized_view | 创建一个物化视图(Materialized View)。此 Materialization 对应 dbt 中的 incremental 。要使用此 Materialization ,请在您的模型 SQL 文件中添加 {{ config(materialized='materialized_view') }} 。 |
incremental | 请使用 materialized_view 替代。RisingWave 本身就是用物化视图以增量方式管理数据转换的,所以您可以直接使用 materialized_view Materialization 。 |
source | 定义一个源。要使用此 Materialization ,请在您的模型 SQL 文件中添加 {{ config(materialized='source') }}。您需要在此模型中完整提供创建源语句。详见示例模型文件。 |
table_with_connector | 定义一个带有连接器设置的表。带有连接器设置的表类似于 Source,不同之处在于,带有连接器设置的表对象在 Source 中会保存原始流数据,而 Source 对象则不会。要使用此 Materialization ,请在您的模型 SQL 文件中添加 {{ config(materialized='table_with_connector') }}。您需要在此模型中完整提供创建带有连接器的表语句(详见示例模型文件)。因为 dbt 的表有其自身的语义,RisingWave 使用 table_with_connector 来与 dbt 的表区分。 |
sink | 定义一个 Sink。要使用此 Materialization ,请在您的 SQL 文件中添加 {{ config(materialized='sink') }}。您需要在此模型中完整提供创建 sink 语句。详见示例模型文件。 |
要了解普通情况下如何定义 SQL 模型,请参见 dbt 文档的 SQL 模型部分。
要了解 dbt 如何在实践中与 RisingWave 协作,请查看我们的演示项目,该项目使用 dbt 管理 RisingWave 中的 Nexmark 查询。
运行 dbt 模型
假设您已在之前的步骤里定义了 dbt 模型。
请先导航到您的项目目录。
cd <your project name>
然后,您可以运行 dbt debug
来检查您与 RisingWave 的连接。
dbt debug
如果连接有效,您将看到“OK connection ok”。在这种情况下,您可以运行您的模型。否则,请检查 ~/.dbt/profiles.yml
,以确保您的连接配置有效。
最后,运行以下命令来运行您的模型。
dbt run
示例模型文件
{{ config(materialized='source') }}
CREATE SOURCE {{ this }} (v1 int, v2 varchar) WITH (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON
{{ config(materialized='table_with_connector') }}
CREATE TABLE {{ this }} (v1 int, v2 varchar) WITH (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON
{{ config(materialized='sink') }}
CREATE SINK {{ this }} AS
SELECT
avg(distance) as avg_distance,
avg(duration) as avg_duration
FROM taxi_trips
WITH (
connector='kafka',
properties.bootstrap.server='localhost:9092',
topic='test'
)
FORMAT PLAIN ENCODE JSON;
{{ config(materialized='materialized_view') }}
SELECT
auction,
to_char(date_time, 'YYYY-MM-DD') AS day,
count(*) AS total_bids,
count(*) filter (where price < 10000) AS rank1_bids,
count(*) filter (where price >= 10000 and price < 1000000) AS rank2_bids,
count(*) filter (where price >= 1000000) AS rank3_bids,
min(price) AS min_price,
max(price) AS max_price,
avg(price) AS avg_price,
sum(price) AS sum_price
FROM {{ ref('bid') }}
GROUP BY auction, to_char(date_time, 'YYYY-MM-DD')
{{ config(
materialized = 'table',
indexes=[
{'columns': ['c1']},
{'columns': ['c2', 'c1']},
]
)}}
select 1 as c1, 1 c2 union select 2 as c1, 2 as c2
其他资源
如果您想了解普通情况下如何建立部署 dbt 模型,请参见 dbt 文档。