Performing full-text searches or complex analysis directly on your PolarDB for MySQL database can affect the stability of your core business. PolarDB provides the AutoETL feature to automatically and continuously synchronize data from read/write nodes to PolarSearch nodes in the same cluster. This feature provides a one-stop data service that lets you synchronize data without deploying or maintaining separate extract, transform, and load (ETL) tools. This process isolates search and analysis workloads from online transaction processing workloads.
This feature is in canary release. To use this feature, submit a ticket to enable it.
Feature overview
AutoETL is a built-in data synchronization feature in PolarDB for MySQL. It allows data to flow automatically between different types of nodes within a cluster. Currently, this feature only supports data synchronization from PolarDB for MySQL to PolarSearch nodes in the same cluster for high-performance search and analysis.
You can use the built-in DBMS_ETL toolkit to create and manage data synchronization links directly with SQL commands. AutoETL provides three flexible data synchronization methods:
- Single-table synchronization (`dbms_etl.sync_by_table`): Synchronizes an entire source table to a destination index.
- Multi-table aggregation (`dbms_etl.sync_by_map`): Aggregates multiple source tables using `JOIN` operations and synchronizes the result to a destination index.
- Custom SQL (`dbms_etl.sync_by_sql`): Uses Flink SQL-compatible syntax for complex data cleansing, transformation, and aggregation.
Applicability
Before you use AutoETL, ensure that your environment meets the following conditions:
- Cluster version: MySQL 8.0.1, with revision 8.0.1.1.52 or later.
- Synchronization direction: Only from PolarDB for MySQL to PolarSearch nodes in the same cluster.
- DDL limits: Do not perform Data Definition Language (DDL) operations on a source table that has a synchronization link. To modify the table, you must recreate the synchronization link.
- Data types: Synchronization of the `BIT` type and spatial data types such as `GEOMETRY`, `POINT`, `LINESTRING`, `POLYGON`, `MULTIPOINT`, `MULTILINESTRING`, `MULTIPOLYGON`, and `GEOMETRYCOLLECTION` is not currently supported.
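Before you create a link, you can check the source table for columns of unsupported types. A minimal sketch, assuming you feed it the `COLUMN_NAME` and `DATA_TYPE` values returned by a query against `information_schema.COLUMNS` (the helper name is hypothetical):

```python
# Types that AutoETL cannot synchronize, per the data type limits above.
UNSUPPORTED_TYPES = {
    "bit", "geometry", "point", "linestring", "polygon",
    "multipoint", "multilinestring", "multipolygon", "geometrycollection",
}

def find_unsupported_columns(columns):
    """Return the (name, type) pairs whose type AutoETL cannot synchronize.

    `columns` is a list of (column_name, data_type) pairs, for example from:
    SELECT COLUMN_NAME, DATA_TYPE FROM information_schema.COLUMNS
    WHERE TABLE_SCHEMA = 'db1' AND TABLE_NAME = 't1';
    """
    return [(name, dtype) for name, dtype in columns
            if dtype.lower() in UNSUPPORTED_TYPES]

# Example: a table with one unsupported spatial column.
cols = [("id", "int"), ("c1", "varchar"), ("location", "point")]
bad = find_unsupported_columns(cols)
```

Exclude any reported columns from `<column_list>`, or the link creation will fail for those columns.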
Create a synchronization link
Single-table synchronization
Prepare the data
In PolarDB for MySQL, you can run the following SQL statements to create a sample database and table, and then insert test data.
```sql
CREATE DATABASE IF NOT EXISTS db1;
USE db1;
CREATE TABLE IF NOT EXISTS t1 (
  id INT PRIMARY KEY,
  c1 VARCHAR(100),
  c2 VARCHAR(100)
);
INSERT INTO t1(id, c1, c2) VALUES
  (1, 'apple', 'red'),
  (2, 'banana', 'yellow'),
  (3, 'grape', 'purple');
```

Create the synchronization link
You can use the `dbms_etl.sync_by_table` stored procedure to create a synchronization task from the `db1.t1` table to the `dest` index on the PolarSearch node.

Syntax

```sql
call dbms_etl.sync_by_table("search", "<source_table>", "<sink_table>", "<column_list>");
```

Parameters
| Parameter | Description |
| --- | --- |
| `search` | The synchronization destination. This value is currently fixed as `search`, which indicates the PolarSearch node. |
| `<source_table>` | The source table name, in `database_name.table_name` format. |
| `<sink_table>` | The name of the destination index on the PolarSearch node. |
| `<column_list>` | A comma-separated (`,`) list of column names to synchronize. If this is an empty string (`""`), all columns from the source table are synchronized. |

Limits
- The source table must have a primary key or a unique key.
- Do not use the same source or destination table in different synchronization links.
- After a link is created, new columns that are added to the source table are not automatically synchronized. To synchronize the new columns, you must recreate the link.
- To use a custom configuration for the destination index, first create the index and define its configuration on the PolarSearch node, and then create the synchronization link. If the destination index does not exist when you create the link, the system automatically creates it.
Examples
- Synchronize the entire `db1.t1` table to the `dest` index in PolarSearch:

  ```sql
  call dbms_etl.sync_by_table("search", "db1.t1", "dest", "");
  ```

- Synchronize the `c1` and `c2` columns of the `db1.t1` table to the `dest` index:

  ```sql
  call dbms_etl.sync_by_table("search", "db1.t1", "dest", "c1, c2");
  ```
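If you create links for many tables, you can generate the call statements from a script and execute them through any MySQL client or connector. A minimal sketch under that assumption (the helper name is hypothetical):

```python
def build_sync_by_table(source_table, sink_table, columns=()):
    """Build a dbms_etl.sync_by_table call statement.

    An empty `columns` sequence produces "" so that all columns are synchronized.
    """
    column_list = ", ".join(columns)
    return (f'call dbms_etl.sync_by_table('
            f'"search", "{source_table}", "{sink_table}", "{column_list}");')

# Reproduces the second example above.
stmt = build_sync_by_table("db1.t1", "dest", ["c1", "c2"])
```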
Verify the data
Connect to the PolarSearch node. You can then use the Elasticsearch-compatible REST API to query the data and verify the synchronization.
```shell
# Replace <polarsearch_endpoint> with the endpoint of your PolarSearch node
curl -u <user>:<password> -X GET "http://<polarsearch_endpoint>/dest/_search"
```
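To verify the result programmatically, you can parse the Elasticsearch-compatible `_search` response and compare the hit count with the number of source rows. A minimal sketch; the sample body below is illustrative only, and a real response contains additional fields:

```python
import json

def hit_count(search_response_body):
    """Extract the total hit count from an Elasticsearch-compatible _search response."""
    doc = json.loads(search_response_body)
    total = doc["hits"]["total"]
    # Newer API versions return {"value": N, "relation": "eq"}; older ones return a bare N.
    return total["value"] if isinstance(total, dict) else total

# Illustrative response body, e.g. the text of an HTTP GET on /dest/_search.
sample = '{"hits": {"total": {"value": 3, "relation": "eq"}, "hits": []}}'
```

For the single-table example above, the count should equal the three rows inserted into `db1.t1`.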
Multi-table aggregation
Prepare the data
In PolarDB for MySQL, you can run the following SQL statements to create sample databases and tables, and then insert test data.
```sql
CREATE DATABASE IF NOT EXISTS db1;
CREATE DATABASE IF NOT EXISTS db2;
CREATE DATABASE IF NOT EXISTS db3;
CREATE TABLE IF NOT EXISTS db1.t1 (id INT PRIMARY KEY, c1 INT);
CREATE TABLE IF NOT EXISTS db2.t2 (id INT PRIMARY KEY, c2 INT);
CREATE TABLE IF NOT EXISTS db3.t3 (id INT PRIMARY KEY, c3 VARCHAR(10));
INSERT INTO db1.t1(id, c1) VALUES (1, 11), (2, 22), (3, 33);
INSERT INTO db2.t2(id, c2) VALUES (1, 111), (2, 222), (4, 444);
INSERT INTO db3.t3(id, c3) VALUES (1, 'aaa'), (3, 'ccc'), (4, 'ddd');
```

Create the synchronization link
You can use the `dbms_etl.sync_by_map` stored procedure to join data from multiple tables and aggregate the data into an index on a PolarSearch node.

Syntax

```sql
call dbms_etl.sync_by_map(
  "search",
  "<columns_map>",  -- Mapping of destination index fields to source table fields
  "<join_fields>",  -- Join keys between tables
  "<join_types>",   -- Join types (inner, left)
  "<filter>"        -- Data filter condition
);
```

Parameters
| Parameter | Format example | Description |
| --- | --- | --- |
| `columns_map` | `dest.c1(db1.t1.c1),dest.c2(db2.t2.c2)` | The mapping of destination index fields to source table fields. In the example, the `c1` field of the `dest` index comes from `db1.t1.c1`, and the `c2` field comes from `db2.t2.c2`. |
| `join_fields` | `dest.id=db1.t1.id,db2.t2.id` | The join keys between tables. In the example, the document ID of the destination index (`dest.id`) is composed of `db1.t1.id` and `db2.t2.id`, which also serve as the join condition between the tables. |
| `join_types` | `inner,left` | The join types between tables. The join order corresponds to the order of tables in `join_fields`. The example performs `t1 INNER JOIN t2`, and then the result is joined with `t3` by a `LEFT JOIN`. |
| `filter` | `db1.t1.c1 > 10 AND db2.t2.c2 < 100` | A standard SQL `WHERE` clause used to filter source table data before synchronization. |

Limits
- All source tables involved in the synchronization must have a primary key.
- This feature uses stream processing. Therefore, only eventual consistency is guaranteed during synchronization.
- The update mode for the destination index is delete-then-insert. If you do not want queries to observe the intermediate state in which data has been deleted, run `set sink_options = "'ignore-delete' = 'true'";` in the session before you create the link. This setting makes the PolarSearch node ignore delete operations.
Examples
- Perform an `INNER JOIN` operation on the `db1.t1` and `db2.t2` tables using the `id` field, and then synchronize `t1.c1` and `t2.c2` to the `c1` and `c2` fields of the `dest` index:

  ```sql
  call dbms_etl.sync_by_map(
    "search",
    "dest.c1(db1.t1.c1), dest.c2(db2.t2.c2)",
    "dest.id=db1.t1.id,db2.t2.id",
    "inner",
    ""
  );
  ```

- Mixed `JOIN` and filtering on multiple tables: The three tables `db1.t1`, `db2.t2`, and `db3.t3` are joined. An `INNER JOIN` operation is performed between `t1` and `t2`, and a `LEFT JOIN` operation is performed between `t1` and `t3`. The data is then filtered using the conditions `t1.c1 > 10` and `t2.c2 < 100`:

  ```sql
  call dbms_etl.sync_by_map(
    "search",
    "dest.c1(db1.t1.c1), dest.c2(db2.t2.c2), dest.c3(db3.t3.c3)",
    "dest.id=db1.t1.id,db2.t2.id,db3.t3.id",
    "inner,left",
    "db1.t1.c1 > 10 and db2.t2.c2 < 100"
  );
  ```
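The `columns_map` and `join_fields` strings are easy to get subtly wrong. As a hedge against typos, a minimal sketch that assembles them from structured Python values (the helper names are hypothetical; execute the resulting call through any MySQL client):

```python
def build_columns_map(mapping):
    """mapping: dict of destination index field -> fully qualified source column."""
    return ", ".join(f"dest.{dst}({src})" for dst, src in mapping.items())

def build_join_fields(id_columns):
    """id_columns: ordered list of fully qualified join-key columns."""
    return "dest.id=" + ",".join(id_columns)

# Reproduces the strings from the first sync_by_map example above.
columns_map = build_columns_map({"c1": "db1.t1.c1", "c2": "db2.t2.c2"})
join_fields = build_join_fields(["db1.t1.id", "db2.t2.id"])
```

Because the join order must match the table order in `join_fields`, building both strings from the same ordered inputs keeps them consistent.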
Verify the data
Connect to the PolarSearch node. You can then use the Elasticsearch-compatible REST API to query the data and verify the synchronization.
```shell
# Replace <polarsearch_endpoint> with the endpoint of your PolarSearch node
curl -u <user>:<password> -X GET "http://<polarsearch_endpoint>/dest/_search"
```
Custom SQL
Create the synchronization link
For scenarios that require complex transformations, aggregations, or calculations, you can use the `dbms_etl.sync_by_sql` stored procedure. This procedure supports Flink SQL syntax to define the data synchronization logic.

Important: Do not hard-code passwords in SQL statements. The following example demonstrates only the syntax structure. Its `WITH` clauses contain plaintext passwords, which poses a major security risk. In a production environment, use a more secure method to manage credentials.

Syntax

```sql
call dbms_etl.sync_by_sql("search", "<sync_sql>");
```

Example

```sql
call dbms_etl.sync_by_sql("search", "
  -- Step 1: Define the PolarDB source table
  CREATE TEMPORARY TABLE `db1`.`sbtest1` (
    `id` BIGINT,
    `k` BIGINT,
    `c` STRING,
    PRIMARY KEY (`id`) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql',
    'hostname' = 'xxxxxxx',   -- Enter the PolarDB cluster endpoint
    'port' = '3306',
    'username' = 'xxx',       -- Do not use plaintext in a production environment
    'password' = 'xxx',       -- Do not use plaintext in a production environment
    'database-name' = 'db1',
    'table-name' = 'sbtest1'
  );
  -- Step 2: Define the PolarSearch destination table
  CREATE TEMPORARY TABLE `dest` (
    `k` BIGINT,
    `max_c` STRING,
    PRIMARY KEY (`k`) NOT ENFORCED
  ) WITH (
    'connector' = 'opensearch',
    'hosts' = 'xxxxxx:xxxx',  -- Enter the PolarSearch endpoint
    'index' = 'dest',
    'username' = 'xxx',       -- Do not use plaintext in a production environment
    'password' = 'xxx'        -- Do not use plaintext in a production environment
  );
  -- Step 3: Define the calculation and insertion logic
  INSERT INTO `dest`
  SELECT `t1`.`k`, MAX(`t1`.`c`)
  FROM `db1`.`sbtest1` AS `t1`
  GROUP BY `t1`.`k`;
");
```

Verify the data
Connect to the PolarSearch node. You can then use the Elasticsearch-compatible REST API to query the data and verify the synchronization.
```shell
# Replace <polarsearch_endpoint> with the endpoint of your PolarSearch node
curl -u <user>:<password> -X GET "http://<polarsearch_endpoint>/dest/_search"
```
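To keep credentials out of the statement text, as the security warning above recommends, you can inject them at call time, for example from environment variables. A minimal sketch; the variable names `POLARDB_USER` and `POLARDB_PASSWORD` and the helper are assumptions, not a PolarDB convention:

```python
import os

def mysql_connector_options(hostname, database, table, username, password):
    """Render the WITH-clause options for the Flink SQL 'mysql' connector."""
    return (f"'connector' = 'mysql', 'hostname' = '{hostname}', 'port' = '3306', "
            f"'username' = '{username}', 'password' = '{password}', "
            f"'database-name' = '{database}', 'table-name' = '{table}'")

# Read credentials from the environment instead of hard-coding them.
options = mysql_connector_options(
    "example-endpoint",
    "db1",
    "sbtest1",
    os.environ.get("POLARDB_USER", "user"),
    os.environ.get("POLARDB_PASSWORD", ""),
)
```

The rendered options can then be substituted into the `WITH (...)` clause of the `sync_by_sql` statement before it is executed.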
Manage synchronization links
You can use the following commands to view and delete synchronization links.
View links
- View all links:

  ```sql
  call dbms_etl.show_sync_link();
  ```

- View a specific link by ID. Replace `<sync_id>` with the ID that was returned when you created the link:

  ```sql
  call dbms_etl.show_sync_link_by_id('<sync_id>')\G
  ```

Description of the returned results:

```
*************************** 1. row ***************************
        SYNC_ID: crb5rmv8rttsg
           NAME: crb5rmv8rttsg
         SYSTEM: search
SYNC_DEFINITION: db1.t1 -> dest
  SOURCE_TABLES: db1.t1
    SINK_TABLES: dest
         STATUS: active   -- The status of the link. active indicates that it is running normally.
        MESSAGE:          -- If an error occurs, the error message is displayed here.
     CREATED_AT: 2024-05-20 11:55:06
     UPDATED_AT: 2024-05-20 17:28:04
        OPTIONS: ...
```
Delete a link
Deleting a synchronization link is a high-risk operation. By default, this operation also deletes the destination index and all its data in PolarSearch. You must confirm the operation before you proceed.
This operation stops data synchronization and cleans up the related resources.
```sql
call dbms_etl.drop_sync_link('<sync_id>');
```

When you run the `drop_sync_link` command to delete links in different states, the system's processing logic differs:
- For links in the `active` state, the status first changes to `dropping`. After the system finishes cleaning up the link resources and destination index data, the status changes to `dropped`.
- For links in the `dropped` state, the system completely purges the link's information.
- For links in other states, the delete operation is not supported.
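The behavior above can be sketched as a small lookup, which is handy when an automation script polls `show_sync_link_by_id` after issuing a drop. A minimal sketch; the function is hypothetical, and `"purged"` is a label of convenience for the link's information being completely removed:

```python
def drop_outcome(status):
    """Sketch of how drop_sync_link handles a link in the given state.

    Returns the tuple of states the link passes through, or None when the
    delete operation is not supported for that state.
    """
    if status == "active":
        # Cleanup runs asynchronously: dropping first, dropped once the
        # link resources and destination index data are cleaned up.
        return ("dropping", "dropped")
    if status == "dropped":
        # The link's information is completely purged.
        return ("purged",)
    return None
```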