ブロードキャストモデルを使用する場合、ジョブインスタンスは、ジョブインスタンスが属するグループ内のすべてのワーカーに分散されます。すべてのワーカーがジョブを完了した後にのみ、ジョブは完了します。ワーカーがジョブを完了できない場合、ジョブは失敗と見なされます。
シナリオ
バッチ O&M 操作
スケジュールされた時間にすべてのワーカーでスクリプトを実行します。
スケジュールされた時間にすべてのワーカーでキャッシュをクリアします。
すべてのワーカーでサービスを動的に有効にし、ワーカーを使用して実行結果を収集し、データベースを更新します。
データ集約
JavaProcessor を使用する場合、preProcess プロセス中に Redis キャッシュまたはデータベースを初期化します。
ワーカーが process メソッドを呼び出した後、ビジネスロジックに基づいて実行結果を収集します。
ワーカーが postProcess メソッドを呼び出した後、すべてのワーカーからの実行結果を集約し、キャッシュまたはデータベースを更新します。
サポートされているジョブの種類
ブロードキャストモデルは、スクリプトジョブや Java ジョブなど、複数の種類のジョブをサポートしています。Java ジョブを使用する場合、ブロードキャストモデルは、preProcess メソッドと postProcess メソッドなどの高度な機能をサポートしています。
Java ジョブを使用する場合は、JavaProcessor 1.0.7 以降を使用する必要があります。次のコードブロックはインターフェースを示しています。
必須:
public ProcessResult process(JobContext context) throws Exception
オプション:
public void preProcess(JobContext context)
オプション:
public ProcessResult postProcess(JobContext context)
preProcess メソッドは、process メソッドが呼び出される前に、各ワーカーで 1 回だけ呼び出されます。
postProcess メソッドは、すべてのワーカーが process メソッドを呼び出した後、各ワーカーで 1 回だけ呼び出されます。実行結果はワークフローに取り込むことができます。
例
@Component
public class TestBroadcastJob extends JavaProcessor {
/**
* 1 つのワーカーのみがメソッドを呼び出します。
*/
@Override
public void preProcess(JobContext context) {
System.out.println("TestBroadcastJob.preProcess");
}
/**
* すべてのワーカーがメソッドを呼び出します。
*/
@Override
public ProcessResult process(JobContext context) throws Exception {
int value = new Random().nextInt(10);
System.out.println("Total number of shards=" + context.getShardingNum() + ", Shard index=" + context.getShardingId() + ", "
+ "taskId=" + context.getTaskId() + ", value=" + value);
return new ProcessResult(true, String.valueOf(value));
}
/**
* 1 つのワーカーのみがメソッドを呼び出します。
*/
@Override
public ProcessResult postProcess(JobContext context) {
System.out.println("TestBroadcastJob.postProcess");
Map<Long, String> allTaskResults = context.getTaskResults();
Map<Long, TaskStatus> allTaskStatuses = context.getTaskStatuses();
int num = 0;
for (Entry<Long, String> entry : allTaskResults.entrySet()) {
System.out.println(entry.getKey() + ":" + entry.getValue());
if (allTaskStatuses.get(entry.getKey()).equals(TaskStatus.SUCCESS)) {
num += Integer.valueOf(entry.getValue());
}
}
System.out.println("TestBroadcastJob.postProcess(), num=" + num);
return new ProcessResult(true, String.valueOf(num));
}
}