すべてのプロダクト
Search
ドキュメントセンター

SchedulerX:ブロードキャスト

最終更新日:Jan 14, 2025

ブロードキャストモデルを使用する場合、ジョブインスタンスは、ジョブインスタンスが属するグループ内のすべてのワーカーに分散されます。すべてのワーカーがジョブを完了した後にのみ、ジョブは完了します。ワーカーがジョブを完了できない場合、ジョブは失敗と見なされます。

シナリオ

  • バッチ O&M 操作

    • スケジュールされた時間にすべてのワーカーでスクリプトを実行します。

    • スケジュールされた時間にすべてのワーカーでキャッシュをクリアします。

    • すべてのワーカーでサービスを動的に有効にし、ワーカーを使用して実行結果を収集し、データベースを更新します。

  • データ集約

    • JavaProcessor を使用する場合、preProcess プロセス中に Redis キャッシュまたはデータベースを初期化します。

    • ワーカーが process メソッドを呼び出した後、ビジネスロジックに基づいて実行結果を収集します。

    • ワーカーが postProcess メソッドを呼び出した後、すべてのワーカーからの実行結果を集約し、キャッシュまたはデータベースを更新します。

サポートされているジョブの種類

ブロードキャストモデルは、スクリプトジョブや Java ジョブなど、複数の種類のジョブをサポートしています。Java ジョブを使用する場合、ブロードキャストモデルは、preProcess メソッドと postProcess メソッドなどの高度な機能をサポートしています。

Java ジョブを使用する場合は、JavaProcessor 1.0.7 以降を使用する必要があります。次のコードブロックはインターフェースを示しています。

  • 必須:public ProcessResult process(JobContext context) throws Exception

  • オプション:public void preProcess(JobContext context)

  • オプション:public ProcessResult postProcess(JobContext context)

preProcess メソッドは、process メソッドが呼び出される前に、各ワーカーで 1 回だけ呼び出されます。

postProcess メソッドは、すべてのワーカーが process メソッドを呼び出した後、各ワーカーで 1 回だけ呼び出されます。実行結果はワークフローに取り込むことができます。

@Component
public class TestBroadcastJob extends JavaProcessor {

    /**
     * 1 つのワーカーのみがメソッドを呼び出します。
     */
    @Override
    public void preProcess(JobContext context) {
        System.out.println("TestBroadcastJob.preProcess");
    }

    /**
     * すべてのワーカーがメソッドを呼び出します。
     */
    @Override
    public ProcessResult process(JobContext context) throws Exception {
        int value = new Random().nextInt(10);
        System.out.println("Total number of shards=" + context.getShardingNum() + ", Shard index=" + context.getShardingId() + ", "
                + "taskId=" + context.getTaskId() + ", value=" + value);
        return new ProcessResult(true, String.valueOf(value));
    }

    /**
     * 1 つのワーカーのみがメソッドを呼び出します。
     */
    @Override
    public ProcessResult postProcess(JobContext context) {
        System.out.println("TestBroadcastJob.postProcess");
        Map<Long, String> allTaskResults = context.getTaskResults();
        Map<Long, TaskStatus> allTaskStatuses = context.getTaskStatuses();
        int num = 0;
        for (Entry<Long, String> entry : allTaskResults.entrySet()) {
            System.out.println(entry.getKey() + ":" + entry.getValue());
            if (allTaskStatuses.get(entry.getKey()).equals(TaskStatus.SUCCESS)) {
                num += Integer.valueOf(entry.getValue());
            }
        }
        System.out.println("TestBroadcastJob.postProcess(), num=" + num);
        return new ProcessResult(true, String.valueOf(num));
    }

}