Hologres の親パーティションテーブルの複数の子パーティションテーブルに対して、INSERT、DELETE、UPDATE などの操作を実行する必要がある場合は、DataWorks の for-each ノードを使用して操作をループで実行できます。 for-each ノードは、複雑なループ処理ロジックを簡素化します。 データが更新された後、親パーティションテーブルからすべての子パーティションテーブルのデータをクエリできます。
背景情報
Hologres はネイティブ PostgreSQL エコシステムと互換性があり、パーティションテーブルをサポートしています。 パーティションテーブルに対して INSERT、DELETE、または UPDATE 操作を実行する場合、親パーティションテーブルではなく、特定の子パーティションテーブルに対して操作を実行する必要があります。 子パーティションテーブルに対して定期的に操作を実行する場合は、DataWorks スケジューリングノードを使用することをお勧めします。 たとえば、DataWorks を使用して、MaxCompute データを子パーティションテーブルに定期的にインポートできます。 詳細については、「DataWorks を使用して MaxCompute データを定期的にインポートする」をご参照ください。
ただし、複数の子パーティションテーブルに対して同時に操作を実行する場合、たとえば、複数の子パーティションテーブルの履歴データを更新する場合、スケジューリングノードは非効率的です。 これは、複数の SQL ステートメントを実行する必要があるためです。 このトピックでは、DataWorks の for-each ノードを使用して複数の子パーティションテーブルに対して操作を実行する方法について説明します。
for-each ノードを使用して、割り当てノードの出力をループでトラバースできます。 詳細については、「for-each ノードを構成する」をご参照ください。
制限
DataWorks Standard Edition 以上では、Hologres SQL ノードで [割り当てパラメーター] を使用できます。
例
この例では、for-each ノードを使用して、MaxCompute データを Hologres の複数の子パーティションテーブルに書き込みます。
テーブルとデータを準備します。
MaxCompute にパーティションテーブルを作成し、パーティションテーブルにデータを挿入します。
-- MaxCompute に odps_sale_detail という名前のパーティションテーブルを作成します。 CREATE TABLE IF NOT EXISTS odps_sale_detail ( shop_name STRING, customer_id STRING, total_price DOUBLE ) PARTITIONED BY ( sale_date STRING); ---- ソースパーティションテーブルに 20240110 パーティションを作成し、パーティションにデータを書き込みます。 ALTER TABLE odps_sale_detail ADD IF NOT EXISTS PARTITION(sale_date='20240110'); INSERT OVERWRITE TABLE odps_sale_detail PARTITION(sale_date='20240110') VALUES ('s1','c1',100.1), ('s2','c2',100.2), ('s3','c3',100.3); -- ソースパーティションテーブルに 20240111 パーティションを作成し、パーティションにデータを書き込みます。 ALTER TABLE odps_sale_detail ADD IF NOT EXISTS PARTITION(sale_date='20240111'); INSERT OVERWRITE TABLE odps_sale_detail PARTITION(sale_date='20240111') VALUES ('s1','c1',100.1), ('s2','c2',100.2), ('s3','c3',100.3); -- ソースパーティションテーブルに 20240112 パーティションを作成し、パーティションにデータを書き込みます。 ALTER TABLE odps_sale_detail ADD IF NOT EXISTS PARTITION(sale_date='20240112'); INSERT OVERWRITE TABLE odps_sale_detail PARTITION(sale_date='20240112') VALUES ('s1','c1',100.1), ('s2','c2',100.2), ('s3','c3',100.3); -- ソースパーティションテーブルに 20240113 パーティションを作成し、パーティションにデータを書き込みます。 ALTER TABLE odps_sale_detail ADD IF NOT EXISTS PARTITION(sale_date='20240113'); INSERT OVERWRITE TABLE odps_sale_detail PARTITION(sale_date='20240113') VALUES ('s1','c1',100.1), ('s2','c2',100.2), ('s3','c3',100.3); -- ソースパーティションテーブルに 20240114 パーティションを作成し、パーティションにデータを書き込みます。 ALTER TABLE odps_sale_detail ADD IF NOT EXISTS PARTITION(sale_date='20240114'); INSERT OVERWRITE TABLE odps_sale_detail PARTITION(sale_date='20240114') VALUES ('s1','c1',100.1), ('s2','c2',100.2), ('s3','c3',100.3); -- ソースパーティションテーブルに 20240115 パーティションを作成し、パーティションにデータを書き込みます。 ALTER TABLE odps_sale_detail ADD IF NOT EXISTS PARTITION(sale_date='20240115'); INSERT OVERWRITE TABLE odps_sale_detail PARTITION(sale_date='20240115') VALUES ('s1','c1',100.1), ('s2','c2',100.2), ('s3','c3',100.3); -- ソースパーティションテーブルに 20240116 パーティションを作成し、パーティションにデータを書き込みます。 ALTER TABLE odps_sale_detail ADD IF NOT EXISTS PARTITION(sale_date='20240116'); INSERT OVERWRITE TABLE odps_sale_detail PARTITION(sale_date='20240116') VALUES ('s1','c1',100.1), ('s2','c2',100.2), ('s3','c3',100.3);
Hologres に外部テーブルと親パーティションテーブルを作成します。
-- 外部テーブルを作成します。 IMPORT FOREIGN SCHEMA <maxCompute_projectname> LIMIT to( odps_sale_detail) FROM SERVER odps_server INTO public OPTIONS(if_table_exist 'error',if_unsupported_type 'error'); -- Hologres に親パーティションテーブル(内部テーブル)を作成します。 BEGIN ; CREATE TABLE IF NOT EXISTS holo_sale_detail( shop_name TEXT, customer_id TEXT , total_price FLOAT8, sale_date TEXT ) PARTITION BY LIST(sale_date); COMMIT;
Hologres データソースを DataWorks ワークスペースに関連付けます。
Hologres データソースを DataWorks ワークスペースに関連付けます。 詳細については、「Hologres データソースを追加する」をご参照ください。
Hologres SQL ノードを作成し、パーティション値を取得します。
DataWorks コンソールの [datastudio] ページで Hologres SQL ノードを作成します。 パーティション値を取得するための SQL コードを入力します。 値は for-each ノードの入力として使用されます。 Hologres SQL ノードの作成方法の詳細については、「Hologres SQL ノードを作成する」をご参照ください。
次の SQL コード例では、20240112 から 20240116 のパーティションから値を取得する方法を示しています。
SELECT distinct sale_date FROM odps_sale_detail WHERE sale_date > '20240111';
次の結果が返されます。
+------------+ | sale_date | +------------+ | 20240112 | | 20240113 | | 20240114 | | 20240115 | | 20240116 | +------------+
Hologres SQL ノードの出力パラメーターを構成します。
[プロパティ] タブの [入力および出力パラメーター] セクションにある [出力パラメーター] テーブルで、[割り当てパラメーターを追加] をクリックし、デフォルトの出力割り当てパラメーターを選択します。 詳細については、「入力および出力パラメーターの構成」トピックの「特別なシナリオ:割り当てパラメーター」をご参照ください。
子パーティションテーブルに対する操作をループするために for-each ノードを構成します。 詳細については、「for-each ノードを構成する」をご参照ください。
for-each ノードを作成します。
for-each ノードに Hologres SQL ノードを追加します。 start ノードを Hologres SQL ノードの祖先ノードとして構成し、end ノードを Hologres SQL ノードの子孫ノードとして構成します。 SQL コード例:
説明子パーティションテーブルを作成し、子パーティションテーブルにデータを挿入する必要があります。 for-each ノードは、前の手順で Hologres SQL ノードから取得したパーティション値をスクリプトのパラメーターに割り当てます。 この場合、すべての出力値が一致するまで、操作はループで実行されます。
-- 子パーティションテーブルを作成します。 CREATE TABLE IF NOT EXISTS public.holo_sale_detail_${dag.foreach.current} PARTITION OF public.holo_sale_detail FOR VALUES IN('${dag.foreach.current}'); -- データをインポートします。 INSERT INTO public.holo_sale_detail_${dag.foreach.current} SELECT * FROM odps_sale_detail where sale_date ='${dag.foreach.current}';
for-each ノードの構成タブで、[プロパティ] タブをクリックします。 [依存関係] セクションで、祖先ノードを構成します。 入力パラメーター入力パラメーターと出力パラメーターloopDataArray変更値のソース出力
セクションの テーブルで、デフォルトパラメーター を見つけ、[アクション] 列の をクリックします。 列のドロップダウンリストから、祖先割り当てノードの パラメーターを選択します。
for-each ノードを保存してコミットします。
説明使用しているワークスペースが標準モードの場合、ノードをコミットした後、構成タブの右上隅にある [デプロイ] をクリックしてノードをデプロイします。 詳細については、「ノードのデプロイ」をご参照ください。
for-each ノードをテストし、テスト結果を表示します。 詳細については、「for-each ノードの構成」トピックの「for-each ノードのテストとテスト結果の表示」をご参照ください。
[サイクルタスク] ページで、目的のノードを見つけ、[アクション] 列の [DAG] をクリックして、ノードの有向非巡回グラフ(DAG)を開きます。 ノードの DAG で、割り当てノード(最初のノード)を右クリックし、[実行] > [現在および子孫ノードを遡及的に] を選択します。
データバックフィルインスタンスが実行された後、DAG の for-each ノード(2 番目のノード)を右クリックし、[パッチデータ] ページで [内部ノードの表示] を選択します。 表示されるページは、5 つのループが実行されたことを示しています。
Hologres で、子パーティションテーブルが作成され、データが子パーティションテーブルに書き込まれているかどうかを確認します。
トラブルシューティング
for-each ノードを実行すると、ループが失敗した場合、残りのタスクの実行が停止します。 この場合、トラブルシューティングのために for-each ノードの内部ノードを表示できます。
トラブルシューティングが完了したら、[再実行] をクリックして、失敗したタスクを再実行できます。