从 Amazon MSK 摄取数据
Amazon Managed Streaming for Apache Kafka(MSK)是一项完全托管的服务,可简化 Apache Kafka 集群(一种流行的开源分布式流处理平台)的设置、扩展和管理。Kafka 旨在处理实时数据馈送,并遵循发布-订阅(pub-sub)模式。Kafka 处理大量实时数据的能力使其对于数据管道、分析和事件驱动架构至关重要。
要在 RisingWave 摄取 Amazon MSK 数据,您需要一个运行中的 Amazon MSK 集群和一个已建立的 Kafka Topic。设置完成后,您将利用 RisingWave 中的 Kafka 连接器从 MSK Topic 消费数据。
本指南将详细介绍如何在 RisingWave 摄取 Amazon MSK 中的流数据。
设置 Amazon MSK
要了解如何设置 Amazon MSK 账户并创建集群,请参阅 开始使用 Amazon MSK。在本演示中,我们假定 Cluster creation method 选择 Quick create,Cluster type 选择 Provisioned。创建集群可能需要约 15 分钟。
创建集群时,请记下要连接的集群的以下信息。
从 All cluster settings 中获取 VPC 值。
从 All cluster settings 中获取 Security groups associated with VPC。
从 Cluster summary 中获取 ARN 值。
要自定义 IAM 策略,请参阅 IAM 访问控制。
在 AWS 上设置 EC2
要了解如何从 VPC 控制台创建 EC2 客户端计算机,并将客户端计算机的安全组添加到集群安全组的入站规则中,请参阅 创建客户端计算机。
配置 MSK Kafka
启用 SASL
访问 Amazon MSK 控制台 并选择 MSK 集群。
单击 Properties 选项卡,然后单击 Security settings 部分中的 Edit。
选择 SASL/SCRAM authentication,然后单击 Save changes。
有关 SASL 设置的更多信息,请参阅 使用 AWS Secrets Manager 进行登录凭证身份验证。
创建对称密钥
单击 Create Key,选择 Symmetric,然后单击 Next。
给密钥起一个 Alias,然后单击 Next。
在 Administrative permissions 下,选择 AWSServiceRoleForKafka,然后单击 Next。
在 Key usage permissions 下,再次选择 AWSServiceRoleForKafka,然后单击 Next。
最后,查看详细信息并单击 Finish。
有关更多信息,请参阅 创建对称加密 KMS 密钥。
存储新 Secret
单击 Store a new secret。
在 Secret type 下,选择 Other type of secret。
在 Key/value pairs 下,单击 Plaintext,在下面的空白处粘贴以下内容,并将
<your-username>
和<your-password>
替换为您要为集群设置的用户名和密码。{
"username": "<your-username>",
"password": "<your-password>"
}在 Encryption key 下,选择先前创建的对称密钥别名。
在下一页,输入以
AmazonMSK_
开头的 Secret name。创建 Secret 后,记录 Secret ARN (Amazon Resource Name) 值。
有关更多信息,请参阅 使用 AWS Secrets Manager 进行登录凭证身份验证。
将 Secret 与 MSK 集群链接起来
访问 Amazon MSK 控制台 并选择 MSK 集群。
单击 Actions 选项卡并选择 Edit security settings。
选择 SASL/SCRAM authentication,然后单击 Save changes。
回到主页面,单击 Properties 选项卡,在 Security settings 部分,单击 SASL/SCRAM authentication 下的 Associate secrets。
粘贴上一步中记录的 Secret ARN 值,然后单击 Associate secrets。
使用 SSH 登录 EC2 机器
```
ssh -i “xxx.pem" ubuntu@ec2-xx-xxx-xxx-xxx.compute-1.amazonaws.com
```
查找特定命令值:
访问 EC2 控制台,选择您创建的实例。
点击 Connect,选择 SSH client,然后复制提供的命令示例。
安装 AWS CLI 和 Java
sudo apt install unzip
curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
unzip awscliv2.zip
sudo ./aws/install
sudo apt install openjdk-8-jdk -y
下载 Kakfa 客户端
wget https://archive.apache.org/dist/kafka/2.6.2/kafka_2.12-2.6.2.tgz
tar -xzf kafka_2.12-2.6.2.tgz
在 EC2 上配置 AWS IAM 凭据
运行以下命令配置 AWS 凭据和默认设置。
aws configure
将包含以下内容的
users_jaas.conf
放在/home/ubuntu
。KafkaClient {
org.apache.kafka.common.security.scram.ScramLoginModule required
username="<your-username>"
password="<your-password>";
};运行以下命令定义 Kafka 应使用的特定安全设置。
export KAFKA_OPTS=-Djava.security.auth.login.config=/home/ubuntu/users_jaas.conf
使用以下命令将 JDK 密钥库文件从 JVM
cacerts
文件夹复制到kafka.client.truststore.jks
副本中。cp /usr/lib/jvm/java-1.8.0-openjdk-amd64/jre/lib/security/cacerts ~/kafka.client.truststore.jks
在
/home/ubuntu
创建client_sasl.properties
文件,内容如下。security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
ssl.truststore.location=/home/ubuntu/kafka.client.truststore.jks
使用带有 SASL 的代理地址创建 Topic
访问 Amazon MSK 控制台 并选择集群。
点击 View client information,然后复制 SASL/SCRAM 下 Private endpoint 的 URL。这将是您以后的
<broker-url>
。运行以下命令创建 Topic。
bin/kafka-topics.sh --bootstrap-server <broker-url> --command-config ~/client_sasl.properties --create --topic <topic-name>
可选:以下命令将列出 Topic。
bin/kafka-topics.sh --bootstrap-server <broker-url> --list --command-config ~/client_sasl.properties
插入测试数据。
bin/kafka-console-producer.sh --bootstrap-server <broker-url> --topic <topic-name> --producer.config ~/client_sasl.properties
运行 kafka-console-producer
命令后,系统会提示您在控制台中输入消息。每条消息都应在新行中输入;可以输入任意数量的消息。
输入消息后,可以关闭控制台窗口,或按 Ctrl + C 退出 Producer。
在 RisingWave 中消费来自 Amazon MSK 的数据
安装并启动 RisingWave
有关如何运行 RisingWave 的选项,请参阅 快速入门。
连接集群
psql -h localhost -p 4566 -d dev -U root
在 RisingWave 中创建 Source
要了解从 Kafka Topic 消费数据时使用的具体句法,请参阅 从 Kafka 摄取数据。
例如,下面的查询可创建一个表,用于消费连接到 Kafka 的 MSK Topic 中的数据。
CREATE TABLE s (v1 int, v2 varchar)
WITH (
connector = 'kafka', topic = '<topic-name>',
properties.bootstrap.server = '<broker-url>',
scan.startup.mode = 'earliest',
properties.sasl.mechanism = 'SCRAM-SHA-512',
properties.security.protocol = 'sasl_ssl',
properties.sasl.username = '<your-username>',
properties.sasl.password = ‘<your-password>’'
) FORMAT PLAIN ENCODE JSON;
然后即可计算记录的准确性。
SELECT * FROM s;