AnalyticDB for MySQL provides the AnalyticDB Pipeline Service (APS) data synchronization feature. You can use this feature to create a Kafka data link to ingest data from Kafka in real time starting from a specific offset. This feature supports near real-time data output, full historical data archiving, and elastic analytics. This topic describes how to add a Kafka data source, create and start a Kafka data link, and then analyze the data and manage the data source.
Prerequisites
An AnalyticDB for MySQL Enterprise Edition, Basic Edition, or Data Lakehouse Edition cluster is created.
A database account is created for the AnalyticDB for MySQL cluster.
If you use an Alibaba Cloud account, you need only create a privileged account.
If you use a Resource Access Management (RAM) user, you must create a privileged account and a standard account and associate the standard account with the RAM user.
An ApsaraMQ for Kafka (Kafka) instance is created. The instance is deployed in the same region as the AnalyticDB for MySQL cluster.
A Kafka topic is created and messages are sent to it. For more information, see Quick Start for ApsaraMQ for Kafka.
Notes
Only Kafka data in JSON format can be synchronized.
Data in a Kafka topic is automatically cleared after a specific period. If a data synchronization task fails and the topic data has expired, the cleared data cannot be retrieved when you restart the task. This can cause data loss. To prevent this, increase the data lifecycle of the topic. If a synchronization task fails, contact technical support promptly.
If the sample Kafka data is larger than 8 KB, the Kafka API truncates the data. This causes the system to fail to parse the sample data into JSON format, and field mapping information cannot be automatically generated.
Changes to the source Kafka table schema do not automatically trigger DDL changes in AnalyticDB for MySQL.
After data is ingested, a Commit operation must be executed to make the written data visible. To prevent a short Commit interval from affecting job stability and read and write performance, the data synchronization feature of AnalyticDB for MySQL uses a default Commit interval of 5 minutes. Therefore, after you create and start a data synchronization job for the first time, you must wait at least 5 minutes before you can view the first batch of written data.
Billing
Using the AnalyticDB for MySQL data synchronization feature incurs the following fees.
Elastic ACU resource fees for AnalyticDB for MySQL. For more information, see Billing for Data Lakehouse Edition and Billing for Enterprise Edition and Basic Edition.
Storage fees and fees for GET, PUT, and other requests for OSS. For more information, see Billing overview.
Procedure
Step 1: Create a data source.
Step 2: Create a data link.
Step 3: Start the data synchronization task.
Step 4: Analyze the data.
Step 5 (Optional): Manage the data source.
Create a data source
If you have already added a Kafka data source, skip this step and proceed to Create a data link.
Log on to the AnalyticDB for MySQL console. In the upper-left corner of the console, select a region. In the left-side navigation pane, click Clusters. Find the cluster that you want to manage and click the cluster ID.
In the navigation pane on the left, choose Data Ingestion > Data Sources.
In the upper-left corner, click Create Data Source.
On the Create Data Source page, configure the parameters. The following table describes the parameters.
Parameter
Description
Data Source Type
Select Kafka.
Data Source Name
The system automatically generates a name based on the data source type and the current time. You can change the name as needed.
Data Source Description
Enter a description for the data source, such as the data lakehouse scenario or business limits.
Deployment Mode
Only Alibaba Cloud Instance is supported.
Kafka Instance
The Kafka instance ID.
Log on to the ApsaraMQ for Kafka console and view the instance ID on the Instances page.
Kafka Topic
The name of the topic created in Kafka.
Log on to the ApsaraMQ for Kafka console and view the topic name on the Topics page of the destination instance.
Message Data Format
The data format of Kafka messages. Only JSON is supported.
After you configure the parameters, click Create.
Create a data link
In the navigation pane on the left, click Simple Log Service/Kafka Data Synchronization.
In the upper-left corner, click Create Synchronization Job.
On the Create Synchronization Job page, configure the Source and Destination Settings, Destination Database and Table Settings, and Synchronization Settings sections.
The following table describes the parameters for Source and Destination Settings.
Parameter
Description
Job Name
The name of the data link. The system automatically generates a name based on the data source type and the current time. You can change the name as needed.
Data Source
Select an existing Kafka data source or create a new one.
Destination Type
Valid values:
Data Lake - User OSS.
Data Lake - AnalyticDB Lake Storage (Recommended).
Important: If you set Destination Type to Data Lake - AnalyticDB Lake Storage, you must enable the lake storage feature.
ADB Lake Storage
The name of the lake storage where the AnalyticDB for MySQL lake data is located.
Select a destination lake storage from the drop-down list. If no lake storage has been created, click Automatically Created in the drop-down list to automatically create one.
Important: This parameter is required when you set Destination Type to Data Lake - AnalyticDB Lake Storage.
OSS Path
The storage path in OSS for the AnalyticDB for MySQL lake data.
Important: This parameter is required when you set Destination Type to Data Lake - User OSS.
The buckets displayed are all buckets in the same region as the AnalyticDB for MySQL cluster. You can select any of them. Plan the storage path carefully. You cannot change it after creation.
Select an empty folder. The OSS path cannot have a prefix relationship with the OSS paths of other tasks; otherwise, data may be overwritten. For example, if the OSS paths for two data synchronization tasks are oss://testBucketName/test/sls1/ and oss://testBucketName/test/, they have a prefix relationship, which will cause data to be overwritten during data synchronization.
Storage Format
The data storage format. Valid values:
PAIMON.
Important: This format is supported only when Destination Type is set to Data Lake - User OSS.
ICEBERG.
The following table describes the parameters for Destination Database and Table Settings.
Parameter
Description
Database Name
The name of the destination database in AnalyticDB for MySQL. If a database with the same name does not exist, a new one is created. If a database with the same name exists, data is synchronized to the existing database. For more information about database naming conventions, see Limits.
Important: In the Source and Destination Settings section, if you set Storage Format to PAIMON, an existing database must meet the following conditions. Otherwise, the data synchronization task will fail. A combined example statement is shown after this parameter list.
It must be an external database. The statement to create the database must be `CREATE EXTERNAL DATABASE <database_name>`.
The `DBPROPERTIES` parameter in the `CREATE DATABASE` statement must include the `catalog` property, and the value of `catalog` must be `paimon`.
The `DBPROPERTIES` parameter in the `CREATE DATABASE` statement must include the `adb.paimon.warehouse` property. For example: `adb.paimon.warehouse=oss://testBucketName/aps/data`.
The `DBPROPERTIES` parameter in the `CREATE DATABASE` statement must include the `LOCATION` property, and you must add `.db` after the database name. Otherwise, XIHE queries will fail. For example: `LOCATION=oss://testBucketName/aps/data/kafka_paimon_external_db.db/`.
The bucket directory in the OSS path configured for `LOCATION` must exist. Otherwise, creating the database will fail.
Table Name
The name of the destination table in AnalyticDB for MySQL. If a table with the same name does not exist in the database, a new one is created. If a table with the same name already exists, the data synchronization will fail. For more information about table naming conventions, see Limits.
Sample Data
The latest data is automatically retrieved from the Kafka topic and used as sample data.
Note: The data in the Kafka topic must be in JSON format. If other data formats exist, an error will occur during data synchronization.
Parsed JSON Layers
Set the number of nested levels to parse in the JSON data. Valid values:
0: No parsing.
1 (Default): Parse one level.
2: Parse two levels.
3: Parse three levels.
4: Parse four levels.
For more information about the JSON nested parsing policy, see Example of JSON parsing levels and schema inference.
Schema Field Mapping
Displays the schema information of the sample data after JSON parsing. You can adjust the destination field names and types, or add or delete fields as needed.
Partition Key Settings
Set a partition key for the destination table. We recommend configuring partitions based on log time or business logic to ensure data ingestion and query performance. If you do not set a partition key, the destination table will not have partitions by default.
You can format the destination partition key using a time format or by specifying a partition field.
To partition by date and time, select a date-time field for the partition field name. For the format handling method, select Time Formatting, then select the source field format and the destination partition format. AnalyticDB for MySQL identifies the value of the partition field based on the source field format and converts it to the destination partition format for partitioning. For example, if the source field is gmt_created with a value of 1711358834, the source field format is a second-level precision timestamp, and the destination partition format is yyyyMMdd, the data is written to the 20240325 partition.
To partition by field value, select Specify Partition Field for the format handling method.
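To illustrate the Time Formatting example above, the following standalone Spark SQL expression shows how a second-level precision timestamp maps to a yyyyMMdd partition value. This is only a sketch for verification; the synchronization task performs the conversion itself, and the result assumes the session time zone does not shift the calendar date.

```sql
-- Illustrative only: format the sample timestamp 1711358834 (2024-03-25 UTC) as yyyyMMdd.
SELECT from_unixtime(1711358834, 'yyyyMMdd') AS partition_value;
-- Expected result: 20240325
```

For the Database Name parameter above, the following is a minimal sketch of a statement that combines the conditions required for the PAIMON storage format. The bucket name and database name are placeholders, and the exact quoting rules for DBPROPERTIES keys may differ in your environment, so verify the statement against the CREATE DATABASE documentation before you run it.

```sql
CREATE EXTERNAL DATABASE kafka_paimon_external_db
DBPROPERTIES (
  catalog = 'paimon',                                                      -- catalog must be paimon
  adb.paimon.warehouse = 'oss://testBucketName/aps/data',                  -- warehouse directory in OSS
  LOCATION = 'oss://testBucketName/aps/data/kafka_paimon_external_db.db/'  -- database name suffixed with .db
);
```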
The following table describes the parameters for Synchronization Settings.
Parameter
Description
Starting Consumer Offset for Incremental Synchronization
When the sync task starts, it begins consuming Kafka data from the selected point in time. Valid values:
Earliest offset (begin_cursor): Automatically consume data from the earliest point in time in the Kafka data.
Latest offset (end_cursor): Automatically consume data from the latest point in time in the Kafka data.
Custom offset: You can select any point in time. The system will start consuming from the first piece of data in Kafka that is at or after this time.
Job Resource Group
Specify the Job resource group for the task to run in.
ACUs for Incremental Synchronization
Specify the number of ACUs for the Job resource group. The minimum is 2 ACUs, and the maximum is the amount of computing resources available in the Job resource group. We recommend specifying a higher number of ACUs to improve data ingestion performance and task stability.
Note: When you create a data synchronization task, it uses elastic resources from the Job resource group. Data synchronization tasks occupy resources for a long time, so the system deducts the resources used by the task from the resource group. For example, if a Job resource group has a maximum of 48 ACUs and you have already created a sync task that uses 8 ACUs, the maximum number of ACUs you can select for another sync task in this resource group is 40.
Advanced Settings
Advanced configurations let you customize the sync task. To make custom configurations, contact technical support.
After you configure the parameters, click Submit.
Start the data synchronization task
On the Simple Log Service/Kafka Data Synchronization page, find the data synchronization task that you created and click Start in the Actions column.
In the upper-left corner, click Search. The task has started successfully when its status changes to Running.
Analyze the data
After the data is synchronized, you can use the Spark Jar development feature to analyze the data in AnalyticDB for MySQL. For more information about Spark development, see Spark development editor and Offline Spark application development.
In the navigation pane on the left, go to the Spark development editor.
Enter the sample statements in the default template and click Run Now.
```sql
-- Here is just an example of SparkSQL. Modify the content and run your spark program.
conf spark.driver.resourceSpec=medium;
conf spark.executor.instances=2;
conf spark.executor.resourceSpec=medium;
conf spark.app.name=Spark SQL Test;
conf spark.adb.connectors=oss;
-- Here are your sql statements
show tables from lakehouse20220413156_adbTest;
```
Optional: On the Applications tab, click Logs in the Actions column to view the run logs of the Spark SQL job.
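After the synchronization task has committed data, you can modify the final statement in the template to query the destination table. In the following sketch, kafka_sync_db and kafka_sync_table are placeholders; replace them with the Database Name and Table Name that you configured in Destination Database and Table Settings.

```sql
conf spark.driver.resourceSpec=medium;
conf spark.executor.instances=2;
conf spark.executor.resourceSpec=medium;
conf spark.app.name=Kafka Sync Check;
conf spark.adb.connectors=oss;
-- Placeholder database and table names; replace them with your own.
SELECT * FROM kafka_sync_db.kafka_sync_table LIMIT 10;
```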
Manage the data source
In the navigation pane on the left, choose Data Ingestion > Data Sources. You can perform the following operations in the Actions column.
Operation | Description |
--- | --- |
Create Job | Quickly go to the page for creating a data synchronization or data migration task for this data source. |
View | View the detailed configuration of the data source. |
Edit | Edit the properties of the data source, such as its name and description. |
Delete | Delete the current data source. Note: If a data synchronization or data migration task exists for the data source, you cannot delete the data source directly. You must first go to the Simple Log Service/Kafka Data Synchronization page, find the target sync task, and click Delete in the Actions column to delete the data synchronization or data migration task. |
Example of JSON parsing levels and schema inference
The parsing level specifies the number of nested levels to parse in the JSON data. For example, a user sends the following JSON data to Kafka.
```json
{
  "name": "zhangle",
  "age": 18,
  "device": {
    "os": {
      "test": "lag",
      "member": {
        "fa": "zhangsan",
        "mo": "limei"
      }
    },
    "brand": "none",
    "version": "11.4.2"
  }
}
```
The following sections show the parsing results for levels 0 to 4.
Level 0 parsing
The data is not parsed. The original JSON data is directly output.
JSON field | Value | Destination field name |
--- | --- | --- |
__value__ | { "name": "zhangle", "age": 18, "device": { "os": { "test": "lag", "member": { "fa": "zhangsan", "mo": "limei" } }, "brand": "none", "version": "11.4.2" } } | __value__ |
Level 1 parsing
The first level of the JSON data is parsed.
JSON field | Value | Destination field name |
--- | --- | --- |
name | zhangle | name |
age | 18 | age |
device | { "os": { "test": "lag", "member": { "fa": "zhangsan", "mo": "limei" } }, "brand": "none", "version": "11.4.2" } | device |
Level 2 parsing
The second level of the JSON data is parsed. If a field is not nested, it is directly output. For example, the name and age fields are directly output. If a field is nested, its sub-fields are output. For example, the device field is nested, so its sub-fields device.os, device.brand, and device.version are output.
Because destination field names cannot contain periods (.), periods are automatically replaced with underscores (_).
JSON field | Value | Destination field name |
--- | --- | --- |
name | zhangle | name |
age | 18 | age |
device.os | { "test": "lag", "member": { "fa": "zhangsan", "mo": "limei" } } | device_os |
device.brand | none | device_brand |
device.version | 11.4.2 | device_version |
Level 3 parsing
JSON field | Value | Destination field name |
--- | --- | --- |
name | zhangle | name |
age | 18 | age |
device.os.test | lag | device_os_test |
device.os.member | { "fa": "zhangsan", "mo": "limei" } | device_os_member |
device.brand | none | device_brand |
device.version | 11.4.2 | device_version |
Level 4 parsing
JSON field | Value | Destination field name |
--- | --- | --- |
name | zhangle | name |
age | 18 | age |
device.os.test | lag | device_os_test |
device.os.member.fa | zhangsan | device_os_member_fa |
device.os.member.mo | limei | device_os_member_mo |
device.brand | none | device_brand |
device.version | 11.4.2 | device_version |
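As the tables above show, nested JSON fields arrive in the destination table as flattened columns whose names replace periods with underscores. For example, with level 3 parsing you can reference the flattened columns directly in a Spark SQL query. The database and table names below are placeholders for the names configured in Destination Database and Table Settings.

```sql
-- Placeholder database and table names; the column names follow the level 3 parsing result above.
SELECT name, age, device_os_test, device_brand, device_version
FROM kafka_sync_db.kafka_sync_table
LIMIT 10;
```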