You can use the Kafka MirrorMaker tool to migrate data from Kafka to DataHub.
Prerequisites
Ensure that you have created a Project and a Topic. For more information, see Create a Topic.
-
You can only migrate data from Kafka to DataHub. Migrating data from DataHub to Kafka is not supported.
-
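DataHub does not support transactions or idempotence. You must disable idempotence by setting enable.idempotence=false in the producer configuration for the destination DataHub. Kafka 3.0 and later producers enable idempotence by default, so this setting must be explicit.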
Procedure
-
Upload the kafka_mirror_datahub.tgz package to your source Kafka server and extract it.
tar -zxvf kafka_mirror_datahub.tgz
-
Navigate to the config directory to modify the source and destination configuration files. The files are described as follows:
-
consumer.properties: The configuration file for the source Kafka cluster.
# The server configuration for the source Kafka cluster
bootstrap.servers=xx:9092
# The consumer group ID
group.id=test-consumer-group
auto.offset.reset=earliest
session.timeout.ms=60000
heartbeat.interval.ms=40000
ssl.endpoint.identification.algorithm=
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
-
producer.properties: The configuration file for the destination DataHub service.
bootstrap.servers=dh-cn-zhangjiakou-pre.aliyuncs.com:9092
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required\nusername=\"AccessKey ID\"\npassword=\"AccessKey Secret\";
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
compression.type=lz4
# Disable idempotence
enable.idempotence=false
Parameter descriptions
-
For a list of domain names for the destination bootstrap.servers, see Kafka compatibility. The endpoint in this example, dh-cn-zhangjiakou-pre.aliyuncs.com:9092, corresponds to the China (Zhangjiakou) region.
-
The sasl.jaas.config parameter specifies the required login module and credentials for SASL authentication. Replace AccessKey ID and AccessKey Secret with your AccessKey information.
-
For more information about configuration items, see Kafka compatibility.
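Before you start the migration, you can check both files for the settings described above. The following shell sketch is a hypothetical helper and is not part of the kafka_mirror_datahub.tgz package. The function names get_prop and check_configs are illustrative, and the parser is deliberately simple: it understands only key=value lines and # comments. It checks that both files set bootstrap.servers, that producer.properties sets enable.idempotence=false, and that the AccessKey ID and AccessKey Secret placeholders have been replaced.

```shell
#!/usr/bin/env bash
# Hypothetical pre-flight check; not part of kafka_mirror_datahub.tgz.

# get_prop FILE KEY: print the value of KEY from a .properties file.
# Simplified parser: handles "key=value" lines only and skips "#" comments.
# It ignores ":" separators and backslash line continuations.
get_prop() {
  awk -v key="$2" '
    /^[ \t]*#/ { next }
    {
      i = index($0, "=")
      if (i == 0) next
      k = substr($0, 1, i - 1)
      gsub(/^[ \t]+|[ \t]+$/, "", k)
      if (k == key) {
        v = substr($0, i + 1)
        gsub(/^[ \t]+|[ \t]+$/, "", v)
        val = v; found = 1            # the last occurrence wins
      }
    }
    END { if (found) print val }
  ' "$1"
}

# check_configs CONSUMER_FILE PRODUCER_FILE
# Prints each problem to stderr and returns non-zero if any were found.
check_configs() {
  local consumer="$1" producer="$2" status=0 f
  for f in "$consumer" "$producer"; do
    if [ -z "$(get_prop "$f" bootstrap.servers)" ]; then
      echo "$f: bootstrap.servers is not set" >&2
      status=1
    fi
  done
  # DataHub does not support idempotence; Kafka 3.0+ producers enable it by default.
  if [ "$(get_prop "$producer" enable.idempotence)" != "false" ]; then
    echo "$producer: set enable.idempotence=false" >&2
    status=1
  fi
  # Heuristic: the sample file contains these literal placeholder strings.
  if grep -qE 'AccessKey (ID|Secret)' "$producer"; then
    echo "$producer: replace the AccessKey ID/Secret placeholders" >&2
    status=1
  fi
  return "$status"
}
```

For example, after sourcing the functions, run check_configs config/consumer.properties config/producer.properties. A non-zero exit status means at least one setting needs attention; the messages on standard error name the file and setting.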
-
-
Configure the topic-map.properties file for topic mapping.
This file maps source Kafka topics to destination DataHub Topics. Each line is one mapping rule: the left side of the equals sign (=) is the source topic name, and the right side is the destination DataHub Topic in Project.Topic format. Place each mapping rule on a separate line, for example:
topicname=testproject.testtopic
topicname1=testproject1.testtopic1
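A malformed mapping line is easy to miss. The following hypothetical shell function, validate_topic_map, is a minimal sketch that assumes the tool ignores blank lines and lines starting with #. It checks that every other line has a source topic, an equals sign, and a destination with exactly one dot separating Project and Topic. It does not check DataHub naming rules or whether the Project and Topic exist.

```shell
#!/usr/bin/env bash
# Hypothetical helper; not part of kafka_mirror_datahub.tgz.
# validate_topic_map FILE: check that each mapping line has the form
# source_topic=Project.Topic. Returns non-zero if any line is malformed.
# Assumption: blank lines and lines starting with "#" are ignored.
validate_topic_map() {
  local file="$1" lineno=0 count=0 bad=0 line src dst
  while IFS= read -r line || [ -n "$line" ]; do
    lineno=$((lineno + 1))
    case "$line" in
      '' | '#'*) continue ;;
    esac
    src="${line%%=*}"   # text before the first "="
    dst="${line#*=}"    # text after the first "="
    if [ "$src" = "$line" ] || [ -z "$src" ]; then
      echo "line $lineno: missing source topic or '=': $line" >&2
      bad=1
      continue
    fi
    case "$dst" in
      *.*.* | .* | *. | *=*)
        echo "line $lineno: destination must be Project.Topic: $line" >&2
        bad=1 ;;
      *.*)
        count=$((count + 1)) ;;
      *)
        echo "line $lineno: destination must be Project.Topic: $line" >&2
        bad=1 ;;
    esac
  done < "$file"
  echo "$count valid mapping(s)"
  return "$bad"
}
```

For example, validate_topic_map config/topic-map.properties prints the number of valid mappings and reports each malformed line with its line number.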
-
Configure the log4j.properties file for logging.
-
Create a log4j.properties file.
-
Use the following template for your configuration:
log4j.rootLogger=INFO, stdout, file
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d{yyyy-MM-dd HH:mm:ss}] [%p] %m (%c:%L)%n
log4j.appender.file=org.apache.log4j.DailyRollingFileAppender
log4j.appender.file.DatePattern='.'yyyy-MM-dd
log4j.appender.file.File=/opt/logs/mm1.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=[%d{yyyy-MM-dd HH:mm:ss}] [%p] %m (%c:%L)%n
log4j.logger.kafka=INFO
log4j.logger.org.apache.kafka=INFO
log4j.logger.kafka.tools.MirrorMaker=INFO
log4j.logger.org.apache.zookeeper=WARN
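The file appender in this template writes to /opt/logs/mm1.log. If the account that runs MirrorMaker cannot create or write to that directory, file logging fails, and because the migration command discards console output, the failure is easy to miss. The following hypothetical shell function, ensure_log_dir, is a sketch that reads log4j.appender.file.File from the given file, creates its parent directory with mkdir -p, and checks that the directory is writable.

```shell
#!/usr/bin/env bash
# Hypothetical helper; not part of kafka_mirror_datahub.tgz.
# ensure_log_dir LOG4J_FILE: create the directory of log4j.appender.file.File
# and print it. Returns non-zero if the setting is missing or not writable.
ensure_log_dir() {
  local conf="$1" log_file log_dir
  # Take the value of log4j.appender.file.File (the last occurrence wins).
  log_file=$(sed -n 's/^[[:space:]]*log4j\.appender\.file\.File[[:space:]]*=[[:space:]]*//p' "$conf" | tail -n 1)
  if [ -z "$log_file" ]; then
    echo "log4j.appender.file.File is not set in $conf" >&2
    return 1
  fi
  log_dir=$(dirname "$log_file")
  mkdir -p "$log_dir" || return 1
  if [ ! -w "$log_dir" ]; then
    echo "log directory $log_dir is not writable" >&2
    return 1
  fi
  echo "$log_dir"
}
```

For example, ensure_log_dir log4j.properties prints /opt/logs when run against the template above and the directory can be created.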
-
-
Run the migration script.
Run the following command from the root directory of your Kafka installation, and then check the log output.
Parameter descriptions
-
--consumer.config: The configuration file for the source Kafka cluster.
-
--producer.config: The configuration file for the destination DataHub service.
-
--whitelist: The source topic names. To specify multiple topics, separate them with a vertical bar (|), for example, topicA|topicB|topicC.
-
--topic.mapping.file: The configuration file for topic mapping.
-
KAFKA_LOG4J_OPTS: The JVM option that specifies the log configuration file, in the -Dlog4j.configuration=file:<path to log4j.properties> format. Set this variable before nohup so that it is passed to the script.
Replace /path/to/log4j.properties with the absolute path of the log4j.properties file that you created.
KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/path/to/log4j.properties" nohup bin/kafka-mirror-maker.sh --consumer.config config/consumer.properties --producer.config config/producer.properties --whitelist "mirrortest" --topic.mapping.file /opt/kafka_2.12-3.7.2/config/topic-map.properties ... > /dev/null 2>&1 &
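If the source topics are already listed in topic-map.properties, you can derive the --whitelist value from that file instead of typing it by hand. The following shell sketch is hypothetical: whitelist_from_topic_map joins the left-hand side of each mapping line with |, and start_mirror_maker runs the command with only the four options described above, so add any other options your deployment requires. It assumes absolute paths for the Kafka installation directory, the log4j.properties file, and the topic-map file, and it assumes the configuration files are in the config directory of the Kafka installation.

```shell
#!/usr/bin/env bash
# Hypothetical launcher; not part of kafka_mirror_datahub.tgz.

# whitelist_from_topic_map FILE: join the source topics (text before the
# first "=") of a topic-map file with "|" for use with --whitelist.
whitelist_from_topic_map() {
  awk -F '=' '
    /^[ \t]*#/ || /^[ \t]*$/ { next }   # skip comments and blank lines
    {
      name = $1
      gsub(/^[ \t]+|[ \t]+$/, "", name)
      printf "%s%s", sep, name
      sep = "|"
    }
    END { print "" }
  ' "$1"
}

# start_mirror_maker KAFKA_HOME LOG4J_FILE TOPIC_MAP_FILE (absolute paths)
start_mirror_maker() {
  local kafka_home="$1" log4j_file="$2" topic_map="$3" whitelist
  whitelist="$(whitelist_from_topic_map "$topic_map")"
  if [ -z "$whitelist" ]; then
    echo "no topic mappings found in $topic_map" >&2
    return 1
  fi
  # The variable assignment comes before nohup so the script inherits it.
  KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:${log4j_file}" \
    nohup "$kafka_home/bin/kafka-mirror-maker.sh" \
      --consumer.config "$kafka_home/config/consumer.properties" \
      --producer.config "$kafka_home/config/producer.properties" \
      --whitelist "$whitelist" \
      --topic.mapping.file "$topic_map" \
      > /dev/null 2>&1 &
  echo "MirrorMaker started with PID $!"
}
```

With the sample mappings topicname=testproject.testtopic and topicname1=testproject1.testtopic1, whitelist_from_topic_map returns topicname|topicname1. Deriving the whitelist from the mapping file keeps the two lists from drifting apart.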
-
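Check the log file for errors. Because the command discards console output, check the file set by log4j.appender.file.File in log4j.properties, for example, /opt/logs/mm1.log. The following output indicates a successful startup: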
-
[2025-08-06 17:27:41] [INFO] Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$:31)
[2025-08-06 17:27:41] [INFO] Starting mirror maker (kafka.tools.MirrorMaker$:62)
[2025-08-06 17:27:41] [INFO] Loaded topic mappings: mirrortest -> test_suyang.mirror (kafka.tools.MirrorMaker$:62)
[2025-08-06 17:27:41] [INFO] ProducerConfig values:
	acks = -1
	batch.size = 16384
	bootstrap.servers = [xxx]
	buffer.memory = 33554432
	client.dns.lookup = use_all_dns_ips
	client.id = producer-1
	compression.type = lz4
	connections.max.idle.ms = 540000
	delivery.timeout.ms = 2147483647
	enable.idempotence = false
	interceptor.classes = []
	internal.auto.downgrade.txn.commit = false
	key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
	linger.ms = 0
	max.block.ms = 9223372036854775807
	max.in.flight.requests.per.connection = 1
	max.request.size = 1048576
	metadata.max.age.ms = 300000
	metadata.max.idle.ms = 300000
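Checking the log can be scripted as well. The following hypothetical shell function, check_startup_log, is a sketch based on the sample output above. It fails if any [ERROR] or [FATAL] line is present (the ConversionPattern prints the level as [%p]) or if the Starting mirror maker message has not been written yet; otherwise it prints the Loaded topic mappings line. It defaults to /opt/logs/mm1.log, the path used in the sample log4j.properties.

```shell
#!/usr/bin/env bash
# Hypothetical helper; not part of kafka_mirror_datahub.tgz.
# check_startup_log [LOG_FILE]: succeed if MirrorMaker logged its startup
# message and no ERROR or FATAL lines.
check_startup_log() {
  local log="${1:-/opt/logs/mm1.log}"
  if [ ! -r "$log" ]; then
    echo "cannot read $log" >&2
    return 1
  fi
  # Print any ERROR/FATAL lines to stderr and fail.
  if grep -E '\[(ERROR|FATAL)\]' "$log" >&2; then
    echo "errors found in $log" >&2
    return 1
  fi
  if ! grep -qF 'Starting mirror maker' "$log"; then
    echo "no startup message in $log yet" >&2
    return 1
  fi
  # Show which topic mappings were loaded (informational only).
  grep -F 'Loaded topic mappings' "$log"
  return 0
}
```

Run check_startup_log shortly after starting the command; if it reports that no startup message exists yet, wait a few seconds and run it again.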
-
-
Log in to the DataHub console to verify that data has been written to DataHub.
On the Data Bus page, navigate to the destination Topic within the destination Project, such as kafkatest/test. On the Shard List tab, confirm that the shard status is ACTIVE and that a recent data timestamp is displayed. Click Sample in the Actions column. In the panel that appears, select a Shard ID, specify the number of records to retrieve, and click Sample. In the data preview table, verify that the data was successfully written to DataHub.