Skip to main content

从 Amazon MSK 摄取数据

Amazon Managed Streaming for Apache Kafka(MSK)是一项完全托管的服务,可简化 Apache Kafka 集群(一种流行的开源分布式流处理平台)的设置、扩展和管理。Kafka 旨在处理实时数据馈送,并遵循发布-订阅(pub-sub)模式。Kafka 处理大量实时数据的能力使其对于数据管道、分析和事件驱动架构至关重要。

要在 RisingWave 摄取 Amazon MSK 数据,您需要一个运行中的 Amazon MSK 集群和一个已建立的 Kafka Topic。设置完成后,您将利用 RisingWave 中的 Kafka 连接器从 MSK Topic 消费数据。

本指南将详细介绍如何在 RisingWave 摄取 Amazon MSK 中的流数据。

设置 Amazon MSK

要了解如何设置 Amazon MSK 账户并创建集群,请参阅 开始使用 Amazon MSK。在本演示中,我们假定 Cluster creation method 选择 Quick createCluster type 选择 Provisioned。创建集群可能需要约 15 分钟。

创建集群时,请记下要连接的集群的以下信息。

  1. All cluster settings 中获取 VPC 值。

  2. All cluster settings 中获取 Security groups associated with VPC

  3. Cluster summary 中获取 ARN 值。

要自定义 IAM 策略,请参阅 IAM 访问控制

在 AWS 上设置 EC2

要了解如何从 VPC 控制台创建 EC2 客户端计算机,并将客户端计算机的安全组添加到集群安全组的入站规则中,请参阅 创建客户端计算机

配置 MSK Kafka

启用 SASL

  1. 访问 Amazon MSK 控制台 并选择 MSK 集群。

  2. 单击 Properties 选项卡,然后单击 Security settings 部分中的 Edit

  3. 选择 SASL/SCRAM authentication,然后单击 Save changes

有关 SASL 设置的更多信息,请参阅 使用 AWS Secrets Manager 进行登录凭证身份验证

创建对称密钥

  1. 访问 AWS Key Management Service (AWS KMS) 控制台

  2. 单击 Create Key,选择 Symmetric,然后单击 Next

  3. 给密钥起一个 Alias,然后单击 Next

  4. Administrative permissions 下,选择 AWSServiceRoleForKafka,然后单击 Next

  5. Key usage permissions 下,再次选择 AWSServiceRoleForKafka,然后单击 Next

  6. 最后,查看详细信息并单击 Finish

有关更多信息,请参阅 创建对称加密 KMS 密钥

存储新 Secret

  1. 访问 AWS Secrets Manager 控制台

  2. 单击 Store a new secret

  3. Secret type 下,选择 Other type of secret

  4. Key/value pairs 下,单击 Plaintext,在下面的空白处粘贴以下内容,并将 <your-username><your-password> 替换为您要为集群设置的用户名和密码。

    {
    "username": "<your-username>",
    "password": "<your-password>"
    }
  5. Encryption key 下,选择先前创建的对称密钥别名。

  6. 在下一页,输入以 AmazonMSK_ 开头的 Secret name

  7. 创建 Secret 后,记录 Secret ARN (Amazon Resource Name) 值。

有关更多信息,请参阅 使用 AWS Secrets Manager 进行登录凭证身份验证

将 Secret 与 MSK 集群链接起来

  1. 访问 Amazon MSK 控制台 并选择 MSK 集群。

  2. 单击 Actions 选项卡并选择 Edit security settings

  3. 选择 SASL/SCRAM authentication,然后单击 Save changes

  4. 回到主页面,单击 Properties 选项卡,在 Security settings 部分,单击 SASL/SCRAM authentication 下的 Associate secrets

  5. 粘贴上一步中记录的 Secret ARN 值,然后单击 Associate secrets

使用 SSH 登录 EC2 机器

```
ssh -i “xxx.pem" ubuntu@ec2-xx-xxx-xxx-xxx.compute-1.amazonaws.com
```

查找特定命令值:

  1. 访问 EC2 控制台,选择您创建的实例。

  2. 点击 Connect,选择 SSH client,然后复制提供的命令示例。

安装 AWS CLI 和 Java

sudo apt install unzip
curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
unzip awscliv2.zip
sudo ./aws/install
sudo apt install openjdk-8-jdk -y

下载 Kakfa 客户端

wget https://archive.apache.org/dist/kafka/2.6.2/kafka_2.12-2.6.2.tgz
tar -xzf kafka_2.12-2.6.2.tgz

在 EC2 上配置 AWS IAM 凭据

  1. 运行以下命令配置 AWS 凭据和默认设置。

    aws configure
  2. 将包含以下内容的 users_jaas.conf 放在 /home/ubuntu

    KafkaClient {
    org.apache.kafka.common.security.scram.ScramLoginModule required
    username="<your-username>"
    password="<your-password>";
    };
  3. 运行以下命令定义 Kafka 应使用的特定安全设置。

    export KAFKA_OPTS=-Djava.security.auth.login.config=/home/ubuntu/users_jaas.conf
  4. 使用以下命令将 JDK 密钥库文件从 JVM cacerts 文件夹复制到 kafka.client.truststore.jks 副本中。

    cp /usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/security/cacerts ~/kafka.client.truststore.jks
  5. /home/ubuntu创建client_sasl.properties 文件,内容如下。

    security.protocol=SASL_SSL
    sasl.mechanism=SCRAM-SHA-512
    ssl.truststore.location=/home/ubuntu/kafka.client.truststore.jks

使用带有 SASL 的代理地址创建 Topic

  1. 访问 Amazon MSK 控制台 并选择集群。

  2. 点击 View client information,然后复制 SASL/SCRAMPrivate endpoint 的 URL。这将是您以后的 <broker-url>

  3. 运行以下命令创建 Topic。

    bin/kafka-topics.sh --bootstrap-server <broker-url> --command-config ~/client_sasl.properties --create --topic <topic-name>

    可选:以下命令将列出 Topic。

    bin/kafka-topics.sh --bootstrap-server <broker-url> --list --command-config ~/client_sasl.properties
  4. 插入测试数据。

    bin/kafka-console-producer.sh --bootstrap-server <broker-url> --topic <topic-name> --producer.config ~/client_sasl.properties

运行 kafka-console-producer 命令后,系统会提示您在控制台中输入消息。每条消息都应在新行中输入;可以输入任意数量的消息。

输入消息后,可以关闭控制台窗口,或按 Ctrl + C 退出 Producer。

在 RisingWave 中消费来自 Amazon MSK 的数据

安装并启动 RisingWave

有关如何运行 RisingWave 的选项,请参阅 快速入门

连接集群

psql -h localhost -p 4566 -d dev -U root

在 RisingWave 中创建 Source

要了解从 Kafka Topic 消费数据时使用的具体句法,请参阅 从 Kafka 摄取数据

例如,下面的查询可创建一个表,用于消费连接到 Kafka 的 MSK Topic 中的数据。

CREATE TABLE s (v1 int, v2 varchar) 
WITH (
connector = 'kafka', topic = '<topic-name>',
properties.bootstrap.server = '<broker-url>',
scan.startup.mode = 'earliest',
properties.sasl.mechanism = 'SCRAM-SHA-512',
properties.security.protocol = 'sasl_ssl',
properties.sasl.username = '<your-username>',
properties.sasl.password =<your-password>’'
) FORMAT PLAIN ENCODE JSON;

然后即可计算记录的准确性。

SELECT * FROM s;