本文通過樣本為您介紹如何通過EMR的叢集指令碼功能,快速部署使用MirrorMaker 2.0(MM2)服務同步資料。
背景資訊
本文的業務情境以EMR DataFlow叢集作為目的叢集,並且在目的叢集中以Dedicated MirrorMaker叢集的方式部署MM2,即EMR DataFlow叢集既作為目的叢集又作為Dedicated MirrorMaker叢集。在實際業務情境中,您可以將MirrorMaker叢集部署到單獨的伺服器上。
Kafka MM2適用於下列情境:
- 遠端資料同步:通過MM2,Kafka資料可以在不同地區的叢集進行傳輸複製。
- 災備情境:通過MM2,可以構建不同資料中心的主備兩個叢集容災架構,MM2即時同步兩個叢集的資料。當其中一個叢集不可用時,可以將上面的應用程式切換到另一個叢集,從而實現異地容災功能。
- 資料移轉情境:在業務上雲、混合雲、叢集升級等情境,存在資料從舊叢集遷移到新叢集的需求。此時,您可以使用MM2實現新舊資料的遷移,保證業務的連續性。
- 彙總資料中心情境:通過MM2,可以將多個Kafka子叢集的資料同步到一個中心Kafka叢集,實現資料的匯聚。
Kafka MM2作為資料複製工具,具有以下功能:
- 複製topics資料以及配置資訊。
- 複製consumer groups及其消費topic的offset資訊。
- 複製ACLs。
- 自動檢測新的topic以及partition。
- 提供MM2的metrics。
- 高可用以及可水平擴充的架構。
MM2任務有以下執行方式:
- Distributed Connect叢集的connector方式(推薦):在已有Connect叢集執行MM2 connector任務的方式。具體操作,請參見使用MirrorMaker 2(on Connect)跨叢集同步資料。
- Dedicated MirrorMaker叢集方式:不需要使用Connect叢集執行MM2 connector任務,而是直接通過Driver程式管理MM2的所有任務。
您可以參照本文通過Driver程式來管理MM2任務。
- Standalone Connect的worker方式:執行單個MirrorSourceConnector任務,適合在測試情境下使用。
說明 推薦在Distributed Connect叢集上啟動MM2 connector任務,可以藉助Connect叢集的Rest服務管理MM2任務。
前提條件
使用限制
EMR DataFlow叢集的Kafka軟體的版本為2.12_2.4.1及以上。
操作步驟
- 準備MM2設定檔mm2.properties並上傳到您的OSS儲存。以下配置內容僅作為參考,您需要替換文本中的源叢集和目的地組群的src.bootstrap.servers和dest.bootstrap.servers,並根據實際業務需求進行相應的配置。MM2配置的詳細資料請參見Configuring Geo-Replication。
# see org.apache.kafka.clients.consumer.ConsumerConfig for more details # Sample MirrorMaker 2.0 top-level configuration file # Run with ./bin/connect-mirror-maker.sh connect-mirror-maker.properties # specify any number of cluster aliases clusters = src, dest # connection information for each cluster src.bootstrap.servers = <your source kafka cluster servers> dest.bootstrap.servers = <your destination kafka cluster servers> # enable and configure individual replication flows src->dest.enabled = true src->dest.topics = foo-.* groups=.* topics.blacklist="__.*" # customize as needed replication.factor=3 - 準備部署指令碼kafka_mm2_deploy.sh並上傳到OSS儲存。
#!/bin/bash SIGNAL=${SIGNAL:-TERM} PIDS=$(ps ax | grep -i 'org.apache.kafka.connect.mirror.MirrorMaker' | grep java | grep -v grep | awk '{print $1}') if [ -n "$PIDS" ]; then echo "stop the exist mirror maker server." kill -s $SIGNAL $PIDS fi KAFKA_CONF=/etc/taihao-apps/kafka-conf/kafka-conf TAIHAO_EXECUTOR=/usr/local/taihao-executor-all/executor/1.0.1 cd $KAFKA_CONF if [ -e "./mm2.properties" ]; then mv mm2.properties mm2.properties.bak fi ${TAIHAO_EXECUTOR}/ossutil64 cp oss://<yourBuket>/mm2.properties ./ -e <yourEndpoint> -i <yourAccessKeyId> -k <yourAccessKeySecret> su - kafka <<EOF exec connect-mirror-maker.sh -daemon $KAFKA_CONF/mm2.properties exit; EOF涉及替換參數如下。參數 描述 KAFKA_CONF 檢查變數路徑是否正確,如果不正確,則需要修改為實際的地址。 TAIHAO_EXECUTOR oss://<yourBucket>/mm2.properties 替換為mm2.properties的實際儲存路徑。 <yourEndpoint> OSS服務的地址。 <yourAccessKeyId> 阿里雲帳號的AccessKey ID。 <yourAccessKeySecret> 阿里雲帳號的AccessKey Secret。 - 在EMR控制台執行指令碼, 具體操作請參見手動執行指令碼。說明 在建立執行指令碼的過程中,您應正確選擇指令碼的執行節點,通常選擇所有的Broker節點。執行完成後,即實現了Kafka叢集間的資料移轉。