This topic describes how to use the cluster script feature of E-MapReduce (EMR) to quickly deploy and use the MirrorMaker 2.0 (MM2) service to synchronize data.

Background information

In this topic, an EMR Dataflow cluster serves as the destination cluster, and MM2 is deployed in that cluster in dedicated mode. The EMR Dataflow cluster therefore acts as both the destination cluster and the dedicated MirrorMaker cluster. In actual business scenarios, you can deploy the MirrorMaker cluster on separate servers.

Kafka MM2 is suitable for the following scenarios:
  • Remote data synchronization: You can use Kafka MM2 to synchronize data among clusters in different regions.
  • Disaster recovery: You can use Kafka MM2 to build a disaster recovery architecture that consists of primary and secondary clusters in different data centers. Data in the two clusters is synchronized in real time. If one cluster becomes unavailable, you can switch the applications that use it over to the other cluster, which provides geo-disaster recovery.
  • Data migration: In scenarios such as cloud migration of businesses, hybrid clouds, and cluster upgrades, data needs to be migrated from the original cluster to a new cluster. You can use Kafka MM2 to migrate data to ensure business continuity.
  • Data aggregation: You can use Kafka MM2 to synchronize data from multiple Kafka sub-clusters to a Kafka central cluster. This way, data can be aggregated.
As a data replication tool, Kafka MM2 provides the following features:
  • Replicates the data and configuration information of topics.
  • Replicates the offset information of consumer groups and the consumed topics.
  • Replicates access control lists (ACLs).
  • Automatically detects new topics and partitions.
  • Provides Kafka MM2 metrics.
  • Provides high-availability architectures that are horizontally scalable.
MM2 tasks can be run by using one of the following three methods:
  • Method 1 (Recommended): Run MM2 connector tasks in an existing distributed Kafka Connect cluster. For more information, see Use Kafka MM2 to synchronize data across clusters.
  • Method 2: Deploy a dedicated MirrorMaker cluster and run driver programs to manage all MM2 tasks.

    This topic uses this method.

  • Method 3: Run a MirrorSourceConnector task on a single Connect worker. This method is suitable for test scenarios.
Note We recommend that you run MM2 connector tasks in a distributed Kafka Connect cluster. You can use the REST service of the Connect cluster to manage MM2 tasks.

Prerequisites

  • Two clusters are created and the Kafka service is selected from the optional services during cluster creation. One is the source cluster and the other is the destination EMR Dataflow cluster. For more information about how to create a Dataflow cluster, see Create a cluster.
    Note In this example, the source and destination clusters are both Dataflow clusters of EMR V3.42.0.
  • A bucket is created in Object Storage Service (OSS). For more information, see Create buckets.

Limits

The version of the Kafka service that is selected for EMR Dataflow clusters must be 2.12_2.4.1 or later.

Procedure

  1. Prepare the mm2.properties configuration file of MM2 and upload the configuration file to your OSS bucket.
    The following configuration is for reference only. Set src.bootstrap.servers and dest.bootstrap.servers to the broker addresses of the source and destination clusters, and configure the other parameters based on your business requirements. For more information about MM2 configurations, see Configuring Geo-Replication.
    # see org.apache.kafka.clients.consumer.ConsumerConfig for more details
    
    # Sample MirrorMaker 2.0 top-level configuration file
    # Run with ./bin/connect-mirror-maker.sh connect-mirror-maker.properties
    
    # specify any number of cluster aliases
    clusters = src, dest
    
    # connection information for each cluster
    src.bootstrap.servers = <your source kafka cluster servers>
    dest.bootstrap.servers = <your destination kafka cluster servers>
    
    # enable and configure individual replication flows
    src->dest.enabled = true
    src->dest.topics = foo-.*
    groups=.*
    topics.blacklist=__.*
    
    # customize as needed
    replication.factor=3
  2. Prepare the kafka_mm2_deploy.sh deployment script and upload the script to your OSS bucket.
    #!/bin/bash
    SIGNAL=${SIGNAL:-TERM}
    PIDS=$(ps ax | grep -i 'org.apache.kafka.connect.mirror.MirrorMaker' | grep java | grep -v grep | awk '{print $1}')
    if [ -n "$PIDS" ]; then
      echo "Stopping the existing MirrorMaker process."
      kill -s $SIGNAL $PIDS
    fi
    KAFKA_CONF=/etc/taihao-apps/kafka-conf/kafka-conf
    TAIHAO_EXECUTOR=/usr/local/taihao-executor-all/executor/1.0.1
    cd $KAFKA_CONF
    if [ -e "./mm2.properties" ]; then
      mv mm2.properties mm2.properties.bak
    fi
    ${TAIHAO_EXECUTOR}/ossutil64 cp oss://<yourBucket>/mm2.properties ./ -e <yourEndpoint> -i <yourAccessKeyId> -k <yourAccessKeySecret>
    su - kafka <<EOF
    exec connect-mirror-maker.sh -daemon $KAFKA_CONF/mm2.properties
    exit;
    EOF
    The following table describes the parameters whose values need to be changed.
    Parameter                          Description
    KAFKA_CONF, TAIHAO_EXECUTOR        Path variables used by the script. Check that both paths exist on your cluster nodes. If they do not, change the values to the actual locations.
    oss://<yourBucket>/mm2.properties  The storage path of the mm2.properties configuration file. Replace the value with the actual storage path of the file in your OSS bucket.
    <yourEndpoint>                     The endpoint of the OSS service.
    <yourAccessKeyId>                  The AccessKey ID of your Alibaba Cloud account.
    <yourAccessKeySecret>              The AccessKey secret of your Alibaba Cloud account.
  3. Execute the script in the EMR console. For more information, see Manually run scripts.
    Note When you create the script, select the nodes on which the script is executed. In most cases, all broker nodes are selected.
    After the execution is complete, data is synchronized between Kafka clusters.
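
Before you upload the mm2.properties file in step 1, it can help to confirm that the sample placeholders were replaced and that every alias listed in clusters has its own bootstrap.servers entry. The following is a minimal sketch of such a check. The check_mm2_properties function is a hypothetical helper written for this example and is not part of Kafka or EMR.

```shell
#!/bin/bash
# Hypothetical helper: sanity-check an MM2 properties file before uploading it.
check_mm2_properties() {
  local file="$1"
  local status=0

  # Fail if a value still looks like a <placeholder> from the sample file.
  if grep -Eq '^[^#]*=[[:space:]]*<[^>]+>' "$file"; then
    echo "unreplaced placeholder found in $file" >&2
    status=1
  fi

  # Read the comma-separated cluster aliases from the "clusters" line.
  local aliases
  aliases=$(grep -E '^[[:space:]]*clusters[[:space:]]*=' "$file" | head -n 1 \
    | cut -d= -f2 | tr -d '[:space:]' | tr ',' ' ')
  if [ -z "$aliases" ]; then
    echo "no clusters defined in $file" >&2
    return 1
  fi

  # Every alias needs its own <alias>.bootstrap.servers entry.
  local name
  for name in $aliases; do
    if ! grep -Eq "^[[:space:]]*${name}\.bootstrap\.servers[[:space:]]*=" "$file"; then
      echo "missing ${name}.bootstrap.servers in $file" >&2
      status=1
    fi
  done
  return $status
}
```

For example, running check_mm2_properties mm2.properties against the unmodified sample from step 1 returns a non-zero status and reports an unreplaced placeholder.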
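
The deployment script in step 2 sends the stop signal and then continues without checking whether the old MirrorMaker process has exited. If that matters in your environment, a wait loop along the following lines could be called right after the kill command. This is a sketch only. The wait_for_exit function and the 30-second timeout in the usage comment are illustrative assumptions, not part of the original script.

```shell
#!/bin/bash
# Hypothetical helper: wait until the given PIDs have exited, or time out.
# Usage: wait_for_exit <timeout-seconds> <pid>...
wait_for_exit() {
  local timeout="$1"
  shift
  local waited=0
  local pid
  for pid in "$@"; do
    # kill -0 sends no signal; it only tests whether the process still exists.
    while kill -0 "$pid" 2>/dev/null; do
      if [ "$waited" -ge "$timeout" ]; then
        echo "process $pid is still running after ${timeout}s" >&2
        return 1
      fi
      sleep 1
      waited=$((waited + 1))
    done
  done
  return 0
}

# Possible use in the deployment script, after "kill -s $SIGNAL $PIDS":
# wait_for_exit 30 $PIDS || exit 1
```

The timeout is shared across all PIDs, so the function returns within roughly the given number of seconds even if several processes are slow to stop.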
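
To confirm that replication is working, you can list the topics on the destination cluster. With the default replication policy, MM2 names each replicated topic by prefixing the source cluster alias, so a source topic that matches foo-.* is expected to appear on the destination as src.<topic name>. The following sketch filters a topic list for that prefix. The filter_replicated_topics function is a hypothetical helper, and the broker address in the usage comment is a placeholder that you must replace.

```shell
#!/bin/bash
# Hypothetical helper: keep only topic names that start with "<alias>.".
# Reads topic names from standard input, one per line.
filter_replicated_topics() {
  awk -v prefix="$1." 'index($0, prefix) == 1'
}

# Possible use on a node of the destination cluster:
# kafka-topics.sh --bootstrap-server <your destination kafka cluster servers> --list \
#   | filter_replicated_topics src
```

If the command prints no topics, check that matching topics exist on the source cluster and that the MirrorMaker process is running.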