Skip to main content

从 Instaclustr Kafka 摄取数据

您可以从部署在 Instaclustr 中的 Kafka 中摄取数据。Instaclustr 是一个完全托管且集成了流行开源工具(如 Kafka、PostgreSQL、Cassandra 和 Redis)的平台。 它便于进行 Kafka Connect 集成,包含专用的 ZooKeeper 和 Kraft,通过 100% 的开源解决方案提供无缝的 Kafka 之旅。

在 Instaclustr Cloud 上构建 Kafka

本指南将介绍在 Instaclustr 中构建 Kafka 集群并将其连接到 RisingWave 以进行数据摄取的步骤。 有关从 Instaclustr Cloud 进行数据摄取的更多信息,请参阅 Instaclustr 文档

注册 Upstash Cloud 账户

首先注册一个免费的 Instaclustr 账户,该帐户将授予您访问 Kafka 服务的权限。要创建账户,请访问 Instaclustr Cloud

Instaclustr Cloud account

创建 Kafka 集群

登录后,请按照 Apache Kafka 入门 中的说明在 Instaclustr Cloud 中创建 Kafka 集群。

Instaclustr Kafka instruction

成功创建 Kafka 集群和 Kafka topic 后,将连接计算机的 IP 地址添加到集群的防火墙规则中,以生产和消费数据。

Instaclustr cluster

连接 Kafka 集群并与之交互

您可以使用各种客户端和语言(如 Java、C#、Python 和 Ruby)连接 Kafka 集群,使用连接信息(Connection Info)页面生成和消费 Kafka 消息。该页面包含节点地址列表、连接到集群的身份验证凭据以及 Kafka 支持的常用客户端的一些连接示例。因此,您可以从 AviationStck API 中提取实时飞行数据,并将其导入部署在 Instaclustr Cloud 中的 Kafka 集群的 Kafka topic 中。

Kafka connection info

完成这些步骤后,您就可以使用部署在 Instaclustr cloud 和 RisingWave 中的 Kafka 构建流处理应用程序和管道了!

从 Instaclustr Kafka 摄取数据

创建 RisingWave 集群

您可以按照 RisingWave 文档 快速入门 中的步骤创建和连接 RisingWave 集群。

创建 source

成功部署 RisingWave 集群并与其连接后,请在 RisingWave 中创建一个 source,以便从 Instaclustr Kafka 中摄取数据。

CREATE SOURCE aviation_source (
flight_date VARCHAR,
flight_status VARCHAR,

departure_airport VARCHAR,
departure_timezone VARCHAR,
departure_iata VARCHAR,
departure_icao VARCHAR,
departure_terminal VARCHAR,
departure_gate VARCHAR,
departure_delay VARCHAR,
departure_scheduled TIMESTAMP WITH TIME ZONE,
departure_estimated TIMESTAMP WITH TIME ZONE,
departure_actual TIMESTAMP WITH TIME ZONE,
departure_estimated_runway TIMESTAMP WITH TIME ZONE,
departure_actual_runway TIMESTAMP WITH TIME ZONE,

arrival_airport VARCHAR,
arrival_timezone VARCHAR,
arrival_iata VARCHAR,
arrival_icao VARCHAR,
arrival_terminal VARCHAR,
arrival_gate VARCHAR,
arrival_baggage VARCHAR,
arrival_delay VARCHAR,
arrival_scheduled TIMESTAMP WITH TIME ZONE,
arrival_estimated TIMESTAMP WITH TIME ZONE,
arrival_actual TIMESTAMP WITH TIME ZONE,
arrival_estimated_runway TIMESTAMP WITH TIME ZONE,
arrival_actual_runway TIMESTAMP WITH TIME ZONE,

airline_name VARCHAR,
airline_iata VARCHAR,
airline_icao VARCHAR,

flight_number VARCHAR,
flight_iata VARCHAR,
flight_icao VARCHAR,

codeshared_airline_name VARCHAR,
codeshared_airline_iata VARCHAR,
codeshared_airline_icao VARCHAR,
codeshared_flight_number VARCHAR,
codeshared_flight_iata VARCHAR

) WITH (
connector = 'kafka',
topic='Insta-topic',
properties.bootstrap.server = 'x.x.x.x:9092',
scan.startup.mode = 'earliest',
properties.sasl.mechanism = 'SCRAM-SHA-256',
properties.security.protocol = 'SASL_PLAINTEXT',
properties.sasl.username = 'ickafka',
properties.sasl.password = 'xxxxxx'
) FORMAT PLAIN ENCODE JSON;

创建物化视图

让我们基于 source aviation_source 创建一个名为 aviation_mv 的物化视图,以对源数据中的某些列执行各种数据转换,有效修改其数据类型。


CREATE MATERIALIZED VIEW aviation_mv AS
SELECT
flight_date,
departure_airport,
CAST(departure_scheduled AS TIMESTAMP) AS departure_scheduled,
CAST(departure_estimated AS TIMESTAMP)AS departure_estimated,
arrival_airport,
CAST(arrival_scheduled AS TIMESTAMP) AS arrival_scheduled,
airline_name,
flight_number,
FROM aviation_source;

查询物化视图

可以通过查询物化视图从 source 中获取最新数据:

SELECT * FROM aviation_mv LIMIT 5;

检索结果应该如下所示:

| flight_date | flight_status | departure_airport                                   | departure_scheduled       | departure_estimated       | arrival_airport                                     | arrival_scheduled         | airline_name         | flight_number 
--------------+---------------+-----------------------------------------------------+---------------------------+---------------------------+-----------------------------------------------------+---------------------------+----------------------+------------------
| 2023-12-21 | scheduled | Melbourne - Tullamarine Airport | 2023-12-21T00:30:00Z | 2023-12-21T00:30:00Z | Kuala Lumpur International Airport (klia) | 2023-12-21T05:45:00Z | KLM | KL4109
| 2023-12-21 | scheduled | Taiwan Taoyuan International (Chiang Kai Shek Int'l)| 2023-12-21T00:05:00Z | 2023-12-21T00:05:00Z | Hong Kong International | 2023-12-21T02:00:00Z | EVA Air | BR2895
| 2023-12-21 | scheduled | Ngurah Rai International | 2023-12-21T00:10:00Z | 2023-12-21T00:10:00Z | Adelaide International Airport | 2023-12-21T07:40:00Z | Virgin Australia | VA110
| 2023-12-21 | scheduled | Hangzhou | 2023-12-21T00:10:00Z | 2023-12-21T00:10:00Z | Doha International | 2023-12-21T05:10:00Z | Qatar Airways | QR891
| 2023-12-21 | scheduled | Hangzhou | 2023-12-21T00:10:00Z | 2023-12-21T00:10:00Z | Kansai International | 2023-12-21T04:50:00Z | YTO Cargo Airlines | YG9133
(5 rows)

您已成功地将 Kafka topic 中的数据导入 RisingWave,并创建了 source 和物化视图,然后对其进行了查询。