When your graph data originates in a relational database, keeping the two systems in sync requires ongoing work. This topic describes how to use Data Transmission Service (DTS) to continuously replicate data from a PolarDB for MySQL cluster to a PolarDB for PostgreSQL graph database powered by Apache AGE. Changes to relational tables—inserts, updates, and deletes—are reflected in the graph through PostgreSQL triggers created by the auxiliary functions provided here.
Prerequisites
Before you begin, make sure you have:
-
A PolarDB for PostgreSQL cluster running one of the following revision versions:
-
PolarDB for PostgreSQL 16, revision version 2.0.16.8.3.0 or later
-
PolarDB for PostgreSQL 15, revision version 2.0.15.12.4.0 or later
-
PolarDB for PostgreSQL 14, revision version 2.0.14.12.24.0 or later
Check the revision version of your cluster in the PolarDB console or by running `SHOW polardb_version;`. If the version does not meet the requirements, update it.
-
A PolarDB for PostgreSQL cluster. For more information, see Create a cluster.
-
A PolarDB for MySQL cluster as the data source, with binary logging enabled. See Purchase a cluster.
-
Both clusters in the same region, zone, and VPC.
-
A privileged account on the PolarDB for PostgreSQL cluster to create and load extensions. See .
Source data format requirements
Your source tables must meet the following ID constraints before you set up synchronization.
Node tables:
| Requirement | Details |
|---|---|
| Unique ID per record | Required within the same label; must be an integer less than 2^48 |
| No unique ID or non-integer ID | Add a serial type column to generate IDs automatically; serial-generated IDs can be excluded when importing to graphs |
| Without unique IDs | Data changes and deletions cannot be synchronized |
Edge tables:
| Requirement | Details |
|---|---|
| Edge ID | Required; must be an integer less than 2^48 |
| Start node column | Required; stores the ID of the start node |
| End node column | Required; stores the ID of the end node |
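The ID rules above can be checked before synchronization is set up. The following is a minimal Python sketch, run outside the database, that flags IDs breaking the constraints. The function name `find_invalid_ids` is hypothetical, and rejecting negative values is an assumption, since the requirements only state an upper bound of 2^48.

```python
MAX_ENTRY_ID = 1 << 48  # IDs must be strictly below 2^48


def find_invalid_ids(ids):
    """Return (value, reason) pairs for IDs that break the ID rules of one label."""
    problems = []
    seen = set()
    for value in ids:
        # bool is a subclass of int in Python, so reject it explicitly.
        if not isinstance(value, int) or isinstance(value, bool):
            problems.append((value, "not an integer"))
        # Assumption: negative IDs are treated as invalid as well.
        elif value < 0 or value >= MAX_ENTRY_ID:
            problems.append((value, "outside the range 0 to 2^48 - 1"))
        elif value in seen:
            problems.append((value, "duplicate within the label"))
        else:
            seen.add(value)
    return problems


print(find_invalid_ids([1, 2, 2, "x", 1 << 48]))
# [(2, 'duplicate within the label'), ('x', 'not an integer'), (281474976710656, 'outside the range 0 to 2^48 - 1')]
```

Run the check once per label, because uniqueness is only required within the same label.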
How it works
DTS transfers data from PolarDB for MySQL to the relational tables in PolarDB for PostgreSQL. PostgreSQL triggers on those tables then mirror every change into the Apache AGE graph structure.
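To make the mirroring step concrete, here is a small in-memory Python model of what the triggers do for a vertex table. It is only an illustration under stated assumptions: the real work is done by PostgreSQL triggers, and the name `apply_change`, the dict-based graph, and the label start value are all hypothetical.

```python
def apply_change(graph, op, label_start, id_col, old=None, new=None):
    """Mirror one row-level change into `graph`, a dict of graph ID -> properties."""
    if op == "INSERT":
        graph[label_start + new[id_col]] = dict(new)
    elif op == "UPDATE":
        # The vertex is located by the OLD row's ID; its properties are replaced.
        key = label_start + old[id_col]
        if key in graph:
            graph[key] = dict(new)
    elif op == "DELETE":
        # Deleting a row that was never mirrored is a no-op.
        graph.pop(label_start + old[id_col], None)
    else:
        raise ValueError(f"unsupported operation: {op}")
    return graph


graph = {}
start = 3 << 48  # hypothetical start of one label's ID range
apply_change(graph, "INSERT", start, "id", new={"id": 1, "name": "1"})
apply_change(graph, "UPDATE", start, "id",
             old={"id": 1, "name": "1"}, new={"id": 1, "name": "2"})
print(graph)  # {844424930131969: {'id': 1, 'name': '2'}}
apply_change(graph, "DELETE", start, "id", old={"id": 1, "name": "2"})
print(graph)  # {}
```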
Limitations
-
Do not write data into the source database while the DTS synchronization instance is being created. Data written during this window cannot be synchronized to the graph.
-
After the DTS instance is created, do not modify the table data structure (for example, adding or deleting columns). Structural changes cause subsequent synchronization tasks to fail.
-
Insert all start and end nodes before inserting an edge. If the corresponding nodes do not exist, the graph database reports an error.
-
The auxiliary functions in this topic add all columns as graph properties by default. To customize which columns are added, see Customize graph properties.
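Because an edge can only be inserted once both of its nodes exist, it can help to look for dangling edges in the source data beforehand. The following Python sketch assumes edge rows are available as `(edge_id, start_id, end_id)` tuples; the function name `find_dangling_edges` is hypothetical.

```python
def find_dangling_edges(edges, start_node_ids, end_node_ids):
    """Return edge rows whose start or end node ID is missing.

    edges: iterable of (edge_id, start_id, end_id) tuples.
    """
    start_ids = set(start_node_ids)
    end_ids = set(end_node_ids)
    return [
        edge for edge in edges
        if edge[1] not in start_ids or edge[2] not in end_ids
    ]


edges = [(1, 1, 1), (2, 1, 9)]
print(find_dangling_edges(edges, start_node_ids=[1], end_node_ids=[1]))
# [(2, 1, 9)]
```

An empty result suggests that every edge row references nodes that are already present.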
Set up the environment
Step 1: Create source tables in PolarDB for MySQL
Create the node and edge tables to be synchronized:
CREATE TABLE raw_A(id integer, name text, `desc` text, time_created timestamp);
CREATE TABLE raw_B(id integer, name text, `desc` text, `value` integer, time_created timestamp);
CREATE TABLE raw_C(id integer, id_a integer, id_b integer);
In this example, raw_A and raw_B store nodes, and raw_C stores edges. Each row in raw_C references a node in raw_A (via id_a) and a node in raw_B (via id_b).
Step 2: Set up the graph in PolarDB for PostgreSQL
-
Use a privileged account to load the AGE extension into the destination database.
Compatibility issues may occur when using Data Management (DMS) to configure search_path. Use PolarDB-Tools to run the following statements instead.
CREATE EXTENSION age;
ALTER DATABASE <dbname> SET search_path = "$user", public, ag_catalog;
ALTER DATABASE <dbname> SET session_preload_libraries TO 'age';
-
Create the graph, vertex labels, and edge label:
SELECT create_graph('gra');
SELECT create_vlabel('gra', 'label_a');
SELECT create_vlabel('gra', 'label_b');
SELECT create_elabel('gra', 'edge_c');
Step 3: Create a DTS synchronization task
-
Go to the Data Synchronization page from the DTS console or the DMS console. To use the DMS console:
-
Log on to the DMS console.
-
In the top navigation bar, move the pointer over Data + AI and choose DTS (DTS) > Data Synchronization.
-
From the drop-down list next to Data Synchronization Tasks, select the region where the synchronization instance resides.
The actual operations may vary based on the mode and layout of the DMS console. For more information, see Simple mode and Customize the layout and style of the DMS console.
-
Click Create Task.
-
On the task configuration page, set the following parameters:
The table below covers key parameters. For the full parameter list, see parameters.
| Section | Parameter | Value |
|---|---|---|
| Source database | Database type | MySQL |
| Source database | Access method | Express Connect, VPN Gateway, or Smart Access Gateway |
| Destination database | Database type | PostgreSQL |
| Destination database | Access method | Express Connect, VPN Gateway, or Smart Access Gateway |
Specify the instance region, VPC CIDR block, cluster endpoint, port, username, and password based on your cluster information.
-
In the Configure Task Objects step, select raw_A, raw_B, and raw_C from the TABLE section. Keep the default settings for subsequent steps and click Next.
For other parameters, see parameters and advanced settings.
-
Save the task and run the precheck. When Success Rate shows 100%, click Next: Purchase Instance.
-
On the Purchase page, select the billing method and instance class, agree to the Data Transmission Service (Pay-As-You-Go) Service Terms, and click Buy and Start. In the dialog box, click OK.
For details about the Purchase page, see Purchase a data synchronization instance. Monitor the task progress on the DTS data synchronization page.
Step 4: Create triggers to sync data into the graph
Run the following statements in the PolarDB for PostgreSQL cluster to create three auxiliary functions.
Function 1: `age_name_to_idx_start`
Resolves a graph label name to its starting ID range.
CREATE OR REPLACE FUNCTION age_name_to_idx_start(graph_name text, kind_name text, label_name text)
RETURNS bigint
AS 'SELECT id::bigint<<48 FROM ag_catalog.ag_label WHERE kind = kind_name and name = label_name and graph = (SELECT graphid FROM ag_catalog.ag_graph WHERE name = graph_name)'
language SQL IMMUTABLE STRICT PARALLEL SAFE;
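The `id::bigint<<48` expression places the label ID in the high bits of a 64-bit graph ID, which leaves the low 48 bits for the source row ID. That is why source IDs must stay below 2^48. The Python sketch below reproduces the arithmetic outside the database. The helper names are hypothetical, and the label IDs 3 and 4 are inferred from the expected results in the verification section of this topic, not values guaranteed for every graph.

```python
ENTRY_ID_BITS = 48
ENTRY_ID_MASK = (1 << ENTRY_ID_BITS) - 1


def label_start(label_id):
    """Mirror of `id::bigint << 48`: first graph ID in the label's range."""
    return label_id << ENTRY_ID_BITS


def to_graph_id(label_id, row_id):
    """Combine a label ID and a source row ID into one graph ID."""
    if not 0 <= row_id <= ENTRY_ID_MASK:
        raise ValueError("row_id must fit in 48 bits")
    return label_start(label_id) + row_id


def split_graph_id(graph_id):
    """Recover (label_id, row_id) from a graph ID."""
    return graph_id >> ENTRY_ID_BITS, graph_id & ENTRY_ID_MASK


print(to_graph_id(3, 1))                 # 844424930131969
print(split_graph_id(1125899906842625))  # (4, 1)
```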
Function 2: `build_age_triggers_for_vertex`
Creates INSERT, UPDATE, and DELETE triggers that mirror changes from a relational table into a graph vertex label.
CREATE OR REPLACE FUNCTION build_age_triggers_for_vertex(schema_name text, table_name text, table_id_col text, graph_name text, graph_label text)
RETURNS BOOL
AS
$outer$
DECLARE
column_names TEXT;
sql TEXT;
BEGIN
SELECT string_agg(format('val.%I', column_name), ', ')
INTO column_names
FROM information_schema.columns
WHERE columns.table_schema = build_age_triggers_for_vertex.schema_name AND columns.table_name = build_age_triggers_for_vertex.table_name;
sql := $$
CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(id bigint)
RETURNS graphid
AS 'SELECT (age_name_to_idx_start(''$$ || graph_name || $$'', ''v'', ''$$ || graph_label|| $$'') + id)::text::ag_catalog.graphid'
LANGUAGE SQL;
CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(val $$ || schema_name || $$.$$ || table_name || $$)
RETURNS agtype
AS 'SELECT row_to_json((select x FROM (select $$|| column_names || $$) x))::text::agtype'
LANGUAGE SQL;
CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$() RETURNS TRIGGER AS
$inner$
BEGIN
IF TG_OP = 'INSERT' THEN
INSERT INTO "$$ || graph_name || $$"."$$ || graph_label || $$" (id, properties) VALUES (_sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(NEW."$$ || table_id_col || $$"), _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(NEW));
RETURN NEW;
ELSIF TG_OP = 'UPDATE' THEN
UPDATE "$$ || graph_name || $$"."$$ || graph_label || $$" SET properties = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(NEW) WHERE id = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$");
RETURN NEW;
ELSIF TG_OP = 'DELETE' THEN
DELETE FROM "$$ || graph_name || $$"."$$ || graph_label || $$" WHERE id = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$");
RETURN OLD;
END IF;
RETURN NULL;
END;
$inner$ LANGUAGE plpgsql;
CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_insert
AFTER INSERT ON $$ || schema_name || $$.$$ || table_name || $$
FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();
CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_update
AFTER UPDATE ON $$ || schema_name || $$.$$ || table_name || $$
FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();
CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_delete
AFTER DELETE ON $$ || schema_name || $$.$$ || table_name || $$
FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();
ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_insert;
ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_update;
ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_delete;
$$;
EXECUTE sql;
RETURN true;
END;
$outer$
LANGUAGE plpgsql;
Parameters:
| Parameter | Description | Example |
|---|---|---|
| `schema_name` | Schema where the source table resides | public |
| `table_name` | Name of the source relational table | raw_a |
| `table_id_col` | Column that holds the unique node ID | id |
| `graph_name` | Name of the AGE graph | gra |
| `graph_label` | Vertex label in the graph | label_a |
Function 3: `build_age_triggers_for_edge`
Creates INSERT, UPDATE, and DELETE triggers that mirror changes from a relational table into a graph edge label.
CREATE OR REPLACE FUNCTION build_age_triggers_for_edge(schema_name text, table_name text, table_id_col text, start_table_name text, start_id_col text, end_table_name text, end_id_col text, graph_name text, graph_label text)
RETURNS BOOL
AS
$outer$
DECLARE
column_names TEXT;
sql TEXT;
BEGIN
SELECT string_agg(format('val.%I', column_name), ', ')
INTO column_names
FROM information_schema.columns
WHERE columns.table_schema = build_age_triggers_for_edge.schema_name AND columns.table_name = build_age_triggers_for_edge.table_name;
sql := $$
CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(id bigint)
RETURNS graphid
AS 'SELECT (age_name_to_idx_start(''$$ || graph_name || $$'', ''e'', ''$$ || graph_label|| $$'') + id)::text::ag_catalog.graphid'
LANGUAGE SQL;
CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(val $$ || schema_name || $$.$$ || table_name || $$)
RETURNS agtype
AS 'SELECT row_to_json((select x FROM (select $$|| column_names || $$) x))::text::agtype'
LANGUAGE SQL;
CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$() RETURNS TRIGGER AS
$inner$
BEGIN
IF TG_OP = 'INSERT' THEN
INSERT INTO "$$ || graph_name || $$"."$$ || graph_label || $$" (id, start_id, end_id, properties) VALUES (_sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(NEW."$$ || table_id_col || $$"), _sync_$$ || schema_name || $$_$$ || start_table_name || $$_row_to_id(NEW."$$ || start_id_col || $$"), _sync_$$ || schema_name || $$_$$ || end_table_name || $$_row_to_id(NEW."$$ || end_id_col || $$"), _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(NEW));
RETURN NEW;
ELSIF TG_OP = 'UPDATE' THEN
UPDATE "$$ || graph_name || $$"."$$ || graph_label || $$" SET start_id = _sync_$$ || schema_name || $$_$$ || start_table_name || $$_row_to_id(NEW."$$ || start_id_col || $$"), end_id = _sync_$$ || schema_name || $$_$$ || end_table_name || $$_row_to_id(NEW."$$ || end_id_col || $$"), properties = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(NEW) WHERE id = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$");
RETURN NEW;
ELSIF TG_OP = 'DELETE' THEN
DELETE FROM "$$ || graph_name || $$"."$$ || graph_label || $$" WHERE id = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$");
RETURN OLD;
END IF;
RETURN NULL;
END;
$inner$ LANGUAGE plpgsql;
CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_insert
AFTER INSERT ON $$ || schema_name || $$.$$ || table_name || $$
FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();
CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_update
AFTER UPDATE ON $$ || schema_name || $$.$$ || table_name || $$
FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();
CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_delete
AFTER DELETE ON $$ || schema_name || $$.$$ || table_name || $$
FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();
ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_insert;
ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_update;
ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_delete;
$$;
EXECUTE sql;
RETURN true;
END;
$outer$
LANGUAGE plpgsql;
Parameters:
| Parameter | Description | Example |
|---|---|---|
| `schema_name` | Schema where the source table resides | public |
| `table_name` | Name of the source edge table | raw_c |
| `table_id_col` | Column that holds the unique edge ID | id |
| `start_table_name` | Name of the source table for the start node | raw_a |
| `start_id_col` | Column in the edge table that holds the start node ID | id_a |
| `end_table_name` | Name of the source table for the end node | raw_b |
| `end_id_col` | Column in the edge table that holds the end node ID | id_b |
| `graph_name` | Name of the AGE graph | gra |
| `graph_label` | Edge label in the graph | edge_c |
Step 5: Activate triggers for your tables
Call the auxiliary functions to create triggers for the example tables.
Table names are case-sensitive. Use lowercase letters. Replace `your_schema_name` with the actual schema where `raw_a` and the other tables reside. Run `\d+ <table_name>` in the psql client to check the schema.
select build_age_triggers_for_vertex('your_schema_name','raw_a', 'id', 'gra', 'label_a');
select build_age_triggers_for_vertex('your_schema_name','raw_b', 'id', 'gra', 'label_b');
select build_age_triggers_for_edge('your_schema_name','raw_c', 'id', 'raw_a', 'id_a', 'raw_b', 'id_b', 'gra', 'edge_c');
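Each call above generates several database objects whose names are built from the schema and table names. The Python sketch below lists those names as they appear in the auxiliary function bodies, which can help when you need to call or redefine one of them later. The helper `generated_names` is hypothetical, and `public` is only an example schema. The names are lowercased because the generated SQL uses unquoted identifiers, which PostgreSQL folds to lowercase.

```python
def generated_names(schema_name, table_name):
    """Names of the objects the builder functions create for one table."""
    # Unquoted identifiers are folded to lowercase by PostgreSQL.
    prefix = f"_sync_{schema_name}_{table_name}".lower()
    return {
        "row_to_id": f"{prefix}_row_to_id",
        "row_to_properties": f"{prefix}_row_to_properties",
        "trigger_function": prefix,
        "triggers": [f"{prefix}_{op}" for op in ("insert", "update", "delete")],
    }


names = generated_names("public", "raw_a")
print(names["row_to_id"])          # _sync_public_raw_a_row_to_id
print(names["row_to_properties"])  # _sync_public_raw_a_row_to_properties
```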
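These triggers handle incremental data from this point forward. To backfill data that already exists in the tables, run the following after creating the triggers. The helper functions generated in Step 4 are named _sync_<schema_name>_<table_name>_row_to_id and _sync_<schema_name>_<table_name>_row_to_properties, so replace your_schema_name with the schema you passed to the auxiliary functions:
INSERT INTO "gra"."label_a" (id, properties) SELECT _sync_your_schema_name_raw_a_row_to_id(raw_a.id), _sync_your_schema_name_raw_a_row_to_properties(raw_a) FROM your_schema_name.raw_a;
INSERT INTO "gra"."label_b" (id, properties) SELECT _sync_your_schema_name_raw_b_row_to_id(raw_b.id), _sync_your_schema_name_raw_b_row_to_properties(raw_b) FROM your_schema_name.raw_b;
INSERT INTO "gra"."edge_c" (id, start_id, end_id, properties) SELECT _sync_your_schema_name_raw_c_row_to_id(raw_c.id), _sync_your_schema_name_raw_a_row_to_id(raw_c.id_a), _sync_your_schema_name_raw_b_row_to_id(raw_c.id_b), _sync_your_schema_name_raw_c_row_to_properties(raw_c) FROM your_schema_name.raw_c;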
Verify data synchronization
Run the following tests in PolarDB for MySQL and check the results in PolarDB for PostgreSQL.
Data insertion
-
In PolarDB for MySQL, insert test rows:
INSERT INTO raw_a values(1,1,1,'2000-01-01');
INSERT INTO raw_b values(1,1,1,1,'2000-01-01');
INSERT INTO raw_c values(1,1,1);
-
In PolarDB for PostgreSQL, query all nodes:
SELECT * FROM cypher('gra', $$ MATCH (v) RETURN v $$) as (v agtype);
Expected result:
{"id": 844424930131969, "label": "label_a", "properties": {"id": 1, "desc": "1", "name": "1", "time_created": "2000-01-01T00:00:00"}}::vertex
{"id": 1125899906842625, "label": "label_b", "properties": {"id": 1, "desc": "1", "name": "1", "value": 1, "time_created": "2000-01-01T00:00:00"}}::vertex
-
Query all edges:
SELECT * FROM cypher('gra', $$ MATCH (v)-[e]->(v2) RETURN e $$) as (e agtype);
Expected result:
{"id": 1407374883553281, "label": "edge_c", "end_id": 1125899906842625, "start_id": 844424930131969, "properties": {"id": "11"}}::edge
Property modification
-
In PolarDB for MySQL, update a node property:
UPDATE raw_a SET name = '2' WHERE id = 1;
-
In PolarDB for PostgreSQL, verify the update:
SELECT * FROM cypher('gra', $$ MATCH (v:label_a {id:1}) RETURN v $$) as (v agtype);
Expected result:
{"id": 844424930131969, "label": "label_a", "properties": {"id": 1, "desc": "1", "name": "2", "time_created": "2000-01-01T00:00:00"}}::vertex
Data deletion
-
In PolarDB for MySQL, delete a row:
DELETE FROM raw_c WHERE id = 1;
-
In PolarDB for PostgreSQL, verify the deletion:
SELECT * FROM cypher('gra', $$ MATCH (v)-[e]->(v2) RETURN e $$) as (e agtype);
An empty result set confirms the edge was deleted from the graph.
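The expected results in this section are JSON objects followed by a `::vertex` or `::edge` suffix. If you want to compare results in a script rather than by eye, the following Python sketch splits such a value into its JSON body and its kind. The function name `parse_agtype` is hypothetical, and the sketch only handles a single top-level vertex or edge value, not paths or nested annotated values.

```python
import json

KNOWN_SUFFIXES = ("vertex", "edge")


def parse_agtype(text):
    """Split a value such as '{...}::vertex' into (parsed JSON, kind)."""
    text = text.strip()
    for kind in KNOWN_SUFFIXES:
        suffix = "::" + kind
        if text.endswith(suffix):
            return json.loads(text[: -len(suffix)]), kind
    # No recognized suffix: treat the whole value as plain JSON.
    return json.loads(text), None


value = '{"id": 844424930131969, "label": "label_a", "properties": {"id": 1, "name": "2"}}::vertex'
data, kind = parse_agtype(value)
print(kind, data["properties"]["name"])  # vertex 2
```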
Customize graph properties
By default, the auxiliary functions map all columns in a table to graph properties. To control which columns are included, redefine the generated _sync_<schema_name>_<table_name>_row_to_properties function.
For example, to include only id, id_a, and id_b from raw_c (replace your_schema_name with the schema where the table resides):
CREATE OR REPLACE FUNCTION _sync_your_schema_name_raw_c_row_to_properties(val your_schema_name.raw_c)
RETURNS agtype
AS 'SELECT row_to_json((select x FROM (select val.id, val.id_a, val.id_b) x))::text::agtype'
LANGUAGE SQL;
To merge two columns into one property, change the SELECT clause. For example, to concatenate id_a and id_b:
-- Change: select val.id_a, val.id_b
-- To:
select val.id_a::text || val.id_b::text AS id
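The effect of both customizations on the resulting properties can be previewed with a small Python sketch. It models a source row as a dict and is only an illustration: the function names are hypothetical and the real mapping is performed by the SQL function above.

```python
def row_to_properties(row, columns=None):
    """Build a properties dict from a row, keeping only `columns` if given."""
    if columns is None:
        return dict(row)  # default: every column becomes a property
    return {name: row[name] for name in columns}


def merged_id_property(row):
    """Concatenate id_a and id_b as text into a single `id` property."""
    return {"id": str(row["id_a"]) + str(row["id_b"])}


row = {"id": 1, "id_a": 1, "id_b": 1}
print(row_to_properties(row))                  # {'id': 1, 'id_a': 1, 'id_b': 1}
print(row_to_properties(row, ["id", "id_a"]))  # {'id': 1, 'id_a': 1}
print(merged_id_property(row))                 # {'id': '11'}
```

Note that the concatenated value is text, so it appears in the graph as a string property rather than a number.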