
PolarDB: Graph analysis based on PolarDB: Synchronize data tables from other databases to PolarDB graphs by using DTS

Last Updated:Jun 04, 2025

To query graph data in graph databases, you can first write data into other databases, and then synchronize the data to graph databases. This topic describes the process of creating a Data Transmission Service (DTS) task to transfer data from a PolarDB for MySQL database to a PolarDB for PostgreSQL graph database.

Prerequisites

Version requirements

Your PolarDB for PostgreSQL cluster runs one of the following database engine versions:

  • PolarDB for PostgreSQL 16 with revision version 2.0.16.8.3.0 or later

  • PolarDB for PostgreSQL 15 with revision version 2.0.15.12.4.0 or later

  • PolarDB for PostgreSQL 14 with revision version 2.0.14.12.24.0 or later

Note

You can check the revision version of your cluster in the PolarDB console or by executing the SHOW polardb_version; statement. If the revision version does not meet the requirements, update it.

Data format

  • A node or edge record must have an ID value that is unique among records of the same type (label). The ID value must be smaller than 2^48. In addition to the edge ID, an edge record must also contain two columns that indicate the IDs of its start node and end node.

  • For a node or edge table without a unique ID column, or with a non-integer unique ID, add a column of the serial type to generate unique IDs. Without unique IDs, data changes and deletions cannot be synchronized. The IDs that are automatically generated by the serial column can be excluded when you import data into the graph.
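    For example, a source table that lacks an integer unique ID could be given a generated ID column as follows. This is a sketch for the source MySQL database; the table name raw_d and the column name sync_id are hypothetical.

    ```sql
    -- In MySQL, SERIAL is shorthand for BIGINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE,
    -- so each row receives a unique integer ID automatically.
    ALTER TABLE raw_d ADD COLUMN sync_id SERIAL;
    ```

    The generated sync_id column is then used only for synchronization and does not need to be imported into the graph as a property.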

Synchronization process

(Figure: the data synchronization process)

Best practice

Preparation

  • Create a PolarDB for MySQL cluster as the data source. For more information, see Purchase a cluster.

  • Create a PolarDB for PostgreSQL cluster. For more information, see Create a cluster.

  • Make sure that the PolarDB for MySQL cluster and the PolarDB for PostgreSQL cluster are in the same region, zone, and VPC.

Procedure

You can insert data into the PolarDB for MySQL cluster and synchronize the data to the PolarDB for PostgreSQL graph database by using DTS.

  1. Prepare basic data.

    In this example, three tables are created and synchronized to the graph database: two node tables, A and B, each of which stores nodes with unique IDs, and an edge table C, which stores the IDs of start nodes in A and end nodes in B. A graph that consists of nodes of the A and B types and edges of the C type is created in PolarDB for PostgreSQL.

    • In the PolarDB for MySQL cluster:

      1. Create table definitions.

        CREATE TABLE raw_A(id integer, name text, `desc` text, time_created timestamp);
        CREATE TABLE raw_B(id integer, name text, `desc` text, `value` integer, time_created timestamp);
        CREATE TABLE raw_C(id integer, id_a integer, id_b integer);
      2. Before using DTS, you must enable the binary logging feature for the PolarDB for MySQL cluster. For more information, see Enable binary logging.

    • In the PolarDB for PostgreSQL cluster:

      1. Use a privileged account to create and load extensions into the destination database. For more information about how to create a privileged account, see Create a database account.

        Note

        Compatibility issues may occur when you use Data Management (DMS) to configure the search_path. In such cases, you can use PolarDB-Tools to execute related statements.

        CREATE EXTENSION age;
        ALTER DATABASE <dbname> SET search_path = "$user", public, ag_catalog;
        ALTER DATABASE <dbname> SET session_preload_libraries TO 'age';
      2. Create the graph, nodes, and edge.

        SELECT create_graph('gra');
        SELECT create_vlabel('gra', 'label_a');
        SELECT create_vlabel('gra', 'label_b');
        SELECT create_elabel('gra', 'edge_c');
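      To confirm that the graph and its labels were created, you can query the catalog tables of the AGE extension. The following query is a sketch; it also shows the label ID that AGE assigned to each label, which determines the high bits of the graph IDs discussed later.

      ```sql
      -- List the labels of graph 'gra' together with their kinds ('v' for vertex,
      -- 'e' for edge) and their numeric label IDs.
      SELECT name, kind, id
      FROM ag_catalog.ag_label
      WHERE graph = (SELECT graphid FROM ag_catalog.ag_graph WHERE name = 'gra');
      ```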
  2. Use DTS to synchronize the data to the PolarDB for PostgreSQL database.

    Note
    • Do not write data into the source database during the creation of the synchronization instance. Such data cannot be synchronized to the graph.

    • After the DTS instance is created, do not modify the data structure of the tables, such as adding or deleting columns. Otherwise, subsequent synchronization tasks may fail.

    1. Use one of the following methods to go to the Data Synchronization page and select the region in which the data synchronization instance resides.

      DTS console

      1. Log on to the DTS console.

      2. In the left-side navigation pane, click Data Synchronization.

      3. In the upper-left corner of the page, select the region in which the data synchronization instance resides.

      DMS console

      Note

      The actual operations may vary based on the mode and layout of the DMS console. For more information, see Simple mode and Customize the layout and style of the DMS console.

      1. Log on to the DMS console.

      2. In the top navigation bar, move the pointer over Data + AI and choose DTS (DTS) > Data Synchronization.

      3. From the drop-down list to the right of Data Synchronization Tasks, select the region in which the data synchronization instance resides.

    2. Click Create Task to go to the task configuration page.

    3. On the synchronization task creation page, configure the following parameters:

      Note

      The following table lists only some of the parameters. For more information, see Parameters.

      Section | Parameter | Description
      Source Database | Database Type | Select MySQL.
      Source Database | Access Method | Select Express Connect, VPN Gateway, or Smart Access Gateway.
      Destination Database | Database Type | Select PostgreSQL.
      Destination Database | Access Method | Select Express Connect, VPN Gateway, or Smart Access Gateway.

      Specify the instance region, VPC CIDR block, cluster endpoint, port, username, and password based on the cluster information.

    4. In the Configure Task Objects step of Configure Objects, select the raw_A, raw_B, and raw_C tables from the TABLE section of the corresponding database. Keep the default settings for subsequent steps and click Next.

      Note

      For more information about other parameters, see parameters and advanced settings.

    5. Save the task and run a precheck. After the success rate reaches 100%, click Next: Purchase Instance.

    6. On the Purchase page, select the billing method and instance class for the data synchronization instance. Then, read and agree to the Data Transmission Service (Pay-As-You-Go) Service Terms and click Buy and Start. In the dialog box that appears, click OK.

  3. Synchronize data to the graph database by using triggers.

    1. In the PolarDB for PostgreSQL cluster, create the following auxiliary functions:

      --- Function 1
      CREATE OR REPLACE FUNCTION age_name_to_idx_start(graph_name text, kind_name text, label_name text)
      RETURNS bigint
      AS 'SELECT id::bigint<<48 FROM ag_catalog.ag_label WHERE kind = kind_name and name = label_name and graph = (SELECT graphid FROM ag_catalog.ag_graph WHERE name = graph_name)'
      language SQL IMMUTABLE STRICT PARALLEL SAFE;
      
      
      
      --- Function 2
      CREATE OR REPLACE FUNCTION build_age_triggers_for_vertex(schema_name text, table_name text, table_id_col text, graph_name text, graph_label text)
      RETURNS BOOL
      AS
      $outer$
      DECLARE
        column_names TEXT;
        sql TEXT;
      BEGIN
        SELECT string_agg(format('val.%I', column_name), ', ')
          INTO column_names
          FROM information_schema.columns
          WHERE columns.table_schema = build_age_triggers_for_vertex.schema_name AND columns.table_name = build_age_triggers_for_vertex.table_name;
        sql := $$
      
      CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(id bigint)
      RETURNS graphid
      AS 'SELECT (age_name_to_idx_start(''$$ || graph_name || $$'', ''v'', ''$$ || graph_label|| $$'') + id)::text::ag_catalog.graphid'
      LANGUAGE SQL;
      
      CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(val $$ || schema_name || $$.$$ || table_name || $$)
      RETURNS agtype
      AS 'SELECT row_to_json((select x FROM (select $$|| column_names || $$) x))::text::agtype'
      LANGUAGE SQL;
      
      CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$() RETURNS TRIGGER AS
      $inner$
      BEGIN
        IF TG_OP = 'INSERT' THEN
          INSERT INTO "$$ || graph_name || $$"."$$ || graph_label || $$" (id, properties) VALUES (_sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(NEW."$$ || table_id_col || $$"), _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(NEW));
          RETURN NEW;
        ELSIF TG_OP = 'UPDATE' THEN
          UPDATE "$$ || graph_name || $$"."$$ || graph_label || $$" SET properties = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(NEW) WHERE id = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$");
          RETURN NEW;
        ELSIF TG_OP = 'DELETE' THEN
          DELETE FROM "$$ || graph_name || $$"."$$ || graph_label || $$" WHERE id = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$");
          RETURN OLD;
        END IF;
        RETURN NULL;
      END;
      $inner$ LANGUAGE plpgsql;
      
      CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_insert
      AFTER INSERT ON $$ || schema_name || $$.$$ || table_name || $$
      FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();
      
      CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_update
      AFTER UPDATE ON $$ || schema_name || $$.$$ || table_name || $$
      FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();
      
      CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_delete
      AFTER DELETE ON $$ || schema_name || $$.$$ || table_name || $$
      FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();
      
      ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_insert;
      ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_update;
      ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_delete;
        $$;
        EXECUTE sql;
        RETURN true;
      END;
      $outer$
      LANGUAGE plpgsql;
      
      
      
      --- Function 3
      CREATE OR REPLACE FUNCTION build_age_triggers_for_edge(schema_name text, table_name text, table_id_col text, start_table_name text, start_id_col text, end_table_name text, end_id_col text, graph_name text, graph_label text)
      RETURNS BOOL
      AS
      $outer$
      DECLARE
        column_names TEXT;
        sql TEXT;
      BEGIN
        SELECT string_agg(format('val.%I', column_name), ', ')
          INTO column_names
          FROM information_schema.columns
          WHERE columns.table_schema = build_age_triggers_for_edge.schema_name AND columns.table_name = build_age_triggers_for_edge.table_name;
        sql := $$
      
      CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(id bigint)
      RETURNS graphid
      AS 'SELECT (age_name_to_idx_start(''$$ || graph_name || $$'', ''e'', ''$$ || graph_label|| $$'') + id)::text::ag_catalog.graphid'
      LANGUAGE SQL;
      
      CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(val $$ || schema_name || $$.$$ || table_name || $$)
      RETURNS agtype
      AS 'SELECT row_to_json((select x FROM (select $$|| column_names || $$) x))::text::agtype'
      LANGUAGE SQL;
      
      CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$() RETURNS TRIGGER AS
      $inner$
      BEGIN
        IF TG_OP = 'INSERT' THEN
          INSERT INTO "$$ || graph_name || $$"."$$ || graph_label || $$" (id, start_id, end_id, properties) VALUES (_sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(NEW."$$ || table_id_col || $$"), _sync_$$ || schema_name || $$_$$ || start_table_name || $$_row_to_id(NEW."$$ || start_id_col || $$"), _sync_$$ || schema_name || $$_$$ || end_table_name || $$_row_to_id(NEW."$$ || end_id_col || $$"), _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(NEW));
          RETURN NEW;
        ELSIF TG_OP = 'UPDATE' THEN
          UPDATE "$$ || graph_name || $$"."$$ || graph_label || $$" SET start_id = _sync_$$ || schema_name || $$_$$ || start_table_name || $$_row_to_id(NEW."$$ || start_id_col || $$"), end_id = _sync_$$ || schema_name || $$_$$ || end_table_name || $$_row_to_id(NEW."$$ || end_id_col || $$"), properties = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(NEW) WHERE id = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$");
          RETURN NEW;
        ELSIF TG_OP = 'DELETE' THEN
          DELETE FROM "$$ || graph_name || $$"."$$ || graph_label || $$" WHERE id = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$");
          RETURN OLD;
        END IF;
        RETURN NULL;
      END;
      $inner$ LANGUAGE plpgsql;
      
      CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_insert
      AFTER INSERT ON $$ || schema_name || $$.$$ || table_name || $$
      FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();
      
      CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_update
      AFTER UPDATE ON $$ || schema_name || $$.$$ || table_name || $$
      FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();
      
      CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_delete
      AFTER DELETE ON $$ || schema_name || $$.$$ || table_name || $$
      FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();
      
      ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_insert;
      ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_update;
      ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_delete;
        $$;
      
        EXECUTE sql;
        RETURN true;
      END;
      $outer$
      LANGUAGE plpgsql;
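      The age_name_to_idx_start function computes the base of the graph ID space for a label: AGE encodes a graphid as the numeric label ID shifted left by 48 bits, plus the entry ID in the low 48 bits. This is why the Data format section requires ID values smaller than 2^48. You can inspect the mapping directly with a query like the following sketch. Note that the numeric label IDs depend on the order in which labels were created; in the sample results later in this topic, label_a was assigned label ID 3, so its node with ID 1 maps to 3 << 48 + 1 = 844424930131969.

      ```sql
      -- Base graphid for vertices with label 'label_a' in graph 'gra' (label_id << 48).
      SELECT age_name_to_idx_start('gra', 'v', 'label_a');

      -- graphid of the node whose table ID is 1: base + 1.
      SELECT age_name_to_idx_start('gra', 'v', 'label_a') + 1;
      ```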
    2. Execute the auxiliary functions to create triggers for the synchronization from the tables to the graph.

      Note
      • The arguments are case-sensitive. Use lowercase table names, because PostgreSQL stores unquoted identifiers in lowercase.

      • Replace your_schema_name with the actual schema in which raw_a and the other data tables reside. You can execute the \d+ <table_name> command in the psql client to view the schema. It is typically the same as the name of the source database.

      select build_age_triggers_for_vertex('your_schema_name','raw_a', 'id', 'gra', 'label_a');
      select build_age_triggers_for_vertex('your_schema_name','raw_b', 'id', 'gra', 'label_b');
      select build_age_triggers_for_edge('your_schema_name','raw_c', 'id', 'raw_a', 'id_a', 'raw_b', 'id_b', 'gra', 'edge_c');
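      To verify that the triggers were created, you can list them from the PostgreSQL system catalog. The following query is a sketch that matches the _sync_ naming pattern used by the auxiliary functions above.

      ```sql
      -- List the synchronization triggers and the tables they are attached to.
      -- The backslashes escape the underscores so that they match literally in LIKE.
      SELECT tgname, tgrelid::regclass AS table_name
      FROM pg_trigger
      WHERE tgname LIKE '\_sync\_%' AND NOT tgisinternal;
      ```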
    Note

    These triggers synchronize only incremental data. To synchronize existing data in the tables, execute the following statements after you create the triggers. The function names follow the _sync_<schema_name>_<table_name>_row_to_id and _sync_<schema_name>_<table_name>_row_to_properties naming pattern generated by the auxiliary functions; replace your_schema_name with the actual schema name, as in the preceding step:

    INSERT INTO "gra"."label_a" (id, properties) SELECT _sync_your_schema_name_raw_a_row_to_id(raw_a.id), _sync_your_schema_name_raw_a_row_to_properties(raw_a) FROM raw_a;
    INSERT INTO "gra"."label_b" (id, properties) SELECT _sync_your_schema_name_raw_b_row_to_id(raw_b.id), _sync_your_schema_name_raw_b_row_to_properties(raw_b) FROM raw_b;
    INSERT INTO "gra"."edge_c" (id, start_id, end_id, properties) SELECT _sync_your_schema_name_raw_c_row_to_id(raw_c.id), _sync_your_schema_name_raw_a_row_to_id(raw_c.id_a), _sync_your_schema_name_raw_b_row_to_id(raw_c.id_b), _sync_your_schema_name_raw_c_row_to_properties(raw_c) FROM raw_c;
  4. Verify data synchronization.

    Data insertion

    1. In the PolarDB for MySQL cluster, insert test data into the tables for synchronization.

      INSERT INTO raw_A VALUES(1,'1','1','2000-01-01');
      INSERT INTO raw_B VALUES(1,'1','1',1,'2000-01-01');
      INSERT INTO raw_C VALUES(1,1,1);
    2. In the PolarDB for PostgreSQL cluster, query the graph by using Cypher to verify whether the data insertion is synchronized:

      • SELECT * FROM cypher('gra', $$
        MATCH (v)
        RETURN v
        $$) as (v agtype);

        Sample result:

        ------
         {"id": 844424930131969, "label": "label_a", "properties": {"id": 1, "desc": "1", "name": "1", "time_created": "2000-01-01T00:00:00"}}::vertex
         {"id": 1125899906842625, "label": "label_b", "properties": {"id": 1, "desc": "1", "name": "1", "value": 1, "time_created": "2000-01-01T00:00:00"}}::vertex
      • SELECT * FROM cypher('gra', $$
        MATCH (v)-[e]->(v2)
        RETURN e
        $$) as (e agtype);

        Sample result:

        ------
         {"id": 1407374883553281, "label": "edge_c", "end_id": 1125899906842625, "start_id": 844424930131969, "properties": {"id": 1, "id_a": 1, "id_b": 1}}::edge

    Property modification

    1. In the PolarDB for MySQL cluster, modify the properties of test data in the table for synchronization.

      UPDATE raw_a SET name = '2' WHERE id = 1;
    2. In the PolarDB for PostgreSQL cluster, query the graph by using Cypher to verify whether the property modification is synchronized:

      SELECT * FROM cypher('gra', $$
      MATCH (v:label_a {id:1})
      RETURN v
      $$) as (v agtype);

      Sample result:

      -----
       {"id": 844424930131969, "label": "label_a", "properties": {"id": 1, "desc": "1", "name": "2", "time_created": "2000-01-01T00:00:00"}}::vertex

    Data deletion

    1. In the PolarDB for MySQL cluster, delete test data from the table for synchronization.

      DELETE FROM raw_c WHERE id = 1;
    2. In the PolarDB for PostgreSQL cluster, query the graph by using Cypher to verify whether the data deletion is synchronized:

      SELECT * FROM cypher('gra', $$
      MATCH (v)-[e]->(v2)
      RETURN e
      $$) as (e agtype);

      An empty result indicates that the data deletion is synchronized.

Usage notes

  • Do not write data into the database during the creation of the synchronization instance. Such data cannot be synchronized to the graph.

  • After the DTS instance is created, do not modify the data structure of the tables, such as adding or deleting columns. Otherwise, subsequent synchronization tasks may fail.

  • The preceding auxiliary functions add all columns of a table as properties to the graph. To customize the properties, modify the definition of the generated _sync_<schema_name>_<table_name>_row_to_properties function, which typically looks like the following sample code. To change the columns that are added or to modify the column values, edit the select val.id, val.id_a, val.id_b part. For example, to merge the columns id_a and id_b, change it to select val.id_a::text || val.id_b::text AS id.

    CREATE OR REPLACE FUNCTION _sync_your_schema_name_raw_c_row_to_properties(val your_schema_name.raw_c)
    RETURNS agtype
    AS 'SELECT row_to_json((select x FROM (select val.id, val.id_a, val.id_b) x))::text::agtype'
    LANGUAGE SQL;
  • Before you insert an edge, make sure that the start and end nodes of the edge are already inserted. Otherwise, the graph database cannot find the corresponding nodes and an error is reported.
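    Before inserting edges in bulk, you could check the source tables for edges whose endpoints are missing. The following query is a sketch against the example tables in this topic; adapt the table and column names to your schema.

    ```sql
    -- Edges in raw_C whose start node (in raw_A) or end node (in raw_B) does not exist.
    -- Any rows returned here would cause errors when synchronized to the graph.
    SELECT c.id
    FROM raw_C c
    LEFT JOIN raw_A a ON a.id = c.id_a
    LEFT JOIN raw_B b ON b.id = c.id_b
    WHERE a.id IS NULL OR b.id IS NULL;
    ```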