AnalyticDB for MySQL provides the AnalyticDB Pipeline Service (APS) feature, which lets you create a Kafka sync task to ingest Kafka data in real time, starting from a specified offset. This topic describes how to add a Kafka data source, create a Kafka sync task, and start the task.
Prerequisites
An AnalyticDB for MySQL Enterprise Edition, Basic Edition, or Data Lakehouse Edition cluster is created.
A database account is created for the AnalyticDB for MySQL cluster.
If you use an Alibaba Cloud account, you only need to create a privileged account.
If you use a Resource Access Management (RAM) user, you must create a privileged account and a standard account and associate the standard account with the RAM user.
You have created an ApsaraMQ for Kafka (Kafka) instance and deployed it in the same VPC as the AnalyticDB for MySQL cluster.
You have created a Kafka topic and sent a message. For more information, see Message Queue for Apache Kafka Quick Start.
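If you need a quick way to publish a JSON test message to the topic, the following sketch uses the open source kafka-python client. The endpoint, topic name, and message fields are placeholders; replace them with the default endpoint of your ApsaraMQ for Kafka instance and your own topic.

```python
import json
from kafka import KafkaProducer

# Placeholder endpoint of the ApsaraMQ for Kafka instance (default VPC endpoint).
producer = KafkaProducer(
    bootstrap_servers="alikafka-xxx-vpc.example:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# The message body must be valid JSON, because APS only parses JSON data.
producer.send("test_topic", {"id": 1, "name": "Alice", "event_time": "2024-01-01 00:00:00"})
producer.flush()
producer.close()
```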
Notes
Only Kafka data in JSON format can be synchronized.
Data in a Kafka topic is automatically deleted after a specific period. If data in a topic expires and the data synchronization task fails, the deleted data cannot be retrieved when you restart the task. This can cause data loss. To prevent this, set a longer lifecycle for your topic data. If a sync task fails, contact technical support immediately.
If the sample Kafka message is larger than 8 KB, the Kafka API truncates it. The truncated message cannot be parsed, which prevents the automatic generation of the field mapping information. A quick way to check the message size is sketched after these notes.
If the table schema of the source Kafka topic changes, Data Definition Language (DDL) changes are not automatically triggered. This means the schema changes are not synchronized to AnalyticDB for MySQL.
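Because the sample record that drives automatic field mapping is truncated at 8 KB, you can roughly check the serialized size of a message before relying on that feature. A minimal sketch:

```python
import json

record = {"id": 1, "payload": "..."}  # placeholder message body
encoded = json.dumps(record).encode("utf-8")
if len(encoded) > 8 * 1024:
    print("Sample message exceeds 8 KB; configure the field mapping manually.")
```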
Billing
For information about the fees for elastic AnalyticDB Compute Unit (ACU) resources for an AnalyticDB for MySQL cluster, see Billing items for Data Lakehouse Edition and Billing items for Enterprise and Basic Editions.
Procedure
Step 1: Create a data source
If you have already added a Kafka data source, skip this step and proceed to Step 2 to create a sync link.
Log on to the AnalyticDB for MySQL console. In the upper-left corner of the console, select a region. In the left-side navigation pane, click Clusters. Find the cluster that you want to manage and click the cluster ID.
In the navigation pane on the left, choose Data Ingestion > Data Source Management.
In the upper-right corner, click Create Data Source.
On the Create Data Source page, configure the following parameters:
Data Source Type: Select Kafka as the data source type.
Data Source Name: The system generates a name based on the data source type and the current time. You can change the name as needed.
Data Source Description: A description of the data source, such as its application scenarios or business limitations.
Cloud Provider: Only Alibaba Cloud instances are supported.
Kafka Instance: The ID of the Kafka instance. To view the instance ID, log on to the ApsaraMQ for Kafka console and go to the Clusters page.
Kafka Topic: The name of the topic created in Kafka. To view the topic name, log on to the ApsaraMQ for Kafka console and go to the Topic Management page of the target instance.
Message Data Format: The data format of Kafka messages. Only JSON is supported.
After you configure the parameters, click Create.
Step 2: Create a sync link
In the navigation pane on the left, click Simple Log Service/Kafka Data Synchronization.
In the upper-left corner, click Create Synchronization Job, and then click the Kafka Data Source tab.
On the Create Synchronization Job page, configure the parameters for Source and Destination Settings, Destination Database and Table Settings, and Synchronization Settings.
Configure the following parameters in the Source and Destination Settings section:
Job Name: The name of the sync job. The system generates a name based on the data source type and the current time. You can change the name as needed.
Data Source: Select an existing Kafka data source or create a new one.
Data Source Format: The format of the source data. Only JSON is supported.
Destination Type: Select Data Warehouse - AnalyticDB for MySQL Storage.
AnalyticDB for MySQL Account: The database account of the AnalyticDB for MySQL cluster.
AnalyticDB for MySQL Password: The password of the database account. To verify that the account and password are valid, see the sketch that follows these parameters.
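Before you configure the job, you can optionally confirm that the database account and password work. AnalyticDB for MySQL is MySQL wire-protocol compatible, so a standard client such as PyMySQL can connect; the endpoint, account, password, and database below are placeholders.

```python
import pymysql

conn = pymysql.connect(
    host="amv-xxx.ads.aliyuncs.com",  # cluster endpoint (placeholder)
    port=3306,
    user="adb_account",               # database account (placeholder)
    password="your_password",
    database="adb_demo",              # destination database (placeholder)
)
with conn.cursor() as cur:
    cur.execute("SELECT 1")
    print(cur.fetchone())             # (1,) means the credentials work
conn.close()
```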
Configure the following parameters in the Destination Database and Table Settings section:
Database Name: The name of the destination database in the AnalyticDB for MySQL cluster.
Table Name: The name of the destination table in the AnalyticDB for MySQL cluster.
Sample Data: The latest data is automatically retrieved from the Kafka topic and used as sample data. Note: The data in the Kafka topic must be in JSON format. If data in other formats exists, an error occurs during data synchronization.
Parsed JSON Layers: The number of nested layers to parse for JSON data. Valid values are 0 (no parsing), 1 (default, parses one layer), 2, 3, and 4. For more information about the JSON nested parsing policy, see Use the data synchronization feature (APS) to synchronize Kafka data (Recommended). A rough illustration of how the parse depth affects field flattening follows these parameters.
Schema Field Mapping: Displays the schema information of the sample data after JSON parsing. You can modify the destination field names and types, or add or remove fields as needed.
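The following sketch is only a rough illustration of how the parse depth affects field flattening; it is not the parsing logic that APS actually uses, and the dotted column names are assumptions for readability. It assumes a parse depth of at least 1.

```python
import json

def flatten(obj, depth, prefix=""):
    """Flatten nested JSON objects up to `depth` levels; deeper values stay as JSON strings."""
    out = {}
    for key, value in obj.items():
        name = f"{prefix}{key}"
        if isinstance(value, dict) and depth > 1:
            out.update(flatten(value, depth - 1, prefix=f"{name}."))
        elif isinstance(value, dict):
            out[name] = json.dumps(value)  # deeper levels are kept as a JSON string
        else:
            out[name] = value
    return out

message = {"id": 1, "user": {"name": "Alice", "address": {"city": "Hangzhou"}}}
print(flatten(message, depth=1))
# {'id': 1, 'user': '{"name": "Alice", "address": {"city": "Hangzhou"}}'}
print(flatten(message, depth=2))
# {'id': 1, 'user.name': 'Alice', 'user.address': '{"city": "Hangzhou"}'}
```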
Configure the following parameters in the Synchronization Settings section:
Start Offset: The point in time from which the sync task starts to consume Kafka data. You can select any point in time, and the system starts consuming from the first Kafka record whose timestamp is at or after that time.
Dirty Data Processing Mode: Specifies how the task handles a field whose value cannot be converted to the data type of the corresponding destination column. For example, if the source value is abc and the destination column type is INT, the conversion fails and the synchronization becomes abnormal. Valid values:
Stop Synchronization (default): The data synchronization stops. You must modify the field type in the destination table or change the dirty data processing mode, and then restart the sync task.
Process as NULL: The field with dirty data is written to the destination table as NULL. For example, if a row of Kafka data has three fields (col1, col2, and col3) and col2 contains dirty data, col2 is written as NULL while col1 and col3 are written normally. A small sketch of this behavior follows these parameters.
Job Resource Group: The Job-specific resource group in which the task runs.
ACUs for Incremental Synchronization: The number of ACUs for the Job-specific resource group in which the task runs. The minimum value is 2 ACUs. The maximum value is the maximum amount of computing resources available in the Job-specific resource group. We recommend that you increase the number of ACUs to improve data ingestion performance and task stability. Note: A data sync task uses elastic resources from the Job-specific resource group and occupies them for a long time, so the system deducts the resources occupied by the task from the resource group. For example, if the maximum computing resources of a Job-specific resource group are 48 ACUs and an existing sync task uses 8 ACUs, the maximum number of ACUs that you can select for another sync task in this resource group is 40.
Add To Whitelist: Adds the CIDR block of the vSwitch of the Kafka instance to the whitelist of the AnalyticDB for MySQL cluster. This is required to establish a network connection for data synchronization.
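The following sketch illustrates the Process as NULL semantics described above; it mirrors the col1/col2/col3 example and is not the actual APS implementation.

```python
def to_int_or_none(value):
    """Return the value converted to INT, or None (NULL) if the conversion fails."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None  # dirty value becomes NULL in the destination table

row = {"col1": "1", "col2": "abc", "col3": "3"}  # col2 holds dirty data for an INT column
cleaned = {k: to_int_or_none(v) for k, v in row.items()}
print(cleaned)  # {'col1': 1, 'col2': None, 'col3': 3}
```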
After you configure the parameters, click Submit.
Step 3: Start the data sync task
On the Simple Log Service/Kafka Data Synchronization page, find the data sync task that you created and in the Actions column, click Start.
In the upper-left corner, click Search to refresh the task list. If the task status changes to Running, the data sync task has started successfully.
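After the task is Running, you can confirm that data is arriving by querying the destination table. The endpoint, account, password, database, and table name below are placeholders; use the values that you configured in Step 2.

```python
import pymysql

conn = pymysql.connect(
    host="amv-xxx.ads.aliyuncs.com",  # cluster endpoint (placeholder)
    port=3306,
    user="adb_account",               # database account (placeholder)
    password="your_password",
    database="adb_demo",              # destination database (placeholder)
)
with conn.cursor() as cur:
    cur.execute("SELECT COUNT(*) FROM kafka_sync_table")  # destination table (placeholder)
    print("rows synchronized so far:", cur.fetchone()[0])
conn.close()
```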