Skip to main content

使用 dbt 进行数据转换

本章将帮助您使用 dbt 在 RisingWave 中管理实时数据转换。

安装插件

要在 RisingWave 中使用 dbt 管理数据,您需要安装 dbt-risingwave 插件。

请先确保您的环境已安装 Python3,然后运行以下命令安装插件:

python3 -m pip install dbt-risingwave

插件安装后,运行以下命令以确保 risingwave 显示在插件下。

dbt --version

如果您看到如下所示,说明插件已成功安装:

Plugins:
...
- risingwave: 1.6.0 - Up to date!
...

初始化一个 dbt 项目

在您初始化一个 dbt 项目之前,您需要确保 RisingWave 已安装并正在运行。要了解如何安装和运行 RisingWave,请参见 快速上手 下的主题。

您可以通过运行以下命令来初始化一个 dbt 项目

dbt init 

它会要求您输入项目名称,选择您要使用的数据库(risingwave),并指定其他数据库配置,如主机名、端口、用户名等。

默认数据库配置为:

  • 主机:localhost
  • 端口:4566
  • 用户:root
  • 数据库:dev

如果您使用了不同的配置,请确保这里的配置与您在 RisingWave 中的配置匹配。

定义 dbt 模型

用于管理 RisingWave 中数据转换的 dbt 模型类似于典型的 dbt SQL 模型,其主要区别在于 Materialization。我们自定义了 Materialization,以适应 RisingWave 的数据处理模型。

RisingWave 接受以下这些 Materialization

Materialization注释
table此 Materialization 创建一个表。要使用此 Materialization ,请在您的模型 SQL 文件中添加 {{ config(materialized='table') }}
view创建一个视图。要使用此 Materialization,请在您的模型 SQL 文件中添加 {{ config(materialized='view') }}
ephemeral此 Materialization 在 RisingWave 中使用公共表表达式。要使用此 Materialization ,请在您的模型 SQL 文件中添加 {{ config(materialized='ephemeral') }}
materializedview将被废弃。仅为向后兼容而可用。请改用 materialized_view
materialized_view创建一个物化视图(Materialized View)。此 Materialization 对应 dbt 中的 incremental。要使用此 Materialization ,请在您的模型 SQL 文件中添加 {{ config(materialized='materialized_view') }}
incremental请使用 materialized_view 替代。RisingWave 本身就是用物化视图以增量方式管理数据转换的,所以您可以直接使用 materialized_view Materialization 。
source定义一个源。要使用此 Materialization ,请在您的模型 SQL 文件中添加 {{ config(materialized='source') }}。您需要在此模型中完整提供创建源语句。详见示例模型文件
table_with_connector定义一个带有连接器设置的表。带有连接器设置的表类似于 Source,不同之处在于,带有连接器设置的表对象在 Source 中会保存原始流数据,而 Source 对象则不会。要使用此 Materialization ,请在您的模型 SQL 文件中添加 {{ config(materialized='table_with_connector') }}。您需要在此模型中完整提供创建带有连接器的表语句(详见示例模型文件)。因为 dbt 的表有其自身的语义,RisingWave 使用 table_with_connector 来与 dbt 的表区分。
sink定义一个 Sink。要使用此 Materialization ,请在您的 SQL 文件中添加 {{ config(materialized='sink') }}。您需要在此模型中完整提供创建 sink 语句。详见示例模型文件

要了解普通情况下如何定义 SQL 模型,请参见 dbt 文档的 SQL 模型部分

要了解 dbt 如何在实践中与 RisingWave 协作,请查看我们的演示项目,该项目使用 dbt 管理 RisingWave 中的 Nexmark 查询。

运行 dbt 模型

假设您已在之前的步骤里定义了 dbt 模型。

请先导航到您的项目目录。

cd <your project name>

然后,您可以运行 dbt debug 来检查您与 RisingWave 的连接。

dbt debug

如果连接有效,您将看到“OK connection ok”。在这种情况下,您可以运行您的模型。否则,请检查 ~/.dbt/profiles.yml,以确保您的连接配置有效。

最后,运行以下命令来运行您的模型。

dbt run

示例模型文件

在 dbt 中定义一个 Source
{{ config(materialized='source') }}
CREATE SOURCE {{ this }} (v1 int, v2 varchar) WITH (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON
在 dbt 中定义一个带有连接器设置的表
{{ config(materialized='table_with_connector') }}
CREATE TABLE {{ this }} (v1 int, v2 varchar) WITH (
connector = 'kafka',
topic = 'kafka_1_partition_topic',
properties.bootstrap.server = 'message_queue:29092',
scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON
在 dbt 中定义一个 Sink
{{ config(materialized='sink') }}
CREATE SINK {{ this }} AS
SELECT
avg(distance) as avg_distance,
avg(duration) as avg_duration
FROM taxi_trips
WITH (
connector='kafka',
properties.bootstrap.server='localhost:9092',
topic='test'
)
FORMAT PLAIN ENCODE JSON;
在 dbt 中定义一个物化视图
{{ config(materialized='materialized_view') }}
SELECT
auction,
to_char(date_time, 'YYYY-MM-DD') AS day,
count(*) AS total_bids,
count(*) filter (where price < 10000) AS rank1_bids,
count(*) filter (where price >= 10000 and price < 1000000) AS rank2_bids,
count(*) filter (where price >= 1000000) AS rank3_bids,
min(price) AS min_price,
max(price) AS max_price,
avg(price) AS avg_price,
sum(price) AS sum_price
FROM {{ ref('bid') }}
GROUP BY auction, to_char(date_time, 'YYYY-MM-DD')
在 dbt 中定义一个带索引的表
{{ config(
materialized = 'table',
indexes=[
{'columns': ['c1']},
{'columns': ['c2', 'c1']},
]
)}}
select 1 as c1, 1 c2 union select 2 as c1, 2 as c2

其他资源

如果您想了解普通情况下如何建立部署 dbt 模型,请参见 dbt 文档