
AnalyticDB: Synchronize Kafka data to AnalyticDB for MySQL using data synchronization (APS)

Last Updated: Dec 02, 2025

AnalyticDB for MySQL provides the AnalyticDB Pipeline Service (APS) feature, which lets you create a Kafka sync task that ingests Kafka data in real time, starting from a specified offset. This topic describes how to add a Kafka data source, create a Kafka sync task, and start the task.

Prerequisites

Notes

  • Only Kafka data in JSON format can be synchronized. For a producer-side sketch, see the example after this list.

  • Data in a Kafka topic is automatically deleted after its retention period elapses. If a sync task fails and data in the topic expires before you restart the task, the expired data cannot be retrieved, which causes data loss. To prevent this, configure a sufficiently long retention period for the topic. If a sync task fails, contact technical support immediately.

  • If a sample Kafka message is larger than 8 KB, the Kafka API truncates it. This causes a parsing failure and prevents the automatic generation of field mapping information.

  • If the table schema of the source Kafka topic changes, Data Definition Language (DDL) changes are not automatically triggered. This means the schema changes are not synchronized to AnalyticDB for MySQL.
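
The following Python sketch, which uses the open source kafka-python client with placeholder endpoint and topic values, illustrates the first and third notes: it serializes each record as JSON before producing it and warns when a serialized message exceeds the 8 KB sampling limit.

```python
import json
from kafka import KafkaProducer  # pip install kafka-python

# Placeholder values: replace with your ApsaraMQ for Kafka endpoint and topic.
BOOTSTRAP_SERVERS = "alikafka-xxx-vpc.alikafka.aliyuncs.com:9092"
TOPIC = "aps_demo_topic"

SAMPLE_LIMIT_BYTES = 8 * 1024  # messages above 8 KB are truncated during sampling

producer = KafkaProducer(
    bootstrap_servers=BOOTSTRAP_SERVERS,
    # Serialize every message as JSON, the only format APS can parse.
    value_serializer=lambda record: json.dumps(record).encode("utf-8"),
)

record = {"order_id": 1001, "user": {"id": 7, "name": "alice"}, "amount": 19.9}

payload = json.dumps(record).encode("utf-8")
if len(payload) > SAMPLE_LIMIT_BYTES:
    # Oversized sample messages break automatic field mapping generation.
    print(f"warning: message is {len(payload)} bytes, above the 8 KB sample limit")

producer.send(TOPIC, record)
producer.flush()
```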

Billing

For information about the fees for elastic AnalyticDB Compute Unit (ACU) resources for an AnalyticDB for MySQL cluster, see Billing items for Data Lakehouse Edition and Billing items for Enterprise and Basic Editions.

Procedure

Step 1: Create a data source

Note

If you have already added a Kafka data source, skip this step and proceed to create a synchronization job.

  1. Log on to the AnalyticDB for MySQL console. In the upper-left corner of the console, select a region. In the left-side navigation pane, click Clusters. Find the cluster that you want to manage and click the cluster ID.

  2. In the navigation pane on the left, choose Data Ingestion > Data Source Management.

  3. In the upper-right corner, click Create Data Source.

  4. On the Create Data Source page, configure the following parameters:

    • Data Source Type: Select Kafka as the data source type.

    • Data Source Name: The system generates a name based on the data source type and the current time. You can change the name as needed.

    • Data Source Description: A description of the data source, such as its application scenarios or business limitations.

    • Cloud Provider: Only Alibaba Cloud instances are supported.

    • Kafka Instance: The ID of the Kafka instance. To find it, log on to the ApsaraMQ for Kafka console and view the instance ID on the Instances page.

    • Kafka Topic: The name of the topic created in the Kafka instance. To find it, log on to the ApsaraMQ for Kafka console and view the topic name on the Topic Management page of the target instance.

    • Message Data Format: The data format of Kafka messages. Only JSON is supported.

  5. After you configure the parameters, click Create.
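
Optionally, before you create the data source, you can confirm the topic name and check that its messages parse as JSON from a machine with network access to the Kafka instance. A minimal kafka-python sketch, with placeholder endpoint and topic values:

```python
import json
from kafka import KafkaConsumer  # pip install kafka-python

# Placeholder values: replace with your instance endpoint and topic.
consumer = KafkaConsumer(
    "aps_demo_topic",
    bootstrap_servers="alikafka-xxx-vpc.alikafka.aliyuncs.com:9092",
    auto_offset_reset="earliest",  # read from the beginning of the topic
    consumer_timeout_ms=10_000,    # stop polling after 10 seconds of no data
)

print("available topics:", consumer.topics())

for message in consumer:
    # A record that fails to parse here would also fail APS's JSON parsing.
    json.loads(message.value)
    print("sample message parses as JSON:", message.value[:200])
    break
```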

Step 2: Create a synchronization job

  1. In the navigation pane on the left, click Simple Log Service/Kafka Data Synchronization.

  2. In the upper-left corner, click Create Synchronization Job, and then click the Kafka Data Source tab.

  3. On the Create Synchronization Job page, configure the parameters for Source and Destination Settings, Destination Database and Table Settings, and Synchronization Settings.

    • Configure the following parameters in the Source and Destination Settings section:

      • Job Name: The name of the synchronization job. The system generates a name based on the data source type and the current time. You can change the name as needed.

      • Data Source: Select an existing Kafka data source or create a new one.

      • Data Source Format: Only JSON is supported.

      • Destination Type: Select Data Warehouse - AnalyticDB for MySQL Storage.

      • AnalyticDB for MySQL Account: The database account of the AnalyticDB for MySQL cluster.

      • AnalyticDB for MySQL Password: The password of the database account.

    • Configure the following parameters in the Destination Database and Table Settings section:

      • Database Name: The name of the destination database in the AnalyticDB for MySQL cluster.

      • Table Name: The name of the destination table in the AnalyticDB for MySQL cluster.

      • Sample Data: The latest data is automatically retrieved from the Kafka topic and used as sample data.

        Note: The data in the Kafka topic must be in JSON format. If data in other formats exists, an error occurs during data synchronization.

      • Parsed JSON Layers: The number of nested layers to parse for JSON data. Valid values:

        • 0: No parsing.

        • 1 (default): Parses one layer.

        • 2: Parses two layers.

        • 3: Parses three layers.

        • 4: Parses four layers.

        For an illustration of how this setting affects the generated fields, see the sketch after this list. For more information about the JSON nested parsing policy, see Use the data synchronization feature (APS) to synchronize Kafka data (Recommended).

      • Schema Field Mapping: Displays the schema information of the sample data after JSON parsing. You can modify the destination field names and types, or add or remove fields as needed.
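
The exact flattening rules are defined by APS (see the topic linked above); the following sketch only illustrates the general idea of how the Parsed JSON Layers value affects the generated fields. The flatten_json helper is hypothetical and written for this example.

```python
import json

def flatten_json(obj, max_layers, _depth=1, _prefix=""):
    """Illustrative flattening: expand nested objects up to max_layers deep.

    Anything deeper than max_layers is kept as a JSON string, which is
    roughly how deeper levels end up in a single destination field.
    """
    if max_layers == 0:
        return {"value": json.dumps(obj)}  # 0: no parsing at all
    columns = {}
    for key, value in obj.items():
        name = f"{_prefix}{key}"
        if isinstance(value, dict) and _depth < max_layers:
            columns.update(flatten_json(value, max_layers, _depth + 1, f"{name}."))
        elif isinstance(value, dict):
            columns[name] = json.dumps(value)  # too deep: keep as a JSON string
        else:
            columns[name] = value
    return columns

record = {"order_id": 1001, "user": {"id": 7, "address": {"city": "Hangzhou"}}}

print(flatten_json(record, 1))
# {'order_id': 1001, 'user': '{"id": 7, "address": {"city": "Hangzhou"}}'}
print(flatten_json(record, 2))
# {'order_id': 1001, 'user.id': 7, 'user.address': '{"city": "Hangzhou"}'}
```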

    • Configure the following parameters in the Synchronization Settings section:

      • Start Offset: The point in time from which the sync task starts to consume Kafka data. You can select any point in time; the system starts consuming from the first record in Kafka at or after that time.

      • Dirty Data Processing Mode: During data synchronization, if the data type of a field in the destination table does not match the actual data type of the source Kafka data, the synchronization fails. For example, if the source value is abc and the destination field type is int, a conversion error occurs and the synchronization becomes abnormal. Valid values:

        • Stop Synchronization (default): The data synchronization stops. You must modify the field type in the destination table or change the dirty data processing mode, and then restart the sync task.

        • Process as NULL: The field that contains dirty data is written to the destination table as NULL. For example, if a row of Kafka data has three fields (col1, col2, and col3) and col2 contains dirty data, col2 is written as NULL while col1 and col3 are written normally. For a sketch of this behavior, see the example after this list.

      • Job Resource Group: The Job-specific resource group in which the task runs.

      • ACUs for Incremental Synchronization: The number of ACUs allocated from the Job-specific resource group in which the task runs. The minimum value is 2 ACUs. The maximum value is the maximum available computing resources of the Job-specific resource group. We recommend that you increase the number of ACUs to improve data ingestion performance and task stability.

        Note: A data sync task uses elastic resources from the Job-specific resource group and occupies them for as long as it runs, so the system deducts the ACUs occupied by the task from the resource group. For example, if the maximum computing resources of a Job-specific resource group are 48 ACUs and an existing sync task uses 8 ACUs, the maximum number of ACUs that you can select for another sync task in this resource group is 40.

      • Add to Whitelist: Add the CIDR block of the Kafka vSwitch to the whitelist of the AnalyticDB for MySQL cluster to establish a network connection for data synchronization.
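
The following sketch mimics the Process as NULL behavior described above: a value that cannot be converted to the destination column type is written as NULL, and the remaining fields are written normally. The column definitions are placeholders for this example.

```python
# Destination column types for this example (placeholders).
DEST_SCHEMA = {"col1": int, "col2": int, "col3": str}

def coerce_row(row):
    """Mimic 'Process as NULL': unconvertible values become None (NULL)."""
    out = {}
    for column, target_type in DEST_SCHEMA.items():
        try:
            out[column] = target_type(row[column])
        except (TypeError, ValueError):
            out[column] = None  # dirty value: written as NULL
    return out

row = {"col1": "1", "col2": "abc", "col3": "ok"}
print(coerce_row(row))
# {'col1': 1, 'col2': None, 'col3': 'ok'}
```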

  4. After you configure the parameters, click Submit.

Step 3: Start the data sync task

  1. On the Simple Log Service/Kafka Data Synchronization page, find the data sync task that you created and click Start in the Actions column.

  2. In the upper-left corner, click Search to refresh the task list. If the task status changes to Running, the data sync task has started successfully.
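
After the task is in the Running state, you can verify that rows are arriving by querying the destination table. A minimal sketch that uses the open source pymysql driver, with placeholder connection settings:

```python
import pymysql  # pip install pymysql

# Placeholder connection settings: replace with your cluster endpoint,
# database account, and the destination database and table you configured.
connection = pymysql.connect(
    host="am-xxx.ads.aliyuncs.com",
    port=3306,
    user="your_account",
    password="your_password",
    database="your_database",
)

try:
    with connection.cursor() as cursor:
        # The row count should grow while the sync task is running.
        cursor.execute("SELECT COUNT(*) FROM your_table")
        print("rows synchronized so far:", cursor.fetchone()[0])
finally:
    connection.close()
```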