すべてのプロダクト
Search
ドキュメントセンター

SchedulerX:Java ジョブ

最終更新日:Jan 14, 2025

アプリケーションプロセスで Java ジョブを実行できます。このトピックでは、Java ジョブの管理方法について説明します。

実行モード

Java ジョブは、次の実行モードをサポートしています。

  • スタンドアロン操作: ジョブは groupId グループのランダムなワーカーで実行されます。

  • ブロードキャスト実行: ジョブは groupId グループに属するすべてのワーカーで同時に実行され、システムはすべてのワーカーがジョブを完了するまで待機します。

  • ビジュアル MapReduce: MapReduce モデルです。[professional Edition] をアクティブにする必要があります。タスクの最大数は 1,000 に制限されています。キーワードでタスクの詳細な実行レコード、操作ログ、およびスタックをクエリできます。

  • MapReduce: 通常の MapReduce モデルです。このモードでは、多数のタスクを並列処理できます。ただし、タスクの実行情報のみをクエリできます。タスクの数が 1,000,000 未満の場合は、このモードを選択することをお勧めします。

  • シャード実行: このモードでは、静的シャードと動的シャードを使用してビッグデータコンピューティングビジネスを処理します。

スタンドアロン操作 モードと ブロードキャスト実行 モードを選択する場合は、JavaProcessor クラスを実装する必要があります。ビジュアル MapReduceMapReduce、および シャード実行 モードを選択する場合は、MapJobProcessor クラスを実装する必要があります。

Processor クラスのパスは、実装クラスのフルパスです。例: com.apache.armon.test.schedulerx.processor.MySimpleJob

  • JAR パッケージをアップロードしない場合、SchedulerX はアプリケーションプロセスのクラスパスで Processor 実装クラスを検索します。したがって、アプリケーションのジョブを変更するたびに、アプリケーションを再コンパイルして再公開する必要があります。

  • JAR パッケージをアップロードすると、SchedulerX は JAR パッケージと Processor を動的にロードします。この場合、アプリケーションのジョブを変更した後にアプリケーションを再公開する必要はありません。

プログラミングモデル

Java ジョブは、次のプログラミングモデルをサポートしています: JavaProcessor および MapJobProcessor。

  • JavaProcessor

    • オプション:public void preProcess(JobContext context) throws Exception

    • 必須:public ProcessResult process(JobContext context) throws Exception

    • オプション:public void postProcess(JobContext context)

    • オプション:public void kill(JobContext context)

  • MapJobProcessor

    • 必須:public ProcessResult process(JobContext context) throws Exception

    • オプション:public void postProcess(JobContext context)

    • オプション:public void kill(JobContext context)

    • 必須:public ProcessResult map(List<? extends Object> taskList, String taskName)

ProcessResult

すべてのプロセスは ProcessResult を返す必要があります。ProcessResult には、ジョブの実行ステータスと結果、およびエラーメッセージが含まれています。

  • return new ProcessResult(true) が表示された場合は、ジョブが完了したことを示します。

  • return new ProcessResult(false, ErrorMsg) が表示された場合、または例外がスローされた場合は、ジョブが失敗したことを示します。

  • ジョブが完了し、実行結果: return new ProcessResult(true, result) が返されます。result は文字列で、1,000 バイトを超えることはできません。

サンプルジョブ (Hello, SchedulerX 2.0)

@Component
public class MyProcessor1 extends JavaProcessor {

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        //TODO 処理内容を記述します
        System.out.println("Hello, schedulerx2.0!");
        return new ProcessResult(true);
    }
}            

終了可能なサンプルジョブ

@Component
public class MyProcessor2 extends JavaProcessor {
    private volatile boolean stop = false;

    @Override
    public ProcessResult process(JobContext context) throws Exception {
        int N = 10000;
        while (!stop && N >= 0) {
            //TODO 処理内容を記述します
            N--;
        }
        return new ProcessResult(true);
    }

    @Override
    public void kill(JobContext context) {
        stop = true;
    }

    @Override
    public void preProcess(JobContext context) {
        stop = false;  //ジョブが Spring を使用して起動され、Bean が Singleton Bean の場合、preProcess を使用してフラグをリセットする必要があります。
    }
}          

バッチ処理に Map モデルを使用するサンプルジョブ

/**
 * 個々のテーブルで分散バッチ処理を実行します。
 * 1. ルートジョブは、テーブルをクエリし、minId と maxId を取得するために使用されます。
 * 2. PageTask をビルドし、map メソッドを呼び出してタスクを分散します。
 * 3. 次のレベルで PageTask が取得された場合は、データを処理します。
 *
 */
@Component
public class ScanSingleTableJobProcessor extends MapJobProcessor {
    private static final int pageSize = 100;

    static class PageTask {
        private int startId;
        private int endId;
        public PageTask(int startId, int endId) {
             this.startId = startId;
             this.endId = endId;
        }
        public int getStartId() {
              return startId;
        }
        public int getEndId() {
              return endId;
        }
    }

    @Override
    public ProcessResult process(JobContext context) {
        String taskName = context.getTaskName();
        Object task = context.getTask();
        if (isRootTask(context)) {
            System.out.println("start root task");
            Pair<Integer, Integer> idPair = queryMinAndMaxId();
            int minId = idPair.getFirst();
            int maxId = idPair.getSecond();
            List<PageTask> taskList = Lists.newArrayList();
            int step = (int) ((maxId - minId) / pageSize); //ページ数を計算します。
            for (int i = minId; i < maxId; i+=step) {
                taskList.add(new PageTask(i, (i+step > maxId ? maxId : i+step)));
            }
            return map(taskList, "Level1Dispatch"); //プロセスは map メソッドを呼び出してタスクを分散します。
        } else if (taskName.equals("Level1Dispatch")) {
            PageTask record = (PageTask)task;
            long startId = record.getStartId();
            long endId = record.getEndId();
            //TODO 処理内容を記述します
            return new ProcessResult(true);
        }

        return new ProcessResult(true);
    }

    @Override
    public void postProcess(JobContext context) {
        //TODO 処理内容を記述します
        System.out.println("all tasks is finished.");
    }

    private Pair<Integer, Integer> queryMinAndMaxId() {
        //TODO select min(id),max(id) from xxx から
        return null;
    }

}