MaxCompute は、スキャンされたデータ量と消費された計算リソースに基づいて課金されます。予期せぬ請求額の急増の主な原因は、次の2つのパターンです。必要以上にデータをスキャンする SQL ジョブと、ワークロードが必要とする量よりも多くのリソースを割り当てる MapReduce ジョブです。このトピックでは、これらの両方を診断し、修正する方法について説明します。
ジョブ実行前のコスト見積もり
TCO ツールを使用して、ジョブを送信する前に計算コストを見積もります。SQL ジョブの場合は、CostSQL を使用して、クエリ実行前にコストをプレビューします。リソース消費アラートを設定して、予期せぬコスト増加を早期に検出します。
SQL 計算コストの削減
全表スキャンの回避
全表スキャンは、SQL 計算コストが高くなる主な原因です。MaxCompute はスキャンされたデータに基づいて課金されます。必要なデータが一部であるにもかかわらず、テーブル全体をスキャンすると、請求額が大幅に増加します。
セッションまたはプロジェクトレベルでの全表スキャンの無効化:
-- 現在のセッションで無効にする
set odps.sql.allow.fullscan=false;
-- プロジェクト全体で無効にする
SetProject odps.sql.allow.fullscan=false;列のプルーニングの使用 — SELECT * の回避:
SELECT * は常に全表スキャンをトリガーします。必要な列のみを選択してください:
-- テーブル T には列 a、b、c、d、e があります。
-- このクエリは a、b、e のみを読み取り、c と d は完全にスキップします。
SELECT a, b FROM T WHERE e < 10;パーティションプルーニングの使用 — パーティションキー列でのフィルタリング:
WHERE 句でパーティションキーを指定すると、MaxCompute は関連性のないパーティションをスキップし、一致するデータのみをスキャンします。
SELECT a, b FROM T WHERE partitiondate = '2017-10-01';パーティションフィルターがない場合、パーティションテーブルに対する JOIN または SELECT は全表スキャンにフォールバックします。JOIN を実行する前に、必ずパーティションをプルーニングしてください。パーティションプルーニングが有効にならないケースについては、「パーティションプルーニングが有効にならないシナリオ」をご参照ください。
コストのかかる SQL パターンの書き換え
一部の SQL キーワードは、追加のシャッフリングとソートをトリガーし、追加の計算リソースを消費します。根本的な原因はデータ移動です。エンジンがクエリを満たすためにノード間でデータを再配置する必要がある場合、移動されたデータ量に比例して CPU と I/O を消費します。これらのパターンを書き換えることで、その移動を削減または排除できます。
FULL OUTER JOIN を UNION ALL に置き換える
FULL OUTER JOIN は、エンジンが両方のテーブルのすべての行を照合する必要があり、大規模な中間シャッフルを生成します。UNION ALL は、欠落している値をゼロでパディングすることにより、結合を完全に排除します。
-- 元のクエリ:FULL OUTER JOIN
SELECT COALESCE(t1.id, t2.id) AS id, SUM(t1.col1) AS col1, SUM(t2.col2) AS col2
FROM (
SELECT id, col1 FROM table1
) t1
FULL OUTER JOIN (
SELECT id, col2 FROM table2
) t2
ON t1.id = t2.id
GROUP BY COALESCE(t1.id, t2.id);
-- 最適化済み:UNION ALL (結合なし、データ移動オーバーヘッドなし)
SELECT t.id, SUM(t.col1) AS col1, SUM(t.col2) AS col2
FROM (
SELECT id, col1, 0 AS col2 FROM table1
UNION ALL
SELECT id, 0 AS col1, col2 FROM table2
) t
GROUP BY t.id;GROUP BY を UNION ALL の外に移動する
UNION ALL の各ブランチ内に GROUP BY を配置すると、エンジンは2回集約を実行します。UNION の後に1回集約してください:
-- 元のクエリ:各ブランチ内の GROUP BY (二重集約)
SELECT t.id, SUM(t.val) AS val
FROM (
SELECT id, SUM(col3) AS val FROM table3 GROUP BY id
UNION ALL
SELECT id, SUM(col4) AS val FROM table4 GROUP BY id
) t
GROUP BY t.id;
-- 最適化済み:UNION 後の単一集約
SELECT t.id, SUM(t.val) AS val
FROM (
SELECT id, col3 AS val FROM table3
UNION ALL
SELECT id, col4 AS val FROM table4
) t
GROUP BY t.id;DISTINCT を GROUP BY に置き換える
大規模なデータセットに対する DISTINCT は、完全なソートを必要とします。GROUP BY は、より少ないオーバーヘッドで同じ結果を達成します。
-- 元のクエリ:DISTINCT (完全ソート)
SELECT COUNT(DISTINCT id) AS cnt FROM table1;
-- 最適化済み:GROUP BY (より効率的な重複排除)
SELECT COUNT(1) AS cnt
FROM (
SELECT id FROM table1 GROUP BY id
) t;避けるべきその他のパターン:
パーティションなしで挿入する代わりに、パーティションフィールドを指定して
INSERT INTOを使用します。これにより、SQL の複雑さが軽減され、コストが削減されます。ORDER BY の代わりに、Excel などの外部ツールを使用して一時的にエクスポートされたデータをソートします。MaxCompute の ORDER BY は、データセット全体でグローバルソートをトリガーします。
スケジューリング頻度の制御
MaxCompute は、リアルタイムクエリではなく、大規模バッチ処理向けに設計されています。数秒または数分といった短い間隔で SQL ジョブをスケジューリングすると、従量課金の下でジョブキューが蓄積され、翌日の請求額が予期せず急増する可能性があります。
頻繁にスケジュールされるジョブの実行頻度を設定する前に、CostSQL を実行してコストを見積もります。ほぼリアルタイムの結果を必要とするワークロードには、MaxCompute ではなく、専用のリアルタイムコンピューティングサービスを使用してください。
SQL を実行せずにテーブルデータをプレビュー
SELECT * FROM table LIMIT 10 を実行すると、計算リソースを消費します。代わりに組み込みのテーブルプレビュー機能を使用してください。これは、計算ジョブをトリガーせずにストレージから直接データを読み取ります。
DataWorks: [データマップ] ページを開き、テーブルを見つけ、[プレビュー タブ] を使用します。詳細については、「テーブルの詳細を表示する」をご参照ください。
MaxCompute Studio:オブジェクトツリーでテーブルをダブルクリックして、そのデータをプレビューします。
ワークロードに適したツールの選択
MaxCompute は、ミリ秒単位ではなく、数分単位で結果を返します。大規模バッチ分析には適切なツールですが、即座の応答を必要とするフロントエンドクエリには適していません。
ダッシュボード、検索結果、行ルックアップなどのフロントエンドクエリには、ApsaraDB for RDS のようなリレーショナルデータベースを使用してください。集約された MaxCompute の結果をそこに保存し、データベースからクエリを提供します。WHERE 句がなく、集約を実行せず、辞書を結合しないフロントエンドクエリは、MaxCompute では常に低速になります。
MapReduce 計算コストの削減
スプリットサイズとリデューサー数の構成
2つの構成設定が、MapReduce リソース消費に最大の影響を与えます。
スプリットサイズは、作成されるマッパーの数を制御します。デフォルトはスプリットあたり 256 MB です。スプリットサイズが小さいほど、より多くのマッパーが作成され、並列性が向上しますが、リソース使用量も増加します。ご利用のマップロジックの計算コストに基づいて、JobConf#setSplitSize を使用してこの値を調整してください。各レコードの処理コストが高い場合は、スプリットサイズを小さくしてより多くのマッパーを作成し、作業を分散させます。
リデューサー数は、マッパー数の4分の1がデフォルトであり、0から2,000までの任意の値に設定できます。リデューサーが多いほど、より多くのリソースを消費します。ご利用の集約ワークロードが必要とする数のリデューサーのみを設定し、jobconf.setNumReduceTasks(num) を使用して明示的に数を構成してください。
パイプラインモードを使用したシリアルジョブのマージ
複数の MapReduce ジョブが連鎖している場合 (あるジョブの出力が次のジョブの入力となる場合)、各中間ジョブは結果をディスクに書き込み、それを読み戻します。このディスク I/O は、連鎖全体で複合的に発生します。
パイプラインモードは、シリアルな MapReduce ジョブを単一のジョブにマージし、中間ディスクの読み書きを排除します。これにより、コストとスケジューリングのオーバーヘッドの両方が削減されます。実装例については、「パイプラインの例」をご参照ください。
入力テーブルの列のプルーニング
マッパーが入力テーブルを読み取る際に、その列の一部しか必要としない場合、行全体を読み取ると I/O が無駄になります。入力テーブルを追加する際に、必要な列を指定してください:
InputUtils.addTable(TableInfo.builder().tableName("wc_in").cols(new String[]{"c1","c2"}).build(), job);この構成後、マッパーは c1 および c2 列のみを読み取ります。列名でアクセスされるデータは影響を受けませんが、添字インデックスでアクセスされるデータは異なる動作をする可能性があります。
セットアップステージでのリソースの読み込み
リソースを読み取る各呼び出しにはオーバーヘッドが発生し、リソースは最大64回読み取ることができます。map または reduce 関数内で同じリソースを読み取ると、すべてのレコードで再読み取りされます。代わりに、setup ステージでリソースを一度だけ読み取ってください。使用例については、「リソース使用例」をご参照ください。
map または reduce 関数でのオブジェクト構築の回避
map または reduce 関数内で構築された Java オブジェクトは、レコードが呼び出されるたびに再構築されます。オブジェクトの構築を setup ステージに移動してください:
Record word;
Record one;
public void setup(TaskContext context) throws IOException {
// セットアップで一度構築 — マップ呼び出しごとにではない
word = context.createMapOutputKeyRecord();
one = context.createMapOutputValueRecord();
one.set(new Object[]{1L});
}マップ出力に重複キーがある場合のコンバイナーの使用
コンバイナーは、マップタスクの出力をリデューサーに送信する前に事前集約し、シャッフルフェーズ中にネットワークを介して転送されるデータ量を削減します。
コンバイナーは、マップ出力に同じキーを持つ複数のレコードが含まれる場合にのみ使用してください (例えば、ワードカウントジョブなど)。マップ出力キーが一意である場合、コンバイナーはメリットなしにオーバーヘッドを追加します。
次のコンバイナーは、一致するキーの値を合計します。
/**
* 値を合計することでマップ出力を結合するコンバイナークラス。
*/
public static class SumCombiner extends ReducerBase {
private Record count;
@Override
public void setup(TaskContext context) throws IOException {
count = context.createMapOutputValueRecord();
}
@Override
public void reduce(Record key, Iterator<Record> values, TaskContext context)
throws IOException {
long c = 0;
while (values.hasNext()) {
Record val = values.next();
c += (Long) val.get(0);
}
count.set(0, c);
context.write(key, count);
}
}パーティション列またはカスタムパーティショナーによるデータスキューの防止
デフォルトでは、リデューサーは完全なキーのスキーマのハッシュに基づいてデータを受け取ります。一部のキー値が他のキー値よりもはるかに一般的である場合、特定のリデューサーが著しく多くのデータを受け取ります。これは、一部のリデューサーが他のリデューサーよりもはるかに遅れて終了し、ジョブ全体を停滞させるロングテール問題です。
リデューサー間でデータをより均等に分散させるには、JobConf#setPartitionColumns を使用してパーティションキー列を指定します。データは、完全なキーではなく、それらの列のハッシュに基づいてリデューサーにルーティングされます。
jobconf.setPartitionerClass(MyPartitioner.class)
jobconf.setNumReduceTasks(num)より詳細な制御を行うには、カスタムパーティショナーを実装してください:
import com.aliyun.odps.mapred.Partitioner;
public static class MyPartitioner extends Partitioner {
@Override
public int getPartition(Record key, Record value, int numPartitions) {
// numPartitions はリデューサーの総数です。
// キーの長さに基づいて各キーをリデューサーにルーティングします。
String k = key.get(0).toString();
return k.length() % numPartitions;
}
}JVM メモリを 1:4 の CPU 対メモリ比に維持
標準構成は、1 CPU コアと 4 GB のメモリであり、odps.stage.reducer.jvm.mem は 4006 に設定されています。CPU コアに対する 1:4 の比率を超えてメモリを割り当てると、課金が増加します。過剰なプロビジョニングではなく、実際のワークロードに合わせて odps.stage.reducer.jvm.mem を調整してください。
次のステップ
ストレージコストを最適化するには、「ストレージコストの最適化」をご参照ください。
データアップロードおよびダウンロードコストを削減するには、「データアップロードおよびダウンロードコストの最適化」をご参照ください。
課金異常を分析および解決するには、「コストの管理」をご参照ください。
リソース最適化計画を生成するには、「計算リソース最適化計画の生成」をご参照ください。