
PolarDB:Automatically sync data from PolarDB for MySQL to PolarSearch

Last Updated: Feb 11, 2026

When you need to perform full-text search or complex analysis on business data in PolarDB for MySQL, running these operations directly on the database may affect the stability of your core business. The AutoETL feature provided by PolarDB automatically and continuously synchronizes data from read/write nodes to PolarSearch nodes within the same cluster, delivering an integrated data service. You can synchronize data without deploying or maintaining additional ETL tools and isolate search and analytics workloads from online transaction processing workloads.

Note

This feature is currently in a canary release. If you need this functionality, submit a ticket to contact us so we can enable it for you.

Feature overview

AutoETL is a built-in data synchronization capability of PolarDB for MySQL. It enables automatic data flow between different node types within the same cluster. The current version supports synchronizing data only from PolarDB for MySQL to PolarSearch nodes in the same cluster for high-performance search and analytics.

You can use the built-in DBMS_ETL toolkit in the database to create and manage data synchronization links directly with SQL commands. AutoETL provides three flexible data synchronization methods:

  • Single-table sync (dbms_etl.sync_by_table): Synchronize an entire source table to a destination index.

  • Multi-table aggregation (dbms_etl.sync_by_map): Aggregate multiple source tables using a JOIN operation and synchronize the result to a destination index.

  • Custom SQL (dbms_etl.sync_by_sql): Use Flink SQL-compatible syntax for complex data cleaning, transformation, and aggregation.

Applicability

Before using AutoETL, ensure your environment meets the following conditions:

  • Cluster version:

    • MySQL 8.0.1, with revision version 8.0.1.1.52 or later.

    • MySQL 8.0.2, with revision version 8.0.2.2.33 or later.

  • Synchronization direction: Supports synchronizing only from PolarDB for MySQL to PolarSearch nodes in the same cluster.

  • DDL limitations: When performing DDL operations on source tables that already have synchronization links, follow specific rules and practices to avoid synchronization interruption. Some incompatible changes require rebuilding the link. For details, see DDL change rules and best practices.

  • Data types: Does not support synchronizing the BIT type or spatial data types such as GEOMETRY, POINT, LINESTRING, POLYGON, MULTIPOINT, MULTILINESTRING, MULTIPOLYGON, and GEOMETRYCOLLECTION.

Create a sync link

Single-table sync

  1. Data preparation

    Run the following SQL statements in PolarDB for MySQL to create a sample database and table, and insert test data.

    CREATE DATABASE IF NOT EXISTS db1;
    USE db1;
    CREATE TABLE IF NOT EXISTS t1 (
        id INT PRIMARY KEY,
        c1 VARCHAR(100),
        c2 VARCHAR(100)
    );
    INSERT INTO t1(id, c1, c2) VALUES 
    (1, 'apple', 'red'),
    (2, 'banana', 'yellow'),
    (3, 'grape', 'purple');
    
  2. Create a synchronization link

    Use the dbms_etl.sync_by_table stored procedure to create a synchronization task from the db1.t1 table to the dest index on the PolarSearch node.

    Syntax

    call dbms_etl.sync_by_table("search", "<source_table>", "<sink_table>", "<column_list>");

    Parameter description

    • search: Sync target. Currently fixed as search, indicating a PolarSearch node.

    • <source_table>: Name of the source table, in the database.table format.

    • <sink_table>: Name of the destination index in the PolarSearch node.

    • <column_list>: Comma-separated list of the column names to sync. If set to an empty string (""), all columns from the source table are synced.

    Limits

    • The source table must have a primary key or unique key.

    • You cannot use the same source table or destination table in different synchronization links.

    • After creating a link, newly added columns in the source table are not synchronized by default. To synchronize new columns, rebuild the link.

    • If you want to use a custom index configuration, manually create the index and define its settings in the PolarSearch node before creating the synchronization link. If the destination index does not exist when the link is created, the system creates it automatically.

    Examples

    • Synchronize the entire db1.t1 table to the dest index in PolarSearch:

      call dbms_etl.sync_by_table("search", "db1.t1", "dest", "");
    • Synchronize only the c1 and c2 columns from the db1.t1 table to the dest index:

      call dbms_etl.sync_by_table("search", "db1.t1", "dest", "c1, c2");
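To make the mapping concrete, here is a minimal Python sketch (an illustration only, not part of DBMS_ETL) of how the sample rows prepared above land in the dest index when <column_list> is empty:

```python
# Sample rows inserted into db1.t1 in the data preparation step
rows = [(1, "apple", "red"), (2, "banana", "yellow"), (3, "grape", "purple")]

# With an empty <column_list>, all columns are synced: each source row
# becomes one document in the dest index, keyed by its primary key value.
docs = {pk: {"c1": c1, "c2": c2} for pk, c1, c2 in rows}
print(docs[1])  # {'c1': 'apple', 'c2': 'red'}
```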
  3. Verify data

    Connect to the PolarSearch node and use an Elasticsearch-compatible REST API to query and confirm that data has been synchronized.

    # Replace <polarsearch_endpoint> with the PolarSearch node endpoint
    curl -u <user>:<password> -X GET "http://<polarsearch_endpoint>/dest/_search"

Multi-table aggregation

  1. Data preparation

    Run the following SQL statements in PolarDB for MySQL to create sample databases and tables, and insert test data.

    CREATE DATABASE IF NOT EXISTS db1;
    CREATE DATABASE IF NOT EXISTS db2;
    CREATE DATABASE IF NOT EXISTS db3;
    
    CREATE TABLE IF NOT EXISTS db1.t1 (id INT PRIMARY KEY, c1 INT);
    CREATE TABLE IF NOT EXISTS db2.t2 (id INT PRIMARY KEY, c2 INT);
    CREATE TABLE IF NOT EXISTS db3.t3 (id INT PRIMARY KEY, c3 VARCHAR(10));
    
    INSERT INTO db1.t1(id, c1) VALUES (1, 11), (2, 22), (3, 33);
    INSERT INTO db2.t2(id, c2) VALUES (1, 111), (2, 222), (4, 444);
    INSERT INTO db3.t3(id, c3) VALUES (1, 'aaa'), (3, 'ccc'), (4, 'ddd');
  2. Create a synchronization link

    Use the dbms_etl.sync_by_map stored procedure to join data from multiple tables and aggregate it into an index on a PolarSearch node.

    Syntax

    call dbms_etl.sync_by_map(
        "search",
        "<columns_map>", -- Mapping between destination index fields and source table fields
        "<join_fields>", -- Join keys between tables
        "<join_types>",  -- Join types (inner, left)
        "<filter>"       -- Data filter condition
    );

    Parameter description

    • columns_map (format example: dest.c1(db1.t1.c1),dest.c2(db2.t2.c2)): Mapping between destination index fields and source table fields. In this example, field c1 in the destination index dest comes from db1.t1.c1, and field c2 comes from db2.t2.c2.

    • join_fields (format example: dest.id=db1.t1.id,db2.t2.id): Join keys between tables. In this example, the document ID of the destination index (dest.id) is composed of db1.t1.id and db2.t2.id, and those columns also serve as the join condition between the tables.

    • join_types (format example: inner,left): Join types between tables, listed in the order the tables appear in join_fields. In this example, t1 INNER JOIN t2, and the result then LEFT JOIN t3.

    • filter (format example: db1.t1.c1 > 10 AND db2.t2.c2 < 100): A standard SQL WHERE clause that filters source table data before syncing.

    Limits

    • All source tables involved in the synchronization must have primary keys.

    • This feature uses stream processing and guarantees only eventual consistency during synchronization.

    • The update mode for the destination index is delete-then-insert. If you do not want queries to observe the intermediate state in which a document has been deleted but not yet re-inserted, run set sink_options = "'ignore-delete' = 'true'"; in the session before creating the link. This causes delete operations to be ignored on the PolarSearch node.
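The delete-then-insert update mode can be sketched as follows (a conceptual Python illustration, not PolarSearch internals); it shows the window in which a queried document is briefly missing unless deletes are ignored:

```python
# Conceptual sketch of the delete-then-insert update mode.
index = {1: {"c1": 11, "c2": 111}}  # documents currently in the dest index

def apply_update(index, pk, new_doc, ignore_delete=False):
    if not ignore_delete:
        index.pop(pk, None)  # delete phase: a concurrent query misses the row
    index[pk] = new_doc      # insert phase: the new version becomes visible

apply_update(index, 1, {"c1": 12, "c2": 111})
print(index[1]["c1"])  # 12
```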

    Examples

    • INNER JOIN two tables: Join db1.t1 and db2.t2 on the id field, and synchronize t1.c1 and t2.c2 to fields c1 and c2 in the dest index.

      call dbms_etl.sync_by_map(
        "search",
        "dest.id(db1.t1.id),dest.c1(db1.t1.c1),dest.c2(db2.t2.c2)",
        "dest.id=db1.t1.id,db2.t2.id",
        "inner",
        ""
      );
    • Mixed JOIN and filter across multiple tables: Join three tables—db1.t1, db2.t2, and db3.t3—where t1 and t2 use INNER JOIN, and t1 and t3 use LEFT JOIN. Filter data where t1.c1 > 10 and t2.c2 < 100.

      call dbms_etl.sync_by_map(
        "search", 
        "dest.id(db1.t1.id),dest.c1(db1.t1.c1),dest.c2(db2.t2.c2),dest.c3(db3.t3.c3)", 
        "dest.id=db1.t1.id,db2.t2.id,db3.t3.id", 
        "inner,left", 
        "db1.t1.c1 > 10 and db2.t2.c2 < 100"
      );
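Using the sample data from the preparation step, the first example (INNER JOIN, no filter) produces the following documents. This Python sketch illustrates the join semantics only, not how AutoETL executes:

```python
t1 = {1: 11, 2: 22, 3: 33}     # db1.t1: id -> c1
t2 = {1: 111, 2: 222, 4: 444}  # db2.t2: id -> c2

# INNER JOIN on id: only ids present in both tables produce a document
dest = {i: {"c1": t1[i], "c2": t2[i]} for i in sorted(t1) if i in t2}
print(dest)  # {1: {'c1': 11, 'c2': 111}, 2: {'c1': 22, 'c2': 222}}
```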
  3. Verify data

    Connect to the PolarSearch node and use an Elasticsearch-compatible REST API to query and confirm that data has been synchronized.

    # Replace <polarsearch_endpoint> with the PolarSearch node endpoint
    curl -u <user>:<password> -X GET "http://<polarsearch_endpoint>/dest/_search"

Custom SQL

  1. Create a synchronization link

    For scenarios requiring complex transformations, aggregations, or calculations, the dbms_etl.sync_by_sql stored procedure supports defining data synchronization logic using Flink SQL syntax.

    Important

Security warning: never hard-code passwords in SQL statements. The following example shows the syntax structure only; a WITH clause that contains plaintext passwords poses a serious security risk. In production environments, always use secure credential management methods.

    Syntax

    call dbms_etl.sync_by_sql("search", "<sync_sql>");

    Example

    The system automatically replaces placeholders in the SQL: {mysql_host}, {mysql_port}, {mysql_user}, {mysql_password}, {search_host}, {search_port}, {search_user}, and {search_password}. Write your SQL using these fixed placeholders.

    CALL dbms_etl.sync_by_sql("search", "
    -- Step 1: Define PolarDB source table
    CREATE TEMPORARY TABLE `db1`.`sbtest1` (
      `id`   BIGINT,
      `k`    BIGINT,
      `c`    STRING,
      PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '{mysql_host}',
      'port' = '{mysql_port}',
      'username' = '{mysql_user}',       -- Never use plaintext in production
      'password' = '{mysql_password}',   -- Never use plaintext in production
      'database-name' = 'db1',
      'table-name' = 'sbtest1'
    );
    
    -- Step 2: Define PolarSearch destination table
    CREATE TEMPORARY TABLE `dest` (
      `k`  BIGINT,
      `max_c` STRING,
      PRIMARY KEY (`k`) NOT ENFORCED
    ) WITH (
      'connector' = 'opensearch',
      'hosts' = '{search_host}:{search_port}',   
      'index' = 'dest',
      'username' = '{search_user}',     -- Never use plaintext in production
      'password' = '{search_password}'  -- Never use plaintext in production
    );
    
    -- Step 3: Define computation and insert logic
    INSERT INTO `dest`
    SELECT
        `t1`.`k`,
        MAX(`t1`.`c`)
    FROM `db1`.`sbtest1` AS `t1`
    GROUP BY `t1`.`k`;
    ");
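The INSERT ... GROUP BY logic in the example computes MAX(c) per value of k. On hypothetical sbtest1 rows (the document shows no sample data for this table), the result looks like this Python sketch:

```python
# Hypothetical sbtest1 rows: (id, k, c)
rows = [(1, 10, "aa"), (2, 10, "cc"), (3, 20, "bb")]

dest = {}
for _id, k, c in rows:
    # MAX(c) for each k, matching MAX(`t1`.`c`) ... GROUP BY `t1`.`k`
    dest[k] = c if k not in dest else max(dest[k], c)
print(dest)  # {10: 'cc', 20: 'bb'}
```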
  2. Verify data

    Connect to the PolarSearch node and use an Elasticsearch-compatible REST API to query and confirm that data has been synchronized.

    # Replace <polarsearch_endpoint> with the PolarSearch node endpoint
    curl -u <user>:<password> -X GET "http://<polarsearch_endpoint>/dest/_search"

Manage sync links

Use the following commands to view and delete existing synchronization links.

View links

  • View all links:

    call dbms_etl.show_sync_link();
  • View a specific link by ID: Replace <sync_id> with the SYNC_ID value returned by dbms_etl.show_sync_link().

    call dbms_etl.show_sync_link_by_id('<sync_id>')\G

    Result explanation:

    *************************** 1. row ***************************
            SYNC_ID: crb5rmv8rttsg
               NAME: crb5rmv8rttsg
             SYSTEM: search
    SYNC_DEFINITION: db1.t1 -> dest
      SOURCE_TABLES: db1.t1
        SINK_TABLES: dest
             STATUS: active  -- Link status; active means running normally
            MESSAGE:         -- Error message appears here if an error occurs
         CREATED_AT: 2024-05-20 11:55:06
         UPDATED_AT: 2024-05-20 17:28:04
            OPTIONS: ...

Delete a link

Important

Deleting a synchronization link is a high-impact operation. By default, it also deletes the destination index in PolarSearch together with all of its data. Confirm carefully before you proceed.

This operation stops data synchronization and cleans up related resources.

call dbms_etl.drop_sync_link('<sync_id>');

System behavior varies depending on the link status when you run drop_sync_link:

  • Links in active status first change to dropping. After the system finishes cleaning up link resources and destination index data, the status changes to dropped.

  • Links in dropped status are permanently removed from the system.

  • Links in other statuses cannot be deleted.

DDL change rules and best practices

When performing DDL operations on source tables that already have synchronization links, follow different procedures based on the synchronization method and specific operation to ensure synchronization stability. Improper DDL operations may interrupt the synchronization link.

Single-table sync (sync_by_table) links

Note

Links created with sync_by_table do not support synchronizing only specified fields.

  • Add a column: To avoid synchronization interruption, first add the column to the destination index in PolarSearch, then run the ADD COLUMN operation on the source table.

    1. Add a field mapping to the destination index in PolarSearch. For example, add the age field to the demo index:

      PUT demo/_mapping
      {
        "properties": {
          "age": { "type": "integer" }
        }
      }
    2. Add the corresponding column to the source table:

      ALTER TABLE demo ADD COLUMN age INT;
  • Delete a column: After deleting a column from the source table, incremental writes synchronize normally, but historical data in PolarSearch retains the deleted column's values.

  • Modify a column type:

    • Compatible types: If the new type is compatible with the original (for example, changing from INT to TINYINT), modify the source table directly. Incremental data synchronizes normally.

      ALTER TABLE demo MODIFY COLUMN score TINYINT;
    • Incompatible types: If the types are incompatible, the synchronization link becomes unusable. Rebuild the link.

Multi-table aggregation (sync_by_map) and custom SQL (sync_by_sql) links

  • DDL on non-synchronized columns: Adding, deleting, or modifying columns not included in the synchronization does not affect the link. Incremental data synchronizes normally.

  • DDL on synchronized columns:

    • Add a column: Rebuild the link.

    • Delete a column: After deleting a synchronized column, incremental data synchronizes normally, but the deleted column's value becomes null in incremental data.

    • Modify a column type:

      • Compatible types: If the new type is compatible with the original, modify the source table directly. Incremental data synchronizes normally.

      • Incompatible types: If the types are incompatible, the synchronization link becomes unusable. Rebuild the link.

Best practices for rebuilding links

To minimize business impact, rebuild links using a "new index + new link" approach. After the new link finishes synchronizing and you verify the data, switch query traffic to the new index.

Example: Add a new column to the shop.user table and rebuild the link. Assume the original link synchronizes the shop.user table (with columns id, name, phone, gmt_create) to the user_v1 index. Now you need to add the membership_level column without affecting live queries.

  1. Create a new index: Create a new index named user_v2 with a mapping that includes the new membership_level field.

    PUT user_v2
    {
      "mappings": {
        "properties": {
          "id":               { "type": "keyword" },
          "name":             { "type": "text", "fields": { "keyword": { "type": "keyword" } } },
          "phone":            { "type": "keyword" },
          "gmt_create":       { "type": "date" },
          "membership_level": { "type": "integer" }
        }
      }
    }
  2. Modify the source table: Add the new column to the user table in the source MySQL instance.

    ALTER TABLE user ADD COLUMN membership_level TINYINT NOT NULL DEFAULT 0 COMMENT 'Membership Level';
  3. Create a new synchronization link: Create a new synchronization link to synchronize data from the shop.user table to the user_v2 index.

  4. Verify and switch: After data synchronization completes, verify the data in the new index. Once confirmed, switch query traffic to the new user_v2 index.

  5. Clean up old resources: After confirming stable operation of the new link, delete the old synchronization link and the user_v1 index.

FAQ

How do I map PolarDB table columns to index fields in a PolarSearch node?

AutoETL provides two field mapping methods:

  • Implicit mapping (sync_by_table): When you use sync_by_table, index field names in the PolarSearch node match the column names of the PolarDB for MySQL source table by default. Use the <column_list> parameter to restrict synchronization to specific columns.

  • Explicit mapping (sync_by_map): When renaming fields or aggregating multiple tables, use the sync_by_map <columns_map> parameter to explicitly define the mapping between destination fields and source table columns. For example, dest.title(db1.posts.post_title) maps the post_title column in the db1.posts table to the title field in the dest index.