Skip to main content

从 DataStax Astra Streaming 摄取数据

Astra Streaming 是 DataStax 基于 Apache Pulsar 开发的多云流式服务产品。Pulsa 是一种云原生、多租户、高性能的服务器间消息传递和队列解决方案,建立在发布-订阅(pub-sub)模式之上。Pulsar 将 RabbitMQ 等传统消息传递系统(如 RabbitMQ)的最佳功能与 Apache Kafka 等发布-订阅系统的最佳功能相结合,可在不停机的情况下动态缩放。

要从 RisingWave 摄取 Astra Streaming 的数据,需要设置一个 Astra Streaming 账户并创建一个 Astra Streaming Topic。然后,可以使用 RisingWave 中的 Pulsar 或 Kafka 连接器创建一个 Source 或表,以便从 Astra Streaming Topic 消费数据。

本指南将介绍如何从 RisingWave 摄取 Astra Streaming 的流数据。

设置 Astra Streaming

要了解如何设置 Astra Streaming 账户和创建 Topic,请参阅 Astra Streaming 快速入门。您可以使用 Pulsar 或 Kafka 与租户连接。在本演示中,我们假定租户连接的是 Pulsar。

创建 Topic 后,请记下要连接的租户和 Topic 的以下信息。

  1. 进入 Namespace and Topics,单击刚创建的 Topic 旁边的复制按钮,获取 Topic 的全名。

  2. 进入Connect 选项卡并向下滚动,获取租户的 Broker service URL

  3. 进入 Settings 选项卡并复制令牌,获取租户的令牌。

在 RisingWave 中消费 Astra Streaming 的数据

安装并启动 RisingWave

有关如何运行 RisingWave 的选项,请参阅 快速上手指南

在 RisingWave 中创建表

要了解从 Pulsar Topic 消费数据的特定句法,请参阅 从 Pulsar 摄取数据。要了解从 Kafka Topic 消费数据的特定句法,请参阅 从 Kafka 摄取数据

例如,下面的语句可创建一个表,用于消费连接到 Pulsar 的 Astra Streaming Topic 中的数据。

CREATE TABLE t (v1 int, v2 varchar)
WITH (
connector='pulsar',
topic='persistent://tenant0/default/topic0',
service.url='pulsar+ssl://pulsar-gcp-useast1.streaming.datastax.com:6651',
auth.token='replace me with your token'
) FORMAT PLAIN ENCODE JSON;

在 Astra Streaming 中生成消息

现在我们可以从 Astra Streaming 向 RisingWave 发送消息。

在 Astra Streaming 中导航到 RisingWave 所连接的租户,点击 Try Me 选项卡。确保 NamespaceProducer topicConsumer topic 与 RisingWave 正从中消费数据的 Astra Streaming Topic 相匹配。

Connection type 设为 ReadRead position 设为 Earliest。单击 Connect

尝试在 Test message 文本框中逐行发送以下消息。将 Message type 设置为 JSON。请注意,信息的 Schema 与我们在 RisingWave 中创建的表的 Schema 一致。

Send messages on Astra Streaming
{"v1":1,"v2":"name0"}
{"v1":2,"v2":"name0"}
{"v1":6,"v2":"name3"}
{"v1":0,"v2":"name5"}
{"v1":5,"v2":"name8"}

在 RisingWave 中查询消息

现在我们可以在 RisingWave 中查询表,查看 RisingWave 是否已经消费了消息。

SELECT * FROM t;
  v1 |  v2  
----+-------
1 | name0
2 | name0
3 | name3
4 | name5
5 | name8