
AnalyticDB: Synchronize Kafka data using the APS data synchronization feature (Recommended)

Last Updated: Dec 05, 2025

AnalyticDB for MySQL provides the AnalyticDB Pipeline Service (APS) data synchronization feature. You can use this feature to create a Kafka data link to ingest data from Kafka in real time starting from a specific offset. This feature supports near real-time data output, full historical data archiving, and elastic analytics. This topic describes how to add a Kafka data source, create and start a Kafka data link, and then analyze the data and manage the data source.

Prerequisites

Notes

  • Only Kafka data in JSON format can be synchronized.

  • Data in a Kafka topic is automatically cleared after its retention period elapses. If a data synchronization task fails and the topic data has expired, the cleared data cannot be retrieved when you restart the task, which can cause data loss. To prevent this, increase the retention period of the topic, and contact technical support promptly if a synchronization task fails.

  • If the sample Kafka data is larger than 8 KB, the Kafka API truncates the data. This causes the system to fail to parse the sample data into JSON format, and field mapping information cannot be automatically generated.

  • Changes to the source Kafka table schema do not automatically trigger DDL changes in AnalyticDB for MySQL.

  • Written data becomes visible only after a Commit operation is executed. Because an overly short Commit interval affects job stability and read and write performance, the data synchronization feature of AnalyticDB for MySQL uses a default Commit interval of 5 minutes. Therefore, after you create and start a data synchronization job for the first time, wait at least 5 minutes before you check the first batch of written data.

Billing

Using the AnalyticDB for MySQL data synchronization feature incurs the following fees.

Procedure

Create a data source

Note

If you have already added a Kafka data source, skip this step and proceed to Create a data link.

  1. Log on to the AnalyticDB for MySQL console. In the upper-left corner of the console, select a region. In the left-side navigation pane, click Clusters. Find the cluster that you want to manage and click the cluster ID.

  2. In the navigation pane on the left, choose Data Ingestion > Data Sources.

  3. In the upper-left corner, click Create Data Source.

  4. On the Create Data Source page, configure the following parameters.

    • Data Source Type: Select Kafka.

    • Data Source Name: The system automatically generates a name based on the data source type and the current time. You can change the name as needed.

    • Data Source Description: Enter a description for the data source, such as the data lakehouse scenario or business limits.

    • Deployment Mode: Only Alibaba Cloud Instance is supported.

    • Kafka Instance: The Kafka instance ID. Log on to the ApsaraMQ for Kafka console and view the instance ID on the Instances page.

    • Kafka Topic: The name of the topic created in Kafka. Log on to the ApsaraMQ for Kafka console and view the topic name on the Topics page of the destination instance.

    • Message Data Format: The data format of Kafka messages. Only JSON is supported.

  5. After you configure the parameters, click Create.

Create a data link

  1. In the navigation pane on the left, click Simple Log Service/Kafka Data Synchronization.

  2. In the upper-left corner, click Create Synchronization Job.

  3. On the Create Synchronization Job page, configure the Source and Destination Settings, Destination Database and Table Settings, and Synchronization Settings sections.

    • Source and Destination Settings parameters:

      • Job Name: The name of the data link. The system automatically generates a name based on the data source type and the current time. You can change the name as needed.

      • Data Source: Select an existing Kafka data source or create a new one.

      • Destination Type: Valid values:

        • Data Lake - User OSS.

        • Data Lake - AnalyticDB Lake Storage (Recommended).

          Important

          If you set Destination Type to Data Lake - AnalyticDB Lake Storage, you must enable the lake storage feature.

      • ADB Lake Storage: The name of the lake storage where the AnalyticDB for MySQL lake data is located. Select a destination lake storage from the drop-down list. If no lake storage has been created, click Automatically Created in the drop-down list to automatically create one.

        Important

        This parameter is required when you set Destination Type to Data Lake - AnalyticDB Lake Storage.

      • OSS Path: The storage path in OSS for the AnalyticDB for MySQL lake data.

        Important

        • This parameter is required when you set Destination Type to Data Lake - User OSS.

        • All buckets in the same region as the AnalyticDB for MySQL cluster are displayed, and you can select any of them. Plan the storage path carefully because it cannot be changed after creation.

        • Select an empty folder. The OSS path cannot have a prefix relationship with the OSS paths of other tasks. Otherwise, data may be overwritten. For example, the OSS paths oss://testBucketName/test/sls1/ and oss://testBucketName/test/ have a prefix relationship, which causes data to be overwritten during data synchronization.

      • Storage Format: The data storage format. Valid values:

        • PAIMON.

          Important

          This format is supported only when Destination Type is set to Data Lake - User OSS.

        • ICEBERG.

    • Destination Database and Table Settings parameters:

      • Database Name: The name of the destination database in AnalyticDB for MySQL. If a database with the same name does not exist, a new one is created. If a database with the same name exists, data is synchronized to the existing database. For more information about database naming conventions, see Limits.

        Important

        In the Source and Destination Settings section, if you set Storage Format to PAIMON, an existing database must meet the following conditions. Otherwise, the data synchronization task fails. For a sample statement, see the sketch after this procedure.

        • It must be an external database that is created with a CREATE EXTERNAL DATABASE <database_name> statement.

        • The `DBPROPERTIES` parameter in the `CREATE DATABASE` statement must include the catalog property, and the value of catalog must be paimon.

        • The `DBPROPERTIES` parameter in the `CREATE DATABASE` statement must include the adb.paimon.warehouse property. Example: adb.paimon.warehouse=oss://testBucketName/aps/data.

        • The `DBPROPERTIES` parameter in the `CREATE DATABASE` statement must include the LOCATION property, and you must append .db to the database name. Otherwise, XIHE queries fail. Example: LOCATION=oss://testBucketName/aps/data/kafka_paimon_external_db.db/. The bucket directory in the OSS path configured for LOCATION must exist. Otherwise, creating the database fails.

      • Table Name: The name of the destination table in AnalyticDB for MySQL. If a table with the same name does not exist in the database, a new one is created. If a table with the same name already exists, the data synchronization fails. For more information about table naming conventions, see Limits.

      • Sample Data: The latest data is automatically retrieved from the Kafka topic and used as sample data.

        Note

        The data in the Kafka topic must be in JSON format. If other data formats exist, an error occurs during data synchronization.

      • Parsed JSON Layers: The number of nested levels to parse in the JSON data. Valid values:

        • 0: No parsing.

        • 1 (default): Parse one level.

        • 2: Parse two levels.

        • 3: Parse three levels.

        • 4: Parse four levels.

        For more information about the JSON nested parsing policy, see Example of JSON parsing levels and schema inference.

      • Schema Field Mapping: Displays the schema information of the sample data after JSON parsing. You can adjust the destination field names and types, or add or delete fields as needed.

      • Partition Key Settings: The partition key for the destination table. We recommend that you configure partitions based on log time or business logic to ensure data ingestion and query performance. If you do not set a partition key, the destination table is not partitioned.

        You can format the destination partition key by using a time format or by specifying a partition field.

        • To partition by date and time, select a date-time field as the partition field. For the format handling method, select Time Formatting, and then select the source field format and the destination partition format. AnalyticDB for MySQL identifies the value of the partition field based on the source field format and converts it to the destination partition format. For example, if the source field is gmt_created with a value of 1711358834, the source field format is a second-level timestamp, and the destination partition format is yyyyMMdd, the data is written to partition 20240325.

        • To partition by field value, select Specify Partition Field as the format handling method.

    • Synchronization Settings parameters:

      • Starting Consumer Offset for Incremental Synchronization: The point in time from which the task starts to consume Kafka data. Valid values:

        • Earliest offset (begin_cursor): Consume data from the earliest point in time in the Kafka data.

        • Latest offset (end_cursor): Consume data from the latest point in time in the Kafka data.

        • Custom offset: Select any point in time. The system starts consuming from the first Kafka message that is at or after this time.

      • Job Resource Group: The Job resource group in which the task runs.

      • ACUs for Incremental Synchronization: The number of ACUs for the Job resource group. The minimum is 2 ACUs, and the maximum is the maximum amount of computing resources available in the Job resource group. We recommend that you specify a higher number of ACUs to improve data ingestion performance and task stability.

        Note

        A data synchronization task uses elastic resources of the Job resource group. Because data synchronization tasks occupy resources for a long time, the system deducts the resources used by the task from the resource group. For example, if a Job resource group has a maximum of 48 ACUs and you have already created a synchronization task that uses 8 ACUs, the maximum number of ACUs that you can select for another synchronization task in this resource group is 40.

      • Advanced Settings: Advanced configurations let you customize the synchronization task. To apply custom configurations, contact technical support.

  4. After you configure the parameters, click Submit.
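
The following statement is a minimal sketch of a PAIMON external database that satisfies the Database Name requirements described above. The database name, bucket, and warehouse path reuse the placeholder values from the examples in this topic, and the exact quoting of DBPROPERTIES keys and values may vary by cluster version, so verify the syntax before you run it.

    -- Sketch only: an external database that meets the PAIMON requirements listed above.
    -- kafka_paimon_external_db and testBucketName are placeholders from the examples in this topic.
    CREATE EXTERNAL DATABASE kafka_paimon_external_db
    DBPROPERTIES (
      catalog = 'paimon',                                                      -- catalog must be paimon
      adb.paimon.warehouse = 'oss://testBucketName/aps/data',                  -- warehouse path in OSS
      LOCATION = 'oss://testBucketName/aps/data/kafka_paimon_external_db.db/'  -- database name with .db appended
    );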

Start the data synchronization task

  1. On the Simple Log Service/Kafka Data Synchronization page, find the data synchronization task that you created and click Start in the Actions column.

  2. In the upper-left corner, click Search. The task has started successfully when its status changes to Running.

Data analytics

After the data is synchronized, you can use the Spark JAR development feature to analyze the data in AnalyticDB for MySQL. For more information about Spark development, see Spark development editor and Offline Spark application development.

  1. In the navigation pane on the left, choose Job Development > Spark JAR Development.

  2. Enter the sample statements in the default template and click Run Now.

    -- Here is just an example of SparkSQL. Modify the content and run your spark program.
    
    conf spark.driver.resourceSpec=medium;
    conf spark.executor.instances=2;
    conf spark.executor.resourceSpec=medium;
    conf spark.app.name=Spark SQL Test;
    conf spark.adb.connectors=oss;
    
    -- Here are your sql statements
    show tables from lakehouse20220413156_adbTest;
  3. Optional: On the Applications tab, click Logs in the Actions column to view the run logs of the Spark SQL job.
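
After the first Commit interval elapses (at least 5 minutes after the synchronization job starts), you can also query the synchronized data directly from the Spark SQL editor. The following sketch reuses the resource configuration of the default template; adb_demo_db and kafka_demo_table are hypothetical names for the destination database and table that you configured in the synchronization job.

    -- Sketch only: replace adb_demo_db and kafka_demo_table with the destination
    -- database and table that you configured for the synchronization job.
    conf spark.driver.resourceSpec=medium;
    conf spark.executor.instances=2;
    conf spark.executor.resourceSpec=medium;
    conf spark.app.name=Query Kafka Sync Result;
    conf spark.adb.connectors=oss;

    SELECT * FROM adb_demo_db.kafka_demo_table LIMIT 10;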

Manage the data source

In the navigation pane on the left, choose Data Ingestion > Data Sources. You can perform the following operations in the Actions column.

  • Create Job: Quickly go to the page for creating a data synchronization or data migration task for this data source.

  • View: View the detailed configuration of the data source.

  • Edit: Edit the properties of the data source, such as its name and description.

  • Delete: Delete the current data source.

    Note

    If a data synchronization or data migration task exists for the data source, you cannot delete the data source directly. You must first go to the Simple Log Service/Kafka Data Synchronization page, find the target synchronization task, and click Delete in the Actions column to delete the data synchronization or data migration task.

Example of JSON parsing levels and schema inference

The parsing level specifies the number of nested levels to parse in the JSON data. For example, a user sends the following JSON data to Kafka.

{
  "name": "zhangle",
  "age": 18,
  "device": {
    "os": {
      "test": "lag",
      "member": {
        "fa": "zhangsan",
        "mo": "limei"
      }
    },
    "brand": "none",
    "version": "11.4.2"
  }
}

The following sections show the parsing results for levels 0 to 4.

Level 0 parsing

The data is not parsed. The original JSON data is directly output.

JSON field | Value | Destination field name
__value__ | { "name": "zhangle", "age": 18, "device": { "os": { "test": "lag", "member": { "fa": "zhangsan", "mo": "limei" } }, "brand": "none", "version": "11.4.2" } } | __value__
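
If you choose level 0 and keep the raw document in the __value__ column, you can still extract nested values at query time. The following Spark SQL sketch uses the built-in get_json_object function; adb_demo_db.kafka_demo_table is a hypothetical destination database and table.

    -- Sketch only: extract fields from the unparsed __value__ column at query time.
    -- adb_demo_db.kafka_demo_table is a placeholder for your destination database and table.
    SELECT
      get_json_object(__value__, '$.name')          AS name,          -- zhangle
      get_json_object(__value__, '$.device.brand')  AS device_brand   -- none
    FROM adb_demo_db.kafka_demo_table;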

Level 1 parsing

The first level of the JSON data is parsed.

JSON field | Value | Destination field name
name | zhangle | name
age | 18 | age
device | { "os": { "test": "lag", "member": { "fa": "zhangsan", "mo": "limei" } }, "brand": "none", "version": "11.4.2" } | device

Level 2 parsing

The second level of the JSON data is parsed. If a field is not nested, it is directly output. For example, the name and age fields are directly output. If a field is nested, its sub-fields are output. For example, the device field is nested, so its sub-fields device.os, device.brand, and device.version are output.

Important

Because destination field names cannot contain periods (.), periods are automatically replaced with underscores (_).

JSON field | Value | Destination field name
name | zhangle | name
age | 18 | age
device.os | { "test": "lag", "member": { "fa": "zhangsan", "mo": "limei" } } | device_os
device.brand | none | device_brand
device.version | 11.4.2 | device_version

Level 3 parsing

The third level of the JSON data is parsed.

JSON field | Value | Destination field name
name | zhangle | name
age | 18 | age
device.os.test | lag | device_os_test
device.os.member | { "fa": "zhangsan", "mo": "limei" } | device_os_member
device.brand | none | device_brand
device.version | 11.4.2 | device_version

Level 4 parsing

The fourth level of the JSON data is parsed.

JSON field | Value | Destination field name
name | zhangle | name
age | 18 | age
device.os.test | lag | device_os_test
device.os.member.fa | zhangsan | device_os_member_fa
device.os.member.mo | limei | device_os_member_mo
device.brand | none | device_brand
device.version | 11.4.2 | device_version