全部產品
Search
文件中心

PolarDB:自動將PolarDB MySQL版資料同步至PolarSearch

更新時間:Dec 26, 2025

當您需要對PolarDB MySQL版中的業務資料進行全文檢索索引或複雜分析時,直接在資料庫上操作可能會影響核心業務的穩定性。PolarDB提供的AutoETL功能,能將資料從讀寫節點自動、持續地同步至叢集內的PolarSearch節點,為您提供一站式的資料服務。您無需額外部署和維護ETL工具,即可實現資料同步,並將搜尋分析負載與線上交易處理負載隔離。

說明

當前功能目前正處於灰階階段。如您有相關需求,請提交工單與我們聯絡,以便為您開啟該功能。

功能簡介

AutoETL是PolarDB MySQL版內建的資料同步能力,它允許資料在叢集內不同類型的節點間自動流轉。目前的版本僅支援從PolarDB MySQL版同步至同一叢集內的PolarSearch節點。,以用於高效能的搜尋和分析。

您可以通過資料庫內建的DBMS_ETL工具包,直接使用SQL命令來建立和管理資料同步鏈路。AutoETL提供三種靈活的資料同步方式:

  • 單表同步(dbms_etl.sync_by_table:將單個源表完整地同步到目標索引。

  • 多表匯聚(dbms_etl.sync_by_map:將多個源表通過JOIN操作匯聚後,同步到目標索引。

  • 自訂SQL(dbms_etl.sync_by_sql:使用相容Flink SQL的文法進行複雜的資料清洗、轉換和彙總。

適用範圍

使用AutoETL功能前,需確保環境滿足以下條件:

  • 叢集版本MySQL 8.0.1,且修訂版本需為8.0.1.1.52或以上。

  • 同步方向:僅支援從PolarDB MySQL版同步至同一叢集內的PolarSearch節點

  • DDL限制:不允許對已建立同步鏈路的源表進行DDL變更。如需修改,必須重建ETL鏈路。

  • 資料類型:暫不支援BIT類型以及GEOMETRYPOINTLINESTRINGPOLYGONMULTIPOINTMULTILINESTRINGMULTIPOLYGONGEOMETRYCOLLECTION等空間資料類型的同步。

建立同步鏈路

單表同步

  1. 資料準備

    PolarDB MySQL版中執行以下SQL語句,建立樣本資料庫和表,並插入測試資料。

    CREATE DATABASE IF NOT EXISTS db1;
    USE db1;
    CREATE TABLE IF NOT EXISTS t1 (
        id INT PRIMARY KEY,
        c1 VARCHAR(100),
        c2 VARCHAR(100)
    );
    INSERT INTO t1(id, c1, c2) VALUES 
    (1, 'apple', 'red'),
    (2, 'banana', 'yellow'),
    (3, 'grape', 'purple');
    
  2. 建立同步鏈路

    使用dbms_etl.sync_by_table預存程序,建立從db1.t1表到PolarSearch節點的索引dest的同步任務。

    文法

    call dbms_etl.sync_by_table("search", "<source_table>", "<sink_table>", "<column_list>");

    參數說明

    參數

    說明

    search

    同步目標,當前固定為search,表示PolarSearch節點。

    <source_table>

    源表名,格式為資料庫名.表名

    <sink_table>

    PolarSearch節點中的目標索引名。

    <column_list>

    需要同步的列名列表,用英文逗號,分隔。如果為空白字串"",則同步源表所有列。

    使用限制

    • 源表需包含主鍵或唯一鍵。

    • 在不同的同步鏈路中,不能使用相同的源表或目標表。

    • 建立鏈路後,源表新增的列預設不會被自動同步。如需同步新增列,請重建鏈路。

    • 如果您希望使用自己定義的目標索引配置,您可以先在PolarSearch節點中手動建立索引並定義其配置,然後再建立同步鏈路。如果鏈路建立時目標索引不存在,系統將自動建立。

    樣本

    • db1.t1全表同步到PolarSearch的dest索引:

      call dbms_etl.sync_by_table("search", "db1.t1", "dest", "");
    • db1.t1表的c1c2列同步到dest索引:

      call dbms_etl.sync_by_table("search", "db1.t1", "dest", "c1, c2");
  3. 驗證資料

    串連到PolarSearch節點,使用與Elasticsearch相容的REST API進行查詢,確認資料已同步。

    # 將<polarsearch_endpoint>替換為PolarSearch節點的串連地址
    curl -u <user>:<password> -X GET "http://<polarsearch_endpoint>/dest/_search"

多表匯聚

  1. 資料準備

    PolarDB MySQL版中執行以下SQL語句,建立樣本資料庫和表,並插入測試資料。

    CREATE DATABASE IF NOT EXISTS db1;
    CREATE DATABASE IF NOT EXISTS db2;
    CREATE DATABASE IF NOT EXISTS db3;
    
    CREATE TABLE IF NOT EXISTS db1.t1 (id INT PRIMARY KEY, c1 INT);
    CREATE TABLE IF NOT EXISTS db2.t2 (id INT PRIMARY KEY, c2 INT);
    CREATE TABLE IF NOT EXISTS db3.t3 (id INT PRIMARY KEY, c3 VARCHAR(10));
    
    INSERT INTO db1.t1(id, c1) VALUES (1, 11), (2, 22), (3, 33);
    INSERT INTO db2.t2(id, c2) VALUES (1, 111), (2, 222), (4, 444);
    INSERT INTO db3.t3(id, c3) VALUES (1, 'aaa'), (3, 'ccc'), (4, 'ddd');
  2. 建立同步鏈路

    使用dbms_etl.sync_by_map預存程序,可將多個表的資料連線(JOIN)後,匯聚到一個PolarSearch節點的索引中。

    文法

    call dbms_etl.sync_by_map(
        "search",
        "<columns_map>", -- 目標索引欄位與源表欄位的映射關係
        "<join_fields>", -- 表之間的串連鍵
        "<join_types>",  -- 連線類型 (inner, left)
        "<filter>"       -- 資料過濾條件
    );

    參數說明

    參數

    格式樣本

    說明

    columns_map

    dest.c1(db1.t1.c1),dest.c2(db2.t2.c2)

    目標索引欄位與源表欄位的映射關係。

    樣本表示:目標索引destc1欄位來自db1.t1.c1c2欄位來自db2.t2.c2

    join_fields

    dest.id=db1.t1.id,db2.t2.id

    表之間的串連鍵。

    樣本表示:目標索引的文檔ID(dest.id)由db1.t1.iddb2.t2.id構成,同時db1.t1.iddb2.t2.id也是串連條件。

    join_types

    inner,left

    表之間的連線類型,串連順序與join_fields中表的出現順序一致。樣本表示:t1 INNER JOIN t2,然後結果再LEFT JOIN t3

    filter

    db1.t1.c1 > 10 AND db2.t2.c2 < 100

    一個標準的SQL WHERE子句,用於在同步前過濾源表資料。

    使用限制

    • 所有參與同步的源表必須包含主鍵。

    • 該功能使用流式計算,同步過程中僅保證最終一致性。

    • 對於目標索引的更新模式為先刪除後插入。如果您不希望在查詢時訪問到被刪除資料的中間狀態,可以在執行命令前設定會話變數set sink_options = "'ignore-delete' = 'true'";以忽略PolarSearch節點資料刪除的選項。

    樣本

    • 兩張表INNER JOIN:將db1.t1db2.t2通過id欄位進行INNER JOIN,並將t1.c1t2.c2同步到dest索引的c1c2欄位。

      call dbms_etl.sync_by_map(
        "search",
        "dest.c1(db1.t1.c1), dest.c2(db2.t2.c2)",
        "dest.id=db1.t1.id,db2.t2.id", 
        "inner",
         ""
      );
    • 多張表混合JOIN並過濾:db1.t1db2.t2db3.t3三張表串連,其中t1t2INNER JOINt1t3LEFT JOIN,並篩選t1.c1 > 10t2.c2 < 100的資料。

      call dbms_etl.sync_by_map(
        "search", 
        "dest.c1(db1.t1.c1), dest.c2(db2.t2.c2), dest.c3(db3.t3.c3)", 
        "dest.id=db1.t1.id,db2.t2.id,db3.t3.id", 
        "inner,left", 
        "db1.t1.c1 > 10 and db2.t2.c2 < 100"
      );
  3. 驗證資料

    串連到PolarSearch節點,使用與Elasticsearch相容的REST API進行查詢,確認資料已同步。

    # 將<polarsearch_endpoint>替換為PolarSearch節點的串連地址
    curl -u <user>:<password> -X GET "http://<polarsearch_endpoint>/dest/_search"

自訂SQL

  1. 建立同步鏈路

    對於需要複雜轉換、彙總或計算的情境,dbms_etl.sync_by_sql預存程序支援使用Flink SQL文法定義資料同步邏輯。

    重要

    安全警告:嚴禁在SQL語句中寫入程式碼密碼以下樣本僅為示範文法結構,其WITH子句中包含純文字密碼,存在極大的安全風險。在生產環境中,必須使用更安全的方式管理憑證。

    文法

    call dbms_etl.sync_by_sql("search", "<sync_sql>");

    樣本

    call dbms_etl.sync_by_sql("search", "
    -- 步驟1:定義 PolarDB 源表
    CREATE TEMPORARY TABLE `db1`.`sbtest1` (
      `id`   BIGINT,
      `k`    BIGINT,
      `c`    STRING,
      PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql',
      'hostname' = 'xxxxxxx', -- 填寫 PolarDB 叢集地址
      'port' = '3306',
      'username' = 'xxx',     -- 生產環境嚴禁使用明文
      'password' = 'xxx',     -- 生產環境嚴禁使用明文
      'database-name' = 'db1',
      'table-name' = 'sbtest1'
    );
    
    -- 步驟2:定義 PolarSearch 目標表
    CREATE TEMPORARY TABLE `dest` (
      `k`  BIGINT,
      `max_c` STRING,
      PRIMARY KEY (`k`) NOT ENFORCED
    ) WITH (
      'connector' = 'opensearch',
      'hosts' = 'xxxxxx:xxxx',     -- 填寫 PolarSearch 串連地址
      'index' = 'dest',
      'username' = 'xxx',     -- 生產環境嚴禁使用明文
      'password' = 'xxx'      -- 生產環境嚴禁使用明文
    );
    
    -- 步驟3:定義計算和插入邏輯
    INSERT INTO `dest`
    SELECT
        `t1`.`k`,
        MAX(`t1`.`c`)
    FROM `db1`.`sbtest1` AS `t1`
    GROUP BY `t1`.`k`;
    ");
  2. 驗證資料

    串連到PolarSearch節點,使用與Elasticsearch相容的REST API進行查詢,確認資料已同步。

    # 將<polarsearch_endpoint>替換為PolarSearch節點的串連地址
    curl -u <user>:<password> -X GET "http://<polarsearch_endpoint>/dest/_search"

管理同步鏈路

您可以使用以下命令查看和刪除已建立的同步鏈路。

查看鏈路

  • 查看所有鏈路:

    call dbms_etl.show_sync_link();
  • 根據ID查看指定鏈路:將<sync_id>替換為步驟二返回的ID。

    call dbms_etl.show_sync_link_by_id('<sync_id>')\G

    返回結果說明:

    *************************** 1. row ***************************
            SYNC_ID: crb5rmv8rttsg
               NAME: crb5rmv8rttsg
             SYSTEM: search
    SYNC_DEFINITION: db1.t1 -> dest
      SOURCE_TABLES: db1.t1
        SINK_TABLES: dest
             STATUS: active  -- 鏈路狀態,active表示正常運行
            MESSAGE:         -- 如果出錯,此處會顯示錯誤資訊
         CREATED_AT: 2024-05-20 11:55:06
         UPDATED_AT: 2024-05-20 17:28:04
            OPTIONS: ...

刪除鏈路

重要

刪除同步鏈路是高危操作。預設情況下,該操作會同時刪除PolarSearch中的目標索引及其所有資料。執行前請務必確認。

此操作用於停止資料同步並清理相關資源。

call dbms_etl.drop_sync_link('<sync_id>');

對不同狀態的鏈路執行drop_sync_link刪除時,系統的處理邏輯存在差異:

  • active狀態的鏈路:首先會變為dropping,待系統完成鏈路資源和目標索引資料的清理後,狀態才會變為dropped

  • dropped狀態的鏈路:系統將徹底清除該鏈路的資訊。

  • 其他狀態的鏈路:系統不支援刪除操作。

常見問題

如何將PolarDB的表欄位對應到PolarSearch節點的索引欄位?

AutoETL 提供兩種欄位對應方式:

  • 隱式映射(sync_by_table):在使用sync_by_table時,PolarSearch節點的索引欄位名預設與PolarDB MySQL版源表的列名一致。您可通過<column_list>參數指定需要建立和同步的特定列。

  • 顯式映射(sync_by_map):在進列欄位重新命名或多表匯聚時,可利用sync_by_map<columns_map>參數明確定義目標欄位與源表列之間的映射關係。例如,dest.title(db1.posts.post_title)表示將db1.posts表中的post_title列映射為dest索引中的title欄位。