DataHub: Migrate data to DataHub with Kafka Mirror Maker

Last Updated: Jun 25, 2026

You can use the Kafka Mirror Maker tool to migrate data from Kafka to DataHub.

Prerequisites

Ensure that you have created a Project and a Topic. For more information, see Create a Topic.

Note
  • You can only migrate data from Kafka to DataHub. Migrating data from DataHub to Kafka is not supported.

  • DataHub does not support transactions or idempotence. You must disable idempotence (enable.idempotence=false) in producer.properties, the configuration file for the destination DataHub service.

Procedure

  1. Upload the kafka_mirror_datahub.tgz package to your source Kafka server and extract it.

    tar -zxvf kafka_mirror_datahub.tgz
  2. Navigate to the config directory and modify the source and destination configuration files. The files are described as follows:

    1. consumer.properties: The configuration file for the source Kafka cluster.

    # The server configuration for the source Kafka cluster
    bootstrap.servers=xx:9092
    # The consumer group ID
    group.id=test-consumer-group
    auto.offset.reset=earliest
    session.timeout.ms=60000
    heartbeat.interval.ms=40000
    ssl.endpoint.identification.algorithm=
    key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

    2. producer.properties: The configuration file for the destination DataHub service.

    bootstrap.servers=dh-cn-zhangjiakou-pre.aliyuncs.com:9092
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="AccessKey ID" password="AccessKey Secret";
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    key.serializer=org.apache.kafka.common.serialization.StringSerializer
    value.serializer=org.apache.kafka.common.serialization.StringSerializer
    compression.type=lz4
    # Disable idempotence
    enable.idempotence=false

    Parameter descriptions

    • For a list of domain names for the destination bootstrap.servers, see Kafka compatibility. The endpoint in this example, dh-cn-zhangjiakou-pre.aliyuncs.com:9092, corresponds to the China (Zhangjiakou) region.

    • The sasl.jaas.config parameter specifies the required login module and credentials for SASL authentication. Replace AccessKey ID and AccessKey Secret with your AccessKey information.

    • For more information about configuration items, see Kafka compatibility.
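The idempotence requirement can be verified before the migration starts. The following is a minimal sketch and not part of the kafka_mirror_datahub package: check_producer_config is a hypothetical helper name, and the check assumes one key=value pair per line. An explicit setting matters because Kafka producer clients from version 3.0 onward enable idempotence by default.

```shell
# Hypothetical helper (not shipped with the package): verifies that a
# producer.properties file is compatible with DataHub's lack of support
# for idempotence and transactions.
check_producer_config() {
  file="$1"
  # enable.idempotence must be set to false explicitly.
  if ! grep -Eq '^[[:space:]]*enable\.idempotence[[:space:]]*=[[:space:]]*false[[:space:]]*$' "$file"; then
    echo "enable.idempotence=false is missing in $file" >&2
    return 1
  fi
  # Setting transactional.id turns on transactions in the Kafka producer.
  if grep -Eq '^[[:space:]]*transactional\.id[[:space:]]*=' "$file"; then
    echo "transactional.id must not be set in $file" >&2
    return 1
  fi
  echo "OK: $file"
}
```

For example, run check_producer_config config/producer.properties from the directory that contains the config directory.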

  3. Configure the topic-map.properties file for topic mapping.

    This file maps source Kafka topics to destination DataHub Topics. Each line is one mapping rule: the left side of the equal sign is the source Kafka topic name, and the right side is the destination DataHub Topic in the Project.Topic format.

    topicname=testproject.testtopic
    topicname1=testproject1.testtopic1
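A malformed mapping rule is easier to catch before the migration starts. The sketch below checks that every rule has the source=Project.Topic shape described above. validate_topic_map is a hypothetical helper name, and the script assumes that blank lines and lines starting with # may be ignored, which the package documentation does not confirm.

```shell
# Hypothetical helper: reports mapping rules that are not of the form
# source=Project.Topic (exactly one dot on the right side).
validate_topic_map() {
  # Drop blank lines and # comments (an assumption), then keep only the
  # lines that do NOT match the expected shape.
  bad=$(grep -Ev '^[[:space:]]*(#.*)?$' "$1" \
    | grep -Ev '^[^=[:space:]]+=[^.=[:space:]]+\.[^.=[:space:]]+$' || true)
  if [ -n "$bad" ]; then
    printf 'Invalid mapping rule(s):\n%s\n' "$bad" >&2
    return 1
  fi
}
```

For example, validate_topic_map config/topic-map.properties returns a nonzero status and prints the offending lines if a rule lacks the Project prefix.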
  4. Configure the log4j.properties file for logging.

    1. Create a log4j.properties file.

    2. Use the following template for your configuration:

        log4j.rootLogger=INFO, stdout, file
        log4j.appender.stdout=org.apache.log4j.ConsoleAppender
        log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
        log4j.appender.stdout.layout.ConversionPattern=[%d{yyyy-MM-dd HH:mm:ss}] [%p] %m (%c:%L)%n
        log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
        log4j.appender.file.DatePattern='.'yyyy-MM-dd
        log4j.appender.file.File=/opt/logs/mm1.log
        log4j.appender.file.layout=org.apache.log4j.PatternLayout
        log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH:mm:ss}] [%p] %m (%c:%L)%n
        log4j.logger.kafka=INFO
        log4j.logger.org.apache.kafka=INFO
        log4j.logger.kafka.tools.MirrorMaker=INFO
        log4j.logger.org.apache.zookeeper=WARN
  5. Run the migration script.

    Run the following command from the root directory of your Kafka installation:

    KAFKA_LOG4J_OPTS="log4j.properties" nohup bin/kafka-mirror-maker.sh --consumer.config config/consumer.properties --producer.config config/producer.properties --whitelist "mirrortest" --topic.mapping.file /opt/kafka_2.12-3.7.2/config/topic-map.properties ... > /dev/null 2>&1 &

    The command discards standard output, so check the log file configured in log4j.properties (/opt/logs/mm1.log in the template above).

    Parameter descriptions

    • --consumer.config: The configuration file for the source Kafka cluster.

    • --producer.config: The configuration file for the destination DataHub service.

    • --whitelist: The source topic names. To specify multiple topics, separate them with a vertical bar (|), for example, topicA|topicB|topicC.

    • --topic.mapping.file: The configuration file for topic mapping.

    • KAFKA_LOG4J_OPTS: The log configuration. The stock Kafka launch scripts pass this variable to the JVM as options, where the file is normally given as -Dlog4j.configuration=file:<path>. If your log4j.properties file is not picked up, use that form.
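When every topic in topic-map.properties should be mirrored, the --whitelist value can be derived from the mapping file instead of being typed by hand. This is a sketch under that assumption. build_whitelist is a hypothetical helper name, and blank lines and # comments are assumed to be ignorable.

```shell
# Hypothetical helper: prints the source topic names from a mapping file,
# joined with "|" as --whitelist expects.
build_whitelist() {
  # Keep mapping rules only, take the text left of the first "=",
  # then join all names into a single line.
  grep -Ev '^[[:space:]]*(#.*)?$' "$1" | cut -d= -f1 | paste -sd '|' -
}
```

The result can then be passed as --whitelist "$(build_whitelist config/topic-map.properties)". With the two-rule example file shown earlier, the helper prints topicname|topicname1.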
  6. Check the log for errors. The following output indicates a successful startup:

      [2025-08-06 17:27:41] [INFO] Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$:31)
      [2025-08-06 17:27:41] [INFO] Starting mirror maker (kafka.tools.MirrorMaker$:62)
      [2025-08-06 17:27:41] [INFO] Loaded topic mappings: mirrortest -> test_suyang.mirror (kafka.tools.MirrorMaker$:62)
      [2025-08-06 17:27:41] [INFO] ProducerConfig values:
      	acks = -1
      	batch.size = 16384
      	bootstrap.servers = [xxx]
      	buffer.memory = 33554432
      	client.dns.lookup = use_all_dns_ips
      	client.id = producer-1
      	compression.type = lz4
      	connections.max.idle.ms = 540000
      	delivery.timeout.ms = 2147483647
      	enable.idempotence = false
      	interceptor.classes = []
      	internal.auto.downgrade.txn.commit = false
      	key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
      	linger.ms = 0
      	max.block.ms = 9223372036854775807
      	max.in.flight.requests.per.connection = 1
      	max.request.size = 1048576
      	metadata.max.age.ms = 300000
      	metadata.max.idle.ms = 300000
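The log check can also be scripted. The sketch below looks for the startup line shown in the sample output and for entries at ERROR or FATAL level. check_mm_log is a hypothetical helper name, and the bracketed level format assumes the [%p] conversion pattern from the log4j.properties template.

```shell
# Hypothetical helper: confirms that Mirror Maker started and that the
# log contains no ERROR or FATAL entries.
check_mm_log() {
  log="$1"
  # The startup line appears in the sample output above.
  if ! grep -q 'Starting mirror maker' "$log"; then
    echo "startup line not found in $log" >&2
    return 1
  fi
  # With the [%p] pattern, levels are printed as [ERROR] or [FATAL].
  # Matching lines are echoed to stderr for inspection.
  if grep -E '\[(ERROR|FATAL)\]' "$log" >&2; then
    return 1
  fi
  echo "no errors found in $log"
}
```

For example, run check_mm_log /opt/logs/mm1.log if you kept the file path from the template.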
  7. Log in to the DataHub console to verify that data has been written to DataHub.

    On the Data Bus page, navigate to the destination Topic within the destination Project, such as kafkatest/test. On the Shard List tab, confirm that the shard status is ACTIVE and that a recent data timestamp is displayed. Click Sample in the Actions column. In the panel that appears, select a Shard ID, specify the number of records to retrieve, and click Sample. In the data preview table, verify that the data was successfully written to DataHub.