廣播執行表示一個任務執行個體會廣播到該分組所有Worker上執行,當所有Worker都執行完成,該任務才算完成。任意一台Worker執行失敗,都算該任務失敗。
應用情境
- 批量營運
- 定時廣播所有機器運行某個指令碼。
- 定時廣播所有機器清理緩衝。
- 動態拉起每台機器的某個服務,最後由一台機器回收結果修改資料庫。
- 資料彙總
- 使用JavaProcessor,preProcess的時候初始化Redis緩衝或者資料庫。
- 每台機器執行process的時候,根據自己業務返回result。
- postProcess的時候,擷取所有機器的執行結果做匯總,更新緩衝或者資料庫。
任務類型
任務類型可以選擇多種,例如指令碼或者Java任務。如果選擇Java,還支援preProcess和postProcess進階特性。
使用Java任務需要繼承JavaProcessor(1.0.7及以上版本),介面如下:
- 必需:
public ProcessResult process(JobContext context) throws Exception - 可選:
public void preProcess(JobContext context) - 可選:
public ProcessResult postProcess(JobContext context)
preProcess會在所有機器執行process之前執行,且只會執行一次。
postProcess會在所有機器執行process且都成功執行之後執行一次,可以返回結果,作為工作流程資料轉送。
Demo樣本
@Component
public class TestBroadcastJob extends JavaProcessor {
/**
* 只有一台機器會執行
*/
@Override
public void preProcess(JobContext context) {
System.out.println("TestBroadcastJob.preProcess");
}
/**
* 所有機器會執行
*/
@Override
public ProcessResult process(JobContext context) throws Exception {
int value = new Random().nextInt(10);
System.out.println("分區總數=" + context.getShardingNum() + ", 分區號=" + context.getShardingId() + ", "
+ "taskId=" + context.getTaskId() + ", value=" + value);
return new ProcessResult(true, String.valueOf(value));
}
/**
* 只有一台機器會執行
*/
@Override
public ProcessResult postProcess(JobContext context) {
System.out.println("TestBroadcastJob.postProcess");
Map<Long, String> allTaskResults = context.getTaskResults();
Map<Long, TaskStatus> allTaskStatuses = context.getTaskStatuses();
int num = 0;
for (Entry<Long, String> entry : allTaskResults.entrySet()) {
System.out.println(entry.getKey() + ":" + entry.getValue());
if (allTaskStatuses.get(entry.getKey()).equals(TaskStatus.SUCCESS)) {
num += Integer.valueOf(entry.getValue());
}
}
System.out.println("TestBroadcastJob.postProcess(), num=" + num);
return new ProcessResult(true, String.valueOf(num));
}
}