全部產品
Search
文件中心

PolarDB:自動將PolarDB MySQL版資料同步至PolarSearch

更新時間:Feb 11, 2026

當您需要對PolarDB MySQL版中的業務資料進行全文檢索索引或複雜分析時,直接在資料庫上操作可能會影響核心業務的穩定性。PolarDB提供的AutoETL功能,能將資料從讀寫節點自動、持續地同步至叢集內的PolarSearch節點,為您提供一站式的資料服務。您無需額外部署和維護ETL工具,即可實現資料同步,並將搜尋分析負載與線上交易處理負載隔離。

說明

當前功能目前正處於灰階階段。如您有相關需求,請提交工單與我們聯絡,以便為您開啟該功能。

功能簡介

AutoETL是PolarDB MySQL版內建的資料同步能力,它允許資料在叢集內不同類型的節點間自動流轉。目前的版本僅支援從PolarDB MySQL版同步至同一叢集內的PolarSearch節點,以用於高效能的搜尋和分析。

您可以通過資料庫內建的DBMS_ETL工具包,直接使用SQL命令來建立和管理資料同步鏈路。AutoETL提供三種靈活的資料同步方式:

  • 單表同步(dbms_etl.sync_by_table:將單個源表完整地同步到目標索引。

  • 多表匯聚(dbms_etl.sync_by_map:將多個源表通過JOIN操作匯聚後,同步到目標索引。

  • 自訂SQL(dbms_etl.sync_by_sql:使用相容Flink SQL的文法進行複雜的資料清洗、轉換和彙總。

適用範圍

使用AutoETL功能前,需確保環境滿足以下條件:

  • 叢集版本

    • MySQL 8.0.1,且修訂版本需為8.0.1.1.52或以上。

    • MySQL 8.0.2,且修訂版本需為8.0.2.2.33或以上。

  • 同步方向:僅支援從PolarDB MySQL版同步至同一叢集內的PolarSearch節點

  • DDL限制:對已建立同步鏈路的源表進行DDL操作時,需遵循特定的規則與實踐以避免同步中斷。部分不相容的變更需要重建鏈路。詳情請參見DDL變更規則與實踐

  • 資料類型:暫不支援BIT類型以及GEOMETRYPOINTLINESTRINGPOLYGONMULTIPOINTMULTILINESTRINGMULTIPOLYGONGEOMETRYCOLLECTION等空間資料類型的同步。

建立同步鏈路

單表同步

  1. 資料準備

    PolarDB MySQL版中執行以下SQL語句,建立樣本資料庫和表,並插入測試資料。

    CREATE DATABASE IF NOT EXISTS db1;
    USE db1;
    CREATE TABLE IF NOT EXISTS t1 (
        id INT PRIMARY KEY,
        c1 VARCHAR(100),
        c2 VARCHAR(100)
    );
    INSERT INTO t1(id, c1, c2) VALUES 
    (1, 'apple', 'red'),
    (2, 'banana', 'yellow'),
    (3, 'grape', 'purple');
    
  2. 建立同步鏈路

    使用dbms_etl.sync_by_table預存程序,建立從db1.t1表到PolarSearch節點的索引dest的同步任務。

    文法

    call dbms_etl.sync_by_table("search", "<source_table>", "<sink_table>", "<column_list>");

    參數說明

    參數

    說明

    search

    同步目標,當前固定為search,表示PolarSearch節點。

    <source_table>

    源表名,格式為資料庫名.表名

    <sink_table>

    PolarSearch節點中的目標索引名。

    <column_list>

    需要同步的列名列表,用英文逗號,分隔。如果為空白字串"",則同步源表所有列。

    使用限制

    • 源表需包含主鍵或唯一鍵。

    • 在不同的同步鏈路中,不能使用相同的源表或目標表。

    • 建立鏈路後,源表新增的列預設不會被自動同步。如需同步新增列,請重建鏈路。

    • 如果您希望使用自己定義的目標索引配置,您可以先在PolarSearch節點中手動建立索引並定義其配置,然後再建立同步鏈路。如果鏈路建立時目標索引不存在,系統將自動建立。

    樣本

    • db1.t1全表同步到PolarSearch的dest索引:

      call dbms_etl.sync_by_table("search", "db1.t1", "dest", "");
    • db1.t1表的c1c2列同步到dest索引:

      call dbms_etl.sync_by_table("search", "db1.t1", "dest", "c1, c2");
  3. 驗證資料

    串連到PolarSearch節點,使用與Elasticsearch相容的REST API進行查詢,確認資料已同步。

    # 將<polarsearch_endpoint>替換為PolarSearch節點的串連地址
    curl -u <user>:<password> -X GET "http://<polarsearch_endpoint>/dest/_search"

多表匯聚

  1. 資料準備

    PolarDB MySQL版中執行以下SQL語句,建立樣本資料庫和表,並插入測試資料。

    CREATE DATABASE IF NOT EXISTS db1;
    CREATE DATABASE IF NOT EXISTS db2;
    CREATE DATABASE IF NOT EXISTS db3;
    
    CREATE TABLE IF NOT EXISTS db1.t1 (id INT PRIMARY KEY, c1 INT);
    CREATE TABLE IF NOT EXISTS db2.t2 (id INT PRIMARY KEY, c2 INT);
    CREATE TABLE IF NOT EXISTS db3.t3 (id INT PRIMARY KEY, c3 VARCHAR(10));
    
    INSERT INTO db1.t1(id, c1) VALUES (1, 11), (2, 22), (3, 33);
    INSERT INTO db2.t2(id, c2) VALUES (1, 111), (2, 222), (4, 444);
    INSERT INTO db3.t3(id, c3) VALUES (1, 'aaa'), (3, 'ccc'), (4, 'ddd');
  2. 建立同步鏈路

    使用dbms_etl.sync_by_map預存程序,可將多個表的資料連線(JOIN)後,匯聚到一個PolarSearch節點的索引中。

    文法

    call dbms_etl.sync_by_map(
        "search",
        "<columns_map>", -- 目標索引欄位與源表欄位的映射關係
        "<join_fields>", -- 表之間的串連鍵
        "<join_types>",  -- 連線類型 (inner, left)
        "<filter>"       -- 資料過濾條件
    );

    參數說明

    參數

    格式樣本

    說明

    columns_map

    dest.c1(db1.t1.c1),dest.c2(db2.t2.c2)

    目標索引欄位與源表欄位的映射關係。

    樣本表示:目標索引destc1欄位來自db1.t1.c1c2欄位來自db2.t2.c2

    join_fields

    dest.id=db1.t1.id,db2.t2.id

    表之間的串連鍵。

    樣本表示:目標索引的文檔ID(dest.id)由db1.t1.iddb2.t2.id構成,同時db1.t1.iddb2.t2.id也是串連條件。

    join_types

    inner,left

    表之間的連線類型,串連順序與join_fields中表的出現順序一致。樣本表示:t1 INNER JOIN t2,然後結果再LEFT JOIN t3

    filter

    db1.t1.c1 > 10 AND db2.t2.c2 < 100

    一個標準的SQL WHERE子句,用於在同步前過濾源表資料。

    使用限制

    • 所有參與同步的源表必須包含主鍵。

    • 該功能使用流式計算,同步過程中僅保證最終一致性。

    • 對於目標索引的更新模式為先刪除後插入。如果您不希望在查詢時訪問到被刪除資料的中間狀態,可以在執行命令前設定會話變數set sink_options = "'ignore-delete' = 'true'";以忽略PolarSearch節點資料刪除的選項。

    樣本

    • 兩張表INNER JOIN:將db1.t1db2.t2通過id欄位進行INNER JOIN,並將t1.c1t2.c2同步到dest索引的c1c2欄位。

      call dbms_etl.sync_by_map(
        "search",
        "dest.id(db1.t1.id),dest.c1(db1.t1.c1),dest.c2(db2.t2.c2)",
        "dest.id=db1.t1.id,db2.t2.id", 
        "inner",
         ""
      );
    • 多張表混合JOIN並過濾:db1.t1db2.t2db3.t3三張表串連,其中t1t2INNER JOINt1t3LEFT JOIN,並篩選t1.c1 > 10t2.c2 < 100的資料。

      call dbms_etl.sync_by_map(
        "search", 
        "dest.id(db1.t1.id),dest.c1(db1.t1.c1),dest.c2(db2.t2.c2),dest.c3(db3.t3.c3)", 
        "dest.id=db1.t1.id,db2.t2.id,db3.t3.id", 
        "inner,left", 
        "db1.t1.c1 > 10 and db2.t2.c2 < 100"
      );
  3. 驗證資料

    串連到PolarSearch節點,使用與Elasticsearch相容的REST API進行查詢,確認資料已同步。

    # 將<polarsearch_endpoint>替換為PolarSearch節點的串連地址
    curl -u <user>:<password> -X GET "http://<polarsearch_endpoint>/dest/_search"

自訂SQL

  1. 建立同步鏈路

    對於需要複雜轉換、彙總或計算的情境,dbms_etl.sync_by_sql預存程序支援使用Flink SQL文法定義資料同步邏輯。

    重要

    安全警告:嚴禁在SQL語句中寫入程式碼密碼以下樣本僅為示範文法結構,其WITH子句中包含純文字密碼,存在極大的安全風險。在生產環境中,必須使用更安全的方式管理憑證。

    文法

    call dbms_etl.sync_by_sql("search", "<sync_sql>");

    樣本

    系統將自動替換SQL中的預留位置{mysql_host}{mysql_port}{mysql_user}{mysql_password}{search_host}{search_port}{search_user}{search_password}。您僅需按固定預留位置編寫SQL語句即可。

    CALL dbms_etl.sync_by_sql("search", "
    -- 步驟1:定義 PolarDB 源表
    CREATE TEMPORARY TABLE `db1`.`sbtest1` (
      `id`   BIGINT,
      `k`    BIGINT,
      `c`    STRING,
      PRIMARY KEY (`id`) NOT ENFORCED
    ) WITH (
      'connector' = 'mysql',
      'hostname' = '{mysql_host}',
      'port' = '{mysql_port}',
      'username' = '{mysql_user}',       -- 生產環境嚴禁使用明文
      'password' = '{mysql_password}',   -- 生產環境嚴禁使用明文
      'database-name' = 'db1',
      'table-name' = 'sbtest1'
    );
    
    -- 步驟2:定義 PolarSearch 目標表
    CREATE TEMPORARY TABLE `dest` (
      `k`  BIGINT,
      `max_c` STRING,
      PRIMARY KEY (`k`) NOT ENFORCED
    ) WITH (
      'connector' = 'opensearch',
      'hosts' = '{search_host}:{search_port}',   
      'index' = 'dest',
      'username' = '{search_user}',     -- 生產環境嚴禁使用明文
      'password' = '{search_password}'  -- 生產環境嚴禁使用明文
    );
    
    -- 步驟3:定義計算和插入邏輯
    INSERT INTO `dest`
    SELECT
        `t1`.`k`,
        MAX(`t1`.`c`)
    FROM `db1`.`sbtest1` AS `t1`
    GROUP BY `t1`.`k`;
    ");
  2. 驗證資料

    串連到PolarSearch節點,使用與Elasticsearch相容的REST API進行查詢,確認資料已同步。

    # 將<polarsearch_endpoint>替換為PolarSearch節點的串連地址
    curl -u <user>:<password> -X GET "http://<polarsearch_endpoint>/dest/_search"

管理同步鏈路

您可以使用以下命令查看和刪除已建立的同步鏈路。

查看鏈路

  • 查看所有鏈路:

    call dbms_etl.show_sync_link();
  • 根據ID查看指定鏈路:將<sync_id>替換為步驟二返回的ID。

    call dbms_etl.show_sync_link_by_id('<sync_id>')\G

    返回結果說明:

    *************************** 1. row ***************************
            SYNC_ID: crb5rmv8rttsg
               NAME: crb5rmv8rttsg
             SYSTEM: search
    SYNC_DEFINITION: db1.t1 -> dest
      SOURCE_TABLES: db1.t1
        SINK_TABLES: dest
             STATUS: active  -- 鏈路狀態,active表示正常運行
            MESSAGE:         -- 如果出錯,此處會顯示錯誤資訊
         CREATED_AT: 2024-05-20 11:55:06
         UPDATED_AT: 2024-05-20 17:28:04
            OPTIONS: ...

刪除鏈路

重要

刪除同步鏈路是高危操作。預設情況下,該操作會同時刪除PolarSearch中的目標索引及其所有資料。執行前請務必確認。

此操作用於停止資料同步並清理相關資源。

call dbms_etl.drop_sync_link('<sync_id>');

對不同狀態的鏈路執行drop_sync_link刪除時,系統的處理邏輯存在差異:

  • active狀態的鏈路:首先會變為dropping,待系統完成鏈路資源和目標索引資料的清理後,狀態才會變為dropped

  • dropped狀態的鏈路:系統將徹底清除該鏈路的資訊。

  • 其他狀態的鏈路:系統不支援刪除操作。

DDL變更規則與實踐

對已建立同步鏈路的源表進行DDL操作時,需要根據建立鏈路的方式和具體操作,遵循不同的處理流程,以確保資料同步的穩定性。不當的DDL操作可能導致同步鏈路中斷。

單表同步(sync_by_table)鏈路

說明

通過sync_by_table建立的鏈路不支援僅同步指定欄位。

  • 增加欄位:為保證同步鏈路不中斷,您需要先在PolarSearch目標索引中增加欄位,然後再對來源資料表執行ADD COLUMN操作。

    1. PolarSearch中為目標索引增加欄位對應。例如,為demo索引增加age欄位:

      PUT demo/_mapping
      {
        "properties": {
          "age": { "type": "integer" }
        }
      }
    2. 在來源資料表中增加對應的列:

      ALTER TABLE demo ADD COLUMN age INT;
  • 刪除欄位:在來源資料表刪除欄位後,增量寫入的資料可以正常同步,但PolarSearch存量資料中仍會保留被刪除欄位的值。

  • 修改欄位類型

    • 類型相容:如果修改後的類型與原類型相容(例如,從INT修改為TINYINT),可直接在源表修改,增量資料可以正常同步。

      ALTER TABLE demo MODIFY COLUMN score TINYINT;
    • 類型不相容:如果類型不相容,會導致同步鏈路不可用,需重建鏈路。

多表匯聚(sync_by_map)與自訂SQL(sync_by_sql)鏈路

  • 對非同步欄位進行DDL操作:在源表中增加、刪除或修改未被同步的欄位,不會影響同步鏈路,增量資料可以正常同步。

  • 對同步欄位進行DDL操作:

    • 增加欄位:需要重建鏈路。

    • 刪除欄位:刪除同步欄位後,增量資料可以正常同步,但增量資料中該欄位的值將為null

    • 修改欄位類型

      • 類型相容:如果修改後的類型與原類型相容,可直接在源表修改,增量資料可以正常同步。

      • 類型不相容:如果類型不相容,會導致同步鏈路不可用,需重建鏈路。

重建鏈路最佳實務

為最小化對業務的影響,建議採用“新索引 + 新鏈路”的方式進行重建。待新鏈路資料同步完成並驗證無誤後,再將業務查詢流量切換至新的索引。

樣本:為shop.user表新增欄位並重建鏈路。假設原鏈路將shop.user表(含id, name, phone, gmt_create欄位)同步至user_v1索引。現需新增membership_level欄位,並確保線上查詢不受影響。

  1. 建立新索引:建立一個名為user_v2的新索引,並在其映射關係中包含新欄位membership_level

    PUT user_v2
    {
      "mappings": {
        "properties": {
          "id":               { "type": "keyword" },
          "name":             { "type": "text", "fields": { "keyword": { "type": "keyword" } } },
          "phone":            { "type": "keyword" },
          "gmt_create":       { "type": "date" },
          "membership_level": { "type": "integer" }
        }
      }
    }
  2. 修改源表:在源端MySQL的user表中增加新列。

    ALTER TABLE user ADD COLUMN membership_level TINYINT NOT NULL DEFAULT 0 COMMENT '會員等級';
  3. 建立同步鏈路:建立一條新的同步鏈路,將shop.user表的資料同步到user_v2索引。

  4. 驗證與切換:待資料同步完成後,驗證新索引中的資料是否正確。確認無誤後,將業務查詢流量切換到新的user_v2索引。

  5. 清理舊資源:觀察新鏈路運行穩定後,可刪除舊的同步鏈路和user_v1索引。

常見問題

如何將PolarDB的表欄位對應到PolarSearch節點的索引欄位?

AutoETL 提供兩種欄位對應方式:

  • 隱式映射(sync_by_table):在使用sync_by_table時,PolarSearch節點的索引欄位名預設與PolarDB MySQL版源表的列名一致。您可通過<column_list>參數指定需要建立和同步的特定列。

  • 顯式映射(sync_by_map):在進列欄位重新命名或多表匯聚時,可利用sync_by_map<columns_map>參數明確定義目標欄位與源表列之間的映射關係。例如,dest.title(db1.posts.post_title)表示將db1.posts表中的post_title列映射為dest索引中的title欄位。