本文為您介紹如何建立E-MapReduce(簡稱EMR)Kafka叢集、使用Kafka Topic和Kafka Connect服務,幫您快速瞭解和上手使用EMR Kafka。
注意事項
建立EMR Kafka叢集前,您需要根據業務的預估負載,選擇合適的ECS執行個體機型以及Broker執行個體個數。由於業務情境差異很大,所以無法給出通用的叢集規劃,您需要根據您的實際環境建立叢集。通常,建議您選擇機型時考慮以下配置:
Broker機型的CPU和記憶體配比為1:4。
選擇雲端硬碟作為資料存放區盤。
充分考慮雲端硬碟的I/O吞吐率以及網卡頻寬之間的關係。
在部署參數上,考慮以下因素:
由於EMR Kafka版本仍依賴於Zookeeper,且Zookeeper的可用性直接關係到Kafka服務的高可用,因此,建議您建立叢集時,選擇高可用的部署方式。啟用高可用後,將建立3個節點的Zookeeper服務。
如果Master機器組只部署Zookeeper,則Master機器組只需要配置1塊資料盤即可。
更詳細的評估建議,請參見叢集資源規格評估建議。
建立EMR Kafka叢集
該部分內容為您簡單介紹如何建立Kafka叢集。如需更詳細的建立操作,請參見建立叢集。
進入建立叢集頁面。
單擊上方的建立叢集。
在軟體配置階段,您可以根據需要的Kafka版本,選擇對應的EMR版本。
開啟服務高可用開關,建立3節點的ZooKeeper叢集。
重要啟用高可用後,將在Master機器組上部署3個節點的Zookeeper服務。由於EMR Kafka版本的服務可用性仍依賴於Zookeeper,所以建議您建立叢集時,選擇高可用的部署方式。
在硬體設定階段,選擇合適的ECS執行個體機型以及節點數量。
機型:Core節點群組選擇CPU和記憶體配比為1 Core:4 GB的機型。
節點數量:Core節點群組選擇比Kafka分區副本數多1的節點數量以保持足夠的冗餘。例如,如果規劃副本數為3,則節點數選擇為4。

請根據需求填寫相關參數,以完成叢集的建立。
使用Kafka Topic
該部分內容為您介紹如何使用Kafka Topic進行資料的生產消費。實際業務情境,您也可以使用Kafka Manager或Cruise Control等軟體管理叢集。
使用SSH方式登入Kafka叢集的Master節點,詳情請參見登入叢集。
執行以下命令,建立Kafka Topic。
sudo su - kafka kafka-topics.sh --partitions 10 --replication-factor 2 --bootstrap-server core-1-1:9092 --topic test --create執行以下命令,查看Kafka topic詳細資料。
kafka-topics.sh --bootstrap-server core-1-1:9092 --topic test --describe執行以下命令,生產資料。
kafka-console-producer.sh --broker-list core-1-1:9092 --topic test在此命令執行後,您可以輸入訊息並按斷行符號鍵將其發送到Topic。
請新開一個終端視窗,執行以下命令,以消費資料。
kafka-console-consumer.sh --bootstrap-server core-1-1:9092 --topic test --from-beginning --group test-consumer-group
使用Kafka Connect服務
EMR-3.41.0之後版本、EMR-5.7.0之後版本支援Kafka Connect組件的部署。該部分內容為您介紹如何使用Kafka Connect服務。
進入節點管理頁面。
在頂部功能表列處,根據實際情況選擇地區和資源群組。
單擊目的地組群操作列的節點管理。
建立Kafka Connect節點群組。
Connect安裝在EMR Task節點群組。在EMR Kafka叢集建立Task節點群組後,EMR會自動在該節點群組建立Kafka Connect叢集。
查看KafkaConnect服務狀態,確保Kafka Connect叢集已經啟動。
您可以在Kafka服務的狀態頁面的組件列表地區,查看KafkaConnect的組件狀態,確保組件在運行中。

檢查Kafka Connect Rest服務狀態。
使用SSH方式登入Kafka叢集的Master節點,詳情請參見登入叢集。
執行以下命令,檢查Kafka Connect Rest服務狀態。
curl -X GET http://task-1-1:8083| jq .您會看到返回以下類似資訊。
% Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 91 100 91 0 0 13407 0 --:--:-- --:--:-- --:--:-- 15166 { "version": "2.4.1", "commit": "42ce056344c5625a", "kafka_cluster_id": "6Z7IdHW4SVO1Pbql4c****" }
使用Kafka Connect遷移資料。
您可以在Kafka叢集中啟動MirrorMaker任務,進行資料複製與遷移。具體操作,請參見使用MirrorMaker 2(on Connect)跨叢集同步資料。
相關文檔
如果需要開啟SSL,並使用SSL串連Kafka,詳情請參見使用SSL加密Kafka連結。
如果需要開啟SASL,並使用SASL串連Kafka,詳情請參見使用SASL登入認證Kafka服務。