すべてのプロダクト
Search
ドキュメントセンター

Hologres:DataWorks を使用して複数の子パーティションテーブルに対して操作を実行する

最終更新日:Jan 11, 2025

Hologres の親パーティションテーブルの複数の子パーティションテーブルに対して、INSERT、DELETE、UPDATE などの操作を実行する必要がある場合は、DataWorks の for-each ノードを使用して操作をループで実行できます。 for-each ノードは、複雑なループ処理ロジックを簡素化します。 データが更新された後、親パーティションテーブルからすべての子パーティションテーブルのデータをクエリできます。

背景情報

Hologres はネイティブ PostgreSQL エコシステムと互換性があり、パーティションテーブルをサポートしています。 パーティションテーブルに対して INSERT、DELETE、または UPDATE 操作を実行する場合、親パーティションテーブルではなく、特定の子パーティションテーブルに対して操作を実行する必要があります。 子パーティションテーブルに対して定期的に操作を実行する場合は、DataWorks スケジューリングノードを使用することをお勧めします。 たとえば、DataWorks を使用して、MaxCompute データを子パーティションテーブルに定期的にインポートできます。 詳細については、「DataWorks を使用して MaxCompute データを定期的にインポートする」をご参照ください。

ただし、複数の子パーティションテーブルに対して同時に操作を実行する場合、たとえば、複数の子パーティションテーブルの履歴データを更新する場合、スケジューリングノードは非効率的です。 これは、複数の SQL ステートメントを実行する必要があるためです。 このトピックでは、DataWorks の for-each ノードを使用して複数の子パーティションテーブルに対して操作を実行する方法について説明します。

説明

for-each ノードを使用して、割り当てノードの出力をループでトラバースできます。 詳細については、「for-each ノードを構成する」をご参照ください。

制限

DataWorks Standard Edition 以上では、Hologres SQL ノードで [割り当てパラメーター] を使用できます。

この例では、for-each ノードを使用して、MaxCompute データを Hologres の複数の子パーティションテーブルに書き込みます。

  1. テーブルとデータを準備します。

    1. MaxCompute にパーティションテーブルを作成し、パーティションテーブルにデータを挿入します。

      -- MaxCompute に odps_sale_detail という名前のパーティションテーブルを作成します。
      CREATE TABLE IF NOT EXISTS odps_sale_detail (
          shop_name STRING,
      	  customer_id STRING,
          total_price DOUBLE
      )
      PARTITIONED BY ( sale_date STRING);
      
      ---- ソースパーティションテーブルに 20240110 パーティションを作成し、パーティションにデータを書き込みます。
      ALTER TABLE odps_sale_detail ADD IF NOT EXISTS PARTITION(sale_date='20240110');
      INSERT OVERWRITE TABLE odps_sale_detail PARTITION(sale_date='20240110') VALUES 
      ('s1','c1',100.1),
      ('s2','c2',100.2),
      ('s3','c3',100.3);
      
      -- ソースパーティションテーブルに 20240111 パーティションを作成し、パーティションにデータを書き込みます。
      ALTER TABLE odps_sale_detail ADD IF NOT EXISTS PARTITION(sale_date='20240111');
      INSERT OVERWRITE TABLE odps_sale_detail PARTITION(sale_date='20240111') VALUES 
      ('s1','c1',100.1),
      ('s2','c2',100.2),
      ('s3','c3',100.3);
      
      -- ソースパーティションテーブルに 20240112 パーティションを作成し、パーティションにデータを書き込みます。
      ALTER TABLE odps_sale_detail ADD IF NOT EXISTS PARTITION(sale_date='20240112');
      INSERT OVERWRITE TABLE odps_sale_detail PARTITION(sale_date='20240112') VALUES 
      ('s1','c1',100.1),
      ('s2','c2',100.2),
      ('s3','c3',100.3);
      
      -- ソースパーティションテーブルに 20240113 パーティションを作成し、パーティションにデータを書き込みます。
      ALTER TABLE odps_sale_detail ADD IF NOT EXISTS PARTITION(sale_date='20240113');
      INSERT OVERWRITE TABLE odps_sale_detail PARTITION(sale_date='20240113') VALUES 
      ('s1','c1',100.1),
      ('s2','c2',100.2),
      ('s3','c3',100.3);
      
      -- ソースパーティションテーブルに 20240114 パーティションを作成し、パーティションにデータを書き込みます。
      ALTER TABLE odps_sale_detail ADD IF NOT EXISTS PARTITION(sale_date='20240114');
      INSERT OVERWRITE TABLE odps_sale_detail PARTITION(sale_date='20240114') VALUES 
      ('s1','c1',100.1),
      ('s2','c2',100.2),
      ('s3','c3',100.3);
      
      -- ソースパーティションテーブルに 20240115 パーティションを作成し、パーティションにデータを書き込みます。
      ALTER TABLE odps_sale_detail ADD IF NOT EXISTS PARTITION(sale_date='20240115');
      INSERT OVERWRITE TABLE odps_sale_detail PARTITION(sale_date='20240115') VALUES 
      ('s1','c1',100.1),
      ('s2','c2',100.2),
      ('s3','c3',100.3);
      
      -- ソースパーティションテーブルに 20240116 パーティションを作成し、パーティションにデータを書き込みます。
      ALTER TABLE odps_sale_detail ADD IF NOT EXISTS PARTITION(sale_date='20240116');
      INSERT OVERWRITE TABLE odps_sale_detail PARTITION(sale_date='20240116') VALUES 
      ('s1','c1',100.1),
      ('s2','c2',100.2),
      ('s3','c3',100.3);
    2. Hologres に外部テーブルと親パーティションテーブルを作成します。

      -- 外部テーブルを作成します。
      IMPORT FOREIGN SCHEMA <maxCompute_projectname> LIMIT to(
        odps_sale_detail) 
      FROM SERVER odps_server INTO public 
      OPTIONS(if_table_exist 'error',if_unsupported_type 'error');
      
      -- Hologres に親パーティションテーブル(内部テーブル)を作成します。
      BEGIN ;
      CREATE TABLE IF NOT EXISTS holo_sale_detail(
          shop_name TEXT,
      	  customer_id TEXT ,
          total_price FLOAT8,
          sale_date TEXT
      )
      PARTITION BY LIST(sale_date);
      COMMIT;
  2. Hologres データソースを DataWorks ワークスペースに関連付けます。

    Hologres データソースを DataWorks ワークスペースに関連付けます。 詳細については、「Hologres データソースを追加する」をご参照ください。

  3. Hologres SQL ノードを作成し、パーティション値を取得します。

    1. DataWorks コンソールの [datastudio] ページで Hologres SQL ノードを作成します。 パーティション値を取得するための SQL コードを入力します。 値は for-each ノードの入力として使用されます。 Hologres SQL ノードの作成方法の詳細については、「Hologres SQL ノードを作成する」をご参照ください。

      次の SQL コード例では、20240112 から 20240116 のパーティションから値を取得する方法を示しています。

      SELECT distinct sale_date FROM odps_sale_detail WHERE sale_date > '20240111';

      次の結果が返されます。

      +------------+
      | sale_date  |
      +------------+
      | 20240112   |
      | 20240113   |
      | 20240114   |
      | 20240115   |
      | 20240116   |
      +------------+
    2. Hologres SQL ノードの出力パラメーターを構成します。

      [プロパティ] タブの [入力および出力パラメーター] セクションにある [出力パラメーター] テーブルで、[割り当てパラメーターを追加] をクリックし、デフォルトの出力割り当てパラメーターを選択します。 詳細については、「入力および出力パラメーターの構成」トピックの「特別なシナリオ:割り当てパラメーター」をご参照ください。image

  4. 子パーティションテーブルに対する操作をループするために for-each ノードを構成します。 詳細については、「for-each ノードを構成する」をご参照ください。

    1. for-each ノードを作成します。

    2. for-each ノードに Hologres SQL ノードを追加します。 start ノードを Hologres SQL ノードの祖先ノードとして構成し、end ノードを Hologres SQL ノードの子孫ノードとして構成します。 SQL コード例:

      説明

      子パーティションテーブルを作成し、子パーティションテーブルにデータを挿入する必要があります。 for-each ノードは、前の手順で Hologres SQL ノードから取得したパーティション値をスクリプトのパラメーターに割り当てます。 この場合、すべての出力値が一致するまで、操作はループで実行されます。

      -- 子パーティションテーブルを作成します。
      CREATE TABLE IF NOT EXISTS  public.holo_sale_detail_${dag.foreach.current} PARTITION OF public.holo_sale_detail FOR VALUES IN('${dag.foreach.current}');
      
      
      -- データをインポートします。
      INSERT  INTO public.holo_sale_detail_${dag.foreach.current} SELECT * FROM odps_sale_detail where sale_date ='${dag.foreach.current}';
    3. for-each ノードの構成タブで、[プロパティ] タブをクリックします。 [依存関係] セクションで、祖先ノードを構成します。 入力パラメーター入力パラメーターと出力パラメーターloopDataArray変更値のソース出力image セクションの テーブルで、デフォルトパラメーター を見つけ、[アクション] 列の をクリックします。 列のドロップダウンリストから、祖先割り当てノードの パラメーターを選択します。

    4. for-each ノードを保存してコミットします。

      説明

      使用しているワークスペースが標準モードの場合、ノードをコミットした後、構成タブの右上隅にある [デプロイ] をクリックしてノードをデプロイします。 詳細については、「ノードのデプロイ」をご参照ください。

  5. for-each ノードをテストし、テスト結果を表示します。 詳細については、「for-each ノードの構成」トピックの「for-each ノードのテストとテスト結果の表示」をご参照ください。

    1. [サイクルタスク] ページで、目的のノードを見つけ、[アクション] 列の [DAG] をクリックして、ノードの有向非巡回グラフ(DAG)を開きます。 ノードの DAG で、割り当てノード(最初のノード)を右クリックし、[実行] > [現在および子孫ノードを遡及的に] を選択します。image

    2. データバックフィルインスタンスが実行された後、DAG の for-each ノード(2 番目のノード)を右クリックし、[パッチデータ] ページで [内部ノードの表示] を選択します。 表示されるページは、5 つのループが実行されたことを示しています。image

    3. Hologres で、子パーティションテーブルが作成され、データが子パーティションテーブルに書き込まれているかどうかを確認します。image

トラブルシューティング

for-each ノードを実行すると、ループが失敗した場合、残りのタスクの実行が停止します。 この場合、トラブルシューティングのために for-each ノードの内部ノードを表示できます。image

トラブルシューティングが完了したら、[再実行] をクリックして、失敗したタスクを再実行できます。image