在圖資料庫的應用情境中,您可能選擇先將資料寫入到其它資料庫中,再進一步將資料同步到圖資料庫中進行圖查詢。本文以PolarDB MySQL版資料來源為例,為您介紹如何通過DTS任務將MySQL的資料,同步到PolarDB PostgreSQL版所管理的圖資料庫的全過程。
前提條件
版本說明
支援的PolarDB PostgreSQL版的版本如下:
PostgreSQL 16(核心小版本2.0.16.8.3.0及以上)
PostgreSQL 15(核心小版本2.0.15.12.4.0及以上)
PostgreSQL 14(核心小版本2.0.14.12.24.0及以上)
資料格式
要求寫入的節點和邊資料有一列在其類型(Label)中唯一且小於2^48的ID。而邊資料除唯一ID之外,還包括兩列分別指定其起點和終點對應的節點的ID。
對於沒有唯一ID、或者唯一ID不為整數類型的節點和邊,通常可以添加一列
serial類型的列作為唯一ID。如果沒有唯一ID的協助,將無法同步對資料的更改和刪除。這列唯一ID可以在資料表中使用serial的特性自動產生而不必手動插入,同時可以選擇不將此列ID加入到圖。
同步流程
最佳實務
資源準備
一個PolarDB MySQL版叢集作為資料來源,詳細操作請參考購買叢集。
一個PolarDB PostgreSQL版叢集,詳細操作請參考建立資料庫叢集。
PolarDB MySQL版和PolarDB PostgreSQL版均在同一地區、同一可用性區域和同一個VPC中。
步驟
在PolarDB MySQL版叢集寫入新增資料,通過DTS任務同步到PolarDB PostgreSQL版的圖資料庫中。
基礎資料準備。
假設需要同步的圖資料由三部分組成:兩個點資料表A和B,用於記錄圖上的點,且各自存在唯一ID。一個邊資料表C,記錄其起點在A中和終點在B中的唯一ID。然後在PolarDB PostgreSQL版中建立一張圖,包含A,B兩種類型的節點和C這種類型的邊。
PolarDB MySQL版
建立表定義。
CREATE TABLE raw_A(id integer, name text, `desc` text, time_created timestamp); CREATE TABLE raw_B(id integer, name text, `desc` text, `value` integer, time_created timestamp); CREATE TABLE raw_C(id integer, id_a integer, id_b integer);使用DTS前,需要開啟PolarDB MySQL版的Binlog功能,詳細操作請參考開啟Binlog。
PolarDB PostgreSQL版
在目標資料庫使用高許可權帳號建立並載入外掛程式,建立高許可權帳號請參考建立資料庫帳號。
說明使用Data Management(Data Management)用戶端設定
search_path時,可能會存在相容性問題,您可使用PolarDB-Tools執行相關語句。CREATE EXTENSION age; ALTER DATABASE <dbname> SET search_path = "$user", public, ag_catalog; ALTER DATABASE <dbname> SET session_preload_libraries TO 'age';建立圖、點和邊。
SELECT create_graph('gra'); SELECT create_vlabel('gra', 'label_a'); SELECT create_vlabel('gra', 'label_b'); SELECT create_elabel('gra', 'edge_c');
通過DTS將資料同步到PolarDB PostgreSQL版。
說明在搭建同步鏈路的過程中,請勿寫入資料,否則該部分資料將無法匯入圖中。
在搭建DTS鏈路完成後,請勿修改同步表的資料結構(如增加、刪除列等操作),否則可能導致後續無法同步。
進入目標地區的同步工作清單頁面(二選一)。
通過DTS控制台進入
在左側導覽列,單擊資料同步。
在頁面左上方,選擇同步執行個體所屬地區。
通過DMS控制台進入
說明實際操作可能會因DMS的模式和布局不同,而有所差異。更多資訊,請參見極簡模式控制台和自訂DMS介面布局與樣式。
在頂部功能表列中,選擇。
在同步任務右側,選擇同步執行個體所屬地區。
單擊創建任務,進入任務配置頁面。
在建立同步任務頁面,配置以下資訊。
說明此處僅為部分配置項,詳細配置項說明可參考配置參數說明。
類別
配置
說明
源庫資訊
資料庫類型
選擇MySQL。
接入方式
選擇專線/VPN網關/智能網關。
目標庫資訊
資料庫類型
選擇PostgreSQL。
接入方式
選擇專線/VPN網關/智能網關。
根據實際叢集資訊填寫的執行個體地區、VPC網段、叢集地址、連接埠、使用者名稱和密碼等資訊。
在配置任務對象的對象配置步驟,選擇對應資料庫下,TABLE欄目中的raw_A、raw_B、raw_C三張表。後續步驟使用預設配置,單擊下一步即可。
儲存任務並進行預檢查。預檢查通過率顯示為100%時,單擊下一步購買。
在購買頁面,選擇資料同步執行個體的計費方式、鏈路規格,配置完成後,閱讀並勾選《資料轉送(隨用隨付)服務條款》,單擊購買並啟動,並在彈出的確認對話方塊,單擊確定。
說明購買頁面的資訊配置請參考購買執行個體。
您可在DTS資料同步介面查看具體任務進度。
通過觸發器將資料同步到圖。
在PolarDB PostgreSQL版叢集建立如下輔助函數:
--- 函數1 CREATE OR REPLACE FUNCTION age_name_to_idx_start(graph_name text, kind_name text, label_name text) RETURNS bigint AS 'SELECT id::bigint<<48 FROM ag_catalog.ag_label WHERE kind = kind_name and name = label_name and graph = (SELECT graphid FROM ag_catalog.ag_graph WHERE name = graph_name)' language SQL IMMUTABLE STRICT PARALLEL SAFE; --- 函數2 CREATE OR REPLACE FUNCTION build_age_triggers_for_vertex(schema_name text, table_name text, table_id_col text, graph_name text, graph_label text) RETURNS BOOL AS $outer$ DECLARE column_names TEXT; sql TEXT; BEGIN SELECT string_agg(format('val.%I', column_name), ', ') INTO column_names FROM information_schema.columns WHERE columns.table_schema = build_age_triggers_for_vertex.schema_name AND columns.table_name = build_age_triggers_for_vertex.table_name; sql := $$ CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(id bigint) RETURNS graphid AS 'SELECT (age_name_to_idx_start(''$$ || graph_name || $$'', ''v'', ''$$ || graph_label|| $$'') + id)::text::ag_catalog.graphid' LANGUAGE SQL; CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(val $$ || schema_name || $$.$$ || table_name || $$) RETURNS agtype AS 'SELECT row_to_json((select x FROM (select $$|| column_names || $$) x))::text::agtype' LANGUAGE SQL; CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$() RETURNS TRIGGER AS $inner$ BEGIN IF TG_OP = 'INSERT' THEN INSERT INTO "$$ || graph_name || $$"."$$ || graph_label || $$" (id, properties) VALUES (_sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(NEW."$$ || table_id_col || $$"), _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(NEW)); RETURN NEW; ELSIF TG_OP = 'UPDATE' THEN UPDATE "$$ || graph_name || $$"."$$ || graph_label || $$" SET properties = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(NEW) WHERE id = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$"); RETURN NEW; ELSIF TG_OP = 'DELETE' THEN DELETE FROM "$$ || graph_name || $$"."$$ || graph_label || $$" WHERE id = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$"); RETURN OLD; END IF; RETURN NULL; END; $inner$ LANGUAGE plpgsql; CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_insert AFTER INSERT ON $$ || schema_name || $$.$$ || table_name || $$ FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$(); CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_update AFTER UPDATE ON $$ || schema_name || $$.$$ || table_name || $$ FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$(); CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_delete AFTER DELETE ON $$ || schema_name || $$.$$ || table_name || $$ FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$(); ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_insert; ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_update; ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_delete; $$; EXECUTE sql; RETURN true; END; $outer$ LANGUAGE plpgsql; --- 函數3 CREATE OR REPLACE FUNCTION build_age_triggers_for_edge(schema_name text, table_name text, table_id_col text, start_table_name text, start_id_col text, end_table_name text, end_id_col text, graph_name text, graph_label text) RETURNS BOOL AS $outer$ DECLARE column_names TEXT; sql TEXT; BEGIN SELECT string_agg(format('val.%I', column_name), ', ') INTO column_names FROM information_schema.columns WHERE columns.table_schema = build_age_triggers_for_edge.schema_name AND columns.table_name = build_age_triggers_for_edge.table_name; sql := $$ CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(id bigint) RETURNS graphid AS 'SELECT (age_name_to_idx_start(''$$ || graph_name || $$'', ''e'', ''$$ || graph_label|| $$'') + id)::text::ag_catalog.graphid' LANGUAGE SQL; CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(val $$ || schema_name || $$.$$ || table_name || $$) RETURNS agtype AS 'SELECT row_to_json((select x FROM (select $$|| column_names || $$) x))::text::agtype' LANGUAGE SQL; CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$() RETURNS TRIGGER AS $inner$ BEGIN IF TG_OP = 'INSERT' THEN INSERT INTO "$$ || graph_name || $$"."$$ || graph_label || $$" (id, start_id, end_id, properties) VALUES (_sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(NEW."$$ || table_id_col || $$"), _sync_$$ || schema_name || $$_$$ || start_table_name || $$_row_to_id(NEW."$$ || start_id_col || $$"), _sync_$$ || schema_name || $$_$$ || end_table_name || $$_row_to_id(NEW."$$ || end_id_col || $$"), _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(NEW)); RETURN NEW; ELSIF TG_OP = 'UPDATE' THEN UPDATE "$$ || graph_name || $$"."$$ || graph_label || $$" SET start_id = _sync_$$ || schema_name || $$_$$ || start_table_name || $$_row_to_id(NEW."$$ || start_id_col || $$"), end_id = _sync_$$ || schema_name || $$_$$ || end_table_name || $$_row_to_id(NEW."$$ || end_id_col || $$"), properties = _sync_raw_A_row_to_properties(NEW) WHERE id = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$"); RETURN NEW; ELSIF TG_OP = 'DELETE' THEN DELETE FROM "$$ || graph_name || $$"."$$ || graph_label || $$" WHERE id = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$"); RETURN OLD; END IF; RETURN NULL; END; $inner$ LANGUAGE plpgsql; CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_insert AFTER INSERT ON $$ || schema_name || $$.$$ || table_name || $$ FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$(); CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_update AFTER UPDATE ON $$ || schema_name || $$.$$ || table_name || $$ FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$(); CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_delete AFTER DELETE ON $$ || schema_name || $$.$$ || table_name || $$ FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$(); ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_insert; ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_update; ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_delete; $$; EXECUTE sql; RETURN true; END; $outer$ LANGUAGE plpgsql;執行輔助函數即可構建從同步表到圖中的觸發器。
說明請統一使用小寫,大小寫敏感。
其中
your_schema_name需要替換成raw_a等資料表所在的schema,可在psql用戶端使用\d+ <table_name>查看,通常與原表schema保持一致。
select build_age_triggers_for_vertex('your_schema_name','raw_a', 'id', 'gra', 'label_a'); select build_age_triggers_for_vertex('your_schema_name','raw_b', 'id', 'gra', 'label_b'); select build_age_triggers_for_edge('your_schema_name','raw_c', 'id', 'raw_a', 'id_a', 'raw_b', 'id_b', 'gra', 'edge_c');
說明該觸發器只能同步增量資料,對於存量資料表,需要在建立上述觸發器之後執行如下語句:
INSERT INTO "gra"."label_a" (id, properties) SELECT sync_a_row_to_id(raw_A.id), sync_a_row_to_properties(raw_A) FROM raw_A; INSERT INTO "gra"."label_b" (id, properties) SELECT sync_b_row_to_id(raw_B.id), sync_b_row_to_properties(raw_B) FROM raw_B; INSERT INTO "gra"."edge_c" (id, start_id, end_id, properties) SELECT sync_c_row_to_id(raw_C.id), sync_a_row_to_id(raw_C.id_a), sync_b_row_to_id(raw_C.id_b), sync_c_row_to_properties(raw_C) FROM raw_C;測實驗證。
資料插入
在PolarDB MySQL版中,向需要同步的表中插入測試資料。
INSERT INTO raw_a values(1,1,1,'2000-01-01'); INSERT INTO raw_b values(1,1,1,1,'2000-01-01'); INSERT INTO raw_c values(1,1,1);在PolarDB PostgreSQL版叢集中使用cypher語言進行圖查詢,驗證資料插入成功:
SELECT * FROM cypher('gra', $$ MATCH (v) RETURN v $$) as (v agtype);返回結果如下:
------ {"id": 844424930131969, "label": "label_a", "properties": {"id": 1, "desc": "1", "name": "1", "time_created": "2000-01-01T00:00:00"}}::vertex {"id": 1125899906842625, "label": "label_b", "properties": {"id": 1, "desc": "1", "name": "1", "value": 1, "time_created": "2000-01-01T00:00:00"}}::vertexSELECT * FROM cypher('gra', $$ MATCH (v)-[e]->(v2) RETURN e $$) as (e agtype);返回結果如下:
------ {"id": 1407374883553281, "label": "edge_c", "end_id": 1125899906842625, "start_id": 844424930131969, "properties": {"id": "11"}}::edge
屬性修改
在PolarDB MySQL版中,向需要同步的表中修改測試資料屬性。
UPDATE raw_a SET name = '2' WHERE id = 1;在PolarDB PostgreSQL版叢集中使用cypher語言進行圖查詢,驗證資料屬性修改成功:
SELECT * FROM cypher('gra', $$ MATCH (v:label_a {id:1}) RETURN v $$) as (v agtype);返回結果如下:
----- {"id": 844424930131969, "label": "label_a", "properties": {"id": 1, "desc": "1", "name": "2", "time_created": "2000-01-01T00:00:00"}}::vertex
資料刪除
在PolarDB MySQL版中,在需要同步的表中刪除測試資料。
DELETE FROM raw_c WHERE id = 1;在PolarDB PostgreSQL版叢集中使用cypher語言進行圖查詢,驗證資料刪除成功:
SELECT * FROM cypher('gra', $$ MATCH (v)-[e]->(v2) RETURN e $$) as (e agtype);返回結果為空白,即資料刪除已同步。
注意事項
在搭建同步鏈路的過程中不可以寫入資料,否則這部分資料會無法匯入圖。
在搭建DTS鏈路完成後,請勿修改同步表的資料結構(如增加、刪除列等操作),否則可能導致後續無法同步。
上述輔助函數會將全部的列作為屬性加入到圖的屬性中,如果希望調整加入到圖中的屬性,可以通過修改形如
_sync_<表名>_row_to_properties函數的定義,其常規定義如下所示。將需要修改加入的列或對列的值進行修改的,可以修改select val.id, val.id_a, val.id_b部分。例如,希望將id_a和id_b兩列進行拼接,可以調整為select val.id_a::text || val.id_b::text AS id。CREATE OR REPLACE FUNCTION _sync_raw_C_row_to_properties(val raw_C) RETURNS agtype AS 'SELECT row_to_json((select x FROM (select val.id, val.id_a, val.id_b) x))::text::agtype' LANGUAGE SQL;在寫入邊時,需要確保其兩側的點已經插入後再寫入這條邊,否則可能造成圖資料庫在查詢時因為找不到對應的點而產生錯誤。