すべてのプロダクト
Search
ドキュメントセンター

DataWorks:Hologres データソース

最終更新日:Nov 09, 2025

Hologres データソースは、Hologres との間で双方向のデータ読み書きチャネルを提供します。このトピックでは、DataWorks における Hologres データソースのデータ同期機能について説明します。

制限事項

オフラインでの読み書き

  • Hologres データソースは、Serverless リソースグループ (推奨) および データ統合専用リソースグループ をサポートします。

  • Hologres Writer は、Hologres 外部テーブルへのデータ書き込みをサポートしていません。

  • Hologres データソースは、次のロジックに基づいて Hologres エンドポイントを取得します。

    • 現在のリージョンの Hologres インスタンスの場合、エンドポイントは次の順序で取得されます: [any Tunnel] > [single Tunnel] > [Public (Internet)]

    • クロスリージョンの Hologres インスタンスの場合、エンドポイントは次の順序で取得されます: [Public (Internet)] > [single Tunnel]

リアルタイムでのデータベース全体の書き込み

サポートされているフィールドタイプ

フィールドタイプ

オフライン読み取り (Hologres Reader)

オフライン書き込み (Hologres Writer)

リアルタイム書き込み

UUID

サポートされていません

サポートされていません

サポートされていません

CHAR

サポートされています

サポートされています

サポートされています

NCHAR

サポートされています

サポートされています

サポートされています

VARCHAR

サポートされています

サポートされています

サポートされています

LONGVARCHAR

サポートされています

サポートされています

サポートされています

NVARCHAR

サポートされています

サポートされています

サポートされています

LONGNVARCHAR

サポートされています

サポートされています

サポートされています

CLOB

サポートされています

サポート

サポートされています

NCLOB

サポートされています

サポートされています

サポート

SMALLINT

サポートされています

サポートされています

サポート

TINYINT

サポート

サポートされています

サポートされています

INTEGER

サポートされています

サポートされています

サポート

BIGINT

サポート

サポートされています

サポート

NUMERIC

サポートされています

サポート

サポートされています

DECIMAL

サポートされています

サポートされています

サポートされています

FLOAT

サポートされています

サポートされています

サポートされています

REAL

サポートされています

サポートされています

サポートされています

DOUBLE

サポートされています

サポートされています

サポートされています

TIME

サポートされています

サポート

サポートされています

DATE

サポートされています

サポートされています

サポートされています

TIMESTAMP

サポートされています

サポートされています

サポート

BINARY

サポートされています

サポート

サポートされています

VARBINARY

サポートされています

サポートされています

サポートされています

BLOB

サポートされています

サポートされています

サポートされています

LONGVARBINARY

サポート

サポートされています

サポートされています

BOOLEAN

サポートされています

サポートされています

サポートされています

BIT

サポートされています

サポートされています

サポートされています

JSON

サポートされています

サポートされています

サポートされています

JSONB

サポートされています

サポートされています

サポートされています

仕組み

オフライン読み取り

Hologres Reader は PSQL を使用して Hologres テーブルからデータを読み取ります。テーブルのシャード数に基づいて複数の同時タスクを開始します。各シャードは、同時 SELECT タスクに対応します。

  • Hologres でテーブルを作成するときに、同じ CREATE TABLE トランザクション内で CALL set_table_property('table_name', 'shard_count', 'xx') 文を実行して、そのシャード数を構成できます。

    デフォルトでは、データベースのシャード数が使用されます。具体的な値は、Hologres インスタンスの構成によって異なります。

  • SELECT 文は、テーブルの組み込み hg_shard_id 列を使用して、シャードごとにデータをフィルターします。

オフライン書き込み

Hologres Writer は、データ同期フレームワークを通じてリーダーによって生成されたプロトコルデータを取得します。次に、conflictMode (競合ポリシー) の設定に基づいて、データ書き込みの競合解決ポリシーを決定します。

新しいデータと既存のデータの間でプライマリキーの競合が発生した場合に新しいデータを処理する方法を指定するために、conflictMode パラメーターを構成できます。

重要

conflictMode は、プライマリキーを持つテーブルにのみ適用されます。書き込みの原則とパフォーマンスの詳細については、「技術的原則」をご参照ください。

  • conflictModeReplace (行全体の更新) に設定されている場合、新しいデータが古いデータを上書きします。行のすべての列が上書きされます。列にマッピングされていないフィールドは、強制的に NULL に設定されます。

  • conflictModeUpdate に設定されている場合、新しいデータが古いデータを上書きします。列にマッピングされているフィールドのみが上書きされます。

  • conflictModeIgnore に設定されている場合、新しいデータは無視されます。

データソースの追加

DataWorks で同期タスクを開発する前に、「データソース管理」の指示に従って、必要なデータソースを DataWorks に追加する必要があります。データソースを追加する際に、DataWorks コンソールでパラメーターのツールチップを表示して、パラメーターの意味を理解できます

データ同期タスクの開発

同期タスクを構成するためのエントリポイントと手順については、次の構成ガイドをご参照ください。

単一テーブルのオフライン同期タスクの構成ガイド

単一テーブルのリアルタイム同期タスクの構成ガイド

操作フローの詳細については、「DataStudio でリアルタイム同期タスクを構成する」および「データ統合でリアルタイム同期タスクを構成する」をご参照ください。

オフラインおよびリアルタイムのデータベース全体の同期タスクの構成ガイド

手順の詳細については、「オフラインのデータベース全体の同期タスクを構成する」および「データベース全体のリアルタイム同期タスク構成」をご参照ください。

付録: スクリプトのデモとパラメーターの説明

コードエディタを使用してバッチ同期タスクを構成する

コードエディタを使用してバッチ同期タスクを構成する場合、統一されたスクリプト形式の要件に基づいて、スクリプトで関連パラメーターを構成する必要があります。詳細については、「コードエディタでタスクを構成する」をご参照ください。次の情報は、コードエディタを使用してバッチ同期タスクを構成する際に、データソースに対して構成する必要があるパラメーターについて説明しています。

Reader スクリプトのデモ

  • 非パーティションテーブルの構成

    • 次のコードは、非パーティション化された Hologres テーブルからメモリにデータを読み取るタスクを構成する方法を示しています。

      {
          "transform": false,
          "type": "job",
          "version": "2.0",
          "steps": [
              {
                  "stepType": "holo",
                  "parameter": {
                      "datasource": "holo_db",
                      "envType": 1,
                      "column": [ 
                          "tag",
                          "id",
                          "title",
                          "body"
                      ],
                      "where": "",
                      "table": "holo_reader_basic_src"
                  },
                  "name": "Reader",
                  "category": "reader"
              },
              {
                  "stepType": "stream",
                  "parameter": {
                      "print": false,
                      "fieldDelimiter": ","
                  },
                  "name": "Writer",
                  "category": "writer"
              }
          ],
          "setting": {
              "executeMode": null,
              "failoverEnable": null,
              "errorLimit": {
                  "record": "0"
              },
              "speed": {
                  "concurrent": 2,
                  "throttle": false
              }
          },
          "order": {
              "hops": [
                  {
                      "from": "Reader",
                      "to": "Writer"
                  }
              ]
          }
      }
    • 次のコードは、Hologres テーブルの DDL 文を示しています。

      begin;
      drop table if exists holo_reader_basic_src;
      create table holo_reader_basic_src(
        tag text not null, 
        id int not null, 
        title text not null, 
        body text, 
        primary key (tag, id));
        call set_table_property('holo_reader_basic_src', 'orientation', 'column');
        call set_table_property('holo_reader_basic_src', 'shard_count', '3');
      commit;
  • パーティションテーブルの構成

    • パーティション化された Hologres テーブルの子テーブルからメモリにデータを読み取るタスクを構成できます。

      説明

      partition パラメーターの構成に注意してください。

      {
          "transform": false,
          "type": "job",
          "version": "2.0",
          "steps": [
              {
                  "stepType": "holo",
                  "parameter": {
                      "selectedDatabase": "public",
                      "partition": "tag=foo",
                      "datasource": "holo_db",
                      "envType": 1,
                      "column": [
                          "tag",
                          "id",
                          "title",
                          "body"
                      ],
                      "tableComment": "",
                      "where": "",
                      "table": "public.holo_reader_basic_part_src"
                  },
                  "name": "Reader",
                  "category": "reader"
              },
              {
            "stepType":"stream",
            "parameter":{},
            "name":"Writer",
            "category":"writer"
          }
        ],
        "setting":{
          "errorLimit":{
            "record":"0"
          },
          "speed":{
            "throttle":true,
            "concurrent":1,
            "mbps":"12"
              }
           },
        "order":{
          "hops":[
            {
              "from":"Reader",
              "to":"Writer"
            }
          ]
        }
      }
    • 次のコードは、Hologres テーブルの DDL 文を示しています。

      begin;
      drop table if exists holo_reader_basic_part_src;
      create table holo_reader_basic_part_src(
        tag text not null, 
        id int not null, 
        title text not null, 
        body text, 
        primary key (tag, id))
        partition by list( tag );
        call set_table_property('holo_reader_basic_part_src', 'orientation', 'column');
        call set_table_property('holo_reader_basic_part_src', 'shard_count', '3');
      commit;
      
      create table holo_reader_basic_part_src_1583161774228 partition of holo_reader_basic_part_src for values in ('foo');
      
      # パーティションテーブルの子テーブルが作成され、データがインポートされていることを確認してください。
      postgres=# \d+ holo_reader_basic_part_src
                               Table "public.holo_reader_basic_part_src"
       Column |  Type   | Collation | Nullable | Default | Storage  | Stats target | Description 
      --------+---------+-----------+----------+---------+----------+--------------+-------------
       tag    | text    |           | not null |         | extended |              | 
       id     | integer |           | not null |         | plain    |              | 
       title  | text    |           | not null |         | extended |              | 
       body   | text    |           |          |         | extended |              | 
      Partition key: LIST (tag)
      Indexes:
          "holo_reader_basic_part_src_pkey" PRIMARY KEY, btree (tag, id)
      Partitions: holo_reader_basic_part_src_1583161774228 FOR VALUES IN ('foo')

Reader スクリプトのパラメーター

パラメータ

説明

必須

デフォルト値

database

Hologres インスタンス内のデータベースの名前。

はい

なし

table

Hologres テーブルの名前。テーブルがパーティションテーブルの場合は、親テーブルの名前を指定します。

はい

なし

column

データを読み取る列。値には、ソーステーブルのプライマリキー列を含める必要があります。例: ["*"] はすべての列を指定します。

はい

なし

partition

パーティションテーブルの場合、このパラメーターはパーティション列とその値を指定します。形式は column=value です。

重要
  • Hologres は LIST パーティショニングのみをサポートします。パーティション列としてサポートされるのは単一の列のみで、列は INT4 または TEXT 型である必要があります。

  • このパラメーターがテーブルの DDL 文のパーティション構成と一致していることを確認してください。

  • 対応する子テーブルが作成され、データがインポートされていることを確認してください。

いいえ

。これは非パーティションテーブルを示します。

Writer スクリプトのデモ

  • 非パーティションテーブルの構成

    • この例は、JDBC モードで MySQL から Hologres 標準テーブルにデータをインポートするための構成を示しています。

      {
          "type": "job",
          "version": "2.0",
          "steps": [
              {
                  "stepType": "mysql",
                  "parameter": {
                      "envType": 0,
                      "useSpecialSecret": false,
                      "column": [
                          "<column1>",
                          "<column2>",
                          ......,
                          "<columnN>"
                      ],
                      "tableComment": "",
                      "connection": [
                          {
                              "datasource": "<mysql_source_name>",// MySQL データソースの名前。
                              "table": [
                                  "<mysql_table_name>"
                              ]
                          }
                      ],
                      "where": "",
                      "splitPk": "",
                      "encoding": "UTF-8"
                  },
                  "name": "Reader",
                  "category": "reader"
              },
              {
                  "stepType": "holo",
                  "parameter": {
                      "selectedDatabase":"public",
                      "schema": "public",
                      "maxConnectionCount": 9,
                      "truncate":true,// クリーンアップルール。
                      "datasource": "<holo_sink_name>",// Hologres データソースの名前。
                      "conflictMode": "ignore",
                      "envType": 0,
                      "column": [
                          "<column1>",
                          "<column2>",
                          ......,
                          "<columnN>"
                      ],
                      "tableComment": "",
                      "table": "<holo_table_name>",
                      "reShuffleByDistributionKey":false
                  },
                  "name": "Writer",
                  "category": "writer"
              }
          ],
          "setting": {
              "executeMode": null,
              "errorLimit": {
                  "record": "0"
              },
              "locale": "zh_CN",
              "speed": {
                  "concurrent": 2,// 同時ジョブの数。
                  "throttle": false// スロットリング。
              }
          },
          "order": {
              "hops": [
                  {
                      "from": "Reader",
                      "to": "Writer"
                  }
              ]
          }
      }
    • 次のコードは、Hologres テーブルの DDL 文を示しています。

      begin;
      drop table if exists mysql_to_holo_test;
      create table mysql_to_holo_test(
        tag text not null,
        id int not null,
        body text not null,
        brrth date,
        primary key (tag, id));
        call set_table_property('mysql_to_holo_test', 'orientation', 'column');
        call set_table_property('mysql_to_holo_test', 'distribution_key', 'id');
        call set_table_property('mysql_to_holo_test', 'clustering_key', 'birth');
      commit;
  • パーティションテーブルの構成

    説明
    • Hologres は LIST パーティショニングのみをサポートします。パーティション列として使用できるのは単一の列のみで、列は INT4 または TEXT 型である必要があります。

    • このパラメーターがテーブルの DDL 文のパーティション構成と一致していることを確認してください。

    • MySQL テーブルからパーティション化された Hologres テーブルの子テーブルにデータを同期するタスクを構成できます。

      {
        "type": "job",
        "version": "2.0",
        "steps": [
          {
            "stepType": "mysql",
            "parameter": {
              "envType": 0,
              "useSpecialSecret": false,
              "column": [
                "<column1>",
                "<column2>",
                  ......,
                "<columnN>"
              ],
              "tableComment": "",
              "connection": [
                {
                  "datasource": "<mysql_source_name>",
                  "table": [
                    "<mysql_table_name>"
                  ]
                }
              ],
              "where": "",
              "splitPk": "<mysql_pk>",// MySQL テーブルのプライマリキーフィールド。
              "encoding": "UTF-8"
            },
            "name": "Reader",
            "category": "reader"
          },
          {
            "stepType": "holo",
            "parameter": {
              "selectedDatabase": "public",
              "maxConnectionCount": 9,
              "partition": "<partition_key>",// Hologres テーブルのパーティションキー。
              "truncate": "false",
              "datasource": "<holo_sink_name>",// Hologres データソースの名前。
              "conflictMode": "ignore",
              "envType": 0,
              "column": [
                "<column1>",
                "<column2>",
                  ......,
                "<columnN>"
              ],
              "tableComment": "",
              "table": "<holo_table_name>",
              "reShuffleByDistributionKey":false
            },
            "name": "Writer",
            "category": "writer"
          }
        ],
        "setting": {
          "executeMode": null,
          "failoverEnable": null,
          "errorLimit": {
            "record": "0"
          },
          "speed": {
            "concurrent": 2,// 同時ジョブの数。
            "throttle": false// スロットリング。
          }
        },
        "order": {
          "hops": [
            {
              "from": "Reader",
              "to": "Writer"
            }
          ]
        }
      }
    • 次のコードは、Hologres テーブルの DDL 文を示しています。

      BEGIN;
      CREATE TABLE public.hologres_parent_table(
        a text ,
        b int,
        c timestamp,
        d text,
        ds text,
        primary key(ds,b)
        )
        PARTITION BY LIST(ds);
      CALL set_table_property('public.hologres_parent_table', 'orientation', 'column');
      CREATE TABLE public.holo_child_1 PARTITION OF public.hologres_parent_table FOR VALUES IN('20201215');
      CREATE TABLE public.holo_child_2 PARTITION OF public.hologres_parent_table FOR VALUES IN('20201216');
      CREATE TABLE public.holo_child_3 PARTITION OF public.hologres_parent_table FOR VALUES IN('20201217');
      COMMIT;

Writer スクリプトのパラメーター

パラメータ

説明

必須

デフォルト値

database

Hologres インスタンス内のデータベースの名前。

はい

なし

table

Hologres テーブルの名前。テーブル名にスキーマ名を含めることができます。例: schema_name.table_name

はい

なし

conflictMode

conflictMode には ReplaceUpdateIgnore が含まれます。詳細については、「仕組み」をご参照ください。

はい

なし

column

データを書き込む列。値には、宛先テーブルのプライマリキー列を含める必要があります。例: ["*"] はすべての列を指定します。

はい

なし

partition

パーティションテーブルの場合、このパラメーターはパーティション列とその値を指定します。形式は column=value です。

説明
  • Hologres は LIST パーティショニングのみをサポートします。パーティション列としてサポートされるのは単一の列のみで、列は INT4 または TEXT 型である必要があります。

  • このパラメーターがテーブルの DDL 文のパーティション構成と一致していることを確認してください。

いいえ

。これは非パーティションテーブルを示します。

reShuffleByDistributionKey

Hologres では、プライマリキーを持つテーブルにデータをバッチインポートすると、デフォルトでテーブルロックがトリガーされます。これにより、複数の接続の同時書き込み機能が制限されます。オフライン同期シナリオで reShuffle 機能を有効にできます。この機能により、さまざまなタスクがデータシャーディングキーに基づいて指定された Holo シャードにデータを書き込むことができます。これにより、同時バッチ書き込みが可能になり、書き込みパフォーマンスが大幅に向上します。従来の JDBC モードでのリアルタイム書き込みと比較して、この機能は Holo サーバーのペイロードを削減し、書き込み効率を向上させます。

重要

この機能は、Serverless リソースグループでのみ有効にできます。

いいえ

false

truncate

データを書き込む前に宛先テーブルからデータを削除するかどうかを指定します。

  • true: 宛先テーブルからデータを削除します。

    説明
    • 非パーティションテーブルと静的パーティションテーブルからのみデータを削除できます。動的パーティションテーブルからデータを削除することはできません。動的パーティションテーブルを使用してこのパラメーターを true に設定すると、同期タスクは予期せず終了します。

    • 静的パーティションテーブルを使用してこのパラメーターを true に設定すると、子パーティションテーブルのデータは削除されますが、親テーブルのデータは削除されません。

  • false: 宛先テーブルからデータを削除しません。

いいえ

false