すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:データスキューのチューニング

最終更新日:Jan 07, 2026

このトピックでは、MaxCompute における一般的なデータスキューのシナリオと、それに対応するソリューションについて説明します。

MapReduce

データスキューを理解するには、まず MapReduce を理解する必要があります。MapReduce は、分割統治法を用いる分散コンピューティングフレームワークです。この手法は、大規模または複雑な問題を、より小さく管理しやすいサブ問題に分割し、それらを解決した後、結果を最終的なソリューションに統合します。MapReduce は、従来の並列プログラミングフレームワークよりも高いフォールトトレランス、使いやすさ、および優れた拡張性を提供します。MapReduce を使用して並列プログラミングを行う場合、データストレージ、ノード間通信、転送メカニズムなど、分散クラスターにおけるプログラミング以外の問題を管理する必要がありません。この機能により、分散プログラミングが大幅に簡素化されます。

以下の図は、MapReduce のワークフローを示しています。MapReduce

データスキュー

データスキューは、多くの場合 reduce ステージで発生します。マッパーは通常、入力ファイルに基づいてデータを均等に分割します。データスキューは、テーブル内のデータが異なるワーカー間で不均等に分散される場合に発生します。この不均等な分散により、一部のワーカーは計算を迅速に終了する一方で、他のワーカーははるかに長い時間がかかります。本番環境では、ほとんどのデータは偏っており、多くの場合 80/20 ルールに従います。例えば、フォーラムのアクティブユーザーの 20% が投稿の 80% を寄稿したり、ウェブサイトのトラフィックの 80% を 20% のユーザーが生成したりします。ビッグデータの時代では、データ量の爆発的な増加に伴い、データスキューは分散プログラムのパフォーマンスに深刻な影響を与える可能性があります。一般的な症状として、ジョブの実行進捗が 99% でスタックすることが挙げられます。

データスキューの特定方法

MaxCompute では、Logview を使用してデータスキューを特定できます。以下の手順でそのプロセスを説明します。判断数据倾斜

  1. Fuxi ジョブで、ジョブをレイテンシーの降順でソートし、実行時間が最も長いジョブステージを選択します。

  2. Fuxi ステージの Fuxi インスタンスリストで、タスクをレイテンシーの降順でソートします。平均よりも著しく実行時間が長いタスクを選択します。通常、リストの最初のタスクに注目できます。その StdOut ログを表示します。

  3. StdOut の情報を使用して、対応するジョブ実行グラフを表示します。

  4. ジョブ実行グラフのキー情報を使用して、データスキューを引き起こしている SQL コードスニペットを特定します。

次の例は、このメソッドの使用方法を示しています。

  1. タスクの操作ログから Logview ログを見つけます。詳細については、「Logview のエントリーポイント」をご参照ください。logview

  2. 問題を迅速に特定するために、Logview インターフェイスに移動し、Fuxi タスクを [レイテンシー] の降順でソートし、実行時間が最も長いタスクを選択します。Fuxi Task

  3. タスク R31_26_27 の実行時間が最も長いです。タスク R31_26_27 をクリックして、次の図に示すようにインスタンス詳細ページを開きます。时间最长业务Latency: {min:00:00:06, avg:00:00:13, max:00:26:40} は、このタスクのすべてのインスタンスについて、最小実行時間が 6 秒、平均実行時間が 13 秒、最大実行時間が 26 分 40 秒であることを示します。Latency (インスタンス実行時間) で降順にソートできます。実行時間が長い 4 つのインスタンスが表示されます。MaxCompute は、Fuxi インスタンスの実行時間が平均の 2 倍以上である場合、それをロングテールと見なします。これは、実行時間が 26 秒 を超えるタスクインスタンスはロングテールと見なされることを意味します。この場合、21 個のインスタンスの実行時間が 26 秒 を超えています。ロングテールインスタンスの存在が常にタスクスキューを示すわけではありません。インスタンス実行時間の avgmax の値も比較する必要があります。max 値が avg 値よりはるかに大きい場合、深刻なデータスキューを示しています。このタスクには管理が必要です。

  4. [標準出力] 列の Output log アイコンをクリックすると、次の図に示すように出力ログが表示されます。Example output

  5. 問題を特定したら、[ジョブ詳細] タブに移動します。R31_26_27 を右クリックし、[すべて展開] を選択してタスクを展開します。詳細については、「Logview 2.0 を使用してジョブ情報を表示する」をご参照ください。展开任务 StreamLineRead22 の前のステップである StreamLineWriter21 を調べます。このステップにより、データスキューを引き起こしているキーが明らかになります:new_uri_path_structurecookie_x5check_userid、および cookie_userid。この情報を使用して、データスキューを引き起こしている SQL スニペットを見つけます。KEY

データスキューのトラブルシューティングとソリューション

データスキューの最も一般的な原因は次のとおりです:

  • Join

  • GroupBy

  • Count (Distinct)

  • ROW_NUMBER (TopN)

  • 動的パーティション

発生頻度は、最も一般的なものから順に次のようになります:JOIN > GroupBy > Count(Distinct) > ROW_NUMBER > 動的パーティション

Join

JOIN 操作によるデータスキューは、大きいテーブルと小さいテーブルの JOIN、大きいテーブルと中規模テーブルの JOIN、またはホットキー値がロングテールを引き起こす場合など、さまざまな状況で発生する可能性があります。

  • 大きいテーブルと小さいテーブルの JOIN。

    • データスキューの例

      次の例では、t1 は大きいテーブルで、t2t3 は小さいテーブルです。

      SELECT  t1.ip
              ,t1.is_anon
              ,t1.user_id
              ,t1.user_agent
              ,t1.referer
              ,t2.ssl_ciphers
              ,t3.shop_province_name
              ,t3.shop_city_name
      FROM    <viewtable> t1
      LEFT OUTER JOIN <other_viewtable> t2
      ON t1.header_eagleeye_traceid = t2.eagleeye_traceid
      LEFT OUTER JOIN (  SELECT  shop_id
                                  ,city_name AS shop_city_name
                                  ,province_name AS shop_province_name
                          FROM    <tenanttable>
                          WHERE   ds = MAX_PT('<tenanttable>')
                          AND     is_valid = 1
                      ) t3
      ON t1.shopid = t3.shop_id
    • ソリューション

      次の例に示すように、MAPJOIN ヒント構文を使用します。

      SELECT  /*+ mapjoin(t2,t3)*/
              t1.ip
              ,t1.is_anon
              ,t1.user_id
              ,t1.user_agent
              ,t1.referer
              ,t2.ssl_ciphers
              ,t3.shop_province_name
              ,t3.shop_city_name
      FROM    <viewtable> t1
      LEFT OUTER JOIN (<other_viewtable>) t2
      ON t1.header_eagleeye_traceid = t2.eagleeye_traceid
      LEFT OUTER JOIN (  SELECT  shop_id
                                  ,city_name AS shop_city_name
                                  ,province_name AS shop_province_name
                          FROM    <tenanttable>
                          WHERE   ds = MAX_PT('<tenanttable>')
                          AND     is_valid = 1
                      ) t3
      ON t1.shopid = t3.shop_id
      • 注意事項

        • 小さいテーブルまたはサブクエリを参照する場合は、そのエイリアスを使用する必要があります。

        • MapJoin は、サブクエリを小さいテーブルとして使用することをサポートしています。

        • MapJoin では、非等値結合を使用したり、OR で複数の条件を接続したりできます。ON 句を省略し、mapjoin on 1 = 1 を使用してデカルト積を計算することもできます。例:select /*+ mapjoin(a) */ a.id from shop a join table_name b on 1=1;。ただし、この操作はデータ膨張を引き起こす可能性があります。

        • MapJoin では、複数の小さいテーブルをカンマ (,) で区切ります。例:/*+ mapjoin(a,b,c)*/

        • map ステージ中、MapJoin は指定されたテーブルからすべてのデータをメモリにロードします。したがって、指定されたテーブルは小さい必要があります。ロードされた後のテーブルが占有する合計メモリは 512 MB を超えてはなりません。MaxCompute は圧縮ストレージを使用するため、小さいテーブルのデータサイズはメモリにロードされた後に大幅に拡大します。512 MB の制限は、データがロードされた後のサイズに適用されます。次のパラメーターを設定することで、この制限を最大 8192 MB まで増やすことができます。

          SET odps.sql.mapjoin.memory.max=2048;
      • MapJoin 操作の制限事項

        • LEFT OUTER JOIN では、左側のテーブルが大きいテーブルである必要があります。

        • RIGHT OUTER JOIN では、右側のテーブルが大きいテーブルである必要があります。

        • FULL OUTER JOIN はサポートされていません。

        • INNER JOIN では、左側または右側のテーブルのいずれかが大きいテーブルであってもかまいません。

        • MapJoin は最大 128 個の小さいテーブルをサポートします。それ以上指定すると構文エラーが発生します。

  • 大きいテーブルと中規模テーブルの JOIN。

    • データスキューの例

      次の例では、t0 は大きいテーブルで、t1 は中規模テーブルです。

      SELECT  request_datetime
              ,host
              ,URI
              ,eagleeye_traceid
      FROM <viewtable>
          t0
      LEFT JOIN (
          SELECT
          traceid,
          eleme_uid,
          isLogin_is
          FROM <servicetable>
          WHERE ds = '${today}'
          AND     hh = '${hour}'
      ) t1 ON t0.eagleeye_traceid = t1.traceid
      WHERE   ds = '${today}'
      AND     hh = '${hour}'
    • ソリューション

      次の例に示すように、DISTRIBUTED MAPJOIN ヒントを使用してデータスキューを解決します。

      SELECT  /*+distmapjoin(t1)*/
              request_datetime
              ,host
              ,URI
              ,eagleeye_traceid
      FROM <viewtable>
          t0
      LEFT JOIN (
          SELECT
          traceid,
          eleme_uid,
          isLogin_is
          FROM <servicetable>
          WHERE ds = '${today}'
          AND     hh = '${hour}'
      ) t1 ON t0.eagleeye_traceid = t1.traceid
      WHERE   ds = '${today}'
      AND     hh = '${hour}'
  • JOIN におけるホットキー値によるロングテール。

    • データスキューの例

      次の例では、eleme_uid フィールドに多くのホットキー値が含まれており、データスキューを引き起こす可能性があります。

      SELECT
      eleme_uid,
      ...
      FROM (
          SELECT
          eleme_uid,
          ...
          FROM <viewtable>
      )t1
      LEFT JOIN(
          SELECT
          eleme_uid,
          ...
          FROM <customertable>
      )  t2
      ON t1.eleme_uid = t2.eleme_uid;
    • ソリューション

      この問題を解決するには、次の 4 つの方法のいずれかを使用できます。

      No.

      ソリューション

      説明

      ソリューション 1

      手動でのホットバリュー分割

      ホットキー値を分析して特定します。プライマリテーブルからホットキー値を持つレコードをフィルターします。まず、それらに対して MapJoin を実行します。次に、ホットキー値以外のレコードに対して MergeJoin を実行します。最後に、2 つの JOIN の結果をマージします。

      ソリューション 2

      SkewJoin パラメーターの設定

      set odps.sql.skewjoin=true;

      ソリューション 3

      SkewJoin ヒント

      ヒントを使用します:/*+ skewJoin(<table_name>[(<column1_name>[,<column2_name>,...])][((<value11>,<value12>)[,(<value21>,<value22>)...])]*/。SkewJoin ヒントメソッドは、スキューキーを見つけるための追加ステップを追加するため、クエリの実行時間が増加します。スキューキーが既にわかっている場合は、SkewJoin パラメーターを設定して時間を節約できます。

      ソリューション 4

      乗数テーブルを使用したモジュロ等価結合の実行

      乗数テーブルを使用します。

      • 手動でのホットキー値の分割

        このメソッドは、ホットキー値を分析して特定することを含みます。まず、プライマリテーブルからホットキー値を含むレコードをフィルターし、それらに対して MapJoin を実行します。次に、ホットキー値を含まない残りのレコードに対して MergeJoin を実行します。最後に、2 つの JOIN の結果をマージします。次のコードは例を示しています:

        SELECT
        /*+ MAPJOIN (t2) */
        eleme_uid,
        ...
        FROM (
            SELECT
            eleme_uid,
            ...
            FROM <viewtable>
            WHERE eleme_uid = <skewed_value>
        )t1
        LEFT JOIN(
            SELECT
            eleme_uid,
            ...
            FROM <customertable>
            WHERE eleme_uid = <skewed_value>
        )  t2
        ON t1.eleme_uid = t2.eleme_uid
        UNION ALL
        SELECT
        eleme_uid,
        ...
        FROM (
            SELECT
            eleme_uid,
            ...
            FROM <viewtable>
            WHERE eleme_uid != <skewed_value>
        )t3
        LEFT JOIN(
            SELECT
            eleme_uid,
            ...
            FROM <customertable>
            WHERE eleme_uid != <skewed_value>
        )  t4
        ON t3.eleme_uid = t4.eleme_uid
      • SkewJoin パラメーターの設定

        これは一般的なソリューションです。MaxCompute は、SkewJoin 機能を有効にするための set odps.sql.skewjoin=true; パラメーターを提供します。ただし、単に SkewJoin を有効にしてもタスクの実行には影響しません。この機能が有効になるには、odps.sql.skewinfo パラメーターも設定する必要があります。odps.sql.skewinfo パラメーターは、JOIN 最適化の詳細を指定します。以下はコマンド構文の例です。

        SET odps.sql.skewjoin=true;
        SET odps.sql.skewinfo=skewed_src:(skewed_key)[("skewed_value")];  --skewed_src はトラフィックテーブル、skewed_value はホットキー値です。

        次の例は、コマンドの使用方法を示しています:

        --単一フィールドの単一スキュー値の場合
        SET odps.sql.skewinfo=src_skewjoin1:(key)[("0")];
        
        --単一フィールドの複数スキュー値の場合
        SET odps.sql.skewinfo=src_skewjoin1:(key)[("0")("1")];
      • SkewJoin ヒント

        SELECT 文で MapJoin を実行するには、次のヒントを使用します:/*+ skewJoin(<table_name>[(<column1_name>[,<column2_name>,...])][((<value11>,<value12>)[,(<value21>,<value22>)...])]*/。このヒントでは、table_name はスキューテーブル、column_name はスキューカラム、value はスキューキー値です。次の例は、このヒントの使用方法を示しています。

        --方法 1:テーブル名をヒントします。テーブルのエイリアスをヒントすることに注意してください。
        SELECT /*+ skewjoin(a) */ * FROM T0 a JOIN T1 b ON a.c0 = b.c0 AND a.c1 = b.c1;
        
        --方法 2:テーブル名と、スキューしている可能性があると思われるカラムをヒントします。例えば、テーブル a のカラム c0 と c1 にデータスキューがあります。
        SELECT /*+ skewjoin(a(c0, c1)) */ * FROM T0 a JOIN T1 b ON a.c0 = b.c0 AND a.c1 = b.c1 AND a.c2 = b.c2;
        
        --方法 3:テーブル名とカラムをヒントし、スキューキー値を指定します。型が STRING の場合は、値を引用符で囲みます。例えば、(a.c0=1 and a.c1="2") と (a.c0=3 and a.c1="4") の両方の値にデータスキューがあります。
        SELECT /*+ skewjoin(a(c0, c1)((1, "2"), (3, "4"))) */ * FROM T0 a JOIN T1 b ON a.c0 = b.c0 AND a.c1 = b.c1 AND a.c2 = b.c2;
        説明

        値を直接指定する SkewJoin ヒントメソッドは、手動でホットキー値を分割したり、値を指定せずに SkewJoin パラメーターを設定したりするよりも効率的です。

        SkewJoin ヒントでサポートされる JOIN タイプ:

        • Inner Join:JOIN のどちら側にもヒントできます。

        • Left Join、Semi Join、Anti Join:左側のテーブルにのみヒントできます。

        • Right Join:右側のテーブルにのみヒントできます。

        • Full Join:Skew Join ヒントはサポートされていません。

        ヒントは集約操作を実行するためコストがかかるため、データスキューが確実にある JOIN にのみヒントを追加してください。

        ヒントされた JOIN では、左の結合キーのデータの型が右の結合キーのデータの型と一致する必要があります。そうでない場合、SkewJoin ヒントは機能しません。例えば、前の例では、a.c0 の型は b.c0 の型と一致する必要があり、a.c1 の型は b.c1 の型と一致する必要があります。サブクエリで CAST を使用して、結合キーの型が一致するようにできます。次の例は、その方法を示しています:

        CREATE TABLE T0(c0 int, c1 int, c2 int, c3 int);
        CREATE TABLE T1(c0 string, c1 int, c2 int);
        
        --方法 1:
        SELECT /*+ skewjoin(a) */ * FROM T0 a JOIN T1 b ON cast(a.c0 AS string) = cast(b.c0 AS string) AND a.c1 = b.c1;
        
        --方法 2:
        SELECT /*+ skewjoin(b) */ * FROM (SELECT cast(a.c0 AS string) AS c00 FROM T0 a) b JOIN T1 c ON b.c00 = c.c0;

        SkewJoin ヒントを追加すると、オプティマイザーは集約操作を実行して上位 20 個のホットキー値を取得します。値 20 はデフォルトです。この値は set odps.optimizer.skew.join.topk.num = xx; を実行することで変更できます。

        • SkewJoin ヒントは、JOIN の片側のみをヒントすることをサポートします。

        • ヒントされた JOIN は left key = right key 条件を持つ必要があり、デカルト積 JOIN はサポートしません。

        • 既に MapJoin ヒントがある JOIN に SkewJoin ヒントを追加することはできません。

      • 乗数テーブルを使用したモジュロ等価結合の実行

        このソリューションのロジックは、前の 3 つとは異なります。これは分割統治アプローチではありません。代わりに、INT データ型の単一カラムを持つ乗数テーブルを使用します。値は 1 から N の範囲で、N はデータスキューの度合いによって決まります。この乗数テーブルは、ユーザー行動テーブルを N 倍に拡張するために使用されます。その後、JOIN を実行するときに、ユーザー ID と number の両方を関連付けキーとして使用できます。元々ユーザー ID のみでデータを分散させることによって引き起こされていたデータスキューは、number 条件が追加されたことにより、元のレベルの 1/N に減少します。ただし、この方法ではデータも N 倍に膨張します。

        SELECT
        eleme_uid,
        ...
        FROM (
            SELECT
            eleme_uid,
            ...
            FROM <viewtable>
        )t1
        LEFT JOIN(
            SELECT
            /*+mapjoin(<multipletable>)*/
            eleme_uid,
            number
            ...
            FROM <customertable>
            JOIN <multipletable>
        )  t2
        ON t1.eleme_uid = t2.eleme_uid
        AND mod(t1.<value_col>,10)+1 = t2.number;

        データ膨張の問題に対処するために、両方のテーブルでホットキー値を持つレコードのみに拡張を制限できます。ホットキー値以外のレコードは変更されません。まず、ホットキー値を持つレコードを見つけます。次に、eleme_uid_join という名前の新しいカラムを追加して、トラフィックテーブルとユーザー行動テーブルを別々に処理します。ユーザー ID がホットキー値の場合、CONCAT を使用してランダムに割り当てられた正の整数 (例えば 0 から 1,000) を追加します。ホットキー値でない場合は、元のユーザー ID を保持します。2 つのテーブルを JOIN する際には、eleme_uid_join カラムを使用します。この方法は、ホットキー値の乗数を増やしてスキューを減らし、ホットキー値以外のレコードの不要なデータ膨張を回避します。ただし、このロジックは元のビジネスロジック SQL を完全に変更するため、推奨されません。

GroupBy

次の例は、GroupBy 句を含む擬似コードを示しています。

SELECT  shop_id
        ,sum(is_open) AS business_days
FROM    table_xxx_di
WHERE   dt BETWEEN '${bizdate_365}' AND '${bizdate}'
GROUP BY shop_id;

データスキューが発生した場合、次の 3 つのソリューションのいずれかを使用して解決できます:

No.

ソリューション

説明

ソリューション 1

Group By のスキュー対策パラメーターの設定

set odps.sql.groupby.skewindata=true;

ソリューション 2

乱数の追加

ロングテールを引き起こしているキーを分割します。

ソリューション 3

ローリングテーブルの作成

コストを削減し、効率を向上させます。

  • ソリューション 1:`GROUP BY` のスキュー対策パラメーターの設定

    SET odps.sql.groupby.skewindata=true;
  • ソリューション 2:乱数の追加

    ソリューション 1 とは異なり、このソリューションでは SQL 文を再書き込みする必要があります。ロングテールを引き起こすキーを分割するために乱数を追加することは、`GROUP BY` のロングテールを解決する効果的な方法です。

    SQL 文 SELECT Key, COUNT(*) AS Cnt FROM TableName GROUP BY Key; について、コンバイナが使用されない場合、map ノードはデータを reduce ノードにシャッフルします。reduce ノードはその後 `COUNT` 操作を実行します。対応する実行計画は M->R です。

    ロングテールを引き起こすキーを見つけた場合、そのキーの作業を次のように再分散できます:

    -- ロングテールを引き起こすキーが KEY001 であると仮定します。
    SELECT  a.Key
            ,SUM(a.Cnt) AS Cnt
    FROM(SELECT  Key
                ,COUNT(*) AS Cnt
                FROM    <TableName>
                GROUP BY Key
                ,CASE WHEN KEY = 'KEY001' THEN Hash(Random()) % 50
                 ELSE 0
                END
            ) a
    GROUP BY a.Key;

    変更後、実行計画は M->R->R になります。実行ステップ数は増えますが、ロングテールキーは 2 段階で処理されるため、全体の実行時間を短縮できます。リソース消費と時間効率はソリューション 1 と似ています。しかし、実際のシナリオでは、ロングテールは複数のキーによって引き起こされることが多いため、ロングテールキーを見つけて SQL 文を再書き込みするコストを考慮すると、ソリューション 1 の方がコスト効率が高いです。

  • ローリングテーブルの作成

    このソリューションの核心は、コストを削減し、効率を向上させることです。主な要件は、過去 1 年間のマーチャントデータを取得することです。オンラインタスクでは、毎回 T-1 から T-365 までのすべてのパーティションを読み取ることは、かなりのリソースを浪費します。ローリングテーブルを作成することで、過去 1 年間のデータの取得に影響を与えることなく、読み取るパーティションの数を減らすことができます。次の例は、その方法を示しています。

    まず、Group By を使用して集約された 365 日分のマーチャントビジネスデータを初期化します。データ更新日をマークし、データを a という名前のテーブルに保存します。その後のオンラインタスクでは、T-2 日目のテーブル atable_xxx_di テーブルと JOIN し、その後 Group By を使用できます。これにより、毎日読み取られるパーティションの数が 365 から 2 に減少します。プライマリキー shopid の重複が大幅に減少し、リソース消費も削減されます。

    -- ローリングテーブルの作成
    CREATE TABLE IF NOT EXISTS m_xxx_365_df
    (
      shop_id STRING COMMENT,
      last_update_ds COMMENT,
      365d_open_days COMMENT
    )
    PARTITIONED BY
    (
      ds STRING COMMENT 'Date partition'
    )LIFECYCLE 7;
    -- 365d が 2021 年 5 月 1 日から 2022 年 5 月 1 日までと仮定し、まずテーブルを初期化します。
    INSERT OVERWRITE TABLE m_xxx_365_df PARTITION(ds = '20220501')
      SELECT shop_id,
             max(ds) as last_update_ds,
             sum(is_open) AS 365d_open_days
      FROM table_xxx_di
      WHERE dt BETWEEN '20210501' AND '20220501'
      GROUP BY shop_id;
    -- 次に、実行されるオンラインタスクは次のとおりです。
    INSERT OVERWRITE TABLE m_xxx_365_df PARTITION(ds = '${bizdate}')
      SELECT aa.shop_id, 
             aa.last_update_ds, 
             365d_open_days - COALESCE(is_open, 0) AS 365d_open_days -- 営業日の無限ローリングを排除します。
      FROM (
        SELECT shop_id, 
               max(last_update_ds) AS last_update_ds, 
               sum(365d_open_days) AS 365d_open_days
        FROM (
          SELECT shop_id,
                 ds AS last_update_ds,
                 sum(is_open) AS 365d_open_days
          FROM table_xxx_di
          WHERE ds = '${bizdate}'
          GROUP BY shop_id
          UNION ALL
          SELECT shop_id,
                 last_update_ds,
                 365d_open_days
          FROM m_xxx_365_df
          WHERE dt = '${bizdate_2}' AND last_update_ds >= '${bizdate_365}'
          GROUP BY shop_id
        )
        GROUP BY shop_id
      ) AS aa
      LEFT JOIN (
        SELECT shop_id,
               is_open
        FROM table_xxx_di
        WHERE ds = '${bizdate_366}'
      ) AS bb
      ON aa.shop_id = bb.shop_id;
                                

Count (Distinct)

あるテーブルに次のデータ分布があるとします。

ds (パーティション)

cnt (レコード数)

20220416

73025514

20220415

2292806

20220417

2319160

次の文を使用すると、データスキューが発生する可能性があります:

SELECT  ds
        ,COUNT(DISTINCT shop_id) AS cnt
FROM    demo_data0
GROUP BY ds;

次のソリューションが利用可能です:

No.

ソリューション

説明

ソリューション 1

パラメーター設定の最適化

SET odps.sql.groupby.skewindata=true;

ソリューション 2

一般的な2段階集約

パーティションフィールド値に乱数を連結します。

ソリューション 3

2段階集約に類似

まず、2つのフィールド (ds+shop_id) でグループ化し、次に count(distinct) を使用します。

  • ソリューション 1:パラメーター設定による最適化

    次のパラメーターを設定します。

    SET odps.sql.groupby.skewindata=true;
  • ソリューション 2:一般的な2段階集約の使用

    shop_id フィールドのデータが均一でない場合、ソリューション 1 での最適化はできません。より一般的な方法は、パーティションフィールドの値に乱数を連結することです。

    -- 方法 1:乱数を連結します。CONCAT(ROUND(RAND(),1)*10,'_', ds) AS rand_ds
    SELECT  SPLIT_PART(rand_ds, '_',2) ds
            ,COUNT(*) id_cnt
      FROM (
            SELECT  rand_ds
                    ,shop_id
            FROM    demo_data0
            GROUP BY rand_ds,shop_id
            )
    GROUP BY SPLIT_PART(rand_ds, '_',2);
    
    -- 方法 2:乱数フィールドを追加します。ROUND(RAND(),1)*10 AS randint10
    SELECT  ds
            ,COUNT(*) id_cnt
    FROM    (SELECT  ds
                     ,randint10
                     ,shop_id
               FROM  demo_data0
            GROUP BY ds,randint10,shop_id
            )
    GROUP BY ds;
  • ソリューション 3:2段階集約に類似した方法の使用

    GroupBy フィールドと Distinct フィールドの両方のデータが均一である場合、まず 2 つのフィールド (`ds` と `shop_id`) でグループ化し、次に count(distinct) コマンドを使用することでクエリを最適化できます。

    SELECT  ds
            ,COUNT(*) AS cnt
    FROM(SELECT  ds
                ,shop_id
                FROM    demo_data0
                GROUP BY ds ,shop_id
        )
    GROUP BY ds;

ROW_NUMBER (TopN)

次の例は、上位 10 件を示しています。

SELECT  main_id
        ,type
FROM    (SELECT  main_id
                 ,type
                 ,ROW_NUMBER() OVER(PARTITION BY main_id ORDER BY type DESC ) rn
            FROM <data_demo2>
        ) A
WHERE   A.rn <= 10;

データスキューが発生した場合、次のいずれかの方法で解決できます:

No.

ソリューション

説明

ソリューション 1

SQL を使用した2段階集約

ランダムなカラムを追加するか、乱数を連結してパーティションのパラメーターとして使用します。

ソリューション 2

ユーザー定義の集計関数 (UDAF) を使用した2段階集約

最小ヒープ優先度付きキューを持つ UDAF を使用して最適化します。

  • ソリューション 1:SQL を使用した2段階集約

    map ステージで各パーティショングループのデータをできるだけ均一にするために、ランダムなカラムを追加し、それをパーティションのパラメーターとして使用できます。

    SELECT  main_id
            ,type
      FROM  (SELECT  main_id
                     ,type
                     ,ROW_NUMBER() OVER(PARTITION BY main_id ORDER BY type DESC ) rn
                FROM (SELECT  main_id
                              ,type
                            FROM (SELECT  main_id
                                          ,type
                                          ,ROW_NUMBER() OVER(PARTITION BY main_id,src_pt ORDER BY type DESC ) rn
                                     FROM (SELECT  main_id
                                                   ,type
                                                   ,ceil(110 * rand()) % 11 AS src_pt
                                             FROM  data_demo2
                                          )
                                    ) B
                            WHERE   B.rn <= 10
                        )
            ) A
    WHERE   A.rn <= 10;
    -- 2. 乱数をカスタマイズします。
    SELECT  main_id
            ,type
      FROM  (SELECT  main_id
                     ,type
                     ,ROW_NUMBER() OVER(PARTITION BY main_id ORDER BY type DESC ) rn
                FROM (SELECT  main_id
                              ,type
                        FROM(SELECT  main_id
                                     ,type
                                     ,ROW_NUMBER() OVER(PARTITION BY main_id,src_pt ORDER BY type DESC ) rn
                               FROM  (SELECT  main_id
                                              ,type
                                              ,ceil(10 * rand()) AS src_pt
                                              FROM    data_demo2
                                      )
                                    ) B
                            WHERE B.rn <= 10
                        )
            ) A
    WHERE   A.rn <= 10;
  • ソリューション 2:UDAF を使用した2段階集約

    SQL メソッドは、保守が困難な大量のコードになる可能性があります。この場合、最小ヒープ優先度付きキューを持つユーザー定義の集計関数 (UDAF) を使用して最適化できます。これは、iterate ステージでは上位 N 個の項目のみが処理され、merge ステージでは N 個の要素のみがマージされることを意味します。プロセスは次のとおりです。

    • iterate:最初の K 個の要素をプッシュします。後続の要素については、最小の上位要素と継続的に比較し、ヒープ内の要素を交換します。

    • merge:2 つのヒープをマージし、上位 K 個の要素をインプレースで返します。

    • terminate:ヒープを配列として返します。

    • SQL 文で、配列を行に分割します。

    @annotate('* -> array<string>')
    class GetTopN(BaseUDAF):
        def new_buffer(self):
            return [[], None]
        def iterate(self, buffer, order_column_val, k):
            # heapq.heappush(buffer, order_column_val)
            # buffer = [heapq.nlargest(k, buffer), k]
            if not buffer[1]:
                buffer[1] = k
            if len(buffer[0]) < k:
                heapq.heappush(buffer[0], order_column_val)
            else:
                heapq.heappushpop(buffer[0], order_column_val)
        def merge(self, buffer, pbuffer):
            first_buffer, first_k = buffer
            second_buffer, second_k = pbuffer
            k = first_k or second_k
            merged_heap = first_buffer + second_buffer
            merged_heap.sort(reverse=True)
            merged_heap = merged_heap[0: k] if len(merged_heap) > k else merged_heap
            buffer[0] = merged_heap
            buffer[1] = k
        def terminate(self, buffer):
            return buffer[0]
    
    SET odps.sql.python.version=cp37;
    SELECT main_id,type_val FROM (
      SELECT  main_id ,get_topn(type, 10) AS type_array
      FROM data_demo2
      GROUP BY main_id
    )
    LATERAL VIEW EXPLODE(type_array)type_ar AS type_val;

動的パーティション

動的パーティションは、パーティションテーブルにデータを挿入する際にパーティションキー列を指定できる機能ですが、特定の値を指定する必要はありません。代わりに、Select 句の対応するカラムがパーティション値を提供します。したがって、SQL 文を実行する前にどのパーティションが作成されるかはわかりません。SQL 文が実行された後、パーティションカラムの値に基づいてどのパーティションが作成されたかを判断できます。詳細については、「動的パーティションへのデータの挿入または上書き (DYNAMIC PARTITION)」をご参照ください。以下は SQL の例です。

CREATE TABLE total_revenues (revenue bigint) partitioned BY (region string);

INSERT OVERWRITE TABLE total_revenues PARTITION(region)
SELECT total_price AS revenue, region
FROM sale_detail;

多くのシナリオで、動的パーティションを持つテーブルを作成するとデータスキューにつながる可能性があります。データスキューが発生した場合、次のソリューションを使用して解決できます。

No.

ソリューション

説明

ソリューション 1

パラメーター構成の最適化

パラメーター構成を通じて最適化します。

ソリューション 2

パーティションプルーニングの最適化

多くのレコードを持つパーティションを見つけ、それらをプルーニングし、別々に挿入します。

  • ソリューション 1:パラメーター構成による最適化

    動的パーティションを使用すると、異なる条件を満たすデータを異なるパーティションに入れることができます。この機能により、特に多くのパーティションがある場合に、テーブルに書き込むための複数の Insert Overwrite 操作が不要になり、コードを大幅に簡素化できます。ただし、動的パーティションはあまりにも多くの小規模ファイルを作成する可能性もあります。

    • データスキューの例

      次の単純な SQL を例にとります。

      INSERT INTO TABLE part_test PARTITION(ds) SELECT * FROM  part_test;

      K 個の Map インスタンスと N 個のターゲットパーティションがあると仮定します。

                                  ds=1
      cfile1                      ds=2
      ...             X           ds=3
      cfilek                      ...
                                  ds=n

      最も極端なケースでは、K*N 個の小規模ファイルが生成される可能性があります。あまりにも多くの小規模ファイルは、ファイルシステムに多大な管理負担をかける可能性があります。そのため、MaxCompute は追加の reduce ステージを導入することで動的パーティションを処理します。同じターゲットパーティションを単一の reduce インスタンスまたは少数の reduce インスタンスに割り当てて書き込みます。この方法により、あまりにも多くの小規模ファイルが作成されるのを防ぎます。この reduce 操作は常に最後の reduce タスク操作です。MaxCompute では、この機能はデフォルトで有効になっており、次のパラメーターが `true` に設定されていることを意味します。

      SET odps.sql.reshuffle.dynamicpt=true;

      この機能をデフォルトで有効にすると、小規模ファイルが多すぎる問題が解決され、単一のインスタンスが生成するファイルが多すぎるためにタスクが失敗するのを防ぎます。ただし、データスキューや追加の reduce 操作によるコンピューティングリソースの消費など、新たな問題も発生します。したがって、これらの要素を慎重にバランスさせる必要があります。

    • ソリューション

      set odps.sql.reshuffle.dynamicpt=true; を有効にし、追加の reduce ステージを導入する目的は、小規模ファイルが多すぎる問題を解決することです。しかし、ターゲットパーティションの数が少なく、小規模ファイルが多すぎるリスクがない場合、この機能をデフォルトで有効にすると、コンピューティングリソースが無駄になり、パフォーマンスが低下します。この状況では、set odps.sql.reshuffle.dynamicpt=false; を設定してこの機能を無効にすることで、パフォーマンスを大幅に向上させることができます。以下に例を示します。

      INSERT OVERWRITE TABLE ads_tb_cornucopia_pool_d PARTITION (ds, lv, tp)
      SELECT /*+ mapjoin(t2) */
          '20150503' AS ds,
          t1.lv AS lv,
          t1.type AS tp
      FROM
          (SELECT  ...
          FROM tbbi.ads_tb_cornucopia_user_d
          WHERE ds = '20150503'
          AND lv IN ('flat', '3rd')
          AND tp = 'T'
          AND pref_cat2_id > 0
          ) t1
      JOIN
          (SELECT ...
          FROM tbbi.ads_tb_cornucopia_auct_d
          WHERE ds = '20150503'
          AND tp = 'T'
          AND is_all = 'N'
          AND cat2_id > 0
          ) t2
      ON t1.pref_cat2_id = t2.cat2_id;

      上記のコードにデフォルトのパラメーターを使用すると、タスク全体の実行に約 1 時間 30 分かかります。最後の reduce タスクには約 1 時間 20 分かかり、これは総実行時間の約 90% に相当します。追加の reduce タスクを導入すると、各 reduce インスタンスのデータ分布が非常に不均一になり、ロングテールが発生します。

    上記の例では、過去に作成された動的パーティションの数を分析すると、毎日約 2 つの動的パーティションしか作成されていません。したがって、set odps.sql.reshuffle.dynamicpt=false; を安全に設定できます。タスクはわずか 9 分で完了できます。この場合、このパラメーターを false に設定すると、パフォーマンスが大幅に向上し、コンピューティング時間とリソースが節約され、1 つのパラメーターを設定するだけで高い限界便益が得られます。

    この最適化は、時間がかかり、大量のリソースを消費する大規模なタスクに限定されません。実行が速く、リソース消費が少ない小規模で通常のタスクでも、動的パーティションを使用し、動的パーティションの数が多くない限り、odps.sql.reshuffle.dynamicpt パラメーターを false に設定できます。この設定は、すべての場合においてリソースを節約し、パフォーマンスを向上させます。

    タスクの期間に関係なく、次の 3 つの条件を満たすノードは最適化できます。

    • 動的パーティションを使用している

    • 動的パーティションの数 <= 50

    • `set odps.sql.reshuffle.dynamicpt=false;` が設定されていない

    ノードにこのパラメーターを設定する緊急度は、diag_level フィールドで識別される最後の Fuxi インスタンスの実行時間によって、次のルールに基づいて決定されます:

    • Last_Fuxi_Inst_Time が 30 分を超える場合:Diag_Level=4 ('Critical')

    • Last_Fuxi_Inst_Time が 20 分から 30 分の場合:Diag_Level=3 ('High')

    • Last_Fuxi_Inst_Time が 10 分から 20 分の場合:Diag_Level=2 ('Medium')

    • Last_Fuxi_Inst_Time が 10 分未満の場合:Diag_Level=1 ('Low')

  • 解決策 2: 最適化されたトリミング。

    動的パーティションにデータを挿入する際に map ステージで発生するデータスキューを解決するには、多くのレコードを持つパーティションを見つけ、それらをプルーニングしてから別々に挿入します。実際のシナリオに基づいて、次の例のように map ステージのパラメーター設定を変更できます:

    SET odps.sql.mapper.split.size=128;
    INSERT OVERWRITE TABLE data_demo3 partition(ds,hh)
    SELECT  *
    FROM    dwd_alsc_ent_shop_info_hi;

    結果は、全表スキャンが実行されたことを示しています。パフォーマンスをさらに最適化するには、次のようにシステムが生成する Reduce ジョブを無効にできます:

    SET odps.sql.reshuffle.dynamicpt=false ;
    INSERT OVERWRITE TABLE data_demo3 partition(ds,hh)
    SELECT *
    FROM dwd_alsc_ent_shop_info_hi;

    動的パーティションにデータを挿入する際に map ステージで発生するデータスキューを解決するには、多くのレコードを持つパーティションを特定し、それらを分離してから別々に挿入します。手順は次のとおりです。

    1. 次のコマンドを使用して、多くのレコードを含むパーティションをクエリします。

      SELECT  ds
              ,hh
              ,COUNT(*) AS cnt
      FROM    dwd_alsc_ent_shop_info_hi
      GROUP BY ds
               ,hh
      ORDER BY cnt DESC;

      パーティションの一部は次のとおりです:

      ds

      hh

      cnt

      20200928

      17

      1052800

      20191017

      17

      1041234

      20210928

      17

      1034332

      20190328

      17

      1000321

      20210504

      1

      19

      20191003

      20

      18

      20200522

      1

      18

      20220504

      1

      18

    2. 次のコマンドを使用して、上記で特定されたパーティションにデータをフィルターして挿入し、多くのレコードを含むパーティションに対しては別の挿入操作を実行します。

      SET odps.sql.reshuffle.dynamicpt=false ;
      INSERT OVERWRITE TABLE data_demo3 partition(ds,hh)
      SELECT  *
      FROM    dwd_alsc_ent_shop_info_hi
      WHERE   CONCAT(ds,hh) NOT IN ('2020092817','2019101717','2021092817','2019032817');
      
      SET odps.sql.reshuffle.dynamicpt=false ;
      INSERT OVERWRITE TABLE data_demo3 partition(ds,hh)
      SELECT  *
      FROM    dwd_alsc_ent_shop_info_hi
      WHERE   CONCAT(ds,hh) IN ('2020092817','2019101717','2021092817','2019032817');
      
      SELECT  ds
        ,hh,COUNT(*) AS cnt
       FROM dwd_alsc_ent_shop_info_hi
       GROUP BY ds,hh ORDER BY cnt DESC;