PolarDB: Automatically synchronize data from PolarDB for MySQL to PolarSearch

Last Updated: Dec 26, 2025

Performing full-text searches or complex analysis directly on your PolarDB for MySQL database can affect the stability of your core business. PolarDB provides the AutoETL feature to automatically and continuously synchronize data from read/write nodes to PolarSearch nodes in the same cluster. This feature provides a one-stop data service that lets you synchronize data without deploying or maintaining separate extract, transform, and load (ETL) tools. This process isolates search and analysis workloads from online transaction processing workloads.

Note

This feature is in canary release. To use this feature, submit a ticket to enable it.

Feature overview

AutoETL is a built-in data synchronization feature in PolarDB for MySQL. It allows data to flow automatically between different types of nodes within a cluster. Currently, this feature only supports data synchronization from PolarDB for MySQL to PolarSearch nodes in the same cluster for high-performance search and analysis.

You can use the built-in DBMS_ETL toolkit to create and manage data synchronization links directly with SQL commands. AutoETL provides three flexible data synchronization methods:

  • Single-table synchronization (dbms_etl.sync_by_table): Synchronizes an entire source table to a destination index.

  • Multi-table aggregation (dbms_etl.sync_by_map): Aggregates multiple source tables using JOIN operations and synchronizes the result to a destination index.

  • Custom SQL (dbms_etl.sync_by_sql): Uses Flink SQL-compatible syntax for complex data cleansing, transformation, and aggregation.

Applicability

Before you use AutoETL, ensure that your environment meets the following conditions:

  • Cluster version: PolarDB for MySQL 8.0.1 with a revision version of 8.0.1.1.52 or later.

  • Synchronization direction: Only from PolarDB for MySQL to PolarSearch nodes in the same cluster.

  • DDL limits: Do not perform Data Definition Language (DDL) operations on a source table that has a synchronization link. To modify the table, you must recreate the synchronization link.

  • Data types: Synchronization of the BIT type and spatial data types such as GEOMETRY, POINT, LINESTRING, POLYGON, MULTIPOINT, MULTILINESTRING, MULTIPOLYGON, and GEOMETRYCOLLECTION is not currently supported.
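
If you are unsure whether a candidate table uses any of these types, you can check its column types in information_schema before you create a link. A minimal sketch for the db1.t1 sample table used later in this topic (note that MySQL 8.0 reports the GEOMETRYCOLLECTION type as geomcollection):

-- List columns of db1.t1 whose types cannot be synchronized.
-- An empty result means the table contains no unsupported types.
SELECT COLUMN_NAME, DATA_TYPE
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = 'db1'
  AND TABLE_NAME = 't1'
  AND DATA_TYPE IN ('bit', 'geometry', 'point', 'linestring', 'polygon',
                    'multipoint', 'multilinestring', 'multipolygon', 'geomcollection');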

Create a synchronization link

Single-table synchronization

  1. Prepare the data

    In PolarDB for MySQL, you can run the following SQL statements to create a sample database and table, and then insert test data.

    CREATE DATABASE IF NOT EXISTS db1;
    USE db1;
    CREATE TABLE IF NOT EXISTS t1 (
        id INT PRIMARY KEY,
        c1 VARCHAR(100),
        c2 VARCHAR(100)
    );
    INSERT INTO t1(id, c1, c2) VALUES 
    (1, 'apple', 'red'),
    (2, 'banana', 'yellow'),
    (3, 'grape', 'purple');
    
  2. Create the synchronization link

    You can use the dbms_etl.sync_by_table stored procedure to create a synchronization link from the db1.t1 table to the dest index on the PolarSearch node.

    Syntax

    call dbms_etl.sync_by_table("search", "<source_table>", "<sink_table>", "<column_list>");

    Parameters

    • search: The synchronization destination. This value is currently fixed as search, which indicates the PolarSearch node.

    • <source_table>: The name of the source table, in database_name.table_name format.

    • <sink_table>: The name of the destination index on the PolarSearch node.

    • <column_list>: A comma-separated list of column names to synchronize. If this parameter is an empty string (""), all columns of the source table are synchronized.

    Limits

    • The source table must have a primary key or a unique key.

    • Do not use the same source or destination table in different synchronization links.

    • After a link is created, new columns that are added to the source table are not automatically synchronized. To synchronize the new columns, you must recreate the link (see the sketch after this list).

    • If the destination index does not exist when you create the link, the system creates it automatically. To use a custom configuration for the destination index, create the index and define its configuration on the PolarSearch node before you create the synchronization link.
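
    The recreate workflow is a drop followed by a new create. A minimal sketch, assuming an existing link with ID <sync_id> on db1.t1 and a hypothetical new column c3:

    -- 1. Drop the existing link first. By default, this also drops the
    --    destination index and its data (see Delete a link below).
    call dbms_etl.drop_sync_link('<sync_id>');
    -- 2. Apply the DDL change to the source table.
    ALTER TABLE db1.t1 ADD COLUMN c3 VARCHAR(100);
    -- 3. Recreate the link. An empty column list synchronizes all columns, including c3.
    call dbms_etl.sync_by_table("search", "db1.t1", "dest", "");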

    Examples

    • Synchronize the entire db1.t1 table to the dest index in PolarSearch:

      call dbms_etl.sync_by_table("search", "db1.t1", "dest", "");
    • Synchronize the c1 and c2 columns of the db1.t1 table to the dest index:

      call dbms_etl.sync_by_table("search", "db1.t1", "dest", "c1, c2");
  3. Verify the data

    Connect to the PolarSearch node. You can then use the Elasticsearch-compatible REST API to query the data and verify the synchronization.

    # Replace <polarsearch_endpoint> with the endpoint of your PolarSearch node
    curl -u <user>:<password> -X GET "http://<polarsearch_endpoint>/dest/_search"
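
    Because the API is Elasticsearch-compatible, you can also send a query DSL body to check specific documents. A minimal sketch, assuming the sample data above:

    # Search the dest index for documents whose c1 field matches "apple"
    curl -u <user>:<password> -X GET "http://<polarsearch_endpoint>/dest/_search" \
      -H 'Content-Type: application/json' \
      -d '{"query": {"match": {"c1": "apple"}}}'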

Multi-table aggregation

  1. Prepare the data

    In PolarDB for MySQL, you can run the following SQL statements to create sample databases and tables, and then insert test data.

    CREATE DATABASE IF NOT EXISTS db1;
    CREATE DATABASE IF NOT EXISTS db2;
    CREATE DATABASE IF NOT EXISTS db3;
    
    CREATE TABLE IF NOT EXISTS db1.t1 (id INT PRIMARY KEY, c1 INT);
    CREATE TABLE IF NOT EXISTS db2.t2 (id INT PRIMARY KEY, c2 INT);
    CREATE TABLE IF NOT EXISTS db3.t3 (id INT PRIMARY KEY, c3 VARCHAR(10));
    
    INSERT INTO db1.t1(id, c1) VALUES (1, 11), (2, 22), (3, 33);
    INSERT INTO db2.t2(id, c2) VALUES (1, 111), (2, 222), (4, 444);
    INSERT INTO db3.t3(id, c3) VALUES (1, 'aaa'), (3, 'ccc'), (4, 'ddd');
  2. Create the synchronization link

    You can use the dbms_etl.sync_by_map stored procedure to join data from multiple tables and aggregate the data into an index on a PolarSearch node.

    Syntax

    call dbms_etl.sync_by_map(
        "search",
        "<columns_map>", -- Mapping of destination index fields to source table fields
        "<join_fields>", -- Join keys between tables
        "<join_types>",  -- Join types (inner, left)
        "<filter>"       -- Data filter condition
    );

    Parameters

    • columns_map (format example: dest.c1(db1.t1.c1),dest.c2(db2.t2.c2)): The mapping of destination index fields to source table fields. In the example, the c1 field of the dest index comes from db1.t1.c1, and the c2 field comes from db2.t2.c2.

    • join_fields (format example: dest.id=db1.t1.id,db2.t2.id): The join keys between tables. In the example, the document ID of the destination index (dest.id) is composed of db1.t1.id and db2.t2.id, and those two columns also serve as the join condition.

    • join_types (format example: inner,left): The join types between tables, applied in the order of the tables in join_fields. The example means t1 INNER JOIN t2, and the result is then LEFT JOIN t3.

    • filter (format example: db1.t1.c1 > 10 AND db2.t2.c2 < 100): A standard SQL WHERE clause that filters source table data before synchronization.

    Limits

    • All source tables involved in the synchronization must have a primary key.

    • This feature uses stream processing. Therefore, only eventual consistency is guaranteed during synchronization.

    • The update mode for the destination index is delete then insert. If you do not want queries to observe the intermediate state in which data has been deleted but not yet reinserted, set the session variable sink_options before you create the link, as shown after this list. This setting makes the PolarSearch node ignore delete events.
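
    For example, run the following statement in the same session before you create the synchronization link:

    -- Ignore delete events on the PolarSearch node so that queries never
    -- observe the intermediate deleted state during updates.
    set sink_options = "'ignore-delete' = 'true'";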

    Examples

    • Perform an INNER JOIN operation on the db1.t1 and db2.t2 tables using the id field, and then synchronize t1.c1 and t2.c2 to the c1 and c2 fields of the dest index.

      call dbms_etl.sync_by_map(
        "search",
        "dest.c1(db1.t1.c1), dest.c2(db2.t2.c2)",
        "dest.id=db1.t1.id,db2.t2.id",
        "inner",
        ""
      );
    • Mixed JOIN and filtering on multiple tables: The three tables db1.t1, db2.t2, and db3.t3 are joined. An INNER JOIN operation is performed between t1 and t2, and a LEFT JOIN operation is performed between t1 and t3. The data is then filtered using the conditions t1.c1 > 10 and t2.c2 < 100.

      call dbms_etl.sync_by_map(
        "search", 
        "dest.c1(db1.t1.c1), dest.c2(db2.t2.c2), dest.c3(db3.t3.c3)", 
        "dest.id=db1.t1.id,db2.t2.id,db3.t3.id", 
        "inner,left", 
        "db1.t1.c1 > 10 and db2.t2.c2 < 100"
      );
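
    To make the join semantics concrete, the second example roughly corresponds to continuously maintaining the result of the following query (an illustrative sketch; AutoETL computes the result incrementally through stream processing):

      SELECT t1.c1, t2.c2, t3.c3
      FROM db1.t1 AS t1
      INNER JOIN db2.t2 AS t2 ON t1.id = t2.id
      LEFT JOIN db3.t3 AS t3 ON t1.id = t3.id
      WHERE t1.c1 > 10 AND t2.c2 < 100;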
  3. Verify the data

    Connect to the PolarSearch node. You can then use the Elasticsearch-compatible REST API to query the data and verify the synchronization.

    # Replace <polarsearch_endpoint> with the endpoint of your PolarSearch node
    curl -u <user>:<password> -X GET "http://<polarsearch_endpoint>/dest/_search"

Custom SQL

  1. Create the synchronization link

    For scenarios that require complex transformations, aggregations, or calculations, you can use the dbms_etl.sync_by_sql stored procedure. This procedure supports Flink SQL syntax to define the data synchronization logic.

    Important

    Security warning: Do not hard-code passwords in SQL statements. The following example demonstrates only the syntax structure. Its WITH clause contains a plaintext password, which poses a major security risk. In a production environment, you must use a more secure method to manage credentials.

    Syntax

    call dbms_etl.sync_by_sql("search", "<sync_sql>");

    Example

    call dbms_etl.sync_by_sql("search", "
    -- Step 1: Define the PolarDB source table
    CREATE TEMPORARY TABLE `db1`.`sbtest1` (
      `id`   BIGINT,
      `k`    BIGINT,
      `c`    STRING,
      PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql',
      'hostname' = 'xxxxxxx', -- Enter the PolarDB cluster endpoint
      'port' = '3306',
      'username' = 'xxx',     -- Do not use plaintext in a production environment
      'password' = 'xxx',     -- Do not use plaintext in a production environment
      'database-name' = 'db1',
      'table-name' = 'sbtest1'
    );
    
    -- Step 2: Define the PolarSearch destination table
    CREATE TEMPORARY TABLE `dest` (
      `k`  BIGINT,
      `max_c` STRING,
      PRIMARY KEY (`k`) NOT ENFORCED
    ) WITH (
      'connector' = 'opensearch',
      'hosts' = 'xxxxxx:xxxx',     -- Enter the PolarSearch endpoint
      'index' = 'dest',
      'username' = 'xxx',     -- Do not use plaintext in a production environment
      'password' = 'xxx'      -- Do not use plaintext in a production environment
    );
    
    -- Step 3: Define the calculation and insertion logic
    INSERT INTO `dest`
    SELECT
        `t1`.`k`,
        MAX(`t1`.`c`)
    FROM `db1`.`sbtest1` AS `t1`
    GROUP BY `t1`.`k`;
    ");
  2. Verify the data

    Connect to the PolarSearch node. You can then use the Elasticsearch-compatible REST API to query the data and verify the synchronization.

    # Replace <polarsearch_endpoint> with the endpoint of your PolarSearch node
    curl -u <user>:<password> -X GET "http://<polarsearch_endpoint>/dest/_search"

Manage synchronization links

You can use the following commands to view and delete synchronization links.

View links

  • View all links:

    call dbms_etl.show_sync_link();
  • View a specific link by ID: Replace <sync_id> with the ID that was returned when you created the link.

    call dbms_etl.show_sync_link_by_id('<sync_id>')\G

    Description of the returned results:

    *************************** 1. row ***************************
            SYNC_ID: crb5rmv8rttsg
               NAME: crb5rmv8rttsg
             SYSTEM: search
    SYNC_DEFINITION: db1.t1 -> dest
      SOURCE_TABLES: db1.t1
        SINK_TABLES: dest
             STATUS: active  -- The status of the link. active indicates that it is running normally.
            MESSAGE:         -- If an error occurs, the error message is displayed here.
         CREATED_AT: 2024-05-20 11:55:06
         UPDATED_AT: 2024-05-20 17:28:04
            OPTIONS: ...

Delete a link

Important

Deleting a synchronization link is a high-risk operation. By default, this operation also deletes the destination index and all its data in PolarSearch. You must confirm the operation before you proceed.

This operation stops data synchronization and cleans up the related resources.

call dbms_etl.drop_sync_link('<sync_id>');

When you run the drop_sync_link command, the processing logic depends on the current state of the link:

  • For links in the active state, the status first changes to dropping. After the system finishes cleaning up the link resources and destination index data, the status changes to dropped.

  • For links in the dropped state, the system completely purges the link's information.

  • For links in other states, the delete operation is not supported.
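
To confirm that cleanup has finished, you can poll the link status by using the show procedure described above:

-- STATUS changes from dropping to dropped after the link resources and
-- destination index data are cleaned up.
call dbms_etl.show_sync_link_by_id('<sync_id>')\G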

FAQ

How do I map fields from a PolarDB table to index fields on a PolarSearch node?

AutoETL provides two field mapping methods:

  • Implicit mapping (sync_by_table): When you use sync_by_table, the index field names on the PolarSearch node default to the column names of the source table in PolarDB for MySQL. You can use the <column_list> parameter to specify which columns to create and synchronize.

  • Explicit mapping (sync_by_map): When you rename fields or aggregate multiple tables, you can use the <columns_map> parameter of sync_by_map to explicitly define the mapping between destination fields and source table columns. For example, dest.title(db1.posts.post_title) maps the post_title column in the db1.posts table to the title field in the dest index.
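
  For instance, assuming a hypothetical db1.posts table that has an id primary key and a post_title column, the two methods compare as follows (a sketch based on the syntax above; whether sync_by_map accepts a single source table is an assumption):

    -- Implicit mapping: the index field keeps the source column name post_title.
    call dbms_etl.sync_by_table("search", "db1.posts", "dest", "post_title");

    -- Explicit mapping: rename post_title to title in the dest index.
    -- join_types is left empty because no join is involved (assumption).
    call dbms_etl.sync_by_map(
      "search",
      "dest.title(db1.posts.post_title)",
      "dest.id=db1.posts.id",
      "",
      ""
    );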