To query data as a graph, you can first write the data into a relational database and then synchronize it to a graph database. This topic describes how to create a Data Transmission Service (DTS) task that transfers data from a PolarDB for MySQL database to a PolarDB for PostgreSQL graph database.
Prerequisites
Version requirements
Your PolarDB for PostgreSQL cluster runs one of the following database engine versions:
PolarDB for PostgreSQL 16 with revision version 2.0.16.8.3.0 or later
PolarDB for PostgreSQL 15 with revision version 2.0.15.12.4.0 or later
PolarDB for PostgreSQL 14 with revision version 2.0.14.12.24.0 or later
You can check the revision version of your cluster in the PolarDB console or by executing the SHOW polardb_version; statement. If the revision version does not meet the requirements, update it.
Data format
A node or edge record must contain an ID value that is unique among records of the same type (label). The ID value must be smaller than 2^48. In addition to the edge ID, an edge record must also contain two columns that indicate the IDs of the start node and the end node.
For a node or edge without a unique ID, or with a non-integer unique ID, add a column of the serial type to generate one. Without unique IDs, data changes and deletions cannot be synchronized. The IDs are automatically generated by the serial column and can be excluded when you import data to the graph.
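For example, if a source table lacks a usable integer ID, you can add an auto-generated one before configuring synchronization. The following is a minimal sketch for a PolarDB for MySQL source table; raw_events and sync_id are hypothetical names:

```sql
-- Hypothetical example: raw_events has no integer unique ID.
-- An auto-increment column provides one; it can be excluded from the
-- graph properties later by customizing the row_to_properties function.
ALTER TABLE raw_events
  ADD COLUMN sync_id BIGINT NOT NULL AUTO_INCREMENT,
  ADD UNIQUE KEY (sync_id);
```

Use the generated column (sync_id here) as the ID column when you create the triggers in the later steps.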
Synchronization process
Best practice
Preparation
Create a PolarDB for MySQL cluster as the data source. For more information, see Purchase a cluster.
Create a PolarDB for PostgreSQL cluster. For more information, see Create a cluster.
Make sure that the PolarDB for MySQL cluster and the PolarDB for PostgreSQL cluster are in the same region, zone, and VPC.
Procedure
You can insert data into the PolarDB for MySQL cluster and synchronize the data to the PolarDB for PostgreSQL graph database by using DTS.
Prepare basic data.
In this example, three tables are created and imported to the graph database: two node tables A and B, each storing nodes in the graph with unique IDs, and an edge table C, storing the IDs of the start nodes in A and end nodes in B. A graph is created in PolarDB for PostgreSQL that consists of nodes of the A and B types and edges of the C type.
In the PolarDB for MySQL cluster:
Create table definitions.
CREATE TABLE raw_A(id integer, name text, `desc` text, time_created timestamp);
CREATE TABLE raw_B(id integer, name text, `desc` text, `value` integer, time_created timestamp);
CREATE TABLE raw_C(id integer, id_a integer, id_b integer);
Before you use DTS, you must enable the binary logging feature for the PolarDB for MySQL cluster. For more information, see Enable binary logging.
In the PolarDB for PostgreSQL cluster:
Use a privileged account to create and load extensions into the destination database. For more information about how to create a privileged account, see Create a database account.
Note: Compatibility issues may occur when you use Data Management (DMS) to configure the search_path. In such cases, you can use PolarDB-Tools to execute the related statements.
CREATE EXTENSION age;
ALTER DATABASE <dbname> SET search_path = "$user", public, ag_catalog;
ALTER DATABASE <dbname> SET session_preload_libraries TO 'age';
Create the graph, the node labels, and the edge label.
SELECT create_graph('gra'); SELECT create_vlabel('gra', 'label_a'); SELECT create_vlabel('gra', 'label_b'); SELECT create_elabel('gra', 'edge_c');
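You can optionally confirm that the graph and labels are registered before you proceed. The following query is a sketch based on the ag_catalog tables that the auxiliary functions below also rely on:

```sql
-- List the labels registered for the graph 'gra'.
-- kind is 'v' for vertex labels and 'e' for edge labels.
SELECT l.name, l.kind, l.id
FROM ag_catalog.ag_label l
JOIN ag_catalog.ag_graph g ON l.graph = g.graphid
WHERE g.name = 'gra';
```

The id column shown here is the internal label ID that is later shifted into the upper bits of each graph ID.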
Use DTS to synchronize the data to the PolarDB for PostgreSQL database.
Note: Do not write data into the source database during the creation of the synchronization instance. Such data cannot be synchronized to the graph.
After the DTS instance is created, do not modify the data structure of the tables, such as adding or deleting columns. Otherwise, subsequent synchronization tasks may fail.
Use one of the following methods to go to the Data Synchronization page and select the region in which the data synchronization instance resides.
DTS console
Log on to the DTS console.
In the left-side navigation pane, click Data Synchronization.
In the upper-left corner of the page, select the region in which the data synchronization instance resides.
DMS console
Note: The actual operations may vary based on the mode and layout of the DMS console. For more information, see Simple mode and Customize the layout and style of the DMS console.
Log on to the DMS console.
In the top navigation bar, move the pointer over Data + AI and choose .
From the drop-down list to the right of Data Synchronization Tasks, select the region in which the data synchronization instance resides.
Click Create Task to go to the task configuration page.
On the synchronization task creation page, configure the following parameters:
Note: The following content describes only some of the parameters. For more information about the parameters, see parameters.
Source Database
- Database Type: Select MySQL.
- Access Method: Select Express Connect, VPN Gateway, or Smart Access Gateway.
Destination Database
- Database Type: Select PostgreSQL.
- Access Method: Select Express Connect, VPN Gateway, or Smart Access Gateway.
Specify the instance region, VPC CIDR block, cluster endpoint, port, username, and password based on the cluster information.
In the Configure Task Objects step of Configure Objects, select the raw_A, raw_B, and raw_C tables from the TABLE section of the corresponding database. Keep the default settings for subsequent steps and click Next.
Note: For more information about other parameters, see parameters and advanced settings.
Save the task and perform a precheck. When the Success Rate reaches 100%, click Next: Purchase Instance.
On the Purchase page, select the billing method and instance class for the data synchronization instance. After that, view and agree to the Data Transmission Service (Pay-As-You-Go) Service Terms and click Buy and Start. In the pop-up dialog box, click OK.
Note: For information about the purchase page, see Purchase a data synchronization instance.
You can monitor the task progress on the DTS data synchronization page.
Synchronize data to the graph database by using triggers.
In the PolarDB for PostgreSQL cluster, create the following auxiliary functions:
--- Function 1
CREATE OR REPLACE FUNCTION age_name_to_idx_start(graph_name text, kind_name text, label_name text)
RETURNS bigint
AS 'SELECT id::bigint<<48 FROM ag_catalog.ag_label
    WHERE kind = kind_name AND name = label_name
      AND graph = (SELECT graphid FROM ag_catalog.ag_graph WHERE name = graph_name)'
LANGUAGE SQL IMMUTABLE STRICT PARALLEL SAFE;

--- Function 2
CREATE OR REPLACE FUNCTION build_age_triggers_for_vertex(schema_name text, table_name text, table_id_col text, graph_name text, graph_label text)
RETURNS BOOL AS $outer$
DECLARE
    column_names TEXT;
    sql TEXT;
BEGIN
    SELECT string_agg(format('val.%I', column_name), ', ')
    INTO column_names
    FROM information_schema.columns
    WHERE columns.table_schema = build_age_triggers_for_vertex.schema_name
      AND columns.table_name = build_age_triggers_for_vertex.table_name;
    sql := $$
        CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(id bigint)
        RETURNS graphid
        AS 'SELECT (age_name_to_idx_start(''$$ || graph_name || $$'', ''v'', ''$$ || graph_label || $$'') + id)::text::ag_catalog.graphid'
        LANGUAGE SQL;
        CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(val $$ || schema_name || $$.$$ || table_name || $$)
        RETURNS agtype
        AS 'SELECT row_to_json((SELECT x FROM (SELECT $$ || column_names || $$) x))::text::agtype'
        LANGUAGE SQL;
        CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$()
        RETURNS TRIGGER AS $inner$
        BEGIN
            IF TG_OP = 'INSERT' THEN
                INSERT INTO "$$ || graph_name || $$"."$$ || graph_label || $$" (id, properties)
                VALUES (_sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(NEW."$$ || table_id_col || $$"),
                        _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(NEW));
                RETURN NEW;
            ELSIF TG_OP = 'UPDATE' THEN
                UPDATE "$$ || graph_name || $$"."$$ || graph_label || $$"
                SET properties = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(NEW)
                WHERE id = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$");
                RETURN NEW;
            ELSIF TG_OP = 'DELETE' THEN
                DELETE FROM "$$ || graph_name || $$"."$$ || graph_label || $$"
                WHERE id = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$");
                RETURN OLD;
            END IF;
            RETURN NULL;
        END;
        $inner$ LANGUAGE plpgsql;
        CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_insert
            AFTER INSERT ON $$ || schema_name || $$.$$ || table_name || $$
            FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();
        CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_update
            AFTER UPDATE ON $$ || schema_name || $$.$$ || table_name || $$
            FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();
        CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_delete
            AFTER DELETE ON $$ || schema_name || $$.$$ || table_name || $$
            FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();
        ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_insert;
        ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_update;
        ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_delete;
    $$;
    EXECUTE sql;
    RETURN true;
END;
$outer$ LANGUAGE plpgsql;

--- Function 3
CREATE OR REPLACE FUNCTION build_age_triggers_for_edge(schema_name text, table_name text, table_id_col text, start_table_name text, start_id_col text, end_table_name text, end_id_col text, graph_name text, graph_label text)
RETURNS BOOL AS $outer$
DECLARE
    column_names TEXT;
    sql TEXT;
BEGIN
    SELECT string_agg(format('val.%I', column_name), ', ')
    INTO column_names
    FROM information_schema.columns
    WHERE columns.table_schema = build_age_triggers_for_edge.schema_name
      AND columns.table_name = build_age_triggers_for_edge.table_name;
    sql := $$
        CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(id bigint)
        RETURNS graphid
        AS 'SELECT (age_name_to_idx_start(''$$ || graph_name || $$'', ''e'', ''$$ || graph_label || $$'') + id)::text::ag_catalog.graphid'
        LANGUAGE SQL;
        CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(val $$ || schema_name || $$.$$ || table_name || $$)
        RETURNS agtype
        AS 'SELECT row_to_json((SELECT x FROM (SELECT $$ || column_names || $$) x))::text::agtype'
        LANGUAGE SQL;
        CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$()
        RETURNS TRIGGER AS $inner$
        BEGIN
            IF TG_OP = 'INSERT' THEN
                INSERT INTO "$$ || graph_name || $$"."$$ || graph_label || $$" (id, start_id, end_id, properties)
                VALUES (_sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(NEW."$$ || table_id_col || $$"),
                        _sync_$$ || schema_name || $$_$$ || start_table_name || $$_row_to_id(NEW."$$ || start_id_col || $$"),
                        _sync_$$ || schema_name || $$_$$ || end_table_name || $$_row_to_id(NEW."$$ || end_id_col || $$"),
                        _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(NEW));
                RETURN NEW;
            ELSIF TG_OP = 'UPDATE' THEN
                UPDATE "$$ || graph_name || $$"."$$ || graph_label || $$"
                SET start_id = _sync_$$ || schema_name || $$_$$ || start_table_name || $$_row_to_id(NEW."$$ || start_id_col || $$"),
                    end_id = _sync_$$ || schema_name || $$_$$ || end_table_name || $$_row_to_id(NEW."$$ || end_id_col || $$"),
                    properties = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(NEW)
                WHERE id = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$");
                RETURN NEW;
            ELSIF TG_OP = 'DELETE' THEN
                DELETE FROM "$$ || graph_name || $$"."$$ || graph_label || $$"
                WHERE id = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$");
                RETURN OLD;
            END IF;
            RETURN NULL;
        END;
        $inner$ LANGUAGE plpgsql;
        CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_insert
            AFTER INSERT ON $$ || schema_name || $$.$$ || table_name || $$
            FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();
        CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_update
            AFTER UPDATE ON $$ || schema_name || $$.$$ || table_name || $$
            FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();
        CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_delete
            AFTER DELETE ON $$ || schema_name || $$.$$ || table_name || $$
            FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();
        ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_insert;
        ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_update;
        ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_delete;
    $$;
    EXECUTE sql;
    RETURN true;
END;
$outer$ LANGUAGE plpgsql;
Execute the auxiliary functions to create the triggers that synchronize data from the tables to the graph.
Note: The content is case-sensitive. Use lowercase letters.
Replace your_schema_name with the actual schema in which raw_a and the other data tables reside. Execute the \d+ <table_name> command in the psql client to view the schema. It is typically the same as the original schema.
select build_age_triggers_for_vertex('your_schema_name', 'raw_a', 'id', 'gra', 'label_a');
select build_age_triggers_for_vertex('your_schema_name', 'raw_b', 'id', 'gra', 'label_b');
select build_age_triggers_for_edge('your_schema_name', 'raw_c', 'id', 'raw_a', 'id_a', 'raw_b', 'id_b', 'gra', 'edge_c');
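To confirm that the triggers were created and are set to ENABLE ALWAYS, you can query the system catalog. This is a sketch against the standard pg_trigger catalog; replace your_schema_name with the actual schema:

```sql
-- List the synchronization triggers created on raw_a.
-- tgenabled = 'A' corresponds to ENABLE ALWAYS TRIGGER.
SELECT tgname, tgenabled
FROM pg_trigger
WHERE tgrelid = 'your_schema_name.raw_a'::regclass
  AND tgname ~ '^_sync_';
```

Three rows are expected per table, one each for the insert, update, and delete triggers.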
Note: These triggers synchronize only incremental data. To synchronize existing data in the tables, execute the following statements after you create the triggers. The helper functions follow the _sync_<schema_name>_<table_name>_row_to_id and _sync_<schema_name>_<table_name>_row_to_properties naming pattern; replace your_schema_name with the actual schema.
INSERT INTO "gra"."label_a" (id, properties)
SELECT _sync_your_schema_name_raw_a_row_to_id(raw_a.id), _sync_your_schema_name_raw_a_row_to_properties(raw_a) FROM raw_a;
INSERT INTO "gra"."label_b" (id, properties)
SELECT _sync_your_schema_name_raw_b_row_to_id(raw_b.id), _sync_your_schema_name_raw_b_row_to_properties(raw_b) FROM raw_b;
INSERT INTO "gra"."edge_c" (id, start_id, end_id, properties)
SELECT _sync_your_schema_name_raw_c_row_to_id(raw_c.id), _sync_your_schema_name_raw_a_row_to_id(raw_c.id_a), _sync_your_schema_name_raw_b_row_to_id(raw_c.id_b), _sync_your_schema_name_raw_c_row_to_properties(raw_c) FROM raw_c;
Verify data synchronization.
Data insertion
In the PolarDB for MySQL cluster, insert test data into the tables for synchronization.
INSERT INTO raw_a values(1,1,1,'2000-01-01');
INSERT INTO raw_b values(1,1,1,1,'2000-01-01');
INSERT INTO raw_c values(1,1,1);
In the PolarDB for PostgreSQL cluster, query the graph by using Cypher to verify whether the data insertion is synchronized:
SELECT * FROM cypher('gra', $$ MATCH (v) RETURN v $$) as (v agtype);
Sample result:
------
{"id": 844424930131969, "label": "label_a", "properties": {"id": 1, "desc": "1", "name": "1", "time_created": "2000-01-01T00:00:00"}}::vertex
{"id": 1125899906842625, "label": "label_b", "properties": {"id": 1, "desc": "1", "name": "1", "value": 1, "time_created": "2000-01-01T00:00:00"}}::vertex
SELECT * FROM cypher('gra', $$ MATCH (v)-[e]->(v2) RETURN e $$) as (e agtype);
Sample result:
------
{"id": 1407374883553281, "label": "edge_c", "end_id": 1125899906842625, "start_id": 844424930131969, "properties": {"id": 1, "id_a": 1, "id_b": 1}}::edge
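The id values in these results encode both the label and the record. As in the age_name_to_idx_start function, a graph ID is the internal label ID shifted left by 48 bits plus the record ID, which is why record IDs must be smaller than 2^48. As a worked example, assuming label_a was assigned internal label ID 3 (the actual assignment depends on creation order):

```sql
-- 3 << 48 = 844424930131968; adding record id 1 yields 844424930131969,
-- the vertex id shown in the sample result above.
SELECT (3::bigint << 48) + 1;
```

The same arithmetic explains the other sample IDs, which differ only in the label ID stored in the upper bits.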
Property modification
In the PolarDB for MySQL cluster, modify the properties of test data in the table for synchronization.
UPDATE raw_a SET name = '2' WHERE id = 1;
In the PolarDB for PostgreSQL cluster, query the graph by using Cypher to verify whether the property modification is synchronized:
SELECT * FROM cypher('gra', $$ MATCH (v:label_a {id:1}) RETURN v $$) as (v agtype);
Sample result:
-----
{"id": 844424930131969, "label": "label_a", "properties": {"id": 1, "desc": "1", "name": "2", "time_created": "2000-01-01T00:00:00"}}::vertex
Data deletion
In the PolarDB for MySQL cluster, delete test data from the table for synchronization.
DELETE FROM raw_c WHERE id = 1;
In the PolarDB for PostgreSQL cluster, query the graph by using Cypher to verify whether the data deletion is synchronized:
SELECT * FROM cypher('gra', $$ MATCH (v)-[e]->(v2) RETURN e $$) as (e agtype);
An empty result indicates that the data deletion is synchronized.
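You can also check the overall state of the graph with an aggregate query. For example, the following counts the remaining nodes; in this walkthrough only the edge was deleted, so the two nodes inserted earlier remain:

```sql
SELECT * FROM cypher('gra', $$ MATCH (v) RETURN count(v) $$) as (cnt agtype);
```
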
Usage notes
Do not write data into the database during the creation of the synchronization instance. Such data cannot be synchronized to the graph.
After the DTS instance is created, do not modify the data structure of the tables, such as adding or deleting columns. Otherwise, subsequent synchronization tasks may fail.
The preceding auxiliary functions automatically add all columns as properties to the graph. If you want to customize the properties added to the graph, you can modify the definition of the _sync_<table_name>_row_to_properties function, which typically looks like the following sample code. To change the columns to be added or to modify the column values, modify the select val.id, val.id_a, val.id_b section. For example, to merge the id_a and id_b columns, change the statement to select val.id_a::text || val.id_b::text AS id.
CREATE OR REPLACE FUNCTION _sync_raw_C_row_to_properties(val raw_C)
RETURNS agtype
AS 'SELECT row_to_json((select x FROM (select val.id, val.id_a, val.id_b) x))::text::agtype'
LANGUAGE SQL;
Before you insert an edge, make sure that the start and end nodes of the edge are already inserted. Otherwise, the graph database cannot find the corresponding nodes and reports an error.
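A minimal sketch of the required insertion order in the source database, using the example tables from this topic (the values are illustrative):

```sql
-- Insert both endpoint nodes first...
INSERT INTO raw_a VALUES (2, 'a2', 'node', '2000-01-02');
INSERT INTO raw_b VALUES (2, 'b2', 'node', 2, '2000-01-02');
-- ...and only then the edge that references them by id_a and id_b.
INSERT INTO raw_c VALUES (2, 2, 2);
```

Reversing this order would make the edge trigger fire before the node records exist in the graph, which causes the error described above.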