すべてのプロダクト
Search
ドキュメントセンター

DataWorks:Hologres データソース

最終更新日:Mar 07, 2026

Hologres データソースを使用すると、Hologres からの読み取りおよび Hologres への書き込みが可能です。本トピックでは、DataWorks が Hologres に対して提供するデータ同期機能について説明します。

制限事項

Hologres データソースのデータ同期タスクには、サーバーレスリソースグループが必要です。

オフライン読み取りおよび書き込み

  • Hologres ライターは、Hologres の外部テーブルへのデータ書き込みをサポートしていません。

  • Hologres データソースは、以下のロジックでエンドポイントを取得します。

    • 同一リージョン内の Hologres インスタンスの場合、エンドポイントの取得優先順位は次のとおりです:任意の Tunnelシングル Tunnelパブリック

    • 異なるリージョン内の Hologres インスタンスの場合、エンドポイントの取得優先順位は次のとおりです:パブリックシングル Tunnel

単一テーブルのリアルタイム読み取り

  • Hologres のバージョンは 2.1 以降である必要があります。

  • Hologres のパーティションテーブルに対する増分同期はサポートされていません。

  • Hologres テーブルからのデータ定義言語 (DDL) 変更メッセージの同期はサポートされていません。

  • Hologres からの増分同期でサポートされるデータの型は以下のとおりです。

    INTEGER、BIGINT、TEXT、CHAR(n)、VARCHAR(n)、REAL、JSON、SERIAL、OID、INT4[]、INT8[]、FLOAT8[]、BOOLEAN[]、TEXT[]。

  • 単一の Hologres テーブルに対してリアルタイム同期を実行するには、ソーステーブルで Hologres Binlog を有効化する必要があります。詳細については、「Hologres Binlog の購読」をご参照ください。

フルデータベースのリアルタイム書き込み

  • プライマリキーを持たないテーブルは、リアルタイムデータ同期タスクでサポートされていません。

  • MySQL から Hologres へのフルデータベースリアルタイム同期中は、パーティションテーブルの子テーブルにのみデータを書き込むことができます。親テーブルへの書き込みはできません。

サポートされるフィールドの型

オフライン読み取り

オフライン書き込み

リアルタイム書き込み

UUID

非対応

非対応

非対応

CHAR

対応

対応

対応

NCHAR

対応

対応

対応

VARCHAR

対応

対応

対応

LONGVARCHAR

対応

対応

対応

NVARCHAR

対応

対応

対応

LONGNVARCHAR

対応

対応

対応

CLOB

対応

対応

対応

NCLOB

対応

対応

対応

SMALLINT

対応

対応

対応

TINYINT

対応

対応

対応

INTEGER

対応

対応

対応

BIGINT

対応

対応

対応

NUMERIC

対応

対応

対応

DECIMAL

対応

対応

対応

FLOAT

対応

対応

対応

REAL

対応

対応

対応

DOUBLE

対応

対応

対応

TIME

対応

対応

対応

DATE

対応

対応

対応

TIMESTAMP

対応

対応

対応

BINARY

対応

対応

対応

VARBINARY

対応

対応

対応

BLOB

対応

対応

対応

LONGVARBINARY

対応

対応

対応

BOOLEAN

対応

対応

対応

BIT

対応

対応

対応

JSON

対応

対応

対応

JSONB

対応

対応

対応

仕組み

オフライン読み取り

Hologres リーダーは PSQL を使用して Hologres テーブルからデータを読み取り、テーブルの Shard Count に応じて複数の並列 SELECT タスクを開始します。

  • Hologres でテーブルを作成する際、同一の CREATE TABLE トランザクション内で CALL set_table_property('table_name', 'shard_count', 'xx') コマンドを実行することで、テーブルの Shard Count を設定できます。

    デフォルトでは、Hologres はデータベースの Shard Count を使用します。具体的な値は、ご利用のインスタンス構成によって異なります。

  • SELECT 文は、組み込みの hg_shard_id 列を使用してシャードごとにデータをフィルターします。

オフライン書き込み

Hologres ライターは、リーダーからプロトコルデータを取得し、conflictMode パラメーターを使用して書き込み操作中のデータ競合を処理します。

conflictMode パラメーターを設定して、プライマリキー競合が発生した場合の新規データの処理方法を指定します。

重要

conflictMode パラメーターは、プライマリキーを持つテーブルにのみ適用されます。書き込みメカニズムおよびパフォーマンスの詳細については、「技術的原理」をご参照ください。

  • conflictModeReplace(全行更新)に設定した場合、新規データが既存のデータを上書きします。行のすべての列が置き換えられます。マップされていない列は NULL に設定されます。

  • conflictModeUpdate に設定した場合、新規データが既存のデータを上書きします。マップされた列のみが更新されます。

  • conflictModeIgnore に設定した場合、新規データは破棄されます。

データソースの追加

DataWorks で同期タスクを開発する前に、データソース管理の手順に従って、必要なデータソースを DataWorks に追加する必要があります。データソースを追加する際に、DataWorks コンソールのパラメーター説明を表示することで、各パラメーターの意味を確認できます

データ同期タスク

単一テーブルのオフライン同期

単一テーブルのリアルタイム同期

フルデータベースのオフライン同期

フルデータベースのリアルタイム同期

サーバーレスフルデータベースリアルタイム同期

リアルタイム同期に関するよくある質問については、「Hologres へのリアルタイム同期に関する FAQ」をご参照ください。

付録:コードおよびパラメーター

コードエディタを使用したバッチ同期タスクの構成

コードエディタを使用してバッチ同期タスクを構成する場合は、統一されたスクリプト形式要件に基づいて、関連するパラメーターをスクリプト内に設定する必要があります。詳細については、「コードエディタの使用」をご参照ください。以下に、コードエディタを使用してバッチ同期タスクを構成する際に、データソースで設定する必要があるパラメーターについて説明します。

リーダースクリプト

  • 非パーティションテーブルの構成

    • 以下のスクリプトは、非パーティション Hologres テーブルからメモリへデータを読み取る方法を示しています。

      {
          "transform": false,
          "type": "job",
          "version": "2.0",
          "steps": [
              {
                  "stepType": "holo",
                  "parameter": {
                      "datasource": "holo_db",
                      "envType": 1,
                      "column": [ 
                          "tag",
                          "id",
                          "title",
                          "body"
                      ],
                      "where": "",
                      "table": "holo_reader_basic_src"
                  },
                  "name": "Reader",
                  "category": "reader"
              },
              {
                  "stepType": "stream",
                  "parameter": {
                      "print": false,
                      "fieldDelimiter": ","
                  },
                  "name": "Writer",
                  "category": "writer"
              }
          ],
          "setting": {
              "executeMode": null,
              "failoverEnable": null,
              "errorLimit": {
                  "record": "0"
              },
              "speed": {
                  "concurrent": 2,
                  "throttle": false
              }
          },
          "order": {
              "hops": [
                  {
                      "from": "Reader",
                      "to": "Writer"
                  }
              ]
          }
      }
    • 以下のコードは、Hologres テーブルの DDL 文を示しています。

      begin;
      drop table if exists holo_reader_basic_src;
      create table holo_reader_basic_src(
        tag text not null, 
        id int not null, 
        title text not null, 
        body text, 
        primary key (tag, id));
        call set_table_property('holo_reader_basic_src', 'orientation', 'column');
        call set_table_property('holo_reader_basic_src', 'shard_count', '3');
      commit;
  • パーティションテーブルの構成

    • 以下のスクリプトは、パーティション Hologres テーブルの子テーブルからメモリへデータを読み取る方法を示しています。

      説明

      partition パラメーターの設定に注意してください。

      {
          "transform": false,
          "type": "job",
          "version": "2.0",
          "steps": [
              {
                  "stepType": "holo",
                  "parameter": {
                      "selectedDatabase": "public",
                      "partition": "tag=foo",
                      "datasource": "holo_db",
                      "envType": 1,
                      "column": [
                          "tag",
                          "id",
                          "title",
                          "body"
                      ],
                      "tableComment": "",
                      "where": "",
                      "table": "public.holo_reader_basic_part_src"
                  },
                  "name": "Reader",
                  "category": "reader"
              },
              {
            "stepType":"stream",
            "parameter":{},
            "name":"Writer",
            "category":"writer"
          }
        ],
        "setting":{
          "errorLimit":{
            "record":"0"
          },
          "speed":{
            "throttle":true,
            "concurrent":1,
            "mbps":"12"
              }
           },
        "order":{
          "hops":[
            {
              "from":"Reader",
              "to":"Writer"
            }
          ]
        }
      }
    • 以下のコードは、Hologres テーブルの DDL 文を示しています。

      begin;
      drop table if exists holo_reader_basic_part_src;
      create table holo_reader_basic_part_src(
        tag text not null, 
        id int not null, 
        title text not null, 
        body text, 
        primary key (tag, id))
        partition by list( tag );
        call set_table_property('holo_reader_basic_part_src', 'orientation', 'column');
        call set_table_property('holo_reader_basic_part_src', 'shard_count', '3');
      commit;
      
      create table holo_reader_basic_part_src_1583161774228 partition of holo_reader_basic_part_src for values in ('foo');
      
      # Make sure that the child table is created and data is imported.
      postgres=# \d+ holo_reader_basic_part_src
                               Table "public.holo_reader_basic_part_src"
       Column |  Type   | Collation | Nullable | Default | Storage  | Stats target | Description 
      --------+---------+-----------+----------+---------+----------+--------------+-------------
       tag    | text    |           | not null |         | extended |              | 
       id     | integer |           | not null |         | plain    |              | 
       title  | text    |           | not null |         | extended |              | 
       body   | text    |           |          |         | extended |              | 
      Partition key: LIST (tag)
      Indexes:
          "holo_reader_basic_part_src_pkey" PRIMARY KEY, btree (tag, id)
      Partitions: holo_reader_basic_part_src_1583161774228 FOR VALUES IN ('foo')

リーダースクリプトパラメーター

パラメーター

説明

必須

デフォルト

database

Hologres データベースの名前。

はい

なし

table

Hologres テーブルの名前。パーティションテーブルの場合は、親テーブルの名前を指定します。

はい

なし

column

データを読み取る列。例:["*"] はすべての列を指定します。

はい

なし

partition

パーティションテーブルの場合、パーティション列とその値を指定します。書式:column=value

重要
  • Hologres はリストパーティショニングのみをサポートしています。パーティションキーは、INT4 型または TEXT 型の単一列である必要があります。

  • この値は、テーブルの DDL 文におけるパーティション構成と一致している必要があります。

  • 対応する子テーブルが作成済みであり、データがインポート済みである必要があります。

いいえ

(非パーティションテーブルを示します)。

ライタースクリプト

  • 非パーティションテーブルの構成

    • 以下のスクリプトは、JDBC モードを使用して、MySQL データベースから非パーティション Hologres テーブルへデータを書き込む方法を示しています。

      {
          "type": "job",
          "version": "2.0",
          "steps": [
              {
                  "stepType": "mysql",
                  "parameter": {
                      "envType": 0,
                      "useSpecialSecret": false,
                      "column": [
                          "<column1>",
                          "<column2>",
                          ......,
                          "<columnN>"
                      ],
                      "tableComment": "",
                      "connection": [
                          {
                              "datasource": "<mysql_source_name>",// MySQL データソースの名前
                              "table": [
                                  "<mysql_table_name>"
                              ]
                          }
                      ],
                      "where": "",
                      "splitPk": "",
                      "encoding": "UTF-8"
                  },
                  "name": "Reader",
                  "category": "reader"
              },
              {
                  "stepType": "holo",
                  "parameter": {
                      "selectedDatabase":"public",
                      "schema": "public",
                      "maxConnectionCount": 9,
                      "truncate":true,// クリーンアップルール
                      "datasource": "<holo_sink_name>",// Hologres データソースの名前
                      "conflictMode": "ignore",
                      "envType": 0,
                      "column": [
                          "<column1>",
                          "<column2>",
                          ......,
                          "<columnN>"
                      ],
                      "tableComment": "",
                      "table": "<holo_table_name>",
                      "reShuffleByDistributionKey":false
                  },
                  "name": "Writer",
                  "category": "writer"
              }
          ],
          "setting": {
              "executeMode": null,
              "errorLimit": {
                  "record": "0"
              },
              "locale": "zh_CN",
              "speed": {
                  "concurrent": 2,// ジョブの同時実行数
                  "throttle": false// スロットル
              }
          },
          "order": {
              "hops": [
                  {
                      "from": "Reader",
                      "to": "Writer"
                  }
              ]
          }
      }
    • 以下のコードは、Hologres テーブルの DDL 文を示しています。

      begin;
      drop table if exists mysql_to_holo_test;
      create table mysql_to_holo_test(
        tag text not null,
        id int not null,
        body text not null,
        brrth date,
        primary key (tag, id));
        call set_table_property('mysql_to_holo_test', 'orientation', 'column');
        call set_table_property('mysql_to_holo_test', 'distribution_key', 'id');
        call set_table_property('mysql_to_holo_test', 'clustering_key', 'birth');
      commit;
  • パーティションテーブルの構成

    説明
    • Hologres はリストパーティショニングのみをサポートしています。パーティションキーは、INT4 型または TEXT 型の単一列である必要があります。

    • この値は、テーブルの DDL 文におけるパーティション構成と一致している必要があります。

    • 以下のスクリプトは、MySQL データベースからパーティション Hologres テーブルの子テーブルへデータを同期する方法を示しています。

      {
        "type": "job",
        "version": "2.0",
        "steps": [
          {
            "stepType": "mysql",
            "parameter": {
              "envType": 0,
              "useSpecialSecret": false,
              "column": [
                "<column1>",
                "<column2>",
                  ......,
                "<columnN>"
              ],
              "tableComment": "",
              "connection": [
                {
                  "datasource": "<mysql_source_name>",
                  "table": [
                    "<mysql_table_name>"
                  ]
                }
              ],
              "where": "",
              "splitPk": "<mysql_pk>",// MySQL テーブルのプライマリキー列
              "encoding": "UTF-8"
            },
            "name": "Reader",
            "category": "reader"
          },
          {
            "stepType": "holo",
            "parameter": {
              "selectedDatabase": "public",
              "maxConnectionCount": 9,
              "partition": "<partition_key>",// Hologres テーブルのパーティションキー
              "truncate": "false",
              "datasource": "<holo_sink_name>",// Hologres データソースの名前
              "conflictMode": "ignore",
              "envType": 0,
              "column": [
                "<column1>",
                "<column2>",
                  ......,
                "<columnN>"
              ],
              "tableComment": "",
              "table": "<holo_table_name>",
              "reShuffleByDistributionKey":false
            },
            "name": "Writer",
            "category": "writer"
          }
        ],
        "setting": {
          "executeMode": null,
          "failoverEnable": null,
          "errorLimit": {
            "record": "0"
          },
          "speed": {
            "concurrent": 2,// ジョブの同時実行数
            "throttle": false// スロットル
          }
        },
        "order": {
          "hops": [
            {
              "from": "Reader",
              "to": "Writer"
            }
          ]
        }
      }
    • 以下のコードは、Hologres テーブルの DDL 文を示しています。

      BEGIN;
      CREATE TABLE public.hologres_parent_table(
        a text ,
        b int,
        c timestamp,
        d text,
        ds text,
        primary key(ds,b)
        )
        PARTITION BY LIST(ds);
      CALL set_table_property('public.hologres_parent_table', 'orientation', 'column');
      CREATE TABLE public.holo_child_1 PARTITION OF public.hologres_parent_table FOR VALUES IN('20201215');
      CREATE TABLE public.holo_child_2 PARTITION OF public.hologres_parent_table FOR VALUES IN('20201216');
      CREATE TABLE public.holo_child_3 PARTITION OF public.hologres_parent_table FOR VALUES IN('20201217');
      COMMIT;

ライタースクリプトパラメーター

パラメーター

説明

必須

デフォルト

database

Hologres データベースの名前。

はい

なし

table

Hologres テーブルの名前。テーブル名にはスキーマ名を含めることができます。例:schema_name.table_name

はい

なし

conflictMode

conflictMode には ReplaceUpdateIgnore のいずれかを指定します。詳細については、「実装原理」をご参照ください。

はい

なし

column

書き込み先の列。すべてのプライマリキー列を含める必要があります。例:["*"] はすべての列を指定します。

はい

なし

partition

パーティションテーブルの場合、パーティション列とその値を指定します。書式:column=value

説明
  • Hologres はリストパーティショニングのみをサポートしています。パーティションキーは、INT4 型または TEXT 型の単一列である必要があります。

  • この値は、テーブルの DDL 文におけるパーティション構成と一致している必要があります。

いいえ

(非パーティションテーブルを示します)。

reShuffleByDistributionKey

プライマリキーを持つテーブルへの並列バッチ書き込みを可能にし、デフォルトのテーブルロックをバイパスします。オフラインタスクでは、この機能により、分散キーに基づいてデータが特定のシャードに送信され、標準的な JDBC 書き込みと比較して書き込みパフォーマンスが向上し、サーバー負荷が軽減されます。

重要

この機能は、サーバーレスリソースグループを使用する場合にのみ利用可能です。

いいえ

false

truncate

書き込み前に宛先テーブルを切り捨て(TRUNCATE)するかどうかを指定します。

  • true:宛先テーブルを切り捨てます。

    説明
    • 非パーティションテーブルおよび静的パーティションテーブルのみ切り捨て可能です。動的パーティションテーブルを切り捨てようとした場合、タスクは失敗します。

    • 静的パーティションテーブルの場合、この値を true に設定すると、指定された子テーブルのみが切り捨てられます。親テーブルには影響しません。

  • false:宛先テーブルを切り捨てません。

いいえ

false