すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:データスキューチューニング

最終更新日:Jan 20, 2025

このトピックでは、MaxComputeを使用するときに発生する一般的なデータスキューの問題について説明し、関連ソリューションを提供します。

MapReduce

データスキューを理解するには、MapReduceに精通している必要があります。 MapReduceは、典型的な分散コンピューティングフレームワークである。 MapReduceは、分割-適用-結合メソッドを使用します。これは、大きな課題を分割したり、難しい課題を扱いやすいサブ課題に分割したりして、各サブ課題の結果を個別に生成し、その結果を最終結果にマージします。 従来の並列プログラミングフレームワークと比較して、MapReduceには、高いフォールトトレランス、使いやすさ、および高いスケーラビリティという利点があります。 MapReduceを使用して並列プログラミングを実行する場合、分散クラスターでのプログラミングに関連する問題のみを考慮する必要があります。 データストレージ、ノード間の情報交換、伝送メカニズムなどの他の問題を考慮する必要はありません。 このように、分散プログラミング操作は単純化される。

次の図は、MapReduceのワークフローを示しています。MapReduce

データスキュー

ほとんどの場合、データスキューの問題はリデューサで発生します。 マッパーは入力ファイルごとにデータを分割します。 通常、データは均等に分割され、ワーカーに配布されます。 ただし、テーブル間のデータ分散が均等でない場合、データを異なるワーカーに均等に分散することはできません。 その結果、データスキューの問題が発生する。 データの分散が不均一な場合、一部のワーカーはすぐにデータ計算を完了しますが、他のワーカーは長時間実行してデータ計算を完了します。 実際の生産では、ほとんどのデータが歪んでおり、これはパレートの原則 (80/20ルール) に沿っています。 例えば、フォーラムのアクティブユーザの20% は、ウェブサイトの投稿の80% を投稿したり、ウェブサイトのページビューを80% したりすることが、ウェブサイト訪問者の20% によって提供される。 データ量が爆発的に増加するビッグデータの時代では、データスキューの問題が分散プログラムの実行効率に大きく影響します。 例えば、ジョブの実行進捗は、データのスキューにより、99% に長時間留まる。

データスキューの問題を特定する

MaxComputeでは、Logviewを使用してデータスキューの問題を特定できます。 次の図は、データスキューの問題を特定する方法を示しています。Identification process

  1. [Fuxiジョブ] ページで、実行レイテンシに基づいて、FuxiジョブのFuxiタスクを降順に並べ替えます。 実行レイテンシが最大のFuxiタスクを選択します。

  2. Fuxiタスク内のFuxiインスタンスを、実行レイテンシに基づいて降順に並べ替えます。 最初のFuxiインスタンスなど、実行レイテンシが平均実行レイテンシよりもはるかに長いFuxiインスタンスを選択し、Fuxiインスタンスをロックしてから、StdOutで出力ログを表示します。

  3. StdOutのログ情報に基づいて、ジョブの有向非巡回グラフ (DAG) を表示します。

  4. ジョブのDAGのキー情報に基づいて、データスキューの問題の原因となるSQLコードスニペットを見つけます。

例:

  1. ジョブの実行ログに基づいてLogviewログを取得します。 詳細については、「エントリポイント」をご参照ください。 Logview

  2. Logview UIに移動し、実行レイテンシに基づいてFuxiタスクを降順で並べ替えます。 データスキューの問題をすばやく特定するために、実行レイテンシが最大のFuxiタスクを選択します。 Fuxi Task

  3. タスクR31_26_27の実行レイテンシが最大です。 次の図に示すように、R31_26_27タスクをクリックして、インスタンスの詳細ページに移動します。 Fuxi instance with the maximum execution latencyレイテンシ: {min:00:00:06, avg: 00:00:00:13, max:00:26:40} は、タスクのすべてのインスタンスの最小実行レイテンシが6秒、平均実行レイテンシが13秒、最大実行レイテンシが26分40秒 (1,600秒) であることを示します。 タスクインスタンスは、レイテンシ (タスク実行レイテンシ) の降順でソートできます。 ソート結果は、4つのインスタンスの実行待ち時間が長いことを示しています。 Fuxiインスタンスの実行レイテンシが平均実行レイテンシ (26秒) の2倍を超える場合、MaxComputeはFuxiインスタンスがロングテールであると見なします。 この例では、21インスタンスの実行レイテンシは26秒より長く、インスタンスはロングテールです。 ロングテールインスタンスは、必ずしもデータスキューの問題が発生したことを示すわけではありません。 avgの値とmaxの値を比較する必要があります。 maxの値がavgの値よりもはるかに大きい場合、インスタンスで重大なデータスキューの問題が発生します。 この問題を管理する必要があります。

  4. 次の図に示すように、インスタンスを見つけ、[StdOut] 列でOutput logをクリックして出力ログを表示します。 Sample output result

  5. データスキューの問題が特定されたら、[ジョブの詳細] タブで [R31_26_27] を右クリックし、[すべて展開] を選択します。 詳細については、「LogView V2.0を使用したジョブ情報の表示」をご参照ください。 データスキューの問題の原因となるキーを特定するには、StreamLineRead22の前にStreamLineWriter21Expand the taskします。 この例では、キーはnew_uri_path_structurecookie_x5check_userid、およびcookie_useridです。 これにより、データスキューの問題の原因となるSQLコードスニペットを見つけることができます。 KEY

データスキューのトラブルシューティングとソリューション

実際の経験から、データスキューの問題は通常、次の理由によって発生します。

  • 参加

  • GROUP BY

  • カウント (DISTINCT)

  • ROW_NUMBER (トップN)

  • 動的パーティショニング

発生頻度順の理由は、join > GROUP BY > COUNT(DISTINCT) > ROW_NUMBER > Dynamic partitioningである。

参加

結合操作によってデータスキューの問題が発生した場合、大きなテーブルと小さなテーブル間の結合、大きなテーブルと中程度のテーブル間の結合、または結合操作中のホットキー値によるロングテールなど、複数のケースが含まれる可能性があります。

  • 大きなテーブルと小さなテーブルの結合。

    • データスキューの例

      次の例では、t1テーブルは大きなテーブルであり、t2およびt3テーブルは小さなテーブルです。

      SELECT  t1.ip
              ,t1.is_anon
              ,t1.user_id
              ,t1.user_agent
              ,t1.referer
              ,t2.ssl_ciphers
              ,t3.shop_province_name
              ,t3.shop_city_name
      FROM    <viewtable> t1
      LEFT OUTER JOIN <other_viewtable> t2
      ON t1.header_eagleeye_traceid = t2.eagleeye_traceid
      LEFT OUTER JOIN (  SELECT  shop_id
                                  ,city_name AS shop_city_name
                                  ,province_name AS shop_province_name
                          FROM    <tenanttable>
                          WHERE   ds = MAX_PT('<tenanttable>')
                          AND     is_valid = 1
                      ) t3
      ON t1.shopid = t3.shop_id
    • 解決策

      マップ結合ヒントを使用します。 マップ結合ヒントの詳細については、「MAPJOIN HINT」をご参照ください。 例:

      SELECT  /*+ mapjoin(t2,t3)*/
              t1.ip
              ,t1.is_anon
              ,t1.user_id
              ,t1.user_agent
              ,t1.referer
              ,t2.ssl_ciphers
              ,t3.shop_province_name
              ,t3.shop_city_name
      FROM    <viewtable> t1
      LEFT OUTER JOIN (<other_viewtable>) t2
      ON t1.header_eagleeye_traceid = t2.eagleeye_traceid
      LEFT OUTER JOIN (  SELECT  shop_id
                                  ,city_name AS shop_city_name
                                  ,province_name AS shop_province_name
                          FROM    <tenanttable>
                          WHERE   ds = MAX_PT('<tenanttable>')
                          AND     is_valid = 1
                      ) t3
      ON t1.shopid = t3.shop_id
      • 使用上の注意

        • 小さなテーブルまたはサブクエリを参照する場合は、テーブルまたはサブクエリのエイリアスを参照する必要があります。

        • マップ結合操作は、サブクエリの小さなテーブルをサポートします。

        • マップ結合操作では、非等差結合を使用するか、orを使用して条件を結合できます。 ON句を指定せずに、MAPJOIN ON 1 = 1を使用してデカルト積を計算できます。 たとえば、select /* + mapjoin(a) */ a.id from shop a join table_name b on 1=1; ステートメントを実行できます。 ただし、このステートメントはデータの肥大化の問題を引き起こす可能性があります。

        • map join操作の複数の小さなテーブルは、/* + mapjoin(a,b,c)*/ などのコンマ (,) で区切ります。

        • マップステージでは、MapReduceシステムは、プログラムに対してマップ結合操作を実行して、指定されたテーブル内のすべてのデータをプログラムのメモリにロードします。 map join操作に指定するテーブルは小さなテーブルである必要があり、テーブルデータが占めるメモリの合計は512 MBを超えることはできません。 MaxComputeは圧縮ストレージを使用します。 小テーブルがメモリにロードされた後、小テーブルのデータ量は急激に増加する。 512 MBは、小さなテーブルをメモリにロードした後の最大データ量を示します。 odps.sql.mapjoin.memory.maxパラメーターを大きい値に設定して、メモリサイズを増やすことができます。 パラメータの最大値は8192です。 単位:MB。

          set odps.sql.mapjoin.memory.max=2048;
      • マップ結合操作の制限

        • left OUTER JOIN操作の左側のテーブルは、大きなテーブルである必要があります。

        • right OUTER JOIN操作の右側のテーブルは、大きなテーブルである必要があります。

        • マップ結合操作は、FULL OUTER join操作では使用できません。

        • INNER JOIN操作の左または右のテーブルは、大きなテーブルにすることができます。

        • MaxComputeでは、マップ結合操作に最大128個の小さなテーブルを指定できます。 128を超える小さなテーブルを指定すると、構文エラーが返されます。

  • 大きなテーブルと中テーブルの結合。

    • データスキューの例

      次の例では、t0テーブルは大きなテーブルで、t1テーブルは中テーブルです。

      SELECT  request_datetime
              ,host
              ,URI
              ,eagleeye_traceid
      from <viewtable>
         t0
      LEFT join (
          SELECT
          traceid,
          eleme_uid,
          isLogin_is
          from <servicetable>
          where ds = '${today}'
          AND     hh = '${hour}'
      ) t1 on t0.eagleeye_traceid = t1.traceid
      WHERE   ds = '${today}'
      AND     hh = '${hour}'
    • 解決策

      分散MAPJOINを使用します。 構文の詳細については、「DISTRIBUTED MAPJOIN」をご参照ください。 例:

      SELECT  /*+distmapjoin(t1)*/
              request_datetime
              ,host
              ,URI
              ,eagleeye_traceid
      from <viewtable>
         t0
      LEFT join (
          SELECT
          traceid,
          eleme_uid,
          isLogin_is
          from <servicetable>
          where ds = '${today}'
          AND     hh = '${hour}'
      ) t1 on t0.eagleeye_traceid = t1.traceid
      WHERE   ds = '${today}'
      AND     hh = '${hour}'
  • 結合操作中のホットキー値によるロングテール。

    • データスキューの例

      次の例では、eleme_uidフィールドに多数のホットキー値が存在します。 データスキューの問題が発生する可能性があります。

      SELECT
      eleme_uid,
      ...
      FROM (
          SELECT
          eleme_uid,
          ...
          FROM <viewtable>
      )t1
      LEFT JOIN(
          SELECT
          eleme_uid,
          ...
          FROM <customertable>
      )  t2
      on t1.eleme_uid = t2.eleme_uid;
    • 解決策

      次の表に、データスキューの問題を解決するために使用できるソリューションを示します。

      いいえ

      解決策

      説明

      ソリューション 1

      ホットキー値を手動で分割します。

      分析結果に基づいて、プライマリテーブルからホットキー値を除外します。 ホットキー値に対してマップ結合操作を実行し、非ホットキー値に対してマージ結合操作を実行してから、2つの結合操作の結果をマージします。

      ソリューション 2

      skew joinパラメーターを設定します。

      odps.sql.skewjoinをtrueに設定します。

      ソリューション 3

      スキュー結合ヒントを追加します。

      スキュー結合ヒントを次の形式で使用します。/* + skewJoin(<table_name>[(<column1_name>[,<column2_name>,...]))][((<value11 >,< value12>))[,(<value21 >,< value22>)...])]*/ スキュー結合ヒントを使用する場合、スキューキーを取得する操作は1回繰り返されます。 これにより、クエリ期間が延長されます。 skewキーが取得されている場合は、skew joinパラメーターを設定して時間を節約できます。

      ソリューション 4

      複数のテーブルを使用してモジュロ等しい結合を実行します。

      複数のテーブルを使用します。

      • ホットキー値を手動で分割します。

        分析結果に基づいて、プライマリテーブルからホットキー値を除外します。 ホットキー値に対してマップ結合操作を実行し、非ホットキー値に対してマージ結合操作を実行してから、2つの結合操作の結果をマージします。 サンプルコード:

        SELECT
        /*+ MAPJOIN (t2) */
        eleme_uid,
        ...
        FROM (
            SELECT
            eleme_uid,
            ...
            FROM <viewtable>
            WHERE eleme_uid = <skewed_value>
        )t1
        LEFT JOIN(
            SELECT
            eleme_uid,
            ...
            FROM <customertable>
            WHERE eleme_uid = <skewed_value>
        )  t2
        on t1.eleme_uid = t2.eleme_uid
        UNION ALL
        SELECT
        eleme_uid,
        ...
        FROM (
            SELECT
            eleme_uid,
            ...
            FROM <viewtable>
            WHERE eleme_uid != <skewed_value>
        )t3
        LEFT JOIN(
            SELECT
            eleme_uid,
            ...
            FROM <customertable>
            WHERE eleme_uid != <skewed_value>
        )  t4
        on t3.eleme_uid = t4.eleme_uid
      • skew joinパラメーターを設定します。

        これは一般的な解決策です。 MaxComputeでは、odps.sql.skewjoinパラメーターをtrueに設定して、スキュー結合機能を有効にします。 スキュー結合機能のみを有効にした場合、タスクの実行は影響を受けません。 スキュー結合機能を有効にするには、odps.sql.skewinfoパラメーターも設定する必要があります。 odps.sql.skewinfoパラメーターは、結合の最適化情報を指定します。 サンプルコマンド:

        set odps.sql.skewjoin=true;
        set odps.sql.skewinfo=skewed_src:(skewed_key)[("skewed_value")];  -- skewed_src specifies a traffic table, and skewed_value specifies a hot key value.

        例:

        Configure the join optimization information for a single skewed value of a single field.
        set odps.sql.skewinfo=src_skewjoin1:(key)[("0")];
        Configure the join optimization information for multiple skewed values of a single field.
        set odps.sql.skewinfo=src_skewjoin1:(key)[("0")("1")];
      • スキュー結合ヒントを追加します。

        SELECTステートメントで、次の形式でスキュー結合ヒントを追加して、マップ結合操作を実行します。/* + skewJoin(<table_name>[(<column1_name>[,<column2_name>,...]))][((<value11 >,< value12>))[,(<value21 >,< value22>))...]]) *] / table_nameはスキューされたテーブルの名前を指定し、colume_nameはスキューされた列の名前を指定し、valueはスキューされたキー値を指定します。 例:

        -- Method 1: Add a hint to specify the alias of the table. 
        select /*+ skewjoin(a) */ * from T0 a join T1 b on a.c0 = b.c0 and a.c1 = b.c1;
        -- Method 2: Add a hint to specify the name of the table and the names of possibly skewed columns. For example, the following statement shows that Columns c0 and c1 in Table a are skewed columns. 
        select /*+ skewjoin(a(c0, c1)) */ * from T0 a join T1 b on a.c0 = b.c0 and a.c1 = b.c1 and a.c2 = b.c2;
        -- Method 3: Add a hint to specify the name of table and the names of columns and provide the skewed key values. If skewed key values are of the STRING type, enclose each value with double quotation marks ("). In the following statement, (a.c0=1 and a.c1="2") and (a.c0=3 and a.c1="4") contain skewed key values. 
        select /*+ skewjoin(a(c0, c1)((1, "2"), (3, "4"))) */ * from T0 a join T1 b on a.c0 = b.c0 and a.c1 = b.c1 and a.c2 = b.c2;
        説明

        スキュー結合ヒントを使用してスキュー値を指定すると、方法1および方法2 (スキュー値を指定しない) よりも処理効率が高くなります。

        結合操作にスキュー結合ヒントを使用する場合は、次の制限事項に注意してください。

        • INNER JOIN: INNER joinステートメントで、左または右のテーブルにスキュー結合ヒントを指定できます。

        • LEFT JOIN、SEMI JOIN、ANTI JOIN: スキュー結合ヒントは、左側のテーブルに対してのみ指定できます。

        • RIGHT JOIN: スキュー結合ヒントは、正しいテーブルに対してのみ指定できます。

        • FULL JOIN: Skew joinヒントはサポートされていません。

        集計は、スキュー結合ヒントが追加された後に実行され、結合操作が遅くなります。 したがって、データスキューの原因となるjoin文にのみスキュー結合ヒントを追加することをお勧めします。

        左側結合キーのデータ型は、スキュー結合ヒントが追加されるJoinステートメントの右側結合キーのデータ型と同じである必要があります。 データ型が異なる場合、スキュー結合ヒントは無効になります。 上記の例では、a.c0b.c0のデータ型は同じである必要があり、a.c1b.c1のデータ型は同じである必要があります。 データ型の一貫性を確保するために、CAST関数を使用してサブクエリの結合キーのデータ型を変換できます。 サンプル文:

        create table T0(c0 int, c1 int, c2 int, c3 int);
        create table T1(c0 string, c1 int, c2 int);
        -- Method 1:
        select /*+ skewjoin(a) */ * from T0 a join T1 b on cast(a.c0 as string) = cast(b.c0 as string) and a.c1 = b.c1;
        -- Method 2:
        select /*+ skewjoin(b) */ * from (select cast(a.c0 as string) as c00 from T0 a) b join T1 c on b.c00 = c.c0;

        スキュー結合ヒントが追加された後、オプティマイザは集計を実行して最初のN個のホットキー値を取得します。 デフォルトでは、最初の20のホットキー値が取得されます。 set odps.optimizer.skew.join.topk.num = xx; コマンドを実行して、オプティマイザが取得できるホットキー値の数を指定できます。

        • Skew結合ヒントを使用すると、joinステートメントに含まれる左または右のテーブルにのみヒントを追加できます。

        • スキュー結合ヒントが追加されたJOIN文には、left key = right keyを含める必要があります。 CARTESIAN joinステートメントにSkew JOINヒントを追加することはできません。

        • マップ結合ヒントが追加されているjoin文には、スキュー結合ヒントを追加することはできません。

      • 複数のテーブルを使用してモジュロ等しい結合を実行します。

        このソリューションのロジックは、他のソリューションのロジックとは異なります。 このソリューションでは、INT型の1つの列のみを含む複数テーブルが使用されます。 例えば、列は1からNまでの値を含む。具体的な値は、スキューの程度に基づいて決定される。 複数のテーブルを使用して、ユーザー行動テーブルをN回拡張できます。 2つの関連するキー、ユーザIDおよび番号は、結合操作に使用される。 このようにして、データスキューの問題を引き起こすユーザIDベースのデータ配信は、2つの関連キーユーザIDと番号が使用された後に1/Nデータ配信に変更されます。 しかし、この場合、データ量はN倍に増加する。

        SELECT
        eleme_uid,
        ...
        FROM (
            SELECT
            eleme_uid,
            ...
            FROM <viewtable>
        )t1
        LEFT JOIN(
            SELECT
            /*+mapjoin(<multipletable>)*/
            eleme_uid,
            number
            ...
            FROM <customertable>
            JOIN <multipletable>
        )  t2
        on t1.eleme_uid = t2.eleme_uid
        and mod(t1.<value_col>,10)+1 = t2.number;

        上記のデータブロートメカニズムに基づいて、2つのテーブルのホットキー値に対してのみデータブロートを有効にすることができます。 テーブル内の非ホットキー値は変更されません。 ホットキーの値を見つけ、トラフィックテーブルとユーザー動作テーブルを別々に処理します。 eleme_uid_join列をテーブルに追加します。 ユーザーIDがホットキー値の場合、CONCATメソッドを使用してランダムな正の整数を返します。 整数は、0から所定の倍数、例えば0から1000までの範囲である。 戻り値が指定された範囲内にない場合、元のユーザーIDは変更されません。 2つのテーブルを結合すると、eleme_uid_join列が使用されます。 このようにして、複数のホットキー値が肥大化し、データスキューが軽減されます。 非ホットキー値の不必要な膨張は防止される。 ただし、元のビジネスロジックに基づくSQLコードは完全に変更されています。 したがって、このソリューションを使用しないことをお勧めします。

GROUP BY

GROUP BY句が指定されているサンプル疑似コード:

SELECT  shop_id
        ,sum(is_open) AS Business days
FROM    table_xxx_di
WHERE   dt BETWEEN '${bizdate_365}' AND  '${bizdate}'
GROUP BY shop_id;

次の表に、GROUP by句によって発生するデータスキューの問題に対する解決策を示します。

いいえ

解決策

説明

ソリューション 1

GROUP BY句のアンチスキュー機能を有効にするパラメーターを設定します。

set odps.sql.groupby.skewindata=trueを追加します。

ソリューション 2

乱数を追加します。

ロングテールの原因となるキーを分割します。

ソリューション 3

ローリングテーブルを作成します。

このソリューションは、コストの削減と効率の向上に役立ちます。

  • 解決策1: GROUP BY句のアンチスキュー機能を有効にするパラメータを設定します。

    set odps.sql.groupby.skewindata=true;
  • 解決策2: 乱数を追加します。

    ソリューション1と比較して、このソリューションでは乱数を追加してSQL文を書き換えることができます。 このソリューションは、GROUP BY句のロングテールを引き起こすキーを分割するために使用されます。

    Select Key,Count(*) As Cnt From TableName Group By Key; ステートメントの場合、コンバイナは必要ありません。 MapReduceシステムは、出力データをマッパーからリデューサーにシャッフルします。 次に、リデューサは、COUNTオペレーションを実行する。 関連する実行プランはM -> Rです。

    次の例では、ロングテールの原因となるキーが再割り当てされます。

    -- The key that causes a long tail is KEY001.
    SELECT  a.Key
            ,SUM(a.Cnt) AS Cnt
    FROM(SELECT  Key
                ,COUNT(*) AS Cnt
                FROM    <TableName>
                GROUP BY Key
                ,CASE WHEN KEY = 'KEY001' THEN Hash(Random()) % 50
                 ELSE 0
                END
            ) a
    GROUP BY a.Key;

    キーが再割り当てされた後、実行計画はM -> R -> Rになります。実行計画にもう1つのステップが追加されますが、ロングテールの原因となるキーは2つのステップを使用して処理されます。 このようにして、実行期間全体を短縮することができる。 リソースと時間の消費は基本的に解決策1と同じです。 ただし、実際のシナリオでは、通常、ロングテールは複数のキーによって発生します。 キーを識別し、SQL文を書き換えるためのコストは高い。 解決策1は、よりコスト効率が高い。

  • ローリングテーブルを作成します。

    このソリューションは、コストを削減し、効率を向上させることを目的としています。 主要な要件は、過去1年間の販売者データを取得することです。 オンラインタスクの場合、T-1からT-365までのすべてのパーティションのデータを読み取る必要があるため、リソースが無駄になります。 ローリングテーブルを作成すると、パーティションから読み取られるデータ量は減少しますが、過去1年間のすべてのデータの取得には影響しません。

    次の例では、365日間のマーチャントビジネスデータが初めて初期化されます。 GROUP BY句は、データを収集するために使用されます。 データが更新された日付は注釈され、データはテーブルaに格納される。 後続のタスクでは、日付T-2のデータを収集するために使用されるテーブルaがテーブルtable_xxx_diに関連付けられ、GROUP BY操作が実行されます。 このようにして、毎日データが読み取られるパーティションの数が365から2に減少します。 プライマリキーのshopidが読み取られる回数は大幅に削減され、リソース消費も削減されます。

    -- Create a rolling table.
    CREATE TABLE IF NOT EXISTS m_xxx_365_df
    (
      shop_id STRING COMMENT,
      last_update_ds COMMENT,
      365d_open_days COMMENT
    )
    PARTITIONED BY
    (
        ds STRING COMMENT 'Date partition'
    )
    LIFECYCLE 7;
    -- Initialize the m_xxx_365_df table that stores the data from May 1, 2021 to May 1, 2022.
    INSERT OVERWRITE TABLE m_xxx_365_df PARTITION(ds = '20220501')
      SELECT shop_id,
             max(ds) as last_update_ds,
             sum(is_open) AS 365d_open_days
      FROM table_xxx_di
      WHERE dt BETWEEN '20210501' AND '20220501'
      GROUP BY shop_id;
    -- Execute the following statement for subsequent tasks:
    INSERT OVERWRITE TABLE m_xxx_365_df PARTITION(ds = '${bizdate}')
      select aa.shop_id,
             aa.last_update_ds,
             365d_open_days - COALESCE(is_open, 0) as 365d_open_days -- Infinite rolling storage without specifying business days.
      from (
        select shop_id,
               max(last_update_ds) as last_update_ds,
               sum(365d_open_days) AS 365d_open_days
        from (
          SELECT shop_id,
                 ds as last_update_ds,
                 sum(is_open) AS 365d_open_days
          FROM table_xxx_di
          WHERE ds = '${bizdate}'
          GROUP BY shop_id
          union all
          SELECT shop_id,
                 last_update_ds,
                 365d_open_days
          FROM m_xxx_365_df
          WHERE dt = '${bizdate_2}' and last_update_ds >= '${bizdate_365}'
          GROUP BY shop_id
        )
        group by shop_id
      ) as aa
      left join (
        SELECT shop_id,
               is_open
        FROM table_xxx_di
        WHERE ds = '${bizdate_366}'
      ) as bb
      on aa.shop_id = bb.shop_id;

カウント (DISTINCT)

この例では、テーブルに次のデータが含まれます。

ds (パーティション)

cnt (レコード数)

20220416

73025514

20220415

2292806

20220417

2319160

次の文を使用すると、データスキューの問題が発生する可能性があります。

SELECT  ds
        ,COUNT(DISTINCT shop_id) AS cnt
FROM    demo_data0
GROUP BY ds;

次の表に、データスキューの問題を解決するために使用できるソリューションを示します。

いいえ

解決策

説明

ソリューション 1

パラメーター設定を最適化します。

SET odps.sql.groupby.skewindata=true; を追加します。

ソリューション 2

2ステージの集計操作を実行します。

パーティションフィールドの値を乱数で連結します。

ソリューション 3

この溶液は、溶液2と同様である。

(ds + shop_id) でGROUP BY操作を実行し、COUNT(DISTINCT) 操作を実行します。

  • 解決策1: パラメータ設定を最適化します。

    odps.sql.groupby.skewindataパラメーターをtrueに設定します。

    SET odps.sql.groupby.skewindata=true;
  • 解決策2: 2段階の集計操作を実行する。

    shop_idフィールドのデータが均等に分散されていない場合、ソリューション1を使用してデータスキューの問題を解決することはできません。 パーティションフィールドの値を乱数と連結できます。

    -- Method 1: Execute CONCAT(ROUND(RAND(),1)*10,'_', ds) AS rand_ds.
    SELECT  SPLIT_PART(rand_ds, '_',2) ds
            ,COUNT(*) id_cnt
      FROM (
            SELECT  rand_ds
                    ,shop_id
            FROM    demo_data0
            GROUP BY rand_ds,shop_id
            )
    GROUP BY SPLIT_PART(rand_ds, '_',2);
    -- Method 2: Execute ROUND(RAND(),1)*10 AS randint10 to add a random number.
    SELECT  ds
            ,COUNT(*) id_cnt
    FROM    (SELECT  ds
                     ,randint10
                     ,shop_id
               FROM  demo_data0
            GROUP BY ds,randint10,shop_id
            )
    GROUP BY ds;
  • ソリューション3: ソリューション2と同様のソリューションを実装します。

    GROUP BYおよびDISTINCT操作に使用されるフィールドのデータが均等に分散されている場合、2つのグループフィールドdsおよびshop_idに対してGROUP BY操作を実行してから、COUNT(DISTINCT) コマンドを実行できます。

    SELECT  ds
            ,COUNT(*) AS cnt
    FROM(SELECT  ds
                ,shop_id
                FROM    demo_data0
                GROUP BY ds ,shop_id
        )
    GROUP BY ds;

ROW_NUMBER (トップN)

次のコードは、上位10行のデータを返す方法の例を示しています。

SELECT  main_id
        ,type
FROM    (SELECT  main_id
                 ,type
                 ,ROW_NUMBER() OVER(PARTITION BY main_id ORDER BY type DESC ) rn
            FROM <data_demo2>
        ) A
WHERE   A.rn <= 10;

ROW_NUMBER(TopN) によってデータスキューの問題が発生している場合は、次のいずれかの解決策を使用して問題を解決できます。

いいえ

解決策

説明

ソリューション 1

SQL文を使用して2段階の集計操作を実行します。

ランダムな列を追加するか、パーティションのパラメーターとして乱数を連結します。

ソリューション 2

ユーザー定義集計関数 (UDAF) を使用して、2段階の集計操作を実行します。

最小ヒープのキューにUDAFを使用します。

  • 解決策1: SQL文を使用して2段階の集計操作を実行します。

    [マップ] ステージで、パーティションのパラメーターとしてランダムな列を追加して、パーティションの各グループにデータを均等に分散します。

    SELECT  main_id
            ,type
      FROM  (SELECT  main_id
                     ,type
                     ,ROW_NUMBER() OVER(PARTITION BY main_id ORDER BY type DESC ) rn
                FROM (SELECT  main_id
                              ,type
                            FROM (SELECT  main_id
                                          ,type
                                          ,ROW_NUMBER() OVER(PARTITION BY main_id,src_pt ORDER BY type DESC ) rn
                                     FROM (SELECT  main_id
                                                   ,type
                                                   ,ceil(110 * rand()) % 11 AS src_pt
                                             FROM  data_demo2
                                          )
                                    ) B
                            WHERE   B.rn <= 10
                        )
            ) A
    WHERE   A.rn <= 10;
    -- Configure a random number.
    SELECT  main_id
            ,type
      FROM  (SELECT  main_id
                     ,type
                     ,ROW_NUMBER() OVER(PARTITION BY main_id ORDER BY type DESC ) rn
                FROM (SELECT  main_id
                              ,type
                        FROM(SELECT  main_id
                                     ,type
                                     ,ROW_NUMBER() OVER(PARTITION BY main_id,src_pt ORDER BY type DESC ) rn
                               FROM  (SELECT  main_id
                                              ,type
                                              ,ceil(10 * rand()) AS src_pt
                                              FROM    data_demo2
                                      )
                                    ) B
                            WHERE B.rn <= 10
                        )
            ) A
    WHERE   A.rn <= 10;
  • 解決策2: UDAFを使用して2段階の集計操作を実行します。

    SQLには大量のコードがあり、保守が難しい場合があります。 この例では、最小ヒープのキューはUDAFを使用して最適化されています。 iterateステージでは、データの上位N行のみが取得され、mergeステージではN個の要素のみがマージされます。 プロセス全体の実装:

    • iterate: 最初のK個の要素をプッシュし、K以降の要素を最小ヒープのキュー内の要素と比較してから、最小ヒープのキュー内の要素を交換します。

    • merge: 要素の2つのヒープがマージされた後の最初のK個の要素を返します。

    • terminate: ヒープを配列として返します。

    • SQL文を使用して配列を行に分割します。

    @annotate('* -> array<string>')
    class GetTopN(BaseUDAF):
        def new_buffer(self):
            return [[], None]
        def iterate(self, buffer, order_column_val, k):
            # heapq.heappush(buffer, order_column_val)
            # buffer = [heapq.nlargest(k, buffer), k]
            if not buffer[1]:
                buffer[1] = k
            if len(buffer[0]) < k:
                heapq.heappush(buffer[0], order_column_val)
            else:
                heapq.heappushpop(buffer[0], order_column_val)
        def merge(self, buffer, pbuffer):
            first_buffer, first_k = buffer
            second_buffer, second_k = pbuffer
            k = first_k or second_k
            merged_heap = first_buffer + second_buffer
            merged_heap.sort(reverse=True)
            merged_heap = merged_heap[0: k] if len(merged_heap) > k else merged_heap
            buffer[0] = merged_heap
            buffer[1] = k
        def terminate(self, buffer):
            return buffer[0]
    
    SET odps.sql.python.version=cp37;
    SELECT main_id,type_val FROM (
      SELECT  main_id ,get_topn(type, 10) AS type_array
      FROM data_demo2
      GROUP BY main_id
    )
    LATERAL VIEW EXPLODE(type_array)type_ar AS type_val;

動的パーティショニング

動的パーティショニングは、パーティションテーブルにデータを挿入するときに、パーティション内のパーティションキー列の名前を指定できるようにする構文です。 パーティションキー列の値を指定する必要はありません。 SELECT句でパーティションキー列の値を指定するだけです。 SQL文を実行する前に、生成されるパーティションは非決定論的です。 SQL文が完全に実行された後、パーティションキー列の値に基づいて生成されるパーティションを特定できます。 詳細については、「動的パーティションへのデータの挿入または上書き (dynamic PARTITION) 」をご参照ください。 サンプル文:

create table total_revenues (revenue bigint) partitioned by (region string);

insert overwrite table total_revenues partition(region)
select  total_price as revenue
      ,region
from    sale_detail;

さまざまなシナリオで動的パーティションを含むテーブルを作成すると、データスキューの問題が発生する可能性があります。 データスキューの問題が発生した場合、次のいずれかの解決策を使用して問題を解決できます。

いいえ

解決策

説明

ソリューション 1

パラメーター設定を最適化します。

パラメーター設定を最適化します。

ソリューション 2

パーティションプルーニングを実行します。

このソリューションでは、多数のデータレコードが格納されているパーティションを削除し、データを個別に挿入できます。

  • 解決策1: パラメータ設定を最適化する。

    動的パーティション分割を使用すると、異なる条件を満たすデータを異なるパーティションに配置して、テーブルに対する複数のINSERT OVERWRITE操作を防ぐことができます。 これにより、特に多数のパーティションが利用可能な場合にコードが簡素化されます。 しかしながら、動的パーティショニングは、過剰な数の小さなファイルが生成されることもあり得る。

    • データスキューの例

      この例では、次のSQL文が使用されます。

      insert into table part_test partition(ds) select * from  part_test;

      K個のマップインスタンスとN個の宛先パーティションが存在します。

                                  ds=1
      cfile1                      ds=2
      ...             X           ds=3
      cfilek                      ...
                                  ds=n

      最も極端な場合、K × Nの小さなファイルが生成され、管理が困難である。 この問題に対処するために、MaxComputeでは動的パーティションにレベル1の削減タスクを追加できます。 このように、1つまたは少数のreduceインスタンスを使用して、同じパーティションにデータを書き込みます。 これにより、過剰な数の小さなファイルが生成されるのを防ぎ、最後のreduceタスクを使用してreduce操作を実行することを保証します。 デフォルトでは、odps.sql.reshuffle.dynamicptはtrueに設定され、機能を有効にします。

      set odps.sql.reshuffle.dynamicpt=true;

      この機能を有効にすると、小さなファイルの数が多すぎず、タスクを減らすことができます。 しかしながら、データスキューの問題が発生することがある。 追加のレベル1削減操作は、コンピューティングリソースを消費します。 したがって、この操作を実行するときは注意が必要です。

    • 解決策

      set odps.sql.reshuffle.dynamicpt=true; の設定は、追加のレベル1の削減タスクが使用されるときに、過剰な数の小さなファイルが生成されるのを防ぐために使用されます。 存在する宛先パーティションの数が少ない場合、生成される小さなファイルの数は多くありません。 この構成は、リソースの浪費を引き起こし、性能を低下させる。 この場合、set odps.sql.reshuffle.dynamicpt=false; を設定すると、パフォーマンスが大幅に向上します。 例:

      insert overwrite table ads_tb_cornucopia_pool_d partition (ds, lv, tp)
      select /*+ mapjoin(t2) */
          '20150503' as ds,
          t1.lv as lv,
          t1.type as tp
      from
          (select  ...
          from tbbi.ads_tb_cornucopia_user_d
          where ds = '20150503'
          and lv in ('flat', '3rd')
          and tp = 'T'
          and pref_cat2_id > 0
          ) t1
      join
          (select ...
          from tbbi.ads_tb_cornucopia_auct_d
          where ds = '20150503'
          and tp = 'T'
          and is_all = 'N'
          and cat2_id > 0
          ) t2
      on t1.pref_cat2_id = t2.cat2_id;

      上記のコードで既定のパラメーターを使用する場合、タスク全体の実行時間は約1時間30分です。 最後のReduceタスクの実行時間は約1時間20分で、これは総実行時間の約90% を占めます。 追加のReduceタスクが導入されると、各Reduceインスタンスのデータ分布は特に不均一になります。 その結果、ロングテールが発生します。

    上記の例では、統計情報に基づいて計算された動的パーティションの生成数は、1日あたりに生成される動的パーティションの数が約2であることを示しています。 したがって、set odps.sql.reshuffle.dynamicpt=false; の設定が許可されています。 このようにして、タスクは9分で完了できます。 したがって、set odps.sql.reshuffle.dynamicpt=false; の設定は、パフォーマンスの向上、コンピューティング時間の短縮、コンピューティングリソースの節約、および限界収益の増加に役立ちます。

    set odps.sql.reshuffle.dynamicpt=false; の設定は、実行時間が長くなく、動的パーティションの数が少ない場合にのみリソース消費が高くないタスクにも適しています。 この設定は、リソースの節約とパフォーマンスの向上に役立ちます。

    ソリューション1は、タスク実行期間を考慮せずに、以下のすべての条件が満たされる場合にノードに使用できます。

    • 動的パーティションが使用されます。

    • 動的パーティションの数は50以下です。

    • set odps.sql.reshuffle.dynamicpt=false; の設定は使用されません。

    ノードの優先順位を決定して、最後のFuxiインスタンスの実行期間に基づいてodps.sql.reshuffle.dynamicptパラメーターを設定できます。 diag_levelフィールドは、ノード優先度を指定する。 次のルールに基づいてフィールドを設定できます。

    • Diag_Level=4 ('Severe'): Last_Fuxi_Inst_Timeは30分より長い。

    • Diag_Level=3 ('High'): Last_Fuxi_Inst_Timeの範囲は20分から30分です。

    • Diag_Level=2 ('Medium'): Last_Fuxi_Inst_Timeの範囲は10分から20分です。

    • Diag_Level=1 ('Low'): Last_Fuxi_Inst_Timeは10分未満です。

  • 解決策2: パーティションプルーニングを実行する。

    動的パーティションにデータを挿入するときにマップステージでデータスキューの問題が発生した場合は、多数のレコードが格納されているパーティションを特定し、パーティションプルーニングを実行してから、データを個別に挿入できます。 実際の状況に基づいて、Mapステージのパラメーター設定を変更できます。 次のコードは例を示しています。

    set odps.sql.mapper.split.size=128;
    INSERT OVERWRITE TABLE data_demo3 partition(ds,hh)
    SELECT  *
    FROM    dwd_alsc_ent_shop_info_hi;

    返された結果は、プロセス全体でフルテーブルスキャンが実行されたことを示しています。 データスキューの問題を解決するには、SET odps.sql. reshffle. dynamicpt=false; の設定を使用できます。 サンプル文:

    SET odps.sql.reshuffle.dynamicpt=false ;
    INSERT OVERWRITE TABLE data_demo3 partition(ds,hh)
    SELECT *
    FROM dwd_alsc_ent_shop_info_hi;

    動的パーティションにデータを挿入するときにマップステージでデータスキューの問題が発生した場合は、多数のレコードが格納されているパーティションを特定し、パーティションプルーニングを実行してから、データを個別に挿入できます。 以下のステップは、例を示す。

    1. 次のステートメントを実行して, 大量のレコードが格納されているパーティションを照会します。

      SELECT  ds
              ,hh
              ,COUNT(*) AS cnt
      FROM    dwd_alsc_ent_shop_info_hi
      GROUP BY ds
               ,hh
      ORDER BY cnt DESC;

      次の表に、一部のパーティションのデータを示します。

      ds

      hh

      cnt

      20200928

      17

      1052800

      20191017

      17

      1041234

      20210928

      17

      1034332

      20190328

      17

      1000321

      20210504

      1

      19

      20191003

      20

      18

      20200522

      1

      18

      20220504

      1

      18

    2. 次のSQL文を指定してSETコマンドを実行し、照会したパーティションをプルーニングし、各パーティションにデータを個別に挿入します。

      set odps.sql.reshuffle.dynamicpt=false ;
      INSERT OVERWRITE TABLE data_demo3 partition(ds,hh)
      SELECT  *
      FROM    dwd_alsc_ent_shop_info_hi
      WHERE   CONCAT(ds,hh) NOT IN ('2020092817','2019101717','2021092817','2019032817');
      
      set odps.sql.reshuffle.dynamicpt=false ;
      INSERT OVERWRITE TABLE data_demo3 partition(ds,hh)
      SELECT  *
      FROM    dwd_alsc_ent_shop_info_hi
      WHERE   CONCAT(ds,hh) IN ('2020092817','2019101717','2021092817','2019032817');
      
      SELECT  ds
        ,hh,COUNT(*) as cnt
       FROM dwd_alsc_ent_shop_info_hi
       GROUP BY ds,hh ORDER BY cnt desc;