Skip to main content

从 Confluent Cloud 摄取数据

本指南将介绍如何在 Confluent Cloud 上搭建 Kafka 集群,以便使用 RisingWave 连接并读取其中的数据。有关详细信息和入门方法,请参阅 Confluent

在 Confluent Cloud 上搭建 Kafka

创建 Kafka 集群

  1. 创建新集群。点击相应集群类型下的 Begin configuration

    Create Kafka cluster on Confluent
  2. 选择 AWS,以及相应的 RegionAvailability。单击 Continue

  3. 相应地命名群集。单击 Launch cluster

生成样本数据

  1. Cluster overview > Dashboard 下,选择 Produce sample data 下的 Get started

    Start producing sample data
  2. 点击 Add a new topic

  3. 命名 topic 并输入分区数量。

  4. 点击 Create with defaults

  5. Add datagen source connector 页面,选择刚刚创建的 topic,然后点击 Continue

  6. 点击 Generate API key & download 以生成凭据。点击 Continue

  7. 选择首选的输出格式和模板。在本指南中,我们将使用 JSON and Orders。点击 Continue

  8. 如有必要,调整连接器大小。否则保持不变。点击 Continue

现在,source 连接器已创建。

将 RisingWave 连接到 Confluent Cloud

创建 API 密钥

  1. 从数据看板中选择刚刚创建的 Confluent Cloud Kafka 群集。
  2. 单击 API Keys 选项卡。
  3. 为刚刚创建的集群添加新的 API 密钥。

请注意,在 RisingWave 中创建 Kafka source 时需要 API 密钥。

运行 RisingWave

要启动 RisingWave,请参阅快速上手指南

连接到数据流

在 RisingWave 中创建一个表,用于从 Confluent Cloud 中创建的 Kafka topic 中摄取数据。

下面的查询将创建一个表,连接到在 Confluent 中创建的数据生成器。请记得填写相应的身份验证参数。

有关句法和连接参数的更多详细信息,请参阅 从 Kafka 摄取数据 相关内容。

CREATE TABLE s (
ordertime timestamp,
orderid int,
itemid varchar,
orderunits double,
address STRUCT < city varchar,
state varchar,
zipcode int >
) WITH (
connector = 'kafka',
topic = 'topic_0',
properties.bootstrap.server = 'xyz-x00xx.us-east-1.aws.confluent.cloud:9092',
scan.startup.mode = 'earliest',
properties.security.protocol = 'SASL_SSL',
properties.sasl.mechanism = 'PLAIN',
properties.sasl.username = 'username',
properties.sasl.password = 'password'
) FORMAT PLAIN ENCODE JSON;

可以从表中查询数据。

SELECT * FROM s LIMIT 10 ; 

ordertime | orderid | itemid | orderunits | address
-------------------------+---------+----------+---------------------+--------------------------
2017-07-11 13:10:57.470 | 69 | Item_923 | 0.34482867789025445 | (City_,State_12,79507)
2017-12-20 04:28:50.333 | 165 | Item_749 | 1.8283880900442675 | (City_,State_,29429)
2017-10-17 08:54:29.206 | 241 | Item_492 | 9.023949081036958 | (City_,State_6,59279)
2017-04-09 08:46:59.978 | 314 | Item_722 | 7.759774352468652 | (City_16,State_,39963)
2018-01-30 01:37:27.704 | 415 | Item_5 | 0.5213453497103845 | (City_8,State_7,12423)
2017-10-17 21:10:28.110 | 567 | Item_509 | 2.7527961549251914 | (City_16,State_8,82637)
2017-09-04 10:13:37.337 | 650 | Item_196 | 9.7117407038982 | (City_,State_,79763)
2018-02-18 13:08:11.272 | 696 | Item_541 | 1.389311132296164 | (City_2,State_87,55001)
2017-07-26 12:24:49.116 | 874 | Item_7 | 5.914994867745335 | (City_9,State_3,55552)
2017-10-17 20:06:57.658 | 912 | Item_3 | 8.24318992907263 | (City_73,State_96,62568)
(10 rows)

还可以点击侧边栏中的 Topics,选择刚刚创建的 topic,然后点击 Consumption,查看 Confluent 上的消费进度。

Topic consumption on Confluent