すべてのプロダクト
Search
ドキュメントセンター

MaxCompute:グラフジョブ

最終更新日:Jan 07, 2025

このトピックでは、MaxCompute Graphジョブを実行する方法について説明します。

ジョブの実行

MaxComputeクライアントは、MaxCompute Graphジョブを実行するためのJARコマンドを提供します。 このコマンドは、MapReduceJARコマンドと同じ方法で使用されます。

構文:

Usage: jar [<GENERIC_OPTIONS>] <MAIN_CLASS> [ARGS]
    -conf <configuration_file>         Specify an application configuration file
    -classpath <local_file_list>       classpaths used to run mainClass
    -D <name>=<value>                  Property value pair, which will be used to run mainClass
    -local                             Run job in local mode
    -resources <resource_name_list>    file/table resources used in graph, seperate by comma

<GENERIC_OPTIONS> には、次のオプションパラメーターが含まれます。

  • -conf <configuration file>: JobConf設定ファイル。

  • -classpath <local_file_list>: ローカルモードでジョブを実行するために使用されるクラスパス。 このパラメータは、main関数が配置されているJARパッケージを指定します。

    ユーザーは、main関数とGraphジョブをSSSPなどの1つのパッケージに書き込むことを好みます。 したがって、JARパッケージは、サンプルプログラムの -resourcesパラメーターと -classpathパラメーターの両方で使用されます。 ただし、これら2つのパラメーターで使用されるパッケージの意味は異なります。

    • -resourcesは、分散実行用のGraphジョブを参照します。

    • -classpathは、ローカル実行のmain関数を参照します。 指定されたJARパッケージもローカルディレクトリに保存する必要があります。 パッケージ名は、デフォルトのファイル区切り文字で区切ります。 Windowsオペレーティングシステムでは、デフォルトのファイル区切り文字としてセミコロン (;) を使用しますが、Linuxオペレーティングシステムではコロン (:) を使用します。

  • -D <prop_name >=< prop_value>: ローカル実行用の <mainClass> のJavaプロパティ。 複数のプロパティを指定できます。

  • -local: Graphジョブをローカルモードで実行するように指定します。 プログラムのデバッグに使用されます。

  • -resources <resource_name_list>: Graphジョブの実行に使用されるリソースです。 Graphジョブが使用するリソースの名前をresource_name_listで指定する必要があります。 Graphジョブの実行中に他のMaxComputeリソースを読み取る場合は、それらのリソースの名前を <resource_name_list> に追加する必要があります。 リソース名はコンマ (,) で区切ります。 クロスプロジェクトリソースを使用する場合は、resource_name_listの前にPROJECT_NAME/resources/ を追加する必要があります。 例: -resources otherproject/resources/resfile

MaxComputeクライアントを使用してジョブを送信する代わりに、Graphジョブでmain関数を直接実行してジョブをMaxComputeに送信できます。 次の例では、PageRankアルゴリズムを使用します。

public static void main(String[] args) throws Exception {
  if (args.length < 2)
    printUsage();
  Account account = new AliyunAccount(accessId, accessKey);
  Odps odps = new Odps(account);
  odps.setEndpoint(endPoint);
  odps.setDefaultProject(project);
  SessionState ss = SessionState.get();
  ss.setOdps(odps);
  ss.setLocalRun(false);
  String resource = "mapreduce-examples.jar";
  GraphJob job = new GraphJob();
  // Add the used JAR file and other files to the class cache resource. These files correspond to those specified by -libjars in the JAR command.
  job.addCacheResourcesToClassPath(resource);
  job.setGraphLoaderClass(PageRankVertexReader.class);
  job.setVertexClass(PageRankVertex.class);
  job.addInput(TableInfo.builder().tableName(args[0]).build());
  job.addOutput(TableInfo.builder().tableName(args[1]).build());
  // default max iteration is 30
  job.setMaxIteration(30);
  if (args.length >= 3)
    job.setMaxIteration(Integer.parseInt(args[2]));
  long startTime = System.currentTimeMillis();
  job.run();
  System.out.println("Job Finished in "
      + (System.currentTimeMillis() - startTime) / 1000.0
      + " seconds");
}

入出力

MaxCompute Graphジョブの入力と出力はテーブルである必要があります。 入力または出力形式をカスタマイズすることはできません。

  • ジョブ入力定义

    GraphJob job = new GraphJob();
    job.addInput(TableInfo.builder().tableName("tblname").build());  // Use tables as the input.
    job.addInput(TableInfo.builder().tableName("tblname").partSpec("pt1=a/pt2=b").build()); // Use partitions as the input.
    // Read only the col2 and col0 columns of the input table. Use record.get(0) to obtain col2 in the load() method of GraphLoader. Both are read in the same sequence.
    job.addInput(TableInfo.builder().tableName("tblname").partSpec("pt1=a/pt2=b").build(), new String[]{"col2", "col0"});
    説明
    • 複数の入力がサポートされています。

    • addInputフレームワークは、入力テーブルからレコードを読み取り、そのレコードをユーザー定義のGraphLoaderに転送してグラフデータをロードします。

    • パーティションフィルター条件は指定できません。 制限の詳細については、「MaxCompute Graphの制限」をご参照ください。

  • ジョブ出力定义

    GraphJob job = new GraphJob();
    // If the output table is a partitioned table, the last level of partitions must be provided.
    job.addOutput(TableInfo.builder().tableName("table_name").partSpec("pt1=a/pt2=b").build());
    // true indicates that the code overwrites partitions specified by tableinfo. The value true is similar to the INSERT OVERWRITE statement. The value false is similar to the INSERT INTO statement.
    job.addOutput(TableInfo.builder().tableName("table_name").partSpec("pt1=a/pt2=b").lable("output1").build(), true);
    説明
    • 複数の出力がサポートされ、各出力はラベルで識別されます。

    • Graphジョブは、WorkContextのwrite() メソッドを使用して、実行時に出力テーブルにデータを書き込むことができます。 複数の出力にラベルを付ける必要があります。

リソースの読み取り

  • GraphJobを使用したリソースの読み取り

    JARコマンドに加えて、GraphJobの次の2つのメソッドを使用して、Graphによって読み取られるリソースを指定できます。

    void addCacheResources(String resourceNames)
    void addCacheResourcesToClassPath(String resourceNames)
  • WorkerContextを使用したリソースの読み取り

    WorkerContextオブジェクトからリソースを読み取ることができます。

    public byte[] readCacheFile(String resourceName) throws IOException;
    public Iterable<byte[]> readCacheArchive(String resourceName) throws IOException;
    public Iterable<byte[]> readCacheArchive(String resourceName, String relativePath)throws IOException;
    public Iterable<WritableRecord> readResourceTable(String resourceName);
    public BufferedInputStream readCacheFileAsStream(String resourceName) throws IOException;
    public Iterable<BufferedInputStream> readCacheArchiveAsStream(String resourceName) throws IOException;
    public Iterable<BufferedInputStream> readCacheArchiveAsStream(String resourceName, String relativePath) throws IOException;
    						
    説明
    • リソースは、WorkerComputerのsetup() メソッドを使用して読み取り、WorkerValueに保存し、getWorkerValueを使用して取得することもできます。

    • リソースの読み取りと処理を同時に行う場合は、前述のストリームAPIを使用できます。 この方法は、メモリ消費を低減する。

    • 制限の詳細については、「MaxCompute Graphの制限」をご参照ください。