
AnalyticDB:Synchronize Kafka data using the APS data synchronization feature (Recommended)

Last Updated: Mar 28, 2026

The AnalyticDB Pipeline Service (APS) data synchronization feature lets you ingest ApsaraMQ for Kafka messages into AnalyticDB for MySQL in near real time, starting from any offset you choose. The feature supports near real-time data output, full historical data archiving, and elastic analytics. After a synchronization task starts, data is committed every 5 minutes by default, so the first batch of ingested data is queryable after approximately 5 minutes.


Limitations

  • Only JSON-formatted Kafka messages are supported. Other formats cause synchronization errors.

  • Kafka table schema changes are not propagated to AnalyticDB for MySQL automatically. You must make DDL changes manually.

  • Sample Kafka data larger than 8 KB is truncated by the Kafka API, which prevents the system from parsing the sample and auto-generating field mappings.

  • Data written to AnalyticDB for MySQL is not visible until a Commit operation runs. The default Commit interval is 5 minutes, so wait at least 5 minutes after starting a task before querying the first batch of data.

  • OSS paths across synchronization tasks cannot share a prefix. For example, oss://testBucketName/test/sls1/ and oss://testBucketName/test/ conflict and will cause data to be overwritten.

  • If a synchronization task fails and the Kafka topic data has expired, the cleared data cannot be recovered when you restart the task. To reduce this risk, increase the topic's data retention period. If a task fails, contact technical support promptly.
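The OSS prefix rule above can be pre-checked before you create a task. The following helper is a minimal illustrative sketch (the function name is ours, not part of any Alibaba Cloud SDK):

```python
def oss_paths_conflict(path_a: str, path_b: str) -> bool:
    """Return True if two OSS paths share a prefix relationship,
    i.e. one task's path sits inside (or equals) the other's."""
    a = path_a.rstrip("/") + "/"
    b = path_b.rstrip("/") + "/"
    return a.startswith(b) or b.startswith(a)

# The conflicting pair from the limitations list:
print(oss_paths_conflict("oss://testBucketName/test/sls1/",
                         "oss://testBucketName/test/"))  # True
```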

Prerequisites

Before you begin, make sure you have:

  • An AnalyticDB for MySQL Enterprise Edition, Basic Edition, or Data Lakehouse Edition cluster

  • A job resource group

  • A database account for the cluster (see the table below)

  • An ApsaraMQ for Kafka instance in the same region as the AnalyticDB for MySQL cluster

  • A Kafka topic with messages already sent to it (see Quick start for ApsaraMQ for Kafka)

Database account requirements by account type:

Account type | Required accounts | Additional steps
Alibaba Cloud account | Privileged account only | None
Resource Access Management (RAM) user | Privileged account + standard account | Associate the standard account with the RAM user

Billing

Using the data synchronization feature incurs fees.

How it works

  1. Add a Kafka data source to identify the ApsaraMQ for Kafka instance and topic to read from.

  2. Create a data link (synchronization job) that maps Kafka messages to an AnalyticDB for MySQL table, configures JSON parsing, partition keys, and the consumer offset.

  3. Start the task. The task begins consuming Kafka data from the selected offset and writes it to the destination table.

  4. Query the ingested data using Spark SQL.

Step 1: Create a data source

Skip this step if you have already added a Kafka data source. Go directly to Step 2: Create a data link.
  1. Log on to the AnalyticDB for MySQL console. In the upper-left corner, select a region. In the left-side navigation pane, click Clusters, then click the cluster ID.

  2. In the left-side navigation pane, choose Data Ingestion > Data Sources.

  3. Click Create Data Source.

  4. On the Create Data Source page, configure the following parameters:

    Parameter | Description
    Data Source Type | Select Kafka.
    Data Source Name | Auto-generated from the source type and the current time. Change it as needed.
    Data Source Description | Enter a description, such as the business scenario or data scope.
    Deployment Mode | Only Alibaba Cloud Instance is supported.
    Kafka Instance | The Kafka instance ID. Find it on the Instances page of the ApsaraMQ for Kafka console.
    Kafka Topic | The topic name. Find it on the Topics page of the destination instance in the ApsaraMQ for Kafka console.
    Message Data Format | Only JSON is supported.
  5. Click Create.

Step 2: Create a data link

  1. In the left-side navigation pane, click Simple Log Service/Kafka Data Synchronization.

  2. Click Create Synchronization Job.

  3. On the Create Synchronization Job page, configure the three sections below.

Source and destination settings

Parameter | Description
Job Name | Auto-generated from the source type and the current time. Change it as needed.
Data Source | Select an existing Kafka data source or create a new one.
Destination Type | Choose where synchronized data is stored. See Choose a destination type below.
ADB Lake Storage | The lake storage for AnalyticDB for MySQL lake data. Select one from the drop-down list, or click Automatically Created to create one. Required when Destination Type is Data Lake - AnalyticDB Lake Storage.
OSS Path | The OSS storage path for AnalyticDB for MySQL lake data. Required when Destination Type is Data Lake - User OSS. Select an empty folder. The path cannot be changed after creation and must not share a prefix with any other synchronization task's OSS path.
Storage Format | PAIMON (available only when Destination Type is Data Lake - User OSS) or ICEBERG.

Choose a destination type

Destination type | When to use | Notes
Data Lake - AnalyticDB Lake Storage (recommended) | Standard setups where you want AnalyticDB for MySQL to manage storage | Enable the lake storage feature first. Only the ICEBERG format is supported.
Data Lake - User OSS | When you need to bring your own OSS bucket | Both the PAIMON and ICEBERG formats are supported.

Destination database and table settings

Parameter | Description
Database Name | The destination database in AnalyticDB for MySQL. A new database is created if one with this name does not exist. If one exists, data is synchronized to it. See Limits for naming conventions. If Storage Format is PAIMON, the database must meet the requirements listed below.
Table Name | The destination table in AnalyticDB for MySQL. A new table is created if one with this name does not exist. If a table with the same name already exists, the synchronization task fails. See Limits for naming conventions.
Sample Data | The latest data automatically retrieved from the Kafka topic. The data must be in JSON format.
Parsed JSON Layers | The number of nested JSON levels to parse. Valid values: 0 (no parsing), 1 (default), 2, 3, and 4. See JSON parsing levels and schema inference.
Schema Field Mapping | The schema inferred from the sample data after parsing. Adjust destination field names and types, or add and remove fields as needed.
Partition Key Settings | (Optional) A partition key for the destination table. Partition by log time or business logic to improve ingestion and query performance. If left empty, the table has no partitions.

PAIMON database requirements

If Storage Format is PAIMON, the destination database must meet all of the following conditions. If it does not, the synchronization task will fail.

  • It must be an external database created with CREATE EXTERNAL DATABASE <database_name>.

  • The DBPROPERTIES parameter must include catalog = paimon.

  • The DBPROPERTIES parameter must include adb.paimon.warehouse. Example: adb.paimon.warehouse=oss://testBucketName/aps/data.

  • The DBPROPERTIES parameter must include LOCATION, with a .db suffix appended to the database name in the path. Example: LOCATION=oss://testBucketName/aps/data/kafka_paimon_external_db.db/. The bucket directory in this path must already exist; if the directory is missing or the .db suffix is omitted, XIHE queries will fail.
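Taken together, the requirements above amount to a statement along the following lines. This is an illustrative sketch that reuses the placeholder bucket and database names from the examples above; verify the exact DBPROPERTIES syntax against your cluster version before running it.

```sql
-- Hypothetical example combining all four PAIMON requirements;
-- bucket, warehouse path, and database name are placeholders.
CREATE EXTERNAL DATABASE kafka_paimon_external_db
DBPROPERTIES (
    catalog = 'paimon',
    adb.paimon.warehouse = 'oss://testBucketName/aps/data',
    LOCATION = 'oss://testBucketName/aps/data/kafka_paimon_external_db.db/'
);
```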

Synchronization settings

Parameter | Description
Starting Consumer Offset for Incremental Synchronization | The point in Kafka from which the task begins consuming data. Earliest offset (begin_cursor): consume from the oldest available data. Latest offset (end_cursor): consume from the most recent data only. Custom offset: consume from the first message at or after a specific point in time that you select.
Job Resource Group | The job resource group in which the task runs.
ACUs for Incremental Synchronization | The number of ACUs allocated from the job resource group. Minimum: 2. Maximum: the remaining available ACUs in the resource group. A higher ACU count improves ingestion performance and task stability.
Advanced Settings | Custom configurations. Contact technical support to enable them.

ACU deduction example: If a job resource group has a maximum of 48 ACUs and an existing task already uses 8, the new task can use at most 40 ACUs.

  4. Click Submit.

Step 3: Start the data synchronization task

  1. On the Simple Log Service/Kafka Data Synchronization page, find the task you created and click Start in the Actions column.

  2. Click Search. The task has started successfully when its status changes to Running.

The first batch of data is queryable at least 5 minutes after the task starts, because data is committed in 5-minute intervals by default.

Step 4: Analyze the data

After data is synchronized, use Spark SQL to query it in AnalyticDB for MySQL. For more information, see Spark development editor and Offline Spark application development.

  1. In the left-side navigation pane, choose Job Development > Spark JAR Development.

  2. Enter your Spark SQL statements in the default template and click Run Now. The following is an example:

    -- Example of Spark SQL. Modify the content and run your Spark program.
    
    conf spark.driver.resourceSpec=medium;
    conf spark.executor.instances=2;
    conf spark.executor.resourceSpec=medium;
    conf spark.app.name=Spark SQL Test;
    conf spark.adb.connectors=oss;
    
    -- SQL statements
    show tables from lakehouse20220413156_adbTest;
  3. (Optional) On the Applications tab, click Logs in the Actions column to view the Spark SQL run logs.

Step 5 (Optional): Manage the data source

Go to Data Ingestion > Data Sources. The following operations are available in the Actions column:

Operation | Description
Create Job | Create a data synchronization or data migration task for this data source.
View | View the data source configuration.
Edit | Edit the data source name and description.
Delete | Delete the data source. If a synchronization or migration task exists for this data source, delete that task first on the Simple Log Service/Kafka Data Synchronization page.

JSON parsing levels and schema inference

The Parsed JSON Layers setting controls how many nested levels of a JSON message are expanded into separate destination fields.

Example message:

{
  "name": "zhangle",
  "age": 18,
  "device": {
    "os": {
      "test": "lag",
      "member": {
        "fa": "zhangsan",
        "mo": "limei"
      }
    },
    "brand": "none",
    "version": "11.4.2"
  }
}
Important

Periods (.) in field names are automatically replaced with underscores (_) in destination field names.

Level 0 — No parsing. The entire JSON message is output as a single field.

JSON field | Value | Destination field name
__value__ | {"name":"zhangle","age":18,"device":{...}} | __value__

Level 1 (default) — Top-level fields are expanded.

JSON field | Value | Destination field name
name | zhangle | name
age | 18 | age
device | {"os":{...},"brand":"none","version":"11.4.2"} | device

Level 2 — Two levels expanded. Non-nested fields are output directly; nested fields expand to their sub-fields.

JSON field | Value | Destination field name
name | zhangle | name
age | 18 | age
device.os | {"test":"lag","member":{...}} | device_os
device.brand | none | device_brand
device.version | 11.4.2 | device_version

Level 3

JSON field | Value | Destination field name
name | zhangle | name
age | 18 | age
device.os.test | lag | device_os_test
device.os.member | {"fa":"zhangsan","mo":"limei"} | device_os_member
device.brand | none | device_brand
device.version | 11.4.2 | device_version

Level 4

JSON field | Value | Destination field name
name | zhangle | name
age | 18 | age
device.os.test | lag | device_os_test
device.os.member.fa | zhangsan | device_os_member_fa
device.os.member.mo | limei | device_os_member_mo
device.brand | none | device_brand
device.version | 11.4.2 | device_version
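The level-by-level expansion above can be sketched in Python. This is a hypothetical illustration of the flattening rule, not the service's actual parser:

```python
import json

def flatten(message: dict, levels: int) -> dict:
    """Expand a JSON message into destination fields, mimicking the
    Parsed JSON Layers rule: nested keys are joined with underscores,
    and dicts deeper than `levels` are kept as JSON strings."""
    if levels == 0:
        # Level 0: the whole message becomes a single __value__ field.
        return {"__value__": json.dumps(message)}

    out = {}

    def walk(value, remaining, prefix):
        if isinstance(value, dict) and remaining > 0:
            for key, child in value.items():
                # Periods in field names are replaced with underscores.
                name = key.replace(".", "_")
                walk(child, remaining - 1, f"{prefix}_{name}" if prefix else name)
        else:
            # Depth exhausted: serialize any remaining dict as a string.
            out[prefix] = json.dumps(value) if isinstance(value, dict) else value

    walk(message, levels, "")
    return out

msg = {"name": "zhangle", "age": 18,
       "device": {"os": {"test": "lag",
                         "member": {"fa": "zhangsan", "mo": "limei"}},
                  "brand": "none", "version": "11.4.2"}}
```

For example, `flatten(msg, 1)` keeps `device` as one JSON-string field, while `flatten(msg, 4)` produces `device_os_member_fa` and `device_os_member_mo` as in the Level 4 table.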

What's next