アプリケーションプロセスで Java ジョブを実行できます。このトピックでは、Java ジョブの管理方法について説明します。
実行モード
Java ジョブは、次の実行モードをサポートしています。
スタンドアロン操作: ジョブは
groupIdグループのランダムなワーカーで実行されます。ブロードキャスト実行: ジョブは
groupIdグループに属するすべてのワーカーで同時に実行され、システムはすべてのワーカーがジョブを完了するまで待機します。ビジュアル MapReduce: MapReduce モデルです。[professional Edition] をアクティブにする必要があります。タスクの最大数は 1,000 に制限されています。キーワードでタスクの詳細な実行レコード、操作ログ、およびスタックをクエリできます。
MapReduce: 通常の MapReduce モデルです。このモードでは、多数のタスクを並列処理できます。ただし、タスクの実行情報のみをクエリできます。タスクの数が 1,000,000 未満の場合は、このモードを選択することをお勧めします。
シャード実行: このモードでは、静的シャードと動的シャードを使用してビッグデータコンピューティングビジネスを処理します。
スタンドアロン操作 モードと ブロードキャスト実行 モードを選択する場合は、JavaProcessor クラスを実装する必要があります。ビジュアル MapReduce、MapReduce、および シャード実行 モードを選択する場合は、MapJobProcessor クラスを実装する必要があります。
Processor クラスのパスは、実装クラスのフルパスです。例: com.apache.armon.test.schedulerx.processor.MySimpleJob。
JAR パッケージをアップロードしない場合、SchedulerX はアプリケーションプロセスのクラスパスで Processor 実装クラスを検索します。したがって、アプリケーションのジョブを変更するたびに、アプリケーションを再コンパイルして再公開する必要があります。
JAR パッケージをアップロードすると、SchedulerX は JAR パッケージと Processor を動的にロードします。この場合、アプリケーションのジョブを変更した後にアプリケーションを再公開する必要はありません。
プログラミングモデル
Java ジョブは、次のプログラミングモデルをサポートしています: JavaProcessor および MapJobProcessor。
JavaProcessor
オプション:
public void preProcess(JobContext context) throws Exception必須:
public ProcessResult process(JobContext context) throws Exceptionオプション:
public void postProcess(JobContext context)オプション:
public void kill(JobContext context)
MapJobProcessor
必須:
public ProcessResult process(JobContext context) throws Exceptionオプション:
public void postProcess(JobContext context)オプション:
public void kill(JobContext context)必須:
public ProcessResult map(List<? extends Object> taskList, String taskName)
ProcessResult
すべてのプロセスは ProcessResult を返す必要があります。ProcessResult には、ジョブの実行ステータスと結果、およびエラーメッセージが含まれています。
return new ProcessResult(true)が表示された場合は、ジョブが完了したことを示します。return new ProcessResult(false, ErrorMsg)が表示された場合、または例外がスローされた場合は、ジョブが失敗したことを示します。ジョブが完了し、実行結果:
return new ProcessResult(true, result)が返されます。resultは文字列で、1,000 バイトを超えることはできません。
サンプルジョブ (Hello, SchedulerX 2.0)
@Component
public class MyProcessor1 extends JavaProcessor {
@Override
public ProcessResult process(JobContext context) throws Exception {
//TODO 処理内容を記述します
System.out.println("Hello, schedulerx2.0!");
return new ProcessResult(true);
}
} 終了可能なサンプルジョブ
@Component
public class MyProcessor2 extends JavaProcessor {
private volatile boolean stop = false;
@Override
public ProcessResult process(JobContext context) throws Exception {
int N = 10000;
while (!stop && N >= 0) {
//TODO 処理内容を記述します
N--;
}
return new ProcessResult(true);
}
@Override
public void kill(JobContext context) {
stop = true;
}
@Override
public void preProcess(JobContext context) {
stop = false; //ジョブが Spring を使用して起動され、Bean が Singleton Bean の場合、preProcess を使用してフラグをリセットする必要があります。
}
} バッチ処理に Map モデルを使用するサンプルジョブ
/**
* 個々のテーブルで分散バッチ処理を実行します。
* 1. ルートジョブは、テーブルをクエリし、minId と maxId を取得するために使用されます。
* 2. PageTask をビルドし、map メソッドを呼び出してタスクを分散します。
* 3. 次のレベルで PageTask が取得された場合は、データを処理します。
*
*/
@Component
public class ScanSingleTableJobProcessor extends MapJobProcessor {
private static final int pageSize = 100;
static class PageTask {
private int startId;
private int endId;
public PageTask(int startId, int endId) {
this.startId = startId;
this.endId = endId;
}
public int getStartId() {
return startId;
}
public int getEndId() {
return endId;
}
}
@Override
public ProcessResult process(JobContext context) {
String taskName = context.getTaskName();
Object task = context.getTask();
if (isRootTask(context)) {
System.out.println("start root task");
Pair<Integer, Integer> idPair = queryMinAndMaxId();
int minId = idPair.getFirst();
int maxId = idPair.getSecond();
List<PageTask> taskList = Lists.newArrayList();
int step = (int) ((maxId - minId) / pageSize); //ページ数を計算します。
for (int i = minId; i < maxId; i+=step) {
taskList.add(new PageTask(i, (i+step > maxId ? maxId : i+step)));
}
return map(taskList, "Level1Dispatch"); //プロセスは map メソッドを呼び出してタスクを分散します。
} else if (taskName.equals("Level1Dispatch")) {
PageTask record = (PageTask)task;
long startId = record.getStartId();
long endId = record.getEndId();
//TODO 処理内容を記述します
return new ProcessResult(true);
}
return new ProcessResult(true);
}
@Override
public void postProcess(JobContext context) {
//TODO 処理内容を記述します
System.out.println("all tasks is finished.");
}
private Pair<Integer, Integer> queryMinAndMaxId() {
//TODO select min(id),max(id) from xxx から
return null;
}
}