AnalyticDB for MySQL supports creating Kafka data synchronization jobs to ingest data in real time from a specified offset in Kafka. This feature supports use cases such as near-real-time analysis, full historical data archiving, and elastic analytics. This topic describes how to add a Kafka data source, create and start a Kafka data synchronization job, analyze the synchronized data, and manage the data source.
Prerequisites
An AnalyticDB for MySQL Enterprise Edition, Basic Edition, or Data Lakehouse Edition cluster is created.
A database account is created for the AnalyticDB for MySQL cluster.
If you use an Alibaba Cloud account, you only need to create a privileged account.
If you use a Resource Access Management (RAM) user, you must create a privileged account and a standard account and associate the standard account with the RAM user.
An ApsaraMQ for Kafka instance is created in the same region as the AnalyticDB for MySQL cluster.
A Kafka topic is created and messages are sent to the topic. For more information, see Quick Start for Message Queue for Apache Kafka.
Notes
Only Kafka data in JSON format can be synchronized.
Data in a Kafka topic is automatically deleted after a retention period. If the data synchronization task fails and the data in the topic expires before the task is restarted, the expired data cannot be read, which can cause data loss. To prevent this issue, increase the retention period of the topic and contact technical support promptly if the data synchronization task fails.
If the sample Kafka data is larger than 8 KB, the Kafka API truncates the data. This causes a failure when parsing the sample data, and the field mapping information cannot be automatically generated.
If the schema of the source data in Kafka changes, DDL changes are not automatically triggered. In other words, the schema changes are not synchronized to AnalyticDB for MySQL.
After data is ingested, a Commit operation must be executed to make the written data visible. To prevent short commit intervals from affecting job stability and read/write performance, the AnalyticDB for MySQL data synchronization feature has a default commit interval of 5 minutes. Therefore, when you create and start a data synchronization task for the first time, you must wait at least 5 minutes before you can view the first batch of written data.
Billing overview
Using the AnalyticDB for MySQL data synchronization feature incurs the following fees:
Fees for ACU elastic resources in AnalyticDB for MySQL. For more information about the billable items, see Billable items for the Data Lakehouse Edition and Billable items for the Enterprise and Basic editions.
Fees for OSS storage, GET and PUT requests, and other requests. For more information about the billable items, see Billing overview.
Procedure
Step 1: Create a data source.
Step 2: Create a data link.
Step 3: Start the data synchronization task.
Step 4: Analyze data.
Step 5 (Optional): Manage the data source.
Create a data source
If you have already added a Kafka data source, you can skip this step and directly create a data link. For more information, see Create a data link.
Log on to the AnalyticDB for MySQL console. In the upper-left corner of the console, select a region. In the left-side navigation pane, click Clusters. Find the cluster that you want to manage and click the cluster ID.
In the navigation pane on the left, choose Data Ingestion > Data Source Management.
In the upper-right corner, click Create Data Source.
On the Create Data Source page, configure the parameters. The following table describes the parameters.
Parameter
Description
Data Source Type
Select Kafka.
Data Source Name
The system automatically generates a name based on the data source type and the current time. You can change the name as needed.
Data Source Description
Enter a description for the data source, such as the application scenario or business limits.
Deployment Mode
Only Alibaba Cloud Instance is supported.
Kafka Instance
The ID of the Kafka instance.
Log on to the ApsaraMQ for Kafka console and view the instance ID on the Instance List page.
Kafka Topic
The name of the topic created in Kafka.
Log on to the ApsaraMQ for Kafka console and view the topic name on the Topic Management page of the target instance.
Message Data Format
The data format of Kafka messages. Only JSON is supported.
After you configure the parameters, click Create.
Create a data link
In the navigation pane on the left, click Simple Log Service/Kafka Data Synchronization.
In the upper-right corner, click Create Synchronization Job.
On the Create Synchronization Job page, configure the Source and Destination Settings, Destination Database and Table Settings, and Synchronization Settings.
The following table describes the parameters in the Source and Destination Settings section.
Parameter
Description
Job Name
The name of the data link. The system automatically generates a name based on the data source type and the current time. You can change the name as needed.
Data Source
Select an existing Kafka data source or create one.
Destination Type
The following options are supported:
Data Lake - User OSS.
Data Lake - ADB Lake Storage (Recommended).
Important: When you select Data Lake - ADB Lake Storage, you must enable the lake storage feature.
ADB Lake Storage
The name of the lake storage that contains the AnalyticDB for MySQL lake data.
Select a destination lake storage from the drop-down list. If no lake storage is created, click Automatic Creation in the drop-down list to automatically create one.
Important: This parameter is required when you set Destination Type to Data Lake - ADB Lake Storage.
OSS Path
The OSS path in which the AnalyticDB for MySQL lake data is stored.
Important: This parameter is required when you set Destination Type to Data Lake - User OSS.
The displayed buckets are all buckets in the same region as the AnalyticDB for MySQL cluster. You can select any of them. Plan the storage path carefully because it cannot be modified after the synchronization task is created.
We recommend that you select an empty folder whose path does not have a prefix relationship with the OSS paths of other tasks. This prevents data from being overwritten. For example, if the OSS paths of two data synchronization tasks are oss://testBucketName/test/sls1/ and oss://testBucketName/test/, the paths have a prefix relationship, and data may be overwritten during data synchronization.
Storage Format
The data storage format. The following options are supported:
PAIMON.
Important: This format is supported only when Destination Type is set to Data Lake - User OSS.
ICEBERG.
The following table describes the parameters in the Destination Database and Table Settings section.
Parameter
Description
Database Name
The name of the database in AnalyticDB for MySQL to which data is synchronized. If no database with this name exists, a database is created. If a database with this name already exists, data is synchronized to the existing database. For more information about the database naming conventions, see Limits.
Important: In the Source and Destination Settings section, if you set Storage Format to PAIMON, the existing database must meet the following conditions. Otherwise, the data synchronization task fails:
The database must be an external database. The CREATE DATABASE statement must be CREATE EXTERNAL DATABASE <Database name>.
The DBPROPERTIES parameter in the CREATE DATABASE statement must contain the catalog property, and the value of catalog must be paimon.
The DBPROPERTIES parameter must contain the adb.paimon.warehouse property. Example: adb.paimon.warehouse=oss://testBucketName/aps/data.
The DBPROPERTIES parameter must contain the LOCATION property, and .db must be appended to the end of the database name in the path. Otherwise, queries in XIHE fail. Example: LOCATION=oss://testBucketName/aps/data/kafka_paimon_external_db.db/.
The bucket directory in the OSS path specified by LOCATION must exist. Otherwise, the database fails to be created.
For a sample statement that meets these conditions, see the sketch after this parameter list.
Table Name
The name of the table in AnalyticDB for MySQL to which data is synchronized. If no table with this name exists in the database, a table is created. If a table with this name already exists in the database, data fails to be synchronized. For the table naming convention, see Limits.
Sample Data
The latest data is automatically obtained from the Kafka topic and used as sample data.
Note: The data in the Kafka topic must be in JSON format. If data in other formats exists, an error is reported during data synchronization.
JSON Parsing Level
The number of nested levels to parse in the JSON data. Valid values:
0: No parsing.
1 (Default): Parses one level.
2: Parses two levels.
3: Parses three levels.
4: Parses four levels.
For more information about the JSON nested parsing policy, see Example of JSON parsing levels and schema field inference.
Schema Field Mapping
The schema information of the sample data after JSON parsing. You can change the destination field names and types, or add or delete fields as needed.
Partition Key Settings
Set partition keys for the destination table. We recommend that you configure partitions based on log time or business logic to ensure data ingestion and query performance. If you do not set partition keys, the destination table has no partitions.
The destination partition key can be generated by formatting a time value or by specifying a partition field directly.
To partition data by datetime, select a datetime field as the partition field, set the Format Processing Method parameter to Formatted Time, and configure the Source Field Format and Destination Partition Format parameters. AnalyticDB for MySQL parses the value of the partition field based on the source field format and converts it to the destination partition format for partitioning. For example, if the source field is gmt_created with a value of 1711358834, the Source Field Format parameter is set to Timestamp Accurate to Seconds, and the Destination Partition Format parameter is set to yyyyMMdd, data is partitioned based on the value 20240325. For a query that illustrates this conversion, see the sketch after this parameter list.
To partition data by field value, set the Format Processing Method parameter to Specify Partition Field and select the partition field.
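For reference, the following statement is a minimal sketch of a CREATE DATABASE statement that satisfies the PAIMON conditions listed for the Database Name parameter. It reuses the example values from those conditions (testBucketName and kafka_paimon_external_db are placeholder names); the exact DBPROPERTIES quoting may differ by cluster version, so verify the syntax before you run it.

```sql
-- Minimal sketch of an external database that meets the PAIMON conditions above.
-- testBucketName and kafka_paimon_external_db are placeholder names from the examples.
CREATE EXTERNAL DATABASE kafka_paimon_external_db
DBPROPERTIES (
  catalog = 'paimon',
  'adb.paimon.warehouse' = 'oss://testBucketName/aps/data',
  location = 'oss://testBucketName/aps/data/kafka_paimon_external_db.db/'
);
```

The time-format conversion described in the Partition Key Settings parameter can also be reproduced with MySQL date functions. The following query only illustrates the conversion logic and is not how the feature is implemented:

```sql
-- A seconds-level timestamp such as 1711358834, formatted as yyyyMMdd, maps to partition 20240325.
SELECT DATE_FORMAT(FROM_UNIXTIME(1711358834), '%Y%m%d') AS destination_partition;
-- Returns: 20240325
```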
The following table describes the parameters in the Synchronization Settings section.
Parameter
Description
Starting Consumer Offset for Incremental Synchronization
The offset from which the data synchronization task starts to consume Kafka data. Valid values:
Earliest offset (begin_cursor): Consumes data from the earliest available offset in the Kafka topic.
Latest offset (end_cursor): Consumes data starting from the latest offset in the Kafka topic.
Custom offset: You can select a point in time. The system starts consuming data from the first message whose timestamp is later than or equal to the specified time.
Job Resource Group
The Job resource group used to run the task.
ACUs for Incremental Synchronization
The number of ACUs to allocate from the Job resource group. The minimum value is 2. The maximum value is the maximum available computing resources of the Job resource group. We recommend that you specify a larger number of ACUs to improve data ingestion performance and task stability.
Note: When you create a data synchronization task, elastic resources of the Job resource group are used. Because a data synchronization task occupies resources for a long time, the system deducts the resources occupied by the task from the resource group. For example, if the maximum amount of computing resources of a Job resource group is 48 ACUs and you have already created a synchronization task that uses 8 ACUs, the maximum number of ACUs that you can select when you create another synchronization task in this resource group is 40.
Advanced Settings
Advanced configuration lets you customize the sync task. To customize the configuration, contact technical support.
After you configure the parameters, click Submit.
Start the data synchronization task
On the Simple Log Service/Kafka Data Synchronization page, find the data synchronization task that you created and click Start in the Actions column.
In the upper-right corner, click Query. The task is started when its status changes to Starting.
Analyze data
After the data synchronization task is running, you can use Spark Jar Development to analyze the data synchronized to AnalyticDB for MySQL. For more information about Spark development, see Spark development editor and Spark offline application development.
In the navigation pane on the left, choose Job Development > SQL Development.
In the default template, enter the sample statements and click Run Now.
-- Here is just an example of SparkSQL. Modify the content and run your spark program.
conf spark.driver.resourceSpec=medium;
conf spark.executor.instances=2;
conf spark.executor.resourceSpec=medium;
conf spark.app.name=Spark SQL Test;
conf spark.adb.connectors=oss;
-- Here are your sql statements
show tables from lakehouse20220413156_adbTest;
Optional: On the Application List tab, click Log in the Actions column to view the run logs of the Spark SQL job.
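After the destination table is visible, you can query the synchronized data in the same editor. The following statements are a sketch that uses hypothetical names: replace test_db and kafka_test_table with the Database Name and Table Name that you configured in the Destination Database and Table Settings section.

```sql
-- test_db and kafka_test_table are hypothetical names; replace them with your own settings.
conf spark.driver.resourceSpec=medium;
conf spark.executor.instances=2;
conf spark.executor.resourceSpec=medium;
conf spark.app.name=Kafka Sync Query;
conf spark.adb.connectors=oss;
-- Check how many rows have been synchronized and preview a few records.
SELECT COUNT(*) FROM test_db.kafka_test_table;
SELECT * FROM test_db.kafka_test_table LIMIT 10;
```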
Manage the data source
On the Data Source Management page, you can perform the following operations in the Actions column.
Action | Description |
Create Job | Quickly go to the page for creating a data synchronization or data migration task for this data source. |
View | View the detailed configuration of the data source. |
Edit | Edit the properties of the data source, such as the name and description. |
Delete | Delete the current data source. Note: If a data synchronization or data migration task exists for the data source, you cannot directly delete the data source. You must first go to the Simple Log Service/Kafka Data Synchronization page, find the target synchronization task, and click Delete in the Actions column to delete the task. |
Example of JSON parsing levels and schema field inference
The parsing level specifies the depth of parsing used to extract fields from the JSON data. Assume that a user sends the following JSON data to Kafka.
{
"name" : "zhangle",
"age" : 18,
"device" : {
"os" : {
"test": "lag",
"member": {
"fa": "zhangsan",
"mo": "limei"
}
},
"brand" : "none",
"version" : "11.4.2"
}
}
The following sections show the parsing results for levels 0 to 4.
Level 0 parsing
No parsing is performed. The original JSON data is directly output.
JSON field | Value | Destination field name |
__value__ | { "name" : "zhangle","age" : 18, "device" : { "os" : { "test":"lag","member":{ "fa":"zhangsan","mo":"limei" }},"brand": "none","version" : "11.4.2" }} | __value__ |
Level 1 parsing
Parses the first-level fields of the JSON data.
JSON field | Value | Destination field name |
name | zhangle | name |
age | 18 | age |
device | { "os" : { "test":"lag","member":{ "fa":"zhangsan","mo":"limei" }},"brand": "none","version" : "11.4.2" } | device |
Level 2 parsing
Parses the second-level fields of the JSON data. If a field is not nested, it is directly output. For example, the name and age fields are directly output. If a field is nested, its sub-level fields are output. For example, the device field is nested, so its sub-level fields device.os, device.brand, and device.version are output.
Because destination field names cannot contain periods (.), periods are automatically replaced with underscores (_).
JSON field | Value | Destination field name |
name | zhangle | name |
age | 18 | age |
device.os | { "test":"lag","member":{ "fa":"zhangsan","mo":"limei" }} | device_os |
device.brand | none | device_brand |
device.version | 11.4.2 | device_version |
Level 3 parsing
JSON field | Value | Destination field name |
name | zhangle | name |
age | 18 | age |
device.os.test | lag | device_os_test |
device.os.member | { "fa":"zhangsan","mo":"limei" } | device_os_member |
device.brand | none | device_brand |
device.version | 11.4.2 | device_version |
Level 4 parsing
JSON field | Value | Destination field name |
name | zhangle | name |
age | 18 | age |
device.os.test | lag | device_os_test |
device.os.member.fa | zhangsan | device_os_member_fa |
device.os.member.mo | limei | device_os_member_mo |
device.brand | none | device_brand |
device.version | 11.4.2 | device_version |
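For reference, the flattened destination field names in the tables above become the column names of the destination table. The following query is a sketch against a hypothetical table named test_db.kafka_test_table; the selected columns correspond to the Level 4 destination field names.

```sql
-- Hypothetical table name; the columns match the Level 4 destination field names above.
SELECT name, age, device_os_test, device_os_member_fa, device_os_member_mo,
       device_brand, device_version
FROM test_db.kafka_test_table
LIMIT 10;
```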