全部產品
Search
文件中心

PolarDB:基於PolarDB的圖分析:通過DTS將其他資料庫的資料表同步至PolarDB的圖

更新時間:Jun 14, 2025

在圖資料庫的應用情境中,您可能選擇先將資料寫入到其它資料庫中,再進一步將資料同步到圖資料庫中進行圖查詢。本文以PolarDB MySQL版資料來源為例,為您介紹如何通過DTS任務將MySQL的資料,同步到PolarDB PostgreSQL版所管理的圖資料庫的全過程。

前提條件

版本說明

支援的PolarDB PostgreSQL版的版本如下:

  • PostgreSQL 16(核心小版本2.0.16.8.3.0及以上)

  • PostgreSQL 15(核心小版本2.0.15.12.4.0及以上)

  • PostgreSQL 14(核心小版本2.0.14.12.24.0及以上)

說明

您可在控制台查看核心小版本號碼,也可以通過SHOW polardb_version;語句查看。如未滿足核心小版本要求,請升級核心小版本

資料格式

  • 要求寫入的節點和邊資料有一列在其類型(Label)中唯一且小於2^48的ID。而邊資料除唯一ID之外,還包括兩列分別指定其起點和終點對應的節點的ID。

  • 對於沒有唯一ID、或者唯一ID不為整數類型的節點和邊,通常可以添加一列serial類型的列作為唯一ID。如果沒有唯一ID的協助,將無法同步對資料的更改和刪除。這列唯一ID可以在資料表中使用serial的特性自動產生而不必手動插入,同時可以選擇不將此列ID加入到圖。

同步流程

最佳實務

資源準備

  • 一個PolarDB MySQL版叢集作為資料來源,詳細操作請參考購買叢集

  • 一個PolarDB PostgreSQL版叢集,詳細操作請參考建立資料庫叢集

  • PolarDB MySQL版PolarDB PostgreSQL版均在同一地區、同一可用性區域和同一個VPC中。

步驟

PolarDB MySQL版叢集寫入新增資料,通過DTS任務同步到PolarDB PostgreSQL版的圖資料庫中。

  1. 基礎資料準備。

    假設需要同步的圖資料由三部分組成:兩個點資料表A和B,用於記錄圖上的點,且各自存在唯一ID。一個邊資料表C,記錄其起點在A中和終點在B中的唯一ID。然後在PolarDB PostgreSQL版中建立一張圖,包含A,B兩種類型的節點和C這種類型的邊。

    • PolarDB MySQL版

      1. 建立表定義。

        CREATE TABLE raw_A(id integer, name text, `desc` text, time_created timestamp);
        CREATE TABLE raw_B(id integer, name text, `desc` text, `value` integer, time_created timestamp);
        CREATE TABLE raw_C(id integer, id_a integer, id_b integer);
      2. 使用DTS前,需要開啟PolarDB MySQL版的Binlog功能,詳細操作請參考開啟Binlog

    • PolarDB PostgreSQL版

      1. 在目標資料庫使用高許可權帳號建立並載入外掛程式,建立高許可權帳號請參考建立資料庫帳號

        說明

        使用Data Management(Data Management)用戶端設定search_path時,可能會存在相容性問題,您可使用PolarDB-Tools執行相關語句。

        CREATE EXTENSION age;
        ALTER DATABASE <dbname> SET search_path = "$user", public, ag_catalog;
        ALTER DATABASE <dbname> SET session_preload_libraries TO 'age';
      2. 建立圖、點和邊。

        SELECT create_graph('gra');
        SELECT create_vlabel('gra', 'label_a');
        SELECT create_vlabel('gra', 'label_b');
        SELECT create_elabel('gra', 'edge_c');
  2. 通過DTS將資料同步到PolarDB PostgreSQL版

    說明
    • 在搭建同步鏈路的過程中,請勿寫入資料,否則該部分資料將無法匯入圖中。

    • 在搭建DTS鏈路完成後,請勿修改同步表的資料結構(如增加、刪除列等操作),否則可能導致後續無法同步。

    1. 進入目標地區的同步工作清單頁面(二選一)。

      通過DTS控制台進入

      1. 登入Data Transmission Service控制台

      2. 在左側導覽列,單擊資料同步

      3. 在頁面左上方,選擇同步執行個體所屬地區。

      通過DMS控制台進入

      說明

      實際操作可能會因DMS的模式和布局不同,而有所差異。更多資訊,請參見極簡模式控制台自訂DMS介面布局與樣式

      1. 登入Data Management服務

      2. 在頂部功能表列中,選擇Data + AI > 資料轉送(DTS) > 資料同步

      3. 同步任務右側,選擇同步執行個體所屬地區。

    2. 單擊創建任務,進入任務配置頁面。

    3. 在建立同步任務頁面,配置以下資訊。

      說明

      此處僅為部分配置項,詳細配置項說明可參考配置參數說明

      類別

      配置

      說明

      源庫資訊

      資料庫類型

      選擇MySQL

      接入方式

      選擇專線/VPN網關/智能網關

      目標庫資訊

      資料庫類型

      選擇PostgreSQL

      接入方式

      選擇專線/VPN網關/智能網關

      根據實際叢集資訊填寫的執行個體地區、VPC網段、叢集地址、連接埠、使用者名稱和密碼等資訊。

    4. 配置任務對象對象配置步驟,選擇對應資料庫下,TABLE欄目中的raw_A、raw_B、raw_C三張表。後續步驟使用預設配置,單擊下一步即可。

      說明

      其他配置任務對象的詳細說明請參考配置任務對象進階配置

    5. 儲存任務並進行預檢查。預檢查通過率顯示為100%時,單擊下一步購買

    6. 購買頁面,選擇資料同步執行個體的計費方式、鏈路規格,配置完成後,閱讀並勾選《資料轉送(隨用隨付)服務條款》,單擊購買並啟動,並在彈出的確認對話方塊,單擊確定

      說明
      • 購買頁面的資訊配置請參考購買執行個體

      • 您可在DTS資料同步介面查看具體任務進度。

  3. 通過觸發器將資料同步到圖。

    1. PolarDB PostgreSQL版叢集建立如下輔助函數:

      --- 函數1
      CREATE OR REPLACE FUNCTION age_name_to_idx_start(graph_name text, kind_name text, label_name text)
      RETURNS bigint
      AS 'SELECT id::bigint<<48 FROM ag_catalog.ag_label WHERE kind = kind_name and name = label_name and graph = (SELECT graphid FROM ag_catalog.ag_graph WHERE name = graph_name)'
      language SQL IMMUTABLE STRICT PARALLEL SAFE;
      
      
      
      --- 函數2
      CREATE OR REPLACE FUNCTION build_age_triggers_for_vertex(schema_name text, table_name text, table_id_col text, graph_name text, graph_label text)
      RETURNS BOOL
      AS
      $outer$
      DECLARE
        column_names TEXT;
        sql TEXT;
      BEGIN
        SELECT string_agg(format('val.%I', column_name), ', ')
          INTO column_names
          FROM information_schema.columns
          WHERE columns.table_schema = build_age_triggers_for_vertex.schema_name AND columns.table_name = build_age_triggers_for_vertex.table_name;
        sql := $$
      
      CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(id bigint)
      RETURNS graphid
      AS 'SELECT (age_name_to_idx_start(''$$ || graph_name || $$'', ''v'', ''$$ || graph_label|| $$'') + id)::text::ag_catalog.graphid'
      LANGUAGE SQL;
      
      CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(val $$ || schema_name || $$.$$ || table_name || $$)
      RETURNS agtype
      AS 'SELECT row_to_json((select x FROM (select $$|| column_names || $$) x))::text::agtype'
      LANGUAGE SQL;
      
      CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$() RETURNS TRIGGER AS
      $inner$
      BEGIN
        IF TG_OP = 'INSERT' THEN
          INSERT INTO "$$ || graph_name || $$"."$$ || graph_label || $$" (id, properties) VALUES (_sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(NEW."$$ || table_id_col || $$"), _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(NEW));
          RETURN NEW;
        ELSIF TG_OP = 'UPDATE' THEN
          UPDATE "$$ || graph_name || $$"."$$ || graph_label || $$" SET properties = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(NEW) WHERE id = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$");
          RETURN NEW;
        ELSIF TG_OP = 'DELETE' THEN
          DELETE FROM "$$ || graph_name || $$"."$$ || graph_label || $$" WHERE id = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$");
          RETURN OLD;
        END IF;
        RETURN NULL;
      END;
      $inner$ LANGUAGE plpgsql;
      
      CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_insert
      AFTER INSERT ON $$ || schema_name || $$.$$ || table_name || $$
      FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();
      
      CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_update
      AFTER UPDATE ON $$ || schema_name || $$.$$ || table_name || $$
      FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();
      
      CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_delete
      AFTER DELETE ON $$ || schema_name || $$.$$ || table_name || $$
      FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();
      
      ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_insert;
      ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_update;
      ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_delete;
        $$;
        EXECUTE sql;
        RETURN true;
      END;
      $outer$
      LANGUAGE plpgsql;
      
      
      
      --- 函數3
      CREATE OR REPLACE FUNCTION build_age_triggers_for_edge(schema_name text, table_name text, table_id_col text, start_table_name text, start_id_col text, end_table_name text, end_id_col text, graph_name text, graph_label text)
      RETURNS BOOL
      AS
      $outer$
      DECLARE
        column_names TEXT;
        sql TEXT;
      BEGIN
        SELECT string_agg(format('val.%I', column_name), ', ')
          INTO column_names
          FROM information_schema.columns
          WHERE columns.table_schema = build_age_triggers_for_edge.schema_name AND columns.table_name = build_age_triggers_for_edge.table_name;
        sql := $$
      
      CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(id bigint)
      RETURNS graphid
      AS 'SELECT (age_name_to_idx_start(''$$ || graph_name || $$'', ''e'', ''$$ || graph_label|| $$'') + id)::text::ag_catalog.graphid'
      LANGUAGE SQL;
      
      CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(val $$ || schema_name || $$.$$ || table_name || $$)
      RETURNS agtype
      AS 'SELECT row_to_json((select x FROM (select $$|| column_names || $$) x))::text::agtype'
      LANGUAGE SQL;
      
      CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$() RETURNS TRIGGER AS
      $inner$
      BEGIN
        IF TG_OP = 'INSERT' THEN
          INSERT INTO "$$ || graph_name || $$"."$$ || graph_label || $$" (id, start_id, end_id, properties) VALUES (_sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(NEW."$$ || table_id_col || $$"), _sync_$$ || schema_name || $$_$$ || start_table_name || $$_row_to_id(NEW."$$ || start_id_col || $$"), _sync_$$ || schema_name || $$_$$ || end_table_name || $$_row_to_id(NEW."$$ || end_id_col || $$"), _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(NEW));
          RETURN NEW;
        ELSIF TG_OP = 'UPDATE' THEN
          UPDATE "$$ || graph_name || $$"."$$ || graph_label || $$" SET start_id = _sync_$$ || schema_name || $$_$$ || start_table_name || $$_row_to_id(NEW."$$ || start_id_col || $$"), end_id = _sync_$$ || schema_name || $$_$$ || end_table_name || $$_row_to_id(NEW."$$ || end_id_col || $$"), properties = _sync_raw_A_row_to_properties(NEW) WHERE id = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$");
          RETURN NEW;
        ELSIF TG_OP = 'DELETE' THEN
          DELETE FROM "$$ || graph_name || $$"."$$ || graph_label || $$" WHERE id = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$");
          RETURN OLD;
        END IF;
        RETURN NULL;
      END;
      $inner$ LANGUAGE plpgsql;
      
      CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_insert
      AFTER INSERT ON $$ || schema_name || $$.$$ || table_name || $$
      FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();
      
      CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_update
      AFTER UPDATE ON $$ || schema_name || $$.$$ || table_name || $$
      FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();
      
      CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_delete
      AFTER DELETE ON $$ || schema_name || $$.$$ || table_name || $$
      FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();
      
      ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_insert;
      ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_update;
      ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_delete;
        $$;
      
        EXECUTE sql;
        RETURN true;
      END;
      $outer$
      LANGUAGE plpgsql;
    2. 執行輔助函數即可構建從同步表到圖中的觸發器。

      說明
      • 請統一使用小寫,大小寫敏感。

      • 其中your_schema_name需要替換成raw_a等資料表所在的schema,可在psql用戶端使用\d+ <table_name>查看,通常與原表schema保持一致。

      select build_age_triggers_for_vertex('your_schema_name','raw_a', 'id', 'gra', 'label_a');
      select build_age_triggers_for_vertex('your_schema_name','raw_b', 'id', 'gra', 'label_b');
      select build_age_triggers_for_edge('your_schema_name','raw_c', 'id', 'raw_a', 'id_a', 'raw_b', 'id_b', 'gra', 'edge_c');
    說明

    該觸發器只能同步增量資料,對於存量資料表,需要在建立上述觸發器之後執行如下語句:

    INSERT INTO "gra"."label_a" (id, properties) SELECT sync_a_row_to_id(raw_A.id), sync_a_row_to_properties(raw_A) FROM raw_A;
    INSERT INTO "gra"."label_b" (id, properties) SELECT sync_b_row_to_id(raw_B.id), sync_b_row_to_properties(raw_B) FROM raw_B;
    INSERT INTO "gra"."edge_c" (id, start_id, end_id, properties) SELECT sync_c_row_to_id(raw_C.id), sync_a_row_to_id(raw_C.id_a), sync_b_row_to_id(raw_C.id_b), sync_c_row_to_properties(raw_C) FROM raw_C;
  4. 測實驗證。

    資料插入

    1. PolarDB MySQL版中,向需要同步的表中插入測試資料。

      INSERT INTO raw_a values(1,1,1,'2000-01-01');
      INSERT INTO raw_b values(1,1,1,1,'2000-01-01');
      INSERT INTO raw_c values(1,1,1);
    2. PolarDB PostgreSQL版叢集中使用cypher語言進行圖查詢,驗證資料插入成功:

      • SELECT * FROM cypher('gra', $$
        MATCH (v)
        RETURN v
        $$) as (v agtype);

        返回結果如下:

        ------
         {"id": 844424930131969, "label": "label_a", "properties": {"id": 1, "desc": "1", "name": "1", "time_created": "2000-01-01T00:00:00"}}::vertex
         {"id": 1125899906842625, "label": "label_b", "properties": {"id": 1, "desc": "1", "name": "1", "value": 1, "time_created": "2000-01-01T00:00:00"}}::vertex
      • SELECT * FROM cypher('gra', $$
        MATCH (v)-[e]->(v2)
        RETURN e
        $$) as (e agtype);

        返回結果如下:

        ------
         {"id": 1407374883553281, "label": "edge_c", "end_id": 1125899906842625, "start_id": 844424930131969, "properties": {"id": "11"}}::edge

    屬性修改

    1. PolarDB MySQL版中,向需要同步的表中修改測試資料屬性。

      UPDATE raw_a SET name = '2' WHERE id = 1;
    2. PolarDB PostgreSQL版叢集中使用cypher語言進行圖查詢,驗證資料屬性修改成功:

      SELECT * FROM cypher('gra', $$
      MATCH (v:label_a {id:1})
      RETURN v
      $$) as (v agtype);

      返回結果如下:

      -----
       {"id": 844424930131969, "label": "label_a", "properties": {"id": 1, "desc": "1", "name": "2", "time_created": "2000-01-01T00:00:00"}}::vertex

    資料刪除

    1. PolarDB MySQL版中,在需要同步的表中刪除測試資料。

      DELETE FROM raw_c WHERE id = 1;
    2. PolarDB PostgreSQL版叢集中使用cypher語言進行圖查詢,驗證資料刪除成功:

      SELECT * FROM cypher('gra', $$
      MATCH (v)-[e]->(v2)
      RETURN e
      $$) as (e agtype);

      返回結果為空白,即資料刪除已同步。

注意事項

  • 在搭建同步鏈路的過程中不可以寫入資料,否則這部分資料會無法匯入圖。

  • 在搭建DTS鏈路完成後,請勿修改同步表的資料結構(如增加、刪除列等操作),否則可能導致後續無法同步。

  • 上述輔助函數會將全部的列作為屬性加入到圖的屬性中,如果希望調整加入到圖中的屬性,可以通過修改形如_sync_<表名>_row_to_properties函數的定義,其常規定義如下所示。將需要修改加入的列或對列的值進行修改的,可以修改select val.id, val.id_a, val.id_b部分。例如,希望將id_aid_b兩列進行拼接,可以調整為select val.id_a::text || val.id_b::text AS id

    CREATE OR REPLACE FUNCTION _sync_raw_C_row_to_properties(val raw_C)
    RETURNS agtype
    AS 'SELECT row_to_json((select x FROM (select val.id, val.id_a, val.id_b) x))::text::agtype'
    LANGUAGE SQL;
  • 在寫入邊時,需要確保其兩側的點已經插入後再寫入這條邊,否則可能造成圖資料庫在查詢時因為找不到對應的點而產生錯誤。