When you need to perform full-text search or complex analysis on business data in PolarDB for MySQL, running these operations directly on the database may affect the stability of your core business. The AutoETL feature provided by PolarDB automatically and continuously synchronizes data from read/write nodes to PolarSearch nodes within the same cluster, delivering an integrated data service. You can synchronize data without deploying or maintaining additional ETL tools and isolate search and analytics workloads from online transaction processing workloads.
This feature is currently in a canary release. If you need this functionality, submit a ticket to contact us so we can enable it for you.
Feature overview
AutoETL is a built-in data synchronization capability of PolarDB for MySQL. It enables automatic data flow between different node types within the same cluster. The current version supports synchronizing data only from PolarDB for MySQL to PolarSearch nodes in the same cluster for high-performance search and analytics.
You can use the built-in DBMS_ETL toolkit in the database to create and manage data synchronization links directly with SQL commands. AutoETL provides three flexible data synchronization methods:
- Single-table sync (`dbms_etl.sync_by_table`): Synchronize an entire source table to a destination index.
- Multi-table aggregation (`dbms_etl.sync_by_map`): Aggregate multiple source tables using a `JOIN` operation and synchronize the result to a destination index.
- Custom SQL (`dbms_etl.sync_by_sql`): Use Flink SQL-compatible syntax for complex data cleaning, transformation, and aggregation.
Applicability
Before using AutoETL, ensure your environment meets the following conditions:
Cluster version:
- MySQL 8.0.1, with revision version 8.0.1.1.52 or later.
- MySQL 8.0.2, with revision version 8.0.2.2.33 or later.
Synchronization direction: Supports synchronizing only from PolarDB for MySQL to PolarSearch nodes in the same cluster.
DDL limitations: When performing DDL operations on source tables that already have synchronization links, follow specific rules and practices to avoid synchronization interruption. Some incompatible changes require rebuilding the link. For details, see DDL change rules and best practices.
Data types: Does not support synchronizing the `BIT` type or spatial data types such as `GEOMETRY`, `POINT`, `LINESTRING`, `POLYGON`, `MULTIPOINT`, `MULTILINESTRING`, `MULTIPOLYGON`, and `GEOMETRYCOLLECTION`.
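Before creating a link, it can help to check a table's column types against this blocklist so unsupported columns can be excluded up front (for example, through the `column_list` parameter of `dbms_etl.sync_by_table`). The following is only a sketch; the helper and its input convention (column name mapped to MySQL type string) are hypothetical, not part of DBMS_ETL:

```python
# Sketch of a pre-flight check against AutoETL's unsupported types.
# The UNSUPPORTED set mirrors the list above; the input format is
# a hypothetical convention (column name -> MySQL type string).
UNSUPPORTED = {
    "BIT", "GEOMETRY", "POINT", "LINESTRING", "POLYGON",
    "MULTIPOINT", "MULTILINESTRING", "MULTIPOLYGON", "GEOMETRYCOLLECTION",
}

def syncable_columns(columns: dict) -> list:
    """Return the columns whose types AutoETL can synchronize."""
    return [
        name for name, col_type in columns.items()
        # Compare the base type, ignoring length specifiers such as BIT(8).
        if col_type.split("(")[0].strip().upper() not in UNSUPPORTED
    ]

cols = {"id": "INT", "c1": "VARCHAR(100)", "flags": "BIT(8)", "loc": "POINT"}
print(syncable_columns(cols))  # ['id', 'c1']
```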
Create a sync link
Single-table sync
Data preparation
Run the following SQL statements in PolarDB for MySQL to create a sample database and table, and insert test data.
```sql
CREATE DATABASE IF NOT EXISTS db1;
USE db1;
CREATE TABLE IF NOT EXISTS t1 (
  id INT PRIMARY KEY,
  c1 VARCHAR(100),
  c2 VARCHAR(100)
);
INSERT INTO t1(id, c1, c2) VALUES
  (1, 'apple', 'red'),
  (2, 'banana', 'yellow'),
  (3, 'grape', 'purple');
```
Create a synchronization link
Use the `dbms_etl.sync_by_table` stored procedure to create a synchronization task from the `db1.t1` table to the `dest` index on the PolarSearch node.
Syntax
```sql
call dbms_etl.sync_by_table("search", "<source_table>", "<sink_table>", "<column_list>");
```
Parameter description
| Parameter | Description |
| --- | --- |
| `search` | Sync target. Currently fixed as `search`, indicating a PolarSearch node. |
| `<source_table>` | Source table name in the format `database.table`. |
| `<sink_table>` | Destination index name in the PolarSearch node. |
| `<column_list>` | List of column names to sync, separated by commas (`,`). If set to an empty string (`""`), all columns from the source table are synced. |

Limits
- The source table must have a primary key or unique key.
- You cannot use the same source table or destination table in different synchronization links.
- After a link is created, columns newly added to the source table are not synchronized by default. To synchronize new columns, rebuild the link.
- To use a custom index configuration, manually create the index and define its settings on the PolarSearch node before creating the synchronization link. If the destination index does not exist when the link is created, the system creates it automatically.
Examples
- Synchronize the entire `db1.t1` table to the `dest` index in PolarSearch:
  ```sql
  call dbms_etl.sync_by_table("search", "db1.t1", "dest", "");
  ```
- Synchronize only the `c1` and `c2` columns from the `db1.t1` table to the `dest` index:
  ```sql
  call dbms_etl.sync_by_table("search", "db1.t1", "dest", "c1, c2");
  ```
Verify data
Connect to the PolarSearch node and use an Elasticsearch-compatible REST API to query and confirm that data has been synchronized.
```shell
# Replace <polarsearch_endpoint> with the PolarSearch node endpoint
curl -u <user>:<password> -X GET "http://<polarsearch_endpoint>/dest/_search"
```
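Rather than eyeballing the JSON, you can check the result programmatically by comparing the hit count in the Elasticsearch-compatible `_search` response against the number of rows in the source table. A minimal sketch, assuming the standard `hits.total.value` response shape; the payload below is sample data, not real output:

```python
import json

# Illustrative _search response in the Elasticsearch-compatible shape;
# the payload below is sample data standing in for a real reply.
response = json.loads("""
{
  "hits": {
    "total": {"value": 3, "relation": "eq"},
    "hits": [
      {"_id": "1", "_source": {"id": 1, "c1": "apple",  "c2": "red"}},
      {"_id": "2", "_source": {"id": 2, "c1": "banana", "c2": "yellow"}},
      {"_id": "3", "_source": {"id": 3, "c1": "grape",  "c2": "purple"}}
    ]
  }
}
""")

def synced_count(search_response: dict) -> int:
    """Extract the total hit count from a _search response."""
    return search_response["hits"]["total"]["value"]

# The sample table t1 holds 3 rows, so a completed sync should report 3 hits.
assert synced_count(response) == 3
```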
Multi-table aggregation
Data preparation
Run the following SQL statements in PolarDB for MySQL to create sample databases and tables, and insert test data.
```sql
CREATE DATABASE IF NOT EXISTS db1;
CREATE DATABASE IF NOT EXISTS db2;
CREATE DATABASE IF NOT EXISTS db3;
CREATE TABLE IF NOT EXISTS db1.t1 (id INT PRIMARY KEY, c1 INT);
CREATE TABLE IF NOT EXISTS db2.t2 (id INT PRIMARY KEY, c2 INT);
CREATE TABLE IF NOT EXISTS db3.t3 (id INT PRIMARY KEY, c3 VARCHAR(10));
INSERT INTO db1.t1(id, c1) VALUES (1, 11), (2, 22), (3, 33);
INSERT INTO db2.t2(id, c2) VALUES (1, 111), (2, 222), (4, 444);
INSERT INTO db3.t3(id, c3) VALUES (1, 'aaa'), (3, 'ccc'), (4, 'ddd');
```
Create a synchronization link
Use the `dbms_etl.sync_by_map` stored procedure to join data from multiple tables and aggregate it into an index on a PolarSearch node.
Syntax
```sql
call dbms_etl.sync_by_map(
  "search",
  "<columns_map>",  -- Mapping between destination index fields and source table fields
  "<join_fields>",  -- Join keys between tables
  "<join_types>",   -- Join types (inner, left)
  "<filter>"        -- Data filter condition
);
```
Parameter description
| Parameter | Format example | Description |
| --- | --- | --- |
| `columns_map` | `dest.c1(db1.t1.c1),dest.c2(db2.t2.c2)` | Mapping between destination index fields and source table fields. This example means: field `c1` in the destination index `dest` comes from `db1.t1.c1`, and field `c2` comes from `db2.t2.c2`. |
| `join_fields` | `dest.id=db1.t1.id,db2.t2.id` | Join keys between tables. This example indicates that the document ID of the destination index (`dest.id`) is composed of `db1.t1.id` and `db2.t2.id`, which also serve as the join condition. |
| `join_types` | `inner,left` | Join types between tables, in the order the tables appear in `join_fields`. This example means: `t1 INNER JOIN t2`, then the result `LEFT JOIN t3`. |
| `filter` | `db1.t1.c1 > 10 AND db2.t2.c2 < 100` | A standard SQL `WHERE` clause that filters source table data before syncing. |

Limits
- All source tables involved in the synchronization must have primary keys.
- This feature uses stream processing and guarantees only eventual consistency during synchronization.
- The update mode for the destination index is delete-then-insert. If you do not want queries to see the intermediate state in which a document has been deleted but not yet reinserted, set the session variable `set sink_options = "'ignore-delete' = 'true'";` before running the command to ignore delete operations on the PolarSearch node.
Examples
- `INNER JOIN` two tables: Join `db1.t1` and `db2.t2` on the `id` field, and synchronize `t1.c1` and `t2.c2` to fields `c1` and `c2` in the `dest` index.
  ```sql
  call dbms_etl.sync_by_map(
    "search",
    "dest.id(db1.t1.id),dest.c1(db1.t1.c1),dest.c2(db2.t2.c2)",
    "dest.id=db1.t1.id,db2.t2.id",
    "inner",
    ""
  );
  ```
- Mixed `JOIN` and filter across multiple tables: Join three tables (`db1.t1`, `db2.t2`, and `db3.t3`), where `t1` and `t2` use `INNER JOIN`, and `t1` and `t3` use `LEFT JOIN`. Filter data where `t1.c1 > 10` and `t2.c2 < 100`.
  ```sql
  call dbms_etl.sync_by_map(
    "search",
    "dest.id(db1.t1.id),dest.c1(db1.t1.c1),dest.c2(db2.t2.c2),dest.c3(db3.t3.c3)",
    "dest.id=db1.t1.id,db2.t2.id,db3.t3.id",
    "inner,left",
    "db1.t1.c1 > 10 and db2.t2.c2 < 100"
  );
  ```
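The `columns_map` and `join_fields` strings used above are easy to get subtly wrong by hand. As an illustration only (the helpers below and their input structures are hypothetical, not part of DBMS_ETL), they can be assembled from structured values:

```python
def build_columns_map(index: str, mapping: dict) -> str:
    """Build a columns_map string such as 'dest.c1(db1.t1.c1),dest.c2(db2.t2.c2)'."""
    return ",".join(f"{index}.{field}({source})" for field, source in mapping.items())

def build_join_fields(index: str, id_field: str, keys: list) -> str:
    """Build a join_fields string such as 'dest.id=db1.t1.id,db2.t2.id'."""
    return f"{index}.{id_field}=" + ",".join(keys)

columns_map = build_columns_map(
    "dest", {"id": "db1.t1.id", "c1": "db1.t1.c1", "c2": "db2.t2.c2"}
)
join_fields = build_join_fields("dest", "id", ["db1.t1.id", "db2.t2.id"])

print(columns_map)  # dest.id(db1.t1.id),dest.c1(db1.t1.c1),dest.c2(db2.t2.c2)
print(join_fields)  # dest.id=db1.t1.id,db2.t2.id
```

Because Python dicts preserve insertion order, the generated field order matches the order in which the mapping is written.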
Verify data
Connect to the PolarSearch node and use an Elasticsearch-compatible REST API to query and confirm that data has been synchronized.
```shell
# Replace <polarsearch_endpoint> with the PolarSearch node endpoint
curl -u <user>:<password> -X GET "http://<polarsearch_endpoint>/dest/_search"
```
Custom SQL
Create a synchronization link
For scenarios requiring complex transformations, aggregations, or calculations, the `dbms_etl.sync_by_sql` stored procedure supports defining data synchronization logic using Flink SQL syntax.
Important: Never hard-code passwords in SQL statements. The following example shows the syntax structure only; its `WITH` clauses contain plaintext passwords, which poses a serious security risk. In production environments, always use secure credential management methods.
Syntax
```sql
call dbms_etl.sync_by_sql("search", "<sync_sql>");
```
Example
The system automatically replaces the following placeholders in the SQL: `{mysql_host}`, `{mysql_port}`, `{mysql_user}`, `{mysql_password}`, `{search_host}`, `{search_port}`, `{search_user}`, and `{search_password}`. Write your SQL using these fixed placeholders.
```sql
CALL dbms_etl.sync_by_sql("search", "
  -- Step 1: Define the PolarDB source table
  CREATE TEMPORARY TABLE `db1`.`sbtest1` (
    `id` BIGINT,
    `k` BIGINT,
    `c` STRING,
    PRIMARY KEY (`id`) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql',
    'hostname' = '{mysql_host}',
    'port' = '{mysql_port}',
    'username' = '{mysql_user}',      -- Never use plaintext in production
    'password' = '{mysql_password}',  -- Never use plaintext in production
    'database-name' = 'db1',
    'table-name' = 'sbtest1'
  );

  -- Step 2: Define the PolarSearch destination table
  CREATE TEMPORARY TABLE `dest` (
    `k` BIGINT,
    `max_c` STRING,
    PRIMARY KEY (`k`) NOT ENFORCED
  ) WITH (
    'connector' = 'opensearch',
    'hosts' = '{search_host}:{search_port}',
    'index' = 'dest',
    'username' = '{search_user}',     -- Never use plaintext in production
    'password' = '{search_password}'  -- Never use plaintext in production
  );

  -- Step 3: Define the computation and insert logic
  INSERT INTO `dest`
  SELECT `t1`.`k`, MAX(`t1`.`c`)
  FROM `db1`.`sbtest1` AS `t1`
  GROUP BY `t1`.`k`;
");
```
Verify data
Connect to the PolarSearch node and use an Elasticsearch-compatible REST API to query and confirm that data has been synchronized.
```shell
# Replace <polarsearch_endpoint> with the PolarSearch node endpoint
curl -u <user>:<password> -X GET "http://<polarsearch_endpoint>/dest/_search"
```
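Because the system substitutes only the eight fixed placeholders listed above, a typo such as `{mysql_hostname}` would survive substitution and fail at runtime. A small pre-submit check can catch this; the helper below is a hypothetical sketch, not part of DBMS_ETL:

```python
import re

# The eight placeholders that AutoETL substitutes automatically.
KNOWN_PLACEHOLDERS = {
    "mysql_host", "mysql_port", "mysql_user", "mysql_password",
    "search_host", "search_port", "search_user", "search_password",
}

def unknown_placeholders(sync_sql: str) -> set:
    """Return any {placeholder} tokens the system will NOT substitute."""
    return set(re.findall(r"\{(\w+)\}", sync_sql)) - KNOWN_PLACEHOLDERS

good = "'hostname' = '{mysql_host}', 'port' = '{mysql_port}'"
bad = "'hostname' = '{mysql_hostname}'"  # typo: not a known placeholder

assert unknown_placeholders(good) == set()
assert unknown_placeholders(bad) == {"mysql_hostname"}
```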
Manage sync links
Use the following commands to view and delete existing synchronization links.
View links
View all links:
```sql
call dbms_etl.show_sync_link();
```
View a specific link by ID. Replace `<sync_id>` with the `SYNC_ID` value returned by `show_sync_link`:
```sql
call dbms_etl.show_sync_link_by_id('<sync_id>')\G
```
Result explanation:
```
*************************** 1. row ***************************
        SYNC_ID: crb5rmv8rttsg
           NAME: crb5rmv8rttsg
         SYSTEM: search
SYNC_DEFINITION: db1.t1 -> dest
  SOURCE_TABLES: db1.t1
    SINK_TABLES: dest
         STATUS: active      -- Link status; active means the link is running normally
        MESSAGE:             -- An error message appears here if an error occurs
     CREATED_AT: 2024-05-20 11:55:06
     UPDATED_AT: 2024-05-20 17:28:04
        OPTIONS: ...
```
Delete a link
Deleting a synchronization link is a high-impact operation. By default, it also deletes the destination index and all its data in PolarSearch. Confirm carefully before proceeding.
This operation stops data synchronization and cleans up related resources.
```sql
call dbms_etl.drop_sync_link('<sync_id>');
```
System behavior varies depending on the link status when you run `drop_sync_link`:
- Links in `active` status first change to `dropping`. After the system finishes cleaning up link resources and destination index data, the status changes to `dropped`.
- Links in `dropped` status are permanently removed from the system.
- Links in other statuses cannot be deleted.
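These deletion rules amount to a small state machine. Modeled as a sketch (the transition table below is a simplification of the documented behavior, not an API):

```python
# Simplified model of drop_sync_link status handling, per the rules above:
# active -> dropping (then dropped once cleanup finishes); dropped -> removed;
# all other statuses reject deletion.
DROP_TRANSITIONS = {
    "active": "dropping",  # cleanup of link resources and index data begins
    "dropped": "removed",  # permanently removed from the system
}

def drop_link_status(status: str) -> str:
    """Return the status a link moves to when drop_sync_link is called on it."""
    try:
        return DROP_TRANSITIONS[status]
    except KeyError:
        raise ValueError(f"links in status {status!r} cannot be deleted")

assert drop_link_status("active") == "dropping"
assert drop_link_status("dropped") == "removed"
```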
DDL change rules and best practices
When performing DDL operations on source tables that already have synchronization links, follow different procedures based on the synchronization method and specific operation to ensure synchronization stability. Improper DDL operations may interrupt the synchronization link.
Single-table sync (sync_by_table) links
Links created with `sync_by_table` do not support synchronizing only specified fields.
- Add a column: To avoid synchronization interruption, first add the column to the destination index in PolarSearch, then run the `ADD COLUMN` operation on the source table.
  1. Add a field mapping to the destination index in PolarSearch. For example, add the `age` field to the `demo` index:
     ```json
     PUT demo/_mapping
     {
       "properties": {
         "age": { "type": "integer" }
       }
     }
     ```
  2. Add the corresponding column to the source table:
     ```sql
     ALTER TABLE demo ADD COLUMN age INT;
     ```
- Delete a column: After you delete a column from the source table, incremental writes synchronize normally, but historical data in PolarSearch retains the deleted column's values.
- Modify a column type:
  - Compatible types: If the new type is compatible with the original (for example, changing from `INT` to `TINYINT`), modify the source table directly. Incremental data synchronizes normally.
    ```sql
    ALTER TABLE demo MODIFY COLUMN score TINYINT;
    ```
  - Incompatible types: If the types are incompatible, the synchronization link becomes unusable. Rebuild the link.
Multi-table aggregation (sync_by_map) and custom SQL (sync_by_sql) links
- DDL on non-synchronized columns: Adding, deleting, or modifying columns that are not included in the synchronization does not affect the link. Incremental data synchronizes normally.
- DDL on synchronized columns:
  - Add a column: Rebuild the link.
  - Delete a column: After you delete a synchronized column, incremental data synchronizes normally, but the deleted column's value becomes `null` in incremental data.
  - Modify a column type:
    - Compatible types: If the new type is compatible with the original, modify the source table directly. Incremental data synchronizes normally.
    - Incompatible types: If the types are incompatible, the synchronization link becomes unusable. Rebuild the link.
Best practices for rebuilding links
To minimize business impact, rebuild links using a "new index + new link" approach. After the new link finishes synchronizing and you verify the data, switch query traffic to the new index.
Example: Add a new column to the `shop.user` table and rebuild the link. Assume the original link synchronizes the `shop.user` table (with columns `id`, `name`, `phone`, and `gmt_create`) to the `user_v1` index, and you now need to add a `membership_level` column without affecting live queries.
1. Create a new index: Create an index named `user_v2` with a mapping that includes the new `membership_level` field.
   ```json
   PUT user_v2
   {
     "mappings": {
       "properties": {
         "id": { "type": "keyword" },
         "name": { "type": "text", "fields": { "keyword": { "type": "keyword" } } },
         "phone": { "type": "keyword" },
         "gmt_create": { "type": "date" },
         "membership_level": { "type": "integer" }
       }
     }
   }
   ```
2. Modify the source table: Add the new column to the `user` table in the source MySQL instance.
   ```sql
   ALTER TABLE user ADD COLUMN membership_level TINYINT NOT NULL DEFAULT 0 COMMENT 'Membership Level';
   ```
3. Create a new synchronization link: Create a link that synchronizes data from the `shop.user` table to the `user_v2` index.
4. Verify and switch: After data synchronization completes, verify the data in the new index. Once confirmed, switch query traffic to the new `user_v2` index.
5. Clean up old resources: After confirming that the new link runs stably, delete the old synchronization link and the `user_v1` index.
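The verify-and-switch step above can be partly automated by comparing document counts between the old and new indexes before cutting over traffic. A sketch (the counts would in practice come from querying each index, for example via the `_count` API; here they are passed in directly):

```python
def ready_to_switch(old_count: int, new_count: int, tolerance: int = 0) -> bool:
    """Decide whether the new index has caught up with the old one.

    With ongoing writes the two counts may differ transiently, so a small
    tolerance can be allowed; tolerance=0 requires an exact match.
    """
    return abs(new_count - old_count) <= tolerance

# Exact match: safe to switch traffic from user_v1 to user_v2.
assert ready_to_switch(10_000, 10_000)
# One document behind with zero tolerance: keep waiting.
assert not ready_to_switch(10_000, 9_999)
# The same lag within an explicit tolerance: acceptable.
assert ready_to_switch(10_000, 9_999, tolerance=5)
```

A count match is a necessary but not sufficient check; spot-checking a sample of documents against the source table before cutover is still advisable.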