このトピックでは、MaxCompute における一般的なデータスキューのシナリオと、それに対応するソリューションについて説明します。
MapReduce
データスキューを理解するには、まず MapReduce を理解する必要があります。MapReduce は、分割統治法を用いる分散コンピューティングフレームワークです。この手法は、大規模または複雑な問題を、より小さく管理しやすいサブ問題に分割し、それらを解決した後、結果を最終的なソリューションに統合します。MapReduce は、従来の並列プログラミングフレームワークよりも高いフォールトトレランス、使いやすさ、および優れた拡張性を提供します。MapReduce を使用して並列プログラミングを行う場合、データストレージ、ノード間通信、転送メカニズムなど、分散クラスターにおけるプログラミング以外の問題を管理する必要がありません。この機能により、分散プログラミングが大幅に簡素化されます。
以下の図は、MapReduce のワークフローを示しています。
データスキュー
データスキューは、多くの場合 reduce ステージで発生します。マッパーは通常、入力ファイルに基づいてデータを均等に分割します。データスキューは、テーブル内のデータが異なるワーカー間で不均等に分散される場合に発生します。この不均等な分散により、一部のワーカーは計算を迅速に終了する一方で、他のワーカーははるかに長い時間がかかります。本番環境では、ほとんどのデータは偏っており、多くの場合 80/20 ルールに従います。例えば、フォーラムのアクティブユーザーの 20% が投稿の 80% を寄稿したり、ウェブサイトのトラフィックの 80% を 20% のユーザーが生成したりします。ビッグデータの時代では、データ量の爆発的な増加に伴い、データスキューは分散プログラムのパフォーマンスに深刻な影響を与える可能性があります。一般的な症状として、ジョブの実行進捗が 99% でスタックすることが挙げられます。
データスキューの特定方法
MaxCompute では、Logview を使用してデータスキューを特定できます。以下の手順でそのプロセスを説明します。
Fuxi ジョブで、ジョブをレイテンシーの降順でソートし、実行時間が最も長いジョブステージを選択します。
Fuxi ステージの Fuxi インスタンスリストで、タスクをレイテンシーの降順でソートします。平均よりも著しく実行時間が長いタスクを選択します。通常、リストの最初のタスクに注目できます。その StdOut ログを表示します。
StdOut の情報を使用して、対応するジョブ実行グラフを表示します。
ジョブ実行グラフのキー情報を使用して、データスキューを引き起こしている SQL コードスニペットを特定します。
次の例は、このメソッドの使用方法を示しています。
タスクの操作ログから Logview ログを見つけます。詳細については、「Logview のエントリーポイント」をご参照ください。

問題を迅速に特定するために、Logview インターフェイスに移動し、Fuxi タスクを [レイテンシー] の降順でソートし、実行時間が最も長いタスクを選択します。

タスク
R31_26_27の実行時間が最も長いです。タスクR31_26_27をクリックして、次の図に示すようにインスタンス詳細ページを開きます。
値 Latency: {min:00:00:06, avg:00:00:13, max:00:26:40}は、このタスクのすべてのインスタンスについて、最小実行時間が6 秒、平均実行時間が13 秒、最大実行時間が26 分 40 秒であることを示します。Latency(インスタンス実行時間) で降順にソートできます。実行時間が長い 4 つのインスタンスが表示されます。MaxCompute は、Fuxi インスタンスの実行時間が平均の 2 倍以上である場合、それをロングテールと見なします。これは、実行時間が26 秒を超えるタスクインスタンスはロングテールと見なされることを意味します。この場合、21 個のインスタンスの実行時間が26 秒を超えています。ロングテールインスタンスの存在が常にタスクスキューを示すわけではありません。インスタンス実行時間のavgとmaxの値も比較する必要があります。max値がavg値よりはるかに大きい場合、深刻なデータスキューを示しています。このタスクには管理が必要です。[標準出力] 列の
アイコンをクリックすると、次の図に示すように出力ログが表示されます。
問題を特定したら、[ジョブ詳細] タブに移動します。
R31_26_27を右クリックし、[すべて展開] を選択してタスクを展開します。詳細については、「Logview 2.0 を使用してジョブ情報を表示する」をご参照ください。
StreamLineRead22の前のステップであるStreamLineWriter21を調べます。このステップにより、データスキューを引き起こしているキーが明らかになります:new_uri_path_structure、cookie_x5check_userid、およびcookie_userid。この情報を使用して、データスキューを引き起こしている SQL スニペットを見つけます。
データスキューのトラブルシューティングとソリューション
データスキューの最も一般的な原因は次のとおりです:
Join
GroupBy
Count (Distinct)
ROW_NUMBER (TopN)
動的パーティション
発生頻度は、最も一般的なものから順に次のようになります:JOIN > GroupBy > Count(Distinct) > ROW_NUMBER > 動的パーティション。
Join
JOIN 操作によるデータスキューは、大きいテーブルと小さいテーブルの JOIN、大きいテーブルと中規模テーブルの JOIN、またはホットキー値がロングテールを引き起こす場合など、さまざまな状況で発生する可能性があります。
大きいテーブルと小さいテーブルの JOIN。
データスキューの例
次の例では、
t1は大きいテーブルで、t2とt3は小さいテーブルです。SELECT t1.ip ,t1.is_anon ,t1.user_id ,t1.user_agent ,t1.referer ,t2.ssl_ciphers ,t3.shop_province_name ,t3.shop_city_name FROM <viewtable> t1 LEFT OUTER JOIN <other_viewtable> t2 ON t1.header_eagleeye_traceid = t2.eagleeye_traceid LEFT OUTER JOIN ( SELECT shop_id ,city_name AS shop_city_name ,province_name AS shop_province_name FROM <tenanttable> WHERE ds = MAX_PT('<tenanttable>') AND is_valid = 1 ) t3 ON t1.shopid = t3.shop_idソリューション
次の例に示すように、MAPJOIN ヒント構文を使用します。
SELECT /*+ mapjoin(t2,t3)*/ t1.ip ,t1.is_anon ,t1.user_id ,t1.user_agent ,t1.referer ,t2.ssl_ciphers ,t3.shop_province_name ,t3.shop_city_name FROM <viewtable> t1 LEFT OUTER JOIN (<other_viewtable>) t2 ON t1.header_eagleeye_traceid = t2.eagleeye_traceid LEFT OUTER JOIN ( SELECT shop_id ,city_name AS shop_city_name ,province_name AS shop_province_name FROM <tenanttable> WHERE ds = MAX_PT('<tenanttable>') AND is_valid = 1 ) t3 ON t1.shopid = t3.shop_id注意事項
小さいテーブルまたはサブクエリを参照する場合は、そのエイリアスを使用する必要があります。
MapJoin は、サブクエリを小さいテーブルとして使用することをサポートしています。
MapJoin では、非等値結合を使用したり、
ORで複数の条件を接続したりできます。ON句を省略し、mapjoin on 1 = 1を使用してデカルト積を計算することもできます。例:select /*+ mapjoin(a) */ a.id from shop a join table_name b on 1=1;。ただし、この操作はデータ膨張を引き起こす可能性があります。MapJoin では、複数の小さいテーブルをカンマ (
,) で区切ります。例:/*+ mapjoin(a,b,c)*/。map ステージ中、MapJoin は指定されたテーブルからすべてのデータをメモリにロードします。したがって、指定されたテーブルは小さい必要があります。ロードされた後のテーブルが占有する合計メモリは 512 MB を超えてはなりません。MaxCompute は圧縮ストレージを使用するため、小さいテーブルのデータサイズはメモリにロードされた後に大幅に拡大します。512 MB の制限は、データがロードされた後のサイズに適用されます。次のパラメーターを設定することで、この制限を最大 8192 MB まで増やすことができます。
SET odps.sql.mapjoin.memory.max=2048;
MapJoin 操作の制限事項
LEFT OUTER JOINでは、左側のテーブルが大きいテーブルである必要があります。RIGHT OUTER JOINでは、右側のテーブルが大きいテーブルである必要があります。FULL OUTER JOINはサポートされていません。INNER JOINでは、左側または右側のテーブルのいずれかが大きいテーブルであってもかまいません。MapJoin は最大 128 個の小さいテーブルをサポートします。それ以上指定すると構文エラーが発生します。
大きいテーブルと中規模テーブルの JOIN。
データスキューの例
次の例では、
t0は大きいテーブルで、t1は中規模テーブルです。SELECT request_datetime ,host ,URI ,eagleeye_traceid FROM <viewtable> t0 LEFT JOIN ( SELECT traceid, eleme_uid, isLogin_is FROM <servicetable> WHERE ds = '${today}' AND hh = '${hour}' ) t1 ON t0.eagleeye_traceid = t1.traceid WHERE ds = '${today}' AND hh = '${hour}'ソリューション
次の例に示すように、DISTRIBUTED MAPJOIN ヒントを使用してデータスキューを解決します。
SELECT /*+distmapjoin(t1)*/ request_datetime ,host ,URI ,eagleeye_traceid FROM <viewtable> t0 LEFT JOIN ( SELECT traceid, eleme_uid, isLogin_is FROM <servicetable> WHERE ds = '${today}' AND hh = '${hour}' ) t1 ON t0.eagleeye_traceid = t1.traceid WHERE ds = '${today}' AND hh = '${hour}'
JOIN におけるホットキー値によるロングテール。
データスキューの例
次の例では、
eleme_uidフィールドに多くのホットキー値が含まれており、データスキューを引き起こす可能性があります。SELECT eleme_uid, ... FROM ( SELECT eleme_uid, ... FROM <viewtable> )t1 LEFT JOIN( SELECT eleme_uid, ... FROM <customertable> ) t2 ON t1.eleme_uid = t2.eleme_uid;ソリューション
この問題を解決するには、次の 4 つの方法のいずれかを使用できます。
No.
ソリューション
説明
ソリューション 1
手動でのホットバリュー分割
ホットキー値を分析して特定します。プライマリテーブルからホットキー値を持つレコードをフィルターします。まず、それらに対して MapJoin を実行します。次に、ホットキー値以外のレコードに対して MergeJoin を実行します。最後に、2 つの JOIN の結果をマージします。
ソリューション 2
SkewJoin パラメーターの設定
set odps.sql.skewjoin=true;。ソリューション 3
SkewJoin ヒント
ヒントを使用します:
/*+ skewJoin(<table_name>[(<column1_name>[,<column2_name>,...])][((<value11>,<value12>)[,(<value21>,<value22>)...])]*/。SkewJoin ヒントメソッドは、スキューキーを見つけるための追加ステップを追加するため、クエリの実行時間が増加します。スキューキーが既にわかっている場合は、SkewJoin パラメーターを設定して時間を節約できます。ソリューション 4
乗数テーブルを使用したモジュロ等価結合の実行
乗数テーブルを使用します。
手動でのホットキー値の分割
このメソッドは、ホットキー値を分析して特定することを含みます。まず、プライマリテーブルからホットキー値を含むレコードをフィルターし、それらに対して MapJoin を実行します。次に、ホットキー値を含まない残りのレコードに対して MergeJoin を実行します。最後に、2 つの JOIN の結果をマージします。次のコードは例を示しています:
SELECT /*+ MAPJOIN (t2) */ eleme_uid, ... FROM ( SELECT eleme_uid, ... FROM <viewtable> WHERE eleme_uid = <skewed_value> )t1 LEFT JOIN( SELECT eleme_uid, ... FROM <customertable> WHERE eleme_uid = <skewed_value> ) t2 ON t1.eleme_uid = t2.eleme_uid UNION ALL SELECT eleme_uid, ... FROM ( SELECT eleme_uid, ... FROM <viewtable> WHERE eleme_uid != <skewed_value> )t3 LEFT JOIN( SELECT eleme_uid, ... FROM <customertable> WHERE eleme_uid != <skewed_value> ) t4 ON t3.eleme_uid = t4.eleme_uidSkewJoin パラメーターの設定
これは一般的なソリューションです。MaxCompute は、SkewJoin 機能を有効にするための
set odps.sql.skewjoin=true;パラメーターを提供します。ただし、単に SkewJoin を有効にしてもタスクの実行には影響しません。この機能が有効になるには、odps.sql.skewinfoパラメーターも設定する必要があります。odps.sql.skewinfoパラメーターは、JOIN 最適化の詳細を指定します。以下はコマンド構文の例です。SET odps.sql.skewjoin=true; SET odps.sql.skewinfo=skewed_src:(skewed_key)[("skewed_value")]; --skewed_src はトラフィックテーブル、skewed_value はホットキー値です。次の例は、コマンドの使用方法を示しています:
--単一フィールドの単一スキュー値の場合 SET odps.sql.skewinfo=src_skewjoin1:(key)[("0")]; --単一フィールドの複数スキュー値の場合 SET odps.sql.skewinfo=src_skewjoin1:(key)[("0")("1")];SkewJoin ヒント
SELECT文で MapJoin を実行するには、次のヒントを使用します:/*+ skewJoin(<table_name>[(<column1_name>[,<column2_name>,...])][((<value11>,<value12>)[,(<value21>,<value22>)...])]*/。このヒントでは、table_nameはスキューテーブル、column_nameはスキューカラム、valueはスキューキー値です。次の例は、このヒントの使用方法を示しています。--方法 1:テーブル名をヒントします。テーブルのエイリアスをヒントすることに注意してください。 SELECT /*+ skewjoin(a) */ * FROM T0 a JOIN T1 b ON a.c0 = b.c0 AND a.c1 = b.c1; --方法 2:テーブル名と、スキューしている可能性があると思われるカラムをヒントします。例えば、テーブル a のカラム c0 と c1 にデータスキューがあります。 SELECT /*+ skewjoin(a(c0, c1)) */ * FROM T0 a JOIN T1 b ON a.c0 = b.c0 AND a.c1 = b.c1 AND a.c2 = b.c2; --方法 3:テーブル名とカラムをヒントし、スキューキー値を指定します。型が STRING の場合は、値を引用符で囲みます。例えば、(a.c0=1 and a.c1="2") と (a.c0=3 and a.c1="4") の両方の値にデータスキューがあります。 SELECT /*+ skewjoin(a(c0, c1)((1, "2"), (3, "4"))) */ * FROM T0 a JOIN T1 b ON a.c0 = b.c0 AND a.c1 = b.c1 AND a.c2 = b.c2;説明値を直接指定する SkewJoin ヒントメソッドは、手動でホットキー値を分割したり、値を指定せずに SkewJoin パラメーターを設定したりするよりも効率的です。
SkewJoin ヒントでサポートされる JOIN タイプ:
Inner Join:JOIN のどちら側にもヒントできます。
Left Join、Semi Join、Anti Join:左側のテーブルにのみヒントできます。
Right Join:右側のテーブルにのみヒントできます。
Full Join:Skew Join ヒントはサポートされていません。
ヒントは集約操作を実行するためコストがかかるため、データスキューが確実にある JOIN にのみヒントを追加してください。
ヒントされた JOIN では、左の結合キーのデータの型が右の結合キーのデータの型と一致する必要があります。そうでない場合、SkewJoin ヒントは機能しません。例えば、前の例では、
a.c0の型はb.c0の型と一致する必要があり、a.c1の型はb.c1の型と一致する必要があります。サブクエリで CAST を使用して、結合キーの型が一致するようにできます。次の例は、その方法を示しています:CREATE TABLE T0(c0 int, c1 int, c2 int, c3 int); CREATE TABLE T1(c0 string, c1 int, c2 int); --方法 1: SELECT /*+ skewjoin(a) */ * FROM T0 a JOIN T1 b ON cast(a.c0 AS string) = cast(b.c0 AS string) AND a.c1 = b.c1; --方法 2: SELECT /*+ skewjoin(b) */ * FROM (SELECT cast(a.c0 AS string) AS c00 FROM T0 a) b JOIN T1 c ON b.c00 = c.c0;SkewJoin ヒントを追加すると、オプティマイザーは集約操作を実行して上位 20 個のホットキー値を取得します。値
20はデフォルトです。この値はset odps.optimizer.skew.join.topk.num = xx;を実行することで変更できます。SkewJoin ヒントは、JOIN の片側のみをヒントすることをサポートします。
ヒントされた JOIN は
left key = right key条件を持つ必要があり、デカルト積 JOIN はサポートしません。既に MapJoin ヒントがある JOIN に SkewJoin ヒントを追加することはできません。
乗数テーブルを使用したモジュロ等価結合の実行
このソリューションのロジックは、前の 3 つとは異なります。これは分割統治アプローチではありません。代わりに、INT データ型の単一カラムを持つ乗数テーブルを使用します。値は 1 から N の範囲で、N はデータスキューの度合いによって決まります。この乗数テーブルは、ユーザー行動テーブルを N 倍に拡張するために使用されます。その後、JOIN を実行するときに、ユーザー ID と
numberの両方を関連付けキーとして使用できます。元々ユーザー ID のみでデータを分散させることによって引き起こされていたデータスキューは、number条件が追加されたことにより、元のレベルの1/Nに減少します。ただし、この方法ではデータも N 倍に膨張します。SELECT eleme_uid, ... FROM ( SELECT eleme_uid, ... FROM <viewtable> )t1 LEFT JOIN( SELECT /*+mapjoin(<multipletable>)*/ eleme_uid, number ... FROM <customertable> JOIN <multipletable> ) t2 ON t1.eleme_uid = t2.eleme_uid AND mod(t1.<value_col>,10)+1 = t2.number;データ膨張の問題に対処するために、両方のテーブルでホットキー値を持つレコードのみに拡張を制限できます。ホットキー値以外のレコードは変更されません。まず、ホットキー値を持つレコードを見つけます。次に、
eleme_uid_joinという名前の新しいカラムを追加して、トラフィックテーブルとユーザー行動テーブルを別々に処理します。ユーザー ID がホットキー値の場合、CONCATを使用してランダムに割り当てられた正の整数 (例えば 0 から 1,000) を追加します。ホットキー値でない場合は、元のユーザー ID を保持します。2 つのテーブルを JOIN する際には、eleme_uid_joinカラムを使用します。この方法は、ホットキー値の乗数を増やしてスキューを減らし、ホットキー値以外のレコードの不要なデータ膨張を回避します。ただし、このロジックは元のビジネスロジック SQL を完全に変更するため、推奨されません。
GroupBy
次の例は、GroupBy 句を含む擬似コードを示しています。
SELECT shop_id
,sum(is_open) AS business_days
FROM table_xxx_di
WHERE dt BETWEEN '${bizdate_365}' AND '${bizdate}'
GROUP BY shop_id;データスキューが発生した場合、次の 3 つのソリューションのいずれかを使用して解決できます:
No. | ソリューション | 説明 |
ソリューション 1 | Group By のスキュー対策パラメーターの設定 |
|
ソリューション 2 | 乱数の追加 | ロングテールを引き起こしているキーを分割します。 |
ソリューション 3 | ローリングテーブルの作成 | コストを削減し、効率を向上させます。 |
ソリューション 1:`GROUP BY` のスキュー対策パラメーターの設定
SET odps.sql.groupby.skewindata=true;ソリューション 2:乱数の追加
ソリューション 1 とは異なり、このソリューションでは SQL 文を再書き込みする必要があります。ロングテールを引き起こすキーを分割するために乱数を追加することは、`GROUP BY` のロングテールを解決する効果的な方法です。
SQL 文
SELECT Key, COUNT(*) AS Cnt FROM TableName GROUP BY Key;について、コンバイナが使用されない場合、map ノードはデータを reduce ノードにシャッフルします。reduce ノードはその後 `COUNT` 操作を実行します。対応する実行計画はM->Rです。ロングテールを引き起こすキーを見つけた場合、そのキーの作業を次のように再分散できます:
-- ロングテールを引き起こすキーが KEY001 であると仮定します。 SELECT a.Key ,SUM(a.Cnt) AS Cnt FROM(SELECT Key ,COUNT(*) AS Cnt FROM <TableName> GROUP BY Key ,CASE WHEN KEY = 'KEY001' THEN Hash(Random()) % 50 ELSE 0 END ) a GROUP BY a.Key;変更後、実行計画は
M->R->Rになります。実行ステップ数は増えますが、ロングテールキーは 2 段階で処理されるため、全体の実行時間を短縮できます。リソース消費と時間効率はソリューション 1 と似ています。しかし、実際のシナリオでは、ロングテールは複数のキーによって引き起こされることが多いため、ロングテールキーを見つけて SQL 文を再書き込みするコストを考慮すると、ソリューション 1 の方がコスト効率が高いです。ローリングテーブルの作成
このソリューションの核心は、コストを削減し、効率を向上させることです。主な要件は、過去 1 年間のマーチャントデータを取得することです。オンラインタスクでは、毎回
T-1からT-365までのすべてのパーティションを読み取ることは、かなりのリソースを浪費します。ローリングテーブルを作成することで、過去 1 年間のデータの取得に影響を与えることなく、読み取るパーティションの数を減らすことができます。次の例は、その方法を示しています。まず、Group By を使用して集約された 365 日分のマーチャントビジネスデータを初期化します。データ更新日をマークし、データを
aという名前のテーブルに保存します。その後のオンラインタスクでは、T-2日目のテーブルaをtable_xxx_diテーブルと JOIN し、その後 Group By を使用できます。これにより、毎日読み取られるパーティションの数が 365 から 2 に減少します。プライマリキーshopidの重複が大幅に減少し、リソース消費も削減されます。-- ローリングテーブルの作成 CREATE TABLE IF NOT EXISTS m_xxx_365_df ( shop_id STRING COMMENT, last_update_ds COMMENT, 365d_open_days COMMENT ) PARTITIONED BY ( ds STRING COMMENT 'Date partition' )LIFECYCLE 7; -- 365d が 2021 年 5 月 1 日から 2022 年 5 月 1 日までと仮定し、まずテーブルを初期化します。 INSERT OVERWRITE TABLE m_xxx_365_df PARTITION(ds = '20220501') SELECT shop_id, max(ds) as last_update_ds, sum(is_open) AS 365d_open_days FROM table_xxx_di WHERE dt BETWEEN '20210501' AND '20220501' GROUP BY shop_id; -- 次に、実行されるオンラインタスクは次のとおりです。 INSERT OVERWRITE TABLE m_xxx_365_df PARTITION(ds = '${bizdate}') SELECT aa.shop_id, aa.last_update_ds, 365d_open_days - COALESCE(is_open, 0) AS 365d_open_days -- 営業日の無限ローリングを排除します。 FROM ( SELECT shop_id, max(last_update_ds) AS last_update_ds, sum(365d_open_days) AS 365d_open_days FROM ( SELECT shop_id, ds AS last_update_ds, sum(is_open) AS 365d_open_days FROM table_xxx_di WHERE ds = '${bizdate}' GROUP BY shop_id UNION ALL SELECT shop_id, last_update_ds, 365d_open_days FROM m_xxx_365_df WHERE dt = '${bizdate_2}' AND last_update_ds >= '${bizdate_365}' GROUP BY shop_id ) GROUP BY shop_id ) AS aa LEFT JOIN ( SELECT shop_id, is_open FROM table_xxx_di WHERE ds = '${bizdate_366}' ) AS bb ON aa.shop_id = bb.shop_id;
Count (Distinct)
あるテーブルに次のデータ分布があるとします。
ds (パーティション) | cnt (レコード数) |
20220416 | 73025514 |
20220415 | 2292806 |
20220417 | 2319160 |
次の文を使用すると、データスキューが発生する可能性があります:
SELECT ds
,COUNT(DISTINCT shop_id) AS cnt
FROM demo_data0
GROUP BY ds;次のソリューションが利用可能です:
No. | ソリューション | 説明 |
ソリューション 1 | パラメーター設定の最適化 |
|
ソリューション 2 | 一般的な2段階集約 | パーティションフィールド値に乱数を連結します。 |
ソリューション 3 | 2段階集約に類似 | まず、2つのフィールド |
ソリューション 1:パラメーター設定による最適化
次のパラメーターを設定します。
SET odps.sql.groupby.skewindata=true;ソリューション 2:一般的な2段階集約の使用
shop_idフィールドのデータが均一でない場合、ソリューション 1 での最適化はできません。より一般的な方法は、パーティションフィールドの値に乱数を連結することです。-- 方法 1:乱数を連結します。CONCAT(ROUND(RAND(),1)*10,'_', ds) AS rand_ds SELECT SPLIT_PART(rand_ds, '_',2) ds ,COUNT(*) id_cnt FROM ( SELECT rand_ds ,shop_id FROM demo_data0 GROUP BY rand_ds,shop_id ) GROUP BY SPLIT_PART(rand_ds, '_',2); -- 方法 2:乱数フィールドを追加します。ROUND(RAND(),1)*10 AS randint10 SELECT ds ,COUNT(*) id_cnt FROM (SELECT ds ,randint10 ,shop_id FROM demo_data0 GROUP BY ds,randint10,shop_id ) GROUP BY ds;ソリューション 3:2段階集約に類似した方法の使用
GroupBy フィールドと Distinct フィールドの両方のデータが均一である場合、まず 2 つのフィールド (`ds` と `shop_id`) でグループ化し、次に
count(distinct)コマンドを使用することでクエリを最適化できます。SELECT ds ,COUNT(*) AS cnt FROM(SELECT ds ,shop_id FROM demo_data0 GROUP BY ds ,shop_id ) GROUP BY ds;
ROW_NUMBER (TopN)
次の例は、上位 10 件を示しています。
SELECT main_id
,type
FROM (SELECT main_id
,type
,ROW_NUMBER() OVER(PARTITION BY main_id ORDER BY type DESC ) rn
FROM <data_demo2>
) A
WHERE A.rn <= 10;データスキューが発生した場合、次のいずれかの方法で解決できます:
No. | ソリューション | 説明 |
ソリューション 1 | SQL を使用した2段階集約 | ランダムなカラムを追加するか、乱数を連結してパーティションのパラメーターとして使用します。 |
ソリューション 2 | ユーザー定義の集計関数 (UDAF) を使用した2段階集約 | 最小ヒープ優先度付きキューを持つ UDAF を使用して最適化します。 |
ソリューション 1:SQL を使用した2段階集約
map ステージで各パーティショングループのデータをできるだけ均一にするために、ランダムなカラムを追加し、それをパーティションのパラメーターとして使用できます。
SELECT main_id ,type FROM (SELECT main_id ,type ,ROW_NUMBER() OVER(PARTITION BY main_id ORDER BY type DESC ) rn FROM (SELECT main_id ,type FROM (SELECT main_id ,type ,ROW_NUMBER() OVER(PARTITION BY main_id,src_pt ORDER BY type DESC ) rn FROM (SELECT main_id ,type ,ceil(110 * rand()) % 11 AS src_pt FROM data_demo2 ) ) B WHERE B.rn <= 10 ) ) A WHERE A.rn <= 10; -- 2. 乱数をカスタマイズします。 SELECT main_id ,type FROM (SELECT main_id ,type ,ROW_NUMBER() OVER(PARTITION BY main_id ORDER BY type DESC ) rn FROM (SELECT main_id ,type FROM(SELECT main_id ,type ,ROW_NUMBER() OVER(PARTITION BY main_id,src_pt ORDER BY type DESC ) rn FROM (SELECT main_id ,type ,ceil(10 * rand()) AS src_pt FROM data_demo2 ) ) B WHERE B.rn <= 10 ) ) A WHERE A.rn <= 10;ソリューション 2:UDAF を使用した2段階集約
SQL メソッドは、保守が困難な大量のコードになる可能性があります。この場合、最小ヒープ優先度付きキューを持つユーザー定義の集計関数 (UDAF) を使用して最適化できます。これは、
iterateステージでは上位 N 個の項目のみが処理され、mergeステージでは N 個の要素のみがマージされることを意味します。プロセスは次のとおりです。iterate:最初の K 個の要素をプッシュします。後続の要素については、最小の上位要素と継続的に比較し、ヒープ内の要素を交換します。merge:2 つのヒープをマージし、上位 K 個の要素をインプレースで返します。terminate:ヒープを配列として返します。SQL 文で、配列を行に分割します。
@annotate('* -> array<string>') class GetTopN(BaseUDAF): def new_buffer(self): return [[], None] def iterate(self, buffer, order_column_val, k): # heapq.heappush(buffer, order_column_val) # buffer = [heapq.nlargest(k, buffer), k] if not buffer[1]: buffer[1] = k if len(buffer[0]) < k: heapq.heappush(buffer[0], order_column_val) else: heapq.heappushpop(buffer[0], order_column_val) def merge(self, buffer, pbuffer): first_buffer, first_k = buffer second_buffer, second_k = pbuffer k = first_k or second_k merged_heap = first_buffer + second_buffer merged_heap.sort(reverse=True) merged_heap = merged_heap[0: k] if len(merged_heap) > k else merged_heap buffer[0] = merged_heap buffer[1] = k def terminate(self, buffer): return buffer[0] SET odps.sql.python.version=cp37; SELECT main_id,type_val FROM ( SELECT main_id ,get_topn(type, 10) AS type_array FROM data_demo2 GROUP BY main_id ) LATERAL VIEW EXPLODE(type_array)type_ar AS type_val;
動的パーティション
動的パーティションは、パーティションテーブルにデータを挿入する際にパーティションキー列を指定できる機能ですが、特定の値を指定する必要はありません。代わりに、Select 句の対応するカラムがパーティション値を提供します。したがって、SQL 文を実行する前にどのパーティションが作成されるかはわかりません。SQL 文が実行された後、パーティションカラムの値に基づいてどのパーティションが作成されたかを判断できます。詳細については、「動的パーティションへのデータの挿入または上書き (DYNAMIC PARTITION)」をご参照ください。以下は SQL の例です。
CREATE TABLE total_revenues (revenue bigint) partitioned BY (region string);
INSERT OVERWRITE TABLE total_revenues PARTITION(region)
SELECT total_price AS revenue, region
FROM sale_detail;多くのシナリオで、動的パーティションを持つテーブルを作成するとデータスキューにつながる可能性があります。データスキューが発生した場合、次のソリューションを使用して解決できます。
No. | ソリューション | 説明 |
ソリューション 1 | パラメーター構成の最適化 | パラメーター構成を通じて最適化します。 |
ソリューション 2 | パーティションプルーニングの最適化 | 多くのレコードを持つパーティションを見つけ、それらをプルーニングし、別々に挿入します。 |
ソリューション 1:パラメーター構成による最適化
動的パーティションを使用すると、異なる条件を満たすデータを異なるパーティションに入れることができます。この機能により、特に多くのパーティションがある場合に、テーブルに書き込むための複数の Insert Overwrite 操作が不要になり、コードを大幅に簡素化できます。ただし、動的パーティションはあまりにも多くの小規模ファイルを作成する可能性もあります。
データスキューの例
次の単純な SQL を例にとります。
INSERT INTO TABLE part_test PARTITION(ds) SELECT * FROM part_test;K 個の Map インスタンスと N 個のターゲットパーティションがあると仮定します。
ds=1 cfile1 ds=2 ... X ds=3 cfilek ... ds=n最も極端なケースでは、
K*N個の小規模ファイルが生成される可能性があります。あまりにも多くの小規模ファイルは、ファイルシステムに多大な管理負担をかける可能性があります。そのため、MaxCompute は追加の reduce ステージを導入することで動的パーティションを処理します。同じターゲットパーティションを単一の reduce インスタンスまたは少数の reduce インスタンスに割り当てて書き込みます。この方法により、あまりにも多くの小規模ファイルが作成されるのを防ぎます。この reduce 操作は常に最後の reduce タスク操作です。MaxCompute では、この機能はデフォルトで有効になっており、次のパラメーターが `true` に設定されていることを意味します。SET odps.sql.reshuffle.dynamicpt=true;この機能をデフォルトで有効にすると、小規模ファイルが多すぎる問題が解決され、単一のインスタンスが生成するファイルが多すぎるためにタスクが失敗するのを防ぎます。ただし、データスキューや追加の reduce 操作によるコンピューティングリソースの消費など、新たな問題も発生します。したがって、これらの要素を慎重にバランスさせる必要があります。
ソリューション
set odps.sql.reshuffle.dynamicpt=true;を有効にし、追加の reduce ステージを導入する目的は、小規模ファイルが多すぎる問題を解決することです。しかし、ターゲットパーティションの数が少なく、小規模ファイルが多すぎるリスクがない場合、この機能をデフォルトで有効にすると、コンピューティングリソースが無駄になり、パフォーマンスが低下します。この状況では、set odps.sql.reshuffle.dynamicpt=false;を設定してこの機能を無効にすることで、パフォーマンスを大幅に向上させることができます。以下に例を示します。INSERT OVERWRITE TABLE ads_tb_cornucopia_pool_d PARTITION (ds, lv, tp) SELECT /*+ mapjoin(t2) */ '20150503' AS ds, t1.lv AS lv, t1.type AS tp FROM (SELECT ... FROM tbbi.ads_tb_cornucopia_user_d WHERE ds = '20150503' AND lv IN ('flat', '3rd') AND tp = 'T' AND pref_cat2_id > 0 ) t1 JOIN (SELECT ... FROM tbbi.ads_tb_cornucopia_auct_d WHERE ds = '20150503' AND tp = 'T' AND is_all = 'N' AND cat2_id > 0 ) t2 ON t1.pref_cat2_id = t2.cat2_id;上記のコードにデフォルトのパラメーターを使用すると、タスク全体の実行に約 1 時間 30 分かかります。最後の reduce タスクには約 1 時間 20 分かかり、これは総実行時間の約
90%に相当します。追加の reduce タスクを導入すると、各 reduce インスタンスのデータ分布が非常に不均一になり、ロングテールが発生します。
上記の例では、過去に作成された動的パーティションの数を分析すると、毎日約 2 つの動的パーティションしか作成されていません。したがって、
set odps.sql.reshuffle.dynamicpt=false;を安全に設定できます。タスクはわずか 9 分で完了できます。この場合、このパラメーターをfalseに設定すると、パフォーマンスが大幅に向上し、コンピューティング時間とリソースが節約され、1 つのパラメーターを設定するだけで高い限界便益が得られます。この最適化は、時間がかかり、大量のリソースを消費する大規模なタスクに限定されません。実行が速く、リソース消費が少ない小規模で通常のタスクでも、動的パーティションを使用し、動的パーティションの数が多くない限り、
odps.sql.reshuffle.dynamicptパラメーターをfalseに設定できます。この設定は、すべての場合においてリソースを節約し、パフォーマンスを向上させます。タスクの期間に関係なく、次の 3 つの条件を満たすノードは最適化できます。
動的パーティションを使用している
動的パーティションの数 <= 50
`set odps.sql.reshuffle.dynamicpt=false;` が設定されていない
ノードにこのパラメーターを設定する緊急度は、
diag_levelフィールドで識別される最後の Fuxi インスタンスの実行時間によって、次のルールに基づいて決定されます:Last_Fuxi_Inst_Timeが 30 分を超える場合:Diag_Level=4 ('Critical')。Last_Fuxi_Inst_Timeが 20 分から 30 分の場合:Diag_Level=3 ('High')。Last_Fuxi_Inst_Timeが 10 分から 20 分の場合:Diag_Level=2 ('Medium')。Last_Fuxi_Inst_Timeが 10 分未満の場合:Diag_Level=1 ('Low')。
解決策 2: 最適化されたトリミング。
動的パーティションにデータを挿入する際に map ステージで発生するデータスキューを解決するには、多くのレコードを持つパーティションを見つけ、それらをプルーニングしてから別々に挿入します。実際のシナリオに基づいて、次の例のように map ステージのパラメーター設定を変更できます:
SET odps.sql.mapper.split.size=128; INSERT OVERWRITE TABLE data_demo3 partition(ds,hh) SELECT * FROM dwd_alsc_ent_shop_info_hi;結果は、全表スキャンが実行されたことを示しています。パフォーマンスをさらに最適化するには、次のようにシステムが生成する Reduce ジョブを無効にできます:
SET odps.sql.reshuffle.dynamicpt=false ; INSERT OVERWRITE TABLE data_demo3 partition(ds,hh) SELECT * FROM dwd_alsc_ent_shop_info_hi;動的パーティションにデータを挿入する際に map ステージで発生するデータスキューを解決するには、多くのレコードを持つパーティションを特定し、それらを分離してから別々に挿入します。手順は次のとおりです。
次のコマンドを使用して、多くのレコードを含むパーティションをクエリします。
SELECT ds ,hh ,COUNT(*) AS cnt FROM dwd_alsc_ent_shop_info_hi GROUP BY ds ,hh ORDER BY cnt DESC;パーティションの一部は次のとおりです:
ds
hh
cnt
20200928
17
1052800
20191017
17
1041234
20210928
17
1034332
20190328
17
1000321
20210504
1
19
20191003
20
18
20200522
1
18
20220504
1
18
次のコマンドを使用して、上記で特定されたパーティションにデータをフィルターして挿入し、多くのレコードを含むパーティションに対しては別の挿入操作を実行します。
SET odps.sql.reshuffle.dynamicpt=false ; INSERT OVERWRITE TABLE data_demo3 partition(ds,hh) SELECT * FROM dwd_alsc_ent_shop_info_hi WHERE CONCAT(ds,hh) NOT IN ('2020092817','2019101717','2021092817','2019032817'); SET odps.sql.reshuffle.dynamicpt=false ; INSERT OVERWRITE TABLE data_demo3 partition(ds,hh) SELECT * FROM dwd_alsc_ent_shop_info_hi WHERE CONCAT(ds,hh) IN ('2020092817','2019101717','2021092817','2019032817'); SELECT ds ,hh,COUNT(*) AS cnt FROM dwd_alsc_ent_shop_info_hi GROUP BY ds,hh ORDER BY cnt DESC;