Skip to main content

在 Python 应用程序中使用 RisingWave

由于 RisingWave 与 PostgreSQL 兼容,您可以使用第三方 PostgreSQL 驱动程序从 Python 应用程序与 RisingWave 交互。

在本指南中,我们使用 psycopg2 驱动程序连接到 RisingWave。

运行 RisingWave

要了解如何运行 RisingWave,请参阅运行 RisingWave

安装 psgcopg2 驱动程序

有关如何安装 psycopg 以及 psycopgpsycopg-binary 的区别,请参阅 psycopg 官方文档

连接到 RisingWave

通过 psycopg2 连接到 RisingWave:

import psycopg2

conn = psycopg2.connect(host="127.0.0.1", port=4566, user="root", dbname="dev")

创建 source

下面的代码可创建一个带有 datagen 连接器的 source walkdatagen 连接器可用于生成模拟数据。walk source 由 distanceduration 两列组成,分别表示步行的距离和持续时间。该 source 是智能手表所跟踪数据的简化版本。

import psycopg2

conn = psycopg2.connect(host="localhost", port=4566, user="root", dbname="dev") # 连接到 RisingWave。
conn.autocommit = True # 设置自动提交查询。

with conn.cursor() as cur:
cur.execute("""
CREATE TABLE walk(distance INT, duration INT)
WITH (
connector = 'datagen',
fields.distance.kind = 'sequence',
fields.distance.start = '1',
fields.distance.end = '60',
fields.duration.kind = 'sequence',
fields.duration.start = '1',
fields.duration.end = '30',
datagen.rows.per.second='15',
datagen.split.num = '1'
) FORMAT PLAIN ENCODE JSON""") # 执行查询。

conn.close() # 关闭连接。
note

本指南中的所有代码示例都包含连接到 RisingWave 章节的内容。如果在一个连接会话中执行多个操作,则无需重复此部分内容。

创建物化视图

本节的代码可创建一个物化视图 counter,获取最新的总距离和持续时间。

import psycopg2

conn = psycopg2.connect(host="localhost", port=4566, user="root", dbname="dev")
conn.autocommit = True

with conn.cursor() as cur:
cur.execute("""CREATE MATERIALIZED VIEW counter
AS SELECT
SUM(distance) as total_distance,
SUM(duration) as total_duration
FROM walk;""")

conn.close()

查询物化视图

本节的代码可查询物化视图 counter,获取实时数据。

import psycopg2

conn = psycopg2.connect(host="localhost", port=4566, user="root", dbname="dev")
conn.autocommit = True

with conn.cursor() as cur:
cur.execute("SELECT * FROM counter;")
print(cur.fetchall())
conn.close()