The AnalyticDB Pipeline Service (APS) data synchronization feature lets you ingest ApsaraMQ for Kafka messages into AnalyticDB for MySQL in near real time, starting from any offset you choose. The feature supports near real-time data output, full historical data archiving, and elastic analytics. After a synchronization task starts, data is committed every 5 minutes by default, so the first batch of ingested data is queryable after approximately 5 minutes.
Limitations
Only JSON-formatted Kafka messages are supported. Other formats cause synchronization errors.
Kafka table schema changes are not propagated to AnalyticDB for MySQL automatically. You must make DDL changes manually.
Sample Kafka data larger than 8 KB is truncated by the Kafka API, which prevents the system from parsing the sample and auto-generating field mappings.
Data written to AnalyticDB for MySQL is not visible until a Commit operation runs. The default Commit interval is 5 minutes, so wait at least 5 minutes after starting a task before querying the first batch of data.
OSS paths across synchronization tasks cannot share a prefix. For example, `oss://testBucketName/test/sls1/` and `oss://testBucketName/test/` share a prefix, which will cause data to be overwritten.
If a synchronization task fails and the Kafka topic data has expired, the expired data cannot be recovered when you restart the task. To reduce this risk, increase the topic's data retention period. If a task fails, contact technical support promptly.
Prerequisites
Before you begin, make sure you have:
An AnalyticDB for MySQL Enterprise Edition, Basic Edition, or Data Lakehouse Edition cluster
A database account for the cluster (see the table below)
An ApsaraMQ for Kafka instance in the same region as the AnalyticDB for MySQL cluster
A Kafka topic with messages already sent to it (see Quick start for ApsaraMQ for Kafka)
Database account requirements by account type:
| Account type | Required accounts | Additional steps |
|---|---|---|
| Alibaba Cloud account | Privileged account only | None |
| Resource Access Management (RAM) user | Privileged account + standard account | Associate the standard account with the RAM user |
Billing
Using the data synchronization feature incurs the following fees:
AnalyticDB for MySQL: Elastic ACU resource fees. See Billing for Data Lakehouse Edition and Billing for Enterprise Edition and Basic Edition.
Object Storage Service (OSS): Storage fees, GET request fees, and PUT and other request fees. See Billing overview.
How it works
Add a Kafka data source to identify the ApsaraMQ for Kafka instance and topic to read from.
Create a data link (synchronization job) that maps Kafka messages to an AnalyticDB for MySQL table, configures JSON parsing, partition keys, and the consumer offset.
Start the task. The task begins consuming Kafka data from the selected offset and writes it to the destination table.
Query the ingested data using Spark SQL.
Step 1: Create a data source
Skip this step if you have already added a Kafka data source. Go directly to Step 2: Create a data link.
Log on to the AnalyticDB for MySQL console. In the upper-left corner, select a region. In the left-side navigation pane, click Clusters, then click the cluster ID.
In the left-side navigation pane, choose Data Ingestion > Data Sources.
Click Create Data Source.
On the Create Data Source page, configure the following parameters:
| Parameter | Description |
|---|---|
| Data Source Type | Select Kafka. |
| Data Source Name | Auto-generated from the source type and current time. Change it as needed. |
| Data Source Description | Enter a description, such as the business scenario or data scope. |
| Deployment Mode | Only Alibaba Cloud Instance is supported. |
| Kafka Instance | The Kafka instance ID. Find it on the Instances page of the ApsaraMQ for Kafka console. |
| Kafka Topic | The topic name. Find it on the Topics page of the destination instance in the ApsaraMQ for Kafka console. |
| Message Data Format | Only JSON is supported. |
Click Create.
Step 2: Create a data link
In the left-side navigation pane, click Simple Log Service/Kafka Data Synchronization.
Click Create Synchronization Job.
On the Create Synchronization Job page, configure the three sections below.
Source and destination settings
| Parameter | Description |
|---|---|
| Job Name | Auto-generated from the source type and current time. Change it as needed. |
| Data Source | Select an existing Kafka data source or create a new one. |
| Destination Type | Choose where synchronized data is stored. See Choose a destination type below. |
| ADB Lake Storage | The lake storage for AnalyticDB for MySQL lake data. Select from the drop-down list, or click Automatically Created to create one. Required when Destination Type is Data Lake - AnalyticDB Lake Storage. |
| OSS Path | The OSS storage path for AnalyticDB for MySQL lake data. Required when Destination Type is Data Lake - User OSS. Select an empty folder — the path cannot be changed after creation, and it must not share a prefix with any other synchronization task's OSS path. |
| Storage Format | PAIMON (only available when Destination Type is Data Lake - User OSS) or ICEBERG. |
Choose a destination type
| Destination type | When to use | Notes |
|---|---|---|
| Data Lake - AnalyticDB Lake Storage (recommended) | Standard setups where you want AnalyticDB for MySQL to manage storage | Enable the lake storage feature first. Only ICEBERG format is supported. |
| Data Lake - User OSS | When you need to bring your own OSS bucket | Both PAIMON and ICEBERG formats are supported. |
Destination database and table settings
| Parameter | Description |
|---|---|
| Database Name | The destination database in AnalyticDB for MySQL. A new database is created if one with this name does not exist. If one exists, data is synchronized to it. See Limits for naming conventions. If Storage Format is PAIMON, the database must meet the requirements listed below. |
| Table Name | The destination table in AnalyticDB for MySQL. A new table is created if one with this name does not exist. If a table with the same name already exists, the synchronization task fails. See Limits for naming conventions. |
| Sample Data | The latest data auto-retrieved from the Kafka topic. The data must be in JSON format. |
| Parsed JSON Layers | The number of nested JSON levels to parse. Valid values: 0 (no parsing), 1 (default), 2, 3, 4. See JSON parsing levels and schema inference. |
| Schema Field Mapping | The schema inferred from the sample data after parsing. Adjust destination field names and types, or add and remove fields as needed. |
| Partition Key Settings | (Optional) A partition key for the destination table. Partition by log time or business logic to improve ingestion and query performance. If left blank, the table has no partitions. |
PAIMON database requirements
If Storage Format is PAIMON, the destination database must meet all of the following conditions. If it does not, the synchronization task will fail.
It must be an external database created with `CREATE EXTERNAL DATABASE <database_name>`.
The `DBPROPERTIES` parameter must include `catalog = paimon`.
The `DBPROPERTIES` parameter must include `adb.paimon.warehouse`. Example: `adb.paimon.warehouse=oss://testBucketName/aps/data`.
The `DBPROPERTIES` parameter must include `LOCATION` with a `.db` suffix on the database name. Example: `LOCATION=oss://testBucketName/aps/data/kafka_paimon_external_db.db/`. The bucket directory in this path must already exist, and the path must include `.db` after the database name; otherwise XIHE queries will fail.
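Putting these requirements together, a conforming PAIMON destination database might be created as follows. This is a sketch that reuses the example bucket and database names from above; the exact `DBPROPERTIES` quoting and syntax may vary by cluster version, so verify against your cluster before running it:

```sql
-- Example external database for a PAIMON-format synchronization task.
-- testBucketName and kafka_paimon_external_db are placeholder names.
CREATE EXTERNAL DATABASE kafka_paimon_external_db
DBPROPERTIES (
    catalog = 'paimon',
    adb.paimon.warehouse = 'oss://testBucketName/aps/data',
    location = 'oss://testBucketName/aps/data/kafka_paimon_external_db.db/'
);
```

Note that the `location` value ends with the database name plus the required `.db` suffix, and the `oss://testBucketName/aps/data` directory must already exist in OSS.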
Synchronization settings
| Parameter | Description |
|---|---|
| Starting Consumer Offset for Incremental Synchronization | The point in Kafka from which the task begins consuming data. Earliest offset (begin_cursor): consume from the oldest available data. Latest offset (end_cursor): consume from the most recent data only. Custom offset: consume from the first message at or after a specific time you select. |
| Job Resource Group | The job resource group for the task to run in. |
| ACUs for Incremental Synchronization | The number of ACUs allocated from the job resource group. Minimum: 2 ACUs. Maximum: the remaining available ACUs in the resource group. A higher ACU count improves ingestion performance and task stability. |
| Advanced Settings | Custom configurations. Contact technical support to enable. |
ACU deduction example: If a job resource group has a maximum of 48 ACUs and an existing task already uses 8, the new task can use at most 40 ACUs.
Click Submit.
Step 3: Start the data synchronization task
On the Simple Log Service/Kafka Data Synchronization page, find the task you created and click Start in the Actions column.
Click Search. The task has started successfully when its status changes to Running.
The first batch of data is queryable at least 5 minutes after the task starts, because data is committed in 5-minute intervals by default.
Step 4: Analyze the data
After data is synchronized, use Spark SQL to query it in AnalyticDB for MySQL. For more information, see Spark development editor and Offline Spark application development.
In the left-side navigation pane, choose Job Development > Spark JAR Development.
Enter your Spark SQL statements in the default template and click Run Now. The following is an example:
```sql
-- Example of Spark SQL. Modify the content and run your Spark program.
conf spark.driver.resourceSpec=medium;
conf spark.executor.instances=2;
conf spark.executor.resourceSpec=medium;
conf spark.app.name=Spark SQL Test;
conf spark.adb.connectors=oss;

-- SQL statements
show tables from lakehouse20220413156_adbTest;
```
(Optional) On the Applications tab, click Logs in the Actions column to view the Spark SQL run logs.
Step 5 (Optional): Manage the data source
Go to Data Ingestion > Data Sources. The following operations are available in the Actions column:
| Operation | Description |
|---|---|
| Create Job | Create a data synchronization or data migration task for this data source. |
| View | View the data source configuration. |
| Edit | Edit the data source name and description. |
| Delete | Delete the data source. If any synchronization or migration task exists for this data source, delete that task first on the Simple Log Service/Kafka Data Synchronization page. |
JSON parsing levels and schema inference
The Parsed JSON Layers setting controls how many nested levels of a JSON message are expanded into separate destination fields.
Example message:
```json
{
    "name": "zhangle",
    "age": 18,
    "device": {
        "os": {
            "test": "lag",
            "member": {
                "fa": "zhangsan",
                "mo": "limei"
            }
        },
        "brand": "none",
        "version": "11.4.2"
    }
}
```
Periods (.) in field names are automatically replaced with underscores (_) in destination field names.
Level 0 — No parsing. The entire JSON message is output as a single field.
| JSON field | Value | Destination field name |
|---|---|---|
| `__value__` | {"name":"zhangle","age":18,"device":{...}} | `__value__` |
Level 1 (default) — Top-level fields are expanded.
| JSON field | Value | Destination field name |
|---|---|---|
name | zhangle | name |
age | 18 | age |
device | {"os":{...},"brand":"none","version":"11.4.2"} | device |
Level 2 — Two levels expanded. Non-nested fields are output directly; nested fields expand to their sub-fields.
| JSON field | Value | Destination field name |
|---|---|---|
name | zhangle | name |
age | 18 | age |
device.os | {"test":"lag","member":{...}} | device_os |
device.brand | none | device_brand |
device.version | 11.4.2 | device_version |
Level 3
| JSON field | Value | Destination field name |
|---|---|---|
name | zhangle | name |
age | 18 | age |
device.os.test | lag | device_os_test |
device.os.member | {"fa":"zhangsan","mo":"limei"} | device_os_member |
device.brand | none | device_brand |
device.version | 11.4.2 | device_version |
Level 4
| JSON field | Value | Destination field name |
|---|---|---|
name | zhangle | name |
age | 18 | age |
device.os.test | lag | device_os_test |
device.os.member.fa | zhangsan | device_os_member_fa |
device.os.member.mo | limei | device_os_member_mo
device.brand | none | device_brand |
device.version | 11.4.2 | device_version |
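The level-by-level flattening shown in the tables above can be sketched in a few lines of Python. This is an illustrative approximation of the documented mapping (including the period-to-underscore replacement), not the actual parser APS uses:

```python
import json

def flatten(obj, max_depth, depth=1, prefix=""):
    """Flatten nested JSON up to max_depth levels, joining keys with '_'.

    Objects nested deeper than max_depth are kept as serialized JSON
    strings, mirroring the Parsed JSON Layers behavior described above.
    """
    if max_depth == 0:
        # Level 0: the entire message becomes a single __value__ field.
        return {"__value__": json.dumps(obj, separators=(",", ":"))}
    out = {}
    for key, value in obj.items():
        # Periods in field names map to underscores in destination names.
        name = prefix + key.replace(".", "_")
        if isinstance(value, dict) and depth < max_depth:
            out.update(flatten(value, max_depth, depth + 1, name + "_"))
        elif isinstance(value, dict):
            out[name] = json.dumps(value, separators=(",", ":"))
        else:
            out[name] = value
    return out

msg = {
    "name": "zhangle",
    "age": 18,
    "device": {
        "os": {"test": "lag", "member": {"fa": "zhangsan", "mo": "limei"}},
        "brand": "none",
        "version": "11.4.2",
    },
}

print(sorted(flatten(msg, 2)))
# ['age', 'device_brand', 'device_os', 'device_version', 'name']
```

Running it with `max_depth` set to 1 through 4 reproduces the destination field names in the level tables above.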