All Products
Search
Document Center

E-MapReduce:Data synchronization

Last Updated:Mar 26, 2026

Change Data Capture (CDC) keeps a data warehouse or data lake in sync with an upstream business database. When records are inserted, updated, or deleted in the source database, CDC detects those changes and propagates them downstream. Unlike the database field, where mature CDC tools and standards are widely available, the big data field has few established CDC practices or technical implementations — you typically need to compose existing engines to build your own CDC pipeline.

Choose a synchronization method

Two CDC methods are available on E-MapReduce (EMR). Choose based on your latency and deletion requirements.

Method How it works Supports deletions Latency When to use
Batch update Periodically exports incremental rows by timestamp or auto-incremented ID, then merges them into Delta Lake using MERGE No Minutes to hours Append-heavy workloads where deletes are not needed and near-real-time sync is not required
Real-time synchronization Streams database binary logs (binlog) through Kafka into Spark, then replays INSERT, UPDATE, and DELETE operations against a Delta table Yes Seconds Any workload that requires delete propagation or low-latency replication
The batch update method requires a primary key (or composite primary key) on the table for the MERGE operation, and a timestamp or auto-incremented field (such as a modified column) to identify incremental rows. Without such a field, full-table exports are needed for each sync cycle.

Operational trade-offs

Before choosing a method, consider these operational factors:

Factor Batch update Real-time synchronization
Small-file problem Not applicable Each micro-batch commit creates small files; requires compaction strategy
Compaction risk Not applicable Compaction can conflict with the active write stream if intervals are too frequent
Infrastructure Sqoop, HDFS ApsaraDB RDS for MySQL, Data Transmission Service (DTS), Kafka, Spark Structured Streaming
Code complexity Low High

Batch update

This example uses Sqoop to export incremental rows from MySQL and MERGE to apply them to a Delta table.

Prerequisites

Before you begin, ensure that you have:

  • A MySQL database with a table that has a timestamp or auto-incremented field to track changes

  • An EMR DataLake cluster with access to Hadoop Distributed File System (HDFS)

  • Sqoop installed on the cluster

Set up the source table

  1. Create the sales table in MySQL and insert initial data.

    CREATE TABLE sales(id LONG, date DATE, name VARCHAR(32), sales DOUBLE, modified DATETIME);
    
    INSERT INTO sales VALUES
      (1, '2019-11-11', 'Robert', 323.00, '2019-11-11 12:00:05'),
      (2, '2019-11-11', 'Lee', 500.00, '2019-11-11 16:11:46'),
      (3, '2019-11-12', 'Robert', 136.00, '2019-11-12 10:23:54'),
      (4, '2019-11-13', 'Lee', 211.00, '2019-11-13 11:33:27');

    The modified field marks when each row was last changed. Sqoop uses this field to identify incremental updates.

  2. Export the full table to HDFS.

    sqoop import \
      --connect jdbc:mysql://emr-header-1:3306/test \
      --username root \
      --password EMRroot1234 \
      -table sales \
      -m1 \
      --target-dir /tmp/cdc/staging_sales

Create the initial Delta table

  1. Start the streaming-sql client.

    streaming-sql
  2. Create a temporary external table over the HDFS export, then create the Delta table from it.

    -- LOAD DATA INPATH does not work with Delta tables; use a temporary external table instead.
    CREATE TABLE staging_sales (id LONG, date STRING, name STRING, sales DOUBLE, modified STRING)
    USING csv LOCATION '/tmp/cdc/staging_sales/';
    
    CREATE TABLE sales USING delta LOCATION '/user/hive/warehouse/test.db/test'
    SELECT * FROM staging_sales;
    
    -- Remove the temporary table and its HDFS directory.
    DROP TABLE staging_sales;
    hdfs dfs -rm -r -skipTrash /tmp/cdc/staging_sales/

Apply incremental updates

After the source database changes, export only the updated rows and merge them into the Delta table.

  1. Export rows modified after the last sync. Replace 2019-11-20 00:00:00 with the actual last-sync timestamp.

    Sqoop cannot capture deleted rows. To synchronize deletes, use real-time synchronization instead.
    sqoop import \
      --connect jdbc:mysql://emr-header-1:3306/test \
      --username root \
      --password EMRroot1234 \
      -table sales \
      -m1 \
      --target-dir /tmp/cdc/staging_sales \
      --incremental lastmodified \
      --check-column modified \
      --last-value "2019-11-20 00:00:00"
  2. Create a temporary table over the new export and merge it into the Delta table.

    CREATE TABLE staging_sales (id LONG, date STRING, name STRING, sales DOUBLE, modified STRING)
    USING csv LOCATION '/tmp/cdc/staging_sales/';
    
    MERGE INTO sales AS target
    USING staging_sales AS source
    ON target.id = source.id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *;

Real-time synchronization

This example streams binlog events from ApsaraDB RDS for MySQL through Data Transmission Service (DTS) and Kafka, then uses Spark Structured Streaming and the Delta merge API to replay INSERT, UPDATE, and DELETE operations in real time.

The pipeline has four stages:

  1. Source: ApsaraDB RDS for MySQL emits binlog events.

  2. Transport: DTS reads the binlog and publishes events to a Kafka topic in Avro format.

  3. Processing: Spark reads the Kafka topic, decodes events using the dts_binlog_parser user-defined function (UDF), and applies changes to a Delta table via the merge API.

  4. Sink: A Delta table reflects the latest state of the source database.

cdc_delta
This example uses DTS to export binary logs to Kafka. You can also use open-source Maxwell or Canal to export binary logs. If the binary logs of the source database are not in the standard format, you may need to write a UDF to process them. For example, the format of the binary logs provided by Simple Log Service differs from that of the binary logs provided by ApsaraDB RDS for MySQL.

Prerequisites

Before you begin, ensure that you have:

  • An ApsaraDB RDS for MySQL instance with binlog enabled, with users, databases, and permissions configured

  • An EMR DataLake cluster with SSH access (Log on to a cluster)

  • An EMR Kafka cluster (or an existing Kafka cluster)

  • DTS activated, with a synchronization task configured to publish the source table's binlog to a Kafka topic

Binlog event schema

DTS publishes binlog events to Kafka in Avro format. EMR provides the dts_binlog_parser UDF to decode each message. After decoding, each event contains the following fields.

Field Type Description
recordID Long Unique record identifier
source JSON string Source database metadata, e.g. {"sourceType": "MySQL", "version": "0.0.0.0"}
dbTable String Database and table name, e.g. delta_cdc.sales
recordType String Change type: INIT, INSERT, UPDATE, or DELETE
recordTimestamp Timestamp Timestamp of the change
extraTags JSON string Additional metadata
fields JSON array Column names, e.g. ["id","date","name","sales"]
beforeImages JSON string Column values before the change (empty for INSERT and INIT)
afterImages JSON string Column values after the change (empty for DELETE)

The recordType field drives the merge logic:

  • INIT: initial data load. Write these rows to the Delta table using append mode before starting the streaming job.

  • INSERT: insert the afterImages values as a new row.

  • UPDATE: update the matching row using afterImages; match on beforeImages.id to handle ID changes.

  • DELETE: remove the row identified by beforeImages.id.

Set up the source table and Kafka topic

  1. Create the sales table in the ApsaraDB RDS console and insert initial data.

    CREATE TABLE `sales` (
      `id` bigint(20) NOT NULL,
      `date` date DEFAULT NULL,
      `name` varchar(32) DEFAULT NULL,
      `sales` double DEFAULT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    INSERT INTO sales VALUES
      (1, '2019-11-11', 'Robert', 323.00),
      (2, '2019-11-11', 'Lee', 500.00),
      (3, '2019-11-12', 'Robert', 136.00),
      (4, '2019-11-13', 'Lee', 211.00);

    add_data

  2. Create a Kafka topic named sales in the EMR Kafka cluster.

    kafka-topics.sh --create \
      --bootstrap-server core-1-1:9092 \
      --partitions 1 \
      --replication-factor 1 \
      --topic sales
  3. In the DTS console, create a synchronization task. Set Database Type to MySQL in the source section and Datasource Type to Kafka in the destination section. Configure the task to replicate the sales table's binlog to the sales Kafka topic.

Write and deploy the Spark Streaming job

The job has two phases:

  • Initial load: reads INIT records from Kafka and writes them to the Delta table.

  • Incremental streaming: reads INSERT, UPDATE, and DELETE records and replays them against the Delta table using the merge API.

Choose the Scala or SQL approach based on your preference.

  • Scala

    1. Write the Spark code.

      import io.delta.tables._ import org.apache.spark.internal.Logging import org.apache.spark.sql.{AnalysisException, SparkSession} import org.apache.spark.sql.expressions.Window import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{DataTypes, StructField} val schema = DataTypes.createStructType(Array[StructField]( DataTypes.createStructField("id", DataTypes.StringType, false), DataTypes.createStructField("date", DataTypes.StringType, true), DataTypes.createStructField("name", DataTypes.StringType, true), DataTypes.createStructField("sales", DataTypes.StringType, true) )) // Phase 1: load INIT records into the Delta table. def initDeltaTable(): Unit = { spark.read .format("kafka") .option("kafka.bootstrap.servers", "192.168.XX.XX:9092") .option("subscribe", "sales") .option("failOnDataLoss", value = false) .load() .createTempView("initData") // Decode Avro-encoded DTS events using the dts_binlog_parser UDF. val dataBatch = spark.sql( """ |SELECT dts_binlog_parser(value) |AS (recordID, source, dbTable, recordType, recordTimestamp, extraTags, fields, beforeImages, afterImages) |FROM initData """.stripMargin) // Write only INIT records as the initial Delta table state. dataBatch.select(from_json(col("afterImages").cast("string"), schema).as("jsonData")) .where("recordType = 'INIT'") .select( col("jsonData.id").cast("long").as("id"), col("jsonData.date").as("date"), col("jsonData.name").as("name"), col("jsonData.sales").cast("decimal(7,2)")).as("sales") .write.format("delta").mode("append").save("/delta/sales") } try { DeltaTable.forPath("/delta/sales") } catch { case e: AnalysisException if e.getMessage().contains("is not a Delta table") => initDeltaTable() } // Phase 2: stream incremental changes and merge them into the Delta table. spark.readStream .format("kafka") .option("kafka.bootstrap.servers", "192.168.XX.XX:9092") .option("subscribe", "sales") .option("startingOffsets", "earliest") .option("maxOffsetsPerTrigger", 1000) .option("failOnDataLoss", value = false) .load() .createTempView("incremental") // Decode Avro-encoded DTS events using the dts_binlog_parser UDF. val dataStream = spark.sql( """ |SELECT dts_binlog_parser(value) |AS (recordID, source, dbTable, recordType, recordTimestamp, extraTags, fields, beforeImages, afterImages) |FROM incremental """.stripMargin) val task = dataStream.writeStream .option("checkpointLocation", "/delta/sales_checkpoint") .foreachBatch( (ops, id) => { // For each micro-batch, keep only the latest change per record. // Partition by the pre-change ID to handle UPDATE and DELETE correctly. // This also handles out-of-order events: if two updates arrive for the // same record, the one with the higher recordId wins. val windowSpec = Window .partitionBy(coalesce(col("before_id"), col("id"))) .orderBy(col("recordId").desc) val mergeDf = ops .select( col("recordId"), col("recordType"), from_json(col("beforeImages").cast("string"), schema).as("before"), from_json(col("afterImages").cast("string"), schema).as("after")) .where("recordType != 'INIT'") .select( col("recordId"), col("recordType"), // Use beforeImages.id as the lookup key for UPDATE and DELETE. when(col("recordType") === "INSERT", col("after.id")).otherwise(col("before.id")).cast("long").as("before_id"), when(col("recordType") === "DELETE", col("before.id")).otherwise(col("after.id")).cast("long").as("id"), when(col("recordType") === "DELETE", col("before.date")).otherwise(col("after.date")).as("date"), when(col("recordType") === "DELETE", col("before.name")).otherwise(col("after.name")).as("name"), when(col("recordType") === "DELETE", col("before.sales")).otherwise(col("after.sales")).cast("decimal(7,2)").as("sales") ) .select( dense_rank().over(windowSpec).as("rk"), col("recordType"), col("before_id"), col("id"), col("date"), col("name"), col("sales") ) .where("rk = 1") // Merge the micro-batch into the Delta table. val mergeCond = "target.id = source.before_id" DeltaTable.forPath(spark, "/delta/sales").as("target") .merge(mergeDf.as("source"), mergeCond) .whenMatched("source.recordType='UPDATE'") .updateExpr(Map( "id" -> "source.id", "date" -> "source.date", "name" -> "source.name", "sales" -> "source.sales")) .whenMatched("source.recordType='DELETE'") .delete() .whenNotMatched("source.recordType='INSERT' OR source.recordType='UPDATE'") .insertExpr(Map( "id" -> "source.id", "date" -> "source.date", "name" -> "source.name", "sales" -> "source.sales")) .execute() } ).start() task.awaitTermination()
    2. Package and deploy the code.

      1. After debugging locally, package the JAR.

        mvn clean install
      2. Log on to the DataLake cluster via SSH and upload the JAR to the root directory.

    3. Submit the Spark Streaming job.

      spark-submit \ --master yarn \ --deploy-mode cluster \ --driver-memory 1g \ --executor-cores 2 \ --executor-memory 3g \ --num-executors 1 \ --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.2 \ --class com.aliyun.delta.StreamToDelta \ delta-demo-1.0.jar
      Note: This example uses delta-demo-1.0.jar and the class com.aliyun.delta.StreamToDelta. Replace both with the actual JAR name and class name for your implementation.
  • SQL

    1. Start the streaming-sql client.

      streaming-sql --master local
    2. Run the following SQL statements to load initial data and start the streaming merge job.

      -- Create a Kafka source table. CREATE TABLE kafka_sales USING kafka OPTIONS( kafka.bootstrap.servers='192.168.XX.XX:9092', subscribe='sales' ); -- Create the Delta sink table. CREATE TABLE delta_sales(id long, date string, name string, sales decimal(7, 2)) USING delta LOCATION '/delta/sales'; -- Load INIT records as the initial data. INSERT INTO delta_sales SELECT CAST(jsonData.id AS LONG), jsonData.date, jsonData.name, jsonData.sales FROM ( SELECT from_json(CAST(afterImages as STRING), 'id STRING, date DATE, name STRING, sales STRING') as jsonData FROM ( SELECT dts_binlog_parser(value) AS (recordID, source, dbTable, recordType, recordTimestamp, extraTags, fields, beforeImages, afterImages) FROM kafka_sales ) binlog WHERE recordType='INIT' ) binlog_wo_init; -- Define the incremental stream over the Kafka source. CREATE SCAN incremental on kafka_sales USING STREAM OPTIONS( startingOffsets='earliest', maxOffsetsPerTrigger='1000', failOnDataLoss=false ); -- Start the streaming merge job. CREATE STREAM job OPTIONS( checkpointLocation='/delta/sales_checkpoint' ) MERGE INTO delta_sales as target USING ( SELECT recordId, recordType, before_id, id, date, name, sales FROM ( SELECT recordId, recordType, CASE WHEN recordType = "INSERT" then after.id else before.id end as before_id, CASE WHEN recordType = "DELETE" then CAST(before.id as LONG) else CAST(after.id as LONG) end as id, CASE WHEN recordType = "DELETE" then before.date else after.date end as date, CASE WHEN recordType = "DELETE" then before.name else after.name end as name, CASE WHEN recordType = "DELETE" then CAST(before.sales as DECIMAL(7, 2)) else CAST(after.sales as DECIMAL(7, 2)) end as sales, dense_rank() OVER (PARTITION BY coalesce(before.id, after.id) ORDER BY recordId DESC) as rank FROM ( SELECT recordId, recordType, from_json(CAST(beforeImages as STRING), 'id STRING, date STRING, name STRING, sales STRING') as before, from_json(CAST(afterImages as STRING), 'id STRING, date STRING, name STRING, sales STRING') as after FROM ( SELECT dts_binlog_parser(value) AS (recordID, source, dbTable, recordType, recordTimestamp, extraTags, fields, beforeImages, afterImages) FROM incremental ) binlog WHERE recordType != 'INIT' ) binlog_wo_init ) binlog_extract WHERE rank=1 ) as source ON target.id = source.before_id WHEN MATCHED AND source.recordType='UPDATE' THEN UPDATE SET id=source.id, date=source.date, name=source.name, sales=source.sales WHEN MATCHED AND source.recordType='DELETE' THEN DELETE WHEN NOT MATCHED AND (source.recordType='INSERT' OR source.recordType='UPDATE') THEN INSERT (id, date, name, sales) VALUES (source.id, source.date, source.name, source.sales);

Verify the pipeline

  1. Start a spark-shell session and read the Delta table.

    spark-shell --master local
    spark.read.format("delta").load("/delta/sales").show
    
    +---+----------+------+------+
    | id|      date|  name| sales|
    +---+----------+------+------+
    |  1|2019-11-11|Robert|323.00|
    |  2|2019-11-11|   Lee|500.00|
    |  3|2019-11-12|Robert|136.00|
    |  4|2019-11-13|   Lee|211.00|
    +---+----------+------+------+
  2. Run the following statements in the ApsaraDB RDS console to make changes to the source table.

    Row 2 is updated twice. The streaming job uses a dense_rank() window to keep only the latest change per record within each micro-batch, so the final value of sales for row 2 reflects the second update (175).
    DELETE FROM sales WHERE id = 1;
    UPDATE sales SET sales = 150 WHERE id = 2;
    UPDATE sales SET sales = 175 WHERE id = 2;
    INSERT INTO sales VALUES (5, '2019-11-14', 'Robert', 233);

    delta_data

  3. Read the Delta table again to confirm the changes have been applied.

    spark.read.format("delta").load("/delta/sales").show
    
    +---+----------+------+------+
    | id|      date|  name| sales|
    +---+----------+------+------+
    |  5|2019-11-14|Robert|233.00|
    |  3|2019-11-12|Robert|136.00|
    |  4|2019-11-13|   Lee|211.00|
    |  2|2019-11-11|   Lee|175.00|
    +---+----------+------+------+

    Row 1 is deleted, row 2 reflects the latest update, and row 5 is inserted.

Best practices

Manage small files in real-time mode

Real-time synchronization writes many small files to the Delta table as each micro-batch commits. Over time, this degrades query performance. Use one or both of the following strategies.

Partition the table by date or another high-cardinality column. Most writes land in the current partition, so you can compact historical partitions without risking transaction conflicts with the active write stream. Partition-pruned queries are also significantly faster than full-table scans.

Compact periodically within the streaming job. Run a compaction (for example, every 10 micro-batches) to merge small files. Keep compaction intervals infrequent enough that the operation completes before the next micro-batch begins; otherwise, in-flight writes may be delayed.

Use batch mode when low latency is not required

If data does not need to be synchronized in real time, collect binlog events to Object Storage Service (OSS) on a schedule rather than processing them in a continuous stream. Use DTS to publish events to Kafka, then use kafka-connect-oss to archive them to OSS. A Spark batch job can then read all accumulated events from OSS and apply them to Delta Lake in a single merge operation. This eliminates the small-file problem and removes the risk of compaction failures caused by transaction conflicts with the streaming writer.

cdc_delta
The components framed in dotted lines in the diagram can be replaced with equivalent tools.

Appendix: Inspect binlog events in Kafka

Use the dts_binlog_parser UDF to decode Avro-encoded DTS events and inspect the raw binlog data.

  • Scala

    1. Start a spark-shell session.

      spark-shell --master local
    2. Read and decode the Kafka topic.

      spark.read .format("kafka") .option("kafka.bootstrap.servers", "192.168.XX.XX:9092") .option("subscribe", "sales") .option("maxOffsetsPerTrigger", 1000) .load() .createTempView("kafkaData") val kafkaDF = spark.sql("SELECT dts_binlog_parser(value) FROM kafkaData") kafkaDF.show(false)
  • SQL

    1. Start the streaming-sql client.

      streaming-sql --master local
    2. Create a table and query the decoded events.

      CREATE TABLE kafkaData USING kafka OPTIONS( kafka.bootstrap.servers='192.168.XX.XX:9092', subscribe='sales' ); SELECT dts_binlog_parser(value) FROM kafkaData;

      The output looks similar to:

      +--------+---------------------------------------------+---------------+----------+-------------------+---------+----------------------------+------------+--------------------------------------------------------------+ |recordid|source                                       |dbtable        |recordtype|recordtimestamp    |extratags|fields                      |beforeimages|afterimages                                                   | +--------+---------------------------------------------+---------------+----------+-------------------+---------+----------------------------+------------+--------------------------------------------------------------+ |1       |{"sourceType": "MySQL", "version": "0.0.0.0"}|delta_cdc.sales|INIT      |1970-01-01 08:00:00|{}       |["id","date","name","sales"]|{}          |{"sales":"323.0","date":"2019-11-11","name":"Robert","id":"1"}| |2       |{"sourceType": "MySQL", "version": "0.0.0.0"}|delta_cdc.sales|INIT      |1970-01-01 08:00:00|{}       |["id","date","name","sales"]|{}          |{"sales":"500.0","date":"2019-11-11","name":"Lee","id":"2"}   | |3       |{"sourceType": "MySQL", "version": "0.0.0.0"}|delta_cdc.sales|INIT      |1970-01-01 08:00:00|{}       |["id","date","name","sales"]|{}          |{"sales":"136.0","date":"2019-11-12","name":"Robert","id":"3"}| |4       |{"sourceType": "MySQL", "version": "0.0.0.0"}|delta_cdc.sales|INIT      |1970-01-01 08:00:00|{}       |["id","date","name","sales"]|{}          |{"sales":"211.0","date":"2019-11-13","name":"Lee","id":"4"}   | +--------+---------------------------------------------+---------------+----------+-------------------+---------+----------------------------+------------+--------------------------------------------------------------+