MaxCompute は、Graph 開発プラグインを提供していません。 ただし、Eclipse ベースの MaxCompute Graph プログラムを開発することはできます。 開発プロセスは以下のとおりです。
  1. Graph のコードをコンパイルし、ローカルデバッグを使用して基本テストを実行します。
  2. クラスタのデバッグを実行し、結果を確認します。

このセクションでは、SSSP アルゴリズムを例として使用し、 Eclipse を使用した Graph プログラムの開発とデバッグの方法について説明します。

手順

  1. Java プロジェクトを作成します (例:graph_examples)。
  2. MaxCompute クライアントの lib ディレクトリ内の jar パッケージを Eclipse プロジェクトのビルドパスに追加します。 次の図は、設定済みの Eclipse プロジェクトを示します。
  3. MaxCompute Graph プログラムを開発します。

    実際の開発プロセスでは、例 (SSSP など) をコピーしてから変更することがよくあります。 この例では、パッケージパスのみが package com.aliyun.odps.graph.example に変更されています。

  4. パッケージをコンパイルし、ビルドします。

    Eclipse 環境で、ソースコードディレクトリ (src ディレクトリ) を右クリックし、[Export] > [Java] > [jar file] の順に選択し、 jar パッケージを生成します。 ターゲットの jar パッケージを保存するパスを選択します。 たとえば、D:\\odps\\clt\\odps-graph-example-sssp.jar です。

  5. MaxCompute コンソールを使用して、SSSP を実行します。 関連する操作の詳細については、「クイックスタート」の「Graph の実行」をご参照ください。
関連する開発手順の詳細については、「Graph 開発プラグイン」の説明をご参照ください。

ローカルデバッグ

MaxCompute Graph はローカルデバッグモードをサポートします。 Eclipse を使用してブレークポイントデバッグを実行します。

手順

  1. maven パッケージ odps-graph-local をダウンロードします。
  2. Eclipse プロジェクトを選択し、Graph ジョブのメインプログラムファイル (main 関数を含む) を右クリックし、[Run As] > [Run Configurations] の順に実行して実行パラメーターを設定します。
  3. [Arguments] タブページで、Program引数を、メインプログラムの入力パラメーターとして 1 sssp_in sssp_out に設定します。
  4. [Arguments] タブページで、VM 引数を以下のとおり設定します。
    -Dodps.runner.mode=local
    -Dodps.project.name=<project.name>
    -Dodps.end.point=<end.point>
    -Dodps.access.id=<access.id> 
    -Dodps.access.key=<access.key>
    
  5. MapReduce がローカルモード (odps.end.point の値が指定されていない) の場合、warehouse に sssp_in と sssp_out テーブルを作成し、sssp_in のデータを追加する必要があります。 入力データは以下のとおりです。
    1,"2:2,3:1,4:4"
    2,"1:2,3:2,4:1"
    3,"1:1,2:2,5:1"
    4,"1:4,2:1,5:1"
    5,"3:1,4:1"

    warehouse の詳細については、「MapReduce のローカル実行」をご参照ください。

  6. [Run] をクリックします。
    MaxCompute クライアントの conf/odps_config.ini の設定を確認し、パラメーターを設定します。 上記パラメーターは共通で使用されます。 以下に、他のパラメーターについて説明します。
    • odps.runner.mode: パラメーター値はローカルです。 このパラメーターはローカルデバッグ機能に必要です。
    • odps.project.name: 必須です。 現在のプロジェクトを指定します。
    • odps.end.point: 任意です。 現在の MaxCompute サービスのアドレスを指定します。 このパラメーターが指定されていないと、テーブルまたはリソースのメタデータが warehouse からのみ読み取られ、アドレスが存在しない場合に例外が発生します。このパラメーターが指定されていると、データは最初に warehouse から読み取られ、次に、アドレスが存在しない場合は、リモート MaxCompute から読み取られます。
    • odps.access.id: MaxCompute サービスに接続するための ID を示します。 このパラメーターは odps.end.point が指定されている場合にのみ有効です。
    • odps.access.key: MaxCompute サービスに接続するためのキーを示します。 このパラメーターは odps.end.point が指定されている場合にのみ有効です。
    • odps.cache.resources: 使用中のリソースリストを指定します。 このパラメーターは jar コマンドの -resources と同じ効果があります。
    • odps.local.warehouse: ローカルの warehouse パスを示します。 このパラメーターが定されていなれば、デフォルトで ./warehouse に設定されています。
    Eclipse のローカルで SSSP デバッグが実行された後、以下の情報が出力されます。
    Counters: 3
             com.aliyun.odps.graph.local.COUNTER
                     TASK_INPUT_BYTE=211
                     TASK_INPUT_RECORD=5
                     TASK_OUTPUT_BYTE=161
                     TASK_OUTPUT_RECORD=5
     graph task finish
    上の例では、sssp_in および sssp_out テーブルがローカルの warehouse に存在している必要があります。 sssp_in および sssp_out テーブルの詳細については、「クイックスタート」の「Graph の実行」をご参照ください。

ローカルジョブの一時ディレクトリ

以下の図で示すように、ローカルデバッグを実行するたびに Eclipse プロジェクトディレクトリに一時ディレクトリが作成されます。
ローカルで実行される Graph ジョブの一時ディレクトリは以下のディレクトリとファイルを含みます。
  • counters: 実行中のジョブのカウント情報を保存します。
  • inputs: ジョブの入力データを保存します。 データは優先的にローカルの warehouse から取得されます。 データがローカルに存在しない場合、MaxCompute SDK はデータをサーバーから (odps.end.point が設定されている場合) 読み取ります。 入力はデフォルトで 10 データレコードを読み取ります。 このしきい値は -Dodps.mapred.local.record.limit パラメーターで最大 10,000 まで変更可能です。
  • outputs: ジョブの出力データを保存します。 ローカル warehouse が出力テーブルの場合、出力結果データはジョブ実行後にローカル warehouse の対応するテーブルを上書きします。
  • resources: ジョブが使用するリソースを保存します。 inputs 同様、データは優先的にローカル warehouse から取得されます。 データがローカルに存在しない場合、データは MaxCompute SDK を使用してサーバーから (odps.end.point が設定されている場合) 読み取られます。
  • job.xml: ジョブの設定を示します。
  • superstep: 反復の度にメッセージ永続化に関する情報を保存します。
ローカルデバッグ中に詳細ログの出力が必要な場合、以下の log4j 設定ファイルが src ディレクトリ log4j.properties_odps_graph_cluster_debug に配置されている必要があります。

クラスタデバッグ

ローカルデバッグ後、テストのためにクラスタへジョブを送信します。

手順

  1. MaxCompute クライアントを設定します。
  2. add jar /path/work.jar -f; コマンドを実行し、jar パッケージを更新します。
  3. jar コマンドを実行してジョブを実行し、実行ログと結果データを確認します。
クラスタで Graph を実行する方法の詳細については、「クイックスタート」 の「Graph の実行」をご参照ください。

パフォーマンスチューニング

次のセクションでは、MaxCompute Graph フレームワーク上での一般的なパフォーマンスチューニング方法を説明します。

ジョブパラメーターの設定

パフォーマンスに影響を与える GraphJob の設定には、以下のものがあります。
  • setSplitSize(long): 入力テーブルの分割サイズを示します。 単位は MB です。 値は 0 より大きくなければならず、デフォルト値は 64 です。
  • setNumWorkers(int): ジョブの Worker 数を指定します。 値の範囲は [1,1000] で、デフォルト値は –1 です。 Worker 数はジョブの入力バイト数と分割サイズによって異なります。
  • setWorkerCPU(int): Map の CPU リソースを示します。 1 コア CPU は 100 リソースを含んでいます。 値の範囲は [50, 800] で、デフォルト値は 200 です。
  • setWorkerMemory(int): Map のメモリリソースを示します。 単位は MB です。 値の範囲は [256 MB, 12 GB] で、デフォルト値は 4,096 MB です。
  • setMaxIteration(int): 反復の最大数を指定します。 デフォルト値は –1 です。 値が 0 以下の場合、反復の最大数はジョブ終了条件となりません。
  • setJobPriority(int): ジョブの優先度を指定します。 値の範囲は [0, 9] で、デフォルト値は 9 です。 値が大きいほど、優先順位は低くなります。
全体的な処理能力を向上させるための追加アクションは以下のとおりです:
  • Worker 数を増やすには、setNumWorkers() メソッドを使用することができます。
  • 分割サイズを縮小し、ジョブがデータをロードする速度を上げるには、setSplitSize() メソッドを使用することができます。
  • Worker の CPU またはメモリを増やします。
  • 反復の最大数を設定します。 アプリケーションが結果の精度について高い要件を持たない場合は、反復の回数を減らしてプロセスをスピードアップさせることができます。
データのロード速度を上げるには、setNumWorkers と setSplitSize インターフェイスを一緒に使用します。 たとえば、setNumWorkers が workerNum、setSplitSize が splitSize、入力バイトの総数が inputSize とします。 分割数は、計算式: splitNum = inputSize /splitSize を使用して計算されます。 workerNum と splitNum の関係は以下のとおりです。
  • splitNum == workerNum の場合、各 Worker は 1 つの分割をロードします。
  • splitNum > workerNum の場合、各 Worker は 1 つ以上の分割をロードします。
  • splitNum < workerNum の場合、各 Worker は 0 または 1 つの分割をロードします。

そのため、最初の 2 つの条件に一致する場合、workerNum と splitSize を調整することで高速データロードを実現できます。反復フェーズでは、 workerNum だけを調整する必要があります。

ランタイムパーティショニングを false に設定する場合、 setSplitSize を使用して Worker 数を制御することを推奨します。 3 つめの条件については、一部の Worker の頂点の数が 0 であるかもしれません。jar コマンドの前に、 setNumWorkers と setSplitSize 同様の効果がある set odps.graph.split.size=<m>; set odps.graph.worker.num=<n>; を使用できます。

もう 1 つの共通するパフォーマンスの問題はデータスキューです。 たとえば、カウンターでは、ある Worker で処理される頂点と辺の数が、他の Worker で処理される量を大きく上回ることがあります。

データスキューは通常、キーに対応する頂点、辺またはメッセージの数が、他のキーを大きく上回る場合に発生します。 データ量の多いキーは、少数の Worker で処理され、結果的にこれらの Worker の実行時間は長くなります。

この問題を解決するには、以下のステップを推奨します。
  • コンバイナを使用して、そのようなキーに対応する頂点のメッセージをローカルで集計し、送信メッセージ数を減らします。
  • サービスロジックを改善します。

コンバイナを使用する

メッセージとネットワークデータトラフィック量を保存するメモリを縮小し、ジョブの実行時間を短縮するため、コンバイナを定義します。 詳しい情報は、MaxCompute SDK の 「コンバイナ」 をご参照ください。

データ入力量を減らす

データ量が大きい場合、ディスクのデータ読み取りが処理時間を増大させることがあります。 そのため、読み取るデータのバイト数を減らすことで全体的なスループット、つまりジョブのパフォーマンスを向上させることができます。次のいずれかの方法を使用できます。
  • 入力データ量を減らす: 意思決定アプリケーションでは、データのサンプリング後にサブセットを処理することで取得できる結果は、全体の精度ではなく結果の精度にのみ影響します。 そのため、特別なデータサンプリングを実行し、データを入力テーブルにインポートして処理することができます。
  • 使用されていないフィールドの読み取りを回避する: MaxCompute Graph フレームワークの Tableinfo クラスは、全テーブルの読み取りやテーブルパーティションの読み取りではなく、特定の列 (列名配列を使用して送信される) の読み取りをサポートします。 これにより入力データ量が減り、ジョブのパフォーマンスが向上します。

組み込み jar パッケージ

デフォルトで、以下の jar パッケージは、Graph プログラムを実行する JVM にロードされます。 これらのリソースをアップロードしたり、コマンドラインで -libjars を実行する際にこれらの jar パッケージを持つ必要はありません。
  • commons-codec-1.3.jar
  • commons-io-2.0.1.jar
  • commons-lang-2.5.jar
  • commons-logging-1.0.4.jar
  • commons-logging-api-1.0.4.jar
  • guava-14.0.jar
  • json.jar
  • log4j-1.2.15.jar
  • slf4j-api-1.4.3.jar
  • slf4j-log4j12-1.4.3.jar
  • xmlenc-0.52.jar
JVM を実行するクラスパスでは、 上述のビルトイン jar パッケージはユーザーの jar パッケージより前に配置され、バージョンの競合が発生する可能性があります。 たとえば、プログラムで commons-codec-1.5.jar 内のクラスの関数を使用していても、この関数は commons-codec-1.3.jar には含まれていません。 実装メソッドが commons-codec-1.3.jar に存在するかどうかを確認するか、MaxCompute がサポート対象バージョンへアップデートするまでお待ちください。