从 Confluent Cloud 摄取数据
本指南将介绍如何在 Confluent Cloud 上搭建 Kafka 集群,以便使用 RisingWave 连接并读取其中的数据。有关详细信息和入门方法,请参阅 Confluent。
在 Confluent Cloud 上搭建 Kafka
创建 Kafka 集群
创建新集群。点击相应集群类型下的 Begin configuration。
选择 AWS,以及相应的 Region 和 Availability。单击 Continue。
相应地命名群集。单击 Launch cluster。
生成样本数据
在 Cluster overview > Dashboard 下,选择 Produce sample data 下的 Get started。
点击 Add a new topic。
命名 topic 并输入分区数量。
点击 Create with defaults。
在 Add datagen source connector 页面,选择刚刚创建的 topic,然后点击 Continue。
点击 Generate API key & download 以生成凭据。点击 Continue。
选择首选的输出格式和模板。在本指南中,我们将使用 JSON and Orders。点击 Continue。
如有必要,调整连接器大小。否则保持不变。点击 Continue。
现在,source 连接器已创建。
将 RisingWave 连接到 Confluent Cloud
创建 API 密钥
- 从数据看板中选择刚刚创建的 Confluent Cloud Kafka 群集。
- 单击 API Keys 选项卡。
- 为刚刚创建的集群添加新的 API 密钥。
请注意,在 RisingWave 中创建 Kafka source 时需要 API 密钥。
运行 RisingWave
要启动 RisingWave,请参阅快速上手指南。
连接到数据流
在 RisingWave 中创建一个表,用于从 Confluent Cloud 中创建的 Kafka topic 中摄取数据。
下面的查询将创建一个表,连接到在 Confluent 中创建的数据生成器。请记得填写相应的身份验证参数。
有关句法和连接参数的更多详细信息,请参阅 从 Kafka 摄取数据 相关内容。
CREATE TABLE s (
ordertime timestamp,
orderid int,
itemid varchar,
orderunits double,
address STRUCT < city varchar,
state varchar,
zipcode int >
) WITH (
connector = 'kafka',
topic = 'topic_0',
properties.bootstrap.server = 'xyz-x00xx.us-east-1.aws.confluent.cloud:9092',
scan.startup.mode = 'earliest',
properties.security.protocol = 'SASL_SSL',
properties.sasl.mechanism = 'PLAIN',
properties.sasl.username = 'username',
properties.sasl.password = 'password'
) FORMAT PLAIN ENCODE JSON;
可以从表中查询数据。
SELECT * FROM s LIMIT 10 ;
ordertime | orderid | itemid | orderunits | address
-------------------------+---------+----------+---------------------+--------------------------
2017-07-11 13:10:57.470 | 69 | Item_923 | 0.34482867789025445 | (City_,State_12,79507)
2017-12-20 04:28:50.333 | 165 | Item_749 | 1.8283880900442675 | (City_,State_,29429)
2017-10-17 08:54:29.206 | 241 | Item_492 | 9.023949081036958 | (City_,State_6,59279)
2017-04-09 08:46:59.978 | 314 | Item_722 | 7.759774352468652 | (City_16,State_,39963)
2018-01-30 01:37:27.704 | 415 | Item_5 | 0.5213453497103845 | (City_8,State_7,12423)
2017-10-17 21:10:28.110 | 567 | Item_509 | 2.7527961549251914 | (City_16,State_8,82637)
2017-09-04 10:13:37.337 | 650 | Item_196 | 9.7117407038982 | (City_,State_,79763)
2018-02-18 13:08:11.272 | 696 | Item_541 | 1.389311132296164 | (City_2,State_87,55001)
2017-07-26 12:24:49.116 | 874 | Item_7 | 5.914994867745335 | (City_9,State_3,55552)
2017-10-17 20:06:57.658 | 912 | Item_3 | 8.24318992907263 | (City_73,State_96,62568)
(10 rows)
还可以点击侧边栏中的 Topics,选择刚刚创建的 topic,然后点击 Consumption,查看 Confluent 上的消费进度。