Hologres データソースを使用すると、Hologres からの読み取りおよび Hologres への書き込みが可能です。本トピックでは、DataWorks が Hologres に対して提供するデータ同期機能について説明します。
制限事項
Hologres データソースのデータ同期タスクには、サーバーレスリソースグループが必要です。
オフライン読み取りおよび書き込み
Hologres ライターは、Hologres の外部テーブルへのデータ書き込みをサポートしていません。
Hologres データソースは、以下のロジックでエンドポイントを取得します。
同一リージョン内の Hologres インスタンスの場合、エンドポイントの取得優先順位は次のとおりです:任意の Tunnel > シングル Tunnel > パブリック。
異なるリージョン内の Hologres インスタンスの場合、エンドポイントの取得優先順位は次のとおりです:パブリック > シングル Tunnel。
単一テーブルのリアルタイム読み取り
Hologres のバージョンは 2.1 以降である必要があります。
Hologres のパーティションテーブルに対する増分同期はサポートされていません。
Hologres テーブルからのデータ定義言語 (DDL) 変更メッセージの同期はサポートされていません。
Hologres からの増分同期でサポートされるデータの型は以下のとおりです。
INTEGER、BIGINT、TEXT、CHAR(n)、VARCHAR(n)、REAL、JSON、SERIAL、OID、INT4[]、INT8[]、FLOAT8[]、BOOLEAN[]、TEXT[]。
単一の Hologres テーブルに対してリアルタイム同期を実行するには、ソーステーブルで Hologres Binlog を有効化する必要があります。詳細については、「Hologres Binlog の購読」をご参照ください。
フルデータベースのリアルタイム書き込み
プライマリキーを持たないテーブルは、リアルタイムデータ同期タスクでサポートされていません。
MySQL から Hologres へのフルデータベースリアルタイム同期中は、パーティションテーブルの子テーブルにのみデータを書き込むことができます。親テーブルへの書き込みはできません。
サポートされるフィールドの型
型 | オフライン読み取り | オフライン書き込み | リアルタイム書き込み |
UUID | 非対応 | 非対応 | 非対応 |
CHAR | 対応 | 対応 | 対応 |
NCHAR | 対応 | 対応 | 対応 |
VARCHAR | 対応 | 対応 | 対応 |
LONGVARCHAR | 対応 | 対応 | 対応 |
NVARCHAR | 対応 | 対応 | 対応 |
LONGNVARCHAR | 対応 | 対応 | 対応 |
CLOB | 対応 | 対応 | 対応 |
NCLOB | 対応 | 対応 | 対応 |
SMALLINT | 対応 | 対応 | 対応 |
TINYINT | 対応 | 対応 | 対応 |
INTEGER | 対応 | 対応 | 対応 |
BIGINT | 対応 | 対応 | 対応 |
NUMERIC | 対応 | 対応 | 対応 |
DECIMAL | 対応 | 対応 | 対応 |
FLOAT | 対応 | 対応 | 対応 |
REAL | 対応 | 対応 | 対応 |
DOUBLE | 対応 | 対応 | 対応 |
TIME | 対応 | 対応 | 対応 |
DATE | 対応 | 対応 | 対応 |
TIMESTAMP | 対応 | 対応 | 対応 |
BINARY | 対応 | 対応 | 対応 |
VARBINARY | 対応 | 対応 | 対応 |
BLOB | 対応 | 対応 | 対応 |
LONGVARBINARY | 対応 | 対応 | 対応 |
BOOLEAN | 対応 | 対応 | 対応 |
BIT | 対応 | 対応 | 対応 |
JSON | 対応 | 対応 | 対応 |
JSONB | 対応 | 対応 | 対応 |
仕組み
オフライン読み取り
Hologres リーダーは PSQL を使用して Hologres テーブルからデータを読み取り、テーブルの Shard Count に応じて複数の並列 SELECT タスクを開始します。
Hologres でテーブルを作成する際、同一の
CREATE TABLEトランザクション内でCALL set_table_property('table_name', 'shard_count', 'xx')コマンドを実行することで、テーブルの Shard Count を設定できます。デフォルトでは、Hologres はデータベースの Shard Count を使用します。具体的な値は、ご利用のインスタンス構成によって異なります。
SELECT 文は、組み込みの hg_shard_id 列を使用してシャードごとにデータをフィルターします。
オフライン書き込み
Hologres ライターは、リーダーからプロトコルデータを取得し、conflictMode パラメーターを使用して書き込み操作中のデータ競合を処理します。
conflictMode パラメーターを設定して、プライマリキー競合が発生した場合の新規データの処理方法を指定します。
conflictMode パラメーターは、プライマリキーを持つテーブルにのみ適用されます。書き込みメカニズムおよびパフォーマンスの詳細については、「技術的原理」をご参照ください。
conflictMode を Replace(全行更新)に設定した場合、新規データが既存のデータを上書きします。行のすべての列が置き換えられます。マップされていない列は NULL に設定されます。
conflictMode を Update に設定した場合、新規データが既存のデータを上書きします。マップされた列のみが更新されます。
conflictMode を Ignore に設定した場合、新規データは破棄されます。
データソースの追加
DataWorks で同期タスクを開発する前に、データソース管理の手順に従って、必要なデータソースを DataWorks に追加する必要があります。データソースを追加する際に、DataWorks コンソールのパラメーター説明を表示することで、各パラメーターの意味を確認できます。
データ同期タスク
単一テーブルのオフライン同期
サポートされるデータソース:Data Integration でサポートされるすべてのデータソースタイプ。
構成ガイド:「コードレス UI 構成」および「スクリプトモード構成」。スクリプトモードのパラメーター一覧およびスクリプト例については、「付録:コードおよびパラメーター」をご参照ください。
単一テーブルのリアルタイム同期
サポートされるデータソース:DataHub、Hologres、Kafka、LogHub。
構成ガイド:「単一テーブル向けリアルタイム同期タスクの構成」。
フルデータベースのオフライン同期
サポートされるデータソース:AnalyticDB for MySQL 3.0、ClickHouse、Doris、Hologres、Oracle、PolarDB、SQL Server。
構成ガイド:「フルデータベースオフライン同期タスクの構成」。
フルデータベースのリアルタイム同期
サポートされるデータソース:AnalyticDB for OceanBase、MongoDB、MySQL、Oracle、PolarDB、PolarDB-X 2.0、PostgreSQL。
構成ガイド:「フルデータベース向けリアルタイム同期タスクの構成」。
サーバーレスフルデータベースリアルタイム同期
サポートされるデータソース:MySQL。
構成ガイド:「サーバーレス同期タスクの構成」。
リアルタイム同期に関するよくある質問については、「Hologres へのリアルタイム同期に関する FAQ」をご参照ください。
付録:コードおよびパラメーター
コードエディタを使用したバッチ同期タスクの構成
コードエディタを使用してバッチ同期タスクを構成する場合は、統一されたスクリプト形式要件に基づいて、関連するパラメーターをスクリプト内に設定する必要があります。詳細については、「コードエディタの使用」をご参照ください。以下に、コードエディタを使用してバッチ同期タスクを構成する際に、データソースで設定する必要があるパラメーターについて説明します。
リーダースクリプト
非パーティションテーブルの構成
以下のスクリプトは、非パーティション Hologres テーブルからメモリへデータを読み取る方法を示しています。
{ "transform": false, "type": "job", "version": "2.0", "steps": [ { "stepType": "holo", "parameter": { "datasource": "holo_db", "envType": 1, "column": [ "tag", "id", "title", "body" ], "where": "", "table": "holo_reader_basic_src" }, "name": "Reader", "category": "reader" }, { "stepType": "stream", "parameter": { "print": false, "fieldDelimiter": "," }, "name": "Writer", "category": "writer" } ], "setting": { "executeMode": null, "failoverEnable": null, "errorLimit": { "record": "0" }, "speed": { "concurrent": 2, "throttle": false } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }以下のコードは、Hologres テーブルの DDL 文を示しています。
begin; drop table if exists holo_reader_basic_src; create table holo_reader_basic_src( tag text not null, id int not null, title text not null, body text, primary key (tag, id)); call set_table_property('holo_reader_basic_src', 'orientation', 'column'); call set_table_property('holo_reader_basic_src', 'shard_count', '3'); commit;
パーティションテーブルの構成
以下のスクリプトは、パーティション Hologres テーブルの子テーブルからメモリへデータを読み取る方法を示しています。
説明partition パラメーターの設定に注意してください。
{ "transform": false, "type": "job", "version": "2.0", "steps": [ { "stepType": "holo", "parameter": { "selectedDatabase": "public", "partition": "tag=foo", "datasource": "holo_db", "envType": 1, "column": [ "tag", "id", "title", "body" ], "tableComment": "", "where": "", "table": "public.holo_reader_basic_part_src" }, "name": "Reader", "category": "reader" }, { "stepType":"stream", "parameter":{}, "name":"Writer", "category":"writer" } ], "setting":{ "errorLimit":{ "record":"0" }, "speed":{ "throttle":true, "concurrent":1, "mbps":"12" } }, "order":{ "hops":[ { "from":"Reader", "to":"Writer" } ] } }以下のコードは、Hologres テーブルの DDL 文を示しています。
begin; drop table if exists holo_reader_basic_part_src; create table holo_reader_basic_part_src( tag text not null, id int not null, title text not null, body text, primary key (tag, id)) partition by list( tag ); call set_table_property('holo_reader_basic_part_src', 'orientation', 'column'); call set_table_property('holo_reader_basic_part_src', 'shard_count', '3'); commit; create table holo_reader_basic_part_src_1583161774228 partition of holo_reader_basic_part_src for values in ('foo'); # Make sure that the child table is created and data is imported. postgres=# \d+ holo_reader_basic_part_src Table "public.holo_reader_basic_part_src" Column | Type | Collation | Nullable | Default | Storage | Stats target | Description --------+---------+-----------+----------+---------+----------+--------------+------------- tag | text | | not null | | extended | | id | integer | | not null | | plain | | title | text | | not null | | extended | | body | text | | | | extended | | Partition key: LIST (tag) Indexes: "holo_reader_basic_part_src_pkey" PRIMARY KEY, btree (tag, id) Partitions: holo_reader_basic_part_src_1583161774228 FOR VALUES IN ('foo')
リーダースクリプトパラメーター
パラメーター | 説明 | 必須 | デフォルト |
database | Hologres データベースの名前。 | はい | なし |
table | Hologres テーブルの名前。パーティションテーブルの場合は、親テーブルの名前を指定します。 | はい | なし |
column | データを読み取る列。例: | はい | なし |
partition | パーティションテーブルの場合、パーティション列とその値を指定します。書式: 重要
| いいえ | 空(非パーティションテーブルを示します)。 |
ライタースクリプト
非パーティションテーブルの構成
以下のスクリプトは、JDBC モードを使用して、MySQL データベースから非パーティション Hologres テーブルへデータを書き込む方法を示しています。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "mysql", "parameter": { "envType": 0, "useSpecialSecret": false, "column": [ "<column1>", "<column2>", ......, "<columnN>" ], "tableComment": "", "connection": [ { "datasource": "<mysql_source_name>",// MySQL データソースの名前 "table": [ "<mysql_table_name>" ] } ], "where": "", "splitPk": "", "encoding": "UTF-8" }, "name": "Reader", "category": "reader" }, { "stepType": "holo", "parameter": { "selectedDatabase":"public", "schema": "public", "maxConnectionCount": 9, "truncate":true,// クリーンアップルール "datasource": "<holo_sink_name>",// Hologres データソースの名前 "conflictMode": "ignore", "envType": 0, "column": [ "<column1>", "<column2>", ......, "<columnN>" ], "tableComment": "", "table": "<holo_table_name>", "reShuffleByDistributionKey":false }, "name": "Writer", "category": "writer" } ], "setting": { "executeMode": null, "errorLimit": { "record": "0" }, "locale": "zh_CN", "speed": { "concurrent": 2,// ジョブの同時実行数 "throttle": false// スロットル } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }以下のコードは、Hologres テーブルの DDL 文を示しています。
begin; drop table if exists mysql_to_holo_test; create table mysql_to_holo_test( tag text not null, id int not null, body text not null, brrth date, primary key (tag, id)); call set_table_property('mysql_to_holo_test', 'orientation', 'column'); call set_table_property('mysql_to_holo_test', 'distribution_key', 'id'); call set_table_property('mysql_to_holo_test', 'clustering_key', 'birth'); commit;
パーティションテーブルの構成
説明Hologres はリストパーティショニングのみをサポートしています。パーティションキーは、INT4 型または TEXT 型の単一列である必要があります。
この値は、テーブルの DDL 文におけるパーティション構成と一致している必要があります。
以下のスクリプトは、MySQL データベースからパーティション Hologres テーブルの子テーブルへデータを同期する方法を示しています。
{ "type": "job", "version": "2.0", "steps": [ { "stepType": "mysql", "parameter": { "envType": 0, "useSpecialSecret": false, "column": [ "<column1>", "<column2>", ......, "<columnN>" ], "tableComment": "", "connection": [ { "datasource": "<mysql_source_name>", "table": [ "<mysql_table_name>" ] } ], "where": "", "splitPk": "<mysql_pk>",// MySQL テーブルのプライマリキー列 "encoding": "UTF-8" }, "name": "Reader", "category": "reader" }, { "stepType": "holo", "parameter": { "selectedDatabase": "public", "maxConnectionCount": 9, "partition": "<partition_key>",// Hologres テーブルのパーティションキー "truncate": "false", "datasource": "<holo_sink_name>",// Hologres データソースの名前 "conflictMode": "ignore", "envType": 0, "column": [ "<column1>", "<column2>", ......, "<columnN>" ], "tableComment": "", "table": "<holo_table_name>", "reShuffleByDistributionKey":false }, "name": "Writer", "category": "writer" } ], "setting": { "executeMode": null, "failoverEnable": null, "errorLimit": { "record": "0" }, "speed": { "concurrent": 2,// ジョブの同時実行数 "throttle": false// スロットル } }, "order": { "hops": [ { "from": "Reader", "to": "Writer" } ] } }以下のコードは、Hologres テーブルの DDL 文を示しています。
BEGIN; CREATE TABLE public.hologres_parent_table( a text , b int, c timestamp, d text, ds text, primary key(ds,b) ) PARTITION BY LIST(ds); CALL set_table_property('public.hologres_parent_table', 'orientation', 'column'); CREATE TABLE public.holo_child_1 PARTITION OF public.hologres_parent_table FOR VALUES IN('20201215'); CREATE TABLE public.holo_child_2 PARTITION OF public.hologres_parent_table FOR VALUES IN('20201216'); CREATE TABLE public.holo_child_3 PARTITION OF public.hologres_parent_table FOR VALUES IN('20201217'); COMMIT;
ライタースクリプトパラメーター
パラメーター | 説明 | 必須 | デフォルト |
database | Hologres データベースの名前。 | はい | なし |
table | Hologres テーブルの名前。テーブル名にはスキーマ名を含めることができます。例: | はい | なし |
conflictMode | conflictMode には Replace、Update、Ignore のいずれかを指定します。詳細については、「実装原理」をご参照ください。 | はい | なし |
column | 書き込み先の列。すべてのプライマリキー列を含める必要があります。例: | はい | なし |
partition | パーティションテーブルの場合、パーティション列とその値を指定します。書式: 説明
| いいえ | 空(非パーティションテーブルを示します)。 |
reShuffleByDistributionKey | プライマリキーを持つテーブルへの並列バッチ書き込みを可能にし、デフォルトのテーブルロックをバイパスします。オフラインタスクでは、この機能により、分散キーに基づいてデータが特定のシャードに送信され、標準的な JDBC 書き込みと比較して書き込みパフォーマンスが向上し、サーバー負荷が軽減されます。 重要 この機能は、サーバーレスリソースグループを使用する場合にのみ利用可能です。 | いいえ | false |
truncate | 書き込み前に宛先テーブルを切り捨て(TRUNCATE)するかどうかを指定します。
| いいえ | false |