从 DataStax Astra Streaming 摄取数据
Astra Streaming 是 DataStax 基于 Apache Pulsar 开发的多云流式服务产品。Pulsa 是一种云原生、多租户、高性能的服务器间消息传递和队列解决方案,建立在发布-订阅(pub-sub)模式之上。Pulsar 将 RabbitMQ 等传统消息传递系统(如 RabbitMQ)的最佳功能与 Apache Kafka 等发布-订阅系统的最佳功能相结合,可在不停机的情况下动态缩放。
要从 RisingWave 摄取 Astra Streaming 的数据,需要设置一个 Astra Streaming 账户并创建一个 Astra Streaming Topic。然后,可以使用 RisingWave 中的 Pulsar 或 Kafka 连接器创建一个 Source 或表,以便从 Astra Streaming Topic 消费数据。
本指南将介绍如何从 RisingWave 摄取 Astra Streaming 的流数据。
设置 Astra Streaming
要了解如何设置 Astra Streaming 账户和创建 Topic,请参阅 Astra Streaming 快速入门。您可以使用 Pulsar 或 Kafka 与租户连接。在本演示中,我们假定租户连接的是 Pulsar。
创建 Topic 后,请记下要连接的租户和 Topic 的以下信息。
进入
Namespace and Topics
,单击刚创建的 Topic 旁边的复制按钮,获取 Topic 的全名。进入
Connect
选项卡并向下滚动,获取租户的Broker service URL
。进入
Settings
选项卡并复制令牌,获取租户的令牌。
在 RisingWave 中消费 Astra Streaming 的数据
安装并启动 RisingWave
有关如何运行 RisingWave 的选项,请参阅 快速上手指南。
在 RisingWave 中创建表
要了解从 Pulsar Topic 消费数据的特定句法,请参阅 从 Pulsar 摄取数据。要了解从 Kafka Topic 消费数据的特定句法,请参阅 从 Kafka 摄取数据。
例如,下面的语句可创建一个表,用于消费连接到 Pulsar 的 Astra Streaming Topic 中的数据。
CREATE TABLE t (v1 int, v2 varchar)
WITH (
connector='pulsar',
topic='persistent://tenant0/default/topic0',
service.url='pulsar+ssl://pulsar-gcp-useast1.streaming.datastax.com:6651',
auth.token='replace me with your token'
) FORMAT PLAIN ENCODE JSON;
在 Astra Streaming 中生成消息
现在我们可以从 Astra Streaming 向 RisingWave 发送消息。
在 Astra Streaming 中导航到 RisingWave 所连接的租户,点击 Try Me
选项卡。确保 Namespace
、Producer topic
和 Consumer topic
与 RisingWave 正从中消费数据的 Astra Streaming Topic 相匹配。
将 Connection type
设为 Read
,Read position
设为 Earliest
。单击 Connect
。
尝试在 Test message
文本框中逐行发送以下消息。将 Message type
设置为 JSON
。请注意,信息的 Schema 与我们在 RisingWave 中创建的表的 Schema 一致。
{"v1":1,"v2":"name0"}
{"v1":2,"v2":"name0"}
{"v1":6,"v2":"name3"}
{"v1":0,"v2":"name5"}
{"v1":5,"v2":"name8"}
在 RisingWave 中查询消息
现在我们可以在 RisingWave 中查询表,查看 RisingWave 是否已经消费了消息。
SELECT * FROM t;
v1 | v2
----+-------
1 | name0
2 | name0
3 | name3
4 | name5
5 | name8