AnalyticDB for MySQL provides the AnalyticDB Pipeline Service (APS) feature. You can use this feature to create a Simple Log Service (SLS) data link to synchronize data in real time from a Logstore to an AnalyticDB for MySQL cluster, starting from a specified time offset. This feature meets requirements such as near real-time data output, full historical data archiving, and elastic analytics. This topic describes how to add an SLS data source, create an SLS data link, start a sync task, perform data analytics, and manage the data source.
Prerequisites
An AnalyticDB for MySQL Enterprise Edition, Basic Edition, or Data Lakehouse Edition cluster is created.
A job resource group is created for the AnalyticDB for MySQL cluster.
A database account is created for the AnalyticDB for MySQL cluster.
If you use an Alibaba Cloud account, you only need to create a privileged account.
If you use a Resource Access Management (RAM) user, you must create a privileged account and a standard account and associate the standard account with the RAM user.
You have activated Simple Log Service and created a project and a Logstore in the same region as your AnalyticDB for MySQL cluster. For more information, see Use LoongCollector to collect and analyze ECS text logs.
Notes
A table in an AnalyticDB for MySQL cluster can be synchronized with only one Logstore in Simple Log Service.
After data is ingested, a Commit operation must be executed to make the written data visible. To prevent a short Commit operation interval from affecting job stability and read and write performance, the data synchronization feature of AnalyticDB for MySQL has a default Commit operation interval of 5 minutes. Therefore, when you create and start a data synchronization job for the first time, you must wait at least 5 minutes to view the first batch of written data.
Billing information
Synchronizing data to OSS by using the data synchronization feature of AnalyticDB for MySQL incurs the following fees.
Elastic resource fees for AnalyticDB Compute Units (ACUs) in AnalyticDB for MySQL. For more information, see Data Lakehouse Edition billing items and Enterprise Edition and Basic Edition billing items.
Storage fees, GET requests, and other requests such as PUT for OSS. For more information, see Billing overview.
Procedure
Step 1 (Optional): Configure RAM authorization.
Step 2: Create a data source.
Step 3: Create a data link.
Step 4: Start the data sync task.
Step 5: Perform data analytics.
Step 6 (Optional): Manage the data source.
Configure RAM authorization
To synchronize SLS data to AnalyticDB for MySQL across different Alibaba Cloud accounts, you must create a Resource Access Management (RAM) role in the source account, grant precise permissions to the RAM role, and modify the trust policy of the RAM role. If you synchronize SLS data within the same account, skip this step and proceed to Create a data source.
Create a RAM role. For more information, see Create a RAM role for an Alibaba Cloud account.
Note: When configuring the Principal Name parameter, select Other Account and enter the ID of the Alibaba Cloud account that contains the AnalyticDB for MySQL cluster. You can log on to the Account Center and view the Account ID on the Security Settings page.
Grant the AliyunAnalyticDBAccessingLogRolePolicy permission to the RAM role. For more information, see Grant precise permissions to a RAM role.
Modify the trust policy of the RAM role. For more information, see Modify the trust policy of a RAM role.
{
  "Statement": [
    {
      "Action": "sts:AssumeRole",
      "Effect": "Allow",
      "Principal": {
        "RAM": [
          "acs:ram::<Alibaba Cloud account ID>:root"
        ],
        "Service": [
          "<Alibaba Cloud account ID>@ads.aliyuncs.com"
        ]
      }
    }
  ],
  "Version": "1"
}
Note: The Alibaba Cloud account ID is the ID of the Alibaba Cloud account that you specified in Step 1. Do not include the angle brackets (<>).
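As a rough illustration of how the placeholders in the trust policy are filled in, the following Python sketch builds the same document from an account ID. The function name build_trust_policy and the sample ID are hypothetical, and the digits-only check is an assumption about the account ID format rather than a documented rule.

```python
import json


def build_trust_policy(account_id: str) -> dict:
    """Return the trust policy above with the account ID substituted in."""
    # Assumption: account IDs are numeric. This also rejects values that
    # still carry the angle brackets from the template.
    if not account_id.isdigit():
        raise ValueError("account ID must be digits only, without angle brackets")
    return {
        "Statement": [
            {
                "Action": "sts:AssumeRole",
                "Effect": "Allow",
                "Principal": {
                    "RAM": [f"acs:ram::{account_id}:root"],
                    "Service": [f"{account_id}@ads.aliyuncs.com"],
                },
            }
        ],
        "Version": "1",
    }


# Placeholder account ID, for illustration only.
policy = build_trust_policy("1234567890123456")
print(json.dumps(policy, indent=2))
```

The printed JSON can be pasted into the trust policy editor of the RAM role after you replace the placeholder with the real account ID.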
Create a data source
If you want to use an existing data source, skip this step and proceed to Create a data link.
Log on to the AnalyticDB for MySQL console. In the upper-left corner of the console, select a region. In the left-side navigation pane, click Clusters. Find the cluster that you want to manage and click the cluster ID.
In the navigation pane on the left, choose Data Ingestion > Data Sources.
In the upper-left corner, click Create Data Source.
On the Create Data Source page, configure the parameters. The following table describes the parameters.
Parameter
Description
Data Source Type
Select SLS.
Data Source Name
The system generates a name based on the data source type and the current time. You can change the name as needed.
Data Source Description
A description of the data source, such as the data lakehouse scenario or business limits.
Deployment Mode
Currently, only Alibaba Cloud Instance is supported.
Region of Simple Log Service Project
The region where the SLS project resides.
Across Alibaba Cloud Accounts
The SLS data source supports synchronizing data from another Alibaba Cloud account to AnalyticDB for MySQL.
No: Synchronize SLS data within the current account to AnalyticDB for MySQL.
Yes: Synchronize SLS data from another account to AnalyticDB for MySQL. If you select this option, you must specify Alibaba Cloud Account and RAM Role.
Note: Alibaba Cloud Account: The ID of the Alibaba Cloud account to which the source belongs.
RAM Role: The name of the RAM role created in the source account. This is the RAM role that you created in Step 1.
Simple Log Service Project
The source SLS project.
Important: The SLS Project list displays all projects under the Alibaba Cloud account and its RAM users. If you select a project that belongs to the Alibaba Cloud account, make sure that the RAM user has permissions on the project. Otherwise, data cannot be synchronized to AnalyticDB for MySQL.
Simple Log Service Logstore
The source SLS Logstore.
After you configure the parameters, click Create.
Create a data link
In the navigation pane on the left, click Simple Log Service/Kafka Data Synchronization.
In the upper-left corner, click Create Synchronization Job.
On the Create Synchronization Job page, configure the Source and Destination Settings, Destination Database and Table Settings, and Synchronization Settings parameters.
The following table describes the Source and Destination parameters.
Parameter
Description
Job Name
The name of the data link. The system generates a name based on the data source type and the current time. You can change the name as needed.
Data Source
Select an existing SLS data source or create a new one.
Destination Type
The following options are supported:
Data Lake - User OSS.
Data Lake - AnalyticDB Lake Storage (Recommended).
Important: When you select Data Lake - AnalyticDB Lake Storage, you must enable the lake storage feature.
ADB Lake Storage
The name of the AnalyticDB for MySQL lake storage where the lake data resides.
Select a destination lake storage from the drop-down list. If no lake storage is created, click Automatically Created in the drop-down list to automatically create one.
Important: This parameter is required only when Destination Type is set to Data Lake - AnalyticDB Lake Storage.
OSS Path
The storage path in OSS for the AnalyticDB for MySQL data lakehouse data.
Important: This parameter is required when you set Destination Type to Data Lake - User OSS.
The buckets displayed are all buckets in the same region as the AnalyticDB for MySQL cluster. You can select any of them. Plan the storage path carefully. You cannot change it after creation.
Select an empty folder. The OSS path cannot have a prefix relationship with the OSS paths of other tasks to prevent data from being overwritten. For example, if the OSS paths for two data synchronization tasks are oss://testBucketName/test/sls1/ and oss://testBucketName/test/, they have a prefix relationship, which will cause data to be overwritten during data synchronization.
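The prefix rule for OSS paths can be checked before you create a task. The following Python sketch is one way to do that. The helper names are hypothetical, and the check reflects the rule as described above (one path lies inside the other), not the validation the console actually performs.

```python
def normalize(path: str) -> str:
    """Ensure exactly one trailing slash so that sls1 and sls10 do not match."""
    return path.rstrip("/") + "/"


def has_prefix_relationship(path_a: str, path_b: str) -> bool:
    """Return True if either OSS path is a prefix of the other."""
    a, b = normalize(path_a), normalize(path_b)
    return a.startswith(b) or b.startswith(a)


# The pair from the example above conflicts.
print(has_prefix_relationship("oss://testBucketName/test/sls1/",
                              "oss://testBucketName/test/"))
# Sibling folders do not conflict.
print(has_prefix_relationship("oss://testBucketName/test/sls1/",
                              "oss://testBucketName/test/sls2/"))
```

Comparing whole path segments (by forcing a trailing slash) avoids flagging folders whose names merely share leading characters.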
Storage Format
The data storage format. The following options are supported:
PAIMON.
Important: This format is supported only when Destination Type is set to Data Lake - User OSS.
ICEBERG.
The following table describes the Destination Database and Table Settings parameters.
Parameter
Description
Database Name
The name of the destination database in AnalyticDB for MySQL. If a database with the same name does not exist, a new database is created. If a database with the same name exists, data is synchronized to the existing database. For more information about naming conventions, see Limits.
Important: In the Source and Destination Settings section, if you set Storage Format to PAIMON, an existing database must meet the following conditions. Otherwise, the data synchronization task will fail.
It must be an external database. The statement to create the database must be CREATE EXTERNAL DATABASE <database_name>.
The `DBPROPERTIES` parameter in the `CREATE DATABASE` statement must include the catalog property, and the value of catalog must be paimon.
The `DBPROPERTIES` parameter in the `CREATE DATABASE` statement must include the adb.paimon.warehouse property. For example: adb.paimon.warehouse=oss://testBucketName/aps/data.
The `DBPROPERTIES` parameter in the `CREATE DATABASE` statement must include the LOCATION property, and you must add .db after the database name. Otherwise, XIHE queries will fail. For example: LOCATION=oss://testBucketName/aps/data/kafka_paimon_external_db.db/.
The bucket directory in the OSS path configured for LOCATION must exist. Otherwise, creating the database will fail.
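The property conditions for an existing PAIMON database can be expressed as a small checklist. The Python sketch below takes the properties as a plain dictionary and reports which conditions are not met. The function name and the dictionary representation are hypothetical, and the sketch cannot verify that the database is external or that the bucket directory exists.

```python
def check_paimon_db_properties(db_name: str, props: dict) -> list:
    """Return a list of unmet conditions (an empty list means all checks pass)."""
    problems = []
    if props.get("catalog") != "paimon":
        problems.append("catalog must be paimon")
    if not props.get("adb.paimon.warehouse"):
        problems.append("adb.paimon.warehouse is missing")
    # The last path segment of LOCATION must be <database_name>.db
    location = props.get("LOCATION", "")
    last_segment = location.rstrip("/").rsplit("/", 1)[-1]
    if last_segment != f"{db_name}.db":
        problems.append("LOCATION must end with <database_name>.db")
    return problems


# Values taken from the examples in the conditions above.
example_props = {
    "catalog": "paimon",
    "adb.paimon.warehouse": "oss://testBucketName/aps/data",
    "LOCATION": "oss://testBucketName/aps/data/kafka_paimon_external_db.db/",
}
print(check_paimon_db_properties("kafka_paimon_external_db", example_props))
```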
Table Name
The name of the destination table in AnalyticDB for MySQL. If a table with the same name does not exist in the database, a new table is created. If a table with the same name already exists, the data synchronization fails. For more information about naming conventions, see Limits.
Schema Field Mapping
By default, fields are retrieved from the delivery task configuration of Simple Log Service. If no delivery task is configured for the Logstore, fields are retrieved from the latest log data by default.
Supported data types: BOOLEAN, INT, BIGINT, FLOAT, DOUBLE, and STRING.
Synchronization of SLS reserved fields is supported. For more information, see Reserved fields.
Important: Modifying destination field names is not supported.
If the task has been started (running or completed), you cannot modify existing columns, but you can add new columns. If the task is created but not started, you can modify the columns.
Partition Key Settings
Set a partition key for the destination table. We recommend configuring partitions based on log time or business logic to ensure data ingestion and query performance. If you do not set a partition key, the destination table will not have partitions by default.
You can format the destination partition key using a time format or by specifying a partition field.
To partition by date and time, select a date-time field for the partition field name. For the format handling method, select Time Formatting, then select the source field format and the destination partition format. AnalyticDB for MySQL identifies the value of the partition field based on the source field format and converts it to the destination partition format for partitioning. For example, if the source field is gmt_created with a value of 1711358834, the source field format is a second-level precision timestamp, and the destination partition format is yyyyMMdd, the data will be partitioned by 20240325.
To partition by field value, select Specify Partition Field for the format handling method.
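The time formatting example can be reproduced with a few lines of Python. This sketch converts a second-precision timestamp to a yyyyMMdd partition value. It assumes UTC for the conversion, because the time zone used by AnalyticDB for MySQL is not stated here, and the function name is hypothetical.

```python
from datetime import datetime, timezone


def partition_value(epoch_seconds: int, fmt: str = "%Y%m%d") -> str:
    """Format a second-precision timestamp as a partition value (UTC assumed)."""
    return datetime.fromtimestamp(epoch_seconds, tz=timezone.utc).strftime(fmt)


# gmt_created value from the example above.
print(partition_value(1711358834))  # 20240325
```

The timestamp 1711358834 corresponds to 09:27:14 UTC on March 25, 2024, so the row lands in partition 20240325.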
The following table describes the Synchronization Settings parameters.
Parameter
Description
Starting Consumer Offset for Incremental Synchronization
When the sync task starts, it consumes SLS data from the selected point in time. Valid values:
Earliest offset (begin_cursor): Automatically consumes data from the earliest point in time in the SLS data.
Latest offset (end_cursor): Automatically consumes data from the latest point in time in the SLS data.
Custom offset: You can select any point in time. The system starts consuming data from the first entry in SLS that is at or after this time.
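The three starting offsets can be illustrated with a sorted list of log timestamps. The Python sketch below is only a model of the semantics described above, not of how SLS resolves cursors internally. The function name and sample data are hypothetical.

```python
from bisect import bisect_left


def start_index(timestamps: list, offset) -> int:
    """Return the index of the first log entry to consume.

    timestamps must be sorted in ascending order. offset is "begin_cursor",
    "end_cursor", or a custom point in time given as an integer timestamp.
    """
    if offset == "begin_cursor":
        return 0                      # start from the earliest entry
    if offset == "end_cursor":
        return len(timestamps)        # only entries written from now on
    # Custom offset: first entry at or after the selected time.
    return bisect_left(timestamps, offset)


log_times = [100, 105, 105, 110, 120]
print(start_index(log_times, "begin_cursor"))  # 0
print(start_index(log_times, 106))             # 3
print(start_index(log_times, "end_cursor"))    # 5
```

With a custom offset of 106, consumption starts at the entry stamped 110, which is the first one at or after the selected time.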
Job Resource Group
Specify the Job resource group in which the task runs.
ACUs for Incremental Synchronization
Specify the number of ACUs for the Job resource group. The minimum number of ACUs is 2, and the maximum is the maximum available computing resources of the Job resource group. We recommend specifying a higher number of ACUs to improve data ingestion performance and task stability.
Note: When you create a data synchronization task, it uses elastic resources from the Job resource group. Data synchronization tasks occupy resources for a long time, so the system deducts the resources used by the task from the resource group. For example, if a Job resource group has a maximum of 48 ACUs and you have already created a sync task that uses 8 ACUs, the maximum number of ACUs you can select for another sync task in this resource group is 40.
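The ACU deduction can be worked through as simple arithmetic. The following Python sketch computes the upper limit for a new task from the resource group maximum and the ACUs already held by existing sync tasks. The function name is hypothetical, and returning 0 when fewer than 2 ACUs remain is an assumption based on the stated minimum of 2.

```python
MIN_ACUS_PER_TASK = 2  # minimum stated for ACUs for Incremental Synchronization


def max_selectable_acus(group_max_acus: int, acus_in_use: list) -> int:
    """Return the largest ACU count a new sync task can select."""
    remaining = group_max_acus - sum(acus_in_use)
    # Assumption: below the minimum, no new task can be created.
    return remaining if remaining >= MIN_ACUS_PER_TASK else 0


# Resource group with 48 ACUs and one existing task that uses 8 ACUs.
print(max_selectable_acus(48, [8]))  # 40
```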
Advanced Settings
Advanced configuration lets you customize the sync task. To use custom configurations, contact technical support.
After you configure the parameters, click Submit.
Start the data synchronization task
On the Simple Log Service/Kafka Data Synchronization page, find the data synchronization task that you created and click Start in the Actions column.
In the upper-left corner, click Search. The task has started successfully when its status changes to Running.
Data analytics
After the data is synchronized, you can use the Spark Jar development feature to analyze the data in AnalyticDB for MySQL. For more information about Spark development, see Spark development editor and Offline Spark application development.
In the navigation pane on the left, choose .
Enter the sample statements in the default template and click Run Now.
-- Here is just an example of SparkSQL. Modify the content and run your spark program.
conf spark.driver.resourceSpec=medium;
conf spark.executor.instances=2;
conf spark.executor.resourceSpec=medium;
conf spark.app.name=Spark SQL Test;
conf spark.adb.connectors=oss;
-- Here are your sql statements
show tables from lakehouse20220413156_adbTest;
Optional: On the Applications tab, click Logs in the Actions column to view the run logs of the Spark SQL job.
Manage the data source
In the navigation pane on the left, choose Data Ingestion > Data Sources. You can perform the following operations in the Actions column.
Operation | Description |
Create Job | Quickly go to the page for creating a data synchronization or data migration task for this data source. |
View | View the detailed configuration of the data source. |
Edit | Edit the properties of the data source, such as its name and description. |
Delete | Delete the current data source. Note: If a data synchronization or data migration task exists for the data source, you cannot delete the data source directly. You must first go to the Simple Log Service/Kafka Data Synchronization page, find the target sync task, and click Delete in the Actions column to delete the data synchronization or data migration task. |