すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:データ スキューのチューニング

最終更新日:Jun 24, 2026

このトピックでは、MaxCompute における一般的なデータスキューのシナリオとそのソリューションを説明します。

MapReduce

データスキューを理解するには、まず MapReduce を理解する必要があります。MapReduce は、分割統治法を採用する分散コンピューティングフレームワークです。大規模または複雑な問題を、扱いやすい小さなサブ問題に分割して解決し、それらの結果を結合して最終的な出力を生成します。従来の並列プログラミングフレームワークと比べて、MapReduce には、高いフォールトトレランス、使いやすさ、優れたスケーラビリティなどの利点があります。MapReduce で並列プログラムを実装する場合、データストレージ、ノード間通信、データ転送など、分散クラスターの基盤となる複雑さを管理する必要はありません。これにより、分散プログラミングが大幅に簡素化されます。

次の図は、MapReduce のワークフローを示しています:MapReduce

データスキュー

データスキューは、Reducer ステージで最も頻繁に発生します。Mapper は通常、入力ファイルを分割することで作業を均等に分散しますが、データスキューとは、異なるワーカー間でデータが不均衡に分散された状態のことです。この不均衡により、一部のワーカーは迅速に処理を完了する一方で、他のワーカーは完了までに大幅に時間がかかり、パフォーマンスボトルネックが発生します。実際には、データは自然に偏る傾向があり、パレートの法則 (「80 対 20 の法則」とも呼ばれます) に従います。例えば、フォーラムのアクティブユーザーの 20% が投稿の 80% を占めたり、Web サイトのトラフィックの 80% がわずか 20% のユーザーからもたらされたりします。ビッグデータ処理では、この問題により分散ジョブのパフォーマンスが著しく低下します。よく見られる症状として、一部の過負荷状態のワーカーがタスクを完了するのを待つ間、ジョブが 99% の進行状態で停止しているように見えることが挙げられます。

データスキューの特定

MaxCompute では、Logview を使用してデータスキューを特定できます。 次の手順に従います:判断数据倾斜

  1. [Fuxi Jobs] で、ジョブをレイテンシーの降順でソートし、ランタイムが最も長いジョブステージを選択します。

  2. そのステージの Fuxi インスタンスリストで、インスタンスをレイテンシーの降順でソートします。 ランタイムが平均より著しく長いインスタンスを選択します。 通常、ソートされたリストの最初のインスタンスから始めることができます。 次に、[StdOut] 列でその出力ログを確認します。

  3. [StdOut] ログの情報を使用して、対応するジョブ実行グラフを検索して調べます。

  4. ジョブ実行グラフのキーとなる情報を使用して、データスキューの原因となっている SQL スニペットを特定します。

次の例では、このプロセスを説明します。

  1. タスクの実行ログで Logview の URL を見つけます。 詳細については、「Logview のエントリポイント」をご参照ください。

    OK
    2022-05-19 15:29:13 start to get jobId:
    2022-05-19 15:29:13 get jobid:20220519072913521gdkgw3ys
    ID = 20220519072913521gdkgw3ys
    log view:
    http://logview.alibaba-inc.com/logview/?h=http://service.odps.aliyun-inc.com/api&p=China&i=20220519072913521gdkgw3ys&token=xxx
    Job Queueing...
    Job Queueing...
    Job Queueing...
    Job Queueing...
    Job Queueing...
    Summary:
    resource cost: cpu 0.57 Core * Min, memory 0.57 GB * Min
    inputs:
            xxx/ds=20220519/hh=14: 286 (1166786 bytes)
    outputs:
            xxx/ds=20220519/hh=14: 671 (36754 bytes)
    Job run time: 0.000
    Job run mode: service job 2.0
    Job run engine: execution engine
    M1:
            bubble: 0
            instance count: 1
            run time: 0.000
            instance time:
                    min: 0.000, max: 0.000, avg: 0.000
            input records:
            output records:
                    AdhocSink2: 1  (min: 1, max: 1, avg: 1)
            metrics_output_count:
  2. Logview を開きます。タスクを [レイテンシー] で降順に並べ替えて、問題をすばやく特定します。Logview の [Fuxi Jobs] ページで、[Fuxi Task] 列と [レイテンシー] 列を確認し、実行時間が最も長いタスクを特定します。この例では、R31_26_27 のレイテンシーは 00:26:44.379 で、完了率はわずか 99% ですが、他のすべてのタスクは完了しています。

  3. タスク R31_26_27 のランタイムが最も長いです。 タスク R31_26_27 をクリックして、インスタンス詳細ページを開きます。 Fuxi タスク R31_26_27 のインスタンス詳細では、上部のレイテンシー統計に min:00:00:06, avg:00:00:13, max:00:26:40 と表示されます。 最大値と平均値の間に大きなギャップがある場合は、データスキューを示しています。 [SmartFilter][Long-Tails] オプションと [Data-Skew] オプションを使用すると、問題のあるインスタンスをすばやくフィルタリングできます。 ランタイムが最も長いインスタンスのレイテンシーは 00:26:40.878 です。 Latency: {min:00:00:06, avg:00:00:13, max:00:26:40} は、タスク内のすべてのインスタンスについて、最小ランタイムが 6 s、平均ランタイムが 13 s、最大ランタイムが 00:26:40 であることを示しています。 インスタンスを Latency (インスタンスランタイム) の降順でソートすると、4 つのインスタンスのランタイムが比較的長いことがわかります。 MaxCompute は、ランタイムが平均ランタイムの 2 倍を超える Fuxi インスタンスをロングテールとして識別します。 これは、ランタイムが 26 s を超えるタスクインスタンスがロングテール (Long-Tails) として識別されることを意味します。 この例では、21 個のインスタンスのランタイムが 26 s を超えています。 ロングテール インスタンスが存在しても、必ずしもデータスキューを示すわけではありません。 インスタンスのレイテンシーの avg 値と max 値を比較する必要もあります。 max 値が avg 値よりはるかに大きいタスクは、データスキューが深刻であると見なされ、最適化が必要です。

  4. 次の例に示すように、[StdOut] 列の 输出日志 アイコンをクリックして出力ログを表示します。 ログには、StreamLineRead22 has large amounts of data (about 115GB) to be sorting, may need a long time. という重大な警告が含まれています。 このメッセージは、インスタンスでデータスキューが発生していることを示しています。 このインスタンスの実行ログは次のとおりです。

    [2022-05-14 21:04:41] ---------- Run Task: R31_26_27#762 ---------- [2022-05-14 21:04:46] Reader StreamLineRead22, Has total data: true, estimated size: 123518312656, record count: 2937076 [2022-05-14 21:04:46] StreamLineRead22 has large amounts of data (about 115GB) to be sorting, may need a long time. [2022-05-14 21:05:48] Reader StreamLineRead22 is sorting, current read count: 398932, dump file count: 54 [2022-05-14 21:06:48] Reader StreamLineRead22 is sorting, current read count: 835951, dump file count: 121 [2022-05-14 21:07:48] Reader StreamLineRead22 is sorting, current read count: 1214555, dump file count: 182 [2022-05-14 21:08:48] Reader StreamLineRead22 is sorting, current read count: 1654164, dump file count: 251 [2022-05-14 21:09:48] Reader StreamLineRead22 is sorting, current read count: 2076167, dump file count: 317 [2022-05-14 21:10:48] Reader StreamLineRead22 is sorting, current read count: 2464525, dump file count: 378 [2022-05-14 21:11:48] Reader StreamLineRead22 is sorting, current read count: 2856393, dump file count: 442 [2022-05-14 21:12:48] Reader StreamLineRead22 is sorting, current read count: 3245697, dump file count: 503 [2022-05-14 21:13:49] Reader StreamLineRead22 is sorting, current read count: 3656371, dump file count: 567 [2022-05-14 21:14:33] Reader StreamLineRead22 sort finish, total dump file count: 610 [2022-05-14 21:14:36] Reader StreamLineRead22 reads records: 287 [2022-05-14 21:14:37] Reader StreamLineRead22 reads records: 19275 [2022-05-14 21:14:38] Reader StreamLineRead22 reads records: 21967 [2022-05-14 21:14:38] Reader StreamLineRead22 reads records: 24569
  5. 問題のあるタスクを特定したら、[ジョブ詳細] タブに移動します。R31_26_27 を右クリックし、[すべて展開] を選択してタスクを展開します。詳細については、「Logview 2.0 を使用してジョブ情報を表示する」をご参照ください。ジョブの DAG では、オペレーター M26_7_10_25_30 は、データが R31_26_27 に流れる前に、R7_6R25_3_30J10_9_22 などの上流オペレーターからデータを集約します。このノードで [すべて展開] を選択して、すべてのインスタンスを表示します。StreamLineRead22 の前にあるオペレーター、つまり StreamLineWriter21 を調べて、データスキューの原因となっているキー (new_uri_path_structurecookie_x5check_useridcookie_userid) を特定します。これにより、データスキューの原因となっている SQL スニペットを特定できます。出力ログには、オペレーターの詳細が表示されます。StreamLineWrite21 (inner_time_ms: 1090, output_count: 3975) は、キーとして new_uri_path_structurecookie_x5check_useridcookie_userid を使用して HASH 分布を行います。下流オペレーター StreamLineRead22 の inner_time_ms は 4759、output_count は 25477 です。これら 2 つのオペレーター間で転送されたデータ量は 162,350,603,775 バイトです。StreamLineWrite21 の詳細を展開すると、その HASH 分布キーとパーティションフィールドが同一であることがわかります: cookie_useridcookie_x5check_useridnew_uri_path_structure。データ分布とパーティショニングのこの関係を分析して、データスキューの根本原因を特定します。

    ハッシュ分散キーを取得した後、これらのキー名を元の SQL クエリの join on 句のフィールドにマッピングし直します。 これにより、Logview オペレーターを SQL スニペットにマッピングできます。 この例では、StreamLineWriter21 のハッシュ分散キーは new_uri_path_structurecookie_x5check_useridcookie_userid です。 cookie 関連のフィールド (cookie_x5check_useridcookie_userid) は SQL join on 句の cookie フィールドに対応し、new_uri_path_structure は path フィールドに対応します。 このマッピングを確立することで、Logview のスキューしたオペレーターから、スキューの原因となっている特定の SQL 結合条件まで追跡でき、的を絞った最適化が可能になります。

データスキューへの対処

データスキューの最も一般的な原因は次のとおりです:

  • join

  • group by

  • COUNT(DISTINCT)

  • ROW_NUMBER (Top-N)

  • 動的パーティション

これらの原因は、頻度の高い順に並んでいます: join > group by > COUNT(DISTINCT) > ROW_NUMBER > dynamic partition

ジョイン

結合操作におけるデータスキューは、大きいテーブルと小さいテーブルの結合、大きいテーブルと中規模のテーブルの結合、またはホットキーが原因のロングテールを扱う場合など、いくつかのシナリオで発生する可能性があります。

  • 大きいテーブルと小さいテーブルの結合

    • データスキューの例

      次の例では、t1 は大きいテーブル、t2t3 は小さいテーブルです。

      SELECT  t1.ip
              ,t1.is_anon
              ,t1.user_id
              ,t1.user_agent
              ,t1.referer
              ,t2.ssl_ciphers
              ,t3.shop_province_name
              ,t3.shop_city_name
      FROM    <viewtable> t1
      LEFT OUTER JOIN <other_viewtable> t2
      ON t1.header_eagleeye_traceid = t2.eagleeye_traceid
      LEFT OUTER JOIN (  SELECT  shop_id
                                  ,city_name AS shop_city_name
                                  ,province_name AS shop_province_name
                          FROM    <tenanttable>
                          WHERE   ds = MAX_PT('<tenanttable>')
                          AND     is_valid = 1
                      ) t3
      ON t1.shopid = t3.shop_id
    • 解決策

      次の例に示すように、MAPJOIN ヒント 構文を使用します。

      SELECT  /*+ mapjoin(t2,t3)*/
              t1.ip
              ,t1.is_anon
              ,t1.user_id
              ,t1.user_agent
              ,t1.referer
              ,t2.ssl_ciphers
              ,t3.shop_province_name
              ,t3.shop_city_name
      FROM    <viewtable> t1
      LEFT OUTER JOIN (<other_viewtable>) t2
      ON t1.header_eagleeye_traceid = t2.eagleeye_traceid
      LEFT OUTER JOIN (  SELECT  shop_id
                                  ,city_name AS shop_city_name
                                  ,province_name AS shop_province_name
                          FROM    <tenanttable>
                          WHERE   ds = MAX_PT('<tenanttable>')
                          AND     is_valid = 1
                      ) t3
      ON t1.shopid = t3.shop_id
      • 注意事項

        • 小さいテーブルまたはサブクエリを参照する場合は、そのエイリアスを使用する必要があります。

        • MapJoin では、サブクエリを小さいテーブルとして使用できます。

        • MapJoin では、非等価結合を使用したり、or を使用して複数の条件を接続したりできます。on 句を省略し、mapjoin on 1 = 1 を使用してデカルト積を計算することもできます。 例: select /*+ mapjoin(a) */ a.id from shop a join table_name b on 1=1;。 ただし、この操作はデータ量を大幅に増加させます。

        • MapJoin では、/*+ mapjoin(a,b,c)*/ のように、複数の小さいテーブルをカンマ (,) で区切ります。

        • マップフェーズ中に、MapJoin は指定されたテーブルからすべてのデータをメモリにロードします。 したがって、指定されたテーブルは小さいテーブルである必要があります。 テーブルのインメモリサイズは 512 MB を超えることはできません。 MaxCompute は圧縮ストレージを使用するため、データサイズはメモリにロードされると大幅に増加します。 この 512 MB の制限は、拡張後のサイズに適用されます。 次のパラメーターを設定することで、メモリ制限を最大 8192 MB まで増やすことができます。

          SET odps.sql.mapjoin.memory.max=8192;
      • MapJoin 操作の制限

        • left outer join の場合、左側のテーブルは大きいテーブルでなければなりません。

        • right outer joinでは、右側のテーブルが大きいテーブルである必要があります。

        • full outer join はサポートされていません。

        • inner join の場合、左テーブルと右テーブルのどちらを大きいテーブルにしてもかまいません。

        • MapJoin は最大 128 個の小さいテーブルをサポートしています。 この制限を超えると、構文エラーが発生します。

  • 大きいテーブルと中規模のテーブルの結合

    • データスキューの例

      次の例では、t0 は大きいテーブル、t1 は中規模のテーブルです。

      SELECT  request_datetime
              ,host
              ,URI
              ,eagleeye_traceid
      FROM <viewtable>
          t0
      LEFT JOIN (
          SELECT
          traceid,
          eleme_uid,
          isLogin_is
          FROM <servicetable>
          WHERE ds = '${today}'
          AND     hh = '${hour}'
      ) t1 ON t0.eagleeye_traceid = t1.traceid
      WHERE   ds = '${today}'
      AND     hh = '${hour}'
    • 解決策

      次の例に示すように、DISTRIBUTED MAPJOIN 構文を使用してデータスキューを解決します。

      SELECT  /*+distmapjoin(t1)*/
              request_datetime
              ,host
              ,URI
              ,eagleeye_traceid
      FROM <viewtable>
          t0
      LEFT JOIN (
          SELECT
          traceid,
          eleme_uid,
          isLogin_is
          FROM <servicetable>
          WHERE ds = '${today}'
          AND     hh = '${hour}'
      ) t1 ON t0.eagleeye_traceid = t1.traceid
      WHERE   ds = '${today}'
      AND     hh = '${hour}'
  • ホットキーが原因のロングテールがある結合の処理

    • データスキューの例

      次のクエリでは、eleme_uid 列に多くのホットキーが含まれているため、データスキューが発生しやすくなります。

      SELECT
      eleme_uid,
      ...
      FROM (
          SELECT
          eleme_uid,
          ...
          FROM <viewtable>
      )t1
      LEFT JOIN(
          SELECT
          eleme_uid,
          ...
          FROM <customertable>
      )  t2
      ON t1.eleme_uid = t2.eleme_uid;
    • 解決策

      この問題は、次の4つの方法のいずれかで解決できます。

      No.

      方法

      説明

      方法 1

      ホットキーの手動分割

      メインテーブルからホットキーを持つレコードをフィルターして MapJoin で処理し、残りのレコードを通常の結合で処理します。

      方法 2

      SkewJoin パラメーターの設定

      odps.sql.skewjoin=true; を設定します。

      方法 3

      SkewJoin ヒントの使用

      ヒント: /*+ skewJoin(<table_name>[(<column1_name>[,<column2_name>,...])][((<value11>,<value12>)[,(<value21>,<value22>)...])]*/ を使用します。SkewJoin ヒントを使用すると、スキューキーを見つけるために追加のクエリが実行され、クエリのランタイムが長くなります。 スキューキーがすでにわかっている場合は、SkewJoin パラメーターを設定する方が効率的です。

      方法 4

      乗数テーブルと剰余条件を使用した結合

      乗数テーブルを使用。

      • ホットキーを手動で分割する

        ホットキーを特定し、これらのレコードをメインテーブルからフィルターして MapJoin で処理します。 次に、残りのレコードを通常の結合で処理します。 最後に、UNION ALL を使用して両方の結合の結果を結合します。 コード例は次のとおりです:

        SELECT
        /*+ MAPJOIN (t2) */
        eleme_uid,
        ...
        FROM (
            SELECT
            eleme_uid,
            ...
            FROM <viewtable>
            WHERE eleme_uid = <skewed_value>
        )t1
        LEFT JOIN(
            SELECT
            eleme_uid,
            ...
            FROM <customertable>
            WHERE eleme_uid = <skewed_value>
        )  t2
        ON t1.eleme_uid = t2.eleme_uid
        UNION ALL
        SELECT
        eleme_uid,
        ...
        FROM (
            SELECT
            eleme_uid,
            ...
            FROM <viewtable>
            WHERE eleme_uid != <skewed_value>
        )t3
        LEFT JOIN(
            SELECT
            eleme_uid,
            ...
            FROM <customertable>
            WHERE eleme_uid != <skewed_value>
        )  t4
        ON t3.eleme_uid = t4.eleme_uid
      • SkewJoin パラメーターを設定する

        これは一般的なアプローチです。set odps.sql.skewjoin=true; コマンドを実行することで、MaxCompute で SkewJoin 機能を有効にできます。 ただし、このコマンドだけではタスクの実行に影響はありません。 この機能を有効にするには、odps.sql.skewinfo パラメーターも設定する必要があります。odps.sql.skewinfo パラメーターは、Join の最適化の詳細を指定するために使用されます。 コマンド構文の例を次に示します:

        SET odps.sql.skewjoin=true;
        SET odps.sql.skewinfo=skewed_src:(skewed_key)[("skewed_value")];  --skewed_src はスキューが発生しているテーブル、skewed_value はホットキーです。

        このパラメーターの使用例を次に示します:

        -- 単一の列と単一のスキュー値の場合
        SET odps.sql.skewinfo=src_skewjoin1:(key)[("0")];
        -- 単一の列と複数のスキュー値の場合
        SET odps.sql.skewinfo=src_skewjoin1:(key)[("0")("1")];
      • SkewJoin ヒントを使用する

        SELECT ステートメントで次のヒントを使用します: /*+ skewJoin(<table_name>[(<column1_name>[,<column2_name>,...])][((<value11>,<value12>)[,(<value21>,<value22>)...])]*/。この構文では、table_name はスキューが発生しているテーブル、column_name はスキューが発生している列、value はスキューキーです。 いくつかのコード例を次に示します:

        -- 方法 1: テーブル名をヒントとして指定 (テーブルのエイリアスを使用)。
        SELECT /*+ skewjoin(a) */ * FROM T0 a JOIN T1 b ON a.c0 = b.c0 AND a.c1 = b.c1;
        -- 方法 2: テーブル名と、スキューの可能性がある列をヒントとして指定。 たとえば、テーブル a の列 c0 と c1 でスキューが発生している場合。
        SELECT /*+ skewjoin(a(c0, c1)) */ * FROM T0 a JOIN T1 b ON a.c0 = b.c0 AND a.c1 = b.c1 AND a.c2 = b.c2;
        -- 方法 3: テーブル、列、および特定のスキューキー値をヒントとして指定。 値が STRING 型の場合は、引用符で囲みます。 たとえば、(a.c0=1 かつ a.c1="2") と (a.c0=3 かつ a.c1="4") の値でスキューが発生している場合。
        SELECT /*+ skewjoin(a(c0, c1)((1, "2"), (3, "4"))) */ * FROM T0 a JOIN T1 b ON a.c0 = b.c0 AND a.c1 = b.c1 AND a.c2 = b.c2;
        説明

        値を指定する SkewJoin ヒントを使用する方が、ホットキーを手動で分割したり、値を指定せずに SkewJoin パラメーターを設定したりするよりも効率的です。

        SkewJoin ヒントは、次の結合タイプに対応しています。

        • 内部結合の場合、結合のどちらのテーブルにもヒントを適用できます。

        • 左結合、セミ結合、またはアンチ結合の場合、ヒントは左側のテーブルにのみ適用できます。

        • 右結合の場合、ヒントは右側のテーブルにのみ適用できます。

        • 完全結合は SkewJoin ヒントに対応していません。

        このヒントは、集約操作を実行することでオーバーヘッドが発生するため、データスキューが判明している結合にのみ追加してください。

        SkewJoin ヒントを有効にするには、結合の両側の結合キーのデータ型が一致している必要があります。 たとえば、a.c0 の型は b.c0 の型と一致する必要があり、a.c1 の型は b.c1 の型と一致する必要があります。 次の例に示すように、サブクエリで CAST を使用して、型の一貫性を確保できます:

        CREATE TABLE T0(c0 int, c1 int, c2 int, c3 int);
        CREATE TABLE T1(c0 string, c1 int, c2 int);
        -- 方法 1:
        SELECT /*+ skewjoin(a) */ * FROM T0 a JOIN T1 b ON cast(a.c0 AS string) = cast(b.c0 AS string) AND a.c1 = b.c1;
        -- 方法 2:
        SELECT /*+ skewjoin(b) */ * FROM (SELECT cast(a.c0 AS string) AS c00 FROM T0 a) b JOIN T1 c ON b.c00 = c.c0;

        SkewJoin ヒントを使用すると、オプティマイザーは集約を実行して上位 20 個のホットキーを見つけます。 値 20 はデフォルト値であり、set odps.optimizer.skew.join.topk.num = xx; を設定することで変更できます。

        • SkewJoin ヒントは、結合の片側にのみ適用できます。

        • ヒントが指定された結合には、left_key = right_key などの等価結合条件が必要です。 デカルト積結合には対応していません。

        • すでに MapJoin ヒントがある結合に SkewJoin ヒントを追加することはできません。

      • 乗数テーブルと剰余条件で結合する

        この解決策のロジックは、前の3つの解決策とは異なります。 これは分割統治アプローチではなく、代わりに乗数テーブルを使用します。 このテーブルには、1 から N までの値を持つ単一の整数列があり、N はスキューの度合いに基づいて決定できます。 この乗数テーブルは、ユーザーの行動テーブルを N 回拡張するために使用されます。 次に、結合操作では、ユーザーID と number の2つの結合キーを使用します。 このようにして、number 結合条件が追加されるため、ユーザーID のみに基づいてデータをパーティション分割することが原因で発生するデータスキューは、元のレベルの 1/N に減少します。 ただし、このアプローチではデータも N 回拡張されます。

        SELECT
        eleme_uid,
        ...
        FROM (
            SELECT
            eleme_uid,
            ...
            FROM <viewtable>
        )t1
        LEFT JOIN(
            SELECT
            /*+mapjoin(<multipletable>)*/
            eleme_uid,
            number
            ...
            FROM <customertable>
            JOIN <multipletable>
        )  t2
        ON t1.eleme_uid = t2.eleme_uid
        AND mod(t1.<value_col>,10)+1 = t2.number;

        データセット全体が拡張されるのを避けるために、この拡張をホットキーを持つレコードにのみ適用できます。 まず、ホットキーを特定します。 次に、トラフィックテーブルとユーザー行動テーブルの両方で、eleme_uid_join などの新しい結合キー列を追加します。 ユーザーID がホットキーの場合、concat を使用して、ランダムに割り当てられた 0 以上の整数 (0 から事前定義された乗数、たとえば 1000 まで) と連結します。 それ以外の場合は、元のユーザーID を保持します。 次に、新しい eleme_uid_join 列を結合に使用します。 この方法は、ホットキー以外のレコードの不要なデータ拡張を回避しながら、ホットキーのみを拡張することでスキューを軽減します。 ただし、このアプローチは SQL のビジネスロジックを著しく複雑にするため、通常は推奨しません。

Groupby

次の擬似コード例では、GROUP BY 句を使用しています。

SELECT  shop_id
        ,sum(is_open) AS open_days
FROM    table_xxx_di
WHERE   dt BETWEEN '${bizdate_365}' AND '${bizdate}'
GROUP BY shop_id;

データ スキューは、次の 3 つの解決策のいずれかで解消できます。

No.

解決策

説明

解決策 1

GROUP BY のデータ スキューを緩和するためのパラメーターを設定する

set odps.sql.groupby.skewindata=true; を設定して、自動スキュー処理を有効にします。

解決策 2

乱数を追加する (ソルティング)

ロングテールキーを分割してワークロードを分散させます。

解決策 3

ロールアップテーブルを作成する

履歴データを事前集計して、日次ジョブでスキャンされるデータを削減します。

  • 解決策 1: GROUP BY のデータ スキューを緩和するためのパラメーターを設定する。

    SET odps.sql.groupby.skewindata=true;
  • 解決策 2: 乱数を追加する。

    この解決策はソルティングとも呼ばれ、GROUP BY のロングテールを効果的に処理します。 この方法では、SQL を書き換えて乱数を追加し、スキューしたキーを分割します。

    Select Key,Count(*) As Cnt From TableName Group By Key; のようなクエリの場合、コンバイナが使用されないと仮定すると、マップノードは COUNT 操作のためにデータをリデュースノードにシャッフルします。 これにより、M->R の実行計画になります。

    ロングテールの原因となるキーを特定した場合、次のようにそのワークロードを再分散できます。

    -- ロングテールの原因となるキーが 'KEY001' であると仮定します
    SELECT  a.Key
            ,SUM(a.Cnt) AS Cnt
    FROM(SELECT  Key
                ,COUNT(*) AS Cnt
                FROM    <TableName>
                GROUP BY Key
                ,CASE WHEN KEY = 'KEY001' THEN Hash(Random()) % 50
                 ELSE 0
                END
            ) a
    GROUP BY a.Key;

    この変更後、実行計画は M->R->R になります。 これにより Reduce ステップが追加されますが、ロングテールキーを 2 段階で処理することで、全体のランタイムを短縮できます。 リソース消費とパフォーマンスは解決策 1 と似ています。 ただし、実際のシナリオでは、複数のロングテールキーが存在する場合があります。 これらのキーを特定し、SQL を書き換えるコストを考慮すると、多くの場合、解決策 1 の方が費用対効果が高いです。

  • 解決策 3: ロールアップテーブルを作成する。

    この解決策は、主にコストを削減し、効率を向上させます。 過去 1 年間のデータを取得するという要件はよくあります。 オンラインジョブの場合、ジョブごとに T-1 から T-365 までのすべてのパーティションをスキャンすると、大量のリソースが無駄になります。 ロールアップテーブルを作成することで、1 年間のデータへのアクセスを失うことなく、毎日読み取られるパーティションの数を減らすことができます。 例:

    まず、365 日分のマーチャントデータを集計し、データ更新日をマークすることで、1 回限りの初期化を実行します。 結果は、ここでは m_xxx_365_df というテーブルに保存します。 その後のオンラインジョブでは、T-2 日目のテーブル m_xxx_365_dftable_xxx_di テーブルと結合し、再度 GROUP BY を実行できます。 このアプローチにより、毎日読み取られるパーティションの数が 365 から 2 に減少します。 その結果、shop_id プライマリキーの重複が大幅に減少し、リソース消費が削減されます。

    -- ロールアップテーブルを作成します
    CREATE TABLE IF NOT EXISTS m_xxx_365_df
    (
      shop_id STRING,
      last_update_ds STRING,
      `365d_open_days` BIGINT
    )
    PARTITIONED BY
    (
      ds STRING COMMENT '日付パーティション'
    )LIFECYCLE 7;
    
    -- 365日の期間が2021年5月1日から2022年5月1日までであると仮定します。まず、1回限りの初期化を実行します。
    INSERT OVERWRITE TABLE m_xxx_365_df PARTITION(ds = '20220501')
      SELECT shop_id,
             max(ds) as last_update_ds,
             sum(is_open) AS `365d_open_days`
      FROM table_xxx_di
      WHERE dt BETWEEN '20210501' AND '20220501'
      GROUP BY shop_id;
    
    -- その後のオンラインジョブは次のとおりです:
    INSERT OVERWRITE TABLE m_xxx_365_df PARTITION(ds = '${bizdate}')
      SELECT aa.shop_id, 
             aa.last_update_ds, 
             `365d_open_days` - COALESCE(is_open, 0) AS `365d_open_days` -- 営業日が無期限に蓄積されるのを防ぎます
      FROM (
        SELECT shop_id, 
               max(last_update_ds) AS last_update_ds, 
               sum(`365d_open_days`) AS `365d_open_days`
        FROM (
          SELECT shop_id,
                 ds AS last_update_ds,
                 sum(is_open) AS `365d_open_days`
          FROM table_xxx_di
          WHERE ds = '${bizdate}'
          GROUP BY shop_id
          UNION ALL
          SELECT shop_id,
                 last_update_ds,
                 `365d_open_days`
          FROM m_xxx_365_df
          WHERE ds = '${bizdate_2}' AND last_update_ds >= '${bizdate_365}'
          GROUP BY shop_id
        )
        GROUP BY shop_id
      ) AS aa
      LEFT JOIN (
        SELECT shop_id,
               is_open
        FROM table_xxx_di
        WHERE ds = '${bizdate_366}'
      ) AS bb
      ON aa.shop_id = bb.shop_id;
    

COUNT(DISTINCT)

テーブル内のデータが次のように分布していると仮定します。

ds (パーティション)

cnt (レコード数)

20220416

73025514

20220415

2292806

20220417

2319160

次のクエリでは、データスキューが発生しやすくなります:

SELECT  ds
        ,COUNT(DISTINCT shop_id) AS cnt
FROM    demo_data0
GROUP BY ds;

解決策は次のとおりです:

No.

解決策

説明

解決策 1

パラメータチューニング

SET odps.sql.groupby.skewindata=true;

解決策 2

汎用的な 2 段階集計

パーティションフィールドの値に乱数を追加します。

解決策 3

2 段階集計に類似したアプローチ

まず 2 つのグループ化フィールド (ds and shop_id) でグループ化し、その後 COUNT(*) を使用します。

  • 解決策 1:パラメータチューニング

    次のパラメータを設定します:

    SET odps.sql.groupby.skewindata=true;
  • 解決策 2:汎用的な 2 段階集計

    shop_id フィールド内のデータ分布が不均一な場合、解決策 1 は効果がない可能性があります。より汎用的な解決策として、パーティションフィールドの値に乱数を追加します。

    -- オプション1: 乱数を追加します。CONCAT(ROUND(RAND(),1)*10,'_', ds) AS rand_ds
    SELECT  SPLIT_PART(rand_ds, '_',2) ds
            ,COUNT(*) id_cnt
      FROM (
            SELECT  rand_ds
                    ,shop_id
            FROM    demo_data0
            GROUP BY rand_ds,shop_id
            )
    GROUP BY SPLIT_PART(rand_ds, '_',2);
    -- オプション2: 乱数列を追加します。ROUND(RAND(),1)*10 AS randint10
    SELECT  ds
            ,COUNT(*) id_cnt
    FROM    (SELECT  ds
                     ,randint10
                     ,shop_id
               FROM  demo_data0
            GROUP BY ds,randint10,shop_id
            )
    GROUP BY ds;
  • 解決策 3:2 段階集計に類似したアプローチ

    GROUP BY フィールドのデータが均等に分布している場合は、この最適化を使用します。まず 2 つのグループ化フィールド (ds and shop_id) でグループ化し、その後 COUNT(*) を使用します。

    SELECT  ds
            ,COUNT(*) AS cnt
    FROM(SELECT  ds
                ,shop_id
                FROM    demo_data0
                GROUP BY ds ,shop_id
        )
    GROUP BY ds;

ROW_NUMBER() (Top-N)

次の例は、Top-N クエリです。

SELECT  main_id
        ,type
FROM    (SELECT  main_id
                 ,type
                 ,ROW_NUMBER() OVER(PARTITION BY main_id ORDER BY type DESC ) rn
            FROM <data_demo2>
        ) A
WHERE   A.rn <= 10;

データスキューが発生した場合、次のいずれかの方法を使用して問題を解決できます。

番号

方法

説明

方法 1

SQL で 2段階集約を実行します。

ランダム列 (ソルティングと呼ばれる手法) を追加し、パーティションパラメーターとして使用します。

方法 2

UDAF で 2段階集約を実行します。

最小ヒープを実装した UDAF でクエリを最適化します。

  • 方法 1: SQL による 2段階集約

    Map フェーズでグループ間にデータをより均等に分散させるために、ランダム列を追加してパーティションパラメーターとして使用します。

    SELECT  main_id
            ,type
      FROM  (SELECT  main_id
                     ,type
                     ,ROW_NUMBER() OVER(PARTITION BY main_id ORDER BY type DESC ) rn
                FROM (SELECT  main_id
                              ,type
                            FROM (SELECT  main_id
                                          ,type
                                          ,ROW_NUMBER() OVER(PARTITION BY main_id,src_pt ORDER BY type DESC ) rn
                                     FROM (SELECT  main_id
                                                   ,type
                                                   ,ceil(110 * rand()) % 11 AS src_pt
                                             FROM  data_demo2
                                          )
                                    ) B
                            WHERE   B.rn <= 10
                        )
            ) A
    WHERE   A.rn <= 10;
    -- 2. 乱数を直接生成します
    SELECT  main_id
            ,type
      FROM  (SELECT  main_id
                     ,type
                     ,ROW_NUMBER() OVER(PARTITION BY main_id ORDER BY type DESC ) rn
                FROM (SELECT  main_id
                              ,type
                        FROM(SELECT  main_id
                                     ,type
                                     ,ROW_NUMBER() OVER(PARTITION BY main_id,src_pt ORDER BY type DESC ) rn
                               FROM  (SELECT  main_id
                                              ,type
                                              ,ceil(10 * rand()) AS src_pt
                                              FROM    data_demo2
                                      )
                                    ) B
                            WHERE B.rn <= 10
                        )
            ) A
    WHERE   A.rn <= 10;
  • 方法 2: UDAF による 2段階集約

    純粋な SQL アプローチでは、コードが冗長になり、保守が困難になる可能性があります。より効率的な方法として、min-heap を実装した UDAF を使用してクエリを最適化する方法があります。この UDAF は、iterate フェーズ中に上位 N 件のアイテムのみを取得し、merge フェーズ中に N 個の要素のみをマージします。そのプロセスは次のとおりです。

    • iterate: 最初の K 個の要素をプッシュします。後続の要素については、各要素を最小ヒープのトップと比較し、新しい要素の方が大きい場合はトップの要素を置き換えます。

    • merge: 2 つのヒープをマージし、上位 K 個の要素を保持します。

    • terminate: ヒープを配列として返します。

    • 最終的な SQL クエリでは、配列は個々の行にネスト解除されます。

    @annotate('* -> array<string>')
    class GetTopN(BaseUDAF):
        def new_buffer(self):
            return [[], None]
        def iterate(self, buffer, order_column_val, k):
            # heapq.heappush(buffer, order_column_val)
            # buffer = [heapq.nlargest(k, buffer), k]
            if not buffer[1]:
                buffer[1] = k
            if len(buffer[0]) < k:
                heapq.heappush(buffer[0], order_column_val)
            else:
                heapq.heappushpop(buffer[0], order_column_val)
        def merge(self, buffer, pbuffer):
            first_buffer, first_k = buffer
            second_buffer, second_k = pbuffer
            k = first_k or second_k
            merged_heap = first_buffer + second_buffer
            merged_heap.sort(reverse=True)
            merged_heap = merged_heap[0: k] if len(merged_heap) > k else merged_heap
            buffer[0] = merged_heap
            buffer[1] = k
        def terminate(self, buffer):
            return buffer[0]
    SET odps.sql.python.version=cp37;
    SELECT main_id,type_val FROM (
      SELECT  main_id ,get_topn(type, 10) AS type_array
      FROM data_demo2
      GROUP BY main_id
    )
    LATERAL VIEW EXPLODE(type_array)type_ar AS type_val;

動的パーティション

動的パーティションを使用すると、パーティションテーブルにデータを挿入する際に、特定の値を指定せずにパーティション列を指定できます。この場合、パーティションの値は SELECT 句の対応する列から導出されます。そのため、SQL ステートメントの実行が完了するまで、正確なパーティションは不明です。詳細については、「動的パーティションへのデータの挿入または上書き (DYNAMIC PARTITION)」をご参照ください。次にコード例を示します。

CREATE TABLE total_revenues (revenue bigint) partitioned BY (region string);
INSERT overwrite TABLE total_revenues PARTITION(region)
SELECT total_price AS revenue,region
FROM sale_detail;

動的パーティションを持つテーブルでは、データスキューがよく発生します。この問題を解決するには、次の解決策を使用します。

No.

解決策

説明

解決策 1

パラメーターチューニング

設定パラメーターを調整してパフォーマンスを最適化します。

解決策 2

プルーニング最適化

レコード数の多いパーティションを特定し、プルーニングしてから、個別に挿入します。

  • 解決策 1:パラメーターチューニング

    動的パーティションを使用すると、さまざまな基準を満たすデータをさまざまなパーティションにルーティングできます。この機能によりコードが簡素化されるため、特に多数のパーティションを扱う場合に、複数の INSERT OVERWRITE ステートメントは不要になります。ただし、動的パーティションは、多数のスモールファイルを作成する可能性もあります。

    • データスキューの例

      次の簡単な SQL ステートメントを例に説明します。

      INSERT INTO TABLE part_test PARTITION(ds) SELECT * FROM  part_test;

      K 個の Map インスタンスと N 個のターゲットパーティションがあると仮定します。

                                  ds=1
      cfile1                      ds=2
      ...             X           ds=3
      cfilek                      ...
                                  ds=n

      最悪のシナリオでは、この操作によって K*N 個のスモールファイルが生成される可能性があります。スモールファイルが多すぎると、ファイルシステムに高い負荷がかかります。この問題に対処するために、MaxCompute は追加の Reduce タスクを追加し、同じターゲットパーティションのデータを 1 つまたは少数の Reduce インスタンスに転送します。これにより、過剰なスモールファイルの生成を防ぎます。この操作は、常にジョブの最後の Reduce タスクです。この機能は MaxCompute でデフォルトで有効になっており、次のパラメーターが true に設定されています。

      SET odps.sql.reshuffle.dynamicpt=true;

      このデフォルト設定は過剰なスモールファイルの問題を解決しますが、データスキューを引き起こす可能性があります。このアプローチは、Reduce ステージを追加することで、より多くのコンピューティングリソースも消費します。したがって、これらのトレードオフのバランスを取る必要があります。

    • 解決策

      set odps.sql.reshuffle.dynamicpt=true; を有効にすると、過剰なスモールファイルの生成を防ぎます。ただし、ターゲットパーティションの数が少ない場合、過剰なスモールファイルが生成されるリスクは低くなります。この場合、デフォルト設定ではコンピューティングリソースが無駄になり、パフォーマンスが低下します。代わりに、set odps.sql.reshuffle.dynamicpt=false; を設定してこの機能を無効にすると、パフォーマンスを大幅に向上させることができます。次にコード例を示します。

      INSERT overwrite TABLE ads_tb_cornucopia_pool_d PARTITION (ds, lv, tp)
      SELECT /*+ mapjoin(t2) */
          '20150503' AS ds,
          t1.lv AS lv,
          t1.type AS tp
      FROM
          (SELECT  ...
          FROM tbbi.ads_tb_cornucopia_user_d
          WHERE ds = '20150503'
          AND lv IN ('flat', '3rd')
          AND tp = 'T'
          AND pref_cat2_id > 0
          ) t1
      JOIN
          (SELECT ...
          FROM tbbi.ads_tb_cornucopia_auct_d
          WHERE ds = '20150503'
          AND tp = 'T'
          AND is_all = 'N'
          AND cat2_id > 0
          ) t2
      ON t1.pref_cat2_id = t2.cat2_id;

      デフォルトのパラメーターでは、ジョブの実行に約 1 時間 30 分かかりました。最後の Reduce ステージだけで約 1 時間 20 分かかり、合計ランタイムの約 90% を占めました。追加の Reduce ステージが各 Reduce インスタンスにデータを不均等に分散させたため、ロングテールのデータスキューが発生しました。

    上記の例では、統計分析によると、毎日約 2 つの動的パーティションしか生成されません。したがって、set odps.sql.reshuffle.dynamicpt=false; を設定できます。このパラメーターを false に設定すると、タスクはわずか 9 分で完了します。この簡単な変更により、パフォーマンスが大幅に向上し、コンピューティング時間とコンピューティングリソースが節約され、非常に大きなメリットが得られます。

    この最適化は、大規模で実行時間の長いジョブに限定されません。小規模で迅速なジョブを含め、少数の動적パーティションを使用するジョブであれば、odps.sql.reshuffle.dynamicpt パラメーターを false に設定して、リソースを節約し、パフォーマンスを向上させることができます。

    ランタイムに関係なく、次の 3 つの条件を満たすノードを最適化できます。

    • 動的パーティションを使用している。

    • 動的パーティションの数が 50 以下である。

    • odps.sql.reshuffle.dynamicpt パラメーターが true (デフォルト) に設定されている。

    最後の Fuxi インスタンスのランタイムを使用して、このパラメーターを設定する緊急度を判断できます。 diag_level フィールドは、次のルールに基づいて重大度を示します。

    • Last_Fuxi_Inst_Time が 30 分を超える場合: Diag_Level=4 ('Critical')

    • Last_Fuxi_Inst_Time が 20 分から 30 分の場合: Diag_Level=3 ('High')

    • Last_Fuxi_Inst_Time が 10 分から 20 分の場合: Diag_Level=2 ('Medium')

    • Last_Fuxi_Inst_Time が 10 分未満の場合: Diag_Level=1 ('Low')

  • 解決策 2:プルーニング最適化

    動的パーティション挿入の Map フェーズで発生するデータスキューに対処するには、レコード数の多いパーティションを特定し、プルーニングしてから、個別に挿入します。次の例に示すように、Map フェーズのパラメーターを調整することから始めます。

    SET odps.sql.mapper.split.size=128;
    INSERT OVERWRITE TABLE data_demo3 partition(ds,hh)
    SELECT  *
    FROM    dwd_alsc_ent_shop_info_hi;

    この結果は、このアプローチが依然としてフルテーブルスキャンを実行することを示しています。システムによって追加された Reduce ジョブを無効にすることで、これをさらに最適化できます。

    SET odps.sql.reshuffle.dynamicpt=false;
    INSERT OVERWRITE TABLE data_demo3 partition(ds,hh)
    SELECT *
    FROM dwd_alsc_ent_shop_info_hi;

    プルーニング最適化は、レコード数の多いパーティションを特定して分離することで、Map フェーズのデータスキューを解決します。手順は次のとおりです。

    1. 次のクエリを実行して、レコード数が最も多いパーティションを見つけます。

      SELECT  ds
              ,hh
              ,COUNT(*) AS cnt
      FROM    dwd_alsc_ent_shop_info_hi
      GROUP BY ds
               ,hh
      ORDER BY cnt DESC;

      結果の例を次の表に示します。

      ds

      hh

      cnt

      20200928

      17

      1052800

      20191017

      17

      1041234

      20210928

      17

      1034332

      20190328

      17

      1000321

      20210504

      1

      19

      20191003

      20

      18

      20200522

      1

      18

      20220504

      1

      18

    2. まず、スキューしたパーティションをフィルターで除外し、残りのデータを挿入します。次に、スキューしたパーティションのデータを別の操作で挿入します。

      -- スキューしたパーティションを除くすべてのデータを挿入
      SET odps.sql.reshuffle.dynamicpt=false;
      INSERT OVERWRITE TABLE data_demo3 partition(ds,hh)
      SELECT  *
      FROM    dwd_alsc_ent_shop_info_hi
      WHERE   CONCAT(ds,hh) NOT IN ('2020092817','2019101717','2021092817','2019032817');
      
      -- スキューしたパーティションのデータを挿入
      SET odps.sql.reshuffle.dynamicpt=false;
      INSERT OVERWRITE TABLE data_demo3 partition(ds,hh)
      SELECT  *
      FROM    dwd_alsc_ent_shop_info_hi
      WHERE   CONCAT(ds,hh) IN ('2020092817','2019101717','2021092817','2019032817');
      
      -- カウントを確認して結果を検証
      SELECT  ds
        ,hh,COUNT(*) AS cnt
       FROM dwd_alsc_ent_shop_info_hi
       GROUP BY ds,hh ORDER BY cnt desc;