全部產品
Search
文件中心

E-MapReduce:使用MirrorMaker 2(Dedicated)跨叢集同步資料

更新時間:Jul 01, 2024

本文通過樣本為您介紹如何通過EMR的叢集指令碼功能,快速部署使用MirrorMaker 2.0(MM2)服務同步資料。

背景資訊

本文的業務情境以EMR DataFlow叢集作為目的叢集,並且在目的叢集中以Dedicated MirrorMaker叢集的方式部署MM2,即EMR DataFlow叢集既作為目的叢集又作為Dedicated MirrorMaker叢集。在實際業務情境中,您可以將MirrorMaker叢集部署到單獨的伺服器上。

Kafka MM2適用於下列情境:
  • 遠端資料同步:通過MM2,Kafka資料可以在不同地區的叢集進行傳輸複製。
  • 災備情境:通過MM2,可以構建不同資料中心的主備兩個叢集容災架構,MM2即時同步兩個叢集的資料。當其中一個叢集不可用時,可以將上面的應用程式切換到另一個叢集,從而實現異地容災功能。
  • 資料移轉情境:在業務上雲、混合雲、叢集升級等情境,存在資料從舊叢集遷移到新叢集的需求。此時,您可以使用MM2實現新舊資料的遷移,保證業務的連續性。
  • 彙總資料中心情境:通過MM2,可以將多個Kafka子叢集的資料同步到一個中心Kafka叢集,實現資料的匯聚。
Kafka MM2作為資料複製工具,具有以下功能:
  • 複製topics資料以及配置資訊。
  • 複製consumer groups及其消費topic的offset資訊。
  • 複製ACLs。
  • 自動檢測新的topic以及partition。
  • 提供MM2的metrics。
  • 高可用以及可水平擴充的架構。
MM2任務有以下執行方式:
  • Distributed Connect叢集的connector方式(推薦):在已有Connect叢集執行MM2 connector任務的方式。具體操作,請參見使用MirrorMaker 2(on Connect)跨叢集同步資料
  • Dedicated MirrorMaker叢集方式:不需要使用Connect叢集執行MM2 connector任務,而是直接通過Driver程式管理MM2的所有任務。

    您可以參照本文通過Driver程式來管理MM2任務。

  • Standalone Connect的worker方式:執行單個MirrorSourceConnector任務,適合在測試情境下使用。
說明 推薦在Distributed Connect叢集上啟動MM2 connector任務,可以藉助Connect叢集的Rest服務管理MM2任務。

前提條件

  • 已建立兩個Kafka叢集,一個為源叢集,一個為目的叢集(EMR DataFlow叢集),並選擇了Kafka服務,建立DataFlow叢集詳情請參見建立叢集
    說明 本文樣本的源和目的叢集都以EMR-3.42.0版本的DataFlow叢集為例。
  • 已在OSS上建立儲存空間,詳情請參見控制台建立儲存空間

使用限制

EMR DataFlow叢集的Kafka軟體的版本為2.12_2.4.1及以上。

操作步驟

  1. 準備MM2設定檔mm2.properties並上傳到您的OSS儲存。
    以下配置內容僅作為參考,您需要替換文本中的源叢集和目的地組群的src.bootstrap.serversdest.bootstrap.servers,並根據實際業務需求進行相應的配置。MM2配置的詳細資料請參見Configuring Geo-Replication
    # see org.apache.kafka.clients.consumer.ConsumerConfig for more details
    
    # Sample MirrorMaker 2.0 top-level configuration file
    # Run with ./bin/connect-mirror-maker.sh connect-mirror-maker.properties
    
    # specify any number of cluster aliases
    clusters = src, dest
    
    # connection information for each cluster
    src.bootstrap.servers = <your source kafka cluster servers>
    dest.bootstrap.servers = <your destination kafka cluster servers>
    
    # enable and configure individual replication flows
    src->dest.enabled = true
    src->dest.topics = foo-.*
    groups=.*
    topics.blacklist="__.*"
    
    # customize as needed
    replication.factor=3
  2. 準備部署指令碼kafka_mm2_deploy.sh並上傳到OSS儲存。
    #!/bin/bash
    SIGNAL=${SIGNAL:-TERM}
    PIDS=$(ps ax | grep -i 'org.apache.kafka.connect.mirror.MirrorMaker' | grep java | grep -v grep | awk '{print $1}')
    if [ -n "$PIDS" ]; then
      echo "stop the exist mirror maker server."
      kill -s $SIGNAL $PIDS
    fi
    KAFKA_CONF=/etc/taihao-apps/kafka-conf/kafka-conf
    TAIHAO_EXECUTOR=/usr/local/taihao-executor-all/executor/1.0.1
    cd $KAFKA_CONF
    if [ -e "./mm2.properties" ]; then
      mv mm2.properties mm2.properties.bak
    fi
    ${TAIHAO_EXECUTOR}/ossutil64 cp oss://<yourBuket>/mm2.properties ./ -e <yourEndpoint> -i <yourAccessKeyId> -k <yourAccessKeySecret>
    su - kafka <<EOF
    exec connect-mirror-maker.sh -daemon $KAFKA_CONF/mm2.properties
    exit;
    EOF
    涉及替換參數如下。
    參數描述
    KAFKA_CONF檢查變數路徑是否正確,如果不正確,則需要修改為實際的地址。
    TAIHAO_EXECUTOR
    oss://<yourBucket>/mm2.properties替換為mm2.properties的實際儲存路徑。
    <yourEndpoint>OSS服務的地址。
    <yourAccessKeyId>阿里雲帳號的AccessKey ID。
    <yourAccessKeySecret>阿里雲帳號的AccessKey Secret。
  3. 在EMR控制台執行指令碼, 具體操作請參見手動執行指令碼
    說明 在建立執行指令碼的過程中,您應正確選擇指令碼的執行節點,通常選擇所有的Broker節點。
    執行完成後,即實現了Kafka叢集間的資料移轉。