Skip to main content

CREATE TABLE

使用 CREATE TABLE 命令创建新表。表由固定列和可插入的行组成。可以使用 INSERT 命令添加行。创建表时,可以指定连接器设置和数据格式。

info

如果选择不在 RisingWave 中保存 source 的数据,请改用 CREATE SOURCE

句法

CREATE TABLE [ IF NOT EXISTS ] table_name (
col_name data_type [ PRIMARY KEY ] [ DEFAULT default_expr ] [ AS generation_expression ],
...
[ PRIMARY KEY (col_name, ... ) ]
[ watermark_clause ]
)
[ APPEND ONLY ]
[ WITH (
connector='connector_name',
connector_parameter='value', ...)]
[FORMAT data_format ENCODE data_encode [ (
message='message',
schema.location='location', ...) ]
];

注意事项

对于带有主键约束的表,如果插入带有键的新数据,新记录将覆盖现有记录。

不能将使用非确定性函数定义的生成列指定为主键的一部分。例如,如果 A1 被定义为 current_timestamp(),那么它就不能成为主键的一部分。

名称和未加引号的标识符不区分大小写。这些字段必须添加双引号才能区分大小写。

创建带连接器设置的表和支持的连接器的句法与创建 source 的句法相同。有关支持的连接器和数据格式的完整列表,请参阅 CREATE SOURCE

要知道数据何时加载到 RisingWave,可以在创建表或 source 时定义一个基于处理时间生成的列(<column_name> timestamptz AS proctime())。

参数

参数或子句描述
table_name表的名称。如果给定 schema 名称(例如 CREATE TABLE <schema>.<table> ...),则在指定的 schema 中创建表。否则,在当前 schema 中创建。
col_name列的名称。
data_type列的数据类型。使用 struct 数据类型,可以创建嵌套表。嵌套表中的元素需要用尖括号 (“<>”) 括起来。
DEFAULTDEFAULT 子句允许将默认值分配给列。插入新行时,若没有为该列设定显式值,则使用该默认值。default_expr 是不引用当前表中其他列或不涉及子查询的任何常量或不包含变量的表达式。default_expr 的数据类型必须与列的数据类型相匹配。
generation_expression生成列的表达式。有关生成列的详细信息,请参阅 生成列
watermark_clause为时间戳列定义水位线的子句。句法为 WATERMARK FOR column_name as expr。要使水位线子句有效,表必须是 Append-Only 表。也就是说,必须指定 APPEND ONLY 选项。此限制仅适用于表。有关水位线的详细信息,请参阅 水位线
APPEND ONLY指定该选项后,创建的表即为 Append-Only 表。Append-Only 表不能有主键。对于 Append-Only 表,UPDATEDELETE 语句无效。请注意,Append-Only 表是测试版功能。
WITH 子句如果要存储所有源数据,请在此处设置连接器。有关支持的 source 的完整列表,以及详细说明每个 source 句法的连接器页面链接,请参阅 数据摄取
FORMATENCODE 选项指定源数据的数据格式和编码格式。要了解支持的数据格式,请参阅 数据格式

水位线

RisingWave 支持在创建 Append-Only 流式表时生成水位线。水位线类似于跟踪事件时间进度的标记或信号,允许您在相应的时间窗口内处理事件。有关创建水位线的更多信息,请参阅 水位线

示例

下面的语句可创建一个有三列的表。

CREATE TABLE taxi_trips(
id VARCHAR,
distance DOUBLE PRECISION,
city VARCHAR
);

下面的语句创建了一个包含嵌套表的表。

CREATE TABLE IF NOT EXISTS taxi_trips(
id VARCHAR,
distance DOUBLE PRECISION,
duration DOUBLE PRECISION,
fare STRUCT<
initial_charge DOUBLE PRECISION,
subsequent_charge DOUBLE PRECISION,
surcharge DOUBLE PRECISION,
tolls DOUBLE PRECISION>);

下面的语句使用 Kafka broker 作为数据源创建了一个表。

CREATE TABLE IF NOT EXISTS table_abc (
column1 varchar,
column2 integer,
)
WITH (
connector='kafka',
topic='demo_topic',
properties.bootstrap.server='172.10.1.1:9090,172.10.1.2:9090',
scan.startup.mode='latest',
scan.startup.timestamp_millis='140000000',
) FORMAT PLAIN ENCODE JSON;