在 Python 应用程序中使用 RisingWave
由于 RisingWave 与 PostgreSQL 兼容,您可以使用第三方 PostgreSQL 驱动程序从 Python 应用程序与 RisingWave 交互。
在本指南中,我们使用 psycopg2
驱动程序连接到 RisingWave。
运行 RisingWave
要了解如何运行 RisingWave,请参阅运行 RisingWave。
安装 psgcopg2
驱动程序
有关如何安装 psycopg
以及 psycopg
和 psycopg-binary
的区别,请参阅 psycopg 官方文档。
连接到 RisingWave
通过 psycopg2
连接到 RisingWave:
import psycopg2
conn = psycopg2.connect(host="127.0.0.1", port=4566, user="root", dbname="dev")
创建 source
下面的代码可创建一个带有 datagen
连接器的 source walk
。datagen
连接器可用于生成模拟数据。walk
source 由 distance
和 duration
两列组成,分别表示步行的距离和持续时间。该 source 是智能手表所跟踪数据的简化版本。
import psycopg2
conn = psycopg2.connect(host="localhost", port=4566, user="root", dbname="dev") # 连接到 RisingWave。
conn.autocommit = True # 设置自动提交查询。
with conn.cursor() as cur:
cur.execute("""
CREATE TABLE walk(distance INT, duration INT)
WITH (
connector = 'datagen',
fields.distance.kind = 'sequence',
fields.distance.start = '1',
fields.distance.end = '60',
fields.duration.kind = 'sequence',
fields.duration.start = '1',
fields.duration.end = '30',
datagen.rows.per.second='15',
datagen.split.num = '1'
) FORMAT PLAIN ENCODE JSON""") # 执行查询。
conn.close() # 关闭连接。
note
本指南中的所有代码示例都包含连接到 RisingWave 章节的内容。如果在一个连接会话中执行多个操作,则无需重复此部分内容。
创建物化视图
本节的代码可创建一个物化视图 counter
,获取最新的总距离和持续时间。
import psycopg2
conn = psycopg2.connect(host="localhost", port=4566, user="root", dbname="dev")
conn.autocommit = True
with conn.cursor() as cur:
cur.execute("""CREATE MATERIALIZED VIEW counter
AS SELECT
SUM(distance) as total_distance,
SUM(duration) as total_duration
FROM walk;""")
conn.close()
查询物化视图
本节的代码可查询物化视图 counter
,获取实时数据。
import psycopg2
conn = psycopg2.connect(host="localhost", port=4566, user="root", dbname="dev")
conn.autocommit = True
with conn.cursor() as cur:
cur.execute("SELECT * FROM counter;")
print(cur.fetchall())
conn.close()