Hologres の親テーブルに属する複数のパーティション分割された子テーブルに対して INSERT、DELETE、UPDATE などの操作を実行するには、DataWorks の for-each ノードのループ機能を使用できます。この機能により、複雑なループ処理が簡素化されます。データが更新された後、親テーブルですべてのパーティションデータを確認できます。
背景情報
Hologres はネイティブの PostgreSQL エコシステムと互換性があり、パーティションテーブルをサポートしています。パーティションテーブルに対して INSERT、DELETE、または UPDATE 操作を実行する場合、子テーブルに対して実行する必要があります。親テーブルに対する操作は、自動的に子テーブルにルーティングされません。これらの操作を定期的に実行するには、DataWorks のスケジューリングノードを使用できます。詳細については、「DataWorks を使用して MaxCompute データを定期的にインポートするためのベストプラクティス」をご参照ください。
しかし、複数のパーティション分割された子テーブルに対する一度限りの操作を伴う既存データのバックフィルなどのシナリオでは、複数の SQL 実行をスケジューリングするのは手間がかかります。このトピックでは、DataWorks の for-each ノードを使用して複数パーティションの操作を実行する方法について説明します。
for-each ノードを使用して、代入ノードからの結果セットをループ処理できます。詳細については、「for-each ノードの設定」をご参照ください。
制限
Hologres SQL ノードで 代入パラメーター 機能を使用するには、DataWorks Standard Edition 以降のバージョンを購入する必要があります。
例
このトピックでは、MaxCompute のデータを Hologres の複数のパーティション分割された子テーブルに書き込む例を示します。以下の手順では、for-each ノードを使用して複数パーティションの操作を実行する方法について説明します。
-
テーブルとデータを準備します。
-
MaxCompute でパーティションテーブルを作成し、データを挿入します。
-- odps_sale_detail という名前の MaxCompute パーティションテーブルを作成します。 CREATE TABLE IF NOT EXISTS odps_sale_detail ( shop_name STRING, customer_id STRING, total_price DOUBLE ) PARTITIONED BY ( sale_date STRING); ---- ソーステーブルにパーティション 20240110 を追加し、データを書き込みます。 ALTER TABLE odps_sale_detail ADD IF NOT EXISTS PARTITION(sale_date='20240110'); INSERT OVERWRITE TABLE odps_sale_detail PARTITION(sale_date='20240110') VALUES ('s1','c1',100.1), ('s2','c2',100.2), ('s3','c3',100.3); -- ソーステーブルにパーティション 20240111 を追加します。 ALTER TABLE odps_sale_detail ADD IF NOT EXISTS PARTITION(sale_date='20240111'); INSERT OVERWRITE TABLE odps_sale_detail PARTITION(sale_date='20240111') VALUES ('s1','c1',100.1), ('s2','c2',100.2), ('s3','c3',100.3); -- ソーステーブルにパーティション 20240112 を追加し、データを書き込みます。 ALTER TABLE odps_sale_detail ADD IF NOT EXISTS PARTITION(sale_date='20240112'); INSERT OVERWRITE TABLE odps_sale_detail PARTITION(sale_date='20240112') VALUES ('s1','c1',100.1), ('s2','c2',100.2), ('s3','c3',100.3); -- ソーステーブルにパーティション 20240113 を追加し、データを書き込みます。 ALTER TABLE odps_sale_detail ADD IF NOT EXISTS PARTITION(sale_date='20240113'); INSERT OVERWRITE TABLE odps_sale_detail PARTITION(sale_date='20240113') VALUES ('s1','c1',100.1), ('s2','c2',100.2), ('s3','c3',100.3); -- ソーステーブルにパーティション 20240114 を追加し、データを書き込みます。 ALTER TABLE odps_sale_detail ADD IF NOT EXISTS PARTITION(sale_date='20240114'); INSERT OVERWRITE TABLE odps_sale_detail PARTITION(sale_date='20240114') VALUES ('s1','c1',100.1), ('s2','c2',100.2), ('s3','c3',100.3); -- ソーステーブルにパーティション 20240115 を追加し、データを書き込みます。 ALTER TABLE odps_sale_detail ADD IF NOT EXISTS PARTITION(sale_date='20240115'); INSERT OVERWRITE TABLE odps_sale_detail PARTITION(sale_date='20240115') VALUES ('s1','c1',100.1), ('s2','c2',100.2), ('s3','c3',100.3); -- ソーステーブルにパーティション 20240116 を追加し、データを書き込みます。 ALTER TABLE odps_sale_detail ADD IF NOT EXISTS PARTITION(sale_date='20240116'); INSERT OVERWRITE TABLE odps_sale_detail PARTITION(sale_date='20240116') VALUES ('s1','c1',100.1), ('s2','c2',100.2), ('s3','c3',100.3); -
Hologres で外部テーブルとパーティション分割された親テーブルを作成します。
-- 外部テーブルを作成します。 IMPORT FOREIGN SCHEMA <maxCompute_projectname> LIMIT to( odps_sale_detail) FROM SERVER odps_server INTO public OPTIONS(if_table_exist 'error',if_unsupported_type 'error'); -- Hologres のパーティション分割された親テーブル (内部テーブル) を作成します。 BEGIN ; CREATE TABLE IF NOT EXISTS holo_sale_detail( shop_name TEXT, customer_id TEXT , total_price FLOAT8, sale_date TEXT ) PARTITION BY LIST(sale_date); COMMIT;
-
-
Hologres データソースを DataWorks ワークスペースにアタッチ
DataWorks で Hologres データソースをアタッチできます。詳細については、「Hologres コンピューティングリソースのアタッチ」をご参照ください。
-
Hologres SQL ノードを追加してパーティションを取得
-
DataWorks の Data Studio モジュールで、Hologres SQL ノードを作成し、for-each ノードの入力として使用するパーティション値を取得するための SQL 文を入力します。詳細については、「Hologres SQL ノード」をご参照ください。
サンプル SQL:20240112 から 20240116 までの 5 つのパーティション値を取得します。
SELECT distinct sale_date FROM odps_sale_detail WHERE sale_date > '20240111';次の結果が返されます。
+------------+ | sale_date | +------------+ | 20240112 | | 20240113 | | 20240114 | | 20240115 | | 20240116 | +------------+ -
ノードの出力パラメーターを設定します。
スケジューリング設定 > ノードコンテキストパラメーター > 出力パラメーター セクションで、[代入パラメーターの追加] をクリックし、デフォルトの outputs パラメーターを選択します。詳細については、「ノードコンテキストの設定」をご参照ください。

-
-
for-each ノードを設定して、パーティションデータをループ処理し、操作します。詳細については、「for-each ノードの設定」をご参照ください。
-
for-each ノードを追加します。
-
for-each ノード内に、新しい Hologres SQL ノードを追加します。その先祖ノードを start ノードに、子孫ノードを end ノードに設定します。Hologres SQL ノードにデータインポートロジックを入力します。以下のサンプル SQL が提供されています。
説明スクリプトはまず子テーブルを作成し、次にデータを挿入します。for-each ノードは、前のステップ の Hologres SQL ノードからのパーティション結果をスクリプト内のパラメーターに割り当てます。その後、ノードはすべての出力値が処理されるまでスクリプトをループ実行します。
-- 子テーブルを作成します。 CREATE TABLE IF NOT EXISTS public.holo_sale_detail_${dag.foreach.current} PARTITION OF public.holo_sale_detail FOR VALUES IN('${dag.foreach.current}'); -- データをインポートします。 INSERT INTO public.holo_sale_detail_${dag.foreach.current} SELECT * FROM odps_sale_detail where sale_date ='${dag.foreach.current}'; -
for-each ノードの編集ページで、右側の スケジューリング設定 をクリックします。依存関係 セクションで、このノードの先祖ノードを設定します。ノードコンテキスト > 入力パラメーター セクションで、デフォルトのパラメーター名 loopDataArray の横にある Edit をクリックします。値ソース リストで、先祖の代入ノードの outputs パラメーターを選択します。

-
for-each ノードを保存して送信します。
説明標準モードのワークスペースを使用している場合は、ノードを送信した後に右上隅の [公開] をクリックする必要があります。詳細については、「タスクの公開」をご参照ください。
-
-
ノードをテストし、結果を確認します。詳細については、「ノードのテストと結果の確認」をご参照ください。
-
自動トリガータスク ページで、右側の有向非巡回グラフ (DAG) 内で、代入ノード (最初のノード) を右クリックし、[データバックフィル] > [現在のノードと子孫ノード] を選択します。

-
データバックフィルインスタンスが正常に実行された後、データバックフィルインスタンス ページに移動します。次に、for-each ノード (2 番目のノード) を右クリックし、[内部ノードの表示] を選択します。ジョブが 5 回ループしたことが確認できます。

-
Hologres で、新しい子テーブルが作成され、データが書き込まれていることを確認できます。

-
障害の処理
for-each ノードの実行中にループでエラーが発生した場合、残りのタスクは実行を停止します。for-each ノードを開いて、障害の詳細を確認できます。
障害を解決した後、タスクを 再実行 して再度実行できます。