このトピックでは、MaxCompute における一般的なデータスキューのシナリオとそのソリューションを説明します。
MapReduce
データスキューを理解するには、まず MapReduce を理解する必要があります。MapReduce は、分割統治法を採用する分散コンピューティングフレームワークです。大規模または複雑な問題を、扱いやすい小さなサブ問題に分割して解決し、それらの結果を結合して最終的な出力を生成します。従来の並列プログラミングフレームワークと比べて、MapReduce には、高いフォールトトレランス、使いやすさ、優れたスケーラビリティなどの利点があります。MapReduce で並列プログラムを実装する場合、データストレージ、ノード間通信、データ転送など、分散クラスターの基盤となる複雑さを管理する必要はありません。これにより、分散プログラミングが大幅に簡素化されます。
次の図は、MapReduce のワークフローを示しています:
データスキュー
データスキューは、Reducer ステージで最も頻繁に発生します。Mapper は通常、入力ファイルを分割することで作業を均等に分散しますが、データスキューとは、異なるワーカー間でデータが不均衡に分散された状態のことです。この不均衡により、一部のワーカーは迅速に処理を完了する一方で、他のワーカーは完了までに大幅に時間がかかり、パフォーマンスボトルネックが発生します。実際には、データは自然に偏る傾向があり、パレートの法則 (「80 対 20 の法則」とも呼ばれます) に従います。例えば、フォーラムのアクティブユーザーの 20% が投稿の 80% を占めたり、Web サイトのトラフィックの 80% がわずか 20% のユーザーからもたらされたりします。ビッグデータ処理では、この問題により分散ジョブのパフォーマンスが著しく低下します。よく見られる症状として、一部の過負荷状態のワーカーがタスクを完了するのを待つ間、ジョブが 99% の進行状態で停止しているように見えることが挙げられます。
データスキューの特定
MaxCompute では、Logview を使用してデータスキューを特定できます。 次の手順に従います:
-
[Fuxi Jobs] で、ジョブをレイテンシーの降順でソートし、ランタイムが最も長いジョブステージを選択します。
-
そのステージの Fuxi インスタンスリストで、インスタンスをレイテンシーの降順でソートします。 ランタイムが平均より著しく長いインスタンスを選択します。 通常、ソートされたリストの最初のインスタンスから始めることができます。 次に、[StdOut] 列でその出力ログを確認します。
-
[StdOut] ログの情報を使用して、対応するジョブ実行グラフを検索して調べます。
-
ジョブ実行グラフのキーとなる情報を使用して、データスキューの原因となっている SQL スニペットを特定します。
次の例では、このプロセスを説明します。
-
タスクの実行ログで Logview の URL を見つけます。 詳細については、「Logview のエントリポイント」をご参照ください。
OK 2022-05-19 15:29:13 start to get jobId: 2022-05-19 15:29:13 get jobid:20220519072913521gdkgw3ys ID = 20220519072913521gdkgw3ys log view: http://logview.alibaba-inc.com/logview/?h=http://service.odps.aliyun-inc.com/api&p=China&i=20220519072913521gdkgw3ys&token=xxx Job Queueing... Job Queueing... Job Queueing... Job Queueing... Job Queueing... Summary: resource cost: cpu 0.57 Core * Min, memory 0.57 GB * Min inputs: xxx/ds=20220519/hh=14: 286 (1166786 bytes) outputs: xxx/ds=20220519/hh=14: 671 (36754 bytes) Job run time: 0.000 Job run mode: service job 2.0 Job run engine: execution engine M1: bubble: 0 instance count: 1 run time: 0.000 instance time: min: 0.000, max: 0.000, avg: 0.000 input records: output records: AdhocSink2: 1 (min: 1, max: 1, avg: 1) metrics_output_count: -
Logview を開きます。タスクを [レイテンシー] で降順に並べ替えて、問題をすばやく特定します。Logview の [Fuxi Jobs] ページで、[Fuxi Task] 列と [レイテンシー] 列を確認し、実行時間が最も長いタスクを特定します。この例では、R31_26_27 のレイテンシーは 00:26:44.379 で、完了率はわずか 99% ですが、他のすべてのタスクは完了しています。
-
タスク
R31_26_27のランタイムが最も長いです。 タスクR31_26_27をクリックして、インスタンス詳細ページを開きます。 Fuxi タスク R31_26_27 のインスタンス詳細では、上部のレイテンシー統計にmin:00:00:06, avg:00:00:13, max:00:26:40と表示されます。 最大値と平均値の間に大きなギャップがある場合は、データスキューを示しています。 [SmartFilter] の [Long-Tails] オプションと [Data-Skew] オプションを使用すると、問題のあるインスタンスをすばやくフィルタリングできます。 ランタイムが最も長いインスタンスのレイテンシーは00:26:40.878です。Latency: {min:00:00:06, avg:00:00:13, max:00:26:40}は、タスク内のすべてのインスタンスについて、最小ランタイムが6 s、平均ランタイムが13 s、最大ランタイムが00:26:40であることを示しています。 インスタンスをLatency(インスタンスランタイム) の降順でソートすると、4 つのインスタンスのランタイムが比較的長いことがわかります。 MaxCompute は、ランタイムが平均ランタイムの 2 倍を超える Fuxi インスタンスをロングテールとして識別します。 これは、ランタイムが26 sを超えるタスクインスタンスがロングテール (Long-Tails) として識別されることを意味します。 この例では、21 個のインスタンスのランタイムが26 sを超えています。 ロングテール インスタンスが存在しても、必ずしもデータスキューを示すわけではありません。 インスタンスのレイテンシーのavg値とmax値を比較する必要もあります。max値がavg値よりはるかに大きいタスクは、データスキューが深刻であると見なされ、最適化が必要です。 -
次の例に示すように、[StdOut] 列の
アイコンをクリックして出力ログを表示します。 ログには、StreamLineRead22 has large amounts of data (about 115GB) to be sorting, may need a long time.という重大な警告が含まれています。 このメッセージは、インスタンスでデータスキューが発生していることを示しています。 このインスタンスの実行ログは次のとおりです。[2022-05-14 21:04:41] ---------- Run Task: R31_26_27#762 ---------- [2022-05-14 21:04:46] Reader StreamLineRead22, Has total data: true, estimated size: 123518312656, record count: 2937076 [2022-05-14 21:04:46] StreamLineRead22 has large amounts of data (about 115GB) to be sorting, may need a long time. [2022-05-14 21:05:48] Reader StreamLineRead22 is sorting, current read count: 398932, dump file count: 54 [2022-05-14 21:06:48] Reader StreamLineRead22 is sorting, current read count: 835951, dump file count: 121 [2022-05-14 21:07:48] Reader StreamLineRead22 is sorting, current read count: 1214555, dump file count: 182 [2022-05-14 21:08:48] Reader StreamLineRead22 is sorting, current read count: 1654164, dump file count: 251 [2022-05-14 21:09:48] Reader StreamLineRead22 is sorting, current read count: 2076167, dump file count: 317 [2022-05-14 21:10:48] Reader StreamLineRead22 is sorting, current read count: 2464525, dump file count: 378 [2022-05-14 21:11:48] Reader StreamLineRead22 is sorting, current read count: 2856393, dump file count: 442 [2022-05-14 21:12:48] Reader StreamLineRead22 is sorting, current read count: 3245697, dump file count: 503 [2022-05-14 21:13:49] Reader StreamLineRead22 is sorting, current read count: 3656371, dump file count: 567 [2022-05-14 21:14:33] Reader StreamLineRead22 sort finish, total dump file count: 610 [2022-05-14 21:14:36] Reader StreamLineRead22 reads records: 287 [2022-05-14 21:14:37] Reader StreamLineRead22 reads records: 19275 [2022-05-14 21:14:38] Reader StreamLineRead22 reads records: 21967 [2022-05-14 21:14:38] Reader StreamLineRead22 reads records: 24569 -
問題のあるタスクを特定したら、[ジョブ詳細] タブに移動します。
R31_26_27を右クリックし、[すべて展開] を選択してタスクを展開します。詳細については、「Logview 2.0 を使用してジョブ情報を表示する」をご参照ください。ジョブの DAG では、オペレーター M26_7_10_25_30 は、データが R31_26_27 に流れる前に、R7_6、R25_3_30、J10_9_22 などの上流オペレーターからデータを集約します。このノードで [すべて展開] を選択して、すべてのインスタンスを表示します。StreamLineRead22の前にあるオペレーター、つまりStreamLineWriter21を調べて、データスキューの原因となっているキー (new_uri_path_structure、cookie_x5check_userid、cookie_userid) を特定します。これにより、データスキューの原因となっている SQL スニペットを特定できます。出力ログには、オペレーターの詳細が表示されます。StreamLineWrite21 (inner_time_ms: 1090, output_count: 3975) は、キーとしてnew_uri_path_structure、cookie_x5check_userid、cookie_useridを使用して HASH 分布を行います。下流オペレーター StreamLineRead22 の inner_time_ms は 4759、output_count は 25477 です。これら 2 つのオペレーター間で転送されたデータ量は 162,350,603,775 バイトです。StreamLineWrite21 の詳細を展開すると、その HASH 分布キーとパーティションフィールドが同一であることがわかります:cookie_userid、cookie_x5check_userid、new_uri_path_structure。データ分布とパーティショニングのこの関係を分析して、データスキューの根本原因を特定します。ハッシュ分散キーを取得した後、これらのキー名を元の SQL クエリの
join on句のフィールドにマッピングし直します。 これにより、Logview オペレーターを SQL スニペットにマッピングできます。 この例では、StreamLineWriter21のハッシュ分散キーはnew_uri_path_structure、cookie_x5check_userid、cookie_useridです。 cookie 関連のフィールド (cookie_x5check_useridとcookie_userid) は SQLjoin on句の cookie フィールドに対応し、new_uri_path_structureは path フィールドに対応します。 このマッピングを確立することで、Logview のスキューしたオペレーターから、スキューの原因となっている特定の SQL 結合条件まで追跡でき、的を絞った最適化が可能になります。
データスキューへの対処
データスキューの最も一般的な原因は次のとおりです:
-
join
-
group by
-
COUNT(DISTINCT)
-
ROW_NUMBER (Top-N)
-
動的パーティション
これらの原因は、頻度の高い順に並んでいます: join > group by > COUNT(DISTINCT) > ROW_NUMBER > dynamic partition。
ジョイン
結合操作におけるデータスキューは、大きいテーブルと小さいテーブルの結合、大きいテーブルと中規模のテーブルの結合、またはホットキーが原因のロングテールを扱う場合など、いくつかのシナリオで発生する可能性があります。
-
大きいテーブルと小さいテーブルの結合
-
データスキューの例
次の例では、
t1は大きいテーブル、t2とt3は小さいテーブルです。SELECT t1.ip ,t1.is_anon ,t1.user_id ,t1.user_agent ,t1.referer ,t2.ssl_ciphers ,t3.shop_province_name ,t3.shop_city_name FROM <viewtable> t1 LEFT OUTER JOIN <other_viewtable> t2 ON t1.header_eagleeye_traceid = t2.eagleeye_traceid LEFT OUTER JOIN ( SELECT shop_id ,city_name AS shop_city_name ,province_name AS shop_province_name FROM <tenanttable> WHERE ds = MAX_PT('<tenanttable>') AND is_valid = 1 ) t3 ON t1.shopid = t3.shop_id -
解決策
次の例に示すように、MAPJOIN ヒント 構文を使用します。
SELECT /*+ mapjoin(t2,t3)*/ t1.ip ,t1.is_anon ,t1.user_id ,t1.user_agent ,t1.referer ,t2.ssl_ciphers ,t3.shop_province_name ,t3.shop_city_name FROM <viewtable> t1 LEFT OUTER JOIN (<other_viewtable>) t2 ON t1.header_eagleeye_traceid = t2.eagleeye_traceid LEFT OUTER JOIN ( SELECT shop_id ,city_name AS shop_city_name ,province_name AS shop_province_name FROM <tenanttable> WHERE ds = MAX_PT('<tenanttable>') AND is_valid = 1 ) t3 ON t1.shopid = t3.shop_id-
注意事項
-
小さいテーブルまたはサブクエリを参照する場合は、そのエイリアスを使用する必要があります。
-
MapJoin では、サブクエリを小さいテーブルとして使用できます。
-
MapJoin では、非等価結合を使用したり、
orを使用して複数の条件を接続したりできます。on句を省略し、mapjoin on 1 = 1を使用してデカルト積を計算することもできます。 例:select /*+ mapjoin(a) */ a.id from shop a join table_name b on 1=1;。 ただし、この操作はデータ量を大幅に増加させます。 -
MapJoin では、
/*+ mapjoin(a,b,c)*/のように、複数の小さいテーブルをカンマ (,) で区切ります。 -
マップフェーズ中に、MapJoin は指定されたテーブルからすべてのデータをメモリにロードします。 したがって、指定されたテーブルは小さいテーブルである必要があります。 テーブルのインメモリサイズは 512 MB を超えることはできません。 MaxCompute は圧縮ストレージを使用するため、データサイズはメモリにロードされると大幅に増加します。 この 512 MB の制限は、拡張後のサイズに適用されます。 次のパラメーターを設定することで、メモリ制限を最大 8192 MB まで増やすことができます。
SET odps.sql.mapjoin.memory.max=8192;
-
-
MapJoin 操作の制限
-
left outer joinの場合、左側のテーブルは大きいテーブルでなければなりません。 -
right outer joinでは、右側のテーブルが大きいテーブルである必要があります。 -
full outer joinはサポートされていません。 -
inner joinの場合、左テーブルと右テーブルのどちらを大きいテーブルにしてもかまいません。 -
MapJoin は最大 128 個の小さいテーブルをサポートしています。 この制限を超えると、構文エラーが発生します。
-
-
-
-
大きいテーブルと中規模のテーブルの結合
-
データスキューの例
次の例では、
t0は大きいテーブル、t1は中規模のテーブルです。SELECT request_datetime ,host ,URI ,eagleeye_traceid FROM <viewtable> t0 LEFT JOIN ( SELECT traceid, eleme_uid, isLogin_is FROM <servicetable> WHERE ds = '${today}' AND hh = '${hour}' ) t1 ON t0.eagleeye_traceid = t1.traceid WHERE ds = '${today}' AND hh = '${hour}' -
解決策
次の例に示すように、DISTRIBUTED MAPJOIN 構文を使用してデータスキューを解決します。
SELECT /*+distmapjoin(t1)*/ request_datetime ,host ,URI ,eagleeye_traceid FROM <viewtable> t0 LEFT JOIN ( SELECT traceid, eleme_uid, isLogin_is FROM <servicetable> WHERE ds = '${today}' AND hh = '${hour}' ) t1 ON t0.eagleeye_traceid = t1.traceid WHERE ds = '${today}' AND hh = '${hour}'
-
-
ホットキーが原因のロングテールがある結合の処理
-
データスキューの例
次のクエリでは、
eleme_uid列に多くのホットキーが含まれているため、データスキューが発生しやすくなります。SELECT eleme_uid, ... FROM ( SELECT eleme_uid, ... FROM <viewtable> )t1 LEFT JOIN( SELECT eleme_uid, ... FROM <customertable> ) t2 ON t1.eleme_uid = t2.eleme_uid; -
解決策
この問題は、次の4つの方法のいずれかで解決できます。
No.
方法
説明
方法 1
ホットキーの手動分割
メインテーブルからホットキーを持つレコードをフィルターして MapJoin で処理し、残りのレコードを通常の結合で処理します。
方法 2
SkewJoin パラメーターの設定
odps.sql.skewjoin=true;を設定します。方法 3
SkewJoin ヒントの使用
ヒント:
/*+ skewJoin(<table_name>[(<column1_name>[,<column2_name>,...])][((<value11>,<value12>)[,(<value21>,<value22>)...])]*/を使用します。SkewJoin ヒントを使用すると、スキューキーを見つけるために追加のクエリが実行され、クエリのランタイムが長くなります。 スキューキーがすでにわかっている場合は、SkewJoin パラメーターを設定する方が効率的です。方法 4
乗数テーブルと剰余条件を使用した結合
乗数テーブルを使用。
-
ホットキーを手動で分割する
ホットキーを特定し、これらのレコードをメインテーブルからフィルターして MapJoin で処理します。 次に、残りのレコードを通常の結合で処理します。 最後に、
UNION ALLを使用して両方の結合の結果を結合します。 コード例は次のとおりです:SELECT /*+ MAPJOIN (t2) */ eleme_uid, ... FROM ( SELECT eleme_uid, ... FROM <viewtable> WHERE eleme_uid = <skewed_value> )t1 LEFT JOIN( SELECT eleme_uid, ... FROM <customertable> WHERE eleme_uid = <skewed_value> ) t2 ON t1.eleme_uid = t2.eleme_uid UNION ALL SELECT eleme_uid, ... FROM ( SELECT eleme_uid, ... FROM <viewtable> WHERE eleme_uid != <skewed_value> )t3 LEFT JOIN( SELECT eleme_uid, ... FROM <customertable> WHERE eleme_uid != <skewed_value> ) t4 ON t3.eleme_uid = t4.eleme_uid -
SkewJoin パラメーターを設定する
これは一般的なアプローチです。
set odps.sql.skewjoin=true;コマンドを実行することで、MaxCompute で SkewJoin 機能を有効にできます。 ただし、このコマンドだけではタスクの実行に影響はありません。 この機能を有効にするには、odps.sql.skewinfoパラメーターも設定する必要があります。odps.sql.skewinfoパラメーターは、Join の最適化の詳細を指定するために使用されます。 コマンド構文の例を次に示します:SET odps.sql.skewjoin=true; SET odps.sql.skewinfo=skewed_src:(skewed_key)[("skewed_value")]; --skewed_src はスキューが発生しているテーブル、skewed_value はホットキーです。このパラメーターの使用例を次に示します:
-- 単一の列と単一のスキュー値の場合 SET odps.sql.skewinfo=src_skewjoin1:(key)[("0")]; -- 単一の列と複数のスキュー値の場合 SET odps.sql.skewinfo=src_skewjoin1:(key)[("0")("1")]; -
SkewJoin ヒントを使用する
SELECTステートメントで次のヒントを使用します:/*+ skewJoin(<table_name>[(<column1_name>[,<column2_name>,...])][((<value11>,<value12>)[,(<value21>,<value22>)...])]*/。この構文では、table_nameはスキューが発生しているテーブル、column_nameはスキューが発生している列、valueはスキューキーです。 いくつかのコード例を次に示します:-- 方法 1: テーブル名をヒントとして指定 (テーブルのエイリアスを使用)。 SELECT /*+ skewjoin(a) */ * FROM T0 a JOIN T1 b ON a.c0 = b.c0 AND a.c1 = b.c1; -- 方法 2: テーブル名と、スキューの可能性がある列をヒントとして指定。 たとえば、テーブル a の列 c0 と c1 でスキューが発生している場合。 SELECT /*+ skewjoin(a(c0, c1)) */ * FROM T0 a JOIN T1 b ON a.c0 = b.c0 AND a.c1 = b.c1 AND a.c2 = b.c2; -- 方法 3: テーブル、列、および特定のスキューキー値をヒントとして指定。 値が STRING 型の場合は、引用符で囲みます。 たとえば、(a.c0=1 かつ a.c1="2") と (a.c0=3 かつ a.c1="4") の値でスキューが発生している場合。 SELECT /*+ skewjoin(a(c0, c1)((1, "2"), (3, "4"))) */ * FROM T0 a JOIN T1 b ON a.c0 = b.c0 AND a.c1 = b.c1 AND a.c2 = b.c2;説明値を指定する SkewJoin ヒントを使用する方が、ホットキーを手動で分割したり、値を指定せずに SkewJoin パラメーターを設定したりするよりも効率的です。
SkewJoin ヒントは、次の結合タイプに対応しています。
-
内部結合の場合、結合のどちらのテーブルにもヒントを適用できます。
-
左結合、セミ結合、またはアンチ結合の場合、ヒントは左側のテーブルにのみ適用できます。
-
右結合の場合、ヒントは右側のテーブルにのみ適用できます。
-
完全結合は SkewJoin ヒントに対応していません。
このヒントは、集約操作を実行することでオーバーヘッドが発生するため、データスキューが判明している結合にのみ追加してください。
SkewJoin ヒントを有効にするには、結合の両側の結合キーのデータ型が一致している必要があります。 たとえば、
a.c0の型はb.c0の型と一致する必要があり、a.c1の型はb.c1の型と一致する必要があります。 次の例に示すように、サブクエリでCASTを使用して、型の一貫性を確保できます:CREATE TABLE T0(c0 int, c1 int, c2 int, c3 int); CREATE TABLE T1(c0 string, c1 int, c2 int); -- 方法 1: SELECT /*+ skewjoin(a) */ * FROM T0 a JOIN T1 b ON cast(a.c0 AS string) = cast(b.c0 AS string) AND a.c1 = b.c1; -- 方法 2: SELECT /*+ skewjoin(b) */ * FROM (SELECT cast(a.c0 AS string) AS c00 FROM T0 a) b JOIN T1 c ON b.c00 = c.c0;SkewJoin ヒントを使用すると、オプティマイザーは集約を実行して上位 20 個のホットキーを見つけます。 値
20はデフォルト値であり、set odps.optimizer.skew.join.topk.num = xx;を設定することで変更できます。-
SkewJoin ヒントは、結合の片側にのみ適用できます。
-
ヒントが指定された結合には、
left_key = right_keyなどの等価結合条件が必要です。 デカルト積結合には対応していません。 -
すでに MapJoin ヒントがある結合に SkewJoin ヒントを追加することはできません。
-
-
乗数テーブルと剰余条件で結合する
この解決策のロジックは、前の3つの解決策とは異なります。 これは分割統治アプローチではなく、代わりに乗数テーブルを使用します。 このテーブルには、1 から N までの値を持つ単一の整数列があり、N はスキューの度合いに基づいて決定できます。 この乗数テーブルは、ユーザーの行動テーブルを N 回拡張するために使用されます。 次に、結合操作では、ユーザーID と
numberの2つの結合キーを使用します。 このようにして、number結合条件が追加されるため、ユーザーID のみに基づいてデータをパーティション分割することが原因で発生するデータスキューは、元のレベルの1/Nに減少します。 ただし、このアプローチではデータも N 回拡張されます。SELECT eleme_uid, ... FROM ( SELECT eleme_uid, ... FROM <viewtable> )t1 LEFT JOIN( SELECT /*+mapjoin(<multipletable>)*/ eleme_uid, number ... FROM <customertable> JOIN <multipletable> ) t2 ON t1.eleme_uid = t2.eleme_uid AND mod(t1.<value_col>,10)+1 = t2.number;データセット全体が拡張されるのを避けるために、この拡張をホットキーを持つレコードにのみ適用できます。 まず、ホットキーを特定します。 次に、トラフィックテーブルとユーザー行動テーブルの両方で、
eleme_uid_joinなどの新しい結合キー列を追加します。 ユーザーID がホットキーの場合、concatを使用して、ランダムに割り当てられた 0 以上の整数 (0 から事前定義された乗数、たとえば 1000 まで) と連結します。 それ以外の場合は、元のユーザーID を保持します。 次に、新しいeleme_uid_join列を結合に使用します。 この方法は、ホットキー以外のレコードの不要なデータ拡張を回避しながら、ホットキーのみを拡張することでスキューを軽減します。 ただし、このアプローチは SQL のビジネスロジックを著しく複雑にするため、通常は推奨しません。
-
-
Groupby
次の擬似コード例では、GROUP BY 句を使用しています。
SELECT shop_id
,sum(is_open) AS open_days
FROM table_xxx_di
WHERE dt BETWEEN '${bizdate_365}' AND '${bizdate}'
GROUP BY shop_id;
データ スキューは、次の 3 つの解決策のいずれかで解消できます。
|
No. |
解決策 |
説明 |
|
解決策 1 |
GROUP BY のデータ スキューを緩和するためのパラメーターを設定する |
|
|
解決策 2 |
乱数を追加する (ソルティング) |
ロングテールキーを分割してワークロードを分散させます。 |
|
解決策 3 |
ロールアップテーブルを作成する |
履歴データを事前集計して、日次ジョブでスキャンされるデータを削減します。 |
-
解決策 1: GROUP BY のデータ スキューを緩和するためのパラメーターを設定する。
SET odps.sql.groupby.skewindata=true; -
解決策 2: 乱数を追加する。
この解決策はソルティングとも呼ばれ、
GROUP BYのロングテールを効果的に処理します。 この方法では、SQL を書き換えて乱数を追加し、スキューしたキーを分割します。Select Key,Count(*) As Cnt From TableName Group By Key;のようなクエリの場合、コンバイナが使用されないと仮定すると、マップノードは COUNT 操作のためにデータをリデュースノードにシャッフルします。 これにより、M->Rの実行計画になります。ロングテールの原因となるキーを特定した場合、次のようにそのワークロードを再分散できます。
-- ロングテールの原因となるキーが 'KEY001' であると仮定します SELECT a.Key ,SUM(a.Cnt) AS Cnt FROM(SELECT Key ,COUNT(*) AS Cnt FROM <TableName> GROUP BY Key ,CASE WHEN KEY = 'KEY001' THEN Hash(Random()) % 50 ELSE 0 END ) a GROUP BY a.Key;この変更後、実行計画は
M->R->Rになります。 これにより Reduce ステップが追加されますが、ロングテールキーを 2 段階で処理することで、全体のランタイムを短縮できます。 リソース消費とパフォーマンスは解決策 1 と似ています。 ただし、実際のシナリオでは、複数のロングテールキーが存在する場合があります。 これらのキーを特定し、SQL を書き換えるコストを考慮すると、多くの場合、解決策 1 の方が費用対効果が高いです。 -
解決策 3: ロールアップテーブルを作成する。
この解決策は、主にコストを削減し、効率を向上させます。 過去 1 年間のデータを取得するという要件はよくあります。 オンラインジョブの場合、ジョブごとに
T-1からT-365までのすべてのパーティションをスキャンすると、大量のリソースが無駄になります。 ロールアップテーブルを作成することで、1 年間のデータへのアクセスを失うことなく、毎日読み取られるパーティションの数を減らすことができます。 例:まず、365 日分のマーチャントデータを集計し、データ更新日をマークすることで、1 回限りの初期化を実行します。 結果は、ここでは
m_xxx_365_dfというテーブルに保存します。 その後のオンラインジョブでは、T-2日目のテーブルm_xxx_365_dfをtable_xxx_diテーブルと結合し、再度GROUP BYを実行できます。 このアプローチにより、毎日読み取られるパーティションの数が 365 から 2 に減少します。 その結果、shop_idプライマリキーの重複が大幅に減少し、リソース消費が削減されます。-- ロールアップテーブルを作成します CREATE TABLE IF NOT EXISTS m_xxx_365_df ( shop_id STRING, last_update_ds STRING, `365d_open_days` BIGINT ) PARTITIONED BY ( ds STRING COMMENT '日付パーティション' )LIFECYCLE 7; -- 365日の期間が2021年5月1日から2022年5月1日までであると仮定します。まず、1回限りの初期化を実行します。 INSERT OVERWRITE TABLE m_xxx_365_df PARTITION(ds = '20220501') SELECT shop_id, max(ds) as last_update_ds, sum(is_open) AS `365d_open_days` FROM table_xxx_di WHERE dt BETWEEN '20210501' AND '20220501' GROUP BY shop_id; -- その後のオンラインジョブは次のとおりです: INSERT OVERWRITE TABLE m_xxx_365_df PARTITION(ds = '${bizdate}') SELECT aa.shop_id, aa.last_update_ds, `365d_open_days` - COALESCE(is_open, 0) AS `365d_open_days` -- 営業日が無期限に蓄積されるのを防ぎます FROM ( SELECT shop_id, max(last_update_ds) AS last_update_ds, sum(`365d_open_days`) AS `365d_open_days` FROM ( SELECT shop_id, ds AS last_update_ds, sum(is_open) AS `365d_open_days` FROM table_xxx_di WHERE ds = '${bizdate}' GROUP BY shop_id UNION ALL SELECT shop_id, last_update_ds, `365d_open_days` FROM m_xxx_365_df WHERE ds = '${bizdate_2}' AND last_update_ds >= '${bizdate_365}' GROUP BY shop_id ) GROUP BY shop_id ) AS aa LEFT JOIN ( SELECT shop_id, is_open FROM table_xxx_di WHERE ds = '${bizdate_366}' ) AS bb ON aa.shop_id = bb.shop_id;
COUNT(DISTINCT)
テーブル内のデータが次のように分布していると仮定します。
|
ds (パーティション) |
cnt (レコード数) |
|
20220416 |
73025514 |
|
20220415 |
2292806 |
|
20220417 |
2319160 |
次のクエリでは、データスキューが発生しやすくなります:
SELECT ds
,COUNT(DISTINCT shop_id) AS cnt
FROM demo_data0
GROUP BY ds;
解決策は次のとおりです:
|
No. |
解決策 |
説明 |
|
解決策 1 |
パラメータチューニング |
|
|
解決策 2 |
汎用的な 2 段階集計 |
パーティションフィールドの値に乱数を追加します。 |
|
解決策 3 |
2 段階集計に類似したアプローチ |
まず 2 つのグループ化フィールド |
-
解決策 1:パラメータチューニング
次のパラメータを設定します:
SET odps.sql.groupby.skewindata=true; -
解決策 2:汎用的な 2 段階集計
shop_idフィールド内のデータ分布が不均一な場合、解決策 1 は効果がない可能性があります。より汎用的な解決策として、パーティションフィールドの値に乱数を追加します。-- オプション1: 乱数を追加します。CONCAT(ROUND(RAND(),1)*10,'_', ds) AS rand_ds SELECT SPLIT_PART(rand_ds, '_',2) ds ,COUNT(*) id_cnt FROM ( SELECT rand_ds ,shop_id FROM demo_data0 GROUP BY rand_ds,shop_id ) GROUP BY SPLIT_PART(rand_ds, '_',2); -- オプション2: 乱数列を追加します。ROUND(RAND(),1)*10 AS randint10 SELECT ds ,COUNT(*) id_cnt FROM (SELECT ds ,randint10 ,shop_id FROM demo_data0 GROUP BY ds,randint10,shop_id ) GROUP BY ds; -
解決策 3:2 段階集計に類似したアプローチ
GROUP BY フィールドのデータが均等に分布している場合は、この最適化を使用します。まず 2 つのグループ化フィールド (ds and shop_id) でグループ化し、その後
COUNT(*)を使用します。SELECT ds ,COUNT(*) AS cnt FROM(SELECT ds ,shop_id FROM demo_data0 GROUP BY ds ,shop_id ) GROUP BY ds;
ROW_NUMBER() (Top-N)
次の例は、Top-N クエリです。
SELECT main_id
,type
FROM (SELECT main_id
,type
,ROW_NUMBER() OVER(PARTITION BY main_id ORDER BY type DESC ) rn
FROM <data_demo2>
) A
WHERE A.rn <= 10;
データスキューが発生した場合、次のいずれかの方法を使用して問題を解決できます。
|
番号 |
方法 |
説明 |
|
方法 1 |
SQL で 2段階集約を実行します。 |
ランダム列 (ソルティングと呼ばれる手法) を追加し、パーティションパラメーターとして使用します。 |
|
方法 2 |
UDAF で 2段階集約を実行します。 |
最小ヒープを実装した UDAF でクエリを最適化します。 |
-
方法 1: SQL による 2段階集約
Map フェーズでグループ間にデータをより均等に分散させるために、ランダム列を追加してパーティションパラメーターとして使用します。
SELECT main_id ,type FROM (SELECT main_id ,type ,ROW_NUMBER() OVER(PARTITION BY main_id ORDER BY type DESC ) rn FROM (SELECT main_id ,type FROM (SELECT main_id ,type ,ROW_NUMBER() OVER(PARTITION BY main_id,src_pt ORDER BY type DESC ) rn FROM (SELECT main_id ,type ,ceil(110 * rand()) % 11 AS src_pt FROM data_demo2 ) ) B WHERE B.rn <= 10 ) ) A WHERE A.rn <= 10; -- 2. 乱数を直接生成します SELECT main_id ,type FROM (SELECT main_id ,type ,ROW_NUMBER() OVER(PARTITION BY main_id ORDER BY type DESC ) rn FROM (SELECT main_id ,type FROM(SELECT main_id ,type ,ROW_NUMBER() OVER(PARTITION BY main_id,src_pt ORDER BY type DESC ) rn FROM (SELECT main_id ,type ,ceil(10 * rand()) AS src_pt FROM data_demo2 ) ) B WHERE B.rn <= 10 ) ) A WHERE A.rn <= 10; -
方法 2: UDAF による 2段階集約
純粋な SQL アプローチでは、コードが冗長になり、保守が困難になる可能性があります。より効率的な方法として、min-heap を実装した UDAF を使用してクエリを最適化する方法があります。この UDAF は、
iterateフェーズ中に上位 N 件のアイテムのみを取得し、mergeフェーズ中に N 個の要素のみをマージします。そのプロセスは次のとおりです。-
iterate: 最初の K 個の要素をプッシュします。後続の要素については、各要素を最小ヒープのトップと比較し、新しい要素の方が大きい場合はトップの要素を置き換えます。 -
merge: 2 つのヒープをマージし、上位 K 個の要素を保持します。 -
terminate: ヒープを配列として返します。 -
最終的な SQL クエリでは、配列は個々の行にネスト解除されます。
@annotate('* -> array<string>') class GetTopN(BaseUDAF): def new_buffer(self): return [[], None] def iterate(self, buffer, order_column_val, k): # heapq.heappush(buffer, order_column_val) # buffer = [heapq.nlargest(k, buffer), k] if not buffer[1]: buffer[1] = k if len(buffer[0]) < k: heapq.heappush(buffer[0], order_column_val) else: heapq.heappushpop(buffer[0], order_column_val) def merge(self, buffer, pbuffer): first_buffer, first_k = buffer second_buffer, second_k = pbuffer k = first_k or second_k merged_heap = first_buffer + second_buffer merged_heap.sort(reverse=True) merged_heap = merged_heap[0: k] if len(merged_heap) > k else merged_heap buffer[0] = merged_heap buffer[1] = k def terminate(self, buffer): return buffer[0] SET odps.sql.python.version=cp37; SELECT main_id,type_val FROM ( SELECT main_id ,get_topn(type, 10) AS type_array FROM data_demo2 GROUP BY main_id ) LATERAL VIEW EXPLODE(type_array)type_ar AS type_val; -
動的パーティション
動的パーティションを使用すると、パーティションテーブルにデータを挿入する際に、特定の値を指定せずにパーティション列を指定できます。この場合、パーティションの値は SELECT 句の対応する列から導出されます。そのため、SQL ステートメントの実行が完了するまで、正確なパーティションは不明です。詳細については、「動的パーティションへのデータの挿入または上書き (DYNAMIC PARTITION)」をご参照ください。次にコード例を示します。
CREATE TABLE total_revenues (revenue bigint) partitioned BY (region string);
INSERT overwrite TABLE total_revenues PARTITION(region)
SELECT total_price AS revenue,region
FROM sale_detail;
動的パーティションを持つテーブルでは、データスキューがよく発生します。この問題を解決するには、次の解決策を使用します。
|
No. |
解決策 |
説明 |
|
解決策 1 |
パラメーターチューニング |
設定パラメーターを調整してパフォーマンスを最適化します。 |
|
解決策 2 |
プルーニング最適化 |
レコード数の多いパーティションを特定し、プルーニングしてから、個別に挿入します。 |
-
解決策 1:パラメーターチューニング
動的パーティションを使用すると、さまざまな基準を満たすデータをさまざまなパーティションにルーティングできます。この機能によりコードが簡素化されるため、特に多数のパーティションを扱う場合に、複数の
INSERT OVERWRITEステートメントは不要になります。ただし、動的パーティションは、多数のスモールファイルを作成する可能性もあります。-
データスキューの例
次の簡単な SQL ステートメントを例に説明します。
INSERT INTO TABLE part_test PARTITION(ds) SELECT * FROM part_test;K 個の Map インスタンスと N 個のターゲットパーティションがあると仮定します。
ds=1 cfile1 ds=2 ... X ds=3 cfilek ... ds=n最悪のシナリオでは、この操作によって
K*N個のスモールファイルが生成される可能性があります。スモールファイルが多すぎると、ファイルシステムに高い負荷がかかります。この問題に対処するために、MaxCompute は追加の Reduce タスクを追加し、同じターゲットパーティションのデータを 1 つまたは少数の Reduce インスタンスに転送します。これにより、過剰なスモールファイルの生成を防ぎます。この操作は、常にジョブの最後の Reduce タスクです。この機能は MaxCompute でデフォルトで有効になっており、次のパラメーターがtrueに設定されています。SET odps.sql.reshuffle.dynamicpt=true;このデフォルト設定は過剰なスモールファイルの問題を解決しますが、データスキューを引き起こす可能性があります。このアプローチは、Reduce ステージを追加することで、より多くのコンピューティングリソースも消費します。したがって、これらのトレードオフのバランスを取る必要があります。
-
解決策
set odps.sql.reshuffle.dynamicpt=true;を有効にすると、過剰なスモールファイルの生成を防ぎます。ただし、ターゲットパーティションの数が少ない場合、過剰なスモールファイルが生成されるリスクは低くなります。この場合、デフォルト設定ではコンピューティングリソースが無駄になり、パフォーマンスが低下します。代わりに、set odps.sql.reshuffle.dynamicpt=false;を設定してこの機能を無効にすると、パフォーマンスを大幅に向上させることができます。次にコード例を示します。INSERT overwrite TABLE ads_tb_cornucopia_pool_d PARTITION (ds, lv, tp) SELECT /*+ mapjoin(t2) */ '20150503' AS ds, t1.lv AS lv, t1.type AS tp FROM (SELECT ... FROM tbbi.ads_tb_cornucopia_user_d WHERE ds = '20150503' AND lv IN ('flat', '3rd') AND tp = 'T' AND pref_cat2_id > 0 ) t1 JOIN (SELECT ... FROM tbbi.ads_tb_cornucopia_auct_d WHERE ds = '20150503' AND tp = 'T' AND is_all = 'N' AND cat2_id > 0 ) t2 ON t1.pref_cat2_id = t2.cat2_id;デフォルトのパラメーターでは、ジョブの実行に約 1 時間 30 分かかりました。最後の Reduce ステージだけで約 1 時間 20 分かかり、合計ランタイムの約
90%を占めました。追加の Reduce ステージが各 Reduce インスタンスにデータを不均等に分散させたため、ロングテールのデータスキューが発生しました。
上記の例では、統計分析によると、毎日約 2 つの動的パーティションしか生成されません。したがって、
set odps.sql.reshuffle.dynamicpt=false;を設定できます。このパラメーターをfalseに設定すると、タスクはわずか 9 分で完了します。この簡単な変更により、パフォーマンスが大幅に向上し、コンピューティング時間とコンピューティングリソースが節約され、非常に大きなメリットが得られます。この最適化は、大規模で実行時間の長いジョブに限定されません。小規模で迅速なジョブを含め、少数の動적パーティションを使用するジョブであれば、
odps.sql.reshuffle.dynamicptパラメーターをfalseに設定して、リソースを節約し、パフォーマンスを向上させることができます。ランタイムに関係なく、次の 3 つの条件を満たすノードを最適化できます。
-
動的パーティションを使用している。
-
動的パーティションの数が 50 以下である。
-
odps.sql.reshuffle.dynamicptパラメーターが true (デフォルト) に設定されている。
最後の Fuxi インスタンスのランタイムを使用して、このパラメーターを設定する緊急度を判断できます。
diag_levelフィールドは、次のルールに基づいて重大度を示します。-
Last_Fuxi_Inst_Timeが 30 分を超える場合:Diag_Level=4 ('Critical')。 -
Last_Fuxi_Inst_Timeが 20 分から 30 分の場合:Diag_Level=3 ('High')。 -
Last_Fuxi_Inst_Timeが 10 分から 20 分の場合:Diag_Level=2 ('Medium')。 -
Last_Fuxi_Inst_Timeが 10 分未満の場合:Diag_Level=1 ('Low')。
-
-
解決策 2:プルーニング最適化
動的パーティション挿入の Map フェーズで発生するデータスキューに対処するには、レコード数の多いパーティションを特定し、プルーニングしてから、個別に挿入します。次の例に示すように、Map フェーズのパラメーターを調整することから始めます。
SET odps.sql.mapper.split.size=128; INSERT OVERWRITE TABLE data_demo3 partition(ds,hh) SELECT * FROM dwd_alsc_ent_shop_info_hi;この結果は、このアプローチが依然としてフルテーブルスキャンを実行することを示しています。システムによって追加された Reduce ジョブを無効にすることで、これをさらに最適化できます。
SET odps.sql.reshuffle.dynamicpt=false; INSERT OVERWRITE TABLE data_demo3 partition(ds,hh) SELECT * FROM dwd_alsc_ent_shop_info_hi;プルーニング最適化は、レコード数の多いパーティションを特定して分離することで、Map フェーズのデータスキューを解決します。手順は次のとおりです。
-
次のクエリを実行して、レコード数が最も多いパーティションを見つけます。
SELECT ds ,hh ,COUNT(*) AS cnt FROM dwd_alsc_ent_shop_info_hi GROUP BY ds ,hh ORDER BY cnt DESC;結果の例を次の表に示します。
ds
hh
cnt
20200928
17
1052800
20191017
17
1041234
20210928
17
1034332
20190328
17
1000321
20210504
1
19
20191003
20
18
20200522
1
18
20220504
1
18
-
まず、スキューしたパーティションをフィルターで除外し、残りのデータを挿入します。次に、スキューしたパーティションのデータを別の操作で挿入します。
-- スキューしたパーティションを除くすべてのデータを挿入 SET odps.sql.reshuffle.dynamicpt=false; INSERT OVERWRITE TABLE data_demo3 partition(ds,hh) SELECT * FROM dwd_alsc_ent_shop_info_hi WHERE CONCAT(ds,hh) NOT IN ('2020092817','2019101717','2021092817','2019032817'); -- スキューしたパーティションのデータを挿入 SET odps.sql.reshuffle.dynamicpt=false; INSERT OVERWRITE TABLE data_demo3 partition(ds,hh) SELECT * FROM dwd_alsc_ent_shop_info_hi WHERE CONCAT(ds,hh) IN ('2020092817','2019101717','2021092817','2019032817'); -- カウントを確認して結果を検証 SELECT ds ,hh,COUNT(*) AS cnt FROM dwd_alsc_ent_shop_info_hi GROUP BY ds,hh ORDER BY cnt desc;
-