Hologres データソースは、Hologres との間で双方向のデータ読み書きチャネルを提供します。このトピックでは、DataWorks における Hologres データソースのデータ同期機能について説明します。
制限事項
オフラインでの読み書き
Hologres データソースは、Serverless リソースグループ (推奨) および データ統合専用リソースグループ をサポートします。
Hologres Writer は、Hologres 外部テーブルへのデータ書き込みをサポートしていません。
Hologres データソースは、次のロジックに基づいて Hologres エンドポイントを取得します。
現在のリージョンの Hologres インスタンスの場合、エンドポイントは次の順序で取得されます: 。
クロスリージョンの Hologres インスタンスの場合、エンドポイントは次の順序で取得されます: 。
リアルタイムでのデータベース全体の書き込み
リアルタイムデータ同期タスクは、Serverless リソースグループ (推奨) および データ統合専用リソースグループ のみをサポートします。
リアルタイムデータ同期タスクは、プライマリキーのないテーブルをサポートしていません。
サポートされているフィールドタイプ
フィールドタイプ | オフライン読み取り (Hologres Reader) | オフライン書き込み (Hologres Writer) | リアルタイム書き込み |
UUID | サポートされていません | サポートされていません | サポートされていません |
CHAR | サポートされています | サポートされています | サポートされています |
NCHAR | サポートされています | サポートされています | サポートされています |
VARCHAR | サポートされています | サポートされています | サポートされています |
LONGVARCHAR | サポートされています | サポートされています | サポートされています |
NVARCHAR | サポートされています | サポートされています | サポートされています |
LONGNVARCHAR | サポートされています | サポートされています | サポートされています |
CLOB | サポートされています | サポート | サポートされています |
NCLOB | サポートされています | サポートされています | サポート |
SMALLINT | サポートされています | サポートされています | サポート |
TINYINT | サポート | サポートされています | サポートされています |
INTEGER | サポートされています | サポートされています | サポート |
BIGINT | サポート | サポートされています | サポート |
NUMERIC | サポートされています | サポート | サポートされています |
DECIMAL | サポートされています | サポートされています | サポートされています |
FLOAT | サポートされています | サポートされています | サポートされています |
REAL | サポートされています | サポートされています | サポートされています |
DOUBLE | サポートされています | サポートされています | サポートされています |
TIME | サポートされています | サポート | サポートされています |
DATE | サポートされています | サポートされています | サポートされています |
TIMESTAMP | サポートされています | サポートされています | サポート |
BINARY | サポートされています | サポート | サポートされています |
VARBINARY | サポートされています | サポートされています | サポートされています |
BLOB | サポートされています | サポートされています | サポートされています |
LONGVARBINARY | サポート | サポートされています | サポートされています |
BOOLEAN | サポートされています | サポートされています | サポートされています |
BIT | サポートされています | サポートされています | サポートされています |
JSON | サポートされています | サポートされています | サポートされています |
JSONB | サポートされています | サポートされています | サポートされています |
仕組み
オフライン読み取り
Hologres Reader は PSQL を使用して Hologres テーブルからデータを読み取ります。テーブルのシャード数に基づいて複数の同時タスクを開始します。各シャードは、同時 SELECT タスクに対応します。
Hologres でテーブルを作成するときに、同じ
CREATE TABLEトランザクション内でCALL set_table_property('table_name', 'shard_count', 'xx')文を実行して、そのシャード数を構成できます。デフォルトでは、データベースのシャード数が使用されます。具体的な値は、Hologres インスタンスの構成によって異なります。
SELECT 文は、テーブルの組み込み hg_shard_id 列を使用して、シャードごとにデータをフィルターします。
オフライン書き込み
Hologres Writer は、データ同期フレームワークを通じてリーダーによって生成されたプロトコルデータを取得します。次に、conflictMode (競合ポリシー) の設定に基づいて、データ書き込みの競合解決ポリシーを決定します。
新しいデータと既存のデータの間でプライマリキーの競合が発生した場合に新しいデータを処理する方法を指定するために、conflictMode パラメーターを構成できます。
conflictMode は、プライマリキーを持つテーブルにのみ適用されます。書き込みの原則とパフォーマンスの詳細については、「技術的原則」をご参照ください。
conflictMode が Replace (行全体の更新) に設定されている場合、新しいデータが古いデータを上書きします。行のすべての列が上書きされます。列にマッピングされていないフィールドは、強制的に NULL に設定されます。
conflictMode が Update に設定されている場合、新しいデータが古いデータを上書きします。列にマッピングされているフィールドのみが上書きされます。
conflictMode が Ignore に設定されている場合、新しいデータは無視されます。
データソースの追加
DataWorks で同期タスクを開発する前に、「データソース管理」の指示に従って、必要なデータソースを DataWorks に追加する必要があります。データソースを追加する際に、DataWorks コンソールでパラメーターのツールチップを表示して、パラメーターの意味を理解できます。
データ同期タスクの開発
同期タスクを構成するためのエントリポイントと手順については、次の構成ガイドをご参照ください。
単一テーブルのオフライン同期タスクの構成ガイド
手順の詳細については、「コードレス UI 構成」または「コードエディタ構成」をご参照ください。
スクリプトモードのパラメーターとコードサンプルの完全なリストについては、「付録: コードとパラメーター」をご参照ください。
単一テーブルのリアルタイム同期タスクの構成ガイド
操作フローの詳細については、「DataStudio でリアルタイム同期タスクを構成する」および「データ統合でリアルタイム同期タスクを構成する」をご参照ください。
オフラインおよびリアルタイムのデータベース全体の同期タスクの構成ガイド
手順の詳細については、「オフラインのデータベース全体の同期タスクを構成する」および「データベース全体のリアルタイム同期タスク構成」をご参照ください。
付録: スクリプトのデモとパラメーターの説明
コードエディタを使用してバッチ同期タスクを構成する
コードエディタを使用してバッチ同期タスクを構成する場合、統一されたスクリプト形式の要件に基づいて、スクリプトで関連パラメーターを構成する必要があります。詳細については、「コードエディタでタスクを構成する」をご参照ください。次の情報は、コードエディタを使用してバッチ同期タスクを構成する際に、データソースに対して構成する必要があるパラメーターについて説明しています。
Reader スクリプトのデモ
非パーティションテーブルの構成
次のコードは、非パーティション化された Hologres テーブルからメモリにデータを読み取るタスクを構成する方法を示しています。
{ "transform": false, "type": "job", "version": "2.0", "steps": [ { "stepType": "holo", "parameter": { "datasource": "holo_db", "envType": 1, "column": [ "tag", "id", "title", "body" ], "where": "", "table": "holo_reader_basic_src" }, "name": "Reader", "category": "reader" }, { "stepType": "stream", "parameter": { "print": false, "fieldDelimiter": "," }, "name": "Writer", "category": "writer" } ], "setting": { "executeMode": null, "failoverEnable": null, "errorLimit": { "record": "0" }, "speed": { "concurrent": 2, "throttle": false } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }次のコードは、Hologres テーブルの DDL 文を示しています。
begin; drop table if exists holo_reader_basic_src; create table holo_reader_basic_src( tag text not null, id int not null, title text not null, body text, primary key (tag, id)); call set_table_property('holo_reader_basic_src', 'orientation', 'column'); call set_table_property('holo_reader_basic_src', 'shard_count', '3'); commit;
パーティションテーブルの構成
パーティション化された Hologres テーブルの子テーブルからメモリにデータを読み取るタスクを構成できます。
説明partition パラメーターの構成に注意してください。
{ "transform": false, "type": "job", "version": "2.0", "steps": [ { "stepType": "holo", "parameter": { "selectedDatabase": "public", "partition": "tag=foo", "datasource": "holo_db", "envType": 1, "column": [ "tag", "id", "title", "body" ], "tableComment": "", "where": "", "table": "public.holo_reader_basic_part_src" }, "name": "Reader", "category": "reader" }, { "stepType":"stream", "parameter":{}, "name":"Writer", "category":"writer" } ], "setting":{ "errorLimit":{ "record":"0" }, "speed":{ "throttle":true, "concurrent":1, "mbps":"12" } }, "order":{ "hops":[ { "from":"Reader", "to":"Writer" } ] } }次のコードは、Hologres テーブルの DDL 文を示しています。
begin; drop table if exists holo_reader_basic_part_src; create table holo_reader_basic_part_src( tag text not null, id int not null, title text not null, body text, primary key (tag, id)) partition by list( tag ); call set_table_property('holo_reader_basic_part_src', 'orientation', 'column'); call set_table_property('holo_reader_basic_part_src', 'shard_count', '3'); commit; create table holo_reader_basic_part_src_1583161774228 partition of holo_reader_basic_part_src for values in ('foo'); # パーティションテーブルの子テーブルが作成され、データがインポートされていることを確認してください。 postgres=# \d+ holo_reader_basic_part_src Table "public.holo_reader_basic_part_src" Column | Type | Collation | Nullable | Default | Storage | Stats target | Description --------+---------+-----------+----------+---------+----------+--------------+------------- tag | text | | not null | | extended | | id | integer | | not null | | plain | | title | text | | not null | | extended | | body | text | | | | extended | | Partition key: LIST (tag) Indexes: "holo_reader_basic_part_src_pkey" PRIMARY KEY, btree (tag, id) Partitions: holo_reader_basic_part_src_1583161774228 FOR VALUES IN ('foo')
Reader スクリプトのパラメーター
パラメータ | 説明 | 必須 | デフォルト値 |
database | Hologres インスタンス内のデータベースの名前。 | はい | なし |
table | Hologres テーブルの名前。テーブルがパーティションテーブルの場合は、親テーブルの名前を指定します。 | はい | なし |
column | データを読み取る列。値には、ソーステーブルのプライマリキー列を含める必要があります。例: | はい | なし |
partition | パーティションテーブルの場合、このパラメーターはパーティション列とその値を指定します。形式は 重要
| いいえ | 空。これは非パーティションテーブルを示します。 |
Writer スクリプトのデモ
非パーティションテーブルの構成
この例は、JDBC モードで MySQL から Hologres 標準テーブルにデータをインポートするための構成を示しています。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "mysql", "parameter": { "envType": 0, "useSpecialSecret": false, "column": [ "<column1>", "<column2>", ......, "<columnN>" ], "tableComment": "", "connection": [ { "datasource": "<mysql_source_name>",// MySQL データソースの名前。 "table": [ "<mysql_table_name>" ] } ], "where": "", "splitPk": "", "encoding": "UTF-8" }, "name": "Reader", "category": "reader" }, { "stepType": "holo", "parameter": { "selectedDatabase":"public", "schema": "public", "maxConnectionCount": 9, "truncate":true,// クリーンアップルール。 "datasource": "<holo_sink_name>",// Hologres データソースの名前。 "conflictMode": "ignore", "envType": 0, "column": [ "<column1>", "<column2>", ......, "<columnN>" ], "tableComment": "", "table": "<holo_table_name>", "reShuffleByDistributionKey":false }, "name": "Writer", "category": "writer" } ], "setting": { "executeMode": null, "errorLimit": { "record": "0" }, "locale": "zh_CN", "speed": { "concurrent": 2,// 同時ジョブの数。 "throttle": false// スロットリング。 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }次のコードは、Hologres テーブルの DDL 文を示しています。
begin; drop table if exists mysql_to_holo_test; create table mysql_to_holo_test( tag text not null, id int not null, body text not null, brrth date, primary key (tag, id)); call set_table_property('mysql_to_holo_test', 'orientation', 'column'); call set_table_property('mysql_to_holo_test', 'distribution_key', 'id'); call set_table_property('mysql_to_holo_test', 'clustering_key', 'birth'); commit;
パーティションテーブルの構成
説明Hologres は LIST パーティショニングのみをサポートします。パーティション列として使用できるのは単一の列のみで、列は INT4 または TEXT 型である必要があります。
このパラメーターがテーブルの DDL 文のパーティション構成と一致していることを確認してください。
MySQL テーブルからパーティション化された Hologres テーブルの子テーブルにデータを同期するタスクを構成できます。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "mysql", "parameter": { "envType": 0, "useSpecialSecret": false, "column": [ "<column1>", "<column2>", ......, "<columnN>" ], "tableComment": "", "connection": [ { "datasource": "<mysql_source_name>", "table": [ "<mysql_table_name>" ] } ], "where": "", "splitPk": "<mysql_pk>",// MySQL テーブルのプライマリキーフィールド。 "encoding": "UTF-8" }, "name": "Reader", "category": "reader" }, { "stepType": "holo", "parameter": { "selectedDatabase": "public", "maxConnectionCount": 9, "partition": "<partition_key>",// Hologres テーブルのパーティションキー。 "truncate": "false", "datasource": "<holo_sink_name>",// Hologres データソースの名前。 "conflictMode": "ignore", "envType": 0, "column": [ "<column1>", "<column2>", ......, "<columnN>" ], "tableComment": "", "table": "<holo_table_name>", "reShuffleByDistributionKey":false }, "name": "Writer", "category": "writer" } ], "setting": { "executeMode": null, "failoverEnable": null, "errorLimit": { "record": "0" }, "speed": { "concurrent": 2,// 同時ジョブの数。 "throttle": false// スロットリング。 } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }次のコードは、Hologres テーブルの DDL 文を示しています。
BEGIN; CREATE TABLE public.hologres_parent_table( a text , b int, c timestamp, d text, ds text, primary key(ds,b) ) PARTITION BY LIST(ds); CALL set_table_property('public.hologres_parent_table', 'orientation', 'column'); CREATE TABLE public.holo_child_1 PARTITION OF public.hologres_parent_table FOR VALUES IN('20201215'); CREATE TABLE public.holo_child_2 PARTITION OF public.hologres_parent_table FOR VALUES IN('20201216'); CREATE TABLE public.holo_child_3 PARTITION OF public.hologres_parent_table FOR VALUES IN('20201217'); COMMIT;
Writer スクリプトのパラメーター
パラメータ | 説明 | 必須 | デフォルト値 |
database | Hologres インスタンス内のデータベースの名前。 | はい | なし |
table | Hologres テーブルの名前。テーブル名にスキーマ名を含めることができます。例: | はい | なし |
conflictMode | conflictMode には Replace、Update、Ignore が含まれます。詳細については、「仕組み」をご参照ください。 | はい | なし |
column | データを書き込む列。値には、宛先テーブルのプライマリキー列を含める必要があります。例: | はい | なし |
partition | パーティションテーブルの場合、このパラメーターはパーティション列とその値を指定します。形式は 説明
| いいえ | 空。これは非パーティションテーブルを示します。 |
reShuffleByDistributionKey | Hologres では、プライマリキーを持つテーブルにデータをバッチインポートすると、デフォルトでテーブルロックがトリガーされます。これにより、複数の接続の同時書き込み機能が制限されます。オフライン同期シナリオで reShuffle 機能を有効にできます。この機能により、さまざまなタスクがデータシャーディングキーに基づいて指定された Holo シャードにデータを書き込むことができます。これにより、同時バッチ書き込みが可能になり、書き込みパフォーマンスが大幅に向上します。従来の JDBC モードでのリアルタイム書き込みと比較して、この機能は Holo サーバーのペイロードを削減し、書き込み効率を向上させます。 重要 この機能は、Serverless リソースグループでのみ有効にできます。 | いいえ | false |
truncate | データを書き込む前に宛先テーブルからデータを削除するかどうかを指定します。
| いいえ | false |