すべてのプロダクト
Search
ドキュメントセンター

PolarDB:PolarDB に基づくグラフ分析: DTS を使用して他のデータベースから PolarDB グラフにデータテーブルを同期する

最終更新日:Jun 05, 2025

グラフデータベースのグラフデータをクエリするには、最初に他のデータベースにデータを書き込み、次にデータをグラフデータベースに同期します。 このトピックでは、PolarDB for MySQL データベースから PolarDB for PostgreSQL グラフデータベースにデータを転送する Data Transmission Service (DTS) タスクを作成するプロセスについて説明します。

前提条件

バージョン要件

PolarDB for PostgreSQL クラスタは、次のデータベースエンジンバージョンのいずれかを実行しています。

  • PolarDB for PostgreSQL 16 リビジョンバージョン 2.0.16.8.3.0 以後

  • PolarDB for PostgreSQL 15 リビジョンバージョン 2.0.15.12.4.0 以後

  • PolarDB for PostgreSQL 14 リビジョンバージョン 2.0.14.12.24.0 以後

説明

PolarDB コンソールで、または SHOW polardb_version; 文を実行することで、クラスタのリビジョンバージョンを確認できます。 リビジョンバージョンが要件を満たしていない場合は、更新してください。

データ形式

  • ノードまたはエッジレコードには、同じタイプ(ラベル)のレコードの中で一意の ID 値が含まれている必要があります。 ID 値は 2^48 より小さい必要があります。 エッジレコードの場合、エッジ ID に加えて、開始ノードと終了ノードの ID を示す 2 つの列も含まれている必要があります。

  • 一意の ID がないノードまたはエッジ、または整数ではない一意の ID を持つノードまたはエッジの場合は、serial タイプの列を追加して、一意の ID を生成します。 一意の ID がないと、データの変更と削除を同期できません。 ID は serial 列を使用して自動的に生成され、グラフにデータをインポートするときに除外できます。

同期プロセス

ベストプラクティス

準備

  • データソースとして PolarDB for MySQL クラスタを作成します。 詳細については、「クラスタを購入する」をご参照ください。

  • PolarDB for PostgreSQL クラスタを作成します。 詳細については、「クラスタを作成する」をご参照ください。

  • PolarDB for MySQL クラスタと PolarDB for PostgreSQL クラスタは、同じリージョン、ゾーン、および VPC にあります。

手順

PolarDB for MySQL クラスタにデータを挿入し、DTS を使用して PolarDB for PostgreSQL グラフデータベースにデータを同期できます。

  1. 基本データを準備します。

    この例では、3 つのテーブルが作成され、グラフデータベースにインポートされます。2 つのノードテーブル A と B はそれぞれ一意の ID を持つグラフのノードを格納し、エッジテーブル C は A の開始ノードと B の終了ノードの ID を格納します。 PolarDB for PostgreSQL には、A タイプと B タイプのノード、および C タイプのエッジで構成されるグラフが作成されます。

    • PolarDB for MySQL クラスタの場合:

      1. テーブル定義を作成します。

        CREATE TABLE raw_A(id integer, name text, `desc` text, time_created timestamp);
        CREATE TABLE raw_B(id integer, name text, `desc` text, `value` integer, time_created timestamp);
        CREATE TABLE raw_C(id integer, id_a integer, id_b integer);/* raw_C テーブルを作成 */
      2. DTS を使用する前に、PolarDB for MySQL クラスタのバイナリロギング機能を有効にする必要があります。 詳細については、「バイナリロギングを有効にする」をご参照ください。

    • PolarDB for PostgreSQL クラスタの場合:

      1. 特権アカウントを使用して、ターゲットデータベースに拡張機能を作成してロードします。 特権アカウントの作成方法の詳細については、「データベースアカウントを作成する」をご参照ください。

        説明

        データ管理 (DMS) を使用して search_path を構成すると、互換性の問題が発生する可能性があります。 そのような場合は、PolarDB-Tools を使用して関連する文を実行できます。

        CREATE EXTENSION age;/* age 拡張機能を作成 */
        ALTER DATABASE <dbname> SET search_path = "$user", public, ag_catalog;/* データベースの検索パスを設定 */
        ALTER DATABASE <dbname> SET session_preload_libraries TO 'age';/* データベースのセッションプリロードライブラリを設定 */
      2. グラフ、ノード、およびエッジを作成します。

        SELECT create_graph('gra');/* グラフを作成 */
        SELECT create_vlabel('gra', 'label_a');/* ノードラベルを作成 */
        SELECT create_vlabel('gra', 'label_b');/* ノードラベルを作成 */
        SELECT create_elabel('gra', 'edge_c');/* エッジラベルを作成 */
  2. DTS を使用して PolarDB for PostgreSQL データベースにデータを同期します。

    説明
    • 同期インスタンスの作成中にデータベースにデータを書き込まないでください。 そのようなデータはグラフに同期できません。

    • DTS インスタンスの作成後、列の追加や削除など、テーブルのデータ構造を変更しないでください。 そうしないと、後続の同期タスクが失敗する可能性があります。

    1. 次のいずれかの方法を使用して データ同期 ページに移動し、データ同期インスタンスが存在するリージョンを選択します。

      DTS コンソール

      1. DTS コンソール にログインします。

      2. 左側のナビゲーションウィンドウで、データ同期 をクリックします。

      3. ページの左上隅で、データ同期インスタンスが存在するリージョンを選択します。

      DMS コンソール

      説明

      実際の操作は、DMS コンソールのモードとレイアウトによって異なる場合があります。 詳細については、「シンプルモード」および「DMS コンソールのレイアウトとスタイルをカスタマイズする」をご参照ください。

      1. DMS コンソール にログインします。

      2. 上部のナビゲーションバーで、[データ + AI] にポインタを移動し、[DTS (DTS)] > [データ同期] を選択します。

      3. データ同期タスク の右側にあるドロップダウンリストから、データ同期インスタンスが存在するリージョンを選択します。

    2. タスクの作成 をクリックして、タスク構成ページに移動します。

    3. 同期タスク作成ページで、次のパラメータを構成します。

      説明

      次の表に、パラメータの一部を示します。 パラメータの詳細については、「パラメータ」をご参照ください。

      セクション

      パラメータ

      説明

      ソースデータベース

      データベースタイプ

      [MySQL] を選択します。

      アクセス方法

      [Express Connect、VPN Gateway、または Smart Access Gateway] を選択します。

      宛先データベース

      データベースタイプ

      [PostgreSQL] を選択します。

      アクセス方法

      [Express Connect、VPN Gateway、または Smart Access Gateway] を選択します。

      クラスタ情報に基づいて、インスタンスリージョン、VPC CIDR ブロック、クラスターエンドポイント、ポート、ユーザー名、およびパスワードを指定します。

    4. [オブジェクトの構成][タスクオブジェクトの構成] ステップで、対応するデータベースの TABLE セクションから raw_A、raw_B、および raw_C テーブルを選択します。 後続のステップではデフォルト設定を維持し、[次へ] をクリックします。

      説明

      その他のパラメータの詳細については、「パラメータ」および「詳細設定」をご参照ください。

    5. タスクを保存し、事前チェックを実行します。 [成功率][100%] と表示されたら、[次へ: インスタンスの購入] をクリックします。

    6. [購入] ページで、データ同期インスタンスの課金方法とインスタンスクラスを選択します。 その後、[Data Transmission Service (従量課金制) サービス規約] を確認して同意し、[購入して開始] をクリックします。 ポップアップ [ダイアログボックス] で、[OK] をクリックします。

      説明
  3. トリガーを使用してグラフデータベースにデータを同期します。

    1. PolarDB for PostgreSQL クラスタで、次の補助関数を作成します。

      --- 関数 1
      CREATE OR REPLACE FUNCTION age_name_to_idx_start(graph_name text, kind_name text, label_name text)/* age_name_to_idx_start 関数を作成または置換 */
      RETURNS bigint
      AS 'SELECT id::bigint<<48 FROM ag_catalog.ag_label WHERE kind = kind_name and name = label_name and graph = (SELECT graphid FROM ag_catalog.ag_graph WHERE name = graph_name)'/* SQL 文 */
      language SQL IMMUTABLE STRICT PARALLEL SAFE;
      
      
      
      --- 関数 2
      CREATE OR REPLACE FUNCTION build_age_triggers_for_vertex(schema_name text, table_name text, table_id_col text, graph_name text, graph_label text)/* build_age_triggers_for_vertex 関数を作成または置換 */
      RETURNS BOOL
      AS
      $outer$
      DECLARE/* 変数を宣言 */
        column_names TEXT;
        sql TEXT;
      BEGIN
        SELECT string_agg(format('val.%I', column_name), ', ')/* 列名を連結 */
          INTO column_names
          FROM information_schema.columns
          WHERE columns.table_schema = build_age_triggers_for_vertex.schema_name AND columns.table_name = build_age_triggers_for_vertex.table_name;
        sql := $$/* SQL 文を定義 */
      
      CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(id bigint)/* _sync_[スキーマ名]_[テーブル名]_row_to_id 関数を作成または置換 */
      RETURNS graphid
      AS 'SELECT (age_name_to_idx_start(''$$ || graph_name || $$'', ''v'', ''$$ || graph_label|| $$'') + id)::text::ag_catalog.graphid'/* SQL 文 */
      LANGUAGE SQL;
      
      CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(val $$ || schema_name || $$.$$ || table_name || $$)/* _sync_[スキーマ名]_[テーブル名]_row_to_properties 関数を作成または置換 */
      RETURNS agtype
      AS 'SELECT row_to_json((select x FROM (select $$|| column_names || $$) x))::text::agtype'/* SQL 文 */
      LANGUAGE SQL;
      
      CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$() RETURNS TRIGGER AS/* _sync_[スキーマ名]_[テーブル名] 関数を作成または置換 */
      $inner$
      BEGIN
        IF TG_OP = 'INSERT' THEN/* 挿入操作の場合 */
          INSERT INTO "$$ || graph_name || $$"."$$ || graph_label || $$" (id, properties) VALUES (_sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(NEW."$$ || table_id_col || $$"), _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(NEW));/* グラフにデータを挿入 */
          RETURN NEW;
        ELSIF TG_OP = 'UPDATE' THEN/* 更新操作の場合 */
          UPDATE "$$ || graph_name || $$"."$$ || graph_label || $$" SET properties = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(NEW) WHERE id = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$");/* グラフのデータを更新 */
          RETURN NEW;
        ELSIF TG_OP = 'DELETE' THEN/* 削除操作の場合 */
          DELETE FROM "$$ || graph_name || $$"."$$ || graph_label || $$" WHERE id = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$");/* グラフからデータを削除 */
          RETURN OLD;
        END IF;
        RETURN NULL;
      END;
      $inner$ LANGUAGE plpgsql;
      
      CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_insert/* _sync_[スキーマ名]_[テーブル名]_insert トリガーを作成または置換 */
      AFTER INSERT ON $$ || schema_name || $$.$$ || table_name || $$/* 挿入後 */
      FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();/* 各行に対して関数を実行 */
      
      CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_update/* _sync_[スキーマ名]_[テーブル名]_update トリガーを作成または置換 */
      AFTER UPDATE ON $$ || schema_name || $$.$$ || table_name || $$/* 更新後 */
      FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();/* 各行に対して関数を実行 */
      
      CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_delete/* _sync_[スキーマ名]_[テーブル名]_delete トリガーを作成または置換 */
      AFTER DELETE ON $$ || schema_name || $$.$$ || table_name || $$/* 削除後 */
      FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();/* 各行に対して関数を実行 */
      
      ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_insert;/* トリガーを有効化 */
      ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_update;/* トリガーを有効化 */
      ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_delete;/* トリガーを有効化 */
        $$;
        EXECUTE sql;
        RETURN true;
      END;
      $outer$
      LANGUAGE plpgsql;
      
      
      
      --- 関数 3
      CREATE OR REPLACE FUNCTION build_age_triggers_for_edge(schema_name text, table_name text, table_id_col text, start_table_name text, start_id_col text, end_table_name text, end_id_col text, graph_name text, graph_label text)/* build_age_triggers_for_edge 関数を作成または置換 */
      RETURNS BOOL
      AS
      $outer$
      DECLARE/* 変数を宣言 */
        column_names TEXT;
        sql TEXT;
      BEGIN
        SELECT string_agg(format('val.%I', column_name), ', ')/* 列名を連結 */
          INTO column_names
          FROM information_schema.columns
          WHERE columns.table_schema = build_age_triggers_for_edge.schema_name AND columns.table_name = build_age_triggers_for_edge.table_name;
        sql := $$/* SQL 文を定義 */
      
      CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(id bigint)/* _sync_[スキーマ名]_[テーブル名]_row_to_id 関|| $$_row_to_id(id bigint)/* _sync_[スキーマ名]_[テーブル名]_row_to_id 関数を作成または置換 */
      RETURNS graphid
      AS 'SELECT (age_name_to_idx_start(''$$ || graph_name || $$'', ''e'', ''$$ || graph_label|| $$'') + id)::text::ag_catalog.graphid'/* SQL 文 */
      LANGUAGE SQL;
      
      CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(val $$ || schema_name || $$.$$ || table_name || $$)/* _sync_[スキーマ名]_[テーブル名]_row_to_properties 関数を作成または置換 */
      RETURNS agtype
      AS 'SELECT row_to_json((select x FROM (select $$|| column_names || $$) x))::text::agtype'/* SQL 文 */
      LANGUAGE SQL;
      
      CREATE OR REPLACE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$() RETURNS TRIGGER AS/* _sync_[スキーマ名]_[テーブル名] 関数を作成または置換 */
      $inner$
      BEGIN
        IF TG_OP = 'INSERT' THEN/* 挿入操作の場合 */
          INSERT INTO "$$ || graph_name || $$"."$$ || graph_label || $$" (id, start_id, end_id, properties) VALUES (_sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(NEW."$$ || table_id_col || $$"), _sync_$$ || schema_name || $$_$$ || start_table_name || $$_row_to_id(NEW."$$ || start_id_col || $$"), _sync_$$ || schema_name || $$_$$ || end_table_name || $$_row_to_id(NEW."$$ || end_id_col || $$"), _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_properties(NEW));/* グラフにデータを挿入 */
          RETURN NEW;
        ELSIF TG_OP = 'UPDATE' THEN/* 更新操作の場合 */
          UPDATE "$$ || graph_name || $$"."$$ || graph_label || $$" SET start_id = _sync_$$ || schema_name || $$_$$ || start_table_name || $$_row_to_id(NEW."$$ || start_id_col || $$"), end_id = _sync_$$ || schema_name || $$_$$ || end_table_name || $$_row_to_id(NEW."$$ || end_id_col || $$"), properties = _sync_raw_A_row_to_properties(NEW) WHERE id = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$");/* グラフのデータを更新 */
          RETURN NEW;
        ELSIF TG_OP = 'DELETE' THEN/* 削除操作の場合 */
          DELETE FROM "$$ || graph_name || $$"."$$ || graph_label || $$" WHERE id = _sync_$$ || schema_name || $$_$$ || table_name || $$_row_to_id(OLD."$$ || table_id_col || $$");/* グラフからデータを削除 */
          RETURN OLD;
        END IF;
        RETURN NULL;
      END;
      $inner$ LANGUAGE plpgsql;
      
      CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_insert/* _sync_[スキーマ名]_[テーブル名]_insert トリガーを作成または置換 */
      AFTER INSERT ON $$ || schema_name || $$.$$ || table_name || $$/* 挿入後 */
      FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();/* 各行に対して関数を実行 */
      
      CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_update/* _sync_[スキーマ名]_[テーブル名]_update トリガーを作成または置換 */
      AFTER UPDATE ON $$ || schema_name || $$.$$ || table_name || $$/* 更新後 */
      FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();/* 各行に対して関数を実行 */
      
      CREATE OR REPLACE TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_delete/* _sync_[スキーマ名]_[テーブル名]_delete トリガーを作成または置換 */
      AFTER DELETE ON $$ || schema_name || $$.$$ || table_name || $$/* 削除後 */
      FOR EACH ROW EXECUTE FUNCTION _sync_$$ || schema_name || $$_$$ || table_name || $$();/* 各行に対して関数を実行 */
      
      ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_insert;/* トリガーを有効化 */
      ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_update;/* トリガーを有効化 */
      ALTER TABLE $$ || schema_name || $$.$$ || table_name || $$ ENABLE ALWAYS TRIGGER _sync_$$ || schema_name || $$_$$ || table_name || $$_delete;/* トリガーを有効化 */
        $$;
      
        EXECUTE sql;
        RETURN true;
      END;
      $outer$
      LANGUAGE plpgsql;
    2. 補助関数を実行して、テーブルからグラフへの同期のためのトリガーを作成します。

      説明
      • コンテンツは大文字と小文字が区別されます。 小文字を使用してください。

      • your_schema_name を、raw_a などのデータテーブルが存在する実際の スキーマ に置き換えます。 psql クライアントで \d+ <table_name> コマンドを実行して、スキーマを表示します。 通常は元の スキーマ と同じです。

      select build_age_triggers_for_vertex('your_schema_name','raw_a', 'id', 'gra', 'label_a');/* 頂点のトリガーを作成 */
      select build_age_triggers_for_vertex('your_schema_name','raw_b', 'id', 'gra', 'label_b');/* 頂点のトリガーを作成 */
      select build_age_triggers_for_edge('your_schema_name','raw_c', 'id', 'raw_a', 'id_a', 'raw_b', 'id_b', 'gra', 'edge_c');/* エッジのトリガーを作成 */
    説明

    このトリガーは、増分データを同期するために使用されます。 既存のデータテーブルを同期するには、トリガーを作成した後に次の文を実行します。

    INSERT INTO "gra"."label_a" (id, properties) SELECT sync_a_row_to_id(raw_A.id), sync_a_row_to_properties(raw_A) FROM raw_A;/* データをグラフに挿入 */
    INSERT INTO "gra"."label_b" (id, properties) SELECT sync_b_row_to_id(raw_B.id), sync_b_row_to_properties(raw_B) FROM raw_B;/* データをグラフに挿入 */
    INSERT INTO "gra"."edge_c" (id, start_id, end_id, properties) SELECT sync_c_row_to_id(raw_C.id), sync_a_row_to_id(raw_C.id_a), sync_b_row_to_id(raw_C.id_b), sync_c_row_to_properties(raw_C) FROM raw_C;/* データをグラフに挿入 */
  4. データ同期を確認します。

    データの挿入

    1. PolarDB for MySQL クラスタで、同期用のテーブルにテストデータを挿入します。

      INSERT INTO raw_a values(1,1,1,'2000-01-01');/* raw_a テーブルにデータを挿入 */
      INSERT INTO raw_b values(1,1,1,1,'2000-01-01');/* raw_b テーブルにデータを挿入 */
      INSERT INTO raw_c values(1,1,1);/* raw_c テーブルにデータを挿入 */
    2. PolarDB for PostgreSQL クラスタで、Cypher を使用してグラフをクエリし、データの挿入が同期されているかどうかを確認します。

      • SELECT * FROM cypher('gra', $$/* Cypher クエリを実行 */
        MATCH (v)/* ノード v に一致 */
        RETURN v/* v を返す */
        $$) as (v agtype);

        結果例:

        ------
         {"id": 844424930131969, "label": "label_a", "properties": {"id": 1, "desc": "1", "name": "1", "time_created": "2000-01-01T00:00:00"}}::vertex/* 頂点 */
         {"id": 1125899906842625, "label": "label_b", "properties": {"id": 1, "desc": "1", "name": "1", "value": 1, "time_created": "2000-01-01T00:00:00"}}::vertex/* 頂点 */
      • SELECT * FROM cypher('gra', $$/* Cypher クエリを実行 */
        MATCH (v)-[e]->(v2)/* ノード v からノード v2 へのエッジ e に一致 */
        RETURN e/* e を返す */
        $$) as (e agtype);

        結果例:

        ------
         {"id": 1407374883553281, "label": "edge_c", "end_id": 1125899906842625, "start_id": 844424930131969, "properties": {"id": "11"}}::edge/* エッジ */

    プロパティの変更

    1. PolarDB for MySQL クラスタで、同期用テーブルのテストデータのプロパティを変更します。

      UPDATE raw_a SET name = '2' WHERE id = 1;/* raw_a テーブルのデータを更新 */
    2. PolarDB for PostgreSQL クラスタで、Cypher を使用してグラフをクエリし、プロパティの変更が同期されているかどうかを確認します。

      SELECT * FROM cypher('gra', $$/* Cypher クエリを実行 */
      MATCH (v:label_a {id:1})/* id が 1 の label_a ノード v に一致 */
      RETURN v/* v を返す */
      $$) as (v agtype);

      結果例:

      -----
       {"id": 844424930131969, "label": "label_a", "properties": {"id": 1, "desc": "1", "name": "2", "time_created": "2000-01-01T00:00:00"}}::vertex/* 頂点 */

    データの削除

    1. PolarDB for MySQL クラスタで、同期用テーブルからテストデータを削除します。

      DELETE FROM raw_c WHERE id = 1;/* raw_c テーブルからデータを削除 */
    2. PolarDB for PostgreSQL クラスタで、Cypher を使用してグラフをクエリし、データの削除が同期されているかどうかを確認します。

      SELECT * FROM cypher('gra', $$/* Cypher クエリを実行 */
      MATCH (v)-[e]->(v2)/* ノード v からノード v2 へのエッジ e に一致 */
      RETURN e/* e を返す */
      $$) as (e agtype);

      空の結果は、データの削除が同期されていることを示します。

使用上の注意

  • 同期インスタンスの作成中にデータベースにデータを書き込まないでください。 そのようなデータはグラフに同期できません。

  • DTS インスタンスの作成後、列の追加や削除など、テーブルのデータ構造を変更しないでください。 そうしないと、後続の同期タスクが失敗する可能性があります。

  • 上記の 補助関数 は、すべての列をプロパティとしてグラフに自動的に追加します。 グラフに追加されるプロパティをカスタマイズする場合は、_sync_<table_name>_row_to_properties 関数の定義を変更できます。これは通常、次のサンプルコードのように使用されます。 追加する列を変更したり、列の値を変更したりするには、select val.id, val.id_a, val.id_b セクションを変更します。 たとえば、id_a 列と id_b 列をマージするには、文を select val.id_a::text || val.id_b::text AS id に変更します。

    CREATE OR REPLACE FUNCTION _sync_raw_C_row_to_properties(val raw_C)/* _sync_raw_C_row_to_properties 関数を作成または置換 */
    RETURNS agtype
    AS 'SELECT row_to_json((select x FROM (select val.id, val.id_a, val.id_b) x))::text::agtype'/* SQL 文 */
    LANGUAGE SQL;
  • エッジを挿入する前に、エッジの開始ノードと終了ノードがすでに挿入されていることを確認してください。 そうしないと、グラフデータベースは対応するノードを見つけられず、エラーが報告されます。